diff --git a/htroot/IndexControlRWIs_p.java b/htroot/IndexControlRWIs_p.java index 6ceaa4de1..2086dc937 100644 --- a/htroot/IndexControlRWIs_p.java +++ b/htroot/IndexControlRWIs_p.java @@ -315,7 +315,7 @@ public class IndexControlRWIs_p { // transport to other peer final boolean gzipBody = sb.getConfigBool("indexControl.gzipBody", false); final int timeout = (int) sb.getConfigLong("indexControl.timeout", 60000); - final String error = Protocol.transferIndex(seed, icc, knownURLs, segment, gzipBody, timeout); + final String error = Protocol.transferIndex(sb.peers, seed, icc, knownURLs, segment, gzipBody, timeout); prop.put("result", (error == null) ? ("Successfully transferred " + knownURLs.size() + " words in " diff --git a/source/net/yacy/peers/DHTSelection.java b/source/net/yacy/peers/DHTSelection.java index 92795f4d1..c902d2ad4 100644 --- a/source/net/yacy/peers/DHTSelection.java +++ b/source/net/yacy/peers/DHTSelection.java @@ -24,6 +24,7 @@ package net.yacy.peers; +import java.lang.reflect.Array; import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; @@ -48,13 +49,10 @@ import net.yacy.kelondro.data.word.Word; import net.yacy.kelondro.util.kelondroException; import net.yacy.peers.operation.yacyVersion; - - -/* - * this package is a collection of peer selection iterations that had been +/** + * This package is a collection of peer selection iterations that had been * part of yacyPeerActions, yacyDHTActions and yacySeedDB */ - public class DHTSelection { public static Set selectClusterPeers(final SeedDB seedDB, final SortedMap peerhashes) { @@ -138,7 +136,7 @@ public class DHTSelection { return extraSeeds; } - + public static Set selectDHTSearchTargets(final SeedDB seedDB, final HandleSet wordhashes, final int minage, final int redundancy, final int maxredundancy, final Random random) { // put in seeds according to dht @@ -172,11 +170,21 @@ public class DHTSelection { return collectedSeeds; } + @SuppressWarnings("unchecked") + public static List[] selectDHTDistributionTargets(final SeedDB seedDB, final byte[] wordhash, final int minage, final int redundancy) { + // this method is called from the distribution target computation + List[] seedlists = (List[]) Array.newInstance(ArrayList.class, seedDB.scheme.verticalPartitions()); + for (int verticalPosition = 0; verticalPosition < seedDB.scheme.verticalPartitions(); verticalPosition++) { + seedlists[verticalPosition] = selectVerticalDHTPositions(seedDB, wordhash, minage, redundancy, verticalPosition); + } + return seedlists; + } + /** * collecting vertical positions: that chooses for each of the DHT partition a collection of redundant storage positions * @param seedDB the database of seeds * @param wordhash the word we are searching for - * @param minage the minimum age of a seed (to prevent that too young seeds which cannot have results yet are asked) + * @param minage the minimum age of a seed in days (to prevent that too young seeds which cannot have results yet are asked) * @param redundancy the number of redundant peer position for this parition, minimum is 1 * @param verticalPosition the verical position, thats the number of the partition 0 <= verticalPosition < seedDB.scheme.verticalPartitions() * @return a list of seeds for the redundant positions diff --git a/source/net/yacy/peers/Dispatcher.java b/source/net/yacy/peers/Dispatcher.java index 9b4aebd3d..c76e97afd 100644 --- a/source/net/yacy/peers/Dispatcher.java +++ b/source/net/yacy/peers/Dispatcher.java @@ -34,7 +34,6 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import net.yacy.cora.document.encoding.ASCII; -import net.yacy.cora.federate.yacy.Distribution; import net.yacy.cora.order.Base64Order; import net.yacy.cora.storage.HandleSet; import net.yacy.cora.util.ConcurrentLog; @@ -58,10 +57,10 @@ public class Dispatcher { * - (1) a number of RWIs are selected and accumulated. * When they are selected, they are removed from the index * - (2) the RWI collection is split into a number of partitions according to the vertical DHT. - * - (3) the split RWIs are enqueued as Entry object in the entry 'cloud' of the dispatcher - * - (4) more entries may be enqueued to the dispatcher and entries with the same primary target - * are accumulated. - * - (5) the largest entries are selected from the dispatcher cloud and enqueued to the 'next' array + * - (3) the split RWIs are enqueued as Entry object in the write buffer of the dispatcher + * - (4) more entries may be enqueued to the dispatcher and + * entries with the same primary target are accumulated to the same buffer entry. + * - (5) the largest entries are selected from the dispatcher write buffer and enqueued to the 'next' array * which means that they are ready for transmission * - (6) the dispatcher takes some of the entries in the next queue and initiates * transmission to other peers concurrently. As much transmissions are initiated concurrently @@ -83,9 +82,11 @@ public class Dispatcher { * called failed. In case of a fail, the RWI fragment is put back into the backend index structure */ - // a cloud is a cache for the objects that wait to be transmitted - // the String-key is the primary target as contained in the Entry - private Map transmissionCloud; + /** + * A transmission buffer is a write buffer for the rwi objects (indices) that wait to be transmitted. + * The String-key is the primary target as contained in the chunk entry. + */ + private Map transmissionBuffer; // the segment backend is used to store the remaining indexContainers in case that the object is closed private final Segment segment; @@ -108,7 +109,7 @@ public class Dispatcher { final boolean gzipBody, final int timeout ) { - this.transmissionCloud = new ConcurrentHashMap(); + this.transmissionBuffer = new ConcurrentHashMap(); this.segment = segment; this.seeds = seeds; this.log = new ConcurrentLog("INDEX-TRANSFER-DISPATCHER"); @@ -127,8 +128,8 @@ public class Dispatcher { this, "transferDocumentIndex", concurrentSender * 3, null, concurrentSender); } - public int cloudSize() { - return (this.transmissionCloud == null) ? 0 : this.transmissionCloud.size(); + public int bufferSize() { + return (this.transmissionBuffer == null) ? 0 : this.transmissionBuffer.size(); } public int transmissionSize() { @@ -137,8 +138,8 @@ public class Dispatcher { /** * PROCESS(1) - * select a number of index containers from the backend index. - * Selected containers are removed from the backend. + * Select a number of index containers from the RWI index. + * Selected containers are removed from the RWIs (not from Solr, only the DHT references). * @param hash * @param limitHash * @param maxContainerCount @@ -227,31 +228,9 @@ public class Dispatcher { * PROCESS(2) * split a list of containers into partitions according to the vertical distribution scheme * @param containers - * @param scheme - * @return + * @return a #verticalPartitions list of reference containers, one for each vertical position * @throws SpaceExceededException */ - @SuppressWarnings("unchecked") - private List>[] splitContainers(final List> containers) throws SpaceExceededException { - - // init the result vector - final int partitionCount = this.seeds.scheme.verticalPartitions(); - final List>[] partitions = (List>[]) Array.newInstance(ArrayList.class, partitionCount); - for (int i = 0; i < partitions.length; i++) partitions[i] = new ArrayList>(); - - // check all entries and split them to the partitions - for (final ReferenceContainer container: containers) { - // init the new partitions - final ReferenceContainer[] partitionBuffer = splitContainer(container); - - // add the containers to the result vector - for (int j = 0; j < partitionBuffer.length; j++) { - partitions[j].add(partitionBuffer[j]); - } - } - return partitions; - } - private ReferenceContainer[] splitContainer(final ReferenceContainer container) throws SpaceExceededException { // init the result vector @@ -279,116 +258,97 @@ public class Dispatcher { /** * PROCESS(3) and PROCESS(4) - * put containers into cloud. This needs information about the network, + * put containers into the write buffer. This needs information about the network, * because the possible targets are assigned here as well. The indexRepositoryReference * is the database of references which is needed here because this is the place where * finally is checked if the reference exists. If the entry does not exist for specific * entries in the indexContainer, then it is discarded. If it exists, then the entry is * stored in a cache of the Entry for later transmission to the targets, which means that * then no additional IO is necessary. + * @param containers a reference containers array, one container for each vertical position */ - private void enqueueContainersToCloud(final List>[] containers) { + private void enqueueContainersToBuffer(final byte[] wordhash, final ReferenceContainer[] containers) { assert (containers.length == this.seeds.scheme.verticalPartitions()); - if (this.transmissionCloud == null) return; - byte[] primaryTarget; - String primaryTargetString; - Transmission.Chunk entry; + if (this.transmissionBuffer == null) return; + List[] targets = DHTSelection.selectDHTDistributionTargets(this.seeds, wordhash, 3, this.seeds.redundancy()); + assert (targets.length == this.seeds.scheme.verticalPartitions()); + assert (targets.length == containers.length); for (int vertical = 0; vertical < containers.length; vertical++) { - List> verticalList = containers[vertical]; - ReferenceContainer lastContainer = verticalList.get(verticalList.size() - 1); - primaryTarget = Distribution.positionToHash(this.seeds.scheme.verticalDHTPosition(lastContainer.getTermHash(), vertical)); - assert primaryTarget[2] != '@'; - primaryTargetString = ASCII.String(primaryTarget); - - // get or make a entry object - entry = this.transmissionCloud.get(primaryTargetString); // if this is not null, the entry is extended here - final List targets = DHTSelection.getAcceptRemoteIndexSeedsList( - this.seeds, - primaryTarget, - this.seeds.redundancy() * 3, - true); - this.log.info("enqueueContainers: selected " + targets.size() + " targets for primary target key " + primaryTargetString + "/" + vertical + " with " + verticalList.size() + " index containers."); - if (entry == null) entry = this.transmission.newChunk(primaryTargetString, targets); - - // fill the entry with the containers - for (final ReferenceContainer c: verticalList) { + ReferenceContainer verticalContainer = containers[vertical]; + if (verticalContainer.isEmpty()) continue; + + // extend the transmissionBuffer with entries for each redundant position + for (Seed target: targets[vertical]) { + Transmission.Chunk entry = this.transmissionBuffer.get(target.hash); // if this is not null, the entry is extended here + if (entry == null) entry = transmission.newChunk(target); else { + log.info("extending chunk for peer " + entry.dhtTarget().hash + " containing " + entry.containersSize() + " references with " + verticalContainer.size() + " more entries"); + } try { - entry.add(c); - } catch (final SpaceExceededException e) { + entry.add(verticalContainer); + } catch (SpaceExceededException e) { ConcurrentLog.logException(e); - break; } + this.transmissionBuffer.put(target.hash, entry); } - - // put the entry into the cloud - if (this.transmissionCloud != null && entry.containersSize() > 0) this.transmissionCloud.put(primaryTargetString, entry); } } - - public boolean selectContainersEnqueueToCloud( + + public boolean selectContainersEnqueueToBuffer( final byte[] hash, final byte[] limitHash, final int maxContainerCount, final int maxReferenceCount, final int maxtime) { - if (this.transmissionCloud == null) return false; + if (this.transmissionBuffer == null) return false; List> selectedContainerCache; try { selectedContainerCache = selectContainers(hash, limitHash, maxContainerCount, maxReferenceCount, maxtime); } catch (final IOException e) { - this.log.severe("selectContainersEnqueueToCloud: selectedContainer failed", e); + this.log.severe("selectContainersEnqueueToBuffer: selectedContainer failed", e); return false; } - this.log.info("selectContainersEnqueueToCloud: selectedContainerCache was filled with " + selectedContainerCache.size() + " entries"); + this.log.info("selectContainersEnqueueToBuffer: selectedContainerCache was filled with " + selectedContainerCache.size() + " entries"); if (selectedContainerCache == null || selectedContainerCache.isEmpty()) { - this.log.info("selectContainersEnqueueToCloud: selectedContainerCache is empty, cannot do anything here."); + this.log.info("selectContainersEnqueueToBuffer: selectedContainerCache is empty, cannot do anything here."); return false; } - List>[] splitContainerCache; // for each vertical partition a set of word references within a reference container + // check all entries and split them to the partitions try { - splitContainerCache = splitContainers(selectedContainerCache); + for (final ReferenceContainer container: selectedContainerCache) { + // init the new partitions + final ReferenceContainer[] partitionBuffer = splitContainer(container); + enqueueContainersToBuffer(container.getTermHash(), partitionBuffer); + } } catch (final SpaceExceededException e) { - this.log.severe("selectContainersEnqueueToCloud: splitContainers failed because of too low RAM", e); + this.log.severe("splitContainer: splitContainers failed because of too low RAM", e); return false; } - selectedContainerCache = null; - if (splitContainerCache == null) { - this.log.info("selectContainersEnqueueToCloud: splitContainerCache is empty, cannot do anything here."); - return false; - } - this.log.info("splitContainersFromCache: splitContainerCache filled with " + splitContainerCache.length + " partitions, deleting selectedContainerCache"); - if (splitContainerCache.length != this.seeds.scheme.verticalPartitions()) { - this.log.warn("selectContainersEnqueueToCloud: splitContainerCache has wrong length."); - return false; - } - enqueueContainersToCloud(splitContainerCache); - splitContainerCache = null; - this.log.info("selectContainersEnqueueToCloud: splitContainerCache enqueued to cloud array which has now " + this.transmissionCloud.size() + " entries."); + this.log.info("selectContainersEnqueueToBuffer: splitContainerCache enqueued to the write buffer array which has now " + this.transmissionBuffer.size() + " entries."); return true; } /** * PROCESS(5) - * take the largest container from the cloud and put it into the 'next' array, + * take the largest container from the write buffer and put it into the 'next' array, * where it waits to be processed. * This method returns true if a container was dequeued, false if not */ public boolean dequeueContainer() { - if (this.transmissionCloud == null) return false; + if (this.transmissionBuffer == null) return false; if (this.indexingTransmissionProcessor.getQueueSize() > this.indexingTransmissionProcessor.getMaxConcurrency()) return false; String maxtarget = null; int maxsize = -1; - for (final Map.Entry chunk: this.transmissionCloud.entrySet()) { + for (final Map.Entry chunk: this.transmissionBuffer.entrySet()) { if (chunk.getValue().containersSize() > maxsize) { maxsize = chunk.getValue().containersSize(); maxtarget = chunk.getKey(); } } if (maxsize < 0) return false; - final Transmission.Chunk chunk = this.transmissionCloud.remove(maxtarget); + final Transmission.Chunk chunk = this.transmissionBuffer.remove(maxtarget); this.indexingTransmissionProcessor.enQueue(chunk); return true; } @@ -410,20 +370,9 @@ public class Dispatcher { // do the transmission final boolean success = chunk.transmit(); + if (success) return chunk; - if (success && chunk.isFinished()) { - // finished with this queue! - this.log.info("STORE: Chunk " + chunk.primaryTarget() + " has FINISHED all transmissions!"); - return chunk; - } - - if (!success) this.log.info("STORE: Chunk " + chunk.primaryTarget() + " has failed to transmit index; marked peer as busy"); - - if (chunk.canFinish()) { - if (this.indexingTransmissionProcessor != null) this.indexingTransmissionProcessor.enQueue(chunk); - return chunk; - } - this.log.info("STORE: Chunk " + chunk.primaryTarget() + " has not enough targets left. This transmission has failed, putting back index to backend"); + this.log.info("STORE: Chunk " + chunk.dhtTarget().getName() + " does not respond or accept the dht index, putting back index to backend"); chunk.restore(); return null; } @@ -431,8 +380,8 @@ public class Dispatcher { public void close() { // removes all entries from the dispatcher and puts them back to a RAMRI if (this.indexingTransmissionProcessor != null) this.indexingTransmissionProcessor.shutdown(); - if (this.transmissionCloud != null) { - outerLoop: for (final Map.Entry e : this.transmissionCloud.entrySet()) { + if (this.transmissionBuffer != null) { + outerLoop: for (final Map.Entry e : this.transmissionBuffer.entrySet()) { for (final ReferenceContainer i : e.getValue()) try { this.segment.storeRWI(i); } catch (final Exception e1) { @@ -440,9 +389,9 @@ public class Dispatcher { break outerLoop; } } - this.transmissionCloud.clear(); + this.transmissionBuffer.clear(); } - this.transmissionCloud = null; + this.transmissionBuffer = null; if (this.indexingTransmissionProcessor != null) { this.indexingTransmissionProcessor.clear(); } diff --git a/source/net/yacy/peers/Protocol.java b/source/net/yacy/peers/Protocol.java index 2c4054621..cd270f69d 100644 --- a/source/net/yacy/peers/Protocol.java +++ b/source/net/yacy/peers/Protocol.java @@ -1355,6 +1355,7 @@ public final class Protocol { * @return */ public static String transferIndex( + final SeedDB seeds, final Seed targetSeed, final ReferenceContainerCache indexes, final HandleSet urlRefs, @@ -1383,15 +1384,21 @@ public final class Protocol { Map in = transferRWI(targetSeed, indexes, gzipBody, timeout); if ( in == null ) { - return "no connection from transferRWI"; + String errorCause = "no connection from transferRWI"; + seeds.peerActions.peerDeparture(targetSeed, errorCause); // disconnect unavailable peer + return errorCause; } String result = in.get("result"); if ( result == null ) { - return "no result from transferRWI"; + String errorCause = "no result from transferRWI"; + seeds.peerActions.peerDeparture(targetSeed, errorCause); // disconnect unavailable peer + return errorCause; } if ( !(result.equals("ok")) ) { + targetSeed.setFlagAcceptRemoteIndex(false); // the peer does not want our index + seeds.addConnected(targetSeed); // update the peer return result; } @@ -1426,10 +1433,14 @@ public final class Protocol { result = in.get("result"); if ( result == null ) { - return "no result from transferURL"; + String errorCause = "no result from transferURL"; + seeds.peerActions.peerDeparture(targetSeed, errorCause); // disconnect unavailable peer + return errorCause; } if ( !result.equals("ok") ) { + targetSeed.setFlagAcceptRemoteIndex(false); // the peer does not want our index + seeds.addConnected(targetSeed); // update the peer return result; } EventChannel.channels(EventChannel.DHTSEND).addMessage( diff --git a/source/net/yacy/peers/Transmission.java b/source/net/yacy/peers/Transmission.java index 5ecc0247a..8795209c7 100644 --- a/source/net/yacy/peers/Transmission.java +++ b/source/net/yacy/peers/Transmission.java @@ -69,8 +69,8 @@ public class Transmission { this.timeout4Transfer = timeout4Transfer; } - public Chunk newChunk(final String primaryTarget, final List targets) { - return new Chunk(primaryTarget, targets); + public Chunk newChunk(final Seed dhtTarget) { + return new Chunk(dhtTarget); } public class Chunk extends WorkflowJob implements Iterable> { @@ -84,27 +84,24 @@ public class Transmission { * - a set of yacy seeds which will shrink as the containers are transmitted to them * - a counter that gives the number of sucessful and unsuccessful transmissions so far */ - private final String primaryTarget; + private final Seed dhtTarget; private final ReferenceContainerCache containers; private final HandleSet references; private final HandleSet badReferences; - private final List targets; private int hit, miss; /** * generate a new dispatcher target. such a target is defined with a primary target and * a set of target peers that shall receive the entries of the containers * the payloadrow defines the structure of container entries - * @param primaryTarget - * @param targets + * @param dhtTarget */ - public Chunk(final String primaryTarget, final List targets) { + public Chunk(final Seed dhtTarget) { super(); - this.primaryTarget = primaryTarget; + this.dhtTarget = dhtTarget; this.containers = new ReferenceContainerCache(Segment.wordReferenceFactory, Segment.wordOrder, Word.commonHashLength); this.references = new RowHandleSet(WordReferenceRow.urlEntryRow.primaryKeyLength, WordReferenceRow.urlEntryRow.objectOrder, 0); this.badReferences = new RowHandleSet(WordReferenceRow.urlEntryRow.primaryKeyLength, WordReferenceRow.urlEntryRow.objectOrder, 0); - this.targets = targets; this.hit = 0; this.miss = 0; } @@ -203,8 +200,8 @@ public class Transmission { return this.containers.size(); } - public String primaryTarget() { - return this.primaryTarget; + public Seed dhtTarget() { + return this.dhtTarget; } /** @@ -223,39 +220,27 @@ public class Transmission { return this.miss; } - /** - * return the number of targets that are left in the target cache - * if this is empty, there may be no more use of this object and it should be flushed - * with the iterator method - * @return - */ - public int targets() { - return this.targets.size(); - } - public boolean transmit() { - if (this.targets.isEmpty()) return false; - final Seed target = this.targets.remove(0); // transferring selected words to remote peer - if (target == Transmission.this.seeds.mySeed() || target.hash.equals(Transmission.this.seeds.mySeed().hash)) { + if (this.dhtTarget == Transmission.this.seeds.mySeed() || this.dhtTarget.hash.equals(Transmission.this.seeds.mySeed().hash)) { // target is my own peer. This is easy. Just restore the indexContainer restore(); this.hit++; Transmission.this.log.info("Transfer of chunk to myself-target"); return true; } - Transmission.this.log.info("starting new index transmission request to " + this.primaryTarget); + Transmission.this.log.info("starting new index transmission request to " + this.dhtTarget.getName()); final long start = System.currentTimeMillis(); - final String error = Protocol.transferIndex(target, this.containers, this.references, Transmission.this.segment, Transmission.this.gzipBody4Transfer, Transmission.this.timeout4Transfer); + final String error = Protocol.transferIndex(Transmission.this.seeds, this.dhtTarget, this.containers, this.references, Transmission.this.segment, Transmission.this.gzipBody4Transfer, Transmission.this.timeout4Transfer); if (error == null) { // words successfully transfered final long transferTime = System.currentTimeMillis() - start; final Iterator> i = this.containers.iterator(); final ReferenceContainer firstContainer = (i == null) ? null : i.next(); Transmission.this.log.info("Index transfer of " + this.containers.size() + - " words [" + ((firstContainer == null) ? null : ASCII.String(firstContainer.getTermHash())) + " .. " + this.primaryTarget + "]" + + " references for terms [ " + ((firstContainer == null) ? null : ASCII.String(firstContainer.getTermHash())) + " ..]" + " and " + this.references.size() + " URLs" + - " to peer " + target.getName() + ":" + target.hash + + " to peer " + this.dhtTarget.getName() + ":" + this.dhtTarget.hash + " in " + (transferTime / 1000) + " seconds successful (" + (1000 * this.containers.size() / (transferTime + 1)) + " words/s)"); @@ -263,20 +248,20 @@ public class Transmission { Transmission.this.seeds.mySeed().incSU(this.references.size()); // if the peer has set a pause time and we are in flush mode (index transfer) // then we pause for a while now - Transmission.this.log.info("Transfer finished of chunk to target " + target.hash + "/" + target.getName()); + Transmission.this.log.info("Transfer finished of chunk to target " + this.dhtTarget.hash + "/" + this.dhtTarget.getName()); this.hit++; return true; } Transmission.this.log.info( - "Index transfer to peer " + target.getName() + ":" + target.hash + + "Index transfer to peer " + this.dhtTarget.getName() + ":" + this.dhtTarget.hash + " failed: " + error); this.miss++; // write information that peer does not receive index transmissions - Transmission.this.log.info("Transfer failed of chunk to target " + target.hash + "/" + target.getName() + ": " + error); + Transmission.this.log.info("Transfer failed of chunk to target " + this.dhtTarget.hash + "/" + this.dhtTarget.getName() + ": " + error); // get possibly newer target Info - final Seed newTarget = Transmission.this.seeds.get(target.hash); + final Seed newTarget = Transmission.this.seeds.get(this.dhtTarget.hash); if (newTarget != null) { - final String oldAddress = target.getPublicAddress(); + final String oldAddress = this.dhtTarget.getPublicAddress(); if ((oldAddress != null) && (oldAddress.equals(newTarget.getPublicAddress()))) { newTarget.setFlagAcceptRemoteIndex(false); Transmission.this.seeds.update(newTarget.hash, newTarget); @@ -289,16 +274,6 @@ public class Transmission { return false; } - public boolean isFinished() { - //System.out.println("canFinish: hit = " + this.hit + ", redundancy = " + seeds.redundancy() + ", targets.size() = " + targets.size()); - return this.hit >= Transmission.this.seeds.redundancy(); - } - - public boolean canFinish() { - //System.out.println("canFinish: hit = " + this.hit + ", redundancy = " + seeds.redundancy() + ", targets.size() = " + targets.size()); - return this.targets.size() >= Transmission.this.seeds.redundancy() - this.hit; - } - public void restore() { for (final ReferenceContainer ic : this) try { Transmission.this.segment.storeRWI(ic); diff --git a/source/net/yacy/search/Switchboard.java b/source/net/yacy/search/Switchboard.java index 773afccf2..d382caaff 100644 --- a/source/net/yacy/search/Switchboard.java +++ b/source/net/yacy/search/Switchboard.java @@ -3382,10 +3382,10 @@ public final class Switchboard extends serverSwitch { } boolean hasDoneSomething = false; final long kbytesUp = ConnectionInfo.getActiveUpbytes() / 1024; - // accumulate RWIs to transmission cloud - if ( this.dhtDispatcher.cloudSize() > this.peers.scheme.verticalPartitions() ) { - this.log.info("dhtTransferJob: no selection, too many entries in transmission cloud: " - + this.dhtDispatcher.cloudSize()); + // accumulate RWIs to transmission buffer + if ( this.dhtDispatcher.bufferSize() > this.peers.scheme.verticalPartitions() ) { + this.log.info("dhtTransferJob: no selection, too many entries in transmission buffer: " + + this.dhtDispatcher.bufferSize()); } else if ( MemoryControl.available() < 1024 * 1024 * 25 ) { this.log.info("dhtTransferJob: no selection, too less memory available : " + (MemoryControl.available() / 1024 / 1024) @@ -3415,7 +3415,7 @@ public final class Switchboard extends serverSwitch { this.log.info("dhtTransferJob: selected " + ASCII.String(startHash) + " as start hash"); this.log.info("dhtTransferJob: selected " + ASCII.String(limitHash) + " as limit hash"); final boolean enqueued = - this.dhtDispatcher.selectContainersEnqueueToCloud( + this.dhtDispatcher.selectContainersEnqueueToBuffer( startHash, limitHash, dhtMaxContainerCount, @@ -3428,7 +3428,7 @@ public final class Switchboard extends serverSwitch { // check if we can deliver entries to other peers if ( this.dhtDispatcher.transmissionSize() >= 10 ) { this.log - .info("dhtTransferJob: no dequeueing from cloud to transmission: too many concurrent sessions: " + .info("dhtTransferJob: no dequeueing from buffer to transmission: too many concurrent sessions: " + this.dhtDispatcher.transmissionSize()); } else if ( ConnectionInfo.getLoadPercent() > 75 ) { this.log.info("dhtTransferJob: too many connections in httpc pool : "