@ -28,320 +28,117 @@ package de.anomic.index;
import java.io.BufferedInputStream ;
import java.io.BufferedOutputStream ;
import java.io.DataInputStream ;
import java.io.File ;
import java.io.FileInputStream ;
import java.io.FileOutputStream ;
import java.io.IOException ;
import java.io.InputStream ;
import java.io.OutputStream ;
import java.util.Collections ;
import java.util.Iterator ;
import java.util.Map ;
import java.util.Set ;
import java.util.SortedMap ;
import java.util.TreeMap ;
import de.anomic.kelondro.kelondroByteOrder ;
import de.anomic.kelondro.kelondro CloneableIterator ;
import de.anomic.kelondro.kelondro BytesIntMap ;
import de.anomic.kelondro.kelondroRow ;
import de.anomic.kelondro.kelondroRowSet ;
import de.anomic.server.serverMemory ;
import de.anomic.server.logging.serverLog ;
public final class indexContainerHeap {
// class variables
private final File databaseRoot ;
protected final SortedMap < String , indexContainer > cache ; // wordhash-container
private final serverLog log ;
private String indexArrayFileName ;
private kelondroRow payloadrow ;
public indexContainerHeap ( File databaseRoot , kelondroRow payloadrow , String dumpname , serverLog log ) {
// creates a new index cache
// the cache has a back-end where indexes that do not fit in the cache are flushed
this . databaseRoot = databaseRoot ;
this . cache = Collections . synchronizedSortedMap ( new TreeMap < String , indexContainer > ( new kelondroByteOrder . StringOrder ( payloadrow . getOrdering ( ) ) ) ) ;
this . log = log ;
this . indexArrayFileName = dumpname ;
this . payloadrow = payloadrow ;
// read in dump of last session
try {
restore ( ) ;
} catch ( IOException e ) {
log . logSevere ( "unable to restore cache dump: " + e . getMessage ( ) , e ) ;
}
}
private void dump ( ) throws IOException {
log . logConfig ( "creating dump for index cache '" + indexArrayFileName + "', " + cache . size ( ) + " words (and much more urls)" ) ;
File indexDumpFile = new File ( databaseRoot , indexArrayFileName ) ;
if ( index DumpFile. exists ( ) ) indexDum pFile. delete ( ) ;
OutputStream os = new BufferedOutputStream ( new FileOutputStream ( index DumpFile) ) ;
public static void dumpHeap ( File indexHeapFile , kelondroRow payloadrow , SortedMap < String , indexContainer > cache , serverLog log ) throws IOException {
if ( log ! = null ) log . logInfo ( "creating rwi heap dump '" + indexHeapFile . getName ( ) + "', " + cache . size ( ) + " rwi's" ) ;
if ( indexHeapFile . exists ( ) ) indexHeapFile . delete ( ) ;
OutputStream os = new BufferedOutputStream ( new FileOutputStream ( indexHeapFile ) , 64 * 1024 ) ;
long startTime = System . currentTimeMillis ( ) ;
long messageTime = System . currentTimeMillis ( ) + 5000 ;
long wordsPerSecond = 0 , wordcount = 0 , urlcount = 0 ;
Map . Entry < String , indexContainer > entry ;
long wordcount = 0 , urlcount = 0 ;
String wordHash ;
indexContainer container ;
// write wCache
synchronized ( cache ) {
Iterator < Map . Entry < String , indexContainer > > i = cache . entrySet ( ) . iterator ( ) ;
while ( i . hasNext ( ) ) {
for ( Map . Entry < String , indexContainer > entry : cache . entrySet ( ) ) {
// get entries
entry = i . next ( ) ;
wordHash = entry . getKey ( ) ;
container = entry . getValue ( ) ;
// put entries on stack
// put entries on heap
if ( container ! = null ) {
os . write ( wordHash . getBytes ( ) ) ;
if ( wordHash . length ( ) < payloadrow . primaryKeyLength ) {
for ( int i = 0 ; i < payloadrow . primaryKeyLength - wordHash . length ( ) ; i + + ) os . write ( 0 ) ;
}
os . write ( container . exportCollection ( ) ) ;
}
wordcount + + ;
i . remove ( ) ; // free some mem
// write a log
if ( System . currentTimeMillis ( ) > messageTime ) {
serverMemory . gc ( 1000 , "indexRAMRI, for better statistic-1" ) ; // for better statistic - thq
wordsPerSecond = wordcount * 1000
/ ( 1 + System . currentTimeMillis ( ) - startTime ) ;
log . logInfo ( "dump status: " + wordcount
+ " words done, "
+ ( cache . size ( ) / ( wordsPerSecond + 1 ) )
+ " seconds remaining, free mem = "
+ ( serverMemory . free ( ) / 1024 / 1024 )
+ "MB" ) ;
messageTime = System . currentTimeMillis ( ) + 5000 ;
}
urlcount + = container . size ( ) ;
}
}
os . flush ( ) ;
os . close ( ) ;
log . logInfo ( "finished dump of ram cache: " + urlcount + " word/URL relations in " + ( ( System . currentTimeMillis ( ) - startTime ) / 1000 ) + " seconds" ) ;
}
private void restore ( ) throws IOException {
File indexDumpFile = new File ( databaseRoot , indexArrayFileName ) ;
if ( ! ( indexDumpFile . exists ( ) ) ) return ;
InputStream is = new BufferedInputStream ( new FileInputStream ( indexDumpFile ) ) ;
long messageTime = System . currentTimeMillis ( ) + 5000 ;
long wordCount = 0 ;
synchronized ( cache ) {
String wordHash ;
byte [ ] word = new byte [ 12 ] ;
while ( is . available ( ) > 0 ) {
// read word
is . read ( word ) ;
wordHash = new String ( word ) ;
// read collection
indexContainer container = new indexContainer ( wordHash , kelondroRowSet . importRowSet ( is , payloadrow ) ) ;
cache . put ( wordHash , container ) ;
wordCount + + ;
// protect against memory shortage
//while (serverMemory.free() < 1000000) {flushFromMem(); java.lang.System.gc();}
// write a log
if ( System . currentTimeMillis ( ) > messageTime ) {
serverMemory . gc ( 1000 , "indexRAMRI, for better statistic-2" ) ; // for better statistic - thq
log . logInfo ( "restoring status: " + wordCount + " words done, free mem = " + ( serverMemory . free ( ) / 1024 / 1024 ) + "MB" ) ;
messageTime = System . currentTimeMillis ( ) + 5000 ;
}
}
}
is . close ( ) ;
if ( log ! = null ) log . logInfo ( "finished rwi heap dump: " + wordcount + " words, " + urlcount + " word/URL relations in " + ( ( System . currentTimeMillis ( ) - startTime ) / 1000 ) + " seconds" ) ;
}
public int size ( ) {
return cache . size ( ) ;
}
public synchronized int indexSize ( String wordHash ) {
indexContainer cacheIndex = ( indexContainer ) cache . get ( wordHash ) ;
if ( cacheIndex = = null ) return 0 ;
return cacheIndex . size ( ) ;
}
public synchronized kelondroCloneableIterator < indexContainer > wordContainers ( String startWordHash , boolean rot ) {
// we return an iterator object that creates top-level-clones of the indexContainers
// in the cache, so that manipulations of the iterated objects do not change
// objects in the cache.
return new wordContainerIterator ( startWordHash , rot ) ;
}
public class wordContainerIterator implements kelondroCloneableIterator < indexContainer > {
// this class exists, because the wCache cannot be iterated with rotation
// and because every indexContainer Object that is iterated must be returned as top-level-clone
// so this class simulates wCache.tailMap(startWordHash).values().iterator()
// plus the mentioned features
private boolean rot ;
private Iterator < indexContainer > iterator ;
public wordContainerIterator ( String startWordHash , boolean rot ) {
this . rot = rot ;
this . iterator = ( startWordHash = = null ) ? cache . values ( ) . iterator ( ) : cache . tailMap ( startWordHash ) . values ( ) . iterator ( ) ;
// The collection's iterator will return the values in the order that their corresponding keys appear in the tree.
}
public static SortedMap < String , indexContainer > restoreHeap ( File indexHeapFile , kelondroRow payloadrow , serverLog log ) throws IOException {
if ( ! ( indexHeapFile . exists ( ) ) ) throw new IOException ( "file " + indexHeapFile + " does not exist" ) ;
if ( log ! = null ) log . logInfo ( "restoring dump for rwi heap '" + indexHeapFile . getName ( ) + "'" ) ;
public wordContainerIterator clone ( Object secondWordHash ) {
return new wordContainerIterator ( ( String ) secondWordHash , rot ) ;
}
long start = System . currentTimeMillis ( ) ;
SortedMap < String , indexContainer > cache = Collections . synchronizedSortedMap ( new TreeMap < String , indexContainer > ( new kelondroByteOrder . StringOrder ( payloadrow . getOrdering ( ) ) ) ) ;
DataInputStream is = new DataInputStream ( new BufferedInputStream ( new FileInputStream ( indexHeapFile ) , 64 * 1024 ) ) ;
public boolean hasNext ( ) {
if ( rot ) return true ;
return iterator . hasNext ( ) ;
}
public indexContainer next ( ) {
if ( iterator . hasNext ( ) ) {
return ( ( indexContainer ) iterator . next ( ) ) . topLevelClone ( ) ;
} else {
// rotation iteration
if ( rot ) {
iterator = cache . values ( ) . iterator ( ) ;
return ( ( indexContainer ) iterator . next ( ) ) . topLevelClone ( ) ;
} else {
return null ;
}
}
}
public void remove ( ) {
iterator . remove ( ) ;
long urlCount = 0 ;
String wordHash ;
byte [ ] word = new byte [ payloadrow . primaryKeyLength ] ;
while ( is . available ( ) > 0 ) {
// read word
is . read ( word ) ;
wordHash = new String ( word ) ;
// read collection
indexContainer container = new indexContainer ( wordHash , kelondroRowSet . importRowSet ( is , payloadrow ) ) ;
cache . put ( wordHash , container ) ;
urlCount + = container . size ( ) ;
}
}
public boolean hasContainer ( String wordHash ) {
return cache . containsKey ( wordHash ) ;
is . close ( ) ;
if ( log ! = null ) log . logInfo ( "finished rwi heap restore: " + cache . size ( ) + " words, " + urlCount + " word/URL relations in " + ( ( System . currentTimeMillis ( ) - start ) / 1000 ) + " seconds" ) ;
return cache ;
}
public int sizeContainer ( String wordHash ) {
indexContainer c = ( indexContainer ) cache . get ( wordHash ) ;
return ( c = = null ) ? 0 : c . size ( ) ;
}
public synchronized indexContainer getContainer ( String wordHash , Set < String > urlselection ) {
if ( wordHash = = null ) return null ;
public static kelondroBytesIntMap indexHeap ( File indexHeapFile , kelondroRow payloadrow , serverLog log ) throws IOException {
if ( ! ( indexHeapFile . exists ( ) ) ) throw new IOException ( "file " + indexHeapFile + " does not exist" ) ;
if ( indexHeapFile . length ( ) > = ( long ) Integer . MAX_VALUE ) throw new IOException ( "file " + indexHeapFile + " too large, index can only be crated for files less than 2GB" ) ;
if ( log ! = null ) log . logInfo ( "creating index for rwi heap '" + indexHeapFile . getName ( ) + "'" ) ;
// retrieve container
indexContainer container = ( indexContainer ) cache . get ( wordHash ) ;
long start = System . currentTimeMillis ( ) ;
kelondroBytesIntMap index = new kelondroBytesIntMap ( payloadrow . primaryKeyLength , ( kelondroByteOrder ) payloadrow . getOrdering ( ) , 0 ) ;
DataInputStream is = new DataInputStream ( new BufferedInputStream ( new FileInputStream ( indexHeapFile ) , 64 * 1024 ) ) ;
// We must not use the container from cache to store everything we find,
// as that container remains linked to in the cache and might be changed later
// while the returned container is still in use.
// create a clone from the container
if ( container ! = null ) container = container . topLevelClone ( ) ;
// select the urlselection
if ( ( urlselection ! = null ) & & ( container ! = null ) ) container . select ( urlselection ) ;
return container ;
}
public synchronized indexContainer deleteContainer ( String wordHash ) {
// returns the index that had been deleted
indexContainer container = ( indexContainer ) cache . remove ( wordHash ) ;
return container ;
}
public synchronized boolean removeEntry ( String wordHash , String urlHash ) {
indexContainer c = ( indexContainer ) cache . get ( wordHash ) ;
if ( ( c ! = null ) & & ( c . remove ( urlHash ) ! = null ) ) {
// removal successful
if ( c . size ( ) = = 0 ) {
deleteContainer ( wordHash ) ;
} else {
cache . put ( wordHash , c ) ;
}
return true ;
}
return false ;
}
public synchronized int removeEntries ( String wordHash , Set < String > urlHashes ) {
if ( urlHashes . size ( ) = = 0 ) return 0 ;
indexContainer c = ( indexContainer ) cache . get ( wordHash ) ;
int count ;
if ( ( c ! = null ) & & ( ( count = c . removeEntries ( urlHashes ) ) > 0 ) ) {
// removal successful
if ( c . size ( ) = = 0 ) {
deleteContainer ( wordHash ) ;
} else {
cache . put ( wordHash , c ) ;
}
return count ;
}
return 0 ;
}
public synchronized int tryRemoveURLs ( String urlHash ) {
// this tries to delete an index from the cache that has this
// urlHash assigned. This can only work if the entry is really fresh
// Such entries must be searched in the latest entries
int delCount = 0 ;
Iterator < Map . Entry < String , indexContainer > > i = cache . entrySet ( ) . iterator ( ) ;
Map . Entry < String , indexContainer > entry ;
String wordhash ;
indexContainer c ;
while ( i . hasNext ( ) ) {
entry = i . next ( ) ;
wordhash = entry . getKey ( ) ;
long urlCount = 0 ;
String wordHash ;
byte [ ] word = new byte [ payloadrow . primaryKeyLength ] ;
int seek = 0 , seek0 ;
while ( is . available ( ) > 0 ) {
// remember seek position
seek0 = seek ;
// get container
c = entry . getValue ( ) ;
if ( c . remove ( urlHash ) ! = null ) {
if ( c . size ( ) = = 0 ) {
i . remove ( ) ;
} else {
cache . put ( wordhash , c ) ; // superfluous?
}
delCount + + ;
}
}
return delCount ;
}
public synchronized void addEntries ( indexContainer container ) {
// this puts the entries into the cache, not into the assortment directly
int added = 0 ;
if ( ( container = = null ) | | ( container . size ( ) = = 0 ) ) return ;
// put new words into cache
String wordHash = container . getWordHash ( ) ;
indexContainer entries = ( indexContainer ) cache . get ( wordHash ) ; // null pointer exception? wordhash != null! must be cache==null
if ( entries = = null ) {
entries = container . topLevelClone ( ) ;
added = entries . size ( ) ;
} else {
added = entries . putAllRecent ( container ) ;
}
if ( added > 0 ) {
cache . put ( wordHash , entries ) ;
}
entries = null ;
}
public synchronized void addEntry ( String wordHash , indexRWIRowEntry newEntry , long updateTime , boolean dhtCase ) {
indexContainer container = ( indexContainer ) cache . get ( wordHash ) ;
if ( container = = null ) container = new indexContainer ( wordHash , this . payloadrow , 1 ) ;
container . put ( newEntry ) ;
cache . put ( wordHash , container ) ;
}
public synchronized void close ( ) {
// dump cache
try {
dump ( ) ;
} catch ( IOException e ) {
log . logSevere ( "unable to dump cache: " + e . getMessage ( ) , e ) ;
// read word
is . read ( word ) ;
wordHash = new String ( word ) ;
seek + = wordHash . length ( ) ;
// read collection
seek + = kelondroRowSet . skipNextRowSet ( is , payloadrow ) ;
index . addi ( word , seek0 ) ;
}
is . close ( ) ;
if ( log ! = null ) log . logInfo ( "finished rwi heap indexing: " + urlCount + " word/URL relations in " + ( ( System . currentTimeMillis ( ) - start ) / 1000 ) + " seconds" ) ;
return index ;
}
}