fix for IODispatcher

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@5896 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 16 years ago
parent f678472f46
commit 8d6212233b

@ -27,6 +27,7 @@ package de.anomic.kelondro.text;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Semaphore;
import de.anomic.kelondro.blob.BLOBArray; import de.anomic.kelondro.blob.BLOBArray;
import de.anomic.kelondro.index.Row; import de.anomic.kelondro.index.Row;
@ -46,33 +47,29 @@ import de.anomic.kelondro.util.Log;
*/ */
public class IODispatcher <ReferenceType extends Reference> extends Thread { public class IODispatcher <ReferenceType extends Reference> extends Thread {
private final Boolean poison, vita; private Semaphore controlQueue;
private ArrayBlockingQueue<Boolean> controlQueue; private Semaphore termination;
private ArrayBlockingQueue<MergeJob> mergeQueue; private ArrayBlockingQueue<MergeJob> mergeQueue;
private ArrayBlockingQueue<DumpJob> dumpQueue; private ArrayBlockingQueue<DumpJob> dumpQueue;
private ArrayBlockingQueue<Boolean> termQueue; private ReferenceFactory<ReferenceType> factory;
ReferenceFactory<ReferenceType> factory; private boolean terminate;
public IODispatcher(ReferenceFactory<ReferenceType> factory, int dumpQueueLength, int mergeQueueLength) { public IODispatcher(ReferenceFactory<ReferenceType> factory, int dumpQueueLength, int mergeQueueLength) {
this.factory = factory; this.factory = factory;
this.poison = new Boolean(false); this.termination = new Semaphore(0);
this.vita = new Boolean(true); this.controlQueue = new Semaphore(0);
this.controlQueue = new ArrayBlockingQueue<Boolean>(dumpQueueLength + mergeQueueLength + 1);
this.dumpQueue = new ArrayBlockingQueue<DumpJob>(dumpQueueLength); this.dumpQueue = new ArrayBlockingQueue<DumpJob>(dumpQueueLength);
this.mergeQueue = new ArrayBlockingQueue<MergeJob>(mergeQueueLength); this.mergeQueue = new ArrayBlockingQueue<MergeJob>(mergeQueueLength);
this.termQueue = new ArrayBlockingQueue<Boolean>(1); this.terminate = false;
} }
public synchronized void terminate() { public synchronized void terminate() {
if (termQueue != null && controlQueue != null && this.isAlive()) { if (termination != null && controlQueue != null && this.isAlive()) {
try { this.terminate = true;
controlQueue.put(poison); this.controlQueue.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
// await termination // await termination
try { try {
termQueue.take(); termination.acquire();
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
} }
@ -81,12 +78,14 @@ public class IODispatcher <ReferenceType extends Reference> extends Thread {
public synchronized void dump(ReferenceContainerCache<ReferenceType> cache, File file, ReferenceContainerArray<ReferenceType> array) { public synchronized void dump(ReferenceContainerCache<ReferenceType> cache, File file, ReferenceContainerArray<ReferenceType> array) {
if (dumpQueue == null || controlQueue == null || !this.isAlive()) { if (dumpQueue == null || controlQueue == null || !this.isAlive()) {
Log.logWarning("IODispatcher", "emergency dump of file " + file.getName());
cache.dump(file); cache.dump(file);
} else { } else {
DumpJob job = new DumpJob(cache, file, array); DumpJob job = new DumpJob(cache, file, array);
try { try {
dumpQueue.put(job); this.dumpQueue.put(job);
controlQueue.put(vita); this.controlQueue.release();
Log.logInfo("IODispatcher", "appended dump job for file " + file.getName());
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
cache.dump(file); cache.dump(file);
@ -95,12 +94,13 @@ public class IODispatcher <ReferenceType extends Reference> extends Thread {
} }
public synchronized int queueLength() { public synchronized int queueLength() {
return (controlQueue == null) ? 0 : controlQueue.size(); return (controlQueue == null) ? 0 : controlQueue.availablePermits();
} }
public synchronized void merge(File f1, File f2, BLOBArray array, Row payloadrow, File newFile) { public synchronized void merge(File f1, File f2, BLOBArray array, Row payloadrow, File newFile) {
if (mergeQueue == null || controlQueue == null || !this.isAlive()) { if (mergeQueue == null || controlQueue == null || !this.isAlive()) {
try { try {
Log.logWarning("IODispatcher", "emergency merge of files " + f1.getName() + ", " + f2.getName() + " to " + newFile.getName());
array.mergeMount(f1, f2, factory, payloadrow, newFile); array.mergeMount(f1, f2, factory, payloadrow, newFile);
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();
@ -108,8 +108,9 @@ public class IODispatcher <ReferenceType extends Reference> extends Thread {
} else { } else {
MergeJob job = new MergeJob(f1, f2, array, payloadrow, newFile); MergeJob job = new MergeJob(f1, f2, array, payloadrow, newFile);
try { try {
mergeQueue.put(job); this.mergeQueue.put(job);
controlQueue.put(vita); this.controlQueue.release();
Log.logInfo("IODispatcher", "appended merge job of files " + f1.getName() + ", " + f2.getName() + " to " + newFile.getName());
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
try { try {
@ -125,7 +126,9 @@ public class IODispatcher <ReferenceType extends Reference> extends Thread {
MergeJob mergeJob; MergeJob mergeJob;
DumpJob dumpJob; DumpJob dumpJob;
try { try {
loop: while (controlQueue.take() != poison) { loop: while (true) {
controlQueue.acquire();
// prefer dump actions to flush memory to disc // prefer dump actions to flush memory to disc
if (dumpQueue.size() > 0) { if (dumpQueue.size() > 0) {
try { try {
@ -137,6 +140,7 @@ public class IODispatcher <ReferenceType extends Reference> extends Thread {
} }
continue loop; continue loop;
} }
// otherwise do a merge operation // otherwise do a merge operation
if (mergeQueue.size() > 0) { if (mergeQueue.size() > 0) {
try { try {
@ -148,19 +152,26 @@ public class IODispatcher <ReferenceType extends Reference> extends Thread {
} }
continue loop; continue loop;
} }
// check termination
if (this.terminate) {
Log.logInfo("IODispatcher", "catched termination signal");
break;
}
Log.logSevere("IODispatcher", "main loop in bad state, dumpQueue.size() = " + dumpQueue.size() + ", mergeQueue.size() = " + mergeQueue.size() + ", controlQueue.availablePermits() = " + controlQueue.availablePermits());
assert false; // this should never happen assert false; // this should never happen
} }
Log.logInfo("IODispatcher", "loop terminated");
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
Log.logSevere("IODispatcher", "main run job was interrupted (3)", e); Log.logSevere("IODispatcher", "main run job was interrupted (3)", e);
} finally { } finally {
Log.logInfo("IODispatcher", "terminating run job"); Log.logInfo("IODispatcher", "terminating run job");
controlQueue = null; controlQueue = null;
try { dumpQueue = null;
termQueue.put(poison); mergeQueue = null;
} catch (InterruptedException e) { termination.release();
e.printStackTrace();
}
} }
} }

Loading…
Cancel
Save