From bcc950c5336396c8988f9d542c863d1a889a09f2 Mon Sep 17 00:00:00 2001 From: theli Date: Sat, 13 May 2006 15:28:57 +0000 Subject: [PATCH] *) Bugfix for Index Transfer git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@2088 6c8d7289-2bf4-0310-a012-ef5d649a1542 --- htroot/IndexTransfer_p.java | 4 +- source/de/anomic/plasma/plasmaDHTChunk.java | 26 ++- source/de/anomic/plasma/plasmaDHTFlush.java | 166 ++++++++---------- .../de/anomic/plasma/plasmaDHTTransfer.java | 143 +++++++-------- .../de/anomic/plasma/plasmaSwitchboard.java | 102 ++++++----- 5 files changed, 216 insertions(+), 225 deletions(-) diff --git a/htroot/IndexTransfer_p.java b/htroot/IndexTransfer_p.java index 907a1b22d..b8dcc8868 100644 --- a/htroot/IndexTransfer_p.java +++ b/htroot/IndexTransfer_p.java @@ -111,9 +111,9 @@ public final class IndexTransfer_p { prop.put("running_transfer.twchunk", Integer.toString(chunk[1])); - //prop.put("running_twEntityCount",transfThread.getTransferedEntityCount()); + prop.put("running_twEntityCount",switchboard.transferIdxThread.getTransferedContainerCount()); prop.put("running_twEntryCount",switchboard.transferIdxThread.getTransferedEntryCount()); - //prop.put("running_twEntityPercent",Float.toString(transfThread.getTransferedEntityPercent())); + prop.put("running_twEntityPercent",Float.toString(switchboard.transferIdxThread.getTransferedContainerPercent())); prop.put("running_twEntitySpeed",Integer.toString(switchboard.transferIdxThread.getTransferedEntitySpeed())); prop.put("running_deleteIndex", switchboard.transferIdxThread.deleteIndex()?1:0); diff --git a/source/de/anomic/plasma/plasmaDHTChunk.java b/source/de/anomic/plasma/plasmaDHTChunk.java index 61154e205..7ef16a899 100644 --- a/source/de/anomic/plasma/plasmaDHTChunk.java +++ b/source/de/anomic/plasma/plasmaDHTChunk.java @@ -75,6 +75,9 @@ public class plasmaDHTChunk { private HashMap urlCache; // String (url-hash) / plasmaCrawlLURL.Entry private int idxCount; + private long selectionStartTime = 0; + private long selectionEndTime = 0; + public plasmaWordIndexEntryContainer firstContainer() { return indexContainers[0]; } @@ -161,15 +164,20 @@ public class plasmaDHTChunk { return startPointHash; } - private void selectTransferContainers(String hash, int mincount, int maxcount) { - int refcountRAM = selectTransferContainersResource(hash, plasmaWordIndex.RL_RAMCACHE, maxcount); - if (refcountRAM >= mincount) { - log.logFine("DHT selection from RAM: " + refcountRAM + " entries"); + private void selectTransferContainers(String hash, int mincount, int maxcount) { + try { + this.selectionStartTime = System.currentTimeMillis(); + int refcountRAM = selectTransferContainersResource(hash, plasmaWordIndex.RL_RAMCACHE, maxcount); + if (refcountRAM >= mincount) { + log.logFine("DHT selection from RAM: " + refcountRAM + " entries"); + return; + } + int refcountFile = selectTransferContainersResource(hash, plasmaWordIndex.RL_WORDFILES, maxcount); + log.logFine("DHT selection from FILE: " + refcountFile + " entries, RAM provided only " + refcountRAM + " entries"); return; + } finally { + this.selectionEndTime = System.currentTimeMillis(); } - int refcountFile = selectTransferContainersResource(hash, plasmaWordIndex.RL_WORDFILES, maxcount); - log.logFine("DHT selection from FILE: " + refcountFile + " entries, RAM provided only " + refcountRAM + " entries"); - return; } private int selectTransferContainersResource(String hash, int resourceLevel, int maxcount) { @@ -277,4 +285,8 @@ public class plasmaDHTChunk { return count; } + public long getSelectionTime() { + if (this.selectionStartTime == 0 || this.selectionEndTime == 0) return -1; + return this.selectionEndTime-this.selectionStartTime; + } } diff --git a/source/de/anomic/plasma/plasmaDHTFlush.java b/source/de/anomic/plasma/plasmaDHTFlush.java index 6cd41d9ec..d5a7a236c 100644 --- a/source/de/anomic/plasma/plasmaDHTFlush.java +++ b/source/de/anomic/plasma/plasmaDHTFlush.java @@ -70,14 +70,14 @@ public class plasmaDHTFlush extends Thread { this.seed = seed; this.delete = delete; this.sb = plasmaSwitchboard.getSwitchboard(); - this.initialWordsDBSize = sb.wordIndex.size(); + this.initialWordsDBSize = this.sb.wordIndex.size(); this.gzipBody4Transfer = gzipBody; this.timeout4Transfer = timeout; //this.maxOpenFiles4Transfer = (int) sb.getConfigLong("indexTransfer.maxOpenFiles",800); } public void run() { - performTransferWholeIndex(); + this.performTransferWholeIndex(); } public void stopIt(boolean wait) throws InterruptedException { @@ -110,17 +110,17 @@ public class plasmaDHTFlush extends Thread { } public float getTransferedContainerPercent() { - long currentWordsDBSize = sb.wordIndex.size(); - if (initialWordsDBSize == 0) return 100; - else if (currentWordsDBSize >= initialWordsDBSize) return 0; + long currentWordsDBSize = this.sb.wordIndex.size(); + if (this.initialWordsDBSize == 0) return 100; + else if (currentWordsDBSize >= this.initialWordsDBSize) return 0; //else return (float) ((initialWordsDBSize-currentWordsDBSize)/(initialWordsDBSize/100)); - else return (float)(this.transferedContainerCount*100/initialWordsDBSize); + else return (this.transferedContainerCount*100/this.initialWordsDBSize); } public int getTransferedEntitySpeed() { - long transferTime = System.currentTimeMillis() - startingTime; + long transferTime = System.currentTimeMillis() - this.startingTime; if (transferTime <= 0) transferTime = 1; - return (int) ((1000 * transferedEntryCount) / transferTime); + return (int) ((1000 * this.transferedEntryCount) / transferTime); } public yacySeed getSeed() { @@ -138,144 +138,132 @@ public class plasmaDHTFlush extends Thread { public String[] getRange() { plasmaDHTTransfer workerThread = this.worker; if (workerThread != null) { - return new String[]{"[" + oldStartingPointHash + ".." + startPointHash + "]", - "[" + workerThread.dhtChunk.firstContainer().hashCode() + ".." + workerThread.dhtChunk.lastContainer().hashCode() + "]"}; + return new String[]{"[" + this.oldStartingPointHash + ".." + this.startPointHash + "]", + "[" + workerThread.dhtChunk.firstContainer().wordHash() + ".." + workerThread.dhtChunk.lastContainer().wordHash() + "]"}; } - return new String[]{"[" + oldStartingPointHash + ".." + startPointHash + "]","[------------..------------]"}; + return new String[]{"[" + this.oldStartingPointHash + ".." + this.startPointHash + "]","[------------..------------]"}; } public void performTransferWholeIndex() { plasmaDHTChunk newDHTChunk = null, oldDHTChunk = null; try { - // pausing the regular index distribution - // TODO: adding sync, to wait for a still running index distribution to finish - //plasmaWordIndexDistribution.paused = true; - // initial startingpoint of intex transfer is "------------" - log.logFine("Selected hash " + startPointHash + " as start point for index distribution of whole index"); + this.log.logFine("Selected hash " + this.startPointHash + " as start point for index distribution of whole index"); /* Loop until we have * - finished transfer of whole index * - detected a server shutdown or user interruption * - detected a failure */ - long selectionStart = System.currentTimeMillis(), selectionEnd = 0, selectionTime = 0, iteration = 0; + long iteration = 0; - while (!finished && !Thread.currentThread().isInterrupted()) { + while (!this.finished && !Thread.currentThread().isInterrupted()) { iteration++; - selectionStart = System.currentTimeMillis(); oldDHTChunk = newDHTChunk; // selecting 500 words to transfer this.status = "Running: Selecting chunk " + iteration; - newDHTChunk = new plasmaDHTChunk(log, wordIndex, sb.urlPool.loadedURL, this.chunkSize/3, this.chunkSize, this.startPointHash); + newDHTChunk = new plasmaDHTChunk(this.log, this.wordIndex, this.sb.urlPool.loadedURL, this.chunkSize/3, this.chunkSize, this.startPointHash); /* If we havn't selected a word chunk this could be because of * a) no words are left in the index * b) max open file limit was exceeded */ - if ((newDHTChunk == null) || - (newDHTChunk.containerSize() == 0) || - (newDHTChunk.getStatus() == plasmaDHTChunk.chunkStatus_FAILED)) { - if (sb.wordIndex.size() > 0) { + if (nothingSelected(newDHTChunk)) { + if (this.sb.wordIndex.size() > 0) { // if there are still words in the index we try it again now - startPointHash = "------------"; + this.startPointHash = "------------"; } else { // otherwise we could end transfer now - log.logFine("No index available for index transfer, hash start-point " + startPointHash); + this.log.logFine("No index available for index transfer, hash start-point " + this.startPointHash); this.status = "Finished. " + iteration + " chunks transfered."; - finished = true; + this.finished = true; } } else { // getting start point for next DHT-selection - oldStartingPointHash = startPointHash; - startPointHash = newDHTChunk.lastContainer().wordHash(); // DHT targets must have greater hashes + this.oldStartingPointHash = this.startPointHash; + this.startPointHash = newDHTChunk.lastContainer().wordHash(); // DHT targets must have greater hashes - selectionEnd = System.currentTimeMillis(); - selectionTime = selectionEnd - selectionStart; - log.logInfo("Index selection of " + newDHTChunk.indexCount() + " words [" + newDHTChunk.firstContainer().wordHash() + " .. " + newDHTChunk.lastContainer().wordHash() + "]" + + this.log.logInfo("Index selection of " + newDHTChunk.indexCount() + " words [" + newDHTChunk.firstContainer().wordHash() + " .. " + newDHTChunk.lastContainer().wordHash() + "]" + " in " + - (selectionTime / 1000) + " seconds (" + - (1000 * newDHTChunk.indexCount() / (selectionTime+1)) + " words/s)"); + (newDHTChunk.getSelectionTime() / 1000) + " seconds (" + + (1000 * newDHTChunk.indexCount() / (newDHTChunk.getSelectionTime()+1)) + " words/s)"); } // query status of old worker thread - if (worker != null) { + if (this.worker != null) { this.status = "Finished: Selecting chunk " + iteration; - worker.join(); - if (worker.dhtChunk.getStatus() != plasmaDHTChunk.chunkStatus_COMPLETE) { + this.worker.join(); + if (this.worker.getStatus() != plasmaDHTChunk.chunkStatus_COMPLETE) { // if the transfer failed we abort index transfer now - this.status = "Aborted because of Transfer error:\n" + worker.dhtChunk.getStatus(); + this.status = "Aborted because of Transfer error:\n" + this.worker.dhtChunk.getStatus(); // abort index transfer return; - } else { - /* - * If index transfer was done successfully we close all remaining open - * files that belong to the old index chunk and handover a new chunk - * to the transfer thread. - * Addintionally we recalculate the chunk size to optimize performance - */ - - this.chunkSize = worker.dhtChunk.indexCount(); - long transferTime = worker.getTransferTime(); - //TODO: only increase chunk Size if there is free memory left on the server - - // we need aprox. 73Byte per IndexEntity and an average URL length of 32 char - //if (ft.freeMemory() < 73*2*100) - if (transferTime > 60*1000) { - if (chunkSize>200) chunkSize-=100; - } else if (selectionTime < transferTime){ - this.chunkSize +=100; - //chunkSize+=50; - } else if (selectionTime >= selectionTime){ - if (chunkSize>200) chunkSize-=100; - } - - selectionStart = System.currentTimeMillis(); - - // deleting transfered words from index - if (delete) { - this.status = "Running: Deleting chunk " + iteration; - transferedEntryCount += oldDHTChunk.indexCount(); - transferedContainerCount += oldDHTChunk.containerSize(); - int urlReferences = oldDHTChunk.deleteTransferIndexes(); - log.logFine("Deleted from " + oldDHTChunk.containerSize() + " transferred RWIs locally " + urlReferences + " URL references"); - } else { - transferedEntryCount += oldDHTChunk.indexCount(); - transferedContainerCount += oldDHTChunk.containerSize(); - } - oldDHTChunk = null; } + + // calculationg the new transfer size + this.calculateNewChunkSize(); this.worker = null; + + // counting transfered containers / entries + this.transferedEntryCount += oldDHTChunk.indexCount(); + this.transferedContainerCount += oldDHTChunk.containerSize(); + + // deleting transfered words from index + if (this.delete) { + this.status = "Running: Deleting chunk " + iteration; + int urlReferences = oldDHTChunk.deleteTransferIndexes(); + this.log.logFine("Deleted from " + oldDHTChunk.containerSize() + " transferred RWIs locally " + urlReferences + " URL references"); + } + oldDHTChunk = null; } // handover chunk to transfer worker - if ((newDHTChunk != null) && - (newDHTChunk.containerSize() > 0) || - (newDHTChunk.getStatus() == plasmaDHTChunk.chunkStatus_FILLED)) { - worker = new plasmaDHTTransfer(log, seed, newDHTChunk, gzipBody4Transfer, timeout4Transfer, 5); - worker.start(); + if ((newDHTChunk.containerSize() > 0) || (newDHTChunk.getStatus() == plasmaDHTChunk.chunkStatus_FILLED)) { + this.worker = new plasmaDHTTransfer(this.log, this.seed, newDHTChunk, this.gzipBody4Transfer, this.timeout4Transfer, 5); + this.worker.start(); } } // if we reach this point we were aborted by the user or by server shutdown - if (sb.wordIndex.size() > 0) this.status = "aborted"; + if (this.sb.wordIndex.size() > 0) this.status = "aborted"; } catch (Exception e) { this.status = "Error: " + e.getMessage(); - log.logWarning("Index transfer to peer " + seed.getName() + ":" + seed.hash + " failed:'" + e.getMessage() + "'",e); + this.log.logWarning("Index transfer to peer " + this.seed.getName() + ":" + this.seed.hash + " failed:'" + e.getMessage() + "'",e); } finally { - if (worker != null) { - worker.stopIt(); - try {worker.join();}catch(Exception e){} - // worker = null; + if (this.worker != null) { + this.worker.stopIt(); + try {this.worker.join();}catch(Exception e){} } - - //plasmaWordIndexDistribution.paused = false; } } - + private void calculateNewChunkSize() { + // getting the transfered chunk size + this.chunkSize = this.worker.dhtChunk.indexCount(); + + // getting the chunk selection time + long selectionTime = this.worker.dhtChunk.getSelectionTime(); + + // getting the chunk transfer time + long transferTime = this.worker.getTransferTime(); + + // calculationg the new chunk size + if (transferTime > 60*1000 && this.chunkSize>200) { + this.chunkSize-=100; + } else if (selectionTime < transferTime){ + this.chunkSize +=100; + } else if (selectionTime >= selectionTime && this.chunkSize>200){ + this.chunkSize-=100; + } + } + + private static boolean nothingSelected(plasmaDHTChunk newDHTChunk) { + return (newDHTChunk == null) || + (newDHTChunk.containerSize() == 0) || + (newDHTChunk.getStatus() == plasmaDHTChunk.chunkStatus_FAILED); + } } diff --git a/source/de/anomic/plasma/plasmaDHTTransfer.java b/source/de/anomic/plasma/plasmaDHTTransfer.java index f7326092f..bbfc0e4f2 100644 --- a/source/de/anomic/plasma/plasmaDHTTransfer.java +++ b/source/de/anomic/plasma/plasmaDHTTransfer.java @@ -60,8 +60,6 @@ public class plasmaDHTTransfer extends Thread { private String transferStatusMessage = ""; // delivery destination - private yacySeed [] seeds = null; - private static int seedcount = 0; private yacySeed seed = null; // word chunk @@ -71,41 +69,26 @@ public class plasmaDHTTransfer extends Thread { private int maxRetry; serverLog log; - public plasmaDHTTransfer(serverLog log, yacySeed seed, plasmaDHTChunk dhtChunk, boolean gzipBody, int timeout, int retries) { - super(new ThreadGroup("TransferIndexThreadGroup"), "TransferIndexWorker_" + seed.getName()); + public plasmaDHTTransfer(serverLog log, yacySeed destSeed, plasmaDHTChunk dhtChunk, boolean gzipBody, int timeout, int retries) { + super(new ThreadGroup("TransferIndexThreadGroup"), "TransferIndexWorker_" + destSeed.getName()); this.log = log; this.gzipBody4Transfer = gzipBody; this.timeout4Transfer = timeout; this.dhtChunk = dhtChunk; this.maxRetry = retries; - seeds = new yacySeed[1]; - seeds[0] = seed; - } - - public plasmaDHTTransfer(serverLog log, yacySeed [] seeds, plasmaDHTChunk dhtChunk, boolean gzipBody, int timeout, int retries) { - super(new ThreadGroup("TransferIndexThreadGroup"), "TransferIndexWorker_" + seedcount); - this.log = log; - this.gzipBody4Transfer = gzipBody; - this.timeout4Transfer = timeout; - this.dhtChunk = dhtChunk; - this.maxRetry = retries; - this.seeds = seeds; + this.seed = destSeed; } public void run() { - while (getStatus() != plasmaDHTChunk.chunkStatus_COMPLETE && seedcount < seeds.length)try { - seed = seeds[seedcount++]; - uploadIndex(); + try { + this.uploadIndex(); } catch (InterruptedException e) { - // TODO Auto-generated catch block e.printStackTrace(); - } finally { - } } private boolean isAborted() { - if (stopped || Thread.currentThread().isInterrupted()) { + if (this.stopped || Thread.currentThread().isInterrupted()) { return true; } return false; @@ -120,11 +103,15 @@ public class plasmaDHTTransfer extends Thread { } public int getStatus() { - return transferStatus; + return this.transferStatus; } public String getStatusMessage() { - return transferStatusMessage; + return this.transferStatusMessage; + } + + public yacySeed getSeed() { + return this.seed; } public void uploadIndex() throws InterruptedException { @@ -133,78 +120,68 @@ public class plasmaDHTTransfer extends Thread { * - have successfully transfered the words list or * - the retry counter limit was exceeded */ - transferStatus = plasmaDHTChunk.chunkStatus_RUNNING; + this.transferStatus = plasmaDHTChunk.chunkStatus_RUNNING; long retryCount = 0, start = System.currentTimeMillis(); while (true) { // testing if we were aborted - if (isAborted()) return; + if (this.isAborted()) return; // transfering seleted words to remote peer - transferStatusMessage = "Running: Transfering chunk to target " + seed.hash + "/" + seed.getName(); - String error = yacyClient.transferIndex(seed, dhtChunk.containers(), dhtChunk.urlCacheMap(), gzipBody4Transfer, timeout4Transfer); + this.transferStatusMessage = "Running: Transfering chunk to target " + this.seed.hash + "/" + this.seed.getName(); + String error = yacyClient.transferIndex(this.seed, this.dhtChunk.containers(), this.dhtChunk.urlCacheMap(), this.gzipBody4Transfer, this.timeout4Transfer); if (error == null) { // words successfully transfered - transferTime = System.currentTimeMillis() - start; - this.log.logInfo("Index transfer of " + dhtChunk.indexCount() + " words [" + dhtChunk.firstContainer().wordHash() + " .. " + dhtChunk.lastContainer().wordHash() + "]" + " to peer " + seed.getName() + ":" + seed.hash + " in " + (transferTime / 1000) + " seconds successful (" - + (1000 * dhtChunk.indexCount() / (transferTime + 1)) + " words/s)"); + this.transferTime = System.currentTimeMillis() - start; + this.log.logInfo("Index transfer of " + this.dhtChunk.indexCount() + " words [" + this.dhtChunk.firstContainer().wordHash() + " .. " + this.dhtChunk.lastContainer().wordHash() + "]" + " to peer " + this.seed.getName() + ":" + this.seed.hash + " in " + (this.transferTime / 1000) + " seconds successful (" + + (1000 * this.dhtChunk.indexCount() / (this.transferTime + 1)) + " words/s)"); retryCount = 0; - transferStatusMessage = "Finished: Transfer of chunk to target " + seed.hash + "/" + seed.getName(); - transferStatus = plasmaDHTChunk.chunkStatus_COMPLETE; + this.transferStatusMessage = "Finished: Transfer of chunk to target " + this.seed.hash + "/" + this.seed.getName(); + this.transferStatus = plasmaDHTChunk.chunkStatus_COMPLETE; break; - } else { - // words transfer failed - - // inc retry counter - retryCount++; + } + + // inc retry counter + retryCount++; + + // testing if we were aborted ... + if (this.isAborted()) return; + + // we have lost the connection to the remote peer. Adding peer to disconnected list + this.log.logWarning("Index transfer to peer " + this.seed.getName() + ":" + this.seed.hash + " failed:'" + error + "', disconnecting peer"); + yacyCore.peerActions.peerDeparture(this.seed); + + // if the retry counter limit was not exceeded we'll retry it in a few seconds + this.transferStatusMessage = "Disconnected peer: " + ((retryCount > 5) ? error + ". Transfer aborted" : "Retry " + retryCount); + if (retryCount > this.maxRetry) { + this.transferStatus = plasmaDHTChunk.chunkStatus_FAILED; + return; + } + Thread.sleep(retryCount * 5000); + /* loop until + * - we have successfully done a peer ping or + * - the retry counter limit was exceeded + */ + while (true) { // testing if we were aborted ... - if (isAborted()) return; - - // we have lost the connection to the remote peer. Adding peer to disconnected list - this.log.logWarning("Index transfer to peer " + seed.getName() + ":" + seed.hash + " failed:'" + error + "', disconnecting peer"); - yacyCore.peerActions.peerDeparture(seed); - - // if the retry counter limit was not exceeded we'll retry it in a few seconds - transferStatusMessage = "Disconnected peer: " + ((retryCount > 5) ? error + ". Transfer aborted" : "Retry " + retryCount); - if (retryCount > maxRetry) { - transferStatus = plasmaDHTChunk.chunkStatus_FAILED; + if (this.isAborted()) return; + + // doing a peer ping to the remote seed + int added = yacyClient.publishMySeed(this.seed.getAddress(), this.seed.hash); + if (added < 0) { + // inc. retry counter + retryCount++; + this.transferStatusMessage = "Disconnected peer: Peer ping failed. " + ((retryCount > 5) ? "Transfer aborted." : "Retry " + retryCount); + if (retryCount > this.maxRetry) return; + Thread.sleep(retryCount * 5000); + continue; } - Thread.sleep(retryCount * 5000); - - /* loop until - * - we have successfully done a peer ping or - * - the retry counter limit was exceeded - */ - while (true) { - // testing if we were aborted ... - if (isAborted()) - return; - - // doing a peer ping to the remote seed - int added = yacyClient.publishMySeed(seed.getAddress(), seed.hash); - if (added < 0) { - // inc. retry counter - retryCount++; - transferStatusMessage = "Disconnected peer: Peer ping failed. " + ((retryCount > 5) ? "Transfer aborted." : "Retry " + retryCount); - if (retryCount > maxRetry) return; - Thread.sleep(retryCount * 5000); - continue; - } else { - yacyCore.seedDB.getConnected(seed.hash); - transferStatusMessage = "running"; - break; - } - } + + yacyCore.seedDB.getConnected(this.seed.hash); + this.transferStatusMessage = "running"; + break; } } } - - public static void setSeedcount(int seedcount) { - plasmaDHTTransfer.seedcount = seedcount; - } - - public static int getSeedcount() { - return seedcount; - } } diff --git a/source/de/anomic/plasma/plasmaSwitchboard.java b/source/de/anomic/plasma/plasmaSwitchboard.java index 766b5c9eb..76c5f58e1 100644 --- a/source/de/anomic/plasma/plasmaSwitchboard.java +++ b/source/de/anomic/plasma/plasmaSwitchboard.java @@ -111,6 +111,7 @@ import java.net.URL; import java.net.URLEncoder; import java.text.SimpleDateFormat; import java.util.ArrayList; +import java.util.Arrays; import java.util.Date; import java.util.HashMap; import java.util.Hashtable; @@ -2138,56 +2139,69 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser public boolean dhtTransferProcess(plasmaDHTChunk dhtChunk, int peerCount) { if ((yacyCore.seedDB == null) || (yacyCore.seedDB.sizeConnected() == 0)) return false; - // find a list of DHT-peers - yacySeed[] seeds = yacyCore.dhtAgent.getDHTTargets(log, peerCount, 10, dhtChunk.firstContainer().wordHash(), dhtChunk.lastContainer().wordHash(), 0.4); + try { + // find a list of DHT-peers + ArrayList seeds = new ArrayList(Arrays.asList(yacyCore.dhtAgent.getDHTTargets(log, peerCount, 10, dhtChunk.firstContainer().wordHash(), dhtChunk.lastContainer().wordHash(), 0.4))); + if (seeds.size() < peerCount) { + log.logWarning("found not enough (" + seeds.size() + ") peers for distribution"); + return false; + } - if (seeds.length < peerCount) { - log.logWarning("found not enough (" + seeds.length + ") peers for distribution"); - return false; - } + // send away the indexes to all these peers + int hc1 = 0; - // send away the indexes to all these peers - String peerNames = ""; - int hc1 = 0; - plasmaDHTTransfer.setSeedcount(0); - plasmaDHTTransfer [] transfer = new plasmaDHTTransfer[peerCount]; - for (int i = 0; i < transfer.length; i++) { - transfer[i] = new plasmaDHTTransfer(log, seeds, dhtChunk, - getConfig("indexDistribution.gzipBody","false").equalsIgnoreCase("true"), - (int)getConfigLong("indexDistribution.timeout",60000), 0); - transfer[i].start(); - } - - boolean DHTalive = true; - while(DHTalive) { - DHTalive = false; - try { - Thread.sleep(100); - } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - for (int i = 0; i < transfer.length; i++) { - if (transfer[i].isAlive()) DHTalive = true; - } - } - - for (int i = 0; i < transfer.length; i++) { - if (transfer[i].getStatus() == plasmaDHTChunk.chunkStatus_COMPLETE) { - peerNames += ", " + seeds[i].getName(); - hc1++; + // getting distribution configuration values + boolean gzipBody = getConfig("indexDistribution.gzipBody","false").equalsIgnoreCase("true"); + int timeout = (int)getConfigLong("indexDistribution.timeout",60000); + int retries = 0; + + // starting up multiple DHT transfer threads + Iterator seedIter = seeds.iterator(); + ArrayList transfer = new ArrayList(peerCount); + while (hc1 < peerCount && seedIter.hasNext()) { + + // starting up some transfer threads + int transferThreadCount = transfer.size(); + for (int i=0; i < peerCount-hc1-transferThreadCount; i++) { + if (seedIter.hasNext()) { + plasmaDHTTransfer t = new plasmaDHTTransfer(log, (yacySeed)seedIter.next(), dhtChunk,gzipBody,timeout,retries); + t.start(); + transfer.add(t); + } else { + break; + } + } + + // waiting for the transfer threads to finish + Iterator transferIter = transfer.iterator(); + while (transferIter.hasNext()) { + plasmaDHTTransfer t = (plasmaDHTTransfer)transferIter.next(); + if (!t.isAlive()) { + // remove finished thread from the list + transferIter.remove(); + + // count successful transfers + if (t.getStatus() == plasmaDHTChunk.chunkStatus_COMPLETE) { + this.log.logInfo("DHT distribution: transfer to peer " + t.getSeed().getName() + " finished."); + hc1++; + } + } + } + + if (hc1 < peerCount) Thread.sleep(100); } - } - if (peerNames.length() > 0) peerNames = peerNames.substring(2); // remove comma - // clean up and finish with deletion of indexes - if (hc1 >= peerCount) { - // success - return true; + // clean up and finish with deletion of indexes + if (hc1 >= peerCount) { + // success + return true; + } + this.log.logSevere("Index distribution failed. Too few peers (" + hc1 + ") received the index, not deleted locally."); + return false; + } catch (InterruptedException e) { + return false; } - this.log.logSevere("Index distribution failed. Too few peers (" + hc1 + ") received the index, not deleted locally."); - return false; } public void terminate() {