VERY EXPERIMENTAL removal of index ram cache flushing thread.

The cache will fill up and be flushed explicitly when it is full.
This shall remove double-access of assortments (indexing and flush)
during indexing process. Hopefully this should reduce IO.
The main idea is: the cache shall mainly be flushed by DHT transfer, and
only indexes that shall be hosted by the own peer are flushed to the
assortments. This needs further work.

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@1617 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 19 years ago
parent 5e04b13819
commit 1e4578aab6

@ -60,6 +60,7 @@ import de.anomic.http.httpHeader;
import de.anomic.plasma.plasmaCrawlLURL;
import de.anomic.plasma.plasmaSwitchboard;
import de.anomic.plasma.plasmaURL;
import de.anomic.plasma.plasmaWordIndex;
import de.anomic.plasma.plasmaWordIndexEntry;
import de.anomic.plasma.plasmaWordIndexEntryContainer;
import de.anomic.server.serverObjects;
@ -287,7 +288,7 @@ public class IndexControl_p {
// generate list
if (post.containsKey("keyhashsimilar")) {
final Iterator hashIt = switchboard.wordIndex.wordHashes(keyhash, true, true);
final Iterator hashIt = switchboard.wordIndex.wordHashes(keyhash, plasmaWordIndex.RL_WORDFILES, true, true);
StringBuffer result = new StringBuffer("Sequential List of Word-Hashes:<br>");
String hash;
int i = 0;

@ -111,7 +111,7 @@ public class plasmaDbImporter extends AbstractImporter implements dbImporter {
// iterate over all words from import db
Iterator importWordHashIterator = this.importWordIndex.wordHashes(this.wordChunkStartHash, true, false);
Iterator importWordHashIterator = this.importWordIndex.wordHashes(this.wordChunkStartHash, plasmaWordIndex.RL_WORDFILES, true, false);
while (!isAborted() && importWordHashIterator.hasNext()) {
plasmaWordIndexEntryContainer newContainer = null;

@ -831,11 +831,13 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
return false; // nothing to do
}
/*
if (wordIndex.wordCacheRAMSize() + 1000 > (int) getConfigLong("wordCacheMaxLow", 8000)) {
log.logFine("deQueue: word index ram cache too full (" + ((int) getConfigLong("wordCacheMaxLow", 8000) - wordIndex.wordCacheRAMSize()) + " slots left); dismissed to omit ram flush lock");
return false;
}
*/
int stackCrawlQueueSize;
if ((stackCrawlQueueSize = sbStackCrawlThread.size()) >= stackCrawlSlots) {
log.logFine("deQueue: too many processes in stack crawl thread queue, dismissed to protect emergency case (" + "stackCrawlQueue=" + stackCrawlQueueSize + ")");

@ -257,7 +257,7 @@ public final class plasmaWordIndex {
}
public void intermission(long pause) {
this.ramCache.intermission(pause);
//this.ramCache.intermission(pause);
}
public void close(int waitingBoundSeconds) {
@ -268,18 +268,23 @@ public final class plasmaWordIndex {
ramCache.deleteIndex(wordHash);
}
public Iterator wordHashes(String startHash, boolean up, boolean rot) {
//return ramCache.wordHashes(startHash, up);
if (rot) return new rotatingWordIterator(up, startHash);
else return new correctedWordIterator(up, rot, startHash); // use correction until bug is found
public static final int RL_RAMCACHE = 0;
public static final int RL_FILECACHE = 1;
public static final int RL_ASSORTMENTS = 2;
public static final int RL_WORDFILES = 3;
public Iterator wordHashes(String startHash, int resourceLevel, boolean up, boolean rot) {
if (rot) return new rotatingWordIterator(startHash, resourceLevel, up);
else return new correctedWordIterator(startHash, resourceLevel, up, rot); // use correction until bug is found
}
private final class correctedWordIterator implements Iterator {
Iterator iter;
String nextWord;
public correctedWordIterator(boolean up, boolean rotating, String firstWord) {
iter = ramCache.wordHashes(firstWord, up, rotating);
public correctedWordIterator(String firstWord, int resourceLevel, boolean up, boolean rotating) {
iter = ramCache.wordHashes(firstWord, resourceLevel, up, rotating);
try {
nextWord = (iter.hasNext()) ? (String) iter.next() : null;
boolean corrected = true;
int cc = 0; // to avoid rotation loops
@ -299,6 +304,9 @@ public final class plasmaWordIndex {
cc++;
}
}
} catch (java.util.ConcurrentModificationException e) {
nextWord = null;
}
}
public void finalize() {
@ -312,7 +320,11 @@ public final class plasmaWordIndex {
public Object next() {
String r = nextWord;
nextWord = (iter.hasNext()) ? (String) iter.next() : null;
try {
nextWord = (iter.hasNext()) ? (String) iter.next() : null;
} catch (java.util.ConcurrentModificationException e) {
nextWord = null;
}
return r;
}
@ -323,11 +335,13 @@ public final class plasmaWordIndex {
private class rotatingWordIterator implements Iterator {
Iterator i;
int resourceLevel;
boolean up;
public rotatingWordIterator(boolean up, String startWordHash) {
public rotatingWordIterator(String startWordHash, int resourceLevel, boolean up) {
this.up = up;
i = new correctedWordIterator(up, false, startWordHash);
this.resourceLevel = resourceLevel;
i = new correctedWordIterator(startWordHash, resourceLevel, up, false);
}
public void finalize() {
@ -337,7 +351,7 @@ public final class plasmaWordIndex {
public boolean hasNext() {
if (i.hasNext()) return true;
else {
i = new correctedWordIterator(up, false, (up)?"------------":"zzzzzzzzzzzz");
i = new correctedWordIterator((up)?"------------":"zzzzzzzzzzzz", resourceLevel, up, false);
return i.hasNext();
}
}
@ -472,7 +486,7 @@ public final class plasmaWordIndex {
// System.out.println(new Date(reverseMicroDateDays(microDateDays(System.currentTimeMillis()))));
plasmaWordIndex index = new plasmaWordIndex(new File("D:\\dev\\proxy\\DATA\\PLASMADB"), 555, new serverLog("TESTAPP"));
Iterator iter = index.wordHashes("5A8yhZMh_Kmv", true, true);
Iterator iter = index.wordHashes("5A8yhZMh_Kmv", plasmaWordIndex.RL_WORDFILES, true, true);
while (iter.hasNext()) {
System.out.println("File: " + (String) iter.next());
}

@ -77,7 +77,7 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
private final serverLog log;
private final plasmaWordIndexAssortmentCluster assortmentCluster;
private int assortmentBufferSize; //kb
private final flush flushThread;
//private final flush flushThread;
// calculated constants
private static String maxKey;
@ -93,7 +93,7 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
if (!(assortmentClusterPath.exists())) assortmentClusterPath.mkdirs();
// create flushing thread
flushThread = new flush();
//flushThread = new flush();
// creates a new index cache
// the cache has a back-end where indexes that do not fit in the cache are flushed
@ -117,7 +117,7 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
}
// start permanent flushing
flushThread.start();
//flushThread.start();
}
private void dump(int waitingSeconds) throws IOException {
@ -223,10 +223,12 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
return urlCount;
}
/*
public void intermission(long pause) {
flushThread.intermission(pause);
}
*/
// cache settings
public int maxURLinWordCache() {
@ -272,13 +274,45 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
if (cacheIndex != null) size += cacheIndex.size();
return size;
}
public Iterator wordHashes(String startWordHash, boolean up, boolean rot) {
return wordHashes(startWordHash, plasmaWordIndex.RL_WORDFILES, up, rot);
}
public Iterator wordHashes(String startWordHash, boolean up) {
// Old convention implies rot = true
//return new rotatingWordHashes(startWordHash, up);
return wordHashes(startWordHash, up, true);
public Iterator wordHashes(String startWordHash, int resourceLevel, boolean up, boolean rot) {
synchronized (cache) {
if (!(up)) throw new RuntimeException("plasmaWordIndexCache.wordHashes can only count up");
if (resourceLevel == plasmaWordIndex.RL_RAMCACHE) {
return cache.tailMap(startWordHash).keySet().iterator();
}
/*
if (resourceLevel == plasmaWordIndex.RL_FILECACHE) {
}
*/
if (resourceLevel == plasmaWordIndex.RL_ASSORTMENTS) {
return new kelondroMergeIterator(
cache.tailMap(startWordHash).keySet().iterator(),
assortmentCluster.hashConjunction(startWordHash, true, rot),
kelondroNaturalOrder.naturalOrder,
true);
}
if (resourceLevel == plasmaWordIndex.RL_WORDFILES) {
return new kelondroMergeIterator(
new kelondroMergeIterator(
cache.tailMap(startWordHash).keySet().iterator(),
assortmentCluster.hashConjunction(startWordHash, true, rot),
kelondroNaturalOrder.naturalOrder,
true),
backend.wordHashes(startWordHash, true, false),
kelondroNaturalOrder.naturalOrder,
true);
}
return null;
}
}
/*
public Iterator wordHashes(String startWordHash, boolean up, boolean rot) {
// here we merge 3 databases into one view:
// - the RAM Cache
@ -293,11 +327,13 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
assortmentCluster.hashConjunction(startWordHash, true, rot),
kelondroNaturalOrder.naturalOrder,
true),
backend.wordHashes(startWordHash, true),
backend.wordHashes(startWordHash, true, false),
kelondroNaturalOrder.naturalOrder,
true);
}
*/
/*
private final class flush extends Thread {
boolean terminate;
long intermission;
@ -332,7 +368,8 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
terminate = true;
}
}
*/
private void flushFromMem() {
// select appropriate hash
// we have 2 different methods to find a good hash:
@ -484,22 +521,7 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
int added = 0;
// check cache space
/*
if (cache.size() > 0) try {
// pause to get space in the cache (while it is flushed)
long pausetime;
if (highPriority) {
if (cache.size() + 1000 >= this.maxWordsHigh) Thread.sleep(java.lang.Math.min(1000, cache.size() - this.maxWordsHigh + 1000));
pausetime = java.lang.Math.min(10, 2 * cache.size() / (maxWordsHigh + 1));
} else {
if (cache.size() + 1000 >= this.maxWordsLow) Thread.sleep(java.lang.Math.min(1000, cache.size() - this.maxWordsLow + 1000));
pausetime = java.lang.Math.min(10, 2 * cache.size() / (maxWordsLow + 1));
}
// slow down if we reach cache limit
Thread.sleep(pausetime);
} catch (InterruptedException e) {}
*/
//serverLog.logDebug("PLASMA INDEXING", "addEntryToIndexMem: cache.size=" + cache.size() + "; hashScore.size=" + hashScore.size());
// put new words into cache
@ -516,12 +538,19 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
}
entries = null;
// force flush (sometimes)
if (System.currentTimeMillis() % 7 == 4) flushFromMem();
// force flush
if (highPriority) {
while (cache.size() > maxWordsHigh) flushFromMem();
if (cache.size() > maxWordsHigh) {
while (cache.size() + 500 > maxWordsHigh) {
try { Thread.sleep(10); } catch (InterruptedException e) { }
flushFromMem();
}}
} else {
while (cache.size() > maxWordsLow) flushFromMem();
if (cache.size() > maxWordsLow) {
while (cache.size() + 500 > maxWordsLow) {
try { Thread.sleep(10); } catch (InterruptedException e) { }
flushFromMem();
}}
}
}
return added;
@ -544,8 +573,8 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
public void close(int waitingSeconds) {
// stop permanent flushing
flushThread.terminate();
try {flushThread.join(6000);} catch (InterruptedException e) {}
//flushThread.terminate();
//try {flushThread.join(6000);} catch (InterruptedException e) {}
// dump cache
try {

@ -71,7 +71,8 @@ public class plasmaWordIndexClassicDB implements plasmaWordIndexInterface {
return size;
}
public Iterator wordHashes(String startHash, boolean up) {
public Iterator wordHashes(String startHash, boolean up, boolean rot) {
if (rot) throw new UnsupportedOperationException("no rot allowed");
return new iterateFiles(startHash, up);
}

@ -203,9 +203,10 @@ public final class plasmaWordIndexDistribution {
String startPointHash = selectTransferStart();
this.log.logFine("Selected hash " + startPointHash + " as start point for index distribution, distance = " + yacyDHTAction.dhtDistance(yacyCore.seedDB.mySeed.hash, startPointHash));
Object[] selectResult = selectTransferContainers(startPointHash, indexCount);
Object[] selectResult = selectTransferContainers(startPointHash, indexCount/3, indexCount);
indexContainers = (plasmaWordIndexEntryContainer[]) selectResult[0];
HashMap urlCache = (HashMap) selectResult[1]; // String (url-hash) / plasmaCrawlLURL.Entry
//int refcount = ((Integer) selectResult[2]).intValue();
if ((indexContainers == null) || (indexContainers.length == 0)) {
this.log.logFine("No index available for index transfer, hash start-point " + startPointHash);
@ -319,21 +320,37 @@ public final class plasmaWordIndexDistribution {
return startPointHash;
}
Object[] /* of {plasmaWordIndexEntryContainer[], HashMap(String, plasmaCrawlLURL.Entry)}*/
selectTransferContainers(String hash, int count) {
public Object[] /* of {plasmaWordIndexEntryContainer[], HashMap(String, plasmaCrawlLURL.Entry)}*/
selectTransferContainers(String hash, int mincount, int maxcount) {
Object[] selectResult = selectTransferContainersResource(hash, plasmaWordIndex.RL_RAMCACHE, maxcount);
int refcount = ((Integer) selectResult[2]).intValue();
if (refcount >= mincount) {
log.logFine("DHT selection from RAM: " + refcount + " entries");
return selectResult;
}
selectResult = selectTransferContainersResource(hash, plasmaWordIndex.RL_WORDFILES, maxcount);
refcount = ((Integer) selectResult[2]).intValue();
log.logFine("DHT selection from FILE: " + refcount + " entries");
return selectResult;
}
private Object[] /* of {plasmaWordIndexEntryContainer[], HashMap(String, plasmaCrawlLURL.Entry)}*/
selectTransferContainersResource(String hash, int resourceLevel, int maxcount) {
// the hash is a start hash from where the indexes are picked
ArrayList tmpContainers = new ArrayList(count);
ArrayList tmpContainers = new ArrayList(maxcount);
String nexthash = "";
try {
Iterator wordHashIterator = this.wordIndex.wordHashes(hash, true, true);
synchronized (this.wordIndex) {try {
Iterator wordHashIterator = this.wordIndex.wordHashes(hash, resourceLevel, true, true);
plasmaWordIndexEntryContainer indexContainer;
Iterator urlIter;
plasmaWordIndexEntry indexEntry;
plasmaCrawlLURL.Entry lurl;
int refcount = 0;
final HashMap knownURLs = new HashMap();
while (
(count > 0) &&
(maxcount > refcount) &&
(wordHashIterator.hasNext()) &&
((nexthash = (String) wordHashIterator.next()) != null) &&
(nexthash.trim().length() > 0) &&
@ -346,7 +363,7 @@ public final class plasmaWordIndexDistribution {
try {
urlIter = indexContainer.entries();
// iterate over indexes to fetch url entries and store them in the urlCache
while ((urlIter.hasNext()) && (count > 0)) {
while ((urlIter.hasNext()) && (maxcount > refcount)) {
indexEntry = (plasmaWordIndexEntry) urlIter.next();
try {
lurl = this.urlPool.loadedURL.getEntry(indexEntry.getUrlHash(), indexEntry);
@ -356,7 +373,7 @@ public final class plasmaWordIndexDistribution {
this.wordIndex.removeEntries(nexthash, new String[]{indexEntry.getUrlHash()}, true);
} else {
knownURLs.put(indexEntry.getUrlHash(), lurl);
count--;
refcount++;
}
} catch (IOException e) {
notBoundCounter++;
@ -381,11 +398,12 @@ public final class plasmaWordIndexDistribution {
}
// transfer to array
plasmaWordIndexEntryContainer[] entryContainers = (plasmaWordIndexEntryContainer[]) tmpContainers.toArray(new plasmaWordIndexEntryContainer[tmpContainers.size()]);
return new Object[]{entryContainers, knownURLs};
return new Object[]{entryContainers, knownURLs, new Integer(refcount)};
} catch (kelondroException e) {
this.log.logSevere("selectTransferIndexes database corrupted: " + e.getMessage(), e);
return new Object[]{new plasmaWordIndexEntity[0], new HashMap(0)};
}
}
}
void closeTransferIndex(plasmaWordIndexEntity indexEntity) throws IOException {
@ -800,9 +818,10 @@ public final class plasmaWordIndexDistribution {
// selecting 500 words to transfer
this.status = "Running: Selecting chunk " + iteration;
Object[] selectResult = selectTransferContainers(this.startPointHash, this.chunkSize);
Object[] selectResult = selectTransferContainers(this.startPointHash, this.chunkSize/3, this.chunkSize);
newIndexContainers = (plasmaWordIndexEntryContainer[]) selectResult[0];
HashMap urlCache = (HashMap) selectResult[1]; // String (url-hash) / plasmaCrawlLURL.Entry
//int refcount = ((Integer) selectResult[2]).intValue();
/* If we havn't selected a word chunk this could be because of
* a) no words are left in the index

@ -48,7 +48,7 @@ public interface plasmaWordIndexInterface {
public int size();
public Iterator wordHashes(String startWordHash, boolean up);
public Iterator wordHashes(String startWordHash, boolean up, boolean rot);
public plasmaWordIndexEntity getEntity(String wordHash, boolean deleteIfEmpty, long maxTime);
public plasmaWordIndexEntryContainer getContainer(String wordHash, boolean deleteIfEmpty, long maxTime);

@ -836,7 +836,7 @@ public final class yacy {
long globalStart = System.currentTimeMillis(), wordChunkStart = System.currentTimeMillis(), wordChunkEnd = 0;
String wordChunkStartHash = "------------", wordChunkEndHash;
Iterator importWordHashIterator = importWordIndex.wordHashes(wordChunkStartHash, true, true);
Iterator importWordHashIterator = importWordIndex.wordHashes(wordChunkStartHash, plasmaWordIndex.RL_WORDFILES, true, true);
while (importWordHashIterator.hasNext()) {
// testing if import process was aborted
@ -945,7 +945,7 @@ public final class yacy {
Runtime rt = Runtime.getRuntime();
int cacheMem = (int)(rt.maxMemory()-rt.totalMemory())-5*1024*1024;
plasmaWordIndex wordIndex = new plasmaWordIndex(dbroot, cacheMem, log);
Iterator wordHashIterator = wordIndex.wordHashes("------------", true, true);
Iterator wordHashIterator = wordIndex.wordHashes("------------", plasmaWordIndex.RL_WORDFILES, true, true);
String wordhash;
long urlCounter = 0, wordCounter = 0;
@ -1332,7 +1332,7 @@ public final class yacy {
Iterator WordHashIterator = null;
if (resource.equals("all")) {
WordIndex = new plasmaWordIndex(homeDBroot, 8*1024*1024, log);
WordHashIterator = WordIndex.wordHashes(wordChunkStartHash, true, false);
WordHashIterator = WordIndex.wordHashes(wordChunkStartHash, plasmaWordIndex.RL_WORDFILES, true, false);
} else if (resource.equals("assortments")) {
plasmaWordIndexAssortmentCluster assortmentCluster = new plasmaWordIndexAssortmentCluster(new File(homeDBroot, "ACLUSTER"), 64, 16*1024*1024, log);
WordHashIterator = assortmentCluster.hashConjunction(wordChunkStartHash, true, false);
@ -1342,7 +1342,7 @@ public final class yacy {
WordHashIterator = assortment.hashes(wordChunkStartHash, true, false);
} else if (resource.equals("words")) {
plasmaWordIndexClassicDB fileDB = new plasmaWordIndexClassicDB(homeDBroot, log);
WordHashIterator = fileDB.wordHashes(wordChunkStartHash, true);
WordHashIterator = fileDB.wordHashes(wordChunkStartHash, true, false);
}
int counter = 0;
String wordHash = "";

Loading…
Cancel
Save