From b6bba18c379c66153c04b0c933e624817f1e38ab Mon Sep 17 00:00:00 2001 From: orbiter Date: Thu, 1 Jan 2009 22:31:16 +0000 Subject: [PATCH] replaced the storing procedure for the index ram cache with a method that generates BLOBHeap-compatible dumps this is a migration step to support a new method to store the web index, which will also be based on the same data structure. also did a lot of refactoring for a better structuring of the BLOBHeap class. git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@5430 6c8d7289-2bf4-0310-a012-ef5d649a1542 --- .../de/anomic/index/indexContainerHeap.java | 97 ++++- source/de/anomic/index/indexRAMRI.java | 60 ++- .../de/anomic/kelondro/kelondroBLOBGap.java | 31 +- .../de/anomic/kelondro/kelondroBLOBHeap.java | 276 ++---------- .../kelondro/kelondroBLOBHeapReader.java | 410 ++++++++++++++++++ .../kelondro/kelondroBLOBHeapWriter.java | 151 +++++++ .../kelondro/kelondroCollectionIndex.java | 3 +- .../kelondro/kelondroRowCollection.java | 64 +-- source/de/anomic/kelondro/kelondroRowSet.java | 47 +- .../de/anomic/plasma/plasmaSwitchboard.java | 2 +- source/de/anomic/plasma/plasmaWordIndex.java | 4 +- 11 files changed, 780 insertions(+), 365 deletions(-) create mode 100644 source/de/anomic/kelondro/kelondroBLOBHeapReader.java create mode 100644 source/de/anomic/kelondro/kelondroBLOBHeapWriter.java diff --git a/source/de/anomic/index/indexContainerHeap.java b/source/de/anomic/index/indexContainerHeap.java index d412f72ea..76ba034f0 100755 --- a/source/de/anomic/index/indexContainerHeap.java +++ b/source/de/anomic/index/indexContainerHeap.java @@ -27,12 +27,9 @@ package de.anomic.index; import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; import java.io.DataInputStream; -import java.io.DataOutputStream; import java.io.File; import java.io.FileInputStream; -import java.io.FileOutputStream; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -42,8 +39,8 @@ import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; -import de.anomic.kelondro.kelondroBLOB; -import de.anomic.kelondro.kelondroBLOBHeap; +import de.anomic.kelondro.kelondroBLOBHeapReader; +import de.anomic.kelondro.kelondroBLOBHeapWriter; import de.anomic.kelondro.kelondroBase64Order; import de.anomic.kelondro.kelondroByteOrder; import de.anomic.kelondro.kelondroCloneableIterator; @@ -90,7 +87,7 @@ public final class indexContainerHeap { * @param heapFile * @throws IOException */ - public void initWriteMode(final File heapFile) throws IOException { + public void initWriteModeFromHeap(final File heapFile) throws IOException { if (log != null) log.logInfo("restoring dump for rwi heap '" + heapFile.getName() + "'"); final long start = System.currentTimeMillis(); this.cache = Collections.synchronizedSortedMap(new TreeMap(new kelondroByteOrder.StringOrder(payloadrow.getOrdering()))); @@ -103,10 +100,33 @@ public final class indexContainerHeap { urlCount += container.size(); } } - if (log != null) log.logInfo("finished rwi heap restore: " + cache.size() + " words, " + urlCount + " word/URL relations in " + ((System.currentTimeMillis() - start) / 1000) + " seconds"); + if (log != null) log.logInfo("finished rwi heap restore: " + cache.size() + " words, " + urlCount + " word/URL relations in " + (System.currentTimeMillis() - start) + " milliseconds"); } - public void dump(final File heapFile) throws IOException { + /** + * this is the new cache file format initialization + * @param heapFile + * @throws IOException + */ + public void 
initWriteModeFromBLOB(final File blobFile) throws IOException { + if (log != null) log.logInfo("restoring rwi blob dump '" + blobFile.getName() + "'"); + final long start = System.currentTimeMillis(); + this.cache = Collections.synchronizedSortedMap(new TreeMap(new kelondroByteOrder.StringOrder(payloadrow.getOrdering()))); + int urlCount = 0; + synchronized (cache) { + for (final indexContainer container : new blobFileEntries(blobFile, this.payloadrow)) { + // TODO: in this loop a lot of memory may be allocated. A check if the memory gets low is necessary. But what do when the memory is low? + if (container == null) break; + cache.put(container.getWordHash(), container); + urlCount += container.size(); + } + } + // remove idx and gap files if they exist here + kelondroBLOBHeapWriter.deleteAllFingerprints(blobFile); + if (log != null) log.logInfo("finished rwi blob restore: " + cache.size() + " words, " + urlCount + " word/URL relations in " + (System.currentTimeMillis() - start) + " milliseconds"); + } + /* + public void dumpold(final File heapFile) throws IOException { assert this.cache != null; if (log != null) log.logInfo("creating rwi heap dump '" + heapFile.getName() + "', " + cache.size() + " rwi's"); if (heapFile.exists()) heapFile.delete(); @@ -137,14 +157,14 @@ public final class indexContainerHeap { } os.flush(); os.close(); - if (log != null) log.logInfo("finished rwi heap dump: " + wordcount + " words, " + urlcount + " word/URL relations in " + ((System.currentTimeMillis() - startTime) / 1000) + " seconds"); + if (log != null) log.logInfo("finished rwi heap dump: " + wordcount + " words, " + urlcount + " word/URL relations in " + (System.currentTimeMillis() - startTime) + " milliseconds"); } - - public void dump2(final File heapFile) throws IOException { + */ + public void dump(final File heapFile) throws IOException { assert this.cache != null; if (log != null) log.logInfo("creating alternative rwi heap dump '" + heapFile.getName() + "', " + cache.size() + " rwi's"); if (heapFile.exists()) heapFile.delete(); - final kelondroBLOB dump = new kelondroBLOBHeap(heapFile, payloadrow.primaryKeyLength, kelondroBase64Order.enhancedCoder, 1024 * 1024 * 10); + final kelondroBLOBHeapWriter dump = new kelondroBLOBHeapWriter(heapFile, payloadrow.primaryKeyLength, kelondroBase64Order.enhancedCoder); final long startTime = System.currentTimeMillis(); long wordcount = 0, urlcount = 0; String wordHash; @@ -159,14 +179,14 @@ public final class indexContainerHeap { // put entries on heap if (container != null && wordHash.length() == payloadrow.primaryKeyLength) { - dump.put(wordHash.getBytes(), container.exportCollection()); + dump.add(wordHash.getBytes(), container.exportCollection()); urlcount += container.size(); } wordcount++; } } dump.close(); - if (log != null) log.logInfo("finished alternative rwi heap dump: " + wordcount + " words, " + urlcount + " word/URL relations in " + ((System.currentTimeMillis() - startTime) / 1000) + " seconds"); + if (log != null) log.logInfo("finished alternative rwi heap dump: " + wordcount + " words, " + urlcount + " word/URL relations in " + (System.currentTimeMillis() - startTime) + " milliseconds"); } public int size() { @@ -184,7 +204,7 @@ public final class indexContainerHeap { public heapFileEntries(final File heapFile, final kelondroRow payloadrow) throws IOException { if (!(heapFile.exists())) throw new IOException("file " + heapFile + " does not exist"); - is = new DataInputStream(new BufferedInputStream(new FileInputStream(heapFile), 64*1024)); + is 
= new DataInputStream(new BufferedInputStream(new FileInputStream(heapFile), 1024*1024)); word = new byte[payloadrow.primaryKeyLength]; this.payloadrow = payloadrow; this.nextContainer = next0(); @@ -231,6 +251,53 @@ public final class indexContainerHeap { } } + /** + * static iterator of BLOBHeap files: is used to import heap dumps into a write-enabled index heap + */ + public static class blobFileEntries implements Iterator, Iterable { + Iterator> blobs; + kelondroRow payloadrow; + + public blobFileEntries(final File blobFile, final kelondroRow payloadrow) throws IOException { + this.blobs = new kelondroBLOBHeapReader.entries(blobFile, payloadrow.primaryKeyLength); + this.payloadrow = payloadrow; + } + + public boolean hasNext() { + return blobs.hasNext(); + } + + /** + * return an index container + * because they may get very large, it is wise to deallocate some memory before calling next() + */ + public indexContainer next() { + try { + Map.Entry entry = blobs.next(); + byte[] payload = entry.getValue(); + return new indexContainer(entry.getKey(), kelondroRowSet.importRowSet(payload, payloadrow)); + } catch (final IOException e) { + return null; + } + } + + public void remove() { + throw new UnsupportedOperationException("heap dumps are read-only"); + } + + public Iterator iterator() { + return this; + } + + public void close() { + blobs = null; + } + + protected void finalize() { + this.close(); + } + } + public synchronized int maxReferences() { // iterate to find the max score int max = 0; diff --git a/source/de/anomic/index/indexRAMRI.java b/source/de/anomic/index/indexRAMRI.java index 5d6acb943..ae86f4927 100644 --- a/source/de/anomic/index/indexRAMRI.java +++ b/source/de/anomic/index/indexRAMRI.java @@ -47,11 +47,19 @@ public final class indexRAMRI implements indexRI, indexRIReader { public int cacheReferenceCountLimit; // the maximum number of references to a single RWI entity public long cacheReferenceAgeLimit; // the maximum age (= time not changed) of a RWI entity private final serverLog log; - private final File indexHeapFile; + private final File oldDumpFile, newDumpFile; private indexContainerHeap heap; @SuppressWarnings("unchecked") - public indexRAMRI(final File databaseRoot, final kelondroRow payloadrow, final int entityCacheMaxSize, final int wCacheReferenceCountLimitInit, final long wCacheReferenceAgeLimitInit, final String newHeapName, final serverLog log) { + public indexRAMRI( + final File databaseRoot, + final kelondroRow payloadrow, + final int entityCacheMaxSize, + final int wCacheReferenceCountLimitInit, + final long wCacheReferenceAgeLimitInit, + final String oldHeapName, + final String newHeapName, + final serverLog log) { // creates a new index cache // the cache has a back-end where indexes that do not fit in the cache are flushed @@ -62,25 +70,37 @@ public final class indexRAMRI implements indexRI, indexRIReader { this.cacheReferenceCountLimit = wCacheReferenceCountLimitInit; this.cacheReferenceAgeLimit = wCacheReferenceAgeLimitInit; this.log = log; - this.indexHeapFile = new File(databaseRoot, newHeapName); + this.oldDumpFile = new File(databaseRoot, oldHeapName); + this.newDumpFile = new File(databaseRoot, newHeapName); this.heap = new indexContainerHeap(payloadrow, log); // read in dump of last session - if (indexHeapFile.exists()) { - try { - heap.initWriteMode(indexHeapFile); - for (final indexContainer ic : (Iterable) heap.wordContainers(null, false)) { - this.hashDate.setScore(ic.getWordHash(), intTime(ic.lastWrote())); - 
this.hashScore.setScore(ic.getWordHash(), ic.size()); - } - } catch (final IOException e){ - log.logSevere("unable to restore cache dump: " + e.getMessage(), e); - // get empty dump - heap.initWriteMode(); - } catch (final NegativeArraySizeException e){ - log.logSevere("unable to restore cache dump: " + e.getMessage(), e); - // get empty dump - heap.initWriteMode(); + boolean initFailed = false; + if (newDumpFile.exists() && oldDumpFile.exists()) { + // we need only one, delete the old + oldDumpFile.delete(); + } + if (oldDumpFile.exists()) try { + heap.initWriteModeFromHeap(oldDumpFile); + } catch (IOException e) { + initFailed = true; + e.printStackTrace(); + } + if (newDumpFile.exists()) try { + heap.initWriteModeFromBLOB(newDumpFile); + } catch (IOException e) { + initFailed = true; + e.printStackTrace(); + } + if (initFailed) { + log.logSevere("unable to restore cache dump"); + // get empty dump + heap.initWriteMode(); + } else if (oldDumpFile.exists() || newDumpFile.exists()) { + // initialize scores for cache organization + for (final indexContainer ic : (Iterable) heap.wordContainers(null, false)) { + this.hashDate.setScore(ic.getWordHash(), intTime(ic.lastWrote())); + this.hashScore.setScore(ic.getWordHash(), ic.size()); } } else { heap.initWriteMode(); @@ -319,8 +339,8 @@ public final class indexRAMRI implements indexRI, indexRIReader { public synchronized void close() { // dump cache try { - heap.dump(this.indexHeapFile); - //heap.dump2(new File(this.indexHeapFile.getAbsolutePath() + ".blob")); + //heap.dumpold(this.oldDumpFile); + heap.dump(this.newDumpFile); } catch (final IOException e){ log.logSevere("unable to dump cache: " + e.getMessage(), e); } diff --git a/source/de/anomic/kelondro/kelondroBLOBGap.java b/source/de/anomic/kelondro/kelondroBLOBGap.java index a0c847200..72fa1b1f4 100644 --- a/source/de/anomic/kelondro/kelondroBLOBGap.java +++ b/source/de/anomic/kelondro/kelondroBLOBGap.java @@ -28,12 +28,12 @@ package de.anomic.kelondro; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.util.Iterator; import java.util.Map; import java.util.TreeMap; @@ -56,21 +56,24 @@ public class kelondroBLOBGap extends TreeMap { * initialize a kelondroBLOBGap with the content of a dump * @param file * @throws IOException + * @throws IOException */ public kelondroBLOBGap(final File file) throws IOException { super(); // read the index dump and fill the index - InputStream is = new BufferedInputStream(new FileInputStream(file), 1024 * 1024); - byte[] k = new byte[8]; - byte[] v = new byte[4]; - int c; + DataInputStream is = new DataInputStream(new BufferedInputStream(new FileInputStream(file), 1024 * 1024)); + long p; + int l; while (true) { - c = is.read(k); - if (c <= 0) break; - c = is.read(v); - if (c <= 0) break; - this.put(new Long(kelondroNaturalOrder.decodeLong(k)), new Integer((int) kelondroNaturalOrder.decodeLong(v))); + try { + p = is.readLong(); + l = is.readInt(); + this.put(new Long(p), new Integer(l)); + } catch (IOException e) { + break; + } } + is.close(); } /** @@ -81,13 +84,13 @@ public class kelondroBLOBGap extends TreeMap { */ public int dump(File file) throws IOException { Iterator> i = this.entrySet().iterator(); - OutputStream os = new BufferedOutputStream(new FileOutputStream(file), 1024 * 
1024); + DataOutputStream os = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(file), 1024 * 1024)); int c = 0; Map.Entry e; while (i.hasNext()) { e = i.next(); - os.write(kelondroNaturalOrder.encodeLong(e.getKey().longValue(), 8)); - os.write(kelondroNaturalOrder.encodeLong(e.getValue().longValue(), 4)); + os.writeLong(e.getKey().longValue()); + os.writeInt(e.getValue().intValue()); c++; } os.flush(); diff --git a/source/de/anomic/kelondro/kelondroBLOBHeap.java b/source/de/anomic/kelondro/kelondroBLOBHeap.java index 91fb6b30c..aec0f03ae 100755 --- a/source/de/anomic/kelondro/kelondroBLOBHeap.java +++ b/source/de/anomic/kelondro/kelondroBLOBHeap.java @@ -32,19 +32,11 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.SortedMap; -import java.util.concurrent.ExecutionException; -import de.anomic.server.serverMemory; import de.anomic.server.logging.serverLog; -public final class kelondroBLOBHeap implements kelondroBLOB { +public final class kelondroBLOBHeap extends kelondroBLOBHeapReader implements kelondroBLOB { - private int keylength; // the length of the primary key - private kelondroBytesLongMap index; // key/seek relation for used records - private kelondroBLOBGap free; // set of {seek, size} pairs denoting space and position of free records - private final File heapFile; // the file of the heap - private final kelondroByteOrder ordering; // the ordering on keys - private kelondroCachedFileRA file; // a random access to the file private HashMap buffer; // a write buffer to limit IO to the file; attention: Maps cannot use byte[] as key private int buffersize; // bytes that are buffered in buffer private int buffermax; // maximum size of the buffer @@ -79,47 +71,16 @@ public final class kelondroBLOBHeap implements kelondroBLOB { * @param ordering * @throws IOException */ - public kelondroBLOBHeap(final File heapFile, final int keylength, final kelondroByteOrder ordering, int buffermax) throws IOException { - this.ordering = ordering; - this.heapFile = heapFile; + public kelondroBLOBHeap( + final File heapFile, + final int keylength, + final kelondroByteOrder ordering, + int buffermax) throws IOException { + super(heapFile, keylength, ordering); this.buffermax = buffermax; - this.keylength = keylength; - this.index = null; // will be created as result of initialization process - this.free = null; // will be initialized later depending on existing idx/gap file this.buffer = new HashMap(); this.buffersize = 0; - this.file = new kelondroCachedFileRA(heapFile); - - // read or initialize the index - if (initIndexReadDump(heapFile)) { - // verify that everything worked just fine - // pick some elements of the index - Iterator i = this.index.keys(true, null); - int c = 3; - byte[] b, b1 = new byte[index.row().primaryKeyLength]; - long pos; - boolean ok = true; - while (i.hasNext() && c-- > 0) { - b = i.next(); - pos = this.index.getl(b); - file.seek(pos + 4); - file.readFully(b1, 0, b1.length); - if (this.ordering.compare(b, b1) != 0) { - ok = false; - break; - } - } - if (!ok) { - serverLog.logWarning("kelondroBLOBHeap", "verification of idx file for " + heapFile.toString() + " failed, re-building index"); - initIndexReadFromHeap(); - } else { - serverLog.logInfo("kelondroBLOBHeap", "using a dump of the index of " + heapFile.toString() + "."); - } - } else { - // if we did not have a dump, create a new index - initIndexReadFromHeap(); - } - + mergeFreeEntries(); /* // DEBUG Iterator i = index.keys(true, null); @@ -138,128 +99,10 @@ 
public final class kelondroBLOBHeap implements kelondroBLOB { */ } - private boolean initIndexReadDump(File f) { - // look for an index dump and read it if it exist - // if this is successfull, return true; otherwise false - File fif = fingerprintIndexFile(f); - File fgf = fingerprintGapFile(f); - if (!fif.exists() || !fgf.exists()) { - deleteAllFingerprints(f); - return false; - } - - // there is an index and a gap file: - // read the index file: - try { - this.index = new kelondroBytesLongMap(this.keylength, this.ordering, fif); - } catch (IOException e) { - e.printStackTrace(); - return false; - } - // an index file is a one-time throw-away object, so just delete it now - fif.delete(); - - // read the gap file: - try { - this.free = new kelondroBLOBGap(fgf); - } catch (IOException e) { - e.printStackTrace(); - return false; - } - // same with gap file - fgf.delete(); - - // everything is fine now - return this.index.size() > 0; - } - - private static File fingerprintIndexFile(File f) { - return new File(f.getParentFile(), f.getName() + "." + fingerprintFileHash(f) + ".idx"); - } - - private static File fingerprintGapFile(File f) { - return new File(f.getParentFile(), f.getName() + "." + fingerprintFileHash(f) + ".gap"); - } - - private static String fingerprintFileHash(File f) { - return kelondroDigest.fastFingerprintB64(f, false).substring(0, 12); - } - - private static void deleteAllFingerprints(File f) { - File d = f.getParentFile(); - String n = f.getName(); - String[] l = d.list(); - for (int i = 0; i < l.length; i++) { - if (l[i].startsWith(n) && (l[i].endsWith(".idx") || l[i].endsWith(".gap"))) new File(d, l[i]).delete(); - } - } - - private void initIndexReadFromHeap() throws IOException { - // this initializes the this.index object by reading positions from the heap file - - this.free = new kelondroBLOBGap(); - kelondroBytesLongMap.initDataConsumer indexready = kelondroBytesLongMap.asynchronusInitializer(keylength, this.ordering, 0, Math.max(10, (int) (Runtime.getRuntime().freeMemory() / (10 * 1024 * 1024)))); - byte[] key = new byte[keylength]; - int reclen; - long seek = 0; - loop: while (true) { // don't test available() here because this does not work for files > 2GB - - try { - // go to seek position - file.seek(seek); - - // read length of the following record without the length of the record size bytes - reclen = file.readInt(); - //assert reclen > 0 : " reclen == 0 at seek pos " + seek; - if (reclen == 0) { - // very bad file inconsistency - serverLog.logSevere("kelondroBLOBHeap", "reclen == 0 at seek pos " + seek + " in file " + heapFile); - this.file.setLength(seek); // delete everything else at the remaining of the file :-( - break loop; - } - - // read key - file.readFully(key, 0, key.length); - - } catch (final IOException e) { - // EOF reached - break loop; // terminate loop - } - - // check if this record is empty - if (key == null || key[0] == 0) { - // it is an empty record, store to free list - if (reclen > 0) free.put(seek, reclen); - } else { - if (this.ordering.wellformed(key)) { - indexready.consume(key, seek); - key = new byte[keylength]; - } else { - serverLog.logWarning("kelondroBLOBHeap", "BLOB " + heapFile.getName() + ": skiped not wellformed key " + new String(key) + " at seek pos " + seek); - } - } - // new seek position - seek += 4L + reclen; - } - indexready.finish(); - - // do something useful in between - mergeFreeEntries(); - - // finish the index generation - try { - this.index = indexready.result(); - } catch (InterruptedException e) { - 
e.printStackTrace(); - } catch (ExecutionException e) { - e.printStackTrace(); - } - } - private void mergeFreeEntries() throws IOException { // try to merge free entries - if (this.free.size() > 1) { + if (super.free.size() > 1) { int merged = 0; Map.Entry lastFree, nextFree; final Iterator> i = this.free.entrySet().iterator(); @@ -286,21 +129,14 @@ public final class kelondroBLOBHeap implements kelondroBLOB { } - public String name() { - return this.heapFile.getName(); - } - /** * the number of BLOBs in the heap * @return the number of BLOBs in the heap */ public synchronized int size() { - return this.index.size() + this.buffer.size(); + return super.size() + this.buffer.size(); } - public kelondroByteOrder ordering() { - return this.ordering; - } /** * test if a key is in the heap file. This does not need any IO, because it uses only the ram index @@ -313,14 +149,7 @@ public final class kelondroBLOBHeap implements kelondroBLOB { // check the buffer if (this.buffer.containsKey(new String(key))) return true; - - // check if the file index contains the key - try { - return index.getl(key) >= 0; - } catch (final IOException e) { - e.printStackTrace(); - return false; - } + return super.has(key); } /** @@ -331,6 +160,7 @@ public final class kelondroBLOBHeap implements kelondroBLOB { */ private void add(final byte[] key, final byte[] blob) throws IOException { assert blob.length > 0; + assert key.length == this.keylength; assert index.row().primaryKeyLength == key.length : index.row().primaryKeyLength + "!=" + key.length; if ((blob == null) || (blob.length == 0)) return; final int pos = (int) file.length(); @@ -395,34 +225,7 @@ public final class kelondroBLOBHeap implements kelondroBLOB { byte[] blob = this.buffer.get(new String(key)); if (blob != null) return blob; - // check if the index contains the key - final long pos = index.getl(key); - if (pos < 0) return null; - - // access the file and read the container - file.seek(pos); - final int len = file.readInt() - index.row().primaryKeyLength; - if (serverMemory.available() < len) { - if (!serverMemory.request(len, false)) return null; // not enough memory available for this blob - } - - // read the key - final byte[] keyf = new byte[index.row().primaryKeyLength]; - file.readFully(keyf, 0, keyf.length); - if (this.ordering.compare(key, keyf) != 0) { - // verification of the indexed access failed. we must re-read the index - serverLog.logWarning("kelondroBLOBHeap", "verification indexed access for " + heapFile.toString() + " failed, re-building index"); - // this is a severe operation, it should never happen. 
- // but if the process ends in this state, it would completey fail - // if the index is not rebuild now at once - initIndexReadFromHeap(); - } - - // read the blob - blob = new byte[len]; - file.readFully(blob, 0, blob.length); - - return blob; + return super.get(key); } /** @@ -437,14 +240,8 @@ public final class kelondroBLOBHeap implements kelondroBLOB { // check the buffer byte[] blob = this.buffer.get(new String(key)); if (blob != null) return blob.length; - - // check if the index contains the key - final long pos = index.getl(key); - if (pos < 0) return -1; - - // access the file and read the size of the container - file.seek(pos); - return file.readInt() - index.row().primaryKeyLength; + + return super.length(key); } /** @@ -467,31 +264,32 @@ public final class kelondroBLOBHeap implements kelondroBLOB { /** * close the BLOB table - * @throws */ public synchronized void close() { shrinkWithGapsAtEnd(); - try { - flushBuffer(); - } catch (IOException e) { - e.printStackTrace(); - } - try { - file.close(); - } catch (final IOException e) { - e.printStackTrace(); + if (file != null) { + try { + flushBuffer(); + } catch (IOException e) { + e.printStackTrace(); + } + try { + file.close(); + } catch (final IOException e) { + e.printStackTrace(); + } } file = null; - if (index.size() > 3 || free.size() > 3) { + if (index != null && free != null && (index.size() > 3 || free.size() > 3)) { // now we can create a dump of the index and the gap information // to speed up the next start try { long start = System.currentTimeMillis(); - free.dump(fingerprintGapFile(this.heapFile)); + free.dump(kelondroBLOBHeapWriter.fingerprintGapFile(this.heapFile)); free.clear(); free = null; - index.dump(fingerprintIndexFile(this.heapFile)); + index.dump(kelondroBLOBHeapWriter.fingerprintIndexFile(this.heapFile)); serverLog.logInfo("kelondroBLOBHeap", "wrote a dump for the " + this.index.size() + " index entries of " + heapFile.getName()+ " in " + (System.currentTimeMillis() - start) + " milliseconds."); index.close(); index = null; @@ -506,14 +304,6 @@ public final class kelondroBLOBHeap implements kelondroBLOB { index = null; } } - - /** - * ask for the length of the primary key - * @return the length of the key - */ - public int keylength() { - return this.index.row().primaryKeyLength; - } /** * write a whole byte array as BLOB to the table @@ -783,7 +573,8 @@ public final class kelondroBLOBHeap implements kelondroBLOB { * @throws IOException */ public synchronized kelondroCloneableIterator keys(final boolean up, final boolean rotating) throws IOException { - return new kelondroRotateIterator(this.index.keys(up, null), null, this.index.size()); + this.flushBuffer(); + return super.keys(up, rotating); } /** @@ -794,11 +585,12 @@ public final class kelondroBLOBHeap implements kelondroBLOB { * @throws IOException */ public synchronized kelondroCloneableIterator keys(final boolean up, final byte[] firstKey) throws IOException { - return this.index.keys(up, firstKey); + this.flushBuffer(); + return super.keys(up, firstKey); } public long length() throws IOException { - return this.heapFile.length() + this.buffersize; + return super.length() + this.buffersize; } public static void heaptest() { diff --git a/source/de/anomic/kelondro/kelondroBLOBHeapReader.java b/source/de/anomic/kelondro/kelondroBLOBHeapReader.java new file mode 100644 index 000000000..043d039f3 --- /dev/null +++ b/source/de/anomic/kelondro/kelondroBLOBHeapReader.java @@ -0,0 +1,410 @@ +// kelondroBLOBHeapReader.java +// (C) 2008 by Michael Peter 
Christen; mc@yacy.net, Frankfurt a. M., Germany +// first published 30.12.2008 on http://yacy.net +// +// $LastChangedDate: 2008-03-14 01:16:04 +0100 (Fr, 14 Mrz 2008) $ +// $LastChangedRevision: 4558 $ +// $LastChangedBy: orbiter $ +// +// LICENSE +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation; either version 2 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +package de.anomic.kelondro; + +import java.io.BufferedInputStream; +import java.io.DataInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ExecutionException; + +import de.anomic.server.serverMemory; +import de.anomic.server.logging.serverLog; + +public class kelondroBLOBHeapReader { + + protected int keylength; // the length of the primary key + protected kelondroBytesLongMap index; // key/seek relation for used records + protected kelondroBLOBGap free; // set of {seek, size} pairs denoting space and position of free records + protected final File heapFile; // the file of the heap + protected final kelondroByteOrder ordering; // the ordering on keys + protected kelondroCachedFileRA file; // a random access to the file + + public kelondroBLOBHeapReader( + final File heapFile, + final int keylength, + final kelondroByteOrder ordering) throws IOException { + this.ordering = ordering; + this.heapFile = heapFile; + this.keylength = keylength; + this.index = null; // will be created as result of initialization process + this.free = null; // will be initialized later depending on existing idx/gap file + this.file = new kelondroCachedFileRA(heapFile); + + // read or initialize the index + if (initIndexReadDump(heapFile)) { + // verify that everything worked just fine + // pick some elements of the index + Iterator i = this.index.keys(true, null); + int c = 3; + byte[] b, b1 = new byte[index.row().primaryKeyLength]; + long pos; + boolean ok = true; + while (i.hasNext() && c-- > 0) { + b = i.next(); + pos = this.index.getl(b); + file.seek(pos + 4); + file.readFully(b1, 0, b1.length); + if (this.ordering.compare(b, b1) != 0) { + ok = false; + break; + } + } + if (!ok) { + serverLog.logWarning("kelondroBLOBHeap", "verification of idx file for " + heapFile.toString() + " failed, re-building index"); + initIndexReadFromHeap(); + } else { + serverLog.logInfo("kelondroBLOBHeap", "using a dump of the index of " + heapFile.toString() + "."); + } + } else { + // if we did not have a dump, create a new index + initIndexReadFromHeap(); + } + } + + private boolean initIndexReadDump(File f) { + // look for an index dump and read it if it exist + // if this is successfull, return true; otherwise false + File fif = kelondroBLOBHeapWriter.fingerprintIndexFile(f); + File fgf = kelondroBLOBHeapWriter.fingerprintGapFile(f); + if (!fif.exists() || !fgf.exists()) { + kelondroBLOBHeapWriter.deleteAllFingerprints(f); + return false; + } + 
+ // there is an index and a gap file: + // read the index file: + try { + this.index = new kelondroBytesLongMap(this.keylength, this.ordering, fif); + } catch (IOException e) { + e.printStackTrace(); + return false; + } + // an index file is a one-time throw-away object, so just delete it now + fif.delete(); + + // read the gap file: + try { + this.free = new kelondroBLOBGap(fgf); + } catch (IOException e) { + e.printStackTrace(); + return false; + } + // same with gap file + fgf.delete(); + + // everything is fine now + return this.index.size() > 0; + } + + private void initIndexReadFromHeap() throws IOException { + // this initializes the this.index object by reading positions from the heap file + + this.free = new kelondroBLOBGap(); + kelondroBytesLongMap.initDataConsumer indexready = kelondroBytesLongMap.asynchronusInitializer(keylength, this.ordering, 0, Math.max(10, (int) (Runtime.getRuntime().freeMemory() / (10 * 1024 * 1024)))); + byte[] key = new byte[keylength]; + int reclen; + long seek = 0; + loop: while (true) { // don't test available() here because this does not work for files > 2GB + + try { + // go to seek position + file.seek(seek); + + // read length of the following record without the length of the record size bytes + reclen = file.readInt(); + //assert reclen > 0 : " reclen == 0 at seek pos " + seek; + if (reclen == 0) { + // very bad file inconsistency + serverLog.logSevere("kelondroBLOBHeap", "reclen == 0 at seek pos " + seek + " in file " + heapFile); + this.file.setLength(seek); // delete everything else at the remaining of the file :-( + break loop; + } + + // read key + file.readFully(key, 0, key.length); + + } catch (final IOException e) { + // EOF reached + break loop; // terminate loop + } + + // check if this record is empty + if (key == null || key[0] == 0) { + // it is an empty record, store to free list + if (reclen > 0) free.put(seek, reclen); + } else { + if (this.ordering.wellformed(key)) { + indexready.consume(key, seek); + key = new byte[keylength]; + } else { + serverLog.logWarning("kelondroBLOBHeap", "BLOB " + heapFile.getName() + ": skiped not wellformed key " + new String(key) + " at seek pos " + seek); + } + } + // new seek position + seek += 4L + reclen; + } + indexready.finish(); + + // finish the index generation + try { + this.index = indexready.result(); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (ExecutionException e) { + e.printStackTrace(); + } + } + + public String name() { + return this.heapFile.getName(); + } + + /** + * the number of BLOBs in the heap + * @return the number of BLOBs in the heap + */ + public synchronized int size() { + return this.index.size(); + } + + /** + * test if a key is in the heap file. 
This does not need any IO, because it uses only the ram index + * @param key + * @return true if the key exists, false otherwise + */ + public synchronized boolean has(final byte[] key) { + assert index != null; + assert index.row().primaryKeyLength == key.length : index.row().primaryKeyLength + "!=" + key.length; + + // check if the file index contains the key + try { + return index.getl(key) >= 0; + } catch (final IOException e) { + e.printStackTrace(); + return false; + } + } + + public kelondroByteOrder ordering() { + return this.ordering; + } + + /** + * read a blob from the heap + * @param key + * @return + * @throws IOException + */ + public synchronized byte[] get(final byte[] key) throws IOException { + assert index.row().primaryKeyLength == key.length : index.row().primaryKeyLength + "!=" + key.length; + + // check if the index contains the key + final long pos = index.getl(key); + if (pos < 0) return null; + + // access the file and read the container + file.seek(pos); + final int len = file.readInt() - index.row().primaryKeyLength; + if (serverMemory.available() < len) { + if (!serverMemory.request(len, false)) return null; // not enough memory available for this blob + } + + // read the key + final byte[] keyf = new byte[index.row().primaryKeyLength]; + file.readFully(keyf, 0, keyf.length); + if (this.ordering.compare(key, keyf) != 0) { + // verification of the indexed access failed. we must re-read the index + serverLog.logWarning("kelondroBLOBHeap", "verification indexed access for " + heapFile.toString() + " failed, re-building index"); + // this is a severe operation, it should never happen. + // but if the process ends in this state, it would completely fail + // if the index is not rebuild now at once + initIndexReadFromHeap(); + } + + // read the blob + byte[] blob = new byte[len]; + file.readFully(blob, 0, blob.length); + + return blob; + } + + /** + * retrieve the size of the BLOB + * @param key + * @return the size of the BLOB or -1 if the BLOB does not exist + * @throws IOException + */ + public long length(byte[] key) throws IOException { + assert index.row().primaryKeyLength == key.length : index.row().primaryKeyLength + "!=" + key.length; + + // check if the index contains the key + final long pos = index.getl(key); + if (pos < 0) return -1; + + // access the file and read the size of the container + file.seek(pos); + return file.readInt() - index.row().primaryKeyLength; + } + + /** + * close the BLOB table + */ + public synchronized void close() { + if (file != null) try { + file.close(); + } catch (final IOException e) { + e.printStackTrace(); + } + file = null; + + free.clear(); + free = null; + index.close(); + index = null; + } + + /** + * ask for the length of the primary key + * @return the length of the key + */ + public int keylength() { + return this.index.row().primaryKeyLength; + } + + /** + * iterator over all keys + * @param up + * @param rotating + * @return + * @throws IOException + */ + public synchronized kelondroCloneableIterator keys(final boolean up, final boolean rotating) throws IOException { + return new kelondroRotateIterator(this.index.keys(up, null), null, this.index.size()); + } + + /** + * iterate over all keys + * @param up + * @param firstKey + * @return + * @throws IOException + */ + public synchronized kelondroCloneableIterator keys(final boolean up, final byte[] firstKey) throws IOException { + return this.index.keys(up, firstKey); + } + + public long length() throws IOException { + return this.heapFile.length(); + } + + /** + * 
static iterator of entries in BLOBHeap files: + * this is used to import heap dumps into a write-enabled index heap + */ + public static class entries implements Iterator>, Iterable> { + DataInputStream is; + int keylen; + Map.Entry nextEntry; + + public entries(final File blobFile, final int keylen) throws IOException { + if (!(blobFile.exists())) throw new IOException("file " + blobFile + " does not exist"); + this.is = new DataInputStream(new BufferedInputStream(new FileInputStream(blobFile), 1024*1024)); + this.keylen = keylen; + this.nextEntry = next0(); + } + + public boolean hasNext() { + return this.nextEntry != null; + } + + private Map.Entry next0() { + try { + while (true) { + int len = is.readInt(); + byte[] key = new byte[this.keylen]; + if (is.read(key) < key.length) return null; + byte[] payload = new byte[len - this.keylen]; + if (is.read(payload) < payload.length) return null; + if (key[0] == 0) continue; // this is an empty gap + return new entry(new String(key), payload); + } + } catch (final IOException e) { + return null; + } + } + + public Map.Entry next() { + final Map.Entry n = this.nextEntry; + this.nextEntry = next0(); + return n; + } + + public void remove() { + throw new UnsupportedOperationException("blobs cannot be altered during read-only iteration"); + } + + public Iterator> iterator() { + return this; + } + + public void close() { + if (is != null) try { is.close(); } catch (final IOException e) {} + is = null; + } + + protected void finalize() { + this.close(); + } + } + + public static class entry implements Map.Entry { + private String s; + private byte[] b; + + public entry(final String s, final byte[] b) { + this.s = s; + this.b = b; + } + + public String getKey() { + return s; + } + + public byte[] getValue() { + return b; + } + + public byte[] setValue(byte[] value) { + byte[] b1 = b; + b = value; + return b1; + } + } + +} diff --git a/source/de/anomic/kelondro/kelondroBLOBHeapWriter.java b/source/de/anomic/kelondro/kelondroBLOBHeapWriter.java new file mode 100644 index 000000000..b791ec7bb --- /dev/null +++ b/source/de/anomic/kelondro/kelondroBLOBHeapWriter.java @@ -0,0 +1,151 @@ +// kelondroBLOBHeapWriter.java +// (C) 2008 by Michael Peter Christen; mc@yacy.net, Frankfurt a. M., Germany +// first published 30.12.2008 on http://yacy.net +// +// $LastChangedDate: 2008-03-14 01:16:04 +0100 (Fr, 14 Mrz 2008) $ +// $LastChangedRevision: 4558 $ +// $LastChangedBy: orbiter $ +// +// LICENSE +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation; either version 2 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. 
+// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +package de.anomic.kelondro; + +import java.io.BufferedOutputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; + +import de.anomic.server.logging.serverLog; + +public final class kelondroBLOBHeapWriter { + + private int keylength; // the length of the primary key + private kelondroBytesLongMap index; // key/seek relation for used records + private final File heapFile; // the file of the heap + private DataOutputStream os; // the output stream where the BLOB is written + private long seek; // the current write position + + /* + * This class implements a BLOB management based on a sequence of records + * The data structure is: + * file :== record* + * record :== reclen key blob + * reclen :== <4 byte integer == length of key and blob> + * key :== + * blob :== + * that means that each record has the size reclen+4 + * + * Because the blob sizes are stored with integers, one entry may not exceed 2GB + * + * With this class a BLOB file can only be written. + * To read them, use a kelondroBLOBHeapReader. + * A BLOBHeap can be also read and write in random access mode with kelondroBLOBHeap. + */ + + /** + * create a heap file: a arbitrary number of BLOBs, indexed by an access key + * The heap file will be indexed upon initialization. + * @param heapFile + * @param keylength + * @param ordering + * @throws IOException + */ + public kelondroBLOBHeapWriter(final File heapFile, final int keylength, final kelondroByteOrder ordering) throws IOException { + this.heapFile = heapFile; + this.keylength = keylength; + this.index = new kelondroBytesLongMap(keylength, ordering, 10); + this.os = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(heapFile), 1024 * 1024)); + this.seek = 0; + } + + /** + * add a BLOB to the heap: this adds the blob always to the end of the file + * newly added heap entries must have keys that have not been added before + * @param key + * @param blob + * @throws IOException + */ + public void add(final byte[] key, final byte[] blob) throws IOException { + assert blob.length > 0; + assert key.length == this.keylength; + assert index.row().primaryKeyLength == key.length : index.row().primaryKeyLength + "!=" + key.length; + assert index.getl(key) < 0; // must not occur before + if ((blob == null) || (blob.length == 0)) return; + int chunkl = key.length + blob.length; + os.writeInt(chunkl); + os.write(key); + os.write(blob); + index.addl(key, seek); + this.seek += chunkl + 4; + } + + protected static File fingerprintIndexFile(File f) { + return new File(f.getParentFile(), f.getName() + "." + fingerprintFileHash(f) + ".idx"); + } + + protected static File fingerprintGapFile(File f) { + return new File(f.getParentFile(), f.getName() + "." 
+ fingerprintFileHash(f) + ".gap"); + } + + protected static String fingerprintFileHash(File f) { + return kelondroDigest.fastFingerprintB64(f, false).substring(0, 12); + } + + public static void deleteAllFingerprints(File f) { + File d = f.getParentFile(); + String n = f.getName(); + String[] l = d.list(); + for (int i = 0; i < l.length; i++) { + if (l[i].startsWith(n) && (l[i].endsWith(".idx") || l[i].endsWith(".gap"))) new File(d, l[i]).delete(); + } + } + + /** + * close the BLOB table + * @throws + */ + public synchronized void close() { + try { + os.flush(); + os.close(); + } catch (final IOException e) { + e.printStackTrace(); + } + os = null; + + if (index.size() > 3) { + // now we can create a dump of the index and the gap information + // to speed up the next start + try { + long start = System.currentTimeMillis(); + new kelondroBLOBGap().dump(fingerprintGapFile(this.heapFile)); + index.dump(fingerprintIndexFile(this.heapFile)); + serverLog.logInfo("kelondroBLOBHeapWriter", "wrote a dump for the " + this.index.size() + " index entries of " + heapFile.getName()+ " in " + (System.currentTimeMillis() - start) + " milliseconds."); + index.close(); + index = null; + } catch (IOException e) { + e.printStackTrace(); + } + } else { + // this is small.. just free resources, do not write index + index.close(); + index = null; + } + } + +} diff --git a/source/de/anomic/kelondro/kelondroCollectionIndex.java b/source/de/anomic/kelondro/kelondroCollectionIndex.java index a040b907c..74a037d0f 100644 --- a/source/de/anomic/kelondro/kelondroCollectionIndex.java +++ b/source/de/anomic/kelondro/kelondroCollectionIndex.java @@ -776,7 +776,8 @@ public class kelondroCollectionIndex { serverLog.logSevere("kelondroCollectionIndex." + array.filename, "lost a RowCollection because of a bad arraykey (error #" + indexErrors + ")"); return new kelondroRowSet(this.payloadrow, 0); } - final kelondroRowSet collection = new kelondroRowSet(this.payloadrow, arrayrow, 1); // FIXME: this does not yet work with different rowdef in case of several rowdef.objectsize() + + final kelondroRowSet collection = new kelondroRowSet(this.payloadrow, arrayrow); // FIXME: this does not yet work with different rowdef in case of several rowdef.objectsize() if ((!(index.row().objectOrder.wellformed(indexkey))) || (index.row().objectOrder.compare(arraykey, indexkey) != 0)) { // check if we got the right row; this row is wrong. 
Fix it: index.remove(indexkey); // the wrong row cannot be fixed diff --git a/source/de/anomic/kelondro/kelondroRowCollection.java b/source/de/anomic/kelondro/kelondroRowCollection.java index 2689e8f08..4719bae36 100644 --- a/source/de/anomic/kelondro/kelondroRowCollection.java +++ b/source/de/anomic/kelondro/kelondroRowCollection.java @@ -26,7 +26,6 @@ package de.anomic.kelondro; import java.io.File; import java.io.IOException; -import java.io.InputStream; import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -64,9 +63,8 @@ public class kelondroRowCollection { private static final int exp_last_read = 1; private static final int exp_last_wrote = 2; private static final int exp_order_type = 3; - private static final int exp_order_col = 4; - private static final int exp_order_bound = 5; - private static final int exp_collection = 6; + private static final int exp_order_bound = 4; + private static final int exp_collection = 5; public kelondroRowCollection(final kelondroRowCollection rc) { this.rowdef = rc.rowdef; @@ -95,12 +93,12 @@ public class kelondroRowCollection { this.lastTimeWrote = System.currentTimeMillis(); } - public kelondroRowCollection(final kelondroRow rowdef, final kelondroRow.Entry exportedCollectionRowEnvironment, final int columnInEnvironment) { + public kelondroRowCollection(final kelondroRow rowdef, final kelondroRow.Entry exportedCollectionRowEnvironment) { + final int chunkcachelength = exportedCollectionRowEnvironment.cellwidth(1) - exportOverheadSize; + final kelondroRow.Entry exportedCollection = exportRow(chunkcachelength).newEntry(exportedCollectionRowEnvironment, 1); + this.rowdef = rowdef; - final int chunkcachelength = exportedCollectionRowEnvironment.cellwidth(columnInEnvironment) - exportOverheadSize; - final kelondroRow.Entry exportedCollection = exportRow(chunkcachelength).newEntry(exportedCollectionRowEnvironment, columnInEnvironment); this.chunkcount = (int) exportedCollection.getColLong(exp_chunkcount); - //assert (this.chunkcount <= chunkcachelength / rowdef.objectsize) : "chunkcount = " + this.chunkcount + ", chunkcachelength = " + chunkcachelength + ", rowdef.objectsize = " + rowdef.objectsize; if ((this.chunkcount > chunkcachelength / rowdef.objectsize)) { serverLog.logWarning("RowCollection", "corrected wrong chunkcount; chunkcount = " + this.chunkcount + ", chunkcachelength = " + chunkcachelength + ", rowdef.objectsize = " + rowdef.objectsize); this.chunkcount = chunkcachelength / rowdef.objectsize; // patch problem @@ -117,10 +115,7 @@ public class kelondroRowCollection { } if ((rowdef.objectOrder != null) && (oldOrder != null) && (!(rowdef.objectOrder.signature().equals(oldOrder.signature())))) throw new kelondroException("old collection order does not match with new order; objectOrder.signature = " + rowdef.objectOrder.signature() + ", oldOrder.signature = " + oldOrder.signature()); - if (rowdef.primaryKeyIndex != (int) exportedCollection.getColLong(exp_order_col)) - throw new kelondroException("old collection primary key does not match with new primary key"); this.sortBound = (int) exportedCollection.getColLong(exp_order_bound); - //assert (sortBound <= chunkcount) : "sortBound = " + sortBound + ", chunkcount = " + chunkcount; if (sortBound > chunkcount) { serverLog.logWarning("RowCollection", "corrected wrong sortBound; sortBound = " + sortBound + ", chunkcount = " + chunkcount); this.sortBound = chunkcount; @@ -155,8 +150,7 @@ public class kelondroRowCollection { "short lastread-2 {b256}," + // as daysSince2000 
"short lastwrote-2 {b256}," + // as daysSince2000 "byte[] orderkey-2," + - "short ordercol-2 {b256}," + - "short orderbound-2 {b256}," + + "int orderbound-4 {b256}," + "byte[] collection-" + chunkcachelength, kelondroNaturalOrder.naturalOrder, 0 ); @@ -176,35 +170,11 @@ public class kelondroRowCollection { entry.setCol(exp_last_read, daysSince2000(this.lastTimeRead)); entry.setCol(exp_last_wrote, daysSince2000(this.lastTimeWrote)); entry.setCol(exp_order_type, (this.rowdef.objectOrder == null) ? "__".getBytes() :this.rowdef.objectOrder.signature().getBytes()); - entry.setCol(exp_order_col, this.rowdef.primaryKeyIndex); entry.setCol(exp_order_bound, this.sortBound); entry.setCol(exp_collection, this.chunkcache); return entry.bytes(); } - public static kelondroRowCollection importCollection(final InputStream is, final kelondroRow rowdef) throws IOException { - final byte[] byte2 = new byte[2]; - final byte[] byte4 = new byte[4]; - int bytesRead; - bytesRead = is.read(byte4); final int size = (int) kelondroNaturalOrder.decodeLong(byte4); - assert bytesRead == byte4.length; - bytesRead = is.read(byte2); //short lastread = (short) kelondroNaturalOrder.decodeLong(byte2); - assert bytesRead == byte2.length; - bytesRead = is.read(byte2); //short lastwrote = (short) kelondroNaturalOrder.decodeLong(byte2); - assert bytesRead == byte2.length; - bytesRead = is.read(byte2); //String orderkey = new String(byte2); - assert bytesRead == byte2.length; - bytesRead = is.read(byte2); final short ordercol = (short) kelondroNaturalOrder.decodeLong(byte2); - assert bytesRead == byte2.length; - bytesRead = is.read(byte2); final short orderbound = (short) kelondroNaturalOrder.decodeLong(byte2); - assert bytesRead == byte2.length; - assert rowdef.primaryKeyIndex == ordercol; - final byte[] chunkcache = new byte[size * rowdef.objectsize]; - bytesRead = is.read(chunkcache); - assert bytesRead == chunkcache.length; - return new kelondroRowCollection(rowdef, size, chunkcache, orderbound); - } - public void saveCollection(final File file) throws IOException { serverFileUtils.copy(exportCollection(), file); } @@ -284,7 +254,8 @@ public class kelondroRowCollection { assert (index >= 0) : "set: access with index " + index + " is below zero"; ensureSize(index + 1); a.writeToArray(chunkcache, index * rowdef.objectsize); - if (index >= chunkcount) chunkcount = index + 1; + if (index >= this.chunkcount) this.chunkcount = index + 1; + if (index < this.sortBound) this.sortBound = index; this.lastTimeWrote = System.currentTimeMillis(); } @@ -322,17 +293,20 @@ public class kelondroRowCollection { assert (!(serverLog.allZero(a, astart, alength))) : "a = " + serverLog.arrayList(a, astart, alength); assert (alength > 0); assert (astart + alength <= a.length); - /* - if (bugappearance(a, astart, alength)) { - serverLog.logWarning("RowCollection", "wrong a = " + serverLog.arrayList(a, astart, alength)); - //return false; // TODO: this is temporary; remote peers may still submit bad entries - } - */ - //assert (!(bugappearance(a, astart, alength))) : "a = " + serverLog.arrayList(a, astart, alength); final int l = Math.min(rowdef.objectsize, Math.min(alength, a.length - astart)); ensureSize(chunkcount + 1); System.arraycopy(a, astart, chunkcache, rowdef.objectsize * chunkcount, l); chunkcount++; + // if possible, increase the sortbound value to suppress unnecessary sorting + if (this.chunkcount == 1) { + assert this.sortBound == 0; + this.sortBound = 1; + } else if ( + this.sortBound + 1 == chunkcount && + 
this.rowdef.objectOrder.compare(chunkcache, rowdef.objectsize * (chunkcount - 2), rowdef.primaryKeyLength, + chunkcache, rowdef.objectsize * (chunkcount - 1), rowdef.primaryKeyLength) == -1) { + this.sortBound = chunkcount; + } this.lastTimeWrote = System.currentTimeMillis(); } diff --git a/source/de/anomic/kelondro/kelondroRowSet.java b/source/de/anomic/kelondro/kelondroRowSet.java index 5fa16e79c..e839e0256 100644 --- a/source/de/anomic/kelondro/kelondroRowSet.java +++ b/source/de/anomic/kelondro/kelondroRowSet.java @@ -25,7 +25,6 @@ package de.anomic.kelondro; import java.io.DataInput; -import java.io.DataInputStream; import java.io.IOException; import java.util.Date; import java.util.Iterator; @@ -57,8 +56,14 @@ public class kelondroRowSet extends kelondroRowCollection implements kelondroInd this.profile = new kelondroProfile(); } - public kelondroRowSet(final kelondroRow rowdef, final kelondroRow.Entry exportedCollectionRowEnvironment, final int columnInEnvironment) { - super(rowdef, exportedCollectionRowEnvironment, columnInEnvironment); + /** + * import an exported collection + * @param rowdef + * @param exportedCollectionRowEnvironment + * @param columnInEnvironment + */ + public kelondroRowSet(final kelondroRow rowdef, final kelondroRow.Entry exportedCollectionRowEnvironment) { + super(rowdef, exportedCollectionRowEnvironment); assert rowdef.objectOrder != null; this.profile = new kelondroProfile(); } @@ -74,33 +79,25 @@ public class kelondroRowSet extends kelondroRowCollection implements kelondroInd } public static kelondroRowSet importRowSet(final DataInput is, final kelondroRow rowdef) throws IOException { - final byte[] byte2 = new byte[2]; - final byte[] byte4 = new byte[4]; - is.readFully(byte4); final int size = (int) kelondroNaturalOrder.decodeLong(byte4); - is.readFully(byte2); //short lastread = (short) kelondroNaturalOrder.decodeLong(byte2); - is.readFully(byte2); //short lastwrote = (short) kelondroNaturalOrder.decodeLong(byte2); - is.readFully(byte2); //String orderkey = new String(byte2); - is.readFully(byte2); final short ordercol = (short) kelondroNaturalOrder.decodeLong(byte2); - is.readFully(byte2); final short orderbound = (short) kelondroNaturalOrder.decodeLong(byte2); - assert rowdef.primaryKeyIndex == ordercol; + final byte[] byte6 = new byte[6]; + final int size = is.readInt(); + is.readFully(byte6); + //short lastread = (short) kelondroNaturalOrder.decodeLong(byte2); + //short lastwrote = (short) kelondroNaturalOrder.decodeLong(byte2); + //String orderkey = new String(byte2); + final int orderbound = is.readInt(); final byte[] chunkcache = new byte[size * rowdef.objectsize]; is.readFully(chunkcache); return new kelondroRowSet(rowdef, size, chunkcache, orderbound); } - public static int skipNextRowSet(final DataInputStream is, final kelondroRow rowdef) throws IOException { - final byte[] byte2 = new byte[2]; - final byte[] byte4 = new byte[4]; - is.readFully(byte4); final int size = (int) kelondroNaturalOrder.decodeLong(byte4); - is.readFully(byte2); //short lastread = (short) kelondroNaturalOrder.decodeLong(byte2); - is.readFully(byte2); //short lastwrote = (short) kelondroNaturalOrder.decodeLong(byte2); - is.readFully(byte2); //String orderkey = new String(byte2); - is.readFully(byte2); final short ordercol = (short) kelondroNaturalOrder.decodeLong(byte2); - is.readFully(byte2); - assert rowdef.primaryKeyIndex == ordercol; - int skip = size * rowdef.objectsize; - while (skip > 0) skip -= is.skip(skip); - return size * rowdef.objectsize + 14; + public 
static kelondroRowSet importRowSet(byte[] b, final kelondroRow rowdef) throws IOException { + final int size = (int) kelondroNaturalOrder.decodeLong(b, 0, 4); + final int orderbound = (int) kelondroNaturalOrder.decodeLong(b, 10, 4); + final byte[] chunkcache = new byte[size * rowdef.objectsize]; + assert b.length - exportOverheadSize == size * rowdef.objectsize; + System.arraycopy(b, 14, chunkcache, 0, chunkcache.length); + return new kelondroRowSet(rowdef, size, chunkcache, orderbound); } public void reset() { diff --git a/source/de/anomic/plasma/plasmaSwitchboard.java b/source/de/anomic/plasma/plasmaSwitchboard.java index 308c3d78d..03292a724 100644 --- a/source/de/anomic/plasma/plasmaSwitchboard.java +++ b/source/de/anomic/plasma/plasmaSwitchboard.java @@ -583,7 +583,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch