From d3e33fd6c141bd3d0211647cdf1f477f0b2f0aa7 Mon Sep 17 00:00:00 2001 From: orbiter Date: Sun, 1 Feb 2009 23:02:40 +0000 Subject: [PATCH] removed strange retry logic from DHT transfer git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@5564 6c8d7289-2bf4-0310-a012-ef5d649a1542 --- source/de/anomic/plasma/plasmaDHTFlush.java | 5 +- .../de/anomic/plasma/plasmaDHTTransfer.java | 173 ++++-------------- .../de/anomic/plasma/plasmaSwitchboard.java | 5 +- 3 files changed, 42 insertions(+), 141 deletions(-) diff --git a/source/de/anomic/plasma/plasmaDHTFlush.java b/source/de/anomic/plasma/plasmaDHTFlush.java index c61bc6d6d..0be225210 100644 --- a/source/de/anomic/plasma/plasmaDHTFlush.java +++ b/source/de/anomic/plasma/plasmaDHTFlush.java @@ -216,10 +216,9 @@ public class plasmaDHTFlush extends Thread { oldDHTChunk = null; } - // handover chunk to transfer worker + // hand-over chunk to transfer worker if ((newDHTChunk.containerSize() > 0) || (newDHTChunk.getStatus() == plasmaDHTChunk.chunkStatus_FILLED)) { - this.worker = new plasmaDHTTransfer(this.log, this.wordIndex.seedDB, this.wordIndex.peerActions, this.seed, newDHTChunk, this.gzipBody4Transfer, this.timeout4Transfer, 5); - this.worker.setTransferMode(plasmaDHTTransfer.TRANSFER_MODE_FLUSH); + this.worker = new plasmaDHTTransfer(this.log, this.wordIndex.seedDB, this.wordIndex.peerActions, this.seed, newDHTChunk, this.gzipBody4Transfer, this.timeout4Transfer); this.worker.start(); } } diff --git a/source/de/anomic/plasma/plasmaDHTTransfer.java b/source/de/anomic/plasma/plasmaDHTTransfer.java index 1ac2ad365..69ff0dab9 100644 --- a/source/de/anomic/plasma/plasmaDHTTransfer.java +++ b/source/de/anomic/plasma/plasmaDHTTransfer.java @@ -56,8 +56,6 @@ public class plasmaDHTTransfer extends Thread { // other fields private final yacySeedDB seedDB; private final yacyPeerActions peerActions; - private final int maxRetry; - private int transferMode = TRANSFER_MODE_DISTRIBUTION; Log log; public plasmaDHTTransfer( @@ -67,8 +65,7 @@ public class plasmaDHTTransfer extends Thread { final yacySeed destSeed, final plasmaDHTChunk dhtChunk, final boolean gzipBody, - final int timeout, - final int retries + final int timeout ) { super(new ThreadGroup("TransferIndexThreadGroup"), "TransferIndexWorker_" + destSeed.getName()); this.log = log; @@ -77,13 +74,8 @@ public class plasmaDHTTransfer extends Thread { this.gzipBody4Transfer = gzipBody; this.timeout4Transfer = timeout; this.dhtChunk = dhtChunk; - this.maxRetry = retries; this.seed = destSeed; } - - public void setTransferMode(final int mode) { - this.transferMode = mode; - } public void run() { try { @@ -133,138 +125,49 @@ public class plasmaDHTTransfer extends Thread { * - the retry counter limit was exceeded */ this.transferStatus = plasmaDHTChunk.chunkStatus_RUNNING; - long retryCount = 0; final long start = System.currentTimeMillis(); - while (true) { - // testing if we were aborted - if (this.isAborted()) return; - - // transfering seleted words to remote peer - this.transferStatusMessage = "Running: Transfering chunk to target " + this.seed.hash + "/" + this.seed.getName(); - final HashMap result = yacyClient.transferIndex(this.seedDB, this.seed, this.dhtChunk.containers(), this.dhtChunk.urlCacheMap(), this.gzipBody4Transfer, this.timeout4Transfer); - final String error = (String) result.get("result"); - if (error == null) { - // words successfully transfered - this.transferTime = System.currentTimeMillis() - start; - this.payloadSize = ((Integer)result.get("payloadSize")).intValue(); - - this.log.logInfo("Index transfer of " + this.dhtChunk.indexCount() + - " entries " + this.dhtChunk.containerSize() + - " words [" + this.dhtChunk.firstContainer().getWordHash() + " .. " + this.dhtChunk.lastContainer().getWordHash() + "]" + - " and " + this.dhtChunk.urlCacheMap().size() + " URLs" + - " to peer " + this.seed.getName() + ":" + this.seed.hash + - " in " + (this.transferTime / 1000) + - " seconds successful (" + (1000 * this.dhtChunk.indexCount() / (this.transferTime + 1)) + - " words/s, " + this.payloadSize + " Bytes)"); - - // if the peer has set a pause time and we are in flush mode (index transfer) - // then we pause for a while now - if (this.transferMode == TRANSFER_MODE_FLUSH) { - final long pause = getBusyTime(result); - if (pause != -1) { - this.transferStatusMessage = "Finished: Transfer of chunk to target " + this.seed.hash + "/" + this.seed.getName() + ". Pausing " + pause + " ms."; - this.pause(pause); - } - } else { - this.transferStatusMessage = "Finished: Transfer of chunk to target " + this.seed.hash + "/" + this.seed.getName(); - } - - // transfer of chunk finished - this.transferStatus = plasmaDHTChunk.chunkStatus_COMPLETE; - retryCount = 0; - - break; - } + // testing if we were aborted + if (this.isAborted()) return; + + // transferring selected words to remote peer + this.transferStatusMessage = "Running: Transfering chunk to target " + this.seed.hash + "/" + this.seed.getName(); + final HashMap result = yacyClient.transferIndex(this.seedDB, this.seed, this.dhtChunk.containers(), this.dhtChunk.urlCacheMap(), this.gzipBody4Transfer, this.timeout4Transfer); + final String error = (String) result.get("result"); + if (error == null) { + // words successfully transfered + this.transferTime = System.currentTimeMillis() - start; + this.payloadSize = ((Integer)result.get("payloadSize")).intValue(); - // inc retry counter - retryCount++; - if (this.isAborted()) return; - - boolean reconnectNeeded = false; - long pauseTime = 1; + this.log.logInfo("Index transfer of " + this.dhtChunk.indexCount() + + " entries " + this.dhtChunk.containerSize() + + " words [" + this.dhtChunk.firstContainer().getWordHash() + " .. " + this.dhtChunk.lastContainer().getWordHash() + "]" + + " and " + this.dhtChunk.urlCacheMap().size() + " URLs" + + " to peer " + this.seed.getName() + ":" + this.seed.hash + + " in " + (this.transferTime / 1000) + + " seconds successful (" + (1000 * this.dhtChunk.indexCount() / (this.transferTime + 1)) + + " words/s, " + this.payloadSize + " Bytes)"); - if (error.equals("busy")) { - // get pause time that was requested by the remote peer - pauseTime = getBusyTime(result); - if (pauseTime == -1) pauseTime = 60000; - - this.transferStatusMessage = "Peer " + this.seed.getName() + ":" + this.seed.hash + " is busy. Waiting " + pauseTime + " ms."; - this.log.logInfo(this.transferStatusMessage); - } else { - this.transferStatusMessage = "Transfer to peer " + this.seed.getName() + ":" + this.seed.hash + " failed:'" + error + "', Trying to reconnect ..."; - - // force disconnection of peer - peerActions.peerDeparture(this.seed, "DHT Transfer: " + this.transferStatusMessage); - this.log.logWarning(this.transferStatusMessage); - - // calculate pause time - pauseTime = retryCount * 10000; - reconnectNeeded = true; - } + // if the peer has set a pause time and we are in flush mode (index transfer) + // then we pause for a while now + this.transferStatusMessage = "Finished: Transfer of chunk to target " + this.seed.hash + "/" + this.seed.getName(); - // if the retry counter limit was not exceeded we'll retry it in a few seconds - if (retryCount > this.maxRetry) { - this.transferStatusMessage = "Transfer aborted. Retry limit reached."; - this.transferStatus = plasmaDHTChunk.chunkStatus_FAILED; - return; - } - - // sleep for a while - this.pause(pauseTime); + // transfer of chunk finished + this.transferStatus = plasmaDHTChunk.chunkStatus_COMPLETE; + return; + } + + if (this.isAborted()) return; + + if (error.equals("busy")) { + this.transferStatusMessage = "Peer " + this.seed.getName() + ":" + this.seed.hash + " is busy."; + this.log.logInfo(this.transferStatusMessage); + } else { + this.transferStatusMessage = "Transfer to peer " + this.seed.getName() + ":" + this.seed.hash + " failed:'" + error + "', Trying to reconnect ..."; - // reconnect to peer if needed - if (reconnectNeeded) { - - /* loop until - * - we have successfully done a peer ping or - * - the retry counter limit was exceeded - */ - while (true) { - // testing if we were aborted ... - if (this.isAborted()) - return; + // force disconnection of peer + peerActions.peerDeparture(this.seed, "DHT Transfer: " + this.transferStatusMessage); + this.log.logWarning(this.transferStatusMessage); - // doing a peer ping to the remote seed - final int added = yacyClient.publishMySeed(this.seedDB.mySeed(), this.peerActions, this.seed.getPublicAddress(), this.seed.hash); - if (added < 0) { - // inc. retry counter - retryCount++; - this.transferStatusMessage = "Disconnected peer: Peer ping failed. " + ((retryCount > 5) ? "Transfer aborted." : "Retry " + retryCount); - if (retryCount > this.maxRetry) return; - this.pause(retryCount * 10000); - continue; - } - - this.seedDB.getConnected(this.seed.hash); - this.transferStatusMessage = "running"; - break; - } - } } } - - @SuppressWarnings("unchecked") - private long getBusyTime(final HashMap result) { - int pause = -1; - final Object transferRWIResult = result.get("resultTransferRWI"); - assert transferRWIResult instanceof HashMap; - if (transferRWIResult != null && ((HashMap) transferRWIResult).containsKey("pause")) { - final String pauseStr = ((HashMap) transferRWIResult).get("pause"); - try { pause = Integer.valueOf(pauseStr).intValue(); } catch (final NumberFormatException numEx){} - if (pause < 0) pause = 5000; - else if (pause > 30000) pause = 30000; - } - return pause; - } - - private void pause(final long sleepTime) throws InterruptedException { - if (sleepTime == 0) return; - long sleepCounter = sleepTime / 1000; - final long sleepRest = sleepTime % 1000; - while (!this.isAborted() && sleepCounter > 0) { - sleepCounter--; - Thread.sleep(1000); - } - if (sleepRest > 0) Thread.sleep(sleepRest); - } } diff --git a/source/de/anomic/plasma/plasmaSwitchboard.java b/source/de/anomic/plasma/plasmaSwitchboard.java index 9450383c3..21d92f57a 100644 --- a/source/de/anomic/plasma/plasmaSwitchboard.java +++ b/source/de/anomic/plasma/plasmaSwitchboard.java @@ -2015,8 +2015,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch