From 9a90ea05e009d3290cd13bbb8e49932c4bceb33a Mon Sep 17 00:00:00 2001 From: orbiter Date: Wed, 18 Mar 2009 16:14:31 +0000 Subject: [PATCH] added a merge operation for IndexCell data structures git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@5727 6c8d7289-2bf4-0310-a012-ef5d649a1542 --- source/de/anomic/http/httpClient.java | 4 +- source/de/anomic/kelondro/blob/BLOB.java | 2 +- source/de/anomic/kelondro/blob/BLOBArray.java | 16 +- .../anomic/kelondro/blob/BLOBCompressor.java | 4 +- .../kelondro/blob/BLOBHeapModifier.java | 4 +- source/de/anomic/kelondro/blob/BLOBTree.java | 4 +- .../de/anomic/kelondro/blob/HeapReader.java | 4 +- .../de/anomic/kelondro/blob/HeapWriter.java | 20 ++- source/de/anomic/kelondro/blob/MapView.java | 2 +- .../kelondro/index/LongHandleIndex.java | 15 ++ .../de/anomic/kelondro/order/Base64Order.java | 45 +++--- .../de/anomic/kelondro/text/IndexBuffer.java | 2 +- source/de/anomic/kelondro/text/IndexCell.java | 16 +- .../text/ReferenceContainerArray.java | 139 ++++++++++++++---- .../text/ReferenceContainerCache.java | 111 +++----------- source/de/anomic/plasma/plasmaHTCache.java | 2 +- source/de/anomic/plasma/plasmaWordIndex.java | 2 +- .../yacy/dht/FlatWordPartitionScheme.java | 4 +- 18 files changed, 217 insertions(+), 179 deletions(-) diff --git a/source/de/anomic/http/httpClient.java b/source/de/anomic/http/httpClient.java index 9f5a6c750..6afaa8034 100644 --- a/source/de/anomic/http/httpClient.java +++ b/source/de/anomic/http/httpClient.java @@ -99,9 +99,9 @@ public class httpClient { conManager.getParams().setConnectionTimeout(60000); // set a default timeout conManager.getParams().setDefaultMaxConnectionsPerHost(3); // prevent DoS by mistake localHostConfiguration.setHost("localhost"); - conManager.getParams().setMaxConnectionsPerHost(localHostConfiguration, 10); + conManager.getParams().setMaxConnectionsPerHost(localHostConfiguration, 100); localHostConfiguration.setHost("127.0.0.1"); - conManager.getParams().setMaxConnectionsPerHost(localHostConfiguration, 10); + conManager.getParams().setMaxConnectionsPerHost(localHostConfiguration, 100); // TODO should this be configurable? // accept self-signed or untrusted certificates diff --git a/source/de/anomic/kelondro/blob/BLOB.java b/source/de/anomic/kelondro/blob/BLOB.java index 78dea244e..1da391761 100644 --- a/source/de/anomic/kelondro/blob/BLOB.java +++ b/source/de/anomic/kelondro/blob/BLOB.java @@ -146,7 +146,7 @@ public interface BLOB { /** * close the BLOB table */ - public void close(); + public void close(boolean writeIDX); public interface Rewriter { diff --git a/source/de/anomic/kelondro/blob/BLOBArray.java b/source/de/anomic/kelondro/blob/BLOBArray.java index c13ada8a3..e03af11aa 100755 --- a/source/de/anomic/kelondro/blob/BLOBArray.java +++ b/source/de/anomic/kelondro/blob/BLOBArray.java @@ -147,14 +147,14 @@ public class BLOBArray implements BLOB { blobs.add(new blobItem(d, location, oneBlob)); } - public void unmountBLOB(File location) { + public void unmountBLOB(File location, boolean writeIDX) { Iterator i = this.blobs.iterator(); blobItem b; while (i.hasNext()) { b = i.next(); if (b.location.equals(location)) { i.remove(); - b.blob.close(); + b.blob.close(writeIDX); return; } } @@ -163,7 +163,7 @@ public class BLOBArray implements BLOB { public File unmountOldestBLOB() { if (this.blobs.size() == 0) return null; blobItem b = this.blobs.remove(0); - b.blob.close(); + b.blob.close(false); return b.location; } @@ -207,7 +207,7 @@ public class BLOBArray implements BLOB { while (blobs.size() > 0 && System.currentTimeMillis() - blobs.get(0).creation.getTime() - this.fileAgeLimit > this.repositoryAgeMax) { // too old blobItem oldestBLOB = blobs.remove(0); - oldestBLOB.blob.close(); + oldestBLOB.blob.close(false); if (!oldestBLOB.location.delete()) oldestBLOB.location.deleteOnExit(); } @@ -215,7 +215,7 @@ public class BLOBArray implements BLOB { while (blobs.size() > 0 && length() > this.repositorySizeMax) { // too large blobItem oldestBLOB = blobs.remove(0); - oldestBLOB.blob.close(); + oldestBLOB.blob.close(false); if (!oldestBLOB.location.delete()) oldestBLOB.location.deleteOnExit(); } } @@ -417,8 +417,8 @@ public class BLOBArray implements BLOB { /** * close the BLOB */ - public void close() { - for (blobItem bi: blobs) bi.blob.close(); + public void close(boolean writeIDX) { + for (blobItem bi: blobs) bi.blob.close(writeIDX); blobs.clear(); blobs = null; } @@ -441,7 +441,7 @@ public class BLOBArray implements BLOB { heap.remove("aaaaaaaaaaab".getBytes()); heap.remove("aaaaaaaaaaac".getBytes()); heap.put("aaaaaaaaaaaX".getBytes(), "WXYZ".getBytes()); - heap.close(); + heap.close(true); } catch (final IOException e) { e.printStackTrace(); } diff --git a/source/de/anomic/kelondro/blob/BLOBCompressor.java b/source/de/anomic/kelondro/blob/BLOBCompressor.java index a201f1d2b..025363808 100644 --- a/source/de/anomic/kelondro/blob/BLOBCompressor.java +++ b/source/de/anomic/kelondro/blob/BLOBCompressor.java @@ -77,14 +77,14 @@ public class BLOBCompressor extends Thread implements BLOB { return this.backend.ordering(); } - public synchronized void close() { + public synchronized void close(boolean writeIDX) { // no more thread is running, flush all queues try { flushAll(); } catch (IOException e) { e.printStackTrace(); } - this.backend.close(); + this.backend.close(writeIDX); } private byte[] compress(byte[] b) { diff --git a/source/de/anomic/kelondro/blob/BLOBHeapModifier.java b/source/de/anomic/kelondro/blob/BLOBHeapModifier.java index e2c40ceeb..ef1fc2982 100644 --- a/source/de/anomic/kelondro/blob/BLOBHeapModifier.java +++ b/source/de/anomic/kelondro/blob/BLOBHeapModifier.java @@ -102,7 +102,7 @@ public class BLOBHeapModifier extends HeapReader implements BLOB { /** * close the BLOB table */ - public synchronized void close() { + public synchronized void close(boolean writeIDX) { shrinkWithGapsAtEnd(); if (file != null) { try { @@ -113,7 +113,7 @@ public class BLOBHeapModifier extends HeapReader implements BLOB { } file = null; - if (index != null && free != null && (index.size() > 3 || free.size() > 3)) { + if (writeIDX && index != null && free != null && (index.size() > 3 || free.size() > 3)) { // now we can create a dump of the index and the gap information // to speed up the next start try { diff --git a/source/de/anomic/kelondro/blob/BLOBTree.java b/source/de/anomic/kelondro/blob/BLOBTree.java index 035b2337f..d9bee0e91 100644 --- a/source/de/anomic/kelondro/blob/BLOBTree.java +++ b/source/de/anomic/kelondro/blob/BLOBTree.java @@ -500,7 +500,7 @@ public class BLOBTree implements BLOB { } - public synchronized void close() { + public synchronized void close(boolean writeIDX) { index.close(); } @@ -518,7 +518,7 @@ public class BLOBTree implements BLOB { final Iterator i = kd.keys(true, false); while (i.hasNext()) System.out.println(new String(i.next())); - kd.close(); + kd.close(true); } catch (final IOException e) { e.printStackTrace(); } diff --git a/source/de/anomic/kelondro/blob/HeapReader.java b/source/de/anomic/kelondro/blob/HeapReader.java index 9cb123ccd..a16688610 100644 --- a/source/de/anomic/kelondro/blob/HeapReader.java +++ b/source/de/anomic/kelondro/blob/HeapReader.java @@ -82,10 +82,10 @@ public class HeapReader { } } if (!ok) { - Log.logWarning("kelondroBLOBHeap", "verification of idx file for " + heapFile.toString() + " failed, re-building index"); + Log.logWarning("HeapReader", "verification of idx file for " + heapFile.toString() + " failed, re-building index"); initIndexReadFromHeap(); } else { - Log.logInfo("kelondroBLOBHeap", "using a dump of the index of " + heapFile.toString() + "."); + Log.logInfo("HeapReader", "using a dump of the index of " + heapFile.toString() + "."); } } else { // if we did not have a dump, create a new index diff --git a/source/de/anomic/kelondro/blob/HeapWriter.java b/source/de/anomic/kelondro/blob/HeapWriter.java index 08bd6b9e5..531cbad42 100644 --- a/source/de/anomic/kelondro/blob/HeapWriter.java +++ b/source/de/anomic/kelondro/blob/HeapWriter.java @@ -37,11 +37,12 @@ import de.anomic.kelondro.util.Log; public final class HeapWriter { - private int keylength; // the length of the primary key + private int keylength; // the length of the primary key private LongHandleIndex index; // key/seek relation for used records - private final File heapFile; // the file of the heap - private DataOutputStream os; // the output stream where the BLOB is written - private long seek; // the current write position + private final File heapFile; // the file of the heap + private DataOutputStream os; // the output stream where the BLOB is written + private long seek; // the current write position + //private HashSet doublecheck;// only for testing /* * This class implements a BLOB management based on a sequence of records @@ -73,6 +74,7 @@ public final class HeapWriter { this.keylength = keylength; this.index = new LongHandleIndex(keylength, ordering, 10, 100000); this.os = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(heapFile), 1024 * 1024)); + //this.doublecheck = new HashSet(); this.seek = 0; } @@ -83,17 +85,19 @@ public final class HeapWriter { * @param blob * @throws IOException */ - public void add(final byte[] key, final byte[] blob) throws IOException { + public synchronized void add(final byte[] key, final byte[] blob) throws IOException { + //System.out.println("HeapWriter.add: " + new String(key)); assert blob.length > 0; assert key.length == this.keylength; assert index.row().primaryKeyLength == key.length : index.row().primaryKeyLength + "!=" + key.length; - assert index.get(key) < 0; // must not occur before + assert index.get(key) < 0 : "index.get(key) = " + index.get(key) + ", index.size() = " + index.size() + ", file.length() = " + this.heapFile.length() + ", key = " + new String(key); // must not occur before if ((blob == null) || (blob.length == 0)) return; int chunkl = key.length + blob.length; os.writeInt(chunkl); os.write(key); os.write(blob); index.putUnique(key, seek); + //assert (this.doublecheck.add(new String(key))) : "doublecheck failed for " + new String(key); this.seek += chunkl + 4; } @@ -122,7 +126,7 @@ public final class HeapWriter { * close the BLOB table * @throws */ - public synchronized void close() { + public synchronized void close(boolean writeIDX) { try { os.flush(); os.close(); @@ -131,7 +135,7 @@ public final class HeapWriter { } os = null; - if (index.size() > 3) { + if (writeIDX && index.size() > 3) { // now we can create a dump of the index and the gap information // to speed up the next start try { diff --git a/source/de/anomic/kelondro/blob/MapView.java b/source/de/anomic/kelondro/blob/MapView.java index 8748da2b8..65ac30835 100644 --- a/source/de/anomic/kelondro/blob/MapView.java +++ b/source/de/anomic/kelondro/blob/MapView.java @@ -296,7 +296,7 @@ public class MapView { cacheScore = null; // close file - blob.close(); + blob.close(true); } public class objectIterator implements Iterator> { diff --git a/source/de/anomic/kelondro/index/LongHandleIndex.java b/source/de/anomic/kelondro/index/LongHandleIndex.java index 37708d780..e29de8256 100644 --- a/source/de/anomic/kelondro/index/LongHandleIndex.java +++ b/source/de/anomic/kelondro/index/LongHandleIndex.java @@ -42,6 +42,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import de.anomic.kelondro.order.Base64Order; import de.anomic.kelondro.order.ByteOrder; import de.anomic.kelondro.order.CloneableIterator; @@ -310,4 +311,18 @@ public class LongHandleIndex { } } + + public static void main(String[] args) { + LongHandleIndex idx = new LongHandleIndex(12, Base64Order.enhancedCoder, 10000, 10000000); + byte[] s; + //long l; + for (int i = 0; i < 10000000; i = i + 8) { + s = Base64Order.enhancedCoder.uncardinal(Long.MAX_VALUE - i); + //l = Base64Order.enhancedCoder.cardinal(s); + //if (i != l) System.out.println("encoding bug for " + new String(s) + ", v = " + (Long.MAX_VALUE - i) + ", l = " + l); + //System.out.println(s); + if (idx.get(s) >= 0) System.out.println("search bug for " + new String(s) + ": " + idx.get(s)); + idx.putUnique(s, 1); + } + } } diff --git a/source/de/anomic/kelondro/order/Base64Order.java b/source/de/anomic/kelondro/order/Base64Order.java index 055cb27d3..a6f8972ec 100644 --- a/source/de/anomic/kelondro/order/Base64Order.java +++ b/source/de/anomic/kelondro/order/Base64Order.java @@ -253,24 +253,6 @@ public class Base64Order extends AbstractOrder implements ByteOrder, Cod } } - private final long cardinalI(final byte[] key, int off, int len) { - // returns a cardinal number in the range of 0 .. Long.MAX_VALUE - long c = 0; - int lim = off + Math.min(10, len); - int lim10 = off + 10; - byte b; - while (off < lim) { - b = key[off++]; - if (b < 0) return -1; - b = ahpla[b]; - if (b < 0) return -1; - c = (c << 6) | b; - } - while (off++ < lim10) c = (c << 6); - c = c << 3; - assert c >= 0; - return c; - } /* private final long cardinalI(final byte[] key) { @@ -292,21 +274,42 @@ public class Base64Order extends AbstractOrder implements ByteOrder, Cod while ((p < 10) && (p < key.length())) { b = ahpla[key.charAt(p++)]; if (b < 0) return -1; - c = (c << 6) |b; + c = (c << 6) | b; } while (p++ < 10) c = (c << 6); - c = c << 3; + c = (c << 3) | 7; assert c >= 0; return c; } + private final long cardinalI(final byte[] key, int off, int len) { + // returns a cardinal number in the range of 0 .. Long.MAX_VALUE + long c = 0; + int lim = off + Math.min(10, len); + int lim10 = off + 10; + byte b; + while (off < lim) { + b = key[off++]; + if (b < 0) return -1; + b = ahpla[b]; + if (b < 0) return -1; + c = (c << 6) | b; + } + while (off++ < lim10) c = (c << 6); + c = (c << 3) | 7; + assert c >= 0; + return c; + } + public final byte[] uncardinal(long c) { c = c >> 3; - byte[] b = new byte[10]; + byte[] b = new byte[12]; for (int p = 9; p >= 0; p--) { b[p] = (byte) alpha[(int) (c & 0x3fL)]; c = c >> 6; } + b[10] = (byte) alpha[0x3f]; + b[11] = (byte) alpha[0x3f]; return b; } diff --git a/source/de/anomic/kelondro/text/IndexBuffer.java b/source/de/anomic/kelondro/text/IndexBuffer.java index 199854a1b..0bae39cde 100644 --- a/source/de/anomic/kelondro/text/IndexBuffer.java +++ b/source/de/anomic/kelondro/text/IndexBuffer.java @@ -321,7 +321,7 @@ public final class IndexBuffer extends AbstractIndex implements Index, IndexRead // dump cache try { //heap.dumpold(this.oldDumpFile); - heap.dump(this.dumpFile); + heap.dump(this.dumpFile, true); } catch (final IOException e){ log.logSevere("unable to dump cache: " + e.getMessage(), e); } diff --git a/source/de/anomic/kelondro/text/IndexCell.java b/source/de/anomic/kelondro/text/IndexCell.java index 0d9c9477c..a5b1af20a 100644 --- a/source/de/anomic/kelondro/text/IndexCell.java +++ b/source/de/anomic/kelondro/text/IndexCell.java @@ -54,18 +54,20 @@ public final class IndexCell extends AbstractBufferedIndex implements BufferedIn // class variables private ReferenceContainerArray array; private ReferenceContainerCache ram; - private int maxRamEntries; + private int maxRamEntries, maxArrayFiles; public IndexCell( final File cellPath, final ByteOrder wordOrder, final Row payloadrow, - final int maxRamEntries + final int maxRamEntries, + final int maxArrayFiles ) throws IOException { this.array = new ReferenceContainerArray(cellPath, wordOrder, payloadrow); this.ram = new ReferenceContainerCache(payloadrow, wordOrder); this.ram.initWriteMode(); this.maxRamEntries = maxRamEntries; + this.maxArrayFiles = maxArrayFiles; } @@ -230,7 +232,7 @@ public final class IndexCell extends AbstractBufferedIndex implements BufferedIn public synchronized void close() { // dump the ram try { - this.ram.dump(this.array.newContainerBLOBFile()); + this.ram.dump(this.array.newContainerBLOBFile(), true); } catch (IOException e) { e.printStackTrace(); } @@ -256,15 +258,19 @@ public final class IndexCell extends AbstractBufferedIndex implements BufferedIn * cache control methods */ - private void cacheDump() throws IOException { + private synchronized void cacheDump() throws IOException { // dump the ram File dumpFile = this.array.newContainerBLOBFile(); - this.ram.dump(dumpFile); + this.ram.dump(dumpFile, true); // get a fresh ram cache this.ram = new ReferenceContainerCache(this.array.rowdef(), this.array.ordering()); this.ram.initWriteMode(); // add the dumped indexContainerBLOB to the array this.array.mountBLOBContainer(dumpFile); + int c = 0; + while (this.array.entries() > this.maxArrayFiles && c++ < 3) { + if (!this.array.mergeOldest()) break; + } } public void cleanupBuffer(int time) { diff --git a/source/de/anomic/kelondro/text/ReferenceContainerArray.java b/source/de/anomic/kelondro/text/ReferenceContainerArray.java index 87c8dce22..05601d0df 100644 --- a/source/de/anomic/kelondro/text/ReferenceContainerArray.java +++ b/source/de/anomic/kelondro/text/ReferenceContainerArray.java @@ -34,10 +34,12 @@ import java.util.List; import de.anomic.kelondro.blob.BLOB; import de.anomic.kelondro.blob.BLOBArray; +import de.anomic.kelondro.blob.HeapWriter; import de.anomic.kelondro.index.Row; import de.anomic.kelondro.index.RowSet; import de.anomic.kelondro.order.ByteOrder; import de.anomic.kelondro.order.CloneableIterator; +import de.anomic.kelondro.text.ReferenceContainerCache.blobFileEntries; public final class ReferenceContainerArray { @@ -68,7 +70,7 @@ public final class ReferenceContainerArray { } public synchronized void close() { - this.array.close(); + this.array.close(true); } public synchronized void clear() throws IOException { @@ -231,41 +233,114 @@ public final class ReferenceContainerArray { return c.exportCollection(); } } -/* - public int mergeOldest() { - if (this.array.entries() < 2) return 0; - File f1 = this.array.unmountOldestBLOB(); - File f2 = this.array.unmountOldestBLOB(); - // iterate both files and write a new one - new kelondroMergeIterator( - (kelondroCloneableIterator>) new kelondroBLOBHeapReader.entries(f1, this.payloadrow.objectsize), - null, - null, - null, - true); - return 0; - } - */ - /* - * new kelondroMergeIterator( - new kelondroBLOBHeapReader.entries(f1, this.payloadrow.objectsize), - new kelondroBLOBHeapReader.entries(f2, this.payloadrow.objectsize), - this.payloadrow.getOrdering(), - indexContainer.containerMergeMethod, - true); - */ - /* - public kelondroMergeIterator( - final kelondroCloneableIterator a, - final kelondroCloneableIterator b, - final Comparator c, - final Method m, final boolean up) { - */ - + public interface ContainerRewriter { public ReferenceContainer rewrite(ReferenceContainer container); } + + public int entries() { + return this.array.entries(); + } + + public synchronized boolean mergeOldest() throws IOException { + if (this.array.entries() < 2) return false; + File f1 = this.array.unmountOldestBLOB(); + File f2 = this.array.unmountOldestBLOB(); + // iterate both files and write a new one + + CloneableIterator i1 = new blobFileEntries(f1, this.payloadrow); + CloneableIterator i2 = new blobFileEntries(f2, this.payloadrow); + ReferenceContainer c1, c2, c1o, c2o; + c1 = (i1.hasNext()) ? i1.next() : null; + c2 = (i2.hasNext()) ? i2.next() : null; + if (c1 == null && c2 == null) { + if (!f1.delete()) f1.deleteOnExit(); + if (!f2.delete()) f2.deleteOnExit(); + return true; + } + if (c1 == null) { + if (!f1.delete()) f1.deleteOnExit(); + this.array.mountBLOB(f2); + return true; + } + if (c2 == null) { + if (!f2.delete()) f2.deleteOnExit(); + this.array.mountBLOB(f1); + return true; + } + File newFile = newContainerBLOBFile(); + HeapWriter writer = new HeapWriter(newFile, this.array.keylength(), this.array.ordering()); + int e; + while (true) { + assert c1 != null; + assert c2 != null; + e = this.array.ordering().compare(c1.getWordHash().getBytes(), c2.getWordHash().getBytes()); + if (e < 0) { + writer.add(c1.getWordHash().getBytes(), c1.exportCollection()); + if (i1.hasNext()) { + c1o = c1; + c1 = i1.next(); + assert this.array.ordering().compare(c1.getWordHash().getBytes(), c1o.getWordHash().getBytes()) > 0; + continue; + } + break; + } + if (e > 0) { + writer.add(c2.getWordHash().getBytes(), c2.exportCollection()); + if (i2.hasNext()) { + c2o = c2; + c2 = i2.next(); + assert this.array.ordering().compare(c2.getWordHash().getBytes(), c2o.getWordHash().getBytes()) > 0; + continue; + } + break; + } + assert e == 0; + // merge the entries + writer.add(c1.getWordHash().getBytes(), (c1.merge(c2)).exportCollection()); + if (i1.hasNext() && i2.hasNext()) { + c1 = i1.next(); + c2 = i2.next(); + continue; + } + if (i1.hasNext()) c1 = i1.next(); + if (i2.hasNext()) c2 = i2.next(); + break; + + } + // catch up remaining entries + assert !(i1.hasNext() && i2.hasNext()); + while (i1.hasNext()) { + //System.out.println("FLUSH REMAINING 1: " + c1.getWordHash()); + writer.add(c1.getWordHash().getBytes(), c1.exportCollection()); + if (i1.hasNext()) { + c1o = c1; + c1 = i1.next(); + assert this.array.ordering().compare(c1.getWordHash().getBytes(), c1o.getWordHash().getBytes()) > 0; + continue; + } + break; + } + while (i2.hasNext()) { + //System.out.println("FLUSH REMAINING 2: " + c2.getWordHash()); + writer.add(c2.getWordHash().getBytes(), c2.exportCollection()); + if (i2.hasNext()) { + c2o = c2; + c2 = i2.next(); + assert this.array.ordering().compare(c2.getWordHash().getBytes(), c2o.getWordHash().getBytes()) > 0; + continue; + } + break; + } + // finished with writing + writer.close(true); + // we don't need the old files any more + if (!f1.delete()) f1.deleteOnExit(); + if (!f2.delete()) f2.deleteOnExit(); + this.array.mountBLOB(newFile); + return true; + } } diff --git a/source/de/anomic/kelondro/text/ReferenceContainerCache.java b/source/de/anomic/kelondro/text/ReferenceContainerCache.java index 59fad9e16..95bf937b6 100644 --- a/source/de/anomic/kelondro/text/ReferenceContainerCache.java +++ b/source/de/anomic/kelondro/text/ReferenceContainerCache.java @@ -26,10 +26,7 @@ package de.anomic.kelondro.text; -import java.io.BufferedInputStream; -import java.io.DataInputStream; import java.io.File; -import java.io.FileInputStream; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -88,29 +85,6 @@ public final class ReferenceContainerCache extends AbstractIndex implements Inde this.cache = Collections.synchronizedSortedMap(new TreeMap(new ByteOrder.StringOrder(this.wordOrder))); } - /** - * restore a heap dump: this is a heap in write mode. There should no heap file - * be assigned in initialization; the file name is given here in this method - * when the heap is once dumped again, the heap file name may be different - * @param heapFile - * @throws IOException - */ - public void initWriteModeFromHeap(final File heapFile) throws IOException { - Log.logInfo("indexContainerRAMHeap", "restoring dump for rwi heap '" + heapFile.getName() + "'"); - final long start = System.currentTimeMillis(); - this.cache = Collections.synchronizedSortedMap(new TreeMap(new ByteOrder.StringOrder(this.wordOrder))); - int urlCount = 0; - synchronized (cache) { - for (final ReferenceContainer container : new heapFileEntries(heapFile, this.payloadrow)) { - // TODO: in this loop a lot of memory may be allocated. A check if the memory gets low is necessary. But what do when the memory is low? - if (container == null) break; - cache.put(container.getWordHash(), container); - urlCount += container.size(); - } - } - Log.logInfo("indexContainerRAMHeap", "finished rwi heap restore: " + cache.size() + " words, " + urlCount + " word/URL relations in " + (System.currentTimeMillis() - start) + " milliseconds"); - } - /** * this is the new cache file format initialization * @param heapFile @@ -135,32 +109,38 @@ public final class ReferenceContainerCache extends AbstractIndex implements Inde Log.logInfo("indexContainerRAMHeap", "finished rwi blob restore: " + cache.size() + " words, " + urlCount + " word/URL relations in " + (System.currentTimeMillis() - start) + " milliseconds"); } - public void dump(final File heapFile) throws IOException { + public void dump(final File heapFile, boolean writeIDX) throws IOException { assert this.cache != null; Log.logInfo("indexContainerRAMHeap", "creating rwi heap dump '" + heapFile.getName() + "', " + cache.size() + " rwi's"); if (heapFile.exists()) heapFile.delete(); - final HeapWriter dump = new HeapWriter(heapFile, payloadrow.primaryKeyLength, Base64Order.enhancedCoder); + HeapWriter dump = new HeapWriter(heapFile, payloadrow.primaryKeyLength, Base64Order.enhancedCoder); final long startTime = System.currentTimeMillis(); long wordcount = 0, urlcount = 0; - String wordHash; + String wordHash = null, lwh; ReferenceContainer container; // write wCache synchronized (cache) { for (final Map.Entry entry: cache.entrySet()) { // get entries + lwh = wordHash; wordHash = entry.getKey(); container = entry.getValue(); + // check consistency: entries must be ordered + assert (lwh == null || this.ordering().compare(wordHash.getBytes(), lwh.getBytes()) > 0); + // put entries on heap if (container != null && wordHash.length() == payloadrow.primaryKeyLength) { + //System.out.println("Dump: " + wordHash); dump.add(wordHash.getBytes(), container.exportCollection()); urlcount += container.size(); } wordcount++; } } - dump.close(); + dump.close(writeIDX); + dump = null; Log.logInfo("indexContainerRAMHeap", "finished alternative rwi heap dump: " + wordcount + " words, " + urlcount + " word/URL relations in " + (System.currentTimeMillis() - startTime) + " milliseconds"); } @@ -168,74 +148,18 @@ public final class ReferenceContainerCache extends AbstractIndex implements Inde return (this.cache == null) ? 0 : this.cache.size(); } - /** - * static iterator of heap files: is used to import heap dumps into a write-enabled index heap - */ - public static class heapFileEntries implements Iterator, Iterable { - DataInputStream is; - byte[] word; - Row payloadrow; - ReferenceContainer nextContainer; - - public heapFileEntries(final File heapFile, final Row payloadrow) throws IOException { - if (!(heapFile.exists())) throw new IOException("file " + heapFile + " does not exist"); - is = new DataInputStream(new BufferedInputStream(new FileInputStream(heapFile), 1024*1024)); - word = new byte[payloadrow.primaryKeyLength]; - this.payloadrow = payloadrow; - this.nextContainer = next0(); - } - - public boolean hasNext() { - return this.nextContainer != null; - } - - private ReferenceContainer next0() { - try { - is.readFully(word); - return new ReferenceContainer(new String(word), RowSet.importRowSet(is, payloadrow)); - } catch (final IOException e) { - return null; - } - } - - /** - * return an index container - * because they may get very large, it is wise to deallocate some memory before calling next() - */ - public ReferenceContainer next() { - final ReferenceContainer n = this.nextContainer; - this.nextContainer = next0(); - return n; - } - - public void remove() { - throw new UnsupportedOperationException("heap dumps are read-only"); - } - - public Iterator iterator() { - return this; - } - - public void close() { - if (is != null) try { is.close(); } catch (final IOException e) {} - is = null; - } - - protected void finalize() { - this.close(); - } - } - /** * static iterator of BLOBHeap files: is used to import heap dumps into a write-enabled index heap */ - public static class blobFileEntries implements Iterator, Iterable { + public static class blobFileEntries implements CloneableIterator, Iterable { Iterator> blobs; Row payloadrow; + File blobFile; public blobFileEntries(final File blobFile, final Row payloadrow) throws IOException { this.blobs = new HeapReader.entries(blobFile, payloadrow.primaryKeyLength); this.payloadrow = payloadrow; + this.blobFile = blobFile; } public boolean hasNext() { @@ -267,6 +191,15 @@ public final class ReferenceContainerCache extends AbstractIndex implements Inde protected void finalize() { this.close(); } + + public CloneableIterator clone(Object modifier) { + try { + return new blobFileEntries(this.blobFile, this.payloadrow); + } catch (IOException e) { + e.printStackTrace(); + return null; + } + } } public synchronized int maxReferences() { diff --git a/source/de/anomic/plasma/plasmaHTCache.java b/source/de/anomic/plasma/plasmaHTCache.java index 2c11b806a..292bc3bef 100644 --- a/source/de/anomic/plasma/plasmaHTCache.java +++ b/source/de/anomic/plasma/plasmaHTCache.java @@ -172,7 +172,7 @@ public final class plasmaHTCache { public static void close() { responseHeaderDB.close(); - fileDB.close(); + fileDB.close(true); } public static boolean isPicture(final String mimeType) { diff --git a/source/de/anomic/plasma/plasmaWordIndex.java b/source/de/anomic/plasma/plasmaWordIndex.java index c3dfce501..2da564477 100644 --- a/source/de/anomic/plasma/plasmaWordIndex.java +++ b/source/de/anomic/plasma/plasmaWordIndex.java @@ -135,7 +135,7 @@ public final class plasmaWordIndex { new File(indexPrimaryTextLocation, "RICELL"), wordOrder, ReferenceRow.urlEntryRow, - entityCacheMaxSize) : + entityCacheMaxSize, 10) : new BufferedIndexCollection( indexPrimaryTextLocation, wordOrder, diff --git a/source/de/anomic/yacy/dht/FlatWordPartitionScheme.java b/source/de/anomic/yacy/dht/FlatWordPartitionScheme.java index 98759eee0..a04ac6294 100755 --- a/source/de/anomic/yacy/dht/FlatWordPartitionScheme.java +++ b/source/de/anomic/yacy/dht/FlatWordPartitionScheme.java @@ -82,7 +82,9 @@ public class FlatWordPartitionScheme implements PartitionScheme { public static String positionToHash(final long l) { // transform the position of a peer position into a close peer hash - return new String(Base64Order.enhancedCoder.uncardinal(l)) + "AA"; + String s = new String(Base64Order.enhancedCoder.uncardinal(l)); + while (s.length() < 12) s += "A"; + return s; } }