*) Redesign of db import functionality

- restructuring to allow different import tasks to be controlled via one GUI
   - adding the possibility to import a single assortment file
   - adding the possibility to set the cache size that should be used for the import

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@1504 6c8d7289-2bf4-0310-a012-ef5d649a1542
theli 19 years ago
parent 56936139ae
commit 6a99304b2b
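In outline, the diff below replaces the single plasmaDbImporter thread with a small import framework: a dbImporter interface, an AbstractImporter base thread, a dbImportManager registry attached to the plasmaSwitchboard, and one concrete importer per import type (PLASMA DB and assortment file). As a rough orientation sketch, the reworked IndexImport_p servlet now starts a job along these lines (importType, importPath and cacheSize come from the GUI form):

dbImporter importerThread = switchboard.dbImportManager.getNewImporter(importType);
if (importerThread != null) {
    importerThread.init(new File(importPath), cacheSize);   // source path and import cache size
    importerThread.startIt();                                // runs as a thread in the manager's ThreadGroup
}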

@ -10,7 +10,6 @@
#%env/templates/submenuIndexControl.template%#
<br><br>
<h2>Index DB Import</h2>
<p>The local index currently consists of (at least) #[wcount]# reverse word indexes and #[ucount]# URL references.</p>
<hr>
#(error)#<!-- 0 -->
@ -22,13 +21,40 @@
<p><font color="red"><b>#[error_msg]#</b></font></p>
<p><font color="red"><code>#[error_stackTrace]#</code></font></p>
#(/error)#
<h3>Starting new Job</h3>
<form action="IndexImport_p.html" method="post" enctype="multipart/form-data">
<h3>Starting new Job</h3>
<table>
<tr><td>
<table border="0" cellpadding="2" cellspacing="1">
<tr>
<td title="Path to the PLASMADB directory of the foreign peer">Import Path:</td>
<td><input name="importPath" type="text" size="50" value=""></td>
<td><input type="submit" name="startIndexDbImport" value="Start New Import"></td>
<tr class="TableCellLight">
<td>Import&nbsp;Type:</td>
<td title="the path to the database that should be imported"><select name="importType" size="1">
<option value="plasmaDB">PLASMA DB Import</option>
<option value="assortment">Assortment File Import</option>
</select>
</td>
<td title="the cache size that should be used for the import db">Cache Size</td>
<td><select name="cacheSize" size="1">
<option value="2097152">2 MB</option>
<option value="4194304">4 MB</option>
<option value="8388608" selected="selected">8 MB</option>
<option value="16777216">16 MB</option>
<option value="33554432">32 MB</option>
<option value="67108864">64 MB</option>
<option value="134217728">128 MB</option>
</select>
</td>
<td><a href="#usage">Usage Examples</a></td>
</tr>
<tr class="TableCellLight">
<td title="Path to the PLASMADB directory of the foreign peer">Import&nbsp;Path:</td>
<td colspan="3"><input name="importPath" type="text" size="50" value=""></td>
<td><input type="submit" name="startIndexDbImport" value="Start Import"></td>
</tr>
</table>
</td>
<td><font color="red"><b>Attention:</b><br>Always do a backup of your source and destination database before starting to use this import function.</font>
</td>
</tr>
</table>
</form>
@ -38,15 +64,13 @@
<p>
<table border="0" cellpadding="2" cellspacing="1">
<tr class="TableHeader" valign="bottom">
<td class="small" width="150">Path</td>
<td class="small">Job Type</td>
<td class="small" width="150">Job Name</td>
<td class="small" >Status</td>
<td class="small" >%</td>
<td class="small" >Elapsed<br>Time</td>
<td class="small" >Estimated<br>Time</td>
<td class="small" >Word Hash</td>
<td class="small" ># URLs</td>
<td class="small" ># Word<br>Entities</td>
<td class="small" ># Word<br>Entries</td>
<td class="small" >Import Status</td>
<td class="small" >Abort Import</td>
<td class="small" >Pause Import</td>
</tr>
@ -54,25 +78,23 @@
<form action="IndexImport_p.html" method="post" enctype="multipart/form-data">
<input type="hidden" name="jobNr" value="#[job_nr]#">
<tr class="TableCellLight">
<td class="small">#[path]#</td>
<td class="small"><font color="#(status)#red::green::blue#(/status)#">#(status)#Finished::Running::Paused#(/status)#</font></td>
<td class="small">#[type]#</td>
<td class="small" title="#[fullName]#">#[shortName]#</td>
<td class="small"><font color="#(runningStatus)#red::green::blue#(/runningStatus)#">#(runningStatus)#Finished::Running::Paused#(/runningStatus)#</font></td>
<td class="small" align="right">#[percent]#</td>
<td class="small" align="right">#[elapsed]#</td>
<td class="small" align="right">#[estimated]#</td>
<td class="small" align="right"><tt>#[wordHash]#</tt></td>
<td class="small" align="rigth">#[url_num]#</td>
<td class="small" align="rigth">#[word_entity_num]#</td>
<td class="small" align="rigth">#[word_entry_num]#</td>
<td class="small" align="left"><tt>#[status]#</tt></td>
<td class="small">
#(stopped)#::
<input type="submit" name="stopIndexDbImport" value="Abort Import">
<input type="submit" name="stopIndexDbImport" value="Abort">
#(/stopped)#
</td>
<td class="small">
#(paused)#
<input type="submit" name="pauseIndexDbImport" value="Pause Import">
<input type="submit" name="pauseIndexDbImport" value="Pause">
::
<input type="submit" name="continueIndexDbImport" value="Continue Import">
<input type="submit" name="continueIndexDbImport" value="Continue">
#(/paused)#
</td>
</tr>
@ -87,32 +109,138 @@
<p>
<table border="0" cellpadding="2" cellspacing="1">
<tr class="TableHeader" valign="bottom">
<td class="small">Job Type</td>
<td class="small" width="150">Path</td>
<td class="small" >Status</td>
<td class="small" >%</td>
<td class="small" >Elapsed<br>Time</td>
<td class="small" >Word Hash</td>
<td class="small" ># URLs</td>
<td class="small" ># Word<br>Entities</td>
<td class="small" ># Word<br>Entries</td>
<td class="small" >Import Status</td>
</tr>
#{finished.jobs}#
<tr class="TableCellLight">
<td class="small">#[path]#</td>
<td class="small"><font color="#(status)#red::green::red#(/status)#">#(status)#Finished::<b>Error:</b> #[errorMsg]#::Paused#(/status)#</font></td>
<td class="small">#[type]#</td>
<td class="small" title="#[fullName]#">#[shortName]#</td>
<td class="small"><font color="#(runningStatus)#red::green::red#(/runningStatus)#">#(runningStatus)#Finished::<b>Error:</b> #[errorMsg]#::Paused#(/runningStatus)#</font></td>
<td class="small" align="right">#[percent]#</td>
<td class="small" align="right">#[elapsed]#</td>
<td class="small" align="right"><tt>#[wordHash]#</tt></td>
<td class="small" align="rigth">#[url_num]#</td>
<td class="small" align="rigth">#[word_entity_num]#</td>
<td class="small" align="rigth">#[word_entry_num]#</td>
<td class="small" align="right"><tt>#[status]#</tt></td>
</tr>
#{/finished.jobs}#
</table>
<input type="submit" name="clearFinishedJobList" value="Clear List">
</form>
<p><font size="-3"><i>Last Refresh:</i> #[date]#</font></p>
<hr>
<p><div id="usage"><h2>Usage Examples:</h2></div></p>
<p><h3>Plasma DB Import:</h3></p>
<p>
<b>Example Path:</b> <tt>E:\PLASMADB\</tt><br>
</p>
<p>
<b>Requirements:</b><br>
You need to have at least the following directories and files in this path:
<table border="1" cellpadding="2" cellspacing="1">
<tr style="background-color: lightgrey">
<td>Name</td>
<td>Type</td>
<td>Writeable</td>
<td>Description</td>
</tr>
<tr>
<td><tt>urlHash.db</tt></td>
<td>File</td>
<td>No</td>
<td>The LoadedURL Database containing all loaded and indexed URLs</td>
</tr>
<tr>
<td><tt>ACLUSTER</tt></td>
<td>Directory</td>
<td>Yes</td>
<td>The assortment directory containing parts of the word index.</td>
</tr>
<tr>
<td><tt>WORDS</tt></td>
<td>Directory</td>
<td>Yes</td>
<td>The words directory containing parts of the word index.</td>
</tr>
</table>
</p>
<p><h3>Assortment Import:</h3></p>
<p>
<b>Example Path:</b> <tt>E:\PLASMADB\ACLUSTER\indexAssortment001.db</tt>
</p>
<p>
<b>Requirements:</b><br>
The specified import path must point to the following file:
<table border="1" cellpadding="2" cellspacing="1">
<tr style="background-color: lightgrey">
<td>Name</td>
<td>Type</td>
<td>Writeable</td>
<td>Description</td>
</tr>
<tr>
<td><tt>indexAssortment001.db</tt></td>
<td>File</td>
<td>No</td>
<td>The assortment file that should be imported.<br>
<b>Attention:</b> The assortment file must have the postfix "[0-9]{3}\.db".
If you would like to import an assortment file from the <tt>PLASMADB\ACLUSTER\ABKP</tt> directory, you need to rename it to match this pattern first.</td>
</tr>
</table>
</p>
<p>
<b>Notes:</b><br>
Please note that the imported words are useless if the destination peer doesn't know
the URLs the imported words belong to.
</p>
<!--
<p><h3>Crawling Queue Import:</h3></p>
<p>
<b>Example Path:</b> <tt>E:\PLASMADB\</tt>
</p>
<b>Requirements:</b><br>
You need to have at least the following directories and files in this path:
<table border="1" cellpadding="2" cellspacing="1">
<tr style="background-color: lightgrey">
<td>Name</td>
<td>Type</td>
<td>Writeable</td>
<td>Description</td>
</tr>
<tr>
<td><tt>crawlProfiles0.db</tt></td>
<td>File</td>
<td>No</td>
<td>Contains data about the crawl job a URL belongs to</td>
</tr>
<tr>
<td><tt>urlNotice1.db</tt></td>
<td>File</td>
<td>Yes</td>
<td>The crawling queue</td>
</tr>
<tr>
<td><tt>urlNoticeImage0.stack</tt></td>
<td rowspan="8">File</td>
<td rowspan="8">Yes</td>
<td rowspan="8">Various stack files that belong to the crawling queue</td>
</tr>
<tr><td><tt>urlNoticeImage0.stack</tt></td></tr>
<tr><td><tt>urlNoticeLimit0.stack</tt></td></tr>
<tr><td><tt>urlNoticeLocal0.stack</tt></td></tr>
<tr><td><tt>urlNoticeMovie0.stack</tt></td></tr>
<tr><td><tt>urlNoticeMusic0.stack</tt></td></tr>
<tr><td><tt>urlNoticeOverhang0.stack</tt></td></tr>
<tr><td><tt>urlNoticeRemote0.stack</tt></td></tr>
</table>
</p>
-->
#%env/templates/footer.template%#
</body>
</html>
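The #[name]#, #(name)#...::...#(/name)# and #{name}#...#{/name}# markers in the template above are YaCy's usual servlet placeholders: plain value substitution, an alternative selected by an integer, and a loop over numbered properties. A schematic sketch of how the servlet below fills the running-jobs table (property names taken from the code that follows; i and activeCount are its loop variables, the concrete values here are only illustrative):

prop.put("running.jobs_" + i + "_percent", "42");        // fills #[percent]# in row i
prop.put("running.jobs_" + i + "_runningStatus", 2);     // selects the third #(runningStatus)# alternative, "Paused"
prop.put("running.jobs", activeCount);                   // number of #{running.jobs}# rows to render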

@ -54,8 +54,10 @@ import java.util.Date;
import java.util.Vector;
import de.anomic.http.httpHeader;
import de.anomic.plasma.plasmaDbImporter;
import de.anomic.plasma.plasmaSwitchboard;
import de.anomic.plasma.dbImport.dbImportManager;
import de.anomic.plasma.dbImport.dbImporter;
import de.anomic.plasma.dbImport.plasmaDbImporter;
import de.anomic.server.serverByteBuffer;
import de.anomic.server.serverDate;
import de.anomic.server.serverObjects;
@ -75,24 +77,33 @@ public final class IndexImport_p {
try {
// getting the import path
String importPath = (String) post.get("importPath");
String importType = (String) post.get("importType");
String cacheSizeStr = (String) post.get("cacheSize");
int cacheSize = 8*1024*1024;
try {
cacheSize = Integer.valueOf(cacheSizeStr).intValue();
} catch (NumberFormatException e) {}
boolean startImport = true;
// check if there is an already running thread with the same import path
Thread[] importThreads = new Thread[plasmaDbImporter.runningJobs.activeCount()*2];
activeCount = plasmaDbImporter.runningJobs.enumerate(importThreads);
for (int i=0; i < activeCount; i++) {
plasmaDbImporter currThread = (plasmaDbImporter) importThreads[i];
if (currThread.getImportRoot().equals(new File(importPath))) {
prop.put("error",2);
startImport = false;
}
}
// // check if there is an already running thread with the same import path
// Thread[] importThreads = new Thread[plasmaDbImporter.runningJobs.activeCount()*2];
// activeCount = plasmaDbImporter.runningJobs.enumerate(importThreads);
//
// for (int i=0; i < activeCount; i++) {
// plasmaDbImporter currThread = (plasmaDbImporter) importThreads[i];
// if (currThread.getJobName().equals(new File(importPath))) {
// prop.put("error",2);
// startImport = false;
// }
// }
//
if (startImport) {
plasmaDbImporter newImporter = new plasmaDbImporter(switchboard.wordIndex,switchboard.urlPool.loadedURL,importPath);
newImporter.start();
dbImporter importerThread = switchboard.dbImportManager.getNewImporter(importType);
if (importerThread != null) {
importerThread.init(new File(importPath),cacheSize);
importerThread.startIt();
}
prop.put("LOCATION","");
return prop;
}
@ -108,7 +119,7 @@ public final class IndexImport_p {
errorOut.close();
}
} else if (post.containsKey("clearFinishedJobList")) {
plasmaDbImporter.finishedJobs.clear();
switchboard.dbImportManager.finishedJobs.clear();
prop.put("LOCATION","");
return prop;
} else if (
@ -117,25 +128,22 @@ public final class IndexImport_p {
(post.containsKey("continueIndexDbImport"))
) {
// getting the job nr of the thread
String jobNr = (String) post.get("jobNr");
Thread[] importThreads = new Thread[plasmaDbImporter.runningJobs.activeCount()*2];
activeCount = plasmaDbImporter.runningJobs.enumerate(importThreads);
for (int i=0; i < activeCount; i++) {
plasmaDbImporter currThread = (plasmaDbImporter) importThreads[i];
if (currThread.getJobNr() == Integer.valueOf(jobNr).intValue()) {
if (post.containsKey("stopIndexDbImport")) {
currThread.stoppIt();
try { currThread.join(); } catch (InterruptedException e) {e.printStackTrace();}
} else if (post.containsKey("pauseIndexDbImport")) {
currThread.pauseIt();
} else if (post.containsKey("continueIndexDbImport")) {
currThread.continueIt();
}
break;
}
}
String jobID = (String) post.get("jobNr");
dbImporter importer = switchboard.dbImportManager.getImporterByID(Integer.valueOf(jobID).intValue());
if (importer != null) {
if (post.containsKey("stopIndexDbImport")) {
try {
importer.stopIt();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} else if (post.containsKey("pauseIndexDbImport")) {
importer.pauseIt();
} else if (post.containsKey("continueIndexDbImport")) {
importer.continueIt();
}
}
prop.put("LOCATION","");
return prop;
}
@ -147,60 +155,64 @@ public final class IndexImport_p {
/*
* Loop over all currently running jobs
*/
Thread[] importThreads = new Thread[plasmaDbImporter.runningJobs.activeCount()*2];
activeCount = plasmaDbImporter.runningJobs.enumerate(importThreads);
dbImporter[] importThreads = switchboard.dbImportManager.getRunningImporter();
activeCount = importThreads.length;
for (int i=0; i < activeCount; i++) {
plasmaDbImporter currThread = (plasmaDbImporter) importThreads[i];
dbImporter currThread = importThreads[i];
// get import type
prop.put("running.jobs_" + i + "_type", currThread.getJobType());
// root path of the source db
prop.put("running.jobs_" + i + "_path", currThread.getImportRoot().toString());
String fullName = currThread.getJobName().toString();
String shortName = (fullName.length()>30)?fullName.substring(0,12) + "..." + fullName.substring(fullName.length()-22,fullName.length()):fullName;
prop.put("running.jobs_" + i + "_fullName",fullName);
prop.put("running.jobs_" + i + "_shortName",shortName);
// specifies if the importer is still running
prop.put("running.jobs_" + i + "_stopped", currThread.isAlive() ? 1:0);
prop.put("running.jobs_" + i + "_stopped", currThread.isStopped() ? 1:0);
// specifies if the importer was paused
prop.put("running.jobs_" + i + "_paused", currThread.isPaused() ? 1:0);
// setting the status
prop.put("running.jobs_" + i + "_status", currThread.isPaused() ? 2 : currThread.isAlive() ? 1 : 0);
prop.put("running.jobs_" + i + "_runningStatus", currThread.isPaused() ? 2 : currThread.isStopped() ? 1 : 0);
// other information
prop.put("running.jobs_" + i + "_percent", Integer.toString(currThread.getProcessingStatus()));
prop.put("running.jobs_" + i + "_percent", Integer.toString(currThread.getProcessingStatusPercent()));
prop.put("running.jobs_" + i + "_elapsed", serverDate.intervalToString(currThread.getElapsedTime()));
prop.put("running.jobs_" + i + "_estimated", serverDate.intervalToString(currThread.getEstimatedTime()));
prop.put("running.jobs_" + i + "_wordHash", currThread.getCurrentWordhash());
prop.put("running.jobs_" + i + "_url_num", Long.toString(currThread.getUrlCounter()));
prop.put("running.jobs_" + i + "_word_entity_num", Long.toString(currThread.getWordEntityCounter()));
prop.put("running.jobs_" + i + "_word_entry_num", Long.toString(currThread.getWordEntryCounter()));
prop.put("running.jobs_" + i + "_status", currThread.getStatus().replace("\n", "<br>"));
// job number of the importer thread
prop.put("running.jobs_" + i + "_job_nr", Integer.toString(currThread.getJobNr()));
prop.put("running.jobs_" + i + "_job_nr", Integer.toString(currThread.getJobID()));
}
prop.put("running.jobs",activeCount);
/*
* Loop over all finished jobs
*/
Vector finishedJobs = (Vector) plasmaDbImporter.finishedJobs.clone();
for (int i=0; i<finishedJobs.size(); i++) {
plasmaDbImporter currThread = (plasmaDbImporter) finishedJobs.get(i);
dbImporter[] finishedJobs = switchboard.dbImportManager.getFinishedImporter();
for (int i=0; i<finishedJobs.length; i++) {
dbImporter currThread = finishedJobs[i];
String error = currThread.getError();
prop.put("finished.jobs_" + i + "_path", currThread.getImportRoot().toString());
String fullName = currThread.getJobName().toString();
String shortName = (fullName.length()>30)?fullName.substring(0,12) + "..." + fullName.substring(fullName.length()-22,fullName.length()):fullName;
prop.put("finished.jobs_" + i + "_type", currThread.getJobType());
prop.put("finished.jobs_" + i + "_fullName", fullName);
prop.put("finished.jobs_" + i + "_shortName", shortName);
if (error != null) {
prop.put("finished.jobs_" + i + "_status", 2);
prop.put("finished.jobs_" + i + "_status_errorMsg", error);
prop.put("finished.jobs_" + i + "_runningStatus", 2);
prop.put("finished.jobs_" + i + "_runningStatus_errorMsg", error);
} else {
prop.put("finished.jobs_" + i + "_status", 0);
prop.put("finished.jobs_" + i + "_runningStatus", 0);
}
prop.put("finished.jobs_" + i + "_percent", Integer.toString(currThread.getProcessingStatus()));
prop.put("finished.jobs_" + i + "_percent", Integer.toString(currThread.getProcessingStatusPercent()));
prop.put("finished.jobs_" + i + "_elapsed", serverDate.intervalToString(currThread.getElapsedTime()));
prop.put("finished.jobs_" + i + "_wordHash", currThread.getCurrentWordhash());
prop.put("finished.jobs_" + i + "_url_num", Long.toString(currThread.getUrlCounter()));
prop.put("finished.jobs_" + i + "_word_entity_num", Long.toString(currThread.getWordEntityCounter()));
prop.put("finished.jobs_" + i + "_word_entry_num", Long.toString(currThread.getWordEntryCounter()));
prop.put("finished.jobs_" + i + "_status", currThread.getStatus().replace("\n", "<br>"));
}
prop.put("finished.jobs",finishedJobs.size());
prop.put("finished.jobs",finishedJobs.length);
prop.put("date",(new Date()).toString());
return prop;

@ -0,0 +1,113 @@
package de.anomic.plasma.dbImport;
import java.io.File;
import de.anomic.plasma.plasmaSwitchboard;
import de.anomic.server.logging.serverLog;
public abstract class AbstractImporter extends Thread implements dbImporter{
protected int jobID;
protected String jobType;
protected serverLog log;
protected boolean stopped = false;
protected boolean paused = false;
protected plasmaSwitchboard sb;
protected File importPath;
protected int cacheSize;
protected long globalStart = System.currentTimeMillis();
protected long globalEnd;
protected String error;
public AbstractImporter(plasmaSwitchboard theSb) {
super(theSb.dbImportManager.runningJobs,"");
this.sb = theSb;
}
public String getError() {
return this.error;
}
public void init(File theImportPath) {
this.importPath = theImportPath;
this.jobID = this.sb.dbImportManager.getJobID();
this.log = new serverLog("IMPORT_" + this.jobType + "_" + this.jobID);
this.setName("IMPORT_" + this.jobType + "_" + this.sb.dbImportManager.getJobID());
}
public void startIt() {
this.start();
}
public void stopIt() throws InterruptedException {
this.stopped = true;
this.continueIt();
this.join();
}
public void pauseIt() {
synchronized(this) {
this.paused = true;
}
}
public void continueIt() {
synchronized(this) {
if (this.paused) {
this.paused = false;
this.notifyAll();
}
}
}
public boolean isPaused() {
synchronized(this) {
return this.paused;
}
}
protected boolean isAborted() {
synchronized(this) {
if (this.paused) {
try {
this.wait();
}
catch (InterruptedException e){}
}
}
return (this.stopped) || Thread.currentThread().isInterrupted();
}
public boolean isStopped() {
return this.isAlive();
}
public int getJobID() {
return this.jobID;
}
public long getTotalRuntime() {
return (this.globalEnd == 0)?System.currentTimeMillis()-this.globalStart:this.globalEnd-this.globalStart;
}
public long getElapsedTime() {
return System.currentTimeMillis()-this.globalStart;
}
public String getJobType() {
return this.jobType;
}
public File getImportPath() {
return this.importPath;
}
public abstract long getEstimatedTime();
public abstract String getJobName();
public abstract int getProcessingStatusPercent();
}
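Concrete importers extend this class, set jobType in their constructor, and poll isAborted() from their run() loop so that pauseIt(), continueIt() and stopIt() take effect (the assortment importer further down follows this pattern). A minimal sketch of such a subclass; exampleImporter is a hypothetical name used only for illustration:

package de.anomic.plasma.dbImport;
import java.io.File;
import de.anomic.plasma.plasmaSwitchboard;
public class exampleImporter extends AbstractImporter implements dbImporter {
    public exampleImporter(plasmaSwitchboard theSb) {
        super(theSb);
        this.jobType = "EXAMPLE";                     // hypothetical job type
    }
    public void init(File theImportPath, int theCacheSize) {
        super.init(theImportPath);                    // assigns jobID, log and thread name
        this.cacheSize = theCacheSize;
    }
    public String getJobName() { return this.importPath.toString(); }
    public String getStatus() { return ""; }
    public long getEstimatedTime() { return 0; }
    public int getProcessingStatusPercent() { return 0; }
    public void run() {
        try {
            while (!isAborted()) {                    // blocks while paused, returns true once stopped
                // a real importer would process the next chunk of its source here
                break;                                // sketch only: nothing to import
            }
        } catch (Exception e) {
            this.error = e.toString();
        } finally {
            this.globalEnd = System.currentTimeMillis();
            this.sb.dbImportManager.finishedJobs.add(this);
        }
    }
}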

@ -0,0 +1,115 @@
package de.anomic.plasma.dbImport;
import java.util.Vector;
import de.anomic.plasma.plasmaSwitchboard;
import de.anomic.server.logging.serverLog;
public class dbImportManager {
public final Vector finishedJobs = new Vector();
public final ThreadGroup runningJobs = new ThreadGroup("ImporterThreads");
public int currMaxJobNr = 0;
private plasmaSwitchboard sb;
public dbImportManager(plasmaSwitchboard theSb) {
this.sb = theSb;
}
public int getJobID() {
int jobID;
synchronized(runningJobs) {
jobID = currMaxJobNr;
currMaxJobNr++;
}
return jobID;
}
public dbImporter[] getRunningImporter() {
Thread[] importThreads = new Thread[runningJobs.activeCount()*2];
int activeCount = runningJobs.enumerate(importThreads);
dbImporter[] importers = new dbImporter[activeCount];
for (int i=0; i<activeCount; i++) {
importers[i] = (dbImporter) importThreads[i];
}
return importers;
}
public dbImporter[] getFinishedImporter() {
return (dbImporter[]) finishedJobs.toArray(new dbImporter[finishedJobs.size()]);
}
public dbImporter getImporterByID(int jobID) {
Thread[] importThreads = new Thread[this.runningJobs.activeCount()*2];
int activeCount = this.runningJobs.enumerate(importThreads);
for (int i=0; i < activeCount; i++) {
dbImporter currThread = (dbImporter) importThreads[i];
if (currThread.getJobID() == Integer.valueOf(jobID).intValue()) {
return currThread;
}
}
return null;
}
public dbImporter getNewImporter(String type) {
if (type == null) return null;
if (type.length() == 0) return null;
dbImporter newImporter = null;
if (type.equals("plasmaDB")) {
newImporter = new plasmaDbImporter(this.sb);
} else if (type.equalsIgnoreCase("ASSORTMENT")) {
newImporter = new plasmaWordIndexAssortmentImporter(this.sb);
}
return newImporter;
}
/**
* Can be used to close all still running importer threads
* e.g. on server shutdown
*/
public void close() {
/* waiting for all threads to finish */
int threadCount = runningJobs.activeCount();
Thread[] threadList = new Thread[threadCount];
threadCount = runningJobs.enumerate(threadList);
if (threadCount == 0) return;
serverLog log = new serverLog("DB-IMPORT");
try {
// trying to gracefully stop all still running sessions ...
log.logInfo("Signaling shutdown to " + threadCount + " remaining dbImporter threads ...");
for ( int currentThreadIdx = 0; currentThreadIdx < threadCount; currentThreadIdx++ ) {
Thread currentThread = threadList[currentThreadIdx];
if (currentThread.isAlive()) {
((plasmaDbImporter)currentThread).stopIt();
}
}
// waiting a few ms for the session objects to continue processing
try { Thread.sleep(500); } catch (InterruptedException ex) {}
// interrupting all still running or pooled threads ...
log.logInfo("Sending interruption signal to " + runningJobs.activeCount() + " remaining dbImporter threads ...");
runningJobs.interrupt();
// we need to use a timeout here because of missing interruptible session threads ...
log.logFine("Waiting for " + runningJobs.activeCount() + " remaining dbImporter threads to finish shutdown ...");
for ( int currentThreadIdx = 0; currentThreadIdx < threadCount; currentThreadIdx++ ) {
Thread currentThread = threadList[currentThreadIdx];
if (currentThread.isAlive()) {
log.logFine("Waiting for dbImporter thread '" + currentThread.getName() + "' [" + currentThreadIdx + "] to finish shutdown.");
try { currentThread.join(500); } catch (InterruptedException ex) {}
}
}
log.logInfo("Shutdown of remaining dbImporter threads finished.");
} catch (Exception e) {
log.logSevere("Unexpected error while trying to shutdown all remaining dbImporter threads.",e);
}
}
}

@ -0,0 +1,31 @@
package de.anomic.plasma.dbImport;
import java.io.File;
import de.anomic.plasma.plasmaSwitchboard;
public interface dbImporter {
// functions to pause and continue importing
public boolean isPaused();
public void pauseIt();
public void continueIt();
public void stopIt() throws InterruptedException;
public boolean isStopped();
// getting status information
public long getTotalRuntime();
public long getElapsedTime();
public long getEstimatedTime();
public int getProcessingStatusPercent();
public int getJobID();
public String getJobName();
public String getJobType();
public File getImportPath();
public String getError();
public String getStatus();
public void init(File importPath, int cacheSize);
public void startIt();
}
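A running job is addressed through this interface by its job ID, as the servlet's abort/pause/continue branch above shows. A minimal sketch, assuming a plasmaSwitchboard named switchboard and an int jobID taken from the GUI form:

dbImporter importer = switchboard.dbImportManager.getImporterByID(jobID);
if (importer != null) {
    importer.pauseIt();              // the worker blocks inside AbstractImporter.isAborted()
    importer.continueIt();           // notifyAll() wakes it up again
    try {
        importer.stopIt();           // sets the stop flag, resumes a paused worker and joins the thread
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}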

@ -1,186 +1,81 @@
package de.anomic.plasma;
package de.anomic.plasma.dbImport;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.Vector;
import de.anomic.plasma.plasmaCrawlLURL;
import de.anomic.plasma.plasmaSwitchboard;
import de.anomic.plasma.plasmaWordIndex;
import de.anomic.plasma.plasmaWordIndexEntry;
import de.anomic.plasma.plasmaWordIndexEntryContainer;
import de.anomic.server.serverDate;
import de.anomic.server.logging.serverLog;
public class plasmaDbImporter extends Thread {
public class plasmaDbImporter extends AbstractImporter implements dbImporter {
public static final Vector finishedJobs = new Vector();
public static final ThreadGroup runningJobs = new ThreadGroup("DbImport");
public static int currMaxJobNr = 0;
private plasmaCrawlLURL homeUrlDB;
private plasmaWordIndex homeWordIndex;
private final int jobNr;
private final plasmaCrawlLURL homeUrlDB;
private final plasmaWordIndex homeWordIndex;
private plasmaCrawlLURL importUrlDB;
private plasmaWordIndex importWordIndex;
private int importStartSize;
private final plasmaCrawlLURL importUrlDB;
private final plasmaWordIndex importWordIndex;
//private final String importPath;
private final File importRoot;
private final int importStartSize;
private final serverLog log;
private boolean stopped = false;
private boolean paused = false;
private String wordHash = "------------";
long wordChunkStart = System.currentTimeMillis(), wordChunkEnd = this.wordChunkStart;
String wordChunkStartHash = "------------", wordChunkEndHash;
private long urlCounter = 0, wordCounter = 0, entryCounter = 0;
private long globalStart = System.currentTimeMillis();
private long globalEnd;
private String error;
public void stoppIt() {
this.stopped = true;
this.continueIt();
}
public void pauseIt() {
synchronized(this) {
this.paused = true;
}
}
public void continueIt() {
synchronized(this) {
if (this.paused) {
this.paused = false;
this.notifyAll();
}
}
public plasmaDbImporter(plasmaSwitchboard sb) {
super(sb);
this.jobType = "PLASMADB";
}
public boolean isPaused() {
synchronized(this) {
return this.paused;
}
public String getJobName() {
return this.importPath.toString();
}
/**
* Can be used to close all still running importer threads
* e.g. on server shutdown
*/
public static void close() {
/* waiting for all threads to finish */
int threadCount = runningJobs.activeCount();
Thread[] threadList = new Thread[threadCount];
threadCount = plasmaDbImporter.runningJobs.enumerate(threadList);
public String getStatus() {
StringBuffer theStatus = new StringBuffer();
if (threadCount == 0) return;
theStatus.append("Hash=").append(this.wordHash).append("\n");
theStatus.append("#URL=").append(this.urlCounter).append("\n");
theStatus.append("#Word Entities=").append(this.wordCounter).append("\n");
theStatus.append("#Word Entries=").append(this.entryCounter);
serverLog log = new serverLog("DB-IMPORT");
try {
// trying to gracefully stop all still running sessions ...
log.logInfo("Signaling shutdown to " + threadCount + " remaining dbImporter threads ...");
for ( int currentThreadIdx = 0; currentThreadIdx < threadCount; currentThreadIdx++ ) {
Thread currentThread = threadList[currentThreadIdx];
if (currentThread.isAlive()) {
((plasmaDbImporter)currentThread).stoppIt();
}
}
// waiting a few ms for the session objects to continue processing
try { Thread.sleep(500); } catch (InterruptedException ex) {}
// interrupting all still running or pooled threads ...
log.logInfo("Sending interruption signal to " + runningJobs.activeCount() + " remaining dbImporter threads ...");
plasmaDbImporter.runningJobs.interrupt();
// we need to use a timeout here because of missing interruptible session threads ...
log.logFine("Waiting for " + runningJobs.activeCount() + " remaining dbImporter threads to finish shutdown ...");
for ( int currentThreadIdx = 0; currentThreadIdx < threadCount; currentThreadIdx++ ) {
Thread currentThread = threadList[currentThreadIdx];
if (currentThread.isAlive()) {
log.logFine("Waiting for dbImporter thread '" + currentThread.getName() + "' [" + currentThreadIdx + "] to finish shutdown.");
try { currentThread.join(500); } catch (InterruptedException ex) {}
}
}
log.logInfo("Shutdown of remaining dbImporter threads finished.");
} catch (Exception e) {
log.logSevere("Unexpected error while trying to shutdown all remaining dbImporter threads.",e);
}
}
public String getError() {
return this.error;
}
public int getJobNr() {
return this.jobNr;
}
public String getCurrentWordhash() {
return this.wordHash;
}
public long getUrlCounter() {
return this.urlCounter;
}
public long getWordEntityCounter() {
return this.wordCounter;
}
public long getWordEntryCounter() {
return this.entryCounter;
return theStatus.toString();
}
public File getImportRoot() {
return this.importRoot;
}
public int getImportWordDbSize() {
return this.importWordIndex.size();
}
public plasmaDbImporter(plasmaWordIndex theHomeIndexDB, plasmaCrawlLURL theHomeUrlDB, String theImportPath) {
super(runningJobs,"DB-Import_" + theImportPath);
this.log = new serverLog("DB-IMPORT");
synchronized(runningJobs) {
this.jobNr = currMaxJobNr;
currMaxJobNr++;
}
if (theImportPath == null) throw new NullPointerException();
//this.importPath = theImportPath;
this.importRoot = new File(theImportPath);
if (theHomeIndexDB == null) throw new NullPointerException();
this.homeWordIndex = theHomeIndexDB;
if (theHomeUrlDB == null) throw new NullPointerException();
this.homeUrlDB = theHomeUrlDB;
public void init(File theImportPath, int cacheSize) {
super.init(theImportPath);
this.homeWordIndex = this.sb.wordIndex;
this.homeUrlDB = this.sb.urlPool.loadedURL;
this.cacheSize = cacheSize;
if (this.cacheSize < 2*1024*1024) this.cacheSize = 8*1024*1024;
if (this.homeWordIndex.getRoot().equals(this.importRoot)) {
if (this.homeWordIndex.getRoot().equals(this.importPath)) {
throw new IllegalArgumentException("Import and home DB directory must not be equal");
}
// configure import DB
String errorMsg = null;
if (!this.importRoot.exists()) errorMsg = "Import directory does not exist.";
if (!this.importRoot.canRead()) errorMsg = "Import directory is not readable.";
if (!this.importRoot.canWrite()) errorMsg = "Import directory is not writeable";
if (!this.importRoot.isDirectory()) errorMsg = "ImportDirectory is not a directory.";
if (!this.importPath.exists()) errorMsg = "Import directory does not exist.";
if (!this.importPath.canRead()) errorMsg = "Import directory is not readable.";
if (!this.importPath.canWrite()) errorMsg = "Import directory is not writeable";
if (!this.importPath.isDirectory()) errorMsg = "ImportDirectory is not a directory.";
if (errorMsg != null) {
this.log.logSevere(errorMsg + "\nName: " + this.importRoot.getAbsolutePath());
this.log.logSevere(errorMsg + "\nName: " + this.importPath.getAbsolutePath());
throw new IllegalArgumentException(errorMsg);
}
this.log.logFine("Initializing source word index db.");
this.importWordIndex = new plasmaWordIndex(this.importRoot, 8*1024*1024, this.log);
this.importWordIndex = new plasmaWordIndex(this.importPath, this.cacheSize/2, this.log);
this.log.logFine("Initializing import URL db.");
this.importUrlDB = new plasmaCrawlLURL(new File(this.importRoot, "urlHash.db"), 4*1024*1024);
this.importUrlDB = new plasmaCrawlLURL(new File(this.importPath, "urlHash.db"), this.cacheSize/2);
this.importStartSize = this.importWordIndex.size();
}
@ -189,24 +84,19 @@ public class plasmaDbImporter extends Thread {
importWordsDB();
} finally {
this.globalEnd = System.currentTimeMillis();
finishedJobs.add(this);
this.sb.dbImportManager.finishedJobs.add(this);
}
}
public long getTotalRuntime() {
return (this.globalEnd == 0)?System.currentTimeMillis()-this.globalStart:this.globalEnd-this.globalStart;
}
public int getProcessingStatus() {
public int getProcessingStatusPercent() {
// this seems to be better:
// (this.importStartSize-this.importWordIndex.size())*100/((this.importStartSize==0)?1:this.importStartSize);
// but maxint (2,147,483,647) could be exceeded when WordIndexes reach 20M entries
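// an overflow-safe variant of that formula (illustrative sketch only) would widen to long before multiplying:
// return (int) (((long) (this.importStartSize - this.importWordIndex.size()) * 100L)
//               / ((this.importStartSize == 0) ? 1L : (long) this.importStartSize));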
return (this.importStartSize-this.importWordIndex.size())/((this.importStartSize<100)?1:(this.importStartSize)/100);
}
public long getElapsedTime() {
return System.currentTimeMillis()-this.globalStart;
}
public long getEstimatedTime() {
return (this.wordCounter==0)?0:this.importWordIndex.size()*((System.currentTimeMillis()-this.globalStart)/this.wordCounter);
@ -216,7 +106,7 @@ public class plasmaDbImporter extends Thread {
this.log.logInfo("STARTING DB-IMPORT");
try {
this.log.logInfo("Importing DB from '" + this.importRoot.getAbsolutePath() + "' to '" + this.homeWordIndex.getRoot().getAbsolutePath() + "'.");
this.log.logInfo("Importing DB from '" + this.importPath.getAbsolutePath() + "' to '" + this.homeWordIndex.getRoot().getAbsolutePath() + "'.");
this.log.logInfo("Home word index contains " + this.homeWordIndex.size() + " words and " + this.homeUrlDB.size() + " URLs.");
this.log.logInfo("Import word index contains " + this.importWordIndex.size() + " words and " + this.importUrlDB.size() + " URLs.");
@ -277,7 +167,7 @@ public class plasmaDbImporter extends Thread {
long duration = wordChunkEnd - wordChunkStart;
log.logInfo(wordCounter + " word entities imported " +
"[" + wordChunkStartHash + " .. " + wordChunkEndHash + "] " +
this.getProcessingStatus() + "%\n" +
this.getProcessingStatusPercent() + "%\n" +
"Speed: "+ 500*1000/duration + " word entities/s" +
" | Elapsed time: " + serverDate.intervalToString(getElapsedTime()) +
" | Estimated time: " + serverDate.intervalToString(getEstimatedTime()) + "\n" +
@ -307,17 +197,6 @@ public class plasmaDbImporter extends Thread {
}
}
private boolean isAborted() {
synchronized(this) {
if (this.paused) {
try {
this.wait();
}
catch (InterruptedException e){}
}
}
return (this.stopped) || Thread.currentThread().isInterrupted();
}
}

@ -0,0 +1,122 @@
package de.anomic.plasma.dbImport;
import java.io.File;
import java.util.Iterator;
import de.anomic.plasma.plasmaSwitchboard;
import de.anomic.plasma.plasmaWordIndex;
import de.anomic.plasma.plasmaWordIndexAssortment;
import de.anomic.plasma.plasmaWordIndexEntryContainer;
public class plasmaWordIndexAssortmentImporter extends AbstractImporter implements dbImporter{
private int importStartSize;
private int wordEntityCount = 0;
private int wordEntryCount = 0;
private File importAssortmentFile;
private plasmaWordIndexAssortment assortmentFile;
public plasmaWordIndexAssortmentImporter(plasmaSwitchboard sb) {
super(sb);
this.jobType = "ASSORTMENT";
}
public void init(File importAssortmentFile, int cacheSize) {
super.init(importAssortmentFile);
this.importAssortmentFile = importAssortmentFile;
this.cacheSize = cacheSize;
if (this.cacheSize < 2*1024*1024) this.cacheSize = 8*1024*1024;
String errorMsg = null;
if (!importAssortmentFile.getName().matches("indexAssortment0[0-6][0-9]\\.db")) errorMsg = "AssortmentFile '" + importAssortmentFile + "' has an invalid name.";
if (!importAssortmentFile.exists()) errorMsg = "AssortmentFile '" + importAssortmentFile + "' does not exist.";
else if (importAssortmentFile.isDirectory()) errorMsg = "AssortmentFile '" + importAssortmentFile + "' is a directory.";
else if (!importAssortmentFile.canRead()) errorMsg = "AssortmentFile '" + importAssortmentFile + "' is not readable.";
else if (!importAssortmentFile.canWrite()) errorMsg = "AssortmentFile '" + importAssortmentFile + "' is not writeable.";
File importAssortmentPath = null;
int assortmentNr = -1;
try {
importAssortmentPath = new File(importAssortmentFile.getParent());
assortmentNr = Integer.valueOf(importAssortmentFile.getName().substring("indexAssortment".length(),"indexAssortment".length()+3)).intValue();
if (assortmentNr <1 || assortmentNr > 64) {
errorMsg = "AssortmentFile '" + importAssortmentFile + "' has an invalid name.";
}
} catch (NumberFormatException e) {
errorMsg = "Unable to parse the assortment file number.";
}
if (errorMsg != null) {
this.log.logSevere(errorMsg);
throw new IllegalStateException(errorMsg);
}
this.log.logInfo("Initializing source assortment file");
this.assortmentFile = new plasmaWordIndexAssortment(importAssortmentPath,assortmentNr,8*1024*1024, this.log);
this.importStartSize = this.assortmentFile.size();
}
public long getEstimatedTime() {
return (this.wordEntityCount==0)?0:this.assortmentFile.size()*((System.currentTimeMillis()-this.globalStart)/this.wordEntityCount);
}
public String getJobName() {
return this.getImportPath().toString();
}
public int getProcessingStatusPercent() {
return (this.wordEntityCount)/((this.importStartSize<100)?1:(this.importStartSize)/100);
}
public String getStatus() {
StringBuffer theStatus = new StringBuffer();
theStatus.append("#Word Entities=").append(this.wordEntityCount).append("\n");
theStatus.append("#Word Entries=").append(this.wordEntryCount);
return theStatus.toString();
}
public void run() {
try {
Iterator contentIter = this.assortmentFile.content();
while (contentIter.hasNext()) {
this.wordEntityCount++;
byte[][] row = (byte[][]) contentIter.next();
String hash = new String(row[0]);
plasmaWordIndexEntryContainer container;
try {
container = this.assortmentFile.row2container(hash, row);
} catch (NullPointerException e) {
this.log.logWarning("NullpointerException detected in row with hash '" + hash + "'.");
if (this.wordEntityCount < this.importStartSize) continue;
return;
}
this.wordEntryCount += container.size();
// importing entity container to home db
this.sb.wordIndex.addEntries(container, true);
if (this.wordEntityCount % 500 == 0) {
this.log.logFine(this.wordEntityCount + " word entities processed so far.");
}
if (this.wordEntryCount % 2000 == 0) {
this.log.logFine(this.wordEntryCount + " word entries processed so far.");
}
if (isAborted()) break;
}
} catch (Exception e) {
this.error = e.toString();
this.log.logSevere("Error detected",e);
} finally {
this.globalEnd = System.currentTimeMillis();
this.sb.dbImportManager.finishedJobs.add(this);
this.assortmentFile.close();
}
}
}

@ -132,6 +132,7 @@ import de.anomic.kelondro.kelondroException;
import de.anomic.kelondro.kelondroMSetTools;
import de.anomic.kelondro.kelondroNaturalOrder;
import de.anomic.kelondro.kelondroTables;
import de.anomic.plasma.dbImport.dbImportManager;
import de.anomic.server.serverAbstractSwitch;
import de.anomic.server.serverCodings;
import de.anomic.server.serverDate;
@ -199,6 +200,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
public bookmarksDB bookmarksDB;
//public StringBuffer crl; // local citation references
public StringBuffer crg; // global citation references
public dbImportManager dbImportManager;
/*
* Remote Proxy configuration
@ -566,6 +568,8 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
//plasmaSnippetCache.result scr = snippetCache.retrieve(new URL("http://www.heise.de/security/news/foren/go.shtml?read=1&msg_id=7301419&forum_id=72721"), query, true);
//plasmaSnippetCache.result scr = snippetCache.retrieve(new URL("http://www.heise.de/kiosk/archiv/ct/2003/4/20"), query, true, 260);
this.dbImportManager = new dbImportManager(this);
sb=this;
log.logConfig("Finished Switchboard Initialization");
}
@ -770,7 +774,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
terminateAllThreads(true);
log.logConfig("SWITCHBOARD SHUTDOWN STEP 2: sending termination signal to threaded indexing");
// closing all still running db importer jobs
plasmaDbImporter.close();
this.dbImportManager.close();
indexDistribution.close();
cacheLoader.close();
wikiDB.close();
