enhanced assortment capacity; added extended WORDS migration

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@412 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 20 years ago
parent e6c381a2e2
commit 1f36bf4dae

@ -75,7 +75,7 @@ public final class plasmaWordIndexAssortment {
// class variables
private File assortmentFile;
private int assortmentCapacity;
private int assortmentLength;
private serverLog log;
private kelondroTree assortments;
private long bufferSize;
@ -99,18 +99,18 @@ public final class plasmaWordIndexAssortment {
return structure;
}
public plasmaWordIndexAssortment(File storagePath, int assortmentCapacity, int bufferkb, serverLog log) {
public plasmaWordIndexAssortment(File storagePath, int assortmentLength, int bufferkb, serverLog log) {
if (!(storagePath.exists())) storagePath.mkdirs();
this.assortmentFile = new File(storagePath, assortmentFileName + intx(assortmentCapacity) + ".db");
this.assortmentCapacity = assortmentCapacity;
this.bufferStructureLength = 3 + 2 * assortmentCapacity;
this.assortmentFile = new File(storagePath, assortmentFileName + intx(assortmentLength) + ".db");
this.assortmentLength = assortmentLength;
this.bufferStructureLength = 3 + 2 * assortmentLength;
this.bufferSize = bufferkb * 1024;
this.log = log;
if (assortmentFile.exists()) {
// open existing assortment tree file
try {
assortments = new kelondroTree(assortmentFile, bufferSize);
if (log != null) log.logSystem("Opened Assortment Database, " + assortments.size() + " entries, width " + assortmentCapacity + ", " + bufferkb + "kb buffer");
if (log != null) log.logSystem("Opened Assortment Database, " + assortments.size() + " entries, width " + assortmentLength + ", " + bufferkb + "kb buffer");
} catch (IOException e){
if (log != null) log.logError("unable to open assortment database: " + e.getMessage());
e.printStackTrace();
@ -118,8 +118,8 @@ public final class plasmaWordIndexAssortment {
} else {
// create new assortment tree file
try {
assortments = new kelondroTree(assortmentFile, bufferSize, bufferStructure(assortmentCapacity));
if (log != null) log.logSystem("Created new Assortment Database, width " + assortmentCapacity + ", " + bufferkb + "kb buffer");
assortments = new kelondroTree(assortmentFile, bufferSize, bufferStructure(assortmentLength));
if (log != null) log.logSystem("Created new Assortment Database, width " + assortmentLength + ", " + bufferkb + "kb buffer");
} catch (IOException e){
if (log != null) log.logError("unable to create assortment database: " + e.getMessage());
e.printStackTrace();
@ -131,14 +131,14 @@ public final class plasmaWordIndexAssortment {
// stores a word index to assortment database
// this throws an exception if the word hash already existed
//log.logDebug("storeAssortment: wordHash=" + wordHash + ", urlHash=" + entry.getUrlHash() + ", time=" + creationTime);
if (newContainer.size() != assortmentCapacity) throw new RuntimeException("plasmaWordIndexAssortment.store: wrong container size");
if (newContainer.size() != assortmentLength) throw new RuntimeException("plasmaWordIndexAssortment.store: wrong container size");
byte[][] row = new byte[this.bufferStructureLength][];
row[0] = wordHash.getBytes();
row[1] = kelondroRecords.long2bytes(1, 4);
row[2] = kelondroRecords.long2bytes(newContainer.updated(), 8);
Iterator entries = newContainer.entries();
plasmaWordIndexEntry entry;
for (int i = 0; i < assortmentCapacity; i++) {
for (int i = 0; i < assortmentLength; i++) {
entry = (plasmaWordIndexEntry) entries.next();
row[3 + 2 * i] = entry.getUrlHash().getBytes();
row[4 + 2 * i] = entry.toEncodedForm(true).getBytes();
@ -179,7 +179,7 @@ public final class plasmaWordIndexAssortment {
long updateTime = kelondroRecords.bytes2long(row[2]);
plasmaWordIndexEntry[] wordEntries = new plasmaWordIndexEntry[this.bufferStructureLength];
plasmaWordIndexEntryContainer container = new plasmaWordIndexEntryContainer(wordHash);
for (int i = 0; i < assortmentCapacity; i++) {
for (int i = 0; i < assortmentLength; i++) {
container.add(new plasmaWordIndexEntry[]{new plasmaWordIndexEntry(new String(row[3 + 2 * i]), new String(row[4 + 2 * i]))}, updateTime);
}
return container;
@ -192,7 +192,7 @@ public final class plasmaWordIndexAssortment {
} catch (IOException e) {}
if (!(assortmentFile.delete())) throw new RuntimeException("cannot delete assortment database");
try {
assortments = new kelondroTree(assortmentFile, bufferSize, bufferStructure(assortmentCapacity));
assortments = new kelondroTree(assortmentFile, bufferSize, bufferStructure(assortmentLength));
} catch (IOException e){
log.logError("unable to re-create assortment database: " + e.getMessage());
e.printStackTrace();

@ -57,47 +57,50 @@ public final class plasmaWordIndexAssortmentCluster {
// class variables
private File assortmentsPath;
private int clusterCapacity;
private int clusterCount;
public int clusterCapacity;
private serverLog log;
private plasmaWordIndexAssortment[] assortments;
private long completeBufferKB;
public plasmaWordIndexAssortmentCluster(File assortmentsPath, int clusterCapacity, int bufferkb, serverLog log) {
public plasmaWordIndexAssortmentCluster(File assortmentsPath, int clusterCount, int bufferkb, serverLog log) {
// set class variables
if (!(assortmentsPath.exists())) assortmentsPath.mkdirs();
this.clusterCapacity = clusterCapacity;
this.clusterCount = clusterCount;
this.clusterCapacity = clusterCount * (clusterCount + 1) / 2;
this.completeBufferKB = bufferkb;
this.log = log;
this.assortments = new plasmaWordIndexAssortment[clusterCapacity];
this.assortments = new plasmaWordIndexAssortment[clusterCount];
// open cluster and close it directly again to detect the element sizes
int[] sizes = new int[clusterCapacity];
int[] sizes = new int[clusterCount];
int sumSizes = 1;
plasmaWordIndexAssortment testAssortment;
for (int i = 0; i < clusterCapacity; i++) {
for (int i = 0; i < clusterCount; i++) {
testAssortment = new plasmaWordIndexAssortment(assortmentsPath, i + 1, 0, null);
sizes[i] = testAssortment.size() + clusterCapacity - i;
sizes[i] = testAssortment.size() + clusterCount - i;
sumSizes += sizes[i];
testAssortment.close();
testAssortment = null;
}
// initialize cluster using the cluster elements size for optimal buffer size
for (int i = 0; i < clusterCapacity; i++) {
for (int i = 0; i < clusterCount; i++) {
assortments[i] = new plasmaWordIndexAssortment(assortmentsPath, i + 1, (int) ((long) completeBufferKB * (long) sizes[i] / (long) sumSizes), log);
}
}
public plasmaWordIndexEntryContainer storeTry(String wordHash, plasmaWordIndexEntryContainer newContainer) {
private plasmaWordIndexEntryContainer storeSingular(String wordHash, plasmaWordIndexEntryContainer newContainer) {
// this tries to store the record. If the record does not fit, or a same hash already
// exists and would not fit together with the new record, then the record is deleted from
// the assortment(s) and returned together with the newRecord.
// if storage was successful, NULL is returned.
if (newContainer.size() > clusterCapacity) return newContainer; // it will not fit
if (newContainer.size() > clusterCount) return newContainer; // it will not fit
plasmaWordIndexEntryContainer buffer;
while ((buffer = assortments[newContainer.size() - 1].remove(wordHash)) != null) {
newContainer.add(buffer);
if (newContainer.size() > clusterCapacity) return newContainer; // it will not fit
if (newContainer.size() > clusterCount) return newContainer; // it will not fit
}
// the assortment (newContainer.size() - 1) should now be empty. put it in there
assortments[newContainer.size() - 1].store(wordHash, newContainer);
@ -105,39 +108,50 @@ public final class plasmaWordIndexAssortmentCluster {
return null;
}
/*
public plasmaWordIndexEntryContainer storeTry(String wordHash, plasmaWordIndexEntryContainer newContainer) {
// this tries to store the record. If the record does not fit, or a same hash already
// exists and would not fit together with the new record, then the record is deleted from
// the assortment(s) and returned together with the newRecord.
// if storage was successful, NULL is returned.
if (newContainer.size() > clusterCapacity) return newContainer; // it will not fit
plasmaWordIndexEntryContainer buffer;
for (int i = 0; i < clusterCapacity; i++) {
buffer = assortments[i].remove(wordHash);
if (buffer != null) newContainer.add(buffer);
if (newContainer.size() > clusterCapacity) return newContainer; // it will not fit
}
// we collected all records and the result will fit somewhere..
private void storeForced(String wordHash, plasmaWordIndexEntryContainer newContainer) {
// this stores the record and overwrites an existing record.
// this is safe if we can be sure that the record does not exist before.
if ((newContainer == null) || (newContainer.size() == 0) || (newContainer.size() > clusterCount)) return; // it will not fit
assortments[newContainer.size() - 1].store(wordHash, newContainer);
// return null to show that we have stored the new Record successfully
return null;
}
*/
/*
public plasmaWordIndexEntryContainer removeFromOne(String wordHash, int assortment) {
// collect one container from a specific assortment
plasmaWordIndexEntryContainer container = assortments[assortment].remove(wordHash);
if (container == null) return new plasmaWordIndexEntryContainer(wordHash);
return container;
private void storeStretched(String wordHash, plasmaWordIndexEntryContainer newContainer) {
// this stores the record and stretches the storage over
// all the assortments that are necessary to fit in the record.
// strategy: split the container into pieces of strictly decreasing size
// (clusterCount, clusterCount - 1, ..., 1) and store each piece in the
// assortment of the matching width; since every piece has a distinct
// size, no two pieces of one call target the same assortment file.
// the summed widths equal clusterCapacity, so any container whose size
// passed the clusterCapacity check can be stored completely.
// NOTE(review): assumes the target assortment slots are free for this
// wordHash (storeForced overwrites) -- confirm at call sites.
if (newContainer.size() <= clusterCount) {
// small enough for one single assortment; no stretching needed
storeForced(wordHash, newContainer);
return;
}
plasmaWordIndexEntryContainer c;
Iterator i = newContainer.entries();
for (int j = clusterCount; j >= 1; j--) {
// build the next piece with exactly j entries; if the source runs
// dry mid-piece, store the partial remainder and stop
c = new plasmaWordIndexEntryContainer(wordHash);
for (int k = 0; k < j; k++) {
if (i.hasNext()) {
c.add((plasmaWordIndexEntry) i.next(), newContainer.updated());
} else {
storeForced(wordHash, c);
return;
}
}
storeForced(wordHash, c);
}
}
public plasmaWordIndexEntryContainer storeTry(String wordHash, plasmaWordIndexEntryContainer newContainer) {
if (newContainer.size() > clusterCapacity) return newContainer; // it will not fit
if (newContainer.size() <= clusterCount) newContainer = storeSingular(wordHash, newContainer);
if (newContainer == null) return null;
newContainer.add(removeFromAll(wordHash));
if (newContainer.size() > clusterCapacity) return newContainer;
storeStretched(wordHash, newContainer);
return null;
}
*/
public plasmaWordIndexEntryContainer removeFromAll(String wordHash) {
// collect all records from all the assortments and return them
plasmaWordIndexEntryContainer buffer, record = new plasmaWordIndexEntryContainer(wordHash);
for (int i = 0; i < clusterCapacity; i++) {
for (int i = 0; i < clusterCount; i++) {
buffer = assortments[i].remove(wordHash);
if (buffer != null) record.add(buffer);
}
@ -146,24 +160,24 @@ public final class plasmaWordIndexAssortmentCluster {
public Iterator hashConjunction(String startWordHash, boolean up) {
HashSet iterators = new HashSet();
for (int i = 0; i < clusterCapacity; i++) iterators.add(assortments[i].hashes(startWordHash, up, true));
for (int i = 0; i < clusterCount; i++) iterators.add(assortments[i].hashes(startWordHash, up, true));
return kelondroMergeIterator.cascade(iterators, up);
}
public int sizeTotal() {
int total = 0;
for (int i = 0; i < clusterCapacity; i++) total += assortments[i].size();
for (int i = 0; i < clusterCount; i++) total += assortments[i].size();
return total;
}
public int[] sizes() {
int[] sizes = new int[clusterCapacity];
for (int i = 0; i < clusterCapacity; i++) sizes[i] = assortments[i].size();
int[] sizes = new int[clusterCount];
for (int i = 0; i < clusterCount; i++) sizes[i] = assortments[i].size();
return sizes;
}
public void close() {
for (int i = 0; i < clusterCapacity; i++) assortments[i].close();
for (int i = 0; i < clusterCount; i++) assortments[i].close();
}
}

@ -47,6 +47,7 @@ import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.Enumeration;
import de.anomic.kelondro.kelondroException;
import de.anomic.kelondro.kelondroMScoreCluster;
@ -54,6 +55,7 @@ import de.anomic.kelondro.kelondroMergeIterator;
import de.anomic.kelondro.kelondroRecords;
import de.anomic.kelondro.kelondroStack;
import de.anomic.kelondro.kelondroArray;
import de.anomic.kelondro.kelondroTree;
import de.anomic.server.logging.serverLog;
import de.anomic.yacy.yacySeedDB;
@ -65,7 +67,7 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
private static final String oldSingletonFileName = "indexSingletons0.db";
private static final String newSingletonFileName = "indexAssortment001.db";
private static final String indexAssortmentClusterPath = "ACLUSTER";
private static final int assortmentLimit = 50;
private static final int assortmentCount = 50;
private static final int ramCacheLimit = 200;
@ -119,7 +121,7 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
this.maxWords = 10000;
this.backend = backend;
this.log = log;
this.assortmentCluster = new plasmaWordIndexAssortmentCluster(assortmentClusterPath, assortmentLimit, assortmentBufferSize, log);
this.assortmentCluster = new plasmaWordIndexAssortmentCluster(assortmentClusterPath, assortmentCount, assortmentBufferSize, log);
// read in dump of last session
try {
@ -438,13 +440,13 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
int count = hashScore.getMaxScore();
long time = longTime(hashDate.getScore(hash));
if ((count > ramCacheLimit) ||
((count > assortmentLimit) && (System.currentTimeMillis() - time > 10000))) {
((count > assortmentCount) && (System.currentTimeMillis() - time > 10000))) {
// flush high-score entries
flushFromMem(hash, true);
flushFromMem(hash);
} else {
// flush oldest entries
hash = (String) hashDate.getMinObject();
flushFromMem(hash, true);
flushFromMem(hash);
}
} catch (Exception e) {
log.logError("flushFromMem: " + e.getMessage());
@ -453,13 +455,8 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
flushThread.proceed();
}
private int flushFromMem(String key, boolean reintegrate) {
private int flushFromMem(String key) {
// this method flushes indexes out from the ram to the disc.
// at first we check the singleton database and act accordingly
// if we are to flush an index, but also see an entry in the singletons, we
// decide upon the 'reintegrate'-Flag:
// true: do not flush to disc, but re-Integrate the singleton to the RAM
// false: flush the singleton together with container to disc
plasmaWordIndexEntryContainer container = null;
long time;
@ -476,19 +473,11 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
}
// now decide where to flush that container
if (container.size() <= assortmentLimit) {
if (container.size() <= assortmentCluster.clusterCapacity) {
// this fits into the assortments
plasmaWordIndexEntryContainer feedback = assortmentCluster.storeTry(key, container);
if (feedback == null) {
return container.size();
} else if ((container.size() != feedback.size()) && (reintegrate)) {
// put assortmentRecord together with container back to ram
synchronized (cache) {
cache.put(key, feedback);
hashScore.setScore(key, feedback.size());
hashDate.setScore(key, intTime(time));
}
return container.size() - feedback.size();
} else {
// *** should care about another option here ***
return backend.addEntries(feedback, time);
@ -522,7 +511,7 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
public plasmaWordIndexEntity getIndex(String wordHash, boolean deleteIfEmpty) {
flushThread.pause();
flushFromMem(wordHash, false);
flushFromMem(wordHash);
flushFromAssortmentCluster(wordHash);
flushThread.proceed();
return backend.getIndex(wordHash, deleteIfEmpty);
@ -553,7 +542,7 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
public synchronized int removeEntries(String wordHash, String[] urlHashes, boolean deleteComplete) {
flushThread.pause();
flushFromMem(wordHash, false);
flushFromMem(wordHash);
flushFromAssortmentCluster(wordHash);
int removed = backend.removeEntries(wordHash, urlHashes, deleteComplete);
flushThread.proceed();
@ -561,6 +550,7 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
}
public synchronized int addEntries(plasmaWordIndexEntryContainer container, long updateTime) {
// this puts the entries into the cache, not into the assortment directly
flushThread.pause();
//serverLog.logDebug("PLASMA INDEXING", "addEntryToIndexMem: cache.size=" + cache.size() + "; hashScore.size=" + hashScore.size());
while (cache.size() >= this.maxWords) flushFromMem();
@ -618,4 +608,42 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
}
}
public int migrateWords2Assortment(String wordhash) throws IOException {
// Migrates one word's old WORDS-file index into the assortment cluster.
// returns the number of entries that had been added to the assortments;
// can be negative if some assortments have been moved to the backend
// entity instead, and 0 if the index was too big to migrate at all
// (or if no WORDS file exists for this hash).
File db = plasmaWordIndexEntity.wordHash2path(databaseRoot, wordhash);
if (!(db.exists())) return 0;
plasmaWordIndexEntity entity = new plasmaWordIndexEntity(databaseRoot, wordhash, true);
int size = entity.size();
if (size > assortmentCluster.clusterCapacity) {
// this will be too big to integrate it
entity.close();
return 0;
} else {
// take out all words from the assortment to see if it fits
// together with the extracted assortment
plasmaWordIndexEntryContainer container = assortmentCluster.removeFromAll(wordhash);
if (size + container.size() > assortmentCluster.clusterCapacity) {
// this will also be too big to integrate, add to entity
// (net effect: the cluster LOST container.size() entries, hence
// the negative return value)
entity.addEntries(container);
entity.close();
return -container.size();
} else {
// the combined container will fit, read the container
Enumeration entries = entity.elements(true);
plasmaWordIndexEntry entry;
while (entries.hasMoreElements()) {
entry = (plasmaWordIndexEntry) entries.nextElement();
container.add(new plasmaWordIndexEntry[]{entry}, System.currentTimeMillis());
}
// we have read all elements, now delete the entity
entity.deleteComplete();
entity.close();
// integrate the container into the assortments; this will work
// because the combined size was checked against clusterCapacity above
assortmentCluster.storeTry(wordhash, container);
return size;
}
}
}

@ -226,20 +226,12 @@ public class plasmaWordIndexClassicDB implements plasmaWordIndexInterface {
public int addEntries(plasmaWordIndexEntryContainer container, long creationTime) {
//System.out.println("* adding " + newEntries.size() + " cached word index entries for word " + wordHash); // debug
// fetch the index cache
if (container.size() == 0) return 0;
if ((container == null) || (container.size() == 0)) return 0;
// open file
try {
plasmaWordIndexEntity pi = new plasmaWordIndexEntity(databaseRoot, container.wordHash(), false);
int count = 0;
// write from vector
if (container != null) {
Iterator i = container.entries();
while (i.hasNext()) {
if (pi.addEntry((plasmaWordIndexEntry) i.next())) count++;
}
}
int count = pi.addEntries(container);
// close and return
pi.close();

@ -170,6 +170,31 @@ public class plasmaWordIndexEntity {
}
}
public int addEntries(plasmaWordIndexEntryContainer container) {
// Adds every entry of the given container to this entity.
// Returns the number of entries for which addEntry(...) reported true;
// a null or empty container is a no-op and yields 0.
if ((container == null) || (container.size() == 0)) return 0;
try {
int count = 0;
// walk the container and add its entries one by one
// (the redundant second null check of the previous version was removed;
// the guard above already covers it)
Iterator i = container.entries();
while (i.hasNext()) {
if (addEntry((plasmaWordIndexEntry) i.next())) count++;
}
return count;
} catch (IOException e) {
// best-effort semantics kept from the previous version: swallow the
// I/O failure and report 0 instead of propagating the exception
e.printStackTrace();
return 0;
}
}
public boolean deleteComplete() throws IOException {
if (theTmpMap == null) {
theIndex.close();

@ -81,6 +81,11 @@ public class plasmaWordIndexEntryContainer implements Comparable {
return wordHash;
}
public int add(plasmaWordIndexEntry entry, long updateTime) {
// Adds a single entry, advancing this container's update timestamp to
// the newer of the current and the supplied time.
// Returns 1 if add(entry) accepted the entry, 0 otherwise.
this.updateTime = java.lang.Math.max(this.updateTime, updateTime);
return (add(entry)) ? 1 : 0;
}
public int add(plasmaWordIndexEntry[] entries, long updateTime) {
int c = 0;
for (int i = 0; i < entries.length; i++) if (add(entries[i])) c++;

@ -98,6 +98,8 @@ import de.anomic.plasma.plasmaWordIndex;
import de.anomic.plasma.plasmaWordIndexEntity;
import de.anomic.plasma.plasmaWordIndexEntry;
import de.anomic.plasma.plasmaWordIndexEntryContainer;
import de.anomic.plasma.plasmaWordIndexClassicDB;
import de.anomic.plasma.plasmaWordIndexCache;
import de.anomic.server.serverCodings;
import de.anomic.server.serverCore;
import de.anomic.server.serverFileUtils;
@ -504,40 +506,6 @@ public final class yacy {
serverLog.logSystem("GEN-WORDSTAT", "FINISHED");
}
private static void checkMigrate(File dbroot, serverLog log, File file, plasmaWordIndex wordIndex) throws IOException {
long length = file.length();
if (length > 3000) {
log.logInfo("SKIPPED " + file.toString() + ": too big, size=" + (length / 1024) + "kb");
return;
}
kelondroTree db = new kelondroTree(file, 0);
String wordhash = file.getName().substring(0, 12);
int size = db.size();
db.close();
if (size <= 50) {
plasmaWordIndexEntryContainer container = new plasmaWordIndexEntryContainer(wordhash);
plasmaWordIndexEntity entity = new plasmaWordIndexEntity(dbroot, wordhash, true);
Enumeration entries = entity.elements(true);
plasmaWordIndexEntry entry;
while (entries.hasMoreElements()) {
entry = (plasmaWordIndexEntry) entries.nextElement();
container.add(new plasmaWordIndexEntry[]{entry}, System.currentTimeMillis());
}
wordIndex.addEntries(container);
entity.deleteComplete();
entity.close();
if (file.exists()) {
log.logInfo("MIGRATED " + file.toString() + ": " + size + " entries, " + (length / 1024) + "kb, delete fail at end");
file.delete();
} else {
log.logInfo("MIGRATED " + file.toString() + ": " + size + " entries, " + (length / 1024) + "kb");
}
} else {
log.logInfo("SKIPPED " + file.toString() + ": " + size + " entries, " + (length / 1024) + "kb");
}
db.close();
}
public static void migrateWords(String homePath) {
// run with "java -classpath classes yacy -migratewords"
try {serverLog.configureLogging(new File(homePath, "yacy.logging"));} catch (Exception e) {}
@ -545,15 +513,26 @@ public final class yacy {
try {
serverLog log = new serverLog("WORDMIGRATION");
log.logInfo("STARTING MIGRATION");
plasmaWordIndex wordIndex = new plasmaWordIndex(dbroot, 20000, log);
plasmaWordIndexCache wordIndexCache = new plasmaWordIndexCache(dbroot, new plasmaWordIndexClassicDB(dbroot, log), 20000, log);
enumerateFiles words = new enumerateFiles(new File(dbroot, "WORDS"), true, false, true, true);
String wordhash;
File wordfile;
int migration;
while (words.hasMoreElements()) try {
checkMigrate(dbroot, log, (File) words.nextElement(), wordIndex);
wordfile = (File) words.nextElement();
wordhash = wordfile.getName().substring(0, 12);
migration = wordIndexCache.migrateWords2Assortment(wordhash);
if (migration == 0)
log.logInfo("SKIPPED " + wordhash + ": too big");
else if (migration > 0)
log.logInfo("MIGRATED " + wordhash + ": " + migration + " entries");
else
log.logInfo("REVERSED " + wordhash + ": " + (-migration) + " entries");
} catch (Exception e) {
e.printStackTrace();
}
log.logInfo("FINISHED MIGRATION JOB, WAIT FOR DUMP");
wordIndex.close(60);
wordIndexCache.close(60);
log.logInfo("TERMINATED MIGRATION");
} catch (IOException e) {
e.printStackTrace();

Loading…
Cancel
Save