*) Index transfer now considers the pause time send by busy peers during

index transfer / index distribution
   See: http://www.yacy-forum.de/viewtopic.php?p=22647#22491

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@2205 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
theli 19 years ago
parent e8848a3532
commit 4ca0857c0c

@ -273,12 +273,13 @@ public class IndexControl_p {
// use whats remaining
String gzipBody = switchboard.getConfig("indexControl.gzipBody","false");
int timeout = (int) switchboard.getConfigLong("indexControl.timeout",60000);
result = yacyClient.transferIndex(
HashMap resultObj = yacyClient.transferIndex(
yacyCore.seedDB.getConnected(post.get("hostHash", "")),
new indexContainer[]{index},
knownURLs,
"true".equalsIgnoreCase(gzipBody),
timeout);
result = (String) resultObj.get("result");
prop.put("result", (result == null) ? ("Successfully transferred " + index.size() + " words in " + ((System.currentTimeMillis() - starttime) / 1000) + " seconds") : result);
index = null;
}

@ -62,6 +62,7 @@
<td class="small" align="right"><tt>#[selection.twrange]#</tt></td>
<td class="small" rowspan="2" align="left">#[twEntityCount]# Entities (#[twEntityPercent]#%)<br>
#[twEntryCount]# Entries<br>
#[twPayloadSize]#<br>
#[twEntitySpeed]# entities/s</td>
<td class="small" rowspan="2">#(deleteIndex)#false::true#(/deleteIndex)#</td>
<td class="small" rowspan="2">#[peerName]#</td>

@ -55,6 +55,7 @@ import java.util.TreeMap;
import de.anomic.http.httpHeader;
import de.anomic.plasma.plasmaSwitchboard;
import de.anomic.server.serverMemory;
import de.anomic.server.serverObjects;
import de.anomic.server.serverSwitch;
import de.anomic.yacy.yacyCore;
@ -113,6 +114,7 @@ public final class IndexTransfer_p {
prop.put("running_twEntityCount",switchboard.transferIdxThread.getTransferedContainerCount());
prop.put("running_twEntryCount",switchboard.transferIdxThread.getTransferedEntryCount());
prop.put("running_twPayloadSize",serverMemory.bytesToString(switchboard.transferIdxThread.getTransferedBytes()));
prop.put("running_twEntityPercent",Float.toString(switchboard.transferIdxThread.getTransferedContainerPercent()));
prop.put("running_twEntitySpeed",Integer.toString(switchboard.transferIdxThread.getTransferedEntitySpeed()));

@ -52,6 +52,7 @@ public class plasmaDHTFlush extends Thread {
private boolean gzipBody4Transfer = false;
private int timeout4Transfer = 60000;
private int transferedEntryCount = 0;
private long transferedBytes = 0;
private int transferedContainerCount = 0;
private String status = "Running";
private String oldStartingPointHash = "------------", startPointHash = "------------";
@ -82,6 +83,7 @@ public class plasmaDHTFlush extends Thread {
public void stopIt(boolean wait) throws InterruptedException {
this.finished = true;
if (this.worker != null) this.worker.stopIt();
if (wait) this.join();
}
@ -109,6 +111,10 @@ public class plasmaDHTFlush extends Thread {
return this.transferedContainerCount;
}
public long getTransferedBytes() {
return this.transferedBytes;
}
public float getTransferedContainerPercent() {
long currentWordsDBSize = this.sb.wordIndex.size();
if (this.initialWordsDBSize == 0) return 100;
@ -205,11 +211,13 @@ public class plasmaDHTFlush extends Thread {
// calculationg the new transfer size
this.calculateNewChunkSize();
this.worker = null;
// counting transfered containers / entries
this.transferedEntryCount += oldDHTChunk.indexCount();
this.transferedContainerCount += oldDHTChunk.containerSize();
this.transferedBytes += this.worker.getPayloadSize();
this.worker = null;
// deleting transfered words from index
if (this.delete) {
@ -223,6 +231,7 @@ public class plasmaDHTFlush extends Thread {
// handover chunk to transfer worker
if ((newDHTChunk.containerSize() > 0) || (newDHTChunk.getStatus() == plasmaDHTChunk.chunkStatus_FILLED)) {
this.worker = new plasmaDHTTransfer(this.log, this.seed, newDHTChunk, this.gzipBody4Transfer, this.timeout4Transfer, 5);
this.worker.setTransferMode(plasmaDHTTransfer.TRANSFER_MODE_FLUSH);
this.worker.start();
}
}

@ -43,6 +43,9 @@
package de.anomic.plasma;
import java.lang.StringBuffer;
import java.util.HashMap;
import de.anomic.server.serverDate;
import de.anomic.server.logging.serverLog;
import de.anomic.yacy.yacyClient;
import de.anomic.yacy.yacyCore;
@ -50,6 +53,9 @@ import de.anomic.yacy.yacySeed;
public class plasmaDHTTransfer extends Thread {
public static final int TRANSFER_MODE_DISTRIBUTION = 0;
public static final int TRANSFER_MODE_FLUSH = 1;
// connection properties
private boolean gzipBody4Transfer = false;
private int timeout4Transfer = 60000;
@ -57,6 +63,7 @@ public class plasmaDHTTransfer extends Thread {
// status fields
private boolean stopped = false;
private long transferTime = 0;
private long payloadSize = 0;
private int transferStatus = plasmaDHTChunk.chunkStatus_UNDEFINED;
private String transferStatusMessage = "";
@ -68,9 +75,17 @@ public class plasmaDHTTransfer extends Thread {
// other fields
private int maxRetry;
private int transferMode = TRANSFER_MODE_DISTRIBUTION;
serverLog log;
public plasmaDHTTransfer(serverLog log, yacySeed destSeed, plasmaDHTChunk dhtChunk, boolean gzipBody, int timeout, int retries) {
public plasmaDHTTransfer(
serverLog log,
yacySeed destSeed,
plasmaDHTChunk dhtChunk,
boolean gzipBody,
int timeout,
int retries
) {
super(new ThreadGroup("TransferIndexThreadGroup"), "TransferIndexWorker_" + destSeed.getName());
this.log = log;
this.gzipBody4Transfer = gzipBody;
@ -80,6 +95,10 @@ public class plasmaDHTTransfer extends Thread {
this.seed = destSeed;
}
public void setTransferMode(int mode) {
this.transferMode = mode;
}
public void run() {
try {
this.uploadIndex();
@ -90,6 +109,8 @@ public class plasmaDHTTransfer extends Thread {
private boolean isAborted() {
if (this.stopped || Thread.currentThread().isInterrupted()) {
this.transferStatus = plasmaDHTChunk.chunkStatus_INTERRUPTED;
this.transferStatusMessage = "aborted";
return true;
}
return false;
@ -103,6 +124,10 @@ public class plasmaDHTTransfer extends Thread {
return this.transferTime;
}
public long getPayloadSize() {
return this.payloadSize;
}
public int getStatus() {
return this.transferStatus;
}
@ -129,39 +154,77 @@ public class plasmaDHTTransfer extends Thread {
// transfering seleted words to remote peer
this.transferStatusMessage = "Running: Transfering chunk to target " + this.seed.hash + "/" + this.seed.getName();
String error = yacyClient.transferIndex(this.seed, this.dhtChunk.containers(), this.dhtChunk.urlCacheMap(), this.gzipBody4Transfer, this.timeout4Transfer);
HashMap result = yacyClient.transferIndex(this.seed, this.dhtChunk.containers(), this.dhtChunk.urlCacheMap(), this.gzipBody4Transfer, this.timeout4Transfer);
String error = (String) result.get("result");
if (error == null) {
// words successfully transfered
this.transferTime = System.currentTimeMillis() - start;
this.log.logInfo("Index transfer of " + this.dhtChunk.indexCount() + " words [" + this.dhtChunk.firstContainer().wordHash() + " .. " + this.dhtChunk.lastContainer().wordHash() + "]" + " to peer " + this.seed.getName() + ":" + this.seed.hash + " in " + (this.transferTime / 1000) + " seconds successful ("
+ (1000 * this.dhtChunk.indexCount() / (this.transferTime + 1)) + " words/s)");
retryCount = 0;
this.payloadSize = ((Integer)result.get("payloadSize")).intValue();
this.log.logInfo("Index transfer of " + this.dhtChunk.indexCount() +
" words [" + this.dhtChunk.firstContainer().wordHash() + " .. " + this.dhtChunk.lastContainer().wordHash() + "]" +
" to peer " + this.seed.getName() + ":" + this.seed.hash +
" in " + (this.transferTime / 1000) +
" seconds successful (" + (1000 * this.dhtChunk.indexCount() / (this.transferTime + 1)) +
" words/s, " + this.payloadSize + " Bytes)");
// if the peer has set a pause time and we are in flush mode (index transfer)
// then we pause for a while now
if (this.transferMode == TRANSFER_MODE_FLUSH) {
long pause = getBusyTime(result);
if (pause != -1) {
this.transferStatusMessage = "Finished: Transfer of chunk to target " + this.seed.hash + "/" + this.seed.getName() + ". Pausing " + pause + " ms.";
this.pause(pause);
}
} else {
this.transferStatusMessage = "Finished: Transfer of chunk to target " + this.seed.hash + "/" + this.seed.getName();
}
// transfer of chunk finished
this.transferStatus = plasmaDHTChunk.chunkStatus_COMPLETE;
retryCount = 0;
break;
}
// inc retry counter
retryCount++;
// testing if we were aborted ...
if (this.isAborted()) return;
// we have lost the connection to the remote peer. Adding peer to disconnected list
StringBuffer failMessage = new StringBuffer("Index transfer to peer " + this.seed.getName() + ":" + this.seed.hash + " failed:'" + error + "'");
if (!error.equals("busy")) {
boolean reconnectNeeded = false;
long pauseTime = 1;
if (error.equals("busy")) {
// get pause time that was requested by the remote peer
pauseTime = getBusyTime(result);
if (pauseTime == -1) pauseTime = 60000;
this.transferStatusMessage = "Peer " + this.seed.getName() + ":" + this.seed.hash + " is busy. Waiting " + pauseTime + " ms.";
this.log.logInfo(this.transferStatusMessage);
} else {
this.transferStatusMessage = "Transfer to peer " + this.seed.getName() + ":" + this.seed.hash + " failed:'" + error + "', Trying to reconnect ...";
// force disconnection of peer
yacyCore.peerActions.peerDeparture(this.seed);
failMessage.append(", disconnected peer");
this.log.logWarning(this.transferStatusMessage);
// calculate pause time
pauseTime = retryCount * 10000;
reconnectNeeded = true;
}
this.log.logWarning(failMessage.toString());
// if the retry counter limit was not exceeded we'll retry it in a few seconds
this.transferStatusMessage = "Disconnected peer: " + ((retryCount > 5) ? error + ". Transfer aborted" : "Retry " + retryCount);
if (retryCount > this.maxRetry) {
this.transferStatusMessage = "Transfer aborted. Retry limit reached.";
this.transferStatus = plasmaDHTChunk.chunkStatus_FAILED;
return;
}
Thread.sleep(retryCount * 5000);
// sleep for a while
this.pause(pauseTime);
// reconnect to peer if needed
if (reconnectNeeded) {
/* loop until
* - we have successfully done a peer ping or
@ -179,7 +242,7 @@ public class plasmaDHTTransfer extends Thread {
retryCount++;
this.transferStatusMessage = "Disconnected peer: Peer ping failed. " + ((retryCount > 5) ? "Transfer aborted." : "Retry " + retryCount);
if (retryCount > this.maxRetry) return;
Thread.sleep(retryCount * 5000);
this.pause(retryCount * 10000);
continue;
}
@ -190,3 +253,26 @@ public class plasmaDHTTransfer extends Thread {
}
}
}
private long getBusyTime(HashMap result) {
int pause = -1;
HashMap transferRWIResult = (HashMap) result.get("resultTransferRWI");
if (transferRWIResult != null && transferRWIResult.containsKey("pause")) {
String pauseStr = (String) transferRWIResult.get("pause");
try { pause = Integer.valueOf(pauseStr).intValue(); } catch (NumberFormatException numEx){}
if (pause <= 0) pause = 60000;
else if (pause > 1800000) pause = 1800000;
}
return pause;
}
private void pause(long sleepTime) throws InterruptedException {
long sleepCounter = sleepTime / 1000;
long sleepRest = sleepTime % 1000;
while (!this.isAborted() && sleepCounter > 0) {
sleepCounter--;
Thread.sleep(1000);
}
if (sleepRest > 0) Thread.sleep(sleepRest);
}
}

@ -1503,13 +1503,13 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
words = condenser.RESULT_SIMI_WORDS;
// transfering the index to the storage peer
String error = yacyClient.transferIndex(
HashMap resultObj = yacyClient.transferIndex(
seed,
(indexTreeMapContainer[])tmpContainers.toArray(new indexTreeMapContainer[tmpContainers.size()]),
urlCache,
true,
120000);
String error = (String) resultObj.get("result");
if (error != null) {
words = wordIndex.addPageIndex(entry.url(), urlHash, docDate, (int) entry.size(),
document, condenser,

@ -878,7 +878,11 @@ public final class yacyClient {
}
}
public static String transferIndex(yacySeed targetSeed, indexContainer[] indexes, HashMap urlCache, boolean gzipBody, int timeout) {
public static HashMap transferIndex(yacySeed targetSeed, indexContainer[] indexes, HashMap urlCache, boolean gzipBody, int timeout) {
HashMap resultObj = new HashMap();
int payloadSize = 0;
try {
// check if we got all necessary urls in the urlCache (only for debugging)
Iterator eenum;
@ -895,22 +899,36 @@ public final class yacyClient {
// transfer the RWI without the URLs
HashMap in = transferRWI(targetSeed, indexes, gzipBody, timeout);
if (in == null) { return "no_connection_1"; }
resultObj.put("resultTransferRWI", in);
if (in == null) {
resultObj.put("result", "no_connection_1");
return resultObj;
}
if (in.containsKey("indexPayloadSize")) payloadSize += ((Integer)in.get("indexPayloadSize")).intValue();
String result = (String) in.get("result");
if (result == null) { return "no_result_1"; }
if (result == null) {
resultObj.put("result","no_result_1");
return resultObj;
}
if (!(result.equals("ok"))) {
targetSeed.setFlagAcceptRemoteIndex(false);
yacyCore.seedDB.update(targetSeed.hash, targetSeed);
return result;
resultObj.put("result",result);
return resultObj;
}
// in now contains a list of unknown hashes
final String uhss = (String) in.get("unknownURL");
if (uhss == null) { return "no_unknownURL_tag_in_response"; }
if (uhss.length() == 0) { return null; } // all url's known, we are ready here
if (uhss == null) {
resultObj.put("result","no_unknownURL_tag_in_response");
return resultObj;
}
if (uhss.length() == 0) { return resultObj; } // all url's known, we are ready here
final String[] uhs = uhss.split(",");
if (uhs.length == 0) { return null; } // all url's known
if (uhs.length == 0) { return resultObj; } // all url's known
// extract the urlCache from the result
plasmaCrawlLURL.Entry[] urls = new plasmaCrawlLURL.Entry[uhs.length];
@ -922,18 +940,32 @@ public final class yacyClient {
}
in = transferURL(targetSeed, urls, gzipBody, timeout);
if (in == null) { return "no_connection_2"; }
resultObj.put("resultTransferURL", in);
if (in == null) {
resultObj.put("result","no_connection_2");
return resultObj;
}
if (in.containsKey("urlPayloadSize")) payloadSize += ((Integer)in.get("urlPayloadSize")).intValue();
result = (String) in.get("result");
if (result == null) { return "no_result_2"; }
if (result == null) {
resultObj.put("result","no_result_2");
return resultObj;
}
if (!(result.equals("ok"))) {
targetSeed.setFlagAcceptRemoteIndex(false);
yacyCore.seedDB.update(targetSeed.hash, targetSeed);
return result;
resultObj.put("result",result);
return resultObj;
}
// int doubleentries = Integer.parseInt((String) in.get("double"));
// System.out.println("DEBUG tansferIndex: transferred " + uhs.length + " URL's, double=" + doubleentries);
return null;
return resultObj;
} finally {
resultObj.put("payloadSize", new Integer(payloadSize));
}
}
private static HashMap transferRWI(yacySeed targetSeed, indexContainer[] indexes, boolean gzipBody, int timeout) {
@ -1001,6 +1033,8 @@ public final class yacyClient {
}
final HashMap result = nxTools.table(v);
// return the transfered index data in bytes (for debugging only)
result.put("indexPayloadSize", new Integer(entrypost.length()));
return result;
} catch (Exception e) {
yacyCore.log.logSevere("yacyClient.transferRWI error:" + e.getMessage());
@ -1032,11 +1066,13 @@ public final class yacyClient {
post.put("youare", targetSeed.hash);
String resource = "";
int urlc = 0;
int urlPayloadSize = 0;
for (int i = 0; i < urls.length; i++) {
if (urls[i] != null) {
resource = urls[i].toString();
if (resource != null) {
post.put("url" + urlc, resource);
urlPayloadSize += resource.length();
urlc++;
}
}
@ -1057,7 +1093,11 @@ public final class yacyClient {
if (v != null) {
yacyCore.seedDB.mySeed.incSU(urlc);
}
return nxTools.table(v);
HashMap result = nxTools.table(v);
// return the transfered url data in bytes (for debugging only)
result.put("urlPayloadSize", new Integer(urlPayloadSize));
return result;
} catch (Exception e) {
yacyCore.log.logSevere("yacyClient.transferRWI error:" + e.getMessage());
return null;

Loading…
Cancel
Save