implemented multithreading of indexing

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@221 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 20 years ago
parent 7c318f8886
commit 33f9315e58

@ -209,10 +209,10 @@ public class IndexCreate_p {
prop.put("xdstopwChecked", env.getConfig("xdstopw", "").equals("true") ? 1 : 0);
prop.put("xpstopwChecked", env.getConfig("xpstopw", "").equals("true") ? 1 : 0);
int processStackSize = switchboard.processStack.size();
int queueStackSize = switchboard.queueStack.size();
int loaderThreadsSize = switchboard.cacheLoader.size();
int crawlerListSize = switchboard.noticeURL.stackSize();
int completequeue = processStackSize + loaderThreadsSize + crawlerListSize;
int completequeue = queueStackSize + loaderThreadsSize + crawlerListSize;
if ((completequeue > 0) || ((post != null) && (post.containsKey("refreshpage")))) {
prop.put("refreshbutton", 1);
@ -318,15 +318,15 @@ public class IndexCreate_p {
yacySeed initiator;
if (switchboard.processStack.size() == 0) {
if (switchboard.queueStack.size() == 0) {
prop.put("indexing-queue", 0); //is empty
} else {
prop.put("indexing-queue", 1);
prop.put("indexing-queue_num", switchboard.processStack.size());//num entries in queue
prop.put("indexing-queue_num", switchboard.queueStack.size());//num entries in queue
dark = true;
plasmaHTCache.Entry pcentry;
for (i = 0; i < switchboard.processStack.size(); i++) {
pcentry = (plasmaHTCache.Entry) switchboard.processStack.get(i);
for (i = 0; i < switchboard.queueStack.size(); i++) {
pcentry = (plasmaHTCache.Entry) switchboard.queueStack.get(i);
if (pcentry != null) {
initiator = yacyCore.seedDB.getConnected(pcentry.initiator());
prop.put("indexing-queue_list_"+i+"_dark", ((dark) ? 1 : 0));

@ -1,5 +1,5 @@
// ProxyIndexingMonitor_p.java
// -----------------------
// ---------------------------
// part of the AnomicHTTPD caching proxy
// (C) by Michael Peter Christen; mc@anomic.de
// first published on http://www.anomic.de

@ -173,7 +173,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
public plasmaHTCache cacheManager;
public plasmaSnippetCache snippetCache;
public plasmaCrawlLoader cacheLoader;
public LinkedList processStack = new LinkedList();
public LinkedList queueStack = new LinkedList();
public messageBoard messageDB;
public wikiBoard wikiDB;
public String remoteProxyHost;
@ -334,14 +334,18 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
// deploy threads
log.logSystem("Starting Threads");
int indexing_cluster = Integer.parseInt(getConfig("?80_indexing_cluster", "1"));
if (indexing_cluster < 1) indexing_cluster = 1;
deployThread("90_cleanup", "Cleanup", "simple cleaning process for monitoring information" ,
new serverInstantThread(this, "cleanupJob", "cleanupJobSize"), 10000); // all 5 Minutes
deployThread("80_dequeue", "Indexing Dequeue", "thread that creates database entries from scraped web content and performes indexing" ,
deployThread("80_indexing", "Parsing/Indexing", "thread that performes document parsing and indexing" ,
new serverInstantThread(this, "deQueue", "queueSize"), 10000);
setConfig("81_dequeue_idlesleep" , getConfig("80_dequeue_idlesleep", ""));
setConfig("81_dequeue_busysleep" , getConfig("80_dequeue_busysleep", ""));
deployThread("81_dequeue", "Indexing Dequeue (second job, test run)", "thread that creates database entries from scraped web content and performes indexing" ,
new serverInstantThread(this, "deQueue", "queueSize"), 11000);
for (int i = 1; i < indexing_cluster; i++) {
setConfig((i + 80) + "_indexing_idlesleep", getConfig("80_indexing_idlesleep", ""));
setConfig((i + 80) + "_indexing_busysleep", getConfig("80_indexing_busysleep", ""));
deployThread((i + 80) + "_indexing", "Parsing/Indexing (cluster job)", "thread that performes document parsing and indexing" ,
new serverInstantThread(this, "deQueue", "queueSize"), 10000 + (i * 1000));
}
deployThread("70_cachemanager", "Proxy Cache Enqueue", "job takes new proxy files from RAM stack, stores them, and hands over to the Indexing Stack",
new serverInstantThread(cacheManager, "job", "size"), 10000);
deployThread("62_remotetriggeredcrawl", "Remote Crawl Job", "thread that performes a single crawl/indexing step triggered by a remote peer",
@ -406,7 +410,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
} catch (IOException e) {}
}
private void cleanProfiles() {
if (queueSize() > 0) return;
if ((queueStack.size() > 0) || (cacheLoader.size() > 0) || (noticeURL.stackSize() > 0)) return;
Iterator i = profiles.profiles(true);
plasmaCrawlProfile.entry entry;
try {
@ -467,7 +471,8 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
*/
public int queueSize() {
return processStack.size() + cacheLoader.size() + noticeURL.stackSize();
return queueStack.size();
//return processStack.size() + cacheLoader.size() + noticeURL.stackSize();
}
public int lUrlSize() {
@ -480,13 +485,13 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
public void enQueue(Object job) {
plasmaHTCache.Entry entry = (plasmaHTCache.Entry) job;
processStack.addLast(entry);
queueStack.addLast(entry);
}
public boolean deQueue() {
// work off fresh entries from the proxy or from the crawler
if (processStack.size() == 0) {
if (queueStack.size() == 0) {
//log.logDebug("DEQUEUE: queue is empty");
return false; // nothing to do
}
@ -496,12 +501,12 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
// do one processing step
log.logDebug("DEQUEUE: cacheManager=" + ((cacheManager.idle()) ? "idle" : "busy") +
", processStack=" + processStack.size() +
", queueStack=" + queueStack.size() +
", coreStackSize=" + noticeURL.coreStackSize() +
", limitStackSize=" + noticeURL.limitStackSize() +
", overhangStackSize=" + noticeURL.overhangStackSize() +
", remoteStackSize=" + noticeURL.remoteStackSize());
processResourceStack((plasmaHTCache.Entry) processStack.removeFirst());
processResourceStack((plasmaHTCache.Entry) queueStack.removeFirst());
return true;
}
@ -574,9 +579,9 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
//log.logDebug("CoreCrawl: queue is empty");
return false;
}
if (processStack.size() >= crawlSlots) {
if (queueStack.size() >= crawlSlots) {
log.logDebug("CoreCrawl: too many processes in queue, dismissed (" +
"processStack=" + processStack.size() + ")");
"queueStack=" + queueStack.size() + ")");
return false;
}
if (cacheLoader.size() >= crawlSlots) {
@ -652,7 +657,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
", permission=" + ((yacyCore.seedDB == null) ? "undefined" : (((yacyCore.seedDB.mySeed.isSenior()) || (yacyCore.seedDB.mySeed.isPrincipal())) ? "true" : "false")));
boolean tryRemote =
((noticeURL.coreStackSize() != 0) || (processStack.size() != 0)) /* should do ourself */ &&
((noticeURL.coreStackSize() != 0) || (queueStack.size() != 0)) /* should do ourself */ &&
(profile.remoteIndexing()) /* granted */ &&
(urlEntry.initiator() != null) && (!(urlEntry.initiator().equals(plasmaURL.dummyHash))) /* not proxy */ &&
((yacyCore.seedDB.mySeed.isSenior()) ||
@ -664,9 +669,9 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
}
// alternatively do a local crawl
if (processStack.size() >= crawlSlots) {
if (queueStack.size() >= crawlSlots) {
log.logDebug("LimitCrawl: too many processes in queue, dismissed (" +
"processStack=" + processStack.size() + ")");
"queueStack=" + queueStack.size() + ")");
return false;
}
if (cacheLoader.size() >= crawlSlots) {
@ -692,9 +697,10 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
//log.logDebug("GlobalCrawl: queue is empty");
return false;
}
if (processStack.size() > 0) {
/*
if (queueStack.size() > 0) {
log.logDebug("GlobalCrawl: any processe is in queue, dismissed (" +
"processStack=" + processStack.size() + ")");
"processStack=" + queueStack.size() + ")");
return false;
}
if (noticeURL.coreStackSize() > 0) {
@ -702,6 +708,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
"coreStackSize=" + noticeURL.coreStackSize() + ")");
return false;
}
*/
// if the server is busy, we do this more slowly
if (!(cacheManager.idle())) try {Thread.currentThread().sleep(2000);} catch (InterruptedException e) {}
@ -1377,7 +1384,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
public String toString() {
// it is possible to use this method in the cgi pages.
// actually it is used there for testing purpose
return "PROPS: " + super.toString() + "; QUEUE: " + processStack.toString();
return "PROPS: " + super.toString() + "; QUEUE: " + queueStack.toString();
}
// method for index deletion
@ -1448,7 +1455,10 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
int transferred;
long starttime = System.currentTimeMillis();
try {
if ((queueSize() == 0) &&
if (
(queueStack.size() == 0) &&
(cacheLoader.size() == 0) &&
(noticeURL.stackSize() == 0) &&
(getConfig("allowDistributeIndex", "false").equals("true")) &&
((transferred = performTransferIndex(indexCount, peerCount, true)) > 0)) {
indexCount = transferred;

@ -420,11 +420,20 @@ xpstopw=true
62_remotetriggeredcrawl_busysleep=0
70_cachemanager_idlesleep=10000
70_cachemanager_busysleep=0
80_dequeue_idlesleep=10000
80_dequeue_busysleep=0
80_indexing_idlesleep=10000
80_indexing_busysleep=0
90_cleanup_idlesleep=300000
90_cleanup_busysleep=300000
# multiprocessor-settings
# you may want to run time-consuming processes on several processors
# the most time-consuming process is the indexing-Process
# We implemented an option to run several of these processes here
# setting the number of processes to Zero is not allowed
# If you have a double-processor system,
# a cluster value of '2' would be appropriate
80_indexing_cluster=1
# ram cache for database files
# ram cache for indexCache.db

Loading…
Cancel
Save