replaced the storing procedure for the index ram cache with a method that generates BLOBHeap-compatible dumps

this is a migration step to support a new method to store the web index, which will also be based on the same data structure. Also did a lot of refactoring for a better structure of the BLOBHeap class.

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@5430 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 16 years ago
parent db1cfae3e7
commit b6bba18c37

@ -27,12 +27,9 @@
package de.anomic.index; package de.anomic.index;
import java.io.BufferedInputStream; import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File; import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
@ -42,8 +39,8 @@ import java.util.Set;
import java.util.SortedMap; import java.util.SortedMap;
import java.util.TreeMap; import java.util.TreeMap;
import de.anomic.kelondro.kelondroBLOB; import de.anomic.kelondro.kelondroBLOBHeapReader;
import de.anomic.kelondro.kelondroBLOBHeap; import de.anomic.kelondro.kelondroBLOBHeapWriter;
import de.anomic.kelondro.kelondroBase64Order; import de.anomic.kelondro.kelondroBase64Order;
import de.anomic.kelondro.kelondroByteOrder; import de.anomic.kelondro.kelondroByteOrder;
import de.anomic.kelondro.kelondroCloneableIterator; import de.anomic.kelondro.kelondroCloneableIterator;
@ -90,7 +87,7 @@ public final class indexContainerHeap {
* @param heapFile * @param heapFile
* @throws IOException * @throws IOException
*/ */
public void initWriteMode(final File heapFile) throws IOException { public void initWriteModeFromHeap(final File heapFile) throws IOException {
if (log != null) log.logInfo("restoring dump for rwi heap '" + heapFile.getName() + "'"); if (log != null) log.logInfo("restoring dump for rwi heap '" + heapFile.getName() + "'");
final long start = System.currentTimeMillis(); final long start = System.currentTimeMillis();
this.cache = Collections.synchronizedSortedMap(new TreeMap<String, indexContainer>(new kelondroByteOrder.StringOrder(payloadrow.getOrdering()))); this.cache = Collections.synchronizedSortedMap(new TreeMap<String, indexContainer>(new kelondroByteOrder.StringOrder(payloadrow.getOrdering())));
@ -103,10 +100,33 @@ public final class indexContainerHeap {
urlCount += container.size(); urlCount += container.size();
} }
} }
if (log != null) log.logInfo("finished rwi heap restore: " + cache.size() + " words, " + urlCount + " word/URL relations in " + ((System.currentTimeMillis() - start) / 1000) + " seconds"); if (log != null) log.logInfo("finished rwi heap restore: " + cache.size() + " words, " + urlCount + " word/URL relations in " + (System.currentTimeMillis() - start) + " milliseconds");
} }
public void dump(final File heapFile) throws IOException { /**
* this is the new cache file format initialization
* @param heapFile
* @throws IOException
*/
public void initWriteModeFromBLOB(final File blobFile) throws IOException {
if (log != null) log.logInfo("restoring rwi blob dump '" + blobFile.getName() + "'");
final long start = System.currentTimeMillis();
this.cache = Collections.synchronizedSortedMap(new TreeMap<String, indexContainer>(new kelondroByteOrder.StringOrder(payloadrow.getOrdering())));
int urlCount = 0;
synchronized (cache) {
for (final indexContainer container : new blobFileEntries(blobFile, this.payloadrow)) {
// TODO: in this loop a lot of memory may be allocated. A check if the memory gets low is necessary. But what do when the memory is low?
if (container == null) break;
cache.put(container.getWordHash(), container);
urlCount += container.size();
}
}
// remove idx and gap files if they exist here
kelondroBLOBHeapWriter.deleteAllFingerprints(blobFile);
if (log != null) log.logInfo("finished rwi blob restore: " + cache.size() + " words, " + urlCount + " word/URL relations in " + (System.currentTimeMillis() - start) + " milliseconds");
}
/*
public void dumpold(final File heapFile) throws IOException {
assert this.cache != null; assert this.cache != null;
if (log != null) log.logInfo("creating rwi heap dump '" + heapFile.getName() + "', " + cache.size() + " rwi's"); if (log != null) log.logInfo("creating rwi heap dump '" + heapFile.getName() + "', " + cache.size() + " rwi's");
if (heapFile.exists()) heapFile.delete(); if (heapFile.exists()) heapFile.delete();
@ -137,14 +157,14 @@ public final class indexContainerHeap {
} }
os.flush(); os.flush();
os.close(); os.close();
if (log != null) log.logInfo("finished rwi heap dump: " + wordcount + " words, " + urlcount + " word/URL relations in " + ((System.currentTimeMillis() - startTime) / 1000) + " seconds"); if (log != null) log.logInfo("finished rwi heap dump: " + wordcount + " words, " + urlcount + " word/URL relations in " + (System.currentTimeMillis() - startTime) + " milliseconds");
} }
*/
public void dump2(final File heapFile) throws IOException { public void dump(final File heapFile) throws IOException {
assert this.cache != null; assert this.cache != null;
if (log != null) log.logInfo("creating alternative rwi heap dump '" + heapFile.getName() + "', " + cache.size() + " rwi's"); if (log != null) log.logInfo("creating alternative rwi heap dump '" + heapFile.getName() + "', " + cache.size() + " rwi's");
if (heapFile.exists()) heapFile.delete(); if (heapFile.exists()) heapFile.delete();
final kelondroBLOB dump = new kelondroBLOBHeap(heapFile, payloadrow.primaryKeyLength, kelondroBase64Order.enhancedCoder, 1024 * 1024 * 10); final kelondroBLOBHeapWriter dump = new kelondroBLOBHeapWriter(heapFile, payloadrow.primaryKeyLength, kelondroBase64Order.enhancedCoder);
final long startTime = System.currentTimeMillis(); final long startTime = System.currentTimeMillis();
long wordcount = 0, urlcount = 0; long wordcount = 0, urlcount = 0;
String wordHash; String wordHash;
@ -159,14 +179,14 @@ public final class indexContainerHeap {
// put entries on heap // put entries on heap
if (container != null && wordHash.length() == payloadrow.primaryKeyLength) { if (container != null && wordHash.length() == payloadrow.primaryKeyLength) {
dump.put(wordHash.getBytes(), container.exportCollection()); dump.add(wordHash.getBytes(), container.exportCollection());
urlcount += container.size(); urlcount += container.size();
} }
wordcount++; wordcount++;
} }
} }
dump.close(); dump.close();
if (log != null) log.logInfo("finished alternative rwi heap dump: " + wordcount + " words, " + urlcount + " word/URL relations in " + ((System.currentTimeMillis() - startTime) / 1000) + " seconds"); if (log != null) log.logInfo("finished alternative rwi heap dump: " + wordcount + " words, " + urlcount + " word/URL relations in " + (System.currentTimeMillis() - startTime) + " milliseconds");
} }
public int size() { public int size() {
@ -184,7 +204,7 @@ public final class indexContainerHeap {
public heapFileEntries(final File heapFile, final kelondroRow payloadrow) throws IOException { public heapFileEntries(final File heapFile, final kelondroRow payloadrow) throws IOException {
if (!(heapFile.exists())) throw new IOException("file " + heapFile + " does not exist"); if (!(heapFile.exists())) throw new IOException("file " + heapFile + " does not exist");
is = new DataInputStream(new BufferedInputStream(new FileInputStream(heapFile), 64*1024)); is = new DataInputStream(new BufferedInputStream(new FileInputStream(heapFile), 1024*1024));
word = new byte[payloadrow.primaryKeyLength]; word = new byte[payloadrow.primaryKeyLength];
this.payloadrow = payloadrow; this.payloadrow = payloadrow;
this.nextContainer = next0(); this.nextContainer = next0();
@ -231,6 +251,53 @@ public final class indexContainerHeap {
} }
} }
/**
* static iterator of BLOBHeap files: is used to import heap dumps into a write-enabled index heap
*/
public static class blobFileEntries implements Iterator<indexContainer>, Iterable<indexContainer> {
Iterator<Map.Entry<String, byte[]>> blobs;
kelondroRow payloadrow;
public blobFileEntries(final File blobFile, final kelondroRow payloadrow) throws IOException {
this.blobs = new kelondroBLOBHeapReader.entries(blobFile, payloadrow.primaryKeyLength);
this.payloadrow = payloadrow;
}
public boolean hasNext() {
return blobs.hasNext();
}
/**
* return an index container
* because they may get very large, it is wise to deallocate some memory before calling next()
*/
public indexContainer next() {
try {
Map.Entry<String, byte[]> entry = blobs.next();
byte[] payload = entry.getValue();
return new indexContainer(entry.getKey(), kelondroRowSet.importRowSet(payload, payloadrow));
} catch (final IOException e) {
return null;
}
}
public void remove() {
throw new UnsupportedOperationException("heap dumps are read-only");
}
public Iterator<indexContainer> iterator() {
return this;
}
public void close() {
blobs = null;
}
protected void finalize() {
this.close();
}
}
public synchronized int maxReferences() { public synchronized int maxReferences() {
// iterate to find the max score // iterate to find the max score
int max = 0; int max = 0;

@ -47,11 +47,19 @@ public final class indexRAMRI implements indexRI, indexRIReader {
public int cacheReferenceCountLimit; // the maximum number of references to a single RWI entity public int cacheReferenceCountLimit; // the maximum number of references to a single RWI entity
public long cacheReferenceAgeLimit; // the maximum age (= time not changed) of a RWI entity public long cacheReferenceAgeLimit; // the maximum age (= time not changed) of a RWI entity
private final serverLog log; private final serverLog log;
private final File indexHeapFile; private final File oldDumpFile, newDumpFile;
private indexContainerHeap heap; private indexContainerHeap heap;
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public indexRAMRI(final File databaseRoot, final kelondroRow payloadrow, final int entityCacheMaxSize, final int wCacheReferenceCountLimitInit, final long wCacheReferenceAgeLimitInit, final String newHeapName, final serverLog log) { public indexRAMRI(
final File databaseRoot,
final kelondroRow payloadrow,
final int entityCacheMaxSize,
final int wCacheReferenceCountLimitInit,
final long wCacheReferenceAgeLimitInit,
final String oldHeapName,
final String newHeapName,
final serverLog log) {
// creates a new index cache // creates a new index cache
// the cache has a back-end where indexes that do not fit in the cache are flushed // the cache has a back-end where indexes that do not fit in the cache are flushed
@ -62,25 +70,37 @@ public final class indexRAMRI implements indexRI, indexRIReader {
this.cacheReferenceCountLimit = wCacheReferenceCountLimitInit; this.cacheReferenceCountLimit = wCacheReferenceCountLimitInit;
this.cacheReferenceAgeLimit = wCacheReferenceAgeLimitInit; this.cacheReferenceAgeLimit = wCacheReferenceAgeLimitInit;
this.log = log; this.log = log;
this.indexHeapFile = new File(databaseRoot, newHeapName); this.oldDumpFile = new File(databaseRoot, oldHeapName);
this.newDumpFile = new File(databaseRoot, newHeapName);
this.heap = new indexContainerHeap(payloadrow, log); this.heap = new indexContainerHeap(payloadrow, log);
// read in dump of last session // read in dump of last session
if (indexHeapFile.exists()) { boolean initFailed = false;
try { if (newDumpFile.exists() && oldDumpFile.exists()) {
heap.initWriteMode(indexHeapFile); // we need only one, delete the old
for (final indexContainer ic : (Iterable<indexContainer>) heap.wordContainers(null, false)) { oldDumpFile.delete();
this.hashDate.setScore(ic.getWordHash(), intTime(ic.lastWrote())); }
this.hashScore.setScore(ic.getWordHash(), ic.size()); if (oldDumpFile.exists()) try {
} heap.initWriteModeFromHeap(oldDumpFile);
} catch (final IOException e){ } catch (IOException e) {
log.logSevere("unable to restore cache dump: " + e.getMessage(), e); initFailed = true;
// get empty dump e.printStackTrace();
heap.initWriteMode(); }
} catch (final NegativeArraySizeException e){ if (newDumpFile.exists()) try {
log.logSevere("unable to restore cache dump: " + e.getMessage(), e); heap.initWriteModeFromBLOB(newDumpFile);
// get empty dump } catch (IOException e) {
heap.initWriteMode(); initFailed = true;
e.printStackTrace();
}
if (initFailed) {
log.logSevere("unable to restore cache dump");
// get empty dump
heap.initWriteMode();
} else if (oldDumpFile.exists() || newDumpFile.exists()) {
// initialize scores for cache organization
for (final indexContainer ic : (Iterable<indexContainer>) heap.wordContainers(null, false)) {
this.hashDate.setScore(ic.getWordHash(), intTime(ic.lastWrote()));
this.hashScore.setScore(ic.getWordHash(), ic.size());
} }
} else { } else {
heap.initWriteMode(); heap.initWriteMode();
@ -319,8 +339,8 @@ public final class indexRAMRI implements indexRI, indexRIReader {
public synchronized void close() { public synchronized void close() {
// dump cache // dump cache
try { try {
heap.dump(this.indexHeapFile); //heap.dumpold(this.oldDumpFile);
//heap.dump2(new File(this.indexHeapFile.getAbsolutePath() + ".blob")); heap.dump(this.newDumpFile);
} catch (final IOException e){ } catch (final IOException e){
log.logSevere("unable to dump cache: " + e.getMessage(), e); log.logSevere("unable to dump cache: " + e.getMessage(), e);
} }

@ -28,12 +28,12 @@ package de.anomic.kelondro;
import java.io.BufferedInputStream; import java.io.BufferedInputStream;
import java.io.BufferedOutputStream; import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File; import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.TreeMap; import java.util.TreeMap;
@ -56,21 +56,24 @@ public class kelondroBLOBGap extends TreeMap<Long, Integer> {
* initialize a kelondroBLOBGap with the content of a dump * initialize a kelondroBLOBGap with the content of a dump
* @param file * @param file
* @throws IOException * @throws IOException
* @throws IOException
*/ */
public kelondroBLOBGap(final File file) throws IOException { public kelondroBLOBGap(final File file) throws IOException {
super(); super();
// read the index dump and fill the index // read the index dump and fill the index
InputStream is = new BufferedInputStream(new FileInputStream(file), 1024 * 1024); DataInputStream is = new DataInputStream(new BufferedInputStream(new FileInputStream(file), 1024 * 1024));
byte[] k = new byte[8]; long p;
byte[] v = new byte[4]; int l;
int c;
while (true) { while (true) {
c = is.read(k); try {
if (c <= 0) break; p = is.readLong();
c = is.read(v); l = is.readInt();
if (c <= 0) break; this.put(new Long(p), new Integer(l));
this.put(new Long(kelondroNaturalOrder.decodeLong(k)), new Integer((int) kelondroNaturalOrder.decodeLong(v))); } catch (IOException e) {
break;
}
} }
is.close();
} }
/** /**
@ -81,13 +84,13 @@ public class kelondroBLOBGap extends TreeMap<Long, Integer> {
*/ */
public int dump(File file) throws IOException { public int dump(File file) throws IOException {
Iterator<Map.Entry<Long, Integer>> i = this.entrySet().iterator(); Iterator<Map.Entry<Long, Integer>> i = this.entrySet().iterator();
OutputStream os = new BufferedOutputStream(new FileOutputStream(file), 1024 * 1024); DataOutputStream os = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(file), 1024 * 1024));
int c = 0; int c = 0;
Map.Entry<Long, Integer> e; Map.Entry<Long, Integer> e;
while (i.hasNext()) { while (i.hasNext()) {
e = i.next(); e = i.next();
os.write(kelondroNaturalOrder.encodeLong(e.getKey().longValue(), 8)); os.writeLong(e.getKey().longValue());
os.write(kelondroNaturalOrder.encodeLong(e.getValue().longValue(), 4)); os.writeInt(e.getValue().intValue());
c++; c++;
} }
os.flush(); os.flush();

@ -32,19 +32,11 @@ import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.SortedMap; import java.util.SortedMap;
import java.util.concurrent.ExecutionException;
import de.anomic.server.serverMemory;
import de.anomic.server.logging.serverLog; import de.anomic.server.logging.serverLog;
public final class kelondroBLOBHeap implements kelondroBLOB { public final class kelondroBLOBHeap extends kelondroBLOBHeapReader implements kelondroBLOB {
private int keylength; // the length of the primary key
private kelondroBytesLongMap index; // key/seek relation for used records
private kelondroBLOBGap free; // set of {seek, size} pairs denoting space and position of free records
private final File heapFile; // the file of the heap
private final kelondroByteOrder ordering; // the ordering on keys
private kelondroCachedFileRA file; // a random access to the file
private HashMap<String, byte[]> buffer; // a write buffer to limit IO to the file; attention: Maps cannot use byte[] as key private HashMap<String, byte[]> buffer; // a write buffer to limit IO to the file; attention: Maps cannot use byte[] as key
private int buffersize; // bytes that are buffered in buffer private int buffersize; // bytes that are buffered in buffer
private int buffermax; // maximum size of the buffer private int buffermax; // maximum size of the buffer
@ -79,47 +71,16 @@ public final class kelondroBLOBHeap implements kelondroBLOB {
* @param ordering * @param ordering
* @throws IOException * @throws IOException
*/ */
public kelondroBLOBHeap(final File heapFile, final int keylength, final kelondroByteOrder ordering, int buffermax) throws IOException { public kelondroBLOBHeap(
this.ordering = ordering; final File heapFile,
this.heapFile = heapFile; final int keylength,
final kelondroByteOrder ordering,
int buffermax) throws IOException {
super(heapFile, keylength, ordering);
this.buffermax = buffermax; this.buffermax = buffermax;
this.keylength = keylength;
this.index = null; // will be created as result of initialization process
this.free = null; // will be initialized later depending on existing idx/gap file
this.buffer = new HashMap<String, byte[]>(); this.buffer = new HashMap<String, byte[]>();
this.buffersize = 0; this.buffersize = 0;
this.file = new kelondroCachedFileRA(heapFile); mergeFreeEntries();
// read or initialize the index
if (initIndexReadDump(heapFile)) {
// verify that everything worked just fine
// pick some elements of the index
Iterator<byte[]> i = this.index.keys(true, null);
int c = 3;
byte[] b, b1 = new byte[index.row().primaryKeyLength];
long pos;
boolean ok = true;
while (i.hasNext() && c-- > 0) {
b = i.next();
pos = this.index.getl(b);
file.seek(pos + 4);
file.readFully(b1, 0, b1.length);
if (this.ordering.compare(b, b1) != 0) {
ok = false;
break;
}
}
if (!ok) {
serverLog.logWarning("kelondroBLOBHeap", "verification of idx file for " + heapFile.toString() + " failed, re-building index");
initIndexReadFromHeap();
} else {
serverLog.logInfo("kelondroBLOBHeap", "using a dump of the index of " + heapFile.toString() + ".");
}
} else {
// if we did not have a dump, create a new index
initIndexReadFromHeap();
}
/* /*
// DEBUG // DEBUG
Iterator<byte[]> i = index.keys(true, null); Iterator<byte[]> i = index.keys(true, null);
@ -138,128 +99,10 @@ public final class kelondroBLOBHeap implements kelondroBLOB {
*/ */
} }
private boolean initIndexReadDump(File f) {
// look for an index dump and read it if it exist
// if this is successfull, return true; otherwise false
File fif = fingerprintIndexFile(f);
File fgf = fingerprintGapFile(f);
if (!fif.exists() || !fgf.exists()) {
deleteAllFingerprints(f);
return false;
}
// there is an index and a gap file:
// read the index file:
try {
this.index = new kelondroBytesLongMap(this.keylength, this.ordering, fif);
} catch (IOException e) {
e.printStackTrace();
return false;
}
// an index file is a one-time throw-away object, so just delete it now
fif.delete();
// read the gap file:
try {
this.free = new kelondroBLOBGap(fgf);
} catch (IOException e) {
e.printStackTrace();
return false;
}
// same with gap file
fgf.delete();
// everything is fine now
return this.index.size() > 0;
}
private static File fingerprintIndexFile(File f) {
return new File(f.getParentFile(), f.getName() + "." + fingerprintFileHash(f) + ".idx");
}
private static File fingerprintGapFile(File f) {
return new File(f.getParentFile(), f.getName() + "." + fingerprintFileHash(f) + ".gap");
}
private static String fingerprintFileHash(File f) {
return kelondroDigest.fastFingerprintB64(f, false).substring(0, 12);
}
private static void deleteAllFingerprints(File f) {
File d = f.getParentFile();
String n = f.getName();
String[] l = d.list();
for (int i = 0; i < l.length; i++) {
if (l[i].startsWith(n) && (l[i].endsWith(".idx") || l[i].endsWith(".gap"))) new File(d, l[i]).delete();
}
}
private void initIndexReadFromHeap() throws IOException {
// this initializes the this.index object by reading positions from the heap file
this.free = new kelondroBLOBGap();
kelondroBytesLongMap.initDataConsumer indexready = kelondroBytesLongMap.asynchronusInitializer(keylength, this.ordering, 0, Math.max(10, (int) (Runtime.getRuntime().freeMemory() / (10 * 1024 * 1024))));
byte[] key = new byte[keylength];
int reclen;
long seek = 0;
loop: while (true) { // don't test available() here because this does not work for files > 2GB
try {
// go to seek position
file.seek(seek);
// read length of the following record without the length of the record size bytes
reclen = file.readInt();
//assert reclen > 0 : " reclen == 0 at seek pos " + seek;
if (reclen == 0) {
// very bad file inconsistency
serverLog.logSevere("kelondroBLOBHeap", "reclen == 0 at seek pos " + seek + " in file " + heapFile);
this.file.setLength(seek); // delete everything else at the remaining of the file :-(
break loop;
}
// read key
file.readFully(key, 0, key.length);
} catch (final IOException e) {
// EOF reached
break loop; // terminate loop
}
// check if this record is empty
if (key == null || key[0] == 0) {
// it is an empty record, store to free list
if (reclen > 0) free.put(seek, reclen);
} else {
if (this.ordering.wellformed(key)) {
indexready.consume(key, seek);
key = new byte[keylength];
} else {
serverLog.logWarning("kelondroBLOBHeap", "BLOB " + heapFile.getName() + ": skiped not wellformed key " + new String(key) + " at seek pos " + seek);
}
}
// new seek position
seek += 4L + reclen;
}
indexready.finish();
// do something useful in between
mergeFreeEntries();
// finish the index generation
try {
this.index = indexready.result();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
private void mergeFreeEntries() throws IOException { private void mergeFreeEntries() throws IOException {
// try to merge free entries // try to merge free entries
if (this.free.size() > 1) { if (super.free.size() > 1) {
int merged = 0; int merged = 0;
Map.Entry<Long, Integer> lastFree, nextFree; Map.Entry<Long, Integer> lastFree, nextFree;
final Iterator<Map.Entry<Long, Integer>> i = this.free.entrySet().iterator(); final Iterator<Map.Entry<Long, Integer>> i = this.free.entrySet().iterator();
@ -286,21 +129,14 @@ public final class kelondroBLOBHeap implements kelondroBLOB {
} }
public String name() {
return this.heapFile.getName();
}
/** /**
* the number of BLOBs in the heap * the number of BLOBs in the heap
* @return the number of BLOBs in the heap * @return the number of BLOBs in the heap
*/ */
public synchronized int size() { public synchronized int size() {
return this.index.size() + this.buffer.size(); return super.size() + this.buffer.size();
} }
public kelondroByteOrder ordering() {
return this.ordering;
}
/** /**
* test if a key is in the heap file. This does not need any IO, because it uses only the ram index * test if a key is in the heap file. This does not need any IO, because it uses only the ram index
@ -313,14 +149,7 @@ public final class kelondroBLOBHeap implements kelondroBLOB {
// check the buffer // check the buffer
if (this.buffer.containsKey(new String(key))) return true; if (this.buffer.containsKey(new String(key))) return true;
return super.has(key);
// check if the file index contains the key
try {
return index.getl(key) >= 0;
} catch (final IOException e) {
e.printStackTrace();
return false;
}
} }
/** /**
@ -331,6 +160,7 @@ public final class kelondroBLOBHeap implements kelondroBLOB {
*/ */
private void add(final byte[] key, final byte[] blob) throws IOException { private void add(final byte[] key, final byte[] blob) throws IOException {
assert blob.length > 0; assert blob.length > 0;
assert key.length == this.keylength;
assert index.row().primaryKeyLength == key.length : index.row().primaryKeyLength + "!=" + key.length; assert index.row().primaryKeyLength == key.length : index.row().primaryKeyLength + "!=" + key.length;
if ((blob == null) || (blob.length == 0)) return; if ((blob == null) || (blob.length == 0)) return;
final int pos = (int) file.length(); final int pos = (int) file.length();
@ -395,34 +225,7 @@ public final class kelondroBLOBHeap implements kelondroBLOB {
byte[] blob = this.buffer.get(new String(key)); byte[] blob = this.buffer.get(new String(key));
if (blob != null) return blob; if (blob != null) return blob;
// check if the index contains the key return super.get(key);
final long pos = index.getl(key);
if (pos < 0) return null;
// access the file and read the container
file.seek(pos);
final int len = file.readInt() - index.row().primaryKeyLength;
if (serverMemory.available() < len) {
if (!serverMemory.request(len, false)) return null; // not enough memory available for this blob
}
// read the key
final byte[] keyf = new byte[index.row().primaryKeyLength];
file.readFully(keyf, 0, keyf.length);
if (this.ordering.compare(key, keyf) != 0) {
// verification of the indexed access failed. we must re-read the index
serverLog.logWarning("kelondroBLOBHeap", "verification indexed access for " + heapFile.toString() + " failed, re-building index");
// this is a severe operation, it should never happen.
// but if the process ends in this state, it would completey fail
// if the index is not rebuild now at once
initIndexReadFromHeap();
}
// read the blob
blob = new byte[len];
file.readFully(blob, 0, blob.length);
return blob;
} }
/** /**
@ -437,14 +240,8 @@ public final class kelondroBLOBHeap implements kelondroBLOB {
// check the buffer // check the buffer
byte[] blob = this.buffer.get(new String(key)); byte[] blob = this.buffer.get(new String(key));
if (blob != null) return blob.length; if (blob != null) return blob.length;
// check if the index contains the key return super.length(key);
final long pos = index.getl(key);
if (pos < 0) return -1;
// access the file and read the size of the container
file.seek(pos);
return file.readInt() - index.row().primaryKeyLength;
} }
/** /**
@ -467,31 +264,32 @@ public final class kelondroBLOBHeap implements kelondroBLOB {
/** /**
* close the BLOB table * close the BLOB table
* @throws
*/ */
public synchronized void close() { public synchronized void close() {
shrinkWithGapsAtEnd(); shrinkWithGapsAtEnd();
try { if (file != null) {
flushBuffer(); try {
} catch (IOException e) { flushBuffer();
e.printStackTrace(); } catch (IOException e) {
} e.printStackTrace();
try { }
file.close(); try {
} catch (final IOException e) { file.close();
e.printStackTrace(); } catch (final IOException e) {
e.printStackTrace();
}
} }
file = null; file = null;
if (index.size() > 3 || free.size() > 3) { if (index != null && free != null && (index.size() > 3 || free.size() > 3)) {
// now we can create a dump of the index and the gap information // now we can create a dump of the index and the gap information
// to speed up the next start // to speed up the next start
try { try {
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
free.dump(fingerprintGapFile(this.heapFile)); free.dump(kelondroBLOBHeapWriter.fingerprintGapFile(this.heapFile));
free.clear(); free.clear();
free = null; free = null;
index.dump(fingerprintIndexFile(this.heapFile)); index.dump(kelondroBLOBHeapWriter.fingerprintIndexFile(this.heapFile));
serverLog.logInfo("kelondroBLOBHeap", "wrote a dump for the " + this.index.size() + " index entries of " + heapFile.getName()+ " in " + (System.currentTimeMillis() - start) + " milliseconds."); serverLog.logInfo("kelondroBLOBHeap", "wrote a dump for the " + this.index.size() + " index entries of " + heapFile.getName()+ " in " + (System.currentTimeMillis() - start) + " milliseconds.");
index.close(); index.close();
index = null; index = null;
@ -506,14 +304,6 @@ public final class kelondroBLOBHeap implements kelondroBLOB {
index = null; index = null;
} }
} }
/**
* ask for the length of the primary key
* @return the length of the key
*/
public int keylength() {
return this.index.row().primaryKeyLength;
}
/** /**
* write a whole byte array as BLOB to the table * write a whole byte array as BLOB to the table
@ -783,7 +573,8 @@ public final class kelondroBLOBHeap implements kelondroBLOB {
* @throws IOException * @throws IOException
*/ */
public synchronized kelondroCloneableIterator<byte[]> keys(final boolean up, final boolean rotating) throws IOException { public synchronized kelondroCloneableIterator<byte[]> keys(final boolean up, final boolean rotating) throws IOException {
return new kelondroRotateIterator<byte[]>(this.index.keys(up, null), null, this.index.size()); this.flushBuffer();
return super.keys(up, rotating);
} }
/** /**
@ -794,11 +585,12 @@ public final class kelondroBLOBHeap implements kelondroBLOB {
* @throws IOException * @throws IOException
*/ */
public synchronized kelondroCloneableIterator<byte[]> keys(final boolean up, final byte[] firstKey) throws IOException { public synchronized kelondroCloneableIterator<byte[]> keys(final boolean up, final byte[] firstKey) throws IOException {
return this.index.keys(up, firstKey); this.flushBuffer();
return super.keys(up, firstKey);
} }
public long length() throws IOException { public long length() throws IOException {
return this.heapFile.length() + this.buffersize; return super.length() + this.buffersize;
} }
public static void heaptest() { public static void heaptest() {

@ -0,0 +1,410 @@
// kelondroBLOBHeapReader.java
// (C) 2008 by Michael Peter Christen; mc@yacy.net, Frankfurt a. M., Germany
// first published 30.12.2008 on http://yacy.net
//
// $LastChangedDate: 2008-03-14 01:16:04 +0100 (Fr, 14 Mrz 2008) $
// $LastChangedRevision: 4558 $
// $LastChangedBy: orbiter $
//
// LICENSE
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
package de.anomic.kelondro;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import de.anomic.server.serverMemory;
import de.anomic.server.logging.serverLog;
public class kelondroBLOBHeapReader {

    protected int keylength;                    // the length of the primary key
    protected kelondroBytesLongMap index;       // key/seek relation for used records
    protected kelondroBLOBGap free;             // set of {seek, size} pairs denoting space and position of free records
    protected final File heapFile;              // the file of the heap
    protected final kelondroByteOrder ordering; // the ordering on keys
    protected kelondroCachedFileRA file;        // a random access to the file

    /**
     * Open a BLOB heap file for reading.
     * The key/seek index is restored from a dumped idx/gap file pair when one
     * exists and passes a spot-check verification; otherwise the index is
     * re-built by scanning the whole heap file.
     * @param heapFile the heap file to read
     * @param keylength the length of the primary key in bytes
     * @param ordering the byte order used to compare keys
     * @throws IOException if the heap file cannot be accessed
     */
    public kelondroBLOBHeapReader(
            final File heapFile,
            final int keylength,
            final kelondroByteOrder ordering) throws IOException {
        this.ordering = ordering;
        this.heapFile = heapFile;
        this.keylength = keylength;
        this.index = null; // will be created as result of initialization process
        this.free = null;  // will be initialized later depending on existing idx/gap file
        this.file = new kelondroCachedFileRA(heapFile);

        // read or initialize the index
        if (initIndexReadDump(heapFile)) {
            // verify that everything worked just fine:
            // pick some elements of the index and check that the keys stored in the
            // heap file at the indexed seek positions actually match
            final Iterator<byte[]> i = this.index.keys(true, null);
            int c = 3; // number of samples to verify
            byte[] b, b1 = new byte[index.row().primaryKeyLength];
            long pos;
            boolean ok = true;
            while (i.hasNext() && c-- > 0) {
                b = i.next();
                pos = this.index.getl(b);
                file.seek(pos + 4); // skip the 4-byte record length, read the key
                file.readFully(b1, 0, b1.length);
                if (this.ordering.compare(b, b1) != 0) {
                    ok = false;
                    break;
                }
            }
            if (!ok) {
                serverLog.logWarning("kelondroBLOBHeap", "verification of idx file for " + heapFile.toString() + " failed, re-building index");
                initIndexReadFromHeap();
            } else {
                serverLog.logInfo("kelondroBLOBHeap", "using a dump of the index of " + heapFile.toString() + ".");
            }
        } else {
            // if we did not have a dump, create a new index
            initIndexReadFromHeap();
        }
    }

    /**
     * Look for a dumped index (idx) and gap file and read them if they exist.
     * Both dump files are one-time throw-away objects: they are deleted right
     * after they have been consumed.
     * @param f the heap file whose fingerprinted dump files are searched
     * @return true if index and gap could be restored from the dumps, false otherwise
     */
    private boolean initIndexReadDump(File f) {
        final File fif = kelondroBLOBHeapWriter.fingerprintIndexFile(f);
        final File fgf = kelondroBLOBHeapWriter.fingerprintGapFile(f);
        if (!fif.exists() || !fgf.exists()) {
            // stale dumps with a non-matching fingerprint are useless; remove them
            kelondroBLOBHeapWriter.deleteAllFingerprints(f);
            return false;
        }

        // there is an index and a gap file:
        // read the index file:
        try {
            this.index = new kelondroBytesLongMap(this.keylength, this.ordering, fif);
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
        // an index file is a one-time throw-away object, so just delete it now
        fif.delete();

        // read the gap file:
        try {
            this.free = new kelondroBLOBGap(fgf);
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
        // same with gap file
        fgf.delete();

        // everything is fine now
        return this.index.size() > 0;
    }

    /**
     * Initialize this.index (and this.free) by scanning the heap file record by
     * record. Records with an empty key (first key byte == 0) are collected as
     * gaps; wellformed keys are fed asynchronously into the index builder.
     * A record length of zero indicates file corruption; in that case the file
     * is truncated at the current position.
     * @throws IOException if the heap file cannot be read
     */
    private void initIndexReadFromHeap() throws IOException {
        this.free = new kelondroBLOBGap();
        kelondroBytesLongMap.initDataConsumer indexready = kelondroBytesLongMap.asynchronusInitializer(keylength, this.ordering, 0, Math.max(10, (int) (Runtime.getRuntime().freeMemory() / (10 * 1024 * 1024))));
        byte[] key = new byte[keylength];
        int reclen;
        long seek = 0;
        loop: while (true) { // don't test available() here because this does not work for files > 2GB
            try {
                // go to seek position
                file.seek(seek);

                // read length of the following record without the length of the record size bytes
                reclen = file.readInt();
                //assert reclen > 0 : " reclen == 0 at seek pos " + seek;
                if (reclen == 0) {
                    // very bad file inconsistency
                    serverLog.logSevere("kelondroBLOBHeap", "reclen == 0 at seek pos " + seek + " in file " + heapFile);
                    this.file.setLength(seek); // delete everything else at the remaining of the file :-(
                    break loop;
                }

                // read key
                file.readFully(key, 0, key.length);
            } catch (final IOException e) {
                // EOF reached
                break loop; // terminate loop
            }

            // check if this record is empty
            if (key == null || key[0] == 0) {
                // it is an empty record, store to free list
                if (reclen > 0) free.put(seek, reclen);
            } else {
                if (this.ordering.wellformed(key)) {
                    indexready.consume(key, seek);
                    key = new byte[keylength]; // the consumer keeps the array; allocate a fresh one
                } else {
                    serverLog.logWarning("kelondroBLOBHeap", "BLOB " + heapFile.getName() + ": skiped not wellformed key " + new String(key) + " at seek pos " + seek);
                }
            }
            // new seek position
            seek += 4L + reclen;
        }
        indexready.finish();

        // finish the index generation
        try {
            this.index = indexready.result();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

    /**
     * @return the name of the heap file
     */
    public String name() {
        return this.heapFile.getName();
    }

    /**
     * the number of BLOBs in the heap
     * @return the number of BLOBs in the heap
     */
    public synchronized int size() {
        return this.index.size();
    }

    /**
     * test if a key is in the heap file. This does not need any IO, because it uses only the ram index
     * @param key
     * @return true if the key exists, false otherwise
     */
    public synchronized boolean has(final byte[] key) {
        assert index != null;
        assert index.row().primaryKeyLength == key.length : index.row().primaryKeyLength + "!=" + key.length;

        // check if the file index contains the key
        try {
            return index.getl(key) >= 0;
        } catch (final IOException e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * @return the byte order used to compare keys
     */
    public kelondroByteOrder ordering() {
        return this.ordering;
    }

    /**
     * read a blob from the heap
     * @param key
     * @return the blob payload, or null if the key does not exist or not enough memory is available
     * @throws IOException
     */
    public synchronized byte[] get(final byte[] key) throws IOException {
        assert index.row().primaryKeyLength == key.length : index.row().primaryKeyLength + "!=" + key.length;

        // check if the index contains the key
        final long pos = index.getl(key);
        if (pos < 0) return null;

        // access the file and read the container
        file.seek(pos);
        final int len = file.readInt() - index.row().primaryKeyLength;
        if (serverMemory.available() < len) {
            if (!serverMemory.request(len, false)) return null; // not enough memory available for this blob
        }

        // read the key
        final byte[] keyf = new byte[index.row().primaryKeyLength];
        file.readFully(keyf, 0, keyf.length);
        if (this.ordering.compare(key, keyf) != 0) {
            // verification of the indexed access failed. we must re-read the index
            serverLog.logWarning("kelondroBLOBHeap", "verification indexed access for " + heapFile.toString() + " failed, re-building index");
            // this is a severe operation, it should never happen.
            // but if the process ends in this state, it would completely fail
            // if the index is not rebuild now at once
            initIndexReadFromHeap();
        }

        // read the blob
        byte[] blob = new byte[len];
        file.readFully(blob, 0, blob.length);

        return blob;
    }

    /**
     * retrieve the size of the BLOB
     * @param key
     * @return the size of the BLOB or -1 if the BLOB does not exist
     * @throws IOException
     */
    public synchronized long length(byte[] key) throws IOException {
        // synchronized like the other accessors: this seeks the shared file handle
        assert index.row().primaryKeyLength == key.length : index.row().primaryKeyLength + "!=" + key.length;

        // check if the index contains the key
        final long pos = index.getl(key);
        if (pos < 0) return -1;

        // access the file and read the size of the container
        file.seek(pos);
        return file.readInt() - index.row().primaryKeyLength;
    }

    /**
     * close the BLOB table
     */
    public synchronized void close() {
        if (file != null) try {
            file.close();
        } catch (final IOException e) {
            e.printStackTrace();
        }
        file = null;
        free.clear();
        free = null;
        index.close();
        index = null;
    }

    /**
     * ask for the length of the primary key
     * @return the length of the key
     */
    public int keylength() {
        return this.index.row().primaryKeyLength;
    }

    /**
     * iterator over all keys
     * @param up
     * @param rotating
     * @return
     * @throws IOException
     */
    public synchronized kelondroCloneableIterator<byte[]> keys(final boolean up, final boolean rotating) throws IOException {
        return new kelondroRotateIterator<byte[]>(this.index.keys(up, null), null, this.index.size());
    }

    /**
     * iterate over all keys
     * @param up
     * @param firstKey
     * @return
     * @throws IOException
     */
    public synchronized kelondroCloneableIterator<byte[]> keys(final boolean up, final byte[] firstKey) throws IOException {
        return this.index.keys(up, firstKey);
    }

    /**
     * @return the length of the heap file in bytes
     * @throws IOException
     */
    public long length() throws IOException {
        return this.heapFile.length();
    }

    /**
     * static iterator of entries in BLOBHeap files:
     * this is used to import heap dumps into a write-enabled index heap
     */
    public static class entries implements Iterator<Map.Entry<String, byte[]>>, Iterable<Map.Entry<String, byte[]>> {

        DataInputStream is;
        int keylen;
        Map.Entry<String, byte[]> nextEntry;

        /**
         * @param blobFile the heap file to iterate
         * @param keylen the key length used when the file was written
         * @throws IOException if the file does not exist or cannot be opened
         */
        public entries(final File blobFile, final int keylen) throws IOException {
            if (!(blobFile.exists())) throw new IOException("file " + blobFile + " does not exist");
            this.is = new DataInputStream(new BufferedInputStream(new FileInputStream(blobFile), 1024*1024));
            this.keylen = keylen;
            this.nextEntry = next0();
        }

        public boolean hasNext() {
            return this.nextEntry != null;
        }

        /**
         * Read the next non-empty record from the stream.
         * @return the next entry, or null when the end of the file is reached
         */
        private Map.Entry<String, byte[]> next0() {
            try {
                while (true) {
                    int len = is.readInt();
                    byte[] key = new byte[this.keylen];
                    // readFully instead of read: InputStream.read may legally
                    // return fewer bytes than requested even before EOF, which
                    // would have truncated the iteration prematurely
                    is.readFully(key);
                    byte[] payload = new byte[len - this.keylen];
                    is.readFully(payload);
                    if (key[0] == 0) continue; // this is an empty gap
                    return new entry(new String(key), payload);
                }
            } catch (final IOException e) {
                // EOF (or a truncated trailing record) terminates the iteration
                return null;
            }
        }

        public Map.Entry<String, byte[]> next() {
            final Map.Entry<String, byte[]> n = this.nextEntry;
            this.nextEntry = next0();
            return n;
        }

        public void remove() {
            throw new UnsupportedOperationException("blobs cannot be altered during read-only iteration");
        }

        public Iterator<Map.Entry<String, byte[]>> iterator() {
            return this;
        }

        public void close() {
            if (is != null) try { is.close(); } catch (final IOException e) {}
            is = null;
        }

        protected void finalize() {
            this.close();
        }
    }

    /**
     * simple string-keyed Map.Entry implementation used by the entries iterator
     */
    public static class entry implements Map.Entry<String, byte[]> {
        private String s;
        private byte[] b;

        public entry(final String s, final byte[] b) {
            this.s = s;
            this.b = b;
        }

        public String getKey() {
            return s;
        }

        public byte[] getValue() {
            return b;
        }

        public byte[] setValue(byte[] value) {
            byte[] b1 = b;
            b = value;
            return b1;
        }
    }

}

@ -0,0 +1,151 @@
// kelondroBLOBHeapWriter.java
// (C) 2008 by Michael Peter Christen; mc@yacy.net, Frankfurt a. M., Germany
// first published 30.12.2008 on http://yacy.net
//
// $LastChangedDate: 2008-03-14 01:16:04 +0100 (Fr, 14 Mrz 2008) $
// $LastChangedRevision: 4558 $
// $LastChangedBy: orbiter $
//
// LICENSE
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
package de.anomic.kelondro;
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import de.anomic.server.logging.serverLog;
public final class kelondroBLOBHeapWriter {

    private final int keylength;          // the length of the primary key
    private kelondroBytesLongMap index;   // key/seek relation for used records
    private final File heapFile;          // the file of the heap
    private DataOutputStream os;          // the output stream where the BLOB is written
    private long seek;                    // the current write position

    /*
     * This class implements a BLOB management based on a sequence of records
     * The data structure is:
     * file   :== record*
     * record :== reclen key blob
     * reclen :== <4 byte integer == length of key and blob>
     * key    :== <bytes as defined with keylen, if first byte is zero then record is empty>
     * blob   :== <bytes of length reclen - keylen>
     * that means that each record has the size reclen+4
     *
     * Because the blob sizes are stored with integers, one entry may not exceed 2GB
     *
     * With this class a BLOB file can only be written.
     * To read them, use a kelondroBLOBHeapReader.
     * A BLOBHeap can be also read and write in random access mode with kelondroBLOBHeap.
     */

    /**
     * create a heap file: a arbitrary number of BLOBs, indexed by an access key
     * The heap file will be indexed upon initialization.
     * @param heapFile
     * @param keylength
     * @param ordering
     * @throws IOException
     */
    public kelondroBLOBHeapWriter(final File heapFile, final int keylength, final kelondroByteOrder ordering) throws IOException {
        this.heapFile = heapFile;
        this.keylength = keylength;
        this.index = new kelondroBytesLongMap(keylength, ordering, 10);
        this.os = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(heapFile), 1024 * 1024));
        this.seek = 0;
    }

    /**
     * add a BLOB to the heap: this adds the blob always to the end of the file
     * newly added heap entries must have keys that have not been added before
     * null or empty blobs are silently ignored
     * @param key the primary key; must have the configured key length
     * @param blob the payload to append
     * @throws IOException if writing to the heap file fails
     */
    public void add(final byte[] key, final byte[] blob) throws IOException {
        // check the empty/null case BEFORE touching blob; the original order
        // dereferenced blob in an assert ahead of the null guard
        if ((blob == null) || (blob.length == 0)) return;
        assert key.length == this.keylength;
        assert index.row().primaryKeyLength == key.length : index.row().primaryKeyLength + "!=" + key.length;
        assert index.getl(key) < 0; // must not occur before
        final int chunkl = key.length + blob.length;
        os.writeInt(chunkl);
        os.write(key);
        os.write(blob);
        index.addl(key, seek);
        this.seek += chunkl + 4; // record length bytes + record content
    }

    /**
     * @param f the heap file
     * @return the name of the index dump file that belongs to the heap file's current fingerprint
     */
    protected static File fingerprintIndexFile(File f) {
        return new File(f.getParentFile(), f.getName() + "." + fingerprintFileHash(f) + ".idx");
    }

    /**
     * @param f the heap file
     * @return the name of the gap dump file that belongs to the heap file's current fingerprint
     */
    protected static File fingerprintGapFile(File f) {
        return new File(f.getParentFile(), f.getName() + "." + fingerprintFileHash(f) + ".gap");
    }

    /**
     * compute a short fingerprint of the heap file content; the fingerprint is
     * embedded into the dump file names so that stale dumps can be detected
     * @param f the heap file
     * @return a 12-character base64 fingerprint
     */
    protected static String fingerprintFileHash(File f) {
        return kelondroDigest.fastFingerprintB64(f, false).substring(0, 12);
    }

    /**
     * delete all idx/gap dump files that belong to the given heap file,
     * regardless of their fingerprint
     * @param f the heap file
     */
    public static void deleteAllFingerprints(File f) {
        final File d = f.getParentFile();
        final String n = f.getName();
        final String[] l = d.list();
        if (l == null) return; // parent missing or not a directory: nothing to delete
        for (int i = 0; i < l.length; i++) {
            if (l[i].startsWith(n) && (l[i].endsWith(".idx") || l[i].endsWith(".gap"))) new File(d, l[i]).delete();
        }
    }

    /**
     * close the writer: flush and close the output stream and, for non-trivial
     * heaps, dump the index and (empty) gap information so the next start can
     * skip the full heap scan. Safe to call more than once.
     */
    public synchronized void close() {
        if (os != null) {
            try {
                os.flush();
                os.close();
            } catch (final IOException e) {
                e.printStackTrace();
            }
            os = null;
        }
        if (index == null) return; // already closed
        if (index.size() > 3) {
            // now we can create a dump of the index and the gap information
            // to speed up the next start
            try {
                long start = System.currentTimeMillis();
                new kelondroBLOBGap().dump(fingerprintGapFile(this.heapFile));
                index.dump(fingerprintIndexFile(this.heapFile));
                serverLog.logInfo("kelondroBLOBHeapWriter", "wrote a dump for the " + this.index.size() +  " index entries of " + heapFile.getName()+ " in " + (System.currentTimeMillis() - start) + " milliseconds.");
                index.close();
                index = null;
            } catch (IOException e) {
                e.printStackTrace();
            }
        } else {
            // this is small.. just free resources, do not write index
            index.close();
            index = null;
        }
    }
}

@ -776,7 +776,8 @@ public class kelondroCollectionIndex {
serverLog.logSevere("kelondroCollectionIndex." + array.filename, "lost a RowCollection because of a bad arraykey (error #" + indexErrors + ")"); serverLog.logSevere("kelondroCollectionIndex." + array.filename, "lost a RowCollection because of a bad arraykey (error #" + indexErrors + ")");
return new kelondroRowSet(this.payloadrow, 0); return new kelondroRowSet(this.payloadrow, 0);
} }
final kelondroRowSet collection = new kelondroRowSet(this.payloadrow, arrayrow, 1); // FIXME: this does not yet work with different rowdef in case of several rowdef.objectsize()
final kelondroRowSet collection = new kelondroRowSet(this.payloadrow, arrayrow); // FIXME: this does not yet work with different rowdef in case of several rowdef.objectsize()
if ((!(index.row().objectOrder.wellformed(indexkey))) || (index.row().objectOrder.compare(arraykey, indexkey) != 0)) { if ((!(index.row().objectOrder.wellformed(indexkey))) || (index.row().objectOrder.compare(arraykey, indexkey) != 0)) {
// check if we got the right row; this row is wrong. Fix it: // check if we got the right row; this row is wrong. Fix it:
index.remove(indexkey); // the wrong row cannot be fixed index.remove(indexkey); // the wrong row cannot be fixed

@ -26,7 +26,6 @@ package de.anomic.kelondro;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
@ -64,9 +63,8 @@ public class kelondroRowCollection {
private static final int exp_last_read = 1; private static final int exp_last_read = 1;
private static final int exp_last_wrote = 2; private static final int exp_last_wrote = 2;
private static final int exp_order_type = 3; private static final int exp_order_type = 3;
private static final int exp_order_col = 4; private static final int exp_order_bound = 4;
private static final int exp_order_bound = 5; private static final int exp_collection = 5;
private static final int exp_collection = 6;
public kelondroRowCollection(final kelondroRowCollection rc) { public kelondroRowCollection(final kelondroRowCollection rc) {
this.rowdef = rc.rowdef; this.rowdef = rc.rowdef;
@ -95,12 +93,12 @@ public class kelondroRowCollection {
this.lastTimeWrote = System.currentTimeMillis(); this.lastTimeWrote = System.currentTimeMillis();
} }
public kelondroRowCollection(final kelondroRow rowdef, final kelondroRow.Entry exportedCollectionRowEnvironment, final int columnInEnvironment) { public kelondroRowCollection(final kelondroRow rowdef, final kelondroRow.Entry exportedCollectionRowEnvironment) {
final int chunkcachelength = exportedCollectionRowEnvironment.cellwidth(1) - exportOverheadSize;
final kelondroRow.Entry exportedCollection = exportRow(chunkcachelength).newEntry(exportedCollectionRowEnvironment, 1);
this.rowdef = rowdef; this.rowdef = rowdef;
final int chunkcachelength = exportedCollectionRowEnvironment.cellwidth(columnInEnvironment) - exportOverheadSize;
final kelondroRow.Entry exportedCollection = exportRow(chunkcachelength).newEntry(exportedCollectionRowEnvironment, columnInEnvironment);
this.chunkcount = (int) exportedCollection.getColLong(exp_chunkcount); this.chunkcount = (int) exportedCollection.getColLong(exp_chunkcount);
//assert (this.chunkcount <= chunkcachelength / rowdef.objectsize) : "chunkcount = " + this.chunkcount + ", chunkcachelength = " + chunkcachelength + ", rowdef.objectsize = " + rowdef.objectsize;
if ((this.chunkcount > chunkcachelength / rowdef.objectsize)) { if ((this.chunkcount > chunkcachelength / rowdef.objectsize)) {
serverLog.logWarning("RowCollection", "corrected wrong chunkcount; chunkcount = " + this.chunkcount + ", chunkcachelength = " + chunkcachelength + ", rowdef.objectsize = " + rowdef.objectsize); serverLog.logWarning("RowCollection", "corrected wrong chunkcount; chunkcount = " + this.chunkcount + ", chunkcachelength = " + chunkcachelength + ", rowdef.objectsize = " + rowdef.objectsize);
this.chunkcount = chunkcachelength / rowdef.objectsize; // patch problem this.chunkcount = chunkcachelength / rowdef.objectsize; // patch problem
@ -117,10 +115,7 @@ public class kelondroRowCollection {
} }
if ((rowdef.objectOrder != null) && (oldOrder != null) && (!(rowdef.objectOrder.signature().equals(oldOrder.signature())))) if ((rowdef.objectOrder != null) && (oldOrder != null) && (!(rowdef.objectOrder.signature().equals(oldOrder.signature()))))
throw new kelondroException("old collection order does not match with new order; objectOrder.signature = " + rowdef.objectOrder.signature() + ", oldOrder.signature = " + oldOrder.signature()); throw new kelondroException("old collection order does not match with new order; objectOrder.signature = " + rowdef.objectOrder.signature() + ", oldOrder.signature = " + oldOrder.signature());
if (rowdef.primaryKeyIndex != (int) exportedCollection.getColLong(exp_order_col))
throw new kelondroException("old collection primary key does not match with new primary key");
this.sortBound = (int) exportedCollection.getColLong(exp_order_bound); this.sortBound = (int) exportedCollection.getColLong(exp_order_bound);
//assert (sortBound <= chunkcount) : "sortBound = " + sortBound + ", chunkcount = " + chunkcount;
if (sortBound > chunkcount) { if (sortBound > chunkcount) {
serverLog.logWarning("RowCollection", "corrected wrong sortBound; sortBound = " + sortBound + ", chunkcount = " + chunkcount); serverLog.logWarning("RowCollection", "corrected wrong sortBound; sortBound = " + sortBound + ", chunkcount = " + chunkcount);
this.sortBound = chunkcount; this.sortBound = chunkcount;
@ -155,8 +150,7 @@ public class kelondroRowCollection {
"short lastread-2 {b256}," + // as daysSince2000 "short lastread-2 {b256}," + // as daysSince2000
"short lastwrote-2 {b256}," + // as daysSince2000 "short lastwrote-2 {b256}," + // as daysSince2000
"byte[] orderkey-2," + "byte[] orderkey-2," +
"short ordercol-2 {b256}," + "int orderbound-4 {b256}," +
"short orderbound-2 {b256}," +
"byte[] collection-" + chunkcachelength, "byte[] collection-" + chunkcachelength,
kelondroNaturalOrder.naturalOrder, 0 kelondroNaturalOrder.naturalOrder, 0
); );
@ -176,35 +170,11 @@ public class kelondroRowCollection {
entry.setCol(exp_last_read, daysSince2000(this.lastTimeRead)); entry.setCol(exp_last_read, daysSince2000(this.lastTimeRead));
entry.setCol(exp_last_wrote, daysSince2000(this.lastTimeWrote)); entry.setCol(exp_last_wrote, daysSince2000(this.lastTimeWrote));
entry.setCol(exp_order_type, (this.rowdef.objectOrder == null) ? "__".getBytes() :this.rowdef.objectOrder.signature().getBytes()); entry.setCol(exp_order_type, (this.rowdef.objectOrder == null) ? "__".getBytes() :this.rowdef.objectOrder.signature().getBytes());
entry.setCol(exp_order_col, this.rowdef.primaryKeyIndex);
entry.setCol(exp_order_bound, this.sortBound); entry.setCol(exp_order_bound, this.sortBound);
entry.setCol(exp_collection, this.chunkcache); entry.setCol(exp_collection, this.chunkcache);
return entry.bytes(); return entry.bytes();
} }
public static kelondroRowCollection importCollection(final InputStream is, final kelondroRow rowdef) throws IOException {
final byte[] byte2 = new byte[2];
final byte[] byte4 = new byte[4];
int bytesRead;
bytesRead = is.read(byte4); final int size = (int) kelondroNaturalOrder.decodeLong(byte4);
assert bytesRead == byte4.length;
bytesRead = is.read(byte2); //short lastread = (short) kelondroNaturalOrder.decodeLong(byte2);
assert bytesRead == byte2.length;
bytesRead = is.read(byte2); //short lastwrote = (short) kelondroNaturalOrder.decodeLong(byte2);
assert bytesRead == byte2.length;
bytesRead = is.read(byte2); //String orderkey = new String(byte2);
assert bytesRead == byte2.length;
bytesRead = is.read(byte2); final short ordercol = (short) kelondroNaturalOrder.decodeLong(byte2);
assert bytesRead == byte2.length;
bytesRead = is.read(byte2); final short orderbound = (short) kelondroNaturalOrder.decodeLong(byte2);
assert bytesRead == byte2.length;
assert rowdef.primaryKeyIndex == ordercol;
final byte[] chunkcache = new byte[size * rowdef.objectsize];
bytesRead = is.read(chunkcache);
assert bytesRead == chunkcache.length;
return new kelondroRowCollection(rowdef, size, chunkcache, orderbound);
}
public void saveCollection(final File file) throws IOException { public void saveCollection(final File file) throws IOException {
serverFileUtils.copy(exportCollection(), file); serverFileUtils.copy(exportCollection(), file);
} }
@ -284,7 +254,8 @@ public class kelondroRowCollection {
assert (index >= 0) : "set: access with index " + index + " is below zero"; assert (index >= 0) : "set: access with index " + index + " is below zero";
ensureSize(index + 1); ensureSize(index + 1);
a.writeToArray(chunkcache, index * rowdef.objectsize); a.writeToArray(chunkcache, index * rowdef.objectsize);
if (index >= chunkcount) chunkcount = index + 1; if (index >= this.chunkcount) this.chunkcount = index + 1;
if (index < this.sortBound) this.sortBound = index;
this.lastTimeWrote = System.currentTimeMillis(); this.lastTimeWrote = System.currentTimeMillis();
} }
@ -322,17 +293,20 @@ public class kelondroRowCollection {
assert (!(serverLog.allZero(a, astart, alength))) : "a = " + serverLog.arrayList(a, astart, alength); assert (!(serverLog.allZero(a, astart, alength))) : "a = " + serverLog.arrayList(a, astart, alength);
assert (alength > 0); assert (alength > 0);
assert (astart + alength <= a.length); assert (astart + alength <= a.length);
/*
if (bugappearance(a, astart, alength)) {
serverLog.logWarning("RowCollection", "wrong a = " + serverLog.arrayList(a, astart, alength));
//return false; // TODO: this is temporary; remote peers may still submit bad entries
}
*/
//assert (!(bugappearance(a, astart, alength))) : "a = " + serverLog.arrayList(a, astart, alength);
final int l = Math.min(rowdef.objectsize, Math.min(alength, a.length - astart)); final int l = Math.min(rowdef.objectsize, Math.min(alength, a.length - astart));
ensureSize(chunkcount + 1); ensureSize(chunkcount + 1);
System.arraycopy(a, astart, chunkcache, rowdef.objectsize * chunkcount, l); System.arraycopy(a, astart, chunkcache, rowdef.objectsize * chunkcount, l);
chunkcount++; chunkcount++;
// if possible, increase the sortbound value to suppress unnecessary sorting
if (this.chunkcount == 1) {
assert this.sortBound == 0;
this.sortBound = 1;
} else if (
this.sortBound + 1 == chunkcount &&
this.rowdef.objectOrder.compare(chunkcache, rowdef.objectsize * (chunkcount - 2), rowdef.primaryKeyLength,
chunkcache, rowdef.objectsize * (chunkcount - 1), rowdef.primaryKeyLength) == -1) {
this.sortBound = chunkcount;
}
this.lastTimeWrote = System.currentTimeMillis(); this.lastTimeWrote = System.currentTimeMillis();
} }

@ -25,7 +25,6 @@
package de.anomic.kelondro; package de.anomic.kelondro;
import java.io.DataInput; import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException; import java.io.IOException;
import java.util.Date; import java.util.Date;
import java.util.Iterator; import java.util.Iterator;
@ -57,8 +56,14 @@ public class kelondroRowSet extends kelondroRowCollection implements kelondroInd
this.profile = new kelondroProfile(); this.profile = new kelondroProfile();
} }
public kelondroRowSet(final kelondroRow rowdef, final kelondroRow.Entry exportedCollectionRowEnvironment, final int columnInEnvironment) { /**
super(rowdef, exportedCollectionRowEnvironment, columnInEnvironment); * import an exported collection
* @param rowdef
* @param exportedCollectionRowEnvironment
* @param columnInEnvironment
*/
public kelondroRowSet(final kelondroRow rowdef, final kelondroRow.Entry exportedCollectionRowEnvironment) {
super(rowdef, exportedCollectionRowEnvironment);
assert rowdef.objectOrder != null; assert rowdef.objectOrder != null;
this.profile = new kelondroProfile(); this.profile = new kelondroProfile();
} }
@ -74,33 +79,25 @@ public class kelondroRowSet extends kelondroRowCollection implements kelondroInd
} }
public static kelondroRowSet importRowSet(final DataInput is, final kelondroRow rowdef) throws IOException { public static kelondroRowSet importRowSet(final DataInput is, final kelondroRow rowdef) throws IOException {
final byte[] byte2 = new byte[2]; final byte[] byte6 = new byte[6];
final byte[] byte4 = new byte[4]; final int size = is.readInt();
is.readFully(byte4); final int size = (int) kelondroNaturalOrder.decodeLong(byte4); is.readFully(byte6);
is.readFully(byte2); //short lastread = (short) kelondroNaturalOrder.decodeLong(byte2); //short lastread = (short) kelondroNaturalOrder.decodeLong(byte2);
is.readFully(byte2); //short lastwrote = (short) kelondroNaturalOrder.decodeLong(byte2); //short lastwrote = (short) kelondroNaturalOrder.decodeLong(byte2);
is.readFully(byte2); //String orderkey = new String(byte2); //String orderkey = new String(byte2);
is.readFully(byte2); final short ordercol = (short) kelondroNaturalOrder.decodeLong(byte2); final int orderbound = is.readInt();
is.readFully(byte2); final short orderbound = (short) kelondroNaturalOrder.decodeLong(byte2);
assert rowdef.primaryKeyIndex == ordercol;
final byte[] chunkcache = new byte[size * rowdef.objectsize]; final byte[] chunkcache = new byte[size * rowdef.objectsize];
is.readFully(chunkcache); is.readFully(chunkcache);
return new kelondroRowSet(rowdef, size, chunkcache, orderbound); return new kelondroRowSet(rowdef, size, chunkcache, orderbound);
} }
public static int skipNextRowSet(final DataInputStream is, final kelondroRow rowdef) throws IOException { public static kelondroRowSet importRowSet(byte[] b, final kelondroRow rowdef) throws IOException {
final byte[] byte2 = new byte[2]; final int size = (int) kelondroNaturalOrder.decodeLong(b, 0, 4);
final byte[] byte4 = new byte[4]; final int orderbound = (int) kelondroNaturalOrder.decodeLong(b, 10, 4);
is.readFully(byte4); final int size = (int) kelondroNaturalOrder.decodeLong(byte4); final byte[] chunkcache = new byte[size * rowdef.objectsize];
is.readFully(byte2); //short lastread = (short) kelondroNaturalOrder.decodeLong(byte2); assert b.length - exportOverheadSize == size * rowdef.objectsize;
is.readFully(byte2); //short lastwrote = (short) kelondroNaturalOrder.decodeLong(byte2); System.arraycopy(b, 14, chunkcache, 0, chunkcache.length);
is.readFully(byte2); //String orderkey = new String(byte2); return new kelondroRowSet(rowdef, size, chunkcache, orderbound);
is.readFully(byte2); final short ordercol = (short) kelondroNaturalOrder.decodeLong(byte2);
is.readFully(byte2);
assert rowdef.primaryKeyIndex == ordercol;
int skip = size * rowdef.objectsize;
while (skip > 0) skip -= is.skip(skip);
return size * rowdef.objectsize + 14;
} }
public void reset() { public void reset() {

@ -583,7 +583,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<IndexingStack.
"parseDocument", "parseDocument",
"This does the parsing of the newly loaded documents from the web. The result is not only a plain text document, but also a list of URLs that are embedded into the document. The urls are handed over to the CrawlStacker. This process has two child process queues!", "This does the parsing of the newly loaded documents from the web. The result is not only a plain text document, but also a list of URLs that are embedded into the document. The urls are handed over to the CrawlStacker. This process has two child process queues!",
new String[]{"condenseDocument", "CrawlStacker"}, new String[]{"condenseDocument", "CrawlStacker"},
this, "parseDocument", serverProcessor.useCPU + 1, indexingCondensementProcessor, serverProcessor.useCPU + 1); this, "parseDocument", 2 * serverProcessor.useCPU + 1, indexingCondensementProcessor, 2 * serverProcessor.useCPU + 1);
// deploy busy threads // deploy busy threads
log.logConfig("Starting Threads"); log.logConfig("Starting Threads");

@ -144,8 +144,8 @@ public final class plasmaWordIndex implements indexRI {
final File textindexcache = new File(indexPrimaryTextLocation, "RICACHE"); final File textindexcache = new File(indexPrimaryTextLocation, "RICACHE");
if (!(textindexcache.exists())) textindexcache.mkdirs(); if (!(textindexcache.exists())) textindexcache.mkdirs();
this.dhtOutCache = new indexRAMRI(textindexcache, indexRWIRowEntry.urlEntryRow, entityCacheMaxSize, wCacheMaxChunk, wCacheMaxAge, "index.dhtout.heap", log); this.dhtOutCache = new indexRAMRI(textindexcache, indexRWIRowEntry.urlEntryRow, entityCacheMaxSize, wCacheMaxChunk, wCacheMaxAge, "index.dhtout.heap", "index.dhtout.blob", log);
this.dhtInCache = new indexRAMRI(textindexcache, indexRWIRowEntry.urlEntryRow, entityCacheMaxSize, wCacheMaxChunk, wCacheMaxAge, "index.dhtin.heap", log); this.dhtInCache = new indexRAMRI(textindexcache, indexRWIRowEntry.urlEntryRow, entityCacheMaxSize, wCacheMaxChunk, wCacheMaxAge, "index.dhtin.heap", "index.dhtin.blob", log);
// create collections storage path // create collections storage path
final File textindexcollections = new File(indexPrimaryTextLocation, "RICOLLECTION"); final File textindexcollections = new File(indexPrimaryTextLocation, "RICOLLECTION");

Loading…
Cancel
Save