From 09b106eb042acc8d18c601f7c1e58a1aa3bd6cd1 Mon Sep 17 00:00:00 2001 From: theli Date: Mon, 4 Sep 2006 09:00:18 +0000 Subject: [PATCH] *) next step of restructuring for new crawlers - adding interface class (plasma/crawler/plasmaCrawlWorker.java) for protocol specific crawl-worker threads - moving reusable code into abstract crawl-worker class AbstractCrawlWorker.java - the load method of the worker threads should not be called directly anymore (e.g. by the snippet fetcher) to crawl a page and wait for the result use function plasmaCrawlLoader.loadSync([...]) git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@2474 6c8d7289-2bf4-0310-a012-ef5d649a1542 --- .../plasma/crawler/AbstractCrawlWorker.java | 214 +++++++++ .../plasma/crawler/http/CrawlWorker.java | 429 +++++------------- .../plasma/crawler/plasmaCrawlWorker.java | 20 + .../plasma/crawler/plasmaCrawlerFactory.java | 2 +- .../plasma/crawler/plasmaCrawlerPool.java | 4 +- .../de/anomic/plasma/plasmaCrawlLoader.java | 82 +++- .../plasma/plasmaCrawlLoaderMessage.java | 39 +- .../de/anomic/plasma/plasmaCrawlStacker.java | 4 +- .../de/anomic/plasma/plasmaSnippetCache.java | 24 +- .../de/anomic/plasma/plasmaSwitchboard.java | 2 +- yacy.init | 1 + 11 files changed, 485 insertions(+), 336 deletions(-) create mode 100644 source/de/anomic/plasma/crawler/AbstractCrawlWorker.java create mode 100644 source/de/anomic/plasma/crawler/plasmaCrawlWorker.java diff --git a/source/de/anomic/plasma/crawler/AbstractCrawlWorker.java b/source/de/anomic/plasma/crawler/AbstractCrawlWorker.java new file mode 100644 index 000000000..cd77a4c65 --- /dev/null +++ b/source/de/anomic/plasma/crawler/AbstractCrawlWorker.java @@ -0,0 +1,214 @@ +package de.anomic.plasma.crawler; + +import java.io.IOException; + +import de.anomic.index.indexURL; +import de.anomic.net.URL; +import de.anomic.plasma.plasmaCrawlEURL; +import de.anomic.plasma.plasmaCrawlLoaderMessage; +import de.anomic.plasma.plasmaCrawlProfile; +import de.anomic.plasma.plasmaHTCache; +import de.anomic.plasma.plasmaSwitchboard; +import de.anomic.server.logging.serverLog; +import de.anomic.tools.bitfield; +import de.anomic.yacy.yacyCore; + +public abstract class AbstractCrawlWorker extends Thread implements plasmaCrawlWorker { + + /** + * The protocol that is supported by this crawler + * e.g. http, ftp, etc. + */ + protected String protocol; + + /* ============================================================ + * Variables for thread pool management + * ============================================================ */ + public boolean destroyed = false; + protected boolean running = false; + protected boolean stopped = false; + protected boolean done = false; + + /* ============================================================ + * Crawl job specific variables + * ============================================================ */ + public plasmaCrawlLoaderMessage theMsg; + protected URL url; + protected String name; + protected String refererURLString; + protected String initiator; + protected int depth; + protected long startdate; + protected plasmaCrawlProfile.entry profile; + protected boolean acceptAllContent; + + /** + * The crawler thread pool + */ + protected final plasmaCrawlerPool myPool; + + /** + * reference to the plasma switchboard + */ + protected final plasmaSwitchboard sb; + + /** + * reference to the cache manager + */ + protected final plasmaHTCache cacheManager; + + /** + * Logging class + */ + protected final serverLog log; + + + /** + * Constructor of this class + * @param theTG the crawl worker thread group + * @param thePool the crawl worker thread pool + * @param theSb plasma switchboard + * @param theCacheManager cache manager + * @param theLog server log + */ + public AbstractCrawlWorker( + ThreadGroup theTG, + plasmaCrawlerPool thePool, + plasmaSwitchboard theSb, + plasmaHTCache theCacheManager, + serverLog theLog + ) { + super(theTG,plasmaCrawlWorker.threadBaseName + "_created"); + + this.myPool = thePool; + this.sb = theSb; + this.cacheManager = theCacheManager; + this.log = theLog; + } + + public abstract void close(); + + public void run() { + this.running = true; + + try { + // The thread keeps running. + while (!this.stopped && !this.isInterrupted() && !this.myPool.isClosed) { + if (this.done) { + synchronized (this) { + // return thread back into pool + this.myPool.returnObject(this.protocol,this); + + // We are waiting for a new task now. + if (!this.stopped && !this.destroyed && !this.isInterrupted()) { + this.wait(); + } + } + } else { + try { + // executing the new task + execute(); + } finally { + reset(); + } + } + } + } catch (InterruptedException ex) { + serverLog.logFiner("CRAWLER-POOL","Interruption of thread '" + this.getName() + "' detected."); + } finally { + if (this.myPool != null && !this.destroyed) + this.myPool.invalidateObject(this.protocol,this); + } + } + + public void execute() { + try { + // setting threadname + this.setName(plasmaCrawlWorker.threadBaseName + "_" + this.url); + + // load some configuration variables + init(); + + // loading resource + plasmaHTCache.Entry resource = load(); + + // store a reference to the result in the message object + // this is e.g. needed by the snippet fetcher + this.theMsg.setResult(resource); + + } catch (IOException e) { + //throw e; + } finally { + this.done = true; + } + } + + public void execute(plasmaCrawlLoaderMessage theNewMsg) { + synchronized (this) { + + this.theMsg = theNewMsg; + + this.url = theNewMsg.url; + this.name = theNewMsg.name; + this.refererURLString = theNewMsg.referer; + this.initiator = theNewMsg.initiator; + this.depth = theNewMsg.depth; + this.profile = theNewMsg.profile; + this.acceptAllContent = theNewMsg.acceptAllContent; + + this.startdate = System.currentTimeMillis(); + + this.done = false; + + if (!this.running) { + // if the thread is not running until yet, we need to start it now + this.start(); + } else { + // inform the thread about the new crawl job + this.notifyAll(); + } + } + } + + public void setStopped(boolean isStopped) { + this.stopped = isStopped; + } + + public boolean isRunning() { + return this.running; + } + + public void reset() { + this.theMsg = null; + this.url = null; + this.name = null; + this.refererURLString = null; + this.initiator = null; + this.depth = 0; + this.startdate = 0; + this.profile = null; + this.acceptAllContent = false; + } + + protected void addURLtoErrorDB(String failreason) { + // convert the referrer URL into a hash value + String referrerHash = (this.refererURLString==null)?null:indexURL.urlHash(this.refererURLString); + + // create a new errorURL DB entry + plasmaCrawlEURL.Entry ee = this.sb.urlPool.errorURL.newEntry( + this.url, + referrerHash, + this.initiator, + yacyCore.seedDB.mySeed.hash, + this.name, + (failreason==null)?"Unknown reason":failreason, + new bitfield(indexURL.urlFlagLength) + ); + + // store the entry + ee.store(); + + // push it onto the stack + this.sb.urlPool.errorURL.stackPushEntry(ee); + } +} diff --git a/source/de/anomic/plasma/crawler/http/CrawlWorker.java b/source/de/anomic/plasma/crawler/http/CrawlWorker.java index 25800c8b1..a1aaebc9a 100644 --- a/source/de/anomic/plasma/crawler/http/CrawlWorker.java +++ b/source/de/anomic/plasma/crawler/http/CrawlWorker.java @@ -61,59 +61,51 @@ import de.anomic.index.indexURL; import de.anomic.net.URL; import de.anomic.plasma.plasmaCrawlEURL; import de.anomic.plasma.plasmaCrawlLoader; -import de.anomic.plasma.plasmaCrawlLoaderMessage; -import de.anomic.plasma.plasmaCrawlProfile; import de.anomic.plasma.plasmaHTCache; import de.anomic.plasma.plasmaParser; import de.anomic.plasma.plasmaSwitchboard; +import de.anomic.plasma.crawler.AbstractCrawlWorker; import de.anomic.plasma.crawler.plasmaCrawlerPool; import de.anomic.plasma.urlPattern.plasmaURLPattern; import de.anomic.server.serverSystem; import de.anomic.server.logging.serverLog; -import de.anomic.tools.bitfield; -import de.anomic.yacy.yacyCore; -public final class CrawlWorker extends Thread { +public final class CrawlWorker extends AbstractCrawlWorker { - public static final int DEFAULT_CRAWLING_RETRY_COUNT = 5; - public static final String threadBaseName = "CrawlerWorker"; - - private final plasmaCrawlerPool myPool; - private final plasmaSwitchboard sb; - private final plasmaHTCache cacheManager; - private final serverLog log; + public static final int DEFAULT_CRAWLING_RETRY_COUNT = 5; + + /** + * The socket timeout that should be used + */ private int socketTimeout; - - public plasmaCrawlLoaderMessage theMsg; - private URL url; - private String name; - private String referer; - private String initiator; - private int depth; - private long startdate; - private plasmaCrawlProfile.entry profile; -// private String error; - - public boolean destroyed = false; - private boolean running = false; - private boolean stopped = false; - private boolean done = false; - + + /** + * The remote http proxy that should be used + */ + private httpRemoteProxyConfig remoteProxyConfig; + + private String acceptEncoding; + private String acceptLanguage; + private String acceptCharset; + + /** + * Constructor of this class + * @param theTG + * @param thePool + * @param theSb + * @param theCacheManager + * @param theLog + */ public CrawlWorker( ThreadGroup theTG, plasmaCrawlerPool thePool, plasmaSwitchboard theSb, plasmaHTCache theCacheManager, serverLog theLog) { - super(theTG,threadBaseName + "_created"); - - this.myPool = thePool; - this.sb = theSb; - this.cacheManager = theCacheManager; - this.log = theLog; + super(theTG,thePool,theSb,theCacheManager,theLog); - // setting the crawler timeout properly - this.socketTimeout = (int) this.sb.getConfigLong("crawler.clientTimeout", 10000); + // this crawler supports http + this.protocol = "http"; } public long getDuration() { @@ -121,189 +113,58 @@ public final class CrawlWorker extends Thread { return (startDate != 0) ? System.currentTimeMillis() - startDate : 0; } - public synchronized void execute(plasmaCrawlLoaderMessage theNewMsg) { - this.theMsg = theNewMsg; - - this.url = theNewMsg.url; - this.name = theNewMsg.name; - this.referer = theNewMsg.referer; - this.initiator = theNewMsg.initiator; - this.depth = theNewMsg.depth; - this.profile = theNewMsg.profile; - - this.startdate = System.currentTimeMillis(); -// this.error = null; - - this.done = false; - if (!this.running) { -// this.setDaemon(true); - this.start(); - } else { - this.notifyAll(); - } - } - - public void reset() { - this.theMsg = null; - this.url = null; - this.referer = null; - this.initiator = null; - this.depth = 0; - this.startdate = 0; - this.profile = null; -// this.error = null; - } - - public void run() { - this.running = true; - - try { - // The thread keeps running. - while (!this.stopped && !this.isInterrupted() && !this.myPool.isClosed) { - if (this.done) { - synchronized (this) { - // return thread back into pool - this.myPool.returnObject("http",this); - - // We are waiting for a new task now. - if (!this.stopped && !this.destroyed && !this.isInterrupted()) { - this.wait(); - } - } - } else { - try { - // executing the new task - execute(); - } finally { - reset(); - } - } - } - } catch (InterruptedException ex) { - serverLog.logFiner("CRAWLER-POOL","Interruption of thread '" + this.getName() + "' detected."); - } finally { - if (this.myPool != null && !this.destroyed) - this.myPool.invalidateObject("http",this); - } - } - - public void execute() { - try { - // setting threadname - this.setName(CrawlWorker.threadBaseName + "_" + this.url); - - // refreshing timeout value + public void init() { + // refreshing timeout value + if (this.theMsg.timeout < 0) { this.socketTimeout = (int) this.sb.getConfigLong("crawler.clientTimeout", 10000); - - // loading resource - load(this.url, - this.name, - this.referer, - this.initiator, - this.depth, - this.profile, - this.socketTimeout, - this.sb.remoteProxyConfig, - this.cacheManager, - false, - this.log - ); - - } catch (IOException e) { - //throw e; - } - finally { - this.done = true; - } - } - - public void setStopped(boolean isStopped) { - this.stopped = isStopped; - } - - public boolean isRunning() { - return this.running; - } - - public void close() { - if (this.isAlive()) { - try { - // trying to close all still open httpc-Sockets first - int closedSockets = httpc.closeOpenSockets(this); - if (closedSockets > 0) { - this.log.logInfo(closedSockets + " HTTP-client sockets of thread '" + this.getName() + "' closed."); - } - } catch (Exception e) {/* ignore this. shutdown in progress */} + } else { + this.socketTimeout = this.theMsg.timeout; } + + // some http header values + this.acceptEncoding = this.sb.getConfig("crawler.acceptEncoding", "gzip,deflate"); + this.acceptLanguage = this.sb.getConfig("crawler.acceptLanguage","en-us,en;q=0.5"); + this.acceptCharset = this.sb.getConfig("crawler.acceptCharset","ISO-8859-1,utf-8;q=0.7,*;q=0.7"); + + // getting the http proxy config + this.remoteProxyConfig = this.sb.remoteProxyConfig; } - - public static plasmaHTCache.Entry load( - URL url, - String name, - String referer, - String initiator, - int depth, - plasmaCrawlProfile.entry profile, - int socketTimeout, - httpRemoteProxyConfig theRemoteProxyConfig, - plasmaHTCache cacheManager, - boolean acceptAllContent, - serverLog log - ) throws IOException { - return load(url, - name, - referer, - initiator, - depth, - profile, - socketTimeout, - theRemoteProxyConfig, - cacheManager, - log, - DEFAULT_CRAWLING_RETRY_COUNT, - true, - acceptAllContent + public plasmaHTCache.Entry load() throws IOException { + return load(DEFAULT_CRAWLING_RETRY_COUNT); + } + + protected plasmaHTCache.Entry createCacheEntry(Date requestDate, httpHeader requestHeader, httpc.response response) { + return this.cacheManager.newEntry( + requestDate, + this.depth, + this.url, + this.name, + requestHeader, + response.status, + response.responseHeader, + this.initiator, + this.profile ); - } - - private static plasmaHTCache.Entry load( - URL url, - String name, - String refererURLString, - String initiator, - int depth, - plasmaCrawlProfile.entry profile, - int socketTimeout, - httpRemoteProxyConfig theRemoteProxyConfig, - plasmaHTCache cacheManager, - serverLog log, - int crawlingRetryCount, - boolean useContentEncodingGzip, - boolean acceptAllContent - ) throws IOException { - if (url == null) return null; + } + + private plasmaHTCache.Entry load(int crawlingRetryCount) throws IOException { // if the recrawling limit was exceeded we stop crawling now if (crawlingRetryCount <= 0) return null; - // getting a reference to the plasmaSwitchboard - plasmaSwitchboard sb = plasmaCrawlLoader.switchboard; - Date requestDate = new Date(); // remember the time... - String host = url.getHost(); - String path = url.getFile(); - int port = url.getPort(); - boolean ssl = url.getProtocol().equals("https"); + String host = this.url.getHost(); + String path = this.url.getFile(); + int port = this.url.getPort(); + boolean ssl = this.url.getProtocol().equals("https"); if (port < 0) port = (ssl) ? 443 : 80; - - refererURLString = (refererURLString == null) ? "" : refererURLString.trim(); // check if url is in blacklist String hostlow = host.toLowerCase(); if (plasmaSwitchboard.urlBlacklist.isListed(plasmaURLPattern.BLACKLIST_CRAWLER, hostlow, path)) { - log.logInfo("CRAWLER Rejecting URL '" + url.toString() + "'. URL is in blacklist."); - addURLtoErrorDB(url, refererURLString, initiator, name, plasmaCrawlEURL.DENIED_URL_IN_BLACKLIST, new bitfield(indexURL.urlFlagLength)); + this.log.logInfo("CRAWLER Rejecting URL '" + this.url.toString() + "'. URL is in blacklist."); + addURLtoErrorDB(plasmaCrawlEURL.DENIED_URL_IN_BLACKLIST); return null; } @@ -317,20 +178,22 @@ public final class CrawlWorker extends Thread { // create a request header httpHeader requestHeader = new httpHeader(); requestHeader.put(httpHeader.USER_AGENT, httpdProxyHandler.crawlerUserAgent); - requestHeader.put(httpHeader.REFERER, refererURLString); - requestHeader.put(httpHeader.ACCEPT_LANGUAGE, sb.getConfig("crawler.acceptLanguage","en-us,en;q=0.5")); - requestHeader.put(httpHeader.ACCEPT_CHARSET, sb.getConfig("crawler.acceptCharset","ISO-8859-1,utf-8;q=0.7,*;q=0.7")); - if (useContentEncodingGzip) requestHeader.put(httpHeader.ACCEPT_ENCODING, "gzip,deflate"); - -// System.out.println("CRAWLER_REQUEST_HEADER=" + requestHeader.toString()); // DEBUG + if (this.refererURLString != null && this.refererURLString.length() > 0) + requestHeader.put(httpHeader.REFERER, this.refererURLString); + if (this.acceptLanguage != null && this.acceptLanguage.length() > 0) + requestHeader.put(httpHeader.ACCEPT_LANGUAGE, this.acceptLanguage); + if (this.acceptCharset != null && this.acceptCharset.length() > 0) + requestHeader.put(httpHeader.ACCEPT_CHARSET, this.acceptCharset); + if (this.acceptEncoding != null && this.acceptEncoding.length() > 0) + requestHeader.put(httpHeader.ACCEPT_ENCODING, this.acceptEncoding); // open the connection - remote = ((theRemoteProxyConfig != null) && (theRemoteProxyConfig.useProxy())) - ? httpc.getInstance(host, host, port, socketTimeout, ssl, theRemoteProxyConfig,"CRAWLER",null) - : httpc.getInstance(host, host, port, socketTimeout, ssl, "CRAWLER",null); + remote = ((this.remoteProxyConfig != null) && (this.remoteProxyConfig.useProxy())) + ? httpc.getInstance(host, host, port, this.socketTimeout, ssl, this.remoteProxyConfig,"CRAWLER",null) + : httpc.getInstance(host, host, port, this.socketTimeout, ssl, "CRAWLER",null); // specifying if content encoding is allowed - remote.setAllowContentEncoding(useContentEncodingGzip); + remote.setAllowContentEncoding((this.acceptEncoding != null && this.acceptEncoding.length() > 0)); // send request httpc.response res = remote.GET(path, requestHeader); @@ -339,33 +202,33 @@ public final class CrawlWorker extends Thread { // the transfer is ok // create a new cache entry - htCache = cacheManager.newEntry(requestDate, depth, url, name, requestHeader, res.status, res.responseHeader, initiator, profile); + htCache = createCacheEntry(requestDate, requestHeader, res); // aborting download if content is to long ... if (htCache.cacheFile.getAbsolutePath().length() > serverSystem.maxPathLength) { remote.close(); - log.logInfo("REJECTED URL " + url.toString() + " because path too long '" + cacheManager.cachePath.getAbsolutePath() + "'"); - addURLtoErrorDB(url, refererURLString, initiator, name, plasmaCrawlEURL.DENIED_CACHEFILE_PATH_TOO_LONG, new bitfield(indexURL.urlFlagLength)); + this.log.logInfo("REJECTED URL " + this.url.toString() + " because path too long '" + this.cacheManager.cachePath.getAbsolutePath() + "'"); + addURLtoErrorDB(plasmaCrawlEURL.DENIED_CACHEFILE_PATH_TOO_LONG); return (htCache = null); } // reserve cache entry - if (!htCache.cacheFile.getCanonicalPath().startsWith(cacheManager.cachePath.getCanonicalPath())) { + if (!htCache.cacheFile.getCanonicalPath().startsWith(this.cacheManager.cachePath.getCanonicalPath())) { // if the response has not the right file type then reject file remote.close(); - log.logInfo("REJECTED URL " + url.toString() + " because of an invalid file path ('" + + this.log.logInfo("REJECTED URL " + this.url.toString() + " because of an invalid file path ('" + htCache.cacheFile.getCanonicalPath() + "' does not start with '" + - cacheManager.cachePath.getAbsolutePath() + "')."); - addURLtoErrorDB(url, refererURLString, initiator, name, plasmaCrawlEURL.DENIED_INVALID_CACHEFILE_PATH, new bitfield(indexURL.urlFlagLength)); + this.cacheManager.cachePath.getAbsolutePath() + "')."); + addURLtoErrorDB(plasmaCrawlEURL.DENIED_INVALID_CACHEFILE_PATH); return (htCache = null); } // request has been placed and result has been returned. work off response - File cacheFile = cacheManager.getCachePath(url); + File cacheFile = this.cacheManager.getCachePath(this.url); try { - if ((acceptAllContent) || (plasmaParser.supportedContent(plasmaParser.PARSER_MODE_CRAWLER,url,res.responseHeader.mime()))) { + if ((acceptAllContent) || (plasmaParser.supportedContent(plasmaParser.PARSER_MODE_CRAWLER,this.url,res.responseHeader.mime()))) { if (cacheFile.isFile()) { - cacheManager.deleteFile(url); + this.cacheManager.deleteFile(this.url); } // we write the new cache entry to file system directly cacheFile.getParentFile().mkdirs(); @@ -374,21 +237,21 @@ public final class CrawlWorker extends Thread { fos = new FileOutputStream(cacheFile); res.writeContent(fos); // superfluous write to array htCache.cacheArray = null; - cacheManager.writeFileAnnouncement(cacheFile); + this.cacheManager.writeFileAnnouncement(cacheFile); //htCache.cacheArray = res.writeContent(fos); // writes in cacheArray and cache file } finally { - if (fos!=null)try{fos.close();}catch(Exception e){} + if (fos!=null)try{fos.close();}catch(Exception e){/* ignore this */} } // enQueue new entry with response header - if (profile != null) { - cacheManager.push(htCache); + if (this.profile != null) { + this.cacheManager.push(htCache); } } else { // if the response has not the right file type then reject file remote.close(); - log.logInfo("REJECTED WRONG MIME/EXT TYPE " + res.responseHeader.mime() + " for URL " + url.toString()); - addURLtoErrorDB(url, refererURLString, initiator, name, plasmaCrawlEURL.DENIED_WRONG_MIMETYPE_OR_EXT, new bitfield(indexURL.urlFlagLength)); + this.log.logInfo("REJECTED WRONG MIME/EXT TYPE " + res.responseHeader.mime() + " for URL " + this.url.toString()); + addURLtoErrorDB(plasmaCrawlEURL.DENIED_WRONG_MIMETYPE_OR_EXT); htCache = null; } } catch (SocketException e) { @@ -398,8 +261,8 @@ public final class CrawlWorker extends Thread { // but we clean the cache also, since it may be only partial // and most possible corrupted if (cacheFile.exists()) cacheFile.delete(); - log.logSevere("CRAWLER LOADER ERROR1: with URL=" + url.toString() + ": " + e.toString()); - addURLtoErrorDB(url, refererURLString, initiator, name, plasmaCrawlEURL.DENIED_CONNECTION_ERROR, new bitfield(indexURL.urlFlagLength)); + this.log.logSevere("CRAWLER LOADER ERROR1: with URL=" + this.url.toString() + ": " + e.toString()); + addURLtoErrorDB(plasmaCrawlEURL.DENIED_CONNECTION_ERROR); htCache = null; } } else if (res.status.startsWith("30")) { @@ -410,13 +273,13 @@ public final class CrawlWorker extends Thread { redirectionUrlString = redirectionUrlString.trim(); if (redirectionUrlString.length() == 0) { - log.logWarning("CRAWLER Redirection of URL=" + url.toString() + " aborted. Location header is empty."); - addURLtoErrorDB(url, refererURLString, initiator, name, plasmaCrawlEURL.DENIED_REDIRECTION_HEADER_EMPTY, new bitfield(indexURL.urlFlagLength)); + this.log.logWarning("CRAWLER Redirection of URL=" + this.url.toString() + " aborted. Location header is empty."); + addURLtoErrorDB(plasmaCrawlEURL.DENIED_REDIRECTION_HEADER_EMPTY); return null; } // normalizing URL - redirectionUrlString = new URL(url, redirectionUrlString).toNormalform(); + redirectionUrlString = new URL(this.url, redirectionUrlString).toNormalform(); // generating the new URL object URL redirectionUrl = new URL(redirectionUrlString); @@ -426,13 +289,13 @@ public final class CrawlWorker extends Thread { remote = null; // restart crawling with new url - log.logInfo("CRAWLER Redirection detected ('" + res.status + "') for URL " + url.toString()); - log.logInfo("CRAWLER ..Redirecting request to: " + redirectionUrl); + this.log.logInfo("CRAWLER Redirection detected ('" + res.status + "') for URL " + this.url.toString()); + this.log.logInfo("CRAWLER ..Redirecting request to: " + redirectionUrl); // if we are already doing a shutdown we don't need to retry crawling if (Thread.currentThread().isInterrupted()) { - log.logSevere("CRAWLER Retry of URL=" + url.toString() + " aborted because of server shutdown."); - addURLtoErrorDB(url, refererURLString, initiator, name, plasmaCrawlEURL.DENIED_SERVER_SHUTDOWN, new bitfield(indexURL.urlFlagLength)); + this.log.logSevere("CRAWLER Retry of URL=" + this.url.toString() + " aborted because of server shutdown."); + addURLtoErrorDB(plasmaCrawlEURL.DENIED_SERVER_SHUTDOWN); return null; } @@ -443,20 +306,8 @@ public final class CrawlWorker extends Thread { plasmaCrawlLoader.switchboard.urlPool.noticeURL.remove(urlhash); // retry crawling with new url - plasmaHTCache.Entry redirectedEntry = load(redirectionUrl, - name, - refererURLString, - initiator, - depth, - profile, - socketTimeout, - theRemoteProxyConfig, - cacheManager, - log, - --crawlingRetryCount, - useContentEncodingGzip, - acceptAllContent - ); + this.url = redirectionUrl; + plasmaHTCache.Entry redirectedEntry = load(crawlingRetryCount-1); if (redirectedEntry != null) { // TODO: Here we can store the content of the redirection @@ -475,15 +326,15 @@ public final class CrawlWorker extends Thread { } } } else { - log.logInfo("Redirection counter exceeded for URL " + url.toString() + ". Processing aborted."); - addURLtoErrorDB(url, refererURLString, initiator, name, plasmaCrawlEURL.DENIED_REDIRECTION_COUNTER_EXCEEDED, new bitfield(indexURL.urlFlagLength)); + this.log.logInfo("Redirection counter exceeded for URL " + this.url.toString() + ". Processing aborted."); + addURLtoErrorDB(plasmaCrawlEURL.DENIED_REDIRECTION_COUNTER_EXCEEDED); } }else { // if the response has not the right response type then reject file - log.logInfo("REJECTED WRONG STATUS TYPE '" + res.status + "' for URL " + url.toString()); + this.log.logInfo("REJECTED WRONG STATUS TYPE '" + res.status + "' for URL " + this.url.toString()); // not processed any further - addURLtoErrorDB(url, refererURLString, initiator, name, plasmaCrawlEURL.DENIED_WRONG_HTTP_STATUSCODE + res.statusCode + ")", new bitfield(indexURL.urlFlagLength)); + addURLtoErrorDB(plasmaCrawlEURL.DENIED_WRONG_HTTP_STATUSCODE + res.statusCode + ")"); } if (remote != null) remote.close(); @@ -498,13 +349,13 @@ public final class CrawlWorker extends Thread { (errorMsg.indexOf("socket closed") >= 0) && (Thread.currentThread().isInterrupted()) ) { - log.logInfo("CRAWLER Interruption detected because of server shutdown."); + this.log.logInfo("CRAWLER Interruption detected because of server shutdown."); failreason = plasmaCrawlEURL.DENIED_SERVER_SHUTDOWN; } else if (e instanceof MalformedURLException) { - log.logWarning("CRAWLER Malformed URL '" + url.toString() + "' detected. "); + this.log.logWarning("CRAWLER Malformed URL '" + this.url.toString() + "' detected. "); failreason = plasmaCrawlEURL.DENIED_MALFORMED_URL; } else if (e instanceof NoRouteToHostException) { - log.logWarning("CRAWLER No route to host found while trying to crawl URL '" + url.toString() + "'."); + this.log.logWarning("CRAWLER No route to host found while trying to crawl URL '" + this.url.toString() + "'."); failreason = plasmaCrawlEURL.DENIED_NO_ROUTE_TO_HOST; } else if ((e instanceof UnknownHostException) || ((errorMsg != null) && (errorMsg.indexOf("unknown host") >= 0))) { @@ -571,23 +422,10 @@ public final class CrawlWorker extends Thread { if (crawlingRetryCount > 2) crawlingRetryCount = 2; // retry crawling - return load(url, - name, - refererURLString, - initiator, - depth, - profile, - socketTimeout, - theRemoteProxyConfig, - cacheManager, - log, - --crawlingRetryCount, - false, - false - ); + return load(crawlingRetryCount - 1); } if (failreason != null) { - addURLtoErrorDB(url, refererURLString, initiator, name, failreason, new bitfield(indexURL.urlFlagLength)); + addURLtoErrorDB(failreason); } return null; } finally { @@ -595,34 +433,15 @@ public final class CrawlWorker extends Thread { } } - private static void addURLtoErrorDB( - URL url, - String referrerString, - String initiator, - String name, - String failreason, - bitfield flags - ) { - // getting a reference to the plasmaSwitchboard - plasmaSwitchboard sb = plasmaCrawlLoader.switchboard; - - // convert the referrer URL into a hash value - String referrerHash = (referrerString==null)?null:indexURL.urlHash(referrerString); - - // create a new errorURL DB entry - plasmaCrawlEURL.Entry ee = sb.urlPool.errorURL.newEntry( - url, - referrerHash, - initiator, - yacyCore.seedDB.mySeed.hash, - name, - failreason, - flags - ); - // store the entry - ee.store(); - // push it onto the stack - sb.urlPool.errorURL.stackPushEntry(ee); - } - + public void close() { + if (this.isAlive()) { + try { + // trying to close all still open httpc-Sockets first + int closedSockets = httpc.closeOpenSockets(this); + if (closedSockets > 0) { + this.log.logInfo(closedSockets + " HTTP-client sockets of thread '" + this.getName() + "' closed."); + } + } catch (Exception e) {/* ignore this. shutdown in progress */} + } + } } \ No newline at end of file diff --git a/source/de/anomic/plasma/crawler/plasmaCrawlWorker.java b/source/de/anomic/plasma/crawler/plasmaCrawlWorker.java new file mode 100644 index 000000000..1fa2753f5 --- /dev/null +++ b/source/de/anomic/plasma/crawler/plasmaCrawlWorker.java @@ -0,0 +1,20 @@ +package de.anomic.plasma.crawler; + +import java.io.IOException; + +import de.anomic.plasma.plasmaCrawlLoaderMessage; +import de.anomic.plasma.plasmaHTCache; + + +public interface plasmaCrawlWorker { + + public static final String threadBaseName = "CrawlerWorker"; + + public void reset(); + public void execute(); + public void execute(plasmaCrawlLoaderMessage theNewMsg); + public void init(); + + public void close(); + public plasmaHTCache.Entry load() throws IOException; +} diff --git a/source/de/anomic/plasma/crawler/plasmaCrawlerFactory.java b/source/de/anomic/plasma/crawler/plasmaCrawlerFactory.java index cb9986ca9..dfd29d5e5 100644 --- a/source/de/anomic/plasma/crawler/plasmaCrawlerFactory.java +++ b/source/de/anomic/plasma/crawler/plasmaCrawlerFactory.java @@ -90,7 +90,7 @@ public final class plasmaCrawlerFactory implements KeyedPoolableObjectFactory { CrawlWorker theWorker = (CrawlWorker) obj; synchronized(theWorker) { theWorker.destroyed = true; - theWorker.setName(CrawlWorker.threadBaseName + "_destroyed"); + theWorker.setName(plasmaCrawlWorker.threadBaseName + "_destroyed"); theWorker.setStopped(true); theWorker.interrupt(); } diff --git a/source/de/anomic/plasma/crawler/plasmaCrawlerPool.java b/source/de/anomic/plasma/crawler/plasmaCrawlerPool.java index d15c705ff..2c0772552 100644 --- a/source/de/anomic/plasma/crawler/plasmaCrawlerPool.java +++ b/source/de/anomic/plasma/crawler/plasmaCrawlerPool.java @@ -23,7 +23,7 @@ public final class plasmaCrawlerPool extends GenericKeyedObjectPool { if (obj == null) return; if (obj instanceof CrawlWorker) { try { - ((CrawlWorker)obj).setName(CrawlWorker.threadBaseName + "_inPool"); + ((CrawlWorker)obj).setName(plasmaCrawlWorker.threadBaseName + "_inPool"); super.returnObject(key,obj); } catch (Exception e) { ((CrawlWorker)obj).setStopped(true); @@ -40,7 +40,7 @@ public final class plasmaCrawlerPool extends GenericKeyedObjectPool { if (this.isClosed) return; if (obj instanceof CrawlWorker) { try { - ((CrawlWorker)obj).setName(CrawlWorker.threadBaseName + "_invalidated"); + ((CrawlWorker)obj).setName(plasmaCrawlWorker.threadBaseName + "_invalidated"); ((CrawlWorker)obj).setStopped(true); super.invalidateObject(key,obj); } catch (Exception e) { diff --git a/source/de/anomic/plasma/plasmaCrawlLoader.java b/source/de/anomic/plasma/plasmaCrawlLoader.java index 1ee1f9ffc..1604d1ebf 100644 --- a/source/de/anomic/plasma/plasmaCrawlLoader.java +++ b/source/de/anomic/plasma/plasmaCrawlLoader.java @@ -142,6 +142,15 @@ public final class plasmaCrawlLoader extends Thread { return this.theThreadGroup; } + private void execute(plasmaCrawlLoaderMessage theMsg) throws Exception { + // getting the protocol of the next URL + String protocol = theMsg.url.getProtocol(); + + // getting a new crawler from the crawler pool + CrawlWorker theWorker = (CrawlWorker) this.crawlwerPool.borrowObject(protocol); + if (theWorker != null) theWorker.execute(theMsg); + } + public void run() { while (!this.stopped && !Thread.interrupted()) { @@ -149,12 +158,8 @@ public final class plasmaCrawlLoader extends Thread { // getting a new message from the crawler queue plasmaCrawlLoaderMessage theMsg = this.theQueue.waitForMessage(); - // TODO: getting the protocol of the next URL - String protocol = theMsg.url.getProtocol(); - - // getting a new crawler from the crawler pool - CrawlWorker theWorker = (CrawlWorker) this.crawlwerPool.borrowObject(protocol); - if (theWorker != null) theWorker.execute(theMsg); + // start new crawl job + this.execute(theMsg); } catch (InterruptedException e) { Thread.interrupted(); @@ -177,26 +182,81 @@ public final class plasmaCrawlLoader extends Thread { } } + + public plasmaHTCache.Entry loadSync( + URL url, + String urlName, + String referer, + String initiator, + int depth, + plasmaCrawlProfile.entry profile, + int timeout + ) { + + plasmaHTCache.Entry result = null; + if (!this.crawlwerPool.isClosed) { + int crawlingPriority = 5; + + // creating a new crawler queue object + plasmaCrawlLoaderMessage theMsg = new plasmaCrawlLoaderMessage( + url, + urlName, + referer, + initiator, + depth, + profile, + crawlingPriority, + true, + timeout + ); + + + try { + // start new crawl job + this.execute(theMsg); + + // wait for the crawl job result + result = theMsg.waitForResult(); + + } catch (Exception e) { + this.log.logSevere("plasmaCrawlLoader.loadSync", e); + } + } + + // return the result + return result; + } - public void loadParallel( + public void loadAsync( URL url, - String name, + String urlName, String referer, String initiator, int depth, - plasmaCrawlProfile.entry profile) { + plasmaCrawlProfile.entry profile + ) { if (!this.crawlwerPool.isClosed) { int crawlingPriority = 5; // creating a new crawler queue object - plasmaCrawlLoaderMessage theMsg = new plasmaCrawlLoaderMessage(url, name, referer, initiator, depth, profile, crawlingPriority); + plasmaCrawlLoaderMessage theMsg = new plasmaCrawlLoaderMessage( + url, // url + urlName, // url name + referer, // referer URL + initiator, // crawling initiator peer + depth, // crawling depth + profile, // crawling profile + crawlingPriority, // crawling priority + false, // only download documents whose mimetypes are enabled for the crawler + -1 // use default crawler timeout + ); // adding the message to the queue try { this.theQueue.addMessage(theMsg); } catch (InterruptedException e) { - this.log.logSevere("plasmaCrawlLoader.loadParallel", e); + this.log.logSevere("plasmaCrawlLoader.loadAsync", e); } } } diff --git a/source/de/anomic/plasma/plasmaCrawlLoaderMessage.java b/source/de/anomic/plasma/plasmaCrawlLoaderMessage.java index 5ad330d83..d79674b19 100644 --- a/source/de/anomic/plasma/plasmaCrawlLoaderMessage.java +++ b/source/de/anomic/plasma/plasmaCrawlLoaderMessage.java @@ -43,15 +43,22 @@ package de.anomic.plasma; import de.anomic.net.URL; +import de.anomic.server.serverSemaphore; public final class plasmaCrawlLoaderMessage { public final int crawlingPriority; + public final URL url; public final String name; public final String referer; public final String initiator; public final int depth; public final plasmaCrawlProfile.entry profile; + public final boolean acceptAllContent; + public final int timeout; + + private serverSemaphore resultSync = null; + private plasmaHTCache.Entry result; // loadParallel(URL url, String referer, String initiator, int depth, plasmaCrawlProfile.entry profile) { public plasmaCrawlLoaderMessage( @@ -61,7 +68,10 @@ public final class plasmaCrawlLoaderMessage { String initiator, int depth, plasmaCrawlProfile.entry profile, - int crawlingPriority) { + int crawlingPriority, + boolean acceptAllContent, + int timeout + ) { this.url = url; this.name = name; this.referer = referer; @@ -69,5 +79,32 @@ public final class plasmaCrawlLoaderMessage { this.depth = depth; this.profile = profile; this.crawlingPriority = crawlingPriority; + this.acceptAllContent = acceptAllContent; + this.timeout = timeout; + + this.resultSync = new serverSemaphore(0); + this.result = null; } + + public void setResult(plasmaHTCache.Entry theResult) { + // store the result + this.result = theResult; + + // notify blocking result readers + this.resultSync.V(); + } + + public plasmaHTCache.Entry waitForResult() throws InterruptedException { + plasmaHTCache.Entry theResult = null; + + this.resultSync.P(); + /* =====> CRITICAL SECTION <======== */ + + theResult = this.result; + + /* =====> CRITICAL SECTION <======== */ + this.resultSync.V(); + + return theResult; + } } \ No newline at end of file diff --git a/source/de/anomic/plasma/plasmaCrawlStacker.java b/source/de/anomic/plasma/plasmaCrawlStacker.java index 5af0710ad..42fef71d8 100644 --- a/source/de/anomic/plasma/plasmaCrawlStacker.java +++ b/source/de/anomic/plasma/plasmaCrawlStacker.java @@ -50,7 +50,6 @@ import java.io.File; import java.io.IOException; import java.net.InetAddress; import java.net.MalformedURLException; -import de.anomic.net.URL; import java.util.Date; import java.util.Iterator; import java.util.LinkedList; @@ -66,10 +65,9 @@ import de.anomic.kelondro.kelondroFlexTable; import de.anomic.kelondro.kelondroIndex; import de.anomic.kelondro.kelondroRow; import de.anomic.kelondro.kelondroTree; -import de.anomic.plasma.plasmaCrawlEURL; +import de.anomic.net.URL; import de.anomic.plasma.urlPattern.plasmaURLPattern; import de.anomic.server.serverSemaphore; -import de.anomic.server.serverThread; import de.anomic.server.logging.serverLog; import de.anomic.tools.bitfield; import de.anomic.yacy.yacyCore; diff --git a/source/de/anomic/plasma/plasmaSnippetCache.java b/source/de/anomic/plasma/plasmaSnippetCache.java index 073e9cc8a..2ebdc5213 100644 --- a/source/de/anomic/plasma/plasmaSnippetCache.java +++ b/source/de/anomic/plasma/plasmaSnippetCache.java @@ -419,18 +419,18 @@ public class plasmaSnippetCache { } public plasmaHTCache.Entry loadResourceFromWeb(URL url, int socketTimeout) throws IOException { - return CrawlWorker.load( - url, - "", - null, - null, - 0, - null, - socketTimeout, - this.sb.remoteProxyConfig, - this.cacheManager, - true, - this.log); + + plasmaHTCache.Entry result = this.sb.cacheLoader.loadSync( + url, + "", + null, + null, + 0, + null, + socketTimeout + ); + + return result; } public void fetch(plasmaSearchResult acc, Set queryhashes, String urlmask, int fetchcount, long maxTime) { diff --git a/source/de/anomic/plasma/plasmaSwitchboard.java b/source/de/anomic/plasma/plasmaSwitchboard.java index a333a7406..f10a893aa 100644 --- a/source/de/anomic/plasma/plasmaSwitchboard.java +++ b/source/de/anomic/plasma/plasmaSwitchboard.java @@ -1874,7 +1874,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser } catch (IOException e) { refererURL = null; } - cacheLoader.loadParallel(urlEntry.url(), urlEntry.name(), (refererURL!=null)?refererURL.toString():null, urlEntry.initiator(), urlEntry.depth(), profile); + cacheLoader.loadAsync(urlEntry.url(), urlEntry.name(), (refererURL!=null)?refererURL.toString():null, urlEntry.initiator(), urlEntry.depth(), profile); log.logInfo(stats + ": enqueued for load " + urlEntry.url() + " [" + urlEntry.hash() + "]"); return; } diff --git a/yacy.init b/yacy.init index 2e81d3943..a9f34757a 100644 --- a/yacy.init +++ b/yacy.init @@ -631,6 +631,7 @@ msgForwardingTo=root@localhost onlineCautionDelay=30000 # Some configuration values for the crawler +crawler.acceptEncoding=gzip,deflate crawler.acceptLanguage=en-us,en;q=0.5 crawler.acceptCharset=ISO-8859-1,utf-8;q=0.7,*;q=0.7 crawler.clientTimeout=9000