@ -7,7 +7,7 @@
// $LastChangedBy$
// $LastChangedBy$
//
//
// LICENSE
// LICENSE
//
//
// This program is free software; you can redistribute it and/or modify
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// the Free Software Foundation; either version 2 of the License, or
@ -33,7 +33,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue ;
import java.util.concurrent.LinkedBlockingQueue ;
import java.util.concurrent.TimeUnit ;
import java.util.concurrent.TimeUnit ;
import net.yacy.kelondro.logging.Log ;
import net.yacy.kelondro.logging.Log ;
import net.yacy.kelondro.util.NamePrefixThreadFactory ;
import net.yacy.kelondro.util.NamePrefixThreadFactory ;
@ -50,9 +49,9 @@ public class WorkflowProcessor<J extends WorkflowJob> {
private final Object environment ;
private final Object environment ;
private final String processName , methodName , description ;
private final String processName , methodName , description ;
private final String [ ] childs ;
private final String [ ] childs ;
private long blockTime , execTime , passOnTime ;
private long blockTime , execTime , passOnTime ;
private long execCount ;
private long execCount ;
public WorkflowProcessor (
public WorkflowProcessor (
final String name , final String description , final String [ ] childnames ,
final String name , final String description , final String [ ] childnames ,
final Object env , final String jobExecMethod ,
final Object env , final String jobExecMethod ,
@ -71,57 +70,57 @@ public class WorkflowProcessor<J extends WorkflowJob> {
this . executor . submit ( new InstantBlockingThread < J > ( env , jobExecMethod , this ) ) ;
this . executor . submit ( new InstantBlockingThread < J > ( env , jobExecMethod , this ) ) ;
}
}
// init statistics
// init statistics
blockTime = 0 ;
this . blockTime = 0 ;
execTime = 0 ;
this . execTime = 0 ;
passOnTime = 0 ;
this . passOnTime = 0 ;
execCount = 0 ;
this . execCount = 0 ;
// store this object for easy monitoring
// store this object for easy monitoring
processMonitor . add ( this ) ;
processMonitor . add ( this ) ;
}
}
public int queueSize ( ) {
public int queueSize ( ) {
return this . input . size ( ) ;
return this . input . size ( ) ;
}
}
public boolean queueIsEmpty ( ) {
public boolean queueIsEmpty ( ) {
return this . input . isEmpty ( ) ;
return this . input . isEmpty ( ) ;
}
}
public int queueSizeMax ( ) {
public int queueSizeMax ( ) {
return this . input . size ( ) + this . input . remainingCapacity ( ) ;
return this . input . size ( ) + this . input . remainingCapacity ( ) ;
}
}
public int concurrency ( ) {
public int concurrency ( ) {
return this . poolsize ;
return this . poolsize ;
}
}
public J take ( ) throws InterruptedException {
public J take ( ) throws InterruptedException {
// read from the input queue
// read from the input queue
if ( this . input = = null ) return null ;
if ( this . input = = null ) return null ;
long t = System . currentTimeMillis ( ) ;
final long t = System . currentTimeMillis ( ) ;
J j = this . input . take ( ) ;
final J j = this . input . take ( ) ;
this . blockTime + = System . currentTimeMillis ( ) - t ;
this . blockTime + = System . currentTimeMillis ( ) - t ;
return j ;
return j ;
}
}
public void passOn ( final J next ) throws InterruptedException {
public void passOn ( final J next ) throws InterruptedException {
// don't mix this method up with enQueue()!
// don't mix this method up with enQueue()!
// this method enqueues into the _next_ queue, not this queue!
// this method enqueues into the _next_ queue, not this queue!
if ( this . output = = null ) return ;
if ( this . output = = null ) return ;
long t = System . currentTimeMillis ( ) ;
final long t = System . currentTimeMillis ( ) ;
this . output . enQueue ( next ) ;
this . output . enQueue ( next ) ;
this . passOnTime + = System . currentTimeMillis ( ) - t ;
this . passOnTime + = System . currentTimeMillis ( ) - t ;
}
}
public void clear ( ) {
public void clear ( ) {
if ( this . input ! = null ) this . input . clear ( ) ;
if ( this . input ! = null ) this . input . clear ( ) ;
}
}
public synchronized void relaxCapacity ( ) {
public synchronized void relaxCapacity ( ) {
if ( this . input . isEmpty ( ) ) return ;
if ( this . input . isEmpty ( ) ) return ;
if ( this . input . remainingCapacity ( ) > 1000 ) return ;
if ( this . input . remainingCapacity ( ) > 1000 ) return ;
BlockingQueue < J > i = new LinkedBlockingQueue < J > ( ) ;
final BlockingQueue < J > i = new LinkedBlockingQueue < J > ( ) ;
J e ;
J e ;
while ( ! this . input . isEmpty ( ) ) {
while ( ! this . input . isEmpty ( ) ) {
e = this . input . poll ( ) ;
e = this . input . poll ( ) ;
@ -130,15 +129,15 @@ public class WorkflowProcessor<J extends WorkflowJob> {
}
}
this . input = i ;
this . input = i ;
}
}
@SuppressWarnings ( "unchecked" )
@SuppressWarnings ( "unchecked" )
public void enQueue ( final J in ) throws InterruptedException {
public void enQueue ( final J in ) throws InterruptedException {
// ensure that enough job executors are running
// ensure that enough job executors are running
if ( ( this . input = = null ) | | ( executor = = null ) | | ( executor . isShutdown ( ) ) | | ( executor. isTerminated ( ) ) ) {
if ( ( this . input = = null ) | | ( this . executor = = null ) | | ( this . executor . isShutdown ( ) ) | | ( this . executor. isTerminated ( ) ) ) {
// execute serialized without extra thread
// execute serialized without extra thread
Log . logWarning ( "PROCESSOR" , "executing job " + environment . getClass ( ) . getName ( ) + "." + methodName + " serialized" ) ;
//Log.logWarning("PROCESSOR", "executing job " + environment.getClass().getName() + "." + methodName + " serialized");
try {
try {
final J out = ( J ) InstantBlockingThread . execMethod ( this . environment , this . methodName ) . invoke ( environment, new Object [ ] { in } ) ;
final J out = ( J ) InstantBlockingThread . execMethod ( this . environment , this . methodName ) . invoke ( this . environment, new Object [ ] { in } ) ;
if ( out ! = null & & this . output ! = null ) this . output . enQueue ( out ) ;
if ( out ! = null & & this . output ! = null ) this . output . enQueue ( out ) ;
} catch ( final IllegalArgumentException e ) {
} catch ( final IllegalArgumentException e ) {
Log . logException ( e ) ;
Log . logException ( e ) ;
@ -154,41 +153,41 @@ public class WorkflowProcessor<J extends WorkflowJob> {
try {
try {
this . input . put ( in ) ;
this . input . put ( in ) ;
break ;
break ;
} catch ( InterruptedException e ) {
} catch ( final InterruptedException e ) {
try { Thread . sleep ( 10 ) ; } catch ( InterruptedException ee ) { }
try { Thread . sleep ( 10 ) ; } catch ( final InterruptedException ee ) { }
}
}
}
}
}
}
@SuppressWarnings ( "unchecked" )
@SuppressWarnings ( "unchecked" )
public void announceShutdown ( ) {
public void announceShutdown ( ) {
if ( executor = = null ) return ;
if ( this . executor = = null ) return ;
if ( executor. isShutdown ( ) ) return ;
if ( this . executor. isShutdown ( ) ) return ;
// before we put pills into the queue, make sure that they will take them
// before we put pills into the queue, make sure that they will take them
relaxCapacity ( ) ;
relaxCapacity ( ) ;
// put poison pills into the queue
// put poison pills into the queue
for ( int i = 0 ; i < poolsize; i + + ) {
for ( int i = 0 ; i < this . poolsize; i + + ) {
try {
try {
Log . logInfo ( "serverProcessor" , "putting poison pill in queue " + this . processName + ", thread " + i ) ;
Log . logInfo ( "serverProcessor" , "putting poison pill in queue " + this . processName + ", thread " + i ) ;
input. put ( ( J ) WorkflowJob . poisonPill ) ; // put a poison pill into the queue which will kill the job
this . input. put ( ( J ) WorkflowJob . poisonPill ) ; // put a poison pill into the queue which will kill the job
Log . logInfo ( "serverProcessor" , ".. poison pill is in queue " + this . processName + ", thread " + i + ". awaiting termination" ) ;
Log . logInfo ( "serverProcessor" , ".. poison pill is in queue " + this . processName + ", thread " + i + ". awaiting termination" ) ;
} catch ( final InterruptedException e ) { }
} catch ( final InterruptedException e ) { }
}
}
}
}
public void awaitShutdown ( final long millisTimeout ) {
public void awaitShutdown ( final long millisTimeout ) {
if ( executor ! = null & ! executor. isShutdown ( ) ) {
if ( this . executor ! = null & ! this . executor. isShutdown ( ) ) {
// wait for shutdown
// wait for shutdown
try {
try {
executor. shutdown ( ) ;
this . executor. shutdown ( ) ;
executor. awaitTermination ( millisTimeout , TimeUnit . MILLISECONDS ) ;
this . executor. awaitTermination ( millisTimeout , TimeUnit . MILLISECONDS ) ;
} catch ( final InterruptedException e ) { }
} catch ( final InterruptedException e ) { }
}
}
Log . logInfo ( "serverProcessor" , "queue " + this . processName + ": shutdown." ) ;
Log . logInfo ( "serverProcessor" , "queue " + this . processName + ": shutdown." ) ;
this . executor = null ;
this . executor = null ;
this . input = null ;
this . input = null ;
// remove entry from monitor
// remove entry from monitor
Iterator < WorkflowProcessor < ? > > i = processes ( ) ;
final Iterator < WorkflowProcessor < ? > > i = processes ( ) ;
WorkflowProcessor < ? > p ;
WorkflowProcessor < ? > p ;
while ( i . hasNext ( ) ) {
while ( i . hasNext ( ) ) {
p = i . next ( ) ;
p = i . next ( ) ;
@ -198,59 +197,59 @@ public class WorkflowProcessor<J extends WorkflowJob> {
}
}
}
}
}
}
public static Iterator < WorkflowProcessor < ? > > processes ( ) {
public static Iterator < WorkflowProcessor < ? > > processes ( ) {
return processMonitor . iterator ( ) ;
return processMonitor . iterator ( ) ;
}
}
protected void increaseJobTime ( final long time ) {
protected void increaseJobTime ( final long time ) {
this . execTime + = time ;
this . execTime + = time ;
this . execCount + + ;
this . execCount + + ;
}
}
public String getName ( ) {
public String getName ( ) {
return this . processName ;
return this . processName ;
}
}
public String getDescription ( ) {
public String getDescription ( ) {
return this . description ;
return this . description ;
}
}
public String getChilds ( ) {
public String getChilds ( ) {
StringBuilder s = new StringBuilder ( this . childs . length * 40 + 1 ) ;
final StringBuilder s = new StringBuilder ( this . childs . length * 40 + 1 ) ;
for ( int i = 0 ; i < this . childs . length ; i + + ) {
for ( final String child : this . childs ) {
s . append ( this . childs [ i ] ) ;
s . append ( child ) ;
s . append ( ' ' ) ;
s . append ( ' ' ) ;
}
}
return s . toString ( ) ;
return s . toString ( ) ;
}
}
/ * *
/ * *
* the block time is the time that a take ( ) blocks until it gets a value
* the block time is the time that a take ( ) blocks until it gets a value
* @return
* @return
* /
* /
public long getBlockTime ( ) {
public long getBlockTime ( ) {
return blockTime;
return this . blockTime;
}
}
/ * *
/ * *
* the exec time is the complete time of the execution and processing of the value from take ( )
* the exec time is the complete time of the execution and processing of the value from take ( )
* @return
* @return
* /
* /
public long getExecTime ( ) {
public long getExecTime ( ) {
return execTime;
return this . execTime;
}
}
public long getExecCount ( ) {
public long getExecCount ( ) {
return execCount;
return this . execCount;
}
}
/ * *
/ * *
* the passOn time is the time that a put ( ) takes to enqueue a result value to the next queue
* the passOn time is the time that a put ( ) takes to enqueue a result value to the next queue
* in case that the target queue is limited and may be full , this value may increase
* in case that the target queue is limited and may be full , this value may increase
* @return
* @return
* /
* /
public long getPassOnTime ( ) {
public long getPassOnTime ( ) {
return passOnTime;
return this . passOnTime;
}
}
}
}