From 51bf5c85b06142df765f2ba2043a21f9e46f60b0 Mon Sep 17 00:00:00 2001 From: orbiter Date: Fri, 11 Jul 2014 04:04:09 +0200 Subject: [PATCH] Renamed the transmission cloud to buffer in dispatcher since the name 'cloud' was a bad idea. Changed also the accumulation process for peer targets so that every dht chunk is not assigned the set of redundant targets but they are assigned to redundant targets individually. This enhances the granularity of the target accumulation and should enhance the efficiency of the process. Finally the dht protocol client was enriched with the ability to remove the 'accept remote index' flag from peers or remove peers completely if they do not answer at all. --- htroot/IndexControlRWIs_p.java | 2 +- source/net/yacy/peers/DHTSelection.java | 22 +++- source/net/yacy/peers/Dispatcher.java | 167 ++++++++---------------- source/net/yacy/peers/Protocol.java | 17 ++- source/net/yacy/peers/Transmission.java | 61 +++------ source/net/yacy/search/Switchboard.java | 12 +- 6 files changed, 112 insertions(+), 169 deletions(-) diff --git a/htroot/IndexControlRWIs_p.java b/htroot/IndexControlRWIs_p.java index 6ceaa4de1..2086dc937 100644 --- a/htroot/IndexControlRWIs_p.java +++ b/htroot/IndexControlRWIs_p.java @@ -315,7 +315,7 @@ public class IndexControlRWIs_p { // transport to other peer final boolean gzipBody = sb.getConfigBool("indexControl.gzipBody", false); final int timeout = (int) sb.getConfigLong("indexControl.timeout", 60000); - final String error = Protocol.transferIndex(seed, icc, knownURLs, segment, gzipBody, timeout); + final String error = Protocol.transferIndex(sb.peers, seed, icc, knownURLs, segment, gzipBody, timeout); prop.put("result", (error == null) ? ("Successfully transferred " + knownURLs.size() + " words in " diff --git a/source/net/yacy/peers/DHTSelection.java b/source/net/yacy/peers/DHTSelection.java index 92795f4d1..c902d2ad4 100644 --- a/source/net/yacy/peers/DHTSelection.java +++ b/source/net/yacy/peers/DHTSelection.java @@ -24,6 +24,7 @@ package net.yacy.peers; +import java.lang.reflect.Array; import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; @@ -48,13 +49,10 @@ import net.yacy.kelondro.data.word.Word; import net.yacy.kelondro.util.kelondroException; import net.yacy.peers.operation.yacyVersion; - - -/* - * this package is a collection of peer selection iterations that had been +/** + * This package is a collection of peer selection iterations that had been * part of yacyPeerActions, yacyDHTActions and yacySeedDB */ - public class DHTSelection { public static Set selectClusterPeers(final SeedDB seedDB, final SortedMap peerhashes) { @@ -138,7 +136,7 @@ public class DHTSelection { return extraSeeds; } - + public static Set selectDHTSearchTargets(final SeedDB seedDB, final HandleSet wordhashes, final int minage, final int redundancy, final int maxredundancy, final Random random) { // put in seeds according to dht @@ -172,11 +170,21 @@ public class DHTSelection { return collectedSeeds; } + @SuppressWarnings("unchecked") + public static List[] selectDHTDistributionTargets(final SeedDB seedDB, final byte[] wordhash, final int minage, final int redundancy) { + // this method is called from the distribution target computation + List[] seedlists = (List[]) Array.newInstance(ArrayList.class, seedDB.scheme.verticalPartitions()); + for (int verticalPosition = 0; verticalPosition < seedDB.scheme.verticalPartitions(); verticalPosition++) { + seedlists[verticalPosition] = selectVerticalDHTPositions(seedDB, wordhash, minage, redundancy, verticalPosition); + } + return seedlists; + } + /** * collecting vertical positions: that chooses for each of the DHT partition a collection of redundant storage positions * @param seedDB the database of seeds * @param wordhash the word we are searching for - * @param minage the minimum age of a seed (to prevent that too young seeds which cannot have results yet are asked) + * @param minage the minimum age of a seed in days (to prevent that too young seeds which cannot have results yet are asked) * @param redundancy the number of redundant peer position for this parition, minimum is 1 * @param verticalPosition the verical position, thats the number of the partition 0 <= verticalPosition < seedDB.scheme.verticalPartitions() * @return a list of seeds for the redundant positions diff --git a/source/net/yacy/peers/Dispatcher.java b/source/net/yacy/peers/Dispatcher.java index 9b4aebd3d..c76e97afd 100644 --- a/source/net/yacy/peers/Dispatcher.java +++ b/source/net/yacy/peers/Dispatcher.java @@ -34,7 +34,6 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import net.yacy.cora.document.encoding.ASCII; -import net.yacy.cora.federate.yacy.Distribution; import net.yacy.cora.order.Base64Order; import net.yacy.cora.storage.HandleSet; import net.yacy.cora.util.ConcurrentLog; @@ -58,10 +57,10 @@ public class Dispatcher { * - (1) a number of RWIs are selected and accumulated. * When they are selected, they are removed from the index * - (2) the RWI collection is split into a number of partitions according to the vertical DHT. - * - (3) the split RWIs are enqueued as Entry object in the entry 'cloud' of the dispatcher - * - (4) more entries may be enqueued to the dispatcher and entries with the same primary target - * are accumulated. - * - (5) the largest entries are selected from the dispatcher cloud and enqueued to the 'next' array + * - (3) the split RWIs are enqueued as Entry object in the write buffer of the dispatcher + * - (4) more entries may be enqueued to the dispatcher and + * entries with the same primary target are accumulated to the same buffer entry. + * - (5) the largest entries are selected from the dispatcher write buffer and enqueued to the 'next' array * which means that they are ready for transmission * - (6) the dispatcher takes some of the entries in the next queue and initiates * transmission to other peers concurrently. As much transmissions are initiated concurrently @@ -83,9 +82,11 @@ public class Dispatcher { * called failed. In case of a fail, the RWI fragment is put back into the backend index structure */ - // a cloud is a cache for the objects that wait to be transmitted - // the String-key is the primary target as contained in the Entry - private Map transmissionCloud; + /** + * A transmission buffer is a write buffer for the rwi objects (indices) that wait to be transmitted. + * The String-key is the primary target as contained in the chunk entry. + */ + private Map transmissionBuffer; // the segment backend is used to store the remaining indexContainers in case that the object is closed private final Segment segment; @@ -108,7 +109,7 @@ public class Dispatcher { final boolean gzipBody, final int timeout ) { - this.transmissionCloud = new ConcurrentHashMap(); + this.transmissionBuffer = new ConcurrentHashMap(); this.segment = segment; this.seeds = seeds; this.log = new ConcurrentLog("INDEX-TRANSFER-DISPATCHER"); @@ -127,8 +128,8 @@ public class Dispatcher { this, "transferDocumentIndex", concurrentSender * 3, null, concurrentSender); } - public int cloudSize() { - return (this.transmissionCloud == null) ? 0 : this.transmissionCloud.size(); + public int bufferSize() { + return (this.transmissionBuffer == null) ? 0 : this.transmissionBuffer.size(); } public int transmissionSize() { @@ -137,8 +138,8 @@ public class Dispatcher { /** * PROCESS(1) - * select a number of index containers from the backend index. - * Selected containers are removed from the backend. + * Select a number of index containers from the RWI index. + * Selected containers are removed from the RWIs (not from Solr, only the DHT references). * @param hash * @param limitHash * @param maxContainerCount @@ -227,31 +228,9 @@ public class Dispatcher { * PROCESS(2) * split a list of containers into partitions according to the vertical distribution scheme * @param containers - * @param scheme - * @return + * @return a #verticalPartitions list of reference containers, one for each vertical position * @throws SpaceExceededException */ - @SuppressWarnings("unchecked") - private List>[] splitContainers(final List> containers) throws SpaceExceededException { - - // init the result vector - final int partitionCount = this.seeds.scheme.verticalPartitions(); - final List>[] partitions = (List>[]) Array.newInstance(ArrayList.class, partitionCount); - for (int i = 0; i < partitions.length; i++) partitions[i] = new ArrayList>(); - - // check all entries and split them to the partitions - for (final ReferenceContainer container: containers) { - // init the new partitions - final ReferenceContainer[] partitionBuffer = splitContainer(container); - - // add the containers to the result vector - for (int j = 0; j < partitionBuffer.length; j++) { - partitions[j].add(partitionBuffer[j]); - } - } - return partitions; - } - private ReferenceContainer[] splitContainer(final ReferenceContainer container) throws SpaceExceededException { // init the result vector @@ -279,116 +258,97 @@ public class Dispatcher { /** * PROCESS(3) and PROCESS(4) - * put containers into cloud. This needs information about the network, + * put containers into the write buffer. This needs information about the network, * because the possible targets are assigned here as well. The indexRepositoryReference * is the database of references which is needed here because this is the place where * finally is checked if the reference exists. If the entry does not exist for specific * entries in the indexContainer, then it is discarded. If it exists, then the entry is * stored in a cache of the Entry for later transmission to the targets, which means that * then no additional IO is necessary. + * @param containers a reference containers array, one container for each vertical position */ - private void enqueueContainersToCloud(final List>[] containers) { + private void enqueueContainersToBuffer(final byte[] wordhash, final ReferenceContainer[] containers) { assert (containers.length == this.seeds.scheme.verticalPartitions()); - if (this.transmissionCloud == null) return; - byte[] primaryTarget; - String primaryTargetString; - Transmission.Chunk entry; + if (this.transmissionBuffer == null) return; + List[] targets = DHTSelection.selectDHTDistributionTargets(this.seeds, wordhash, 3, this.seeds.redundancy()); + assert (targets.length == this.seeds.scheme.verticalPartitions()); + assert (targets.length == containers.length); for (int vertical = 0; vertical < containers.length; vertical++) { - List> verticalList = containers[vertical]; - ReferenceContainer lastContainer = verticalList.get(verticalList.size() - 1); - primaryTarget = Distribution.positionToHash(this.seeds.scheme.verticalDHTPosition(lastContainer.getTermHash(), vertical)); - assert primaryTarget[2] != '@'; - primaryTargetString = ASCII.String(primaryTarget); - - // get or make a entry object - entry = this.transmissionCloud.get(primaryTargetString); // if this is not null, the entry is extended here - final List targets = DHTSelection.getAcceptRemoteIndexSeedsList( - this.seeds, - primaryTarget, - this.seeds.redundancy() * 3, - true); - this.log.info("enqueueContainers: selected " + targets.size() + " targets for primary target key " + primaryTargetString + "/" + vertical + " with " + verticalList.size() + " index containers."); - if (entry == null) entry = this.transmission.newChunk(primaryTargetString, targets); - - // fill the entry with the containers - for (final ReferenceContainer c: verticalList) { + ReferenceContainer verticalContainer = containers[vertical]; + if (verticalContainer.isEmpty()) continue; + + // extend the transmissionBuffer with entries for each redundant position + for (Seed target: targets[vertical]) { + Transmission.Chunk entry = this.transmissionBuffer.get(target.hash); // if this is not null, the entry is extended here + if (entry == null) entry = transmission.newChunk(target); else { + log.info("extending chunk for peer " + entry.dhtTarget().hash + " containing " + entry.containersSize() + " references with " + verticalContainer.size() + " more entries"); + } try { - entry.add(c); - } catch (final SpaceExceededException e) { + entry.add(verticalContainer); + } catch (SpaceExceededException e) { ConcurrentLog.logException(e); - break; } + this.transmissionBuffer.put(target.hash, entry); } - - // put the entry into the cloud - if (this.transmissionCloud != null && entry.containersSize() > 0) this.transmissionCloud.put(primaryTargetString, entry); } } - - public boolean selectContainersEnqueueToCloud( + + public boolean selectContainersEnqueueToBuffer( final byte[] hash, final byte[] limitHash, final int maxContainerCount, final int maxReferenceCount, final int maxtime) { - if (this.transmissionCloud == null) return false; + if (this.transmissionBuffer == null) return false; List> selectedContainerCache; try { selectedContainerCache = selectContainers(hash, limitHash, maxContainerCount, maxReferenceCount, maxtime); } catch (final IOException e) { - this.log.severe("selectContainersEnqueueToCloud: selectedContainer failed", e); + this.log.severe("selectContainersEnqueueToBuffer: selectedContainer failed", e); return false; } - this.log.info("selectContainersEnqueueToCloud: selectedContainerCache was filled with " + selectedContainerCache.size() + " entries"); + this.log.info("selectContainersEnqueueToBuffer: selectedContainerCache was filled with " + selectedContainerCache.size() + " entries"); if (selectedContainerCache == null || selectedContainerCache.isEmpty()) { - this.log.info("selectContainersEnqueueToCloud: selectedContainerCache is empty, cannot do anything here."); + this.log.info("selectContainersEnqueueToBuffer: selectedContainerCache is empty, cannot do anything here."); return false; } - List>[] splitContainerCache; // for each vertical partition a set of word references within a reference container + // check all entries and split them to the partitions try { - splitContainerCache = splitContainers(selectedContainerCache); + for (final ReferenceContainer container: selectedContainerCache) { + // init the new partitions + final ReferenceContainer[] partitionBuffer = splitContainer(container); + enqueueContainersToBuffer(container.getTermHash(), partitionBuffer); + } } catch (final SpaceExceededException e) { - this.log.severe("selectContainersEnqueueToCloud: splitContainers failed because of too low RAM", e); + this.log.severe("splitContainer: splitContainers failed because of too low RAM", e); return false; } - selectedContainerCache = null; - if (splitContainerCache == null) { - this.log.info("selectContainersEnqueueToCloud: splitContainerCache is empty, cannot do anything here."); - return false; - } - this.log.info("splitContainersFromCache: splitContainerCache filled with " + splitContainerCache.length + " partitions, deleting selectedContainerCache"); - if (splitContainerCache.length != this.seeds.scheme.verticalPartitions()) { - this.log.warn("selectContainersEnqueueToCloud: splitContainerCache has wrong length."); - return false; - } - enqueueContainersToCloud(splitContainerCache); - splitContainerCache = null; - this.log.info("selectContainersEnqueueToCloud: splitContainerCache enqueued to cloud array which has now " + this.transmissionCloud.size() + " entries."); + this.log.info("selectContainersEnqueueToBuffer: splitContainerCache enqueued to the write buffer array which has now " + this.transmissionBuffer.size() + " entries."); return true; } /** * PROCESS(5) - * take the largest container from the cloud and put it into the 'next' array, + * take the largest container from the write buffer and put it into the 'next' array, * where it waits to be processed. * This method returns true if a container was dequeued, false if not */ public boolean dequeueContainer() { - if (this.transmissionCloud == null) return false; + if (this.transmissionBuffer == null) return false; if (this.indexingTransmissionProcessor.getQueueSize() > this.indexingTransmissionProcessor.getMaxConcurrency()) return false; String maxtarget = null; int maxsize = -1; - for (final Map.Entry chunk: this.transmissionCloud.entrySet()) { + for (final Map.Entry chunk: this.transmissionBuffer.entrySet()) { if (chunk.getValue().containersSize() > maxsize) { maxsize = chunk.getValue().containersSize(); maxtarget = chunk.getKey(); } } if (maxsize < 0) return false; - final Transmission.Chunk chunk = this.transmissionCloud.remove(maxtarget); + final Transmission.Chunk chunk = this.transmissionBuffer.remove(maxtarget); this.indexingTransmissionProcessor.enQueue(chunk); return true; } @@ -410,20 +370,9 @@ public class Dispatcher { // do the transmission final boolean success = chunk.transmit(); + if (success) return chunk; - if (success && chunk.isFinished()) { - // finished with this queue! - this.log.info("STORE: Chunk " + chunk.primaryTarget() + " has FINISHED all transmissions!"); - return chunk; - } - - if (!success) this.log.info("STORE: Chunk " + chunk.primaryTarget() + " has failed to transmit index; marked peer as busy"); - - if (chunk.canFinish()) { - if (this.indexingTransmissionProcessor != null) this.indexingTransmissionProcessor.enQueue(chunk); - return chunk; - } - this.log.info("STORE: Chunk " + chunk.primaryTarget() + " has not enough targets left. This transmission has failed, putting back index to backend"); + this.log.info("STORE: Chunk " + chunk.dhtTarget().getName() + " does not respond or accept the dht index, putting back index to backend"); chunk.restore(); return null; } @@ -431,8 +380,8 @@ public class Dispatcher { public void close() { // removes all entries from the dispatcher and puts them back to a RAMRI if (this.indexingTransmissionProcessor != null) this.indexingTransmissionProcessor.shutdown(); - if (this.transmissionCloud != null) { - outerLoop: for (final Map.Entry e : this.transmissionCloud.entrySet()) { + if (this.transmissionBuffer != null) { + outerLoop: for (final Map.Entry e : this.transmissionBuffer.entrySet()) { for (final ReferenceContainer i : e.getValue()) try { this.segment.storeRWI(i); } catch (final Exception e1) { @@ -440,9 +389,9 @@ public class Dispatcher { break outerLoop; } } - this.transmissionCloud.clear(); + this.transmissionBuffer.clear(); } - this.transmissionCloud = null; + this.transmissionBuffer = null; if (this.indexingTransmissionProcessor != null) { this.indexingTransmissionProcessor.clear(); } diff --git a/source/net/yacy/peers/Protocol.java b/source/net/yacy/peers/Protocol.java index 2c4054621..cd270f69d 100644 --- a/source/net/yacy/peers/Protocol.java +++ b/source/net/yacy/peers/Protocol.java @@ -1355,6 +1355,7 @@ public final class Protocol { * @return */ public static String transferIndex( + final SeedDB seeds, final Seed targetSeed, final ReferenceContainerCache indexes, final HandleSet urlRefs, @@ -1383,15 +1384,21 @@ public final class Protocol { Map in = transferRWI(targetSeed, indexes, gzipBody, timeout); if ( in == null ) { - return "no connection from transferRWI"; + String errorCause = "no connection from transferRWI"; + seeds.peerActions.peerDeparture(targetSeed, errorCause); // disconnect unavailable peer + return errorCause; } String result = in.get("result"); if ( result == null ) { - return "no result from transferRWI"; + String errorCause = "no result from transferRWI"; + seeds.peerActions.peerDeparture(targetSeed, errorCause); // disconnect unavailable peer + return errorCause; } if ( !(result.equals("ok")) ) { + targetSeed.setFlagAcceptRemoteIndex(false); // the peer does not want our index + seeds.addConnected(targetSeed); // update the peer return result; } @@ -1426,10 +1433,14 @@ public final class Protocol { result = in.get("result"); if ( result == null ) { - return "no result from transferURL"; + String errorCause = "no result from transferURL"; + seeds.peerActions.peerDeparture(targetSeed, errorCause); // disconnect unavailable peer + return errorCause; } if ( !result.equals("ok") ) { + targetSeed.setFlagAcceptRemoteIndex(false); // the peer does not want our index + seeds.addConnected(targetSeed); // update the peer return result; } EventChannel.channels(EventChannel.DHTSEND).addMessage( diff --git a/source/net/yacy/peers/Transmission.java b/source/net/yacy/peers/Transmission.java index 5ecc0247a..8795209c7 100644 --- a/source/net/yacy/peers/Transmission.java +++ b/source/net/yacy/peers/Transmission.java @@ -69,8 +69,8 @@ public class Transmission { this.timeout4Transfer = timeout4Transfer; } - public Chunk newChunk(final String primaryTarget, final List targets) { - return new Chunk(primaryTarget, targets); + public Chunk newChunk(final Seed dhtTarget) { + return new Chunk(dhtTarget); } public class Chunk extends WorkflowJob implements Iterable> { @@ -84,27 +84,24 @@ public class Transmission { * - a set of yacy seeds which will shrink as the containers are transmitted to them * - a counter that gives the number of sucessful and unsuccessful transmissions so far */ - private final String primaryTarget; + private final Seed dhtTarget; private final ReferenceContainerCache containers; private final HandleSet references; private final HandleSet badReferences; - private final List targets; private int hit, miss; /** * generate a new dispatcher target. such a target is defined with a primary target and * a set of target peers that shall receive the entries of the containers * the payloadrow defines the structure of container entries - * @param primaryTarget - * @param targets + * @param dhtTarget */ - public Chunk(final String primaryTarget, final List targets) { + public Chunk(final Seed dhtTarget) { super(); - this.primaryTarget = primaryTarget; + this.dhtTarget = dhtTarget; this.containers = new ReferenceContainerCache(Segment.wordReferenceFactory, Segment.wordOrder, Word.commonHashLength); this.references = new RowHandleSet(WordReferenceRow.urlEntryRow.primaryKeyLength, WordReferenceRow.urlEntryRow.objectOrder, 0); this.badReferences = new RowHandleSet(WordReferenceRow.urlEntryRow.primaryKeyLength, WordReferenceRow.urlEntryRow.objectOrder, 0); - this.targets = targets; this.hit = 0; this.miss = 0; } @@ -203,8 +200,8 @@ public class Transmission { return this.containers.size(); } - public String primaryTarget() { - return this.primaryTarget; + public Seed dhtTarget() { + return this.dhtTarget; } /** @@ -223,39 +220,27 @@ public class Transmission { return this.miss; } - /** - * return the number of targets that are left in the target cache - * if this is empty, there may be no more use of this object and it should be flushed - * with the iterator method - * @return - */ - public int targets() { - return this.targets.size(); - } - public boolean transmit() { - if (this.targets.isEmpty()) return false; - final Seed target = this.targets.remove(0); // transferring selected words to remote peer - if (target == Transmission.this.seeds.mySeed() || target.hash.equals(Transmission.this.seeds.mySeed().hash)) { + if (this.dhtTarget == Transmission.this.seeds.mySeed() || this.dhtTarget.hash.equals(Transmission.this.seeds.mySeed().hash)) { // target is my own peer. This is easy. Just restore the indexContainer restore(); this.hit++; Transmission.this.log.info("Transfer of chunk to myself-target"); return true; } - Transmission.this.log.info("starting new index transmission request to " + this.primaryTarget); + Transmission.this.log.info("starting new index transmission request to " + this.dhtTarget.getName()); final long start = System.currentTimeMillis(); - final String error = Protocol.transferIndex(target, this.containers, this.references, Transmission.this.segment, Transmission.this.gzipBody4Transfer, Transmission.this.timeout4Transfer); + final String error = Protocol.transferIndex(Transmission.this.seeds, this.dhtTarget, this.containers, this.references, Transmission.this.segment, Transmission.this.gzipBody4Transfer, Transmission.this.timeout4Transfer); if (error == null) { // words successfully transfered final long transferTime = System.currentTimeMillis() - start; final Iterator> i = this.containers.iterator(); final ReferenceContainer firstContainer = (i == null) ? null : i.next(); Transmission.this.log.info("Index transfer of " + this.containers.size() + - " words [" + ((firstContainer == null) ? null : ASCII.String(firstContainer.getTermHash())) + " .. " + this.primaryTarget + "]" + + " references for terms [ " + ((firstContainer == null) ? null : ASCII.String(firstContainer.getTermHash())) + " ..]" + " and " + this.references.size() + " URLs" + - " to peer " + target.getName() + ":" + target.hash + + " to peer " + this.dhtTarget.getName() + ":" + this.dhtTarget.hash + " in " + (transferTime / 1000) + " seconds successful (" + (1000 * this.containers.size() / (transferTime + 1)) + " words/s)"); @@ -263,20 +248,20 @@ public class Transmission { Transmission.this.seeds.mySeed().incSU(this.references.size()); // if the peer has set a pause time and we are in flush mode (index transfer) // then we pause for a while now - Transmission.this.log.info("Transfer finished of chunk to target " + target.hash + "/" + target.getName()); + Transmission.this.log.info("Transfer finished of chunk to target " + this.dhtTarget.hash + "/" + this.dhtTarget.getName()); this.hit++; return true; } Transmission.this.log.info( - "Index transfer to peer " + target.getName() + ":" + target.hash + + "Index transfer to peer " + this.dhtTarget.getName() + ":" + this.dhtTarget.hash + " failed: " + error); this.miss++; // write information that peer does not receive index transmissions - Transmission.this.log.info("Transfer failed of chunk to target " + target.hash + "/" + target.getName() + ": " + error); + Transmission.this.log.info("Transfer failed of chunk to target " + this.dhtTarget.hash + "/" + this.dhtTarget.getName() + ": " + error); // get possibly newer target Info - final Seed newTarget = Transmission.this.seeds.get(target.hash); + final Seed newTarget = Transmission.this.seeds.get(this.dhtTarget.hash); if (newTarget != null) { - final String oldAddress = target.getPublicAddress(); + final String oldAddress = this.dhtTarget.getPublicAddress(); if ((oldAddress != null) && (oldAddress.equals(newTarget.getPublicAddress()))) { newTarget.setFlagAcceptRemoteIndex(false); Transmission.this.seeds.update(newTarget.hash, newTarget); @@ -289,16 +274,6 @@ public class Transmission { return false; } - public boolean isFinished() { - //System.out.println("canFinish: hit = " + this.hit + ", redundancy = " + seeds.redundancy() + ", targets.size() = " + targets.size()); - return this.hit >= Transmission.this.seeds.redundancy(); - } - - public boolean canFinish() { - //System.out.println("canFinish: hit = " + this.hit + ", redundancy = " + seeds.redundancy() + ", targets.size() = " + targets.size()); - return this.targets.size() >= Transmission.this.seeds.redundancy() - this.hit; - } - public void restore() { for (final ReferenceContainer ic : this) try { Transmission.this.segment.storeRWI(ic); diff --git a/source/net/yacy/search/Switchboard.java b/source/net/yacy/search/Switchboard.java index 773afccf2..d382caaff 100644 --- a/source/net/yacy/search/Switchboard.java +++ b/source/net/yacy/search/Switchboard.java @@ -3382,10 +3382,10 @@ public final class Switchboard extends serverSwitch { } boolean hasDoneSomething = false; final long kbytesUp = ConnectionInfo.getActiveUpbytes() / 1024; - // accumulate RWIs to transmission cloud - if ( this.dhtDispatcher.cloudSize() > this.peers.scheme.verticalPartitions() ) { - this.log.info("dhtTransferJob: no selection, too many entries in transmission cloud: " - + this.dhtDispatcher.cloudSize()); + // accumulate RWIs to transmission buffer + if ( this.dhtDispatcher.bufferSize() > this.peers.scheme.verticalPartitions() ) { + this.log.info("dhtTransferJob: no selection, too many entries in transmission buffer: " + + this.dhtDispatcher.bufferSize()); } else if ( MemoryControl.available() < 1024 * 1024 * 25 ) { this.log.info("dhtTransferJob: no selection, too less memory available : " + (MemoryControl.available() / 1024 / 1024) @@ -3415,7 +3415,7 @@ public final class Switchboard extends serverSwitch { this.log.info("dhtTransferJob: selected " + ASCII.String(startHash) + " as start hash"); this.log.info("dhtTransferJob: selected " + ASCII.String(limitHash) + " as limit hash"); final boolean enqueued = - this.dhtDispatcher.selectContainersEnqueueToCloud( + this.dhtDispatcher.selectContainersEnqueueToBuffer( startHash, limitHash, dhtMaxContainerCount, @@ -3428,7 +3428,7 @@ public final class Switchboard extends serverSwitch { // check if we can deliver entries to other peers if ( this.dhtDispatcher.transmissionSize() >= 10 ) { this.log - .info("dhtTransferJob: no dequeueing from cloud to transmission: too many concurrent sessions: " + .info("dhtTransferJob: no dequeueing from buffer to transmission: too many concurrent sessions: " + this.dhtDispatcher.transmissionSize()); } else if ( ConnectionInfo.getLoadPercent() > 75 ) { this.log.info("dhtTransferJob: too many connections in httpc pool : "