@ -36,8 +36,18 @@ import java.util.HashSet;
import java.util.Iterator ;
import java.util.List ;
import java.util.Map ;
import java.util.concurrent.ArrayBlockingQueue ;
import java.util.concurrent.Callable ;
import java.util.concurrent.CompletionService ;
import java.util.concurrent.ExecutionException ;
import java.util.concurrent.ExecutorCompletionService ;
import java.util.concurrent.ExecutorService ;
import java.util.concurrent.Future ;
import java.util.concurrent.ThreadPoolExecutor ;
import java.util.concurrent.TimeUnit ;
import de.anomic.server.serverMemory ;
import de.anomic.server.serverProcessor ;
public class kelondroSplitTable implements kelondroIndex {
@ -47,6 +57,10 @@ public class kelondroSplitTable implements kelondroIndex {
private static final long minimumRAM4Eco = 80 * 1024 * 1024 ;
private static final int EcoFSBufferSize = 20 ;
private static final kelondroIndex dummyIndex = new kelondroRAMIndex ( new kelondroRow ( new kelondroColumn [ ] { new kelondroColumn ( "key" , kelondroColumn . celltype_binary , kelondroColumn . encoder_bytes , 2 , "key" ) } , kelondroNaturalOrder . naturalOrder , 0 ) , 0 ) ;
// the thread pool for the keeperOf executor service
private ExecutorService executor ;
private HashMap < String , kelondroIndex > tables ; // a map from a date string to a kelondroIndex object
private kelondroRow rowdef ;
@ -63,6 +77,9 @@ public class kelondroSplitTable implements kelondroIndex {
}
public void init ( boolean resetOnFail ) {
// init the thread pool for the keeperOf executor service
this . executor = new ThreadPoolExecutor ( serverProcessor . useCPU + 1 , serverProcessor . useCPU + 1 , 10 , TimeUnit . SECONDS , new ArrayBlockingQueue < Runnable > ( serverProcessor . useCPU + 1 ) ) ;
// initialized tables map
this . tables = new HashMap < String , kelondroIndex > ( ) ;
@ -174,12 +191,9 @@ public class kelondroSplitTable implements kelondroIndex {
}
public int writeBufferSize ( ) {
Iterator < kelondroIndex > i = tables . values ( ) . iterator ( ) ;
int s = 0 ;
kelondroIndex ki ;
while ( i . hasNext ( ) ) {
ki = ( ( kelondroIndex ) i . next ( ) ) ;
if ( ki instanceof kelondroCache ) s + = ( ( kelondroCache ) ki ) . writeBufferSize ( ) ;
for ( final kelondroIndex index : tables . values ( ) ) {
if ( index instanceof kelondroCache ) s + = ( ( kelondroCache ) index ) . writeBufferSize ( ) ;
}
return s ;
}
@ -189,19 +203,13 @@ public class kelondroSplitTable implements kelondroIndex {
}
public boolean has ( byte [ ] key ) throws IOException {
Iterator < kelondroIndex > i = tables . values ( ) . iterator ( ) ;
kelondroIndex table ;
while ( i . hasNext ( ) ) {
table = ( kelondroIndex ) i . next ( ) ;
if ( table . has ( key ) ) return true ;
}
return false ;
return keeperOf ( key ) ! = null ;
}
public synchronized kelondroRow . Entry get ( byte [ ] key ) throws IOException {
Object[ ] keeper = keeperOf ( key ) ;
kelondroIndex keeper = keeperOf ( key ) ;
if ( keeper = = null ) return null ;
return ( kelondroRow . Entry ) keeper [ 1 ] ;
return keeper . get ( key ) ;
}
public synchronized void putMultiple ( List < kelondroRow . Entry > rows ) throws IOException {
@ -214,8 +222,8 @@ public class kelondroSplitTable implements kelondroIndex {
public synchronized kelondroRow . Entry put ( kelondroRow . Entry row , Date entryDate ) throws IOException {
assert row . objectsize ( ) < = this . rowdef . objectsize ;
Object[ ] keeper = keeperOf ( row . getColBytes ( 0 ) ) ;
if ( keeper ! = null ) return ( ( kelondroIndex ) keeper [ 0 ] ) . put ( row ) ;
kelondroIndex keeper = keeperOf ( row . getColBytes ( 0 ) ) ;
if ( keeper ! = null ) return keeper . put ( row ) ;
if ( ( entryDate = = null ) | | ( entryDate . after ( new Date ( ) ) ) ) entryDate = new Date ( ) ; // fix date
String suffix = dateSuffix ( entryDate ) ;
if ( suffix = = null ) return null ;
@ -247,17 +255,62 @@ public class kelondroSplitTable implements kelondroIndex {
return null ;
}
public synchronized Object [ ] keeperOf ( byte [ ] key ) throws IOException {
Iterator < kelondroIndex > i = tables . values ( ) . iterator ( ) ;
kelondroIndex table ;
kelondroRow . Entry entry ;
while ( i . hasNext ( ) ) {
table = ( kelondroIndex ) i . next ( ) ;
entry = table . get ( key ) ;
if ( entry ! = null ) return new Object [ ] { table , entry } ;
public synchronized kelondroIndex keeperOf ( final byte [ ] key ) throws IOException {
// because the index is stored only in one table,
// and the index is completely in RAM, a concurrency will create
// not concurrent File accesses
//long start = System.currentTimeMillis();
// start a concurrent query to database tables
CompletionService < kelondroIndex > cs = new ExecutorCompletionService < kelondroIndex > ( executor ) ;
int s = tables . size ( ) ;
for ( final kelondroIndex table : tables . values ( ) ) {
cs . submit ( new Callable < kelondroIndex > ( ) {
public kelondroIndex call ( ) {
try {
if ( table . has ( key ) ) return table ; else return dummyIndex ;
} catch ( IOException e ) {
return dummyIndex ;
}
}
} ) ;
}
// read the result
try {
for ( int i = 0 ; i < s ; i + + ) {
Future < kelondroIndex > f = cs . take ( ) ;
kelondroIndex index = f . get ( ) ;
if ( index ! = dummyIndex ) {
//System.out.println("*DEBUG SplitTable success.time = " + (System.currentTimeMillis() - start) + " ms");
return index ;
}
}
//System.out.println("*DEBUG SplitTable fail.time = " + (System.currentTimeMillis() - start) + " ms");
return null ;
} catch ( InterruptedException e ) {
Thread . currentThread ( ) . interrupt ( ) ;
} catch ( ExecutionException e ) {
throw new RuntimeException ( e . getCause ( ) ) ;
}
//System.out.println("*DEBUG SplitTable fail.time = " + (System.currentTimeMillis() - start) + " ms");
return null ;
}
/ *
public synchronized kelondroIndex keeperOf ( byte [ ] key ) throws IOException {
// TODO: apply concurrency here!
// because the index is stored only in one table,
// and the index is completely in RAM, a concurrency would create not concurrent File accesses
long start = System . currentTimeMillis ( ) ;
for ( final kelondroIndex table : tables . values ( ) ) {
if ( table . has ( key ) ) {
System . out . println ( "*DEBUG SplitTable success.time = " + ( System . currentTimeMillis ( ) - start ) + " ms" ) ;
return table ;
}
}
System . out . println ( "*DEBUG SplitTable fail.time = " + ( System . currentTimeMillis ( ) - start ) + " ms" ) ;
return null ;
} * /
public synchronized void addUnique ( kelondroRow . Entry row ) throws IOException {
addUnique ( row , null ) ;
@ -303,15 +356,9 @@ public class kelondroSplitTable implements kelondroIndex {
}
public synchronized kelondroRow . Entry remove ( byte [ ] key , boolean keepOrder ) throws IOException {
Iterator < kelondroIndex > i = tables . values ( ) . iterator ( ) ;
kelondroIndex table ;
kelondroRow . Entry entry ;
while ( i . hasNext ( ) ) {
table = i . next ( ) ;
entry = table . remove ( key , keepOrder ) ;
if ( entry ! = null ) return entry ;
}
return null ;
kelondroIndex table = keeperOf ( key ) ;
if ( table = = null ) return null ;
return table . remove ( key , keepOrder ) ;
}
public synchronized kelondroRow . Entry removeOne ( ) throws IOException {
@ -372,11 +419,17 @@ public class kelondroSplitTable implements kelondroIndex {
public synchronized void close ( ) {
if ( tables = = null ) return ;
this . executor . shutdown ( ) ;
try {
this . executor . awaitTermination ( 3 , TimeUnit . SECONDS ) ;
} catch ( InterruptedException e ) {
}
this . executor = null ;
Iterator < kelondroIndex > i = tables . values ( ) . iterator ( ) ;
while ( i . hasNext ( ) ) {
i . next ( ) . close ( ) ;
}
tables = null ;
this . tables = null ;
}
public static void main ( String [ ] args ) {