From ba339a2a454fb31d6527bd53f3652d75bc7674b4 Mon Sep 17 00:00:00 2001
From: reger No import thread is running, you can start a new thread here usage rule
---
htroot/IndexImportOAIPMH_p.html | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/htroot/IndexImportOAIPMH_p.html b/htroot/IndexImportOAIPMH_p.html
index 671d3acdd..84c9da15f 100644
--- a/htroot/IndexImportOAIPMH_p.html
+++ b/htroot/IndexImportOAIPMH_p.html
@@ -19,12 +19,12 @@
#(import-one)#::
- Web Archive File Import
+
+ #(import)#
+
+ ::
+
+ #(/import)#
+
+ #%env/templates/footer.template%#
+
+
\ No newline at end of file
diff --git a/htroot/IndexImportWarc_p.java b/htroot/IndexImportWarc_p.java
new file mode 100644
index 000000000..f503fe98b
--- /dev/null
+++ b/htroot/IndexImportWarc_p.java
@@ -0,0 +1,76 @@
+// IndexImportWarc_p.java
+// -------------------------
+// (c) 2017 by reger24; https://github.com/reger24
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation; either version 2 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program; if not, write to the Free Software
+// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+import java.io.File;
+import java.io.FileNotFoundException;
+
+import net.yacy.cora.protocol.RequestHeader;
+import net.yacy.document.importer.WarcImporter;
+import net.yacy.search.Switchboard;
+import net.yacy.server.serverObjects;
+import net.yacy.server.serverSwitch;
+
+public class IndexImportWarc_p {
+
+ public static serverObjects respond(@SuppressWarnings("unused") final RequestHeader header, final serverObjects post, final serverSwitch env) {
+ final serverObjects prop = new serverObjects();
+ final Switchboard sb = (Switchboard) env;
+
+ if (WarcImporter.job != null && WarcImporter.job.isAlive()) {
+ // one import is running, no option to insert anything
+ prop.put("import", 1);
+ prop.put("import_thread", "running");
+ prop.put("import_warcfile", WarcImporter.job.source());
+ prop.put("import_count", WarcImporter.job.count());
+ prop.put("import_speed", WarcImporter.job.speed());
+ prop.put("import_runningHours", (WarcImporter.job.runningTime() / 60) / 60);
+ prop.put("import_runningMinutes", (WarcImporter.job.runningTime() / 60) % 60);
+ prop.put("import_remainingHours", (WarcImporter.job.remainingTime() / 60) / 60);
+ prop.put("import_remainingMinutes", (WarcImporter.job.remainingTime() / 60) % 60);
+ } else {
+ prop.put("import", 0);
+ if (post != null) {
+ if (post.containsKey("file")) {
+ String file = post.get("file");
+ final File sourcefile = new File(file);
+ if (sourcefile.exists()) {
+ try {
+ WarcImporter wi = new WarcImporter(sourcefile);
+ wi.start();
+ prop.put("import_thread", "started");
+ } catch (FileNotFoundException ex) {
+ prop.put("import_thread", "Error: file not found [" + file + "]");
+ }
+ prop.put("import_warcfile", file);
+ } else {
+ prop.put("import_warcfile", "");
+ prop.put("import_thread", "Error: file not found [" + file + "]");
+ }
+ prop.put("import", 1);
+ prop.put("import_count", 0);
+ prop.put("import_speed", 0);
+ prop.put("import_runningHours", 0);
+ prop.put("import_runningMinutes", 0);
+ prop.put("import_remainingHours", 0);
+ prop.put("import_remainingMinutes", 0);
+ }
+ }
+ }
+ return prop;
+ }
+}
diff --git a/htroot/env/templates/submenuIndexImport.template b/htroot/env/templates/submenuIndexImport.template
index b44d9d95a..d85b313c2 100644
--- a/htroot/env/templates/submenuIndexImport.template
+++ b/htroot/env/templates/submenuIndexImport.template
@@ -13,6 +13,7 @@
diff --git a/source/net/yacy/document/importer/WarcImporter.java b/source/net/yacy/document/importer/WarcImporter.java
index e3463a02c..e921765ce 100644
--- a/source/net/yacy/document/importer/WarcImporter.java
+++ b/source/net/yacy/document/importer/WarcImporter.java
@@ -22,6 +22,9 @@
*/
package net.yacy.document.importer;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import net.yacy.cora.document.id.DigestURL;
@@ -52,7 +55,29 @@ import org.jwat.warc.WarcRecord;
* http://archive-access.sourceforge.net/warc/warc_file_format-0.9.html
* http://archive-access.sourceforge.net/warc/
*/
-public class WarcImporter {
+public class WarcImporter extends Thread implements Importer {
+
+ static public Importer job; // static object to assure only one importer is running (if started from a servlet, this object is used to store the thread)
+
+ private final InputStream source; // current input warc archive
+ private String name; // file name of input source
+
+ private int recordCnt; // number of responses indexed (for statistic)
+ private long startTime; // (for statistic)
+ private final long sourceSize; // length of the input source (for statistic)
+ private long consumed; // bytes consumed from input source (for statistic)
+
+ public WarcImporter(InputStream f) {
+ source = f;
+ recordCnt = 0;
+ sourceSize = -1;
+ }
+
+ public WarcImporter(File f) throws FileNotFoundException{
+ name = f.getName();
+ sourceSize = f.length();
+ source = new FileInputStream(f);
+ }
/**
* Reads a Warc file and adds all contained responses to the index.
@@ -64,7 +89,8 @@ public class WarcImporter {
public void indexWarcRecords(InputStream f) throws IOException {
byte[] content;
- int cnt = 0;
+ job = this;
+ startTime = System.currentTimeMillis();
WarcReader localwarcReader = WarcReaderFactory.getReader(f);
WarcRecord wrec = localwarcReader.getNextRecord();
@@ -126,13 +152,82 @@ public class WarcImporter {
);
Switchboard.getSwitchboard().toIndexer(response);
- cnt++;
+ recordCnt++;
}
}
}
+ this.consumed = localwarcReader.getConsumed();
wrec = localwarcReader.getNextRecord();
}
localwarcReader.close();
- ConcurrentLog.info("WarcImporter", "Indexed " + cnt + " documents");
+ ConcurrentLog.info("WarcImporter", "Indexed " + recordCnt + " documents");
+ job = null;
+ }
+
+ @Override
+ public void run() {
+ try {
+ this.indexWarcRecords(this.source);
+ } catch (IOException ex) {
+ ConcurrentLog.info("WarcImporter", ex.getMessage());
+ }
+ }
+
+ /**
+ * Filename of the input source
+ * @return
+ */
+ @Override
+ public String source() {
+ return this.name;
+ }
+
+ /**
+ * Number of responses (pages) indexed
+ * @return
+ */
+ @Override
+ public int count() {
+ return this.recordCnt;
+ }
+
+ /**
+ * Indexed responses per second
+ * @return
+ */
+ @Override
+ public int speed() {
+ if (this.recordCnt == 0) return 0;
+ return (int) (this.recordCnt / Math.max(0L, runningTime() ));
+ }
+
+ /**
+ * Duration in seconds running, working on the current import source
+ * @return duration in seconds
+ */
+ @Override
+ public long runningTime() {
+ return (System.currentTimeMillis() - this.startTime) / 1000L;
}
+
+ /**
+ * Estimate on time remaining calculated from length of input source and
+ * processed bytes.
+ * @return duration in seconds
+ */
+ @Override
+ public long remainingTime() {
+ if (this.consumed == 0) {
+ return 0;
+ } else {
+ long speed = this.consumed / runningTime();
+ return (this.sourceSize - this.consumed) / speed;
+ }
+ }
+
+ @Override
+ public String status() {
+ return "";
+ }
+
}
diff --git a/source/net/yacy/search/Switchboard.java b/source/net/yacy/search/Switchboard.java
index 68a9294e9..2a85da0fa 100644
--- a/source/net/yacy/search/Switchboard.java
+++ b/source/net/yacy/search/Switchboard.java
@@ -2005,9 +2005,13 @@ public final class Switchboard extends serverSwitch {
return moved;
} else if (s.endsWith(".warc") || s.endsWith(".warc.gz")) {
try {
- InputStream is = new BufferedInputStream(new FileInputStream(infile));
- WarcImporter wri = new WarcImporter();
- wri.indexWarcRecords(is);
+ WarcImporter wri = new WarcImporter(infile);
+ wri.start();
+ try {
+ wri.join();
+ } catch (InterruptedException ex) {
+ return moved;
+ }
moved = infile.renameTo(outfile);
} catch (IOException ex) {
log.warn("IO Error processing warc file " + infile);
From 777cb5b812033ab2829f83ce80fe8bf7fe0315cd Mon Sep 17 00:00:00 2001
From: reger
+
Import failed: #[error]#
+ + ::Import failed: #[error]#
#(/import-one)# From c19d60f06b90ad7fddcc70913c249d377d37007d Mon Sep 17 00:00:00 2001 From: reger