speed enhancement for BLOBHeap opening process

using concurrency of FileIO and content processing

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@5360 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 16 years ago
parent 1545e5440a
commit 2e2120046f

@ -33,6 +33,7 @@ import java.util.Iterator;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import de.anomic.server.serverMemory;
import de.anomic.server.logging.serverLog;
@ -79,13 +80,14 @@ public final class kelondroBLOBHeap implements kelondroBLOB {
this.ordering = ordering;
this.heapFile = heapFile;
this.index = new kelondroBytesLongMap(keylength, this.ordering, 0);
this.index = null; // will be created as result of initialization process
this.free = new TreeMap<Long, Integer>();
this.file = new RandomAccessFile(heapFile, "rw");
final byte[] key = new byte[keylength];
byte[] key = new byte[keylength];
int reclen;
long seek = 0;
kelondroBytesLongMap.initDataConsumer indexready = kelondroBytesLongMap.asynchronusInitializer(keylength, this.ordering, 0, Math.max(10, (int) (Runtime.getRuntime().freeMemory() / (10 * 1024 * 1024))));
loop: while (true) { // don't test available() here because this does not work for files > 2GB
try {
@ -115,21 +117,17 @@ public final class kelondroBLOBHeap implements kelondroBLOB {
// it is an empty record, store to free list
if (reclen > 0) free.put(seek, reclen);
} else {
// store key and access address of entry in index
try {
if (this.ordering.wellformed(key)) {
index.addl(key, seek);
} else {
serverLog.logWarning("kelondroBLOBHeap", "BLOB " + heapFile.getName() + ": skiped not wellformed key " + new String(key) + " at seek pos " + seek);
}
} catch (final IOException e) {
e.printStackTrace();
break loop;
if (this.ordering.wellformed(key)) {
indexready.consume(key, seek);
key = new byte[keylength];
} else {
serverLog.logWarning("kelondroBLOBHeap", "BLOB " + heapFile.getName() + ": skiped not wellformed key " + new String(key) + " at seek pos " + seek);
}
}
// new seek position
seek += 4L + reclen;
}
indexready.finish();
// try to merge free entries
if (this.free.size() > 1) {
@ -157,6 +155,14 @@ public final class kelondroBLOBHeap implements kelondroBLOB {
serverLog.logInfo("kelondroBLOBHeap", "BLOB " + heapFile.getName() + ": merged " + merged + " free records");
}
try {
this.index = indexready.result();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
// DEBUG
/*
Iterator<byte[]> i = index.keys(true, null);

@ -27,18 +27,18 @@ package de.anomic.kelondro;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class kelondroBytesLongMap {
private final kelondroRow rowdef;
private kelondroIndex index;
public kelondroBytesLongMap(final kelondroIndex ki) {
assert (ki.row().columns() == 2); // must be a key/index relation
assert (ki.row().width(1) == 8); // the value must be a b256-encoded int, 4 bytes long
this.index = ki;
this.rowdef = ki.row();
}
private kelondroRAMIndex index;
public kelondroBytesLongMap(final int keylength, final kelondroByteOrder objectOrder, final int space) {
this.rowdef = new kelondroRow(new kelondroColumn[]{new kelondroColumn("key", kelondroColumn.celltype_binary, kelondroColumn.encoder_bytes, keylength, "key"), new kelondroColumn("long c-8 {b256}")}, objectOrder, 0);
@ -132,4 +132,73 @@ public class kelondroBytesLongMap {
index = null;
}
public static initDataConsumer asynchronusInitializer(final int keylength, final kelondroByteOrder objectOrder, final int space, int bufferSize) {
initDataConsumer initializer = new initDataConsumer(new kelondroBytesLongMap(keylength, objectOrder, space), bufferSize);
ExecutorService service = Executors.newSingleThreadExecutor();
initializer.setResult(service.submit(initializer));
service.shutdown();
return initializer;
}
public static class entry {
public byte[] key;
public long l;
public entry(final byte[] key, final long l) {
this.key = key;
this.l = l;
}
}
public static class initDataConsumer implements Callable<kelondroBytesLongMap> {
private BlockingQueue<entry> cache;
private final entry poison = new entry(new byte[0], 0);
private kelondroBytesLongMap map;
private Future<kelondroBytesLongMap> result;
public initDataConsumer(kelondroBytesLongMap map, int bufferCount) {
this.map = map;
cache = new ArrayBlockingQueue<entry>(bufferCount);
}
protected void setResult(Future<kelondroBytesLongMap> result) {
this.result = result;
}
public void consume(final byte[] key, final long l) {
try {
cache.put(new entry(key, l));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void finish() {
try {
cache.put(poison);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public kelondroBytesLongMap result() throws InterruptedException, ExecutionException {
return this.result.get();
}
public kelondroBytesLongMap call() throws IOException {
try {
entry c;
while(true) {
c = cache.take();
if (c == poison) break;
map.addl(c.key, c.l);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
map.index.finishInitialization();
return map;
}
}
}

@ -57,7 +57,7 @@ public class kelondroRAMIndex implements kelondroIndex {
return index0.row();
}
private final void finishInitialization() {
protected final void finishInitialization() {
if (index1 == null) {
// finish initialization phase
index0.sort();

Loading…
Cancel
Save