- removed deprecated threads

- Added an automatic HTTP client reset. This was necessary because excessive intranet crawling caused deadlocks; this hack solved the problem.

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@5768 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 16 years ago
parent 293290c317
commit 9bfb2641db

@ -36,6 +36,7 @@ import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import de.anomic.http.httpClient;
import de.anomic.kelondro.table.FlexWidthArray; import de.anomic.kelondro.table.FlexWidthArray;
import de.anomic.kelondro.text.Document; import de.anomic.kelondro.text.Document;
import de.anomic.kelondro.util.DateFormatter; import de.anomic.kelondro.util.DateFormatter;
@ -572,6 +573,7 @@ public class CrawlQueues {
"denied by robots.txt"); "denied by robots.txt");
eentry.store(); eentry.store();
errorURL.push(eentry); errorURL.push(eentry);
this.entry.setStatus("worker-disallowed", serverProcessorJob.STATUS_FINISHED);
} else { } else {
// starting a load from the internet // starting a load from the internet
this.entry.setStatus("worker-loading", serverProcessorJob.STATUS_RUNNING); this.entry.setStatus("worker-loading", serverProcessorJob.STATUS_RUNNING);
@ -585,6 +587,7 @@ public class CrawlQueues {
"cannot load: " + result); "cannot load: " + result);
eentry.store(); eentry.store();
errorURL.push(eentry); errorURL.push(eentry);
this.entry.setStatus("worker-error", serverProcessorJob.STATUS_FINISHED);
} else { } else {
this.entry.setStatus("worker-processed", serverProcessorJob.STATUS_FINISHED); this.entry.setStatus("worker-processed", serverProcessorJob.STATUS_FINISHED);
} }
@ -599,9 +602,10 @@ public class CrawlQueues {
eentry.store(); eentry.store();
errorURL.push(eentry); errorURL.push(eentry);
e.printStackTrace(); e.printStackTrace();
httpClient.initConnectionManager();
this.entry.setStatus("worker-exception", serverProcessorJob.STATUS_FINISHED);
} finally { } finally {
workers.remove(code); workers.remove(code);
this.entry.setStatus("worker-finalized", serverProcessorJob.STATUS_FINISHED);
} }
} }

@ -289,25 +289,25 @@ public final class CrawlStacker {
//int b = nextQueue.noticeURL.stackSize(NoticedURL.STACK_TYPE_LIMIT); //int b = nextQueue.noticeURL.stackSize(NoticedURL.STACK_TYPE_LIMIT);
nextQueue.noticeURL.push(NoticedURL.STACK_TYPE_LIMIT, entry); nextQueue.noticeURL.push(NoticedURL.STACK_TYPE_LIMIT, entry);
//assert b < nextQueue.noticeURL.stackSize(NoticedURL.STACK_TYPE_LIMIT); //assert b < nextQueue.noticeURL.stackSize(NoticedURL.STACK_TYPE_LIMIT);
this.log.logInfo("stacked/global: " + entry.url().toString() + ", stacksize = " + nextQueue.noticeURL.stackSize(NoticedURL.STACK_TYPE_LIMIT)); //this.log.logInfo("stacked/global: " + entry.url().toString() + ", stacksize = " + nextQueue.noticeURL.stackSize(NoticedURL.STACK_TYPE_LIMIT));
} else if (local) { } else if (local) {
if (proxy) this.log.logWarning("URL '" + entry.url().toString() + "' has conflicting initiator properties: local = true, proxy = true, initiator = " + entry.initiator() + ", profile.handle = " + profile.handle()); if (proxy) this.log.logWarning("URL '" + entry.url().toString() + "' has conflicting initiator properties: local = true, proxy = true, initiator = " + entry.initiator() + ", profile.handle = " + profile.handle());
if (remote) this.log.logWarning("URL '" + entry.url().toString() + "' has conflicting initiator properties: local = true, remote = true, initiator = " + entry.initiator() + ", profile.handle = " + profile.handle()); if (remote) this.log.logWarning("URL '" + entry.url().toString() + "' has conflicting initiator properties: local = true, remote = true, initiator = " + entry.initiator() + ", profile.handle = " + profile.handle());
//int b = nextQueue.noticeURL.stackSize(NoticedURL.STACK_TYPE_CORE); //int b = nextQueue.noticeURL.stackSize(NoticedURL.STACK_TYPE_CORE);
nextQueue.noticeURL.push(NoticedURL.STACK_TYPE_CORE, entry); nextQueue.noticeURL.push(NoticedURL.STACK_TYPE_CORE, entry);
//assert b < nextQueue.noticeURL.stackSize(NoticedURL.STACK_TYPE_CORE); //assert b < nextQueue.noticeURL.stackSize(NoticedURL.STACK_TYPE_CORE);
this.log.logInfo("stacked/local: " + entry.url().toString() + ", stacksize = " + nextQueue.noticeURL.stackSize(NoticedURL.STACK_TYPE_CORE)); //this.log.logInfo("stacked/local: " + entry.url().toString() + ", stacksize = " + nextQueue.noticeURL.stackSize(NoticedURL.STACK_TYPE_CORE));
} else if (proxy) { } else if (proxy) {
if (remote) this.log.logWarning("URL '" + entry.url().toString() + "' has conflicting initiator properties: proxy = true, remote = true, initiator = " + entry.initiator() + ", profile.handle = " + profile.handle()); if (remote) this.log.logWarning("URL '" + entry.url().toString() + "' has conflicting initiator properties: proxy = true, remote = true, initiator = " + entry.initiator() + ", profile.handle = " + profile.handle());
//int b = nextQueue.noticeURL.stackSize(NoticedURL.STACK_TYPE_CORE); //int b = nextQueue.noticeURL.stackSize(NoticedURL.STACK_TYPE_CORE);
nextQueue.noticeURL.push(NoticedURL.STACK_TYPE_CORE, entry); nextQueue.noticeURL.push(NoticedURL.STACK_TYPE_CORE, entry);
//assert b < nextQueue.noticeURL.stackSize(NoticedURL.STACK_TYPE_CORE); //assert b < nextQueue.noticeURL.stackSize(NoticedURL.STACK_TYPE_CORE);
this.log.logInfo("stacked/proxy: " + entry.url().toString() + ", stacksize = " + nextQueue.noticeURL.stackSize(NoticedURL.STACK_TYPE_CORE)); //this.log.logInfo("stacked/proxy: " + entry.url().toString() + ", stacksize = " + nextQueue.noticeURL.stackSize(NoticedURL.STACK_TYPE_CORE));
} else if (remote) { } else if (remote) {
//int b = nextQueue.noticeURL.stackSize(NoticedURL.STACK_TYPE_REMOTE); //int b = nextQueue.noticeURL.stackSize(NoticedURL.STACK_TYPE_REMOTE);
nextQueue.noticeURL.push(NoticedURL.STACK_TYPE_REMOTE, entry); nextQueue.noticeURL.push(NoticedURL.STACK_TYPE_REMOTE, entry);
//assert b < nextQueue.noticeURL.stackSize(NoticedURL.STACK_TYPE_REMOTE); //assert b < nextQueue.noticeURL.stackSize(NoticedURL.STACK_TYPE_REMOTE);
this.log.logInfo("stacked/remote: " + entry.url().toString() + ", stacksize = " + nextQueue.noticeURL.stackSize(NoticedURL.STACK_TYPE_REMOTE)); //this.log.logInfo("stacked/remote: " + entry.url().toString() + ", stacksize = " + nextQueue.noticeURL.stackSize(NoticedURL.STACK_TYPE_REMOTE));
} }
return null; return null;

@ -185,7 +185,7 @@ public class Latency {
} }
public void slowdown() { public void slowdown() {
this.lastacc = System.currentTimeMillis(); this.lastacc = System.currentTimeMillis();
this.timeacc = Math.min(60000, average() * 5); this.timeacc = Math.min(60000, average() * 2);
this.count = 1; this.count = 1;
} }
public int count() { public int count() {

@ -76,8 +76,8 @@ public class httpClient {
* "the HttpClient instance and connection manager should be shared among all threads for maximum efficiency." * "the HttpClient instance and connection manager should be shared among all threads for maximum efficiency."
* (Concurrent execution of HTTP methods, http://hc.apache.org/httpclient-3.x/performance.html) * (Concurrent execution of HTTP methods, http://hc.apache.org/httpclient-3.x/performance.html)
*/ */
private final static MultiThreadedHttpConnectionManager conManager = new MultiThreadedHttpConnectionManager(); private static MultiThreadedHttpConnectionManager conManager = null;
private final static HttpClient apacheHttpClient = new HttpClient(conManager); private static HttpClient apacheHttpClient = null;
// last ; must be before location (this is parsed) // last ; must be before location (this is parsed)
private final static String jakartaUserAgent = " " + private final static String jakartaUserAgent = " " +
@ -87,24 +87,7 @@ public class httpClient {
/** /**
* set options for client * set options for client
*/ */
// simple user agent initConnectionManager();
setUserAgent("yacy (www.yacy.net; " + getSystemOST() + ")");
// only one retry
apacheHttpClient.getParams().setParameter(HttpMethodParams.RETRY_HANDLER,
new DefaultHttpMethodRetryHandler(1, false));
/**
* set options for connection manager
*/
// conManager.getParams().setDefaultMaxConnectionsPerHost(4); // default 2
HostConfiguration localHostConfiguration = new HostConfiguration();
conManager.getParams().setMaxTotalConnections(200); // Proxy may need many connections
conManager.getParams().setConnectionTimeout(60000); // set a default timeout
conManager.getParams().setDefaultMaxConnectionsPerHost(3); // prevent DoS by mistake
localHostConfiguration.setHost("localhost");
conManager.getParams().setMaxConnectionsPerHost(localHostConfiguration, 100);
localHostConfiguration.setHost("127.0.0.1");
conManager.getParams().setMaxConnectionsPerHost(localHostConfiguration, 100);
// TODO should this be configurable?
// accept self-signed or untrusted certificates // accept self-signed or untrusted certificates
Protocol.registerProtocol("https", new Protocol("https", Protocol.registerProtocol("https", new Protocol("https",
@ -125,6 +108,32 @@ public class httpClient {
System.setProperty("sun.net.client.defaultReadTimeout", "60000"); System.setProperty("sun.net.client.defaultReadTimeout", "60000");
} }
/**
 * (Re-)creates the shared connection manager and the HTTP client that uses it.
 *
 * <p>All existing connection managers are shut down first, then a fresh manager and
 * client are created, configured and stored in the static fields of this class.
 * Connections that are in use by other threads at that moment are shut down as well.</p>
 *
 * <p>The method is synchronized because it replaces two static fields and is also
 * invoked from crawl worker error handling; without mutual exclusion two concurrent
 * resets could pair a client with a manager other than the one that was configured.
 * The new objects are fully configured before they are published.</p>
 */
public static synchronized void initConnectionManager() {
    // drop every existing connection pool before building the replacement
    MultiThreadedHttpConnectionManager.shutdownAll();

    final MultiThreadedHttpConnectionManager manager = new MultiThreadedHttpConnectionManager();
    final HttpClient client = new HttpClient(manager);

    /**
     * set options for connection manager
     */
    // manager.getParams().setDefaultMaxConnectionsPerHost(4); // default 2
    manager.getParams().setMaxTotalConnections(200); // Proxy may need many connections
    manager.getParams().setConnectionTimeout(60000); // set a default timeout
    manager.getParams().setDefaultMaxConnectionsPerHost(10);

    // One HostConfiguration per host: the object is handed to the parameter set as a key,
    // so mutating a single shared instance afterwards could silently replace the
    // "localhost" entry with the "127.0.0.1" one.
    final HostConfiguration localhostByName = new HostConfiguration();
    localhostByName.setHost("localhost");
    manager.getParams().setMaxConnectionsPerHost(localhostByName, 100);
    final HostConfiguration localhostByAddress = new HostConfiguration();
    localhostByAddress.setHost("127.0.0.1");
    manager.getParams().setMaxConnectionsPerHost(localhostByAddress, 100);

    // only one retry
    client.getParams().setParameter(HttpMethodParams.RETRY_HANDLER,
            new DefaultHttpMethodRetryHandler(1, false));

    // publish the fully configured pair
    conManager = manager;
    apacheHttpClient = client;

    // simple user agent (called after publishing, as in the original order of operations)
    setUserAgent("yacy (www.yacy.net; " + getSystemOST() + ")");
}
/** /**
* every x milliseconds do a cleanup (close old connections) * every x milliseconds do a cleanup (close old connections)
* *

@ -284,6 +284,7 @@ public final class IndexCell extends AbstractBufferedIndex implements BufferedIn
// clean-up the cache // clean-up the cache
if (this.lastCleanup + cleanupCycle > System.currentTimeMillis()) return; if (this.lastCleanup + cleanupCycle > System.currentTimeMillis()) return;
//System.out.println("----cleanup check");
this.array.shrink(this.targetFileSize, this.maxFileSize); this.array.shrink(this.targetFileSize, this.maxFileSize);
this.lastCleanup = System.currentTimeMillis(); this.lastCleanup = System.currentTimeMillis();
} }

@ -605,8 +605,6 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<IndexingStack.
deployThread(plasmaSwitchboardConstants.CLEANUP, "Cleanup", "simple cleaning process for monitoring information", null, deployThread(plasmaSwitchboardConstants.CLEANUP, "Cleanup", "simple cleaning process for monitoring information", null,
new serverInstantBusyThread(this, plasmaSwitchboardConstants.CLEANUP_METHOD_START, plasmaSwitchboardConstants.CLEANUP_METHOD_JOBCOUNT, plasmaSwitchboardConstants.CLEANUP_METHOD_FREEMEM), 600000); // all 5 Minutes, wait 10 minutes until first run new serverInstantBusyThread(this, plasmaSwitchboardConstants.CLEANUP_METHOD_START, plasmaSwitchboardConstants.CLEANUP_METHOD_JOBCOUNT, plasmaSwitchboardConstants.CLEANUP_METHOD_FREEMEM), 600000); // all 5 Minutes, wait 10 minutes until first run
deployThread(plasmaSwitchboardConstants.CACHEFLUSH, "Cache Flush", "thread that flushes the index cache", "",
new serverInstantBusyThread(this, plasmaSwitchboardConstants.CACHEFLUSH_METHOD_START, plasmaSwitchboardConstants.CACHEFLUSH_METHOD_JOBCOUNT, plasmaSwitchboardConstants.CACHEFLUSH_METHOD_FREEMEM), 120000); // the cache flush does not need to be started soon, start it late after 2 minutes
deployThread(plasmaSwitchboardConstants.INDEXER, "Indexing", "thread that either initiates a parsing/indexing queue, distributes the index into the DHT, stores parsed documents", "/IndexCreateIndexingQueue_p.html", deployThread(plasmaSwitchboardConstants.INDEXER, "Indexing", "thread that either initiates a parsing/indexing queue, distributes the index into the DHT, stores parsed documents", "/IndexCreateIndexingQueue_p.html",
new serverInstantBusyThread(this, plasmaSwitchboardConstants.INDEXER_METHOD_START, plasmaSwitchboardConstants.INDEXER_METHOD_JOBCOUNT, plasmaSwitchboardConstants.INDEXER_METHOD_FREEMEM), 10000); new serverInstantBusyThread(this, plasmaSwitchboardConstants.INDEXER_METHOD_START, plasmaSwitchboardConstants.INDEXER_METHOD_JOBCOUNT, plasmaSwitchboardConstants.INDEXER_METHOD_FREEMEM), 10000);
deployThread(plasmaSwitchboardConstants.CRAWLJOB_REMOTE_TRIGGERED_CRAWL, "Remote Crawl Job", "thread that performes a single crawl/indexing step triggered by a remote peer", null, deployThread(plasmaSwitchboardConstants.CRAWLJOB_REMOTE_TRIGGERED_CRAWL, "Remote Crawl Job", "thread that performes a single crawl/indexing step triggered by a remote peer", null,
@ -1122,16 +1120,6 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<IndexingStack.
log.logConfig("SWITCHBOARD SHUTDOWN TERMINATED"); log.logConfig("SWITCHBOARD SHUTDOWN TERMINATED");
} }
/**
 * Reports the buffer size of the web index.
 *
 * @return the value of <code>webIndex.index().getBufferSize()</code>
 */
public int rwiCacheSize() {
    final int bufferSize = webIndex.index().getBufferSize();
    return bufferSize;
}
/**
 * Triggers a buffer clean-up on the web index if the buffer is not empty.
 *
 * <p>The argument handed to <code>cleanupBuffer</code> is the configured cache-flush
 * busy-sleep value (default 10000) multiplied by the "performanceIO" setting
 * (default 10), divided by 100 and narrowed to <code>int</code>.</p>
 *
 * @return <code>false</code> if {@link #rwiCacheSize()} is 0 and nothing was done,
 *         <code>true</code> after a clean-up has been requested
 */
public boolean rwiCacheFlush() {
    if (rwiCacheSize() != 0) {
        final long busySleep = this.getConfigLong(plasmaSwitchboardConstants.CACHEFLUSH_BUSYSLEEP, 10000);
        final long performanceIO = this.getConfigLong("performanceIO", 10);
        final int cleanupArgument = (int) ((busySleep * performanceIO) / 100);
        webIndex.index().cleanupBuffer(cleanupArgument);
        return true;
    }
    return false;
}
public int queueSize() { public int queueSize() {
return webIndex.queuePreStack.size(); return webIndex.queuePreStack.size();
} }

@ -110,18 +110,6 @@ public final class plasmaSwitchboardConstants {
public static final String CRAWLJOB_REMOTE_TRIGGERED_CRAWL_METHOD_FREEMEM = null; public static final String CRAWLJOB_REMOTE_TRIGGERED_CRAWL_METHOD_FREEMEM = null;
public static final String CRAWLJOB_REMOTE_TRIGGERED_CRAWL_IDLESLEEP = "62_remotetriggeredcrawl_idlesleep"; public static final String CRAWLJOB_REMOTE_TRIGGERED_CRAWL_IDLESLEEP = "62_remotetriggeredcrawl_idlesleep";
public static final String CRAWLJOB_REMOTE_TRIGGERED_CRAWL_BUSYSLEEP = "62_remotetriggeredcrawl_busysleep"; public static final String CRAWLJOB_REMOTE_TRIGGERED_CRAWL_BUSYSLEEP = "62_remotetriggeredcrawl_busysleep";
// 74_parsing
/**
 * <p><code>public static final String <strong>PARSER</strong> = "74_indexing"</code></p>
 * <p>Key of the thread listed under the "74_parsing" section.</p>
 * <p>NOTE(review): the key strings of this group read "74_indexing" although the section
 * is labelled "74_parsing" - presumably a copy of the INDEXER group; confirm before reuse.</p>
 */
public static final String PARSER = "74_indexing";
// configuration keys derived from the thread key above
public static final String PARSER_MEMPREREQ = "74_indexing_memprereq";
public static final String PARSER_IDLESLEEP = "74_indexing_idlesleep";
public static final String PARSER_BUSYSLEEP = "74_indexing_busysleep";
// method names for this thread; the job-count and free-mem names equal those of the INDEXER group
public static final String PARSER_METHOD_START = "deQueueProcess";
public static final String PARSER_METHOD_JOBCOUNT = "queueSize";
public static final String PARSER_METHOD_FREEMEM = "deQueueFreeMem";
// 80_indexing // 80_indexing
/** /**
* <p><code>public static final String <strong>INDEXER</strong> = "80_indexing"</code></p> * <p><code>public static final String <strong>INDEXER</strong> = "80_indexing"</code></p>
@ -135,18 +123,6 @@ public final class plasmaSwitchboardConstants {
public static final String INDEXER_METHOD_JOBCOUNT = "queueSize"; public static final String INDEXER_METHOD_JOBCOUNT = "queueSize";
public static final String INDEXER_METHOD_FREEMEM = "deQueueFreeMem"; public static final String INDEXER_METHOD_FREEMEM = "deQueueFreeMem";
public static final String INDEXER_SLOTS = "indexer.slots"; public static final String INDEXER_SLOTS = "indexer.slots";
// 85_cacheflush
/**
 * the cache flush thread starts a flush of the RAM cache.
 * This periodic flushing replaces the permanent flushing
 */
// thread key, used as the first argument of deployThread for the "Cache Flush" thread
public static final String CACHEFLUSH = "85_cacheflush";
// configuration keys derived from the thread key
public static final String CACHEFLUSH_MEMPREREQ = "85_cacheflush_memprereq";
public static final String CACHEFLUSH_IDLESLEEP = "85_cacheflush_idlesleep";
// read by rwiCacheFlush via getConfigLong with a default of 10000
public static final String CACHEFLUSH_BUSYSLEEP = "85_cacheflush_busysleep";
// method names handed to serverInstantBusyThread when the thread is deployed
public static final String CACHEFLUSH_METHOD_START = "rwiCacheFlush";
public static final String CACHEFLUSH_METHOD_JOBCOUNT = "rwiCacheSize";
public static final String CACHEFLUSH_METHOD_FREEMEM = "deQueueFreeMem";
// 90_cleanup // 90_cleanup
/** /**
* <p><code>public static final String <strong>CLEANUP</strong> = "90_cleanup"</code></p> * <p><code>public static final String <strong>CLEANUP</strong> = "90_cleanup"</code></p>

Loading…
Cancel
Save