diff --git a/build.properties b/build.properties index f5e7c0fd0..54aabe816 100644 --- a/build.properties +++ b/build.properties @@ -3,7 +3,7 @@ javacSource=1.5 javacTarget=1.5 # Release Configuration -releaseVersion=0.576 +releaseVersion=0.577 stdReleaseFile=yacy_v${releaseVersion}_${DSTAMP}_${releaseNr}.tar.gz embReleaseFile=yacy_emb_v${releaseVersion}_${DSTAMP}_${releaseNr}.tar.gz proReleaseFile=yacy_pro_v${releaseVersion}_${DSTAMP}_${releaseNr}.tar.gz diff --git a/defaults/yacy.init b/defaults/yacy.init index ddd0b9a8a..3bd1bec15 100644 --- a/defaults/yacy.init +++ b/defaults/yacy.init @@ -610,7 +610,6 @@ javastart_priority__pro=0 # flushed to disc; this may last some minutes. wordCacheMaxCount = 20000 wordCacheInitCount = 20000 -wordFlushSize = 500 wordCacheMaxCount__pro = 20000 wordCacheInitCount__pro = 20000 wordFlushSize__pro = 500 diff --git a/htroot/PerformanceQueues_p.html b/htroot/PerformanceQueues_p.html index 6a7941460..ce38167aa 100644 --- a/htroot/PerformanceQueues_p.html +++ b/htroot/PerformanceQueues_p.html @@ -138,15 +138,6 @@ This is is the init size of space for words in cache. - - word flush size: - - - - - The word flush size is applied when an indexing loop is executed, and the cache size is exceeded. - - diff --git a/htroot/PerformanceQueues_p.java b/htroot/PerformanceQueues_p.java index 537aa03e7..79200044e 100644 --- a/htroot/PerformanceQueues_p.java +++ b/htroot/PerformanceQueues_p.java @@ -189,10 +189,6 @@ public class PerformanceQueues_p { int wordCacheInitCount = post.getInt(plasmaSwitchboard.WORDCACHE_INIT_COUNT, 30000); switchboard.setConfig(plasmaSwitchboard.WORDCACHE_INIT_COUNT, Integer.toString(wordCacheInitCount)); - - int flushsize = post.getInt("wordFlushSize", 2000); - switchboard.setConfig("wordFlushSize", Integer.toString(flushsize)); - switchboard.wordIndex.setWordFlushSize(flushsize); } if ((post != null) && (post.containsKey("poolConfig"))) { @@ -249,7 +245,6 @@ public class PerformanceQueues_p { prop.putNum("maxWaitingWordFlush", switchboard.getConfigLong("maxWaitingWordFlush", 180)); prop.put("wordCacheMaxCount", switchboard.getConfigLong(plasmaSwitchboard.WORDCACHE_MAX_COUNT, 20000)); prop.put("wordCacheInitCount", switchboard.getConfigLong(plasmaSwitchboard.WORDCACHE_INIT_COUNT, 30000)); - prop.put("wordFlushSize", switchboard.getConfigLong("wordFlushSize", 2000)); prop.put("crawlPauseProxy", switchboard.getConfigLong(plasmaSwitchboard.PROXY_ONLINE_CAUTION_DELAY, 30000)); prop.put("crawlPauseLocalsearch", switchboard.getConfigLong(plasmaSwitchboard.LOCALSEACH_ONLINE_CAUTION_DELAY, 30000)); prop.put("crawlPauseRemotesearch", switchboard.getConfigLong(plasmaSwitchboard.REMOTESEARCH_ONLINE_CAUTION_DELAY, 30000)); diff --git a/source/de/anomic/kelondro/kelondroSplitTable.java b/source/de/anomic/kelondro/kelondroSplitTable.java index 6c4a21dcd..1038acbde 100644 --- a/source/de/anomic/kelondro/kelondroSplitTable.java +++ b/source/de/anomic/kelondro/kelondroSplitTable.java @@ -36,8 +36,18 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import de.anomic.server.serverMemory; +import de.anomic.server.serverProcessor; public class kelondroSplitTable implements kelondroIndex { @@ -47,6 +57,10 @@ public class kelondroSplitTable implements kelondroIndex { private static final long minimumRAM4Eco = 80 * 1024 * 1024; private static final int EcoFSBufferSize = 20; + private static final kelondroIndex dummyIndex = new kelondroRAMIndex(new kelondroRow(new kelondroColumn[]{new kelondroColumn("key", kelondroColumn.celltype_binary, kelondroColumn.encoder_bytes, 2, "key")}, kelondroNaturalOrder.naturalOrder, 0), 0); + + // the thread pool for the keeperOf executor service + private ExecutorService executor; private HashMap tables; // a map from a date string to a kelondroIndex object private kelondroRow rowdef; @@ -63,6 +77,9 @@ public class kelondroSplitTable implements kelondroIndex { } public void init(boolean resetOnFail) { + + // init the thread pool for the keeperOf executor service + this.executor = new ThreadPoolExecutor(serverProcessor.useCPU + 1, serverProcessor.useCPU + 1, 10, TimeUnit.SECONDS, new ArrayBlockingQueue(serverProcessor.useCPU + 1)); // initialized tables map this.tables = new HashMap(); @@ -174,12 +191,9 @@ public class kelondroSplitTable implements kelondroIndex { } public int writeBufferSize() { - Iterator i = tables.values().iterator(); int s = 0; - kelondroIndex ki; - while (i.hasNext()) { - ki = ((kelondroIndex) i.next()); - if (ki instanceof kelondroCache) s += ((kelondroCache) ki).writeBufferSize(); + for (final kelondroIndex index : tables.values()) { + if (index instanceof kelondroCache) s += ((kelondroCache) index).writeBufferSize(); } return s; } @@ -189,19 +203,13 @@ public class kelondroSplitTable implements kelondroIndex { } public boolean has(byte[] key) throws IOException { - Iterator i = tables.values().iterator(); - kelondroIndex table; - while (i.hasNext()) { - table = (kelondroIndex) i.next(); - if (table.has(key)) return true; - } - return false; + return keeperOf(key) != null; } public synchronized kelondroRow.Entry get(byte[] key) throws IOException { - Object[] keeper = keeperOf(key); + kelondroIndex keeper = keeperOf(key); if (keeper == null) return null; - return (kelondroRow.Entry) keeper[1]; + return keeper.get(key); } public synchronized void putMultiple(List rows) throws IOException { @@ -214,8 +222,8 @@ public class kelondroSplitTable implements kelondroIndex { public synchronized kelondroRow.Entry put(kelondroRow.Entry row, Date entryDate) throws IOException { assert row.objectsize() <= this.rowdef.objectsize; - Object[] keeper = keeperOf(row.getColBytes(0)); - if (keeper != null) return ((kelondroIndex) keeper[0]).put(row); + kelondroIndex keeper = keeperOf(row.getColBytes(0)); + if (keeper != null) return keeper.put(row); if ((entryDate == null) || (entryDate.after(new Date()))) entryDate = new Date(); // fix date String suffix = dateSuffix(entryDate); if (suffix == null) return null; @@ -247,17 +255,62 @@ public class kelondroSplitTable implements kelondroIndex { return null; } - public synchronized Object[] keeperOf(byte[] key) throws IOException { - Iterator i = tables.values().iterator(); - kelondroIndex table; - kelondroRow.Entry entry; - while (i.hasNext()) { - table = (kelondroIndex) i.next(); - entry = table.get(key); - if (entry != null) return new Object[]{table, entry}; + public synchronized kelondroIndex keeperOf(final byte[] key) throws IOException { + // because the index is stored only in one table, + // and the index is completely in RAM, a concurrency will create + // not concurrent File accesses + //long start = System.currentTimeMillis(); + + // start a concurrent query to database tables + CompletionService cs = new ExecutorCompletionService(executor); + int s = tables.size(); + for (final kelondroIndex table : tables.values()) { + cs.submit(new Callable() { + public kelondroIndex call() { + try { + if (table.has(key)) return table; else return dummyIndex; + } catch (IOException e) { + return dummyIndex; + } + } + }); } + + // read the result + try { + for (int i = 0; i < s; i++) { + Future f = cs.take(); + kelondroIndex index = f.get(); + if (index != dummyIndex) { + //System.out.println("*DEBUG SplitTable success.time = " + (System.currentTimeMillis() - start) + " ms"); + return index; + } + } + //System.out.println("*DEBUG SplitTable fail.time = " + (System.currentTimeMillis() - start) + " ms"); + return null; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (ExecutionException e) { + throw new RuntimeException(e.getCause()); + } + //System.out.println("*DEBUG SplitTable fail.time = " + (System.currentTimeMillis() - start) + " ms"); return null; } + /* + public synchronized kelondroIndex keeperOf(byte[] key) throws IOException { + // TODO: apply concurrency here! + // because the index is stored only in one table, + // and the index is completely in RAM, a concurrency would create not concurrent File accesses + long start = System.currentTimeMillis(); + for (final kelondroIndex table : tables.values()) { + if (table.has(key)) { + System.out.println("*DEBUG SplitTable success.time = " + (System.currentTimeMillis() - start) + " ms"); + return table; + } + } + System.out.println("*DEBUG SplitTable fail.time = " + (System.currentTimeMillis() - start) + " ms"); + return null; + }*/ public synchronized void addUnique(kelondroRow.Entry row) throws IOException { addUnique(row, null); @@ -303,15 +356,9 @@ public class kelondroSplitTable implements kelondroIndex { } public synchronized kelondroRow.Entry remove(byte[] key, boolean keepOrder) throws IOException { - Iterator i = tables.values().iterator(); - kelondroIndex table; - kelondroRow.Entry entry; - while (i.hasNext()) { - table = i.next(); - entry = table.remove(key, keepOrder); - if (entry != null) return entry; - } - return null; + kelondroIndex table = keeperOf(key); + if (table == null) return null; + return table.remove(key, keepOrder); } public synchronized kelondroRow.Entry removeOne() throws IOException { @@ -372,11 +419,17 @@ public class kelondroSplitTable implements kelondroIndex { public synchronized void close() { if (tables == null) return; + this.executor.shutdown(); + try { + this.executor.awaitTermination(3, TimeUnit.SECONDS); + } catch (InterruptedException e) { + } + this.executor = null; Iterator i = tables.values().iterator(); while (i.hasNext()) { i.next().close(); } - tables = null; + this.tables = null; } public static void main(String[] args) { diff --git a/source/de/anomic/plasma/plasmaCrawlNURL.java b/source/de/anomic/plasma/plasmaCrawlNURL.java index 7b0e9423d..eb3927f46 100644 --- a/source/de/anomic/plasma/plasmaCrawlNURL.java +++ b/source/de/anomic/plasma/plasmaCrawlNURL.java @@ -62,7 +62,7 @@ public class plasmaCrawlNURL { public static final int STACK_TYPE_MOVIE = 12; // put on movie stack public static final int STACK_TYPE_MUSIC = 13; // put on music stack - private static final long minimumLocalDelta = 10; // the minimum time difference between access of the same local domain + private static final long minimumLocalDelta = 0; // the minimum time difference between access of the same local domain private static final long minimumGlobalDelta = 333; // the minimum time difference between access of the same global domain private static final long maximumDomAge = 60000; // the maximum age of a domain until it is used for another crawl attempt diff --git a/source/de/anomic/plasma/plasmaSwitchboard.java b/source/de/anomic/plasma/plasmaSwitchboard.java index aa37ea39a..44bd63575 100644 --- a/source/de/anomic/plasma/plasmaSwitchboard.java +++ b/source/de/anomic/plasma/plasmaSwitchboard.java @@ -902,7 +902,10 @@ public final class plasmaSwitchboard extends serverAbstractSwitch it = cache.wordContainers(null, false); while (it.hasNext()) cacheBytes += it.next().size() * entryBytes; } - return cacheBytes; } @@ -209,21 +205,15 @@ public final class plasmaWordIndex implements indexRI { dhtInCache.setMaxWordCount(maxWords); } - public void setWordFlushSize(int flushsize) { - this.flushsize = flushsize; - } - public void dhtFlushControl(indexRAMRI theCache) { // check for forced flush - int count = -1; - synchronized (theCache) { - if ((theCache.maxURLinCache() > wCacheMaxChunk ) || - (theCache.size() > theCache.getMaxWordCount()) || - (serverMemory.available() < collections.minMem())) { - count = theCache.size() + flushsize - theCache.getMaxWordCount(); - } + while (theCache.maxURLinCache() > wCacheMaxChunk ) { + flushCache(theCache, Math.min(10, theCache.size())); + } + if ((theCache.size() > theCache.getMaxWordCount()) || + (serverMemory.available() < collections.minMem())) { + flushCache(theCache, Math.min(theCache.size() - theCache.getMaxWordCount() + 1, theCache.size())); } - if (count >= 0) flushCache(theCache, (count > 0) ? count : 1); } public long getUpdateTime(String wordHash) { @@ -271,8 +261,8 @@ public final class plasmaWordIndex implements indexRI { } public int flushCacheSome() { - int fo = flushCache(dhtOutCache, (dhtOutCache.size() > 3 * flushsize) ? flushsize : Math.min(flushsize, Math.max(1, dhtOutCache.size() / lowcachedivisor))); - int fi = flushCache(dhtInCache, (dhtInCache.size() > 3 * flushsize) ? flushsize : Math.min(flushsize, Math.max(1, dhtInCache.size() / lowcachedivisor))); + int fo = flushCache(dhtOutCache, Math.max(1, dhtOutCache.size() / lowcachedivisor)); + int fi = flushCache(dhtInCache, Math.max(1, dhtInCache.size() / lowcachedivisor)); return fo + fi; } diff --git a/source/de/anomic/server/serverCore.java b/source/de/anomic/server/serverCore.java index ea6ab2915..267144499 100644 --- a/source/de/anomic/server/serverCore.java +++ b/source/de/anomic/server/serverCore.java @@ -699,7 +699,7 @@ public final class serverCore extends serverAbstractBusyThread implements server } catch (IOException e) { e.printStackTrace(); } - busySessions.remove(this); + if (busySessions != null) busySessions.remove(this); } }