@@ -34,7 +34,6 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 import net.yacy.cora.document.encoding.ASCII;
-import net.yacy.cora.federate.yacy.Distribution;
 import net.yacy.cora.order.Base64Order;
 import net.yacy.cora.storage.HandleSet;
 import net.yacy.cora.util.ConcurrentLog;
@@ -58,10 +57,10 @@ public class Dispatcher {
  * - (1) a number of RWIs are selected and accumulated.
  *       When they are selected, they are removed from the index
  * - (2) the RWI collection is split into a number of partitions according to the vertical DHT.
- * - (3) the split RWIs are enqueued as Entry object in the entry 'cloud' of the dispatcher
- * - (4) more entries may be enqueued to the dispatcher and entries with the same primary target
- *       are accumulated.
- * - (5) the largest entries are selected from the dispatcher cloud and enqueued to the 'next' array
+ * - (3) the split RWIs are enqueued as Entry objects in the write buffer of the dispatcher
+ * - (4) more entries may be enqueued to the dispatcher and
+ *       entries with the same primary target are accumulated to the same buffer entry.
+ * - (5) the largest entries are selected from the dispatcher write buffer and enqueued to the 'next' array
  *       which means that they are ready for transmission
  * - (6) the dispatcher takes some of the entries in the next queue and initiates
  *       transmission to other peers concurrently. As many transmissions are initiated concurrently
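Taken together, steps (3) to (5) describe a keyed write buffer: entries accumulate under their target key until the dispatcher drains the largest one. Here is a minimal, self-contained sketch of that mechanism; `BufferSketch` and its `Chunk` are hypothetical stand-ins, not the real `Transmission.Chunk` API:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class BufferSketch {
    /** hypothetical stand-in for Transmission.Chunk: it only counts accumulated references */
    static final class Chunk {
        int references = 0;
        void add(final int refs) { this.references += refs; }
    }

    private final Map<String, Chunk> buffer = new ConcurrentHashMap<>();

    /** steps (3)/(4): accumulate references under the target key;
     *  a repeated enqueue for the same target extends the existing chunk */
    public void enqueue(final String targetKey, final int refs) {
        this.buffer.computeIfAbsent(targetKey, k -> new Chunk()).add(refs);
    }

    /** step (5): remove and return the largest chunk, ready for transmission */
    public Chunk dequeueLargest() {
        String maxTarget = null;
        int maxSize = -1;
        for (final Map.Entry<String, Chunk> e : this.buffer.entrySet()) {
            if (e.getValue().references > maxSize) {
                maxSize = e.getValue().references;
                maxTarget = e.getKey();
            }
        }
        return maxTarget == null ? null : this.buffer.remove(maxTarget);
    }
}
```

Keying the buffer makes accumulation cheap: a second batch for the same target grows the existing chunk instead of creating a competing one, which is exactly what step (4) requires.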
@@ -83,9 +82,11 @@ public class Dispatcher {
  * called failed. In case of a failure, the RWI fragment is put back into the backend index structure
  */
-    // a cloud is a cache for the objects that wait to be transmitted
-    // the String-key is the primary target as contained in the Entry
-    private Map<String, Transmission.Chunk> transmissionCloud;
+    /**
+     * A transmission buffer is a write buffer for the rwi objects (indices) that wait to be transmitted.
+     * The String-key is the primary target as contained in the chunk entry.
+     */
+    private Map<String, Transmission.Chunk> transmissionBuffer;
 
     // the segment backend is used to store the remaining indexContainers in case that the object is closed
     private final Segment segment;
@@ -108,7 +109,7 @@ public class Dispatcher {
             final boolean gzipBody,
             final int timeout
             ) {
-        this.transmissionCloud = new ConcurrentHashMap<String, Transmission.Chunk>();
+        this.transmissionBuffer = new ConcurrentHashMap<String, Transmission.Chunk>();
         this.segment = segment;
         this.seeds = seeds;
         this.log = new ConcurrentLog("INDEX-TRANSFER-DISPATCHER");
@@ -127,8 +128,8 @@ public class Dispatcher {
                 this, "transferDocumentIndex", concurrentSender * 3, null, concurrentSender);
     }
 
-    public int cloudSize() {
-        return (this.transmissionCloud == null) ? 0 : this.transmissionCloud.size();
+    public int bufferSize() {
+        return (this.transmissionBuffer == null) ? 0 : this.transmissionBuffer.size();
     }
 
     public int transmissionSize() {
@@ -137,8 +138,8 @@ public class Dispatcher {
     /**
      * PROCESS (1)
-     * select a number of index containers from the backend index.
-     * Selected containers are removed from the backend.
+     * Select a number of index containers from the RWI index.
+     * Selected containers are removed from the RWIs (not from Solr, only the DHT references).
      * @param hash
      * @param limitHash
      * @param maxContainerCount
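PROCESS (1) is a select-and-remove: containers leave the RWI store the moment they are selected, so a failed transmission must explicitly restore them later (see PROCESS (6) below). A sketch of the idea under the assumption of a sorted term-hash backend; `rwiBackend`, the integer reference counts, and this `selectContainers` signature are illustrative stand-ins, not the Segment API:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

class SelectSketch {
    // stand-in backend: term hash -> reference count of that term's container
    private final NavigableMap<String, Integer> rwiBackend = new TreeMap<>();

    /** select up to maxContainerCount containers starting at startHash;
     *  selected containers are removed so a later pass cannot pick them twice */
    List<Map.Entry<String, Integer>> selectContainers(final String startHash, final int maxContainerCount) {
        final List<Map.Entry<String, Integer>> selected = new ArrayList<>();
        final Iterator<Map.Entry<String, Integer>> i =
                this.rwiBackend.tailMap(startHash, true).entrySet().iterator();
        while (i.hasNext() && selected.size() < maxContainerCount) {
            final Map.Entry<String, Integer> e = i.next();
            selected.add(Map.entry(e.getKey(), e.getValue())); // copy before removal
            i.remove(); // removes from the RWI store only, mirroring the javadoc above
        }
        return selected;
    }
}
```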
@@ -227,31 +228,9 @@ public class Dispatcher {
      * PROCESS (2)
      * split a list of containers into partitions according to the vertical distribution scheme
      * @param containers
-     * @param scheme
-     * @return
+     * @return a #verticalPartitions list of reference containers, one for each vertical position
      * @throws SpaceExceededException
      */
-    @SuppressWarnings("unchecked")
-    private List<ReferenceContainer<WordReference>>[] splitContainers(final List<ReferenceContainer<WordReference>> containers) throws SpaceExceededException {
-        // init the result vector
-        final int partitionCount = this.seeds.scheme.verticalPartitions();
-        final List<ReferenceContainer<WordReference>>[] partitions = (List<ReferenceContainer<WordReference>>[]) Array.newInstance(ArrayList.class, partitionCount);
-        for (int i = 0; i < partitions.length; i++) partitions[i] = new ArrayList<ReferenceContainer<WordReference>>();
-
-        // check all entries and split them to the partitions
-        for (final ReferenceContainer<WordReference> container: containers) {
-            // init the new partitions
-            final ReferenceContainer<WordReference>[] partitionBuffer = splitContainer(container);
-            // add the containers to the result vector
-            for (int j = 0; j < partitionBuffer.length; j++) {
-                partitions[j].add(partitionBuffer[j]);
-            }
-        }
-        return partitions;
-    }
-
     private ReferenceContainer<WordReference>[] splitContainer(final ReferenceContainer<WordReference> container) throws SpaceExceededException {
 
         // init the result vector
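splitContainer fans the references of a single term container out over the vertical partitions; the partition of each reference is derived from the reference itself (in YaCy, from its URL hash), not from the term. A sketch of that fan-out, with a plain modulo over the hash code standing in for the real vertical DHT position computation:

```java
import java.util.ArrayList;
import java.util.List;

class SplitSketch {
    /** distribute the references (here: URL hashes) of one term container
     *  into verticalPartitions buckets */
    static List<List<String>> splitContainer(final List<String> urlHashes, final int verticalPartitions) {
        final List<List<String>> partitions = new ArrayList<>(verticalPartitions);
        for (int i = 0; i < verticalPartitions; i++) partitions.add(new ArrayList<>());
        for (final String urlHash : urlHashes) {
            // stand-in for the vertical DHT position of this reference
            final int vertical = Math.floorMod(urlHash.hashCode(), verticalPartitions);
            partitions.get(vertical).add(urlHash);
        }
        return partitions;
    }
}
```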
@@ -279,116 +258,97 @@ public class Dispatcher {
     /**
      * PROCESS (3) and PROCESS (4)
-     * put containers into cloud. This needs information about the network,
+     * put containers into the write buffer. This needs information about the network,
      * because the possible targets are assigned here as well. The indexRepositoryReference
      * is the database of references which is needed here because this is the place where
      * it is finally checked whether the reference exists. If the entry does not exist for specific
      * entries in the indexContainer, then it is discarded. If it exists, then the entry is
      * stored in a cache of the Entry for later transmission to the targets, which means that
      * then no additional IO is necessary.
+     * @param containers a reference containers array, one container for each vertical position
      */
-    private void enqueueContainersToCloud(final List<ReferenceContainer<WordReference>>[] containers) {
+    private void enqueueContainersToBuffer(final byte[] wordhash, final ReferenceContainer<WordReference>[] containers) {
         assert (containers.length == this.seeds.scheme.verticalPartitions());
-        if (this.transmissionCloud == null) return;
-        byte[] primaryTarget;
-        String primaryTargetString;
-        Transmission.Chunk entry;
+        if (this.transmissionBuffer == null) return;
+        List<Seed>[] targets = DHTSelection.selectDHTDistributionTargets(this.seeds, wordhash, 3, this.seeds.redundancy());
+        assert (targets.length == this.seeds.scheme.verticalPartitions());
+        assert (targets.length == containers.length);
         for (int vertical = 0; vertical < containers.length; vertical++) {
-            List<ReferenceContainer<WordReference>> verticalList = containers[vertical];
-            ReferenceContainer<WordReference> lastContainer = verticalList.get(verticalList.size() - 1);
-            primaryTarget = Distribution.positionToHash(this.seeds.scheme.verticalDHTPosition(lastContainer.getTermHash(), vertical));
-            assert primaryTarget[2] != '@';
-            primaryTargetString = ASCII.String(primaryTarget);
-            // get or make a entry object
-            entry = this.transmissionCloud.get(primaryTargetString); // if this is not null, the entry is extended here
-            final List<Seed> targets = DHTSelection.getAcceptRemoteIndexSeedsList(
-                    this.seeds,
-                    primaryTarget,
-                    this.seeds.redundancy() * 3,
-                    true);
-            this.log.info("enqueueContainers: selected " + targets.size() + " targets for primary target key " + primaryTargetString + "/" + vertical + " with " + verticalList.size() + " index containers.");
-            if (entry == null) entry = this.transmission.newChunk(primaryTargetString, targets);
-            // fill the entry with the containers
-            for (final ReferenceContainer<WordReference> c: verticalList) {
+            ReferenceContainer<WordReference> verticalContainer = containers[vertical];
+            if (verticalContainer.isEmpty()) continue;
+            // extend the transmissionBuffer with entries for each redundant position
+            for (Seed target: targets[vertical]) {
+                Transmission.Chunk entry = this.transmissionBuffer.get(target.hash); // if this is not null, the entry is extended here
+                if (entry == null) entry = transmission.newChunk(target); else {
+                    log.info("extending chunk for peer " + entry.dhtTarget().hash + " containing " + entry.containersSize() + " references with " + verticalContainer.size() + " more entries");
+                }
                 try {
-                    entry.add(c);
-                } catch (final SpaceExceededException e) {
+                    entry.add(verticalContainer);
+                } catch (SpaceExceededException e) {
                     ConcurrentLog.logException(e);
                     break;
                 }
+                this.transmissionBuffer.put(target.hash, entry);
             }
-            // put the entry into the cloud
-            if (this.transmissionCloud != null && entry.containersSize() > 0) this.transmissionCloud.put(primaryTargetString, entry);
         }
     }
-    public boolean selectContainersEnqueueToCloud(
+    public boolean selectContainersEnqueueToBuffer(
             final byte[] hash,
             final byte[] limitHash,
             final int maxContainerCount,
             final int maxReferenceCount,
             final int maxtime) {
-        if (this.transmissionCloud == null) return false;
+        if (this.transmissionBuffer == null) return false;
 
         List<ReferenceContainer<WordReference>> selectedContainerCache;
         try {
             selectedContainerCache = selectContainers(hash, limitHash, maxContainerCount, maxReferenceCount, maxtime);
         } catch (final IOException e) {
-            this.log.severe("selectContainersEnqueueToCloud: selectedContainer failed", e);
+            this.log.severe("selectContainersEnqueueToBuffer: selectedContainer failed", e);
             return false;
         }
-        this.log.info("selectContainersEnqueueToCloud: selectedContainerCache was filled with " + selectedContainerCache.size() + " entries");
+        this.log.info("selectContainersEnqueueToBuffer: selectedContainerCache was filled with " + selectedContainerCache.size() + " entries");
 
         if (selectedContainerCache == null || selectedContainerCache.isEmpty()) {
-            this.log.info("selectContainersEnqueueToCloud: selectedContainerCache is empty, cannot do anything here.");
+            this.log.info("selectContainersEnqueueToBuffer: selectedContainerCache is empty, cannot do anything here.");
             return false;
         }
 
-        List<ReferenceContainer<WordReference>>[] splitContainerCache; // for each vertical partition a set of word references within a reference container
+        // check all entries and split them to the partitions
         try {
-            splitContainerCache = splitContainers(selectedContainerCache);
+            for (final ReferenceContainer<WordReference> container: selectedContainerCache) {
+                // init the new partitions
+                final ReferenceContainer<WordReference>[] partitionBuffer = splitContainer(container);
+                enqueueContainersToBuffer(container.getTermHash(), partitionBuffer);
+            }
         } catch (final SpaceExceededException e) {
-            this.log.severe("selectContainersEnqueueToCloud: splitContainers failed because of too low RAM", e);
+            this.log.severe("splitContainer: splitContainers failed because of too low RAM", e);
             return false;
         }
-        selectedContainerCache = null;
-
-        if (splitContainerCache == null) {
-            this.log.info("selectContainersEnqueueToCloud: splitContainerCache is empty, cannot do anything here.");
-            return false;
-        }
-        this.log.info("splitContainersFromCache: splitContainerCache filled with " + splitContainerCache.length + " partitions, deleting selectedContainerCache");
-
-        if (splitContainerCache.length != this.seeds.scheme.verticalPartitions()) {
-            this.log.warn("selectContainersEnqueueToCloud: splitContainerCache has wrong length.");
-            return false;
-        }
-        enqueueContainersToCloud(splitContainerCache);
-        splitContainerCache = null;
-        this.log.info("selectContainersEnqueueToCloud: splitContainerCache enqueued to cloud array which has now " + this.transmissionCloud.size() + " entries.");
+        this.log.info("selectContainersEnqueueToBuffer: splitContainerCache enqueued to the write buffer array which has now " + this.transmissionBuffer.size() + " entries.");
         return true;
     }
     /**
      * PROCESS (5)
-     * take the largest container from the cloud and put it into the 'next' array,
+     * take the largest container from the write buffer and put it into the 'next' array,
      * where it waits to be processed.
      * This method returns true if a container was dequeued, false if not
      */
     public boolean dequeueContainer() {
-        if (this.transmissionCloud == null) return false;
+        if (this.transmissionBuffer == null) return false;
         if (this.indexingTransmissionProcessor.getQueueSize() > this.indexingTransmissionProcessor.getMaxConcurrency()) return false;
         String maxtarget = null;
         int maxsize = -1;
-        for (final Map.Entry<String, Transmission.Chunk> chunk: this.transmissionCloud.entrySet()) {
+        for (final Map.Entry<String, Transmission.Chunk> chunk: this.transmissionBuffer.entrySet()) {
             if (chunk.getValue().containersSize() > maxsize) {
                 maxsize = chunk.getValue().containersSize();
                 maxtarget = chunk.getKey();
             }
         }
         if (maxsize < 0) return false;
-        final Transmission.Chunk chunk = this.transmissionCloud.remove(maxtarget);
+        final Transmission.Chunk chunk = this.transmissionBuffer.remove(maxtarget);
         this.indexingTransmissionProcessor.enQueue(chunk);
         return true;
     }
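Note the fan-out that replaces the old primary-target keying: each vertical partition is appended to one chunk per redundant target peer, so the buffer holds at most one chunk per receiving peer and the same partition may be queued for several peers. A simplified sketch of that double loop, with plain lists standing in for `ReferenceContainer` and `Seed`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class FanOutSketch {
    /** verticalContainers: one container (a list of references) per vertical position;
     *  targets: the redundant target peer hashes for each vertical position;
     *  buffer: peer hash -> references accumulated for that peer */
    static void enqueueToBuffer(
            final List<List<String>> verticalContainers,
            final List<List<String>> targets,
            final Map<String, List<String>> buffer) {
        for (int vertical = 0; vertical < verticalContainers.size(); vertical++) {
            final List<String> container = verticalContainers.get(vertical);
            if (container.isEmpty()) continue; // mirrors the isEmpty() check above
            for (final String peerHash : targets.get(vertical)) {
                // the same partition is added to one chunk per redundant target,
                // so its references may be transmitted to several peers
                buffer.computeIfAbsent(peerHash, k -> new ArrayList<>()).addAll(container);
            }
        }
    }
}
```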
@@ -410,20 +370,9 @@ public class Dispatcher {
         // do the transmission
         final boolean success = chunk.transmit();
+        if (success) return chunk;
-        if (success && chunk.isFinished()) {
-            // finished with this queue!
-            this.log.info("STORE: Chunk " + chunk.primaryTarget() + " has FINISHED all transmissions!");
-            return chunk;
-        }
-        if (!success) this.log.info("STORE: Chunk " + chunk.primaryTarget() + " has failed to transmit index; marked peer as busy");
-        if (chunk.canFinish()) {
-            if (this.indexingTransmissionProcessor != null) this.indexingTransmissionProcessor.enQueue(chunk);
-            return chunk;
-        }
-        this.log.info("STORE: Chunk " + chunk.primaryTarget() + " has not enough targets left. This transmission has failed, putting back index to backend");
+        this.log.info("STORE: Chunk " + chunk.dhtTarget().getName() + " does not respond or accept the dht index, putting back index to backend");
         chunk.restore();
         return null;
     }
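The old retry ladder (isFinished/canFinish over a target list) collapses into a single attempt per chunk, because a chunk now has exactly one target peer. A minimal sketch of the resulting contract; the `Chunk` interface here is a hypothetical stand-in for the two `Transmission.Chunk` methods used:

```java
class TransmitSketch {
    /** hypothetical stand-ins for the two Transmission.Chunk operations used here */
    interface Chunk {
        boolean transmit(); // send the accumulated references to the target peer
        void restore();     // put the references back into the local index
    }

    /** one transmit attempt per chunk: on success the chunk is done, on
     *  failure its references go back to the backend and nothing is lost */
    static Chunk storeDocumentIndex(final Chunk chunk) {
        if (chunk.transmit()) return chunk;
        chunk.restore();
        return null;
    }
}
```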
@@ -431,8 +380,8 @@ public class Dispatcher {
     public void close() {
         // removes all entries from the dispatcher and puts them back to a RAMRI
         if (this.indexingTransmissionProcessor != null) this.indexingTransmissionProcessor.shutdown();
-        if (this.transmissionCloud != null) {
-            outerLoop: for (final Map.Entry<String, Transmission.Chunk> e: this.transmissionCloud.entrySet()) {
+        if (this.transmissionBuffer != null) {
+            outerLoop: for (final Map.Entry<String, Transmission.Chunk> e: this.transmissionBuffer.entrySet()) {
                 for (final ReferenceContainer<WordReference> i: e.getValue()) try {
                     this.segment.storeRWI(i);
                 } catch (final Exception e1) {
@@ -440,9 +389,9 @@ public class Dispatcher {
                     break outerLoop;
                 }
             }
-            this.transmissionCloud.clear();
+            this.transmissionBuffer.clear();
         }
-        this.transmissionCloud = null;
+        this.transmissionBuffer = null;
         if (this.indexingTransmissionProcessor != null) {
             this.indexingTransmissionProcessor.clear();
         }
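On close(), whatever still sits in the write buffer is drained back into the local segment so no references are lost at shutdown. A sketch of that drain, with simplified stand-ins for `Segment` and the buffered containers:

```java
import java.util.List;
import java.util.Map;

class CloseSketch {
    /** stand-in for the Segment backend used by the dispatcher */
    interface Segment { void storeRWI(String container) throws Exception; }

    /** drain every buffered container back into the local segment; on a
     *  backend failure, stop draining instead of looping over a dead store */
    static void close(final Map<String, List<String>> buffer, final Segment segment) {
        outerLoop:
        for (final Map.Entry<String, List<String>> e : buffer.entrySet()) {
            for (final String container : e.getValue()) {
                try {
                    segment.storeRWI(container);
                } catch (final Exception ex) {
                    break outerLoop;
                }
            }
        }
        buffer.clear();
    }
}
```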