@ -105,10 +105,148 @@ public class ResultFetcher {
EventTracker . update ( EventTracker . EClass . SEARCH , new ProfilingGraph . searchEvent ( query . id ( true ) , SearchEvent . Type . SNIPPETFETCH_START , ( ( this . workerThreads = = null ) ? "no" : this . workerThreads . length ) + " online snippet fetch threads started" , 0 , 0 ) , false ) ;
}
public void deployWorker ( int deployCount , int neededResults ) {
if ( anyWorkerAlive ( ) ) return ;
public long getURLRetrievalTime ( ) {
return this . urlRetrievalAllTime ;
}
public long getSnippetComputationTime ( ) {
return this . snippetComputationAllTime ;
}
public ResultEntry oneResult ( final int item , long timeout ) {
// check if we already retrieved this item
// (happens if a search pages is accessed a second time)
long finishTime = System . currentTimeMillis ( ) + timeout ;
EventTracker . update ( EventTracker . EClass . SEARCH , new ProfilingGraph . searchEvent ( query . id ( true ) , SearchEvent . Type . ONERESULT , "started, item = " + item + ", available = " + this . result . sizeAvailable ( ) , 0 , 0 ) , false ) ;
if ( this . result . sizeAvailable ( ) > item ) {
// we have the wanted result already in the result array .. return that
ResultEntry re = this . result . element ( item ) . getElement ( ) ;
EventTracker . update ( EventTracker . EClass . SEARCH , new ProfilingGraph . searchEvent ( query . id ( true ) , SearchEvent . Type . ONERESULT , "prefetched, item = " + item + ", available = " + this . result . sizeAvailable ( ) + ": " + re . urlstring ( ) , 0 , 0 ) , false ) ;
return re ;
}
// deploy worker to get more results
deployWorker ( Math . min ( 20 , query . itemsPerPage ) , item + query . itemsPerPage ) ;
// finally wait until enough results are there produced from the snippet fetch process
WeakPriorityBlockingQueue . Element < ResultEntry > entry = null ;
while ( System . currentTimeMillis ( ) < finishTime ) {
if ( this . result . sizeAvailable ( ) + this . rankingProcess . sizeQueue ( ) < = item & & ! anyWorkerAlive ( ) & & this . rankingProcess . feedingIsFinished ( ) ) break ;
try { entry = this . result . element ( item , 50 ) ; } catch ( InterruptedException e ) { Log . logException ( e ) ; }
if ( entry ! = null ) break ;
if ( ! anyWorkerAlive ( ) & & this . rankingProcess . sizeQueue ( ) = = 0 & & this . rankingProcess . feedingIsFinished ( ) ) break ;
}
// finally, if there is something, return the result
if ( entry = = null ) {
EventTracker . update ( EventTracker . EClass . SEARCH , new ProfilingGraph . searchEvent ( query . id ( true ) , SearchEvent . Type . ONERESULT , "not found, item = " + item + ", available = " + this . result . sizeAvailable ( ) , 0 , 0 ) , false ) ;
return null ;
}
ResultEntry re = entry . getElement ( ) ;
EventTracker . update ( EventTracker . EClass . SEARCH , new ProfilingGraph . searchEvent ( query . id ( true ) , SearchEvent . Type . ONERESULT , "retrieved, item = " + item + ", available = " + this . result . sizeAvailable ( ) + ": " + re . urlstring ( ) , 0 , 0 ) , false ) ;
return re ;
}
private int resultCounter = 0 ;
public ResultEntry nextResult ( ) {
final ResultEntry re = oneResult ( resultCounter , 1000 ) ;
resultCounter + + ;
return re ;
}
public MediaSnippet oneImage ( final int item ) {
// always look for a next object if there are way too less
if ( this . images . sizeAvailable ( ) < = item + 10 ) fillImagesCache ( ) ;
// check if we already retrieved the item
if ( this . images . sizeDrained ( ) > item ) return this . images . element ( item ) . getElement ( ) ;
// look again if there are not enough for presentation
while ( this . images . sizeAvailable ( ) < = item ) {
if ( fillImagesCache ( ) = = 0 ) break ;
}
if ( this . images . sizeAvailable ( ) < = item ) return null ;
// now take the specific item from the image stack
return this . images . element ( item ) . getElement ( ) ;
}
private int fillImagesCache ( ) {
ResultEntry result = nextResult ( ) ;
int c = 0 ;
if ( result = = null ) return c ;
// iterate over all images in the result
final List < MediaSnippet > imagemedia = result . mediaSnippets ( ) ;
if ( imagemedia ! = null ) {
for ( final MediaSnippet ms : imagemedia ) {
images . put ( new ReverseElement < MediaSnippet > ( ms , ms . ranking ) ) ; // remove smallest in case of overflow
c + + ;
//System.out.println("*** image " + new String(ms.href.hash()) + " images.size = " + images.size() + "/" + images.size());
}
}
return c ;
}
public ArrayList < WeakPriorityBlockingQueue . Element < ResultEntry > > completeResults ( final long waitingtime ) {
final long timeout = System . currentTimeMillis ( ) + waitingtime ;
while ( result . sizeAvailable ( ) < query . neededResults ( ) & &
anyWorkerAlive ( ) & &
System . currentTimeMillis ( ) < timeout ) {
try { Thread . sleep ( 20 ) ; } catch ( final InterruptedException e ) { }
//System.out.println("+++DEBUG-completeResults+++ sleeping " + 200);
}
return this . result . list ( Math . min ( query . neededResults ( ) , this . result . sizeAvailable ( ) ) ) ;
}
public long postRanking (
final ResultEntry rentry ,
final StaticScore < String > topwords ) {
long r = 0 ;
// for media search: prefer pages with many links
if ( query . contentdom = = ContentDomain . IMAGE ) r + = rentry . limage ( ) < < query . ranking . coeff_cathasimage ;
if ( query . contentdom = = ContentDomain . AUDIO ) r + = rentry . laudio ( ) < < query . ranking . coeff_cathasaudio ;
if ( query . contentdom = = ContentDomain . VIDEO ) r + = rentry . lvideo ( ) < < query . ranking . coeff_cathasvideo ;
if ( query . contentdom = = ContentDomain . APP ) r + = rentry . lapp ( ) < < query . ranking . coeff_cathasapp ;
// prefer hit with 'prefer' pattern
if ( query . prefer . matcher ( rentry . url ( ) . toNormalform ( true , true ) ) . matches ( ) ) r + = 256 < < query . ranking . coeff_prefer ;
if ( query . prefer . matcher ( rentry . title ( ) ) . matches ( ) ) r + = 256 < < query . ranking . coeff_prefer ;
// apply 'common-sense' heuristic using references
final String urlstring = rentry . url ( ) . toNormalform ( true , true ) ;
final String [ ] urlcomps = MultiProtocolURI . urlComps ( urlstring ) ;
final String [ ] descrcomps = MultiProtocolURI . splitpattern . split ( rentry . title ( ) . toLowerCase ( ) ) ;
int tc ;
for ( int j = 0 ; j < urlcomps . length ; j + + ) {
tc = topwords . get ( urlcomps [ j ] ) ;
if ( tc > 0 ) r + = Math . max ( 1 , tc ) < < query . ranking . coeff_urlcompintoplist ;
}
for ( int j = 0 ; j < descrcomps . length ; j + + ) {
tc = topwords . get ( descrcomps [ j ] ) ;
if ( tc > 0 ) r + = Math . max ( 1 , tc ) < < query . ranking . coeff_descrcompintoplist ;
}
// apply query-in-result matching
final HandleSet urlcomph = Word . words2hashesHandles ( urlcomps ) ;
final HandleSet descrcomph = Word . words2hashesHandles ( descrcomps ) ;
final Iterator < byte [ ] > shi = query . queryHashes . iterator ( ) ;
byte [ ] queryhash ;
while ( shi . hasNext ( ) ) {
queryhash = shi . next ( ) ;
if ( urlcomph . has ( queryhash ) ) r + = 256 < < query . ranking . coeff_appurl ;
if ( descrcomph . has ( queryhash ) ) r + = 256 < < query . ranking . coeff_app_dc_title ;
}
return r ;
}
public void deployWorker ( int deployCount , final int neededResults ) {
if ( rankingProcess . feedingIsFinished ( ) & & rankingProcess . sizeQueue ( ) = = 0 ) return ;
this . workerThreads = new Worker [ /*(query.snippetCacheStrategy.mustBeOffline()) ? 1 : */ deployCount ] ;
if ( this . workerThreads = = null ) {
this . workerThreads = new Worker [ deployCount ] ;
synchronized ( this . workerThreads ) {
for ( int i = 0 ; i < workerThreads . length ; i + + ) {
Worker worker = new Worker ( i , 10000 , query . snippetCacheStrategy , neededResults ) ;
@ -116,9 +254,24 @@ public class ResultFetcher {
this . workerThreads [ i ] = worker ;
}
}
} else {
// there are still worker threads running, but some may be dead.
// if we find dead workers, reanimate them
synchronized ( this . workerThreads ) {
for ( int i = 0 ; i < this . workerThreads . length ; i + + ) {
if ( deployCount < = 0 ) break ;
if ( this . workerThreads [ i ] = = null | | ! this . workerThreads [ i ] . isAlive ( ) ) {
Worker worker = new Worker ( i , 10000 , query . snippetCacheStrategy , neededResults ) ;
worker . start ( ) ;
this . workerThreads [ i ] = worker ;
deployCount - - ;
}
}
}
}
}
boolean anyWorkerAlive ( ) {
private boolean anyWorkerAlive ( ) {
if ( this . workerThreads = = null ) return false ;
synchronized ( this . workerThreads ) {
for ( int i = 0 ; i < this . workerThreads . length ; i + + ) {
@ -130,14 +283,6 @@ public class ResultFetcher {
return false ;
}
public long getURLRetrievalTime ( ) {
return this . urlRetrievalAllTime ;
}
public long getSnippetComputationTime ( ) {
return this . snippetComputationAllTime ;
}
protected class Worker extends Thread {
private final long timeout ; // the date until this thread should try to work
@ -169,12 +314,13 @@ public class ResultFetcher {
// check if we have enough
if ( result . sizeAvailable ( ) > = this . neededResults ) {
//System.out.println("result.sizeAvailable() >= this.neededResults");
System . out . println ( result . sizeAvailable ( ) + " = result.sizeAvailable() >= this.neededResults = " + this . neededResults ) ;
break ;
}
// check if we can succeed if we try to take another url
if ( rankingProcess . feedingIsFinished ( ) & & rankingProcess . sizeQueue ( ) = = 0 ) {
System . out . println ( "rankingProcess.feedingIsFinished() && rankingProcess.sizeQueue() == 0" ) ;
break ;
}
@ -182,7 +328,7 @@ public class ResultFetcher {
page = rankingProcess . takeURL ( true , this . timeout - System . currentTimeMillis ( ) ) ;
//if (page == null) page = rankedCache.takeURL(false, this.timeout - System.currentTimeMillis());
if ( page = = null ) {
//System.out.println("page == null");
System . out . println ( "page == null" ) ;
break ; // no more available
}
if ( failedURLs . has ( page . hash ( ) ) ) continue ;
@ -294,147 +440,4 @@ public class ResultFetcher {
}
Log . logInfo ( "SEARCH" , "sorted out urlhash " + new String ( urlhash ) + " during search: " + reason ) ;
}
public ResultEntry oneResult ( final int item , long timeout ) {
// check if we already retrieved this item
// (happens if a search pages is accessed a second time)
long finishTime = System . currentTimeMillis ( ) + timeout ;
EventTracker . update ( EventTracker . EClass . SEARCH , new ProfilingGraph . searchEvent ( query . id ( true ) , SearchEvent . Type . ONERESULT , "started, item = " + item + ", available = " + this . result . sizeAvailable ( ) , 0 , 0 ) , false ) ;
if ( this . result . sizeAvailable ( ) > item ) {
// we have the wanted result already in the result array .. return that
ResultEntry re = this . result . element ( item ) . getElement ( ) ;
EventTracker . update ( EventTracker . EClass . SEARCH , new ProfilingGraph . searchEvent ( query . id ( true ) , SearchEvent . Type . ONERESULT , "prefetched, item = " + item + ", available = " + this . result . sizeAvailable ( ) + ": " + re . urlstring ( ) , 0 , 0 ) , false ) ;
return re ;
}
/ *
System . out . println ( "rankedCache.size() = " + this . rankedCache . size ( ) ) ;
System . out . println ( "result.size() = " + this . result . size ( ) ) ;
System . out . println ( "query.neededResults() = " + query . neededResults ( ) ) ;
* /
if ( this . result . sizeAvailable ( ) < = item ) {
// start worker threads to fetch urls and snippets
//System.out.println("item = " + item);
//System.out.println("anyWorkerAlive() = " + anyWorkerAlive());
//System.out.println("rankingProcess.feedingIsFinished() = " + rankingProcess.feedingIsFinished());
//System.out.println("this.rankingProcess.sizeQueue() = " + this.rankingProcess.sizeQueue());
//System.out.println("this.result.sizeAvailable() = " + this.result.sizeAvailable());
//System.out.println("this.result.sizeAvailable() + this.rankingProcess.sizeQueue() = " + (this.result.sizeAvailable() + this.rankingProcess.sizeQueue()));
deployWorker ( Math . min ( 20 , query . itemsPerPage ) , ( ( item + query . itemsPerPage ) / query . itemsPerPage ) * query . itemsPerPage ) ;
}
// finally wait until enough results are there produced from the
// snippet fetch process
WeakPriorityBlockingQueue . Element < ResultEntry > entry = null ;
while ( System . currentTimeMillis ( ) < finishTime ) {
if ( this . result . sizeAvailable ( ) + this . rankingProcess . sizeQueue ( ) < = item & & ! anyWorkerAlive ( ) & & this . rankingProcess . feedingIsFinished ( ) ) break ;
try { entry = this . result . element ( item , 50 ) ; } catch ( InterruptedException e ) { Log . logException ( e ) ; }
if ( entry ! = null ) break ;
if ( ! anyWorkerAlive ( ) & & this . rankingProcess . sizeQueue ( ) = = 0 & & this . rankingProcess . feedingIsFinished ( ) ) break ; //
}
// finally, if there is something, return the result
if ( entry = = null ) {
EventTracker . update ( EventTracker . EClass . SEARCH , new ProfilingGraph . searchEvent ( query . id ( true ) , SearchEvent . Type . ONERESULT , "not found, item = " + item + ", available = " + this . result . sizeAvailable ( ) , 0 , 0 ) , false ) ;
return null ;
}
ResultEntry re = entry . getElement ( ) ;
EventTracker . update ( EventTracker . EClass . SEARCH , new ProfilingGraph . searchEvent ( query . id ( true ) , SearchEvent . Type . ONERESULT , "retrieved, item = " + item + ", available = " + this . result . sizeAvailable ( ) + ": " + re . urlstring ( ) , 0 , 0 ) , false ) ;
return re ;
}
private int resultCounter = 0 ;
public ResultEntry nextResult ( ) {
final ResultEntry re = oneResult ( resultCounter , 1000 ) ;
resultCounter + + ;
return re ;
}
public MediaSnippet oneImage ( final int item ) {
// always look for a next object if there are way too less
if ( this . images . sizeAvailable ( ) < = item + 10 ) fillImagesCache ( ) ;
// check if we already retrieved the item
if ( this . images . sizeDrained ( ) > item ) return this . images . element ( item ) . getElement ( ) ;
// look again if there are not enough for presentation
while ( this . images . sizeAvailable ( ) < = item ) {
if ( fillImagesCache ( ) = = 0 ) break ;
}
if ( this . images . sizeAvailable ( ) < = item ) return null ;
// now take the specific item from the image stack
return this . images . element ( item ) . getElement ( ) ;
}
private int fillImagesCache ( ) {
ResultEntry result = nextResult ( ) ;
int c = 0 ;
if ( result = = null ) return c ;
// iterate over all images in the result
final List < MediaSnippet > imagemedia = result . mediaSnippets ( ) ;
if ( imagemedia ! = null ) {
for ( final MediaSnippet ms : imagemedia ) {
images . put ( new ReverseElement < MediaSnippet > ( ms , ms . ranking ) ) ; // remove smallest in case of overflow
c + + ;
//System.out.println("*** image " + new String(ms.href.hash()) + " images.size = " + images.size() + "/" + images.size());
}
}
return c ;
}
public ArrayList < WeakPriorityBlockingQueue . Element < ResultEntry > > completeResults ( final long waitingtime ) {
final long timeout = System . currentTimeMillis ( ) + waitingtime ;
while ( result . sizeAvailable ( ) < query . neededResults ( ) & &
anyWorkerAlive ( ) & &
System . currentTimeMillis ( ) < timeout ) {
try { Thread . sleep ( 20 ) ; } catch ( final InterruptedException e ) { }
//System.out.println("+++DEBUG-completeResults+++ sleeping " + 200);
}
return this . result . list ( Math . min ( query . neededResults ( ) , this . result . sizeAvailable ( ) ) ) ;
}
public long postRanking (
final ResultEntry rentry ,
final StaticScore < String > topwords ) {
long r = 0 ;
// for media search: prefer pages with many links
if ( query . contentdom = = ContentDomain . IMAGE ) r + = rentry . limage ( ) < < query . ranking . coeff_cathasimage ;
if ( query . contentdom = = ContentDomain . AUDIO ) r + = rentry . laudio ( ) < < query . ranking . coeff_cathasaudio ;
if ( query . contentdom = = ContentDomain . VIDEO ) r + = rentry . lvideo ( ) < < query . ranking . coeff_cathasvideo ;
if ( query . contentdom = = ContentDomain . APP ) r + = rentry . lapp ( ) < < query . ranking . coeff_cathasapp ;
// prefer hit with 'prefer' pattern
if ( query . prefer . matcher ( rentry . url ( ) . toNormalform ( true , true ) ) . matches ( ) ) r + = 256 < < query . ranking . coeff_prefer ;
if ( query . prefer . matcher ( rentry . title ( ) ) . matches ( ) ) r + = 256 < < query . ranking . coeff_prefer ;
// apply 'common-sense' heuristic using references
final String urlstring = rentry . url ( ) . toNormalform ( true , true ) ;
final String [ ] urlcomps = MultiProtocolURI . urlComps ( urlstring ) ;
final String [ ] descrcomps = MultiProtocolURI . splitpattern . split ( rentry . title ( ) . toLowerCase ( ) ) ;
int tc ;
for ( int j = 0 ; j < urlcomps . length ; j + + ) {
tc = topwords . get ( urlcomps [ j ] ) ;
if ( tc > 0 ) r + = Math . max ( 1 , tc ) < < query . ranking . coeff_urlcompintoplist ;
}
for ( int j = 0 ; j < descrcomps . length ; j + + ) {
tc = topwords . get ( descrcomps [ j ] ) ;
if ( tc > 0 ) r + = Math . max ( 1 , tc ) < < query . ranking . coeff_descrcompintoplist ;
}
// apply query-in-result matching
final HandleSet urlcomph = Word . words2hashesHandles ( urlcomps ) ;
final HandleSet descrcomph = Word . words2hashesHandles ( descrcomps ) ;
final Iterator < byte [ ] > shi = query . queryHashes . iterator ( ) ;
byte [ ] queryhash ;
while ( shi . hasNext ( ) ) {
queryhash = shi . next ( ) ;
if ( urlcomph . has ( queryhash ) ) r + = 256 < < query . ranking . coeff_appurl ;
if ( descrcomph . has ( queryhash ) ) r + = 256 < < query . ranking . coeff_app_dc_title ;
}
return r ;
}
}