From 3031903d503fca482662ff8a0746bb88c637cd41 Mon Sep 17 00:00:00 2001 From: orbiter Date: Tue, 13 Dec 2005 16:00:20 +0000 Subject: [PATCH] re-design of RAM cache flush into assortment cluster git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@1209 6c8d7289-2bf4-0310-a012-ef5d649a1542 --- .../anomic/kelondro/kelondroObjectSpace.java | 6 +- .../plasma/plasmaWordIndexAssortment.java | 54 +++++-- .../plasmaWordIndexAssortmentCluster.java | 88 ++++++++--- .../anomic/plasma/plasmaWordIndexCache.java | 147 +++++++----------- 4 files changed, 158 insertions(+), 137 deletions(-) diff --git a/source/de/anomic/kelondro/kelondroObjectSpace.java b/source/de/anomic/kelondro/kelondroObjectSpace.java index 756e0a61e..bca6f24e6 100644 --- a/source/de/anomic/kelondro/kelondroObjectSpace.java +++ b/source/de/anomic/kelondro/kelondroObjectSpace.java @@ -47,10 +47,12 @@ import java.util.HashMap; public class kelondroObjectSpace { private static final int minSize = 10; + private static final int maxSize = 4096; + private static HashMap objects = new HashMap(); public static byte[] alloc(int len) { - if (len < minSize) return new byte[len]; + if ((len < minSize) || (len > maxSize)) return new byte[len]; synchronized (objects) { ArrayList buf = (ArrayList) objects.get(new Integer(len)); if ((buf == null) || (buf.size() == 0)) return new byte[len]; @@ -59,7 +61,7 @@ public class kelondroObjectSpace { } public static void recycle(byte[] b) { - if (b.length < minSize) { + if ((b.length < minSize) || (b.length > maxSize)) { b = null; return; } diff --git a/source/de/anomic/plasma/plasmaWordIndexAssortment.java b/source/de/anomic/plasma/plasmaWordIndexAssortment.java index 1dc6b5bb4..7466f86ed 100644 --- a/source/de/anomic/plasma/plasmaWordIndexAssortment.java +++ b/source/de/anomic/plasma/plasmaWordIndexAssortment.java @@ -134,12 +134,12 @@ public final class plasmaWordIndexAssortment { row[1] = kelondroRecords.long2bytes(1, 4); row[2] = kelondroRecords.long2bytes(newContainer.updated(), 8); Iterator entries = newContainer.entries(); - plasmaWordIndexEntry entry; + plasmaWordIndexEntry entry; for (int i = 0; i < assortmentLength; i++) { entry = (plasmaWordIndexEntry) entries.next(); - row[3 + 2 * i] = entry.getUrlHash().getBytes(); - row[4 + 2 * i] = entry.toEncodedForm(true).getBytes(); - } + row[3 + 2 * i] = entry.getUrlHash().getBytes(); + row[4 + 2 * i] = entry.toEncodedForm(true).getBytes(); + } byte[][] oldrow = null; try { oldrow = assortments.put(row); @@ -170,20 +170,42 @@ public final class plasmaWordIndexAssortment { resetDatabase(); return null; } - if (row == null) - return null; - long updateTime = kelondroRecords.bytes2long(row[2]); - // plasmaWordIndexEntry[] wordEntries = new plasmaWordIndexEntry[this.bufferStructureLength]; - plasmaWordIndexEntryContainer container = new plasmaWordIndexEntryContainer(wordHash); - for (int i = 0; i < assortmentLength; i++) { - container.add( - new plasmaWordIndexEntry[] { new plasmaWordIndexEntry( - new String(row[3 + 2 * i]), new String( - row[4 + 2 * i])) }, updateTime); - } - return container; + return row2container(wordHash, row); } + public plasmaWordIndexEntryContainer get(String wordHash) { + // gets a word index from assortment database + // and returns the content record + byte[][] row = null; + try { + row = assortments.get(wordHash.getBytes()); + } catch (IOException e) { + log.logSevere("removeAssortment/IO-error: " + e.getMessage() + + " - reset assortment-DB " + assortments.file(), e); + resetDatabase(); + return null; + } catch (kelondroException e) { + log.logSevere("removeAssortment/kelondro-error: " + e.getMessage() + + " - reset assortment-DB " + assortments.file(), e); + resetDatabase(); + return null; + } + return row2container(wordHash, row); + } + + private plasmaWordIndexEntryContainer row2container(String wordHash, byte[][] row) { + if (row == null) return null; + final long updateTime = kelondroRecords.bytes2long(row[2]); + plasmaWordIndexEntryContainer container = new plasmaWordIndexEntryContainer(wordHash); + for (int i = 0; i < assortmentLength; i++) { + container.add( + new plasmaWordIndexEntry[] { new plasmaWordIndexEntry( + new String(row[3 + 2 * i]), new String( + row[4 + 2 * i])) }, updateTime); + } + return container; + } + private void resetDatabase() { // deletes the assortment database and creates a new one if (assortments != null) try { diff --git a/source/de/anomic/plasma/plasmaWordIndexAssortmentCluster.java b/source/de/anomic/plasma/plasmaWordIndexAssortmentCluster.java index f0b9ce168..1707a8277 100644 --- a/source/de/anomic/plasma/plasmaWordIndexAssortmentCluster.java +++ b/source/de/anomic/plasma/plasmaWordIndexAssortmentCluster.java @@ -57,46 +57,47 @@ import de.anomic.server.logging.serverLog; public final class plasmaWordIndexAssortmentCluster { // class variables - private int clusterCount; - public int clusterCapacity; + private int clusterCount; // number of cluster files + public int clusterCapacity; // number of all url referrences that can be stored to a single word in the cluster //private serverLog log; private plasmaWordIndexAssortment[] assortments; private long completeBufferKB; public plasmaWordIndexAssortmentCluster(File assortmentsPath, int clusterCount, int bufferkb, serverLog log) { - // set class variables - if (!(assortmentsPath.exists())) assortmentsPath.mkdirs(); - this.clusterCount = clusterCount; + // set class variables + if (!(assortmentsPath.exists())) assortmentsPath.mkdirs(); + this.clusterCount = clusterCount; this.clusterCapacity = clusterCount * (clusterCount + 1) / 2; this.completeBufferKB = bufferkb; - //this.log = log; - this.assortments = new plasmaWordIndexAssortment[clusterCount]; + // this.log = log; + this.assortments = new plasmaWordIndexAssortment[clusterCount]; // open cluster and close it directly again to detect the element sizes int[] sizes = new int[clusterCount]; int sumSizes = 1; plasmaWordIndexAssortment testAssortment; for (int i = 0; i < clusterCount; i++) { - testAssortment = new plasmaWordIndexAssortment(assortmentsPath, i + 1, 0, null); + testAssortment = new plasmaWordIndexAssortment(assortmentsPath, i + 1, 0, null); sizes[i] = testAssortment.size() + clusterCount - i; sumSizes += sizes[i]; testAssortment.close(); testAssortment = null; - } - - // initialize cluster using the cluster elements size for optimal buffer size - for (int i = 0; i < clusterCount; i++) { - assortments[i] = new plasmaWordIndexAssortment(assortmentsPath, i + 1, (int) (completeBufferKB * (long) sizes[i] / (long) sumSizes), log); - } + } + + // initialize cluster using the cluster elements size for optimal buffer + // size + for (int i = 0; i < clusterCount; i++) { + assortments[i] = new plasmaWordIndexAssortment(assortmentsPath, i + 1, (int) (completeBufferKB * (long) sizes[i] / (long) sumSizes), log); + } } private plasmaWordIndexEntryContainer storeSingular(String wordHash, plasmaWordIndexEntryContainer newContainer) { - // this tries to store the record. If the record does not fit, or a same hash already - // exists and would not fit together with the new record, then the record is deleted from - // the assortmen(s) and returned together with the newRecord. - // if storage was successful, NULL is returned. - if (newContainer.size() > clusterCount) return newContainer; // it will not fit + // this tries to store the record. If the record does not fit, or a same hash already + // exists and would not fit together with the new record, then the record is deleted from + // the assortmen(s) and returned together with the newRecord. + // if storage was successful, NULL is returned. + if (newContainer.size() > clusterCount) return newContainer; // it will not fit plasmaWordIndexEntryContainer buffer; while ((buffer = assortments[newContainer.size() - 1].remove(wordHash)) != null) { newContainer.add(buffer); @@ -109,18 +110,18 @@ public final class plasmaWordIndexAssortmentCluster { } private void storeForced(String wordHash, plasmaWordIndexEntryContainer newContainer) { - // this stores the record and overwrites an existing record. - // this is safe of we can be shure that the record does not exist before. - if ((newContainer == null) || (newContainer.size() == 0) || (newContainer.size() > clusterCount)) return; // it will not fit + // this stores the record and overwrites an existing record. + // this is safe if we can be shure that the record does not exist before. + if ((newContainer == null) || (newContainer.size() == 0) || (newContainer.size() > clusterCount)) return; // it will not fit assortments[newContainer.size() - 1].store(wordHash, newContainer); } private void storeStretched(String wordHash, plasmaWordIndexEntryContainer newContainer) { - // this stores the record and stretches the storage over + // this stores the record and stretches the storage over // all the assortments that are necessary to fit in the record // IMPORTANT: it must be ensured that the wordHash does not exist in the cluster before // i.e. by calling removeFromAll - if (newContainer.size() <= clusterCount) { + if (newContainer.size() <= clusterCount) { storeForced(wordHash, newContainer); return; } @@ -154,9 +155,48 @@ public final class plasmaWordIndexAssortmentCluster { } public plasmaWordIndexEntryContainer storeTry(String wordHash, plasmaWordIndexEntryContainer newContainer) { + // this is called by the index ram cache flush process + // it returnes NULL if the storage was successful + // it returnes a new container if the given container cannot be stored + // containers that are returned will be stored in a WORDS file if (newContainer.size() > clusterCapacity) return newContainer; // it will not fit + + // split the container into several smaller containers that will take the whole thing + // first find out how the container can be splitted + int testsize = Math.min(clusterCount, newContainer.size()); + int [] spaces = new int[testsize]; + for (int i = testsize - 1; i >= 0; i--) spaces[i] = 0; + int need = newContainer.size(); + int s = testsize - 1; + while (s >= 0) { + spaces[s] = (assortments[s].get(wordHash) == null) ? (s + 1) : 0; + need -= spaces[s]; + assert (need >= 0); + if (need == 0) break; + s = (need < s) ? need : s - 1; + } + if (need == 0) { + // we found spaces so that we can put in the newContainer into these spaces + + plasmaWordIndexEntryContainer c; + Iterator i = newContainer.entries(); + for (int j = testsize - 1; j >= 0; j--) { + if (spaces[j] == 0) continue; + c = new plasmaWordIndexEntryContainer(wordHash); + for (int k = 0; k <= j; k++) { + assert (i.hasNext()); + c.add((plasmaWordIndexEntry) i.next(), newContainer.updated()); + } + storeForced(wordHash, c); + } + + return null; + } + if (newContainer.size() <= clusterCount) newContainer = storeSingular(wordHash, newContainer); if (newContainer == null) return null; + + // clean up the whole thing and try to insert the container then newContainer.add(removeFromAll(wordHash, -1)); if (newContainer.size() > clusterCapacity) return newContainer; storeStretched(wordHash, newContainer); diff --git a/source/de/anomic/plasma/plasmaWordIndexCache.java b/source/de/anomic/plasma/plasmaWordIndexCache.java index 8e71df42d..feb30f300 100644 --- a/source/de/anomic/plasma/plasmaWordIndexCache.java +++ b/source/de/anomic/plasma/plasmaWordIndexCache.java @@ -61,8 +61,6 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface { // environment constants private static final String indexArrayFileName = "indexDump1.array"; - private static final String oldSingletonFileName = "indexSingletons0.db"; - private static final String newSingletonFileName = "indexAssortment001.db"; private static final String indexAssortmentClusterPath = "ACLUSTER"; private static final int assortmentCount = 64; private static final int ramCacheLimit = 200; @@ -88,19 +86,11 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface { } public plasmaWordIndexCache(File databaseRoot, plasmaWordIndexInterface backend, int assortmentbufferkb, serverLog log) { - // migrate#1 - File oldSingletonFile = new File(databaseRoot, oldSingletonFileName); - File newSingletonFile = new File(databaseRoot, newSingletonFileName); - if ((oldSingletonFile.exists()) && (!(newSingletonFile.exists()))) oldSingletonFile.renameTo(newSingletonFile); // create new assortment cluster path File assortmentClusterPath = new File(databaseRoot, indexAssortmentClusterPath); if (!(assortmentClusterPath.exists())) assortmentClusterPath.mkdirs(); - // migrate#2 - File acSingletonFile = new File(assortmentClusterPath, newSingletonFileName); - if ((newSingletonFile.exists()) && (!(acSingletonFile.exists()))) newSingletonFile.renameTo(acSingletonFile); - // create flushing thread flushThread = new flush(); @@ -283,7 +273,7 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface { } private final class flush extends Thread { - boolean terminate, pause; + boolean terminate; long intermission; public flush() { @@ -305,27 +295,13 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface { } this.intermission = 0; } - if (pause) { - try {sleep(300);} catch (InterruptedException e) {} - } else { - flushFromMem(); - try { - pausetime = 1 + java.lang.Math.min(1000, 5 * maxWordsHigh/(cache.size() + 1)); - if (cache.size() == 0) pausetime = 2000; - sleep(pausetime); - } catch (InterruptedException e) {} - } + flushFromMem(); + pausetime = 1 + java.lang.Math.min(1000, 5 * maxWordsHigh / (cache.size() + 1)); + if (cache.size() == 0) pausetime = 2000; + try { sleep(pausetime); } catch (InterruptedException e) { } } } - public void pause() { - pause = true; - } - - public void proceed() { - pause = false; - } - public void terminate() { terminate = true; } @@ -337,28 +313,24 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface { // - the oldest entry in the cache // - the entry with maximum count if (cache.size() == 0) return; - flushThread.pause(); try { - String hash = (String) hashScore.getMaxObject(); - if (hash == null) { - flushThread.proceed(); - return; - } - int count = hashScore.getMaxScore(); - long time = longTime(hashDate.getScore(hash)); - if ((count > ramCacheLimit) || - ((count > assortmentCount) && (System.currentTimeMillis() - time > 10000))) { - // flush high-score entries - flushFromMem(hash); - } else { - // flush oldest entries - hash = (String) hashDate.getMinObject(); - flushFromMem(hash); + synchronized (cache) { + String hash = (String) hashScore.getMaxObject(); + if (hash == null) return; + int count = hashScore.getMaxScore(); + long time = longTime(hashDate.getScore(hash)); + if ((count > ramCacheLimit) || ((count > assortmentCount) && (System.currentTimeMillis() - time > 10000))) { + // flush high-score entries + flushFromMem(hash); + } else { + // flush oldest entries + hash = (String) hashDate.getMinObject(); + flushFromMem(hash); + } } } catch (Exception e) { log.logSevere("flushFromMem: " + e.getMessage(), e); } - flushThread.proceed(); } private int flushFromMem(String key) { @@ -378,22 +350,13 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface { } // now decide where to flush that container - //if (container.size() <= assortmentCluster.clusterCapacity) { - // this fits into the assortments - plasmaWordIndexEntryContainer feedback = assortmentCluster.storeTry(key, container); - if (feedback == null) { - return container.size(); - } else { - // *** should care about another option here *** - return backend.addEntries(feedback, time, true); - } - /* + plasmaWordIndexEntryContainer feedback = assortmentCluster.storeTry(key, container); + if (feedback == null) { + return container.size(); } else { - // store to back-end; this should be a rare case - return backend.addEntries(container, time, true); + // *** should care about another option here *** + return backend.addEntries(feedback, time, true); } - **/ - } private int intTime(long longTime) { @@ -405,7 +368,7 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface { } private boolean flushFromAssortmentCluster(String key, long maxTime) { - // this should only be called if the assortment shall be deleted or returned in an index entity + // this should only be called if the assortment shall be deleted or returned in an index entity maxTime = 8 * maxTime / 10; // reserve time for later adding to backend plasmaWordIndexEntryContainer container = assortmentCluster.removeFromAll(key, maxTime); if (container == null) { @@ -418,18 +381,21 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface { } public plasmaWordIndexEntity getIndex(String wordHash, boolean deleteIfEmpty, long maxTime) { - flushThread.pause(); + // this possibly creates an index file in the back-end + // the index file is opened and returned as entity object long start = System.currentTimeMillis(); - flushFromMem(wordHash); - if (maxTime < 0) { - flushFromAssortmentCluster(wordHash, -1); - } else { - long remaining = maxTime - (System.currentTimeMillis() - start); - if (remaining > 0) flushFromAssortmentCluster(wordHash, remaining); + synchronized (cache) { + flushFromMem(wordHash); + if (maxTime < 0) { + flushFromAssortmentCluster(wordHash, -1); + } else { + long remaining = maxTime - (System.currentTimeMillis() - start); + if (remaining > 0) + flushFromAssortmentCluster(wordHash, remaining); + } } - flushThread.proceed(); long r = maxTime - (System.currentTimeMillis() - start); - return backend.getIndex(wordHash, deleteIfEmpty, (r < 0) ? 0 : r); + return backend.getIndex(wordHash, deleteIfEmpty, (r < 0) ? 0 : r); } public long getUpdateTime(String wordHash) { @@ -444,7 +410,6 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface { } public void deleteIndex(String wordHash) { - flushThread.pause(); synchronized (cache) { cache.remove(wordHash); hashScore.deleteScore(wordHash); @@ -452,15 +417,12 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface { } assortmentCluster.removeFromAll(wordHash, -1); backend.deleteIndex(wordHash); - flushThread.proceed(); } public synchronized int removeEntries(String wordHash, String[] urlHashes, boolean deleteComplete) { - flushThread.pause(); flushFromMem(wordHash); flushFromAssortmentCluster(wordHash, -1); int removed = backend.removeEntries(wordHash, urlHashes, deleteComplete); - flushThread.proceed(); return removed; } @@ -469,6 +431,7 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface { int added = 0; // check cache space + /* if (cache.size() > 0) try { // pause to get space in the cache (while it is flushed) long pausetime; @@ -483,16 +446,12 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface { // slow down if we reach cache limit Thread.sleep(pausetime); } catch (InterruptedException e) {} - + */ //serverLog.logDebug("PLASMA INDEXING", "addEntryToIndexMem: cache.size=" + cache.size() + "; hashScore.size=" + hashScore.size()); // put new words into cache String wordHash = container.wordHash(); - synchronized (cache) { - // stop flushing now for one moment - flushThread.pause(); - // put container into cache plasmaWordIndexEntryContainer entries = (plasmaWordIndexEntryContainer) cache.get(wordHash); // null pointer exception? wordhash != null! must be cache==null if (entries == null) entries = new plasmaWordIndexEntryContainer(wordHash); @@ -503,28 +462,26 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface { hashDate.setScore(wordHash, intTime(updateTime)); } entries = null; - - // resume flushing - flushThread.proceed(); + + // force flush (sometimes) + if (System.currentTimeMillis() % 5 == 0) flushFromMem(); } - //System.out.println("DEBUG: cache = " + cache.toString()); - return added; } private void addEntry(String wordHash, plasmaWordIndexEntry newEntry, long updateTime) { - flushThread.pause(); - plasmaWordIndexEntryContainer container = (plasmaWordIndexEntryContainer) cache.get(wordHash); - if (container == null) container = new plasmaWordIndexEntryContainer(wordHash); - plasmaWordIndexEntry[] entries = new plasmaWordIndexEntry[]{newEntry}; - if (container.add(entries, updateTime) > 0) { - cache.put(wordHash, container); - hashScore.incScore(wordHash); - hashDate.setScore(wordHash, intTime(updateTime)); + synchronized (cache) { + plasmaWordIndexEntryContainer container = (plasmaWordIndexEntryContainer) cache.get(wordHash); + if (container == null) container = new plasmaWordIndexEntryContainer(wordHash); + plasmaWordIndexEntry[] entries = new plasmaWordIndexEntry[] { newEntry }; + if (container.add(entries, updateTime) > 0) { + cache.put(wordHash, container); + hashScore.incScore(wordHash); + hashDate.setScore(wordHash, intTime(updateTime)); + } + entries = null; + container = null; } - entries = null; - container = null; - flushThread.proceed(); } public void close(int waitingSeconds) {