@@ -44,11 +44,14 @@ import net.yacy.kelondro.order.Base64Order;
 import net.yacy.kelondro.util.FileUtils;
 import net.yacy.kelondro.workflow.WorkflowJob;
 import de.anomic.crawler.NoticedURL.StackType;
 import de.anomic.crawler.retrieval.HTTPLoader;
 import de.anomic.crawler.retrieval.Request;
+import de.anomic.crawler.retrieval.Response;
+import de.anomic.search.Segments;
 import de.anomic.search.Switchboard;
 import de.anomic.search.SwitchboardConstants;
+import de.anomic.search.Switchboard.indexingQueueEntry;
 import de.anomic.yacy.yacyClient;
 import de.anomic.yacy.yacySeed;
 import de.anomic.yacy.dht.PeerSelection;
@@ -60,7 +63,7 @@ public class CrawlQueues {

     protected Switchboard sb;
     protected Log log;
-    protected Map<Integer, crawlWorker> workers; // mapping from url hash to Worker thread object
+    protected Map<Integer, Loader> workers; // mapping from url hash to Worker thread object
     private final ArrayList<String> remoteCrawlProviderHashes;

     public NoticedURL noticeURL;
@@ -69,7 +72,7 @@ public class CrawlQueues {
     public CrawlQueues(final Switchboard sb, final File queuePath) {
         this.sb = sb;
         this.log = new Log("CRAWLER");
-        this.workers = new ConcurrentHashMap<Integer, crawlWorker>();
+        this.workers = new ConcurrentHashMap<Integer, Loader>();
         this.remoteCrawlProviderHashes = new ArrayList<String>();

         // start crawling management
@@ -83,7 +86,7 @@ public class CrawlQueues {
     public void relocate(final File newQueuePath) {
         this.close();

-        this.workers = new ConcurrentHashMap<Integer, crawlWorker>();
+        this.workers = new ConcurrentHashMap<Integer, Loader>();
         this.remoteCrawlProviderHashes.clear();

         noticeURL = new NoticedURL(newQueuePath, sb.useTailCache, sb.exceed134217727);
@@ -94,10 +97,10 @@ public class CrawlQueues {
     public void close() {
         // wait for all workers to finish
-        for (final crawlWorker w : workers.values()) {
+        for (final Loader w : workers.values()) {
             w.interrupt();
         }
-        for (final crawlWorker w : workers.values()) {
+        for (final Loader w : workers.values()) {
             try {
                 w.join();
             } catch (InterruptedException e) {
@@ -111,7 +114,7 @@ public class CrawlQueues {
     public void clear() {
         // wait for all workers to finish
-        for (final crawlWorker w : workers.values()) {
+        for (final Loader w : workers.values()) {
             w.interrupt();
         }
         // TODO: wait some more time until all threads are finished
@@ -139,7 +142,7 @@ public class CrawlQueues {
         if (delegatedURL.exists(hash)) return "delegated";
         if (errorURL.exists(hash)) return "errors";
         if (noticeURL.existsInStack(hash)) return "crawler";
-        for (final crawlWorker worker : workers.values()) {
+        for (final Loader worker : workers.values()) {
             if (Base64Order.enhancedCoder.equal(worker.request.url().hash(), hash)) return "worker";
         }
         return null;
@@ -158,7 +161,7 @@ public class CrawlQueues {
         if (ee != null) return ee.url();
         ee = errorURL.get(urlhash);
         if (ee != null) return ee.url();
-        for (final crawlWorker w : workers.values()) {
+        for (final Loader w : workers.values()) {
             if (Base64Order.enhancedCoder.equal(w.request.url().hash(), urlhash)) return w.request.url();
         }
         final Request ne = noticeURL.get(urlhash);
@@ -169,7 +172,7 @@ public class CrawlQueues {
     public void cleanup() {
         // wait for all workers to finish
         int timeout = (int) sb.getConfigLong("crawler.clientTimeout", 10000);
-        for (final crawlWorker w : workers.values()) {
+        for (final Loader w : workers.values()) {
             if (w.age() > timeout) w.interrupt();
         }
     }
@@ -178,7 +181,7 @@ public class CrawlQueues {
         synchronized (workers) {
             final Request[] e = new Request[workers.size()];
             int i = 0;
-            for (final crawlWorker w : workers.values()) {
+            for (final Loader w : workers.values()) {
                 if (i >= e.length) break;
                 e[i++] = w.request;
             }
@@ -187,7 +190,7 @@ public class CrawlQueues {
     }

     public int coreCrawlJobSize() {
-        return noticeURL.stackSize(NoticedURL.STACK_TYPE_CORE);
+        return noticeURL.stackSize(NoticedURL.StackType.CORE);
     }

     public boolean coreCrawlJob() {
@@ -200,14 +203,14 @@ public class CrawlQueues {
             // move some tasks to the core crawl job so we have something to do
             final int toshift = Math.min(10, limitCrawlJobSize()); // this cannot be a big number because the balancer makes a forced waiting if it cannot balance
             for (int i = 0; i < toshift; i++) {
-                noticeURL.shift(NoticedURL.STACK_TYPE_LIMIT, NoticedURL.STACK_TYPE_CORE, sb.crawler.profilesActiveCrawls);
+                noticeURL.shift(NoticedURL.StackType.LIMIT, NoticedURL.StackType.CORE, sb.crawler.profilesActiveCrawls);
             }
             log.logInfo("shifted " + toshift + " jobs from global crawl to local crawl (coreCrawlJobSize()=" + coreCrawlJobSize() +
                     ", limitCrawlJobSize()=" + limitCrawlJobSize() + ", cluster.mode=" + sb.getConfig(SwitchboardConstants.CLUSTER_MODE, "") +
                     ", robinsonMode=" + ((sb.isRobinsonMode()) ? "on" : "off"));
         }

-        String queueCheck = crawlIsPossible(NoticedURL.STACK_TYPE_CORE);
+        String queueCheck = loadIsPossible(NoticedURL.StackType.CORE);
         if (queueCheck != null) {
             if (log.isFine()) log.logFine("omitting de-queue/local: " + queueCheck);
             return false;
@@ -219,11 +222,39 @@ public class CrawlQueues {
         }

         // do a local crawl
-        Request urlEntry = null;
-        while (urlEntry == null && noticeURL.stackSize(NoticedURL.STACK_TYPE_CORE) > 0) {
-            final String stats = "LOCALCRAWL[" + noticeURL.stackSize(NoticedURL.STACK_TYPE_CORE) + ", " + noticeURL.stackSize(NoticedURL.STACK_TYPE_LIMIT) + ", " + noticeURL.stackSize(NoticedURL.STACK_TYPE_OVERHANG) + ", " + noticeURL.stackSize(NoticedURL.STACK_TYPE_REMOTE) + "]";
+        Request urlEntry;
+        while (noticeURL.stackSize(NoticedURL.StackType.CORE) > 0 || noticeURL.stackSize(NoticedURL.StackType.NOLOAD) > 0) {
+            final String stats = "LOCALCRAWL[" +
+                    noticeURL.stackSize(NoticedURL.StackType.NOLOAD) + ", " +
+                    noticeURL.stackSize(NoticedURL.StackType.CORE) + ", " +
+                    noticeURL.stackSize(NoticedURL.StackType.LIMIT) + ", " +
+                    noticeURL.stackSize(NoticedURL.StackType.OVERHANG) + ", " +
+                    noticeURL.stackSize(NoticedURL.StackType.REMOTE) + "]";
             try {
-                urlEntry = noticeURL.pop(NoticedURL.STACK_TYPE_CORE, true, sb.crawler.profilesActiveCrawls);
+                if (noticeURL.stackSize(NoticedURL.StackType.NOLOAD) > 0) {
+                    // get one entry that will not be loaded, just indexed
+                    urlEntry = noticeURL.pop(NoticedURL.StackType.NOLOAD, true, sb.crawler.profilesActiveCrawls);
+                    if (urlEntry == null) continue;
+                    final String profileHandle = urlEntry.profileHandle();
+                    if (profileHandle == null) {
+                        log.logSevere(stats + ": NULL PROFILE HANDLE '" + urlEntry.profileHandle() + "' for URL " + urlEntry.url());
+                        return true;
+                    }
+                    Map<String, String> map = sb.crawler.profilesActiveCrawls.get(profileHandle.getBytes());
+                    if (map == null) {
+                        log.logSevere(stats + ": NO ACTIVE PROFILE FOUND for handle '" + profileHandle + "' of URL " + urlEntry.url());
+                        return true;
+                    }
+                    try {
+                        sb.indexingDocumentProcessor.enQueue(new indexingQueueEntry(Segments.Process.LOCALCRAWLING, new Response(urlEntry, new CrawlProfile(map)), null, null));
+                        Log.logInfo("CrawlQueues", "placed NOLOAD URL on indexing queue: " + urlEntry.url().toNormalform(true, false));
+                    } catch (InterruptedException e) {
+                        Log.logException(e);
+                    }
+                    return true;
+                }
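+
+                // otherwise take a regular entry from the CORE stack and hand it to a Loader thread via load()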
+                urlEntry = noticeURL.pop(NoticedURL.StackType.CORE, true, sb.crawler.profilesActiveCrawls);
                 if (urlEntry == null) continue;
                 final String profileHandle = urlEntry.profileHandle();
                 // System.out.println("DEBUG plasmaSwitchboard.processCrawling:
@@ -232,11 +263,11 @@ public class CrawlQueues {
                     log.logSevere(stats + ": NULL PROFILE HANDLE '" + urlEntry.profileHandle() + "' for URL " + urlEntry.url());
                     return true;
                 }
-                generateCrawl(urlEntry, stats, profileHandle);
+                load(urlEntry, stats, profileHandle);
                 return true;
             } catch (final IOException e) {
                 log.logSevere(stats + ": CANNOT FETCH ENTRY: " + e.getMessage(), e);
-                if (e.getMessage().indexOf("hash is null") > 0) noticeURL.clear(NoticedURL.STACK_TYPE_CORE);
+                if (e.getMessage().indexOf("hash is null") > 0) noticeURL.clear(NoticedURL.StackType.CORE);
             }
         }
         return true;
@@ -250,7 +281,7 @@ public class CrawlQueues {
      * @param stats String for log prefixing
      * @return
      */
-    private void generateCrawl(Request urlEntry, final String stats, final String profileHandle) {
+    private void load(Request urlEntry, final String stats, final String profileHandle) {
         final Map<String, String> mp = sb.crawler.profilesActiveCrawls.get(profileHandle.getBytes());
         if (mp != null) {
@@ -270,10 +301,10 @@ public class CrawlQueues {
                     + ", permission=" + ((sb.peers == null) ? "undefined" : (((sb.peers.mySeed().isSenior()) || (sb.peers.mySeed().isPrincipal())) ? "true" : "false")));

             // work off one Crawl stack entry
-            if ((urlEntry == null) || (urlEntry.url() == null)) {
+            if (urlEntry == null || urlEntry.url() == null) {
                 log.logInfo(stats + ": urlEntry = null");
             } else {
-                new crawlWorker(urlEntry);
+                new Loader(urlEntry);
             }

         } else {
@@ -309,7 +340,7 @@ public class CrawlQueues {
      * @param stackType
      * @return
      */
-    private String crawlIsPossible(int stackType) {
+    private String loadIsPossible(StackType stackType) {
         //System.out.println("stacksize = " + noticeURL.stackSize(stackType));
         if (noticeURL.stackSize(stackType) == 0) {
             //log.logDebug("GlobalCrawl: queue is empty");
@@ -443,7 +474,8 @@ public class CrawlQueues {
                             sb.crawler.defaultRemoteProfile.handle(),
                             0,
                             0,
-                            0
+                            0,
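+                            // newly added Request argument: the size of the document as reported by this item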
+                            item.getSize()
                     ));
                 } else {
                     log.logWarning("crawlOrder: Rejected URL '" + urlToString(url) + "': " + urlRejectReason);
@@ -461,11 +493,11 @@ public class CrawlQueues {
     }

     public int limitCrawlJobSize() {
-        return noticeURL.stackSize(NoticedURL.STACK_TYPE_LIMIT);
+        return noticeURL.stackSize(NoticedURL.StackType.LIMIT);
     }

     public int remoteTriggeredCrawlJobSize() {
-        return noticeURL.stackSize(NoticedURL.STACK_TYPE_REMOTE);
+        return noticeURL.stackSize(NoticedURL.StackType.REMOTE);
     }

     public boolean remoteTriggeredCrawlJob() {
@@ -473,7 +505,7 @@ public class CrawlQueues {
         // do nothing if either there are private processes to be done
         // or there is no global crawl on the stack
-        String queueCheck = crawlIsPossible(NoticedURL.STACK_TYPE_REMOTE);
+        String queueCheck = loadIsPossible(NoticedURL.StackType.REMOTE);
         if (queueCheck != null) {
             if (log.isFinest()) log.logFinest("omitting de-queue/remote: " + queueCheck);
             return false;
@@ -485,19 +517,19 @@ public class CrawlQueues {
         }

         // we don't want to crawl a global URL globally, since WE are the global part. (from this point of view)
-        final String stats = "REMOTETRIGGEREDCRAWL[" + noticeURL.stackSize(NoticedURL.STACK_TYPE_CORE) + ", " + noticeURL.stackSize(NoticedURL.STACK_TYPE_LIMIT) + ", " + noticeURL.stackSize(NoticedURL.STACK_TYPE_OVERHANG) + ", "
-                + noticeURL.stackSize(NoticedURL.STACK_TYPE_REMOTE) + "]";
+        final String stats = "REMOTETRIGGEREDCRAWL[" + noticeURL.stackSize(NoticedURL.StackType.CORE) + ", " + noticeURL.stackSize(NoticedURL.StackType.LIMIT) + ", " + noticeURL.stackSize(NoticedURL.StackType.OVERHANG) + ", "
+                + noticeURL.stackSize(NoticedURL.StackType.REMOTE) + "]";
         try {
-            final Request urlEntry = noticeURL.pop(NoticedURL.STACK_TYPE_REMOTE, true, sb.crawler.profilesActiveCrawls);
+            final Request urlEntry = noticeURL.pop(NoticedURL.StackType.REMOTE, true, sb.crawler.profilesActiveCrawls);
             final String profileHandle = urlEntry.profileHandle();
             // System.out.println("DEBUG plasmaSwitchboard.processCrawling:
             // profileHandle = " + profileHandle + ", urlEntry.url = " +
             // urlEntry.url());
-            generateCrawl(urlEntry, stats, profileHandle);
+            load(urlEntry, stats, profileHandle);
             return true;
         } catch (final IOException e) {
             log.logSevere(stats + ": CANNOT FETCH ENTRY: " + e.getMessage(), e);
-            if (e.getMessage().indexOf("hash is null") > 0) noticeURL.clear(NoticedURL.STACK_TYPE_REMOTE);
+            if (e.getMessage().indexOf("hash is null") > 0) noticeURL.clear(NoticedURL.StackType.REMOTE);
             return true;
         }
     }
@@ -507,13 +539,13 @@ public class CrawlQueues {
         return workers.size();
     }

-    protected final class crawlWorker extends Thread {
+    protected final class Loader extends Thread {

         protected Request request;
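         // 'code' is this Loader's key in the workers map; it is removed again in the finally block of run()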
         private final Integer code;
         private final long start;

-        public crawlWorker(final Request entry) {
+        public Loader(final Request entry) {
             this.start = System.currentTimeMillis();
             this.request = entry;
             this.request.setStatus("worker-initialized", WorkflowJob.STATUS_INITIATED);
@@ -600,7 +632,7 @@ public class CrawlQueues {
             // Client.initConnectionManager();
             this.request.setStatus("worker-exception", WorkflowJob.STATUS_FINISHED);
         } finally {
-            crawlWorker w = workers.remove(code);
+            Loader w = workers.remove(code);
             assert w != null;
         }
     }
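
For reference: the NoticedURL.StackType enum used throughout this patch is defined in
NoticedURL.java and does not appear in these hunks. Judging only from the call sites
above, it presumably replaces the old STACK_TYPE_* int constants roughly like the
following sketch (the real enum may define additional stack types):

    public class NoticedURL {

        public enum StackType {
            CORE,     // local crawl jobs (popped by coreCrawlJob)
            LIMIT,    // global crawl jobs that may be shifted to CORE
            OVERHANG,
            REMOTE,   // remote-triggered crawl jobs
            NOLOAD;   // URLs that are indexed directly, without being fetched (the new branch above)
        }

        // ... remainder of NoticedURL unchanged
    }

Compared with the old int constants, the enum makes signatures such as
loadIsPossible(StackType) and stackSize(StackType) type-safe: an arbitrary integer can
no longer be passed as a stack identifier.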
}