From 9b0c4b1063f3ab8aa725f00fbf93c37cf88d8a4d Mon Sep 17 00:00:00 2001 From: orbiter Date: Sun, 19 Oct 2008 22:30:44 +0000 Subject: [PATCH] redesign of parts of the new BLOB buffer git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@5287 6c8d7289-2bf4-0310-a012-ef5d649a1542 --- source/de/anomic/kelondro/kelondroBLOB.java | 6 + .../de/anomic/kelondro/kelondroBLOBArray.java | 4 + .../anomic/kelondro/kelondroBLOBBuffer.java | 231 +++++++++--------- .../de/anomic/kelondro/kelondroBLOBHeap.java | 6 +- .../de/anomic/kelondro/kelondroBLOBTree.java | 4 + .../anomic/kelondro/kelondroBytesIntMap.java | 4 + source/de/anomic/plasma/plasmaHTCache.java | 1 - 7 files changed, 132 insertions(+), 124 deletions(-) diff --git a/source/de/anomic/kelondro/kelondroBLOB.java b/source/de/anomic/kelondro/kelondroBLOB.java index 74e1385a8..5cd6f4a1e 100644 --- a/source/de/anomic/kelondro/kelondroBLOB.java +++ b/source/de/anomic/kelondro/kelondroBLOB.java @@ -36,6 +36,12 @@ public interface kelondroBLOB { */ public int keylength(); + /** + * return the underlying odering + * @return + */ + public kelondroByteOrder ordering(); + /** * clears the content of the database * @throws IOException diff --git a/source/de/anomic/kelondro/kelondroBLOBArray.java b/source/de/anomic/kelondro/kelondroBLOBArray.java index 6f36c870e..c01ef48e4 100755 --- a/source/de/anomic/kelondro/kelondroBLOBArray.java +++ b/source/de/anomic/kelondro/kelondroBLOBArray.java @@ -148,6 +148,10 @@ public class kelondroBLOBArray implements kelondroBLOB { return s; } + public kelondroByteOrder ordering() { + return this.ordering; + } + private class blobItem { Date creation; File location; diff --git a/source/de/anomic/kelondro/kelondroBLOBBuffer.java b/source/de/anomic/kelondro/kelondroBLOBBuffer.java index 9a6d79be3..93cac6d09 100644 --- a/source/de/anomic/kelondro/kelondroBLOBBuffer.java +++ b/source/de/anomic/kelondro/kelondroBLOBBuffer.java @@ -35,7 +35,6 @@ import java.io.OutputStream; import java.util.Iterator; import java.util.Map; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.Semaphore; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; @@ -45,13 +44,12 @@ public class kelondroBLOBBuffer extends Thread implements kelondroBLOB { static byte[] plainMagic = {(byte) 'p', (byte) '|'}; // magic for plain content (no encoding) private kelondroBLOB backend; - private boolean executing, shallRun; - private Semaphore shutdownControl; // steering of close method - private boolean compress; // if true then files should be written compressed private LinkedBlockingQueue> rawQueue; // entries which are not compressed, format is RAW (without magic) private LinkedBlockingQueue> compressedQueue; // entries which are compressed, format is with leading magic + private kelondroBytesIntMap contentLength; private long queueLength; private long maxCacheSize; + private int cdr; private class Entry implements Map.Entry { @@ -81,41 +79,27 @@ public class kelondroBLOBBuffer extends Thread implements kelondroBLOB { public kelondroBLOBBuffer(kelondroBLOB backend, long cachesize, boolean compress) { this.backend = backend; this.maxCacheSize = cachesize; - this.compress = compress; - this.executing = false; - this.shallRun = false; - this.shutdownControl = null; - assert !this.executing || this.compress; - initQueues(); + cdr = 0; + initQueues(compress); } public synchronized void clear() throws IOException { - shutdown(); - initQueues(); + initQueues(this.compressedQueue != null); this.backend.clear(); - if (this.executing) this.start(); } - private void initQueues() { - /* - * executing = false, compress = false: Queue = used, CompressedQueue = null - * files are written in the uncompressed-queue and are written from there into the database - * - * executing = false, compress = true : Queue = null, CompressedQueue = used - * files are compressed when they arrive and written to the compressed queue - * - * executing = true, compress = false: status is not allowed, this does not make sense because the additional thread is only for compression of files - * - * executing = true, compress = true : Queue = used, CompressedQueue = used - * files are written uncompressed to the uncompressed-queue and compressed with the concurrent thread - */ + private void initQueues(boolean compress) { this.rawQueue = new LinkedBlockingQueue>(); this.compressedQueue = (compress) ? new LinkedBlockingQueue>() : null; + this.contentLength = new kelondroBytesIntMap(backend.keylength(), backend.ordering(), 500); this.queueLength = 0; } + public kelondroByteOrder ordering() { + return this.backend.ordering(); + } + public synchronized void close() { - shutdown(); // no more thread is running, flush all queues try { flushAll(); @@ -125,18 +109,11 @@ public class kelondroBLOBBuffer extends Thread implements kelondroBLOB { this.backend.close(); } - private void shutdown() { - this.shallRun = false; - if (this.executing && this.shutdownControl != null) { - // wait for semaphore - try {this.shutdownControl.acquire();} catch (InterruptedException e) {} - } - } - private byte[] compress(byte[] b) { // compressed a byte array and adds a leading magic for the compression try { - System.out.print("/"); // DEBUG + cdr++; + //System.out.print("/(" + cdr + ")"); // DEBUG final ByteArrayOutputStream baos = new ByteArrayOutputStream(b.length / 5); baos.write(gzipMagic); final OutputStream os = new GZIPOutputStream(baos, 512); @@ -151,7 +128,7 @@ public class kelondroBLOBBuffer extends Thread implements kelondroBLOB { } private byte[] markWithPlainMagic(byte[] b) { - System.out.print("+"); // DEBUG + //System.out.print("+"); // DEBUG byte[] r = new byte[b.length + 2]; r[0] = plainMagic[0]; r[1] = plainMagic[1]; @@ -162,7 +139,8 @@ public class kelondroBLOBBuffer extends Thread implements kelondroBLOB { private byte[] decompress(byte[] b) { // use a magic in the head of the bytes to identify compression type if (kelondroByteArray.equals(b, gzipMagic)) { - System.out.print("\\"); // DEBUG + //System.out.print("\\"); // DEBUG + cdr--; ByteArrayInputStream bais = new ByteArrayInputStream(b); // eat up the magic bais.read(); @@ -204,18 +182,32 @@ public class kelondroBLOBBuffer extends Thread implements kelondroBLOB { } return null; } - - private byte[] getFromQueues(byte[] key) throws IOException { - byte[] b = (rawQueue == null) ? null : getFromQueue(key, rawQueue); - if (b != null) return b; - b = (compressedQueue == null) ? null : getFromQueue(key, compressedQueue); - if (b != null) return decompress(b); - return null; - } - + public synchronized byte[] get(byte[] key) throws IOException { - byte[] b = getFromQueues(key); - if (b != null) return b; + // depending on the source of the result, we additionally do entry compression + // because if a document was read once, we think that it will not be retrieved another time again soon + byte[] b; + if (this.compressedQueue == null) { + b = getFromQueue(key, rawQueue); + if (b != null) return b; + } else { + b = removeFromQueue(key, rawQueue); + if (b != null) { + // put the entry on the compressed queue + byte[] bb = compress(b); + this.compressedQueue.add(new Entry(key, bb)); + this.queueLength = this.queueLength - b.length + bb.length; + return b; + } + } + // no special handling for elements from the compressed queue + b = (compressedQueue == null) ? null : getFromQueue(key, compressedQueue); + if (b != null) { + //System.out.print("CASEA"); // DEBUG + return decompress(b); + } + + // finally return from the backend b = this.backend.get(key); if (b == null) return null; return decompress(b); @@ -252,10 +244,18 @@ public class kelondroBLOBBuffer extends Thread implements kelondroBLOB { } public synchronized long length(byte[] key) throws IOException { - byte[] b = (rawQueue == null) ? null : getFromQueue(key, rawQueue); + int i = this.contentLength.geti(key); + if (i >= 0) { + //System.out.print("CASEC"); // DEBUG + return (long) i; + } + byte[] b = getFromQueue(key, rawQueue); if (b != null) return b.length; b = (compressedQueue == null) ? null : getFromQueue(key, compressedQueue); - if (b != null) return decompress(b).length; + if (b != null) { + //System.out.print("CASEB"); // DEBUG + return decompress(b).length; + } return this.backend.length(key); } @@ -272,35 +272,48 @@ public class kelondroBLOBBuffer extends Thread implements kelondroBLOB { return null; } - private boolean removeFromQueues(byte[] key) throws IOException { - byte[] b = (rawQueue == null) ? null : removeFromQueue(key, rawQueue); - if (b != null) return true; + private int removeFromQueues(byte[] key) throws IOException { + byte[] b = removeFromQueue(key, rawQueue); + if (b != null) return b.length; b = (compressedQueue == null) ? null : removeFromQueue(key, compressedQueue); - if (b != null) return true; - return false; + if (b != null) return b.length; + return 0; } public synchronized void put(byte[] key, byte[] b) throws IOException { - assert !this.executing || this.compress; // first ensure that the files do not exist anywhere this.backend.remove(key); - boolean existedInQueue = removeFromQueues(key); - if (existedInQueue) this.queueLength -= b.length; // this is only an approximation + long rx = removeFromQueues(key); + if (rx > 0) this.queueLength -= rx; // check if the buffer is full or could be full after this write - if (this.queueLength + b.length * 2 > this.maxCacheSize) flushAll(); + if (this.queueLength + b.length * 2 > this.maxCacheSize) { + // in case that we compress, just compress as much as is necessary to get enough room + if (this.compressedQueue == null) { + flushAll(); + } else { + while (this.queueLength + b.length * 2 > this.maxCacheSize && this.rawQueue.size() > 0) { + flushOneRaw(); + } + // in case that this was not enough, just flush all + if (this.queueLength + b.length * 2 > this.maxCacheSize) flushAll(); + } + } // files are written uncompressed to the uncompressed-queue // they are either written uncompressed to the database - // or compressed with the concurrent thread and written later + // or compressed later this.rawQueue.add(new Entry(key, b)); this.queueLength += b.length; + this.contentLength.puti(key, b.length); + if (this.contentLength.size() > 500) this.contentLength.clear(); // prevent the case that this object becomes a memory leak } public synchronized void remove(byte[] key) throws IOException { this.backend.remove(key); - removeFromQueues(key); + long rx = removeFromQueues(key); + if (rx > 0) this.queueLength -= rx; } public int size() { @@ -317,76 +330,50 @@ public class kelondroBLOBBuffer extends Thread implements kelondroBLOB { return this.backend.keys(up, firstKey); } - private boolean flushOne(boolean block) throws IOException { - if (rawQueue.size() > 0) { - // files are compressed when they arrive and written to the compressed queue - return flushOneRaw(block); - } else { - return flushOneCompressed(block); - } - } - - private boolean flushOneRaw(boolean block) throws IOException { - if (this.rawQueue != null && (block || this.rawQueue.size() > 0)) { - // depening on process case, write it to the file or compress it to the other queue - try { - Map.Entry entry = this.rawQueue.take(); - this.queueLength -= entry.getValue().length; - if (this.compress) { - entry.setValue(compress(entry.getValue())); - this.queueLength += entry.getValue().length; - this.compressedQueue.add(entry); - } else { - this.backend.put(entry.getKey(), markWithPlainMagic(entry.getValue())); - } - return true; - } catch (InterruptedException e) { - return false; + private boolean flushOneRaw() throws IOException { + if (this.rawQueue.size() == 0) return false; + // depending on process case, write it to the file or compress it to the other queue + try { + Map.Entry entry = this.rawQueue.take(); + this.queueLength -= entry.getValue().length; + if (this.compressedQueue != null) { + entry.setValue(compress(entry.getValue())); + this.queueLength += entry.getValue().length; + this.compressedQueue.add(entry); + } else { + this.backend.put(entry.getKey(), markWithPlainMagic(entry.getValue())); + assert this.queueLength == 0; + if (this.rawQueue.size() == 0) this.queueLength = 0; } + return true; + } catch (InterruptedException e) { + return false; } - return false; } - private boolean flushOneCompressed(boolean block) throws IOException { - if (this.compressedQueue != null && (block || this.compressedQueue.size() > 0)) { - // write compressed entry to the file - try { - Map.Entry entry = this.compressedQueue.take(); - this.queueLength -= entry.getValue().length; - this.backend.put(entry.getKey(), entry.getValue()); - return true; - } catch (InterruptedException e) { - return false; - } + private boolean flushOneCompressed() throws IOException { + if (this.compressedQueue == null || this.compressedQueue.size() == 0) return false; + // write compressed entry to the file + try { + //System.out.print("#"); // DEBUG + Map.Entry entry = this.compressedQueue.take(); + this.queueLength -= entry.getValue().length; + this.backend.put(entry.getKey(), entry.getValue()); + if (this.rawQueue.size() == 0 && this.compressedQueue.size() == 0) this.queueLength = 0; + return true; + } catch (InterruptedException e) { + return false; } - return false; } private void flushAll() throws IOException { - while (this.rawQueue.size() > 0 || - (this.compressedQueue != null && this.compressedQueue.size() > 0)) { - if (!flushOne(false)) break; + while (this.rawQueue.size() > 0) { + if (!flushOneRaw()) break; } - this.queueLength = 0; - } - - public void run() { - this.executing = true; - this.shallRun = true; - this.shutdownControl = new Semaphore(1); - assert !this.executing || this.compress; - boolean doneOne; - while (shallRun) { - try { - doneOne = flushOneRaw(true); - assert doneOne; - } catch (IOException e) { - e.printStackTrace(); - break; - } + while (this.compressedQueue != null && this.compressedQueue.size() > 0) { + if (!flushOneCompressed()) break; } - this.executing = false; - this.shutdownControl.release(); + assert this.queueLength == 0; } } diff --git a/source/de/anomic/kelondro/kelondroBLOBHeap.java b/source/de/anomic/kelondro/kelondroBLOBHeap.java index 278124928..862b4ac35 100755 --- a/source/de/anomic/kelondro/kelondroBLOBHeap.java +++ b/source/de/anomic/kelondro/kelondroBLOBHeap.java @@ -182,6 +182,10 @@ public final class kelondroBLOBHeap implements kelondroBLOB { return this.index.size(); } + public kelondroByteOrder ordering() { + return this.ordering; + } + /** * test if a key is in the heap file. This does not need any IO, because it uses only the ram index * @param key @@ -506,7 +510,7 @@ public final class kelondroBLOBHeap implements kelondroBLOB { } private void mergeGaps(final long seek0, final int size0, final long seek1, final int size1) throws IOException { - System.out.println("*** DEBUG-BLOBHeap " + heapFile.getName() + ": merging gap from pos " + seek0 + ", len " + size0 + " with next record of size " + size1 + " (+ 4)"); + //System.out.println("*** DEBUG-BLOBHeap " + heapFile.getName() + ": merging gap from pos " + seek0 + ", len " + size0 + " with next record of size " + size1 + " (+ 4)"); Integer g = this.free.remove(seek1); // g is only used for debugging assert g != null; diff --git a/source/de/anomic/kelondro/kelondroBLOBTree.java b/source/de/anomic/kelondro/kelondroBLOBTree.java index 096ce9304..36537bfa6 100644 --- a/source/de/anomic/kelondro/kelondroBLOBTree.java +++ b/source/de/anomic/kelondro/kelondroBLOBTree.java @@ -118,6 +118,10 @@ public class kelondroBLOBTree implements kelondroBLOB { return this.keylen; } + public kelondroByteOrder ordering() { + return this.rowdef.objectOrder; + } + public synchronized int size() { return index.size(); } diff --git a/source/de/anomic/kelondro/kelondroBytesIntMap.java b/source/de/anomic/kelondro/kelondroBytesIntMap.java index 8ca540fd4..e44fd3633 100644 --- a/source/de/anomic/kelondro/kelondroBytesIntMap.java +++ b/source/de/anomic/kelondro/kelondroBytesIntMap.java @@ -49,6 +49,10 @@ public class kelondroBytesIntMap { return index.row(); } + public void clear() throws IOException { + this.index.clear(); + } + public synchronized boolean has(final byte[] key) { assert (key != null); return index.has(key); diff --git a/source/de/anomic/plasma/plasmaHTCache.java b/source/de/anomic/plasma/plasmaHTCache.java index 9dfa313db..843f991b7 100644 --- a/source/de/anomic/plasma/plasmaHTCache.java +++ b/source/de/anomic/plasma/plasmaHTCache.java @@ -136,7 +136,6 @@ public final class plasmaHTCache { fileDBunbuffered = new kelondroBLOBArray(new File(cachePath, FILE_DB_NAME), 12, kelondroBase64Order.enhancedCoder); fileDBunbuffered.setMaxSize(maxCacheSize); fileDB = new kelondroBLOBBuffer(fileDBunbuffered, 2 * 1024 * 1024, true); - //fileDB.start(); } catch (IOException e) { e.printStackTrace(); }