*) Bugfix for Index Transfer

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@2088 6c8d7289-2bf4-0310-a012-ef5d649a1542
theli 19 years ago
parent fb064fdbee
commit bcc950c533

@@ -111,9 +111,9 @@ public final class IndexTransfer_p {
         prop.put("running_transfer.twchunk", Integer.toString(chunk[1]));
-        //prop.put("running_twEntityCount",transfThread.getTransferedEntityCount());
+        prop.put("running_twEntityCount",switchboard.transferIdxThread.getTransferedContainerCount());
         prop.put("running_twEntryCount",switchboard.transferIdxThread.getTransferedEntryCount());
-        //prop.put("running_twEntityPercent",Float.toString(transfThread.getTransferedEntityPercent()));
+        prop.put("running_twEntityPercent",Float.toString(switchboard.transferIdxThread.getTransferedContainerPercent()));
         prop.put("running_twEntitySpeed",Integer.toString(switchboard.transferIdxThread.getTransferedEntitySpeed()));
         prop.put("running_deleteIndex", switchboard.transferIdxThread.deleteIndex()?1:0);

@@ -75,6 +75,9 @@ public class plasmaDHTChunk {
     private HashMap urlCache; // String (url-hash) / plasmaCrawlLURL.Entry
     private int idxCount;
 
+    private long selectionStartTime = 0;
+    private long selectionEndTime = 0;
+
     public plasmaWordIndexEntryContainer firstContainer() {
         return indexContainers[0];
     }
@@ -161,15 +164,20 @@ public class plasmaDHTChunk {
         return startPointHash;
     }
 
     private void selectTransferContainers(String hash, int mincount, int maxcount) {
-        int refcountRAM = selectTransferContainersResource(hash, plasmaWordIndex.RL_RAMCACHE, maxcount);
-        if (refcountRAM >= mincount) {
-            log.logFine("DHT selection from RAM: " + refcountRAM + " entries");
+        try {
+            this.selectionStartTime = System.currentTimeMillis();
+            int refcountRAM = selectTransferContainersResource(hash, plasmaWordIndex.RL_RAMCACHE, maxcount);
+            if (refcountRAM >= mincount) {
+                log.logFine("DHT selection from RAM: " + refcountRAM + " entries");
+                return;
+            }
+            int refcountFile = selectTransferContainersResource(hash, plasmaWordIndex.RL_WORDFILES, maxcount);
+            log.logFine("DHT selection from FILE: " + refcountFile + " entries, RAM provided only " + refcountRAM + " entries");
             return;
+        } finally {
+            this.selectionEndTime = System.currentTimeMillis();
         }
-        int refcountFile = selectTransferContainersResource(hash, plasmaWordIndex.RL_WORDFILES, maxcount);
-        log.logFine("DHT selection from FILE: " + refcountFile + " entries, RAM provided only " + refcountRAM + " entries");
-        return;
     }
 
     private int selectTransferContainersResource(String hash, int resourceLevel, int maxcount) {
@@ -277,4 +285,8 @@ public class plasmaDHTChunk {
         return count;
     }
 
+    public long getSelectionTime() {
+        if (this.selectionStartTime == 0 || this.selectionEndTime == 0) return -1;
+        return this.selectionEndTime - this.selectionStartTime;
+    }
 }
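The try/finally stopwatch added above guarantees that selectionEndTime is recorded even when selectTransferContainers() returns early from the RAM branch. A minimal standalone sketch of the same pattern (class and method names here are illustrative, not part of the YaCy sources):

public class SelectionTimer {
    private long startTime = 0;
    private long endTime = 0;

    // run a task and always record its duration, even on early return or exception
    public void runTimed(Runnable task) {
        try {
            this.startTime = System.currentTimeMillis();
            task.run();
        } finally {
            this.endTime = System.currentTimeMillis();
        }
    }

    // -1 signals "not measured yet", mirroring the convention of getSelectionTime()
    public long elapsedMillis() {
        if (this.startTime == 0 || this.endTime == 0) return -1;
        return this.endTime - this.startTime;
    }
}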

@@ -70,14 +70,14 @@ public class plasmaDHTFlush extends Thread {
         this.seed = seed;
         this.delete = delete;
         this.sb = plasmaSwitchboard.getSwitchboard();
-        this.initialWordsDBSize = sb.wordIndex.size();
+        this.initialWordsDBSize = this.sb.wordIndex.size();
         this.gzipBody4Transfer = gzipBody;
         this.timeout4Transfer = timeout;
         //this.maxOpenFiles4Transfer = (int) sb.getConfigLong("indexTransfer.maxOpenFiles",800);
     }
 
     public void run() {
-        performTransferWholeIndex();
+        this.performTransferWholeIndex();
     }
 
     public void stopIt(boolean wait) throws InterruptedException {
@@ -110,17 +110,17 @@ public class plasmaDHTFlush extends Thread {
     }
 
     public float getTransferedContainerPercent() {
-        long currentWordsDBSize = sb.wordIndex.size();
-        if (initialWordsDBSize == 0) return 100;
-        else if (currentWordsDBSize >= initialWordsDBSize) return 0;
+        long currentWordsDBSize = this.sb.wordIndex.size();
+        if (this.initialWordsDBSize == 0) return 100;
+        else if (currentWordsDBSize >= this.initialWordsDBSize) return 0;
         //else return (float) ((initialWordsDBSize-currentWordsDBSize)/(initialWordsDBSize/100));
-        else return (float)(this.transferedContainerCount*100/initialWordsDBSize);
+        else return (this.transferedContainerCount*100/this.initialWordsDBSize);
     }
 
     public int getTransferedEntitySpeed() {
-        long transferTime = System.currentTimeMillis() - startingTime;
+        long transferTime = System.currentTimeMillis() - this.startingTime;
         if (transferTime <= 0) transferTime = 1;
-        return (int) ((1000 * transferedEntryCount) / transferTime);
+        return (int) ((1000 * this.transferedEntryCount) / transferTime);
     }
 
     public yacySeed getSeed() {
@@ -138,144 +138,132 @@ public class plasmaDHTFlush extends Thread {
     public String[] getRange() {
         plasmaDHTTransfer workerThread = this.worker;
         if (workerThread != null) {
-            return new String[]{"[" + oldStartingPointHash + ".." + startPointHash + "]",
-                                "[" + workerThread.dhtChunk.firstContainer().hashCode() + ".." + workerThread.dhtChunk.lastContainer().hashCode() + "]"};
+            return new String[]{"[" + this.oldStartingPointHash + ".." + this.startPointHash + "]",
+                                "[" + workerThread.dhtChunk.firstContainer().wordHash() + ".." + workerThread.dhtChunk.lastContainer().wordHash() + "]"};
         }
-        return new String[]{"[" + oldStartingPointHash + ".." + startPointHash + "]","[------------..------------]"};
+        return new String[]{"[" + this.oldStartingPointHash + ".." + this.startPointHash + "]","[------------..------------]"};
     }
 
     public void performTransferWholeIndex() {
         plasmaDHTChunk newDHTChunk = null, oldDHTChunk = null;
         try {
-            // pausing the regular index distribution
-            // TODO: adding sync, to wait for a still running index distribution to finish
-            //plasmaWordIndexDistribution.paused = true;
             // initial startingpoint of intex transfer is "------------"
-            log.logFine("Selected hash " + startPointHash + " as start point for index distribution of whole index");
+            this.log.logFine("Selected hash " + this.startPointHash + " as start point for index distribution of whole index");
 
             /* Loop until we have
              * - finished transfer of whole index
              * - detected a server shutdown or user interruption
             * - detected a failure
             */
-            long selectionStart = System.currentTimeMillis(), selectionEnd = 0, selectionTime = 0, iteration = 0;
-            while (!finished && !Thread.currentThread().isInterrupted()) {
+            long iteration = 0;
+            while (!this.finished && !Thread.currentThread().isInterrupted()) {
                 iteration++;
-                selectionStart = System.currentTimeMillis();
                 oldDHTChunk = newDHTChunk;
 
                 // selecting 500 words to transfer
                 this.status = "Running: Selecting chunk " + iteration;
-                newDHTChunk = new plasmaDHTChunk(log, wordIndex, sb.urlPool.loadedURL, this.chunkSize/3, this.chunkSize, this.startPointHash);
+                newDHTChunk = new plasmaDHTChunk(this.log, this.wordIndex, this.sb.urlPool.loadedURL, this.chunkSize/3, this.chunkSize, this.startPointHash);
 
                 /* If we havn't selected a word chunk this could be because of
                  * a) no words are left in the index
                  * b) max open file limit was exceeded
                  */
-                if ((newDHTChunk == null) ||
-                    (newDHTChunk.containerSize() == 0) ||
-                    (newDHTChunk.getStatus() == plasmaDHTChunk.chunkStatus_FAILED)) {
-                    if (sb.wordIndex.size() > 0) {
+                if (nothingSelected(newDHTChunk)) {
+                    if (this.sb.wordIndex.size() > 0) {
                         // if there are still words in the index we try it again now
-                        startPointHash = "------------";
+                        this.startPointHash = "------------";
                     } else {
                         // otherwise we could end transfer now
-                        log.logFine("No index available for index transfer, hash start-point " + startPointHash);
+                        this.log.logFine("No index available for index transfer, hash start-point " + this.startPointHash);
                         this.status = "Finished. " + iteration + " chunks transfered.";
-                        finished = true;
+                        this.finished = true;
                     }
                 } else {
                     // getting start point for next DHT-selection
-                    oldStartingPointHash = startPointHash;
-                    startPointHash = newDHTChunk.lastContainer().wordHash(); // DHT targets must have greater hashes
-                    selectionEnd = System.currentTimeMillis();
-                    selectionTime = selectionEnd - selectionStart;
-                    log.logInfo("Index selection of " + newDHTChunk.indexCount() + " words [" + newDHTChunk.firstContainer().wordHash() + " .. " + newDHTChunk.lastContainer().wordHash() + "]" +
+                    this.oldStartingPointHash = this.startPointHash;
+                    this.startPointHash = newDHTChunk.lastContainer().wordHash(); // DHT targets must have greater hashes
+                    this.log.logInfo("Index selection of " + newDHTChunk.indexCount() + " words [" + newDHTChunk.firstContainer().wordHash() + " .. " + newDHTChunk.lastContainer().wordHash() + "]" +
                             " in " +
-                            (selectionTime / 1000) + " seconds (" +
-                            (1000 * newDHTChunk.indexCount() / (selectionTime+1)) + " words/s)");
+                            (newDHTChunk.getSelectionTime() / 1000) + " seconds (" +
+                            (1000 * newDHTChunk.indexCount() / (newDHTChunk.getSelectionTime()+1)) + " words/s)");
                 }
 
                 // query status of old worker thread
-                if (worker != null) {
+                if (this.worker != null) {
                     this.status = "Finished: Selecting chunk " + iteration;
-                    worker.join();
-                    if (worker.dhtChunk.getStatus() != plasmaDHTChunk.chunkStatus_COMPLETE) {
+                    this.worker.join();
+                    if (this.worker.getStatus() != plasmaDHTChunk.chunkStatus_COMPLETE) {
                         // if the transfer failed we abort index transfer now
-                        this.status = "Aborted because of Transfer error:\n" + worker.dhtChunk.getStatus();
+                        this.status = "Aborted because of Transfer error:\n" + this.worker.dhtChunk.getStatus();
                         // abort index transfer
                         return;
-                    } else {
-                        /*
-                         * If index transfer was done successfully we close all remaining open
-                         * files that belong to the old index chunk and handover a new chunk
-                         * to the transfer thread.
-                         * Addintionally we recalculate the chunk size to optimize performance
-                         */
-                        this.chunkSize = worker.dhtChunk.indexCount();
-                        long transferTime = worker.getTransferTime();
-                        //TODO: only increase chunk Size if there is free memory left on the server
-                        // we need aprox. 73Byte per IndexEntity and an average URL length of 32 char
-                        //if (ft.freeMemory() < 73*2*100)
-                        if (transferTime > 60*1000) {
-                            if (chunkSize>200) chunkSize-=100;
-                        } else if (selectionTime < transferTime){
-                            this.chunkSize +=100;
-                            //chunkSize+=50;
-                        } else if (selectionTime >= selectionTime){
-                            if (chunkSize>200) chunkSize-=100;
-                        }
-                        selectionStart = System.currentTimeMillis();
-                        // deleting transfered words from index
-                        if (delete) {
-                            this.status = "Running: Deleting chunk " + iteration;
-                            transferedEntryCount += oldDHTChunk.indexCount();
-                            transferedContainerCount += oldDHTChunk.containerSize();
-                            int urlReferences = oldDHTChunk.deleteTransferIndexes();
-                            log.logFine("Deleted from " + oldDHTChunk.containerSize() + " transferred RWIs locally " + urlReferences + " URL references");
-                        } else {
-                            transferedEntryCount += oldDHTChunk.indexCount();
-                            transferedContainerCount += oldDHTChunk.containerSize();
-                        }
-                        oldDHTChunk = null;
                     }
+
+                    // calculationg the new transfer size
+                    this.calculateNewChunkSize();
                     this.worker = null;
+
+                    // counting transfered containers / entries
+                    this.transferedEntryCount += oldDHTChunk.indexCount();
+                    this.transferedContainerCount += oldDHTChunk.containerSize();
+
+                    // deleting transfered words from index
+                    if (this.delete) {
+                        this.status = "Running: Deleting chunk " + iteration;
+                        int urlReferences = oldDHTChunk.deleteTransferIndexes();
+                        this.log.logFine("Deleted from " + oldDHTChunk.containerSize() + " transferred RWIs locally " + urlReferences + " URL references");
+                    }
+                    oldDHTChunk = null;
                 }
 
                 // handover chunk to transfer worker
-                if ((newDHTChunk != null) &&
-                    (newDHTChunk.containerSize() > 0) ||
-                    (newDHTChunk.getStatus() == plasmaDHTChunk.chunkStatus_FILLED)) {
-                    worker = new plasmaDHTTransfer(log, seed, newDHTChunk, gzipBody4Transfer, timeout4Transfer, 5);
-                    worker.start();
+                if ((newDHTChunk.containerSize() > 0) || (newDHTChunk.getStatus() == plasmaDHTChunk.chunkStatus_FILLED)) {
+                    this.worker = new plasmaDHTTransfer(this.log, this.seed, newDHTChunk, this.gzipBody4Transfer, this.timeout4Transfer, 5);
+                    this.worker.start();
                 }
             }
 
             // if we reach this point we were aborted by the user or by server shutdown
-            if (sb.wordIndex.size() > 0) this.status = "aborted";
+            if (this.sb.wordIndex.size() > 0) this.status = "aborted";
         } catch (Exception e) {
             this.status = "Error: " + e.getMessage();
-            log.logWarning("Index transfer to peer " + seed.getName() + ":" + seed.hash + " failed:'" + e.getMessage() + "'",e);
+            this.log.logWarning("Index transfer to peer " + this.seed.getName() + ":" + this.seed.hash + " failed:'" + e.getMessage() + "'",e);
         } finally {
-            if (worker != null) {
-                worker.stopIt();
-                try {worker.join();}catch(Exception e){}
-                // worker = null;
+            if (this.worker != null) {
+                this.worker.stopIt();
+                try {this.worker.join();}catch(Exception e){}
             }
-            //plasmaWordIndexDistribution.paused = false;
         }
     }
 
+    private void calculateNewChunkSize() {
+        // getting the transfered chunk size
+        this.chunkSize = this.worker.dhtChunk.indexCount();
+
+        // getting the chunk selection time
+        long selectionTime = this.worker.dhtChunk.getSelectionTime();
+
+        // getting the chunk transfer time
+        long transferTime = this.worker.getTransferTime();
+
+        // calculationg the new chunk size
+        if (transferTime > 60*1000 && this.chunkSize>200) {
+            this.chunkSize-=100;
+        } else if (selectionTime < transferTime){
+            this.chunkSize +=100;
+        } else if (selectionTime >= selectionTime && this.chunkSize>200){
+            this.chunkSize-=100;
+        }
+    }
+
+    private static boolean nothingSelected(plasmaDHTChunk newDHTChunk) {
+        return (newDHTChunk == null) ||
+               (newDHTChunk.containerSize() == 0) ||
+               (newDHTChunk.getStatus() == plasmaDHTChunk.chunkStatus_FAILED);
+    }
 }
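The chunk-size heuristic that performTransferWholeIndex() previously computed inline is now factored out into calculateNewChunkSize(): shrink the chunk when a transfer takes longer than a minute, grow it while selection is cheaper than transfer, otherwise shrink again. Note that the last branch compares selectionTime with itself, which is always true; the surrounding logic suggests a comparison against transferTime was intended. A sketch of that presumed intent (the standalone method below is illustrative, not the committed code):

// Illustrative restatement of the heuristic; 'selectionTime >= transferTime' in the last
// branch is an assumption about the intent behind the committed 'selectionTime >= selectionTime'.
static int adjustChunkSize(int chunkSize, long selectionTimeMs, long transferTimeMs) {
    if (transferTimeMs > 60 * 1000 && chunkSize > 200) {
        return chunkSize - 100;   // transfer too slow: shrink the chunk
    } else if (selectionTimeMs < transferTimeMs) {
        return chunkSize + 100;   // selection is cheap relative to transfer: grow
    } else if (selectionTimeMs >= transferTimeMs && chunkSize > 200) {
        return chunkSize - 100;   // selection dominates: shrink
    }
    return chunkSize;
}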

@@ -60,8 +60,6 @@ public class plasmaDHTTransfer extends Thread {
     private String transferStatusMessage = "";
 
     // delivery destination
-    private yacySeed [] seeds = null;
-    private static int seedcount = 0;
     private yacySeed seed = null;
 
     // word chunk
@@ -71,41 +69,26 @@ public class plasmaDHTTransfer extends Thread {
     private int maxRetry;
 
     serverLog log;
 
-    public plasmaDHTTransfer(serverLog log, yacySeed seed, plasmaDHTChunk dhtChunk, boolean gzipBody, int timeout, int retries) {
-        super(new ThreadGroup("TransferIndexThreadGroup"), "TransferIndexWorker_" + seed.getName());
+    public plasmaDHTTransfer(serverLog log, yacySeed destSeed, plasmaDHTChunk dhtChunk, boolean gzipBody, int timeout, int retries) {
+        super(new ThreadGroup("TransferIndexThreadGroup"), "TransferIndexWorker_" + destSeed.getName());
         this.log = log;
         this.gzipBody4Transfer = gzipBody;
         this.timeout4Transfer = timeout;
         this.dhtChunk = dhtChunk;
        this.maxRetry = retries;
-        seeds = new yacySeed[1];
-        seeds[0] = seed;
-    }
-
-    public plasmaDHTTransfer(serverLog log, yacySeed [] seeds, plasmaDHTChunk dhtChunk, boolean gzipBody, int timeout, int retries) {
-        super(new ThreadGroup("TransferIndexThreadGroup"), "TransferIndexWorker_" + seedcount);
-        this.log = log;
-        this.gzipBody4Transfer = gzipBody;
-        this.timeout4Transfer = timeout;
-        this.dhtChunk = dhtChunk;
-        this.maxRetry = retries;
-        this.seeds = seeds;
+        this.seed = destSeed;
     }
 
     public void run() {
-        while (getStatus() != plasmaDHTChunk.chunkStatus_COMPLETE && seedcount < seeds.length)try {
-            seed = seeds[seedcount++];
-            uploadIndex();
+        try {
+            this.uploadIndex();
         } catch (InterruptedException e) {
-            // TODO Auto-generated catch block
             e.printStackTrace();
-        } finally {
         }
     }
 
     private boolean isAborted() {
-        if (stopped || Thread.currentThread().isInterrupted()) {
+        if (this.stopped || Thread.currentThread().isInterrupted()) {
             return true;
         }
         return false;
@@ -120,11 +103,15 @@ public class plasmaDHTTransfer extends Thread {
     }
 
     public int getStatus() {
-        return transferStatus;
+        return this.transferStatus;
     }
 
     public String getStatusMessage() {
-        return transferStatusMessage;
+        return this.transferStatusMessage;
+    }
+
+    public yacySeed getSeed() {
+        return this.seed;
     }
 
     public void uploadIndex() throws InterruptedException {
@@ -133,78 +120,68 @@ public class plasmaDHTTransfer extends Thread {
         * - have successfully transfered the words list or
         * - the retry counter limit was exceeded
         */
-        transferStatus = plasmaDHTChunk.chunkStatus_RUNNING;
+        this.transferStatus = plasmaDHTChunk.chunkStatus_RUNNING;
         long retryCount = 0, start = System.currentTimeMillis();
         while (true) {
             // testing if we were aborted
-            if (isAborted()) return;
+            if (this.isAborted()) return;
 
             // transfering seleted words to remote peer
-            transferStatusMessage = "Running: Transfering chunk to target " + seed.hash + "/" + seed.getName();
-            String error = yacyClient.transferIndex(seed, dhtChunk.containers(), dhtChunk.urlCacheMap(), gzipBody4Transfer, timeout4Transfer);
+            this.transferStatusMessage = "Running: Transfering chunk to target " + this.seed.hash + "/" + this.seed.getName();
+            String error = yacyClient.transferIndex(this.seed, this.dhtChunk.containers(), this.dhtChunk.urlCacheMap(), this.gzipBody4Transfer, this.timeout4Transfer);
             if (error == null) {
                 // words successfully transfered
-                transferTime = System.currentTimeMillis() - start;
-                this.log.logInfo("Index transfer of " + dhtChunk.indexCount() + " words [" + dhtChunk.firstContainer().wordHash() + " .. " + dhtChunk.lastContainer().wordHash() + "]" + " to peer " + seed.getName() + ":" + seed.hash + " in " + (transferTime / 1000) + " seconds successful ("
-                        + (1000 * dhtChunk.indexCount() / (transferTime + 1)) + " words/s)");
+                this.transferTime = System.currentTimeMillis() - start;
+                this.log.logInfo("Index transfer of " + this.dhtChunk.indexCount() + " words [" + this.dhtChunk.firstContainer().wordHash() + " .. " + this.dhtChunk.lastContainer().wordHash() + "]" + " to peer " + this.seed.getName() + ":" + this.seed.hash + " in " + (this.transferTime / 1000) + " seconds successful ("
+                        + (1000 * this.dhtChunk.indexCount() / (this.transferTime + 1)) + " words/s)");
                 retryCount = 0;
-                transferStatusMessage = "Finished: Transfer of chunk to target " + seed.hash + "/" + seed.getName();
-                transferStatus = plasmaDHTChunk.chunkStatus_COMPLETE;
+                this.transferStatusMessage = "Finished: Transfer of chunk to target " + this.seed.hash + "/" + this.seed.getName();
+                this.transferStatus = plasmaDHTChunk.chunkStatus_COMPLETE;
                 break;
-            } else {
-                // words transfer failed
+            }
 
-                // inc retry counter
-                retryCount++;
+            // inc retry counter
+            retryCount++;
 
-                // testing if we were aborted ...
-                if (isAborted()) return;
+            // testing if we were aborted ...
+            if (this.isAborted()) return;
 
-                // we have lost the connection to the remote peer. Adding peer to disconnected list
-                this.log.logWarning("Index transfer to peer " + seed.getName() + ":" + seed.hash + " failed:'" + error + "', disconnecting peer");
-                yacyCore.peerActions.peerDeparture(seed);
+            // we have lost the connection to the remote peer. Adding peer to disconnected list
+            this.log.logWarning("Index transfer to peer " + this.seed.getName() + ":" + this.seed.hash + " failed:'" + error + "', disconnecting peer");
+            yacyCore.peerActions.peerDeparture(this.seed);
 
-                // if the retry counter limit was not exceeded we'll retry it in a few seconds
-                transferStatusMessage = "Disconnected peer: " + ((retryCount > 5) ? error + ". Transfer aborted" : "Retry " + retryCount);
-                if (retryCount > maxRetry) {
-                    transferStatus = plasmaDHTChunk.chunkStatus_FAILED;
-                    return;
-                }
-                Thread.sleep(retryCount * 5000);
+            // if the retry counter limit was not exceeded we'll retry it in a few seconds
+            this.transferStatusMessage = "Disconnected peer: " + ((retryCount > 5) ? error + ". Transfer aborted" : "Retry " + retryCount);
+            if (retryCount > this.maxRetry) {
+                this.transferStatus = plasmaDHTChunk.chunkStatus_FAILED;
+                return;
+            }
+            Thread.sleep(retryCount * 5000);
 
-                /* loop until
-                 * - we have successfully done a peer ping or
-                 * - the retry counter limit was exceeded
-                 */
-                while (true) {
-                    // testing if we were aborted ...
-                    if (isAborted())
-                        return;
+            /* loop until
+             * - we have successfully done a peer ping or
+             * - the retry counter limit was exceeded
+             */
+            while (true) {
+                // testing if we were aborted ...
+                if (this.isAborted())
+                    return;
 
-                    // doing a peer ping to the remote seed
-                    int added = yacyClient.publishMySeed(seed.getAddress(), seed.hash);
-                    if (added < 0) {
-                        // inc. retry counter
-                        retryCount++;
-                        transferStatusMessage = "Disconnected peer: Peer ping failed. " + ((retryCount > 5) ? "Transfer aborted." : "Retry " + retryCount);
-                        if (retryCount > maxRetry) return;
-                        Thread.sleep(retryCount * 5000);
-                        continue;
-                    } else {
-                        yacyCore.seedDB.getConnected(seed.hash);
-                        transferStatusMessage = "running";
-                        break;
-                    }
-                }
+                // doing a peer ping to the remote seed
+                int added = yacyClient.publishMySeed(this.seed.getAddress(), this.seed.hash);
+                if (added < 0) {
+                    // inc. retry counter
+                    retryCount++;
+                    this.transferStatusMessage = "Disconnected peer: Peer ping failed. " + ((retryCount > 5) ? "Transfer aborted." : "Retry " + retryCount);
+                    if (retryCount > this.maxRetry) return;
+                    Thread.sleep(retryCount * 5000);
+                    continue;
+                }
+
+                yacyCore.seedDB.getConnected(this.seed.hash);
+                this.transferStatusMessage = "running";
+                break;
             }
         }
     }
-
-    public static void setSeedcount(int seedcount) {
-        plasmaDHTTransfer.seedcount = seedcount;
-    }
-
-    public static int getSeedcount() {
-        return seedcount;
-    }
 }
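With the seed array and the static seedcount gone, each plasmaDHTTransfer now serves exactly one destination seed, and uploadIndex() retries against that seed with a linear back-off (retryCount * 5000 ms), re-pinging the peer before the next attempt. A condensed sketch of that control flow in modern Java, with the two network calls passed in as suppliers because yacyClient.transferIndex(...) and yacyClient.publishMySeed(...) are stubbed out here:

import java.util.function.BooleanSupplier;

public class RetrySketch {
    // transferOnce stands in for a successful yacyClient.transferIndex(...) call,
    // pingPeer for a successful yacyClient.publishMySeed(...) round trip.
    public static boolean uploadWithRetry(BooleanSupplier transferOnce,
                                          BooleanSupplier pingPeer,
                                          int maxRetry) throws InterruptedException {
        int retryCount = 0;
        while (true) {
            if (transferOnce.getAsBoolean()) return true;   // transfer succeeded
            retryCount++;
            if (retryCount > maxRetry) return false;        // retry limit exceeded
            Thread.sleep(retryCount * 5000L);               // linear back-off
            while (!pingPeer.getAsBoolean()) {              // wait until the peer answers again
                retryCount++;
                if (retryCount > maxRetry) return false;
                Thread.sleep(retryCount * 5000L);
            }
        }
    }
}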

@@ -111,6 +111,7 @@ import java.net.URL;
 import java.net.URLEncoder;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Hashtable;
@@ -2138,56 +2139,69 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
     public boolean dhtTransferProcess(plasmaDHTChunk dhtChunk, int peerCount) {
         if ((yacyCore.seedDB == null) || (yacyCore.seedDB.sizeConnected() == 0)) return false;
 
-        // find a list of DHT-peers
-        yacySeed[] seeds = yacyCore.dhtAgent.getDHTTargets(log, peerCount, 10, dhtChunk.firstContainer().wordHash(), dhtChunk.lastContainer().wordHash(), 0.4);
-
-        if (seeds.length < peerCount) {
-            log.logWarning("found not enough (" + seeds.length + ") peers for distribution");
-            return false;
-        }
-
-        // send away the indexes to all these peers
-        String peerNames = "";
-        int hc1 = 0;
-        plasmaDHTTransfer.setSeedcount(0);
-        plasmaDHTTransfer [] transfer = new plasmaDHTTransfer[peerCount];
-        for (int i = 0; i < transfer.length; i++) {
-            transfer[i] = new plasmaDHTTransfer(log, seeds, dhtChunk,
-                    getConfig("indexDistribution.gzipBody","false").equalsIgnoreCase("true"),
-                    (int)getConfigLong("indexDistribution.timeout",60000), 0);
-            transfer[i].start();
-        }
-
-        boolean DHTalive = true;
-        while(DHTalive) {
-            DHTalive = false;
-            try {
-                Thread.sleep(100);
-            } catch (InterruptedException e) {
-                // TODO Auto-generated catch block
-                e.printStackTrace();
-            }
-            for (int i = 0; i < transfer.length; i++) {
-                if (transfer[i].isAlive()) DHTalive = true;
-            }
-        }
-
-        for (int i = 0; i < transfer.length; i++) {
-            if (transfer[i].getStatus() == plasmaDHTChunk.chunkStatus_COMPLETE) {
-                peerNames += ", " + seeds[i].getName();
-                hc1++;
-            }
-        }
-
-        if (peerNames.length() > 0) peerNames = peerNames.substring(2); // remove comma
-
-        // clean up and finish with deletion of indexes
-        if (hc1 >= peerCount) {
-            // success
-            return true;
-        }
-        this.log.logSevere("Index distribution failed. Too few peers (" + hc1 + ") received the index, not deleted locally.");
-        return false;
+        try {
+            // find a list of DHT-peers
+            ArrayList seeds = new ArrayList(Arrays.asList(yacyCore.dhtAgent.getDHTTargets(log, peerCount, 10, dhtChunk.firstContainer().wordHash(), dhtChunk.lastContainer().wordHash(), 0.4)));
+            if (seeds.size() < peerCount) {
+                log.logWarning("found not enough (" + seeds.size() + ") peers for distribution");
+                return false;
+            }
+
+            // send away the indexes to all these peers
+            int hc1 = 0;
+
+            // getting distribution configuration values
+            boolean gzipBody = getConfig("indexDistribution.gzipBody","false").equalsIgnoreCase("true");
+            int timeout = (int)getConfigLong("indexDistribution.timeout",60000);
+            int retries = 0;
+
+            // starting up multiple DHT transfer threads
+            Iterator seedIter = seeds.iterator();
+            ArrayList transfer = new ArrayList(peerCount);
+            while (hc1 < peerCount && seedIter.hasNext()) {
+
+                // starting up some transfer threads
+                int transferThreadCount = transfer.size();
+                for (int i=0; i < peerCount-hc1-transferThreadCount; i++) {
+                    if (seedIter.hasNext()) {
+                        plasmaDHTTransfer t = new plasmaDHTTransfer(log, (yacySeed)seedIter.next(), dhtChunk,gzipBody,timeout,retries);
+                        t.start();
+                        transfer.add(t);
+                    } else {
+                        break;
+                    }
+                }
+
+                // waiting for the transfer threads to finish
+                Iterator transferIter = transfer.iterator();
+                while (transferIter.hasNext()) {
+                    plasmaDHTTransfer t = (plasmaDHTTransfer)transferIter.next();
+                    if (!t.isAlive()) {
+                        // remove finished thread from the list
+                        transferIter.remove();
+
+                        // count successful transfers
+                        if (t.getStatus() == plasmaDHTChunk.chunkStatus_COMPLETE) {
+                            this.log.logInfo("DHT distribution: transfer to peer " + t.getSeed().getName() + " finished.");
+                            hc1++;
+                        }
+                    }
+                }
+
+                if (hc1 < peerCount) Thread.sleep(100);
+            }
+
+            // clean up and finish with deletion of indexes
+            if (hc1 >= peerCount) {
+                // success
+                return true;
+            }
+            this.log.logSevere("Index distribution failed. Too few peers (" + hc1 + ") received the index, not deleted locally.");
+            return false;
+        } catch (InterruptedException e) {
+            return false;
+        }
     }
 
     public void terminate() {
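dhtTransferProcess() now starts one single-seed plasmaDHTTransfer per target, keeps the pool topped up to peerCount live workers, reaps finished threads every 100 ms and counts only transfers that ended with chunkStatus_COMPLETE. A simplified sketch of that scheduling loop over plain threads (the helper class and its names are illustrative, and for brevity every finished worker is counted as a success here):

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class TransferPoolSketch {
    // Keep up to 'wanted' workers alive, reap finished ones, stop once 'wanted' have finished.
    public static boolean distribute(List<Thread> candidates, int wanted) throws InterruptedException {
        int finished = 0;
        Iterator<Thread> next = candidates.iterator();
        List<Thread> running = new ArrayList<>();
        while (finished < wanted && (next.hasNext() || !running.isEmpty())) {
            // top the pool up with fresh workers
            while (running.size() < wanted - finished && next.hasNext()) {
                Thread t = next.next();
                t.start();
                running.add(t);
            }
            // reap workers that have terminated (the real code also checks getStatus() here)
            for (Iterator<Thread> it = running.iterator(); it.hasNext();) {
                Thread t = it.next();
                if (!t.isAlive()) {
                    it.remove();
                    finished++;
                }
            }
            if (finished < wanted) Thread.sleep(100);
        }
        return finished >= wanted;
    }
}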
