removed the permanent cache flush and replaced it with a periodic cache flush

The cache is now flushed only for one second every ten seconds. During a crawl the cache
fills up completely, and is only flushed if space is needed for more documents.

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@5446 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 16 years ago
parent ef7fe537c5
commit c6880ce28b

@ -3,7 +3,7 @@ javacSource=1.5
javacTarget=1.5
# Release Configuration
releaseVersion=0.617
releaseVersion=0.618
stdReleaseFile=yacy_v${releaseVersion}_${DSTAMP}_${releaseNr}.tar.gz
embReleaseFile=yacy_emb_v${releaseVersion}_${DSTAMP}_${releaseNr}.tar.gz
proReleaseFile=yacy_pro_v${releaseVersion}_${DSTAMP}_${releaseNr}.tar.gz

@ -28,6 +28,9 @@
80_indexing_idlesleep=1000
80_indexing_busysleep=100
80_indexing_memprereq=6291456
85_cacheflush_idlesleep=120000
85_cacheflush_busysleep=60000
85_cacheflush_memprereq=0
82_crawlstack_idlesleep=5000
82_crawlstack_busysleep=1
82_crawlstack_memprereq=1048576

@ -542,8 +542,6 @@ filterOutStopwordsFromTopwords=true
# the prereq-value is a memory pre-requisite: that much bytes must
# be available/free in the heap; otherwise the loop is not executed
# and another idlesleep is performed
performanceProfile=defaults/yacy.init
performanceSpeed=100
20_dhtdistribution_idlesleep=30000
20_dhtdistribution_busysleep=10000
20_dhtdistribution_memprereq=6291456
@ -568,10 +566,20 @@ performanceSpeed=100
80_indexing_idlesleep=1000
80_indexing_busysleep=10
80_indexing_memprereq=6291456
85_cacheflush_idlesleep=60000
85_cacheflush_busysleep=10000
85_cacheflush_memprereq=0
90_cleanup_idlesleep=300000
90_cleanup_busysleep=300000
90_cleanup_memprereq=0
# additional attributes:
# performanceIO is a percent-value. a value of 10 means, that 10% of the busysleep time
# is used to flush the RAM cache, which is the major part of the IO in YaCy
performanceProfile=defaults/yacy.init
performanceSpeed=100
performanceIO=10
# cleanup-process:
# properties for tasks that are performed during cleanup
cleanup.deletionProcessedNews = true

@ -194,7 +194,7 @@ public class Threaddump_p {
String threadtitle = tracename + "Thread= " + thread.getName() + " " + (thread.isDaemon()?"daemon":"") + " id=" + thread.getId() + " " + thread.getState().toString();
for (int i = 0; i < stackTraceElements.length; i++) {
ste = stackTraceElements[i];
if (ste.getClassName().startsWith("java.") || ste.getClassName().startsWith("sun.")) continue;
//if (ste.getClassName().startsWith("java.") || ste.getClassName().startsWith("sun.")) continue;
if (i == 0) {
line = getLine(getClassFile(classPath, ste.getClassName()), ste.getLineNumber());
} else {

@ -87,13 +87,7 @@ public final class ProtocolLoader {
if (wait > 0) {
// force a sleep here. Instead just sleep we clean up the accessTime map
final long untilTime = System.currentTimeMillis() + wait;
final Iterator<Map.Entry<String, Long>> i = accessTime.entrySet().iterator();
Map.Entry<String, Long> e;
while (i.hasNext()) {
e = i.next();
if (System.currentTimeMillis() > untilTime) break;
if (System.currentTimeMillis() - e.getValue().longValue() > minDelay) i.remove();
}
cleanupAccessTimeTable(untilTime);
if (System.currentTimeMillis() < untilTime)
try {Thread.sleep(untilTime - System.currentTimeMillis());} catch (final InterruptedException ee) {}
}
@ -107,6 +101,16 @@ public final class ProtocolLoader {
throw new IOException("Unsupported protocol '" + protocol + "' in url " + entry.url());
}
/**
 * Evicts stale entries from the accessTime table (keys are presumably
 * host/URL identifiers mapped to last-access epoch millis — TODO confirm
 * against callers). An entry is removed when its recorded access time is
 * more than minDelay milliseconds in the past.
 *
 * @param timeout an ABSOLUTE deadline in epoch milliseconds (not a
 *        duration): the scan stops as soon as the current time passes it,
 *        bounding how long this cleanup can run. The caller in the sleep
 *        path passes System.currentTimeMillis() + wait.
 */
public synchronized void cleanupAccessTimeTable(long timeout) {
final Iterator<Map.Entry<String, Long>> i = accessTime.entrySet().iterator();
Map.Entry<String, Long> e;
while (i.hasNext()) {
e = i.next();
// stop early once the absolute deadline has passed
if (System.currentTimeMillis() > timeout) break;
// evict entries whose last access is older than the minimum delay;
// Iterator.remove() is the only safe removal during iteration
if (System.currentTimeMillis() - e.getValue().longValue() > minDelay) i.remove();
}
}
public String process(final CrawlEntry entry, final String parserMode) {
// load a resource, store it to htcache and push queue entry to switchboard queue
// returns null if everything went fine, a fail reason string if a problem occurred
@ -121,7 +125,7 @@ public final class ProtocolLoader {
return (stored) ? null : "not stored";
} catch (IOException e) {
entry.setStatus("error", serverProcessorJob.STATUS_FINISHED);
log.logWarning("problem loading " + entry.url().toString());
log.logWarning("problem loading " + entry.url().toString() + ": " + e.getMessage());
return "load error - " + e.getMessage();
}
}

@ -95,7 +95,7 @@ public class JakartaCommonsHttpClient {
// conManager.getParams().setDefaultMaxConnectionsPerHost(4); // default 2
conManager.getParams().setMaxTotalConnections(200); // Proxy may need many connections
conManager.getParams().setConnectionTimeout(60000); // set a default timeout
conManager.getParams().setDefaultMaxConnectionsPerHost(20); // prevent DoS by mistake
conManager.getParams().setDefaultMaxConnectionsPerHost(10); // prevent DoS by mistake
// TODO should this be configurable?
// accept self-signed or untrusted certificates

@ -159,6 +159,7 @@ public class indexCollectionRI implements indexRI {
}
public void addEntries(final indexContainer newEntries) {
if (newEntries == null) return;
try {
collectionIndex.merge(newEntries);
} catch (final kelondroOutOfLimitsException e) {

@ -169,12 +169,12 @@ public final class indexRAMRI implements indexRI, indexRIReader {
return null;
}
private String bestFlushWordHash() {
public String bestFlushWordHash() {
// select appropriate hash
// we have 2 different methods to find a good hash:
// - the oldest entry in the cache
// - the entry with maximum count
if (heap.size() == 0) return null;
if (heap == null || heap.size() == 0) return null;
try {
//return hashScore.getMaxObject();
String hash = null;
@ -265,6 +265,7 @@ public final class indexRAMRI implements indexRI, indexRIReader {
public synchronized indexContainer deleteContainer(final String wordHash) {
// returns the index that had been deleted
if (wordHash == null) return null;
final indexContainer container = heap.delete(wordHash);
hashScore.deleteScore(wordHash);
hashDate.deleteScore(wordHash);

@ -319,6 +319,11 @@ public class kelondroBase64Order extends kelondroAbstractOrder<byte[]> implement
bc = b[boffset + i];
if ((ac == 0) && (bc == 0)) return 0; // zero-terminated length
assert (bc >= 0) && (bc < 128) : "bc = " + bc + ", b = " + serverLog.arrayList(b, boffset, al);
if (ac == bc) {
// shortcut in case of equality: we don't need to lookup the ahpla value
i++;
continue;
}
acc = ahpla[ac];
assert (acc >= 0) : "acc = " + acc + ", a = " + serverLog.arrayList(a, aoffset, al) + "/" + new String(a, aoffset, al) + ", aoffset = 0x" + Integer.toHexString(aoffset) + ", i = " + i + "\n" + serverLog.table(a, 16, aoffset);
bcc = ahpla[bc];

@ -35,7 +35,7 @@ public interface kelondroByteOrder extends kelondroOrder<byte[]> {
public int compare(byte[] a, int astart, int alen, byte[] b, int bstart, int blen);
public static class StringOrder implements Comparator<String> {
public final static class StringOrder implements Comparator<String> {
public kelondroByteOrder baseOrder;
public StringOrder(final kelondroByteOrder base) {
@ -46,7 +46,7 @@ public interface kelondroByteOrder extends kelondroOrder<byte[]> {
this.baseOrder = (kelondroByteOrder) base;
}
public int compare(final String s1, final String s2) {
public final int compare(final String s1, final String s2) {
return baseOrder.compare(s1.getBytes(), s2.getBytes());
}

@ -161,7 +161,7 @@ public class kelondroRowCollection implements Iterable<kelondroRow.Entry> {
public synchronized byte[] exportCollection() {
// returns null if the collection is empty
trim(false);
assert this.size() * this.rowdef.objectsize == this.chunkcache.length;
assert this.size() * this.rowdef.objectsize == this.chunkcache.length : "this.size() = " + this.size() + ", objectsize = " + this.rowdef.objectsize + ", chunkcache.length = " + this.chunkcache.length;
final kelondroRow row = exportRow(chunkcache.length);
final kelondroRow.Entry entry = row.newEntry();
assert (sortBound <= chunkcount) : "sortBound = " + sortBound + ", chunkcount = " + chunkcount;

@ -594,7 +594,9 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<IndexingStack.
deployThread(plasmaSwitchboardConstants.CLEANUP, "Cleanup", "simple cleaning process for monitoring information", null,
new serverInstantBusyThread(this, plasmaSwitchboardConstants.CLEANUP_METHOD_START, plasmaSwitchboardConstants.CLEANUP_METHOD_JOBCOUNT, plasmaSwitchboardConstants.CLEANUP_METHOD_FREEMEM), 600000); // all 5 Minutes, wait 10 minutes until first run
deployThread(plasmaSwitchboardConstants.INDEXER, "Indexing", "thread that either initiates a parsing/indexing queue, distributes the index into the DHT, stores parsed documents or flushes the index cache", "/IndexCreateIndexingQueue_p.html",
deployThread(plasmaSwitchboardConstants.CACHEFLUSH, "Cache Flush", "thread that flushes the index cache", "",
new serverInstantBusyThread(this, plasmaSwitchboardConstants.CACHEFLUSH_METHOD_START, plasmaSwitchboardConstants.CACHEFLUSH_METHOD_JOBCOUNT, plasmaSwitchboardConstants.CACHEFLUSH_METHOD_FREEMEM), 3000);
deployThread(plasmaSwitchboardConstants.INDEXER, "Indexing", "thread that either initiates a parsing/indexing queue, distributes the index into the DHT, stores parsed documents", "/IndexCreateIndexingQueue_p.html",
new serverInstantBusyThread(this, plasmaSwitchboardConstants.INDEXER_METHOD_START, plasmaSwitchboardConstants.INDEXER_METHOD_JOBCOUNT, plasmaSwitchboardConstants.INDEXER_METHOD_FREEMEM), 10000);
deployThread(plasmaSwitchboardConstants.CRAWLJOB_REMOTE_TRIGGERED_CRAWL, "Remote Crawl Job", "thread that performes a single crawl/indexing step triggered by a remote peer", null,
new serverInstantBusyThread(crawlQueues, plasmaSwitchboardConstants.CRAWLJOB_REMOTE_TRIGGERED_CRAWL_METHOD_START, plasmaSwitchboardConstants.CRAWLJOB_REMOTE_TRIGGERED_CRAWL_METHOD_JOBCOUNT, plasmaSwitchboardConstants.CRAWLJOB_REMOTE_TRIGGERED_CRAWL_METHOD_FREEMEM), 30000);
@ -1088,6 +1090,16 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<IndexingStack.
log.logConfig("SWITCHBOARD SHUTDOWN TERMINATED");
}
/**
 * Returns the current number of entries in the RWI (reverse word index)
 * RAM cache, delegating to the word index. Used as the job-count method
 * of the 85_cacheflush busy thread (CACHEFLUSH_METHOD_JOBCOUNT).
 */
public int rwiCacheSize() {
return webIndex.cacheSize();
}
/**
 * Periodic cache-flush job (start method of the 85_cacheflush busy
 * thread). Flushes the RAM cache for a fraction of the busysleep
 * interval determined by the "performanceIO" percentage, e.g.
 * busysleep 10000 ms * performanceIO 10 / 100 = 1000 ms of flushing.
 *
 * @return false when the cache is empty (nothing to do), true after a
 *         timed flush was performed
 */
public boolean rwiCacheFlush() {
// nothing to flush if the RWI RAM cache is empty
if (rwiCacheSize() == 0) return false;
// flush duration = performanceIO percent of the configured busysleep
webIndex.flushCacheFor((int) ((this.getConfigLong(plasmaSwitchboardConstants.CACHEFLUSH_BUSYSLEEP, 10000) * this.getConfigLong("performanceIO", 10)) / 100));
return true;
}
/**
 * Returns the number of documents waiting in the indexing pre-stack
 * queue; serves as the job-count method of the indexer busy thread.
 */
public int queueSize() {
return webIndex.queuePreStack.size();
}
@ -1103,7 +1115,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<IndexingStack.
public void deQueueFreeMem() {
// flush some entries from the RAM cache
webIndex.flushCacheSome();
webIndex.flushCacheFor(3000);
// empty some caches
webIndex.clearCache();
plasmaSearchEvent.cleanupEvents(true);
@ -1153,11 +1165,6 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<IndexingStack.
}
boolean doneSomething = false;
// flush some entries from the RAM cache
if (webIndex.queuePreStack.size() == 0) {
doneSomething = webIndex.flushCacheSome() > 0; // permanent flushing only if we are not busy
}
// possibly delete entries from last chunk
if ((this.dhtTransferChunk != null) && (this.dhtTransferChunk.getStatus() == plasmaDHTChunk.chunkStatus_COMPLETE)) {

@ -135,6 +135,18 @@ public final class plasmaSwitchboardConstants {
public static final String INDEXER_METHOD_JOBCOUNT = "queueSize";
public static final String INDEXER_METHOD_FREEMEM = "deQueueFreeMem";
public static final String INDEXER_SLOTS = "indexer.slots";
// 85_cacheflush
/**
* the cache flush thread starts a flush of the RAM cache.
* This periodic flushing replaces the permanent flushing
*/
public static final String CACHEFLUSH = "85_cacheflush";
public static final String CACHEFLUSH_MEMPREREQ = "85_cacheflush_memprereq";
public static final String CACHEFLUSH_IDLESLEEP = "85_cacheflush_idlesleep";
public static final String CACHEFLUSH_BUSYSLEEP = "85_cacheflush_busysleep";
public static final String CACHEFLUSH_METHOD_START = "rwiCacheFlush";
public static final String CACHEFLUSH_METHOD_JOBCOUNT = "rwiCacheSize";
public static final String CACHEFLUSH_METHOD_FREEMEM = "deQueueFreeMem";
// 90_cleanup
/**
* <p><code>public static final String <strong>CLEANUP</strong> = "90_cleanup"</code></p>

@ -471,13 +471,14 @@ public final class plasmaWordIndex implements indexRI {
serverProfiling.update("wordcache", Long.valueOf(cs));
// To ensure termination an additional counter is used
int l = 0;
while ((l++ < 100) && (theCache.maxURLinCache() > wCacheMaxChunk)) {
flushCache(theCache, Math.min(20, theCache.size()));
while (theCache.size() > 0 && (l++ < 100) && (theCache.maxURLinCache() > wCacheMaxChunk)) {
flushCacheOne(theCache);
}
// next flush more entries if the size exceeds the maximum size of the cache
if ((theCache.size() > theCache.getMaxWordCount()) ||
(serverMemory.available() < collections.minMem())) {
flushCache(theCache, Math.min(theCache.size() - theCache.getMaxWordCount() + 1, theCache.size()));
while (theCache.size() > 0 &&
((theCache.size() > theCache.getMaxWordCount()) ||
(serverMemory.available() < collections.minMem()))) {
flushCacheOne(theCache);
}
if (cacheSize() != cs) serverProfiling.update("wordcache", Long.valueOf(cacheSize()));
}
@ -520,40 +521,33 @@ public final class plasmaWordIndex implements indexRI {
dhtFlushControl(this.dhtOutCache);
}
}
public int flushCacheSome() {
final int fo = flushCache(dhtOutCache, Math.max(1, dhtOutCache.size() / lowcachedivisor));
final int fi = flushCache(dhtInCache, Math.max(1, dhtInCache.size() / lowcachedivisor));
return fo + fi;
/**
 * Flushes the RAM caches for at most the given duration.
 *
 * @param time maximum flush duration in milliseconds; converted to an
 *        absolute deadline and delegated to flushCacheUntil
 */
public void flushCacheFor(int time) {
flushCacheUntil(System.currentTimeMillis() + time);
}
private int flushCache(final indexRAMRI ram, int count) {
if (count <= 0) return 0;
/**
 * Alternately flushes one container from the DHT-out and DHT-in RAM
 * caches until the absolute deadline is reached or both caches are
 * empty. Synchronized so only one flush loop runs at a time.
 *
 * @param timeout absolute deadline in epoch milliseconds
 */
private synchronized void flushCacheUntil(long timeout) {
while (System.currentTimeMillis() < timeout &&
(dhtOutCache.size() > 0 || dhtInCache.size() > 0)) {
// flush one container from each cache per iteration so both
// caches drain at a similar rate
flushCacheOne(dhtOutCache);
flushCacheOne(dhtInCache);
}
}
/**
 * Flushes a single container from the given RAM cache into the on-disk
 * collections index; a no-op when the cache is empty. The container to
 * flush is selected by flushContainer.
 *
 * @param ram the RAM-resident reverse word index to flush from
 */
private synchronized void flushCacheOne(final indexRAMRI ram) {
if (ram.size() > 0) collections.addEntries(flushContainer(ram));
}
private indexContainer flushContainer(final indexRAMRI ram) {
String wordHash;
final ArrayList<indexContainer> containerList = new ArrayList<indexContainer>();
count = Math.min(5000, Math.min(count, ram.size()));
boolean collectMax = true;
indexContainer c;
while (collectMax) {
synchronized (ram) {
wordHash = ram.maxScoreWordHash();
c = ram.getContainer(wordHash, null);
if ((c != null) && (c.size() > wCacheMaxChunk)) {
containerList.add(ram.deleteContainer(wordHash));
if (serverMemory.available() < collections.minMem()) break; // protect memory during flush
} else {
collectMax = false;
}
}
wordHash = ram.maxScoreWordHash();
c = ram.getContainer(wordHash, null);
if ((c != null) && (c.size() > wCacheMaxChunk)) {
return ram.deleteContainer(wordHash);
} else {
return ram.deleteContainer(ram.bestFlushWordHash());
}
count = count - containerList.size();
containerList.addAll(ram.bestFlushContainers(count));
// flush the containers
for (final indexContainer container : containerList) collections.addEntries(container);
//System.out.println("DEBUG-Finished flush of " + count + " entries from RAM to DB in " + (System.currentTimeMillis() - start) + " milliseconds");
return containerList.size();
}

Loading…
Cancel
Save