@@ -109,7 +109,7 @@ public class Balancer {
         }
     }
-    public Request get(final String urlhash) throws IOException {
+    public synchronized Request get(final String urlhash) throws IOException {
         assert urlhash != null;
         if (urlFileIndex == null) return null; // case occurs during shutdown
         final Row.Entry entry = urlFileIndex.get(urlhash.getBytes());
@@ -117,7 +117,7 @@ public class Balancer {
         return new Request(entry);
     }
-    public int removeAllByProfileHandle(final String profileHandle, final long timeout) throws IOException {
+    public synchronized int removeAllByProfileHandle(final String profileHandle, final long timeout) throws IOException {
         // removes all entries with a specific profile hash.
         // this may last some time
         // returns number of deletions
@@ -148,7 +148,7 @@ public class Balancer {
      * @return number of entries that had been removed
      * @throws IOException
      */
-    public int remove(final HashSet<String> urlHashes) throws IOException {
+    public synchronized int remove(final HashSet<String> urlHashes) throws IOException {
        final int s = urlFileIndex.size();
        int removedCounter = 0;
        for (final String urlhash : urlHashes) {
@@ -191,7 +191,7 @@ public class Balancer {
         return removedCounter;
     }
-    public boolean has(final String urlhash) {
+    public synchronized boolean has(final String urlhash) {
         return urlFileIndex.has(urlhash.getBytes());
     }
@@ -306,48 +306,50 @@ public class Balancer {
         long sleeptime = 0;
         Request crawlEntry = null;
-        while (this.urlFileIndex.size() > 0) {
-            // first simply take one of the entries in the top list, that should be one without any delay
-            String result = nextFromDelayed();
-            if (result == null && this.top.size() > 0) result = top.remove();
-            // check minimumDelta and if necessary force a sleep
-            //final int s = urlFileIndex.size();
-            Row.Entry rowEntry = (result == null) ? null : urlFileIndex.remove(result.getBytes());
-            if (rowEntry == null) {
-                rowEntry = urlFileIndex.removeOne();
-                result = (rowEntry == null) ? null : new String(rowEntry.getPrimaryKeyBytes());
-            }
-            if (rowEntry == null) {
-                Log.logWarning("Balancer", "removeOne() failed - size = " + this.size());
-                return null;
-            }
-            //assert urlFileIndex.size() + 1 == s : "urlFileIndex.size() = " + urlFileIndex.size() + ", s = " + s + ", result = " + result;
-            crawlEntry = new Request(rowEntry);
-            //Log.logInfo("Balancer", "fetched next url: " + crawlEntry.url().toNormalform(true, false));
-            // at this point we must check if the crawlEntry has relevancy because the crawl profile still exists
-            // if not: return null. A calling method must handle the null value and try again
-            if (profile != null && !profile.hasEntry(crawlEntry.profileHandle())) {
-                profileErrors++;
-                if (profileErrors < 20) Log.logInfo("Balancer", "no profile entry for handle " + crawlEntry.profileHandle());
-                return null;
-            }
-            sleeptime = Latency.waitingRemaining(crawlEntry.url(), minimumLocalDelta, minimumGlobalDelta); // this uses the robots.txt database and may cause a loading of robots.txt from the server
-            assert result.equals(new String(rowEntry.getPrimaryKeyBytes())) : "result = " + result + ", rowEntry.getPrimaryKeyBytes() = " + new String(rowEntry.getPrimaryKeyBytes());
-            assert result.equals(crawlEntry.url().hash()) : "result = " + result + ", crawlEntry.url().hash() = " + crawlEntry.url().hash();
-            if (this.domainStacks.size() <= 1) break;
-            if (delay && sleeptime > 0) {
-                // put that thing back to omit a delay here
-                this.delayed.put(new Long(System.currentTimeMillis() + sleeptime + 1), result);
-                this.urlFileIndex.put(rowEntry);
-                this.domainStacks.remove(result.substring(6));
-                continue;
-            }
-            break;
+        synchronized (this) {
+            while (this.urlFileIndex.size() > 0) {
+                // first simply take one of the entries in the top list, that should be one without any delay
+                String result = nextFromDelayed();
+                if (result == null && this.top.size() > 0) result = top.remove();
+                // check minimumDelta and if necessary force a sleep
+                //final int s = urlFileIndex.size();
+                Row.Entry rowEntry = (result == null) ? null : urlFileIndex.remove(result.getBytes());
+                if (rowEntry == null) {
+                    rowEntry = urlFileIndex.removeOne();
+                    result = (rowEntry == null) ? null : new String(rowEntry.getPrimaryKeyBytes());
+                }
+                if (rowEntry == null) {
+                    Log.logWarning("Balancer", "removeOne() failed - size = " + this.size());
+                    return null;
+                }
+                //assert urlFileIndex.size() + 1 == s : "urlFileIndex.size() = " + urlFileIndex.size() + ", s = " + s + ", result = " + result;
+                crawlEntry = new Request(rowEntry);
+                //Log.logInfo("Balancer", "fetched next url: " + crawlEntry.url().toNormalform(true, false));
+                // at this point we must check if the crawlEntry has relevancy because the crawl profile still exists
+                // if not: return null. A calling method must handle the null value and try again
+                if (profile != null && !profile.hasEntry(crawlEntry.profileHandle())) {
+                    profileErrors++;
+                    if (profileErrors < 20) Log.logInfo("Balancer", "no profile entry for handle " + crawlEntry.profileHandle());
+                    return null;
+                }
+                sleeptime = Latency.waitingRemaining(crawlEntry.url(), minimumLocalDelta, minimumGlobalDelta); // this uses the robots.txt database and may cause a loading of robots.txt from the server
+                assert result.equals(new String(rowEntry.getPrimaryKeyBytes())) : "result = " + result + ", rowEntry.getPrimaryKeyBytes() = " + new String(rowEntry.getPrimaryKeyBytes());
+                assert result.equals(crawlEntry.url().hash()) : "result = " + result + ", crawlEntry.url().hash() = " + crawlEntry.url().hash();
+                if (this.domainStacks.size() <= 1) break;
+                if (delay && sleeptime > 0) {
+                    // put that thing back to omit a delay here
+                    this.delayed.put(new Long(System.currentTimeMillis() + sleeptime + 1), result);
+                    this.urlFileIndex.put(rowEntry);
+                    this.domainStacks.remove(result.substring(6));
+                    continue;
+                }
+                break;
+            }
         }
         if (crawlEntry == null) return null;
@@ -438,16 +440,18 @@ public class Balancer {
         count = Math.min(count, top.size());
         ArrayList<Request> cel = new ArrayList<Request>();
         if (count == 0) return cel;
-        for (String n : top) {
-            try {
-                Row.Entry rowEntry = urlFileIndex.get(n.getBytes());
-                if (rowEntry == null) continue;
-                final Request crawlEntry = new Request(rowEntry);
-                cel.add(crawlEntry);
-                count--;
-                if (count <= 0) break;
-            } catch (IOException e) {
-            }
+        synchronized (this) {
+            for (String n : top) {
+                try {
+                    Row.Entry rowEntry = urlFileIndex.get(n.getBytes());
+                    if (rowEntry == null) continue;
+                    final Request crawlEntry = new Request(rowEntry);
+                    cel.add(crawlEntry);
+                    count--;
+                    if (count <= 0) break;
+                } catch (IOException e) {
+                }
+            }
         }
         return cel;
     }
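
The pattern this patch applies is coarse-grained locking on the Balancer instance: single-step index accessors become synchronized methods, while the multi-step take-or-requeue sequence in the fetch loop is wrapped in a synchronized (this) block so it cannot interleave with those accessors. Below is a minimal, self-contained sketch of that locking scheme; the class and field names (QueueSketch, entries, popOrRequeue) are illustrative only and are not part of the Balancer code.

import java.util.ArrayDeque;
import java.util.Deque;

// Simplified illustration of the scheme: cheap single-step reads and writes
// are synchronized methods, and the compound remove/re-insert step locks the
// same monitor ("this") explicitly so both paths exclude each other.
public class QueueSketch {
    private final Deque<String> entries = new ArrayDeque<String>();

    public synchronized boolean has(final String key) {
        return entries.contains(key);
    }

    public synchronized void push(final String key) {
        entries.addLast(key);
    }

    public String popOrRequeue(final boolean requeue) {
        synchronized (this) {
            // remove and (possibly) re-insert under one lock, so no other
            // thread can observe the queue between the two operations
            final String head = entries.pollFirst();
            if (head == null) return null;
            if (requeue) {
                entries.addLast(head);
                return null;
            }
            return head;
        }
    }
}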