From 2fa6bf440b28bfae7e044d92df28b4dedb84f6c1 Mon Sep 17 00:00:00 2001 From: orbiter Date: Mon, 2 Nov 2009 18:19:30 +0000 Subject: [PATCH] workflow update to OAI-PMH importer git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@6445 6c8d7289-2bf4-0310-a012-ef5d649a1542 --- htroot/IndexImportOAIPMH_p.html | 5 +- htroot/IndexImportOAIPMH_p.java | 42 ++++++++------ .../document/importer/OAIPMHImporter.java | 58 ++++++++++++++++--- .../yacy/document/importer/OAIPMHReader.java | 2 +- 4 files changed, 77 insertions(+), 30 deletions(-) diff --git a/htroot/IndexImportOAIPMH_p.html b/htroot/IndexImportOAIPMH_p.html index 325a1a8e2..e99da0dcb 100644 --- a/htroot/IndexImportOAIPMH_p.html +++ b/htroot/IndexImportOAIPMH_p.html @@ -46,10 +46,9 @@
Thread:
#[thread]#
Source:
#[source]#
-
Processed:
#[count]# records
+
Processed Chunks:
#[chunkCount]# chunks
+
Imported Records:
#[recordsCount]# records
Speed:
#[speed]# records per second
-
Running Time:
#[runningHours]# hours, #[runningMinutes]# minutes
-
Remaining Time:
#[remainingHours]# hours, #[remainingMinutes]# minutes
#(/import-all)# diff --git a/htroot/IndexImportOAIPMH_p.java b/htroot/IndexImportOAIPMH_p.java index f3e84483d..136d4a76c 100644 --- a/htroot/IndexImportOAIPMH_p.java +++ b/htroot/IndexImportOAIPMH_p.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.net.MalformedURLException; +import java.util.NoSuchElementException; import net.yacy.document.importer.OAIPMHImporter; import net.yacy.document.importer.OAIPMHReader; @@ -46,18 +47,26 @@ public class IndexImportOAIPMH_p { prop.put("import-all_status", 0); prop.put("defaulturl", ""); - - if (OAIPMHImporter.job != null) { + OAIPMHImporter job = null; + try { + job = OAIPMHImporter.runningJobs.first(); + } catch (NoSuchElementException e0) { + try { + job = OAIPMHImporter.startedJobs.first(); + } catch (NoSuchElementException e1) { + try { + job = OAIPMHImporter.finishedJobs.first(); + } catch (NoSuchElementException e2) {} + } + } + if (job != null) { // one import is running, no option to insert anything prop.put("import-all", 1); - prop.put("import-all_thread", (OAIPMHImporter.job.isAlive()) ? "running" : "finished"); - prop.put("import-all_source", OAIPMHImporter.job.source()); - prop.put("import-all_count", OAIPMHImporter.job.count()); - prop.put("import-all_speed", OAIPMHImporter.job.speed()); - prop.put("import-all_runningHours", (OAIPMHImporter.job.runningTime() / 60) / 60); - prop.put("import-all_runningMinutes", (OAIPMHImporter.job.runningTime() / 60) % 60); - prop.put("import-all_remainingHours", (OAIPMHImporter.job.remainingTime() / 60) / 60); - prop.put("import-all_remainingMinutes", (OAIPMHImporter.job.remainingTime() / 60) % 60); + prop.put("import-all_thread", (job.isAlive()) ? 
"running" : "finished"); + prop.put("import-all_source", job.source()); + prop.put("import-all_chunkCount", job.chunkCount()); + prop.put("import-all_recordsCount", job.count()); + prop.put("import-all_speed", job.speed()); return prop; } @@ -100,17 +109,14 @@ public class IndexImportOAIPMH_p { DigestURI url = null; try { url = new DigestURI(oaipmhurl, null); - OAIPMHImporter.job = new OAIPMHImporter(sb.loader, url); - OAIPMHImporter.job.start(); + job = new OAIPMHImporter(sb.loader, url); + job.start(); prop.put("import-all", 1); prop.put("import-all_thread", "started"); - prop.put("import-all_source", OAIPMHImporter.job.source()); - prop.put("import-all_count", 0); + prop.put("import-all_source", job.source()); + prop.put("import-all_chunkCount", 0); + prop.put("import-all_recordsCount", 0); prop.put("import-all_speed", 0); - prop.put("import-all_runningHours", 0); - prop.put("import-all_runningMinutes", 0); - prop.put("import-all_remainingHours", 0); - prop.put("import-all_remainingMinutes", 0); } catch (MalformedURLException e) { e.printStackTrace(); prop.put("import-all", 0); diff --git a/source/net/yacy/document/importer/OAIPMHImporter.java b/source/net/yacy/document/importer/OAIPMHImporter.java index de69c017a..adecf934d 100644 --- a/source/net/yacy/document/importer/OAIPMHImporter.java +++ b/source/net/yacy/document/importer/OAIPMHImporter.java @@ -28,6 +28,7 @@ package net.yacy.document.importer; import java.io.IOException; import java.net.MalformedURLException; +import java.util.TreeSet; import net.yacy.kelondro.data.meta.DigestURI; import net.yacy.repository.LoaderDispatcher; @@ -41,21 +42,29 @@ import de.anomic.search.Switchboard; // http://opus.bsz-bw.de/fhhv/oai2/oai2.php?verb=ListRecords&metadataPrefix=oai_dc -public class OAIPMHImporter extends Thread implements Importer { +public class OAIPMHImporter extends Thread implements Importer, Comparable { - public static OAIPMHImporter job; // if started from a servlet, this object is used to store the 
thread + private static int importerCounter = 0; + + public static TreeSet startedJobs = new TreeSet(); + public static TreeSet runningJobs = new TreeSet(); + public static TreeSet finishedJobs = new TreeSet(); private LoaderDispatcher loader; private DigestURI source; - private int count; - private long startTime; + private int recordsCount, chunkCount; + private long startTime, finishTime; private ResumptionToken resumptionToken; private String message; + private int serialNumber; public OAIPMHImporter(LoaderDispatcher loader, DigestURI source) { + this.serialNumber = importerCounter++; this.loader = loader; - this.count = 0; + this.recordsCount = 0; + this.chunkCount = 0; this.startTime = System.currentTimeMillis(); + this.finishTime = 0; this.resumptionToken = null; this.message = "import initialized"; // fix start url @@ -67,10 +76,15 @@ public class OAIPMHImporter extends Thread implements Importer { // this should never happen e.printStackTrace(); } + startedJobs.add(this); } public int count() { - return this.count; + return this.recordsCount; + } + + public int chunkCount() { + return this.chunkCount; } public String status() { @@ -82,11 +96,11 @@ public class OAIPMHImporter extends Thread implements Importer { } public long remainingTime() { - return Long.MAX_VALUE; // we don't know + return (this.isAlive()) ? Long.MAX_VALUE : 0; // we don't know } public long runningTime() { - return System.currentTimeMillis() - this.startTime; + return (this.isAlive()) ? 
System.currentTimeMillis() - this.startTime : this.finishTime - this.startTime; } public String source() { @@ -98,10 +112,17 @@ public class OAIPMHImporter extends Thread implements Importer { } public void run() { + while (runningJobs.size() > 10) { + try {Thread.sleep(1000 + 1000 * System.currentTimeMillis() % 6);} catch (InterruptedException e) {} + } + startedJobs.remove(this); + runningJobs.add(this); this.message = "loading first part of records"; while (true) { try { OAIPMHReader reader = new OAIPMHReader(this.loader, this.source, Switchboard.getSwitchboard().surrogatesInPath, "oaipmh"); + this.chunkCount++; + this.recordsCount += reader.getResumptionToken().getRecordCounter(); this.source = reader.getResumptionToken().resumptionURL(this.source); if (this.source == null) { this.message = "import terminated with source = null"; @@ -113,5 +134,26 @@ public class OAIPMHImporter extends Thread implements Importer { break; } } + this.finishTime = System.currentTimeMillis(); + runningJobs.remove(this); + finishedJobs.add(this); + } + + + // methods that are needed to put the object into a Hashtable or a Map: + + public int hashCode() { + return this.serialNumber; + } + + public boolean equals(OAIPMHImporter o) { + return this.compareTo(o) == 0; + } + + // methods that are needed to put the object into a Tree: + public int compareTo(OAIPMHImporter o) { + if (this.serialNumber > o.serialNumber) return 1; + if (this.serialNumber < o.serialNumber) return -1; + return 0; } } \ No newline at end of file diff --git a/source/net/yacy/document/importer/OAIPMHReader.java b/source/net/yacy/document/importer/OAIPMHReader.java index f2eb20415..2c28251dc 100644 --- a/source/net/yacy/document/importer/OAIPMHReader.java +++ b/source/net/yacy/document/importer/OAIPMHReader.java @@ -56,7 +56,7 @@ public class OAIPMHReader { // load the file from the net Response response; - response = loader.load(source, true, true, CrawlProfile.CACHE_STRATEGY_NOCACHE); + response = 
loader.load(source, false, true, CrawlProfile.CACHE_STRATEGY_NOCACHE); byte[] b = response.getContent(); this.resumptionToken = new ResumptionToken(new ByteArrayInputStream(b)); String file = filePrefix + "_" + this.source.getHost() + "_" + DateFormatter.formatShortMilliSecond(new Date());