diff --git a/source/de/anomic/plasma/crawler/AbstractCrawlWorker.java b/source/de/anomic/plasma/crawler/AbstractCrawlWorker.java
new file mode 100644
index 000000000..cd77a4c65
--- /dev/null
+++ b/source/de/anomic/plasma/crawler/AbstractCrawlWorker.java
@@ -0,0 +1,214 @@
+package de.anomic.plasma.crawler;
+
+import java.io.IOException;
+
+import de.anomic.index.indexURL;
+import de.anomic.net.URL;
+import de.anomic.plasma.plasmaCrawlEURL;
+import de.anomic.plasma.plasmaCrawlLoaderMessage;
+import de.anomic.plasma.plasmaCrawlProfile;
+import de.anomic.plasma.plasmaHTCache;
+import de.anomic.plasma.plasmaSwitchboard;
+import de.anomic.server.logging.serverLog;
+import de.anomic.tools.bitfield;
+import de.anomic.yacy.yacyCore;
+
+public abstract class AbstractCrawlWorker extends Thread implements plasmaCrawlWorker {
+
+ /**
+ * The protocol that is supported by this crawler
+ * e.g. http
, ftp
, etc.
+ */
+ protected String protocol;
+
+ /* ============================================================
+ * Variables for thread pool management
+ * ============================================================ */
+ public boolean destroyed = false;
+ protected boolean running = false;
+ protected boolean stopped = false;
+ protected boolean done = false;
+
+ /* ============================================================
+ * Crawl job specific variables
+ * ============================================================ */
+ public plasmaCrawlLoaderMessage theMsg;
+ protected URL url;
+ protected String name;
+ protected String refererURLString;
+ protected String initiator;
+ protected int depth;
+ protected long startdate;
+ protected plasmaCrawlProfile.entry profile;
+ protected boolean acceptAllContent;
+
+ /**
+ * The crawler thread pool
+ */
+ protected final plasmaCrawlerPool myPool;
+
+ /**
+ * reference to the plasma switchboard
+ */
+ protected final plasmaSwitchboard sb;
+
+ /**
+ * reference to the cache manager
+ */
+ protected final plasmaHTCache cacheManager;
+
+ /**
+ * Logging class
+ */
+ protected final serverLog log;
+
+
+ /**
+ * Constructor of this class
+ * @param theTG the crawl worker thread group
+ * @param thePool the crawl worker thread pool
+ * @param theSb plasma switchboard
+ * @param theCacheManager cache manager
+ * @param theLog server log
+ */
+ public AbstractCrawlWorker(
+ ThreadGroup theTG,
+ plasmaCrawlerPool thePool,
+ plasmaSwitchboard theSb,
+ plasmaHTCache theCacheManager,
+ serverLog theLog
+ ) {
+ super(theTG,plasmaCrawlWorker.threadBaseName + "_created");
+
+ this.myPool = thePool;
+ this.sb = theSb;
+ this.cacheManager = theCacheManager;
+ this.log = theLog;
+ }
+
+ public abstract void close();
+
+ public void run() {
+ this.running = true;
+
+ try {
+ // The thread keeps running.
+ while (!this.stopped && !this.isInterrupted() && !this.myPool.isClosed) {
+ if (this.done) {
+ synchronized (this) {
+ // return thread back into pool
+ this.myPool.returnObject(this.protocol,this);
+
+ // We are waiting for a new task now.
+ if (!this.stopped && !this.destroyed && !this.isInterrupted()) {
+ this.wait();
+ }
+ }
+ } else {
+ try {
+ // executing the new task
+ execute();
+ } finally {
+ reset();
+ }
+ }
+ }
+ } catch (InterruptedException ex) {
+ serverLog.logFiner("CRAWLER-POOL","Interruption of thread '" + this.getName() + "' detected.");
+ } finally {
+ if (this.myPool != null && !this.destroyed)
+ this.myPool.invalidateObject(this.protocol,this);
+ }
+ }
+
+ public void execute() {
+ try {
+ // setting threadname
+ this.setName(plasmaCrawlWorker.threadBaseName + "_" + this.url);
+
+ // load some configuration variables
+ init();
+
+ // loading resource
+ plasmaHTCache.Entry resource = load();
+
+ // store a reference to the result in the message object
+ // this is e.g. needed by the snippet fetcher
+ this.theMsg.setResult(resource);
+
+ } catch (IOException e) {
+ //throw e;
+ } finally {
+ this.done = true;
+ }
+ }
+
+ public void execute(plasmaCrawlLoaderMessage theNewMsg) {
+ synchronized (this) {
+
+ this.theMsg = theNewMsg;
+
+ this.url = theNewMsg.url;
+ this.name = theNewMsg.name;
+ this.refererURLString = theNewMsg.referer;
+ this.initiator = theNewMsg.initiator;
+ this.depth = theNewMsg.depth;
+ this.profile = theNewMsg.profile;
+ this.acceptAllContent = theNewMsg.acceptAllContent;
+
+ this.startdate = System.currentTimeMillis();
+
+ this.done = false;
+
+ if (!this.running) {
+ // if the thread is not running until yet, we need to start it now
+ this.start();
+ } else {
+ // inform the thread about the new crawl job
+ this.notifyAll();
+ }
+ }
+ }
+
+ public void setStopped(boolean isStopped) {
+ this.stopped = isStopped;
+ }
+
+ public boolean isRunning() {
+ return this.running;
+ }
+
+ public void reset() {
+ this.theMsg = null;
+ this.url = null;
+ this.name = null;
+ this.refererURLString = null;
+ this.initiator = null;
+ this.depth = 0;
+ this.startdate = 0;
+ this.profile = null;
+ this.acceptAllContent = false;
+ }
+
+ protected void addURLtoErrorDB(String failreason) {
+ // convert the referrer URL into a hash value
+ String referrerHash = (this.refererURLString==null)?null:indexURL.urlHash(this.refererURLString);
+
+ // create a new errorURL DB entry
+ plasmaCrawlEURL.Entry ee = this.sb.urlPool.errorURL.newEntry(
+ this.url,
+ referrerHash,
+ this.initiator,
+ yacyCore.seedDB.mySeed.hash,
+ this.name,
+ (failreason==null)?"Unknown reason":failreason,
+ new bitfield(indexURL.urlFlagLength)
+ );
+
+ // store the entry
+ ee.store();
+
+ // push it onto the stack
+ this.sb.urlPool.errorURL.stackPushEntry(ee);
+ }
+}
diff --git a/source/de/anomic/plasma/crawler/http/CrawlWorker.java b/source/de/anomic/plasma/crawler/http/CrawlWorker.java
index 25800c8b1..a1aaebc9a 100644
--- a/source/de/anomic/plasma/crawler/http/CrawlWorker.java
+++ b/source/de/anomic/plasma/crawler/http/CrawlWorker.java
@@ -61,59 +61,51 @@ import de.anomic.index.indexURL;
import de.anomic.net.URL;
import de.anomic.plasma.plasmaCrawlEURL;
import de.anomic.plasma.plasmaCrawlLoader;
-import de.anomic.plasma.plasmaCrawlLoaderMessage;
-import de.anomic.plasma.plasmaCrawlProfile;
import de.anomic.plasma.plasmaHTCache;
import de.anomic.plasma.plasmaParser;
import de.anomic.plasma.plasmaSwitchboard;
+import de.anomic.plasma.crawler.AbstractCrawlWorker;
import de.anomic.plasma.crawler.plasmaCrawlerPool;
import de.anomic.plasma.urlPattern.plasmaURLPattern;
import de.anomic.server.serverSystem;
import de.anomic.server.logging.serverLog;
-import de.anomic.tools.bitfield;
-import de.anomic.yacy.yacyCore;
-public final class CrawlWorker extends Thread {
+public final class CrawlWorker extends AbstractCrawlWorker {
- public static final int DEFAULT_CRAWLING_RETRY_COUNT = 5;
- public static final String threadBaseName = "CrawlerWorker";
-
- private final plasmaCrawlerPool myPool;
- private final plasmaSwitchboard sb;
- private final plasmaHTCache cacheManager;
- private final serverLog log;
+ public static final int DEFAULT_CRAWLING_RETRY_COUNT = 5;
+
+ /**
+ * The socket timeout that should be used
+ */
private int socketTimeout;
-
- public plasmaCrawlLoaderMessage theMsg;
- private URL url;
- private String name;
- private String referer;
- private String initiator;
- private int depth;
- private long startdate;
- private plasmaCrawlProfile.entry profile;
-// private String error;
-
- public boolean destroyed = false;
- private boolean running = false;
- private boolean stopped = false;
- private boolean done = false;
-
+
+ /**
+ * The remote http proxy that should be used
+ */
+ private httpRemoteProxyConfig remoteProxyConfig;
+
+ private String acceptEncoding;
+ private String acceptLanguage;
+ private String acceptCharset;
+
+ /**
+ * Constructor of this class
+ * @param theTG
+ * @param thePool
+ * @param theSb
+ * @param theCacheManager
+ * @param theLog
+ */
public CrawlWorker(
ThreadGroup theTG,
plasmaCrawlerPool thePool,
plasmaSwitchboard theSb,
plasmaHTCache theCacheManager,
serverLog theLog) {
- super(theTG,threadBaseName + "_created");
-
- this.myPool = thePool;
- this.sb = theSb;
- this.cacheManager = theCacheManager;
- this.log = theLog;
+ super(theTG,thePool,theSb,theCacheManager,theLog);
- // setting the crawler timeout properly
- this.socketTimeout = (int) this.sb.getConfigLong("crawler.clientTimeout", 10000);
+ // this crawler supports http
+ this.protocol = "http";
}
public long getDuration() {
@@ -121,189 +113,58 @@ public final class CrawlWorker extends Thread {
return (startDate != 0) ? System.currentTimeMillis() - startDate : 0;
}
- public synchronized void execute(plasmaCrawlLoaderMessage theNewMsg) {
- this.theMsg = theNewMsg;
-
- this.url = theNewMsg.url;
- this.name = theNewMsg.name;
- this.referer = theNewMsg.referer;
- this.initiator = theNewMsg.initiator;
- this.depth = theNewMsg.depth;
- this.profile = theNewMsg.profile;
-
- this.startdate = System.currentTimeMillis();
-// this.error = null;
-
- this.done = false;
- if (!this.running) {
-// this.setDaemon(true);
- this.start();
- } else {
- this.notifyAll();
- }
- }
-
- public void reset() {
- this.theMsg = null;
- this.url = null;
- this.referer = null;
- this.initiator = null;
- this.depth = 0;
- this.startdate = 0;
- this.profile = null;
-// this.error = null;
- }
-
- public void run() {
- this.running = true;
-
- try {
- // The thread keeps running.
- while (!this.stopped && !this.isInterrupted() && !this.myPool.isClosed) {
- if (this.done) {
- synchronized (this) {
- // return thread back into pool
- this.myPool.returnObject("http",this);
-
- // We are waiting for a new task now.
- if (!this.stopped && !this.destroyed && !this.isInterrupted()) {
- this.wait();
- }
- }
- } else {
- try {
- // executing the new task
- execute();
- } finally {
- reset();
- }
- }
- }
- } catch (InterruptedException ex) {
- serverLog.logFiner("CRAWLER-POOL","Interruption of thread '" + this.getName() + "' detected.");
- } finally {
- if (this.myPool != null && !this.destroyed)
- this.myPool.invalidateObject("http",this);
- }
- }
-
- public void execute() {
- try {
- // setting threadname
- this.setName(CrawlWorker.threadBaseName + "_" + this.url);
-
- // refreshing timeout value
+ public void init() {
+ // refreshing timeout value
+ if (this.theMsg.timeout < 0) {
this.socketTimeout = (int) this.sb.getConfigLong("crawler.clientTimeout", 10000);
-
- // loading resource
- load(this.url,
- this.name,
- this.referer,
- this.initiator,
- this.depth,
- this.profile,
- this.socketTimeout,
- this.sb.remoteProxyConfig,
- this.cacheManager,
- false,
- this.log
- );
-
- } catch (IOException e) {
- //throw e;
- }
- finally {
- this.done = true;
- }
- }
-
- public void setStopped(boolean isStopped) {
- this.stopped = isStopped;
- }
-
- public boolean isRunning() {
- return this.running;
- }
-
- public void close() {
- if (this.isAlive()) {
- try {
- // trying to close all still open httpc-Sockets first
- int closedSockets = httpc.closeOpenSockets(this);
- if (closedSockets > 0) {
- this.log.logInfo(closedSockets + " HTTP-client sockets of thread '" + this.getName() + "' closed.");
- }
- } catch (Exception e) {/* ignore this. shutdown in progress */}
+ } else {
+ this.socketTimeout = this.theMsg.timeout;
}
+
+ // some http header values
+ this.acceptEncoding = this.sb.getConfig("crawler.acceptEncoding", "gzip,deflate");
+ this.acceptLanguage = this.sb.getConfig("crawler.acceptLanguage","en-us,en;q=0.5");
+ this.acceptCharset = this.sb.getConfig("crawler.acceptCharset","ISO-8859-1,utf-8;q=0.7,*;q=0.7");
+
+ // getting the http proxy config
+ this.remoteProxyConfig = this.sb.remoteProxyConfig;
}
-
- public static plasmaHTCache.Entry load(
- URL url,
- String name,
- String referer,
- String initiator,
- int depth,
- plasmaCrawlProfile.entry profile,
- int socketTimeout,
- httpRemoteProxyConfig theRemoteProxyConfig,
- plasmaHTCache cacheManager,
- boolean acceptAllContent,
- serverLog log
- ) throws IOException {
- return load(url,
- name,
- referer,
- initiator,
- depth,
- profile,
- socketTimeout,
- theRemoteProxyConfig,
- cacheManager,
- log,
- DEFAULT_CRAWLING_RETRY_COUNT,
- true,
- acceptAllContent
+ public plasmaHTCache.Entry load() throws IOException {
+ return load(DEFAULT_CRAWLING_RETRY_COUNT);
+ }
+
+ protected plasmaHTCache.Entry createCacheEntry(Date requestDate, httpHeader requestHeader, httpc.response response) {
+ return this.cacheManager.newEntry(
+ requestDate,
+ this.depth,
+ this.url,
+ this.name,
+ requestHeader,
+ response.status,
+ response.responseHeader,
+ this.initiator,
+ this.profile
);
- }
-
- private static plasmaHTCache.Entry load(
- URL url,
- String name,
- String refererURLString,
- String initiator,
- int depth,
- plasmaCrawlProfile.entry profile,
- int socketTimeout,
- httpRemoteProxyConfig theRemoteProxyConfig,
- plasmaHTCache cacheManager,
- serverLog log,
- int crawlingRetryCount,
- boolean useContentEncodingGzip,
- boolean acceptAllContent
- ) throws IOException {
- if (url == null) return null;
+ }
+
+ private plasmaHTCache.Entry load(int crawlingRetryCount) throws IOException {
// if the recrawling limit was exceeded we stop crawling now
if (crawlingRetryCount <= 0) return null;
- // getting a reference to the plasmaSwitchboard
- plasmaSwitchboard sb = plasmaCrawlLoader.switchboard;
-
Date requestDate = new Date(); // remember the time...
- String host = url.getHost();
- String path = url.getFile();
- int port = url.getPort();
- boolean ssl = url.getProtocol().equals("https");
+ String host = this.url.getHost();
+ String path = this.url.getFile();
+ int port = this.url.getPort();
+ boolean ssl = this.url.getProtocol().equals("https");
if (port < 0) port = (ssl) ? 443 : 80;
-
- refererURLString = (refererURLString == null) ? "" : refererURLString.trim();
// check if url is in blacklist
String hostlow = host.toLowerCase();
if (plasmaSwitchboard.urlBlacklist.isListed(plasmaURLPattern.BLACKLIST_CRAWLER, hostlow, path)) {
- log.logInfo("CRAWLER Rejecting URL '" + url.toString() + "'. URL is in blacklist.");
- addURLtoErrorDB(url, refererURLString, initiator, name, plasmaCrawlEURL.DENIED_URL_IN_BLACKLIST, new bitfield(indexURL.urlFlagLength));
+ this.log.logInfo("CRAWLER Rejecting URL '" + this.url.toString() + "'. URL is in blacklist.");
+ addURLtoErrorDB(plasmaCrawlEURL.DENIED_URL_IN_BLACKLIST);
return null;
}
@@ -317,20 +178,22 @@ public final class CrawlWorker extends Thread {
// create a request header
httpHeader requestHeader = new httpHeader();
requestHeader.put(httpHeader.USER_AGENT, httpdProxyHandler.crawlerUserAgent);
- requestHeader.put(httpHeader.REFERER, refererURLString);
- requestHeader.put(httpHeader.ACCEPT_LANGUAGE, sb.getConfig("crawler.acceptLanguage","en-us,en;q=0.5"));
- requestHeader.put(httpHeader.ACCEPT_CHARSET, sb.getConfig("crawler.acceptCharset","ISO-8859-1,utf-8;q=0.7,*;q=0.7"));
- if (useContentEncodingGzip) requestHeader.put(httpHeader.ACCEPT_ENCODING, "gzip,deflate");
-
-// System.out.println("CRAWLER_REQUEST_HEADER=" + requestHeader.toString()); // DEBUG
+ if (this.refererURLString != null && this.refererURLString.length() > 0)
+ requestHeader.put(httpHeader.REFERER, this.refererURLString);
+ if (this.acceptLanguage != null && this.acceptLanguage.length() > 0)
+ requestHeader.put(httpHeader.ACCEPT_LANGUAGE, this.acceptLanguage);
+ if (this.acceptCharset != null && this.acceptCharset.length() > 0)
+ requestHeader.put(httpHeader.ACCEPT_CHARSET, this.acceptCharset);
+ if (this.acceptEncoding != null && this.acceptEncoding.length() > 0)
+ requestHeader.put(httpHeader.ACCEPT_ENCODING, this.acceptEncoding);
// open the connection
- remote = ((theRemoteProxyConfig != null) && (theRemoteProxyConfig.useProxy()))
- ? httpc.getInstance(host, host, port, socketTimeout, ssl, theRemoteProxyConfig,"CRAWLER",null)
- : httpc.getInstance(host, host, port, socketTimeout, ssl, "CRAWLER",null);
+ remote = ((this.remoteProxyConfig != null) && (this.remoteProxyConfig.useProxy()))
+ ? httpc.getInstance(host, host, port, this.socketTimeout, ssl, this.remoteProxyConfig,"CRAWLER",null)
+ : httpc.getInstance(host, host, port, this.socketTimeout, ssl, "CRAWLER",null);
// specifying if content encoding is allowed
- remote.setAllowContentEncoding(useContentEncodingGzip);
+ remote.setAllowContentEncoding((this.acceptEncoding != null && this.acceptEncoding.length() > 0));
// send request
httpc.response res = remote.GET(path, requestHeader);
@@ -339,33 +202,33 @@ public final class CrawlWorker extends Thread {
// the transfer is ok
// create a new cache entry
- htCache = cacheManager.newEntry(requestDate, depth, url, name, requestHeader, res.status, res.responseHeader, initiator, profile);
+ htCache = createCacheEntry(requestDate, requestHeader, res);
// aborting download if content is to long ...
if (htCache.cacheFile.getAbsolutePath().length() > serverSystem.maxPathLength) {
remote.close();
- log.logInfo("REJECTED URL " + url.toString() + " because path too long '" + cacheManager.cachePath.getAbsolutePath() + "'");
- addURLtoErrorDB(url, refererURLString, initiator, name, plasmaCrawlEURL.DENIED_CACHEFILE_PATH_TOO_LONG, new bitfield(indexURL.urlFlagLength));
+ this.log.logInfo("REJECTED URL " + this.url.toString() + " because path too long '" + this.cacheManager.cachePath.getAbsolutePath() + "'");
+ addURLtoErrorDB(plasmaCrawlEURL.DENIED_CACHEFILE_PATH_TOO_LONG);
return (htCache = null);
}
// reserve cache entry
- if (!htCache.cacheFile.getCanonicalPath().startsWith(cacheManager.cachePath.getCanonicalPath())) {
+ if (!htCache.cacheFile.getCanonicalPath().startsWith(this.cacheManager.cachePath.getCanonicalPath())) {
// if the response has not the right file type then reject file
remote.close();
- log.logInfo("REJECTED URL " + url.toString() + " because of an invalid file path ('" +
+ this.log.logInfo("REJECTED URL " + this.url.toString() + " because of an invalid file path ('" +
htCache.cacheFile.getCanonicalPath() + "' does not start with '" +
- cacheManager.cachePath.getAbsolutePath() + "').");
- addURLtoErrorDB(url, refererURLString, initiator, name, plasmaCrawlEURL.DENIED_INVALID_CACHEFILE_PATH, new bitfield(indexURL.urlFlagLength));
+ this.cacheManager.cachePath.getAbsolutePath() + "').");
+ addURLtoErrorDB(plasmaCrawlEURL.DENIED_INVALID_CACHEFILE_PATH);
return (htCache = null);
}
// request has been placed and result has been returned. work off response
- File cacheFile = cacheManager.getCachePath(url);
+ File cacheFile = this.cacheManager.getCachePath(this.url);
try {
- if ((acceptAllContent) || (plasmaParser.supportedContent(plasmaParser.PARSER_MODE_CRAWLER,url,res.responseHeader.mime()))) {
+ if ((acceptAllContent) || (plasmaParser.supportedContent(plasmaParser.PARSER_MODE_CRAWLER,this.url,res.responseHeader.mime()))) {
if (cacheFile.isFile()) {
- cacheManager.deleteFile(url);
+ this.cacheManager.deleteFile(this.url);
}
// we write the new cache entry to file system directly
cacheFile.getParentFile().mkdirs();
@@ -374,21 +237,21 @@ public final class CrawlWorker extends Thread {
fos = new FileOutputStream(cacheFile);
res.writeContent(fos); // superfluous write to array
htCache.cacheArray = null;
- cacheManager.writeFileAnnouncement(cacheFile);
+ this.cacheManager.writeFileAnnouncement(cacheFile);
//htCache.cacheArray = res.writeContent(fos); // writes in cacheArray and cache file
} finally {
- if (fos!=null)try{fos.close();}catch(Exception e){}
+ if (fos!=null)try{fos.close();}catch(Exception e){/* ignore this */}
}
// enQueue new entry with response header
- if (profile != null) {
- cacheManager.push(htCache);
+ if (this.profile != null) {
+ this.cacheManager.push(htCache);
}
} else {
// if the response has not the right file type then reject file
remote.close();
- log.logInfo("REJECTED WRONG MIME/EXT TYPE " + res.responseHeader.mime() + " for URL " + url.toString());
- addURLtoErrorDB(url, refererURLString, initiator, name, plasmaCrawlEURL.DENIED_WRONG_MIMETYPE_OR_EXT, new bitfield(indexURL.urlFlagLength));
+ this.log.logInfo("REJECTED WRONG MIME/EXT TYPE " + res.responseHeader.mime() + " for URL " + this.url.toString());
+ addURLtoErrorDB(plasmaCrawlEURL.DENIED_WRONG_MIMETYPE_OR_EXT);
htCache = null;
}
} catch (SocketException e) {
@@ -398,8 +261,8 @@ public final class CrawlWorker extends Thread {
// but we clean the cache also, since it may be only partial
// and most possible corrupted
if (cacheFile.exists()) cacheFile.delete();
- log.logSevere("CRAWLER LOADER ERROR1: with URL=" + url.toString() + ": " + e.toString());
- addURLtoErrorDB(url, refererURLString, initiator, name, plasmaCrawlEURL.DENIED_CONNECTION_ERROR, new bitfield(indexURL.urlFlagLength));
+ this.log.logSevere("CRAWLER LOADER ERROR1: with URL=" + this.url.toString() + ": " + e.toString());
+ addURLtoErrorDB(plasmaCrawlEURL.DENIED_CONNECTION_ERROR);
htCache = null;
}
} else if (res.status.startsWith("30")) {
@@ -410,13 +273,13 @@ public final class CrawlWorker extends Thread {
redirectionUrlString = redirectionUrlString.trim();
if (redirectionUrlString.length() == 0) {
- log.logWarning("CRAWLER Redirection of URL=" + url.toString() + " aborted. Location header is empty.");
- addURLtoErrorDB(url, refererURLString, initiator, name, plasmaCrawlEURL.DENIED_REDIRECTION_HEADER_EMPTY, new bitfield(indexURL.urlFlagLength));
+ this.log.logWarning("CRAWLER Redirection of URL=" + this.url.toString() + " aborted. Location header is empty.");
+ addURLtoErrorDB(plasmaCrawlEURL.DENIED_REDIRECTION_HEADER_EMPTY);
return null;
}
// normalizing URL
- redirectionUrlString = new URL(url, redirectionUrlString).toNormalform();
+ redirectionUrlString = new URL(this.url, redirectionUrlString).toNormalform();
// generating the new URL object
URL redirectionUrl = new URL(redirectionUrlString);
@@ -426,13 +289,13 @@ public final class CrawlWorker extends Thread {
remote = null;
// restart crawling with new url
- log.logInfo("CRAWLER Redirection detected ('" + res.status + "') for URL " + url.toString());
- log.logInfo("CRAWLER ..Redirecting request to: " + redirectionUrl);
+ this.log.logInfo("CRAWLER Redirection detected ('" + res.status + "') for URL " + this.url.toString());
+ this.log.logInfo("CRAWLER ..Redirecting request to: " + redirectionUrl);
// if we are already doing a shutdown we don't need to retry crawling
if (Thread.currentThread().isInterrupted()) {
- log.logSevere("CRAWLER Retry of URL=" + url.toString() + " aborted because of server shutdown.");
- addURLtoErrorDB(url, refererURLString, initiator, name, plasmaCrawlEURL.DENIED_SERVER_SHUTDOWN, new bitfield(indexURL.urlFlagLength));
+ this.log.logSevere("CRAWLER Retry of URL=" + this.url.toString() + " aborted because of server shutdown.");
+ addURLtoErrorDB(plasmaCrawlEURL.DENIED_SERVER_SHUTDOWN);
return null;
}
@@ -443,20 +306,8 @@ public final class CrawlWorker extends Thread {
plasmaCrawlLoader.switchboard.urlPool.noticeURL.remove(urlhash);
// retry crawling with new url
- plasmaHTCache.Entry redirectedEntry = load(redirectionUrl,
- name,
- refererURLString,
- initiator,
- depth,
- profile,
- socketTimeout,
- theRemoteProxyConfig,
- cacheManager,
- log,
- --crawlingRetryCount,
- useContentEncodingGzip,
- acceptAllContent
- );
+ this.url = redirectionUrl;
+ plasmaHTCache.Entry redirectedEntry = load(crawlingRetryCount-1);
if (redirectedEntry != null) {
// TODO: Here we can store the content of the redirection
@@ -475,15 +326,15 @@ public final class CrawlWorker extends Thread {
}
}
} else {
- log.logInfo("Redirection counter exceeded for URL " + url.toString() + ". Processing aborted.");
- addURLtoErrorDB(url, refererURLString, initiator, name, plasmaCrawlEURL.DENIED_REDIRECTION_COUNTER_EXCEEDED, new bitfield(indexURL.urlFlagLength));
+ this.log.logInfo("Redirection counter exceeded for URL " + this.url.toString() + ". Processing aborted.");
+ addURLtoErrorDB(plasmaCrawlEURL.DENIED_REDIRECTION_COUNTER_EXCEEDED);
}
}else {
// if the response has not the right response type then reject file
- log.logInfo("REJECTED WRONG STATUS TYPE '" + res.status + "' for URL " + url.toString());
+ this.log.logInfo("REJECTED WRONG STATUS TYPE '" + res.status + "' for URL " + this.url.toString());
// not processed any further
- addURLtoErrorDB(url, refererURLString, initiator, name, plasmaCrawlEURL.DENIED_WRONG_HTTP_STATUSCODE + res.statusCode + ")", new bitfield(indexURL.urlFlagLength));
+ addURLtoErrorDB(plasmaCrawlEURL.DENIED_WRONG_HTTP_STATUSCODE + res.statusCode + ")");
}
if (remote != null) remote.close();
@@ -498,13 +349,13 @@ public final class CrawlWorker extends Thread {
(errorMsg.indexOf("socket closed") >= 0) &&
(Thread.currentThread().isInterrupted())
) {
- log.logInfo("CRAWLER Interruption detected because of server shutdown.");
+ this.log.logInfo("CRAWLER Interruption detected because of server shutdown.");
failreason = plasmaCrawlEURL.DENIED_SERVER_SHUTDOWN;
} else if (e instanceof MalformedURLException) {
- log.logWarning("CRAWLER Malformed URL '" + url.toString() + "' detected. ");
+ this.log.logWarning("CRAWLER Malformed URL '" + this.url.toString() + "' detected. ");
failreason = plasmaCrawlEURL.DENIED_MALFORMED_URL;
} else if (e instanceof NoRouteToHostException) {
- log.logWarning("CRAWLER No route to host found while trying to crawl URL '" + url.toString() + "'.");
+ this.log.logWarning("CRAWLER No route to host found while trying to crawl URL '" + this.url.toString() + "'.");
failreason = plasmaCrawlEURL.DENIED_NO_ROUTE_TO_HOST;
} else if ((e instanceof UnknownHostException) ||
((errorMsg != null) && (errorMsg.indexOf("unknown host") >= 0))) {
@@ -571,23 +422,10 @@ public final class CrawlWorker extends Thread {
if (crawlingRetryCount > 2) crawlingRetryCount = 2;
// retry crawling
- return load(url,
- name,
- refererURLString,
- initiator,
- depth,
- profile,
- socketTimeout,
- theRemoteProxyConfig,
- cacheManager,
- log,
- --crawlingRetryCount,
- false,
- false
- );
+ return load(crawlingRetryCount - 1);
}
if (failreason != null) {
- addURLtoErrorDB(url, refererURLString, initiator, name, failreason, new bitfield(indexURL.urlFlagLength));
+ addURLtoErrorDB(failreason);
}
return null;
} finally {
@@ -595,34 +433,15 @@ public final class CrawlWorker extends Thread {
}
}
- private static void addURLtoErrorDB(
- URL url,
- String referrerString,
- String initiator,
- String name,
- String failreason,
- bitfield flags
- ) {
- // getting a reference to the plasmaSwitchboard
- plasmaSwitchboard sb = plasmaCrawlLoader.switchboard;
-
- // convert the referrer URL into a hash value
- String referrerHash = (referrerString==null)?null:indexURL.urlHash(referrerString);
-
- // create a new errorURL DB entry
- plasmaCrawlEURL.Entry ee = sb.urlPool.errorURL.newEntry(
- url,
- referrerHash,
- initiator,
- yacyCore.seedDB.mySeed.hash,
- name,
- failreason,
- flags
- );
- // store the entry
- ee.store();
- // push it onto the stack
- sb.urlPool.errorURL.stackPushEntry(ee);
- }
-
+ public void close() {
+ if (this.isAlive()) {
+ try {
+ // trying to close all still open httpc-Sockets first
+ int closedSockets = httpc.closeOpenSockets(this);
+ if (closedSockets > 0) {
+ this.log.logInfo(closedSockets + " HTTP-client sockets of thread '" + this.getName() + "' closed.");
+ }
+ } catch (Exception e) {/* ignore this. shutdown in progress */}
+ }
+ }
}
\ No newline at end of file
diff --git a/source/de/anomic/plasma/crawler/plasmaCrawlWorker.java b/source/de/anomic/plasma/crawler/plasmaCrawlWorker.java
new file mode 100644
index 000000000..1fa2753f5
--- /dev/null
+++ b/source/de/anomic/plasma/crawler/plasmaCrawlWorker.java
@@ -0,0 +1,20 @@
+package de.anomic.plasma.crawler;
+
+import java.io.IOException;
+
+import de.anomic.plasma.plasmaCrawlLoaderMessage;
+import de.anomic.plasma.plasmaHTCache;
+
+
+public interface plasmaCrawlWorker {
+
+ public static final String threadBaseName = "CrawlerWorker";
+
+ public void reset();
+ public void execute();
+ public void execute(plasmaCrawlLoaderMessage theNewMsg);
+ public void init();
+
+ public void close();
+ public plasmaHTCache.Entry load() throws IOException;
+}
diff --git a/source/de/anomic/plasma/crawler/plasmaCrawlerFactory.java b/source/de/anomic/plasma/crawler/plasmaCrawlerFactory.java
index cb9986ca9..dfd29d5e5 100644
--- a/source/de/anomic/plasma/crawler/plasmaCrawlerFactory.java
+++ b/source/de/anomic/plasma/crawler/plasmaCrawlerFactory.java
@@ -90,7 +90,7 @@ public final class plasmaCrawlerFactory implements KeyedPoolableObjectFactory {
CrawlWorker theWorker = (CrawlWorker) obj;
synchronized(theWorker) {
theWorker.destroyed = true;
- theWorker.setName(CrawlWorker.threadBaseName + "_destroyed");
+ theWorker.setName(plasmaCrawlWorker.threadBaseName + "_destroyed");
theWorker.setStopped(true);
theWorker.interrupt();
}
diff --git a/source/de/anomic/plasma/crawler/plasmaCrawlerPool.java b/source/de/anomic/plasma/crawler/plasmaCrawlerPool.java
index d15c705ff..2c0772552 100644
--- a/source/de/anomic/plasma/crawler/plasmaCrawlerPool.java
+++ b/source/de/anomic/plasma/crawler/plasmaCrawlerPool.java
@@ -23,7 +23,7 @@ public final class plasmaCrawlerPool extends GenericKeyedObjectPool {
if (obj == null) return;
if (obj instanceof CrawlWorker) {
try {
- ((CrawlWorker)obj).setName(CrawlWorker.threadBaseName + "_inPool");
+ ((CrawlWorker)obj).setName(plasmaCrawlWorker.threadBaseName + "_inPool");
super.returnObject(key,obj);
} catch (Exception e) {
((CrawlWorker)obj).setStopped(true);
@@ -40,7 +40,7 @@ public final class plasmaCrawlerPool extends GenericKeyedObjectPool {
if (this.isClosed) return;
if (obj instanceof CrawlWorker) {
try {
- ((CrawlWorker)obj).setName(CrawlWorker.threadBaseName + "_invalidated");
+ ((CrawlWorker)obj).setName(plasmaCrawlWorker.threadBaseName + "_invalidated");
((CrawlWorker)obj).setStopped(true);
super.invalidateObject(key,obj);
} catch (Exception e) {
diff --git a/source/de/anomic/plasma/plasmaCrawlLoader.java b/source/de/anomic/plasma/plasmaCrawlLoader.java
index 1ee1f9ffc..1604d1ebf 100644
--- a/source/de/anomic/plasma/plasmaCrawlLoader.java
+++ b/source/de/anomic/plasma/plasmaCrawlLoader.java
@@ -142,6 +142,15 @@ public final class plasmaCrawlLoader extends Thread {
return this.theThreadGroup;
}
+ private void execute(plasmaCrawlLoaderMessage theMsg) throws Exception {
+ // getting the protocol of the next URL
+ String protocol = theMsg.url.getProtocol();
+
+ // getting a new crawler from the crawler pool
+ CrawlWorker theWorker = (CrawlWorker) this.crawlwerPool.borrowObject(protocol);
+ if (theWorker != null) theWorker.execute(theMsg);
+ }
+
public void run() {
while (!this.stopped && !Thread.interrupted()) {
@@ -149,12 +158,8 @@ public final class plasmaCrawlLoader extends Thread {
// getting a new message from the crawler queue
plasmaCrawlLoaderMessage theMsg = this.theQueue.waitForMessage();
- // TODO: getting the protocol of the next URL
- String protocol = theMsg.url.getProtocol();
-
- // getting a new crawler from the crawler pool
- CrawlWorker theWorker = (CrawlWorker) this.crawlwerPool.borrowObject(protocol);
- if (theWorker != null) theWorker.execute(theMsg);
+ // start new crawl job
+ this.execute(theMsg);
} catch (InterruptedException e) {
Thread.interrupted();
@@ -177,26 +182,81 @@ public final class plasmaCrawlLoader extends Thread {
}
}
+
+ public plasmaHTCache.Entry loadSync(
+ URL url,
+ String urlName,
+ String referer,
+ String initiator,
+ int depth,
+ plasmaCrawlProfile.entry profile,
+ int timeout
+ ) {
+
+ plasmaHTCache.Entry result = null;
+ if (!this.crawlwerPool.isClosed) {
+ int crawlingPriority = 5;
+
+ // creating a new crawler queue object
+ plasmaCrawlLoaderMessage theMsg = new plasmaCrawlLoaderMessage(
+ url,
+ urlName,
+ referer,
+ initiator,
+ depth,
+ profile,
+ crawlingPriority,
+ true,
+ timeout
+ );
+
+
+ try {
+ // start new crawl job
+ this.execute(theMsg);
+
+ // wait for the crawl job result
+ result = theMsg.waitForResult();
+
+ } catch (Exception e) {
+ this.log.logSevere("plasmaCrawlLoader.loadSync", e);
+ }
+ }
+
+ // return the result
+ return result;
+ }
- public void loadParallel(
+ public void loadAsync(
URL url,
- String name,
+ String urlName,
String referer,
String initiator,
int depth,
- plasmaCrawlProfile.entry profile) {
+ plasmaCrawlProfile.entry profile
+ ) {
if (!this.crawlwerPool.isClosed) {
int crawlingPriority = 5;
// creating a new crawler queue object
- plasmaCrawlLoaderMessage theMsg = new plasmaCrawlLoaderMessage(url, name, referer, initiator, depth, profile, crawlingPriority);
+ plasmaCrawlLoaderMessage theMsg = new plasmaCrawlLoaderMessage(
+ url, // url
+ urlName, // url name
+ referer, // referer URL
+ initiator, // crawling initiator peer
+ depth, // crawling depth
+ profile, // crawling profile
+ crawlingPriority, // crawling priority
+ false, // only download documents whose mimetypes are enabled for the crawler
+ -1 // use default crawler timeout
+ );
// adding the message to the queue
try {
this.theQueue.addMessage(theMsg);
} catch (InterruptedException e) {
- this.log.logSevere("plasmaCrawlLoader.loadParallel", e);
+ this.log.logSevere("plasmaCrawlLoader.loadAsync", e);
}
}
}
diff --git a/source/de/anomic/plasma/plasmaCrawlLoaderMessage.java b/source/de/anomic/plasma/plasmaCrawlLoaderMessage.java
index 5ad330d83..d79674b19 100644
--- a/source/de/anomic/plasma/plasmaCrawlLoaderMessage.java
+++ b/source/de/anomic/plasma/plasmaCrawlLoaderMessage.java
@@ -43,15 +43,22 @@
package de.anomic.plasma;
import de.anomic.net.URL;
+import de.anomic.server.serverSemaphore;
public final class plasmaCrawlLoaderMessage {
public final int crawlingPriority;
+
public final URL url;
public final String name;
public final String referer;
public final String initiator;
public final int depth;
public final plasmaCrawlProfile.entry profile;
+ public final boolean acceptAllContent;
+ public final int timeout;
+
+ private serverSemaphore resultSync = null;
+ private plasmaHTCache.Entry result;
// loadParallel(URL url, String referer, String initiator, int depth, plasmaCrawlProfile.entry profile) {
public plasmaCrawlLoaderMessage(
@@ -61,7 +68,10 @@ public final class plasmaCrawlLoaderMessage {
String initiator,
int depth,
plasmaCrawlProfile.entry profile,
- int crawlingPriority) {
+ int crawlingPriority,
+ boolean acceptAllContent,
+ int timeout
+ ) {
this.url = url;
this.name = name;
this.referer = referer;
@@ -69,5 +79,32 @@ public final class plasmaCrawlLoaderMessage {
this.depth = depth;
this.profile = profile;
this.crawlingPriority = crawlingPriority;
+ this.acceptAllContent = acceptAllContent;
+ this.timeout = timeout;
+
+ this.resultSync = new serverSemaphore(0);
+ this.result = null;
}
+
+ public void setResult(plasmaHTCache.Entry theResult) {
+ // store the result
+ this.result = theResult;
+
+ // notify blocking result readers
+ this.resultSync.V();
+ }
+
+ public plasmaHTCache.Entry waitForResult() throws InterruptedException {
+ plasmaHTCache.Entry theResult = null;
+
+ this.resultSync.P();
+ /* =====> CRITICAL SECTION <======== */
+
+ theResult = this.result;
+
+ /* =====> CRITICAL SECTION <======== */
+ this.resultSync.V();
+
+ return theResult;
+ }
}
\ No newline at end of file
diff --git a/source/de/anomic/plasma/plasmaCrawlStacker.java b/source/de/anomic/plasma/plasmaCrawlStacker.java
index 5af0710ad..42fef71d8 100644
--- a/source/de/anomic/plasma/plasmaCrawlStacker.java
+++ b/source/de/anomic/plasma/plasmaCrawlStacker.java
@@ -50,7 +50,6 @@ import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.MalformedURLException;
-import de.anomic.net.URL;
import java.util.Date;
import java.util.Iterator;
import java.util.LinkedList;
@@ -66,10 +65,9 @@ import de.anomic.kelondro.kelondroFlexTable;
import de.anomic.kelondro.kelondroIndex;
import de.anomic.kelondro.kelondroRow;
import de.anomic.kelondro.kelondroTree;
-import de.anomic.plasma.plasmaCrawlEURL;
+import de.anomic.net.URL;
import de.anomic.plasma.urlPattern.plasmaURLPattern;
import de.anomic.server.serverSemaphore;
-import de.anomic.server.serverThread;
import de.anomic.server.logging.serverLog;
import de.anomic.tools.bitfield;
import de.anomic.yacy.yacyCore;
diff --git a/source/de/anomic/plasma/plasmaSnippetCache.java b/source/de/anomic/plasma/plasmaSnippetCache.java
index 073e9cc8a..2ebdc5213 100644
--- a/source/de/anomic/plasma/plasmaSnippetCache.java
+++ b/source/de/anomic/plasma/plasmaSnippetCache.java
@@ -419,18 +419,18 @@ public class plasmaSnippetCache {
}
public plasmaHTCache.Entry loadResourceFromWeb(URL url, int socketTimeout) throws IOException {
- return CrawlWorker.load(
- url,
- "",
- null,
- null,
- 0,
- null,
- socketTimeout,
- this.sb.remoteProxyConfig,
- this.cacheManager,
- true,
- this.log);
+
+ plasmaHTCache.Entry result = this.sb.cacheLoader.loadSync(
+ url,
+ "",
+ null,
+ null,
+ 0,
+ null,
+ socketTimeout
+ );
+
+ return result;
}
public void fetch(plasmaSearchResult acc, Set queryhashes, String urlmask, int fetchcount, long maxTime) {
diff --git a/source/de/anomic/plasma/plasmaSwitchboard.java b/source/de/anomic/plasma/plasmaSwitchboard.java
index a333a7406..f10a893aa 100644
--- a/source/de/anomic/plasma/plasmaSwitchboard.java
+++ b/source/de/anomic/plasma/plasmaSwitchboard.java
@@ -1874,7 +1874,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
} catch (IOException e) {
refererURL = null;
}
- cacheLoader.loadParallel(urlEntry.url(), urlEntry.name(), (refererURL!=null)?refererURL.toString():null, urlEntry.initiator(), urlEntry.depth(), profile);
+ cacheLoader.loadAsync(urlEntry.url(), urlEntry.name(), (refererURL!=null)?refererURL.toString():null, urlEntry.initiator(), urlEntry.depth(), profile);
log.logInfo(stats + ": enqueued for load " + urlEntry.url() + " [" + urlEntry.hash() + "]");
return;
}
diff --git a/yacy.init b/yacy.init
index 2e81d3943..a9f34757a 100644
--- a/yacy.init
+++ b/yacy.init
@@ -631,6 +631,7 @@ msgForwardingTo=root@localhost
onlineCautionDelay=30000
# Some configuration values for the crawler
+crawler.acceptEncoding=gzip,deflate
crawler.acceptLanguage=en-us,en;q=0.5
crawler.acceptCharset=ISO-8859-1,utf-8;q=0.7,*;q=0.7
crawler.clientTimeout=9000