activated assortments, removed write-queues

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@151 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 20 years ago
parent d63fae5bd4
commit 4b01ff7548

@@ -81,13 +81,13 @@ Changes take effect immediately</td>
If this is a big number, it shows that the caching works efficiently.
</td>
</tr>
#{assortmentCluster}#
<tr valign="top" class="TableCellDark">
<td class=small>Singletons Cache Size:</td>
<td class=small>#[singletonsSize]#</td>
<td class=small>
The Singletons Cache is a database that holds words that occurred only once.
</td>
<td class=small>Assortment #[assortmentSlot]# Cache Size:</td>
<td class=small>#[assortmentSize]#</td>
<td class=small></td>
</tr>
#{/assortmentCluster}#
<tr valign="top" class="TableCellDark">
<td class=small>Maximum number of Word Caches:</td>
<td class=small><input name="wordCacheMax" type="text" size="20" maxlength="100" value="#[wordCacheMax]#"></td>

@@ -178,7 +178,13 @@ public class Performance_p {
prop.put("maxURLinWordCache", "" + switchboard.wordIndex.maxURLinWordCache());
prop.put("maxWaitingWordFlush", switchboard.getConfig("maxWaitingWordFlush", "180"));
prop.put("wordCacheMax", switchboard.getConfig("wordCacheMax", "10000"));
prop.put("singletonsSize", switchboard.wordIndex.assortmentSizes()[0]);
int[] asizes = switchboard.wordIndex.assortmentSizes();
for (int i = 0; i < asizes.length; i++) {
prop.put("assortmentCluster_" + i + "_assortmentSlot", i + 1);
prop.put("assortmentCluster_" + i + "_assortmentSize", asizes[i]);
}
prop.put("assortmentCluster", asizes.length);
// table thread pool settings
GenericObjectPool.Config crawlerPoolConfig = switchboard.cacheLoader.getPoolConfig();

@@ -61,7 +61,6 @@ public class kelondroMap {
private HashMap sortClusterMap; // a String-kelondroMScoreCluster - relation
private HashMap accMap; // to store accumulations of specific fields
private int elementCount;
private writeQueue writeWorker;
public kelondroMap(kelondroDyn dyn) {
this(dyn, null, null);
@@ -124,96 +123,9 @@ public class kelondroMap {
// fill acc map
if (accfields != null) for (int i = 0; i < accfields.length; i++) accMap.put(accfields[i], accumulator[i]);
// initialize a writeQueue and start it
writeWorker = new writeQueue();
writeWorker.start();
}
// Asynchronous write-behind worker: set() stacks dirty cache keys here and
// this thread later flushes them to disk via writeKra(); dequeue() cancels a
// pending flush when an entry is removed from the map.
// NOTE(review): the surrounding diff removes this class entirely (writes
// become synchronous in set()); comments describe the removed behavior.
class writeQueue extends Thread {
// FIFO of keys awaiting flush; guarded by synchronized(this.queue) in
// workoff()/dequeue(), but stack() and run() read/modify it without
// holding the lock -- NOTE(review): potential race, verify before reuse.
private LinkedList queue = new LinkedList();
// main-loop flag; cleared by terminate() or after a failed disk write
boolean run;
public writeQueue() {
super("kelondroMap:WriteQueue");
run = true;
}
// Enqueue a key for deferred flushing; if the worker thread is not (yet or
// no longer) alive, flush synchronously instead so the write is not lost.
public void stack(String key) {
//System.out.println("kelondroMap: stack(" + dyn.entryFile.name() + ") " + key);
if (this.isAlive())
queue.addLast(key);
else
workoff(key);
}
// Pop one key from the queue (if any) and flush it to disk.
public void workoff() {
String newKey = null;
synchronized (this.queue) {
if (this.queue.size() > 0) {
newKey = (String) this.queue.removeFirst();
}
}
if (newKey != null) workoff(newKey);
}
// Remove a pending key so it is not flushed later (used when the map
// entry itself is being deleted); removes at most the first match.
public void dequeue(String key) {
// take out one entry
synchronized (this.queue) {
ListIterator i = queue.listIterator();
String k;
while (i.hasNext()) {
k = (String) i.next();
if (k.equals(key)) {
i.remove();
return;
}
}
}
}
// Flush the cached Map for the given key via writeKra(); a missing cache
// entry is a no-op. An IOException disables the worker loop (run = false).
public void workoff(String key) {
//System.out.println("kelondroMap: workoff(" + dyn.entryFile.name() + ") " + key);
Map map = (Map) cache.get(key);
if (map == null) return;
try {
writeKra(key, map, "");
} catch (IOException e) {
System.out.println("PANIC! Critical Error in kelondroMap.writeQueue.workoff(" + dyn.entryFile.name() + "): " + e.getMessage());
e.printStackTrace();
run = false;
}
}
// Worker loop: random startup offset (up to 90s), then ~10s idle between
// batches; drains the queue pacing each flush by 5000/queue.size() ms,
// and flushes any remaining keys once the loop is told to stop.
public void run() {
try {sleep(((System.currentTimeMillis() / 3) % 10) * 10000);} catch (InterruptedException e) {} // offset start
//System.out.println("XXXX! " + (System.currentTimeMillis() / 1000) + " " + dyn.entryFile.name());
int c;
while (run) {
c = 0; while ((run) && (c++ < 10)) try {sleep(1000);} catch (InterruptedException e) {}
//System.out.println("PING! " + (System.currentTimeMillis() / 1000) + " " + dyn.entryFile.name());
while (queue.size() > 0) {
if (run) try {sleep(5000 / queue.size());} catch (InterruptedException e) {}
workoff();
}
}
while (queue.size() > 0) workoff();
}
// Signal the loop to stop; optionally busy-wait (500ms polls) until the
// thread has actually finished draining and died.
public void terminate(boolean waitFor) {
run = false;
if (waitFor) while (this.isAlive()) try {sleep(500);} catch (InterruptedException e) {}
}
}
/*
public synchronized boolean has(String key) throws IOException {
return (cache.containsKey(key)) || (dyn.existsDyn(key));
}
*/
public synchronized void set(String key, Map newMap) throws IOException {
// update elementCount
if ((sortfields != null) || (accfields != null)) {
@@ -227,8 +139,8 @@ public class kelondroMap {
}
}
// stack to write queue
writeWorker.stack(key);
// write entry
writeKra(key, newMap, "");
// check for space in cache
checkCacheSpace();
@@ -237,7 +149,6 @@ public class kelondroMap {
cacheScore.setScore(key, (int) ((System.currentTimeMillis() - startup) / 1000));
cache.put(key, newMap);
// update sortCluster
if (sortClusterMap != null) updateSortCluster(key, newMap);
@@ -298,9 +209,6 @@ public class kelondroMap {
}
}
// remove from queue
writeWorker.dequeue(key);
// remove from cache
cacheScore.deleteScore(key);
cache.remove(key);
@@ -345,6 +253,7 @@ public class kelondroMap {
return map;
}
private synchronized void checkCacheSpace() {
// check for space in cache
if (cache.size() >= cachesize) {
@@ -396,7 +305,7 @@ public class kelondroMap {
public void close() throws IOException {
// finish queue
writeWorker.terminate(true);
//writeWorker.terminate(true);
// close cluster
if (sortClusterMap != null) {

@@ -55,7 +55,8 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
private static final String indexDumpFileName = "indexDump0.stack";
private static final String oldSingletonFileName = "indexSingletons0.db";
private static final String newSingletonFileName = "indexAssortment001.db";
private static final int assortmentLimit = 1;
private static final String indexAssortmentClusterPath = "ACLUSTER";
private static final int assortmentLimit = 3;
// class variables
@@ -79,11 +80,19 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
}
public plasmaWordIndexCache(File databaseRoot, plasmaWordIndexInterface backend, int singletonbufferkb, serverLog log) {
// migrate
// migrate#1
File oldSingletonFile = new File(databaseRoot, oldSingletonFileName);
File newSingletonFile = new File(databaseRoot, newSingletonFileName);
if ((oldSingletonFile.exists()) && (!(newSingletonFile.exists()))) oldSingletonFile.renameTo(newSingletonFile);
// create new assortment cluster path
File assortmentClusterPath = new File(databaseRoot, indexAssortmentClusterPath);
if (!(assortmentClusterPath.exists())) assortmentClusterPath.mkdirs();
// migrate#2
File acSingletonFile = new File(assortmentClusterPath, newSingletonFileName);
if ((newSingletonFile.exists()) && (!(acSingletonFile.exists()))) newSingletonFile.renameTo(acSingletonFile);
// creates a new index cache
// the cache has a back-end where indexes that do not fit in the cache are flushed
this.databaseRoot = databaseRoot;
@@ -94,7 +103,7 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
this.maxWords = 10000;
this.backend = backend;
this.log = log;
this.assortmentCluster = new plasmaWordIndexAssortmentCluster(databaseRoot, assortmentLimit, singletonBufferSize, log);
this.assortmentCluster = new plasmaWordIndexAssortmentCluster(assortmentClusterPath, assortmentLimit, singletonBufferSize, log);
// read in dump of last session
try {
@@ -261,7 +270,7 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
// now decide where to flush that container
plasmaWordIndexEntryContainer flushedFromAssortment = assortmentCluster.removeFromAll(key);
if (flushedFromAssortment == null) {
if ((flushedFromAssortment == null) || (flushedFromAssortment.size() == 0)) {
// not found in assortments
if (container.size() <= assortmentLimit) {
// this fits into the assortments
@@ -288,15 +297,15 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
hashScore.setScore(key, container.size());
hashDate.put(key, new Long(time));
}
return -1;
return -flushedFromAssortment.size();
} else {
// add this to the backend
return backend.addEntries(container, java.lang.Math.max(time, flushedFromAssortment.updated()));
return backend.addEntries(container, java.lang.Math.max(time, flushedFromAssortment.updated())) - flushedFromAssortment.size();
}
}
}
private boolean flushFromSingleton(String key) {
private boolean flushFromAssortmentCluster(String key) {
// this should only be called if the singleton shall be deleted or returned in an index entity
plasmaWordIndexEntryContainer container = assortmentCluster.removeFromAll(key);
if (container == null) {
@@ -330,8 +339,8 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
// generate flush list
Iterator i = hashScore.scores(true);
TreeMap[] al = new TreeMap[hashScore.getMaxScore() + 1];
for (int k = 0; k < al.length; k++) al[k] = new TreeMap(); // by create time ordered hash-list
TreeMap[] clusterCandidate = new TreeMap[hashScore.getMaxScore()];
for (int k = 0; k < clusterCandidate.length; k++) clusterCandidate[k] = new TreeMap(); // by create time ordered hash-list
while (i.hasNext()) {
// get the entry properties
key = (String) i.next();
@@ -339,36 +348,48 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
count = hashScore.getScore(key);
// put it into a specific ohl
al[count].put(createTime, key);
clusterCandidate[count - 1].put(createTime, key);
//System.out.println("COUNT FOR KEY " + key + ": " + count);
}
// print statistics
for (int k = 1; k < al.length; k++) log.logDebug("FLUSH-LIST " + k + ": " + al[k].size() + " entries");
for (int k = 0; k < clusterCandidate.length; k++)
log.logDebug("FLUSH-LIST " + (k + 1) + ": " + clusterCandidate[k].size() + " entries");
// flush singletons
i = al[1].entrySet().iterator();
Map.Entry entry;
while (i.hasNext()) {
entry = (Map.Entry) i.next();
key = (String) entry.getValue();
createTime = (Long) entry.getKey();
if ((createTime != null) && ((System.currentTimeMillis() - createTime.longValue()) > 90000)) {
//log.logDebug("flushing singleton-key " + key + ", count=" + count + ", cachesize=" + cache.size() + ", singleton-size=" + singletons.size());
count += flushFromMem((String) key, true);
}
}
Map.Entry entry;
int candidateCounter;
// flush from assortment cluster
for (int cluster = 0; cluster < assortmentLimit; cluster++) {
candidateCounter = 0;
// select a specific cluster
i = clusterCandidate[cluster].entrySet().iterator();
// check each element in this flush-list: too old?
while (i.hasNext()) {
entry = (Map.Entry) i.next();
key = (String) entry.getValue();
createTime = (Long) entry.getKey();
if ((createTime != null) && ((System.currentTimeMillis() - createTime.longValue()) > 90000)) {
//log.logDebug("flushing singleton-key " + key + ", count=" + count + ", cachesize=" + cache.size() + ", singleton-size=" + singletons.size());
count += java.lang.Math.abs(flushFromMem(key, true));
candidateCounter += cluster + 1;
}
}
if (candidateCounter > 0) log.logDebug("flushed low-cluster #" + (cluster + 1) + ", count=" + count + ", candidateCounter=" + candidateCounter + ", cachesize=" + cache.size());
if (count > 2000) return count;
}
// flush high-scores
for (int k = al.length - 1; k >= 2; k--) {
i = al[k].entrySet().iterator();
for (int cluster = clusterCandidate.length; cluster > 0; cluster--) {
candidateCounter = 0;
i = clusterCandidate[cluster - 1].entrySet().iterator();
while (i.hasNext()) {
entry = (Map.Entry) i.next();
key = (String) entry.getValue();
createTime = (Long) entry.getKey();
if ((createTime != null) && ((System.currentTimeMillis() - createTime.longValue()) > (600000/k))) {
//log.logDebug("flushing high-key " + key + ", count=" + count + ", cachesize=" + cache.size() + ", singleton-size=" + singletons.size());
count += flushFromMem(key, false);
if ((createTime != null) && ((System.currentTimeMillis() - createTime.longValue()) > (600000/cluster))) {
count += java.lang.Math.abs(flushFromMem(key, false));
candidateCounter += cluster + 1;
log.logDebug("flushed high-cluster #" + (cluster + 1) + ", key=" + key + ", count=" + count + ", cachesize=" + cache.size());
}
if (count > 2000) return count;
}
@@ -380,7 +401,7 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
public plasmaWordIndexEntity getIndex(String wordHash, boolean deleteIfEmpty) {
flushFromMem(wordHash, false);
flushFromSingleton(wordHash);
flushFromAssortmentCluster(wordHash);
return backend.getIndex(wordHash, deleteIfEmpty);
}
@@ -402,7 +423,7 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
public synchronized int removeEntries(String wordHash, String[] urlHashes, boolean deleteComplete) {
flushFromMem(wordHash, false);
flushFromSingleton(wordHash);
flushFromAssortmentCluster(wordHash);
return backend.removeEntries(wordHash, urlHashes, deleteComplete);
}

Loading…
Cancel
Save