|
|
|
@ -65,11 +65,13 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
|
|
|
|
|
private plasmaWordIndexInterface backend;
|
|
|
|
|
private TreeMap cache;
|
|
|
|
|
private kelondroMScoreCluster hashScore;
|
|
|
|
|
private HashMap hashDate;
|
|
|
|
|
private kelondroMScoreCluster hashDate;
|
|
|
|
|
private long startTime;
|
|
|
|
|
private int maxWords;
|
|
|
|
|
private serverLog log;
|
|
|
|
|
private plasmaWordIndexAssortmentCluster assortmentCluster;
|
|
|
|
|
private int singletonBufferSize; //kb
|
|
|
|
|
private int assortmentBufferSize; //kb
|
|
|
|
|
private flush flushThread;
|
|
|
|
|
|
|
|
|
|
// calculated constants
|
|
|
|
|
private static String minKey, maxKey;
|
|
|
|
@ -80,7 +82,7 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
|
|
|
|
|
for (int i = 0; i < yacySeedDB.commonHashLength; i++) maxKey += '-';
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public plasmaWordIndexCache(File databaseRoot, plasmaWordIndexInterface backend, int singletonbufferkb, serverLog log) {
|
|
|
|
|
public plasmaWordIndexCache(File databaseRoot, plasmaWordIndexInterface backend, int assortmentbufferkb, serverLog log) {
|
|
|
|
|
// migrate#1
|
|
|
|
|
File oldSingletonFile = new File(databaseRoot, oldSingletonFileName);
|
|
|
|
|
File newSingletonFile = new File(databaseRoot, newSingletonFileName);
|
|
|
|
@ -94,17 +96,21 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
|
|
|
|
|
File acSingletonFile = new File(assortmentClusterPath, newSingletonFileName);
|
|
|
|
|
if ((newSingletonFile.exists()) && (!(acSingletonFile.exists()))) newSingletonFile.renameTo(acSingletonFile);
|
|
|
|
|
|
|
|
|
|
// create flushing thread
|
|
|
|
|
flushThread = new flush();
|
|
|
|
|
|
|
|
|
|
// creates a new index cache
|
|
|
|
|
// the cache has a back-end where indexes that do not fit in the cache are flushed
|
|
|
|
|
this.databaseRoot = databaseRoot;
|
|
|
|
|
this.singletonBufferSize = singletonbufferkb;
|
|
|
|
|
this.assortmentBufferSize = assortmentbufferkb;
|
|
|
|
|
this.cache = new TreeMap();
|
|
|
|
|
this.hashScore = new kelondroMScoreCluster();
|
|
|
|
|
this.hashDate = new HashMap();
|
|
|
|
|
this.hashDate = new kelondroMScoreCluster();
|
|
|
|
|
this.startTime = System.currentTimeMillis();
|
|
|
|
|
this.maxWords = 10000;
|
|
|
|
|
this.backend = backend;
|
|
|
|
|
this.log = log;
|
|
|
|
|
this.assortmentCluster = new plasmaWordIndexAssortmentCluster(assortmentClusterPath, assortmentLimit, singletonBufferSize, log);
|
|
|
|
|
this.assortmentCluster = new plasmaWordIndexAssortmentCluster(assortmentClusterPath, assortmentLimit, assortmentBufferSize, log);
|
|
|
|
|
|
|
|
|
|
// read in dump of last session
|
|
|
|
|
try {
|
|
|
|
@ -113,8 +119,12 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
|
|
|
|
|
log.logError("unable to restore cache dump: " + e.getMessage());
|
|
|
|
|
e.printStackTrace();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// start permanent flushing
|
|
|
|
|
flushThread.start();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private void dump(int waitingSeconds) throws IOException {
|
|
|
|
|
log.logSystem("creating dump for index cache, " + cache.size() + " words (and much more urls)");
|
|
|
|
|
File indexDumpFile = new File(databaseRoot, indexDumpFileName);
|
|
|
|
@ -125,20 +135,17 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
|
|
|
|
|
long wordsPerSecond = 0, wordcount = 0, urlcount = 0;
|
|
|
|
|
synchronized (cache) {
|
|
|
|
|
Iterator i = cache.entrySet().iterator();
|
|
|
|
|
//Iterator i = hashScore.scores(true);
|
|
|
|
|
Map.Entry entry;
|
|
|
|
|
String wordHash;
|
|
|
|
|
plasmaWordIndexEntryContainer container;
|
|
|
|
|
long creationTime;
|
|
|
|
|
long updateTime;
|
|
|
|
|
plasmaWordIndexEntry wordEntry;
|
|
|
|
|
byte[][] row = new byte[5][];
|
|
|
|
|
while (i.hasNext()) {
|
|
|
|
|
// get entries
|
|
|
|
|
entry = (Map.Entry) i.next();
|
|
|
|
|
//wordHash = (String) i.next();
|
|
|
|
|
wordHash = (String) entry.getKey();
|
|
|
|
|
creationTime = getCreationTime(wordHash);
|
|
|
|
|
//container = (plasmaWordIndexEntryContainer) cache.get(wordHash);
|
|
|
|
|
updateTime = getUpdateTime(wordHash);
|
|
|
|
|
container = (plasmaWordIndexEntryContainer) entry.getValue();
|
|
|
|
|
|
|
|
|
|
// put entries on stack
|
|
|
|
@ -148,7 +155,7 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
|
|
|
|
|
wordEntry = (plasmaWordIndexEntry) ci.next();
|
|
|
|
|
row[0] = wordHash.getBytes();
|
|
|
|
|
row[1] = kelondroRecords.long2bytes(container.size(), 4);
|
|
|
|
|
row[2] = kelondroRecords.long2bytes(creationTime, 8);
|
|
|
|
|
row[2] = kelondroRecords.long2bytes(updateTime, 8);
|
|
|
|
|
row[3] = wordEntry.getUrlHash().getBytes();
|
|
|
|
|
row[4] = wordEntry.toEncodedForm(true).getBytes();
|
|
|
|
|
dumpStack.push(row);
|
|
|
|
@ -235,7 +242,7 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
|
|
|
|
|
public Iterator wordHashes(String startWordHash, boolean up) {
|
|
|
|
|
// here we merge 3 databases into one view:
|
|
|
|
|
// - the RAM Cache
|
|
|
|
|
// - the singleton File Cache
|
|
|
|
|
// - the assortmentCluster File Cache
|
|
|
|
|
// - the backend
|
|
|
|
|
if (!(up)) throw new RuntimeException("plasmaWordIndexCache.wordHashes can only count up");
|
|
|
|
|
return new kelondroMergeIterator(
|
|
|
|
@ -247,6 +254,49 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
|
|
|
|
|
true);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private class flush extends Thread {
|
|
|
|
|
boolean terminate, pause;
|
|
|
|
|
|
|
|
|
|
public flush() {
|
|
|
|
|
terminate = false;
|
|
|
|
|
pause = false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void run() {
|
|
|
|
|
String nextHash;
|
|
|
|
|
while (!terminate) {
|
|
|
|
|
if (pause) {
|
|
|
|
|
try {this.sleep(300);} catch (InterruptedException e) {}
|
|
|
|
|
} else {
|
|
|
|
|
nextHash = (String) hashDate.getMinObject();
|
|
|
|
|
if (nextHash != null) {
|
|
|
|
|
try {
|
|
|
|
|
flushFromMem(nextHash, true);
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
log.logError("flushThread: " + e.getMessage());
|
|
|
|
|
e.printStackTrace();
|
|
|
|
|
}
|
|
|
|
|
try {this.sleep(10 + java.lang.Math.min(1000, 10 * maxWords/(cache.size() + 1)));} catch (InterruptedException e) {}
|
|
|
|
|
} else {
|
|
|
|
|
try {this.sleep(2000);} catch (InterruptedException e) {}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void pause() {
|
|
|
|
|
pause = true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void proceed() {
|
|
|
|
|
pause = false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void terminate() {
|
|
|
|
|
terminate = true;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private int flushFromMem(String key, boolean reintegrate) {
|
|
|
|
|
// this method flushes indexes out from the ram to the disc.
|
|
|
|
|
// at first we check the singleton database and act accordingly
|
|
|
|
@ -261,12 +311,12 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
|
|
|
|
|
// get the container
|
|
|
|
|
container = (plasmaWordIndexEntryContainer) cache.get(key);
|
|
|
|
|
if (container == null) return 0; // flushing of nonexisting key
|
|
|
|
|
time = getCreationTime(key);
|
|
|
|
|
time = getUpdateTime(key);
|
|
|
|
|
|
|
|
|
|
// remove it from the cache
|
|
|
|
|
cache.remove(key);
|
|
|
|
|
hashScore.deleteScore(key);
|
|
|
|
|
hashDate.remove(key);
|
|
|
|
|
hashDate.deleteScore(key);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// now decide where to flush that container
|
|
|
|
@ -296,7 +346,7 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
|
|
|
|
|
synchronized (cache) {
|
|
|
|
|
cache.put(key, container);
|
|
|
|
|
hashScore.setScore(key, container.size());
|
|
|
|
|
hashDate.put(key, new Long(time));
|
|
|
|
|
hashDate.setScore(key, intTime(time));
|
|
|
|
|
}
|
|
|
|
|
return -flushedFromAssortment.size();
|
|
|
|
|
} else {
|
|
|
|
@ -306,6 +356,14 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private int intTime(long longTime) {
|
|
|
|
|
return (int) ((longTime - startTime) / 1000);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private long longTime(int intTime) {
|
|
|
|
|
return ((long) intTime) * ((long) 1000) + startTime;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private boolean flushFromAssortmentCluster(String key) {
|
|
|
|
|
// this should only be called if the singleton shall be deleted or returned in an index entity
|
|
|
|
|
plasmaWordIndexEntryContainer container = assortmentCluster.removeFromAll(key);
|
|
|
|
@ -319,19 +377,9 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private synchronized int flushFromMemToLimit() {
|
|
|
|
|
if ((hashScore.size() == 0) && (cache.size() == 0)) {
|
|
|
|
|
serverLog.logDebug("PLASMA INDEXING", "flushToLimit: called but cache is empty");
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
if ((hashScore.size() == 0) && (cache.size() != 0)) {
|
|
|
|
|
serverLog.logError("PLASMA INDEXING", "flushToLimit: hashScore.size=0 but cache.size=" + cache.size());
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
if ((hashScore.size() != 0) && (cache.size() == 0)) {
|
|
|
|
|
serverLog.logError("PLASMA INDEXING", "flushToLimit: hashScore.size=" + hashScore.size() + " but cache.size=0");
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
if ((hashScore.size() == 0) || (cache.size() == 0)) return 0;
|
|
|
|
|
|
|
|
|
|
flushThread.pause();
|
|
|
|
|
int count = 0;
|
|
|
|
|
//serverLog.logDebug("PLASMA INDEXING", "flushSpecific: hashScore.size=" + hashScore.size() + ", cache.size=" + cache.size());
|
|
|
|
|
synchronized (hashScore) {
|
|
|
|
@ -345,7 +393,7 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
|
|
|
|
|
while (i.hasNext()) {
|
|
|
|
|
// get the entry properties
|
|
|
|
|
key = (String) i.next();
|
|
|
|
|
createTime = (Long) hashDate.get(key);
|
|
|
|
|
createTime = new Long(longTime(hashDate.getScore(key)));
|
|
|
|
|
count = hashScore.getScore(key);
|
|
|
|
|
|
|
|
|
|
// put it into a specific ohl
|
|
|
|
@ -396,7 +444,10 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
|
|
|
|
|
|
|
|
|
|
// stop flushing if cache is shrinked enough
|
|
|
|
|
// avoid as possible to flush high-scores
|
|
|
|
|
if (cache.size() < this.maxWords - 100) return count;
|
|
|
|
|
if (cache.size() < this.maxWords - 100) {
|
|
|
|
|
flushThread.proceed();
|
|
|
|
|
return count;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// flush high-scores
|
|
|
|
|
for (int cluster = java.lang.Math.min(clusterCandidate.length, ramcacheLimit); cluster > assortmentLimit; cluster--) {
|
|
|
|
@ -411,43 +462,60 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
|
|
|
|
|
candidateCounter += cluster;
|
|
|
|
|
log.logDebug("flushed high-cluster below limit #" + cluster + ", key=" + key + ", count=" + count + ", cachesize=" + cache.size());
|
|
|
|
|
}
|
|
|
|
|
if (cache.size() < this.maxWords - 100) return count;
|
|
|
|
|
if (cache.size() < this.maxWords - 100) {
|
|
|
|
|
flushThread.proceed();
|
|
|
|
|
return count;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
flushThread.proceed();
|
|
|
|
|
return count;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public plasmaWordIndexEntity getIndex(String wordHash, boolean deleteIfEmpty) {
|
|
|
|
|
flushThread.pause();
|
|
|
|
|
flushFromMem(wordHash, false);
|
|
|
|
|
flushFromAssortmentCluster(wordHash);
|
|
|
|
|
flushThread.proceed();
|
|
|
|
|
return backend.getIndex(wordHash, deleteIfEmpty);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public long getCreationTime(String wordHash) {
|
|
|
|
|
Long time = (Long) hashDate.get(wordHash);
|
|
|
|
|
public long getUpdateTime(String wordHash) {
|
|
|
|
|
plasmaWordIndexEntryContainer entries = (plasmaWordIndexEntryContainer) cache.get(wordHash);
|
|
|
|
|
if (entries == null) return 0;
|
|
|
|
|
return entries.updated();
|
|
|
|
|
/*
|
|
|
|
|
Long time = new Long(longTime(hashDate.getScore(wordHash)));
|
|
|
|
|
if (time == null) return 0;
|
|
|
|
|
return time.longValue();
|
|
|
|
|
*/
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void deleteIndex(String wordHash) {
|
|
|
|
|
flushThread.pause();
|
|
|
|
|
synchronized (cache) {
|
|
|
|
|
cache.remove(wordHash);
|
|
|
|
|
hashScore.deleteScore(wordHash);
|
|
|
|
|
hashDate.remove(wordHash);
|
|
|
|
|
hashDate.deleteScore(wordHash);
|
|
|
|
|
}
|
|
|
|
|
assortmentCluster.removeFromAll(wordHash);
|
|
|
|
|
backend.deleteIndex(wordHash);
|
|
|
|
|
flushThread.proceed();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public synchronized int removeEntries(String wordHash, String[] urlHashes, boolean deleteComplete) {
|
|
|
|
|
flushThread.pause();
|
|
|
|
|
flushFromMem(wordHash, false);
|
|
|
|
|
flushFromAssortmentCluster(wordHash);
|
|
|
|
|
return backend.removeEntries(wordHash, urlHashes, deleteComplete);
|
|
|
|
|
int removed = backend.removeEntries(wordHash, urlHashes, deleteComplete);
|
|
|
|
|
flushThread.proceed();
|
|
|
|
|
return removed;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public synchronized int addEntries(plasmaWordIndexEntryContainer container, long updateTime) {
|
|
|
|
|
flushThread.pause();
|
|
|
|
|
//serverLog.logDebug("PLASMA INDEXING", "addEntryToIndexMem: cache.size=" + cache.size() + "; hashScore.size=" + hashScore.size());
|
|
|
|
|
if (cache.size() >= this.maxWords) flushFromMemToLimit();
|
|
|
|
|
//if (flushc > 0) serverLog.logDebug("PLASMA INDEXING", "addEntryToIndexMem - flushed " + flushc + " entries");
|
|
|
|
@ -462,10 +530,11 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
|
|
|
|
|
if (added > 0) {
|
|
|
|
|
cache.put(wordHash, entries);
|
|
|
|
|
hashScore.addScore(wordHash, added);
|
|
|
|
|
hashDate.put(wordHash, new Long(updateTime));
|
|
|
|
|
hashDate.setScore(wordHash, intTime(updateTime));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
//System.out.println("DEBUG: cache = " + cache.toString());
|
|
|
|
|
flushThread.proceed();
|
|
|
|
|
return added;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -475,11 +544,17 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
|
|
|
|
|
if (entries.add(new plasmaWordIndexEntry[]{newEntry}, updateTime) > 0) {
|
|
|
|
|
cache.put(wordHash, entries);
|
|
|
|
|
hashScore.incScore(wordHash);
|
|
|
|
|
hashDate.put(wordHash, new Long(updateTime));
|
|
|
|
|
hashDate.setScore(wordHash, intTime(updateTime));
|
|
|
|
|
}
|
|
|
|
|
flushThread.proceed();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void close(int waitingSeconds) {
|
|
|
|
|
// stop permanent flushing
|
|
|
|
|
flushThread.terminate();
|
|
|
|
|
try {flushThread.join(5000);} catch (InterruptedException e) {}
|
|
|
|
|
|
|
|
|
|
// close cluster
|
|
|
|
|
assortmentCluster.close();
|
|
|
|
|
try {
|
|
|
|
|
dump(waitingSeconds);
|
|
|
|
|