@ -27,11 +27,15 @@ package de.anomic.kelondro;
import java.io.File ;
import java.io.IOException ;
import java.util.ArrayList ;
import java.util.Date ;
import java.util.HashMap ;
import java.util.Iterator ;
import java.util.List ;
import java.util.Map ;
import java.util.Set ;
import de.anomic.index.indexContainer ;
import de.anomic.server.serverFileUtils ;
import de.anomic.server.logging.serverLog ;
@ -265,159 +269,10 @@ public class kelondroCollectionIndex {
return 2 * m * this . payloadrow . objectsize ;
}
public synchronized void put ( byte [ ] key , kelondroRowCollection collection ) throws IOException , kelondroOutOfLimitsException {
// this replaces an old collection by a new one
// this method is not approriate to extend an existing collection with another collection
putmergeremove ( key , collection , false , null ) ;
}
public synchronized void merge ( byte [ ] key , kelondroRowCollection collection ) throws IOException , kelondroOutOfLimitsException {
putmergeremove ( key , collection , true , null ) ;
}
public synchronized int remove ( byte [ ] key , Set removekeys ) throws IOException , kelondroOutOfLimitsException {
return putmergeremove ( key , null , false , removekeys ) ;
}
private int putmergeremove ( byte [ ] key , kelondroRowCollection collection , boolean merge , Set removekeys ) throws IOException , kelondroOutOfLimitsException {
//if (collection.size() > maxChunks) throw new kelondroOutOfLimitsException(maxChunks, collection.size());
// first find an old entry, if one exists
kelondroRow . Entry indexrow = index . get ( key ) ;
if ( indexrow = = null ) {
if ( ( collection ! = null ) & & ( collection . size ( ) > 0 ) ) {
// the collection is new
int newPartitionNumber = arrayIndex ( collection . size ( ) ) ;
indexrow = index . row ( ) . newEntry ( ) ;
kelondroFixedWidthArray array = getArray ( newPartitionNumber , 0 , this . payloadrow . objectsize ( ) ) ;
// define row
kelondroRow . Entry arrayEntry = array . row ( ) . newEntry ( ) ;
arrayEntry . setCol ( 0 , key ) ;
arrayEntry . setCol ( 1 , collection . exportCollection ( ) ) ;
// write a new entry in this array
int newRowNumber = array . add ( arrayEntry ) ;
// store the new row number in the index
indexrow . setCol ( idx_col_key , key ) ;
indexrow . setCol ( idx_col_chunksize , this . payloadrow . objectsize ( ) ) ;
indexrow . setCol ( idx_col_chunkcount , collection . size ( ) ) ;
indexrow . setCol ( idx_col_clusteridx , ( byte ) newPartitionNumber ) ;
indexrow . setCol ( idx_col_flags , ( byte ) 0 ) ;
indexrow . setCol ( idx_col_indexpos , ( long ) newRowNumber ) ;
indexrow . setCol ( idx_col_lastread , kelondroRowCollection . daysSince2000 ( System . currentTimeMillis ( ) ) ) ;
indexrow . setCol ( idx_col_lastwrote , kelondroRowCollection . daysSince2000 ( System . currentTimeMillis ( ) ) ) ;
index . addUnique ( indexrow ) ;
}
return 0 ;
}
// overwrite the old collection
// read old information
int oldchunksize = ( int ) indexrow . getColLong ( idx_col_chunksize ) ; // needed only for migration
int oldchunkcount = ( int ) indexrow . getColLong ( idx_col_chunkcount ) ;
int oldrownumber = ( int ) indexrow . getColLong ( idx_col_indexpos ) ;
int oldPartitionNumber = ( int ) indexrow . getColByte ( idx_col_clusteridx ) ;
assert ( oldPartitionNumber > = arrayIndex ( oldchunkcount ) ) ;
int oldSerialNumber = 0 ;
if ( merge ) {
// load the old collection and join it
kelondroRowSet oldcollection = getwithparams ( indexrow , oldchunksize , oldchunkcount , oldPartitionNumber , oldrownumber , oldSerialNumber , false ) ;
// join with new collection
oldcollection . addAllUnique ( collection ) ;
oldcollection . shape ( ) ;
oldcollection . uniq ( ) ; // FIXME: not clear if it would be better to insert the collection with put to avoid double-entries
oldcollection . trim ( ) ;
collection = oldcollection ;
}
int removed = 0 ;
if ( removekeys ! = null ) {
// load the old collection and remove keys
kelondroRowSet oldcollection = getwithparams ( indexrow , oldchunksize , oldchunkcount , oldPartitionNumber , oldrownumber , oldSerialNumber , false ) ;
// remove the keys from the set
Iterator i = removekeys . iterator ( ) ;
Object k ;
while ( i . hasNext ( ) ) {
k = i . next ( ) ;
if ( ( k instanceof byte [ ] ) & & ( oldcollection . remove ( ( byte [ ] ) k ) ! = null ) ) removed + + ;
if ( ( k instanceof String ) & & ( oldcollection . remove ( ( ( String ) k ) . getBytes ( ) ) ! = null ) ) removed + + ;
}
oldcollection . shape ( ) ;
oldcollection . trim ( ) ;
collection = oldcollection ;
}
if ( collection . size ( ) = = 0 ) {
// delete the index entry and the array
kelondroFixedWidthArray array = getArray ( oldPartitionNumber , oldSerialNumber , oldchunksize ) ;
array . remove ( oldrownumber ) ;
index . remove ( key ) ;
return removed ;
}
int newPartitionNumber = arrayIndex ( collection . size ( ) ) ;
int newSerialNumber = 0 ;
// see if we need new space or if we can overwrite the old space
if ( oldPartitionNumber = = newPartitionNumber ) {
// we don't need a new slot, just write into the old one
// find array file
kelondroFixedWidthArray array = getArray ( newPartitionNumber , newSerialNumber , this . payloadrow . objectsize ( ) ) ;
// define row
kelondroRow . Entry arrayEntry = array . row ( ) . newEntry ( ) ;
arrayEntry . setCol ( 0 , key ) ;
arrayEntry . setCol ( 1 , collection . exportCollection ( ) ) ;
// overwrite entry in this array
array . set ( oldrownumber , arrayEntry ) ;
// update the index entry
indexrow . setCol ( idx_col_chunkcount , collection . size ( ) ) ;
indexrow . setCol ( idx_col_clusteridx , ( byte ) oldPartitionNumber ) ;
indexrow . setCol ( idx_col_lastwrote , kelondroRowCollection . daysSince2000 ( System . currentTimeMillis ( ) ) ) ;
index . put ( indexrow ) ;
} else {
// we need a new slot, that means we must first delete the old entry
// find array file
kelondroFixedWidthArray array = getArray ( oldPartitionNumber , oldSerialNumber , oldchunksize ) ;
// delete old entry
array . remove ( oldrownumber ) ;
// write a new entry in the other array
array = getArray ( newPartitionNumber , 0 , this . payloadrow . objectsize ( ) ) ;
// define row
kelondroRow . Entry arrayEntry = array . row ( ) . newEntry ( ) ;
arrayEntry . setCol ( 0 , key ) ;
arrayEntry . setCol ( 1 , collection . exportCollection ( ) ) ;
// write a new entry in this array
int newRowNumber = array . add ( arrayEntry ) ;
// store the new row number in the index
indexrow . setCol ( idx_col_key , key ) ;
indexrow . setCol ( idx_col_chunkcount , collection . size ( ) ) ;
indexrow . setCol ( idx_col_clusteridx , ( byte ) newPartitionNumber ) ;
indexrow . setCol ( idx_col_indexpos , ( long ) newRowNumber ) ;
indexrow . setCol ( idx_col_lastwrote , kelondroRowCollection . daysSince2000 ( System . currentTimeMillis ( ) ) ) ;
index . put ( indexrow ) ;
}
return removed ;
}
private void putnew ( byte [ ] key , kelondroRowCollection collection , kelondroRow . Entry indexrow ) throws IOException {
private kelondroRow . Entry putnew ( byte [ ] key , kelondroRowCollection collection ) throws IOException {
// the collection is new
int newPartitionNumber = arrayIndex ( collection . size ( ) ) ;
indexrow = index . row ( ) . newEntry ( ) ;
kelondroRow . Entry indexrow = index . row ( ) . newEntry ( ) ;
kelondroFixedWidthArray array = getArray ( newPartitionNumber , 0 , this . payloadrow . objectsize ( ) ) ;
// define row
@ -439,18 +294,19 @@ public class kelondroCollectionIndex {
indexrow . setCol ( idx_col_lastwrote , kelondroRowCollection . daysSince2000 ( System . currentTimeMillis ( ) ) ) ;
// after calling this method there mus be a index.addUnique(indexrow);
return indexrow ;
}
private void putreplace (
byte [ ] key , kelondroRowCollection collection , kelondroRow . Entry indexrow ,
int serialNumber , int chunkSize ,
int partitionNumber , int rownumber ) throws IOException {
// we don't need a new slot, just write into the old one
// we don't need a new slot, just write collection into the old one
// find array file
kelondroFixedWidthArray array = getArray ( partitionNumber , serialNumber , chunkSize ) ;
// define row
// define new row
kelondroRow . Entry arrayEntry = array . row ( ) . newEntry ( ) ;
arrayEntry . setCol ( 0 , key ) ;
arrayEntry . setCol ( 1 , collection . exportCollection ( ) ) ;
@ -498,14 +354,15 @@ public class kelondroCollectionIndex {
// after calling this method there mus be a index.put(indexrow);
}
p rivate void put1 ( byte [ ] key , kelondroRowCollection collection ) throws IOException , kelondroOutOfLimitsException {
p ublic synchronized void put ( byte [ ] key , kelondroRowCollection collection ) throws IOException , kelondroOutOfLimitsException {
// first find an old entry, if one exists
kelondroRow . Entry indexrow = index . get ( key ) ;
if ( indexrow = = null ) {
// create new row and index entry
if ( ( collection ! = null ) & & ( collection . size ( ) > 0 ) ) {
putnew( key , collection , indexrow ) ; // modifies indexrow
indexrow = putnew( key , collection ) ; // modifies indexrow
index . addUnique ( indexrow ) ;
}
return ;
@ -513,14 +370,14 @@ public class kelondroCollectionIndex {
// overwrite the old collection
// read old information
int oldchunksize = ( int ) indexrow . getColLong ( idx_col_chunksize ) ; // needed only for migration
int oldchunkcount = ( int ) indexrow . getColLong ( idx_col_chunkcount ) ;
int oldrownumber = ( int ) indexrow . getColLong ( idx_col_indexpos ) ;
int oldPartitionNumber = ( int ) indexrow . getColByte ( idx_col_clusteridx ) ;
int oldchunksize = ( int ) indexrow . getColLong ( idx_col_chunksize ) ; // needed only for migration
int oldchunkcount = ( int ) indexrow . getColLong ( idx_col_chunkcount ) ; // the number if rows in the collection
int oldrownumber = ( int ) indexrow . getColLong ( idx_col_indexpos ) ; // index of the entry in array
int oldPartitionNumber = ( int ) indexrow . getColByte ( idx_col_clusteridx ) ; // points to array file
assert ( oldPartitionNumber > = arrayIndex ( oldchunkcount ) ) ;
int oldSerialNumber = 0 ;
if ( collection . size ( ) = = 0 ) {
if ( ( collection = = null ) | | ( collection . size ( ) = = 0 ) ) {
// delete the index entry and the array
kelondroFixedWidthArray array = getArray ( oldPartitionNumber , oldSerialNumber , oldchunksize ) ;
array . remove ( oldrownumber ) ;
@ -535,39 +392,98 @@ public class kelondroCollectionIndex {
putreplace (
key , collection , indexrow ,
oldSerialNumber , this . payloadrow . objectsize ( ) ,
oldPartitionNumber , oldrownumber ) ;
oldPartitionNumber , oldrownumber ) ; // modifies indexrow
} else {
puttransit (
key , collection , indexrow ,
oldSerialNumber , this . payloadrow . objectsize ( ) ,
oldPartitionNumber , oldrownumber ,
newPartitionNumber ) ;
newPartitionNumber ) ; // modifies indexrow
}
index . put ( indexrow ) ; // write modified indexrow
}
private void merge1 ( byte [ ] key , kelondroRowCollection collection ) throws IOException , kelondroOutOfLimitsException {
//if (collection.size() > maxChunks) throw new kelondroOutOfLimitsException(maxChunks, collection.size());
public synchronized void mergeMultiple ( List /* of indexContainer */ containerList ) throws IOException , kelondroOutOfLimitsException {
// merge a bulk of index containers
// this method should be used to optimize the R/W head path length
if ( ( collection = = null ) | | ( collection . size ( ) = = 0 ) ) return ;
// separate the list in two halves:
// - containers that do not exist yet in the collection
// - containers that do exist in the collection and must be merged
Iterator i = containerList . iterator ( ) ;
indexContainer container ;
byte [ ] key ;
ArrayList newContainer = new ArrayList ( ) ;
ArrayList existingContainer = new ArrayList ( ) ;
kelondroRow . Entry indexrow ;
while ( i . hasNext ( ) ) {
container = ( indexContainer ) i . next ( ) ;
if ( ( container = = null ) | | ( container . size ( ) = = 0 ) ) continue ;
key = container . getWordHash ( ) . getBytes ( ) ;
// first find an old entry, if one exists
indexrow = index . get ( key ) ;
if ( indexrow = = null ) {
newContainer . add ( new Object [ ] { key , container } ) ;
} else {
existingContainer . add ( new Object [ ] { key , container , indexrow } ) ;
}
}
// now iterate through the container lists and execute merges
// this is done in such a way, that there is a optimized path for the R/W head
// write new containers
i = newContainer . iterator ( ) ;
Object [ ] record ;
while ( i . hasNext ( ) ) {
record = ( Object [ ] ) i . next ( ) ; // {byte[], indexContainer}
mergeNew ( ( byte [ ] ) record [ 0 ] , ( indexContainer ) record [ 1 ] ) ;
}
// merge existing containers
i = existingContainer . iterator ( ) ;
ArrayList indexrows = new ArrayList ( ) ;
while ( i . hasNext ( ) ) {
record = ( Object [ ] ) i . next ( ) ; // {byte[], indexContainer, kelondroRow.Entry}
indexrow = ( kelondroRow . Entry ) record [ 2 ] ;
mergeExisting ( ( byte [ ] ) record [ 0 ] , ( indexContainer ) record [ 1 ] , indexrow ) ; // modifies indexrow
indexrows . add ( indexrow ) ; // indexrows are collected and written later as block
}
index . putMultiple ( indexrows , new Date ( ) ) ; // write modified indexrows in optimized manner
}
public synchronized void merge ( indexContainer container ) throws IOException , kelondroOutOfLimitsException {
if ( ( container = = null ) | | ( container . size ( ) = = 0 ) ) return ;
byte [ ] key = container . getWordHash ( ) . getBytes ( ) ;
// first find an old entry, if one exists
kelondroRow . Entry indexrow = index . get ( key ) ;
if ( indexrow = = null ) {
if ( ( collection ! = null ) & & ( collection . size ( ) > 0 ) ) {
putnew ( key , collection , indexrow ) ; // modifies indexrow
index . addUnique ( indexrow ) ;
}
return ;
mergeNew ( key , container ) ;
} else {
mergeExisting ( key , container , indexrow ) ; // modifies indexrow
index . put ( indexrow ) ; // write modified indexrow
}
}
private void mergeNew ( byte [ ] key , kelondroRowCollection collection ) throws IOException , kelondroOutOfLimitsException {
// create new row and index entry
kelondroRow . Entry indexrow = putnew ( key , collection ) ; // modifies indexrow
index . addUnique ( indexrow ) ; // write modified indexrow
}
private void mergeExisting ( byte [ ] key , kelondroRowCollection collection , kelondroRow . Entry indexrow ) throws IOException , kelondroOutOfLimitsException {
// merge with the old collection
// attention! this modifies the indexrow entry which must be written with index.put(indexrow) afterwards!
// read old information
int oldchunksize = ( int ) indexrow . getColLong ( idx_col_chunksize ) ; // needed only for migration
int oldchunkcount = ( int ) indexrow . getColLong ( idx_col_chunkcount ) ;
int oldrownumber = ( int ) indexrow . getColLong ( idx_col_indexpos ) ;
int oldPartitionNumber = ( int ) indexrow . getColByte ( idx_col_clusteridx ) ;
int oldchunksize = ( int ) indexrow . getColLong ( idx_col_chunksize ) ; // needed only for migration
int oldchunkcount = ( int ) indexrow . getColLong ( idx_col_chunkcount ) ; // the number if rows in the collection
int oldrownumber = ( int ) indexrow . getColLong ( idx_col_indexpos ) ; // index of the entry in array
int oldPartitionNumber = ( int ) indexrow . getColByte ( idx_col_clusteridx ) ; // points to array file
assert ( oldPartitionNumber > = arrayIndex ( oldchunkcount ) ) ;
int oldSerialNumber = 0 ;
@ -588,19 +504,20 @@ public class kelondroCollectionIndex {
putreplace (
key , collection , indexrow ,
oldSerialNumber , this . payloadrow . objectsize ( ) ,
oldPartitionNumber , oldrownumber ) ;
oldPartitionNumber , oldrownumber ) ; // modifies indexrow
} else {
puttransit (
key , collection , indexrow ,
oldSerialNumber , this . payloadrow . objectsize ( ) ,
oldPartitionNumber , oldrownumber ,
newPartitionNumber ) ;
newPartitionNumber ) ; // modifies indexrow
}
index . put ( indexrow ) ; // write modified indexrow
}
private int remove1 ( byte [ ] key , Set removekeys ) throws IOException , kelondroOutOfLimitsException {
//if (collection.size() > maxChunks) throw new kelondroOutOfLimitsException(maxChunks, collection.size());
public synchronized int remove ( byte [ ] key , Set removekeys ) throws IOException , kelondroOutOfLimitsException {
if ( ( removekeys = = null ) | | ( removekeys . size ( ) = = 0 ) ) return 0 ;
// first find an old entry, if one exists
kelondroRow . Entry indexrow = index . get ( key ) ;
@ -609,10 +526,10 @@ public class kelondroCollectionIndex {
// overwrite the old collection
// read old information
int oldchunksize = ( int ) indexrow . getColLong ( idx_col_chunksize ) ; // needed only for migration
int oldchunkcount = ( int ) indexrow . getColLong ( idx_col_chunkcount ) ;
int oldrownumber = ( int ) indexrow . getColLong ( idx_col_indexpos ) ;
int oldPartitionNumber = ( int ) indexrow . getColByte ( idx_col_clusteridx ) ;
int oldchunksize = ( int ) indexrow . getColLong ( idx_col_chunksize ) ; // needed only for migration
int oldchunkcount = ( int ) indexrow . getColLong ( idx_col_chunkcount ) ; // the number if rows in the collection
int oldrownumber = ( int ) indexrow . getColLong ( idx_col_indexpos ) ; // index of the entry in array
int oldPartitionNumber = ( int ) indexrow . getColByte ( idx_col_clusteridx ) ; // points to array file
assert ( oldPartitionNumber > = arrayIndex ( oldchunkcount ) ) ;
int oldSerialNumber = 0 ;
@ -647,13 +564,13 @@ public class kelondroCollectionIndex {
putreplace (
key , oldcollection , indexrow ,
oldSerialNumber , this . payloadrow . objectsize ( ) ,
oldPartitionNumber , oldrownumber ) ;
oldPartitionNumber , oldrownumber ) ; // modifies indexrow
} else {
puttransit (
key , oldcollection , indexrow ,
oldSerialNumber , this . payloadrow . objectsize ( ) ,
oldPartitionNumber , oldrownumber ,
newPartitionNumber ) ;
newPartitionNumber ) ; // modifies indexrow
}
index . put ( indexrow ) ; // write modified indexrow
return removed ;
@ -830,7 +747,7 @@ public class kelondroCollectionIndex {
for ( int j = 0 ; j < i ; j + + ) {
collection . addUnique ( rowdef . newEntry ( new byte [ ] [ ] { ( "def" + j ) . getBytes ( ) , "xxx" . getBytes ( ) } ) ) ;
}
collectionIndex . merge ( ( "key-" + i ). getBytes ( ) , collection ) ;
collectionIndex . merge ( new indexContainer ( "key-" + i , collection ) ) ;
}
// printout of index