@ -6,12 +6,14 @@ import java.net.InetAddress;
import java.net.MalformedURLException ;
import java.net.URL ;
import java.net.UnknownHostException ;
import java.util.ArrayList ;
import java.util.Date ;
import java.util.Iterator ;
import java.util.LinkedList ;
import org.apache.commons.pool.impl.GenericObjectPool ;
import de.anomic.data.robotsParser ;
import de.anomic.http.httpc ;
import de.anomic.kelondro.kelondroTree ;
import de.anomic.kelondro.kelondroRecords.Node ;
import de.anomic.server.serverCodings ;
@ -22,8 +24,10 @@ import de.anomic.yacy.yacyCore;
public final class plasmaStackCrawlThread extends Thread {
private final serverLog log = new serverLog ( "STACKCRAWL" ) ;
private final plasmaSwitchboard sb ;
final WorkerPool theWorkerPool ;
final ThreadGroup theWorkerThreadGroup = new ThreadGroup ( "stackCrawlThreadGroup" ) ;
final serverLog log = new serverLog ( "STACKCRAWL" ) ;
final plasmaSwitchboard sb ;
private boolean stopped = false ;
private stackCrawlQueue queue ;
@ -34,6 +38,9 @@ public final class plasmaStackCrawlThread extends Thread {
this . queue = new stackCrawlQueue ( dbPath , dbCacheSize ) ;
this . log . logInfo ( this . queue . size ( ) + " entries in the stackCrawl queue." ) ;
this . log . logInfo ( "STACKCRAWL thread initialized." ) ;
this . theWorkerPool = new WorkerPool ( new WorkterFactory ( this . theWorkerThreadGroup ) ) ;
public void stopIt ( ) {
@ -58,22 +65,11 @@ public final class plasmaStackCrawlThread extends Thread {
// getting a new message from the crawler queue
stackCrawlMessage theMsg = this . queue . waitForMessage ( ) ;
// process message
String rejectReason = stackCrawlDequeue ( theMsg ) ;
if ( rejectReason ! = null ) {
this . sb . urlPool . errorURL . newEntry (
new URL ( theMsg . url ( ) ) ,
theMsg . referrerHash ( ) ,
theMsg . initiatorHash ( ) ,
yacyCore . seedDB . mySeed . hash ,
theMsg . name ,
rejectReason ,
new bitfield ( plasmaURL . urlFlagLength ) ,
) ;
// getting a free session thread from the pool
Worker worker = ( Worker ) this . theWorkerPool . borrowObject ( ) ;
// processing the new request
worker . execute ( theMsg ) ;
} catch ( InterruptedException e ) {
Thread . interrupted ( ) ;
this . stopped = true ;
@ -83,6 +79,14 @@ public final class plasmaStackCrawlThread extends Thread {
try {
this . log . logFine ( "Shutdown. Terminationg worker threads." ) ;
this . theWorkerPool . close ( ) ;
} catch ( Exception e1 ) {
this . log . logSevere ( "Unable to shutdown all remaining stackCrawl threads" , e1 ) ;
try {
this . log . logFine ( "Shutdown. Closing stackCrawl queue." ) ;
this . queue . close ( ) ;
@ -141,6 +145,7 @@ public final class plasmaStackCrawlThread extends Thread {
// stacks a crawl item. The position can also be remote
// returns null if successful, a reason string if not successful
long startTime = System . currentTimeMillis ( ) ;
String reason = null ; // failure reason
// strange errors
@ -163,7 +168,8 @@ public final class plasmaStackCrawlThread extends Thread {
nexturl = new URL ( nexturlString ) ;
} catch ( MalformedURLException e ) {
reason = "denied_(url_'" + nexturlString + "'_wrong)" ;
this . log . logSevere ( "Wrong URL in stackCrawl: " + nexturlString ) ;
this . log . logSevere ( "Wrong URL in stackCrawl: " + nexturlString +
". Stack processing time: " + ( System . currentTimeMillis ( ) - startTime ) ) ;
return reason ;
@ -172,16 +178,19 @@ public final class plasmaStackCrawlThread extends Thread {
InetAddress hostAddress = InetAddress . getByName ( nexturl . getHost ( ) ) ;
if ( hostAddress . isSiteLocalAddress ( ) ) {
reason = "denied_(private_ip_address)" ;
this . log . logFine ( "Host in URL '" + nexturlString + "' has private ip address." ) ;
this . log . logFine ( "Host in URL '" + nexturlString + "' has private ip address." +
"Stack processing time: " + ( System . currentTimeMillis ( ) - startTime ) ) ;
return reason ;
} else if ( hostAddress . isLoopbackAddress ( ) ) {
reason = "denied_(loopback_ip_address)" ;
this . log . logFine ( "Host in URL '" + nexturlString + "' has loopback ip address." ) ;
this . log . logFine ( "Host in URL '" + nexturlString + "' has loopback ip address." +
"Stack processing time: " + ( System . currentTimeMillis ( ) - startTime ) ) ;
return reason ;
} catch ( UnknownHostException e ) {
reason = "denied_(unknown_host)" ;
this . log . logFine ( "Unknown host in URL '" + nexturlString + "'." ) ;
this . log . logFine ( "Unknown host in URL '" + nexturlString + "'." +
"Stack processing time: " + ( System . currentTimeMillis ( ) - startTime ) ) ;
return reason ;
@ -189,7 +198,8 @@ public final class plasmaStackCrawlThread extends Thread {
String hostlow = nexturl . getHost ( ) . toLowerCase ( ) ;
if ( plasmaSwitchboard . urlBlacklist . isListed ( hostlow , nexturl . getPath ( ) ) ) {
reason = "denied_(url_in_blacklist)" ;
this . log . logFine ( "URL '" + nexturlString + "' is in blacklist." ) ;
this . log . logFine ( "URL '" + nexturlString + "' is in blacklist." +
"Stack processing time: " + ( System . currentTimeMillis ( ) - startTime ) ) ;
return reason ;
@ -199,7 +209,8 @@ public final class plasmaStackCrawlThread extends Thread {
/ *
urlPool . errorURL . newEntry ( nexturl , referrerHash , initiatorHash , yacyCore . seedDB . mySeed . hash ,
name , reason , new bitfield ( plasmaURL . urlFlagLength ) , false ) ; * /
this . log . logFine ( "URL '" + nexturlString + "' does not match crawling filter '" + profile . generalFilter ( ) + "'." ) ;
this . log . logFine ( "URL '" + nexturlString + "' does not match crawling filter '" + profile . generalFilter ( ) + "'." +
"Stack processing time: " + ( System . currentTimeMillis ( ) - startTime ) ) ;
return reason ;
@ -209,7 +220,8 @@ public final class plasmaStackCrawlThread extends Thread {
/ *
urlPool . errorURL . newEntry ( nexturl , referrerHash , initiatorHash , yacyCore . seedDB . mySeed . hash ,
name , reason , new bitfield ( plasmaURL . urlFlagLength ) , false ) ; * /
this . log . logFine ( "URL '" + nexturlString + "' is cgi URL." ) ;
this . log . logFine ( "URL '" + nexturlString + "' is cgi URL." +
"Stack processing time: " + ( System . currentTimeMillis ( ) - startTime ) ) ;
return reason ;
@ -219,7 +231,8 @@ public final class plasmaStackCrawlThread extends Thread {
/ *
urlPool . errorURL . newEntry ( nexturl , referrerHash , initiatorHash , yacyCore . seedDB . mySeed . hash ,
name , reason , new bitfield ( plasmaURL . urlFlagLength ) , false ) ; * /
this . log . logFine ( "URL '" + nexturlString + "' is post URL." ) ;
this . log . logFine ( "URL '" + nexturlString + "' is post URL." +
"Stack processing time: " + ( System . currentTimeMillis ( ) - startTime ) ) ;
return reason ;
@ -231,7 +244,8 @@ public final class plasmaStackCrawlThread extends Thread {
/ *
urlPool . errorURL . newEntry ( nexturl , referrerHash , initiatorHash , yacyCore . seedDB . mySeed . hash ,
name , reason , new bitfield ( plasmaURL . urlFlagLength ) , false ) ; * /
this . log . logFine ( "URL '" + nexturlString + "' is double registered in '" + dbocc + "'." ) ;
this . log . logFine ( "URL '" + nexturlString + "' is double registered in '" + dbocc + "'." +
"Stack processing time: " + ( System . currentTimeMillis ( ) - startTime ) ) ;
return reason ;
@ -241,7 +255,8 @@ public final class plasmaStackCrawlThread extends Thread {
/ *
urlPool . errorURL . newEntry ( nexturl , referrerHash , initiatorHash , yacyCore . seedDB . mySeed . hash ,
name , reason , new bitfield ( plasmaURL . urlFlagLength ) , false ) ; * /
this . log . logFine ( "Crawling of URL '" + nexturlString + "' disallowed by robots.txt." ) ;
this . log . logFine ( "Crawling of URL '" + nexturlString + "' disallowed by robots.txt." +
"Stack processing time: " + ( System . currentTimeMillis ( ) - startTime ) ) ;
return reason ;
@ -256,7 +271,7 @@ public final class plasmaStackCrawlThread extends Thread {
( yacyCore . seedDB . mySeed . isPrincipal ( ) ) ) /* qualified */ ;
if ( ( ! local ) & & ( ! global ) ) {
this . log . log Fin e( "URL '" + nexturlString + "' can neither be crawled local nor global." ) ;
this . log . log Sever e( "URL '" + nexturlString + "' can neither be crawled local nor global." ) ;
this . sb . urlPool . noticeURL . newEntry ( initiatorHash , /* initiator, needed for p2p-feedback */
@ -507,7 +522,7 @@ public final class plasmaStackCrawlThread extends Thread {
byte [ ] [ ] entryBytes = null ;
stackCrawlMessage newMessage = null ;
synchronized ( this . urlEntryHashCache ) {
urlHash = ( String ) this . urlEntryHashCache . remove La st( ) ;
urlHash = ( String ) this . urlEntryHashCache . remove Fir st( ) ;
entryBytes = this . urlEntryCache . remove ( urlHash . getBytes ( ) ) ;
@ -518,4 +533,262 @@ public final class plasmaStackCrawlThread extends Thread {
public final class WorkterFactory implements org . apache . commons . pool . PoolableObjectFactory {
final ThreadGroup workerThreadGroup ;
public WorkterFactory ( ThreadGroup theWorkerThreadGroup ) {
super ( ) ;
if ( theWorkerThreadGroup = = null )
throw new IllegalArgumentException ( "The threadgroup object must not be null." ) ;
this . workerThreadGroup = theWorkerThreadGroup ;
public Object makeObject ( ) {
Worker newWorker = new Worker ( this . workerThreadGroup ) ;
newWorker . setPriority ( Thread . MAX_PRIORITY ) ;
return newWorker ;
/ * *
* @see org . apache . commons . pool . PoolableObjectFactory # destroyObject ( java . lang . Object )
* /
public void destroyObject ( Object obj ) {
if ( obj instanceof Worker ) {
Worker theWorker = ( Worker ) obj ;
theWorker . setStopped ( true ) ;
/ * *
* @see org . apache . commons . pool . PoolableObjectFactory # validateObject ( java . lang . Object )
* /
public boolean validateObject ( Object obj ) {
if ( obj instanceof Worker )
Worker theWorker = ( Worker ) obj ;
if ( ! theWorker . isAlive ( ) | | theWorker . isInterrupted ( ) ) return false ;
if ( theWorker . isRunning ( ) ) return true ;
return false ;
return true ;
/ * *
* @param obj
* /
public void activateObject ( Object obj ) {
//log.debug(" activateObject...");
/ * *
* @param obj
* /
public void passivateObject ( Object obj ) {
//log.debug(" passivateObject..." + obj);
// if (obj instanceof Session) {
// Session theSession = (Session) obj;
// }
public final class WorkerPool extends GenericObjectPool {
public boolean isClosed = false ;
/ * *
* First constructor .
* @param objFactory
* /
public WorkerPool ( WorkterFactory objFactory ) {
super ( objFactory ) ;
this . setMaxIdle ( 10 ) ; // Maximum idle threads.
this . setMaxActive ( 50 ) ; // Maximum active threads.
this . setMinEvictableIdleTimeMillis ( 30000 ) ; //Evictor runs every 30 secs.
//this.setMaxWait(1000); // Wait 1 second till a thread is available
public WorkerPool ( plasmaStackCrawlThread . WorkterFactory objFactory ,
GenericObjectPool . Config config ) {
super ( objFactory , config ) ;
public Object borrowObject ( ) throws Exception {
return super . borrowObject ( ) ;
public void returnObject ( Object obj ) throws Exception {
super . returnObject ( obj ) ;
public synchronized void close ( ) throws Exception {
/ *
* shutdown all still running session threads . . .
* /
this . isClosed = true ;
/* waiting for all threads to finish */
int threadCount = plasmaStackCrawlThread . this . theWorkerThreadGroup . activeCount ( ) ;
Thread [ ] threadList = new Thread [ threadCount ] ;
threadCount = plasmaStackCrawlThread . this . theWorkerThreadGroup . enumerate ( threadList ) ;
try {
// trying to gracefull stop all still running sessions ...
plasmaStackCrawlThread . this . log . logInfo ( "Signaling shutdown to " + threadCount + " remaining stackCrawl threads ..." ) ;
for ( int currentThreadIdx = 0 ; currentThreadIdx < threadCount ; currentThreadIdx + + ) {
Thread currentThread = threadList [ currentThreadIdx ] ;
if ( currentThread . isAlive ( ) ) {
( ( Worker ) currentThread ) . setStopped ( true ) ;
// waiting a frew ms for the session objects to continue processing
try { Thread . sleep ( 500 ) ; } catch ( InterruptedException ex ) { }
// interrupting all still running or pooled threads ...
plasmaStackCrawlThread . this . log . logInfo ( "Sending interruption signal to " + plasmaStackCrawlThread . this . theWorkerThreadGroup . activeCount ( ) + " remaining stackCrawl threads ..." ) ;
plasmaStackCrawlThread . this . theWorkerThreadGroup . interrupt ( ) ;
// if there are some sessions that are blocking in IO, we simply close the socket
plasmaStackCrawlThread . this . log . logFine ( "Trying to abort " + plasmaStackCrawlThread . this . theWorkerThreadGroup . activeCount ( ) + " remaining stackCrawl threads ..." ) ;
for ( int currentThreadIdx = 0 ; currentThreadIdx < threadCount ; currentThreadIdx + + ) {
Thread currentThread = threadList [ currentThreadIdx ] ;
if ( currentThread . isAlive ( ) ) {
plasmaStackCrawlThread . this . log . logInfo ( "Trying to shutdown stackCrawl thread '" + currentThread . getName ( ) + "' [" + currentThreadIdx + "]." ) ;
( ( Worker ) currentThread ) . close ( ) ;
// we need to use a timeout here because of missing interruptable session threads ...
plasmaStackCrawlThread . this . log . logFine ( "Waiting for " + plasmaStackCrawlThread . this . theWorkerThreadGroup . activeCount ( ) + " remaining stackCrawl threads to finish shutdown ..." ) ;
for ( int currentThreadIdx = 0 ; currentThreadIdx < threadCount ; currentThreadIdx + + ) {
Thread currentThread = threadList [ currentThreadIdx ] ;
if ( currentThread . isAlive ( ) ) {
plasmaStackCrawlThread . this . log . logFine ( "Waiting for stackCrawl thread '" + currentThread . getName ( ) + "' [" + currentThreadIdx + "] to finish shutdown." ) ;
try { currentThread . join ( 500 ) ; } catch ( InterruptedException ex ) { }
plasmaStackCrawlThread . this . log . logInfo ( "Shutdown of remaining stackCrawl threads finish." ) ;
} catch ( Exception e ) {
plasmaStackCrawlThread . this . log . logSevere ( "Unexpected error while trying to shutdown all remaining stackCrawl threads." , e ) ;
super . close ( ) ;
public final class Worker extends Thread {
private boolean running = false ;
private boolean stopped = false ;
private boolean done = false ;
private stackCrawlMessage theMsg ;
public Worker ( ThreadGroup theThreadGroup ) {
super ( theThreadGroup , "stackCrawlThread" ) ;
public void setStopped ( boolean stopped ) {
this . stopped = stopped ;
public void close ( ) {
if ( this . isAlive ( ) ) {
try {
// trying to close all still open httpc-Sockets first
int closedSockets = httpc . closeOpenSockets ( this ) ;
if ( closedSockets > 0 ) {
plasmaStackCrawlThread . this . log . logInfo ( closedSockets + " HTTP-client sockets of thread '" + this . getName ( ) + "' closed." ) ;
} catch ( Exception e ) { }
public synchronized void execute ( stackCrawlMessage newMsg ) {
this . theMsg = newMsg ;
this . done = false ;
if ( ! this . running ) {
// this.setDaemon(true);
this . start ( ) ;
} else {
this . notifyAll ( ) ;
public void reset ( ) {
this . done = true ;
this . theMsg = null ;
public boolean isRunning ( ) {
return this . running ;
public void run ( ) {
this . running = true ;
// The thread keeps running.
while ( ! this . stopped & & ! Thread . interrupted ( ) ) {
if ( this . done ) {
// We are waiting for a task now.
synchronized ( this ) {
try {
this . wait ( ) ; //Wait until we get a request to process.
} catch ( InterruptedException e ) {
this . stopped = true ;
// log.error("", e);
} else {
//There is a task....let us execute it.
try {
execute ( ) ;
} catch ( Exception e ) {
// log.error("", e);
} finally {
reset ( ) ;
if ( ! this . stopped & & ! this . isInterrupted ( ) & & ! plasmaStackCrawlThread . this . theWorkerPool . isClosed ) {
try {
this . setName ( "stackCrawlThread_inPool" ) ;
plasmaStackCrawlThread . this . theWorkerPool . returnObject ( this ) ;
} catch ( Exception e1 ) {
// e1.printStackTrace();
this . stopped = true ;
private void execute ( ) throws InterruptedException {
try {
String rejectReason = stackCrawlDequeue ( this . theMsg ) ;
if ( rejectReason ! = null ) {
plasmaStackCrawlThread . this . sb . urlPool . errorURL . newEntry (
new URL ( this . theMsg . url ( ) ) ,
this . theMsg . referrerHash ( ) ,
this . theMsg . initiatorHash ( ) ,
yacyCore . seedDB . mySeed . hash ,
this . theMsg . name ,
rejectReason ,
new bitfield ( plasmaURL . urlFlagLength ) ,
) ;
} catch ( Exception e ) {
e . printStackTrace ( ) ;
} finally {
this . done = true ;