@@ -31,8 +31,10 @@ import java.io.IOException;
import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import net.yacy.cora.document.encoding.ASCII;
@@ -62,10 +64,12 @@ import net.yacy.search.index.ErrorCache;
public class CrawlQueues {
    private final static Request POISON_REQUEST = new Request();
    private final static ConcurrentLog log = new ConcurrentLog("CRAWLER");
    private Switchboard sb;
    private Map<Integer, Loader> workers; // mapping from url hash to Worker thread object
    private final Switchboard sb;
    private final Loader[] worker;
    private final ArrayBlockingQueue<Request> workerQueue;
    private final ArrayList<String> remoteCrawlProviderHashes;
    public NoticedURL noticeURL;
@@ -74,7 +78,9 @@ public class CrawlQueues {
    public CrawlQueues(final Switchboard sb, final File queuePath) {
        this.sb = sb;
        this.workers = new ConcurrentHashMap<Integer, Loader>();
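        // a fixed pool of loader threads takes its requests from a bounded hand-off queue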
        final int maxWorkers = (int) sb.getConfigLong(SwitchboardConstants.CRAWLER_THREADS_ACTIVE_MAX, 10);
        this.worker = new Loader[maxWorkers];
        this.workerQueue = new ArrayBlockingQueue<Request>(200);
        this.remoteCrawlProviderHashes = new ArrayList<String>();
        // start crawling management
@@ -82,12 +88,12 @@ public class CrawlQueues {
        this.noticeURL = new NoticedURL(queuePath, sb.useTailCache, sb.exceed134217727);
        this.errorURL = new ErrorCache(sb.index.fulltext());
        this.delegatedURL = new ConcurrentHashMap<String, DigestURL>();
    }
    public void relocate(final File newQueuePath) {
        close();
        this.workers = new ConcurrentHashMap<Integer, Loader>();
        this.remoteCrawlProviderHashes.clear();
        this.noticeURL = new NoticedURL(newQueuePath, this.sb.useTailCache, this.sb.exceed134217727);
@@ -97,14 +103,17 @@ public class CrawlQueues {
    public synchronized void close() {
        // wait for all workers to finish
        for (final Loader w : this.workers.values()) {
            w.interrupt();
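        // enqueue one poison request per pool slot so every loader thread leaves its take() loop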
        for (int i = 0; i < this.worker.length; i++) {
            try { this.workerQueue.put(POISON_REQUEST); } catch (InterruptedException e) {}
        }
        for (final Loader w : this.workers.values()) {
            try {
                w.join();
            } catch (final InterruptedException e) {
                ConcurrentLog.logException(e);
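        // give each loader up to one second to finish its current request, then interrupt it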
        for (final Loader w : this.worker) {
            if (w != null && w.isAlive()) {
                try {
                    w.join(1000);
                    if (w.isAlive()) w.interrupt();
                } catch (final InterruptedException e) {
                    ConcurrentLog.logException(e);
                }
            }
        }
        this.noticeURL.close();
@@ -113,9 +122,9 @@ public class CrawlQueues {
    public void clear() {
        // wait for all workers to finish
        for (final Loader w : this.workers.values()) w.interrupt();
        for (final Loader w : this.workers.values()) try { w.join(10); } catch (final InterruptedException e1) {}
        this.workers.clear();
        this.workerQueue.clear();
        for (final Loader w : this.worker) w.interrupt();
        for (final Loader w : this.worker) try { w.join(10); } catch (final InterruptedException e1) {}
        this.remoteCrawlProviderHashes.clear();
        this.noticeURL.clear();
        this.delegatedURL.clear();
@@ -135,9 +144,9 @@ public class CrawlQueues {
        }
        //if (this.noticeURL.existsInStack(hash)) {
        //    return HarvestProcess.CRAWLER;
        //} // this is disabled because it prevents proper t crawling of smb shares. The cause is unknown
        for (final Loader worker : this.workers.values()) {
            if (Base64Order.enhancedCoder.equal(worker.request.url().hash(), hash)) {
        //} // this is disabled because it prevents proper crawling of smb shares. The cause is unknown
        for (final Request request : activeWorkerEntries().values()) {
            if (Base64Order.enhancedCoder.equal(request.url().hash(), hash)) {
                return HarvestProcess.WORKER;
            }
        }
@@ -152,14 +161,9 @@ public class CrawlQueues {
    public int hostcount(final String host) {
        if (host == null || host.length() == 0) return 0;
        int c = 0;
        final int timeout = (int) this.sb.getConfigLong("crawler.clientTimeout", 10000);
        for (final Loader worker : this.workers.values()) {
            if (worker.isAlive()) {
                if (worker.age() > timeout) {
                    try { worker.interrupt(); } catch (Throwable e) {}
                } else if (host.equals(worker.request.url().getHost())) {
                    c++;
                }
        for (final DigestURL url : activeWorkerEntries().keySet()) {
            if (host.equals(url.getHost())) {
                c++;
            }
        }
        return c;
@@ -180,9 +184,9 @@ public class CrawlQueues {
        if (u != null) {
            return u;
        }
        for (final Loader w : this.workers.values()) {
            if (Base64Order.enhancedCoder.equal(w.request.url().hash(), urlhash)) {
                return w.request.url();
        for (final DigestURL url : activeWorkerEntries().keySet()) {
            if (Base64Order.enhancedCoder.equal(url.hash(), urlhash)) {
                return url;
            }
        }
        final Request ne = this.noticeURL.get(urlhash);
@@ -192,16 +196,6 @@ public class CrawlQueues {
        return null;
    }
    public void cleanup() {
        // wait for all workers to finish
        final int timeout = (int) this.sb.getConfigLong("crawler.clientTimeout", 10000);
        for (final Loader w : this.workers.values()) {
            if (w.isAlive() && w.age() > timeout) {
                try { w.interrupt(); } catch (Throwable e) {}
            }
        }
    }
    public void freemem() {
        if ((this.errorURL.stackSize() > 1)) {
            log.warn("freemem: Cleaning Error-URLs report stack, "
@@ -211,17 +205,16 @@ public class CrawlQueues {
        }
    }
    public Request[] activeWorkerEntries() {
        synchronized (this.workers) {
            final Request[] e = new Request[this.workers.size()];
            int i = 0;
            for (final Loader w : this.workers.values()) {
                if (i >= e.length) {
                    break;
    public Map<DigestURL, Request> activeWorkerEntries() {
        synchronized (this.worker) {
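            // build a snapshot of the requests currently being loaded, keyed by their URL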
            Map<DigestURL, Request> map = new HashMap<DigestURL, Request>();
            for (final Loader w : this.worker) {
                if (w != null) {
                    Request r = w.loading();
                    if (r != null) map.put(r.url(), r);
                }
                e[i++] = w.request;
            }
            return e;
            return map;
        }
    }
@@ -343,18 +336,15 @@ public class CrawlQueues {
            if (urlEntry == null || urlEntry.url() == null) {
                CrawlQueues.log.info(stats + ": urlEntry = null");
            } else {
                if (!this.workers.containsKey(Integer.valueOf(urlEntry.hashCode()))) {
                    Loader loader = new Loader(urlEntry);
                    this.workers.put(loader.code, loader);
                if (!activeWorkerEntries().containsKey(urlEntry.url())) {
                    try {
                        loader.start();
                    } catch (final OutOfMemoryError e) {
                        ConcurrentLog.warn("CrawlQueues", "crawlWorker sequential fail-over: " + e.getMessage());
                        loader.run();
                        ensureLoaderRunning();
                        this.workerQueue.put(urlEntry);
                    } catch (InterruptedException e) {
                        ConcurrentLog.logException(e);
                    }
                }
            }
        } else {
            CrawlQueues.log.severe("Unsupported protocol in URL '" + url.toString());
        }
@@ -395,15 +385,9 @@ public class CrawlQueues {
            return "stack is empty";
        }
        // check the worker threads
        final int maxWorkers = (int) this.sb.getConfigLong(SwitchboardConstants.CRAWLER_THREADS_ACTIVE_MAX, 10);
        if (this.workers.size() >= maxWorkers) {
            // too many worker threads, try a cleanup
            cleanup();
        }
        // check again
        if (this.workers.size() >= maxWorkers) {
            return "too many workers active: " + this.workers.size();
        if (this.workerQueue.remainingCapacity() == 0) {
            return "too many workers active: " + this.workerQueue.size();
        }
        final String cautionCause = this.sb.onlineCaution();
@@ -426,14 +410,10 @@ public class CrawlQueues {
            return false;
        }
        if (this.workers.size() >= this.sb.getConfigLong(SwitchboardConstants.CRAWLER_THREADS_ACTIVE_MAX, 20)) {
            // try a cleanup
            cleanup();
        }
        // check again
        if (this.workers.size() >= this.sb.getConfigLong(SwitchboardConstants.CRAWLER_THREADS_ACTIVE_MAX, 20)) {
        if (this.workerQueue.remainingCapacity() == 0) {
            if (CrawlQueues.log.isFine()) {
                CrawlQueues.log.fine("remoteCrawlLoaderJob: too many processes in loader queue, dismissed (" + "cacheLoader=" + this.workers.size() + "), httpClients = " + ConnectionInfo.getCount());
                CrawlQueues.log.fine("remoteCrawlLoaderJob: too many processes in loader queue, dismissed (" + "workerQueue=" + this.workerQueue.size() + "), httpClients = " + ConnectionInfo.getCount());
            }
            return false;
        }
@@ -609,86 +589,93 @@ public class CrawlQueues {
        }
    }
    public int workerSize() {
        return this.workers.size();
    private void ensureLoaderRunning() {
        // check if there is at least one loader available
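        // start a new loader in the first empty or dead slot; stop early if an idle loader already exists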
        for (int i = 0; i < this.worker.length; i++) {
            if (this.worker[i] == null || !this.worker[i].isAlive()) {
                this.worker[i] = new Loader();
                this.worker[i].start();
                return;
            }
            if (this.worker[i].loading() == null) return;
        }
    }
    private final class Loader extends Thread {
        private Request request;
        private final Integer code;
        private final long start;
        private final CrawlProfile profile;
        private Loader(final Request entry) {
            this.start = System.currentTimeMillis();
            this.request = entry;
            this.request.setStatus("worker-initialized", WorkflowJob.STATUS_INITIATED);
            this.code = Integer.valueOf(entry.hashCode());
            this.setPriority(Thread.MIN_PRIORITY); // http requests from the crawler should not cause that other functions work worse
            this.profile = CrawlQueues.this.sb.crawler.get(UTF8.getBytes(this.request.profileHandle()));
            this.setName("CrawlQueues.Loader(" + entry.url() + ")");
        private Request request = null;
        private Loader() {
        }
        private long age() {
            return System.currentTimeMillis() - this.start;
        public Request loading() {
            return request;
        }
        @Override
        public void run() {
            this.setPriority(Thread.MIN_PRIORITY); // http requests from the crawler should not cause that other functions work worse
            try {
                // checking robots.txt for http(s) resources
                this.request.setStatus("worker-checkingrobots", WorkflowJob.STATUS_STARTED);
                RobotsTxtEntry robotsEntry;
                if ((this.request.url().getProtocol().equals("http") || this.request.url().getProtocol().equals("https")) &&
                    (robotsEntry = CrawlQueues.this.sb.robots.getEntry(this.request.url(), this.profile.getAgent())) != null &&
                    robotsEntry.isDisallowed(this.request.url())) {
                    //if (log.isFine()) log.logFine("Crawling of URL '" + request.url().toString() + "' disallowed by robots.txt.");
                    CrawlQueues.this.errorURL.push(this.request.url(), profile, FailCategory.FINAL_ROBOTS_RULE, "denied by robots.txt", -1);
                    this.request.setStatus("worker-disallowed", WorkflowJob.STATUS_FINISHED);
                } else {
                    // starting a load from the internet
                    this.request.setStatus("worker-loading", WorkflowJob.STATUS_RUNNING);
                    String result = null;
                    // load a resource and push queue entry to switchboard queue
                    // returns null if everything went fine, a fail reason string if a problem occurred
                while ((request = CrawlQueues.this.workerQueue.take()) != POISON_REQUEST) {
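                    // block until the next request arrives; a poison request terminates this loader thread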
                    request.setStatus("worker-initialized", WorkflowJob.STATUS_INITIATED);
                    this.setName("CrawlQueues.Loader(" + request.url() + ")");
                    CrawlProfile profile = CrawlQueues.this.sb.crawler.get(UTF8.getBytes(request.profileHandle()));
                    try {
                        this.request.setStatus("loading", WorkflowJob.STATUS_RUNNING);
                        final Response response = CrawlQueues.this.sb.loader.load(this.request, profile == null ? CacheStrategy.IFEXIST : profile.cacheStrategy(), BlacklistType.CRAWLER, this.profile.getAgent());
                        if (response == null) {
                            this.request.setStatus("error", WorkflowJob.STATUS_FINISHED);
                            if (CrawlQueues.log.isFine()) {
                                CrawlQueues.log.fine("problem loading " + this.request.url().toString() + ": no content (possibly caused by cache policy)");
                            }
                            result = "no content (possibly caused by cache policy)";
                        // checking robots.txt for http(s) resources
                        request.setStatus("worker-checkingrobots", WorkflowJob.STATUS_STARTED);
                        RobotsTxtEntry robotsEntry;
                        if ((request.url().getProtocol().equals("http") || request.url().getProtocol().equals("https")) &&
                            (robotsEntry = CrawlQueues.this.sb.robots.getEntry(request.url(), profile.getAgent())) != null &&
                            robotsEntry.isDisallowed(request.url())) {
                            //if (log.isFine()) log.logFine("Crawling of URL '" + request.url().toString() + "' disallowed by robots.txt.");
                            CrawlQueues.this.errorURL.push(request.url(), profile, FailCategory.FINAL_ROBOTS_RULE, "denied by robots.txt", -1);
                            request.setStatus("worker-disallowed", WorkflowJob.STATUS_FINISHED);
                        } else {
                            this.request.setStatus("loaded", WorkflowJob.STATUS_RUNNING);
                            final String storedFailMessage = CrawlQueues.this.sb.toIndexer(response);
                            this.request.setStatus("enqueued-" + ((storedFailMessage == null) ? "ok" : "fail"), WorkflowJob.STATUS_FINISHED);
                            result = (storedFailMessage == null) ? null : "not enqueued to indexer: " + storedFailMessage;
                        }
                    } catch (final IOException e) {
                        this.request.setStatus("error", WorkflowJob.STATUS_FINISHED);
                        if (CrawlQueues.log.isFine()) {
                            CrawlQueues.log.fine("problem loading " + this.request.url().toString() + ": " + e.getMessage());
                            // starting a load from the internet
                            request.setStatus("worker-loading", WorkflowJob.STATUS_RUNNING);
                            String result = null;
                            // load a resource and push queue entry to switchboard queue
                            // returns null if everything went fine, a fail reason string if a problem occurred
                            try {
                                request.setStatus("loading", WorkflowJob.STATUS_RUNNING);
                                final Response response = CrawlQueues.this.sb.loader.load(request, profile == null ? CacheStrategy.IFEXIST : profile.cacheStrategy(), BlacklistType.CRAWLER, profile.getAgent());
                                if (response == null) {
                                    request.setStatus("error", WorkflowJob.STATUS_FINISHED);
                                    if (CrawlQueues.log.isFine()) {
                                        CrawlQueues.log.fine("problem loading " + request.url().toString() + ": no content (possibly caused by cache policy)");
                                    }
                                    result = "no content (possibly caused by cache policy)";
                                } else {
                                    request.setStatus("loaded", WorkflowJob.STATUS_RUNNING);
                                    final String storedFailMessage = CrawlQueues.this.sb.toIndexer(response);
                                    request.setStatus("enqueued-" + ((storedFailMessage == null) ? "ok" : "fail"), WorkflowJob.STATUS_FINISHED);
                                    result = (storedFailMessage == null) ? null : "not enqueued to indexer: " + storedFailMessage;
                                }
                            } catch (final IOException e) {
                                request.setStatus("error", WorkflowJob.STATUS_FINISHED);
                                if (CrawlQueues.log.isFine()) {
                                    CrawlQueues.log.fine("problem loading " + request.url().toString() + ": " + e.getMessage());
                                }
                                result = "load error - " + e.getMessage();
                            }
                            if (result != null) {
                                CrawlQueues.this.errorURL.push(request.url(), profile, FailCategory.TEMPORARY_NETWORK_FAILURE, "cannot load: " + result, -1);
                                request.setStatus("worker-error", WorkflowJob.STATUS_FINISHED);
                            } else {
                                request.setStatus("worker-processed", WorkflowJob.STATUS_FINISHED);
                            }
                        }
                        result = "load error - " + e.getMessage();
                    }
                    if (result != null) {
                        CrawlQueues.this.errorURL.push(this.request.url(), profile, FailCategory.TEMPORARY_NETWORK_FAILURE, "cannot load: " + result, -1);
                        this.request.setStatus("worker-error", WorkflowJob.STATUS_FINISHED);
                    } else {
                        this.request.setStatus("worker-processed", WorkflowJob.STATUS_FINISHED);
                    } catch (final Exception e) {
                        CrawlQueues.this.errorURL.push(request.url(), profile, FailCategory.TEMPORARY_NETWORK_FAILURE, e.getMessage() + " - in worker", -1);
                        ConcurrentLog.logException(e);
                        request.setStatus("worker-exception", WorkflowJob.STATUS_FINISHED);
                    } finally {
                        request = null;
                    }
                }
            } catch (final Exception e) {
                CrawlQueues.this.errorURL.push(this.request.url(), profile, FailCategory.TEMPORARY_NETWORK_FAILURE, e.getMessage() + " - in worker", -1);
                ConcurrentLog.logException(e);
                this.request.setStatus("worker-exception", WorkflowJob.STATUS_FINISHED);
            } finally {
                CrawlQueues.this.workers.remove(this.code);
            } catch (InterruptedException e2) {
                ConcurrentLog.logException(e2);
            }
        }
    }