@ -61,8 +61,6 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
// environment constants
private static final String indexArrayFileName = "indexDump1.array" ;
private static final String oldSingletonFileName = "indexSingletons0.db" ;
private static final String newSingletonFileName = "indexAssortment001.db" ;
private static final String indexAssortmentClusterPath = "ACLUSTER" ;
private static final int assortmentCount = 64 ;
private static final int ramCacheLimit = 200 ;
@ -88,19 +86,11 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
}
public plasmaWordIndexCache ( File databaseRoot , plasmaWordIndexInterface backend , int assortmentbufferkb , serverLog log ) {
// migrate#1
File oldSingletonFile = new File ( databaseRoot , oldSingletonFileName ) ;
File newSingletonFile = new File ( databaseRoot , newSingletonFileName ) ;
if ( ( oldSingletonFile . exists ( ) ) & & ( ! ( newSingletonFile . exists ( ) ) ) ) oldSingletonFile . renameTo ( newSingletonFile ) ;
// create new assortment cluster path
File assortmentClusterPath = new File ( databaseRoot , indexAssortmentClusterPath ) ;
if ( ! ( assortmentClusterPath . exists ( ) ) ) assortmentClusterPath . mkdirs ( ) ;
// migrate#2
File acSingletonFile = new File ( assortmentClusterPath , newSingletonFileName ) ;
if ( ( newSingletonFile . exists ( ) ) & & ( ! ( acSingletonFile . exists ( ) ) ) ) newSingletonFile . renameTo ( acSingletonFile ) ;
// create flushing thread
flushThread = new flush ( ) ;
@ -283,7 +273,7 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
}
private final class flush extends Thread {
boolean terminate , pause ;
boolean terminate ;
long intermission ;
public flush ( ) {
@ -305,27 +295,13 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
}
this . intermission = 0 ;
}
if ( pause ) {
try { sleep ( 300 ) ; } catch ( InterruptedException e ) { }
} else {
flushFromMem ( ) ;
try {
pausetime = 1 + java . lang . Math . min ( 1000 , 5 * maxWordsHigh / ( cache . size ( ) + 1 ) ) ;
if ( cache . size ( ) = = 0 ) pausetime = 2000 ;
sleep ( pausetime ) ;
} catch ( InterruptedException e ) { }
}
flushFromMem ( ) ;
pausetime = 1 + java . lang . Math . min ( 1000 , 5 * maxWordsHigh / ( cache . size ( ) + 1 ) ) ;
if ( cache . size ( ) = = 0 ) pausetime = 2000 ;
try { sleep ( pausetime ) ; } catch ( InterruptedException e ) { }
}
}
public void pause ( ) {
pause = true ;
}
public void proceed ( ) {
pause = false ;
}
public void terminate ( ) {
terminate = true ;
}
@ -337,28 +313,24 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
// - the oldest entry in the cache
// - the entry with maximum count
if ( cache . size ( ) = = 0 ) return ;
flushThread . pause ( ) ;
try {
String hash = ( String ) hashScore . getMaxObject ( ) ;
if ( hash = = null ) {
flushThread . proceed ( ) ;
return ;
}
int count = hashScore . getMaxScore ( ) ;
long time = longTime ( hashDate . getScore ( hash ) ) ;
if ( ( count > ramCacheLimit ) | |
( ( count > assortmentCount ) & & ( System . currentTimeMillis ( ) - time > 10000 ) ) ) {
// flush high-score entries
flushFromMem ( hash ) ;
} else {
// flush oldest entries
hash = ( String ) hashDate . getMinObject ( ) ;
flushFromMem ( hash ) ;
synchronized ( cache ) {
String hash = ( String ) hashScore . getMaxObject ( ) ;
if ( hash = = null ) return ;
int count = hashScore . getMaxScore ( ) ;
long time = longTime ( hashDate . getScore ( hash ) ) ;
if ( ( count > ramCacheLimit ) | | ( ( count > assortmentCount ) & & ( System . currentTimeMillis ( ) - time > 10000 ) ) ) {
// flush high-score entries
flushFromMem ( hash ) ;
} else {
// flush oldest entries
hash = ( String ) hashDate . getMinObject ( ) ;
flushFromMem ( hash ) ;
}
}
} catch ( Exception e ) {
log . logSevere ( "flushFromMem: " + e . getMessage ( ) , e ) ;
}
flushThread . proceed ( ) ;
}
private int flushFromMem ( String key ) {
@ -378,22 +350,13 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
}
// now decide where to flush that container
//if (container.size() <= assortmentCluster.clusterCapacity) {
// this fits into the assortments
plasmaWordIndexEntryContainer feedback = assortmentCluster . storeTry ( key , container ) ;
if ( feedback = = null ) {
return container . size ( ) ;
} else {
// *** should care about another option here ***
return backend . addEntries ( feedback , time , true ) ;
}
/ *
plasmaWordIndexEntryContainer feedback = assortmentCluster . storeTry ( key , container ) ;
if ( feedback = = null ) {
return container . size ( ) ;
} else {
// store to back-end; this should be a rare case
return backend . addEntries ( container , time , true ) ;
// *** should care about another option here ***
return backend . addEntries ( feedback , time , true ) ;
}
* * /
}
private int intTime ( long longTime ) {
@ -405,7 +368,7 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
}
private boolean flushFromAssortmentCluster ( String key , long maxTime ) {
// this should only be called if the assortment shall be deleted or returned in an index entity
// this should only be called if the assortment shall be deleted or returned in an index entity
maxTime = 8 * maxTime / 10 ; // reserve time for later adding to backend
plasmaWordIndexEntryContainer container = assortmentCluster . removeFromAll ( key , maxTime ) ;
if ( container = = null ) {
@ -418,18 +381,21 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
}
public plasmaWordIndexEntity getIndex ( String wordHash , boolean deleteIfEmpty , long maxTime ) {
flushThread . pause ( ) ;
// this possibly creates an index file in the back-end
// the index file is opened and returned as entity object
long start = System . currentTimeMillis ( ) ;
flushFromMem ( wordHash ) ;
if ( maxTime < 0 ) {
flushFromAssortmentCluster ( wordHash , - 1 ) ;
} else {
long remaining = maxTime - ( System . currentTimeMillis ( ) - start ) ;
if ( remaining > 0 ) flushFromAssortmentCluster ( wordHash , remaining ) ;
synchronized ( cache ) {
flushFromMem ( wordHash ) ;
if ( maxTime < 0 ) {
flushFromAssortmentCluster ( wordHash , - 1 ) ;
} else {
long remaining = maxTime - ( System . currentTimeMillis ( ) - start ) ;
if ( remaining > 0 )
flushFromAssortmentCluster ( wordHash , remaining ) ;
}
}
flushThread . proceed ( ) ;
long r = maxTime - ( System . currentTimeMillis ( ) - start ) ;
return backend . getIndex ( wordHash , deleteIfEmpty , ( r < 0 ) ? 0 : r ) ;
return backend . getIndex ( wordHash , deleteIfEmpty , ( r < 0 ) ? 0 : r ) ;
}
public long getUpdateTime ( String wordHash ) {
@ -444,7 +410,6 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
}
public void deleteIndex ( String wordHash ) {
flushThread . pause ( ) ;
synchronized ( cache ) {
cache . remove ( wordHash ) ;
hashScore . deleteScore ( wordHash ) ;
@ -452,15 +417,12 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
}
assortmentCluster . removeFromAll ( wordHash , - 1 ) ;
backend . deleteIndex ( wordHash ) ;
flushThread . proceed ( ) ;
}
public synchronized int removeEntries ( String wordHash , String [ ] urlHashes , boolean deleteComplete ) {
flushThread . pause ( ) ;
flushFromMem ( wordHash ) ;
flushFromAssortmentCluster ( wordHash , - 1 ) ;
int removed = backend . removeEntries ( wordHash , urlHashes , deleteComplete ) ;
flushThread . proceed ( ) ;
return removed ;
}
@ -469,6 +431,7 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
int added = 0 ;
// check cache space
/ *
if ( cache . size ( ) > 0 ) try {
// pause to get space in the cache (while it is flushed)
long pausetime ;
@ -483,16 +446,12 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
// slow down if we reach cache limit
Thread . sleep ( pausetime ) ;
} catch ( InterruptedException e ) { }
* /
//serverLog.logDebug("PLASMA INDEXING", "addEntryToIndexMem: cache.size=" + cache.size() + "; hashScore.size=" + hashScore.size());
// put new words into cache
String wordHash = container . wordHash ( ) ;
synchronized ( cache ) {
// stop flushing now for one moment
flushThread . pause ( ) ;
// put container into cache
plasmaWordIndexEntryContainer entries = ( plasmaWordIndexEntryContainer ) cache . get ( wordHash ) ; // null pointer exception? wordhash != null! must be cache==null
if ( entries = = null ) entries = new plasmaWordIndexEntryContainer ( wordHash ) ;
@ -503,28 +462,26 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
hashDate . setScore ( wordHash , intTime ( updateTime ) ) ;
}
entries = null ;
// resume flushing
flushThread . proceed ( ) ;
// force flush (sometimes)
if ( System . currentTimeMillis ( ) % 5 = = 0 ) flushFromMem ( ) ;
}
//System.out.println("DEBUG: cache = " + cache.toString());
return added ;
}
private void addEntry ( String wordHash , plasmaWordIndexEntry newEntry , long updateTime ) {
flushThread . pause ( ) ;
plasmaWordIndexEntryContainer container = ( plasmaWordIndexEntryContainer ) cache . get ( wordHash ) ;
if ( container = = null ) container = new plasmaWordIndexEntryContainer ( wordHash ) ;
plasmaWordIndexEntry [ ] entries = new plasmaWordIndexEntry [ ] { newEntry } ;
if ( container . add ( entries , updateTime ) > 0 ) {
cache . put ( wordHash , container ) ;
hashScore . incScore ( wordHash ) ;
hashDate . setScore ( wordHash , intTime ( updateTime ) ) ;
synchronized ( cache ) {
plasmaWordIndexEntryContainer container = ( plasmaWordIndexEntryContainer ) cache . get ( wordHash ) ;
if ( container = = null ) container = new plasmaWordIndexEntryContainer ( wordHash ) ;
plasmaWordIndexEntry [ ] entries = new plasmaWordIndexEntry [ ] { newEntry } ;
if ( container . add ( entries , updateTime ) > 0 ) {
cache . put ( wordHash , container ) ;
hashScore . incScore ( wordHash ) ;
hashDate . setScore ( wordHash , intTime ( updateTime ) ) ;
}
entries = null ;
container = null ;
}
entries = null ;
container = null ;
flushThread . proceed ( ) ;
}
public void close ( int waitingSeconds ) {