@ -28,6 +28,8 @@ package net.yacy.kelondro.rwi;
import java.io.File ;
import java.io.IOException ;
import java.util.Map ;
import java.util.TreeMap ;
import java.util.concurrent.Semaphore ;
import net.yacy.cora.storage.ComparableARC ;
@ -71,6 +73,8 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
private final int writeBufferSize ;
private Semaphore dumperSemaphore = new Semaphore ( 1 ) ;
private Semaphore cleanerSemaphore = new Semaphore ( 1 ) ;
private final Map < byte [ ] , HandleSet > failedURLs ; // mapping from word hashes to a list of url hashes
public IndexCell (
final File cellPath ,
@ -96,7 +100,7 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
this . targetFileSize = targetFileSize ;
this . maxFileSize = maxFileSize ;
this . writeBufferSize = writeBufferSize ;
//cleanCache();
this . failedURLs = new TreeMap < byte [ ] , HandleSet > ( URIMetadataRow . rowdef . objectOrder ) ;
}
@ -169,6 +173,13 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
ReferenceContainer < ReferenceType > countRam = this . ram . get ( termHash , null ) ;
assert countRam = = null | | countRam . size ( ) > = 0 ;
int c = countRam = = null ? countFile : countFile + countRam . size ( ) ;
// exclude entries from delayed remove
synchronized ( this . failedURLs ) {
HandleSet s = this . failedURLs . get ( termHash ) ;
if ( s ! = null ) c - = s . size ( ) ;
if ( c < 0 ) c = 0 ;
}
// put count result into cache
this . countCache . put ( termHash , c ) ;
return c ;
}
@ -188,22 +199,31 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
} catch ( RowSpaceExceededException e2 ) {
Log . logException ( e2 ) ;
}
if ( c1 = = null ) {
if ( c0 = = null ) return null ;
return c0 ;
}
if ( c0 = = null ) return c1 ;
try {
return c1 . merge ( c0 ) ;
} catch ( RowSpaceExceededException e ) {
// try to free some ram
ReferenceContainer < ReferenceType > result = null ;
if ( c0 ! = null & & c1 ! = null ) {
try {
return c1 . merge ( c0 ) ;
} catch ( RowSpaceExceededException e1 ) {
// go silently over the problem
return ( c1 . size ( ) > c0 . size ( ) ) ? c1 : c0 ;
result = c1 . merge ( c0 ) ;
} catch ( RowSpaceExceededException e ) {
// try to free some ram
try {
result = c1 . merge ( c0 ) ;
} catch ( RowSpaceExceededException e1 ) {
// go silently over the problem
result = ( c1 . size ( ) > c0 . size ( ) ) ? c1 : c0 ;
}
}
} else if ( c0 ! = null ) {
result = c0 ;
} else if ( c1 ! = null ) {
result = c1 ;
}
if ( result = = null ) return null ;
// remove the failed urls
synchronized ( this . failedURLs ) {
HandleSet s = this . failedURLs . get ( termHash ) ;
if ( s ! = null ) result . removeEntries ( s ) ;
}
return result ;
}
/ * *
@ -212,6 +232,7 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
* @throws IOException
* /
public ReferenceContainer < ReferenceType > delete ( byte [ ] termHash ) throws IOException {
removeDelayed ( ) ;
ReferenceContainer < ReferenceType > c1 = null ;
try {
c1 = this . array . get ( termHash ) ;
@ -238,6 +259,60 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
}
}
public void removeDelayed ( byte [ ] termHash , HandleSet urlHashes ) {
HandleSet r ;
synchronized ( failedURLs ) {
r = this . failedURLs . get ( termHash ) ;
}
if ( r = = null ) {
r = new HandleSet ( URIMetadataRow . rowdef . primaryKeyLength , URIMetadataRow . rowdef . objectOrder , 0 ) ;
}
try {
r . putAll ( urlHashes ) ;
} catch ( RowSpaceExceededException e ) {
try { remove ( termHash , urlHashes ) ; } catch ( IOException e1 ) { }
return ;
}
synchronized ( failedURLs ) {
this . failedURLs . put ( termHash , r ) ;
}
}
public void removeDelayed ( byte [ ] termHash , byte [ ] urlHashBytes ) {
HandleSet r ;
synchronized ( failedURLs ) {
r = this . failedURLs . get ( termHash ) ;
}
if ( r = = null ) {
r = new HandleSet ( URIMetadataRow . rowdef . primaryKeyLength , URIMetadataRow . rowdef . objectOrder , 0 ) ;
}
try {
r . put ( urlHashBytes ) ;
} catch ( RowSpaceExceededException e ) {
try { remove ( termHash , urlHashBytes ) ; } catch ( IOException e1 ) { }
return ;
}
synchronized ( failedURLs ) {
this . failedURLs . put ( termHash , r ) ;
}
}
public void removeDelayed ( ) throws IOException {
HandleSet words = new HandleSet ( URIMetadataRow . rowdef . primaryKeyLength , URIMetadataRow . rowdef . objectOrder , 0 ) ; // a set of url hashes where a worker thread tried to work on, but failed.
synchronized ( failedURLs ) {
for ( byte [ ] b : failedURLs . keySet ( ) ) try { words . put ( b ) ; } catch ( RowSpaceExceededException e ) { }
}
for ( byte [ ] b : words ) {
HandleSet urls ;
synchronized ( failedURLs ) {
urls = failedURLs . remove ( b ) ;
}
remove ( b , urls ) ;
}
this . countCache . clear ( ) ;
}
/ * *
* remove url references from a selected word hash . this deletes also in the BLOB
* files , which means that there exists new gap entries after the deletion
@ -246,6 +321,7 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
* @throws IOException
* /
public int remove ( byte [ ] termHash , HandleSet urlHashes ) throws IOException {
this . countCache . remove ( termHash ) ;
int removed = this . ram . remove ( termHash , urlHashes ) ;
int reduced ;
//final long am = this.array.mem();
@ -260,6 +336,7 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
}
public boolean remove ( byte [ ] termHash , byte [ ] urlHashBytes ) throws IOException {
this . countCache . remove ( termHash ) ;
boolean removed = this . ram . remove ( termHash , urlHashBytes ) ;
int reduced ;
//final long am = this.array.mem();
@ -333,6 +410,8 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
* @throws IOException
* /
public synchronized void clear ( ) throws IOException {
this . countCache . clear ( ) ;
this . failedURLs . clear ( ) ;
this . ram . clear ( ) ;
this . array . clear ( ) ;
}
@ -343,6 +422,8 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
* and is composed of the current date and the cell salt
* /
public synchronized void close ( ) {
this . countCache . clear ( ) ;
try { removeDelayed ( ) ; } catch ( IOException e ) { }
if ( ! this . ram . isEmpty ( ) ) this . ram . dump ( this . array . newContainerBLOBFile ( ) , ( int ) Math . min ( MemoryControl . available ( ) / 3 , writeBufferSize ) , true ) ;
// close all
this . ram . close ( ) ;
@ -395,6 +476,8 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
( this . ram . size ( ) > 3000 & & ! MemoryControl . request ( 80L * 1024L * 1024L , false ) ) | |
( this . ram . size ( ) > 0 & & this . lastDump + dumpCycle < t ) ) try {
this . lastDump = System . currentTimeMillis ( ) ;
// removed delayed
try { removeDelayed ( ) ; } catch ( IOException e ) { }
// dump the ram
File dumpFile = this . array . newContainerBLOBFile ( ) ;
// a critical point: when the ram is handed to the dump job,