@@ -37,6 +37,7 @@ import net.yacy.cora.document.encoding.ASCII;
 import net.yacy.cora.document.id.DigestURL;
 import net.yacy.cora.order.Base64Order;
 import net.yacy.cora.protocol.ClientIdentification;
+import net.yacy.cora.storage.HandleMap;
 import net.yacy.cora.storage.HandleSet;
 import net.yacy.cora.util.ConcurrentLog;
 import net.yacy.cora.util.SpaceExceededException;
@@ -45,6 +46,7 @@ import net.yacy.crawler.data.Latency;
 import net.yacy.crawler.retrieval.Request;
 import net.yacy.crawler.robots.RobotsTxt;
 import net.yacy.kelondro.data.word.Word;
+import net.yacy.kelondro.index.RowHandleMap;
 import net.yacy.kelondro.index.RowHandleSet;
 
 /**
@@ -58,19 +60,18 @@ import net.yacy.kelondro.index.RowHandleSet;
  */
 public class HostBalancer implements Balancer {
 
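+    // maps the hash of every pushed url to its crawl depth; the map is static, i.e. shared by
+    // all HostBalancer instances, and is consulted first by has() for the double-occurrence check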
+    public final static HandleMap depthCache = new RowHandleMap(Word.commonHashLength, Word.commonHashOrder, 2, 8 * 1024 * 1024, "HostBalancer.DepthCache");
 
     private final File hostsPath;
     private final boolean exceed134217727;
     private final Map<String, HostQueue> queues;
     private final Set<String> roundRobinHostHashes;
-    private HandleSet urlHashDoubleCheck;
 
     public HostBalancer(
             final File hostsPath,
             final boolean exceed134217727) {
         this.hostsPath = hostsPath;
         this.exceed134217727 = exceed134217727;
-        this.urlHashDoubleCheck = new RowHandleSet(Word.commonHashLength, Word.commonHashOrder, 0);
 
         // create a stack for newly entered entries
         if (!(hostsPath.exists())) hostsPath.mkdirs(); // make the path
@@ -93,8 +94,8 @@ public class HostBalancer implements Balancer {
     @Override
     public synchronized void close() {
-        if (this.urlHashDoubleCheck != null) {
-            this.urlHashDoubleCheck.clear();
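+        // depthCache is static, so closing one balancer instance clears the cached depths for all instances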
+        if (depthCache != null) {
+            depthCache.clear();
         }
         for (HostQueue queue: this.queues.values()) queue.close();
         this.queues.clear();
@@ -102,8 +103,8 @@ public class HostBalancer implements Balancer {
     @Override
     public void clear() {
-        if (this.urlHashDoubleCheck != null) {
-            this.urlHashDoubleCheck.clear();
+        if (depthCache != null) {
+            depthCache.clear();
         }
         for (HostQueue queue: this.queues.values()) queue.clear();
         this.queues.clear();
@@ -137,13 +138,13 @@ public class HostBalancer implements Balancer {
             if (hq != null) c += hq.removeAllByHostHashes(hosthashes);
         }
         // remove from cache
-        Iterator<byte[]> i = this.urlHashDoubleCheck.iterator();
+        Iterator<Map.Entry<byte[], Long>> i = depthCache.iterator();
         ArrayList<String> deleteHashes = new ArrayList<String>();
         while (i.hasNext()) {
-            String h = ASCII.String(i.next());
+            String h = ASCII.String(i.next().getKey());
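+            // the host hash is the 6-character tail of the 12-character url hash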
             if (hosthashes.contains(h.substring(6))) deleteHashes.add(h);
         }
-        for (String h: deleteHashes) this.urlHashDoubleCheck.remove(ASCII.getBytes(h));
+        for (String h: deleteHashes) depthCache.remove(ASCII.getBytes(h));
         return c;
     }
@@ -151,7 +152,7 @@ public class HostBalancer implements Balancer {
     public synchronized int remove(final HandleSet urlHashes) throws IOException {
         Map<String, HandleSet> removeLists = new ConcurrentHashMap<String, HandleSet>();
         for (byte[] urlhash: urlHashes) {
-            this.urlHashDoubleCheck.remove(urlhash);
+            depthCache.remove(urlhash);
             String hosthash = ASCII.String(urlhash, 6, 6);
             HandleSet removeList = removeLists.get(hosthash);
             if (removeList == null) {
@@ -170,7 +171,7 @@ public class HostBalancer implements Balancer {
     @Override
     public boolean has(final byte[] urlhashb) {
-        if (this.urlHashDoubleCheck.has(urlhashb)) return true;
+        if (depthCache.has(urlhashb)) return true;
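+        // not in the depth cache: check the stack of the url's host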
         String hosthash = ASCII.String(urlhashb, 6, 6);
         HostQueue queue = this.queues.get(hosthash);
         if (queue == null) return false;
@@ -202,12 +203,13 @@ public class HostBalancer implements Balancer {
     @Override
     public synchronized String push(final Request entry, CrawlProfile profile, final RobotsTxt robots) throws IOException, SpaceExceededException {
         if (this.has(entry.url().hash())) return "double occurrence";
-        this.urlHashDoubleCheck.put(entry.url().hash());
+        depthCache.put(entry.url().hash(), entry.depth());
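+        // the depth cache now knows the url hash, so a repeated push of the same url is rejected above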
         String hosthash = ASCII.String(entry.url().hash(), 6, 6);
         HostQueue queue = this.queues.get(hosthash);
         if (queue == null) {
             queue = new HostQueue(this.hostsPath, entry.url().getHost(), entry.url().getPort(), this.queues.size() > 100, this.exceed134217727);
             this.queues.put(hosthash, queue);
+            robots.ensureExist(entry.url(), profile.getAgent(), true); // concurrently load all robots.txt
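+            // loading it now means the robots.txt is typically already cached when the url is popped later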
         }
         return queue.push(entry, profile, robots);
     }
@@ -228,34 +230,36 @@ public class HostBalancer implements Balancer {
     public synchronized Request pop(boolean delay, CrawlSwitchboard cs, RobotsTxt robots) throws IOException {
         tryagain: while (true) try {
             if (this.roundRobinHostHashes.size() == 0) {
-                // select all queues on the lowest crawldepth level; that means: first look for the lowest level
-                int lowestCrawldepth = Integer.MAX_VALUE;
-                for (HostQueue hq: this.queues.values()) {
-                    int lsd = hq.getLowestStackDepth();
-                    if (lsd < lowestCrawldepth) lowestCrawldepth = lsd;
-                }
-                // now add only such stacks which have the lowest level
-                for (Map.Entry<String, HostQueue> entry: this.queues.entrySet()) {
-                    if (entry.getValue().getLowestStackDepth() == lowestCrawldepth) this.roundRobinHostHashes.add(entry.getKey());
-                }
-                // emergency case if this fails
-                if (this.roundRobinHostHashes.size() == 0) {
-                    //assert this.queues.size() == 0; // thats the only case where that should happen
-                    this.roundRobinHostHashes.addAll(this.queues.keySet());
-                }
-                // if there are stacks with less than 10 entries, remove all stacks with more than 10 entries
-                // this shall kick out small stacks to prevent that too many files are opened for very wide crawls
-                boolean smallStacksExist = false;
-                smallsearch: for (String s: this.roundRobinHostHashes) {
-                    HostQueue hq = this.queues.get(s);
-                    if (hq != null && hq.size() <= 10) {smallStacksExist = true; break smallsearch;}
-                }
-                if (smallStacksExist) {
-                    Iterator<String> i = this.roundRobinHostHashes.iterator();
-                    while (i.hasNext()) {
-                        String s = i.next();
+                // refresh the round-robin cache
+                this.roundRobinHostHashes.addAll(this.queues.keySet());
+                // quickly get rid of small stacks to reduce number of files:
+                if (this.roundRobinHostHashes.size() > 100) {
+                    // if there are stacks with less than 10 entries, remove all stacks with more than 10 entries
+                    // this shall kick out small stacks to prevent that too many files are opened for very wide crawls
+                    boolean smallStacksExist = false;
+                    boolean singletonStacksExist = false;
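+                    // scan until the first singleton (size == 1) or small (size <= 10) stack is found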
+                    smallsearch: for (String s: this.roundRobinHostHashes) {
                         HostQueue hq = this.queues.get(s);
-                        if (hq != null && hq.size() > 10) {i.remove();}
+                        if (hq != null) {
+                            int size = hq.size();
+                            if (size == 1) {singletonStacksExist = true; break smallsearch;}
+                            if (size <= 10) {smallStacksExist = true; break smallsearch;}
+                        }
+                    }
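+                    // singleton stacks are preferred: a single pop empties them, so their files can be released quickly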
+                    if (singletonStacksExist) {
+                        Iterator<String> i = this.roundRobinHostHashes.iterator();
+                        while (i.hasNext()) {
+                            String s = i.next();
+                            HostQueue hq = this.queues.get(s);
+                            if (hq == null || hq.size() != 1) {i.remove();}
+                        }
+                    } else if (smallStacksExist) {
+                        Iterator<String> i = this.roundRobinHostHashes.iterator();
+                        while (i.hasNext()) {
+                            String s = i.next();
+                            HostQueue hq = this.queues.get(s);
+                            if (hq == null || hq.size() > 10) {i.remove();}
+                        }
+                    }
+                }
             }
@@ -265,7 +269,7 @@ public class HostBalancer implements Balancer {
             for (String nextHH: this.roundRobinHostHashes) {
                 HostQueue hq = this.queues.get(nextHH);
                 int delta = Latency.waitingRemainingGuessed(hq.getHost(), DigestURL.hosthash(hq.getHost(), hq.getPort()), robots, ClientIdentification.yacyInternetCrawlerAgent);
-                if (delta <= 10) {
+                if (delta <= 10 || this.roundRobinHostHashes.size() == 1) {
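+                    // when only one host remains in this round, pop from it even if its waiting time has not fully elapsed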
                     this.roundRobinHostHashes.remove(nextHH);
                     Request request = hq == null ? null : hq.pop(delay, cs, robots);
                     int size = hq == null ? 0 : hq.size();