From d882991bc570749afcb52bf768a8c4732beb38a1 Mon Sep 17 00:00:00 2001 From: reger Date: Mon, 25 May 2015 19:46:26 +0200 Subject: [PATCH] Implement sharing of ioDispatcher for term & citation index as proposed in ioDispatcher description --- .../net/yacy/kelondro/rwi/IODispatcher.java | 14 +++++------- source/net/yacy/kelondro/rwi/IndexCell.java | 9 ++++---- source/net/yacy/search/index/Segment.java | 22 +++++++++++++++++-- 3 files changed, 30 insertions(+), 15 deletions(-) diff --git a/source/net/yacy/kelondro/rwi/IODispatcher.java b/source/net/yacy/kelondro/rwi/IODispatcher.java index d8138b8a8..7313d6f8c 100644 --- a/source/net/yacy/kelondro/rwi/IODispatcher.java +++ b/source/net/yacy/kelondro/rwi/IODispatcher.java @@ -93,10 +93,10 @@ public class IODispatcher extends Thread { // check if the dispatcher is running if (isAlive()) { try { - this.dumpQueue.put(job); + this.dumpQueue.add(job); log.info("appended dump job for file " + file.getName()); - } catch (final InterruptedException e) { - ConcurrentLog.logException(e); + } catch (final IllegalStateException e) { + log.warn("could not append dump job, emergency dump of file " + file.getName()); cache.dump(file, (int) Math.min(MemoryControl.available() / 3, this.writeBufferSize), true); } finally { this.controlQueue.release(); @@ -130,8 +130,8 @@ public class IODispatcher extends Thread { } else { log.info("appended merge job of files " + f1.getName() + ", " + f2.getName() + " to " + newFile.getName()); } - } catch (final Exception e) { // because mergeQueue size is 1, IllegalStateException could happen frequently (serial execution ensured in run() ) - log.warn("Could not add merge job to queue: " + e.getMessage(), e); + } catch (final IllegalStateException e) { // because mergeQueue size is 1, IllegalStateException could happen frequently (serial execution ensured in run() ) + log.warn("Could not add merge job to queue: " + e.getMessage()); } finally { this.controlQueue.release(); } @@ -195,9 +195,7 @@ public class IODispatcher extends Thread { log.info("caught termination signal"); break; } - - log.severe("main loop in bad state, dumpQueue.size() = " + this.dumpQueue.size() + ", mergeQueue.size() = " + this.mergeQueue.size() + ", controlQueue.availablePermits() = " + this.controlQueue.availablePermits() + ", MemoryControl.shortStatus() = " + MemoryControl.shortStatus()); - assert false : "this process statt should not be reached"; // this should never happen + } catch (final Throwable e) { log.severe("main run job failed (X)", e); } diff --git a/source/net/yacy/kelondro/rwi/IndexCell.java b/source/net/yacy/kelondro/rwi/IndexCell.java index fde5f4b3a..e124cba2a 100644 --- a/source/net/yacy/kelondro/rwi/IndexCell.java +++ b/source/net/yacy/kelondro/rwi/IndexCell.java @@ -72,7 +72,7 @@ public final class IndexCell extends AbstractBu private ReferenceContainerCache ram; private final ComparableARC countCache; private int maxRamEntries; - private final IODispatcher merger; + private IODispatcher merger; // pointer to shared merger private long lastCleanup; private long lastDump; private final long targetFileSize, maxFileSize; @@ -90,16 +90,16 @@ public final class IndexCell extends AbstractBu final int maxRamEntries, final long targetFileSize, final long maxFileSize, - final int writeBufferSize + final int writeBufferSize, + final IODispatcher merger ) throws IOException { super(factory); - this.merger = new IODispatcher(2, 1, writeBufferSize); + this.merger = merger; this.array = new ReferenceContainerArray(cellPath, prefix, factory, termOrder, termSize); this.ram = new ReferenceContainerCache(factory, termOrder, termSize); this.countCache = new ComparableARC(1000, termOrder); this.maxRamEntries = maxRamEntries; - this.merger.start(); this.lastCleanup = System.currentTimeMillis(); this.lastDump = System.currentTimeMillis(); this.targetFileSize = targetFileSize; @@ -600,7 +600,6 @@ public final class IndexCell extends AbstractBu // close all this.flushShallRun = false; if (this.flushThread != null) try { this.flushThread.join(); } catch (final InterruptedException e) {} - this.merger.terminate(); this.ram.close(); this.array.close(); } diff --git a/source/net/yacy/search/index/Segment.java b/source/net/yacy/search/index/Segment.java index 200bb852d..2308af7e2 100644 --- a/source/net/yacy/search/index/Segment.java +++ b/source/net/yacy/search/index/Segment.java @@ -73,6 +73,7 @@ import net.yacy.kelondro.data.word.WordReference; import net.yacy.kelondro.data.word.WordReferenceFactory; import net.yacy.kelondro.data.word.WordReferenceRow; import net.yacy.kelondro.index.RowHandleSet; +import net.yacy.kelondro.rwi.IODispatcher; import net.yacy.kelondro.rwi.IndexCell; import net.yacy.kelondro.rwi.ReferenceContainer; import net.yacy.kelondro.rwi.ReferenceFactory; @@ -120,6 +121,7 @@ public class Segment { protected IndexCell termIndex; protected IndexCell urlCitationIndex; protected IndexTable firstSeenIndex; + protected IODispatcher merger = null; // shared iodispatcher for kelondro indexes /** * create a new Segment @@ -148,6 +150,11 @@ public class Segment { public void connectRWI(final int entityCacheMaxSize, final long maxFileSize) throws IOException { if (this.termIndex != null) return; + + if (this.merger == null) { // init shared iodispatcher if none running + this.merger = new IODispatcher(2, 2, writeBufferSize); + this.merger.start(); + } this.termIndex = new IndexCell( new File(this.segmentPath, "default"), termIndexName, @@ -157,7 +164,8 @@ public class Segment { entityCacheMaxSize, targetFileSize, maxFileSize, - writeBufferSize); + writeBufferSize, + merger); } public void disconnectRWI() { @@ -172,6 +180,11 @@ public class Segment { public void connectCitation(final int entityCacheMaxSize, final long maxFileSize) throws IOException { if (this.urlCitationIndex != null) return; + + if (this.merger == null) { // init shared iodispatcher if none running + this.merger = new IODispatcher(2,2,writeBufferSize); + this.merger.start(); + } this.urlCitationIndex = new IndexCell( new File(this.segmentPath, "default"), citationIndexName, @@ -181,7 +194,8 @@ public class Segment { entityCacheMaxSize, targetFileSize, maxFileSize, - writeBufferSize); + writeBufferSize, + merger); } public void disconnectCitation() { @@ -468,6 +482,10 @@ public class Segment { if (this.fulltext != null) this.fulltext.close(); if (this.urlCitationIndex != null) this.urlCitationIndex.close(); if (this.firstSeenIndex != null) this.firstSeenIndex.close(); + if (this.merger != null) { + this.merger.terminate(); + this.merger = null; + } } private static String votedLanguage(