- shifted some computation out of synchronization to allow more concurrency

- removed synchronization where not necessary

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@6814 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 15 years ago
parent f204076d25
commit dde394a977

@ -109,13 +109,13 @@ public final class MetadataRepository implements Iterable<byte[]> {
}
}
public synchronized int writeCacheSize() {
public int writeCacheSize() {
if (urlIndexFile instanceof SplitTable) return ((SplitTable) urlIndexFile).writeBufferSize();
if (urlIndexFile instanceof Cache) return ((Cache) urlIndexFile).writeBufferSize();
return 0;
}
public synchronized URIMetadataRow load(final byte[] urlHash, final WordReferenceVars searchedWord, final long ranking) {
public URIMetadataRow load(final byte[] urlHash, final WordReferenceVars searchedWord, final long ranking) {
// generates an plasmaLURLEntry using the url hash
// if the url cannot be found, this returns null
if (urlHash == null) return null;
@ -129,9 +129,10 @@ public final class MetadataRepository implements Iterable<byte[]> {
}
}
public synchronized void store(final URIMetadataRow entry) throws IOException {
public void store(final URIMetadataRow entry) throws IOException {
// Check if there is a more recent Entry already in the DB
URIMetadataRow oldEntry;
synchronized (this) {
try {
Row.Entry oe = (urlIndexFile == null) ? null : urlIndexFile.get(entry.hash());
oldEntry = (oe == null) ? null : new URIMetadataRow(oe, null, 0);
@ -153,10 +154,11 @@ public final class MetadataRepository implements Iterable<byte[]> {
} catch (RowSpaceExceededException e) {
throw new IOException("RowSpaceExceededException in " + this.urlIndexFile.filename() + ": " + e.getMessage());
}
}
statsDump = null;
}
public synchronized boolean remove(final byte[] urlHashBytes) {
public boolean remove(final byte[] urlHashBytes) {
if (urlHashBytes == null) return false;
try {
final Row.Entry r = urlIndexFile.remove(urlHashBytes);

@ -31,6 +31,7 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import net.yacy.kelondro.index.RowSpaceExceededException;
import net.yacy.kelondro.io.AbstractWriter;
@ -42,7 +43,7 @@ import net.yacy.kelondro.order.NaturalOrder;
public final class Heap extends HeapModifier implements BLOB {
private HashMap<String, byte[]> buffer; // a write buffer to limit IO to the file; attention: Maps cannot use byte[] as key
private TreeMap<byte[], byte[]> buffer; // a write buffer to limit IO to the file
private int buffersize; // bytes that are buffered in buffer
private final int buffermax; // maximum size of the buffer
@ -83,7 +84,7 @@ public final class Heap extends HeapModifier implements BLOB {
int buffermax) throws IOException {
super(heapFile, keylength, ordering);
this.buffermax = buffermax;
this.buffer = new HashMap<String, byte[]>();
this.buffer = new TreeMap<byte[], byte[]>(ordering);
this.buffersize = 0;
/*
// DEBUG
@ -119,12 +120,14 @@ public final class Heap extends HeapModifier implements BLOB {
* @return true if the key exists, false otherwise
*/
@Override
public synchronized boolean has(byte[] key) {
public boolean has(byte[] key) {
assert index != null;
key = normalizeKey(key);
// check the buffer
if (this.buffer.containsKey(new String(key))) return true;
return super.has(key);
synchronized (this) {
// check the buffer
if (this.buffer.containsKey(key)) return true;
return super.has(key);
}
}
/**
@ -138,7 +141,6 @@ public final class Heap extends HeapModifier implements BLOB {
assert blob.length > 0;
if ((blob == null) || (blob.length == 0)) return;
final int pos = (int) file.length();
key = normalizeKey(key);
index.put(key, pos);
file.seek(pos);
file.writeInt(this.keylength + blob.length);
@ -154,7 +156,7 @@ public final class Heap extends HeapModifier implements BLOB {
*/
private void flushBuffer() throws IOException, RowSpaceExceededException {
// check size of buffer
Iterator<Map.Entry<String, byte[]>> i = this.buffer.entrySet().iterator();
Iterator<Map.Entry<byte[], byte[]>> i = this.buffer.entrySet().iterator();
int l = 0;
while (i.hasNext()) l += i.next().getValue().length;
assert l == this.buffersize;
@ -162,11 +164,11 @@ public final class Heap extends HeapModifier implements BLOB {
// simulate write: this whole code block is only here to test the assert at the end of the block; remove after testing
i = this.buffer.entrySet().iterator();
int posBuffer = 0;
Map.Entry<String, byte[]> entry;
Map.Entry<byte[], byte[]> entry;
byte[] key, blob;
while (i.hasNext()) {
entry = i.next();
key = normalizeKey(entry.getKey().getBytes());
key = normalizeKey(entry.getKey());
blob = entry.getValue();
posBuffer += 4 + this.keylength + blob.length;
}
@ -181,7 +183,7 @@ public final class Heap extends HeapModifier implements BLOB {
byte[] b;
while (i.hasNext()) {
entry = i.next();
key = normalizeKey(entry.getKey().getBytes());
key = normalizeKey(entry.getKey());
blob = entry.getValue();
index.put(key, posFile);
b = AbstractWriter.int2array(this.keylength + blob.length);
@ -211,14 +213,16 @@ public final class Heap extends HeapModifier implements BLOB {
* @throws IOException
*/
@Override
public synchronized byte[] get(byte[] key) throws IOException {
public byte[] get(byte[] key) throws IOException {
key = normalizeKey(key);
// check the buffer
byte[] blob = this.buffer.get(new String(key));
if (blob != null) return blob;
return super.get(key);
synchronized (this) {
// check the buffer
byte[] blob = this.buffer.get(key);
if (blob != null) return blob;
return super.get(key);
}
}
/**
@ -228,13 +232,16 @@ public final class Heap extends HeapModifier implements BLOB {
* @throws IOException
*/
@Override
public synchronized long length(byte[] key) throws IOException {
public long length(byte[] key) throws IOException {
key = normalizeKey(key);
// check the buffer
byte[] blob = this.buffer.get(new String(key));
if (blob != null) return blob.length;
return super.length(key);
synchronized (this) {
// check the buffer
byte[] blob = this.buffer.get(key);
if (blob != null) return blob.length;
return super.length(key);
}
}
/**
@ -289,41 +296,41 @@ public final class Heap extends HeapModifier implements BLOB {
* @throws RowSpaceExceededException
*/
@Override
public synchronized void put(byte[] key, final byte[] b) throws IOException, RowSpaceExceededException {
public void put(byte[] key, final byte[] b) throws IOException, RowSpaceExceededException {
key = normalizeKey(key);
// we do not write records of length 0 into the BLOB
if (b.length == 0) return;
// first remove the old entry (removes from buffer and file)
// TODO: this can be enhanced!
this.remove(key);
// then look if we can use a free entry
if (putToGap(key, b)) return;
// if there is not enough space in the buffer, flush all
if (this.buffersize + b.length > buffermax) {
// this is too big. Flush everything
super.shrinkWithGapsAtEnd();
flushBuffer();
if (b.length > buffermax) {
this.add(key, b);
} else {
this.buffer.put(new String(key), b);
this.buffersize += b.length;
synchronized (this) {
// first remove the old entry (removes from buffer and file)
// TODO: this can be enhanced!
this.remove(key);
// then look if we can use a free entry
if (putToGap(key, b)) return;
// if there is not enough space in the buffer, flush all
if (this.buffersize + b.length > buffermax) {
// this is too big. Flush everything
super.shrinkWithGapsAtEnd();
flushBuffer();
if (b.length > buffermax) {
this.add(key, b);
} else {
this.buffer.put(key, b);
this.buffersize += b.length;
}
return;
}
return;
// add entry to buffer
this.buffer.put(key, b);
this.buffersize += b.length;
}
// add entry to buffer
this.buffer.put(new String(key), b);
this.buffersize += b.length;
}
private boolean putToGap(byte[] key, final byte[] b) throws IOException, RowSpaceExceededException {
key = normalizeKey(key);
// we do not write records of length 0 into the BLOB
if (b.length == 0) return true;
@ -415,17 +422,19 @@ public final class Heap extends HeapModifier implements BLOB {
* @throws IOException
*/
@Override
public synchronized void remove(byte[] key) throws IOException {
public void remove(byte[] key) throws IOException {
key = normalizeKey(key);
// check the buffer
byte[] blob = this.buffer.remove(new String(key));
if (blob != null) {
this.buffersize -= blob.length;
return;
synchronized (this) {
// check the buffer
byte[] blob = this.buffer.remove(key);
if (blob != null) {
this.buffersize -= blob.length;
return;
}
super.remove(key);
}
super.remove(key);
}
/**

@ -94,40 +94,42 @@ public class HeapModifier extends HeapReader implements BLOB {
* @param key the primary key
* @throws IOException
*/
public synchronized void remove(byte[] key) throws IOException {
public void remove(byte[] key) throws IOException {
key = normalizeKey(key);
// check if the index contains the key
final long seek = index.get(key);
if (seek < 0) return;
// check consistency of the index
assert (checkKey(key, seek)) : "key compare failed; key = " + new String(key) + ", seek = " + seek;
// access the file and read the container
this.file.seek(seek);
int size = file.readInt();
//assert seek + size + 4 <= this.file.length() : heapFile.getName() + ": too long size " + size + " in record at " + seek;
long filelength = this.file.length(); // put in separate variable for debugging
if (seek + size + 4 > filelength) {
Log.logSevere("BLOBHeap", heapFile.getName() + ": too long size " + size + " in record at " + seek);
throw new IOException(heapFile.getName() + ": too long size " + size + " in record at " + seek);
synchronized (this) {
// check if the index contains the key
final long seek = index.get(key);
if (seek < 0) return;
// check consistency of the index
assert (checkKey(key, seek)) : "key compare failed; key = " + new String(key) + ", seek = " + seek;
// access the file and read the container
this.file.seek(seek);
int size = file.readInt();
//assert seek + size + 4 <= this.file.length() : heapFile.getName() + ": too long size " + size + " in record at " + seek;
long filelength = this.file.length(); // put in separate variable for debugging
if (seek + size + 4 > filelength) {
Log.logSevere("BLOBHeap", heapFile.getName() + ": too long size " + size + " in record at " + seek);
throw new IOException(heapFile.getName() + ": too long size " + size + " in record at " + seek);
}
// add entry to free array
this.free.put(seek, size);
// fill zeros to the content
int l = size; byte[] fill = new byte[size];
while (l-- > 0) fill[l] = 0;
this.file.write(fill, 0, size);
// remove entry from index
this.index.remove(key);
// recursively merge gaps
tryMergeNextGaps(seek, size);
tryMergePreviousGap(seek);
}
// add entry to free array
this.free.put(seek, size);
// fill zeros to the content
int l = size; byte[] fill = new byte[size];
while (l-- > 0) fill[l] = 0;
this.file.write(fill, 0, size);
// remove entry from index
this.index.remove(key);
// recursively merge gaps
tryMergeNextGaps(seek, size);
tryMergePreviousGap(seek);
}
private void tryMergePreviousGap(final long thisSeek) throws IOException {
@ -233,66 +235,68 @@ public class HeapModifier extends HeapReader implements BLOB {
throw new UnsupportedOperationException("put is not supported in BLOBHeapModifier");
}
public synchronized int replace(byte[] key, final Rewriter rewriter) throws IOException {
public int replace(byte[] key, final Rewriter rewriter) throws IOException {
key = normalizeKey(key);
assert key.length == this.keylength;
// check if the index contains the key
final long pos = index.get(key);
if (pos < 0) return 0;
// check consistency of the index
assert checkKey(key, pos) : "key compare failed; key = " + new String(key) + ", seek = " + pos;
// access the file and read the container
file.seek(pos);
final int len = file.readInt() - this.keylength;
if (MemoryControl.available() < len) {
if (!MemoryControl.request(len, true)) return 0; // not enough memory available for this blob
}
// read the key
final byte[] keyf = new byte[this.keylength];
file.readFully(keyf, 0, keyf.length);
assert this.ordering.equal(key, keyf);
// read the blob
byte[] blob = new byte[len];
file.readFully(blob, 0, blob.length);
// rewrite the entry
blob = rewriter.rewrite(blob);
int reduction = len - blob.length;
if (reduction == 0) {
// even if the reduction is zero then it is still be possible that the record has been changed
this.file.seek(pos + 4 + key.length);
synchronized (this) {
// check if the index contains the key
final long pos = index.get(key);
if (pos < 0) return 0;
// check consistency of the index
assert checkKey(key, pos) : "key compare failed; key = " + new String(key) + ", seek = " + pos;
// access the file and read the container
file.seek(pos);
final int len = file.readInt() - this.keylength;
if (MemoryControl.available() < len) {
if (!MemoryControl.request(len, true)) return 0; // not enough memory available for this blob
}
// read the key
final byte[] keyf = new byte[this.keylength];
file.readFully(keyf, 0, keyf.length);
assert this.ordering.equal(key, keyf);
// read the blob
byte[] blob = new byte[len];
file.readFully(blob, 0, blob.length);
// rewrite the entry
blob = rewriter.rewrite(blob);
int reduction = len - blob.length;
if (reduction == 0) {
// even if the reduction is zero then it is still be possible that the record has been changed
this.file.seek(pos + 4 + key.length);
file.write(blob);
return 0;
}
// the new entry must be smaller than the old entry and must at least be 4 bytes smaller
// because that is the space needed to write a new empty entry record at the end of the gap
if (blob.length > len - 4) throw new IOException("replace of BLOB for key " + new String(key) + " failed (too large): new size = " + blob.length + ", old size = " + (len - 4));
// replace old content
this.file.seek(pos);
file.writeInt(blob.length + key.length);
file.write(key);
file.write(blob);
return 0;
}
// the new entry must be smaller than the old entry and must at least be 4 bytes smaller
// because that is the space needed to write a new empty entry record at the end of the gap
if (blob.length > len - 4) throw new IOException("replace of BLOB for key " + new String(key) + " failed (too large): new size = " + blob.length + ", old size = " + (len - 4));
// replace old content
this.file.seek(pos);
file.writeInt(blob.length + key.length);
file.write(key);
file.write(blob);
// define the new empty entry
final int newfreereclen = reduction - 4;
assert newfreereclen >= 0;
file.writeInt(newfreereclen);
// fill zeros to the content
int l = newfreereclen; byte[] fill = new byte[newfreereclen];
while (l-- > 0) fill[l] = 0;
this.file.write(fill, 0, newfreereclen);
// add a new free entry
this.free.put(pos + 4 + blob.length + key.length, newfreereclen);
return reduction;
// define the new empty entry
final int newfreereclen = reduction - 4;
assert newfreereclen >= 0;
file.writeInt(newfreereclen);
// fill zeros to the content
int l = newfreereclen; byte[] fill = new byte[newfreereclen];
while (l-- > 0) fill[l] = 0;
this.file.write(fill, 0, newfreereclen);
// add a new free entry
this.free.put(pos + 4 + blob.length + key.length, newfreereclen);
return reduction;
}
}
}

@ -306,12 +306,14 @@ public class HeapReader {
* @param key
* @return true if the key exists, false otherwise
*/
public synchronized boolean has(byte[] key) {
public boolean has(byte[] key) {
assert index != null;
key = normalizeKey(key);
// check if the file index contains the key
return index.get(key) >= 0;
synchronized (this) {
// check if the file index contains the key
return index.get(key) >= 0;
}
}
public ByteOrder ordering() {
@ -372,37 +374,39 @@ public class HeapReader {
* @return
* @throws IOException
*/
public synchronized byte[] get(byte[] key) throws IOException {
public byte[] get(byte[] key) throws IOException {
key = normalizeKey(key);
// check if the index contains the key
final long pos = index.get(key);
if (pos < 0) return null;
// access the file and read the container
file.seek(pos);
final int len = file.readInt() - index.row().primaryKeyLength;
if (MemoryControl.available() < len * 2 + keepFreeMem) {
if (!MemoryControl.request(len * 2 + keepFreeMem, true)) return null; // not enough memory available for this blob
}
// read the key
final byte[] keyf = new byte[index.row().primaryKeyLength];
file.readFully(keyf, 0, keyf.length);
if (!this.ordering.equal(key, keyf)) {
// verification of the indexed access failed. we must re-read the index
Log.logSevere("kelondroBLOBHeap", "verification indexed access for " + heapFile.toString() + " failed, re-building index");
// this is a severe operation, it should never happen.
// but if the process ends in this state, it would completely fail
// if the index is not rebuild now at once
initIndexReadFromHeap();
synchronized (this) {
// check if the index contains the key
final long pos = index.get(key);
if (pos < 0) return null;
// access the file and read the container
file.seek(pos);
final int len = file.readInt() - index.row().primaryKeyLength;
if (MemoryControl.available() < len * 2 + keepFreeMem) {
if (!MemoryControl.request(len * 2 + keepFreeMem, true)) return null; // not enough memory available for this blob
}
// read the key
final byte[] keyf = new byte[index.row().primaryKeyLength];
file.readFully(keyf, 0, keyf.length);
if (!this.ordering.equal(key, keyf)) {
// verification of the indexed access failed. we must re-read the index
Log.logSevere("kelondroBLOBHeap", "verification indexed access for " + heapFile.toString() + " failed, re-building index");
// this is a severe operation, it should never happen.
// but if the process ends in this state, it would completely fail
// if the index is not rebuild now at once
initIndexReadFromHeap();
}
// read the blob
byte[] blob = new byte[len];
file.readFully(blob, 0, blob.length);
return blob;
}
// read the blob
byte[] blob = new byte[len];
file.readFully(blob, 0, blob.length);
return blob;
}
protected boolean checkKey(byte[] key, final long pos) throws IOException {
@ -422,16 +426,18 @@ public class HeapReader {
* @return the size of the BLOB or -1 if the BLOB does not exist
* @throws IOException
*/
public synchronized long length(byte[] key) throws IOException {
public long length(byte[] key) throws IOException {
key = normalizeKey(key);
// check if the index contains the key
final long pos = index.get(key);
if (pos < 0) return -1;
// access the file and read the size of the container
file.seek(pos);
return file.readInt() - index.row().primaryKeyLength;
synchronized (this) {
// check if the index contains the key
final long pos = index.get(key);
if (pos < 0) return -1;
// access the file and read the size of the container
file.seek(pos);
return file.readInt() - index.row().primaryKeyLength;
}
}
/**

Loading…
Cancel
Save