diff --git a/source/de/anomic/index/indexCachedRI.java b/source/de/anomic/index/indexCachedRI.java index 6dd921a56..90b6748e5 100644 --- a/source/de/anomic/index/indexCachedRI.java +++ b/source/de/anomic/index/indexCachedRI.java @@ -45,14 +45,14 @@ public class indexCachedRI implements indexRI { private kelondroRow payloadrow; private kelondroOrder indexOrder = new kelondroNaturalOrder(true); - private indexRAMRI dhtOutCache, dhtInCache; + private indexRAMRI riExtern, riIntern; private indexRI backend; public boolean busyCacheFlush; // shows if a cache flush is currently performed private int idleDivisor, busyDivisor; - public indexCachedRI(indexRAMRI dhtOutCache, indexRAMRI dhtInCache, indexRI backend, kelondroRow payloadrow, serverLog log) { - this.dhtOutCache = dhtOutCache; - this.dhtInCache = dhtInCache; + public indexCachedRI(indexRAMRI riExtern, indexRAMRI riIntern, indexRI backend, kelondroRow payloadrow, serverLog log) { + this.riExtern = riExtern; + this.riIntern = riIntern; this.backend = backend; this.payloadrow = payloadrow; this.busyCacheFlush = false; @@ -72,11 +72,11 @@ public class indexCachedRI implements indexRI { public void flushControl() { // check for forced flush synchronized (this) { - if (dhtOutCache.size() > dhtOutCache.getMaxWordCount()) { - flushCache(dhtOutCache, dhtOutCache.size() + 500 - dhtOutCache.getMaxWordCount()); + if (riExtern.size() > riExtern.getMaxWordCount()) { + flushCache(riExtern, riExtern.size() + 500 - riExtern.getMaxWordCount()); } - if (dhtInCache.size() > dhtInCache.getMaxWordCount()) { - flushCache(dhtInCache, dhtInCache.size() + 500 - dhtInCache.getMaxWordCount()); + if (riIntern.size() > riIntern.getMaxWordCount()) { + flushCache(riIntern, riIntern.size() + 500 - riIntern.getMaxWordCount()); } } } @@ -91,47 +91,44 @@ public class indexCachedRI implements indexRI { return new indexContainer(wordHash, payloadrow); } - public indexContainer addEntry(String wordHash, indexEntry entry, long updateTime, boolean dhtInCase) { + public indexContainer addEntry(String wordHash, indexEntry entry, long updateTime, boolean intern) { // add the entry - if (dhtInCase) { - dhtInCache.addEntry(wordHash, entry, updateTime, true); + if (intern) { + riIntern.addEntry(wordHash, entry, updateTime, true); } else { - dhtOutCache.addEntry(wordHash, entry, updateTime, false); + riExtern.addEntry(wordHash, entry, updateTime, false); flushControl(); } return null; } - public indexContainer addEntries(indexContainer entries, long updateTime, boolean dhtInCase) { + public indexContainer addEntries(indexContainer entries, long updateTime, boolean intern) { // add the entry - if (dhtInCase) { - dhtInCache.addEntries(entries, updateTime, true); + if (intern) { + riIntern.addEntries(entries, updateTime, true); } else { - dhtOutCache.addEntries(entries, updateTime, false); + riExtern.addEntries(entries, updateTime, false); flushControl(); } return null; } public void flushCacheSome(boolean busy) { - flushCacheSome(dhtOutCache, busy); - flushCacheSome(dhtInCache, busy); + flushCacheSome(riExtern, busy); + flushCacheSome(riIntern, busy); } private void flushCacheSome(indexRAMRI ram, boolean busy) { - int flushCount; - if (ram.size() > ram.getMaxWordCount()) { - flushCount = ram.size() + 100 - ram.getMaxWordCount(); - } else { - flushCount = (busy) ? ram.size() / busyDivisor : ram.size() / idleDivisor; - if (flushCount > 100) flushCount = 100; - if (flushCount < 1) flushCount = Math.min(1, ram.size()); - } + int flushCount = (busy) ? ram.size() / busyDivisor : ram.size() / idleDivisor; + if (flushCount > 100) flushCount = 100; + if (flushCount < 1) flushCount = Math.min(1, ram.size()); flushCache(ram, flushCount); + while (ram.maxURLinCache() > 1024) flushCache(ram, 1); } private void flushCache(indexRAMRI ram, int count) { if (count <= 0) return; + if (count > 1000) count = 1000; busyCacheFlush = true; String wordHash; for (int i = 0; i < count; i++) { // possible position of outOfMemoryError ? @@ -149,7 +146,7 @@ public class indexCachedRI implements indexRI { } // pause to next loop to give other processes a chance to use IO - try {this.wait(8);} catch (InterruptedException e) {} + //try {this.wait(8);} catch (InterruptedException e) {} } } busyCacheFlush = false; @@ -189,11 +186,11 @@ public class indexCachedRI implements indexRI { public indexContainer getContainer(String wordHash, Set urlselection, boolean deleteIfEmpty, long maxTime) { // get from cache - indexContainer container = dhtOutCache.getContainer(wordHash, urlselection, true, maxTime); + indexContainer container = riExtern.getContainer(wordHash, urlselection, true, maxTime); if (container == null) { - container = dhtInCache.getContainer(wordHash, urlselection, true, maxTime); + container = riIntern.getContainer(wordHash, urlselection, true, maxTime); } else { - container.add(dhtInCache.getContainer(wordHash, urlselection, true, maxTime), maxTime); + container.add(riIntern.getContainer(wordHash, urlselection, true, maxTime), maxTime); } // get from collection index @@ -236,52 +233,52 @@ public class indexCachedRI implements indexRI { } public int size() { - return java.lang.Math.max(backend.size(), java.lang.Math.max(dhtInCache.size(), dhtOutCache.size())); + return java.lang.Math.max(backend.size(), java.lang.Math.max(riIntern.size(), riExtern.size())); } public int indexSize(String wordHash) { int size = backend.indexSize(wordHash); - size += dhtInCache.indexSize(wordHash); - size += dhtOutCache.indexSize(wordHash); + size += riIntern.indexSize(wordHash); + size += riExtern.indexSize(wordHash); return size; } public void close(int waitingBoundSeconds) { synchronized (this) { - dhtInCache.close(waitingBoundSeconds); - dhtOutCache.close(waitingBoundSeconds); + riIntern.close(waitingBoundSeconds); + riExtern.close(waitingBoundSeconds); backend.close(-1); } } public indexContainer deleteContainer(String wordHash) { indexContainer c = new indexContainer(wordHash, payloadrow); - c.add(dhtInCache.deleteContainer(wordHash), -1); - c.add(dhtOutCache.deleteContainer(wordHash), -1); + c.add(riIntern.deleteContainer(wordHash), -1); + c.add(riExtern.deleteContainer(wordHash), -1); c.add(backend.deleteContainer(wordHash), -1); return c; } public boolean removeEntry(String wordHash, String urlHash, boolean deleteComplete) { boolean removed = false; - removed = removed | (dhtInCache.removeEntry(wordHash, urlHash, deleteComplete)); - removed = removed | (dhtOutCache.removeEntry(wordHash, urlHash, deleteComplete)); + removed = removed | (riIntern.removeEntry(wordHash, urlHash, deleteComplete)); + removed = removed | (riExtern.removeEntry(wordHash, urlHash, deleteComplete)); removed = removed | (backend.removeEntry(wordHash, urlHash, deleteComplete)); return removed; } public int removeEntries(String wordHash, Set urlHashes, boolean deleteComplete) { int removed = 0; - removed += dhtInCache.removeEntries(wordHash, urlHashes, deleteComplete); - removed += dhtOutCache.removeEntries(wordHash, urlHashes, deleteComplete); + removed += riIntern.removeEntries(wordHash, urlHashes, deleteComplete); + removed += riExtern.removeEntries(wordHash, urlHashes, deleteComplete); removed += backend.removeEntries(wordHash, urlHashes, deleteComplete); return removed; } public String removeEntriesExpl(String wordHash, Set urlHashes, boolean deleteComplete) { String removed = ""; - removed += dhtInCache.removeEntries(wordHash, urlHashes, deleteComplete) + ", "; - removed += dhtOutCache.removeEntries(wordHash, urlHashes, deleteComplete) + ", "; + removed += riIntern.removeEntries(wordHash, urlHashes, deleteComplete) + ", "; + removed += riExtern.removeEntries(wordHash, urlHashes, deleteComplete) + ", "; removed += backend.removeEntries(wordHash, urlHashes, deleteComplete) + ", "; return removed; } @@ -293,7 +290,7 @@ public class indexCachedRI implements indexRI { containerOrder.rotate(startHash.getBytes()); TreeSet containers = new TreeSet(containerOrder); Iterator i = wordContainers(startHash, ramOnly, rot); - if (ramOnly) count = Math.min(dhtOutCache.size(), count); + if (ramOnly) count = Math.min(riExtern.size(), count); indexContainer container; while ((count > 0) && (i.hasNext())) { container = (indexContainer) i.next(); @@ -313,10 +310,10 @@ public class indexCachedRI implements indexRI { public Iterator wordContainers(String startHash, boolean ramOnly, boolean rot) { if (rot) return new rotatingContainerIterator(startHash, ramOnly); if (ramOnly) { - return dhtOutCache.wordContainers(startHash, false); + return riExtern.wordContainers(startHash, false); } return new kelondroMergeIterator( - dhtOutCache.wordContainers(startHash, false), + riExtern.wordContainers(startHash, false), backend.wordContainers(startHash, false), new indexContainerOrder(kelondroNaturalOrder.naturalOrder), indexContainer.containerMergeMethod, diff --git a/source/de/anomic/index/indexURLEntry.java b/source/de/anomic/index/indexURLEntry.java index 0f5986d06..94aa977d6 100644 --- a/source/de/anomic/index/indexURLEntry.java +++ b/source/de/anomic/index/indexURLEntry.java @@ -271,6 +271,7 @@ public class indexURLEntry implements Cloneable, indexEntry { } public indexEntry generateNormalized(indexEntry min, indexEntry max) { + assert (this.urlHash().length() == 12) : "this.urlhash = " + this.urlHash(); indexURLEntry e = (indexURLEntry) this.clone(); e.normalize(min, max); return e; diff --git a/source/de/anomic/kelondro/kelondroFlexTable.java b/source/de/anomic/kelondro/kelondroFlexTable.java index 19c60b875..d20f4a2d2 100644 --- a/source/de/anomic/kelondro/kelondroFlexTable.java +++ b/source/de/anomic/kelondro/kelondroFlexTable.java @@ -203,7 +203,7 @@ public class kelondroFlexTable extends kelondroFlexWidthArray implements kelondr } public synchronized void addUnique(kelondroRow.Entry row) throws IOException { - assert row.bytes().length <= this.rowdef.objectsize; + assert row.bytes().length == this.rowdef.objectsize; index.addi(row.getColBytes(0), super.add(row)); } diff --git a/source/de/anomic/kelondro/kelondroFlexWidthArray.java b/source/de/anomic/kelondro/kelondroFlexWidthArray.java index c50d04b24..2cb543955 100644 --- a/source/de/anomic/kelondro/kelondroFlexWidthArray.java +++ b/source/de/anomic/kelondro/kelondroFlexWidthArray.java @@ -175,19 +175,18 @@ public class kelondroFlexWidthArray implements kelondroArray { } public kelondroRow.Entry set(int index, kelondroRow.Entry rowentry) throws IOException { - assert rowentry.bytes().length <= this.rowdef.objectsize; + assert rowentry.bytes().length == this.rowdef.objectsize; int c = 0; kelondroRow.Entry e0, e1, p; p = rowdef.newEntry(); int lastcol; synchronized (col) { while (c < rowdef.columns()) { - lastcol = c + col[c].row().columns() - 1; + lastcol = c + col[c].row().columns() - 1; e0 = col[c].row().newEntry( rowentry.bytes(), rowdef.colstart[c], - rowdef.colstart[lastcol] - rowdef.colstart[c] - + rowdef.width(lastcol)); + rowdef.colstart[lastcol] - rowdef.colstart[c] + rowdef.width(lastcol)); e1 = col[c].set(index, e0); for (int i = 0; i < col[c].row().columns(); i++) { p.setCol(c + i, e1.getColBytes(i)); @@ -199,7 +198,7 @@ public class kelondroFlexWidthArray implements kelondroArray { } public int add(kelondroRow.Entry rowentry) throws IOException { - assert rowentry.bytes().length <= this.rowdef.objectsize; + assert rowentry.bytes().length == this.rowdef.objectsize; kelondroRow.Entry e; int index = -1; int lastcol; @@ -209,7 +208,7 @@ public class kelondroFlexWidthArray implements kelondroArray { int c = col[0].row().columns(); while (c < rowdef.columns()) { - lastcol = c + col[c].row().columns() - 1; + lastcol = c + col[c].row().columns() - 1; e = col[c].row().newEntry( rowentry.bytes(), rowdef.colstart[c], diff --git a/source/de/anomic/kelondro/kelondroRow.java b/source/de/anomic/kelondro/kelondroRow.java index 5e4dc3624..16347829d 100644 --- a/source/de/anomic/kelondro/kelondroRow.java +++ b/source/de/anomic/kelondro/kelondroRow.java @@ -158,20 +158,14 @@ public class kelondroRow { } public Entry(byte[] rowinstance) { - if (rowinstance.length == objectsize) { - this.rowinstance = rowinstance; - } else { - this.rowinstance = new byte[objectsize]; - int ll = Math.min(objectsize, rowinstance.length); - System.arraycopy(rowinstance, 0, this.rowinstance, 0, ll); - for (int i = ll; i < objectsize; i++) this.rowinstance[i] = 0; - } + this(rowinstance, 0, rowinstance.length); } public Entry(byte[] rowinstance, int start, int length) { - this.rowinstance = new byte[length]; - System.arraycopy(rowinstance, start, this.rowinstance, 0, length); - for (int i = rowinstance.length; i < objectsize; i++) this.rowinstance[i] = 0; + this.rowinstance = new byte[objectsize]; + int ll = Math.min(objectsize, length); + System.arraycopy(rowinstance, start, this.rowinstance, 0, ll); + for (int i = ll; i < objectsize; i++) this.rowinstance[i] = 0; } public Entry(byte[][] cols) { @@ -432,7 +426,7 @@ public class kelondroRow { public String toString() { return toPropertyForm(true, false, false); } - + } public final class EntryIndex extends Entry { diff --git a/source/de/anomic/kelondro/kelondroRowCollection.java b/source/de/anomic/kelondro/kelondroRowCollection.java index a35d90e70..e478142c9 100644 --- a/source/de/anomic/kelondro/kelondroRowCollection.java +++ b/source/de/anomic/kelondro/kelondroRowCollection.java @@ -86,7 +86,7 @@ public class kelondroRowCollection { int chunkcachelength = exportedCollectionRowinstance.length - exportOverheadSize; kelondroRow.Entry exportedCollection = exportRow(chunkcachelength).newEntry(exportedCollectionRowinstance); this.chunkcount = (int) exportedCollection.getColLong(exp_chunkcount); - assert (this.chunkcount <= chunkcachelength / rowdef.objectsize) : "chunkcount = " + this.chunkcount + ", chunkcachelength = " + chunkcachelength + ", rowdef.objectsize = " + rowdef.objectsize; + //assert (this.chunkcount <= chunkcachelength / rowdef.objectsize) : "chunkcount = " + this.chunkcount + ", chunkcachelength = " + chunkcachelength + ", rowdef.objectsize = " + rowdef.objectsize; if ((this.chunkcount > chunkcachelength / rowdef.objectsize)) { serverLog.logWarning("RowCollection", "corrected wrong chunkcount; chunkcount = " + this.chunkcount + ", chunkcachelength = " + chunkcachelength + ", rowdef.objectsize = " + rowdef.objectsize); this.chunkcount = chunkcachelength / rowdef.objectsize; // patch problem @@ -137,6 +137,7 @@ public class kelondroRowCollection { kelondroRow row = exportRow(chunkcache.length); kelondroRow.Entry entry = row.newEntry(); assert (sortBound <= chunkcount) : "sortBound = " + sortBound + ", chunkcount = " + chunkcount; + assert (this.chunkcount <= chunkcache.length / rowdef.objectsize) : "chunkcount = " + this.chunkcount + ", chunkcache.length = " + chunkcache.length + ", rowdef.objectsize = " + rowdef.objectsize; entry.setCol(exp_chunkcount, this.chunkcount); entry.setCol(exp_last_read, daysSince2000(this.lastTimeRead)); entry.setCol(exp_last_wrote, daysSince2000(this.lastTimeWrote)); @@ -183,6 +184,9 @@ public class kelondroRowCollection { public final kelondroRow.Entry get(int index) { assert (index >= 0) : "get: access with index " + index + " is below zero"; assert (index < chunkcount) : "get: access with index " + index + " is above chunkcount " + chunkcount + "; sortBound = " + sortBound; + assert (index * rowdef.objectsize < chunkcache.length); + if (index >= chunkcount) return null; + if (index * rowdef.objectsize() >= chunkcache.length) return null; byte[] a = new byte[rowdef.objectsize()]; synchronized (chunkcache) { System.arraycopy(chunkcache, index * rowdef.objectsize(), a, 0, rowdef.objectsize()); @@ -198,6 +202,8 @@ public class kelondroRowCollection { public final void set(int index, byte[] a, int astart, int alength) { assert (index >= 0) : "get: access with index " + index + " is below zero"; assert (index < chunkcount) : "get: access with index " + index + " is above chunkcount " + chunkcount; + assert (!(bugappearance(a, astart, alength))) : "a = " + serverLog.arrayList(a, astart, alength); + if (bugappearance(a, astart, alength)) return; // TODO: this is temporary; remote peers may still submit bad entries int l = Math.min(rowdef.objectsize(), Math.min(alength, a.length - astart)); synchronized (chunkcache) { System.arraycopy(a, astart, chunkcache, index * rowdef.objectsize(), l); @@ -223,6 +229,8 @@ public class kelondroRowCollection { assert (!(serverLog.allZero(a, astart, alength))) : "a = " + serverLog.arrayList(a, astart, alength); assert (alength > 0); assert (astart + alength <= a.length); + assert (!(bugappearance(a, astart, alength))) : "a = " + serverLog.arrayList(a, astart, alength); + if (bugappearance(a, astart, alength)) return; // TODO: this is temporary; remote peers may still submit bad entries int l = Math.min(rowdef.objectsize(), Math.min(alength, a.length - astart)); synchronized (chunkcache) { ensureSize(chunkcount + 1); @@ -232,6 +240,18 @@ public class kelondroRowCollection { this.lastTimeWrote = System.currentTimeMillis(); } + private static boolean bugappearance(byte[] a, int astart, int alength) { + // check strange appearances of '@[B', which is not a b64-value or any other hash fragment + if (astart + 3 > alength) return false; + loop: for (int i = astart; i <= alength - 3; i++) { + if (a[i ] != 64) continue loop; + if (a[i + 1] != 91) continue loop; + if (a[i + 2] != 66) continue loop; + return true; + } + return false; + } + public final void addAll(kelondroRowCollection c) { assert(rowdef.objectsize() >= c.rowdef.objectsize()); synchronized(chunkcache) { diff --git a/source/de/anomic/plasma/plasmaWordIndex.java b/source/de/anomic/plasma/plasmaWordIndex.java index 084e4263c..8ab5079c4 100644 --- a/source/de/anomic/plasma/plasmaWordIndex.java +++ b/source/de/anomic/plasma/plasmaWordIndex.java @@ -235,19 +235,16 @@ public final class plasmaWordIndex implements indexRI { } private void flushCacheSome(indexRAMRI ram, boolean busy) { - int flushCount; - if (ram.size() > ram.getMaxWordCount()) { - flushCount = ram.size() + 100 - ram.getMaxWordCount(); - } else { - flushCount = (busy) ? ram.size() / busyDivisor : ram.size() / idleDivisor; - if (flushCount > 100) flushCount = 100; - if (flushCount < 1) flushCount = Math.min(1, ram.size()); - } + int flushCount = (busy) ? ram.size() / busyDivisor : ram.size() / idleDivisor; + if (flushCount > 100) flushCount = 100; + if (flushCount < 1) flushCount = Math.min(1, ram.size()); flushCache(ram, flushCount); + while (ram.maxURLinCache() > ((useCollectionIndex) ? 1024 : 64)) flushCache(ram, 1); } private void flushCache(indexRAMRI ram, int count) { if (count <= 0) return; + if (count > 1000) count = 1000; busyCacheFlush = true; String wordHash; //System.out.println("DEBUG-Started flush of " + count + " entries from RAM to DB"); @@ -274,7 +271,7 @@ public final class plasmaWordIndex implements indexRI { } // pause to next loop to give other processes a chance to use IO - try {this.wait(8);} catch (InterruptedException e) {} + //try {this.wait(8);} catch (InterruptedException e) {} } } //System.out.println("DEBUG-Finished flush of " + count + " entries from RAM to DB in " + (System.currentTimeMillis() - start) + " milliseconds");