- patch for future computation in SplitTable

- added same concurrent process for has() from SPlitTable in ArrayStack

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@6093 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 16 years ago
parent 9a5ec20b3c
commit 15180fc95e

@ -35,7 +35,17 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import de.anomic.kelondro.index.Row;
import de.anomic.kelondro.order.ByteOrder;
@ -48,6 +58,7 @@ import de.anomic.kelondro.text.ReferenceFactory;
import de.anomic.kelondro.text.ReferenceContainerCache.blobFileEntries;
import de.anomic.kelondro.util.DateFormatter;
import de.anomic.kelondro.util.FileUtils;
import de.anomic.kelondro.util.NamePrefixThreadFactory;
import de.anomic.yacy.logging.Log;
public class ArrayStack implements BLOB {
@ -78,6 +89,9 @@ public class ArrayStack implements BLOB {
private String prefix;
private int buffersize;
// the thread pool for the keeperOf executor service
private ExecutorService executor;
public ArrayStack(
final File heapLocation,
final String prefix,
@ -94,6 +108,14 @@ public class ArrayStack implements BLOB {
this.repositoryAgeMax = Long.MAX_VALUE;
this.repositorySizeMax = Long.MAX_VALUE;
// init the thread pool for the keeperOf executor service
this.executor = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors() + 1,
Runtime.getRuntime().availableProcessors() + 1, 10,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(),
new NamePrefixThreadFactory(prefix));
// check existence of the heap directory
if (heapLocation.exists()) {
if (!heapLocation.isDirectory()) throw new IOException("the BLOBArray directory " + heapLocation.toString() + " does not exist (is blocked by a file with same name)");
@ -465,8 +487,59 @@ public class ArrayStack implements BLOB {
* @throws IOException
*/
public synchronized boolean has(byte[] key) {
for (blobItem bi: blobs) if (bi.blob.has(key)) return true;
return false;
blobItem bi = keeperOf(key);
return bi != null;
//for (blobItem bi: blobs) if (bi.blob.has(key)) return true;
//return false;
}
public synchronized blobItem keeperOf(final byte[] key) {
// because the index is stored only in one table,
// and the index is completely in RAM, a concurrency will create
// not concurrent File accesses
//long start = System.currentTimeMillis();
// start a concurrent query to database tables
final CompletionService<blobItem> cs = new ExecutorCompletionService<blobItem>(executor);
int accepted = 0;
for (final blobItem bi : blobs) {
try {
cs.submit(new Callable<blobItem>() {
public blobItem call() {
if (bi.blob.has(key)) return bi;
return null;
}
});
accepted++;
} catch (final RejectedExecutionException e) {
// the executor is either shutting down or the blocking queue is full
// execute the search direct here without concurrency
if (bi.blob.has(key)) return bi;
}
}
// read the result
try {
for (int i = 0; i < accepted; i++) {
final Future<blobItem> f = cs.take();
//hash(System.out.println("**********accepted = " + accepted + ", i =" + i);
if (f == null) continue;
final blobItem index = f.get();
if (index != null) {
//System.out.println("*DEBUG SplitTable success.time = " + (System.currentTimeMillis() - start) + " ms");
return index;
}
}
//System.out.println("*DEBUG SplitTable fail.time = " + (System.currentTimeMillis() - start) + " ms");
return null;
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
} catch (final ExecutionException e) {
e.printStackTrace();
throw new RuntimeException(e.getCause());
}
//System.out.println("*DEBUG SplitTable fail.time = " + (System.currentTimeMillis() - start) + " ms");
return null;
}
/**
@ -476,12 +549,16 @@ public class ArrayStack implements BLOB {
* @throws IOException
*/
public synchronized byte[] get(byte[] key) throws IOException {
byte[] b;
//blobItem bi = keeperOf(key);
//return (bi == null) ? null : bi.blob.get(key);
byte[] b;
for (blobItem bi: blobs) {
b = bi.blob.get(key);
if (b != null) return b;
}
return null;
}
/**

@ -105,7 +105,7 @@ public final class Heap extends HeapModifier implements BLOB {
* @return the number of BLOBs in the heap
*/
public synchronized int size() {
return super.size() + this.buffer.size();
return super.size() + ((this.buffer == null) ? 0 : this.buffer.size());
}

@ -233,6 +233,7 @@ public class HeapModifier extends HeapReader implements BLOB {
protected void shrinkWithGapsAtEnd() {
// find gaps at the end of the file and shrink the file by these gaps
if (this.free == null) return;
try {
while (this.free.size() > 0) {
Long seek = this.free.lastKey();

@ -214,7 +214,7 @@ public class HeapReader {
* @return the number of BLOBs in the heap
*/
public synchronized int size() {
return this.index.size();
return (this.index == null) ? 0 : this.index.size();
}
/**

@ -298,8 +298,7 @@ public class SplitTable implements ObjectIndex {
// start a concurrent query to database tables
final CompletionService<ObjectIndex> cs = new ExecutorCompletionService<ObjectIndex>(executor);
final int s = tables.size();
int rejected = 0;
int accepted = 0;
for (final ObjectIndex table : tables.values()) {
try {
cs.submit(new Callable<ObjectIndex>() {
@ -308,22 +307,24 @@ public class SplitTable implements ObjectIndex {
return dummyIndex;
}
});
accepted++;
} catch (final RejectedExecutionException e) {
// the executor is either shutting down or the blocking queue is full
// execute the search direct here without concurrency
if (table.has(key)) return table;
rejected++;
}
}
// read the result
try {
for (int i = 0, n = s - rejected; i < n; i++) {
for (int i = 0; i < accepted; i++) {
final Future<ObjectIndex> f = cs.take();
//hash(System.out.println("**********accepted = " + accepted + ", i =" + i);
if (f == null) continue;
final ObjectIndex index = f.get();
if (index != dummyIndex) {
//System.out.println("*DEBUG SplitTable success.time = " + (System.currentTimeMillis() - start) + " ms");
return index;
return index;
}
}
//System.out.println("*DEBUG SplitTable fail.time = " + (System.currentTimeMillis() - start) + " ms");

@ -312,7 +312,7 @@ public final class plasmaSearchRankingProcess {
// finally remove the best entry from the doubledom cache
m = this.doubleDomCache.get(bestEntry.element.metadataHash().substring(6));
o = m.pop();
assert o == null || o.element.metadataHash().equals(bestEntry.element.metadataHash()) : "bestEntry.element.metadataHash() = " + bestEntry.element.metadataHash() + ", o.element.metadataHash() = " + o.element.metadataHash();
//assert o == null || o.element.metadataHash().equals(bestEntry.element.metadataHash()) : "bestEntry.element.metadataHash() = " + bestEntry.element.metadataHash() + ", o.element.metadataHash() = " + o.element.metadataHash();
return bestEntry;
}

Loading…
Cancel
Save