From 2f181d0027e31c61435e321a1d709a359ca3c1cf Mon Sep 17 00:00:00 2001 From: orbiter Date: Tue, 13 Apr 2010 16:22:09 +0000 Subject: [PATCH] introduced concurrency in HTCACHE storage compression git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@6806 6c8d7289-2bf4-0310-a012-ef5d649a1542 --- source/net/yacy/kelondro/blob/Compressor.java | 113 +++++++++++++++--- 1 file changed, 98 insertions(+), 15 deletions(-) diff --git a/source/net/yacy/kelondro/blob/Compressor.java b/source/net/yacy/kelondro/blob/Compressor.java index 388a413aa..ebc024529 100644 --- a/source/net/yacy/kelondro/blob/Compressor.java +++ b/source/net/yacy/kelondro/blob/Compressor.java @@ -32,8 +32,10 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; @@ -50,27 +52,101 @@ public class Compressor implements BLOB { static byte[] plainMagic = {(byte) 'p', (byte) '|'}; // magic for plain content (no encoding) private final BLOB backend; - private HashMap buffer; // entries which are not yet compressed, format is RAW (without magic) + private LinkedHashMap buffer; // entries which are not yet compressed, format is RAW (without magic) + private BlockingQueue writeQueue; private long bufferlength; private final long maxbufferlength; + private final Worker[] worker; public Compressor(BLOB backend, long buffersize) { this.backend = backend; this.maxbufferlength = buffersize; + this.writeQueue = new LinkedBlockingQueue(); + this.worker = new Worker[Math.min(4, Runtime.getRuntime().availableProcessors())]; + for (int i = 0; i < this.worker.length; i++) { + this.worker[i] = new Worker(); + this.worker[i].start(); + } initBuffer(); } + private static class Entity implements Map.Entry { + private String key; + private byte[] payload; + public Entity(String key, byte[] payload) { + this.key = key; + this.payload = payload; + } + @Override + public String getKey() { + return this.key; + } + + @Override + public byte[] getValue() { + return this.payload; + } + + @Override + public byte[] setValue(byte[] payload) { + byte[] payload0 = payload; + this.payload = payload; + return payload0; + } + } + + private final static Entity poisonWorkerEntry = new Entity("poison", null); + + private class Worker extends Thread { + public Worker() { + } + public void run() { + Entity entry; + try { + while ((entry = writeQueue.take()) != poisonWorkerEntry) { + try { + Compressor.this.backend.put(entry.getKey().getBytes(), compress(entry.getValue())); + } catch (RowSpaceExceededException e) { + Log.logException(e); + buffer.put(entry.getKey(), entry.getValue()); + } catch (IOException e) { + Log.logException(e); + buffer.put(entry.getKey(), entry.getValue()); + } + } + } catch (InterruptedException e) { + Log.logException(e); + } + } + } + public String name() { return this.backend.name(); } public synchronized void clear() throws IOException { initBuffer(); + this.writeQueue.clear(); this.backend.clear(); } private void initBuffer() { - this.buffer = new HashMap(); + this.buffer = new LinkedHashMap(100, 0.1f, false) { + private static final long serialVersionUID = 1L; + @Override + protected boolean removeEldestEntry(final Map.Entry eldest) { + if (size() > 100) { + try { + Compressor.this.writeQueue.put(new Entity(eldest.getKey(), eldest.getValue())); + } catch (InterruptedException e) { + Log.logException(e); + } + return true; + } else { + return false; + } + } + }; this.bufferlength = 0; } @@ -85,6 +161,16 @@ public class Compressor implements BLOB { } catch (IOException e) { Log.logSevere("Compressor", "", e); } + for (int i = 0; i < this.worker.length; i++) try { + this.writeQueue.put(poisonWorkerEntry); + } catch (InterruptedException e) { + Log.logException(e); + } + for (int i = 0; i < this.worker.length; i++) try { + this.worker[i].join(); + } catch (InterruptedException e) { + Log.logException(e); + } this.backend.close(writeIDX); } @@ -165,14 +251,13 @@ public class Compressor implements BLOB { byte[] b = buffer.remove(new String(key)); if (b != null) { // compress the entry now and put it to the backend - byte[] bb = compress(b); try { - this.backend.put(key, bb); + this.writeQueue.put(new Entity(new String(key), b)); this.bufferlength = this.bufferlength - b.length; return b; - } catch (RowSpaceExceededException e) { + } catch (InterruptedException e) { + Log.logException(e); buffer.put(new String(key), b); - return b; } } @@ -273,16 +358,14 @@ public class Compressor implements BLOB { // depending on process case, write it to the file or compress it to the other queue Map.Entry entry = this.buffer.entrySet().iterator().next(); this.buffer.remove(entry.getKey()); - byte[] b = entry.getValue(); - this.bufferlength -= b.length; - byte[] bb = compress(b); try { - this.backend.put(entry.getKey().getBytes(), bb); - } catch (RowSpaceExceededException e) { - this.buffer.put(entry.getKey(), b); - throw e; + this.writeQueue.put(new Entity(entry.getKey(), entry.getValue())); + this.bufferlength -= entry.getValue().length; + return true; + } catch (InterruptedException e) { + this.buffer.put(entry.getKey(), entry.getValue()); + return false; } - return true; } private void flushAll() throws IOException {