diff --git a/source/de/anomic/kelondro/text/IODispatcher.java b/source/de/anomic/kelondro/text/IODispatcher.java index ac2dfba96..9b8bc8c8f 100644 --- a/source/de/anomic/kelondro/text/IODispatcher.java +++ b/source/de/anomic/kelondro/text/IODispatcher.java @@ -27,6 +27,7 @@ package de.anomic.kelondro.text; import java.io.File; import java.io.IOException; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Semaphore; import de.anomic.kelondro.blob.BLOBArray; import de.anomic.kelondro.index.Row; @@ -46,33 +47,29 @@ import de.anomic.kelondro.util.Log; */ public class IODispatcher extends Thread { - private final Boolean poison, vita; - private ArrayBlockingQueue controlQueue; + private Semaphore controlQueue; + private Semaphore termination; private ArrayBlockingQueue mergeQueue; private ArrayBlockingQueue dumpQueue; - private ArrayBlockingQueue termQueue; - ReferenceFactory factory; + private ReferenceFactory factory; + private boolean terminate; public IODispatcher(ReferenceFactory factory, int dumpQueueLength, int mergeQueueLength) { this.factory = factory; - this.poison = new Boolean(false); - this.vita = new Boolean(true); - this.controlQueue = new ArrayBlockingQueue(dumpQueueLength + mergeQueueLength + 1); + this.termination = new Semaphore(0); + this.controlQueue = new Semaphore(0); this.dumpQueue = new ArrayBlockingQueue(dumpQueueLength); this.mergeQueue = new ArrayBlockingQueue(mergeQueueLength); - this.termQueue = new ArrayBlockingQueue(1); + this.terminate = false; } public synchronized void terminate() { - if (termQueue != null && controlQueue != null && this.isAlive()) { - try { - controlQueue.put(poison); - } catch (InterruptedException e) { - e.printStackTrace(); - } + if (termination != null && controlQueue != null && this.isAlive()) { + this.terminate = true; + this.controlQueue.release(); // await termination try { - termQueue.take(); + termination.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } @@ -81,12 +78,14 @@ public class IODispatcher extends Thread { public synchronized void dump(ReferenceContainerCache cache, File file, ReferenceContainerArray array) { if (dumpQueue == null || controlQueue == null || !this.isAlive()) { + Log.logWarning("IODispatcher", "emergency dump of file " + file.getName()); cache.dump(file); } else { DumpJob job = new DumpJob(cache, file, array); try { - dumpQueue.put(job); - controlQueue.put(vita); + this.dumpQueue.put(job); + this.controlQueue.release(); + Log.logInfo("IODispatcher", "appended dump job for file " + file.getName()); } catch (InterruptedException e) { e.printStackTrace(); cache.dump(file); @@ -95,12 +94,13 @@ public class IODispatcher extends Thread { } public synchronized int queueLength() { - return (controlQueue == null) ? 0 : controlQueue.size(); + return (controlQueue == null) ? 0 : controlQueue.availablePermits(); } public synchronized void merge(File f1, File f2, BLOBArray array, Row payloadrow, File newFile) { if (mergeQueue == null || controlQueue == null || !this.isAlive()) { try { + Log.logWarning("IODispatcher", "emergency merge of files " + f1.getName() + ", " + f2.getName() + " to " + newFile.getName()); array.mergeMount(f1, f2, factory, payloadrow, newFile); } catch (IOException e) { e.printStackTrace(); @@ -108,8 +108,9 @@ public class IODispatcher extends Thread { } else { MergeJob job = new MergeJob(f1, f2, array, payloadrow, newFile); try { - mergeQueue.put(job); - controlQueue.put(vita); + this.mergeQueue.put(job); + this.controlQueue.release(); + Log.logInfo("IODispatcher", "appended merge job of files " + f1.getName() + ", " + f2.getName() + " to " + newFile.getName()); } catch (InterruptedException e) { e.printStackTrace(); try { @@ -125,7 +126,9 @@ public class IODispatcher extends Thread { MergeJob mergeJob; DumpJob dumpJob; try { - loop: while (controlQueue.take() != poison) { + loop: while (true) { + controlQueue.acquire(); + // prefer dump actions to flush memory to disc if (dumpQueue.size() > 0) { try { @@ -137,6 +140,7 @@ public class IODispatcher extends Thread { } continue loop; } + // otherwise do a merge operation if (mergeQueue.size() > 0) { try { @@ -148,19 +152,26 @@ public class IODispatcher extends Thread { } continue loop; } + + // check termination + if (this.terminate) { + Log.logInfo("IODispatcher", "catched termination signal"); + break; + } + + Log.logSevere("IODispatcher", "main loop in bad state, dumpQueue.size() = " + dumpQueue.size() + ", mergeQueue.size() = " + mergeQueue.size() + ", controlQueue.availablePermits() = " + controlQueue.availablePermits()); assert false; // this should never happen } + Log.logInfo("IODispatcher", "loop terminated"); } catch (InterruptedException e) { e.printStackTrace(); Log.logSevere("IODispatcher", "main run job was interrupted (3)", e); } finally { Log.logInfo("IODispatcher", "terminating run job"); controlQueue = null; - try { - termQueue.put(poison); - } catch (InterruptedException e) { - e.printStackTrace(); - } + dumpQueue = null; + mergeQueue = null; + termination.release(); } }