@ -67,11 +67,13 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
private final ComparableARC < byte [ ] , Integer > countCache ;
private int maxRamEntries ;
private final IODispatcher merger ;
private long lastCleanup , lastDump ;
private long lastCleanup ;
private long lastDump ;
private final long targetFileSize , maxFileSize ;
private final int writeBufferSize ;
private final Map < byte [ ] , HandleSet > removeDelayedURLs ; // mapping from word hashes to a list of url hashes
private boolean cleanupShallRun ;
private final Thread cleanupThread ;
public IndexCell (
final File cellPath ,
@ -98,8 +100,73 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
this . maxFileSize = maxFileSize ;
this . writeBufferSize = writeBufferSize ;
this . removeDelayedURLs = new TreeMap < byte [ ] , HandleSet > ( URIMetadataRow . rowdef . objectOrder ) ;
this . cleanupShallRun = true ;
this . cleanupThread = new CleanupThread ( ) ;
this . cleanupThread . start ( ) ;
}
private class CleanupThread extends Thread {
public void run ( ) {
while ( IndexCell . this . cleanupShallRun ) {
cleanCache ( ) ;
try { Thread . sleep ( 3000 ) ; } catch ( final InterruptedException e ) { }
}
}
private void cleanCache ( ) {
// dump the cache if necessary
final long t = System . currentTimeMillis ( ) ;
if ( ( IndexCell . this . ram . size ( ) > = IndexCell . this . maxRamEntries | |
( IndexCell . this . ram . size ( ) > 3000 & & ! MemoryControl . request ( 80L * 1024L * 1024L , false ) ) | |
( IndexCell . this . ram . size ( ) > 0 & & IndexCell . this . lastDump + dumpCycle < t ) ) ) {
synchronized ( IndexCell . this . merger ) {
if ( IndexCell . this . ram . size ( ) > = IndexCell . this . maxRamEntries | |
( IndexCell . this . ram . size ( ) > 3000 & & ! MemoryControl . request ( 80L * 1024L * 1024L , false ) ) | |
( IndexCell . this . ram . size ( ) > 0 & & IndexCell . this . lastDump + dumpCycle < t ) ) try {
IndexCell . this . lastDump = System . currentTimeMillis ( ) ;
// removed delayed
try { removeDelayed ( ) ; } catch ( final IOException e ) { }
// dump the ram
final File dumpFile = IndexCell . this . array . newContainerBLOBFile ( ) ;
// a critical point: when the ram is handed to the dump job,
// don't write into it any more. Use a fresh one instead
ReferenceContainerCache < ReferenceType > ramdump ;
final ByteOrder termOrder = IndexCell . this . ram . termKeyOrdering ( ) ;
final int termSize = IndexCell . this . ram . termKeyLength ( ) ;
synchronized ( this ) {
ramdump = IndexCell . this . ram ;
// get a fresh ram cache
IndexCell . this . ram = new ReferenceContainerCache < ReferenceType > ( IndexCell . this . factory , termOrder , termSize ) ;
}
// dump the buffer
IndexCell . this . merger . dump ( ramdump , dumpFile , IndexCell . this . array ) ;
IndexCell . this . lastDump = System . currentTimeMillis ( ) ;
} catch ( final Exception e ) {
// catch all exceptions
Log . logException ( e ) ;
}
}
}
// clean-up the cache
if ( ( IndexCell . this . array . entries ( ) > 50 | |
IndexCell . this . lastCleanup + cleanupCycle < t ) ) {
synchronized ( IndexCell . this . array ) {
if ( IndexCell . this . array . entries ( ) > 50 | | ( IndexCell . this . lastCleanup + cleanupCycle < System . currentTimeMillis ( ) ) ) try {
IndexCell . this . lastCleanup = System . currentTimeMillis ( ) ; // set time to prevent that this is called to soon again
IndexCell . this . array . shrink ( IndexCell . this . targetFileSize , IndexCell . this . maxFileSize ) ;
IndexCell . this . lastCleanup = System . currentTimeMillis ( ) ; // set again to mark end of procedure
} catch ( final Exception e ) {
// catch all exceptions
Log . logException ( e ) ;
}
}
}
}
}
/ *
* methods to implement Index
@ -124,11 +191,9 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
final long t = System . currentTimeMillis ( ) ;
if ( this . ram . size ( ) % 1000 = = 0 | | this . lastCleanup + cleanupCycle < t | | this . lastDump + dumpCycle < t ) {
EventTracker . update ( EventTracker . EClass . WORDCACHE , Long . valueOf ( this . ram . size ( ) ) , true ) ;
cleanCache ( ) ;
}
} catch ( final RowSpaceExceededException e ) {
EventTracker . update ( EventTracker . EClass . WORDCACHE , Long . valueOf ( this . ram . size ( ) ) , true ) ;
cleanCache ( ) ;
this . ram . add ( newEntries ) ;
}
@ -140,11 +205,9 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
final long t = System . currentTimeMillis ( ) ;
if ( this . ram . size ( ) % 1000 = = 0 | | this . lastCleanup + cleanupCycle < t | | this . lastDump + dumpCycle < t ) {
EventTracker . update ( EventTracker . EClass . WORDCACHE , Long . valueOf ( this . ram . size ( ) ) , true ) ;
cleanCache ( ) ;
}
} catch ( final RowSpaceExceededException e ) {
EventTracker . update ( EventTracker . EClass . WORDCACHE , Long . valueOf ( this . ram . size ( ) ) , true ) ;
cleanCache ( ) ;
this . ram . add ( termHash , entry ) ;
}
}
@ -249,7 +312,6 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
this . array . delete ( termHash ) ;
}
final ReferenceContainer < ReferenceType > c0 = this . ram . delete ( termHash ) ;
cleanCache ( ) ;
if ( c1 = = null ) return c0 ;
if ( c0 = = null ) return c1 ;
try {
@ -435,6 +497,8 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
try { removeDelayed ( ) ; } catch ( final IOException e ) { }
if ( ! this . ram . isEmpty ( ) ) this . ram . dump ( this . array . newContainerBLOBFile ( ) , ( int ) Math . min ( MemoryControl . available ( ) / 3 , this . writeBufferSize ) , true ) ;
// close all
this . cleanupShallRun = false ;
if ( this . cleanupThread ! = null ) try { this . cleanupThread . join ( ) ; } catch ( final InterruptedException e ) { }
this . ram . close ( ) ;
this . array . close ( ) ;
}
@ -467,64 +531,6 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
return this . array . ordering ( ) ;
}
/ *
* cache control methods
* /
private void cleanCache ( ) {
// dump the cache if necessary
final long t = System . currentTimeMillis ( ) ;
if ( ( this . ram . size ( ) > = this . maxRamEntries | |
( this . ram . size ( ) > 3000 & & ! MemoryControl . request ( 80L * 1024L * 1024L , false ) ) | |
( this . ram . size ( ) > 0 & & this . lastDump + dumpCycle < t ) ) ) {
synchronized ( this . merger ) {
if ( this . ram . size ( ) > = this . maxRamEntries | |
( this . ram . size ( ) > 3000 & & ! MemoryControl . request ( 80L * 1024L * 1024L , false ) ) | |
( this . ram . size ( ) > 0 & & this . lastDump + dumpCycle < t ) ) try {
this . lastDump = System . currentTimeMillis ( ) ;
// removed delayed
try { removeDelayed ( ) ; } catch ( final IOException e ) { }
// dump the ram
final File dumpFile = this . array . newContainerBLOBFile ( ) ;
// a critical point: when the ram is handed to the dump job,
// don't write into it any more. Use a fresh one instead
ReferenceContainerCache < ReferenceType > ramdump ;
final ByteOrder termOrder = this . ram . termKeyOrdering ( ) ;
final int termSize = this . ram . termKeyLength ( ) ;
synchronized ( this ) {
ramdump = this . ram ;
// get a fresh ram cache
this . ram = new ReferenceContainerCache < ReferenceType > ( this . factory , termOrder , termSize ) ;
}
// dump the buffer
this . merger . dump ( ramdump , dumpFile , this . array ) ;
this . lastDump = System . currentTimeMillis ( ) ;
} catch ( final Exception e ) {
// catch all exceptions
Log . logException ( e ) ;
}
}
}
// clean-up the cache
if ( ( this . array . entries ( ) > 50 | |
this . lastCleanup + cleanupCycle < t ) ) {
synchronized ( this . array ) {
if ( this . array . entries ( ) > 50 | | ( this . lastCleanup + cleanupCycle < System . currentTimeMillis ( ) ) ) try {
this . lastCleanup = System . currentTimeMillis ( ) ; // set time to prevent that this is called to soon again
this . array . shrink ( this . targetFileSize , this . maxFileSize ) ;
this . lastCleanup = System . currentTimeMillis ( ) ; // set again to mark end of procedure
} catch ( final Exception e ) {
// catch all exceptions
Log . logException ( e ) ;
}
}
}
}
public File newContainerBLOBFile ( ) {
// for migration of cache files
return this . array . newContainerBLOBFile ( ) ;
@ -557,7 +563,6 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
public void setBufferMaxWordCount ( final int maxWords ) {
this . maxRamEntries = maxWords ;
this . cleanCache ( ) ;
}
}