diff --git a/source/de/anomic/plasma/plasmaSwitchboard.java b/source/de/anomic/plasma/plasmaSwitchboard.java
index c56b7983a..909b61660 100644
--- a/source/de/anomic/plasma/plasmaSwitchboard.java
+++ b/source/de/anomic/plasma/plasmaSwitchboard.java
@@ -1913,29 +1913,38 @@ public final class plasmaSwitchboard extends serverAbstractSwitch
                 this.webIndex.seedDB.scheme.verticalPartitions() * 4) {
+            log.logInfo("dhtTransferJob: no selection, too many entries in transmission cloud: " + this.dhtDispatcher.cloudSize());
+        } else if (MemoryControl.available() < 1024*1024*20) {
+            log.logInfo("dhtTransferJob: no selection, too less memory available : " + (MemoryControl.available() / 1024 / 1024) + " MB");
+        } else {
+            String startHash = PeerSelection.selectTransferStart();
+            log.logInfo("dhtTransferJob: selected " + startHash + " as start hash");
+            String limitHash = PeerSelection.limitOver(this.webIndex.seedDB, startHash);
+            log.logInfo("dhtTransferJob: selected " + limitHash + " as limit hash");
+            try {
+                boolean enqueued = this.dhtDispatcher.selectContainersEnqueueToCloud(
+                        startHash,
+                        limitHash,
+                        dhtMaxContainerCount,
+                        dhtMaxReferenceCount,
+                        2000);
+                hasDoneSomething = hasDoneSomething | enqueued;
+                log.logInfo("dhtTransferJob: result from enqueueing: " + ((enqueued) ? "true" : "false"));
+            } catch (IOException e) {
+                log.logSevere("dhtTransferJob: interrupted with exception: " + e.getMessage(), e);
+                return false;
+            }
+        }
+        if (this.dhtDispatcher.transmissionSize() >= 10) {
+            log.logInfo("dhtTransferJob: no dequeueing from cloud to transmission: too many concurrent sessions: " + this.dhtDispatcher.transmissionSize());
+        } else {
+            boolean dequeued = this.dhtDispatcher.dequeueContainer();
+            hasDoneSomething = hasDoneSomething | dequeued;
+            log.logInfo("dhtTransferJob: result from dequeueing: " + ((dequeued) ? "true" : "false"));
         }
-        int splitted = this.dhtDispatcher.splitContainersFromCache();
-        log.logInfo("dhtTransferJob: splitted selected container in " + splitted + " parts");
-        boolean enqueued = this.dhtDispatcher.enqueueContainersFromCache();
-        log.logInfo("dhtTransferJob: result from enqueueing: " + ((enqueued) ? "true" : "false"));
-        boolean dequeued = this.dhtDispatcher.dequeueContainer();
-        log.logInfo("dhtTransferJob: result from dequeueing: " + ((dequeued) ? "true" : "false"));
-        return dequeued;
+        return hasDoneSomething;
     }
     
     private void addURLtoErrorDB(
diff --git a/source/de/anomic/yacy/dht/Dispatcher.java b/source/de/anomic/yacy/dht/Dispatcher.java
index 5ccbd0448..6c6d581f8 100755
--- a/source/de/anomic/yacy/dht/Dispatcher.java
+++ b/source/de/anomic/yacy/dht/Dispatcher.java
@@ -92,11 +92,7 @@ public class Dispatcher {
     private serverProcessor indexingTransmissionProcessor;
     
     // transmission object
-    private Transmission transmissions;
-    
-    // caching objects
-    private ArrayList selectedContainerCache;
-    private ArrayList[] splittedContainerCache;
+    private Transmission transmission;
     
     public Dispatcher(
             final indexRI backend,
@@ -109,15 +105,15 @@ public class Dispatcher {
         this.backend = backend;
         this.seeds = seeds;
         this.log = new Log("INDEX TRANSFER DISPATCHER");
-        this.transmissions = new Transmission(
+        this.transmission = new Transmission(
             log,
             repository,
             seeds,
             backend,
             gzipBody,
             timeout);
-        this.selectedContainerCache = null;
-        this.splittedContainerCache = null;
+        //this.selectedContainerCache = null;
+        //this.splittedContainerCache = null;
         
         int concurrentSender = Math.min(25, Math.max(10, serverProcessor.useCPU * 2 + 1));
         indexingTransmissionProcessor = new serverProcessor(
@@ -127,6 +123,14 @@ public class Dispatcher {
                 this, "storeDocumentIndex", concurrentSender * 2, null, concurrentSender);
     }
     
+    public int cloudSize() {
+        return this.transmissionCloud.size();
+    }
+    
+    public int transmissionSize() {
+        return this.indexingTransmissionProcessor.queueSize();
+    }
+    
     /**
      * PROCESS(1)
      * select a number of index containers from the backend index.
@@ -138,7 +142,7 @@ public class Dispatcher {
      * @return
      * @throws IOException
      */
-    public ArrayList selectContainers(
+    private ArrayList selectContainers(
             final String hash,
             final String limitHash,
             final int maxContainerCount,
@@ -153,7 +157,7 @@ public class Dispatcher {
         return containers;
     }
     
-    public ArrayList selectContainers(
+    private ArrayList selectContainers(
             final String hash,
             final String limitHash,
             final int maxContainerCount,
@@ -190,30 +194,6 @@ public class Dispatcher {
         return containers;
     }
     
-    /**
-     * convenience method for the selection process: put the result in an internal cache
-     * @param hash
-     * @param limitHash
-     * @param maxContainerCount
-     * @param maxtime
-     * @return
-     * @throws IOException
-     */
-    public synchronized int selectContainersToCache(
-            final String hash,
-            final String limitHash,
-            final int maxContainerCount,
-            final int maxReferenceCount,
-            final int maxtime) throws IOException {
-        if (this.selectedContainerCache != null && this.selectedContainerCache.size() > 0) {
-            this.log.logInfo("selectContainersToCache: selectedContainerCache is already filled, no selection done.");
-            return 0;
-        }
-        this.selectedContainerCache = selectContainers(hash, limitHash, maxContainerCount, maxReferenceCount, maxtime);
-        this.log.logInfo("selectContainersToCache: selectedContainerCache was filled with " + this.selectedContainerCache.size() + " entries");
-        return this.selectedContainerCache.size();
-    }
-    
     /**
      * PROCESS(2)
      * split a list of containers into partitions according to the vertical distribution scheme
@@ -222,7 +202,7 @@ public class Dispatcher {
      * @param containers
      * @return
      */
     @SuppressWarnings("unchecked")
-    public ArrayList[] splitContainers(ArrayList containers) {
+    private ArrayList[] splitContainers(ArrayList containers) {
         // init the result vector
         int partitionCount = this.seeds.scheme.verticalPartitions();
@@ -254,25 +234,6 @@ public class Dispatcher {
         return partitions;
     }
     
-    /**
-     * convenience method for splitContainers: use the container cache and write into a splitted container cache.
-     * @return
-     */
-    public synchronized int splitContainersFromCache() {
-        if (selectedContainerCache == null || selectedContainerCache.size() == 0) {
-            this.log.logInfo("splitContainersFromCache: selectedContainerCache is empty, cannot do anything here.");
-            return 0;
-        }
-        if (splittedContainerCache != null && splittedContainerCache.length > 0) {
-            this.log.logInfo("splitContainersFromCache: splittedContainerCache is aready filled, doing nothing now.");
-            return 0;
-        }
-        this.splittedContainerCache = splitContainers(selectedContainerCache);
-        this.selectedContainerCache = null;
-        this.log.logInfo("splitContainersFromCache: splittedContainerCache filled with " + this.splittedContainerCache.length + " partitions, deleting selectedContainerCache");
-        return this.splittedContainerCache.length;
-    }
-    
     /**
      * PROCESS(3) and PROCESS(4)
      * put containers into cloud. This needs information about the network,
@@ -283,7 +244,7 @@ public class Dispatcher {
      * stored in a cache of the Entry for later transmission to the targets, which means that
      * then no additional IO is necessary.
      */
-    public void enqueueContainers(final ArrayList[] containers) {
+    private void enqueueContainersToCloud(final ArrayList[] containers) {
         indexContainer lastContainer;
         String primaryTarget;
         Transmission.Chunk entry;
@@ -300,7 +261,7 @@ public class Dispatcher {
                     seeds.redundancy() * 3, true);
             this.log.logInfo("enqueueContainers: selected " + targets.size() + " targets for primary target key " + primaryTarget + "/" + vertical + " with " + containers[vertical].size() + " index containers.");
-            if (entry == null) entry = transmissions.newChunk(primaryTarget, targets, lastContainer.row());
+            if (entry == null) entry = transmission.newChunk(primaryTarget, targets, lastContainer.row());
             
             // fill the entry with the containers
             for (indexContainer c: containers[vertical]) {
@@ -311,14 +272,35 @@ public class Dispatcher {
             if (entry.containersSize() > 0) this.transmissionCloud.put(primaryTarget, entry);
         }
     }
-    
-    public boolean enqueueContainersFromCache() {
-        if (this.splittedContainerCache == null) {
+    
+    public synchronized boolean selectContainersEnqueueToCloud(
+            final String hash,
+            final String limitHash,
+            final int maxContainerCount,
+            final int maxReferenceCount,
+            final int maxtime) throws IOException {
+        
+        ArrayList selectedContainerCache = selectContainers(hash, limitHash, maxContainerCount, maxReferenceCount, maxtime);
+        this.log.logInfo("selectContainersToCache: selectedContainerCache was filled with " + selectedContainerCache.size() + " entries");
+        
+        if (selectedContainerCache == null || selectedContainerCache.size() == 0) {
+            this.log.logInfo("splitContainersFromCache: selectedContainerCache is empty, cannot do anything here.");
+            return false;
+        }
+        
+        ArrayList[] splittedContainerCache = splitContainers(selectedContainerCache);
+        selectedContainerCache = null;
+        if (splittedContainerCache == null) {
             this.log.logInfo("enqueueContainersFromCache: splittedContainerCache is empty, cannot do anything here.");
             return false;
         }
-        enqueueContainers(this.splittedContainerCache);
-        this.splittedContainerCache = null;
+        this.log.logInfo("splitContainersFromCache: splittedContainerCache filled with " + splittedContainerCache.length + " partitions, deleting selectedContainerCache");
+        if (splittedContainerCache.length != this.seeds.scheme.verticalPartitions()) {
+            this.log.logWarning("enqueueContainersFromCache: splittedContainerCache has wrong length.");
+            return false;
+        }
+        enqueueContainersToCloud(splittedContainerCache);
+        splittedContainerCache = null;
         this.log.logInfo("enqueueContainersFromCache: splittedContainerCache enqueued to cloud array which has now " + this.transmissionCloud.size() + " entries.");
         return true;
     }
@@ -360,7 +342,7 @@ public class Dispatcher {
             return chunk;
         }
-        this.log.logInfo("STORE: Chunk " + chunk.primaryTarget() + " has failed to transmit index; removed peer from network");
+        this.log.logInfo("STORE: Chunk " + chunk.primaryTarget() + " has failed to transmit index; marked peer as busy");
         if (chunk.canFinish()) {
             try {