From 55d8e686eacb924b20f96588981aa72a206311a6 Mon Sep 17 00:00:00 2001
From: orbiter
Date: Tue, 13 Apr 2010 23:29:55 +0000
Subject: [PATCH] performance hacks

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@6807 6c8d7289-2bf4-0310-a012-ef5d649a1542
---
 htroot/yacysearch.java                        |  4 +-
 source/de/anomic/crawler/CrawlQueues.java     | 12 +++---
 source/de/anomic/search/ResultFetcher.java    |  2 +-
 source/de/anomic/search/Switchboard.java      | 12 +++---
 source/de/anomic/server/serverCore.java       |  2 +-
 source/net/yacy/kelondro/blob/Compressor.java | 36 ++++++------------
 .../net/yacy/kelondro/util/EventTracker.java  | 37 +++++++++----------
 .../kelondro/workflow/AbstractBusyThread.java |  2 +-
 .../kelondro/workflow/InstantBusyThread.java  |  3 ++
 9 files changed, 50 insertions(+), 60 deletions(-)

diff --git a/htroot/yacysearch.java b/htroot/yacysearch.java
index 9c84fbb41..011d492fd 100644
--- a/htroot/yacysearch.java
+++ b/htroot/yacysearch.java
@@ -480,7 +480,7 @@ public class yacysearch {
             EventTracker.update("SEARCH", new ProfilingGraph.searchEvent(theQuery.id(true), SearchEvent.INITIALIZATION, 0, 0), false, 30000, ProfilingGraph.maxTime);
 
             // tell all threads to do nothing for a specific time
-            sb.intermissionAllThreads(10000);
+            sb.intermissionAllThreads(3000);
 
             // filter out words that appear in bluelist
             theQuery.filterOut(Switchboard.blueList);
@@ -496,7 +496,7 @@ public class yacysearch {
                 offset = 0;
             }
             final SearchEvent theSearch = SearchEventCache.getEvent(theQuery, sb.peers, sb.crawlResults, (sb.isRobinsonMode()) ? sb.clusterhashes : null, false, sb.loader);
-            try {Thread.sleep(100);} catch (InterruptedException e1) {} // wait a little time to get first results in the search
+            try {Thread.sleep(global ? 100 : 10);} catch (InterruptedException e1) {} // wait a little time to get first results in the search
 
             // generate result object
             //serverLog.logFine("LOCAL_SEARCH", "SEARCH TIME AFTER ORDERING OF SEARCH RESULTS: " + (System.currentTimeMillis() - timestamp) + " ms");
diff --git a/source/de/anomic/crawler/CrawlQueues.java b/source/de/anomic/crawler/CrawlQueues.java
index 1a7c69bf7..bb40d04a8 100644
--- a/source/de/anomic/crawler/CrawlQueues.java
+++ b/source/de/anomic/crawler/CrawlQueues.java
@@ -192,11 +192,11 @@ public class CrawlQueues {
 
     public boolean coreCrawlJob() {
 
-        final boolean robinsonPrivateCase = ((sb.isRobinsonMode()) &&
-                (!sb.getConfig(SwitchboardConstants.CLUSTER_MODE, "").equals(SwitchboardConstants.CLUSTER_MODE_PUBLIC_CLUSTER)) &&
-                (!sb.getConfig(SwitchboardConstants.CLUSTER_MODE, "").equals(SwitchboardConstants.CLUSTER_MODE_PRIVATE_CLUSTER)));
+        final boolean robinsonPrivateCase = (sb.isRobinsonMode() &&
+                !sb.getConfig(SwitchboardConstants.CLUSTER_MODE, "").equals(SwitchboardConstants.CLUSTER_MODE_PUBLIC_CLUSTER) &&
+                !sb.getConfig(SwitchboardConstants.CLUSTER_MODE, "").equals(SwitchboardConstants.CLUSTER_MODE_PRIVATE_CLUSTER));
 
-        if (((robinsonPrivateCase) || (coreCrawlJobSize() <= 20)) && (limitCrawlJobSize() > 0)) {
+        if ((robinsonPrivateCase || coreCrawlJobSize() <= 20) && limitCrawlJobSize() > 0) {
             // move some tasks to the core crawl job so we have something to do
             final int toshift = Math.min(10, limitCrawlJobSize()); // this cannot be a big number because the balancer makes a forced waiting if it cannot balance
             for (int i = 0; i < toshift; i++) {
@@ -209,12 +209,12 @@ public class CrawlQueues {
 
         String queueCheck = crawlIsPossible(NoticedURL.STACK_TYPE_CORE);
         if (queueCheck != null) {
-            log.logInfo("omitting de-queue/local: " + queueCheck);
+            if (log.isFine()) log.logFine("omitting de-queue/local: " + queueCheck);
             return false;
         }
 
         if (isPaused(SwitchboardConstants.CRAWLJOB_LOCAL_CRAWL)) {
-            log.logInfo("omitting de-queue/local: paused");
+            if (log.isFine()) log.logFine("omitting de-queue/local: paused");
             return false;
         }
 
diff --git a/source/de/anomic/search/ResultFetcher.java b/source/de/anomic/search/ResultFetcher.java
index 4a794646f..a053a04cf 100644
--- a/source/de/anomic/search/ResultFetcher.java
+++ b/source/de/anomic/search/ResultFetcher.java
@@ -301,7 +301,7 @@ public class ResultFetcher {
         // finally wait until enough results are there produced from the
         // snippet fetch process
         while ((anyWorkerAlive()) && (result.size() <= item)) {
-            try {Thread.sleep((item % query.itemsPerPage) * 50L);} catch (final InterruptedException e) {}
+            try {Thread.sleep((item % query.itemsPerPage) * 10L);} catch (final InterruptedException e) {}
         }
 
         // finally, if there is something, return the result
diff --git a/source/de/anomic/search/Switchboard.java b/source/de/anomic/search/Switchboard.java
index 317ea4864..c85a3d706 100644
--- a/source/de/anomic/search/Switchboard.java
+++ b/source/de/anomic/search/Switchboard.java
@@ -563,22 +563,22 @@ public final class Switchboard extends serverSwitch {
                 "storeDocumentIndex",
                 "This is the sequencing step of the indexing queue. Files are written as streams, too much councurrency would destroy IO performance. In this process the words are written to the RWI cache, which flushes if it is full.",
                 new String[]{"RWI/Cache/Collections"},
-                this, "storeDocumentIndex", WorkflowProcessor.useCPU + 40, null, indexerThreads);
+                this, "storeDocumentIndex", 2 * WorkflowProcessor.useCPU, null, indexerThreads);
         this.indexingAnalysisProcessor = new WorkflowProcessor(
                 "webStructureAnalysis",
                 "This just stores the link structure of the document into a web structure database.",
                 new String[]{"storeDocumentIndex"},
-                this, "webStructureAnalysis", WorkflowProcessor.useCPU + 20, indexingStorageProcessor, WorkflowProcessor.useCPU + 1);
+                this, "webStructureAnalysis", 2 * WorkflowProcessor.useCPU, indexingStorageProcessor, WorkflowProcessor.useCPU + 1);
         this.indexingCondensementProcessor = new WorkflowProcessor(
                 "condenseDocument",
                 "This does a structural analysis of plain texts: markup of headlines, slicing into phrases (i.e. sentences), markup with position, counting of words, calculation of term frequency.",
                 new String[]{"webStructureAnalysis"},
-                this, "condenseDocument", WorkflowProcessor.useCPU + 10, indexingAnalysisProcessor, WorkflowProcessor.useCPU + 1);
+                this, "condenseDocument", 4 * WorkflowProcessor.useCPU, indexingAnalysisProcessor, WorkflowProcessor.useCPU + 1);
         this.indexingDocumentProcessor = new WorkflowProcessor(
                 "parseDocument",
                 "This does the parsing of the newly loaded documents from the web. The result is not only a plain text document, but also a list of URLs that are embedded into the document. The urls are handed over to the CrawlStacker. This process has two child process queues!",
                 new String[]{"condenseDocument", "CrawlStacker"},
-                this, "parseDocument", 2 * WorkflowProcessor.useCPU + 1, indexingCondensementProcessor, 2 * WorkflowProcessor.useCPU + 1);
+                this, "parseDocument", 4 * WorkflowProcessor.useCPU, indexingCondensementProcessor, WorkflowProcessor.useCPU + 1);
 
         // deploy busy threads
         log.logConfig("Starting Threads");
@@ -1925,8 +1925,8 @@ public final class Switchboard extends serverSwitch {
         // 10 < wantedPPM < 1000: custom performance
         // 1000 <= wantedPPM : maximum performance
         if (wPPM <= 10) wPPM = 10;
-        if (wPPM >= 6000) wPPM = 6000;
-        final int newBusySleep = 60000 / wPPM; // for wantedPPM = 10: 6000; for wantedPPM = 1000: 60
+        if (wPPM >= 30000) wPPM = 30000;
+        final int newBusySleep = 30000 / wPPM; // for wantedPPM = 10: 6000; for wantedPPM = 1000: 60
 
         BusyThread thread;
 
diff --git a/source/de/anomic/server/serverCore.java b/source/de/anomic/server/serverCore.java
index 5262e9c6d..551681ca1 100644
--- a/source/de/anomic/server/serverCore.java
+++ b/source/de/anomic/server/serverCore.java
@@ -445,7 +445,7 @@ public final class serverCore extends AbstractBusyThread implements BusyThread {
         for (Thread t: threadList) {
             if (t == null) continue;
             if (!(t instanceof Session)) {
-                log.logSevere("serverCore.getJobList - thread is not Session: " + t.getClass().getName());
+                //log.logSevere("serverCore.getJobList - thread is not Session: " + t.getClass().getName());
                 continue;
             }
             l.add((Session) t);
diff --git a/source/net/yacy/kelondro/blob/Compressor.java b/source/net/yacy/kelondro/blob/Compressor.java
index ebc024529..8a7fbe1a0 100644
--- a/source/net/yacy/kelondro/blob/Compressor.java
+++ b/source/net/yacy/kelondro/blob/Compressor.java
@@ -32,7 +32,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.util.LinkedHashMap;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -52,7 +52,7 @@ public class Compressor implements BLOB {
     static byte[] plainMagic = {(byte) 'p', (byte) '|'}; // magic for plain content (no encoding)
 
     private final BLOB backend;
-    private LinkedHashMap<String, byte[]> buffer; // entries which are not yet compressed, format is RAW (without magic)
+    private HashMap<String, byte[]> buffer; // entries which are not yet compressed, format is RAW (without magic)
     private BlockingQueue<Entity> writeQueue;
     private long bufferlength;
    private final long maxbufferlength;
@@ -131,22 +131,7 @@
     }
 
     private void initBuffer() {
-        this.buffer = new LinkedHashMap<String, byte[]>(100, 0.1f, false) {
-            private static final long serialVersionUID = 1L;
-            @Override
-            protected boolean removeEldestEntry(final Map.Entry<String, byte[]> eldest) {
-                if (size() > 100) {
-                    try {
-                        Compressor.this.writeQueue.put(new Entity(eldest.getKey(), eldest.getValue()));
-                    } catch (InterruptedException e) {
-                        Log.logException(e);
-                    }
-                    return true;
-                } else {
-                    return false;
-                }
-            }
-        };
+        this.buffer = new HashMap<String, byte[]>();
         this.bufferlength = 0;
     }
 
@@ -300,16 +285,17 @@
         return 0;
     }
 
-    public synchronized void put(byte[] key, byte[] b) throws IOException {
+    public void put(byte[] key, byte[] b) throws IOException {
 
         // first ensure that the files do not exist anywhere
         remove(key);
 
         // check if the buffer is full or could be full after this write
-        if (this.bufferlength + b.length * 2 > this.maxbufferlength) {
+        if (this.bufferlength + b.length * 2 > this.maxbufferlength) synchronized (this) {
             // in case that we compress, just compress as much as is necessary to get enough room
-            while (this.bufferlength + b.length * 2 > this.maxbufferlength && !this.buffer.isEmpty()) {
-                try {
+            while (this.bufferlength + b.length * 2 > this.maxbufferlength) {
+                try {
+                    if (this.buffer.isEmpty()) break;
                     flushOne();
                 } catch (RowSpaceExceededException e) {
                     Log.logException(e);
@@ -323,8 +309,10 @@
         // files are written uncompressed to the uncompressed-queue
         // they are either written uncompressed to the database
        // or compressed later
-        this.buffer.put(new String(key), b);
-        this.bufferlength += b.length;
+        synchronized (this) {
+            this.buffer.put(new String(key), b);
+            this.bufferlength += b.length;
+        }
     }
 
     public synchronized void remove(byte[] key) throws IOException {
diff --git a/source/net/yacy/kelondro/util/EventTracker.java b/source/net/yacy/kelondro/util/EventTracker.java
index b075db9bb..50a2b642c 100644
--- a/source/net/yacy/kelondro/util/EventTracker.java
+++ b/source/net/yacy/kelondro/util/EventTracker.java
@@ -28,13 +28,12 @@ package net.yacy.kelondro.util;
 
 import java.util.Iterator;
 import java.util.Map;
-import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 public class EventTracker {
 
-    private final static Map<String, Queue<Event>> historyMaps = new ConcurrentHashMap<String, Queue<Event>>();
+    private final static Map<String, ConcurrentLinkedQueue<Event>> historyMaps = new ConcurrentHashMap<String, ConcurrentLinkedQueue<Event>>();
     private final static Map<String, Long> eventAccess = new ConcurrentHashMap<String, Long>(); // value: last time when this was accessed
 
     public final static void update(
@@ -58,7 +57,7 @@
         }
 
         // get event history container
-        Queue<Event> history = historyMaps.get(eventName);
+        ConcurrentLinkedQueue<Event> history = historyMaps.get(eventName);
 
         // create history
         if (history == null) {
@@ -73,28 +72,28 @@
         }
 
         // update history
-        synchronized (history) {
-
-            // update entry
-            history.offer(new Event(eventPayload));
-
-            // clean up too old entries
-            int tp = history.size() - maxQueueSize;
-            while (tp-- > 0) history.poll();
-            if (history.size() % 10 == 0) { // reduce number of System.currentTimeMillis() calls
-                Event e;
-                final long now = System.currentTimeMillis();
-                while (history.size() > 0) {
-                    e = history.peek();
-                    if (now - e.time < maxQueueAge) break;
-                    history.poll();
+        history.offer(new Event(eventPayload));
+
+        // clean up too old entries
+        int tp = history.size() - maxQueueSize;
+        while (tp-- > 0) history.poll();
+        if (history.size() % 10 == 0) { // reduce number of System.currentTimeMillis() calls
+            synchronized (history) {
+                if (history.size() % 10 == 0) { // check again
+                    Event e;
+                    final long now = System.currentTimeMillis();
+                    while (history.size() > 0) {
+                        e = history.peek();
+                        if (now - e.time < maxQueueAge) break;
+                        history.poll();
+                    }
                 }
             }
         }
     }
 
     public final static Iterator<Event> getHistory(final String eventName) {
-        Queue<Event> list = historyMaps.get(eventName);
+        ConcurrentLinkedQueue<Event> list = historyMaps.get(eventName);
         if (list == null) return null;
         return list.iterator();
     }
diff --git a/source/net/yacy/kelondro/workflow/AbstractBusyThread.java b/source/net/yacy/kelondro/workflow/AbstractBusyThread.java
index 9faec36fb..5fe9d82c4 100644
--- a/source/net/yacy/kelondro/workflow/AbstractBusyThread.java
+++ b/source/net/yacy/kelondro/workflow/AbstractBusyThread.java
@@ -163,7 +163,7 @@ public abstract class AbstractBusyThread extends AbstractThread implements BusyT
             if (isBusy) {
                 memstamp1 = MemoryControl.used();
                 if (memstamp1 >= memstamp0) {
-                    // no GC in between. this is not shure but most probable
+                    // no GC in between. this is not sure but most probable
                     memuse += memstamp1 - memstamp0;
                 } else {
                     // GC was obviously in between. Add an average as simple heuristic
diff --git a/source/net/yacy/kelondro/workflow/InstantBusyThread.java b/source/net/yacy/kelondro/workflow/InstantBusyThread.java
index 979194b02..44176476e 100644
--- a/source/net/yacy/kelondro/workflow/InstantBusyThread.java
+++ b/source/net/yacy/kelondro/workflow/InstantBusyThread.java
@@ -125,6 +125,9 @@ public final class InstantBusyThread extends AbstractBusyThread implements BusyT
                 Log.logSevere("BUSYTHREAD", "OutOfMemory Error in serverInstantThread.job, thread '" + this.getName() + "': " + e.getMessage());
                 Log.logException(e);
                 freemem();
+            } catch (final Exception e) {
+                Log.logSevere("BUSYTHREAD", "Generic Exception, thread '" + this.getName() + "': " + e.getMessage());
+                Log.logException(e);
             }
             instantThreadCounter--;
            synchronized(jobs) {jobs.remove(this.handle);}
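
A quick worked example for the crawler throttling change in Switchboard.java: with the new formula newBusySleep = 30000 / wPPM, a wanted rate of 600 pages per minute maps to a 50 ms busy sleep (the old 60000 divisor gave 100 ms), and the raised cap of wPPM = 30000 allows sleeps down to 1 ms. Note that the inline comment carried over from the old line ("for wantedPPM = 10: 6000; for wantedPPM = 1000: 60") still describes the former 60000 divisor; with 30000 the corresponding values are 3000 and 30.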
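
For illustration only, here is a minimal, self-contained sketch of the reduced-locking history update that the EventTracker hunk above switches to: the common path appends lock-free to a ConcurrentLinkedQueue, and only the occasional age-based cleanup takes a lock, re-checking its trigger condition inside. The class and member names (EventHistory, Entry, add) are invented for this sketch and are not YaCy API.

import java.util.concurrent.ConcurrentLinkedQueue;

public final class EventHistory {

    public static final class Entry {
        public final Object payload;
        public final long time;
        public Entry(final Object payload) {
            this.payload = payload;
            this.time = System.currentTimeMillis();
        }
    }

    private final ConcurrentLinkedQueue<Entry> history = new ConcurrentLinkedQueue<Entry>();
    private final int maxQueueSize;
    private final long maxQueueAge;

    public EventHistory(final int maxQueueSize, final long maxQueueAge) {
        this.maxQueueSize = maxQueueSize;
        this.maxQueueAge = maxQueueAge;
    }

    public void add(final Object payload) {
        // hot path: no lock, ConcurrentLinkedQueue.offer() is thread-safe
        this.history.offer(new Entry(payload));

        // size-based trimming also needs no lock; poll() is safe under contention
        int overshoot = this.history.size() - this.maxQueueSize;
        while (overshoot-- > 0) this.history.poll();

        // age-based cleanup runs only on roughly every 10th insert; the lock is
        // taken just for that rare case, and the condition is re-checked inside
        // so concurrent callers do not all start expiring entries at once
        if (this.history.size() % 10 == 0) {
            synchronized (this.history) {
                if (this.history.size() % 10 == 0) {
                    final long now = System.currentTimeMillis();
                    Entry e;
                    while ((e = this.history.peek()) != null) {
                        if (now - e.time < this.maxQueueAge) break;
                        this.history.poll();
                    }
                }
            }
        }
    }
}

The trade-off is the same as in the patch: offer(), poll(), peek() and size() are individually thread-safe on ConcurrentLinkedQueue, so concurrent trimming may briefly leave a few extra or expired entries, which is acceptable for profiling history but would not be for exact bookkeeping (size() is also O(n) on this queue, so it should stay off truly hot paths).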