serialized dhtChunk deletion with indexing

The dht selection, transmission and deletion is now completely serialized with indexing


git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@1731 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 19 years ago
parent 76b167ed22
commit 1d8ca6e082

@ -3,7 +3,7 @@ javacSource=1.4
javacTarget=1.4
# Release Configuration
releaseVersion=0.431
releaseVersion=0.432
releaseFile=yacy_dev_v${releaseVersion}_${DSTAMP}_${releaseNr}.tar.gz
#releaseFile=yacy_v${releaseVersion}_${DSTAMP}_${releaseNr}.tar.gz
releaseDir=yacy_dev_v${releaseVersion}_${DSTAMP}_${releaseNr}

@ -68,7 +68,6 @@ public class plasmaDHTChunk {
private plasmaCrawlLURL lurls;
private int status = chunkStatus_UNDEFINED;
private String statusMessage = "";
private String startPointHash;
private plasmaWordIndexEntryContainer[] indexContainers = null;
private HashMap urlCache; // String (url-hash) / plasmaCrawlLURL.Entry
@ -114,14 +113,6 @@ public class plasmaDHTChunk {
return this.status;
}
public void setStatusMessage(String message) {
this.statusMessage = message;
}
public String getStatusMessage() {
return statusMessage;
}
public plasmaDHTChunk(serverLog log, plasmaWordIndex wordIndex, plasmaCrawlLURL lurls, int minCount, int maxCount) {
this.log = log;
this.wordIndex = wordIndex;

@ -130,7 +130,7 @@ public class plasmaDHTFlush extends Thread {
public String[] getStatus() {
plasmaDHTTransfer workerThread = this.worker;
if (workerThread != null) {
return new String[]{this.status,workerThread.dhtChunk.getStatusMessage()};
return new String[]{this.status,workerThread.getStatusMessage()};
}
return new String[]{this.status,"Not running"};
}

@ -56,6 +56,8 @@ public class plasmaDHTTransfer extends Thread {
// status fields
private boolean stopped = false;
private long transferTime = 0;
private int transferStatus = plasmaDHTChunk.chunkStatus_UNDEFINED;
private String transferStatusMessage = "";
// delivery destination
yacySeed seed = null;
@ -102,21 +104,29 @@ public class plasmaDHTTransfer extends Thread {
public long getTransferTime() {
return this.transferTime;
}
public int getStatus() {
return transferStatus;
}
public String getStatusMessage() {
return transferStatusMessage;
}
public void uploadIndex() throws InterruptedException {
/* loop until we
* - have successfully transfered the words list or
* - the retry counter limit was exceeded
*/
transferStatus = plasmaDHTChunk.chunkStatus_RUNNING;
long retryCount = 0, start = System.currentTimeMillis();
while (true) {
// testing if we were aborted
if (isAborted()) return;
// transfering seleted words to remote peer
dhtChunk.setStatusMessage("Running: Transfering chunk to target " + seed.hash + "/" + seed.getName());
dhtChunk.setStatus(plasmaDHTChunk.chunkStatus_RUNNING);
transferStatusMessage = "Running: Transfering chunk to target " + seed.hash + "/" + seed.getName();
String error = yacyClient.transferIndex(seed, dhtChunk.containers(), dhtChunk.urlCacheMap(), gzipBody4Transfer, timeout4Transfer);
if (error == null) {
// words successfully transfered
@ -124,8 +134,8 @@ public class plasmaDHTTransfer extends Thread {
this.log.logInfo("Index transfer of " + dhtChunk.indexCount() + " words [" + dhtChunk.firstContainer().wordHash() + " .. " + dhtChunk.lastContainer().wordHash() + "]" + " to peer " + seed.getName() + ":" + seed.hash + " in " + (transferTime / 1000) + " seconds successfull ("
+ (1000 * dhtChunk.indexCount() / (transferTime + 1)) + " words/s)");
retryCount = 0;
dhtChunk.setStatusMessage("Finished: Transfer of chunk to target " + seed.hash + "/" + seed.getName());
dhtChunk.setStatus(plasmaDHTChunk.chunkStatus_COMPLETE);
transferStatusMessage = "Finished: Transfer of chunk to target " + seed.hash + "/" + seed.getName();
transferStatus = plasmaDHTChunk.chunkStatus_COMPLETE;
break;
} else {
// words transfer failed
@ -141,9 +151,9 @@ public class plasmaDHTTransfer extends Thread {
yacyCore.peerActions.peerDeparture(seed);
// if the retry counter limit was not exceeded we'll retry it in a few seconds
dhtChunk.setStatusMessage("Disconnected peer: " + ((retryCount > 5) ? error + ". Transfer aborted" : "Retry " + retryCount));
transferStatusMessage = "Disconnected peer: " + ((retryCount > 5) ? error + ". Transfer aborted" : "Retry " + retryCount);
if (retryCount > maxRetry) {
dhtChunk.setStatus(plasmaDHTChunk.chunkStatus_FAILED);
transferStatus = plasmaDHTChunk.chunkStatus_FAILED;
return;
}
Thread.sleep(retryCount * 5000);
@ -162,13 +172,13 @@ public class plasmaDHTTransfer extends Thread {
if (added < 0) {
// inc. retry counter
retryCount++;
dhtChunk.setStatusMessage("Disconnected peer: Peer ping failed. " + ((retryCount > 5) ? "Transfer aborted." : "Retry " + retryCount));
transferStatusMessage = "Disconnected peer: Peer ping failed. " + ((retryCount > 5) ? "Transfer aborted." : "Retry " + retryCount);
if (retryCount > maxRetry) return;
Thread.sleep(retryCount * 5000);
continue;
} else {
yacyCore.seedDB.getConnected(seed.hash);
dhtChunk.setStatusMessage("running");
transferStatusMessage = "running";
break;
}
}

@ -864,11 +864,20 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
boolean doneSomething = false;
// possibly delete entries from last chunk
if ((this.dhtTransferChunk != null) &&
(this.dhtTransferChunk.getStatus() == plasmaDHTChunk.chunkStatus_COMPLETE)) {
int deletedURLs = this.dhtTransferChunk.deleteTransferIndexes();
this.log.logFine("Deleted from " + this.dhtTransferChunk.containers().length + " transferred RWIs locally, removed " + deletedURLs + " URL references");
this.dhtTransferChunk = null;
}
// generate a dht chunk
if ((this.dhtTransferChunk == null) ||
(this.dhtTransferChunk.getStatus() == plasmaDHTChunk.chunkStatus_UNDEFINED) ||
(this.dhtTransferChunk.getStatus() == plasmaDHTChunk.chunkStatus_COMPLETE) ||
(this.dhtTransferChunk.getStatus() == plasmaDHTChunk.chunkStatus_FAILED)) {
if ((dhtShallTransfer() == null) &&
((this.dhtTransferChunk == null) ||
(this.dhtTransferChunk.getStatus() == plasmaDHTChunk.chunkStatus_UNDEFINED) ||
(this.dhtTransferChunk.getStatus() == plasmaDHTChunk.chunkStatus_COMPLETE) ||
(this.dhtTransferChunk.getStatus() == plasmaDHTChunk.chunkStatus_FAILED))) {
// generate new chunk
dhtTransferChunk = new plasmaDHTChunk(this.log, this.wordIndex, this.urlPool.loadedURL, 30, dhtTransferIndexCount);
doneSomething = true;
@ -1980,34 +1989,35 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
}
}
public boolean dhtTransferJob() {
public String dhtShallTransfer() {
if (yacyCore.seedDB == null) {
log.logFine("no DHT distribution: seedDB == null");
return false;
return "no DHT distribution: seedDB == null";
}
if (yacyCore.seedDB.mySeed == null) {
log.logFine("no DHT distribution: mySeed == null");
return false;
return "no DHT distribution: mySeed == null";
}
if (yacyCore.seedDB.mySeed.isVirgin()) {
log.logFine("no DHT distribution: status is virgin");
return false;
return "no DHT distribution: status is virgin";
}
if (getConfig("allowDistributeIndex","false").equalsIgnoreCase("false")) {
log.logFine("no DHT distribution: not enabled");
return false;
return "no DHT distribution: not enabled";
}
if (urlPool.loadedURL.size() < 10) {
log.logFine("no DHT distribution: loadedURL.size() = " + urlPool.loadedURL.size());
return false;
return "no DHT distribution: loadedURL.size() = " + urlPool.loadedURL.size();
}
if (wordIndex.size() < 100) {
log.logFine("no DHT distribution: not enough words - wordIndex.size() = " + wordIndex.size());
return false;
return "no DHT distribution: not enough words - wordIndex.size() = " + wordIndex.size();
}
if ((getConfig("allowDistributeIndexWhileCrawling","false").equalsIgnoreCase("false")) && (urlPool.noticeURL.stackSize() > 0)) {
log.logFine("no DHT distribution: crawl in progress - noticeURL.stackSize() = " + urlPool.noticeURL.stackSize());
return "no DHT distribution: crawl in progress - noticeURL.stackSize() = " + urlPool.noticeURL.stackSize();
}
return null;
}
public boolean dhtTransferJob() {
String rejectReason = dhtShallTransfer();
if (rejectReason != null) {
log.logFine(rejectReason);
return false;
}
if (this.dhtTransferChunk == null) {
@ -2015,7 +2025,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
return false;
}
if ((this.dhtTransferChunk != null) && (this.dhtTransferChunk.getStatus() != plasmaDHTChunk.chunkStatus_FILLED)) {
log.logFine("no DHT distribution: index distribution is in progress");
log.logFine("no DHT distribution: index distribution is in progress, status=" + this.dhtTransferChunk.getStatus());
return false;
}
@ -2023,26 +2033,29 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
int peerCount = (yacyCore.seedDB.mySeed.isJunior()) ? 1 : 3;
long starttime = System.currentTimeMillis();
boolean ok = dhtTransferProcess(dhtTransferChunk, peerCount, true);
boolean ok = dhtTransferProcess(dhtTransferChunk, peerCount);
if (!ok) {
log.logFine("no word distribution: transfer failed");
if (ok) {
dhtTransferChunk.setStatus(plasmaDHTChunk.chunkStatus_COMPLETE);
log.logFine("DHT distribution: transfer COMPLETE");
// adopt transfer count
if ((System.currentTimeMillis() - starttime) > (10000 * peerCount)) {
dhtTransferIndexCount--;
} else {
dhtTransferIndexCount++;
}
if (dhtTransferIndexCount < 50) dhtTransferIndexCount = 50;
// show success
return true;
} else {
dhtTransferChunk.setStatus(plasmaDHTChunk.chunkStatus_FAILED);
log.logFine("DHT distribution: transfer FAILED");
return false;
}
// adopt transfer count
if ((System.currentTimeMillis() - starttime) > (10000 * peerCount))
dhtTransferIndexCount--;
else
dhtTransferIndexCount++;
if (dhtTransferIndexCount < 50) dhtTransferIndexCount = 50;
// show success
return true;
}
public boolean dhtTransferProcess(plasmaDHTChunk dhtChunk, int peerCount, boolean delete) {
public boolean dhtTransferProcess(plasmaDHTChunk dhtChunk, int peerCount) {
if ((yacyCore.seedDB == null) || (yacyCore.seedDB.sizeConnected() == 0)) return false;
// find a list of DHT-peers
@ -2063,7 +2076,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
(int)getConfigLong("indexDistribution.timeout",60000), 0);
try {transfer.uploadIndex();} catch (InterruptedException e) {}
if (transfer.dhtChunk.getStatus() == plasmaDHTChunk.chunkStatus_COMPLETE) {
if (transfer.getStatus() == plasmaDHTChunk.chunkStatus_COMPLETE) {
peerNames += ", " + seeds[i].getName();
hc1++;
}
@ -2074,11 +2087,6 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
// clean up and finish with deletion of indexes
if (hc1 >= peerCount) {
// success
if (delete) {
int deletedURLs = dhtChunk.deleteTransferIndexes();
this.log.logFine("Deleted from " + dhtChunk.containers().length + " transferred RWIs locally, removed " + deletedURLs + " URL references");
}
dhtChunk.setStatus(plasmaDHTChunk.chunkStatus_COMPLETE);
return true;
}
this.log.logSevere("Index distribution failed. Too few peers (" + hc1 + ") received the index, not deleted locally.");

@ -149,7 +149,7 @@ public final class plasmaWordIndex {
}
}
public int addEntriesBackend(plasmaWordIndexEntryContainer entries) {
private int addEntriesBackend(plasmaWordIndexEntryContainer entries) {
plasmaWordIndexEntryContainer feedback = assortmentCluster.storeTry(entries.wordHash(), entries);
if (feedback == null) {
return entries.size();

Loading…
Cancel
Save