@@ -43,178 +43,325 @@ package de.anomic.plasma;
import java.io.File;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.TreeMap;

import de.anomic.kelondro.kelondroBase64Order;
import de.anomic.kelondro.kelondroRow;
import de.anomic.kelondro.kelondroStack;
import de.anomic.server.logging.serverLog;
import de.anomic.yacy.yacySeedDB;
public class plasmaCrawlBalancer {

    // a shared domainAccess map for all balancers
    private static final Map domainAccess = Collections.synchronizedMap(new HashMap());

    // definition of the payload (row format) for the fileStack
    private static final kelondroRow payload = new kelondroRow("byte[] urlhash-" + yacySeedDB.commonHashLength, kelondroBase64Order.enhancedCoder, 0);

    // class variables
    private ArrayList     ramStack;     // a list of url hashes that is flushed first
    private kelondroStack fileStack;    // a file with url hashes
    private HashMap       domainStacks; // a map from the domain name part of the hash to lists of url hashes
    private HashSet       ramIndex;     // an index over all url hashes in this balancer; needed externally, provided internally
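
    // Since domainAccess is static and shared by all balancer instances, the per-domain
    // access timestamps - and therefore the minimumDelta politeness check in get() - are
    // presumably meant to apply across all crawl queues that use this class.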
    public plasmaCrawlBalancer(File stackFile) {
        fileStack    = kelondroStack.open(stackFile, payload);
        domainStacks = new HashMap();
        ramStack     = new ArrayList();
        ramIndex     = makeIndex();
    }
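
    // Example usage (a sketch; the file name and the parameter values are illustrative,
    // not taken from the calling code):
    //   plasmaCrawlBalancer balancer = new plasmaCrawlBalancer(new File("DATA/PLASMADB/crawlStack.db"));
    //   balancer.add(urlhash);                   // urlhash: a url hash of commonHashLength characters
    //   String next = balancer.get(500, 60000);  // at least 500 ms between accesses to one domain,
    //                                            // prefer domains that were idle for more than 60 s
    //   balancer.close();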
    public synchronized void close() {
        ramIndex = null;
        while (sizeDomainStacks() > 0) flushOnceDomStacks(true);
        try { flushAllRamStack(); } catch (IOException e) {}
        try { fileStack.close(); } catch (IOException e) {}
        fileStack = null;
    }
    public void finalize() {
        if (fileStack != null) close();
    }

    public synchronized void clear() {
        fileStack = kelondroStack.reset(fileStack);
        domainStacks.clear();
        ramStack.clear();
        ramIndex = new HashSet();
    }
    private HashSet makeIndex() {
        HashSet index = new HashSet(); // TODO: replace with kelondroIndex

        // take all elements from the file stack
        try {
            Iterator i = fileStack.keyIterator(); // iterates byte[] objects
            while (i.hasNext()) index.add(new String((byte[]) i.next(), "UTF-8"));
        } catch (UnsupportedEncodingException e) {}

        // take all elements from the ram stack
        for (int i = 0; i < ramStack.size(); i++) index.add(ramStack.get(i));

        // take all elements from the domain stacks
        Iterator i = domainStacks.entrySet().iterator();
        Map.Entry entry;
        LinkedList list;
        Iterator ii;
        while (i.hasNext()) {
            entry = (Map.Entry) i.next();
            list = (LinkedList) entry.getValue();
            ii = list.iterator();
            while (ii.hasNext()) index.add(ii.next());
        }
        return index;
    }
    public boolean has(String urlhash) {
        return ramIndex.contains(urlhash);
    }

    public Iterator iterator() {
        // iterates String objects (url hashes)
        return ramIndex.iterator();
    }
    public synchronized int size() {
        int componentsize = fileStack.size() + ramStack.size() + sizeDomainStacks();
        if (componentsize != ramIndex.size()) {
            serverLog.logWarning("PLASMA BALANCER", "size operation wrong - componentsize = " + componentsize + ", ramIndex.size() = " + ramIndex.size());
        }
        return componentsize;
    }
    private int sizeDomainStacks() {
        if (domainStacks == null) return 0;
        int sum = 0;
        Iterator i = domainStacks.values().iterator();
        while (i.hasNext()) sum += ((LinkedList) i.next()).size();
        return sum;
    }
    private void flushOnceDomStacks(boolean ram) {
        // takes one entry from every domain stack and puts it
        // either on the ram stack or on the file stack
        if (domainStacks.size() == 0) return;
        Iterator i = domainStacks.entrySet().iterator();
        Map.Entry entry;
        LinkedList list;
        while (i.hasNext()) {
            entry = (Map.Entry) i.next();
            list = (LinkedList) entry.getValue();
            if (list.size() != 0) {
                if (ram) {
                    ramStack.add(list.removeFirst());
                } else try {
                    fileStack.push(fileStack.row().newEntry(new byte[][]{((String) list.removeFirst()).getBytes()}));
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (list.size() == 0) i.remove();
        }
    }
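
    // the ram flag selects the flush target: with ram == true entries are moved to the
    // ramStack (used by close() and top()), with ram == false they are spilled to the
    // fileStack (used by add() when the domain stacks grow too large)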
    private void flushAllRamStack() throws IOException {
        // this flushes only the ramStack to the fileStack, but does not flush the domainStacks
        for (int i = 0; i < ramStack.size() / 2; i++) {
            fileStack.push(fileStack.row().newEntry(new byte[][]{((String) ramStack.get(i)).getBytes()}));
            fileStack.push(fileStack.row().newEntry(new byte[][]{((String) ramStack.get(ramStack.size() - i - 1)).getBytes()}));
        }
        if (ramStack.size() % 2 == 1)
            fileStack.push(fileStack.row().newEntry(new byte[][]{((String) ramStack.get(ramStack.size() / 2)).getBytes()}));
    }
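
    // Worked example (illustrative values only): with ramStack = [a, b, c, d, e]
    // the loop above pushes a, e, b, d and the trailing statement pushes the middle
    // element c, so head and tail entries end up interleaved on the file stack.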
    public synchronized void add(String urlhash) throws IOException {
        assert urlhash != null;
        if (ramIndex.contains(urlhash)) {
            serverLog.logWarning("PLASMA BALANCER", "double-check has failed for urlhash " + urlhash + " - fixed");
            return;
        }
        String dom = urlhash.substring(6);
        LinkedList domainList = (LinkedList) domainStacks.get(dom);
        if (domainList == null) {
            // create a new list for this domain
            domainList = new LinkedList();
            domainList.addLast(urlhash);
            domainStacks.put(dom, domainList);
        } else {
            // extend the existing domain list
            domainList.add(urlhash);
        }

        // check the size of the domainStacks and flush if they grow too large
        if ((domainStacks.size() > 20) || (sizeDomainStacks() > 1000)) {
            flushOnceDomStacks(false);
        }

        // add to the index
        ramIndex.add(urlhash);
    }
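
    // Note: the domain key is taken as urlhash.substring(6), i.e. the trailing part of
    // the url hash is assumed to encode the host/domain, so all hashes of one domain
    // share the same domain stack and the same domainAccess timestamp.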
    public synchronized String get(long minimumDelta, long maximumAge) throws IOException {
        // returns a url-hash from the stack and ensures minimum delta times
        // we have 3 sources to choose from: the ramStack, the domainStacks and the fileStack

        String result = null; // the result

        // 1st: check the ramStack
        if (ramStack.size() > 0) {
            result = (String) ramStack.remove(0);
        }

        // 2nd-a: check the domainStacks for latest arrivals
        if (result == null) {
            // we prefer domains that have not been accessed for a long time
            // (longer than maximumAge, e.g. 60 seconds); latest arrivals that
            // have not yet been crawled at all also fit into that scheme
            Iterator i = domainStacks.entrySet().iterator();
            Map.Entry entry;
            String domhash;
            long delta, maxdelta = 0;
            String maxhash = null;
            LinkedList domlist;
            while (i.hasNext()) {
                entry = (Map.Entry) i.next();
                domhash = (String) entry.getKey();
                delta = lastAccessDelta(domhash);
                if (delta == Long.MAX_VALUE) {
                    // a brand new domain - we take it
                    domlist = (LinkedList) entry.getValue();
                    result = (String) domlist.removeFirst();
                    if (domlist.size() == 0) i.remove();
                    break;
                }
                if (delta > maxdelta) {
                    maxdelta = delta;
                    maxhash = domhash;
                }
            }
            if ((result == null) && (maxdelta > maximumAge)) {
                // success - we found an entry from a domain that has not been accessed for a long time
                domlist = (LinkedList) domainStacks.get(maxhash);
                result = (String) domlist.removeFirst();
                if (domlist.size() == 0) domainStacks.remove(maxhash);
            }
        }
        // 2nd-b: check the domainStacks for the best match between stack size and retrieval time
        if (result == null) {
            // we order all domains by the number of entries per domain,
            // then iterate through these domains in descending entry order
            // and take the first one whose delta is larger than minimumDelta
            Iterator i = domainStacks.entrySet().iterator();
            Map.Entry entry;
            String domhash;
            LinkedList domlist;
            TreeMap hitlist = new TreeMap();
            int count = 0;
            // first collect information about the sizes of the domain lists
            while (i.hasNext()) {
                entry = (Map.Entry) i.next();
                domhash = (String) entry.getKey();
                domlist = (LinkedList) entry.getValue();
                hitlist.put(new Integer(domlist.size() * 100 + count++), domhash);
            }
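
            // Worked example (illustrative sizes): for domain stacks with sizes
            // {domA: 3, domB: 3, domC: 1} the keys become 300, 301 and 102, so the
            // descending iteration below visits domA, domB, domC in that order.
            // The "* 100 + count" trick keeps keys unique for equally sized stacks;
            // it assumes that no more than 100 domain stacks exist at the same time,
            // which the flush in add() is presumably meant to ensure.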
            // now iterate in descending order and fetch the first one
            // that is acceptable under the minimumDelta constraint
            long delta;
            String maxhash = null;
            while (hitlist.size() > 0) {
                domhash = (String) hitlist.remove(hitlist.lastKey());
                if (maxhash == null) maxhash = domhash; // remember the first (largest) entry
                delta = lastAccessDelta(domhash);
                if (delta > minimumDelta) {
                    domlist = (LinkedList) domainStacks.get(domhash);
                    result = (String) domlist.removeFirst();
                    if (domlist.size() == 0) domainStacks.remove(domhash);
                    break;
                }
            }

            // if we did not yet choose any entry, we simply take the one with the most entries
            if ((result == null) && (maxhash != null)) {
                domlist = (LinkedList) domainStacks.get(maxhash);
                result = (String) domlist.removeFirst();
                if (domlist.size() == 0) domainStacks.remove(maxhash);
            }
        }
        // 3rd: take an entry from the file stack
        if ((result == null) && (fileStack.size() > 0)) {
            kelondroRow.Entry topentry = fileStack.top();
            if (topentry != null) {
                String top = new String(topentry.getColBytes(0));

                // check that the time since retrieval of the last hash from the same
                // domain is not shorter than the minimumDelta
                long delta = lastAccessDelta(top);
                if (delta > minimumDelta) {
                    // the entry from the top is fine
                    result = new String(fileStack.pop().getColBytes(0));
                } else {
                    // try the entry from the bottom
                    result = new String(fileStack.pot().getColBytes(0));
                    delta = lastAccessDelta(result);
                }
            }
        }

        // check the case where we did not find anything
        if (result == null) {
            serverLog.logSevere("PLASMA BALANCER", "get() was not able to find a valid urlhash - total size = " + size() + ", fileStack.size() = " + fileStack.size() + ", ramStack.size() = " + ramStack.size() + ", domainStacks.size() = " + domainStacks.size());
            return null;
        }

        // finally: check minimumDelta and if necessary force a sleep
        long delta = lastAccessDelta(result);
        if (delta < minimumDelta) {
            // force busy waiting here;
            // in the best case this should never happen if the balancer works properly.
            // this is only a protection against the worst case, where the crawler
            // could otherwise behave in a DoS-like manner towards a single domain
            long sleeptime = minimumDelta - delta;
            try { synchronized (this) { this.wait(sleeptime); } } catch (InterruptedException e) {}
        }

        // update statistical data
        domainAccess.put(result.substring(6), new Long(System.currentTimeMillis()));
        ramIndex.remove(result);
        return result;
    }
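
    // Summary of the selection order in get(): entries are taken from the ramStack first,
    // then from a domain stack that is either brand new or idle for longer than maximumAge,
    // then from the largest domain stack that satisfies minimumDelta (falling back to the
    // largest stack regardless), and finally from the fileStack; as a last resort the
    // method sleeps to honour minimumDelta. Note that this.wait() releases the monitor,
    // so other synchronized methods of this balancer can proceed during the sleep.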
    private long lastAccessDelta(String hash) {
        assert hash != null;
        Long lastAccess = (Long) domainAccess.get((hash.length() > 6) ? hash.substring(6) : hash);
        if (lastAccess == null) return Long.MAX_VALUE; // never accessed
        return System.currentTimeMillis() - lastAccess.longValue();
    }
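
    // lastAccessDelta() accepts either a full url hash (the domain part is then taken
    // from position 6 onwards) or a bare domain key as used in domainStacks; both map
    // to the same entry in the shared domainAccess table.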
    public synchronized String top(int dist) {
        int availableInRam = ramStack.size() + sizeDomainStacks();
        if ((availableInRam < dist) && (fileStack.size() > (dist - availableInRam))) {
            // flush some entries from disc to the ram stack
            try {
                for (int i = 0; i < (dist - availableInRam); i++) {
                    ramStack.add(new String(fileStack.pop().getColBytes(0)));
                }
            } catch (IOException e) {}
        }
        while ((sizeDomainStacks() > 0) && (ramStack.size() <= dist)) flushOnceDomStacks(true); // flush only as much as we need to display
        if (dist >= ramStack.size()) return null;
        return (String) ramStack.get(dist);
    }
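
    // Note: top() is a peek with side effects on the internal distribution - it may move
    // entries from the fileStack and the domainStacks into the ramStack so that the entry
    // at position dist becomes addressable - but it does not remove anything from the balancer.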
}