Implement sharing of ioDispatcher for term & citation index

as proposed in ioDispatcher description
pull/8/head
reger 10 years ago
parent 17e820cfd7
commit d882991bc5

@ -93,10 +93,10 @@ public class IODispatcher extends Thread {
// check if the dispatcher is running
if (isAlive()) {
try {
this.dumpQueue.put(job);
this.dumpQueue.add(job);
log.info("appended dump job for file " + file.getName());
} catch (final InterruptedException e) {
ConcurrentLog.logException(e);
} catch (final IllegalStateException e) {
log.warn("could not append dump job, emergency dump of file " + file.getName());
cache.dump(file, (int) Math.min(MemoryControl.available() / 3, this.writeBufferSize), true);
} finally {
this.controlQueue.release();
@ -130,8 +130,8 @@ public class IODispatcher extends Thread {
} else {
log.info("appended merge job of files " + f1.getName() + ", " + f2.getName() + " to " + newFile.getName());
}
} catch (final Exception e) { // because mergeQueue size is 1, IllegalStateException could happen frequently (serial execution ensured in run() )
log.warn("Could not add merge job to queue: " + e.getMessage(), e);
} catch (final IllegalStateException e) { // because mergeQueue size is 1, IllegalStateException could happen frequently (serial execution ensured in run() )
log.warn("Could not add merge job to queue: " + e.getMessage());
} finally {
this.controlQueue.release();
}
@ -195,9 +195,7 @@ public class IODispatcher extends Thread {
log.info("caught termination signal");
break;
}
log.severe("main loop in bad state, dumpQueue.size() = " + this.dumpQueue.size() + ", mergeQueue.size() = " + this.mergeQueue.size() + ", controlQueue.availablePermits() = " + this.controlQueue.availablePermits() + ", MemoryControl.shortStatus() = " + MemoryControl.shortStatus());
assert false : "this process statt should not be reached"; // this should never happen
} catch (final Throwable e) {
log.severe("main run job failed (X)", e);
}

@ -72,7 +72,7 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
private ReferenceContainerCache<ReferenceType> ram;
private final ComparableARC<byte[], Integer> countCache;
private int maxRamEntries;
private final IODispatcher merger;
private IODispatcher merger; // pointer to shared merger
private long lastCleanup;
private long lastDump;
private final long targetFileSize, maxFileSize;
@ -90,16 +90,16 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
final int maxRamEntries,
final long targetFileSize,
final long maxFileSize,
final int writeBufferSize
final int writeBufferSize,
final IODispatcher merger
) throws IOException {
super(factory);
this.merger = new IODispatcher(2, 1, writeBufferSize);
this.merger = merger;
this.array = new ReferenceContainerArray<ReferenceType>(cellPath, prefix, factory, termOrder, termSize);
this.ram = new ReferenceContainerCache<ReferenceType>(factory, termOrder, termSize);
this.countCache = new ComparableARC<byte[], Integer>(1000, termOrder);
this.maxRamEntries = maxRamEntries;
this.merger.start();
this.lastCleanup = System.currentTimeMillis();
this.lastDump = System.currentTimeMillis();
this.targetFileSize = targetFileSize;
@ -600,7 +600,6 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
// close all
this.flushShallRun = false;
if (this.flushThread != null) try { this.flushThread.join(); } catch (final InterruptedException e) {}
this.merger.terminate();
this.ram.close();
this.array.close();
}

@ -73,6 +73,7 @@ import net.yacy.kelondro.data.word.WordReference;
import net.yacy.kelondro.data.word.WordReferenceFactory;
import net.yacy.kelondro.data.word.WordReferenceRow;
import net.yacy.kelondro.index.RowHandleSet;
import net.yacy.kelondro.rwi.IODispatcher;
import net.yacy.kelondro.rwi.IndexCell;
import net.yacy.kelondro.rwi.ReferenceContainer;
import net.yacy.kelondro.rwi.ReferenceFactory;
@ -120,6 +121,7 @@ public class Segment {
protected IndexCell<WordReference> termIndex;
protected IndexCell<CitationReference> urlCitationIndex;
protected IndexTable firstSeenIndex;
protected IODispatcher merger = null; // shared iodispatcher for kelondro indexes
/**
* create a new Segment
@ -148,6 +150,11 @@ public class Segment {
public void connectRWI(final int entityCacheMaxSize, final long maxFileSize) throws IOException {
if (this.termIndex != null) return;
if (this.merger == null) { // init shared iodispatcher if none running
this.merger = new IODispatcher(2, 2, writeBufferSize);
this.merger.start();
}
this.termIndex = new IndexCell<WordReference>(
new File(this.segmentPath, "default"),
termIndexName,
@ -157,7 +164,8 @@ public class Segment {
entityCacheMaxSize,
targetFileSize,
maxFileSize,
writeBufferSize);
writeBufferSize,
merger);
}
public void disconnectRWI() {
@ -172,6 +180,11 @@ public class Segment {
public void connectCitation(final int entityCacheMaxSize, final long maxFileSize) throws IOException {
if (this.urlCitationIndex != null) return;
if (this.merger == null) { // init shared iodispatcher if none running
this.merger = new IODispatcher(2,2,writeBufferSize);
this.merger.start();
}
this.urlCitationIndex = new IndexCell<CitationReference>(
new File(this.segmentPath, "default"),
citationIndexName,
@ -181,7 +194,8 @@ public class Segment {
entityCacheMaxSize,
targetFileSize,
maxFileSize,
writeBufferSize);
writeBufferSize,
merger);
}
public void disconnectCitation() {
@ -468,6 +482,10 @@ public class Segment {
if (this.fulltext != null) this.fulltext.close();
if (this.urlCitationIndex != null) this.urlCitationIndex.close();
if (this.firstSeenIndex != null) this.firstSeenIndex.close();
if (this.merger != null) {
this.merger.terminate();
this.merger = null;
}
}
private static String votedLanguage(

Loading…
Cancel
Save