From 959eefbc4f9454df5137402a878ee8a4272c581f Mon Sep 17 00:00:00 2001 From: theli Date: Sun, 9 Oct 2005 04:43:07 +0000 Subject: [PATCH] *) Robots.txt parser/ppt cutting of comments at the line end *) Adding Threadpool for stackCrawl Thread to speedup robots.txt download and double url checks git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@882 6c8d7289-2bf4-0310-a012-ef5d649a1542 --- source/de/anomic/data/robotsParser.java | 16 +- source/de/anomic/http/httpc.java | 101 +++--- source/de/anomic/kelondro/kelondroFileRA.java | 3 +- .../anomic/plasma/plasmaCrawlRobotsTxt.java | 3 +- source/de/anomic/plasma/plasmaSearch.java | 2 +- .../anomic/plasma/plasmaStackCrawlThread.java | 333 ++++++++++++++++-- source/de/anomic/plasma/plasmaWordIndex.java | 4 +- .../anomic/plasma/plasmaWordIndexCache.java | 18 +- .../plasma/plasmaWordIndexDistribution.java | 2 +- yacy.init | 2 +- 10 files changed, 375 insertions(+), 109 deletions(-) diff --git a/source/de/anomic/data/robotsParser.java b/source/de/anomic/data/robotsParser.java index cf9c867c7..3d6b11802 100644 --- a/source/de/anomic/data/robotsParser.java +++ b/source/de/anomic/data/robotsParser.java @@ -65,15 +65,8 @@ import de.anomic.server.logging.serverLog; /* * A class for Parsing robots.txt files. * It only parses the Deny Part, yet. - * TODO: Allow, Do not deny if User-Agent: != yacy - * + * * * http://www.robotstxt.org/wc/norobots-rfc.html - * - * Use: - * robotsParser rp=new Robotsparser(robotsfile); - * if(rp.isAllowedRobots("/test")){ - * System.out.println("/test is allowed"); - * } */ public final class robotsParser{ @@ -115,6 +108,13 @@ public final class robotsParser{ } else if (line.startsWith("#")) { // we can ignore this. Just a comment line } else if ((!rule4Yacy) && (line.startsWith("User-agent:"))) { + // cutting off comments at the line end + pos = line.indexOf("#"); + if (pos != -1) { + line = line.substring(0,pos); + } + + // getting out the robots name pos = line.indexOf(" "); if (pos != -1) { String userAgent = line.substring(pos).trim(); diff --git a/source/de/anomic/http/httpc.java b/source/de/anomic/http/httpc.java index 0a6bc2cc4..93129eaf5 100644 --- a/source/de/anomic/http/httpc.java +++ b/source/de/anomic/http/httpc.java @@ -109,7 +109,7 @@ public final class httpc { private static final SimpleDateFormat EMLFormatter = new SimpleDateFormat("dd MMM yyyy HH:mm:ss", Locale.US); private static final SimpleDateFormat ShortFormatter = new SimpleDateFormat("yyyyMMddHHmmss"); //Mo 06 Sep 2004 23:32 - private static final HashMap reverseMappingCache = new HashMap(); + static final HashMap reverseMappingCache = new HashMap(); // the dns cache private static final HashMap nameCacheHit = new HashMap(); @@ -124,7 +124,7 @@ public final class httpc { // class variables private Socket socket = null; // client socket for commands private Thread socketOwner = null; - private String host = null; + String host = null; private long timeout; private long handle; @@ -134,7 +134,7 @@ public final class httpc { private boolean remoteProxyUse = false; private String savedRemoteHost = null; - private String requestPath = null; + String requestPath = null; private boolean allowContentEncoding = true; static boolean useYacyReferer = true; public static boolean yacyDebugMode = false; @@ -294,7 +294,7 @@ public final class httpc { */ public boolean isClosed() { if (this.socket == null) return true; - else return (!this.socket.isConnected()) || (this.socket.isClosed()); + return (!this.socket.isConnected()) || (this.socket.isClosed()); } /** @@ -312,9 +312,8 @@ public final class httpc { if ((ip != null) && (!(ip.equals("127.0.0.1"))) && (!(ip.equals("localhost")))) { nameCacheHit.put(host, ip); return ip; - } else { - return null; } + return null; } catch (UnknownHostException e) { //nameCacheMiss.add(host); } @@ -335,9 +334,8 @@ public final class httpc { if ((ip != null) && (!(ip.equals("127.0.0.1"))) && (!(ip.equals("localhost")))) { nameCacheHit.put(host, ip); return true; - } else { - return false; } + return false; } catch (UnknownHostException e) { //nameCacheMiss.add(host); return false; @@ -411,7 +409,7 @@ public final class httpc { * @throws IOException */ void init(String server, int port, int timeout, boolean ssl) throws IOException { - handle = System.currentTimeMillis(); + this.handle = System.currentTimeMillis(); //serverLog.logDebug("HTTPC", handle + " initialized"); this.remoteProxyUse = false; this.timeout = timeout; @@ -483,7 +481,7 @@ public final class httpc { } if (this.socket != null) { try {this.socket.close();} catch (Exception e) {} - this.unregisterOpenSocket(this.socket,this.socketOwner); + httpc.unregisterOpenSocket(this.socket,this.socketOwner); this.socket = null; this.socketOwner = null; } @@ -540,7 +538,7 @@ public final class httpc { if ((path == null) || (path.length() == 0)) path = "/"; // for debuggug: - requestPath = path; + this.requestPath = path; // prepare header if (header == null) header = new httpHeader(); @@ -564,7 +562,7 @@ public final class httpc { // the host is mandatory, if we use HTTP/1.1 if (!(header.containsKey(httpHeader.HOST))) { if (this.remoteProxyUse) - header.put(httpHeader.HOST, savedRemoteHost); + header.put(httpHeader.HOST, this.savedRemoteHost); else header.put(httpHeader.HOST, this.host); } @@ -608,7 +606,7 @@ public final class httpc { // send request if ((this.remoteProxyUse) && (!(method.equals(httpHeader.METHOD_CONNECT)))) path = "http://" + this.savedRemoteHost + path; - serverCore.send(clientOutput, method + " " + path + " HTTP/1.0"); // if set to HTTP/1.1, servers give time-outs? + serverCore.send(this.clientOutput, method + " " + path + " HTTP/1.0"); // if set to HTTP/1.1, servers give time-outs? // send header //System.out.println("***HEADER for path " + path + ": PROXY TO SERVER = " + header.toString()); // DEBUG @@ -695,18 +693,18 @@ public final class httpc { len = Integer.parseInt(cl); // transfer len bytes from ins to the server while ((len > 0) && ((c = ins.read(buffer)) >= 0)) { - clientOutput.write(buffer, 0, c); + this.clientOutput.write(buffer, 0, c); len -= c; } } else { len = 0; while ((c = ins.read(buffer)) >= 0) { - clientOutput.write(buffer, 0, c); + this.clientOutput.write(buffer, 0, c); len += c; } requestHeader.put(httpHeader.CONTENT_LENGTH, Integer.toString(len)); } - clientOutput.flush(); + this.clientOutput.flush(); return new response(false); } catch (SocketException e) { throw new IOException(e.getMessage()); @@ -827,7 +825,7 @@ public final class httpc { send(httpHeader.METHOD_POST, path, requestHeader, false); // send the body //System.out.println("body=" + buf.toString()); - serverCore.send(clientOutput, body); + serverCore.send(this.clientOutput, body); return new response(false); } @@ -907,9 +905,8 @@ do upload httpc.response res = con.GET(path, null); if (res.status.startsWith("2")) { return res.writeContent(); - } else { - return res.status.getBytes(); } + return res.status.getBytes(); } catch (Exception e) { throw new IOException(e.getMessage()); } finally { @@ -961,9 +958,8 @@ do upload //System.out.println("response=" + res.toString()); if (res.status.startsWith("2")) { return res.writeContent(); - } else { - return res.status.getBytes(); } + return res.status.getBytes(); } catch (Exception e) { throw new IOException(e.getMessage()); } finally { @@ -1035,10 +1031,9 @@ do upload if (res.status.startsWith("2")) { // success return res.responseHeader; - } else { - // fail - return res.responseHeader; } + // fail + return res.responseHeader; } catch (Exception e) { throw new IOException(e.getMessage()); } finally { @@ -1251,28 +1246,28 @@ do upload public response(boolean zipped) throws IOException { // lets start with worst-case attributes as set-up - responseHeader = new httpHeader(reverseMappingCache); - statusCode = 503; - statusText = "internal httpc error"; - status = Integer.toString(statusCode) + " " + statusText; - gzip = false; + this.responseHeader = new httpHeader(reverseMappingCache); + this.statusCode = 503; + this.statusText = "internal httpc error"; + this.status = Integer.toString(this.statusCode) + " " + this.statusText; + this.gzip = false; // check connection status - if (clientInput == null) { + if (httpc.this.clientInput == null) { // the server has meanwhile disconnected - statusCode = 503; - statusText = "lost connection to server"; - status = Integer.toString(statusCode) + " " + statusText; + this.statusCode = 503; + this.statusText = "lost connection to server"; + this.status = Integer.toString(this.statusCode) + " " + this.statusText; return; // in bad mood } // reads in the http header, right now, right here - byte[] b = serverCore.receive(clientInput, readLineBuffer, terminalMaxLength, false); + byte[] b = serverCore.receive(httpc.this.clientInput, httpc.this.readLineBuffer, terminalMaxLength, false); if (b == null) { // the server has meanwhile disconnected - statusCode = 503; - statusText = "server has closed connection"; - status = Integer.toString(statusCode) + " " + statusText; + this.statusCode = 503; + this.statusText = "server has closed connection"; + this.status = Integer.toString(this.statusCode) + " " + this.statusText; return; // in bad mood } @@ -1286,7 +1281,7 @@ do upload if ((this.statusCode==500)&&(this.statusText.equals("status line parse error"))) { // flush in anything that comes without parsing - while ((b != null) && (b.length != 0)) b = serverCore.receive(clientInput, readLineBuffer, terminalMaxLength, false); + while ((b != null) && (b.length != 0)) b = serverCore.receive(httpc.this.clientInput, httpc.this.readLineBuffer, terminalMaxLength, false); return; // in bad mood } @@ -1294,13 +1289,13 @@ do upload if (this.statusCode == 400) { // bad request // flush in anything that comes without parsing - while ((b = serverCore.receive(clientInput, readLineBuffer, terminalMaxLength, false)).length != 0) {} + while ((b = serverCore.receive(httpc.this.clientInput, httpc.this.readLineBuffer, terminalMaxLength, false)).length != 0) {} return; // in bad mood } // at this point we should have a valid response. read in the header properties String key = ""; - while ((b = serverCore.receive(clientInput, readLineBuffer, terminalMaxLength, false)) != null) { + while ((b = serverCore.receive(httpc.this.clientInput, httpc.this.readLineBuffer, terminalMaxLength, false)) != null) { if (b.length == 0) break; buffer = new String(b); //System.out.println("#H#" + buffer); // debug @@ -1308,15 +1303,15 @@ do upload // use old entry if (key.length() == 0) throw new IOException("header corrupted - input error"); // attach new line - if (!(responseHeader.containsKey(key))) throw new IOException("header corrupted - internal error"); - responseHeader.put(key, (String) responseHeader.get(key) + " " + buffer.trim()); + if (!(this.responseHeader.containsKey(key))) throw new IOException("header corrupted - internal error"); + this.responseHeader.put(key, (String) this.responseHeader.get(key) + " " + buffer.trim()); } else { // create new entry int p = buffer.indexOf(":"); if (p > 0) { - responseHeader.add(buffer.substring(0, p).trim(), buffer.substring(p + 1).trim()); + this.responseHeader.add(buffer.substring(0, p).trim(), buffer.substring(p + 1).trim()); } else { - serverLog.logSevere("HTTPC", "RESPONSE PARSE ERROR: HOST='" + host + "', PATH='" + requestPath + "', STATUS='" + status + "'"); + serverLog.logSevere("HTTPC", "RESPONSE PARSE ERROR: HOST='" + httpc.this.host + "', PATH='" + httpc.this.requestPath + "', STATUS='" + this.status + "'"); serverLog.logSevere("HTTPC", "..............BUFFER: " + buffer); } } @@ -1325,11 +1320,11 @@ do upload // we will now manipulate the header if the content is gzip encoded, because // reading the content with "writeContent" will gunzip on-the-fly - gzip = ((zipped) && (responseHeader.gzip())); + this.gzip = ((zipped) && (this.responseHeader.gzip())); - if (gzip) { - responseHeader.remove("CONTENT-ENCODING"); // we fake that we don't have encoding, since what comes out does not have gzip and we also don't know what was encoded - responseHeader.remove("CONTENT-LENGTH"); // we cannot use the length during gunzippig yet; still we can hope that it works + if (this.gzip) { + this.responseHeader.remove(httpHeader.CONTENT_ENCODING); // we fake that we don't have encoding, since what comes out does not have gzip and we also don't know what was encoded + this.responseHeader.remove(httpHeader.CONTENT_LENGTH); // we cannot use the length during gunzippig yet; still we can hope that it works } } @@ -1353,7 +1348,7 @@ do upload * @return True, if the request was successfull. */ public boolean success() { - return ((status.charAt(0) == '2') || (status.charAt(0) == '3')); + return ((this.status.charAt(0) == '2') || (this.status.charAt(0) == '3')); } /** @@ -1366,7 +1361,7 @@ do upload public byte[] writeContent() throws IOException { int contentLength = (int) this.responseHeader.contentLength(); serverByteBuffer sbb = new serverByteBuffer((contentLength==-1)?8192:contentLength); - writeContentX(null, sbb, clientInput); + writeContentX(null, sbb, httpc.this.clientInput); return sbb.getBytes(); } @@ -1381,7 +1376,7 @@ do upload public byte[] writeContent(OutputStream procOS) throws IOException { int contentLength = (int) this.responseHeader.contentLength(); serverByteBuffer sbb = new serverByteBuffer((contentLength==-1)?8192:contentLength); - writeContentX(procOS, sbb, clientInput); + writeContentX(procOS, sbb, httpc.this.clientInput); return sbb.getBytes(); } @@ -1399,7 +1394,7 @@ do upload FileOutputStream bufferOS = null; try { if (file != null) bufferOS = new FileOutputStream(file); - writeContentX(procOS, bufferOS, clientInput); + writeContentX(procOS, bufferOS, httpc.this.clientInput); } finally { if (bufferOS != null) { bufferOS.close(); @@ -1476,7 +1471,7 @@ do upload * status of this instance. */ public void print() { - serverLog.logInfo("HTTPC", "RESPONSE: status=" + status + ", header=" + responseHeader.toString()); + serverLog.logInfo("HTTPC", "RESPONSE: status=" + this.status + ", header=" + this.responseHeader.toString()); } } diff --git a/source/de/anomic/kelondro/kelondroFileRA.java b/source/de/anomic/kelondro/kelondroFileRA.java index 9e8c07d90..7dd098a32 100644 --- a/source/de/anomic/kelondro/kelondroFileRA.java +++ b/source/de/anomic/kelondro/kelondroFileRA.java @@ -46,11 +46,10 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.RandomAccessFile; import java.io.FileDescriptor; -import java.io.SyncFailedException; import java.util.Map; import java.util.Properties; -public class kelondroFileRA extends kelondroAbstractRA implements kelondroRA { +public final class kelondroFileRA extends kelondroAbstractRA implements kelondroRA { protected RandomAccessFile RAFile; protected FileDescriptor RADescriptor; diff --git a/source/de/anomic/plasma/plasmaCrawlRobotsTxt.java b/source/de/anomic/plasma/plasmaCrawlRobotsTxt.java index 8dc17894d..86ab710a7 100644 --- a/source/de/anomic/plasma/plasmaCrawlRobotsTxt.java +++ b/source/de/anomic/plasma/plasmaCrawlRobotsTxt.java @@ -57,12 +57,11 @@ import java.util.Map; import de.anomic.kelondro.kelondroDyn; import de.anomic.kelondro.kelondroMap; import de.anomic.kelondro.kelondroException; -import de.anomic.kelondro.kelondroRecords; import de.anomic.server.logging.serverLog; public class plasmaCrawlRobotsTxt { private kelondroMap robotsTable; - private File robotsTableFile; + private final File robotsTableFile; private int bufferkb; public plasmaCrawlRobotsTxt(File robotsTableFile, int bufferkb) throws IOException { diff --git a/source/de/anomic/plasma/plasmaSearch.java b/source/de/anomic/plasma/plasmaSearch.java index cb231e6da..50070a07a 100644 --- a/source/de/anomic/plasma/plasmaSearch.java +++ b/source/de/anomic/plasma/plasmaSearch.java @@ -112,7 +112,7 @@ public final class plasmaSearch { wordHash = plasmaWordIndexEntry.word2hash(word); entry = new plasmaWordIndexEntry(urlHash, count, p++, 0, 0, age, quality, language, doctype, true); - wordIndex.addEntries(plasmaWordIndexEntryContainer.instantContainer(wordHash, System.currentTimeMillis(), entry)); + this.wordIndex.addEntries(plasmaWordIndexEntryContainer.instantContainer(wordHash, System.currentTimeMillis(), entry)); } //System.out.println("DEBUG: plasmaSearch.addPageIndex: added " + condenser.getWords().size() + " words, flushed " + c + " entries"); return condenser.getWords().size(); diff --git a/source/de/anomic/plasma/plasmaStackCrawlThread.java b/source/de/anomic/plasma/plasmaStackCrawlThread.java index 49995aed7..3f957d344 100644 --- a/source/de/anomic/plasma/plasmaStackCrawlThread.java +++ b/source/de/anomic/plasma/plasmaStackCrawlThread.java @@ -6,12 +6,14 @@ import java.net.InetAddress; import java.net.MalformedURLException; import java.net.URL; import java.net.UnknownHostException; -import java.util.ArrayList; import java.util.Date; import java.util.Iterator; import java.util.LinkedList; +import org.apache.commons.pool.impl.GenericObjectPool; + import de.anomic.data.robotsParser; +import de.anomic.http.httpc; import de.anomic.kelondro.kelondroTree; import de.anomic.kelondro.kelondroRecords.Node; import de.anomic.server.serverCodings; @@ -22,8 +24,10 @@ import de.anomic.yacy.yacyCore; public final class plasmaStackCrawlThread extends Thread { - private final serverLog log = new serverLog("STACKCRAWL"); - private final plasmaSwitchboard sb; + final WorkerPool theWorkerPool; + final ThreadGroup theWorkerThreadGroup = new ThreadGroup("stackCrawlThreadGroup"); + final serverLog log = new serverLog("STACKCRAWL"); + final plasmaSwitchboard sb; private boolean stopped = false; private stackCrawlQueue queue; @@ -34,6 +38,9 @@ public final class plasmaStackCrawlThread extends Thread { this.queue = new stackCrawlQueue(dbPath,dbCacheSize); this.log.logInfo(this.queue.size() + " entries in the stackCrawl queue."); this.log.logInfo("STACKCRAWL thread initialized."); + + this.theWorkerPool = new WorkerPool(new WorkterFactory(this.theWorkerThreadGroup)); + } public void stopIt() { @@ -58,22 +65,11 @@ public final class plasmaStackCrawlThread extends Thread { // getting a new message from the crawler queue stackCrawlMessage theMsg = this.queue.waitForMessage(); - // process message - String rejectReason = stackCrawlDequeue(theMsg); - - if (rejectReason != null) { - this.sb.urlPool.errorURL.newEntry( - new URL(theMsg.url()), - theMsg.referrerHash(), - theMsg.initiatorHash(), - yacyCore.seedDB.mySeed.hash, - theMsg.name, - rejectReason, - new bitfield(plasmaURL.urlFlagLength), - false - ); - } + // getting a free session thread from the pool + Worker worker = (Worker) this.theWorkerPool.borrowObject(); + // processing the new request + worker.execute(theMsg); } catch (InterruptedException e) { Thread.interrupted(); this.stopped = true; @@ -83,6 +79,14 @@ public final class plasmaStackCrawlThread extends Thread { } } + + try { + this.log.logFine("Shutdown. Terminationg worker threads."); + this.theWorkerPool.close(); + } catch (Exception e1) { + this.log.logSevere("Unable to shutdown all remaining stackCrawl threads", e1); + } + try { this.log.logFine("Shutdown. Closing stackCrawl queue."); this.queue.close(); @@ -141,6 +145,7 @@ public final class plasmaStackCrawlThread extends Thread { // stacks a crawl item. The position can also be remote // returns null if successful, a reason string if not successful + long startTime = System.currentTimeMillis(); String reason = null; // failure reason // strange errors @@ -163,7 +168,8 @@ public final class plasmaStackCrawlThread extends Thread { nexturl = new URL(nexturlString); } catch (MalformedURLException e) { reason = "denied_(url_'" + nexturlString + "'_wrong)"; - this.log.logSevere("Wrong URL in stackCrawl: " + nexturlString); + this.log.logSevere("Wrong URL in stackCrawl: " + nexturlString + + ". Stack processing time: " + (System.currentTimeMillis()-startTime)); return reason; } @@ -172,16 +178,19 @@ public final class plasmaStackCrawlThread extends Thread { InetAddress hostAddress = InetAddress.getByName(nexturl.getHost()); if (hostAddress.isSiteLocalAddress()) { reason = "denied_(private_ip_address)"; - this.log.logFine("Host in URL '" + nexturlString + "' has private ip address."); + this.log.logFine("Host in URL '" + nexturlString + "' has private ip address." + + "Stack processing time: " + (System.currentTimeMillis()-startTime)); return reason; } else if (hostAddress.isLoopbackAddress()) { reason = "denied_(loopback_ip_address)"; - this.log.logFine("Host in URL '" + nexturlString + "' has loopback ip address."); + this.log.logFine("Host in URL '" + nexturlString + "' has loopback ip address." + + "Stack processing time: " + (System.currentTimeMillis()-startTime)); return reason; } } catch (UnknownHostException e) { reason = "denied_(unknown_host)"; - this.log.logFine("Unknown host in URL '" + nexturlString + "'."); + this.log.logFine("Unknown host in URL '" + nexturlString + "'." + + "Stack processing time: " + (System.currentTimeMillis()-startTime)); return reason; } @@ -189,7 +198,8 @@ public final class plasmaStackCrawlThread extends Thread { String hostlow = nexturl.getHost().toLowerCase(); if (plasmaSwitchboard.urlBlacklist.isListed(hostlow, nexturl.getPath())) { reason = "denied_(url_in_blacklist)"; - this.log.logFine("URL '" + nexturlString + "' is in blacklist."); + this.log.logFine("URL '" + nexturlString + "' is in blacklist." + + "Stack processing time: " + (System.currentTimeMillis()-startTime)); return reason; } @@ -199,7 +209,8 @@ public final class plasmaStackCrawlThread extends Thread { /* urlPool.errorURL.newEntry(nexturl, referrerHash, initiatorHash, yacyCore.seedDB.mySeed.hash, name, reason, new bitfield(plasmaURL.urlFlagLength), false);*/ - this.log.logFine("URL '" + nexturlString + "' does not match crawling filter '" + profile.generalFilter() + "'."); + this.log.logFine("URL '" + nexturlString + "' does not match crawling filter '" + profile.generalFilter() + "'." + + "Stack processing time: " + (System.currentTimeMillis()-startTime)); return reason; } @@ -209,7 +220,8 @@ public final class plasmaStackCrawlThread extends Thread { /* urlPool.errorURL.newEntry(nexturl, referrerHash, initiatorHash, yacyCore.seedDB.mySeed.hash, name, reason, new bitfield(plasmaURL.urlFlagLength), false);*/ - this.log.logFine("URL '" + nexturlString + "' is cgi URL."); + this.log.logFine("URL '" + nexturlString + "' is cgi URL." + + "Stack processing time: " + (System.currentTimeMillis()-startTime)); return reason; } @@ -219,7 +231,8 @@ public final class plasmaStackCrawlThread extends Thread { /* urlPool.errorURL.newEntry(nexturl, referrerHash, initiatorHash, yacyCore.seedDB.mySeed.hash, name, reason, new bitfield(plasmaURL.urlFlagLength), false);*/ - this.log.logFine("URL '" + nexturlString + "' is post URL."); + this.log.logFine("URL '" + nexturlString + "' is post URL." + + "Stack processing time: " + (System.currentTimeMillis()-startTime)); return reason; } @@ -231,7 +244,8 @@ public final class plasmaStackCrawlThread extends Thread { /* urlPool.errorURL.newEntry(nexturl, referrerHash, initiatorHash, yacyCore.seedDB.mySeed.hash, name, reason, new bitfield(plasmaURL.urlFlagLength), false);*/ - this.log.logFine("URL '" + nexturlString + "' is double registered in '" + dbocc + "'."); + this.log.logFine("URL '" + nexturlString + "' is double registered in '" + dbocc + "'." + + "Stack processing time: " + (System.currentTimeMillis()-startTime)); return reason; } @@ -241,7 +255,8 @@ public final class plasmaStackCrawlThread extends Thread { /* urlPool.errorURL.newEntry(nexturl, referrerHash, initiatorHash, yacyCore.seedDB.mySeed.hash, name, reason, new bitfield(plasmaURL.urlFlagLength), false);*/ - this.log.logFine("Crawling of URL '" + nexturlString + "' disallowed by robots.txt."); + this.log.logFine("Crawling of URL '" + nexturlString + "' disallowed by robots.txt." + + "Stack processing time: " + (System.currentTimeMillis()-startTime)); return reason; } @@ -256,7 +271,7 @@ public final class plasmaStackCrawlThread extends Thread { (yacyCore.seedDB.mySeed.isPrincipal())) /* qualified */; if ((!local)&&(!global)) { - this.log.logFine("URL '" + nexturlString + "' can neither be crawled local nor global."); + this.log.logSevere("URL '" + nexturlString + "' can neither be crawled local nor global."); } this.sb.urlPool.noticeURL.newEntry(initiatorHash, /* initiator, needed for p2p-feedback */ @@ -507,7 +522,7 @@ public final class plasmaStackCrawlThread extends Thread { byte[][] entryBytes = null; stackCrawlMessage newMessage = null; synchronized(this.urlEntryHashCache) { - urlHash = (String) this.urlEntryHashCache.removeLast(); + urlHash = (String) this.urlEntryHashCache.removeFirst(); entryBytes = this.urlEntryCache.remove(urlHash.getBytes()); } @@ -518,4 +533,262 @@ public final class plasmaStackCrawlThread extends Thread { } } + public final class WorkterFactory implements org.apache.commons.pool.PoolableObjectFactory { + + final ThreadGroup workerThreadGroup; + public WorkterFactory(ThreadGroup theWorkerThreadGroup) { + super(); + + if (theWorkerThreadGroup == null) + throw new IllegalArgumentException("The threadgroup object must not be null."); + + this.workerThreadGroup = theWorkerThreadGroup; +} + + public Object makeObject() { + Worker newWorker = new Worker(this.workerThreadGroup); + newWorker.setPriority(Thread.MAX_PRIORITY); + return newWorker; + } + + /** + * @see org.apache.commons.pool.PoolableObjectFactory#destroyObject(java.lang.Object) + */ + public void destroyObject(Object obj) { + if (obj instanceof Worker) { + Worker theWorker = (Worker) obj; + theWorker.setStopped(true); + } + } + + /** + * @see org.apache.commons.pool.PoolableObjectFactory#validateObject(java.lang.Object) + */ + public boolean validateObject(Object obj) { + if (obj instanceof Worker) + { + Worker theWorker = (Worker) obj; + if (!theWorker.isAlive() || theWorker.isInterrupted()) return false; + if (theWorker.isRunning()) return true; + return false; + } + return true; + } + + /** + * @param obj + * + */ + public void activateObject(Object obj) { + //log.debug(" activateObject..."); + } + + /** + * @param obj + * + */ + public void passivateObject(Object obj) { + //log.debug(" passivateObject..." + obj); +// if (obj instanceof Session) { +// Session theSession = (Session) obj; +// } + } + } + + public final class WorkerPool extends GenericObjectPool { + public boolean isClosed = false; + + /** + * First constructor. + * @param objFactory + */ + public WorkerPool(WorkterFactory objFactory) { + super(objFactory); + this.setMaxIdle(10); // Maximum idle threads. + this.setMaxActive(50); // Maximum active threads. + this.setMinEvictableIdleTimeMillis(30000); //Evictor runs every 30 secs. + //this.setMaxWait(1000); // Wait 1 second till a thread is available + } + + public WorkerPool(plasmaStackCrawlThread.WorkterFactory objFactory, + GenericObjectPool.Config config) { + super(objFactory, config); + } + + public Object borrowObject() throws Exception { + return super.borrowObject(); + } + + public void returnObject(Object obj) throws Exception { + super.returnObject(obj); + } + + public synchronized void close() throws Exception { + + /* + * shutdown all still running session threads ... + */ + this.isClosed = true; + + /* waiting for all threads to finish */ + int threadCount = plasmaStackCrawlThread.this.theWorkerThreadGroup.activeCount(); + Thread[] threadList = new Thread[threadCount]; + threadCount = plasmaStackCrawlThread.this.theWorkerThreadGroup.enumerate(threadList); + + try { + // trying to gracefull stop all still running sessions ... + plasmaStackCrawlThread.this.log.logInfo("Signaling shutdown to " + threadCount + " remaining stackCrawl threads ..."); + for ( int currentThreadIdx = 0; currentThreadIdx < threadCount; currentThreadIdx++ ) { + Thread currentThread = threadList[currentThreadIdx]; + if (currentThread.isAlive()) { + ((Worker)currentThread).setStopped(true); + } + } + + // waiting a frew ms for the session objects to continue processing + try { Thread.sleep(500); } catch (InterruptedException ex) {} + + // interrupting all still running or pooled threads ... + plasmaStackCrawlThread.this.log.logInfo("Sending interruption signal to " + plasmaStackCrawlThread.this.theWorkerThreadGroup.activeCount() + " remaining stackCrawl threads ..."); + plasmaStackCrawlThread.this.theWorkerThreadGroup.interrupt(); + + // if there are some sessions that are blocking in IO, we simply close the socket + plasmaStackCrawlThread.this.log.logFine("Trying to abort " + plasmaStackCrawlThread.this.theWorkerThreadGroup.activeCount() + " remaining stackCrawl threads ..."); + for ( int currentThreadIdx = 0; currentThreadIdx < threadCount; currentThreadIdx++ ) { + Thread currentThread = threadList[currentThreadIdx]; + if (currentThread.isAlive()) { + plasmaStackCrawlThread.this.log.logInfo("Trying to shutdown stackCrawl thread '" + currentThread.getName() + "' [" + currentThreadIdx + "]."); + ((Worker)currentThread).close(); + } + } + + // we need to use a timeout here because of missing interruptable session threads ... + plasmaStackCrawlThread.this.log.logFine("Waiting for " + plasmaStackCrawlThread.this.theWorkerThreadGroup.activeCount() + " remaining stackCrawl threads to finish shutdown ..."); + for ( int currentThreadIdx = 0; currentThreadIdx < threadCount; currentThreadIdx++ ) { + Thread currentThread = threadList[currentThreadIdx]; + if (currentThread.isAlive()) { + plasmaStackCrawlThread.this.log.logFine("Waiting for stackCrawl thread '" + currentThread.getName() + "' [" + currentThreadIdx + "] to finish shutdown."); + try { currentThread.join(500); } catch (InterruptedException ex) {} + } + } + + plasmaStackCrawlThread.this.log.logInfo("Shutdown of remaining stackCrawl threads finish."); + } catch (Exception e) { + plasmaStackCrawlThread.this.log.logSevere("Unexpected error while trying to shutdown all remaining stackCrawl threads.",e); + } + + super.close(); + } + + } + + public final class Worker extends Thread { + private boolean running = false; + private boolean stopped = false; + private boolean done = false; + private stackCrawlMessage theMsg; + + public Worker(ThreadGroup theThreadGroup) { + super(theThreadGroup,"stackCrawlThread"); + } + + public void setStopped(boolean stopped) { + this.stopped = stopped; + } + + public void close() { + if (this.isAlive()) { + try { + // trying to close all still open httpc-Sockets first + int closedSockets = httpc.closeOpenSockets(this); + if (closedSockets > 0) { + plasmaStackCrawlThread.this.log.logInfo(closedSockets + " HTTP-client sockets of thread '" + this.getName() + "' closed."); + } + } catch (Exception e) {} + } + } + + public synchronized void execute(stackCrawlMessage newMsg) { + this.theMsg = newMsg; + this.done = false; + + if (!this.running) { + // this.setDaemon(true); + this.start(); + } else { + this.notifyAll(); + } + } + + public void reset() { + this.done = true; + this.theMsg = null; + } + + public boolean isRunning() { + return this.running; + } + + public void run() { + this.running = true; + + // The thread keeps running. + while (!this.stopped && !Thread.interrupted()) { + if (this.done) { + // We are waiting for a task now. + synchronized (this) { + try { + this.wait(); //Wait until we get a request to process. + } catch (InterruptedException e) { + this.stopped = true; + // log.error("", e); + } + } + } else { + //There is a task....let us execute it. + try { + execute(); + } catch (Exception e) { + // log.error("", e); + } finally { + reset(); + + if (!this.stopped && !this.isInterrupted() && !plasmaStackCrawlThread.this.theWorkerPool.isClosed) { + try { + this.setName("stackCrawlThread_inPool"); + plasmaStackCrawlThread.this.theWorkerPool.returnObject(this); + } catch (Exception e1) { + // e1.printStackTrace(); + this.stopped = true; + } + } + } + } + } + } + + private void execute() throws InterruptedException { + try { + String rejectReason = stackCrawlDequeue(this.theMsg); + + if (rejectReason != null) { + plasmaStackCrawlThread.this.sb.urlPool.errorURL.newEntry( + new URL(this.theMsg.url()), + this.theMsg.referrerHash(), + this.theMsg.initiatorHash(), + yacyCore.seedDB.mySeed.hash, + this.theMsg.name, + rejectReason, + new bitfield(plasmaURL.urlFlagLength), + false + ); + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + this.done = true; + } + + } + } } diff --git a/source/de/anomic/plasma/plasmaWordIndex.java b/source/de/anomic/plasma/plasmaWordIndex.java index ad53dd971..6884f22de 100644 --- a/source/de/anomic/plasma/plasmaWordIndex.java +++ b/source/de/anomic/plasma/plasmaWordIndex.java @@ -60,8 +60,8 @@ import de.anomic.yacy.yacySeedDB; public final class plasmaWordIndex { - final File databaseRoot; - final plasmaWordIndexCache ramCache; + private final File databaseRoot; + private final plasmaWordIndexCache ramCache; public plasmaWordIndex(File databaseRoot, int bufferkb, serverLog log) throws IOException { this.databaseRoot = databaseRoot; diff --git a/source/de/anomic/plasma/plasmaWordIndexCache.java b/source/de/anomic/plasma/plasmaWordIndexCache.java index d14eb487c..86482df7f 100644 --- a/source/de/anomic/plasma/plasmaWordIndexCache.java +++ b/source/de/anomic/plasma/plasmaWordIndexCache.java @@ -466,7 +466,7 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface { public synchronized int addEntries(plasmaWordIndexEntryContainer container, long updateTime) { // this puts the entries into the cache, not into the assortment directly - + // check cache space if (cache.size() > 0) try { // pause to get space in the cache (while it is flushed) @@ -480,14 +480,14 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface { // stop flushing now for one moment flushThread.pause(); - //serverLog.logDebug("PLASMA INDEXING", "addEntryToIndexMem: cache.size=" + cache.size() + "; hashScore.size=" + hashScore.size()); - - // put new words into cache + //serverLog.logDebug("PLASMA INDEXING", "addEntryToIndexMem: cache.size=" + cache.size() + "; hashScore.size=" + hashScore.size()); + + // put new words into cache int added = 0; - synchronized (cache) { - String wordHash = container.wordHash(); - plasmaWordIndexEntryContainer entries = (plasmaWordIndexEntryContainer) cache.get(wordHash); // null pointer exception? wordhash != null! must be cache==null - if (entries == null) entries = new plasmaWordIndexEntryContainer(wordHash); + String wordHash = container.wordHash(); + synchronized (cache) { + plasmaWordIndexEntryContainer entries = (plasmaWordIndexEntryContainer) cache.get(wordHash); // null pointer exception? wordhash != null! must be cache==null + if (entries == null) entries = new plasmaWordIndexEntryContainer(wordHash); added = entries.add(container); if (added > 0) { cache.put(wordHash, entries); @@ -495,7 +495,7 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface { hashDate.setScore(wordHash, intTime(updateTime)); } entries = null; - } + } //System.out.println("DEBUG: cache = " + cache.toString()); flushThread.proceed(); return added; diff --git a/source/de/anomic/plasma/plasmaWordIndexDistribution.java b/source/de/anomic/plasma/plasmaWordIndexDistribution.java index 0b7d33403..48e74fd69 100644 --- a/source/de/anomic/plasma/plasmaWordIndexDistribution.java +++ b/source/de/anomic/plasma/plasmaWordIndexDistribution.java @@ -792,7 +792,7 @@ public final class plasmaWordIndexDistribution { } else if (selectionTime < transferTime){ this.chunkSize +=100; //chunkSize+=50; - } else if (transferTime >= selectionTime){ + } else if (selectionTime >= selectionTime){ if (chunkSize>200) chunkSize-=100; } diff --git a/yacy.init b/yacy.init index 6344519b9..dca7b79df 100644 --- a/yacy.init +++ b/yacy.init @@ -475,7 +475,7 @@ ramCacheRobots = 2097152 ramCacheProfiles = 8192 # ram cache for stack crawl thread db -ramCachePreNURL = 8192 +ramCachePreNURL = 4194304 # default memory settings for startup of yacy # is only valid in unix/shell environments and