From 7a7a1c7c2957ed4b6ad6bb700446acd8dc0990ce Mon Sep 17 00:00:00 2001
From: orbiter
Date: Tue, 17 Apr 2007 15:15:47 +0000
Subject: [PATCH] fight against problems with remove-methods and synchronization

- some bugs caused by wrong removal operations may have been fixed
- removed the temporary storage of remove-positions and replaced it with direct deletions
- changed synchronization
- added many asserts
- modified dbtest to also test remove during the threaded stress test

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@3576 6c8d7289-2bf4-0310-a012-ef5d649a1542
---
 source/dbtest.java                            |  36 +++-
 source/de/anomic/kelondro/kelondroArray.java  |   4 +-
 .../kelondro/kelondroCollectionIndex.java     |  43 ++---
 .../kelondro/kelondroFixedWidthArray.java     |  77 +++-----
 .../de/anomic/kelondro/kelondroFlexTable.java |  27 ++-
 .../kelondro/kelondroFlexWidthArray.java      | 169 ++++++++----------
 .../de/anomic/kelondro/kelondroRecords.java   |  98 +++++-----
 .../kelondro/kelondroRowCollection.java       |  22 +--
 source/de/anomic/kelondro/kelondroRowSet.java |   1 +
 source/de/anomic/kelondro/kelondroStack.java  |  21 ++-
 source/de/anomic/kelondro/kelondroTree.java   |  20 +--
 .../de/anomic/plasma/plasmaCrawlBalancer.java |  50 +++---
 .../de/anomic/server/serverInstantThread.java |   7 +
 13 files changed, 287 insertions(+), 288 deletions(-)

diff --git a/source/dbtest.java b/source/dbtest.java
index 23ecde27d..44040750b 100644
--- a/source/dbtest.java
+++ b/source/dbtest.java
@@ -9,6 +9,7 @@ import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.Date;
 import java.util.Iterator;
 import java.util.List;
@@ -133,6 +134,23 @@ public class dbtest {
         }
     }
 
+    public static final class RemoveJob extends STJob {
+        public RemoveJob(final kelondroIndex aTable, final long aSource) {
+            super(aTable, aSource);
+        }
+
+        public void run() {
+            final STEntry entry = new STEntry(this.getSource());
+            try {
+                getTable().remove(entry.getKey());
+            } catch (IOException e) {
+                System.err.println(e);
+                e.printStackTrace();
+                System.exit(0);
+            }
+        }
+    }
+
     public static final class ReadJob extends STJob {
         public ReadJob(final kelondroIndex aTable, final long aSource) {
             super(aTable, aSource);
@@ -357,14 +375,26 @@ public class dbtest {
             long readCount = Long.parseLong(args[4]);
             long randomstart = Long.parseLong(args[5]);
             final Random random = new Random(randomstart);
+            long r;
+            int p;
+            ArrayList ra = new ArrayList();
             for (int i = 0; i < writeCount; i++) {
-                serverInstantThread.oneTimeJob(new WriteJob(table, i), random.nextLong() % 1000, 50);
+                r = random.nextLong() % 1000;
+                serverInstantThread.oneTimeJob(new WriteJob(table, r), 0, 50);
+                if (random.nextLong() % 5 == 0) ra.add(new Long(r));
                 for (int j = 0; j < readCount; j++) {
                     serverInstantThread.oneTimeJob(new ReadJob(table, random.nextLong() % writeCount), random.nextLong() % 1000, 20);
                 }
+                if ((ra.size() > 0) && (random.nextLong() % 7 == 0)) {
+                    p = Math.abs(random.nextInt()) % ra.size();
+                    System.out.println("remove: " + ((Long) ra.get(p)).longValue());
+                    serverInstantThread.oneTimeJob(new RemoveJob(table, ((Long) ra.remove(p)).longValue()), 0, 50);
+                }
+            }
+            while (serverInstantThread.instantThreadCounter > 0) {
+                try {Thread.sleep(1000);} catch (InterruptedException e) {} // wait for all tasks to finish
+                System.out.println("count: " + serverInstantThread.instantThreadCounter + ", jobs: " + serverInstantThread.jobs.toString());
             }
-            while (serverInstantThread.instantThreadCounter > 0)
-                try
{Thread.sleep(100);} catch (InterruptedException e) {} // wait for all tasks to finish try {Thread.sleep(6000);} catch (InterruptedException e) {} } diff --git a/source/de/anomic/kelondro/kelondroArray.java b/source/de/anomic/kelondro/kelondroArray.java index 298801119..8aedea3b6 100644 --- a/source/de/anomic/kelondro/kelondroArray.java +++ b/source/de/anomic/kelondro/kelondroArray.java @@ -38,9 +38,7 @@ public interface kelondroArray { public int add(kelondroRow.Entry rowinstance) throws IOException; - public void remove(int index, boolean marked) throws IOException; - - public void resolveMarkedRemoved() throws IOException; + public void remove(int index) throws IOException; public void print() throws IOException; diff --git a/source/de/anomic/kelondro/kelondroCollectionIndex.java b/source/de/anomic/kelondro/kelondroCollectionIndex.java index 1bf72697c..cc344cb34 100644 --- a/source/de/anomic/kelondro/kelondroCollectionIndex.java +++ b/source/de/anomic/kelondro/kelondroCollectionIndex.java @@ -247,13 +247,6 @@ public class kelondroCollectionIndex { return array; } - private void arrayResolveRemoved() throws IOException { - Iterator i = arrays.values().iterator(); - while (i.hasNext()) { - ((kelondroFixedWidthArray) i.next()).resolveMarkedRemoved(); - } - } - private int arrayCapacity(int arrayCounter) { if (arrayCounter < 0) return 0; int load = this.loadfactor; @@ -294,7 +287,7 @@ public class kelondroCollectionIndex { kelondroFixedWidthArray array = getArray(oldPartitionNumber, serialNumber, chunkSize); // delete old entry - array.remove(oldRownumber, true); + array.remove(oldRownumber); } private kelondroRow.Entry array_new( @@ -470,7 +463,10 @@ public class kelondroCollectionIndex { } public synchronized void put(byte[] key, kelondroRowCollection collection) throws IOException, kelondroOutOfLimitsException { - + assert (key != null); + assert (collection != null); + assert (collection.size() != 0); + // first find an old entry, if one exists kelondroRow.Entry indexrow = index.get(key); @@ -485,20 +481,12 @@ public class kelondroCollectionIndex { // overwrite the old collection // read old information - int oldchunksize = (int) indexrow.getColLong(idx_col_chunksize); // needed only for migration + //int oldchunksize = (int) indexrow.getColLong(idx_col_chunksize); // needed only for migration int oldchunkcount = (int) indexrow.getColLong(idx_col_chunkcount); // the number if rows in the collection int oldrownumber = (int) indexrow.getColLong(idx_col_indexpos); // index of the entry in array int oldPartitionNumber = (int) indexrow.getColByte(idx_col_clusteridx); // points to array file assert (oldPartitionNumber >= arrayIndex(oldchunkcount)); - if ((collection == null) || (collection.size() == 0)) { - // delete the index entry and the array - kelondroFixedWidthArray array = getArray(oldPartitionNumber, serialNumber, oldchunksize); - array.remove(oldrownumber ,false); - index.remove(key); - return; - } - int newPartitionNumber = arrayIndex(collection.size()); // see if we need new space or if we can overwrite the old space @@ -515,7 +503,6 @@ public class kelondroCollectionIndex { key, collection, indexrow, newPartitionNumber, serialNumber, this.payloadrow.objectsize()); // modifies indexrow } - arrayResolveRemoved(); // remove all to-be-removed marked entries if ((int) indexrow.getColLong(idx_col_chunkcount) != collection.size()) serverLog.logSevere("kelondroCollectionIndex", "UPDATE (put) ERROR: array has different chunkcount than index after merge: index = " + (int) 
indexrow.getColLong(idx_col_chunkcount) + ", collection.size() = " + collection.size()); @@ -684,9 +671,6 @@ public class kelondroCollectionIndex { indexrows_new.add(indexrow); // collect new index rows } - // remove all to-be-removed marked entries - arrayResolveRemoved(); - // write index entries index.putMultiple(indexrows_existing, new Date()); // write modified indexrows in optimized manner index.addUniqueMultiple(indexrows_new, new Date()); // write new indexrows in optimized manner @@ -752,8 +736,7 @@ public class kelondroCollectionIndex { key, collection, indexrow, newPartitionNumber, oldSerialNumber, this.payloadrow.objectsize()); // modifies indexrow } - arrayResolveRemoved(); // remove all to-be-removed marked entries - + final int collectionsize = collection.size(); // extra variable for easier debugging final int indexrowcount = (int) indexrow.getColLong(idx_col_chunkcount); if (indexrowcount != collectionsize) @@ -858,6 +841,9 @@ public class kelondroCollectionIndex { oldcollection.sort(); oldcollection.trim(false); + /* in case that the new array size is zero we dont delete the array, just allocate a minimal chunk + * + if (oldcollection.size() == 0) { // delete the index entry and the array kelondroFixedWidthArray array = getArray(oldPartitionNumber, serialNumber, oldchunksize); @@ -865,7 +851,7 @@ public class kelondroCollectionIndex { index.remove(key); return removed; } - + */ int newPartitionNumber = arrayIndex(oldcollection.size()); // see if we need new space or if we can overwrite the old space @@ -882,7 +868,6 @@ public class kelondroCollectionIndex { key, oldcollection, indexrow, newPartitionNumber, serialNumber, this.payloadrow.objectsize()); // modifies indexrow } - arrayResolveRemoved(); // remove all to-be-removed marked entries index.put(indexrow); // write modified indexrow return removed; } @@ -941,7 +926,7 @@ public class kelondroCollectionIndex { if (!(index.row().objectOrder.wellformed(arraykey))) { // cleanup for a bad bug that corrupted the database index.remove(indexkey); // the RowCollection must be considered lost - array.remove(rownumber, false); // loose the RowCollection (we don't know how much is lost) + array.remove(rownumber); // loose the RowCollection (we don't know how much is lost) serverLog.logSevere("kelondroCollectionIndex." 
+ array.filename, "lost a RowCollection because of a bad arraykey"); return new kelondroRowSet(this.payloadrow, 0); } @@ -969,7 +954,7 @@ public class kelondroCollectionIndex { index.put(indexrow); array.logFailure("INCONSISTENCY (get) in " + arrayFile(this.path, this.filenameStub, this.loadfactor, chunksize, clusteridx, serialnumber).toString() + ": array has different chunkcount than index: index = " + chunkcount + ", array = " + chunkcountInArray + "; the index has been auto-fixed"); } - if (remove) array.remove(rownumber, false); // index is removed in calling method + if (remove) array.remove(rownumber); // index is removed in calling method return collection; } @@ -1043,7 +1028,7 @@ public class kelondroCollectionIndex { collection.addUnique(rowdef.newEntry(new byte[][]{"abc".getBytes(), "efg".getBytes()})); collectionIndex.put("erstes".getBytes(), collection); - for (int i = 0; i <= 170; i++) { + for (int i = 1; i <= 170; i++) { collection = new kelondroRowSet(rowdef, 0); for (int j = 0; j < i; j++) { collection.addUnique(rowdef.newEntry(new byte[][]{("abc" + j).getBytes(), "xxx".getBytes()})); diff --git a/source/de/anomic/kelondro/kelondroFixedWidthArray.java b/source/de/anomic/kelondro/kelondroFixedWidthArray.java index 3ac4e4f25..c12b3dec2 100644 --- a/source/de/anomic/kelondro/kelondroFixedWidthArray.java +++ b/source/de/anomic/kelondro/kelondroFixedWidthArray.java @@ -51,7 +51,6 @@ import java.io.IOException; import java.util.Iterator; import java.util.Map; import java.util.TreeMap; -import java.util.TreeSet; public class kelondroFixedWidthArray extends kelondroRecords implements kelondroArray { @@ -59,8 +58,6 @@ public class kelondroFixedWidthArray extends kelondroRecords implements kelondro private static short thisOHBytes = 0; // our record definition does not need extra bytes private static short thisOHHandles = 0; // and no handles - private TreeSet markedRemoved; // a set of Integer indexes of removed records (only temporary) - public kelondroFixedWidthArray(File file, kelondroRow rowdef, int intprops) throws IOException { // this creates a new array super(file, false, 0, thisOHBytes, thisOHHandles, rowdef, intprops, rowdef.columns() /* txtProps */, 80 /* txtPropWidth */); @@ -73,7 +70,6 @@ public class kelondroFixedWidthArray extends kelondroRecords implements kelondro try {super.setText(i, rowdef.column(i).toString().getBytes());} catch (IOException e) {} } } - markedRemoved = new TreeSet(); } public kelondroFixedWidthArray(kelondroRA ra, String filename, kelondroRow rowdef, int intprops) throws IOException { @@ -86,7 +82,6 @@ public class kelondroFixedWidthArray extends kelondroRecords implements kelondro for (int i = 0; i < rowdef.columns(); i++) { try {super.setText(i, rowdef.column(i).toString().getBytes());} catch (IOException e) {} } - markedRemoved = new TreeSet(); } public static kelondroFixedWidthArray open(File file, kelondroRow rowdef, int intprops) { @@ -109,8 +104,8 @@ public class kelondroFixedWidthArray extends kelondroRecords implements kelondro // this writes a row without reading the row from the file system first // create a node at position index with rowentry - Handle h = new Handle(index); - newNode(h, (rowentry == null) ? null : rowentry.bytes(), 0).commit(CP_NONE); + Handle h = new Handle(index); + commit(newNode(h, (rowentry == null) ? null : rowentry.bytes(), 0), CP_NONE); // attention! this newNode call wants that the OH bytes are passed within the bulkchunk // field. Here, only the rowentry.bytes() raw payload is passed. 
This is valid, because // the OHbytes and OHhandles are zero. @@ -119,15 +114,18 @@ public class kelondroFixedWidthArray extends kelondroRecords implements kelondro public synchronized void setMultiple(TreeMap /* of Integer/kelondroRow.Entry */ rows) throws IOException { Iterator i = rows.entrySet().iterator(); Map.Entry entry; + Integer k; while (i.hasNext()) { entry = (Map.Entry) i.next(); - set(((Integer) entry.getKey()).intValue(), (kelondroRow.Entry) entry.getValue()); + k = (Integer) entry.getKey(); + set(k.intValue(), (kelondroRow.Entry) entry.getValue()); } } public synchronized kelondroRow.Entry getIfValid(int index) throws IOException { byte[] b = getNode(new Handle(index), true).getValueRow(); if (b[0] == 0) return null; + if ((b[0] == -128) && (b[1] == 0)) return null; return row().newEntry(b); } @@ -147,51 +145,25 @@ public class kelondroFixedWidthArray extends kelondroRecords implements kelondro public synchronized int add(kelondroRow.Entry rowentry) throws IOException { // adds a new rowentry, but re-uses a previously as-deleted marked entry - if (markedRemoved.size() == 0) { - // no records there to be re-used - Node n = newNode(rowentry.bytes()); - n.commit(CP_NONE); - return n.handle().hashCode(); - } else { - // re-use a removed record - Integer index = (Integer) markedRemoved.first(); - markedRemoved.remove(index); - set(index.intValue(), rowentry); - return index.intValue(); - } + Node n = newNode(rowentry.bytes()); + commit(n, CP_NONE); + return n.handle().hashCode(); } - public synchronized void remove(int index, boolean marked) throws IOException { + public synchronized void remove(int index) throws IOException { assert (index < (super.free() + super.size())) : "remove: index " + index + " out of bounds " + (super.free() + super.size()); - if (marked) { - // does not remove directly, but sets only a mark that a record is to be deleted - // this record can be re-used with add - markedRemoved.add(new Integer(index)); - } else { - - // get the node at position index - Handle h = new Handle(index); - Node n = getNode(h, false); + // get the node at position index + Handle h = new Handle(index); + Node n = getNode(h, false); - // erase the row - n.setValueRow(null); - n.commit(CP_NONE); - - // mark row as deleted so it can be re-used - deleteNode(h); - } - } - - public synchronized void resolveMarkedRemoved() throws IOException { - Iterator i = markedRemoved.iterator(); - Integer index; - while (i.hasNext()) { - index = (Integer) i.next(); - remove(index.intValue(), false); - } - markedRemoved.clear(); - } + // erase the row + n.setValueRow(null); + commit(n, CP_NONE); + + // mark row as deleted so it can be re-used + deleteNode(h); + } public void print() throws IOException { System.out.println("PRINTOUT of table, length=" + size()); @@ -229,14 +201,14 @@ public class kelondroFixedWidthArray extends kelondroRecords implements kelondro k = new kelondroFixedWidthArray(f, rowdef, 6); k.add(k.row().newEntry(new byte[][]{"a".getBytes(), "xxxx".getBytes()})); k.add(k.row().newEntry(new byte[][]{"b".getBytes(), "xxxx".getBytes()})); - k.remove(0, false); + k.remove(0); k.add(k.row().newEntry(new byte[][]{"c".getBytes(), "xxxx".getBytes()})); k.add(k.row().newEntry(new byte[][]{"d".getBytes(), "xxxx".getBytes()})); k.add(k.row().newEntry(new byte[][]{"e".getBytes(), "xxxx".getBytes()})); k.add(k.row().newEntry(new byte[][]{"f".getBytes(), "xxxx".getBytes()})); - k.remove(0, false); - k.remove(1, false); + k.remove(0); + k.remove(1); k.print(); k.print(true); @@ -251,10 +223,9 
@@ public class kelondroFixedWidthArray extends kelondroRecords implements kelondro k.add(k.row().newEntry(new byte[][]{(Integer.toString(i) + "-" + Integer.toString(j)).getBytes(), "xxxx".getBytes()})); } for (int j = 0; j < i; j++) { - k.remove(j, true); + k.remove(j); } } - k.resolveMarkedRemoved(); k.print(); k.print(true); k.close(); diff --git a/source/de/anomic/kelondro/kelondroFlexTable.java b/source/de/anomic/kelondro/kelondroFlexTable.java index ed640f211..43427e6bb 100644 --- a/source/de/anomic/kelondro/kelondroFlexTable.java +++ b/source/de/anomic/kelondro/kelondroFlexTable.java @@ -218,7 +218,13 @@ public class kelondroFlexTable extends kelondroFlexWidthArray implements kelondr public synchronized kelondroRow.Entry get(byte[] key) throws IOException { int pos = RWindex.geti(key); - if ((pos < 0) && (ROindex != null)) pos = ROindex.geti(key); + if (ROindex != null) { + if (pos < 0) { + pos = ROindex.geti(key); + } else { + assert ROindex.geti(key) < 0; + } + } if (pos < 0) return null; // i may be greater than this.size(), because this table may have deleted entries // the deleted entries are subtracted from the 'real' tablesize, so the size may be @@ -298,11 +304,20 @@ public class kelondroFlexTable extends kelondroFlexWidthArray implements kelondr public synchronized kelondroRow.Entry remove(byte[] key) throws IOException { int i = RWindex.removei(key); - if ((i < 0) && (ROindex != null)) i = ROindex.removei(key); // yes, we are allowed to remove entries from RO partition of the index + if (ROindex != null) { + if (i < 0) { + i = ROindex.removei(key); // yes, we are allowed to remove entries from RO partition of the index + } else { + assert ROindex.removei(key) < 0; + } + } + assert (RWindex.removei(key) < 0); + assert (ROindex == null) || (ROindex.removei(key) < 0); if (i < 0) return null; - kelondroRow.Entry r; - r = super.get(i); - super.remove(i, false); + kelondroRow.Entry r = super.get(i); + assert r != null; // error + super.remove(i); + assert super.get(i) == null : "i = " + i + ", get(i) = " + serverLog.arrayList(super.get(i).bytes(), 0, 12); return r; } @@ -312,7 +327,7 @@ public class kelondroFlexTable extends kelondroFlexWidthArray implements kelondr if (i < 0) return null; kelondroRow.Entry r; r = super.get(i); - super.remove(i, false); + super.remove(i); return r; } diff --git a/source/de/anomic/kelondro/kelondroFlexWidthArray.java b/source/de/anomic/kelondro/kelondroFlexWidthArray.java index e60630524..565dd331d 100644 --- a/source/de/anomic/kelondro/kelondroFlexWidthArray.java +++ b/source/de/anomic/kelondro/kelondroFlexWidthArray.java @@ -186,8 +186,8 @@ public class kelondroFlexWidthArray implements kelondroArray { this.init(); } - public void close() { - if (col != null) synchronized (col) { + public synchronized void close() { + if (col != null) { for (int i = 0; i < col.length; i++) { if (col[i] != null) { // a column can be null, this is normal @@ -216,64 +216,58 @@ public class kelondroFlexWidthArray implements kelondroArray { return col[0].size(); } - public void setMultiple(TreeMap /*of {Integer, kelondroRow.Entry}*/ entries) throws IOException { + public synchronized void setMultiple(TreeMap /*of {Integer, kelondroRow.Entry}*/ entries) throws IOException { // a R/W head path-optimized option to write a set of entries Iterator i; Map.Entry entry; kelondroRow.Entry rowentry, e; int c = 0, index; - synchronized (col) { - // go across each file - while (c < rowdef.columns()) { - i = entries.entrySet().iterator(); - while (i.hasNext()) { - entry = 
(Map.Entry) i.next(); - index = ((Integer) entry.getKey()).intValue(); - rowentry = (kelondroRow.Entry) entry.getValue(); - assert rowentry.objectsize() == this.rowdef.objectsize; + // go across each file + while (c < rowdef.columns()) { + i = entries.entrySet().iterator(); + while (i.hasNext()) { + entry = (Map.Entry) i.next(); + index = ((Integer) entry.getKey()).intValue(); + rowentry = (kelondroRow.Entry) entry.getValue(); + assert rowentry.objectsize() == this.rowdef.objectsize; - e = col[c].row().newEntry(rowentry.bytes(), rowdef.colstart[c]); - col[c].set(index, e); - } - c = c + col[c].row().columns(); + e = col[c].row().newEntry(rowentry.bytes(), rowdef.colstart[c]); + col[c].set(index, e); } + c = c + col[c].row().columns(); } } - public void set(int index, kelondroRow.Entry rowentry) throws IOException { + public synchronized void set(int index, kelondroRow.Entry rowentry) throws IOException { assert rowentry.objectsize() == this.rowdef.objectsize; int c = 0; kelondroRow.Entry e; - synchronized (col) { - byte[] reb = rowentry.bytes(); - while (c < rowdef.columns()) { - e = col[c].row().newEntry(reb, rowdef.colstart[c]); - col[c].set(index, e); - c = c + col[c].row().columns(); - } - } - } + byte[] reb = rowentry.bytes(); + while (c < rowdef.columns()) { + e = col[c].row().newEntry(reb, rowdef.colstart[c]); + col[c].set(index, e); + c = c + col[c].row().columns(); + } + } - public int add(kelondroRow.Entry rowentry) throws IOException { + public synchronized int add(kelondroRow.Entry rowentry) throws IOException { assert rowentry.objectsize() == this.rowdef.objectsize; kelondroRow.Entry e; int index = -1; - byte[] reb = rowentry.bytes(); - synchronized (col) { - e = col[0].row().newEntry(reb, 0); - index = col[0].add(e); - int c = col[0].row().columns(); + byte[] reb = rowentry.bytes(); + e = col[0].row().newEntry(reb, 0); + index = col[0].add(e); + int c = col[0].row().columns(); - while (c < rowdef.columns()) { - e = col[c].row().newEntry(reb, rowdef.colstart[c]); - col[c].set(index, e); - c = c + col[c].row().columns(); - } - } - return index; + while (c < rowdef.columns()) { + e = col[c].row().newEntry(reb, rowdef.colstart[c]); + col[c].set(index, e); + c = c + col[c].row().columns(); + } + return index; } - protected TreeMap addMultiple(List rows) throws IOException { + protected synchronized TreeMap addMultiple(List rows) throws IOException { // result is a Integer/byte[] relation // of newly added rows (index, key) TreeMap indexref = new TreeMap(); @@ -292,20 +286,18 @@ public class kelondroFlexWidthArray implements kelondroArray { kelondroRow.Entry e; int index = -1; byte[] reb = rowentry.bytes(); - synchronized (col) { - e = col[0].row().newEntry(reb, 0); - index = col[0].add(e); - int c = col[0].row().columns(); + e = col[0].row().newEntry(reb, 0); + index = col[0].add(e); + int c = col[0].row().columns(); - while (c < rowdef.columns()) { - e = col[c].row().newEntry(reb, rowdef.colstart[c]); - // remember write to column, but do not write directly - colm[c].put(new Integer(index), e); // col[c].set(index,e); - c = c + col[c].row().columns(); - } - } - indexref.put(new Integer(index), rowentry.getColBytes(0)); - } + while (c < rowdef.columns()) { + e = col[c].row().newEntry(reb, rowdef.colstart[c]); + // remember write to column, but do not write directly + colm[c].put(new Integer(index), e); // col[c].set(index,e); + c = c + col[c].row().columns(); + } + indexref.put(new Integer(index), rowentry.getColBytes(0)); + } // write the other columns for (int j = 1; j < 
col.length; j++) { if (col[j] != null) col[j].setMultiple(colm[j]); @@ -314,46 +306,34 @@ public class kelondroFlexWidthArray implements kelondroArray { return indexref; } - public kelondroRow.Entry get(int index) throws IOException { - int r = 0; - kelondroRow.Entry e, p; - p = rowdef.newEntry(); - synchronized (col) { - while (r < rowdef.columns()) { - if (r == 0) { - e = col[r].getIfValid(index); - if (e == null) return null; // probably a deleted entry - } else { - e = col[r].get(index); - } - for (int i = 0; i < col[r].row().columns(); i++) - p.setCol(r + i, e.getColBytes(i)); - r = r + col[r].row().columns(); - } - } - return p; + public synchronized kelondroRow.Entry get(int index) throws IOException { + kelondroRow.Entry e = col[0].getIfValid(index); + if (e == null) return null; // probably a deleted entry + kelondroRow.Entry p = rowdef.newEntry(); + p.setCol(0, e.getColBytes(0)); + int r = col[0].row().columns(); + while (r < rowdef.columns()) { + e = col[r].get(index); + for (int i = 0; i < col[r].row().columns(); i++) { + p.setCol(r + i, e.getColBytes(i)); + } + r = r + col[r].row().columns(); + } + return p; } - public void remove(int index, boolean marked) throws IOException { + public synchronized void remove(int index) throws IOException { int r = 0; - synchronized (col) { - - // remove only from the first column - col[0].remove(index, marked); - r = r + col[r].row().columns(); - - // the other columns will be blanked out only - while (r < rowdef.columns()) { - col[r].set(index, null); - r = r + col[r].row().columns(); - } - } - } - - public synchronized void resolveMarkedRemoved() throws IOException { - synchronized (col) { - col[0].resolveMarkedRemoved(); - } + + // remove only from the first column + col[0].remove(index); + r = r + col[r].row().columns(); + + // the other columns will be blanked out only + while (r < rowdef.columns()) { + col[r].set(index, null); + r = r + col[r].row().columns(); + } } public void print() throws IOException { @@ -380,14 +360,14 @@ public class kelondroFlexWidthArray implements kelondroArray { kelondroFlexWidthArray k = new kelondroFlexWidthArray(f, "flextest", rowdef, true); k.add(k.row().newEntry(new byte[][]{"a".getBytes(), "xxxx".getBytes()})); k.add(k.row().newEntry(new byte[][]{"b".getBytes(), "xxxx".getBytes()})); - k.remove(0, false); + k.remove(0); k.add(k.row().newEntry(new byte[][]{"c".getBytes(), "xxxx".getBytes()})); k.add(k.row().newEntry(new byte[][]{"d".getBytes(), "xxxx".getBytes()})); k.add(k.row().newEntry(new byte[][]{"e".getBytes(), "xxxx".getBytes()})); k.add(k.row().newEntry(new byte[][]{"f".getBytes(), "xxxx".getBytes()})); - k.remove(0, false); - k.remove(1, false); + k.remove(0); + k.remove(1); k.print(); k.col[0].print(true); @@ -407,11 +387,10 @@ public class kelondroFlexWidthArray implements kelondroArray { k.close(); k = new kelondroFlexWidthArray(f, "flextest", rowdef, true); for (int j = 0; j < i; j++) { - k.remove(i*2 - j - 1, true); + k.remove(i*2 - j - 1); } k.close(); } - k.resolveMarkedRemoved(); k = new kelondroFlexWidthArray(f, "flextest", rowdef, true); k.print(); k.col[0].print(true); diff --git a/source/de/anomic/kelondro/kelondroRecords.java b/source/de/anomic/kelondro/kelondroRecords.java index 871af37b0..7f3890112 100644 --- a/source/de/anomic/kelondro/kelondroRecords.java +++ b/source/de/anomic/kelondro/kelondroRecords.java @@ -177,7 +177,7 @@ public class kelondroRecords { private int FREEC; // counter of free elements in list of free Nodes private Handle FREEH; // pointer to first element in 
list of free Nodes, empty = NUL - public usageControl(boolean init) throws IOException { + private usageControl(boolean init) throws IOException { if (init) { this.USEDC = 0; this.FREEC = 0; @@ -254,7 +254,7 @@ public class kelondroRecords { assert (h.index >= 0); assert (h.index != NUL); //synchronized (USAGE) { - //synchronized (entryFile) { + synchronized (entryFile) { assert (h.index < USEDC + FREEC) : "USEDC = " + USEDC + ", FREEC = " + FREEC + ", h.index = " + h.index; long sp = seekpos(h); assert (sp <= entryFile.length() + ROW.objectsize) : h.index + "/" + sp + " exceeds file size " + entryFile.length(); @@ -266,7 +266,7 @@ public class kelondroRecords { FREEH = h; writefree(); writeused(false); - //} + } //} } @@ -279,7 +279,7 @@ public class kelondroRecords { } assert (chunk.length == ROW.objectsize()) : "chunk.length = " + chunk.length + ", ROW.objectsize() = " + ROW.objectsize(); //synchronized (USAGE) { - //synchronized (entryFile) { + synchronized (entryFile) { if (USAGE.FREEC == 0) { // generate new entry int index = USAGE.allCount(); @@ -292,19 +292,18 @@ public class kelondroRecords { USAGE.USEDC++; USAGE.FREEC--; // take link - int index; - if (USAGE.FREEH.index == NUL) { + int index = USAGE.FREEH.index; + if (index == NUL) { serverLog.logSevere("kelondroRecords/" + filename, "INTERNAL ERROR (DATA INCONSISTENCY): re-use of records failed, lost " + (USAGE.FREEC + 1) + " records."); // try to heal.. USAGE.USEDC = USAGE.allCount() + 1; USAGE.FREEC = 0; index = USAGE.USEDC - 1; } else { - index = USAGE.FREEH.index; //System.out.println("*DEBUG* ALLOCATED DELETED INDEX " + index); // check for valid seek position long seekp = seekpos(USAGE.FREEH); - if (seekp > entryFile.length()) { + if (seekp >= entryFile.length()) { // this is a severe inconsistency. try to heal.. 
serverLog.logSevere("kelondroRecords/" + filename, "new Handle: lost " + USAGE.FREEC + " marked nodes; seek position " + seekp + "/" + USAGE.FREEH.index + " out of file size " + entryFile.length() + "/" + ((entryFile.length() - POS_NODES) / recordsize)); index = USAGE.allCount(); // a place at the end of the file @@ -314,6 +313,7 @@ public class kelondroRecords { } else { // read link to next element of FREEH chain USAGE.FREEH.index = entryFile.readInt(seekp); + assert ((USAGE.FREEH.index == NUL) && (USAGE.FREEC == 0)) || seekpos(USAGE.FREEH) < entryFile.length() : "allocatePayload: USAGE.FREEH.index = " + USAGE.FREEH.index + ", seekp = " + seekp; } } USAGE.writeused(false); @@ -321,7 +321,7 @@ public class kelondroRecords { entryFile.write(seekpos(index) + overhead, chunk, 0, ROW.objectsize()); // overwrite space return index; } - //} + } //} } @@ -335,7 +335,7 @@ public class kelondroRecords { } //assert (chunk.length == ROW.objectsize()) : "chunk.length = " + chunk.length + ", ROW.objectsize() = " + ROW.objectsize(); //synchronized (USAGE) { - //synchronized (entryFile) { + synchronized (entryFile) { if (index < USAGE.allCount()) { // write within the file // this can be critical, if we simply overwrite fields that are marked @@ -355,6 +355,7 @@ public class kelondroRecords { entryFile.write(seekpos(h), spaceChunk); // occupy space, othervise the USAGE computaton does not work entryFile.writeInt(seekpos(h), USAGE.FREEH.index); USAGE.FREEH = h; + assert ((USAGE.FREEH.index == NUL) && (USAGE.FREEC == 0)) || seekpos(USAGE.FREEH) < entryFile.length() : "allocateRecord: USAGE.FREEH.index = " + USAGE.FREEH.index; USAGE.writefree(); entryFile.commit(); } @@ -368,7 +369,7 @@ public class kelondroRecords { entryFile.commit(); } } - //} + } //} } @@ -770,7 +771,7 @@ public class kelondroRecords { return map; } - public final byte[] bulkRead(int start, int end) throws IOException { + public synchronized final byte[] bulkRead(int start, int end) throws IOException { // a bulk read simply reads a piece of memory from the record file // this makes only sense if there are no overhead bytes or pointer // the end value is OUTSIDE the record interval @@ -803,14 +804,23 @@ public class kelondroRecords { } protected synchronized final void deleteNode(Handle handle) throws IOException { - if ((cacheHeaders == null) || (cacheHeaders.size() == 0)) { - USAGE.dispose(handle); - } else synchronized (cacheHeaders) { - cacheHeaders.removeb(handle.index); - cacheDelete++; - USAGE.dispose(handle); + if (cacheHeaders == null) { + USAGE.dispose(handle); + } else synchronized (cacheHeaders) { + if (cacheHeaders.size() == 0) { + USAGE.dispose(handle); + } else { + cacheHeaders.removeb(handle.index); + cacheDelete++; + USAGE.dispose(handle); + } } } + + public synchronized void commit(Node n, int cachePriority) throws IOException { + n.commit(cachePriority); + } + public final class Node { // an Node holds all information of one row of data. 
This includes the key to the entry @@ -839,13 +849,13 @@ public class kelondroRecords { //private byte[] ohBytes = null; // the overhead bytes, OHBYTEC values //private Handle[] ohHandle= null; // the overhead handles, OHHANDLEC values //private byte[][] values = null; // an array of byte[] nodes is the value vector - protected Handle handle = null; // index of the entry, by default NUL means undefined - protected byte[] headChunk = null; // contains ohBytes, ohHandles and the key value - protected byte[] tailChunk = null; // contains all values except the key value - protected boolean headChanged = false; - protected boolean tailChanged = false; + private Handle handle = null; // index of the entry, by default NUL means undefined + private byte[] headChunk = null; // contains ohBytes, ohHandles and the key value + private byte[] tailChunk = null; // contains all values except the key value + private boolean headChanged = false; + private boolean tailChanged = false; - protected Node(byte[] rowinstance) throws IOException { + private Node(byte[] rowinstance) throws IOException { // this initializer is used to create nodes from bulk-read byte arrays assert ((rowinstance == null) || (rowinstance.length == ROW.objectsize)) : "bulkchunk.length = " + rowinstance.length + ", ROW.width(0) = " + ROW.width(0); this.handle = new Handle(USAGE.allocatePayload(rowinstance)); @@ -871,7 +881,7 @@ public class kelondroRecords { this.tailChanged = false; // we write the tail already during allocate } - protected Node(Handle handle, byte[] bulkchunk, int offset) throws IOException { + private Node(Handle handle, byte[] bulkchunk, int offset) throws IOException { // this initializer is used to create nodes from bulk-read byte arrays // if write is true, then the chunk in bulkchunk is written to the file // othervise it is considered equal to what is stored in the file @@ -902,7 +912,7 @@ public class kelondroRecords { this.tailChanged = changed; } - protected Node(Handle handle, Node parentNode, int referenceInParent, boolean fillTail) throws IOException { + private Node(Handle handle, Node parentNode, int referenceInParent, boolean fillTail) throws IOException { // this creates an entry with an pre-reserved entry position. // values can be written using the setValues() method, // but we expect that values are already there in the file. @@ -1040,7 +1050,7 @@ public class kelondroRecords { return (h == NUL) ? 
null : new Handle(h); } - public byte[] setValueRow(byte[] row) throws IOException { + public synchronized byte[] setValueRow(byte[] row) throws IOException { // if the index is defined, then write values directly to the file, else only to the object if ((row != null) && (row.length != ROW.objectsize())) throw new IOException("setValueRow with wrong (" + row.length + ") row length instead correct: " + ROW.objectsize()); byte[] result = getValueRow(); // previous value (this loads the values if not already happened) @@ -1055,18 +1065,18 @@ public class kelondroRecords { return result; // return previous value } - public boolean valid() { + public synchronized boolean valid() { // returns true if the key starts with non-zero byte // this may help to detect deleted entries - return headChunk[overhead] != 0; + return (headChunk[overhead] != 0) && ((headChunk[overhead] != -128) || (headChunk[overhead + 1] != 0)); } - public byte[] getKey() { + public synchronized byte[] getKey() { // read key return trimCopy(headChunk, overhead, ROW.width(0)); } - public byte[] getValueRow() throws IOException { + public synchronized byte[] getValueRow() throws IOException { if (this.tailChunk == null) { // load all values from the database file @@ -1087,7 +1097,7 @@ public class kelondroRecords { return row; } - public synchronized void commit(int cachePriority) throws IOException { + private synchronized void commit(int cachePriority) throws IOException { // this must be called after all write operations to the node are // finished @@ -1122,14 +1132,6 @@ public class kelondroRecords { } } - public synchronized void collapse() { - // this must be called after all write and read operations to the - // node are finished - this.headChunk = null; - this.tailChunk = null; - this.handle = null; - } - private byte[] trimCopy(byte[] a, int offset, int length) { if (length > a.length - offset) length = a.length - offset; while ((length > 0) && (a[offset + length - 1] == 0)) length--; @@ -1168,7 +1170,7 @@ public class kelondroRecords { if (cacheGrowStatus() == 2) return true; // no need to flush cache space // just delete any of the entries - if (cacheGrowStatus() <= 1) { + if (cacheGrowStatus() <= 1) synchronized (cacheHeaders) { cacheHeaders.removeoneb(); cacheFlush++; } @@ -1316,13 +1318,11 @@ public class kelondroRecords { } // Returns the number of key-value mappings in this map. 
- public int size() { - synchronized (entryFile) { - return USAGE.used(); - } + public synchronized int size() { + return USAGE.used(); } - protected final int free() { + protected synchronized final int free() { return USAGE.FREEC; } @@ -1379,13 +1379,12 @@ public class kelondroRecords { if (USAGE.FREEC != 0) { Handle h = USAGE.FREEH; long repair_position = POS_FREEH; - int iter = 0; while (h.index != NUL) { // check handle seekp = seekpos(h); if (seekp > entryFile.length()) { // repair last hande store position - this.theLogger.severe("KELONDRO WARNING " + this.filename + ": seek position " + seekp + "/" + h.index + " out of file size " + entryFile.length() + "/" + ((entryFile.length() - POS_NODES) / recordsize) + " after " + iter + " iterations; patched wrong node"); + this.theLogger.severe("KELONDRO WARNING " + this.filename + ": seek position " + seekp + "/" + h.index + " out of file size " + entryFile.length() + "/" + ((entryFile.length() - POS_NODES) / recordsize) + " after " + markedDeleted.size() + " iterations; patched wrong node"); entryFile.writeInt(repair_position, NUL); return markedDeleted; } @@ -1407,10 +1406,9 @@ public class kelondroRecords { } // this appears to be correct. go on. - iter++; if (System.currentTimeMillis() > timeLimit) throw new kelondroException(filename, "time limit of " + maxTime + " exceeded; > " + markedDeleted.size() + " deleted entries"); } - System.out.println("\nDEBUG: " + iter + " deleted entries in " + entryFile.name()); + System.out.println("\nDEBUG: " + markedDeleted.size() + " deleted entries in " + entryFile.name()); } } return markedDeleted; diff --git a/source/de/anomic/kelondro/kelondroRowCollection.java b/source/de/anomic/kelondro/kelondroRowCollection.java index d5e4bbe2b..2de360de1 100644 --- a/source/de/anomic/kelondro/kelondroRowCollection.java +++ b/source/de/anomic/kelondro/kelondroRowCollection.java @@ -225,7 +225,7 @@ public class kelondroRowCollection { return rowdef.newEntry(chunkcache, index * rowdef.objectsize()); } - public final void set(int index, kelondroRow.Entry a) { + public synchronized final void set(int index, kelondroRow.Entry a) { assert (index >= 0) : "get: access with index " + index + " is below zero"; assert (index < chunkcount) : "get: access with index " + index + " is above chunkcount " + chunkcount; //assert (!(bugappearance(a, astart, alength))) : "a = " + serverLog.arrayList(a, astart, alength); @@ -234,11 +234,11 @@ public class kelondroRowCollection { this.lastTimeWrote = System.currentTimeMillis(); } - public void addUnique(kelondroRow.Entry row) { + public synchronized void addUnique(kelondroRow.Entry row) { addUnique(row.bytes(), 0, row.bytes().length); } - public void addUnique(kelondroRow.Entry row, Date entryDate) { + public synchronized void addUnique(kelondroRow.Entry row, Date entryDate) { addUnique(row); } @@ -247,11 +247,11 @@ public class kelondroRowCollection { while (i.hasNext()) addUnique((kelondroRow.Entry) i.next()); } - public void add(byte[] a) { + public synchronized void add(byte[] a) { addUnique(a, 0, a.length); } - private synchronized final void addUnique(byte[] a, int astart, int alength) { + private final void addUnique(byte[] a, int astart, int alength) { assert (a != null); assert (astart >= 0) && (astart < a.length) : " astart = " + a; assert (!(serverLog.allZero(a, astart, alength))) : "a = " + serverLog.arrayList(a, astart, alength); @@ -289,7 +289,7 @@ public class kelondroRowCollection { chunkcount += c.size(); } - protected final void removeShift(int pos, int dist, int 
upBound) { + private final void removeShift(int pos, int dist, int upBound) { assert ((pos + dist) * rowdef.objectsize() >= 0) : "pos = " + pos + ", dist = " + dist + ", rowdef.objectsize() = " + rowdef.objectsize; assert (pos * rowdef.objectsize() >= 0) : "pos = " + pos + ", rowdef.objectsize() = " + rowdef.objectsize; assert ((pos + dist) * rowdef.objectsize() + (upBound - pos - dist) * rowdef.objectsize() <= chunkcache.length) : "pos = " + pos + ", dist = " + dist + ", rowdef.objectsize() = " + rowdef.objectsize + ", upBound = " + upBound + ", chunkcache.length = " + chunkcache.length; @@ -299,7 +299,7 @@ public class kelondroRowCollection { (upBound - pos - dist) * rowdef.objectsize()); } - protected final void copytop(int i) { + private final void copytop(int i) { // copies the topmost row element to given position if (i == chunkcount - 1) return; System.arraycopy(chunkcache, this.rowdef.objectsize() * (chunkcount - 1), chunkcache, this.rowdef.objectsize() * i, this.rowdef.objectsize()); @@ -458,7 +458,7 @@ public class kelondroRowCollection { swap(j, j - 1, 0); } - protected final int swap(int i, int j, int p) { + private final int swap(int i, int j, int p) { if (i == j) return p; if ((this.chunkcount + 1) * this.rowdef.objectsize() < this.chunkcache.length) { // there is space in the chunkcache that we can use as buffer @@ -490,7 +490,7 @@ public class kelondroRowCollection { } } - public String toString() { + public synchronized String toString() { StringBuffer s = new StringBuffer(); Iterator i = rows(); if (i.hasNext()) s.append(((kelondroRow.Entry) i.next()).toString()); @@ -519,13 +519,13 @@ public class kelondroRowCollection { return c; } - protected int compare(byte[] a, int astart, int alength, int chunknumber) { + protected synchronized int compare(byte[] a, int astart, int alength, int chunknumber) { assert (chunknumber < chunkcount); int l = Math.min(this.rowdef.width(rowdef.primaryKey), Math.min(a.length - astart, alength)); return rowdef.objectOrder.compare(a, astart, l, chunkcache, chunknumber * this.rowdef.objectsize() + this.rowdef.colstart[rowdef.primaryKey], this.rowdef.width(rowdef.primaryKey)); } - protected boolean match(byte[] a, int astart, int alength, int chunknumber) { + protected synchronized boolean match(byte[] a, int astart, int alength, int chunknumber) { if (chunknumber >= chunkcount) return false; int i = 0; int p = chunknumber * this.rowdef.objectsize() + this.rowdef.colstart[rowdef.primaryKey]; diff --git a/source/de/anomic/kelondro/kelondroRowSet.java b/source/de/anomic/kelondro/kelondroRowSet.java index 4670970ef..d4e1848c3 100644 --- a/source/de/anomic/kelondro/kelondroRowSet.java +++ b/source/de/anomic/kelondro/kelondroRowSet.java @@ -109,6 +109,7 @@ public class kelondroRowSet extends kelondroRowCollection implements kelondroInd if (index < 0) return null; kelondroRow.Entry entry = super.get(index); super.removeRow(index); + assert find(a, start, length) < 0; // check if the remove worked return entry; } diff --git a/source/de/anomic/kelondro/kelondroStack.java b/source/de/anomic/kelondro/kelondroStack.java index 882bf4038..f2bb7b46e 100644 --- a/source/de/anomic/kelondro/kelondroStack.java +++ b/source/de/anomic/kelondro/kelondroStack.java @@ -109,6 +109,7 @@ public final class kelondroStack extends kelondroRecords { public class Counter implements Iterator { Handle nextHandle = null; + Handle lastHandle = null; public Counter() { nextHandle = getHandle(root); @@ -119,17 +120,21 @@ public final class kelondroStack extends kelondroRecords { } 
public Object next() { - Handle ret = nextHandle; + lastHandle = nextHandle; try { nextHandle = getNode(nextHandle, null, 0, false).getOHHandle(right); - return getNode(ret, null, 0, true); + return getNode(lastHandle, null, 0, true); } catch (IOException e) { throw new kelondroException(filename, "IO error at Counter:next()"); } } public void remove() { - throw new UnsupportedOperationException("no remove here.."); + try { + unlinkNode(getNode(lastHandle, false)); + } catch (IOException e) { + e.printStackTrace(); + } } } @@ -145,7 +150,7 @@ public final class kelondroStack extends kelondroRecords { Node n = newNode(row.bytes()); n.setOHHandle(left, null); n.setOHHandle(right, null); - n.commit(CP_NONE); + commit(n, CP_NONE); // assign handles setHandle(root, n.handle()); setHandle(toor, n.handle()); @@ -157,8 +162,8 @@ public final class kelondroStack extends kelondroRecords { n.setOHHandle(right, null); Node n1 = getNode(getHandle(toor), null, 0, false); n1.setOHHandle(right, n.handle()); - n.commit(CP_NONE); - n1.commit(CP_NONE); + commit(n, CP_NONE); + commit(n1, CP_NONE); // assign handles setHandle(toor, n.handle()); // thats it @@ -248,7 +253,7 @@ public final class kelondroStack extends kelondroRecords { Node k = getNode(l, null, 0, false); k.setOHHandle(left, k.getOHHandle(left)); k.setOHHandle(right, r); - k.commit(CP_NONE); + commit(k, CP_NONE); } // look right if (r == null) { @@ -259,7 +264,7 @@ public final class kelondroStack extends kelondroRecords { Node k = getNode(r, null, 0, false); k.setOHHandle(left, l); k.setOHHandle(right, k.getOHHandle(right)); - k.commit(CP_NONE); + commit(k, CP_NONE); } } diff --git a/source/de/anomic/kelondro/kelondroTree.java b/source/de/anomic/kelondro/kelondroTree.java index c8d6f0612..2a70faf06 100644 --- a/source/de/anomic/kelondro/kelondroTree.java +++ b/source/de/anomic/kelondro/kelondroTree.java @@ -164,10 +164,10 @@ public class kelondroTree extends kelondroRecords implements kelondroIndex { private void commitNode(Node n) throws IOException { Handle left = n.getOHHandle(leftchild); Handle right = n.getOHHandle(rightchild); - if ((left == null) && (right == null)) n.commit(CP_LOW); - else if (left == null) n.commit(CP_MEDIUM); - else if (right == null) n.commit(CP_MEDIUM); - else n.commit(CP_HIGH); + if ((left == null) && (right == null)) commit(n, CP_LOW); + else if (left == null) commit(n, CP_MEDIUM); + else if (right == null) commit(n, CP_MEDIUM); + else commit(n, CP_HIGH); } public boolean has(byte[] key) throws IOException { @@ -269,7 +269,7 @@ public class kelondroTree extends kelondroRecords implements kelondroIndex { thenode.setOHHandle(parent, null); thenode.setOHHandle(leftchild, null); thenode.setOHHandle(rightchild, null); - thenode.commit(CP_NONE); + commit(thenode, CP_NONE); logWarning("kelondroTree.Search.process: database contains loops; the loop-nodes have been auto-fixed"); found = false; return; @@ -372,7 +372,7 @@ public class kelondroTree extends kelondroRecords implements kelondroIndex { e.setOHHandle(leftchild, null); e.setOHHandle(rightchild, null); // do updates - e.commit(CP_LOW); + commit(e, CP_LOW); setHandle(root, e.handle()); result = null; } else { @@ -391,15 +391,15 @@ public class kelondroTree extends kelondroRecords implements kelondroIndex { theNode.setOHHandle(parent, parentNode.handle()); theNode.setOHHandle(leftchild, null); theNode.setOHHandle(rightchild, null); - theNode.commit(CP_LOW); + commit(theNode, CP_LOW); // check consistency and link new node to parent node byte parentBalance; if 
(writeSearchObj.isLeft()) { - if (parentNode.getOHHandle(leftchild) != null) throw new kelondroException(filename, "tried to create leftchild node twice. parent=" + new String(parentNode.getKey()) + ", leftchild=" + new String(new Node(parentNode.getOHHandle(leftchild), (Node) null, 0, true).getKey())); + if (parentNode.getOHHandle(leftchild) != null) throw new kelondroException(filename, "tried to create leftchild node twice. parent=" + new String(parentNode.getKey()) + ", leftchild=" + new String(getNode(parentNode.getOHHandle(leftchild), (Node) null, 0, true).getKey())); parentNode.setOHHandle(leftchild, theNode.handle()); } else if (writeSearchObj.isRight()) { - if (parentNode.getOHHandle(rightchild) != null) throw new kelondroException(filename, "tried to create rightchild node twice. parent=" + new String(parentNode.getKey()) + ", rightchild=" + new String(new Node(parentNode.getOHHandle(rightchild), (Node) null, 0, true).getKey())); + if (parentNode.getOHHandle(rightchild) != null) throw new kelondroException(filename, "tried to create rightchild node twice. parent=" + new String(parentNode.getKey()) + ", rightchild=" + new String(getNode(parentNode.getOHHandle(rightchild), (Node) null, 0, true).getKey())); parentNode.setOHHandle(rightchild, theNode.handle()); } else { throw new kelondroException(filename, "neither left nor right child"); @@ -933,7 +933,7 @@ public class kelondroTree extends kelondroRecords implements kelondroIndex { if (visitedNodeHandles.contains(childHandle)) { // try to repair the nextNode nextNode.setOHHandle(childtype, null); - nextNode.commit(CP_NONE); + commit(nextNode, CP_NONE); logWarning("nodeIterator.next: internal loopback; fixed loop and try to go on"); break; } diff --git a/source/de/anomic/plasma/plasmaCrawlBalancer.java b/source/de/anomic/plasma/plasmaCrawlBalancer.java index b70f34995..3fc79d1e4 100644 --- a/source/de/anomic/plasma/plasmaCrawlBalancer.java +++ b/source/de/anomic/plasma/plasmaCrawlBalancer.java @@ -146,37 +146,42 @@ public class plasmaCrawlBalancer { // this method is only here, because so many import/export methods need it // and it was implemented in the previous architecture // however, usage is not recommendet + int s = urlFileIndex.size(); kelondroRow.Entry entry = urlFileIndex.remove(urlhash.getBytes()); if (entry == null) return null; + assert urlFileIndex.size() + 1 == s : "urlFileIndex.size() = " + urlFileIndex.size() + ", s = " + s; // now delete that thing also from the queues - + // iterate through the RAM stack Iterator i = urlRAMStack.iterator(); String h; - boolean removed = false; while (i.hasNext()) { h = (String) i.next(); if (h.equals(urlhash)) { i.remove(); - removed = true; - break; + return new plasmaCrawlEntry(entry); } } - if ((kelondroRecords.debugmode) && (!removed)) { - serverLog.logWarning("PLASMA BALANCER", "remove: not found urlhash " + urlhash + " in " + stackname); - } - // we cannot iterate through the file stack, because the stack iterator - // has not yet a delete method implemented. It would also be a bad idea - // to do that, it would make too much IO load - // instead, the top/pop methods that aquire elements from the stack, that - // cannot be found in the urlFileIndex must handle that case silently + // iterate through the file stack + // in general this is a bad idea. 
But this can only be avoided by avoidance of this method + i = urlFileStack.iterator(); + while (i.hasNext()) { + h = new String(((kelondroRecords.Node) i.next()).getKey()); + if (h.equals(urlhash)) { + i.remove(); + return new plasmaCrawlEntry(entry); + } + } + if (kelondroRecords.debugmode) { + serverLog.logWarning("PLASMA BALANCER", "remove: not found urlhash " + urlhash + " in " + stackname); + } return new plasmaCrawlEntry(entry); } - public boolean has(String urlhash) { + public synchronized boolean has(String urlhash) { try { return urlFileIndex.has(urlhash.getBytes()); } catch (IOException e) { @@ -189,7 +194,7 @@ public class plasmaCrawlBalancer { int componentsize = urlFileStack.size() + urlRAMStack.size() + sizeDomainStacks(); try { if (componentsize != urlFileIndex.size()) { - // here is urlIndexFile.size() always bigger. why? + // here is urlIndexFile.size() always smaller. why? if (kelondroRecords.debugmode) { serverLog.logWarning("PLASMA BALANCER", "size operation wrong in " + stackname + " - componentsize = " + componentsize + ", urlFileIndex.size() = " + urlFileIndex.size()); } @@ -285,7 +290,7 @@ public class plasmaCrawlBalancer { } // 2nd-a: check domainStacks for latest arrivals - if (result == null) { + if ((result == null) && (domainStacks.size() > 0)) { // we select specific domains that have not been used for a long time // i.e. 60 seconds. Latest arrivals that have not yet been crawled // fit also in that scheme @@ -320,7 +325,7 @@ public class plasmaCrawlBalancer { } // 2nd-b: check domainStacks for best match between stack size and retrieval time - if (result == null) { + if ((result == null) && (domainStacks.size() > 0)) { // we order all domains by the number of entries per domain // then we iterate through these domains in descending entry order // and that that one, that has a delta > minimumDelta @@ -407,8 +412,13 @@ public class plasmaCrawlBalancer { // update statistical data domainAccess.put(result.substring(6), new Long(System.currentTimeMillis())); + int s = urlFileIndex.size(); kelondroRow.Entry entry = urlFileIndex.remove(result.getBytes()); - if (entry == null) return null; + assert urlFileIndex.size() + 1 == s : "urlFileIndex.size() = " + urlFileIndex.size() + ", s = " + s + ", result = " + result; + if (entry == null) { + serverLog.logSevere("PLASMA BALANCER", "get() found a valid urlhash, but failed to fetch the corresponding url entry - total size = " + size() + ", fileStack.size() = " + urlFileStack.size() + ", ramStack.size() = " + urlRAMStack.size() + ", domainStacks.size() = " + domainStacks.size()); + return null; + } return new plasmaCrawlEntry(entry); } @@ -444,13 +454,13 @@ public class plasmaCrawlBalancer { return new plasmaCrawlEntry(entry); } - public Iterator iterator() throws IOException { + public synchronized Iterator iterator() throws IOException { return new EntryIterator(); } - public class EntryIterator implements Iterator { + private class EntryIterator implements Iterator { - Iterator rowIterator; + private Iterator rowIterator; public EntryIterator() throws IOException { rowIterator = urlFileIndex.rows(true, null); diff --git a/source/de/anomic/server/serverInstantThread.java b/source/de/anomic/server/serverInstantThread.java index 2c056f22f..1018384f3 100644 --- a/source/de/anomic/server/serverInstantThread.java +++ b/source/de/anomic/server/serverInstantThread.java @@ -42,6 +42,7 @@ package de.anomic.server; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.util.TreeMap; 
import de.anomic.server.logging.serverLog; @@ -49,8 +50,10 @@ public final class serverInstantThread extends serverAbstractThread implements s private Method jobExecMethod, jobCountMethod, freememExecMethod; private Object environment; + private Long handle; public static int instantThreadCounter = 0; + public static TreeMap jobs = new TreeMap(); public serverInstantThread(Object env, String jobExec, String jobCount, String freemem) { // jobExec is the name of a method of the object 'env' that executes the one-step-run @@ -81,6 +84,7 @@ public final class serverInstantThread extends serverAbstractThread implements s } this.environment = env; this.setName(env.getClass().getName() + "." + jobExec); + this.handle = new Long(System.currentTimeMillis() + this.getName().hashCode()); } public int getJobCount() { @@ -103,6 +107,8 @@ public final class serverInstantThread extends serverAbstractThread implements s public boolean job() throws Exception { instantThreadCounter++; + //System.out.println("started job " + this.handle + ": " + this.getName()); + synchronized(jobs) {jobs.put(this.handle, this.getName());} boolean jobHasDoneSomething = false; try { Object result = jobExecMethod.invoke(environment, new Object[0]); @@ -129,6 +135,7 @@ public final class serverInstantThread extends serverAbstractThread implements s freemem(); } instantThreadCounter--; + synchronized(jobs) {jobs.remove(this.handle);} return jobHasDoneSomething; }
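
Note for reviewers: the kelondroArray contract loses the mark-for-removal path (remove(index, marked) plus a later resolveMarkedRemoved() pass) and keeps only a direct remove(index); freed slots are chained by the record layer and re-used by add(). The stand-alone sketch below only illustrates that free-list pattern; FreeListStore and all its names are made up for this note and are not part of the kelondro API.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Illustrative only: a store where remove(index) deletes the row immediately
// and add(row) re-uses previously freed slots before growing the table.
final class FreeListStore {
    private final List<byte[]> slots = new ArrayList<byte[]>();        // null = deleted slot
    private final ArrayDeque<Integer> free = new ArrayDeque<Integer>(); // chain of freed slots

    public synchronized int add(byte[] row) {
        if (!free.isEmpty()) {            // re-use a previously deleted slot
            int index = free.pop();
            slots.set(index, row);
            return index;
        }
        slots.add(row);                   // otherwise append at the end
        return slots.size() - 1;
    }

    public synchronized void remove(int index) {
        assert index >= 0 && index < slots.size() : "remove: index " + index + " out of bounds";
        slots.set(index, null);           // erase the row directly ...
        free.push(index);                 // ... and remember the slot for re-use
    }

    public synchronized byte[] get(int index) {
        return slots.get(index);          // null means the row was deleted
    }
}

Deleting directly keeps the index and the array consistent at every moment, which is what the new asserts in kelondroFlexTable and kelondroRowSet verify after each remove.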
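
dbtest now schedules RemoveJob instances between the write and read jobs and then polls serverInstantThread.instantThreadCounter (and the new jobs map) until every task has finished. A hedged approximation of that interleaved write/remove stress test, using a plain java.util.concurrent pool instead of the serverInstantThread scheduler and the hypothetical FreeListStore from the sketch above, could look like this:

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class RemoveStressTest {
    public static void main(String[] args) throws InterruptedException {
        final FreeListStore table = new FreeListStore();        // hypothetical store from the sketch above
        final Random random = new Random(42);                   // fixed seed for a reproducible run
        final List<Integer> written = new ArrayList<Integer>(); // indexes that may be removed later
        ExecutorService pool = Executors.newFixedThreadPool(4);

        for (int i = 0; i < 1000; i++) {
            final byte[] row = ("row-" + random.nextLong()).getBytes();
            pool.execute(new Runnable() {                        // write job
                public void run() {
                    int index = table.add(row);
                    synchronized (written) { written.add(Integer.valueOf(index)); }
                }
            });
            if (random.nextInt(7) == 0) {                        // occasionally schedule a remove job
                pool.execute(new Runnable() {
                    public void run() {
                        final Integer index;
                        synchronized (written) {
                            index = written.isEmpty() ? null : written.remove(random.nextInt(written.size()));
                        }
                        if (index != null) table.remove(index.intValue());
                    }
                });
            }
        }
        pool.shutdown();                                         // wait for all tasks to finish
        pool.awaitTermination(60, TimeUnit.SECONDS);
        System.out.println("stress test finished");
    }
}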