workflow update to OAI-PMH importer

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@6445 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 15 years ago
parent b0b7a4f9a5
commit 2fa6bf440b

@ -46,10 +46,9 @@
<dl>
<dt>Thread:</dt><dd>#[thread]#</dd>
<dt>Source:</dt><dd>#[source]#</dd>
<dt>Processed:</dt><dd>#[count]# records</dd>
<dt>Processed Chunks:</dt><dd>#[chunkCount]# chunks</dd>
<dt>Imported Records:</dt><dd>#[recordsCount]# records</dd>
<dt>Speed:</dt><dd>#[speed]# records per second</dd>
<dt>Running Time:</dt><dd>#[runningHours]# hours, #[runningMinutes]# minutes</dd>
<dt>Remaining Time:</dt><dd>#[remainingHours]# hours, #[remainingMinutes]# minutes</dd>
</dl>
</fieldset></form>
#(/import-all)#

@ -24,6 +24,7 @@
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.NoSuchElementException;
import net.yacy.document.importer.OAIPMHImporter;
import net.yacy.document.importer.OAIPMHReader;
@ -46,18 +47,26 @@ public class IndexImportOAIPMH_p {
prop.put("import-all_status", 0);
prop.put("defaulturl", "");
if (OAIPMHImporter.job != null) {
OAIPMHImporter job = null;
try {
job = OAIPMHImporter.runningJobs.first();
} catch (NoSuchElementException e0) {
try {
job = OAIPMHImporter.startedJobs.first();
} catch (NoSuchElementException e1) {
try {
job = OAIPMHImporter.finishedJobs.first();
} catch (NoSuchElementException e2) {}
}
}
if (job != null) {
// one import is running, no option to insert anything
prop.put("import-all", 1);
prop.put("import-all_thread", (OAIPMHImporter.job.isAlive()) ? "running" : "finished");
prop.put("import-all_source", OAIPMHImporter.job.source());
prop.put("import-all_count", OAIPMHImporter.job.count());
prop.put("import-all_speed", OAIPMHImporter.job.speed());
prop.put("import-all_runningHours", (OAIPMHImporter.job.runningTime() / 60) / 60);
prop.put("import-all_runningMinutes", (OAIPMHImporter.job.runningTime() / 60) % 60);
prop.put("import-all_remainingHours", (OAIPMHImporter.job.remainingTime() / 60) / 60);
prop.put("import-all_remainingMinutes", (OAIPMHImporter.job.remainingTime() / 60) % 60);
prop.put("import-all_thread", (job.isAlive()) ? "running" : "finished");
prop.put("import-all_source", job.source());
prop.put("import-all_chunkCount", job.chunkCount());
prop.put("import-all_recordsCount", job.count());
prop.put("import-all_speed", job.speed());
return prop;
}
@ -100,17 +109,14 @@ public class IndexImportOAIPMH_p {
DigestURI url = null;
try {
url = new DigestURI(oaipmhurl, null);
OAIPMHImporter.job = new OAIPMHImporter(sb.loader, url);
OAIPMHImporter.job.start();
job = new OAIPMHImporter(sb.loader, url);
job.start();
prop.put("import-all", 1);
prop.put("import-all_thread", "started");
prop.put("import-all_source", OAIPMHImporter.job.source());
prop.put("import-all_count", 0);
prop.put("import-all_source", job.source());
prop.put("import-all_chunkCount", 0);
prop.put("import-all_recordsCount", 0);
prop.put("import-all_speed", 0);
prop.put("import-all_runningHours", 0);
prop.put("import-all_runningMinutes", 0);
prop.put("import-all_remainingHours", 0);
prop.put("import-all_remainingMinutes", 0);
} catch (MalformedURLException e) {
e.printStackTrace();
prop.put("import-all", 0);

@ -28,6 +28,7 @@ package net.yacy.document.importer;
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.TreeSet;
import net.yacy.kelondro.data.meta.DigestURI;
import net.yacy.repository.LoaderDispatcher;
@ -41,21 +42,29 @@ import de.anomic.search.Switchboard;
// http://opus.bsz-bw.de/fhhv/oai2/oai2.php?verb=ListRecords&metadataPrefix=oai_dc
public class OAIPMHImporter extends Thread implements Importer {
public class OAIPMHImporter extends Thread implements Importer, Comparable<OAIPMHImporter> {
public static OAIPMHImporter job; // if started from a servlet, this object is used to store the thread
private static int importerCounter = 0;
public static TreeSet<OAIPMHImporter> startedJobs = new TreeSet<OAIPMHImporter>();
public static TreeSet<OAIPMHImporter> runningJobs = new TreeSet<OAIPMHImporter>();
public static TreeSet<OAIPMHImporter> finishedJobs = new TreeSet<OAIPMHImporter>();
private LoaderDispatcher loader;
private DigestURI source;
private int count;
private long startTime;
private int recordsCount, chunkCount;
private long startTime, finishTime;
private ResumptionToken resumptionToken;
private String message;
private int serialNumber;
public OAIPMHImporter(LoaderDispatcher loader, DigestURI source) {
this.serialNumber = importerCounter++;
this.loader = loader;
this.count = 0;
this.recordsCount = 0;
this.chunkCount = 0;
this.startTime = System.currentTimeMillis();
this.finishTime = 0;
this.resumptionToken = null;
this.message = "import initialized";
// fix start url
@ -67,10 +76,15 @@ public class OAIPMHImporter extends Thread implements Importer {
// this should never happen
e.printStackTrace();
}
startedJobs.add(this);
}
public int count() {
return this.count;
return this.recordsCount;
}
/**
 * Number of chunks processed so far by this importer.
 * The counter starts at 0 and is incremented in run() each time an
 * OAIPMHReader has been created for the current source URL.
 *
 * @return the current chunk counter
 */
public int chunkCount() {
    final int chunks = this.chunkCount;
    return chunks;
}
public String status() {
@ -82,11 +96,11 @@ public class OAIPMHImporter extends Thread implements Importer {
}
public long remainingTime() {
return Long.MAX_VALUE; // we don't know
return (this.isAlive()) ? Long.MAX_VALUE : 0; // we don't know
}
public long runningTime() {
return System.currentTimeMillis() - this.startTime;
return (this.isAlive()) ? System.currentTimeMillis() - this.startTime : this.finishTime - this.startTime;
}
public String source() {
@ -98,10 +112,17 @@ public class OAIPMHImporter extends Thread implements Importer {
}
public void run() {
while (runningJobs.size() > 10) {
try {Thread.sleep(1000 + 1000 * System.currentTimeMillis() % 6);} catch (InterruptedException e) {}
}
startedJobs.remove(this);
runningJobs.add(this);
this.message = "loading first part of records";
while (true) {
try {
OAIPMHReader reader = new OAIPMHReader(this.loader, this.source, Switchboard.getSwitchboard().surrogatesInPath, "oaipmh");
this.chunkCount++;
this.recordsCount += reader.getResumptionToken().getRecordCounter();
this.source = reader.getResumptionToken().resumptionURL(this.source);
if (this.source == null) {
this.message = "import terminated with source = null";
@ -113,5 +134,26 @@ public class OAIPMHImporter extends Thread implements Importer {
break;
}
}
this.finishTime = System.currentTimeMillis();
runningJobs.remove(this);
finishedJobs.add(this);
}
// methods that are needed to put the object into a Hashtable or a Map:
/**
 * Hash code of this importer. It is the serial number that the
 * constructor takes from the static importer counter, i.e. the same
 * value that compareTo() orders by.
 *
 * @return the serial number of this importer
 */
public int hashCode() {
    final int serial = this.serialNumber;
    return serial;
}
/**
 * Two importers are equal when they carry the same serial number, which
 * keeps equals() consistent with compareTo() and hashCode().
 *
 * The parameter is declared as Object so that this method really overrides
 * Object.equals(Object); a signature taking OAIPMHImporter would only be an
 * overload that hash-based collections and Object-typed callers never see.
 *
 * @param o the object to compare with; may be null
 * @return true if o is an OAIPMHImporter with the same serial number,
 *         false otherwise (including for null and for foreign types)
 */
@Override
public boolean equals(Object o) {
    if (this == o) return true;
    // instanceof is false for null, so this also guards against a
    // NullPointerException inside compareTo()
    if (!(o instanceof OAIPMHImporter)) return false;
    return this.compareTo((OAIPMHImporter) o) == 0;
}
// methods that are needed to put the object into a Tree:
/**
 * Orders importers by ascending serial number.
 *
 * @param o the importer to compare with; must not be null
 * @return -1, 0 or 1 if the serial number of this importer is smaller
 *         than, equal to or greater than the serial number of o
 */
public int compareTo(OAIPMHImporter o) {
    final int mine = this.serialNumber;
    final int theirs = o.serialNumber;
    if (mine == theirs) return 0;
    return (mine < theirs) ? -1 : 1;
}
}

@ -56,7 +56,7 @@ public class OAIPMHReader {
// load the file from the net
Response response;
response = loader.load(source, true, true, CrawlProfile.CACHE_STRATEGY_NOCACHE);
response = loader.load(source, false, true, CrawlProfile.CACHE_STRATEGY_NOCACHE);
byte[] b = response.getContent();
this.resumptionToken = new ResumptionToken(new ByteArrayInputStream(b));
String file = filePrefix + "_" + this.source.getHost() + "_" + DateFormatter.formatShortMilliSecond(new Date());

Loading…
Cancel
Save