@ -31,9 +31,11 @@ import java.io.IOException;
import java.net.MalformedURLException ;
import java.text.ParseException ;
import java.util.ArrayList ;
import java.util.Collections ;
import java.util.Date ;
import java.util.HashMap ;
import java.util.Iterator ;
import java.util.Map ;
import de.anomic.data.robotsParser ;
import de.anomic.plasma.plasmaCrawlEntry ;
@ -55,7 +57,7 @@ public class plasmaCrawlQueues {
private plasmaSwitchboard sb ;
private serverLog log ;
private HashMap< String , crawlWorker > workers ; // mapping from url hash to Worker thread object
private Map< Integer , crawlWorker > workers ; // mapping from url hash to Worker thread object
private plasmaProtocolLoader loader ;
private ArrayList < String > remoteCrawlProviderHashes ;
@ -65,7 +67,7 @@ public class plasmaCrawlQueues {
public plasmaCrawlQueues ( plasmaSwitchboard sb , File plasmaPath ) {
this . sb = sb ;
this . log = new serverLog ( "CRAWLER" ) ;
this . workers = new HashMap < String , crawlWorker > ( ) ;
this . workers = Collections . synchronizedMap ( new HashMap < Integer , crawlWorker > ( ) ) ;
this . loader = new plasmaProtocolLoader ( sb , log ) ;
this . remoteCrawlProviderHashes = new ArrayList < String > ( ) ;
@ -85,7 +87,8 @@ public class plasmaCrawlQueues {
if ( noticeURL . existsInStack ( hash ) ) return "crawler" ;
if ( delegatedURL . exists ( hash ) ) return "delegated" ;
if ( errorURL . exists ( hash ) ) return "errors" ;
if ( workers . containsKey ( hash ) ) return "workers" ;
Iterator < crawlWorker > i = workers . values ( ) . iterator ( ) ;
while ( i . hasNext ( ) ) if ( i . next ( ) . entry . url ( ) . hash ( ) . equals ( hash ) ) return "worker" ;
return null ;
}
@ -97,21 +100,25 @@ public class plasmaCrawlQueues {
public yacyURL getURL ( String urlhash ) {
if ( urlhash . equals ( yacyURL . dummyHash ) ) return null ;
crawlWorker w = workers . get ( urlhash ) ;
if ( w ! = null ) return w . entry . url ( ) ;
plasmaCrawlEntry ne = noticeURL . get ( urlhash ) ;
if ( ne ! = null ) return ne . url ( ) ;
plasmaCrawlZURL . Entry ee = delegatedURL . getEntry ( urlhash ) ;
if ( ee ! = null ) return ee . url ( ) ;
ee = errorURL . getEntry ( urlhash ) ;
if ( ee ! = null ) return ee . url ( ) ;
Iterator < crawlWorker > i = workers . values ( ) . iterator ( ) ;
crawlWorker w ;
while ( i . hasNext ( ) ) {
w = i . next ( ) ;
if ( w . entry . url ( ) . hash ( ) . equals ( urlhash ) ) return w . entry . url ( ) ;
}
return null ;
}
public void close ( ) {
// wait for all workers to finish
Iterator < crawlWorker > i = workers . values ( ) . iterator ( ) ;
while ( i . hasNext ( ) ) ( ( Thread ) i . next ( ) ) . interrupt ( ) ;
while ( i . hasNext ( ) ) i . next ( ) . interrupt ( ) ;
// TODO: wait some more time until all threads are finished
noticeURL . close ( ) ;
errorURL . close ( ) ;
@ -419,13 +426,7 @@ public class plasmaCrawlQueues {
log . logInfo ( stats + ": urlEntry = null" ) ;
return ;
}
synchronized ( this . workers ) {
crawlWorker w = new crawlWorker ( entry ) ;
synchronized ( workers ) {
workers . put ( entry . url ( ) . hash ( ) , w ) ;
}
}
new crawlWorker ( entry ) ;
log . logInfo ( stats + ": enqueued for load " + entry . url ( ) + " [" + entry . url ( ) . hash ( ) + "]" ) ;
return ;
@ -459,11 +460,16 @@ public class plasmaCrawlQueues {
protected class crawlWorker extends Thread {
public plasmaCrawlEntry entry ;
private Integer code ;
public crawlWorker ( plasmaCrawlEntry entry ) {
this . entry = entry ;
this . entry . setStatus ( "worker-initialized" ) ;
this . start ( ) ;
this . code = new Integer ( entry . hashCode ( ) ) ;
if ( ! workers . containsKey ( code ) ) {
workers . put ( code , this ) ;
this . start ( ) ;
}
}
public void run ( ) {
@ -493,9 +499,7 @@ public class plasmaCrawlQueues {
errorURL . push ( eentry ) ;
e . printStackTrace ( ) ;
} finally {
synchronized ( workers ) {
workers . remove ( entry . url ( ) . hash ( ) ) ;
}
workers . remove ( code ) ;
this . entry . setStatus ( "worker-finalized" ) ;
}
}