@ -34,6 +34,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue ;
import de.anomic.crawler.retrieval.Request ;
import de.anomic.http.client.Cache ;
import de.anomic.kelondro.index.Row ;
import de.anomic.kelondro.index.ObjectIndex ;
import de.anomic.kelondro.order.CloneableIterator ;
@ -107,7 +108,7 @@ public class Balancer {
}
}
public synchronized Request get ( final String urlhash ) throws IOException {
public Request get ( final String urlhash ) throws IOException {
assert urlhash ! = null ;
if ( urlFileIndex = = null ) return null ; // case occurs during shutdown
final Row . Entry entry = urlFileIndex . get ( urlhash . getBytes ( ) ) ;
@ -189,7 +190,7 @@ public class Balancer {
return removedCounter ;
}
public synchronized boolean has ( final String urlhash ) {
public boolean has ( final String urlhash ) {
return urlFileIndex . has ( urlhash . getBytes ( ) ) ;
}
@ -305,17 +306,29 @@ public class Balancer {
long sleeptime = 0 ;
Request crawlEntry = null ;
synchronized ( this ) {
String failhash = null ;
while ( this . urlFileIndex . size ( ) > 0 ) {
// first simply take one of the entries in the top list, that should be one without any delay
String result = nextFromDelayed ( ) ;
if ( result = = null & & this . top . size ( ) > 0 ) result = top . remove ( ) ;
String nexthash = nextFromDelayed ( ) ;
//System.out.println("*** nextFromDelayed=" + nexthash);
if ( nexthash = = null & & this . top . size ( ) > 0 ) {
nexthash = top . remove ( ) ;
//System.out.println("*** top.remove()=" + nexthash);
}
// check minimumDelta and if necessary force a sleep
//final int s = urlFileIndex.size();
Row . Entry rowEntry = ( result = = null ) ? null : urlFileIndex . remove ( result . getBytes ( ) ) ;
Row . Entry rowEntry = ( nexthash = = null ) ? null : urlFileIndex . remove ( nexthash . getBytes ( ) ) ;
if ( rowEntry = = null ) {
//System.out.println("*** rowEntry=null, nexthash=" + nexthash);
rowEntry = urlFileIndex . removeOne ( ) ;
result = ( rowEntry = = null ) ? null : new String ( rowEntry . getPrimaryKeyBytes ( ) ) ;
if ( rowEntry = = null ) {
nexthash = null ;
} else {
nexthash = new String ( rowEntry . getPrimaryKeyBytes ( ) ) ;
//System.out.println("*** rowEntry.getPrimaryKeyBytes()=" + nexthash);
}
}
if ( rowEntry = = null ) {
Log . logWarning ( "Balancer" , "removeOne() failed - size = " + this . size ( ) ) ;
@ -334,18 +347,28 @@ public class Balancer {
return null ;
}
// depending on the caching policy we need sleep time to avoid DoS-like situations
sleeptime = ( profileEntry . cacheStrategy ( ) = = CrawlProfile . CACHE_STRATEGY_CACHEONLY ) ? 0 : Latency . waitingRemaining ( crawlEntry . url ( ) , minimumLocalDelta , minimumGlobalDelta ) ; // this uses the robots.txt database and may cause a loading of robots.txt from the server
sleeptime = (
profileEntry . cacheStrategy ( ) = = CrawlProfile . CACHE_STRATEGY_CACHEONLY | |
( profileEntry . cacheStrategy ( ) = = CrawlProfile . CACHE_STRATEGY_IFEXIST & & Cache . has ( crawlEntry . url ( ) ) )
) ? 0 : Latency . waitingRemaining ( crawlEntry . url ( ) , minimumLocalDelta , minimumGlobalDelta ) ; // this uses the robots.txt database and may cause a loading of robots.txt from the server
assert nexthash . equals ( new String ( rowEntry . getPrimaryKeyBytes ( ) ) ) : "result = " + nexthash + ", rowEntry.getPrimaryKeyBytes() = " + new String ( rowEntry . getPrimaryKeyBytes ( ) ) ;
assert nexthash . equals ( crawlEntry . url ( ) . hash ( ) ) : "result = " + nexthash + ", crawlEntry.url().hash() = " + crawlEntry . url ( ) . hash ( ) ;
assert result . equals ( new String ( rowEntry . getPrimaryKeyBytes ( ) ) ) : "result = " + result + ", rowEntry.getPrimaryKeyBytes() = " + new String ( rowEntry . getPrimaryKeyBytes ( ) ) ;
assert result . equals ( crawlEntry . url ( ) . hash ( ) ) : "result = " + result + ", crawlEntry.url().hash() = " + crawlEntry . url ( ) . hash ( ) ;
if ( this . domainStacks . size ( ) < = 1 ) break ;
if ( failhash ! = null & & failhash . equals ( nexthash ) ) break ; // prevent endless loops
if ( delay & & sleeptime > 0 ) {
//System.out.println("*** putback: nexthash=" + nexthash + ", failhash="+failhash);
// put that thing back to omit a delay here
this . delayed . put ( new Long ( System . currentTimeMillis ( ) + sleeptime + 1 ) , result ) ;
if ( ! delayed . values ( ) . contains ( nexthash ) ) {
//System.out.println("*** delayed +=" + nexthash);
this . delayed . put ( new Long ( System . currentTimeMillis ( ) + sleeptime + 1 ) , nexthash ) ;
}
this . urlFileIndex . put ( rowEntry ) ;
this . domainStacks . remove ( result . substring ( 6 ) ) ;
continue ;
this . domainStacks . remove ( nexthash . substring ( 6 ) ) ;
failhash = nexthash ;
continue ;
}
break ;
}