@ -41,7 +41,9 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException ;
import java.util.concurrent.ExecutorCompletionService ;
import java.util.concurrent.ExecutorService ;
import java.util.concurrent.Executors ;
import java.util.concurrent.Future ;
import java.util.concurrent.FutureTask ;
import java.util.concurrent.LinkedBlockingQueue ;
import java.util.concurrent.RejectedExecutionException ;
import java.util.concurrent.ThreadPoolExecutor ;
@ -561,7 +563,7 @@ public class ArrayStack implements BLOB {
if ( bi . blob . containsKey ( key ) ) return bi ;
return null ;
}
// first check the current blob only because that has most probably the key if any has that key
int bs1 = this . blobs . size ( ) - 1 ;
blobItem bi = this . blobs . get ( bs1 ) ;
@ -572,7 +574,7 @@ public class ArrayStack implements BLOB {
if ( bi . blob . containsKey ( key ) ) return bi ;
return null ;
}
// start a concurrent query to database tables
final CompletionService < blobItem > cs = new ExecutorCompletionService < blobItem > ( this . executor ) ;
int accepted = 0 ;
@ -839,30 +841,35 @@ public class ArrayStack implements BLOB {
final blobItem bi = this . blobs . get ( 0 ) ;
bi . blob . delete ( key ) ;
} else {
final Thread [ ] t = new Thread [ this . blobs . size ( ) - 1 ] ;
@SuppressWarnings ( "unchecked" )
final FutureTask < Boolean > [ ] t = new FutureTask [ this . blobs . size ( ) - 1 ] ;
int i = 0 ;
for ( final blobItem bi : this . blobs ) {
if ( i < t . length ) {
// run this in a concurrent thread
final blobItem bi0 = bi ;
t [ i ] = new Thread ( ) {
t [ i ] = new FutureTask< Boolean > ( new Callable < Boolean > ( ) {
@Override
public void run ( ) {
public Boolean call ( ) {
try { bi0 . blob . delete ( key ) ; } catch ( final IOException e ) { }
return true ;
}
} ;
t[ i ] . start ( ) ;
} ) ;
DELETE_EXECUTOR. execute ( t [ i ] ) ;
} else {
// no additional thread, run in this thread
try { bi . blob . delete ( key ) ; } catch ( final IOException e ) { }
}
i + + ;
}
for ( final Thread s : t ) try { s . join ( ) ; } catch ( final InterruptedException e ) { }
// wait for termination
for ( final FutureTask < Boolean > s : t ) try { s . get ( ) ; } catch ( final InterruptedException e ) { } catch ( ExecutionException e ) { }
}
assert mem ( ) < = m : "m = " + m + ", mem() = " + mem ( ) ;
}
private static final ExecutorService DELETE_EXECUTOR = Executors . newFixedThreadPool ( 128 ) ;
/ * *
* close the BLOB
* /