From f4cfd19835edf79dc8337de5329335f6f4b3cb1a Mon Sep 17 00:00:00 2001 From: orbiter Date: Tue, 27 Feb 2007 13:01:22 +0000 Subject: [PATCH] second Generation of collection R/W head path optimization: - permanent cache flush is switched off. The optimized cache flush works better if it is a large number of collections that is flushed together - the flush size can be configured instead the flush divisor. There is only one size for all flushes - collection records that shall be removed during collection transition (jump from one collection file to another) are now not really removed but only marked in RAM. add-operations to the collection use these marked collection spaces - index bulk write operations are now separated for each file of a kelondroFlex git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@3414 6c8d7289-2bf4-0310-a012-ef5d649a1542 --- htroot/PerformanceQueues_p.html | 16 +- htroot/PerformanceQueues_p.java | 14 +- source/de/anomic/index/indexCachedRI.java | 29 +- source/de/anomic/kelondro/kelondroArray.java | 4 +- .../kelondro/kelondroCollectionIndex.java | 291 ++++++++++-------- .../kelondro/kelondroFixedWidthArray.java | 74 +++-- .../de/anomic/kelondro/kelondroFlexTable.java | 4 +- .../kelondro/kelondroFlexWidthArray.java | 19 +- .../de/anomic/plasma/plasmaSwitchboard.java | 5 +- source/de/anomic/plasma/plasmaWordIndex.java | 28 +- yacy.init | 7 +- 11 files changed, 274 insertions(+), 217 deletions(-) diff --git a/htroot/PerformanceQueues_p.html b/htroot/PerformanceQueues_p.html index 53d41b5c9..1080ae0d0 100644 --- a/htroot/PerformanceQueues_p.html +++ b/htroot/PerformanceQueues_p.html @@ -143,20 +143,12 @@ - word-flush idle divisor: + word flush size: - + - - The word flush divisor is applied when an indexing loop is executed. - The divisor is used to divide the number of words in the cache; the result is the number of words that are flushed. - The idleDivisor is used if the indexing queue is empty, the busyDivisor is used if the indexing queue contains at least one element. - - - - word-flush busy divisor: - - + + The word flush size is applied when an indexing loop is executed, and the cache size is exceeded. diff --git a/htroot/PerformanceQueues_p.java b/htroot/PerformanceQueues_p.java index 731e6cc38..0107bd20b 100644 --- a/htroot/PerformanceQueues_p.java +++ b/htroot/PerformanceQueues_p.java @@ -190,14 +190,9 @@ public class PerformanceQueues_p { int wordCacheInitCount = post.getInt("wordCacheInitCount", 30000); switchboard.setConfig("wordCacheInitCount", Integer.toString(wordCacheInitCount)); - int maxWaitingWordFlush = post.getInt("maxWaitingWordFlush", 180); - switchboard.setConfig("maxWaitingWordFlush", Integer.toString(maxWaitingWordFlush)); - - int wordFlushIdleDivisor = post.getInt("wordFlushIdleDivisor", 420); - switchboard.setConfig("wordFlushIdleDivisor", Integer.toString(wordFlushIdleDivisor)); - int wordFlushBusyDivisor = post.getInt("wordFlushBusyDivisor", 5000); - switchboard.setConfig("wordFlushBusyDivisor", Integer.toString(wordFlushBusyDivisor)); - switchboard.wordIndex.setWordFlushDivisor(wordFlushIdleDivisor, wordFlushBusyDivisor); + int flushsize = post.getInt("wordFlushSize", 2000); + switchboard.setConfig("wordFlushSize", Integer.toString(flushsize)); + switchboard.wordIndex.setWordFlushSize(flushsize); } if ((post != null) && (post.containsKey("poolConfig"))) { @@ -287,8 +282,7 @@ public class PerformanceQueues_p { prop.put("wordOutCacheMaxCount", switchboard.getConfigLong("wordCacheMaxCount", 20000)); prop.put("wordInCacheMaxCount", switchboard.getConfigLong("indexDistribution.dhtReceiptLimit", 1000)); prop.put("wordCacheInitCount", switchboard.getConfigLong("wordCacheInitCount", 30000)); - prop.put("wordFlushIdleDivisor", switchboard.getConfigLong("wordFlushIdleDivisor", 420)); - prop.put("wordFlushBusyDivisor", switchboard.getConfigLong("wordFlushBusyDivisor", 5000)); + prop.put("wordFlushSize", switchboard.getConfigLong("wordFlushSize", 2000)); prop.put("onlineCautionDelay", switchboard.getConfig("onlineCautionDelay", "30000")); prop.put("onlineCautionDelayCurrent", System.currentTimeMillis() - switchboard.proxyLastAccess); diff --git a/source/de/anomic/index/indexCachedRI.java b/source/de/anomic/index/indexCachedRI.java index 3e366d32d..12198b445 100644 --- a/source/de/anomic/index/indexCachedRI.java +++ b/source/de/anomic/index/indexCachedRI.java @@ -40,15 +40,13 @@ import de.anomic.kelondro.kelondroRow; import de.anomic.server.logging.serverLog; public class indexCachedRI implements indexRI { - - private static final int flushsize = 1000; private kelondroRow payloadrow; private kelondroOrder indexOrder; private indexRAMRI riExtern, riIntern; private indexCollectionRI backend; public boolean busyCacheFlush; // shows if a cache flush is currently performed - private int idleDivisor, busyDivisor; + private int flushsize; public indexCachedRI(indexRAMRI riExtern, indexRAMRI riIntern, indexCollectionRI backend, kelondroOrder payloadorder, kelondroRow payloadrow, serverLog log) { this.riExtern = riExtern; @@ -57,8 +55,7 @@ public class indexCachedRI implements indexRI { this.indexOrder = payloadorder; this.payloadrow = payloadrow; this.busyCacheFlush = false; - this.busyDivisor = 5000; - this.idleDivisor = 420; + this.flushsize = 2000; } public kelondroRow payloadrow() { @@ -69,9 +66,8 @@ public class indexCachedRI implements indexRI { return 1024 * 1024; } - public void setWordFlushDivisor(int idleDivisor, int busyDivisor) { - this.idleDivisor = idleDivisor; - this.busyDivisor = busyDivisor; + public void setWordFlushSize(int flushsize) { + this.flushsize = flushsize; } public void flushControl() { @@ -102,22 +98,19 @@ public class indexCachedRI implements indexRI { } } - public void flushCacheSome(boolean busy) { - flushCacheSome(riExtern, busy); - flushCacheSome(riIntern, busy); + public void flushCacheSome() { + flushCacheSome(riExtern); + flushCacheSome(riIntern); } - private void flushCacheSome(indexRAMRI ram, boolean busy) { - int flushCount = (busy) ? ram.size() / busyDivisor : ram.size() / idleDivisor; - if (flushCount > 100) flushCount = 100; - if (flushCount < 1) flushCount = Math.min(1, ram.size()); - flushCache(ram, flushCount); - while (ram.maxURLinCache() > 1024) flushCache(ram, 1); + private void flushCacheSome(indexRAMRI ram) { + flushCache(ram, flushsize); + while (ram.maxURLinCache() > 2048) flushCache(ram, 1); } private void flushCache(indexRAMRI ram, int count) { if (count <= 0) return; - if (count > 1000) count = 1000; + if (count > 5000) count = 5000; busyCacheFlush = true; String wordHash; ArrayList containerList = new ArrayList(); diff --git a/source/de/anomic/kelondro/kelondroArray.java b/source/de/anomic/kelondro/kelondroArray.java index 16994c1a1..298801119 100644 --- a/source/de/anomic/kelondro/kelondroArray.java +++ b/source/de/anomic/kelondro/kelondroArray.java @@ -38,8 +38,10 @@ public interface kelondroArray { public int add(kelondroRow.Entry rowinstance) throws IOException; - public void remove(int index) throws IOException; + public void remove(int index, boolean marked) throws IOException; + public void resolveMarkedRemoved() throws IOException; + public void print() throws IOException; } diff --git a/source/de/anomic/kelondro/kelondroCollectionIndex.java b/source/de/anomic/kelondro/kelondroCollectionIndex.java index dd4add483..6f9a20349 100644 --- a/source/de/anomic/kelondro/kelondroCollectionIndex.java +++ b/source/de/anomic/kelondro/kelondroCollectionIndex.java @@ -41,8 +41,10 @@ import de.anomic.server.logging.serverLog; public class kelondroCollectionIndex { + private static final int serialNumber = 0; + protected kelondroIndex index; - int keylength; + private int keylength; private File path; private String filenameStub; private int loadfactor; @@ -235,6 +237,13 @@ public class kelondroCollectionIndex { return array; } + private void arrayResolveRemoved() throws IOException { + Iterator i = arrays.values().iterator(); + while (i.hasNext()) { + ((kelondroFixedWidthArray) i.next()).resolveMarkedRemoved(); + } + } + private int arrayCapacity(int arrayCounter) { int load = this.loadfactor; for (int i = 0; i < arrayCounter; i++) load = load * this.loadfactor; @@ -269,11 +278,11 @@ public class kelondroCollectionIndex { return 2 * m * this.payloadrow.objectsize; } - private kelondroRow.Entry putnew(byte[] key, kelondroRowCollection collection) throws IOException { + private kelondroRow.Entry array_new(byte[] key, kelondroRowCollection collection) throws IOException { // the collection is new int newPartitionNumber = arrayIndex(collection.size()); kelondroRow.Entry indexrow = index.row().newEntry(); - kelondroFixedWidthArray array = getArray(newPartitionNumber, 0, this.payloadrow.objectsize()); + kelondroFixedWidthArray array = getArray(newPartitionNumber, serialNumber, this.payloadrow.objectsize()); // define row kelondroRow.Entry arrayEntry = array.row().newEntry(); @@ -293,64 +302,67 @@ public class kelondroCollectionIndex { indexrow.setCol(idx_col_lastread, kelondroRowCollection.daysSince2000(System.currentTimeMillis())); indexrow.setCol(idx_col_lastwrote, kelondroRowCollection.daysSince2000(System.currentTimeMillis())); - // after calling this method there mus be a index.addUnique(indexrow); + // after calling this method there must be an index.addUnique(indexrow); return indexrow; } - private void putreplace( + private void array_remove( + int oldPartitionNumber, int serialNumber, int chunkSize, + int oldRownumber) throws IOException { + // we need a new slot, that means we must first delete the old entry + // find array file + kelondroFixedWidthArray array = getArray(oldPartitionNumber, serialNumber, chunkSize); + + // delete old entry + array.remove(oldRownumber, true); + } + + private void array_add( byte[] key, kelondroRowCollection collection, kelondroRow.Entry indexrow, - int serialNumber, int chunkSize, - int partitionNumber, int rownumber) throws IOException { - // we don't need a new slot, just write collection into the old one + int partitionNumber, int serialNumber, int chunkSize) throws IOException { - // find array file + // write a new entry in the other array kelondroFixedWidthArray array = getArray(partitionNumber, serialNumber, chunkSize); - + // define new row kelondroRow.Entry arrayEntry = array.row().newEntry(); arrayEntry.setCol(0, key); arrayEntry.setCol(1, collection.exportCollection()); - - // overwrite entry in this array - array.set(rownumber, arrayEntry); - - // update the index entry + + // write a new entry in this array + int rowNumber = array.add(arrayEntry); + + // store the new row number in the index indexrow.setCol(idx_col_chunkcount, collection.size()); indexrow.setCol(idx_col_clusteridx, (byte) partitionNumber); + indexrow.setCol(idx_col_indexpos, (long) rowNumber); indexrow.setCol(idx_col_lastwrote, kelondroRowCollection.daysSince2000(System.currentTimeMillis())); - - // after calling this method there mus be a index.put(indexrow); + + // after calling this method there must be a index.put(indexrow); } - private void puttransit( + private void array_replace( byte[] key, kelondroRowCollection collection, kelondroRow.Entry indexrow, - int serialNumber, int chunkSize, - int oldPartitionNumber, int oldRownumber, - int newPartitionNumber) throws IOException { - // we need a new slot, that means we must first delete the old entry - // find array file - kelondroFixedWidthArray array = getArray(oldPartitionNumber, serialNumber, chunkSize); + int partitionNumber, int serialNumber, int chunkSize, + int rowNumber) throws IOException { + // we don't need a new slot, just write collection into the old one - // delete old entry - array.remove(oldRownumber); + // find array file + kelondroFixedWidthArray array = getArray(partitionNumber, serialNumber, chunkSize); - // write a new entry in the other array - array = getArray(newPartitionNumber, 0, this.payloadrow.objectsize()); - - // define row + // define new row kelondroRow.Entry arrayEntry = array.row().newEntry(); arrayEntry.setCol(0, key); arrayEntry.setCol(1, collection.exportCollection()); - - // write a new entry in this array - int newRowNumber = array.add(arrayEntry); - - // store the new row number in the index + + // overwrite entry in this array + array.set(rowNumber, arrayEntry); + + // update the index entry indexrow.setCol(idx_col_chunkcount, collection.size()); - indexrow.setCol(idx_col_clusteridx, (byte) newPartitionNumber); - indexrow.setCol(idx_col_indexpos, (long) newRowNumber); + indexrow.setCol(idx_col_clusteridx, (byte) partitionNumber); indexrow.setCol(idx_col_lastwrote, kelondroRowCollection.daysSince2000(System.currentTimeMillis())); - + // after calling this method there mus be a index.put(indexrow); } @@ -362,7 +374,7 @@ public class kelondroCollectionIndex { if (indexrow == null) { // create new row and index entry if ((collection != null) && (collection.size() > 0)) { - indexrow = putnew(key, collection); // modifies indexrow + indexrow = array_new(key, collection); // modifies indexrow index.addUnique(indexrow); } return; @@ -375,12 +387,11 @@ public class kelondroCollectionIndex { int oldrownumber = (int) indexrow.getColLong(idx_col_indexpos); // index of the entry in array int oldPartitionNumber = (int) indexrow.getColByte(idx_col_clusteridx); // points to array file assert (oldPartitionNumber >= arrayIndex(oldchunkcount)); - int oldSerialNumber = 0; if ((collection == null) || (collection.size() == 0)) { // delete the index entry and the array - kelondroFixedWidthArray array = getArray(oldPartitionNumber, oldSerialNumber, oldchunksize); - array.remove(oldrownumber); + kelondroFixedWidthArray array = getArray(oldPartitionNumber, serialNumber, oldchunksize); + array.remove(oldrownumber ,false); index.remove(key); return; } @@ -389,17 +400,19 @@ public class kelondroCollectionIndex { // see if we need new space or if we can overwrite the old space if (oldPartitionNumber == newPartitionNumber) { - putreplace( + array_replace( key, collection, indexrow, - oldSerialNumber, this.payloadrow.objectsize(), - oldPartitionNumber, oldrownumber); // modifies indexrow + oldPartitionNumber, serialNumber, this.payloadrow.objectsize(), + oldrownumber); // modifies indexrow } else { - puttransit( + array_remove( + oldPartitionNumber, serialNumber, this.payloadrow.objectsize(), + oldrownumber); + array_add( key, collection, indexrow, - oldSerialNumber, this.payloadrow.objectsize(), - oldPartitionNumber, oldrownumber, - newPartitionNumber); // modifies indexrow + newPartitionNumber, serialNumber, this.payloadrow.objectsize()); // modifies indexrow } + arrayResolveRemoved(); // remove all to-be-removed marked entries index.put(indexrow); // write modified indexrow } @@ -434,23 +447,68 @@ public class kelondroCollectionIndex { // now iterate through the container lists and execute merges // this is done in such a way, that there is a optimized path for the R/W head - // write new containers - i = newContainer.iterator(); - Object[] record; - while (i.hasNext()) { - record = (Object[]) i.next(); // {byte[], indexContainer} - mergeNew((byte[]) record[0], (indexContainer) record[1]); - } - // merge existing containers i = existingContainer.iterator(); + Object[] record; ArrayList indexrows = new ArrayList(); + kelondroRowCollection collection; while (i.hasNext()) { record = (Object[]) i.next(); // {byte[], indexContainer, kelondroRow.Entry} + + // merge with the old collection + key = (byte[]) record[0]; + collection = (kelondroRowCollection) record[1]; indexrow = (kelondroRow.Entry) record[2]; - mergeExisting((byte[]) record[0], (indexContainer) record[1], indexrow); // modifies indexrow + + // read old information + int oldchunksize = (int) indexrow.getColLong(idx_col_chunksize); // needed only for migration + int oldchunkcount = (int) indexrow.getColLong(idx_col_chunkcount); // the number if rows in the collection + int oldrownumber = (int) indexrow.getColLong(idx_col_indexpos); // index of the entry in array + int oldPartitionNumber = (int) indexrow.getColByte(idx_col_clusteridx); // points to array file + assert (oldPartitionNumber >= arrayIndex(oldchunkcount)); + int oldSerialNumber = 0; + + // load the old collection and join it + kelondroRowSet oldcollection = getwithparams(indexrow, oldchunksize, oldchunkcount, oldPartitionNumber, oldrownumber, oldSerialNumber, false); + + // join with new collection + oldcollection.addAllUnique(collection); + oldcollection.shape(); + oldcollection.uniq(); // FIXME: not clear if it would be better to insert the collection with put to avoid double-entries + oldcollection.trim(); + collection = oldcollection; + + int newPartitionNumber = arrayIndex(collection.size()); + + // see if we need new space or if we can overwrite the old space + if (oldPartitionNumber == newPartitionNumber) { + array_replace( + key, collection, indexrow, + oldPartitionNumber, oldSerialNumber, this.payloadrow.objectsize(), + oldrownumber); // modifies indexrow + } else { + array_remove( + oldPartitionNumber, oldSerialNumber, this.payloadrow.objectsize(), + oldrownumber); + array_add( + key, collection, indexrow, + newPartitionNumber, oldSerialNumber, this.payloadrow.objectsize()); // modifies indexrow + } + arrayResolveRemoved(); // remove all to-be-removed marked entries indexrows.add(indexrow); // indexrows are collected and written later as block } + + // write new containers + i = newContainer.iterator(); + while (i.hasNext()) { + record = (Object[]) i.next(); // {byte[], indexContainer} + key = (byte[]) record[0]; + collection = (indexContainer) record[1]; + indexrow = array_new(key, collection); // modifies indexrow + index.addUnique(indexrow); // write modified indexrow + } + + // write index entries index.putMultiple(indexrows, new Date()); // write modified indexrows in optimized manner } @@ -461,60 +519,52 @@ public class kelondroCollectionIndex { // first find an old entry, if one exists kelondroRow.Entry indexrow = index.get(key); if (indexrow == null) { - mergeNew(key, container); + indexrow = array_new(key, container); // modifies indexrow + index.addUnique(indexrow); // write modified indexrow } else { - mergeExisting(key, container, indexrow); // modifies indexrow + // merge with the old collection + // attention! this modifies the indexrow entry which must be written with index.put(indexrow) afterwards! + kelondroRowCollection collection = (kelondroRowCollection) container; + + // read old information + int oldchunksize = (int) indexrow.getColLong(idx_col_chunksize); // needed only for migration + int oldchunkcount = (int) indexrow.getColLong(idx_col_chunkcount); // the number if rows in the collection + int oldrownumber = (int) indexrow.getColLong(idx_col_indexpos); // index of the entry in array + int oldPartitionNumber = (int) indexrow.getColByte(idx_col_clusteridx); // points to array file + assert (oldPartitionNumber >= arrayIndex(oldchunkcount)); + int oldSerialNumber = 0; + + // load the old collection and join it + kelondroRowSet oldcollection = getwithparams(indexrow, oldchunksize, oldchunkcount, oldPartitionNumber, oldrownumber, oldSerialNumber, false); + + // join with new collection + oldcollection.addAllUnique(collection); + oldcollection.shape(); + oldcollection.uniq(); // FIXME: not clear if it would be better to insert the collection with put to avoid double-entries + oldcollection.trim(); + collection = oldcollection; + + int newPartitionNumber = arrayIndex(collection.size()); + + // see if we need new space or if we can overwrite the old space + if (oldPartitionNumber == newPartitionNumber) { + array_replace( + key, collection, indexrow, + oldPartitionNumber, oldSerialNumber, this.payloadrow.objectsize(), + oldrownumber); // modifies indexrow + } else { + array_remove( + oldPartitionNumber, oldSerialNumber, this.payloadrow.objectsize(), + oldrownumber); + array_add( + key, collection, indexrow, + newPartitionNumber, oldSerialNumber, this.payloadrow.objectsize()); // modifies indexrow + } + arrayResolveRemoved(); // remove all to-be-removed marked entries index.put(indexrow); // write modified indexrow } } - private void mergeNew(byte[] key, kelondroRowCollection collection) throws IOException, kelondroOutOfLimitsException { - // create new row and index entry - - kelondroRow.Entry indexrow = putnew(key, collection); // modifies indexrow - index.addUnique(indexrow); // write modified indexrow - } - - private void mergeExisting(byte[] key, kelondroRowCollection collection, kelondroRow.Entry indexrow) throws IOException, kelondroOutOfLimitsException { - // merge with the old collection - // attention! this modifies the indexrow entry which must be written with index.put(indexrow) afterwards! - - // read old information - int oldchunksize = (int) indexrow.getColLong(idx_col_chunksize); // needed only for migration - int oldchunkcount = (int) indexrow.getColLong(idx_col_chunkcount); // the number if rows in the collection - int oldrownumber = (int) indexrow.getColLong(idx_col_indexpos); // index of the entry in array - int oldPartitionNumber = (int) indexrow.getColByte(idx_col_clusteridx); // points to array file - assert (oldPartitionNumber >= arrayIndex(oldchunkcount)); - int oldSerialNumber = 0; - - // load the old collection and join it - kelondroRowSet oldcollection = getwithparams(indexrow, oldchunksize, oldchunkcount, oldPartitionNumber, oldrownumber, oldSerialNumber, false); - - // join with new collection - oldcollection.addAllUnique(collection); - oldcollection.shape(); - oldcollection.uniq(); // FIXME: not clear if it would be better to insert the collection with put to avoid double-entries - oldcollection.trim(); - collection = oldcollection; - - int newPartitionNumber = arrayIndex(collection.size()); - - // see if we need new space or if we can overwrite the old space - if (oldPartitionNumber == newPartitionNumber) { - putreplace( - key, collection, indexrow, - oldSerialNumber, this.payloadrow.objectsize(), - oldPartitionNumber, oldrownumber); // modifies indexrow - } else { - puttransit( - key, collection, indexrow, - oldSerialNumber, this.payloadrow.objectsize(), - oldPartitionNumber, oldrownumber, - newPartitionNumber); // modifies indexrow - } - - } - public synchronized int remove(byte[] key, Set removekeys) throws IOException, kelondroOutOfLimitsException { if ((removekeys == null) || (removekeys.size() == 0)) return 0; @@ -531,12 +581,11 @@ public class kelondroCollectionIndex { int oldrownumber = (int) indexrow.getColLong(idx_col_indexpos); // index of the entry in array int oldPartitionNumber = (int) indexrow.getColByte(idx_col_clusteridx); // points to array file assert (oldPartitionNumber >= arrayIndex(oldchunkcount)); - int oldSerialNumber = 0; int removed = 0; assert (removekeys != null); // load the old collection and remove keys - kelondroRowSet oldcollection = getwithparams(indexrow, oldchunksize, oldchunkcount, oldPartitionNumber, oldrownumber, oldSerialNumber, false); + kelondroRowSet oldcollection = getwithparams(indexrow, oldchunksize, oldchunkcount, oldPartitionNumber, oldrownumber, serialNumber, false); // remove the keys from the set Iterator i = removekeys.iterator(); @@ -551,8 +600,8 @@ public class kelondroCollectionIndex { if (oldcollection.size() == 0) { // delete the index entry and the array - kelondroFixedWidthArray array = getArray(oldPartitionNumber, oldSerialNumber, oldchunksize); - array.remove(oldrownumber); + kelondroFixedWidthArray array = getArray(oldPartitionNumber, serialNumber, oldchunksize); + array.remove(oldrownumber, false); index.remove(key); return removed; } @@ -561,17 +610,19 @@ public class kelondroCollectionIndex { // see if we need new space or if we can overwrite the old space if (oldPartitionNumber == newPartitionNumber) { - putreplace( + array_replace( key, oldcollection, indexrow, - oldSerialNumber, this.payloadrow.objectsize(), - oldPartitionNumber, oldrownumber); // modifies indexrow + oldPartitionNumber, serialNumber, this.payloadrow.objectsize(), + oldrownumber); // modifies indexrow } else { - puttransit( + array_remove( + oldPartitionNumber, serialNumber, this.payloadrow.objectsize(), + oldrownumber); + array_add( key, oldcollection, indexrow, - oldSerialNumber, this.payloadrow.objectsize(), - oldPartitionNumber, oldrownumber, - newPartitionNumber); // modifies indexrow + newPartitionNumber, serialNumber, this.payloadrow.objectsize()); // modifies indexrow } + arrayResolveRemoved(); // remove all to-be-removed marked entries index.put(indexrow); // write modified indexrow return removed; } @@ -630,7 +681,7 @@ public class kelondroCollectionIndex { if (!(index.row().objectOrder.wellformed(arraykey))) { // cleanup for a bad bug that corrupted the database index.remove(indexkey); // the RowCollection must be considered lost - array.remove(rownumber); // loose the RowCollection (we don't know how much is lost) + array.remove(rownumber, false); // loose the RowCollection (we don't know how much is lost) serverLog.logSevere("kelondroCollectionIndex." + array.filename, "lost a RowCollection because of a bad arraykey"); return new kelondroRowSet(this.payloadrow, 0); } @@ -658,7 +709,7 @@ public class kelondroCollectionIndex { index.put(indexrow); array.logFailure("INCONSISTENCY in " + arrayFile(this.path, this.filenameStub, this.loadfactor, chunksize, clusteridx, serialnumber).toString() + ": array has different chunkcount than index: index = " + chunkcount + ", array = " + chunkcountInArray + "; the index has been auto-fixed"); } - if (remove) array.remove(rownumber); // index is removed in calling method + if (remove) array.remove(rownumber, false); // index is removed in calling method return collection; } diff --git a/source/de/anomic/kelondro/kelondroFixedWidthArray.java b/source/de/anomic/kelondro/kelondroFixedWidthArray.java index 464829fca..e07f20007 100644 --- a/source/de/anomic/kelondro/kelondroFixedWidthArray.java +++ b/source/de/anomic/kelondro/kelondroFixedWidthArray.java @@ -48,6 +48,8 @@ package de.anomic.kelondro; import java.io.File; import java.io.IOException; +import java.util.Iterator; +import java.util.TreeSet; public class kelondroFixedWidthArray extends kelondroRecords implements kelondroArray { @@ -55,6 +57,8 @@ public class kelondroFixedWidthArray extends kelondroRecords implements kelondro private static short thisOHBytes = 0; // our record definition does not need extra bytes private static short thisOHHandles = 0; // and no handles + private TreeSet markedRemoved; // a set of Integer indexes of removed records (only temporary) + public kelondroFixedWidthArray(File file, kelondroRow rowdef, int intprops) throws IOException { // this creates a new array super(file, 0, 0, thisOHBytes, thisOHHandles, rowdef, intprops, rowdef.columns() /* txtProps */, 80 /* txtPropWidth */); @@ -67,6 +71,7 @@ public class kelondroFixedWidthArray extends kelondroRecords implements kelondro try {super.setText(i, rowdef.column(i).toString().getBytes());} catch (IOException e) {} } } + markedRemoved = new TreeSet(); } public kelondroFixedWidthArray(kelondroRA ra, kelondroRow rowdef, int intprops) throws IOException { @@ -79,6 +84,7 @@ public class kelondroFixedWidthArray extends kelondroRecords implements kelondro for (int i = 0; i < rowdef.columns(); i++) { try {super.setText(i, rowdef.column(i).toString().getBytes());} catch (IOException e) {} } + markedRemoved = new TreeSet(); } public static kelondroFixedWidthArray open(File file, kelondroRow rowdef, int intprops) { @@ -123,26 +129,51 @@ public class kelondroFixedWidthArray extends kelondroRecords implements kelondro } public synchronized int add(kelondroRow.Entry rowentry) throws IOException { - - Node n = newNode(rowentry.bytes()); - n.commit(CP_NONE); - - return n.handle().hashCode(); + // adds a new rowentry, but re-uses a previously as-deleted marked entry + if (markedRemoved.size() == 0) { + // no records there to be re-used + Node n = newNode(rowentry.bytes()); + n.commit(CP_NONE); + return n.handle().hashCode(); + } else { + // re-use a removed record + Integer index = (Integer) markedRemoved.first(); + markedRemoved.remove(index); + set(index.intValue(), rowentry); + return index.intValue(); + } } + + public synchronized void remove(int index, boolean marked) throws IOException { + assert (index < (super.free() + super.size())) : "remove: index " + index + " out of bounds " + (super.free() + super.size()); + + if (marked) { + // does not remove directly, but sets only a mark that a record is to be deleted + // this record can be re-used with add + markedRemoved.add(new Integer(index)); + } else { + + // get the node at position index + Handle h = new Handle(index); + Node n = getNode(h, false); - public synchronized void remove(int index) throws IOException { - if (index >= (super.free() + super.size())) throw new IOException("remove: index " + index + " out of bounds " + (super.free() + super.size())); - - // get the node at position index - Handle h = new Handle(index); - Node n = getNode(h, false); - - // erase the row - n.setValueRow(null); - n.commit(CP_NONE); + // erase the row + n.setValueRow(null); + n.commit(CP_NONE); - // mark row as deleted so it can be re-used - deleteNode(h); + // mark row as deleted so it can be re-used + deleteNode(h); + } + } + + public synchronized void resolveMarkedRemoved() throws IOException { + Iterator i = markedRemoved.iterator(); + Integer index; + while (i.hasNext()) { + index = (Integer) i.next(); + remove(index.intValue(), false); + } + markedRemoved.clear(); } public void print() throws IOException { @@ -181,14 +212,14 @@ public class kelondroFixedWidthArray extends kelondroRecords implements kelondro k = new kelondroFixedWidthArray(f, rowdef, 6); k.add(k.row().newEntry(new byte[][]{"a".getBytes(), "xxxx".getBytes()})); k.add(k.row().newEntry(new byte[][]{"b".getBytes(), "xxxx".getBytes()})); - k.remove(0); + k.remove(0, false); k.add(k.row().newEntry(new byte[][]{"c".getBytes(), "xxxx".getBytes()})); k.add(k.row().newEntry(new byte[][]{"d".getBytes(), "xxxx".getBytes()})); k.add(k.row().newEntry(new byte[][]{"e".getBytes(), "xxxx".getBytes()})); k.add(k.row().newEntry(new byte[][]{"f".getBytes(), "xxxx".getBytes()})); - k.remove(0); - k.remove(1); + k.remove(0, false); + k.remove(1, false); k.print(); k.print(true); @@ -203,9 +234,10 @@ public class kelondroFixedWidthArray extends kelondroRecords implements kelondro k.add(k.row().newEntry(new byte[][]{(Integer.toString(i) + "-" + Integer.toString(j)).getBytes(), "xxxx".getBytes()})); } for (int j = 0; j < i; j++) { - k.remove(j); + k.remove(j, true); } } + k.resolveMarkedRemoved(); k.print(); k.print(true); k.close(); diff --git a/source/de/anomic/kelondro/kelondroFlexTable.java b/source/de/anomic/kelondro/kelondroFlexTable.java index 6ce428919..4cc4ed455 100644 --- a/source/de/anomic/kelondro/kelondroFlexTable.java +++ b/source/de/anomic/kelondro/kelondroFlexTable.java @@ -235,7 +235,7 @@ public class kelondroFlexTable extends kelondroFlexWidthArray implements kelondr if (i < 0) return null; kelondroRow.Entry r; r = super.get(i); - super.remove(i); + super.remove(i, false); return r; } @@ -244,7 +244,7 @@ public class kelondroFlexTable extends kelondroFlexWidthArray implements kelondr if (i < 0) return null; kelondroRow.Entry r; r = super.get(i); - super.remove(i); + super.remove(i, false); return r; } diff --git a/source/de/anomic/kelondro/kelondroFlexWidthArray.java b/source/de/anomic/kelondro/kelondroFlexWidthArray.java index 96fe56004..26c0d9005 100644 --- a/source/de/anomic/kelondro/kelondroFlexWidthArray.java +++ b/source/de/anomic/kelondro/kelondroFlexWidthArray.java @@ -268,12 +268,12 @@ public class kelondroFlexWidthArray implements kelondroArray { return p; } - public void remove(int index) throws IOException { + public void remove(int index, boolean marked) throws IOException { int r = 0; synchronized (col) { // remove only from the first column - col[0].remove(index); + col[0].remove(index, marked); r = r + col[r].row().columns(); // the other columns will be blanked out only @@ -284,6 +284,12 @@ public class kelondroFlexWidthArray implements kelondroArray { } } + public synchronized void resolveMarkedRemoved() throws IOException { + synchronized (col) { + col[0].resolveMarkedRemoved(); + } + } + public void print() throws IOException { System.out.println("PRINTOUT of table, length=" + size()); kelondroRow.Entry row; @@ -308,14 +314,14 @@ public class kelondroFlexWidthArray implements kelondroArray { kelondroFlexWidthArray k = kelondroFlexWidthArray.open(f, "flextest", rowdef); k.add(k.row().newEntry(new byte[][]{"a".getBytes(), "xxxx".getBytes()})); k.add(k.row().newEntry(new byte[][]{"b".getBytes(), "xxxx".getBytes()})); - k.remove(0); + k.remove(0, false); k.add(k.row().newEntry(new byte[][]{"c".getBytes(), "xxxx".getBytes()})); k.add(k.row().newEntry(new byte[][]{"d".getBytes(), "xxxx".getBytes()})); k.add(k.row().newEntry(new byte[][]{"e".getBytes(), "xxxx".getBytes()})); k.add(k.row().newEntry(new byte[][]{"f".getBytes(), "xxxx".getBytes()})); - k.remove(0); - k.remove(1); + k.remove(0, false); + k.remove(1, false); k.print(); k.col[0].print(true); @@ -335,10 +341,11 @@ public class kelondroFlexWidthArray implements kelondroArray { k.close(); k = kelondroFlexWidthArray.open(f, "flextest", rowdef); for (int j = 0; j < i; j++) { - k.remove(i*2 - j - 1); + k.remove(i*2 - j - 1, true); } k.close(); } + k.resolveMarkedRemoved(); k = kelondroFlexWidthArray.open(f, "flextest", rowdef); k.print(); k.col[0].print(true); diff --git a/source/de/anomic/plasma/plasmaSwitchboard.java b/source/de/anomic/plasma/plasmaSwitchboard.java index 0608614e6..d23ad82a0 100644 --- a/source/de/anomic/plasma/plasmaSwitchboard.java +++ b/source/de/anomic/plasma/plasmaSwitchboard.java @@ -1628,7 +1628,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser public void deQueueFreeMem() { // flush some entries from the RAM cache - wordIndex.flushCacheSome(false); + wordIndex.flushCacheSome(); // adopt maximum cache size to current size to prevent that further OutOfMemoryErrors occur int newMaxCount = Math.max(2000, Math.min((int) getConfigLong(WORDCACHE_MAX_COUNT, 20000), wordIndex.dhtOutCacheSize())); setConfig(WORDCACHE_MAX_COUNT, Integer.toString(newMaxCount)); @@ -1644,8 +1644,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser } // flush some entries from the RAM cache - // (new permanent cache flushing) - wordIndex.flushCacheSome(sbQueue.size() != 0); + if (sbQueue.size() == 0) wordIndex.flushCacheSome(); // permanent flushing only if we are not busy wordIndex.loadedURL.flushCacheSome(); boolean doneSomething = false; diff --git a/source/de/anomic/plasma/plasmaWordIndex.java b/source/de/anomic/plasma/plasmaWordIndex.java index 1d59f4cb6..1540cb0fe 100644 --- a/source/de/anomic/plasma/plasmaWordIndex.java +++ b/source/de/anomic/plasma/plasmaWordIndex.java @@ -55,13 +55,11 @@ import de.anomic.yacy.yacyDHTAction; public final class plasmaWordIndex implements indexRI { - private static final int flushsize = 2000; - private final kelondroOrder indexOrder = kelondroBase64Order.enhancedCoder; private final indexRAMRI dhtOutCache, dhtInCache; private final indexCollectionRI collections; // new database structure to replace AssortmentCluster and FileCluster public boolean busyCacheFlush; // shows if a cache flush is currently performed - private int idleDivisor, busyDivisor; + private int flushsize; public final plasmaCrawlLURL loadedURL; public plasmaWordIndex(File indexRoot, long rwibuffer, long lurlbuffer, long preloadTime, serverLog log) { @@ -80,8 +78,7 @@ public final class plasmaWordIndex implements indexRI { // performance settings busyCacheFlush = false; - this.busyDivisor = 5000; - this.idleDivisor = 420; + this.flushsize = 2000; } public int minMem() { @@ -128,9 +125,8 @@ public final class plasmaWordIndex implements indexRI { dhtInCache.setMaxWordCount(maxWords); } - public void setWordFlushDivisor(int idleDivisor, int busyDivisor) { - this.idleDivisor = idleDivisor; - this.busyDivisor = busyDivisor; + public void setWordFlushSize(int flushsize) { + this.flushsize = flushsize; } public void flushControl() { @@ -185,23 +181,19 @@ public final class plasmaWordIndex implements indexRI { } } - public void flushCacheSome(boolean busy) { - flushCacheSome(dhtOutCache, busy); - flushCacheSome(dhtInCache, busy); + public void flushCacheSome() { + flushCacheSome(dhtOutCache); + flushCacheSome(dhtInCache); } - private void flushCacheSome(indexRAMRI ram, boolean busy) { - int flushCount = (busy) ? ram.size() / busyDivisor : ram.size() / idleDivisor; - if (flushCount > 1000) flushCount = 1000; - if (flushCount >= 1) { - flushCache(ram, flushCount); - } + private void flushCacheSome(indexRAMRI ram) { + flushCache(ram, flushsize); while (ram.maxURLinCache() >= 2040) flushCache(ram, 1); } private void flushCache(indexRAMRI ram, int count) { if (count <= 0) return; - if (count >= 5000) count = 5000; + if (count > 5000) count = 5000; busyCacheFlush = true; String wordHash; ArrayList containerList = new ArrayList(); diff --git a/yacy.init b/yacy.init index 4e329d63c..d620488eb 100644 --- a/yacy.init +++ b/yacy.init @@ -588,14 +588,9 @@ javastart_priority=0 # wordCacheMaxLow/High is the number of word indexes that shall be held in the # ram cache during indexing. When YaCy is shut down, this cache must be # flushed to disc; this may last some minutes. -# The low value is valid for crawling tasks, the high value is valid for -# remote index transmissions and search requests -# maxWaitingWordFlush gives the number of seconds that the shutdown -# may last for the word flush wordCacheMaxCount = 20000 wordCacheInitCount = 30000 -wordFlushIdleDivisor = 400; -wordFlushBusyDivisor = 4000; +wordFlushSize = 2000; # Specifies if yacy can be used as transparent http proxy. #