added a merge operation for IndexCell data structures

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@5727 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 16 years ago
parent d99ff745aa
commit 9a90ea05e0

@@ -99,9 +99,9 @@ public class httpClient {
         conManager.getParams().setConnectionTimeout(60000); // set a default timeout
         conManager.getParams().setDefaultMaxConnectionsPerHost(3); // prevent DoS by mistake
         localHostConfiguration.setHost("localhost");
-        conManager.getParams().setMaxConnectionsPerHost(localHostConfiguration, 10);
+        conManager.getParams().setMaxConnectionsPerHost(localHostConfiguration, 100);
         localHostConfiguration.setHost("127.0.0.1");
-        conManager.getParams().setMaxConnectionsPerHost(localHostConfiguration, 10);
+        conManager.getParams().setMaxConnectionsPerHost(localHostConfiguration, 100);
         // TODO should this be configurable?
         // accept self-signed or untrusted certificates

@@ -146,7 +146,7 @@ public interface BLOB {
     /**
      * close the BLOB table
      */
-    public void close();
+    public void close(boolean writeIDX);
     public interface Rewriter {

@@ -147,14 +147,14 @@ public class BLOBArray implements BLOB {
         blobs.add(new blobItem(d, location, oneBlob));
     }
-    public void unmountBLOB(File location) {
+    public void unmountBLOB(File location, boolean writeIDX) {
         Iterator<blobItem> i = this.blobs.iterator();
         blobItem b;
         while (i.hasNext()) {
             b = i.next();
             if (b.location.equals(location)) {
                 i.remove();
-                b.blob.close();
+                b.blob.close(writeIDX);
                 return;
             }
         }
@@ -163,7 +163,7 @@ public class BLOBArray implements BLOB {
     public File unmountOldestBLOB() {
         if (this.blobs.size() == 0) return null;
         blobItem b = this.blobs.remove(0);
-        b.blob.close();
+        b.blob.close(false);
         return b.location;
     }
@@ -207,7 +207,7 @@ public class BLOBArray implements BLOB {
         while (blobs.size() > 0 && System.currentTimeMillis() - blobs.get(0).creation.getTime() - this.fileAgeLimit > this.repositoryAgeMax) {
             // too old
             blobItem oldestBLOB = blobs.remove(0);
-            oldestBLOB.blob.close();
+            oldestBLOB.blob.close(false);
             if (!oldestBLOB.location.delete()) oldestBLOB.location.deleteOnExit();
         }
@@ -215,7 +215,7 @@ public class BLOBArray implements BLOB {
         while (blobs.size() > 0 && length() > this.repositorySizeMax) {
             // too large
             blobItem oldestBLOB = blobs.remove(0);
-            oldestBLOB.blob.close();
+            oldestBLOB.blob.close(false);
             if (!oldestBLOB.location.delete()) oldestBLOB.location.deleteOnExit();
         }
     }
@@ -417,8 +417,8 @@ public class BLOBArray implements BLOB {
     /**
      * close the BLOB
      */
-    public void close() {
-        for (blobItem bi: blobs) bi.blob.close();
+    public void close(boolean writeIDX) {
+        for (blobItem bi: blobs) bi.blob.close(writeIDX);
         blobs.clear();
         blobs = null;
     }
@@ -441,7 +441,7 @@ public class BLOBArray implements BLOB {
             heap.remove("aaaaaaaaaaab".getBytes());
             heap.remove("aaaaaaaaaaac".getBytes());
             heap.put("aaaaaaaaaaaX".getBytes(), "WXYZ".getBytes());
-            heap.close();
+            heap.close(true);
         } catch (final IOException e) {
             e.printStackTrace();
         }
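
The recurring change in these BLOB hunks is the new close(boolean writeIDX) contract: on close, a BLOB decides whether to dump its in-memory key/seek index to an idx file so that the next start can skip rebuilding the index from the heap. Long-lived files are closed with true; files that are about to be deleted or merged (as in unmountOldestBLOB and the age/size trimming loops) are closed with false. A minimal sketch of the calling convention, not part of the patch:

    // sketch only: how callers are expected to use the new writeIDX flag
    import java.io.File;
    import de.anomic.kelondro.blob.BLOB;

    final class CloseExample {
        // the BLOB will be reopened later: keep the idx dump for a fast restart
        static void closeForRestart(final BLOB heap) {
            heap.close(true);
        }
        // the file is about to be merged or deleted: writing the idx dump would be wasted work
        static void closeForDisposal(final BLOB heap, final File location) {
            heap.close(false);
            if (!location.delete()) location.deleteOnExit();
        }
    }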

@@ -77,14 +77,14 @@ public class BLOBCompressor extends Thread implements BLOB {
         return this.backend.ordering();
     }
-    public synchronized void close() {
+    public synchronized void close(boolean writeIDX) {
         // no more thread is running, flush all queues
         try {
             flushAll();
         } catch (IOException e) {
             e.printStackTrace();
         }
-        this.backend.close();
+        this.backend.close(writeIDX);
     }
     private byte[] compress(byte[] b) {

@@ -102,7 +102,7 @@ public class BLOBHeapModifier extends HeapReader implements BLOB {
     /**
      * close the BLOB table
      */
-    public synchronized void close() {
+    public synchronized void close(boolean writeIDX) {
         shrinkWithGapsAtEnd();
         if (file != null) {
             try {
@@ -113,7 +113,7 @@ public class BLOBHeapModifier extends HeapReader implements BLOB {
         }
         file = null;
-        if (index != null && free != null && (index.size() > 3 || free.size() > 3)) {
+        if (writeIDX && index != null && free != null && (index.size() > 3 || free.size() > 3)) {
             // now we can create a dump of the index and the gap information
             // to speed up the next start
             try {

@@ -500,7 +500,7 @@ public class BLOBTree implements BLOB {
     }
-    public synchronized void close() {
+    public synchronized void close(boolean writeIDX) {
         index.close();
     }
@@ -518,7 +518,7 @@ public class BLOBTree implements BLOB {
             final Iterator<byte[]> i = kd.keys(true, false);
             while (i.hasNext())
                 System.out.println(new String(i.next()));
-            kd.close();
+            kd.close(true);
         } catch (final IOException e) {
             e.printStackTrace();
         }

@@ -82,10 +82,10 @@ public class HeapReader {
                 }
             }
             if (!ok) {
-                Log.logWarning("kelondroBLOBHeap", "verification of idx file for " + heapFile.toString() + " failed, re-building index");
+                Log.logWarning("HeapReader", "verification of idx file for " + heapFile.toString() + " failed, re-building index");
                 initIndexReadFromHeap();
             } else {
-                Log.logInfo("kelondroBLOBHeap", "using a dump of the index of " + heapFile.toString() + ".");
+                Log.logInfo("HeapReader", "using a dump of the index of " + heapFile.toString() + ".");
             }
         } else {
             // if we did not have a dump, create a new index

@@ -37,11 +37,12 @@ import de.anomic.kelondro.util.Log;
 public final class HeapWriter {
     private int keylength; // the length of the primary key
     private LongHandleIndex index; // key/seek relation for used records
     private final File heapFile; // the file of the heap
     private DataOutputStream os; // the output stream where the BLOB is written
     private long seek; // the current write position
+    //private HashSet<String> doublecheck;// only for testing
     /*
      * This class implements a BLOB management based on a sequence of records
@@ -73,6 +74,7 @@ public final class HeapWriter {
         this.keylength = keylength;
         this.index = new LongHandleIndex(keylength, ordering, 10, 100000);
         this.os = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(heapFile), 1024 * 1024));
+        //this.doublecheck = new HashSet<String>();
         this.seek = 0;
     }
@@ -83,17 +85,19 @@ public final class HeapWriter {
      * @param blob
      * @throws IOException
      */
-    public void add(final byte[] key, final byte[] blob) throws IOException {
+    public synchronized void add(final byte[] key, final byte[] blob) throws IOException {
+        //System.out.println("HeapWriter.add: " + new String(key));
         assert blob.length > 0;
         assert key.length == this.keylength;
         assert index.row().primaryKeyLength == key.length : index.row().primaryKeyLength + "!=" + key.length;
-        assert index.get(key) < 0; // must not occur before
+        assert index.get(key) < 0 : "index.get(key) = " + index.get(key) + ", index.size() = " + index.size() + ", file.length() = " + this.heapFile.length() + ", key = " + new String(key); // must not occur before
         if ((blob == null) || (blob.length == 0)) return;
         int chunkl = key.length + blob.length;
         os.writeInt(chunkl);
         os.write(key);
         os.write(blob);
         index.putUnique(key, seek);
+        //assert (this.doublecheck.add(new String(key))) : "doublecheck failed for " + new String(key);
         this.seek += chunkl + 4;
     }
@@ -122,7 +126,7 @@ public final class HeapWriter {
      * close the BLOB table
      * @throws
      */
-    public synchronized void close() {
+    public synchronized void close(boolean writeIDX) {
         try {
             os.flush();
             os.close();
@@ -131,7 +135,7 @@ public final class HeapWriter {
         }
         os = null;
-        if (index.size() > 3) {
+        if (writeIDX && index.size() > 3) {
             // now we can create a dump of the index and the gap information
             // to speed up the next start
             try {
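
HeapWriter.add appends one record per key in the form [int length][key][payload], where length covers key plus payload, and registers the record's start position in the LongHandleIndex; that is why seek advances by chunkl + 4, the extra four bytes being the length field itself. A hedged, self-contained sketch of scanning such a heap file back (the helper class is illustrative, not from the patch):

    // sketch: sequential scan of the record format written by HeapWriter.add
    import java.io.BufferedInputStream;
    import java.io.DataInputStream;
    import java.io.EOFException;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;

    final class HeapScan {
        static void scan(final File heapFile, final int keylength) throws IOException {
            final DataInputStream is = new DataInputStream(
                    new BufferedInputStream(new FileInputStream(heapFile), 1024 * 1024));
            try {
                while (true) {
                    final int chunkl = is.readInt();            // key.length + payload.length
                    final byte[] key = new byte[keylength];
                    is.readFully(key);
                    final byte[] payload = new byte[chunkl - keylength];
                    is.readFully(payload);
                    System.out.println(new String(key) + ": " + payload.length + " payload bytes");
                }
            } catch (final EOFException e) {
                // end of the heap file reached
            } finally {
                is.close();
            }
        }
    }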

@@ -296,7 +296,7 @@ public class MapView {
         cacheScore = null;
         // close file
-        blob.close();
+        blob.close(true);
     }
     public class objectIterator implements Iterator<Map<String, String>> {

@@ -42,6 +42,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import de.anomic.kelondro.order.Base64Order;
 import de.anomic.kelondro.order.ByteOrder;
 import de.anomic.kelondro.order.CloneableIterator;
@@ -310,4 +311,18 @@ public class LongHandleIndex {
         }
     }
+    public static void main(String[] args) {
+        LongHandleIndex idx = new LongHandleIndex(12, Base64Order.enhancedCoder, 10000, 10000000);
+        byte[] s;
+        //long l;
+        for (int i = 0; i < 10000000; i = i + 8) {
+            s = Base64Order.enhancedCoder.uncardinal(Long.MAX_VALUE - i);
+            //l = Base64Order.enhancedCoder.cardinal(s);
+            //if (i != l) System.out.println("encoding bug for " + new String(s) + ", v = " + (Long.MAX_VALUE - i) + ", l = " + l);
+            //System.out.println(s);
+            if (idx.get(s) >= 0) System.out.println("search bug for " + new String(s) + ": " + idx.get(s));
+            idx.putUnique(s, 1);
+        }
+    }
 }

@@ -253,24 +253,6 @@ public class Base64Order extends AbstractOrder<byte[]> implements ByteOrder, Cod
         }
     }
-    private final long cardinalI(final byte[] key, int off, int len) {
-        // returns a cardinal number in the range of 0 .. Long.MAX_VALUE
-        long c = 0;
-        int lim = off + Math.min(10, len);
-        int lim10 = off + 10;
-        byte b;
-        while (off < lim) {
-            b = key[off++];
-            if (b < 0) return -1;
-            b = ahpla[b];
-            if (b < 0) return -1;
-            c = (c << 6) | b;
-        }
-        while (off++ < lim10) c = (c << 6);
-        c = c << 3;
-        assert c >= 0;
-        return c;
-    }
     /*
     private final long cardinalI(final byte[] key) {
@@ -292,21 +274,42 @@ public class Base64Order extends AbstractOrder<byte[]> implements ByteOrder, Cod
         while ((p < 10) && (p < key.length())) {
             b = ahpla[key.charAt(p++)];
             if (b < 0) return -1;
-            c = (c << 6) |b;
+            c = (c << 6) | b;
         }
         while (p++ < 10) c = (c << 6);
-        c = c << 3;
+        c = (c << 3) | 7;
+        assert c >= 0;
+        return c;
+    }
+    private final long cardinalI(final byte[] key, int off, int len) {
+        // returns a cardinal number in the range of 0 .. Long.MAX_VALUE
+        long c = 0;
+        int lim = off + Math.min(10, len);
+        int lim10 = off + 10;
+        byte b;
+        while (off < lim) {
+            b = key[off++];
+            if (b < 0) return -1;
+            b = ahpla[b];
+            if (b < 0) return -1;
+            c = (c << 6) | b;
+        }
+        while (off++ < lim10) c = (c << 6);
+        c = (c << 3) | 7;
         assert c >= 0;
         return c;
     }
     public final byte[] uncardinal(long c) {
         c = c >> 3;
-        byte[] b = new byte[10];
+        byte[] b = new byte[12];
         for (int p = 9; p >= 0; p--) {
             b[p] = (byte) alpha[(int) (c & 0x3fL)];
             c = c >> 6;
         }
+        b[10] = (byte) alpha[0x3f];
+        b[11] = (byte) alpha[0x3f];
         return b;
     }
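
cardinalI packs the first ten base64 digits of a key into the top 60 bits of a long; the change replaces the plain shift by three with (c << 3) | 7, so the cardinal of a short key becomes the upper bound of all keys that extend it, and uncardinal now returns a 12-byte hash whose trailing two bytes are the highest base64 digit, matching the 12-byte word hashes used by LongHandleIndex and FlatWordPartitionScheme. A small round-trip sketch (cardinal and uncardinal are the methods exercised in the LongHandleIndex self-test above; the values are illustrative):

    // sketch: position -> 12-byte hash -> position, up to the three lowest bits
    import de.anomic.kelondro.order.Base64Order;

    final class CardinalDemo {
        public static void main(final String[] args) {
            final long position = Long.MAX_VALUE - 12345;
            final byte[] hash = Base64Order.enhancedCoder.uncardinal(position); // 12 bytes, last two are padding
            final long back = Base64Order.enhancedCoder.cardinal(hash);         // re-reads the first 10 digits
            // back equals position with the three lowest bits forced to 1
            System.out.println(new String(hash) + " -> " + back + " (from " + position + ")");
        }
    }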

@@ -321,7 +321,7 @@ public final class IndexBuffer extends AbstractIndex implements Index, IndexRead
         // dump cache
         try {
             //heap.dumpold(this.oldDumpFile);
-            heap.dump(this.dumpFile);
+            heap.dump(this.dumpFile, true);
         } catch (final IOException e){
             log.logSevere("unable to dump cache: " + e.getMessage(), e);
         }

@@ -54,18 +54,20 @@ public final class IndexCell extends AbstractBufferedIndex implements BufferedIn
     // class variables
     private ReferenceContainerArray array;
     private ReferenceContainerCache ram;
-    private int maxRamEntries;
+    private int maxRamEntries, maxArrayFiles;
     public IndexCell(
             final File cellPath,
             final ByteOrder wordOrder,
             final Row payloadrow,
-            final int maxRamEntries
+            final int maxRamEntries,
+            final int maxArrayFiles
             ) throws IOException {
         this.array = new ReferenceContainerArray(cellPath, wordOrder, payloadrow);
         this.ram = new ReferenceContainerCache(payloadrow, wordOrder);
         this.ram.initWriteMode();
         this.maxRamEntries = maxRamEntries;
+        this.maxArrayFiles = maxArrayFiles;
     }
@@ -230,7 +232,7 @@ public final class IndexCell extends AbstractBufferedIndex implements BufferedIn
     public synchronized void close() {
         // dump the ram
         try {
-            this.ram.dump(this.array.newContainerBLOBFile());
+            this.ram.dump(this.array.newContainerBLOBFile(), true);
         } catch (IOException e) {
             e.printStackTrace();
         }
@@ -256,15 +258,19 @@ public final class IndexCell extends AbstractBufferedIndex implements BufferedIn
      * cache control methods
      */
-    private void cacheDump() throws IOException {
+    private synchronized void cacheDump() throws IOException {
         // dump the ram
         File dumpFile = this.array.newContainerBLOBFile();
-        this.ram.dump(dumpFile);
+        this.ram.dump(dumpFile, true);
         // get a fresh ram cache
         this.ram = new ReferenceContainerCache(this.array.rowdef(), this.array.ordering());
         this.ram.initWriteMode();
         // add the dumped indexContainerBLOB to the array
         this.array.mountBLOBContainer(dumpFile);
+        int c = 0;
+        while (this.array.entries() > this.maxArrayFiles && c++ < 3) {
+            if (!this.array.mergeOldest()) break;
+        }
     }
     public void cleanupBuffer(int time) {
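
IndexCell now carries a second limit besides the RAM buffer size: maxArrayFiles caps the number of BLOB files in the ReferenceContainerArray, and cacheDump merges the two oldest files (at most three merge rounds per dump) whenever the count exceeds it. A construction sketch mirroring the call updated in plasmaWordIndex at the end of this commit (import paths and parameter names are assumptions taken from the surrounding files):

    // sketch: opening an index cell with the new file-count limit
    import java.io.File;
    import java.io.IOException;
    import de.anomic.kelondro.index.Row;
    import de.anomic.kelondro.order.ByteOrder;
    import de.anomic.kelondro.text.IndexCell;

    final class CellSetup {
        static IndexCell open(final File textIndexLocation, final ByteOrder wordOrder,
                              final Row payloadrow, final int entityCacheMaxSize) throws IOException {
            return new IndexCell(
                    new File(textIndexLocation, "RICELL"), // cell directory
                    wordOrder,                             // e.g. Base64Order.enhancedCoder
                    payloadrow,                            // e.g. ReferenceRow.urlEntryRow
                    entityCacheMaxSize,                    // RAM entries before cacheDump() writes a new BLOB
                    10);                                   // maxArrayFiles: above this, the oldest BLOBs are merged
        }
    }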

@@ -34,10 +34,12 @@ import java.util.List;
 import de.anomic.kelondro.blob.BLOB;
 import de.anomic.kelondro.blob.BLOBArray;
+import de.anomic.kelondro.blob.HeapWriter;
 import de.anomic.kelondro.index.Row;
 import de.anomic.kelondro.index.RowSet;
 import de.anomic.kelondro.order.ByteOrder;
 import de.anomic.kelondro.order.CloneableIterator;
+import de.anomic.kelondro.text.ReferenceContainerCache.blobFileEntries;
 public final class ReferenceContainerArray {
@@ -68,7 +70,7 @@ public final class ReferenceContainerArray {
     }
     public synchronized void close() {
-        this.array.close();
+        this.array.close(true);
     }
     public synchronized void clear() throws IOException {
@@ -231,36 +233,6 @@ public final class ReferenceContainerArray {
             return c.exportCollection();
         }
     }
-    /*
-    public int mergeOldest() {
-        if (this.array.entries() < 2) return 0;
-        File f1 = this.array.unmountOldestBLOB();
-        File f2 = this.array.unmountOldestBLOB();
-        // iterate both files and write a new one
-        new kelondroMergeIterator<indexContainer>(
-            (kelondroCloneableIterator<Map.Entry<String, byte[]>>) new kelondroBLOBHeapReader.entries(f1, this.payloadrow.objectsize),
-            null,
-            null,
-            null,
-            true);
-        return 0;
-    }
-    */
-    /*
-     * new kelondroMergeIterator<indexContainer>(
-            new kelondroBLOBHeapReader.entries(f1, this.payloadrow.objectsize),
-            new kelondroBLOBHeapReader.entries(f2, this.payloadrow.objectsize),
-            this.payloadrow.getOrdering(),
-            indexContainer.containerMergeMethod,
-            true);
-     */
-    /*
-    public kelondroMergeIterator(
-            final kelondroCloneableIterator<E> a,
-            final kelondroCloneableIterator<E> b,
-            final Comparator<E> c,
-            final Method m, final boolean up) {
-    */
     public interface ContainerRewriter {
@@ -268,4 +240,107 @@ public final class ReferenceContainerArray {
     }
+    public int entries() {
+        return this.array.entries();
+    }
+    public synchronized boolean mergeOldest() throws IOException {
+        if (this.array.entries() < 2) return false;
+        File f1 = this.array.unmountOldestBLOB();
+        File f2 = this.array.unmountOldestBLOB();
+        // iterate both files and write a new one
+        CloneableIterator<ReferenceContainer> i1 = new blobFileEntries(f1, this.payloadrow);
+        CloneableIterator<ReferenceContainer> i2 = new blobFileEntries(f2, this.payloadrow);
+        ReferenceContainer c1, c2, c1o, c2o;
+        c1 = (i1.hasNext()) ? i1.next() : null;
+        c2 = (i2.hasNext()) ? i2.next() : null;
+        if (c1 == null && c2 == null) {
+            if (!f1.delete()) f1.deleteOnExit();
+            if (!f2.delete()) f2.deleteOnExit();
+            return true;
+        }
+        if (c1 == null) {
+            if (!f1.delete()) f1.deleteOnExit();
+            this.array.mountBLOB(f2);
+            return true;
+        }
+        if (c2 == null) {
+            if (!f2.delete()) f2.deleteOnExit();
+            this.array.mountBLOB(f1);
+            return true;
+        }
+        File newFile = newContainerBLOBFile();
+        HeapWriter writer = new HeapWriter(newFile, this.array.keylength(), this.array.ordering());
+        int e;
+        while (true) {
+            assert c1 != null;
+            assert c2 != null;
+            e = this.array.ordering().compare(c1.getWordHash().getBytes(), c2.getWordHash().getBytes());
+            if (e < 0) {
+                writer.add(c1.getWordHash().getBytes(), c1.exportCollection());
+                if (i1.hasNext()) {
+                    c1o = c1;
+                    c1 = i1.next();
+                    assert this.array.ordering().compare(c1.getWordHash().getBytes(), c1o.getWordHash().getBytes()) > 0;
+                    continue;
+                }
+                break;
+            }
+            if (e > 0) {
+                writer.add(c2.getWordHash().getBytes(), c2.exportCollection());
+                if (i2.hasNext()) {
+                    c2o = c2;
+                    c2 = i2.next();
+                    assert this.array.ordering().compare(c2.getWordHash().getBytes(), c2o.getWordHash().getBytes()) > 0;
+                    continue;
+                }
+                break;
+            }
+            assert e == 0;
+            // merge the entries
+            writer.add(c1.getWordHash().getBytes(), (c1.merge(c2)).exportCollection());
+            if (i1.hasNext() && i2.hasNext()) {
+                c1 = i1.next();
+                c2 = i2.next();
+                continue;
+            }
+            if (i1.hasNext()) c1 = i1.next();
+            if (i2.hasNext()) c2 = i2.next();
+            break;
+        }
+        // catch up remaining entries
+        assert !(i1.hasNext() && i2.hasNext());
+        while (i1.hasNext()) {
+            //System.out.println("FLUSH REMAINING 1: " + c1.getWordHash());
+            writer.add(c1.getWordHash().getBytes(), c1.exportCollection());
+            if (i1.hasNext()) {
+                c1o = c1;
+                c1 = i1.next();
+                assert this.array.ordering().compare(c1.getWordHash().getBytes(), c1o.getWordHash().getBytes()) > 0;
+                continue;
+            }
+            break;
+        }
+        while (i2.hasNext()) {
+            //System.out.println("FLUSH REMAINING 2: " + c2.getWordHash());
+            writer.add(c2.getWordHash().getBytes(), c2.exportCollection());
+            if (i2.hasNext()) {
+                c2o = c2;
+                c2 = i2.next();
+                assert this.array.ordering().compare(c2.getWordHash().getBytes(), c2o.getWordHash().getBytes()) > 0;
+                continue;
+            }
+            break;
+        }
+        // finished with writing
+        writer.close(true);
+        // we don't need the old files any more
+        if (!f1.delete()) f1.deleteOnExit();
+        if (!f2.delete()) f2.deleteOnExit();
+        this.array.mountBLOB(newFile);
+        return true;
+    }
 }
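
The new mergeOldest above is a plain two-way merge of sorted streams: both unmounted BLOB files are iterated in word-hash order, a key present in only one file is copied through, equal keys are combined with ReferenceContainer.merge, and the result is written to a fresh heap whose keys are again strictly increasing, which is exactly what the strengthened assertion in HeapWriter.add checks. Reduced to its core, the control flow looks like this generic sketch (it is not the patch code and uses plain Java interfaces instead of the kelondro types):

    // sketch: two-way merge of sorted iterators with a combiner for equal keys
    import java.util.Comparator;
    import java.util.Iterator;
    import java.util.function.BinaryOperator;
    import java.util.function.Consumer;

    final class SortedMerge {
        static <E> void merge(final Iterator<E> i1, final Iterator<E> i2, final Comparator<E> order,
                              final BinaryOperator<E> combine, final Consumer<E> out) {
            E a = i1.hasNext() ? i1.next() : null;
            E b = i2.hasNext() ? i2.next() : null;
            while (a != null && b != null) {
                final int c = order.compare(a, b);
                if (c < 0) {
                    out.accept(a); a = i1.hasNext() ? i1.next() : null;      // key only in the first file
                } else if (c > 0) {
                    out.accept(b); b = i2.hasNext() ? i2.next() : null;      // key only in the second file
                } else {
                    out.accept(combine.apply(a, b));                          // same key in both: merge the entries
                    a = i1.hasNext() ? i1.next() : null;
                    b = i2.hasNext() ? i2.next() : null;
                }
            }
            while (a != null) { out.accept(a); a = i1.hasNext() ? i1.next() : null; } // drain the remainder
            while (b != null) { out.accept(b); b = i2.hasNext() ? i2.next() : null; }
        }
    }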

@@ -26,10 +26,7 @@
 package de.anomic.kelondro.text;
-import java.io.BufferedInputStream;
-import java.io.DataInputStream;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -88,29 +85,6 @@ public final class ReferenceContainerCache extends AbstractIndex implements Inde
         this.cache = Collections.synchronizedSortedMap(new TreeMap<String, ReferenceContainer>(new ByteOrder.StringOrder(this.wordOrder)));
     }
-    /**
-     * restore a heap dump: this is a heap in write mode. There should no heap file
-     * be assigned in initialization; the file name is given here in this method
-     * when the heap is once dumped again, the heap file name may be different
-     * @param heapFile
-     * @throws IOException
-     */
-    public void initWriteModeFromHeap(final File heapFile) throws IOException {
-        Log.logInfo("indexContainerRAMHeap", "restoring dump for rwi heap '" + heapFile.getName() + "'");
-        final long start = System.currentTimeMillis();
-        this.cache = Collections.synchronizedSortedMap(new TreeMap<String, ReferenceContainer>(new ByteOrder.StringOrder(this.wordOrder)));
-        int urlCount = 0;
-        synchronized (cache) {
-            for (final ReferenceContainer container : new heapFileEntries(heapFile, this.payloadrow)) {
-                // TODO: in this loop a lot of memory may be allocated. A check if the memory gets low is necessary. But what do when the memory is low?
-                if (container == null) break;
-                cache.put(container.getWordHash(), container);
-                urlCount += container.size();
-            }
-        }
-        Log.logInfo("indexContainerRAMHeap", "finished rwi heap restore: " + cache.size() + " words, " + urlCount + " word/URL relations in " + (System.currentTimeMillis() - start) + " milliseconds");
-    }
     /**
      * this is the new cache file format initialization
      * @param heapFile
@@ -135,32 +109,38 @@ public final class ReferenceContainerCache extends AbstractIndex implements Inde
         Log.logInfo("indexContainerRAMHeap", "finished rwi blob restore: " + cache.size() + " words, " + urlCount + " word/URL relations in " + (System.currentTimeMillis() - start) + " milliseconds");
     }
-    public void dump(final File heapFile) throws IOException {
+    public void dump(final File heapFile, boolean writeIDX) throws IOException {
         assert this.cache != null;
         Log.logInfo("indexContainerRAMHeap", "creating rwi heap dump '" + heapFile.getName() + "', " + cache.size() + " rwi's");
         if (heapFile.exists()) heapFile.delete();
-        final HeapWriter dump = new HeapWriter(heapFile, payloadrow.primaryKeyLength, Base64Order.enhancedCoder);
+        HeapWriter dump = new HeapWriter(heapFile, payloadrow.primaryKeyLength, Base64Order.enhancedCoder);
         final long startTime = System.currentTimeMillis();
         long wordcount = 0, urlcount = 0;
-        String wordHash;
+        String wordHash = null, lwh;
         ReferenceContainer container;
         // write wCache
         synchronized (cache) {
             for (final Map.Entry<String, ReferenceContainer> entry: cache.entrySet()) {
                 // get entries
+                lwh = wordHash;
                 wordHash = entry.getKey();
                 container = entry.getValue();
+                // check consistency: entries must be ordered
+                assert (lwh == null || this.ordering().compare(wordHash.getBytes(), lwh.getBytes()) > 0);
                 // put entries on heap
                 if (container != null && wordHash.length() == payloadrow.primaryKeyLength) {
+                    //System.out.println("Dump: " + wordHash);
                     dump.add(wordHash.getBytes(), container.exportCollection());
                     urlcount += container.size();
                 }
                 wordcount++;
             }
         }
-        dump.close();
+        dump.close(writeIDX);
+        dump = null;
         Log.logInfo("indexContainerRAMHeap", "finished alternative rwi heap dump: " + wordcount + " words, " + urlcount + " word/URL relations in " + (System.currentTimeMillis() - startTime) + " milliseconds");
     }
@@ -168,74 +148,18 @@ public final class ReferenceContainerCache extends AbstractIndex implements Inde
         return (this.cache == null) ? 0 : this.cache.size();
     }
-    /**
-     * static iterator of heap files: is used to import heap dumps into a write-enabled index heap
-     */
-    public static class heapFileEntries implements Iterator<ReferenceContainer>, Iterable<ReferenceContainer> {
-        DataInputStream is;
-        byte[] word;
-        Row payloadrow;
-        ReferenceContainer nextContainer;
-        public heapFileEntries(final File heapFile, final Row payloadrow) throws IOException {
-            if (!(heapFile.exists())) throw new IOException("file " + heapFile + " does not exist");
-            is = new DataInputStream(new BufferedInputStream(new FileInputStream(heapFile), 1024*1024));
-            word = new byte[payloadrow.primaryKeyLength];
-            this.payloadrow = payloadrow;
-            this.nextContainer = next0();
-        }
-        public boolean hasNext() {
-            return this.nextContainer != null;
-        }
-        private ReferenceContainer next0() {
-            try {
-                is.readFully(word);
-                return new ReferenceContainer(new String(word), RowSet.importRowSet(is, payloadrow));
-            } catch (final IOException e) {
-                return null;
-            }
-        }
-        /**
-         * return an index container
-         * because they may get very large, it is wise to deallocate some memory before calling next()
-         */
-        public ReferenceContainer next() {
-            final ReferenceContainer n = this.nextContainer;
-            this.nextContainer = next0();
-            return n;
-        }
-        public void remove() {
-            throw new UnsupportedOperationException("heap dumps are read-only");
-        }
-        public Iterator<ReferenceContainer> iterator() {
-            return this;
-        }
-        public void close() {
-            if (is != null) try { is.close(); } catch (final IOException e) {}
-            is = null;
-        }
-        protected void finalize() {
-            this.close();
-        }
-    }
     /**
      * static iterator of BLOBHeap files: is used to import heap dumps into a write-enabled index heap
      */
-    public static class blobFileEntries implements Iterator<ReferenceContainer>, Iterable<ReferenceContainer> {
+    public static class blobFileEntries implements CloneableIterator<ReferenceContainer>, Iterable<ReferenceContainer> {
         Iterator<Map.Entry<String, byte[]>> blobs;
         Row payloadrow;
+        File blobFile;
         public blobFileEntries(final File blobFile, final Row payloadrow) throws IOException {
             this.blobs = new HeapReader.entries(blobFile, payloadrow.primaryKeyLength);
             this.payloadrow = payloadrow;
+            this.blobFile = blobFile;
         }
         public boolean hasNext() {
@@ -267,6 +191,15 @@ public final class ReferenceContainerCache extends AbstractIndex implements Inde
         protected void finalize() {
             this.close();
         }
+        public CloneableIterator<ReferenceContainer> clone(Object modifier) {
+            try {
+                return new blobFileEntries(this.blobFile, this.payloadrow);
+            } catch (IOException e) {
+                e.printStackTrace();
+                return null;
+            }
+        }
     }
     public synchronized int maxReferences() {
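
blobFileEntries now implements CloneableIterator; clone(Object) simply reopens the underlying dump file, which is what lets the merge code above treat a dumped BLOB as a restartable stream of ReferenceContainers. A short usage sketch (the helper class is illustrative; blobFileEntries, close() and the container accessors are taken from the patch and its context):

    // sketch: list the containers stored in one dumped BLOB file
    import java.io.File;
    import java.io.IOException;
    import de.anomic.kelondro.index.Row;
    import de.anomic.kelondro.text.ReferenceContainer;
    import de.anomic.kelondro.text.ReferenceContainerCache.blobFileEntries;

    final class DumpScan {
        static void list(final File blobFile, final Row payloadrow) throws IOException {
            final blobFileEntries entries = new blobFileEntries(blobFile, payloadrow);
            while (entries.hasNext()) {
                final ReferenceContainer container = entries.next();
                System.out.println(container.getWordHash() + ": " + container.size() + " references");
            }
            entries.close();
        }
    }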

@@ -172,7 +172,7 @@ public final class plasmaHTCache {
     public static void close() {
         responseHeaderDB.close();
-        fileDB.close();
+        fileDB.close(true);
     }
     public static boolean isPicture(final String mimeType) {

@@ -135,7 +135,7 @@ public final class plasmaWordIndex {
                     new File(indexPrimaryTextLocation, "RICELL"),
                     wordOrder,
                     ReferenceRow.urlEntryRow,
-                    entityCacheMaxSize) :
+                    entityCacheMaxSize, 10) :
             new BufferedIndexCollection(
                     indexPrimaryTextLocation,
                     wordOrder,

@@ -82,7 +82,9 @@ public class FlatWordPartitionScheme implements PartitionScheme {
     public static String positionToHash(final long l) {
         // transform the position of a peer position into a close peer hash
-        return new String(Base64Order.enhancedCoder.uncardinal(l)) + "AA";
+        String s = new String(Base64Order.enhancedCoder.uncardinal(l));
+        while (s.length() < 12) s += "A";
+        return s;
     }
 }
