@ -48,10 +48,10 @@ public class kelondroCollectionIndex {
private static final int idx_col_key = 0 ; // the index
private static final int idx_col_chunksize = 1 ; // chunksize (number of bytes in a single chunk, needed for migration option)
private static final int idx_col_chunkcount = 2 ; // chunkcount (number of chunks in this collection) needed to identify array file that has the chunks
private static final int idx_col_chunkcount = 2 ; // chunkcount (number of chunks in this collection)
private static final int idx_col_clusteridx = 3 ; // selector for right cluster file, must be >= arrayIndex(chunkcount)
private static final int idx_col_flags = 4 ; // flags (for future use)
private static final int idx_col_indexpos = 5 ; // indexpos (position in index file)
private static final int idx_col_indexpos = 5 ; // indexpos (position in array file)
private static final int idx_col_lastread = 6 ; // a time stamp, update time in days since 1.1.2000
private static final int idx_col_lastwrote = 7 ; // a time stamp, update time in days since 1.1.2000
@ -139,13 +139,17 @@ public class kelondroCollectionIndex {
if ( ( index ! = null ) & & ( indexGeneration ) ) {
// loop over all elements in array and create index entry for each row
kelondroRow . Entry aentry , ientry ;
kelondroRow . EntryIndex aentry ;
kelondroRow . Entry ientry ;
Iterator ei = array . contentRows ( - 1 ) ;
byte [ ] key ;
long start = System . currentTimeMillis ( ) ;
long lastlog = start ;
for ( int j = 0 ; j < array . USAGE . allCount ( ) ; j + + ) {
aentry = array . get ( j ) ;
int count = 0 ;
while ( ei . hasNext ( ) ) {
aentry = ( kelondroRow . EntryIndex ) ei . next ( ) ;
key = aentry . getColBytes ( 0 ) ;
assert ( key ! = null ) ;
if ( key = = null ) continue ; // skip deleted entries
kelondroRowSet indexrows = new kelondroRowSet ( this . playloadrow , aentry . getColBytes ( 1 ) ) ;
ientry = irow . newEntry ( ) ;
@ -154,14 +158,15 @@ public class kelondroCollectionIndex {
ientry . setCol ( idx_col_chunkcount , indexrows . size ( ) ) ;
ientry . setCol ( idx_col_clusteridx , ( byte ) partitionNumber ) ;
ientry . setCol ( idx_col_flags , ( byte ) 0 ) ;
ientry . setCol ( idx_col_indexpos , j ) ;
ientry . setCol ( idx_col_indexpos , aentry. index ( ) ) ;
ientry . setCol ( idx_col_lastread , t ) ;
ientry . setCol ( idx_col_lastwrote , t ) ;
index . addUnique ( ientry ) ;
count + + ;
// write a log
if ( System . currentTimeMillis ( ) - lastlog > 30000 ) {
serverLog . logFine ( "STARTUP" , "created " + j + " RWI index entries. " + ( ( ( System . currentTimeMillis ( ) - start ) * ( array . USAGE . allCount ( ) - j) / ( ( j = = 0 ) ? 1 : j ) ) / 60000 ) + " minutes remaining for this array" ) ;
serverLog . logFine ( "STARTUP" , "created " + count + " RWI index entries. " + ( ( ( System . currentTimeMillis ( ) - start ) * ( array . USAGE . allCount ( ) - count) / count ) / 60000 ) + " minutes remaining for this array" ) ;
lastlog = System . currentTimeMillis ( ) ;
}
}
@ -235,42 +240,56 @@ public class kelondroCollectionIndex {
}
}
public int size ( ) throws IOException {
public synchronized int size ( ) throws IOException {
return index . size ( ) ;
}
public void put ( byte [ ] key , kelondroRowCollection collection ) throws IOException , kelondroOutOfLimitsException {
public synchronized void put ( byte [ ] key , kelondroRowCollection collection ) throws IOException , kelondroOutOfLimitsException {
// this replaces an old collection by a new one
// this method is not approriate to extend an existing collection with another collection
putmergeremove ( key , collection , false , null , false );
putmergeremove ( key , collection , false , null );
}
public void merge ( byte [ ] key , kelondroRowCollection collection ) throws IOException , kelondroOutOfLimitsException {
putmergeremove ( key , collection , true , null , false );
public synchronized void merge ( byte [ ] key , kelondroRowCollection collection ) throws IOException , kelondroOutOfLimitsException {
putmergeremove ( key , collection , true , null );
}
public int remove ( byte [ ] key , Set removekeys , boolean deletecomplete ) throws IOException , kelondroOutOfLimitsException {
return putmergeremove ( key , null , false , removekeys , deletecomplete );
public synchronized int remove ( byte [ ] key , Set removekeys , boolean deletecomplete ) throws IOException , kelondroOutOfLimitsException {
return putmergeremove ( key , null , false , removekeys );
}
private int putmergeremove ( byte [ ] key , kelondroRowCollection collection , boolean merge , Set removekeys , boolean deletecomplete ) throws IOException , kelondroOutOfLimitsException {
private int putmergeremove ( byte [ ] key , kelondroRowCollection collection , boolean merge , Set removekeys ) throws IOException , kelondroOutOfLimitsException {
//if (collection.size() > maxChunks) throw new kelondroOutOfLimitsException(maxChunks, collection.size());
if ( ( ! merge ) & & ( removekeys ! = null ) & & ( collection ! = null ) & & ( collection . size ( ) = = 0 ) ) {
// this is not a replacement, it is a deletion
delete ( key ) ;
return 0 ;
}
synchronized ( index ) {
// first find an old entry, if one exists
kelondroRow . Entry indexrow = index . get ( key ) ;
if ( indexrow = = null ) {
if ( ( collection ! = null ) & & ( collection . size ( ) > 0 ) ) {
// the collection is new
overwrite ( key , collection , arrayIndex ( collection . size ( ) ) , index . row ( ) . newEntry ( ) ) ;
int newPartitionNumber = arrayIndex ( collection . size ( ) ) ;
indexrow = index . row ( ) . newEntry ( ) ;
kelondroFixedWidthArray array = getArray ( newPartitionNumber , 0 , this . playloadrow . objectsize ( ) ) ;
// define row
kelondroRow . Entry arrayEntry = array . row ( ) . newEntry ( ) ;
arrayEntry . setCol ( 0 , key ) ;
arrayEntry . setCol ( 1 , collection . exportCollection ( ) ) ;
// write a new entry in this array
int newRowNumber = array . add ( arrayEntry ) ;
// store the new row number in the index
indexrow . setCol ( idx_col_key , key ) ;
indexrow . setCol ( idx_col_chunksize , this . playloadrow . objectsize ( ) ) ;
indexrow . setCol ( idx_col_chunkcount , collection . size ( ) ) ;
indexrow . setCol ( idx_col_clusteridx , ( byte ) newPartitionNumber ) ;
indexrow . setCol ( idx_col_flags , ( byte ) 0 ) ;
indexrow . setCol ( idx_col_indexpos , ( long ) newRowNumber ) ;
indexrow . setCol ( idx_col_lastread , kelondroRowCollection . daysSince2000 ( System . currentTimeMillis ( ) ) ) ;
indexrow . setCol ( idx_col_lastwrote , kelondroRowCollection . daysSince2000 ( System . currentTimeMillis ( ) ) ) ;
index . addUnique ( indexrow ) ;
}
return 0 ;
}
@ -286,7 +305,7 @@ public class kelondroCollectionIndex {
if ( merge ) {
// load the old collection and join it
kelondroRowSet oldcollection = getwithparams ( indexrow , oldchunksize , oldchunkcount , oldPartitionNumber , oldrownumber , oldSerialNumber , false , false );
kelondroRowSet oldcollection = getwithparams ( indexrow , oldchunksize , oldchunkcount , oldPartitionNumber , oldrownumber , oldSerialNumber , false );
// join with new collection
oldcollection . addAll ( collection ) ;
@ -298,7 +317,7 @@ public class kelondroCollectionIndex {
int removed = 0 ;
if ( removekeys ! = null ) {
// load the old collection and remove keys
kelondroRowSet oldcollection = getwithparams ( indexrow , oldchunksize , oldchunkcount , oldPartitionNumber , oldrownumber , oldSerialNumber , false , false );
kelondroRowSet oldcollection = getwithparams ( indexrow , oldchunksize , oldchunkcount , oldPartitionNumber , oldrownumber , oldSerialNumber , false );
// remove the keys from the set
Iterator i = removekeys . iterator ( ) ;
@ -309,20 +328,15 @@ public class kelondroCollectionIndex {
if ( ( k instanceof String ) & & ( oldcollection . remove ( ( ( String ) k ) . getBytes ( ) ) ! = null ) ) removed + + ;
}
oldcollection . shape ( ) ;
oldcollection . trim ( ) ;
collection = oldcollection ;
}
if ( collection . size ( ) = = 0 ) {
if ( deletecomplete ) {
// delete the index entry and the array
kelondroFixedWidthArray array = getArray ( oldPartitionNumber , oldSerialNumber , oldchunksize ) ;
array . remove ( oldrownumber ) ;
index . remove ( key ) ;
} else {
// update the index entry
indexrow . setCol ( idx_col_chunkcount , 0 ) ;
indexrow . setCol ( idx_col_lastwrote , kelondroRowCollection . daysSince2000 ( System . currentTimeMillis ( ) ) ) ;
index . put ( indexrow ) ;
}
return removed ;
}
@ -347,7 +361,6 @@ public class kelondroCollectionIndex {
// update the index entry
indexrow . setCol ( idx_col_chunkcount , collection . size ( ) ) ;
indexrow . setCol ( idx_col_clusteridx , ( byte ) oldPartitionNumber ) ;
indexrow . setCol ( idx_col_flags , ( byte ) 0 ) ;
indexrow . setCol ( idx_col_lastwrote , kelondroRowCollection . daysSince2000 ( System . currentTimeMillis ( ) ) ) ;
index . put ( indexrow ) ;
} else {
@ -359,18 +372,7 @@ public class kelondroCollectionIndex {
array . remove ( oldrownumber ) ;
// write a new entry in the other array
overwrite ( key , collection , newPartitionNumber , indexrow ) ;
}
return removed ;
}
}
private void overwrite ( byte [ ] key , kelondroRowCollection collection , int targetpartition , kelondroRow . Entry indexEntry ) throws IOException {
// helper method, should not be called directly and only within a synchronized(index) environment
// simply store a collection without check if the collection existed before
// find array file
kelondroFixedWidthArray array = getArray ( targetpartition , 0 , this . playloadrow . objectsize ( ) ) ;
array = getArray ( newPartitionNumber , 0 , this . playloadrow . objectsize ( ) ) ;
// define row
kelondroRow . Entry arrayEntry = array . row ( ) . newEntry ( ) ;
@ -381,46 +383,41 @@ public class kelondroCollectionIndex {
int newRowNumber = array . add ( arrayEntry ) ;
// store the new row number in the index
indexEntry . setCol ( idx_col_key , key ) ;
indexEntry . setCol ( idx_col_chunksize , this . playloadrow . objectsize ( ) ) ;
indexEntry . setCol ( idx_col_chunkcount , collection . size ( ) ) ;
indexEntry . setCol ( idx_col_clusteridx , ( byte ) targetpartition ) ;
indexEntry . setCol ( idx_col_flags , ( byte ) 0 ) ;
indexEntry . setCol ( idx_col_indexpos , ( long ) newRowNumber ) ;
indexEntry . setCol ( idx_col_lastread , kelondroRowCollection . daysSince2000 ( System . currentTimeMillis ( ) ) ) ;
indexEntry . setCol ( idx_col_lastwrote , kelondroRowCollection . daysSince2000 ( System . currentTimeMillis ( ) ) ) ;
index . put ( indexEntry ) ;
indexrow . setCol ( idx_col_key , key ) ;
indexrow . setCol ( idx_col_chunkcount , collection . size ( ) ) ;
indexrow . setCol ( idx_col_clusteridx , ( byte ) newPartitionNumber ) ;
indexrow . setCol ( idx_col_indexpos , ( long ) newRowNumber ) ;
indexrow . setCol ( idx_col_lastwrote , kelondroRowCollection . daysSince2000 ( System . currentTimeMillis ( ) ) ) ;
index . put ( indexrow ) ;
}
return removed ;
}
public int indexSize ( byte [ ] key ) throws IOException {
synchronized ( index ) {
public synchronized int indexSize ( byte [ ] key ) throws IOException {
kelondroRow . Entry indexrow = index . get ( key ) ;
if ( indexrow = = null ) return 0 ;
return ( int ) indexrow . getColLong ( idx_col_chunkcount ) ;
}
}
public kelondroRowSet get ( byte [ ] key , boolean deleteIfEmpty ) throws IOException {
public synchronized kelondroRowSet get ( byte [ ] key , boolean deleteIfEmpty ) throws IOException {
// find an entry, if one exists
synchronized ( index ) {
kelondroRow . Entry indexrow = index . get ( key ) ;
if ( indexrow = = null ) return null ;
return getdelete ( indexrow , false , deleteIfEmpty ) ;
}
kelondroRowSet col = getdelete ( indexrow , false ) ;
assert ( col ! = null ) ;
return col ;
}
public kelondroRowSet delete ( byte [ ] key ) throws IOException {
public synchronized kelondroRowSet delete ( byte [ ] key ) throws IOException {
// find an entry, if one exists
synchronized ( index ) {
kelondroRow . Entry indexrow = index . get ( key ) ;
kelondroRow . Entry indexrow = index . remove ( key ) ;
if ( indexrow = = null ) return null ;
kelondroRowSet removedCollection = getdelete ( indexrow , tru e, fals e) ;
index . remove ( key ) ;
kelondroRowSet removedCollection = getdelete ( indexrow , tru e) ;
assert ( removedCollection ! = null ) ;
return removedCollection ;
}
}
protected kelondroRowSet getdelete ( kelondroRow . Entry indexrow , boolean remove , boolean deleteIfEmpty ) throws IOException {
protected kelondroRowSet getdelete ( kelondroRow . Entry indexrow , boolean remove ) throws IOException {
// call this only within a synchronized(index) environment
// read values
@ -431,10 +428,10 @@ public class kelondroCollectionIndex {
assert ( partitionnumber > = arrayIndex ( chunkcount ) ) ;
int serialnumber = 0 ;
return getwithparams ( indexrow , chunksize , chunkcount , partitionnumber , rownumber , serialnumber , remove , deleteIfEmpty );
return getwithparams ( indexrow , chunksize , chunkcount , partitionnumber , rownumber , serialnumber , remove );
}
private kelondroRowSet getwithparams ( kelondroRow . Entry indexrow , int chunksize , int chunkcount , int clusteridx , int rownumber , int serialnumber , boolean remove , boolean deleteIfEmpty ) throws IOException {
private kelondroRowSet getwithparams ( kelondroRow . Entry indexrow , int chunksize , int chunkcount , int clusteridx , int rownumber , int serialnumber , boolean remove ) throws IOException {
// open array entry
kelondroFixedWidthArray array = getArray ( clusteridx , serialnumber , chunksize ) ;
kelondroRow . Entry arrayrow = array . get ( rownumber ) ;
@ -442,9 +439,10 @@ public class kelondroCollectionIndex {
// read the row and define a collection
kelondroRowSet collection = new kelondroRowSet ( this . playloadrow , arrayrow . getColBytes ( 1 ) ) ; // FIXME: this does not yet work with different rowdef in case of several rowdef.objectsize()
if ( index . order ( ) . compare ( arrayrow . getColBytes ( 0 ) , indexrow . getColBytes ( idx_col_key ) ) ! = 0 ) {
byte [ ] key = indexrow . getColBytes ( idx_col_key ) ;
if ( index . order ( ) . compare ( arrayrow . getColBytes ( 0 ) , key ) ! = 0 ) {
// check if we got the right row; this row is wrong. Fix it:
index . remove ( indexrow. getColBytes ( idx_col_ key) ) ; // the wrong row cannot be fixed
index . remove ( key) ; // the wrong row cannot be fixed
// store the row number in the index; this may be a double-entry, but better than nothing
kelondroRow . Entry indexEntry = index . row ( ) . newEntry ( ) ;
indexEntry . setCol ( idx_col_key , arrayrow . getColBytes ( 0 ) ) ;
@ -465,11 +463,11 @@ public class kelondroCollectionIndex {
index . put ( indexrow ) ;
array . logFailure ( "INCONSISTENCY in " + arrayFile ( this . path , this . filenameStub , this . loadfactor , chunksize , clusteridx , serialnumber ) . toString ( ) + ": array has different chunkcount than index: index = " + chunkcount + ", array = " + chunkcountInArray + "; the index has been auto-fixed" ) ;
}
if ( ( remove ) | | ( ( collection . size ( ) = = 0 ) & & ( deleteIfEmpty ) ) ) array . remove ( rownumber ) ;
if ( remove ) array . remove ( rownumber ) ; // index is removed in calling method
return collection ;
}
public Iterator keycollections ( byte [ ] startKey , boolean rot ) {
public synchronized Iterator keycollections ( byte [ ] startKey , boolean rot ) {
// returns an iteration of {byte[], kelondroRowSet} Objects
try {
return new keycollectionIterator ( startKey , rot ) ;
@ -494,9 +492,10 @@ public class kelondroCollectionIndex {
public Object next ( ) {
kelondroRow . Entry indexrow = ( kelondroRow . Entry ) indexRowIterator . next ( ) ;
assert ( indexrow ! = null ) ;
if ( indexrow = = null ) return null ;
try {
return new Object [ ] { indexrow . getColBytes ( 0 ) , getdelete ( indexrow , false , false )} ;
return new Object [ ] { indexrow . getColBytes ( 0 ) , getdelete ( indexrow , false )} ;
} catch ( IOException e ) {
e . printStackTrace ( ) ;
return null ;
@ -509,15 +508,13 @@ public class kelondroCollectionIndex {
}
public void close ( ) throws IOException {
synchronized ( index ) {
public synchronized void close ( ) throws IOException {
this . index . close ( ) ;
Iterator i = arrays . values ( ) . iterator ( ) ;
while ( i . hasNext ( ) ) {
( ( kelondroFixedWidthArray ) i . next ( ) ) . close ( ) ;
}
}
}
public static void main ( String [ ] args ) {