@ -24,7 +24,6 @@
package net.yacy.kelondro.workflow ;
import java.lang.reflect.InvocationTargetException ;
import java.util.ArrayList ;
import java.util.Iterator ;
import java.util.concurrent.BlockingQueue ;
@ -32,6 +31,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors ;
import java.util.concurrent.LinkedBlockingQueue ;
import java.util.concurrent.TimeUnit ;
import java.util.concurrent.atomic.AtomicInteger ;
import net.yacy.kelondro.logging.Log ;
import net.yacy.kelondro.util.NamePrefixThreadFactory ;
@ -43,9 +43,10 @@ public class WorkflowProcessor<J extends WorkflowJob> {
private static final ArrayList < WorkflowProcessor < ? > > processMonitor = new ArrayList < WorkflowProcessor < ? > > ( ) ;
private ExecutorService executor ;
private AtomicInteger executorRunning ;
private BlockingQueue < J > input ;
private final WorkflowProcessor < J > output ;
private final int poolsize;
private final int max poolsize;
private final Object environment ;
private final String processName , methodName , description ;
private final String [ ] childs ;
@ -55,20 +56,25 @@ public class WorkflowProcessor<J extends WorkflowJob> {
public WorkflowProcessor (
final String name , final String description , final String [ ] childnames ,
final Object env , final String jobExecMethod ,
final int inputQueueSize , final WorkflowProcessor < J > output , final int poolsize ) {
final int inputQueueSize , final WorkflowProcessor < J > output ,
final int maxpoolsize ) {
// start a fixed number of executors that handle entries in the process queue
this . environment = env ;
this . processName = name ;
this . description = description ;
this . methodName = jobExecMethod ;
this . childs = childnames ;
this . input = new LinkedBlockingQueue < J > ( inputQueueSize ) ;
this . maxpoolsize = maxpoolsize ;
this . input = new LinkedBlockingQueue < J > ( Math . max ( maxpoolsize + 1 , inputQueueSize ) ) ;
this . output = output ;
this . poolsize = poolsize ;
this . executor = Executors . newCachedThreadPool ( new NamePrefixThreadFactory ( jobExecMethod ) ) ;
for ( int i = 0 ; i < poolsize ; i + + ) {
this . executor . submit ( new InstantBlockingThread < J > ( env , jobExecMethod , this ) ) ;
this . executor = Executors . newCachedThreadPool ( new NamePrefixThreadFactory ( this . methodName ) ) ;
this . executorRunning = new AtomicInteger ( 0 ) ;
/ *
for ( int i = 0 ; i < this . maxpoolsize ; i + + ) {
this . executor . submit ( new InstantBlockingThread < J > ( this ) ) ;
this . executorRunning + + ;
}
* /
// init statistics
this . blockTime = 0 ;
this . execTime = 0 ;
@ -79,6 +85,14 @@ public class WorkflowProcessor<J extends WorkflowJob> {
processMonitor . add ( this ) ;
}
public Object getEnvironment ( ) {
return this . environment ;
}
public String getMethodName ( ) {
return this . methodName ;
}
public int queueSize ( ) {
if ( this . input = = null ) return 0 ;
return this . input . size ( ) ;
@ -94,7 +108,7 @@ public class WorkflowProcessor<J extends WorkflowJob> {
}
public int concurrency ( ) {
return this . poolsize;
return this . max poolsize;
}
public J take ( ) throws InterruptedException {
@ -108,7 +122,7 @@ public class WorkflowProcessor<J extends WorkflowJob> {
return j ;
}
public void passOn ( final J next ) throws InterruptedException {
public void passOn ( final J next ) {
// don't mix this method up with enQueue()!
// this method enqueues into the _next_ queue, not this queue!
if ( this . output = = null ) {
@ -145,9 +159,9 @@ public class WorkflowProcessor<J extends WorkflowJob> {
}
@SuppressWarnings ( "unchecked" )
public void enQueue ( final J in ) throws InterruptedException {
public void enQueue ( final J in ) {
// ensure that enough job executors are running
if ( ( this . input = = null ) | | ( this . executor = = null ) | | ( this . executor . isShutdown ( ) ) | | ( this . executor . isTerminated ( ) ) ) {
if ( this . input = = null | | this . executor = = null | | this . executor . isShutdown ( ) | | this . executor . isTerminated ( ) ) {
// execute serialized without extra thread
//Log.logWarning("PROCESSOR", "executing job " + environment.getClass().getName() + "." + methodName + " serialized");
try {
@ -155,21 +169,23 @@ public class WorkflowProcessor<J extends WorkflowJob> {
if ( out ! = null & & this . output ! = null ) {
this . output . enQueue ( out ) ;
}
} catch ( final IllegalArgumentException e ) {
Log . logException ( e ) ;
} catch ( final IllegalAccessException e ) {
Log . logException ( e ) ;
} catch ( final InvocationTargetException e ) {
} catch ( final Throwable e ) {
Log . logException ( e ) ;
}
return ;
}
}
// execute concurrent in thread
while ( this . input ! = null ) {
try {
if ( this . input . size ( ) > this . executorRunning . get ( ) & & this . executorRunning . get ( ) < this . maxpoolsize ) synchronized ( executor ) {
if ( this . input . size ( ) > this . executorRunning . get ( ) & & this . executorRunning . get ( ) < this . maxpoolsize ) {
this . executorRunning . incrementAndGet ( ) ;
this . executor . submit ( new InstantBlockingThread < J > ( this ) ) ;
}
}
this . input . put ( in ) ;
break ;
} catch ( final InterruptedException e ) {
} catch ( final Throwable e ) {
try { Thread . sleep ( 10 ) ; } catch ( final InterruptedException ee ) { }
}
}
@ -186,7 +202,7 @@ public class WorkflowProcessor<J extends WorkflowJob> {
// before we put pills into the queue, make sure that they will take them
relaxCapacity ( ) ;
// put poison pills into the queue
for ( int i = 0 ; i < this . poolsize ; i + + ) {
for ( int i = 0 ; i < this . executorRunning. get ( ) ; i + + ) {
try {
Log . logInfo ( "serverProcessor" , "putting poison pill in queue " + this . processName + ", thread " + i ) ;
this . input . put ( ( J ) WorkflowJob . poisonPill ) ; // put a poison pill into the queue which will kill the job
@ -200,6 +216,7 @@ public class WorkflowProcessor<J extends WorkflowJob> {
Log . logInfo ( "WorkflowProcess" , "waiting for queue " + this . processName + " to shut down; input.size = " + this . input . size ( ) ) ;
try { Thread . sleep ( 1000 ) ; } catch ( InterruptedException e ) { }
}
this . executorRunning . set ( 0 ) ;
// shut down executors
if ( this . executor ! = null & ! this . executor . isShutdown ( ) ) {