// Compressor.java
// (C) 2008 by Michael Peter Christen; mc@yacy.net, Frankfurt a. M., Germany
// first published 17.10.2008 on http://yacy.net
//
// This is a part of YaCy, a peer-to-peer based web search engine
//
// $LastChangedDate$
// $LastChangedRevision$
// $LastChangedBy$
//
// LICENSE
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

package net.yacy.kelondro.blob;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.zip.Deflater;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

import net.yacy.cora.order.ByteOrder;
import net.yacy.cora.order.CloneableIterator;
import net.yacy.cora.util.ByteArray;
import net.yacy.cora.util.ConcurrentLog;
import net.yacy.cora.util.SpaceExceededException;
import net.yacy.kelondro.util.MemoryControl;

/**
 * A {@link BLOB} decorator that buffers freshly inserted entries uncompressed in memory
 * and writes them to a backend BLOB with a two-byte magic prefix: gzip-compressed when
 * that makes the entry smaller, plain otherwise.
 * <p>
 * All accesses to the in-memory buffer are guarded by a single {@link ReentrantLock}.
 * {@link #get(byte[])} and {@link #insert(byte[], byte[])} give up after the configured
 * lock timeout instead of blocking indefinitely.
 */
public class Compressor implements BLOB, Iterable<byte[]> {

    /** magic for gzip-encoded content */
    private static final byte[] gzipMagic = {(byte) 'z', (byte) '|'};

    /** magic for plain content (no encoding) */
    private static final byte[] plainMagic = {(byte) 'p', (byte) '|'};

    /** The storage that receives the (possibly compressed) entries */
    private final BLOB backend;

    /** entries which are not yet compressed, format is RAW (without magic) */
    private TreeMap<byte[], byte[]> buffer;

    /** Total size (in bytes) of uncompressed entries in buffer */
    private volatile long bufferlength;

    /** Maximum {@link #bufferlength} value before compressing and flushing to the backend */
    private final long maxbufferlength;

    /** Maximum time (in milliseconds) to acquire a synchronization lock on get() and insert() */
    private volatile long lockTimeout;

    /** Synchronization lock */
    private final ReentrantLock lock;

    /** The compression level */
    private volatile int compressionLevel;

    /**
     * @param backend the backend storage
     * @param buffersize the maximum total size (in bytes) of uncompressed in-memory entries before compressing and flushing to the backend
     * @param lockTimeout maximum time (in milliseconds) to acquire a synchronization lock on get() and insert() operations
     * @param compressionLevel the compression level : supported values ranging from 0 - no compression, to 9 - best compression
     */
    public Compressor(final BLOB backend, final long buffersize, final long lockTimeout, final int compressionLevel) {
        this.backend = backend;
        this.maxbufferlength = buffersize;
        this.lockTimeout = lockTimeout;
        this.lock = new ReentrantLock();
        /* Ensure a value within the range supported by the Deflater class */
        this.compressionLevel = Math.max(Deflater.NO_COMPRESSION, Math.min(Deflater.BEST_COMPRESSION, compressionLevel));
        initBuffer();
    }

    /** @return the value reported by the backend; the in-memory buffer is not added */
    @Override
    public long mem() {
        return this.backend.mem();
    }

    /** Delegates to the backend. */
    @Override
    public void optimize() {
        this.backend.optimize();
    }

    /** @return the name of the backend */
    @Override
    public String name() {
        return this.backend.name();
    }

    /**
     * Drops all buffered entries and clears the backend.
     *
     * @throws IOException when the backend fails to clear
     */
    @Override
    public void clear() throws IOException {
        this.lock.lock();
        try {
            initBuffer();
            this.backend.clear();
        } finally {
            this.lock.unlock();
        }
    }

    /** Replaces the buffer with an empty one sorted by the backend ordering and resets its byte counter. */
    private void initBuffer() {
        this.buffer = new TreeMap<byte[], byte[]>(this.backend.ordering());
        this.bufferlength = 0;
    }

    /** @return the key ordering of the backend */
    @Override
    public ByteOrder ordering() {
        return this.backend.ordering();
    }

    /**
     * Flushes all buffered entries to the backend and then closes the backend.
     *
     * @param writeIDX passed through to the backend
     */
    @Override
    public void close(final boolean writeIDX) {
        this.lock.lock();
        try {
            // no more thread is running, flush all queues
            flushAll();
            this.backend.close(writeIDX);
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Encodes an entry for the backend. Entries shorter than 100 bytes, entries that do
     * not get smaller by compression, and entries whose compression failed are stored
     * plain (with the plain magic); all others are stored gzip-compressed.
     *
     * @param b the raw entry
     * @param compressionLevel the Deflater compression level
     * @return the entry prefixed with the appropriate magic, never null
     */
    private static byte[] compress(final byte[] b, final int compressionLevel) {
        final int l = b.length;
        if (l < 100) {
            return markWithPlainMagic(b);
        }
        final byte[] bb = compressAddMagic(b, compressionLevel);
        // bb is null when compression failed: fall back to plain storage instead of failing with a NPE
        if (bb == null || bb.length >= l) {
            return markWithPlainMagic(b);
        }
        return bb;
    }

    /**
     * Compresses a byte array and adds a leading magic for the compression.
     *
     * @param b the raw entry
     * @param compressionLevel the Deflater compression level
     * @return the gzip magic followed by the gzip stream, or null when an IOException occurred
     */
    private static byte[] compressAddMagic(final byte[] b, final int compressionLevel) {
        try {
            final ByteArrayOutputStream baos = new ByteArrayOutputStream(b.length / 5);
            baos.write(gzipMagic);
            // anonymous subclass: the protected deflater is the only way to set the level
            final OutputStream os = new GZIPOutputStream(baos, 65536) {{ this.def.setLevel(compressionLevel); }};
            try {
                os.write(b);
            } finally {
                // always close, even when write() fails, to finish the stream and release the deflater
                os.close();
            }
            return baos.toByteArray();
        } catch (final IOException e) {
            ConcurrentLog.severe("Compressor", "", e);
            return null;
        }
    }

    /**
     * @param b the raw entry
     * @return a copy of the entry prefixed with the plain magic
     */
    private static byte[] markWithPlainMagic(final byte[] b) {
        final byte[] r = new byte[b.length + 2];
        r[0] = plainMagic[0];
        r[1] = plainMagic[1];
        System.arraycopy(b, 0, r, 2, b.length);
        return r;
    }

    /**
     * Decodes an entry read from the backend, using the magic in the head of the bytes
     * to identify the compression type.
     *
     * @param b the stored entry, may be null
     * @return the raw entry; null when b is null or when gzip decoding failed
     */
    private static byte[] decompress(final byte[] b) {
        if (b == null) {
            return null;
        }
        if (ByteArray.startsWith(b, gzipMagic)) {
            final ByteArrayInputStream bais = new ByteArrayInputStream(b);
            // eat up the magic
            bais.read();
            bais.read();
            // decompress what is remaining
            try {
                final InputStream gis = new GZIPInputStream(bais);
                try {
                    final ByteArrayOutputStream baos = new ByteArrayOutputStream(b.length);
                    final byte[] buf = new byte[1024 * 4];
                    int n;
                    while ((n = gis.read(buf)) > 0) {
                        baos.write(buf, 0, n);
                    }
                    return baos.toByteArray();
                } finally {
                    // always close, even on a corrupt stream, to release the inflater
                    gis.close();
                }
            } catch (final IOException e) {
                ConcurrentLog.logException(e);
                return null;
            }
        } else if (ByteArray.startsWith(b, plainMagic)) {
            final byte[] r = new byte[b.length - 2];
            System.arraycopy(b, 2, r, 0, b.length - 2);
            return r;
        } else {
            // we consider that the entry is also plain, but without leading magic
            return b;
        }
    }

    /**
     * Retrieves an entry. An entry found in the buffer is moved (compressed) to the backend,
     * because if a document was read once, we think that it will not be retrieved another
     * time again soon. Otherwise the entry is read from the backend and decoded.
     *
     * @param key the entry key
     * @return the raw entry; null when the key is unknown, when decoding failed, or when the
     *         lock could not be acquired within the lock timeout (or the thread was interrupted)
     * @throws IOException when the backend fails
     * @throws SpaceExceededException when there is not enough memory to decode the entry
     */
    @Override
    public byte[] get(final byte[] key) throws IOException, SpaceExceededException {
        boolean locked = false;
        try {
            locked = this.lock.tryLock(this.lockTimeout, TimeUnit.MILLISECONDS);
        } catch (final InterruptedException e) {
            /* When interrupted, simply return null, but keep the interrupt visible to the caller */
            Thread.currentThread().interrupt();
            ConcurrentLog.fine("Compressor", "Interrupted while acquiring a synchronzation lock on get()");
        }
        if (!locked) {
            ConcurrentLog.fine("Compressor", "Could not acquire a synchronization lock for retrieval within " + this.lockTimeout + " milliseconds");
            return null;
        }

        try {
            final byte[] buffered = this.buffer.remove(key);
            if (buffered != null) {
                try {
                    this.backend.insert(key, compress(buffered, this.compressionLevel));
                } catch (final IOException e) {
                    // do not lose the entry when the backend fails: keep it buffered
                    this.buffer.put(key, buffered);
                    throw e;
                }
                this.bufferlength -= buffered.length;
                return buffered;
            }
        } finally {
            this.lock.unlock();
        }

        // return from the backend
        final byte[] stored = this.backend.get(key);
        if (stored == null) {
            return null;
        }
        // computed as long: an int multiplication would overflow for entries larger than 1 GiB
        final long needed = 2L * stored.length;
        if (!MemoryControl.request(needed, true)) {
            throw new SpaceExceededException(needed, "decompress needs 2 * " + stored.length + " bytes");
        }
        return decompress(stored);
    }

    /**
     * Variant of {@link #get(byte[])} that never throws a checked exception.
     *
     * @param key the entry key; anything but a byte[] yields null
     * @return the raw entry, or null when not available or when an exception was logged
     */
    @Override
    public byte[] get(final Object key) {
        if (!(key instanceof byte[])) {
            return null;
        }
        try {
            return get((byte[]) key);
        } catch (final IOException e) {
            ConcurrentLog.logException(e);
        } catch (final SpaceExceededException e) {
            ConcurrentLog.logException(e);
        }
        return null;
    }

    /** @return true when the key is in the buffer or in the backend */
    @Override
    public boolean containsKey(final byte[] key) {
        this.lock.lock();
        try {
            return this.buffer.containsKey(key) || this.backend.containsKey(key);
        } finally {
            this.lock.unlock();
        }
    }

    /** @return the key length of the backend */
    @Override
    public int keylength() {
        return this.backend.keylength();
    }

    /** @return the backend length plus the size of the buffered entries; 0 when the backend fails */
    @Override
    public long length() {
        this.lock.lock();
        try {
            return this.backend.length() + this.bufferlength;
        } catch (final IOException e) {
            ConcurrentLog.logException(e);
            return 0;
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * @param key the entry key
     * @return the length of the raw (decoded) entry; 0 when unknown or not decodable
     * @throws IOException when the backend fails or reports exceeded memory
     */
    @Override
    public long length(final byte[] key) throws IOException {
        this.lock.lock();
        try {
            byte[] b = this.buffer.get(key);
            if (b != null) {
                return b.length;
            }
            try {
                b = this.backend.get(key);
                if (b == null) {
                    return 0;
                }
                b = decompress(b);
                return (b == null) ? 0 : b.length;
            } catch (final SpaceExceededException e) {
                // keep the original exception as cause
                throw new IOException(e.getMessage(), e);
            }
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Removes an entry from the buffer. Does not update {@link #bufferlength}.
     *
     * @return the length of the removed entry, 0 when the key was not buffered
     */
    private int removeFromQueues(final byte[] key) {
        final byte[] b = this.buffer.remove(key);
        if (b != null) {
            return b.length;
        }
        return 0;
    }

    /**
     * Stores an entry uncompressed in the buffer, replacing any entry with the same key.
     * Buffered entries are flushed to the backend first as far as needed to make room.
     * Nothing is inserted when the lock cannot be acquired within the lock timeout or
     * when the thread is interrupted while waiting.
     *
     * @param key the entry key
     * @param b the raw entry
     * @throws IOException when deleting the previous entry from the backend fails
     */
    @Override
    public void insert(final byte[] key, final byte[] b) throws IOException {
        boolean locked = false;
        try {
            locked = this.lock.tryLock(this.lockTimeout, TimeUnit.MILLISECONDS);
        } catch (final InterruptedException e) {
            /* When interrupted, simply nothing is inserted, but keep the interrupt visible to the caller */
            Thread.currentThread().interrupt();
            ConcurrentLog.fine("Compressor", "Interrupted while acquiring a synchronzation lock on insert()");
        }
        if (!locked) {
            ConcurrentLog.fine("Compressor", "Could not acquire a synchronization lock for insertion within " + this.lockTimeout + " milliseconds");
            return;
        }

        try {
            // first ensure that the files do not exist anywhere
            delete(key);

            // computed as long: an int multiplication would overflow for entries larger than 1 GiB
            final long needed = 2L * b.length;
            // compress just as much as is necessary to get enough room; stop when the buffer
            // is empty or when the backend refuses an entry (flushOne() keeps it buffered),
            // otherwise this loop would never terminate on a failing backend
            while (this.bufferlength + needed > this.maxbufferlength) {
                if (!flushOne()) {
                    break;
                }
            }

            // files are written uncompressed to the uncompressed-queue
            // they are either written uncompressed to the database
            // or compressed later
            this.buffer.put(key, b);
            this.bufferlength += b.length;
        } finally {
            this.lock.unlock();
        }

        if (MemoryControl.shortStatus()) {
            flushAll();
        }
    }

    /**
     * Deletes an entry from the backend and from the buffer.
     *
     * @param key the entry key
     * @throws IOException when the backend fails
     */
    @Override
    public void delete(final byte[] key) throws IOException {
        this.lock.lock();
        try {
            this.backend.delete(key);
            final long rx = removeFromQueues(key);
            if (rx > 0) {
                this.bufferlength -= rx;
            }
        } finally {
            this.lock.unlock();
        }
    }

    /** @return the number of entries in the backend plus the number of buffered entries */
    @Override
    public int size() {
        this.lock.lock();
        try {
            return this.backend.size() + this.buffer.size();
        } finally {
            this.lock.unlock();
        }
    }

    /** @return true when both the backend and the buffer are empty */
    @Override
    public boolean isEmpty() {
        this.lock.lock();
        try {
            return this.backend.isEmpty() && this.buffer.isEmpty();
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Flushes the buffer, then returns the key iterator of the backend.
     *
     * @throws IOException when the backend fails
     */
    @Override
    public CloneableIterator<byte[]> keys(final boolean up, final boolean rotating) throws IOException {
        this.lock.lock();
        try {
            flushAll();
            return this.backend.keys(up, rotating);
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Flushes the buffer, then returns the key iterator of the backend starting at firstKey.
     *
     * @throws IOException when the backend fails
     */
    @Override
    public CloneableIterator<byte[]> keys(final boolean up, final byte[] firstKey) throws IOException {
        this.lock.lock();
        try {
            flushAll();
            return this.backend.keys(up, firstKey);
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Flushes the buffer, then iterates the backend keys (up, not rotating).
     *
     * @return the key iterator, or null when the backend failed (the failure is logged)
     */
    @Override
    public Iterator<byte[]> iterator() {
        flushAll();
        try {
            return this.backend.keys(true, false);
        } catch (final IOException e) {
            ConcurrentLog.logException(e);
            return null;
        }
    }

    /**
     * Moves the first buffered entry (in buffer order) to the backend. Callers are
     * expected to hold {@link #lock}.
     *
     * @return true when an entry was written; false when the buffer is empty or the
     *         backend failed, in which case the entry stays buffered
     */
    private boolean flushOne() {
        // pollFirstEntry() removes the entry and hands out a snapshot that stays valid afterwards
        final Map.Entry<byte[], byte[]> entry = this.buffer.pollFirstEntry();
        if (entry == null) {
            return false;
        }
        try {
            this.backend.insert(entry.getKey(), compress(entry.getValue(), this.compressionLevel));
            this.bufferlength -= entry.getValue().length;
            return true;
        } catch (final IOException e) {
            ConcurrentLog.logException(e);
            this.buffer.put(entry.getKey(), entry.getValue());
            return false;
        }
    }

    /** Moves buffered entries to the backend until the buffer is empty or the backend fails. */
    public void flushAll() {
        this.lock.lock();
        try {
            while (!this.buffer.isEmpty()) {
                if (!flushOne()) {
                    break;
                }
            }
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Rewrites an entry with the given rewriter and stores the result when its length changed.
     *
     * @param key the entry key
     * @param rewriter produces the new content from the current content
     * @return the number of bytes the entry became shorter; 0 when the key is unknown or the length is unchanged
     * @throws IOException when the backend fails
     * @throws SpaceExceededException when there is not enough memory to decode the entry
     */
    @Override
    public int replace(final byte[] key, final Rewriter rewriter) throws IOException, SpaceExceededException {
        final byte[] b = get(key);
        if (b == null) {
            return 0;
        }
        return storeRewritten(key, b, rewriter.rewrite(b));
    }

    /**
     * Reduces an entry with the given reducer and stores the result when its length changed.
     *
     * @param key the entry key
     * @param reducer produces the new content from the current content
     * @return the number of bytes the entry became shorter; 0 when the key is unknown or the length is unchanged
     * @throws IOException when the backend fails
     * @throws SpaceExceededException when there is not enough memory to decode the entry
     */
    @Override
    public int reduce(final byte[] key, final Reducer reducer) throws IOException, SpaceExceededException {
        final byte[] b = get(key);
        if (b == null) {
            return 0;
        }
        return storeRewritten(key, b, reducer.rewrite(b));
    }

    /**
     * Shared tail of {@link #replace(byte[], Rewriter)} and {@link #reduce(byte[], Reducer)}:
     * inserts the rewritten entry when its length differs from the original one.
     *
     * @return original length minus rewritten length
     */
    private int storeRewritten(final byte[] key, final byte[] original, final byte[] rewritten) throws IOException {
        // old size minus new size: positive when the entry shrank, as the assertion requires
        final int reduction = original.length - rewritten.length;
        assert reduction >= 0 : "rewritten entry grew by " + (-reduction) + " bytes";
        if (reduction == 0) {
            return 0;
        }
        insert(key, rewritten);
        return reduction;
    }

    /**
     * Set the new content compression level.
     * @param compressionLevel the new compression level. Supported values between 0 (no compression) and 9 (best compression).
     */
    public void setCompressionLevel(final int compressionLevel) {
        /* Ensure a value within the range supported by the Deflater class */
        this.compressionLevel = Math.max(Deflater.NO_COMPRESSION, Math.min(Deflater.BEST_COMPRESSION, compressionLevel));
    }

    /**
     * Set the new synchronization lock timeout.
     * @param lockTimeout the new synchronization lock timeout (in milliseconds).
     */
    public void setLockTimeout(final long lockTimeout) {
        this.lockTimeout = lockTimeout;
    }
}