@@ -309,7 +309,7 @@ public final class plasmaWordIndexDistribution {
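        // select candidate word hashes for transfer; the loop below stops once the
        // chunk is full, the open-file limit is reached, or no more hashes are left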
        final HashMap knownURLs = new HashMap();
        while (
            (count > 0) &&
            (currOpenFiles < maxOpenFiles) &&
            (wordHashIterator.hasNext()) &&
            ((nexthash = (String) wordHashIterator.next()) != null) &&
            (nexthash.trim().length() > 0)
@@ -740,7 +740,10 @@ public final class plasmaWordIndexDistribution {
    }

    public void performTransferWholeIndex() {
        plasmaWordIndexEntity[] newIndexEntities = null, oldIndexEntities = null;
        try {
            // pausing the regular index distribution
            // TODO: add sync to wait for a still running index distribution to finish
            plasmaWordIndexDistribution.this.paused = true;

            // the initial starting point of the index transfer is "------------"
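            // note: the hash "------------" is presumed to sort below every real word
            // hash, so the DHT selection restarts from the beginning of the index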
@@ -751,9 +754,9 @@
             * - detected a server shutdown or user interruption
             * - detected a failure
             */
            long selectionStart = System.currentTimeMillis(), selectionEnd = 0, selectionTime = 0, iteration = 0;
            Integer openedFiles = new Integer(0);

            while (!finished && !Thread.currentThread().isInterrupted()) {
                iteration++;
                int idxCount = 0;
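                // idxCount accumulates the number of index entries actually contained in this chunk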
@@ -762,51 +765,71 @@
                // selecting a chunk of words (chunkSize, initially 500) to transfer
                this.status = "Running: Selecting chunk " + iteration;
                Object[] selectResult = selectTransferIndexes(this.startPointHash, this.chunkSize, this.maxOpenFiles - openedFiles.intValue());
                newIndexEntities = (plasmaWordIndexEntity[]) selectResult[0];
                HashMap urlCache = (HashMap) selectResult[1]; // String (url-hash) / plasmaCrawlLURL.Entry
                openedFiles = (Integer) selectResult[2];
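                // selectResult contract: [0] = selected index entities, [1] = URL cache, [2] = number of files left open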
                /* If we haven't selected a word chunk this could be because of
                 * a) no words are left in the index
                 * b) the max open file limit was exceeded
                 */
                if ((newIndexEntities == null) || (newIndexEntities.length == 0)) {
                    if (sb.wordIndex.size() > 0) {
                        // if there are still words in the index we try it again now
                        startPointHash = "------------";
                        continue;
                    }
                    // otherwise we can end the transfer now
                    plasmaWordIndexDistribution.this.log.logFine("No index available for index transfer, hash start-point " + startPointHash);
                    this.status = "Finished. " + iteration + " chunks transfered.";
                    return;
                }
                // count the indexes again, they can be fewer than expected
                for (int i = 0; i < newIndexEntities.length; i++) idxCount += newIndexEntities[i].size();

                // getting the start point for the next DHT selection
                oldStartingPointHash = startPointHash;
                startPointHash = newIndexEntities[newIndexEntities.length - 1].wordHash(); // DHT targets must have greater hashes

                selectionEnd = System.currentTimeMillis();
                selectionTime = selectionEnd - selectionStart;
                plasmaWordIndexDistribution.this.log.logInfo("Index selection of " + idxCount + " words [" + newIndexEntities[0].wordHash() + " .. " + newIndexEntities[newIndexEntities.length - 1].wordHash() + "]" +
                        " in " + (selectionTime / 1000) + " seconds (" +
                        (1000 * idxCount / (selectionTime + 1)) + " words/s)"); // +1 guards against division by zero for sub-millisecond selections
                // query the status of the old worker thread
                if (worker != null) {
                    this.status = "Finished: Selecting chunk " + iteration;
                    worker.join();
                    if (!worker.success) {
                        // if the transfer failed we abort the index transfer now
                        this.status = "Aborted because of Transfer error:\n" + worker.getStatus();

                        // cleanup: closing all open files
                        closeEntities(oldIndexEntities);
                        oldIndexEntities = null;
                        closeEntities(newIndexEntities);
                        newIndexEntities = null;

                        // abort the index transfer
                        return;
                    } else {
                        /*
                         * If the index transfer finished successfully we close all remaining open
                         * files that belong to the old index chunk and hand a new chunk
                         * over to the transfer thread.
                         * Additionally we recalculate the chunk size to optimize performance.
                         */
                        this.chunkSize = worker.getChunkSize();
                        long transferTime = worker.getTransferTime();
                        // TODO: only increase the chunk size if there is free memory left on the server
                        // we need approx. 73 bytes per IndexEntity and an average URL length of 32 chars
                        //if (ft.freeMemory() < 73*2*100)
                        if (transferTime > 60 * 1000) {
                            if (chunkSize > 200) chunkSize -= 100;
                        } else if (selectionTime < transferTime) {
@@ -832,23 +855,23 @@
                                plasmaWordIndexDistribution.this.log.logSevere("Deletion of indexes not possible:" + ee.getMessage(), ee);
                            }
                        } else {
                            // simply close the indexEntities
                            this.closeEntities(oldIndexEntities);
                            transferedIndexCount += idxCount;
                        }
                        oldIndexEntities = null;
                    }
                    this.worker = null;
                }

                // hand the chunk over to a new transfer worker
                if (!((newIndexEntities == null) || (newIndexEntities.length == 0))) {
                    worker = new transferIndexWorkerThread(seed, newIndexEntities, urlCache, gzipBody, timeout, iteration, idxCount, idxCount, startPointHash, oldStartingPointHash);
                    worker.start();
                }
            }
            // if we reach this point we were aborted by the user or by a server shutdown
            if (sb.wordIndex.size() > 0) this.status = "aborted";
        } catch (Exception e) {
            this.status = "Error: " + e.getMessage();
            plasmaWordIndexDistribution.this.log.logWarning("Index transfer to peer " + seed.getName() + ":" + seed.hash + " failed:'" + e.getMessage() + "'", e);
@@ -859,10 +882,21 @@
                try { worker.join(); } catch (Exception e) {}
                // worker = null;
            }
            if (oldIndexEntities != null) closeEntities(oldIndexEntities);
            if (newIndexEntities != null) closeEntities(newIndexEntities);
            plasmaWordIndexDistribution.this.paused = false;
        }
    }
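    // helper: quietly closes an array of index entity files; used for cleanup on
    // the failure, success, and shutdown paths above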
    private void closeEntities(plasmaWordIndexEntity[] indexEntities) {
        if ((indexEntities == null) || (indexEntities.length == 0)) return;
        for (int i = 0; i < indexEntities.length; i++) try {
            indexEntities[i].close();
        } catch (IOException ee) {}
    }
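    // helper: central abort check; also makes the abort visible in the status string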
    private boolean isAborted() {
        if (finished || Thread.currentThread().isInterrupted()) {
            this.status = "aborted";