From 0f0b4aec75b47ddba97dfaca1717f764754bb77d Mon Sep 17 00:00:00 2001 From: orbiter Date: Mon, 30 Mar 2009 06:22:27 +0000 Subject: [PATCH] better index cell merge logic git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@5754 6c8d7289-2bf4-0310-a012-ef5d649a1542 --- ...ContainerMerger.java => IODispatcher.java} | 128 +++++++++++++----- source/de/anomic/kelondro/text/IndexCell.java | 31 +++-- .../text/ReferenceContainerArray.java | 7 +- source/de/anomic/plasma/plasmaWordIndex.java | 6 +- 4 files changed, 122 insertions(+), 50 deletions(-) rename source/de/anomic/kelondro/text/{ReferenceContainerMerger.java => IODispatcher.java} (68%) diff --git a/source/de/anomic/kelondro/text/ReferenceContainerMerger.java b/source/de/anomic/kelondro/text/IODispatcher.java similarity index 68% rename from source/de/anomic/kelondro/text/ReferenceContainerMerger.java rename to source/de/anomic/kelondro/text/IODispatcher.java index 585cdd296..122367cc4 100644 --- a/source/de/anomic/kelondro/text/ReferenceContainerMerger.java +++ b/source/de/anomic/kelondro/text/IODispatcher.java @@ -48,44 +48,78 @@ import de.anomic.kelondro.text.ReferenceContainerCache.blobFileEntries; * of merging with a call to the start() - method. To shut down all mergings, call terminate() * only once. */ -public class ReferenceContainerMerger extends Thread { +public class IODispatcher extends Thread { - private Job poison; - private ArrayBlockingQueue queue; - private ArrayBlockingQueue termi; + private final Boolean poison, vita; + private ArrayBlockingQueue controlQueue; + private ArrayBlockingQueue mergeQueue; + private ArrayBlockingQueue dumpQueue; + private ArrayBlockingQueue termQueue; - public ReferenceContainerMerger(int queueLength) { - this.poison = new Job(); - this.queue = new ArrayBlockingQueue(queueLength); - this.termi = new ArrayBlockingQueue(1); + public IODispatcher(int dumpQueueLength, int mergeQueueLength) { + this.poison = new Boolean(false); + this.vita = new Boolean(true); + this.controlQueue = new ArrayBlockingQueue(dumpQueueLength + mergeQueueLength + 1); + this.dumpQueue = new ArrayBlockingQueue(dumpQueueLength); + this.mergeQueue = new ArrayBlockingQueue(mergeQueueLength); + this.termQueue = new ArrayBlockingQueue(1); } public synchronized void terminate() { - if (queue == null || !this.isAlive()) return; - try { - queue.put(poison); - } catch (InterruptedException e) { - e.printStackTrace(); + if (termQueue != null && this.isAlive()) { + try { + controlQueue.put(poison); + } catch (InterruptedException e) { + e.printStackTrace(); + } + // await termination + try { + termQueue.take(); + } catch (InterruptedException e) { + e.printStackTrace(); + } } - // await termination - try { - termi.take(); - } catch (InterruptedException e) { - e.printStackTrace(); + } + + public synchronized void dump(ReferenceContainerCache cache, File file, ReferenceContainerArray array) { + if (dumpQueue == null || !this.isAlive()) { + try { + cache.dump(file, true); + } catch (IOException e) { + e.printStackTrace(); + } + } else { + DumpJob job = new DumpJob(cache, file, array); + try { + dumpQueue.put(job); + controlQueue.put(vita); + } catch (InterruptedException e) { + e.printStackTrace(); + try { + cache.dump(file, true); + } catch (IOException ee) { + e.printStackTrace(); + } + } } } + public synchronized int queueLength() { + return controlQueue.size(); + } + public synchronized void merge(File f1, File f2, BLOBArray array, Row payloadrow, File newFile) { - if (queue == null || !this.isAlive()) { + if (mergeQueue == null || !this.isAlive()) { try { mergeMount(f1, f2, array, payloadrow, newFile); } catch (IOException e) { e.printStackTrace(); } } else { - Job job = new Job(f1, f2, array, payloadrow, newFile); + MergeJob job = new MergeJob(f1, f2, array, payloadrow, newFile); try { - queue.put(job); + mergeQueue.put(job); + controlQueue.put(vita); } catch (InterruptedException e) { e.printStackTrace(); try { @@ -98,37 +132,61 @@ public class ReferenceContainerMerger extends Thread { } public void run() { - Job job; + MergeJob mergeJob; + DumpJob dumpJob; try { - while ((job = queue.take()) != poison) { - job.merge(); + loop: while (controlQueue.take() != poison) { + // prefer dump actions to flush memory to disc + if (dumpQueue.size() > 0) { + dumpJob = dumpQueue.take(); + dumpJob.dump(); + continue loop; + } + // otherwise do a merge operation + if (mergeQueue.size() > 0) { + mergeJob = mergeQueue.take(); + mergeJob.merge(); + continue loop; + } + assert false; // this should never happen } } catch (InterruptedException e) { e.printStackTrace(); } finally { try { - termi.put(poison); + termQueue.put(poison); } catch (InterruptedException e) { e.printStackTrace(); } } } - public class Job { + public class DumpJob { + ReferenceContainerCache cache; + File file; + ReferenceContainerArray array; + public DumpJob(ReferenceContainerCache cache, File file, ReferenceContainerArray array) { + this.cache = cache; + this.file = file; + this.array = array; + } + public void dump() { + try { + cache.dump(file, true); + array.mountBLOBContainer(file); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + + public class MergeJob { File f1, f2, newFile; BLOBArray array; Row payloadrow; - public Job() { - this.f1 = null; - this.f2 = null; - this.newFile = null; - this.array = null; - this.payloadrow = null; - } - - public Job(File f1, File f2, BLOBArray array, Row payloadrow, File newFile) { + public MergeJob(File f1, File f2, BLOBArray array, Row payloadrow, File newFile) { this.f1 = f1; this.f2 = f2; this.newFile = newFile; diff --git a/source/de/anomic/kelondro/text/IndexCell.java b/source/de/anomic/kelondro/text/IndexCell.java index bcc7b649b..b8fdc2e7a 100644 --- a/source/de/anomic/kelondro/text/IndexCell.java +++ b/source/de/anomic/kelondro/text/IndexCell.java @@ -51,10 +51,15 @@ import de.anomic.server.serverProfiling; public final class IndexCell extends AbstractBufferedIndex implements BufferedIndex { + private static final long cleanupCycle = 10000; + // class variables - private ReferenceContainerArray array; - private ReferenceContainerCache ram; - private int maxRamEntries, maxArrayFiles; + private final ReferenceContainerArray array; + private ReferenceContainerCache ram; + private int maxRamEntries, maxArrayFiles; + private final IODispatcher merger; + private final long lastCleanup; + public IndexCell( final File cellPath, @@ -62,13 +67,15 @@ public final class IndexCell extends AbstractBufferedIndex implements BufferedIn final Row payloadrow, final int maxRamEntries, final int maxArrayFiles, - ReferenceContainerMerger merger + IODispatcher merger ) throws IOException { this.array = new ReferenceContainerArray(cellPath, wordOrder, payloadrow, merger); this.ram = new ReferenceContainerCache(payloadrow, wordOrder); this.ram.initWriteMode(); this.maxRamEntries = maxRamEntries; this.maxArrayFiles = maxArrayFiles; + this.merger = merger; + this.lastCleanup = System.currentTimeMillis(); } @@ -85,12 +92,14 @@ public final class IndexCell extends AbstractBufferedIndex implements BufferedIn this.ram.add(newEntries); serverProfiling.update("wordcache", Long.valueOf(this.ram.size()), true); if (this.ram.size() > this.maxRamEntries) cacheDump(); + cacheCleanup(); } public synchronized void add(String hash, ReferenceRow entry) throws IOException { this.ram.add(hash, entry); serverProfiling.update("wordcache", Long.valueOf(this.ram.size()), true); if (this.ram.size() > this.maxRamEntries) cacheDump(); + cacheCleanup(); } /** @@ -262,15 +271,19 @@ public final class IndexCell extends AbstractBufferedIndex implements BufferedIn private synchronized void cacheDump() throws IOException { // dump the ram File dumpFile = this.array.newContainerBLOBFile(); - this.ram.dump(dumpFile, true); + //this.ram.dump(dumpFile, true); + //this.array.mountBLOBContainer(dumpFile); + merger.dump(this.ram, dumpFile, array); // get a fresh ram cache this.ram = new ReferenceContainerCache(this.array.rowdef(), this.array.ordering()); this.ram.initWriteMode(); - // add the dumped indexContainerBLOB to the array - this.array.mountBLOBContainer(dumpFile); + } + + private synchronized void cacheCleanup() throws IOException { + if (this.lastCleanup + cleanupCycle > System.currentTimeMillis()) return; int c = 0; - while (this.array.entries() > this.maxArrayFiles && c++ < 3) { - if (!this.array.merge(true)) break; + if (this.array.entries() > this.maxArrayFiles && c++ < 3) { + this.array.shrink(true); } } diff --git a/source/de/anomic/kelondro/text/ReferenceContainerArray.java b/source/de/anomic/kelondro/text/ReferenceContainerArray.java index cd1782e15..4dff5f9ea 100644 --- a/source/de/anomic/kelondro/text/ReferenceContainerArray.java +++ b/source/de/anomic/kelondro/text/ReferenceContainerArray.java @@ -41,7 +41,7 @@ public final class ReferenceContainerArray { private final Row payloadrow; private final BLOBArray array; - private final ReferenceContainerMerger merger; + private final IODispatcher merger; /** * open a index container based on a BLOB dump. The content of the BLOB will not be read @@ -57,7 +57,7 @@ public final class ReferenceContainerArray { final File heapLocation, final ByteOrder wordOrder, final Row payloadrow, - ReferenceContainerMerger merger) throws IOException { + IODispatcher merger) throws IOException { this.payloadrow = payloadrow; this.array = new BLOBArray( heapLocation, @@ -243,8 +243,9 @@ public final class ReferenceContainerArray { return this.array.entries(); } - public synchronized boolean merge(boolean similar) throws IOException { + public synchronized boolean shrink(boolean similar) throws IOException { if (this.array.entries() < 2) return false; + if (this.merger.queueLength() > 0) return false; File f1 = this.array.unmountOldestBLOB(similar); File f2 = (similar) ? this.array.unmountSimilarSizeBLOB(f1.length()) : this.array.unmountOldestBLOB(false); merger.merge(f1, f2, this.array, this.payloadrow, newContainerBLOBFile()); diff --git a/source/de/anomic/plasma/plasmaWordIndex.java b/source/de/anomic/plasma/plasmaWordIndex.java index e84bf353e..d0745fb2e 100644 --- a/source/de/anomic/plasma/plasmaWordIndex.java +++ b/source/de/anomic/plasma/plasmaWordIndex.java @@ -47,7 +47,7 @@ import de.anomic.kelondro.text.BufferedIndexCollection; import de.anomic.kelondro.text.IndexCell; import de.anomic.kelondro.text.MetadataRowContainer; import de.anomic.kelondro.text.ReferenceContainer; -import de.anomic.kelondro.text.ReferenceContainerMerger; +import de.anomic.kelondro.text.IODispatcher; import de.anomic.kelondro.text.ReferenceRow; import de.anomic.kelondro.text.MetadataRepository; import de.anomic.kelondro.text.Word; @@ -97,7 +97,7 @@ public final class plasmaWordIndex { public CrawlProfile.entry defaultTextSnippetLocalProfile, defaultTextSnippetGlobalProfile; public CrawlProfile.entry defaultMediaSnippetLocalProfile, defaultMediaSnippetGlobalProfile; private final File queuesRoot; - private ReferenceContainerMerger merger; + private IODispatcher merger; public plasmaWordIndex( final String networkName, @@ -132,7 +132,7 @@ public final class plasmaWordIndex { } } } - this.merger = (useCell) ? new ReferenceContainerMerger(1) : null; + this.merger = (useCell) ? new IODispatcher(1, 1) : null; if (this.merger != null) this.merger.start(); this.index = (useCell) ? new IndexCell(