@ -76,13 +76,22 @@ public class Balancer {
// index from which the Row.Entry stored for a url hash is fetched (urlFileIndex.get(hash, false))
private BufferedObjectIndex urlFileIndex ;
// class variables computed during operation
// NOTE(review): domainStacks appears twice below, once HandleSet-valued and once HostHandles-valued;
// the constructor assigns both variants as well. Presumably these are the old and new form of the
// same declaration - confirm which one is current before compiling.
private final ConcurrentMap < String , H andleSet > domainStacks ; // a map from host name to the set of url hashes queued for that host
private final ConcurrentMap < String , H ostHandles > domainStacks ; // a map from host name to the host hash plus the set of url hashes queued for that host
private final HandleSet double_push_check ; // for debugging
// subtracted from System.currentTimeMillis() to log the "collection time" of a domain stack re-fill
private long lastDomainStackFill ;
// set to domainStacks.size() after a re-fill of the domain stacks
private int domStackInitSize ;
// (host name, url hash) pairs taken over from the nextZeroCandidates list; one is removed at a random index per pick
private final List < Map . Entry < String , byte [ ] > > zeroWaitingCandidates ;
private final Random random ; // used to alternate between choose-from-maxstack or choose from any zero-waiting
/**
 * Value type of {@code domainStacks}: pairs the handle set of one host with the
 * host hash that was supplied when the stack for that host was created.
 * Both references are fixed at construction time; the handle set itself stays mutable.
 */
private static class HostHandles {
    /** host hash passed in by the caller that created this stack */
    public final String hosthash;
    /** url hashes that were pushed onto the stack of this host */
    public final HandleSet handleSet;

    /**
     * @param hosthash  host hash belonging to the host this stack is stored under
     * @param handleSet set that collects the url hashes of that host
     */
    public HostHandles(final String hosthash, final HandleSet handleSet) {
        this.hosthash = hosthash;
        this.handleSet = handleSet;
    }
}
public Balancer (
final File cachePath ,
final String stackname ,
@ -92,7 +101,7 @@ public class Balancer {
final boolean useTailCache ,
final boolean exceed134217727 ) {
this . cacheStacksPath = cachePath ;
this . domainStacks = new ConcurrentHashMap < String , H andleSet > ( ) ;
this . domainStacks = new ConcurrentHashMap < String , H ostHandles > ( ) ;
this . minimumLocalDelta = minimumLocalDelta ;
this . minimumGlobalDelta = minimumGlobalDelta ;
this . myAgentIDs = myAgentIDs ;
@ -204,10 +213,10 @@ public class Balancer {
assert this . urlFileIndex . size ( ) + removedCounter = = s : "urlFileIndex.size() = " + this . urlFileIndex . size ( ) + ", s = " + s ;
// iterate through the domain stacks
final Iterator < Map . Entry < String , H andleSet > > q = this . domainStacks . entrySet ( ) . iterator ( ) ;
final Iterator < Map . Entry < String , H ostHandles > > q = this . domainStacks . entrySet ( ) . iterator ( ) ;
HandleSet stack ;
while ( q . hasNext ( ) ) {
stack = q . next ( ) . getValue ( ) ;
stack = q . next ( ) . getValue ( ) .handleSet ;
for ( final byte [ ] handle : urlHashes ) stack . remove ( handle ) ;
if ( stack . isEmpty ( ) ) q . remove ( ) ;
}
@ -242,8 +251,8 @@ public class Balancer {
// Tells whether at least one domain stack still contains a url hash.
// Returns false if domainStacks is null or if every stack reports isEmpty().
private boolean domainStacksNotEmpty ( ) {
if ( this . domainStacks = = null ) return false ;
// the scan is done while holding the monitor of the map object
synchronized ( this . domainStacks ) {
// NOTE(review): two variants of the same scan follow (values typed HandleSet, then HostHandles);
// presumably the first pair of lines is the superseded form - confirm, only one matches the map's value type
for ( final H andleSet l : this . domainStacks . values ( ) ) {
if ( ! l . isEmpty( ) ) return true ;
for ( final H ostHandles l : this . domainStacks . values ( ) ) {
// the first non-empty handle set ends the scan
if ( ! l . handleSet. isEmpty( ) ) return true ;
}
}
return false ;
@ -285,11 +294,11 @@ public class Balancer {
* get a list of domains that are currently maintained as domain stacks
* @return a map of clear text strings of host names to an integer array : { the size of the domain stack , guessed delta waiting time }
* /
public Map < String , Integer [ ] > getDomainStackHosts ( ) {
public Map < String , Integer [ ] > getDomainStackHosts ( RobotsTxt robots ) {
// NOTE(review): two signatures and two loop variants are present (without / with the robots argument);
// presumably the argument-less form is the superseded one - confirm.
// robots is not inspected here; it is only handed to Latency.waitingRemainingGuessed together with
// the stored host hash and this.myAgentIDs.
Map < String , Integer [ ] > map = new TreeMap < String , Integer [ ] > ( ) ; // we use a tree map to get a stable ordering
for ( Map . Entry < String , H andleSet > entry : this . domainStacks . entrySet ( ) ) {
int size = entry . getValue ( ) . size( ) ;
int delta = Latency . waitingRemainingGuessed ( entry . getKey ( ) , this . minimumLocalDelta , this . minimumGlobalDelta ) ;
for ( Map . Entry < String , H ostHandles > entry : this . domainStacks . entrySet ( ) ) {
// number of url hashes currently held for this host
int size = entry . getValue ( ) . handleSet. size( ) ;
// guessed remaining waiting time for this host, as computed by Latency
int delta = Latency . waitingRemainingGuessed ( entry . getKey ( ) , entry . getValue ( ) . hosthash , robots , this . myAgentIDs , this . minimumLocalDelta , this . minimumGlobalDelta ) ;
// result value per host name: { stack size , guessed delta }
map . put ( entry . getKey ( ) , new Integer [ ] { size , delta } ) ;
}
return map ;
@ -333,8 +342,10 @@ public class Balancer {
* @return a list of crawl loader requests
* /
public List < Request > getDomainStackReferences ( String host , int maxcount ) {
HandleSet domainList = this . domainStacks . get ( host ) ;
if ( domainList = = null | | domainList . isEmpty ( ) ) return new ArrayList < Request > ( 0 ) ;
HostHandles hh = this . domainStacks . get ( host ) ;
if ( hh = = null ) return new ArrayList < Request > ( 0 ) ;
HandleSet domainList = hh . handleSet ;
if ( domainList . isEmpty ( ) ) return new ArrayList < Request > ( 0 ) ;
ArrayList < Request > cel = new ArrayList < Request > ( maxcount ) ;
for ( int i = 0 ; i < maxcount ; i + + ) {
if ( domainList . size ( ) < = i ) break ;
@ -358,16 +369,17 @@ public class Balancer {
return cel ;
}
// Adds urlhash to the domain stack stored under host, creating the stack if the host has none yet.
// A null host is filed under Domains.LOCALHOST. SpaceExceededException is declared and not caught here.
// NOTE(review): two signatures and two lookup/creation variants are present (without / with hosthash);
// presumably the HandleSet-only lines are the superseded form - confirm.
private void pushHashToDomainStacks ( String host , final byte [ ] urlhash ) throws SpaceExceededException {
private void pushHashToDomainStacks ( String host , String hosthash , final byte [ ] urlhash ) throws SpaceExceededException {
// extend domain stack
if ( host = = null ) host = Domains . LOCALHOST ;
H andleSet domainList = this . domainStacks . get ( host ) ;
if ( domainList = = null ) {
H ostHandles hh = this . domainStacks . get ( host ) ;
if ( hh = = null ) {
// create new list
// presumably 12 is the key length of a url hash and 1 the initial capacity - TODO confirm against RowHandleSet
domainList = new RowHandleSet ( 12 , Base64Order . enhancedCoder , 1 ) ;
HandleSet domainList = new RowHandleSet ( 12 , Base64Order . enhancedCoder , 1 ) ;
domainList . put ( urlhash ) ;
this . domainStacks . put ( host , domainList ) ;
// the new stack is stored together with the host hash supplied by the caller
this . domainStacks . put ( host , new HostHandles ( hosthash , domainList ) ) ;
} else {
// the hosthash argument is not used on this path; the stored HostHandles is kept as it is
HandleSet domainList = hh . handleSet ;
// extend existent domain list
domainList . put ( urlhash ) ;
}
@ -376,11 +388,12 @@ public class Balancer {
private void removeHashFromDomainStacks ( String host , final byte [ ] urlhash ) {
// reduce domain stack
if ( host = = null ) host = Domains . LOCALHOST ;
final HandleSet domainList = this . domainStacks . get ( host ) ;
if ( domainList = = null ) {
HostHandles hh = this . domainStacks . get ( host ) ;
if ( hh = = null ) {
this . domainStacks . remove ( host ) ;
return ;
}
HandleSet domainList = hh . handleSet ;
domainList . remove ( urlhash ) ;
if ( domainList . isEmpty ( ) ) this . domainStacks . remove ( host ) ;
}
@ -495,26 +508,24 @@ public class Balancer {
}
// iterate over the domain stacks
final Iterator < Map . Entry < String , HandleSet > > i = this . domainStacks . entrySet ( ) . iterator ( ) ;
Map . Entry < String , HandleSet > entry ;
long smallestWaiting = Long . MAX_VALUE ;
byte [ ] besturlhash = null ;
String besthost = null ;
OrderedScoreMap < Map . Entry < String , byte [ ] > > nextZeroCandidates = new OrderedScoreMap < Map . Entry < String , byte [ ] > > ( null ) ;
int newCandidatesForward = 10 ;
final Iterator < Map . Entry < String , HostHandles > > i = this . domainStacks . entrySet ( ) . iterator ( ) ;
Map . Entry < String , HostHandles > entry ;
OrderedScoreMap < Map . Entry < String , byte [ ] > > nextZeroCandidates = new OrderedScoreMap < Map . Entry < String , byte [ ] > > ( null ) ;
OrderedScoreMap < Map . Entry < String , byte [ ] > > failoverCandidates = new OrderedScoreMap < Map . Entry < String , byte [ ] > > ( null ) ;
int newCandidatesForward = 1 ;
while ( i . hasNext ( ) & & nextZeroCandidates . size ( ) < 1000 ) {
entry = i . next ( ) ;
// clean up empty entries
if ( entry . getValue ( ) . isEmpty( ) ) {
if ( entry . getValue ( ) . handleSet. isEmpty( ) ) {
i . remove ( ) ;
continue ;
}
final byte [ ] urlhash = entry . getValue ( ) . getOne( 0 ) ;
final byte [ ] urlhash = entry . getValue ( ) . handleSet. getOne( 0 ) ;
if ( urlhash = = null ) continue ;
long w ;
int w ;
Row . Entry rowEntry ;
try {
rowEntry = this . urlFileIndex . get ( urlhash , false ) ;
@ -526,50 +537,55 @@ public class Balancer {
//System.out.println("*** waitingRemaining = " + w + ", guessed = " + Latency.waitingRemainingGuessed(entry.getKey(), this.minimumLocalDelta, this.minimumGlobalDelta));
//System.out.println("*** explained: " + Latency.waitingRemainingExplain(crawlEntry.url(), robots, this.myAgentIDs, this.minimumLocalDelta, this.minimumGlobalDelta));
} catch ( IOException e1 ) {
w = Latency . waitingRemainingGuessed ( entry . getKey ( ) , this . minimumLocalDelta , this . minimumGlobalDelta ) ;
w = Latency . waitingRemainingGuessed ( entry . getKey ( ) , entry . getValue ( ) . hosthash , robots , this . myAgentIDs , this . minimumLocalDelta , this . minimumGlobalDelta ) ;
}
if ( w < = 0 ) {
if ( w = = Integer . MIN_VALUE & & newCandidatesForward > 0 ) {
// give new domains a chance, but not too much; otherwise a massive downloading of robots.txt from too much domains (dns lock!) will more likely block crawling
newCandidatesForward - - ;
nextZeroCandidates . set ( new AbstractMap . SimpleEntry < String , byte [ ] > ( entry . getKey ( ) , urlhash ) , 1000 ) ;
if ( w = = Integer . MIN_VALUE ) {
if ( newCandidatesForward - - > 0 ) {
nextZeroCandidates . set ( new AbstractMap . SimpleEntry < String , byte [ ] > ( entry . getKey ( ) , urlhash ) , 10000 ) ;
} else {
failoverCandidates . set ( new AbstractMap . SimpleEntry < String , byte [ ] > ( entry . getKey ( ) , urlhash ) , 0 ) ;
}
} else {
nextZeroCandidates . set ( new AbstractMap . SimpleEntry < String , byte [ ] > ( entry . getKey ( ) , urlhash ) , entry . getValue ( ) . size ( ) ) ;
nextZeroCandidates . set ( new AbstractMap . SimpleEntry < String , byte [ ] > ( entry . getKey ( ) , urlhash ) , entry . getValue ( ) . handleSet. size( ) ) ;
}
}
if ( w < smallestWaiting | | ( w = = smallestWaiting & & this . random . nextBoolean ( ) ) ) {
smallestWaiting = w ;
besturlhash = urlhash ;
besthost = entry . getKey ( ) ;
} else {
failoverCandidates . set ( new AbstractMap . SimpleEntry < String , byte [ ] > ( entry . getKey ( ) , urlhash ) , w ) ;
}
}
Log . logInfo ( "Balancer" , "*** getbest: created new nextZeroCandidates-list, size = " + nextZeroCandidates . size ( ) + ", domainStacks.size = " + this . domainStacks . size ( ) ) ;
if ( besturlhash = = null ) {
Log . logInfo ( "Balancer" , "*** getbest: besturlhash == null" ) ;
return null ; // this should never happen
if ( ! nextZeroCandidates . isEmpty ( ) ) {
// take some of the nextZeroCandidates and put the best into the zeroWaitingCandidates
int pick = nextZeroCandidates . size ( ) < = 10 ? nextZeroCandidates . size ( ) : Math . max ( 1 , nextZeroCandidates . size ( ) / 3 ) ;
Iterator < Map . Entry < String , byte [ ] > > k = nextZeroCandidates . keys ( false ) ;
while ( k . hasNext ( ) & & pick - - > 0 ) {
this . zeroWaitingCandidates . add ( k . next ( ) ) ;
}
Log . logInfo ( "Balancer" , "*** getbest: created new zeroWaitingCandidates-list, size = " + zeroWaitingCandidates . size ( ) + ", domainStacks.size = " + this . domainStacks . size ( ) ) ;
return pickFromZeroWaiting ( ) ;
}
// best case would be, if we have some zeroWaitingCandidates,
// then we select that one with the largest stack
if ( nextZeroCandidates . isEmpty ( ) ) {
if ( ! failoverCandidates . isEmpty ( ) ) {
// bad luck: just take that one with least waiting
removeHashFromDomainStacks ( besthost , besturlhash ) ;
Log . logInfo ( "Balancer" , "*** getbest: no zero waiting candidates, besthost = " + besthost ) ;
return besturlhash ;
}
Iterator < Map . Entry < String , byte [ ] > > k = failoverCandidates . keys ( true ) ;
String besthost ;
byte [ ] besturlhash ;
Map . Entry < String , byte [ ] > hosthash ;
while ( k . hasNext ( ) ) {
hosthash = k . next ( ) ;
besthost = hosthash . getKey ( ) ;
besturlhash = hosthash . getValue ( ) ;
removeHashFromDomainStacks ( besthost , besturlhash ) ;
Log . logInfo ( "Balancer" , "*** getbest: no zero waiting candidates, besthost = " + besthost ) ;
return besturlhash ;
}
}
// now take some of the nextZeroCandidates and put the best into the zeroWaitingCandidates
int pick = nextZeroCandidates . size ( ) < = 10 ? nextZeroCandidates . size ( ) : Math . max ( 1 , nextZeroCandidates . size ( ) / 3 ) ;
Iterator < Map . Entry < String , byte [ ] > > k = nextZeroCandidates . keys ( false ) ;
while ( k . hasNext ( ) & & pick - - > 0 ) {
this . zeroWaitingCandidates . add ( k . next ( ) ) ;
}
Log . logInfo ( "Balancer" , "*** getbest: created new zeroWaitingCandidates-list, size = " + zeroWaitingCandidates . size ( ) + ", domainStacks.size = " + this . domainStacks . size ( ) ) ;
return pickFromZeroWaiting ( ) ;
Log . logInfo ( "Balancer" , "*** getbest: besturlhash == null" ) ;
return null ; // this should never happen
}
}
@ -579,8 +595,8 @@ public class Balancer {
byte [ ] hash = null ;
while ( this . zeroWaitingCandidates . size ( ) > 0 ) {
Map . Entry < String , byte [ ] > z = this . zeroWaitingCandidates . remove ( this . random . nextInt ( this . zeroWaitingCandidates . size ( ) ) ) ;
H andleSet hs = this . domainStacks . get ( z . getKey ( ) ) ;
if ( h s = = null ) continue ;
H ostHandles hh = this . domainStacks . get ( z . getKey ( ) ) ;
if ( h h = = null ) continue ;
host = z . getKey ( ) ; if ( host = = null ) continue ;
hash = z . getValue ( ) ; if ( hash = = null ) continue ;
removeHashFromDomainStacks ( host , hash ) ;
@ -604,6 +620,7 @@ public class Balancer {
String host ;
Request request ;
int count = 0 ;
long timeout = System . currentTimeMillis ( ) + 5000 ;
while ( i . hasNext ( ) ) {
handle = i . next ( ) ;
final Row . Entry entry = this . urlFileIndex . get ( handle , false ) ;
@ -611,12 +628,12 @@ public class Balancer {
request = new Request ( entry ) ;
host = request . url ( ) . getHost ( ) ;
try {
pushHashToDomainStacks ( host , handle) ;
pushHashToDomainStacks ( host , request. url ( ) . hosthash ( ) , handle) ;
} catch ( final SpaceExceededException e ) {
break ;
}
count + + ;
if ( this . domainStacks . size ( ) > = 100 | | ( ! this . domainStacks . isEmpty ( ) & & count > 600 * this . domainStacks . size ( ) ) ) break ;
if ( this . domainStacks . size ( ) > = 100 0 | | count > = 100000 | | System . currentTimeMillis ( ) > timeout ) break ;
}
Log . logInfo ( "BALANCER" , "re-fill of domain stacks; fileIndex.size() = " + this . urlFileIndex . size ( ) + ", domainStacks.size = " + this . domainStacks . size ( ) + ", collection time = " + ( System . currentTimeMillis ( ) - this . lastDomainStackFill ) + " ms" ) ;
this . domStackInitSize = this . domainStacks . size ( ) ;