third generation of R/W head path optimization

- data from collection arrays is read in order
- merged data is written in order

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@3419 6c8d7289-2bf4-0310-a012-ef5d649a1542
orbiter 18 years ago
parent 1fe505f0b0
commit 51e12049fa
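
Both bullets describe the same mechanism: instead of executing each array read or write immediately in word-hash order, pending operations are collected into TreeMaps keyed by partition number and row number and then flushed as sorted blocks, so every array file is touched once and traversed sequentially. A minimal sketch of that grouping idea in isolation (class and method names here are hypothetical, not taken from the YaCy source):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of the write-batching pattern this commit applies:
// queued writes are grouped per partition and sorted by row number, so a later
// flush sweeps each partition file once, in ascending row order.
public class OrderedWriteBatch {

    // partition number -> (row number -> payload); both levels stay sorted
    private final TreeMap<Integer, TreeMap<Integer, byte[]>> pending = new TreeMap<>();

    // queue a row write instead of performing it immediately
    public void queue(int partition, int rowNumber, byte[] payload) {
        pending.computeIfAbsent(partition, p -> new TreeMap<>()).put(rowNumber, payload);
    }

    // flush all queued writes; TreeMap iteration is ascending, which yields
    // the sequential access path the commit message describes
    public List<String> flush() {
        List<String> accessPath = new ArrayList<>();
        for (Map.Entry<Integer, TreeMap<Integer, byte[]>> part : pending.entrySet()) {
            for (Map.Entry<Integer, byte[]> row : part.getValue().entrySet()) {
                // a real implementation would write row.getValue() to the file here
                accessPath.add("partition " + part.getKey() + ", row " + row.getKey());
            }
        }
        pending.clear();
        return accessPath;
    }

    public static void main(String[] args) {
        OrderedWriteBatch batch = new OrderedWriteBatch();
        batch.queue(2, 17, new byte[]{1});
        batch.queue(0, 99, new byte[]{2});
        batch.queue(0, 3, new byte[]{3});
        for (String step : batch.flush()) System.out.println(step);
    }
}

Running main prints partition 0/row 3, partition 0/row 99, then partition 2/row 17: one ordered sweep per file, which keeps the disk's R/W head moving in a single direction.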

@ -322,6 +322,16 @@ public final class indexRAMRI implements indexRI {
}
public synchronized String maxScoreWordHash() {
if (cache.size() == 0) return null;
try {
return (String) hashScore.getMaxObject();
} catch (Exception e) {
log.logSevere("flushFromMem: " + e.getMessage(), e);
}
return null;
}
public synchronized String bestFlushWordHash() {
// select an appropriate hash
// we have 2 different methods to find a good hash:

@ -34,6 +34,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import de.anomic.index.indexContainer;
import de.anomic.server.serverFileUtils;
@ -342,6 +343,52 @@ public class kelondroCollectionIndex {
// after calling this method there must be an index.put(indexrow);
}
private ArrayList array_add_multiple(TreeMap array_add_map, int serialNumber, int chunkSize) throws IOException {
// returns a List of kelondroRow.Entry entries for indexrow storage
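// array_add_map maps each target partition number (Integer) to an ArrayList of {key, collection, indexrow} triples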
Map.Entry entry;
Iterator i = array_add_map.entrySet().iterator();
Iterator j;
ArrayList actionList;
int partitionNumber;
kelondroFixedWidthArray array;
Object[] objs;
byte[] key;
kelondroRowCollection collection;
kelondroRow.Entry indexrow;
ArrayList indexrows = new ArrayList();
while (i.hasNext()) {
entry = (Map.Entry) i.next();
actionList = (ArrayList) entry.getValue();
partitionNumber = ((Integer) entry.getKey()).intValue();
array = getArray(partitionNumber, serialNumber, chunkSize);
j = actionList.iterator();
while (j.hasNext()) {
objs = (Object[]) j.next();
key = (byte[]) objs[0];
collection = (kelondroRowCollection) objs[1];
indexrow = (kelondroRow.Entry) objs[2];
// define new row
kelondroRow.Entry arrayEntry = array.row().newEntry();
arrayEntry.setCol(0, key);
arrayEntry.setCol(1, collection.exportCollection());
// write a new entry in this array
int rowNumber = array.add(arrayEntry);
// store the new row number in the index
indexrow.setCol(idx_col_chunkcount, collection.size());
indexrow.setCol(idx_col_clusteridx, (byte) partitionNumber);
indexrow.setCol(idx_col_indexpos, (long) rowNumber);
indexrow.setCol(idx_col_lastwrote, kelondroRowCollection.daysSince2000(System.currentTimeMillis()));
indexrows.add(indexrow);
}
}
// after calling this method there must be an index.put(indexrow);
return indexrows;
}
private void array_replace(
byte[] key, kelondroRowCollection collection, kelondroRow.Entry indexrow,
int partitionNumber, int serialNumber, int chunkSize,
@ -367,6 +414,53 @@ public class kelondroCollectionIndex {
// after calling this method there must be an index.put(indexrow);
}
private ArrayList array_replace_multiple(TreeMap array_replace_map, int serialNumber, int chunkSize) throws IOException {
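// returns a List of kelondroRow.Entry entries for indexrow storage
// array_replace_map maps each partition number (Integer) to a TreeMap of row number -> {key, collection, indexrow}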
Map.Entry entry, e;
Iterator i = array_replace_map.entrySet().iterator();
Iterator j;
TreeMap actionMap;
int partitionNumber;
kelondroFixedWidthArray array;
ArrayList indexrows = new ArrayList();
Object[] objs;
int rowNumber;
byte[] key;
kelondroRowCollection collection;
kelondroRow.Entry indexrow;
while (i.hasNext()) {
entry = (Map.Entry) i.next();
actionMap = (TreeMap) entry.getValue();
partitionNumber = ((Integer) entry.getKey()).intValue();
array = getArray(partitionNumber, serialNumber, chunkSize);
j = actionMap.entrySet().iterator();
while (j.hasNext()) {
e = (Map.Entry) j.next();
rowNumber = ((Integer) e.getKey()).intValue();
objs = (Object[]) e.getValue();
key = (byte[]) objs[0];
collection = (kelondroRowCollection) objs[1];
indexrow = (kelondroRow.Entry) objs[2];
// define new row
kelondroRow.Entry arrayEntry = array.row().newEntry();
arrayEntry.setCol(0, key);
arrayEntry.setCol(1, collection.exportCollection());
// overwrite entry in this array
array.set(rowNumber, arrayEntry);
// update the index entry
indexrow.setCol(idx_col_chunkcount, collection.size());
indexrow.setCol(idx_col_clusteridx, (byte) partitionNumber);
indexrow.setCol(idx_col_lastwrote, kelondroRowCollection.daysSince2000(System.currentTimeMillis()));
indexrows.add(indexrow);
}
}
// after calling this method there must be an index.put(indexrow);
return indexrows;
}
public synchronized void put(byte[] key, kelondroRowCollection collection) throws IOException, kelondroOutOfLimitsException {
// first find an old entry, if one exists
@ -428,8 +522,11 @@ public class kelondroCollectionIndex {
indexContainer container;
byte[] key;
ArrayList newContainer = new ArrayList();
ArrayList existingContainer = new ArrayList();
TreeMap existingContainer = new TreeMap(); // a mapping from Integer (partition) to a TreeMap (mapping from index to object triple)
TreeMap containerMap; // temporary map; mapping from index position to object triple with {key, container, indexrow}
kelondroRow.Entry indexrow;
int oldrownumber1; // index of the entry in array
int oldPartitionNumber1; // points to array file
while (i.hasNext()) {
container = (indexContainer) i.next();
@ -441,7 +538,12 @@ public class kelondroCollectionIndex {
if (indexrow == null) {
newContainer.add(new Object[]{key, container});
} else {
existingContainer.add(new Object[]{key, container, indexrow});
oldrownumber1 = (int) indexrow.getColLong(idx_col_indexpos);
oldPartitionNumber1 = (int) indexrow.getColByte(idx_col_clusteridx);
containerMap = (TreeMap) existingContainer.get(new Integer(oldPartitionNumber1));
if (containerMap == null) containerMap = new TreeMap();
containerMap.put(new Integer(oldrownumber1), new Object[]{key, container, indexrow});
existingContainer.put(new Integer(oldPartitionNumber1), containerMap);
}
}
@ -449,56 +551,90 @@ public class kelondroCollectionIndex {
// this is done in such a way that there is an optimized path for the R/W head
// merge existing containers
i = existingContainer.iterator();
Map.Entry tripleEntry;
Object[] record;
ArrayList indexrows_existing = new ArrayList();
kelondroRowCollection collection;
while (i.hasNext()) {
record = (Object[]) i.next(); // {byte[], indexContainer, kelondroRow.Entry}
TreeMap array_replace_map = new TreeMap();
TreeMap array_add_map = new TreeMap();
ArrayList actionList;
TreeMap actionMap;
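// array_replace_map collects in-place row updates per partition, sorted by row number;
// array_add_map collects appends per target partition; both are flushed later as ordered blocks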
while (existingContainer.size() > 0) {
oldPartitionNumber1 = ((Integer) existingContainer.lastKey()).intValue();
containerMap = (TreeMap) existingContainer.remove(new Integer(oldPartitionNumber1));
Iterator j = containerMap.entrySet().iterator();
while (j.hasNext()) {
tripleEntry = (Map.Entry) j.next();
oldrownumber1 = ((Integer) tripleEntry.getKey()).intValue();
record = (Object[]) tripleEntry.getValue(); // {byte[], indexContainer, kelondroRow.Entry}
// merge with the old collection
key = (byte[]) record[0];
collection = (kelondroRowCollection) record[1];
indexrow = (kelondroRow.Entry) record[2];
// read old information
int oldchunksize = (int) indexrow.getColLong(idx_col_chunksize); // needed only for migration
int oldchunkcount = (int) indexrow.getColLong(idx_col_chunkcount); // the number of rows in the collection
int oldrownumber = (int) indexrow.getColLong(idx_col_indexpos); // index of the entry in array
int oldPartitionNumber = (int) indexrow.getColByte(idx_col_clusteridx); // points to array file
assert (oldPartitionNumber >= arrayIndex(oldchunkcount));
int oldSerialNumber = 0;
// load the old collection and join it
kelondroRowSet oldcollection = getwithparams(indexrow, oldchunksize, oldchunkcount, oldPartitionNumber, oldrownumber, oldSerialNumber, false);
// merge with the old collection
key = (byte[]) record[0];
collection = (kelondroRowCollection) record[1];
indexrow = (kelondroRow.Entry) record[2];
// read old information
int oldchunksize = (int) indexrow.getColLong(idx_col_chunksize); // needed only for migration
int oldchunkcount = (int) indexrow.getColLong(idx_col_chunkcount); // the number of rows in the collection
int oldrownumber = (int) indexrow.getColLong(idx_col_indexpos); // index of the entry in array
int oldPartitionNumber = (int) indexrow.getColByte(idx_col_clusteridx); // points to array file
assert oldPartitionNumber1 == oldPartitionNumber;
assert oldrownumber1 == oldrownumber;
assert (oldPartitionNumber >= arrayIndex(oldchunkcount));
int oldSerialNumber = 0;
// load the old collection and join it
kelondroRowSet oldcollection = getwithparams(indexrow, oldchunksize, oldchunkcount, oldPartitionNumber, oldrownumber, oldSerialNumber, false);
// join with new collection
oldcollection.addAllUnique(collection);
oldcollection.shape();
oldcollection.uniq(); // FIXME: not clear if it would be better to insert the collection with put to avoid double-entries
oldcollection.trim();
collection = oldcollection;
int newPartitionNumber = arrayIndex(collection.size());
// see if we need new space or if we can overwrite the old space
if (oldPartitionNumber == newPartitionNumber) {
array_replace(
key, collection, indexrow,
oldPartitionNumber, oldSerialNumber, this.payloadrow.objectsize(),
oldrownumber); // modifies indexrow
} else {
array_remove(
oldPartitionNumber, oldSerialNumber, this.payloadrow.objectsize(),
oldrownumber);
array_add(
key, collection, indexrow,
newPartitionNumber, oldSerialNumber, this.payloadrow.objectsize()); // modifies indexrow
// join with new collection
oldcollection.addAllUnique(collection);
oldcollection.shape();
oldcollection.uniq(); // FIXME: not clear if it would be better to insert the collection with put to avoid double-entries
oldcollection.trim();
collection = oldcollection;
int newPartitionNumber = arrayIndex(collection.size());
// see if we need new space or if we can overwrite the old space
if (oldPartitionNumber == newPartitionNumber) {
actionMap = (TreeMap) array_replace_map.get(new Integer(oldPartitionNumber));
if (actionMap == null) actionMap = new TreeMap();
actionMap.put(new Integer(oldrownumber), new Object[]{key, collection, indexrow});
array_replace_map.put(new Integer(oldPartitionNumber), actionMap);
/*
array_replace(
key, collection, indexrow,
oldPartitionNumber, oldSerialNumber, this.payloadrow.objectsize(),
oldrownumber); // modifies indexrow
indexrows_existing.add(indexrow); // indexrows are collected and written later as block
*/
} else {
array_remove(
oldPartitionNumber, oldSerialNumber, this.payloadrow.objectsize(),
oldrownumber);
actionList = (ArrayList) array_add_map.get(new Integer(newPartitionNumber));
if (actionList == null) actionList = new ArrayList();
actionList.add(new Object[]{key, collection, indexrow});
array_add_map.put(new Integer(newPartitionNumber), actionList);
/*
array_add(
key, collection, indexrow,
newPartitionNumber, oldSerialNumber, this.payloadrow.objectsize()); // modifies indexrow
indexrows_existing.add(indexrow); // indexrows are collected and written later as block
*/
}
// memory protection: flush collected collections
}
arrayResolveRemoved(); // remove all to-be-removed marked entries
indexrows_existing.add(indexrow); // indexrows are collected and written later as block
}
// finally flush the collected collections
indexrows_existing.addAll(array_replace_multiple(array_replace_map, 0, this.payloadrow.objectsize()));
array_replace_map = new TreeMap(); // delete references
indexrows_existing.addAll(array_add_multiple(array_add_map, 0, this.payloadrow.objectsize()));
array_add_map = new TreeMap(); // delete references
// write new containers
i = newContainer.iterator();
ArrayList indexrows_new = new ArrayList();
@ -510,6 +646,9 @@ public class kelondroCollectionIndex {
indexrows_new.add(indexrow); // collect new index rows
}
// remove all to-be-removed marked entries
arrayResolveRemoved();
// write index entries
index.putMultiple(indexrows_existing, new Date()); // write modified indexrows in optimized manner
index.addUniqueMultiple(indexrows_new, new Date()); // write new indexrows in optimized manner

@ -114,7 +114,7 @@ public class kelondroFlexTable extends kelondroFlexWidthArray implements kelondr
public boolean has(byte[] key) throws IOException {
// it is not recommended to implement or use a has predicate unless
// it can be ensured that it causes no IO
assert (RAMIndex == true) : "RAM index warning in file " + super.tablename;
if ((kelondroRecords.debugmode) && (RAMIndex != true)) serverLog.logWarning("kelondroFlexTable", "RAM index warning in file " + super.tablename);
return index.geti(key) >= 0;
}

@ -65,8 +65,8 @@ public final class plasmaWordIndex implements indexRI {
public plasmaWordIndex(File indexRoot, long rwibuffer, long lurlbuffer, long preloadTime, serverLog log) {
File textindexcache = new File(indexRoot, "PUBLIC/TEXT/RICACHE");
if (!(textindexcache.exists())) textindexcache.mkdirs();
this.dhtOutCache = new indexRAMRI(textindexcache, indexRWIEntryNew.urlEntryRow, 2040, "dump1.array", log);
this.dhtInCache = new indexRAMRI(textindexcache, indexRWIEntryNew.urlEntryRow, 2040, "dump2.array", log);
this.dhtOutCache = new indexRAMRI(textindexcache, indexRWIEntryNew.urlEntryRow, 4000, "dump1.array", log);
this.dhtInCache = new indexRAMRI(textindexcache, indexRWIEntryNew.urlEntryRow, 4000, "dump2.array", log);
// create collections storage path
File textindexcollections = new File(indexRoot, "PUBLIC/TEXT/RICOLLECTION");
@ -182,29 +182,37 @@ public final class plasmaWordIndex implements indexRI {
}
public void flushCacheSome() {
flushCacheSome(dhtOutCache);
flushCacheSome(dhtInCache);
}
private void flushCacheSome(indexRAMRI ram) {
flushCache(ram, flushsize);
while (ram.maxURLinCache() >= 2040) flushCache(ram, 1);
flushCache(dhtOutCache, flushsize);
flushCache(dhtInCache, flushsize);
}
private void flushCache(indexRAMRI ram, int count) {
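// no flush while the cache is small (<= 5000 entries); a single call flushes at most 5000 containers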
if (ram.size() <= 5000) return;
if (count <= 0) return;
if (count > 5000) count = 5000;
busyCacheFlush = true;
String wordHash;
ArrayList containerList = new ArrayList();
synchronized (this) {
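// first drain the top-scored containers as long as each holds more than 4000 entries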
boolean collectMax = true;
indexContainer c;
while (collectMax) {
wordHash = ram.maxScoreWordHash();
c = ram.getContainer(wordHash, null, -1);
if ((c != null) && (c.size() > 4000)) {
containerList.add(ram.deleteContainer(wordHash));
} else {
collectMax = false;
}
}
count = count - containerList.size();
for (int i = 0; i < count; i++) { // possible position of outOfMemoryError ?
if (ram.size() == 0) break;
// select one word to flush
wordHash = ram.bestFlushWordHash();
// move one container from ram to flush list
indexContainer c = ram.deleteContainer(wordHash);
c = ram.deleteContainer(wordHash);
if (c != null) containerList.add(c);
}
// flush the containers

@ -51,8 +51,6 @@ import java.util.TreeMap;
import de.anomic.server.logging.serverLog;
public abstract class serverAbstractSwitch implements serverSwitch {
private static final long delayBetweenSave = 2000;
// configuration management
private final File configFile;
@ -65,7 +63,6 @@ public abstract class serverAbstractSwitch implements serverSwitch {
private final TreeMap switchActions;
protected serverLog log;
protected int serverJobs;
private long lastTimeSaved;
public serverAbstractSwitch(String rootPath, String initPath, String configPath) {
// we initialize the switchboard with a property file,
@ -132,9 +129,6 @@ public abstract class serverAbstractSwitch implements serverSwitch {
// init busy state control
serverJobs = 0;
// save control
lastTimeSaved = System.currentTimeMillis();
}
// a logger for this switchboard
@ -243,15 +237,12 @@ public abstract class serverAbstractSwitch implements serverSwitch {
}
private void saveConfig() {
if (System.currentTimeMillis() > this.lastTimeSaved + delayBetweenSave) {
try {
synchronized (configProps) {
serverFileUtils.saveMap(configFile, configProps, configComment);
}
} catch (IOException e) {
System.out.println("ERROR: cannot write config file " + configFile.toString() + ": " + e.getMessage());
try {
synchronized (configProps) {
serverFileUtils.saveMap(configFile, configProps, configComment);
}
this.lastTimeSaved = System.currentTimeMillis();
} catch (IOException e) {
System.out.println("ERROR: cannot write config file " + configFile.toString() + ": " + e.getMessage());
}
}
