- added a deadlock prevention function in cache flushing

- removed unused methods in collection index

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@4630 6c8d7289-2bf4-0310-a012-ef5d649a1542
orbiter 17 years ago
parent 764a40e37d
commit 20dadba426

@@ -30,7 +30,6 @@ import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import de.anomic.kelondro.kelondroBase64Order;
@@ -178,17 +177,6 @@ public class indexCollectionRI implements indexRI {
}
}
public void addMultipleEntries(List<indexContainer> containerList) {
try {
for (int i = 0; i < containerList.size(); i++) collectionIndex.merge(containerList.get(i));
//collectionIndex.mergeMultiple(containerList);
} catch (kelondroOutOfLimitsException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
public void close() {
collectionIndex.close();
}

@@ -36,12 +36,10 @@ import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TimeZone;
import java.util.TreeMap;
import de.anomic.index.indexContainer;
import de.anomic.kelondro.kelondroRow.EntryIndex;
@@ -421,51 +419,6 @@ public class kelondroCollectionIndex {
// after calling this method there must be an index.put(indexrow);
}
private ArrayList<kelondroRow.Entry> array_add_multiple(TreeMap<Integer, ArrayList<Object[]>> array_add_map, int serialNumber, int chunkSize) throws IOException {
// returns a List of kelondroRow.Entry entries for indexrow storage
Map.Entry<Integer, ArrayList<Object[]>> entry;
Iterator<Map.Entry<Integer, ArrayList<Object[]>>> i = array_add_map.entrySet().iterator();
Iterator<Object[]> j;
ArrayList<Object[]> actionList;
int partitionNumber;
kelondroFixedWidthArray array;
Object[] objs;
byte[] key;
kelondroRowCollection collection;
kelondroRow.Entry indexrow;
ArrayList<kelondroRow.Entry> indexrows = new ArrayList<kelondroRow.Entry>();
while (i.hasNext()) {
entry = i.next();
actionList = entry.getValue();
partitionNumber = entry.getKey().intValue();
array = getArray(partitionNumber, serialNumber, index.row().objectOrder, chunkSize);
j = actionList.iterator();
while (j.hasNext()) {
objs = (Object[]) j.next();
key = (byte[]) objs[0];
collection = (kelondroRowCollection) objs[1];
indexrow = (kelondroRow.Entry) objs[2];
// define new row
kelondroRow.Entry arrayEntry = array.row().newEntry();
arrayEntry.setCol(0, key);
arrayEntry.setCol(1, collection.exportCollection());
// write a new entry in this array
int rowNumber = array.add(arrayEntry);
// store the new row number in the index
indexrow.setCol(idx_col_chunkcount, collection.size());
indexrow.setCol(idx_col_clusteridx, (byte) partitionNumber);
indexrow.setCol(idx_col_indexpos, (long) rowNumber);
indexrow.setCol(idx_col_lastwrote, kelondroRowCollection.daysSince2000(System.currentTimeMillis()));
indexrows.add(indexrow);
}
}
// after calling this method there must be an index.put(indexrow);
return indexrows;
}
private void array_replace(
byte[] key, kelondroRowCollection collection, kelondroRow.Entry indexrow,
int partitionNumber, int serialNumber, int chunkSize,
@@ -492,54 +445,6 @@ public class kelondroCollectionIndex {
// after calling this method there must be an index.put(indexrow);
}
private ArrayList<kelondroRow.Entry> array_replace_multiple(TreeMap<Integer, TreeMap<Integer, Object[]>> array_replace_map, int serialNumber, int chunkSize) throws IOException {
Map.Entry<Integer, TreeMap<Integer, Object[]>> entry;
Map.Entry<Integer, Object[]> e;
Iterator<Map.Entry<Integer, TreeMap<Integer, Object[]>>> i = array_replace_map.entrySet().iterator();
Iterator<Map.Entry<Integer, Object[]>> j;
TreeMap<Integer, Object[]> actionMap;
int partitionNumber;
kelondroFixedWidthArray array;
ArrayList<kelondroRow.Entry> indexrows = new ArrayList<kelondroRow.Entry>();
Object[] objs;
int rowNumber;
byte[] key;
kelondroRowCollection collection;
kelondroRow.Entry indexrow;
while (i.hasNext()) {
entry = i.next();
actionMap = entry.getValue();
partitionNumber = ((Integer) entry.getKey()).intValue();
array = getArray(partitionNumber, serialNumber, index.row().objectOrder, chunkSize);
j = actionMap.entrySet().iterator();
while (j.hasNext()) {
e = j.next();
rowNumber = ((Integer) e.getKey()).intValue();
objs = (Object[]) e.getValue();
key = (byte[]) objs[0];
collection = (kelondroRowCollection) objs[1];
indexrow = (kelondroRow.Entry) objs[2];
// define new row
kelondroRow.Entry arrayEntry = array.row().newEntry();
arrayEntry.setCol(0, key);
arrayEntry.setCol(1, collection.exportCollection());
// overwrite entry in this array
array.set(rowNumber, arrayEntry);
// update the index entry
indexrow.setCol(idx_col_chunkcount, collection.size());
indexrow.setCol(idx_col_clusteridx, (byte) partitionNumber);
indexrow.setCol(idx_col_lastwrote, kelondroRowCollection.daysSince2000(System.currentTimeMillis()));
indexrows.add(indexrow);
}
}
// after calling this method there must be an index.put(indexrow);
return indexrows;
}
public synchronized void put(byte[] key, kelondroRowCollection collection) throws IOException, kelondroOutOfLimitsException {
assert (key != null);
assert (collection != null);
@@ -588,165 +493,6 @@ public class kelondroCollectionIndex {
index.put(indexrow); // write modified indexrow
}
public synchronized void mergeMultiple(List<indexContainer> containerList) throws IOException, kelondroOutOfLimitsException {
// merge a bulk of index containers
// this method should be used to optimize the R/W head path length
// separate the list into two halves:
// - containers that do not exist yet in the collection
// - containers that do exist in the collection and must be merged
Iterator<indexContainer> i = containerList.iterator();
indexContainer container;
byte[] key;
ArrayList<Object[]> newContainer = new ArrayList<Object[]>();
TreeMap<Integer, TreeMap<Integer, Object[]>> existingContainer = new TreeMap<Integer, TreeMap<Integer, Object[]>>(); // a mapping from Integer (partition) to a TreeMap (mapping from index to object triple)
TreeMap<Integer, Object[]> containerMap; // temporary map; mapping from index position to object triple with {key, container, indexrow}
kelondroRow.Entry indexrow;
int oldrownumber1; // index of the entry in array
int oldPartitionNumber1; // points to array file
while (i.hasNext()) {
container = (indexContainer) i.next();
if ((container == null) || (container.size() == 0)) continue;
key = container.getWordHash().getBytes();
// first find an old entry, if one exists
indexrow = index.get(key);
if (indexrow == null) {
newContainer.add(new Object[]{key, container});
} else {
oldrownumber1 = (int) indexrow.getColLong(idx_col_indexpos);
oldPartitionNumber1 = (int) indexrow.getColByte(idx_col_clusteridx);
containerMap = existingContainer.get(new Integer(oldPartitionNumber1));
if (containerMap == null) containerMap = new TreeMap<Integer, Object[]>();
containerMap.put(new Integer(oldrownumber1), new Object[]{key, container, indexrow});
existingContainer.put(new Integer(oldPartitionNumber1), containerMap);
}
}
// now iterate through the container lists and execute merges
// this is done in such a way that there is an optimized path for the R/W head
// merge existing containers
Map.Entry<Integer, Object[]> tripleEntry;
Object[] record;
ArrayList<kelondroRow.Entry> indexrows_existing = new ArrayList<kelondroRow.Entry>();
kelondroRowCollection collection;
TreeMap<Integer, TreeMap<Integer, Object[]>> array_replace_map = new TreeMap<Integer, TreeMap<Integer, Object[]>>();
TreeMap<Integer, ArrayList<Object[]>> array_add_map = new TreeMap<Integer, ArrayList<Object[]>>();
ArrayList<Object[]> actionList;
TreeMap<Integer, Object[]> actionMap;
//boolean madegc = false;
//System.out.println("DEBUG existingContainer: " + existingContainer.toString());
while (existingContainer.size() > 0) {
oldPartitionNumber1 = ((Integer) existingContainer.lastKey()).intValue();
containerMap = existingContainer.remove(new Integer(oldPartitionNumber1));
Iterator<Map.Entry<Integer, Object[]>> j = containerMap.entrySet().iterator();
while (j.hasNext()) {
tripleEntry = j.next();
oldrownumber1 = ((Integer) tripleEntry.getKey()).intValue();
record = (Object[]) tripleEntry.getValue(); // {byte[], indexContainer, kelondroRow.Entry}
// merge with the old collection
key = (byte[]) record[0];
collection = (kelondroRowCollection) record[1];
indexrow = (kelondroRow.Entry) record[2];
// read old information
int oldchunksize = (int) indexrow.getColLong(idx_col_chunksize); // needed only for migration
int oldchunkcount = (int) indexrow.getColLong(idx_col_chunkcount); // the number of rows in the collection
int oldrownumber = (int) indexrow.getColLong(idx_col_indexpos); // index of the entry in array
int oldPartitionNumber = (int) indexrow.getColByte(idx_col_clusteridx); // points to array file
assert oldPartitionNumber1 == oldPartitionNumber : "oldPartitionNumber1 = " + oldPartitionNumber1 + ", oldPartitionNumber = " + oldPartitionNumber + ", containerMap = " + containerMap + ", existingContainer: " + existingContainer.toString();
assert oldrownumber1 == oldrownumber : "oldrownumber1 = " + oldrownumber1 + ", oldrownumber = " + oldrownumber + ", containerMap = " + containerMap + ", existingContainer: " + existingContainer.toString();
assert (oldPartitionNumber >= arrayIndex(oldchunkcount));
int oldSerialNumber = 0;
// load the old collection and join it
collection.addAllUnique(getwithparams(indexrow, oldchunksize, oldchunkcount, oldPartitionNumber, oldrownumber, oldSerialNumber, false));
collection.sort();
collection.uniq(); // FIXME: not clear if it would be better to insert the collection with put to avoid double-entries
collection.trim(false);
// check for size of collection:
// if necessary shrink the collection and dump a part of that collection
// to avoid that this grows too big
if (arrayIndex(collection.size()) > maxPartitions) {
shrinkCollection(key, collection, arrayCapacity(maxPartitions));
}
// determine new partition position
int newPartitionNumber = arrayIndex(collection.size());
// see if we need new space or if we can overwrite the old space
if (oldPartitionNumber == newPartitionNumber) {
actionMap = array_replace_map.get(new Integer(oldPartitionNumber));
if (actionMap == null) actionMap = new TreeMap<Integer, Object[]>();
actionMap.put(new Integer(oldrownumber), new Object[]{key, collection, indexrow});
array_replace_map.put(new Integer(oldPartitionNumber), actionMap);
/*
array_replace(
key, collection, indexrow,
oldPartitionNumber, oldSerialNumber, this.payloadrow.objectsize(),
oldrownumber); // modifies indexrow
indexrows_existing.add(indexrow); // indexrows are collected and written later as block
*/
} else {
array_remove(
oldPartitionNumber, oldSerialNumber, this.payloadrow.objectsize,
oldrownumber);
actionList = array_add_map.get(new Integer(newPartitionNumber));
if (actionList == null) actionList = new ArrayList<Object[]>();
actionList.add(new Object[]{key, collection, indexrow});
array_add_map.put(new Integer(newPartitionNumber), actionList);
/*
array_add(
key, collection, indexrow,
newPartitionNumber, oldSerialNumber, this.payloadrow.objectsize()); // modifies indexrow
indexrows_existing.add(indexrow); // indexrows are collected and written later as block
*/
}
// memory protection: flush collected collections
if (serverMemory.available() < minMem()) {
// emergency flush
indexrows_existing.addAll(array_replace_multiple(array_replace_map, 0, this.payloadrow.objectsize));
array_replace_map = new TreeMap<Integer, TreeMap<Integer, Object[]>>(); // delete references
indexrows_existing.addAll(array_add_multiple(array_add_map, 0, this.payloadrow.objectsize));
array_add_map = new TreeMap<Integer, ArrayList<Object[]>>(); // delete references
//if (!madegc) {
// prevent that this flush is made again even when there is enough memory
serverMemory.gc(10000, "kelendroCollectionIndex.mergeMultiple(...)"); // thq
// prevent that this gc happens more than one time
// madegc = true;
//}
}
}
}
// finally flush the collected collections
indexrows_existing.addAll(array_replace_multiple(array_replace_map, 0, this.payloadrow.objectsize));
array_replace_map = new TreeMap<Integer, TreeMap<Integer, Object[]>>(); // delete references
indexrows_existing.addAll(array_add_multiple(array_add_map, 0, this.payloadrow.objectsize));
array_add_map = new TreeMap<Integer, ArrayList<Object[]>>(); // delete references
// write new containers
Iterator<Object[]> k = newContainer.iterator();
ArrayList<kelondroRow.Entry> indexrows_new = new ArrayList<kelondroRow.Entry>();
while (k.hasNext()) {
record = k.next(); // {byte[], indexContainer}
key = (byte[]) record[0];
collection = (indexContainer) record[1];
indexrow = array_new(key, collection); // modifies indexrow
indexrows_new.add(indexrow); // collect new index rows
}
// write index entries
index.putMultiple(indexrows_existing); // write modified indexrows in an optimized manner
index.addUniqueMultiple(indexrows_new); // write new indexrows in an optimized manner
}
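The comment at the top of mergeMultiple explains the intent: split the incoming containers into new and existing ones, then execute all array writes grouped and sorted by partition so the disk head travels in one direction. Stripped of the kelondro types, that batching idea reduces to a small pattern; the following is an illustrative sketch only (WriteBatcher and PartitionWriter are hypothetical names, not part of the YaCy source):

import java.io.IOException;
import java.util.ArrayList;
import java.util.Map;
import java.util.TreeMap;

interface PartitionWriter {
    void write(int partitionNumber, Object[] record) throws IOException;
}

class WriteBatcher {
    // a TreeMap keeps partitions sorted, so drain() visits each array file once, in order
    private final TreeMap<Integer, ArrayList<Object[]>> pending = new TreeMap<Integer, ArrayList<Object[]>>();

    void add(int partitionNumber, Object[] record) {
        ArrayList<Object[]> list = pending.get(Integer.valueOf(partitionNumber));
        if (list == null) {
            list = new ArrayList<Object[]>();
            pending.put(Integer.valueOf(partitionNumber), list);
        }
        list.add(record);
    }

    void drain(PartitionWriter writer) throws IOException {
        // ascending partition order: the shorter R/W head path the original code optimized for
        for (Map.Entry<Integer, ArrayList<Object[]>> entry : pending.entrySet()) {
            for (Object[] record : entry.getValue()) {
                writer.write(entry.getKey().intValue(), record);
            }
        }
        pending.clear();
    }
}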
public synchronized void merge(indexContainer container) throws IOException, kelondroOutOfLimitsException {
if ((container == null) || (container.size() == 0)) return;
byte[] key = container.getWordHash().getBytes();

@@ -57,7 +57,7 @@ public class kelondroSplitTable implements kelondroIndex {
private static final long minimumRAM4Eco = 80 * 1024 * 1024;
private static final int EcoFSBufferSize = 20;
private static final kelondroIndex dummyIndex = new kelondroRAMIndex(new kelondroRow(new kelondroColumn[]{new kelondroColumn("key", kelondroColumn.celltype_binary, kelondroColumn.encoder_bytes, 2, "key")}, kelondroNaturalOrder.naturalOrder, 0), 0);
static final kelondroIndex dummyIndex = new kelondroRAMIndex(new kelondroRow(new kelondroColumn[]{new kelondroColumn("key", kelondroColumn.celltype_binary, kelondroColumn.encoder_bytes, 2, "key")}, kelondroNaturalOrder.naturalOrder, 0), 0);
// the thread pool for the keeperOf executor service
private ExecutorService executor;
@@ -255,7 +255,7 @@ public class kelondroSplitTable implements kelondroIndex {
return null;
}
public synchronized kelondroIndex keeperOf(final byte[] key) throws IOException {
public synchronized kelondroIndex keeperOf(final byte[] key) {
// because the index is stored only in one table,
// and the index is completely in RAM, concurrent calls will
// not cause concurrent file accesses

@@ -207,9 +207,14 @@ public final class plasmaWordIndex implements indexRI {
public void dhtFlushControl(indexRAMRI theCache) {
// check for forced flush
while (theCache.maxURLinCache() > wCacheMaxChunk ) {
int l = 0;
// flush elements that are too big. This flushing depends on the fact that the flush rule
// selects the biggest elements first for flushing. If it does not, for any reason, the
// following loop would not terminate; to ensure termination an additional counter is used
while ((l++ < 100) && (theCache.maxURLinCache() > wCacheMaxChunk)) {
flushCache(theCache, Math.min(10, theCache.size()));
}
// next flush more entries if the size exceeds the maximum size of the cache
if ((theCache.size() > theCache.getMaxWordCount()) ||
(serverMemory.available() < collections.minMem())) {
flushCache(theCache, Math.min(theCache.size() - theCache.getMaxWordCount() + 1, theCache.size()));
@@ -301,7 +306,7 @@ public final class plasmaWordIndex implements indexRI {
if (c != null) containerList.add(c);
}
// flush the containers
collections.addMultipleEntries(containerList);
for (indexContainer container : containerList) collections.addEntries(container);
//System.out.println("DEBUG-Finished flush of " + count + " entries from RAM to DB in " + (System.currentTimeMillis() - start) + " milliseconds");
return containerList.size();
}
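The deadlock prevention announced in the commit message is the bounded loop in dhtFlushControl above: the flush is expected to shrink the biggest cache entries first, but if that rule ever fails to make progress, the iteration budget still guarantees that the loop terminates instead of spinning forever inside a synchronized path. A self-contained sketch of the pattern (FlushableCache and its methods are stand-ins for the indexRAMRI API used above):

interface FlushableCache {
    int maxURLinCache();
    int size();
    void flush(int count);
}

final class FlushGuard {
    // same bound as the l++ < 100 counter in dhtFlushControl
    private static final int FLUSH_BUDGET = 100;

    static void boundedFlush(FlushableCache cache, int wCacheMaxChunk) {
        int used = 0;
        // terminates either when the cache condition clears or when the budget runs out
        while (used++ < FLUSH_BUDGET && cache.maxURLinCache() > wCacheMaxChunk) {
            cache.flush(Math.min(10, cache.size()));
        }
    }
}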
