- preparation of parsing/indexing queue for concurrent execution

- remote crawl receipts are now transmitted concurrently in separate threads (makes remove crawls much faster!)

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@4605 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 17 years ago
parent 9b0e20fb06
commit 968c775025

@ -3,7 +3,7 @@ javacSource=1.5
javacTarget=1.5 javacTarget=1.5
# Release Configuration # Release Configuration
releaseVersion=0.575 releaseVersion=0.576
stdReleaseFile=yacy_v${releaseVersion}_${DSTAMP}_${releaseNr}.tar.gz stdReleaseFile=yacy_v${releaseVersion}_${DSTAMP}_${releaseNr}.tar.gz
embReleaseFile=yacy_emb_v${releaseVersion}_${DSTAMP}_${releaseNr}.tar.gz embReleaseFile=yacy_emb_v${releaseVersion}_${DSTAMP}_${releaseNr}.tar.gz
proReleaseFile=yacy_pro_v${releaseVersion}_${DSTAMP}_${releaseNr}.tar.gz proReleaseFile=yacy_pro_v${releaseVersion}_${DSTAMP}_${releaseNr}.tar.gz

@ -84,7 +84,7 @@ public class IndexCreateIndexingQueue_p {
if (post.containsKey("clearIndexingQueue")) { if (post.containsKey("clearIndexingQueue")) {
try { try {
synchronized (switchboard.sbQueue) { synchronized (switchboard.sbQueue) {
plasmaSwitchboardQueue.Entry entry = null; plasmaSwitchboardQueue.QueueEntry entry = null;
while ((entry = switchboard.sbQueue.pop()) != null) { while ((entry = switchboard.sbQueue.pop()) != null) {
if ((entry != null) && (entry.profile() != null) && (!(entry.profile().storeHTCache()))) { if ((entry != null) && (entry.profile() != null) && (!(entry.profile().storeHTCache()))) {
plasmaHTCache.deleteURLfromCache(entry.url()); plasmaHTCache.deleteURLfromCache(entry.url());
@ -106,26 +106,24 @@ public class IndexCreateIndexingQueue_p {
yacySeed initiator; yacySeed initiator;
boolean dark; boolean dark;
if ((switchboard.sbQueue.size() == 0) && (switchboard.indexingTasksInProcess.size() == 0)) { if ((switchboard.sbQueue.size() == 0) && (switchboard.sbQueue.getActiveQueueSize() == 0)) {
prop.put("indexing-queue", "0"); //is empty prop.put("indexing-queue", "0"); //is empty
} else { } else {
prop.put("indexing-queue", "1"); // there are entries in the queue or in process prop.put("indexing-queue", "1"); // there are entries in the queue or in process
dark = true; dark = true;
plasmaSwitchboardQueue.Entry pcentry; plasmaSwitchboardQueue.QueueEntry pcentry;
int inProcessCount = 0, entryCount = 0, totalCount = 0; int entryCount = 0, totalCount = 0;
long totalSize = 0; long totalSize = 0;
ArrayList<plasmaSwitchboardQueue.Entry> entryList = new ArrayList<plasmaSwitchboardQueue.Entry>();
// getting all entries that are currently in process // getting all entries that are currently in process
synchronized (switchboard.indexingTasksInProcess) { ArrayList<plasmaSwitchboardQueue.QueueEntry> entryList = new ArrayList<plasmaSwitchboardQueue.QueueEntry>();
inProcessCount = switchboard.indexingTasksInProcess.size(); entryList.addAll(switchboard.sbQueue.getActiveQueueEntries());
entryList.addAll(switchboard.indexingTasksInProcess.values()); int inProcessCount = entryList.size();
}
// getting all enqueued entries // getting all enqueued entries
if ((switchboard.sbQueue.size() > 0)) { if ((switchboard.sbQueue.size() > 0)) {
Iterator<plasmaSwitchboardQueue.Entry> i = switchboard.sbQueue.entryIterator(false); Iterator<plasmaSwitchboardQueue.QueueEntry> i = switchboard.sbQueue.entryIterator(false);
while (i.hasNext()) entryList.add(i.next()); while (i.hasNext()) entryList.add(i.next());
} }
@ -134,7 +132,7 @@ public class IndexCreateIndexingQueue_p {
for (int i = 0; (i < count) && (entryCount < showLimit); i++) { for (int i = 0; (i < count) && (entryCount < showLimit); i++) {
boolean inProcess = i < inProcessCount; boolean inProcess = i < inProcessCount;
pcentry = (plasmaSwitchboardQueue.Entry) entryList.get(i); pcentry = (plasmaSwitchboardQueue.QueueEntry) entryList.get(i);
if ((pcentry != null)&&(pcentry.url() != null)) { if ((pcentry != null)&&(pcentry.url() != null)) {
long entrySize = pcentry.size(); long entrySize = pcentry.size();
totalSize += entrySize; totalSize += entrySize;

@ -305,7 +305,7 @@ public class Status {
prop.putNum("connectionsMax", httpd.getMaxSessionCount()); prop.putNum("connectionsMax", httpd.getMaxSessionCount());
// Queue information // Queue information
int indexingJobCount = sb.getThread("80_indexing").getJobCount()+sb.indexingTasksInProcess.size(); int indexingJobCount = sb.getThread("80_indexing").getJobCount() + sb.sbQueue.getActiveQueueSize();
int indexingMaxCount = (int) sb.getConfigLong(plasmaSwitchboard.INDEXER_SLOTS, 30); int indexingMaxCount = (int) sb.getConfigLong(plasmaSwitchboard.INDEXER_SLOTS, 30);
int indexingPercent = (indexingMaxCount==0)?0:indexingJobCount*100/indexingMaxCount; int indexingPercent = (indexingMaxCount==0)?0:indexingJobCount*100/indexingMaxCount;
prop.putNum("indexingQueueSize", indexingJobCount); prop.putNum("indexingQueueSize", indexingJobCount);

@ -87,28 +87,25 @@ public class queues_p {
yacySeed initiator; yacySeed initiator;
//indexing queue //indexing queue
prop.putNum("indexingSize", sb.getThread(plasmaSwitchboard.INDEXER).getJobCount()+sb.indexingTasksInProcess.size()); prop.putNum("indexingSize", sb.getThread(plasmaSwitchboard.INDEXER).getJobCount() + sb.sbQueue.getActiveQueueSize());
prop.putNum("indexingMax", (int) sb.getConfigLong(plasmaSwitchboard.INDEXER_SLOTS, 30)); prop.putNum("indexingMax", (int) sb.getConfigLong(plasmaSwitchboard.INDEXER_SLOTS, 30));
prop.putNum("urlpublictextSize", sb.wordIndex.countURL()); prop.putNum("urlpublictextSize", sb.wordIndex.countURL());
prop.putNum("rwipublictextSize", sb.wordIndex.size()); prop.putNum("rwipublictextSize", sb.wordIndex.size());
if ((sb.sbQueue.size() == 0) && (sb.indexingTasksInProcess.size() == 0)) { if ((sb.sbQueue.size() == 0) && (sb.sbQueue.getActiveQueueSize() == 0)) {
prop.put("list", "0"); //is empty prop.put("list", "0"); //is empty
} else { } else {
plasmaSwitchboardQueue.Entry pcentry; plasmaSwitchboardQueue.QueueEntry pcentry;
int inProcessCount = 0;
long totalSize = 0; long totalSize = 0;
int i=0; //counter int i=0; //counter
ArrayList<plasmaSwitchboardQueue.Entry> entryList = new ArrayList<plasmaSwitchboardQueue.Entry>();
// getting all entries that are currently in process // getting all entries that are currently in process
synchronized (sb.indexingTasksInProcess) { ArrayList<plasmaSwitchboardQueue.QueueEntry> entryList = new ArrayList<plasmaSwitchboardQueue.QueueEntry>();
inProcessCount = sb.indexingTasksInProcess.size(); entryList.addAll(sb.sbQueue.getActiveQueueEntries());
entryList.addAll(sb.indexingTasksInProcess.values()); int inProcessCount = entryList.size();
}
// getting all enqueued entries // getting all enqueued entries
if ((sb.sbQueue.size() > 0)) { if ((sb.sbQueue.size() > 0)) {
Iterator<plasmaSwitchboardQueue.Entry> i1 = sb.sbQueue.entryIterator(false); Iterator<plasmaSwitchboardQueue.QueueEntry> i1 = sb.sbQueue.entryIterator(false);
while (i1.hasNext()) entryList.add(i1.next()); while (i1.hasNext()) entryList.add(i1.next());
} }
@ -118,8 +115,8 @@ public class queues_p {
int ok = 0; int ok = 0;
for (i = 0; i < size; i++) { for (i = 0; i < size; i++) {
boolean inProcess = i < inProcessCount; boolean inProcess = i < inProcessCount;
pcentry = (plasmaSwitchboardQueue.Entry) entryList.get(i); pcentry = entryList.get(i);
if ((pcentry != null)&&(pcentry.url() != null)) { if ((pcentry != null) && (pcentry.url() != null)) {
long entrySize = pcentry.size(); long entrySize = pcentry.size();
totalSize += entrySize; totalSize += entrySize;
initiator = yacyCore.seedDB.getConnected(pcentry.initiator()); initiator = yacyCore.seedDB.getConnected(pcentry.initiator());

@ -48,6 +48,7 @@ public class kelondroBufferedEcoFS {
} }
private void flushBuffer() throws IOException { private void flushBuffer() throws IOException {
if (efs == null) return;
Iterator<Map.Entry<Long, byte[]>> i = buffer.entrySet().iterator(); Iterator<Map.Entry<Long, byte[]>> i = buffer.entrySet().iterator();
Map.Entry<Long, byte[]> entry; Map.Entry<Long, byte[]> entry;
while (i.hasNext()) { while (i.hasNext()) {
@ -71,7 +72,7 @@ public class kelondroBufferedEcoFS {
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();
} }
efs.close(); if (efs != null) efs.close();
efs = null; efs = null;
} }

@ -41,7 +41,7 @@ public class plasmaProfiling {
public static long lastPPMUpdate = System.currentTimeMillis()- 30000; public static long lastPPMUpdate = System.currentTimeMillis()- 30000;
public static void updateIndexedPage(plasmaSwitchboardQueue.Entry entry) { public static void updateIndexedPage(plasmaSwitchboardQueue.QueueEntry entry) {
if (System.currentTimeMillis() - lastPPMUpdate > 30000) { if (System.currentTimeMillis() - lastPPMUpdate > 30000) {
// we don't want to do this too often // we don't want to do this too often
yacyCore.peerActions.updateMySeed(); yacyCore.peerActions.updateMySeed();

@ -156,7 +156,7 @@ import de.anomic.yacy.yacySeed;
import de.anomic.yacy.yacyURL; import de.anomic.yacy.yacyURL;
import de.anomic.yacy.yacyVersion; import de.anomic.yacy.yacyVersion;
public final class plasmaSwitchboard extends serverAbstractSwitch<plasmaSwitchboardQueue.Entry> implements serverSwitch<plasmaSwitchboardQueue.Entry> { public final class plasmaSwitchboard extends serverAbstractSwitch<plasmaSwitchboardQueue.QueueEntry> implements serverSwitch<plasmaSwitchboardQueue.QueueEntry> {
// load slots // load slots
public static int xstackCrawlSlots = 2000; public static int xstackCrawlSlots = 2000;
@ -216,7 +216,6 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<plasmaSwitchbo
public plasmaParser parser; public plasmaParser parser;
public long proxyLastAccess, localSearchLastAccess, remoteSearchLastAccess; public long proxyLastAccess, localSearchLastAccess, remoteSearchLastAccess;
public yacyCore yc; public yacyCore yc;
public HashMap<String, plasmaSwitchboardQueue.Entry> indexingTasksInProcess;
public userDB userDB; public userDB userDB;
public bookmarksDB bookmarksDB; public bookmarksDB bookmarksDB;
public plasmaWebStructure webStructure; public plasmaWebStructure webStructure;
@ -365,6 +364,21 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<plasmaSwitchbo
public static final String PROXY_CACHE_ENQUEUE_IDLESLEEP = "70_cachemanager_idlesleep"; public static final String PROXY_CACHE_ENQUEUE_IDLESLEEP = "70_cachemanager_idlesleep";
public static final String PROXY_CACHE_ENQUEUE_BUSYSLEEP = "70_cachemanager_busysleep"; public static final String PROXY_CACHE_ENQUEUE_BUSYSLEEP = "70_cachemanager_busysleep";
// 74_parsing
/**
* <p><code>public static final String <strong>INDEXER</strong> = "80_indexing"</code></p>
* <p>Name of the indexer thread, performing the actual indexing of a website</p>
*/
public static final String PARSER = "74_indexing";
public static final String PARSER_MEMPREREQ = "74_indexing_memprereq";
public static final String PARSER_IDLESLEEP = "74_indexing_idlesleep";
public static final String PARSER_BUSYSLEEP = "74_indexing_busysleep";
public static final String PARSER_METHOD_START = "deQueueProcess";
public static final String PARSER_METHOD_JOBCOUNT = "queueSize";
public static final String PARSER_METHOD_FREEMEM = "deQueueFreeMem";
// 80_indexing // 80_indexing
/** /**
* <p><code>public static final String <strong>INDEXER</strong> = "80_indexing"</code></p> * <p><code>public static final String <strong>INDEXER</strong> = "80_indexing"</code></p>
@ -1122,13 +1136,10 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<plasmaSwitchbo
// create queue // create queue
this.sbQueue = new plasmaSwitchboardQueue(wordIndex, new File(this.plasmaPath, "switchboardQueue2.stack"), this.profilesActiveCrawls); this.sbQueue = new plasmaSwitchboardQueue(wordIndex, new File(this.plasmaPath, "switchboardQueue2.stack"), this.profilesActiveCrawls);
// create in process list
this.indexingTasksInProcess = new HashMap<String, plasmaSwitchboardQueue.Entry>();
// going through the sbQueue Entries and registering all content files as in use // going through the sbQueue Entries and registering all content files as in use
int count = 0; int count = 0;
plasmaSwitchboardQueue.Entry queueEntry; plasmaSwitchboardQueue.QueueEntry queueEntry;
Iterator<plasmaSwitchboardQueue.Entry> i1 = sbQueue.entryIterator(true); Iterator<plasmaSwitchboardQueue.QueueEntry> i1 = sbQueue.entryIterator(true);
while (i1.hasNext()) { while (i1.hasNext()) {
queueEntry = i1.next(); queueEntry = i1.next();
if ((queueEntry != null) && (queueEntry.url() != null) && (queueEntry.cacheFile().exists())) { if ((queueEntry != null) && (queueEntry.url() != null) && (queueEntry.cacheFile().exists())) {
@ -1284,8 +1295,12 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<plasmaSwitchbo
deployThread(CRAWLSTACK, "Crawl URL Stacker", "process that checks url for double-occurrences and for allowance/disallowance by robots.txt", null, deployThread(CRAWLSTACK, "Crawl URL Stacker", "process that checks url for double-occurrences and for allowance/disallowance by robots.txt", null,
new serverInstantThread(crawlStacker, CRAWLSTACK_METHOD_START, CRAWLSTACK_METHOD_JOBCOUNT, CRAWLSTACK_METHOD_FREEMEM), 8000); new serverInstantThread(crawlStacker, CRAWLSTACK_METHOD_START, CRAWLSTACK_METHOD_JOBCOUNT, CRAWLSTACK_METHOD_FREEMEM), 8000);
deployThread(INDEXER, "Parsing/Indexing", "thread that performes document parsing and indexing", "/IndexCreateIndexingQueue_p.html", //deployThread(PARSER, "Parsing", "thread that feeds a concurrent document parsing queue", "/IndexCreateIndexingQueue_p.html",
//new serverInstantThread(this, PARSER_METHOD_START, PARSER_METHOD_JOBCOUNT, PARSER_METHOD_FREEMEM), 10000);
deployThread(INDEXER, "Indexing", "thread that either distributes the index into the DHT, stores parsed documents or flushes the index cache", "/IndexCreateIndexingQueue_p.html",
new serverInstantThread(this, INDEXER_METHOD_START, INDEXER_METHOD_JOBCOUNT, INDEXER_METHOD_FREEMEM), 10000); new serverInstantThread(this, INDEXER_METHOD_START, INDEXER_METHOD_JOBCOUNT, INDEXER_METHOD_FREEMEM), 10000);
for (i = 1; i < indexing_cluster; i++) { for (i = 1; i < indexing_cluster; i++) {
setConfig((i + 80) + "_indexing_idlesleep", getConfig(INDEXER_IDLESLEEP, "")); setConfig((i + 80) + "_indexing_idlesleep", getConfig(INDEXER_IDLESLEEP, ""));
setConfig((i + 80) + "_indexing_busysleep", getConfig(INDEXER_BUSYSLEEP, "")); setConfig((i + 80) + "_indexing_busysleep", getConfig(INDEXER_BUSYSLEEP, ""));
@ -1475,8 +1490,6 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<plasmaSwitchbo
new plasmaSearchRankingProfile("", crypt.simpleDecode(sb.getConfig("rankingProfile", ""), null)); new plasmaSearchRankingProfile("", crypt.simpleDecode(sb.getConfig("rankingProfile", ""), null));
} }
/** /**
* This method changes the HTCache size.<br> * This method changes the HTCache size.<br>
* @param newCacheSize in MB * @param newCacheSize in MB
@ -1743,14 +1756,14 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<plasmaSwitchbo
return sbQueue.size(); return sbQueue.size();
} }
public void enQueue(plasmaSwitchboardQueue.Entry job) { public void enQueue(plasmaSwitchboardQueue.QueueEntry job) {
assert job != null; assert job != null;
if (!(job instanceof plasmaSwitchboardQueue.Entry)) { if (!(job instanceof plasmaSwitchboardQueue.QueueEntry)) {
System.out.println("Internal error at plasmaSwitchboard.enQueue: wrong job type"); System.out.println("Internal error at plasmaSwitchboard.enQueue: wrong job type");
System.exit(0); System.exit(0);
} }
try { try {
sbQueue.push((plasmaSwitchboardQueue.Entry) job); sbQueue.push((plasmaSwitchboardQueue.QueueEntry) job);
} catch (IOException e) { } catch (IOException e) {
log.logSevere("IOError in plasmaSwitchboard.enQueue: " + e.getMessage(), e); log.logSevere("IOError in plasmaSwitchboard.enQueue: " + e.getMessage(), e);
} }
@ -1765,9 +1778,9 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<plasmaSwitchbo
wordIndex.setMaxWordCount(newMaxCount); */ wordIndex.setMaxWordCount(newMaxCount); */
} }
public plasmaSwitchboardQueue.Entry deQueue() { public plasmaSwitchboardQueue.QueueEntry deQueue() {
// getting the next entry from the indexing queue // getting the next entry from the indexing queue
plasmaSwitchboardQueue.Entry nextentry = null; plasmaSwitchboardQueue.QueueEntry nextentry = null;
synchronized (sbQueue) { synchronized (sbQueue) {
// do one processing step // do one processing step
log.logFine("DEQUEUE: sbQueueSize=" + sbQueue.size() + log.logFine("DEQUEUE: sbQueueSize=" + sbQueue.size() +
@ -1851,21 +1864,67 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<plasmaSwitchbo
return false; return false;
} }
plasmaSwitchboardQueue.Entry nextentry = deQueue(); // get next queue entry and start a queue processing
plasmaSwitchboardQueue.QueueEntry queueEntry = deQueue();
synchronized (this.indexingTasksInProcess) { assert queueEntry != null;
this.indexingTasksInProcess.put(nextentry.urlHash(), nextentry); if (queueEntry == null) return true;
if (queueEntry.profile() == null) {
queueEntry.close();
return true;
} }
sbQueue.enQueueToActive(queueEntry);
// THE FOLLOWING CAN BE CONCURRENT ->
// parse and index the resource // parse and index the resource
plasmaParserDocument document = parseDocument(nextentry); queueEntry.updateStatus(plasmaSwitchboardQueue.QUEUE_STATE_PARSING_WAITING);
if (document != null) { queueEntry.updateStatus(plasmaSwitchboardQueue.QUEUE_STATE_PARSING_RUNNING);
plasmaCondenser condensement = condenseDocument(nextentry, document); plasmaParserDocument document = parseDocument(queueEntry);
if (condensement != null) { if (document == null) {
document.notifyWebStructure(webStructure, condensement, nextentry.getModificationDate()); if (!queueEntry.profile().storeHTCache()) {
storeDocumentIndex(nextentry, document, condensement); plasmaHTCache.filesInUse.remove(queueEntry.cacheFile());
//plasmaHTCache.deleteURLfromCache(entry.url());
}
queueEntry.close();
return true;
}
queueEntry.updateStatus(plasmaSwitchboardQueue.QUEUE_STATE_PARSING_COMPLETE);
// do condensing
queueEntry.updateStatus(plasmaSwitchboardQueue.QUEUE_STATE_CONDENSING_WAITING);
queueEntry.updateStatus(plasmaSwitchboardQueue.QUEUE_STATE_CONDENSING_RUNNING);
plasmaCondenser condensement = condenseDocument(queueEntry, document);
if (condensement == null) {
if (!queueEntry.profile().storeHTCache()) {
plasmaHTCache.filesInUse.remove(queueEntry.cacheFile());
//plasmaHTCache.deleteURLfromCache(entry.url());
} }
queueEntry.close();
return true;
}
queueEntry.updateStatus(plasmaSwitchboardQueue.QUEUE_STATE_CONDENSING_COMPLETE);
// do a web structure analysis
queueEntry.updateStatus(plasmaSwitchboardQueue.QUEUE_STATE_STRUCTUREANALYSIS_WAITING);
queueEntry.updateStatus(plasmaSwitchboardQueue.QUEUE_STATE_STRUCTUREANALYSIS_RUNNING);
document.notifyWebStructure(webStructure, condensement, queueEntry.getModificationDate());
queueEntry.updateStatus(plasmaSwitchboardQueue.QUEUE_STATE_STRUCTUREANALYSIS_COMPLETE);
// <- CONCURRENT UNTIL HERE, THEN SERIALIZE AGAIN
// store the result
queueEntry.updateStatus(plasmaSwitchboardQueue.QUEUE_STATE_INDEXSTORAGE_WAITING);
queueEntry.updateStatus(plasmaSwitchboardQueue.QUEUE_STATE_INDEXSTORAGE_RUNNING);
storeDocumentIndex(queueEntry, document, condensement);
queueEntry.updateStatus(plasmaSwitchboardQueue.QUEUE_STATE_INDEXSTORAGE_COMPLETE);
// finally close the queue process
if (!queueEntry.profile().storeHTCache()) {
plasmaHTCache.filesInUse.remove(queueEntry.cacheFile());
//plasmaHTCache.deleteURLfromCache(entry.url());
} }
queueEntry.close();
return true; return true;
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.logInfo("DEQUEUE: Shutdown detected."); log.logInfo("DEQUEUE: Shutdown detected.");
@ -2076,116 +2135,71 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<plasmaSwitchbo
} }
} }
private plasmaParserDocument parseResource(plasmaSwitchboardQueue.Entry entry) throws InterruptedException, ParserException { private plasmaParserDocument parseDocument(plasmaSwitchboardQueue.QueueEntry entry) throws InterruptedException {
plasmaParserDocument document = null;
int processCase = entry.processCase();
// the mimetype of this entry log.logFine("processResourceStack processCase=" + processCase +
String mimeType = entry.getMimeType(); ", depth=" + entry.depth() +
String charset = entry.getCharacterEncoding(); ", maxDepth=" + ((entry.profile() == null) ? "null" : Integer.toString(entry.profile().generalDepth())) +
", filter=" + ((entry.profile() == null) ? "null" : entry.profile().generalFilter()) +
// the parser logger ", initiatorHash=" + entry.initiator() +
//serverLog parserLogger = parser.getLogger(); //", responseHeader=" + ((entry.responseHeader() == null) ? "null" : entry.responseHeader().toString()) +
", url=" + entry.url()); // DEBUG
// PARSE CONTENT
long parsingStartTime = System.currentTimeMillis();
// parse the document
return parseResource(entry.url(), mimeType, charset, entry.cacheFile());
}
public plasmaParserDocument parseResource(yacyURL location, String mimeType, String documentCharset, File sourceFile) throws InterruptedException, ParserException {
plasmaParserDocument doc = parser.parseSource(location, mimeType, documentCharset, sourceFile);
assert(doc != null) : "Unexpected error. Parser returned null.";
return doc;
}
private plasmaParserDocument parseDocument(plasmaSwitchboardQueue.Entry entry) throws InterruptedException {
plasmaParserDocument document = null;
try { try {
long stackStartTime = 0, stackEndTime = 0, // parse the document
parsingStartTime = 0, parsingEndTime = 0; document = parser.parseSource(entry.url(), entry.getMimeType(), entry.getCharacterEncoding(), entry.cacheFile());
int processCase = entry.processCase(); assert(document != null) : "Unexpected error. Parser returned null.";
if (document == null) return null;
log.logFine("processResourceStack processCase=" + processCase + } catch (ParserException e) {
", depth=" + entry.depth() + this.log.logInfo("Unable to parse the resource '" + entry.url() + "'. " + e.getMessage());
", maxDepth=" + ((entry.profile() == null) ? "null" : Integer.toString(entry.profile().generalDepth())) + addURLtoErrorDB(entry.url(), entry.referrerHash(), entry.initiator(), entry.anchorName(), e.getErrorCode(), new kelondroBitfield());
", filter=" + ((entry.profile() == null) ? "null" : entry.profile().generalFilter()) + if (document != null) {
", initiatorHash=" + entry.initiator() + document.close();
//", responseHeader=" + ((entry.responseHeader() == null) ? "null" : entry.responseHeader().toString()) + document = null;
", url=" + entry.url()); // DEBUG
// PARSE CONTENT
parsingStartTime = System.currentTimeMillis();
try {
document = this.parseResource(entry);
if (document == null) return null;
} catch (ParserException e) {
this.log.logInfo("Unable to parse the resource '" + entry.url() + "'. " + e.getMessage());
addURLtoErrorDB(entry.url(), entry.referrerHash(), entry.initiator(), entry.anchorName(), e.getErrorCode(), new kelondroBitfield());
if (document != null) {
document.close();
document = null;
}
return null;
}
parsingEndTime = System.currentTimeMillis();
// get the document date
Date docDate = entry.getModificationDate();
// put anchors on crawl stack
stackStartTime = System.currentTimeMillis();
if (
((processCase == PROCESSCASE_4_PROXY_LOAD) || (processCase == PROCESSCASE_5_LOCAL_CRAWLING)) &&
((entry.profile() == null) || (entry.depth() < entry.profile().generalDepth()))
) {
Map<yacyURL, String> hl = document.getHyperlinks();
Iterator<Map.Entry<yacyURL, String>> i = hl.entrySet().iterator();
yacyURL nextUrl;
Map.Entry<yacyURL, String> nextEntry;
while (i.hasNext()) {
// check for interruption
checkInterruption();
// fetching the next hyperlink
nextEntry = i.next();
nextUrl = nextEntry.getKey();
// enqueue the hyperlink into the pre-notice-url db
crawlStacker.enqueueEntry(nextUrl, entry.urlHash(), entry.initiator(), nextEntry.getValue(), docDate, entry.depth() + 1, entry.profile());
}
if (log.isInfo()) log.logInfo("CRAWL: ADDED " + hl.size() + " LINKS FROM " + entry.url().toNormalform(false, true) +
", NEW CRAWL STACK SIZE IS " + crawlQueues.noticeURL.stackSize(plasmaCrawlNURL.STACK_TYPE_CORE) +
", STACKING TIME = " + (stackEndTime-stackStartTime) +
", PARSING TIME = " + (parsingEndTime-parsingStartTime));
}
stackEndTime = System.currentTimeMillis();
} catch (Exception e) {
if (e instanceof InterruptedException) throw (InterruptedException)e;
this.log.logSevere("Unexpected exception while parsing/indexing URL ",e);
} catch (Error e) {
this.log.logSevere("Unexpected exception while parsing/indexing URL ",e);
} finally {
checkInterruption();
// The following code must be into the finally block, otherwise it will not be executed
// on errors!
// removing current entry from in process list
synchronized (this.indexingTasksInProcess) {
this.indexingTasksInProcess.remove(entry.urlHash());
} }
return null;
// explicit delete/free resources }
if ((entry != null) && (entry.profile() != null) && (!(entry.profile().storeHTCache()))) {
plasmaHTCache.filesInUse.remove(entry.cacheFile()); long parsingEndTime = System.currentTimeMillis();
//plasmaHTCache.deleteURLfromCache(entry.url());
// get the document date
Date docDate = entry.getModificationDate();
// put anchors on crawl stack
long stackStartTime = System.currentTimeMillis();
if (
((processCase == PROCESSCASE_4_PROXY_LOAD) || (processCase == PROCESSCASE_5_LOCAL_CRAWLING)) &&
((entry.profile() == null) || (entry.depth() < entry.profile().generalDepth()))
) {
Map<yacyURL, String> hl = document.getHyperlinks();
Iterator<Map.Entry<yacyURL, String>> i = hl.entrySet().iterator();
yacyURL nextUrl;
Map.Entry<yacyURL, String> nextEntry;
while (i.hasNext()) {
// check for interruption
checkInterruption();
// fetching the next hyperlink
nextEntry = i.next();
nextUrl = nextEntry.getKey();
// enqueue the hyperlink into the pre-notice-url db
crawlStacker.enqueueEntry(nextUrl, entry.urlHash(), entry.initiator(), nextEntry.getValue(), docDate, entry.depth() + 1, entry.profile());
} }
entry = null; long stackEndTime = System.currentTimeMillis();
if (log.isInfo()) log.logInfo("CRAWL: ADDED " + hl.size() + " LINKS FROM " + entry.url().toNormalform(false, true) +
if (document != null) try { document.close(); } catch (Exception e) {} ", NEW CRAWL STACK SIZE IS " + crawlQueues.noticeURL.stackSize(plasmaCrawlNURL.STACK_TYPE_CORE) +
", STACKING TIME = " + (stackEndTime-stackStartTime) +
", PARSING TIME = " + (parsingEndTime-parsingStartTime));
} }
return document; return document;
} }
private plasmaCondenser condenseDocument(plasmaSwitchboardQueue.Entry entry, plasmaParserDocument document) throws InterruptedException { private plasmaCondenser condenseDocument(plasmaSwitchboardQueue.QueueEntry entry, plasmaParserDocument document) throws InterruptedException {
// CREATE INDEX // CREATE INDEX
String dc_title = document.dc_title(); String dc_title = document.dc_title();
yacyURL referrerURL = entry.referrerURL(); yacyURL referrerURL = entry.referrerURL();
@ -2229,7 +2243,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<plasmaSwitchbo
return condenser; return condenser;
} }
private void storeDocumentIndex(plasmaSwitchboardQueue.Entry entry, plasmaParserDocument document, plasmaCondenser condenser) { private void storeDocumentIndex(plasmaSwitchboardQueue.QueueEntry entry, plasmaParserDocument document, plasmaCondenser condenser) {
// CREATE INDEX // CREATE INDEX
String dc_title = document.dc_title(); String dc_title = document.dc_title();
@ -2275,131 +2289,23 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<plasmaSwitchbo
if ((processCase == PROCESSCASE_6_GLOBAL_CRAWLING) && (initiatorPeer != null)) { if ((processCase == PROCESSCASE_6_GLOBAL_CRAWLING) && (initiatorPeer != null)) {
log.logInfo("Sending crawl receipt for '" + entry.url().toNormalform(false, true) + "' to " + initiatorPeer.getName()); log.logInfo("Sending crawl receipt for '" + entry.url().toNormalform(false, true) + "' to " + initiatorPeer.getName());
if (clusterhashes != null) initiatorPeer.setAlternativeAddress((String) clusterhashes.get(initiatorPeer.hash)); if (clusterhashes != null) initiatorPeer.setAlternativeAddress((String) clusterhashes.get(initiatorPeer.hash));
yacyClient.crawlReceipt(initiatorPeer, "crawl", "fill", "indexed", newEntry, ""); // start a thread for receipt sending to avoid a blocking here
new Thread(new receiptSending(initiatorPeer, newEntry)).start();
} }
} }
/*
private void indexDocument(plasmaSwitchboardQueue.Entry entry, plasmaParserDocument document, plasmaCondenser condenser) throws InterruptedException { public class receiptSending implements Runnable {
long indexingStartTime = System.currentTimeMillis(), indexingEndTime = 0, yacySeed initiatorPeer;
storageStartTime = 0, storageEndTime = 0; indexURLReference reference;
// CREATE INDEX
String dc_title = document.dc_title();
yacyURL referrerURL = entry.referrerURL();
Date docDate = entry.getModificationDate();
int processCase = entry.processCase();
// generate citation reference
Integer[] ioLinks = webStructure.generateCitationReference(entry.url(), entry.urlHash(), docDate, document, condenser); // [outlinksSame, outlinksOther]
// check for interruption
checkInterruption();
// create a new loaded URL db entry
long ldate = System.currentTimeMillis();
indexURLReference newEntry = new indexURLReference(
entry.url(), // URL
dc_title, // document description
document.dc_creator(), // author
document.dc_subject(' '), // tags
"", // ETag
docDate, // modification date
new Date(), // loaded date
new Date(ldate + Math.max(0, ldate - docDate.getTime()) / 2), // freshdate, computed with Proxy-TTL formula
(referrerURL == null) ? null : referrerURL.hash(), // referer hash
new byte[0], // md5
(int) entry.size(), // size
condenser.RESULT_NUMB_WORDS, // word count
plasmaHTCache.docType(document.dc_format()), // doctype
condenser.RESULT_FLAGS, // flags
yacyURL.language(entry.url()), // language
ioLinks[0].intValue(), // llocal
ioLinks[1].intValue(), // lother
document.getAudiolinks().size(), // laudio
document.getImages().size(), // limage
document.getVideolinks().size(), // lvideo
document.getApplinks().size() // lapp
);
// STORE URL TO LOADED-URL-DB
try {
wordIndex.putURL(newEntry);
} catch (IOException e) {
log.logFine("Not Indexed Resource '" + entry.url().toNormalform(false, true) + "': process case=" + processCase);
addURLtoErrorDB(entry.url(), referrerURL.hash(), entry.initiator(), dc_title, "error storing url: " + e.getMessage(), new kelondroBitfield());
return;
}
crawlResults.stack(
newEntry, // loaded url db entry
entry.initiator(), // initiator peer hash
yacyCore.seedDB.mySeed().hash, // executor peer hash
processCase // process case
);
// check for interruption
checkInterruption();
// STORE WORD INDEX
if ((!entry.profile().indexText()) && (!entry.profile().indexMedia())) {
log.logFine("Not Indexed Resource '" + entry.url().toNormalform(false, true) + "': process case=" + processCase);
addURLtoErrorDB(entry.url(), referrerURL.hash(), entry.initiator(), dc_title, plasmaCrawlEURL.DENIED_UNKNOWN_INDEXING_PROCESS_CASE, new kelondroBitfield());
return;
}
// remove stopwords
log.logInfo("Excluded " + condenser.excludeWords(stopwords) + " words in URL " + entry.url());
indexingEndTime = System.currentTimeMillis();
storageStartTime = System.currentTimeMillis();
int words = 0;
// STORE PAGE INDEX INTO WORD INDEX DB
words = wordIndex.addPageIndex(
entry.url(), // document url
docDate, // document mod date
(int) entry.size(), // document size
document, // document content
condenser, // document condenser
yacyURL.language(entry.url()), // document language
plasmaHTCache.docType(document.dc_format()),// document type
ioLinks[0].intValue(), // outlinkSame
ioLinks[1].intValue() // outlinkOthers
);
storageEndTime = System.currentTimeMillis();
//increment number of indexed urls
indexedPages++;
if (log.isInfo()) { public receiptSending(yacySeed initiatorPeer, indexURLReference reference) {
// TODO: UTF-8 docDescription seems not to be displayed correctly because this.initiatorPeer = initiatorPeer;
// of string concatenation this.reference = reference;
log.logInfo("*Indexed " + words + " words in URL " + entry.url() +
" [" + entry.urlHash() + "]" +
"\n\tDescription: " + dc_title +
"\n\tMimeType: " + document.dc_format() + " | Charset: " + document.getCharset() + " | " +
"Size: " + document.getTextLength() + " bytes | " +
"Anchors: " + ((document.getAnchors() == null) ? 0 : document.getAnchors().size()) +
"\n\tIndexingTime: " + (indexingEndTime-indexingStartTime) + " ms | " +
"StorageTime: " + (storageEndTime-storageStartTime) + " ms");
} }
public void run() {
// update profiling info yacyClient.crawlReceipt(initiatorPeer, "crawl", "fill", "indexed", reference, "");
plasmaProfiling.updateIndexedPage(entry);
// check for interruption
checkInterruption();
yacySeed initiatorPeer = entry.initiatorPeer();
// if this was performed for a remote crawl request, notify requester
if ((processCase == PROCESSCASE_6_GLOBAL_CRAWLING) && (initiatorPeer != null)) {
log.logInfo("Sending crawl receipt for '" + entry.url().toNormalform(false, true) + "' to " + initiatorPeer.getName());
if (clusterhashes != null) initiatorPeer.setAlternativeAddress((String) clusterhashes.get(initiatorPeer.hash));
yacyClient.crawlReceipt(initiatorPeer, "crawl", "fill", "indexed", newEntry, "");
} }
} }
*/
private static SimpleDateFormat DateFormatter = new SimpleDateFormat("EEE, dd MMM yyyy"); private static SimpleDateFormat DateFormatter = new SimpleDateFormat("EEE, dd MMM yyyy");
public static String dateString(Date date) { public static String dateString(Date date) {

@ -47,8 +47,10 @@ package de.anomic.plasma;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.net.MalformedURLException; import java.net.MalformedURLException;
import java.util.Collection;
import java.util.Date; import java.util.Date;
import java.util.Iterator; import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import de.anomic.index.indexURLReference; import de.anomic.index.indexURLReference;
import de.anomic.kelondro.kelondroBase64Order; import de.anomic.kelondro.kelondroBase64Order;
@ -68,11 +70,13 @@ public class plasmaSwitchboardQueue {
plasmaCrawlProfile profiles; plasmaCrawlProfile profiles;
plasmaWordIndex index; plasmaWordIndex index;
private File sbQueueStackPath; private File sbQueueStackPath;
ConcurrentHashMap<String, QueueEntry> queueInProcess;
public plasmaSwitchboardQueue(plasmaWordIndex index, File sbQueueStackPath, plasmaCrawlProfile profiles) { public plasmaSwitchboardQueue(plasmaWordIndex index, File sbQueueStackPath, plasmaCrawlProfile profiles) {
this.sbQueueStackPath = sbQueueStackPath; this.sbQueueStackPath = sbQueueStackPath;
this.profiles = profiles; this.profiles = profiles;
this.index = index; this.index = index;
this.queueInProcess = new ConcurrentHashMap<String, QueueEntry>();
initQueueStack(); initQueueStack();
} }
@ -104,7 +108,7 @@ public class plasmaSwitchboardQueue {
return sbQueueStack.size(); return sbQueueStack.size();
} }
public synchronized void push(Entry entry) throws IOException { public synchronized void push(QueueEntry entry) throws IOException {
if (entry == null) return; if (entry == null) return;
sbQueueStack.push(sbQueueStack.row().newEntry(new byte[][]{ sbQueueStack.push(sbQueueStack.row().newEntry(new byte[][]{
entry.url.toString().getBytes(), entry.url.toString().getBytes(),
@ -118,20 +122,20 @@ public class plasmaSwitchboardQueue {
})); }));
} }
public synchronized Entry pop() throws IOException { public synchronized QueueEntry pop() throws IOException {
if (sbQueueStack.size() == 0) return null; if (sbQueueStack.size() == 0) return null;
kelondroRow.Entry b = sbQueueStack.pot(); kelondroRow.Entry b = sbQueueStack.pot();
if (b == null) return null; if (b == null) return null;
return new Entry(b); return new QueueEntry(b);
} }
public synchronized Entry remove(String urlHash) { public synchronized QueueEntry remove(String urlHash) {
Iterator<kelondroRow.Entry> i = sbQueueStack.stackIterator(true); Iterator<kelondroRow.Entry> i = sbQueueStack.stackIterator(true);
kelondroRow.Entry rowentry; kelondroRow.Entry rowentry;
Entry entry; QueueEntry entry;
while (i.hasNext()) { while (i.hasNext()) {
rowentry = (kelondroRow.Entry) i.next(); rowentry = (kelondroRow.Entry) i.next();
entry = new Entry(rowentry); entry = new QueueEntry(rowentry);
if (entry.urlHash().equals(urlHash)) { if (entry.urlHash().equals(urlHash)) {
i.remove(); i.remove();
return entry; return entry;
@ -160,13 +164,13 @@ public class plasmaSwitchboardQueue {
super.finalize(); super.finalize();
} }
public Iterator<Entry> entryIterator(boolean up) { public Iterator<QueueEntry> entryIterator(boolean up) {
// iterates the elements in an ordered way. // iterates the elements in an ordered way.
// returns plasmaSwitchboardQueue.Entry - type Objects // returns plasmaSwitchboardQueue.Entry - type Objects
return new entryIterator(up); return new entryIterator(up);
} }
public class entryIterator implements Iterator<Entry> { public class entryIterator implements Iterator<QueueEntry> {
Iterator<kelondroRow.Entry> rows; Iterator<kelondroRow.Entry> rows;
@ -178,8 +182,8 @@ public class plasmaSwitchboardQueue {
return rows.hasNext(); return rows.hasNext();
} }
public Entry next() { public QueueEntry next() {
return new Entry((kelondroRow.Entry) rows.next()); return new QueueEntry((kelondroRow.Entry) rows.next());
} }
public void remove() { public void remove() {
@ -187,12 +191,43 @@ public class plasmaSwitchboardQueue {
} }
} }
public Entry newEntry(yacyURL url, String referrer, Date ifModifiedSince, boolean requestWithCookie, public QueueEntry newEntry(yacyURL url, String referrer, Date ifModifiedSince, boolean requestWithCookie,
String initiator, int depth, String profilehandle, String anchorName) { String initiator, int depth, String profilehandle, String anchorName) {
return new Entry(url, referrer, ifModifiedSince, requestWithCookie, initiator, depth, profilehandle, anchorName); return new QueueEntry(url, referrer, ifModifiedSince, requestWithCookie, initiator, depth, profilehandle, anchorName);
} }
public class Entry { public void enQueueToActive(QueueEntry entry) {
queueInProcess.put(entry.urlHash(), entry);
}
public QueueEntry getActiveEntry(String urlhash) {
// show one entry from the queue
return this.queueInProcess.get(urlhash);
}
public int getActiveQueueSize() {
return this.queueInProcess.size();
}
public Collection<QueueEntry> getActiveQueueEntries() {
return this.queueInProcess.values();
}
public static final int QUEUE_STATE_FRESH = 0;
public static final int QUEUE_STATE_PARSING_WAITING = 1;
public static final int QUEUE_STATE_PARSING_RUNNING = 2;
public static final int QUEUE_STATE_PARSING_COMPLETE = 3;
public static final int QUEUE_STATE_CONDENSING_WAITING = 4;
public static final int QUEUE_STATE_CONDENSING_RUNNING = 5;
public static final int QUEUE_STATE_CONDENSING_COMPLETE = 6;
public static final int QUEUE_STATE_STRUCTUREANALYSIS_WAITING = 7;
public static final int QUEUE_STATE_STRUCTUREANALYSIS_RUNNING = 8;
public static final int QUEUE_STATE_STRUCTUREANALYSIS_COMPLETE = 9;
public static final int QUEUE_STATE_INDEXSTORAGE_WAITING = 10;
public static final int QUEUE_STATE_INDEXSTORAGE_RUNNING = 11;
public static final int QUEUE_STATE_INDEXSTORAGE_COMPLETE = 12;
public class QueueEntry {
yacyURL url; // plasmaURL.urlStringLength yacyURL url; // plasmaURL.urlStringLength
String referrerHash; // plasmaURL.urlHashLength String referrerHash; // plasmaURL.urlHashLength
Date ifModifiedSince; // 6 Date ifModifiedSince; // 6
@ -201,13 +236,14 @@ public class plasmaSwitchboardQueue {
int depth; // plasmaURL.urlCrawlDepthLength int depth; // plasmaURL.urlCrawlDepthLength
String profileHandle; // plasmaURL.urlCrawlProfileHandleLength String profileHandle; // plasmaURL.urlCrawlProfileHandleLength
String anchorName; // plasmaURL.urlDescrLength String anchorName; // plasmaURL.urlDescrLength
int status;
// computed values // computed values
private plasmaCrawlProfile.entry profileEntry; private plasmaCrawlProfile.entry profileEntry;
private IResourceInfo contentInfo; private IResourceInfo contentInfo;
private yacyURL referrerURL; private yacyURL referrerURL;
public Entry(yacyURL url, String referrer, Date ifModifiedSince, boolean requestWithCookie, public QueueEntry(yacyURL url, String referrer, Date ifModifiedSince, boolean requestWithCookie,
String initiator, int depth, String profileHandle, String anchorName) { String initiator, int depth, String profileHandle, String anchorName) {
this.url = url; this.url = url;
this.referrerHash = referrer; this.referrerHash = referrer;
@ -221,9 +257,10 @@ public class plasmaSwitchboardQueue {
this.profileEntry = null; this.profileEntry = null;
this.contentInfo = null; this.contentInfo = null;
this.referrerURL = null; this.referrerURL = null;
this.status = QUEUE_STATE_FRESH;
} }
public Entry(kelondroRow.Entry row) { public QueueEntry(kelondroRow.Entry row) {
long ims = row.getColLong(2); long ims = row.getColLong(2);
byte flags = row.getColByte(3); byte flags = row.getColByte(3);
try { try {
@ -242,9 +279,10 @@ public class plasmaSwitchboardQueue {
this.profileEntry = null; this.profileEntry = null;
this.contentInfo = null; this.contentInfo = null;
this.referrerURL = null; this.referrerURL = null;
this.status = QUEUE_STATE_FRESH;
} }
public Entry(byte[][] row) throws IOException { public QueueEntry(byte[][] row) throws IOException {
long ims = (row[2] == null) ? 0 : kelondroBase64Order.enhancedCoder.decodeLong(new String(row[2], "UTF-8")); long ims = (row[2] == null) ? 0 : kelondroBase64Order.enhancedCoder.decodeLong(new String(row[2], "UTF-8"));
byte flags = (row[3] == null) ? 0 : row[3][0]; byte flags = (row[3] == null) ? 0 : row[3][0];
try { try {
@ -263,6 +301,19 @@ public class plasmaSwitchboardQueue {
this.profileEntry = null; this.profileEntry = null;
this.contentInfo = null; this.contentInfo = null;
this.referrerURL = null; this.referrerURL = null;
this.status = QUEUE_STATE_FRESH;
}
public void updateStatus(int newStatus) {
this.status = newStatus;
}
public void close() {
queueInProcess.remove(this.url.hash());
}
public void finalize() {
this.close();
} }
public yacyURL url() { public yacyURL url() {

@ -605,7 +605,7 @@ public final class plasmaWordIndex implements indexRI {
return containers; // this may return less containers as demanded return containers; // this may return less containers as demanded
} }
public indexURLReference storeDocument(plasmaSwitchboardQueue.Entry entry, plasmaParserDocument document, plasmaCondenser condenser) throws IOException { public indexURLReference storeDocument(plasmaSwitchboardQueue.QueueEntry entry, plasmaParserDocument document, plasmaCondenser condenser) throws IOException {
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
// CREATE INDEX // CREATE INDEX

Loading…
Cancel
Save