@ -29,6 +29,7 @@ import java.util.HashSet;
import java.util.Iterator ;
import java.util.Iterator ;
import java.util.LinkedList ;
import java.util.LinkedList ;
import java.util.Map ;
import java.util.Map ;
import java.util.TreeMap ;
import java.util.concurrent.ConcurrentHashMap ;
import java.util.concurrent.ConcurrentHashMap ;
import java.util.concurrent.ConcurrentLinkedQueue ;
import java.util.concurrent.ConcurrentLinkedQueue ;
@ -48,6 +49,8 @@ public class Balancer {
domainStacks ; // a map from domain name part to Lists with url hashs
domainStacks ; // a map from domain name part to Lists with url hashs
private ConcurrentLinkedQueue < String >
private ConcurrentLinkedQueue < String >
top ;
top ;
private TreeMap < Long , String >
delayed ;
private ObjectIndex urlFileIndex ;
private ObjectIndex urlFileIndex ;
private final File cacheStacksPath ;
private final File cacheStacksPath ;
private long minimumLocalDelta ;
private long minimumLocalDelta ;
@ -60,6 +63,7 @@ public class Balancer {
this . cacheStacksPath = cachePath ;
this . cacheStacksPath = cachePath ;
this . domainStacks = new ConcurrentHashMap < String , LinkedList < String > > ( ) ;
this . domainStacks = new ConcurrentHashMap < String , LinkedList < String > > ( ) ;
this . top = new ConcurrentLinkedQueue < String > ( ) ;
this . top = new ConcurrentLinkedQueue < String > ( ) ;
this . delayed = new TreeMap < Long , String > ( ) ;
this . minimumLocalDelta = minimumLocalDelta ;
this . minimumLocalDelta = minimumLocalDelta ;
this . minimumGlobalDelta = minimumGlobalDelta ;
this . minimumGlobalDelta = minimumGlobalDelta ;
@ -93,7 +97,7 @@ public class Balancer {
}
}
}
}
public synchronized void clear ( ) {
public void clear ( ) {
Log . logInfo ( "Balancer" , "cleaing balancer with " + urlFileIndex . size ( ) + " entries from " + urlFileIndex . filename ( ) ) ;
Log . logInfo ( "Balancer" , "cleaing balancer with " + urlFileIndex . size ( ) + " entries from " + urlFileIndex . filename ( ) ) ;
try {
try {
urlFileIndex . clear ( ) ;
urlFileIndex . clear ( ) ;
@ -102,9 +106,12 @@ public class Balancer {
}
}
domainStacks . clear ( ) ;
domainStacks . clear ( ) ;
top . clear ( ) ;
top . clear ( ) ;
synchronized ( this . delayed ) {
delayed . clear ( ) ;
}
}
}
public synchronized CrawlEntry get ( final String urlhash ) throws IOException {
public CrawlEntry get ( final String urlhash ) throws IOException {
assert urlhash ! = null ;
assert urlhash ! = null ;
if ( urlFileIndex = = null ) return null ; // case occurs during shutdown
if ( urlFileIndex = = null ) return null ; // case occurs during shutdown
final Row . Entry entry = urlFileIndex . get ( urlhash . getBytes ( ) ) ;
final Row . Entry entry = urlFileIndex . get ( urlhash . getBytes ( ) ) ;
@ -112,7 +119,7 @@ public class Balancer {
return new CrawlEntry ( entry ) ;
return new CrawlEntry ( entry ) ;
}
}
public synchronized int removeAllByProfileHandle ( final String profileHandle , final long timeout ) throws IOException {
public int removeAllByProfileHandle ( final String profileHandle , final long timeout ) throws IOException {
// removes all entries with a specific profile hash.
// removes all entries with a specific profile hash.
// this may last some time
// this may last some time
// returns number of deletions
// returns number of deletions
@ -143,7 +150,7 @@ public class Balancer {
* @return number of entries that had been removed
* @return number of entries that had been removed
* @throws IOException
* @throws IOException
* /
* /
public synchronized int remove ( final HashSet < String > urlHashes ) throws IOException {
public int remove ( final HashSet < String > urlHashes ) throws IOException {
final int s = urlFileIndex . size ( ) ;
final int s = urlFileIndex . size ( ) ;
int removedCounter = 0 ;
int removedCounter = 0 ;
for ( final String urlhash : urlHashes ) {
for ( final String urlhash : urlHashes ) {
@ -161,34 +168,42 @@ public class Balancer {
if ( urlHashes . contains ( urlhash ) ) j . remove ( ) ;
if ( urlHashes . contains ( urlhash ) ) j . remove ( ) ;
}
}
// remove from delayed
synchronized ( this . delayed ) {
Iterator < Map . Entry < Long , String > > k = this . delayed . entrySet ( ) . iterator ( ) ;
while ( k . hasNext ( ) ) {
if ( urlHashes . contains ( k . next ( ) . getValue ( ) ) ) k . remove ( ) ;
}
}
// iterate through the domain stacks
// iterate through the domain stacks
final Iterator < Map . Entry < String , LinkedList < String > > > k = domainStacks . entrySet ( ) . iterator ( ) ;
final Iterator < Map . Entry < String , LinkedList < String > > > q = domainStacks . entrySet ( ) . iterator ( ) ;
Map . Entry < String , LinkedList < String > > se ;
Map . Entry < String , LinkedList < String > > se ;
LinkedList < String > stack ;
LinkedList < String > stack ;
while ( k . hasNext ( ) ) {
while ( q . hasNext ( ) ) {
se = k . next ( ) ;
se = q . next ( ) ;
stack = se . getValue ( ) ;
stack = se . getValue ( ) ;
Iterator < String > i = stack . iterator ( ) ;
Iterator < String > i = stack . iterator ( ) ;
while ( i . hasNext ( ) ) {
while ( i . hasNext ( ) ) {
if ( urlHashes . contains ( i . next ( ) ) ) i . remove ( ) ;
if ( urlHashes . contains ( i . next ( ) ) ) i . remove ( ) ;
}
}
if ( stack . size ( ) = = 0 ) k . remove ( ) ;
if ( stack . size ( ) = = 0 ) q . remove ( ) ;
}
}
return removedCounter ;
return removedCounter ;
}
}
public synchronized boolean has ( final String urlhash ) {
public boolean has ( final String urlhash ) {
return urlFileIndex . has ( urlhash . getBytes ( ) ) ;
return urlFileIndex . has ( urlhash . getBytes ( ) ) ;
}
}
public synchronized boolean notEmpty ( ) {
public boolean notEmpty ( ) {
// alternative method to the property size() > 0
// alternative method to the property size() > 0
// this is better because it may avoid synchronized access to domain stack summarization
// this is better because it may avoid synchronized access to domain stack summarization
return domainStacksNotEmpty ( ) ;
return domainStacksNotEmpty ( ) ;
}
}
public synchronized int size ( ) {
public int size ( ) {
return urlFileIndex . size ( ) ;
return urlFileIndex . size ( ) ;
}
}
@ -203,22 +218,24 @@ public class Balancer {
return false ;
return false ;
}
}
public synchronized void push ( final CrawlEntry entry ) throws IOException {
public void push ( final CrawlEntry entry ) throws IOException {
assert entry ! = null ;
assert entry ! = null ;
String hash = entry . url ( ) . hash ( ) ;
String hash = entry . url ( ) . hash ( ) ;
if ( urlFileIndex . has ( hash . getBytes ( ) ) ) {
synchronized ( this ) {
//Log.logWarning("BALANCER", "double-check has failed for urlhash " + entry.url().hash() + " in " + stackname + " - fixed");
if ( urlFileIndex . has ( hash . getBytes ( ) ) ) {
return ;
//Log.logWarning("BALANCER", "double-check has failed for urlhash " + entry.url().hash() + " in " + stackname + " - fixed");
}
return ;
}
// add to index
int s = urlFileIndex . size ( ) ;
urlFileIndex . put ( entry . toRow ( ) ) ;
assert s < urlFileIndex . size ( ) : "hash = " + hash ;
assert urlFileIndex . has ( hash . getBytes ( ) ) : "hash = " + hash ;
// add the hash to a queue
// add to index
pushHashToDomainStacks ( entry . url ( ) . hash ( ) , 50 ) ;
int s = urlFileIndex . size ( ) ;
urlFileIndex . put ( entry . toRow ( ) ) ;
assert s < urlFileIndex . size ( ) : "hash = " + hash ;
assert urlFileIndex . has ( hash . getBytes ( ) ) : "hash = " + hash ;
// add the hash to a queue
pushHashToDomainStacks ( entry . url ( ) . hash ( ) , 50 ) ;
}
}
}
private void pushHashToDomainStacks ( final String hash , int maxstacksize ) {
private void pushHashToDomainStacks ( final String hash , int maxstacksize ) {
@ -252,6 +269,18 @@ public class Balancer {
}
}
}
}
private String nextFromDelayed ( ) {
if ( this . delayed . size ( ) = = 0 ) return null ;
synchronized ( this . delayed ) {
if ( this . delayed . size ( ) = = 0 ) return null ;
Long first = this . delayed . firstKey ( ) ;
if ( first . longValue ( ) < System . currentTimeMillis ( ) ) {
return this . delayed . remove ( first ) ;
}
}
return null ;
}
/ * *
/ * *
* get the next entry in this crawl queue in such a way that the domain access time delta is maximized
* get the next entry in this crawl queue in such a way that the domain access time delta is maximized
* and always above the given minimum delay time . An additional delay time is computed using the robots . txt
* and always above the given minimum delay time . An additional delay time is computed using the robots . txt
@ -263,7 +292,7 @@ public class Balancer {
* @return a url in a CrawlEntry object
* @return a url in a CrawlEntry object
* @throws IOException
* @throws IOException
* /
* /
public synchronized CrawlEntry pop ( boolean delay , CrawlProfile profile ) throws IOException {
public CrawlEntry pop ( boolean delay , CrawlProfile profile ) throws IOException {
// returns a crawl entry from the stack and ensures minimum delta times
// returns a crawl entry from the stack and ensures minimum delta times
filltop ( delay , - 600000 , false ) ;
filltop ( delay , - 600000 , false ) ;
@ -277,35 +306,53 @@ public class Balancer {
filltop ( delay , - 500 , false ) ;
filltop ( delay , - 500 , false ) ;
filltop ( delay , 0 , true ) ;
filltop ( delay , 0 , true ) ;
String result = null ; // the result
long sleeptime = 0 ;
CrawlEntry crawlEntry = null ;
// first simply take one of the entries in the top list, that should be one without any delay
while ( this . urlFileIndex . size ( ) > 0 ) {
if ( this . top . size ( ) > 0 ) {
// first simply take one of the entries in the top list, that should be one without any delay
result = top . remove ( ) ;
String result = nextFromDelayed ( ) ;
}
if ( result = = null & & this . top . size ( ) > 0 ) result = top . remove ( ) ;
// finally: check minimumDelta and if necessary force a sleep
// check minimumDelta and if necessary force a sleep
final int s = urlFileIndex . size ( ) ;
//final int s = urlFileIndex.size();
Row . Entry rowEntry = ( result = = null ) ? null : urlFileIndex . remove ( result . getBytes ( ) ) ;
Row . Entry rowEntry = ( result = = null ) ? null : urlFileIndex . remove ( result . getBytes ( ) ) ;
if ( rowEntry = = null ) rowEntry = urlFileIndex . removeOne ( ) ;
if ( rowEntry = = null ) {
if ( rowEntry = = null ) {
rowEntry = urlFileIndex . removeOne ( ) ;
Log . logWarning ( "Balancer" , "removeOne() failed - size = " + this . size ( ) ) ;
result = ( rowEntry = = null ) ? null : new String ( rowEntry . getPrimaryKeyBytes ( ) ) ;
return null ;
}
}
if ( rowEntry = = null ) {
assert urlFileIndex . size ( ) + 1 = = s : "urlFileIndex.size() = " + urlFileIndex . size ( ) + ", s = " + s + ", result = " + result ;
Log . logWarning ( "Balancer" , "removeOne() failed - size = " + this . size ( ) ) ;
return null ;
final CrawlEntry crawlEntry = new CrawlEntry ( rowEntry ) ;
}
//Log.logInfo("Balancer", "fetched next url: " + crawlEntry.url().toNormalform(true, false));
//assert urlFileIndex.size() + 1 == s : "urlFileIndex.size() = " + urlFileIndex.size() + ", s = " + s + ", result = " + result;
// at this point we must check if the crawlEntry has relevancy because the crawl profile still exists
crawlEntry = new CrawlEntry ( rowEntry ) ;
// if not: return null. A calling method must handle the null value and try again
//Log.logInfo("Balancer", "fetched next url: " + crawlEntry.url().toNormalform(true, false));
if ( profile ! = null & & ! profile . hasEntry ( crawlEntry . profileHandle ( ) ) ) {
profileErrors + + ;
// at this point we must check if the crawlEntry has relevancy because the crawl profile still exists
if ( profileErrors < 20 ) Log . logInfo ( "Balancer" , "no profile entry for handle " + crawlEntry . profileHandle ( ) ) ;
// if not: return null. A calling method must handle the null value and try again
return null ;
if ( profile ! = null & & ! profile . hasEntry ( crawlEntry . profileHandle ( ) ) ) {
}
profileErrors + + ;
long sleeptime = Latency . waitingRemaining ( crawlEntry . url ( ) , minimumLocalDelta , minimumGlobalDelta ) ; // this uses the robots.txt database and may cause a loading of robots.txt from the server
if ( profileErrors < 20 ) Log . logInfo ( "Balancer" , "no profile entry for handle " + crawlEntry . profileHandle ( ) ) ;
return null ;
}
sleeptime = Latency . waitingRemaining ( crawlEntry . url ( ) , minimumLocalDelta , minimumGlobalDelta ) ; // this uses the robots.txt database and may cause a loading of robots.txt from the server
assert result . equals ( new String ( rowEntry . getPrimaryKeyBytes ( ) ) ) : "result = " + result + ", rowEntry.getPrimaryKeyBytes() = " + new String ( rowEntry . getPrimaryKeyBytes ( ) ) ;
assert result . equals ( crawlEntry . url ( ) . hash ( ) ) : "result = " + result + ", crawlEntry.url().hash() = " + crawlEntry . url ( ) . hash ( ) ;
if ( this . domainStacks . size ( ) < = 1 ) break ;
if ( delay & & sleeptime > 0 ) {
// put that thing back to omit a delay here
this . delayed . put ( new Long ( System . currentTimeMillis ( ) + sleeptime + 1 ) , result ) ;
this . urlFileIndex . put ( rowEntry ) ;
this . domainStacks . remove ( result . substring ( 6 ) ) ;
continue ;
}
break ;
}
if ( crawlEntry = = null ) return null ;
if ( delay & & sleeptime > 0 ) {
if ( delay & & sleeptime > 0 ) {
// force a busy waiting here
// force a busy waiting here
// in best case, this should never happen if the balancer works propertly
// in best case, this should never happen if the balancer works propertly
@ -378,8 +425,9 @@ public class Balancer {
}
}
private void fillDomainStacks ( int maxdomstacksize ) throws IOException {
private void fillDomainStacks ( int maxdomstacksize ) throws IOException {
if ( this . domainStacks . size ( ) > 0 & & System . currentTimeMillis ( ) - lastDomainStackFill < 6 00000L) return ;
if ( this . domainStacks . size ( ) > 0 & & System . currentTimeMillis ( ) - lastDomainStackFill < 2 00000L) return ;
this . domainStacks . clear ( ) ;
this . domainStacks . clear ( ) ;
//synchronized (this.delayed) { delayed.clear(); }
this . lastDomainStackFill = System . currentTimeMillis ( ) ;
this . lastDomainStackFill = System . currentTimeMillis ( ) ;
CloneableIterator < byte [ ] > i = this . urlFileIndex . keys ( true , null ) ;
CloneableIterator < byte [ ] > i = this . urlFileIndex . keys ( true , null ) ;
while ( i . hasNext ( ) ) {
while ( i . hasNext ( ) ) {
@ -406,7 +454,7 @@ public class Balancer {
return cel ;
return cel ;
}
}
public synchronized Iterator < CrawlEntry > iterator ( ) throws IOException {
public Iterator < CrawlEntry > iterator ( ) throws IOException {
return new EntryIterator ( ) ;
return new EntryIterator ( ) ;
}
}