@ -121,6 +121,10 @@ public final class plasmaWordIndexDistribution {
}
}
private boolean isClosed ( ) {
return ( this . closed | | Thread . currentThread ( ) . isInterrupted ( ) ) ;
}
public boolean job ( ) {
if ( this . closed ) {
@ -192,111 +196,114 @@ public final class plasmaWordIndexDistribution {
public int performTransferIndex ( int indexCount , int peerCount , boolean delete ) {
if ( ( yacyCore . seedDB = = null ) | | ( yacyCore . seedDB . sizeConnected ( ) = = 0 ) ) return - 1 ;
// collect index
String startPointHash = selectTransferStart ( ) ;
log . logFine ( "Selected hash " + startPointHash + " as start point for index distribution, distance = " + yacyDHTAction . dhtDistance ( yacyCore . seedDB . mySeed . hash , startPointHash ) ) ;
Object [ ] selectResult = selectTransferContainers ( startPointHash , indexCount ) ;
plasmaWordIndexEntryContainer [ ] indexContainers = ( plasmaWordIndexEntryContainer [ ] ) selectResult [ 0 ] ;
HashMap urlCache = ( HashMap ) selectResult [ 1 ] ; // String (url-hash) / plasmaCrawlLURL.Entry
if ( ( indexContainers = = null ) | | ( indexContainers . length = = 0 ) ) {
log . logFine ( "No index available for index transfer, hash start-point " + startPointHash ) ;
return - 1 ;
}
// count the indexes again, can be smaller as expected
indexCount = 0 ;
for ( int i = 0 ; i < indexContainers . length ; i + + ) {
indexCount + = indexContainers [ i ] . size ( ) ;
}
if ( indexCount < 50 ) {
log . logFine ( "Too few (" + indexCount + ") indexes selected for transfer." ) ;
closeTransferIndexes ( indexContainers ) ;
return - 1 ; // failed
}
// find start point for DHT-selection
String keyhash = indexContainers [ indexContainers . length - 1 ] . wordHash ( ) ; // DHT targets must have greater hashes
// find a list of DHT-peers
yacySeed [ ] seeds = new yacySeed [ peerCount + 10 ] ;
int hc0 = 0 ;
double ownDistance = Math . min ( yacyDHTAction . dhtDistance ( yacyCore . seedDB . mySeed . hash , indexContainers [ 0 ] . wordHash ( ) ) ,
yacyDHTAction . dhtDistance ( yacyCore . seedDB . mySeed . hash , indexContainers [ indexContainers . length - 1 ] . wordHash ( ) ) ) ;
double maxDistance = Math . min ( ownDistance , 0.4 ) ;
synchronized ( yacyCore . dhtAgent ) {
double avdist ;
Enumeration e = yacyCore . dhtAgent . getAcceptRemoteIndexSeeds ( keyhash ) ;
while ( ( e . hasMoreElements ( ) ) & & ( hc0 < seeds . length ) ) {
if ( closed ) {
log . logSevere ( "Index distribution interrupted by close, nothing deleted locally." ) ;
return - 1 ; // interrupted
}
seeds [ hc0 ] = ( yacySeed ) e . nextElement ( ) ;
if ( seeds [ hc0 ] ! = null ) {
avdist = Math . max ( yacyDHTAction . dhtDistance ( seeds [ hc0 ] . hash , indexContainers [ 0 ] . wordHash ( ) ) ,
yacyDHTAction . dhtDistance ( seeds [ hc0 ] . hash , indexContainers [ indexContainers . length - 1 ] . wordHash ( ) ) ) ;
if ( avdist < maxDistance ) {
log . logInfo ( "Selected " + ( ( hc0 < peerCount ) ? "primary" : "reserve" ) + " DHT target peer " + seeds [ hc0 ] . getName ( ) + ":" + seeds [ hc0 ] . hash + ", distance = " + avdist ) ;
hc0 + + ;
plasmaWordIndexEntryContainer [ ] indexContainers = null ;
try {
String startPointHash = selectTransferStart ( ) ;
this . log . logFine ( "Selected hash " + startPointHash + " as start point for index distribution, distance = " + yacyDHTAction . dhtDistance ( yacyCore . seedDB . mySeed . hash , startPointHash ) ) ;
Object [ ] selectResult = selectTransferContainers ( startPointHash , indexCount ) ;
indexContainers = ( plasmaWordIndexEntryContainer [ ] ) selectResult [ 0 ] ;
HashMap urlCache = ( HashMap ) selectResult [ 1 ] ; // String (url-hash) / plasmaCrawlLURL.Entry
if ( ( indexContainers = = null ) | | ( indexContainers . length = = 0 ) ) {
this . log . logFine ( "No index available for index transfer, hash start-point " + startPointHash ) ;
return - 1 ;
}
// count the indexes again, can be smaller as expected
indexCount = 0 ;
for ( int i = 0 ; i < indexContainers . length ; i + + ) {
indexCount + = indexContainers [ i ] . size ( ) ;
}
if ( indexCount < 50 ) {
this . log . logFine ( "Too few (" + indexCount + ") indexes selected for transfer." ) ;
return - 1 ; // failed
}
// find start point for DHT-selection
String keyhash = indexContainers [ indexContainers . length - 1 ] . wordHash ( ) ; // DHT targets must have greater hashes
// find a list of DHT-peers
yacySeed [ ] seeds = new yacySeed [ peerCount + 10 ] ;
int hc0 = 0 ;
double ownDistance = Math . min ( yacyDHTAction . dhtDistance ( yacyCore . seedDB . mySeed . hash , indexContainers [ 0 ] . wordHash ( ) ) ,
yacyDHTAction . dhtDistance ( yacyCore . seedDB . mySeed . hash , indexContainers [ indexContainers . length - 1 ] . wordHash ( ) ) ) ;
double maxDistance = Math . min ( ownDistance , 0.4 ) ;
synchronized ( yacyCore . dhtAgent ) {
double avdist ;
Enumeration e = yacyCore . dhtAgent . getAcceptRemoteIndexSeeds ( keyhash ) ;
while ( ( e . hasMoreElements ( ) ) & & ( hc0 < seeds . length ) ) {
if ( this . isClosed ( ) ) {
this . log . logSevere ( "Index distribution interrupted by close, nothing deleted locally." ) ;
return - 1 ; // interrupted
}
seeds [ hc0 ] = ( yacySeed ) e . nextElement ( ) ;
if ( seeds [ hc0 ] ! = null ) {
avdist = Math . max ( yacyDHTAction . dhtDistance ( seeds [ hc0 ] . hash , indexContainers [ 0 ] . wordHash ( ) ) ,
yacyDHTAction . dhtDistance ( seeds [ hc0 ] . hash , indexContainers [ indexContainers . length - 1 ] . wordHash ( ) ) ) ;
if ( avdist < maxDistance ) {
this . log . logInfo ( "Selected " + ( ( hc0 < peerCount ) ? "primary" : "reserve" ) + " DHT target peer " + seeds [ hc0 ] . getName ( ) + ":" + seeds [ hc0 ] . hash + ", distance = " + avdist ) ;
hc0 + + ;
}
}
}
e = null ; // finish enumeration
}
e = null ; // finish enumeration
}
if ( hc0 < peerCount ) {
log . logWarning ( "found not enough (" + hc0 + ") peers for distribution" ) ;
closeTransferIndexes ( indexContainers ) ;
return - 1 ; // failed
}
// send away the indexes to all these indexes
String error ;
String peerNames = "" ;
long start ;
int hc1 = 0 ;
for ( int i = 0 ; i < hc0 ; i + + ) {
if ( closed ) {
log . logSevere ( "Index distribution interrupted by close, nothing deleted locally." ) ;
return - 1 ; // interrupted
if ( hc0 < peerCount ) {
this . log . logWarning ( "found not enough (" + hc0 + ") peers for distribution" ) ;
return - 1 ; // failed
}
start = System . currentTimeMillis ( ) ;
error = yacyClient . transferIndex (
seeds [ i ] ,
indexContainers ,
urlCache ,
this . gzipBody4Distribution ,
this . timeout4Distribution ) ;
if ( error = = null ) {
log . logInfo ( "Index transfer of " + indexCount + " words [" + indexContainers [ 0 ] . wordHash ( ) + " .. " + indexContainers [ indexContainers . length - 1 ] . wordHash ( ) + "] to peer " + seeds [ i ] . getName ( ) + ":" + seeds [ i ] . hash + " in " + ( ( System . currentTimeMillis ( ) - start ) / 1000 )
+ " seconds successfull (" + ( 1000 * indexCount / ( System . currentTimeMillis ( ) - start + 1 ) ) + " words/s)" ) ;
peerNames + = ", " + seeds [ i ] . getName ( ) ;
hc1 + + ;
} else {
log . logWarning ( "Index transfer to peer " + seeds [ i ] . getName ( ) + ":" + seeds [ i ] . hash + " failed:'" + error + "', disconnecting peer" ) ;
yacyCore . peerActions . peerDeparture ( seeds [ i ] ) ;
// send away the indexes to all these indexes
String error ;
String peerNames = "" ;
long start ;
int hc1 = 0 ;
for ( int i = 0 ; i < hc0 ; i + + ) {
if ( this . isClosed ( ) ) {
this . log . logSevere ( "Index distribution interrupted by close, nothing deleted locally." ) ;
return - 1 ; // interrupted
}
start = System . currentTimeMillis ( ) ;
error = yacyClient . transferIndex (
seeds [ i ] ,
indexContainers ,
urlCache ,
this . gzipBody4Distribution ,
this . timeout4Distribution ) ;
if ( error = = null ) {
this . log . logInfo ( "Index transfer of " + indexCount + " words [" + indexContainers [ 0 ] . wordHash ( ) + " .. " + indexContainers [ indexContainers . length - 1 ] . wordHash ( ) + "] to peer " + seeds [ i ] . getName ( ) + ":" + seeds [ i ] . hash + " in " + ( ( System . currentTimeMillis ( ) - start ) / 1000 )
+ " seconds successfull (" + ( 1000 * indexCount / ( System . currentTimeMillis ( ) - start + 1 ) ) + " words/s)" ) ;
peerNames + = ", " + seeds [ i ] . getName ( ) ;
hc1 + + ;
} else {
this . log . logWarning ( "Index transfer to peer " + seeds [ i ] . getName ( ) + ":" + seeds [ i ] . hash + " failed:'" + error + "', disconnecting peer" ) ;
yacyCore . peerActions . peerDeparture ( seeds [ i ] ) ;
}
if ( hc1 > = peerCount ) break ;
}
if ( hc1 > = peerCount ) break ;
}
if ( peerNames . length ( ) > 0 ) peerNames = peerNames . substring ( 2 ) ; // remove comma
// clean up and finish with deletion of indexes
if ( hc1 > = peerCount ) {
// success
if ( delete ) {
int deletedURLs = deleteTransferIndexes ( indexContainers ) ;
log . logFine ( "Deleted from " + indexContainers . length + " transferred RWIs locally, removed " + deletedURLs + " URL references" ) ;
if ( peerNames . length ( ) > 0 ) peerNames = peerNames . substring ( 2 ) ; // remove comma
// clean up and finish with deletion of indexes
if ( hc1 > = peerCount ) {
// success
if ( delete ) {
int deletedURLs = deleteTransferIndexes ( indexContainers ) ;
this . log . logFine ( "Deleted from " + indexContainers . length + " transferred RWIs locally, removed " + deletedURLs + " URL references" ) ;
indexContainers = null ;
return indexCount ;
}
return indexCount ;
} else {
// simply close the indexEntities
closeTransferIndexes ( indexContainers ) ;
}
return indexCount ;
} else {
log . logSevere ( "Index distribution failed. Too few peers (" + hc1 + ") received the index, not deleted locally." ) ;
// simply close the indexEntities
closeTransferIndexes ( indexContainers ) ;
this . log . logSevere ( "Index distribution failed. Too few peers (" + hc1 + ") received the index, not deleted locally." ) ;
return - 1 ;
} finally {
if ( indexContainers ! = null ) {
// simply close the indexEntities
closeTransferIndexes ( indexContainers ) ;
}
}
}