@ -51,15 +51,15 @@ import de.anomic.yacy.graphics.ProfilingGraph;
public class ResultFetcher {
// input values
// NOTE(review): the next two declarations have the same type and the same comment and
// differ only in the field name; they look like the before/after sides of a rename.
// Confirm which one is current.
final RankingProcess rank edCache ; // ordered search results, grows dynamically as all the query threads enrich this container
final RankingProcess rank ingProcess ; // ordered search results, grows dynamically as all the query threads enrich this container
QueryParams query ;
private final yacySeedDB peers ;
// result values
protected final LoaderDispatcher loader ;
protected Worker [ ] workerThreads ;
// NOTE(review): `result` and `images` are each declared twice below, once with the element
// type wrapped in ReverseElement and once without the wrapper. Only one form of each can
// exist in the class -- confirm which pair is current.
protected final WeakPriorityBlockingQueue < Re verseElement< Re sultEntry> > result ;
protected final WeakPriorityBlockingQueue < ReverseElement< MediaSnippet> > images ; // container to sort images by size
protected final WeakPriorityBlockingQueue < Re sultEntry> result ;
protected final WeakPriorityBlockingQueue < MediaSnippet> images ; // container to sort images by size
protected final HandleSet failedURLs ; // a set of url hashes that could not be verified during search
protected final HandleSet snippetFetchWordHashes ; // a set of word hashes that are used to match with the snippets
long urlRetrievalAllTime ;
@ -74,15 +74,15 @@ public class ResultFetcher {
final int taketimeout ) {
this . loader = loader ;
this . rank edCache = rankedCache ;
this . rank ingProcess = rankedCache ;
this . query = query ;
this . peers = peers ;
this . taketimeout = taketimeout ;
this . urlRetrievalAllTime = 0 ;
this . snippetComputationAllTime = 0 ;
this . result = new WeakPriorityBlockingQueue < Re verseElement< Re sultEntry> > ( - 1 ) ; // this is the result, enriched with snippets, ranked and ordered by ranking
this . images = new WeakPriorityBlockingQueue < ReverseElement< MediaSnippet> > ( - 1 ) ;
this . result = new WeakPriorityBlockingQueue < Re sultEntry> ( - 1 ) ; // this is the result, enriched with snippets, ranked and ordered by ranking
this . images = new WeakPriorityBlockingQueue < MediaSnippet> ( - 1 ) ;
this . failedURLs = new HandleSet ( URIMetadataRow . rowdef . primaryKeyLength , URIMetadataRow . rowdef . objectOrder , 0 ) ; // a set of url hashes where a worker thread tried to work on, but failed.
// snippets do not need to match with the complete query hashes,
@ -107,19 +107,25 @@ public class ResultFetcher {
/**
 * Creates and starts a fresh set of worker threads, unless there is nothing to do.
 * Nothing is started when a worker is still reported alive, or when the ranking
 * process has finished feeding and its queue is empty.
 *
 * @param deployCount   length of the newly allocated worker array, i.e. how many workers each loop creates
 * @param neededResults handed unchanged to every Worker constructor
 */
public void deployWorker ( int deployCount , int neededResults ) {
// do not deploy on top of workers that are still considered alive
if ( anyWorkerAlive ( ) ) return ;
// no more input can arrive and nothing is queued: workers would have nothing to take
if ( rankingProcess . feedingIsFinished ( ) & & rankingProcess . sizeQueue ( ) = = 0 ) return ;
// the previous array reference (if any) is replaced here
this . workerThreads = new Worker [ /*(query.snippetCacheStrategy.mustBeOffline()) ? 1 : */ deployCount ] ;
// NOTE(review): two start loops follow, one unsynchronized with 10000 as the second
// constructor argument and one synchronized with 1000. Taken literally, every slot is
// started twice and the first worker is overwritten. This looks like the old and new
// variant of the same loop -- confirm which one is current.
// The second constructor argument is presumably a time budget in milliseconds -- TODO confirm.
for ( int i = 0 ; i < workerThreads . length ; i + + ) {
this . workerThreads [ i ] = new Worker ( i , 10000 , query . snippetCacheStrategy , neededResults ) ;
this . workerThreads [ i ] . start ( ) ;
}
// NOTE(review): the monitor used here is the array that was assigned a few lines above,
// so each call locks a different object -- confirm this gives the intended exclusion
// against anyWorkerAlive().
synchronized ( this . workerThreads ) {
for ( int i = 0 ; i < workerThreads . length ; i + + ) {
Worker worker = new Worker ( i , 1000 , query . snippetCacheStrategy , neededResults ) ;
// the worker is started before it is stored in the array
worker . start ( ) ;
this . workerThreads [ i ] = worker ;
}
}
}
/**
 * Reports whether at least one deployed worker still counts as active.
 * A worker counts when its array slot is non-null, isAlive() returns true and
 * busytime() is below the threshold used in the loop.
 *
 * @return true as soon as one such worker is found; false when workerThreads is
 *         null or no worker qualifies
 */
boolean anyWorkerAlive ( ) {
// no workers have been deployed yet
if ( this . workerThreads = = null ) return false ;
// NOTE(review): two scans follow, one unsynchronized with a busytime limit of 3000 and
// one synchronized with a limit of 1000. The first `for` has no closing brace of its own
// in this text, so the braces do not balance as shown. This looks like the old and new
// variant of the same scan -- confirm which one is current.
for ( int i = 0 ; i < this . workerThreads . length ; i + + ) {
if ( ( this . workerThreads [ i ] ! = null ) & &
( this . workerThreads [ i ] . isAlive ( ) ) & &
( this . workerThreads [ i ] . busytime ( ) < 3000 ) ) return true ;
synchronized ( this . workerThreads ) {
for ( int i = 0 ; i < this . workerThreads . length ; i + + ) {
// a live thread whose last life sign is too old is not counted
if ( ( this . workerThreads [ i ] ! = null ) & &
( this . workerThreads [ i ] . isAlive ( ) ) & &
( this . workerThreads [ i ] . busytime ( ) < 1000 ) ) return true ;
}
}
return false ;
}
@ -155,20 +161,32 @@ public class ResultFetcher {
//final int fetchAhead = snippetMode == 0 ? 0 : 10;
boolean nav_topics = query . navigators . equals ( "all" ) | | query . navigators . indexOf ( "topics" ) > = 0 ;
try {
//System.out.println("DEPLOYED WORKER " + id + " FOR " + this.neededResults + " RESULTS, timeoutd = " + (this.timeout - System.currentTimeMillis()));
int loops = 0 ;
while ( System . currentTimeMillis ( ) < this . timeout ) {
if ( result . sizeAvailable ( ) > neededResults ) break ;
this . lastLifeSign = System . currentTimeMillis ( ) ;
this . lastLifeSign = System . currentTimeMillis ( ) ;
// check if we have enough
if ( ( query . contentdom = = ContentDomain . IMAGE ) & & ( images . sizeAvailable ( ) > = query . neededResults ( ) + 50 ) ) break ;
if ( ( query . contentdom ! = ContentDomain . IMAGE ) & & ( result . sizeAvailable ( ) > = query . neededResults ( ) + 10 ) ) break ;
if ( result . sizeAvailable ( ) > = this . neededResults ) {
//System.out.println("result.sizeAvailable() >= this.neededResults");
break ;
}
// check if we can succeed if we try to take another url
if ( rankingProcess . feedingIsFinished ( ) & & rankingProcess . sizeQueue ( ) = = 0 ) {
break ;
}
// get next entry
page = rankedCache . takeURL ( true , this . timeout - System . currentTimeMillis ( ) ) ;
//if (page == null) page = rankedCache.takeURL(false, taketimeout);
if ( page = = null ) break ;
page = rankingProcess . takeURL ( true , this . timeout - System . currentTimeMillis ( ) ) ;
//if (page == null) page = rankedCache.takeURL(false, this.timeout - System.currentTimeMillis());
if ( page = = null ) {
//System.out.println("page == null");
break ; // no more available
}
if ( failedURLs . has ( page . hash ( ) ) ) continue ;
loops + + ;
final ResultEntry resultEntry = fetchSnippet ( page , query . sitehash = = null ? cacheStrategy : CacheStrategy . CACHEONLY ) ; // does not fetch snippets if snippetMode == 0
if ( resultEntry = = null ) continue ; // the entry had some problems, cannot be used
@ -176,23 +194,25 @@ public class ResultFetcher {
urlRetrievalAllTime + = resultEntry . dbRetrievalTime ;
snippetComputationAllTime + = resultEntry . snippetComputationTime ;
//System.out.println("+++DEBUG-resultWorker+++ fetched " + resultEntry.urlstring());
// place the result to the result vector
// apply post-ranking
long ranking = Long . valueOf ( rankedCache . getOrder ( ) . cardinal ( resultEntry . word ( ) ) ) ;
ranking + = postRanking ( resultEntry , rankedCache . getTopics ( ) ) ;
//System.out.println("*** resultEntry.hash = " + resultEntry.hash());
long ranking = Long . valueOf ( rankingProcess . getOrder ( ) . cardinal ( resultEntry . word ( ) ) ) ;
ranking + = postRanking ( resultEntry , rankingProcess . getTopics ( ) ) ;
result . put ( new ReverseElement < ResultEntry > ( resultEntry , ranking ) ) ; // remove smallest in case of overflow
if ( nav_topics ) rankedCache . addTopics ( resultEntry ) ;
//System.out.println("DEBUG SNIPPET_LOADING: thread " + id + " got " + resultEntry.url());
if ( nav_topics ) rankingProcess . addTopics ( resultEntry ) ;
}
//System.out.println("FINISHED WORKER " + id + " FOR " + this.neededResults + " RESULTS, loops = " + loops);
} catch ( final Exception e ) {
Log . logException ( e ) ;
}
Log . logInfo ( "SEARCH" , "resultWorker thread " + id + " terminated" ) ;
}
/ * *
* calculate the time since the worker has had the latest activity
* @return time in milliseconds lasted since latest activity
* /
public long busytime ( ) {
return System . currentTimeMillis ( ) - this . lastLifeSign ;
}
@ -274,9 +294,10 @@ public class ResultFetcher {
Log . logInfo ( "SEARCH" , "sorted out urlhash " + new String ( urlhash ) + " during search: " + reason ) ;
}
public ResultEntry oneResult ( final int item ) {
public ResultEntry oneResult ( final int item , long timeout ) {
// check if we already retrieved this item
// (happens if a search page is accessed a second time)
long finishTime = System . currentTimeMillis ( ) + timeout ;
EventTracker . update ( EventTracker . EClass . SEARCH , new ProfilingGraph . searchEvent ( query . id ( true ) , SearchEvent . Type . ONERESULT , "started, item = " + item + ", available = " + this . result . sizeAvailable ( ) , 0 , 0 ) , false ) ;
if ( this . result . sizeAvailable ( ) > item ) {
// we have the wanted result already in the result array .. return that
@ -289,35 +310,40 @@ public class ResultFetcher {
System . out . println ( "result.size() = " + this . result . size ( ) ) ;
System . out . println ( "query.neededResults() = " + query . neededResults ( ) ) ;
* /
if ( ( ! anyWorkerAlive ( ) ) & &
( ( ( query . contentdom = = ContentDomain . IMAGE ) & & ( images . sizeAvailable ( ) + 30 < query . neededResults ( ) ) ) | |
( this . result . sizeAvailable ( ) < query . neededResults ( ) ) ) & &
//(event.query.onlineSnippetFetch) &&
( this . rankedCache . size ( ) > this . result . sizeAvailable ( ) )
) {
if ( this . result . sizeAvailable ( ) < = item ) {
// start worker threads to fetch urls and snippets
deployWorker ( Math . min ( 10 , query . itemsPerPage ) , query . neededResults ( ) ) ;
//System.out.println("item = " + item);
//System.out.println("anyWorkerAlive() = " + anyWorkerAlive());
//System.out.println("rankingProcess.feedingIsFinished() = " + rankingProcess.feedingIsFinished());
//System.out.println("this.rankingProcess.sizeQueue() = " + this.rankingProcess.sizeQueue());
//System.out.println("this.result.sizeAvailable() = " + this.result.sizeAvailable());
//System.out.println("this.result.sizeAvailable() + this.rankingProcess.sizeQueue() = " + (this.result.sizeAvailable() + this.rankingProcess.sizeQueue()));
deployWorker ( Math . min ( 20 , query . itemsPerPage ) , ( ( item + query . itemsPerPage ) / query . itemsPerPage ) * query . itemsPerPage ) ;
}
// finally, wait until the snippet fetch process has produced
// enough results
while ( ( anyWorkerAlive ( ) ) & & ( result . sizeAvailable ( ) < = item ) ) {
try { Thread . sleep ( ( item % query . itemsPerPage ) * 10L ) ; } catch ( final InterruptedException e ) { }
WeakPriorityBlockingQueue . Element < ResultEntry > entry = null ;
while ( System . currentTimeMillis ( ) < finishTime ) {
if ( this . result . sizeAvailable ( ) + this . rankingProcess . sizeQueue ( ) < = item & & ! anyWorkerAlive ( ) & & this . rankingProcess . feedingIsFinished ( ) ) break ;
try { entry = this . result . element ( item , 50 ) ; } catch ( InterruptedException e ) { Log . logException ( e ) ; }
if ( entry ! = null ) break ;
if ( ! anyWorkerAlive ( ) & & this . rankingProcess . sizeQueue ( ) = = 0 & & this . rankingProcess . feedingIsFinished ( ) ) break ; //
}
// finally, if there is something, return the result
if ( this . result . sizeAvailable ( ) < = item ) {
if ( entry = = null ) {
EventTracker . update ( EventTracker . EClass . SEARCH , new ProfilingGraph . searchEvent ( query . id ( true ) , SearchEvent . Type . ONERESULT , "not found, item = " + item + ", available = " + this . result . sizeAvailable ( ) , 0 , 0 ) , false ) ;
return null ;
}
ResultEntry re = this . result . element ( item ) . getElement ( ) ;
ResultEntry re = entry . getElement ( ) ;
EventTracker . update ( EventTracker . EClass . SEARCH , new ProfilingGraph . searchEvent ( query . id ( true ) , SearchEvent . Type . ONERESULT , "retrieved, item = " + item + ", available = " + this . result . sizeAvailable ( ) + ": " + re . urlstring ( ) , 0 , 0 ) , false ) ;
return re ;
}
// position of the result that the next call of nextResult() asks for
private int resultCounter = 0 ;
/**
 * Fetches the result at the current counter position via oneResult and then
 * advances the counter by one, whether or not a result was found.
 *
 * @return the value produced by oneResult for that position; oneResult returns
 *         null when it finds no entry
 */
public ResultEntry nextResult ( ) {
// NOTE(review): `re` is declared twice below, once from the one-argument call and once
// from the two-argument call of oneResult; only one can remain. In the two-argument form
// the 1000 is added to System.currentTimeMillis() inside oneResult, i.e. a wait limit in
// milliseconds. Confirm which line is current.
final ResultEntry re = oneResult ( resultCounter );
final ResultEntry re = oneResult ( resultCounter , 1000 );
resultCounter + + ;
return re ;
}
@ -355,7 +381,7 @@ public class ResultFetcher {
return c ;
}
public ArrayList < Reverse Element< ResultEntry > > completeResults ( final long waitingtime ) {
public ArrayList < WeakPriorityBlockingQueue. Element< ResultEntry > > completeResults ( final long waitingtime ) {
final long timeout = System . currentTimeMillis ( ) + waitingtime ;
while ( ( result . sizeAvailable ( ) < query . neededResults ( ) ) & & ( anyWorkerAlive ( ) ) & & ( System . currentTimeMillis ( ) < timeout ) ) {
try { Thread . sleep ( 20 ) ; } catch ( final InterruptedException e ) { }