From 4b01ff754858fe9983fd9b78ed1a2d176bcd6de7 Mon Sep 17 00:00:00 2001 From: orbiter Date: Sun, 22 May 2005 23:59:52 +0000 Subject: [PATCH] activated assortments, removed write-queues git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@151 6c8d7289-2bf4-0310-a012-ef5d649a1542 --- htroot/Performance_p.html | 10 +- htroot/Performance_p.java | 8 +- source/de/anomic/kelondro/kelondroMap.java | 101 +----------------- .../anomic/plasma/plasmaWordIndexCache.java | 81 ++++++++------ 4 files changed, 68 insertions(+), 132 deletions(-) diff --git a/htroot/Performance_p.html b/htroot/Performance_p.html index f19d542ad..50fb4cff2 100644 --- a/htroot/Performance_p.html +++ b/htroot/Performance_p.html @@ -81,13 +81,13 @@ Changes take effect immediately If this is a big number, it shows that the caching works efficiently. + #{assortmentCluster}# - Singletons Cache Size: - #[singletonsSize]# - - The Singletons Cache is a database that holds words that occurred only once. - + Assortment #[assortmentSlot]# Cache Size: + #[assortmentSize]# + + #{/assortmentCluster}# Maximum number of Word Caches: diff --git a/htroot/Performance_p.java b/htroot/Performance_p.java index ae186155f..6b4cacf82 100644 --- a/htroot/Performance_p.java +++ b/htroot/Performance_p.java @@ -178,7 +178,13 @@ public class Performance_p { prop.put("maxURLinWordCache", "" + switchboard.wordIndex.maxURLinWordCache()); prop.put("maxWaitingWordFlush", switchboard.getConfig("maxWaitingWordFlush", "180")); prop.put("wordCacheMax", switchboard.getConfig("wordCacheMax", "10000")); - prop.put("singletonsSize", switchboard.wordIndex.assortmentSizes()[0]); + + int[] asizes = switchboard.wordIndex.assortmentSizes(); + for (int i = 0; i < asizes.length; i++) { + prop.put("assortmentCluster_" + i + "_assortmentSlot", i + 1); + prop.put("assortmentCluster_" + i + "_assortmentSize", asizes[i]); + } + prop.put("assortmentCluster", asizes.length); // table thread pool settings GenericObjectPool.Config crawlerPoolConfig = switchboard.cacheLoader.getPoolConfig(); diff --git a/source/de/anomic/kelondro/kelondroMap.java b/source/de/anomic/kelondro/kelondroMap.java index fc8288a4a..e7c39d6ff 100644 --- a/source/de/anomic/kelondro/kelondroMap.java +++ b/source/de/anomic/kelondro/kelondroMap.java @@ -61,7 +61,6 @@ public class kelondroMap { private HashMap sortClusterMap; // a String-kelondroMScoreCluster - relation private HashMap accMap; // to store accumulations of specific fields private int elementCount; - private writeQueue writeWorker; public kelondroMap(kelondroDyn dyn) { this(dyn, null, null); @@ -124,96 +123,9 @@ public class kelondroMap { // fill acc map if (accfields != null) for (int i = 0; i < accfields.length; i++) accMap.put(accfields[i], accumulator[i]); - - // initialize a writeQueue and start it - writeWorker = new writeQueue(); - writeWorker.start(); } - class writeQueue extends Thread { - - private LinkedList queue = new LinkedList(); - boolean run; - - public writeQueue() { - super("kelondroMap:WriteQueue"); - run = true; - } - - public void stack(String key) { - //System.out.println("kelondroMap: stack(" + dyn.entryFile.name() + ") " + key); - if (this.isAlive()) - queue.addLast(key); - else - workoff(key); - } - - public void workoff() { - String newKey = null; - synchronized (this.queue) { - if (this.queue.size() > 0) { - newKey = (String) this.queue.removeFirst(); - } - } - if (newKey != null) workoff(newKey); - } - - public void dequeue(String key) { - // take out one entry - synchronized (this.queue) { - ListIterator i = 
queue.listIterator(); - String k; - while (i.hasNext()) { - k = (String) i.next(); - if (k.equals(key)) { - i.remove(); - return; - } - } - } - } - - public void workoff(String key) { - //System.out.println("kelondroMap: workoff(" + dyn.entryFile.name() + ") " + key); - Map map = (Map) cache.get(key); - if (map == null) return; - try { - writeKra(key, map, ""); - } catch (IOException e) { - System.out.println("PANIC! Critical Error in kelondroMap.writeQueue.workoff(" + dyn.entryFile.name() + "): " + e.getMessage()); - e.printStackTrace(); - run = false; - } - } - - public void run() { - try {sleep(((System.currentTimeMillis() / 3) % 10) * 10000);} catch (InterruptedException e) {} // offset start - - //System.out.println("XXXX! " + (System.currentTimeMillis() / 1000) + " " + dyn.entryFile.name()); - int c; - while (run) { - c = 0; while ((run) && (c++ < 10)) try {sleep(1000);} catch (InterruptedException e) {} - //System.out.println("PING! " + (System.currentTimeMillis() / 1000) + " " + dyn.entryFile.name()); - while (queue.size() > 0) { - if (run) try {sleep(5000 / queue.size());} catch (InterruptedException e) {} - workoff(); - } - } - while (queue.size() > 0) workoff(); - } - - public void terminate(boolean waitFor) { - run = false; - if (waitFor) while (this.isAlive()) try {sleep(500);} catch (InterruptedException e) {} - } - } - - /* - public synchronized boolean has(String key) throws IOException { - return (cache.containsKey(key)) || (dyn.existsDyn(key)); - } - */ - + public synchronized void set(String key, Map newMap) throws IOException { // update elementCount if ((sortfields != null) || (accfields != null)) { @@ -227,8 +139,8 @@ public class kelondroMap { } } - // stack to write queue - writeWorker.stack(key); + // write entry + writeKra(key, newMap, ""); // check for space in cache checkCacheSpace(); @@ -237,7 +149,6 @@ public class kelondroMap { cacheScore.setScore(key, (int) ((System.currentTimeMillis() - startup) / 1000)); cache.put(key, newMap); - // update sortCluster if (sortClusterMap != null) updateSortCluster(key, newMap); @@ -298,9 +209,6 @@ public class kelondroMap { } } - // remove from queue - writeWorker.dequeue(key); - // remove from cache cacheScore.deleteScore(key); cache.remove(key); @@ -345,6 +253,7 @@ public class kelondroMap { return map; } + private synchronized void checkCacheSpace() { // check for space in cache if (cache.size() >= cachesize) { @@ -396,7 +305,7 @@ public class kelondroMap { public void close() throws IOException { // finish queue - writeWorker.terminate(true); + //writeWorker.terminate(true); // close cluster if (sortClusterMap != null) { diff --git a/source/de/anomic/plasma/plasmaWordIndexCache.java b/source/de/anomic/plasma/plasmaWordIndexCache.java index 05b69e24d..a75742a15 100644 --- a/source/de/anomic/plasma/plasmaWordIndexCache.java +++ b/source/de/anomic/plasma/plasmaWordIndexCache.java @@ -55,7 +55,8 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface { private static final String indexDumpFileName = "indexDump0.stack"; private static final String oldSingletonFileName = "indexSingletons0.db"; private static final String newSingletonFileName = "indexAssortment001.db"; - private static final int assortmentLimit = 1; + private static final String indexAssortmentClusterPath = "ACLUSTER"; + private static final int assortmentLimit = 3; // class variables @@ -79,11 +80,19 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface { } public plasmaWordIndexCache(File databaseRoot, 
plasmaWordIndexInterface backend, int singletonbufferkb, serverLog log) { - // migrate + // migrate#1 File oldSingletonFile = new File(databaseRoot, oldSingletonFileName); File newSingletonFile = new File(databaseRoot, newSingletonFileName); if ((oldSingletonFile.exists()) && (!(newSingletonFile.exists()))) oldSingletonFile.renameTo(newSingletonFile); + // create new assortment cluster path + File assortmentClusterPath = new File(databaseRoot, indexAssortmentClusterPath); + if (!(assortmentClusterPath.exists())) assortmentClusterPath.mkdirs(); + + // migrate#2 + File acSingletonFile = new File(assortmentClusterPath, newSingletonFileName); + if ((newSingletonFile.exists()) && (!(acSingletonFile.exists()))) newSingletonFile.renameTo(acSingletonFile); + // creates a new index cache // the cache has a back-end where indexes that do not fit in the cache are flushed this.databaseRoot = databaseRoot; @@ -94,7 +103,7 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface { this.maxWords = 10000; this.backend = backend; this.log = log; - this.assortmentCluster = new plasmaWordIndexAssortmentCluster(databaseRoot, assortmentLimit, singletonBufferSize, log); + this.assortmentCluster = new plasmaWordIndexAssortmentCluster(assortmentClusterPath, assortmentLimit, singletonBufferSize, log); // read in dump of last session try { @@ -261,7 +270,7 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface { // now decide where to flush that container plasmaWordIndexEntryContainer flushedFromAssortment = assortmentCluster.removeFromAll(key); - if (flushedFromAssortment == null) { + if ((flushedFromAssortment == null) || (flushedFromAssortment.size() == 0)) { // not found in assortments if (container.size() <= assortmentLimit) { // this fits into the assortments @@ -288,15 +297,15 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface { hashScore.setScore(key, container.size()); hashDate.put(key, new Long(time)); } - return -1; + return -flushedFromAssortment.size(); } else { // add this to the backend - return backend.addEntries(container, java.lang.Math.max(time, flushedFromAssortment.updated())); + return backend.addEntries(container, java.lang.Math.max(time, flushedFromAssortment.updated())) - flushedFromAssortment.size(); } } } - private boolean flushFromSingleton(String key) { + private boolean flushFromAssortmentCluster(String key) { // this should only be called if the singleton shall be deleted or returned in an index entity plasmaWordIndexEntryContainer container = assortmentCluster.removeFromAll(key); if (container == null) { @@ -330,8 +339,8 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface { // generate flush list Iterator i = hashScore.scores(true); - TreeMap[] al = new TreeMap[hashScore.getMaxScore() + 1]; - for (int k = 0; k < al.length; k++) al[k] = new TreeMap(); // by create time ordered hash-list + TreeMap[] clusterCandidate = new TreeMap[hashScore.getMaxScore()]; + for (int k = 0; k < clusterCandidate.length; k++) clusterCandidate[k] = new TreeMap(); // by create time ordered hash-list while (i.hasNext()) { // get the entry properties key = (String) i.next(); @@ -339,36 +348,48 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface { count = hashScore.getScore(key); // put it into a specific ohl - al[count].put(createTime, key); + clusterCandidate[count - 1].put(createTime, key); //System.out.println("COUNT FOR KEY " + key + ": " + count); } // print statistics - for 
(int k = 1; k < al.length; k++) log.logDebug("FLUSH-LIST " + k + ": " + al[k].size() + " entries"); + for (int k = 0; k < clusterCandidate.length; k++) + log.logDebug("FLUSH-LIST " + (k + 1) + ": " + clusterCandidate[k].size() + " entries"); - // flush singletons - i = al[1].entrySet().iterator(); - Map.Entry entry; - while (i.hasNext()) { - entry = (Map.Entry) i.next(); - key = (String) entry.getValue(); - createTime = (Long) entry.getKey(); - if ((createTime != null) && ((System.currentTimeMillis() - createTime.longValue()) > 90000)) { - //log.logDebug("flushing singleton-key " + key + ", count=" + count + ", cachesize=" + cache.size() + ", singleton-size=" + singletons.size()); - count += flushFromMem((String) key, true); - } - } + Map.Entry entry; + int candidateCounter; + // flush from assortment cluster + for (int cluster = 0; cluster < assortmentLimit; cluster++) { + candidateCounter = 0; + // select a specific cluster + i = clusterCandidate[cluster].entrySet().iterator(); + // check each element in this flush-list: too old? + while (i.hasNext()) { + entry = (Map.Entry) i.next(); + key = (String) entry.getValue(); + createTime = (Long) entry.getKey(); + if ((createTime != null) && ((System.currentTimeMillis() - createTime.longValue()) > 90000)) { + //log.logDebug("flushing singleton-key " + key + ", count=" + count + ", cachesize=" + cache.size() + ", singleton-size=" + singletons.size()); + count += java.lang.Math.abs(flushFromMem(key, true)); + candidateCounter += cluster + 1; + } + } + if (candidateCounter > 0) log.logDebug("flushed low-cluster #" + (cluster + 1) + ", count=" + count + ", candidateCounter=" + candidateCounter + ", cachesize=" + cache.size()); + if (count > 2000) return count; + } // flush high-scores - for (int k = al.length - 1; k >= 2; k--) { - i = al[k].entrySet().iterator(); + for (int cluster = clusterCandidate.length; cluster > 0; cluster--) { + candidateCounter = 0; + i = clusterCandidate[cluster - 1].entrySet().iterator(); while (i.hasNext()) { entry = (Map.Entry) i.next(); key = (String) entry.getValue(); createTime = (Long) entry.getKey(); - if ((createTime != null) && ((System.currentTimeMillis() - createTime.longValue()) > (600000/k))) { - //log.logDebug("flushing high-key " + key + ", count=" + count + ", cachesize=" + cache.size() + ", singleton-size=" + singletons.size()); - count += flushFromMem(key, false); + if ((createTime != null) && ((System.currentTimeMillis() - createTime.longValue()) > (600000/cluster))) { + count += java.lang.Math.abs(flushFromMem(key, false)); + candidateCounter += cluster + 1; + log.logDebug("flushed high-cluster #" + (cluster + 1) + ", key=" + key + ", count=" + count + ", cachesize=" + cache.size()); } if (count > 2000) return count; } @@ -380,7 +401,7 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface { public plasmaWordIndexEntity getIndex(String wordHash, boolean deleteIfEmpty) { flushFromMem(wordHash, false); - flushFromSingleton(wordHash); + flushFromAssortmentCluster(wordHash); return backend.getIndex(wordHash, deleteIfEmpty); } @@ -402,7 +423,7 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface { public synchronized int removeEntries(String wordHash, String[] urlHashes, boolean deleteComplete) { flushFromMem(wordHash, false); - flushFromSingleton(wordHash); + flushFromAssortmentCluster(wordHash); return backend.removeEntries(wordHash, urlHashes, deleteComplete); }
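
The Performance_p.java hunk above replaces the single "singletonsSize" property with one numbered property pair per assortment file, plus a count under the plain "assortmentCluster" key, so the #{assortmentCluster}#...#{/assortmentCluster}# block in Performance_p.html can be repeated once per slot. A minimal sketch of that key scheme, assuming a plain HashMap in place of YaCy's prop object and made-up assortment sizes:

    import java.util.HashMap;
    import java.util.Map;

    public class AssortmentClusterTemplateSketch {
        public static void main(String[] args) {
            // hypothetical result of wordIndex.assortmentSizes(); the values are made up for illustration
            int[] asizes = {1234, 567, 89};

            // stand-in for the servlet's prop object
            Map<String, Object> prop = new HashMap<String, Object>();

            // one numbered slot/size pair per assortment file, same key scheme as the patch
            for (int i = 0; i < asizes.length; i++) {
                prop.put("assortmentCluster_" + i + "_assortmentSlot", i + 1);
                prop.put("assortmentCluster_" + i + "_assortmentSize", asizes[i]);
            }
            // the unnumbered key tells the template engine how many times to repeat the block
            prop.put("assortmentCluster", asizes.length);

            System.out.println(prop);
        }
    }

With assortmentLimit raised to 3, this renders one "Assortment n Cache Size" row per slot on the Performance page instead of the former single singletons row.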
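
With the writeQueue thread gone, kelondroMap.set() in the hunk above becomes a plain write-through: writeKra persists the entry before it is put into the in-memory cache, and delete() no longer has to dequeue pending writes. A generic write-through sketch of that ordering, with a hypothetical store() standing in for writeKra and a String map as payload:

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    public class WriteThroughSketch {
        private final Map<String, Map<String, String>> cache = new HashMap<String, Map<String, String>>();

        // hypothetical persistence call standing in for kelondroMap.writeKra (assumption, not the real API)
        private void store(String key, Map<String, String> newMap) throws IOException {
            // write the entry to the underlying file here
        }

        public synchronized void set(String key, Map<String, String> newMap) throws IOException {
            // write-through: persist first, so the caller sees any IOException immediately
            store(key, newMap);
            // only then update the in-memory cache
            cache.put(key, newMap);
        }
    }

Persisting first means an I/O failure surfaces to the caller instead of being caught and logged by a background thread, which is what the removed writeQueue.workoff() did.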
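
The plasmaWordIndexCache constructor now migrates the on-disk data in two steps: the old indexSingletons0.db is renamed to indexAssortment001.db in the database root (migrate#1), the ACLUSTER directory is created, and the renamed file is then moved into it (migrate#2). A file-handling-only sketch of that chain; the databaseRoot path here is chosen just for illustration:

    import java.io.File;

    public class AssortmentMigrationSketch {
        public static void main(String[] args) {
            File databaseRoot = new File("DATA/PLASMADB"); // assumed root, for illustration only

            File oldSingletonFile = new File(databaseRoot, "indexSingletons0.db");
            File newSingletonFile = new File(databaseRoot, "indexAssortment001.db");
            File assortmentClusterPath = new File(databaseRoot, "ACLUSTER");
            File acSingletonFile = new File(assortmentClusterPath, "indexAssortment001.db");

            // migrate#1: the old singleton database gets the new assortment name
            if (oldSingletonFile.exists() && !newSingletonFile.exists())
                oldSingletonFile.renameTo(newSingletonFile);

            // the cluster directory is created before anything is moved into it
            if (!assortmentClusterPath.exists()) assortmentClusterPath.mkdirs();

            // migrate#2: the renamed file is moved into the ACLUSTER directory
            if (newSingletonFile.exists() && !acSingletonFile.exists())
                newSingletonFile.renameTo(acSingletonFile);
        }
    }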
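
flushFromMemToLimit now buckets cache keys by occurrence count into clusterCandidate[count - 1], each bucket being a TreeMap ordered by creation time, and works through the low-count buckets before the high-count ones (the real method additionally applies the 90000 ms and 600000/cluster ms age thresholds and the count > 2000 early exit, which this sketch omits). A self-contained sketch of just the bucketing and flush order, with made-up hashes, counts and creation times; the raw types mirror the patched code:

    import java.util.Iterator;
    import java.util.Map;
    import java.util.TreeMap;

    public class FlushBucketSketch {
        public static void main(String[] args) {
            // hypothetical cache entries: {word hash, occurrence count, creation time}; data made up for illustration
            Object[][] entries = {
                {"hashA", new Integer(1), new Long(1000L)},
                {"hashB", new Integer(3), new Long(2000L)},
                {"hashC", new Integer(1), new Long(500L)},
                {"hashD", new Integer(2), new Long(1500L)}
            };
            int maxScore = 3; // would come from hashScore.getMaxScore()

            // one TreeMap per occurrence count, keyed by creation time, as in the patched flushFromMemToLimit
            TreeMap[] clusterCandidate = new TreeMap[maxScore];
            for (int k = 0; k < clusterCandidate.length; k++) clusterCandidate[k] = new TreeMap();
            for (int n = 0; n < entries.length; n++) {
                int count = ((Integer) entries[n][1]).intValue();
                clusterCandidate[count - 1].put(entries[n][2], entries[n][0]);
            }

            // flush order: low-count buckets first, oldest entry first within each bucket
            for (int cluster = 0; cluster < clusterCandidate.length; cluster++) {
                Iterator i = clusterCandidate[cluster].entrySet().iterator();
                while (i.hasNext()) {
                    Map.Entry entry = (Map.Entry) i.next();
                    System.out.println("cluster #" + (cluster + 1) + ", createTime=" + entry.getKey() + ", key=" + entry.getValue());
                }
            }
        }
    }

Entries with count 1 (the former singletons) are flushed first and oldest-first, which matches the intent of replacing the dedicated singleton flush with a per-cluster pass.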