|
|
@ -54,9 +54,9 @@ public class IODispatcher extends Thread {
|
|
|
|
private ArrayBlockingQueue<DumpJob<? extends Reference>> dumpQueue;
|
|
|
|
private ArrayBlockingQueue<DumpJob<? extends Reference>> dumpQueue;
|
|
|
|
//private ReferenceFactory<ReferenceType> factory;
|
|
|
|
//private ReferenceFactory<ReferenceType> factory;
|
|
|
|
private boolean terminate;
|
|
|
|
private boolean terminate;
|
|
|
|
private int writeBufferSize;
|
|
|
|
private final int writeBufferSize;
|
|
|
|
|
|
|
|
|
|
|
|
public IODispatcher(int dumpQueueLength, int mergeQueueLength, int writeBufferSize) {
|
|
|
|
public IODispatcher(final int dumpQueueLength, final int mergeQueueLength, final int writeBufferSize) {
|
|
|
|
this.termination = new Semaphore(0);
|
|
|
|
this.termination = new Semaphore(0);
|
|
|
|
this.controlQueue = new Semaphore(0);
|
|
|
|
this.controlQueue = new Semaphore(0);
|
|
|
|
this.dumpQueue = new ArrayBlockingQueue<DumpJob<? extends Reference>>(dumpQueueLength);
|
|
|
|
this.dumpQueue = new ArrayBlockingQueue<DumpJob<? extends Reference>>(dumpQueueLength);
|
|
|
@ -66,66 +66,73 @@ public class IODispatcher extends Thread {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
public void terminate() {
|
|
|
|
public void terminate() {
|
|
|
|
if (termination != null && controlQueue != null && this.isAlive()) {
|
|
|
|
if (this.termination != null && this.controlQueue != null && isAlive()) {
|
|
|
|
this.terminate = true;
|
|
|
|
this.terminate = true;
|
|
|
|
this.controlQueue.release();
|
|
|
|
this.controlQueue.release();
|
|
|
|
// await termination
|
|
|
|
// await termination
|
|
|
|
try {
|
|
|
|
try {
|
|
|
|
termination.acquire();
|
|
|
|
this.termination.acquire();
|
|
|
|
} catch (InterruptedException e) {
|
|
|
|
} catch (final InterruptedException e) {
|
|
|
|
Log.logException(e);
|
|
|
|
Log.logException(e);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
|
|
@SuppressWarnings("unchecked")
|
|
|
|
protected synchronized void dump(ReferenceContainerCache<? extends Reference> cache, File file, ReferenceContainerArray<? extends Reference> array) {
|
|
|
|
protected synchronized void dump(final ReferenceContainerCache<? extends Reference> cache, final File file, final ReferenceContainerArray<? extends Reference> array) {
|
|
|
|
if (dumpQueue == null || controlQueue == null || !this.isAlive()) {
|
|
|
|
if (this.dumpQueue == null || this.controlQueue == null || !isAlive()) {
|
|
|
|
Log.logWarning("IODispatcher", "emergency dump of file " + file.getName());
|
|
|
|
Log.logWarning("IODispatcher", "emergency dump of file " + file.getName());
|
|
|
|
if (!cache.isEmpty()) cache.dump(file, (int) Math.min(MemoryControl.available() / 3, writeBufferSize), true);
|
|
|
|
if (!cache.isEmpty()) cache.dump(file, (int) Math.min(MemoryControl.available() / 3, this.writeBufferSize), true);
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
|
@SuppressWarnings("rawtypes")
|
|
|
|
@SuppressWarnings("rawtypes")
|
|
|
|
|
|
|
|
final
|
|
|
|
DumpJob<? extends Reference> job = new DumpJob(cache, file, array);
|
|
|
|
DumpJob<? extends Reference> job = new DumpJob(cache, file, array);
|
|
|
|
try {
|
|
|
|
|
|
|
|
// check if the dispatcher is running
|
|
|
|
// check if the dispatcher is running
|
|
|
|
if (this.isAlive()) {
|
|
|
|
if (isAlive()) {
|
|
|
|
|
|
|
|
try {
|
|
|
|
this.dumpQueue.put(job);
|
|
|
|
this.dumpQueue.put(job);
|
|
|
|
this.controlQueue.release();
|
|
|
|
|
|
|
|
Log.logInfo("IODispatcher", "appended dump job for file " + file.getName());
|
|
|
|
Log.logInfo("IODispatcher", "appended dump job for file " + file.getName());
|
|
|
|
|
|
|
|
} catch (final InterruptedException e) {
|
|
|
|
|
|
|
|
Log.logException(e);
|
|
|
|
|
|
|
|
cache.dump(file, (int) Math.min(MemoryControl.available() / 3, this.writeBufferSize), true);
|
|
|
|
|
|
|
|
} finally {
|
|
|
|
|
|
|
|
this.controlQueue.release();
|
|
|
|
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
|
job.dump();
|
|
|
|
job.dump();
|
|
|
|
Log.logWarning("IODispatcher", "dispatcher is not alive, just dumped file " + file.getName());
|
|
|
|
Log.logWarning("IODispatcher", "dispatcher is not alive, just dumped file " + file.getName());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} catch (InterruptedException e) {
|
|
|
|
|
|
|
|
Log.logException(e);
|
|
|
|
|
|
|
|
cache.dump(file, (int) Math.min(MemoryControl.available() / 3, writeBufferSize), true);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
protected synchronized int queueLength() {
|
|
|
|
protected synchronized int queueLength() {
|
|
|
|
return (controlQueue == null || !this.isAlive()) ? 0 : controlQueue.availablePermits();
|
|
|
|
return (this.controlQueue == null || !isAlive()) ? 0 : this.controlQueue.availablePermits();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
protected synchronized void merge(File f1, File f2, ReferenceFactory<? extends Reference> factory, ArrayStack array, File newFile) {
|
|
|
|
protected synchronized void merge(final File f1, final File f2, final ReferenceFactory<? extends Reference> factory, final ArrayStack array, final File newFile) {
|
|
|
|
if (mergeQueue == null || controlQueue == null || !this.isAlive()) {
|
|
|
|
if (this.mergeQueue == null || this.controlQueue == null || !isAlive()) {
|
|
|
|
if (f2 == null) {
|
|
|
|
if (f2 == null) {
|
|
|
|
Log.logWarning("IODispatcher", "emergency rewrite of file " + f1.getName() + " to " + newFile.getName());
|
|
|
|
Log.logWarning("IODispatcher", "emergency rewrite of file " + f1.getName() + " to " + newFile.getName());
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
|
Log.logWarning("IODispatcher", "emergency merge of files " + f1.getName() + ", " + f2.getName() + " to " + newFile.getName());
|
|
|
|
Log.logWarning("IODispatcher", "emergency merge of files " + f1.getName() + ", " + f2.getName() + " to " + newFile.getName());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
array.mergeMount(f1, f2, factory, newFile, (int) Math.min(MemoryControl.available() / 3, writeBufferSize));
|
|
|
|
array.mergeMount(f1, f2, factory, newFile, (int) Math.min(MemoryControl.available() / 3, this.writeBufferSize));
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
|
MergeJob job = new MergeJob(f1, f2, factory, array, newFile);
|
|
|
|
final MergeJob job = new MergeJob(f1, f2, factory, array, newFile);
|
|
|
|
|
|
|
|
if (isAlive()) {
|
|
|
|
try {
|
|
|
|
try {
|
|
|
|
if (this.isAlive()) {
|
|
|
|
|
|
|
|
this.mergeQueue.put(job);
|
|
|
|
this.mergeQueue.put(job);
|
|
|
|
this.controlQueue.release();
|
|
|
|
|
|
|
|
if (f2 == null) {
|
|
|
|
if (f2 == null) {
|
|
|
|
Log.logInfo("IODispatcher", "appended rewrite job of file " + f1.getName() + " to " + newFile.getName());
|
|
|
|
Log.logInfo("IODispatcher", "appended rewrite job of file " + f1.getName() + " to " + newFile.getName());
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
|
Log.logInfo("IODispatcher", "appended merge job of files " + f1.getName() + ", " + f2.getName() + " to " + newFile.getName());
|
|
|
|
Log.logInfo("IODispatcher", "appended merge job of files " + f1.getName() + ", " + f2.getName() + " to " + newFile.getName());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
} catch (final InterruptedException e) {
|
|
|
|
|
|
|
|
Log.logWarning("IODispatcher", "interrupted: " + e.getMessage(), e);
|
|
|
|
|
|
|
|
array.mergeMount(f1, f2, factory, newFile, (int) Math.min(MemoryControl.available() / 3, this.writeBufferSize));
|
|
|
|
|
|
|
|
} finally {
|
|
|
|
|
|
|
|
this.controlQueue.release();
|
|
|
|
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
|
job.merge();
|
|
|
|
job.merge();
|
|
|
|
if (f2 == null) {
|
|
|
|
if (f2 == null) {
|
|
|
@ -134,10 +141,6 @@ public class IODispatcher extends Thread {
|
|
|
|
Log.logWarning("IODispatcher", "dispatcher not running, rewrote file " + f1.getName() + ", " + f2.getName() + " to " + newFile.getName());
|
|
|
|
Log.logWarning("IODispatcher", "dispatcher not running, rewrote file " + f1.getName() + ", " + f2.getName() + " to " + newFile.getName());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} catch (InterruptedException e) {
|
|
|
|
|
|
|
|
Log.logWarning("IODispatcher", "interrupted: " + e.getMessage(), e);
|
|
|
|
|
|
|
|
array.mergeMount(f1, f2, factory, newFile, (int) Math.min(MemoryControl.available() / 3, writeBufferSize));
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
@ -147,19 +150,19 @@ public class IODispatcher extends Thread {
|
|
|
|
DumpJob<? extends Reference> dumpJob;
|
|
|
|
DumpJob<? extends Reference> dumpJob;
|
|
|
|
try {
|
|
|
|
try {
|
|
|
|
loop: while (true) try {
|
|
|
|
loop: while (true) try {
|
|
|
|
controlQueue.acquire();
|
|
|
|
this.controlQueue.acquire();
|
|
|
|
|
|
|
|
|
|
|
|
// prefer dump actions to flush memory to disc
|
|
|
|
// prefer dump actions to flush memory to disc
|
|
|
|
if (!dumpQueue.isEmpty()) {
|
|
|
|
if (!this.dumpQueue.isEmpty()) {
|
|
|
|
File f = null;
|
|
|
|
File f = null;
|
|
|
|
try {
|
|
|
|
try {
|
|
|
|
dumpJob = dumpQueue.take();
|
|
|
|
dumpJob = this.dumpQueue.take();
|
|
|
|
f = dumpJob.file;
|
|
|
|
f = dumpJob.file;
|
|
|
|
dumpJob.dump();
|
|
|
|
dumpJob.dump();
|
|
|
|
} catch (InterruptedException e) {
|
|
|
|
} catch (final InterruptedException e) {
|
|
|
|
Log.logSevere("IODispatcher", "main run job was interrupted (1)", e);
|
|
|
|
Log.logSevere("IODispatcher", "main run job was interrupted (1)", e);
|
|
|
|
Log.logException(e);
|
|
|
|
Log.logException(e);
|
|
|
|
} catch (Exception e) {
|
|
|
|
} catch (final Exception e) {
|
|
|
|
Log.logSevere("IODispatcher", "main run job had errors (1), dump to " + f + " failed.", e);
|
|
|
|
Log.logSevere("IODispatcher", "main run job had errors (1), dump to " + f + " failed.", e);
|
|
|
|
Log.logException(e);
|
|
|
|
Log.logException(e);
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -167,18 +170,18 @@ public class IODispatcher extends Thread {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// otherwise do a merge operation
|
|
|
|
// otherwise do a merge operation
|
|
|
|
if (!mergeQueue.isEmpty()) {
|
|
|
|
if (!this.mergeQueue.isEmpty()) {
|
|
|
|
File f = null, f1 = null, f2 = null;
|
|
|
|
File f = null, f1 = null, f2 = null;
|
|
|
|
try {
|
|
|
|
try {
|
|
|
|
mergeJob = mergeQueue.take();
|
|
|
|
mergeJob = this.mergeQueue.take();
|
|
|
|
f = mergeJob.newFile;
|
|
|
|
f = mergeJob.newFile;
|
|
|
|
f1 = mergeJob.f1;
|
|
|
|
f1 = mergeJob.f1;
|
|
|
|
f2 = mergeJob.f2;
|
|
|
|
f2 = mergeJob.f2;
|
|
|
|
mergeJob.merge();
|
|
|
|
mergeJob.merge();
|
|
|
|
} catch (InterruptedException e) {
|
|
|
|
} catch (final InterruptedException e) {
|
|
|
|
Log.logSevere("IODispatcher", "main run job was interrupted (2)", e);
|
|
|
|
Log.logSevere("IODispatcher", "main run job was interrupted (2)", e);
|
|
|
|
Log.logException(e);
|
|
|
|
Log.logException(e);
|
|
|
|
} catch (Exception e) {
|
|
|
|
} catch (final Exception e) {
|
|
|
|
if (f2 == null) {
|
|
|
|
if (f2 == null) {
|
|
|
|
Log.logSevere("IODispatcher", "main run job had errors (2), dump to " + f + " failed. Input file is " + f1, e);
|
|
|
|
Log.logSevere("IODispatcher", "main run job had errors (2), dump to " + f + " failed. Input file is " + f1, e);
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
@ -195,39 +198,39 @@ public class IODispatcher extends Thread {
|
|
|
|
break;
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
Log.logSevere("IODispatcher", "main loop in bad state, dumpQueue.size() = " + dumpQueue.size() + ", mergeQueue.size() = " + mergeQueue.size() + ", controlQueue.availablePermits() = " + controlQueue.availablePermits());
|
|
|
|
Log.logSevere("IODispatcher", "main loop in bad state, dumpQueue.size() = " + this.dumpQueue.size() + ", mergeQueue.size() = " + this.mergeQueue.size() + ", controlQueue.availablePermits() = " + this.controlQueue.availablePermits());
|
|
|
|
assert false : "this process statt should not be reached"; // this should never happen
|
|
|
|
assert false : "this process statt should not be reached"; // this should never happen
|
|
|
|
} catch (Exception e) {
|
|
|
|
} catch (final Exception e) {
|
|
|
|
Log.logSevere("IODispatcher", "main run job failed (X)", e);
|
|
|
|
Log.logSevere("IODispatcher", "main run job failed (X)", e);
|
|
|
|
Log.logException(e);
|
|
|
|
Log.logException(e);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
Log.logInfo("IODispatcher", "loop terminated");
|
|
|
|
Log.logInfo("IODispatcher", "loop terminated");
|
|
|
|
} catch (Exception e) {
|
|
|
|
} catch (final Exception e) {
|
|
|
|
Log.logSevere("IODispatcher", "main run job failed (4)", e);
|
|
|
|
Log.logSevere("IODispatcher", "main run job failed (4)", e);
|
|
|
|
Log.logException(e);
|
|
|
|
Log.logException(e);
|
|
|
|
} finally {
|
|
|
|
} finally {
|
|
|
|
Log.logInfo("IODispatcher", "terminating run job");
|
|
|
|
Log.logInfo("IODispatcher", "terminating run job");
|
|
|
|
controlQueue = null;
|
|
|
|
this.controlQueue = null;
|
|
|
|
dumpQueue = null;
|
|
|
|
this.dumpQueue = null;
|
|
|
|
mergeQueue = null;
|
|
|
|
this.mergeQueue = null;
|
|
|
|
termination.release();
|
|
|
|
this.termination.release();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
private class DumpJob<ReferenceType extends Reference> {
|
|
|
|
private class DumpJob<ReferenceType extends Reference> {
|
|
|
|
private ReferenceContainerCache<ReferenceType> cache;
|
|
|
|
private final ReferenceContainerCache<ReferenceType> cache;
|
|
|
|
private File file;
|
|
|
|
private final File file;
|
|
|
|
private ReferenceContainerArray<ReferenceType> array;
|
|
|
|
private final ReferenceContainerArray<ReferenceType> array;
|
|
|
|
private DumpJob(ReferenceContainerCache<ReferenceType> cache, File file, ReferenceContainerArray<ReferenceType> array) {
|
|
|
|
private DumpJob(final ReferenceContainerCache<ReferenceType> cache, final File file, final ReferenceContainerArray<ReferenceType> array) {
|
|
|
|
this.cache = cache;
|
|
|
|
this.cache = cache;
|
|
|
|
this.file = file;
|
|
|
|
this.file = file;
|
|
|
|
this.array = array;
|
|
|
|
this.array = array;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
private void dump() {
|
|
|
|
private void dump() {
|
|
|
|
try {
|
|
|
|
try {
|
|
|
|
if (!cache.isEmpty()) cache.dump(file, (int) Math.min(MemoryControl.available() / 3, writeBufferSize), true);
|
|
|
|
if (!this.cache.isEmpty()) this.cache.dump(this.file, (int) Math.min(MemoryControl.available() / 3, IODispatcher.this.writeBufferSize), true);
|
|
|
|
array.mountBLOBFile(file);
|
|
|
|
this.array.mountBLOBFile(this.file);
|
|
|
|
} catch (IOException e) {
|
|
|
|
} catch (final IOException e) {
|
|
|
|
Log.logException(e);
|
|
|
|
Log.logException(e);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -235,16 +238,16 @@ public class IODispatcher extends Thread {
|
|
|
|
|
|
|
|
|
|
|
|
private class MergeJob {
|
|
|
|
private class MergeJob {
|
|
|
|
|
|
|
|
|
|
|
|
private File f1, f2, newFile;
|
|
|
|
private final File f1, f2, newFile;
|
|
|
|
private ArrayStack array;
|
|
|
|
private final ArrayStack array;
|
|
|
|
private ReferenceFactory<? extends Reference> factory;
|
|
|
|
private final ReferenceFactory<? extends Reference> factory;
|
|
|
|
|
|
|
|
|
|
|
|
private MergeJob(
|
|
|
|
private MergeJob(
|
|
|
|
File f1,
|
|
|
|
final File f1,
|
|
|
|
File f2,
|
|
|
|
final File f2,
|
|
|
|
ReferenceFactory<? extends Reference> factory,
|
|
|
|
final ReferenceFactory<? extends Reference> factory,
|
|
|
|
ArrayStack array,
|
|
|
|
final ArrayStack array,
|
|
|
|
File newFile) {
|
|
|
|
final File newFile) {
|
|
|
|
this.f1 = f1;
|
|
|
|
this.f1 = f1;
|
|
|
|
this.f2 = f2;
|
|
|
|
this.f2 = f2;
|
|
|
|
this.factory = factory;
|
|
|
|
this.factory = factory;
|
|
|
@ -253,15 +256,15 @@ public class IODispatcher extends Thread {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
private File merge() {
|
|
|
|
private File merge() {
|
|
|
|
if (!f1.exists()) {
|
|
|
|
if (!this.f1.exists()) {
|
|
|
|
Log.logWarning("IODispatcher", "merge of file (1) " + f1.getName() + " failed: file does not exists");
|
|
|
|
Log.logWarning("IODispatcher", "merge of file (1) " + this.f1.getName() + " failed: file does not exists");
|
|
|
|
return null;
|
|
|
|
return null;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if (f2 != null && !f2.exists()) {
|
|
|
|
if (this.f2 != null && !this.f2.exists()) {
|
|
|
|
Log.logWarning("IODispatcher", "merge of file (2) " + f2.getName() + " failed: file does not exists");
|
|
|
|
Log.logWarning("IODispatcher", "merge of file (2) " + this.f2.getName() + " failed: file does not exists");
|
|
|
|
return null;
|
|
|
|
return null;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return array.mergeMount(f1, f2, factory, newFile, (int) Math.min(MemoryControl.available() / 3, writeBufferSize));
|
|
|
|
return this.array.mergeMount(this.f1, this.f2, this.factory, this.newFile, (int) Math.min(MemoryControl.available() / 3, IODispatcher.this.writeBufferSize));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|