another fix to the Solr metadata reading process and to the shutdown process
pull/1/head
Michael Peter Christen 13 years ago
parent b51df6c7e8
commit 94a334f128

@ -7,7 +7,7 @@
// $LastChangedBy$ // $LastChangedBy$
// //
// LICENSE // LICENSE
// //
// This program is free software; you can redistribute it and/or modify // This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by // it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or // the Free Software Foundation; either version 2 of the License, or
@ -38,7 +38,7 @@ import net.yacy.kelondro.util.MemoryControl;
public class HeapModifier extends HeapReader implements BLOB { public class HeapModifier extends HeapReader implements BLOB {
/* /*
* This class adds a remove operation to a BLOBHeapReader. That means that a BLOBModifier can * This class adds a remove operation to a BLOBHeapReader. That means that a BLOBModifier can
* - read elements from a BLOB * - read elements from a BLOB
@ -57,7 +57,7 @@ public class HeapModifier extends HeapReader implements BLOB {
public HeapModifier(final File heapFile, final int keylength, final ByteOrder ordering) throws IOException { public HeapModifier(final File heapFile, final int keylength, final ByteOrder ordering) throws IOException {
super(heapFile, keylength, ordering); super(heapFile, keylength, ordering);
} }
/** /**
* clears the content of the database * clears the content of the database
* @throws IOException * @throws IOException
@ -81,17 +81,17 @@ public class HeapModifier extends HeapReader implements BLOB {
shrinkWithGapsAtEnd(); shrinkWithGapsAtEnd();
super.close(writeIDX); super.close(writeIDX);
} }
@Override @Override
public synchronized void close() { public synchronized void close() {
close(true); close(true);
} }
@Override @Override
public void finalize() { public void finalize() {
this.close(); this.close();
} }
/** /**
* remove a BLOB * remove a BLOB
* @param key the primary key * @param key the primary key
@ -104,15 +104,15 @@ public class HeapModifier extends HeapReader implements BLOB {
// pre-check before synchronization // pre-check before synchronization
long seek = this.index.get(key); long seek = this.index.get(key);
if (seek < 0) return; if (seek < 0) return;
synchronized (this) { synchronized (this) {
// check again if the index contains the key // check again if the index contains the key
seek = this.index.get(key); seek = this.index.get(key);
if (seek < 0) return; if (seek < 0) return;
// check consistency of the index // check consistency of the index
//assert (checkKey(key, seek)) : "key compare failed; key = " + UTF8.String(key) + ", seek = " + seek; //assert (checkKey(key, seek)) : "key compare failed; key = " + UTF8.String(key) + ", seek = " + seek;
// access the file and read the container // access the file and read the container
this.file.seek(seek); this.file.seek(seek);
int size = this.file.readInt(); int size = this.file.readInt();
@ -123,37 +123,37 @@ public class HeapModifier extends HeapReader implements BLOB {
throw new IOException(this.heapFile.getName() + ": too long size " + size + " in record at " + seek); throw new IOException(this.heapFile.getName() + ": too long size " + size + " in record at " + seek);
} }
super.deleteFingerprint(); super.deleteFingerprint();
// add entry to free array // add entry to free array
this.free.put(seek, size); this.free.put(seek, size);
// fill zeros to the content // fill zeros to the content
int l = size; byte[] fill = new byte[size]; int l = size; byte[] fill = new byte[size];
while (l-- > 0) fill[l] = 0; while (l-- > 0) fill[l] = 0;
this.file.write(fill, 0, size); this.file.write(fill, 0, size);
// remove entry from index // remove entry from index
this.index.remove(key); this.index.remove(key);
// recursively merge gaps // recursively merge gaps
tryMergeNextGaps(seek, size); tryMergeNextGaps(seek, size);
tryMergePreviousGap(seek); tryMergePreviousGap(seek);
} }
} }
private void tryMergePreviousGap(final long thisSeek) throws IOException { private void tryMergePreviousGap(final long thisSeek) throws IOException {
// This is called after a record has been removed. As a result, the new // This is called after a record has been removed. As a result, the new
// empty record may be surrounded by gaps. We merge with a previous gap if that // empty record may be surrounded by gaps. We merge with a previous gap if that
// is also empty, but we do not do that recursively. // is also empty, but we do not do that recursively.
// If this is successful, it removes the given marker for thisSeek and, // If this is successful, it removes the given marker for thisSeek and,
// because of this, this method MUST be called AFTER tryMergeNextGaps was called. // because of this, this method MUST be called AFTER tryMergeNextGaps was called.
// first find the gap entry for the closest gap in front of the given gap // first find the gap entry for the closest gap in front of the given gap
SortedMap<Long, Integer> head = this.free.headMap(thisSeek); SortedMap<Long, Integer> head = this.free.headMap(thisSeek);
if (head.isEmpty()) return; if (head.isEmpty()) return;
long previousSeek = head.lastKey().longValue(); long previousSeek = head.lastKey().longValue();
int previousSize = head.get(previousSeek).intValue(); int previousSize = head.get(previousSeek).intValue();
// check if this is directly in front // check if this is directly in front
if (previousSeek + previousSize + 4 == thisSeek) { if (previousSeek + previousSize + 4 == thisSeek) {
// right in front! merge the gaps // right in front! merge the gaps
@ -166,22 +166,22 @@ public class HeapModifier extends HeapReader implements BLOB {
private void tryMergeNextGaps(final long thisSeek, final int thisSize) throws IOException { private void tryMergeNextGaps(final long thisSeek, final int thisSize) throws IOException {
// try to merge two gaps if one gap has been processed already and the position of the next record is known // try to merge two gaps if one gap has been processed already and the position of the next record is known
// if the next record is also a gap, merge these gaps and go on recursively // if the next record is also a gap, merge these gaps and go on recursively
// first check if next gap position is outside of file size // first check if next gap position is outside of file size
long nextSeek = thisSeek + thisSize + 4; long nextSeek = thisSeek + thisSize + 4;
if (nextSeek >= this.file.length()) return; // end of recursion if (nextSeek >= this.file.length()) return; // end of recursion
// move to next position and read record size // move to next position and read record size
Integer nextSize = this.free.get(nextSeek); Integer nextSize = this.free.get(nextSeek);
if (nextSize == null) return; // finished, this is not a gap if (nextSize == null) return; // finished, this is not a gap
// check if the record is a gap-record // check if the record is a gap-record
assert nextSize.intValue() > 0; assert nextSize.intValue() > 0;
if (nextSize.intValue() == 0) { if (nextSize.intValue() == 0) {
// a strange gap record: we can extend the thisGap with four bytes // a strange gap record: we can extend the thisGap with four bytes
// the nextRecord is a gap record; we remove that from the free list because it will be joined with the current gap // the nextRecord is a gap record; we remove that from the free list because it will be joined with the current gap
mergeGaps(thisSeek, thisSize, nextSeek, 0); mergeGaps(thisSeek, thisSize, nextSeek, 0);
// recursively go on // recursively go on
tryMergeNextGaps(thisSeek, thisSize + 4); tryMergeNextGaps(thisSeek, thisSize + 4);
} else { } else {
@ -194,35 +194,35 @@ public class HeapModifier extends HeapReader implements BLOB {
if (t == 0) { if (t == 0) {
// the nextRecord is a gap record; we remove that from the free list because it will be joined with the current gap // the nextRecord is a gap record; we remove that from the free list because it will be joined with the current gap
mergeGaps(thisSeek, thisSize, nextSeek, nextSize.intValue()); mergeGaps(thisSeek, thisSize, nextSeek, nextSize.intValue());
// recursively go on // recursively go on
tryMergeNextGaps(thisSeek, thisSize + 4 + nextSize.intValue()); tryMergeNextGaps(thisSeek, thisSize + 4 + nextSize.intValue());
} }
} }
} }
private void mergeGaps(final long seek0, final int size0, final long seek1, final int size1) throws IOException { private void mergeGaps(final long seek0, final int size0, final long seek1, final int size1) throws IOException {
//System.out.println("*** DEBUG-BLOBHeap " + heapFile.getName() + ": merging gap from pos " + seek0 + ", len " + size0 + " with next record of size " + size1 + " (+ 4)"); //System.out.println("*** DEBUG-BLOBHeap " + heapFile.getName() + ": merging gap from pos " + seek0 + ", len " + size0 + " with next record of size " + size1 + " (+ 4)");
Integer g = this.free.remove(seek1); // g is only used for debugging Integer g = this.free.remove(seek1); // g is only used for debugging
assert g != null; assert g != null;
assert g.intValue() == size1; assert g.intValue() == size1;
// overwrite the size bytes of next records with zeros // overwrite the size bytes of next records with zeros
this.file.seek(seek1); this.file.seek(seek1);
this.file.writeInt(0); this.file.writeInt(0);
// the new size of the current gap: old size + len + 4 // the new size of the current gap: old size + len + 4
int newSize = size0 + 4 + size1; int newSize = size0 + 4 + size1;
this.file.seek(seek0); this.file.seek(seek0);
this.file.writeInt(newSize); this.file.writeInt(newSize);
// register new gap in the free array; overwrite old gap entry // register new gap in the free array; overwrite old gap entry
g = this.free.put(seek0, newSize); g = this.free.put(seek0, newSize);
assert g != null; assert g != null;
assert g.intValue() == size0; assert g.intValue() == size0;
} }
protected void shrinkWithGapsAtEnd() { protected void shrinkWithGapsAtEnd() {
// find gaps at the end of the file and shrink the file by these gaps // find gaps at the end of the file and shrink the file by these gaps
if (this.free == null) return; if (this.free == null) return;
@ -249,26 +249,26 @@ public class HeapModifier extends HeapReader implements BLOB {
public int replace(byte[] key, final Rewriter rewriter) throws IOException { public int replace(byte[] key, final Rewriter rewriter) throws IOException {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override @Override
public int reduce(byte[] key, final Reducer reducer) throws IOException, SpaceExceededException { public int reduce(byte[] key, final Reducer reducer) throws IOException, SpaceExceededException {
key = normalizeKey(key); key = normalizeKey(key);
assert key.length == this.keylength; assert key.length == this.keylength;
// pre-check before synchronization // pre-check before synchronization
long pos = this.index.get(key); long pos = this.index.get(key);
if (pos < 0) return 0; if (pos < 0) return 0;
synchronized (this) { synchronized (this) {
long m = this.mem(); long m = this.mem();
// check again if the index contains the key // check again if the index contains the key
pos = this.index.get(key); pos = this.index.get(key);
if (pos < 0) return 0; if (pos < 0) return 0;
// check consistency of the index // check consistency of the index
//assert checkKey(key, pos) : "key compare failed; key = " + UTF8.String(key) + ", seek = " + pos; //assert checkKey(key, pos) : "key compare failed; key = " + UTF8.String(key) + ", seek = " + pos;
// access the file and read the container // access the file and read the container
this.file.seek(pos); this.file.seek(pos);
final int len = this.file.readInt() - this.keylength; final int len = this.file.readInt() - this.keylength;
@ -276,16 +276,16 @@ public class HeapModifier extends HeapReader implements BLOB {
if (!MemoryControl.request(len, true)) return 0; // not enough memory available for this blob if (!MemoryControl.request(len, true)) return 0; // not enough memory available for this blob
} }
super.deleteFingerprint(); super.deleteFingerprint();
// read the key // read the key
final byte[] keyf = new byte[this.keylength]; final byte[] keyf = new byte[this.keylength];
this.file.readFully(keyf, 0, keyf.length); this.file.readFully(keyf, 0, keyf.length);
assert this.ordering == null || this.ordering.equal(key, keyf); assert this.ordering == null || this.ordering.equal(key, keyf) : "key = " + UTF8.String(key) + ", keyf = " + UTF8.String(keyf);
// read the blob // read the blob
byte[] blob = new byte[len]; byte[] blob = new byte[len];
this.file.readFully(blob, 0, blob.length); this.file.readFully(blob, 0, blob.length);
// rewrite the entry // rewrite the entry
blob = reducer.rewrite(blob); blob = reducer.rewrite(blob);
int reduction = len - blob.length; int reduction = len - blob.length;
@ -295,30 +295,30 @@ public class HeapModifier extends HeapReader implements BLOB {
this.file.write(blob); this.file.write(blob);
return 0; return 0;
} }
// the new entry must be smaller than the old entry and must at least be 4 bytes smaller // the new entry must be smaller than the old entry and must at least be 4 bytes smaller
// because that is the space needed to write a new empty entry record at the end of the gap // because that is the space needed to write a new empty entry record at the end of the gap
if (blob.length > len - 4) throw new IOException("replace of BLOB for key " + UTF8.String(key) + " failed (too large): new size = " + blob.length + ", old size = " + (len - 4)); if (blob.length > len - 4) throw new IOException("replace of BLOB for key " + UTF8.String(key) + " failed (too large): new size = " + blob.length + ", old size = " + (len - 4));
// replace old content // replace old content
this.file.seek(pos); this.file.seek(pos);
this.file.writeInt(blob.length + key.length); this.file.writeInt(blob.length + key.length);
this.file.write(key); this.file.write(key);
this.file.write(blob); this.file.write(blob);
// define the new empty entry // define the new empty entry
final int newfreereclen = reduction - 4; final int newfreereclen = reduction - 4;
assert newfreereclen >= 0; assert newfreereclen >= 0;
this.file.writeInt(newfreereclen); this.file.writeInt(newfreereclen);
// fill zeros to the content // fill zeros to the content
int l = newfreereclen; byte[] fill = new byte[newfreereclen]; int l = newfreereclen; byte[] fill = new byte[newfreereclen];
while (l-- > 0) fill[l] = 0; while (l-- > 0) fill[l] = 0;
this.file.write(fill, 0, newfreereclen); this.file.write(fill, 0, newfreereclen);
// add a new free entry // add a new free entry
this.free.put(pos + 4 + blob.length + key.length, newfreereclen); this.free.put(pos + 4 + blob.length + key.length, newfreereclen);
assert mem() <= m : "m = " + m + ", mem() = " + mem(); assert mem() <= m : "m = " + m + ", mem() = " + mem();
return reduction; return reduction;
} }

@ -223,6 +223,14 @@ public class URIMetadataNode implements URIMetadata {
return UTF8.getBytes((String) languages.get(0)); return UTF8.getBytes((String) languages.get(0));
} }
@Override
public byte[] referrerHash() {
ArrayList<Object> referrer = getArrayList(YaCySchema.referrer_id_txt);
if (referrer == null || referrer.size() == 0) return null;
return ASCII.getBytes((String) referrer.get(0));
}
@Override @Override
public int size() { public int size() {
return getInt(YaCySchema.size_i); return getInt(YaCySchema.size_i);
@ -377,19 +385,27 @@ public class URIMetadataNode implements URIMetadata {
return null; return null;
core.ensureCapacity(core.length() + snippet.length() * 2); core.ensureCapacity(core.length() + snippet.length() * 2);
core.insert(0, "{"); core.insert(0, '{');
core.append(",snippet=").append(crypt.simpleEncode(snippet)); core.append(",snippet=").append(crypt.simpleEncode(snippet));
core.append("}"); core.append('}');
return core.toString(); return core.toString();
//return "{" + core + ",snippet=" + crypt.simpleEncode(snippet) + "}"; //return "{" + core + ",snippet=" + crypt.simpleEncode(snippet) + "}";
} }
/**
* @return the object as String.<br>
* This e.g. looks like this:
* <pre>{hash=jmqfMk7Y3NKw,referrer=------------,mod=20050610,load=20051003,size=51666,wc=1392,cc=0,local=true,q=AEn,dt=h,lang=uk,url=b|aHR0cDovL3d3dy50cmFuc3BhcmVuY3kub3JnL3N1cnZleXMv,descr=b|S25vd2xlZGdlIENlbnRyZTogQ29ycnVwdGlvbiBTdXJ2ZXlzIGFuZCBJbmRpY2Vz}</pre>
*/
@Override @Override
public byte[] referrerHash() { public String toString() {
String[] referrer = (String[]) this.doc.getFieldValue(YaCySchema.referrer_id_txt.name()); final StringBuilder core = corePropList();
if (referrer == null || referrer.length == 0) return null; if (core == null) return null;
return ASCII.getBytes(referrer[0]); core.insert(0, '{');
core.append('}');
return core.toString();
} }
@Override @Override

@ -195,8 +195,10 @@ public class WorkflowProcessor<J extends WorkflowJob> {
} }
// wait until input queue is empty // wait until input queue is empty
while (this.input.size() > 0) { for (int i = 0; i < 10; i++) {
try {Thread.sleep(100);} catch (InterruptedException e) {} if (this.input.size() <= 0) break;
Log.logInfo("WorkflowProcess", "waiting for queue " + this.processName + " to shut down; input.size = " + this.input.size());
try {Thread.sleep(1000);} catch (InterruptedException e) {}
} }
// shut down executors // shut down executors

Loading…
Cancel
Save