fix problems with remove methods and synchronization

- some bugs with wrong removal operations may have been fixed
- removed temporary storage of remove positions and replaced it with direct deletions
- changed synchronization
- added many asserts
- modified dbtest to also test remove during a threaded stress test

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@3576 6c8d7289-2bf4-0310-a012-ef5d649a1542
author orbiter
parent b6a5f53020
commit 7a7a1c7c29
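
The central change runs through the whole diff below: the deferred mark-and-resolve removal scheme is replaced by immediate deletion. A minimal sketch of the two styles, with invented names (RemovalSketch, deleteRecord) rather than the real kelondro API:

import java.util.Iterator;
import java.util.TreeSet;

class RemovalSketch {
    private final TreeSet markedRemoved = new TreeSet();

    // old style: remove() only marks; the real deletion happens later
    // in a separate resolveMarkedRemoved() pass
    void removeDeferred(int index) {
        markedRemoved.add(new Integer(index));
    }
    void resolveMarkedRemoved() {
        Iterator i = markedRemoved.iterator();
        while (i.hasNext()) deleteRecord(((Integer) i.next()).intValue());
        markedRemoved.clear();
    }

    // new style: delete immediately, so there is no temporary storage of
    // remove positions and no window where marks and data can disagree
    void removeDirect(int index) {
        deleteRecord(index);
    }

    private void deleteRecord(int index) {
        // stand-in for the real work: erase the row and free the slot
    }
}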

@ -9,6 +9,7 @@ import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
@ -133,6 +134,23 @@ public class dbtest {
}
}
public static final class RemoveJob extends STJob {
public RemoveJob(final kelondroIndex aTable, final long aSource) {
super(aTable, aSource);
}
public void run() {
final STEntry entry = new STEntry(this.getSource());
try {
getTable().remove(entry.getKey());
} catch (IOException e) {
System.err.println(e);
e.printStackTrace();
System.exit(0);
}
}
}
public static final class ReadJob extends STJob {
public ReadJob(final kelondroIndex aTable, final long aSource) {
super(aTable, aSource);
@ -357,14 +375,26 @@ public class dbtest {
long readCount = Long.parseLong(args[4]);
long randomstart = Long.parseLong(args[5]);
final Random random = new Random(randomstart);
long r;
int p;
ArrayList ra = new ArrayList();
for (int i = 0; i < writeCount; i++) {
serverInstantThread.oneTimeJob(new WriteJob(table, i), random.nextLong() % 1000, 50);
r = random.nextLong() % 1000;
serverInstantThread.oneTimeJob(new WriteJob(table, r), 0, 50);
if (random.nextLong() % 5 == 0) ra.add(new Long(r));
for (int j = 0; j < readCount; j++) {
serverInstantThread.oneTimeJob(new ReadJob(table, random.nextLong() % writeCount), random.nextLong() % 1000, 20);
}
if ((ra.size() > 0) && (random.nextLong() % 7 == 0)) {
p = Math.abs(random.nextInt()) % ra.size();
System.out.println("remove: " + ((Long) ra.get(p)).longValue());
serverInstantThread.oneTimeJob(new RemoveJob(table, ((Long) ra.remove(p)).longValue()), 0, 50);
}
}
while (serverInstantThread.instantThreadCounter > 0) {
try {Thread.sleep(1000);} catch (InterruptedException e) {} // wait for all tasks to finish
System.out.println("count: " + serverInstantThread.instantThreadCounter + ", jobs: " + serverInstantThread.jobs.toString());
}
while (serverInstantThread.instantThreadCounter > 0)
try {Thread.sleep(100);} catch (InterruptedException e) {} // wait for all tasks to finish
try {Thread.sleep(6000);} catch (InterruptedException e) {}
}
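
The new test loop above keeps a random sample of written keys in an ArrayList and occasionally schedules a RemoveJob for one of them, so removals race against concurrent writes and reads. A condensed, hypothetical version of that pattern using plain threads instead of serverInstantThread (the Table interface is invented):

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

class StressSketch {
    interface Table { void put(long k); void remove(long k); }

    static void stress(final Table table, int writeCount, long seed) {
        Random random = new Random(seed);
        List removable = new ArrayList();
        for (int i = 0; i < writeCount; i++) {
            final long r = random.nextLong() % 1000;
            new Thread(new Runnable() { public void run() { table.put(r); } }).start();
            // remember roughly every 5th key as a candidate for removal
            if (random.nextLong() % 5 == 0) removable.add(new Long(r));
            // roughly every 7th round, remove one remembered key concurrently
            if (!removable.isEmpty() && random.nextLong() % 7 == 0) {
                int p = Math.abs(random.nextInt()) % removable.size();
                final long victim = ((Long) removable.remove(p)).longValue();
                new Thread(new Runnable() { public void run() { table.remove(victim); } }).start();
            }
        }
    }
}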

@ -38,9 +38,7 @@ public interface kelondroArray {
public int add(kelondroRow.Entry rowinstance) throws IOException;
public void remove(int index, boolean marked) throws IOException;
public void resolveMarkedRemoved() throws IOException;
public void remove(int index) throws IOException;
public void print() throws IOException;

@ -247,13 +247,6 @@ public class kelondroCollectionIndex {
return array;
}
private void arrayResolveRemoved() throws IOException {
Iterator i = arrays.values().iterator();
while (i.hasNext()) {
((kelondroFixedWidthArray) i.next()).resolveMarkedRemoved();
}
}
private int arrayCapacity(int arrayCounter) {
if (arrayCounter < 0) return 0;
int load = this.loadfactor;
@ -294,7 +287,7 @@ public class kelondroCollectionIndex {
kelondroFixedWidthArray array = getArray(oldPartitionNumber, serialNumber, chunkSize);
// delete old entry
array.remove(oldRownumber, true);
array.remove(oldRownumber);
}
private kelondroRow.Entry array_new(
@ -470,7 +463,10 @@ public class kelondroCollectionIndex {
}
public synchronized void put(byte[] key, kelondroRowCollection collection) throws IOException, kelondroOutOfLimitsException {
assert (key != null);
assert (collection != null);
assert (collection.size() != 0);
// first find an old entry, if one exists
kelondroRow.Entry indexrow = index.get(key);
@ -485,20 +481,12 @@ public class kelondroCollectionIndex {
// overwrite the old collection
// read old information
int oldchunksize = (int) indexrow.getColLong(idx_col_chunksize); // needed only for migration
//int oldchunksize = (int) indexrow.getColLong(idx_col_chunksize); // needed only for migration
int oldchunkcount = (int) indexrow.getColLong(idx_col_chunkcount); // the number of rows in the collection
int oldrownumber = (int) indexrow.getColLong(idx_col_indexpos); // index of the entry in array
int oldPartitionNumber = (int) indexrow.getColByte(idx_col_clusteridx); // points to array file
assert (oldPartitionNumber >= arrayIndex(oldchunkcount));
if ((collection == null) || (collection.size() == 0)) {
// delete the index entry and the array
kelondroFixedWidthArray array = getArray(oldPartitionNumber, serialNumber, oldchunksize);
array.remove(oldrownumber ,false);
index.remove(key);
return;
}
int newPartitionNumber = arrayIndex(collection.size());
// see if we need new space or if we can overwrite the old space
@ -515,7 +503,6 @@ public class kelondroCollectionIndex {
key, collection, indexrow,
newPartitionNumber, serialNumber, this.payloadrow.objectsize()); // modifies indexrow
}
arrayResolveRemoved(); // remove all to-be-removed marked entries
if ((int) indexrow.getColLong(idx_col_chunkcount) != collection.size())
serverLog.logSevere("kelondroCollectionIndex", "UPDATE (put) ERROR: array has different chunkcount than index after merge: index = " + (int) indexrow.getColLong(idx_col_chunkcount) + ", collection.size() = " + collection.size());
@ -684,9 +671,6 @@ public class kelondroCollectionIndex {
indexrows_new.add(indexrow); // collect new index rows
}
// remove all to-be-removed marked entries
arrayResolveRemoved();
// write index entries
index.putMultiple(indexrows_existing, new Date()); // write modified indexrows in optimized manner
index.addUniqueMultiple(indexrows_new, new Date()); // write new indexrows in optimized manner
@ -752,8 +736,7 @@ public class kelondroCollectionIndex {
key, collection, indexrow,
newPartitionNumber, oldSerialNumber, this.payloadrow.objectsize()); // modifies indexrow
}
arrayResolveRemoved(); // remove all to-be-removed marked entries
final int collectionsize = collection.size(); // extra variable for easier debugging
final int indexrowcount = (int) indexrow.getColLong(idx_col_chunkcount);
if (indexrowcount != collectionsize)
@ -858,6 +841,9 @@ public class kelondroCollectionIndex {
oldcollection.sort();
oldcollection.trim(false);
/* in case the new array size is zero we don't delete the array, just allocate a minimal chunk
*
if (oldcollection.size() == 0) {
// delete the index entry and the array
kelondroFixedWidthArray array = getArray(oldPartitionNumber, serialNumber, oldchunksize);
@ -865,7 +851,7 @@ public class kelondroCollectionIndex {
index.remove(key);
return removed;
}
*/
int newPartitionNumber = arrayIndex(oldcollection.size());
// see if we need new space or if we can overwrite the old space
@ -882,7 +868,6 @@ public class kelondroCollectionIndex {
key, oldcollection, indexrow,
newPartitionNumber, serialNumber, this.payloadrow.objectsize()); // modifies indexrow
}
arrayResolveRemoved(); // remove all to-be-removed marked entries
index.put(indexrow); // write modified indexrow
return removed;
}
@ -941,7 +926,7 @@ public class kelondroCollectionIndex {
if (!(index.row().objectOrder.wellformed(arraykey))) {
// cleanup for a bad bug that corrupted the database
index.remove(indexkey); // the RowCollection must be considered lost
array.remove(rownumber, false); // lose the RowCollection (we don't know how much is lost)
array.remove(rownumber); // lose the RowCollection (we don't know how much is lost)
serverLog.logSevere("kelondroCollectionIndex." + array.filename, "lost a RowCollection because of a bad arraykey");
return new kelondroRowSet(this.payloadrow, 0);
}
@ -969,7 +954,7 @@ public class kelondroCollectionIndex {
index.put(indexrow);
array.logFailure("INCONSISTENCY (get) in " + arrayFile(this.path, this.filenameStub, this.loadfactor, chunksize, clusteridx, serialnumber).toString() + ": array has different chunkcount than index: index = " + chunkcount + ", array = " + chunkcountInArray + "; the index has been auto-fixed");
}
if (remove) array.remove(rownumber, false); // index is removed in calling method
if (remove) array.remove(rownumber); // index is removed in calling method
return collection;
}
@ -1043,7 +1028,7 @@ public class kelondroCollectionIndex {
collection.addUnique(rowdef.newEntry(new byte[][]{"abc".getBytes(), "efg".getBytes()}));
collectionIndex.put("erstes".getBytes(), collection);
for (int i = 0; i <= 170; i++) {
for (int i = 1; i <= 170; i++) {
collection = new kelondroRowSet(rowdef, 0);
for (int j = 0; j < i; j++) {
collection.addUnique(rowdef.newEntry(new byte[][]{("abc" + j).getBytes(), "xxx".getBytes()}));
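
Note how put() now asserts a non-null, non-empty collection where it previously interpreted an empty collection as a deletion request (the removed branch above). A tiny sketch of the tightened contract, with invented names and a plain HashMap standing in for the kelondro index:

import java.util.HashMap;
import java.util.List;

class CollectionIndexSketch {
    private final HashMap index = new HashMap();

    public synchronized void put(String key, List collection) {
        assert (key != null);
        assert (collection != null);
        assert (collection.size() != 0); // deleting must use remove(), not put()
        index.put(key, collection);
    }

    public synchronized Object remove(String key) {
        return index.remove(key); // deletion is now always explicit
    }
}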

@ -51,7 +51,6 @@ import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;
public class kelondroFixedWidthArray extends kelondroRecords implements kelondroArray {
@ -59,8 +58,6 @@ public class kelondroFixedWidthArray extends kelondroRecords implements kelondro
private static short thisOHBytes = 0; // our record definition does not need extra bytes
private static short thisOHHandles = 0; // and no handles
private TreeSet markedRemoved; // a set of Integer indexes of removed records (only temporary)
public kelondroFixedWidthArray(File file, kelondroRow rowdef, int intprops) throws IOException {
// this creates a new array
super(file, false, 0, thisOHBytes, thisOHHandles, rowdef, intprops, rowdef.columns() /* txtProps */, 80 /* txtPropWidth */);
@ -73,7 +70,6 @@ public class kelondroFixedWidthArray extends kelondroRecords implements kelondro
try {super.setText(i, rowdef.column(i).toString().getBytes());} catch (IOException e) {}
}
}
markedRemoved = new TreeSet();
}
public kelondroFixedWidthArray(kelondroRA ra, String filename, kelondroRow rowdef, int intprops) throws IOException {
@ -86,7 +82,6 @@ public class kelondroFixedWidthArray extends kelondroRecords implements kelondro
for (int i = 0; i < rowdef.columns(); i++) {
try {super.setText(i, rowdef.column(i).toString().getBytes());} catch (IOException e) {}
}
markedRemoved = new TreeSet();
}
public static kelondroFixedWidthArray open(File file, kelondroRow rowdef, int intprops) {
@ -109,8 +104,8 @@ public class kelondroFixedWidthArray extends kelondroRecords implements kelondro
// this writes a row without reading the row from the file system first
// create a node at position index with rowentry
Handle h = new Handle(index);
newNode(h, (rowentry == null) ? null : rowentry.bytes(), 0).commit(CP_NONE);
Handle h = new Handle(index);
commit(newNode(h, (rowentry == null) ? null : rowentry.bytes(), 0), CP_NONE);
// attention! this newNode call expects the OH bytes to be passed within the bulkchunk
// field. Here, only the rowentry.bytes() raw payload is passed. This is valid because
// the OHbytes and OHhandles are zero.
@ -119,15 +114,18 @@ public class kelondroFixedWidthArray extends kelondroRecords implements kelondro
public synchronized void setMultiple(TreeMap /* of Integer/kelondroRow.Entry */ rows) throws IOException {
Iterator i = rows.entrySet().iterator();
Map.Entry entry;
Integer k;
while (i.hasNext()) {
entry = (Map.Entry) i.next();
set(((Integer) entry.getKey()).intValue(), (kelondroRow.Entry) entry.getValue());
k = (Integer) entry.getKey();
set(k.intValue(), (kelondroRow.Entry) entry.getValue());
}
}
public synchronized kelondroRow.Entry getIfValid(int index) throws IOException {
byte[] b = getNode(new Handle(index), true).getValueRow();
if (b[0] == 0) return null;
if ((b[0] == -128) && (b[1] == 0)) return null;
return row().newEntry(b);
}
@ -147,51 +145,25 @@ public class kelondroFixedWidthArray extends kelondroRecords implements kelondro
public synchronized int add(kelondroRow.Entry rowentry) throws IOException {
// adds a new rowentry, but re-uses a previously as-deleted marked entry
if (markedRemoved.size() == 0) {
// no records there to be re-used
Node n = newNode(rowentry.bytes());
n.commit(CP_NONE);
return n.handle().hashCode();
} else {
// re-use a removed record
Integer index = (Integer) markedRemoved.first();
markedRemoved.remove(index);
set(index.intValue(), rowentry);
return index.intValue();
}
Node n = newNode(rowentry.bytes());
commit(n, CP_NONE);
return n.handle().hashCode();
}
public synchronized void remove(int index, boolean marked) throws IOException {
public synchronized void remove(int index) throws IOException {
assert (index < (super.free() + super.size())) : "remove: index " + index + " out of bounds " + (super.free() + super.size());
if (marked) {
// does not remove directly, but sets only a mark that a record is to be deleted
// this record can be re-used with add
markedRemoved.add(new Integer(index));
} else {
// get the node at position index
Handle h = new Handle(index);
Node n = getNode(h, false);
// get the node at position index
Handle h = new Handle(index);
Node n = getNode(h, false);
// erase the row
n.setValueRow(null);
n.commit(CP_NONE);
// mark row as deleted so it can be re-used
deleteNode(h);
}
}
public synchronized void resolveMarkedRemoved() throws IOException {
Iterator i = markedRemoved.iterator();
Integer index;
while (i.hasNext()) {
index = (Integer) i.next();
remove(index.intValue(), false);
}
markedRemoved.clear();
}
// erase the row
n.setValueRow(null);
commit(n, CP_NONE);
// mark row as deleted so it can be re-used
deleteNode(h);
}
public void print() throws IOException {
System.out.println("PRINTOUT of table, length=" + size());
@ -229,14 +201,14 @@ public class kelondroFixedWidthArray extends kelondroRecords implements kelondro
k = new kelondroFixedWidthArray(f, rowdef, 6);
k.add(k.row().newEntry(new byte[][]{"a".getBytes(), "xxxx".getBytes()}));
k.add(k.row().newEntry(new byte[][]{"b".getBytes(), "xxxx".getBytes()}));
k.remove(0, false);
k.remove(0);
k.add(k.row().newEntry(new byte[][]{"c".getBytes(), "xxxx".getBytes()}));
k.add(k.row().newEntry(new byte[][]{"d".getBytes(), "xxxx".getBytes()}));
k.add(k.row().newEntry(new byte[][]{"e".getBytes(), "xxxx".getBytes()}));
k.add(k.row().newEntry(new byte[][]{"f".getBytes(), "xxxx".getBytes()}));
k.remove(0, false);
k.remove(1, false);
k.remove(0);
k.remove(1);
k.print();
k.print(true);
@ -251,10 +223,9 @@ public class kelondroFixedWidthArray extends kelondroRecords implements kelondro
k.add(k.row().newEntry(new byte[][]{(Integer.toString(i) + "-" + Integer.toString(j)).getBytes(), "xxxx".getBytes()}));
}
for (int j = 0; j < i; j++) {
k.remove(j, true);
k.remove(j);
}
}
k.resolveMarkedRemoved();
k.print();
k.print(true);
k.close();
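
The getIfValid()/valid() checks in this file rely on a convention for recognizing erased rows by their first key bytes. A small sketch of that test as it appears in the diff (assuming b holds the raw value row, key first):

class DeletedRowSketch {
    // a row counts as deleted if its key starts with a zero byte, or with
    // the two-byte pattern {-128, 0} that an erased record may carry
    static boolean isValidRow(byte[] b) {
        if (b[0] == 0) return false;
        if ((b[0] == -128) && (b[1] == 0)) return false;
        return true;
    }
}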

@ -218,7 +218,13 @@ public class kelondroFlexTable extends kelondroFlexWidthArray implements kelondr
public synchronized kelondroRow.Entry get(byte[] key) throws IOException {
int pos = RWindex.geti(key);
if ((pos < 0) && (ROindex != null)) pos = ROindex.geti(key);
if (ROindex != null) {
if (pos < 0) {
pos = ROindex.geti(key);
} else {
assert ROindex.geti(key) < 0;
}
}
if (pos < 0) return null;
// i may be greater than this.size(), because this table may have deleted entries
// the deleted entries are subtracted from the 'real' tablesize, so the size may be
@ -298,11 +304,20 @@ public class kelondroFlexTable extends kelondroFlexWidthArray implements kelondr
public synchronized kelondroRow.Entry remove(byte[] key) throws IOException {
int i = RWindex.removei(key);
if ((i < 0) && (ROindex != null)) i = ROindex.removei(key); // yes, we are allowed to remove entries from RO partition of the index
if (ROindex != null) {
if (i < 0) {
i = ROindex.removei(key); // yes, we are allowed to remove entries from RO partition of the index
} else {
assert ROindex.removei(key) < 0;
}
}
assert (RWindex.removei(key) < 0);
assert (ROindex == null) || (ROindex.removei(key) < 0);
if (i < 0) return null;
kelondroRow.Entry r;
r = super.get(i);
super.remove(i, false);
kelondroRow.Entry r = super.get(i);
assert r != null; // error
super.remove(i);
assert super.get(i) == null : "i = " + i + ", get(i) = " + serverLog.arrayList(super.get(i).bytes(), 0, 12);
return r;
}
@ -312,7 +327,7 @@ public class kelondroFlexTable extends kelondroFlexWidthArray implements kelondr
if (i < 0) return null;
kelondroRow.Entry r;
r = super.get(i);
super.remove(i, false);
super.remove(i);
return r;
}
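
get() and remove() now make the lookup order explicit: the writable RW index is consulted first, the read-only RO partition only as a fallback, and new asserts document that a key must never be present in both. A hedged sketch of that order with plain Maps instead of the kelondro index types:

import java.util.Map;

class TwoTierIndexSketch {
    static int lookup(Map rw, Map ro, String key) {
        Integer pos = (Integer) rw.get(key);
        if (pos != null) {
            assert (ro == null) || (ro.get(key) == null); // never in both tiers
            return pos.intValue();
        }
        if (ro != null) {
            Integer ropos = (Integer) ro.get(key);
            if (ropos != null) return ropos.intValue();
        }
        return -1; // not found in either tier
    }
}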

@ -186,8 +186,8 @@ public class kelondroFlexWidthArray implements kelondroArray {
this.init();
}
public void close() {
if (col != null) synchronized (col) {
public synchronized void close() {
if (col != null) {
for (int i = 0; i < col.length; i++) {
if (col[i] != null) {
// a column can be null, this is normal
@ -216,64 +216,58 @@ public class kelondroFlexWidthArray implements kelondroArray {
return col[0].size();
}
public void setMultiple(TreeMap /*of {Integer, kelondroRow.Entry}*/ entries) throws IOException {
public synchronized void setMultiple(TreeMap /*of {Integer, kelondroRow.Entry}*/ entries) throws IOException {
// an R/W head path-optimized way to write a set of entries
Iterator i;
Map.Entry entry;
kelondroRow.Entry rowentry, e;
int c = 0, index;
synchronized (col) {
// go across each file
while (c < rowdef.columns()) {
i = entries.entrySet().iterator();
while (i.hasNext()) {
entry = (Map.Entry) i.next();
index = ((Integer) entry.getKey()).intValue();
rowentry = (kelondroRow.Entry) entry.getValue();
assert rowentry.objectsize() == this.rowdef.objectsize;
// go across each file
while (c < rowdef.columns()) {
i = entries.entrySet().iterator();
while (i.hasNext()) {
entry = (Map.Entry) i.next();
index = ((Integer) entry.getKey()).intValue();
rowentry = (kelondroRow.Entry) entry.getValue();
assert rowentry.objectsize() == this.rowdef.objectsize;
e = col[c].row().newEntry(rowentry.bytes(), rowdef.colstart[c]);
col[c].set(index, e);
}
c = c + col[c].row().columns();
e = col[c].row().newEntry(rowentry.bytes(), rowdef.colstart[c]);
col[c].set(index, e);
}
c = c + col[c].row().columns();
}
}
public void set(int index, kelondroRow.Entry rowentry) throws IOException {
public synchronized void set(int index, kelondroRow.Entry rowentry) throws IOException {
assert rowentry.objectsize() == this.rowdef.objectsize;
int c = 0;
kelondroRow.Entry e;
synchronized (col) {
byte[] reb = rowentry.bytes();
while (c < rowdef.columns()) {
e = col[c].row().newEntry(reb, rowdef.colstart[c]);
col[c].set(index, e);
c = c + col[c].row().columns();
}
}
}
byte[] reb = rowentry.bytes();
while (c < rowdef.columns()) {
e = col[c].row().newEntry(reb, rowdef.colstart[c]);
col[c].set(index, e);
c = c + col[c].row().columns();
}
}
public int add(kelondroRow.Entry rowentry) throws IOException {
public synchronized int add(kelondroRow.Entry rowentry) throws IOException {
assert rowentry.objectsize() == this.rowdef.objectsize;
kelondroRow.Entry e;
int index = -1;
byte[] reb = rowentry.bytes();
synchronized (col) {
e = col[0].row().newEntry(reb, 0);
index = col[0].add(e);
int c = col[0].row().columns();
byte[] reb = rowentry.bytes();
e = col[0].row().newEntry(reb, 0);
index = col[0].add(e);
int c = col[0].row().columns();
while (c < rowdef.columns()) {
e = col[c].row().newEntry(reb, rowdef.colstart[c]);
col[c].set(index, e);
c = c + col[c].row().columns();
}
}
return index;
while (c < rowdef.columns()) {
e = col[c].row().newEntry(reb, rowdef.colstart[c]);
col[c].set(index, e);
c = c + col[c].row().columns();
}
return index;
}
protected TreeMap addMultiple(List rows) throws IOException {
protected synchronized TreeMap addMultiple(List rows) throws IOException {
// result is a Integer/byte[] relation
// of newly added rows (index, key)
TreeMap indexref = new TreeMap();
@ -292,20 +286,18 @@ public class kelondroFlexWidthArray implements kelondroArray {
kelondroRow.Entry e;
int index = -1;
byte[] reb = rowentry.bytes();
synchronized (col) {
e = col[0].row().newEntry(reb, 0);
index = col[0].add(e);
int c = col[0].row().columns();
e = col[0].row().newEntry(reb, 0);
index = col[0].add(e);
int c = col[0].row().columns();
while (c < rowdef.columns()) {
e = col[c].row().newEntry(reb, rowdef.colstart[c]);
// remember write to column, but do not write directly
colm[c].put(new Integer(index), e); // col[c].set(index,e);
c = c + col[c].row().columns();
}
}
indexref.put(new Integer(index), rowentry.getColBytes(0));
}
while (c < rowdef.columns()) {
e = col[c].row().newEntry(reb, rowdef.colstart[c]);
// remember write to column, but do not write directly
colm[c].put(new Integer(index), e); // col[c].set(index,e);
c = c + col[c].row().columns();
}
indexref.put(new Integer(index), rowentry.getColBytes(0));
}
// write the other columns
for (int j = 1; j < col.length; j++) {
if (col[j] != null) col[j].setMultiple(colm[j]);
@ -314,46 +306,34 @@ public class kelondroFlexWidthArray implements kelondroArray {
return indexref;
}
public kelondroRow.Entry get(int index) throws IOException {
int r = 0;
kelondroRow.Entry e, p;
p = rowdef.newEntry();
synchronized (col) {
while (r < rowdef.columns()) {
if (r == 0) {
e = col[r].getIfValid(index);
if (e == null) return null; // probably a deleted entry
} else {
e = col[r].get(index);
}
for (int i = 0; i < col[r].row().columns(); i++)
p.setCol(r + i, e.getColBytes(i));
r = r + col[r].row().columns();
}
}
return p;
public synchronized kelondroRow.Entry get(int index) throws IOException {
kelondroRow.Entry e = col[0].getIfValid(index);
if (e == null) return null; // probably a deleted entry
kelondroRow.Entry p = rowdef.newEntry();
p.setCol(0, e.getColBytes(0));
int r = col[0].row().columns();
while (r < rowdef.columns()) {
e = col[r].get(index);
for (int i = 0; i < col[r].row().columns(); i++) {
p.setCol(r + i, e.getColBytes(i));
}
r = r + col[r].row().columns();
}
return p;
}
public void remove(int index, boolean marked) throws IOException {
public synchronized void remove(int index) throws IOException {
int r = 0;
synchronized (col) {
// remove only from the first column
col[0].remove(index, marked);
r = r + col[r].row().columns();
// the other columns will be blanked out only
while (r < rowdef.columns()) {
col[r].set(index, null);
r = r + col[r].row().columns();
}
}
}
public synchronized void resolveMarkedRemoved() throws IOException {
synchronized (col) {
col[0].resolveMarkedRemoved();
}
// remove only from the first column
col[0].remove(index);
r = r + col[r].row().columns();
// the other columns will be blanked out only
while (r < rowdef.columns()) {
col[r].set(index, null);
r = r + col[r].row().columns();
}
}
public void print() throws IOException {
@ -380,14 +360,14 @@ public class kelondroFlexWidthArray implements kelondroArray {
kelondroFlexWidthArray k = new kelondroFlexWidthArray(f, "flextest", rowdef, true);
k.add(k.row().newEntry(new byte[][]{"a".getBytes(), "xxxx".getBytes()}));
k.add(k.row().newEntry(new byte[][]{"b".getBytes(), "xxxx".getBytes()}));
k.remove(0, false);
k.remove(0);
k.add(k.row().newEntry(new byte[][]{"c".getBytes(), "xxxx".getBytes()}));
k.add(k.row().newEntry(new byte[][]{"d".getBytes(), "xxxx".getBytes()}));
k.add(k.row().newEntry(new byte[][]{"e".getBytes(), "xxxx".getBytes()}));
k.add(k.row().newEntry(new byte[][]{"f".getBytes(), "xxxx".getBytes()}));
k.remove(0, false);
k.remove(1, false);
k.remove(0);
k.remove(1);
k.print();
k.col[0].print(true);
@ -407,11 +387,10 @@ public class kelondroFlexWidthArray implements kelondroArray {
k.close();
k = new kelondroFlexWidthArray(f, "flextest", rowdef, true);
for (int j = 0; j < i; j++) {
k.remove(i*2 - j - 1, true);
k.remove(i*2 - j - 1);
}
k.close();
}
k.resolveMarkedRemoved();
k = new kelondroFlexWidthArray(f, "flextest", rowdef, true);
k.print();
k.col[0].print(true);
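
remove() here reflects how a flex-width row is spread over several fixed-width column files: the record is removed only from the first column file, and the remaining columns are merely blanked. A simplified sketch of that rule (ColumnFile is an invented stand-in for kelondroFixedWidthArray):

import java.io.IOException;

class FlexRemoveSketch {
    interface ColumnFile {
        void remove(int index) throws IOException;
        void set(int index, byte[] partialRow) throws IOException;
        int columns();
    }

    // col[c] is non-null only at each column-group start, as in the real class
    static void removeRow(ColumnFile[] col, int totalColumns, int index) throws IOException {
        col[0].remove(index);           // remove only from the first column
        int c = col[0].columns();
        while (c < totalColumns) {      // the other columns are only blanked
            col[c].set(index, null);
            c = c + col[c].columns();
        }
    }
}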

@ -177,7 +177,7 @@ public class kelondroRecords {
private int FREEC; // counter of free elements in list of free Nodes
private Handle FREEH; // pointer to first element in list of free Nodes, empty = NUL
public usageControl(boolean init) throws IOException {
private usageControl(boolean init) throws IOException {
if (init) {
this.USEDC = 0;
this.FREEC = 0;
@ -254,7 +254,7 @@ public class kelondroRecords {
assert (h.index >= 0);
assert (h.index != NUL);
//synchronized (USAGE) {
//synchronized (entryFile) {
synchronized (entryFile) {
assert (h.index < USEDC + FREEC) : "USEDC = " + USEDC + ", FREEC = " + FREEC + ", h.index = " + h.index;
long sp = seekpos(h);
assert (sp <= entryFile.length() + ROW.objectsize) : h.index + "/" + sp + " exceeds file size " + entryFile.length();
@ -266,7 +266,7 @@ public class kelondroRecords {
FREEH = h;
writefree();
writeused(false);
//}
}
//}
}
@ -279,7 +279,7 @@ public class kelondroRecords {
}
assert (chunk.length == ROW.objectsize()) : "chunk.length = " + chunk.length + ", ROW.objectsize() = " + ROW.objectsize();
//synchronized (USAGE) {
//synchronized (entryFile) {
synchronized (entryFile) {
if (USAGE.FREEC == 0) {
// generate new entry
int index = USAGE.allCount();
@ -292,19 +292,18 @@ public class kelondroRecords {
USAGE.USEDC++;
USAGE.FREEC--;
// take link
int index;
if (USAGE.FREEH.index == NUL) {
int index = USAGE.FREEH.index;
if (index == NUL) {
serverLog.logSevere("kelondroRecords/" + filename, "INTERNAL ERROR (DATA INCONSISTENCY): re-use of records failed, lost " + (USAGE.FREEC + 1) + " records.");
// try to heal..
USAGE.USEDC = USAGE.allCount() + 1;
USAGE.FREEC = 0;
index = USAGE.USEDC - 1;
} else {
index = USAGE.FREEH.index;
//System.out.println("*DEBUG* ALLOCATED DELETED INDEX " + index);
// check for valid seek position
long seekp = seekpos(USAGE.FREEH);
if (seekp > entryFile.length()) {
if (seekp >= entryFile.length()) {
// this is a severe inconsistency. try to heal..
serverLog.logSevere("kelondroRecords/" + filename, "new Handle: lost " + USAGE.FREEC + " marked nodes; seek position " + seekp + "/" + USAGE.FREEH.index + " out of file size " + entryFile.length() + "/" + ((entryFile.length() - POS_NODES) / recordsize));
index = USAGE.allCount(); // a place at the end of the file
@ -314,6 +313,7 @@ public class kelondroRecords {
} else {
// read link to next element of FREEH chain
USAGE.FREEH.index = entryFile.readInt(seekp);
assert ((USAGE.FREEH.index == NUL) && (USAGE.FREEC == 0)) || seekpos(USAGE.FREEH) < entryFile.length() : "allocatePayload: USAGE.FREEH.index = " + USAGE.FREEH.index + ", seekp = " + seekp;
}
}
USAGE.writeused(false);
@ -321,7 +321,7 @@ public class kelondroRecords {
entryFile.write(seekpos(index) + overhead, chunk, 0, ROW.objectsize()); // overwrite space
return index;
}
//}
}
//}
}
@ -335,7 +335,7 @@ public class kelondroRecords {
}
//assert (chunk.length == ROW.objectsize()) : "chunk.length = " + chunk.length + ", ROW.objectsize() = " + ROW.objectsize();
//synchronized (USAGE) {
//synchronized (entryFile) {
synchronized (entryFile) {
if (index < USAGE.allCount()) {
// write within the file
// this can be critical, if we simply overwrite fields that are marked
@ -355,6 +355,7 @@ public class kelondroRecords {
entryFile.write(seekpos(h), spaceChunk); // occupy space, otherwise the USAGE computation does not work
entryFile.writeInt(seekpos(h), USAGE.FREEH.index);
USAGE.FREEH = h;
assert ((USAGE.FREEH.index == NUL) && (USAGE.FREEC == 0)) || seekpos(USAGE.FREEH) < entryFile.length() : "allocateRecord: USAGE.FREEH.index = " + USAGE.FREEH.index;
USAGE.writefree();
entryFile.commit();
}
@ -368,7 +369,7 @@ public class kelondroRecords {
entryFile.commit();
}
}
//}
}
//}
}
@ -770,7 +771,7 @@ public class kelondroRecords {
return map;
}
public final byte[] bulkRead(int start, int end) throws IOException {
public synchronized final byte[] bulkRead(int start, int end) throws IOException {
// a bulk read simply reads a piece of memory from the record file
// this only makes sense if there are no overhead bytes or pointers
// the end value is OUTSIDE the record interval
@ -803,14 +804,23 @@ public class kelondroRecords {
}
protected synchronized final void deleteNode(Handle handle) throws IOException {
if ((cacheHeaders == null) || (cacheHeaders.size() == 0)) {
USAGE.dispose(handle);
} else synchronized (cacheHeaders) {
cacheHeaders.removeb(handle.index);
cacheDelete++;
USAGE.dispose(handle);
if (cacheHeaders == null) {
USAGE.dispose(handle);
} else synchronized (cacheHeaders) {
if (cacheHeaders.size() == 0) {
USAGE.dispose(handle);
} else {
cacheHeaders.removeb(handle.index);
cacheDelete++;
USAGE.dispose(handle);
}
}
}
public synchronized void commit(Node n, int cachePriority) throws IOException {
n.commit(cachePriority);
}
public final class Node {
// a Node holds all information of one row of data. This includes the key to the entry
@ -839,13 +849,13 @@ public class kelondroRecords {
//private byte[] ohBytes = null; // the overhead bytes, OHBYTEC values
//private Handle[] ohHandle= null; // the overhead handles, OHHANDLEC values
//private byte[][] values = null; // an array of byte[] nodes is the value vector
protected Handle handle = null; // index of the entry, by default NUL means undefined
protected byte[] headChunk = null; // contains ohBytes, ohHandles and the key value
protected byte[] tailChunk = null; // contains all values except the key value
protected boolean headChanged = false;
protected boolean tailChanged = false;
private Handle handle = null; // index of the entry, by default NUL means undefined
private byte[] headChunk = null; // contains ohBytes, ohHandles and the key value
private byte[] tailChunk = null; // contains all values except the key value
private boolean headChanged = false;
private boolean tailChanged = false;
protected Node(byte[] rowinstance) throws IOException {
private Node(byte[] rowinstance) throws IOException {
// this initializer is used to create nodes from bulk-read byte arrays
assert ((rowinstance == null) || (rowinstance.length == ROW.objectsize)) : "bulkchunk.length = " + rowinstance.length + ", ROW.width(0) = " + ROW.width(0);
this.handle = new Handle(USAGE.allocatePayload(rowinstance));
@ -871,7 +881,7 @@ public class kelondroRecords {
this.tailChanged = false; // we write the tail already during allocate
}
protected Node(Handle handle, byte[] bulkchunk, int offset) throws IOException {
private Node(Handle handle, byte[] bulkchunk, int offset) throws IOException {
// this initializer is used to create nodes from bulk-read byte arrays
// if write is true, then the chunk in bulkchunk is written to the file
// otherwise it is considered equal to what is stored in the file
@ -902,7 +912,7 @@ public class kelondroRecords {
this.tailChanged = changed;
}
protected Node(Handle handle, Node parentNode, int referenceInParent, boolean fillTail) throws IOException {
private Node(Handle handle, Node parentNode, int referenceInParent, boolean fillTail) throws IOException {
// this creates an entry with a pre-reserved entry position.
// values can be written using the setValues() method,
// but we expect that values are already there in the file.
@ -1040,7 +1050,7 @@ public class kelondroRecords {
return (h == NUL) ? null : new Handle(h);
}
public byte[] setValueRow(byte[] row) throws IOException {
public synchronized byte[] setValueRow(byte[] row) throws IOException {
// if the index is defined, then write values directly to the file, else only to the object
if ((row != null) && (row.length != ROW.objectsize())) throw new IOException("setValueRow with wrong (" + row.length + ") row length instead correct: " + ROW.objectsize());
byte[] result = getValueRow(); // previous value (this loads the values if not already happened)
@ -1055,18 +1065,18 @@ public class kelondroRecords {
return result; // return previous value
}
public boolean valid() {
public synchronized boolean valid() {
// returns true if the key starts with a non-zero byte
// this may help to detect deleted entries
return headChunk[overhead] != 0;
return (headChunk[overhead] != 0) && ((headChunk[overhead] != -128) || (headChunk[overhead + 1] != 0));
}
public byte[] getKey() {
public synchronized byte[] getKey() {
// read key
return trimCopy(headChunk, overhead, ROW.width(0));
}
public byte[] getValueRow() throws IOException {
public synchronized byte[] getValueRow() throws IOException {
if (this.tailChunk == null) {
// load all values from the database file
@ -1087,7 +1097,7 @@ public class kelondroRecords {
return row;
}
public synchronized void commit(int cachePriority) throws IOException {
private synchronized void commit(int cachePriority) throws IOException {
// this must be called after all write operations to the node are
// finished
@ -1122,14 +1132,6 @@ public class kelondroRecords {
}
}
public synchronized void collapse() {
// this must be called after all write and read operations to the
// node are finished
this.headChunk = null;
this.tailChunk = null;
this.handle = null;
}
private byte[] trimCopy(byte[] a, int offset, int length) {
if (length > a.length - offset) length = a.length - offset;
while ((length > 0) && (a[offset + length - 1] == 0)) length--;
@ -1168,7 +1170,7 @@ public class kelondroRecords {
if (cacheGrowStatus() == 2) return true; // no need to flush cache space
// just delete any of the entries
if (cacheGrowStatus() <= 1) {
if (cacheGrowStatus() <= 1) synchronized (cacheHeaders) {
cacheHeaders.removeoneb();
cacheFlush++;
}
@ -1316,13 +1318,11 @@ public class kelondroRecords {
}
// Returns the number of key-value mappings in this map.
public int size() {
synchronized (entryFile) {
return USAGE.used();
}
public synchronized int size() {
return USAGE.used();
}
protected final int free() {
protected synchronized final int free() {
return USAGE.FREEC;
}
@ -1379,13 +1379,12 @@ public class kelondroRecords {
if (USAGE.FREEC != 0) {
Handle h = USAGE.FREEH;
long repair_position = POS_FREEH;
int iter = 0;
while (h.index != NUL) {
// check handle
seekp = seekpos(h);
if (seekp > entryFile.length()) {
// repair the last handle store position
this.theLogger.severe("KELONDRO WARNING " + this.filename + ": seek position " + seekp + "/" + h.index + " out of file size " + entryFile.length() + "/" + ((entryFile.length() - POS_NODES) / recordsize) + " after " + iter + " iterations; patched wrong node");
this.theLogger.severe("KELONDRO WARNING " + this.filename + ": seek position " + seekp + "/" + h.index + " out of file size " + entryFile.length() + "/" + ((entryFile.length() - POS_NODES) / recordsize) + " after " + markedDeleted.size() + " iterations; patched wrong node");
entryFile.writeInt(repair_position, NUL);
return markedDeleted;
}
@ -1407,10 +1406,9 @@ public class kelondroRecords {
}
// this appears to be correct. go on.
iter++;
if (System.currentTimeMillis() > timeLimit) throw new kelondroException(filename, "time limit of " + maxTime + " exceeded; > " + markedDeleted.size() + " deleted entries");
}
System.out.println("\nDEBUG: " + iter + " deleted entries in " + entryFile.name());
System.out.println("\nDEBUG: " + markedDeleted.size() + " deleted entries in " + entryFile.name());
}
}
return markedDeleted;
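
Several of the fixes above (the seekp >= entryFile.length() check, the new asserts on USAGE.FREEH) guard the free-record chain: FREEH points at the first free slot, and each free slot stores the index of the next one in the file itself. A minimal in-memory model of that chain, with invented names mirroring FREEH/FREEC/USEDC:

class FreeListSketch {
    static final int NUL = Integer.MIN_VALUE;
    private final int[] next = new int[64]; // next-pointers of free slots
    private int freeHead = NUL;             // FREEH
    private int freeCount = 0;              // FREEC
    private int usedCount = 0;              // USEDC

    synchronized int allocate() {
        if (freeCount == 0) return usedCount++; // append a fresh slot
        int index = freeHead;                   // re-use a freed slot
        freeHead = next[index];                 // follow the chain link
        freeCount--;
        usedCount++;
        return index;
    }

    synchronized void dispose(int index) {
        next[index] = freeHead;                 // link in front of the chain
        freeHead = index;
        freeCount++;
        usedCount--;
    }
}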

@ -225,7 +225,7 @@ public class kelondroRowCollection {
return rowdef.newEntry(chunkcache, index * rowdef.objectsize());
}
public final void set(int index, kelondroRow.Entry a) {
public synchronized final void set(int index, kelondroRow.Entry a) {
assert (index >= 0) : "set: access with index " + index + " is below zero";
assert (index < chunkcount) : "set: access with index " + index + " is above chunkcount " + chunkcount;
//assert (!(bugappearance(a, astart, alength))) : "a = " + serverLog.arrayList(a, astart, alength);
@ -234,11 +234,11 @@ public class kelondroRowCollection {
this.lastTimeWrote = System.currentTimeMillis();
}
public void addUnique(kelondroRow.Entry row) {
public synchronized void addUnique(kelondroRow.Entry row) {
addUnique(row.bytes(), 0, row.bytes().length);
}
public void addUnique(kelondroRow.Entry row, Date entryDate) {
public synchronized void addUnique(kelondroRow.Entry row, Date entryDate) {
addUnique(row);
}
@ -247,11 +247,11 @@ public class kelondroRowCollection {
while (i.hasNext()) addUnique((kelondroRow.Entry) i.next());
}
public void add(byte[] a) {
public synchronized void add(byte[] a) {
addUnique(a, 0, a.length);
}
private synchronized final void addUnique(byte[] a, int astart, int alength) {
private final void addUnique(byte[] a, int astart, int alength) {
assert (a != null);
assert (astart >= 0) && (astart < a.length) : "astart = " + astart;
assert (!(serverLog.allZero(a, astart, alength))) : "a = " + serverLog.arrayList(a, astart, alength);
@ -289,7 +289,7 @@ public class kelondroRowCollection {
chunkcount += c.size();
}
protected final void removeShift(int pos, int dist, int upBound) {
private final void removeShift(int pos, int dist, int upBound) {
assert ((pos + dist) * rowdef.objectsize() >= 0) : "pos = " + pos + ", dist = " + dist + ", rowdef.objectsize() = " + rowdef.objectsize;
assert (pos * rowdef.objectsize() >= 0) : "pos = " + pos + ", rowdef.objectsize() = " + rowdef.objectsize;
assert ((pos + dist) * rowdef.objectsize() + (upBound - pos - dist) * rowdef.objectsize() <= chunkcache.length) : "pos = " + pos + ", dist = " + dist + ", rowdef.objectsize() = " + rowdef.objectsize + ", upBound = " + upBound + ", chunkcache.length = " + chunkcache.length;
@ -299,7 +299,7 @@ public class kelondroRowCollection {
(upBound - pos - dist) * rowdef.objectsize());
}
protected final void copytop(int i) {
private final void copytop(int i) {
// copies the topmost row element to given position
if (i == chunkcount - 1) return;
System.arraycopy(chunkcache, this.rowdef.objectsize() * (chunkcount - 1), chunkcache, this.rowdef.objectsize() * i, this.rowdef.objectsize());
@ -458,7 +458,7 @@ public class kelondroRowCollection {
swap(j, j - 1, 0);
}
protected final int swap(int i, int j, int p) {
private final int swap(int i, int j, int p) {
if (i == j) return p;
if ((this.chunkcount + 1) * this.rowdef.objectsize() < this.chunkcache.length) {
// there is space in the chunkcache that we can use as buffer
@ -490,7 +490,7 @@ public class kelondroRowCollection {
}
}
public String toString() {
public synchronized String toString() {
StringBuffer s = new StringBuffer();
Iterator i = rows();
if (i.hasNext()) s.append(((kelondroRow.Entry) i.next()).toString());
@ -519,13 +519,13 @@ public class kelondroRowCollection {
return c;
}
protected int compare(byte[] a, int astart, int alength, int chunknumber) {
protected synchronized int compare(byte[] a, int astart, int alength, int chunknumber) {
assert (chunknumber < chunkcount);
int l = Math.min(this.rowdef.width(rowdef.primaryKey), Math.min(a.length - astart, alength));
return rowdef.objectOrder.compare(a, astart, l, chunkcache, chunknumber * this.rowdef.objectsize() + this.rowdef.colstart[rowdef.primaryKey], this.rowdef.width(rowdef.primaryKey));
}
protected boolean match(byte[] a, int astart, int alength, int chunknumber) {
protected synchronized boolean match(byte[] a, int astart, int alength, int chunknumber) {
if (chunknumber >= chunkcount) return false;
int i = 0;
int p = chunknumber * this.rowdef.objectsize() + this.rowdef.colstart[rowdef.primaryKey];
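
The kelondroRowCollection changes show the synchronization strategy of this commit in its purest form: public entry points become synchronized, while helpers such as addUnique(byte[], int, int), removeShift, copytop and swap become private and unsynchronized, so the monitor is taken exactly once per operation. Sketched with invented names:

class LockingSketch {
    private final byte[] chunkcache = new byte[1024];
    private int chunkcount = 0;

    // public API: takes the monitor once
    public synchronized void add(byte[] a) {
        addUnique(a, 0, a.length);
    }

    // private helper: only ever called with the monitor already held,
    // so it carries no synchronization of its own
    private void addUnique(byte[] a, int astart, int alength) {
        System.arraycopy(a, astart, chunkcache, chunkcount, alength);
        chunkcount += alength;
    }
}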

@ -109,6 +109,7 @@ public class kelondroRowSet extends kelondroRowCollection implements kelondroInd
if (index < 0) return null;
kelondroRow.Entry entry = super.get(index);
super.removeRow(index);
assert find(a, start, length) < 0; // check if the remove worked
return entry;
}

@ -109,6 +109,7 @@ public final class kelondroStack extends kelondroRecords {
public class Counter implements Iterator {
Handle nextHandle = null;
Handle lastHandle = null;
public Counter() {
nextHandle = getHandle(root);
@ -119,17 +120,21 @@ public final class kelondroStack extends kelondroRecords {
}
public Object next() {
Handle ret = nextHandle;
lastHandle = nextHandle;
try {
nextHandle = getNode(nextHandle, null, 0, false).getOHHandle(right);
return getNode(ret, null, 0, true);
return getNode(lastHandle, null, 0, true);
} catch (IOException e) {
throw new kelondroException(filename, "IO error at Counter:next()");
}
}
public void remove() {
throw new UnsupportedOperationException("no remove here..");
try {
unlinkNode(getNode(lastHandle, false));
} catch (IOException e) {
e.printStackTrace();
}
}
}
@ -145,7 +150,7 @@ public final class kelondroStack extends kelondroRecords {
Node n = newNode(row.bytes());
n.setOHHandle(left, null);
n.setOHHandle(right, null);
n.commit(CP_NONE);
commit(n, CP_NONE);
// assign handles
setHandle(root, n.handle());
setHandle(toor, n.handle());
@ -157,8 +162,8 @@ public final class kelondroStack extends kelondroRecords {
n.setOHHandle(right, null);
Node n1 = getNode(getHandle(toor), null, 0, false);
n1.setOHHandle(right, n.handle());
n.commit(CP_NONE);
n1.commit(CP_NONE);
commit(n, CP_NONE);
commit(n1, CP_NONE);
// assign handles
setHandle(toor, n.handle());
// thats it
@ -248,7 +253,7 @@ public final class kelondroStack extends kelondroRecords {
Node k = getNode(l, null, 0, false);
k.setOHHandle(left, k.getOHHandle(left));
k.setOHHandle(right, r);
k.commit(CP_NONE);
commit(k, CP_NONE);
}
// look right
if (r == null) {
@ -259,7 +264,7 @@ public final class kelondroStack extends kelondroRecords {
Node k = getNode(r, null, 0, false);
k.setOHHandle(left, l);
k.setOHHandle(right, k.getOHHandle(right));
k.commit(CP_NONE);
commit(k, CP_NONE);
}
}
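
Counter.remove() is newly implemented by remembering the last returned handle and unlinking that node, instead of throwing UnsupportedOperationException. The same idea on a plain singly-linked list (simplified; the real stack is doubly linked and file-backed):

import java.util.Iterator;

class StackIteratorSketch {
    static class Node { Object value; Node next; }
    private Node root;

    Iterator iterator() {
        return new Iterator() {
            private Node prev = null, last = null, cursor = root;
            public boolean hasNext() { return cursor != null; }
            public Object next() {
                prev = last;
                last = cursor;
                cursor = cursor.next;
                return last.value;
            }
            public void remove() { // unlink the node returned by next()
                if (last == null) return;
                if (prev == null) root = last.next; else prev.next = last.next;
                last = prev;
            }
        };
    }
}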

@ -164,10 +164,10 @@ public class kelondroTree extends kelondroRecords implements kelondroIndex {
private void commitNode(Node n) throws IOException {
Handle left = n.getOHHandle(leftchild);
Handle right = n.getOHHandle(rightchild);
if ((left == null) && (right == null)) n.commit(CP_LOW);
else if (left == null) n.commit(CP_MEDIUM);
else if (right == null) n.commit(CP_MEDIUM);
else n.commit(CP_HIGH);
if ((left == null) && (right == null)) commit(n, CP_LOW);
else if (left == null) commit(n, CP_MEDIUM);
else if (right == null) commit(n, CP_MEDIUM);
else commit(n, CP_HIGH);
}
public boolean has(byte[] key) throws IOException {
@ -269,7 +269,7 @@ public class kelondroTree extends kelondroRecords implements kelondroIndex {
thenode.setOHHandle(parent, null);
thenode.setOHHandle(leftchild, null);
thenode.setOHHandle(rightchild, null);
thenode.commit(CP_NONE);
commit(thenode, CP_NONE);
logWarning("kelondroTree.Search.process: database contains loops; the loop-nodes have been auto-fixed");
found = false;
return;
@ -372,7 +372,7 @@ public class kelondroTree extends kelondroRecords implements kelondroIndex {
e.setOHHandle(leftchild, null);
e.setOHHandle(rightchild, null);
// do updates
e.commit(CP_LOW);
commit(e, CP_LOW);
setHandle(root, e.handle());
result = null;
} else {
@ -391,15 +391,15 @@ public class kelondroTree extends kelondroRecords implements kelondroIndex {
theNode.setOHHandle(parent, parentNode.handle());
theNode.setOHHandle(leftchild, null);
theNode.setOHHandle(rightchild, null);
theNode.commit(CP_LOW);
commit(theNode, CP_LOW);
// check consistency and link new node to parent node
byte parentBalance;
if (writeSearchObj.isLeft()) {
if (parentNode.getOHHandle(leftchild) != null) throw new kelondroException(filename, "tried to create leftchild node twice. parent=" + new String(parentNode.getKey()) + ", leftchild=" + new String(new Node(parentNode.getOHHandle(leftchild), (Node) null, 0, true).getKey()));
if (parentNode.getOHHandle(leftchild) != null) throw new kelondroException(filename, "tried to create leftchild node twice. parent=" + new String(parentNode.getKey()) + ", leftchild=" + new String(getNode(parentNode.getOHHandle(leftchild), (Node) null, 0, true).getKey()));
parentNode.setOHHandle(leftchild, theNode.handle());
} else if (writeSearchObj.isRight()) {
if (parentNode.getOHHandle(rightchild) != null) throw new kelondroException(filename, "tried to create rightchild node twice. parent=" + new String(parentNode.getKey()) + ", rightchild=" + new String(new Node(parentNode.getOHHandle(rightchild), (Node) null, 0, true).getKey()));
if (parentNode.getOHHandle(rightchild) != null) throw new kelondroException(filename, "tried to create rightchild node twice. parent=" + new String(parentNode.getKey()) + ", rightchild=" + new String(getNode(parentNode.getOHHandle(rightchild), (Node) null, 0, true).getKey()));
parentNode.setOHHandle(rightchild, theNode.handle());
} else {
throw new kelondroException(filename, "neither left nor right child");
@ -933,7 +933,7 @@ public class kelondroTree extends kelondroRecords implements kelondroIndex {
if (visitedNodeHandles.contains(childHandle)) {
// try to repair the nextNode
nextNode.setOHHandle(childtype, null);
nextNode.commit(CP_NONE);
commit(nextNode, CP_NONE);
logWarning("nodeIterator.next: internal loopback; fixed loop and try to go on");
break;
}

@ -146,37 +146,42 @@ public class plasmaCrawlBalancer {
// this method is only here because so many import/export methods need it
// and it was implemented in the previous architecture
// however, its usage is not recommended
int s = urlFileIndex.size();
kelondroRow.Entry entry = urlFileIndex.remove(urlhash.getBytes());
if (entry == null) return null;
assert urlFileIndex.size() + 1 == s : "urlFileIndex.size() = " + urlFileIndex.size() + ", s = " + s;
// now delete that thing also from the queues
// iterate through the RAM stack
Iterator i = urlRAMStack.iterator();
String h;
boolean removed = false;
while (i.hasNext()) {
h = (String) i.next();
if (h.equals(urlhash)) {
i.remove();
removed = true;
break;
return new plasmaCrawlEntry(entry);
}
}
if ((kelondroRecords.debugmode) && (!removed)) {
serverLog.logWarning("PLASMA BALANCER", "remove: not found urlhash " + urlhash + " in " + stackname);
}
// we cannot iterate through the file stack, because the stack iterator
// does not yet have a delete method implemented. It would also be a bad idea
// to do that, because it would create too much IO load
// instead, the top/pop methods that acquire elements from the stack which
// cannot be found in the urlFileIndex must handle that case silently
// iterate through the file stack
// in general this is a bad idea. But it can only be avoided by avoiding this method
i = urlFileStack.iterator();
while (i.hasNext()) {
h = new String(((kelondroRecords.Node) i.next()).getKey());
if (h.equals(urlhash)) {
i.remove();
return new plasmaCrawlEntry(entry);
}
}
if (kelondroRecords.debugmode) {
serverLog.logWarning("PLASMA BALANCER", "remove: not found urlhash " + urlhash + " in " + stackname);
}
return new plasmaCrawlEntry(entry);
}
public boolean has(String urlhash) {
public synchronized boolean has(String urlhash) {
try {
return urlFileIndex.has(urlhash.getBytes());
} catch (IOException e) {
@ -189,7 +194,7 @@ public class plasmaCrawlBalancer {
int componentsize = urlFileStack.size() + urlRAMStack.size() + sizeDomainStacks();
try {
if (componentsize != urlFileIndex.size()) {
// here is urlIndexFile.size() always bigger. why?
// here is urlIndexFile.size() always smaller. why?
if (kelondroRecords.debugmode) {
serverLog.logWarning("PLASMA BALANCER", "size operation wrong in " + stackname + " - componentsize = " + componentsize + ", urlFileIndex.size() = " + urlFileIndex.size());
}
@ -285,7 +290,7 @@ public class plasmaCrawlBalancer {
}
// 2nd-a: check domainStacks for latest arrivals
if (result == null) {
if ((result == null) && (domainStacks.size() > 0)) {
// we select specific domains that have not been used for a long time
// i.e. 60 seconds. Latest arrivals that have not yet been crawled
// also fit into that scheme
@ -320,7 +325,7 @@ public class plasmaCrawlBalancer {
}
// 2nd-b: check domainStacks for best match between stack size and retrieval time
if (result == null) {
if ((result == null) && (domainStacks.size() > 0)) {
// we order all domains by the number of entries per domain
// then we iterate through these domains in descending entry order
// and take that one that has a delta > minimumDelta
@ -407,8 +412,13 @@ public class plasmaCrawlBalancer {
// update statistical data
domainAccess.put(result.substring(6), new Long(System.currentTimeMillis()));
int s = urlFileIndex.size();
kelondroRow.Entry entry = urlFileIndex.remove(result.getBytes());
if (entry == null) return null;
assert urlFileIndex.size() + 1 == s : "urlFileIndex.size() = " + urlFileIndex.size() + ", s = " + s + ", result = " + result;
if (entry == null) {
serverLog.logSevere("PLASMA BALANCER", "get() found a valid urlhash, but failed to fetch the corresponding url entry - total size = " + size() + ", fileStack.size() = " + urlFileStack.size() + ", ramStack.size() = " + urlRAMStack.size() + ", domainStacks.size() = " + domainStacks.size());
return null;
}
return new plasmaCrawlEntry(entry);
}
@ -444,13 +454,13 @@ public class plasmaCrawlBalancer {
return new plasmaCrawlEntry(entry);
}
public Iterator iterator() throws IOException {
public synchronized Iterator iterator() throws IOException {
return new EntryIterator();
}
public class EntryIterator implements Iterator {
private class EntryIterator implements Iterator {
Iterator rowIterator;
private Iterator rowIterator;
public EntryIterator() throws IOException {
rowIterator = urlFileIndex.rows(true, null);
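
The reworked remove() above must keep three structures consistent: the urlFileIndex and whichever of the RAM or file stack still holds the hash. A hedged, in-memory sketch of that invariant (invented names; the real code iterates a kelondro file stack):

import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;

class BalancerSketch {
    private final HashMap index = new HashMap();          // urlhash -> entry
    private final LinkedList ramStack = new LinkedList(); // urlhashes
    private final LinkedList fileStack = new LinkedList();

    synchronized Object remove(String urlhash) {
        Object entry = index.remove(urlhash);
        if (entry == null) return null;
        // the entry must also disappear from whichever stack holds it
        if (!removeFrom(ramStack, urlhash)) removeFrom(fileStack, urlhash);
        return entry;
    }

    private static boolean removeFrom(LinkedList stack, String urlhash) {
        Iterator i = stack.iterator();
        while (i.hasNext()) {
            if (urlhash.equals(i.next())) { i.remove(); return true; }
        }
        return false;
    }
}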

@ -42,6 +42,7 @@ package de.anomic.server;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.TreeMap;
import de.anomic.server.logging.serverLog;
@ -49,8 +50,10 @@ public final class serverInstantThread extends serverAbstractThread implements s
private Method jobExecMethod, jobCountMethod, freememExecMethod;
private Object environment;
private Long handle;
public static int instantThreadCounter = 0;
public static TreeMap jobs = new TreeMap();
public serverInstantThread(Object env, String jobExec, String jobCount, String freemem) {
// jobExec is the name of a method of the object 'env' that executes the one-step-run
@ -81,6 +84,7 @@ public final class serverInstantThread extends serverAbstractThread implements s
}
this.environment = env;
this.setName(env.getClass().getName() + "." + jobExec);
this.handle = new Long(System.currentTimeMillis() + this.getName().hashCode());
}
public int getJobCount() {
@ -103,6 +107,8 @@ public final class serverInstantThread extends serverAbstractThread implements s
public boolean job() throws Exception {
instantThreadCounter++;
//System.out.println("started job " + this.handle + ": " + this.getName());
synchronized(jobs) {jobs.put(this.handle, this.getName());}
boolean jobHasDoneSomething = false;
try {
Object result = jobExecMethod.invoke(environment, new Object[0]);
@ -129,6 +135,7 @@ public final class serverInstantThread extends serverAbstractThread implements s
freemem();
}
instantThreadCounter--;
synchronized(jobs) {jobs.remove(this.handle);}
return jobHasDoneSomething;
}
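
dbtest's new wait loop polls exactly this bookkeeping: a static counter of running instant-thread jobs plus a synchronized TreeMap from handle to job name. In isolation the pattern looks roughly like this (runJob is an invented wrapper; the counter is non-atomic, as in the source):

import java.util.TreeMap;

class JobRegistrySketch {
    static int instantThreadCounter = 0;
    static final TreeMap jobs = new TreeMap();

    static void runJob(Long handle, String name, Runnable body) {
        instantThreadCounter++;
        synchronized (jobs) { jobs.put(handle, name); }
        try {
            body.run();
        } finally {
            instantThreadCounter--;
            synchronized (jobs) { jobs.remove(handle); }
        }
    }
}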
