diff --git a/source/de/anomic/kelondro/kelondroBLOBHeap.java b/source/de/anomic/kelondro/kelondroBLOBHeap.java index 5584bc4d3..1297a4fc3 100755 --- a/source/de/anomic/kelondro/kelondroBLOBHeap.java +++ b/source/de/anomic/kelondro/kelondroBLOBHeap.java @@ -33,6 +33,7 @@ import java.util.Iterator; import java.util.Map; import java.util.SortedMap; import java.util.TreeMap; +import java.util.concurrent.ExecutionException; import de.anomic.server.serverMemory; import de.anomic.server.logging.serverLog; @@ -79,13 +80,14 @@ public final class kelondroBLOBHeap implements kelondroBLOB { this.ordering = ordering; this.heapFile = heapFile; - this.index = new kelondroBytesLongMap(keylength, this.ordering, 0); + this.index = null; // will be created as result of initialization process this.free = new TreeMap(); this.file = new RandomAccessFile(heapFile, "rw"); - final byte[] key = new byte[keylength]; + byte[] key = new byte[keylength]; int reclen; long seek = 0; - + kelondroBytesLongMap.initDataConsumer indexready = kelondroBytesLongMap.asynchronusInitializer(keylength, this.ordering, 0, Math.max(10, (int) (Runtime.getRuntime().freeMemory() / (10 * 1024 * 1024)))); + loop: while (true) { // don't test available() here because this does not work for files > 2GB try { @@ -115,21 +117,17 @@ public final class kelondroBLOBHeap implements kelondroBLOB { // it is an empty record, store to free list if (reclen > 0) free.put(seek, reclen); } else { - // store key and access address of entry in index - try { - if (this.ordering.wellformed(key)) { - index.addl(key, seek); - } else { - serverLog.logWarning("kelondroBLOBHeap", "BLOB " + heapFile.getName() + ": skiped not wellformed key " + new String(key) + " at seek pos " + seek); - } - } catch (final IOException e) { - e.printStackTrace(); - break loop; + if (this.ordering.wellformed(key)) { + indexready.consume(key, seek); + key = new byte[keylength]; + } else { + serverLog.logWarning("kelondroBLOBHeap", "BLOB " + heapFile.getName() + ": skiped not wellformed key " + new String(key) + " at seek pos " + seek); } } // new seek position seek += 4L + reclen; } + indexready.finish(); // try to merge free entries if (this.free.size() > 1) { @@ -157,6 +155,14 @@ public final class kelondroBLOBHeap implements kelondroBLOB { serverLog.logInfo("kelondroBLOBHeap", "BLOB " + heapFile.getName() + ": merged " + merged + " free records"); } + try { + this.index = indexready.result(); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (ExecutionException e) { + e.printStackTrace(); + } + // DEBUG /* Iterator i = index.keys(true, null); diff --git a/source/de/anomic/kelondro/kelondroBytesLongMap.java b/source/de/anomic/kelondro/kelondroBytesLongMap.java index 6558a5ec0..51969fc90 100644 --- a/source/de/anomic/kelondro/kelondroBytesLongMap.java +++ b/source/de/anomic/kelondro/kelondroBytesLongMap.java @@ -27,18 +27,18 @@ package de.anomic.kelondro; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; public class kelondroBytesLongMap { private final kelondroRow rowdef; - private kelondroIndex index; - - public kelondroBytesLongMap(final kelondroIndex ki) { - assert (ki.row().columns() == 2); // must be a key/index relation - assert (ki.row().width(1) == 8); // the value must be a b256-encoded int, 4 bytes long - this.index = ki; - this.rowdef = ki.row(); - } + private kelondroRAMIndex index; public kelondroBytesLongMap(final int keylength, final kelondroByteOrder objectOrder, final int space) { this.rowdef = new kelondroRow(new kelondroColumn[]{new kelondroColumn("key", kelondroColumn.celltype_binary, kelondroColumn.encoder_bytes, keylength, "key"), new kelondroColumn("long c-8 {b256}")}, objectOrder, 0); @@ -132,4 +132,73 @@ public class kelondroBytesLongMap { index = null; } + public static initDataConsumer asynchronusInitializer(final int keylength, final kelondroByteOrder objectOrder, final int space, int bufferSize) { + initDataConsumer initializer = new initDataConsumer(new kelondroBytesLongMap(keylength, objectOrder, space), bufferSize); + ExecutorService service = Executors.newSingleThreadExecutor(); + initializer.setResult(service.submit(initializer)); + service.shutdown(); + return initializer; + } + + public static class entry { + public byte[] key; + public long l; + public entry(final byte[] key, final long l) { + this.key = key; + this.l = l; + } + } + + public static class initDataConsumer implements Callable { + + private BlockingQueue cache; + private final entry poison = new entry(new byte[0], 0); + private kelondroBytesLongMap map; + private Future result; + + public initDataConsumer(kelondroBytesLongMap map, int bufferCount) { + this.map = map; + cache = new ArrayBlockingQueue(bufferCount); + } + + protected void setResult(Future result) { + this.result = result; + } + + public void consume(final byte[] key, final long l) { + try { + cache.put(new entry(key, l)); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + public void finish() { + try { + cache.put(poison); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + public kelondroBytesLongMap result() throws InterruptedException, ExecutionException { + return this.result.get(); + } + + public kelondroBytesLongMap call() throws IOException { + try { + entry c; + while(true) { + c = cache.take(); + if (c == poison) break; + map.addl(c.key, c.l); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + map.index.finishInitialization(); + return map; + } + + } } diff --git a/source/de/anomic/kelondro/kelondroRAMIndex.java b/source/de/anomic/kelondro/kelondroRAMIndex.java index a2116a610..55ed252bf 100644 --- a/source/de/anomic/kelondro/kelondroRAMIndex.java +++ b/source/de/anomic/kelondro/kelondroRAMIndex.java @@ -57,7 +57,7 @@ public class kelondroRAMIndex implements kelondroIndex { return index0.row(); } - private final void finishInitialization() { + protected final void finishInitialization() { if (index1 == null) { // finish initialization phase index0.sort();