@@ -60,8 +60,18 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.http.entity.mime.content.ContentBody;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.response.FacetField;
+import org.apache.solr.client.solrj.response.FacetField.Count;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.SolrInputDocument;
 import net.yacy.migration;
 import net.yacy.cora.date.GenericFormatter;
 import net.yacy.cora.document.analysis.Classification;
@@ -120,15 +130,6 @@ import net.yacy.server.serverObjects;
 import net.yacy.server.serverSwitch;
 import net.yacy.utils.crypt;
-import org.apache.http.entity.mime.content.ContentBody;
-import org.apache.solr.client.solrj.SolrQuery;
-import org.apache.solr.client.solrj.response.FacetField;
-import org.apache.solr.client.solrj.response.QueryResponse;
-import org.apache.solr.client.solrj.response.FacetField.Count;
-import org.apache.solr.common.SolrDocument;
-import org.apache.solr.common.SolrDocumentList;
-import org.apache.solr.common.SolrInputDocument;
 
 public final class Protocol {
@@ -487,7 +488,7 @@ public final class Protocol {
             final int partitions,
             final Seed target,
             final SecondarySearchSuperviser secondarySearchSuperviser,
-            final Blacklist blacklist) {
+            final Blacklist blacklist) throws InterruptedException {
 
         // send a search request to peer with remote Hash
         // INPUT:
@@ -586,7 +587,7 @@ public final class Protocol {
             final int maxDistance,
             final int partitions,
             final Seed target,
-            final Blacklist blacklist) {
+            final Blacklist blacklist) throws InterruptedException {
 
         final long timestamp = System.currentTimeMillis();
         event.addExpectedRemoteReferences(count);
@@ -639,7 +640,7 @@ public final class Protocol {
             final Seed target,
             final Blacklist blacklist,
             final SearchResult result
-            ) throws SpaceExceededException {
+            ) throws SpaceExceededException, InterruptedException {
 
         // create containers
         final int words = wordhashes.length() / Word.commonHashLength;
@@ -745,14 +746,24 @@ public final class Protocol {
             // insert one container into the search result buffer
             // one is enough, only the references are used, not the word
             if (event.addResultsToLocalIndex) {
-                for (URIMetadataNode entry : storeDocs) {
-                    try {
-                        event.query.getSegment().setFirstSeenTime(entry.hash(), Math.min(entry.moddate().getTime(), System.currentTimeMillis()));
-                        event.query.getSegment().fulltext().putMetadata(entry); // it will be checked inside the putMetadata that poor metadata does not overwrite rich metadata
-                    } catch (final IOException e) {
-                        ConcurrentLog.logException(e);
-                    }
-                }
+                /*
+                 * Current thread might be interrupted by SearchEvent.cleanup()
+                 */
+                if (Thread.interrupted()) {
+                    throw new InterruptedException("solrQuery interrupted");
+                }
+                WriteMetadataNodeToLocalIndexThread writerToLocalIndex = new WriteMetadataNodeToLocalIndexThread(event.query.getSegment(), storeDocs);
+                writerToLocalIndex.start();
+                try {
+                    writerToLocalIndex.join();
+                } catch (InterruptedException e) {
+                    /*
+                     * Current thread interruption might happen while waiting
+                     * for writerToLocalIndex.
+                     */
+                    writerToLocalIndex.stopWriting();
+                    throw new InterruptedException("remoteProcess stopped!");
+                }
             }
 
             event.addRWIs(container.get(0), false, target.getName() + "/" + target.hash, result.totalCount, time);
         } else {
@@ -783,6 +794,56 @@ public final class Protocol {
         }
         Network.log.info("remote search: peer " + target.getName() + " sent " + container.get(0).size() + "/" + result.totalCount + " references");
     }
+    /**
+     * This thread writes a collection of URIMetadataNode documents to a segment and can be safely stopped.
+     * Indeed, if a thread is interrupted while committing to the Solr index, the index is closed and no longer
+     * writable (later calls would throw an org.apache.lucene.store.AlreadyClosedException) because the Solr
+     * IndexWriter uses an InterruptibleChannel. This thread allows the writing operation to be stopped safely
+     * through an AtomicBoolean.
+     * @author luc
+     *
+     */
+    private static class WriteMetadataNodeToLocalIndexThread extends Thread {
+
+        private AtomicBoolean stop = new AtomicBoolean(false);
+
+        private Segment segment;
+
+        private Collection<URIMetadataNode> storeDocs;
+
+        /**
+         * Parameters must not be null.
+         * @param segment Solr segment to write to
+         * @param storeDocs collection of Solr documents to put into the segment
+         */
+        public WriteMetadataNodeToLocalIndexThread(Segment segment, Collection<URIMetadataNode> storeDocs) {
+            this.segment = segment;
+            this.storeDocs = storeDocs;
+        }
+
+        /**
+         * Use this to stop the writing operation. The thread will not stop immediately, as Solr might currently be writing something.
+         */
+        public void stopWriting() {
+            this.stop.set(true);
+        }
+
+        @Override
+        public void run() {
+            for (URIMetadataNode entry : this.storeDocs) {
+                if (stop.get()) {
+                    Network.log.info("Writing documents collection to Solr segment was stopped.");
+                    return;
+                }
+                try {
+                    segment.setFirstSeenTime(entry.hash(), Math.min(entry.moddate().getTime(), System.currentTimeMillis()));
+                    segment.fulltext().putMetadata(entry); // it will be checked inside the putMetadata that poor metadata does not overwrite rich metadata
+                } catch (final IOException e) {
+                    ConcurrentLog.logException(e);
+                }
+            }
+        }
+    }
 
     private static class SearchResult {
 
         public int availableCount; // number of returned LURL's for this search
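
The InterruptibleChannel behaviour that motivates this dedicated writer thread is easy to reproduce outside of YaCy. The following standalone sketch (not part of this patch; all names are invented for illustration) shows how interrupting a thread blocked in a java.nio channel write closes the channel for every user, which is what happens to the Lucene index files behind Solr's IndexWriter:

    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardOpenOption;

    public class InterruptedChannelDemo {
        public static void main(String[] args) throws Exception {
            Path tmp = Files.createTempFile("demo", ".bin");
            try (FileChannel channel = FileChannel.open(tmp, StandardOpenOption.WRITE)) {
                Thread writer = new Thread(() -> {
                    try {
                        ByteBuffer buf = ByteBuffer.allocate(1024);
                        while (true) {
                            buf.rewind();
                            channel.write(buf, 0L); // keep writing until interrupted
                        }
                    } catch (Exception e) {
                        // ClosedByInterruptException: the channel is now closed for everyone
                        System.out.println("writer: " + e.getClass().getSimpleName());
                    }
                });
                writer.start();
                Thread.sleep(100);
                writer.interrupt(); // closes the InterruptibleChannel as a side effect
                writer.join();
                System.out.println("channel still open? " + channel.isOpen()); // false
            } finally {
                Files.deleteIfExists(tmp);
            }
        }
    }

Because the closed channel cannot be reopened, the patch never interrupts the writer thread itself; it only sets a flag that the writer checks between documents.
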
@@ -929,6 +990,18 @@ public final class Protocol {
     private final static CollectionSchema[] snippetFields = new CollectionSchema[]{CollectionSchema.description_txt, CollectionSchema.h4_txt, CollectionSchema.h3_txt, CollectionSchema.h2_txt, CollectionSchema.h1_txt, CollectionSchema.text_t};
+    /**
+     * Execute a Solr query against the specified target.
+     * @param event search event to feed with results
+     * @param solrQuery Solr query
+     * @param offset pagination start index
+     * @param count expected maximum results
+     * @param target target peer to query. May be null: in that case, the local peer is queried.
+     * @param partitions
+     * @param blacklist URL list to exclude from results
+     * @return the size of the results list
+     * @throws InterruptedException when an interrupt status on the calling thread is detected while processing
+     */
     protected static int solrQuery(
             final SearchEvent event,
             final SolrQuery solrQuery,
@@ -936,7 +1009,7 @@ public final class Protocol {
             final int count,
             final Seed target,
             final int partitions,
-            final Blacklist blacklist) {
+            final Blacklist blacklist) throws InterruptedException {
 
         //try {System.out.println("*** debug-query *** " + URLDecoder.decode(solrQuery.toString(), "UTF-8"));} catch (UnsupportedEncodingException e) {}
@@ -1125,12 +1198,17 @@ public final class Protocol {
                 // put the remote documents to the local index. We must convert the solr document to a solr input document:
                 if (event.addResultsToLocalIndex) {
-                    final SolrInputDocument sid = event.query.getSegment().fulltext().getDefaultConfiguration().toSolrInputDocument(doc);
-                    // the input document stays untouched because it contains top-level cloned objects
-                    docs.add(sid);
-                    // will be stored to index, and is a full solr document, can be added to firstseen
-                    event.query.getSegment().setFirstSeenTime(urlEntry.hash(), Math.min(urlEntry.moddate().getTime(), System.currentTimeMillis()));
+                    /* Check document size, only if a limit is set on remote documents size allowed to be stored to local index */
+                    if (checkDocumentSize(doc, event.getRemoteDocStoredMaxSize() * 1024)) {
+                        final SolrInputDocument sid = event.query.getSegment().fulltext().getDefaultConfiguration().toSolrInputDocument(doc);
+                        // the input document stays untouched because it contains top-level cloned objects
+                        docs.add(sid);
+                        // will be stored to index, and is a full solr document, can be added to firstseen
+                        event.query.getSegment().setFirstSeenTime(urlEntry.hash(), Math.min(urlEntry.moddate().getTime(), System.currentTimeMillis()));
+                    } else {
+                        Network.log.info("Document size greater than " + event.getRemoteDocStoredMaxSize() + " kbytes, excluding it from being stored to local index. URL: " + urlEntry.urlstring());
+                    }
                 }
 
                 // after this conversion we can remove the largest and not used field text_t and synonyms_sxt from the document
@@ -1160,10 +1238,26 @@ public final class Protocol {
             Network.log.info("local search (solr): localpeer sent " + container.size() + "/" + numFound + " references");
         } else {
             if (event.addResultsToLocalIndex) {
-                for (SolrInputDocument doc : docs) {
-                    event.query.getSegment().putDocument(doc);
-                }
-                docs.clear(); docs = null;
+                /*
+                 * Current thread might be interrupted by SearchEvent.cleanup()
+                 */
+                if (Thread.interrupted()) {
+                    throw new InterruptedException("solrQuery interrupted");
+                }
+                WriteToLocalIndexThread writeToLocalIndexThread = new WriteToLocalIndexThread(event.query.getSegment(),
+                        docs);
+                writeToLocalIndexThread.start();
+                try {
+                    writeToLocalIndexThread.join();
+                } catch (InterruptedException e) {
+                    /*
+                     * Current thread interruption might happen while waiting
+                     * for writeToLocalIndexThread.
+                     */
+                    writeToLocalIndexThread.stopWriting();
+                    throw new InterruptedException("solrQuery interrupted");
+                }
+                docs.clear();
             }
             event.addNodes(container, facets, snippets, false, target.getName() + "/" + target.hash, numFound);
             event.addFinalize();
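
Both call sites use the same delegate-and-join pattern: the interruptible caller waits on a dedicated writer thread and, when interrupted, requests a cooperative stop instead of propagating the interrupt into Solr. A condensed, self-contained sketch of the pattern, with generic hypothetical names in place of the YaCy classes:

    import java.util.List;
    import java.util.concurrent.atomic.AtomicBoolean;

    // Generic form of the pattern: the caller may be interrupted freely because
    // only the dedicated writer thread ever touches the interrupt-sensitive index.
    class StoppableWriter extends Thread {
        private final AtomicBoolean stop = new AtomicBoolean(false);
        private final List<String> items;

        StoppableWriter(List<String> items) { this.items = items; }

        void stopWriting() { this.stop.set(true); }

        @Override
        public void run() {
            for (String item : items) {
                if (stop.get()) return; // cooperative stop between items
                write(item);            // never interrupted mid-write
            }
        }

        private void write(String item) { /* index write stands in here */ }
    }

    class Caller {
        static void store(List<String> items) throws InterruptedException {
            if (Thread.interrupted()) throw new InterruptedException("caller interrupted");
            StoppableWriter writer = new StoppableWriter(items);
            writer.start();
            try {
                writer.join();          // the caller absorbs the interrupt, not the writer
            } catch (InterruptedException e) {
                writer.stopWriting();   // let the writer finish its current item and exit
                throw e;
            }
        }
    }
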
@@ -1172,6 +1266,78 @@ public final class Protocol {
         }
         return dls;
     }
+    /**
+     * This thread writes a collection of Solr documents to a segment and can be safely stopped.
+     * Indeed, if a thread is interrupted while committing to the Solr index, the index is closed and no longer
+     * writable (later calls would throw an org.apache.lucene.store.AlreadyClosedException) because the Solr
+     * IndexWriter uses an InterruptibleChannel. This thread allows the writing operation to be stopped safely
+     * through an AtomicBoolean.
+     * @author luc
+     *
+     */
+    private static class WriteToLocalIndexThread extends Thread {
+
+        private AtomicBoolean stop = new AtomicBoolean(false);
+
+        private Segment segment;
+
+        private Collection<SolrInputDocument> docs;
+
+        /**
+         * Parameters must not be null.
+         * @param segment Solr segment to write to
+         * @param docs collection of Solr documents to put into the segment
+         */
+        public WriteToLocalIndexThread(Segment segment, Collection<SolrInputDocument> docs) {
+            this.segment = segment;
+            this.docs = docs;
+        }
+
+        /**
+         * Use this to stop the writing operation. The thread will not stop immediately, as Solr might currently be writing something.
+         */
+        public void stopWriting() {
+            this.stop.set(true);
+        }
+
+        @Override
+        public void run() {
+            for (SolrInputDocument doc : docs) {
+                if (stop.get()) {
+                    Network.log.info("Writing documents collection to Solr segment was stopped.");
+                    return;
+                }
+                segment.putDocument(doc);
+            }
+        }
+    }
+    /**
+     * Only when maxSize is greater than zero, check that the document size is lower. To
+     * keep processing time reasonable, the document size is not evaluated by summing all
+     * field sizes, but only against the text_t field, which is quite representative and
+     * may weigh several MB.
+     *
+     * @param doc
+     *            document to verify. Must not be null.
+     * @param maxSize
+     *            maximum allowed size in bytes
+     * @return true when the evaluated document size is lower than or equal to maxSize, or when
+     *         maxSize is lower than or equal to zero.
+     */
+    protected static boolean checkDocumentSize(SolrDocument doc, long maxSize) {
+        if (maxSize > 0) {
+            /* The text_t field is often the largest */
+            Object value = doc.getFieldValue(CollectionSchema.text_t.getSolrFieldName());
+            if (value instanceof String) {
+                /* Each char uses 2 bytes */
+                if (((String) value).length() > (maxSize / 2)) {
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
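
To make the heuristic concrete, a small hypothetical usage (values invented for illustration): a limit of 1024 kbytes arrives here as maxSize = 1048576 bytes, so any text_t value longer than 524288 characters is rejected.

    // hypothetical example: a 600000-char text_t against a 1024-kbyte limit
    SolrDocument doc = new SolrDocument();
    doc.setField(CollectionSchema.text_t.getSolrFieldName(), new String(new char[600000]));
    boolean accepted = checkDocumentSize(doc, 1024L * 1024L); // false: 600000 > 1048576 / 2
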
     public static Map<String, String> permissionMessage(final String targetAddress, final String targetHash) {
         // ask for allowed message size and attachment size