|
|
|
@ -52,12 +52,15 @@ public class kelondroRowCollection {
|
|
|
|
|
static final Integer dummy = new Integer(0);
|
|
|
|
|
|
|
|
|
|
public static ExecutorService sortingthreadexecutor = null;
|
|
|
|
|
public static ExecutorService partitionthreadexecutor = null;
|
|
|
|
|
|
|
|
|
|
static {
|
|
|
|
|
if (serverProcessor.useCPU > 1) {
|
|
|
|
|
sortingthreadexecutor = Executors.newCachedThreadPool(new NamePrefixThreadFactory("sorting"));
|
|
|
|
|
partitionthreadexecutor = Executors.newCachedThreadPool(new NamePrefixThreadFactory("partition"));
|
|
|
|
|
} else {
|
|
|
|
|
sortingthreadexecutor = null;
|
|
|
|
|
partitionthreadexecutor = null;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -503,13 +506,55 @@ public class kelondroRowCollection {
|
|
|
|
|
if ((sortingthreadexecutor != null) &&
|
|
|
|
|
(!sortingthreadexecutor.isShutdown()) &&
|
|
|
|
|
(serverProcessor.useCPU > 1) &&
|
|
|
|
|
(this.chunkcount > 80)) {
|
|
|
|
|
(this.chunkcount > 8000)) {
|
|
|
|
|
// sort this using multi-threading
|
|
|
|
|
Future<Integer> part0 = partitionthreadexecutor.submit(new partitionthread(this, 0, p, 0));
|
|
|
|
|
Future<Integer> part1 = partitionthreadexecutor.submit(new partitionthread(this, p, this.chunkcount, p));
|
|
|
|
|
try {
|
|
|
|
|
int p0 = part0.get().intValue();
|
|
|
|
|
Future<Object> sort0 = sortingthreadexecutor.submit(new qsortthread(this, 0, p0, 0));
|
|
|
|
|
Future<Object> sort1 = sortingthreadexecutor.submit(new qsortthread(this, p0, p, p0));
|
|
|
|
|
int p1 = part1.get().intValue();
|
|
|
|
|
Future<Object> sort2 = sortingthreadexecutor.submit(new qsortthread(this, p, p1, p));
|
|
|
|
|
Future<Object> sort3 = sortingthreadexecutor.submit(new qsortthread(this, p1, this.chunkcount, p1));
|
|
|
|
|
sort0.get();
|
|
|
|
|
sort1.get();
|
|
|
|
|
sort2.get();
|
|
|
|
|
sort3.get();
|
|
|
|
|
} catch (InterruptedException e) {
|
|
|
|
|
e.printStackTrace();
|
|
|
|
|
} catch (ExecutionException e) {
|
|
|
|
|
e.printStackTrace();
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
qsort(0, p, 0, swapspace);
|
|
|
|
|
qsort(p + 1, this.chunkcount, 0, swapspace);
|
|
|
|
|
}
|
|
|
|
|
this.sortBound = this.chunkcount;
|
|
|
|
|
//assert this.isSorted();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public synchronized final void sort2() {
|
|
|
|
|
assert (this.rowdef.objectOrder != null);
|
|
|
|
|
if (this.sortBound == this.chunkcount) return; // this is already sorted
|
|
|
|
|
if (this.chunkcount < isortlimit) {
|
|
|
|
|
isort(0, this.chunkcount, new byte[this.rowdef.objectsize]);
|
|
|
|
|
this.sortBound = this.chunkcount;
|
|
|
|
|
assert this.isSorted();
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
byte[] swapspace = new byte[this.rowdef.objectsize];
|
|
|
|
|
int p = partition(0, this.chunkcount, this.sortBound, swapspace);
|
|
|
|
|
if ((sortingthreadexecutor != null) &&
|
|
|
|
|
(!sortingthreadexecutor.isShutdown()) &&
|
|
|
|
|
(serverProcessor.useCPU > 1) &&
|
|
|
|
|
(this.chunkcount > 4000)) {
|
|
|
|
|
// sort this using multi-threading
|
|
|
|
|
Future<Object> part = sortingthreadexecutor.submit(new qsortthread(this, 0, p, 0));
|
|
|
|
|
//CompletionService<Object> sortingthreadcompletion = new ExecutorCompletionService<Object>(sortingthreadexecutor);
|
|
|
|
|
//Future<Object> part = sortingthreadcompletion.submit(new qsortthread(this, 0, p, 0));
|
|
|
|
|
qsort(p + 1, this.chunkcount, 0, swapspace);
|
|
|
|
|
try {
|
|
|
|
|
qsort(p + 1, this.chunkcount, 0, swapspace);
|
|
|
|
|
try {
|
|
|
|
|
part.get();
|
|
|
|
|
} catch (InterruptedException e) {
|
|
|
|
|
e.printStackTrace();
|
|
|
|
@ -517,8 +562,8 @@ public class kelondroRowCollection {
|
|
|
|
|
e.printStackTrace();
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
qsort(0, p, 0, swapspace);
|
|
|
|
|
qsort(p + 1, this.chunkcount, 0, swapspace);
|
|
|
|
|
qsort(0, p, 0, swapspace);
|
|
|
|
|
qsort(p + 1, this.chunkcount, 0, swapspace);
|
|
|
|
|
}
|
|
|
|
|
this.sortBound = this.chunkcount;
|
|
|
|
|
//assert this.isSorted();
|
|
|
|
@ -554,6 +599,22 @@ public class kelondroRowCollection {
|
|
|
|
|
qsort(p + 1, R, 0, swapspace);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public static class partitionthread implements Callable<Integer> {
|
|
|
|
|
kelondroRowCollection rc;
|
|
|
|
|
int L, R, S;
|
|
|
|
|
|
|
|
|
|
public partitionthread(kelondroRowCollection rc, int L, int R, int S) {
|
|
|
|
|
this.rc = rc;
|
|
|
|
|
this.L = L;
|
|
|
|
|
this.R = R;
|
|
|
|
|
this.S = S;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public Integer call() throws Exception {
|
|
|
|
|
return new Integer(rc.partition(L, R, S, new byte[rc.rowdef.objectsize]));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private final int partition(int L, int R, int S, byte[] swapspace) {
|
|
|
|
|
// L is the first element in the sequence
|
|
|
|
|
// R is the right bound of the sequence, and outside of the sequence
|
|
|
|
@ -584,7 +645,7 @@ public class kelondroRowCollection {
|
|
|
|
|
if (p <= q) {
|
|
|
|
|
oldpivot = pivot;
|
|
|
|
|
pivot = swap(p, q, pivot, swapspace);
|
|
|
|
|
if (pivot != oldpivot) compiledPivot = null; // must be computed again
|
|
|
|
|
if (pivot != oldpivot && compiledPivot != null) compiledPivot = null; // must be computed again
|
|
|
|
|
p++;
|
|
|
|
|
q--;
|
|
|
|
|
}
|
|
|
|
@ -891,7 +952,7 @@ public class kelondroRowCollection {
|
|
|
|
|
|
|
|
|
|
System.out.println("kelondroRowCollection test with size = " + testsize);
|
|
|
|
|
a = new kelondroRowCollection(r, testsize);
|
|
|
|
|
long t0 = System.currentTimeMillis();
|
|
|
|
|
long t0 = System.nanoTime();
|
|
|
|
|
random = new Random(0);
|
|
|
|
|
for (int i = 0; i < testsize; i++) a.add(randomHash().getBytes());
|
|
|
|
|
random = new Random(0);
|
|
|
|
|