added some memory protection

Too-large collection arrays are now avoided. By default, the biggest
collection array index is 7; larger collections are dumped into a
'commons' directory, but cannot yet be used. Before doing a dump, the
collection is split into a part that holds only root-URL references,
which is stored back into the collection; the remaining part goes to
commons.

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@3426 6c8d7289-2bf4-0310-a012-ef5d649a1542
orbiter 18 years ago
parent ce360ef43e
commit b466baa574
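
The core of the change is a shrink step in kelondroCollectionIndex (diff
below): references to root URLs survive in the collection, the remainder is
dumped to commons, and if the survivors still exceed the target size they are
thinned out randomly. A self-contained sketch of that strategy (illustrative,
not the YaCy code itself; the root-URL predicate is a stand-in):

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;

// Illustrative sketch of the shrink strategy: root-URL references survive,
// the rest is left behind for the commons dump, and oversized survivor
// sets are thinned out randomly.
class ShrinkSketch {

    // Returns the surviving references; whatever remains in refs afterwards
    // is the part a caller would dump to the commons directory.
    static List<String> shrink(List<String> refs, int targetSize) {
        List<String> survival = new ArrayList<String>();
        Iterator<String> i = refs.iterator();
        while (i.hasNext()) {
            String ref = i.next();
            if (isRootURL(ref)) { survival.add(ref); i.remove(); }
        }
        Random rand = new Random();
        while (survival.size() > targetSize) {
            // move roughly three quarters of the survivors back to the dump part
            i = survival.iterator();
            while (i.hasNext()) {
                String ref = i.next();
                if (rand.nextInt(4) != 0) { refs.add(ref); i.remove(); }
            }
        }
        return survival;
    }

    // Stand-in predicate; YaCy uses plasmaURL.probablyRootURL on 12-byte hashes.
    static boolean isRootURL(String ref) { return ref.endsWith("/"); }
}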

@ -44,7 +44,7 @@ public class indexCollectionRI implements indexRI {
kelondroCollectionIndex collectionIndex;
public indexCollectionRI(File path, String filenameStub, long buffersize, long preloadTime, kelondroRow payloadrow) {
public indexCollectionRI(File path, String filenameStub, long buffersize, long preloadTime, int maxpartition, kelondroRow payloadrow) {
try {
collectionIndex = new kelondroCollectionIndex(
path,
@ -54,6 +54,7 @@ public class indexCollectionRI implements indexRI {
buffersize,
preloadTime,
4 /*loadfactor*/,
maxpartition,
payloadrow);
} catch (IOException e) {
serverLog.logSevere("PLASMA", "unable to open collection index at " + path.toString() + ":" + e.getMessage());
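
Callers must now supply the partition limit explicitly; for example,
plasmaWordIndex (changed in this same commit, see below) passes its new
maxCollectionPartition constant of 7:

// call site from plasmaWordIndex further down in this commit
this.collections = new indexCollectionRI(textindexcollections, "collection",
        rwibuffer, preloadTime, maxCollectionPartition /* = 7 */,
        indexRWIEntryNew.urlEntryRow);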

@ -49,9 +49,6 @@ import de.anomic.yacy.yacySeedDB;
public final class indexRAMRI implements indexRI {
// environment constants
public static final long wCacheMaxAge = 1000 * 60 * 30; // milliseconds; 30 minutes
// class variables
private final File databaseRoot;
protected final SortedMap cache; // wordhash-container
@ -59,13 +56,14 @@ public final class indexRAMRI implements indexRI {
private final kelondroMScoreCluster hashDate;
private long initTime;
private int cacheMaxCount;
public int cacheReferenceLimit;
public int cacheReferenceCountLimit;
public long cacheReferenceAgeLimit;
private final serverLog log;
private String indexArrayFileName;
private kelondroRow payloadrow;
private kelondroRow bufferStructureBasis;
public indexRAMRI(File databaseRoot, kelondroRow payloadrow, int wCacheReferenceLimitInit, String dumpname, serverLog log) {
public indexRAMRI(File databaseRoot, kelondroRow payloadrow, int wCacheReferenceCountLimitInit, long wCacheReferenceAgeLimitInit, String dumpname, serverLog log) {
// creates a new index cache
// the cache has a back-end where indexes that do not fit in the cache are flushed
@ -75,7 +73,8 @@ public final class indexRAMRI implements indexRI {
this.hashDate = new kelondroMScoreCluster();
this.initTime = System.currentTimeMillis();
this.cacheMaxCount = 10000;
this.cacheReferenceLimit = wCacheReferenceLimitInit;
this.cacheReferenceCountLimit = wCacheReferenceCountLimitInit;
this.cacheReferenceAgeLimit = wCacheReferenceAgeLimitInit;
this.log = log;
this.indexArrayFileName = dumpname;
this.payloadrow = payloadrow;
@ -341,14 +340,14 @@ public final class indexRAMRI implements indexRI {
try {
String hash = null;
int count = hashScore.getMaxScore();
if ((count >= cacheReferenceLimit) &&
if ((count >= cacheReferenceCountLimit) &&
((hash = (String) hashScore.getMaxObject()) != null)) {
// we MUST flush high-score entries, because a loop deletes entries in cache until this condition fails
// in this cache we MUST NOT check wCacheMinAge
return hash;
}
long oldestTime = longEmit(hashDate.getMinScore());
if (((System.currentTimeMillis() - oldestTime) > wCacheMaxAge) &&
if (((System.currentTimeMillis() - oldestTime) > cacheReferenceAgeLimit) &&
((hash = (String) hashDate.getMinObject()) != null)) {
// flush out-dated entries
return hash;
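
Both flush limits are now constructor parameters instead of one hard-coded
constant; plasmaWordIndex (below) wires them up with wCacheMaxChunk = 1000
references per word and wCacheMaxAge = 30 minutes:

// call site from plasmaWordIndex further down in this commit
this.dhtOutCache = new indexRAMRI(textindexcache, indexRWIEntryNew.urlEntryRow,
        wCacheMaxChunk /* 1000 refs */, wCacheMaxAge /* 30 min */,
        "dump1.array", log);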

@ -27,16 +27,23 @@ package de.anomic.kelondro;
import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TimeZone;
import java.util.TreeMap;
import de.anomic.index.indexContainer;
import de.anomic.plasma.plasmaURL;
import de.anomic.server.serverCodings;
import de.anomic.server.serverFileUtils;
import de.anomic.server.logging.serverLog;
@ -44,14 +51,15 @@ public class kelondroCollectionIndex {
private static final int serialNumber = 0;
protected kelondroIndex index;
private int keylength;
private kelondroIndex index;
private int keylength;
private File path;
private String filenameStub;
private File commonsPath;
private int loadfactor;
private Map arrays; // Map of (partitionNumber"-"chunksize)/kelondroFixedWidthArray - Objects
private kelondroRow payloadrow; // definition of the payload (chunks inside the collections)
// private int partitions; // this is the maxmimum number of array files; yet not used
private int maxPartitions; // this is the maxmimum number of array files; yet not used
private static final int idx_col_key = 0; // the index
private static final int idx_col_chunksize = 1; // chunksize (number of bytes in a single chunk, needed for migration option)
@ -101,14 +109,18 @@ public class kelondroCollectionIndex {
public kelondroCollectionIndex(File path, String filenameStub, int keyLength, kelondroOrder indexOrder,
long buffersize, long preloadTime,
int loadfactor, kelondroRow rowdef) throws IOException {
int loadfactor, int maxpartitions,
kelondroRow rowdef) throws IOException {
// the buffersize is number of bytes that are only used if the kelondroFlexTable is backed up with a kelondroTree
this.path = path;
this.filenameStub = filenameStub;
this.keylength = keyLength;
this.payloadrow = rowdef;
this.loadfactor = loadfactor;
this.maxPartitions = maxpartitions;
this.commonsPath = new File(path, filenameStub + "." + fillZ(Integer.toHexString(rowdef.objectsize).toUpperCase(), 4) + ".commons");
this.commonsPath.mkdirs();
boolean ramIndexGeneration = false;
boolean fileIndexGeneration = !(new File(path, filenameStub + ".index").exists());
if (ramIndexGeneration) index = new kelondroRowSet(indexRow(keyLength, indexOrder), 0);
@ -591,9 +603,20 @@ public class kelondroCollectionIndex {
oldcollection.shape();
oldcollection.uniq(); // FIXME: not clear if it would be better to insert the collection with put to avoid double-entries
oldcollection.trim();
// check for size of collection:
// if necessary shrink the collection and dump a part of that collection
// to avoid that this grows too big
int newPartitionNumber;
while ((newPartitionNumber = arrayIndex(oldcollection.size())) > maxPartitions) {
kelondroRowSet newcollection = shrinkCollection(key, oldcollection, arrayCapacity(maxPartitions));
saveCommons(key, oldcollection);
oldcollection = newcollection;
}
// work on with oldcollection
collection = oldcollection;
int newPartitionNumber = arrayIndex(collection.size());
newPartitionNumber = arrayIndex(collection.size());
// see if we need new space or if we can overwrite the old space
if (oldPartitionNumber == newPartitionNumber) {
@ -678,7 +701,7 @@ public class kelondroCollectionIndex {
// load the old collection and join it
kelondroRowSet oldcollection = getwithparams(indexrow, oldchunksize, oldchunkcount, oldPartitionNumber, oldrownumber, oldSerialNumber, false);
// join with new collection
oldcollection.addAllUnique(collection);
oldcollection.shape();
@ -686,7 +709,19 @@ public class kelondroCollectionIndex {
oldcollection.trim();
collection = oldcollection;
int newPartitionNumber = arrayIndex(collection.size());
// check for size of collection:
// if necessary shrink the collection and dump a part of that collection
// to avoid that this grows too big
int newPartitionNumber;
while ((newPartitionNumber = arrayIndex(oldcollection.size())) > maxPartitions) {
kelondroRowSet newcollection = shrinkCollection(key, oldcollection, arrayCapacity(maxPartitions));
saveCommons(key, oldcollection);
oldcollection = newcollection;
}
// work on with oldcollection
collection = oldcollection;
newPartitionNumber = arrayIndex(collection.size());
// see if we need new space or if we can overwrite the old space
if (oldPartitionNumber == newPartitionNumber) {
@ -707,6 +742,68 @@ public class kelondroCollectionIndex {
}
}
private kelondroRowSet shrinkCollection(byte[] key, kelondroRowSet collection, int targetSize) {
// removes entries from collection
// the removed entries are stored in a 'commons' dump file
// check if the collection is already small enough
int oldsize = collection.size();
kelondroRowSet survival = new kelondroRowSet(collection.rowdef, 0);
if (oldsize <= targetSize) return survival;
// delete some entries, which are bad rated
Iterator i = collection.rows();
kelondroRow.Entry entry;
byte[] ref;
while (i.hasNext()) {
entry = (kelondroRow.Entry) i.next();
ref = entry.getColBytes(0);
if ((ref.length == 12) && (plasmaURL.probablyRootURL(new String(ref)))) {
survival.addUnique(entry);
i.remove();
}
}
int firstSurvival = survival.size();
// check if we shrinked enough
Random rand = new Random(System.currentTimeMillis());
while (survival.size() > targetSize) {
// now delete randomly more entries from the survival collection
i = survival.rows();
while (i.hasNext()) {
entry = (kelondroRow.Entry) i.next();
ref = entry.getColBytes(0);
if (rand.nextInt() % 4 != 0) {
collection.addUnique(entry);
i.remove();
}
}
}
serverLog.logInfo("kelondroCollectionIndex", "shrinked common word " + new String(key) + "; old size = " + oldsize + ", new size = " + collection.size() + ", maximum size = " + targetSize + ", survival size = " + survival.size() + ", first survival = " + firstSurvival);
return survival;
}
private void saveCommons(byte[] key, kelondroRowSet collection) {
if (key.length != 12) return;
collection.shape();
TimeZone GMTTimeZone = TimeZone.getTimeZone("GMT");
Calendar gregorian = new GregorianCalendar(GMTTimeZone);
SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
String filename = serverCodings.encodeHex(kelondroBase64Order.enhancedCoder.decode(new String(key))) + "_" + formatter.format(gregorian.getTime()) + ".collection";
File storagePath = new File(commonsPath, filename.substring(0, 2)); // make a subpath
storagePath.mkdirs();
File file = new File(storagePath, filename);
try {
collection.saveCollection(file);
serverLog.logInfo("kelondroCollectionIndex", "dumped common word " + new String(key) + " to " + file.toString() + "; size = " + collection.size());
} catch (IOException e) {
e.printStackTrace();
serverLog.logWarning("kelondroCollectionIndex", "failed to dump common word " + new String(key) + " to " + file.toString() + "; size = " + collection.size());
}
}
public synchronized int remove(byte[] key, Set removekeys) throws IOException, kelondroOutOfLimitsException {
if ((removekeys == null) || (removekeys.size() == 0)) return 0;
@ -918,7 +1015,7 @@ public class kelondroCollectionIndex {
kelondroCollectionIndex collectionIndex = new kelondroCollectionIndex(
path, filenameStub, 9 /*keyLength*/,
kelondroNaturalOrder.naturalOrder, buffersize, preloadTime,
4 /*loadfactor*/, rowdef);
4 /*loadfactor*/, 7, rowdef);
// fill index with values
kelondroRowSet collection = new kelondroRowSet(rowdef, 0);
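
saveCommons (above) places each dump under the .commons directory, named by
the hex-encoded word hash plus a GMT timestamp and bucketed into two-character
subdirectories so later imports can locate dumps per word. Condensed, with
hexOfKey standing in for serverCodings.encodeHex(...decode(key)):

SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
Calendar gregorian = new GregorianCalendar(TimeZone.getTimeZone("GMT"));
String filename = hexOfKey + "_" + formatter.format(gregorian.getTime()) + ".collection";
File storagePath = new File(commonsPath, filename.substring(0, 2)); // two-char bucket
storagePath.mkdirs();
collection.saveCollection(new File(storagePath, filename));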

@ -191,13 +191,14 @@ public class kelondroFlexSplitTable implements kelondroIndex {
}
public synchronized kelondroRow.Entry put(kelondroRow.Entry row) throws IOException {
return put(row, new Date()); // entry for current date
return put(row, null); // entry for current date
}
public synchronized kelondroRow.Entry put(kelondroRow.Entry row, Date entryDate) throws IOException {
assert row.bytes().length <= this.rowdef.objectsize;
Object[] keeper = keeperOf(row.getColBytes(0));
if (keeper != null) return ((kelondroIndex) keeper[0]).put(row);
if ((entryDate == null) || (entryDate.after(new Date()))) entryDate = new Date(); // fix date
String suffix = dateSuffix(entryDate);
if (suffix == null) return null;
kelondroIndex table = (kelondroIndex) tables.get(suffix);
@ -223,11 +224,12 @@ public class kelondroFlexSplitTable implements kelondroIndex {
}
public synchronized void addUnique(kelondroRow.Entry row) throws IOException {
addUnique(row, new Date());
addUnique(row, null);
}
public synchronized void addUnique(kelondroRow.Entry row, Date entryDate) throws IOException {
assert row.bytes().length <= this.rowdef.objectsize;
if ((entryDate == null) || (entryDate.after(new Date()))) entryDate = new Date(); // fix date
String suffix = dateSuffix(entryDate);
if (suffix == null) return;
kelondroIndex table = (kelondroIndex) tables.get(suffix);
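
put(row) and addUnique(row) now pass null rather than new Date(), and the
date-taking overloads normalize null or future dates to the current time
before computing the table suffix, so the default lives in one place:

// normalization added to both overloads above
if ((entryDate == null) || (entryDate.after(new Date()))) entryDate = new Date();
String suffix = dateSuffix(entryDate); // selects the per-date split table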

@ -24,12 +24,14 @@
package de.anomic.kelondro;
import java.io.File;
import java.io.IOException;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import de.anomic.server.serverFileUtils;
import de.anomic.server.logging.serverLog;
public class kelondroRowCollection {
@ -143,9 +145,13 @@ public class kelondroRowCollection {
entry.setCol(exp_order_type, (this.rowdef.objectOrder == null) ? "__".getBytes() :this.rowdef.objectOrder.signature().getBytes());
entry.setCol(exp_order_col, this.rowdef.primaryKey);
entry.setCol(exp_order_bound, this.sortBound);
entry.setCol(exp_collection, chunkcache);
entry.setCol(exp_collection, this.chunkcache);
return entry.bytes();
}
public void saveCollection(File file) throws IOException {
serverFileUtils.write(exportCollection(), file);
}
public kelondroRow row() {
return this.rowdef;
@ -324,6 +330,7 @@ public class kelondroRowCollection {
}
public Iterator rows() {
// iterates kelondroRow.Entry - type entries
return new rowIterator();
}
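
With the new saveCollection helper, persisting a row collection is a single
call over the existing exportCollection() serialization; for example (target
file is illustrative):

kelondroRowSet set = new kelondroRowSet(rowdef, 0); // rowdef as defined by the caller
// ... add rows ...
set.saveCollection(new File(commonsPath, "example.collection")); // writes exportCollection() bytes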

@ -262,7 +262,7 @@ public class plasmaRankingCRProcess {
if (newdb) {
File path = to_file.getParentFile(); // path to storage place
newacc = new kelondroFlexTable(path, CRG_accname, 128 * 1024 * 1024, -1, CRG_accrow);
newseq = new kelondroCollectionIndex(path, CRG_seqname, 12, kelondroBase64Order.enhancedCoder, 128 * 1024 * 1024, -1, 2, CRG_colrow);
newseq = new kelondroCollectionIndex(path, CRG_seqname, 12, kelondroBase64Order.enhancedCoder, 128 * 1024 * 1024, -1, 2, 9, CRG_colrow);
} else {
if (!(to_file.exists())) {
acc = new kelondroAttrSeq("Global Ranking Accumulator File",
@ -393,8 +393,8 @@ public class plasmaRankingCRProcess {
public static int genrcix(File cr_path_in, File rci_path_out) throws IOException {
//kelondroFlexTable acc = new kelondroFlexTable(cr_path_in, CRG_accname, kelondroBase64Order.enhancedCoder, 128 * 1024 * 1024, -1, CRG_accrow, true);
kelondroCollectionIndex seq = new kelondroCollectionIndex(cr_path_in, CRG_seqname, 12, kelondroBase64Order.enhancedCoder, 128 * 1024 * 1024, -1, 2, CRG_colrow);
kelondroCollectionIndex rci = new kelondroCollectionIndex(rci_path_out, RCI_colname, 6, kelondroBase64Order.enhancedCoder, 128 * 1024 * 1024, -1, 2, RCI_coli);
kelondroCollectionIndex seq = new kelondroCollectionIndex(cr_path_in, CRG_seqname, 12, kelondroBase64Order.enhancedCoder, 128 * 1024 * 1024, -1, 2, 9, CRG_colrow);
kelondroCollectionIndex rci = new kelondroCollectionIndex(rci_path_out, RCI_colname, 6, kelondroBase64Order.enhancedCoder, 128 * 1024 * 1024, -1, 2, 9, RCI_coli);
// loop over all referees
int count = 0;

@ -55,6 +55,11 @@ import de.anomic.yacy.yacyDHTAction;
public final class plasmaWordIndex implements indexRI {
// environment constants
public static final long wCacheMaxAge = 1000 * 60 * 30; // milliseconds; 30 minutes
public static final int wCacheMaxChunk = 1000; // number of references for each urlhash
public static final int maxCollectionPartition = 7; // should be 7
private final kelondroOrder indexOrder = kelondroBase64Order.enhancedCoder;
private final indexRAMRI dhtOutCache, dhtInCache;
private final indexCollectionRI collections; // new database structure to replace AssortmentCluster and FileCluster
@ -65,13 +70,13 @@ public final class plasmaWordIndex implements indexRI {
public plasmaWordIndex(File indexRoot, long rwibuffer, long lurlbuffer, long preloadTime, serverLog log) {
File textindexcache = new File(indexRoot, "PUBLIC/TEXT/RICACHE");
if (!(textindexcache.exists())) textindexcache.mkdirs();
this.dhtOutCache = new indexRAMRI(textindexcache, indexRWIEntryNew.urlEntryRow, 4000, "dump1.array", log);
this.dhtInCache = new indexRAMRI(textindexcache, indexRWIEntryNew.urlEntryRow, 4000, "dump2.array", log);
this.dhtOutCache = new indexRAMRI(textindexcache, indexRWIEntryNew.urlEntryRow, wCacheMaxChunk, wCacheMaxAge, "dump1.array", log);
this.dhtInCache = new indexRAMRI(textindexcache, indexRWIEntryNew.urlEntryRow, wCacheMaxChunk, wCacheMaxAge, "dump2.array", log);
// create collections storage path
File textindexcollections = new File(indexRoot, "PUBLIC/TEXT/RICOLLECTION");
if (!(textindexcollections.exists())) textindexcollections.mkdirs();
this.collections = new indexCollectionRI(textindexcollections, "collection", rwibuffer, preloadTime, indexRWIEntryNew.urlEntryRow);
this.collections = new indexCollectionRI(textindexcollections, "collection", rwibuffer, preloadTime, maxCollectionPartition, indexRWIEntryNew.urlEntryRow);
// create LURL-db
loadedURL = new plasmaCrawlLURL(indexRoot, lurlbuffer, preloadTime);
@ -129,13 +134,18 @@ public final class plasmaWordIndex implements indexRI {
this.flushsize = flushsize;
}
public void flushControl() {
public void dhtOutFlushControl() {
// check for forced flush
synchronized (this) {
if (dhtOutCache.size() > dhtOutCache.getMaxWordCount()) {
if ((dhtOutCache.getMaxWordCount() > wCacheMaxChunk ) || (dhtOutCache.size() > dhtOutCache.getMaxWordCount())) {
flushCache(dhtOutCache, dhtOutCache.size() + flushsize - dhtOutCache.getMaxWordCount());
}
if (dhtInCache.size() > dhtInCache.getMaxWordCount()) {
}
}
public void dhtInFlushControl() {
// check for forced flush
synchronized (this) {
if ((dhtInCache.getMaxWordCount() > wCacheMaxChunk ) || (dhtInCache.size() > dhtInCache.getMaxWordCount())) {
flushCache(dhtInCache, dhtInCache.size() + flushsize - dhtInCache.getMaxWordCount());
}
}
@ -160,9 +170,10 @@ public final class plasmaWordIndex implements indexRI {
// add the entry
if (dhtInCase) {
dhtInCache.addEntry(wordHash, entry, updateTime, true);
dhtInFlushControl();
} else {
dhtOutCache.addEntry(wordHash, entry, updateTime, false);
flushControl();
dhtOutFlushControl();
}
}
@ -175,9 +186,10 @@ public final class plasmaWordIndex implements indexRI {
// add the entry
if (dhtInCase) {
dhtInCache.addEntries(entries, updateTime, true);
dhtInFlushControl();
} else {
dhtOutCache.addEntries(entries, updateTime, false);
flushControl();
dhtOutFlushControl();
}
}
@ -187,7 +199,7 @@ public final class plasmaWordIndex implements indexRI {
}
private void flushCache(indexRAMRI ram, int count) {
if (ram.size() <= 5000) return;
if (ram.size() <= count) count = ram.size();
if (count <= 0) return;
if (count > 5000) count = 5000;
busyCacheFlush = true;
@ -199,7 +211,7 @@ public final class plasmaWordIndex implements indexRI {
while (collectMax) {
wordHash = ram.maxScoreWordHash();
c = ram.getContainer(wordHash, null, -1);
if ((c != null) && (c.size() > 4000)) {
if ((c != null) && (c.size() > wCacheMaxChunk)) {
containerList.add(ram.deleteContainer(wordHash));
} else {
collectMax = false;
@ -215,9 +227,9 @@ public final class plasmaWordIndex implements indexRI {
c = ram.deleteContainer(wordHash);
if (c != null) containerList.add(c);
}
// flush the containers
collections.addMultipleEntries(containerList);
}
// flush the containers
collections.addMultipleEntries(containerList);
//System.out.println("DEBUG-Finished flush of " + count + " entries from RAM to DB in " + (System.currentTimeMillis() - start) + " milliseconds");
busyCacheFlush = false;
}
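
flushCache (above) no longer returns early below a fixed cache size; it clamps
the flush count to what is actually cached, caps a single batch at 5000, and
the container flush now happens once after the collection loop instead of
inside it:

// clamping as introduced above, with comments
if (ram.size() <= count) count = ram.size(); // never flush more than is cached
if (count <= 0) return;                      // nothing to do
if (count > 5000) count = 5000;              // cap one flush batch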

@ -480,7 +480,7 @@ xpstopw=true
40_peerseedcycle_busysleep=1200000
40_peerseedcycle_memprereq=2097152
50_localcrawl_idlesleep=1000
50_localcrawl_busysleep=120
50_localcrawl_busysleep=250
50_localcrawl_memprereq=2097152
50_localcrawl_isPaused=false
61_globalcrawltrigger_idlesleep=10000
@ -494,8 +494,8 @@ xpstopw=true
70_cachemanager_idlesleep=1000
70_cachemanager_busysleep=0
70_cachemanager_memprereq=1048576
80_indexing_idlesleep=2000
80_indexing_busysleep=100
80_indexing_idlesleep=1000
80_indexing_busysleep=50
80_indexing_memprereq=2097152
82_crawlstack_idlesleep=5000
82_crawlstack_busysleep=0
@ -853,7 +853,7 @@ soap.serviceDeploymentList =
# Wiki access rights
# the built - in wiki system allowes by default only that the administration is allowed to make changes
# the built-in wiki system allows by default only that the administrator is allowed to make changes
# this can be changed. There are three options:
# admin - only the admin has write right
# all - everybody has write right
