- added a write buffer to BLOBHeap - modified the BLOBBuffer (is now only to buffer non-compressed content) - added content compression to the HTCache The new read cache will decrease the start/initialization time of BLOB files, like the HTCache, RobotsTxt and other BLOBHeap structures. git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@5386 6c8d7289-2bf4-0310-a012-ef5d649a1542pull/1/head
parent
e1acdb952c
commit
1779c3c507
@ -1,383 +0,0 @@
|
|||||||
// kelondroBLOBBuffer.java
|
|
||||||
// (C) 2008 by Michael Peter Christen; mc@yacy.net, Frankfurt a. M., Germany
|
|
||||||
// first published 17.10.2008 on http://yacy.net
|
|
||||||
//
|
|
||||||
// This is a part of YaCy, a peer-to-peer based web search engine
|
|
||||||
//
|
|
||||||
// $LastChangedDate: 2006-04-02 22:40:07 +0200 (So, 02 Apr 2006) $
|
|
||||||
// $LastChangedRevision: 1986 $
|
|
||||||
// $LastChangedBy: orbiter $
|
|
||||||
//
|
|
||||||
// LICENSE
|
|
||||||
//
|
|
||||||
// This program is free software; you can redistribute it and/or modify
|
|
||||||
// it under the terms of the GNU General Public License as published by
|
|
||||||
// the Free Software Foundation; either version 2 of the License, or
|
|
||||||
// (at your option) any later version.
|
|
||||||
//
|
|
||||||
// This program is distributed in the hope that it will be useful,
|
|
||||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
// GNU General Public License for more details.
|
|
||||||
//
|
|
||||||
// You should have received a copy of the GNU General Public License
|
|
||||||
// along with this program; if not, write to the Free Software
|
|
||||||
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
|
||||||
|
|
||||||
|
|
||||||
package de.anomic.kelondro;
|
|
||||||
|
|
||||||
import java.io.ByteArrayInputStream;
|
|
||||||
import java.io.ByteArrayOutputStream;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.InputStream;
|
|
||||||
import java.io.OutputStream;
|
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
|
||||||
import java.util.zip.GZIPInputStream;
|
|
||||||
import java.util.zip.GZIPOutputStream;
|
|
||||||
|
|
||||||
public class kelondroBLOBBuffer extends Thread implements kelondroBLOB {
|
|
||||||
|
|
||||||
static byte[] gzipMagic = {(byte) 'z', (byte) '|'}; // magic for gzip-encoded content
|
|
||||||
static byte[] plainMagic = {(byte) 'p', (byte) '|'}; // magic for plain content (no encoding)
|
|
||||||
|
|
||||||
private kelondroBLOB backend;
|
|
||||||
private LinkedBlockingQueue<Map.Entry<byte[], byte[]>> rawQueue; // entries which are not compressed, format is RAW (without magic)
|
|
||||||
private LinkedBlockingQueue<Map.Entry<byte[], byte[]>> compressedQueue; // entries which are compressed, format is with leading magic
|
|
||||||
private kelondroBytesIntMap contentLength;
|
|
||||||
private long queueLength;
|
|
||||||
private long maxCacheSize;
|
|
||||||
private int cdr;
|
|
||||||
|
|
||||||
private class Entry implements Map.Entry<byte[], byte[]> {
|
|
||||||
|
|
||||||
byte[] key, value;
|
|
||||||
|
|
||||||
public Entry(byte[] key, byte[] value) {
|
|
||||||
this.key = key;
|
|
||||||
this.value = value;
|
|
||||||
}
|
|
||||||
|
|
||||||
public byte[] getKey() {
|
|
||||||
return this.key;
|
|
||||||
}
|
|
||||||
|
|
||||||
public byte[] getValue() {
|
|
||||||
return this.value;
|
|
||||||
}
|
|
||||||
|
|
||||||
public byte[] setValue(byte[] value) {
|
|
||||||
byte[] b = this.value;
|
|
||||||
this.value = value;
|
|
||||||
return b;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
public kelondroBLOBBuffer(kelondroBLOB backend, long cachesize, boolean compress) {
|
|
||||||
this.backend = backend;
|
|
||||||
this.maxCacheSize = cachesize;
|
|
||||||
cdr = 0;
|
|
||||||
initQueues(compress);
|
|
||||||
}
|
|
||||||
|
|
||||||
public String name() {
|
|
||||||
return this.backend.name();
|
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized void clear() throws IOException {
|
|
||||||
initQueues(this.compressedQueue != null);
|
|
||||||
this.backend.clear();
|
|
||||||
}
|
|
||||||
|
|
||||||
private void initQueues(boolean compress) {
|
|
||||||
this.rawQueue = new LinkedBlockingQueue<Map.Entry<byte[], byte[]>>();
|
|
||||||
this.compressedQueue = (compress) ? new LinkedBlockingQueue<Map.Entry<byte[], byte[]>>() : null;
|
|
||||||
this.contentLength = new kelondroBytesIntMap(backend.keylength(), backend.ordering(), 500);
|
|
||||||
this.queueLength = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
public kelondroByteOrder ordering() {
|
|
||||||
return this.backend.ordering();
|
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized void close() {
|
|
||||||
// no more thread is running, flush all queues
|
|
||||||
try {
|
|
||||||
flushAll();
|
|
||||||
} catch (IOException e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
}
|
|
||||||
this.backend.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
private byte[] compress(byte[] b) {
|
|
||||||
// compressed a byte array and adds a leading magic for the compression
|
|
||||||
try {
|
|
||||||
cdr++;
|
|
||||||
//System.out.print("/(" + cdr + ")"); // DEBUG
|
|
||||||
final ByteArrayOutputStream baos = new ByteArrayOutputStream(b.length / 5);
|
|
||||||
baos.write(gzipMagic);
|
|
||||||
final OutputStream os = new GZIPOutputStream(baos, 512);
|
|
||||||
os.write(b);
|
|
||||||
os.close();
|
|
||||||
baos.close();
|
|
||||||
return baos.toByteArray();
|
|
||||||
} catch (IOException e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private byte[] markWithPlainMagic(byte[] b) {
|
|
||||||
//System.out.print("+"); // DEBUG
|
|
||||||
byte[] r = new byte[b.length + 2];
|
|
||||||
r[0] = plainMagic[0];
|
|
||||||
r[1] = plainMagic[1];
|
|
||||||
System.arraycopy(b, 0, r, 2, b.length);
|
|
||||||
return r;
|
|
||||||
}
|
|
||||||
|
|
||||||
private byte[] decompress(byte[] b) {
|
|
||||||
// use a magic in the head of the bytes to identify compression type
|
|
||||||
if (kelondroByteArray.equals(b, gzipMagic)) {
|
|
||||||
//System.out.print("\\"); // DEBUG
|
|
||||||
cdr--;
|
|
||||||
ByteArrayInputStream bais = new ByteArrayInputStream(b);
|
|
||||||
// eat up the magic
|
|
||||||
bais.read();
|
|
||||||
bais.read();
|
|
||||||
// decompress what is remaining
|
|
||||||
InputStream gis;
|
|
||||||
try {
|
|
||||||
gis = new GZIPInputStream(bais);
|
|
||||||
final ByteArrayOutputStream baos = new ByteArrayOutputStream(b.length);
|
|
||||||
final byte[] buf = new byte[1024];
|
|
||||||
int n;
|
|
||||||
while ((n = gis.read(buf)) > 0) baos.write(buf, 0, n);
|
|
||||||
gis.close();
|
|
||||||
bais.close();
|
|
||||||
baos.close();
|
|
||||||
|
|
||||||
return baos.toByteArray();
|
|
||||||
} catch (IOException e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
} else if (kelondroByteArray.equals(b, plainMagic)) {
|
|
||||||
System.out.print("-"); // DEBUG
|
|
||||||
byte[] r = new byte[b.length - 2];
|
|
||||||
System.arraycopy(b, 2, r, 0, b.length - 2);
|
|
||||||
return r;
|
|
||||||
} else {
|
|
||||||
// we consider that the entry is also plain, but without leading magic
|
|
||||||
return b;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private byte[] getFromQueue(byte[] key, LinkedBlockingQueue<Map.Entry<byte[], byte[]>> queue) {
|
|
||||||
Iterator<Map.Entry<byte[], byte[]>> i = queue.iterator();
|
|
||||||
Map.Entry<byte[], byte[]> e;
|
|
||||||
while (i.hasNext()) {
|
|
||||||
e = i.next();
|
|
||||||
if (kelondroByteArray.equals(key, e.getKey())) return e.getValue();
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized byte[] get(byte[] key) throws IOException {
|
|
||||||
// depending on the source of the result, we additionally do entry compression
|
|
||||||
// because if a document was read once, we think that it will not be retrieved another time again soon
|
|
||||||
byte[] b;
|
|
||||||
if (this.compressedQueue == null) {
|
|
||||||
b = getFromQueue(key, rawQueue);
|
|
||||||
if (b != null) return b;
|
|
||||||
} else {
|
|
||||||
b = removeFromQueue(key, rawQueue);
|
|
||||||
if (b != null) {
|
|
||||||
// put the entry on the compressed queue
|
|
||||||
byte[] bb = compress(b);
|
|
||||||
this.compressedQueue.add(new Entry(key, bb));
|
|
||||||
this.queueLength = this.queueLength - b.length + bb.length;
|
|
||||||
return b;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// no special handling for elements from the compressed queue
|
|
||||||
b = (compressedQueue == null) ? null : getFromQueue(key, compressedQueue);
|
|
||||||
if (b != null) {
|
|
||||||
//System.out.print("CASEA"); // DEBUG
|
|
||||||
return decompress(b);
|
|
||||||
}
|
|
||||||
|
|
||||||
// finally return from the backend
|
|
||||||
b = this.backend.get(key);
|
|
||||||
if (b == null) return null;
|
|
||||||
return decompress(b);
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean hasInQueue(byte[] key, LinkedBlockingQueue<Map.Entry<byte[], byte[]>> queue) {
|
|
||||||
Iterator<Map.Entry<byte[], byte[]>> i = queue.iterator();
|
|
||||||
Map.Entry<byte[], byte[]> e;
|
|
||||||
while (i.hasNext()) {
|
|
||||||
e = i.next();
|
|
||||||
if (kelondroByteArray.equals(key, e.getKey())) return true;
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized boolean has(byte[] key) throws IOException {
|
|
||||||
return
|
|
||||||
(rawQueue != null && hasInQueue(key, rawQueue)) ||
|
|
||||||
(compressedQueue != null && hasInQueue(key, compressedQueue)) ||
|
|
||||||
this.backend.has(key);
|
|
||||||
}
|
|
||||||
|
|
||||||
public int keylength() {
|
|
||||||
return this.backend.keylength();
|
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized long length() {
|
|
||||||
try {
|
|
||||||
return this.backend.length() + this.queueLength;
|
|
||||||
} catch (IOException e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized long length(byte[] key) throws IOException {
|
|
||||||
int i = this.contentLength.geti(key);
|
|
||||||
if (i >= 0) {
|
|
||||||
//System.out.print("CASEC"); // DEBUG
|
|
||||||
return (long) i;
|
|
||||||
}
|
|
||||||
byte[] b = getFromQueue(key, rawQueue);
|
|
||||||
if (b != null) return b.length;
|
|
||||||
b = (compressedQueue == null) ? null : getFromQueue(key, compressedQueue);
|
|
||||||
if (b != null) {
|
|
||||||
//System.out.print("CASEB"); // DEBUG
|
|
||||||
return decompress(b).length;
|
|
||||||
}
|
|
||||||
return this.backend.length(key);
|
|
||||||
}
|
|
||||||
|
|
||||||
private byte[] removeFromQueue(byte[] key, LinkedBlockingQueue<Map.Entry<byte[], byte[]>> queue) {
|
|
||||||
Iterator<Map.Entry<byte[], byte[]>> i = queue.iterator();
|
|
||||||
Map.Entry<byte[], byte[]> e;
|
|
||||||
while (i.hasNext()) {
|
|
||||||
e = i.next();
|
|
||||||
if (kelondroByteArray.equals(key, e.getKey())) {
|
|
||||||
i.remove();
|
|
||||||
return e.getValue();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
private int removeFromQueues(byte[] key) throws IOException {
|
|
||||||
byte[] b = removeFromQueue(key, rawQueue);
|
|
||||||
if (b != null) return b.length;
|
|
||||||
b = (compressedQueue == null) ? null : removeFromQueue(key, compressedQueue);
|
|
||||||
if (b != null) return b.length;
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized void put(byte[] key, byte[] b) throws IOException {
|
|
||||||
|
|
||||||
// first ensure that the files do not exist anywhere
|
|
||||||
this.backend.remove(key);
|
|
||||||
long rx = removeFromQueues(key);
|
|
||||||
if (rx > 0) this.queueLength -= rx;
|
|
||||||
|
|
||||||
// check if the buffer is full or could be full after this write
|
|
||||||
if (this.queueLength + b.length * 2 > this.maxCacheSize) {
|
|
||||||
// in case that we compress, just compress as much as is necessary to get enough room
|
|
||||||
if (this.compressedQueue == null) {
|
|
||||||
flushAll();
|
|
||||||
} else {
|
|
||||||
while (this.queueLength + b.length * 2 > this.maxCacheSize && this.rawQueue.size() > 0) {
|
|
||||||
flushOneRaw();
|
|
||||||
}
|
|
||||||
// in case that this was not enough, just flush all
|
|
||||||
if (this.queueLength + b.length * 2 > this.maxCacheSize) flushAll();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// files are written uncompressed to the uncompressed-queue
|
|
||||||
// they are either written uncompressed to the database
|
|
||||||
// or compressed later
|
|
||||||
this.rawQueue.add(new Entry(key, b));
|
|
||||||
this.queueLength += b.length;
|
|
||||||
this.contentLength.puti(key, b.length);
|
|
||||||
if (this.contentLength.size() > 500) this.contentLength.clear(); // prevent the case that this object becomes a memory leak
|
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized void remove(byte[] key) throws IOException {
|
|
||||||
this.backend.remove(key);
|
|
||||||
long rx = removeFromQueues(key);
|
|
||||||
if (rx > 0) this.queueLength -= rx;
|
|
||||||
}
|
|
||||||
|
|
||||||
public int size() {
|
|
||||||
return this.backend.size();
|
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized kelondroCloneableIterator<byte[]> keys(boolean up, boolean rotating) throws IOException {
|
|
||||||
flushAll();
|
|
||||||
return this.backend.keys(up, rotating);
|
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized kelondroCloneableIterator<byte[]> keys(boolean up, byte[] firstKey) throws IOException {
|
|
||||||
flushAll();
|
|
||||||
return this.backend.keys(up, firstKey);
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean flushOneRaw() throws IOException {
|
|
||||||
if (this.rawQueue.size() == 0) return false;
|
|
||||||
// depending on process case, write it to the file or compress it to the other queue
|
|
||||||
try {
|
|
||||||
Map.Entry<byte[], byte[]> entry = this.rawQueue.take();
|
|
||||||
this.queueLength -= entry.getValue().length;
|
|
||||||
if (this.compressedQueue != null) {
|
|
||||||
entry.setValue(compress(entry.getValue()));
|
|
||||||
this.queueLength += entry.getValue().length;
|
|
||||||
this.compressedQueue.add(entry);
|
|
||||||
} else {
|
|
||||||
this.backend.put(entry.getKey(), markWithPlainMagic(entry.getValue()));
|
|
||||||
assert this.queueLength == 0;
|
|
||||||
if (this.rawQueue.size() == 0) this.queueLength = 0;
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean flushOneCompressed() throws IOException {
|
|
||||||
if (this.compressedQueue == null || this.compressedQueue.size() == 0) return false;
|
|
||||||
// write compressed entry to the file
|
|
||||||
try {
|
|
||||||
//System.out.print("#"); // DEBUG
|
|
||||||
Map.Entry<byte[], byte[]> entry = this.compressedQueue.take();
|
|
||||||
this.queueLength -= entry.getValue().length;
|
|
||||||
this.backend.put(entry.getKey(), entry.getValue());
|
|
||||||
if (this.rawQueue.size() == 0 && this.compressedQueue.size() == 0) this.queueLength = 0;
|
|
||||||
return true;
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void flushAll() throws IOException {
|
|
||||||
while (this.rawQueue.size() > 0) {
|
|
||||||
if (!flushOneRaw()) break;
|
|
||||||
}
|
|
||||||
while (this.compressedQueue != null && this.compressedQueue.size() > 0) {
|
|
||||||
if (!flushOneCompressed()) break;
|
|
||||||
}
|
|
||||||
assert this.queueLength == 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
@ -0,0 +1,267 @@
|
|||||||
|
// kelondroBLOBCompressor.java
|
||||||
|
// (C) 2008 by Michael Peter Christen; mc@yacy.net, Frankfurt a. M., Germany
|
||||||
|
// first published 17.10.2008 on http://yacy.net
|
||||||
|
//
|
||||||
|
// This is a part of YaCy, a peer-to-peer based web search engine
|
||||||
|
//
|
||||||
|
// $LastChangedDate: 2006-04-02 22:40:07 +0200 (So, 02 Apr 2006) $
|
||||||
|
// $LastChangedRevision: 1986 $
|
||||||
|
// $LastChangedBy: orbiter $
|
||||||
|
//
|
||||||
|
// LICENSE
|
||||||
|
//
|
||||||
|
// This program is free software; you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU General Public License as published by
|
||||||
|
// the Free Software Foundation; either version 2 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
//
|
||||||
|
// This program is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU General Public License for more details.
|
||||||
|
//
|
||||||
|
// You should have received a copy of the GNU General Public License
|
||||||
|
// along with this program; if not, write to the Free Software
|
||||||
|
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
||||||
|
|
||||||
|
|
||||||
|
package de.anomic.kelondro;
|
||||||
|
|
||||||
|
import java.io.ByteArrayInputStream;
|
||||||
|
import java.io.ByteArrayOutputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.zip.GZIPInputStream;
|
||||||
|
import java.util.zip.GZIPOutputStream;
|
||||||
|
|
||||||
|
public class kelondroBLOBCompressor extends Thread implements kelondroBLOB {
|
||||||
|
|
||||||
|
static byte[] gzipMagic = {(byte) 'z', (byte) '|'}; // magic for gzip-encoded content
|
||||||
|
static byte[] plainMagic = {(byte) 'p', (byte) '|'}; // magic for plain content (no encoding)
|
||||||
|
|
||||||
|
private kelondroBLOB backend;
|
||||||
|
private HashMap<String, byte[]> buffer; // entries which are not yet compressed, format is RAW (without magic)
|
||||||
|
private long bufferlength;
|
||||||
|
private long maxbufferlength;
|
||||||
|
private int cdr;
|
||||||
|
|
||||||
|
public kelondroBLOBCompressor(kelondroBLOB backend, long buffersize) {
|
||||||
|
this.backend = backend;
|
||||||
|
this.maxbufferlength = buffersize;
|
||||||
|
this.cdr = 0;
|
||||||
|
initBuffer();
|
||||||
|
}
|
||||||
|
|
||||||
|
public String name() {
|
||||||
|
return this.backend.name();
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized void clear() throws IOException {
|
||||||
|
initBuffer();
|
||||||
|
this.backend.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void initBuffer() {
|
||||||
|
this.buffer = new HashMap<String, byte[]>();
|
||||||
|
this.bufferlength = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
public kelondroByteOrder ordering() {
|
||||||
|
return this.backend.ordering();
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized void close() {
|
||||||
|
// no more thread is running, flush all queues
|
||||||
|
try {
|
||||||
|
flushAll();
|
||||||
|
} catch (IOException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
this.backend.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
private byte[] compress(byte[] b) {
|
||||||
|
int l = b.length;
|
||||||
|
if (l < 100) return markWithPlainMagic(b);
|
||||||
|
byte[] bb = compressAddMagic(b);
|
||||||
|
if (bb.length >= l) return markWithPlainMagic(b);
|
||||||
|
return bb;
|
||||||
|
}
|
||||||
|
|
||||||
|
private byte[] compressAddMagic(byte[] b) {
|
||||||
|
// compress a byte array and add a leading magic for the compression
|
||||||
|
try {
|
||||||
|
cdr++;
|
||||||
|
//System.out.print("/(" + cdr + ")"); // DEBUG
|
||||||
|
final ByteArrayOutputStream baos = new ByteArrayOutputStream(b.length / 5);
|
||||||
|
baos.write(gzipMagic);
|
||||||
|
final OutputStream os = new GZIPOutputStream(baos, 512);
|
||||||
|
os.write(b);
|
||||||
|
os.close();
|
||||||
|
baos.close();
|
||||||
|
return baos.toByteArray();
|
||||||
|
} catch (IOException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private byte[] markWithPlainMagic(byte[] b) {
|
||||||
|
//System.out.print("+"); // DEBUG
|
||||||
|
byte[] r = new byte[b.length + 2];
|
||||||
|
r[0] = plainMagic[0];
|
||||||
|
r[1] = plainMagic[1];
|
||||||
|
System.arraycopy(b, 0, r, 2, b.length);
|
||||||
|
return r;
|
||||||
|
}
|
||||||
|
|
||||||
|
private byte[] decompress(byte[] b) {
|
||||||
|
// use a magic in the head of the bytes to identify compression type
|
||||||
|
if (kelondroByteArray.equals(b, gzipMagic)) {
|
||||||
|
//System.out.print("\\"); // DEBUG
|
||||||
|
cdr--;
|
||||||
|
ByteArrayInputStream bais = new ByteArrayInputStream(b);
|
||||||
|
// eat up the magic
|
||||||
|
bais.read();
|
||||||
|
bais.read();
|
||||||
|
// decompress what is remaining
|
||||||
|
InputStream gis;
|
||||||
|
try {
|
||||||
|
gis = new GZIPInputStream(bais);
|
||||||
|
final ByteArrayOutputStream baos = new ByteArrayOutputStream(b.length);
|
||||||
|
final byte[] buf = new byte[1024];
|
||||||
|
int n;
|
||||||
|
while ((n = gis.read(buf)) > 0) baos.write(buf, 0, n);
|
||||||
|
gis.close();
|
||||||
|
bais.close();
|
||||||
|
baos.close();
|
||||||
|
|
||||||
|
return baos.toByteArray();
|
||||||
|
} catch (IOException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
} else if (kelondroByteArray.equals(b, plainMagic)) {
|
||||||
|
System.out.print("-"); // DEBUG
|
||||||
|
byte[] r = new byte[b.length - 2];
|
||||||
|
System.arraycopy(b, 2, r, 0, b.length - 2);
|
||||||
|
return r;
|
||||||
|
} else {
|
||||||
|
// we consider that the entry is also plain, but without leading magic
|
||||||
|
return b;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized byte[] get(byte[] key) throws IOException {
|
||||||
|
// depending on the source of the result, we additionally do entry compression
|
||||||
|
// because if a document was read once, we think that it will not be retrieved another time again soon
|
||||||
|
byte[] b = buffer.remove(new String(key));
|
||||||
|
if (b != null) {
|
||||||
|
// compress the entry now and put it to the backend
|
||||||
|
byte[] bb = compress(b);
|
||||||
|
this.backend.put(key, bb);
|
||||||
|
this.bufferlength = this.bufferlength - b.length;
|
||||||
|
return b;
|
||||||
|
}
|
||||||
|
|
||||||
|
// return from the backend
|
||||||
|
b = this.backend.get(key);
|
||||||
|
if (b == null) return null;
|
||||||
|
return decompress(b);
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized boolean has(byte[] key) throws IOException {
|
||||||
|
return
|
||||||
|
this.buffer.containsKey(new String(key)) || this.backend.has(key);
|
||||||
|
}
|
||||||
|
|
||||||
|
public int keylength() {
|
||||||
|
return this.backend.keylength();
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized long length() {
|
||||||
|
try {
|
||||||
|
return this.backend.length() + this.bufferlength;
|
||||||
|
} catch (IOException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized long length(byte[] key) throws IOException {
|
||||||
|
byte[] b = buffer.get(new String(key));
|
||||||
|
if (b != null) return b.length;
|
||||||
|
return decompress(this.backend.get(key)).length;
|
||||||
|
}
|
||||||
|
|
||||||
|
private int removeFromQueues(byte[] key) throws IOException {
|
||||||
|
byte[] b = buffer.remove(new String(key));
|
||||||
|
if (b != null) return b.length;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized void put(byte[] key, byte[] b) throws IOException {
|
||||||
|
|
||||||
|
// first ensure that the files do not exist anywhere
|
||||||
|
remove(key);
|
||||||
|
|
||||||
|
// check if the buffer is full or could be full after this write
|
||||||
|
if (this.bufferlength + b.length * 2 > this.maxbufferlength) {
|
||||||
|
// in case that we compress, just compress as much as is necessary to get enough room
|
||||||
|
while (this.bufferlength + b.length * 2 > this.maxbufferlength && this.buffer.size() > 0) {
|
||||||
|
flushOne();
|
||||||
|
}
|
||||||
|
// in case that this was not enough, just flush all
|
||||||
|
if (this.bufferlength + b.length * 2 > this.maxbufferlength) flushAll();
|
||||||
|
}
|
||||||
|
|
||||||
|
// files are written uncompressed to the uncompressed-queue
|
||||||
|
// they are either written uncompressed to the database
|
||||||
|
// or compressed later
|
||||||
|
this.buffer.put(new String(key), b);
|
||||||
|
this.bufferlength += b.length;
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized void remove(byte[] key) throws IOException {
|
||||||
|
this.backend.remove(key);
|
||||||
|
long rx = removeFromQueues(key);
|
||||||
|
if (rx > 0) this.bufferlength -= rx;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int size() {
|
||||||
|
return this.backend.size() + this.buffer.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized kelondroCloneableIterator<byte[]> keys(boolean up, boolean rotating) throws IOException {
|
||||||
|
flushAll();
|
||||||
|
return this.backend.keys(up, rotating);
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized kelondroCloneableIterator<byte[]> keys(boolean up, byte[] firstKey) throws IOException {
|
||||||
|
flushAll();
|
||||||
|
return this.backend.keys(up, firstKey);
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean flushOne() throws IOException {
|
||||||
|
if (this.buffer.size() == 0) return false;
|
||||||
|
// depending on process case, write it to the file or compress it to the other queue
|
||||||
|
Map.Entry<String, byte[]> entry = this.buffer.entrySet().iterator().next();
|
||||||
|
this.buffer.remove(entry.getKey());
|
||||||
|
byte[] b = entry.getValue();
|
||||||
|
this.bufferlength -= b.length;
|
||||||
|
b = compress(b);
|
||||||
|
this.backend.put(entry.getKey().getBytes(), b);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void flushAll() throws IOException {
|
||||||
|
while (this.buffer.size() > 0) {
|
||||||
|
if (!flushOne()) break;
|
||||||
|
}
|
||||||
|
assert this.bufferlength == 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -1,189 +0,0 @@
|
|||||||
// kelondroCachedRA.java
|
|
||||||
// -----------------------
|
|
||||||
// part of The Kelondro Database
|
|
||||||
// (C) by Michael Peter Christen; mc@yacy.net
|
|
||||||
// first published on http://yacy.net
|
|
||||||
// Frankfurt, Germany, 2004-2008
|
|
||||||
// last major change: 04.12.2008
|
|
||||||
//
|
|
||||||
// This program is free software; you can redistribute it and/or modify
|
|
||||||
// it under the terms of the GNU General Public License as published by
|
|
||||||
// the Free Software Foundation; either version 2 of the License, or
|
|
||||||
// (at your option) any later version.
|
|
||||||
//
|
|
||||||
// This program is distributed in the hope that it will be useful,
|
|
||||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
// GNU General Public License for more details.
|
|
||||||
//
|
|
||||||
// You should have received a copy of the GNU General Public License
|
|
||||||
// along with this program; if not, write to the Free Software
|
|
||||||
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
|
||||||
|
|
||||||
package de.anomic.kelondro;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
import de.anomic.server.serverMemory;
|
|
||||||
|
|
||||||
public class kelondroCachedRA extends kelondroAbstractRA implements kelondroRA {
|
|
||||||
|
|
||||||
// a shared cache for all users of this class
|
|
||||||
private static final int elementsize = 8192;
|
|
||||||
private static final int remainingfree = 30 * 1024 * 1024;
|
|
||||||
private static HashMap<String, byte[]> cacheMemory = new HashMap<String, byte[]>();
|
|
||||||
|
|
||||||
// class variables
|
|
||||||
protected kelondroRA ra;
|
|
||||||
private long seekpos;
|
|
||||||
private String id;
|
|
||||||
|
|
||||||
public kelondroCachedRA(final kelondroRA ra) {
|
|
||||||
this.ra = ra;
|
|
||||||
this.name = ra.name();
|
|
||||||
this.file = ra.file();
|
|
||||||
this.id = file.toString();
|
|
||||||
this.seekpos = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized long length() throws IOException {
|
|
||||||
return ra.length();
|
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized int available() throws IOException {
|
|
||||||
return (int) (ra.length() - seekpos);
|
|
||||||
}
|
|
||||||
|
|
||||||
private int cacheElementNumber(final long address) {
|
|
||||||
return (int) address / elementsize;
|
|
||||||
}
|
|
||||||
|
|
||||||
private int cacheElementOffset(final long address) {
|
|
||||||
return (int) address % elementsize;
|
|
||||||
}
|
|
||||||
|
|
||||||
private byte[] readCache(final int cacheNr) throws IOException {
|
|
||||||
String key = this.id + cacheNr;
|
|
||||||
byte[] cache = cacheMemory.get(key);
|
|
||||||
if (cache == null) {
|
|
||||||
if (serverMemory.available() < remainingfree) {
|
|
||||||
// delete elements in buffer if buffer too big
|
|
||||||
synchronized(cacheMemory) {
|
|
||||||
Iterator<Map.Entry<String, byte[]>> i = cacheMemory.entrySet().iterator();
|
|
||||||
for (int j = 0; j < 10; j++) {
|
|
||||||
if (!i.hasNext()) break;
|
|
||||||
i.next();
|
|
||||||
i.remove();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// check if we have enough space in the file to read a complete cache element
|
|
||||||
long seek = cacheNr * (long) elementsize;
|
|
||||||
if (ra.length() - seek < elementsize) return null;
|
|
||||||
// add new element
|
|
||||||
cache = new byte[elementsize];
|
|
||||||
ra.seek(seek);
|
|
||||||
ra.readFully(cache, 0, elementsize);
|
|
||||||
cacheMemory.put(key, cache);
|
|
||||||
}
|
|
||||||
return cache;
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean existCache(final int cacheNr) throws IOException {
|
|
||||||
return cacheMemory.containsKey(Integer.valueOf(cacheNr));
|
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized void readFully(byte[] b, int off, int len) throws IOException {
|
|
||||||
final int bn1 = cacheElementNumber(seekpos);
|
|
||||||
final int bn2 = cacheElementNumber(seekpos + len - 1);
|
|
||||||
final int offset = cacheElementOffset(seekpos);
|
|
||||||
final byte[] cache = readCache(bn1);
|
|
||||||
if (bn1 == bn2) {
|
|
||||||
// simple case
|
|
||||||
if (cache == null) {
|
|
||||||
ra.seek(seekpos);
|
|
||||||
ra.readFully(b, off, len);
|
|
||||||
seekpos += len;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
//System.out.println("cache hit");
|
|
||||||
System.arraycopy(cache, offset, b, off, len);
|
|
||||||
seekpos += len;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
assert cache != null;
|
|
||||||
|
|
||||||
// do recursively
|
|
||||||
final int thislen = elementsize - offset;
|
|
||||||
System.arraycopy(cache, offset, b, off, thislen);
|
|
||||||
seekpos += thislen;
|
|
||||||
readFully(b, off + thislen, len - thislen);
|
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized void write(final byte[] b, final int off, final int len) throws IOException {
|
|
||||||
final int bn1 = cacheElementNumber(seekpos);
|
|
||||||
final int bn2 = cacheElementNumber(seekpos + len - 1);
|
|
||||||
final int offset = cacheElementOffset(seekpos);
|
|
||||||
if (bn1 == bn2) {
|
|
||||||
if (existCache(bn1)) {
|
|
||||||
// write to cache and file; here: write only to cache
|
|
||||||
final byte[] cache = readCache(bn1);
|
|
||||||
assert cache != null;
|
|
||||||
System.arraycopy(b, off, cache, offset, len);
|
|
||||||
} else {
|
|
||||||
// in case that the cache could be filled completely
|
|
||||||
// create a new entry here and store it also to the cache
|
|
||||||
if (offset == 0 && len >= elementsize) {
|
|
||||||
final byte[] cache = new byte[elementsize];
|
|
||||||
System.arraycopy(b, off, cache, 0, elementsize);
|
|
||||||
cacheMemory.put(this.id + bn1, cache);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// write to file
|
|
||||||
ra.seek(seekpos);
|
|
||||||
ra.write(b, off, len);
|
|
||||||
seekpos += len;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// do recursively
|
|
||||||
final int thislen = elementsize - offset;
|
|
||||||
if (existCache(bn1)) {
|
|
||||||
// write to cache and file; here: write only to cache
|
|
||||||
final byte[] cache = readCache(bn1);
|
|
||||||
assert cache != null;
|
|
||||||
System.arraycopy(b, off, cache, offset, thislen);
|
|
||||||
} else {
|
|
||||||
// in case that the cache could be filled completely
|
|
||||||
// create a new entry here and store it also to the cache
|
|
||||||
if (offset == 0 && len >= elementsize) {
|
|
||||||
final byte[] cache = new byte[elementsize];
|
|
||||||
System.arraycopy(b, off, cache, 0, elementsize);
|
|
||||||
cacheMemory.put(this.id + bn1, cache);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// write to file
|
|
||||||
ra.seek(seekpos);
|
|
||||||
ra.write(b, off, thislen);
|
|
||||||
seekpos += thislen;
|
|
||||||
write(b, off + thislen, len - thislen);
|
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized void seek(final long pos) throws IOException {
|
|
||||||
seekpos = pos;
|
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized void close() throws IOException {
|
|
||||||
ra.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void finalize() {
|
|
||||||
try {
|
|
||||||
close();
|
|
||||||
} catch (final IOException e) {}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
Loading…
Reference in new issue