@ -61,14 +61,15 @@ import net.yacy.kelondro.util.MemoryControl;
public final class IndexCell < ReferenceType extends Reference > extends AbstractBufferedIndex < ReferenceType > implements BufferedIndex < ReferenceType > {
private static final long cleanupCycle = 60000 ;
private static final long cleanupCycle = 60000 ;
private static final long dumpCycle = 600000 ;
// class variables
private final ReferenceContainerArray < ReferenceType > array ;
private ReferenceContainerCache < ReferenceType > ram ;
private int maxRamEntries ;
private final IODispatcher merger ;
private long lastCleanup ;
private long lastCleanup , lastDump ;
private final long targetFileSize , maxFileSize ;
private final int writeBufferSize ;
private final ARC < ByteArray , Integer > countCache ;
@ -94,6 +95,7 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
this . maxRamEntries = maxRamEntries ;
this . merger = merger ;
this . lastCleanup = System . currentTimeMillis ( ) ;
this . lastDump = System . currentTimeMillis ( ) ;
this . targetFileSize = targetFileSize ;
this . maxFileSize = maxFileSize ;
this . writeBufferSize = writeBufferSize ;
@ -114,7 +116,8 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
public void add ( ReferenceContainer < ReferenceType > newEntries ) throws IOException , RowSpaceExceededException {
try {
this . ram . add ( newEntries ) ;
if ( this . ram . size ( ) % 1000 = = 0 | | this . lastCleanup + cleanupCycle < System . currentTimeMillis ( ) ) {
long t = System . currentTimeMillis ( ) ;
if ( this . ram . size ( ) % 1000 = = 0 | | this . lastCleanup + cleanupCycle < t | | this . lastDump + dumpCycle < t ) {
EventTracker . update ( "wordcache" , Long . valueOf ( this . ram . size ( ) ) , true , 30000 , ProfilingGraph . maxTime ) ;
cleanCache ( ) ;
}
@ -129,7 +132,8 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
public void add ( byte [ ] termHash , ReferenceType entry ) throws IOException , RowSpaceExceededException {
try {
this . ram . add ( termHash , entry ) ;
if ( this . ram . size ( ) % 1000 = = 0 | | this . lastCleanup + cleanupCycle < System . currentTimeMillis ( ) ) {
long t = System . currentTimeMillis ( ) ;
if ( this . ram . size ( ) % 1000 = = 0 | | this . lastCleanup + cleanupCycle < t | | this . lastDump + dumpCycle < t ) {
EventTracker . update ( "wordcache" , Long . valueOf ( this . ram . size ( ) ) , true , 30000 , ProfilingGraph . maxTime ) ;
cleanCache ( ) ;
}
@ -384,21 +388,30 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
this . countCache . clear ( ) ;
// dump the cache if necessary
long t = System . currentTimeMillis ( ) ;
if ( this . dumperSemaphore . availablePermits ( ) > 0 & &
( this . ram . size ( ) > = this . maxRamEntries | |
( this . ram . size ( ) > 3000 & & ! MemoryControl . request ( 80L * 1024L * 1024L , false ) ) ) ) {
( this . ram . size ( ) > 3000 & & ! MemoryControl . request ( 80L * 1024L * 1024L , false ) ) | |
( this . ram . size ( ) > 0 & & this . lastDump + dumpCycle < t ) ) ) {
try {
this . dumperSemaphore . acquire ( ) ; // only one may pass
if ( this . ram . size ( ) > = this . maxRamEntries | | ( this . ram . size ( ) > 3000 & & ! MemoryControl . request ( 80L * 1024L * 1024L , false ) ) ) try {
if ( this . ram . size ( ) > = this . maxRamEntries | |
( this . ram . size ( ) > 3000 & & ! MemoryControl . request ( 80L * 1024L * 1024L , false ) ) | |
( this . ram . size ( ) > 0 & & this . lastDump + dumpCycle < t ) ) try {
this . lastDump = System . currentTimeMillis ( ) ;
// dump the ram
File dumpFile = this . array . newContainerBLOBFile ( ) ;
// a critical point: when the ram is handed to the dump job,
// don't write into it any more. Use a fresh one instead
ReferenceContainerCache < ReferenceType > ramdump = this . ram ;
// get a fresh ram cache
this . ram = new ReferenceContainerCache < ReferenceType > ( factory , this . array . rowdef ( ) , this . array . ordering ( ) ) ;
ReferenceContainerCache < ReferenceType > ramdump ;
synchronized ( this ) {
ramdump = this . ram ;
// get a fresh ram cache
this . ram = new ReferenceContainerCache < ReferenceType > ( factory , this . array . rowdef ( ) , this . array . ordering ( ) ) ;
}
// dump the buffer
merger . dump ( ramdump , dumpFile , array ) ;
this . lastDump = System . currentTimeMillis ( ) ;
} catch ( Exception e ) {
// catch all exceptions to prevent that no semaphore is released
Log . logException ( e ) ;
@ -412,7 +425,7 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
// clean-up the cache
if ( this . cleanerSemaphore . availablePermits ( ) > 0 & &
( this . array . entries ( ) > 50 | |
this . lastCleanup + cleanupCycle < Sys tem. currentTimeMillis ( ) ) ) {
this . lastCleanup + cleanupCycle < t) ) {
try {
this . cleanerSemaphore . acquire ( ) ;
if ( this . array . entries ( ) > 50 | | ( this . lastCleanup + cleanupCycle < System . currentTimeMillis ( ) ) ) try {