@ -28,13 +28,14 @@ package de.anomic.kelondro.table;
import java.io.File ;
import java.io.IOException ;
import java.text.ParseException ;
import java.util.ArrayList ;
import java.util.Calendar ;
import java.util.Date ;
import java.util.HashMap ;
import java.util.Iterator ;
import java.util.List ;
import java.util.Map ;
import java.util.Random ;
import java.util.concurrent.Callable ;
import java.util.concurrent.CompletionService ;
import java.util.concurrent.ExecutionException ;
@ -46,6 +47,7 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor ;
import java.util.concurrent.TimeUnit ;
import de.anomic.kelondro.blob.BLOBArray ;
import de.anomic.kelondro.blob.Cache ;
import de.anomic.kelondro.index.Column ;
import de.anomic.kelondro.index.ObjectIndexCache ;
@ -57,6 +59,7 @@ import de.anomic.kelondro.order.NaturalOrder;
import de.anomic.kelondro.order.MergeIterator ;
import de.anomic.kelondro.order.Order ;
import de.anomic.kelondro.order.StackIterator ;
import de.anomic.kelondro.util.DateFormatter ;
import de.anomic.kelondro.util.FileUtils ;
import de.anomic.kelondro.util.Log ;
import de.anomic.kelondro.util.NamePrefixThreadFactory ;
@ -76,18 +79,42 @@ public class SplitTable implements ObjectIndex {
private HashMap < String , ObjectIndex > tables ; // a map from a date string to a kelondroIndex object
private final Row rowdef ;
private final File path ;
private final String tablename ;
private final String prefix ;
private final Order < Row . Entry > entryOrder ;
private String current ;
private long fileAgeLimit ;
private long fileSizeLimit ;
public SplitTable ( final File path , final String tablename , final Row rowdef , final boolean resetOnFail ) {
public SplitTable (
final File path ,
final String tablename ,
final Row rowdef ,
final boolean resetOnFail ) {
this ( path , tablename , rowdef , BLOBArray . oneMonth , BLOBArray . oneGigabyte , resetOnFail ) ;
}
public SplitTable (
final File path ,
final String tablename ,
final Row rowdef ,
final long fileAgeLimit ,
final long fileSizeLimit ,
final boolean resetOnFail ) {
this . path = path ;
this . tablename = tablename ;
this . prefix = tablename ;
this . rowdef = rowdef ;
this . fileAgeLimit = fileAgeLimit ;
this . fileSizeLimit = fileSizeLimit ;
this . entryOrder = new Row . EntryComparator ( rowdef . objectOrder ) ;
init ( resetOnFail ) ;
}
String newFilename ( ) {
return prefix + "." + DateFormatter . formatShortMilliSecond ( new Date ( ) ) + ".table" ;
}
public void init ( final boolean resetOnFail ) {
current = null ;
// init the thread pool for the keeperOf executor service
this . executor = new ThreadPoolExecutor (
@ -95,23 +122,48 @@ public class SplitTable implements ObjectIndex {
Runtime . getRuntime ( ) . availableProcessors ( ) + 1 , 10 ,
TimeUnit . SECONDS ,
new LinkedBlockingQueue < Runnable > ( ) ,
new NamePrefixThreadFactory ( tablename ) ) ;
new NamePrefixThreadFactory ( prefix ) ) ;
// initialized tables map
this . tables = new HashMap < String , ObjectIndex > ( ) ;
if ( ! ( path . exists ( ) ) ) path . mkdirs ( ) ;
final String [ ] tablefile = path . list ( ) ;
String date ;
String [ ] tablefile = path . list ( ) ;
// zero pass: migrate old table names
File f ;
Random r = new Random ( System . currentTimeMillis ( ) ) ;
for ( int i = 0 ; i < tablefile . length ; i + + ) {
if ( ( tablefile [ i ] . startsWith ( prefix ) ) & &
( tablefile [ i ] . charAt ( prefix . length ( ) ) = = '.' ) & &
( tablefile [ i ] . length ( ) = = prefix . length ( ) + 7 ) ) {
f = new File ( path , tablefile [ i ] ) ;
String newname = tablefile [ i ] + "0100000" + ( Long . toString ( r . nextLong ( ) ) + "00000" ) . substring ( 1 , 5 ) + ".table" ;
f . renameTo ( new File ( path , newname ) ) ;
}
}
tablefile = path . list ( ) ;
// first pass: find tables
final HashMap < String , Long > t = new HashMap < String , Long > ( ) ;
long ram , sum = 0 ;
File f ;
long ram , sum = 0 , time , maxtime = 0 ;
Date d ;
for ( int i = 0 ; i < tablefile . length ; i + + ) {
if ( ( tablefile [ i ] . startsWith ( tablename ) ) & &
( tablefile [ i ] . charAt ( tablename . length ( ) ) = = '.' ) & &
( tablefile [ i ] . length ( ) = = tablename . length ( ) + 7 ) ) {
if ( ( tablefile [ i ] . startsWith ( prefix ) ) & &
( tablefile [ i ] . charAt ( prefix . length ( ) ) = = '.' ) & &
( tablefile [ i ] . length ( ) = = prefix. length ( ) + 24 ) ) {
f = new File ( path , tablefile [ i ] ) ;
try {
d = DateFormatter . parseShortMilliSecond ( tablefile [ i ] . substring ( prefix . length ( ) + 1 , prefix . length ( ) + 18 ) ) ;
} catch ( ParseException e ) {
e . printStackTrace ( ) ;
continue ;
}
time = d . getTime ( ) ;
if ( time > maxtime ) {
current = tablefile [ i ] ;
maxtime = time ;
}
if ( f . isDirectory ( ) ) {
ram = FlexTable . staticRAMIndexNeed ( path , tablefile [ i ] , rowdef ) ;
} else {
@ -146,8 +198,7 @@ public class SplitTable implements ObjectIndex {
// open next biggest table
t . remove ( maxf ) ;
if ( maxf ! = null ) {
date = maxf . substring ( tablename . length ( ) + 1 ) ;
if ( maxf ! = null ) {
f = new File ( path , maxf ) ;
if ( f . isDirectory ( ) ) {
// this is a kelonodroFlex table
@ -156,7 +207,7 @@ public class SplitTable implements ObjectIndex {
}
Log . logInfo ( "kelondroSplitTable" , "opening partial eco table " + f ) ;
table = new EcoTable ( f , rowdef , EcoTable . tailCacheUsageAuto , EcoFSBufferSize , 0 ) ;
tables . put ( date , table ) ;
tables . put ( maxf , table ) ;
}
}
}
@ -165,7 +216,7 @@ public class SplitTable implements ObjectIndex {
this . close ( ) ;
final String [ ] l = path . list ( ) ;
for ( int i = 0 ; i < l . length ; i + + ) {
if ( l [ i ] . startsWith ( tablename ) ) {
if ( l [ i ] . startsWith ( prefix ) ) {
final File f = new File ( path , l [ i ] ) ;
if ( f . isDirectory ( ) ) FlexWidthArray . delete ( path , l [ i ] ) ; else FileUtils . deletedelete ( f ) ;
}
@ -174,24 +225,7 @@ public class SplitTable implements ObjectIndex {
}
public String filename ( ) {
return new File ( path , tablename ) . toString ( ) ;
}
private static final Calendar thisCalendar = Calendar . getInstance ( ) ;
public static final String dateSuffix ( final Date date ) {
int month , year ;
final StringBuilder suffix = new StringBuilder ( 6 ) ;
synchronized ( thisCalendar ) {
thisCalendar . setTime ( date ) ;
month = thisCalendar . get ( Calendar . MONTH ) + 1 ;
year = thisCalendar . get ( Calendar . YEAR ) ;
}
if ( ( year < 1970 ) & & ( year > = 70 ) ) suffix . append ( "19" ) . append ( Integer . toString ( year ) ) ;
else if ( year < 1970 ) suffix . append ( "20" ) . append ( Integer . toString ( year ) ) ;
else if ( year > 3000 ) return null ;
else suffix . append ( Integer . toString ( year ) ) ;
if ( month < 10 ) suffix . append ( "0" ) . append ( Integer . toString ( month ) ) ; else suffix . append ( Integer . toString ( month ) ) ;
return new String ( suffix ) ;
return new File ( path , prefix ) . toString ( ) ;
}
public int size ( ) {
@ -223,61 +257,50 @@ public class SplitTable implements ObjectIndex {
return keeper . get ( key ) ;
}
private ObjectIndex newTable ( ) {
this . current = newFilename ( ) ;
final File f = new File ( path , this . current ) ;
EcoTable table = new EcoTable ( f , rowdef , EcoTable . tailCacheDenyUsage , EcoFSBufferSize , 0 ) ;
tables . put ( this . current , table ) ;
return table ;
}
private ObjectIndex checkTable ( ObjectIndex table ) {
// check size and age of given table; in case it is too large or too old
// create a new table
String name = new File ( table . filename ( ) ) . getName ( ) ;
long d ;
try {
d = DateFormatter . parseShortMilliSecond ( name . substring ( prefix . length ( ) + 1 , prefix . length ( ) + 18 ) ) . getTime ( ) ;
} catch ( ParseException e ) {
e . printStackTrace ( ) ;
d = 0 ;
}
if ( d + this . fileAgeLimit < System . currentTimeMillis ( ) | | new File ( this . path , name ) . length ( ) > = this . fileSizeLimit ) {
return newTable ( ) ;
}
return table ;
}
public synchronized void put ( final List < Row . Entry > rows ) throws IOException {
throw new UnsupportedOperationException ( "not yet implemented" ) ;
for ( Row . Entry entry : rows ) put ( entry ) ;
}
public synchronized Row . Entry replace ( final Row . Entry row ) throws IOException {
assert row . objectsize ( ) < = this . rowdef . objectsize ;
final ObjectIndex keeper = keeperOf ( row . getColBytes ( 0 ) ) ;
ObjectIndex keeper = keeperOf ( row . getColBytes ( 0 ) ) ;
if ( keeper ! = null ) return keeper . replace ( row ) ;
Date entryDate = new Date ( ) ;
final String suffix = dateSuffix ( entryDate ) ;
if ( suffix = = null ) return null ;
ObjectIndex table = tables . get ( suffix ) ;
if ( table = = null ) {
// open table
final File f = new File ( path , tablename + "." + suffix ) ;
if ( f . exists ( ) ) {
if ( f . isDirectory ( ) ) {
FlexTable . delete ( path , tablename + "." + suffix ) ;
}
// open a eco table
table = new EcoTable ( f , rowdef , EcoTable . tailCacheDenyUsage , EcoFSBufferSize , 0 ) ;
} else {
// make new table
table = new EcoTable ( f , rowdef , EcoTable . tailCacheDenyUsage , EcoFSBufferSize , 0 ) ;
}
tables . put ( suffix , table ) ;
}
table . put ( row ) ;
keeper = ( this . current = = null ) ? newTable ( ) : checkTable ( this . tables . get ( this . current ) ) ;
keeper . put ( row ) ;
return null ;
}
public synchronized void put ( final Row . Entry row ) throws IOException {
assert row . objectsize ( ) < = this . rowdef . objectsize ;
final ObjectIndex keeper = keeperOf ( row . getColBytes ( 0 ) ) ;
ObjectIndex keeper = keeperOf ( row . getColBytes ( 0 ) ) ;
if ( keeper ! = null ) { keeper . put ( row ) ; return ; }
Date entryDate = new Date ( ) ;
final String suffix = dateSuffix ( entryDate ) ;
if ( suffix = = null ) return ;
ObjectIndex table = tables . get ( suffix ) ;
if ( table = = null ) {
// open table
final File f = new File ( path , tablename + "." + suffix ) ;
if ( f . exists ( ) ) {
if ( f . isDirectory ( ) ) {
FlexTable . delete ( path , tablename + "." + suffix ) ;
}
// open a eco table
table = new EcoTable ( f , rowdef , EcoTable . tailCacheDenyUsage , EcoFSBufferSize , 0 ) ;
} else {
// make new table
table = new EcoTable ( f , rowdef , EcoTable . tailCacheDenyUsage , EcoFSBufferSize , 0 ) ;
}
tables . put ( suffix , table ) ;
}
table . put ( row ) ;
keeper = ( this . current = = null ) ? newTable ( ) : checkTable ( this . tables . get ( this . current ) ) ;
keeper . put ( row ) ;
}
public synchronized ObjectIndex keeperOf ( final byte [ ] key ) {
@ -329,15 +352,8 @@ public class SplitTable implements ObjectIndex {
public synchronized void addUnique ( final Row . Entry row ) throws IOException {
assert row . objectsize ( ) < = this . rowdef . objectsize ;
Date entryDate = new Date ( ) ;
final String suffix = dateSuffix ( entryDate ) ;
if ( suffix = = null ) return ;
ObjectIndex table = tables . get ( suffix ) ;
if ( table = = null ) {
// make new table
table = new EcoTable ( new File ( path , tablename + "." + suffix ) , rowdef , EcoTable . tailCacheDenyUsage , EcoFSBufferSize , 0 ) ;
tables . put ( suffix , table ) ;
}
ObjectIndex table = ( this . current = = null ) ? null : tables . get ( this . current ) ;
if ( table = = null ) table = newTable ( ) ; else table = checkTable ( table ) ;
table . addUnique ( row ) ;
}
@ -401,26 +417,6 @@ public class SplitTable implements ObjectIndex {
}
return StackIterator . stack ( c ) ;
}
public final int cacheObjectChunkSize ( ) {
// dummy method
return - 1 ;
}
public long [ ] cacheObjectStatus ( ) {
// dummy method
return null ;
}
public final int cacheNodeChunkSize ( ) {
// returns the size that the node cache uses for a single entry
return - 1 ;
}
public final int [ ] cacheNodeStatus ( ) {
// a collection of different node cache status values
return new int [ ] { 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 } ;
}
public synchronized void close ( ) {
if ( tables = = null ) return ;
@ -436,10 +432,6 @@ public class SplitTable implements ObjectIndex {
}
this . tables = null ;
}
public static void main ( final String [ ] args ) {
System . out . println ( dateSuffix ( new Date ( ) ) ) ;
}
public void deleteOnExit ( ) {
for ( ObjectIndex i : this . tables . values ( ) ) i . deleteOnExit ( ) ;