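Better scaling of the HEAP dump writer (HeapWriter) for small memory configurations.

The write buffer is no longer a fixed 8 MB; it is now passed in by the caller and limited to the smaller of the configured writeBufferSize (4 MB) and one third of the currently available memory. This should prevent OutOfMemoryErrors (OOMs) during cache dumps.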

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@5920 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 16 years ago
parent 6e0b57284d
commit 21fbca0410

@ -559,16 +559,16 @@ public class BLOBArray implements BLOB {
blobs = null; blobs = null;
} }
public File mergeMount(File f1, File f2, ReferenceFactory<?> factory, Row payloadrow, File newFile) throws IOException { public File mergeMount(File f1, File f2, ReferenceFactory<?> factory, Row payloadrow, File newFile, int writeBuffer) throws IOException {
Log.logInfo("BLOBArray", "merging " + f1.getName() + " with " + f2.getName()); Log.logInfo("BLOBArray", "merging " + f1.getName() + " with " + f2.getName());
File resultFile = mergeWorker(factory, this.keylength, this.ordering, f1, f2, payloadrow, newFile); File resultFile = mergeWorker(factory, this.keylength, this.ordering, f1, f2, payloadrow, newFile, writeBuffer);
if (resultFile == null) return null; if (resultFile == null) return null;
mountBLOB(resultFile, false); mountBLOB(resultFile, false);
Log.logInfo("BLOBArray", "merged " + f1.getName() + " with " + f2.getName() + " into " + resultFile); Log.logInfo("BLOBArray", "merged " + f1.getName() + " with " + f2.getName() + " into " + resultFile);
return resultFile; return resultFile;
} }
private static <ReferenceType extends Reference> File mergeWorker(ReferenceFactory<ReferenceType> factory, int keylength, ByteOrder order, File f1, File f2, Row payloadrow, File newFile) throws IOException { private static <ReferenceType extends Reference> File mergeWorker(ReferenceFactory<ReferenceType> factory, int keylength, ByteOrder order, File f1, File f2, Row payloadrow, File newFile, int writeBuffer) throws IOException {
// iterate both files and write a new one // iterate both files and write a new one
CloneableIterator<ReferenceContainer<ReferenceType>> i1 = new blobFileEntries<ReferenceType>(f1, factory, payloadrow); CloneableIterator<ReferenceContainer<ReferenceType>> i1 = new blobFileEntries<ReferenceType>(f1, factory, payloadrow);
@ -591,7 +591,7 @@ public class BLOBArray implements BLOB {
assert i1.hasNext(); assert i1.hasNext();
assert i2.hasNext(); assert i2.hasNext();
File tmpFile = new File(newFile.getParentFile(), newFile.getName() + ".tmp"); File tmpFile = new File(newFile.getParentFile(), newFile.getName() + ".tmp");
HeapWriter writer = new HeapWriter(tmpFile, newFile, keylength, order); HeapWriter writer = new HeapWriter(tmpFile, newFile, keylength, order, writeBuffer);
merge(i1, i2, order, writer); merge(i1, i2, order, writer);
try { try {
writer.close(true); writer.close(true);

@ -72,12 +72,12 @@ public final class HeapWriter {
* @param ordering * @param ordering
* @throws IOException * @throws IOException
*/ */
public HeapWriter(final File temporaryHeapFile, final File readyHeapFile, final int keylength, final ByteOrder ordering) throws IOException { public HeapWriter(final File temporaryHeapFile, final File readyHeapFile, final int keylength, final ByteOrder ordering, int outBuffer) throws IOException {
this.heapFileTMP = temporaryHeapFile; this.heapFileTMP = temporaryHeapFile;
this.heapFileREADY = readyHeapFile; this.heapFileREADY = readyHeapFile;
this.keylength = keylength; this.keylength = keylength;
this.index = new LongHandleIndex(keylength, ordering, 10, 100000); this.index = new LongHandleIndex(keylength, ordering, 10, 100000);
this.os = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(temporaryHeapFile), 8 * 1024 * 1024)); this.os = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(temporaryHeapFile), outBuffer));
//this.doublecheck = new HashSet<String>(); //this.doublecheck = new HashSet<String>();
this.seek = 0; this.seek = 0;
} }

@ -32,6 +32,7 @@ import java.util.concurrent.Semaphore;
import de.anomic.kelondro.blob.BLOBArray; import de.anomic.kelondro.blob.BLOBArray;
import de.anomic.kelondro.index.Row; import de.anomic.kelondro.index.Row;
import de.anomic.kelondro.util.Log; import de.anomic.kelondro.util.Log;
import de.anomic.kelondro.util.MemoryControl;
/** /**
* this is a concurrent merger that can merge single files that are queued for merging. * this is a concurrent merger that can merge single files that are queued for merging.
@ -53,13 +54,15 @@ public class IODispatcher <ReferenceType extends Reference> extends Thread {
private ArrayBlockingQueue<DumpJob> dumpQueue; private ArrayBlockingQueue<DumpJob> dumpQueue;
private ReferenceFactory<ReferenceType> factory; private ReferenceFactory<ReferenceType> factory;
private boolean terminate; private boolean terminate;
private int writeBufferSize;
public IODispatcher(ReferenceFactory<ReferenceType> factory, int dumpQueueLength, int mergeQueueLength) { public IODispatcher(ReferenceFactory<ReferenceType> factory, int dumpQueueLength, int mergeQueueLength, int writeBufferSize) {
this.factory = factory; this.factory = factory;
this.termination = new Semaphore(0); this.termination = new Semaphore(0);
this.controlQueue = new Semaphore(0); this.controlQueue = new Semaphore(0);
this.dumpQueue = new ArrayBlockingQueue<DumpJob>(dumpQueueLength); this.dumpQueue = new ArrayBlockingQueue<DumpJob>(dumpQueueLength);
this.mergeQueue = new ArrayBlockingQueue<MergeJob>(mergeQueueLength); this.mergeQueue = new ArrayBlockingQueue<MergeJob>(mergeQueueLength);
this.writeBufferSize = writeBufferSize;
this.terminate = false; this.terminate = false;
} }
@ -79,7 +82,7 @@ public class IODispatcher <ReferenceType extends Reference> extends Thread {
public synchronized void dump(ReferenceContainerCache<ReferenceType> cache, File file, ReferenceContainerArray<ReferenceType> array) { public synchronized void dump(ReferenceContainerCache<ReferenceType> cache, File file, ReferenceContainerArray<ReferenceType> array) {
if (dumpQueue == null || controlQueue == null || !this.isAlive()) { if (dumpQueue == null || controlQueue == null || !this.isAlive()) {
Log.logWarning("IODispatcher", "emergency dump of file " + file.getName()); Log.logWarning("IODispatcher", "emergency dump of file " + file.getName());
cache.dump(file); cache.dump(file, (int) Math.min(MemoryControl.available() / 3, writeBufferSize));
} else { } else {
DumpJob job = new DumpJob(cache, file, array); DumpJob job = new DumpJob(cache, file, array);
try { try {
@ -88,7 +91,7 @@ public class IODispatcher <ReferenceType extends Reference> extends Thread {
Log.logInfo("IODispatcher", "appended dump job for file " + file.getName()); Log.logInfo("IODispatcher", "appended dump job for file " + file.getName());
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
cache.dump(file); cache.dump(file, (int) Math.min(MemoryControl.available() / 3, writeBufferSize));
} }
} }
} }
@ -101,7 +104,7 @@ public class IODispatcher <ReferenceType extends Reference> extends Thread {
if (mergeQueue == null || controlQueue == null || !this.isAlive()) { if (mergeQueue == null || controlQueue == null || !this.isAlive()) {
try { try {
Log.logWarning("IODispatcher", "emergency merge of files " + f1.getName() + ", " + f2.getName() + " to " + newFile.getName()); Log.logWarning("IODispatcher", "emergency merge of files " + f1.getName() + ", " + f2.getName() + " to " + newFile.getName());
array.mergeMount(f1, f2, factory, payloadrow, newFile); array.mergeMount(f1, f2, factory, payloadrow, newFile, (int) Math.min(MemoryControl.available() / 3, writeBufferSize));
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();
} }
@ -114,7 +117,7 @@ public class IODispatcher <ReferenceType extends Reference> extends Thread {
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
try { try {
array.mergeMount(f1, f2, factory, payloadrow, newFile); array.mergeMount(f1, f2, factory, payloadrow, newFile, (int) Math.min(MemoryControl.available() / 3, writeBufferSize));
} catch (IOException ee) { } catch (IOException ee) {
ee.printStackTrace(); ee.printStackTrace();
} }
@ -189,7 +192,7 @@ public class IODispatcher <ReferenceType extends Reference> extends Thread {
} }
public void dump() { public void dump() {
try { try {
cache.dump(file); cache.dump(file, (int) Math.min(MemoryControl.available() / 3, writeBufferSize));
array.mountBLOBFile(file); array.mountBLOBFile(file);
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();
@ -221,7 +224,7 @@ public class IODispatcher <ReferenceType extends Reference> extends Thread {
return null; return null;
} }
try { try {
return array.mergeMount(f1, f2, factory, payloadrow, newFile); return array.mergeMount(f1, f2, factory, payloadrow, newFile, (int) Math.min(MemoryControl.available() / 3, writeBufferSize));
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();
} }

@ -61,6 +61,7 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
private final IODispatcher<ReferenceType> merger; private final IODispatcher<ReferenceType> merger;
private long lastCleanup; private long lastCleanup;
private final long targetFileSize, maxFileSize; private final long targetFileSize, maxFileSize;
private final int writeBufferSize;
public IndexCell( public IndexCell(
@ -71,7 +72,8 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
final int maxRamEntries, final int maxRamEntries,
final long targetFileSize, final long targetFileSize,
final long maxFileSize, final long maxFileSize,
IODispatcher<ReferenceType> merger IODispatcher<ReferenceType> merger,
int writeBufferSize
) throws IOException { ) throws IOException {
super(factory); super(factory);
@ -83,6 +85,7 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
this.lastCleanup = System.currentTimeMillis(); this.lastCleanup = System.currentTimeMillis();
this.targetFileSize = targetFileSize; this.targetFileSize = targetFileSize;
this.maxFileSize = maxFileSize; this.maxFileSize = maxFileSize;
this.writeBufferSize = writeBufferSize;
cleanCache(); cleanCache();
} }
@ -251,7 +254,7 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
* and is composed of the current date and the cell salt * and is composed of the current date and the cell salt
*/ */
public synchronized void close() { public synchronized void close() {
this.ram.dump(this.array.newContainerBLOBFile()); this.ram.dump(this.array.newContainerBLOBFile(), (int) Math.min(MemoryControl.available() / 3, writeBufferSize));
// close all // close all
this.ram.close(); this.ram.close();
this.array.close(); this.array.close();

@ -60,6 +60,7 @@ public final class IndexCollectionMigration<ReferenceType extends Reference> ext
final long targetFileSize, final long targetFileSize,
final long maxFileSize, final long maxFileSize,
final IODispatcher<ReferenceType> merger, final IODispatcher<ReferenceType> merger,
final int writeBufferSize,
final Log log) throws IOException { final Log log) throws IOException {
super(factory); super(factory);
@ -73,7 +74,8 @@ public final class IndexCollectionMigration<ReferenceType extends Reference> ext
entityCacheMaxSize, entityCacheMaxSize,
targetFileSize, targetFileSize,
maxFileSize, maxFileSize,
this.merger); this.merger,
writeBufferSize);
final File textindexcache = new File(indexPrimaryTextLocation, "RICACHE"); final File textindexcache = new File(indexPrimaryTextLocation, "RICACHE");
if (textindexcache.exists()) { if (textindexcache.exists()) {
// migrate the "index.dhtout.blob" into RICELL directory // migrate the "index.dhtout.blob" into RICELL directory

@ -115,14 +115,14 @@ public final class ReferenceContainerCache<ReferenceType extends Reference> exte
Log.logInfo("indexContainerRAMHeap", "finished rwi blob restore: " + cache.size() + " words, " + urlCount + " word/URL relations in " + (System.currentTimeMillis() - start) + " milliseconds"); Log.logInfo("indexContainerRAMHeap", "finished rwi blob restore: " + cache.size() + " words, " + urlCount + " word/URL relations in " + (System.currentTimeMillis() - start) + " milliseconds");
} }
public void dump(final File heapFile) { public void dump(final File heapFile, int writeBuffer) {
assert this.cache != null; assert this.cache != null;
Log.logInfo("indexContainerRAMHeap", "creating rwi heap dump '" + heapFile.getName() + "', " + cache.size() + " rwi's"); Log.logInfo("indexContainerRAMHeap", "creating rwi heap dump '" + heapFile.getName() + "', " + cache.size() + " rwi's");
if (heapFile.exists()) FileUtils.deletedelete(heapFile); if (heapFile.exists()) FileUtils.deletedelete(heapFile);
File tmpFile = new File(heapFile.getParentFile(), heapFile.getName() + ".tmp"); File tmpFile = new File(heapFile.getParentFile(), heapFile.getName() + ".tmp");
HeapWriter dump; HeapWriter dump;
try { try {
dump = new HeapWriter(tmpFile, heapFile, payloadrow.primaryKeyLength, Base64Order.enhancedCoder); dump = new HeapWriter(tmpFile, heapFile, payloadrow.primaryKeyLength, Base64Order.enhancedCoder, writeBuffer);
} catch (IOException e1) { } catch (IOException e1) {
e1.printStackTrace(); e1.printStackTrace();
return; return;

@ -75,6 +75,7 @@ public final class plasmaWordIndex {
public static final int maxCollectionPartition = 7; // should be 7 public static final int maxCollectionPartition = 7; // should be 7
public static final long targetFileSize = 100 * 1024 * 1024; // 100 MB public static final long targetFileSize = 100 * 1024 * 1024; // 100 MB
public static final long maxFileSize = BLOBArray.oneGigabyte; // 1GB public static final long maxFileSize = BLOBArray.oneGigabyte; // 1GB
public static final int writeBufferSize = 4 * 1024 * 1024;
// the reference factory // the reference factory
public static final ReferenceFactory<WordReference> wordReferenceFactory = new WordReferenceFactory(); public static final ReferenceFactory<WordReference> wordReferenceFactory = new WordReferenceFactory();
@ -152,7 +153,7 @@ public final class plasmaWordIndex {
// check if the peer has migrated the index // check if the peer has migrated the index
if (new File(indexPrimaryTextLocation, "RICOLLECTION").exists()) { if (new File(indexPrimaryTextLocation, "RICOLLECTION").exists()) {
this.merger = new IODispatcher<WordReference>(plasmaWordIndex.wordReferenceFactory, 1, 1); this.merger = new IODispatcher<WordReference>(plasmaWordIndex.wordReferenceFactory, 1, 1, writeBufferSize);
if (this.merger != null) this.merger.start(); if (this.merger != null) this.merger.start();
this.index = new IndexCollectionMigration<WordReference>( this.index = new IndexCollectionMigration<WordReference>(
indexPrimaryTextLocation, indexPrimaryTextLocation,
@ -163,9 +164,10 @@ public final class plasmaWordIndex {
targetFileSize, targetFileSize,
maxFileSize, maxFileSize,
this.merger, this.merger,
writeBufferSize,
log); log);
} else { } else {
this.merger = new IODispatcher<WordReference>(plasmaWordIndex.wordReferenceFactory, 1, 1); this.merger = new IODispatcher<WordReference>(plasmaWordIndex.wordReferenceFactory, 1, 1, writeBufferSize);
this.merger.start(); this.merger.start();
this.index = new IndexCell<WordReference>( this.index = new IndexCell<WordReference>(
new File(indexPrimaryTextLocation, "RICELL"), new File(indexPrimaryTextLocation, "RICELL"),
@ -175,7 +177,8 @@ public final class plasmaWordIndex {
entityCacheMaxSize, entityCacheMaxSize,
targetFileSize, targetFileSize,
maxFileSize, maxFileSize,
this.merger); this.merger,
writeBufferSize);
} }

Loading…
Cancel
Save