|
|
|
@ -36,13 +36,14 @@ import java.util.HashSet;
|
|
|
|
|
import java.util.Iterator;
|
|
|
|
|
import java.util.List;
|
|
|
|
|
import java.util.Map;
|
|
|
|
|
import java.util.concurrent.ArrayBlockingQueue;
|
|
|
|
|
import java.util.concurrent.Callable;
|
|
|
|
|
import java.util.concurrent.CompletionService;
|
|
|
|
|
import java.util.concurrent.ExecutionException;
|
|
|
|
|
import java.util.concurrent.ExecutorCompletionService;
|
|
|
|
|
import java.util.concurrent.ExecutorService;
|
|
|
|
|
import java.util.concurrent.Future;
|
|
|
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
|
|
|
import java.util.concurrent.RejectedExecutionException;
|
|
|
|
|
import java.util.concurrent.ThreadPoolExecutor;
|
|
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
|
|
|
@ -79,7 +80,7 @@ public class kelondroSplitTable implements kelondroIndex {
|
|
|
|
|
public void init(boolean resetOnFail) {
|
|
|
|
|
|
|
|
|
|
// init the thread pool for the keeperOf executor service
|
|
|
|
|
this.executor = new ThreadPoolExecutor(serverProcessor.useCPU + 1, serverProcessor.useCPU + 1, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(serverProcessor.useCPU + 1));
|
|
|
|
|
this.executor = new ThreadPoolExecutor(serverProcessor.useCPU + 1, serverProcessor.useCPU + 1, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
|
|
|
|
|
|
|
|
|
|
// initialized tables map
|
|
|
|
|
this.tables = new HashMap<String, kelondroIndex>();
|
|
|
|
@ -264,7 +265,9 @@ public class kelondroSplitTable implements kelondroIndex {
|
|
|
|
|
// start a concurrent query to database tables
|
|
|
|
|
CompletionService<kelondroIndex> cs = new ExecutorCompletionService<kelondroIndex>(executor);
|
|
|
|
|
int s = tables.size();
|
|
|
|
|
int rejected = 0;
|
|
|
|
|
for (final kelondroIndex table : tables.values()) {
|
|
|
|
|
try {
|
|
|
|
|
cs.submit(new Callable<kelondroIndex>() {
|
|
|
|
|
public kelondroIndex call() {
|
|
|
|
|
try {
|
|
|
|
@ -274,11 +277,19 @@ public class kelondroSplitTable implements kelondroIndex {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
} catch (RejectedExecutionException e) {
|
|
|
|
|
// the executor is either shutting down or the blocking queue is full
|
|
|
|
|
// execute the search direct here without concurrency
|
|
|
|
|
try {
|
|
|
|
|
if (table.has(key)) return table;
|
|
|
|
|
} catch (IOException ee) {}
|
|
|
|
|
rejected++;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// read the result
|
|
|
|
|
try {
|
|
|
|
|
for (int i = 0; i < s; i++) {
|
|
|
|
|
for (int i = 0, n = s - rejected; i < n; i++) {
|
|
|
|
|
Future<kelondroIndex> f = cs.take();
|
|
|
|
|
kelondroIndex index = f.get();
|
|
|
|
|
if (index != dummyIndex) {
|
|
|
|
@ -296,21 +307,6 @@ public class kelondroSplitTable implements kelondroIndex {
|
|
|
|
|
//System.out.println("*DEBUG SplitTable fail.time = " + (System.currentTimeMillis() - start) + " ms");
|
|
|
|
|
return null;
|
|
|
|
|
}
|
|
|
|
|
/*
|
|
|
|
|
public synchronized kelondroIndex keeperOf(byte[] key) throws IOException {
|
|
|
|
|
// TODO: apply concurrency here!
|
|
|
|
|
// because the index is stored only in one table,
|
|
|
|
|
// and the index is completely in RAM, a concurrency would create not concurrent File accesses
|
|
|
|
|
long start = System.currentTimeMillis();
|
|
|
|
|
for (final kelondroIndex table : tables.values()) {
|
|
|
|
|
if (table.has(key)) {
|
|
|
|
|
System.out.println("*DEBUG SplitTable success.time = " + (System.currentTimeMillis() - start) + " ms");
|
|
|
|
|
return table;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
System.out.println("*DEBUG SplitTable fail.time = " + (System.currentTimeMillis() - start) + " ms");
|
|
|
|
|
return null;
|
|
|
|
|
}*/
|
|
|
|
|
|
|
|
|
|
public synchronized void addUnique(kelondroRow.Entry row) throws IOException {
|
|
|
|
|
addUnique(row, null);
|
|
|
|
|