diff --git a/source/de/anomic/plasma/plasmaWordIndexDistribution.java b/source/de/anomic/plasma/plasmaWordIndexDistribution.java index bebd51182..e513facea 100644 --- a/source/de/anomic/plasma/plasmaWordIndexDistribution.java +++ b/source/de/anomic/plasma/plasmaWordIndexDistribution.java @@ -473,6 +473,7 @@ public class plasmaWordIndexDistribution { this.seed = seed; this.delete = delete; this.wordsDBSize = plasmaSwitchboard.getSwitchboard().wordIndex.size(); + this.setName("TransferIndex_" + seed.getName()); } public void run() { @@ -513,18 +514,22 @@ public class plasmaWordIndexDistribution { try { plasmaWordIndexDistribution.this.paused = true; - // collect index + // initial startingpoint of intex transfer is "------------" plasmaWordIndexDistribution.this.log.logFine("Selected hash " + startPointHash + " as start point for index distribution of whole index"); - long start; + /* Loop until we have + * - finished transfer of whole index + * - detected a server shutdown or user interruption + * - detected a failure + */ + long start, retryCount = 0; while (!finished && !Thread.currentThread().isInterrupted()) { + int idxCount = 0; start = System.currentTimeMillis(); + + // selecting 500 words to transfer Object[] selectResult = selectTransferIndexes(startPointHash, 500); - plasmaWordIndexEntity[] indexEntities = (plasmaWordIndexEntity[]) selectResult[0]; - if (finished || Thread.currentThread().isInterrupted()) { - this.status = "aborted"; - return; - } + plasmaWordIndexEntity[] indexEntities = (plasmaWordIndexEntity[]) selectResult[0]; HashMap urlCache = (HashMap) selectResult[1]; // String (url-hash) / plasmaCrawlLURL.Entry if ((indexEntities == null) || (indexEntities.length == 0)) { @@ -532,13 +537,10 @@ public class plasmaWordIndexDistribution { this.status = "finished."; return; } - // count the indexes again, can be smaller as expected - int idxCount = 0; - for (int i = 0; i < indexEntities.length; i++) { - idxCount += indexEntities[i].size(); - } + // count the indexes again, can be smaller as expected + for (int i = 0; i < indexEntities.length; i++) idxCount += indexEntities[i].size(); - // find start point for DHT-selection + // getting start point for next DHT-selection oldStartingPointHash = startPointHash; startPointHash = indexEntities[indexEntities.length - 1].wordHash(); // DHT targets must have greater hashes @@ -547,20 +549,70 @@ public class plasmaWordIndexDistribution { ((System.currentTimeMillis() - start) / 1000) + " seconds (" + (1000 * idxCount / (System.currentTimeMillis() - start + 1)) + " words/s)"); + /* loop until we + * - have successfully transfered the words list or + * - the retry counter limit was exceeded + */ start = System.currentTimeMillis(); - String error = yacyClient.transferIndex(seed, indexEntities, urlCache); - if (error == null) { - plasmaWordIndexDistribution.this.log.logInfo("Index transfer of " + idxCount + " words [" + indexEntities[0].wordHash() + " .. " + indexEntities[indexEntities.length-1].wordHash() + "]" + - " to peer " + seed.getName() + ":" + seed.hash + " in " + - ((System.currentTimeMillis() - start) / 1000) + " seconds successfull (" + - (1000 * idxCount / (System.currentTimeMillis() - start + 1)) + " words/s)"); - } else { - plasmaWordIndexDistribution.this.log.logWarning("Index transfer to peer " + seed.getName() + ":" + seed.hash + " failed:'" + error + "', disconnecting peer"); - yacyCore.peerActions.peerDeparture(seed); - this.status = "Disconnected peer: " + error; - return; - } + while (true) { + // testing if we wer aborted + if (isAborted()) return; + + // transfering seleted words to remote peer + String error = yacyClient.transferIndex(seed, indexEntities, urlCache); + if (error == null) { + // words successfully transfered + plasmaWordIndexDistribution.this.log.logInfo("Index transfer of " + idxCount + " words [" + indexEntities[0].wordHash() + " .. " + indexEntities[indexEntities.length-1].wordHash() + "]" + + " to peer " + seed.getName() + ":" + seed.hash + " in " + + ((System.currentTimeMillis() - start) / 1000) + " seconds successfull (" + + (1000 * idxCount / (System.currentTimeMillis() - start + 1)) + " words/s)"); + retryCount = 0; + break; + } else { + // worts transfer failed + + // inc retry counter + retryCount++; + + // testing if we were aborted ... + if (isAborted()) return; + + // we have lost the connection to the remote peer. Adding peer to disconnected list + plasmaWordIndexDistribution.this.log.logWarning("Index transfer to peer " + seed.getName() + ":" + seed.hash + " failed:'" + error + "', disconnecting peer"); + yacyCore.peerActions.peerDeparture(seed); + + // if the retry counter limit was not exceeded we'll retry it in a few seconds + this.status = "Disconnected peer: " + ((retryCount > 5)? error + ". Transfer aborted":"Retry " + retryCount); + if (retryCount > 5) return; + Thread.sleep(retryCount*5000); + + /* loop until + * - we have successfully done a peer ping or + * - the retry counter limit was exceeded + */ + while (true) { + // testing if we were aborted ... + if (isAborted()) return; + + // doing a peer ping to the remote seed + int added = yacyClient.publishMySeed(seed.getAddress(), seed.hash); + if (added < 0) { + // inc. retry counter + retryCount++; + this.status = "Disconnected peer: Peer ping failed. " + ((retryCount > 5)?"Transfer aborted.":"Retry " + retryCount); + if (retryCount > 5) return; + Thread.sleep(retryCount*5000); + continue; + } else { + seed = yacyCore.seedDB.getConnected(seed.hash); + this.status = "running"; + break; + } + } + } + } + // deleting transfered words from index if (delete) { try { if (deleteTransferIndexes(indexEntities)) { @@ -580,6 +632,8 @@ public class plasmaWordIndexDistribution { transferedIndexCount += idxCount; } } + + // if we reach this point we were aborted by the user or by server shutdown this.status = "aborted"; } catch (Exception e) { this.status = "Error: " + e.getMessage(); @@ -587,6 +641,14 @@ public class plasmaWordIndexDistribution { } finally { plasmaWordIndexDistribution.this.paused = false; } - } - } + } + + private boolean isAborted() { + if (finished || Thread.currentThread().isInterrupted()) { + this.status = "aborted"; + return true; + } + return false; + } + } }