*) new import function for IndexImport_p.html

- can be used to import the crawling queue (noticeUrlDB + stacks)
   

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@1518 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
theli 19 years ago
parent 4fa2be73c3
commit 50d85657b8

@ -30,7 +30,8 @@
<td>Import&nbsp;Type:</td> <td>Import&nbsp;Type:</td>
<td title="the path to the database that should be imported"><select name="importType" size="1"> <td title="the path to the database that should be imported"><select name="importType" size="1">
<option value="plasmaDB">PLASMA DB Import</option> <option value="plasmaDB">PLASMA DB Import</option>
<option value="assortment">Assortment File Import</option> <option value="assortment">Assortment File Import</option>
<option value="NURL">Crawling Queue Import</option>
</select> </select>
</td> </td>
<td title="the cache size that should be used for the import db">Cache Size</td> <td title="the cache size that should be used for the import db">Cache Size</td>
@ -69,7 +70,7 @@
<td class="small" >Status</td> <td class="small" >Status</td>
<td class="small" >%</td> <td class="small" >%</td>
<td class="small" >Elapsed<br>Time</td> <td class="small" >Elapsed<br>Time</td>
<td class="small" >Estimated<br>Time</td> <td class="small" >Time<br>Left</td>
<td class="small" >Import Status</td> <td class="small" >Import Status</td>
<td class="small" >Abort Import</td> <td class="small" >Abort Import</td>
<td class="small" >Pause Import</td> <td class="small" >Pause Import</td>
@ -120,7 +121,7 @@
<tr class="TableCellLight"> <tr class="TableCellLight">
<td class="small">#[type]#</td> <td class="small">#[type]#</td>
<td class="small" title="#[fullName]#">#[shortName]#</td> <td class="small" title="#[fullName]#">#[shortName]#</td>
<td class="small"><font color="#(runningStatus)#red::green::red#(/runningStatus)#">#(runningStatus)#Finished::<b>Error:</b> #[errorMsg]#::Paused#(/runningStatus)#</font></td> <td class="small"><font color="#(runningStatus)#green::red::blue#(/runningStatus)#">#(runningStatus)#Finished::<b>Error:</b> #[errorMsg]#::Paused#(/runningStatus)#</font></td>
<td class="small" align="right">#[percent]#</td> <td class="small" align="right">#[percent]#</td>
<td class="small" align="right">#[elapsed]#</td> <td class="small" align="right">#[elapsed]#</td>
<td class="small" align="right"><tt>#[status]#</tt></td> <td class="small" align="right"><tt>#[status]#</tt></td>
@ -189,7 +190,8 @@ You need to have at least the following directories and files in this path:
<td>No</td> <td>No</td>
<td>The assortment file that should be imported.<br> <td>The assortment file that should be imported.<br>
<b>Attention:</b> The assortment file must have the postfix "[0-9]{3}\.db". <b>Attention:</b> The assortment file must have the postfix "[0-9]{3}\.db".
If you would like to import an assortment file from the <tt>PLASMADB\ACLUSTER\ABKP</tt></td> If you would like to import an assortment file from the <tt>PLASMADB\ACLUSTER\ABKP</tt>
you have to rename it first.</td>
</tr> </tr>
</table> </table>
</p> </p>
@ -199,7 +201,6 @@ Please note that the imported words are useless if the destination peer doesn't
the URLs the imported words belongs to. the URLs the imported words belongs to.
</p> </p>
<!--
<p><h3>Crawling Queue Import:</h3></p> <p><h3>Crawling Queue Import:</h3></p>
<p> <p>
<b>Example Path:</b> <tt>E:\PLASMADB\</tt> <b>Example Path:</b> <tt>E:\PLASMADB\</tt>
@ -240,7 +241,7 @@ You need to have at least the following directories and files in this path:
<tr><td><tt>urlNoticeRemote0.stack</tt></td></tr> <tr><td><tt>urlNoticeRemote0.stack</tt></td></tr>
</table> </table>
</p> </p>
-->
#%env/templates/footer.template%# #%env/templates/footer.template%#
</body> </body>
</html> </html>

@ -171,13 +171,13 @@ public final class IndexImport_p {
prop.put("running.jobs_" + i + "_shortName",shortName); prop.put("running.jobs_" + i + "_shortName",shortName);
// specifies if the importer is still running // specifies if the importer is still running
prop.put("running.jobs_" + i + "_stopped", currThread.isStopped() ? 1:0); prop.put("running.jobs_" + i + "_stopped", currThread.isStopped() ? 0:1);
// specifies if the importer was paused // specifies if the importer was paused
prop.put("running.jobs_" + i + "_paused", currThread.isPaused() ? 1:0); prop.put("running.jobs_" + i + "_paused", currThread.isPaused() ? 1:0);
// setting the status // setting the status
prop.put("running.jobs_" + i + "_runningStatus", currThread.isPaused() ? 2 : currThread.isStopped() ? 1 : 0); prop.put("running.jobs_" + i + "_runningStatus", currThread.isPaused() ? 2 : currThread.isStopped() ? 0 : 1);
// other information // other information
prop.put("running.jobs_" + i + "_percent", Integer.toString(currThread.getProcessingStatusPercent())); prop.put("running.jobs_" + i + "_percent", Integer.toString(currThread.getProcessingStatusPercent()));
@ -202,9 +202,9 @@ public final class IndexImport_p {
prop.put("finished.jobs_" + i + "_type", currThread.getJobType()); prop.put("finished.jobs_" + i + "_type", currThread.getJobType());
prop.put("finished.jobs_" + i + "_fullName", fullName); prop.put("finished.jobs_" + i + "_fullName", fullName);
prop.put("finished.jobs_" + i + "_shortName", shortName); prop.put("finished.jobs_" + i + "_shortName", shortName);
if (error != null) { if (error != null) {
prop.put("finished.jobs_" + i + "_runningStatus", 2); prop.put("finished.jobs_" + i + "_runningStatus", 1);
prop.put("finished.jobs_" + i + "_runningStatus_errorMsg", error); prop.put("finished.jobs_" + i + "_runningStatus_errorMsg", error.replaceAll("\n", "<br>"));
} else { } else {
prop.put("finished.jobs_" + i + "_runningStatus", 0); prop.put("finished.jobs_" + i + "_runningStatus", 0);
} }

@ -31,9 +31,13 @@ public abstract class AbstractImporter extends Thread implements dbImporter{
} }
public void init(File theImportPath) { public void init(File theImportPath) {
if (theImportPath == null) throw new NullPointerException("The Import path must not be null.");
this.importPath = theImportPath; this.importPath = theImportPath;
// getting a job id from the import manager
this.jobID = this.sb.dbImportManager.getJobID(); this.jobID = this.sb.dbImportManager.getJobID();
// initializing the logger and setting a more verbose thread name
this.log = new serverLog("IMPORT_" + this.jobType + "_" + this.jobID); this.log = new serverLog("IMPORT_" + this.jobType + "_" + this.jobID);
this.setName("IMPORT_" + this.jobType + "_" + this.sb.dbImportManager.getJobID()); this.setName("IMPORT_" + this.jobType + "_" + this.sb.dbImportManager.getJobID());
} }
@ -83,7 +87,7 @@ public abstract class AbstractImporter extends Thread implements dbImporter{
} }
public boolean isStopped() { public boolean isStopped() {
return this.isAlive(); return !this.isAlive();
} }
public int getJobID() { public int getJobID() {
@ -95,7 +99,7 @@ public abstract class AbstractImporter extends Thread implements dbImporter{
} }
public long getElapsedTime() { public long getElapsedTime() {
return System.currentTimeMillis()-this.globalStart; return isStopped()?this.globalEnd-this.globalStart:System.currentTimeMillis()-this.globalStart;
} }
public String getJobType() { public String getJobType() {

@ -62,6 +62,8 @@ public class dbImportManager {
newImporter = new plasmaDbImporter(this.sb); newImporter = new plasmaDbImporter(this.sb);
} else if (type.equalsIgnoreCase("ASSORTMENT")) { } else if (type.equalsIgnoreCase("ASSORTMENT")) {
newImporter = new plasmaWordIndexAssortmentImporter(this.sb); newImporter = new plasmaWordIndexAssortmentImporter(this.sb);
} else if (type.equalsIgnoreCase("NURL")) {
newImporter = new plasmaCrawlNURLImporter(this.sb);
} }
return newImporter; return newImporter;
} }
@ -71,10 +73,13 @@ public class dbImportManager {
* e.g. on server shutdown * e.g. on server shutdown
*/ */
public void close() { public void close() {
/* clear the finished thread list */
this.finishedJobs.clear();
/* waiting for all threads to finish */ /* waiting for all threads to finish */
int threadCount = runningJobs.activeCount(); int threadCount = this.runningJobs.activeCount();
Thread[] threadList = new Thread[threadCount]; Thread[] threadList = new Thread[threadCount];
threadCount = runningJobs.enumerate(threadList); threadCount = this.runningJobs.enumerate(threadList);
if (threadCount == 0) return; if (threadCount == 0) return;

@ -0,0 +1,212 @@
package de.anomic.plasma.dbImport;
import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.TreeMap;
import de.anomic.plasma.plasmaCrawlNURL;
import de.anomic.plasma.plasmaCrawlProfile;
import de.anomic.plasma.plasmaSwitchboard;
import de.anomic.plasma.plasmaCrawlNURL.Entry;
/**
 * Importer that copies a crawling queue (noticeUrlDB + its stacks) and the
 * crawl profiles referenced by the queued URLs from a source PLASMADB
 * directory into the running peer's databases.
 */
public class plasmaCrawlNURLImporter extends AbstractImporter implements dbImporter {

    // handles of crawl profiles already present in (or copied into) the
    // destination profile DB, so each profile is looked up/imported at most once
    private HashSet importProfileHandleCache = new HashSet();
    private plasmaCrawlProfile importProfileDB; // source crawl-profile DB
    private plasmaCrawlNURL importNurlDB;       // source noticeUrlDB
    private int importStartSize;                // nr. of URL entries in the source DB at import start
    private int urlCount = 0;                   // nr. of URL entries processed so far
    private int profileCount = 0;               // nr. of crawl profiles imported so far

    public plasmaCrawlNURLImporter(plasmaSwitchboard theSb) {
        super(theSb);
        this.jobType = "NURL";
    }

    /**
     * Extrapolates the remaining import time from the elapsed time and the
     * fraction of processed URLs.
     *
     * @return estimated remaining time in ms; 0 before the first URL was processed
     */
    public long getEstimatedTime() {
        return (this.urlCount == 0) ? 0 : ((this.importStartSize * getElapsedTime()) / this.urlCount) - getElapsedTime();
    }

    public String getJobName() {
        return this.importPath.toString();
    }

    /**
     * @return import progress in percent, based on processed URLs relative to
     *         the initial source DB size
     */
    public int getProcessingStatusPercent() {
        return this.urlCount / ((this.importStartSize < 100) ? 1 : this.importStartSize / 100);
    }

    public String getStatus() {
        StringBuffer theStatus = new StringBuffer();
        theStatus.append("#URLs=").append(this.urlCount).append("\n");
        theStatus.append("#Profiles=").append(this.profileCount);
        return theStatus.toString();
    }

    /**
     * Validates the import path and opens the source noticeUrlDB and profileDB.
     *
     * @param theImportPath directory that must contain urlNotice1.db and crawlProfiles0.db
     * @param theCacheSize  DB cache size in bytes, split between the two source DBs
     * @throws IllegalArgumentException if the path or a required DB file is
     *         missing or not accessible
     */
    public void init(File theImportPath, int theCacheSize) {
        super.init(theImportPath);
        this.cacheSize = theCacheSize;

        File noticeUrlDbFile = new File(this.importPath, "urlNotice1.db");
        File profileDbFile = new File(this.importPath, "crawlProfiles0.db");

        String errorMsg = null;
        if (!this.importPath.exists())
            errorMsg = "The import path '" + this.importPath + "' does not exist.";
        else if (!this.importPath.isDirectory())
            errorMsg = "The import path '" + this.importPath + "' is not a directory.";
        else if (!this.importPath.canRead())
            errorMsg = "The import path '" + this.importPath + "' is not readable.";
        else if (!this.importPath.canWrite())
            errorMsg = "The import path '" + this.importPath + "' is not writeable.";
        else if (!noticeUrlDbFile.exists())
            errorMsg = "The noticeUrlDB file '" + noticeUrlDbFile + "' does not exist.";
        else if (noticeUrlDbFile.isDirectory())
            errorMsg = "The noticeUrlDB file '" + noticeUrlDbFile + "' is not a file.";
        else if (!noticeUrlDbFile.canRead())
            errorMsg = "The noticeUrlDB file '" + noticeUrlDbFile + "' is not readable.";
        else if (!noticeUrlDbFile.canWrite())
            errorMsg = "The noticeUrlDB file '" + noticeUrlDbFile + "' is not writeable.";
        else if (!profileDbFile.exists())
            errorMsg = "The profileDB file '" + profileDbFile + "' does not exist.";
        else if (profileDbFile.isDirectory())
            errorMsg = "The profileDB file '" + profileDbFile + "' is not a file.";
        else if (!profileDbFile.canRead())
            errorMsg = "The profileDB file '" + profileDbFile + "' is not readable.";
        // the profileDB is only read from, therefore no write check is done here
        if (errorMsg != null) {
            this.log.logSevere(errorMsg);
            throw new IllegalArgumentException(errorMsg);
        }

        // init noticeUrlDB with 3/4 of the cache.
        // BUGFIX: the original used cacheSize*(3/4) which is always 0 because
        // of integer division, effectively disabling the DB cache.
        this.log.logInfo("Initializing the source noticeUrlDB");
        this.importNurlDB = new plasmaCrawlNURL(this.importPath, (this.cacheSize * 3) / 4);
        this.importStartSize = this.importNurlDB.size();

        // init profile DB with the remaining cache
        // (cacheSize*(1/3) had the same integer-division bug)
        this.log.logInfo("Initializing the source profileDB");
        this.importProfileDB = new plasmaCrawlProfile(profileDbFile, this.cacheSize / 3);
    }

    /**
     * Imports all queue entries stack by stack, copying unknown crawl profiles
     * on demand, and finally closes both source DBs.
     */
    public void run() {
        try {
            // waiting on the noticeUrlDB stack-index init thread to finish
            this.importNurlDB.waitOnInitThread();

            // the stack types we want to import; -1 means entries that are
            // stored in the DB but not present on any stack
            int[] stackTypes = new int[] {plasmaCrawlNURL.STACK_TYPE_CORE,
                                          plasmaCrawlNURL.STACK_TYPE_LIMIT,
                                          plasmaCrawlNURL.STACK_TYPE_REMOTE,
                                          -1};

            // looping through the various stacks
            for (int i = 0; i < stackTypes.length; i++) {
                if (stackTypes[i] != -1) {
                    this.log.logInfo("Starting to import stacktype '" + stackTypes[i] + "' containing '" + this.importNurlDB.stackSize(stackTypes[i]) + "' entries.");
                } else {
                    this.log.logInfo("Starting to import '" + this.importNurlDB.size() + "' entries not available in any stack.");
                }

                // getting an iterator and loop through the URL entries
                Iterator iter = (stackTypes[i] == -1) ? this.importNurlDB.urlHashes("------------", true) : null;
                while (true) {
                    String nextHash = null;
                    Entry urlEntry = null;

                    try {
                        if (stackTypes[i] != -1) {
                            if (this.importNurlDB.stackSize(stackTypes[i]) == 0) break;
                            this.urlCount++;
                            urlEntry = this.importNurlDB.pop(stackTypes[i]);
                            nextHash = urlEntry.hash();
                        } else {
                            if (!iter.hasNext()) break;
                            this.urlCount++;
                            nextHash = (String) iter.next();
                            urlEntry = this.importNurlDB.getEntry(nextHash);
                        }
                    } catch (IOException e) {
                        this.log.logWarning("Unable to import entry: " + e.toString());
                        if ((stackTypes[i] != -1) && (this.importNurlDB.stackSize(stackTypes[i]) == 0)) break;
                        continue;
                    }

                    // getting a handle to the crawling profile the url belongs to
                    try {
                        String profileHandle = urlEntry.profileHandle();
                        if (profileHandle == null) {
                            this.log.logWarning("Profile handle of url entry '" + nextHash + "' unknown.");
                            continue;
                        }

                        // if we haven't imported the profile yet we need to do it now
                        if (!this.importProfileHandleCache.contains(profileHandle)) {
                            // testing if the profile is already known
                            plasmaCrawlProfile.entry profileEntry = this.sb.profiles.getEntry(profileHandle);

                            // if not we need to import it
                            if (profileEntry == null) {
                                // copy and store the source profile entry into the destination db
                                plasmaCrawlProfile.entry sourceEntry = this.importProfileDB.getEntry(profileHandle);
                                if (sourceEntry != null) {
                                    this.profileCount++;
                                    this.importProfileHandleCache.add(profileHandle);
                                    this.sb.profiles.newEntry((TreeMap) ((TreeMap) sourceEntry.map()).clone());
                                } else {
                                    this.log.logWarning("Profile '" + profileHandle + "' of url entry '" + nextHash + "' unknown.");
                                    continue;
                                }
                            } else {
                                // profile already known in the destination DB; cache the
                                // handle so it is not looked up again for every URL
                                this.importProfileHandleCache.add(profileHandle);
                            }
                        }

                        // if the url does not already exist in the destination stack we insert it now
                        if (!this.sb.urlPool.noticeURL.existsInStack(nextHash)) {
                            this.sb.urlPool.noticeURL.newEntry(urlEntry, (stackTypes[i] != -1) ? stackTypes[i] : plasmaCrawlNURL.STACK_TYPE_CORE);
                        }
                    } finally {
                        // always remove the hash from the import db, even for
                        // entries that were skipped because of a missing profile
                        this.importNurlDB.remove(nextHash);
                    }

                    if (this.urlCount % 100 == 0) {
                        this.log.logFine(this.urlCount + " URLs and '" + this.profileCount + "' profile entries processed so far.");
                    }
                    if (this.isAborted()) break;
                }
                this.log.logInfo("Finished to import stacktype '" + stackTypes[i] + "'");
            }

            // TODO: what to do with nurlDB entries that do not exist in any stack?
            //       (compare this.importNurlDB.size() with this.importNurlDB.stackSize())
        } catch (Exception e) {
            this.error = e.toString();
            this.log.logSevere("Import process had detected an error", e);
        } finally {
            this.log.logInfo("Import process finished.");
            this.globalEnd = System.currentTimeMillis();
            this.sb.dbImportManager.finishedJobs.add(this);
            this.importNurlDB.close();
            this.importProfileDB.close();
        }
    }
}

@ -3,8 +3,6 @@ package de.anomic.plasma.dbImport;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.Iterator; import java.util.Iterator;
import java.util.Vector;
import de.anomic.plasma.plasmaCrawlLURL; import de.anomic.plasma.plasmaCrawlLURL;
import de.anomic.plasma.plasmaSwitchboard; import de.anomic.plasma.plasmaSwitchboard;
import de.anomic.plasma.plasmaWordIndex; import de.anomic.plasma.plasmaWordIndex;
@ -29,8 +27,8 @@ public class plasmaDbImporter extends AbstractImporter implements dbImporter {
private long urlCounter = 0, wordCounter = 0, entryCounter = 0; private long urlCounter = 0, wordCounter = 0, entryCounter = 0;
public plasmaDbImporter(plasmaSwitchboard sb) { public plasmaDbImporter(plasmaSwitchboard theSb) {
super(sb); super(theSb);
this.jobType = "PLASMADB"; this.jobType = "PLASMADB";
} }
@ -49,12 +47,12 @@ public class plasmaDbImporter extends AbstractImporter implements dbImporter {
return theStatus.toString(); return theStatus.toString();
} }
public void init(File theImportPath, int cacheSize) { public void init(File theImportPath, int theCacheSize) {
super.init(theImportPath); super.init(theImportPath);
this.homeWordIndex = this.sb.wordIndex; this.homeWordIndex = this.sb.wordIndex;
this.homeUrlDB = this.sb.urlPool.loadedURL; this.homeUrlDB = this.sb.urlPool.loadedURL;
this.cacheSize = cacheSize; this.cacheSize = theCacheSize;
if (this.cacheSize < 2*1024*1024) this.cacheSize = 8*1024*1024; if (this.cacheSize < 2*1024*1024) this.cacheSize = 8*1024*1024;
if (this.homeWordIndex.getRoot().equals(this.importPath)) { if (this.homeWordIndex.getRoot().equals(this.importPath)) {
@ -94,12 +92,13 @@ public class plasmaDbImporter extends AbstractImporter implements dbImporter {
// thid seems to be better: // thid seems to be better:
// (this.importStartSize-this.importWordIndex.size())*100/((this.importStartSize==0)?1:this.importStartSize); // (this.importStartSize-this.importWordIndex.size())*100/((this.importStartSize==0)?1:this.importStartSize);
// but maxint (2,147,483,647) could be exceeded when WordIndexes reach 20M entries // but maxint (2,147,483,647) could be exceeded when WordIndexes reach 20M entries
return (this.importStartSize-this.importWordIndex.size())/((this.importStartSize<100)?1:(this.importStartSize)/100); //return (this.importStartSize-this.importWordIndex.size())/((this.importStartSize<100)?1:(this.importStartSize)/100);
return (int)(this.wordCounter)/((this.importStartSize<100)?1:(this.importStartSize)/100);
} }
public long getEstimatedTime() { public long getEstimatedTime() {
return (this.wordCounter==0)?0:this.importWordIndex.size()*((System.currentTimeMillis()-this.globalStart)/this.wordCounter); return (this.wordCounter==0)?0:((this.importStartSize*getElapsedTime())/this.wordCounter)-getElapsedTime();
} }
public void importWordsDB() { public void importWordsDB() {
@ -112,14 +111,14 @@ public class plasmaDbImporter extends AbstractImporter implements dbImporter {
// iterate over all words from import db // iterate over all words from import db
Iterator importWordHashIterator = this.importWordIndex.wordHashes(wordChunkStartHash, true, false); Iterator importWordHashIterator = this.importWordIndex.wordHashes(this.wordChunkStartHash, true, false);
while (!isAborted() && importWordHashIterator.hasNext()) { while (!isAborted() && importWordHashIterator.hasNext()) {
plasmaWordIndexEntryContainer newContainer; plasmaWordIndexEntryContainer newContainer = null;
try { try {
wordCounter++; this.wordCounter++;
wordHash = (String) importWordHashIterator.next(); this.wordHash = (String) importWordHashIterator.next();
newContainer = importWordIndex.getContainer(wordHash, true, -1); newContainer = this.importWordIndex.getContainer(this.wordHash, true, -1);
if (newContainer.size() == 0) continue; if (newContainer.size() == 0) continue;
@ -132,22 +131,22 @@ public class plasmaDbImporter extends AbstractImporter implements dbImporter {
if (isAborted()) break; if (isAborted()) break;
// getting next word index entry // getting next word index entry
entryCounter++; this.entryCounter++;
importWordIdxEntry = (plasmaWordIndexEntry) importWordIdxEntries.next(); importWordIdxEntry = (plasmaWordIndexEntry) importWordIdxEntries.next();
String urlHash = importWordIdxEntry.getUrlHash(); String urlHash = importWordIdxEntry.getUrlHash();
if ((this.importUrlDB.exists(urlHash)) && (!this.homeUrlDB.exists(urlHash))) try { if ((this.importUrlDB.exists(urlHash)) && (!this.homeUrlDB.exists(urlHash))) try {
// importing the new url // importing the new url
plasmaCrawlLURL.Entry urlEntry = this.importUrlDB.getEntry(urlHash, importWordIdxEntry); plasmaCrawlLURL.Entry urlEntry = this.importUrlDB.getEntry(urlHash, importWordIdxEntry);
urlCounter++; this.urlCounter++;
this.homeUrlDB.newEntry(urlEntry); this.homeUrlDB.newEntry(urlEntry);
if (urlCounter % 500 == 0) { if (this.urlCounter % 500 == 0) {
this.log.logFine(urlCounter + " URLs processed so far."); this.log.logFine(this.urlCounter + " URLs processed so far.");
} }
} catch (IOException e) {} } catch (IOException e) {}
if (entryCounter % 500 == 0) { if (this.entryCounter % 500 == 0) {
this.log.logFine(entryCounter + " word entries and " + wordCounter + " word entities processed so far."); this.log.logFine(this.entryCounter + " word entries and " + this.wordCounter + " word entities processed so far.");
} }
} }
@ -155,45 +154,45 @@ public class plasmaDbImporter extends AbstractImporter implements dbImporter {
if (isAborted()) break; if (isAborted()) break;
// importing entity container to home db // importing entity container to home db
homeWordIndex.addEntries(newContainer, true); this.homeWordIndex.addEntries(newContainer, false);
// delete complete index entity file // delete complete index entity file
importWordIndex.deleteIndex(wordHash); this.importWordIndex.deleteIndex(this.wordHash);
// print out some statistical information // print out some statistical information
if (wordCounter%500 == 0) { if (this.wordCounter%500 == 0) {
wordChunkEndHash = wordHash; this.wordChunkEndHash = this.wordHash;
wordChunkEnd = System.currentTimeMillis(); this.wordChunkEnd = System.currentTimeMillis();
long duration = wordChunkEnd - wordChunkStart; long duration = this.wordChunkEnd - this.wordChunkStart;
log.logInfo(wordCounter + " word entities imported " + this.log.logInfo(this.wordCounter + " word entities imported " +
"[" + wordChunkStartHash + " .. " + wordChunkEndHash + "] " + "[" + this.wordChunkStartHash + " .. " + this.wordChunkEndHash + "] " +
this.getProcessingStatusPercent() + "%\n" + this.getProcessingStatusPercent() + "%\n" +
"Speed: "+ 500*1000/duration + " word entities/s" + "Speed: "+ 500*1000/duration + " word entities/s" +
" | Elapsed time: " + serverDate.intervalToString(getElapsedTime()) + " | Elapsed time: " + serverDate.intervalToString(getElapsedTime()) +
" | Estimated time: " + serverDate.intervalToString(getEstimatedTime()) + "\n" + " | Estimated time: " + serverDate.intervalToString(getEstimatedTime()) + "\n" +
"Home Words = " + homeWordIndex.size() + "Home Words = " + this.homeWordIndex.size() +
" | Import Words = " + importWordIndex.size()); " | Import Words = " + this.importWordIndex.size());
wordChunkStart = wordChunkEnd; this.wordChunkStart = this.wordChunkEnd;
wordChunkStartHash = wordChunkEndHash; this.wordChunkStartHash = this.wordChunkEndHash;
} }
} catch (Exception e) { } catch (Exception e) {
log.logSevere("Import of word entity '" + wordHash + "' failed.",e); this.log.logSevere("Import of word entity '" + this.wordHash + "' failed.",e);
} finally { } finally {
if (newContainer != null) newContainer.clear();
} }
} }
this.log.logInfo("Home word index contains " + homeWordIndex.size() + " words and " + homeUrlDB.size() + " URLs."); this.log.logInfo("Home word index contains " + this.homeWordIndex.size() + " words and " + this.homeUrlDB.size() + " URLs.");
this.log.logInfo("Import word index contains " + importWordIndex.size() + " words and " + importUrlDB.size() + " URLs."); this.log.logInfo("Import word index contains " + this.importWordIndex.size() + " words and " + this.importUrlDB.size() + " URLs.");
this.log.logInfo("DB-IMPORT FINISHED");
} catch (Exception e) { } catch (Exception e) {
this.log.logSevere("Database import failed.",e); this.log.logSevere("Database import failed.",e);
e.printStackTrace(); e.printStackTrace();
this.error = e.toString(); this.error = e.toString();
} finally { } finally {
if (importUrlDB != null) try { importUrlDB.close(); } catch (Exception e){} this.log.logInfo("Import process finished.");
if (importWordIndex != null) try { importWordIndex.close(5000); } catch (Exception e){} if (this.importUrlDB != null) try { this.importUrlDB.close(); } catch (Exception e){}
if (this.importWordIndex != null) try { this.importWordIndex.close(5000); } catch (Exception e){}
} }
} }

@ -4,7 +4,6 @@ import java.io.File;
import java.util.Iterator; import java.util.Iterator;
import de.anomic.plasma.plasmaSwitchboard; import de.anomic.plasma.plasmaSwitchboard;
import de.anomic.plasma.plasmaWordIndex;
import de.anomic.plasma.plasmaWordIndexAssortment; import de.anomic.plasma.plasmaWordIndexAssortment;
import de.anomic.plasma.plasmaWordIndexEntryContainer; import de.anomic.plasma.plasmaWordIndexEntryContainer;
@ -22,27 +21,33 @@ public class plasmaWordIndexAssortmentImporter extends AbstractImporter implemen
this.jobType = "ASSORTMENT"; this.jobType = "ASSORTMENT";
} }
public void init(File importAssortmentFile, int cacheSize) { public void init(File theImportAssortmentFile, int theCacheSize) {
super.init(importAssortmentFile); super.init(theImportAssortmentFile);
this.importAssortmentFile = importAssortmentFile; this.importAssortmentFile = theImportAssortmentFile;
this.cacheSize = cacheSize; this.cacheSize = theCacheSize;
if (this.cacheSize < 2*1024*1024) this.cacheSize = 8*1024*1024; if (this.cacheSize < 2*1024*1024) this.cacheSize = 2*1024*1024;
String errorMsg = null; String errorMsg = null;
if (!importAssortmentFile.getName().matches("indexAssortment0[0-6][0-9]\\.db")) errorMsg = "AssortmentFile '" + importAssortmentFile + "' has an invalid name."; if (!this.importAssortmentFile.getName().matches("indexAssortment0[0-6][0-9]\\.db"))
if (!importAssortmentFile.exists()) errorMsg = "AssortmentFile '" + importAssortmentFile + "' does not exist."; errorMsg = "AssortmentFile '" + this.importAssortmentFile + "' has an invalid name.";
else if (importAssortmentFile.isDirectory()) errorMsg = "AssortmentFile '" + importAssortmentFile + "' is a directory."; if (!this.importAssortmentFile.exists())
else if (!importAssortmentFile.canRead()) errorMsg = "AssortmentFile '" + importAssortmentFile + "' is not readable."; errorMsg = "AssortmentFile '" + this.importAssortmentFile + "' does not exist.";
else if (!importAssortmentFile.canWrite()) errorMsg = "AssortmentFile '" + importAssortmentFile + "' is not writeable."; else if (this.importAssortmentFile.isDirectory())
errorMsg = "AssortmentFile '" + this.importAssortmentFile + "' is a directory.";
else if (!this.importAssortmentFile.canRead())
errorMsg = "AssortmentFile '" + this.importAssortmentFile + "' is not readable.";
else if (!this.importAssortmentFile.canWrite())
errorMsg = "AssortmentFile '" + this.importAssortmentFile + "' is not writeable.";
// getting the assortment length
File importAssortmentPath = null; File importAssortmentPath = null;
int assortmentNr = -1; int assortmentNr = -1;
try { try {
importAssortmentPath = new File(importAssortmentFile.getParent()); importAssortmentPath = new File(this.importAssortmentFile.getParent());
assortmentNr = Integer.valueOf(importAssortmentFile.getName().substring("indexAssortment".length(),"indexAssortment".length()+3)).intValue(); assortmentNr = Integer.valueOf(this.importAssortmentFile.getName().substring("indexAssortment".length(),"indexAssortment".length()+3)).intValue();
if (assortmentNr <1 || assortmentNr > 64) { if (assortmentNr <1 || assortmentNr > 64) {
errorMsg = "AssortmentFile '" + importAssortmentFile + "' has an invalid name."; errorMsg = "AssortmentFile '" + this.importAssortmentFile + "' has an invalid name.";
} }
} catch (NumberFormatException e) { } catch (NumberFormatException e) {
errorMsg = "Unable to parse the assortment file number."; errorMsg = "Unable to parse the assortment file number.";
@ -53,14 +58,14 @@ public class plasmaWordIndexAssortmentImporter extends AbstractImporter implemen
throw new IllegalStateException(errorMsg); throw new IllegalStateException(errorMsg);
} }
// initializing the import assortment db
this.log.logInfo("Initializing source assortment file"); this.log.logInfo("Initializing source assortment file");
this.assortmentFile = new plasmaWordIndexAssortment(importAssortmentPath,assortmentNr,8*1024*1024, this.log); this.assortmentFile = new plasmaWordIndexAssortment(importAssortmentPath,assortmentNr,8*1024*1024, this.log);
this.importStartSize = this.assortmentFile.size(); this.importStartSize = this.assortmentFile.size();
} }
public long getEstimatedTime() { public long getEstimatedTime() {
return (this.wordEntityCount==0)?0:this.assortmentFile.size()*((System.currentTimeMillis()-this.globalStart)/this.wordEntityCount); return (this.wordEntityCount==0)?0:((this.assortmentFile.size()*getElapsedTime())/(this.wordEntityCount))-getElapsedTime();
} }
public String getJobName() { public String getJobName() {
@ -82,12 +87,18 @@ public class plasmaWordIndexAssortmentImporter extends AbstractImporter implemen
public void run() { public void run() {
try { try {
// getting a content interator
Iterator contentIter = this.assortmentFile.content(); Iterator contentIter = this.assortmentFile.content();
while (contentIter.hasNext()) { while (contentIter.hasNext()) {
this.wordEntityCount++; this.wordEntityCount++;
// getting next entry as byte array
byte[][] row = (byte[][]) contentIter.next(); byte[][] row = (byte[][]) contentIter.next();
// getting the word hash
String hash = new String(row[0]); String hash = new String(row[0]);
// creating an word entry container
plasmaWordIndexEntryContainer container; plasmaWordIndexEntryContainer container;
try { try {
container = this.assortmentFile.row2container(hash, row); container = this.assortmentFile.row2container(hash, row);
@ -99,7 +110,7 @@ public class plasmaWordIndexAssortmentImporter extends AbstractImporter implemen
this.wordEntryCount += container.size(); this.wordEntryCount += container.size();
// importing entity container to home db // importing entity container to home db
this.sb.wordIndex.addEntries(container, true); this.sb.wordIndex.addEntries(container, false);
if (this.wordEntityCount % 500 == 0) { if (this.wordEntityCount % 500 == 0) {
this.log.logFine(this.wordEntityCount + " word entities processed so far."); this.log.logFine(this.wordEntityCount + " word entities processed so far.");
@ -111,8 +122,9 @@ public class plasmaWordIndexAssortmentImporter extends AbstractImporter implemen
} }
} catch (Exception e) { } catch (Exception e) {
this.error = e.toString(); this.error = e.toString();
this.log.logSevere("Error detected",e); this.log.logSevere("Import process had detected an error",e);
} finally { } finally {
this.log.logInfo("Import process finished.");
this.globalEnd = System.currentTimeMillis(); this.globalEnd = System.currentTimeMillis();
this.sb.dbImportManager.finishedJobs.add(this); this.sb.dbImportManager.finishedJobs.add(this);
this.assortmentFile.close(); this.assortmentFile.close();

@ -99,6 +99,7 @@ public class plasmaCrawlNURL extends plasmaURL {
private final HashSet stackIndex; // to find out if a specific link is already on any stack private final HashSet stackIndex; // to find out if a specific link is already on any stack
private File cacheStacksPath; private File cacheStacksPath;
private int bufferkb; private int bufferkb;
initStackIndex initThead;
public plasmaCrawlNURL(File cacheStacksPath, int bufferkb) { public plasmaCrawlNURL(File cacheStacksPath, int bufferkb) {
super(); super();
@ -145,7 +146,17 @@ public class plasmaCrawlNURL extends plasmaURL {
// init stack Index // init stack Index
stackIndex = new HashSet(); stackIndex = new HashSet();
new initStackIndex().start(); (initThead = new initStackIndex()).start();
}
public void waitOnInitThread() {
try {
if (this.initThead != null) {
this.initThead.join();
}
} catch (NullPointerException e) {
} catch (InterruptedException e) {}
} }
private void openHashCache() { private void openHashCache() {
@ -229,6 +240,7 @@ public class plasmaCrawlNURL extends plasmaURL {
} catch (Exception e) { } catch (Exception e) {
musicStack = kelondroStack.reset(musicStack); musicStack = kelondroStack.reset(musicStack);
} }
plasmaCrawlNURL.this.initThead = null;
} }
} }
@ -277,6 +289,22 @@ public class plasmaCrawlNURL extends plasmaURL {
push(stackMode, url.getHost(), e.hash); push(stackMode, url.getHost(), e.hash);
return e; return e;
} }
/**
 * Creates a new crawl queue entry from an existing one and pushes it onto the
 * stack selected by {@code stackMode}.
 * <p>
 * Delegates to the full {@code newEntry(...)} overload (defined elsewhere in
 * this class), copying all fields of the old entry except the stack mode.
 *
 * @param oldEntry  the entry to copy; may be null
 * @param stackMode the stack the copied entry should be pushed onto
 * @return the newly created entry, or null if {@code oldEntry} was null
 */
public synchronized Entry newEntry(Entry oldEntry, int stackMode) {
    if (oldEntry == null) return null;
    return newEntry(
            oldEntry.initiator(),
            oldEntry.url(),
            oldEntry.loaddate(),
            oldEntry.referrerHash(),
            oldEntry.name(),
            oldEntry.profileHandle(),
            oldEntry.depth(),
            oldEntry.anchors,
            oldEntry.forkfactor,
            stackMode
    );
}
private void push(int stackType, String domain, String hash) { private void push(int stackType, String domain, String hash) {
try { try {

@ -150,6 +150,30 @@ public class plasmaCrawlProfile {
} catch (IOException e) {} } catch (IOException e) {}
} }
/**
 * Creates a new profile entry from the given property map and stores it in
 * the profile table.
 * <p>
 * If storing fails, the profile database is reset once and the store is
 * retried; if the retry also fails the process is terminated.
 * NOTE(review): {@code System.exit(0)} reports a *success* exit code on a
 * fatal DB error and kills the whole peer — consider a nonzero code or
 * rethrowing instead; confirm against the other error handlers in this class.
 *
 * @param mem the property map describing the profile
 *        (presumably the same key set produced by {@code entry.map()} — verify against callers)
 * @return the newly created entry
 */
public entry newEntry(Map mem) {
    entry ne = new entry(mem);
    try {
        profileTable.set(ne.handle(), ne.map());
    } catch (kelondroException e) {
        // DB corruption: reset and retry the store once
        resetDatabase();
        try {
            profileTable.set(ne.handle(), ne.map());
        } catch (IOException ee) {
            e.printStackTrace();
            System.exit(0);
        }
    } catch (IOException e) {
        // I/O failure: reset and retry the store once
        resetDatabase();
        try {
            profileTable.set(ne.handle(), ne.map());
        } catch (IOException ee) {
            e.printStackTrace();
            System.exit(0);
        }
    }
    return ne;
}
public entry newEntry(String name, String startURL, String generalFilter, String specificFilter, public entry newEntry(String name, String startURL, String generalFilter, String specificFilter,
int generalDepth, int specificDepth, int generalDepth, int specificDepth,
boolean crawlingQ, boolean crawlingQ,

@ -853,6 +853,10 @@ public final class serverCore extends serverAbstractThread implements serverThre
} }
} }
/**
 * @return the time this request was started, as stored in {@code this.start}
 *         (a {@code System.currentTimeMillis()} timestamp, judging by its use
 *         in {@code getTime()})
 */
public long getRequestStartTime() {
    return this.start;
}
public long getTime() { public long getTime() {
return System.currentTimeMillis() - this.start; return System.currentTimeMillis() - this.start;
} }

Loading…
Cancel
Save