@ -92,11 +92,7 @@ public class Dispatcher {
private serverProcessor < Transmission . Chunk > indexingTransmissionProcessor ;
// transmission object
private Transmission transmissions ;
// caching objects
private ArrayList < indexContainer > selectedContainerCache ;
private ArrayList < indexContainer > [ ] splittedContainerCache ;
private Transmission transmission ;
public Dispatcher (
final indexRI backend ,
@ -109,15 +105,15 @@ public class Dispatcher {
this . backend = backend ;
this . seeds = seeds ;
this . log = new Log ( "INDEX TRANSFER DISPATCHER" ) ;
this . transmission s = new Transmission (
this . transmission = new Transmission (
log ,
repository ,
seeds ,
backend ,
gzipBody ,
timeout ) ;
this . selectedContainerCache = null ;
this . splittedContainerCache = null ;
//this.selectedContainerCache = null;
//this.splittedContainerCache = null;
int concurrentSender = Math . min ( 25 , Math . max ( 10 , serverProcessor . useCPU * 2 + 1 ) ) ;
indexingTransmissionProcessor = new serverProcessor < Transmission . Chunk > (
@ -127,6 +123,14 @@ public class Dispatcher {
this , "storeDocumentIndex" , concurrentSender * 2 , null , concurrentSender ) ;
}
public int cloudSize ( ) {
return this . transmissionCloud . size ( ) ;
}
public int transmissionSize ( ) {
return this . indexingTransmissionProcessor . queueSize ( ) ;
}
/ * *
* PROCESS ( 1 )
* select a number of index containers from the backend index .
@ -138,7 +142,7 @@ public class Dispatcher {
* @return
* @throws IOException
* /
p ublic ArrayList < indexContainer > selectContainers (
p rivate ArrayList < indexContainer > selectContainers (
final String hash ,
final String limitHash ,
final int maxContainerCount ,
@ -153,7 +157,7 @@ public class Dispatcher {
return containers ;
}
p ublic ArrayList < indexContainer > selectContainers (
p rivate ArrayList < indexContainer > selectContainers (
final String hash ,
final String limitHash ,
final int maxContainerCount ,
@ -190,30 +194,6 @@ public class Dispatcher {
return containers ;
}
/ * *
* convenience method for the selection process : put the result in an internal cache
* @param hash
* @param limitHash
* @param maxContainerCount
* @param maxtime
* @return
* @throws IOException
* /
public synchronized int selectContainersToCache (
final String hash ,
final String limitHash ,
final int maxContainerCount ,
final int maxReferenceCount ,
final int maxtime ) throws IOException {
if ( this . selectedContainerCache ! = null & & this . selectedContainerCache . size ( ) > 0 ) {
this . log . logInfo ( "selectContainersToCache: selectedContainerCache is already filled, no selection done." ) ;
return 0 ;
}
this . selectedContainerCache = selectContainers ( hash , limitHash , maxContainerCount , maxReferenceCount , maxtime ) ;
this . log . logInfo ( "selectContainersToCache: selectedContainerCache was filled with " + this . selectedContainerCache . size ( ) + " entries" ) ;
return this . selectedContainerCache . size ( ) ;
}
/ * *
* PROCESS ( 2 )
* split a list of containers into partitions according to the vertical distribution scheme
@ -222,7 +202,7 @@ public class Dispatcher {
* @return
* /
@SuppressWarnings ( "unchecked" )
p ublic ArrayList < indexContainer > [ ] splitContainers ( ArrayList < indexContainer > containers ) {
p rivate ArrayList < indexContainer > [ ] splitContainers ( ArrayList < indexContainer > containers ) {
// init the result vector
int partitionCount = this . seeds . scheme . verticalPartitions ( ) ;
@ -254,25 +234,6 @@ public class Dispatcher {
return partitions ;
}
/ * *
* convenience method for splitContainers : use the container cache and write into a splitted container cache .
* @return
* /
public synchronized int splitContainersFromCache ( ) {
if ( selectedContainerCache = = null | | selectedContainerCache . size ( ) = = 0 ) {
this . log . logInfo ( "splitContainersFromCache: selectedContainerCache is empty, cannot do anything here." ) ;
return 0 ;
}
if ( splittedContainerCache ! = null & & splittedContainerCache . length > 0 ) {
this . log . logInfo ( "splitContainersFromCache: splittedContainerCache is aready filled, doing nothing now." ) ;
return 0 ;
}
this . splittedContainerCache = splitContainers ( selectedContainerCache ) ;
this . selectedContainerCache = null ;
this . log . logInfo ( "splitContainersFromCache: splittedContainerCache filled with " + this . splittedContainerCache . length + " partitions, deleting selectedContainerCache" ) ;
return this . splittedContainerCache . length ;
}
/ * *
* PROCESS ( 3 ) and PROCESS ( 4 )
* put containers into cloud . This needs information about the network ,
@ -283,7 +244,7 @@ public class Dispatcher {
* stored in a cache of the Entry for later transmission to the targets , which means that
* then no additional IO is necessary .
* /
p ublic void enqueueContainers ( final ArrayList < indexContainer > [ ] containers ) {
p rivate void enqueueContainersToCloud ( final ArrayList < indexContainer > [ ] containers ) {
indexContainer lastContainer ;
String primaryTarget ;
Transmission . Chunk entry ;
@ -300,7 +261,7 @@ public class Dispatcher {
seeds . redundancy ( ) * 3 ,
true ) ;
this . log . logInfo ( "enqueueContainers: selected " + targets . size ( ) + " targets for primary target key " + primaryTarget + "/" + vertical + " with " + containers [ vertical ] . size ( ) + " index containers." ) ;
if ( entry = = null ) entry = transmission s . newChunk ( primaryTarget , targets , lastContainer . row ( ) ) ;
if ( entry = = null ) entry = transmission . newChunk ( primaryTarget , targets , lastContainer . row ( ) ) ;
// fill the entry with the containers
for ( indexContainer c : containers [ vertical ] ) {
@ -311,14 +272,35 @@ public class Dispatcher {
if ( entry . containersSize ( ) > 0 ) this . transmissionCloud . put ( primaryTarget , entry ) ;
}
}
public boolean enqueueContainersFromCache ( ) {
if ( this . splittedContainerCache = = null ) {
public synchronized boolean selectContainersEnqueueToCloud (
final String hash ,
final String limitHash ,
final int maxContainerCount ,
final int maxReferenceCount ,
final int maxtime ) throws IOException {
ArrayList < indexContainer > selectedContainerCache = selectContainers ( hash , limitHash , maxContainerCount , maxReferenceCount , maxtime ) ;
this . log . logInfo ( "selectContainersToCache: selectedContainerCache was filled with " + selectedContainerCache . size ( ) + " entries" ) ;
if ( selectedContainerCache = = null | | selectedContainerCache . size ( ) = = 0 ) {
this . log . logInfo ( "splitContainersFromCache: selectedContainerCache is empty, cannot do anything here." ) ;
return false ;
}
ArrayList < indexContainer > [ ] splittedContainerCache = splitContainers ( selectedContainerCache ) ;
selectedContainerCache = null ;
if ( splittedContainerCache = = null ) {
this . log . logInfo ( "enqueueContainersFromCache: splittedContainerCache is empty, cannot do anything here." ) ;
return false ;
}
enqueueContainers ( this . splittedContainerCache ) ;
this . splittedContainerCache = null ;
this . log . logInfo ( "splitContainersFromCache: splittedContainerCache filled with " + splittedContainerCache . length + " partitions, deleting selectedContainerCache" ) ;
if ( splittedContainerCache . length ! = this . seeds . scheme . verticalPartitions ( ) ) {
this . log . logWarning ( "enqueueContainersFromCache: splittedContainerCache has wrong length." ) ;
return false ;
}
enqueueContainersToCloud ( splittedContainerCache ) ;
splittedContainerCache = null ;
this . log . logInfo ( "enqueueContainersFromCache: splittedContainerCache enqueued to cloud array which has now " + this . transmissionCloud . size ( ) + " entries." ) ;
return true ;
}
@ -360,7 +342,7 @@ public class Dispatcher {
return chunk ;
}
this . log . logInfo ( "STORE: Chunk " + chunk . primaryTarget ( ) + " has failed to transmit index; removed peer from network ") ;
this . log . logInfo ( "STORE: Chunk " + chunk . primaryTarget ( ) + " has failed to transmit index; marked peer as busy ") ;
if ( chunk . canFinish ( ) ) {
try {