@ -123,7 +123,9 @@ public final class SearchEvent {
// --- result containers and search worker handles ---
// NOTE(review): reconstructed post-patch state from a marker-stripped diff hunk;
// the former combined declaration `public Thread rwiProcess, localsearch;` was
// split by the patch: `localsearch` is replaced by the private `localsolrsearch`.
private final SortedMap<byte[], String> IAResults;
private final SortedMap<byte[], HeuristicResult> heuristics;
private byte[] IAmaxcounthash, IAneardhthash;
public Thread rwiProcess;        // local RWI (reverse word index) search worker
private Thread localsolrsearch;  // local solr search worker (successor of the former 'localsearch')
private int localsolroffset;     // paging offset into the local solr result list
private final AtomicInteger expectedRemoteReferences, maxExpectedRemoteReferences; // counters for references that had been sorted out for other reasons
public final ScoreMap<String> hostNavigator;   // a counter for the appearance of host names
public final ScoreMap<String> authorNavigator; // a counter for the appearances of authors
@ -135,7 +137,7 @@ public final class SearchEvent {
// --- snippet fetching state ---
// NOTE(review): the duplicated `urlRetrievalAllTime` line in the mangled diff is
// a stripped +/- marker artifact (whitespace-only change); a single declaration
// is the correct post-patch state.
private final LoaderDispatcher loader;
private final HandleSet snippetFetchWordHashes; // a set of word hashes that are used to match with the snippets
private final boolean deleteIfSnippetFail;
private long urlRetrievalAllTime;       // accumulated time spent retrieving URLs
private long snippetComputationAllTime; // accumulated time spent computing snippets
private ConcurrentHashMap<String, String> snippets;
private final boolean remote;           // true if this event also queries remote peers
@ -159,15 +161,16 @@ public final class SearchEvent {
// --- result accounting counters ---
// NOTE(review): the duplicated peerCount lines in the mangled diff are stripped
// +/- marker artifacts (whitespace-only changes); one declaration each is the
// correct post-patch state.
public final AtomicInteger local_rwi_stored;      // the number of existing hits by the local search in rwi index
public final AtomicInteger remote_rwi_available;  // the number of hits imported from remote peers (rwi/solr mixed)
public final AtomicInteger remote_rwi_stored;     // the number of existing hits at remote site
public final AtomicInteger remote_rwi_peerCount;  // the number of peers which contributed to the remote search result
public final AtomicInteger local_solr_available;  // the number of hits generated/ranked by the local search in solr
public final AtomicInteger local_solr_stored;     // the number of existing hits by the local search in solr
public final AtomicInteger remote_solr_available; // the number of hits imported from remote peers (rwi/solr mixed)
public final AtomicInteger remote_solr_stored;    // the number of existing hits at remote site
public final AtomicInteger remote_solr_peerCount; // the number of peers which contributed to the remote search result
/**
 * Total number of results currently known to this search event.
 * Post-patch semantics: sums locally available rwi hits, remotely imported
 * rwi hits, remotely imported solr hits and locally stored solr hits, so
 * paging is aware of incoming remote results (the pre-patch version counted
 * only local_rwi_available + local_solr_stored).
 *
 * NOTE(review): the mangled diff contained both the old and new return
 * expressions; this is the new (added) one.
 *
 * @return the combined result count across local and remote sources
 */
public int getResultCount() {
    return this.local_rwi_available.get() + this.remote_rwi_available.get()
         + this.remote_solr_available.get() + this.local_solr_stored.get();
}
protected SearchEvent (
@ -252,8 +255,9 @@ public final class SearchEvent {
}
// start a local solr search
this . localsearch = RemoteSearch . solrRemoteSearch ( this , 100 , null /*this peer*/ , Switchboard . urlBlacklist ) ;
this . localsolrsearch = RemoteSearch . solrRemoteSearch ( this , 0 , this . query . itemsPerPage , null /*this peer*/ , Switchboard . urlBlacklist ) ;
this . localsolroffset = this . query . itemsPerPage ;
// start a local RWI search concurrently
this . rwiProcess = null ;
if ( query . getSegment ( ) . connectedRWI ( ) & & ( ! this . remote | | this . peers . mySeed ( ) . getBirthdate ( ) < noRobinsonLocalRWISearch ) ) {
@ -278,7 +282,7 @@ public final class SearchEvent {
Thread . currentThread ( ) . setName ( "SearchEvent.primaryRemoteSearches" ) ;
RemoteSearch . primaryRemoteSearches (
SearchEvent . this ,
remote_maxcount ,
0 , remote_maxcount ,
remote_maxtime ,
Switchboard . urlBlacklist ,
( SearchEvent . this . query . domType = = QueryParams . Searchdom . GLOBAL ) ? null : preselectedPeerHashes ,
@ -486,8 +490,11 @@ public final class SearchEvent {
assert ( iEntry . urlhash ( ) . length = = index . row ( ) . primaryKeyLength ) ;
// doublecheck for urls
if ( this . urlhashes . has ( iEntry . urlhash ( ) ) ) continue pollloop ;
if ( this . urlhashes . has ( iEntry . urlhash ( ) ) ) {
if ( log . isFine ( ) ) log . logFine ( "dropped RWI: doublecheck" ) ;
continue pollloop ;
}
// increase flag counts
Bitfield flags = iEntry . flags ( ) ;
for ( int j = 0 ; j < 32 ; j + + ) {
@ -495,7 +502,10 @@ public final class SearchEvent {
}
// check constraints
if ( ! this . testFlags ( flags ) ) continue pollloop ;
if ( ! this . testFlags ( flags ) ) {
if ( log . isFine ( ) ) log . logFine ( "dropped RWI: flag test failed" ) ;
continue pollloop ;
}
// check document domain
if ( this . query . contentdom . getCode ( ) > 0 & &
@ -503,6 +513,7 @@ public final class SearchEvent {
( this . query . contentdom = = ContentDomain . VIDEO & & ! ( flags . get ( Condenser . flag_cat_hasvideo ) ) ) | |
( this . query . contentdom = = ContentDomain . IMAGE & & ! ( flags . get ( Condenser . flag_cat_hasimage ) ) ) | |
( this . query . contentdom = = ContentDomain . APP & & ! ( flags . get ( Condenser . flag_cat_hasapp ) ) ) ) ) {
if ( log . isFine ( ) ) log . logFine ( "dropped RWI: contentdom fail" ) ;
continue pollloop ;
}
@ -512,10 +523,16 @@ public final class SearchEvent {
// check site constraints
final String hosthash = iEntry . hosthash ( ) ;
if ( this . query . modifier . sitehash = = null ) {
if ( this . query . siteexcludes ! = null & & this . query . siteexcludes . contains ( hosthash ) ) continue pollloop ;
if ( this . query . siteexcludes ! = null & & this . query . siteexcludes . contains ( hosthash ) ) {
if ( log . isFine ( ) ) log . logFine ( "dropped RWI: siteexcludes" ) ;
continue pollloop ;
}
} else {
// filter out all domains that do not match with the site constraint
if ( ! hosthash . equals ( this . query . modifier . sitehash ) ) continue pollloop ;
if ( ! hosthash . equals ( this . query . modifier . sitehash ) ) {
if ( log . isFine ( ) ) log . logFine ( "dropped RWI: modifier.sitehash" ) ;
continue pollloop ;
}
}
// finally extend the double-check and insert result to stack
@ -526,6 +543,7 @@ public final class SearchEvent {
break rankingtryloop ;
} catch ( final ArithmeticException e ) {
// this may happen if the concurrent normalizer changes values during cardinal computation
if ( log . isFine ( ) ) log . logFine ( "dropped RWI: arithmetic exception" ) ;
continue rankingtryloop ;
}
}
@ -735,12 +753,14 @@ public final class SearchEvent {
if ( ! this . query . urlMask_isCatchall ) {
// check url mask
if ( ! iEntry . matches ( this . query . urlMask ) ) {
if ( log . isFine ( ) ) log . logFine ( "dropped Node: url mask does not match" ) ;
continue pollloop ;
}
}
// doublecheck for urls
if ( this . urlhashes . has ( iEntry . hash ( ) ) ) {
if ( log . isFine ( ) ) log . logFine ( "dropped Node: double check" ) ;
continue pollloop ;
}
@ -751,7 +771,10 @@ public final class SearchEvent {
// check constraints
Bitfield flags = iEntry . flags ( ) ;
if ( ! this . testFlags ( flags ) ) continue pollloop ;
if ( ! this . testFlags ( flags ) ) {
if ( log . isFine ( ) ) log . logFine ( "dropped Node: flag test" ) ;
continue pollloop ;
}
// check document domain
if ( this . query . contentdom . getCode ( ) > 0 & &
@ -759,6 +782,7 @@ public final class SearchEvent {
( this . query . contentdom = = ContentDomain . VIDEO & & ! ( flags . get ( Condenser . flag_cat_hasvideo ) ) ) | |
( this . query . contentdom = = ContentDomain . IMAGE & & ! ( flags . get ( Condenser . flag_cat_hasimage ) ) ) | |
( this . query . contentdom = = ContentDomain . APP & & ! ( flags . get ( Condenser . flag_cat_hasapp ) ) ) ) ) {
if ( log . isFine ( ) ) log . logFine ( "dropped Node: content domain does not match" ) ;
continue pollloop ;
}
@ -766,11 +790,15 @@ public final class SearchEvent {
final String hosthash = iEntry . hosthash ( ) ;
if ( this . query . modifier . sitehash = = null ) {
if ( this . query . siteexcludes ! = null & & this . query . siteexcludes . contains ( hosthash ) ) {
if ( log . isFine ( ) ) log . logFine ( "dropped Node: siteexclude" ) ;
continue pollloop ;
}
} else {
// filter out all domains that do not match with the site constraint
if ( iEntry . url ( ) . getHost ( ) . indexOf ( this . query . modifier . sitehost ) < 0 ) continue pollloop ;
if ( iEntry . url ( ) . getHost ( ) . indexOf ( this . query . modifier . sitehost ) < 0 ) {
if ( log . isFine ( ) ) log . logFine ( "dropped Node: sitehost" ) ;
continue pollloop ;
}
}
// finally extend the double-check and insert result to stack
@ -1047,14 +1075,21 @@ public final class SearchEvent {
return null ;
}
public void drainStacksToResult ( ) {
public boolean drainStacksToResult ( ) {
// we take one entry from both stacks at the same time
boolean success = false ;
Element < URIMetadataNode > localEntryElement = this . nodeStack . sizeQueue ( ) > 0 ? this . nodeStack . poll ( ) : null ;
URIMetadataNode localEntry = localEntryElement = = null ? null : localEntryElement . getElement ( ) ;
if ( localEntry ! = null ) addResult ( getSnippet ( localEntry , null ) ) ;
if ( localEntry ! = null ) {
addResult ( getSnippet ( localEntry , null ) ) ;
success = true ;
}
if ( localEntry = = null ) {
URIMetadataNode p2pEntry = pullOneFilteredFromRWI ( true ) ;
if ( p2pEntry ! = null ) addResult ( getSnippet ( p2pEntry , null ) ) ;
if ( p2pEntry ! = null ) {
addResult ( getSnippet ( p2pEntry , null ) ) ;
success = true ;
}
} else {
new Thread ( ) {
public void run ( ) {
@ -1063,6 +1098,7 @@ public final class SearchEvent {
}
} . start ( ) ;
}
return success ;
}
/ * *
@ -1188,33 +1224,23 @@ public final class SearchEvent {
final long finishTime = System . currentTimeMillis ( ) + timeout ;
EventTracker . update ( EventTracker . EClass . SEARCH , new ProfilingGraph . EventSearch ( this . query . id ( true ) , SearchEventType . ONERESULT , "started, item = " + item + ", available = " + this . getResultCount ( ) , 0 , 0 ) , false ) ;
// check if we have a success
if ( this . resultList . sizeAvailable ( ) > item ) {
// we have the wanted result already in the result array .. return that
final ResultEntry re = this . resultList . element ( item ) . getElement ( ) ;
EventTracker . update ( EventTracker . EClass . SEARCH , new ProfilingGraph . EventSearch ( this . query . id ( true ) , SearchEventType . ONERESULT , "prefetched, item = " + item + ", available = " + this . getResultCount ( ) + ": " + re . urlstring ( ) , 0 , 0 ) , false ) ;
return re ;
// wait until a local solr is finished, we must do that to be able to check if we need more
if ( this . localsolrsearch ! = null & & this . localsolrsearch . isAlive ( ) ) {
try { this . localsolrsearch . join ( ) ; } catch ( InterruptedException e ) { }
}
// we must wait some time until the first result page is full to get enough elements for ranking
/ *
if ( this . remote & & item < 10 & & ! this . feedingIsFinished ( ) ) {
// the first 10 results have a very special timing to get most of the remote results ordered
// before they are presented on the first lines .. yes sleeps seem to be bad. but how shall we predict how long other
// peers will take until they respond?
long stoptime = System . currentTimeMillis ( ) + Math . min ( timeout , item = = 0 ? 100 : ( 10 - item ) * 9 ) ; // the first result takes the longest time
while ( System . currentTimeMillis ( ) < stoptime ) {
//drainStacksToResult();
try { Thread . sleep ( 10 ) ; } catch ( final InterruptedException e ) { Log . logException ( e ) ; }
}
this . localsolrsearch = null ;
if ( item > = this . localsolroffset & & this . local_solr_stored . get ( ) > = item ) {
// load remaining solr results now
int nextitems = item - this . localsolroffset + this . query . itemsPerPage ; // example: suddenly switch to item 60, just 10 had been shown, 20 loaded.
this . localsolrsearch = RemoteSearch . solrRemoteSearch ( this , this . localsolroffset , nextitems , null /*this peer*/ , Switchboard . urlBlacklist ) ;
this . localsolroffset + = nextitems ;
}
* /
// now do this as long as needed
while ( ( ! this . feedingIsFinished ( ) | | this . rwiQueueSize ( ) > 0 | | this . nodeStack . sizeQueue ( ) > 0 ) & &
this . resultList . sizeAvailable ( ) < item + 1 & & System . currentTimeMillis ( ) < finishTime ) {
drainStacksToResult ( ) ;
try { Thread . sleep ( 10 ) ; } catch ( final InterruptedException e ) { Log . logException ( e ) ; }
// now pull results as long as needed and as long as possible
while ( this . resultList . sizeAvailable ( ) < = item & &
( this . rwiQueueSize ( ) > 0 | | this . nodeStack . sizeQueue ( ) > 0 | |
( ! this . feedingIsFinished ( ) & & System . currentTimeMillis ( ) < finishTime ) ) ) {
if ( ! drainStacksToResult ( ) ) try { Thread . sleep ( 10 ) ; } catch ( final InterruptedException e ) { Log . logException ( e ) ; }
}
// check if we have a success
@ -1222,6 +1248,12 @@ public final class SearchEvent {
// we have the wanted result already in the result array .. return that
final ResultEntry re = this . resultList . element ( item ) . getElement ( ) ;
EventTracker . update ( EventTracker . EClass . SEARCH , new ProfilingGraph . EventSearch ( this . query . id ( true ) , SearchEventType . ONERESULT , "fetched, item = " + item + ", available = " + this . getResultCount ( ) + ": " + re . urlstring ( ) , 0 , 0 ) , false ) ;
if ( this . local_solr_stored . get ( ) > this . localsolroffset & & ( item + 1 ) % this . query . itemsPerPage = = 0 ) {
// at the end of a list, trigger a next solr search
this . localsolrsearch = RemoteSearch . solrRemoteSearch ( this , this . localsolroffset , this . query . itemsPerPage , null /*this peer*/ , Switchboard . urlBlacklist ) ;
this . localsolroffset + = this . query . itemsPerPage ;
}
return re ;
}