@ -52,6 +52,7 @@ import java.util.TreeMap;
import de.anomic.kelondro.kelondroException ;
import de.anomic.kelondro.kelondroMSetTools ;
import de.anomic.server.logging.serverLog ;
import de.anomic.yacy.yacyCore ;
import de.anomic.yacy.yacySearch ;
import de.anomic.index.indexContainer ;
import de.anomic.index.indexEntry ;
@ -132,10 +133,7 @@ public final class plasmaSearchEvent extends Thread implements Runnable {
// do a global search
// the result of the fetch is then in the rcGlobal
if ( fetchpeers < 10 ) fetchpeers = 10 ;
log . logFine ( "STARTING " + fetchpeers + " THREADS TO CATCH EACH " + profileGlobal . getTargetCount ( plasmaSearchTimingProfile . PROCESS_POSTSORT ) + " URLs WITHIN " + ( profileGlobal . duetime ( ) / 1000 ) + " SECONDS" ) ;
long secondaryTimeout = System . currentTimeMillis ( ) + profileGlobal . duetime ( ) / 2 ;
long primaryTimeout = System . currentTimeMillis ( ) + profileGlobal . duetime ( ) ;
primarySearchThreads = yacySearch . primaryRemoteSearches ( plasmaSearchQuery . hashSet2hashString ( query . queryHashes ) , "" ,
@ -144,8 +142,33 @@ public final class plasmaSearchEvent extends Thread implements Runnable {
// meanwhile do a local search
Map searchContainerMap = localSearchContainers ( null ) ;
// use the search containers to fill up rcAbstracts locally
if ( searchContainerMap ! = null ) {
Iterator i , ci = searchContainerMap . entrySet ( ) . iterator ( ) ;
Map . Entry entry ;
String wordhash ;
indexContainer container ;
TreeMap singleAbstract ;
String mypeerhash = yacyCore . seedDB . mySeed . hash ;
while ( ci . hasNext ( ) ) {
entry = ( Map . Entry ) ci . next ( ) ;
wordhash = ( String ) entry . getKey ( ) ;
container = ( indexContainer ) entry . getValue ( ) ;
// collect all urlhashes from the container
synchronized ( rcAbstracts ) {
singleAbstract = ( TreeMap ) rcAbstracts . get ( wordhash ) ; // a mapping from url-hashes to a string of peer-hashes
if ( singleAbstract = = null ) singleAbstract = new TreeMap ( ) ;
i = container . entries ( ) ;
while ( i . hasNext ( ) ) singleAbstract . put ( ( ( indexEntry ) i . next ( ) ) . urlHash ( ) , mypeerhash ) ;
rcAbstracts . put ( wordhash , singleAbstract ) ;
}
}
}
// try to pre-fetch some LURLs if there is enough time
indexContainer rcLocal = localSearchJoin ( ( searchContainerMap = = null ) ? null : searchContainerMap . values ( ) ) ;
plasmaSearchResult localResult = orderLocal ( rcLocal , secondaryTimeout ) ;
p refetch Local( rcLocal , secondaryTimeout ) ;
// evaluate index abstracts and start a secondary search
// this is temporary debugging code to learn that the index abstracts are fetched correctly
@ -168,7 +191,7 @@ public final class plasmaSearchEvent extends Thread implements Runnable {
log . logFine ( "SEARCH TIME AFTER GLOBAL-TRIGGER TO " + fetchpeers + " PEERS: " + ( ( System . currentTimeMillis ( ) - start ) / 1000 ) + " seconds" ) ;
// combine the result and order
plasmaSearchResult result = ( ( globalContributions = = 0 ) & & ( localResult . sizeOrdered ( ) ! = 0 ) ) ? localResult : orderFinal ( rcLocal ) ;
plasmaSearchResult result = orderFinal ( rcLocal ) ;
result . globalContributions = globalContributions ;
result . localContributions = rcLocal . size ( ) ;
@ -216,6 +239,8 @@ public final class plasmaSearchEvent extends Thread implements Runnable {
Iterator i1 = abstractJoin . entrySet ( ) . iterator ( ) ;
Map . Entry entry1 ;
String url , urls , peer , peers ;
String mypeerhash = yacyCore . seedDB . mySeed . hash ;
boolean mypeerinvolved = false ;
while ( i1 . hasNext ( ) ) {
entry1 = ( Map . Entry ) i1 . next ( ) ;
url = ( String ) entry1 . getKey ( ) ;
@ -227,17 +252,19 @@ public final class plasmaSearchEvent extends Thread implements Runnable {
urls = ( String ) secondarySearchURLs . get ( peer ) ;
urls = ( urls = = null ) ? url : urls + url ;
secondarySearchURLs . put ( peer , urls ) ;
if ( peer . equals ( mypeerhash ) ) mypeerinvolved = true ;
}
}
// compute words for secondary search and start the secondary searches
i1 = secondarySearchURLs . entrySet ( ) . iterator ( ) ;
String words ;
secondarySearchThreads = new yacySearch [ secondarySearchURLs . size ( ) ] ;
secondarySearchThreads = new yacySearch [ ( mypeerinvolved ) ? secondarySearchURLs . size ( ) - 1 : secondarySearchURLs . size ( ) ] ;
int c = 0 ;
while ( i1 . hasNext ( ) ) {
entry1 = ( Map . Entry ) i1 . next ( ) ;
peer = ( String ) entry1 . getKey ( ) ;
if ( peer . equals ( mypeerhash ) ) continue ; // we dont need to ask ourself
urls = ( String ) entry1 . getValue ( ) ;
words = wordsFromPeer ( peer , urls ) ;
System . out . println ( "DEBUG-INDEXABSTRACT: peer " + peer + " has urls: " + urls ) ;
@ -380,54 +407,24 @@ public final class plasmaSearchEvent extends Thread implements Runnable {
return acc ;
}
private plasmaSearchResult orderLocal ( indexContainer rcLocal , long timeout ) {
// we collect the urlhashes and construct a list with urlEntry objects
// attention: if minEntries is too high, this method will not terminate within the maxTime
private void prefetchLocal ( indexContainer rcLocal , long timeout ) {
// pre-fetch some urls to fill LURL ram cache
profileLocal . startTimer ( ) ;
plasmaSearchPreOrder preorder = new plasmaSearchPreOrder ( query , ranking , rcLocal , timeout - System . currentTimeMillis ( ) ) ;
preorder . remove ( true , true ) ;
profileLocal . setYieldTime ( plasmaSearchTimingProfile . PROCESS_PRESORT ) ;
profileLocal . setYieldCount ( plasmaSearchTimingProfile . PROCESS_PRESORT , rcLocal . size ( ) ) ;
// start url-fetch
profileLocal . startTimer ( ) ;
plasmaSearchResult acc = new plasmaSearchResult ( query , ranking ) ;
indexEntry entry ;
plasmaCrawlLURL . Entry page ;
Long preranking ;
Object [ ] preorderEntry ;
try {
while ( preorder . hasNext ( ) ) {
if ( System . currentTimeMillis ( ) > = timeout ) break ;
preorderEntry = preorder . next ( ) ;
entry = ( indexEntry ) preorderEntry [ 0 ] ;
preranking = ( Long ) preorderEntry [ 1 ] ;
// find the url entry
page = urlStore . load ( entry . urlHash ( ) , entry ) ;
// add a result
if ( page ! = null ) acc . addResult ( page , preranking ) ;
entry = ( indexEntry ) ( preorder . next ( ) [ 0 ] ) ;
// find and fetch the url entry
urlStore . load ( entry . urlHash ( ) , entry ) ;
}
} catch ( kelondroException ee ) {
serverLog . logSevere ( "PLASMA" , "Database Failure during plasmaSearch.order: " + ee . getMessage ( ) , ee ) ;
}
profileLocal . setYieldTime ( plasmaSearchTimingProfile . PROCESS_URLFETCH ) ;
profileLocal . setYieldCount ( plasmaSearchTimingProfile . PROCESS_URLFETCH , acc . sizeFetched ( ) ) ;
// start postsorting
profileLocal . startTimer ( ) ;
acc . sortResults ( postsort ) ;
profileLocal . setYieldTime ( plasmaSearchTimingProfile . PROCESS_POSTSORT ) ;
profileLocal . setYieldCount ( plasmaSearchTimingProfile . PROCESS_POSTSORT , acc . sizeOrdered ( ) ) ;
// apply filter
profileLocal . startTimer ( ) ;
acc . removeRedundant ( ) ;
profileLocal . setYieldTime ( plasmaSearchTimingProfile . PROCESS_FILTER ) ;
profileLocal . setYieldCount ( plasmaSearchTimingProfile . PROCESS_FILTER , acc . sizeOrdered ( ) ) ;
return acc ;
}
public void run ( ) {