@@ -9,7 +9,7 @@
 // $LastChangedBy$
 //
 // LICENSE
 //
 // This program is free software; you can redistribute it and/or modify
 // it under the terms of the GNU General Public License as published by
 // the Free Software Foundation; either version 2 of the License, or
@@ -48,22 +48,22 @@ import net.yacy.kelondro.util.EventTracker;
 import net.yacy.kelondro.util.MemoryControl;
 import net.yacy.kelondro.util.SetTools;
 import net.yacy.repository.LoaderDispatcher;
 import de.anomic.data.WorkTables;
+import de.anomic.search.ResultFetcher.Worker;
 import de.anomic.yacy.yacySearch;
 import de.anomic.yacy.yacySeedDB;
 import de.anomic.yacy.dht.FlatWordPartitionScheme;
 import de.anomic.yacy.graphics.ProfilingGraph;

 public final class SearchEvent {

     public enum Type {
         INITIALIZATION, COLLECTION, JOIN, PRESORT, URLFETCH, NORMALIZING, FINALIZATION,
         REMOTESEARCH_START, REMOTESEARCH_TERMINATE, ABSTRACTS, CLEANUP, SNIPPETFETCH_START, ONERESULT, REFERENCECOLLECTION, RESULTLIST;
     }

     public static final int max_results_preparation = 3000;

     // class variables that may be implemented with an abstract class
     private long eventTime;
     private QueryParams query;
@@ -71,8 +71,8 @@ public final class SearchEvent {
     private final WorkTables workTables;
     private RankingProcess rankingProcess; // ordered search results, grows dynamically as all the query threads enrich this container
     private ResultFetcher resultFetcher;
     private final SecondarySearchSuperviser secondarySearchSuperviser;

     // class variables for remote searches
     private yacySearch[] primarySearchThreads, secondarySearchThreads;
@@ -83,7 +83,7 @@ public final class SearchEvent {
     private final SortedMap<byte[], HeuristicResult> heuristics;
     private byte[] IAmaxcounthash, IAneardhthash;
     private final ReferenceOrder order;

     protected SearchEvent(final QueryParams query,
                           final yacySeedDB peers,
                           final WorkTables workTables,
@@ -119,10 +119,10 @@ public final class SearchEvent {
             // initialize a ranking process that is the target for data
             // that is generated concurrently from local and global search threads
             this.rankingProcess = new RankingProcess(this.query, this.order, max_results_preparation);

             // start a local search concurrently
             this.rankingProcess.start();

             // start global searches
             final long timer = System.currentTimeMillis();
             this.primarySearchThreads = (query.queryHashes.isEmpty()) ? null : yacySearch.primaryRemoteSearches(
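Note on the flow above: the constructor starts the local RankingProcess thread before the remote search threads are created, so local and remote producers feed the same result container concurrently, and `moreFeeders` (used a few hunks below) announces how many producers are still expected. A minimal sketch of such a feeder-counted sink; the class and method names here are illustrative assumptions, not the actual RankingProcess API:

// Sketch only: a feeder-counted result sink in the spirit of RankingProcess.moreFeeders().
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

final class ResultSink<E> {
    private final BlockingQueue<E> queue = new LinkedBlockingQueue<E>();
    private final AtomicInteger feeders = new AtomicInteger(0);

    void moreFeeders(final int count) { this.feeders.addAndGet(count); }  // announce further producers
    void oneFeederTerminated() { this.feeders.decrementAndGet(); }        // a producer has finished
    void add(final E entry) { this.queue.offer(entry); }                  // producers push ranked entries

    // a consumer may stop waiting once no feeder is left and the queue is drained
    boolean moreExpected() { return this.feeders.get() > 0 || !this.queue.isEmpty(); }
}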
@@ -139,8 +139,8 @@
                     query.maxDistance,
                     query.getSegment(),
                     peers,
-                    rankingProcess,
-                    secondarySearchSuperviser,
+                    this.rankingProcess,
+                    this.secondarySearchSuperviser,
                     Switchboard.urlBlacklist,
                     query.ranking,
                     query.constraint,
@@ -152,18 +152,18 @@
                 this.rankingProcess.moreFeeders(this.primarySearchThreads.length);
                 EventTracker.update(EventTracker.EClass.SEARCH, new ProfilingGraph.searchEvent(query.id(true), Type.REMOTESEARCH_START, "", this.primarySearchThreads.length, System.currentTimeMillis() - timer), false);

                 // finished searching
-                Log.logFine("SEARCH_EVENT", "SEARCH TIME AFTER GLOBAL-TRIGGER TO " + primarySearchThreads.length + " PEERS: " + ((System.currentTimeMillis() - start) / 1000) + " seconds");
+                Log.logFine("SEARCH_EVENT", "SEARCH TIME AFTER GLOBAL-TRIGGER TO " + this.primarySearchThreads.length + " PEERS: " + ((System.currentTimeMillis() - start) / 1000) + " seconds");
             } else {
                 // no search since query is empty, user might have entered no data or filters have removed all search words
                 Log.logFine("SEARCH_EVENT", "NO SEARCH STARTED DUE TO EMPTY SEARCH REQUEST.");
             }

             // start worker threads to fetch urls and snippets
             this.resultFetcher = new ResultFetcher(loader, this.rankingProcess, query, this.peers, this.workTables, 3000, deleteIfSnippetFail);
         } else {
             // do a local search
             this.rankingProcess = new RankingProcess(this.query, this.order, max_results_preparation);

             if (generateAbstracts) {
                 this.rankingProcess.run(); // this is not started concurrently here on purpose!

                 // compute index abstracts
@@ -177,17 +177,17 @@ public final class SearchEvent {
                     final ReferenceContainer<WordReference> container = entry.getValue();
                     assert (Base64Order.enhancedCoder.equal(container.getTermHash(), wordhash)) : "container.getTermHash() = " + ASCII.String(container.getTermHash()) + ", wordhash = " + ASCII.String(wordhash);
                     if (container.size() > maxcount) {
-                        IAmaxcounthash = wordhash;
+                        this.IAmaxcounthash = wordhash;
                         maxcount = container.size();
                     }
                     l = FlatWordPartitionScheme.std.dhtDistance(wordhash, null, peers.mySeed());
                     if (l < mindhtdistance) {
                         // calculate the word hash that is closest to our dht position
                         mindhtdistance = l;
-                        IAneardhthash = wordhash;
+                        this.IAneardhthash = wordhash;
                     }
-                    IACount.put(wordhash, LargeNumberCache.valueOf(container.size()));
-                    IAResults.put(wordhash, WordReferenceFactory.compressIndex(container, null, 1000).toString());
+                    this.IACount.put(wordhash, LargeNumberCache.valueOf(container.size()));
+                    this.IAResults.put(wordhash, WordReferenceFactory.compressIndex(container, null, 1000).toString());
                 }
                 EventTracker.update(EventTracker.EClass.SEARCH, new ProfilingGraph.searchEvent(query.id(true), Type.ABSTRACTS, "", this.rankingProcess.searchContainerMap().size(), System.currentTimeMillis() - timer), false);
             } else {
@@ -196,63 +196,78 @@ public final class SearchEvent {
                 // before a reading process wants to get results from it
                 for (int i = 0; i < 10; i++) {
                     if (!this.rankingProcess.isAlive()) break;
-                    try { Thread.sleep(10); } catch (InterruptedException e) { }
+                    try { Thread.sleep(10); } catch (final InterruptedException e) { }
                 }
                 // this will reduce the maximum waiting time until results are available to 100 milliseconds
                 // while we always get a good set of ranked data
             }

             // start worker threads to fetch urls and snippets
             this.resultFetcher = new ResultFetcher(loader, this.rankingProcess, query, this.peers, this.workTables, 500, deleteIfSnippetFail);
         }

         // clean up events
         SearchEventCache.cleanupEvents(false);
         EventTracker.update(EventTracker.EClass.SEARCH, new ProfilingGraph.searchEvent(query.id(true), Type.CLEANUP, "", 0, 0), false);

         // store this search to a cache so it can be re-used
         if (MemoryControl.available() < 1024 * 1024 * 100) SearchEventCache.cleanupEvents(true);
         SearchEventCache.put(query.id(false), this);
     }

     public ReferenceOrder getOrder() {
         return this.order;
     }

     public long getEventTime() {
         return this.eventTime;
     }

     public void resetEventTime() {
         this.eventTime = System.currentTimeMillis();
     }

     public QueryParams getQuery() {
         return this.query;
     }

-    public void setQuery(QueryParams query) {
+    public void setQuery(final QueryParams query) {
         this.query = query;
         this.resultFetcher.query = query;
     }

     public void cleanup() {
+        this.resultFetcher.setCleanupState();
         // stop all threads
-        if (primarySearchThreads != null) {
+        if (this.primarySearchThreads != null) {
             for (final yacySearch search : this.primarySearchThreads) {
                 if (search != null) synchronized (search) {
                     if (search.isAlive()) search.interrupt();
                 }
             }
         }
-        if (secondarySearchThreads != null) {
+        if (this.secondarySearchThreads != null) {
             for (final yacySearch search : this.secondarySearchThreads) {
                 if (search != null) synchronized (search) {
                     if (search.isAlive()) search.interrupt();
                 }
             }
         }
+        // call the worker threads and ask them to stop
+        for (final Worker w : this.resultFetcher.workerThreads) {
+            if (w != null && w.isAlive()) {
+                w.pleaseStop();
+                w.interrupt();
+                // the interrupt may occur during a MD5 computation which is resistant against interruption
+                // therefore set some more interrupts on the process
+                int ic = 10;
+                while (ic-- > 0 & w.isAlive()) w.interrupt();
+            }
+        }
         // clear all data structures
         if (this.preselectedPeerHashes != null) this.preselectedPeerHashes.clear();
         if (this.localSearchThread != null) if (this.localSearchThread.isAlive()) this.localSearchThread.interrupt();
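The reworked cleanup() above stops the snippet workers cooperatively: pleaseStop() sets a flag, and the repeated interrupt() calls cover the case where a worker sits in an MD5 computation that ignores a single interrupt. A minimal sketch of a worker built for this pattern; the flag and loop are assumptions about the shape of ResultFetcher.Worker, not its actual code:

// Sketch of a cooperatively stoppable worker: it re-checks a volatile flag
// between work units, so pleaseStop() followed by interrupt() ends it promptly.
class StoppableWorker extends Thread {
    private volatile boolean shallRun = true;

    public void pleaseStop() { this.shallRun = false; }

    @Override
    public void run() {
        while (this.shallRun && !isInterrupted()) {
            try {
                Thread.sleep(100); // placeholder for fetching one URL / computing one snippet
            } catch (final InterruptedException e) {
                break; // interrupted while idle: leave the loop
            }
        }
    }
}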
@@ -260,37 +275,37 @@ public final class SearchEvent {
         if (this.IAResults != null) this.IAResults.clear();
         if (this.heuristics != null) this.heuristics.clear();
     }

     public Iterator<Map.Entry<byte[], String>> abstractsString() {
         return this.IAResults.entrySet().iterator();
     }

-    public String abstractsString(byte[] hash) {
+    public String abstractsString(final byte[] hash) {
         return this.IAResults.get(hash);
     }

     public Iterator<Map.Entry<byte[], Integer>> abstractsCount() {
         return this.IACount.entrySet().iterator();
     }

-    public int abstractsCount(byte[] hash) {
-        Integer i = this.IACount.get(hash);
+    public int abstractsCount(final byte[] hash) {
+        final Integer i = this.IACount.get(hash);
         if (i == null) return -1;
         return i.intValue();
     }

     public byte[] getAbstractsMaxCountHash() {
         return this.IAmaxcounthash;
     }

     public byte[] getAbstractsNearDHTHash() {
         return this.IAneardhthash;
     }

     boolean anyRemoteSearchAlive() {
         // check primary search threads
         if ((this.primarySearchThreads != null) && (this.primarySearchThreads.length != 0)) {
-            for (final yacySearch primarySearchThread : primarySearchThreads) {
+            for (final yacySearch primarySearchThread : this.primarySearchThreads) {
                 if ((primarySearchThread != null) && (primarySearchThread.isAlive())) return true;
             }
         }
@@ -302,15 +317,15 @@ public final class SearchEvent {
         }
         return false;
     }

     public yacySearch[] getPrimarySearchThreads() {
-        return primarySearchThreads;
+        return this.primarySearchThreads;
     }

     public yacySearch[] getSecondarySearchThreads() {
-        return secondarySearchThreads;
+        return this.secondarySearchThreads;
     }

     public RankingProcess getRankingResult() {
         return this.rankingProcess;
     }
@@ -318,52 +333,52 @@ public final class SearchEvent {
     public ScoreMap<String> getNamespaceNavigator() {
         return this.rankingProcess.getNamespaceNavigator();
     }

     public ScoreMap<String> getHostNavigator() {
         return this.rankingProcess.getHostNavigator();
     }

-    public ScoreMap<String> getTopicNavigator(int count) {
+    public ScoreMap<String> getTopicNavigator(final int count) {
         // returns a set of words that are computed as toplist
         return this.rankingProcess.getTopicNavigator(count);
     }

     public ScoreMap<String> getAuthorNavigator() {
         // returns a list of authors so far seen on result set
         return this.rankingProcess.getAuthorNavigator();
     }

-    public void addHeuristic(byte[] urlhash, String heuristicName, boolean redundant) {
+    public void addHeuristic(final byte[] urlhash, final String heuristicName, final boolean redundant) {
         synchronized (this.heuristics) {
             this.heuristics.put(urlhash, new HeuristicResult(urlhash, heuristicName, redundant));
         }
     }

-    public HeuristicResult getHeuristic(byte[] urlhash) {
+    public HeuristicResult getHeuristic(final byte[] urlhash) {
         synchronized (this.heuristics) {
             return this.heuristics.get(urlhash);
         }
     }

-    public ResultEntry oneResult(final int item, long timeout) {
-        if ((query.domType == QueryParams.SEARCHDOM_GLOBALDHT) ||
-            (query.domType == QueryParams.SEARCHDOM_CLUSTERALL)) {
+    public ResultEntry oneResult(final int item, final long timeout) {
+        if ((this.query.domType == QueryParams.SEARCHDOM_GLOBALDHT) ||
+            (this.query.domType == QueryParams.SEARCHDOM_CLUSTERALL)) {
             // this is a search using remote search threads. Also the local
             // search thread is started as background process
-            if ((localSearchThread != null) && (localSearchThread.isAlive())) {
+            if ((this.localSearchThread != null) && (this.localSearchThread.isAlive())) {
                 // in case that the local search takes longer than some other
                 // remote search requests, wait until the local process terminates first
-                try { localSearchThread.join(); } catch (InterruptedException e) { }
+                try { this.localSearchThread.join(); } catch (final InterruptedException e) { }
             }
         }
         return this.resultFetcher.oneResult(item, timeout);
     }

     boolean secondarySearchStartet = false;

     public static class HeuristicResult /*implements Comparable<HeuristicResult>*/ {
         public final byte[] urlhash; public final String heuristicName; public final boolean redundant;
-        public HeuristicResult(byte[] urlhash, String heuristicName, boolean redundant) {
+        public HeuristicResult(final byte[] urlhash, final String heuristicName, final boolean redundant) {
             this.urlhash = urlhash; this.heuristicName = heuristicName; this.redundant = redundant;
         } /*
         public int compareTo(HeuristicResult o) {
@@ -376,33 +391,33 @@ public final class SearchEvent {
             return Base64Order.enhancedCoder.equal(this.urlhash, ((HeuristicResult) o).urlhash);
         } */
     }

     public class SecondarySearchSuperviser extends Thread {

         // cache for index abstracts; word:TreeMap mapping where the embedded TreeMap is a urlhash:peerlist relation
         // this relation contains the information where specific urls can be found in specific peers
         SortedMap<String, SortedMap<String, StringBuilder>> abstractsCache;
         SortedSet<String> checkedPeers;
         Semaphore trigger;

         public SecondarySearchSuperviser() {
             this.abstractsCache = new TreeMap<String, SortedMap<String, StringBuilder>>();
             this.checkedPeers = new TreeSet<String>();
             this.trigger = new Semaphore(0);
         }

         /**
          * add a single abstract to the existing set of abstracts
          * @param wordhash
          * @param singleAbstract // a mapping from url-hashes to a string of peer-hashes
          */
-        public void addAbstract(String wordhash, final TreeMap<String, StringBuilder> singleAbstract) {
+        public void addAbstract(final String wordhash, final TreeMap<String, StringBuilder> singleAbstract) {
             final SortedMap<String, StringBuilder> oldAbstract;
-            synchronized (abstractsCache) {
-                oldAbstract = abstractsCache.get(wordhash);
+            synchronized (this.abstractsCache) {
+                oldAbstract = this.abstractsCache.get(wordhash);
                 if (oldAbstract == null) {
                     // new abstracts in the cache
-                    abstractsCache.put(wordhash, singleAbstract);
+                    this.abstractsCache.put(wordhash, singleAbstract);
                     return;
                 }
             }
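When addAbstract() finds an existing entry for the word hash, the incoming urlhash-to-peerlist map is merged into it in a helper thread (the `}.start();` in the next hunk closes that thread). A hedged sketch of what such a merge amounts to, appending peer hashes per url hash; the class and method names here are illustrative, not the code elided between the hunks:

// Sketch: merge a freshly received abstract into the cached one for the same word.
import java.util.Map;
import java.util.SortedMap;

final class AbstractMerge {
    static void merge(final SortedMap<String, StringBuilder> oldAbstract,
                      final SortedMap<String, StringBuilder> singleAbstract) {
        for (final Map.Entry<String, StringBuilder> entry : singleAbstract.entrySet()) {
            final StringBuilder peerlist = oldAbstract.get(entry.getKey());
            if (peerlist == null) {
                oldAbstract.put(entry.getKey(), entry.getValue()); // url hash not seen before
            } else {
                synchronized (peerlist) {
                    peerlist.append(entry.getValue());             // concatenate peer hashes
                }
            }
        }
    }
}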
@@ -421,11 +436,11 @@ public final class SearchEvent {
             }.start();
             // abstractsCache.put(wordhash, oldAbstract); // put not necessary since it is sufficient to just change the value content (it stays assigned)
         }

         public void commitAbstract() {
             this.trigger.release();
         }

         private String wordsFromPeer(final String peerhash, final StringBuilder urls) {
             Map.Entry<String, SortedMap<String, StringBuilder>> entry;
             String word, url, wordlist = "";
@@ -454,7 +469,7 @@
             }
             return wordlist;
         }

         @Override
         public void run() {
             try {
@@ -465,16 +480,16 @@ public final class SearchEvent {
                     t++;
                     if (t > 10) break;
                 }
-            } catch (InterruptedException e) {
+            } catch (final InterruptedException e) {
                 // the thread was interrupted
                 // do nothing
             }
             // the time-out was reached
         }

         private void prepareSecondarySearch() {
-            if (abstractsCache == null || abstractsCache.size() != query.queryHashes.size()) return; // secondary search not possible (yet)
+            if (this.abstractsCache == null || this.abstractsCache.size() != SearchEvent.this.query.queryHashes.size()) return; // secondary search not possible (yet)

             // catch up index abstracts and join them; then call peers again to submit their urls
             /*
             System.out.println("DEBUG-INDEXABSTRACT: " + abstractsCache.size() + " word references caught, " + query.queryHashes.size() + " needed");
@@ -482,23 +497,23 @@ public final class SearchEvent {
             System.out.println("DEBUG-INDEXABSTRACT: hash " + entry.getKey() + ": " + ((query.queryHashes.has(entry.getKey().getBytes()) ? "NEEDED" : "NOT NEEDED") + "; " + entry.getValue().size() + " entries"));
             }
             */

             // find out if there are enough references for all words that are searched
-            if (abstractsCache.size() != query.queryHashes.size()) return;
+            if (this.abstractsCache.size() != SearchEvent.this.query.queryHashes.size()) return;

             // join all the urlhash:peerlist relations: the resulting map has values with a combined peer-list list
-            final SortedMap<String, StringBuilder> abstractJoin = SetTools.joinConstructive(abstractsCache.values(), true);
+            final SortedMap<String, StringBuilder> abstractJoin = SetTools.joinConstructive(this.abstractsCache.values(), true);
             if (abstractJoin.isEmpty()) return;

             // the join result is now a urlhash: peer-list relation
             // generate a list of peers that have the urls for the joined search result
             final SortedMap<String, StringBuilder> secondarySearchURLs = new TreeMap<String, StringBuilder>(); // a (peerhash:urlhash-liststring) mapping
             String url, peer;
             StringBuilder urls, peerlist;
-            final String mypeerhash = peers.mySeed().hash;
+            final String mypeerhash = SearchEvent.this.peers.mySeed().hash;
             boolean mypeerinvolved = false;
             int mypeercount;
-            for (Map.Entry<String, StringBuilder> entry : abstractJoin.entrySet()) {
+            for (final Map.Entry<String, StringBuilder> entry : abstractJoin.entrySet()) {
                 url = entry.getKey();
                 peerlist = entry.getValue();
                 //System.out.println("DEBUG-INDEXABSTRACT: url " + url + ": from peers " + peerlist);
@@ -519,33 +534,39 @@ public final class SearchEvent {
                 }
                 if (mypeercount == 1) mypeerinvolved = true;
             }

             // compute words for secondary search and start the secondary searches
             String words;
-            secondarySearchThreads = new yacySearch[(mypeerinvolved) ? secondarySearchURLs.size() - 1 : secondarySearchURLs.size()];
+            SearchEvent.this.secondarySearchThreads = new yacySearch[(mypeerinvolved) ? secondarySearchURLs.size() - 1 : secondarySearchURLs.size()];
             int c = 0;
-            for (Map.Entry<String, StringBuilder> entry : secondarySearchURLs.entrySet()) {
+            for (final Map.Entry<String, StringBuilder> entry : secondarySearchURLs.entrySet()) {
                 peer = entry.getKey();
                 if (peer.equals(mypeerhash)) continue; // we don't need to ask ourselves
-                if (checkedPeers.contains(peer)) continue; // do not ask a peer again
+                if (this.checkedPeers.contains(peer)) continue; // do not ask a peer again
                 urls = entry.getValue();
                 words = wordsFromPeer(peer, urls);
                 if (words.length() == 0) continue; // ???
                 assert words.length() >= 12 : "words = " + words;
                 //System.out.println("DEBUG-INDEXABSTRACT ***: peer " + peer + " has urls: " + urls + " from words: " + words);
-                rankingProcess.moreFeeders(1);
-                checkedPeers.add(peer);
-                secondarySearchThreads[c++] = yacySearch.secondaryRemoteSearch(
-                        words, urls.toString(), 6000, query.getSegment(), peers, rankingProcess, peer, Switchboard.urlBlacklist,
-                        query.ranking, query.constraint, preselectedPeerHashes);
+                SearchEvent.this.rankingProcess.moreFeeders(1);
+                this.checkedPeers.add(peer);
+                SearchEvent.this.secondarySearchThreads[c++] = yacySearch.secondaryRemoteSearch(
+                        words, urls.toString(), 6000, SearchEvent.this.query.getSegment(), SearchEvent.this.peers, SearchEvent.this.rankingProcess, peer, Switchboard.urlBlacklist,
+                        SearchEvent.this.query.ranking, SearchEvent.this.query.constraint, SearchEvent.this.preselectedPeerHashes);
             }
         }
     }

     public ResultFetcher result() {
         return this.resultFetcher;
     }

+    public boolean workerAlive() {
+        if (this.resultFetcher == null || this.resultFetcher.workerThreads == null) return false;
+        for (final Worker w : this.resultFetcher.workerThreads) if (w != null && w.isAlive()) return true;
+        return false;
+    }
 }
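The new workerAlive() helper lets callers poll whether snippet workers are still producing. A possible caller-side loop, shown only as an assumption about how the method is meant to be used; `searchEvent` stands for an existing SearchEvent instance:

// Sketch: drain results while fetch workers are still running.
int item = 0;
while (searchEvent.workerAlive()) {
    final ResultEntry entry = searchEvent.oneResult(item++, 1000); // wait up to 1s per result
    if (entry == null) break;
    // render or collect the entry here
}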