diff --git a/htroot/IndexImport_p.html b/htroot/IndexImport_p.html index 6906d8dca..46e3e8516 100644 --- a/htroot/IndexImport_p.html +++ b/htroot/IndexImport_p.html @@ -10,7 +10,6 @@ #%env/templates/submenuIndexControl.template%#

Index DB Import

-

The local index currently consists of (at least) #[wcount]# reverse word indexes and #[ucount]# URL references.


#(error)# @@ -22,13 +21,40 @@

#[error_msg]#

#[error_stackTrace]#

#(/error)# -

Starting new Job

+

Starting new Job

+ + +
- - - - + + + + + + + + + + + + +
Import Path:
Import Type: + Cache Size + Usage Examples
Import Path:
+
Attention:
Always do a backup of your source and destination database before starting to use this import function.
+
@@ -38,15 +64,13 @@

- + + - - - - + @@ -54,25 +78,23 @@ - - + + + - - - - + @@ -87,32 +109,138 @@

PathJob TypeJob Name Status % Elapsed
Time
Estimated
Time
Word Hash# URLs# Word
Entities
# Word
Entries
Import Status Abort Import Pause Import
#[path]##(status)#Finished::Running::Paused#(/status)##[type]##[shortName]##(runningStatus)#Finished::Running::Paused#(/runningStatus)# #[percent]# #[elapsed]# #[estimated]##[wordHash]##[url_num]##[word_entity_num]##[word_entry_num]##[status]# #(stopped)#:: - + #(/stopped)# #(paused)# - + :: - + #(/paused)#
+ - - - - + #{finished.jobs}# - - + + + - - - - + #{/finished.jobs}#
Job Type Path Status % Elapsed
Time
Word Hash# URLs# Word
Entities
# Word
Entries
Import Status
#[path]##(status)#Finished::Error: #[errorMsg]#::Paused#(/status)##[type]##[shortName]##(runningStatus)#Finished::Error: #[errorMsg]#::Paused#(/runningStatus)# #[percent]# #[elapsed]##[wordHash]##[url_num]##[word_entity_num]##[word_entry_num]##[status]#

Last Refresh: #[date]#

+
+

Usage Examples:

+ +

Plasma DB Import:

+

+ Example Path: E:\PLASMADB\
+

+

+Requirements:
+You need to have at least the following directories and files in this path: + + + + + + + + + + + + + + + + + + + + + + + + + +
NameTypeWiteableDescription
urlHash.dbFileNoThe LoadedURL Database containing all loaded and indexed URLs
ACLUSTERDirectoryYesThe assortment directory containing parts of the word index.
WORDSDirectoryYesThe words directory containing parts of the word index.
+

+ + +

Assortment Import:

+

+ Example Path: E:\PLASMADB\ACLUSTER\indexAssortment001.db +

+

+Requirements:
+You need to have at least the following directories and files in this path: + + + + + + + + + + + + + +
NameTypeWiteableDescription
indexAssortment001.dbFileNoThe assortment file that should be imported.
+ Attention: The assortment file must have the postfix "[0-9]{3}\.db". + If you would like to import an assortment file from the PLASMADB\ACLUSTER\ABKP
+

+

+Notes:
+Please note that the imported words are useless if the destination peer doesn't know +the URLs the imported words belongs to. +

+ #%env/templates/footer.template%# \ No newline at end of file diff --git a/htroot/IndexImport_p.java b/htroot/IndexImport_p.java index 7227d54a7..0da035991 100644 --- a/htroot/IndexImport_p.java +++ b/htroot/IndexImport_p.java @@ -54,8 +54,10 @@ import java.util.Date; import java.util.Vector; import de.anomic.http.httpHeader; -import de.anomic.plasma.plasmaDbImporter; import de.anomic.plasma.plasmaSwitchboard; +import de.anomic.plasma.dbImport.dbImportManager; +import de.anomic.plasma.dbImport.dbImporter; +import de.anomic.plasma.dbImport.plasmaDbImporter; import de.anomic.server.serverByteBuffer; import de.anomic.server.serverDate; import de.anomic.server.serverObjects; @@ -75,24 +77,33 @@ public final class IndexImport_p { try { // getting the import path String importPath = (String) post.get("importPath"); + String importType = (String) post.get("importType"); + String cacheSizeStr = (String) post.get("cacheSize"); + int cacheSize = 8*1024*1024; + try { + cacheSize = Integer.valueOf(cacheSizeStr).intValue(); + } catch (NumberFormatException e) {} boolean startImport = true; - // check if there is an already running thread with the same import path - Thread[] importThreads = new Thread[plasmaDbImporter.runningJobs.activeCount()*2]; - activeCount = plasmaDbImporter.runningJobs.enumerate(importThreads); - - for (int i=0; i < activeCount; i++) { - plasmaDbImporter currThread = (plasmaDbImporter) importThreads[i]; - if (currThread.getImportRoot().equals(new File(importPath))) { - prop.put("error",2); - startImport = false; - } - } +// // check if there is an already running thread with the same import path +// Thread[] importThreads = new Thread[plasmaDbImporter.runningJobs.activeCount()*2]; +// activeCount = plasmaDbImporter.runningJobs.enumerate(importThreads); +// +// for (int i=0; i < activeCount; i++) { +// plasmaDbImporter currThread = (plasmaDbImporter) importThreads[i]; +// if (currThread.getJobName().equals(new File(importPath))) { +// prop.put("error",2); +// startImport = false; +// } +// } +// if (startImport) { - plasmaDbImporter newImporter = new plasmaDbImporter(switchboard.wordIndex,switchboard.urlPool.loadedURL,importPath); - newImporter.start(); - + dbImporter importerThread = switchboard.dbImportManager.getNewImporter(importType); + if (importerThread != null) { + importerThread.init(new File(importPath),cacheSize); + importerThread.startIt(); + } prop.put("LOCATION",""); return prop; } @@ -108,7 +119,7 @@ public final class IndexImport_p { errorOut.close(); } } else if (post.containsKey("clearFinishedJobList")) { - plasmaDbImporter.finishedJobs.clear(); + switchboard.dbImportManager.finishedJobs.clear(); prop.put("LOCATION",""); return prop; } else if ( @@ -117,25 +128,22 @@ public final class IndexImport_p { (post.containsKey("continueIndexDbImport")) ) { // getting the job nr of the thread - String jobNr = (String) post.get("jobNr"); - - Thread[] importThreads = new Thread[plasmaDbImporter.runningJobs.activeCount()*2]; - activeCount = plasmaDbImporter.runningJobs.enumerate(importThreads); - - for (int i=0; i < activeCount; i++) { - plasmaDbImporter currThread = (plasmaDbImporter) importThreads[i]; - if (currThread.getJobNr() == Integer.valueOf(jobNr).intValue()) { - if (post.containsKey("stopIndexDbImport")) { - currThread.stoppIt(); - try { currThread.join(); } catch (InterruptedException e) {e.printStackTrace();} - } else if (post.containsKey("pauseIndexDbImport")) { - currThread.pauseIt(); - } else if (post.containsKey("continueIndexDbImport")) { - currThread.continueIt(); - } - break; - } - } + String jobID = (String) post.get("jobNr"); + dbImporter importer = switchboard.dbImportManager.getImporterByID(Integer.valueOf(jobID).intValue()); + if (importer != null) { + if (post.containsKey("stopIndexDbImport")) { + try { + importer.stopIt(); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } else if (post.containsKey("pauseIndexDbImport")) { + importer.pauseIt(); + } else if (post.containsKey("continueIndexDbImport")) { + importer.continueIt(); + } + } prop.put("LOCATION",""); return prop; } @@ -147,60 +155,64 @@ public final class IndexImport_p { /* * Loop over all currently running jobs */ - Thread[] importThreads = new Thread[plasmaDbImporter.runningJobs.activeCount()*2]; - activeCount = plasmaDbImporter.runningJobs.enumerate(importThreads); + dbImporter[] importThreads = switchboard.dbImportManager.getRunningImporter(); + activeCount = importThreads.length; for (int i=0; i < activeCount; i++) { - plasmaDbImporter currThread = (plasmaDbImporter) importThreads[i]; + dbImporter currThread = importThreads[i]; + // get import type + prop.put("running.jobs_" + i + "_type", currThread.getJobType()); + // root path of the source db - prop.put("running.jobs_" + i + "_path", currThread.getImportRoot().toString()); + String fullName = currThread.getJobName().toString(); + String shortName = (fullName.length()>30)?fullName.substring(0,12) + "..." + fullName.substring(fullName.length()-22,fullName.length()):fullName; + prop.put("running.jobs_" + i + "_fullName",fullName); + prop.put("running.jobs_" + i + "_shortName",shortName); // specifies if the importer is still running - prop.put("running.jobs_" + i + "_stopped", currThread.isAlive() ? 1:0); + prop.put("running.jobs_" + i + "_stopped", currThread.isStopped() ? 1:0); // specifies if the importer was paused prop.put("running.jobs_" + i + "_paused", currThread.isPaused() ? 1:0); // setting the status - prop.put("running.jobs_" + i + "_status", currThread.isPaused() ? 2 : currThread.isAlive() ? 1 : 0); + prop.put("running.jobs_" + i + "_runningStatus", currThread.isPaused() ? 2 : currThread.isStopped() ? 1 : 0); // other information - prop.put("running.jobs_" + i + "_percent", Integer.toString(currThread.getProcessingStatus())); + prop.put("running.jobs_" + i + "_percent", Integer.toString(currThread.getProcessingStatusPercent())); prop.put("running.jobs_" + i + "_elapsed", serverDate.intervalToString(currThread.getElapsedTime())); prop.put("running.jobs_" + i + "_estimated", serverDate.intervalToString(currThread.getEstimatedTime())); - prop.put("running.jobs_" + i + "_wordHash", currThread.getCurrentWordhash()); - prop.put("running.jobs_" + i + "_url_num", Long.toString(currThread.getUrlCounter())); - prop.put("running.jobs_" + i + "_word_entity_num", Long.toString(currThread.getWordEntityCounter())); - prop.put("running.jobs_" + i + "_word_entry_num", Long.toString(currThread.getWordEntryCounter())); + prop.put("running.jobs_" + i + "_status", currThread.getStatus().replace("\n", "
")); // job number of the importer thread - prop.put("running.jobs_" + i + "_job_nr", Integer.toString(currThread.getJobNr())); + prop.put("running.jobs_" + i + "_job_nr", Integer.toString(currThread.getJobID())); } prop.put("running.jobs",activeCount); /* * Loop over all finished jobs */ - Vector finishedJobs = (Vector) plasmaDbImporter.finishedJobs.clone(); - for (int i=0; i30)?fullName.substring(0,12) + "..." + fullName.substring(fullName.length()-22,fullName.length()):fullName; + prop.put("finished.jobs_" + i + "_type", currThread.getJobType()); + prop.put("finished.jobs_" + i + "_fullName", fullName); + prop.put("finished.jobs_" + i + "_shortName", shortName); if (error != null) { - prop.put("finished.jobs_" + i + "_status", 2); - prop.put("finished.jobs_" + i + "_status_errorMsg", error); + prop.put("finished.jobs_" + i + "_runningStatus", 2); + prop.put("finished.jobs_" + i + "_runningStatus_errorMsg", error); } else { - prop.put("finished.jobs_" + i + "_status", 0); + prop.put("finished.jobs_" + i + "_runningStatus", 0); } - prop.put("finished.jobs_" + i + "_percent", Integer.toString(currThread.getProcessingStatus())); + prop.put("finished.jobs_" + i + "_percent", Integer.toString(currThread.getProcessingStatusPercent())); prop.put("finished.jobs_" + i + "_elapsed", serverDate.intervalToString(currThread.getElapsedTime())); - prop.put("finished.jobs_" + i + "_wordHash", currThread.getCurrentWordhash()); - prop.put("finished.jobs_" + i + "_url_num", Long.toString(currThread.getUrlCounter())); - prop.put("finished.jobs_" + i + "_word_entity_num", Long.toString(currThread.getWordEntityCounter())); - prop.put("finished.jobs_" + i + "_word_entry_num", Long.toString(currThread.getWordEntryCounter())); + prop.put("finished.jobs_" + i + "_status", currThread.getStatus().replace("\n", "
")); } - prop.put("finished.jobs",finishedJobs.size()); + prop.put("finished.jobs",finishedJobs.length); prop.put("date",(new Date()).toString()); return prop; diff --git a/source/de/anomic/plasma/dbImport/AbstractImporter.java b/source/de/anomic/plasma/dbImport/AbstractImporter.java new file mode 100644 index 000000000..6383ff478 --- /dev/null +++ b/source/de/anomic/plasma/dbImport/AbstractImporter.java @@ -0,0 +1,113 @@ +package de.anomic.plasma.dbImport; + +import java.io.File; + +import de.anomic.plasma.plasmaSwitchboard; +import de.anomic.server.logging.serverLog; + +public abstract class AbstractImporter extends Thread implements dbImporter{ + + protected int jobID; + protected String jobType; + protected serverLog log; + protected boolean stopped = false; + protected boolean paused = false; + + protected plasmaSwitchboard sb; + protected File importPath; + protected int cacheSize; + + protected long globalStart = System.currentTimeMillis(); + protected long globalEnd; + protected String error; + + public AbstractImporter(plasmaSwitchboard theSb) { + super(theSb.dbImportManager.runningJobs,""); + this.sb = theSb; + } + + public String getError() { + return this.error; + } + + public void init(File theImportPath) { + this.importPath = theImportPath; + + this.jobID = this.sb.dbImportManager.getJobID(); + this.log = new serverLog("IMPORT_" + this.jobType + "_" + this.jobID); + this.setName("IMPORT_" + this.jobType + "_" + this.sb.dbImportManager.getJobID()); + } + + public void startIt() { + this.start(); + } + + public void stopIt() throws InterruptedException { + this.stopped = true; + this.continueIt(); + this.join(); + } + + public void pauseIt() { + synchronized(this) { + this.paused = true; + } + } + + public void continueIt() { + synchronized(this) { + if (this.paused) { + this.paused = false; + this.notifyAll(); + } + } + } + + public boolean isPaused() { + synchronized(this) { + return this.paused; + } + } + + protected boolean isAborted() { + synchronized(this) { + if (this.paused) { + try { + this.wait(); + } + catch (InterruptedException e){} + } + } + + return (this.stopped) || Thread.currentThread().isInterrupted(); + } + + public boolean isStopped() { + return this.isAlive(); + } + + public int getJobID() { + return this.jobID; + } + + public long getTotalRuntime() { + return (this.globalEnd == 0)?System.currentTimeMillis()-this.globalStart:this.globalEnd-this.globalStart; + } + + public long getElapsedTime() { + return System.currentTimeMillis()-this.globalStart; + } + + public String getJobType() { + return this.jobType; + } + + public File getImportPath() { + return this.importPath; + } + + public abstract long getEstimatedTime(); + public abstract String getJobName(); + public abstract int getProcessingStatusPercent(); + +} diff --git a/source/de/anomic/plasma/dbImport/dbImportManager.java b/source/de/anomic/plasma/dbImport/dbImportManager.java new file mode 100644 index 000000000..8a18202ea --- /dev/null +++ b/source/de/anomic/plasma/dbImport/dbImportManager.java @@ -0,0 +1,115 @@ +package de.anomic.plasma.dbImport; + +import java.util.Vector; + +import de.anomic.plasma.plasmaSwitchboard; +import de.anomic.server.logging.serverLog; + +public class dbImportManager { + + public final Vector finishedJobs = new Vector(); + public final ThreadGroup runningJobs = new ThreadGroup("ImporterThreads"); + public int currMaxJobNr = 0; + private plasmaSwitchboard sb; + + public dbImportManager(plasmaSwitchboard theSb) { + this.sb = theSb; + } + + public int getJobID() { + int jobID; + synchronized(runningJobs) { + jobID = currMaxJobNr; + currMaxJobNr++; + } + return jobID; + } + + public dbImporter[] getRunningImporter() { + Thread[] importThreads = new Thread[runningJobs.activeCount()*2]; + int activeCount = runningJobs.enumerate(importThreads); + dbImporter[] importers = new dbImporter[activeCount]; + for (int i=0; i 64) { + errorMsg = "AssortmentFile '" + importAssortmentFile + "' has an invalid name."; + } + } catch (NumberFormatException e) { + errorMsg = "Unable to parse the assortment file number."; + } + + if (errorMsg != null) { + this.log.logSevere(errorMsg); + throw new IllegalStateException(errorMsg); + } + + + this.log.logInfo("Initializing source assortment file"); + this.assortmentFile = new plasmaWordIndexAssortment(importAssortmentPath,assortmentNr,8*1024*1024, this.log); + this.importStartSize = this.assortmentFile.size(); + } + + public long getEstimatedTime() { + return (this.wordEntityCount==0)?0:this.assortmentFile.size()*((System.currentTimeMillis()-this.globalStart)/this.wordEntityCount); + } + + public String getJobName() { + return this.getImportPath().toString(); + } + + public int getProcessingStatusPercent() { + return (this.wordEntityCount)/((this.importStartSize<100)?1:(this.importStartSize)/100); + } + + public String getStatus() { + StringBuffer theStatus = new StringBuffer(); + + theStatus.append("#Word Entities=").append(this.wordEntityCount).append("\n"); + theStatus.append("#Word Entries=").append(this.wordEntryCount); + + return theStatus.toString(); + } + + public void run() { + try { + Iterator contentIter = this.assortmentFile.content(); + while (contentIter.hasNext()) { + this.wordEntityCount++; + + byte[][] row = (byte[][]) contentIter.next(); + String hash = new String(row[0]); + plasmaWordIndexEntryContainer container; + try { + container = this.assortmentFile.row2container(hash, row); + } catch (NullPointerException e) { + this.log.logWarning("NullpointerException detected in row with hash '" + hash + "'."); + if (this.wordEntityCount < this.importStartSize) continue; + return; + } + this.wordEntryCount += container.size(); + + // importing entity container to home db + this.sb.wordIndex.addEntries(container, true); + + if (this.wordEntityCount % 500 == 0) { + this.log.logFine(this.wordEntityCount + " word entities processed so far."); + } + if (this.wordEntryCount % 2000 == 0) { + this.log.logFine(this.wordEntryCount + " word entries processed so far."); + } + if (isAborted()) break; + } + } catch (Exception e) { + this.error = e.toString(); + this.log.logSevere("Error detected",e); + } finally { + this.globalEnd = System.currentTimeMillis(); + this.sb.dbImportManager.finishedJobs.add(this); + this.assortmentFile.close(); + } + } + +} diff --git a/source/de/anomic/plasma/plasmaSwitchboard.java b/source/de/anomic/plasma/plasmaSwitchboard.java index 3e90e9500..f3283aceb 100644 --- a/source/de/anomic/plasma/plasmaSwitchboard.java +++ b/source/de/anomic/plasma/plasmaSwitchboard.java @@ -132,6 +132,7 @@ import de.anomic.kelondro.kelondroException; import de.anomic.kelondro.kelondroMSetTools; import de.anomic.kelondro.kelondroNaturalOrder; import de.anomic.kelondro.kelondroTables; +import de.anomic.plasma.dbImport.dbImportManager; import de.anomic.server.serverAbstractSwitch; import de.anomic.server.serverCodings; import de.anomic.server.serverDate; @@ -199,6 +200,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser public bookmarksDB bookmarksDB; //public StringBuffer crl; // local citation references public StringBuffer crg; // global citation references + public dbImportManager dbImportManager; /* * Remote Proxy configuration @@ -566,6 +568,8 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser //plasmaSnippetCache.result scr = snippetCache.retrieve(new URL("http://www.heise.de/security/news/foren/go.shtml?read=1&msg_id=7301419&forum_id=72721"), query, true); //plasmaSnippetCache.result scr = snippetCache.retrieve(new URL("http://www.heise.de/kiosk/archiv/ct/2003/4/20"), query, true, 260); + this.dbImportManager = new dbImportManager(this); + sb=this; log.logConfig("Finished Switchboard Initialization"); } @@ -770,7 +774,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser terminateAllThreads(true); log.logConfig("SWITCHBOARD SHUTDOWN STEP 2: sending termination signal to threaded indexing"); // closing all still running db importer jobs - plasmaDbImporter.close(); + this.dbImportManager.close(); indexDistribution.close(); cacheLoader.close(); wikiDB.close();