*)removed multiple DHT_Distribution_Threads

*)boosted DHT_Distribution sending chunk parallel to multiple peers

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@1890 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
hydrox 19 years ago
parent 7605ddd450
commit 8da13088e9

@ -60,6 +60,8 @@ public class plasmaDHTTransfer extends Thread {
private String transferStatusMessage = "";
// delivery destination
private static yacySeed [] seeds = null;
private static int seedcount = 0;
yacySeed seed = null;
// word chunk
@ -74,13 +76,25 @@ public class plasmaDHTTransfer extends Thread {
this.log = log;
this.gzipBody4Transfer = gzipBody;
this.timeout4Transfer = timeout;
this.seed = seed;
this.dhtChunk = dhtChunk;
this.maxRetry = retries;
seeds = new yacySeed[1];
seeds[0] = seed;
}
public plasmaDHTTransfer(serverLog log, yacySeed [] seeds, plasmaDHTChunk dhtChunk, boolean gzipBody, int timeout, int retries) {
super(new ThreadGroup("TransferIndexThreadGroup"), "TransferIndexWorker_" + seedcount);
this.log = log;
this.gzipBody4Transfer = gzipBody;
this.timeout4Transfer = timeout;
this.dhtChunk = dhtChunk;
this.maxRetry = retries;
setSeeds(seeds);
}
public void run() {
try {
while (getStatus() != plasmaDHTChunk.chunkStatus_COMPLETE && seedcount < seeds.length)try {
seed = seeds[seedcount++];
uploadIndex();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
@ -89,7 +103,7 @@ public class plasmaDHTTransfer extends Thread {
}
}
private boolean isAborted() {
if (stopped || Thread.currentThread().isInterrupted()) {
return true;
@ -185,5 +199,16 @@ public class plasmaDHTTransfer extends Thread {
}
}
}
private static void setSeeds(yacySeed [] seeds) {
plasmaDHTTransfer.seeds = seeds;
}
public static void setSeedcount(int seedcount) {
plasmaDHTTransfer.seedcount = seedcount;
}
public static int getSeedcount() {
return seedcount;
}
}

@ -586,14 +586,11 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
peerPing = new serverInstantThread(yc, "peerPing", null), 2000);
peerPing.setSyncObject(new Object());
getConfig("20_dhtdistribution_threads","1");
for(int i=0; i<(int)getConfigLong("20_dhtdistribution_threads",1);i++) {
deployThread("20_dhtdistribution_"+i, "DHT Distribution", "selection, transfer and deletion of index entries that are not searched on your peer, but on others", null,
new serverInstantThread(this, "dhtTransferJob", null), 60000 + i*5000,
deployThread("20_dhtdistribution", "DHT Distribution", "selection, transfer and deletion of index entries that are not searched on your peer, but on others", null,
new serverInstantThread(this, "dhtTransferJob", null), 60000,
Long.parseLong(getConfig("20_dhtdistribution_idlesleep" , "5000")),
Long.parseLong(getConfig("20_dhtdistribution_busysleep" , "0")),
Long.parseLong(getConfig("20_dhtdistribution_memprereq" , "1000000")));
}
// test routine for snippet fetch
//Set query = new HashSet();
@ -2131,19 +2128,36 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
// send away the indexes to all these peers
String peerNames = "";
int hc1 = 0;
plasmaDHTTransfer transfer = null;
for (int i = 0; i < seeds.length; i++) {
transfer = new plasmaDHTTransfer(log, seeds[i], dhtChunk,
plasmaDHTTransfer.setSeedcount(0);
plasmaDHTTransfer [] transfer = new plasmaDHTTransfer[peerCount];
for (int i = 0; i < transfer.length; i++) {
transfer[i] = new plasmaDHTTransfer(log, seeds, dhtChunk,
getConfig("indexDistribution.gzipBody","false").equalsIgnoreCase("true"),
(int)getConfigLong("indexDistribution.timeout",60000), 0);
try {transfer.uploadIndex();} catch (InterruptedException e) {}
if (transfer.getStatus() == plasmaDHTChunk.chunkStatus_COMPLETE) {
transfer[i].start();
}
boolean DHTalive = true;
while(DHTalive) {
DHTalive = false;
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
for (int i = 0; i < transfer.length; i++) {
if (transfer[i].isAlive()) DHTalive = true;
}
}
for (int i = 0; i < transfer.length; i++) {
if (transfer[i].getStatus() == plasmaDHTChunk.chunkStatus_COMPLETE) {
peerNames += ", " + seeds[i].getName();
hc1++;
}
if (hc1 >= peerCount) break;
}
if (peerNames.length() > 0) peerNames = peerNames.substring(2); // remove comma
// clean up and finish with deletion of indexes
@ -2154,7 +2168,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
this.log.logSevere("Index distribution failed. Too few peers (" + hc1 + ") received the index, not deleted locally.");
return false;
}
public void terminate() {
this.terminate = true;
this.shutdownSync.V();

@ -409,7 +409,6 @@ xpstopw=true
20_dhtdistribution_idlesleep=20000
20_dhtdistribution_busysleep=2000
20_dhtdistribution_memprereq=8388608
20_dhtdistribution_threads=1
30_peerping_idlesleep=120000
30_peerping_busysleep=120000
30_peerping_memprereq=1048576

Loading…
Cancel
Save