@ -22,12 +22,20 @@
package net.yacy.cora.federate.solr.connector ;
import java.io.IOException ;
import java.util.Collection ;
import java.util.HashSet ;
import java.util.Set ;
import java.util.concurrent.BlockingQueue ;
import java.util.concurrent.LinkedBlockingQueue ;
import net.yacy.cora.federate.solr.instance.EmbeddedInstance ;
import net.yacy.cora.federate.solr.instance.SolrInstance ;
import net.yacy.cora.util.ConcurrentLog ;
import net.yacy.search.schema.CollectionSchema ;
import org.apache.lucene.document.Document ;
import org.apache.lucene.index.DirectoryReader ;
import org.apache.solr.client.solrj.SolrQuery ;
import org.apache.solr.client.solrj.SolrServerException ;
import org.apache.solr.client.solrj.response.QueryResponse ;
import org.apache.solr.common.SolrException ;
@ -40,13 +48,20 @@ import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.component.SearchHandler ;
import org.apache.solr.request.SolrQueryRequest ;
import org.apache.solr.request.SolrQueryRequestBase ;
import org.apache.solr.re quest.SolrRequestInfo ;
import org.apache.solr.re sponse.ResultContext ;
import org.apache.solr.response.SolrQueryResponse ;
import org.apache.solr.search.DocIterator ;
import org.apache.solr.search.DocList ;
import org.apache.solr.search.SolrIndexSearcher ;
import org.apache.solr.util.RefCounted ;
public class EmbeddedSolrConnector extends SolrServerConnector implements SolrConnector {
static Set < String > SOLR_ID_FIELDS = new HashSet < String > ( ) ;
static {
SOLR_ID_FIELDS . add ( CollectionSchema . id . getSolrFieldName ( ) ) ;
}
public static final String SELECT = "/select" ;
public static final String CONTEXT = "/solr" ;
@ -128,7 +143,7 @@ public class EmbeddedSolrConnector extends SolrServerConnector implements SolrCo
NamedList < Object > responseHeader = new SimpleOrderedMap < Object > ( ) ;
responseHeader . add ( "params" , req . getOriginalParams ( ) . toNamedList ( ) ) ;
rsp . add ( "responseHeader" , responseHeader ) ;
SolrRequestInfo . setRequestInfo ( new SolrRequestInfo ( req , rsp ) ) ;
//SolrRequestInfo.setRequestInfo(new SolrRequestInfo(req, rsp));
// send request to solr and create a result
this . requestHandler . handleRequest ( req , rsp ) ;
@ -144,6 +159,10 @@ public class EmbeddedSolrConnector extends SolrServerConnector implements SolrCo
return rsp ;
}
/ * *
* the usage of getResponseByParams is disencouraged for the embedded Solr connector . Please use request ( SolrParams ) instead .
* Reason : Solr makes a very complex folding / unfolding including data compression for SolrQueryResponses .
* /
@Override
public QueryResponse getResponseByParams ( ModifiableSolrParams params ) throws IOException {
if ( this . server = = null ) throw new IOException ( "server disconnected" ) ;
@ -164,4 +183,125 @@ public class EmbeddedSolrConnector extends SolrServerConnector implements SolrCo
}
}
private class DocListSearcher {
public SolrQueryRequest request ;
public DocList response ;
public DocListSearcher ( final String querystring , final int offset , final int count , final String . . . fields ) {
// construct query
final SolrQuery params = new SolrQuery ( ) ;
params . setQuery ( querystring ) ;
params . setRows ( count ) ;
params . setStart ( offset ) ;
params . setFacet ( false ) ;
params . clearSorts ( ) ;
if ( fields . length > 0 ) params . setFields ( fields ) ;
params . setIncludeScore ( false ) ;
// query the server
this . request = request ( params ) ;
SolrQueryResponse rsp = query ( request ) ;
this . response = ( ( ResultContext ) rsp . getValues ( ) . get ( "response" ) ) . docs ;
}
public void close ( ) {
if ( this . request ! = null ) this . request . close ( ) ;
this . request = null ;
this . response = null ;
}
}
@Override
public long getCountByQuery ( String querystring ) {
DocListSearcher docListSearcher = new DocListSearcher ( querystring , 0 , 0 , CollectionSchema . id . getSolrFieldName ( ) ) ;
int numFound = docListSearcher . response . matches ( ) ;
docListSearcher . close ( ) ;
return numFound ;
}
@Override
public boolean existsById ( String id ) {
return getCountByQuery ( "{!raw f=" + CollectionSchema . id . getSolrFieldName ( ) + "}" + id ) > 0 ;
}
@Override
public Set < String > existsByIds ( Collection < String > ids ) {
if ( ids = = null | | ids . size ( ) = = 0 ) return new HashSet < String > ( ) ;
if ( ids . size ( ) = = 1 & & ids instanceof Set ) return existsById ( ids . iterator ( ) . next ( ) ) ? ( Set < String > ) ids : new HashSet < String > ( ) ;
StringBuilder sb = new StringBuilder ( ) ; // construct something like "({!raw f=id}Ij7B63g-gSHA) OR ({!raw f=id}PBcGI3g-gSHA)"
for ( String id : ids ) {
sb . append ( "({!raw f=" ) . append ( CollectionSchema . id . getSolrFieldName ( ) ) . append ( '}' ) . append ( id ) . append ( ") OR " ) ;
}
if ( sb . length ( ) > 0 ) sb . setLength ( sb . length ( ) - 4 ) ; // cut off the last 'or'
DocListSearcher docListSearcher = new DocListSearcher ( sb . toString ( ) , 0 , ids . size ( ) , CollectionSchema . id . getSolrFieldName ( ) ) ;
//int numFound = docListSearcher.response.matches();
int responseCount = docListSearcher . response . size ( ) ;
SolrIndexSearcher searcher = docListSearcher . request . getSearcher ( ) ;
DocIterator iterator = docListSearcher . response . iterator ( ) ;
HashSet < String > idsr = new HashSet < String > ( ) ;
try {
for ( int i = 0 ; i < responseCount ; i + + ) {
Document doc = searcher . doc ( iterator . nextDoc ( ) , SOLR_ID_FIELDS ) ;
idsr . add ( doc . get ( CollectionSchema . id . getSolrFieldName ( ) ) ) ;
}
} catch ( IOException e ) {
} finally {
docListSearcher . close ( ) ;
}
// construct a new id list from that
return idsr ;
}
@Override
public String getFieldById ( final String id , final String field ) throws IOException {
DocListSearcher docListSearcher = new DocListSearcher ( "{!raw f=" + CollectionSchema . id . getSolrFieldName ( ) + "}" + id , 0 , 1 , CollectionSchema . id . getSolrFieldName ( ) ) ;
int numFound = docListSearcher . response . matches ( ) ;
if ( numFound = = 0 ) return null ;
Set < String > solrFields = new HashSet < String > ( ) ;
solrFields . add ( field ) ;
try {
Document doc = docListSearcher . request . getSearcher ( ) . doc ( docListSearcher . response . iterator ( ) . nextDoc ( ) , solrFields ) ;
return doc . get ( field ) ;
} catch ( IOException e ) {
e . printStackTrace ( ) ;
} finally {
docListSearcher . close ( ) ;
}
return null ;
}
@Override
public BlockingQueue < String > concurrentIDsByQuery ( final String querystring , final int offset , final int maxcount , final long maxtime ) {
final BlockingQueue < String > queue = new LinkedBlockingQueue < String > ( ) ;
final long endtime = maxtime = = Long . MAX_VALUE ? Long . MAX_VALUE : System . currentTimeMillis ( ) + maxtime ; // we know infinity!
final Thread t = new Thread ( ) {
@Override
public void run ( ) {
int o = offset ;
while ( System . currentTimeMillis ( ) < endtime ) {
try {
DocListSearcher docListSearcher = new DocListSearcher ( querystring , o , pagesize , CollectionSchema . id . getSolrFieldName ( ) ) ;
int responseCount = docListSearcher . response . size ( ) ;
SolrIndexSearcher searcher = docListSearcher . request . getSearcher ( ) ;
DocIterator iterator = docListSearcher . response . iterator ( ) ;
try {
for ( int i = 0 ; i < responseCount ; i + + ) {
Document doc = searcher . doc ( iterator . nextDoc ( ) , SOLR_ID_FIELDS ) ;
try { queue . put ( doc . get ( CollectionSchema . id . getSolrFieldName ( ) ) ) ; } catch ( final InterruptedException e ) { break ; }
}
} catch ( IOException e ) {
} finally {
docListSearcher . close ( ) ;
}
if ( responseCount < pagesize ) break ;
o + = pagesize ;
} catch ( final SolrException e ) {
break ;
}
}
try { queue . put ( AbstractSolrConnector . POISON_ID ) ; } catch ( final InterruptedException e1 ) { }
}
} ;
t . start ( ) ;
return queue ;
}
}