diff --git a/source/net/yacy/kelondro/rwi/IODispatcher.java b/source/net/yacy/kelondro/rwi/IODispatcher.java index f1c056b35..80eb33497 100644 --- a/source/net/yacy/kelondro/rwi/IODispatcher.java +++ b/source/net/yacy/kelondro/rwi/IODispatcher.java @@ -7,7 +7,7 @@ // $LastChangedBy$ // // LICENSE -// +// // This program is free software; you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation; either version 2 of the License, or @@ -54,9 +54,9 @@ public class IODispatcher extends Thread { private ArrayBlockingQueue> dumpQueue; //private ReferenceFactory factory; private boolean terminate; - private int writeBufferSize; - - public IODispatcher(int dumpQueueLength, int mergeQueueLength, int writeBufferSize) { + private final int writeBufferSize; + + public IODispatcher(final int dumpQueueLength, final int mergeQueueLength, final int writeBufferSize) { this.termination = new Semaphore(0); this.controlQueue = new Semaphore(0); this.dumpQueue = new ArrayBlockingQueue>(dumpQueueLength); @@ -64,121 +64,124 @@ public class IODispatcher extends Thread { this.writeBufferSize = writeBufferSize; this.terminate = false; } - + public void terminate() { - if (termination != null && controlQueue != null && this.isAlive()) { + if (this.termination != null && this.controlQueue != null && isAlive()) { this.terminate = true; this.controlQueue.release(); // await termination try { - termination.acquire(); - } catch (InterruptedException e) { + this.termination.acquire(); + } catch (final InterruptedException e) { Log.logException(e); } } } @SuppressWarnings("unchecked") - protected synchronized void dump(ReferenceContainerCache cache, File file, ReferenceContainerArray array) { - if (dumpQueue == null || controlQueue == null || !this.isAlive()) { + protected synchronized void dump(final ReferenceContainerCache cache, final File file, final ReferenceContainerArray array) { + if (this.dumpQueue == null || this.controlQueue == null || !isAlive()) { Log.logWarning("IODispatcher", "emergency dump of file " + file.getName()); - if (!cache.isEmpty()) cache.dump(file, (int) Math.min(MemoryControl.available() / 3, writeBufferSize), true); + if (!cache.isEmpty()) cache.dump(file, (int) Math.min(MemoryControl.available() / 3, this.writeBufferSize), true); } else { @SuppressWarnings("rawtypes") + final DumpJob job = new DumpJob(cache, file, array); - try { - // check if the dispatcher is running - if (this.isAlive()) { + // check if the dispatcher is running + if (isAlive()) { + try { this.dumpQueue.put(job); - this.controlQueue.release(); Log.logInfo("IODispatcher", "appended dump job for file " + file.getName()); - } else { - job.dump(); - Log.logWarning("IODispatcher", "dispatcher is not alive, just dumped file " + file.getName()); + } catch (final InterruptedException e) { + Log.logException(e); + cache.dump(file, (int) Math.min(MemoryControl.available() / 3, this.writeBufferSize), true); + } finally { + this.controlQueue.release(); } - } catch (InterruptedException e) { - Log.logException(e); - cache.dump(file, (int) Math.min(MemoryControl.available() / 3, writeBufferSize), true); + } else { + job.dump(); + Log.logWarning("IODispatcher", "dispatcher is not alive, just dumped file " + file.getName()); } } } - + protected synchronized int queueLength() { - return (controlQueue == null || !this.isAlive()) ? 0 : controlQueue.availablePermits(); + return (this.controlQueue == null || !isAlive()) ? 0 : this.controlQueue.availablePermits(); } - - protected synchronized void merge(File f1, File f2, ReferenceFactory factory, ArrayStack array, File newFile) { - if (mergeQueue == null || controlQueue == null || !this.isAlive()) { + + protected synchronized void merge(final File f1, final File f2, final ReferenceFactory factory, final ArrayStack array, final File newFile) { + if (this.mergeQueue == null || this.controlQueue == null || !isAlive()) { if (f2 == null) { Log.logWarning("IODispatcher", "emergency rewrite of file " + f1.getName() + " to " + newFile.getName()); } else { Log.logWarning("IODispatcher", "emergency merge of files " + f1.getName() + ", " + f2.getName() + " to " + newFile.getName()); } - array.mergeMount(f1, f2, factory, newFile, (int) Math.min(MemoryControl.available() / 3, writeBufferSize)); + array.mergeMount(f1, f2, factory, newFile, (int) Math.min(MemoryControl.available() / 3, this.writeBufferSize)); } else { - MergeJob job = new MergeJob(f1, f2, factory, array, newFile); - try { - if (this.isAlive()) { + final MergeJob job = new MergeJob(f1, f2, factory, array, newFile); + if (isAlive()) { + try { this.mergeQueue.put(job); - this.controlQueue.release(); if (f2 == null) { Log.logInfo("IODispatcher", "appended rewrite job of file " + f1.getName() + " to " + newFile.getName()); } else { Log.logInfo("IODispatcher", "appended merge job of files " + f1.getName() + ", " + f2.getName() + " to " + newFile.getName()); } + } catch (final InterruptedException e) { + Log.logWarning("IODispatcher", "interrupted: " + e.getMessage(), e); + array.mergeMount(f1, f2, factory, newFile, (int) Math.min(MemoryControl.available() / 3, this.writeBufferSize)); + } finally { + this.controlQueue.release(); + } + } else { + job.merge(); + if (f2 == null) { + Log.logWarning("IODispatcher", "dispatcher not running, merged files " + f1.getName() + " to " + newFile.getName()); } else { - job.merge(); - if (f2 == null) { - Log.logWarning("IODispatcher", "dispatcher not running, merged files " + f1.getName() + " to " + newFile.getName()); - } else { - Log.logWarning("IODispatcher", "dispatcher not running, rewrote file " + f1.getName() + ", " + f2.getName() + " to " + newFile.getName()); - } + Log.logWarning("IODispatcher", "dispatcher not running, rewrote file " + f1.getName() + ", " + f2.getName() + " to " + newFile.getName()); } - } catch (InterruptedException e) { - Log.logWarning("IODispatcher", "interrupted: " + e.getMessage(), e); - array.mergeMount(f1, f2, factory, newFile, (int) Math.min(MemoryControl.available() / 3, writeBufferSize)); } } } - + @Override public void run() { MergeJob mergeJob; DumpJob dumpJob; try { loop: while (true) try { - controlQueue.acquire(); - + this.controlQueue.acquire(); + // prefer dump actions to flush memory to disc - if (!dumpQueue.isEmpty()) { + if (!this.dumpQueue.isEmpty()) { File f = null; try { - dumpJob = dumpQueue.take(); + dumpJob = this.dumpQueue.take(); f = dumpJob.file; dumpJob.dump(); - } catch (InterruptedException e) { + } catch (final InterruptedException e) { Log.logSevere("IODispatcher", "main run job was interrupted (1)", e); Log.logException(e); - } catch (Exception e) { + } catch (final Exception e) { Log.logSevere("IODispatcher", "main run job had errors (1), dump to " + f + " failed.", e); Log.logException(e); } continue loop; } - + // otherwise do a merge operation - if (!mergeQueue.isEmpty()) { + if (!this.mergeQueue.isEmpty()) { File f = null, f1 = null, f2 = null; try { - mergeJob = mergeQueue.take(); + mergeJob = this.mergeQueue.take(); f = mergeJob.newFile; f1 = mergeJob.f1; f2 = mergeJob.f2; mergeJob.merge(); - } catch (InterruptedException e) { + } catch (final InterruptedException e) { Log.logSevere("IODispatcher", "main run job was interrupted (2)", e); Log.logException(e); - } catch (Exception e) { + } catch (final Exception e) { if (f2 == null) { Log.logSevere("IODispatcher", "main run job had errors (2), dump to " + f + " failed. Input file is " + f1, e); } else { @@ -188,63 +191,63 @@ public class IODispatcher extends Thread { } continue loop; } - + // check termination if (this.terminate) { Log.logInfo("IODispatcher", "caught termination signal"); break; } - Log.logSevere("IODispatcher", "main loop in bad state, dumpQueue.size() = " + dumpQueue.size() + ", mergeQueue.size() = " + mergeQueue.size() + ", controlQueue.availablePermits() = " + controlQueue.availablePermits()); + Log.logSevere("IODispatcher", "main loop in bad state, dumpQueue.size() = " + this.dumpQueue.size() + ", mergeQueue.size() = " + this.mergeQueue.size() + ", controlQueue.availablePermits() = " + this.controlQueue.availablePermits()); assert false : "this process statt should not be reached"; // this should never happen - } catch (Exception e) { + } catch (final Exception e) { Log.logSevere("IODispatcher", "main run job failed (X)", e); Log.logException(e); } Log.logInfo("IODispatcher", "loop terminated"); - } catch (Exception e) { + } catch (final Exception e) { Log.logSevere("IODispatcher", "main run job failed (4)", e); Log.logException(e); } finally { Log.logInfo("IODispatcher", "terminating run job"); - controlQueue = null; - dumpQueue = null; - mergeQueue = null; - termination.release(); + this.controlQueue = null; + this.dumpQueue = null; + this.mergeQueue = null; + this.termination.release(); } } - + private class DumpJob { - private ReferenceContainerCache cache; - private File file; - private ReferenceContainerArray array; - private DumpJob(ReferenceContainerCache cache, File file, ReferenceContainerArray array) { + private final ReferenceContainerCache cache; + private final File file; + private final ReferenceContainerArray array; + private DumpJob(final ReferenceContainerCache cache, final File file, final ReferenceContainerArray array) { this.cache = cache; this.file = file; this.array = array; } private void dump() { try { - if (!cache.isEmpty()) cache.dump(file, (int) Math.min(MemoryControl.available() / 3, writeBufferSize), true); - array.mountBLOBFile(file); - } catch (IOException e) { + if (!this.cache.isEmpty()) this.cache.dump(this.file, (int) Math.min(MemoryControl.available() / 3, IODispatcher.this.writeBufferSize), true); + this.array.mountBLOBFile(this.file); + } catch (final IOException e) { Log.logException(e); } } } - + private class MergeJob { - private File f1, f2, newFile; - private ArrayStack array; - private ReferenceFactory factory; - + private final File f1, f2, newFile; + private final ArrayStack array; + private final ReferenceFactory factory; + private MergeJob( - File f1, - File f2, - ReferenceFactory factory, - ArrayStack array, - File newFile) { + final File f1, + final File f2, + final ReferenceFactory factory, + final ArrayStack array, + final File newFile) { this.f1 = f1; this.f2 = f2; this.factory = factory; @@ -253,15 +256,15 @@ public class IODispatcher extends Thread { } private File merge() { - if (!f1.exists()) { - Log.logWarning("IODispatcher", "merge of file (1) " + f1.getName() + " failed: file does not exists"); + if (!this.f1.exists()) { + Log.logWarning("IODispatcher", "merge of file (1) " + this.f1.getName() + " failed: file does not exists"); return null; } - if (f2 != null && !f2.exists()) { - Log.logWarning("IODispatcher", "merge of file (2) " + f2.getName() + " failed: file does not exists"); + if (this.f2 != null && !this.f2.exists()) { + Log.logWarning("IODispatcher", "merge of file (2) " + this.f2.getName() + " failed: file does not exists"); return null; } - return array.mergeMount(f1, f2, factory, newFile, (int) Math.min(MemoryControl.available() / 3, writeBufferSize)); + return this.array.mergeMount(this.f1, this.f2, this.factory, this.newFile, (int) Math.min(MemoryControl.available() / 3, IODispatcher.this.writeBufferSize)); } }