From c6880ce28b805fecfc09f63ac7b35c0d9686c3d3 Mon Sep 17 00:00:00 2001 From: orbiter Date: Tue, 6 Jan 2009 13:51:59 +0000 Subject: [PATCH] removed the permanent cache flush and replaced it with a periodic cache flush The cache is now flushed only for one second every ten seconds. During a crawl the cache fills up completely, and is only flushed if space is needed for more documents. git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@5446 6c8d7289-2bf4-0310-a012-ef5d649a1542 --- build.properties | 2 +- defaults/performance_dht.profile | 3 + defaults/yacy.init | 12 +++- htroot/Threaddump_p.java | 2 +- source/de/anomic/crawler/ProtocolLoader.java | 20 +++--- .../anomic/http/JakartaCommonsHttpClient.java | 2 +- source/de/anomic/index/indexCollectionRI.java | 1 + source/de/anomic/index/indexRAMRI.java | 5 +- .../anomic/kelondro/kelondroBase64Order.java | 5 ++ .../de/anomic/kelondro/kelondroByteOrder.java | 4 +- .../kelondro/kelondroRowCollection.java | 2 +- .../de/anomic/plasma/plasmaSwitchboard.java | 21 ++++--- .../plasma/plasmaSwitchboardConstants.java | 12 ++++ source/de/anomic/plasma/plasmaWordIndex.java | 62 +++++++++---------- 14 files changed, 94 insertions(+), 59 deletions(-) diff --git a/build.properties b/build.properties index ca66a9402..ff261ed84 100644 --- a/build.properties +++ b/build.properties @@ -3,7 +3,7 @@ javacSource=1.5 javacTarget=1.5 # Release Configuration -releaseVersion=0.617 +releaseVersion=0.618 stdReleaseFile=yacy_v${releaseVersion}_${DSTAMP}_${releaseNr}.tar.gz embReleaseFile=yacy_emb_v${releaseVersion}_${DSTAMP}_${releaseNr}.tar.gz proReleaseFile=yacy_pro_v${releaseVersion}_${DSTAMP}_${releaseNr}.tar.gz diff --git a/defaults/performance_dht.profile b/defaults/performance_dht.profile index 322283f9d..8a4cafbaa 100644 --- a/defaults/performance_dht.profile +++ b/defaults/performance_dht.profile @@ -28,6 +28,9 @@ 80_indexing_idlesleep=1000 80_indexing_busysleep=100 80_indexing_memprereq=6291456 +85_cacheflush_idlesleep=120000 
+85_cacheflush_busysleep=60000 +85_cacheflush_memprereq=0 82_crawlstack_idlesleep=5000 82_crawlstack_busysleep=1 82_crawlstack_memprereq=1048576 diff --git a/defaults/yacy.init b/defaults/yacy.init index 4c646ed58..ad1121cea 100644 --- a/defaults/yacy.init +++ b/defaults/yacy.init @@ -542,8 +542,6 @@ filterOutStopwordsFromTopwords=true # the prereq-value is a memory pre-requisite: that much bytes must # be available/free in the heap; othervise the loop is not executed # and another idlesleep is performed -performanceProfile=defaults/yacy.init -performanceSpeed=100 20_dhtdistribution_idlesleep=30000 20_dhtdistribution_busysleep=10000 20_dhtdistribution_memprereq=6291456 @@ -568,10 +566,20 @@ performanceSpeed=100 80_indexing_idlesleep=1000 80_indexing_busysleep=10 80_indexing_memprereq=6291456 +85_cacheflush_idlesleep=60000 +85_cacheflush_busysleep=10000 +85_cacheflush_memprereq=0 90_cleanup_idlesleep=300000 90_cleanup_busysleep=300000 90_cleanup_memprereq=0 +# additional attributes: +# performanceIO is a percent-value. 
A value of 10 means that 10% of the busysleep time +# is used to flush the RAM cache, which is the major part of the IO in YaCy +performanceProfile=defaults/yacy.init +performanceSpeed=100 +performanceIO=10 + # cleanup-process: # properties for tasks that are performed during cleanup cleanup.deletionProcessedNews = true diff --git a/htroot/Threaddump_p.java b/htroot/Threaddump_p.java index 2dc5e224a..f3c284276 100644 --- a/htroot/Threaddump_p.java +++ b/htroot/Threaddump_p.java @@ -194,7 +194,7 @@ public class Threaddump_p { String threadtitle = tracename + "Thread= " + thread.getName() + " " + (thread.isDaemon()?"daemon":"") + " id=" + thread.getId() + " " + thread.getState().toString(); for (int i = 0; i < stackTraceElements.length; i++) { ste = stackTraceElements[i]; - if (ste.getClassName().startsWith("java.") || ste.getClassName().startsWith("sun.")) continue; + //if (ste.getClassName().startsWith("java.") || ste.getClassName().startsWith("sun.")) continue; if (i == 0) { line = getLine(getClassFile(classPath, ste.getClassName()), ste.getLineNumber()); } else { diff --git a/source/de/anomic/crawler/ProtocolLoader.java b/source/de/anomic/crawler/ProtocolLoader.java index 2dd643d87..7a13650fc 100644 --- a/source/de/anomic/crawler/ProtocolLoader.java +++ b/source/de/anomic/crawler/ProtocolLoader.java @@ -87,13 +87,7 @@ public final class ProtocolLoader { if (wait > 0) { // force a sleep here. 
Instead just sleep we clean up the accessTime map final long untilTime = System.currentTimeMillis() + wait; - final Iterator> i = accessTime.entrySet().iterator(); - Map.Entry e; - while (i.hasNext()) { - e = i.next(); - if (System.currentTimeMillis() > untilTime) break; - if (System.currentTimeMillis() - e.getValue().longValue() > minDelay) i.remove(); - } + cleanupAccessTimeTable(untilTime); if (System.currentTimeMillis() < untilTime) try {Thread.sleep(untilTime - System.currentTimeMillis());} catch (final InterruptedException ee) {} } @@ -107,6 +101,16 @@ public final class ProtocolLoader { throw new IOException("Unsupported protocol '" + protocol + "' in url " + entry.url()); } + public synchronized void cleanupAccessTimeTable(long timeout) { + final Iterator> i = accessTime.entrySet().iterator(); + Map.Entry e; + while (i.hasNext()) { + e = i.next(); + if (System.currentTimeMillis() > timeout) break; + if (System.currentTimeMillis() - e.getValue().longValue() > minDelay) i.remove(); + } + } + public String process(final CrawlEntry entry, final String parserMode) { // load a resource, store it to htcache and push queue entry to switchboard queue // returns null if everything went fine, a fail reason string if a problem occurred @@ -121,7 +125,7 @@ public final class ProtocolLoader { return (stored) ? 
null : "not stored"; } catch (IOException e) { entry.setStatus("error", serverProcessorJob.STATUS_FINISHED); - log.logWarning("problem loading " + entry.url().toString()); + log.logWarning("problem loading " + entry.url().toString() + ": " + e.getMessage()); return "load error - " + e.getMessage(); } } diff --git a/source/de/anomic/http/JakartaCommonsHttpClient.java b/source/de/anomic/http/JakartaCommonsHttpClient.java index 3d385f2cd..9b38d46af 100644 --- a/source/de/anomic/http/JakartaCommonsHttpClient.java +++ b/source/de/anomic/http/JakartaCommonsHttpClient.java @@ -95,7 +95,7 @@ public class JakartaCommonsHttpClient { // conManager.getParams().setDefaultMaxConnectionsPerHost(4); // default 2 conManager.getParams().setMaxTotalConnections(200); // Proxy may need many connections conManager.getParams().setConnectionTimeout(60000); // set a default timeout - conManager.getParams().setDefaultMaxConnectionsPerHost(20); // prevent DoS by mistake + conManager.getParams().setDefaultMaxConnectionsPerHost(10); // prevent DoS by mistake // TODO should this be configurable? 
// accept self-signed or untrusted certificates diff --git a/source/de/anomic/index/indexCollectionRI.java b/source/de/anomic/index/indexCollectionRI.java index c938115a0..7afd93bb8 100644 --- a/source/de/anomic/index/indexCollectionRI.java +++ b/source/de/anomic/index/indexCollectionRI.java @@ -159,6 +159,7 @@ public class indexCollectionRI implements indexRI { } public void addEntries(final indexContainer newEntries) { + if (newEntries == null) return; try { collectionIndex.merge(newEntries); } catch (final kelondroOutOfLimitsException e) { diff --git a/source/de/anomic/index/indexRAMRI.java b/source/de/anomic/index/indexRAMRI.java index a9bbf9910..2e4631b51 100644 --- a/source/de/anomic/index/indexRAMRI.java +++ b/source/de/anomic/index/indexRAMRI.java @@ -169,12 +169,12 @@ public final class indexRAMRI implements indexRI, indexRIReader { return null; } - private String bestFlushWordHash() { + public String bestFlushWordHash() { // select appropriate hash // we have 2 different methods to find a good hash: // - the oldest entry in the cache // - the entry with maximum count - if (heap.size() == 0) return null; + if (heap == null || heap.size() == 0) return null; try { //return hashScore.getMaxObject(); String hash = null; @@ -265,6 +265,7 @@ public final class indexRAMRI implements indexRI, indexRIReader { public synchronized indexContainer deleteContainer(final String wordHash) { // returns the index that had been deleted + if (wordHash == null) return null; final indexContainer container = heap.delete(wordHash); hashScore.deleteScore(wordHash); hashDate.deleteScore(wordHash); diff --git a/source/de/anomic/kelondro/kelondroBase64Order.java b/source/de/anomic/kelondro/kelondroBase64Order.java index 3343ed809..81245ba5b 100644 --- a/source/de/anomic/kelondro/kelondroBase64Order.java +++ b/source/de/anomic/kelondro/kelondroBase64Order.java @@ -319,6 +319,11 @@ public class kelondroBase64Order extends kelondroAbstractOrder implement bc = b[boffset + i]; if ((ac == 
0) && (bc == 0)) return 0; // zero-terminated length assert (bc >= 0) && (bc < 128) : "bc = " + bc + ", b = " + serverLog.arrayList(b, boffset, al); + if (ac == bc) { + // shortcut in case of equality: we don't need to lookup the ahpla value + i++; + continue; + } acc = ahpla[ac]; assert (acc >= 0) : "acc = " + acc + ", a = " + serverLog.arrayList(a, aoffset, al) + "/" + new String(a, aoffset, al) + ", aoffset = 0x" + Integer.toHexString(aoffset) + ", i = " + i + "\n" + serverLog.table(a, 16, aoffset); bcc = ahpla[bc]; diff --git a/source/de/anomic/kelondro/kelondroByteOrder.java b/source/de/anomic/kelondro/kelondroByteOrder.java index 41632f540..5906e8936 100644 --- a/source/de/anomic/kelondro/kelondroByteOrder.java +++ b/source/de/anomic/kelondro/kelondroByteOrder.java @@ -35,7 +35,7 @@ public interface kelondroByteOrder extends kelondroOrder { public int compare(byte[] a, int astart, int alen, byte[] b, int bstart, int blen); - public static class StringOrder implements Comparator { + public final static class StringOrder implements Comparator { public kelondroByteOrder baseOrder; public StringOrder(final kelondroByteOrder base) { @@ -46,7 +46,7 @@ public interface kelondroByteOrder extends kelondroOrder { this.baseOrder = (kelondroByteOrder) base; } - public int compare(final String s1, final String s2) { + public final int compare(final String s1, final String s2) { return baseOrder.compare(s1.getBytes(), s2.getBytes()); } diff --git a/source/de/anomic/kelondro/kelondroRowCollection.java b/source/de/anomic/kelondro/kelondroRowCollection.java index e85dc4933..ced82bc58 100644 --- a/source/de/anomic/kelondro/kelondroRowCollection.java +++ b/source/de/anomic/kelondro/kelondroRowCollection.java @@ -161,7 +161,7 @@ public class kelondroRowCollection implements Iterable { public synchronized byte[] exportCollection() { // returns null if the collection is empty trim(false); - assert this.size() * this.rowdef.objectsize == this.chunkcache.length; + assert this.size() 
* this.rowdef.objectsize == this.chunkcache.length : "this.size() = " + this.size() + ", objectsize = " + this.rowdef.objectsize + ", chunkcache.length = " + this.chunkcache.length; final kelondroRow row = exportRow(chunkcache.length); final kelondroRow.Entry entry = row.newEntry(); assert (sortBound <= chunkcount) : "sortBound = " + sortBound + ", chunkcount = " + chunkcount; diff --git a/source/de/anomic/plasma/plasmaSwitchboard.java b/source/de/anomic/plasma/plasmaSwitchboard.java index 0b006f3b2..57a4f3376 100644 --- a/source/de/anomic/plasma/plasmaSwitchboard.java +++ b/source/de/anomic/plasma/plasmaSwitchboard.java @@ -594,7 +594,9 @@ public final class plasmaSwitchboard extends serverAbstractSwitch 0; // permanent flushing only if we are not busy - } // possibly delete entries from last chunk if ((this.dhtTransferChunk != null) && (this.dhtTransferChunk.getStatus() == plasmaDHTChunk.chunkStatus_COMPLETE)) { diff --git a/source/de/anomic/plasma/plasmaSwitchboardConstants.java b/source/de/anomic/plasma/plasmaSwitchboardConstants.java index 165ea6a69..909f36ca9 100644 --- a/source/de/anomic/plasma/plasmaSwitchboardConstants.java +++ b/source/de/anomic/plasma/plasmaSwitchboardConstants.java @@ -135,6 +135,18 @@ public final class plasmaSwitchboardConstants { public static final String INDEXER_METHOD_JOBCOUNT = "queueSize"; public static final String INDEXER_METHOD_FREEMEM = "deQueueFreeMem"; public static final String INDEXER_SLOTS = "indexer.slots"; + // 85_cacheflush + /** + * the cache flush thread starts a flush of the RAM cache. 
+ * This periodic flushing replaces the permanent flushing + */ + public static final String CACHEFLUSH = "85_cacheflush"; + public static final String CACHEFLUSH_MEMPREREQ = "85_cacheflush_memprereq"; + public static final String CACHEFLUSH_IDLESLEEP = "85_cacheflush_idlesleep"; + public static final String CACHEFLUSH_BUSYSLEEP = "85_cacheflush_busysleep"; + public static final String CACHEFLUSH_METHOD_START = "rwiCacheFlush"; + public static final String CACHEFLUSH_METHOD_JOBCOUNT = "rwiCacheSize"; + public static final String CACHEFLUSH_METHOD_FREEMEM = "deQueueFreeMem"; // 90_cleanup /** *

public static final String CLEANUP = "90_cleanup"

diff --git a/source/de/anomic/plasma/plasmaWordIndex.java b/source/de/anomic/plasma/plasmaWordIndex.java index a890d5294..efb1ded01 100644 --- a/source/de/anomic/plasma/plasmaWordIndex.java +++ b/source/de/anomic/plasma/plasmaWordIndex.java @@ -471,13 +471,14 @@ public final class plasmaWordIndex implements indexRI { serverProfiling.update("wordcache", Long.valueOf(cs)); // To ensure termination an additional counter is used int l = 0; - while ((l++ < 100) && (theCache.maxURLinCache() > wCacheMaxChunk)) { - flushCache(theCache, Math.min(20, theCache.size())); + while (theCache.size() > 0 && (l++ < 100) && (theCache.maxURLinCache() > wCacheMaxChunk)) { + flushCacheOne(theCache); } // next flush more entries if the size exceeds the maximum size of the cache - if ((theCache.size() > theCache.getMaxWordCount()) || - (serverMemory.available() < collections.minMem())) { - flushCache(theCache, Math.min(theCache.size() - theCache.getMaxWordCount() + 1, theCache.size())); + while (theCache.size() > 0 && + ((theCache.size() > theCache.getMaxWordCount()) || + (serverMemory.available() < collections.minMem()))) { + flushCacheOne(theCache); } if (cacheSize() != cs) serverProfiling.update("wordcache", Long.valueOf(cacheSize())); } @@ -520,40 +521,33 @@ public final class plasmaWordIndex implements indexRI { dhtFlushControl(this.dhtOutCache); } } - - public int flushCacheSome() { - final int fo = flushCache(dhtOutCache, Math.max(1, dhtOutCache.size() / lowcachedivisor)); - final int fi = flushCache(dhtInCache, Math.max(1, dhtInCache.size() / lowcachedivisor)); - return fo + fi; + + public void flushCacheFor(int time) { + flushCacheUntil(System.currentTimeMillis() + time); } - private int flushCache(final indexRAMRI ram, int count) { - if (count <= 0) return 0; - + private synchronized void flushCacheUntil(long timeout) { + while (System.currentTimeMillis() < timeout && + (dhtOutCache.size() > 0 || dhtInCache.size() > 0)) { + flushCacheOne(dhtOutCache); + 
flushCacheOne(dhtInCache); + } + } + + private synchronized void flushCacheOne(final indexRAMRI ram) { + if (ram.size() > 0) collections.addEntries(flushContainer(ram)); + } + + private indexContainer flushContainer(final indexRAMRI ram) { String wordHash; - final ArrayList containerList = new ArrayList(); - count = Math.min(5000, Math.min(count, ram.size())); - boolean collectMax = true; indexContainer c; - while (collectMax) { - synchronized (ram) { - wordHash = ram.maxScoreWordHash(); - c = ram.getContainer(wordHash, null); - if ((c != null) && (c.size() > wCacheMaxChunk)) { - containerList.add(ram.deleteContainer(wordHash)); - if (serverMemory.available() < collections.minMem()) break; // protect memory during flush - } else { - collectMax = false; - } - } + wordHash = ram.maxScoreWordHash(); + c = ram.getContainer(wordHash, null); + if ((c != null) && (c.size() > wCacheMaxChunk)) { + return ram.deleteContainer(wordHash); + } else { + return ram.deleteContainer(ram.bestFlushWordHash()); } - count = count - containerList.size(); - containerList.addAll(ram.bestFlushContainers(count)); - - // flush the containers - for (final indexContainer container : containerList) collections.addEntries(container); - //System.out.println("DEBUG-Finished flush of " + count + " entries from RAM to DB in " + (System.currentTimeMillis() - start) + " milliseconds"); - return containerList.size(); }