@ -21,6 +21,7 @@
package net.yacy.cora.federate.solr.connector ;
import java.io.IOException ;
import java.io.InterruptedIOException ;
import java.util.ArrayList ;
import java.util.Collection ;
import java.util.Date ;
@ -31,24 +32,17 @@ import java.util.LinkedHashMap;
import java.util.List ;
import java.util.Map ;
import java.util.Map.Entry ;
import java.util.Objects ;
import java.util.Set ;
import java.util.concurrent.ArrayBlockingQueue ;
import java.util.concurrent.BlockingQueue ;
import java.util.concurrent.LinkedBlockingQueue ;
import java.util.concurrent.TimeUnit ;
import net.yacy.cora.document.encoding.UTF8 ;
import net.yacy.cora.sorting.ClusteredScoreMap ;
import net.yacy.cora.sorting.ReversibleScoreMap ;
import net.yacy.cora.util.ConcurrentLog ;
import net.yacy.cora.util.LookAheadIterator ;
import net.yacy.kelondro.data.word.Word ;
import net.yacy.search.schema.CollectionSchema ;
import org.apache.solr.client.solrj.SolrQuery ;
import org.apache.solr.client.solrj.response.FacetField ;
import org.apache.solr.client.solrj.response.QueryResponse ;
import org.apache.solr.client.solrj.response.FacetField.Count ;
import org.apache.solr.client.solrj.response.QueryResponse ;
import org.apache.solr.common.SolrDocument ;
import org.apache.solr.common.SolrDocumentList ;
import org.apache.solr.common.SolrException ;
@ -58,6 +52,14 @@ import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.DisMaxParams ;
import org.apache.solr.common.params.FacetParams ;
import net.yacy.cora.document.encoding.UTF8 ;
import net.yacy.cora.sorting.ClusteredScoreMap ;
import net.yacy.cora.sorting.ReversibleScoreMap ;
import net.yacy.cora.util.ConcurrentLog ;
import net.yacy.cora.util.LookAheadIterator ;
import net.yacy.kelondro.data.word.Word ;
import net.yacy.search.schema.CollectionSchema ;
public abstract class AbstractSolrConnector implements SolrConnector {
protected static Set < String > SOLR_ID_FIELDS = new HashSet < String > ( ) ;
@ -170,19 +172,7 @@ public abstract class AbstractSolrConnector implements SolrConnector {
}
/ * *
* Get results from solr queries as a stream of documents .
* The result queue is considered as terminated if AbstractSolrConnector . POISON_DOCUMENT is returned .
* The method returns immediately and feeds the search results into the queue
* @param querystrings the list of solr query strings
* @param sort the solr sort string , may be null to be not used
* @param offset first result offset
* @param maxcount the maximum number of results
* @param maxtime the maximum time in milliseconds
* @param buffersize the size of an ArrayBlockingQueue ; if < = 0 then a LinkedBlockingQueue is used
* @param concurrency is the number of AbstractSolrConnector . POISON_DOCUMENT entries to add at the end of the feed
* @param prefetchIDs if true , then first all IDs are fetched and then all documents are queries by the ID . If false then documents are retrieved directly
* @param fields list of fields
* @return a blocking queue which is terminated with AbstractSolrConnector . POISON_DOCUMENT as last element
* { @inheritDoc }
* /
@Override
public BlockingQueue < SolrDocument > concurrentDocumentsByQueries (
@ -195,11 +185,10 @@ public abstract class AbstractSolrConnector implements SolrConnector {
final int concurrency ,
final boolean prefetchIDs ,
final String . . . fields ) {
assert buffersize > 0 ;
if ( ! prefetchIDs ) return concurrentDocumentsByQueriesNoPrefetch ( querystrings , sort , offset , maxcount , maxtime , buffersize , concurrency , fields ) ;
final BlockingQueue < SolrDocument > queue = buffersize < = 0 ? new LinkedBlockingQueue < SolrDocument > ( ) : new ArrayBlockingQueue < SolrDocument > ( Math . max ( buffersize , concurrency ) ) ;
if ( querystrings . size ( ) = = 0 ) {
for ( int i = 0 ; i < Math . max ( 1 , concurrency ) ; i + + ) try { queue . put ( AbstractSolrConnector . POISON_DOCUMENT ) ; } catch ( final InterruptedException e1 ) { }
if ( ! prefetchIDs ) {
final Thread t = new Thread ( newDocumentsByQueriesTask ( queue , querystrings , sort , offset , maxcount , maxtime , buffersize , concurrency , fields ) ) ;
t . start ( ) ;
return queue ;
}
final BlockingQueue < String > idQueue = concurrentIDsByQueries ( querystrings , sort , offset , maxcount , maxtime , Math . min ( maxcount , 10000000 ) , concurrency ) ;
@ -235,7 +224,9 @@ public abstract class AbstractSolrConnector implements SolrConnector {
return queue ;
}
private BlockingQueue < SolrDocument > concurrentDocumentsByQueriesNoPrefetch (
@Override
public Runnable newDocumentsByQueriesTask (
final BlockingQueue < SolrDocument > queue ,
final List < String > querystrings ,
final String sort ,
final int offset ,
@ -244,59 +235,85 @@ public abstract class AbstractSolrConnector implements SolrConnector {
final int buffersize ,
final int concurrency ,
final String . . . fields ) {
assert buffersize > 0 ;
final BlockingQueue < SolrDocument > queue = buffersize < = 0 ? new LinkedBlockingQueue < SolrDocument > ( ) : new ArrayBlockingQueue < SolrDocument > ( buffersize ) ;
if ( querystrings . size ( ) = = 0 ) {
for ( int i = 0 ; i < Math . max ( 1 , concurrency ) ; i + + ) try { queue . put ( AbstractSolrConnector . POISON_DOCUMENT ) ; } catch ( final InterruptedException e1 ) { }
return queue ;
Objects . requireNonNull ( queue , "The queue parameter must not be null." ) ;
if ( querystrings = = null | | querystrings . isEmpty ( ) ) {
return ( ) - > {
for ( int i = 0 ; i < Math . max ( 1 , concurrency ) ; i + + ) {
try {
queue . put ( AbstractSolrConnector . POISON_DOCUMENT ) ;
} catch ( final InterruptedException e1 ) {
Thread . currentThread ( ) . interrupt ( ) ; // preserve interrupted thread state
}
}
} ;
}
final long endtime = maxtime < 0 | | maxtime = = Long . MAX_VALUE ? Long . MAX_VALUE : System . currentTimeMillis ( ) + maxtime ; // we know infinity!
final int ps = buffersize < 0 ? pagesize_docs : Math . min ( pagesize_docs , buffersize ) ;
final int maxretries = 6 ;
final Thread t = new Thread ( ) {
@Override
public void run ( ) {
return ( ) - > {
long remainingTime = endtime - System . currentTimeMillis ( ) ;
try {
for ( String querystring : querystrings ) {
this . setName ( "AbstractSolrConnector:concurrentDocumentsByQueryNoPrefetch(" + querystring + ")" ) ;
for ( final String querystring : querystrings ) {
Thread . currentThread ( ) . setName ( "AbstractSolrConnector:concurrentDocumentsByQueryNoPrefetch(" + querystring + ")" ) ;
int o = offset ;
int count = 0 ;
int retry = 0 ;
loop : while ( System . currentTimeMillis ( ) < endtime & & count < maxcount ) {
loop : while ( remainingTime > 0 & & count < maxcount ) {
try {
SolrDocumentList sdl = getDocumentListByQuery ( querystring , sort , o , Math . min ( maxcount , ps ) , fields ) ;
for ( SolrDocument d : sdl ) {
try { queue . put ( d ) ; } catch ( final InterruptedException e ) { break ; }
final SolrDocumentList sdl = getDocumentListByQuery ( querystring , sort , o , Math . min ( maxcount , ps ) , fields ) ;
for ( final SolrDocument d : sdl ) {
if ( endtime ! = Long . MAX_VALUE ) {
/ *
* A timeout is defined : we must not use here queue . put ( ) otherwise this
* thread could indefinitely wait here when the queue is full and the
* consumer thread has stopped taking in the queue .
* /
if ( ! queue . offer ( d , remainingTime , TimeUnit . MILLISECONDS ) ) {
break ;
}
} else {
queue . put ( d ) ;
}
count + + ;
}
if ( sdl . size ( ) < ps ) {
//System.out.println("sdl.size() = " + sdl.size() + ", pagesize = " + pagesize);
break loop ; // finished
}
o + = sdl . size ( ) ;
retry = 0 ;
} catch ( final InterruptedIOException e ) {
throw new InterruptedException ( ) ; // rethrow to finish the process
} catch ( final SolrException | IOException e ) {
ConcurrentLog . logException ( e ) ;
if ( retry + + < maxretries ) {
// remote Solr may be temporary down, so we wait a bit
try { Thread . sleep ( 100 ) ; } catch ( InterruptedException e1 ) { }
Thread . sleep ( 100 ) ;
continue loop ;
}
// fail
ConcurrentLog . severe ( "AbstractSolrConnector" , "aborted concurrentDocumentsByQueryNoPrefetch after " + maxretries + " retries: " + e . getMessage ( ) ) ;
break ;
}
remainingTime = endtime - System . currentTimeMillis ( ) ;
}
}
} catch ( Throwable e ) { } finally {
} catch ( final InterruptedException e ) {
Thread . currentThread ( ) . interrupt ( ) ; // preserve interrupted thread state
} catch ( final RuntimeException e ) {
ConcurrentLog . logException ( e ) ;
} finally {
/* Add poison elements only when the thread has not been interrupted */
for ( int i = 0 ; i < Math . max ( 1 , concurrency ) ; i + + ) {
try { queue . put ( AbstractSolrConnector . POISON_DOCUMENT ) ; } catch ( final InterruptedException e1 ) { }
try {
queue . put ( AbstractSolrConnector . POISON_DOCUMENT ) ;
} catch ( final InterruptedException e1 ) {
Thread . currentThread ( ) . interrupt ( ) ; // preserve interrupted thread state
break ; // thread is interrupted : in that case we no more try to add poison elements to the queue
}
}
}
} ;
t . start ( ) ;
return queue ;
}
/ * *