*) next step of restructuring for new crawlers

- adding interface class (plasma/crawler/plasmaCrawlWorker.java) for protocol-specific crawl-worker threads 
   - moving reusable code into abstract crawl-worker class AbstractCrawlWorker.java
   - the load method of the worker threads should not be called directly anymore (e.g. by the snippet fetcher);
     to crawl a page and wait for the result, use the function plasmaCrawlLoader.loadSync([...])

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@2474 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
theli 19 years ago
parent eb9b138986
commit 09b106eb04

@ -0,0 +1,214 @@
package de.anomic.plasma.crawler;
import java.io.IOException;
import de.anomic.index.indexURL;
import de.anomic.net.URL;
import de.anomic.plasma.plasmaCrawlEURL;
import de.anomic.plasma.plasmaCrawlLoaderMessage;
import de.anomic.plasma.plasmaCrawlProfile;
import de.anomic.plasma.plasmaHTCache;
import de.anomic.plasma.plasmaSwitchboard;
import de.anomic.server.logging.serverLog;
import de.anomic.tools.bitfield;
import de.anomic.yacy.yacyCore;
public abstract class AbstractCrawlWorker extends Thread implements plasmaCrawlWorker {

    /**
     * The protocol that is supported by this crawler,
     * e.g. <code>http</code>, <code>ftp</code>, etc.
     * Must be set by the concrete subclass (it is used as the pool key).
     */
    protected String protocol;

    /* ============================================================
     * Variables for thread pool management
     * ============================================================ */
    // set to true by the pool factory when this worker object is invalidated
    public boolean destroyed = false;
    // true once run() has started
    protected boolean running = false;
    // request flag asking this worker thread to terminate
    protected boolean stopped = false;
    // true while this worker is idle, i.e. has no crawl job assigned
    protected boolean done = false;

    /* ============================================================
     * Crawl job specific variables
     * ============================================================ */
    // the message describing the current crawl job; also used to hand the result back
    public plasmaCrawlLoaderMessage theMsg;
    protected URL url;
    protected String name;
    protected String refererURLString;
    protected String initiator;
    protected int depth;
    protected long startdate;
    protected plasmaCrawlProfile.entry profile;
    protected boolean acceptAllContent;

    /**
     * The crawler thread pool
     */
    protected final plasmaCrawlerPool myPool;

    /**
     * reference to the plasma switchboard
     */
    protected final plasmaSwitchboard sb;

    /**
     * reference to the cache manager
     */
    protected final plasmaHTCache cacheManager;

    /**
     * Logging class
     */
    protected final serverLog log;

    /**
     * Constructor of this class
     * @param theTG the crawl worker thread group
     * @param thePool the crawl worker thread pool
     * @param theSb plasma switchboard
     * @param theCacheManager cache manager
     * @param theLog server log
     */
    public AbstractCrawlWorker(
            ThreadGroup theTG,
            plasmaCrawlerPool thePool,
            plasmaSwitchboard theSb,
            plasmaHTCache theCacheManager,
            serverLog theLog
    ) {
        super(theTG, plasmaCrawlWorker.threadBaseName + "_created");
        this.myPool = thePool;
        this.sb = theSb;
        this.cacheManager = theCacheManager;
        this.log = theLog;
    }

    /**
     * Releases protocol specific resources held by the concrete worker,
     * e.g. still open network sockets.
     */
    public abstract void close();

    /**
     * Main loop of the worker thread: alternates between executing the
     * currently assigned crawl job and waiting (parked in the pool) for
     * the next one. Terminates when stopped, interrupted or when the
     * pool is closed; on termination the worker invalidates itself in
     * the pool unless it was already destroyed.
     */
    public void run() {
        this.running = true;

        try {
            // The thread keeps running.
            while (!this.stopped && !this.isInterrupted() && !this.myPool.isClosed) {
                if (this.done) {
                    synchronized (this) {
                        // return thread back into pool
                        this.myPool.returnObject(this.protocol, this);

                        // We are waiting for a new task now. The condition is
                        // re-checked in a loop so that a spurious wakeup - or a
                        // job that was assigned before we acquired the monitor
                        // (which would make its notifyAll arrive before our
                        // wait) - cannot leave the worker sleeping or idle.
                        while (this.done && !this.stopped && !this.destroyed && !this.isInterrupted()) {
                            this.wait();
                        }
                    }
                } else {
                    try {
                        // executing the new task
                        execute();
                    } finally {
                        reset();
                    }
                }
            }
        } catch (InterruptedException ex) {
            serverLog.logFiner("CRAWLER-POOL", "Interruption of thread '" + this.getName() + "' detected.");
        } finally {
            if (this.myPool != null && !this.destroyed)
                this.myPool.invalidateObject(this.protocol, this);
        }
    }

    /**
     * Executes the currently assigned crawl job: initializes protocol
     * specific settings, loads the resource and stores a reference to
     * the result in the job message (needed e.g. by the snippet fetcher).
     */
    public void execute() {
        try {
            // setting threadname
            this.setName(plasmaCrawlWorker.threadBaseName + "_" + this.url);

            // load some configuration variables
            init();

            // loading resource
            plasmaHTCache.Entry resource = load();

            // store a reference to the result in the message object
            // this is e.g. needed by the snippet fetcher
            this.theMsg.setResult(resource);
        } catch (IOException e) {
            // do not let the worker thread die because of an I/O problem,
            // but do not swallow the error silently either
            this.log.logWarning("CRAWLER Unable to load URL " + this.url + ": " + e.getMessage());
            // NOTE(review): on failure no result is set on theMsg - confirm that
            // plasmaCrawlLoaderMessage.waitForResult() handles this (e.g. by timeout)
        } finally {
            this.done = true;
        }
    }

    /**
     * Assigns a new crawl job to this worker and either starts the
     * thread (first job) or wakes it up (pooled reuse).
     * @param theNewMsg the message describing the new crawl job
     */
    public void execute(plasmaCrawlLoaderMessage theNewMsg) {
        synchronized (this) {
            // copy the job parameters into the worker state
            this.theMsg = theNewMsg;

            this.url = theNewMsg.url;
            this.name = theNewMsg.name;
            this.refererURLString = theNewMsg.referer;
            this.initiator = theNewMsg.initiator;
            this.depth = theNewMsg.depth;
            this.profile = theNewMsg.profile;
            this.acceptAllContent = theNewMsg.acceptAllContent;

            this.startdate = System.currentTimeMillis();

            this.done = false;

            if (!this.running) {
                // if the thread is not running until yet, we need to start it now
                this.start();
            } else {
                // inform the thread about the new crawl job
                this.notifyAll();
            }
        }
    }

    /**
     * Requests this worker thread to stop (or clears the request).
     * @param isStopped true to request termination
     */
    public void setStopped(boolean isStopped) {
        this.stopped = isStopped;
    }

    /**
     * @return true once the worker thread's run loop has started
     */
    public boolean isRunning() {
        return this.running;
    }

    /**
     * Clears all crawl job specific state so the worker can be
     * returned into the pool without leaking the previous job.
     */
    public void reset() {
        this.theMsg = null;
        this.url = null;
        this.name = null;
        this.refererURLString = null;
        this.initiator = null;
        this.depth = 0;
        this.startdate = 0;
        this.profile = null;
        this.acceptAllContent = false;
    }

    /**
     * Registers the current crawl job URL in the error-URL database.
     * @param failreason a textual description of the failure;
     *        "Unknown reason" is stored if null
     */
    protected void addURLtoErrorDB(String failreason) {
        // convert the referrer URL into a hash value
        String referrerHash = (this.refererURLString == null) ? null : indexURL.urlHash(this.refererURLString);

        // create a new errorURL DB entry
        plasmaCrawlEURL.Entry ee = this.sb.urlPool.errorURL.newEntry(
                this.url,
                referrerHash,
                this.initiator,
                yacyCore.seedDB.mySeed.hash,
                this.name,
                (failreason == null) ? "Unknown reason" : failreason,
                new bitfield(indexURL.urlFlagLength)
        );

        // store the entry
        ee.store();

        // push it onto the stack
        this.sb.urlPool.errorURL.stackPushEntry(ee);
    }
}

@ -61,59 +61,51 @@ import de.anomic.index.indexURL;
import de.anomic.net.URL; import de.anomic.net.URL;
import de.anomic.plasma.plasmaCrawlEURL; import de.anomic.plasma.plasmaCrawlEURL;
import de.anomic.plasma.plasmaCrawlLoader; import de.anomic.plasma.plasmaCrawlLoader;
import de.anomic.plasma.plasmaCrawlLoaderMessage;
import de.anomic.plasma.plasmaCrawlProfile;
import de.anomic.plasma.plasmaHTCache; import de.anomic.plasma.plasmaHTCache;
import de.anomic.plasma.plasmaParser; import de.anomic.plasma.plasmaParser;
import de.anomic.plasma.plasmaSwitchboard; import de.anomic.plasma.plasmaSwitchboard;
import de.anomic.plasma.crawler.AbstractCrawlWorker;
import de.anomic.plasma.crawler.plasmaCrawlerPool; import de.anomic.plasma.crawler.plasmaCrawlerPool;
import de.anomic.plasma.urlPattern.plasmaURLPattern; import de.anomic.plasma.urlPattern.plasmaURLPattern;
import de.anomic.server.serverSystem; import de.anomic.server.serverSystem;
import de.anomic.server.logging.serverLog; import de.anomic.server.logging.serverLog;
import de.anomic.tools.bitfield;
import de.anomic.yacy.yacyCore;
public final class CrawlWorker extends Thread { public final class CrawlWorker extends AbstractCrawlWorker {
public static final int DEFAULT_CRAWLING_RETRY_COUNT = 5; public static final int DEFAULT_CRAWLING_RETRY_COUNT = 5;
public static final String threadBaseName = "CrawlerWorker";
private final plasmaCrawlerPool myPool; /**
private final plasmaSwitchboard sb; * The socket timeout that should be used
private final plasmaHTCache cacheManager; */
private final serverLog log;
private int socketTimeout; private int socketTimeout;
public plasmaCrawlLoaderMessage theMsg; /**
private URL url; * The remote http proxy that should be used
private String name; */
private String referer; private httpRemoteProxyConfig remoteProxyConfig;
private String initiator;
private int depth; private String acceptEncoding;
private long startdate; private String acceptLanguage;
private plasmaCrawlProfile.entry profile; private String acceptCharset;
// private String error;
/**
public boolean destroyed = false; * Constructor of this class
private boolean running = false; * @param theTG
private boolean stopped = false; * @param thePool
private boolean done = false; * @param theSb
* @param theCacheManager
* @param theLog
*/
public CrawlWorker( public CrawlWorker(
ThreadGroup theTG, ThreadGroup theTG,
plasmaCrawlerPool thePool, plasmaCrawlerPool thePool,
plasmaSwitchboard theSb, plasmaSwitchboard theSb,
plasmaHTCache theCacheManager, plasmaHTCache theCacheManager,
serverLog theLog) { serverLog theLog) {
super(theTG,threadBaseName + "_created"); super(theTG,thePool,theSb,theCacheManager,theLog);
this.myPool = thePool;
this.sb = theSb;
this.cacheManager = theCacheManager;
this.log = theLog;
// setting the crawler timeout properly // this crawler supports http
this.socketTimeout = (int) this.sb.getConfigLong("crawler.clientTimeout", 10000); this.protocol = "http";
} }
public long getDuration() { public long getDuration() {
@ -121,189 +113,58 @@ public final class CrawlWorker extends Thread {
return (startDate != 0) ? System.currentTimeMillis() - startDate : 0; return (startDate != 0) ? System.currentTimeMillis() - startDate : 0;
} }
public synchronized void execute(plasmaCrawlLoaderMessage theNewMsg) { public void init() {
this.theMsg = theNewMsg; // refreshing timeout value
if (this.theMsg.timeout < 0) {
this.url = theNewMsg.url;
this.name = theNewMsg.name;
this.referer = theNewMsg.referer;
this.initiator = theNewMsg.initiator;
this.depth = theNewMsg.depth;
this.profile = theNewMsg.profile;
this.startdate = System.currentTimeMillis();
// this.error = null;
this.done = false;
if (!this.running) {
// this.setDaemon(true);
this.start();
} else {
this.notifyAll();
}
}
public void reset() {
this.theMsg = null;
this.url = null;
this.referer = null;
this.initiator = null;
this.depth = 0;
this.startdate = 0;
this.profile = null;
// this.error = null;
}
public void run() {
this.running = true;
try {
// The thread keeps running.
while (!this.stopped && !this.isInterrupted() && !this.myPool.isClosed) {
if (this.done) {
synchronized (this) {
// return thread back into pool
this.myPool.returnObject("http",this);
// We are waiting for a new task now.
if (!this.stopped && !this.destroyed && !this.isInterrupted()) {
this.wait();
}
}
} else {
try {
// executing the new task
execute();
} finally {
reset();
}
}
}
} catch (InterruptedException ex) {
serverLog.logFiner("CRAWLER-POOL","Interruption of thread '" + this.getName() + "' detected.");
} finally {
if (this.myPool != null && !this.destroyed)
this.myPool.invalidateObject("http",this);
}
}
public void execute() {
try {
// setting threadname
this.setName(CrawlWorker.threadBaseName + "_" + this.url);
// refreshing timeout value
this.socketTimeout = (int) this.sb.getConfigLong("crawler.clientTimeout", 10000); this.socketTimeout = (int) this.sb.getConfigLong("crawler.clientTimeout", 10000);
} else {
// loading resource this.socketTimeout = this.theMsg.timeout;
load(this.url,
this.name,
this.referer,
this.initiator,
this.depth,
this.profile,
this.socketTimeout,
this.sb.remoteProxyConfig,
this.cacheManager,
false,
this.log
);
} catch (IOException e) {
//throw e;
}
finally {
this.done = true;
} }
}
public void setStopped(boolean isStopped) { // some http header values
this.stopped = isStopped; this.acceptEncoding = this.sb.getConfig("crawler.acceptEncoding", "gzip,deflate");
} this.acceptLanguage = this.sb.getConfig("crawler.acceptLanguage","en-us,en;q=0.5");
this.acceptCharset = this.sb.getConfig("crawler.acceptCharset","ISO-8859-1,utf-8;q=0.7,*;q=0.7");
public boolean isRunning() { // getting the http proxy config
return this.running; this.remoteProxyConfig = this.sb.remoteProxyConfig;
} }
public void close() { public plasmaHTCache.Entry load() throws IOException {
if (this.isAlive()) { return load(DEFAULT_CRAWLING_RETRY_COUNT);
try {
// trying to close all still open httpc-Sockets first
int closedSockets = httpc.closeOpenSockets(this);
if (closedSockets > 0) {
this.log.logInfo(closedSockets + " HTTP-client sockets of thread '" + this.getName() + "' closed.");
}
} catch (Exception e) {/* ignore this. shutdown in progress */}
}
} }
protected plasmaHTCache.Entry createCacheEntry(Date requestDate, httpHeader requestHeader, httpc.response response) {
public static plasmaHTCache.Entry load( return this.cacheManager.newEntry(
URL url, requestDate,
String name, this.depth,
String referer, this.url,
String initiator, this.name,
int depth, requestHeader,
plasmaCrawlProfile.entry profile, response.status,
int socketTimeout, response.responseHeader,
httpRemoteProxyConfig theRemoteProxyConfig, this.initiator,
plasmaHTCache cacheManager, this.profile
boolean acceptAllContent,
serverLog log
) throws IOException {
return load(url,
name,
referer,
initiator,
depth,
profile,
socketTimeout,
theRemoteProxyConfig,
cacheManager,
log,
DEFAULT_CRAWLING_RETRY_COUNT,
true,
acceptAllContent
); );
} }
private static plasmaHTCache.Entry load( private plasmaHTCache.Entry load(int crawlingRetryCount) throws IOException {
URL url,
String name,
String refererURLString,
String initiator,
int depth,
plasmaCrawlProfile.entry profile,
int socketTimeout,
httpRemoteProxyConfig theRemoteProxyConfig,
plasmaHTCache cacheManager,
serverLog log,
int crawlingRetryCount,
boolean useContentEncodingGzip,
boolean acceptAllContent
) throws IOException {
if (url == null) return null;
// if the recrawling limit was exceeded we stop crawling now // if the recrawling limit was exceeded we stop crawling now
if (crawlingRetryCount <= 0) return null; if (crawlingRetryCount <= 0) return null;
// getting a reference to the plasmaSwitchboard
plasmaSwitchboard sb = plasmaCrawlLoader.switchboard;
Date requestDate = new Date(); // remember the time... Date requestDate = new Date(); // remember the time...
String host = url.getHost(); String host = this.url.getHost();
String path = url.getFile(); String path = this.url.getFile();
int port = url.getPort(); int port = this.url.getPort();
boolean ssl = url.getProtocol().equals("https"); boolean ssl = this.url.getProtocol().equals("https");
if (port < 0) port = (ssl) ? 443 : 80; if (port < 0) port = (ssl) ? 443 : 80;
refererURLString = (refererURLString == null) ? "" : refererURLString.trim();
// check if url is in blacklist // check if url is in blacklist
String hostlow = host.toLowerCase(); String hostlow = host.toLowerCase();
if (plasmaSwitchboard.urlBlacklist.isListed(plasmaURLPattern.BLACKLIST_CRAWLER, hostlow, path)) { if (plasmaSwitchboard.urlBlacklist.isListed(plasmaURLPattern.BLACKLIST_CRAWLER, hostlow, path)) {
log.logInfo("CRAWLER Rejecting URL '" + url.toString() + "'. URL is in blacklist."); this.log.logInfo("CRAWLER Rejecting URL '" + this.url.toString() + "'. URL is in blacklist.");
addURLtoErrorDB(url, refererURLString, initiator, name, plasmaCrawlEURL.DENIED_URL_IN_BLACKLIST, new bitfield(indexURL.urlFlagLength)); addURLtoErrorDB(plasmaCrawlEURL.DENIED_URL_IN_BLACKLIST);
return null; return null;
} }
@ -317,20 +178,22 @@ public final class CrawlWorker extends Thread {
// create a request header // create a request header
httpHeader requestHeader = new httpHeader(); httpHeader requestHeader = new httpHeader();
requestHeader.put(httpHeader.USER_AGENT, httpdProxyHandler.crawlerUserAgent); requestHeader.put(httpHeader.USER_AGENT, httpdProxyHandler.crawlerUserAgent);
requestHeader.put(httpHeader.REFERER, refererURLString); if (this.refererURLString != null && this.refererURLString.length() > 0)
requestHeader.put(httpHeader.ACCEPT_LANGUAGE, sb.getConfig("crawler.acceptLanguage","en-us,en;q=0.5")); requestHeader.put(httpHeader.REFERER, this.refererURLString);
requestHeader.put(httpHeader.ACCEPT_CHARSET, sb.getConfig("crawler.acceptCharset","ISO-8859-1,utf-8;q=0.7,*;q=0.7")); if (this.acceptLanguage != null && this.acceptLanguage.length() > 0)
if (useContentEncodingGzip) requestHeader.put(httpHeader.ACCEPT_ENCODING, "gzip,deflate"); requestHeader.put(httpHeader.ACCEPT_LANGUAGE, this.acceptLanguage);
if (this.acceptCharset != null && this.acceptCharset.length() > 0)
// System.out.println("CRAWLER_REQUEST_HEADER=" + requestHeader.toString()); // DEBUG requestHeader.put(httpHeader.ACCEPT_CHARSET, this.acceptCharset);
if (this.acceptEncoding != null && this.acceptEncoding.length() > 0)
requestHeader.put(httpHeader.ACCEPT_ENCODING, this.acceptEncoding);
// open the connection // open the connection
remote = ((theRemoteProxyConfig != null) && (theRemoteProxyConfig.useProxy())) remote = ((this.remoteProxyConfig != null) && (this.remoteProxyConfig.useProxy()))
? httpc.getInstance(host, host, port, socketTimeout, ssl, theRemoteProxyConfig,"CRAWLER",null) ? httpc.getInstance(host, host, port, this.socketTimeout, ssl, this.remoteProxyConfig,"CRAWLER",null)
: httpc.getInstance(host, host, port, socketTimeout, ssl, "CRAWLER",null); : httpc.getInstance(host, host, port, this.socketTimeout, ssl, "CRAWLER",null);
// specifying if content encoding is allowed // specifying if content encoding is allowed
remote.setAllowContentEncoding(useContentEncodingGzip); remote.setAllowContentEncoding((this.acceptEncoding != null && this.acceptEncoding.length() > 0));
// send request // send request
httpc.response res = remote.GET(path, requestHeader); httpc.response res = remote.GET(path, requestHeader);
@ -339,33 +202,33 @@ public final class CrawlWorker extends Thread {
// the transfer is ok // the transfer is ok
// create a new cache entry // create a new cache entry
htCache = cacheManager.newEntry(requestDate, depth, url, name, requestHeader, res.status, res.responseHeader, initiator, profile); htCache = createCacheEntry(requestDate, requestHeader, res);
// aborting download if content is to long ... // aborting download if content is to long ...
if (htCache.cacheFile.getAbsolutePath().length() > serverSystem.maxPathLength) { if (htCache.cacheFile.getAbsolutePath().length() > serverSystem.maxPathLength) {
remote.close(); remote.close();
log.logInfo("REJECTED URL " + url.toString() + " because path too long '" + cacheManager.cachePath.getAbsolutePath() + "'"); this.log.logInfo("REJECTED URL " + this.url.toString() + " because path too long '" + this.cacheManager.cachePath.getAbsolutePath() + "'");
addURLtoErrorDB(url, refererURLString, initiator, name, plasmaCrawlEURL.DENIED_CACHEFILE_PATH_TOO_LONG, new bitfield(indexURL.urlFlagLength)); addURLtoErrorDB(plasmaCrawlEURL.DENIED_CACHEFILE_PATH_TOO_LONG);
return (htCache = null); return (htCache = null);
} }
// reserve cache entry // reserve cache entry
if (!htCache.cacheFile.getCanonicalPath().startsWith(cacheManager.cachePath.getCanonicalPath())) { if (!htCache.cacheFile.getCanonicalPath().startsWith(this.cacheManager.cachePath.getCanonicalPath())) {
// if the response has not the right file type then reject file // if the response has not the right file type then reject file
remote.close(); remote.close();
log.logInfo("REJECTED URL " + url.toString() + " because of an invalid file path ('" + this.log.logInfo("REJECTED URL " + this.url.toString() + " because of an invalid file path ('" +
htCache.cacheFile.getCanonicalPath() + "' does not start with '" + htCache.cacheFile.getCanonicalPath() + "' does not start with '" +
cacheManager.cachePath.getAbsolutePath() + "')."); this.cacheManager.cachePath.getAbsolutePath() + "').");
addURLtoErrorDB(url, refererURLString, initiator, name, plasmaCrawlEURL.DENIED_INVALID_CACHEFILE_PATH, new bitfield(indexURL.urlFlagLength)); addURLtoErrorDB(plasmaCrawlEURL.DENIED_INVALID_CACHEFILE_PATH);
return (htCache = null); return (htCache = null);
} }
// request has been placed and result has been returned. work off response // request has been placed and result has been returned. work off response
File cacheFile = cacheManager.getCachePath(url); File cacheFile = this.cacheManager.getCachePath(this.url);
try { try {
if ((acceptAllContent) || (plasmaParser.supportedContent(plasmaParser.PARSER_MODE_CRAWLER,url,res.responseHeader.mime()))) { if ((acceptAllContent) || (plasmaParser.supportedContent(plasmaParser.PARSER_MODE_CRAWLER,this.url,res.responseHeader.mime()))) {
if (cacheFile.isFile()) { if (cacheFile.isFile()) {
cacheManager.deleteFile(url); this.cacheManager.deleteFile(this.url);
} }
// we write the new cache entry to file system directly // we write the new cache entry to file system directly
cacheFile.getParentFile().mkdirs(); cacheFile.getParentFile().mkdirs();
@ -374,21 +237,21 @@ public final class CrawlWorker extends Thread {
fos = new FileOutputStream(cacheFile); fos = new FileOutputStream(cacheFile);
res.writeContent(fos); // superfluous write to array res.writeContent(fos); // superfluous write to array
htCache.cacheArray = null; htCache.cacheArray = null;
cacheManager.writeFileAnnouncement(cacheFile); this.cacheManager.writeFileAnnouncement(cacheFile);
//htCache.cacheArray = res.writeContent(fos); // writes in cacheArray and cache file //htCache.cacheArray = res.writeContent(fos); // writes in cacheArray and cache file
} finally { } finally {
if (fos!=null)try{fos.close();}catch(Exception e){} if (fos!=null)try{fos.close();}catch(Exception e){/* ignore this */}
} }
// enQueue new entry with response header // enQueue new entry with response header
if (profile != null) { if (this.profile != null) {
cacheManager.push(htCache); this.cacheManager.push(htCache);
} }
} else { } else {
// if the response has not the right file type then reject file // if the response has not the right file type then reject file
remote.close(); remote.close();
log.logInfo("REJECTED WRONG MIME/EXT TYPE " + res.responseHeader.mime() + " for URL " + url.toString()); this.log.logInfo("REJECTED WRONG MIME/EXT TYPE " + res.responseHeader.mime() + " for URL " + this.url.toString());
addURLtoErrorDB(url, refererURLString, initiator, name, plasmaCrawlEURL.DENIED_WRONG_MIMETYPE_OR_EXT, new bitfield(indexURL.urlFlagLength)); addURLtoErrorDB(plasmaCrawlEURL.DENIED_WRONG_MIMETYPE_OR_EXT);
htCache = null; htCache = null;
} }
} catch (SocketException e) { } catch (SocketException e) {
@ -398,8 +261,8 @@ public final class CrawlWorker extends Thread {
// but we clean the cache also, since it may be only partial // but we clean the cache also, since it may be only partial
// and most possible corrupted // and most possible corrupted
if (cacheFile.exists()) cacheFile.delete(); if (cacheFile.exists()) cacheFile.delete();
log.logSevere("CRAWLER LOADER ERROR1: with URL=" + url.toString() + ": " + e.toString()); this.log.logSevere("CRAWLER LOADER ERROR1: with URL=" + this.url.toString() + ": " + e.toString());
addURLtoErrorDB(url, refererURLString, initiator, name, plasmaCrawlEURL.DENIED_CONNECTION_ERROR, new bitfield(indexURL.urlFlagLength)); addURLtoErrorDB(plasmaCrawlEURL.DENIED_CONNECTION_ERROR);
htCache = null; htCache = null;
} }
} else if (res.status.startsWith("30")) { } else if (res.status.startsWith("30")) {
@ -410,13 +273,13 @@ public final class CrawlWorker extends Thread {
redirectionUrlString = redirectionUrlString.trim(); redirectionUrlString = redirectionUrlString.trim();
if (redirectionUrlString.length() == 0) { if (redirectionUrlString.length() == 0) {
log.logWarning("CRAWLER Redirection of URL=" + url.toString() + " aborted. Location header is empty."); this.log.logWarning("CRAWLER Redirection of URL=" + this.url.toString() + " aborted. Location header is empty.");
addURLtoErrorDB(url, refererURLString, initiator, name, plasmaCrawlEURL.DENIED_REDIRECTION_HEADER_EMPTY, new bitfield(indexURL.urlFlagLength)); addURLtoErrorDB(plasmaCrawlEURL.DENIED_REDIRECTION_HEADER_EMPTY);
return null; return null;
} }
// normalizing URL // normalizing URL
redirectionUrlString = new URL(url, redirectionUrlString).toNormalform(); redirectionUrlString = new URL(this.url, redirectionUrlString).toNormalform();
// generating the new URL object // generating the new URL object
URL redirectionUrl = new URL(redirectionUrlString); URL redirectionUrl = new URL(redirectionUrlString);
@ -426,13 +289,13 @@ public final class CrawlWorker extends Thread {
remote = null; remote = null;
// restart crawling with new url // restart crawling with new url
log.logInfo("CRAWLER Redirection detected ('" + res.status + "') for URL " + url.toString()); this.log.logInfo("CRAWLER Redirection detected ('" + res.status + "') for URL " + this.url.toString());
log.logInfo("CRAWLER ..Redirecting request to: " + redirectionUrl); this.log.logInfo("CRAWLER ..Redirecting request to: " + redirectionUrl);
// if we are already doing a shutdown we don't need to retry crawling // if we are already doing a shutdown we don't need to retry crawling
if (Thread.currentThread().isInterrupted()) { if (Thread.currentThread().isInterrupted()) {
log.logSevere("CRAWLER Retry of URL=" + url.toString() + " aborted because of server shutdown."); this.log.logSevere("CRAWLER Retry of URL=" + this.url.toString() + " aborted because of server shutdown.");
addURLtoErrorDB(url, refererURLString, initiator, name, plasmaCrawlEURL.DENIED_SERVER_SHUTDOWN, new bitfield(indexURL.urlFlagLength)); addURLtoErrorDB(plasmaCrawlEURL.DENIED_SERVER_SHUTDOWN);
return null; return null;
} }
@ -443,20 +306,8 @@ public final class CrawlWorker extends Thread {
plasmaCrawlLoader.switchboard.urlPool.noticeURL.remove(urlhash); plasmaCrawlLoader.switchboard.urlPool.noticeURL.remove(urlhash);
// retry crawling with new url // retry crawling with new url
plasmaHTCache.Entry redirectedEntry = load(redirectionUrl, this.url = redirectionUrl;
name, plasmaHTCache.Entry redirectedEntry = load(crawlingRetryCount-1);
refererURLString,
initiator,
depth,
profile,
socketTimeout,
theRemoteProxyConfig,
cacheManager,
log,
--crawlingRetryCount,
useContentEncodingGzip,
acceptAllContent
);
if (redirectedEntry != null) { if (redirectedEntry != null) {
// TODO: Here we can store the content of the redirection // TODO: Here we can store the content of the redirection
@ -475,15 +326,15 @@ public final class CrawlWorker extends Thread {
} }
} }
} else { } else {
log.logInfo("Redirection counter exceeded for URL " + url.toString() + ". Processing aborted."); this.log.logInfo("Redirection counter exceeded for URL " + this.url.toString() + ". Processing aborted.");
addURLtoErrorDB(url, refererURLString, initiator, name, plasmaCrawlEURL.DENIED_REDIRECTION_COUNTER_EXCEEDED, new bitfield(indexURL.urlFlagLength)); addURLtoErrorDB(plasmaCrawlEURL.DENIED_REDIRECTION_COUNTER_EXCEEDED);
} }
}else { }else {
// if the response has not the right response type then reject file // if the response has not the right response type then reject file
log.logInfo("REJECTED WRONG STATUS TYPE '" + res.status + "' for URL " + url.toString()); this.log.logInfo("REJECTED WRONG STATUS TYPE '" + res.status + "' for URL " + this.url.toString());
// not processed any further // not processed any further
addURLtoErrorDB(url, refererURLString, initiator, name, plasmaCrawlEURL.DENIED_WRONG_HTTP_STATUSCODE + res.statusCode + ")", new bitfield(indexURL.urlFlagLength)); addURLtoErrorDB(plasmaCrawlEURL.DENIED_WRONG_HTTP_STATUSCODE + res.statusCode + ")");
} }
if (remote != null) remote.close(); if (remote != null) remote.close();
@ -498,13 +349,13 @@ public final class CrawlWorker extends Thread {
(errorMsg.indexOf("socket closed") >= 0) && (errorMsg.indexOf("socket closed") >= 0) &&
(Thread.currentThread().isInterrupted()) (Thread.currentThread().isInterrupted())
) { ) {
log.logInfo("CRAWLER Interruption detected because of server shutdown."); this.log.logInfo("CRAWLER Interruption detected because of server shutdown.");
failreason = plasmaCrawlEURL.DENIED_SERVER_SHUTDOWN; failreason = plasmaCrawlEURL.DENIED_SERVER_SHUTDOWN;
} else if (e instanceof MalformedURLException) { } else if (e instanceof MalformedURLException) {
log.logWarning("CRAWLER Malformed URL '" + url.toString() + "' detected. "); this.log.logWarning("CRAWLER Malformed URL '" + this.url.toString() + "' detected. ");
failreason = plasmaCrawlEURL.DENIED_MALFORMED_URL; failreason = plasmaCrawlEURL.DENIED_MALFORMED_URL;
} else if (e instanceof NoRouteToHostException) { } else if (e instanceof NoRouteToHostException) {
log.logWarning("CRAWLER No route to host found while trying to crawl URL '" + url.toString() + "'."); this.log.logWarning("CRAWLER No route to host found while trying to crawl URL '" + this.url.toString() + "'.");
failreason = plasmaCrawlEURL.DENIED_NO_ROUTE_TO_HOST; failreason = plasmaCrawlEURL.DENIED_NO_ROUTE_TO_HOST;
} else if ((e instanceof UnknownHostException) || } else if ((e instanceof UnknownHostException) ||
((errorMsg != null) && (errorMsg.indexOf("unknown host") >= 0))) { ((errorMsg != null) && (errorMsg.indexOf("unknown host") >= 0))) {
@ -571,23 +422,10 @@ public final class CrawlWorker extends Thread {
if (crawlingRetryCount > 2) crawlingRetryCount = 2; if (crawlingRetryCount > 2) crawlingRetryCount = 2;
// retry crawling // retry crawling
return load(url, return load(crawlingRetryCount - 1);
name,
refererURLString,
initiator,
depth,
profile,
socketTimeout,
theRemoteProxyConfig,
cacheManager,
log,
--crawlingRetryCount,
false,
false
);
} }
if (failreason != null) { if (failreason != null) {
addURLtoErrorDB(url, refererURLString, initiator, name, failreason, new bitfield(indexURL.urlFlagLength)); addURLtoErrorDB(failreason);
} }
return null; return null;
} finally { } finally {
@ -595,34 +433,15 @@ public final class CrawlWorker extends Thread {
} }
} }
private static void addURLtoErrorDB( public void close() {
URL url, if (this.isAlive()) {
String referrerString, try {
String initiator, // trying to close all still open httpc-Sockets first
String name, int closedSockets = httpc.closeOpenSockets(this);
String failreason, if (closedSockets > 0) {
bitfield flags this.log.logInfo(closedSockets + " HTTP-client sockets of thread '" + this.getName() + "' closed.");
) { }
// getting a reference to the plasmaSwitchboard } catch (Exception e) {/* ignore this. shutdown in progress */}
plasmaSwitchboard sb = plasmaCrawlLoader.switchboard; }
// convert the referrer URL into a hash value
String referrerHash = (referrerString==null)?null:indexURL.urlHash(referrerString);
// create a new errorURL DB entry
plasmaCrawlEURL.Entry ee = sb.urlPool.errorURL.newEntry(
url,
referrerHash,
initiator,
yacyCore.seedDB.mySeed.hash,
name,
failreason,
flags
);
// store the entry
ee.store();
// push it onto the stack
sb.urlPool.errorURL.stackPushEntry(ee);
} }
} }

@ -0,0 +1,20 @@
package de.anomic.plasma.crawler;
import java.io.IOException;
import de.anomic.plasma.plasmaCrawlLoaderMessage;
import de.anomic.plasma.plasmaHTCache;
public interface plasmaCrawlWorker {

    /** Common base name for all crawl worker threads. */
    String threadBaseName = "CrawlerWorker";

    /** Clears all job specific state of this worker. */
    void reset();

    /** Executes the crawl job previously assigned via {@link #execute(plasmaCrawlLoaderMessage)}. */
    void execute();

    /**
     * Assigns a new crawl job to this worker.
     * @param theNewMsg the message describing the new crawl job
     */
    void execute(plasmaCrawlLoaderMessage theNewMsg);

    /** Loads protocol specific configuration before a job is executed. */
    void init();

    /** Releases all resources held by this worker, e.g. still open sockets. */
    void close();

    /**
     * Downloads the resource belonging to the current crawl job.
     * @return the cache entry holding the loaded resource, or null if it was rejected
     * @throws IOException if the resource could not be loaded
     */
    plasmaHTCache.Entry load() throws IOException;
}

@ -90,7 +90,7 @@ public final class plasmaCrawlerFactory implements KeyedPoolableObjectFactory {
CrawlWorker theWorker = (CrawlWorker) obj; CrawlWorker theWorker = (CrawlWorker) obj;
synchronized(theWorker) { synchronized(theWorker) {
theWorker.destroyed = true; theWorker.destroyed = true;
theWorker.setName(CrawlWorker.threadBaseName + "_destroyed"); theWorker.setName(plasmaCrawlWorker.threadBaseName + "_destroyed");
theWorker.setStopped(true); theWorker.setStopped(true);
theWorker.interrupt(); theWorker.interrupt();
} }

@ -23,7 +23,7 @@ public final class plasmaCrawlerPool extends GenericKeyedObjectPool {
if (obj == null) return; if (obj == null) return;
if (obj instanceof CrawlWorker) { if (obj instanceof CrawlWorker) {
try { try {
((CrawlWorker)obj).setName(CrawlWorker.threadBaseName + "_inPool"); ((CrawlWorker)obj).setName(plasmaCrawlWorker.threadBaseName + "_inPool");
super.returnObject(key,obj); super.returnObject(key,obj);
} catch (Exception e) { } catch (Exception e) {
((CrawlWorker)obj).setStopped(true); ((CrawlWorker)obj).setStopped(true);
@ -40,7 +40,7 @@ public final class plasmaCrawlerPool extends GenericKeyedObjectPool {
if (this.isClosed) return; if (this.isClosed) return;
if (obj instanceof CrawlWorker) { if (obj instanceof CrawlWorker) {
try { try {
((CrawlWorker)obj).setName(CrawlWorker.threadBaseName + "_invalidated"); ((CrawlWorker)obj).setName(plasmaCrawlWorker.threadBaseName + "_invalidated");
((CrawlWorker)obj).setStopped(true); ((CrawlWorker)obj).setStopped(true);
super.invalidateObject(key,obj); super.invalidateObject(key,obj);
} catch (Exception e) { } catch (Exception e) {

@ -142,6 +142,15 @@ public final class plasmaCrawlLoader extends Thread {
return this.theThreadGroup; return this.theThreadGroup;
} }
private void execute(plasmaCrawlLoaderMessage theMsg) throws Exception {
// getting the protocol of the next URL
String protocol = theMsg.url.getProtocol();
// getting a new crawler from the crawler pool
CrawlWorker theWorker = (CrawlWorker) this.crawlwerPool.borrowObject(protocol);
if (theWorker != null) theWorker.execute(theMsg);
}
public void run() { public void run() {
while (!this.stopped && !Thread.interrupted()) { while (!this.stopped && !Thread.interrupted()) {
@ -149,12 +158,8 @@ public final class plasmaCrawlLoader extends Thread {
// getting a new message from the crawler queue // getting a new message from the crawler queue
plasmaCrawlLoaderMessage theMsg = this.theQueue.waitForMessage(); plasmaCrawlLoaderMessage theMsg = this.theQueue.waitForMessage();
// TODO: getting the protocol of the next URL // start new crawl job
String protocol = theMsg.url.getProtocol(); this.execute(theMsg);
// getting a new crawler from the crawler pool
CrawlWorker theWorker = (CrawlWorker) this.crawlwerPool.borrowObject(protocol);
if (theWorker != null) theWorker.execute(theMsg);
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.interrupted(); Thread.interrupted();
@ -178,25 +183,80 @@ public final class plasmaCrawlLoader extends Thread {
} }
public void loadParallel( public plasmaHTCache.Entry loadSync(
URL url,
String urlName,
String referer,
String initiator,
int depth,
plasmaCrawlProfile.entry profile,
int timeout
) {
plasmaHTCache.Entry result = null;
if (!this.crawlwerPool.isClosed) {
int crawlingPriority = 5;
// creating a new crawler queue object
plasmaCrawlLoaderMessage theMsg = new plasmaCrawlLoaderMessage(
url,
urlName,
referer,
initiator,
depth,
profile,
crawlingPriority,
true,
timeout
);
try {
// start new crawl job
this.execute(theMsg);
// wait for the crawl job result
result = theMsg.waitForResult();
} catch (Exception e) {
this.log.logSevere("plasmaCrawlLoader.loadSync", e);
}
}
// return the result
return result;
}
public void loadAsync(
URL url, URL url,
String name, String urlName,
String referer, String referer,
String initiator, String initiator,
int depth, int depth,
plasmaCrawlProfile.entry profile) { plasmaCrawlProfile.entry profile
) {
if (!this.crawlwerPool.isClosed) { if (!this.crawlwerPool.isClosed) {
int crawlingPriority = 5; int crawlingPriority = 5;
// creating a new crawler queue object // creating a new crawler queue object
plasmaCrawlLoaderMessage theMsg = new plasmaCrawlLoaderMessage(url, name, referer, initiator, depth, profile, crawlingPriority); plasmaCrawlLoaderMessage theMsg = new plasmaCrawlLoaderMessage(
url, // url
urlName, // url name
referer, // referer URL
initiator, // crawling initiator peer
depth, // crawling depth
profile, // crawling profile
crawlingPriority, // crawling priority
false, // only download documents whose mimetypes are enabled for the crawler
-1 // use default crawler timeout
);
// adding the message to the queue // adding the message to the queue
try { try {
this.theQueue.addMessage(theMsg); this.theQueue.addMessage(theMsg);
} catch (InterruptedException e) { } catch (InterruptedException e) {
this.log.logSevere("plasmaCrawlLoader.loadParallel", e); this.log.logSevere("plasmaCrawlLoader.loadAsync", e);
} }
} }
} }

@ -43,15 +43,22 @@
package de.anomic.plasma; package de.anomic.plasma;
import de.anomic.net.URL; import de.anomic.net.URL;
import de.anomic.server.serverSemaphore;
public final class plasmaCrawlLoaderMessage { public final class plasmaCrawlLoaderMessage {
public final int crawlingPriority; public final int crawlingPriority;
public final URL url; public final URL url;
public final String name; public final String name;
public final String referer; public final String referer;
public final String initiator; public final String initiator;
public final int depth; public final int depth;
public final plasmaCrawlProfile.entry profile; public final plasmaCrawlProfile.entry profile;
public final boolean acceptAllContent;
public final int timeout;
private serverSemaphore resultSync = null;
private plasmaHTCache.Entry result;
// loadParallel(URL url, String referer, String initiator, int depth, plasmaCrawlProfile.entry profile) { // loadParallel(URL url, String referer, String initiator, int depth, plasmaCrawlProfile.entry profile) {
public plasmaCrawlLoaderMessage( public plasmaCrawlLoaderMessage(
@ -61,7 +68,10 @@ public final class plasmaCrawlLoaderMessage {
String initiator, String initiator,
int depth, int depth,
plasmaCrawlProfile.entry profile, plasmaCrawlProfile.entry profile,
int crawlingPriority) { int crawlingPriority,
boolean acceptAllContent,
int timeout
) {
this.url = url; this.url = url;
this.name = name; this.name = name;
this.referer = referer; this.referer = referer;
@ -69,5 +79,32 @@ public final class plasmaCrawlLoaderMessage {
this.depth = depth; this.depth = depth;
this.profile = profile; this.profile = profile;
this.crawlingPriority = crawlingPriority; this.crawlingPriority = crawlingPriority;
this.acceptAllContent = acceptAllContent;
this.timeout = timeout;
this.resultSync = new serverSemaphore(0);
this.result = null;
}
public void setResult(plasmaHTCache.Entry theResult) {
// store the result
this.result = theResult;
// notify blocking result readers
this.resultSync.V();
}
public plasmaHTCache.Entry waitForResult() throws InterruptedException {
plasmaHTCache.Entry theResult = null;
this.resultSync.P();
/* =====> CRITICAL SECTION <======== */
theResult = this.result;
/* =====> CRITICAL SECTION <======== */
this.resultSync.V();
return theResult;
} }
} }

@ -50,7 +50,6 @@ import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.MalformedURLException; import java.net.MalformedURLException;
import de.anomic.net.URL;
import java.util.Date; import java.util.Date;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
@ -66,10 +65,9 @@ import de.anomic.kelondro.kelondroFlexTable;
import de.anomic.kelondro.kelondroIndex; import de.anomic.kelondro.kelondroIndex;
import de.anomic.kelondro.kelondroRow; import de.anomic.kelondro.kelondroRow;
import de.anomic.kelondro.kelondroTree; import de.anomic.kelondro.kelondroTree;
import de.anomic.plasma.plasmaCrawlEURL; import de.anomic.net.URL;
import de.anomic.plasma.urlPattern.plasmaURLPattern; import de.anomic.plasma.urlPattern.plasmaURLPattern;
import de.anomic.server.serverSemaphore; import de.anomic.server.serverSemaphore;
import de.anomic.server.serverThread;
import de.anomic.server.logging.serverLog; import de.anomic.server.logging.serverLog;
import de.anomic.tools.bitfield; import de.anomic.tools.bitfield;
import de.anomic.yacy.yacyCore; import de.anomic.yacy.yacyCore;

@ -419,18 +419,18 @@ public class plasmaSnippetCache {
} }
public plasmaHTCache.Entry loadResourceFromWeb(URL url, int socketTimeout) throws IOException { public plasmaHTCache.Entry loadResourceFromWeb(URL url, int socketTimeout) throws IOException {
return CrawlWorker.load(
url, plasmaHTCache.Entry result = this.sb.cacheLoader.loadSync(
"", url,
null, "",
null, null,
0, null,
null, 0,
socketTimeout, null,
this.sb.remoteProxyConfig, socketTimeout
this.cacheManager, );
true,
this.log); return result;
} }
public void fetch(plasmaSearchResult acc, Set queryhashes, String urlmask, int fetchcount, long maxTime) { public void fetch(plasmaSearchResult acc, Set queryhashes, String urlmask, int fetchcount, long maxTime) {

@ -1874,7 +1874,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
} catch (IOException e) { } catch (IOException e) {
refererURL = null; refererURL = null;
} }
cacheLoader.loadParallel(urlEntry.url(), urlEntry.name(), (refererURL!=null)?refererURL.toString():null, urlEntry.initiator(), urlEntry.depth(), profile); cacheLoader.loadAsync(urlEntry.url(), urlEntry.name(), (refererURL!=null)?refererURL.toString():null, urlEntry.initiator(), urlEntry.depth(), profile);
log.logInfo(stats + ": enqueued for load " + urlEntry.url() + " [" + urlEntry.hash() + "]"); log.logInfo(stats + ": enqueued for load " + urlEntry.url() + " [" + urlEntry.hash() + "]");
return; return;
} }

@ -631,6 +631,7 @@ msgForwardingTo=root@localhost
onlineCautionDelay=30000 onlineCautionDelay=30000
# Some configuration values for the crawler # Some configuration values for the crawler
crawler.acceptEncoding=gzip,deflate
crawler.acceptLanguage=en-us,en;q=0.5 crawler.acceptLanguage=en-us,en;q=0.5
crawler.acceptCharset=ISO-8859-1,utf-8;q=0.7,*;q=0.7 crawler.acceptCharset=ISO-8859-1,utf-8;q=0.7,*;q=0.7
crawler.clientTimeout=9000 crawler.clientTimeout=9000

Loading…
Cancel
Save