@ -42,10 +42,13 @@ import java.util.SortedSet;
import java.util.TreeMap ;
import java.util.concurrent.BlockingQueue ;
import java.util.concurrent.ConcurrentHashMap ;
import java.util.concurrent.Semaphore ;
import java.util.concurrent.TimeUnit ;
import java.util.concurrent.atomic.AtomicInteger ;
import java.util.regex.Pattern ;
import org.apache.solr.common.SolrDocument ;
import net.yacy.contentcontrol.ContentControlFilterUpdateThread ;
import net.yacy.cora.document.analysis.Classification ;
import net.yacy.cora.document.analysis.Classification.ContentDomain ;
@ -89,9 +92,9 @@ import net.yacy.kelondro.util.SetTools;
import net.yacy.peers.RemoteSearch ;
import net.yacy.peers.SeedDB ;
import net.yacy.peers.graphics.ProfilingGraph ;
import net.yacy.repository.Blacklist.BlacklistType ;
import net.yacy.repository.FilterEngine ;
import net.yacy.repository.LoaderDispatcher ;
import net.yacy.repository.Blacklist.BlacklistType ;
import net.yacy.search.EventTracker ;
import net.yacy.search.Switchboard ;
import net.yacy.search.SwitchboardConstants ;
@ -104,8 +107,6 @@ import net.yacy.search.schema.CollectionSchema;
import net.yacy.search.snippet.TextSnippet ;
import net.yacy.search.snippet.TextSnippet.ResultClass ;
import org.apache.solr.common.SolrDocument ;
public final class SearchEvent {
private static final int max_results_rwi = 3000 ;
@ -254,6 +255,9 @@ public final class SearchEvent {
/** the number of peers which contributed to the remote search result */
public final AtomicInteger remote_solr_peerCount ;
/** Ensure only one {@link #resortCachedResults()} operation to be performed on this search event */
public final Semaphore resortCacheAllowed ;
public int getResultCount ( ) {
return Math . max (
this . local_rwi_available . get ( ) + this . remote_rwi_available . get ( ) +
@ -355,6 +359,7 @@ public final class SearchEvent {
this . remote_solr_stored = new AtomicInteger ( 0 ) ;
this . remote_solr_available = new AtomicInteger ( 0 ) ; // the number of result contributions from all the remote solr peers
this . remote_solr_peerCount = new AtomicInteger ( 0 ) ; // the number of remote solr peers that have contributed
this . resortCacheAllowed = new Semaphore ( 1 ) ;
final long start = System . currentTimeMillis ( ) ;
// do a soft commit for fresh results
@ -1412,12 +1417,67 @@ public final class SearchEvent {
/ * *
* Adds the retrieved results ( fulltext & rwi ) to the result list and
* computes the text snippets
* @param concurrentSnippetFetch when true , allow starting concurrent tasks to fetch snippets when no one are already available
* @return true on adding entries to resultlist otherwise false
* /
public boolean drainStacksToResult ( ) {
public boolean drainStacksToResult ( boolean concurrentSnippetFetch ) {
// we take one entry from both stacks at the same time
boolean success = false ;
final Element < URIMetadataNode > localEntryElement = this . nodeStack . sizeQueue ( ) > 0 ? this . nodeStack . poll ( ) : null ;
boolean solrSuccess = drainSolrStackToResult ( concurrentSnippetFetch ) ;
boolean rwiSuccess = drainRWIStackToResult ( concurrentSnippetFetch ) ;
return solrSuccess | | rwiSuccess ;
}
/ * *
* Adds the retrieved results from local and remotes RWI to the result list and
* computes the text snippets
* @param concurrentSnippetFetch when true , allow starting a concurrent task to fetch a snippet when no one is already available
* @return true when an entry has been effectively added to resultlist otherwise false
* /
private boolean drainRWIStackToResult ( boolean concurrentSnippetFetch ) {
boolean success = false ;
if ( SearchEvent . this . snippetFetchAlive . get ( ) > = 10 | | MemoryControl . shortStatus ( ) | | ! concurrentSnippetFetch ) {
// too many concurrent processes
final URIMetadataNode noderwi = pullOneFilteredFromRWI ( true ) ;
if ( noderwi ! = null ) {
addResult ( getSnippet ( noderwi , null ) , noderwi . score ( ) ) ;
success = true ;
}
} else {
Thread t = new Thread ( "SearchEvent.drainStacksToResult.oneFilteredFromRWI" ) {
@Override
public void run ( ) {
SearchEvent . this . oneFeederStarted ( ) ;
try {
final URIMetadataNode noderwi = pullOneFilteredFromRWI ( true ) ;
if ( noderwi ! = null ) {
SearchEvent . this . snippetFetchAlive . incrementAndGet ( ) ;
try {
addResult ( getSnippet ( noderwi , SearchEvent . this . query . snippetCacheStrategy ) , noderwi . score ( ) ) ;
} catch ( final Throwable e ) {
ConcurrentLog . logException ( e ) ;
} finally {
SearchEvent . this . snippetFetchAlive . decrementAndGet ( ) ;
}
}
} catch ( final Throwable e ) { } finally {
SearchEvent . this . oneFeederTerminated ( ) ;
}
}
} ;
if ( SearchEvent . this . query . snippetCacheStrategy = = null ) t . run ( ) ; else t . start ( ) ; //no need for concurrency if there is no latency
}
return success ;
}
/ * *
* Adds the retrieved full text results from local and remotes Solr to the result list and
* computes the text snippets
* @param concurrentSnippetFetch when true , allow starting a concurrent task to fetch a snippet when no one is already available
* @return true when an entry has been effectively added to resultlist otherwise false
* /
private boolean drainSolrStackToResult ( boolean concurrentSnippetFetch ) {
boolean success = false ;
final Element < URIMetadataNode > localEntryElement = this . nodeStack . sizeQueue ( ) > 0 ? this . nodeStack . poll ( ) : null ;
final URIMetadataNode node = localEntryElement = = null ? null : localEntryElement . getElement ( ) ;
if ( node ! = null ) {
LinkedHashSet < String > solrsnippetlines = this . snippets . remove ( ASCII . String ( node . hash ( ) ) ) ; // we can remove this because it's used only once
@ -1438,7 +1498,7 @@ public final class SearchEvent {
success = true ;
} else {
// we don't have a snippet from solr, try to get it in our way (by reloading, if necessary)
if ( SearchEvent . this . snippetFetchAlive . get ( ) > = 10 ) {
if ( SearchEvent . this . snippetFetchAlive . get ( ) > = 10 | | ! concurrentSnippetFetch ) {
// too many concurrent processes
addResult ( getSnippet ( node , null ) , localEntryElement . getWeight ( ) ) ;
success = true ;
@ -1463,39 +1523,8 @@ public final class SearchEvent {
}
}
}
if ( SearchEvent . this . snippetFetchAlive . get ( ) > = 10 | | MemoryControl . shortStatus ( ) ) {
// too many concurrent processes
final URIMetadataNode noderwi = pullOneFilteredFromRWI ( true ) ;
if ( noderwi ! = null ) {
addResult ( getSnippet ( noderwi , null ) , noderwi . score ( ) ) ;
success = true ;
}
} else {
Thread t = new Thread ( "SearchEvent.drainStacksToResult.oneFilteredFromRWI" ) {
@Override
public void run ( ) {
SearchEvent . this . oneFeederStarted ( ) ;
try {
final URIMetadataNode noderwi = pullOneFilteredFromRWI ( true ) ;
if ( noderwi ! = null ) {
SearchEvent . this . snippetFetchAlive . incrementAndGet ( ) ;
try {
addResult ( getSnippet ( noderwi , SearchEvent . this . query . snippetCacheStrategy ) , noderwi . score ( ) ) ;
} catch ( final Throwable e ) {
ConcurrentLog . logException ( e ) ;
} finally {
SearchEvent . this . snippetFetchAlive . decrementAndGet ( ) ;
}
}
} catch ( final Throwable e ) { } finally {
SearchEvent . this . oneFeederTerminated ( ) ;
}
}
} ;
if ( SearchEvent . this . query . snippetCacheStrategy = = null ) t . run ( ) ; else t . start ( ) ; //no need for concurrency if there is no latency
}
return success ;
}
return success ;
}
/ * *
* place the result to the result vector and apply post - ranking
@ -1689,7 +1718,7 @@ public final class SearchEvent {
while ( this . resultList . sizeAvailable ( ) < = resultListIndex & &
( this . rwiQueueSize ( ) > 0 | | this . nodeStack . sizeQueue ( ) > 0 | |
( ! this . isFeedingFinished ( ) & & System . currentTimeMillis ( ) < finishTime ) ) ) {
if ( ! drainStacksToResult ( ) ) {
if ( ! drainStacksToResult ( true ) ) {
try {
Thread . sleep ( 10 ) ;
} catch ( final InterruptedException e ) {
@ -1857,6 +1886,65 @@ public final class SearchEvent {
return this . resultList . list ( Math . min ( this . query . neededResults ( ) , this . resultList . sizeAvailable ( ) ) ) ;
}
/ * *
* Re - sort results cached in the resultList and eventually include in that list
* elements with higher ranks from the Solr and RWI stacks .
* /
public void resortCachedResults ( ) {
/ *
* If stacks feeding is finished , drain as much as possible elements from stacks
* while their ranking is higher than the last element in the result list
* /
if ( isFeedingFinished ( ) & & this . resortCacheAllowed . tryAcquire ( ) ) {
/ *
* First put all elements of the resultList in its own sorted queue to have a
* consistent sorting on the whole set
* /
this . resultList . requeueDrainedElements ( ) ;
/ *
* Note : if the resultList is full ( its maxSize has been reached ) some elements
* with the lowest ranking may be lost in this next step . Not really a problem
* because they were not supposed to be here . If really necessary to keep them ,
* growing the maxSize of the resultList should be considered here .
* /
WeakPriorityBlockingQueue . Element < URIMetadataNode > initialLastResult = this . resultList . getLastInQueue ( ) ;
/ *
* Drain stacks in two steps ( Solr , then RWI ) , because one stack might still
* contains higher ranked results when only lower ranked remain in the other
* /
/ *
* Here we do not fetch snippets concurrently as we want to know immediately the
* drained element position in the final result list
* /
boolean drained = drainSolrStackToResult ( false ) ;
WeakPriorityBlockingQueue . Element < URIMetadataNode > newLastResult = this . resultList . getLastInQueue ( ) ;
/ *
* Loop while at least one element has been added to the results list and is not
* the last considering its final rank
* /
while ( drained & & newLastResult = = initialLastResult ) {
drained = drainSolrStackToResult ( false ) ;
newLastResult = this . resultList . getLastInQueue ( ) ;
}
drained = drainRWIStackToResult ( false ) ;
newLastResult = this . resultList . getLastInQueue ( ) ;
/ *
* Loop while at least one element has been added to the results list and is not
* the last considering its final rank
* /
while ( drained & & newLastResult = = initialLastResult ) {
drained = drainRWIStackToResult ( false ) ;
newLastResult = this . resultList . getLastInQueue ( ) ;
}
}
}
/ * *
* delete a specific entry from the search results
* this is used if the user clicks on a '-' sign beside the search result