- more asserts

- some bugfixes
- some patches for bugs that are already in the database

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@2935 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 19 years ago
parent b18273bbb2
commit bdc9216366

@ -45,14 +45,14 @@ public class indexCachedRI implements indexRI {
private kelondroRow payloadrow;
private kelondroOrder indexOrder = new kelondroNaturalOrder(true);
private indexRAMRI dhtOutCache, dhtInCache;
private indexRAMRI riExtern, riIntern;
private indexRI backend;
public boolean busyCacheFlush; // shows if a cache flush is currently performed
private int idleDivisor, busyDivisor;
public indexCachedRI(indexRAMRI dhtOutCache, indexRAMRI dhtInCache, indexRI backend, kelondroRow payloadrow, serverLog log) {
this.dhtOutCache = dhtOutCache;
this.dhtInCache = dhtInCache;
public indexCachedRI(indexRAMRI riExtern, indexRAMRI riIntern, indexRI backend, kelondroRow payloadrow, serverLog log) {
this.riExtern = riExtern;
this.riIntern = riIntern;
this.backend = backend;
this.payloadrow = payloadrow;
this.busyCacheFlush = false;
@ -72,11 +72,11 @@ public class indexCachedRI implements indexRI {
public void flushControl() {
// check for forced flush
synchronized (this) {
if (dhtOutCache.size() > dhtOutCache.getMaxWordCount()) {
flushCache(dhtOutCache, dhtOutCache.size() + 500 - dhtOutCache.getMaxWordCount());
if (riExtern.size() > riExtern.getMaxWordCount()) {
flushCache(riExtern, riExtern.size() + 500 - riExtern.getMaxWordCount());
}
if (dhtInCache.size() > dhtInCache.getMaxWordCount()) {
flushCache(dhtInCache, dhtInCache.size() + 500 - dhtInCache.getMaxWordCount());
if (riIntern.size() > riIntern.getMaxWordCount()) {
flushCache(riIntern, riIntern.size() + 500 - riIntern.getMaxWordCount());
}
}
}
@ -91,47 +91,44 @@ public class indexCachedRI implements indexRI {
return new indexContainer(wordHash, payloadrow);
}
public indexContainer addEntry(String wordHash, indexEntry entry, long updateTime, boolean dhtInCase) {
public indexContainer addEntry(String wordHash, indexEntry entry, long updateTime, boolean intern) {
// add the entry
if (dhtInCase) {
dhtInCache.addEntry(wordHash, entry, updateTime, true);
if (intern) {
riIntern.addEntry(wordHash, entry, updateTime, true);
} else {
dhtOutCache.addEntry(wordHash, entry, updateTime, false);
riExtern.addEntry(wordHash, entry, updateTime, false);
flushControl();
}
return null;
}
public indexContainer addEntries(indexContainer entries, long updateTime, boolean dhtInCase) {
public indexContainer addEntries(indexContainer entries, long updateTime, boolean intern) {
// add the entry
if (dhtInCase) {
dhtInCache.addEntries(entries, updateTime, true);
if (intern) {
riIntern.addEntries(entries, updateTime, true);
} else {
dhtOutCache.addEntries(entries, updateTime, false);
riExtern.addEntries(entries, updateTime, false);
flushControl();
}
return null;
}
public void flushCacheSome(boolean busy) {
flushCacheSome(dhtOutCache, busy);
flushCacheSome(dhtInCache, busy);
flushCacheSome(riExtern, busy);
flushCacheSome(riIntern, busy);
}
private void flushCacheSome(indexRAMRI ram, boolean busy) {
int flushCount;
if (ram.size() > ram.getMaxWordCount()) {
flushCount = ram.size() + 100 - ram.getMaxWordCount();
} else {
flushCount = (busy) ? ram.size() / busyDivisor : ram.size() / idleDivisor;
if (flushCount > 100) flushCount = 100;
if (flushCount < 1) flushCount = Math.min(1, ram.size());
}
int flushCount = (busy) ? ram.size() / busyDivisor : ram.size() / idleDivisor;
if (flushCount > 100) flushCount = 100;
if (flushCount < 1) flushCount = Math.min(1, ram.size());
flushCache(ram, flushCount);
while (ram.maxURLinCache() > 1024) flushCache(ram, 1);
}
private void flushCache(indexRAMRI ram, int count) {
if (count <= 0) return;
if (count > 1000) count = 1000;
busyCacheFlush = true;
String wordHash;
for (int i = 0; i < count; i++) { // possible position of outOfMemoryError ?
@ -149,7 +146,7 @@ public class indexCachedRI implements indexRI {
}
// pause to next loop to give other processes a chance to use IO
try {this.wait(8);} catch (InterruptedException e) {}
//try {this.wait(8);} catch (InterruptedException e) {}
}
}
busyCacheFlush = false;
@ -189,11 +186,11 @@ public class indexCachedRI implements indexRI {
public indexContainer getContainer(String wordHash, Set urlselection, boolean deleteIfEmpty, long maxTime) {
// get from cache
indexContainer container = dhtOutCache.getContainer(wordHash, urlselection, true, maxTime);
indexContainer container = riExtern.getContainer(wordHash, urlselection, true, maxTime);
if (container == null) {
container = dhtInCache.getContainer(wordHash, urlselection, true, maxTime);
container = riIntern.getContainer(wordHash, urlselection, true, maxTime);
} else {
container.add(dhtInCache.getContainer(wordHash, urlselection, true, maxTime), maxTime);
container.add(riIntern.getContainer(wordHash, urlselection, true, maxTime), maxTime);
}
// get from collection index
@ -236,52 +233,52 @@ public class indexCachedRI implements indexRI {
}
public int size() {
return java.lang.Math.max(backend.size(), java.lang.Math.max(dhtInCache.size(), dhtOutCache.size()));
return java.lang.Math.max(backend.size(), java.lang.Math.max(riIntern.size(), riExtern.size()));
}
public int indexSize(String wordHash) {
int size = backend.indexSize(wordHash);
size += dhtInCache.indexSize(wordHash);
size += dhtOutCache.indexSize(wordHash);
size += riIntern.indexSize(wordHash);
size += riExtern.indexSize(wordHash);
return size;
}
public void close(int waitingBoundSeconds) {
synchronized (this) {
dhtInCache.close(waitingBoundSeconds);
dhtOutCache.close(waitingBoundSeconds);
riIntern.close(waitingBoundSeconds);
riExtern.close(waitingBoundSeconds);
backend.close(-1);
}
}
public indexContainer deleteContainer(String wordHash) {
indexContainer c = new indexContainer(wordHash, payloadrow);
c.add(dhtInCache.deleteContainer(wordHash), -1);
c.add(dhtOutCache.deleteContainer(wordHash), -1);
c.add(riIntern.deleteContainer(wordHash), -1);
c.add(riExtern.deleteContainer(wordHash), -1);
c.add(backend.deleteContainer(wordHash), -1);
return c;
}
public boolean removeEntry(String wordHash, String urlHash, boolean deleteComplete) {
boolean removed = false;
removed = removed | (dhtInCache.removeEntry(wordHash, urlHash, deleteComplete));
removed = removed | (dhtOutCache.removeEntry(wordHash, urlHash, deleteComplete));
removed = removed | (riIntern.removeEntry(wordHash, urlHash, deleteComplete));
removed = removed | (riExtern.removeEntry(wordHash, urlHash, deleteComplete));
removed = removed | (backend.removeEntry(wordHash, urlHash, deleteComplete));
return removed;
}
public int removeEntries(String wordHash, Set urlHashes, boolean deleteComplete) {
int removed = 0;
removed += dhtInCache.removeEntries(wordHash, urlHashes, deleteComplete);
removed += dhtOutCache.removeEntries(wordHash, urlHashes, deleteComplete);
removed += riIntern.removeEntries(wordHash, urlHashes, deleteComplete);
removed += riExtern.removeEntries(wordHash, urlHashes, deleteComplete);
removed += backend.removeEntries(wordHash, urlHashes, deleteComplete);
return removed;
}
public String removeEntriesExpl(String wordHash, Set urlHashes, boolean deleteComplete) {
String removed = "";
removed += dhtInCache.removeEntries(wordHash, urlHashes, deleteComplete) + ", ";
removed += dhtOutCache.removeEntries(wordHash, urlHashes, deleteComplete) + ", ";
removed += riIntern.removeEntries(wordHash, urlHashes, deleteComplete) + ", ";
removed += riExtern.removeEntries(wordHash, urlHashes, deleteComplete) + ", ";
removed += backend.removeEntries(wordHash, urlHashes, deleteComplete) + ", ";
return removed;
}
@ -293,7 +290,7 @@ public class indexCachedRI implements indexRI {
containerOrder.rotate(startHash.getBytes());
TreeSet containers = new TreeSet(containerOrder);
Iterator i = wordContainers(startHash, ramOnly, rot);
if (ramOnly) count = Math.min(dhtOutCache.size(), count);
if (ramOnly) count = Math.min(riExtern.size(), count);
indexContainer container;
while ((count > 0) && (i.hasNext())) {
container = (indexContainer) i.next();
@ -313,10 +310,10 @@ public class indexCachedRI implements indexRI {
public Iterator wordContainers(String startHash, boolean ramOnly, boolean rot) {
if (rot) return new rotatingContainerIterator(startHash, ramOnly);
if (ramOnly) {
return dhtOutCache.wordContainers(startHash, false);
return riExtern.wordContainers(startHash, false);
}
return new kelondroMergeIterator(
dhtOutCache.wordContainers(startHash, false),
riExtern.wordContainers(startHash, false),
backend.wordContainers(startHash, false),
new indexContainerOrder(kelondroNaturalOrder.naturalOrder),
indexContainer.containerMergeMethod,

@ -271,6 +271,7 @@ public class indexURLEntry implements Cloneable, indexEntry {
}
public indexEntry generateNormalized(indexEntry min, indexEntry max) {
assert (this.urlHash().length() == 12) : "this.urlhash = " + this.urlHash();
indexURLEntry e = (indexURLEntry) this.clone();
e.normalize(min, max);
return e;

@ -203,7 +203,7 @@ public class kelondroFlexTable extends kelondroFlexWidthArray implements kelondr
}
public synchronized void addUnique(kelondroRow.Entry row) throws IOException {
assert row.bytes().length <= this.rowdef.objectsize;
assert row.bytes().length == this.rowdef.objectsize;
index.addi(row.getColBytes(0), super.add(row));
}

@ -175,19 +175,18 @@ public class kelondroFlexWidthArray implements kelondroArray {
}
public kelondroRow.Entry set(int index, kelondroRow.Entry rowentry) throws IOException {
assert rowentry.bytes().length <= this.rowdef.objectsize;
assert rowentry.bytes().length == this.rowdef.objectsize;
int c = 0;
kelondroRow.Entry e0, e1, p;
p = rowdef.newEntry();
int lastcol;
synchronized (col) {
while (c < rowdef.columns()) {
lastcol = c + col[c].row().columns() - 1;
lastcol = c + col[c].row().columns() - 1;
e0 = col[c].row().newEntry(
rowentry.bytes(),
rowdef.colstart[c],
rowdef.colstart[lastcol] - rowdef.colstart[c]
+ rowdef.width(lastcol));
rowdef.colstart[lastcol] - rowdef.colstart[c] + rowdef.width(lastcol));
e1 = col[c].set(index, e0);
for (int i = 0; i < col[c].row().columns(); i++) {
p.setCol(c + i, e1.getColBytes(i));
@ -199,7 +198,7 @@ public class kelondroFlexWidthArray implements kelondroArray {
}
public int add(kelondroRow.Entry rowentry) throws IOException {
assert rowentry.bytes().length <= this.rowdef.objectsize;
assert rowentry.bytes().length == this.rowdef.objectsize;
kelondroRow.Entry e;
int index = -1;
int lastcol;
@ -209,7 +208,7 @@ public class kelondroFlexWidthArray implements kelondroArray {
int c = col[0].row().columns();
while (c < rowdef.columns()) {
lastcol = c + col[c].row().columns() - 1;
lastcol = c + col[c].row().columns() - 1;
e = col[c].row().newEntry(
rowentry.bytes(),
rowdef.colstart[c],

@ -158,20 +158,14 @@ public class kelondroRow {
}
public Entry(byte[] rowinstance) {
if (rowinstance.length == objectsize) {
this.rowinstance = rowinstance;
} else {
this.rowinstance = new byte[objectsize];
int ll = Math.min(objectsize, rowinstance.length);
System.arraycopy(rowinstance, 0, this.rowinstance, 0, ll);
for (int i = ll; i < objectsize; i++) this.rowinstance[i] = 0;
}
this(rowinstance, 0, rowinstance.length);
}
public Entry(byte[] rowinstance, int start, int length) {
this.rowinstance = new byte[length];
System.arraycopy(rowinstance, start, this.rowinstance, 0, length);
for (int i = rowinstance.length; i < objectsize; i++) this.rowinstance[i] = 0;
this.rowinstance = new byte[objectsize];
int ll = Math.min(objectsize, length);
System.arraycopy(rowinstance, start, this.rowinstance, 0, ll);
for (int i = ll; i < objectsize; i++) this.rowinstance[i] = 0;
}
public Entry(byte[][] cols) {
@ -432,7 +426,7 @@ public class kelondroRow {
public String toString() {
return toPropertyForm(true, false, false);
}
}
public final class EntryIndex extends Entry {

@ -86,7 +86,7 @@ public class kelondroRowCollection {
int chunkcachelength = exportedCollectionRowinstance.length - exportOverheadSize;
kelondroRow.Entry exportedCollection = exportRow(chunkcachelength).newEntry(exportedCollectionRowinstance);
this.chunkcount = (int) exportedCollection.getColLong(exp_chunkcount);
assert (this.chunkcount <= chunkcachelength / rowdef.objectsize) : "chunkcount = " + this.chunkcount + ", chunkcachelength = " + chunkcachelength + ", rowdef.objectsize = " + rowdef.objectsize;
//assert (this.chunkcount <= chunkcachelength / rowdef.objectsize) : "chunkcount = " + this.chunkcount + ", chunkcachelength = " + chunkcachelength + ", rowdef.objectsize = " + rowdef.objectsize;
if ((this.chunkcount > chunkcachelength / rowdef.objectsize)) {
serverLog.logWarning("RowCollection", "corrected wrong chunkcount; chunkcount = " + this.chunkcount + ", chunkcachelength = " + chunkcachelength + ", rowdef.objectsize = " + rowdef.objectsize);
this.chunkcount = chunkcachelength / rowdef.objectsize; // patch problem
@ -137,6 +137,7 @@ public class kelondroRowCollection {
kelondroRow row = exportRow(chunkcache.length);
kelondroRow.Entry entry = row.newEntry();
assert (sortBound <= chunkcount) : "sortBound = " + sortBound + ", chunkcount = " + chunkcount;
assert (this.chunkcount <= chunkcache.length / rowdef.objectsize) : "chunkcount = " + this.chunkcount + ", chunkcache.length = " + chunkcache.length + ", rowdef.objectsize = " + rowdef.objectsize;
entry.setCol(exp_chunkcount, this.chunkcount);
entry.setCol(exp_last_read, daysSince2000(this.lastTimeRead));
entry.setCol(exp_last_wrote, daysSince2000(this.lastTimeWrote));
@ -183,6 +184,9 @@ public class kelondroRowCollection {
public final kelondroRow.Entry get(int index) {
assert (index >= 0) : "get: access with index " + index + " is below zero";
assert (index < chunkcount) : "get: access with index " + index + " is above chunkcount " + chunkcount + "; sortBound = " + sortBound;
assert (index * rowdef.objectsize < chunkcache.length);
if (index >= chunkcount) return null;
if (index * rowdef.objectsize() >= chunkcache.length) return null;
byte[] a = new byte[rowdef.objectsize()];
synchronized (chunkcache) {
System.arraycopy(chunkcache, index * rowdef.objectsize(), a, 0, rowdef.objectsize());
@ -198,6 +202,8 @@ public class kelondroRowCollection {
public final void set(int index, byte[] a, int astart, int alength) {
assert (index >= 0) : "get: access with index " + index + " is below zero";
assert (index < chunkcount) : "get: access with index " + index + " is above chunkcount " + chunkcount;
assert (!(bugappearance(a, astart, alength))) : "a = " + serverLog.arrayList(a, astart, alength);
if (bugappearance(a, astart, alength)) return; // TODO: this is temporary; remote peers may still submit bad entries
int l = Math.min(rowdef.objectsize(), Math.min(alength, a.length - astart));
synchronized (chunkcache) {
System.arraycopy(a, astart, chunkcache, index * rowdef.objectsize(), l);
@ -223,6 +229,8 @@ public class kelondroRowCollection {
assert (!(serverLog.allZero(a, astart, alength))) : "a = " + serverLog.arrayList(a, astart, alength);
assert (alength > 0);
assert (astart + alength <= a.length);
assert (!(bugappearance(a, astart, alength))) : "a = " + serverLog.arrayList(a, astart, alength);
if (bugappearance(a, astart, alength)) return; // TODO: this is temporary; remote peers may still submit bad entries
int l = Math.min(rowdef.objectsize(), Math.min(alength, a.length - astart));
synchronized (chunkcache) {
ensureSize(chunkcount + 1);
@ -232,6 +240,18 @@ public class kelondroRowCollection {
this.lastTimeWrote = System.currentTimeMillis();
}
private static boolean bugappearance(byte[] a, int astart, int alength) {
// check strange appearances of '@[B', which is not a b64-value or any other hash fragment
if (astart + 3 > alength) return false;
loop: for (int i = astart; i <= alength - 3; i++) {
if (a[i ] != 64) continue loop;
if (a[i + 1] != 91) continue loop;
if (a[i + 2] != 66) continue loop;
return true;
}
return false;
}
public final void addAll(kelondroRowCollection c) {
assert(rowdef.objectsize() >= c.rowdef.objectsize());
synchronized(chunkcache) {

@ -235,19 +235,16 @@ public final class plasmaWordIndex implements indexRI {
}
private void flushCacheSome(indexRAMRI ram, boolean busy) {
int flushCount;
if (ram.size() > ram.getMaxWordCount()) {
flushCount = ram.size() + 100 - ram.getMaxWordCount();
} else {
flushCount = (busy) ? ram.size() / busyDivisor : ram.size() / idleDivisor;
if (flushCount > 100) flushCount = 100;
if (flushCount < 1) flushCount = Math.min(1, ram.size());
}
int flushCount = (busy) ? ram.size() / busyDivisor : ram.size() / idleDivisor;
if (flushCount > 100) flushCount = 100;
if (flushCount < 1) flushCount = Math.min(1, ram.size());
flushCache(ram, flushCount);
while (ram.maxURLinCache() > ((useCollectionIndex) ? 1024 : 64)) flushCache(ram, 1);
}
private void flushCache(indexRAMRI ram, int count) {
if (count <= 0) return;
if (count > 1000) count = 1000;
busyCacheFlush = true;
String wordHash;
//System.out.println("DEBUG-Started flush of " + count + " entries from RAM to DB");
@ -274,7 +271,7 @@ public final class plasmaWordIndex implements indexRI {
}
// pause to next loop to give other processes a chance to use IO
try {this.wait(8);} catch (InterruptedException e) {}
//try {this.wait(8);} catch (InterruptedException e) {}
}
}
//System.out.println("DEBUG-Finished flush of " + count + " entries from RAM to DB in " + (System.currentTimeMillis() - start) + " milliseconds");

Loading…
Cancel
Save