re-organization of index management

this was done to prepare for new storage algorithms


git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@1635 6c8d7289-2bf4-0310-a012-ef5d649a1542
orbiter 19 years ago
parent a8a5ad959a
commit 7eb10675b3

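For orientation before the per-file hunks: the commit dissolves the storage logic that plasmaWordIndexCache used to own into three tiers coordinated by plasmaWordIndex itself: the RAM cache, the assortment cluster, and the classic word-file backend. A minimal, self-contained sketch of that layering; all names here are illustrative stand-ins, not YaCy's actual API:

import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical model of the tiering this commit introduces.
class TieredWordIndex {
    private final TreeMap<String, List<String>> ramCache = new TreeMap<>();    // tier 1: RAM cache
    private final TreeMap<String, List<String>> assortments = new TreeMap<>(); // tier 2: assortment cluster
    private final TreeMap<String, List<String>> backend = new TreeMap<>();     // tier 3: classic word files
    private final int maxWords;      // cf. maxWordsLow/maxWordsHigh
    private final int assortmentCap; // cf. assortmentCount = 64

    TieredWordIndex(int maxWords, int assortmentCap) {
        this.maxWords = maxWords;
        this.assortmentCap = assortmentCap;
    }

    // addEntries() fills the RAM cache; the flush loop now lives in the
    // coordinator, not in the cache (cf. the moved force-flush block below).
    void addEntries(String wordHash, List<String> urls) {
        ramCache.computeIfAbsent(wordHash, k -> new ArrayList<>()).addAll(urls);
        while (ramCache.size() > maxWords) flushCacheToBackend(ramCache.firstKey());
    }

    // cf. flushCacheToBackend(): try the assortment cluster first; what does
    // not fit there spills over to the classic backend.
    private void flushCacheToBackend(String wordHash) {
        List<String> c = ramCache.remove(wordHash);
        if (c == null) return;
        if (c.size() <= assortmentCap) assortments.merge(wordHash, c, (a, b) -> { a.addAll(b); return a; });
        else backend.merge(wordHash, c, (a, b) -> { a.addAll(b); return a; });
    }
}

The point of the split is that flushing policy now lives in the coordinator, so new storage algorithms can be slotted in as further tiers without touching the cache.
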
@@ -288,7 +288,7 @@ public class IndexControl_p {
// generate list
if (post.containsKey("keyhashsimilar")) {
-final Iterator hashIt = switchboard.wordIndex.wordHashes(keyhash, plasmaWordIndex.RL_WORDFILES, true, true);
+final Iterator hashIt = switchboard.wordIndex.wordHashes(keyhash, plasmaWordIndex.RL_WORDFILES, true);
StringBuffer result = new StringBuffer("Sequential List of Word-Hashes:<br>");
String hash;
int i = 0;

@@ -218,10 +218,12 @@ public class PerformanceMemory_p {
prop.put("Xms", Xms.substring(0, Xms.length() - 1));
// create statistics about write cache object space
-int chunksizes = Math.max(
+int chunksizes = ((kelondroObjectSpace.statAlive().size() > 0) &&
+(kelondroObjectSpace.statHeap().size() > 0)) ?
+Math.max(
((Integer) kelondroObjectSpace.statAlive().lastKey()).intValue(),
((Integer) kelondroObjectSpace.statHeap().lastKey()).intValue()
-);
+) : 0;
int[] statAlive = new int[chunksizes];
int[] statHeap = new int[chunksizes];
for (int i = 0; i < chunksizes; i++) { statAlive[i] = 0; statHeap[i] = 0; }

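The guard added here matters because java.util.TreeMap.lastKey() throws NoSuchElementException on an empty map, so the old unguarded Math.max() failed whenever no object-space statistics had been collected yet. A small self-contained demonstration of the fixed pattern:

import java.util.TreeMap;

public class LastKeyGuard {
    public static void main(String[] args) {
        TreeMap<Integer, Integer> statAlive = new TreeMap<>();
        TreeMap<Integer, Integer> statHeap = new TreeMap<>();
        // Unguarded, statAlive.lastKey() would throw NoSuchElementException here.
        int chunksizes = (statAlive.size() > 0 && statHeap.size() > 0)
                ? Math.max(statAlive.lastKey(), statHeap.lastKey())
                : 0; // same fallback the patch introduces
        System.out.println(chunksizes); // prints 0 for empty statistics
    }
}
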
@@ -259,7 +259,7 @@ public class PerformanceQueues_p {
prop.put("onlineCautionDelay", switchboard.getConfig("onlineCautionDelay", "30000"));
prop.put("onlineCautionDelayCurrent", System.currentTimeMillis() - switchboard.proxyLastAccess);
-int[] asizes = switchboard.wordIndex.assortmentSizes();
+int[] asizes = switchboard.wordIndex.assortmentsSizes();
for (int i = 0; i < asizes.length; i += 8) {
prop.put("assortmentCluster_" + (i/8) + "_assortmentSlots", (i + 1) + "-" + (i + 8));
prop.put("assortmentCluster_" + (i/8) + "_assortmentSizeA", asizes[i]);

@@ -126,7 +126,7 @@ public final class transferRWI {
wordHash = estring.substring(0, p);
wordhashes[received] = wordHash;
entry = new plasmaWordIndexEntry(estring.substring(p));
-sb.wordIndex.addEntries(plasmaWordIndexEntryContainer.instantContainer(wordHash, System.currentTimeMillis(), entry), true);
+sb.wordIndex.addEntries(plasmaWordIndexEntryContainer.instantContainer(wordHash, System.currentTimeMillis(), entry), System.currentTimeMillis(), true);
serverCore.checkInterruption();
urlHash = entry.getUrlHash();

@@ -111,7 +111,7 @@ public class plasmaDbImporter extends AbstractImporter implements dbImporter {
// iterate over all words from import db
-Iterator importWordHashIterator = this.importWordIndex.wordHashes(this.wordChunkStartHash, plasmaWordIndex.RL_WORDFILES, true, false);
+Iterator importWordHashIterator = this.importWordIndex.wordHashes(this.wordChunkStartHash, plasmaWordIndex.RL_WORDFILES, false);
while (!isAborted() && importWordHashIterator.hasNext()) {
plasmaWordIndexEntryContainer newContainer = null;
@@ -154,7 +154,7 @@ public class plasmaDbImporter extends AbstractImporter implements dbImporter {
if (isAborted()) break;
// importing entity container to home db
-this.homeWordIndex.addEntries(newContainer, false);
+this.homeWordIndex.addEntries(newContainer, System.currentTimeMillis(), false);
// delete complete index entity file
this.importWordIndex.deleteIndex(this.wordHash);

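The importer hunks above and below follow one pattern under the new API: walk the word hashes of the source index and merge each container into the home index, passing the merge timestamp explicitly. A rough, self-contained sketch of that loop; the WordIndex interface here is a simplified stand-in, not YaCy's:

import java.util.Iterator;
import java.util.List;

// Simplified stand-in for the importer loops; not YaCy's actual interfaces.
class ImportLoopSketch {
    interface WordIndex {
        Iterator<String> wordHashes(String startHash, boolean rot);       // no up-flag anymore
        List<String> getContainer(String wordHash, boolean deleteIfEmpty);
        int addEntries(List<String> container, long updateTime, boolean highPriority);
        void deleteIndex(String wordHash);
    }

    static void importAll(WordIndex importIndex, WordIndex homeIndex, String startHash) {
        Iterator<String> hashes = importIndex.wordHashes(startHash, false);
        while (hashes.hasNext()) {
            String wordHash = hashes.next();
            List<String> container = importIndex.getContainer(wordHash, true);
            if (container != null && !container.isEmpty()) {
                // the merge timestamp is now an explicit addEntries() parameter
                homeIndex.addEntries(container, System.currentTimeMillis(), false);
            }
            importIndex.deleteIndex(wordHash); // delete the imported entity
        }
    }
}
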
@@ -110,7 +110,7 @@ public class plasmaWordIndexAssortmentImporter extends AbstractImporter implemen
this.wordEntryCount += container.size();
// importing entity container to home db
-this.sb.wordIndex.addEntries(container, false);
+this.sb.wordIndex.addEntries(container, System.currentTimeMillis(), false);
if (this.wordEntityCount % 500 == 0) {
this.log.logFine(this.wordEntityCount + " word entities processed so far.");

@@ -303,7 +303,7 @@ public final class plasmaSearchEvent extends Thread implements Runnable {
while (hashi.hasNext()) {
wordHash = (String) hashi.next();
rcGlobal.setWordHash(wordHash);
-wordIndex.addEntries(rcGlobal, true);
+wordIndex.addEntries(rcGlobal, System.currentTimeMillis(), true);
log.logFine("FLUSHED " + wordHash + ": " + rcGlobal.size() + " url entries");
}
// the rcGlobal was flushed, empty it

@@ -58,17 +58,32 @@ import java.net.URL;
import de.anomic.htmlFilter.htmlFilterContentScraper;
import de.anomic.kelondro.kelondroBase64Order;
import de.anomic.kelondro.kelondroException;
+import de.anomic.kelondro.kelondroMergeIterator;
+import de.anomic.kelondro.kelondroNaturalOrder;
import de.anomic.server.logging.serverLog;
public final class plasmaWordIndex {
+private static final String indexAssortmentClusterPath = "ACLUSTER";
+private static final int assortmentCount = 64;
private final File databaseRoot;
private final plasmaWordIndexCache ramCache;
+private final plasmaWordIndexAssortmentCluster assortmentCluster;
+private int assortmentBufferSize; //kb
+private final plasmaWordIndexClassicDB backend;
public plasmaWordIndex(File databaseRoot, int bufferkb, serverLog log) {
this.databaseRoot = databaseRoot;
-plasmaWordIndexClassicDB fileDB = new plasmaWordIndexClassicDB(databaseRoot, log);
-this.ramCache = new plasmaWordIndexCache(databaseRoot, fileDB, bufferkb, log);
+this.backend = new plasmaWordIndexClassicDB(databaseRoot, log);
+this.ramCache = new plasmaWordIndexCache(databaseRoot, log);
+// create new assortment cluster path
+File assortmentClusterPath = new File(databaseRoot, indexAssortmentClusterPath);
+if (!(assortmentClusterPath.exists())) assortmentClusterPath.mkdirs();
+this.assortmentBufferSize = bufferkb;
+this.assortmentCluster = new plasmaWordIndexAssortmentCluster(assortmentClusterPath, assortmentCount, assortmentBufferSize, log);
}
public File getRoot() {
@@ -83,26 +98,59 @@ public final class plasmaWordIndex {
return ramCache.wordCacheRAMSize();
}
-public int[] assortmentSizes() {
-return ramCache.assortmentsSizes();
+public int[] assortmentsSizes() {
+return assortmentCluster.sizes();
}
public int[] assortmentsCacheChunkSizeAvg() {
-return ramCache.assortmentsCacheChunkSizeAvg();
+return assortmentCluster.cacheChunkSizeAvg();
}
public int[] assortmentsCacheFillStatusCml() {
-return ramCache.assortmentsCacheFillStatusCml();
+return assortmentCluster.cacheFillStatusCml();
}
public void setMaxWords(int maxWordsLow, int maxWordsHigh) {
ramCache.setMaxWords(maxWordsLow, maxWordsHigh);
}
-public int addEntries(plasmaWordIndexEntryContainer entries, boolean highPriority) {
-return ramCache.addEntries(entries, System.currentTimeMillis(), highPriority);
+public int addEntries(plasmaWordIndexEntryContainer entries, long updateTime, boolean highPriority) {
+int added = ramCache.addEntries(entries, updateTime, highPriority);
+// force flush
+if (highPriority) {
+if (ramCache.size() > ramCache.getMaxWordsHigh()) {
+while (ramCache.size() + 500 > ramCache.getMaxWordsHigh()) {
+try { Thread.sleep(10); } catch (InterruptedException e) { }
+flushCacheToBackend(ramCache.bestFlushWordHash());
+}}
+} else {
+if (ramCache.size() > ramCache.getMaxWordsLow()) {
+while (ramCache.size() + 500 > ramCache.getMaxWordsLow()) {
+try { Thread.sleep(10); } catch (InterruptedException e) { }
+flushCacheToBackend(ramCache.bestFlushWordHash());
+}}
+}
+return added;
}
+private void flushCacheToBackend(String wordHash) {
+plasmaWordIndexEntryContainer c = ramCache.deleteContainer(wordHash);
+plasmaWordIndexEntryContainer feedback = assortmentCluster.storeTry(wordHash, c);
+if (feedback != null) {
+backend.addEntries(feedback, System.currentTimeMillis(), true);
+}
+}
+public int addEntriesBackend(plasmaWordIndexEntryContainer entries) {
+plasmaWordIndexEntryContainer feedback = assortmentCluster.storeTry(entries.wordHash(), entries);
+if (feedback == null) {
+return entries.size();
+} else {
+return backend.addEntries(feedback, -1, true);
+}
+}
private static final int hour = 3600000;
private static final int day = 86400000;
@@ -171,25 +219,55 @@ public final class plasmaWordIndex {
language,
doctype,
true);
-addEntries(plasmaWordIndexEntryContainer.instantContainer(wordHash, System.currentTimeMillis(), ientry), false);
+addEntries(plasmaWordIndexEntryContainer.instantContainer(wordHash, System.currentTimeMillis(), ientry), System.currentTimeMillis(), false);
}
// System.out.println("DEBUG: plasmaSearch.addPageIndex: added " +
// condenser.getWords().size() + " words, flushed " + c + " entries");
return condenser.RESULT_SIMI_WORDS;
}
-public int indexSize(String wordHash) {
-return ramCache.indexSize(wordHash);
}
public plasmaWordIndexEntryContainer getContainer(String wordHash, boolean deleteIfEmpty, long maxTime) {
-return ramCache.getContainer(wordHash, deleteIfEmpty, maxTime);
+long start = System.currentTimeMillis();
+plasmaWordIndexEntryContainer container = new plasmaWordIndexEntryContainer(wordHash);
+// get from cache
+// We must not use the container from cache to store everything we find,
+// as that container remains linked to in the cache and might be changed later
+// while the returned container is still in use.
+// e.g. indexTransfer might keep this container for minutes while
+// several new pages could be added to the index, possibly with the same words that have
+// been selected for transfer
+container.add(ramCache.getContainer(wordHash, true));
+// get from assortments
+container.add(assortmentCluster.getFromAll(wordHash, (maxTime < 0) ? -1 : maxTime / 2));
+// get from backend
+if (maxTime > 0) {
+maxTime = maxTime - (System.currentTimeMillis() - start);
+if (maxTime < 0)
+maxTime = 100;
+}
+container.add(backend.getContainer(wordHash, deleteIfEmpty, (maxTime < 0) ? -1 : maxTime));
+return container;
}
public plasmaWordIndexEntity getEntity(String wordHash, boolean deleteIfEmpty, long maxTime) {
-return ramCache.getEntity(wordHash, deleteIfEmpty, maxTime);
+// this possibly creates an index file in the back-end
+// the index file is opened and returned as entity object
+long start = System.currentTimeMillis();
+flushCacheToBackend(wordHash);
+if (maxTime < 0) {
+flushFromAssortmentCluster(wordHash, -1);
+} else {
+long remaining = maxTime - (System.currentTimeMillis() - start);
+if (remaining > 0)
+flushFromAssortmentCluster(wordHash, remaining);
+}
+long r = maxTime - (System.currentTimeMillis() - start);
+return backend.getEntity(wordHash, deleteIfEmpty, (r < 0) ? 0 : r);
}
public Set getContainers(Set wordHashes, boolean deleteIfEmpty, boolean interruptIfEmpty, long maxTime) {
// retrieve entities that belong to the hashes
@@ -218,42 +296,23 @@ public final class plasmaWordIndex {
return containers;
}
/*
public Set getEntities(Set wordHashes, boolean deleteIfEmpty, boolean interruptIfEmpty, long maxTime) {
// retrieve entities that belong to the hashes
HashSet entities = new HashSet();
String singleHash;
plasmaWordIndexEntity singleEntity;
Iterator i = wordHashes.iterator();
long start = System.currentTimeMillis();
long remaining;
while (i.hasNext()) {
// check time
remaining = maxTime - (System.currentTimeMillis() - start);
//if ((maxTime > 0) && (remaining <= 0)) break;
// get next hash:
singleHash = (String) i.next();
// retrieve index
singleEntity = getEntity(singleHash, deleteIfEmpty, (maxTime < 0) ? -1 : remaining / (wordHashes.size() - entities.size()));
// check result
if (((singleEntity == null) || (singleEntity.size() == 0)) && (interruptIfEmpty)) return new HashSet();
entities.add(singleEntity);
}
return entities;
}
*/
public int size() {
-return ramCache.size();
+return java.lang.Math.max(assortmentCluster.sizeTotal(),
+java.lang.Math.max(backend.size(), ramCache.size()));
}
-public int removeEntries(String wordHash, String[] urlHashes, boolean deleteComplete) {
-return ramCache.removeEntries(wordHash, urlHashes, deleteComplete);
+public int indexSize(String wordHash) {
+int size = 0;
+try {
+plasmaWordIndexEntity entity = backend.getEntity(wordHash, true, -1);
+if (entity != null) {
+size += entity.size();
+entity.close();
+}
+} catch (IOException e) {}
+size += assortmentCluster.indexSize(wordHash);
+size += ramCache.indexSize(wordHash);
+return size;
+}
public void intermission(long pause) {
@@ -262,28 +321,85 @@ public final class plasmaWordIndex {
public void close(int waitingBoundSeconds) {
ramCache.close(waitingBoundSeconds);
+assortmentCluster.close();
+backend.close(10);
}
public void deleteIndex(String wordHash) {
-ramCache.deleteIndex(wordHash);
+ramCache.deleteContainer(wordHash);
+assortmentCluster.removeFromAll(wordHash, -1);
+backend.deleteIndex(wordHash);
}
+public synchronized int removeEntries(String wordHash, String[] urlHashes, boolean deleteComplete) {
+int removed = 0;
+removed += ramCache.removeEntries(wordHash, urlHashes, deleteComplete);
+plasmaWordIndexEntryContainer container = assortmentCluster.removeFromAll(wordHash, -1);
+if (container != null) this.addEntries(container, System.currentTimeMillis(), false);
+removed = backend.removeEntries(wordHash, urlHashes, deleteComplete);
+return removed;
+}
+private boolean flushFromAssortmentCluster(String key, long maxTime) {
+// this should only be called if the assortment shall be deleted or returned in an index entity
+if (maxTime > 0) maxTime = 8 * maxTime / 10; // reserve time for later adding to backend
+plasmaWordIndexEntryContainer container = assortmentCluster.removeFromAll(key, maxTime);
+if (container == null) {
+return false;
+} else {
+// we have a non-empty entry-container
+// integrate it to the backend
+return backend.addEntries(container, container.updated(), true) > 0;
+}
+}
public static final int RL_RAMCACHE = 0;
public static final int RL_FILECACHE = 1;
public static final int RL_ASSORTMENTS = 2;
public static final int RL_WORDFILES = 3;
-public Iterator wordHashes(String startHash, int resourceLevel, boolean up, boolean rot) {
-if (rot) return new rotatingWordIterator(startHash, resourceLevel, up);
-else return new correctedWordIterator(startHash, resourceLevel, up, rot); // use correction until bug is found
+public Iterator wordHashes(String startHash, int resourceLevel, boolean rot) {
+if (rot) return new rotatingWordIterator(startHash, resourceLevel);
+else return new correctedWordIterator(startHash, resourceLevel, rot); // use correction until bug is found
}
+private Iterator wordHashesX(String startWordHash, int resourceLevel, boolean rot) {
+if (resourceLevel == plasmaWordIndex.RL_RAMCACHE) {
+return ramCache.wordHashes(startWordHash, rot);
+}
+/*
+if (resourceLevel == plasmaWordIndex.RL_FILECACHE) {
+}
+*/
+if (resourceLevel == plasmaWordIndex.RL_ASSORTMENTS) {
+return new kelondroMergeIterator(
+ramCache.wordHashes(startWordHash, rot),
+assortmentCluster.hashConjunction(startWordHash, true, rot),
+kelondroNaturalOrder.naturalOrder,
+true);
+}
+if (resourceLevel == plasmaWordIndex.RL_WORDFILES) {
+return new kelondroMergeIterator(
+new kelondroMergeIterator(
+ramCache.wordHashes(startWordHash, rot),
+assortmentCluster.hashConjunction(startWordHash, true, rot),
+kelondroNaturalOrder.naturalOrder,
+true),
+backend.wordHashes(startWordHash, true, false),
+kelondroNaturalOrder.naturalOrder,
+true);
+}
+return null;
+}
private final class correctedWordIterator implements Iterator {
Iterator iter;
String nextWord;
-public correctedWordIterator(String firstWord, int resourceLevel, boolean up, boolean rotating) {
-iter = ramCache.wordHashes(firstWord, resourceLevel, up, rotating);
+public correctedWordIterator(String firstWord, int resourceLevel, boolean rotating) {
+iter = wordHashesX(firstWord, resourceLevel, rotating);
try {
nextWord = (iter.hasNext()) ? (String) iter.next() : null;
boolean corrected = true;
@@ -291,18 +407,13 @@ public final class plasmaWordIndex {
while ((nextWord != null) && (corrected) && (cc < 50)) {
int c = firstWord.compareTo(nextWord);
corrected = false;
-if ((c > 0) && (up)) {
+if (c > 0) {
// firstKey > nextNode.getKey()
//System.out.println("CORRECTING WORD ITERATOR: firstWord=" + firstWord + ", nextWord=" + nextWord);
nextWord = (iter.hasNext()) ? (String) iter.next() : null;
corrected = true;
cc++;
}
-if ((c < 0) && (!(up))) {
-nextWord = (iter.hasNext()) ? (String) iter.next() : null;
-corrected = true;
-cc++;
-}
}
} catch (java.util.ConcurrentModificationException e) {
nextWord = null;
@@ -336,12 +447,10 @@ public final class plasmaWordIndex {
private class rotatingWordIterator implements Iterator {
Iterator i;
int resourceLevel;
-boolean up;
-public rotatingWordIterator(String startWordHash, int resourceLevel, boolean up) {
-this.up = up;
+public rotatingWordIterator(String startWordHash, int resourceLevel) {
this.resourceLevel = resourceLevel;
-i = new correctedWordIterator(startWordHash, resourceLevel, up, false);
+i = new correctedWordIterator(startWordHash, resourceLevel, false);
}
public void finalize() {
@@ -351,7 +460,7 @@ public final class plasmaWordIndex {
public boolean hasNext() {
if (i.hasNext()) return true;
else {
-i = new correctedWordIterator((up)?"------------":"zzzzzzzzzzzz", resourceLevel, up, false);
+i = new correctedWordIterator("------------", resourceLevel, false);
return i.hasNext();
}
}
@@ -365,9 +474,6 @@ public final class plasmaWordIndex {
}
} // class rotatingWordIterator
-public Object migrateWords2Assortment(String wordHash) throws IOException {
-return ramCache.migrateWords2Assortment(wordHash);
-}
/*
public Iterator fileIterator(String startHash, boolean up, boolean deleteEmpty) {
return new iterateFiles(startHash, up, deleteEmpty);
@@ -481,12 +587,65 @@ public final class plasmaWordIndex {
}
}
*/
+public Object migrateWords2Assortment(String wordhash) throws IOException {
+// returns the number of entries that had been added to the assortments
+// can be negative if some assortments have been moved to the backend
+File db = plasmaWordIndexEntity.wordHash2path(databaseRoot, wordhash);
+if (!(db.exists())) return "not available";
+plasmaWordIndexEntity entity = null;
+try {
+entity = new plasmaWordIndexEntity(databaseRoot, wordhash, true);
+int size = entity.size();
+if (size > assortmentCluster.clusterCapacity) {
+// this will be too big to integrate it
+entity.close(); entity = null;
+return "too big";
+} else {
+// take out all words from the assortment to see if it fits
+// together with the extracted assortment
+plasmaWordIndexEntryContainer container = assortmentCluster.removeFromAll(wordhash, -1);
+if (size + container.size() > assortmentCluster.clusterCapacity) {
+// this will also be too big to integrate, add to entity
+entity.addEntries(container);
+entity.close(); entity = null;
+return new Integer(-container.size());
+} else {
+// the combined container will fit, read the container
+try {
+Iterator entries = entity.elements(true);
+plasmaWordIndexEntry entry;
+while (entries.hasNext()) {
+entry = (plasmaWordIndexEntry) entries.next();
+// System.out.println("ENTRY = " + entry.getUrlHash());
+container.add(new plasmaWordIndexEntry[]{entry}, System.currentTimeMillis());
+}
+// we have read all elements, now delete the entity
+entity.deleteComplete();
+entity.close(); entity = null;
+// integrate the container into the assortments; this will work
+assortmentCluster.storeTry(wordhash, container);
+return new Integer(size);
+} catch (kelondroException e) {
+// database corrupted, we simply give up the database and delete it
+try {entity.close();} catch (Exception ee) {} entity = null;
+try {db.delete();} catch (Exception ee) {}
+return "database corrupted; deleted";
+}
+}
+}
+} finally {
+if (entity != null) try {entity.close();}catch(Exception e){}
+}
+}
public static void main(String[] args) {
// System.out.println(kelondroMSetTools.fastStringComparator(true).compare("RwGeoUdyDQ0Y", "rwGeoUdyDQ0Y"));
// System.out.println(new Date(reverseMicroDateDays(microDateDays(System.currentTimeMillis()))));
plasmaWordIndex index = new plasmaWordIndex(new File("D:\\dev\\proxy\\DATA\\PLASMADB"), 555, new serverLog("TESTAPP"));
-Iterator iter = index.wordHashes("5A8yhZMh_Kmv", plasmaWordIndex.RL_WORDFILES, true, true);
+Iterator iter = index.wordHashes("5A8yhZMh_Kmv", plasmaWordIndex.RL_WORDFILES, true);
while (iter.hasNext()) {
System.out.println("File: " + (String) iter.next());
}

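The new getContainer() above merges hits from all three tiers under one time budget: the RAM cache is queried without a limit, the assortment cluster gets half the budget, and the backend gets whatever remains (with a 100 ms floor). A self-contained sketch of that budget split, with a stand-in Tier interface in place of YaCy's classes:

import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the time-budget split in the new getContainer();
// Tier and its get() are stand-ins, not YaCy's actual API.
class BudgetedLookup {
    interface Tier { List<String> get(String wordHash, long maxTime); }

    static List<String> getContainer(String wordHash, long maxTime,
                                     Tier ram, Tier assortments, Tier backend) {
        long start = System.currentTimeMillis();
        List<String> container = new ArrayList<>();
        container.addAll(ram.get(wordHash, -1));              // RAM lookup, assumed cheap
        container.addAll(assortments.get(wordHash,            // half the budget for assortments
                (maxTime < 0) ? -1 : maxTime / 2));
        if (maxTime > 0) {                                    // backend gets what is left
            maxTime -= System.currentTimeMillis() - start;
            if (maxTime < 0) maxTime = 100;                   // minimum grace period, as in the patch
        }
        container.addAll(backend.get(wordHash, (maxTime < 0) ? -1 : maxTime));
        return container;
    }
}
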
@@ -52,8 +52,6 @@ import java.util.TreeMap;
import de.anomic.kelondro.kelondroArray;
import de.anomic.kelondro.kelondroException;
import de.anomic.kelondro.kelondroMScoreCluster;
-import de.anomic.kelondro.kelondroMergeIterator;
-import de.anomic.kelondro.kelondroNaturalOrder;
import de.anomic.kelondro.kelondroRecords;
import de.anomic.server.logging.serverLog;
import de.anomic.yacy.yacySeedDB;
@@ -62,22 +60,16 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
// environment constants
private static final String indexArrayFileName = "indexDump1.array";
-private static final String indexAssortmentClusterPath = "ACLUSTER";
-private static final int assortmentCount = 64;
-private static final int ramCacheLimit = 200;
+private static final int ramCacheLimit = 60;
// class variables
private final File databaseRoot;
-private final plasmaWordIndexInterface backend;
private final TreeMap cache;
private final kelondroMScoreCluster hashScore;
private final kelondroMScoreCluster hashDate;
private long startTime;
private int maxWordsLow, maxWordsHigh; // we have 2 cache limits for different priorities
private final serverLog log;
-private final plasmaWordIndexAssortmentCluster assortmentCluster;
-private int assortmentBufferSize; //kb
//private final flush flushThread;
// calculated constants
private static String maxKey;
@@ -86,38 +78,25 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
//minKey = ""; for (int i = 0; i < yacySeedDB.commonHashLength; i++) maxKey += '-';
}
-public plasmaWordIndexCache(File databaseRoot, plasmaWordIndexInterface backend, int assortmentbufferkb, serverLog log) {
-// create new assortment cluster path
-File assortmentClusterPath = new File(databaseRoot, indexAssortmentClusterPath);
-if (!(assortmentClusterPath.exists())) assortmentClusterPath.mkdirs();
-// create flushing thread
-//flushThread = new flush();
+public plasmaWordIndexCache(File databaseRoot, serverLog log) {
// creates a new index cache
// the cache has a back-end where indexes that do not fit in the cache are flushed
this.databaseRoot = databaseRoot;
-this.assortmentBufferSize = assortmentbufferkb;
this.cache = new TreeMap();
this.hashScore = new kelondroMScoreCluster();
this.hashDate = new kelondroMScoreCluster();
this.startTime = System.currentTimeMillis();
this.maxWordsLow = 8000;
this.maxWordsHigh = 10000;
-this.backend = backend;
this.log = log;
-this.assortmentCluster = new plasmaWordIndexAssortmentCluster(assortmentClusterPath, assortmentCount, assortmentBufferSize, log);
// read in dump of last session
try {
restore();
} catch (IOException e){
log.logSevere("unable to restore cache dump: " + e.getMessage(), e);
}
-// start permanent flushing
-//flushThread.start();
}
private void dump(int waitingSeconds) throws IOException {
@@ -189,7 +168,7 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
long creationTime;
plasmaWordIndexEntry wordEntry;
byte[][] row;
-Runtime rt = Runtime.getRuntime();
+//Runtime rt = Runtime.getRuntime();
while (i-- > 0) {
// get out one entry
row = dumpArray.get(i);
@@ -201,7 +180,7 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
addEntry(wordHash, wordEntry, creationTime);
urlCount++;
// protect against memory shortage
-while (rt.freeMemory() < 1000000) {flushFromMem(); java.lang.System.gc();}
+//while (rt.freeMemory() < 1000000) {flushFromMem(); java.lang.System.gc();}
// write a log
if (System.currentTimeMillis() > messageTime) {
System.gc(); // for better statistic
@@ -223,12 +202,6 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
return urlCount;
}
-/*
-public void intermission(long pause) {
-flushThread.intermission(pause);
-}
-*/
// cache settings
public int maxURLinWordCache() {
@@ -243,249 +216,70 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
this.maxWordsLow = maxWordsLow;
this.maxWordsHigh = maxWordsHigh;
}
-public int[] assortmentsSizes() {
-return assortmentCluster.sizes();
}
-public int[] assortmentsCacheChunkSizeAvg() {
-return assortmentCluster.cacheChunkSizeAvg();
+public int getMaxWordsLow() {
+return this.maxWordsLow;
}
-public int[] assortmentsCacheFillStatusCml() {
-return assortmentCluster.cacheFillStatusCml();
+public int getMaxWordsHigh() {
+return this.maxWordsHigh;
}
public int size() {
-return java.lang.Math.max(assortmentCluster.sizeTotal(), java.lang.Math.max(backend.size(), cache.size()));
+return cache.size();
}
public int indexSize(String wordHash) {
int size = 0;
-try {
-plasmaWordIndexEntity entity = backend.getEntity(wordHash, true, -1);
-if (entity != null) {
-size += entity.size();
-entity.close();
-}
-} catch (IOException e) {}
-size += assortmentCluster.indexSize(wordHash);
plasmaWordIndexEntryContainer cacheIndex = (plasmaWordIndexEntryContainer) cache.get(wordHash);
if (cacheIndex != null) size += cacheIndex.size();
return size;
}
-public Iterator wordHashes(String startWordHash, boolean up, boolean rot) {
-return wordHashes(startWordHash, plasmaWordIndex.RL_WORDFILES, up, rot);
-}
-public Iterator wordHashes(String startWordHash, int resourceLevel, boolean up, boolean rot) {
-synchronized (cache) {
-if (!(up)) throw new RuntimeException("plasmaWordIndexCache.wordHashes can only count up");
-if (resourceLevel == plasmaWordIndex.RL_RAMCACHE) {
-return cache.tailMap(startWordHash).keySet().iterator();
-}
-/*
-if (resourceLevel == plasmaWordIndex.RL_FILECACHE) {
-}
-*/
-if (resourceLevel == plasmaWordIndex.RL_ASSORTMENTS) {
-return new kelondroMergeIterator(
-cache.tailMap(startWordHash).keySet().iterator(),
-assortmentCluster.hashConjunction(startWordHash, true, rot),
-kelondroNaturalOrder.naturalOrder,
-true);
-}
-if (resourceLevel == plasmaWordIndex.RL_WORDFILES) {
-return new kelondroMergeIterator(
-new kelondroMergeIterator(
-cache.tailMap(startWordHash).keySet().iterator(),
-assortmentCluster.hashConjunction(startWordHash, true, rot),
-kelondroNaturalOrder.naturalOrder,
-true),
-backend.wordHashes(startWordHash, true, false),
-kelondroNaturalOrder.naturalOrder,
-true);
-}
-return null;
-}
+public Iterator wordHashes(String startWordHash, boolean rot) {
+if (rot) throw new UnsupportedOperationException("plasmaWordIndexCache cannot rotate");
+return cache.tailMap(startWordHash).keySet().iterator();
}
/*
public Iterator wordHashes(String startWordHash, boolean up, boolean rot) {
// here we merge 3 databases into one view:
// - the RAM Cache
// - the assortmentCluster File Cache
// - the backend
if (!(up)) throw new RuntimeException("plasmaWordIndexCache.wordHashes can only count up");
//if (rot) System.out.println("WARNING: wordHashes does not work correctly when individual Assotments rotate on their own!");
//return new rotatingWordHashes(startWordHash, up);
return new kelondroMergeIterator(
new kelondroMergeIterator(
cache.tailMap(startWordHash).keySet().iterator(),
assortmentCluster.hashConjunction(startWordHash, true, rot),
kelondroNaturalOrder.naturalOrder,
true),
backend.wordHashes(startWordHash, true, false),
kelondroNaturalOrder.naturalOrder,
true);
}
*/
/*
private final class flush extends Thread {
boolean terminate;
long intermission;
public flush() {
terminate = false;
intermission = 0;
this.setName(this.getClass().getName());
}
public void intermission(long pause) {
this.intermission = System.currentTimeMillis() + pause;
}
public void run() {
long pausetime;
while (!terminate) {
if (intermission > 0) {
if (this.intermission > System.currentTimeMillis()) {
try {sleep(this.intermission - System.currentTimeMillis());} catch (InterruptedException e) {}
}
this.intermission = 0;
}
flushFromMem();
pausetime = 1 + java.lang.Math.min(1000, 5 * maxWordsHigh / (cache.size() + 1));
if (cache.size() == 0) pausetime = 2000;
try { sleep(pausetime); } catch (InterruptedException e) { }
}
}
public void terminate() {
terminate = true;
}
}
*/
-private void flushFromMem() {
+public String bestFlushWordHash() {
// select appropriate hash
// we have 2 different methods to find a good hash:
// - the oldest entry in the cache
// - the entry with maximum count
-if (cache.size() == 0) return;
+if (cache.size() == 0) return null;
try {
synchronized (cache) {
String hash = (String) hashScore.getMaxObject();
-if (hash == null) return;
+if (hash == null) return null;
int count = hashScore.getMaxScore();
-long time = longTime(hashDate.getScore(hash));
-if ((count > ramCacheLimit) || ((count > assortmentCount) && (System.currentTimeMillis() - time > 10000))) {
+//long time = longTime(hashDate.getScore(hash));
+if (count > ramCacheLimit) {
// flush high-score entries
-flushFromMem(hash);
+return hash;
} else {
// flush oldest entries
hash = (String) hashDate.getMinObject();
-flushFromMem(hash);
+return hash;
}
}
} catch (Exception e) {
log.logSevere("flushFromMem: " + e.getMessage(), e);
}
}
-private int flushFromMem(String key) {
-// this method flushes indexes out from the ram to the disc.
-plasmaWordIndexEntryContainer container = null;
-long time;
-synchronized (cache) {
-// get the container and remove it from cache
-container = (plasmaWordIndexEntryContainer) this.cache.remove(key);
-if (container == null) return 0; // flushing of nonexisting key
-time = container.updated();
-// remove it from the MScoreClusters
-hashScore.deleteScore(key);
-hashDate.deleteScore(key);
-}
-// now decide where to flush that container
-plasmaWordIndexEntryContainer feedback = assortmentCluster.storeTry(key, container);
-if (feedback == null) {
-return container.size();
-} else {
-// *** should care about another option here ***
-return backend.addEntries(feedback, time, true);
-}
+return null;
}
private int intTime(long longTime) {
return (int) ((longTime - startTime) / 1000);
}
/*
private long longTime(int intTime) {
return ((long) intTime) * ((long) 1000) + startTime;
}
private boolean flushFromAssortmentCluster(String key, long maxTime) {
// this should only be called if the assortment shall be deleted or returned in an index entity
if (maxTime > 0) maxTime = 8 * maxTime / 10; // reserve time for later adding to backend
plasmaWordIndexEntryContainer container = assortmentCluster.removeFromAll(key, maxTime);
if (container == null) {
return false;
} else {
// we have a non-empty entry-container
// integrate it to the backend
return backend.addEntries(container, container.updated(), true) > 0;
}
}
public plasmaWordIndexEntryContainer getContainer(String wordHash, boolean deleteIfEmpty, long maxTime) {
long start = System.currentTimeMillis();
plasmaWordIndexEntryContainer container;
synchronized (cache) {
container = new plasmaWordIndexEntryContainer(wordHash);
// get from cache
// We must not use the container from cache to store everything we find, as that
// container remains linked to in the cache and might be changed later while the
// returned container is still in use.
// e.g. indexTransfer might keep this container for minutes while several new pages
// could be added to the index, possibly with the same words that have been selected
// for transfer
container.add((plasmaWordIndexEntryContainer) cache.get(wordHash));
// get from assortments
container.add(assortmentCluster.getFromAll(wordHash, (maxTime < 0) ? -1 : maxTime / 2));
// get from backend
if (maxTime > 0) {
maxTime = maxTime - (System.currentTimeMillis() - start);
if (maxTime < 0) maxTime = 100;
}
container.add(backend.getContainer(wordHash, deleteIfEmpty, (maxTime < 0) ? -1 : maxTime));
}
return container;
}
public plasmaWordIndexEntity getEntity(String wordHash, boolean deleteIfEmpty, long maxTime) {
// this possibly creates an index file in the back-end
// the index file is opened and returned as entity object
long start = System.currentTimeMillis();
synchronized (cache) {
flushFromMem(wordHash);
if (maxTime < 0) {
flushFromAssortmentCluster(wordHash, -1);
} else {
long remaining = maxTime - (System.currentTimeMillis() - start);
if (remaining > 0)
flushFromAssortmentCluster(wordHash, remaining);
}
}
long r = maxTime - (System.currentTimeMillis() - start);
return backend.getEntity(wordHash, deleteIfEmpty, (r < 0) ? 0 : r);
*/
+public plasmaWordIndexEntryContainer getContainer(String wordHash, boolean deleteIfEmpty) {
+return (plasmaWordIndexEntryContainer) cache.get(wordHash);
+}
public long getUpdateTime(String wordHash) {
@@ -499,21 +293,27 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
*/
}
-public void deleteIndex(String wordHash) {
+public plasmaWordIndexEntryContainer deleteContainer(String wordHash) {
+// returns the index that had been deleted
synchronized (cache) {
-cache.remove(wordHash);
+plasmaWordIndexEntryContainer container = (plasmaWordIndexEntryContainer) cache.remove(wordHash);
hashScore.deleteScore(wordHash);
hashDate.deleteScore(wordHash);
+return container;
}
-assortmentCluster.removeFromAll(wordHash, -1);
-backend.deleteIndex(wordHash);
}
-public synchronized int removeEntries(String wordHash, String[] urlHashes, boolean deleteComplete) {
-flushFromMem(wordHash);
-flushFromAssortmentCluster(wordHash, -1);
-int removed = backend.removeEntries(wordHash, urlHashes, deleteComplete);
-return removed;
+public int removeEntries(String wordHash, String[] urlHashes, boolean deleteComplete) {
+if (urlHashes.length == 0) return 0;
+int count = 0;
+synchronized (cache) {
+plasmaWordIndexEntryContainer c = (plasmaWordIndexEntryContainer) deleteContainer(wordHash);
+if (c != null) {
+for (int i = 0; i < urlHashes.length; i++) count += (c.remove(urlHashes[i]) == null) ? 0 : 1;
+if (c.size() != 0) this.addEntries(c, System.currentTimeMillis(), false);
+}
+}
+return count;
}
public int addEntries(plasmaWordIndexEntryContainer container, long updateTime, boolean highPriority) {
@@ -537,21 +337,6 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
hashDate.setScore(wordHash, intTime(updateTime));
}
entries = null;
-// force flush
-if (highPriority) {
-if (cache.size() > maxWordsHigh) {
-while (cache.size() + 500 > maxWordsHigh) {
-try { Thread.sleep(10); } catch (InterruptedException e) { }
-flushFromMem();
-}}
-} else {
-if (cache.size() > maxWordsLow) {
-while (cache.size() + 500 > maxWordsLow) {
-try { Thread.sleep(10); } catch (InterruptedException e) { }
-flushFromMem();
-}}
-}
}
return added;
}
@@ -572,70 +357,11 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
}
public void close(int waitingSeconds) {
-// stop permanent flushing
-//flushThread.terminate();
-//try {flushThread.join(6000);} catch (InterruptedException e) {}
// dump cache
try {
dump(waitingSeconds);
} catch (IOException e){
log.logSevere("unable to dump cache: " + e.getMessage(), e);
}
-// close cluster
-assortmentCluster.close();
}
-public Object migrateWords2Assortment(String wordhash) throws IOException {
-// returns the number of entries that had been added to the assortments
-// can be negative if some assortments have been moved to the backend
-File db = plasmaWordIndexEntity.wordHash2path(databaseRoot, wordhash);
-if (!(db.exists())) return "not available";
-plasmaWordIndexEntity entity = null;
-try {
-entity = new plasmaWordIndexEntity(databaseRoot, wordhash, true);
-int size = entity.size();
-if (size > assortmentCluster.clusterCapacity) {
-// this will be too big to integrate it
-entity.close(); entity = null;
-return "too big";
-} else {
-// take out all words from the assortment to see if it fits
-// together with the extracted assortment
-plasmaWordIndexEntryContainer container = assortmentCluster.removeFromAll(wordhash, -1);
-if (size + container.size() > assortmentCluster.clusterCapacity) {
-// this will also be too big to integrate, add to entity
-entity.addEntries(container);
-entity.close(); entity = null;
-return new Integer(-container.size());
-} else {
-// the combined container will fit, read the container
-try {
-Iterator entries = entity.elements(true);
-plasmaWordIndexEntry entry;
-while (entries.hasNext()) {
-entry = (plasmaWordIndexEntry) entries.next();
-// System.out.println("ENTRY = " + entry.getUrlHash());
-container.add(new plasmaWordIndexEntry[]{entry}, System.currentTimeMillis());
-}
-// we have read all elements, now delete the entity
-entity.deleteComplete();
-entity.close(); entity = null;
-// integrate the container into the assortments; this will work
-assortmentCluster.storeTry(wordhash, container);
-return new Integer(size);
-} catch (kelondroException e) {
-// database corrupted, we simply give up the database and delete it
-try {entity.close();} catch (Exception ee) {} entity = null;
-try {db.delete();} catch (Exception ee) {}
-return "database corrupted; deleted";
-}
-}
-}
-} finally {
-if (entity != null) try {entity.close();}catch(Exception e){}
-}
-}
}

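After this change plasmaWordIndexCache no longer flushes itself; it only nominates a victim via bestFlushWordHash(): the hash with the highest entry count if that count exceeds ramCacheLimit, otherwise the oldest entry. A minimal model of that heuristic, using plain Maps as stand-ins for kelondroMScoreCluster:

import java.util.Collections;
import java.util.Map;

// Minimal model of the flush-candidate heuristic behind bestFlushWordHash();
// plain Maps stand in for kelondroMScoreCluster.
class FlushHeuristic {
    static final int RAM_CACHE_LIMIT = 60; // mirrors the lowered constant in this commit

    static String bestFlushWordHash(Map<String, Integer> hashScore,
                                    Map<String, Long> hashDate) {
        if (hashScore.isEmpty()) return null;
        // hash with the maximum entry count
        String maxHash = Collections.max(hashScore.entrySet(),
                Map.Entry.comparingByValue()).getKey();
        if (hashScore.get(maxHash) > RAM_CACHE_LIMIT) return maxHash; // flush high-score entry
        // otherwise flush the oldest entry
        return Collections.min(hashDate.entrySet(),
                Map.Entry.comparingByValue()).getKey();
    }
}
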
@@ -53,9 +53,8 @@ import de.anomic.kelondro.kelondroNaturalOrder;
import de.anomic.server.logging.serverLog;
import de.anomic.yacy.yacySeedDB;
-public class plasmaWordIndexClassicDB implements plasmaWordIndexInterface {
+public class plasmaWordIndexClassicDB {
// class variables
private final File databaseRoot;
private final serverLog log;

@@ -324,14 +324,14 @@ public final class plasmaWordIndexDistribution {
selectTransferContainers(String hash, int mincount, int maxcount) {
Object[] selectResult = selectTransferContainersResource(hash, plasmaWordIndex.RL_RAMCACHE, maxcount);
-int refcount = ((Integer) selectResult[2]).intValue();
-if (refcount >= mincount) {
-log.logFine("DHT selection from RAM: " + refcount + " entries");
+int refcountRAM = ((Integer) selectResult[2]).intValue();
+if (refcountRAM >= mincount) {
+log.logFine("DHT selection from RAM: " + refcountRAM + " entries");
return selectResult;
}
selectResult = selectTransferContainersResource(hash, plasmaWordIndex.RL_WORDFILES, maxcount);
-refcount = ((Integer) selectResult[2]).intValue();
-log.logFine("DHT selection from FILE: " + refcount + " entries");
+int refcountFile = ((Integer) selectResult[2]).intValue();
+log.logFine("DHT selection from FILE: " + refcountFile + " entries, RAM provided only " + refcountRAM + " entries");
return selectResult;
}
@@ -341,7 +341,7 @@ public final class plasmaWordIndexDistribution {
ArrayList tmpContainers = new ArrayList(maxcount);
String nexthash = "";
synchronized (this.wordIndex) {try {
-Iterator wordHashIterator = this.wordIndex.wordHashes(hash, resourceLevel, true, true);
+Iterator wordHashIterator = this.wordIndex.wordHashes(hash, resourceLevel, true);
plasmaWordIndexEntryContainer indexContainer;
Iterator urlIter;
plasmaWordIndexEntry indexEntry;

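The selectTransferContainers() change above also sharpens the logging of its two-stage strategy: query the cheap RAM tier first and fall back to the word files only when the RAM yield is below mincount. A self-contained sketch, where Selector is a hypothetical stand-in returning the reference count:

import java.util.logging.Logger;

// Sketch of the two-stage DHT selection; Selector is not YaCy's API.
class TwoStageSelect {
    interface Selector { int select(String startHash, int maxcount); }

    static int selectTransfer(String hash, int mincount, int maxcount,
                              Selector ram, Selector files, Logger log) {
        int refcountRAM = ram.select(hash, maxcount);
        if (refcountRAM >= mincount) {
            log.fine("DHT selection from RAM: " + refcountRAM + " entries");
            return refcountRAM; // cheap tier was sufficient
        }
        int refcountFile = files.select(hash, maxcount);
        log.fine("DHT selection from FILE: " + refcountFile
                + " entries, RAM provided only " + refcountRAM + " entries");
        return refcountFile; // fallback to the word files
    }
}
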
@@ -48,13 +48,12 @@ public interface plasmaWordIndexInterface {
public int size();
-public Iterator wordHashes(String startWordHash, boolean up, boolean rot);
-public plasmaWordIndexEntity getEntity(String wordHash, boolean deleteIfEmpty, long maxTime);
-public plasmaWordIndexEntryContainer getContainer(String wordHash, boolean deleteIfEmpty, long maxTime);
+public Iterator wordHashes(String startWordHash, boolean rot);
public long getUpdateTime(String wordHash);
-public void deleteIndex(String wordHash);
+public plasmaWordIndexEntryContainer getContainer(String wordHash, boolean deleteIfEmpty);
+public plasmaWordIndexEntryContainer deleteContainer(String wordHash);
public int removeEntries(String wordHash, String[] urlHashes, boolean deleteComplete);
public int addEntries(plasmaWordIndexEntryContainer newEntries, long creationTime, boolean highPriority);

@@ -81,7 +81,6 @@ import de.anomic.plasma.plasmaURLPool;
import de.anomic.plasma.plasmaWordIndex;
import de.anomic.plasma.plasmaWordIndexAssortment;
import de.anomic.plasma.plasmaWordIndexAssortmentCluster;
-import de.anomic.plasma.plasmaWordIndexCache;
import de.anomic.plasma.plasmaWordIndexClassicDB;
import de.anomic.plasma.plasmaWordIndexEntity;
import de.anomic.plasma.plasmaWordIndexEntry;
@@ -659,7 +658,7 @@ public final class yacy {
File dbroot = new File(new File(homePath), "DATA/PLASMADB");
serverLog log = new serverLog("WORDMIGRATION");
log.logInfo("STARTING MIGRATION");
-plasmaWordIndexCache wordIndexCache = new plasmaWordIndexCache(dbroot, new plasmaWordIndexClassicDB(dbroot, log), 20000, log);
+plasmaWordIndex wordIndexCache = new plasmaWordIndex(dbroot, 20000, log);
enumerateFiles words = new enumerateFiles(new File(dbroot, "WORDS"), true, false, true, true);
String wordhash;
File wordfile;
@@ -756,7 +755,7 @@ public final class yacy {
wordEntryCount += container.size();
// importing entity container to home db
-homeWordIndex.addEntries(container, true);
+homeWordIndex.addEntries(container, System.currentTimeMillis(), true);
if (wordEntityCount % 500 == 0) {
log.logFine(wordEntityCount + " word entities processed so far.");
@@ -840,7 +839,7 @@ public final class yacy {
long globalStart = System.currentTimeMillis(), wordChunkStart = System.currentTimeMillis(), wordChunkEnd = 0;
String wordChunkStartHash = "------------", wordChunkEndHash;
-Iterator importWordHashIterator = importWordIndex.wordHashes(wordChunkStartHash, plasmaWordIndex.RL_WORDFILES, true, true);
+Iterator importWordHashIterator = importWordIndex.wordHashes(wordChunkStartHash, plasmaWordIndex.RL_WORDFILES, true);
while (importWordHashIterator.hasNext()) {
// testing if import process was aborted
@@ -886,7 +885,7 @@ public final class yacy {
if (Thread.interrupted()) break;
// importing entity container to home db
-homeWordIndex.addEntries(newContainer, true);
+homeWordIndex.addEntries(newContainer, System.currentTimeMillis(), true);
// delete complete index entity file
importWordIndex.deleteIndex(wordHash);
@@ -949,7 +948,7 @@ public final class yacy {
Runtime rt = Runtime.getRuntime();
int cacheMem = (int)(rt.maxMemory()-rt.totalMemory())-5*1024*1024;
plasmaWordIndex wordIndex = new plasmaWordIndex(dbroot, cacheMem, log);
-Iterator wordHashIterator = wordIndex.wordHashes("------------", plasmaWordIndex.RL_WORDFILES, true, true);
+Iterator wordHashIterator = wordIndex.wordHashes("------------", plasmaWordIndex.RL_WORDFILES, true);
String wordhash;
long urlCounter = 0, wordCounter = 0;
@@ -1336,7 +1335,7 @@ public final class yacy {
Iterator WordHashIterator = null;
if (resource.equals("all")) {
WordIndex = new plasmaWordIndex(homeDBroot, 8*1024*1024, log);
-WordHashIterator = WordIndex.wordHashes(wordChunkStartHash, plasmaWordIndex.RL_WORDFILES, true, false);
+WordHashIterator = WordIndex.wordHashes(wordChunkStartHash, plasmaWordIndex.RL_WORDFILES, false);
} else if (resource.equals("assortments")) {
plasmaWordIndexAssortmentCluster assortmentCluster = new plasmaWordIndexAssortmentCluster(new File(homeDBroot, "ACLUSTER"), 64, 16*1024*1024, log);
WordHashIterator = assortmentCluster.hashConjunction(wordChunkStartHash, true, false);
