@ -1,49 +1,48 @@
package de.anomic.plasma.crawler ;
import org.apache.commons.pool.impl.Generic ObjectPool;
import org.apache.commons.pool.impl.Generic Keyed ObjectPool;
import de.anomic.plasma.crawler.http.CrawlWorker ;
import de.anomic.server.logging.serverLog ;
public final class plasmaCrawlerPool extends Generic ObjectPool {
public final class plasmaCrawlerPool extends Generic Keyed ObjectPool {
private final ThreadGroup theThreadGroup ;
public boolean isClosed = false ;
public plasmaCrawlerPool ( plasmaCrawlerFactory objFactory ,
GenericObjectPool . Config config ,
ThreadGroup threadGroup ) {
public plasmaCrawlerPool ( plasmaCrawlerFactory objFactory , GenericKeyedObjectPool . Config config , ThreadGroup threadGroup ) {
super ( objFactory , config ) ;
this . theThreadGroup = threadGroup ;
objFactory . setPool ( this ) ;
}
public Object borrowObject ( ) throws Exception {
return super . borrowObject ( ) ;
public Object borrowObject ( Object key ) throws Exception {
return super . borrowObject ( key ) ;
}
public void returnObject ( Object obj) {
public void returnObject ( Object key, Object obj) {
if ( obj = = null ) return ;
if ( obj instanceof plasma CrawlWorker) {
if ( obj instanceof CrawlWorker) {
try {
( ( plasma CrawlWorker) obj ) . setName ( plasma CrawlWorker. threadBaseName + "_inPool" ) ;
super . returnObject ( obj) ;
( ( CrawlWorker) obj ) . setName ( CrawlWorker. threadBaseName + "_inPool" ) ;
super . returnObject ( key, obj) ;
} catch ( Exception e ) {
( ( plasma CrawlWorker) obj ) . setStopped ( true ) ;
( ( CrawlWorker) obj ) . setStopped ( true ) ;
serverLog . logSevere ( "CRAWLER-POOL" , "Unable to return crawler thread to pool." , e ) ;
}
} else {
serverLog . logSevere ( "CRAWLER-POOL" , "Object of wron t type '" + obj . getClass ( ) . getName ( ) +
serverLog . logSevere ( "CRAWLER-POOL" , "Object of wron g type '" + obj . getClass ( ) . getName ( ) +
"' returned to pool." ) ;
}
}
public void invalidateObject ( Object obj) {
public void invalidateObject ( Object key, Object obj) {
if ( obj = = null ) return ;
if ( this . isClosed ) return ;
if ( obj instanceof plasma CrawlWorker) {
if ( obj instanceof CrawlWorker) {
try {
( ( plasma CrawlWorker) obj ) . setName ( plasma CrawlWorker. threadBaseName + "_invalidated" ) ;
( ( plasma CrawlWorker) obj ) . setStopped ( true ) ;
super . invalidateObject ( obj) ;
( ( CrawlWorker) obj ) . setName ( CrawlWorker. threadBaseName + "_invalidated" ) ;
( ( CrawlWorker) obj ) . setStopped ( true ) ;
super . invalidateObject ( key, obj) ;
} catch ( Exception e ) {
serverLog . logSevere ( "CRAWLER-POOL" , "Unable to invalidate crawling thread." , e ) ;
}
@ -65,11 +64,11 @@ public final class plasmaCrawlerPool extends GenericObjectPool {
// signaling shutdown to all still running or pooled threads ...
serverLog . logInfo ( "CRAWLER" , "Signaling shutdown to " + threadCount + " remaining crawler threads ..." ) ;
for ( int currentThreadIdx = 0 ; currentThreadIdx < threadCount ; currentThreadIdx + + ) {
( ( plasma CrawlWorker) threadList [ currentThreadIdx ] ) . setStopped ( true ) ;
( ( CrawlWorker) threadList [ currentThreadIdx ] ) . setStopped ( true ) ;
}
// giving the crawlers some time to finish shutdown
try { Thread . sleep ( 500 ) ; } catch ( Exception e ) { }
try { Thread . sleep ( 500 ) ; } catch ( Exception e ) { /* Ignore this. Shutdown in progress */ }
// sending interrupted signal to all remaining threads
serverLog . logInfo ( "CRAWLER" , "Sending interruption signal to " + this . theThreadGroup . activeCount ( ) + " remaining crawler threads ..." ) ;
@ -81,7 +80,7 @@ public final class plasmaCrawlerPool extends GenericObjectPool {
Thread currentThread = threadList [ currentThreadIdx ] ;
if ( currentThread . isAlive ( ) ) {
serverLog . logInfo ( "CRAWLER" , "Trying to shutdown crawler thread '" + currentThread . getName ( ) + "' [" + currentThreadIdx + "]." ) ;
( ( plasma CrawlWorker) currentThread ) . close ( ) ;
( ( CrawlWorker) currentThread ) . close ( ) ;
}
}
@ -90,7 +89,7 @@ public final class plasmaCrawlerPool extends GenericObjectPool {
Thread currentThread = threadList [ currentThreadIdx ] ;
if ( currentThread . isAlive ( ) ) {
serverLog . logInfo ( "CRAWLER" , "Waiting for crawler thread '" + currentThread . getName ( ) + "' [" + currentThreadIdx + "] to finish shutdown." ) ;
try { currentThread . join ( 500 ) ; } catch ( InterruptedException ex ) { }
try { currentThread . join ( 500 ) ; } catch ( InterruptedException ex ) { /* Ignore this. Shutdown in progress */ }
}
}
serverLog . logWarning ( "CRAWLER" , "Shutdown of remaining crawler threads finish." ) ;