Add servlet to import warc file from filesystem IndexImportWarc_p.html.

Apply Importer interface to WarcImporter
pull/60/head^2
reger 8 years ago
parent 1d81b8f102
commit ba339a2a45

@ -0,0 +1,51 @@
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
<title>YaCy '#[clientname]#': Warc Import</title>
#%env/templates/metas.template%#
#(import)#::<meta http-equiv="REFRESH" content="10;url=IndexImportWarc_p.html" />
<!-- the url= removes http get parameters on refresh, preventing restart of import -->
#(/import)#
</head>
<body id="IndexImportWarc">
#%env/templates/header.template%#
#%env/templates/submenuIndexImport.template%#
<h2>Web Archive File Import</h2>
#(import)#
<p>No import thread is running, you can start a new thread here</p>
<form action="IndexImportWarc_p.html" method="get" accept-charset="UTF-8">
<!-- no post method here, we don't want to transmit the whole file, only the path-->
<fieldset>
<legend>Warc File Selection: select an warc file (which may be gz compressed)</legend>
<p>
You can download warc archives for example here
<a href="https://archive.org/search.php?query=subject%3A%22warcarchives%22&and[]=subject%3A%22warcarchives%22" target="_blank">Internet Archive</a>.
</p>
<div class="input-group">
<span style="display: inline-block">
<input name="file" type="file" value="" size="75" /></span>
<div class="btn-group">
<input name="submit" class="btn btn-primary" type="submit" value="Import Warc File" />
</div>
</div>
</fieldset>
</form>
<br />
::
<form><fieldset><legend>Import Process</legend>
<dl>
<dt>Thread:</dt><dd>#[thread]#</dd>
<dt>Warc File:</dt><dd>#[warcfile]#</dd>
<dt>Processed:</dt><dd>#[count]# Entries</dd>
<dt>Speed:</dt><dd>#[speed]# pages per second</dd>
<dt>Running Time:</dt><dd>#[runningHours]# hours, #[runningMinutes]# minutes</dd>
<dt>Remaining Time:</dt><dd>#[remainingHours]# hours, #[remainingMinutes]# minutes</dd>
</dl>
</fieldset></form>
#(/import)#
#%env/templates/footer.template%#
</body>
</html>

@ -0,0 +1,76 @@
// IndexImportWarc_p.java
// -------------------------
// (c) 2017 by reger24; https://github.com/reger24
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
import java.io.File;
import java.io.FileNotFoundException;
import net.yacy.cora.protocol.RequestHeader;
import net.yacy.document.importer.WarcImporter;
import net.yacy.search.Switchboard;
import net.yacy.server.serverObjects;
import net.yacy.server.serverSwitch;
public class IndexImportWarc_p {
public static serverObjects respond(@SuppressWarnings("unused") final RequestHeader header, final serverObjects post, final serverSwitch env) {
final serverObjects prop = new serverObjects();
final Switchboard sb = (Switchboard) env;
if (WarcImporter.job != null && WarcImporter.job.isAlive()) {
// one import is running, no option to insert anything
prop.put("import", 1);
prop.put("import_thread", "running");
prop.put("import_warcfile", WarcImporter.job.source());
prop.put("import_count", WarcImporter.job.count());
prop.put("import_speed", WarcImporter.job.speed());
prop.put("import_runningHours", (WarcImporter.job.runningTime() / 60) / 60);
prop.put("import_runningMinutes", (WarcImporter.job.runningTime() / 60) % 60);
prop.put("import_remainingHours", (WarcImporter.job.remainingTime() / 60) / 60);
prop.put("import_remainingMinutes", (WarcImporter.job.remainingTime() / 60) % 60);
} else {
prop.put("import", 0);
if (post != null) {
if (post.containsKey("file")) {
String file = post.get("file");
final File sourcefile = new File(file);
if (sourcefile.exists()) {
try {
WarcImporter wi = new WarcImporter(sourcefile);
wi.start();
prop.put("import_thread", "started");
} catch (FileNotFoundException ex) {
prop.put("import_thread", "Error: file not found [" + file + "]");
}
prop.put("import_warcfile", file);
} else {
prop.put("import_warcfile", "");
prop.put("import_thread", "Error: file not found [" + file + "]");
}
prop.put("import", 1);
prop.put("import_count", 0);
prop.put("import_speed", 0);
prop.put("import_runningHours", 0);
prop.put("import_runningMinutes", 0);
prop.put("import_remainingHours", 0);
prop.put("import_remainingMinutes", 0);
}
}
}
return prop;
}
}

@ -13,6 +13,7 @@
<ul class="SubMenu">
<li><a href="Load_RSS_p.html" class="MenuItemLink #(authorized)#lock::unlock#(/authorized)#">RSS Feed Importer</a></li>
<li><a href="IndexImportOAIPMH_p.html" class="MenuItemLink #(authorized)#lock::unlock#(/authorized)#">OAI-PMH Importer</a></li>
<li><a href="IndexImportWarc_p.html" class="MenuItemLink #(authorized)#lock::unlock#(/authorized)#">Warc Importer</a></li>
</ul>
</div>

@ -22,6 +22,9 @@
*/
package net.yacy.document.importer;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import net.yacy.cora.document.id.DigestURL;
@ -52,7 +55,29 @@ import org.jwat.warc.WarcRecord;
* http://archive-access.sourceforge.net/warc/warc_file_format-0.9.html
* http://archive-access.sourceforge.net/warc/
*/
public class WarcImporter {
public class WarcImporter extends Thread implements Importer {
static public Importer job; // static object to assure only one importer is running (if started from a servlet, this object is used to store the thread)
private final InputStream source; // current input warc archive
private String name; // file name of input source
private int recordCnt; // number of responses indexed (for statistic)
private long startTime; // (for statistic)
private final long sourceSize; // length of the input source (for statistic)
private long consumed; // bytes consumed from input source (for statistic)
public WarcImporter(InputStream f) {
source = f;
recordCnt = 0;
sourceSize = -1;
}
public WarcImporter(File f) throws FileNotFoundException{
name = f.getName();
sourceSize = f.length();
source = new FileInputStream(f);
}
/**
* Reads a Warc file and adds all contained responses to the index.
@ -64,7 +89,8 @@ public class WarcImporter {
public void indexWarcRecords(InputStream f) throws IOException {
byte[] content;
int cnt = 0;
job = this;
startTime = System.currentTimeMillis();
WarcReader localwarcReader = WarcReaderFactory.getReader(f);
WarcRecord wrec = localwarcReader.getNextRecord();
@ -126,13 +152,82 @@ public class WarcImporter {
);
Switchboard.getSwitchboard().toIndexer(response);
cnt++;
recordCnt++;
}
}
}
this.consumed = localwarcReader.getConsumed();
wrec = localwarcReader.getNextRecord();
}
localwarcReader.close();
ConcurrentLog.info("WarcImporter", "Indexed " + cnt + " documents");
ConcurrentLog.info("WarcImporter", "Indexed " + recordCnt + " documents");
job = null;
}
@Override
public void run() {
try {
this.indexWarcRecords(this.source);
} catch (IOException ex) {
ConcurrentLog.info("WarcImporter", ex.getMessage());
}
}
/**
* Filename of the input source
* @return
*/
@Override
public String source() {
return this.name;
}
/**
* Number of responses (pages) indexed
* @return
*/
@Override
public int count() {
return this.recordCnt;
}
/**
* Indexed responses per second
* @return
*/
@Override
public int speed() {
if (this.recordCnt == 0) return 0;
return (int) (this.recordCnt / Math.max(0L, runningTime() ));
}
/**
* Duration in seconds running, working on the current import source
* @return duration in seconds
*/
@Override
public long runningTime() {
return (System.currentTimeMillis() - this.startTime) / 1000L;
}
/**
* Estimate on time remaining calculated from length of input source and
* processed bytes.
* @return duration in seconds
*/
@Override
public long remainingTime() {
if (this.consumed == 0) {
return 0;
} else {
long speed = this.consumed / runningTime();
return (this.sourceSize - this.consumed) / speed;
}
}
@Override
public String status() {
return "";
}
}

@ -2005,9 +2005,13 @@ public final class Switchboard extends serverSwitch {
return moved;
} else if (s.endsWith(".warc") || s.endsWith(".warc.gz")) {
try {
InputStream is = new BufferedInputStream(new FileInputStream(infile));
WarcImporter wri = new WarcImporter();
wri.indexWarcRecords(is);
WarcImporter wri = new WarcImporter(infile);
wri.start();
try {
wri.join();
} catch (InterruptedException ex) {
return moved;
}
moved = infile.renameTo(outfile);
} catch (IOException ex) {
log.warn("IO Error processing warc file " + infile);

Loading…
Cancel
Save