replaced busy thread control of crawl stacker by blocking threads

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@5400 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 16 years ago
parent f29b48d9ff
commit d376d81fc4

@ -568,12 +568,6 @@ performanceSpeed=100
80_indexing_idlesleep=1000
80_indexing_busysleep=10
80_indexing_memprereq=6291456
82_crawlstack_idlesleep=1000
82_crawlstack_busysleep=0
82_crawlstack_memprereq=1048576
83_crawlstack_idlesleep=1200
83_crawlstack_busysleep=0
83_crawlstack_memprereq=1048576
90_cleanup_idlesleep=300000
90_cleanup_busysleep=300000
90_cleanup_memprereq=0

@ -29,8 +29,6 @@ public class PeerLoadPicture {
final CircleThreadPiece misc = new CircleThreadPiece("Misc.", new Color(190, 50, 180));
final HashMap<String, CircleThreadPiece> pieces = new HashMap<String, CircleThreadPiece>();
pieces.put(null, idle);
pieces.put(plasmaSwitchboardConstants.CRAWLSTACK0, new CircleThreadPiece("Stacking0", new Color(115, 200, 210)));
pieces.put(plasmaSwitchboardConstants.CRAWLSTACK1, new CircleThreadPiece("Stacking1", new Color(115, 200, 210)));
pieces.put(plasmaSwitchboardConstants.INDEXER, new CircleThreadPiece("Parsing/Indexing", new Color(255, 130, 0)));
pieces.put(plasmaSwitchboardConstants.INDEX_DIST, new CircleThreadPiece("DHT-Distribution", new Color(119, 136, 153)));
pieces.put(plasmaSwitchboardConstants.PEER_PING, new CircleThreadPiece("YaCy Core", new Color(255, 230, 160)));

@ -31,15 +31,13 @@ package de.anomic.crawler;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Date;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import de.anomic.index.indexReferenceBlacklist;
import de.anomic.index.indexURLReference;
import de.anomic.plasma.plasmaSwitchboard;
import de.anomic.plasma.plasmaWordIndex;
import de.anomic.server.serverDomains;
import de.anomic.server.serverProcessor;
import de.anomic.server.logging.serverLog;
import de.anomic.yacy.yacyURL;
@ -47,7 +45,7 @@ public final class CrawlStacker {
final serverLog log = new serverLog("STACKCRAWL");
private BlockingQueue<CrawlEntry> fastQueue, slowQueue;
private serverProcessor<CrawlEntry> fastQueue, slowQueue;
private long dnsHit, dnsMiss;
private CrawlQueues nextQueue;
private plasmaWordIndex wordIndex;
@ -67,13 +65,14 @@ public final class CrawlStacker {
this.acceptLocalURLs = acceptLocalURLs;
this.acceptGlobalURLs = acceptGlobalURLs;
this.fastQueue = new LinkedBlockingQueue<CrawlEntry>();
this.slowQueue = new ArrayBlockingQueue<CrawlEntry>(1000);
this.fastQueue = new serverProcessor<CrawlEntry>(this, "job", 10000, null, 2);
this.slowQueue = new serverProcessor<CrawlEntry>(this, "job", 1000, null, 5);
this.log.logInfo("STACKCRAWL thread initialized.");
}
public int size() {
return this.fastQueue.size() + this.slowQueue.size();
return this.fastQueue.queueSize() + this.slowQueue.queueSize();
}
public void clear() {
@ -83,9 +82,10 @@ public final class CrawlStacker {
public void close() {
this.log.logInfo("Shutdown. Flushing remaining " + size() + " crawl stacker job entries. please wait.");
while (size() > 0) {
if (!job()) break;
}
this.fastQueue.announceShutdown();
this.slowQueue.announceShutdown();
this.fastQueue.awaitShutdown(2000);
this.slowQueue.awaitShutdown(2000);
this.log.logInfo("Shutdown. Closing stackCrawl queue.");
@ -107,19 +107,17 @@ public final class CrawlStacker {
}
}
/*
public boolean job() {
if (this.fastQueue.size() > 0 && job(this.fastQueue)) return true;
if (this.slowQueue.size() == 0) return false;
if (this.fastQueue.queueSize() > 0 && job(this.fastQueue)) return true;
if (this.slowQueue.queueSize() == 0) return false;
return job(this.slowQueue);
}
*/
private boolean job(BlockingQueue<CrawlEntry> queue) {
public CrawlEntry job(CrawlEntry entry) {
// this is the method that is called by the busy thread from outside
if (queue.size() == 0) return false;
// get the next entry from the queue
CrawlEntry entry = queue.poll();
if (entry == null) return false;
if (entry == null) return null;
try {
final String rejectReason = stackCrawl(entry);
@ -132,9 +130,9 @@ public final class CrawlStacker {
}
} catch (final Exception e) {
CrawlStacker.this.log.logWarning("Error while processing stackCrawl entry.\n" + "Entry: " + entry.toString() + "Error: " + e.toString(), e);
return false;
return null;
}
return true;
return null;
}
public void enqueueEntry(final CrawlEntry entry) {
@ -144,14 +142,14 @@ public final class CrawlStacker {
if (prefetchHost(entry.url().getHost())) {
try {
this.fastQueue.put(entry);
this.fastQueue.enQueue(entry);
this.dnsHit++;
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
try {
this.slowQueue.put(entry);
this.slowQueue.enQueue(entry);
this.dnsMiss++;
} catch (InterruptedException e) {
e.printStackTrace();

@ -578,10 +578,6 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<IndexingStack.
deployThread(plasmaSwitchboardConstants.CLEANUP, "Cleanup", "simple cleaning process for monitoring information", null,
new serverInstantBusyThread(this, plasmaSwitchboardConstants.CLEANUP_METHOD_START, plasmaSwitchboardConstants.CLEANUP_METHOD_JOBCOUNT, plasmaSwitchboardConstants.CLEANUP_METHOD_FREEMEM), 600000); // all 5 Minutes, wait 10 minutes until first run
deployThread(plasmaSwitchboardConstants.CRAWLSTACK0, "Crawl URL Stacker", "process that checks url for double-occurrences and for allowance/disallowance by robots.txt", null,
new serverInstantBusyThread(crawlStacker, plasmaSwitchboardConstants.CRAWLSTACK_METHOD_START, plasmaSwitchboardConstants.CRAWLSTACK_METHOD_JOBCOUNT, plasmaSwitchboardConstants.CRAWLSTACK_METHOD_FREEMEM), 8000);
deployThread(plasmaSwitchboardConstants.CRAWLSTACK1, "Crawl URL Stacker", "process that checks url for double-occurrences and for allowance/disallowance by robots.txt", null,
new serverInstantBusyThread(crawlStacker, plasmaSwitchboardConstants.CRAWLSTACK_METHOD_START, plasmaSwitchboardConstants.CRAWLSTACK_METHOD_JOBCOUNT, plasmaSwitchboardConstants.CRAWLSTACK_METHOD_FREEMEM), 8000);
deployThread(plasmaSwitchboardConstants.INDEXER, "Indexing", "thread that either initiates a parsing/indexing queue, distributes the index into the DHT, stores parsed documents or flushes the index cache", "/IndexCreateIndexingQueue_p.html",
new serverInstantBusyThread(this, plasmaSwitchboardConstants.INDEXER_METHOD_START, plasmaSwitchboardConstants.INDEXER_METHOD_JOBCOUNT, plasmaSwitchboardConstants.INDEXER_METHOD_FREEMEM), 10000);
deployThread(plasmaSwitchboardConstants.CRAWLJOB_REMOTE_TRIGGERED_CRAWL, "Remote Crawl Job", "thread that performes a single crawl/indexing step triggered by a remote peer", null,
@ -1036,10 +1032,15 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<IndexingStack.
if (transferIdxThread != null) stopTransferWholeIndex(false);
log.logConfig("SWITCHBOARD SHUTDOWN STEP 2: sending termination signal to threaded indexing");
// closing all still running db importer jobs
indexingDocumentProcessor.shutdown(4000);
indexingCondensementProcessor.shutdown(3000);
indexingAnalysisProcessor.shutdown(2000);
indexingStorageProcessor.shutdown(1000);
indexingDocumentProcessor.announceShutdown();
crawlStacker.close();
indexingDocumentProcessor.awaitShutdown(4000);
indexingCondensementProcessor.announceShutdown();
indexingAnalysisProcessor.announceShutdown();
indexingStorageProcessor.announceShutdown();
indexingCondensementProcessor.awaitShutdown(3000);
indexingAnalysisProcessor.awaitShutdown(2000);
indexingStorageProcessor.awaitShutdown(1000);
this.dbImportManager.close();
JakartaCommonsHttpClient.closeAllConnections();
wikiDB.close();
@ -1048,7 +1049,6 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<IndexingStack.
userDB.close();
bookmarksDB.close();
messageDB.close();
crawlStacker.close();
robots.close();
parser.close();
webStructure.flushCitationReference("crg");
@ -1161,11 +1161,6 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<IndexingStack.
return doneSomething; // nothing to do
}
if (crawlStacker.size() >= getConfigLong(plasmaSwitchboardConstants.CRAWLSTACK_SLOTS, 2000)) {
if (this.log.isFine()) log.logFine("deQueue: too many processes in stack crawl thread queue (" + "stackCrawlQueue=" + crawlStacker.size() + ")");
return doneSomething;
}
// if we were interrupted we should return now
if (Thread.currentThread().isInterrupted()) {
if (this.log.isFine()) log.logFine("deQueue: thread was interrupted");

@ -135,19 +135,6 @@ public final class plasmaSwitchboardConstants {
public static final String INDEXER_METHOD_JOBCOUNT = "queueSize";
public static final String INDEXER_METHOD_FREEMEM = "deQueueFreeMem";
public static final String INDEXER_SLOTS = "indexer.slots";
// 82_crawlstack
/**
* <p><code>public static final String <strong>CRAWLSTACK</strong> = "82_crawlstack"</code></p>
* <p>Name of the crawl stacker thread, performing several checks on new URLs to crawl, i.e. double-check</p>
*/
public static final String CRAWLSTACK0 = "82_crawlstack";
public static final String CRAWLSTACK1 = "83_crawlstack";
public static final String CRAWLSTACK_METHOD_START = "job";
public static final String CRAWLSTACK_METHOD_JOBCOUNT = "size";
public static final String CRAWLSTACK_METHOD_FREEMEM = null;
public static final String CRAWLSTACK_IDLESLEEP = "82_crawlstack_idlesleep";
public static final String CRAWLSTACK_BUSYSLEEP = "82_crawlstack_busysleep";
public static final String CRAWLSTACK_SLOTS = "stacker.slots";
// 90_cleanup
/**
* <p><code>public static final String <strong>CLEANUP</strong> = "90_cleanup"</code></p>

@ -46,6 +46,7 @@ public abstract class serverAbstractBlockingThread<J extends serverProcessorJob>
return this.output;
}
@SuppressWarnings("unchecked")
public void run() {
this.open();
if (log != null) {
@ -61,10 +62,11 @@ public abstract class serverAbstractBlockingThread<J extends serverProcessorJob>
timestamp = System.currentTimeMillis();
memstamp0 = serverMemory.used();
final J in = this.input.take();
if ((in == null) || (in.status == serverProcessorJob.STATUS_POISON)) {
if ((in == null) || (in == serverProcessorJob.poisonPill) || (in.status == serverProcessorJob.STATUS_POISON)) {
// the poison pill: shutdown
// a null element is pushed to the queue on purpose to signal
// that a termination should be made
if (this.output != null) this.output.enQueue((J) serverProcessorJob.poisonPill); // pass on the pill
this.running = false;
break;
}

@ -76,7 +76,10 @@ public class serverInstantBlockingThread<J extends serverProcessorJob> extends s
@SuppressWarnings("unchecked")
public J job(final J next) throws Exception {
if (next == null || next == serverProcessorJob.poisonPill) return null; // poison pill: shutdown
// see if we got a poison pill to tell us to shut down
if (next == null) return (J) serverProcessorJob.poisonPill;
if (next == serverProcessorJob.poisonPill || next.status == serverProcessorJob.STATUS_POISON) return next;
instantThreadCounter++;
//System.out.println("started job " + this.handle + ": " + this.getName());
jobs.put(this.handle, this.getName());

@ -63,7 +63,24 @@ public class serverProcessor<J extends serverProcessorJob> {
}
public int queueSize() {
return input.size();
return this.input.size();
}
public void clear() {
if (this.input != null) this.input.clear();
}
public synchronized void relaxCapacity() {
if (this.input.size() == 0) return;
if (this.input.remainingCapacity() > 1000) return;
BlockingQueue<J> i = new LinkedBlockingQueue<J>();
J e;
while (this.input.size() > 0) {
e = this.input.poll();
if (e == null) break;
i.add(e);
}
this.input = i;
}
@SuppressWarnings("unchecked")
@ -89,20 +106,31 @@ public class serverProcessor<J extends serverProcessorJob> {
}
@SuppressWarnings("unchecked")
public void shutdown(final long millisTimeout) {
public void announceShutdown() {
if (executor == null) return;
if (executor.isShutdown()) return;
// before we put pills into the queue, make sure that they will take them
relaxCapacity();
// put poison pills into the queue
for (int i = 0; i < poolsize; i++) {
try {
serverLog.logInfo("serverProcessor", "putting poison pill in queue for " + this.methodName + ", thread " + i);
input.put((J) serverProcessorJob.poisonPill); // put a poison pill into the queue which will kill the job
serverLog.logInfo("serverProcessor", ".. poison pill is in queue for " + this.methodName + ", thread " + i + ". awaiting termination");
} catch (final InterruptedException e) { }
}
// wait for shutdown
try {
executor.awaitTermination(millisTimeout, TimeUnit.MILLISECONDS);
} catch (final InterruptedException e) {}
executor.shutdown();
}
public void awaitShutdown(final long millisTimeout) {
if (executor != null & !executor.isShutdown()) {
// wait for shutdown
try {
executor.awaitTermination(millisTimeout, TimeUnit.MILLISECONDS);
} catch (final InterruptedException e) {}
executor.shutdown();
}
serverLog.logInfo("serverProcessor", "queue for " + this.methodName + ": shutdown.");
this.executor = null;
this.input = null;
}

Loading…
Cancel
Save