@ -37,10 +37,8 @@ import de.anomic.kelondro.kelondroEcoTable;
import de.anomic.kelondro.kelondroIndex ;
import de.anomic.kelondro.kelondroRow ;
import de.anomic.kelondro.kelondroStack ;
import de.anomic.plasma.plasmaSwitchboard ;
import de.anomic.server.logging.serverLog ;
import de.anomic.yacy.yacySeedDB ;
import de.anomic.yacy.yacyURL ;
public class Balancer {
@ -48,9 +46,6 @@ public class Balancer {
private static final String indexSuffix = "9.db" ;
private static final int EcoFSBufferSize = 200 ;
// a shared domainAccess map for all balancers. the key is a domain-hash (6 bytes)
public static final ConcurrentHashMap < String , domaccess > domainAccess = new ConcurrentHashMap < String , domaccess > ( ) ;
// definition of payload for fileStack
private static final kelondroRow stackrow = new kelondroRow ( "byte[] urlhash-" + yacySeedDB . commonHashLength , kelondroBase64Order . enhancedCoder , 0 ) ;
@ -64,26 +59,11 @@ public class Balancer {
private final String stackname ;
private boolean top ; // to alternate between top and bottom of the file stack
private final boolean fullram ;
public static class domaccess {
public long time ;
public int count ;
public String host ;
public domaccess ( String host ) {
this . host = host ;
this . time = System . currentTimeMillis ( ) ;
this . count = 0 ;
}
public void update ( ) {
this . time = System . currentTimeMillis ( ) ;
this . count + + ;
}
public long flux ( long range ) {
return count > = 1000 ? range * Math . min ( 5000 , count ) / 1000 : range / ( 1000 - count ) ;
}
}
private long minimumLocalDelta ;
private long minimumGlobalDelta ;
public Balancer ( final File cachePath , final String stackname , final boolean fullram ) {
public Balancer ( final File cachePath , final String stackname , final boolean fullram ,
final long minimumLocalDelta , final long minimumGlobalDelta ) {
this . cacheStacksPath = cachePath ;
this . stackname = stackname ;
final File stackFile = new File ( cachePath , stackname + stackSuffix ) ;
@ -92,6 +72,8 @@ public class Balancer {
this . urlRAMStack = new ArrayList < String > ( ) ;
this . top = true ;
this . fullram = fullram ;
this . minimumLocalDelta = minimumLocalDelta ;
this . minimumGlobalDelta = minimumGlobalDelta ;
// create a stack for newly entered entries
if ( ! ( cachePath . exists ( ) ) ) cachePath . mkdir ( ) ; // make the path
@ -105,7 +87,7 @@ public class Balancer {
byte [ ] hash ;
while ( i . hasNext ( ) ) {
hash = i . next ( ) ;
pushHash ( new String ( hash ) ) ;
pushHash ToDomainStacks ( new String ( hash ) , true ) ;
}
} catch ( final IOException e ) {
e . printStackTrace ( ) ;
@ -113,8 +95,21 @@ public class Balancer {
}
}
public long getMinimumLocalDelta ( ) {
return this . minimumLocalDelta ;
}
public long getMinimumGlobalDelta ( ) {
return this . minimumGlobalDelta ;
}
public void setMinimumDelta ( final long minimumLocalDelta , final long minimumGlobalDelta ) {
this . minimumLocalDelta = minimumLocalDelta ;
this . minimumGlobalDelta = minimumGlobalDelta ;
}
public synchronized void close ( ) {
while ( domainStacksNotEmpty ( ) ) flushOnceDomStacks ( 0 , true ) ; // flush to ram, because the ram flush is optimized
while ( domainStacksNotEmpty ( ) ) flushOnceDomStacks ( 0 , true , false ); // flush to ram, because the ram flush is optimized
size ( ) ;
try { flushAllRamStack ( ) ; } catch ( final IOException e ) { }
if ( urlFileIndex ! = null ) {
@ -284,7 +279,13 @@ public class Balancer {
return sum ;
}
private void flushOnceDomStacks ( final int minimumleft , final boolean ram ) {
/ * *
* removes the head element of all domain stacks and moves the element in either the ram stack or the file stack
* @param minimumleft
* @param ram
* @param onlyReadyForAccess
* /
private void flushOnceDomStacks ( final int minimumleft , final boolean ram , final boolean onlyReadyForAccess ) {
// takes one entry from every domain stack and puts it on the ram or file stack
// the minimumleft value is a limit for the number of entries that should be left
if ( domainStacks . size ( ) = = 0 ) return ;
@ -296,6 +297,7 @@ public class Balancer {
entry = i . next ( ) ;
list = entry . getValue ( ) ;
if ( list . size ( ) > minimumleft ) {
if ( onlyReadyForAccess & & CrawlEntry . waitingRemainingGuessed ( list . getFirst ( ) , minimumLocalDelta , minimumGlobalDelta ) > 0 ) continue ;
if ( ram ) {
urlRAMStack . add ( list . removeFirst ( ) ) ;
} else try {
@ -319,6 +321,47 @@ public class Balancer {
urlFileStack . push ( urlFileStack . row ( ) . newEntry ( new byte [ ] [ ] { ( urlRAMStack . get ( urlRAMStack . size ( ) / 2 ) ) . getBytes ( ) } ) ) ;
}
private void shiftFileToDomStacks ( final int wantedsize ) {
int count = sizeDomainStacks ( ) - wantedsize ;
while ( ( urlFileStack ! = null ) & & ( count > 0 ) & & ( urlFileStack . size ( ) > 0 ) ) {
// flush some entries from disc to ram stack
try {
// one from the top:
kelondroRow . Entry t = urlFileStack . pop ( ) ;
if ( t = = null ) break ;
pushHashToDomainStacks ( new String ( t . getColBytes ( 0 ) ) , false ) ;
count - - ;
if ( urlFileStack . size ( ) = = 0 ) break ;
// one from the bottom:
t = urlFileStack . pot ( ) ;
if ( t = = null ) break ;
pushHashToDomainStacks ( new String ( t . getColBytes ( 0 ) ) , false ) ;
count - - ;
} catch ( final IOException e ) {
break ;
}
}
}
private void shiftFileToRAM ( final int wantedsize ) {
while ( ( urlFileStack ! = null ) & & ( urlRAMStack . size ( ) < = wantedsize ) & & ( urlFileStack . size ( ) > 0 ) ) {
// flush some entries from disc to ram stack
try {
// one from the top:
kelondroRow . Entry t = urlFileStack . pop ( ) ;
if ( t = = null ) break ;
urlRAMStack . add ( new String ( t . getColBytes ( 0 ) ) ) ;
if ( urlFileStack . size ( ) = = 0 ) break ;
// one from the bottom:
t = urlFileStack . pot ( ) ;
if ( t = = null ) break ;
urlRAMStack . add ( new String ( t . getColBytes ( 0 ) ) ) ;
} catch ( final IOException e ) {
break ;
}
}
}
public synchronized void push ( final CrawlEntry entry ) throws IOException {
assert entry ! = null ;
if ( urlFileIndex . has ( entry . url ( ) . hash ( ) . getBytes ( ) ) ) {
@ -330,10 +373,10 @@ public class Balancer {
urlFileIndex . put ( entry . toRow ( ) ) ;
// add the hash to a queue
pushHash ( entry . url ( ) . hash ( ) ) ;
pushHash ToDomainStacks ( entry . url ( ) . hash ( ) , true ) ;
}
private void pushHash ( final String ha sh) {
private void pushHash ToDomainStacks ( final String ha sh, boolean flu sh) {
// extend domain stack
final String dom = hash . substring ( 6 ) ;
LinkedList < String > domainList = domainStacks . get ( dom ) ;
@ -350,54 +393,91 @@ public class Balancer {
}
// check size of domainStacks and flush
if ( ( domainStacks . size ( ) > 100 ) | | ( sizeDomainStacks ( ) > 1000 ) ) {
flushOnceDomStacks ( 1 , urlRAMStack . size ( ) < 100 ); // when the ram stack is small, flush it there
if ( flush & & ( domainStacks . size ( ) > 100 ) | | ( sizeDomainStacks ( ) > 1000 ) ) {
flushOnceDomStacks ( 1 , urlRAMStack . size ( ) < 100 , true ); // when the ram stack is small, flush it there
}
}
public synchronized CrawlEntry pop ( final long minimumLocalDelta , final long minimumGlobalDelta , final long maximumAge ) throws IOException {
// returns a n url-hash from the stack and ensures minimum delta times
public synchronized CrawlEntry pop ( boolean delay ) throws IOException {
// returns a crawl entry from the stack and ensures minimum delta times
// we have 3 sources to choose from: the ramStack, the domainStacks and the fileStack
String result = null ; // the result
// 1st: check ramStack
if ( urlRAMStack . size ( ) > 0 ) {
result = urlRAMStack . remove ( 0 ) ;
//result = urlRAMStack.remove(0);
Iterator < String > i = urlRAMStack . iterator ( ) ;
String urlhash ;
long waitingtime , min = Long . MAX_VALUE ;
String besthash = null ;
while ( i . hasNext ( ) ) {
urlhash = i . next ( ) ;
waitingtime = CrawlEntry . waitingRemainingGuessed ( urlhash , minimumLocalDelta , minimumGlobalDelta ) ;
if ( waitingtime = = 0 ) {
// zero waiting is a good one
result = urlhash ;
i . remove ( ) ;
min = Long . MAX_VALUE ; // that causes that the if at the end of this loop is not used
besthash = null ;
break ;
}
if ( waitingtime < min ) {
min = waitingtime ;
besthash = urlhash ;
}
}
if ( min < = 500 & & besthash ! = null ) {
// find that entry that was best end remove it
i = urlRAMStack . iterator ( ) ;
while ( i . hasNext ( ) ) {
urlhash = i . next ( ) ;
if ( urlhash . equals ( besthash ) ) {
// zero waiting is a good one
result = urlhash ;
i . remove ( ) ;
break ;
}
}
}
}
// the next options use the domain stack. If this is not filled enough, they dont work at all
// so just fill them up with some stuff
if ( result = = null ) shiftFileToDomStacks ( 1000 ) ;
// 2nd-a: check domainStacks for latest arrivals
if ( ( result = = null ) & & ( domainStacks . size ( ) > 0 ) ) synchronized ( domainStacks ) {
// we select specific domains that have not been used for a long time
// i.e. 60 seconds. Latest arrivals that have not yet been crawled
// fit also in that scheme
// Latest arrivals that have not yet been crawled fit also in that scheme
final Iterator < Map . Entry < String , LinkedList < String > > > i = domainStacks . entrySet ( ) . iterator ( ) ;
Map . Entry < String , LinkedList < String > > entry ;
String domhash ;
long delta , maxdelta = 0 ;
String maxhash = null ;
long waitingtime, min = Long . MAX_VALUE ;
String best hash = null ;
LinkedList < String > domlist ;
while ( i . hasNext ( ) ) {
entry = i . next ( ) ;
domhash = entry . getKey ( ) ;
delta = lastAccessDelta ( domhash ) ;
if ( delta = = Integer . MAX_VALUE ) {
// a brand new domain - we take it
waitingtime = CrawlEntry . waitingRemainingGuessed ( domhash , minimumLocalDelta , minimumGlobalDelta ) ;
if ( waitingtime = = 0 ) {
// zero waiting is a good one
domlist = entry . getValue ( ) ;
result = domlist . removeFirst ( ) ;
if ( domlist . size ( ) = = 0 ) i . remove ( ) ;
min = Long . MAX_VALUE ; // that causes that the if at the end of this loop is not used
besthash = null ;
break ;
}
if ( delta > maxdelta ) {
maxdelta = delta ;
maxhash = domhash ;
if ( waitingtime < min ) {
m in = waitingtime ;
best hash = domhash ;
}
}
if ( maxdelta > maximumAge ) {
// success - we found an entry from a domain that has not been used for a long time
domlist = domainStacks . get ( maxhash ) ;
if ( min < = 500 & & besthash ! = null ) {
domlist = domainStacks . get ( besthash ) ;
result = domlist . removeFirst ( ) ;
if ( domlist . size ( ) = = 0 ) domainStacks . remove ( max hash) ;
if ( domlist . size ( ) = = 0 ) domainStacks . remove ( best hash) ;
}
}
@ -405,7 +485,7 @@ public class Balancer {
if ( ( result = = null ) & & ( domainStacks . size ( ) > 0 ) ) synchronized ( domainStacks ) {
// we order all domains by the number of entries per domain
// then we iterate through these domains in descending entry order
// and t hat that one, that has a delta > minimumDelta
// and t ake that one, that has a zero waiting time
final Iterator < Map . Entry < String , LinkedList < String > > > i = domainStacks . entrySet ( ) . iterator ( ) ;
Map . Entry < String , LinkedList < String > > entry ;
String domhash ;
@ -422,13 +502,13 @@ public class Balancer {
// now iterate in descending order and fetch that one,
// that is acceptable by the minimumDelta constraint
long delta ;
long waitingtime ;
String maxhash = null ;
while ( hitlist . size ( ) > 0 ) {
domhash = hitlist . remove ( hitlist . lastKey ( ) ) ;
if ( maxhash = = null ) maxhash = domhash ; // remember first entry
delta = lastAccessDelta ( domhash ) ;
if ( delta > ( ( yacyURL . isLocal ( domhash ) ) ? minimumLocalDelta : minimumGlobalDelta ) ) {
waitingtime = CrawlEntry . waitingRemainingGuessed ( domhash , minimumLocalDelta , minimumGlobalDelta ) ;
if ( waitingtime = = 0 ) {
domlist = domainStacks . get ( domhash ) ;
result = domlist . removeFirst ( ) ;
if ( domlist . size ( ) = = 0 ) domainStacks . remove ( domhash ) ;
@ -457,14 +537,13 @@ public class Balancer {
// check if the time after retrieval of last hash from same
// domain is not shorter than the minimumDelta
long delta = lastAccessDelta ( nexthash ) ;
if ( delta > ( ( yacyURL . isLocal ( nexthash ) ) ? minimumLocalDelta : minimumGlobalDelta ) ) {
long waitingtime = CrawlEntry . waitingRemainingGuessed ( nexthash , minimumLocalDelta , minimumGlobalDelta ) ;
if ( waitingtime = = 0 ) {
// the entry is fine
result = new String ( ( top ) ? urlFileStack . pop ( ) . getColBytes ( 0 ) : urlFileStack . pot ( ) . getColBytes ( 0 ) ) ;
} else {
// try other entry
result = new String ( ( top ) ? urlFileStack . pot ( ) . getColBytes ( 0 ) : urlFileStack . pop ( ) . getColBytes ( 0 ) ) ;
delta = lastAccessDelta ( result ) ;
}
}
top = ! top ; // alternate top/bottom
@ -477,83 +556,59 @@ public class Balancer {
}
// finally: check minimumDelta and if necessary force a sleep
final long delta = lastAccessDelta ( result ) ;
assert delta > = 0 : "delta = " + delta ;
final int s = urlFileIndex . size ( ) ;
final kelondroRow . Entry rowEntry = urlFileIndex . remove ( result . getBytes ( ) ) ;
assert ( rowEntry = = null ) | | ( urlFileIndex . size ( ) + 1 = = s ) : "urlFileIndex.size() = " + urlFileIndex . size ( ) + ", s = " + s + ", result = " + result ;
kelondroRow . Entry rowEntry = urlFileIndex . remove ( result . getBytes ( ) ) ;
if ( rowEntry = = null ) {
serverLog . logSevere ( "PLASMA BALANCER" , "get() found a valid urlhash, but failed to fetch the corresponding url entry - total size = " + size ( ) + ", fileStack.size() = " + urlFileStack . size ( ) + ", ramStack.size() = " + urlRAMStack . size ( ) + ", domainStacks.size() = " + domainStacks . size ( ) ) ;
return null ;
throw new IOException ( "get() found a valid urlhash, but failed to fetch the corresponding url entry - total size = " + size ( ) + ", fileStack.size() = " + urlFileStack . size ( ) + ", ramStack.size() = " + urlRAMStack . size ( ) + ", domainStacks.size() = " + domainStacks . size ( ) ) ;
}
assert urlFileIndex . size ( ) + 1 = = s : "urlFileIndex.size() = " + urlFileIndex . size ( ) + ", s = " + s + ", result = " + result ;
final CrawlEntry crawlEntry = new CrawlEntry ( rowEntry ) ;
final long genericDelta = ensureDelta ( result . substring ( 6 ) , crawlEntry , minimumLocalDelta , minimumGlobalDelta ) ;
if ( delta < genericDelta ) {
final CrawlEntry crawlEntry = new CrawlEntry ( rowEntry ) ;
long sleeptime = crawlEntry . waitingRemaining ( minimumLocalDelta , minimumGlobalDelta ) ;
if ( delay & & sleeptime > 0 ) {
// force a busy waiting here
// in best case, this should never happen if the balancer works propertly
// this is only to protection against the worst case, where the crawler could
// behave in a DoS-manner
final long sleeptime = genericDelta - delta ;
serverLog . logInfo ( "PLASMA BALANCER" , "forcing fetch delay of " + sleeptime + " millisecond for " + crawlEntry . url ( ) . getHost ( ) ) ;
try { synchronized ( this ) { this . wait ( sleeptime ) ; } } catch ( final InterruptedException e ) { }
}
// update statistical data
domaccess lastAccess = domainAccess . get ( result . substring ( 6 ) ) ;
if ( lastAccess = = null ) {
lastAccess = new domaccess ( crawlEntry . url ( ) . getHost ( ) ) ;
domainAccess . put ( result . substring ( 6 ) , lastAccess ) ;
} else {
lastAccess . update ( ) ;
}
crawlEntry . updateAccess ( ) ;
return crawlEntry ;
}
private long ensureDelta ( String hosthash , CrawlEntry crawlEntry , final long minimumLocalDelta , final long minimumGlobalDelta ) {
long deltaBase = ( yacyURL . isLocal ( hosthash ) ) ? minimumLocalDelta : minimumGlobalDelta ;
if ( crawlEntry . url ( ) . isCGI ( ) ) deltaBase = deltaBase * 2 ; // mostly there is a database access in the background which creates a lot of unwanted IO on target site
domaccess lastAccess = domainAccess . get ( hosthash ) ;
return Math . min (
60000 ,
Math . max (
deltaBase + ( ( lastAccess = = null ) ? 0 : lastAccess . flux ( deltaBase ) ) ,
plasmaSwitchboard . getSwitchboard ( ) . robots . crawlDelayMillis ( crawlEntry . url ( ) ) )
) ; // prevent that that robots file can stop our indexer completely
}
private long lastAccessDelta ( final String hash ) {
assert hash ! = null ;
final domaccess lastAccess = domainAccess . get ( ( hash . length ( ) > 6 ) ? hash . substring ( 6 ) : hash ) ;
if ( lastAccess = = null ) return Long . MAX_VALUE ; // never accessed
return System . currentTimeMillis ( ) - lastAccess . time ;
}
/ * *
* return top - elements from the crawl stack
* we do not produce here more entries than exist on the stack
* because otherwise the balancing does not work properly
* @param count
* @return
* @throws IOException
* /
public synchronized ArrayList < CrawlEntry > top ( int count ) throws IOException {
// if we need to flush anything, then flush the domain stack first,
// to avoid that new urls get hidden by old entries from the file stack
if ( urlRAMStack = = null ) return null ;
// ensure that the domain stacks are filled enough
shiftFileToDomStacks ( count ) ;
// flush from the domain stacks first until they are empty
if ( ( domainStacksNotEmpty ( ) ) & & ( urlRAMStack . size ( ) < = count ) ) {
flushOnceDomStacks ( 0 , true , true ) ;
}
while ( ( domainStacksNotEmpty ( ) ) & & ( urlRAMStack . size ( ) < = count ) ) {
// flush only that much as we need to display
flushOnceDomStacks ( 0 , true ) ;
}
while ( ( urlFileStack ! = null ) & & ( urlRAMStack . size ( ) < = count ) & & ( urlFileStack . size ( ) > 0 ) ) {
// flush some entries from disc to ram stack
try {
// one from the top:
kelondroRow . Entry t = urlFileStack . pop ( ) ;
if ( t = = null ) break ;
urlRAMStack . add ( new String ( t . getColBytes ( 0 ) ) ) ;
if ( urlFileStack . size ( ) = = 0 ) break ;
// one from the bottom:
t = urlFileStack . pot ( ) ;
if ( t = = null ) break ;
urlRAMStack . add ( new String ( t . getColBytes ( 0 ) ) ) ;
} catch ( final IOException e ) {
break ;
}
flushOnceDomStacks ( 0 , true , false ) ;
}
// if the ram is still not full enough, use the file stack
shiftFileToRAM ( count ) ;
// finally, construct a list using the urlRAMStack which was filled with this procedure
count = Math . min ( count , urlRAMStack . size ( ) ) ;
final ArrayList < CrawlEntry > list = new ArrayList < CrawlEntry > ( ) ;
for ( int i = 0 ; i < count ; i + + ) {