introduced concurrency in HTCACHE storage compression

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@6806 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 15 years ago
parent 2e26744f4e
commit 2f181d0027

@ -32,8 +32,10 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
@ -50,27 +52,101 @@ public class Compressor implements BLOB {
static byte[] plainMagic = {(byte) 'p', (byte) '|'}; // magic for plain content (no encoding)
private final BLOB backend;
private HashMap<String, byte[]> buffer; // entries which are not yet compressed, format is RAW (without magic)
private LinkedHashMap<String, byte[]> buffer; // entries which are not yet compressed, format is RAW (without magic)
private BlockingQueue<Entity> writeQueue;
private long bufferlength;
private final long maxbufferlength;
private final Worker[] worker;
public Compressor(BLOB backend, long buffersize) {
this.backend = backend;
this.maxbufferlength = buffersize;
this.writeQueue = new LinkedBlockingQueue<Entity>();
this.worker = new Worker[Math.min(4, Runtime.getRuntime().availableProcessors())];
for (int i = 0; i < this.worker.length; i++) {
this.worker[i] = new Worker();
this.worker[i].start();
}
initBuffer();
}
private static class Entity implements Map.Entry<String, byte[]> {
private String key;
private byte[] payload;
public Entity(String key, byte[] payload) {
this.key = key;
this.payload = payload;
}
@Override
public String getKey() {
return this.key;
}
@Override
public byte[] getValue() {
return this.payload;
}
@Override
public byte[] setValue(byte[] payload) {
byte[] payload0 = payload;
this.payload = payload;
return payload0;
}
}
private final static Entity poisonWorkerEntry = new Entity("poison", null);
private class Worker extends Thread {
public Worker() {
}
public void run() {
Entity entry;
try {
while ((entry = writeQueue.take()) != poisonWorkerEntry) {
try {
Compressor.this.backend.put(entry.getKey().getBytes(), compress(entry.getValue()));
} catch (RowSpaceExceededException e) {
Log.logException(e);
buffer.put(entry.getKey(), entry.getValue());
} catch (IOException e) {
Log.logException(e);
buffer.put(entry.getKey(), entry.getValue());
}
}
} catch (InterruptedException e) {
Log.logException(e);
}
}
}
public String name() {
return this.backend.name();
}
public synchronized void clear() throws IOException {
initBuffer();
this.writeQueue.clear();
this.backend.clear();
}
private void initBuffer() {
this.buffer = new HashMap<String, byte[]>();
this.buffer = new LinkedHashMap<String, byte[]>(100, 0.1f, false) {
private static final long serialVersionUID = 1L;
@Override
protected boolean removeEldestEntry(final Map.Entry<String, byte[]> eldest) {
if (size() > 100) {
try {
Compressor.this.writeQueue.put(new Entity(eldest.getKey(), eldest.getValue()));
} catch (InterruptedException e) {
Log.logException(e);
}
return true;
} else {
return false;
}
}
};
this.bufferlength = 0;
}
@ -85,6 +161,16 @@ public class Compressor implements BLOB {
} catch (IOException e) {
Log.logSevere("Compressor", "", e);
}
for (int i = 0; i < this.worker.length; i++) try {
this.writeQueue.put(poisonWorkerEntry);
} catch (InterruptedException e) {
Log.logException(e);
}
for (int i = 0; i < this.worker.length; i++) try {
this.worker[i].join();
} catch (InterruptedException e) {
Log.logException(e);
}
this.backend.close(writeIDX);
}
@ -165,14 +251,13 @@ public class Compressor implements BLOB {
byte[] b = buffer.remove(new String(key));
if (b != null) {
// compress the entry now and put it to the backend
byte[] bb = compress(b);
try {
this.backend.put(key, bb);
this.writeQueue.put(new Entity(new String(key), b));
this.bufferlength = this.bufferlength - b.length;
return b;
} catch (RowSpaceExceededException e) {
} catch (InterruptedException e) {
Log.logException(e);
buffer.put(new String(key), b);
return b;
}
}
@ -273,16 +358,14 @@ public class Compressor implements BLOB {
// depending on process case, write it to the file or compress it to the other queue
Map.Entry<String, byte[]> entry = this.buffer.entrySet().iterator().next();
this.buffer.remove(entry.getKey());
byte[] b = entry.getValue();
this.bufferlength -= b.length;
byte[] bb = compress(b);
try {
this.backend.put(entry.getKey().getBytes(), bb);
} catch (RowSpaceExceededException e) {
this.buffer.put(entry.getKey(), b);
throw e;
this.writeQueue.put(new Entity(entry.getKey(), entry.getValue()));
this.bufferlength -= entry.getValue().length;
return true;
} catch (InterruptedException e) {
this.buffer.put(entry.getKey(), entry.getValue());
return false;
}
return true;
}
private void flushAll() throws IOException {

Loading…
Cancel
Save