diff --git a/source/de/anomic/kelondro/blob/BLOBArray.java b/source/de/anomic/kelondro/blob/BLOBArray.java index 9683aa5c7..4de4c939d 100755 --- a/source/de/anomic/kelondro/blob/BLOBArray.java +++ b/source/de/anomic/kelondro/blob/BLOBArray.java @@ -559,16 +559,16 @@ public class BLOBArray implements BLOB { blobs = null; } - public File mergeMount(File f1, File f2, ReferenceFactory factory, Row payloadrow, File newFile) throws IOException { + public File mergeMount(File f1, File f2, ReferenceFactory factory, Row payloadrow, File newFile, int writeBuffer) throws IOException { Log.logInfo("BLOBArray", "merging " + f1.getName() + " with " + f2.getName()); - File resultFile = mergeWorker(factory, this.keylength, this.ordering, f1, f2, payloadrow, newFile); + File resultFile = mergeWorker(factory, this.keylength, this.ordering, f1, f2, payloadrow, newFile, writeBuffer); if (resultFile == null) return null; mountBLOB(resultFile, false); Log.logInfo("BLOBArray", "merged " + f1.getName() + " with " + f2.getName() + " into " + resultFile); return resultFile; } - private static File mergeWorker(ReferenceFactory factory, int keylength, ByteOrder order, File f1, File f2, Row payloadrow, File newFile) throws IOException { + private static File mergeWorker(ReferenceFactory factory, int keylength, ByteOrder order, File f1, File f2, Row payloadrow, File newFile, int writeBuffer) throws IOException { // iterate both files and write a new one CloneableIterator> i1 = new blobFileEntries(f1, factory, payloadrow); @@ -591,7 +591,7 @@ public class BLOBArray implements BLOB { assert i1.hasNext(); assert i2.hasNext(); File tmpFile = new File(newFile.getParentFile(), newFile.getName() + ".tmp"); - HeapWriter writer = new HeapWriter(tmpFile, newFile, keylength, order); + HeapWriter writer = new HeapWriter(tmpFile, newFile, keylength, order, writeBuffer); merge(i1, i2, order, writer); try { writer.close(true); diff --git a/source/de/anomic/kelondro/blob/HeapWriter.java b/source/de/anomic/kelondro/blob/HeapWriter.java index 3d82aa466..9318e6f00 100644 --- a/source/de/anomic/kelondro/blob/HeapWriter.java +++ b/source/de/anomic/kelondro/blob/HeapWriter.java @@ -72,12 +72,12 @@ public final class HeapWriter { * @param ordering * @throws IOException */ - public HeapWriter(final File temporaryHeapFile, final File readyHeapFile, final int keylength, final ByteOrder ordering) throws IOException { + public HeapWriter(final File temporaryHeapFile, final File readyHeapFile, final int keylength, final ByteOrder ordering, int outBuffer) throws IOException { this.heapFileTMP = temporaryHeapFile; this.heapFileREADY = readyHeapFile; this.keylength = keylength; this.index = new LongHandleIndex(keylength, ordering, 10, 100000); - this.os = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(temporaryHeapFile), 8 * 1024 * 1024)); + this.os = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(temporaryHeapFile), outBuffer)); //this.doublecheck = new HashSet(); this.seek = 0; } diff --git a/source/de/anomic/kelondro/text/IODispatcher.java b/source/de/anomic/kelondro/text/IODispatcher.java index d719b3971..8bbb10964 100644 --- a/source/de/anomic/kelondro/text/IODispatcher.java +++ b/source/de/anomic/kelondro/text/IODispatcher.java @@ -32,6 +32,7 @@ import java.util.concurrent.Semaphore; import de.anomic.kelondro.blob.BLOBArray; import de.anomic.kelondro.index.Row; import de.anomic.kelondro.util.Log; +import de.anomic.kelondro.util.MemoryControl; /** * this is a concurrent merger that can merge single files that are queued for merging. @@ -53,13 +54,15 @@ public class IODispatcher extends Thread { private ArrayBlockingQueue dumpQueue; private ReferenceFactory factory; private boolean terminate; + private int writeBufferSize; - public IODispatcher(ReferenceFactory factory, int dumpQueueLength, int mergeQueueLength) { + public IODispatcher(ReferenceFactory factory, int dumpQueueLength, int mergeQueueLength, int writeBufferSize) { this.factory = factory; this.termination = new Semaphore(0); this.controlQueue = new Semaphore(0); this.dumpQueue = new ArrayBlockingQueue(dumpQueueLength); this.mergeQueue = new ArrayBlockingQueue(mergeQueueLength); + this.writeBufferSize = writeBufferSize; this.terminate = false; } @@ -79,7 +82,7 @@ public class IODispatcher extends Thread { public synchronized void dump(ReferenceContainerCache cache, File file, ReferenceContainerArray array) { if (dumpQueue == null || controlQueue == null || !this.isAlive()) { Log.logWarning("IODispatcher", "emergency dump of file " + file.getName()); - cache.dump(file); + cache.dump(file, (int) Math.min(MemoryControl.available() / 3, writeBufferSize)); } else { DumpJob job = new DumpJob(cache, file, array); try { @@ -88,7 +91,7 @@ public class IODispatcher extends Thread { Log.logInfo("IODispatcher", "appended dump job for file " + file.getName()); } catch (InterruptedException e) { e.printStackTrace(); - cache.dump(file); + cache.dump(file, (int) Math.min(MemoryControl.available() / 3, writeBufferSize)); } } } @@ -101,7 +104,7 @@ public class IODispatcher extends Thread { if (mergeQueue == null || controlQueue == null || !this.isAlive()) { try { Log.logWarning("IODispatcher", "emergency merge of files " + f1.getName() + ", " + f2.getName() + " to " + newFile.getName()); - array.mergeMount(f1, f2, factory, payloadrow, newFile); + array.mergeMount(f1, f2, factory, payloadrow, newFile, (int) Math.min(MemoryControl.available() / 3, writeBufferSize)); } catch (IOException e) { e.printStackTrace(); } @@ -114,7 +117,7 @@ public class IODispatcher extends Thread { } catch (InterruptedException e) { e.printStackTrace(); try { - array.mergeMount(f1, f2, factory, payloadrow, newFile); + array.mergeMount(f1, f2, factory, payloadrow, newFile, (int) Math.min(MemoryControl.available() / 3, writeBufferSize)); } catch (IOException ee) { ee.printStackTrace(); } @@ -189,7 +192,7 @@ public class IODispatcher extends Thread { } public void dump() { try { - cache.dump(file); + cache.dump(file, (int) Math.min(MemoryControl.available() / 3, writeBufferSize)); array.mountBLOBFile(file); } catch (IOException e) { e.printStackTrace(); @@ -221,7 +224,7 @@ public class IODispatcher extends Thread { return null; } try { - return array.mergeMount(f1, f2, factory, payloadrow, newFile); + return array.mergeMount(f1, f2, factory, payloadrow, newFile, (int) Math.min(MemoryControl.available() / 3, writeBufferSize)); } catch (IOException e) { e.printStackTrace(); } diff --git a/source/de/anomic/kelondro/text/IndexCell.java b/source/de/anomic/kelondro/text/IndexCell.java index ecc628a60..fe3e8cd1f 100644 --- a/source/de/anomic/kelondro/text/IndexCell.java +++ b/source/de/anomic/kelondro/text/IndexCell.java @@ -61,6 +61,7 @@ public final class IndexCell extends AbstractBu private final IODispatcher merger; private long lastCleanup; private final long targetFileSize, maxFileSize; + private final int writeBufferSize; public IndexCell( @@ -71,7 +72,8 @@ public final class IndexCell extends AbstractBu final int maxRamEntries, final long targetFileSize, final long maxFileSize, - IODispatcher merger + IODispatcher merger, + int writeBufferSize ) throws IOException { super(factory); @@ -83,6 +85,7 @@ public final class IndexCell extends AbstractBu this.lastCleanup = System.currentTimeMillis(); this.targetFileSize = targetFileSize; this.maxFileSize = maxFileSize; + this.writeBufferSize = writeBufferSize; cleanCache(); } @@ -251,7 +254,7 @@ public final class IndexCell extends AbstractBu * and is composed of the current date and the cell salt */ public synchronized void close() { - this.ram.dump(this.array.newContainerBLOBFile()); + this.ram.dump(this.array.newContainerBLOBFile(), (int) Math.min(MemoryControl.available() / 3, writeBufferSize)); // close all this.ram.close(); this.array.close(); diff --git a/source/de/anomic/kelondro/text/IndexCollectionMigration.java b/source/de/anomic/kelondro/text/IndexCollectionMigration.java index f34282a9d..d496edbd6 100644 --- a/source/de/anomic/kelondro/text/IndexCollectionMigration.java +++ b/source/de/anomic/kelondro/text/IndexCollectionMigration.java @@ -60,6 +60,7 @@ public final class IndexCollectionMigration ext final long targetFileSize, final long maxFileSize, final IODispatcher merger, + final int writeBufferSize, final Log log) throws IOException { super(factory); @@ -73,7 +74,8 @@ public final class IndexCollectionMigration ext entityCacheMaxSize, targetFileSize, maxFileSize, - this.merger); + this.merger, + writeBufferSize); final File textindexcache = new File(indexPrimaryTextLocation, "RICACHE"); if (textindexcache.exists()) { // migrate the "index.dhtout.blob" into RICELL directory diff --git a/source/de/anomic/kelondro/text/ReferenceContainerCache.java b/source/de/anomic/kelondro/text/ReferenceContainerCache.java index 255328f37..a5a9f1acf 100644 --- a/source/de/anomic/kelondro/text/ReferenceContainerCache.java +++ b/source/de/anomic/kelondro/text/ReferenceContainerCache.java @@ -115,14 +115,14 @@ public final class ReferenceContainerCache exte Log.logInfo("indexContainerRAMHeap", "finished rwi blob restore: " + cache.size() + " words, " + urlCount + " word/URL relations in " + (System.currentTimeMillis() - start) + " milliseconds"); } - public void dump(final File heapFile) { + public void dump(final File heapFile, int writeBuffer) { assert this.cache != null; Log.logInfo("indexContainerRAMHeap", "creating rwi heap dump '" + heapFile.getName() + "', " + cache.size() + " rwi's"); if (heapFile.exists()) FileUtils.deletedelete(heapFile); File tmpFile = new File(heapFile.getParentFile(), heapFile.getName() + ".tmp"); HeapWriter dump; try { - dump = new HeapWriter(tmpFile, heapFile, payloadrow.primaryKeyLength, Base64Order.enhancedCoder); + dump = new HeapWriter(tmpFile, heapFile, payloadrow.primaryKeyLength, Base64Order.enhancedCoder, writeBuffer); } catch (IOException e1) { e1.printStackTrace(); return; diff --git a/source/de/anomic/plasma/plasmaWordIndex.java b/source/de/anomic/plasma/plasmaWordIndex.java index b82934427..2b2a77f55 100644 --- a/source/de/anomic/plasma/plasmaWordIndex.java +++ b/source/de/anomic/plasma/plasmaWordIndex.java @@ -75,6 +75,7 @@ public final class plasmaWordIndex { public static final int maxCollectionPartition = 7; // should be 7 public static final long targetFileSize = 100 * 1024 * 1024; // 100 MB public static final long maxFileSize = BLOBArray.oneGigabyte; // 1GB + public static final int writeBufferSize = 4 * 1024 * 1024; // the reference factory public static final ReferenceFactory wordReferenceFactory = new WordReferenceFactory(); @@ -152,7 +153,7 @@ public final class plasmaWordIndex { // check if the peer has migrated the index if (new File(indexPrimaryTextLocation, "RICOLLECTION").exists()) { - this.merger = new IODispatcher(plasmaWordIndex.wordReferenceFactory, 1, 1); + this.merger = new IODispatcher(plasmaWordIndex.wordReferenceFactory, 1, 1, writeBufferSize); if (this.merger != null) this.merger.start(); this.index = new IndexCollectionMigration( indexPrimaryTextLocation, @@ -163,9 +164,10 @@ public final class plasmaWordIndex { targetFileSize, maxFileSize, this.merger, + writeBufferSize, log); } else { - this.merger = new IODispatcher(plasmaWordIndex.wordReferenceFactory, 1, 1); + this.merger = new IODispatcher(plasmaWordIndex.wordReferenceFactory, 1, 1, writeBufferSize); this.merger.start(); this.index = new IndexCell( new File(indexPrimaryTextLocation, "RICELL"), @@ -175,7 +177,8 @@ public final class plasmaWordIndex { entityCacheMaxSize, targetFileSize, maxFileSize, - this.merger); + this.merger, + writeBufferSize); }