From c3e7efa84688122c7a4d0b1da94cc05f1e57bbe5 Mon Sep 17 00:00:00 2001 From: hermens Date: Mon, 10 Oct 2011 14:35:03 +0000 Subject: [PATCH] added sender side prevention of rwi flooding as mentioned in SVN 7993 saves memory and speeds up enqueueContainers by limiting the size of transfer.Chunk saves network bandwidth by not transmitting RWIs that would get discarded at the target anyway git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@7995 6c8d7289-2bf4-0310-a012-ef5d649a1542 --- source/net/yacy/peers/dht/Transmission.java | 56 +++++++++++++++++++-- 1 file changed, 53 insertions(+), 3 deletions(-) diff --git a/source/net/yacy/peers/dht/Transmission.java b/source/net/yacy/peers/dht/Transmission.java index 57c62e95d..aebf79e0d 100644 --- a/source/net/yacy/peers/dht/Transmission.java +++ b/source/net/yacy/peers/dht/Transmission.java @@ -26,6 +26,7 @@ package net.yacy.peers.dht; import java.util.ArrayList; import java.util.Iterator; +import java.util.Random; import java.util.TreeMap; import net.yacy.cora.document.ASCII; @@ -50,6 +51,10 @@ import java.util.SortedMap; public class Transmission { + // The number of RWIs we can be sure a remote peer will accept + // anything beyond that might get discarded without notice + public static final int maxRWIsCount = 1000; // since SVN 7993 hardcoded in htroot/yacy/transferRWI.java:161 + protected Log log; protected Segment segment; protected SeedDB seeds; @@ -109,6 +114,39 @@ public class Transmission { this.miss = 0; } + /* + * Returns a new container with at most max randomly selected entries; those entries are removed from the given container and the rest is put back to the index. + * A random selection needs to be taken because this chunk might be transferred back to this peer itself. + * @param container the source container; it is modified and then re-added to segment.termIndex() + * @param max the maximum number of entries in the returned container; must be greater than 0 because it is used as a divisor + * @throws RowSpaceExceededException + * @return the new container holding the selected entries, possibly fewer than max + */ + private ReferenceContainer trimContainer(ReferenceContainer container, final int max) throws RowSpaceExceededException { + final ReferenceContainer c = new ReferenceContainer(Segment.wordReferenceFactory, container.getTermHash(), 
max); + final int part = container.size() / max + 1; + final Random r = new Random(); + WordReference w; + List selected = new ArrayList(); + final Iterator i = container.entries(); + while ((i.hasNext()) && (c.size() < max)) { + w = i.next(); + if (r.nextInt(part) == 0) { + c.add(w); + selected.add(w.urlhash()); + } + } + // remove the selected entries from container + for (final byte[] b : selected) container.removeReference(b); + // put container back + try { + segment.termIndex().add(container); + } catch (Exception e) { + Log.logException(e); + } + return c; + } + /** * add a container to the Entry cache. * all entries in the container are checked and only such are stored which have a reference entry @@ -116,8 +154,20 @@ public class Transmission { * @throws RowSpaceExceededException */ public void add(ReferenceContainer container) throws RowSpaceExceededException { + int remaining = maxRWIsCount; + for (ReferenceContainer ic : this) remaining -= ic.size(); + if (remaining <= 0) { + // No space left in this chunk + try { + segment.termIndex().add(container); + } catch (Exception e) { + Log.logException(e); + } + return; + } + final ReferenceContainer c = (remaining >= container.size()) ? container : trimContainer(container, remaining); // iterate through the entries in the container and check if the reference is in the repository - Iterator i = container.entries(); + Iterator i = c.entries(); List notFoundx = new ArrayList(); while (i.hasNext()) { WordReference e = i.next(); @@ -135,9 +185,9 @@ public class Transmission { } } // now delete all references that were not found - for (final byte[] b : notFoundx) container.removeReference(b); + for (final byte[] b : notFoundx) c.removeReference(b); // finally add the remaining container to the cache - containers.add(container); + containers.add(c); } /**