- reject less transmissions as transmission receiver

- do not flag too much receiver when something goes wrong during transmission as sender

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@5616 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 16 years ago
parent f887fc159f
commit 94c42691d8

@ -724,17 +724,8 @@ indexTransfer.maxOpenFiles = 800
indexDistribution.minChunkSize = 10
indexDistribution.maxChunkSize = 1000
indexDistribution.startChunkSize = 200
indexDistribution.maxChunkFails = 1
# defines if the peer should reject incoming index transfer
# request if a given limit is reached
indexDistribution.transferRWIReceiptLimitEnabled = true
# defines the word cache for DHT reaches this level
# the peer reports itself as busy
indexDistribution.dhtReceiptLimit = 10000
# Distribution of Citation-Reference (CR-) files
# The distribution is done in two steps:
# first step to anonymize the records

@ -199,7 +199,6 @@ public class IndexControlRWIs_p {
// prepare index
indexContainer index;
String result;
final long starttime = System.currentTimeMillis();
index = sb.webIndex.getContainer(keyhash, null);
// built urlCache
@ -226,14 +225,13 @@ public class IndexControlRWIs_p {
// transport to other peer
final String gzipBody = sb.getConfig("indexControl.gzipBody","false");
final int timeout = (int) sb.getConfigLong("indexControl.timeout",60000);
final HashMap<String, Object> resultObj = yacyClient.transferIndex(
final String error = yacyClient.transferIndex(
seed,
icc,
knownURLs,
"true".equalsIgnoreCase(gzipBody),
timeout);
result = (String) resultObj.get("result");
prop.put("result", (result == null) ? ("Successfully transferred " + knownURLs.size() + " words in " + ((System.currentTimeMillis() - starttime) / 1000) + " seconds, " + unknownURLEntries + " URL not found") : result);
prop.put("result", (error == null) ? ("Successfully transferred " + knownURLs.size() + " words in " + ((System.currentTimeMillis() - starttime) / 1000) + " seconds, " + unknownURLEntries + " URL not found") : "error: " + error);
index = null;
}

@ -230,7 +230,6 @@ public class PerformanceQueues_p {
final int wordCacheMaxCount = post.getInt("wordCacheMaxCount", 20000);
switchboard.setConfig(plasmaSwitchboardConstants.WORDCACHE_MAX_COUNT, Integer.toString(wordCacheMaxCount));
switchboard.webIndex.setMaxWordCount(wordCacheMaxCount);
switchboard.setConfig(plasmaSwitchboardConstants.INDEX_DIST_DHT_RECEIPT_LIMIT, wordCacheMaxCount);
final int wordCacheInitCount = post.getInt(plasmaSwitchboardConstants.WORDCACHE_INIT_COUNT, 30000);
switchboard.setConfig(plasmaSwitchboardConstants.WORDCACHE_INIT_COUNT, Integer.toString(wordCacheInitCount));

@ -82,15 +82,14 @@ public final class transferRWI {
byte[] indexes = post.get("indexes", "").getBytes(); // the indexes, as list of word entries
boolean granted = sb.getConfig("allowReceiveIndex", "false").equals("true");
final boolean blockBlacklist = sb.getConfig("indexReceiveBlockBlacklist", "false").equals("true");
final boolean checkLimit = sb.getConfigBool("indexDistribution.transferRWIReceiptLimitEnabled", true);
final long cachelimit = sb.getConfigLong(plasmaSwitchboardConstants.INDEX_DIST_DHT_RECEIPT_LIMIT, 10000);
final long cachelimit = sb.getConfigLong(plasmaSwitchboardConstants.WORDCACHE_MAX_COUNT, 100000);
final yacySeed otherPeer = sb.webIndex.seedDB.get(iam);
final String otherPeerName = iam + ":" + ((otherPeer == null) ? "NULL" : (otherPeer.getName() + "/" + otherPeer.getVersion()));
// response values
int pause = 0;
String result = "ok";
final StringBuilder unknownURLs = new StringBuilder();
int pause = 10000;
if ((youare == null) || (!youare.equals(sb.webIndex.seedDB.mySeed().hash))) {
sb.getLog().logInfo("Rejecting RWIs from peer " + otherPeerName + ". Wrong target. Wanted peer=" + youare + ", iam=" + sb.webIndex.seedDB.mySeed().hash);
@ -101,19 +100,13 @@ public final class transferRWI {
sb.getLog().logInfo("Rejecting RWIs from peer " + otherPeerName + ". Not granted.");
result = "not_granted";
pause = 0;
} else if (checkLimit && sb.webIndex.indexCacheSize() > cachelimit) {
} else if (sb.webIndex.indexCacheSize() > cachelimit) {
// we are too busy to receive indexes
sb.getLog().logInfo("Rejecting RWIs from peer " + otherPeerName + ". We are too busy (buffersize=" + sb.webIndex.indexCacheSize() + ").");
granted = false; // don't accept more words if there are too many words to flush
result = "busy";
pause = 60000;
} /* else if ((checkLimit && sb.wordIndex.dhtOutCacheSize() > sb.getConfigLong(plasmaSwitchboard.WORDCACHE_MAX_COUNT, 20000)) || ((sb.wordIndex.busyCacheFlush) && (!shortCacheFlush))) {
// we are too busy flushing the ramCache to receive indexes
sb.getLog().logInfo("Rejecting RWIs from peer " + otherPeerName + ". We are too busy (wordcachesize=" + sb.wordIndex.dhtOutCacheSize() + ").");
granted = false; // don't accept more words if there are too many words to flush
result = "busy";
pause = 300000;
} */ else {
} else {
// we want and can receive indexes
// log value status (currently added to find outOfMemory error
if (sb.getLog().isFine()) sb.getLog().logFine("Processing " + indexes.length + " bytes / " + wordc + " words / " + entryc + " entries from " + otherPeerName);
@ -186,7 +179,7 @@ public final class transferRWI {
// finally compose the unknownURL hash list
final Iterator<String> it = unknownURL.iterator();
unknownURLs.ensureCapacity(unknownURL.size()*13);
unknownURLs.ensureCapacity(unknownURL.size() * 25);
while (it.hasNext()) {
unknownURLs.append(",").append(it.next());
}
@ -200,9 +193,7 @@ public final class transferRWI {
}
result = "ok";
if (checkLimit) {
pause = (sb.webIndex.indexCacheSize() < 500) ? 0 : sb.webIndex.indexCacheSize(); // estimation of necessary pause time
}
pause = (int) (sb.webIndex.indexCacheSize() * 20000 / sb.getConfigLong(plasmaSwitchboardConstants.WORDCACHE_MAX_COUNT, 100000)); // estimation of necessary pause time
}
prop.put("unknownURL", unknownURLs.toString());

@ -213,12 +213,6 @@ public final class plasmaSwitchboardConstants {
* <p>Name of the setting how much memory in bytes should be assigned to the Blog DB for caching purposes</p>
*/
public static final String RAM_CACHE_BLOG_TIME = "ramCacheBlog_time";
/**
* <p><code>public static final String <strong>INDEX_DIST_DHT_RECEIPT_LIMIT</strong> = "indexDistribution.dhtReceiptLimit"</code></p>
* <p>Name of the setting how many words the DHT-In cache may contain maximal before new DHT receipts
* will be rejected</p>
*/
public static final String INDEX_DIST_DHT_RECEIPT_LIMIT = "indexDistribution.dhtReceiptLimit";
/**
* <p><code>public static final String <strong>INDEX_DIST_CHUNK_SIZE_START</strong> = "indexDistribution.startChunkSize"</code></p>
* <p>Name of the setting specifying how many words the very first chunk will contain when the DHT-thread starts</p>

@ -196,12 +196,10 @@ public class Transmission {
}
log.logInfo("starting new index transmission request to " + this.primaryTarget);
long start = System.currentTimeMillis();
final HashMap<String, Object> ohm = yacyClient.transferIndex(target, this.containers, this.references, gzipBody4Transfer, timeout4Transfer);
final String result = (String) ohm.get("result");
if (result == null) {
final String error = yacyClient.transferIndex(target, this.containers, this.references, gzipBody4Transfer, timeout4Transfer);
if (error == null) {
// words successfully transfered
long transferTime = System.currentTimeMillis() - start;
int payloadSize = ((Integer) ohm.get("payloadSize")).intValue();
Iterator<indexContainer> i = this.containers.iterator();
indexContainer firstContainer = (i == null) ? null : i.next();
log.logInfo("Index transfer of " + this.containers.size() +
@ -210,7 +208,7 @@ public class Transmission {
" to peer " + target.getName() + ":" + target.hash +
" in " + (transferTime / 1000) +
" seconds successful (" + (1000 * this.containers.size() / (transferTime + 1)) +
" words/s, " + payloadSize + " Bytes)");
" words/s)");
// if the peer has set a pause time and we are in flush mode (index transfer)
// then we pause for a while now
@ -222,7 +220,7 @@ public class Transmission {
// write information that peer does not receive index transmissions
target.setFlagAcceptRemoteIndex(false);
seeds.update(target.hash, target);
log.logInfo("Transfer failed of chunk to target " + target.hash + "/" + target.getName());
log.logInfo("Transfer failed of chunk to target " + target.hash + "/" + target.getName() + ": " + error);
return false;
}

@ -844,7 +844,17 @@ public final class yacyClient {
}
}
public static HashMap<String, Object> transferIndex(
/**
* transfer the index. If the transmission fails, return a string describing the cause.
* If everything is ok, return null.
* @param targetSeed
* @param indexes
* @param urlCache
* @param gzipBody
* @param timeout
* @return
*/
public static String transferIndex(
final yacySeed targetSeed,
final indexContainerCache indexes,
final HashMap<String, indexURLReference> urlCache,
@ -873,32 +883,30 @@ public final class yacyClient {
resultObj.put("resultTransferRWI", in);
if (in == null) {
resultObj.put("result", "no_connection_1");
return resultObj;
return "no connection from transferRWI";
}
if (in.containsKey("indexPayloadSize")) payloadSize += Integer.parseInt(in.get("indexPayloadSize"));
String result = in.get("result");
if (result == null) {
resultObj.put("result", "no_result_1");
return resultObj;
return "no result from transferRWI";
}
if (!(result.equals("ok"))) {
targetSeed.setFlagAcceptRemoteIndex(false);
resultObj.put("result", result);
return resultObj;
return result;
}
// in now contains a list of unknown hashes
final String uhss = in.get("unknownURL");
String uhss = in.get("unknownURL");
if (uhss == null) {
resultObj.put("result","no_unknownURL_tag_in_response");
return resultObj;
return "no unknownURL tag in response";
}
if (uhss.length() == 0) { return resultObj; } // all url's known, we are ready here
uhss = uhss.trim();
if (uhss.length() == 0 || uhss.equals(",")) { return null; } // all url's known, we are ready here
final String[] uhs = uhss.split(",");
if (uhs.length == 0) { return resultObj; } // all url's known
if (uhs.length == 0) { return null; } // all url's known
// extract the urlCache from the result
final indexURLReference[] urls = new indexURLReference[uhs.length];
@ -913,25 +921,21 @@ public final class yacyClient {
resultObj.put("resultTransferURL", in);
if (in == null) {
resultObj.put("result","no_connection_2");
return resultObj;
return "no connection from transferURL";
}
if (in.containsKey("urlPayloadSize")) payloadSize += Integer.parseInt(in.get("urlPayloadSize"));
result = in.get("result");
if (result == null) {
resultObj.put("result","no_result_2");
return resultObj;
return "no result from transferURL";
}
if (!(result.equals("ok"))) {
targetSeed.setFlagAcceptRemoteIndex(false);
resultObj.put("result",result);
return resultObj;
return result;
}
// int doubleentries = Integer.parseInt((String) in.get("double"));
// System.out.println("DEBUG tansferIndex: transferred " + uhs.length + " URL's, double=" + doubleentries);
return resultObj;
return null;
} finally {
resultObj.put("payloadSize", Integer.valueOf(payloadSize));
}

Loading…
Cancel
Save