@ -29,71 +29,46 @@ import java.util.HashSet;
import java.util.Iterator ;
import java.util.LinkedList ;
import java.util.Map ;
import java.util.TreeMap ;
import java.util.concurrent.ConcurrentHashMap ;
import java.util.concurrent.ConcurrentLinkedQueue ;
import de.anomic.kelondro.index.Row ;
import de.anomic.kelondro.index.ObjectIndex ;
import de.anomic.kelondro.order. Base64Orde r;
import de.anomic.kelondro.order. CloneableIterato r;
import de.anomic.kelondro.table.EcoTable ;
import de.anomic.kelondro.table.Stack ;
import de.anomic.kelondro.util.Log ;
import de.anomic.yacy.yacySeedDB ;
public class Balancer {
private static final String stackSuffix = "9.stack" ;
private static final String indexSuffix = "9.db" ;
private static final int EcoFSBufferSize = 200 ;
// definition of payload for fileStack
private static final Row stackrow = new Row ( "byte[] urlhash-" + yacySeedDB . commonHashLength , Base64Order . enhancedCoder ) ;
// class variables
private final ConcurrentHashMap < String , LinkedList < String > >
domainStacks ; // a map from domain name part to Lists with url hashs
private final ArrayList < String > urlRAMStack ; // a list that is flushed first
private Stack urlFileStack ; // a file with url hashes
private ObjectIndex urlFileIndex ;
private final File cacheStacksPath ;
private final String stackname ;
private boolean top ; // to alternate between top and bottom of the file stack
private long minimumLocalDelta ;
private long minimumGlobalDelta ;
private long lastPrepare ;
domainStacks ; // a map from domain name part to Lists with url hashs
private ConcurrentLinkedQueue < String >
top ;
private ObjectIndex urlFileIndex ;
private final File cacheStacksPath ;
private long minimumLocalDelta ;
private long minimumGlobalDelta ;
private int profileErrors ;
public Balancer ( final File cachePath , final String stackname , final boolean fullram ,
final long minimumLocalDelta , final long minimumGlobalDelta ) {
this . cacheStacksPath = cachePath ;
this . stackname = stackname ;
final File stackFile = new File ( cachePath , stackname + stackSuffix ) ;
this . urlFileStack = Stack . open ( stackFile , stackrow ) ;
this . domainStacks = new ConcurrentHashMap < String , LinkedList < String > > ( ) ;
this . urlRAMStack = new ArrayList < String > ( ) ;
this . top = true ;
this . top = new ConcurrentLinkedQueue < String > ( ) ;
this . minimumLocalDelta = minimumLocalDelta ;
this . minimumGlobalDelta = minimumGlobalDelta ;
this . lastPrepare = System . currentTimeMillis ( ) ;
// create a stack for newly entered entries
if ( ! ( cachePath . exists ( ) ) ) cachePath . mkdir ( ) ; // make the path
cacheStacksPath . mkdirs ( ) ;
urlFileIndex = new EcoTable ( new File ( cacheStacksPath , stackname + indexSuffix ) , CrawlEntry . rowdef , ( fullram ) ? EcoTable . tailCacheUsageAuto : EcoTable . tailCacheDenyUsage , EcoFSBufferSize , 0 ) ;
if ( urlFileStack . size ( ) ! = urlFileIndex . size ( ) | | ( urlFileIndex . size ( ) < 10000 & & urlFileIndex . size ( ) > 0 ) ) {
// fix the file stack
Log . logInfo ( "Balancer" , "re-creating the " + stackname + " balancer stack, size = " + urlFileIndex . size ( ) + ( ( urlFileStack . size ( ) = = urlFileIndex . size ( ) ) ? "" : " (the old stack size was wrong)" ) ) ;
urlFileStack . clear ( ) ;
try {
final Iterator < byte [ ] > i = urlFileIndex . keys ( true , null ) ;
byte [ ] hash ;
while ( i ! = null & & i . hasNext ( ) ) {
hash = i . next ( ) ;
pushHashToDomainStacks ( new String ( hash ) , true ) ;
}
} catch ( final IOException e ) {
e . printStackTrace ( ) ;
}
}
File f = new File ( cacheStacksPath , stackname + indexSuffix ) ;
urlFileIndex = new EcoTable ( f , CrawlEntry . rowdef , ( fullram ) ? EcoTable . tailCacheUsageAuto : EcoTable . tailCacheDenyUsage , EcoFSBufferSize , 0 ) ;
profileErrors = 0 ;
Log . logInfo ( "Balancer" , "opened balancer file with " + urlFileIndex . size ( ) + " entries from " + f . toString ( ) ) ;
}
public long getMinimumLocalDelta ( ) {
@ -110,35 +85,21 @@ public class Balancer {
}
public synchronized void close ( ) {
while ( domainStacksNotEmpty ( ) ) flushOnceDomStacks ( true , false , Integer . MAX_VALUE ) ; // flush to ram, because the ram flush is optimized
size ( ) ;
try { flushAllRamStack ( ) ; } catch ( final IOException e ) { }
if ( urlFileIndex ! = null ) {
urlFileIndex . close ( ) ;
urlFileIndex = null ;
}
if ( urlFileStack ! = null ) {
urlFileStack . close ( ) ;
urlFileStack = null ;
}
}
protected void finalize ( ) {
if ( urlFileStack ! = null ) {
Log . logWarning ( "Balancer" , "crawl stack " + stackname + " closed by finalizer" ) ;
close ( ) ;
}
}
public synchronized void clear ( ) {
Log . logInfo ( "Balancer" , "cleaing balancer with " + urlFileIndex . size ( ) + " entries from " + urlFileIndex . filename ( ) ) ;
try {
urlFileIndex . clear ( ) ;
} catch ( IOException e ) {
e . printStackTrace ( ) ;
}
urlFileStack . clear ( ) ;
domainStacks . clear ( ) ;
urlRAMStack . clear ( ) ;
top . clear ( ) ;
}
public synchronized CrawlEntry get ( final String urlhash ) throws IOException {
@ -189,38 +150,28 @@ public class Balancer {
}
if ( removedCounter = = 0 ) return 0 ;
assert urlFileIndex . size ( ) + removedCounter = = s : "urlFileIndex.size() = " + urlFileIndex . size ( ) + ", s = " + s ;
// now delete these hashes also from the queues
// iterate through the RAM stack
Iterator < String > i = urlRAMStack . iterator ( ) ;
String h ;
while ( i . hasNext ( ) ) {
h = i . next ( ) ;
if ( urlHashes . contains ( h ) ) i . remove ( ) ;
}
// iterate through the file stack
// in general this is a bad idea. But this can only be avoided by avoidance of this method
final Iterator < Row . Entry > j = urlFileStack . stackIterator ( true ) ;
while ( j . hasNext ( ) ) {
h = new String ( j . next ( ) . getColBytes ( 0 ) ) ;
if ( urlHashes . contains ( h ) ) j . remove ( ) ;
}
// iterate through the domain stacks
final Iterator < Map . Entry < String , LinkedList < String > > > k = domainStacks . entrySet ( ) . iterator ( ) ;
Map . Entry < String , LinkedList < String > > se ;
LinkedList < String > stack ;
while ( k . hasNext ( ) ) {
se = k . next ( ) ;
stack = se . getValue ( ) ;
i = stack . iterator ( ) ;
while ( i . hasNext ( ) ) {
if ( urlHashes . contains ( i . next ( ) ) ) i . remove ( ) ;
}
if ( stack . size ( ) = = 0 ) k . remove ( ) ;
}
// iterate through the top list
Iterator < String > j = top . iterator ( ) ;
String urlhash ;
while ( j . hasNext ( ) ) {
urlhash = j . next ( ) ;
if ( urlHashes . contains ( urlhash ) ) j . remove ( ) ;
}
// iterate through the domain stacks
final Iterator < Map . Entry < String , LinkedList < String > > > k = domainStacks . entrySet ( ) . iterator ( ) ;
Map . Entry < String , LinkedList < String > > se ;
LinkedList < String > stack ;
while ( k . hasNext ( ) ) {
se = k . next ( ) ;
stack = se . getValue ( ) ;
Iterator < String > i = stack . iterator ( ) ;
while ( i . hasNext ( ) ) {
if ( urlHashes . contains ( i . next ( ) ) ) i . remove ( ) ;
}
if ( stack . size ( ) = = 0 ) k . remove ( ) ;
}
return removedCounter ;
}
@ -232,21 +183,11 @@ public class Balancer {
public synchronized boolean notEmpty ( ) {
// alternative method to the property size() > 0
// this is better because it may avoid synchronized access to domain stack summarization
return urlRAMStack. size ( ) > 0 | | urlFileStack . size ( ) > 0 | | domainStacksNotEmpty( ) ;
return domainStacksNotEmpty( ) ;
}
public synchronized int size ( ) {
final int componentsize = urlFileIndex . size ( ) ;
/ *
assert componentsize = = urlFileStack . size ( ) + urlRAMStack . size ( ) + sizeDomainStacks ( ) :
"size wrong in " + stackname +
" - urlFileIndex = " + urlFileIndex . size ( ) +
", componentsize = " + urlFileStack . size ( ) + urlRAMStack . size ( ) + sizeDomainStacks ( ) +
" = (urlFileStack = " + urlFileStack . size ( ) +
", urlRAMStack = " + urlRAMStack . size ( ) +
", sizeDomainStacks = " + sizeDomainStacks ( ) + ")" ;
* /
return componentsize ;
return urlFileIndex . size ( ) ;
}
private boolean domainStacksNotEmpty ( ) {
@ -260,102 +201,10 @@ public class Balancer {
return false ;
}
private int sizeDomainStacks ( ) {
if ( domainStacks = = null ) return 0 ;
int sum = 0 ;
//synchronized (domainStacks) {
final Iterator < LinkedList < String > > i = domainStacks . values ( ) . iterator ( ) ;
while ( i . hasNext ( ) ) sum + = i . next ( ) . size ( ) ;
//}
return sum ;
}
/ * *
* removes the head element of all domain stacks and moves the element in either the ram stack or the file stack
* @param minimumleft
* @param ram
* @param onlyReadyForAccess
* /
private void flushOnceDomStacks ( final boolean ram , final boolean onlyReadyForAccess , int max ) {
// takes one entry from every domain stack and puts it on the ram or file stack
// the minimumleft value is a limit for the number of entries that should be left
if ( domainStacks . size ( ) = = 0 ) return ;
synchronized ( domainStacks ) {
final Iterator < Map . Entry < String , LinkedList < String > > > i = domainStacks . entrySet ( ) . iterator ( ) ;
Map . Entry < String , LinkedList < String > > entry ;
LinkedList < String > list ;
int c = 0 ;
while ( i . hasNext ( ) & & c < max ) {
entry = i . next ( ) ;
list = entry . getValue ( ) ;
if ( onlyReadyForAccess & & Latency . waitingRemainingGuessed ( list . getFirst ( ) , minimumLocalDelta , minimumGlobalDelta ) > 0 ) continue ;
if ( ram ) {
urlRAMStack . add ( list . removeFirst ( ) ) ;
} else try {
urlFileStack . push ( urlFileStack . row ( ) . newEntry ( new byte [ ] [ ] { ( list . removeFirst ( ) ) . getBytes ( ) } ) ) ;
} catch ( final IOException e ) {
e . printStackTrace ( ) ;
}
if ( list . size ( ) = = 0 ) i . remove ( ) ;
c + + ;
}
}
}
private void flushAllRamStack ( ) throws IOException {
// this flushes only the ramStack to the fileStack, but does not flush the domainStacks
for ( int i = 0 ; i < urlRAMStack . size ( ) / 2 ; i + + ) {
urlFileStack . push ( urlFileStack . row ( ) . newEntry ( new byte [ ] [ ] { ( urlRAMStack . get ( i ) ) . getBytes ( ) } ) ) ;
urlFileStack . push ( urlFileStack . row ( ) . newEntry ( new byte [ ] [ ] { ( urlRAMStack . get ( urlRAMStack . size ( ) - i - 1 ) ) . getBytes ( ) } ) ) ;
}
if ( urlRAMStack . size ( ) % 2 ! = 0 )
urlFileStack . push ( urlFileStack . row ( ) . newEntry ( new byte [ ] [ ] { ( urlRAMStack . get ( urlRAMStack . size ( ) / 2 ) ) . getBytes ( ) } ) ) ;
}
private void shiftFileToDomStacks ( final int wantedsize ) {
int count = sizeDomainStacks ( ) - wantedsize ;
while ( ( urlFileStack ! = null ) & & ( count > 0 ) & & ( urlFileStack . size ( ) > 0 ) ) {
// flush some entries from disc to ram stack
try {
// one from the top:
Row . Entry t = urlFileStack . pop ( ) ;
if ( t = = null ) break ;
pushHashToDomainStacks ( new String ( t . getColBytes ( 0 ) ) , false ) ;
count - - ;
if ( urlFileStack . size ( ) = = 0 ) break ;
// one from the bottom:
t = urlFileStack . pot ( ) ;
if ( t = = null ) break ;
pushHashToDomainStacks ( new String ( t . getColBytes ( 0 ) ) , false ) ;
count - - ;
} catch ( final IOException e ) {
break ;
}
}
}
private void shiftFileToRAM ( final int wantedsize ) {
while ( ( urlFileStack ! = null ) & & ( urlRAMStack . size ( ) < = wantedsize ) & & ( urlFileStack . size ( ) > 0 ) ) {
// flush some entries from disc to ram stack
try {
// one from the top:
Row . Entry t = urlFileStack . pop ( ) ;
if ( t = = null ) break ;
urlRAMStack . add ( new String ( t . getColBytes ( 0 ) ) ) ;
if ( urlFileStack . size ( ) = = 0 ) break ;
// one from the bottom:
t = urlFileStack . pot ( ) ;
if ( t = = null ) break ;
urlRAMStack . add ( new String ( t . getColBytes ( 0 ) ) ) ;
} catch ( final IOException e ) {
break ;
}
}
}
public synchronized void push ( final CrawlEntry entry ) throws IOException {
assert entry ! = null ;
if ( urlFileIndex . has ( entry . url ( ) . hash ( ) . getBytes ( ) ) ) {
String hash = entry . url ( ) . hash ( ) ;
if ( urlFileIndex . has ( hash . getBytes ( ) ) ) {
//Log.logWarning("BALANCER", "double-check has failed for urlhash " + entry.url().hash() + " in " + stackname + " - fixed");
return ;
}
@ -363,13 +212,14 @@ public class Balancer {
// add to index
int s = urlFileIndex . size ( ) ;
urlFileIndex . put ( entry . toRow ( ) ) ;
assert s < urlFileIndex . size ( ) ;
assert s < urlFileIndex . size ( ) : "hash = " + hash ;
assert urlFileIndex . has ( hash . getBytes ( ) ) : "hash = " + hash ;
// add the hash to a queue
pushHashToDomainStacks ( entry . url ( ) . hash ( ) , true ) ;
pushHashToDomainStacks ( entry . url ( ) . hash ( ) , 10 ) ;
}
private void pushHashToDomainStacks ( final String hash , boolean flush ) {
private void pushHashToDomainStacks ( final String hash , int maxstacksize ) {
// extend domain stack
final String dom = hash . substring ( 6 ) ;
LinkedList < String > domainList = domainStacks . get ( dom ) ;
@ -382,12 +232,21 @@ public class Balancer {
}
} else {
// extend existent domain list
domainList . addLast ( hash ) ;
if ( domainList . size ( ) < maxstacksize ) domainList . addLast ( hash ) ;
}
// check size of domainStacks and flush
if ( flush & & ( domainStacks . size ( ) > 100 ) | | ( sizeDomainStacks ( ) > 1000 ) ) {
flushOnceDomStacks ( urlRAMStack . size ( ) < 100 , true , 100 ) ; // when the ram stack is small, flush it there
}
private void removeHashFromDomainStacks ( final String hash ) {
// extend domain stack
final String dom = hash . substring ( 6 ) ;
LinkedList < String > domainList = domainStacks . get ( dom ) ;
if ( domainList = = null ) return ;
Iterator < String > i = domainList . iterator ( ) ;
while ( i . hasNext ( ) ) {
if ( i . next ( ) . equals ( hash ) ) {
i . remove ( ) ;
return ;
}
}
}
@ -404,178 +263,42 @@ public class Balancer {
* /
public synchronized CrawlEntry pop ( boolean delay , CrawlProfile profile ) throws IOException {
// returns a crawl entry from the stack and ensures minimum delta times
// we have 3 sources to choose from: the ramStack, the domainStacks and the fileStack
filltop ( delay , 600000 , false ) ;
filltop ( delay , 60000 , false ) ;
filltop ( delay , 10000 , false ) ;
filltop ( delay , 6000 , false ) ;
filltop ( delay , 3000 , false ) ;
filltop ( delay , 1000 , false ) ;
filltop ( delay , 0 , true ) ;
String result = null ; // the result
// 1st: check ramStack
if ( urlRAMStack . size ( ) > 0 ) {
//result = urlRAMStack.remove(0);
Iterator < String > i = urlRAMStack . iterator ( ) ;
String urlhash ;
long waitingtime , min = Long . MAX_VALUE ;
String besthash = null ;
while ( i . hasNext ( ) ) {
urlhash = i . next ( ) ;
waitingtime = Latency . waitingRemainingGuessed ( urlhash , minimumLocalDelta , minimumGlobalDelta ) ;
if ( waitingtime = = 0 ) {
// zero waiting is a good one
result = urlhash ;
i . remove ( ) ;
min = Long . MAX_VALUE ; // that causes that the if at the end of this loop is not used
besthash = null ;
break ;
}
if ( waitingtime < min ) {
min = waitingtime ;
besthash = urlhash ;
}
}
if ( min < = 500 & & besthash ! = null ) {
// find that entry that was best end remove it
i = urlRAMStack . iterator ( ) ;
while ( i . hasNext ( ) ) {
urlhash = i . next ( ) ;
if ( urlhash . equals ( besthash ) ) {
// zero waiting is a good one
result = urlhash ;
i . remove ( ) ;
break ;
}
}
}
}
// the next options use the domain stack. If this is not filled enough, they dont work at all
// so just fill them up with some stuff
if ( result = = null ) shiftFileToDomStacks ( 1000 ) ;
// 2nd-b: check domainStacks for best match between stack size and retrieval time
String maxhash = null ;
if ( ( result = = null ) & & ( domainStacks . size ( ) > 0 ) ) synchronized ( domainStacks ) {
// we order all domains by the number of entries per domain
// then we iterate through these domains in descending entry order
// and take that one, that has a zero waiting time
final Iterator < Map . Entry < String , LinkedList < String > > > i = domainStacks . entrySet ( ) . iterator ( ) ;
Map . Entry < String , LinkedList < String > > entry ;
String domhash ;
LinkedList < String > domlist ;
final TreeMap < Integer , String > hitlist = new TreeMap < Integer , String > ( ) ;
int count = 0 ;
// first collect information about sizes of the domain lists
while ( i . hasNext ( ) ) {
entry = i . next ( ) ;
domhash = entry . getKey ( ) ;
domlist = entry . getValue ( ) ;
hitlist . put ( Integer . valueOf ( domlist . size ( ) * 100 + count + + ) , domhash ) ;
}
// now iterate in descending order and fetch that one,
// that is acceptable by the minimumDelta constraint
long waitingtime ;
while ( hitlist . size ( ) > 0 ) {
domhash = hitlist . remove ( hitlist . lastKey ( ) ) ;
if ( maxhash = = null ) maxhash = domhash ; // remember first entry
waitingtime = Latency . waitingRemainingGuessed ( domhash , minimumLocalDelta , minimumGlobalDelta ) ;
if ( waitingtime < 100 ) {
domlist = domainStacks . get ( domhash ) ;
result = domlist . removeFirst ( ) ;
if ( domlist . size ( ) = = 0 ) domainStacks . remove ( domhash ) ;
break ;
}
}
}
// 2nd-a: check domainStacks for latest arrivals
if ( ( result = = null ) & & ( domainStacks . size ( ) > 0 ) ) synchronized ( domainStacks ) {
// we select specific domains that have not been used for a long time
// Latest arrivals that have not yet been crawled fit also in that scheme
final Iterator < Map . Entry < String , LinkedList < String > > > i = domainStacks . entrySet ( ) . iterator ( ) ;
Map . Entry < String , LinkedList < String > > entry ;
String domhash ;
long waitingtime , min = Long . MAX_VALUE ;
String besthash = null ;
LinkedList < String > domlist ;
while ( i . hasNext ( ) ) {
entry = i . next ( ) ;
domhash = entry . getKey ( ) ;
waitingtime = Latency . waitingRemainingGuessed ( domhash , minimumLocalDelta , minimumGlobalDelta ) ;
if ( waitingtime = = 0 ) {
// zero waiting is a good one
domlist = entry . getValue ( ) ;
result = domlist . removeFirst ( ) ;
if ( domlist . size ( ) = = 0 ) i . remove ( ) ;
min = Long . MAX_VALUE ; // that causes that the if at the end of this loop is not used
besthash = null ;
break ;
}
if ( waitingtime < min ) {
min = waitingtime ;
besthash = domhash ;
}
}
if ( min < = 500 & & besthash ! = null ) {
domlist = domainStacks . get ( besthash ) ;
result = domlist . removeFirst ( ) ;
if ( domlist . size ( ) = = 0 ) domainStacks . remove ( besthash ) ;
}
}
// 2nd-c: if we did yet not choose any entry, we simply take that one with the most entries
if ( ( result = = null ) & & ( maxhash ! = null ) ) {
LinkedList < String > domlist = domainStacks . get ( maxhash ) ;
if ( domlist ! = null ) {
result = domlist . removeFirst ( ) ;
if ( domlist . size ( ) = = 0 ) domainStacks . remove ( maxhash ) ;
}
}
// 3rd: take entry from file
if ( ( result = = null ) & & ( urlFileStack . size ( ) > 0 ) ) {
Row . Entry nextentry = ( top ) ? urlFileStack . top ( ) : urlFileStack . bot ( ) ;
if ( nextentry = = null ) nextentry = ( top ) ? urlFileStack . bot ( ) : urlFileStack . top ( ) ;
if ( nextentry = = null ) {
// emergency case: this means that something with the stack organization is wrong
// the file appears to be broken. We kill the file.
urlFileStack . clear ( ) ;
Log . logSevere ( "BALANCER" , "get() failed to fetch entry from file stack. reset stack file." ) ;
} else {
final String nexthash = new String ( nextentry . getColBytes ( 0 ) ) ;
// check if the time after retrieval of last hash from same
// domain is not shorter than the minimumDelta
long waitingtime = Latency . waitingRemainingGuessed ( nexthash , minimumLocalDelta , minimumGlobalDelta ) ;
if ( waitingtime = = 0 ) {
// the entry is fine
result = new String ( ( top ) ? urlFileStack . pop ( ) . getColBytes ( 0 ) : urlFileStack . pot ( ) . getColBytes ( 0 ) ) ;
} else {
// try other entry
result = new String ( ( top ) ? urlFileStack . pot ( ) . getColBytes ( 0 ) : urlFileStack . pop ( ) . getColBytes ( 0 ) ) ;
}
}
top = ! top ; // alternate top/bottom
}
// check case where we did not found anything
if ( result = = null ) {
Log . logSevere ( "BALANCER" , "get() was not able to find a valid urlhash - total size = " + size ( ) + ", fileStack.size() = " + urlFileStack . size ( ) + ", ramStack.size() = " + urlRAMStack . size ( ) + ", domainStacks.size() = " + domainStacks . size ( ) ) ;
return null ;
// first simply take one of the entries in the top list, that should be one without any delay
if ( this . top . size ( ) > 0 ) {
result = top . remove ( ) ;
}
// finally: check minimumDelta and if necessary force a sleep
final int s = urlFileIndex . size ( ) ;
Row . Entry rowEntry = urlFileIndex . remove ( result . getBytes ( ) ) ;
Row . Entry rowEntry = ( result = = null ) ? null : urlFileIndex . remove ( result . getBytes ( ) ) ;
if ( rowEntry = = null ) rowEntry = urlFileIndex . removeOne ( ) ;
if ( rowEntry = = null ) {
String error = "get() found a valid urlhash, but failed to fetch the corresponding url entry - total size = " + size ( ) + ", fileStack.size() = " + urlFileStack . size ( ) + ", ramStack.size() = " + urlRAMStack . size ( ) + ", domainStacks.size() = " + domainStacks . size ( ) ;
//this.clear();
throw new IOException ( error + " - cleared the balancer" ) ;
Log . logWarning ( "Balancer" , "removeOne() failed - size = " + this . size ( ) ) ;
return null ;
}
assert urlFileIndex . size ( ) + 1 = = s : "urlFileIndex.size() = " + urlFileIndex . size ( ) + ", s = " + s + ", result = " + result ;
final CrawlEntry crawlEntry = new CrawlEntry ( rowEntry ) ;
//Log.logInfo("Balancer", "fetched next url: " + crawlEntry.url().toNormalform(true, false));
// at this point we must check if the crawlEntry has relevancy because the crawl profile still exists
// if not: return null. A calling method must handle the null value and try again
if ( profile ! = null & & ! profile . hasEntry ( crawlEntry . profileHandle ( ) ) ) return null ;
if ( profile ! = null & & ! profile . hasEntry ( crawlEntry . profileHandle ( ) ) ) {
profileErrors + + ;
if ( profileErrors < 20 ) Log . logInfo ( "Balancer" , "no profile entry for handle " + crawlEntry . profileHandle ( ) ) ;
return null ;
}
long sleeptime = Latency . waitingRemaining ( crawlEntry . url ( ) , minimumLocalDelta , minimumGlobalDelta ) ; // this uses the robots.txt database and may cause a loading of robots.txt from the server
if ( delay & & sleeptime > 0 ) {
@ -584,59 +307,81 @@ public class Balancer {
// this is only to protection against the worst case, where the crawler could
// behave in a DoS-manner
Log . logInfo ( "BALANCER" , "forcing crawl-delay of " + sleeptime + " milliseconds for " + crawlEntry . url ( ) . getHost ( ) + ( ( sleeptime > Math . max ( minimumLocalDelta , minimumGlobalDelta ) ) ? " (forced latency)" : "" ) ) ;
if ( System . currentTimeMillis ( ) - this . lastPrepare > 10000 ) {
long t = System . currentTimeMillis ( ) ;
prepare ( 400 ) ;
this . lastPrepare = System . currentTimeMillis ( ) ;
sleeptime - = this . lastPrepare - t ;
}
if ( sleeptime > 0 ) try { synchronized ( this ) { this . wait ( sleeptime ) ; } } catch ( final InterruptedException e ) { }
}
return crawlEntry ;
}
/ * *
* return top - elements from the crawl stack
* we do not produce here more entries than exist on the stack
* because otherwise the balancing does not work properly
* @param count
* @return
* @throws IOException
* /
public synchronized ArrayList < CrawlEntry > top ( int count ) throws IOException {
// construct a list using the urlRAMStack which was filled with this procedure
count = prepare ( count ) ;
final ArrayList < CrawlEntry > list = new ArrayList < CrawlEntry > ( ) ;
for ( int i = 0 ; i < count ; i + + ) {
final String urlhash = urlRAMStack . get ( i ) ;
final Row . Entry entry = urlFileIndex . get ( urlhash . getBytes ( ) ) ;
if ( entry = = null ) break ;
list . add ( new CrawlEntry ( entry ) ) ;
}
return list ;
private void filltop ( boolean delay , long maximumwaiting , boolean acceptonebest ) {
if ( this . top . size ( ) > 0 ) return ;
// check if we need to get entries from the file index
try {
fillDomainStacks ( 400 ) ;
} catch ( IOException e ) {
e . printStackTrace ( ) ;
}
// iterate over the domain stacks
Iterator < Map . Entry < String , LinkedList < String > > > i = this . domainStacks . entrySet ( ) . iterator ( ) ;
Map . Entry < String , LinkedList < String > > entry ;
long smallestWaiting = Long . MAX_VALUE ;
String besthash = null ;
while ( i . hasNext ( ) ) {
entry = i . next ( ) ;
if ( entry . getValue ( ) . size ( ) = = 0 ) {
i . remove ( ) ;
continue ;
}
String n = entry . getValue ( ) . getFirst ( ) ;
if ( delay ) {
long w = Latency . waitingRemainingGuessed ( n , minimumLocalDelta , minimumGlobalDelta ) ;
if ( w > maximumwaiting ) {
if ( w < smallestWaiting ) {
smallestWaiting = w ;
besthash = n ;
}
continue ;
}
}
n = entry . getValue ( ) . removeFirst ( ) ;
this . top . add ( n ) ;
if ( entry . getValue ( ) . size ( ) = = 0 ) i . remove ( ) ;
}
// if we could not find any entry, then take the best we have seen so far
if ( acceptonebest & & this . top . size ( ) > 0 & & besthash ! = null ) {
removeHashFromDomainStacks ( besthash ) ;
this . top . add ( besthash ) ;
}
}
private int prepare ( int count ) throws IOException {
// if we need to flush anything, then flush the domain stack first,
// to avoid that new urls get hidden by old entries from the file stack
if ( urlRAMStack = = null ) return 0 ;
// ensure that the domain stacks are filled enough
shiftFileToDomStacks ( count ) ;
// flush from the domain stacks first until they are empty
if ( ( domainStacksNotEmpty ( ) ) & & ( urlRAMStack . size ( ) < = count ) ) {
flushOnceDomStacks ( true , true , 100 ) ;
}
while ( ( domainStacksNotEmpty ( ) ) & & ( urlRAMStack . size ( ) < = count ) ) {
// flush only that much as we need to display
flushOnceDomStacks ( true , false , 100 ) ;
}
// if the ram is still not full enough, use the file stack
shiftFileToRAM ( count ) ;
return Math . min ( count , urlRAMStack . size ( ) ) ;
private void fillDomainStacks ( int maxdomstacksize ) throws IOException {
if ( this . domainStacks . size ( ) > 0 ) return ;
CloneableIterator < byte [ ] > i = this . urlFileIndex . keys ( true , null ) ;
while ( i . hasNext ( ) ) {
pushHashToDomainStacks ( new String ( i . next ( ) ) , 10 ) ;
if ( this . domainStacks . size ( ) > maxdomstacksize ) break ;
}
}
public ArrayList < CrawlEntry > top ( int count ) {
count = Math . min ( count , top . size ( ) ) ;
ArrayList < CrawlEntry > cel = new ArrayList < CrawlEntry > ( ) ;
if ( count = = 0 ) return cel ;
for ( String n : top ) {
try {
Row . Entry rowEntry = urlFileIndex . get ( n . getBytes ( ) ) ;
if ( rowEntry = = null ) continue ;
final CrawlEntry crawlEntry = new CrawlEntry ( rowEntry ) ;
cel . add ( crawlEntry ) ;
count - - ;
if ( count < = 0 ) break ;
} catch ( IOException e ) {
}
}
return cel ;
}
public synchronized Iterator < CrawlEntry > iterator ( ) throws IOException {