//plasmaWordIndexDistribution.java
//-------------------------------------------
//(C) by Michael Peter Christen; mc@anomic.de
//first published on http://www.anomic.de
//Frankfurt, Germany, 2005
//
//last major change: $LastChangedDate$ by $LastChangedBy$
//Revision: $LastChangedRevision$
//
//This program is free software; you can redistribute it and/or modify
//it under the terms of the GNU General Public License as published by
//the Free Software Foundation; either version 2 of the License, or
//(at your option) any later version.
//
//This program is distributed in the hope that it will be useful,
//but WITHOUT ANY WARRANTY; without even the implied warranty of
//MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
//GNU General Public License for more details.
//
//You should have received a copy of the GNU General Public License
//along with this program; if not, write to the Free Software
//Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
//
//Using this software in any meaning (reading, learning, copying, compiling,
//running) means that you agree that the Author(s) is (are) not responsible
//for cost, loss of data or any harm that may be caused directly or indirectly
//by usage of this software or this documentation. The usage of this software
//is at your own risk. The installation and usage (starting/running) of this
//software may allow other people or applications to access your computer and
//any attached devices and is highly dependent on the configuration of the
//software, which must be done by the user of the software; the author(s) is
//(are) also not responsible for proper configuration and usage of the
//software, even if provoked by documentation provided together with
//the software.
//
//Any changes to this file according to the GPL as documented in the file
//gpl.txt aside this file in the shipment you received can be done to the
//lines that follow this copyright notice here, but changes must not be
//done inside the copyright notice above. A re-distribution must contain
//the intact and unchanged copyright notice.
//Contributions and changes to the program code must be marked as such.

package de.anomic.plasma;

import java.io.IOException;
import java.util.Enumeration;
import java.util.Vector;
import java.util.Iterator;
import java.util.HashSet;
import java.util.HashMap;

import de.anomic.yacy.yacyCore;
import de.anomic.yacy.yacySeed;
import de.anomic.yacy.yacySeedDB;
import de.anomic.yacy.yacyClient;
import de.anomic.yacy.yacyDHTAction;
import de.anomic.server.serverCodings;
import de.anomic.server.logging.serverLog;
import de.anomic.kelondro.kelondroException;

public class plasmaWordIndexDistribution {

    // distributes parts of the index to other peers
    // stops as soon as an error occurs

    private int indexCount;
    private int juniorPeerCount, seniorPeerCount;
    private long maxTime;
    private plasmaURLPool urlPool;
    private plasmaWordIndex wordIndex;
    serverLog log;
    boolean paused = false;
    private boolean enabled;
    private boolean enabledWhileCrawling;
    private boolean closed;

    public transferIndexThread transferIdxThread = null;
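
    /**
     * Creates the distributor with default counts (batch size 100, 1 junior
     * peer, 3 senior peers, 8 seconds maximum time per transfer).
     *
     * @param enable whether index distribution is switched on at all
     * @param enabledWhileCrawling whether distribution may also run while a local crawl is in progress
     */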
    public plasmaWordIndexDistribution(plasmaURLPool urlPool, plasmaWordIndex wordIndex, serverLog log,
                                       boolean enable, boolean enabledWhileCrawling) {
        this.urlPool = urlPool;
        this.wordIndex = wordIndex;
        this.enabled = enable;
        this.enabledWhileCrawling = enabledWhileCrawling;
        this.log = log;
        this.closed = false;
        setCounts(100 /*indexCount*/, 1 /*juniorPeerCount*/, 3 /*seniorPeerCount*/, 8000 /*maxTimePerTransfer*/);
    }

    public void enable() {
        enabled = true;
    }

    public void disable() {
        enabled = false;
    }

    public void enableWhileCrawling() {
        this.enabledWhileCrawling = true;
    }

    public void disableWhileCrawling() {
        this.enabledWhileCrawling = false;
    }

    public void close() {
        closed = true;
        if (transferIdxThread != null) {
            stopTransferWholeIndex(false);
        }
    }
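
    /**
     * Performs one distribution cycle: checks all preconditions (peer status,
     * configuration, minimum index and URL database sizes, crawl state), then
     * transfers a batch of indexCount word indexes to juniorPeerCount or
     * seniorPeerCount target peers and adapts the batch size to the measured
     * transfer time.
     *
     * @return true if a batch was transferred successfully, false otherwise
     */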
    public boolean job() {
        if (this.closed) {
            log.logFine("no word distribution: closed");
            return false;
        }
        if (yacyCore.seedDB == null) {
            log.logFine("no word distribution: seedDB == null");
            return false;
        }
        if (yacyCore.seedDB.mySeed == null) {
            log.logFine("no word distribution: mySeed == null");
            return false;
        }
        if (yacyCore.seedDB.mySeed.isVirgin()) {
            log.logFine("no word distribution: status is virgin");
            return false;
        }
        if (!enabled) {
            log.logFine("no word distribution: not enabled");
            return false;
        }
        if (paused) {
            log.logFine("no word distribution: paused");
            return false;
        }
        if (urlPool.loadedURL.size() < 10) {
            log.logFine("no word distribution: loadedURL.size() = " + urlPool.loadedURL.size());
            return false;
        }
        if (wordIndex.size() < 100) {
            log.logFine("no word distribution: not enough words - wordIndex.size() = " + wordIndex.size());
            return false;
        }
        if ((!enabledWhileCrawling) && (urlPool.noticeURL.stackSize() > 0)) {
            log.logFine("no word distribution: crawl in progress - noticeURL.stackSize() = " + urlPool.noticeURL.stackSize());
            return false;
        }

        // do the transfer
        int peerCount = (yacyCore.seedDB.mySeed.isJunior()) ? juniorPeerCount : seniorPeerCount;
        long starttime = System.currentTimeMillis();
        int transferred = performTransferIndex(indexCount, peerCount, true);

        if (transferred <= 0) {
            log.logFine("no word distribution: transfer failed");
            return false;
        }

        // adapt transfer count: shrink the batch if the transfer took longer
        // than maxTime per peer, otherwise grow it; never drop below 50
        if ((System.currentTimeMillis() - starttime) > (maxTime * peerCount))
            indexCount--;
        else
            indexCount++;
        if (indexCount < 50) indexCount = 50;

        // show success
        return true;
    }
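
    /**
     * Configures the distribution parameters: the batch size (lower bound 30),
     * the number of target peers in junior and senior mode, and the maximum
     * time a transfer to one peer may take before job() shrinks the batch.
     */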
    public void setCounts(int indexCount, int juniorPeerCount, int seniorPeerCount, long maxTimePerTransfer) {
        this.maxTime = maxTimePerTransfer;
        if (indexCount < 30) indexCount = 30; // enforce the minimum batch size before assigning
        this.indexCount = indexCount;
        this.juniorPeerCount = juniorPeerCount;
        this.seniorPeerCount = seniorPeerCount;
    }
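
    /**
     * Transfers indexCount word index entries to up to peerCount DHT peers.
     * The selection starts at a pseudo-random word hash; a target peer is only
     * used if its average DHT distance to the transferred hash range is below
     * 0.3. If enough peers received the chunk and delete is set, the
     * transferred indexes are removed locally.
     *
     * @return the number of transferred index entries, or -1 on failure
     */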
    public int performTransferIndex(int indexCount, int peerCount, boolean delete) {
        if ((yacyCore.seedDB == null) || (yacyCore.seedDB.sizeConnected() == 0)) return -1;

        // collect index
        String startPointHash = selectTransferStart();
        log.logFine("Selected hash " + startPointHash + " as start point for index distribution, distance = " + yacyDHTAction.dhtDistance(yacyCore.seedDB.mySeed.hash, startPointHash));
        Object[] selectResult = selectTransferIndexes(startPointHash, indexCount);
        plasmaWordIndexEntity[] indexEntities = (plasmaWordIndexEntity[]) selectResult[0];
        HashMap urlCache = (HashMap) selectResult[1]; // String (url-hash) / plasmaCrawlLURL.Entry
        if ((indexEntities == null) || (indexEntities.length == 0)) {
            log.logFine("No index available for index transfer, hash start-point " + startPointHash);
            return -1;
        }

        // count the indexes again; the result may be smaller than expected
        indexCount = 0;
        for (int i = 0; i < indexEntities.length; i++) {
            indexCount += indexEntities[i].size();
        }

        // find start point for DHT-selection
        String keyhash = indexEntities[indexEntities.length - 1].wordHash(); // DHT targets must have greater hashes

        // iterate over DHT-peers and send away the indexes
        yacySeed seed;
        int hc = 0;
        Enumeration e = yacyCore.dhtAgent.getAcceptRemoteIndexSeeds(keyhash);
        String error;
        String peerNames = "";
        double avdist;
        long start;
        while ((e.hasMoreElements()) && (hc < peerCount)) {
            if (closed) {
                log.logSevere("Index distribution interrupted by close, nothing deleted locally.");
                return -1; // interrupted
            }
            seed = (yacySeed) e.nextElement();
            if ((seed != null) &&
                ((avdist = (yacyDHTAction.dhtDistance(seed.hash, indexEntities[0].wordHash()) +
                            yacyDHTAction.dhtDistance(seed.hash, indexEntities[indexEntities.length - 1].wordHash())) / 2.0) < 0.3)) {
                start = System.currentTimeMillis();
                error = yacyClient.transferIndex(seed, indexEntities, urlCache);
                if (error == null) {
                    log.logInfo("Index transfer of " + indexCount + " words [" + indexEntities[0].wordHash() + " .. " + indexEntities[indexEntities.length - 1].wordHash() + "]/" +
                                avdist + " to peer " + seed.getName() + ":" + seed.hash + " in " +
                                ((System.currentTimeMillis() - start) / 1000) + " seconds successful (" +
                                (1000 * indexCount / (System.currentTimeMillis() - start + 1)) + " words/s)");
                    peerNames += ", " + seed.getName();
                    hc++;
                } else {
                    log.logWarning("Index transfer to peer " + seed.getName() + ":" + seed.hash + " failed: '" + error + "', disconnecting peer");
                    yacyCore.peerActions.peerDeparture(seed);
                }
            }
        }
        if (peerNames.length() > 0) peerNames = peerNames.substring(2); // remove leading comma

        // clean up and finish with deletion of indexes
        if (hc >= peerCount) {
            // success
            if (delete) {
                try {
                    if (deleteTransferIndexes(indexEntities)) {
                        log.logFine("Deleted all " + indexEntities.length + " transferred whole-word indexes locally");
                        return indexCount;
                    } else {
                        log.logSevere("Could not delete all transferred whole-word indexes");
                        return -1;
                    }
                } catch (IOException ee) {
                    log.logSevere("Deletion of indexes not possible: " + ee.getMessage(), ee);
                    return -1;
                }
            } else {
                // simply close the indexEntities
                for (int i = 0; i < indexEntities.length; i++) try {
                    indexEntities[i].close();
                } catch (IOException ee) {}
            }
            return indexCount;
        } else {
            log.logSevere("Index distribution failed. Too few peers (" + hc + ") received the index, not deleted locally.");
            return -1;
        }
    }
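
    /**
     * Selects a pseudo-random word hash as start point for the index selection.
     * A hash derived from the current time is accepted if its DHT distance to
     * the own peer hash exceeds i/10, with i counting down from 9 to 1, so the
     * acceptance threshold relaxes from 0.9 to 0.1. With a probability of one
     * in six, or if no candidate qualifies, the hash directly above the own
     * peer hash is used instead.
     */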
    private String selectTransferStart() {
        String startPointHash;
        // first try to select, with increasing probability of acceptance, a good start point
        if (Math.round(Math.random() * 6) != 4) for (int i = 9; i > 0; i--) {
            startPointHash = serverCodings.encodeMD5B64(Long.toString(i + System.currentTimeMillis()), true).substring(2, 2 + yacySeedDB.commonHashLength);
            if (yacyDHTAction.dhtDistance(yacyCore.seedDB.mySeed.hash, startPointHash) > ((double) i / (double) 10)) return startPointHash;
        }
        // if that fails, simply take the best start point (this is usually avoided, since it always leads to the same target peers)
        startPointHash = yacyCore.seedDB.mySeed.hash.substring(0, 11) + "z";
        return startPointHash;
    }
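
    /**
     * Collects up to count index entries, walking the word hashes in ascending
     * order from the given start hash. Small word indexes are taken as a
     * whole; a large index is split into a temporary on-the-fly entity holding
     * only as many entries as still fit into the chunk. URL hashes that cannot
     * be resolved in loadedURL are removed from the index; all resolvable URLs
     * are collected in the returned cache so the receiving peer gets them
     * together with the index entries.
     */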
    Object[] /* of {plasmaWordIndexEntity[], HashMap(String, plasmaCrawlLURL.Entry)} */
    selectTransferIndexes(String hash, int count) {
        // the hash is a start hash from where the indexes are picked
        Vector tmpEntities = new Vector();
        String nexthash = "";
        try {
            Iterator wordHashIterator = wordIndex.wordHashes(hash, true, true);
            plasmaWordIndexEntity indexEntity, tmpEntity;
            Enumeration urlEnum;
            Iterator hashIter;
            plasmaWordIndexEntry indexEntry;
            plasmaCrawlLURL.Entry lurl;
            HashSet unknownURLEntries;
            HashMap knownURLs = new HashMap();
            while ((count > 0) && (wordHashIterator.hasNext()) &&
                   ((nexthash = (String) wordHashIterator.next()) != null) && (nexthash.trim().length() > 0)) {
                indexEntity = wordIndex.getEntity(nexthash, true);
                if (indexEntity.size() == 0) {
                    indexEntity.deleteComplete();
                } else if ((indexEntity.size() <= count) || // if we haven't exceeded the limit
                           (Math.abs(indexEntity.size() - count) <= 10)) { // or there are only at most 10 entries left
                    // take the whole entity
                    try {
                        // first check if we know all urls
                        urlEnum = indexEntity.elements(true);
                        unknownURLEntries = new HashSet();
                        while (urlEnum.hasMoreElements()) {
                            indexEntry = (plasmaWordIndexEntry) urlEnum.nextElement();
                            lurl = urlPool.loadedURL.getEntry(indexEntry.getUrlHash());
                            if (lurl == null) {
                                unknownURLEntries.add(indexEntry.getUrlHash());
                            } else if (lurl.toString() == null) {
                                urlPool.loadedURL.remove(indexEntry.getUrlHash());
                                unknownURLEntries.add(indexEntry.getUrlHash());
                            } else {
                                knownURLs.put(indexEntry.getUrlHash(), lurl);
                            }
                        }
                        // now delete all entries that have no url entry
                        hashIter = unknownURLEntries.iterator();
                        while (hashIter.hasNext()) {
                            indexEntity.removeEntry((String) hashIter.next(), false);
                        }
                        // use what's remaining
                        tmpEntities.add(indexEntity);
                        log.logFine("Selected whole index (" + indexEntity.size() + " URLs, " + unknownURLEntries.size() + " not bound) for word " + indexEntity.wordHash());
                        count -= indexEntity.size();
                    } catch (kelondroException e) {
                        log.logSevere("plasmaWordIndexDistribution/1: deleted DB for word " + indexEntity.wordHash(), e);
                        try { indexEntity.deleteComplete(); } catch (IOException ee) {}
                    }
                } else {
                    // make an on-the-fly entity and insert values
                    tmpEntity = new plasmaWordIndexEntity(indexEntity.wordHash());
                    try {
                        urlEnum = indexEntity.elements(true);
                        unknownURLEntries = new HashSet();
                        while ((urlEnum.hasMoreElements()) && (count > 0)) {
                            indexEntry = (plasmaWordIndexEntry) urlEnum.nextElement();
                            lurl = urlPool.loadedURL.getEntry(indexEntry.getUrlHash());
                            if (lurl == null) {
                                unknownURLEntries.add(indexEntry.getUrlHash());
                            } else if (lurl.toString() == null) {
                                urlPool.loadedURL.remove(indexEntry.getUrlHash());
                                unknownURLEntries.add(indexEntry.getUrlHash());
                            } else {
                                knownURLs.put(indexEntry.getUrlHash(), lurl);
                                tmpEntity.addEntry(indexEntry);
                                count--;
                            }
                        }
                        // now delete all entries that have no url entry
                        hashIter = unknownURLEntries.iterator();
                        while (hashIter.hasNext()) {
                            indexEntity.removeEntry((String) hashIter.next(), true);
                        }
                        // use what's remaining
                        log.logFine("Selected partial index (" + tmpEntity.size() + " from " + indexEntity.size() + " URLs, " + unknownURLEntries.size() + " not bound) for word " + tmpEntity.wordHash());
                        tmpEntities.add(tmpEntity);
                    } catch (kelondroException e) {
                        log.logSevere("plasmaWordIndexDistribution/2: deleted DB for word " + indexEntity.wordHash(), e);
                        try { indexEntity.deleteComplete(); } catch (IOException ee) {}
                    }
                    indexEntity.close(); // important: is not closed elsewhere and cannot be deleted afterwards
                    indexEntity = null;
                }
            }

            // transfer to array
            plasmaWordIndexEntity[] indexEntities = new plasmaWordIndexEntity[tmpEntities.size()];
            for (int i = 0; i < tmpEntities.size(); i++) indexEntities[i] = (plasmaWordIndexEntity) tmpEntities.elementAt(i);
            return new Object[]{indexEntities, knownURLs};
        } catch (IOException e) {
            log.logSevere("selectTransferIndexes IO-Error (hash=" + nexthash + "): " + e.getMessage(), e);
            return new Object[]{new plasmaWordIndexEntity[0], new HashMap()};
        } catch (kelondroException e) {
            log.logSevere("selectTransferIndexes database corrupted: " + e.getMessage(), e);
            return new Object[]{new plasmaWordIndexEntity[0], new HashMap()};
        }
    }
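
    /**
     * Deletes the transferred indexes locally. Temporary (partial) entities
     * are removed entry by entry from the backing word index; whole-word
     * entities are deleted as complete files, with a second attempt that
     * removes the index file directly from the PLASMADB directory.
     *
     * @return true if all indexes could be deleted
     */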
    boolean deleteTransferIndexes(plasmaWordIndexEntity[] indexEntities) throws IOException {
        Enumeration urlEnum;
        plasmaWordIndexEntry indexEntry;
        plasmaWordIndexEntity indexEntity;
        String[] urlHashes;
        int sz;
        boolean success = true;
        for (int i = 0; i < indexEntities.length; i++) {
            if (indexEntities[i].isTMPEntity()) {
                // delete entries separately
                int c = 0;
                urlHashes = new String[indexEntities[i].size()];
                urlEnum = indexEntities[i].elements(true);
                while (urlEnum.hasMoreElements()) {
                    indexEntry = (plasmaWordIndexEntry) urlEnum.nextElement();
                    urlHashes[c++] = indexEntry.getUrlHash();
                }
                wordIndex.removeEntries(indexEntities[i].wordHash(), urlHashes, true);
                indexEntity = wordIndex.getEntity(indexEntities[i].wordHash(), true);
                sz = indexEntity.size();
                indexEntity.close();
                log.logFine("Deleted partial index (" + c + " URLs) for word " + indexEntities[i].wordHash() + "; " + sz + " entries left");
                // DEBUG: now try to delete the remaining index. If this works, this routine is fine
                /*
                if (wordIndex.getEntity(indexEntities[i].wordHash()).deleteComplete())
                    System.out.println("DEBUG: trial delete of partial word index " + indexEntities[i].wordHash() + " SUCCESSFUL");
                else
                    System.out.println("DEBUG: trial delete of partial word index " + indexEntities[i].wordHash() + " FAILED");
                */
                // end debug
                indexEntities[i].close();
            } else {
                // delete the complete file
                if (indexEntities[i].deleteComplete()) {
                    indexEntities[i].close();
                } else {
                    indexEntities[i].close();
                    // have another try...
                    if (!(plasmaWordIndexEntity.wordHash2path(wordIndex.getRoot() /*PLASMADB*/, indexEntities[i].wordHash()).delete())) {
                        success = false;
                        log.logSevere("Could not delete whole index for word " + indexEntities[i].wordHash());
                    }
                }
            }
            indexEntities[i] = null;
        }
        return success;
    }
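
    /**
     * Starts a background thread that transfers the complete local index to
     * the given peer; only one such thread may run at a time.
     */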
    public void startTransferWholeIndex(yacySeed seed, boolean delete) {
        if (transferIdxThread == null) {
            this.transferIdxThread = new transferIndexThread(seed, delete);
            this.transferIdxThread.start();
        }
    }

    public void stopTransferWholeIndex(boolean wait) {
        if ((transferIdxThread != null) && (transferIdxThread.isAlive()) && (!transferIdxThread.isFinished())) {
            try {
                this.transferIdxThread.stopIt(wait);
            } catch (InterruptedException e) {}
        }
    }

    public void abortTransferWholeIndex(boolean wait) {
        if (transferIdxThread != null) {
            if (!transferIdxThread.isFinished())
                try {
                    this.transferIdxThread.stopIt(wait);
                } catch (InterruptedException e) {}
            transferIdxThread = null;
        }
    }
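
    /**
     * Background thread that transfers the whole local word index in chunks
     * to a single target peer. The chunk size adapts to the measured transfer
     * time, failed transfers are retried up to five times with a peer ping in
     * between, and successfully transferred chunks are optionally deleted
     * from the local index.
     */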
    public class transferIndexThread extends Thread {
        private yacySeed seed = null;
        private boolean delete = false;
        private boolean finished = false;
        private int transferedIndexCount = 0;
        private String status = "Running";
        private String oldStartingPointHash = "------------", startPointHash = "------------";
        private int wordsDBSize = 0;
        private int chunkSize = 500;

        public transferIndexThread(yacySeed seed, boolean delete) {
            super(new ThreadGroup("TransferIndexThreadGroup"), "TransferIndex_" + seed.getName());
            this.seed = seed;
            this.delete = delete;
            this.wordsDBSize = plasmaSwitchboard.getSwitchboard().wordIndex.size();
        }

        public void run() {
            performTransferWholeIndex();
        }

        public void stopIt(boolean wait) throws InterruptedException {
            this.finished = true;
            if (wait) this.join();
        }

        public boolean isFinished() {
            return this.finished;
        }

        public boolean deleteIndex() {
            return this.delete;
        }

        public int getChunkSize() {
            return this.chunkSize;
        }

        public int getTransferedIndexCount() {
            return this.transferedIndexCount;
        }

        public float getTransferedIndexPercent() {
            if (wordsDBSize == 0) return 100;
            return (this.transferedIndexCount * 100f) / wordsDBSize;
        }

        public yacySeed getSeed() {
            return this.seed;
        }

        public String getStatus() {
            return this.status;
        }

        public String getRange() {
            return "[" + oldStartingPointHash + " .. " + startPointHash + "]";
        }
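
        /**
         * Main transfer loop: selects a chunk of word indexes beginning at
         * startPointHash, sends it to the target peer, adapts the chunk size
         * to the transfer time, advances the start point, and repeats until
         * the word hash iterator is exhausted or the thread is stopped.
         */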
        public void performTransferWholeIndex() {
            try {
                plasmaWordIndexDistribution.this.paused = true;

                // initial starting point of the index transfer is "------------"
                plasmaWordIndexDistribution.this.log.logFine("Selected hash " + startPointHash + " as start point for index distribution of whole index");

                /* Loop until we have
                 * - finished the transfer of the whole index
                 * - detected a server shutdown or user interruption
                 * - detected a failure
                 */
                long start, retryCount = 0, iteration = 0;
                while (!finished && !Thread.currentThread().isInterrupted()) {
                    iteration++;
                    int idxCount = 0;
                    start = System.currentTimeMillis();

                    // selecting chunkSize words to transfer
                    this.status = "Running: Selection of chunk " + iteration;
                    Object[] selectResult = selectTransferIndexes(startPointHash, chunkSize);
                    plasmaWordIndexEntity[] indexEntities = (plasmaWordIndexEntity[]) selectResult[0];
                    HashMap urlCache = (HashMap) selectResult[1]; // String (url-hash) / plasmaCrawlLURL.Entry
                    if ((indexEntities == null) || (indexEntities.length == 0)) {
                        plasmaWordIndexDistribution.this.log.logFine("No index available for index transfer, hash start-point " + startPointHash);
                        this.status = "Finished. " + iteration + " chunks transferred.";
                        return;
                    }

                    // count the indexes again; the result may be smaller than expected
                    for (int i = 0; i < indexEntities.length; i++) idxCount += indexEntities[i].size();

                    // getting the start point for the next DHT-selection
                    oldStartingPointHash = startPointHash;
                    startPointHash = indexEntities[indexEntities.length - 1].wordHash(); // DHT targets must have greater hashes

                    plasmaWordIndexDistribution.this.log.logInfo("Index selection of " + idxCount + " words [" + indexEntities[0].wordHash() + " .. " + indexEntities[indexEntities.length - 1].wordHash() + "]" +
                                " in " +
                                ((System.currentTimeMillis() - start) / 1000) + " seconds (" +
                                (1000 * idxCount / (System.currentTimeMillis() - start + 1)) + " words/s)");

                    /* Loop until we
                     * - have successfully transferred the word list or
                     * - the retry counter limit was exceeded
                     */
                    start = System.currentTimeMillis();
                    while (true) {
                        // testing if we were aborted
                        if (isAborted()) return;

                        // transferring the selected words to the remote peer
                        this.status = "Running: Transfer of chunk " + iteration;
                        String error = yacyClient.transferIndex(seed, indexEntities, urlCache);
                        if (error == null) {
                            // words successfully transferred
                            long transferTime = System.currentTimeMillis() - start;
                            plasmaWordIndexDistribution.this.log.logInfo("Index transfer of " + idxCount + " words [" + indexEntities[0].wordHash() + " .. " + indexEntities[indexEntities.length - 1].wordHash() + "]" +
                                        " to peer " + seed.getName() + ":" + seed.hash + " in " + (transferTime / 1000) + " seconds successful (" +
                                        (1000 * idxCount / (transferTime + 1)) + " words/s)");
                            retryCount = 0;

                            // adapt the chunk size to the transfer time:
                            // shrink if a chunk took longer than 30 seconds, grow otherwise
                            if (transferTime > 30000) {
                                if (chunkSize > 100) chunkSize -= 50;
                            } else {
                                chunkSize += 50;
                            }
                            break;
                        } else {
                            // word transfer failed, increase the retry counter
                            retryCount++;

                            // testing if we were aborted ...
                            if (isAborted()) return;

                            // we have lost the connection to the remote peer, add the peer to the disconnected list
                            plasmaWordIndexDistribution.this.log.logWarning("Index transfer to peer " + seed.getName() + ":" + seed.hash + " failed: '" + error + "', disconnecting peer");
                            yacyCore.peerActions.peerDeparture(seed);

                            // if the retry counter limit is not exceeded we will retry in a few seconds
                            this.status = "Disconnected peer: " + ((retryCount > 5) ? error + ". Transfer aborted" : "Retry " + retryCount);
                            if (retryCount > 5) return;
                            Thread.sleep(retryCount * 5000);

                            /* Loop until
                             * - we have successfully done a peer ping or
                             * - the retry counter limit was exceeded
                             */
                            while (true) {
                                // testing if we were aborted ...
                                if (isAborted()) return;

                                // doing a peer ping to the remote seed
                                int added = yacyClient.publishMySeed(seed.getAddress(), seed.hash);
                                if (added < 0) {
                                    // increase the retry counter
                                    retryCount++;
                                    this.status = "Disconnected peer: Peer ping failed. " + ((retryCount > 5) ? "Transfer aborted." : "Retry " + retryCount);
                                    if (retryCount > 5) return;
                                    Thread.sleep(retryCount * 5000);
                                    continue;
                                } else {
                                    yacyCore.seedDB.getConnected(seed.hash);
                                    this.status = "running";
                                    break;
                                }
                            }
                        }
                    }

                    // deleting the transferred words from the index
                    if (delete) {
                        this.status = "Running: Deletion of chunk " + iteration;
                        try {
                            if (deleteTransferIndexes(indexEntities)) {
                                plasmaWordIndexDistribution.this.log.logFine("Deleted all " + indexEntities.length + " transferred whole-word indexes locally");
                                transferedIndexCount += idxCount;
                            } else {
                                plasmaWordIndexDistribution.this.log.logSevere("Could not delete all transferred whole-word indexes");
                            }
                        } catch (IOException ee) {
                            plasmaWordIndexDistribution.this.log.logSevere("Deletion of indexes not possible: " + ee.getMessage(), ee);
                        }
                    } else {
                        // simply close the indexEntities
                        for (int i = 0; i < indexEntities.length; i++) try {
                            indexEntities[i].close();
                        } catch (IOException ee) {}
                        transferedIndexCount += idxCount;
                    }
                }

                // if we reach this point we were aborted by the user or by a server shutdown
                this.status = "aborted";
            } catch (Exception e) {
                this.status = "Error: " + e.getMessage();
                plasmaWordIndexDistribution.this.log.logWarning("Index transfer to peer " + seed.getName() + ":" + seed.hash + " failed: '" + e.getMessage() + "'", e);
            } finally {
                plasmaWordIndexDistribution.this.paused = false;
            }
        }

        private boolean isAborted() {
            if (finished || Thread.currentThread().isInterrupted()) {
                this.status = "aborted";
                return true;
            }
            return false;
        }
    }
}