replaced index dump stack by an dump array and limited url number in assortment ram (prevents too much RAM occupation)

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@406 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 20 years ago
parent 51962d55bf
commit 9c72b4cdec

@ -53,18 +53,20 @@ import de.anomic.kelondro.kelondroMScoreCluster;
import de.anomic.kelondro.kelondroMergeIterator; import de.anomic.kelondro.kelondroMergeIterator;
import de.anomic.kelondro.kelondroRecords; import de.anomic.kelondro.kelondroRecords;
import de.anomic.kelondro.kelondroStack; import de.anomic.kelondro.kelondroStack;
import de.anomic.kelondro.kelondroArray;
import de.anomic.server.logging.serverLog; import de.anomic.server.logging.serverLog;
import de.anomic.yacy.yacySeedDB; import de.anomic.yacy.yacySeedDB;
public final class plasmaWordIndexCache implements plasmaWordIndexInterface { public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
// environment constants // environment constants
private static final String indexDumpFileName = "indexDump0.stack"; private static final String indexArrayFileName = "indexDump1.array";
private static final String indexStackFileName = "indexDump0.stack";
private static final String oldSingletonFileName = "indexSingletons0.db"; private static final String oldSingletonFileName = "indexSingletons0.db";
private static final String newSingletonFileName = "indexAssortment001.db"; private static final String newSingletonFileName = "indexAssortment001.db";
private static final String indexAssortmentClusterPath = "ACLUSTER"; private static final String indexAssortmentClusterPath = "ACLUSTER";
private static final int assortmentLimit = 50; private static final int assortmentLimit = 50;
private static final int ramcacheLimit = 51; private static final int ramCacheLimit = 200;
// class variables // class variables
@ -134,7 +136,115 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
private void dump(int waitingSeconds) throws IOException { private void dump(int waitingSeconds) throws IOException {
log.logSystem("creating dump for index cache, " + cache.size() + " words (and much more urls)"); log.logSystem("creating dump for index cache, " + cache.size() + " words (and much more urls)");
File indexDumpFile = new File(databaseRoot, indexDumpFileName); File indexDumpFile = new File(databaseRoot, indexArrayFileName);
if (indexDumpFile.exists()) indexDumpFile.delete();
kelondroArray dumpArray = new kelondroArray(indexDumpFile, plasmaWordIndexAssortment.bufferStructureBasis, 0);
long startTime = System.currentTimeMillis();
long messageTime = System.currentTimeMillis() + 5000;
long wordsPerSecond = 0, wordcount = 0, urlcount = 0;
synchronized (cache) {
Iterator i = cache.entrySet().iterator();
Map.Entry entry;
String wordHash;
plasmaWordIndexEntryContainer container;
long updateTime;
plasmaWordIndexEntry wordEntry;
byte[][] row = new byte[5][];
while (i.hasNext()) {
// get entries
entry = (Map.Entry) i.next();
wordHash = (String) entry.getKey();
updateTime = getUpdateTime(wordHash);
container = (plasmaWordIndexEntryContainer) entry.getValue();
// put entries on stack
if (container != null) {
Iterator ci = container.entries();
while (ci.hasNext()) {
wordEntry = (plasmaWordIndexEntry) ci.next();
row[0] = wordHash.getBytes();
row[1] = kelondroRecords.long2bytes(container.size(), 4);
row[2] = kelondroRecords.long2bytes(updateTime, 8);
row[3] = wordEntry.getUrlHash().getBytes();
row[4] = wordEntry.toEncodedForm(true).getBytes();
dumpArray.set((int) urlcount++, row);
}
}
wordcount++;
i.remove(); // free some mem
// write a log
if (System.currentTimeMillis() > messageTime) {
System.gc(); // for better statistic
wordsPerSecond = wordcount * 1000 / (1 + System.currentTimeMillis() - startTime);
log.logInfo("dumping status: " + wordcount + " words done, " + (cache.size() / (wordsPerSecond + 1)) + " seconds remaining, free mem = " + (Runtime.getRuntime().freeMemory() / 1024 / 1024) + "MB");
messageTime = System.currentTimeMillis() + 5000;
}
}
}
dumpArray.close();
log.logSystem("dumped " + urlcount + " word/url relations in " + ((System.currentTimeMillis() - startTime) / 1000) + " seconds");
}
private long restore() throws IOException {
if ((new File(databaseRoot, indexArrayFileName)).exists())
return restoreArray();
else
return restoreStack();
}
private long restoreArray() throws IOException {
File indexDumpFile = new File(databaseRoot, indexArrayFileName);
if (!(indexDumpFile.exists())) return 0;
kelondroArray dumpArray = new kelondroArray(indexDumpFile);
log.logSystem("restore array dump of index cache, " + dumpArray.size() + " word/url relations");
long startTime = System.currentTimeMillis();
long messageTime = System.currentTimeMillis() + 5000;
long urlCount = 0, urlsPerSecond = 0;
try {
synchronized (cache) {
int i = dumpArray.size();
String wordHash;
plasmaWordIndexEntryContainer container;
long creationTime;
plasmaWordIndexEntry wordEntry;
byte[][] row;
Runtime rt = Runtime.getRuntime();
while (i-- > 0) {
// get out one entry
row = dumpArray.get(i);
wordHash = new String(row[0]);
creationTime = kelondroRecords.bytes2long(row[2]);
wordEntry = new plasmaWordIndexEntry(new String(row[3]), new String(row[4]));
// store to cache
addEntry(wordHash, wordEntry, creationTime);
urlCount++;
// protect against memory shortage
while (rt.freeMemory() < 1000000) {flushFromMem(); java.lang.System.gc();}
// write a log
if (System.currentTimeMillis() > messageTime) {
System.gc(); // for better statistic
urlsPerSecond = 1 + urlCount * 1000 / (1 + System.currentTimeMillis() - startTime);
log.logInfo("restoring status: " + urlCount + " urls done, " + (i / urlsPerSecond) + " seconds remaining, free mem = " + (Runtime.getRuntime().freeMemory() / 1024 / 1024) + "MB");
messageTime = System.currentTimeMillis() + 5000;
}
}
}
dumpArray.close();
log.logSystem("restored " + cache.size() + " words in " + ((System.currentTimeMillis() - startTime) / 1000) + " seconds");
} catch (kelondroException e) {
// restore failed
log.logError("restore of indexCache array dump failed: " + e.getMessage());
e.printStackTrace();
}
return urlCount;
}
/*
private void dump(int waitingSeconds) throws IOException {
log.logSystem("creating dump for index cache, " + cache.size() + " words (and much more urls)");
File indexDumpFile = new File(databaseRoot, indexStackFileName);
if (indexDumpFile.exists()) indexDumpFile.delete(); if (indexDumpFile.exists()) indexDumpFile.delete();
kelondroStack dumpStack = new kelondroStack(indexDumpFile, 1024, plasmaWordIndexAssortment.bufferStructureBasis); kelondroStack dumpStack = new kelondroStack(indexDumpFile, 1024, plasmaWordIndexAssortment.bufferStructureBasis);
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
@ -186,11 +296,13 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
log.logSystem("dumped " + urlcount + " word/url relations in " + ((System.currentTimeMillis() - startTime) / 1000) + " seconds"); log.logSystem("dumped " + urlcount + " word/url relations in " + ((System.currentTimeMillis() - startTime) / 1000) + " seconds");
} }
private long restore() throws IOException { */
File indexDumpFile = new File(databaseRoot, indexDumpFileName);
private long restoreStack() throws IOException {
File indexDumpFile = new File(databaseRoot, indexStackFileName);
if (!(indexDumpFile.exists())) return 0; if (!(indexDumpFile.exists())) return 0;
kelondroStack dumpStack = new kelondroStack(indexDumpFile, 1024); kelondroStack dumpStack = new kelondroStack(indexDumpFile, 1024);
log.logSystem("restore dump of index cache, " + dumpStack.size() + " word/url relations"); log.logSystem("restore stack dump of index cache, " + dumpStack.size() + " word/url relations");
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
long messageTime = System.currentTimeMillis() + 5000; long messageTime = System.currentTimeMillis() + 5000;
long urlCount = 0, urlsPerSecond = 0; long urlCount = 0, urlsPerSecond = 0;
@ -204,7 +316,6 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
plasmaWordIndexEntry wordEntry; plasmaWordIndexEntry wordEntry;
byte[][] row; byte[][] row;
Runtime rt = Runtime.getRuntime(); Runtime rt = Runtime.getRuntime();
System.gc(); // this is not for performance, but only to make the statistic work better
while (i.hasNext()) { while (i.hasNext()) {
// get out one entry // get out one entry
node = (kelondroRecords.Node) i.next(); node = (kelondroRecords.Node) i.next();
@ -234,6 +345,7 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
log.logError("restore of indexCache dump failed: " + e.getMessage()); log.logError("restore of indexCache dump failed: " + e.getMessage());
e.printStackTrace(); e.printStackTrace();
} }
indexDumpFile.delete();
return urlCount; return urlCount;
} }
@ -325,7 +437,8 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
} }
int count = hashScore.getMaxScore(); int count = hashScore.getMaxScore();
long time = longTime(hashDate.getScore(hash)); long time = longTime(hashDate.getScore(hash));
if ((count > ramcacheLimit) && (System.currentTimeMillis() - time > 10000)) { if ((count > ramCacheLimit) ||
((count > assortmentLimit) && (System.currentTimeMillis() - time > 10000))) {
// flush high-score entries // flush high-score entries
flushFromMem(hash, true); flushFromMem(hash, true);
} else { } else {
@ -387,67 +500,6 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
} }
/*
private int flushFromMem(String key, boolean reintegrate) {
// this method flushes indexes out from the ram to the disc.
// at first we check the singleton database and act accordingly
// if we we are to flush an index, but see also an entry in the singletons, we
// decide upn the 'reintegrate'-Flag:
// true: do not flush to disc, but re-Integrate the singleton to the RAM
// false: flush the singleton together with container to disc
plasmaWordIndexEntryContainer container = null;
long time;
synchronized (cache) {
// get the container
container = (plasmaWordIndexEntryContainer) cache.get(key);
if (container == null) return 0; // flushing of nonexisting key
time = getUpdateTime(key);
// remove it from the cache
cache.remove(key);
hashScore.deleteScore(key);
hashDate.deleteScore(key);
}
// now decide where to flush that container
plasmaWordIndexEntryContainer flushedFromAssortment = assortmentCluster.removeFromAll(key);
if ((flushedFromAssortment == null) || (flushedFromAssortment.size() == 0)) {
// not found in assortments
if (container.size() <= assortmentLimit) {
// this fits into the assortments
plasmaWordIndexEntryContainer feedback = assortmentCluster.storeTry(key, container);
if (feedback == null) {
return container.size();
} else {
// *** should care about another option here ***
return backend.addEntries(feedback, time);
}
} else {
// store to back-end; this should be a rare case
return backend.addEntries(container, time);
}
} else {
// we have some records and must integrate them into the flush
container.add(flushedFromAssortment);
// possibly reintegrate
if (reintegrate) {
// put assortmentRecord together with container back to ram
synchronized (cache) {
cache.put(key, container);
hashScore.setScore(key, container.size());
hashDate.setScore(key, intTime(time));
}
return -flushedFromAssortment.size();
} else {
// add this to the backend
return backend.addEntries(container, java.lang.Math.max(time, flushedFromAssortment.updated())) - flushedFromAssortment.size();
}
}
}
*/
private int intTime(long longTime) { private int intTime(long longTime) {
return (int) ((longTime - startTime) / 1000); return (int) ((longTime - startTime) / 1000);
} }
@ -468,106 +520,6 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
} }
} }
/*
private synchronized int flushFromMemToLimit() {
if ((hashScore.size() == 0) || (cache.size() == 0)) return 0;
flushThread.pause();
int count = 0;
//serverLog.logDebug("PLASMA INDEXING", "flushSpecific: hashScore.size=" + hashScore.size() + ", cache.size=" + cache.size());
synchronized (hashScore) {
String key;
Long createTime;
// generate flush list
Iterator i = hashScore.scores(true);
TreeMap[] clusterCandidate = new TreeMap[hashScore.getMaxScore()];
for (int k = 0; k < clusterCandidate.length; k++) clusterCandidate[k] = new TreeMap(); // by create time ordered hash-list
while (i.hasNext()) {
// get the entry properties
key = (String) i.next();
createTime = new Long(longTime(hashDate.getScore(key)));
count = hashScore.getScore(key);
// put it into a specific ohl
clusterCandidate[count - 1].put(createTime, key);
//System.out.println("COUNT FOR KEY " + key + ": " + count);
}
// print statistics
for (int k = 0; k < clusterCandidate.length; k++)
log.logDebug("FLUSH-LIST " + (k + 1) + ": " + clusterCandidate[k].size() + " entries");
Map.Entry entry;
int candidateCounter;
count = 0;
// flush high-scores that accumultated too much
for (int cluster = clusterCandidate.length; cluster >= ramcacheLimit; cluster--) {
candidateCounter = 0;
i = clusterCandidate[cluster - 1].entrySet().iterator();
while (i.hasNext()) {
entry = (Map.Entry) i.next();
key = (String) entry.getValue();
createTime = (Long) entry.getKey();
count += java.lang.Math.abs(flushFromMem(key, false));
candidateCounter += cluster;
log.logDebug("flushed high-cluster over limit #" + cluster + ", key=" + key + ", count=" + count + ", cachesize=" + cache.size());
}
}
// flush from assortment cluster
for (int cluster = 1; cluster <= java.lang.Math.min(clusterCandidate.length, assortmentLimit); cluster++) {
candidateCounter = 0;
// select a specific cluster
i = clusterCandidate[cluster - 1].entrySet().iterator();
// check each element in this flush-list: too old?
while (i.hasNext()) {
entry = (Map.Entry) i.next();
key = (String) entry.getValue();
createTime = (Long) entry.getKey();
if ((createTime != null) && ((System.currentTimeMillis() - createTime.longValue()) > (20000 + (cluster * 5000)))) {
//log.logDebug("flushing key " + key + ", count=" + count + ", cachesize=" + cache.size() + ", singleton-size=" + singletons.size());
count += java.lang.Math.abs(flushFromMem(key, true));
candidateCounter += cluster;
}
}
if (candidateCounter > 0) log.logDebug("flushed low-cluster #" + cluster + ", count=" + count + ", candidateCounter=" + candidateCounter + ", cachesize=" + cache.size());
}
// stop flushing if cache is shrinked enough
// avoid as possible to flush high-scores
if (cache.size() < this.maxWords - 100) {
flushThread.proceed();
return count;
}
// flush high-scores
for (int cluster = java.lang.Math.min(clusterCandidate.length, ramcacheLimit); cluster > assortmentLimit; cluster--) {
candidateCounter = 0;
i = clusterCandidate[cluster - 1].entrySet().iterator();
while (i.hasNext()) {
entry = (Map.Entry) i.next();
key = (String) entry.getValue();
createTime = (Long) entry.getKey();
if ((createTime != null) && ((System.currentTimeMillis() - createTime.longValue()) > (600000/cluster))) {
count += java.lang.Math.abs(flushFromMem(key, false));
candidateCounter += cluster;
log.logDebug("flushed high-cluster below limit #" + cluster + ", key=" + key + ", count=" + count + ", cachesize=" + cache.size());
}
if (cache.size() < this.maxWords - 100) {
flushThread.proceed();
return count;
}
}
}
}
flushThread.proceed();
return count;
}
*/
public plasmaWordIndexEntity getIndex(String wordHash, boolean deleteIfEmpty) { public plasmaWordIndexEntity getIndex(String wordHash, boolean deleteIfEmpty) {
flushThread.pause(); flushThread.pause();
flushFromMem(wordHash, false); flushFromMem(wordHash, false);

@ -389,6 +389,7 @@ public final class yacy {
} }
} catch (Exception ee) { } catch (Exception ee) {
serverLog.logFailure("STARTUP", "FATAL ERROR: " + ee.getMessage(),ee); serverLog.logFailure("STARTUP", "FATAL ERROR: " + ee.getMessage(),ee);
ee.printStackTrace();
} }
serverLog.logSystem("SHUTDOWN", "goodbye. (this is the last line)"); serverLog.logSystem("SHUTDOWN", "goodbye. (this is the last line)");
try { try {

Loading…
Cancel
Save