@ -473,6 +473,7 @@ public class plasmaWordIndexDistribution {
this . seed = seed ;
this . delete = delete ;
this . wordsDBSize = plasmaSwitchboard . getSwitchboard ( ) . wordIndex . size ( ) ;
this . setName ( "TransferIndex_" + seed . getName ( ) ) ;
}
public void run ( ) {
@ -513,18 +514,22 @@ public class plasmaWordIndexDistribution {
try {
plasmaWordIndexDistribution . this . paused = true ;
// collect index
// initial starting point of index transfer is "------------"
plasmaWordIndexDistribution . this . log . logFine ( "Selected hash " + startPointHash + " as start point for index distribution of whole index" ) ;
long start ;
/ * Loop until we have
* - finished transfer of whole index
* - detected a server shutdown or user interruption
* - detected a failure
* /
long start , retryCount = 0 ;
while ( ! finished & & ! Thread . currentThread ( ) . isInterrupted ( ) ) {
int idxCount = 0 ;
start = System . currentTimeMillis ( ) ;
// selecting 500 words to transfer
Object [ ] selectResult = selectTransferIndexes ( startPointHash , 500 ) ;
plasmaWordIndexEntity [ ] indexEntities = ( plasmaWordIndexEntity [ ] ) selectResult [ 0 ] ;
if ( finished | | Thread . currentThread ( ) . isInterrupted ( ) ) {
this . status = "aborted" ;
return ;
}
plasmaWordIndexEntity [ ] indexEntities = ( plasmaWordIndexEntity [ ] ) selectResult [ 0 ] ;
HashMap urlCache = ( HashMap ) selectResult [ 1 ] ; // String (url-hash) / plasmaCrawlLURL.Entry
if ( ( indexEntities = = null ) | | ( indexEntities . length = = 0 ) ) {
@ -532,13 +537,10 @@ public class plasmaWordIndexDistribution {
this . status = "finished." ;
return ;
}
// count the indexes again, can be smaller as expected
int idxCount = 0 ;
for ( int i = 0 ; i < indexEntities . length ; i + + ) {
idxCount + = indexEntities [ i ] . size ( ) ;
}
// count the indexes again, can be smaller as expected
for ( int i = 0 ; i < indexEntities . length ; i + + ) idxCount + = indexEntities [ i ] . size ( ) ;
// find start point for DHT-selection
// getting start point for next DHT-selection
oldStartingPointHash = startPointHash ;
startPointHash = indexEntities [ indexEntities . length - 1 ] . wordHash ( ) ; // DHT targets must have greater hashes
@ -547,20 +549,70 @@ public class plasmaWordIndexDistribution {
( ( System . currentTimeMillis ( ) - start ) / 1000 ) + " seconds (" +
( 1000 * idxCount / ( System . currentTimeMillis ( ) - start + 1 ) ) + " words/s)" ) ;
/ * loop until we
* - have successfully transfered the words list or
* - the retry counter limit was exceeded
* /
start = System . currentTimeMillis ( ) ;
String error = yacyClient . transferIndex ( seed , indexEntities , urlCache ) ;
if ( error = = null ) {
plasmaWordIndexDistribution . this . log . logInfo ( "Index transfer of " + idxCount + " words [" + indexEntities [ 0 ] . wordHash ( ) + " .. " + indexEntities [ indexEntities . length - 1 ] . wordHash ( ) + "]" +
" to peer " + seed . getName ( ) + ":" + seed . hash + " in " +
( ( System . currentTimeMillis ( ) - start ) / 1000 ) + " seconds successfull (" +
( 1000 * idxCount / ( System . currentTimeMillis ( ) - start + 1 ) ) + " words/s)" ) ;
} else {
plasmaWordIndexDistribution . this . log . logWarning ( "Index transfer to peer " + seed . getName ( ) + ":" + seed . hash + " failed:'" + error + "', disconnecting peer" ) ;
yacyCore . peerActions . peerDeparture ( seed ) ;
this . status = "Disconnected peer: " + error ;
return ;
}
while ( true ) {
// testing if we were aborted
if ( isAborted ( ) ) return ;
// transferring selected words to remote peer
String error = yacyClient . transferIndex ( seed , indexEntities , urlCache ) ;
if ( error = = null ) {
// words successfully transfered
plasmaWordIndexDistribution . this . log . logInfo ( "Index transfer of " + idxCount + " words [" + indexEntities [ 0 ] . wordHash ( ) + " .. " + indexEntities [ indexEntities . length - 1 ] . wordHash ( ) + "]" +
" to peer " + seed . getName ( ) + ":" + seed . hash + " in " +
( ( System . currentTimeMillis ( ) - start ) / 1000 ) + " seconds successfull (" +
( 1000 * idxCount / ( System . currentTimeMillis ( ) - start + 1 ) ) + " words/s)" ) ;
retryCount = 0 ;
break ;
} else {
// word transfer failed
// inc retry counter
retryCount + + ;
// testing if we were aborted ...
if ( isAborted ( ) ) return ;
// we have lost the connection to the remote peer. Adding peer to disconnected list
plasmaWordIndexDistribution . this . log . logWarning ( "Index transfer to peer " + seed . getName ( ) + ":" + seed . hash + " failed:'" + error + "', disconnecting peer" ) ;
yacyCore . peerActions . peerDeparture ( seed ) ;
// if the retry counter limit was not exceeded we'll retry it in a few seconds
this . status = "Disconnected peer: " + ( ( retryCount > 5 ) ? error + ". Transfer aborted" : "Retry " + retryCount ) ;
if ( retryCount > 5 ) return ;
Thread . sleep ( retryCount * 5000 ) ;
/ * loop until
* - we have successfully done a peer ping or
* - the retry counter limit was exceeded
* /
while ( true ) {
// testing if we were aborted ...
if ( isAborted ( ) ) return ;
// doing a peer ping to the remote seed
int added = yacyClient . publishMySeed ( seed . getAddress ( ) , seed . hash ) ;
if ( added < 0 ) {
// inc. retry counter
retryCount + + ;
this . status = "Disconnected peer: Peer ping failed. " + ( ( retryCount > 5 ) ? "Transfer aborted." : "Retry " + retryCount ) ;
if ( retryCount > 5 ) return ;
Thread . sleep ( retryCount * 5000 ) ;
continue ;
} else {
seed = yacyCore . seedDB . getConnected ( seed . hash ) ;
this . status = "running" ;
break ;
}
}
}
}
// deleting transfered words from index
if ( delete ) {
try {
if ( deleteTransferIndexes ( indexEntities ) ) {
@ -580,6 +632,8 @@ public class plasmaWordIndexDistribution {
transferedIndexCount + = idxCount ;
}
}
// if we reach this point we were aborted by the user or by server shutdown
this . status = "aborted" ;
} catch ( Exception e ) {
this . status = "Error: " + e . getMessage ( ) ;
@ -587,6 +641,14 @@ public class plasmaWordIndexDistribution {
} finally {
plasmaWordIndexDistribution . this . paused = false ;
}
}
}
}
private boolean isAborted ( ) {
if ( finished | | Thread . currentThread ( ) . isInterrupted ( ) ) {
this . status = "aborted" ;
return true ;
}
return false ;
}
}
}