diff --git a/source/de/anomic/plasma/plasmaCrawlStacker.java b/source/de/anomic/plasma/plasmaCrawlStacker.java
index f8f017fd9..e71ed04b1 100644
--- a/source/de/anomic/plasma/plasmaCrawlStacker.java
+++ b/source/de/anomic/plasma/plasmaCrawlStacker.java
@@ -44,8 +44,6 @@
 // the intact and unchanged copyright notice.
 // Contributions and changes to the program code must be marked as such.
 
-
-
 package de.anomic.plasma;
 
 import java.io.File;
@@ -54,12 +52,14 @@ import java.net.InetAddress;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.UnknownHostException;
-import java.util.ArrayList;
 import java.util.Date;
 import java.util.Iterator;
 import java.util.LinkedList;
 
+import org.apache.commons.pool.impl.GenericObjectPool;
+
 import de.anomic.data.robotsParser;
+import de.anomic.http.httpc;
 import de.anomic.kelondro.kelondroTree;
 import de.anomic.kelondro.kelondroRecords.Node;
 import de.anomic.server.serverCodings;
@@ -70,8 +70,10 @@ import de.anomic.yacy.yacyCore;
 
 public final class plasmaCrawlStacker {
 
-    private final serverLog log = new serverLog("STACKCRAWL");
-    private final plasmaSwitchboard sb;
+    final WorkerPool theWorkerPool;
+    final ThreadGroup theWorkerThreadGroup = new ThreadGroup("stackCrawlThreadGroup");
+    final serverLog log = new serverLog("STACKCRAWL");
+    final plasmaSwitchboard sb;
     private boolean stopped = false;
     private stackCrawlQueue queue;
 
@@ -81,6 +83,9 @@ public final class plasmaCrawlStacker {
         this.queue = new stackCrawlQueue(dbPath,dbCacheSize);
         this.log.logInfo(this.queue.size() + " entries in the stackCrawl queue.");
         this.log.logInfo("STACKCRAWL thread initialized.");
+
+        this.theWorkerPool = new WorkerPool(new WorkterFactory(this.theWorkerThreadGroup));
+
     }
 
     public int size() {
@@ -92,22 +97,11 @@ public final class plasmaCrawlStacker {
                     // getting a new message from the crawler queue
                     stackCrawlMessage theMsg = this.queue.waitForMessage();
 
-                    // process message
-                    String rejectReason = dequeue(theMsg);
-
-                    if (rejectReason != null) {
-                        this.sb.urlPool.errorURL.newEntry(
-                                new URL(theMsg.url()),
-                                theMsg.referrerHash(),
-                                theMsg.initiatorHash(),
-                                yacyCore.seedDB.mySeed.hash,
-                                theMsg.name,
-                                rejectReason,
-                                new bitfield(plasmaURL.urlFlagLength),
-                                false
-                        );
-                    }
+                    // getting a free worker thread from the pool
+                    Worker worker = (Worker) this.theWorkerPool.borrowObject();
 
+                    // processing the new request
+                    worker.execute(theMsg);
                 } catch (InterruptedException e) {
                     Thread.interrupted();
                     this.stopped = true;
@@ -142,7 +136,7 @@ public final class plasmaCrawlStacker {
         }
     }
 
-    private String dequeue(stackCrawlMessage theMsg) throws InterruptedException {
+    public String dequeue(stackCrawlMessage theMsg) throws InterruptedException {
         plasmaCrawlProfile.entry profile = this.sb.profiles.getEntry(theMsg.profileHandle());
 
         if (profile == null) {
@@ -165,6 +159,7 @@ public final class plasmaCrawlStacker {
         // stacks a crawl item. The position can also be remote
         // returns null if successful, a reason string if not successful
 
+        long startTime = System.currentTimeMillis();
         String reason = null; // failure reason
 
         // strange errors
@@ -187,7 +182,8 @@ public final class plasmaCrawlStacker {
             nexturl = new URL(nexturlString);
         } catch (MalformedURLException e) {
             reason = "denied_(url_'" + nexturlString + "'_wrong)";
-            this.log.logSevere("Wrong URL in stackCrawl: " + nexturlString);
+            this.log.logSevere("Wrong URL in stackCrawl: " + nexturlString +
+                               ". Stack processing time: " + (System.currentTimeMillis()-startTime));
             return reason;
         }
 
@@ -196,16 +192,19 @@ public final class plasmaCrawlStacker {
             InetAddress hostAddress = InetAddress.getByName(nexturl.getHost());
             if (hostAddress.isSiteLocalAddress()) {
                 reason = "denied_(private_ip_address)";
-                this.log.logFine("Host in URL '" + nexturlString + "' has private ip address.");
+                this.log.logFine("Host in URL '" + nexturlString + "' has private ip address." +
+                                 " Stack processing time: " + (System.currentTimeMillis()-startTime));
                 return reason;
             } else if (hostAddress.isLoopbackAddress()) {
                 reason = "denied_(loopback_ip_address)";
-                this.log.logFine("Host in URL '" + nexturlString + "' has loopback ip address.");
+                this.log.logFine("Host in URL '" + nexturlString + "' has loopback ip address." +
+                                 " Stack processing time: " + (System.currentTimeMillis()-startTime));
                 return reason;
             }
         } catch (UnknownHostException e) {
             reason = "denied_(unknown_host)";
-            this.log.logFine("Unknown host in URL '" + nexturlString + "'.");
+            this.log.logFine("Unknown host in URL '" + nexturlString + "'." +
+                             " Stack processing time: " + (System.currentTimeMillis()-startTime));
             return reason;
         }
 
@@ -213,7 +212,8 @@ public final class plasmaCrawlStacker {
         String hostlow = nexturl.getHost().toLowerCase();
         if (plasmaSwitchboard.urlBlacklist.isListed(hostlow, nexturl.getPath())) {
             reason = "denied_(url_in_blacklist)";
-            this.log.logFine("URL '" + nexturlString + "' is in blacklist.");
+            this.log.logFine("URL '" + nexturlString + "' is in blacklist." +
+                             " Stack processing time: " + (System.currentTimeMillis()-startTime));
             return reason;
         }
 
@@ -223,7 +223,8 @@ public final class plasmaCrawlStacker {
             /*
             urlPool.errorURL.newEntry(nexturl, referrerHash, initiatorHash, yacyCore.seedDB.mySeed.hash,
                                       name, reason, new bitfield(plasmaURL.urlFlagLength), false);*/
-            this.log.logFine("URL '" + nexturlString + "' does not match crawling filter '" + profile.generalFilter() + "'.");
+            this.log.logFine("URL '" + nexturlString + "' does not match crawling filter '" + profile.generalFilter() + "'." +
+                             " Stack processing time: " + (System.currentTimeMillis()-startTime));
             return reason;
         }
 
@@ -233,7 +234,8 @@ public final class plasmaCrawlStacker {
             /*
             urlPool.errorURL.newEntry(nexturl, referrerHash, initiatorHash, yacyCore.seedDB.mySeed.hash,
                                       name, reason, new bitfield(plasmaURL.urlFlagLength), false);*/
-            this.log.logFine("URL '" + nexturlString + "' is cgi URL.");
+            this.log.logFine("URL '" + nexturlString + "' is cgi URL." +
+                             " Stack processing time: " + (System.currentTimeMillis()-startTime));
             return reason;
         }
 
@@ -243,7 +245,8 @@ public final class plasmaCrawlStacker {
             /*
             urlPool.errorURL.newEntry(nexturl, referrerHash, initiatorHash, yacyCore.seedDB.mySeed.hash,
                                       name, reason, new bitfield(plasmaURL.urlFlagLength), false);*/
-            this.log.logFine("URL '" + nexturlString + "' is post URL.");
+            this.log.logFine("URL '" + nexturlString + "' is post URL." +
+                             " Stack processing time: " + (System.currentTimeMillis()-startTime));
             return reason;
         }
 
@@ -255,7 +258,8 @@ public final class plasmaCrawlStacker {
             /*
             urlPool.errorURL.newEntry(nexturl, referrerHash, initiatorHash, yacyCore.seedDB.mySeed.hash,
                                       name, reason, new bitfield(plasmaURL.urlFlagLength), false);*/
-            this.log.logFine("URL '" + nexturlString + "' is double registered in '" + dbocc + "'.");
+            this.log.logFine("URL '" + nexturlString + "' is double registered in '" + dbocc + "'." +
+                             " Stack processing time: " + (System.currentTimeMillis()-startTime));
             return reason;
         }
 
@@ -265,7 +269,8 @@ public final class plasmaCrawlStacker {
             /*
             urlPool.errorURL.newEntry(nexturl, referrerHash, initiatorHash, yacyCore.seedDB.mySeed.hash,
                                       name, reason, new bitfield(plasmaURL.urlFlagLength), false);*/
-            this.log.logFine("Crawling of URL '" + nexturlString + "' disallowed by robots.txt.");
+            this.log.logFine("Crawling of URL '" + nexturlString + "' disallowed by robots.txt." +
+                             " Stack processing time: " + (System.currentTimeMillis()-startTime));
             return reason;
         }
 
@@ -280,7 +285,7 @@ public final class plasmaCrawlStacker {
                 (yacyCore.seedDB.mySeed.isPrincipal())) /* qualified */;
 
         if ((!local)&&(!global)) {
-            this.log.logFine("URL '" + nexturlString + "' can neither be crawled local nor global.");
+            this.log.logSevere("URL '" + nexturlString + "' can neither be crawled local nor global.");
         }
 
         this.sb.urlPool.noticeURL.newEntry(initiatorHash, /* initiator, needed for p2p-feedback */
@@ -531,7 +536,7 @@ public final class plasmaCrawlStacker {
                 byte[][] entryBytes = null;
                 stackCrawlMessage newMessage = null;
                 synchronized(this.urlEntryHashCache) {
-                    urlHash = (String) this.urlEntryHashCache.removeLast();
+                    urlHash = (String) this.urlEntryHashCache.removeFirst();
                     entryBytes = this.urlEntryCache.remove(urlHash.getBytes());
                 }
 
@@ -542,4 +547,324 @@ public final class plasmaCrawlStacker {
         }
     }
 
+    /**
+     * Factory the {@link WorkerPool} uses to create, validate and destroy the
+     * {@link Worker} threads that process stackCrawl messages.
+     */
+    public final class WorkterFactory implements org.apache.commons.pool.PoolableObjectFactory {
+
+        /** Thread group every worker created by this factory belongs to. */
+        final ThreadGroup workerThreadGroup;
+
+        /**
+         * @param theWorkerThreadGroup the thread group for all created workers
+         * @throws IllegalArgumentException if the thread group is <code>null</code>
+         */
+        public WorkterFactory(ThreadGroup theWorkerThreadGroup) {
+            super();
+
+            if (theWorkerThreadGroup == null)
+                throw new IllegalArgumentException("The threadgroup object must not be null.");
+
+            this.workerThreadGroup = theWorkerThreadGroup;
 }
+
+        /**
+         * Creates a new, not yet started worker with maximum thread priority.
+         * @see org.apache.commons.pool.PoolableObjectFactory#makeObject()
+         */
+        public Object makeObject() {
+            Worker newWorker = new Worker(this.workerThreadGroup);
+            newWorker.setPriority(Thread.MAX_PRIORITY);
+            return newWorker;
+        }
+
+        /**
+         * Tells the given worker to stop.
+         * @see org.apache.commons.pool.PoolableObjectFactory#destroyObject(java.lang.Object)
+         */
+        public void destroyObject(Object obj) {
+            if (obj instanceof Worker) {
+                Worker theWorker = (Worker) obj;
+                theWorker.setStopped(true);
+            }
+        }
+
+        /**
+         * A worker is valid while its thread is alive, not interrupted and has
+         * entered its run loop. Objects that are no workers are accepted as they are.
+         * @see org.apache.commons.pool.PoolableObjectFactory#validateObject(java.lang.Object)
+         */
+        public boolean validateObject(Object obj) {
+            if (!(obj instanceof Worker)) return true;
+
+            Worker theWorker = (Worker) obj;
+            return theWorker.isAlive() && !theWorker.isInterrupted() && theWorker.isRunning();
+        }
+
+        /**
+         * Nothing to prepare when a worker is borrowed.
+         * @param obj the worker that is handed out
+         */
+        public void activateObject(Object obj) {
+            // intentionally empty
+        }
+
+        /**
+         * Nothing to clean up when a worker is returned.
+         * @param obj the worker that was returned
+         */
+        public void passivateObject(Object obj) {
+            // intentionally empty
+        }
+    }
+
+    /**
+     * Pool of {@link Worker} threads. {@link #close()} additionally shuts down
+     * the threads of the worker thread group.
+     */
+    public final class WorkerPool extends GenericObjectPool {
+        /** Set by {@link #close()}; workers check it before they return themselves to the pool. */
+        public volatile boolean isClosed = false;
+
+        /**
+         * Creates a pool that keeps at most 10 idle and 50 borrowed workers.
+         * @param objFactory the factory that creates the workers
+         */
+        public WorkerPool(WorkterFactory objFactory) {
+            super(objFactory);
+            this.setMaxIdle(10);   // Maximum idle threads.
+            this.setMaxActive(50); // Maximum active threads.
+            // Idle time after which a pooled worker becomes eligible for eviction.
+            // NOTE(review): no evictor interval (setTimeBetweenEvictionRunsMillis) is
+            // configured here, so this limit is possibly never applied - confirm.
+            this.setMinEvictableIdleTimeMillis(30000);
+        }
+
+        /**
+         * @param objFactory the factory that creates the workers
+         * @param config the pool configuration to use
+         */
+        public WorkerPool(WorkterFactory objFactory,
+                          GenericObjectPool.Config config) {
+            super(objFactory, config);
+        }
+
+        /** Borrows a worker; simply delegates to the pool implementation. */
+        public Object borrowObject() throws Exception {
+            return super.borrowObject();
+        }
+
+        /** Returns a worker; simply delegates to the pool implementation. */
+        public void returnObject(Object obj) throws Exception {
+            super.returnObject(obj);
+        }
+
+        /**
+         * Marks the pool as closed and shuts down the threads of the worker
+         * thread group: they are told to stop, interrupted, their HTTP-client
+         * sockets are closed and each thread is given up to 500 ms to finish.
+         * Finally the pool itself is closed.
+         */
+        public synchronized void close() throws Exception {
+            // from now on workers do not return themselves to the pool anymore
+            this.isClosed = true;
+
+            // snapshot of the threads of the worker thread group
+            int threadCount = theWorkerThreadGroup.activeCount();
+            Thread[] threadList = new Thread[threadCount];
+            threadCount = theWorkerThreadGroup.enumerate(threadList);
+
+            try {
+                // trying to gracefully stop all still running workers ...
+                log.logInfo("Signaling shutdown to " + threadCount + " remaining stackCrawl threads ...");
+                for (int currentThreadIdx = 0; currentThreadIdx < threadCount; currentThreadIdx++) {
+                    Thread currentThread = threadList[currentThreadIdx];
+                    if (currentThread.isAlive() && currentThread instanceof Worker) {
+                        ((Worker) currentThread).setStopped(true);
+                    }
+                }
+
+                // waiting a few ms so that the workers can finish their current message
+                try { Thread.sleep(500); } catch (InterruptedException ignored) { /* the shutdown continues anyway */ }
+
+                // interrupting all still running or pooled threads ...
+                log.logInfo("Sending interruption signal to " + theWorkerThreadGroup.activeCount() + " remaining stackCrawl threads ...");
+                theWorkerThreadGroup.interrupt();
+
+                // workers that are blocking in IO get their HTTP-client sockets closed
+                log.logFine("Trying to abort " + theWorkerThreadGroup.activeCount() + " remaining stackCrawl threads ...");
+                for (int currentThreadIdx = 0; currentThreadIdx < threadCount; currentThreadIdx++) {
+                    Thread currentThread = threadList[currentThreadIdx];
+                    if (currentThread.isAlive() && currentThread instanceof Worker) {
+                        log.logInfo("Trying to shutdown stackCrawl thread '" + currentThread.getName() + "' [" + currentThreadIdx + "].");
+                        ((Worker) currentThread).close();
+                    }
+                }
+
+                // a timeout is needed here because some workers may not react to the interruption
+                log.logFine("Waiting for " + theWorkerThreadGroup.activeCount() + " remaining stackCrawl threads to finish shutdown ...");
+                for (int currentThreadIdx = 0; currentThreadIdx < threadCount; currentThreadIdx++) {
+                    Thread currentThread = threadList[currentThreadIdx];
+                    if (currentThread.isAlive()) {
+                        log.logFine("Waiting for stackCrawl thread '" + currentThread.getName() + "' [" + currentThreadIdx + "] to finish shutdown.");
+                        try { currentThread.join(500); } catch (InterruptedException ignored) { /* continue with the next thread */ }
+                    }
+                }
+
+                log.logInfo("Shutdown of remaining stackCrawl threads finish.");
+            } catch (Exception e) {
+                log.logSevere("Unexpected error while trying to shutdown all remaining stackCrawl threads.",e);
+            }
+
+            super.close();
+        }
+    }
+
+    /**
+     * Pooled thread that processes one stackCrawl message at a time. After a
+     * message has been processed the worker returns itself to the worker pool
+     * and waits for the next {@link #execute(stackCrawlMessage)} call.
+     */
+    public final class Worker extends Thread {
+        // The flags are accessed by several threads, therefore they are volatile.
+        private volatile boolean running = false;
+        private volatile boolean stopped = false;
+        private volatile boolean done = false;
+        private stackCrawlMessage theMsg;
+
+        /** @param theThreadGroup the thread group this worker thread is created in */
+        public Worker(ThreadGroup theThreadGroup) {
+            super(theThreadGroup,"stackCrawlThread");
+        }
+
+        /**
+         * Sets the stop flag. If the worker is told to stop, a thread waiting
+         * for its next message is woken up so that it can terminate.
+         * @param stopped <code>true</code> to let the worker terminate
+         */
+        public synchronized void setStopped(boolean stopped) {
+            this.stopped = stopped;
+            if (stopped) this.notifyAll();
+        }
+
+        /**
+         * Closes the HTTP-client sockets of this thread if the thread is still
+         * alive. A failure is only logged.
+         */
+        public void close() {
+            if (!this.isAlive()) return;
+
+            try {
+                // trying to close all still open httpc-Sockets first
+                int closedSockets = httpc.closeOpenSockets(this);
+                if (closedSockets > 0) {
+                    log.logInfo(closedSockets + " HTTP-client sockets of thread '" + this.getName() + "' closed.");
+                }
+            } catch (Exception e) {
+                log.logFine("Unable to close the HTTP-client sockets of thread '" + this.getName() + "': " + e.getMessage());
+            }
+        }
+
+        /**
+         * Hands a message over to this worker. The thread is started on the
+         * first call and woken up on later calls.
+         * @param newMsg the message to process
+         */
+        public synchronized void execute(stackCrawlMessage newMsg) {
+            this.theMsg = newMsg;
+            this.done = false;
+
+            if (!this.running) {
+                this.start();
+            } else {
+                this.notifyAll();
+            }
+        }
+
+        /** Clears the current message and marks the worker as idle. */
+        public synchronized void reset() {
+            this.done = true;
+            this.theMsg = null;
+        }
+
+        /** @return <code>true</code> once {@link #run()} has been entered */
+        public boolean isRunning() {
+            return this.running;
+        }
+
+        /** Processes messages until the worker is stopped or interrupted. */
+        public void run() {
+            this.running = true;
+
+            while (!this.stopped && !Thread.interrupted()) {
+                // Waiting for a task. The idle flag is tested while holding the
+                // monitor that execute(stackCrawlMessage) notifies, so a message
+                // handed over right before wait() is entered cannot be missed.
+                synchronized (this) {
+                    while (this.done && !this.stopped) {
+                        try {
+                            this.wait();
+                        } catch (InterruptedException e) {
+                            this.stopped = true;
+                        }
+                    }
+                }
+                if (this.stopped) break;
+
+                // There is a task ... let us execute it.
+                try {
+                    execute();
+                } catch (InterruptedException e) {
+                    // interrupted while processing: this worker terminates
+                    this.stopped = true;
+                } catch (Exception e) {
+                    log.logSevere("Unexpected error in stackCrawl thread '" + this.getName() + "'.", e);
+                } finally {
+                    reset();
+
+                    if (!this.stopped && !this.isInterrupted() && !theWorkerPool.isClosed) {
+                        try {
+                            this.setName("stackCrawlThread_inPool");
+                            theWorkerPool.returnObject(this);
+                        } catch (Exception e1) {
+                            log.logSevere("Unable to return stackCrawl thread '" + this.getName() + "' to the pool.", e1);
+                            this.stopped = true;
+                        }
+                    }
+                }
+            }
+        }
+
+        /**
+         * Processes the current message. If it is rejected, its URL is stored
+         * as error URL together with the reject reason.
+         * @throws InterruptedException if the thread is interrupted while processing
+         */
+        private void execute() throws InterruptedException {
+            try {
+                String rejectReason = dequeue(this.theMsg);
+
+                if (rejectReason != null) {
+                    sb.urlPool.errorURL.newEntry(
+                            new URL(this.theMsg.url()),
+                            this.theMsg.referrerHash(),
+                            this.theMsg.initiatorHash(),
+                            yacyCore.seedDB.mySeed.hash,
+                            this.theMsg.name,
+                            rejectReason,
+                            new bitfield(plasmaURL.urlFlagLength),
+                            false
+                    );
+                }
+            } catch (InterruptedException e) {
+                throw e;
+            } catch (Exception e) {
+                log.logSevere("Unable to process stackCrawl message.", e);
+            } finally {
+                this.done = true;
+            }
+        }
+    }
+}
+