From 513179f404d3f6ab392cbba4948c54e2ea42cd94 Mon Sep 17 00:00:00 2001 From: orbiter Date: Sun, 23 Nov 2008 23:55:08 +0000 Subject: [PATCH] changed interface to colletctionIndex and adopted all implementing classes: do not return a result of a double-check when adding entries with addUnique git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@5363 6c8d7289-2bf4-0310-a012-ef5d649a1542 --- .../anomic/kelondro/kelondroBytesIntMap.java | 110 +++++++++++++++++- .../anomic/kelondro/kelondroBytesLongMap.java | 4 +- source/de/anomic/kelondro/kelondroCache.java | 21 ++-- .../kelondro/kelondroChunkIterator.java | 3 - .../de/anomic/kelondro/kelondroEcoTable.java | 22 ++-- .../de/anomic/kelondro/kelondroFlexTable.java | 7 +- source/de/anomic/kelondro/kelondroIndex.java | 4 +- .../de/anomic/kelondro/kelondroRAMIndex.java | 15 +-- .../kelondro/kelondroRowCollection.java | 15 +-- .../de/anomic/kelondro/kelondroSQLTable.java | 4 +- .../anomic/kelondro/kelondroSplitTable.java | 18 ++- source/de/anomic/kelondro/kelondroTree.java | 12 +- 12 files changed, 153 insertions(+), 82 deletions(-) diff --git a/source/de/anomic/kelondro/kelondroBytesIntMap.java b/source/de/anomic/kelondro/kelondroBytesIntMap.java index e44fd3633..3966e898e 100644 --- a/source/de/anomic/kelondro/kelondroBytesIntMap.java +++ b/source/de/anomic/kelondro/kelondroBytesIntMap.java @@ -27,6 +27,13 @@ package de.anomic.kelondro; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; public class kelondroBytesIntMap { @@ -76,13 +83,13 @@ public class kelondroBytesIntMap { return (int) oldentry.getColLong(1); } - public synchronized boolean addi(final byte[] key, final int i) throws 
IOException { + public synchronized void addi(final byte[] key, final int i) throws IOException { assert i >= 0 : "i = " + i; assert (key != null); final kelondroRow.Entry newentry = this.rowdef.newEntry(); newentry.setCol(0, key); newentry.setCol(1, i); - return index.addUnique(newentry); + index.addUnique(newentry); } public synchronized ArrayList removeDoubles() throws IOException { @@ -139,4 +146,103 @@ public class kelondroBytesIntMap { index = null; } + private static class entry { + public byte[] key; + public int l; + public entry(final byte[] key, final int l) { + this.key = key; + this.l = l; + } + } + + /** + * this method creates a concurrent thread that can take entries that are used to initialize the map + * it should be used when a bytesLongMap is initialized when a file is read. Concurrency of FileIO and + * map creation will speed up the initialization process. + * @param keylength + * @param objectOrder + * @param space + * @param bufferSize + * @return + */ + public static initDataConsumer asynchronusInitializer(final int keylength, final kelondroByteOrder objectOrder, final int space, int bufferSize) { + initDataConsumer initializer = new initDataConsumer(new kelondroBytesIntMap(keylength, objectOrder, space), bufferSize); + ExecutorService service = Executors.newSingleThreadExecutor(); + initializer.setResult(service.submit(initializer)); + service.shutdown(); + return initializer; + } + + public static class initDataConsumer implements Callable { + + private BlockingQueue cache; + private final entry poison = new entry(new byte[0], 0); + private kelondroBytesIntMap map; + private Future result; + private boolean sortAtEnd; + + public initDataConsumer(kelondroBytesIntMap map, int bufferCount) { + this.map = map; + cache = new ArrayBlockingQueue(bufferCount); + sortAtEnd = false; + } + + protected void setResult(Future result) { + this.result = result; + } + + /** + * hand over another entry that shall be inserted into the BytesLongMap with an 
addl method + * @param key + * @param l + */ + public void consume(final byte[] key, final int l) { + try { + cache.put(new entry(key, l)); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + /** + * to signal the initialization thread that no more entries will be submitted with consume() + * this method must be called. The process will not terminate if this is not called before. + */ + public void finish(boolean sortAtEnd) { + this.sortAtEnd = sortAtEnd; + try { + cache.put(poison); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + /** + * this must be called after a finish() was called. this method blocks until all entries + * have been processed, and the content was sorted. It returns the kelondroBytesIntMap + * that the user wanted to initialize + * @return + * @throws InterruptedException + * @throws ExecutionException + */ + public kelondroBytesIntMap result() throws InterruptedException, ExecutionException { + return this.result.get(); + } + + public kelondroBytesIntMap call() throws IOException { + try { + entry c; + while ((c = cache.take()) != poison) { + map.addi(c.key, c.l); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + if (sortAtEnd && map.index instanceof kelondroRAMIndex) { + ((kelondroRAMIndex) map.index).finishInitialization(); + } + return map; + } + + } } diff --git a/source/de/anomic/kelondro/kelondroBytesLongMap.java b/source/de/anomic/kelondro/kelondroBytesLongMap.java index 2078e11d7..3c90867f6 100644 --- a/source/de/anomic/kelondro/kelondroBytesLongMap.java +++ b/source/de/anomic/kelondro/kelondroBytesLongMap.java @@ -71,13 +71,13 @@ public class kelondroBytesLongMap { return oldentry.getColLong(1); } - public synchronized boolean addl(final byte[] key, final long l) throws IOException { + public synchronized void addl(final byte[] key, final long l) throws IOException { assert l >= 0 : "l = " + l; assert (key != null); final kelondroRow.Entry newentry = 
this.rowdef.newEntry(); newentry.setCol(0, key); newentry.setCol(1, l); - return index.addUnique(newentry); + index.addUnique(newentry); } public synchronized ArrayList removeDoubles() throws IOException { diff --git a/source/de/anomic/kelondro/kelondroCache.java b/source/de/anomic/kelondro/kelondroCache.java index d88d36a0e..80d2a349e 100644 --- a/source/de/anomic/kelondro/kelondroCache.java +++ b/source/de/anomic/kelondro/kelondroCache.java @@ -318,7 +318,7 @@ public class kelondroCache implements kelondroIndex { return this.put(row); } - public synchronized boolean addUnique(final Entry row) throws IOException { + public synchronized void addUnique(final Entry row) throws IOException { assert (row != null); assert (row.columns() == row().columns()); //assert (!(serverLog.allZero(row.getColBytes(index.primarykey())))); @@ -331,21 +331,20 @@ public class kelondroCache implements kelondroIndex { this.readMissCache.remove(key); this.hasnotDelete++; // the entry does not exist before - final boolean added = index.addUnique(row); // write to backend - if (added && (readHitCache != null)) { + index.addUnique(row); // write to backend + if (readHitCache != null) { final kelondroRow.Entry dummy = readHitCache.put(row); // learn that entry if (dummy == null) this.writeUnique++; else this.writeDouble++; } - return added; + return; } // the worst case: we must write to the back-end directly - final boolean added = index.addUnique(row); - if (added && (readHitCache != null)) { + index.addUnique(row); + if (readHitCache != null) { final kelondroRow.Entry dummy = readHitCache.put(row); // learn that entry if (dummy == null) this.writeUnique++; else this.writeDouble++; } - return added; } public synchronized void addUnique(final Entry row, final Date entryDate) throws IOException { @@ -374,13 +373,9 @@ public class kelondroCache implements kelondroIndex { } } - public synchronized int addUniqueMultiple(final List rows) throws IOException { + public synchronized void 
addUniqueMultiple(final List rows) throws IOException { final Iterator i = rows.iterator(); - int c = 0; - while (i.hasNext()) { - if (addUnique(i.next())) c++; - } - return c; + while (i.hasNext()) addUnique(i.next()); } public synchronized ArrayList removeDoubles() throws IOException { diff --git a/source/de/anomic/kelondro/kelondroChunkIterator.java b/source/de/anomic/kelondro/kelondroChunkIterator.java index 0d6209fce..ad1ceb8d7 100644 --- a/source/de/anomic/kelondro/kelondroChunkIterator.java +++ b/source/de/anomic/kelondro/kelondroChunkIterator.java @@ -34,9 +34,6 @@ import java.util.Iterator; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; public class kelondroChunkIterator implements Iterator { diff --git a/source/de/anomic/kelondro/kelondroEcoTable.java b/source/de/anomic/kelondro/kelondroEcoTable.java index 9330d7681..41585f43c 100644 --- a/source/de/anomic/kelondro/kelondroEcoTable.java +++ b/source/de/anomic/kelondro/kelondroEcoTable.java @@ -132,8 +132,7 @@ public class kelondroEcoTable implements kelondroIndex { // write the key into the index table assert key != null; if (key == null) {i++; continue;} - if (!index.addi(key, i++)) fail++; - assert index.size() + fail == i : "index.size() = " + index.size() + ", i = " + i + ", fail = " + fail + ", key = '" + new String(key) + "'"; + index.addi(key, i++); } } else { byte[] record; @@ -146,7 +145,7 @@ public class kelondroEcoTable implements kelondroIndex { System.arraycopy(record, 0, key, 0, rowdef.primaryKeyLength); // write the key into the index table - if (!index.addi(key, i++)) fail++; + index.addi(key, i++); // write the tail into the table table.addUnique(taildef.newEntry(record, rowdef.primaryKeyLength, true)); @@ -173,7 +172,7 @@ public class kelondroEcoTable implements kelondroIndex { for 
(final Integer[] ds: doubles) { file.get(ds[0].intValue(), record, 0); System.arraycopy(record, 0, key, 0, rowdef.primaryKeyLength); - if (!index.addi(key, ds[0].intValue())) fail++; + index.addi(key, ds[0].intValue()); } // then remove the other doubles by removing them from the table, but do a re-indexing while doing that // first aggregate all the delete positions because the elements from the top positions must be removed first @@ -250,15 +249,11 @@ public class kelondroEcoTable implements kelondroIndex { return (int) ((rowdef.primaryKeyLength + 4) * tableSize(f, rowdef.objectsize) * kelondroRowCollection.growfactor); } - public synchronized boolean addUnique(final Entry row) throws IOException { + public synchronized void addUnique(final Entry row) throws IOException { assert file.size() == index.size() + fail : "file.size() = " + file.size() + ", index.size() = " + index.size(); assert ((table == null) || (table.size() == index.size())); final int i = (int) file.size(); - final boolean added = index.addi(row.getPrimaryKeyBytes(), i); - if (!added) { - fail++; - return false; - } + index.addi(row.getPrimaryKeyBytes(), i); if (table != null) { assert table.size() == i; table.addUnique(taildef.newEntry(row.bytes(), rowdef.primaryKeyLength, true)); @@ -266,18 +261,15 @@ public class kelondroEcoTable implements kelondroIndex { } file.add(row.bytes(), 0); assert file.size() == index.size() + fail : "file.size() = " + file.size() + ", index.size() = " + index.size(); - return true; } - public synchronized int addUniqueMultiple(final List rows) throws IOException { + public synchronized void addUniqueMultiple(final List rows) throws IOException { assert file.size() == index.size() + fail : "file.size() = " + file.size() + ", index.size() = " + index.size(); final Iterator i = rows.iterator(); - int c = 0; while (i.hasNext()) { - if (addUnique(i.next())) c++; + addUnique(i.next()); } assert file.size() == index.size() + fail : "file.size() = " + file.size() + ", 
index.size() = " + index.size(); - return c; } /** diff --git a/source/de/anomic/kelondro/kelondroFlexTable.java b/source/de/anomic/kelondro/kelondroFlexTable.java index f7f8f06e3..4710286ee 100644 --- a/source/de/anomic/kelondro/kelondroFlexTable.java +++ b/source/de/anomic/kelondro/kelondroFlexTable.java @@ -300,13 +300,13 @@ public class kelondroFlexTable extends kelondroFlexWidthArray implements kelondr return oldentry; } - public synchronized boolean addUnique(final kelondroRow.Entry row) throws IOException { + public synchronized void addUnique(final kelondroRow.Entry row) throws IOException { assert row.objectsize() == this.rowdef.objectsize; assert this.size() == index.size() : "content.size() = " + this.size() + ", index.size() = " + index.size(); - return index.addi(row.getColBytes(0), super.add(row)); + index.addi(row.getColBytes(0), super.add(row)); } - public synchronized int addUniqueMultiple(final List rows) throws IOException { + public synchronized void addUniqueMultiple(final List rows) throws IOException { // add a list of entries in a ordered way. 
// this should save R/W head positioning time final TreeMap indexed_result = super.addMultiple(rows); @@ -319,7 +319,6 @@ public class kelondroFlexTable extends kelondroFlexWidthArray implements kelondr index.puti(entry.getValue(), entry.getKey().intValue()); } assert this.size() == index.size() : "content.size() = " + this.size() + ", index.size() = " + index.size(); - return indexed_result.size(); } public synchronized ArrayList removeDoubles() throws IOException { diff --git a/source/de/anomic/kelondro/kelondroIndex.java b/source/de/anomic/kelondro/kelondroIndex.java index 229d60efc..0d784cb2c 100644 --- a/source/de/anomic/kelondro/kelondroIndex.java +++ b/source/de/anomic/kelondro/kelondroIndex.java @@ -47,8 +47,8 @@ public interface kelondroIndex { public kelondroRow.Entry put(kelondroRow.Entry row) throws IOException; public kelondroRow.Entry put(kelondroRow.Entry row, Date entryDate) throws IOException; public void putMultiple(List rows) throws IOException; // for R/W head path optimization - public boolean addUnique(kelondroRow.Entry row) throws IOException; // no double-check - public int addUniqueMultiple(List rows) throws IOException; // no double-check + public void addUnique(kelondroRow.Entry row) throws IOException; // no double-check + public void addUniqueMultiple(List rows) throws IOException; // no double-check public ArrayList removeDoubles() throws IOException; // removes all elements that are double (to be used after all addUnique) public kelondroRow.Entry remove(byte[] key) throws IOException; public kelondroRow.Entry removeOne() throws IOException; diff --git a/source/de/anomic/kelondro/kelondroRAMIndex.java b/source/de/anomic/kelondro/kelondroRAMIndex.java index 55ed252bf..cbb0b4cad 100644 --- a/source/de/anomic/kelondro/kelondroRAMIndex.java +++ b/source/de/anomic/kelondro/kelondroRAMIndex.java @@ -105,23 +105,20 @@ public class kelondroRAMIndex implements kelondroIndex { } } - public synchronized boolean addUnique(final kelondroRow.Entry 
entry) { + public synchronized void addUnique(final kelondroRow.Entry entry) { assert (entry != null); if (index1 == null) { // we are in the initialization phase - return index0.addUnique(entry); + index0.addUnique(entry); + return; } // initialization is over, add to secondary index - return index1.addUnique(entry); + index1.addUnique(entry); } - public int addUniqueMultiple(final List rows) { + public void addUniqueMultiple(final List rows) { final Iterator i = rows.iterator(); - int c = 0; - while (i.hasNext()) { - if (addUnique(i.next())) c++; - } - return c; + while (i.hasNext()) addUnique(i.next()); } public synchronized ArrayList removeDoubles() { diff --git a/source/de/anomic/kelondro/kelondroRowCollection.java b/source/de/anomic/kelondro/kelondroRowCollection.java index cba931030..f02599bae 100644 --- a/source/de/anomic/kelondro/kelondroRowCollection.java +++ b/source/de/anomic/kelondro/kelondroRowCollection.java @@ -301,26 +301,22 @@ public class kelondroRowCollection { set(index, a); } - public synchronized boolean addUnique(final kelondroRow.Entry row) { + public synchronized void addUnique(final kelondroRow.Entry row) { final byte[] r = row.bytes(); - return addUnique(r, 0, r.length); + addUnique(r, 0, r.length); } - public synchronized int addUniqueMultiple(final List rows) { + public synchronized void addUniqueMultiple(final List rows) { assert this.sortBound == 0 : "sortBound = " + this.sortBound + ", chunkcount = " + this.chunkcount; final Iterator i = rows.iterator(); - int c = 0; - while (i.hasNext()) { - if (addUnique(i.next())) c++; - } - return c; + while (i.hasNext()) addUnique(i.next()); } public synchronized void add(final byte[] a) { addUnique(a, 0, a.length); } - private final boolean addUnique(final byte[] a, final int astart, final int alength) { + private final void addUnique(final byte[] a, final int astart, final int alength) { assert (a != null); assert (astart >= 0) && (astart < a.length) : " astart = " + astart; assert 
(!(serverLog.allZero(a, astart, alength))) : "a = " + serverLog.arrayList(a, astart, alength); @@ -338,7 +334,6 @@ public class kelondroRowCollection { System.arraycopy(a, astart, chunkcache, rowdef.objectsize * chunkcount, l); chunkcount++; this.lastTimeWrote = System.currentTimeMillis(); - return true; } public synchronized final void addAllUnique(final kelondroRowCollection c) { diff --git a/source/de/anomic/kelondro/kelondroSQLTable.java b/source/de/anomic/kelondro/kelondroSQLTable.java index 529a00114..eeb2d7f5f 100644 --- a/source/de/anomic/kelondro/kelondroSQLTable.java +++ b/source/de/anomic/kelondro/kelondroSQLTable.java @@ -202,7 +202,7 @@ public class kelondroSQLTable implements kelondroIndex { } } - public synchronized boolean addUnique(final kelondroRow.Entry row) throws IOException { + public synchronized void addUnique(final kelondroRow.Entry row) throws IOException { throw new UnsupportedOperationException(); } @@ -210,7 +210,7 @@ public class kelondroSQLTable implements kelondroIndex { throw new UnsupportedOperationException(); } - public synchronized int addUniqueMultiple(final List rows) throws IOException { + public synchronized void addUniqueMultiple(final List rows) throws IOException { throw new UnsupportedOperationException(); } diff --git a/source/de/anomic/kelondro/kelondroSplitTable.java b/source/de/anomic/kelondro/kelondroSplitTable.java index 0bb4f5561..b7092268f 100644 --- a/source/de/anomic/kelondro/kelondroSplitTable.java +++ b/source/de/anomic/kelondro/kelondroSplitTable.java @@ -308,15 +308,15 @@ public class kelondroSplitTable implements kelondroIndex { return null; } - public synchronized boolean addUnique(final kelondroRow.Entry row) throws IOException { - return addUnique(row, null); + public synchronized void addUnique(final kelondroRow.Entry row) throws IOException { + addUnique(row, null); } - public synchronized boolean addUnique(final kelondroRow.Entry row, Date entryDate) throws IOException { + public synchronized void 
addUnique(final kelondroRow.Entry row, Date entryDate) throws IOException { assert row.objectsize() <= this.rowdef.objectsize; if ((entryDate == null) || (entryDate.after(new Date()))) entryDate = new Date(); // fix date final String suffix = dateSuffix(entryDate); - if (suffix == null) return false; + if (suffix == null) return; kelondroIndex table = tables.get(suffix); if (table == null) { // make new table @@ -329,16 +329,12 @@ public class kelondroSplitTable implements kelondroIndex { } tables.put(suffix, table); } - return table.addUnique(row); + table.addUnique(row); } - public synchronized int addUniqueMultiple(final List rows) throws IOException { + public synchronized void addUniqueMultiple(final List rows) throws IOException { final Iterator i = rows.iterator(); - int c = 0; - while (i.hasNext()) { - if (addUnique(i.next())) c++; - } - return c; + while (i.hasNext()) addUnique(i.next()); } public synchronized void addUniqueMultiple(final List rows, final Date entryDate) throws IOException { diff --git a/source/de/anomic/kelondro/kelondroTree.java b/source/de/anomic/kelondro/kelondroTree.java index f11ff6732..f28fada21 100644 --- a/source/de/anomic/kelondro/kelondroTree.java +++ b/source/de/anomic/kelondro/kelondroTree.java @@ -482,23 +482,17 @@ public class kelondroTree extends kelondroCachedRecords implements kelondroIndex return result; } - public synchronized boolean addUnique(final kelondroRow.Entry row) throws IOException { - final int s = this.size(); + public synchronized void addUnique(final kelondroRow.Entry row) throws IOException { this.put(row); - return this.size() > s; } public synchronized void addUnique(final kelondroRow.Entry row, final Date entryDate) throws IOException { this.put(row, entryDate); } - public synchronized int addUniqueMultiple(final List rows) throws IOException { + public synchronized void addUniqueMultiple(final List rows) throws IOException { final Iterator i = rows.iterator(); - int c = 0; - while (i.hasNext()) { - if 
(addUnique(i.next())) c++; - } - return c; + while (i.hasNext()) addUnique(i.next()); } private void assignChild(final kelondroNode parentNode, final kelondroNode childNode, final int childType) throws IOException {