diff --git a/htroot/IndexControl_p.java b/htroot/IndexControl_p.java index 1e9f54c12..bf79af636 100644 --- a/htroot/IndexControl_p.java +++ b/htroot/IndexControl_p.java @@ -273,12 +273,13 @@ public class IndexControl_p { // use whats remaining String gzipBody = switchboard.getConfig("indexControl.gzipBody","false"); int timeout = (int) switchboard.getConfigLong("indexControl.timeout",60000); - result = yacyClient.transferIndex( + HashMap resultObj = yacyClient.transferIndex( yacyCore.seedDB.getConnected(post.get("hostHash", "")), new indexContainer[]{index}, knownURLs, "true".equalsIgnoreCase(gzipBody), timeout); + result = (String) resultObj.get("result"); prop.put("result", (result == null) ? ("Successfully transferred " + index.size() + " words in " + ((System.currentTimeMillis() - starttime) / 1000) + " seconds") : result); index = null; } diff --git a/htroot/IndexTransfer_p.html b/htroot/IndexTransfer_p.html index 672266e7d..55654314c 100644 --- a/htroot/IndexTransfer_p.html +++ b/htroot/IndexTransfer_p.html @@ -62,6 +62,7 @@ #[selection.twrange]# #[twEntityCount]# Entities (#[twEntityPercent]#%)
#[twEntryCount]# Entries
+ #[twPayloadSize]#
#[twEntitySpeed]# entities/s #(deleteIndex)#false::true#(/deleteIndex)# #[peerName]# diff --git a/htroot/IndexTransfer_p.java b/htroot/IndexTransfer_p.java index b8dcc8868..e1059a9b8 100644 --- a/htroot/IndexTransfer_p.java +++ b/htroot/IndexTransfer_p.java @@ -55,6 +55,7 @@ import java.util.TreeMap; import de.anomic.http.httpHeader; import de.anomic.plasma.plasmaSwitchboard; +import de.anomic.server.serverMemory; import de.anomic.server.serverObjects; import de.anomic.server.serverSwitch; import de.anomic.yacy.yacyCore; @@ -113,6 +114,7 @@ public final class IndexTransfer_p { prop.put("running_twEntityCount",switchboard.transferIdxThread.getTransferedContainerCount()); prop.put("running_twEntryCount",switchboard.transferIdxThread.getTransferedEntryCount()); + prop.put("running_twPayloadSize",serverMemory.bytesToString(switchboard.transferIdxThread.getTransferedBytes())); prop.put("running_twEntityPercent",Float.toString(switchboard.transferIdxThread.getTransferedContainerPercent())); prop.put("running_twEntitySpeed",Integer.toString(switchboard.transferIdxThread.getTransferedEntitySpeed())); diff --git a/source/de/anomic/plasma/plasmaDHTFlush.java b/source/de/anomic/plasma/plasmaDHTFlush.java index b25196926..ccb7a3a5b 100644 --- a/source/de/anomic/plasma/plasmaDHTFlush.java +++ b/source/de/anomic/plasma/plasmaDHTFlush.java @@ -52,6 +52,7 @@ public class plasmaDHTFlush extends Thread { private boolean gzipBody4Transfer = false; private int timeout4Transfer = 60000; private int transferedEntryCount = 0; + private long transferedBytes = 0; private int transferedContainerCount = 0; private String status = "Running"; private String oldStartingPointHash = "------------", startPointHash = "------------"; @@ -82,6 +83,7 @@ public class plasmaDHTFlush extends Thread { public void stopIt(boolean wait) throws InterruptedException { this.finished = true; + if (this.worker != null) this.worker.stopIt(); if (wait) this.join(); } @@ -109,6 +111,10 @@ public class plasmaDHTFlush extends Thread { return this.transferedContainerCount; } + public long getTransferedBytes() { + return this.transferedBytes; + } + public float getTransferedContainerPercent() { long currentWordsDBSize = this.sb.wordIndex.size(); if (this.initialWordsDBSize == 0) return 100; @@ -205,11 +211,13 @@ public class plasmaDHTFlush extends Thread { // calculationg the new transfer size this.calculateNewChunkSize(); - this.worker = null; // counting transfered containers / entries this.transferedEntryCount += oldDHTChunk.indexCount(); this.transferedContainerCount += oldDHTChunk.containerSize(); + this.transferedBytes += this.worker.getPayloadSize(); + + this.worker = null; // deleting transfered words from index if (this.delete) { @@ -223,6 +231,7 @@ public class plasmaDHTFlush extends Thread { // handover chunk to transfer worker if ((newDHTChunk.containerSize() > 0) || (newDHTChunk.getStatus() == plasmaDHTChunk.chunkStatus_FILLED)) { this.worker = new plasmaDHTTransfer(this.log, this.seed, newDHTChunk, this.gzipBody4Transfer, this.timeout4Transfer, 5); + this.worker.setTransferMode(plasmaDHTTransfer.TRANSFER_MODE_FLUSH); this.worker.start(); } } diff --git a/source/de/anomic/plasma/plasmaDHTTransfer.java b/source/de/anomic/plasma/plasmaDHTTransfer.java index a92d08f7d..89fd02b25 100644 --- a/source/de/anomic/plasma/plasmaDHTTransfer.java +++ b/source/de/anomic/plasma/plasmaDHTTransfer.java @@ -43,6 +43,9 @@ package de.anomic.plasma; import java.lang.StringBuffer; +import java.util.HashMap; + +import de.anomic.server.serverDate; import de.anomic.server.logging.serverLog; import de.anomic.yacy.yacyClient; import de.anomic.yacy.yacyCore; @@ -50,13 +53,17 @@ import de.anomic.yacy.yacySeed; public class plasmaDHTTransfer extends Thread { + public static final int TRANSFER_MODE_DISTRIBUTION = 0; + public static final int TRANSFER_MODE_FLUSH = 1; + // connection properties private boolean gzipBody4Transfer = false; - private int timeout4Transfer = 60000; + private int timeout4Transfer = 60000; // status fields private boolean stopped = false; private long transferTime = 0; + private long payloadSize = 0; private int transferStatus = plasmaDHTChunk.chunkStatus_UNDEFINED; private String transferStatusMessage = ""; @@ -68,9 +75,17 @@ public class plasmaDHTTransfer extends Thread { // other fields private int maxRetry; + private int transferMode = TRANSFER_MODE_DISTRIBUTION; serverLog log; - public plasmaDHTTransfer(serverLog log, yacySeed destSeed, plasmaDHTChunk dhtChunk, boolean gzipBody, int timeout, int retries) { + public plasmaDHTTransfer( + serverLog log, + yacySeed destSeed, + plasmaDHTChunk dhtChunk, + boolean gzipBody, + int timeout, + int retries + ) { super(new ThreadGroup("TransferIndexThreadGroup"), "TransferIndexWorker_" + destSeed.getName()); this.log = log; this.gzipBody4Transfer = gzipBody; @@ -80,6 +95,10 @@ public class plasmaDHTTransfer extends Thread { this.seed = destSeed; } + public void setTransferMode(int mode) { + this.transferMode = mode; + } + public void run() { try { this.uploadIndex(); @@ -90,6 +109,8 @@ public class plasmaDHTTransfer extends Thread { private boolean isAborted() { if (this.stopped || Thread.currentThread().isInterrupted()) { + this.transferStatus = plasmaDHTChunk.chunkStatus_INTERRUPTED; + this.transferStatusMessage = "aborted"; return true; } return false; @@ -103,6 +124,10 @@ public class plasmaDHTTransfer extends Thread { return this.transferTime; } + public long getPayloadSize() { + return this.payloadSize; + } + public int getStatus() { return this.transferStatus; } @@ -129,64 +154,125 @@ public class plasmaDHTTransfer extends Thread { // transfering seleted words to remote peer this.transferStatusMessage = "Running: Transfering chunk to target " + this.seed.hash + "/" + this.seed.getName(); - String error = yacyClient.transferIndex(this.seed, this.dhtChunk.containers(), this.dhtChunk.urlCacheMap(), this.gzipBody4Transfer, this.timeout4Transfer); + HashMap result = yacyClient.transferIndex(this.seed, this.dhtChunk.containers(), this.dhtChunk.urlCacheMap(), this.gzipBody4Transfer, this.timeout4Transfer); + String error = (String) result.get("result"); if (error == null) { // words successfully transfered - this.transferTime = System.currentTimeMillis() - start; - this.log.logInfo("Index transfer of " + this.dhtChunk.indexCount() + " words [" + this.dhtChunk.firstContainer().wordHash() + " .. " + this.dhtChunk.lastContainer().wordHash() + "]" + " to peer " + this.seed.getName() + ":" + this.seed.hash + " in " + (this.transferTime / 1000) + " seconds successful (" - + (1000 * this.dhtChunk.indexCount() / (this.transferTime + 1)) + " words/s)"); - retryCount = 0; - this.transferStatusMessage = "Finished: Transfer of chunk to target " + this.seed.hash + "/" + this.seed.getName(); + this.transferTime = System.currentTimeMillis() - start; + this.payloadSize = ((Integer)result.get("payloadSize")).intValue(); + + this.log.logInfo("Index transfer of " + this.dhtChunk.indexCount() + + " words [" + this.dhtChunk.firstContainer().wordHash() + " .. " + this.dhtChunk.lastContainer().wordHash() + "]" + + " to peer " + this.seed.getName() + ":" + this.seed.hash + + " in " + (this.transferTime / 1000) + + " seconds successful (" + (1000 * this.dhtChunk.indexCount() / (this.transferTime + 1)) + + " words/s, " + this.payloadSize + " Bytes)"); + + // if the peer has set a pause time and we are in flush mode (index transfer) + // then we pause for a while now + if (this.transferMode == TRANSFER_MODE_FLUSH) { + long pause = getBusyTime(result); + if (pause != -1) { + this.transferStatusMessage = "Finished: Transfer of chunk to target " + this.seed.hash + "/" + this.seed.getName() + ". Pausing " + pause + " ms."; + this.pause(pause); + } + } else { + this.transferStatusMessage = "Finished: Transfer of chunk to target " + this.seed.hash + "/" + this.seed.getName(); + } + + // transfer of chunk finished this.transferStatus = plasmaDHTChunk.chunkStatus_COMPLETE; + retryCount = 0; + break; - } + } // inc retry counter retryCount++; + if (this.isAborted()) return; - // testing if we were aborted ... - if (this.isAborted()) return; - - // we have lost the connection to the remote peer. Adding peer to disconnected list - StringBuffer failMessage = new StringBuffer("Index transfer to peer " + this.seed.getName() + ":" + this.seed.hash + " failed:'" + error + "'"); - if (!error.equals("busy")) { + boolean reconnectNeeded = false; + long pauseTime = 1; + + if (error.equals("busy")) { + // get pause time that was requested by the remote peer + pauseTime = getBusyTime(result); + if (pauseTime == -1) pauseTime = 60000; + + this.transferStatusMessage = "Peer " + this.seed.getName() + ":" + this.seed.hash + " is busy. Waiting " + pauseTime + " ms."; + this.log.logInfo(this.transferStatusMessage); + } else { + this.transferStatusMessage = "Transfer to peer " + this.seed.getName() + ":" + this.seed.hash + " failed:'" + error + "', Trying to reconnect ..."; + + // force disconnection of peer yacyCore.peerActions.peerDeparture(this.seed); - failMessage.append(", disconnected peer"); + this.log.logWarning(this.transferStatusMessage); + + // calculate pause time + pauseTime = retryCount * 10000; + reconnectNeeded = true; } - this.log.logWarning(failMessage.toString()); - - // if the retry counter limit was not exceeded we'll retry it in a few seconds - this.transferStatusMessage = "Disconnected peer: " + ((retryCount > 5) ? error + ". Transfer aborted" : "Retry " + retryCount); + + // if the retry counter limit was not exceeded we'll retry it in a few seconds if (retryCount > this.maxRetry) { + this.transferStatusMessage = "Transfer aborted. Retry limit reached."; this.transferStatus = plasmaDHTChunk.chunkStatus_FAILED; return; - } - Thread.sleep(retryCount * 5000); - - /* loop until - * - we have successfully done a peer ping or - * - the retry counter limit was exceeded - */ - while (true) { - // testing if we were aborted ... - if (this.isAborted()) - return; - - // doing a peer ping to the remote seed - int added = yacyClient.publishMySeed(this.seed.getAddress(), this.seed.hash); - if (added < 0) { - // inc. retry counter - retryCount++; - this.transferStatusMessage = "Disconnected peer: Peer ping failed. " + ((retryCount > 5) ? "Transfer aborted." : "Retry " + retryCount); - if (retryCount > this.maxRetry) return; - Thread.sleep(retryCount * 5000); - continue; - } + } + + // sleep for a while + this.pause(pauseTime); + + // reconnect to peer if needed + if (reconnectNeeded) { - yacyCore.seedDB.getConnected(this.seed.hash); - this.transferStatusMessage = "running"; - break; + /* loop until + * - we have successfully done a peer ping or + * - the retry counter limit was exceeded + */ + while (true) { + // testing if we were aborted ... + if (this.isAborted()) + return; + + // doing a peer ping to the remote seed + int added = yacyClient.publishMySeed(this.seed.getAddress(), this.seed.hash); + if (added < 0) { + // inc. retry counter + retryCount++; + this.transferStatusMessage = "Disconnected peer: Peer ping failed. " + ((retryCount > 5) ? "Transfer aborted." : "Retry " + retryCount); + if (retryCount > this.maxRetry) return; + this.pause(retryCount * 10000); + continue; + } + + yacyCore.seedDB.getConnected(this.seed.hash); + this.transferStatusMessage = "running"; + break; + } } } } + + private long getBusyTime(HashMap result) { + int pause = -1; + HashMap transferRWIResult = (HashMap) result.get("resultTransferRWI"); + if (transferRWIResult != null && transferRWIResult.containsKey("pause")) { + String pauseStr = (String) transferRWIResult.get("pause"); + try { pause = Integer.valueOf(pauseStr).intValue(); } catch (NumberFormatException numEx){} + if (pause <= 0) pause = 60000; + else if (pause > 1800000) pause = 1800000; + } + return pause; + } + + private void pause(long sleepTime) throws InterruptedException { + long sleepCounter = sleepTime / 1000; + long sleepRest = sleepTime % 1000; + while (!this.isAborted() && sleepCounter > 0) { + sleepCounter--; + Thread.sleep(1000); + } + if (sleepRest > 0) Thread.sleep(sleepRest); + } } diff --git a/source/de/anomic/plasma/plasmaSwitchboard.java b/source/de/anomic/plasma/plasmaSwitchboard.java index b3c567bb3..942d9766a 100644 --- a/source/de/anomic/plasma/plasmaSwitchboard.java +++ b/source/de/anomic/plasma/plasmaSwitchboard.java @@ -1503,13 +1503,13 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser words = condenser.RESULT_SIMI_WORDS; // transfering the index to the storage peer - String error = yacyClient.transferIndex( + HashMap resultObj = yacyClient.transferIndex( seed, (indexTreeMapContainer[])tmpContainers.toArray(new indexTreeMapContainer[tmpContainers.size()]), urlCache, true, 120000); - + String error = (String) resultObj.get("result"); if (error != null) { words = wordIndex.addPageIndex(entry.url(), urlHash, docDate, (int) entry.size(), document, condenser, diff --git a/source/de/anomic/yacy/yacyClient.java b/source/de/anomic/yacy/yacyClient.java index 1626dab5a..182f73188 100644 --- a/source/de/anomic/yacy/yacyClient.java +++ b/source/de/anomic/yacy/yacyClient.java @@ -878,62 +878,94 @@ public final class yacyClient { } } - public static String transferIndex(yacySeed targetSeed, indexContainer[] indexes, HashMap urlCache, boolean gzipBody, int timeout) { + public static HashMap transferIndex(yacySeed targetSeed, indexContainer[] indexes, HashMap urlCache, boolean gzipBody, int timeout) { - // check if we got all necessary urls in the urlCache (only for debugging) - Iterator eenum; - indexURLEntry entry; - for (int i = 0; i < indexes.length; i++) { - eenum = indexes[i].entries(); - while (eenum.hasNext()) { - entry = (indexURLEntry) eenum.next(); - if (urlCache.get(entry.getUrlHash()) == null) { - yacyCore.log.logFine("DEBUG transferIndex: to-send url hash '" + entry.getUrlHash() + "' is not contained in urlCache"); + HashMap resultObj = new HashMap(); + int payloadSize = 0; + try { + + // check if we got all necessary urls in the urlCache (only for debugging) + Iterator eenum; + indexURLEntry entry; + for (int i = 0; i < indexes.length; i++) { + eenum = indexes[i].entries(); + while (eenum.hasNext()) { + entry = (indexURLEntry) eenum.next(); + if (urlCache.get(entry.getUrlHash()) == null) { + yacyCore.log.logFine("DEBUG transferIndex: to-send url hash '" + entry.getUrlHash() + "' is not contained in urlCache"); + } } + } + + // transfer the RWI without the URLs + HashMap in = transferRWI(targetSeed, indexes, gzipBody, timeout); + resultObj.put("resultTransferRWI", in); + + if (in == null) { + resultObj.put("result", "no_connection_1"); + return resultObj; + } + if (in.containsKey("indexPayloadSize")) payloadSize += ((Integer)in.get("indexPayloadSize")).intValue(); + + String result = (String) in.get("result"); + if (result == null) { + resultObj.put("result","no_result_1"); + return resultObj; } - } - - // transfer the RWI without the URLs - HashMap in = transferRWI(targetSeed, indexes, gzipBody, timeout); - if (in == null) { return "no_connection_1"; } - String result = (String) in.get("result"); - if (result == null) { return "no_result_1"; } - if (!(result.equals("ok"))) { - targetSeed.setFlagAcceptRemoteIndex(false); - yacyCore.seedDB.update(targetSeed.hash, targetSeed); - return result; - } - - // in now contains a list of unknown hashes - final String uhss = (String) in.get("unknownURL"); - if (uhss == null) { return "no_unknownURL_tag_in_response"; } - if (uhss.length() == 0) { return null; } // all url's known, we are ready here - - final String[] uhs = uhss.split(","); - if (uhs.length == 0) { return null; } // all url's known - - // extract the urlCache from the result - plasmaCrawlLURL.Entry[] urls = new plasmaCrawlLURL.Entry[uhs.length]; - for (int i = 0; i < uhs.length; i++) { - urls[i] = (plasmaCrawlLURL.Entry) urlCache.get(uhs[i]); - if (urls[i] == null) { - yacyCore.log.logFine("DEBUG transferIndex: requested url hash '" + uhs[i] + "', unknownURL='" + uhss + "'"); + if (!(result.equals("ok"))) { + targetSeed.setFlagAcceptRemoteIndex(false); + yacyCore.seedDB.update(targetSeed.hash, targetSeed); + resultObj.put("result",result); + return resultObj; } + + // in now contains a list of unknown hashes + final String uhss = (String) in.get("unknownURL"); + if (uhss == null) { + resultObj.put("result","no_unknownURL_tag_in_response"); + return resultObj; + } + if (uhss.length() == 0) { return resultObj; } // all url's known, we are ready here + + final String[] uhs = uhss.split(","); + if (uhs.length == 0) { return resultObj; } // all url's known + + // extract the urlCache from the result + plasmaCrawlLURL.Entry[] urls = new plasmaCrawlLURL.Entry[uhs.length]; + for (int i = 0; i < uhs.length; i++) { + urls[i] = (plasmaCrawlLURL.Entry) urlCache.get(uhs[i]); + if (urls[i] == null) { + yacyCore.log.logFine("DEBUG transferIndex: requested url hash '" + uhs[i] + "', unknownURL='" + uhss + "'"); + } + } + + in = transferURL(targetSeed, urls, gzipBody, timeout); + resultObj.put("resultTransferURL", in); + + if (in == null) { + resultObj.put("result","no_connection_2"); + return resultObj; + } + if (in.containsKey("urlPayloadSize")) payloadSize += ((Integer)in.get("urlPayloadSize")).intValue(); + + result = (String) in.get("result"); + if (result == null) { + resultObj.put("result","no_result_2"); + return resultObj; + } + if (!(result.equals("ok"))) { + targetSeed.setFlagAcceptRemoteIndex(false); + yacyCore.seedDB.update(targetSeed.hash, targetSeed); + resultObj.put("result",result); + return resultObj; + } + // int doubleentries = Integer.parseInt((String) in.get("double")); + // System.out.println("DEBUG tansferIndex: transferred " + uhs.length + " URL's, double=" + doubleentries); + + return resultObj; + } finally { + resultObj.put("payloadSize", new Integer(payloadSize)); } - - in = transferURL(targetSeed, urls, gzipBody, timeout); - if (in == null) { return "no_connection_2"; } - result = (String) in.get("result"); - if (result == null) { return "no_result_2"; } - if (!(result.equals("ok"))) { - targetSeed.setFlagAcceptRemoteIndex(false); - yacyCore.seedDB.update(targetSeed.hash, targetSeed); - return result; - } -// int doubleentries = Integer.parseInt((String) in.get("double")); -// System.out.println("DEBUG tansferIndex: transferred " + uhs.length + " URL's, double=" + doubleentries); - - return null; } private static HashMap transferRWI(yacySeed targetSeed, indexContainer[] indexes, boolean gzipBody, int timeout) { @@ -1001,6 +1033,8 @@ public final class yacyClient { } final HashMap result = nxTools.table(v); + // return the transfered index data in bytes (for debugging only) + result.put("indexPayloadSize", new Integer(entrypost.length())); return result; } catch (Exception e) { yacyCore.log.logSevere("yacyClient.transferRWI error:" + e.getMessage()); @@ -1032,11 +1066,13 @@ public final class yacyClient { post.put("youare", targetSeed.hash); String resource = ""; int urlc = 0; + int urlPayloadSize = 0; for (int i = 0; i < urls.length; i++) { if (urls[i] != null) { - resource = urls[i].toString(); + resource = urls[i].toString(); if (resource != null) { post.put("url" + urlc, resource); + urlPayloadSize += resource.length(); urlc++; } } @@ -1057,7 +1093,11 @@ public final class yacyClient { if (v != null) { yacyCore.seedDB.mySeed.incSU(urlc); } - return nxTools.table(v); + + HashMap result = nxTools.table(v); + // return the transfered url data in bytes (for debugging only) + result.put("urlPayloadSize", new Integer(urlPayloadSize)); + return result; } catch (Exception e) { yacyCore.log.logSevere("yacyClient.transferRWI error:" + e.getMessage()); return null;