- The word flush divisor is applied when an indexing loop is executed.
- The divisor is used to divide the number of words in the cache; the result is the number of words that are flushed.
- The idleDivisor is used if the indexing queue is empty, the busyDivisor is used if the indexing queue contains at least one element.
-
-
-
-
word-flush busy divisor:
-
-
+
+ The word flush size is applied when an indexing loop is executed, and the cache size is exceeded.
diff --git a/htroot/PerformanceQueues_p.java b/htroot/PerformanceQueues_p.java
index 731e6cc38..0107bd20b 100644
--- a/htroot/PerformanceQueues_p.java
+++ b/htroot/PerformanceQueues_p.java
@@ -190,14 +190,9 @@ public class PerformanceQueues_p {
int wordCacheInitCount = post.getInt("wordCacheInitCount", 30000);
switchboard.setConfig("wordCacheInitCount", Integer.toString(wordCacheInitCount));
- int maxWaitingWordFlush = post.getInt("maxWaitingWordFlush", 180);
- switchboard.setConfig("maxWaitingWordFlush", Integer.toString(maxWaitingWordFlush));
-
- int wordFlushIdleDivisor = post.getInt("wordFlushIdleDivisor", 420);
- switchboard.setConfig("wordFlushIdleDivisor", Integer.toString(wordFlushIdleDivisor));
- int wordFlushBusyDivisor = post.getInt("wordFlushBusyDivisor", 5000);
- switchboard.setConfig("wordFlushBusyDivisor", Integer.toString(wordFlushBusyDivisor));
- switchboard.wordIndex.setWordFlushDivisor(wordFlushIdleDivisor, wordFlushBusyDivisor);
+ int flushsize = post.getInt("wordFlushSize", 2000);
+ switchboard.setConfig("wordFlushSize", Integer.toString(flushsize));
+ switchboard.wordIndex.setWordFlushSize(flushsize);
}
if ((post != null) && (post.containsKey("poolConfig"))) {
@@ -287,8 +282,7 @@ public class PerformanceQueues_p {
prop.put("wordOutCacheMaxCount", switchboard.getConfigLong("wordCacheMaxCount", 20000));
prop.put("wordInCacheMaxCount", switchboard.getConfigLong("indexDistribution.dhtReceiptLimit", 1000));
prop.put("wordCacheInitCount", switchboard.getConfigLong("wordCacheInitCount", 30000));
- prop.put("wordFlushIdleDivisor", switchboard.getConfigLong("wordFlushIdleDivisor", 420));
- prop.put("wordFlushBusyDivisor", switchboard.getConfigLong("wordFlushBusyDivisor", 5000));
+ prop.put("wordFlushSize", switchboard.getConfigLong("wordFlushSize", 2000));
prop.put("onlineCautionDelay", switchboard.getConfig("onlineCautionDelay", "30000"));
prop.put("onlineCautionDelayCurrent", System.currentTimeMillis() - switchboard.proxyLastAccess);
diff --git a/source/de/anomic/index/indexCachedRI.java b/source/de/anomic/index/indexCachedRI.java
index 3e366d32d..12198b445 100644
--- a/source/de/anomic/index/indexCachedRI.java
+++ b/source/de/anomic/index/indexCachedRI.java
@@ -40,15 +40,13 @@ import de.anomic.kelondro.kelondroRow;
import de.anomic.server.logging.serverLog;
public class indexCachedRI implements indexRI {
-
- private static final int flushsize = 1000;
private kelondroRow payloadrow;
private kelondroOrder indexOrder;
private indexRAMRI riExtern, riIntern;
private indexCollectionRI backend;
public boolean busyCacheFlush; // shows if a cache flush is currently performed
- private int idleDivisor, busyDivisor;
+ private int flushsize;
public indexCachedRI(indexRAMRI riExtern, indexRAMRI riIntern, indexCollectionRI backend, kelondroOrder payloadorder, kelondroRow payloadrow, serverLog log) {
this.riExtern = riExtern;
@@ -57,8 +55,7 @@ public class indexCachedRI implements indexRI {
this.indexOrder = payloadorder;
this.payloadrow = payloadrow;
this.busyCacheFlush = false;
- this.busyDivisor = 5000;
- this.idleDivisor = 420;
+ this.flushsize = 2000;
}
public kelondroRow payloadrow() {
@@ -69,9 +66,8 @@ public class indexCachedRI implements indexRI {
return 1024 * 1024;
}
- public void setWordFlushDivisor(int idleDivisor, int busyDivisor) {
- this.idleDivisor = idleDivisor;
- this.busyDivisor = busyDivisor;
+ public void setWordFlushSize(int flushsize) {
+ this.flushsize = flushsize;
}
public void flushControl() {
@@ -102,22 +98,19 @@ public class indexCachedRI implements indexRI {
}
}
- public void flushCacheSome(boolean busy) {
- flushCacheSome(riExtern, busy);
- flushCacheSome(riIntern, busy);
+ public void flushCacheSome() {
+ flushCacheSome(riExtern);
+ flushCacheSome(riIntern);
}
- private void flushCacheSome(indexRAMRI ram, boolean busy) {
- int flushCount = (busy) ? ram.size() / busyDivisor : ram.size() / idleDivisor;
- if (flushCount > 100) flushCount = 100;
- if (flushCount < 1) flushCount = Math.min(1, ram.size());
- flushCache(ram, flushCount);
- while (ram.maxURLinCache() > 1024) flushCache(ram, 1);
+ private void flushCacheSome(indexRAMRI ram) {
+ flushCache(ram, flushsize);
+ while (ram.maxURLinCache() > 2048) flushCache(ram, 1);
}
private void flushCache(indexRAMRI ram, int count) {
if (count <= 0) return;
- if (count > 1000) count = 1000;
+ if (count > 5000) count = 5000;
busyCacheFlush = true;
String wordHash;
ArrayList containerList = new ArrayList();
diff --git a/source/de/anomic/kelondro/kelondroArray.java b/source/de/anomic/kelondro/kelondroArray.java
index 16994c1a1..298801119 100644
--- a/source/de/anomic/kelondro/kelondroArray.java
+++ b/source/de/anomic/kelondro/kelondroArray.java
@@ -38,8 +38,10 @@ public interface kelondroArray {
public int add(kelondroRow.Entry rowinstance) throws IOException;
- public void remove(int index) throws IOException;
+ public void remove(int index, boolean marked) throws IOException;
+ public void resolveMarkedRemoved() throws IOException;
+
public void print() throws IOException;
}
diff --git a/source/de/anomic/kelondro/kelondroCollectionIndex.java b/source/de/anomic/kelondro/kelondroCollectionIndex.java
index dd4add483..6f9a20349 100644
--- a/source/de/anomic/kelondro/kelondroCollectionIndex.java
+++ b/source/de/anomic/kelondro/kelondroCollectionIndex.java
@@ -41,8 +41,10 @@ import de.anomic.server.logging.serverLog;
public class kelondroCollectionIndex {
+ private static final int serialNumber = 0;
+
protected kelondroIndex index;
- int keylength;
+ private int keylength;
private File path;
private String filenameStub;
private int loadfactor;
@@ -235,6 +237,13 @@ public class kelondroCollectionIndex {
return array;
}
+ private void arrayResolveRemoved() throws IOException {
+ Iterator i = arrays.values().iterator();
+ while (i.hasNext()) {
+ ((kelondroFixedWidthArray) i.next()).resolveMarkedRemoved();
+ }
+ }
+
private int arrayCapacity(int arrayCounter) {
int load = this.loadfactor;
for (int i = 0; i < arrayCounter; i++) load = load * this.loadfactor;
@@ -269,11 +278,11 @@ public class kelondroCollectionIndex {
return 2 * m * this.payloadrow.objectsize;
}
- private kelondroRow.Entry putnew(byte[] key, kelondroRowCollection collection) throws IOException {
+ private kelondroRow.Entry array_new(byte[] key, kelondroRowCollection collection) throws IOException {
// the collection is new
int newPartitionNumber = arrayIndex(collection.size());
kelondroRow.Entry indexrow = index.row().newEntry();
- kelondroFixedWidthArray array = getArray(newPartitionNumber, 0, this.payloadrow.objectsize());
+ kelondroFixedWidthArray array = getArray(newPartitionNumber, serialNumber, this.payloadrow.objectsize());
// define row
kelondroRow.Entry arrayEntry = array.row().newEntry();
@@ -293,64 +302,67 @@ public class kelondroCollectionIndex {
indexrow.setCol(idx_col_lastread, kelondroRowCollection.daysSince2000(System.currentTimeMillis()));
indexrow.setCol(idx_col_lastwrote, kelondroRowCollection.daysSince2000(System.currentTimeMillis()));
- // after calling this method there mus be a index.addUnique(indexrow);
+ // after calling this method there must be an index.addUnique(indexrow);
return indexrow;
}
- private void putreplace(
+ private void array_remove(
+ int oldPartitionNumber, int serialNumber, int chunkSize,
+ int oldRownumber) throws IOException {
+ // we need a new slot, that means we must first delete the old entry
+ // find array file
+ kelondroFixedWidthArray array = getArray(oldPartitionNumber, serialNumber, chunkSize);
+
+ // delete old entry
+ array.remove(oldRownumber, true);
+ }
+
+ private void array_add(
byte[] key, kelondroRowCollection collection, kelondroRow.Entry indexrow,
- int serialNumber, int chunkSize,
- int partitionNumber, int rownumber) throws IOException {
- // we don't need a new slot, just write collection into the old one
+ int partitionNumber, int serialNumber, int chunkSize) throws IOException {
- // find array file
+ // write a new entry in the other array
kelondroFixedWidthArray array = getArray(partitionNumber, serialNumber, chunkSize);
-
+
// define new row
kelondroRow.Entry arrayEntry = array.row().newEntry();
arrayEntry.setCol(0, key);
arrayEntry.setCol(1, collection.exportCollection());
-
- // overwrite entry in this array
- array.set(rownumber, arrayEntry);
-
- // update the index entry
+
+ // write a new entry in this array
+ int rowNumber = array.add(arrayEntry);
+
+ // store the new row number in the index
indexrow.setCol(idx_col_chunkcount, collection.size());
indexrow.setCol(idx_col_clusteridx, (byte) partitionNumber);
+ indexrow.setCol(idx_col_indexpos, (long) rowNumber);
indexrow.setCol(idx_col_lastwrote, kelondroRowCollection.daysSince2000(System.currentTimeMillis()));
-
- // after calling this method there mus be a index.put(indexrow);
+
+ // after calling this method there must be a index.put(indexrow);
}
- private void puttransit(
+ private void array_replace(
byte[] key, kelondroRowCollection collection, kelondroRow.Entry indexrow,
- int serialNumber, int chunkSize,
- int oldPartitionNumber, int oldRownumber,
- int newPartitionNumber) throws IOException {
- // we need a new slot, that means we must first delete the old entry
- // find array file
- kelondroFixedWidthArray array = getArray(oldPartitionNumber, serialNumber, chunkSize);
+ int partitionNumber, int serialNumber, int chunkSize,
+ int rowNumber) throws IOException {
+ // we don't need a new slot, just write collection into the old one
- // delete old entry
- array.remove(oldRownumber);
+ // find array file
+ kelondroFixedWidthArray array = getArray(partitionNumber, serialNumber, chunkSize);
- // write a new entry in the other array
- array = getArray(newPartitionNumber, 0, this.payloadrow.objectsize());
-
- // define row
+ // define new row
kelondroRow.Entry arrayEntry = array.row().newEntry();
arrayEntry.setCol(0, key);
arrayEntry.setCol(1, collection.exportCollection());
-
- // write a new entry in this array
- int newRowNumber = array.add(arrayEntry);
-
- // store the new row number in the index
+
+ // overwrite entry in this array
+ array.set(rowNumber, arrayEntry);
+
+ // update the index entry
indexrow.setCol(idx_col_chunkcount, collection.size());
- indexrow.setCol(idx_col_clusteridx, (byte) newPartitionNumber);
- indexrow.setCol(idx_col_indexpos, (long) newRowNumber);
+ indexrow.setCol(idx_col_clusteridx, (byte) partitionNumber);
indexrow.setCol(idx_col_lastwrote, kelondroRowCollection.daysSince2000(System.currentTimeMillis()));
-
+
// after calling this method there mus be a index.put(indexrow);
}
@@ -362,7 +374,7 @@ public class kelondroCollectionIndex {
if (indexrow == null) {
// create new row and index entry
if ((collection != null) && (collection.size() > 0)) {
- indexrow = putnew(key, collection); // modifies indexrow
+ indexrow = array_new(key, collection); // modifies indexrow
index.addUnique(indexrow);
}
return;
@@ -375,12 +387,11 @@ public class kelondroCollectionIndex {
int oldrownumber = (int) indexrow.getColLong(idx_col_indexpos); // index of the entry in array
int oldPartitionNumber = (int) indexrow.getColByte(idx_col_clusteridx); // points to array file
assert (oldPartitionNumber >= arrayIndex(oldchunkcount));
- int oldSerialNumber = 0;
if ((collection == null) || (collection.size() == 0)) {
// delete the index entry and the array
- kelondroFixedWidthArray array = getArray(oldPartitionNumber, oldSerialNumber, oldchunksize);
- array.remove(oldrownumber);
+ kelondroFixedWidthArray array = getArray(oldPartitionNumber, serialNumber, oldchunksize);
+ array.remove(oldrownumber ,false);
index.remove(key);
return;
}
@@ -389,17 +400,19 @@ public class kelondroCollectionIndex {
// see if we need new space or if we can overwrite the old space
if (oldPartitionNumber == newPartitionNumber) {
- putreplace(
+ array_replace(
key, collection, indexrow,
- oldSerialNumber, this.payloadrow.objectsize(),
- oldPartitionNumber, oldrownumber); // modifies indexrow
+ oldPartitionNumber, serialNumber, this.payloadrow.objectsize(),
+ oldrownumber); // modifies indexrow
} else {
- puttransit(
+ array_remove(
+ oldPartitionNumber, serialNumber, this.payloadrow.objectsize(),
+ oldrownumber);
+ array_add(
key, collection, indexrow,
- oldSerialNumber, this.payloadrow.objectsize(),
- oldPartitionNumber, oldrownumber,
- newPartitionNumber); // modifies indexrow
+ newPartitionNumber, serialNumber, this.payloadrow.objectsize()); // modifies indexrow
}
+ arrayResolveRemoved(); // remove all to-be-removed marked entries
index.put(indexrow); // write modified indexrow
}
@@ -434,23 +447,68 @@ public class kelondroCollectionIndex {
// now iterate through the container lists and execute merges
// this is done in such a way, that there is a optimized path for the R/W head
- // write new containers
- i = newContainer.iterator();
- Object[] record;
- while (i.hasNext()) {
- record = (Object[]) i.next(); // {byte[], indexContainer}
- mergeNew((byte[]) record[0], (indexContainer) record[1]);
- }
-
// merge existing containers
i = existingContainer.iterator();
+ Object[] record;
ArrayList indexrows = new ArrayList();
+ kelondroRowCollection collection;
while (i.hasNext()) {
record = (Object[]) i.next(); // {byte[], indexContainer, kelondroRow.Entry}
+
+ // merge with the old collection
+ key = (byte[]) record[0];
+ collection = (kelondroRowCollection) record[1];
indexrow = (kelondroRow.Entry) record[2];
- mergeExisting((byte[]) record[0], (indexContainer) record[1], indexrow); // modifies indexrow
+
+ // read old information
+ int oldchunksize = (int) indexrow.getColLong(idx_col_chunksize); // needed only for migration
+ int oldchunkcount = (int) indexrow.getColLong(idx_col_chunkcount); // the number if rows in the collection
+ int oldrownumber = (int) indexrow.getColLong(idx_col_indexpos); // index of the entry in array
+ int oldPartitionNumber = (int) indexrow.getColByte(idx_col_clusteridx); // points to array file
+ assert (oldPartitionNumber >= arrayIndex(oldchunkcount));
+ int oldSerialNumber = 0;
+
+ // load the old collection and join it
+ kelondroRowSet oldcollection = getwithparams(indexrow, oldchunksize, oldchunkcount, oldPartitionNumber, oldrownumber, oldSerialNumber, false);
+
+ // join with new collection
+ oldcollection.addAllUnique(collection);
+ oldcollection.shape();
+ oldcollection.uniq(); // FIXME: not clear if it would be better to insert the collection with put to avoid double-entries
+ oldcollection.trim();
+ collection = oldcollection;
+
+ int newPartitionNumber = arrayIndex(collection.size());
+
+ // see if we need new space or if we can overwrite the old space
+ if (oldPartitionNumber == newPartitionNumber) {
+ array_replace(
+ key, collection, indexrow,
+ oldPartitionNumber, oldSerialNumber, this.payloadrow.objectsize(),
+ oldrownumber); // modifies indexrow
+ } else {
+ array_remove(
+ oldPartitionNumber, oldSerialNumber, this.payloadrow.objectsize(),
+ oldrownumber);
+ array_add(
+ key, collection, indexrow,
+ newPartitionNumber, oldSerialNumber, this.payloadrow.objectsize()); // modifies indexrow
+ }
+ arrayResolveRemoved(); // remove all to-be-removed marked entries
indexrows.add(indexrow); // indexrows are collected and written later as block
}
+
+ // write new containers
+ i = newContainer.iterator();
+ while (i.hasNext()) {
+ record = (Object[]) i.next(); // {byte[], indexContainer}
+ key = (byte[]) record[0];
+ collection = (indexContainer) record[1];
+ indexrow = array_new(key, collection); // modifies indexrow
+ index.addUnique(indexrow); // write modified indexrow
+ }
+
+ // write index entries
index.putMultiple(indexrows, new Date()); // write modified indexrows in optimized manner
}
@@ -461,60 +519,52 @@ public class kelondroCollectionIndex {
// first find an old entry, if one exists
kelondroRow.Entry indexrow = index.get(key);
if (indexrow == null) {
- mergeNew(key, container);
+ indexrow = array_new(key, container); // modifies indexrow
+ index.addUnique(indexrow); // write modified indexrow
} else {
- mergeExisting(key, container, indexrow); // modifies indexrow
+ // merge with the old collection
+ // attention! this modifies the indexrow entry which must be written with index.put(indexrow) afterwards!
+ kelondroRowCollection collection = (kelondroRowCollection) container;
+
+ // read old information
+ int oldchunksize = (int) indexrow.getColLong(idx_col_chunksize); // needed only for migration
+ int oldchunkcount = (int) indexrow.getColLong(idx_col_chunkcount); // the number if rows in the collection
+ int oldrownumber = (int) indexrow.getColLong(idx_col_indexpos); // index of the entry in array
+ int oldPartitionNumber = (int) indexrow.getColByte(idx_col_clusteridx); // points to array file
+ assert (oldPartitionNumber >= arrayIndex(oldchunkcount));
+ int oldSerialNumber = 0;
+
+ // load the old collection and join it
+ kelondroRowSet oldcollection = getwithparams(indexrow, oldchunksize, oldchunkcount, oldPartitionNumber, oldrownumber, oldSerialNumber, false);
+
+ // join with new collection
+ oldcollection.addAllUnique(collection);
+ oldcollection.shape();
+ oldcollection.uniq(); // FIXME: not clear if it would be better to insert the collection with put to avoid double-entries
+ oldcollection.trim();
+ collection = oldcollection;
+
+ int newPartitionNumber = arrayIndex(collection.size());
+
+ // see if we need new space or if we can overwrite the old space
+ if (oldPartitionNumber == newPartitionNumber) {
+ array_replace(
+ key, collection, indexrow,
+ oldPartitionNumber, oldSerialNumber, this.payloadrow.objectsize(),
+ oldrownumber); // modifies indexrow
+ } else {
+ array_remove(
+ oldPartitionNumber, oldSerialNumber, this.payloadrow.objectsize(),
+ oldrownumber);
+ array_add(
+ key, collection, indexrow,
+ newPartitionNumber, oldSerialNumber, this.payloadrow.objectsize()); // modifies indexrow
+ }
+ arrayResolveRemoved(); // remove all to-be-removed marked entries
index.put(indexrow); // write modified indexrow
}
}
- private void mergeNew(byte[] key, kelondroRowCollection collection) throws IOException, kelondroOutOfLimitsException {
- // create new row and index entry
-
- kelondroRow.Entry indexrow = putnew(key, collection); // modifies indexrow
- index.addUnique(indexrow); // write modified indexrow
- }
-
- private void mergeExisting(byte[] key, kelondroRowCollection collection, kelondroRow.Entry indexrow) throws IOException, kelondroOutOfLimitsException {
- // merge with the old collection
- // attention! this modifies the indexrow entry which must be written with index.put(indexrow) afterwards!
-
- // read old information
- int oldchunksize = (int) indexrow.getColLong(idx_col_chunksize); // needed only for migration
- int oldchunkcount = (int) indexrow.getColLong(idx_col_chunkcount); // the number if rows in the collection
- int oldrownumber = (int) indexrow.getColLong(idx_col_indexpos); // index of the entry in array
- int oldPartitionNumber = (int) indexrow.getColByte(idx_col_clusteridx); // points to array file
- assert (oldPartitionNumber >= arrayIndex(oldchunkcount));
- int oldSerialNumber = 0;
-
- // load the old collection and join it
- kelondroRowSet oldcollection = getwithparams(indexrow, oldchunksize, oldchunkcount, oldPartitionNumber, oldrownumber, oldSerialNumber, false);
-
- // join with new collection
- oldcollection.addAllUnique(collection);
- oldcollection.shape();
- oldcollection.uniq(); // FIXME: not clear if it would be better to insert the collection with put to avoid double-entries
- oldcollection.trim();
- collection = oldcollection;
-
- int newPartitionNumber = arrayIndex(collection.size());
-
- // see if we need new space or if we can overwrite the old space
- if (oldPartitionNumber == newPartitionNumber) {
- putreplace(
- key, collection, indexrow,
- oldSerialNumber, this.payloadrow.objectsize(),
- oldPartitionNumber, oldrownumber); // modifies indexrow
- } else {
- puttransit(
- key, collection, indexrow,
- oldSerialNumber, this.payloadrow.objectsize(),
- oldPartitionNumber, oldrownumber,
- newPartitionNumber); // modifies indexrow
- }
-
- }
-
public synchronized int remove(byte[] key, Set removekeys) throws IOException, kelondroOutOfLimitsException {
if ((removekeys == null) || (removekeys.size() == 0)) return 0;
@@ -531,12 +581,11 @@ public class kelondroCollectionIndex {
int oldrownumber = (int) indexrow.getColLong(idx_col_indexpos); // index of the entry in array
int oldPartitionNumber = (int) indexrow.getColByte(idx_col_clusteridx); // points to array file
assert (oldPartitionNumber >= arrayIndex(oldchunkcount));
- int oldSerialNumber = 0;
int removed = 0;
assert (removekeys != null);
// load the old collection and remove keys
- kelondroRowSet oldcollection = getwithparams(indexrow, oldchunksize, oldchunkcount, oldPartitionNumber, oldrownumber, oldSerialNumber, false);
+ kelondroRowSet oldcollection = getwithparams(indexrow, oldchunksize, oldchunkcount, oldPartitionNumber, oldrownumber, serialNumber, false);
// remove the keys from the set
Iterator i = removekeys.iterator();
@@ -551,8 +600,8 @@ public class kelondroCollectionIndex {
if (oldcollection.size() == 0) {
// delete the index entry and the array
- kelondroFixedWidthArray array = getArray(oldPartitionNumber, oldSerialNumber, oldchunksize);
- array.remove(oldrownumber);
+ kelondroFixedWidthArray array = getArray(oldPartitionNumber, serialNumber, oldchunksize);
+ array.remove(oldrownumber, false);
index.remove(key);
return removed;
}
@@ -561,17 +610,19 @@ public class kelondroCollectionIndex {
// see if we need new space or if we can overwrite the old space
if (oldPartitionNumber == newPartitionNumber) {
- putreplace(
+ array_replace(
key, oldcollection, indexrow,
- oldSerialNumber, this.payloadrow.objectsize(),
- oldPartitionNumber, oldrownumber); // modifies indexrow
+ oldPartitionNumber, serialNumber, this.payloadrow.objectsize(),
+ oldrownumber); // modifies indexrow
} else {
- puttransit(
+ array_remove(
+ oldPartitionNumber, serialNumber, this.payloadrow.objectsize(),
+ oldrownumber);
+ array_add(
key, oldcollection, indexrow,
- oldSerialNumber, this.payloadrow.objectsize(),
- oldPartitionNumber, oldrownumber,
- newPartitionNumber); // modifies indexrow
+ newPartitionNumber, serialNumber, this.payloadrow.objectsize()); // modifies indexrow
}
+ arrayResolveRemoved(); // remove all to-be-removed marked entries
index.put(indexrow); // write modified indexrow
return removed;
}
@@ -630,7 +681,7 @@ public class kelondroCollectionIndex {
if (!(index.row().objectOrder.wellformed(arraykey))) {
// cleanup for a bad bug that corrupted the database
index.remove(indexkey); // the RowCollection must be considered lost
- array.remove(rownumber); // loose the RowCollection (we don't know how much is lost)
+ array.remove(rownumber, false); // loose the RowCollection (we don't know how much is lost)
serverLog.logSevere("kelondroCollectionIndex." + array.filename, "lost a RowCollection because of a bad arraykey");
return new kelondroRowSet(this.payloadrow, 0);
}
@@ -658,7 +709,7 @@ public class kelondroCollectionIndex {
index.put(indexrow);
array.logFailure("INCONSISTENCY in " + arrayFile(this.path, this.filenameStub, this.loadfactor, chunksize, clusteridx, serialnumber).toString() + ": array has different chunkcount than index: index = " + chunkcount + ", array = " + chunkcountInArray + "; the index has been auto-fixed");
}
- if (remove) array.remove(rownumber); // index is removed in calling method
+ if (remove) array.remove(rownumber, false); // index is removed in calling method
return collection;
}
diff --git a/source/de/anomic/kelondro/kelondroFixedWidthArray.java b/source/de/anomic/kelondro/kelondroFixedWidthArray.java
index 464829fca..e07f20007 100644
--- a/source/de/anomic/kelondro/kelondroFixedWidthArray.java
+++ b/source/de/anomic/kelondro/kelondroFixedWidthArray.java
@@ -48,6 +48,8 @@ package de.anomic.kelondro;
import java.io.File;
import java.io.IOException;
+import java.util.Iterator;
+import java.util.TreeSet;
public class kelondroFixedWidthArray extends kelondroRecords implements kelondroArray {
@@ -55,6 +57,8 @@ public class kelondroFixedWidthArray extends kelondroRecords implements kelondro
private static short thisOHBytes = 0; // our record definition does not need extra bytes
private static short thisOHHandles = 0; // and no handles
+ private TreeSet markedRemoved; // a set of Integer indexes of removed records (only temporary)
+
public kelondroFixedWidthArray(File file, kelondroRow rowdef, int intprops) throws IOException {
// this creates a new array
super(file, 0, 0, thisOHBytes, thisOHHandles, rowdef, intprops, rowdef.columns() /* txtProps */, 80 /* txtPropWidth */);
@@ -67,6 +71,7 @@ public class kelondroFixedWidthArray extends kelondroRecords implements kelondro
try {super.setText(i, rowdef.column(i).toString().getBytes());} catch (IOException e) {}
}
}
+ markedRemoved = new TreeSet();
}
public kelondroFixedWidthArray(kelondroRA ra, kelondroRow rowdef, int intprops) throws IOException {
@@ -79,6 +84,7 @@ public class kelondroFixedWidthArray extends kelondroRecords implements kelondro
for (int i = 0; i < rowdef.columns(); i++) {
try {super.setText(i, rowdef.column(i).toString().getBytes());} catch (IOException e) {}
}
+ markedRemoved = new TreeSet();
}
public static kelondroFixedWidthArray open(File file, kelondroRow rowdef, int intprops) {
@@ -123,26 +129,51 @@ public class kelondroFixedWidthArray extends kelondroRecords implements kelondro
}
public synchronized int add(kelondroRow.Entry rowentry) throws IOException {
-
- Node n = newNode(rowentry.bytes());
- n.commit(CP_NONE);
-
- return n.handle().hashCode();
+ // adds a new rowentry, but re-uses a previously as-deleted marked entry
+ if (markedRemoved.size() == 0) {
+ // no records there to be re-used
+ Node n = newNode(rowentry.bytes());
+ n.commit(CP_NONE);
+ return n.handle().hashCode();
+ } else {
+ // re-use a removed record
+ Integer index = (Integer) markedRemoved.first();
+ markedRemoved.remove(index);
+ set(index.intValue(), rowentry);
+ return index.intValue();
+ }
}
+
+ public synchronized void remove(int index, boolean marked) throws IOException {
+ assert (index < (super.free() + super.size())) : "remove: index " + index + " out of bounds " + (super.free() + super.size());
+
+ if (marked) {
+ // does not remove directly, but sets only a mark that a record is to be deleted
+ // this record can be re-used with add
+ markedRemoved.add(new Integer(index));
+ } else {
+
+ // get the node at position index
+ Handle h = new Handle(index);
+ Node n = getNode(h, false);
- public synchronized void remove(int index) throws IOException {
- if (index >= (super.free() + super.size())) throw new IOException("remove: index " + index + " out of bounds " + (super.free() + super.size()));
-
- // get the node at position index
- Handle h = new Handle(index);
- Node n = getNode(h, false);
-
- // erase the row
- n.setValueRow(null);
- n.commit(CP_NONE);
+ // erase the row
+ n.setValueRow(null);
+ n.commit(CP_NONE);
- // mark row as deleted so it can be re-used
- deleteNode(h);
+ // mark row as deleted so it can be re-used
+ deleteNode(h);
+ }
+ }
+
+ public synchronized void resolveMarkedRemoved() throws IOException {
+ Iterator i = markedRemoved.iterator();
+ Integer index;
+ while (i.hasNext()) {
+ index = (Integer) i.next();
+ remove(index.intValue(), false);
+ }
+ markedRemoved.clear();
}
public void print() throws IOException {
@@ -181,14 +212,14 @@ public class kelondroFixedWidthArray extends kelondroRecords implements kelondro
k = new kelondroFixedWidthArray(f, rowdef, 6);
k.add(k.row().newEntry(new byte[][]{"a".getBytes(), "xxxx".getBytes()}));
k.add(k.row().newEntry(new byte[][]{"b".getBytes(), "xxxx".getBytes()}));
- k.remove(0);
+ k.remove(0, false);
k.add(k.row().newEntry(new byte[][]{"c".getBytes(), "xxxx".getBytes()}));
k.add(k.row().newEntry(new byte[][]{"d".getBytes(), "xxxx".getBytes()}));
k.add(k.row().newEntry(new byte[][]{"e".getBytes(), "xxxx".getBytes()}));
k.add(k.row().newEntry(new byte[][]{"f".getBytes(), "xxxx".getBytes()}));
- k.remove(0);
- k.remove(1);
+ k.remove(0, false);
+ k.remove(1, false);
k.print();
k.print(true);
@@ -203,9 +234,10 @@ public class kelondroFixedWidthArray extends kelondroRecords implements kelondro
k.add(k.row().newEntry(new byte[][]{(Integer.toString(i) + "-" + Integer.toString(j)).getBytes(), "xxxx".getBytes()}));
}
for (int j = 0; j < i; j++) {
- k.remove(j);
+ k.remove(j, true);
}
}
+ k.resolveMarkedRemoved();
k.print();
k.print(true);
k.close();
diff --git a/source/de/anomic/kelondro/kelondroFlexTable.java b/source/de/anomic/kelondro/kelondroFlexTable.java
index 6ce428919..4cc4ed455 100644
--- a/source/de/anomic/kelondro/kelondroFlexTable.java
+++ b/source/de/anomic/kelondro/kelondroFlexTable.java
@@ -235,7 +235,7 @@ public class kelondroFlexTable extends kelondroFlexWidthArray implements kelondr
if (i < 0) return null;
kelondroRow.Entry r;
r = super.get(i);
- super.remove(i);
+ super.remove(i, false);
return r;
}
@@ -244,7 +244,7 @@ public class kelondroFlexTable extends kelondroFlexWidthArray implements kelondr
if (i < 0) return null;
kelondroRow.Entry r;
r = super.get(i);
- super.remove(i);
+ super.remove(i, false);
return r;
}
diff --git a/source/de/anomic/kelondro/kelondroFlexWidthArray.java b/source/de/anomic/kelondro/kelondroFlexWidthArray.java
index 96fe56004..26c0d9005 100644
--- a/source/de/anomic/kelondro/kelondroFlexWidthArray.java
+++ b/source/de/anomic/kelondro/kelondroFlexWidthArray.java
@@ -268,12 +268,12 @@ public class kelondroFlexWidthArray implements kelondroArray {
return p;
}
- public void remove(int index) throws IOException {
+ public void remove(int index, boolean marked) throws IOException {
int r = 0;
synchronized (col) {
// remove only from the first column
- col[0].remove(index);
+ col[0].remove(index, marked);
r = r + col[r].row().columns();
// the other columns will be blanked out only
@@ -284,6 +284,12 @@ public class kelondroFlexWidthArray implements kelondroArray {
}
}
+ public synchronized void resolveMarkedRemoved() throws IOException {
+ synchronized (col) {
+ col[0].resolveMarkedRemoved();
+ }
+ }
+
public void print() throws IOException {
System.out.println("PRINTOUT of table, length=" + size());
kelondroRow.Entry row;
@@ -308,14 +314,14 @@ public class kelondroFlexWidthArray implements kelondroArray {
kelondroFlexWidthArray k = kelondroFlexWidthArray.open(f, "flextest", rowdef);
k.add(k.row().newEntry(new byte[][]{"a".getBytes(), "xxxx".getBytes()}));
k.add(k.row().newEntry(new byte[][]{"b".getBytes(), "xxxx".getBytes()}));
- k.remove(0);
+ k.remove(0, false);
k.add(k.row().newEntry(new byte[][]{"c".getBytes(), "xxxx".getBytes()}));
k.add(k.row().newEntry(new byte[][]{"d".getBytes(), "xxxx".getBytes()}));
k.add(k.row().newEntry(new byte[][]{"e".getBytes(), "xxxx".getBytes()}));
k.add(k.row().newEntry(new byte[][]{"f".getBytes(), "xxxx".getBytes()}));
- k.remove(0);
- k.remove(1);
+ k.remove(0, false);
+ k.remove(1, false);
k.print();
k.col[0].print(true);
@@ -335,10 +341,11 @@ public class kelondroFlexWidthArray implements kelondroArray {
k.close();
k = kelondroFlexWidthArray.open(f, "flextest", rowdef);
for (int j = 0; j < i; j++) {
- k.remove(i*2 - j - 1);
+ k.remove(i*2 - j - 1, true);
}
k.close();
}
+ k.resolveMarkedRemoved();
k = kelondroFlexWidthArray.open(f, "flextest", rowdef);
k.print();
k.col[0].print(true);
diff --git a/source/de/anomic/plasma/plasmaSwitchboard.java b/source/de/anomic/plasma/plasmaSwitchboard.java
index 0608614e6..d23ad82a0 100644
--- a/source/de/anomic/plasma/plasmaSwitchboard.java
+++ b/source/de/anomic/plasma/plasmaSwitchboard.java
@@ -1628,7 +1628,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
public void deQueueFreeMem() {
// flush some entries from the RAM cache
- wordIndex.flushCacheSome(false);
+ wordIndex.flushCacheSome();
// adopt maximum cache size to current size to prevent that further OutOfMemoryErrors occur
int newMaxCount = Math.max(2000, Math.min((int) getConfigLong(WORDCACHE_MAX_COUNT, 20000), wordIndex.dhtOutCacheSize()));
setConfig(WORDCACHE_MAX_COUNT, Integer.toString(newMaxCount));
@@ -1644,8 +1644,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
}
// flush some entries from the RAM cache
- // (new permanent cache flushing)
- wordIndex.flushCacheSome(sbQueue.size() != 0);
+ if (sbQueue.size() == 0) wordIndex.flushCacheSome(); // permanent flushing only if we are not busy
wordIndex.loadedURL.flushCacheSome();
boolean doneSomething = false;
diff --git a/source/de/anomic/plasma/plasmaWordIndex.java b/source/de/anomic/plasma/plasmaWordIndex.java
index 1d59f4cb6..1540cb0fe 100644
--- a/source/de/anomic/plasma/plasmaWordIndex.java
+++ b/source/de/anomic/plasma/plasmaWordIndex.java
@@ -55,13 +55,11 @@ import de.anomic.yacy.yacyDHTAction;
public final class plasmaWordIndex implements indexRI {
- private static final int flushsize = 2000;
-
private final kelondroOrder indexOrder = kelondroBase64Order.enhancedCoder;
private final indexRAMRI dhtOutCache, dhtInCache;
private final indexCollectionRI collections; // new database structure to replace AssortmentCluster and FileCluster
public boolean busyCacheFlush; // shows if a cache flush is currently performed
- private int idleDivisor, busyDivisor;
+ private int flushsize;
public final plasmaCrawlLURL loadedURL;
public plasmaWordIndex(File indexRoot, long rwibuffer, long lurlbuffer, long preloadTime, serverLog log) {
@@ -80,8 +78,7 @@ public final class plasmaWordIndex implements indexRI {
// performance settings
busyCacheFlush = false;
- this.busyDivisor = 5000;
- this.idleDivisor = 420;
+ this.flushsize = 2000;
}
public int minMem() {
@@ -128,9 +125,8 @@ public final class plasmaWordIndex implements indexRI {
dhtInCache.setMaxWordCount(maxWords);
}
- public void setWordFlushDivisor(int idleDivisor, int busyDivisor) {
- this.idleDivisor = idleDivisor;
- this.busyDivisor = busyDivisor;
+ public void setWordFlushSize(int flushsize) {
+ this.flushsize = flushsize;
}
public void flushControl() {
@@ -185,23 +181,19 @@ public final class plasmaWordIndex implements indexRI {
}
}
- public void flushCacheSome(boolean busy) {
- flushCacheSome(dhtOutCache, busy);
- flushCacheSome(dhtInCache, busy);
+ public void flushCacheSome() {
+ flushCacheSome(dhtOutCache);
+ flushCacheSome(dhtInCache);
}
- private void flushCacheSome(indexRAMRI ram, boolean busy) {
- int flushCount = (busy) ? ram.size() / busyDivisor : ram.size() / idleDivisor;
- if (flushCount > 1000) flushCount = 1000;
- if (flushCount >= 1) {
- flushCache(ram, flushCount);
- }
+ private void flushCacheSome(indexRAMRI ram) {
+ flushCache(ram, flushsize);
while (ram.maxURLinCache() >= 2040) flushCache(ram, 1);
}
private void flushCache(indexRAMRI ram, int count) {
if (count <= 0) return;
- if (count >= 5000) count = 5000;
+ if (count > 5000) count = 5000;
busyCacheFlush = true;
String wordHash;
ArrayList containerList = new ArrayList();
diff --git a/yacy.init b/yacy.init
index 4e329d63c..d620488eb 100644
--- a/yacy.init
+++ b/yacy.init
@@ -588,14 +588,9 @@ javastart_priority=0
# wordCacheMaxLow/High is the number of word indexes that shall be held in the
# ram cache during indexing. When YaCy is shut down, this cache must be
# flushed to disc; this may last some minutes.
-# The low value is valid for crawling tasks, the high value is valid for
-# remote index transmissions and search requests
-# maxWaitingWordFlush gives the number of seconds that the shutdown
-# may last for the word flush
wordCacheMaxCount = 20000
wordCacheInitCount = 30000
-wordFlushIdleDivisor = 400;
-wordFlushBusyDivisor = 4000;
+wordFlushSize = 2000;
# Specifies if yacy can be used as transparent http proxy.
#