@@ -1147,25 +1147,25 @@ public final class Protocol {
             return 0;
         }
-        List<URIMetadataNode> container = new ArrayList<URIMetadataNode>();
+        List<URIMetadataNode> resultContainer = new ArrayList<URIMetadataNode>();
         Network.log.info("SEARCH (solr), returned " + docList[0].size() + " out of " + docList[0].getNumFound() + " documents and " + facets.size() + " facets " + facets.keySet().toString() + " from " + (target == null ? "shard" : ("peer " + target.hash + ":" + target.getName())));
         int term = count;
         Collection<SolrInputDocument> docs;
         if (event.addResultsToLocalIndex) { // only needed to store remote results
             docs = new ArrayList<SolrInputDocument>(docList[0].size());
         } else docs = null;
-        for (final SolrDocument doc: docList[0]) {
+        for (final SolrDocument tmpdoc: docList[0]) {
             //System.out.println("***DEBUG*** " + ((String) doc.getFieldValue("sku")));
             if (term-- <= 0) {
                 break; // do not process more than requested (in case that evil peers fill us up with rubbish)
             }
             // get one single search result
-            if (doc == null) {
+            if (tmpdoc == null) {
                 continue;
             }
             URIMetadataNode urlEntry;
             try {
-                urlEntry = new URIMetadataNode(doc);
+                urlEntry = new URIMetadataNode(tmpdoc);
             } catch (MalformedURLException ex) {
                 continue;
             }
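
The loop above relies on two Solr counters that are easy to conflate: `docList[0].size()` is the number of documents actually returned in this response page, while `docList[0].getNumFound()` is the total number of matches Solr reports for the query. The log line, and after this patch the method's return value, depend on that distinction. A minimal sketch, assuming only SolrJ's `SolrDocumentList` API (the class and data below are illustrative, not part of the patch):

```java
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;

public class NumFoundSketch {
    public static void main(String[] args) {
        // A response page carrying 2 documents out of 40 total matches.
        SolrDocumentList page = new SolrDocumentList();
        page.add(new SolrDocument());
        page.add(new SolrDocument());
        page.setNumFound(40L);
        // size() = documents in this page; getNumFound() = total hits.
        System.out.println(page.size() + " returned out of " + page.getNumFound());
    }
}
```
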
@@ -1198,73 +1198,61 @@ public final class Protocol {
             // put the remote documents to the local index. We must convert the solr document to a solr input document:
             if (event.addResultsToLocalIndex) {
-                /* Check document size, only if a limit is set on remote documents size allowed to be stored to local index */
-                if (checkDocumentSize(doc, event.getRemoteDocStoredMaxSize() * 1024)) {
-                    final SolrInputDocument sid = event.query.getSegment().fulltext().getDefaultConfiguration().toSolrInputDocument(doc);
-                    // the input document stays untouched because it contains top-level cloned objects
-                    docs.add(sid);
-                    // will be stored to index, and is a full solr document, can be added to firstseen
-                    event.query.getSegment().setFirstSeenTime(urlEntry.hash(), Math.min(urlEntry.moddate().getTime(), System.currentTimeMillis()));
-                } else {
-                    Network.log.info("Document size greater than " + event.getRemoteDocStoredMaxSize() + " kbytes, excludes it from being stored to local index. Url : " + urlEntry.urlstring());
-                }
+                /* Check document size, only if a limit is set on remote documents size allowed to be stored to local index */
+                if (checkDocumentSize(tmpdoc, event.getRemoteDocStoredMaxSize() * 1024)) {
+                    final SolrInputDocument sid = event.query.getSegment().fulltext().getDefaultConfiguration().toSolrInputDocument(tmpdoc);
+                    // the input document stays untouched because it contains top-level cloned objects
+                    docs.add(sid);
+                    // will be stored to index, and is a full solr document, can be added to firstseen
+                    event.query.getSegment().setFirstSeenTime(urlEntry.hash(), Math.min(urlEntry.moddate().getTime(), System.currentTimeMillis()));
+                } else {
+                    Network.log.info("Document size greater than " + event.getRemoteDocStoredMaxSize() + " kbytes, excludes it from being stored to local index. Url : " + urlEntry.urlstring());
+                }
             }
             // after this conversion we can remove the largest and not used field text_t and synonyms_sxt from the document
             // because that goes into a search cache and would take a lot of memory in the search cache
             //doc.removeFields(CollectionSchema.text_t.getSolrFieldName());
-            doc.removeFields(CollectionSchema.synonyms_sxt.getSolrFieldName());
+            tmpdoc.removeFields(CollectionSchema.synonyms_sxt.getSolrFieldName());
             ResultURLs.stack(
-                ASCII.String(urlEntry.url().hash()),
-                urlEntry.url().getHost(),
-                event.peers.mySeed().hash.getBytes(),
-                UTF8.getBytes(target.hash),
-                EventOrigin.QUERIES);
+                    ASCII.String(urlEntry.url().hash()),
+                    urlEntry.url().getHost(),
+                    event.peers.mySeed().hash.getBytes(),
+                    UTF8.getBytes(target.hash),
+                    EventOrigin.QUERIES);
             }
-            // add the url entry to the word indexes
-            container.add(urlEntry);
+            // add the url entry to the checked results
+            resultContainer.add(urlEntry);
         }
-        final int dls = docList[0].size();
         final int numFound = (int) docList[0].getNumFound();
         docList[0].clear();
         docList[0] = null;
         if (localsearch) {
-            event.addNodes(container, facets, snippets, true, "localpeer", numFound);
+            event.addNodes(resultContainer, facets, snippets, true, "localpeer", numFound);
             event.addFinalize();
             event.addExpectedRemoteReferences(-count);
-            Network.log.info("local search (solr): localpeer sent " + container.size() + "/" + numFound + " references");
+            Network.log.info("local search (solr): localpeer sent " + resultContainer.size() + "/" + numFound + " references");
         } else {
             if (event.addResultsToLocalIndex) {
-                /*
-                 * Current thread might be interrupted by SearchEvent.cleanup()
-                 */
-                if (Thread.interrupted()) {
-                    throw new InterruptedException("solrQuery interrupted");
-                }
-                WriteToLocalIndexThread writeToLocalIndexThread = new WriteToLocalIndexThread(event.query.getSegment(),
-                        docs);
-                writeToLocalIndexThread.start();
-                try {
-                    writeToLocalIndexThread.join();
-                } catch (InterruptedException e) {
-                    /*
-                     * Current thread interruption might happen while waiting
-                     * for writeToLocalIndexThread.
-                     */
-                    writeToLocalIndexThread.stopWriting();
-                    throw new InterruptedException("solrQuery interrupted");
-                }
-                docs.clear();
+                /*
+                 * Current thread might be interrupted by SearchEvent.cleanup()
+                 */
+                if (Thread.interrupted()) {
+                    throw new InterruptedException("solrQuery interrupted");
+                }
+                WriteToLocalIndexThread writeToLocalIndexThread = new WriteToLocalIndexThread(event.query.getSegment(),
+                        docs); // will clear docs on return
+                writeToLocalIndexThread.start();
             }
-            event.addNodes(container, facets, snippets, false, target.getName() + "/" + target.hash, numFound);
+            event.addNodes(resultContainer, facets, snippets, false, target.getName() + "/" + target.hash, numFound);
             event.addFinalize();
             event.addExpectedRemoteReferences(-count);
-            Network.log.info("remote search (solr): peer " + target.getName() + " sent " + (container.size() == 0 ? 0 : container.size()) + "/" + numFound + " references");
+            Network.log.info("remote search (solr): peer " + target.getName() + " sent " + (resultContainer.size()) + "/" + numFound + " references");
         }
-        return dls;
+        return resultContainer.size();
     }
     /**
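
The substantive change in this hunk: the caller no longer blocks on `join()` and no longer clears `docs` itself; it just starts `WriteToLocalIndexThread` and returns, and the thread takes over clearing the collection (hence the "will clear docs on return" comment and the javadoc line added in the next hunk). A minimal sketch of that hand-off, with illustrative names standing in for the YaCy classes:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: WriterThread stands in for WriteToLocalIndexThread.
class WriterThread extends Thread {
    private final List<String> docs;
    WriterThread(List<String> docs) { this.docs = docs; }
    @Override public void run() {
        for (String doc : docs) System.out.println("indexing " + doc);
        docs.clear(); // the writer now owns cleanup of the collection
    }
}

public class FireAndForgetSketch {
    public static void main(String[] args) {
        List<String> docs = new ArrayList<>(List.of("doc-1", "doc-2"));
        // Before: start(), join(), then docs.clear() in the caller,
        // which blocked the search thread until indexing finished.
        // After: start and return immediately; the thread clears docs.
        new WriterThread(docs).start();
    }
}
```

This keeps remote search latency independent of local indexing speed; the trade-off is that the caller must not touch `docs` again after starting the thread.
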
@@ -1285,6 +1273,7 @@ public final class Protocol {
     /**
      * Parameters must not be null.
+     * After writing the collection is cleared
      * @param segment solr segment to write
      * @param docs solr documents collection to put to segment
      */
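
The javadoc line added here ("After writing the collection is cleared") is the contract that the rewritten `run()` in the hunk below enforces on every exit path: `docs.clear()` runs both when the loop completes and when the `stop` flag ends it early. A standalone sketch of that cooperative-stop pattern (names are illustrative, not the YaCy API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class StopFlagSketch extends Thread {
    private final List<String> docs;
    private final AtomicBoolean stop = new AtomicBoolean(false);

    StopFlagSketch(List<String> docs) { this.docs = docs; }

    // Requests a cooperative stop; checked once per document below.
    public void stopWriting() { this.stop.set(true); }

    @Override public void run() {
        for (String doc : docs) {
            if (stop.get()) {
                docs.clear(); // contract: collection is cleared on every exit
                return;
            }
            System.out.println("writing " + doc);
        }
        docs.clear();
    }

    public static void main(String[] args) throws InterruptedException {
        StopFlagSketch writer = new StopFlagSketch(new ArrayList<>(List.of("a", "b", "c")));
        writer.start();
        writer.stopWriting(); // may end the batch between two documents
        writer.join();
    }
}
```

Using an `AtomicBoolean` flag instead of `Thread.interrupt()` means the writer only stops at document boundaries, so a document is never half-written to the segment.
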
@@ -1300,17 +1289,19 @@ public final class Protocol {
             this.stop.set(true);
         }
-        @Override
-        public void run() {
-            for (SolrInputDocument doc : docs) {
-                if (stop.get()) {
-                    Network.log.info("Writing documents collection to Solr segment was stopped.");
-                    return;
-                }
-                segment.putDocument(doc);
+        @Override
+        public void run() {
+            for (SolrInputDocument doc : docs) {
+                if (stop.get()) {
+                    docs.clear();
+                    Network.log.info("Writing documents collection to Solr segment was stopped.");
+                    return;
+                }
+                segment.putDocument(doc);
             }
-        }
-    }
+            docs.clear();
+        }
+    }
     /**
      * Only when maxSize is greater than zero, check that doc size is lower. To