From eb9b1389863d5f1e38f1d08435d372159b080cb6 Mon Sep 17 00:00:00 2001 From: theli Date: Mon, 4 Sep 2006 06:52:55 +0000 Subject: [PATCH] *) next step of restructuring for new crawlers - conversion of the crawler pool into a keyed object pool - crawlers are now loaded based on the url protocol (of course works only for http now) git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@2473 6c8d7289-2bf4-0310-a012-ef5d649a1542 --- htroot/IndexCreateLoaderQueue_p.java | 4 +- htroot/PerformanceQueues_p.java | 9 +-- htroot/YPStats_p.java | 5 +- htroot/xml/queues_p.java | 4 +- .../CrawlWorker.java} | 15 ++-- .../plasma/crawler/plasmaCrawlerFactory.java | 68 ++++++++++++++----- .../plasma/crawler/plasmaCrawlerPool.java | 43 ++++++------ .../de/anomic/plasma/plasmaCrawlLoader.java | 20 +++--- source/de/anomic/plasma/plasmaParser.java | 3 +- .../de/anomic/plasma/plasmaSnippetCache.java | 4 +- 10 files changed, 108 insertions(+), 67 deletions(-) rename source/de/anomic/plasma/crawler/{plasmaCrawlWorker.java => http/CrawlWorker.java} (96%) diff --git a/htroot/IndexCreateLoaderQueue_p.java b/htroot/IndexCreateLoaderQueue_p.java index b2bebb11f..8f0b54915 100644 --- a/htroot/IndexCreateLoaderQueue_p.java +++ b/htroot/IndexCreateLoaderQueue_p.java @@ -47,7 +47,7 @@ import de.anomic.data.wikiCode; import de.anomic.http.httpHeader; import de.anomic.plasma.plasmaCrawlLoaderMessage; import de.anomic.plasma.plasmaSwitchboard; -import de.anomic.plasma.crawler.plasmaCrawlWorker; +import de.anomic.plasma.crawler.http.CrawlWorker; import de.anomic.server.serverObjects; import de.anomic.server.serverSwitch; import de.anomic.yacy.yacyCore; @@ -74,7 +74,7 @@ public class IndexCreateLoaderQueue_p { yacySeed initiator; int i, count = 0; for (i = 0; i < threadCount; i++) { - plasmaCrawlWorker theWorker = (plasmaCrawlWorker)threadList[i]; + CrawlWorker theWorker = (CrawlWorker)threadList[i]; plasmaCrawlLoaderMessage theMsg = theWorker.theMsg; if (theMsg == null) continue; diff --git a/htroot/PerformanceQueues_p.java b/htroot/PerformanceQueues_p.java index 6bb18215e..c13e7252f 100644 --- a/htroot/PerformanceQueues_p.java +++ b/htroot/PerformanceQueues_p.java @@ -47,6 +47,7 @@ import java.util.Iterator; import java.util.Map; import java.io.File; +import org.apache.commons.pool.impl.GenericKeyedObjectPool; import org.apache.commons.pool.impl.GenericObjectPool; import de.anomic.http.httpHeader; @@ -193,12 +194,12 @@ public class PerformanceQueues_p { * configuring the crawler pool */ // getting the current crawler pool configuration - GenericObjectPool.Config crawlerPoolConfig = switchboard.cacheLoader.getPoolConfig(); + GenericKeyedObjectPool.Config crawlerPoolConfig = switchboard.cacheLoader.getPoolConfig(); int maxActive = Integer.parseInt(post.get("Crawler Pool_maxActive","8")); int maxIdle = Integer.parseInt(post.get("Crawler Pool_maxIdle","4")); int minIdle = Integer.parseInt(post.get("Crawler Pool_minIdle","0")); - crawlerPoolConfig.minIdle = (minIdle > maxIdle) ? maxIdle/2 : minIdle; + //crawlerPoolConfig.minIdle = (minIdle > maxIdle) ? maxIdle/2 : minIdle; crawlerPoolConfig.maxIdle = (maxIdle > maxActive) ? maxActive/2 : maxIdle; crawlerPoolConfig.maxActive = maxActive; @@ -287,11 +288,11 @@ public class PerformanceQueues_p { prop.put("assortmentCluster", asizes.length / 8); // table thread pool settings - GenericObjectPool.Config crawlerPoolConfig = switchboard.cacheLoader.getPoolConfig(); + GenericKeyedObjectPool.Config crawlerPoolConfig = switchboard.cacheLoader.getPoolConfig(); prop.put("pool_0_name","Crawler Pool"); prop.put("pool_0_maxActive",crawlerPoolConfig.maxActive); prop.put("pool_0_maxIdle",crawlerPoolConfig.maxIdle); - prop.put("pool_0_minIdle",crawlerPoolConfig.minIdle); + //prop.put("pool_0_minIdle",crawlerPoolConfig.minIdle); prop.put("pool_0_numActive",switchboard.cacheLoader.getNumActiveWorker()); prop.put("pool_0_numIdle",switchboard.cacheLoader.getNumIdleWorker()); diff --git a/htroot/YPStats_p.java b/htroot/YPStats_p.java index 140358f0b..539760890 100644 --- a/htroot/YPStats_p.java +++ b/htroot/YPStats_p.java @@ -46,6 +46,7 @@ import java.util.Iterator; +import org.apache.commons.pool.impl.GenericKeyedObjectPool; import org.apache.commons.pool.impl.GenericObjectPool; import de.anomic.http.httpHeader; @@ -156,11 +157,11 @@ public class YPStats_p { prop.put("wordCacheMaxCount", switchboard.getConfig("wordCacheMaxCount", "10000")); // table thread pool settings - GenericObjectPool.Config crawlerPoolConfig = switchboard.cacheLoader.getPoolConfig(); + GenericKeyedObjectPool.Config crawlerPoolConfig = switchboard.cacheLoader.getPoolConfig(); prop.put("pool_0_name","Crawler Pool"); prop.put("pool_0_maxActive",crawlerPoolConfig.maxActive); prop.put("pool_0_maxIdle",crawlerPoolConfig.maxIdle); - prop.put("pool_0_minIdle",crawlerPoolConfig.minIdle); + prop.put("pool_0_minIdle",0 /* crawlerPoolConfig.minIdle */); serverThread httpd = switchboard.getThread("10_httpd"); GenericObjectPool.Config httpdPoolConfig = ((serverCore)httpd).getPoolConfig(); diff --git a/htroot/xml/queues_p.java b/htroot/xml/queues_p.java index 91e26d088..1875773fc 100644 --- a/htroot/xml/queues_p.java +++ b/htroot/xml/queues_p.java @@ -57,7 +57,7 @@ import de.anomic.http.httpHeader; import de.anomic.plasma.plasmaCrawlLoaderMessage; import de.anomic.plasma.plasmaCrawlNURL; import de.anomic.plasma.plasmaCrawlProfile; -import de.anomic.plasma.crawler.plasmaCrawlWorker; +import de.anomic.plasma.crawler.http.CrawlWorker; import de.anomic.plasma.plasmaSwitchboard; import de.anomic.plasma.plasmaSwitchboardQueue; import de.anomic.server.serverObjects; @@ -150,7 +150,7 @@ public class queues_p { } int i, count = 0; for (i = 0; i < size; i++) { - plasmaCrawlWorker theWorker = (plasmaCrawlWorker)threadList[i]; + CrawlWorker theWorker = (CrawlWorker)threadList[i]; plasmaCrawlLoaderMessage theMsg = theWorker.theMsg; if (theMsg == null) continue; diff --git a/source/de/anomic/plasma/crawler/plasmaCrawlWorker.java b/source/de/anomic/plasma/crawler/http/CrawlWorker.java similarity index 96% rename from source/de/anomic/plasma/crawler/plasmaCrawlWorker.java rename to source/de/anomic/plasma/crawler/http/CrawlWorker.java index 4c171afdb..25800c8b1 100644 --- a/source/de/anomic/plasma/crawler/plasmaCrawlWorker.java +++ b/source/de/anomic/plasma/crawler/http/CrawlWorker.java @@ -42,7 +42,7 @@ //the intact and unchanged copyright notice. //Contributions and changes to the program code must be marked as such. -package de.anomic.plasma.crawler; +package de.anomic.plasma.crawler.http; import java.io.File; import java.io.FileOutputStream; @@ -66,13 +66,14 @@ import de.anomic.plasma.plasmaCrawlProfile; import de.anomic.plasma.plasmaHTCache; import de.anomic.plasma.plasmaParser; import de.anomic.plasma.plasmaSwitchboard; +import de.anomic.plasma.crawler.plasmaCrawlerPool; import de.anomic.plasma.urlPattern.plasmaURLPattern; import de.anomic.server.serverSystem; import de.anomic.server.logging.serverLog; import de.anomic.tools.bitfield; import de.anomic.yacy.yacyCore; -public final class plasmaCrawlWorker extends Thread { +public final class CrawlWorker extends Thread { public static final int DEFAULT_CRAWLING_RETRY_COUNT = 5; public static final String threadBaseName = "CrawlerWorker"; @@ -93,12 +94,12 @@ public final class plasmaCrawlWorker extends Thread { private plasmaCrawlProfile.entry profile; // private String error; - boolean destroyed = false; + public boolean destroyed = false; private boolean running = false; private boolean stopped = false; private boolean done = false; - public plasmaCrawlWorker( + public CrawlWorker( ThreadGroup theTG, plasmaCrawlerPool thePool, plasmaSwitchboard theSb, @@ -162,7 +163,7 @@ public final class plasmaCrawlWorker extends Thread { if (this.done) { synchronized (this) { // return thread back into pool - this.myPool.returnObject(this); + this.myPool.returnObject("http",this); // We are waiting for a new task now. if (!this.stopped && !this.destroyed && !this.isInterrupted()) { @@ -182,14 +183,14 @@ public final class plasmaCrawlWorker extends Thread { serverLog.logFiner("CRAWLER-POOL","Interruption of thread '" + this.getName() + "' detected."); } finally { if (this.myPool != null && !this.destroyed) - this.myPool.invalidateObject(this); + this.myPool.invalidateObject("http",this); } } public void execute() { try { // setting threadname - this.setName(plasmaCrawlWorker.threadBaseName + "_" + this.url); + this.setName(CrawlWorker.threadBaseName + "_" + this.url); // refreshing timeout value this.socketTimeout = (int) this.sb.getConfigLong("crawler.clientTimeout", 10000); diff --git a/source/de/anomic/plasma/crawler/plasmaCrawlerFactory.java b/source/de/anomic/plasma/crawler/plasmaCrawlerFactory.java index ac179e0f9..cb9986ca9 100644 --- a/source/de/anomic/plasma/crawler/plasmaCrawlerFactory.java +++ b/source/de/anomic/plasma/crawler/plasmaCrawlerFactory.java @@ -1,10 +1,15 @@ package de.anomic.plasma.crawler; +import java.lang.reflect.Constructor; + +import org.apache.commons.pool.KeyedPoolableObjectFactory; + import de.anomic.plasma.plasmaHTCache; import de.anomic.plasma.plasmaSwitchboard; +import de.anomic.plasma.crawler.http.CrawlWorker; import de.anomic.server.logging.serverLog; -public final class plasmaCrawlerFactory implements org.apache.commons.pool.PoolableObjectFactory { +public final class plasmaCrawlerFactory implements KeyedPoolableObjectFactory { private plasmaCrawlerPool thePool; private final ThreadGroup theThreadGroup; @@ -16,7 +21,8 @@ public final class plasmaCrawlerFactory implements org.apache.commons.pool.Poola ThreadGroup threadGroup, plasmaSwitchboard theSb, plasmaHTCache theCacheManager, - serverLog log) { + serverLog log + ) { super(); @@ -36,25 +42,55 @@ public final class plasmaCrawlerFactory implements org.apache.commons.pool.Poola /** * @see org.apache.commons.pool.PoolableObjectFactory#makeObject() */ - public Object makeObject() { - return new plasmaCrawlWorker( - this.theThreadGroup, - this.thePool, - this.sb, - this.cacheManager, - this.theLog); + public Object makeObject(Object key) throws Exception { + if (!(key instanceof String)) + throw new IllegalArgumentException("The object key must be of type string."); + + // getting the class name + String className = this.getClass().getPackage().getName() + "." + key + ".CrawlWorker"; + + // loading class by name + Class moduleClass = Class.forName(className); + + // getting the constructor + Constructor classConstructor = moduleClass.getConstructor( new Class[] { + ThreadGroup.class, + plasmaCrawlerPool.class, + plasmaSwitchboard.class, + plasmaHTCache.class, + serverLog.class + } ); + + // instantiating class + CrawlWorker theCrawlWorker = (CrawlWorker) classConstructor.newInstance(new Object[] { + this.theThreadGroup, + this.thePool, + this.sb, + this.cacheManager, + this.theLog + }); + + // return the newly created object + return theCrawlWorker; + +// return new plasmaCrawlWorker( +// this.theThreadGroup, +// this.thePool, +// this.sb, +// this.cacheManager, +// this.theLog); } /** * @see org.apache.commons.pool.PoolableObjectFactory#destroyObject(java.lang.Object) */ - public void destroyObject(Object obj) { + public void destroyObject(Object key, Object obj) { if (obj == null) return; - if (obj instanceof plasmaCrawlWorker) { - plasmaCrawlWorker theWorker = (plasmaCrawlWorker) obj; + if (obj instanceof CrawlWorker) { + CrawlWorker theWorker = (CrawlWorker) obj; synchronized(theWorker) { theWorker.destroyed = true; - theWorker.setName(plasmaCrawlWorker.threadBaseName + "_destroyed"); + theWorker.setName(CrawlWorker.threadBaseName + "_destroyed"); theWorker.setStopped(true); theWorker.interrupt(); } @@ -64,7 +100,7 @@ public final class plasmaCrawlerFactory implements org.apache.commons.pool.Poola /** * @see org.apache.commons.pool.PoolableObjectFactory#validateObject(java.lang.Object) */ - public boolean validateObject(Object obj) { + public boolean validateObject(Object key, Object obj) { return true; } @@ -72,7 +108,7 @@ public final class plasmaCrawlerFactory implements org.apache.commons.pool.Poola * @param obj * */ - public void activateObject(Object obj) { + public void activateObject(Object key, Object obj) { //log.debug(" activateObject..."); } @@ -81,7 +117,7 @@ public final class plasmaCrawlerFactory implements org.apache.commons.pool.Poola * */ - public void passivateObject(Object obj) { + public void passivateObject(Object key, Object obj) { //log.debug(" passivateObject..." + obj); /* if (obj instanceof plasmaCrawlWorker) { diff --git a/source/de/anomic/plasma/crawler/plasmaCrawlerPool.java b/source/de/anomic/plasma/crawler/plasmaCrawlerPool.java index 322974c81..d15c705ff 100644 --- a/source/de/anomic/plasma/crawler/plasmaCrawlerPool.java +++ b/source/de/anomic/plasma/crawler/plasmaCrawlerPool.java @@ -1,49 +1,48 @@ package de.anomic.plasma.crawler; -import org.apache.commons.pool.impl.GenericObjectPool; +import org.apache.commons.pool.impl.GenericKeyedObjectPool; +import de.anomic.plasma.crawler.http.CrawlWorker; import de.anomic.server.logging.serverLog; -public final class plasmaCrawlerPool extends GenericObjectPool { +public final class plasmaCrawlerPool extends GenericKeyedObjectPool { private final ThreadGroup theThreadGroup; public boolean isClosed = false; - public plasmaCrawlerPool(plasmaCrawlerFactory objFactory, - GenericObjectPool.Config config, - ThreadGroup threadGroup) { + public plasmaCrawlerPool(plasmaCrawlerFactory objFactory, GenericKeyedObjectPool.Config config, ThreadGroup threadGroup) { super(objFactory, config); this.theThreadGroup = threadGroup; objFactory.setPool(this); } - public Object borrowObject() throws Exception { - return super.borrowObject(); + public Object borrowObject(Object key) throws Exception { + return super.borrowObject(key); } - public void returnObject(Object obj) { + public void returnObject(Object key,Object obj) { if (obj == null) return; - if (obj instanceof plasmaCrawlWorker) { + if (obj instanceof CrawlWorker) { try { - ((plasmaCrawlWorker)obj).setName(plasmaCrawlWorker.threadBaseName + "_inPool"); - super.returnObject(obj); + ((CrawlWorker)obj).setName(CrawlWorker.threadBaseName + "_inPool"); + super.returnObject(key,obj); } catch (Exception e) { - ((plasmaCrawlWorker)obj).setStopped(true); + ((CrawlWorker)obj).setStopped(true); serverLog.logSevere("CRAWLER-POOL","Unable to return crawler thread to pool.",e); } } else { - serverLog.logSevere("CRAWLER-POOL","Object of wront type '" + obj.getClass().getName() + + serverLog.logSevere("CRAWLER-POOL","Object of wrong type '" + obj.getClass().getName() + "' returned to pool."); } } - public void invalidateObject(Object obj) { + public void invalidateObject(Object key,Object obj) { if (obj == null) return; if (this.isClosed) return; - if (obj instanceof plasmaCrawlWorker) { + if (obj instanceof CrawlWorker) { try { - ((plasmaCrawlWorker)obj).setName(plasmaCrawlWorker.threadBaseName + "_invalidated"); - ((plasmaCrawlWorker)obj).setStopped(true); - super.invalidateObject(obj); + ((CrawlWorker)obj).setName(CrawlWorker.threadBaseName + "_invalidated"); + ((CrawlWorker)obj).setStopped(true); + super.invalidateObject(key,obj); } catch (Exception e) { serverLog.logSevere("CRAWLER-POOL","Unable to invalidate crawling thread.",e); } @@ -65,11 +64,11 @@ public final class plasmaCrawlerPool extends GenericObjectPool { // signaling shutdown to all still running or pooled threads ... serverLog.logInfo("CRAWLER","Signaling shutdown to " + threadCount + " remaining crawler threads ..."); for ( int currentThreadIdx = 0; currentThreadIdx < threadCount; currentThreadIdx++ ) { - ((plasmaCrawlWorker)threadList[currentThreadIdx]).setStopped(true); + ((CrawlWorker)threadList[currentThreadIdx]).setStopped(true); } // giving the crawlers some time to finish shutdown - try { Thread.sleep(500); } catch(Exception e) {} + try { Thread.sleep(500); } catch(Exception e) {/* Ignore this. Shutdown in progress */} // sending interrupted signal to all remaining threads serverLog.logInfo("CRAWLER","Sending interruption signal to " + this.theThreadGroup.activeCount() + " remaining crawler threads ..."); @@ -81,7 +80,7 @@ public final class plasmaCrawlerPool extends GenericObjectPool { Thread currentThread = threadList[currentThreadIdx]; if (currentThread.isAlive()) { serverLog.logInfo("CRAWLER","Trying to shutdown crawler thread '" + currentThread.getName() + "' [" + currentThreadIdx + "]."); - ((plasmaCrawlWorker)currentThread).close(); + ((CrawlWorker)currentThread).close(); } } @@ -90,7 +89,7 @@ public final class plasmaCrawlerPool extends GenericObjectPool { Thread currentThread = threadList[currentThreadIdx]; if (currentThread.isAlive()) { serverLog.logInfo("CRAWLER","Waiting for crawler thread '" + currentThread.getName() + "' [" + currentThreadIdx + "] to finish shutdown."); - try { currentThread.join(500); } catch (InterruptedException ex) {} + try { currentThread.join(500); } catch (InterruptedException ex) {/* Ignore this. Shutdown in progress */} } } serverLog.logWarning("CRAWLER","Shutdown of remaining crawler threads finish."); diff --git a/source/de/anomic/plasma/plasmaCrawlLoader.java b/source/de/anomic/plasma/plasmaCrawlLoader.java index 21cdb2034..1ee1f9ffc 100644 --- a/source/de/anomic/plasma/plasmaCrawlLoader.java +++ b/source/de/anomic/plasma/plasmaCrawlLoader.java @@ -44,13 +44,14 @@ package de.anomic.plasma; +import org.apache.commons.pool.impl.GenericKeyedObjectPool; import org.apache.commons.pool.impl.GenericObjectPool; import de.anomic.net.URL; -import de.anomic.plasma.crawler.plasmaCrawlWorker; import de.anomic.plasma.crawler.plasmaCrawlerFactory; import de.anomic.plasma.crawler.plasmaCrawlerMsgQueue; import de.anomic.plasma.crawler.plasmaCrawlerPool; +import de.anomic.plasma.crawler.http.CrawlWorker; import de.anomic.server.logging.serverLog; public final class plasmaCrawlLoader extends Thread { @@ -62,7 +63,7 @@ public final class plasmaCrawlLoader extends Thread { private final plasmaCrawlerMsgQueue theQueue; private final plasmaCrawlerPool crawlwerPool; - private GenericObjectPool.Config crawlerPoolConfig = null; + private GenericKeyedObjectPool.Config crawlerPoolConfig = null; private final ThreadGroup theThreadGroup = new ThreadGroup("CrawlerThreads"); private boolean stopped = false; @@ -80,7 +81,7 @@ public final class plasmaCrawlLoader extends Thread { // configuring the crawler thread pool // implementation of session thread pool - this.crawlerPoolConfig = new GenericObjectPool.Config(); + this.crawlerPoolConfig = new GenericKeyedObjectPool.Config(); // The maximum number of active connections that can be allocated from pool at the same time, // 0 for no limit @@ -89,7 +90,7 @@ public final class plasmaCrawlLoader extends Thread { // The maximum number of idle connections connections in the pool // 0 = no limit. this.crawlerPoolConfig.maxIdle = Integer.parseInt(switchboard.getConfig("crawler.MaxIdleThreads","7")); - this.crawlerPoolConfig.minIdle = Integer.parseInt(switchboard.getConfig("crawler.MinIdleThreads","5")); + //this.crawlerPoolConfig.minIdle = Integer.parseInt(switchboard.getConfig("crawler.MinIdleThreads","5")); // block undefinitely this.crawlerPoolConfig.maxWait = -1; @@ -113,11 +114,11 @@ public final class plasmaCrawlLoader extends Thread { this.start(); } - public GenericObjectPool.Config getPoolConfig() { + public GenericKeyedObjectPool.Config getPoolConfig() { return this.crawlerPoolConfig; } - public void setPoolConfig(GenericObjectPool.Config newConfig) { + public void setPoolConfig(GenericKeyedObjectPool.Config newConfig) { this.crawlwerPool.setConfig(newConfig); } @@ -148,9 +149,12 @@ public final class plasmaCrawlLoader extends Thread { // getting a new message from the crawler queue plasmaCrawlLoaderMessage theMsg = this.theQueue.waitForMessage(); + // TODO: getting the protocol of the next URL + String protocol = theMsg.url.getProtocol(); + // getting a new crawler from the crawler pool - plasmaCrawlWorker theWorker = (plasmaCrawlWorker) this.crawlwerPool.borrowObject(); - theWorker.execute(theMsg); + CrawlWorker theWorker = (CrawlWorker) this.crawlwerPool.borrowObject(protocol); + if (theWorker != null) theWorker.execute(theMsg); } catch (InterruptedException e) { Thread.interrupted(); diff --git a/source/de/anomic/plasma/plasmaParser.java b/source/de/anomic/plasma/plasmaParser.java index 2c921a78b..092d12440 100644 --- a/source/de/anomic/plasma/plasmaParser.java +++ b/source/de/anomic/plasma/plasmaParser.java @@ -904,8 +904,7 @@ final class plasmaParserFactory implements KeyedPoolableObjectFactory { final class plasmaParserPool extends GenericKeyedObjectPool { - public plasmaParserPool(plasmaParserFactory objFactory, - GenericKeyedObjectPool.Config config) { + public plasmaParserPool(plasmaParserFactory objFactory, GenericKeyedObjectPool.Config config) { super(objFactory, config); } diff --git a/source/de/anomic/plasma/plasmaSnippetCache.java b/source/de/anomic/plasma/plasmaSnippetCache.java index a528fb689..073e9cc8a 100644 --- a/source/de/anomic/plasma/plasmaSnippetCache.java +++ b/source/de/anomic/plasma/plasmaSnippetCache.java @@ -44,7 +44,7 @@ package de.anomic.plasma; import java.io.IOException; import de.anomic.net.URL; -import de.anomic.plasma.crawler.plasmaCrawlWorker; +import de.anomic.plasma.crawler.http.CrawlWorker; import java.util.Enumeration; import java.util.HashMap; @@ -419,7 +419,7 @@ public class plasmaSnippetCache { } public plasmaHTCache.Entry loadResourceFromWeb(URL url, int socketTimeout) throws IOException { - return plasmaCrawlWorker.load( + return CrawlWorker.load( url, "", null,