@ -68,7 +68,6 @@ public final class plasmaWordIndexDistribution {
// set true by close(); queried through isClosed()
private boolean closed ;
// whether regular distribution requests gzip their HTTP body — presumably config-driven; ctor not fully visible here, confirm
private boolean gzipBody4Distribution ;
// network timeout (ms) for regular index distribution — presumably config-driven; ctor not fully visible here, confirm
private int timeout4Distribution ;
// the currently running whole-index transfer thread, or null when none was started
public transferIndexThread transferIdxThread = null ;
public plasmaWordIndexDistribution (
plasmaURLPool urlPool ,
@ -108,9 +107,7 @@ public final class plasmaWordIndexDistribution {
public void close ( ) {
closed = true ;
if ( transferIdxThread ! = null ) {
stopTransferWholeIndex ( false ) ;
}
}
private boolean isClosed ( ) {
@ -278,260 +275,4 @@ public final class plasmaWordIndexDistribution {
}
}
public void startTransferWholeIndex ( yacySeed seed , boolean delete ) {
if ( transferIdxThread = = null ) {
this . transferIdxThread = new transferIndexThread ( seed , delete ) ;
this . transferIdxThread . start ( ) ;
}
}
public void stopTransferWholeIndex ( boolean wait ) {
if ( ( transferIdxThread ! = null ) & & ( transferIdxThread . isAlive ( ) ) & & ( ! transferIdxThread . isFinished ( ) ) ) {
try {
this . transferIdxThread . stopIt ( wait ) ;
} catch ( InterruptedException e ) { }
}
}
public void abortTransferWholeIndex ( boolean wait ) {
if ( transferIdxThread ! = null ) {
if ( ! transferIdxThread . isFinished ( ) )
try {
this . transferIdxThread . stopIt ( wait ) ;
} catch ( InterruptedException e ) { }
transferIdxThread = null ;
}
}
public class transferIndexThread extends Thread {
private yacySeed seed = null ;
private boolean delete = false ;
private boolean finished = false ;
private boolean gzipBody4Transfer = false ;
private int timeout4Transfer = 60000 ;
private int transferedEntryCount = 0 ;
private int transferedContainerCount = 0 ;
private String status = "Running" ;
private String oldStartingPointHash = "------------" , startPointHash = "------------" ;
private int initialWordsDBSize = 0 ;
private int chunkSize = 500 ;
private final long startingTime = System . currentTimeMillis ( ) ;
private final plasmaSwitchboard sb ;
private plasmaDHTTransfer worker = null ;
public transferIndexThread ( yacySeed seed , boolean delete ) {
super ( new ThreadGroup ( "TransferIndexThreadGroup" ) , "TransferIndex_" + seed . getName ( ) ) ;
this . seed = seed ;
this . delete = delete ;
this . sb = plasmaSwitchboard . getSwitchboard ( ) ;
this . initialWordsDBSize = sb . wordIndex . size ( ) ;
this . gzipBody4Transfer = "true" . equalsIgnoreCase ( sb . getConfig ( "indexTransfer.gzipBody" , "false" ) ) ;
this . timeout4Transfer = ( int ) sb . getConfigLong ( "indexTransfer.timeout" , 60000 ) ;
//this.maxOpenFiles4Transfer = (int) sb.getConfigLong("indexTransfer.maxOpenFiles",800);
}
public void run ( ) {
performTransferWholeIndex ( ) ;
}
public void stopIt ( boolean wait ) throws InterruptedException {
this . finished = true ;
if ( wait ) this . join ( ) ;
}
public boolean isFinished ( ) {
return this . finished ;
}
public boolean deleteIndex ( ) {
return this . delete ;
}
public int [ ] getIndexCount ( ) {
plasmaDHTTransfer workerThread = this . worker ;
if ( workerThread ! = null ) {
return new int [ ] { this . chunkSize , workerThread . getIndexCount ( ) } ;
}
return new int [ ] { this . chunkSize , 500 } ;
}
public int getTransferedEntryCount ( ) {
return this . transferedEntryCount ;
}
public int getTransferedContainerCount ( ) {
return this . transferedContainerCount ;
}
public float getTransferedContainerPercent ( ) {
long currentWordsDBSize = sb . wordIndex . size ( ) ;
if ( initialWordsDBSize = = 0 ) return 100 ;
else if ( currentWordsDBSize > = initialWordsDBSize ) return 0 ;
//else return (float) ((initialWordsDBSize-currentWordsDBSize)/(initialWordsDBSize/100));
else return ( float ) ( this . transferedContainerCount * 100 / initialWordsDBSize ) ;
}
public int getTransferedEntitySpeed ( ) {
long transferTime = System . currentTimeMillis ( ) - startingTime ;
if ( transferTime < = 0 ) transferTime = 1 ;
return ( int ) ( ( 1000 * transferedEntryCount ) / transferTime ) ;
}
public yacySeed getSeed ( ) {
return this . seed ;
}
public String [ ] getStatus ( ) {
plasmaDHTTransfer workerThread = this . worker ;
if ( workerThread ! = null ) {
return new String [ ] { this . status , workerThread . getStatusMessage ( ) } ;
}
return new String [ ] { this . status , "Not running" } ;
}
public String [ ] getRange ( ) {
plasmaDHTTransfer workerThread = this . worker ;
if ( workerThread ! = null ) {
return new String [ ] { "[" + oldStartingPointHash + ".." + startPointHash + "]" , workerThread . getRange ( ) } ;
}
return new String [ ] { "[" + oldStartingPointHash + ".." + startPointHash + "]" , "[------------..------------]" } ;
}
public void performTransferWholeIndex ( ) {
plasmaDHTChunk newDHTChunk = null , oldDHTChunk = null ;
try {
// pausing the regular index distribution
// TODO: adding sync, to wait for a still running index distribution to finish
plasmaWordIndexDistribution . this . paused = true ;
// initial startingpoint of intex transfer is "------------"
plasmaWordIndexDistribution . this . log . logFine ( "Selected hash " + startPointHash + " as start point for index distribution of whole index" ) ;
/ * Loop until we have
* - finished transfer of whole index
* - detected a server shutdown or user interruption
* - detected a failure
* /
long selectionStart = System . currentTimeMillis ( ) , selectionEnd = 0 , selectionTime = 0 , iteration = 0 ;
while ( ! finished & & ! Thread . currentThread ( ) . isInterrupted ( ) ) {
iteration + + ;
selectionStart = System . currentTimeMillis ( ) ;
oldDHTChunk = newDHTChunk ;
// selecting 500 words to transfer
this . status = "Running: Selecting chunk " + iteration ;
newDHTChunk = new plasmaDHTChunk ( plasmaWordIndexDistribution . this . log , wordIndex , sb . urlPool . loadedURL , this . chunkSize / 3 , this . chunkSize , this . startPointHash ) ;
/ * If we havn ' t selected a word chunk this could be because of
* a ) no words are left in the index
* b ) max open file limit was exceeded
* /
if ( ( newDHTChunk = = null ) | |
( newDHTChunk . containerSize ( ) = = 0 ) | |
( newDHTChunk . getStatus ( ) = = plasmaDHTChunk . chunkStatus_FAILED ) ) {
if ( sb . wordIndex . size ( ) > 0 ) {
// if there are still words in the index we try it again now
startPointHash = "------------" ;
} else {
// otherwise we could end transfer now
plasmaWordIndexDistribution . this . log . logFine ( "No index available for index transfer, hash start-point " + startPointHash ) ;
this . status = "Finished. " + iteration + " chunks transfered." ;
finished = true ;
}
} else {
// getting start point for next DHT-selection
oldStartingPointHash = startPointHash ;
startPointHash = newDHTChunk . lastContainer ( ) . wordHash ( ) ; // DHT targets must have greater hashes
selectionEnd = System . currentTimeMillis ( ) ;
selectionTime = selectionEnd - selectionStart ;
plasmaWordIndexDistribution . this . log . logInfo ( "Index selection of " + newDHTChunk . indexCount ( ) + " words [" + newDHTChunk . firstContainer ( ) . wordHash ( ) + " .. " + newDHTChunk . lastContainer ( ) . wordHash ( ) + "]" +
" in " +
( selectionTime / 1000 ) + " seconds (" +
( 1000 * newDHTChunk . indexCount ( ) / ( selectionTime + 1 ) ) + " words/s)" ) ;
}
// query status of old worker thread
if ( worker ! = null ) {
this . status = "Finished: Selecting chunk " + iteration ;
worker . join ( ) ;
if ( ! worker . success ) {
// if the transfer failed we abort index transfer now
this . status = "Aborted because of Transfer error:\n" + worker . getStatus ( ) ;
// abort index transfer
return ;
} else {
/ *
* If index transfer was done successfully we close all remaining open
* files that belong to the old index chunk and handover a new chunk
* to the transfer thread .
* Addintionally we recalculate the chunk size to optimize performance
* /
this . chunkSize = worker . getIndexCount ( ) ;
long transferTime = worker . getTransferTime ( ) ;
//TODO: only increase chunk Size if there is free memory left on the server
// we need aprox. 73Byte per IndexEntity and an average URL length of 32 char
//if (ft.freeMemory() < 73*2*100)
if ( transferTime > 60 * 1000 ) {
if ( chunkSize > 200 ) chunkSize - = 100 ;
} else if ( selectionTime < transferTime ) {
this . chunkSize + = 100 ;
//chunkSize+=50;
} else if ( selectionTime > = selectionTime ) {
if ( chunkSize > 200 ) chunkSize - = 100 ;
}
selectionStart = System . currentTimeMillis ( ) ;
// deleting transfered words from index
if ( delete ) {
this . status = "Running: Deleting chunk " + iteration ;
transferedEntryCount + = oldDHTChunk . indexCount ( ) ;
transferedContainerCount + = oldDHTChunk . containerSize ( ) ;
int urlReferences = oldDHTChunk . deleteTransferIndexes ( ) ;
plasmaWordIndexDistribution . this . log . logFine ( "Deleted from " + oldDHTChunk . containerSize ( ) + " transferred RWIs locally " + urlReferences + " URL references" ) ;
} else {
transferedEntryCount + = oldDHTChunk . indexCount ( ) ;
transferedContainerCount + = oldDHTChunk . containerSize ( ) ;
}
oldDHTChunk = null ;
}
this . worker = null ;
}
// handover chunk to transfer worker
if ( ( newDHTChunk ! = null ) & &
( newDHTChunk . containerSize ( ) > 0 ) | |
( newDHTChunk . getStatus ( ) = = plasmaDHTChunk . chunkStatus_FILLED ) ) {
worker = new plasmaDHTTransfer ( log , seed , newDHTChunk ,
gzipBody4Transfer , timeout4Transfer , iteration ,
startPointHash , oldStartingPointHash ) ;
worker . start ( ) ;
}
}
// if we reach this point we were aborted by the user or by server shutdown
if ( sb . wordIndex . size ( ) > 0 ) this . status = "aborted" ;
} catch ( Exception e ) {
this . status = "Error: " + e . getMessage ( ) ;
plasmaWordIndexDistribution . this . log . logWarning ( "Index transfer to peer " + seed . getName ( ) + ":" + seed . hash + " failed:'" + e . getMessage ( ) + "'" , e ) ;
} finally {
if ( worker ! = null ) {
worker . stopIt ( ) ;
try { worker . join ( ) ; } catch ( Exception e ) { }
// worker = null;
}
plasmaWordIndexDistribution . this . paused = false ;
}
}
}
}