Second generation of the collection R/W head path optimization:

- permanent cache flush is switched off. The optimized cache flush
  works better when a large number of collections is flushed
  together
- the flush size can now be configured instead of the flush divisor. There is
  only one size for all flushes
- collection records that are to be removed during a collection transition
  (jump from one collection file to another) are no longer removed immediately
  but only marked in RAM. Subsequent add operations on the collection re-use
  these marked record slots
- index bulk write operations are now separated for each file of a kelondroFlex


git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@3414 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 18 years ago
parent e92e8b2ae3
commit f4cfd19835

@ -143,20 +143,12 @@
</td> </td>
</tr> </tr>
<tr valign="top" class="TableCellDark"> <tr valign="top" class="TableCellDark">
<td>word-flush idle divisor:</td> <td>word flush size:</td>
<td colspan="2"> <td colspan="2">
<input name="wordFlushIdleDivisor" type="text" size="20" maxlength="100" value="#[wordFlushIdleDivisor]#" /> <input name="wordFlushSize" type="text" size="20" maxlength="100" value="#[wordFlushSize]#" />
</td> </td>
<td rowspan="2"> <td>
The word flush divisor is applied when an indexing loop is executed. The word flush size is applied when an indexing loop is executed, and the cache size is exceeded.
The divisor is used to divide the number of words in the cache; the result is the number of words that are flushed.
The idleDivisor is used if the indexing queue is empty, the busyDivisor is used if the indexing queue contains at least one element.
</td>
</tr>
<tr valign="top" class="TableCellDark">
<td>word-flush busy divisor:</td>
<td colspan="2">
<input name="wordFlushBusyDivisor" type="text" size="20" maxlength="100" value="#[wordFlushBusyDivisor]#" />
</td> </td>
</tr> </tr>
<tr valign="top" class="TableCellLight"> <tr valign="top" class="TableCellLight">

@ -190,14 +190,9 @@ public class PerformanceQueues_p {
int wordCacheInitCount = post.getInt("wordCacheInitCount", 30000); int wordCacheInitCount = post.getInt("wordCacheInitCount", 30000);
switchboard.setConfig("wordCacheInitCount", Integer.toString(wordCacheInitCount)); switchboard.setConfig("wordCacheInitCount", Integer.toString(wordCacheInitCount));
int maxWaitingWordFlush = post.getInt("maxWaitingWordFlush", 180); int flushsize = post.getInt("wordFlushSize", 2000);
switchboard.setConfig("maxWaitingWordFlush", Integer.toString(maxWaitingWordFlush)); switchboard.setConfig("wordFlushSize", Integer.toString(flushsize));
switchboard.wordIndex.setWordFlushSize(flushsize);
int wordFlushIdleDivisor = post.getInt("wordFlushIdleDivisor", 420);
switchboard.setConfig("wordFlushIdleDivisor", Integer.toString(wordFlushIdleDivisor));
int wordFlushBusyDivisor = post.getInt("wordFlushBusyDivisor", 5000);
switchboard.setConfig("wordFlushBusyDivisor", Integer.toString(wordFlushBusyDivisor));
switchboard.wordIndex.setWordFlushDivisor(wordFlushIdleDivisor, wordFlushBusyDivisor);
} }
if ((post != null) && (post.containsKey("poolConfig"))) { if ((post != null) && (post.containsKey("poolConfig"))) {
@ -287,8 +282,7 @@ public class PerformanceQueues_p {
prop.put("wordOutCacheMaxCount", switchboard.getConfigLong("wordCacheMaxCount", 20000)); prop.put("wordOutCacheMaxCount", switchboard.getConfigLong("wordCacheMaxCount", 20000));
prop.put("wordInCacheMaxCount", switchboard.getConfigLong("indexDistribution.dhtReceiptLimit", 1000)); prop.put("wordInCacheMaxCount", switchboard.getConfigLong("indexDistribution.dhtReceiptLimit", 1000));
prop.put("wordCacheInitCount", switchboard.getConfigLong("wordCacheInitCount", 30000)); prop.put("wordCacheInitCount", switchboard.getConfigLong("wordCacheInitCount", 30000));
prop.put("wordFlushIdleDivisor", switchboard.getConfigLong("wordFlushIdleDivisor", 420)); prop.put("wordFlushSize", switchboard.getConfigLong("wordFlushSize", 2000));
prop.put("wordFlushBusyDivisor", switchboard.getConfigLong("wordFlushBusyDivisor", 5000));
prop.put("onlineCautionDelay", switchboard.getConfig("onlineCautionDelay", "30000")); prop.put("onlineCautionDelay", switchboard.getConfig("onlineCautionDelay", "30000"));
prop.put("onlineCautionDelayCurrent", System.currentTimeMillis() - switchboard.proxyLastAccess); prop.put("onlineCautionDelayCurrent", System.currentTimeMillis() - switchboard.proxyLastAccess);

@ -41,14 +41,12 @@ import de.anomic.server.logging.serverLog;
public class indexCachedRI implements indexRI { public class indexCachedRI implements indexRI {
private static final int flushsize = 1000;
private kelondroRow payloadrow; private kelondroRow payloadrow;
private kelondroOrder indexOrder; private kelondroOrder indexOrder;
private indexRAMRI riExtern, riIntern; private indexRAMRI riExtern, riIntern;
private indexCollectionRI backend; private indexCollectionRI backend;
public boolean busyCacheFlush; // shows if a cache flush is currently performed public boolean busyCacheFlush; // shows if a cache flush is currently performed
private int idleDivisor, busyDivisor; private int flushsize;
public indexCachedRI(indexRAMRI riExtern, indexRAMRI riIntern, indexCollectionRI backend, kelondroOrder payloadorder, kelondroRow payloadrow, serverLog log) { public indexCachedRI(indexRAMRI riExtern, indexRAMRI riIntern, indexCollectionRI backend, kelondroOrder payloadorder, kelondroRow payloadrow, serverLog log) {
this.riExtern = riExtern; this.riExtern = riExtern;
@ -57,8 +55,7 @@ public class indexCachedRI implements indexRI {
this.indexOrder = payloadorder; this.indexOrder = payloadorder;
this.payloadrow = payloadrow; this.payloadrow = payloadrow;
this.busyCacheFlush = false; this.busyCacheFlush = false;
this.busyDivisor = 5000; this.flushsize = 2000;
this.idleDivisor = 420;
} }
public kelondroRow payloadrow() { public kelondroRow payloadrow() {
@ -69,9 +66,8 @@ public class indexCachedRI implements indexRI {
return 1024 * 1024; return 1024 * 1024;
} }
public void setWordFlushDivisor(int idleDivisor, int busyDivisor) { public void setWordFlushSize(int flushsize) {
this.idleDivisor = idleDivisor; this.flushsize = flushsize;
this.busyDivisor = busyDivisor;
} }
public void flushControl() { public void flushControl() {
@ -102,22 +98,19 @@ public class indexCachedRI implements indexRI {
} }
} }
public void flushCacheSome(boolean busy) { public void flushCacheSome() {
flushCacheSome(riExtern, busy); flushCacheSome(riExtern);
flushCacheSome(riIntern, busy); flushCacheSome(riIntern);
} }
private void flushCacheSome(indexRAMRI ram, boolean busy) { private void flushCacheSome(indexRAMRI ram) {
int flushCount = (busy) ? ram.size() / busyDivisor : ram.size() / idleDivisor; flushCache(ram, flushsize);
if (flushCount > 100) flushCount = 100; while (ram.maxURLinCache() > 2048) flushCache(ram, 1);
if (flushCount < 1) flushCount = Math.min(1, ram.size());
flushCache(ram, flushCount);
while (ram.maxURLinCache() > 1024) flushCache(ram, 1);
} }
private void flushCache(indexRAMRI ram, int count) { private void flushCache(indexRAMRI ram, int count) {
if (count <= 0) return; if (count <= 0) return;
if (count > 1000) count = 1000; if (count > 5000) count = 5000;
busyCacheFlush = true; busyCacheFlush = true;
String wordHash; String wordHash;
ArrayList containerList = new ArrayList(); ArrayList containerList = new ArrayList();

@ -38,7 +38,9 @@ public interface kelondroArray {
public int add(kelondroRow.Entry rowinstance) throws IOException; public int add(kelondroRow.Entry rowinstance) throws IOException;
public void remove(int index) throws IOException; public void remove(int index, boolean marked) throws IOException;
public void resolveMarkedRemoved() throws IOException;
public void print() throws IOException; public void print() throws IOException;

@ -41,8 +41,10 @@ import de.anomic.server.logging.serverLog;
public class kelondroCollectionIndex { public class kelondroCollectionIndex {
private static final int serialNumber = 0;
protected kelondroIndex index; protected kelondroIndex index;
int keylength; private int keylength;
private File path; private File path;
private String filenameStub; private String filenameStub;
private int loadfactor; private int loadfactor;
@ -235,6 +237,13 @@ public class kelondroCollectionIndex {
return array; return array;
} }
// Walks over all array files currently held in 'arrays' and lets each of them
// resolve its records that were only marked as removed.
// An IOException thrown by any array is passed on to the caller.
private void arrayResolveRemoved() throws IOException {
    for (Iterator arrayIter = arrays.values().iterator(); arrayIter.hasNext();) {
        kelondroFixedWidthArray array = (kelondroFixedWidthArray) arrayIter.next();
        array.resolveMarkedRemoved();
    }
}
private int arrayCapacity(int arrayCounter) { private int arrayCapacity(int arrayCounter) {
int load = this.loadfactor; int load = this.loadfactor;
for (int i = 0; i < arrayCounter; i++) load = load * this.loadfactor; for (int i = 0; i < arrayCounter; i++) load = load * this.loadfactor;
@ -269,11 +278,11 @@ public class kelondroCollectionIndex {
return 2 * m * this.payloadrow.objectsize; return 2 * m * this.payloadrow.objectsize;
} }
private kelondroRow.Entry putnew(byte[] key, kelondroRowCollection collection) throws IOException { private kelondroRow.Entry array_new(byte[] key, kelondroRowCollection collection) throws IOException {
// the collection is new // the collection is new
int newPartitionNumber = arrayIndex(collection.size()); int newPartitionNumber = arrayIndex(collection.size());
kelondroRow.Entry indexrow = index.row().newEntry(); kelondroRow.Entry indexrow = index.row().newEntry();
kelondroFixedWidthArray array = getArray(newPartitionNumber, 0, this.payloadrow.objectsize()); kelondroFixedWidthArray array = getArray(newPartitionNumber, serialNumber, this.payloadrow.objectsize());
// define row // define row
kelondroRow.Entry arrayEntry = array.row().newEntry(); kelondroRow.Entry arrayEntry = array.row().newEntry();
@ -293,17 +302,26 @@ public class kelondroCollectionIndex {
indexrow.setCol(idx_col_lastread, kelondroRowCollection.daysSince2000(System.currentTimeMillis())); indexrow.setCol(idx_col_lastread, kelondroRowCollection.daysSince2000(System.currentTimeMillis()));
indexrow.setCol(idx_col_lastwrote, kelondroRowCollection.daysSince2000(System.currentTimeMillis())); indexrow.setCol(idx_col_lastwrote, kelondroRowCollection.daysSince2000(System.currentTimeMillis()));
// after calling this method there mus be a index.addUnique(indexrow); // after calling this method there must be an index.addUnique(indexrow);
return indexrow; return indexrow;
} }
private void putreplace( private void array_remove(
int oldPartitionNumber, int serialNumber, int chunkSize,
int oldRownumber) throws IOException {
// we need a new slot, that means we must first delete the old entry
// find array file
kelondroFixedWidthArray array = getArray(oldPartitionNumber, serialNumber, chunkSize);
// delete old entry
array.remove(oldRownumber, true);
}
private void array_add(
byte[] key, kelondroRowCollection collection, kelondroRow.Entry indexrow, byte[] key, kelondroRowCollection collection, kelondroRow.Entry indexrow,
int serialNumber, int chunkSize, int partitionNumber, int serialNumber, int chunkSize) throws IOException {
int partitionNumber, int rownumber) throws IOException {
// we don't need a new slot, just write collection into the old one
// find array file // write a new entry in the other array
kelondroFixedWidthArray array = getArray(partitionNumber, serialNumber, chunkSize); kelondroFixedWidthArray array = getArray(partitionNumber, serialNumber, chunkSize);
// define new row // define new row
@ -311,44 +329,38 @@ public class kelondroCollectionIndex {
arrayEntry.setCol(0, key); arrayEntry.setCol(0, key);
arrayEntry.setCol(1, collection.exportCollection()); arrayEntry.setCol(1, collection.exportCollection());
// overwrite entry in this array // write a new entry in this array
array.set(rownumber, arrayEntry); int rowNumber = array.add(arrayEntry);
// update the index entry // store the new row number in the index
indexrow.setCol(idx_col_chunkcount, collection.size()); indexrow.setCol(idx_col_chunkcount, collection.size());
indexrow.setCol(idx_col_clusteridx, (byte) partitionNumber); indexrow.setCol(idx_col_clusteridx, (byte) partitionNumber);
indexrow.setCol(idx_col_indexpos, (long) rowNumber);
indexrow.setCol(idx_col_lastwrote, kelondroRowCollection.daysSince2000(System.currentTimeMillis())); indexrow.setCol(idx_col_lastwrote, kelondroRowCollection.daysSince2000(System.currentTimeMillis()));
// after calling this method there mus be a index.put(indexrow); // after calling this method there must be a index.put(indexrow);
} }
private void puttransit( private void array_replace(
byte[] key, kelondroRowCollection collection, kelondroRow.Entry indexrow, byte[] key, kelondroRowCollection collection, kelondroRow.Entry indexrow,
int serialNumber, int chunkSize, int partitionNumber, int serialNumber, int chunkSize,
int oldPartitionNumber, int oldRownumber, int rowNumber) throws IOException {
int newPartitionNumber) throws IOException { // we don't need a new slot, just write collection into the old one
// we need a new slot, that means we must first delete the old entry
// find array file
kelondroFixedWidthArray array = getArray(oldPartitionNumber, serialNumber, chunkSize);
// delete old entry
array.remove(oldRownumber);
// write a new entry in the other array // find array file
array = getArray(newPartitionNumber, 0, this.payloadrow.objectsize()); kelondroFixedWidthArray array = getArray(partitionNumber, serialNumber, chunkSize);
// define row // define new row
kelondroRow.Entry arrayEntry = array.row().newEntry(); kelondroRow.Entry arrayEntry = array.row().newEntry();
arrayEntry.setCol(0, key); arrayEntry.setCol(0, key);
arrayEntry.setCol(1, collection.exportCollection()); arrayEntry.setCol(1, collection.exportCollection());
// write a new entry in this array // overwrite entry in this array
int newRowNumber = array.add(arrayEntry); array.set(rowNumber, arrayEntry);
// store the new row number in the index // update the index entry
indexrow.setCol(idx_col_chunkcount, collection.size()); indexrow.setCol(idx_col_chunkcount, collection.size());
indexrow.setCol(idx_col_clusteridx, (byte) newPartitionNumber); indexrow.setCol(idx_col_clusteridx, (byte) partitionNumber);
indexrow.setCol(idx_col_indexpos, (long) newRowNumber);
indexrow.setCol(idx_col_lastwrote, kelondroRowCollection.daysSince2000(System.currentTimeMillis())); indexrow.setCol(idx_col_lastwrote, kelondroRowCollection.daysSince2000(System.currentTimeMillis()));
// after calling this method there mus be a index.put(indexrow); // after calling this method there mus be a index.put(indexrow);
@ -362,7 +374,7 @@ public class kelondroCollectionIndex {
if (indexrow == null) { if (indexrow == null) {
// create new row and index entry // create new row and index entry
if ((collection != null) && (collection.size() > 0)) { if ((collection != null) && (collection.size() > 0)) {
indexrow = putnew(key, collection); // modifies indexrow indexrow = array_new(key, collection); // modifies indexrow
index.addUnique(indexrow); index.addUnique(indexrow);
} }
return; return;
@ -375,12 +387,11 @@ public class kelondroCollectionIndex {
int oldrownumber = (int) indexrow.getColLong(idx_col_indexpos); // index of the entry in array int oldrownumber = (int) indexrow.getColLong(idx_col_indexpos); // index of the entry in array
int oldPartitionNumber = (int) indexrow.getColByte(idx_col_clusteridx); // points to array file int oldPartitionNumber = (int) indexrow.getColByte(idx_col_clusteridx); // points to array file
assert (oldPartitionNumber >= arrayIndex(oldchunkcount)); assert (oldPartitionNumber >= arrayIndex(oldchunkcount));
int oldSerialNumber = 0;
if ((collection == null) || (collection.size() == 0)) { if ((collection == null) || (collection.size() == 0)) {
// delete the index entry and the array // delete the index entry and the array
kelondroFixedWidthArray array = getArray(oldPartitionNumber, oldSerialNumber, oldchunksize); kelondroFixedWidthArray array = getArray(oldPartitionNumber, serialNumber, oldchunksize);
array.remove(oldrownumber); array.remove(oldrownumber ,false);
index.remove(key); index.remove(key);
return; return;
} }
@ -389,17 +400,19 @@ public class kelondroCollectionIndex {
// see if we need new space or if we can overwrite the old space // see if we need new space or if we can overwrite the old space
if (oldPartitionNumber == newPartitionNumber) { if (oldPartitionNumber == newPartitionNumber) {
putreplace( array_replace(
key, collection, indexrow, key, collection, indexrow,
oldSerialNumber, this.payloadrow.objectsize(), oldPartitionNumber, serialNumber, this.payloadrow.objectsize(),
oldPartitionNumber, oldrownumber); // modifies indexrow oldrownumber); // modifies indexrow
} else { } else {
puttransit( array_remove(
oldPartitionNumber, serialNumber, this.payloadrow.objectsize(),
oldrownumber);
array_add(
key, collection, indexrow, key, collection, indexrow,
oldSerialNumber, this.payloadrow.objectsize(), newPartitionNumber, serialNumber, this.payloadrow.objectsize()); // modifies indexrow
oldPartitionNumber, oldrownumber,
newPartitionNumber); // modifies indexrow
} }
arrayResolveRemoved(); // remove all to-be-removed marked entries
index.put(indexrow); // write modified indexrow index.put(indexrow); // write modified indexrow
} }
@ -434,23 +447,68 @@ public class kelondroCollectionIndex {
// now iterate through the container lists and execute merges // now iterate through the container lists and execute merges
// this is done in such a way, that there is a optimized path for the R/W head // this is done in such a way, that there is a optimized path for the R/W head
// write new containers
i = newContainer.iterator();
Object[] record;
while (i.hasNext()) {
record = (Object[]) i.next(); // {byte[], indexContainer}
mergeNew((byte[]) record[0], (indexContainer) record[1]);
}
// merge existing containers // merge existing containers
i = existingContainer.iterator(); i = existingContainer.iterator();
Object[] record;
ArrayList indexrows = new ArrayList(); ArrayList indexrows = new ArrayList();
kelondroRowCollection collection;
while (i.hasNext()) { while (i.hasNext()) {
record = (Object[]) i.next(); // {byte[], indexContainer, kelondroRow.Entry} record = (Object[]) i.next(); // {byte[], indexContainer, kelondroRow.Entry}
// merge with the old collection
key = (byte[]) record[0];
collection = (kelondroRowCollection) record[1];
indexrow = (kelondroRow.Entry) record[2]; indexrow = (kelondroRow.Entry) record[2];
mergeExisting((byte[]) record[0], (indexContainer) record[1], indexrow); // modifies indexrow
// read old information
int oldchunksize = (int) indexrow.getColLong(idx_col_chunksize); // needed only for migration
int oldchunkcount = (int) indexrow.getColLong(idx_col_chunkcount); // the number if rows in the collection
int oldrownumber = (int) indexrow.getColLong(idx_col_indexpos); // index of the entry in array
int oldPartitionNumber = (int) indexrow.getColByte(idx_col_clusteridx); // points to array file
assert (oldPartitionNumber >= arrayIndex(oldchunkcount));
int oldSerialNumber = 0;
// load the old collection and join it
kelondroRowSet oldcollection = getwithparams(indexrow, oldchunksize, oldchunkcount, oldPartitionNumber, oldrownumber, oldSerialNumber, false);
// join with new collection
oldcollection.addAllUnique(collection);
oldcollection.shape();
oldcollection.uniq(); // FIXME: not clear if it would be better to insert the collection with put to avoid double-entries
oldcollection.trim();
collection = oldcollection;
int newPartitionNumber = arrayIndex(collection.size());
// see if we need new space or if we can overwrite the old space
if (oldPartitionNumber == newPartitionNumber) {
array_replace(
key, collection, indexrow,
oldPartitionNumber, oldSerialNumber, this.payloadrow.objectsize(),
oldrownumber); // modifies indexrow
} else {
array_remove(
oldPartitionNumber, oldSerialNumber, this.payloadrow.objectsize(),
oldrownumber);
array_add(
key, collection, indexrow,
newPartitionNumber, oldSerialNumber, this.payloadrow.objectsize()); // modifies indexrow
}
arrayResolveRemoved(); // remove all to-be-removed marked entries
indexrows.add(indexrow); // indexrows are collected and written later as block indexrows.add(indexrow); // indexrows are collected and written later as block
} }
// write new containers
i = newContainer.iterator();
while (i.hasNext()) {
record = (Object[]) i.next(); // {byte[], indexContainer}
key = (byte[]) record[0];
collection = (indexContainer) record[1];
indexrow = array_new(key, collection); // modifies indexrow
index.addUnique(indexrow); // write modified indexrow
}
// write index entries
index.putMultiple(indexrows, new Date()); // write modified indexrows in optimized manner index.putMultiple(indexrows, new Date()); // write modified indexrows in optimized manner
} }
@ -461,23 +519,12 @@ public class kelondroCollectionIndex {
// first find an old entry, if one exists // first find an old entry, if one exists
kelondroRow.Entry indexrow = index.get(key); kelondroRow.Entry indexrow = index.get(key);
if (indexrow == null) { if (indexrow == null) {
mergeNew(key, container); indexrow = array_new(key, container); // modifies indexrow
} else {
mergeExisting(key, container, indexrow); // modifies indexrow
index.put(indexrow); // write modified indexrow
}
}
private void mergeNew(byte[] key, kelondroRowCollection collection) throws IOException, kelondroOutOfLimitsException {
// create new row and index entry
kelondroRow.Entry indexrow = putnew(key, collection); // modifies indexrow
index.addUnique(indexrow); // write modified indexrow index.addUnique(indexrow); // write modified indexrow
} } else {
private void mergeExisting(byte[] key, kelondroRowCollection collection, kelondroRow.Entry indexrow) throws IOException, kelondroOutOfLimitsException {
// merge with the old collection // merge with the old collection
// attention! this modifies the indexrow entry which must be written with index.put(indexrow) afterwards! // attention! this modifies the indexrow entry which must be written with index.put(indexrow) afterwards!
kelondroRowCollection collection = (kelondroRowCollection) container;
// read old information // read old information
int oldchunksize = (int) indexrow.getColLong(idx_col_chunksize); // needed only for migration int oldchunksize = (int) indexrow.getColLong(idx_col_chunksize); // needed only for migration
@ -501,18 +548,21 @@ public class kelondroCollectionIndex {
// see if we need new space or if we can overwrite the old space // see if we need new space or if we can overwrite the old space
if (oldPartitionNumber == newPartitionNumber) { if (oldPartitionNumber == newPartitionNumber) {
putreplace( array_replace(
key, collection, indexrow, key, collection, indexrow,
oldSerialNumber, this.payloadrow.objectsize(), oldPartitionNumber, oldSerialNumber, this.payloadrow.objectsize(),
oldPartitionNumber, oldrownumber); // modifies indexrow oldrownumber); // modifies indexrow
} else { } else {
puttransit( array_remove(
oldPartitionNumber, oldSerialNumber, this.payloadrow.objectsize(),
oldrownumber);
array_add(
key, collection, indexrow, key, collection, indexrow,
oldSerialNumber, this.payloadrow.objectsize(), newPartitionNumber, oldSerialNumber, this.payloadrow.objectsize()); // modifies indexrow
oldPartitionNumber, oldrownumber, }
newPartitionNumber); // modifies indexrow arrayResolveRemoved(); // remove all to-be-removed marked entries
index.put(indexrow); // write modified indexrow
} }
} }
public synchronized int remove(byte[] key, Set removekeys) throws IOException, kelondroOutOfLimitsException { public synchronized int remove(byte[] key, Set removekeys) throws IOException, kelondroOutOfLimitsException {
@ -531,12 +581,11 @@ public class kelondroCollectionIndex {
int oldrownumber = (int) indexrow.getColLong(idx_col_indexpos); // index of the entry in array int oldrownumber = (int) indexrow.getColLong(idx_col_indexpos); // index of the entry in array
int oldPartitionNumber = (int) indexrow.getColByte(idx_col_clusteridx); // points to array file int oldPartitionNumber = (int) indexrow.getColByte(idx_col_clusteridx); // points to array file
assert (oldPartitionNumber >= arrayIndex(oldchunkcount)); assert (oldPartitionNumber >= arrayIndex(oldchunkcount));
int oldSerialNumber = 0;
int removed = 0; int removed = 0;
assert (removekeys != null); assert (removekeys != null);
// load the old collection and remove keys // load the old collection and remove keys
kelondroRowSet oldcollection = getwithparams(indexrow, oldchunksize, oldchunkcount, oldPartitionNumber, oldrownumber, oldSerialNumber, false); kelondroRowSet oldcollection = getwithparams(indexrow, oldchunksize, oldchunkcount, oldPartitionNumber, oldrownumber, serialNumber, false);
// remove the keys from the set // remove the keys from the set
Iterator i = removekeys.iterator(); Iterator i = removekeys.iterator();
@ -551,8 +600,8 @@ public class kelondroCollectionIndex {
if (oldcollection.size() == 0) { if (oldcollection.size() == 0) {
// delete the index entry and the array // delete the index entry and the array
kelondroFixedWidthArray array = getArray(oldPartitionNumber, oldSerialNumber, oldchunksize); kelondroFixedWidthArray array = getArray(oldPartitionNumber, serialNumber, oldchunksize);
array.remove(oldrownumber); array.remove(oldrownumber, false);
index.remove(key); index.remove(key);
return removed; return removed;
} }
@ -561,17 +610,19 @@ public class kelondroCollectionIndex {
// see if we need new space or if we can overwrite the old space // see if we need new space or if we can overwrite the old space
if (oldPartitionNumber == newPartitionNumber) { if (oldPartitionNumber == newPartitionNumber) {
putreplace( array_replace(
key, oldcollection, indexrow, key, oldcollection, indexrow,
oldSerialNumber, this.payloadrow.objectsize(), oldPartitionNumber, serialNumber, this.payloadrow.objectsize(),
oldPartitionNumber, oldrownumber); // modifies indexrow oldrownumber); // modifies indexrow
} else { } else {
puttransit( array_remove(
oldPartitionNumber, serialNumber, this.payloadrow.objectsize(),
oldrownumber);
array_add(
key, oldcollection, indexrow, key, oldcollection, indexrow,
oldSerialNumber, this.payloadrow.objectsize(), newPartitionNumber, serialNumber, this.payloadrow.objectsize()); // modifies indexrow
oldPartitionNumber, oldrownumber,
newPartitionNumber); // modifies indexrow
} }
arrayResolveRemoved(); // remove all to-be-removed marked entries
index.put(indexrow); // write modified indexrow index.put(indexrow); // write modified indexrow
return removed; return removed;
} }
@ -630,7 +681,7 @@ public class kelondroCollectionIndex {
if (!(index.row().objectOrder.wellformed(arraykey))) { if (!(index.row().objectOrder.wellformed(arraykey))) {
// cleanup for a bad bug that corrupted the database // cleanup for a bad bug that corrupted the database
index.remove(indexkey); // the RowCollection must be considered lost index.remove(indexkey); // the RowCollection must be considered lost
array.remove(rownumber); // loose the RowCollection (we don't know how much is lost) array.remove(rownumber, false); // loose the RowCollection (we don't know how much is lost)
serverLog.logSevere("kelondroCollectionIndex." + array.filename, "lost a RowCollection because of a bad arraykey"); serverLog.logSevere("kelondroCollectionIndex." + array.filename, "lost a RowCollection because of a bad arraykey");
return new kelondroRowSet(this.payloadrow, 0); return new kelondroRowSet(this.payloadrow, 0);
} }
@ -658,7 +709,7 @@ public class kelondroCollectionIndex {
index.put(indexrow); index.put(indexrow);
array.logFailure("INCONSISTENCY in " + arrayFile(this.path, this.filenameStub, this.loadfactor, chunksize, clusteridx, serialnumber).toString() + ": array has different chunkcount than index: index = " + chunkcount + ", array = " + chunkcountInArray + "; the index has been auto-fixed"); array.logFailure("INCONSISTENCY in " + arrayFile(this.path, this.filenameStub, this.loadfactor, chunksize, clusteridx, serialnumber).toString() + ": array has different chunkcount than index: index = " + chunkcount + ", array = " + chunkcountInArray + "; the index has been auto-fixed");
} }
if (remove) array.remove(rownumber); // index is removed in calling method if (remove) array.remove(rownumber, false); // index is removed in calling method
return collection; return collection;
} }

@ -48,6 +48,8 @@ package de.anomic.kelondro;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.Iterator;
import java.util.TreeSet;
public class kelondroFixedWidthArray extends kelondroRecords implements kelondroArray { public class kelondroFixedWidthArray extends kelondroRecords implements kelondroArray {
@ -55,6 +57,8 @@ public class kelondroFixedWidthArray extends kelondroRecords implements kelondro
private static short thisOHBytes = 0; // our record definition does not need extra bytes private static short thisOHBytes = 0; // our record definition does not need extra bytes
private static short thisOHHandles = 0; // and no handles private static short thisOHHandles = 0; // and no handles
private TreeSet markedRemoved; // a set of Integer indexes of removed records (only temporary)
public kelondroFixedWidthArray(File file, kelondroRow rowdef, int intprops) throws IOException { public kelondroFixedWidthArray(File file, kelondroRow rowdef, int intprops) throws IOException {
// this creates a new array // this creates a new array
super(file, 0, 0, thisOHBytes, thisOHHandles, rowdef, intprops, rowdef.columns() /* txtProps */, 80 /* txtPropWidth */); super(file, 0, 0, thisOHBytes, thisOHHandles, rowdef, intprops, rowdef.columns() /* txtProps */, 80 /* txtPropWidth */);
@ -67,6 +71,7 @@ public class kelondroFixedWidthArray extends kelondroRecords implements kelondro
try {super.setText(i, rowdef.column(i).toString().getBytes());} catch (IOException e) {} try {super.setText(i, rowdef.column(i).toString().getBytes());} catch (IOException e) {}
} }
} }
markedRemoved = new TreeSet();
} }
public kelondroFixedWidthArray(kelondroRA ra, kelondroRow rowdef, int intprops) throws IOException { public kelondroFixedWidthArray(kelondroRA ra, kelondroRow rowdef, int intprops) throws IOException {
@ -79,6 +84,7 @@ public class kelondroFixedWidthArray extends kelondroRecords implements kelondro
for (int i = 0; i < rowdef.columns(); i++) { for (int i = 0; i < rowdef.columns(); i++) {
try {super.setText(i, rowdef.column(i).toString().getBytes());} catch (IOException e) {} try {super.setText(i, rowdef.column(i).toString().getBytes());} catch (IOException e) {}
} }
markedRemoved = new TreeSet();
} }
public static kelondroFixedWidthArray open(File file, kelondroRow rowdef, int intprops) { public static kelondroFixedWidthArray open(File file, kelondroRow rowdef, int intprops) {
@ -123,15 +129,29 @@ public class kelondroFixedWidthArray extends kelondroRecords implements kelondro
} }
public synchronized int add(kelondroRow.Entry rowentry) throws IOException { public synchronized int add(kelondroRow.Entry rowentry) throws IOException {
// adds a new rowentry, but re-uses a previously as-deleted marked entry
if (markedRemoved.size() == 0) {
// no records there to be re-used
Node n = newNode(rowentry.bytes()); Node n = newNode(rowentry.bytes());
n.commit(CP_NONE); n.commit(CP_NONE);
return n.handle().hashCode(); return n.handle().hashCode();
} else {
// re-use a removed record
Integer index = (Integer) markedRemoved.first();
markedRemoved.remove(index);
set(index.intValue(), rowentry);
return index.intValue();
} }
}
public synchronized void remove(int index, boolean marked) throws IOException {
assert (index < (super.free() + super.size())) : "remove: index " + index + " out of bounds " + (super.free() + super.size());
public synchronized void remove(int index) throws IOException { if (marked) {
if (index >= (super.free() + super.size())) throw new IOException("remove: index " + index + " out of bounds " + (super.free() + super.size())); // does not remove directly, but sets only a mark that a record is to be deleted
// this record can be re-used with add
markedRemoved.add(new Integer(index));
} else {
// get the node at position index // get the node at position index
Handle h = new Handle(index); Handle h = new Handle(index);
@ -144,6 +164,17 @@ public class kelondroFixedWidthArray extends kelondroRecords implements kelondro
// mark row as deleted so it can be re-used // mark row as deleted so it can be re-used
deleteNode(h); deleteNode(h);
} }
}
// Physically removes every record that was so far only marked as removed
// (see remove(index, true)) and forgets the corresponding marks.
//
// Each mark is dropped right after its record has been deleted, not in a
// single clear() at the end: if remove() throws an IOException half-way,
// the indexes that were already deleted must not stay in markedRemoved,
// because add() would otherwise re-use them through set() although they
// have already been handed to deleteNode(). Marks of records that were not
// processed yet stay in place, so the call can simply be repeated.
//
// throws IOException if removing one of the marked records fails
public synchronized void resolveMarkedRemoved() throws IOException {
    Iterator i = markedRemoved.iterator();
    Integer index;
    while (i.hasNext()) {
        index = (Integer) i.next();
        // marked == false: really delete the record now
        remove(index.intValue(), false);
        // the record is gone, so the mark must go as well
        i.remove();
    }
}
public void print() throws IOException { public void print() throws IOException {
System.out.println("PRINTOUT of table, length=" + size()); System.out.println("PRINTOUT of table, length=" + size());
@ -181,14 +212,14 @@ public class kelondroFixedWidthArray extends kelondroRecords implements kelondro
k = new kelondroFixedWidthArray(f, rowdef, 6); k = new kelondroFixedWidthArray(f, rowdef, 6);
k.add(k.row().newEntry(new byte[][]{"a".getBytes(), "xxxx".getBytes()})); k.add(k.row().newEntry(new byte[][]{"a".getBytes(), "xxxx".getBytes()}));
k.add(k.row().newEntry(new byte[][]{"b".getBytes(), "xxxx".getBytes()})); k.add(k.row().newEntry(new byte[][]{"b".getBytes(), "xxxx".getBytes()}));
k.remove(0); k.remove(0, false);
k.add(k.row().newEntry(new byte[][]{"c".getBytes(), "xxxx".getBytes()})); k.add(k.row().newEntry(new byte[][]{"c".getBytes(), "xxxx".getBytes()}));
k.add(k.row().newEntry(new byte[][]{"d".getBytes(), "xxxx".getBytes()})); k.add(k.row().newEntry(new byte[][]{"d".getBytes(), "xxxx".getBytes()}));
k.add(k.row().newEntry(new byte[][]{"e".getBytes(), "xxxx".getBytes()})); k.add(k.row().newEntry(new byte[][]{"e".getBytes(), "xxxx".getBytes()}));
k.add(k.row().newEntry(new byte[][]{"f".getBytes(), "xxxx".getBytes()})); k.add(k.row().newEntry(new byte[][]{"f".getBytes(), "xxxx".getBytes()}));
k.remove(0); k.remove(0, false);
k.remove(1); k.remove(1, false);
k.print(); k.print();
k.print(true); k.print(true);
@ -203,9 +234,10 @@ public class kelondroFixedWidthArray extends kelondroRecords implements kelondro
k.add(k.row().newEntry(new byte[][]{(Integer.toString(i) + "-" + Integer.toString(j)).getBytes(), "xxxx".getBytes()})); k.add(k.row().newEntry(new byte[][]{(Integer.toString(i) + "-" + Integer.toString(j)).getBytes(), "xxxx".getBytes()}));
} }
for (int j = 0; j < i; j++) { for (int j = 0; j < i; j++) {
k.remove(j); k.remove(j, true);
} }
} }
k.resolveMarkedRemoved();
k.print(); k.print();
k.print(true); k.print(true);
k.close(); k.close();

@ -235,7 +235,7 @@ public class kelondroFlexTable extends kelondroFlexWidthArray implements kelondr
if (i < 0) return null; if (i < 0) return null;
kelondroRow.Entry r; kelondroRow.Entry r;
r = super.get(i); r = super.get(i);
super.remove(i); super.remove(i, false);
return r; return r;
} }
@ -244,7 +244,7 @@ public class kelondroFlexTable extends kelondroFlexWidthArray implements kelondr
if (i < 0) return null; if (i < 0) return null;
kelondroRow.Entry r; kelondroRow.Entry r;
r = super.get(i); r = super.get(i);
super.remove(i); super.remove(i, false);
return r; return r;
} }

@ -268,12 +268,12 @@ public class kelondroFlexWidthArray implements kelondroArray {
return p; return p;
} }
public void remove(int index) throws IOException { public void remove(int index, boolean marked) throws IOException {
int r = 0; int r = 0;
synchronized (col) { synchronized (col) {
// remove only from the first column // remove only from the first column
col[0].remove(index); col[0].remove(index, marked);
r = r + col[r].row().columns(); r = r + col[r].row().columns();
// the other columns will be blanked out only // the other columns will be blanked out only
@ -284,6 +284,12 @@ public class kelondroFlexWidthArray implements kelondroArray {
} }
} }
// Resolves all pending "marked as removed" records of this array.
// remove(index, marked) forwards the marked flag only to the first column
// (col[0]), so the first column is the only one that has to be resolved here.
// throws IOException if the first column fails to resolve its marked records
public synchronized void resolveMarkedRemoved() throws IOException {
// same lock object that remove() holds while it modifies the columns
synchronized (col) {
col[0].resolveMarkedRemoved();
}
}
public void print() throws IOException { public void print() throws IOException {
System.out.println("PRINTOUT of table, length=" + size()); System.out.println("PRINTOUT of table, length=" + size());
kelondroRow.Entry row; kelondroRow.Entry row;
@ -308,14 +314,14 @@ public class kelondroFlexWidthArray implements kelondroArray {
kelondroFlexWidthArray k = kelondroFlexWidthArray.open(f, "flextest", rowdef); kelondroFlexWidthArray k = kelondroFlexWidthArray.open(f, "flextest", rowdef);
k.add(k.row().newEntry(new byte[][]{"a".getBytes(), "xxxx".getBytes()})); k.add(k.row().newEntry(new byte[][]{"a".getBytes(), "xxxx".getBytes()}));
k.add(k.row().newEntry(new byte[][]{"b".getBytes(), "xxxx".getBytes()})); k.add(k.row().newEntry(new byte[][]{"b".getBytes(), "xxxx".getBytes()}));
k.remove(0); k.remove(0, false);
k.add(k.row().newEntry(new byte[][]{"c".getBytes(), "xxxx".getBytes()})); k.add(k.row().newEntry(new byte[][]{"c".getBytes(), "xxxx".getBytes()}));
k.add(k.row().newEntry(new byte[][]{"d".getBytes(), "xxxx".getBytes()})); k.add(k.row().newEntry(new byte[][]{"d".getBytes(), "xxxx".getBytes()}));
k.add(k.row().newEntry(new byte[][]{"e".getBytes(), "xxxx".getBytes()})); k.add(k.row().newEntry(new byte[][]{"e".getBytes(), "xxxx".getBytes()}));
k.add(k.row().newEntry(new byte[][]{"f".getBytes(), "xxxx".getBytes()})); k.add(k.row().newEntry(new byte[][]{"f".getBytes(), "xxxx".getBytes()}));
k.remove(0); k.remove(0, false);
k.remove(1); k.remove(1, false);
k.print(); k.print();
k.col[0].print(true); k.col[0].print(true);
@ -335,10 +341,11 @@ public class kelondroFlexWidthArray implements kelondroArray {
k.close(); k.close();
k = kelondroFlexWidthArray.open(f, "flextest", rowdef); k = kelondroFlexWidthArray.open(f, "flextest", rowdef);
for (int j = 0; j < i; j++) { for (int j = 0; j < i; j++) {
k.remove(i*2 - j - 1); k.remove(i*2 - j - 1, true);
} }
k.close(); k.close();
} }
k.resolveMarkedRemoved();
k = kelondroFlexWidthArray.open(f, "flextest", rowdef); k = kelondroFlexWidthArray.open(f, "flextest", rowdef);
k.print(); k.print();
k.col[0].print(true); k.col[0].print(true);

@ -1628,7 +1628,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
public void deQueueFreeMem() { public void deQueueFreeMem() {
// flush some entries from the RAM cache // flush some entries from the RAM cache
wordIndex.flushCacheSome(false); wordIndex.flushCacheSome();
// adopt maximum cache size to current size to prevent that further OutOfMemoryErrors occur // adopt maximum cache size to current size to prevent that further OutOfMemoryErrors occur
int newMaxCount = Math.max(2000, Math.min((int) getConfigLong(WORDCACHE_MAX_COUNT, 20000), wordIndex.dhtOutCacheSize())); int newMaxCount = Math.max(2000, Math.min((int) getConfigLong(WORDCACHE_MAX_COUNT, 20000), wordIndex.dhtOutCacheSize()));
setConfig(WORDCACHE_MAX_COUNT, Integer.toString(newMaxCount)); setConfig(WORDCACHE_MAX_COUNT, Integer.toString(newMaxCount));
@ -1644,8 +1644,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
} }
// flush some entries from the RAM cache // flush some entries from the RAM cache
// (new permanent cache flushing) if (sbQueue.size() == 0) wordIndex.flushCacheSome(); // permanent flushing only if we are not busy
wordIndex.flushCacheSome(sbQueue.size() != 0);
wordIndex.loadedURL.flushCacheSome(); wordIndex.loadedURL.flushCacheSome();
boolean doneSomething = false; boolean doneSomething = false;

@ -55,13 +55,11 @@ import de.anomic.yacy.yacyDHTAction;
public final class plasmaWordIndex implements indexRI { public final class plasmaWordIndex implements indexRI {
private static final int flushsize = 2000;
private final kelondroOrder indexOrder = kelondroBase64Order.enhancedCoder; private final kelondroOrder indexOrder = kelondroBase64Order.enhancedCoder;
private final indexRAMRI dhtOutCache, dhtInCache; private final indexRAMRI dhtOutCache, dhtInCache;
private final indexCollectionRI collections; // new database structure to replace AssortmentCluster and FileCluster private final indexCollectionRI collections; // new database structure to replace AssortmentCluster and FileCluster
public boolean busyCacheFlush; // shows if a cache flush is currently performed public boolean busyCacheFlush; // shows if a cache flush is currently performed
private int idleDivisor, busyDivisor; private int flushsize;
public final plasmaCrawlLURL loadedURL; public final plasmaCrawlLURL loadedURL;
public plasmaWordIndex(File indexRoot, long rwibuffer, long lurlbuffer, long preloadTime, serverLog log) { public plasmaWordIndex(File indexRoot, long rwibuffer, long lurlbuffer, long preloadTime, serverLog log) {
@ -80,8 +78,7 @@ public final class plasmaWordIndex implements indexRI {
// performance settings // performance settings
busyCacheFlush = false; busyCacheFlush = false;
this.busyDivisor = 5000; this.flushsize = 2000;
this.idleDivisor = 420;
} }
public int minMem() { public int minMem() {
@ -128,9 +125,8 @@ public final class plasmaWordIndex implements indexRI {
dhtInCache.setMaxWordCount(maxWords); dhtInCache.setMaxWordCount(maxWords);
} }
public void setWordFlushDivisor(int idleDivisor, int busyDivisor) { public void setWordFlushSize(int flushsize) {
this.idleDivisor = idleDivisor; this.flushsize = flushsize;
this.busyDivisor = busyDivisor;
} }
public void flushControl() { public void flushControl() {
@ -185,23 +181,19 @@ public final class plasmaWordIndex implements indexRI {
} }
} }
public void flushCacheSome(boolean busy) { public void flushCacheSome() {
flushCacheSome(dhtOutCache, busy); flushCacheSome(dhtOutCache);
flushCacheSome(dhtInCache, busy); flushCacheSome(dhtInCache);
} }
private void flushCacheSome(indexRAMRI ram, boolean busy) { private void flushCacheSome(indexRAMRI ram) {
int flushCount = (busy) ? ram.size() / busyDivisor : ram.size() / idleDivisor; flushCache(ram, flushsize);
if (flushCount > 1000) flushCount = 1000;
if (flushCount >= 1) {
flushCache(ram, flushCount);
}
while (ram.maxURLinCache() >= 2040) flushCache(ram, 1); while (ram.maxURLinCache() >= 2040) flushCache(ram, 1);
} }
private void flushCache(indexRAMRI ram, int count) { private void flushCache(indexRAMRI ram, int count) {
if (count <= 0) return; if (count <= 0) return;
if (count >= 5000) count = 5000; if (count > 5000) count = 5000;
busyCacheFlush = true; busyCacheFlush = true;
String wordHash; String wordHash;
ArrayList containerList = new ArrayList(); ArrayList containerList = new ArrayList();

@ -588,14 +588,9 @@ javastart_priority=0
# wordCacheMaxLow/High is the number of word indexes that shall be held in the # wordCacheMaxLow/High is the number of word indexes that shall be held in the
# ram cache during indexing. When YaCy is shut down, this cache must be # ram cache during indexing. When YaCy is shut down, this cache must be
# flushed to disc; this may last some minutes. # flushed to disc; this may last some minutes.
# The low value is valid for crawling tasks, the high value is valid for
# remote index transmissions and search requests
# maxWaitingWordFlush gives the number of seconds that the shutdown
# may last for the word flush
wordCacheMaxCount = 20000 wordCacheMaxCount = 20000
wordCacheInitCount = 30000 wordCacheInitCount = 30000
wordFlushIdleDivisor = 400; wordFlushSize = 2000;
wordFlushBusyDivisor = 4000;
# Specifies if yacy can be used as transparent http proxy. # Specifies if yacy can be used as transparent http proxy.
# #

Loading…
Cancel
Save