@ -35,7 +35,6 @@ import java.io.OutputStream;
import java.util.Iterator ;
import java.util.Map ;
import java.util.concurrent.LinkedBlockingQueue ;
import java.util.concurrent.Semaphore ;
import java.util.zip.GZIPInputStream ;
import java.util.zip.GZIPOutputStream ;
@ -45,13 +44,12 @@ public class kelondroBLOBBuffer extends Thread implements kelondroBLOB {
static byte [ ] plainMagic = { ( byte ) 'p' , ( byte ) '|' } ; // magic for plain content (no encoding)
private kelondroBLOB backend ;
private boolean executing , shallRun ;
private Semaphore shutdownControl ; // steering of close method
private boolean compress ; // if true then files should be written compressed
private LinkedBlockingQueue < Map . Entry < byte [ ] , byte [ ] > > rawQueue ; // entries which are not compressed, format is RAW (without magic)
private LinkedBlockingQueue < Map . Entry < byte [ ] , byte [ ] > > compressedQueue ; // entries which are compressed, format is with leading magic
private kelondroBytesIntMap contentLength ;
private long queueLength ;
private long maxCacheSize ;
private int cdr ;
private class Entry implements Map . Entry < byte [ ] , byte [ ] > {
@ -81,41 +79,27 @@ public class kelondroBLOBBuffer extends Thread implements kelondroBLOB {
public kelondroBLOBBuffer ( kelondroBLOB backend , long cachesize , boolean compress ) {
this . backend = backend ;
this . maxCacheSize = cachesize ;
this . compress = compress ;
this . executing = false ;
this . shallRun = false ;
this . shutdownControl = null ;
assert ! this . executing | | this . compress ;
initQueues ( ) ;
cdr = 0 ;
initQueues ( compress ) ;
}
public synchronized void clear ( ) throws IOException {
shutdown ( ) ;
initQueues ( ) ;
initQueues ( this . compressedQueue ! = null ) ;
this . backend . clear ( ) ;
if ( this . executing ) this . start ( ) ;
}
private void initQueues ( ) {
/ *
* executing = false , compress = false : Queue = used , CompressedQueue = null
* files are written in the uncompressed - queue and are written from there into the database
*
* executing = false , compress = true : Queue = null , CompressedQueue = used
* files are compressed when they arrive and written to the compressed queue
*
* executing = true , compress = false : status is not allowed , this does not make sense because the additional thread is only for compression of files
*
* executing = true , compress = true : Queue = used , CompressedQueue = used
* files are written uncompressed to the uncompressed - queue and compressed with the concurrent thread
* /
}
private void initQueues ( boolean compress ) {
this . rawQueue = new LinkedBlockingQueue < Map . Entry < byte [ ] , byte [ ] > > ( ) ;
this . compressedQueue = ( compress ) ? new LinkedBlockingQueue < Map . Entry < byte [ ] , byte [ ] > > ( ) : null ;
this . contentLength = new kelondroBytesIntMap ( backend . keylength ( ) , backend . ordering ( ) , 500 ) ;
this . queueLength = 0 ;
}
public kelondroByteOrder ordering ( ) {
return this . backend . ordering ( ) ;
}
public synchronized void close ( ) {
shutdown ( ) ;
// no more thread is running, flush all queues
try {
flushAll ( ) ;
@ -125,18 +109,11 @@ public class kelondroBLOBBuffer extends Thread implements kelondroBLOB {
this . backend . close ( ) ;
}
private void shutdown ( ) {
this . shallRun = false ;
if ( this . executing & & this . shutdownControl ! = null ) {
// wait for semaphore
try { this . shutdownControl . acquire ( ) ; } catch ( InterruptedException e ) { }
}
}
private byte [ ] compress ( byte [ ] b ) {
// compressed a byte array and adds a leading magic for the compression
try {
System . out . print ( "/" ) ; // DEBUG
cdr + + ;
//System.out.print("/(" + cdr + ")"); // DEBUG
final ByteArrayOutputStream baos = new ByteArrayOutputStream ( b . length / 5 ) ;
baos . write ( gzipMagic ) ;
final OutputStream os = new GZIPOutputStream ( baos , 512 ) ;
@ -151,7 +128,7 @@ public class kelondroBLOBBuffer extends Thread implements kelondroBLOB {
}
private byte [ ] markWithPlainMagic ( byte [ ] b ) {
System . out . print ( "+" ) ; // DEBUG
//System.out.print("+"); // DEBUG
byte [ ] r = new byte [ b . length + 2 ] ;
r [ 0 ] = plainMagic [ 0 ] ;
r [ 1 ] = plainMagic [ 1 ] ;
@ -162,7 +139,8 @@ public class kelondroBLOBBuffer extends Thread implements kelondroBLOB {
private byte [ ] decompress ( byte [ ] b ) {
// use a magic in the head of the bytes to identify compression type
if ( kelondroByteArray . equals ( b , gzipMagic ) ) {
System . out . print ( "\\" ) ; // DEBUG
//System.out.print("\\"); // DEBUG
cdr - - ;
ByteArrayInputStream bais = new ByteArrayInputStream ( b ) ;
// eat up the magic
bais . read ( ) ;
@ -205,17 +183,31 @@ public class kelondroBLOBBuffer extends Thread implements kelondroBLOB {
return null ;
}
private byte [ ] getFromQueues ( byte [ ] key ) throws IOException {
byte [ ] b = ( rawQueue = = null ) ? null : getFromQueue ( key , rawQueue ) ;
public synchronized byte [ ] get ( byte [ ] key ) throws IOException {
// depending on the source of the result, we additionally do entry compression
// because if a document was read once, we think that it will not be retrieved another time again soon
byte [ ] b ;
if ( this . compressedQueue = = null ) {
b = getFromQueue ( key , rawQueue ) ;
if ( b ! = null ) return b ;
} else {
b = removeFromQueue ( key , rawQueue ) ;
if ( b ! = null ) {
// put the entry on the compressed queue
byte [ ] bb = compress ( b ) ;
this . compressedQueue . add ( new Entry ( key , bb ) ) ;
this . queueLength = this . queueLength - b . length + bb . length ;
return b ;
}
}
// no special handling for elements from the compressed queue
b = ( compressedQueue = = null ) ? null : getFromQueue ( key , compressedQueue ) ;
if ( b ! = null ) return decompress ( b ) ;
return null ;
if ( b ! = null ) {
//System.out.print("CASEA"); // DEBUG
return decompress ( b ) ;
}
public synchronized byte [ ] get ( byte [ ] key ) throws IOException {
byte [ ] b = getFromQueues ( key ) ;
if ( b ! = null ) return b ;
// finally return from the backend
b = this . backend . get ( key ) ;
if ( b = = null ) return null ;
return decompress ( b ) ;
@ -252,10 +244,18 @@ public class kelondroBLOBBuffer extends Thread implements kelondroBLOB {
}
public synchronized long length ( byte [ ] key ) throws IOException {
byte [ ] b = ( rawQueue = = null ) ? null : getFromQueue ( key , rawQueue ) ;
int i = this . contentLength . geti ( key ) ;
if ( i > = 0 ) {
//System.out.print("CASEC"); // DEBUG
return ( long ) i ;
}
byte [ ] b = getFromQueue ( key , rawQueue ) ;
if ( b ! = null ) return b . length ;
b = ( compressedQueue = = null ) ? null : getFromQueue ( key , compressedQueue ) ;
if ( b ! = null ) return decompress ( b ) . length ;
if ( b ! = null ) {
//System.out.print("CASEB"); // DEBUG
return decompress ( b ) . length ;
}
return this . backend . length ( key ) ;
}
@ -272,35 +272,48 @@ public class kelondroBLOBBuffer extends Thread implements kelondroBLOB {
return null ;
}
private boolean removeFromQueues ( byte [ ] key ) throws IOException {
byte [ ] b = ( rawQueue = = null ) ? null : removeFromQueue ( key , rawQueue ) ;
if ( b ! = null ) return true ;
private int removeFromQueues ( byte [ ] key ) throws IOException {
byte [ ] b = removeFromQueue ( key , rawQueue ) ;
if ( b ! = null ) return b . length ;
b = ( compressedQueue = = null ) ? null : removeFromQueue ( key , compressedQueue ) ;
if ( b ! = null ) return true ;
return false ;
if ( b ! = null ) return b . length ;
return 0 ;
}
public synchronized void put ( byte [ ] key , byte [ ] b ) throws IOException {
assert ! this . executing | | this . compress ;
// first ensure that the files do not exist anywhere
this . backend . remove ( key ) ;
boolean existedInQueue = removeFromQueues ( key ) ;
if ( existedInQueue) this . queueLength - = b . length ; // this is only an approximation
long rx = removeFromQueues ( key ) ;
if ( rx > 0 ) this . queueLength - = rx ;
// check if the buffer is full or could be full after this write
if ( this . queueLength + b . length * 2 > this . maxCacheSize ) {
// in case that we compress, just compress as much as is necessary to get enough room
if ( this . compressedQueue = = null ) {
flushAll ( ) ;
} else {
while ( this . queueLength + b . length * 2 > this . maxCacheSize & & this . rawQueue . size ( ) > 0 ) {
flushOneRaw ( ) ;
}
// in case that this was not enough, just flush all
if ( this . queueLength + b . length * 2 > this . maxCacheSize ) flushAll ( ) ;
}
}
// files are written uncompressed to the uncompressed-queue
// they are either written uncompressed to the database
// or compressed with the concurrent thread and written later
// or compressed later
this . rawQueue . add ( new Entry ( key , b ) ) ;
this . queueLength + = b . length ;
this . contentLength . puti ( key , b . length ) ;
if ( this . contentLength . size ( ) > 500 ) this . contentLength . clear ( ) ; // prevent the case that this object becomes a memory leak
}
public synchronized void remove ( byte [ ] key ) throws IOException {
this . backend . remove ( key ) ;
removeFromQueues ( key ) ;
long rx = removeFromQueues ( key ) ;
if ( rx > 0 ) this . queueLength - = rx ;
}
public int size ( ) {
@ -317,76 +330,50 @@ public class kelondroBLOBBuffer extends Thread implements kelondroBLOB {
return this . backend . keys ( up , firstKey ) ;
}
private boolean flushOne ( boolean block ) throws IOException {
if ( rawQueue . size ( ) > 0 ) {
// files are compressed when they arrive and written to the compressed queue
return flushOneRaw ( block ) ;
} else {
return flushOneCompressed ( block ) ;
}
}
private boolean flushOneRaw ( boolean block ) throws IOException {
if ( this . rawQueue ! = null & & ( block | | this . rawQueue . size ( ) > 0 ) ) {
// depening on process case, write it to the file or compress it to the other queue
private boolean flushOneRaw ( ) throws IOException {
if ( this . rawQueue . size ( ) = = 0 ) return false ;
// depending on process case, write it to the file or compress it to the other queue
try {
Map . Entry < byte [ ] , byte [ ] > entry = this . rawQueue . take ( ) ;
this . queueLength - = entry . getValue ( ) . length ;
if ( this . compress ) {
if ( this . compressedQueue ! = null ) {
entry . setValue ( compress ( entry . getValue ( ) ) ) ;
this . queueLength + = entry . getValue ( ) . length ;
this . compressedQueue . add ( entry ) ;
} else {
this . backend . put ( entry . getKey ( ) , markWithPlainMagic ( entry . getValue ( ) ) ) ;
assert this . queueLength = = 0 ;
if ( this . rawQueue . size ( ) = = 0 ) this . queueLength = 0 ;
}
return true ;
} catch ( InterruptedException e ) {
return false ;
}
}
return false ;
}
private boolean flushOneCompressed ( boolean block ) throws IOException {
if ( this . compressedQueue != null & & ( block | | this . compressedQueue . size ( ) > 0 ) ) {
private boolean flushOneCompressed ( ) throws IOException {
if ( this . compressedQueue == null | | this . compressedQueue . size ( ) == 0 ) return false ;
// write compressed entry to the file
try {
//System.out.print("#"); // DEBUG
Map . Entry < byte [ ] , byte [ ] > entry = this . compressedQueue . take ( ) ;
this . queueLength - = entry . getValue ( ) . length ;
this . backend . put ( entry . getKey ( ) , entry . getValue ( ) ) ;
if ( this . rawQueue . size ( ) = = 0 & & this . compressedQueue . size ( ) = = 0 ) this . queueLength = 0 ;
return true ;
} catch ( InterruptedException e ) {
return false ;
}
}
return false ;
}
private void flushAll ( ) throws IOException {
while ( this . rawQueue . size ( ) > 0 | |
( this . compressedQueue ! = null & & this . compressedQueue . size ( ) > 0 ) ) {
if ( ! flushOne ( false ) ) break ;
}
this . queueLength = 0 ;
}
public void run ( ) {
this . executing = true ;
this . shallRun = true ;
this . shutdownControl = new Semaphore ( 1 ) ;
assert ! this . executing | | this . compress ;
boolean doneOne ;
while ( shallRun ) {
try {
doneOne = flushOneRaw ( true ) ;
assert doneOne ;
} catch ( IOException e ) {
e . printStackTrace ( ) ;
break ;
while ( this . rawQueue . size ( ) > 0 ) {
if ( ! flushOneRaw ( ) ) break ;
}
while ( this . compressedQueue ! = null & & this . compressedQueue . size ( ) > 0 ) {
if ( ! flushOneCompressed ( ) ) break ;
}
this . executing = false ;
this . shutdownControl . release ( ) ;
assert this . queueLength = = 0 ;
}
}