From 94c42691d896705b37d82241662f4f9935c162ff Mon Sep 17 00:00:00 2001 From: orbiter Date: Mon, 16 Feb 2009 21:28:48 +0000 Subject: [PATCH] - reject less transmissions as transmission receiver - do not flag too much receiver when something goes wrong during transmission as sender git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@5616 6c8d7289-2bf4-0310-a012-ef5d649a1542 --- defaults/yacy.init | 9 ---- htroot/IndexControlRWIs_p.java | 6 +-- htroot/PerformanceQueues_p.java | 1 - htroot/yacy/transferRWI.java | 23 +++----- .../plasma/plasmaSwitchboardConstants.java | 6 --- source/de/anomic/yacy/dht/Transmission.java | 12 ++--- source/de/anomic/yacy/yacyClient.java | 54 ++++++++++--------- 7 files changed, 43 insertions(+), 68 deletions(-) diff --git a/defaults/yacy.init b/defaults/yacy.init index b6c202355..c29827ef1 100644 --- a/defaults/yacy.init +++ b/defaults/yacy.init @@ -724,17 +724,8 @@ indexTransfer.maxOpenFiles = 800 indexDistribution.minChunkSize = 10 indexDistribution.maxChunkSize = 1000 indexDistribution.startChunkSize = 200 - indexDistribution.maxChunkFails = 1 -# defines if the peer should reject incoming index transfer -# request if a given limit is reached -indexDistribution.transferRWIReceiptLimitEnabled = true - -# defines the word cache for DHT reaches this level -# the peer reports itself as busy -indexDistribution.dhtReceiptLimit = 10000 - # Distribution of Citation-Reference (CR-) files # The distribution is done in two steps: # first step to anonymize the records diff --git a/htroot/IndexControlRWIs_p.java b/htroot/IndexControlRWIs_p.java index 280070b65..b0039fb3a 100644 --- a/htroot/IndexControlRWIs_p.java +++ b/htroot/IndexControlRWIs_p.java @@ -199,7 +199,6 @@ public class IndexControlRWIs_p { // prepare index indexContainer index; - String result; final long starttime = System.currentTimeMillis(); index = sb.webIndex.getContainer(keyhash, null); // built urlCache @@ -226,14 +225,13 @@ public class IndexControlRWIs_p { // transport to other peer final String gzipBody = sb.getConfig("indexControl.gzipBody","false"); final int timeout = (int) sb.getConfigLong("indexControl.timeout",60000); - final HashMap resultObj = yacyClient.transferIndex( + final String error = yacyClient.transferIndex( seed, icc, knownURLs, "true".equalsIgnoreCase(gzipBody), timeout); - result = (String) resultObj.get("result"); - prop.put("result", (result == null) ? ("Successfully transferred " + knownURLs.size() + " words in " + ((System.currentTimeMillis() - starttime) / 1000) + " seconds, " + unknownURLEntries + " URL not found") : result); + prop.put("result", (error == null) ? ("Successfully transferred " + knownURLs.size() + " words in " + ((System.currentTimeMillis() - starttime) / 1000) + " seconds, " + unknownURLEntries + " URL not found") : "error: " + error); index = null; } diff --git a/htroot/PerformanceQueues_p.java b/htroot/PerformanceQueues_p.java index 971ab4218..c84e40ef5 100644 --- a/htroot/PerformanceQueues_p.java +++ b/htroot/PerformanceQueues_p.java @@ -230,7 +230,6 @@ public class PerformanceQueues_p { final int wordCacheMaxCount = post.getInt("wordCacheMaxCount", 20000); switchboard.setConfig(plasmaSwitchboardConstants.WORDCACHE_MAX_COUNT, Integer.toString(wordCacheMaxCount)); switchboard.webIndex.setMaxWordCount(wordCacheMaxCount); - switchboard.setConfig(plasmaSwitchboardConstants.INDEX_DIST_DHT_RECEIPT_LIMIT, wordCacheMaxCount); final int wordCacheInitCount = post.getInt(plasmaSwitchboardConstants.WORDCACHE_INIT_COUNT, 30000); switchboard.setConfig(plasmaSwitchboardConstants.WORDCACHE_INIT_COUNT, Integer.toString(wordCacheInitCount)); diff --git a/htroot/yacy/transferRWI.java b/htroot/yacy/transferRWI.java index 25b377e63..f3106da6e 100644 --- a/htroot/yacy/transferRWI.java +++ b/htroot/yacy/transferRWI.java @@ -82,15 +82,14 @@ public final class transferRWI { byte[] indexes = post.get("indexes", "").getBytes(); // the indexes, as list of word entries boolean granted = sb.getConfig("allowReceiveIndex", "false").equals("true"); final boolean blockBlacklist = sb.getConfig("indexReceiveBlockBlacklist", "false").equals("true"); - final boolean checkLimit = sb.getConfigBool("indexDistribution.transferRWIReceiptLimitEnabled", true); - final long cachelimit = sb.getConfigLong(plasmaSwitchboardConstants.INDEX_DIST_DHT_RECEIPT_LIMIT, 10000); + final long cachelimit = sb.getConfigLong(plasmaSwitchboardConstants.WORDCACHE_MAX_COUNT, 100000); final yacySeed otherPeer = sb.webIndex.seedDB.get(iam); final String otherPeerName = iam + ":" + ((otherPeer == null) ? "NULL" : (otherPeer.getName() + "/" + otherPeer.getVersion())); // response values - String result = "ok"; + int pause = 0; + String result = "ok"; final StringBuilder unknownURLs = new StringBuilder(); - int pause = 10000; if ((youare == null) || (!youare.equals(sb.webIndex.seedDB.mySeed().hash))) { sb.getLog().logInfo("Rejecting RWIs from peer " + otherPeerName + ". Wrong target. Wanted peer=" + youare + ", iam=" + sb.webIndex.seedDB.mySeed().hash); @@ -101,19 +100,13 @@ public final class transferRWI { sb.getLog().logInfo("Rejecting RWIs from peer " + otherPeerName + ". Not granted."); result = "not_granted"; pause = 0; - } else if (checkLimit && sb.webIndex.indexCacheSize() > cachelimit) { + } else if (sb.webIndex.indexCacheSize() > cachelimit) { // we are too busy to receive indexes sb.getLog().logInfo("Rejecting RWIs from peer " + otherPeerName + ". We are too busy (buffersize=" + sb.webIndex.indexCacheSize() + ")."); granted = false; // don't accept more words if there are too many words to flush result = "busy"; pause = 60000; - } /* else if ((checkLimit && sb.wordIndex.dhtOutCacheSize() > sb.getConfigLong(plasmaSwitchboard.WORDCACHE_MAX_COUNT, 20000)) || ((sb.wordIndex.busyCacheFlush) && (!shortCacheFlush))) { - // we are too busy flushing the ramCache to receive indexes - sb.getLog().logInfo("Rejecting RWIs from peer " + otherPeerName + ". We are too busy (wordcachesize=" + sb.wordIndex.dhtOutCacheSize() + ")."); - granted = false; // don't accept more words if there are too many words to flush - result = "busy"; - pause = 300000; - } */ else { + } else { // we want and can receive indexes // log value status (currently added to find outOfMemory error if (sb.getLog().isFine()) sb.getLog().logFine("Processing " + indexes.length + " bytes / " + wordc + " words / " + entryc + " entries from " + otherPeerName); @@ -186,7 +179,7 @@ public final class transferRWI { // finally compose the unknownURL hash list final Iterator it = unknownURL.iterator(); - unknownURLs.ensureCapacity(unknownURL.size()*13); + unknownURLs.ensureCapacity(unknownURL.size() * 25); while (it.hasNext()) { unknownURLs.append(",").append(it.next()); } @@ -200,9 +193,7 @@ public final class transferRWI { } result = "ok"; - if (checkLimit) { - pause = (sb.webIndex.indexCacheSize() < 500) ? 0 : sb.webIndex.indexCacheSize(); // estimation of necessary pause time - } + pause = (int) (sb.webIndex.indexCacheSize() * 20000 / sb.getConfigLong(plasmaSwitchboardConstants.WORDCACHE_MAX_COUNT, 100000)); // estimation of necessary pause time } prop.put("unknownURL", unknownURLs.toString()); diff --git a/source/de/anomic/plasma/plasmaSwitchboardConstants.java b/source/de/anomic/plasma/plasmaSwitchboardConstants.java index a804e8962..6de249537 100644 --- a/source/de/anomic/plasma/plasmaSwitchboardConstants.java +++ b/source/de/anomic/plasma/plasmaSwitchboardConstants.java @@ -213,12 +213,6 @@ public final class plasmaSwitchboardConstants { *

Name of the setting how much memory in bytes should be assigned to the Blog DB for caching purposes

*/ public static final String RAM_CACHE_BLOG_TIME = "ramCacheBlog_time"; - /** - *

public static final String INDEX_DIST_DHT_RECEIPT_LIMIT = "indexDistribution.dhtReceiptLimit"

- *

Name of the setting how many words the DHT-In cache may contain maximal before new DHT receipts - * will be rejected

- */ - public static final String INDEX_DIST_DHT_RECEIPT_LIMIT = "indexDistribution.dhtReceiptLimit"; /** *

public static final String INDEX_DIST_CHUNK_SIZE_START = "indexDistribution.startChunkSize"

*

Name of the setting specifying how many words the very first chunk will contain when the DHT-thread starts

diff --git a/source/de/anomic/yacy/dht/Transmission.java b/source/de/anomic/yacy/dht/Transmission.java index 359e3f959..7817057a7 100644 --- a/source/de/anomic/yacy/dht/Transmission.java +++ b/source/de/anomic/yacy/dht/Transmission.java @@ -196,12 +196,10 @@ public class Transmission { } log.logInfo("starting new index transmission request to " + this.primaryTarget); long start = System.currentTimeMillis(); - final HashMap ohm = yacyClient.transferIndex(target, this.containers, this.references, gzipBody4Transfer, timeout4Transfer); - final String result = (String) ohm.get("result"); - if (result == null) { + final String error = yacyClient.transferIndex(target, this.containers, this.references, gzipBody4Transfer, timeout4Transfer); + if (error == null) { // words successfully transfered - long transferTime = System.currentTimeMillis() - start; - int payloadSize = ((Integer) ohm.get("payloadSize")).intValue(); + long transferTime = System.currentTimeMillis() - start; Iterator i = this.containers.iterator(); indexContainer firstContainer = (i == null) ? null : i.next(); log.logInfo("Index transfer of " + this.containers.size() + @@ -210,7 +208,7 @@ public class Transmission { " to peer " + target.getName() + ":" + target.hash + " in " + (transferTime / 1000) + " seconds successful (" + (1000 * this.containers.size() / (transferTime + 1)) + - " words/s, " + payloadSize + " Bytes)"); + " words/s)"); // if the peer has set a pause time and we are in flush mode (index transfer) // then we pause for a while now @@ -222,7 +220,7 @@ public class Transmission { // write information that peer does not receive index transmissions target.setFlagAcceptRemoteIndex(false); seeds.update(target.hash, target); - log.logInfo("Transfer failed of chunk to target " + target.hash + "/" + target.getName()); + log.logInfo("Transfer failed of chunk to target " + target.hash + "/" + target.getName() + ": " + error); return false; } diff --git a/source/de/anomic/yacy/yacyClient.java b/source/de/anomic/yacy/yacyClient.java index 300c0f0e2..e35a06e84 100644 --- a/source/de/anomic/yacy/yacyClient.java +++ b/source/de/anomic/yacy/yacyClient.java @@ -844,7 +844,17 @@ public final class yacyClient { } } - public static HashMap transferIndex( + /** + * transfer the index. If the transmission fails, return a string describing the cause. + * If everything is ok, return null. + * @param targetSeed + * @param indexes + * @param urlCache + * @param gzipBody + * @param timeout + * @return + */ + public static String transferIndex( final yacySeed targetSeed, final indexContainerCache indexes, final HashMap urlCache, @@ -873,32 +883,30 @@ public final class yacyClient { resultObj.put("resultTransferRWI", in); if (in == null) { - resultObj.put("result", "no_connection_1"); - return resultObj; - } + return "no connection from transferRWI"; + } + if (in.containsKey("indexPayloadSize")) payloadSize += Integer.parseInt(in.get("indexPayloadSize")); String result = in.get("result"); - if (result == null) { - resultObj.put("result", "no_result_1"); - return resultObj; + if (result == null) { + return "no result from transferRWI"; } + if (!(result.equals("ok"))) { - targetSeed.setFlagAcceptRemoteIndex(false); - resultObj.put("result", result); - return resultObj; + return result; } // in now contains a list of unknown hashes - final String uhss = in.get("unknownURL"); + String uhss = in.get("unknownURL"); if (uhss == null) { - resultObj.put("result","no_unknownURL_tag_in_response"); - return resultObj; + return "no unknownURL tag in response"; } - if (uhss.length() == 0) { return resultObj; } // all url's known, we are ready here + uhss = uhss.trim(); + if (uhss.length() == 0 || uhss.equals(",")) { return null; } // all url's known, we are ready here final String[] uhs = uhss.split(","); - if (uhs.length == 0) { return resultObj; } // all url's known + if (uhs.length == 0) { return null; } // all url's known // extract the urlCache from the result final indexURLReference[] urls = new indexURLReference[uhs.length]; @@ -913,25 +921,21 @@ public final class yacyClient { resultObj.put("resultTransferURL", in); if (in == null) { - resultObj.put("result","no_connection_2"); - return resultObj; + return "no connection from transferURL"; } + if (in.containsKey("urlPayloadSize")) payloadSize += Integer.parseInt(in.get("urlPayloadSize")); result = in.get("result"); if (result == null) { - resultObj.put("result","no_result_2"); - return resultObj; + return "no result from transferURL"; } + if (!(result.equals("ok"))) { - targetSeed.setFlagAcceptRemoteIndex(false); - resultObj.put("result",result); - return resultObj; + return result; } - // int doubleentries = Integer.parseInt((String) in.get("double")); - // System.out.println("DEBUG tansferIndex: transferred " + uhs.length + " URL's, double=" + doubleentries); - return resultObj; + return null; } finally { resultObj.put("payloadSize", Integer.valueOf(payloadSize)); }