@ -48,6 +48,8 @@ import net.yacy.kelondro.util.MemoryControl;
* /
public class IODispatcher extends Thread {
public static final Log log = new Log ( "IODispatcher" ) ;
private Semaphore controlQueue ;
private final Semaphore termination ;
private ArrayBlockingQueue < MergeJob > mergeQueue ;
@ -81,7 +83,7 @@ public class IODispatcher extends Thread {
@SuppressWarnings ( "unchecked" )
protected synchronized void dump ( final ReferenceContainerCache < ? extends Reference > cache , final File file , final ReferenceContainerArray < ? extends Reference > array ) {
if ( this . dumpQueue = = null | | this . controlQueue = = null | | ! isAlive ( ) ) {
Log. logWarning ( "IODispatcher" , "emergency dump of file " + file . getName ( ) ) ;
log. logWarning ( "emergency dump of file " + file . getName ( ) ) ;
if ( ! cache . isEmpty ( ) ) cache . dump ( file , ( int ) Math . min ( MemoryControl . available ( ) / 3 , this . writeBufferSize ) , true ) ;
} else {
@SuppressWarnings ( "rawtypes" )
@ -91,7 +93,7 @@ public class IODispatcher extends Thread {
if ( isAlive ( ) ) {
try {
this . dumpQueue . put ( job ) ;
Log. logInfo ( "IODispatcher" , "appended dump job for file " + file . getName ( ) ) ;
log. logInfo ( "appended dump job for file " + file . getName ( ) ) ;
} catch ( final InterruptedException e ) {
Log . logException ( e ) ;
cache . dump ( file , ( int ) Math . min ( MemoryControl . available ( ) / 3 , this . writeBufferSize ) , true ) ;
@ -100,7 +102,7 @@ public class IODispatcher extends Thread {
}
} else {
job . dump ( ) ;
Log. logWarning ( "IODispatcher" , "dispatcher is not alive, just dumped file " + file . getName ( ) ) ;
log. logWarning ( "dispatcher is not alive, just dumped file " + file . getName ( ) ) ;
}
}
}
@ -112,9 +114,9 @@ public class IODispatcher extends Thread {
protected synchronized void merge ( final File f1 , final File f2 , final ReferenceFactory < ? extends Reference > factory , final ArrayStack array , final File newFile ) {
if ( this . mergeQueue = = null | | this . controlQueue = = null | | ! isAlive ( ) ) {
if ( f2 = = null ) {
Log. logWarning ( "IODispatcher" , "emergency rewrite of file " + f1 . getName ( ) + " to " + newFile . getName ( ) ) ;
log. logWarning ( "emergency rewrite of file " + f1 . getName ( ) + " to " + newFile . getName ( ) ) ;
} else {
Log. logWarning ( "IODispatcher" , "emergency merge of files " + f1 . getName ( ) + ", " + f2 . getName ( ) + " to " + newFile . getName ( ) ) ;
log. logWarning ( "emergency merge of files " + f1 . getName ( ) + ", " + f2 . getName ( ) + " to " + newFile . getName ( ) ) ;
}
array . mergeMount ( f1 , f2 , factory , newFile , ( int ) Math . min ( MemoryControl . available ( ) / 3 , this . writeBufferSize ) ) ;
} else {
@ -123,12 +125,12 @@ public class IODispatcher extends Thread {
try {
this . mergeQueue . put ( job ) ;
if ( f2 = = null ) {
Log. logInfo ( "IODispatcher" , "appended rewrite job of file " + f1 . getName ( ) + " to " + newFile . getName ( ) ) ;
log. logInfo ( "appended rewrite job of file " + f1 . getName ( ) + " to " + newFile . getName ( ) ) ;
} else {
Log. logInfo ( "IODispatcher" , "appended merge job of files " + f1 . getName ( ) + ", " + f2 . getName ( ) + " to " + newFile . getName ( ) ) ;
log. logInfo ( "appended merge job of files " + f1 . getName ( ) + ", " + f2 . getName ( ) + " to " + newFile . getName ( ) ) ;
}
} catch ( final InterruptedException e ) {
Log. logWarning ( "IODispatcher" , "interrupted: " + e . getMessage ( ) , e ) ;
log. logWarning ( "interrupted: " + e . getMessage ( ) , e ) ;
array . mergeMount ( f1 , f2 , factory , newFile , ( int ) Math . min ( MemoryControl . available ( ) / 3 , this . writeBufferSize ) ) ;
} finally {
this . controlQueue . release ( ) ;
@ -136,9 +138,9 @@ public class IODispatcher extends Thread {
} else {
job . merge ( ) ;
if ( f2 = = null ) {
Log. logWarning ( "IODispatcher" , "dispatcher not running, merged files " + f1 . getName ( ) + " to " + newFile . getName ( ) ) ;
log. logWarning ( "dispatcher not running, merged files " + f1 . getName ( ) + " to " + newFile . getName ( ) ) ;
} else {
Log. logWarning ( "IODispatcher" , "dispatcher not running, rewrote file " + f1 . getName ( ) + ", " + f2 . getName ( ) + " to " + newFile . getName ( ) ) ;
log. logWarning ( "dispatcher not running, rewrote file " + f1 . getName ( ) + ", " + f2 . getName ( ) + " to " + newFile . getName ( ) ) ;
}
}
}
@ -160,10 +162,10 @@ public class IODispatcher extends Thread {
f = dumpJob . file ;
dumpJob . dump ( ) ;
} catch ( final InterruptedException e ) {
Log. logSevere ( "IODispatcher" , "main run job was interrupted (1)" , e ) ;
log. logSevere ( "main run job was interrupted (1)" , e ) ;
Log . logException ( e ) ;
} catch ( final Throwable e ) {
Log. logSevere ( "IODispatcher" , "main run job had errors (1), dump to " + f + " failed." , e ) ;
log. logSevere ( "main run job had errors (1), dump to " + f + " failed." , e ) ;
Log . logException ( e ) ;
}
continue loop ;
@ -179,13 +181,13 @@ public class IODispatcher extends Thread {
f2 = mergeJob . f2 ;
mergeJob . merge ( ) ;
} catch ( final InterruptedException e ) {
Log. logSevere ( "IODispatcher" , "main run job was interrupted (2)" , e ) ;
log. logSevere ( "main run job was interrupted (2)" , e ) ;
Log . logException ( e ) ;
} catch ( final Throwable e ) {
if ( f2 = = null ) {
Log . logSevere ( "IODispatcher" , "main run job had errors (2), dump to " + f + " failed. Input file is " + f1 , e ) ;
log . logSevere ( "main run job had errors (2), dump to " + f + " failed. Input file is " + f1 , e ) ;
} else {
Log. logSevere ( "IODispatcher" , "main run job had errors (2), dump to " + f + " failed. Input files are " + f1 + " and " + f2 , e ) ;
log. logSevere ( "main run job had errors (2), dump to " + f + " failed. Input files are " + f1 + " and " + f2 , e ) ;
}
Log . logException ( e ) ;
}
@ -194,22 +196,22 @@ public class IODispatcher extends Thread {
// check termination
if ( this . terminate ) {
Log. logInfo ( "IODispatcher" , "caught termination signal" ) ;
log. logInfo ( "caught termination signal" ) ;
break ;
}
Log. logSevere ( "IODispatcher" , "main loop in bad state, dumpQueue.size() = " + this . dumpQueue . size ( ) + ", mergeQueue.size() = " + this . mergeQueue . size ( ) + ", controlQueue.availablePermits() = " + this . controlQueue . availablePermits ( ) ) ;
log. logSevere ( "main loop in bad state, dumpQueue.size() = " + this . dumpQueue . size ( ) + ", mergeQueue.size() = " + this . mergeQueue . size ( ) + ", controlQueue.availablePermits() = " + this . controlQueue . availablePermits ( ) ) ;
assert false : "this process statt should not be reached" ; // this should never happen
} catch ( final Throwable e ) {
Log. logSevere ( "IODispatcher" , "main run job failed (X)" , e ) ;
log. logSevere ( "main run job failed (X)" , e ) ;
Log . logException ( e ) ;
}
Log . logInfo ( "IODispatcher" , "loop terminated" ) ;
log . logInfo ( "loop terminated" ) ;
} catch ( final Throwable e ) {
Log. logSevere ( "IODispatcher" , "main run job failed (4)" , e ) ;
log. logSevere ( "main run job failed (4)" , e ) ;
Log . logException ( e ) ;
} finally {
Log. logInfo ( "IODispatcher" , "terminating run job" ) ;
log. logInfo ( "terminating run job" ) ;
this . controlQueue = null ;
this . dumpQueue = null ;
this . mergeQueue = null ;
@ -257,11 +259,11 @@ public class IODispatcher extends Thread {
private File merge ( ) {
if ( ! this . f1 . exists ( ) ) {
Log . logWarning ( "IODispatcher" , "merge of file (1) " + this . f1 . getName ( ) + " failed: file does not exists" ) ;
log . logWarning ( "merge of file (1) " + this . f1 . getName ( ) + " failed: file does not exists" ) ;
return null ;
}
if ( this . f2 ! = null & & ! this . f2 . exists ( ) ) {
Log . logWarning ( "IODispatcher" , "merge of file (2) " + this . f2 . getName ( ) + " failed: file does not exists" ) ;
log . logWarning ( "merge of file (2) " + this . f2 . getName ( ) + " failed: file does not exists" ) ;
return null ;
}
return this . array . mergeMount ( this . f1 , this . f2 , this . factory , this . newFile , ( int ) Math . min ( MemoryControl . available ( ) / 3 , IODispatcher . this . writeBufferSize ) ) ;