@ -28,11 +28,12 @@ package net.yacy.crawler;
import java.io.File ;
import java.io.IOException ;
import java.util.AbstractMap ;
import java.util.ArrayList ;
import java.util.HashMap ;
import java.util.Iterator ;
import java.util.List ;
import java.util.Map ;
import java.util.Random ;
import java.util.Set ;
import java.util.TreeMap ;
import java.util.concurrent.ConcurrentHashMap ;
@ -44,6 +45,7 @@ import net.yacy.cora.federate.yacy.CacheStrategy;
import net.yacy.cora.order.Base64Order ;
import net.yacy.cora.order.CloneableIterator ;
import net.yacy.cora.protocol.Domains ;
import net.yacy.cora.sorting.OrderedScoreMap ;
import net.yacy.cora.storage.HandleSet ;
import net.yacy.cora.util.SpaceExceededException ;
import net.yacy.crawler.data.Cache ;
@ -78,6 +80,8 @@ public class Balancer {
private final HandleSet double_push_check ; // for debugging
// timestamp in milliseconds that fillDomainStacks() compares against the current time:
// while the domain stacks are not empty and less than 60000 ms have passed, no refill is done
private long lastDomainStackFill ;
// set to Integer.MAX_VALUE by the constructor
private int domStackInitSize ;
// (host, url hash) pairs that getbest() found with a remaining waiting time <= 0;
// pickFromZeroWaiting() takes entries out of this list at random positions.
// getbest() synchronizes on this list while it reads or rebuilds it.
private final List < Map . Entry < String , byte [ ] > > zeroWaitingCandidates ;
// seeded with the current time in the constructor; breaks ties between equally good hosts
// in getbest() and chooses the list position in pickFromZeroWaiting()
private final Random random ; // used to alternate between choose-from-maxstack or choose from any zero-waiting
public Balancer (
final File cachePath ,
@ -94,7 +98,9 @@ public class Balancer {
this . myAgentIDs = myAgentIDs ;
this . domStackInitSize = Integer . MAX_VALUE ;
this . double_push_check = new RowHandleSet ( URIMetadataRow . rowdef . primaryKeyLength , URIMetadataRow . rowdef . objectOrder , 0 ) ;
this . zeroWaitingCandidates = new ArrayList < Map . Entry < String , byte [ ] > > ( ) ;
this . random = new Random ( System . currentTimeMillis ( ) ) ;
// create a stack for newly entered entries
if ( ! ( cachePath . exists ( ) ) ) cachePath . mkdir ( ) ; // make the path
this . cacheStacksPath . mkdirs ( ) ;
@ -205,6 +211,12 @@ public class Balancer {
for ( final byte [ ] handle : urlHashes ) stack . remove ( handle ) ;
if ( stack . isEmpty ( ) ) q . remove ( ) ;
}
// also drop the removed url hashes from the zero-waiting candidate list
final Iterator < Map . Entry < String , byte [ ] > > i = this . zeroWaitingCandidates . iterator ( ) ;
while ( i . hasNext ( ) ) {
if ( urlHashes . has ( i . next ( ) . getValue ( ) ) ) i . remove ( ) ;
}
return removedCounter ;
}
@ -274,32 +286,35 @@ public class Balancer {
/**
 * Get an overview of all hosts that currently have a domain stack.
 *
 * @return a map from host name to a two-element array: index 0 holds the number of
 *         url hashes in the stack of that host, index 1 the guessed remaining waiting
 *         time for that host (narrowed to int)
 */
public Map<String, Integer[]> getDomainStackHosts() {
    Map<String, Integer[]> map = new TreeMap<String, Integer[]>(); // we use a tree map to get a stable ordering
    for (Map.Entry<String, HandleSet> entry: this.domainStacks.entrySet()) {
        // compute both values exactly once per host; a second put for the same key
        // would only overwrite the first one and evaluate the latency guess twice
        int size = entry.getValue().size();
        int delta = (int) Latency.waitingRemainingGuessed(entry.getKey(), this.minimumLocalDelta, this.minimumGlobalDelta);
        map.put(entry.getKey(), new Integer[]{size, delta});
    }
    return map;
}
/ * *
* compute the current sleep time for a given crawl entry
* @param cs
* @param crawlEntry
* @return
* Get the minimum sleep time for a given url . The result can also be negative to reflect the time since the last access
* The time can be as low as Long . MIN_VALUE to show that there should not be any limitation at all .
* @param robots
* @param profileEntry
* @param crawlURL
* @return the sleep time in milliseconds ; may be negative for no sleep time
* /
public long getDomainSleepTime ( final CrawlSwitchboard cs , final RobotsTxt robots , Request crawlEntry ) {
final CrawlProfile profileEntry = cs . getActive ( UTF8 . getBytes ( crawlEntry . profileHandle ( ) ) ) ;
return getDomainSleepTime ( robots , profileEntry , crawlEntry . url ( ) ) ;
}
private long getDomainSleepTime ( final RobotsTxt robots , final CrawlProfile profileEntry , final DigestURI crawlURL ) {
if ( profileEntry = = null ) {
return 0 ;
}
if ( profileEntry = = null ) return 0 ;
long sleeptime = (
profileEntry . cacheStrategy ( ) = = CacheStrategy . CACHEONLY | |
( profileEntry . cacheStrategy ( ) = = CacheStrategy . IFEXIST & & Cache . has ( crawlURL . hash ( ) ) )
) ? 0 : Latency . waitingRemaining ( crawlURL , robots , this . myAgentIDs , this . minimumLocalDelta , this . minimumGlobalDelta ) ; // this uses the robots.txt database and may cause a loading of robots.txt from the server
) ? Integer . MIN_VALUE : Latency . waitingRemaining ( crawlURL , robots , this . myAgentIDs , this . minimumLocalDelta , this . minimumGlobalDelta ) ; // this uses the robots.txt database and may cause a loading of robots.txt from the server
return sleeptime ;
}
private long getRobotsTime ( final RobotsTxt robots , final CrawlProfile profileEntry , final DigestURI crawlURL ) {
if ( profileEntry = = null ) return 0 ;
long sleeptime = Latency . waitingRobots ( crawlURL , robots , this . myAgentIDs ) ; // this uses the robots.txt database and may cause a loading of robots.txt from the server
return sleeptime < 0 ? 0 : sleeptime ;
}
/ * *
* get lists of crawl request entries for a specific host
@ -377,6 +392,7 @@ public class Balancer {
long sleeptime = 0 ;
Request crawlEntry = null ;
CrawlProfile profileEntry = null ;
synchronized ( this ) {
byte [ ] failhash = null ;
while ( ! this . urlFileIndex . isEmpty ( ) ) {
@ -408,7 +424,7 @@ public class Balancer {
// at this point we must check if the crawlEntry has relevance because the crawl profile still exists
// if not: return null. A calling method must handle the null value and try again
final CrawlProfile profileEntry = cs . getActive ( UTF8 . getBytes ( crawlEntry . profileHandle ( ) ) ) ;
profileEntry = cs . getActive ( UTF8 . getBytes ( crawlEntry . profileHandle ( ) ) ) ;
if ( profileEntry = = null ) {
Log . logWarning ( "Balancer" , "no profile entry for handle " + crawlEntry . profileHandle ( ) ) ;
return null ;
@ -425,6 +441,8 @@ public class Balancer {
}
if ( crawlEntry = = null ) return null ;
long robotsTime = getRobotsTime ( robots , profileEntry , crawlEntry . url ( ) ) ;
Latency . updateAfterSelection ( crawlEntry . url ( ) , profileEntry = = null ? 0 : robotsTime ) ;
if ( delay & & sleeptime > 0 ) {
// force a busy waiting here
// in best case, this should never happen if the balancer works properly
@ -442,96 +460,119 @@ public class Balancer {
Log . logInfo ( "BALANCER" , "waiting for " + crawlEntry . url ( ) . getHost ( ) + ": " + ( loops - i ) + " seconds remaining..." ) ;
try { this . wait ( 1000 ) ; } catch ( final InterruptedException e ) { }
}
Latency . updateAfterSelection ( crawlEntry . url ( ) , robotsTime ) ;
}
Latency . update ( crawlEntry . url ( ) ) ;
return crawlEntry ;
}
private byte [ ] getbest ( final RobotsTxt robots ) {
// check if we need to get entries from the file index
try {
fillDomainStacks ( ) ;
} catch ( final IOException e ) {
Log . logException ( e ) ;
}
// iterate over the domain stacks
final Iterator < Map . Entry < String , HandleSet > > i = this . domainStacks . entrySet ( ) . iterator ( ) ;
Map . Entry < String , HandleSet > entry ;
long smallestWaiting = Long . MAX_VALUE ;
byte [ ] besturlhash = null ;
String besthost = null ;
Map < String , byte [ ] > zeroWaitingCandidates = new HashMap < String , byte [ ] > ( ) ;
while ( i . hasNext ( ) & & zeroWaitingCandidates . size ( ) < 10 ) {
entry = i . next ( ) ;
// clean up empty entries
if ( entry . getValue ( ) . isEmpty ( ) ) {
i . remove ( ) ;
continue ;
synchronized ( this . zeroWaitingCandidates ) {
if ( this . zeroWaitingCandidates . size ( ) > 0 ) {
byte [ ] urlhash = pickFromZeroWaiting ( ) ;
if ( urlhash ! = null ) return urlhash ;
}
final byte [ ] n = entry . getValue ( ) . removeOne ( ) ;
if ( n = = null ) continue ;
long w ;
Row . Entry rowEntry ;
try {
rowEntry = this . urlFileIndex . get ( n , false ) ;
if ( rowEntry = = null ) {
continue ;
this . zeroWaitingCandidates . clear ( ) ;
// check if we need to get entries from the file index
try {
fillDomainStacks ( ) ;
} catch ( final IOException e ) {
Log . logException ( e ) ;
}
// iterate over the domain stacks
final Iterator < Map . Entry < String , HandleSet > > i = this . domainStacks . entrySet ( ) . iterator ( ) ;
Map . Entry < String , HandleSet > entry ;
long smallestWaiting = Long . MAX_VALUE ;
byte [ ] besturlhash = null ;
String besthost = null ;
OrderedScoreMap < Map . Entry < String , byte [ ] > > nextZeroCandidates = new OrderedScoreMap < Map . Entry < String , byte [ ] > > ( null ) ;
while ( i . hasNext ( ) & & nextZeroCandidates . size ( ) < 1000 ) {
entry = i . next ( ) ;
// clean up empty entries
if ( entry . getValue ( ) . isEmpty ( ) ) {
i . remove ( ) ;
continue ;
}
final byte [ ] urlhash = entry . getValue ( ) . getOne ( 0 ) ;
if ( urlhash = = null ) continue ;
long w ;
Row . Entry rowEntry ;
try {
rowEntry = this . urlFileIndex . get ( urlhash , false ) ;
if ( rowEntry = = null ) {
continue ;
}
Request crawlEntry = new Request ( rowEntry ) ;
w = Latency . waitingRemaining ( crawlEntry . url ( ) , robots , this . myAgentIDs , this . minimumLocalDelta , this . minimumGlobalDelta ) ;
//System.out.println("*** waitingRemaining = " + w + ", guessed = " + Latency.waitingRemainingGuessed(entry.getKey(), this.minimumLocalDelta, this.minimumGlobalDelta));
//System.out.println("*** explained: " + Latency.waitingRemainingExplain(crawlEntry.url(), robots, this.myAgentIDs, this.minimumLocalDelta, this.minimumGlobalDelta));
} catch ( IOException e1 ) {
w = Latency . waitingRemainingGuessed ( entry . getKey ( ) , this . minimumLocalDelta , this . minimumGlobalDelta ) ;
}
Request crawlEntry = new Request ( rowEntry ) ;
w = Latency . waitingRemaining ( crawlEntry . url ( ) , robots , this . myAgentIDs , this . minimumLocalDelta , this . minimumGlobalDelta ) ;
//System.out.println("*** waitingRemaining = " + w + ", guessed = " + Latency.waitingRemainingGuessed(entry.getKey(), this.minimumLocalDelta, this.minimumGlobalDelta));
//System.out.println("*** explained: " + Latency.waitingRemainingExplain(crawlEntry.url(), robots, this.myAgentIDs, this.minimumLocalDelta, this.minimumGlobalDelta));
} catch ( IOException e1 ) {
w = Latency . waitingRemainingGuessed ( entry . getKey ( ) , this . minimumLocalDelta , this . minimumGlobalDelta ) ;
}
if ( w < smallestWaiting ) {
smallestWaiting = w ;
besturlhash = n ;
besthost = entry . getKey ( ) ;
if ( w < = 0 ) {
zeroWaitingCandidates . put ( besthost , besturlhash ) ;
nextZeroCandidates . set ( new AbstractMap . SimpleEntry < String , byte [ ] > ( entry . getKey ( ) , urlhash ) , w = = Integer . MIN_VALUE ? 1000 /* get new domains a chance */ : entry . getValue ( ) . size ( ) ) ;
}
}
try {
entry . getValue ( ) . put ( n ) ; // put entry back, we are checking only
} catch ( SpaceExceededException e ) {
e . printStackTrace ( ) ;
}
}
if ( w < smallestWaiting | | ( w = = smallestWaiting & & this . random . nextBoolean ( ) ) ) {
smallestWaiting = w ;
besturlhash = urlhash ;
besthost = entry . getKey ( ) ;
}
}
Log . logInfo ( "Balancer" , "*** getbest: created new nextZeroCandidates-list, size = " + nextZeroCandidates . size ( ) + ", domainStacks.size = " + this . domainStacks . size ( ) ) ;
if ( besturlhash = = null ) {
Log . logInfo ( "Balancer" , "*** getbest: besturlhash == null" ) ;
return null ; // this should never happen
}
// best case would be, if we have some zeroWaitingCandidates,
// then we select that one with the largest stack
if ( nextZeroCandidates . isEmpty ( ) ) {
// bad luck: just take that one with least waiting
removeHashFromDomainStacks ( besthost , besturlhash ) ;
Log . logInfo ( "Balancer" , "*** getbest: no zero waiting candidates, besthost = " + besthost ) ;
return besturlhash ;
}
// now take some of the nextZeroCandidates and put the best into the zeroWaitingCandidates
int pick = nextZeroCandidates . size ( ) < = 10 ? nextZeroCandidates . size ( ) : Math . max ( 1 , nextZeroCandidates . size ( ) / 3 ) ;
Iterator < Map . Entry < String , byte [ ] > > k = nextZeroCandidates . keys ( false ) ;
while ( k . hasNext ( ) & & pick - - > 0 ) {
this . zeroWaitingCandidates . add ( k . next ( ) ) ;
}
Log . logInfo ( "Balancer" , "*** getbest: created new zeroWaitingCandidates-list, size = " + zeroWaitingCandidates . size ( ) + ", domainStacks.size = " + this . domainStacks . size ( ) ) ;
return pickFromZeroWaiting ( ) ;
}
}
if ( besturlhash = = null ) return null ; // worst case
// best case would be, if we have some zeroWaitingCandidates,
// then we select that one with the largest stack
if ( ! zeroWaitingCandidates . isEmpty ( ) ) {
int largestStack = - 1 ;
String largestStackHost = null ;
byte [ ] largestStackHash = null ;
for ( Map . Entry < String , byte [ ] > z : zeroWaitingCandidates . entrySet ( ) ) {
HandleSet hs = this . domainStacks . get ( z . getKey ( ) ) ;
if ( hs = = null | | hs . size ( ) < = largestStack ) continue ;
largestStack = hs . size ( ) ;
largestStackHost = z . getKey ( ) ;
largestStackHash = z . getValue ( ) ;
}
if ( largestStackHost ! = null & & largestStackHash ! = null ) {
removeHashFromDomainStacks ( largestStackHost , largestStackHash ) ;
//Log.logInfo("Balancer", "*** picked one from largest stack");
return largestStackHash ;
}
}
private byte [ ] pickFromZeroWaiting ( ) {
// by random we choose now either from the largest stack or from any of the other stacks
String host = null ;
byte [ ] hash = null ;
while ( this . zeroWaitingCandidates . size ( ) > 0 ) {
Map . Entry < String , byte [ ] > z = this . zeroWaitingCandidates . remove ( this . random . nextInt ( this . zeroWaitingCandidates . size ( ) ) ) ;
HandleSet hs = this . domainStacks . get ( z . getKey ( ) ) ;
if ( hs = = null ) continue ;
host = z . getKey ( ) ; if ( host = = null ) continue ;
hash = z . getValue ( ) ; if ( hash = = null ) continue ;
removeHashFromDomainStacks ( host , hash ) ;
Log . logInfo ( "Balancer" , "*** getbest: picked a random from the zero-waiting stack: " + host + ", zeroWaitingCandidates.size = " + this . zeroWaitingCandidates . size ( ) ) ;
return hash ;
}
// default case: just take that one with least waiting
removeHashFromDomainStacks ( besthost , besturlhash ) ;
return besturlhash ;
Log . logInfo ( "Balancer" , "*** getbest: picking from zero-waiting stack failed!" + " zeroWaitingCandidates.size = " + this . zeroWaitingCandidates . size ( ) ) ;
this . zeroWaitingCandidates . clear ( ) ;
return null ;
}
private void fillDomainStacks ( ) throws IOException {
if ( ! this . domainStacks . isEmpty ( ) & & System . currentTimeMillis ( ) - this . lastDomainStackFill < 60000L ) return ;
this . domainStacks . clear ( ) ;