@ -55,6 +55,7 @@ public class Balancer {
private long minimumLocalDelta ;
private long minimumGlobalDelta ;
private long lastDomainStackFill ;
private int domStackInitSize ;
public Balancer (
final File cachePath ,
@ -69,6 +70,7 @@ public class Balancer {
this . delayed = new TreeMap < Long , String > ( ) ;
this . minimumLocalDelta = minimumLocalDelta ;
this . minimumGlobalDelta = minimumGlobalDelta ;
this . domStackInitSize = Integer . MAX_VALUE ;
// create a stack for newly entered entries
if ( ! ( cachePath . exists ( ) ) ) cachePath . mkdir ( ) ; // make the path
@ -272,7 +274,6 @@ public class Balancer {
}
private String nextFromDelayed ( ) {
if ( this . delayed . size ( ) = = 0 ) return null ;
if ( this . delayed . size ( ) = = 0 ) return null ;
Long first = this . delayed . firstKey ( ) ;
if ( first . longValue ( ) < System . currentTimeMillis ( ) ) {
@ -281,6 +282,12 @@ public class Balancer {
return null ;
}
private String anyFromDelayed ( ) {
if ( this . delayed . size ( ) = = 0 ) return null ;
Long first = this . delayed . firstKey ( ) ;
return this . delayed . remove ( first ) ;
}
/ * *
* get the next entry in this crawl queue in such a way that the domain access time delta is maximized
* and always above the given minimum delay time . An additional delay time is computed using the robots . txt
@ -304,7 +311,14 @@ public class Balancer {
filltop ( delay , - 2000 , false ) ;
filltop ( delay , - 1000 , false ) ;
filltop ( delay , - 500 , false ) ;
filltop ( delay , 0 , true ) ;
filltop ( delay , 0 , true ) ;
filltop ( delay , 500 , true ) ;
filltop ( delay , 1000 , true ) ;
filltop ( delay , 2000 , true ) ;
filltop ( delay , 3000 , true ) ;
filltop ( delay , 4000 , true ) ;
filltop ( delay , 6000 , true ) ;
filltop ( delay , Long . MAX_VALUE , true ) ;
long sleeptime = 0 ;
Request crawlEntry = null ;
@ -318,6 +332,9 @@ public class Balancer {
nexthash = top . remove ( ) ;
//System.out.println("*** top.remove()=" + nexthash);
}
if ( nexthash = = null ) {
nexthash = anyFromDelayed ( ) ;
}
// check minimumDelta and if necessary force a sleep
//final int s = urlFileIndex.size();
@ -358,10 +375,9 @@ public class Balancer {
assert nexthash . equals ( new String ( rowEntry . getPrimaryKeyBytes ( ) ) ) : "result = " + nexthash + ", rowEntry.getPrimaryKeyBytes() = " + new String ( rowEntry . getPrimaryKeyBytes ( ) ) ;
assert nexthash . equals ( crawlEntry . url ( ) . hash ( ) ) : "result = " + nexthash + ", crawlEntry.url().hash() = " + crawlEntry . url ( ) . hash ( ) ;
if ( this . domainStacks . size ( ) < = 1 ) break ;
if ( failhash ! = null & & failhash . equals ( nexthash ) ) break ; // prevent endless loops
if ( delay & & sleeptime > 0 ) {
if ( delay & & sleeptime > 0 & & this . domStackInitSize > 1 ) {
//System.out.println("*** putback: nexthash=" + nexthash + ", failhash="+failhash);
// put that thing back to omit a delay here
if ( ! delayed . values ( ) . contains ( nexthash ) ) {
@ -383,7 +399,7 @@ public class Balancer {
// in best case, this should never happen if the balancer works propertly
// this is only to protection against the worst case, where the crawler could
// behave in a DoS-manner
Log . logInfo ( "BALANCER" , "forcing crawl-delay of " + sleeptime + " milliseconds for " + crawlEntry . url ( ) . getHost ( ) + ( ( sleeptime > Math . max ( minimumLocalDelta , minimumGlobalDelta ) ) ? " (forced latency)" : "" ) ) ;
Log . logInfo ( "BALANCER" , "forcing crawl-delay of " + sleeptime + " milliseconds for " + crawlEntry . url ( ) . getHost ( ) + ": " + Latency . waitingRemainingExplain ( crawlEntry . url ( ) , minimumLocalDelta , minimumGlobalDelta ) + ", top.size() = " + top . size ( ) + ", delayed.size() = " + delayed . size ( ) + ", domainStacks.size() = " + domainStacks . size ( ) + ", domainStacksInitSize = " + this . domStackInitSize ) ;
long loops = sleeptime / 3000 ;
long rest = sleeptime % 3000 ;
if ( loops < 2 ) {
@ -395,7 +411,6 @@ public class Balancer {
Log . logInfo ( "BALANCER" , "waiting for " + crawlEntry . url ( ) . getHost ( ) + ": " + ( ( loops - i ) * 3 ) + " seconds remaining..." ) ;
try { synchronized ( this ) { this . wait ( 3000 ) ; } } catch ( final InterruptedException e ) { }
}
if ( sleeptime > 3000 & & this . domainStacks . size ( ) > 1 ) this . domainStacks . remove ( crawlEntry . url ( ) . hash ( ) . substring ( 6 ) ) ;
}
Latency . update ( crawlEntry . url ( ) . hash ( ) . substring ( 6 ) , crawlEntry . url ( ) . getHost ( ) ) ;
return crawlEntry ;
@ -420,6 +435,8 @@ public class Balancer {
String besthash = null ;
while ( i . hasNext ( ) ) {
entry = i . next ( ) ;
// clean up empty entries
if ( entry . getValue ( ) . size ( ) = = 0 ) {
i . remove ( ) ;
continue ;
@ -450,7 +467,7 @@ public class Balancer {
}
private void fillDomainStacks ( int maxdomstacksize ) throws IOException {
if ( this . domainStacks . size ( ) > 0 & & System . currentTimeMillis ( ) - lastDomainStackFill < 20 0000L) return ;
if ( this . domainStacks . size ( ) > 0 & & System . currentTimeMillis ( ) - lastDomainStackFill < 1 20000L) return ;
this . domainStacks . clear ( ) ;
//synchronized (this.delayed) { delayed.clear(); }
this . lastDomainStackFill = System . currentTimeMillis ( ) ;
@ -459,6 +476,8 @@ public class Balancer {
pushHashToDomainStacks ( new String ( i . next ( ) ) , 50 ) ;
if ( this . domainStacks . size ( ) > maxdomstacksize ) break ;
}
Log . logInfo ( "BALANCER" , "re-fill of domain stacks; fileIndex.size() = " + this . urlFileIndex . size ( ) + ", domainStacks.size = " + domainStacks . size ( ) + ", collection time = " + ( System . currentTimeMillis ( ) - this . lastDomainStackFill ) + " ms" ) ;
this . domStackInitSize = this . domainStacks . size ( ) ;
}
public ArrayList < Request > top ( int count ) {