From 33f9315e5860b0cea20c387956e350dcbcde5ba7 Mon Sep 17 00:00:00 2001 From: orbiter Date: Wed, 8 Jun 2005 13:19:05 +0000 Subject: [PATCH] implemented multithreading of indexing git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@221 6c8d7289-2bf4-0310-a012-ef5d649a1542 --- htroot/IndexCreate_p.java | 12 ++--- htroot/ProxyIndexingMonitor_p.java | 2 +- .../de/anomic/plasma/plasmaSwitchboard.java | 52 +++++++++++-------- yacy.init | 13 ++++- 4 files changed, 49 insertions(+), 30 deletions(-) diff --git a/htroot/IndexCreate_p.java b/htroot/IndexCreate_p.java index 138585722..f1db873b6 100644 --- a/htroot/IndexCreate_p.java +++ b/htroot/IndexCreate_p.java @@ -209,10 +209,10 @@ public class IndexCreate_p { prop.put("xdstopwChecked", env.getConfig("xdstopw", "").equals("true") ? 1 : 0); prop.put("xpstopwChecked", env.getConfig("xpstopw", "").equals("true") ? 1 : 0); - int processStackSize = switchboard.processStack.size(); + int queueStackSize = switchboard.queueStack.size(); int loaderThreadsSize = switchboard.cacheLoader.size(); int crawlerListSize = switchboard.noticeURL.stackSize(); - int completequeue = processStackSize + loaderThreadsSize + crawlerListSize; + int completequeue = queueStackSize + loaderThreadsSize + crawlerListSize; if ((completequeue > 0) || ((post != null) && (post.containsKey("refreshpage")))) { prop.put("refreshbutton", 1); @@ -318,15 +318,15 @@ public class IndexCreate_p { yacySeed initiator; - if (switchboard.processStack.size() == 0) { + if (switchboard.queueStack.size() == 0) { prop.put("indexing-queue", 0); //is empty } else { prop.put("indexing-queue", 1); - prop.put("indexing-queue_num", switchboard.processStack.size());//num entries in queue + prop.put("indexing-queue_num", switchboard.queueStack.size());//num entries in queue dark = true; plasmaHTCache.Entry pcentry; - for (i = 0; i < switchboard.processStack.size(); i++) { - pcentry = (plasmaHTCache.Entry) switchboard.processStack.get(i); + for (i = 0; i < 
switchboard.queueStack.size(); i++) { + pcentry = (plasmaHTCache.Entry) switchboard.queueStack.get(i); if (pcentry != null) { initiator = yacyCore.seedDB.getConnected(pcentry.initiator()); prop.put("indexing-queue_list_"+i+"_dark", ((dark) ? 1 : 0)); diff --git a/htroot/ProxyIndexingMonitor_p.java b/htroot/ProxyIndexingMonitor_p.java index e81cec049..1dd018b8f 100644 --- a/htroot/ProxyIndexingMonitor_p.java +++ b/htroot/ProxyIndexingMonitor_p.java @@ -1,5 +1,5 @@ // ProxyIndexingMonitor_p.java -// ----------------------- +// --------------------------- // part of the AnomicHTTPD caching proxy // (C) by Michael Peter Christen; mc@anomic.de // first published on http://www.anomic.de diff --git a/source/de/anomic/plasma/plasmaSwitchboard.java b/source/de/anomic/plasma/plasmaSwitchboard.java index 163d45ff6..dec9c3788 100644 --- a/source/de/anomic/plasma/plasmaSwitchboard.java +++ b/source/de/anomic/plasma/plasmaSwitchboard.java @@ -173,7 +173,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser public plasmaHTCache cacheManager; public plasmaSnippetCache snippetCache; public plasmaCrawlLoader cacheLoader; - public LinkedList processStack = new LinkedList(); + public LinkedList queueStack = new LinkedList(); public messageBoard messageDB; public wikiBoard wikiDB; public String remoteProxyHost; @@ -334,14 +334,18 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser // deploy threads log.logSystem("Starting Threads"); + int indexing_cluster = Integer.parseInt(getConfig("?80_indexing_cluster", "1")); + if (indexing_cluster < 1) indexing_cluster = 1; deployThread("90_cleanup", "Cleanup", "simple cleaning process for monitoring information" , new serverInstantThread(this, "cleanupJob", "cleanupJobSize"), 10000); // all 5 Minutes - deployThread("80_dequeue", "Indexing Dequeue", "thread that creates database entries from scraped web content and performes indexing" , + deployThread("80_indexing", 
"Parsing/Indexing", "thread that performes document parsing and indexing" , new serverInstantThread(this, "deQueue", "queueSize"), 10000); - setConfig("81_dequeue_idlesleep" , getConfig("80_dequeue_idlesleep", "")); - setConfig("81_dequeue_busysleep" , getConfig("80_dequeue_busysleep", "")); - deployThread("81_dequeue", "Indexing Dequeue (second job, test run)", "thread that creates database entries from scraped web content and performes indexing" , - new serverInstantThread(this, "deQueue", "queueSize"), 11000); + for (int i = 1; i < indexing_cluster; i++) { + setConfig((i + 80) + "_indexing_idlesleep", getConfig("80_indexing_idlesleep", "")); + setConfig((i + 80) + "_indexing_busysleep", getConfig("80_indexing_busysleep", "")); + deployThread((i + 80) + "_indexing", "Parsing/Indexing (cluster job)", "thread that performes document parsing and indexing" , + new serverInstantThread(this, "deQueue", "queueSize"), 10000 + (i * 1000)); + } deployThread("70_cachemanager", "Proxy Cache Enqueue", "job takes new proxy files from RAM stack, stores them, and hands over to the Indexing Stack", new serverInstantThread(cacheManager, "job", "size"), 10000); deployThread("62_remotetriggeredcrawl", "Remote Crawl Job", "thread that performes a single crawl/indexing step triggered by a remote peer", @@ -406,7 +410,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser } catch (IOException e) {} } private void cleanProfiles() { - if (queueSize() > 0) return; + if ((queueStack.size() > 0) || (cacheLoader.size() > 0) || (noticeURL.stackSize() > 0)) return; Iterator i = profiles.profiles(true); plasmaCrawlProfile.entry entry; try { @@ -467,7 +471,8 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser */ public int queueSize() { - return processStack.size() + cacheLoader.size() + noticeURL.stackSize(); + return queueStack.size(); + //return processStack.size() + cacheLoader.size() + noticeURL.stackSize(); } public int 
lUrlSize() { @@ -480,13 +485,13 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser public void enQueue(Object job) { plasmaHTCache.Entry entry = (plasmaHTCache.Entry) job; - processStack.addLast(entry); + queueStack.addLast(entry); } public boolean deQueue() { // work off fresh entries from the proxy or from the crawler - if (processStack.size() == 0) { + if (queueStack.size() == 0) { //log.logDebug("DEQUEUE: queue is empty"); return false; // nothing to do } @@ -496,12 +501,12 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser // do one processing step log.logDebug("DEQUEUE: cacheManager=" + ((cacheManager.idle()) ? "idle" : "busy") + - ", processStack=" + processStack.size() + + ", queueStack=" + queueStack.size() + ", coreStackSize=" + noticeURL.coreStackSize() + ", limitStackSize=" + noticeURL.limitStackSize() + ", overhangStackSize=" + noticeURL.overhangStackSize() + ", remoteStackSize=" + noticeURL.remoteStackSize()); - processResourceStack((plasmaHTCache.Entry) processStack.removeFirst()); + processResourceStack((plasmaHTCache.Entry) queueStack.removeFirst()); return true; } @@ -574,9 +579,9 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser //log.logDebug("CoreCrawl: queue is empty"); return false; } - if (processStack.size() >= crawlSlots) { + if (queueStack.size() >= crawlSlots) { log.logDebug("CoreCrawl: too many processes in queue, dismissed (" + - "processStack=" + processStack.size() + ")"); + "queueStack=" + queueStack.size() + ")"); return false; } if (cacheLoader.size() >= crawlSlots) { @@ -652,7 +657,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser ", permission=" + ((yacyCore.seedDB == null) ? "undefined" : (((yacyCore.seedDB.mySeed.isSenior()) || (yacyCore.seedDB.mySeed.isPrincipal())) ? 
"true" : "false"))); boolean tryRemote = - ((noticeURL.coreStackSize() != 0) || (processStack.size() != 0)) /* should do ourself */ && + ((noticeURL.coreStackSize() != 0) || (queueStack.size() != 0)) /* should do ourself */ && (profile.remoteIndexing()) /* granted */ && (urlEntry.initiator() != null) && (!(urlEntry.initiator().equals(plasmaURL.dummyHash))) /* not proxy */ && ((yacyCore.seedDB.mySeed.isSenior()) || @@ -664,9 +669,9 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser } // alternatively do a local crawl - if (processStack.size() >= crawlSlots) { + if (queueStack.size() >= crawlSlots) { log.logDebug("LimitCrawl: too many processes in queue, dismissed (" + - "processStack=" + processStack.size() + ")"); + "queueStack=" + queueStack.size() + ")"); return false; } if (cacheLoader.size() >= crawlSlots) { @@ -692,9 +697,10 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser //log.logDebug("GlobalCrawl: queue is empty"); return false; } - if (processStack.size() > 0) { + /* + if (queueStack.size() > 0) { log.logDebug("GlobalCrawl: any processe is in queue, dismissed (" + - "processStack=" + processStack.size() + ")"); + "processStack=" + queueStack.size() + ")"); return false; } if (noticeURL.coreStackSize() > 0) { @@ -702,6 +708,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser "coreStackSize=" + noticeURL.coreStackSize() + ")"); return false; } + */ // if the server is busy, we do this more slowly if (!(cacheManager.idle())) try {Thread.currentThread().sleep(2000);} catch (InterruptedException e) {} @@ -1377,7 +1384,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser public String toString() { // it is possible to use this method in the cgi pages. 
// actually it is used there for testing purpose - return "PROPS: " + super.toString() + "; QUEUE: " + processStack.toString(); + return "PROPS: " + super.toString() + "; QUEUE: " + queueStack.toString(); } // method for index deletion @@ -1448,7 +1455,10 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser int transferred; long starttime = System.currentTimeMillis(); try { - if ((queueSize() == 0) && + if ( + (queueStack.size() == 0) && + (cacheLoader.size() == 0) && + (noticeURL.stackSize() == 0) && (getConfig("allowDistributeIndex", "false").equals("true")) && ((transferred = performTransferIndex(indexCount, peerCount, true)) > 0)) { indexCount = transferred; diff --git a/yacy.init b/yacy.init index 956115c5d..0aeb3f39a 100644 --- a/yacy.init +++ b/yacy.init @@ -420,11 +420,20 @@ xpstopw=true 62_remotetriggeredcrawl_busysleep=0 70_cachemanager_idlesleep=10000 70_cachemanager_busysleep=0 -80_dequeue_idlesleep=10000 -80_dequeue_busysleep=0 +80_indexing_idlesleep=10000 +80_indexing_busysleep=0 90_cleanup_idlesleep=300000 90_cleanup_busysleep=300000 +# multiprocessor-settings +# you may want to run time-consuming processes on several processors +# the most time-consuming process is the indexing process +# We implemented an option to run several of these processes here +# setting the number of processes to zero is not allowed +# If you have a dual-processor system, +# a cluster value of '2' would be appropriate +80_indexing_cluster=1 + # ram cache for database files # ram cache for indexCache.db