@@ -47,7 +47,6 @@ import net.yacy.cora.lod.JenaTripleStore;
import net.yacy.cora.lod.vocabulary.Tagging;
import net.yacy.cora.lod.vocabulary.YaCyMetadata;
import net.yacy.cora.order.Base64Order;
import net.yacy.cora.protocol.ResponseHeader;
import net.yacy.cora.protocol.Scanner;
import net.yacy.cora.sorting.ClusteredScoreMap;
import net.yacy.cora.sorting.ConcurrentScoreMap;
@@ -57,7 +56,6 @@ import net.yacy.cora.sorting.WeakPriorityBlockingQueue.Element;
import net.yacy.cora.sorting.WeakPriorityBlockingQueue.ReverseElement;
import net.yacy.cora.storage.HandleSet;
import net.yacy.cora.util.SpaceExceededException;
import net.yacy.crawler.data.Cache;
import net.yacy.data.WorkTables;
import net.yacy.document.Condenser;
import net.yacy.document.LargeNumberCache;
@@ -81,7 +79,6 @@ import net.yacy.repository.LoaderDispatcher;
import net.yacy.repository.Blacklist.BlacklistType;
import net.yacy.search.EventTracker;
import net.yacy.search.Switchboard;
import net.yacy.search.snippet.MediaSnippet;
import net.yacy.search.snippet.ResultEntry;
public final class SearchEvent {
@@ -114,12 +111,11 @@ public final class SearchEvent {
private final ScoreMap<String> protocolNavigator; // a counter for protocol types
private final ScoreMap<String> filetypeNavigator; // a counter for file types
protected final WeakPriorityBlockingQueue<URIMetadataNode> nodeStack;
private final WeakPriorityBlockingQueue<MediaSnippet> images; // container to sort images by size
protected final WeakPriorityBlockingQueue<ResultEntry> result;
protected final LoaderDispatcher loader;
protected final HandleSet snippetFetchWordHashes; // a set of word hashes that are used to match with the snippets
protected final boolean deleteIfSnippetFail;
protected SnippetWorker[] workerThreads;
private SnippetWorker[] workerThreads;
protected long urlRetrievalAllTime;
protected long snippetComputationAllTime;
private final boolean remote;
@@ -138,9 +134,7 @@
final int burstRobinsonPercent,
final int burstMultiwordPercent,
final boolean deleteIfSnippetFail) {
if (MemoryControl.available() < 1024 * 1024 * 100) {
SearchEventCache.cleanupEvents(false);
}
if (MemoryControl.available() < 1024 * 1024 * 100) SearchEventCache.cleanupEvents(false);
this.eventTime = System.currentTimeMillis(); // for lifetime check
this.peers = peers;
this.workTables = workTables;
@@ -169,7 +163,7 @@
this.IAmaxcounthash = null;
this.IAneardhthash = null;
this.localSearchThread = null;
boolean remote =
this.remote =
(peers != null && peers.sizeConnected() > 0)
&& (this.query.domType == QueryParams.Searchdom.CLUSTER || (this.query.domType == QueryParams.Searchdom.GLOBAL && peers
.mySeed()
@@ -187,7 +181,7 @@
// start a local RWI search concurrently
this.rankingProcess.start();
if (remote) {
if (this.remote) {
// start global searches
final long timer = System.currentTimeMillis();
if (this.query.query_include_hashes.isEmpty()) {
@@ -218,21 +212,9 @@
+ " THREADS TO CATCH EACH "
+ remote_maxcount
+ " URLs");
EventTracker.update(
EventTracker.EClass.SEARCH,
new ProfilingGraph.EventSearch(
this.query.id(true),
SearchEventType.REMOTESEARCH_START,
"",
this.primarySearchThreadsL.size(),
System.currentTimeMillis() - timer),
false);
EventTracker.update(EventTracker.EClass.SEARCH, new ProfilingGraph.EventSearch(this.query.id(true), SearchEventType.REMOTESEARCH_START, "", this.primarySearchThreadsL.size(), System.currentTimeMillis() - timer), false);
// finished searching
Log . logFine ( "SEARCH_EVENT" , "SEARCH TIME AFTER GLOBAL-TRIGGER TO "
+ this . primarySearchThreadsL . size ( )
+ " PEERS: "
+ ( ( System . currentTimeMillis ( ) - start ) / 1000 )
+ " seconds" ) ;
Log . logFine ( "SEARCH_EVENT" , "SEARCH TIME AFTER GLOBAL-TRIGGER TO " + this . primarySearchThreadsL . size ( ) + " PEERS: " + ( ( System . currentTimeMillis ( ) - start ) / 1000 ) + " seconds" ) ;
} else {
// no search since query is empty, user might have entered no data or filters have removed all search words
Log . logFine ( "SEARCH_EVENT" , "NO SEARCH STARTED DUE TO EMPTY SEARCH REQUEST." ) ;
@ -275,15 +257,7 @@ public final class SearchEvent {
. compressIndex ( container , null , 1000 )
. toString ( ) ) ;
}
EventTracker . update (
EventTracker . EClass . SEARCH ,
new ProfilingGraph . EventSearch (
this . query . id ( true ) ,
SearchEventType . ABSTRACTS ,
"" ,
this . rankingProcess . searchContainerMap ( ) . size ( ) ,
System . currentTimeMillis ( ) - timer ) ,
false ) ;
EventTracker . update ( EventTracker . EClass . SEARCH , new ProfilingGraph . EventSearch ( this . query . id ( true ) , SearchEventType . ABSTRACTS , "" , this . rankingProcess . searchContainerMap ( ) . size ( ) , System . currentTimeMillis ( ) - timer ) , false ) ;
} else {
// give process time to accumulate a certain amount of data
// before a reading process wants to get results from it
@@ -298,12 +272,10 @@
// start worker threads to fetch urls and snippets
this.deleteIfSnippetFail = deleteIfSnippetFail;
this.remote = remote;
this.cleanupState = false;
this.urlRetrievalAllTime = 0;
this.snippetComputationAllTime = 0;
this.result = new WeakPriorityBlockingQueue<ResultEntry>(Math.max(1000, 10 * query.itemsPerPage()), true); // this is the result, enriched with snippets, ranked and ordered by ranking
this.images = new WeakPriorityBlockingQueue<MediaSnippet>(Math.max(1000, 10 * query.itemsPerPage()), true);
// snippets do not need to match with the complete query hashes,
// only with the query minus the stopwords which had not been used for the search
@@ -326,12 +298,7 @@
// clean up events
SearchEventCache.cleanupEvents(false);
EventTracker.update(EventTracker.EClass.SEARCH, new ProfilingGraph.EventSearch(
this.query.id(true),
SearchEventType.CLEANUP,
"",
0,
0), false);
EventTracker.update(EventTracker.EClass.SEARCH, new ProfilingGraph.EventSearch(this.query.id(true), SearchEventType.CLEANUP, "", 0, 0), false);
// store this search to a cache so it can be re-used
if ( MemoryControl . available ( ) < 1024 * 1024 * 100 ) {
@ -398,23 +365,11 @@ public final class SearchEvent {
}
// clear all data structures
if (this.preselectedPeerHashes != null) {
this.preselectedPeerHashes.clear();
}
if (this.localSearchThread != null) {
if (this.localSearchThread.isAlive()) {
this.localSearchThread.interrupt();
}
}
if (this.IACount != null) {
this.IACount.clear();
}
if (this.IAResults != null) {
this.IAResults.clear();
}
if (this.heuristics != null) {
this.heuristics.clear();
}
if (this.preselectedPeerHashes != null) this.preselectedPeerHashes.clear();
if (this.localSearchThread != null && this.localSearchThread.isAlive()) this.localSearchThread.interrupt();
if (this.IACount != null) this.IACount.clear();
if (this.IAResults != null) this.IAResults.clear();
if (this.heuristics != null) this.heuristics.clear();
}
public String abstractsString(final byte[] hash) {
@@ -541,12 +496,7 @@
// normalize entries
int is = index.size();
EventTracker.update(EventTracker.EClass.SEARCH, new ProfilingGraph.EventSearch(
this.query.id(true),
SearchEventType.NORMALIZING,
resourceName,
is,
System.currentTimeMillis() - timer), false);
EventTracker.update(EventTracker.EClass.SEARCH, new ProfilingGraph.EventSearch(this.query.id(true), SearchEventType.NORMALIZING, resourceName, is, System.currentTimeMillis() - timer), false);
if (!local) {
this.rankingProcess.receivedRemoteReferences.addAndGet(is);
}
@@ -658,14 +608,7 @@
}
} catch (final SpaceExceededException e) {
}
//if ((query.neededResults() > 0) && (container.size() > query.neededResults())) remove(true, true);
EventTracker.update(EventTracker.EClass.SEARCH, new ProfilingGraph.EventSearch(
this.query.id(true),
SearchEventType.PRESORT,
resourceName,
index.size(),
System.currentTimeMillis() - timer), false);
EventTracker.update(EventTracker.EClass.SEARCH, new ProfilingGraph.EventSearch(this.query.id(true), SearchEventType.PRESORT, resourceName, index.size(), System.currentTimeMillis() - timer), false);
}
private long waitTimeRecommendation() {
@@ -750,11 +693,7 @@
}
} catch (final InterruptedException e1) {
}
//Log.logWarning("RWIProcess", "feedingIsFinished() = " + feedingIsFinished());
//Log.logWarning("RWIProcess", "this.addRunning = " + this.addRunning);
//Log.logWarning("RWIProcess", "this.nodeStack.sizeQueue() = " + this.nodeStack.sizeQueue());
//Log.logWarning("RWIProcess", "this.stack.sizeQueue() = " + this.rwiStack.sizeQueue());
//Log.logWarning("RWIProcess", "this.doubleDomCachee.size() = " + this.doubleDomCache.size());
//Log.logWarning("RWIProcess", "feedingIsFinished() = " + feedingIsFinished() + ", this.addRunning = " + this.addRunning + ", this.nodeStack.sizeQueue() = " + this.nodeStack.sizeQueue() + ", this.stack.sizeQueue() = " + this.rwiStack.sizeQueue() + ", this.doubleDomCachee.size() = " + this.doubleDomCache.size());
if (this.rankingProcess.doubleDomCache.isEmpty()) {
Log.logWarning("RWIProcess", "doubleDomCache.isEmpty");
return null;
@@ -857,9 +796,7 @@
}
// content control
if ( Switchboard . getSwitchboard ( ) . getConfigBool ( "contentcontrol.enabled" , false ) = = true ) {
// check global network filter from bookmark list
if ( ! Switchboard . getSwitchboard ( )
. getConfig ( "contentcontrol.mandatoryfilterlist" , "" )
@ -889,9 +826,7 @@ public final class SearchEvent {
}
// check index-of constraint
if ((this.query.constraint != null)
&& (this.query.constraint.get(Condenser.flag_cat_indexof))
&& (!(pagetitle.startsWith("index of")))) {
if ((this.query.constraint != null) && (this.query.constraint.get(Condenser.flag_cat_indexof)) && (!(pagetitle.startsWith("index of")))) {
final Iterator<byte[]> wi = this.query.query_include_hashes.iterator();
while (wi.hasNext()) {
this.query.getSegment().termIndex().removeDelayed(wi.next(), page.hash());
@@ -901,9 +836,7 @@
}
// check location constraint
if ((this.query.constraint != null)
&& (this.query.constraint.get(Condenser.flag_cat_haslocation))
&& (page.lat() == 0.0f || page.lon() == 0.0f)) {
if ((this.query.constraint != null) && (this.query.constraint.get(Condenser.flag_cat_haslocation)) && (page.lat() == 0.0f || page.lon() == 0.0f)) {
this.sortout++;
continue;
}
@@ -971,12 +904,9 @@
// file type navigation
final String fileext = page.url().getFileExtension();
if (fileext.length() > 0) {
this.filetypeNavigator.inc(fileext);
}
if (fileext.length() > 0) this.filetypeNavigator.inc(fileext);
// accept url
return page;
return page; // accept url
}
Log.logWarning("RWIProcess", "loop terminated");
return null;
@@ -996,8 +926,7 @@
// (happens if a search pages is accessed a second time)
final long finishTime = System.currentTimeMillis() + timeout;
EventTracker.update(EventTracker.EClass.SEARCH, new ProfilingGraph.EventSearch(this.query.id(true), SearchEventType.ONERESULT, "started, item = " + item + ", available = " + this.result.sizeAvailable(), 0, 0), false);
//Log.logInfo("SnippetProcess", "*start method for item = " + item + "; anyWorkerAlive=" + anyWorkerAlive() + "; this.rankingProcess.isAlive() = " + this.rankingProcess.isAlive() + "; this.rankingProcess.feedingIsFinished() = " + this.rankingProcess.feedingIsFinished() + "; this.result.sizeAvailable() = " + this.result.sizeAvailable() + ", this.rankingProcess.sizeQueue() = " + this.rankingProcess.sizeQueue());
// we must wait some time until the first result page is full to get enough elements for ranking
final long waittimeout = System.currentTimeMillis() + 300;
if (this.remote && item < 10 && !this.rankingProcess.feedingIsFinished()) {
@@ -1005,7 +934,6 @@
// before they are presented on the first lines .. yes sleeps seem to be bad. but how shall we predict how long other
// peers will take until they respond?
long sleep = item == 0 ? 400 : (10 - item) * 9; // the first result takes the longest time
//Log.logInfo("SnippetProcess", "SLEEP = " + sleep);
try { Thread.sleep(sleep); } catch (final InterruptedException e1) { Log.logException(e1); }
}
int thisRankingQueueSize, lastRankingQueueSize = 0;
@@ -1032,11 +960,9 @@
// finally wait until enough results are there produced from the snippet fetch process
WeakPriorityBlockingQueue . Element < ResultEntry > entry = null ;
while ( System . currentTimeMillis ( ) < finishTime ) {
Log . logInfo ( "SnippetProcess" , "item = " + item + "; anyWorkerAlive=" + anyWorkerAlive ( ) + "; this.rankingProcess.isAlive() = " + this . rankingProcess . isAlive ( ) + "; this.rankingProcess.feedingIsFinished() = " + this . rankingProcess . feedingIsFinished ( ) + "; this.result.sizeAvailable() = " + this . result . sizeAvailable ( ) + ", this.rankingProcess.sizeQueue() = " + this . rankingProcess . rwiQueueSize ( ) + ", this.rankingProcess.nodeStack.sizeAvailable() = " + this . nodeStack . sizeAvailable ( ) ) ;
if ( ! anyWorkerAlive ( ) & & ! this . rankingProcess . isAlive ( ) & & this . result . sizeAvailable ( ) + this . rankingProcess . rwiQueueSize ( ) + this . nodeStack . sizeAvailable ( ) < = item & & this . rankingProcess . feedingIsFinished ( ) ) {
//Log.logInfo("SnippetProcess", "interrupted result fetching; item = " + item + "; this.result.sizeAvailable() = " + this.result.sizeAvailable() + ", this.rankingProcess.sizeQueue() = " + this.rankingProcess.sizeQueue() + "; this.rankingProcess.feedingIsFinished() = " + this.rankingProcess.feedingIsFinished());
break; // the fail case
}
@@ -1047,84 +973,24 @@
}
try { entry = this.result.element(item, 50); } catch (final InterruptedException e) { break; }
if (entry != null) { break; }
if (entry != null) break;
}
// finally, if there is something, return the result
if (entry == null) {
EventTracker.update(EventTracker.EClass.SEARCH, new ProfilingGraph.EventSearch(this.query.id(true), SearchEventType.ONERESULT, "not found, item = " + item + ", available = " + this.result.sizeAvailable(), 0, 0), false);
//Log.logInfo("SnippetProcess", "NO ENTRY computed (possible timeout); anyWorkerAlive=" + anyWorkerAlive() + "; this.rankingProcess.isAlive() = " + this.rankingProcess.isAlive() + "; this.rankingProcess.feedingIsFinished() = " + this.rankingProcess.feedingIsFinished() + "; this.result.sizeAvailable() = " + this.result.sizeAvailable() + ", this.rankingProcess.sizeQueue() = " + this.rankingProcess.sizeQueue());
return null;
}
final ResultEntry re = entry.getElement();
EventTracker.update(EventTracker.EClass.SEARCH, new ProfilingGraph.EventSearch(this.query.id(true), SearchEventType.ONERESULT, "retrieved, item = " + item + ", available = " + this.result.sizeAvailable() + ": " + re.urlstring(), 0, 0), false);
if (item == this.query.offset + this.query.itemsPerPage - 1) {
stopAllWorker(); // we don't need more
}
if (item == this.query.offset + this.query.itemsPerPage - 1) stopAllWorker(); // we don't need more
return re;
}
public MediaSnippet oneImage(final int item) {
// always look for a next object if there are way too less
if ( this . images . sizeAvailable ( ) < = item + 10 ) {
fillImagesCache ( ) ;
}
// check if we already retrieved the item
if ( this . images . sizeDrained ( ) > item ) {
return this . images . element ( item ) . getElement ( ) ;
}
// look again if there are not enough for presentation
while ( this . images . sizeAvailable ( ) < = item ) {
if ( fillImagesCache ( ) = = 0 ) {
break ;
}
}
if ( this . images . sizeAvailable ( ) < = item ) {
return null ;
}
// now take the specific item from the image stack
return this.images.element(item).getElement();
}
private int fillImagesCache() {
final ResultEntry re = oneResult(this.resultCounter, Math.max(3000, this.query.timeout - System.currentTimeMillis()));
this.resultCounter++;
final ResultEntry result = re;
int c = 0;
if (result == null) {
return c;
}
// iterate over all images in the result
final List < MediaSnippet > imagemedia = result . mediaSnippets ( ) ;
if ( imagemedia ! = null ) {
ResponseHeader header ;
feedloop : for ( final MediaSnippet ms : imagemedia ) {
// check cache to see if the mime type of the image url is correct
header = Cache.getResponseHeader(ms.href.hash());
if (header != null) {
// this does not work for all urls since some of them may not be in the cache
if (header.mime().startsWith("text") || header.mime().startsWith("application")) {
continue feedloop;
}
}
this.images.put(new ReverseElement<MediaSnippet>(ms, ms.ranking)); // remove smallest in case of overflow
c++;
//System.out.println("*** image " + UTF8.String(ms.href.hash()) + " images.size = " + images.size() + "/" + images.size());
}
}
return c;
}
public ArrayList < WeakPriorityBlockingQueue . Element < ResultEntry > > completeResults ( final long waitingtime ) {
final long timeout = System . currentTimeMillis ( ) + waitingtime ;
while ( this . result . sizeAvailable ( ) < this . query . neededResults ( ) & &
anyWorkerAlive ( ) & &
System . currentTimeMillis ( ) < timeout ) {
while ( this . result . sizeAvailable ( ) < this . query . neededResults ( ) & & anyWorkerAlive ( ) & & System . currentTimeMillis ( ) < timeout ) {
try { Thread . sleep ( 10 ) ; } catch ( final InterruptedException e ) { }
//System.out.println("+++DEBUG-completeResults+++ sleeping " + 200);
}
return this . result . list ( Math . min ( this . query . neededResults ( ) , this . result . sizeAvailable ( ) ) ) ;
}
@ -1201,11 +1067,7 @@ public final class SearchEvent {
}
synchronized (this.workerThreads) {
for (final SnippetWorker workerThread : this.workerThreads) {
if ((workerThread != null) &&
(workerThread.isAlive()) &&
(workerThread.busytime() < 10000)) {
return true;
}
if ((workerThread != null) && (workerThread.isAlive()) && (workerThread.busytime() < 10000)) return true;
}
}
return false;