@ -25,13 +25,12 @@ package de.anomic.crawler;
import java.io.File ;
import java.io.File ;
import java.io.IOException ;
import java.io.IOException ;
import java.util.ArrayList ;
import java.util.ArrayList ;
import java.util.Collections ;
import java.util.HashMap ;
import java.util.HashSet ;
import java.util.HashSet ;
import java.util.Iterator ;
import java.util.Iterator ;
import java.util.LinkedList ;
import java.util.LinkedList ;
import java.util.Map ;
import java.util.Map ;
import java.util.TreeMap ;
import java.util.TreeMap ;
import java.util.concurrent.ConcurrentHashMap ;
import de.anomic.kelondro.kelondroBase64Order ;
import de.anomic.kelondro.kelondroBase64Order ;
import de.anomic.kelondro.kelondroEcoTable ;
import de.anomic.kelondro.kelondroEcoTable ;
@ -41,6 +40,7 @@ import de.anomic.kelondro.kelondroStack;
import de.anomic.plasma.plasmaSwitchboard ;
import de.anomic.plasma.plasmaSwitchboard ;
import de.anomic.server.logging.serverLog ;
import de.anomic.server.logging.serverLog ;
import de.anomic.yacy.yacySeedDB ;
import de.anomic.yacy.yacySeedDB ;
import de.anomic.yacy.yacyURL ;
public class Balancer {
public class Balancer {
@ -48,26 +48,29 @@ public class Balancer {
private static final String indexSuffix = "9.db" ;
private static final String indexSuffix = "9.db" ;
private static final int EcoFSBufferSize = 200 ;
private static final int EcoFSBufferSize = 200 ;
// a shared domainAccess map for all balancers
// a shared domainAccess map for all balancers . the key is a domain-hash (6 bytes)
p rivate static final Map< String , domaccess > domainAccess = Collections . synchronizedMap ( new HashMap< String , domaccess > ( ) ) ;
p ublic static final ConcurrentHash Map< String , domaccess > domainAccess = new Concurrent HashMap< String , domaccess > ( ) ;
// definition of payload for fileStack
// definition of payload for fileStack
private static final kelondroRow stackrow = new kelondroRow ( "byte[] urlhash-" + yacySeedDB . commonHashLength , kelondroBase64Order . enhancedCoder , 0 ) ;
private static final kelondroRow stackrow = new kelondroRow ( "byte[] urlhash-" + yacySeedDB . commonHashLength , kelondroBase64Order . enhancedCoder , 0 ) ;
// class variables
// class variables
private final ArrayList < String > urlRAMStack ; // a list that is flushed first
private final ConcurrentHashMap < String , LinkedList < String > >
private kelondroStack urlFileStack ; // a file with url hashes
domainStacks ; // a map from domain name part to Lists with url hashs
kelondroIndex urlFileIndex ;
private final ArrayList < String > urlRAMStack ; // a list that is flushed first
private final HashMap < String , LinkedList < String > > domainStacks ; // a map from domain name part to Lists with url hashs
private kelondroStack urlFileStack ; // a file with url hashes
private final File cacheStacksPath ;
private kelondroIndex urlFileIndex ;
private final String stackname ;
private final File cacheStacksPath ;
private boolean top ; // to alternate between top and bottom of the file stack
private final String stackname ;
private final boolean fullram ;
private boolean top ; // to alternate between top and bottom of the file stack
private final boolean fullram ;
public static class domaccess {
public static class domaccess {
long time ;
public long time ;
int count ;
public int count ;
public domaccess ( ) {
public String host ;
public domaccess ( String host ) {
this . host = host ;
this . time = System . currentTimeMillis ( ) ;
this . time = System . currentTimeMillis ( ) ;
this . count = 0 ;
this . count = 0 ;
}
}
@ -75,11 +78,8 @@ public class Balancer {
this . time = System . currentTimeMillis ( ) ;
this . time = System . currentTimeMillis ( ) ;
this . count + + ;
this . count + + ;
}
}
public long time ( ) {
public long flux ( long range ) {
return this . time ;
return count > = 1000 ? range * Math . min ( 5000 , count ) / 1000 : range / ( 1000 - count ) ;
}
public int count ( ) {
return this . count ;
}
}
}
}
@ -88,7 +88,7 @@ public class Balancer {
this . stackname = stackname ;
this . stackname = stackname ;
final File stackFile = new File ( cachePath , stackname + stackSuffix ) ;
final File stackFile = new File ( cachePath , stackname + stackSuffix ) ;
this . urlFileStack = kelondroStack . open ( stackFile , stackrow ) ;
this . urlFileStack = kelondroStack . open ( stackFile , stackrow ) ;
this . domainStacks = new HashMap< String , LinkedList < String > > ( ) ;
this . domainStacks = new Concurrent HashMap< String , LinkedList < String > > ( ) ;
this . urlRAMStack = new ArrayList < String > ( ) ;
this . urlRAMStack = new ArrayList < String > ( ) ;
this . top = true ;
this . top = true ;
this . fullram = fullram ;
this . fullram = fullram ;
@ -428,7 +428,7 @@ public class Balancer {
domhash = hitlist . remove ( hitlist . lastKey ( ) ) ;
domhash = hitlist . remove ( hitlist . lastKey ( ) ) ;
if ( maxhash = = null ) maxhash = domhash ; // remember first entry
if ( maxhash = = null ) maxhash = domhash ; // remember first entry
delta = lastAccessDelta ( domhash ) ;
delta = lastAccessDelta ( domhash ) ;
if ( delta > minimumGlobalDelta ) {
if ( delta > ( ( yacyURL . isLocal ( domhash ) ) ? minimumLocalDelta : minimumGlobalDelta ) ) {
domlist = domainStacks . get ( domhash ) ;
domlist = domainStacks . get ( domhash ) ;
result = domlist . removeFirst ( ) ;
result = domlist . removeFirst ( ) ;
if ( domlist . size ( ) = = 0 ) domainStacks . remove ( domhash ) ;
if ( domlist . size ( ) = = 0 ) domainStacks . remove ( domhash ) ;
@ -458,7 +458,7 @@ public class Balancer {
// check if the time after retrieval of last hash from same
// check if the time after retrieval of last hash from same
// domain is not shorter than the minimumDelta
// domain is not shorter than the minimumDelta
long delta = lastAccessDelta ( nexthash ) ;
long delta = lastAccessDelta ( nexthash ) ;
if ( delta > minimumGlobalDelta ) {
if ( delta > ( ( yacyURL . isLocal ( nexthash ) ) ? minimumLocalDelta : minimumGlobalDelta ) ) {
// the entry is fine
// the entry is fine
result = new String ( ( top ) ? urlFileStack . pop ( ) . getColBytes ( 0 ) : urlFileStack . pot ( ) . getColBytes ( 0 ) ) ;
result = new String ( ( top ) ? urlFileStack . pop ( ) . getColBytes ( 0 ) : urlFileStack . pot ( ) . getColBytes ( 0 ) ) ;
} else {
} else {
@ -487,18 +487,12 @@ public class Balancer {
return null ;
return null ;
}
}
assert urlFileIndex . size ( ) + 1 = = s : "urlFileIndex.size() = " + urlFileIndex . size ( ) + ", s = " + s + ", result = " + result ;
assert urlFileIndex . size ( ) + 1 = = s : "urlFileIndex.size() = " + urlFileIndex . size ( ) + ", s = " + s + ", result = " + result ;
final CrawlEntry crawlEntry = new CrawlEntry ( rowEntry ) ;
final CrawlEntry crawlEntry = new CrawlEntry ( rowEntry ) ;
final long genericDelta = Math . min (
final long genericDelta = ensureDelta ( result . substring ( 6 ) , crawlEntry , minimumLocalDelta , minimumGlobalDelta ) ;
15000 ,
Math . max (
( crawlEntry . url ( ) . isLocal ( ) ) ? minimumLocalDelta : minimumGlobalDelta ,
plasmaSwitchboard . getSwitchboard ( ) . robots . crawlDelayMillis ( crawlEntry . url ( ) ) )
) ; // prevent that that robots file can stop our indexer completely
if ( delta < genericDelta ) {
if ( delta < genericDelta ) {
// force a busy waiting here
// force a busy waiting here
// in best case, this should never happen if the balancer works propertly
// in best case, this should never happen if the balancer works propertly
// this is only to protect against the worst case, where the crawler could
// this is only to protection against the worst case, where the crawler could
// behave in a DoS-manner
// behave in a DoS-manner
final long sleeptime = genericDelta - delta ;
final long sleeptime = genericDelta - delta ;
try { synchronized ( this ) { this . wait ( sleeptime ) ; } } catch ( final InterruptedException e ) { }
try { synchronized ( this ) { this . wait ( sleeptime ) ; } } catch ( final InterruptedException e ) { }
@ -506,17 +500,33 @@ public class Balancer {
// update statistical data
// update statistical data
domaccess lastAccess = domainAccess . get ( result . substring ( 6 ) ) ;
domaccess lastAccess = domainAccess . get ( result . substring ( 6 ) ) ;
if ( lastAccess = = null ) lastAccess = new domaccess ( ) ; else lastAccess . update ( ) ;
if ( lastAccess = = null ) {
domainAccess . put ( result . substring ( 6 ) , lastAccess ) ;
lastAccess = new domaccess ( crawlEntry . url ( ) . getHost ( ) ) ;
domainAccess . put ( result . substring ( 6 ) , lastAccess ) ;
} else {
lastAccess . update ( ) ;
}
return crawlEntry ;
return crawlEntry ;
}
}
private long ensureDelta ( String hosthash , CrawlEntry crawlEntry , final long minimumLocalDelta , final long minimumGlobalDelta ) {
long deltaBase = ( yacyURL . isLocal ( hosthash ) ) ? minimumLocalDelta : minimumGlobalDelta ;
if ( crawlEntry . url ( ) . isCGI ( ) ) deltaBase = deltaBase * 2 ; // mostly there is a database access in the background which creates a lot of unwanted IO on target site
domaccess lastAccess = domainAccess . get ( hosthash ) ;
return Math . min (
60000 ,
Math . max (
deltaBase + ( ( lastAccess = = null ) ? 0 : lastAccess . flux ( deltaBase ) ) ,
plasmaSwitchboard . getSwitchboard ( ) . robots . crawlDelayMillis ( crawlEntry . url ( ) ) )
) ; // prevent that that robots file can stop our indexer completely
}
private long lastAccessDelta ( final String hash ) {
private long lastAccessDelta ( final String hash ) {
assert hash ! = null ;
assert hash ! = null ;
final domaccess lastAccess = domainAccess . get ( ( hash . length ( ) > 6 ) ? hash . substring ( 6 ) : hash ) ;
final domaccess lastAccess = domainAccess . get ( ( hash . length ( ) > 6 ) ? hash . substring ( 6 ) : hash ) ;
if ( lastAccess = = null ) return Long . MAX_VALUE ; // never accessed
if ( lastAccess = = null ) return Long . MAX_VALUE ; // never accessed
return System . currentTimeMillis ( ) - lastAccess . time ( ) ;
return System . currentTimeMillis ( ) - lastAccess . time ;
}
}
public synchronized ArrayList < CrawlEntry > top ( int count ) throws IOException {
public synchronized ArrayList < CrawlEntry > top ( int count ) throws IOException {