added sender side prevention of rwi flooding as mentioned in SVN 7993

saves memory and speeds up enqueueContainers by limiting the size of transfer.Chunk
saves network bandwidth by not transmitting RWIs that would get discarded at the target anyway


git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@7995 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
hermens 13 years ago
parent 5af9598bd1
commit c3e7efa846

@ -26,6 +26,7 @@ package net.yacy.peers.dht;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Random;
import java.util.TreeMap;
import net.yacy.cora.document.ASCII;
@ -50,6 +51,10 @@ import java.util.SortedMap;
public class Transmission {
// The number of RWIs we can be sure a remote peer will accept
// anything beyond that might get discarded without notice
public static final int maxRWIsCount = 1000; // since SVN 7993 hardcoded in htroot/yacy/transferRWI.java:161
protected Log log;
protected Segment segment;
protected SeedDB seeds;
@ -109,6 +114,39 @@ public class Transmission {
this.miss = 0;
}
/*
* return a new container with at most max elements and put the rest back to the index
* as this chunk might be transferred back to myself a random selection needs to be taken
* @param container
* @param max
* @throws RowSpaceExceededException
* @return
*/
private ReferenceContainer<WordReference> trimContainer(ReferenceContainer<WordReference> container, final int max) throws RowSpaceExceededException {
final ReferenceContainer<WordReference> c = new ReferenceContainer<WordReference>(Segment.wordReferenceFactory, container.getTermHash(), max);
final int part = container.size() / max + 1;
final Random r = new Random();
WordReference w;
List<byte[]> selected = new ArrayList<byte[]>();
final Iterator<WordReference> i = container.entries();
while ((i.hasNext()) && (c.size() < max)) {
w = i.next();
if (r.nextInt(part) == 0) {
c.add(w);
selected.add(w.urlhash());
}
}
// remove the selected entries from container
for (final byte[] b : selected) container.removeReference(b);
// put container back
try {
segment.termIndex().add(container);
} catch (Exception e) {
Log.logException(e);
}
return c;
}
/**
* add a container to the Entry cache.
* all entries in the container are checked and only such are stored which have a reference entry
@ -116,8 +154,20 @@ public class Transmission {
* @throws RowSpaceExceededException
*/
public void add(ReferenceContainer<WordReference> container) throws RowSpaceExceededException {
int remaining = maxRWIsCount;
for (ReferenceContainer ic : this) remaining -= ic.size();
if (remaining <= 0) {
// No space left in this chunk
try {
segment.termIndex().add(container);
} catch (Exception e) {
Log.logException(e);
}
return;
}
final ReferenceContainer<WordReference> c = (remaining >= container.size()) ? container : trimContainer(container, remaining);
// iterate through the entries in the container and check if the reference is in the repository
Iterator<WordReference> i = container.entries();
Iterator<WordReference> i = c.entries();
List<byte[]> notFoundx = new ArrayList<byte[]>();
while (i.hasNext()) {
WordReference e = i.next();
@ -135,9 +185,9 @@ public class Transmission {
}
}
// now delete all references that were not found
for (final byte[] b : notFoundx) container.removeReference(b);
for (final byte[] b : notFoundx) c.removeReference(b);
// finally add the remaining container to the cache
containers.add(container);
containers.add(c);
}
/**

Loading…
Cancel
Save