@ -78,6 +78,7 @@ public final class RankingProcess extends Thread {
private int remote_peerCount , remote_indexCount , remote_resourceSize , local_resourceSize ;
private final SortStack < WordReferenceVars > stack ;
private int feeders ;
private final HashMap < String , SortStack < WordReferenceVars > > doubleDomCache ; // key = domhash (6 bytes); value = like stack
private final HashSet < String > handover ; // key = urlhash; used for double-check of urls that had been handed over to search process
@ -85,6 +86,7 @@ public final class RankingProcess extends Thread {
private final ConcurrentHashMap < String , HostInfo > hostNavigator ;
private final ConcurrentHashMap < String , AuthorInfo > authorNavigator ;
public RankingProcess (
final Segment indexSegment ,
final QueryParams query ,
@ -114,6 +116,8 @@ public final class RankingProcess extends Thread {
this . ref = new ConcurrentHashMap < String , Integer > ( ) ;
this . domZones = new int [ 8 ] ;
for ( int i = 0 ; i < 8 ; i + + ) { this . domZones [ i ] = 0 ; }
this . feeders = concurrency ;
assert this . feeders > = 1 ;
}
public void run ( ) {
@ -140,6 +144,7 @@ public final class RankingProcess extends Thread {
} catch ( final Exception e ) {
e . printStackTrace ( ) ;
}
oneFeederTerminated ( ) ;
}
public long ranking ( final WordReferenceVars word ) {
@ -262,6 +267,22 @@ public final class RankingProcess extends Thread {
serverProfiling . update ( "SEARCH" , new ProfilingGraph . searchEvent ( query . id ( true ) , SearchEvent . PRESORT , index . size ( ) , System . currentTimeMillis ( ) - timer ) , false ) ;
}
/ * *
* method to signal the incoming stack that one feeder has terminated
* /
public void oneFeederTerminated ( ) {
this . feeders - - ;
assert this . feeders > = 0 : "feeders = " + this . feeders ;
}
public void moreFeeders ( final int countMoreFeeders ) {
this . feeders + = countMoreFeeders ;
}
public boolean feedingIsFinished ( ) {
return this . feeders = = 0 ;
}
private boolean testFlags ( final WordReference ientry ) {
if ( query . constraint = = null ) return true ;
// test if ientry matches with filter
@ -337,12 +358,16 @@ public final class RankingProcess extends Thread {
return bestEntry ;
}
public URLMetadataRow takeURL ( final boolean skipDoubleDom ) {
public URLMetadataRow takeURL ( final boolean skipDoubleDom , final long timeout ) {
// returns from the current RWI list the best URL entry and removes this entry from the list
while ( ( stack . size ( ) > 0 ) | | ( size ( ) > 0 ) ) {
if ( ( ( stack . size ( ) = = 0 ) & & ( size ( ) = = 0 ) ) ) break ;
long timeLimit = System . currentTimeMillis ( ) + timeout ;
while ( System . currentTimeMillis ( ) < timeLimit ) {
final SortStack < WordReferenceVars > . stackElement obrwi = takeRWI ( skipDoubleDom ) ;
if ( obrwi = = null ) continue ; // *** ? this happened and the thread was suspended silently. cause?
if ( obrwi = = null ) {
if ( this . feedingIsFinished ( ) ) return null ;
try { Thread . sleep ( 50 ) ; } catch ( final InterruptedException e1 ) { }
continue ;
}
final URLMetadataRow page = indexSegment . urlMetadata ( ) . load ( obrwi . element . metadataHash ( ) , obrwi . element , obrwi . weight . longValue ( ) ) ;
if ( page = = null ) {
misses . add ( obrwi . element . metadataHash ( ) ) ;
@ -421,18 +446,6 @@ public final class RankingProcess extends Thread {
return null ;
}
public URLMetadataRow takeURL ( final boolean skipDoubleDom , long timeout ) {
timeout + = System . currentTimeMillis ( ) ;
long wait = 10 ;
while ( System . currentTimeMillis ( ) < timeout ) {
URLMetadataRow row = takeURL ( skipDoubleDom ) ;
if ( row ! = null ) return row ;
try { Thread . sleep ( wait ) ; } catch ( final InterruptedException e1 ) { }
wait = wait * 2 ;
}
return null ;
}
public int size ( ) {
//assert sortedRWIEntries.size() == urlhashes.size() : "sortedRWIEntries.size() = " + sortedRWIEntries.size() + ", urlhashes.size() = " + urlhashes.size();
int c = stack . size ( ) ;