moved DHT index selection to a new object that holds the indexes to be sent to other peers.

This was done to make it possible to serialize RWI selections along with indexing.
Serialization will be implemented in another step.
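
A minimal usage sketch, assembled from the call sites in the diff below; log, wordIndex, urlPool, indexCount, targetSeed and the gzip/timeout settings are assumed to exist as in plasmaWordIndexDistribution:

// select a chunk of RWIs, starting at a random DHT position
plasmaDHTChunk dhtChunk = new plasmaDHTChunk(log, wordIndex, urlPool.loadedURL, indexCount / 3, indexCount);
if (dhtChunk.getStatus() == plasmaDHTChunk.chunkStatus_FILLED) {
    // hand the selected containers and their URL entries to a target peer
    String error = yacyClient.transferIndex(targetSeed, dhtChunk.containers(), dhtChunk.urlCacheMap(),
                                            gzipBody4Distribution, timeout4Distribution);
    // after a successful transfer the selected references may be deleted locally
    if (error == null) dhtChunk.deleteTransferIndexes();
}

Bundling the selection state in one object is what makes the planned serialization step possible.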

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@1698 6c8d7289-2bf4-0310-a012-ef5d649a1542
orbiter 19 years ago
parent e3dd67bba0
commit cd41e9a0eb

@@ -0,0 +1,227 @@
package de.anomic.plasma;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import de.anomic.kelondro.kelondroBase64Order;
import de.anomic.kelondro.kelondroException;
import de.anomic.server.serverCodings;
import de.anomic.server.logging.serverLog;
import de.anomic.yacy.yacyCore;
import de.anomic.yacy.yacyDHTAction;
import de.anomic.yacy.yacySeedDB;
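/**
 * Holds one selected chunk of reverse word index (RWI) entries for DHT
 * transfer: the selected index containers, a cache of the URL entries
 * they reference, and a status code describing the selection outcome.
 */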
public class plasmaDHTChunk {
public static final int chunkStatus_UNDEFINED = -1;
public static final int chunkStatus_FAILED = 0;
public static final int chunkStatus_FILLED = 1;
public static final int chunkStatus_RUNNING = 2;
public static final int chunkStatus_INTERRUPTED = 3;
public static final int chunkStatus_COMPLETE = 4;
private plasmaWordIndex wordIndex;
private serverLog log;
private plasmaCrawlLURL lurls;
private int status = chunkStatus_UNDEFINED;
private String startPointHash;
private plasmaWordIndexEntryContainer[] indexContainers = null;
private HashMap urlCache; // String (url-hash) / plasmaCrawlLURL.Entry
private int idxCount;
public plasmaWordIndexEntryContainer firstContainer() {
return indexContainers[0];
}
public plasmaWordIndexEntryContainer lastContainer() {
return indexContainers[indexContainers.length - 1];
}
public plasmaWordIndexEntryContainer[] containers() {
return indexContainers;
}
public int containerSize() {
return indexContainers.length;
}
public int indexCount() {
return this.idxCount;
}
private int indexCounter() {
int c = 0;
for (int i = 0; i < indexContainers.length; i++) {
c += indexContainers[i].size();
}
return c;
}
public HashMap urlCacheMap() {
return urlCache;
}
public int getStatus() {
return this.status;
}
public plasmaDHTChunk(serverLog log, plasmaWordIndex wordIndex, plasmaCrawlLURL lurls, int minCount, int maxCount) {
this.log = log;
this.wordIndex = wordIndex;
this.lurls = lurls;
startPointHash = selectTransferStart();
log.logFine("Selected hash " + startPointHash + " as start point for index distribution, distance = " + yacyDHTAction.dhtDistance(yacyCore.seedDB.mySeed.hash, startPointHash));
selectTransferContainers(startPointHash, minCount, maxCount);
// count the indexes; the result can be smaller than expected
this.idxCount = indexCounter();
if (idxCount < minCount) {
log.logFine("Too few (" + idxCount + ") indexes selected for transfer.");
this.status = chunkStatus_FAILED;
}
}
public plasmaDHTChunk(serverLog log, plasmaWordIndex wordIndex, plasmaCrawlLURL lurls, int minCount, int maxCount, String startHash) {
this.log = log;
this.wordIndex = wordIndex;
this.lurls = lurls;
this.startPointHash = startHash;
log.logFine("Demanded hash " + startHash + " as start point for index distribution, distance = " + yacyDHTAction.dhtDistance(yacyCore.seedDB.mySeed.hash, startHash));
selectTransferContainers(startHash, minCount, maxCount);
// count the indexes; the result can be smaller than expected
this.idxCount = indexCounter();
if (idxCount < minCount) {
log.logFine("Too few (" + idxCount + ") indexes selected for transfer.");
this.status = chunkStatus_FAILED;
}
}
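/**
 * Computes a random start hash for the index selection. The loop relaxes
 * the required DHT distance to the own peer hash step by step (greater
 * than 0.9 down to greater than 0.1), so a far-away start point is chosen
 * with high probability; as a fallback the position directly behind the
 * own hash is used, which is normally avoided because it always hits the
 * same target peers.
 */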
private String selectTransferStart() {
String startPointHash;
// first try to select a good start point with increasing probability
if (Math.round(Math.random() * 6) != 4)
for (int i = 9; i > 0; i--) {
startPointHash = kelondroBase64Order.enhancedCoder.encode(serverCodings.encodeMD5Raw(Long.toString(i + System.currentTimeMillis()))).substring(2, 2 + yacySeedDB.commonHashLength);
if (yacyDHTAction.dhtDistance(yacyCore.seedDB.mySeed.hash, startPointHash) > ((double) i / (double) 10))
return startPointHash;
}
// if that fails, simply take the best start point (this is usually avoided, since it always leads to the same target peers)
startPointHash = yacyCore.seedDB.mySeed.hash.substring(0, 11) + "z";
return startPointHash;
}
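/**
 * Fills the chunk from the RAM cache first and falls back to the word
 * files on disk if the RAM cache yields fewer than mincount entries.
 */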
private void selectTransferContainers(String hash, int mincount, int maxcount) {
int refcountRAM = selectTransferContainersResource(hash, plasmaWordIndex.RL_RAMCACHE, maxcount);
if (refcountRAM >= mincount) {
log.logFine("DHT selection from RAM: " + refcountRAM + " entries");
return;
}
int refcountFile = selectTransferContainersResource(hash, plasmaWordIndex.RL_WORDFILES, maxcount);
log.logFine("DHT selection from FILE: " + refcountFile + " entries, RAM provided only " + refcountRAM + " entries");
return;
}
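/**
 * Collects index containers from the given resource level (RAM cache or
 * word files), walking the word hashes upward from the start hash. The
 * walk stops when maxcount references are gathered or the DHT distance
 * between the next word and the first selected word reaches 0.2. URL
 * entries are resolved into the urlCache on the fly; references whose
 * URL cannot be resolved are deleted from the index as stale.
 */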
private int selectTransferContainersResource(String hash, int resourceLevel, int maxcount) {
// the hash is a start hash from where the indexes are picked
ArrayList tmpContainers = new ArrayList(maxcount);
String nexthash = "";
try {
Iterator wordHashIterator = wordIndex.wordHashes(hash, resourceLevel, true);
plasmaWordIndexEntryContainer indexContainer;
Iterator urlIter;
plasmaWordIndexEntry indexEntry;
plasmaCrawlLURL.Entry lurl;
int refcount = 0;
urlCache = new HashMap();
while ((maxcount > refcount) && (wordHashIterator.hasNext()) && ((nexthash = (String) wordHashIterator.next()) != null) && (nexthash.trim().length() > 0)
&& ((tmpContainers.size() == 0) || (yacyDHTAction.dhtDistance(nexthash, ((plasmaWordIndexEntryContainer) tmpContainers.get(0)).wordHash()) < 0.2))) {
// make an on-the-fly entity and insert values
indexContainer = wordIndex.getContainer(nexthash, true, 10000);
int notBoundCounter = 0;
try {
urlIter = indexContainer.entries();
// iterate over indexes to fetch url entries and store them in the urlCache
while ((urlIter.hasNext()) && (maxcount > refcount)) {
indexEntry = (plasmaWordIndexEntry) urlIter.next();
try {
lurl = lurls.getEntry(indexEntry.getUrlHash(), indexEntry);
if ((lurl == null) || (lurl.url() == null)) {
notBoundCounter++;
urlIter.remove();
wordIndex.removeEntries(nexthash, new String[] { indexEntry.getUrlHash() }, true);
} else {
urlCache.put(indexEntry.getUrlHash(), lurl);
refcount++;
}
} catch (IOException e) {
notBoundCounter++;
urlIter.remove();
wordIndex.removeEntries(nexthash, new String[] { indexEntry.getUrlHash() }, true);
}
}
// remove all remaining; we have enough
while (urlIter.hasNext()) {
indexEntry = (plasmaWordIndexEntry) urlIter.next();
urlIter.remove();
}
// use what's left
log.logFine("Selected partial index (" + indexContainer.size() + " from " + wordIndex.indexSize(nexthash) + " URLs, " + notBoundCounter + " not bound) for word " + indexContainer.wordHash());
tmpContainers.add(indexContainer);
} catch (kelondroException e) {
log.logSevere("plasmaWordIndexDistribution/2: deleted DB for word " + nexthash, e);
wordIndex.deleteIndex(nexthash);
}
}
// create result
indexContainers = (plasmaWordIndexEntryContainer[]) tmpContainers.toArray(new plasmaWordIndexEntryContainer[tmpContainers.size()]);
if ((indexContainers == null) || (indexContainers.length == 0)) {
log.logFine("No index available for index transfer, hash start-point " + startPointHash);
this.status = chunkStatus_FAILED;
return 0;
}
this.status = chunkStatus_FILLED;
return refcount;
} catch (kelondroException e) {
log.logSevere("selectTransferIndexes database corrupted: " + e.getMessage(), e);
indexContainers = new plasmaWordIndexEntryContainer[0];
urlCache = new HashMap();
this.status = chunkStatus_FAILED;
return 0;
}
}
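/**
 * Removes all references contained in this chunk from the local word
 * index; to be called after the chunk has been transferred successfully.
 * Returns the number of deleted index entries.
 */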
public int deleteTransferIndexes() {
Iterator urlIter;
plasmaWordIndexEntry indexEntry;
String[] urlHashes;
int count = 0;
synchronized (wordIndex) {
for (int i = 0; i < this.indexContainers.length; i++) {
// delete entries separately
int c = 0;
urlHashes = new String[this.indexContainers[i].size()];
urlIter = this.indexContainers[i].entries();
while (urlIter.hasNext()) {
indexEntry = (plasmaWordIndexEntry) urlIter.next();
urlHashes[c++] = indexEntry.getUrlHash();
}
count += wordIndex.removeEntries(this.indexContainers[i].wordHash(), urlHashes, true);
log.logFine("Deleted partial index (" + c + " URLs) for word " + this.indexContainers[i].wordHash() + "; " + this.wordIndex.indexSize(indexContainers[i].wordHash()) + " entries left");
this.indexContainers[i] = null;
}
}
return count;
}
}

@@ -44,19 +44,13 @@
package de.anomic.plasma;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.HashMap;
import de.anomic.yacy.yacyCore;
import de.anomic.yacy.yacySeed;
import de.anomic.yacy.yacySeedDB;
import de.anomic.yacy.yacyClient;
import de.anomic.yacy.yacyDHTAction;
import de.anomic.server.serverCodings;
import de.anomic.server.logging.serverLog;
import de.anomic.kelondro.kelondroBase64Order;
import de.anomic.kelondro.kelondroException;
public final class plasmaWordIndexDistribution {
@@ -198,63 +192,37 @@ public final class plasmaWordIndexDistribution {
if ((yacyCore.seedDB == null) || (yacyCore.seedDB.sizeConnected() == 0)) return -1;
// collect index
plasmaWordIndexEntryContainer[] indexContainers = null;
plasmaDHTChunk dhtChunk = new plasmaDHTChunk(this.log, this.wordIndex, this.urlPool.loadedURL, indexCount / 3, indexCount);
try {
String startPointHash = selectTransferStart();
this.log.logFine("Selected hash " + startPointHash + " as start point for index distribution, distance = " + yacyDHTAction.dhtDistance(yacyCore.seedDB.mySeed.hash, startPointHash));
final int minCount = indexCount/3;
Object[] selectResult = selectTransferContainers(startPointHash, minCount, indexCount);
indexContainers = (plasmaWordIndexEntryContainer[]) selectResult[0];
HashMap urlCache = (HashMap) selectResult[1]; // String (url-hash) / plasmaCrawlLURL.Entry
//int refcount = ((Integer) selectResult[2]).intValue();
if ((indexContainers == null) || (indexContainers.length == 0)) {
this.log.logFine("No index available for index transfer, hash start-point " + startPointHash);
return -1;
}
// count the indexes again, can be smaller than expected
indexCount = 0;
for (int i = 0; i < indexContainers.length; i++) {
indexCount += indexContainers[i].size();
}
if (indexCount < minCount) {
this.log.logFine("Too few (" + indexCount + ") indexes selected for transfer.");
return -1; // failed
}
// find start point for DHT-selection
String keyhash = indexContainers[indexContainers.length - 1].wordHash(); // DHT targets must have greater hashes
String keyhash = dhtChunk.lastContainer().wordHash(); // DHT targets must have greater hashes
// find a list of DHT-peers
yacySeed[] seeds = new yacySeed[peerCount + 10];
int hc0 = 0;
double ownDistance = Math.min(yacyDHTAction.dhtDistance(yacyCore.seedDB.mySeed.hash, indexContainers[0].wordHash()),
yacyDHTAction.dhtDistance(yacyCore.seedDB.mySeed.hash, indexContainers[indexContainers.length - 1].wordHash()));
double ownDistance = Math.min(yacyDHTAction.dhtDistance(yacyCore.seedDB.mySeed.hash, dhtChunk.firstContainer().wordHash()), yacyDHTAction.dhtDistance(yacyCore.seedDB.mySeed.hash, dhtChunk.lastContainer().wordHash()));
double maxDistance = Math.min(ownDistance, 0.4);
synchronized (yacyCore.dhtAgent) {
double avdist;
Enumeration e = yacyCore.dhtAgent.getAcceptRemoteIndexSeeds(keyhash);
while ((e.hasMoreElements()) && (hc0 < seeds.length)) {
if (this.isClosed()) {
this.log.logSevere("Index distribution interrupted by close, nothing deleted locally.");
return -1; // interrupted
}
seeds[hc0] = (yacySeed) e.nextElement();
if (seeds[hc0] != null) {
avdist = Math.max(yacyDHTAction.dhtDistance(seeds[hc0].hash, indexContainers[0].wordHash()),
yacyDHTAction.dhtDistance(seeds[hc0].hash, indexContainers[indexContainers.length - 1].wordHash()));
avdist = Math.max(yacyDHTAction.dhtDistance(seeds[hc0].hash, dhtChunk.firstContainer().wordHash()), yacyDHTAction.dhtDistance(seeds[hc0].hash, dhtChunk.lastContainer().wordHash()));
if (avdist < maxDistance) {
this.log.logInfo("Selected " + ((hc0 < peerCount) ? "primary" : "reserve") + " DHT target peer " + seeds[hc0].getName() + ":" + seeds[hc0].hash + ", distance = " + avdist);
log.logInfo("Selected " + ((hc0 < peerCount) ? "primary" : "reserve") + " DHT target peer " + seeds[hc0].getName() + ":" + seeds[hc0].hash + ", distance = " + avdist);
hc0++;
}
}
}
e = null; // finish enumeration
}
if (hc0 < peerCount) {
this.log.logWarning("found not enough (" + hc0 + ") peers for distribution");
return -1; // failed
log.logWarning("found not enough (" + hc0 + ") peers for distribution");
return -1;
}
// send away the indexes to all these peers
@@ -270,12 +238,12 @@ public final class plasmaWordIndexDistribution {
start = System.currentTimeMillis();
error = yacyClient.transferIndex(
seeds[i],
indexContainers,
urlCache,
dhtChunk.containers(),
dhtChunk.urlCacheMap(),
this.gzipBody4Distribution,
this.timeout4Distribution);
if (error == null) {
this.log.logInfo("Index transfer of " + indexCount + " words [" + indexContainers[0].wordHash() + " .. " + indexContainers[indexContainers.length - 1].wordHash() + "] to peer " + seeds[i].getName() + ":" + seeds[i].hash + " in " + ((System.currentTimeMillis() - start) / 1000)
this.log.logInfo("Index transfer of " + indexCount + " words [" + dhtChunk.firstContainer().wordHash() + " .. " + dhtChunk.lastContainer().wordHash() + "] to peer " + seeds[i].getName() + ":" + seeds[i].hash + " in " + ((System.currentTimeMillis() - start) / 1000)
+ " seconds successfull (" + (1000 * indexCount / (System.currentTimeMillis() - start + 1)) + " words/s)");
peerNames += ", " + seeds[i].getName();
hc1++;
@@ -291,9 +259,8 @@ public final class plasmaWordIndexDistribution {
if (hc1 >= peerCount) {
// success
if (delete) {
int deletedURLs = deleteTransferIndexes(indexContainers);
this.log.logFine("Deleted from " + indexContainers.length + " transferred RWIs locally, removed " + deletedURLs + " URL references");
indexContainers = null;
int deletedURLs = dhtChunk.deleteTransferIndexes();
this.log.logFine("Deleted from " + dhtChunk.containers().length + " transferred RWIs locally, removed " + deletedURLs + " URL references");
return indexCount;
}
return indexCount;
@@ -301,111 +268,13 @@ public final class plasmaWordIndexDistribution {
this.log.logSevere("Index distribution failed. Too few peers (" + hc1 + ") received the index, not deleted locally.");
return -1;
} finally {
if (indexContainers != null) {
if (dhtChunk.containers() != null) {
// simply close the indexEntities
closeTransferIndexes(indexContainers);
closeTransferIndexes(dhtChunk.containers());
}
}
}
private String selectTransferStart() {
String startPointHash;
// first try to select a good start point with increasing probability
if (Math.round(Math.random() * 6) != 4) for (int i = 9; i > 0; i--) {
startPointHash = kelondroBase64Order.enhancedCoder.encode(serverCodings.encodeMD5Raw(Long.toString(i + System.currentTimeMillis()))).substring(2, 2 + yacySeedDB.commonHashLength);
if (yacyDHTAction.dhtDistance(yacyCore.seedDB.mySeed.hash, startPointHash) > ((double) i / (double) 10)) return startPointHash;
}
// if that fails, simply take the best start point (this is usually avoided, since it always leads to the same target peers)
startPointHash = yacyCore.seedDB.mySeed.hash.substring(0, 11) + "z";
return startPointHash;
}
public Object[] /* of {plasmaWordIndexEntryContainer[], HashMap(String, plasmaCrawlLURL.Entry)}*/
selectTransferContainers(String hash, int mincount, int maxcount) {
Object[] selectResult = selectTransferContainersResource(hash, plasmaWordIndex.RL_RAMCACHE, maxcount);
int refcountRAM = ((Integer) selectResult[2]).intValue();
if (refcountRAM >= mincount) {
log.logFine("DHT selection from RAM: " + refcountRAM + " entries");
return selectResult;
}
selectResult = selectTransferContainersResource(hash, plasmaWordIndex.RL_WORDFILES, maxcount);
int refcountFile = ((Integer) selectResult[2]).intValue();
log.logFine("DHT selection from FILE: " + refcountFile + " entries, RAM provided only " + refcountRAM + " entries");
return selectResult;
}
private Object[] /* of {plasmaWordIndexEntryContainer[], HashMap(String, plasmaCrawlLURL.Entry)}*/
selectTransferContainersResource(String hash, int resourceLevel, int maxcount) {
// the hash is a start hash from where the indexes are picked
ArrayList tmpContainers = new ArrayList(maxcount);
String nexthash = "";
synchronized (this.wordIndex) {try {
Iterator wordHashIterator = this.wordIndex.wordHashes(hash, resourceLevel, true);
plasmaWordIndexEntryContainer indexContainer;
Iterator urlIter;
plasmaWordIndexEntry indexEntry;
plasmaCrawlLURL.Entry lurl;
int refcount = 0;
final HashMap knownURLs = new HashMap();
while (
(maxcount > refcount) &&
(wordHashIterator.hasNext()) &&
((nexthash = (String) wordHashIterator.next()) != null) &&
(nexthash.trim().length() > 0) &&
((tmpContainers.size() == 0) ||
(yacyDHTAction.dhtDistance(nexthash, ((plasmaWordIndexEntryContainer)tmpContainers.get(0)).wordHash()) < 0.2))
) {
// make an on-the-fly entity and insert values
indexContainer = this.wordIndex.getContainer(nexthash, true, 10000);
int notBoundCounter = 0;
try {
urlIter = indexContainer.entries();
// iterate over indexes to fetch url entries and store them in the urlCache
while ((urlIter.hasNext()) && (maxcount > refcount)) {
indexEntry = (plasmaWordIndexEntry) urlIter.next();
try {
lurl = this.urlPool.loadedURL.getEntry(indexEntry.getUrlHash(), indexEntry);
if ((lurl == null) || (lurl.url() == null)) {
notBoundCounter++;
urlIter.remove();
this.wordIndex.removeEntries(nexthash, new String[]{indexEntry.getUrlHash()}, true);
} else {
knownURLs.put(indexEntry.getUrlHash(), lurl);
refcount++;
}
} catch (IOException e) {
notBoundCounter++;
urlIter.remove();
this.wordIndex.removeEntries(nexthash, new String[]{indexEntry.getUrlHash()}, true);
}
}
// remove all remaining; we have enough
while (urlIter.hasNext()) {
indexEntry = (plasmaWordIndexEntry) urlIter.next();
urlIter.remove();
}
// use what's left
this.log.logFine("Selected partial index (" + indexContainer.size() + " from " + this.wordIndex.indexSize(nexthash) +" URLs, " + notBoundCounter + " not bound) for word " + indexContainer.wordHash());
tmpContainers.add(indexContainer);
} catch (kelondroException e) {
this.log.logSevere("plasmaWordIndexDistribution/2: deleted DB for word " + nexthash, e);
this.wordIndex.deleteIndex(nexthash);
}
}
// transfer to array
plasmaWordIndexEntryContainer[] entryContainers = (plasmaWordIndexEntryContainer[]) tmpContainers.toArray(new plasmaWordIndexEntryContainer[tmpContainers.size()]);
return new Object[]{entryContainers, knownURLs, new Integer(refcount)};
} catch (kelondroException e) {
this.log.logSevere("selectTransferIndexes database corrupted: " + e.getMessage(), e);
return new Object[]{new plasmaWordIndexEntity[0], new HashMap(0)};
}
}
}
void closeTransferIndex(plasmaWordIndexEntity indexEntity) throws IOException {
Object migrationStatus;
indexEntity.close();
@@ -440,72 +309,6 @@ public final class plasmaWordIndexDistribution {
}
}
synchronized int deleteTransferIndexes(plasmaWordIndexEntryContainer[] indexContainers) {
Iterator urlIter;
plasmaWordIndexEntry indexEntry;
String[] urlHashes;
int count = 0;
for (int i = 0; i < indexContainers.length; i++) {
// delete entries separately
int c = 0;
urlHashes = new String[indexContainers[i].size()];
urlIter = indexContainers[i].entries();
while (urlIter.hasNext()) {
indexEntry = (plasmaWordIndexEntry) urlIter.next();
urlHashes[c++] = indexEntry.getUrlHash();
}
count += wordIndex.removeEntries(indexContainers[i].wordHash(), urlHashes, true);
log.logFine("Deleted partial index (" + c + " URLs) for word " + indexContainers[i].wordHash() + "; " + this.wordIndex.indexSize(indexContainers[i].wordHash()) + " entries left");
indexContainers[i] = null;
}
return count;
}
/*
boolean deleteTransferIndexes(plasmaWordIndexEntity[] indexEntities) throws IOException {
Iterator urlIter;
plasmaWordIndexEntry indexEntry;
plasmaWordIndexEntity indexEntity;
String[] urlHashes;
int sz;
boolean success = true;
for (int i = 0; i < indexEntities.length; i++) {
if (indexEntities[i].isTMPEntity()) {
// delete entries separately
int c = 0;
urlHashes = new String[indexEntities[i].size()];
urlIter = indexEntities[i].elements(true);
while (urlIter.hasNext()) {
indexEntry = (plasmaWordIndexEntry) urlIter.next();
urlHashes[c++] = indexEntry.getUrlHash();
}
wordIndex.removeEntries(indexEntities[i].wordHash(), urlHashes, true);
indexEntity = wordIndex.getEntity(indexEntities[i].wordHash(), true, -1);
sz = indexEntity.size();
// indexEntity.close();
closeTransferIndex(indexEntity);
log.logFine("Deleted partial index (" + c + " URLs) for word " + indexEntities[i].wordHash() + "; " + sz + " entries left");
// end debug
indexEntities[i].close();
} else {
// delete complete file
if (indexEntities[i].deleteComplete()) {
indexEntities[i].close();
} else {
indexEntities[i].close();
// have another try...
if (!(plasmaWordIndexEntity.wordHash2path(wordIndex.getRoot(), indexEntities[i].wordHash()).delete())) {
success = false;
log.logSevere("Could not delete whole index for word " + indexEntities[i].wordHash());
}
}
}
indexEntities[i] = null;
}
return success;
}
*/
public void startTransferWholeIndex(yacySeed seed, boolean delete) {
if (transferIdxThread == null) {
this.transferIdxThread = new transferIndexThread(seed,delete);
@@ -794,7 +597,7 @@ public final class plasmaWordIndexDistribution {
}
public void performTransferWholeIndex() {
plasmaWordIndexEntryContainer[] newIndexContainers = null, oldIndexContainers = null;
plasmaDHTChunk newDHTChunk = null, oldDHTChunk = null;
try {
// pausing the regular index distribution
// TODO: adding sync, to wait for a still running index distribution to finish
@@ -812,22 +615,20 @@ public final class plasmaWordIndexDistribution {
while (!finished && !Thread.currentThread().isInterrupted()) {
iteration++;
int idxCount = 0;
selectionStart = System.currentTimeMillis();
oldIndexContainers = newIndexContainers;
oldDHTChunk = newDHTChunk;
// selecting 500 words to transfer
this.status = "Running: Selecting chunk " + iteration;
Object[] selectResult = selectTransferContainers(this.startPointHash, this.chunkSize/3, this.chunkSize);
newIndexContainers = (plasmaWordIndexEntryContainer[]) selectResult[0];
HashMap urlCache = (HashMap) selectResult[1]; // String (url-hash) / plasmaCrawlLURL.Entry
//int refcount = ((Integer) selectResult[2]).intValue();
newDHTChunk = new plasmaDHTChunk(plasmaWordIndexDistribution.this.log, wordIndex, sb.urlPool.loadedURL, this.chunkSize/3, this.chunkSize, this.startPointHash);
/* If we haven't selected a word chunk this could be because of
* a) no words are left in the index
* b) max open file limit was exceeded
*/
if ((newIndexContainers == null) || (newIndexContainers.length == 0)) {
if ((newDHTChunk == null) ||
(newDHTChunk.containerSize() == 0) ||
(newDHTChunk.getStatus() == plasmaDHTChunk.chunkStatus_FAILED)) {
if (sb.wordIndex.size() > 0) {
// if there are still words in the index we try it again now
startPointHash = "------------";
@@ -838,19 +639,17 @@ public final class plasmaWordIndexDistribution {
finished = true;
}
} else {
// count the indexes again, can be smaller than expected
for (int i = 0; i < newIndexContainers.length; i++) idxCount += newIndexContainers[i].size();
// getting start point for next DHT-selection
oldStartingPointHash = startPointHash;
startPointHash = newIndexContainers[newIndexContainers.length - 1].wordHash(); // DHT targets must have greater hashes
startPointHash = newDHTChunk.lastContainer().wordHash(); // DHT targets must have greater hashes
selectionEnd = System.currentTimeMillis();
selectionTime = selectionEnd - selectionStart;
plasmaWordIndexDistribution.this.log.logInfo("Index selection of " + idxCount + " words [" + newIndexContainers[0].wordHash() + " .. " + newIndexContainers[newIndexContainers.length-1].wordHash() + "]" +
plasmaWordIndexDistribution.this.log.logInfo("Index selection of " + newDHTChunk.indexCount() + " words [" + newDHTChunk.firstContainer().wordHash() + " .. " + newDHTChunk.lastContainer().wordHash() + "]" +
" in " +
(selectionTime / 1000) + " seconds (" +
(1000 * idxCount / (selectionTime+1)) + " words/s)");
(1000 * newDHTChunk.indexCount() / (selectionTime+1)) + " words/s)");
}
// query status of old worker thread
@@ -861,12 +660,6 @@ public final class plasmaWordIndexDistribution {
// if the transfer failed we abort index transfer now
this.status = "Aborted because of Transfer error:\n" + worker.getStatus();
// cleanup. closing all open files
closeContainers(oldIndexContainers);
oldIndexContainers = null;
closeContainers(newIndexContainers);
newIndexContainers = null;
// abort index transfer
return;
} else {
@@ -897,23 +690,27 @@ public final class plasmaWordIndexDistribution {
// deleting transfered words from index
if (delete) {
this.status = "Running: Deleting chunk " + iteration;
int urlReferences = deleteTransferIndexes(oldIndexContainers);
plasmaWordIndexDistribution.this.log.logFine("Deleted from " + oldIndexContainers.length + " transferred RWIs locally " + urlReferences + " URL references");
transferedEntryCount += idxCount;
transferedEntityCount += oldIndexContainers.length;
transferedEntryCount += oldDHTChunk.indexCount();
transferedEntityCount += oldDHTChunk.containerSize();
int urlReferences = oldDHTChunk.deleteTransferIndexes();
plasmaWordIndexDistribution.this.log.logFine("Deleted from " + oldDHTChunk.containerSize() + " transferred RWIs locally " + urlReferences + " URL references");
} else {
this.closeContainers(oldIndexContainers);
transferedEntryCount += idxCount;
transferedEntityCount += oldIndexContainers.length;
transferedEntryCount += oldDHTChunk.indexCount();
transferedEntityCount += oldDHTChunk.containerSize();
}
oldIndexContainers = null;
oldDHTChunk = null;
}
this.worker = null;
}
// handover chunk to transfer worker
if (!((newIndexContainers == null) || (newIndexContainers.length == 0))) {
worker = new transferIndexWorkerThread(seed,newIndexContainers,urlCache,gzipBody4Transfer,timeout4Transfer,iteration,idxCount,idxCount,startPointHash,oldStartingPointHash);
if ((newDHTChunk != null) &&
((newDHTChunk.containerSize() > 0) ||
(newDHTChunk.getStatus() == plasmaDHTChunk.chunkStatus_FILLED))) {
worker = new transferIndexWorkerThread(seed, newDHTChunk.containers(), newDHTChunk.urlCacheMap(),
gzipBody4Transfer, timeout4Transfer, iteration,
newDHTChunk.indexCount(), newDHTChunk.indexCount(),
startPointHash, oldStartingPointHash);
worker.start();
}
}
@@ -930,20 +727,10 @@ public final class plasmaWordIndexDistribution {
try {worker.join();}catch(Exception e){}
// worker = null;
}
if (oldIndexContainers != null) closeContainers(oldIndexContainers);
if (newIndexContainers != null) closeContainers(newIndexContainers);
plasmaWordIndexDistribution.this.paused = false;
}
}
private void closeContainers(plasmaWordIndexEntryContainer[] indexContainers) {
if ((indexContainers == null)||(indexContainers.length ==0)) return;
for (int i = 0; i < indexContainers.length; i++) {
indexContainers[i] = null;
}
}
}
