@ -35,6 +35,10 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService ;
import java.util.concurrent.ExecutorService ;
import java.util.concurrent.Executors ;
import java.util.concurrent.Executors ;
import java.util.concurrent.Future ;
import java.util.concurrent.Future ;
import java.util.concurrent.RejectedExecutionException ;
import java.util.concurrent.SynchronousQueue ;
import java.util.concurrent.ThreadPoolExecutor ;
import java.util.concurrent.TimeUnit ;
import net.yacy.kelondro.logging.Log ;
import net.yacy.kelondro.logging.Log ;
import net.yacy.kelondro.order.Base64Order ;
import net.yacy.kelondro.order.Base64Order ;
@ -60,8 +64,27 @@ public class RowCollection implements Iterable<Row.Entry>, Cloneable {
private static final int exp_order_bound = 4 ;
private static final int exp_order_bound = 4 ;
private static final int exp_collection = 5 ;
private static final int exp_collection = 5 ;
public static final ExecutorService sortingthreadexecutor = ( availableCPU > 1 ) ? Executors . newCachedThreadPool ( new NamePrefixThreadFactory ( "sorting" ) ) : null ;
public static final ExecutorService sortingthreadexecutor =
private static final ExecutorService partitionthreadexecutor = ( availableCPU > 1 ) ? Executors . newCachedThreadPool ( new NamePrefixThreadFactory ( "partition" ) ) : null ;
( availableCPU > 1 )
? new ThreadPoolExecutor (
Runtime . getRuntime ( ) . availableProcessors ( ) ,
Integer . MAX_VALUE ,
120L , TimeUnit . SECONDS ,
new SynchronousQueue < Runnable > ( ) ,
new NamePrefixThreadFactory ( "sorting" ) ,
new ThreadPoolExecutor . CallerRunsPolicy ( ) )
: null ;
private static final ExecutorService partitionthreadexecutor =
( availableCPU > 1 )
? new ThreadPoolExecutor (
Runtime . getRuntime ( ) . availableProcessors ( ) ,
Integer . MAX_VALUE ,
120L , TimeUnit . SECONDS ,
new SynchronousQueue < Runnable > ( ) ,
new NamePrefixThreadFactory ( "partition" ) ,
new ThreadPoolExecutor . CallerRunsPolicy ( ) )
: null ;
public final Row rowdef ;
public final Row rowdef ;
protected byte [ ] chunkcache ;
protected byte [ ] chunkcache ;
@ -588,19 +611,53 @@ public class RowCollection implements Iterable<Row.Entry>, Cloneable {
this . chunkcount - p > isortlimit * 5
this . chunkcount - p > isortlimit * 5
) {
) {
// sort this using multi-threading
// sort this using multi-threading
final Future < Integer > part0 = partitionthreadexecutor . submit ( new partitionthread ( this , 0 , p , 0 ) ) ;
Future < Integer > part0 , part1 ;
final Future < Integer > part1 = partitionthreadexecutor . submit ( new partitionthread ( this , p , this . chunkcount , p ) ) ;
int p0 = - 1 , p1 = - 1 ;
try {
part0 = partitionthreadexecutor . submit ( new partitionthread ( this , 0 , p , 0 ) ) ;
} catch ( RejectedExecutionException e ) {
part0 = null ;
try { p0 = new partitionthread ( this , 0 , p , 0 ) . call ( ) . intValue ( ) ; } catch ( Exception ee ) { }
}
try {
part1 = partitionthreadexecutor . submit ( new partitionthread ( this , p , this . chunkcount , p ) ) ;
} catch ( RejectedExecutionException e ) {
part1 = null ;
try { p1 = new partitionthread ( this , p , this . chunkcount , p ) . call ( ) . intValue ( ) ; } catch ( Exception ee ) { }
}
try {
try {
final int p0 = part0 . get ( ) . intValue ( ) ;
if ( part0 ! = null ) p0 = part0 . get ( ) . intValue ( ) ;
final Future < Object > sort0 = sortingthreadexecutor . submit ( new qsortthread ( this , 0 , p0 , 0 ) ) ;
Future < Object > sort0 , sort1 , sort2 , sort3 ;
final Future < Object > sort1 = sortingthreadexecutor . submit ( new qsortthread ( this , p0 , p , p0 ) ) ;
try {
final int p1 = part1 . get ( ) . intValue ( ) ;
sort0 = sortingthreadexecutor . submit ( new qsortthread ( this , 0 , p0 , 0 ) ) ;
final Future < Object > sort2 = sortingthreadexecutor . submit ( new qsortthread ( this , p , p1 , p ) ) ;
} catch ( RejectedExecutionException e ) {
final Future < Object > sort3 = sortingthreadexecutor . submit ( new qsortthread ( this , p1 , this . chunkcount , p1 ) ) ;
sort0 = null ;
sort0 . get ( ) ;
try { new qsortthread ( this , 0 , p0 , 0 ) . call ( ) ; } catch ( Exception ee ) { }
sort1 . get ( ) ;
}
sort2 . get ( ) ;
try {
sort3 . get ( ) ;
sort1 = sortingthreadexecutor . submit ( new qsortthread ( this , p0 , p , p0 ) ) ;
} catch ( RejectedExecutionException e ) {
sort1 = null ;
try { new qsortthread ( this , p0 , p , p0 ) . call ( ) ; } catch ( Exception ee ) { }
}
if ( part1 ! = null ) p1 = part1 . get ( ) . intValue ( ) ;
try {
sort2 = sortingthreadexecutor . submit ( new qsortthread ( this , p , p1 , p ) ) ;
} catch ( RejectedExecutionException e ) {
sort2 = null ;
try { new qsortthread ( this , p , p1 , p ) . call ( ) ; } catch ( Exception ee ) { }
}
try {
sort3 = sortingthreadexecutor . submit ( new qsortthread ( this , p1 , this . chunkcount , p1 ) ) ;
} catch ( RejectedExecutionException e ) {
sort3 = null ;
try { new qsortthread ( this , p1 , this . chunkcount , p1 ) . call ( ) ; } catch ( Exception ee ) { }
}
// wait for all results
if ( sort0 ! = null ) sort0 . get ( ) ;
if ( sort1 ! = null ) sort1 . get ( ) ;
if ( sort2 ! = null ) sort2 . get ( ) ;
if ( sort3 ! = null ) sort3 . get ( ) ;
} catch ( final InterruptedException e ) {
} catch ( final InterruptedException e ) {
Log . logSevere ( "RowCollection" , "" , e ) ;
Log . logSevere ( "RowCollection" , "" , e ) ;
} catch ( final ExecutionException e ) {
} catch ( final ExecutionException e ) {