From ba339a2a454fb31d6527bd53f3652d75bc7674b4 Mon Sep 17 00:00:00 2001 From: reger Date: Sun, 2 Apr 2017 03:32:21 +0200 Subject: [PATCH] Add servlet to import warc file from filesystem IndexImportWarc_p.html. Apply Importer interface to WarcImporter --- htroot/IndexImportWarc_p.html | 51 +++++++++ htroot/IndexImportWarc_p.java | 76 +++++++++++++ .../env/templates/submenuIndexImport.template | 1 + .../yacy/document/importer/WarcImporter.java | 103 +++++++++++++++++- source/net/yacy/search/Switchboard.java | 10 +- 5 files changed, 234 insertions(+), 7 deletions(-) create mode 100644 htroot/IndexImportWarc_p.html create mode 100644 htroot/IndexImportWarc_p.java diff --git a/htroot/IndexImportWarc_p.html b/htroot/IndexImportWarc_p.html new file mode 100644 index 000000000..d6003bc9e --- /dev/null +++ b/htroot/IndexImportWarc_p.html @@ -0,0 +1,51 @@ + + + + YaCy '#[clientname]#': Warc Import + #%env/templates/metas.template%# + #(import)#:: + + #(/import)# + + + #%env/templates/header.template%# + #%env/templates/submenuIndexImport.template%# +

Web Archive File Import

+ + #(import)# +

No import thread is running, you can start a new thread here

+
+ +
+ Warc File Selection: select a warc file (which may be gz compressed) +

+ You can download warc archives, for example from the + Internet Archive. +

+
+ + +
+ +
+
+
+
+ +
+ :: +
Import Process +
+
Thread:
#[thread]#
+
Warc File:
#[warcfile]#
+
Processed:
#[count]# Entries
+
Speed:
#[speed]# pages per second
+
Running Time:
#[runningHours]# hours, #[runningMinutes]# minutes
+
Remaining Time:
#[remainingHours]# hours, #[remainingMinutes]# minutes
+
+
+ #(/import)# + + #%env/templates/footer.template%# + + \ No newline at end of file diff --git a/htroot/IndexImportWarc_p.java b/htroot/IndexImportWarc_p.java new file mode 100644 index 000000000..f503fe98b --- /dev/null +++ b/htroot/IndexImportWarc_p.java @@ -0,0 +1,76 @@ +// IndexImportWarc_p.java +// ------------------------- +// (c) 2017 by reger24; https://github.com/reger24 +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation; either version 2 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +import java.io.File; +import java.io.FileNotFoundException; + +import net.yacy.cora.protocol.RequestHeader; +import net.yacy.document.importer.WarcImporter; +import net.yacy.search.Switchboard; +import net.yacy.server.serverObjects; +import net.yacy.server.serverSwitch; + +public class IndexImportWarc_p { + + public static serverObjects respond(@SuppressWarnings("unused") final RequestHeader header, final serverObjects post, final serverSwitch env) { + final serverObjects prop = new serverObjects(); + final Switchboard sb = (Switchboard) env; + + if (WarcImporter.job != null && WarcImporter.job.isAlive()) { + // one import is running, no option to insert anything + prop.put("import", 1); + prop.put("import_thread", "running"); + prop.put("import_warcfile", WarcImporter.job.source()); + prop.put("import_count", WarcImporter.job.count()); + prop.put("import_speed", 
WarcImporter.job.speed()); + prop.put("import_runningHours", (WarcImporter.job.runningTime() / 60) / 60); + prop.put("import_runningMinutes", (WarcImporter.job.runningTime() / 60) % 60); + prop.put("import_remainingHours", (WarcImporter.job.remainingTime() / 60) / 60); + prop.put("import_remainingMinutes", (WarcImporter.job.remainingTime() / 60) % 60); + } else { + prop.put("import", 0); + if (post != null) { + if (post.containsKey("file")) { + String file = post.get("file"); + final File sourcefile = new File(file); + if (sourcefile.exists()) { + try { + WarcImporter wi = new WarcImporter(sourcefile); + wi.start(); + prop.put("import_thread", "started"); + } catch (FileNotFoundException ex) { + prop.put("import_thread", "Error: file not found [" + file + "]"); + } + prop.put("import_warcfile", file); + } else { + prop.put("import_warcfile", ""); + prop.put("import_thread", "Error: file not found [" + file + "]"); + } + prop.put("import", 1); + prop.put("import_count", 0); + prop.put("import_speed", 0); + prop.put("import_runningHours", 0); + prop.put("import_runningMinutes", 0); + prop.put("import_remainingHours", 0); + prop.put("import_remainingMinutes", 0); + } + } + } + return prop; + } +} diff --git a/htroot/env/templates/submenuIndexImport.template b/htroot/env/templates/submenuIndexImport.template index b44d9d95a..d85b313c2 100644 --- a/htroot/env/templates/submenuIndexImport.template +++ b/htroot/env/templates/submenuIndexImport.template @@ -13,6 +13,7 @@ diff --git a/source/net/yacy/document/importer/WarcImporter.java b/source/net/yacy/document/importer/WarcImporter.java index e3463a02c..e921765ce 100644 --- a/source/net/yacy/document/importer/WarcImporter.java +++ b/source/net/yacy/document/importer/WarcImporter.java @@ -22,6 +22,9 @@ */ package net.yacy.document.importer; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import 
net.yacy.cora.document.id.DigestURL; @@ -52,7 +55,29 @@ import org.jwat.warc.WarcRecord; * http://archive-access.sourceforge.net/warc/warc_file_format-0.9.html * http://archive-access.sourceforge.net/warc/ */ -public class WarcImporter { +public class WarcImporter extends Thread implements Importer { + + static public Importer job; // static object to assure only one importer is running (if started from a servlet, this object is used to store the thread) + + private final InputStream source; // current input warc archive + private String name; // file name of input source + + private int recordCnt; // number of responses indexed (for statistic) + private long startTime; // (for statistic) + private final long sourceSize; // length of the input source (for statistic) + private long consumed; // bytes consumed from input source (for statistic) + + public WarcImporter(InputStream f) { + source = f; + recordCnt = 0; + sourceSize = -1; + } + + public WarcImporter(File f) throws FileNotFoundException{ + name = f.getName(); + sourceSize = f.length(); + source = new FileInputStream(f); + } /** * Reads a Warc file and adds all contained responses to the index. 
@@ -64,7 +89,8 @@ public class WarcImporter { public void indexWarcRecords(InputStream f) throws IOException { byte[] content; - int cnt = 0; + job = this; + startTime = System.currentTimeMillis(); WarcReader localwarcReader = WarcReaderFactory.getReader(f); WarcRecord wrec = localwarcReader.getNextRecord(); @@ -126,13 +152,82 @@ public class WarcImporter { ); Switchboard.getSwitchboard().toIndexer(response); - cnt++; + recordCnt++; } } } + this.consumed = localwarcReader.getConsumed(); wrec = localwarcReader.getNextRecord(); } localwarcReader.close(); - ConcurrentLog.info("WarcImporter", "Indexed " + cnt + " documents"); + ConcurrentLog.info("WarcImporter", "Indexed " + recordCnt + " documents"); + job = null; + } + + @Override + public void run() { + try { + this.indexWarcRecords(this.source); + } catch (IOException ex) { + ConcurrentLog.info("WarcImporter", ex.getMessage()); + } + } + + /** + * Filename of the input source + * @return + */ + @Override + public String source() { + return this.name; + } + + /** + * Number of responses (pages) indexed + * @return + */ + @Override + public int count() { + return this.recordCnt; + } + + /** + * Indexed responses per second + * @return + */ + @Override + public int speed() { + if (this.recordCnt == 0) return 0; + return (int) (this.recordCnt / Math.max(0L, runningTime() )); + } + + /** + * Duration in seconds running, working on the current import source + * @return duration in seconds + */ + @Override + public long runningTime() { + return (System.currentTimeMillis() - this.startTime) / 1000L; } + + /** + * Estimate on time remaining calculated from length of input source and + * processed bytes. 
+ * @return duration in seconds + */ + @Override + public long remainingTime() { + if (this.consumed == 0) { + return 0; + } else { + long speed = this.consumed / runningTime(); + return (this.sourceSize - this.consumed) / speed; + } + } + + @Override + public String status() { + return ""; + } + } diff --git a/source/net/yacy/search/Switchboard.java b/source/net/yacy/search/Switchboard.java index 68a9294e9..2a85da0fa 100644 --- a/source/net/yacy/search/Switchboard.java +++ b/source/net/yacy/search/Switchboard.java @@ -2005,9 +2005,13 @@ public final class Switchboard extends serverSwitch { return moved; } else if (s.endsWith(".warc") || s.endsWith(".warc.gz")) { try { - InputStream is = new BufferedInputStream(new FileInputStream(infile)); - WarcImporter wri = new WarcImporter(); - wri.indexWarcRecords(is); + WarcImporter wri = new WarcImporter(infile); + wri.start(); + try { + wri.join(); + } catch (InterruptedException ex) { + return moved; + } moved = infile.renameTo(outfile); } catch (IOException ex) { log.warn("IO Error processing warc file " + infile);