diff --git a/source/de/anomic/kelondro/blob/ArrayStack.java b/source/de/anomic/kelondro/blob/ArrayStack.java index 3e1c3cf13..0828de234 100755 --- a/source/de/anomic/kelondro/blob/ArrayStack.java +++ b/source/de/anomic/kelondro/blob/ArrayStack.java @@ -35,7 +35,17 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import de.anomic.kelondro.index.Row; import de.anomic.kelondro.order.ByteOrder; @@ -48,6 +58,7 @@ import de.anomic.kelondro.text.ReferenceFactory; import de.anomic.kelondro.text.ReferenceContainerCache.blobFileEntries; import de.anomic.kelondro.util.DateFormatter; import de.anomic.kelondro.util.FileUtils; +import de.anomic.kelondro.util.NamePrefixThreadFactory; import de.anomic.yacy.logging.Log; public class ArrayStack implements BLOB { @@ -78,6 +89,9 @@ public class ArrayStack implements BLOB { private String prefix; private int buffersize; + // the thread pool for the keeperOf executor service + private ExecutorService executor; + public ArrayStack( final File heapLocation, final String prefix, @@ -94,6 +108,14 @@ public class ArrayStack implements BLOB { this.repositoryAgeMax = Long.MAX_VALUE; this.repositorySizeMax = Long.MAX_VALUE; + // init the thread pool for the keeperOf executor service + this.executor = new ThreadPoolExecutor( + Runtime.getRuntime().availableProcessors() + 1, + Runtime.getRuntime().availableProcessors() + 1, 10, + TimeUnit.SECONDS, + new LinkedBlockingQueue(), + new NamePrefixThreadFactory(prefix)); + // check existence of the heap directory if (heapLocation.exists()) { if (!heapLocation.isDirectory()) throw new IOException("the BLOBArray directory " + heapLocation.toString() + " does not exist (is blocked by a file with same name)"); @@ -465,8 +487,59 @@ public class ArrayStack implements BLOB { * @throws IOException */ public synchronized boolean has(byte[] key) { - for (blobItem bi: blobs) if (bi.blob.has(key)) return true; - return false; + blobItem bi = keeperOf(key); + return bi != null; + //for (blobItem bi: blobs) if (bi.blob.has(key)) return true; + //return false; + } + + public synchronized blobItem keeperOf(final byte[] key) { + // because the index is stored only in one table, + // and the index is completely in RAM, a concurrency will create + // not concurrent File accesses + //long start = System.currentTimeMillis(); + + // start a concurrent query to database tables + final CompletionService cs = new ExecutorCompletionService(executor); + int accepted = 0; + for (final blobItem bi : blobs) { + try { + cs.submit(new Callable() { + public blobItem call() { + if (bi.blob.has(key)) return bi; + return null; + } + }); + accepted++; + } catch (final RejectedExecutionException e) { + // the executor is either shutting down or the blocking queue is full + // execute the search direct here without concurrency + if (bi.blob.has(key)) return bi; + } + } + + // read the result + try { + for (int i = 0; i < accepted; i++) { + final Future f = cs.take(); + //hash(System.out.println("**********accepted = " + accepted + ", i =" + i); + if (f == null) continue; + final blobItem index = f.get(); + if (index != null) { + //System.out.println("*DEBUG SplitTable success.time = " + (System.currentTimeMillis() - start) + " ms"); + return index; + } + } + //System.out.println("*DEBUG SplitTable fail.time = " + (System.currentTimeMillis() - start) + " ms"); + return null; + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (final ExecutionException e) { + e.printStackTrace(); + throw new RuntimeException(e.getCause()); + } + //System.out.println("*DEBUG SplitTable fail.time = " + (System.currentTimeMillis() - start) + " ms"); + return null; } /** @@ -476,12 +549,16 @@ public class ArrayStack implements BLOB { * @throws IOException */ public synchronized byte[] get(byte[] key) throws IOException { - byte[] b; + //blobItem bi = keeperOf(key); + //return (bi == null) ? null : bi.blob.get(key); + + byte[] b; for (blobItem bi: blobs) { b = bi.blob.get(key); if (b != null) return b; } return null; + } /** diff --git a/source/de/anomic/kelondro/blob/Heap.java b/source/de/anomic/kelondro/blob/Heap.java index 60d4753a3..2558e5215 100755 --- a/source/de/anomic/kelondro/blob/Heap.java +++ b/source/de/anomic/kelondro/blob/Heap.java @@ -105,7 +105,7 @@ public final class Heap extends HeapModifier implements BLOB { * @return the number of BLOBs in the heap */ public synchronized int size() { - return super.size() + this.buffer.size(); + return super.size() + ((this.buffer == null) ? 0 : this.buffer.size()); } diff --git a/source/de/anomic/kelondro/blob/HeapModifier.java b/source/de/anomic/kelondro/blob/HeapModifier.java index 1c302541b..3eab471fe 100644 --- a/source/de/anomic/kelondro/blob/HeapModifier.java +++ b/source/de/anomic/kelondro/blob/HeapModifier.java @@ -233,6 +233,7 @@ public class HeapModifier extends HeapReader implements BLOB { protected void shrinkWithGapsAtEnd() { // find gaps at the end of the file and shrink the file by these gaps + if (this.free == null) return; try { while (this.free.size() > 0) { Long seek = this.free.lastKey(); diff --git a/source/de/anomic/kelondro/blob/HeapReader.java b/source/de/anomic/kelondro/blob/HeapReader.java index e91a09b30..5649462de 100644 --- a/source/de/anomic/kelondro/blob/HeapReader.java +++ b/source/de/anomic/kelondro/blob/HeapReader.java @@ -214,7 +214,7 @@ public class HeapReader { * @return the number of BLOBs in the heap */ public synchronized int size() { - return this.index.size(); + return (this.index == null) ? 0 : this.index.size(); } /** diff --git a/source/de/anomic/kelondro/table/SplitTable.java b/source/de/anomic/kelondro/table/SplitTable.java index 66ed03c5c..b2e04adfd 100644 --- a/source/de/anomic/kelondro/table/SplitTable.java +++ b/source/de/anomic/kelondro/table/SplitTable.java @@ -298,8 +298,7 @@ public class SplitTable implements ObjectIndex { // start a concurrent query to database tables final CompletionService cs = new ExecutorCompletionService(executor); - final int s = tables.size(); - int rejected = 0; + int accepted = 0; for (final ObjectIndex table : tables.values()) { try { cs.submit(new Callable() { @@ -308,22 +307,24 @@ public class SplitTable implements ObjectIndex { return dummyIndex; } }); + accepted++; } catch (final RejectedExecutionException e) { // the executor is either shutting down or the blocking queue is full // execute the search direct here without concurrency if (table.has(key)) return table; - rejected++; } } // read the result try { - for (int i = 0, n = s - rejected; i < n; i++) { + for (int i = 0; i < accepted; i++) { final Future f = cs.take(); + //hash(System.out.println("**********accepted = " + accepted + ", i =" + i); + if (f == null) continue; final ObjectIndex index = f.get(); if (index != dummyIndex) { //System.out.println("*DEBUG SplitTable success.time = " + (System.currentTimeMillis() - start) + " ms"); - return index; + return index; } } //System.out.println("*DEBUG SplitTable fail.time = " + (System.currentTimeMillis() - start) + " ms"); diff --git a/source/de/anomic/plasma/plasmaSearchRankingProcess.java b/source/de/anomic/plasma/plasmaSearchRankingProcess.java index 5b10757e3..76a99c879 100644 --- a/source/de/anomic/plasma/plasmaSearchRankingProcess.java +++ b/source/de/anomic/plasma/plasmaSearchRankingProcess.java @@ -312,7 +312,7 @@ public final class plasmaSearchRankingProcess { // finally remove the best entry from the doubledom cache m = this.doubleDomCache.get(bestEntry.element.metadataHash().substring(6)); o = m.pop(); - assert o == null || o.element.metadataHash().equals(bestEntry.element.metadataHash()) : "bestEntry.element.metadataHash() = " + bestEntry.element.metadataHash() + ", o.element.metadataHash() = " + o.element.metadataHash(); + //assert o == null || o.element.metadataHash().equals(bestEntry.element.metadataHash()) : "bestEntry.element.metadataHash() = " + bestEntry.element.metadataHash() + ", o.element.metadataHash() = " + o.element.metadataHash(); return bestEntry; }