diff --git a/source/de/anomic/crawler/CrawlProfile.java b/source/de/anomic/crawler/CrawlProfile.java index 8b7383570..8ba34ee90 100644 --- a/source/de/anomic/crawler/CrawlProfile.java +++ b/source/de/anomic/crawler/CrawlProfile.java @@ -67,7 +67,7 @@ public class CrawlProfile { public CrawlProfile(final File file) throws IOException { this.profileTableFile = file; profileTableFile.getParentFile().mkdirs(); - final kelondroBLOB dyn = new kelondroBLOBHeap(profileTableFile, yacySeedDB.commonHashLength, kelondroNaturalOrder.naturalOrder); + final kelondroBLOB dyn = new kelondroBLOBHeap(profileTableFile, yacySeedDB.commonHashLength, kelondroNaturalOrder.naturalOrder, 1024 * 64); profileTable = new kelondroMap(dyn, 500); } diff --git a/source/de/anomic/crawler/RobotsTxt.java b/source/de/anomic/crawler/RobotsTxt.java index 8d6e1544d..390313637 100644 --- a/source/de/anomic/crawler/RobotsTxt.java +++ b/source/de/anomic/crawler/RobotsTxt.java @@ -71,7 +71,7 @@ public class RobotsTxt { kelondroBLOB blob = null; if (robotsTableFile.getName().endsWith(".heap")) { try { - blob = new kelondroBLOBHeap(robotsTableFile, 64, kelondroNaturalOrder.naturalOrder); + blob = new kelondroBLOBHeap(robotsTableFile, 64, kelondroNaturalOrder.naturalOrder, 1024 * 1024); } catch (final IOException e) { e.printStackTrace(); } diff --git a/source/de/anomic/index/indexContainerHeap.java b/source/de/anomic/index/indexContainerHeap.java index 879e1f962..d412f72ea 100755 --- a/source/de/anomic/index/indexContainerHeap.java +++ b/source/de/anomic/index/indexContainerHeap.java @@ -43,7 +43,6 @@ import java.util.SortedMap; import java.util.TreeMap; import de.anomic.kelondro.kelondroBLOB; -import de.anomic.kelondro.kelondroBLOBBuffer; import de.anomic.kelondro.kelondroBLOBHeap; import de.anomic.kelondro.kelondroBase64Order; import de.anomic.kelondro.kelondroByteOrder; @@ -145,7 +144,7 @@ public final class indexContainerHeap { assert this.cache != null; if (log != null) log.logInfo("creating alternative rwi heap dump '" + heapFile.getName() + "', " + cache.size() + " rwi's"); if (heapFile.exists()) heapFile.delete(); - final kelondroBLOB dump = new kelondroBLOBBuffer(new kelondroBLOBHeap(heapFile, payloadrow.primaryKeyLength, kelondroBase64Order.enhancedCoder), 1024 * 1024 * 2, true); + final kelondroBLOB dump = new kelondroBLOBHeap(heapFile, payloadrow.primaryKeyLength, kelondroBase64Order.enhancedCoder, 1024 * 1024 * 10); final long startTime = System.currentTimeMillis(); long wordcount = 0, urlcount = 0; String wordHash; diff --git a/source/de/anomic/kelondro/kelondroAbstractRA.java b/source/de/anomic/kelondro/kelondroAbstractRA.java index ea1fd3404..4b7b808ea 100644 --- a/source/de/anomic/kelondro/kelondroAbstractRA.java +++ b/source/de/anomic/kelondro/kelondroAbstractRA.java @@ -68,7 +68,7 @@ abstract class kelondroAbstractRA implements kelondroRA { public short readShort() throws IOException { byte[] b = new byte[2]; this.readFully(b, 0, 2); - if ((b[0] | b[1]) < 0) throw new IOException("kelondroAbstractRA.readInt: wrong values; ch1=" + (b[0] & 0xFF) + ", ch2=" + (b[1] & 0xFF)); + //if ((b[0] | b[1]) < 0) throw new IOException("kelondroAbstractRA.readInt: wrong values; ch1=" + (b[0] & 0xFF) + ", ch2=" + (b[1] & 0xFF)); return (short) (((b[0] & 0xFF) << 8) | (b[1] & 0xFF)); } @@ -82,17 +82,21 @@ abstract class kelondroAbstractRA implements kelondroRA { public int readInt() throws IOException { byte[] b = new byte[4]; this.readFully(b, 0, 4); - if ((b[0] | b[1] | b[2] | b[3]) < 0) throw new IOException("kelondroAbstractRA.readInt: wrong values; ch1=" + (b[0] & 0xFF) + ", ch2=" + (b[1] & 0xFF) + ", ch3=" + (b[2] & 0xFF) + ", ch4=" + (b[3] & 0xFF)); + //if ((b[0] | b[1] | b[2] | b[3]) < 0) throw new IOException("kelondroAbstractRA.readInt: wrong values; ch1=" + (b[0] & 0xFF) + ", ch2=" + (b[1] & 0xFF) + ", ch3=" + (b[2] & 0xFF) + ", ch4=" + (b[3] & 0xFF)); return (((b[0] & 0xFF) << 24) | ((b[1] & 0xFF) << 16) | ((b[2] & 0xFF) << 8) | (b[3] & 0xFF)); } public void writeInt(final int v) throws IOException { + this.write(int2array(v)); + } + + public final static byte[] int2array(final int v) throws IOException { byte[] b = new byte[4]; b[0] = (byte) ((v >>> 24) & 0xFF); b[1] = (byte) ((v >>> 16) & 0xFF); b[2] = (byte) ((v >>> 8) & 0xFF); b[3] = (byte) ( v & 0xFF); - this.write(b); + return b; } public long readLong() throws IOException { diff --git a/source/de/anomic/kelondro/kelondroBLOBArray.java b/source/de/anomic/kelondro/kelondroBLOBArray.java index 22a776506..cce5bf460 100755 --- a/source/de/anomic/kelondro/kelondroBLOBArray.java +++ b/source/de/anomic/kelondro/kelondroBLOBArray.java @@ -65,15 +65,18 @@ public class kelondroBLOBArray implements kelondroBLOB { private long repositorySizeMax; private List blobs; private String blobSalt; + private int buffersize; public kelondroBLOBArray( final File heapLocation, final String blobSalt, final int keylength, - final kelondroByteOrder ordering) throws IOException { + final kelondroByteOrder ordering, + final int buffersize) throws IOException { this.keylength = keylength; this.blobSalt = blobSalt; this.ordering = ordering; + this.buffersize = buffersize; this.heapLocation = heapLocation; this.fileAgeLimit = oneMonth; this.fileSizeLimit = oneGigabyte; @@ -93,14 +96,26 @@ public class kelondroBLOBArray implements kelondroBLOB { TreeMap sortedItems = new TreeMap(); kelondroBLOB oneBlob; File f; + long time, maxtime = 0; + // first find maximum time: the file with this time will be given a write buffer for (int i = 0; i < files.length; i++) { if (files[i].length() >= 19 && files[i].endsWith(".blob")) { try { d = serverDate.parseShortSecond(files[i].substring(0, 14)); + time = d.getTime(); + if (time > maxtime) maxtime = time; + } catch (ParseException e) {continue;} + } + } + for (int i = 0; i < files.length; i++) { + if (files[i].length() >= 19 && files[i].endsWith(".blob")) { + try { + d = serverDate.parseShortSecond(files[i].substring(0, 14)); + f = new File(heapLocation, files[i]); + time = d.getTime(); + oneBlob = new kelondroBLOBHeap(f, keylength, ordering, (time == maxtime) ? buffersize : 0); + sortedItems.put(Long.valueOf(time), new blobItem(d, f, oneBlob)); } catch (ParseException e) {continue;} - f = new File(heapLocation, files[i]); - oneBlob = new kelondroBLOBHeap(f, keylength, ordering); - sortedItems.put(Long.valueOf(d.getTime()), new blobItem(d, f, oneBlob)); } } @@ -172,7 +187,7 @@ public class kelondroBLOBArray implements kelondroBLOB { // make a new blob file and assign it in this item this.creation = new Date(); this.location = new File(heapLocation, serverDate.formatShortSecond(creation) + "." + blobSalt + ".blob"); - this.blob = new kelondroBLOBHeap(location, keylength, ordering); + this.blob = new kelondroBLOBHeap(location, keylength, ordering, buffersize); } } @@ -323,7 +338,7 @@ public class kelondroBLOBArray implements kelondroBLOB { final File f = new File("/Users/admin/blobarraytest"); try { //f.delete(); - final kelondroBLOBArray heap = new kelondroBLOBArray(f, "test", 12, kelondroNaturalOrder.naturalOrder); + final kelondroBLOBArray heap = new kelondroBLOBArray(f, "test", 12, kelondroNaturalOrder.naturalOrder, 512 * 1024); heap.put("aaaaaaaaaaaa".getBytes(), "eins zwei drei".getBytes()); heap.put("aaaaaaaaaaab".getBytes(), "vier fuenf sechs".getBytes()); heap.put("aaaaaaaaaaac".getBytes(), "sieben acht neun".getBytes()); diff --git a/source/de/anomic/kelondro/kelondroBLOBBuffer.java b/source/de/anomic/kelondro/kelondroBLOBBuffer.java deleted file mode 100644 index 3a9722f6d..000000000 --- a/source/de/anomic/kelondro/kelondroBLOBBuffer.java +++ /dev/null @@ -1,383 +0,0 @@ -// kelondroBLOBBuffer.java -// (C) 2008 by Michael Peter Christen; mc@yacy.net, Frankfurt a. M., Germany -// first published 17.10.2008 on http://yacy.net -// -// This is a part of YaCy, a peer-to-peer based web search engine -// -// $LastChangedDate: 2006-04-02 22:40:07 +0200 (So, 02 Apr 2006) $ -// $LastChangedRevision: 1986 $ -// $LastChangedBy: orbiter $ -// -// LICENSE -// -// This program is free software; you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation; either version 2 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program; if not, write to the Free Software -// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - - -package de.anomic.kelondro; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Iterator; -import java.util.Map; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.zip.GZIPInputStream; -import java.util.zip.GZIPOutputStream; - -public class kelondroBLOBBuffer extends Thread implements kelondroBLOB { - - static byte[] gzipMagic = {(byte) 'z', (byte) '|'}; // magic for gzip-encoded content - static byte[] plainMagic = {(byte) 'p', (byte) '|'}; // magic for plain content (no encoding) - - private kelondroBLOB backend; - private LinkedBlockingQueue> rawQueue; // entries which are not compressed, format is RAW (without magic) - private LinkedBlockingQueue> compressedQueue; // entries which are compressed, format is with leading magic - private kelondroBytesIntMap contentLength; - private long queueLength; - private long maxCacheSize; - private int cdr; - - private class Entry implements Map.Entry { - - byte[] key, value; - - public Entry(byte[] key, byte[] value) { - this.key = key; - this.value = value; - } - - public byte[] getKey() { - return this.key; - } - - public byte[] getValue() { - return this.value; - } - - public byte[] setValue(byte[] value) { - byte[] b = this.value; - this.value = value; - return b; - } - - } - - public kelondroBLOBBuffer(kelondroBLOB backend, long cachesize, boolean compress) { - this.backend = backend; - this.maxCacheSize = cachesize; - cdr = 0; - initQueues(compress); - } - - public String name() { - return this.backend.name(); - } - - public synchronized void clear() throws IOException { - initQueues(this.compressedQueue != null); - this.backend.clear(); - } - - private void initQueues(boolean compress) { - this.rawQueue = new LinkedBlockingQueue>(); - this.compressedQueue = (compress) ? new LinkedBlockingQueue>() : null; - this.contentLength = new kelondroBytesIntMap(backend.keylength(), backend.ordering(), 500); - this.queueLength = 0; - } - - public kelondroByteOrder ordering() { - return this.backend.ordering(); - } - - public synchronized void close() { - // no more thread is running, flush all queues - try { - flushAll(); - } catch (IOException e) { - e.printStackTrace(); - } - this.backend.close(); - } - - private byte[] compress(byte[] b) { - // compressed a byte array and adds a leading magic for the compression - try { - cdr++; - //System.out.print("/(" + cdr + ")"); // DEBUG - final ByteArrayOutputStream baos = new ByteArrayOutputStream(b.length / 5); - baos.write(gzipMagic); - final OutputStream os = new GZIPOutputStream(baos, 512); - os.write(b); - os.close(); - baos.close(); - return baos.toByteArray(); - } catch (IOException e) { - e.printStackTrace(); - return null; - } - } - - private byte[] markWithPlainMagic(byte[] b) { - //System.out.print("+"); // DEBUG - byte[] r = new byte[b.length + 2]; - r[0] = plainMagic[0]; - r[1] = plainMagic[1]; - System.arraycopy(b, 0, r, 2, b.length); - return r; - } - - private byte[] decompress(byte[] b) { - // use a magic in the head of the bytes to identify compression type - if (kelondroByteArray.equals(b, gzipMagic)) { - //System.out.print("\\"); // DEBUG - cdr--; - ByteArrayInputStream bais = new ByteArrayInputStream(b); - // eat up the magic - bais.read(); - bais.read(); - // decompress what is remaining - InputStream gis; - try { - gis = new GZIPInputStream(bais); - final ByteArrayOutputStream baos = new ByteArrayOutputStream(b.length); - final byte[] buf = new byte[1024]; - int n; - while ((n = gis.read(buf)) > 0) baos.write(buf, 0, n); - gis.close(); - bais.close(); - baos.close(); - - return baos.toByteArray(); - } catch (IOException e) { - e.printStackTrace(); - return null; - } - } else if (kelondroByteArray.equals(b, plainMagic)) { - System.out.print("-"); // DEBUG - byte[] r = new byte[b.length - 2]; - System.arraycopy(b, 2, r, 0, b.length - 2); - return r; - } else { - // we consider that the entry is also plain, but without leading magic - return b; - } - } - - private byte[] getFromQueue(byte[] key, LinkedBlockingQueue> queue) { - Iterator> i = queue.iterator(); - Map.Entry e; - while (i.hasNext()) { - e = i.next(); - if (kelondroByteArray.equals(key, e.getKey())) return e.getValue(); - } - return null; - } - - public synchronized byte[] get(byte[] key) throws IOException { - // depending on the source of the result, we additionally do entry compression - // because if a document was read once, we think that it will not be retrieved another time again soon - byte[] b; - if (this.compressedQueue == null) { - b = getFromQueue(key, rawQueue); - if (b != null) return b; - } else { - b = removeFromQueue(key, rawQueue); - if (b != null) { - // put the entry on the compressed queue - byte[] bb = compress(b); - this.compressedQueue.add(new Entry(key, bb)); - this.queueLength = this.queueLength - b.length + bb.length; - return b; - } - } - // no special handling for elements from the compressed queue - b = (compressedQueue == null) ? null : getFromQueue(key, compressedQueue); - if (b != null) { - //System.out.print("CASEA"); // DEBUG - return decompress(b); - } - - // finally return from the backend - b = this.backend.get(key); - if (b == null) return null; - return decompress(b); - } - - private boolean hasInQueue(byte[] key, LinkedBlockingQueue> queue) { - Iterator> i = queue.iterator(); - Map.Entry e; - while (i.hasNext()) { - e = i.next(); - if (kelondroByteArray.equals(key, e.getKey())) return true; - } - return false; - } - - public synchronized boolean has(byte[] key) throws IOException { - return - (rawQueue != null && hasInQueue(key, rawQueue)) || - (compressedQueue != null && hasInQueue(key, compressedQueue)) || - this.backend.has(key); - } - - public int keylength() { - return this.backend.keylength(); - } - - public synchronized long length() { - try { - return this.backend.length() + this.queueLength; - } catch (IOException e) { - e.printStackTrace(); - return 0; - } - } - - public synchronized long length(byte[] key) throws IOException { - int i = this.contentLength.geti(key); - if (i >= 0) { - //System.out.print("CASEC"); // DEBUG - return (long) i; - } - byte[] b = getFromQueue(key, rawQueue); - if (b != null) return b.length; - b = (compressedQueue == null) ? null : getFromQueue(key, compressedQueue); - if (b != null) { - //System.out.print("CASEB"); // DEBUG - return decompress(b).length; - } - return this.backend.length(key); - } - - private byte[] removeFromQueue(byte[] key, LinkedBlockingQueue> queue) { - Iterator> i = queue.iterator(); - Map.Entry e; - while (i.hasNext()) { - e = i.next(); - if (kelondroByteArray.equals(key, e.getKey())) { - i.remove(); - return e.getValue(); - } - } - return null; - } - - private int removeFromQueues(byte[] key) throws IOException { - byte[] b = removeFromQueue(key, rawQueue); - if (b != null) return b.length; - b = (compressedQueue == null) ? null : removeFromQueue(key, compressedQueue); - if (b != null) return b.length; - return 0; - } - - public synchronized void put(byte[] key, byte[] b) throws IOException { - - // first ensure that the files do not exist anywhere - this.backend.remove(key); - long rx = removeFromQueues(key); - if (rx > 0) this.queueLength -= rx; - - // check if the buffer is full or could be full after this write - if (this.queueLength + b.length * 2 > this.maxCacheSize) { - // in case that we compress, just compress as much as is necessary to get enough room - if (this.compressedQueue == null) { - flushAll(); - } else { - while (this.queueLength + b.length * 2 > this.maxCacheSize && this.rawQueue.size() > 0) { - flushOneRaw(); - } - // in case that this was not enough, just flush all - if (this.queueLength + b.length * 2 > this.maxCacheSize) flushAll(); - } - } - - // files are written uncompressed to the uncompressed-queue - // they are either written uncompressed to the database - // or compressed later - this.rawQueue.add(new Entry(key, b)); - this.queueLength += b.length; - this.contentLength.puti(key, b.length); - if (this.contentLength.size() > 500) this.contentLength.clear(); // prevent the case that this object becomes a memory leak - } - - public synchronized void remove(byte[] key) throws IOException { - this.backend.remove(key); - long rx = removeFromQueues(key); - if (rx > 0) this.queueLength -= rx; - } - - public int size() { - return this.backend.size(); - } - - public synchronized kelondroCloneableIterator keys(boolean up, boolean rotating) throws IOException { - flushAll(); - return this.backend.keys(up, rotating); - } - - public synchronized kelondroCloneableIterator keys(boolean up, byte[] firstKey) throws IOException { - flushAll(); - return this.backend.keys(up, firstKey); - } - - private boolean flushOneRaw() throws IOException { - if (this.rawQueue.size() == 0) return false; - // depending on process case, write it to the file or compress it to the other queue - try { - Map.Entry entry = this.rawQueue.take(); - this.queueLength -= entry.getValue().length; - if (this.compressedQueue != null) { - entry.setValue(compress(entry.getValue())); - this.queueLength += entry.getValue().length; - this.compressedQueue.add(entry); - } else { - this.backend.put(entry.getKey(), markWithPlainMagic(entry.getValue())); - assert this.queueLength == 0; - if (this.rawQueue.size() == 0) this.queueLength = 0; - } - return true; - } catch (InterruptedException e) { - return false; - } - } - - private boolean flushOneCompressed() throws IOException { - if (this.compressedQueue == null || this.compressedQueue.size() == 0) return false; - // write compressed entry to the file - try { - //System.out.print("#"); // DEBUG - Map.Entry entry = this.compressedQueue.take(); - this.queueLength -= entry.getValue().length; - this.backend.put(entry.getKey(), entry.getValue()); - if (this.rawQueue.size() == 0 && this.compressedQueue.size() == 0) this.queueLength = 0; - return true; - } catch (InterruptedException e) { - return false; - } - } - - private void flushAll() throws IOException { - while (this.rawQueue.size() > 0) { - if (!flushOneRaw()) break; - } - while (this.compressedQueue != null && this.compressedQueue.size() > 0) { - if (!flushOneCompressed()) break; - } - assert this.queueLength == 0; - } - -} diff --git a/source/de/anomic/kelondro/kelondroBLOBCompressor.java b/source/de/anomic/kelondro/kelondroBLOBCompressor.java new file mode 100644 index 000000000..c6b5f9efb --- /dev/null +++ b/source/de/anomic/kelondro/kelondroBLOBCompressor.java @@ -0,0 +1,267 @@ +// kelondroBLOBCompressor.java +// (C) 2008 by Michael Peter Christen; mc@yacy.net, Frankfurt a. M., Germany +// first published 17.10.2008 on http://yacy.net +// +// This is a part of YaCy, a peer-to-peer based web search engine +// +// $LastChangedDate: 2006-04-02 22:40:07 +0200 (So, 02 Apr 2006) $ +// $LastChangedRevision: 1986 $ +// $LastChangedBy: orbiter $ +// +// LICENSE +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation; either version 2 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + + +package de.anomic.kelondro; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.HashMap; +import java.util.Map; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +public class kelondroBLOBCompressor extends Thread implements kelondroBLOB { + + static byte[] gzipMagic = {(byte) 'z', (byte) '|'}; // magic for gzip-encoded content + static byte[] plainMagic = {(byte) 'p', (byte) '|'}; // magic for plain content (no encoding) + + private kelondroBLOB backend; + private HashMap buffer; // entries which are not yet compressed, format is RAW (without magic) + private long bufferlength; + private long maxbufferlength; + private int cdr; + + public kelondroBLOBCompressor(kelondroBLOB backend, long buffersize) { + this.backend = backend; + this.maxbufferlength = buffersize; + this.cdr = 0; + initBuffer(); + } + + public String name() { + return this.backend.name(); + } + + public synchronized void clear() throws IOException { + initBuffer(); + this.backend.clear(); + } + + private void initBuffer() { + this.buffer = new HashMap(); + this.bufferlength = 0; + } + + public kelondroByteOrder ordering() { + return this.backend.ordering(); + } + + public synchronized void close() { + // no more thread is running, flush all queues + try { + flushAll(); + } catch (IOException e) { + e.printStackTrace(); + } + this.backend.close(); + } + + private byte[] compress(byte[] b) { + int l = b.length; + if (l < 100) return markWithPlainMagic(b); + byte[] bb = compressAddMagic(b); + if (bb.length >= l) return markWithPlainMagic(b); + return bb; + } + + private byte[] compressAddMagic(byte[] b) { + // compress a byte array and add a leading magic for the compression + try { + cdr++; + //System.out.print("/(" + cdr + ")"); // DEBUG + final ByteArrayOutputStream baos = new ByteArrayOutputStream(b.length / 5); + baos.write(gzipMagic); + final OutputStream os = new GZIPOutputStream(baos, 512); + os.write(b); + os.close(); + baos.close(); + return baos.toByteArray(); + } catch (IOException e) { + e.printStackTrace(); + return null; + } + } + + private byte[] markWithPlainMagic(byte[] b) { + //System.out.print("+"); // DEBUG + byte[] r = new byte[b.length + 2]; + r[0] = plainMagic[0]; + r[1] = plainMagic[1]; + System.arraycopy(b, 0, r, 2, b.length); + return r; + } + + private byte[] decompress(byte[] b) { + // use a magic in the head of the bytes to identify compression type + if (kelondroByteArray.equals(b, gzipMagic)) { + //System.out.print("\\"); // DEBUG + cdr--; + ByteArrayInputStream bais = new ByteArrayInputStream(b); + // eat up the magic + bais.read(); + bais.read(); + // decompress what is remaining + InputStream gis; + try { + gis = new GZIPInputStream(bais); + final ByteArrayOutputStream baos = new ByteArrayOutputStream(b.length); + final byte[] buf = new byte[1024]; + int n; + while ((n = gis.read(buf)) > 0) baos.write(buf, 0, n); + gis.close(); + bais.close(); + baos.close(); + + return baos.toByteArray(); + } catch (IOException e) { + e.printStackTrace(); + return null; + } + } else if (kelondroByteArray.equals(b, plainMagic)) { + System.out.print("-"); // DEBUG + byte[] r = new byte[b.length - 2]; + System.arraycopy(b, 2, r, 0, b.length - 2); + return r; + } else { + // we consider that the entry is also plain, but without leading magic + return b; + } + } + + public synchronized byte[] get(byte[] key) throws IOException { + // depending on the source of the result, we additionally do entry compression + // because if a document was read once, we think that it will not be retrieved another time again soon + byte[] b = buffer.remove(new String(key)); + if (b != null) { + // compress the entry now and put it to the backend + byte[] bb = compress(b); + this.backend.put(key, bb); + this.bufferlength = this.bufferlength - b.length; + return b; + } + + // return from the backend + b = this.backend.get(key); + if (b == null) return null; + return decompress(b); + } + + public synchronized boolean has(byte[] key) throws IOException { + return + this.buffer.containsKey(new String(key)) || this.backend.has(key); + } + + public int keylength() { + return this.backend.keylength(); + } + + public synchronized long length() { + try { + return this.backend.length() + this.bufferlength; + } catch (IOException e) { + e.printStackTrace(); + return 0; + } + } + + public synchronized long length(byte[] key) throws IOException { + byte[] b = buffer.get(new String(key)); + if (b != null) return b.length; + return decompress(this.backend.get(key)).length; + } + + private int removeFromQueues(byte[] key) throws IOException { + byte[] b = buffer.remove(new String(key)); + if (b != null) return b.length; + return 0; + } + + public synchronized void put(byte[] key, byte[] b) throws IOException { + + // first ensure that the files do not exist anywhere + remove(key); + + // check if the buffer is full or could be full after this write + if (this.bufferlength + b.length * 2 > this.maxbufferlength) { + // in case that we compress, just compress as much as is necessary to get enough room + while (this.bufferlength + b.length * 2 > this.maxbufferlength && this.buffer.size() > 0) { + flushOne(); + } + // in case that this was not enough, just flush all + if (this.bufferlength + b.length * 2 > this.maxbufferlength) flushAll(); + } + + // files are written uncompressed to the uncompressed-queue + // they are either written uncompressed to the database + // or compressed later + this.buffer.put(new String(key), b); + this.bufferlength += b.length; + } + + public synchronized void remove(byte[] key) throws IOException { + this.backend.remove(key); + long rx = removeFromQueues(key); + if (rx > 0) this.bufferlength -= rx; + } + + public int size() { + return this.backend.size() + this.buffer.size(); + } + + public synchronized kelondroCloneableIterator keys(boolean up, boolean rotating) throws IOException { + flushAll(); + return this.backend.keys(up, rotating); + } + + public synchronized kelondroCloneableIterator keys(boolean up, byte[] firstKey) throws IOException { + flushAll(); + return this.backend.keys(up, firstKey); + } + + private boolean flushOne() throws IOException { + if (this.buffer.size() == 0) return false; + // depending on process case, write it to the file or compress it to the other queue + Map.Entry entry = this.buffer.entrySet().iterator().next(); + this.buffer.remove(entry.getKey()); + byte[] b = entry.getValue(); + this.bufferlength -= b.length; + b = compress(b); + this.backend.put(entry.getKey().getBytes(), b); + return true; + } + + private void flushAll() throws IOException { + while (this.buffer.size() > 0) { + if (!flushOne()) break; + } + assert this.bufferlength == 0; + } + +} diff --git a/source/de/anomic/kelondro/kelondroBLOBHeap.java b/source/de/anomic/kelondro/kelondroBLOBHeap.java index aa940627b..c8a374185 100755 --- a/source/de/anomic/kelondro/kelondroBLOBHeap.java +++ b/source/de/anomic/kelondro/kelondroBLOBHeap.java @@ -28,7 +28,7 @@ package de.anomic.kelondro; import java.io.File; import java.io.IOException; -import java.io.RandomAccessFile; +import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.SortedMap; @@ -40,11 +40,15 @@ import de.anomic.server.logging.serverLog; public final class kelondroBLOBHeap implements kelondroBLOB { - private kelondroBytesLongMap index; // key/seek relation for used records - private TreeMap free; // list of {size, seek} pairs denoting space and position of free records - private final File heapFile; // the file of the heap - private final kelondroByteOrder ordering; // the ordering on keys - private RandomAccessFile file; // a random access to the file + + private kelondroBytesLongMap index; // key/seek relation for used records + private TreeMap free; // list of {size, seek} pairs denoting space and position of free records + private final File heapFile; // the file of the heap + private final kelondroByteOrder ordering; // the ordering on keys + private kelondroFileRA file; // a random access to the file + private HashMap buffer; // a write buffer to limit IO to the file; attention: Maps cannot use byte[] as key + private int buffersize; // bytes that are buffered in buffer + private int buffermax; // maximum size of the buffer /* * This class implements a BLOB management based on a sequence of records in a random access file @@ -76,13 +80,16 @@ public final class kelondroBLOBHeap implements kelondroBLOB { * @param ordering * @throws IOException */ - public kelondroBLOBHeap(final File heapFile, final int keylength, final kelondroByteOrder ordering) throws IOException { + public kelondroBLOBHeap(final File heapFile, final int keylength, final kelondroByteOrder ordering, int buffermax) throws IOException { this.ordering = ordering; this.heapFile = heapFile; + this.buffermax = buffermax; this.index = null; // will be created as result of initialization process this.free = new TreeMap(); - this.file = new RandomAccessFile(heapFile, "rw"); + this.buffer = new HashMap(); + this.buffersize = 0; + this.file = new kelondroFileRA(heapFile); byte[] key = new byte[keylength]; int reclen; long seek = 0; @@ -105,7 +112,7 @@ public final class kelondroBLOBHeap implements kelondroBLOB { } // read key - file.readFully(key); + file.readFully(key, 0, key.length); } catch (final IOException e) { // EOF reached @@ -145,7 +152,7 @@ public final class kelondroBLOBHeap implements kelondroBLOB { lastFree.setValue(lastFree.getValue() + nextFree.getValue() + 4); // this updates also the free map file.writeInt(lastFree.getValue()); file.seek(nextFree.getKey()); - file.write(0);file.write(0);file.write(0);file.write(0); + file.writeInt(0); i.remove(); merged++; } else { @@ -190,7 +197,7 @@ public final class kelondroBLOBHeap implements kelondroBLOB { * @return the number of BLOBs in the heap */ public synchronized int size() { - return this.index.size(); + return this.index.size() + this.buffer.size(); } public kelondroByteOrder ordering() { @@ -206,7 +213,10 @@ public final class kelondroBLOBHeap implements kelondroBLOB { assert index != null; assert index.row().primaryKeyLength == key.length : index.row().primaryKeyLength + "!=" + key.length; - // check if the index contains the key + // check the buffer + if (this.buffer.containsKey(new String(key))) return true; + + // check if the file index contains the key try { return index.getl(key) >= 0; } catch (final IOException e) { @@ -222,26 +232,55 @@ public final class kelondroBLOBHeap implements kelondroBLOB { * @throws IOException */ private void add(final byte[] key, final byte[] blob) throws IOException { - add(key, blob, 0, blob.length); + assert blob.length > 0; + assert index.row().primaryKeyLength == key.length : index.row().primaryKeyLength + "!=" + key.length; + if ((blob == null) || (blob.length == 0)) return; + final int pos = (int) file.length(); + file.seek(pos); + file.writeInt(key.length + blob.length); + file.write(key); + file.write(blob, 0, blob.length); + index.putl(key, pos); } /** - * add a BLOB to the heap: this adds the blob always to the end of the file - * @param key - * @param blob + * flush the buffer completely + * this is like adding all elements of the buffer, but it needs only one IO access * @throws IOException */ - private void add(final byte[] key, final byte[] blob, final int offset, final int len) throws IOException { - assert len > 0; - assert index.row().primaryKeyLength == key.length : index.row().primaryKeyLength + "!=" + key.length; - assert blob == null || blob.length - offset >= len; - if ((blob == null) || (blob.length == 0)) return; + private void flushBuffer() throws IOException { + // check size of buffer + Iterator> i = this.buffer.entrySet().iterator(); + int l = 0; + while (i.hasNext()) l += i.next().getValue().length; + assert l == this.buffersize; + + // append all contents of the buffer into one byte[] + i = this.buffer.entrySet().iterator(); final int pos = (int) file.length(); - file.seek(file.length()); - file.writeInt(len + key.length); - file.write(key); - file.write(blob, offset, len); - index.putl(key, pos); + int posFile = pos; + int posBuffer = 0; + byte[] ba = new byte[this.buffersize + (4 + this.index.row().primaryKeyLength) * this.buffer.size()]; + Map.Entry entry; + byte[] key, blob, b; + while (i.hasNext()) { + entry = i.next(); + key = entry.getKey().getBytes(); + blob = entry.getValue(); + index.putl(key, posFile); + b = kelondroAbstractRA.int2array(key.length + blob.length); + assert b.length == 4; + System.arraycopy(b, 0, ba, posBuffer, 4); + System.arraycopy(key, 0, ba, posBuffer + 4, key.length); + System.arraycopy(blob, 0, ba, posBuffer + 4 + key.length, blob.length); + posFile += 4 + key.length + blob.length; + posBuffer += 4 + key.length + blob.length; + } + assert ba.length == posBuffer; // must fit exactly + this.file.seek(pos); + this.file.write(ba); + this.buffer.clear(); + this.buffersize = 0; } /** @@ -253,6 +292,10 @@ public final class kelondroBLOBHeap implements kelondroBLOB { public synchronized byte[] get(final byte[] key) throws IOException { assert index.row().primaryKeyLength == key.length : index.row().primaryKeyLength + "!=" + key.length; + // check the buffer + byte[] blob = this.buffer.get(new String(key)); + if (blob != null) return blob; + // check if the index contains the key final long pos = index.getl(key); if (pos < 0) return null; @@ -263,15 +306,15 @@ public final class kelondroBLOBHeap implements kelondroBLOB { if (serverMemory.available() < len) { if (!serverMemory.request(len, false)) return null; // not enough memory available for this blob } - final byte[] blob = new byte[len]; // read the key final byte[] keyf = new byte[index.row().primaryKeyLength]; - file.readFully(keyf); + file.readFully(keyf, 0, keyf.length); assert this.ordering.compare(key, keyf) == 0; // read the blob - file.readFully(blob); + blob = new byte[len]; + file.readFully(blob, 0, blob.length); return blob; } @@ -285,6 +328,10 @@ public final class kelondroBLOBHeap implements kelondroBLOB { public long length(byte[] key) throws IOException { assert index.row().primaryKeyLength == key.length : index.row().primaryKeyLength + "!=" + key.length; + // check the buffer + byte[] blob = this.buffer.get(new String(key)); + if (blob != null) return blob.length; + // check if the index contains the key final long pos = index.getl(key); if (pos < 0) return -1; @@ -299,15 +346,17 @@ public final class kelondroBLOBHeap implements kelondroBLOB { * @throws IOException */ public synchronized void clear() throws IOException { - index.clear(); - free.clear(); + this.buffer.clear(); + this.buffersize = 0; + this.index.clear(); + this.free.clear(); try { - file.close(); + this.file.close(); } catch (final IOException e) { e.printStackTrace(); } this.heapFile.delete(); - this.file = new RandomAccessFile(heapFile, "rw"); + this.file = new kelondroFileRA(heapFile); } /** @@ -316,6 +365,11 @@ public final class kelondroBLOBHeap implements kelondroBLOB { */ public synchronized void close() { shrinkWithGapsAtEnd(); + try { + flushBuffer(); + } catch (IOException e) { + e.printStackTrace(); + } index.close(); free.clear(); try { @@ -348,82 +402,109 @@ public final class kelondroBLOBHeap implements kelondroBLOB { // we do not write records of length 0 into the BLOB if (b.length == 0) return; - // first remove the old entry + // first remove the old entry (removes from buffer and file) this.remove(key); // then look if we can use a free entry - if (this.free.size() > 0) { - // find the largest entry - long lseek = -1; - int lsize = 0; - final int reclen = b.length + index.row().primaryKeyLength; - Map.Entry entry; - Iterator> i = this.free.entrySet().iterator(); - while (i.hasNext()) { - entry = i.next(); - if (entry.getValue().intValue() == reclen) { - // we found an entry that has exactly the size that we need! - // we use that entry and stop looking for a larger entry - file.seek(entry.getKey()); - final int reclenf = file.readInt(); - assert reclenf == reclen; - file.write(key); - file.write(b); - - // add the entry to the index - this.index.putl(key, entry.getKey()); - - // remove the entry from the free list - i.remove(); - - //System.out.println("*** DEBUG BLOB: replaced-fit record at " + entry.seek + ", reclen=" + reclen + ", key=" + new String(key)); - - // finished! - return; - } - // look for the biggest size - if (entry.getValue() > lsize) { - lseek = entry.getKey(); - lsize = entry.getValue(); - } + if (putToGap(key, b)) return; + + // if there is not enough space in the buffer, flush all + if (this.buffersize + b.length > buffermax) { + // this is too big. Flush everything + shrinkWithGapsAtEnd(); + flushBuffer(); + if (b.length > buffermax) { + this.add(key, b); + } else { + this.buffer.put(new String(key), b); + this.buffersize += b.length; } - - // check if the found entry is large enough - if (lsize > reclen + 4) { - // split the free entry into two new entries - // if would be sufficient if lsize = reclen + 4, but this would mean to create - // an empty entry with zero next bytes for BLOB and key, which is not very good for the - // data structure in the file - - // write the new entry - file.seek(lseek); - file.writeInt(reclen); + return; + } + + // add entry to buffer + this.buffer.put(new String(key), b); + this.buffersize += b.length; + } + + private boolean putToGap(final byte[] key, final byte[] b) throws IOException { + assert index.row().primaryKeyLength == key.length : index.row().primaryKeyLength + "!=" + key.length; + + // we do not write records of length 0 into the BLOB + if (b.length == 0) return true; + + // then look if we can use a free entry + if (this.free.size() == 0) return false; + + // find the largest entry + long lseek = -1; + int lsize = 0; + final int reclen = b.length + index.row().primaryKeyLength; + Map.Entry entry; + Iterator> i = this.free.entrySet().iterator(); + while (i.hasNext()) { + entry = i.next(); + if (entry.getValue().intValue() == reclen) { + // we found an entry that has exactly the size that we need! + // we use that entry and stop looking for a larger entry + file.seek(entry.getKey()); + final int reclenf = file.readInt(); + assert reclenf == reclen; file.write(key); file.write(b); - // add the index to the new entry - index.putl(key, lseek); - - // define the new empty entry - final int newfreereclen = lsize - reclen - 4; - assert newfreereclen > 0; - file.writeInt(newfreereclen); - - // remove the old free entry - this.free.remove(lseek); + // add the entry to the index + this.index.putl(key, entry.getKey()); - // add a new free entry - this.free.put(lseek + 4 + reclen, newfreereclen); + // remove the entry from the free list + i.remove(); - //System.out.println("*** DEBUG BLOB: replaced-split record at " + lseek + ", reclen=" + reclen + ", new reclen=" + newfreereclen + ", key=" + new String(key)); + //System.out.println("*** DEBUG BLOB: replaced-fit record at " + entry.seek + ", reclen=" + reclen + ", key=" + new String(key)); // finished! - return; + return true; + } + // look for the biggest size + if (entry.getValue() > lsize) { + lseek = entry.getKey(); + lsize = entry.getValue(); } } - // if there is no free entry or no free entry is large enough, append the entry at the end of the file - this.add(key, b); + // check if the found entry is large enough + if (lsize > reclen + 4) { + // split the free entry into two new entries + // if would be sufficient if lsize = reclen + 4, but this would mean to create + // an empty entry with zero next bytes for BLOB and key, which is not very good for the + // data structure in the file + + // write the new entry + file.seek(lseek); + file.writeInt(reclen); + file.write(key); + file.write(b); + + // add the index to the new entry + index.putl(key, lseek); + + // define the new empty entry + final int newfreereclen = lsize - reclen - 4; + assert newfreereclen > 0; + file.writeInt(newfreereclen); + + // remove the old free entry + this.free.remove(lseek); + + // add a new free entry + this.free.put(lseek + 4 + reclen, newfreereclen); + + //System.out.println("*** DEBUG BLOB: replaced-split record at " + lseek + ", reclen=" + reclen + ", new reclen=" + newfreereclen + ", key=" + new String(key)); + + // finished! + return true; + } + // could not insert to gap + return false; } /** @@ -434,6 +515,13 @@ public final class kelondroBLOBHeap implements kelondroBLOB { public synchronized void remove(final byte[] key) throws IOException { assert index.row().primaryKeyLength == key.length : index.row().primaryKeyLength + "!=" + key.length; + // check the buffer + byte[] blob = this.buffer.remove(new String(key)); + if (blob != null) { + this.buffersize -= blob.length; + return; + } + // check if the index contains the key final long seek = index.getl(key); if (seek < 0) return; @@ -452,7 +540,9 @@ public final class kelondroBLOBHeap implements kelondroBLOB { this.free.put(seek, size); // fill zeros to the content - int l = size; while (l-- > 0) this.file.write(0); + int l = size; byte[] fill = new byte[size]; + while (l-- > 0) fill[l] = 0; + this.file.write(fill, 0, size); // remove entry from index this.index.removel(key); @@ -508,7 +598,9 @@ public final class kelondroBLOBHeap implements kelondroBLOB { } else { // check if this is a true gap! this.file.seek(nextSeek + 4); - int t = this.file.read(); + byte[] o = new byte[1]; + this.file.readFully(o, 0, 1); + int t = o[0]; assert t == 0; if (t == 0) { // the nextRecord is a gap record; we remove that from the free list because it will be joined with the current gap @@ -529,7 +621,7 @@ public final class kelondroBLOBHeap implements kelondroBLOB { // overwrite the size bytes of next records with zeros this.file.seek(seek1); - this.file.write(0);this.file.write(0);this.file.write(0);this.file.write(0); + this.file.writeInt(0); // the new size of the current gap: old size + len + 4 int newSize = size0 + 4 + size1; @@ -554,7 +646,7 @@ public final class kelondroBLOBHeap implements kelondroBLOB { this.free.remove(seek); } } catch (IOException e) { - // do nothing + e.printStackTrace(); } } @@ -581,14 +673,14 @@ public final class kelondroBLOBHeap implements kelondroBLOB { } public long length() throws IOException { - return this.heapFile.length(); + return this.heapFile.length() + this.buffersize; } - public static void main(final String[] args) { + public static void heaptest() { final File f = new File("/Users/admin/blobtest.heap"); try { //f.delete(); - final kelondroBLOBHeap heap = new kelondroBLOBHeap(f, 12, kelondroNaturalOrder.naturalOrder); + final kelondroBLOBHeap heap = new kelondroBLOBHeap(f, 12, kelondroNaturalOrder.naturalOrder, 1024 * 512); heap.put("aaaaaaaaaaaa".getBytes(), "eins zwei drei".getBytes()); heap.put("aaaaaaaaaaab".getBytes(), "vier fuenf sechs".getBytes()); heap.put("aaaaaaaaaaac".getBytes(), "sieben acht neun".getBytes()); @@ -611,4 +703,33 @@ public final class kelondroBLOBHeap implements kelondroBLOB { } } + private static Map map(String a, String b) { + HashMap m = new HashMap(); + m.put(a, b); + return m; + } + + public static void maptest() { + final File f = new File("/Users/admin/blobtest.heap"); + try { + //f.delete(); + final kelondroMap heap = new kelondroMap(new kelondroBLOBHeap(f, 12, kelondroNaturalOrder.naturalOrder, 1024 * 512), 500); + heap.put("aaaaaaaaaaaa", map("aaaaaaaaaaaa", "eins zwei drei")); + heap.put("aaaaaaaaaaab", map("aaaaaaaaaaab", "vier fuenf sechs")); + heap.put("aaaaaaaaaaac", map("aaaaaaaaaaac", "sieben acht neun")); + heap.put("aaaaaaaaaaad", map("aaaaaaaaaaad", "zehn elf zwoelf")); + heap.remove("aaaaaaaaaaab"); + heap.remove("aaaaaaaaaaac"); + heap.put("aaaaaaaaaaaX", map("aaaaaaaaaaad", "WXYZ")); + heap.close(); + } catch (final IOException e) { + e.printStackTrace(); + } + } + + public static void main(final String[] args) { + //heaptest(); + maptest(); + } + } diff --git a/source/de/anomic/kelondro/kelondroBufferedIOChunks.java b/source/de/anomic/kelondro/kelondroBufferedIOChunks.java index 6deb1e5ee..a1511b585 100644 --- a/source/de/anomic/kelondro/kelondroBufferedIOChunks.java +++ b/source/de/anomic/kelondro/kelondroBufferedIOChunks.java @@ -80,6 +80,7 @@ public final class kelondroBufferedIOChunks extends kelondroAbstractIOChunks imp assert (b.length >= off + len): "write pos=" + pos + ", b.length=" + b.length + ", b='" + new String(b) + "', off=" + off + ", len=" + len; //assert pos <= this.ra.length(): "pos = " + pos + ", this.ra.length() = " + this.ra.length(); + if (len == 0) return; if (pos >= this.ra.length()) { // the position is fully outside of the file if (pos - this.ra.length() + len > this.buffer.length) { @@ -107,10 +108,11 @@ public final class kelondroBufferedIOChunks extends kelondroAbstractIOChunks imp } public synchronized void commit() throws IOException { + this.lastCommit = System.currentTimeMillis(); + if (this.bufferSize == 0) return; this.ra.seek(this.ra.length()); // move to end of file this.ra.write(this.buffer, 0, this.bufferSize); this.bufferSize = 0; - this.lastCommit = System.currentTimeMillis(); } public synchronized void close() throws IOException { diff --git a/source/de/anomic/kelondro/kelondroCachedRA.java b/source/de/anomic/kelondro/kelondroCachedRA.java deleted file mode 100644 index dceec32e2..000000000 --- a/source/de/anomic/kelondro/kelondroCachedRA.java +++ /dev/null @@ -1,189 +0,0 @@ -// kelondroCachedRA.java -// ----------------------- -// part of The Kelondro Database -// (C) by Michael Peter Christen; mc@yacy.net -// first published on http://yacy.net -// Frankfurt, Germany, 2004-2008 -// last major change: 04.12.2008 -// -// This program is free software; you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation; either version 2 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program; if not, write to the Free Software -// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - -package de.anomic.kelondro; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; - -import de.anomic.server.serverMemory; - -public class kelondroCachedRA extends kelondroAbstractRA implements kelondroRA { - - // a shared cache for all users of this class - private static final int elementsize = 8192; - private static final int remainingfree = 30 * 1024 * 1024; - private static HashMap cacheMemory = new HashMap(); - - // class variables - protected kelondroRA ra; - private long seekpos; - private String id; - - public kelondroCachedRA(final kelondroRA ra) { - this.ra = ra; - this.name = ra.name(); - this.file = ra.file(); - this.id = file.toString(); - this.seekpos = 0; - } - - public synchronized long length() throws IOException { - return ra.length(); - } - - public synchronized int available() throws IOException { - return (int) (ra.length() - seekpos); - } - - private int cacheElementNumber(final long address) { - return (int) address / elementsize; - } - - private int cacheElementOffset(final long address) { - return (int) address % elementsize; - } - - private byte[] readCache(final int cacheNr) throws IOException { - String key = this.id + cacheNr; - byte[] cache = cacheMemory.get(key); - if (cache == null) { - if (serverMemory.available() < remainingfree) { - // delete elements in buffer if buffer too big - synchronized(cacheMemory) { - Iterator> i = cacheMemory.entrySet().iterator(); - for (int j = 0; j < 10; j++) { - if (!i.hasNext()) break; - i.next(); - i.remove(); - } - } - } - // check if we have enough space in the file to read a complete cache element - long seek = cacheNr * (long) elementsize; - if (ra.length() - seek < elementsize) return null; - // add new element - cache = new byte[elementsize]; - ra.seek(seek); - ra.readFully(cache, 0, elementsize); - cacheMemory.put(key, cache); - } - return cache; - } - - private boolean existCache(final int cacheNr) throws IOException { - return cacheMemory.containsKey(Integer.valueOf(cacheNr)); - } - - public synchronized void readFully(byte[] b, int off, int len) throws IOException { - final int bn1 = cacheElementNumber(seekpos); - final int bn2 = cacheElementNumber(seekpos + len - 1); - final int offset = cacheElementOffset(seekpos); - final byte[] cache = readCache(bn1); - if (bn1 == bn2) { - // simple case - if (cache == null) { - ra.seek(seekpos); - ra.readFully(b, off, len); - seekpos += len; - return; - } - //System.out.println("cache hit"); - System.arraycopy(cache, offset, b, off, len); - seekpos += len; - return; - } - assert cache != null; - - // do recursively - final int thislen = elementsize - offset; - System.arraycopy(cache, offset, b, off, thislen); - seekpos += thislen; - readFully(b, off + thislen, len - thislen); - } - - public synchronized void write(final byte[] b, final int off, final int len) throws IOException { - final int bn1 = cacheElementNumber(seekpos); - final int bn2 = cacheElementNumber(seekpos + len - 1); - final int offset = cacheElementOffset(seekpos); - if (bn1 == bn2) { - if (existCache(bn1)) { - // write to cache and file; here: write only to cache - final byte[] cache = readCache(bn1); - assert cache != null; - System.arraycopy(b, off, cache, offset, len); - } else { - // in case that the cache could be filled completely - // create a new entry here and store it also to the cache - if (offset == 0 && len >= elementsize) { - final byte[] cache = new byte[elementsize]; - System.arraycopy(b, off, cache, 0, elementsize); - cacheMemory.put(this.id + bn1, cache); - } - } - // write to file - ra.seek(seekpos); - ra.write(b, off, len); - seekpos += len; - return; - } - - // do recursively - final int thislen = elementsize - offset; - if (existCache(bn1)) { - // write to cache and file; here: write only to cache - final byte[] cache = readCache(bn1); - assert cache != null; - System.arraycopy(b, off, cache, offset, thislen); - } else { - // in case that the cache could be filled completely - // create a new entry here and store it also to the cache - if (offset == 0 && len >= elementsize) { - final byte[] cache = new byte[elementsize]; - System.arraycopy(b, off, cache, 0, elementsize); - cacheMemory.put(this.id + bn1, cache); - } - } - // write to file - ra.seek(seekpos); - ra.write(b, off, thislen); - seekpos += thislen; - write(b, off + thislen, len - thislen); - } - - public synchronized void seek(final long pos) throws IOException { - seekpos = pos; - } - - public synchronized void close() throws IOException { - ra.close(); - } - - protected void finalize() { - try { - close(); - } catch (final IOException e) {} - } - -} diff --git a/source/de/anomic/kelondro/kelondroFileRA.java b/source/de/anomic/kelondro/kelondroFileRA.java index 35d30dd60..23f4cc4b6 100644 --- a/source/de/anomic/kelondro/kelondroFileRA.java +++ b/source/de/anomic/kelondro/kelondroFileRA.java @@ -2,9 +2,9 @@ // ----------------------- // part of The Kelondro Database // (C) by Michael Peter Christen; mc@yacy.net -// first published on http://www.anomic.de -// Frankfurt, Germany, 2004 -// last major change: 05.02.2004 +// first published on http://yacy.net +// Frankfurt, Germany, 2004-2008 +// last major change: 09.12.2008 // // This program is free software; you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by @@ -31,47 +31,98 @@ import java.util.Map; public final class kelondroFileRA extends kelondroAbstractRA implements kelondroRA { private RandomAccessFile RAFile; + private byte[] cache; + private long cachestart; + private int cachelen; public kelondroFileRA(final File file) throws IOException, FileNotFoundException { this.name = file.getName(); this.file = file; RAFile = new RandomAccessFile(file, "rw"); + cache = new byte[8192]; + cachestart = 0; + cachelen = 0; } - public long length() throws IOException { - return RAFile.length(); + public synchronized long length() throws IOException { + return this.RAFile.length(); } - public int available() throws IOException { - return (int) (RAFile.length() - RAFile.getFilePointer()); + public synchronized void setLength(long length) throws IOException { + cachelen = 0; + RAFile.setLength(length); } - // pseudo-native method read - public int read() throws IOException { - return RAFile.read(); + public synchronized int available() throws IOException { + return (int) (this.length() - RAFile.getFilePointer()); } - // pseudo-native method write - public void write(final int b) throws IOException { - RAFile.write(b); - } - - public final void readFully(final byte[] b, final int off, final int len) throws IOException { - RAFile.readFully(b, off, len); + public synchronized final void readFully(final byte[] b, final int off, int len) throws IOException { + long seek = RAFile.getFilePointer(); + if (cache != null && cachestart <= seek && cachelen - seek + cachestart >= len) { + // read from cache + //System.out.println("*** DEBUG FileRA " + this.file.getName() + ": CACHE HIT at " + seek); + System.arraycopy(cache, (int) (seek - cachestart), b, off, len); + RAFile.seek(seek + len); + return; + } + if (cache == null || cache.length < len) { + // cannot fill cache here + RAFile.readFully(b, off, len); + return; + } + // we fill the cache here + int available = (int) (this.RAFile.length() - seek); + if (available < len) throw new IOException("EOF, available = " + available + ", requested = " + len); + if (cachestart + cachelen == seek && cache.length - cachelen >= len) { + RAFile.readFully(cache, cachelen, len); + //System.out.println("*** DEBUG FileRA " + this.file.getName() + ": append fill " + len + " bytes"); + System.arraycopy(cache, cachelen, b, off, len); + cachelen += len; + } else { + // fill the cache as much as possible + int m = Math.min(available, cache.length); + RAFile.readFully(cache, 0, m); + cachestart = seek; + cachelen = m; + if (m != len) RAFile.seek(seek + len); + //System.out.println("*** DEBUG FileRA " + this.file.getName() + ": replace fill " + len + " bytes"); + System.arraycopy(cache, 0, b, off, len); + } + } - public void write(final byte[] b, final int off, final int len) throws IOException { + public synchronized void write(final byte[] b, final int off, final int len) throws IOException { + //assert len > 0; // write to file + //if (this.cache.length > 2048) this.cache = new byte[2048]; // the large cache is only useful during an initialization phase + long seekpos = this.RAFile.getFilePointer(); + if (this.cachelen + len <= this.cache.length && this.cachestart + this.cachelen == seekpos) { + // append to cache + System.arraycopy(b, off, this.cache, this.cachelen, len); + //System.out.println("*** DEBUG FileRA " + this.file.getName() + ": write append " + len + " bytes"); + this.cachelen += len; + } else if (len <= this.cache.length) { + // copy to cache + System.arraycopy(b, off, this.cache, 0, len); + //System.out.println("*** DEBUG FileRA " + this.file.getName() + ": write copy " + len + " bytes"); + this.cachelen = len; + this.cachestart = seekpos; + } else { + // delete cache + this.cachelen = 0; + } RAFile.write(b, off, len); } - public void seek(final long pos) throws IOException { + public synchronized void seek(final long pos) throws IOException { RAFile.seek(pos); } - public void close() throws IOException { + public synchronized void close() throws IOException { if (RAFile != null) RAFile.close(); - RAFile = null; + this.cache = null; + this.RAFile = null; } protected void finalize() throws Throwable { diff --git a/source/de/anomic/kelondro/kelondroMap.java b/source/de/anomic/kelondro/kelondroMap.java index 84e902364..1bbb66ccb 100644 --- a/source/de/anomic/kelondro/kelondroMap.java +++ b/source/de/anomic/kelondro/kelondroMap.java @@ -336,7 +336,7 @@ public class kelondroMap { if (f.exists()) f.delete(); try { // make a blob - kelondroBLOB blob = new kelondroBLOBHeap(f, 12, kelondroNaturalOrder.naturalOrder); + kelondroBLOB blob = new kelondroBLOBHeap(f, 12, kelondroNaturalOrder.naturalOrder, 1024 * 1024); // make map kelondroMap map = new kelondroMap(blob, 1024); // put some values into the map diff --git a/source/de/anomic/kelondro/kelondroRAIOChunks.java b/source/de/anomic/kelondro/kelondroRAIOChunks.java index b495b561b..44c51681f 100644 --- a/source/de/anomic/kelondro/kelondroRAIOChunks.java +++ b/source/de/anomic/kelondro/kelondroRAIOChunks.java @@ -42,11 +42,13 @@ public final class kelondroRAIOChunks extends kelondroAbstractIOChunks implement } public synchronized void readFully(long pos, final byte[] b, int off, int len) throws IOException { + if (len == 0) return; this.ra.seek(pos); this.ra.readFully(b, off, len); } public synchronized void write(final long pos, final byte[] b, final int off, final int len) throws IOException { + if (len == 0) return; this.ra.seek(pos); this.ra.write(b, off, len); } diff --git a/source/de/anomic/plasma/plasmaHTCache.java b/source/de/anomic/plasma/plasmaHTCache.java index 674139ede..7939fd93f 100644 --- a/source/de/anomic/plasma/plasmaHTCache.java +++ b/source/de/anomic/plasma/plasmaHTCache.java @@ -45,7 +45,7 @@ import de.anomic.http.httpResponseHeader; import de.anomic.index.indexDocumentMetadata; import de.anomic.kelondro.kelondroBLOB; import de.anomic.kelondro.kelondroBLOBArray; -import de.anomic.kelondro.kelondroBLOBBuffer; +import de.anomic.kelondro.kelondroBLOBCompressor; import de.anomic.kelondro.kelondroBLOBHeap; import de.anomic.kelondro.kelondroBase64Order; import de.anomic.kelondro.kelondroMap; @@ -62,7 +62,7 @@ public final class plasmaHTCache { public static final long oneday = 1000L * 60L * 60L * 24L; // milliseconds of a day private static kelondroMap responseHeaderDB = null; - private static kelondroBLOBBuffer fileDB = null; + private static kelondroBLOBCompressor fileDB = null; private static kelondroBLOBArray fileDBunbuffered = null; public static long maxCacheSize = 0l; @@ -130,15 +130,15 @@ public final class plasmaHTCache { final File dbfile = new File(cachePath, RESPONSE_HEADER_DB_NAME); kelondroBLOB blob = null; try { - blob = new kelondroBLOBHeap(dbfile, yacySeedDB.commonHashLength, kelondroBase64Order.enhancedCoder); + blob = new kelondroBLOBHeap(dbfile, yacySeedDB.commonHashLength, kelondroBase64Order.enhancedCoder, 1024 * 1024); } catch (final IOException e) { e.printStackTrace(); } responseHeaderDB = new kelondroMap(blob, 500); try { - fileDBunbuffered = new kelondroBLOBArray(new File(cachePath, FILE_DB_NAME), salt, 12, kelondroBase64Order.enhancedCoder); + fileDBunbuffered = new kelondroBLOBArray(new File(cachePath, FILE_DB_NAME), salt, 12, kelondroBase64Order.enhancedCoder, 1024 * 1024 * 2); fileDBunbuffered.setMaxSize(maxCacheSize); - fileDB = new kelondroBLOBBuffer(fileDBunbuffered, 2 * 1024 * 1024, true); + fileDB = new kelondroBLOBCompressor(fileDBunbuffered, 2 * 1024 * 1024); } catch (IOException e) { e.printStackTrace(); } diff --git a/source/de/anomic/yacy/yacySeedDB.java b/source/de/anomic/yacy/yacySeedDB.java index 19f5ef2c5..0b6edc1ef 100644 --- a/source/de/anomic/yacy/yacySeedDB.java +++ b/source/de/anomic/yacy/yacySeedDB.java @@ -240,12 +240,12 @@ public final class yacySeedDB implements httpdAlternativeDomainNames { initializeHandlerMethod = null; } try { - return new kelondroMapDataMining(new kelondroBLOBHeap(seedDBFile, commonHashLength, kelondroBase64Order.enhancedCoder), 500, sortFields, longaccFields, doubleaccFields, initializeHandlerMethod, this); + return new kelondroMapDataMining(new kelondroBLOBHeap(seedDBFile, commonHashLength, kelondroBase64Order.enhancedCoder, 1024 * 512), 500, sortFields, longaccFields, doubleaccFields, initializeHandlerMethod, this); } catch (final Exception e) { // try again seedDBFile.delete(); try { - return new kelondroMapDataMining(new kelondroBLOBHeap(seedDBFile, commonHashLength, kelondroBase64Order.enhancedCoder), 500, sortFields, longaccFields, doubleaccFields, initializeHandlerMethod, this); + return new kelondroMapDataMining(new kelondroBLOBHeap(seedDBFile, commonHashLength, kelondroBase64Order.enhancedCoder, 1024 * 512), 500, sortFields, longaccFields, doubleaccFields, initializeHandlerMethod, this); } catch (IOException e1) { e1.printStackTrace(); System.exit(-1);