package de.anomic.plasma ;
import java.util.HashMap ;
import java.util.Iterator ;
import java.util.Map ;
import java.util.TreeMap ;
public final class plasmaSearchEvent {
public static plasmaSearchEvent lastEvent = null ;
private static HashMap lastEvents = new HashMap ( ) ; // a cache for objects from this class: re-use old search requests
public static final long eventLifetime = 600000 ; // the time an event will stay in the cache, 10 Minutes
private long eventTime ;
private plasmaSearchQuery query ;
private plasmaSearchRankingProfile ranking ;
private plasmaWordIndex wordIndex ;
private indexContainer rcContainers ; // cache for results
private indexContainer rcLocal ; // cache for local results
private indexContainer rcGlobal ; // cache for global results
private Map rcAbstracts ; // cache for index abstracts; word:TreeMap mapping where the embedded TreeMap is a urlhash:peerlist relation
private plasmaSearchProcessing profileLocal , profileGlobal ;
private yacySearch [ ] primarySearchThreads , secondarySearchThreads ;
private TreeMap preselectedPeerHashes ;
private int localcount , globalcount ;
private indexContainer sortedResults ;
private int lastglobal ;
private int filteredCount ;
public plasmaSearchEvent ( plasmaSearchQuery query ,
private plasmaSearchEvent ( plasmaSearchQuery query ,
plasmaSearchRankingProfile ranking ,
plasmaSearchProcessing localTiming ,
plasmaSearchProcessing remoteTiming ,
plasmaWordIndex wordIndex ,
TreeMap preselectedPeerHashes ) {
this . eventTime = System . currentTimeMillis ( ) ; // for lifetime check
this . wordIndex = wordIndex ;
this . query = query ;
this . ranking = ranking ;
this . rcContainers = plasmaWordIndex . emptyContainer ( null ) ;
this . rcLocal = null ;
this . rcGlobal = plasmaWordIndex . emptyContainer ( null , 0 ) ; ;
this . rcAbstracts = ( query . queryHashes . size ( ) > 1 ) ? new TreeMap ( ) : null ; // generate abstracts only for combined searches
this . profileLocal = localTiming ;
this . profileGlobal = remoteTiming ;
this . preselectedPeerHashes = preselectedPeerHashes ;
this . localcount = 0 ;
this . globalcount = 0 ;
this . sortedResults = null ;
this . lastglobal = 0 ;
long start = System . currentTimeMillis ( ) ;
if ( ( query . domType = = plasmaSearchQuery . SEARCHDOM_GLOBALDHT ) | |
( query . domType = = plasmaSearchQuery . SEARCHDOM_CLUSTERALL ) ) {
int fetchpeers = ( int ) ( query . maximumTime / 500L ) ; // number of target peers; means 10 peers in 10 seconds
if ( fetchpeers > 50 ) fetchpeers = 50 ;
if ( fetchpeers < 30 ) fetchpeers = 30 ;
// do a global search
// the result of the fetch is then in the rcGlobal
serverLog . logFine ( "SEARCH_EVENT" , "STARTING " + fetchpeers + " THREADS TO CATCH EACH " + profileGlobal . getTargetCount ( plasmaSearchProcessing . PROCESS_POSTSORT ) + " URLs WITHIN " + ( profileGlobal . duetime ( ) / 1000 ) + " SECONDS" ) ;
long secondaryTimeout = System . currentTimeMillis ( ) + profileGlobal . duetime ( ) / 3 * 2 ;
long primaryTimeout = System . currentTimeMillis ( ) + profileGlobal . duetime ( ) ;
primarySearchThreads = yacySearch . primaryRemoteSearches (
plasmaSearchQuery . hashSet2hashString ( query . queryHashes ) ,
plasmaSearchQuery . hashSet2hashString ( query . excludeHashes ) ,
"" ,
query . prefer ,
query . urlMask ,
query . maxDistance ,
wordIndex ,
rcGlobal ,
rcAbstracts ,
fetchpeers ,
plasmaSwitchboard . urlBlacklist ,
profileGlobal ,
ranking ,
query . constraint ,
( query . domType = = plasmaSearchQuery . SEARCHDOM_GLOBALDHT ) ? null : preselectedPeerHashes ) ;
// meanwhile do a local search
Map [ ] searchContainerMaps = profileLocal . localSearchContainers ( query , wordIndex , null ) ;
// use the search containers to fill up rcAbstracts locally
if ( ( rcAbstracts ! = null ) & & ( searchContainerMap ! = null ) ) {
Iterator i , ci = searchContainerMap . entrySet ( ) . iterator ( ) ;
Map . Entry entry ;
String wordhash ;
indexContainer container ;
TreeMap singleAbstract ;
String mypeerhash = yacyCore . seedDB . mySeed . hash ;
while ( ci . hasNext ( ) ) {
entry = ( Map . Entry ) ci . next ( ) ;
wordhash = ( String ) entry . getKey ( ) ;
container = ( indexContainer ) entry . getValue ( ) ;
// collect all urlhashes from the container
synchronized ( rcAbstracts ) {
singleAbstract = ( TreeMap ) rcAbstracts . get ( wordhash ) ; // a mapping from url-hashes to a string of peer-hashes
if ( singleAbstract = = null ) singleAbstract = new TreeMap ( ) ;
i = container . entries ( ) ;
while ( i . hasNext ( ) ) singleAbstract . put ( ( ( indexEntry ) i . next ( ) ) . urlHash ( ) , mypeerhash ) ;
rcAbstracts . put ( wordhash , singleAbstract ) ;
// join and exlcude the local result
this . rcLocal =
( searchContainerMaps = = null ) ?
plasmaWordIndex . emptyContainer ( null , 0 ) :
profileLocal . localSearchJoinExclude (
searchContainerMaps [ 0 ] . values ( ) ,
searchContainerMaps [ 1 ] . values ( ) ,
( query . queryHashes . size ( ) = = 0 ) ?
0 :
profileLocal . getTargetTime ( plasmaSearchProcessing . PROCESS_JOIN ) * query . queryHashes . size ( ) / ( query . queryHashes . size ( ) + query . excludeHashes . size ( ) ) ,
query . maxDistance ) ;
// sort the local containers and truncate it to a limited count,
// so following sortings together with the global results will be fast
localcount = rcLocal . size ( ) ;
plasmaSearchPreOrder firstsort = new plasmaSearchPreOrder ( query , profileLocal , ranking , rcLocal ) ;
rcLocal = firstsort . strippedContainer ( 200 ) ;
// wait some time to retrieve index abstracts from primary search
while ( System . currentTimeMillis ( ) < secondaryTimeout ) {
if ( yacySearch . remainingWaiting ( primarySearchThreads ) = = 0 ) break ; // all threads have finished
try { Thread . sleep ( 100 ) ; } catch ( InterruptedException e ) { }
// evaluate index abstracts and start a secondary search
if ( rcAbstracts ! = null ) prepareSecondarySearch ( ) ;
// catch up global results:
// wait until primary timeout passed
while ( System . currentTimeMillis ( ) < primaryTimeout ) {
if ( ( yacySearch . remainingWaiting ( primarySearchThreads ) = = 0 ) & &
( ( secondarySearchThreads = = null ) | | ( yacySearch . remainingWaiting ( secondarySearchThreads ) = = 0 ) ) ) break ; // all threads have finished
try { Thread . sleep ( 100 ) ; } catch ( InterruptedException e ) { }
// finished searching
serverLog . logFine ( "SEARCH_EVENT" , "SEARCH TIME AFTER GLOBAL-TRIGGER TO " + primarySearchThreads . length + " PEERS: " + ( ( System . currentTimeMillis ( ) - start ) / 1000 ) + " seconds" ) ;
} else {
Map [ ] searchContainerMaps = profileLocal . localSearchContainers ( query , wordIndex , null ) ;
rcLocal =
( searchContainerMaps = = null ) ?
plasmaWordIndex . emptyContainer ( null , 0 ) :
profileLocal . localSearchJoinExclude (
searchContainerMaps [ 0 ] . values ( ) ,
searchContainerMaps [ 1 ] . values ( ) ,
( query . queryHashes . size ( ) = = 0 ) ?
0 :
profileLocal . getTargetTime ( plasmaSearchProcessing . PROCESS_JOIN ) * query . queryHashes . size ( ) / ( query . queryHashes . size ( ) + query . excludeHashes . size ( ) ) ,
query . maxDistance ) ;
this . localcount = rcLocal . size ( ) ;
// log the event
serverLog . logFine ( "SEARCH_EVENT" , "SEARCHRESULT: " + profileLocal . reportToString ( ) ) ;
// set link for statistic
lastEvent = this ;
// remove old events in the event cache
Iterator i = lastEvents . entrySet ( ) . iterator ( ) ;
while ( i . hasNext ( ) ) {
if ( ( ( plasmaSearchEvent ) ( ( Map . Entry ) i . next ( ) ) . getValue ( ) ) . eventTime + eventLifetime < System . currentTimeMillis ( ) ) i . remove ( ) ;
// store this search to a cache so it can be re-used
lastEvents . put ( query . id ( ) , this ) ;
public plasmaSearchQuery getQuery ( ) {
return this . globalcount ;
public plasmaSearchPreOrder search ( ) {
public static plasmaSearchEvent getEvent ( plasmaSearchQuery query ,
// combine all threads
plasmaSearchRankingProfile ranking ,
plasmaSearchProcessing localTiming ,
long start = System . currentTimeMillis ( ) ;
plasmaSearchProcessing remoteTiming ,
plasmaSearchPreOrder pre ;
plasmaWordIndex wordIndex ,
if ( ( query . domType = = plasmaSearchQuery . SEARCHDOM_GLOBALDHT ) | |
TreeMap preselectedPeerHashes ) {
( query . domType = = plasmaSearchQuery . SEARCHDOM_CLUSTERALL ) ) {
plasmaSearchEvent event = ( plasmaSearchEvent ) lastEvents . get ( query . id ( ) ) ;
int fetchpeers = ( int ) ( query . maximumTime / 500L ) ; // number of target peers; means 10 peers in 10 seconds
if ( event = = null ) {
if ( fetchpeers > 50 ) fetchpeers = 50 ;
event = new plasmaSearchEvent ( query , ranking , localTiming , remoteTiming , wordIndex , preselectedPeerHashes ) ;
if ( fetchpeers < 30 ) fetchpeers = 30 ;
} else {
//re-new the event time for this event, so it is not deleted next time too early
// do a global search
event . eventTime = System . currentTimeMillis ( ) ;
// the result of the fetch is then in the rcGlobal
serverLog . logFine ( "SEARCH_EVENT" , "STARTING " + fetchpeers + " THREADS TO CATCH EACH " + profileGlobal . getTargetCount ( plasmaSearchProcessing . PROCESS_POSTSORT ) + " URLs WITHIN " + ( profileGlobal . duetime ( ) / 1000 ) + " SECONDS" ) ;
return event ;
long secondaryTimeout = System . currentTimeMillis ( ) + profileGlobal . duetime ( ) / 3 * 2 ;
long primaryTimeout = System . currentTimeMillis ( ) + profileGlobal . duetime ( ) ;
primarySearchThreads = yacySearch . primaryRemoteSearches (
plasmaSearchQuery . hashSet2hashString ( query . queryHashes ) ,
plasmaSearchQuery . hashSet2hashString ( query . excludeHashes ) ,
"" ,
query . prefer ,
query . urlMask ,
query . maxDistance ,
wordIndex ,
rcContainers ,
rcAbstracts ,
fetchpeers ,
plasmaSwitchboard . urlBlacklist ,
profileGlobal ,
ranking ,
query . constraint ,
( query . domType = = plasmaSearchQuery . SEARCHDOM_GLOBALDHT ) ? null : preselectedPeerHashes ) ;
// meanwhile do a local search
Map [ ] searchContainerMaps = profileLocal . localSearchContainers ( query , wordIndex , null ) ;
// use the search containers to fill up rcAbstracts locally
if ( ( rcAbstracts ! = null ) & & ( searchContainerMap ! = null ) ) {
Iterator i , ci = searchContainerMap . entrySet ( ) . iterator ( ) ;
Map . Entry entry ;
String wordhash ;
indexContainer container ;
TreeMap singleAbstract ;
String mypeerhash = yacyCore . seedDB . mySeed . hash ;
while ( ci . hasNext ( ) ) {
entry = ( Map . Entry ) ci . next ( ) ;
wordhash = ( String ) entry . getKey ( ) ;
container = ( indexContainer ) entry . getValue ( ) ;
// collect all urlhashes from the container
synchronized ( rcAbstracts ) {
singleAbstract = ( TreeMap ) rcAbstracts . get ( wordhash ) ; // a mapping from url-hashes to a string of peer-hashes
if ( singleAbstract = = null ) singleAbstract = new TreeMap ( ) ;
i = container . entries ( ) ;
while ( i . hasNext ( ) ) singleAbstract . put ( ( ( indexEntry ) i . next ( ) ) . urlHash ( ) , mypeerhash ) ;
rcAbstracts . put ( wordhash , singleAbstract ) ;
// try to pre-fetch some LURLs if there is enough time
indexContainer rcLocal =
( searchContainerMaps = = null ) ?
plasmaWordIndex . emptyContainer ( null ) :
profileLocal . localSearchJoinExclude (
searchContainerMaps [ 0 ] . values ( ) ,
searchContainerMaps [ 1 ] . values ( ) ,
( query . queryHashes . size ( ) = = 0 ) ?
0 :
profileLocal . getTargetTime ( plasmaSearchProcessing . PROCESS_JOIN ) * query . queryHashes . size ( ) / ( query . queryHashes . size ( ) + query . excludeHashes . size ( ) ) ,
query . maxDistance ) ;
// this is temporary debugging code to learn that the index abstracts are fetched correctly
while ( System . currentTimeMillis ( ) < secondaryTimeout ) {
if ( yacySearch . remainingWaiting ( primarySearchThreads ) = = 0 ) break ; // all threads have finished
try { Thread . sleep ( 100 ) ; } catch ( InterruptedException e ) { }
// evaluate index abstracts and start a secondary search
if ( rcAbstracts ! = null ) prepareSecondarySearch ( ) ;
// catch up global results:
// wait until primary timeout passed
while ( System . currentTimeMillis ( ) < primaryTimeout ) {
if ( ( yacySearch . remainingWaiting ( primarySearchThreads ) = = 0 ) & &
( ( secondarySearchThreads = = null ) | | ( yacySearch . remainingWaiting ( secondarySearchThreads ) = = 0 ) ) ) break ; // all threads have finished
try { Thread . sleep ( 100 ) ; } catch ( InterruptedException e ) { }
// finished searching
serverLog . logFine ( "SEARCH_EVENT" , "SEARCH TIME AFTER GLOBAL-TRIGGER TO " + primarySearchThreads . length + " PEERS: " + ( ( System . currentTimeMillis ( ) - start ) / 1000 ) + " seconds" ) ;
// combine the result and order
public indexContainer search ( ) {
indexContainer searchResult = plasmaWordIndex . emptyContainer ( null ) ;
// combine the local and global (if any) result and order
if ( ( rcGlobal ! = null ) & & ( rcGlobal . size ( ) > 0 ) ) {
globalcount = rcGlobal . size ( ) ;
if ( ( this . sortedResults = = null ) | | ( this . lastglobal ! = globalcount ) ) {
indexContainer searchResult = plasmaWordIndex . emptyContainer ( null , rcLocal . size ( ) + rcGlobal . size ( ) ) ;
searchResult . addAllUnique ( rcLocal ) ;
searchResult . addAllUnique ( rcLocal ) ;
searchResult . addAllUnique ( rcContainers ) ;
searchResult . addAllUnique ( rcGlobal ) ;
searchResult . sort ( ) ;
searchResult . sort ( ) ;
searchResult . uniq ( 1000 ) ;
searchResult . uniq ( 100 ) ;
localcount = rcLocal . size ( ) ;
lastglobal = globalcount ;
globalcount = rcContainers . size ( ) ;
plasmaSearchPreOrder pre = new plasmaSearchPreOrder ( query , profileLocal , ranking , searchResult ) ;
pre = new plasmaSearchPreOrder ( query , profileLocal , ranking , searchResult ) ;
this . filteredCount = pre . filteredCount ( ) ;
} else {
this . sortedResults = pre . strippedContainer ( 200 ) ;
Map [ ] searchContainerMaps = profileLocal . localSearchContainers ( query , wordIndex , null ) ;
indexContainer rcLocal =
( searchContainerMaps = = null ) ?
plasmaWordIndex . emptyContainer ( null ) :
profileLocal . localSearchJoinExclude (
searchContainerMaps [ 0 ] . values ( ) ,
searchContainerMaps [ 1 ] . values ( ) ,
( query . queryHashes . size ( ) = = 0 ) ?
0 :
profileLocal . getTargetTime ( plasmaSearchProcessing . PROCESS_JOIN ) * query . queryHashes . size ( ) / ( query . queryHashes . size ( ) + query . excludeHashes . size ( ) ) ,
query . maxDistance ) ;
this . localcount = rcLocal . size ( ) ;
pre = new plasmaSearchPreOrder ( query , profileLocal , ranking , rcLocal ) ;
} else {
if ( this . sortedResults = = null ) {
plasmaSearchPreOrder pre = new plasmaSearchPreOrder ( query , profileLocal , ranking , rcLocal ) ;
this . filteredCount = pre . filteredCount ( ) ;
this . sortedResults = pre . strippedContainer ( 200 ) ;
// log the event
return this . sortedResults ;
serverLog . logFine ( "SEARCH_EVENT" , "SEARCHRESULT: " + profileLocal . reportToString ( ) ) ;
// prepare values for statistics
lastEvent = this ;
// return search result
public int filteredCount ( ) {
return pre ;
return this . filteredCount ;
private void prepareSecondarySearch ( ) {
private void prepareSecondarySearch ( ) {
System . out . println ( "DEBUG-INDEXABSTRACT ***: peer " + peer + " has urls: " + urls ) ;
System . out . println ( "DEBUG-INDEXABSTRACT ***: peer " + peer + " from words: " + words ) ;
secondarySearchThreads [ c + + ] = yacySearch . secondaryRemoteSearch (
secondarySearchThreads [ c + + ] = yacySearch . secondaryRemoteSearch (
words , "" , urls , wordIndex , rc Containers , peer , plasmaSwitchboard . urlBlacklist ,
words , "" , urls , wordIndex , rc Global , peer , plasmaSwitchboard . urlBlacklist ,
profileGlobal , ranking , query . constraint , preselectedPeerHashes ) ;
profileGlobal , ranking , query . constraint , preselectedPeerHashes ) ;