From d376d81fc4705e91406954053b5c332ee9a067f9 Mon Sep 17 00:00:00 2001 From: orbiter Date: Thu, 18 Dec 2008 23:18:34 +0000 Subject: [PATCH] replaced busy thread control of crawl stacker by blocking threads git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@5400 6c8d7289-2bf4-0310-a012-ef5d649a1542 --- defaults/yacy.init | 6 --- htroot/PeerLoadPicture.java | 2 - source/de/anomic/crawler/CrawlStacker.java | 42 +++++++++---------- .../de/anomic/plasma/plasmaSwitchboard.java | 23 ++++------ .../plasma/plasmaSwitchboardConstants.java | 13 ------ .../server/serverAbstractBlockingThread.java | 4 +- .../server/serverInstantBlockingThread.java | 5 ++- source/de/anomic/server/serverProcessor.java | 42 +++++++++++++++---- 8 files changed, 71 insertions(+), 66 deletions(-) diff --git a/defaults/yacy.init b/defaults/yacy.init index 0a42d7836..b2def6ce8 100644 --- a/defaults/yacy.init +++ b/defaults/yacy.init @@ -568,12 +568,6 @@ performanceSpeed=100 80_indexing_idlesleep=1000 80_indexing_busysleep=10 80_indexing_memprereq=6291456 -82_crawlstack_idlesleep=1000 -82_crawlstack_busysleep=0 -82_crawlstack_memprereq=1048576 -83_crawlstack_idlesleep=1200 -83_crawlstack_busysleep=0 -83_crawlstack_memprereq=1048576 90_cleanup_idlesleep=300000 90_cleanup_busysleep=300000 90_cleanup_memprereq=0 diff --git a/htroot/PeerLoadPicture.java b/htroot/PeerLoadPicture.java index 30a5d2797..ebaef2b8d 100644 --- a/htroot/PeerLoadPicture.java +++ b/htroot/PeerLoadPicture.java @@ -29,8 +29,6 @@ public class PeerLoadPicture { final CircleThreadPiece misc = new CircleThreadPiece("Misc.", new Color(190, 50, 180)); final HashMap pieces = new HashMap(); pieces.put(null, idle); - pieces.put(plasmaSwitchboardConstants.CRAWLSTACK0, new CircleThreadPiece("Stacking0", new Color(115, 200, 210))); - pieces.put(plasmaSwitchboardConstants.CRAWLSTACK1, new CircleThreadPiece("Stacking1", new Color(115, 200, 210))); pieces.put(plasmaSwitchboardConstants.INDEXER, new CircleThreadPiece("Parsing/Indexing", new 
Color(255, 130, 0))); pieces.put(plasmaSwitchboardConstants.INDEX_DIST, new CircleThreadPiece("DHT-Distribution", new Color(119, 136, 153))); pieces.put(plasmaSwitchboardConstants.PEER_PING, new CircleThreadPiece("YaCy Core", new Color(255, 230, 160))); diff --git a/source/de/anomic/crawler/CrawlStacker.java b/source/de/anomic/crawler/CrawlStacker.java index 82a13da11..2fbc90b28 100644 --- a/source/de/anomic/crawler/CrawlStacker.java +++ b/source/de/anomic/crawler/CrawlStacker.java @@ -31,15 +31,13 @@ package de.anomic.crawler; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Date; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; import de.anomic.index.indexReferenceBlacklist; import de.anomic.index.indexURLReference; import de.anomic.plasma.plasmaSwitchboard; import de.anomic.plasma.plasmaWordIndex; import de.anomic.server.serverDomains; +import de.anomic.server.serverProcessor; import de.anomic.server.logging.serverLog; import de.anomic.yacy.yacyURL; @@ -47,7 +45,7 @@ public final class CrawlStacker { final serverLog log = new serverLog("STACKCRAWL"); - private BlockingQueue fastQueue, slowQueue; + private serverProcessor fastQueue, slowQueue; private long dnsHit, dnsMiss; private CrawlQueues nextQueue; private plasmaWordIndex wordIndex; @@ -67,13 +65,14 @@ public final class CrawlStacker { this.acceptLocalURLs = acceptLocalURLs; this.acceptGlobalURLs = acceptGlobalURLs; - this.fastQueue = new LinkedBlockingQueue(); - this.slowQueue = new ArrayBlockingQueue(1000); + this.fastQueue = new serverProcessor(this, "job", 10000, null, 2); + this.slowQueue = new serverProcessor(this, "job", 1000, null, 5); + this.log.logInfo("STACKCRAWL thread initialized."); } public int size() { - return this.fastQueue.size() + this.slowQueue.size(); + return this.fastQueue.queueSize() + this.slowQueue.queueSize(); } public void clear() { @@ -83,9 +82,10 @@ 
public final class CrawlStacker { public void close() { this.log.logInfo("Shutdown. Flushing remaining " + size() + " crawl stacker job entries. please wait."); - while (size() > 0) { - if (!job()) break; - } + this.fastQueue.announceShutdown(); + this.slowQueue.announceShutdown(); + this.fastQueue.awaitShutdown(2000); + this.slowQueue.awaitShutdown(2000); this.log.logInfo("Shutdown. Closing stackCrawl queue."); @@ -107,19 +107,17 @@ public final class CrawlStacker { } } + /* public boolean job() { - if (this.fastQueue.size() > 0 && job(this.fastQueue)) return true; - if (this.slowQueue.size() == 0) return false; + if (this.fastQueue.queueSize() > 0 && job(this.fastQueue)) return true; + if (this.slowQueue.queueSize() == 0) return false; return job(this.slowQueue); } + */ - private boolean job(BlockingQueue queue) { + public CrawlEntry job(CrawlEntry entry) { // this is the method that is called by the busy thread from outside - if (queue.size() == 0) return false; - - // get the next entry from the queue - CrawlEntry entry = queue.poll(); - if (entry == null) return false; + if (entry == null) return null; try { final String rejectReason = stackCrawl(entry); @@ -132,9 +130,9 @@ public final class CrawlStacker { } } catch (final Exception e) { CrawlStacker.this.log.logWarning("Error while processing stackCrawl entry.\n" + "Entry: " + entry.toString() + "Error: " + e.toString(), e); - return false; + return null; } - return true; + return null; } public void enqueueEntry(final CrawlEntry entry) { @@ -144,14 +142,14 @@ public final class CrawlStacker { if (prefetchHost(entry.url().getHost())) { try { - this.fastQueue.put(entry); + this.fastQueue.enQueue(entry); this.dnsHit++; } catch (InterruptedException e) { e.printStackTrace(); } } else { try { - this.slowQueue.put(entry); + this.slowQueue.enQueue(entry); this.dnsMiss++; } catch (InterruptedException e) { e.printStackTrace(); diff --git a/source/de/anomic/plasma/plasmaSwitchboard.java 
b/source/de/anomic/plasma/plasmaSwitchboard.java index d77218ac9..d3189cb4b 100644 --- a/source/de/anomic/plasma/plasmaSwitchboard.java +++ b/source/de/anomic/plasma/plasmaSwitchboard.java @@ -578,10 +578,6 @@ public final class plasmaSwitchboard extends serverAbstractSwitch= getConfigLong(plasmaSwitchboardConstants.CRAWLSTACK_SLOTS, 2000)) { - if (this.log.isFine()) log.logFine("deQueue: too many processes in stack crawl thread queue (" + "stackCrawlQueue=" + crawlStacker.size() + ")"); - return doneSomething; - } - // if we were interrupted we should return now if (Thread.currentThread().isInterrupted()) { if (this.log.isFine()) log.logFine("deQueue: thread was interrupted"); diff --git a/source/de/anomic/plasma/plasmaSwitchboardConstants.java b/source/de/anomic/plasma/plasmaSwitchboardConstants.java index 4535dcd2d..165ea6a69 100644 --- a/source/de/anomic/plasma/plasmaSwitchboardConstants.java +++ b/source/de/anomic/plasma/plasmaSwitchboardConstants.java @@ -135,19 +135,6 @@ public final class plasmaSwitchboardConstants { public static final String INDEXER_METHOD_JOBCOUNT = "queueSize"; public static final String INDEXER_METHOD_FREEMEM = "deQueueFreeMem"; public static final String INDEXER_SLOTS = "indexer.slots"; - // 82_crawlstack - /** - *

public static final String CRAWLSTACK = "82_crawlstack"

- *

Name of the crawl stacker thread, performing several checks on new URLs to crawl, i.e. double-check

- */ - public static final String CRAWLSTACK0 = "82_crawlstack"; - public static final String CRAWLSTACK1 = "83_crawlstack"; - public static final String CRAWLSTACK_METHOD_START = "job"; - public static final String CRAWLSTACK_METHOD_JOBCOUNT = "size"; - public static final String CRAWLSTACK_METHOD_FREEMEM = null; - public static final String CRAWLSTACK_IDLESLEEP = "82_crawlstack_idlesleep"; - public static final String CRAWLSTACK_BUSYSLEEP = "82_crawlstack_busysleep"; - public static final String CRAWLSTACK_SLOTS = "stacker.slots"; // 90_cleanup /** *

public static final String CLEANUP = "90_cleanup"

diff --git a/source/de/anomic/server/serverAbstractBlockingThread.java b/source/de/anomic/server/serverAbstractBlockingThread.java index 37f98f130..f14e5ca68 100644 --- a/source/de/anomic/server/serverAbstractBlockingThread.java +++ b/source/de/anomic/server/serverAbstractBlockingThread.java @@ -46,6 +46,7 @@ public abstract class serverAbstractBlockingThread return this.output; } + @SuppressWarnings("unchecked") public void run() { this.open(); if (log != null) { @@ -61,10 +62,11 @@ public abstract class serverAbstractBlockingThread timestamp = System.currentTimeMillis(); memstamp0 = serverMemory.used(); final J in = this.input.take(); - if ((in == null) || (in.status == serverProcessorJob.STATUS_POISON)) { + if ((in == null) || (in == serverProcessorJob.poisonPill) || (in.status == serverProcessorJob.STATUS_POISON)) { // the poison pill: shutdown // a null element is pushed to the queue on purpose to signal // that a termination should be made + if (this.output != null) this.output.enQueue((J) serverProcessorJob.poisonPill); // pass on the pill this.running = false; break; } diff --git a/source/de/anomic/server/serverInstantBlockingThread.java b/source/de/anomic/server/serverInstantBlockingThread.java index a143012e1..fc81eeb37 100644 --- a/source/de/anomic/server/serverInstantBlockingThread.java +++ b/source/de/anomic/server/serverInstantBlockingThread.java @@ -76,7 +76,10 @@ public class serverInstantBlockingThread extends s @SuppressWarnings("unchecked") public J job(final J next) throws Exception { - if (next == null || next == serverProcessorJob.poisonPill) return null; // poison pill: shutdown + // see if we got a poison pill to tell us to shut down + if (next == null) return (J) serverProcessorJob.poisonPill; + if (next == serverProcessorJob.poisonPill || next.status == serverProcessorJob.STATUS_POISON) return next; + instantThreadCounter++; //System.out.println("started job " + this.handle + ": " + this.getName()); jobs.put(this.handle, this.getName()); 
diff --git a/source/de/anomic/server/serverProcessor.java b/source/de/anomic/server/serverProcessor.java
index 04a971012..1b9fa2f86 100644
--- a/source/de/anomic/server/serverProcessor.java
+++ b/source/de/anomic/server/serverProcessor.java
@@ -63,7 +63,24 @@ public class serverProcessor {
     }
 
     public int queueSize() {
-        return input.size();
+        return this.input.size();
+    }
+
+    public void clear() {
+        if (this.input != null) this.input.clear();
+    }
+
+    public synchronized void relaxCapacity() {
+        if (this.input.size() == 0) return;
+        if (this.input.remainingCapacity() > 1000) return;
+        BlockingQueue i = new LinkedBlockingQueue();
+        J e;
+        while (this.input.size() > 0) {
+            e = this.input.poll();
+            if (e == null) break;
+            i.add(e);
+        }
+        this.input = i;
     }
 
     @SuppressWarnings("unchecked")
@@ -89,20 +106,31 @@ public class serverProcessor {
     }
 
     @SuppressWarnings("unchecked")
-    public void shutdown(final long millisTimeout) {
+    public void announceShutdown() {
         if (executor == null) return;
         if (executor.isShutdown()) return;
+        // before we put pills into the queue, make sure that they will take them
+        relaxCapacity();
         // put poison pills into the queue
         for (int i = 0; i < poolsize; i++) {
             try {
+                serverLog.logInfo("serverProcessor", "putting poison pill in queue for " + this.methodName + ", thread " + i);
                 input.put((J) serverProcessorJob.poisonPill); // put a poison pill into the queue which will kill the job
+                serverLog.logInfo("serverProcessor", ".. poison pill is in queue for " + this.methodName + ", thread " + i + ". awaiting termination");
             } catch (final InterruptedException e) { }
         }
-        // wait for shutdown
-        try {
-            executor.awaitTermination(millisTimeout, TimeUnit.MILLISECONDS);
-        } catch (final InterruptedException e) {}
-        executor.shutdown();
+    }
+
+    public void awaitShutdown(final long millisTimeout) { // waits at most millisTimeout ms for the workers, then drops executor and queue
+        if (executor != null && !executor.isShutdown()) { // '&&' short-circuits: a null executor (e.g. second call) must not be dereferenced
+            executor.shutdown(); // request termination first; without it awaitTermination can only end by running into the timeout
+            try {
+                executor.awaitTermination(millisTimeout, TimeUnit.MILLISECONDS);
+            } catch (final InterruptedException e) {
+                Thread.currentThread().interrupt(); // keep the interrupt visible to the caller instead of swallowing it
+            }
+        }
+        serverLog.logInfo("serverProcessor", "queue for " + this.methodName + ": shutdown.");
         this.executor = null;
         this.input = null;
     }