From ff3eaa21b0d900e85db718f01948fbac3b9af852 Mon Sep 17 00:00:00 2001 From: Michael Peter Christen Date: Mon, 20 Aug 2012 12:16:11 +0200 Subject: [PATCH] added remote search to solr on YaCy peers! - when doing a remote search, node peers are selected for solr queries - the solr query is done concurrently to the standard YaCy rwi search - the solr search result is feeded into the same data structure that prepares the rwi search result - the same remote seach that is done to several outside peers is done to the local solr index - the search process works now also without any 'old' RWI data using solr --- .classpath | 4 +- .../federated/solr/SolrServerConnector.java | 56 +++---- source/net/yacy/peers/Protocol.java | 150 ++++++++++++++++-- source/net/yacy/peers/RemoteSearch.java | 57 +++++++ source/net/yacy/search/query/RWIProcess.java | 8 +- source/net/yacy/search/query/SearchEvent.java | 6 +- 6 files changed, 222 insertions(+), 59 deletions(-) diff --git a/.classpath b/.classpath index bef2c2b61..7ea5628c9 100644 --- a/.classpath +++ b/.classpath @@ -39,8 +39,8 @@ - - + + diff --git a/source/net/yacy/cora/services/federated/solr/SolrServerConnector.java b/source/net/yacy/cora/services/federated/solr/SolrServerConnector.java index ab5d797c9..cf42ce4ef 100644 --- a/source/net/yacy/cora/services/federated/solr/SolrServerConnector.java +++ b/source/net/yacy/cora/services/federated/solr/SolrServerConnector.java @@ -28,10 +28,13 @@ import java.util.List; import net.yacy.kelondro.logging.Log; +import org.apache.solr.client.solrj.ResponseParser; import org.apache.solr.client.solrj.SolrQuery; import org.apache.solr.client.solrj.SolrServer; import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.impl.XMLResponseParser; import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest; +import org.apache.solr.client.solrj.request.QueryRequest; import org.apache.solr.client.solrj.response.QueryResponse; import org.apache.solr.common.SolrDocument; import org.apache.solr.common.SolrDocumentList; @@ -202,27 +205,21 @@ public class SolrServerConnector extends AbstractSolrConnector implements SolrCo //query.addSortField( "price", SolrQuery.ORDER.asc ); // query the server - //SearchResult result = new SearchResult(count); - Throwable error = null; // well this is a bad hack; see https://issues.apache.org/jira/browse/LUCENE-2239 - // also: https://issues.apache.org/jira/browse/SOLR-2247 - // we might try also: $JAVA_OPTS -Dsolr.directoryFactory=solr.MMapDirectoryFactory - for (int retry = 30; retry > 0; retry--) { - try { - final QueryResponse rsp = this.server.query(query); - final SolrDocumentList docs = rsp.getResults(); - if (error != null) Log.logWarning("AbstractSolrConnector", "produced search result by silently ignoring an error before, message = " + error.getMessage()); - return docs; - } catch (final Throwable e) { - Log.logWarning("AbstractSolrConnection", "problem with query=" + querystring, e); - error = e; - continue; - } + try { + QueryRequest request = new QueryRequest(query); + ResponseParser responseParser = new XMLResponseParser(); + request.setResponseParser(responseParser); + final QueryResponse rsp = request.process(this.server); + final SolrDocumentList docs = rsp.getResults(); + return docs; + } catch (final Throwable e) { + //Log.logException(e); + throw new IOException(e.getMessage(), e); } - throw new IOException(error.getMessage(), error); } private final char[] queryIDTemplate = "id:\" \"".toCharArray(); - + /** * get a document from solr by given id * @param id @@ -234,7 +231,7 @@ public class SolrServerConnector extends AbstractSolrConnector implements SolrCo assert id.length() == 12; // construct query char[] q = new char[17]; - System.arraycopy(queryIDTemplate, 0, q, 0, 17); + System.arraycopy(this.queryIDTemplate, 0, q, 0, 17); System.arraycopy(id.toCharArray(), 0, q, 4, 12); final SolrQuery query = new SolrQuery(); query.setQuery(new String(q)); @@ -242,23 +239,14 @@ public class SolrServerConnector extends AbstractSolrConnector implements SolrCo query.setStart(0); // query the server - Throwable error = null; // well this is a bad hack; see https://issues.apache.org/jira/browse/LUCENE-2239 - // also: https://issues.apache.org/jira/browse/SOLR-2247 - // we might try also: $JAVA_OPTS -Dsolr.directoryFactory=solr.MMapDirectoryFactory - for (int retry = 30; retry > 0; retry--) { - try { - final QueryResponse rsp = this.server.query(query); - final SolrDocumentList docs = rsp.getResults(); - if (docs.isEmpty()) return null; - if (error != null) Log.logWarning("AbstractSolrConnector", "produced search result by silently ignoring an error before, message = " + error.getMessage()); - return docs.get(0); - } catch (final Throwable e) { - Log.logWarning("AbstractSolrConnection", "problem with id=" + id, e); - error = e; - continue; - } + try { + final QueryResponse rsp = this.server.query(query); + final SolrDocumentList docs = rsp.getResults(); + if (docs.isEmpty()) return null; + return docs.get(0); + } catch (final Throwable e) { + throw new IOException(e.getMessage(), e); } - throw new IOException(error.getMessage(), error); } } diff --git a/source/net/yacy/peers/Protocol.java b/source/net/yacy/peers/Protocol.java index 2810eca6a..330fc2e0f 100644 --- a/source/net/yacy/peers/Protocol.java +++ b/source/net/yacy/peers/Protocol.java @@ -76,12 +76,15 @@ import net.yacy.cora.services.federated.opensearch.SRURSSConnector; import net.yacy.cora.services.federated.solr.RemoteSolrConnector; import net.yacy.cora.services.federated.solr.SolrConnector; import net.yacy.cora.services.federated.yacy.CacheStrategy; +import net.yacy.cora.storage.HandleSet; import net.yacy.cora.util.SpaceExceededException; import net.yacy.kelondro.data.meta.URIMetadata; +import net.yacy.kelondro.data.meta.URIMetadataNode; import net.yacy.kelondro.data.meta.URIMetadataRow; import net.yacy.kelondro.data.word.Word; import net.yacy.kelondro.data.word.WordReference; import net.yacy.kelondro.data.word.WordReferenceFactory; +import net.yacy.kelondro.data.word.WordReferenceVars; import net.yacy.kelondro.logging.Log; import net.yacy.kelondro.order.Base64Order; import net.yacy.kelondro.order.Digest; @@ -100,12 +103,15 @@ import net.yacy.search.EventTracker; import net.yacy.search.Switchboard; import net.yacy.search.SwitchboardConstants; import net.yacy.search.index.Segment; -import net.yacy.search.query.RWIProcess; +import net.yacy.search.index.YaCySchema; import net.yacy.search.query.SearchEvent; import net.yacy.search.snippet.TextSnippet; import org.apache.http.entity.mime.content.ContentBody; +import org.apache.solr.client.solrj.util.ClientUtils; +import org.apache.solr.common.SolrDocument; import org.apache.solr.common.SolrDocumentList; +import org.apache.solr.common.SolrException; import de.anomic.crawler.ResultURLs; import de.anomic.crawler.ResultURLs.EventOrigin; @@ -642,7 +648,7 @@ public final class Protocol ); } catch ( final IOException e ) { Network.log.logInfo("SEARCH failed, Peer: " + target.hash + ":" + target.getName() + " (" + e.getMessage() + ")"); - //yacyCore.peerActions.peerDeparture(target, "search request to peer created io exception: " + e.getMessage()); + event.peers.peerActions.peerDeparture(target, "search request to peer created io exception: " + e.getMessage()); return -1; } // computation time @@ -722,7 +728,7 @@ public final class Protocol ); } catch ( final IOException e ) { Network.log.logInfo("SEARCH failed, Peer: " + target.hash + ":" + target.getName() + " (" + e.getMessage() + ")"); - //yacyCore.peerActions.peerDeparture(target, "search request to peer created io exception: " + e.getMessage()); + event.peers.peerActions.peerDeparture(target, "search request to peer created io exception: " + e.getMessage()); return -1; } // computation time @@ -1012,20 +1018,132 @@ public final class Protocol } } - public static void solrQuery( - final RWIProcess containerCache, - final String solrURL, final String querystring, final int offset, final int count) { - containerCache.oneFeederStarted(); - SolrConnector solrConnector; - try { - solrConnector = new RemoteSolrConnector(solrURL); - SolrDocumentList docList = solrConnector.query(querystring, offset, count); - - } catch (IOException e) { - Log.logException(e); - } finally { - containerCache.oneFeederTerminated(); + public static int solrQuery( + final SearchEvent event, + final HandleSet wordhashes, + final int offset, + final int count, + final long time, + final Seed target, + final Blacklist blacklist) { + event.rankingProcess.addExpectedRemoteReferences(count); + SolrDocumentList docList = null; + final String solrQuerystring = YaCySchema.text_t + ":" + event.getQuery().queryString(true).replace(' ', '+'); + boolean localsearch = target == null || target.equals(event.peers.mySeed()); + if (localsearch) { + // search the local index + try { + docList = event.rankingProcess.getQuery().getSegment().fulltext().getSolr().query(solrQuerystring, offset, count); + } catch (SolrException e) { + Network.log.logInfo("SEARCH failed (solr), localpeer (" + e.getMessage() + ")"); + return -1; + } catch (IOException e) { + Network.log.logInfo("SEARCH failed (solr), localpeer (" + e.getMessage() + ")"); + return -1; + } + } else { + final String solrURL = "http://" + target.getPublicAddress() + "/solr"; + try { + SolrConnector solrConnector = new RemoteSolrConnector(solrURL); + docList = solrConnector.query(solrQuerystring, offset, count); + // no need to close this here because that sends a commit to remote solr which is not wanted here + } catch (IOException e) { + Network.log.logInfo("SEARCH failed (solr), Peer: " + target.hash + ":" + target.getName() + " (" + e.getMessage() + ")"); + event.peers.peerActions.peerDeparture(target, "search request to peer created io exception: " + e.getMessage()); + return -1; + } + } + + // evaluate result + if (docList.size() > 0) {// create containers + final List> container = new ArrayList>(wordhashes.size()); + for (byte[] hash: wordhashes) { + try { + container.add(ReferenceContainer.emptyContainer( + Segment.wordReferenceFactory, + hash, + count)); + } catch (SpaceExceededException e) { + } // throws SpaceExceededException + } + + int term = count; + for (final SolrDocument doc: docList) { + if ( term-- <= 0 ) { + break; // do not process more that requested (in case that evil peers fill us up with rubbish) + } + // get one single search result + if ( doc == null ) { + continue; + } + URIMetadataNode urlEntry = new URIMetadataNode(doc); + + if ( blacklist.isListed(BlacklistType.SEARCH, urlEntry) ) { + if ( Network.log.isInfo() ) { + if (localsearch) { + Network.log.logInfo("local search (solr): filtered blacklisted url " + urlEntry.url()); + } else { + Network.log.logInfo("remote search (solr): filtered blacklisted url " + urlEntry.url() + " from peer " + target.getName()); + } + } + continue; // block with backlist + } + + final String urlRejectReason = + Switchboard.getSwitchboard().crawlStacker.urlInAcceptedDomain(urlEntry.url()); + if ( urlRejectReason != null ) { + if ( Network.log.isInfo() ) { + if (localsearch) { + Network.log.logInfo("local search (solr): rejected url '" + urlEntry.url() + "' (" + urlRejectReason + ")"); + } else { + Network.log.logInfo("remote search (solr): rejected url '" + urlEntry.url() + "' (" + urlRejectReason + ") from peer " + target.getName()); + } + } + continue; // reject url outside of our domain + } + + // passed all checks, store url + if (!localsearch) { + try { + event.getQuery().getSegment().fulltext().putDocument(ClientUtils.toSolrInputDocument(doc)); + ResultURLs.stack( + urlEntry, + event.peers.mySeed().hash.getBytes(), + UTF8.getBytes(target.hash), + EventOrigin.QUERIES); + } catch ( final IOException e ) { + Network.log.logWarning("could not store search result", e); + continue; // db-error + } + } + + // we create virtual word references here which are necessary to feed search results into retrieval process + Reference entry = new WordReferenceVars(urlEntry); + + // add the url entry to the word indexes + for ( final ReferenceContainer c : container ) { + try { + c.add(entry); + } catch ( final SpaceExceededException e ) { + Log.logException(e); + break; + } + } + } + + if (localsearch) { + event.rankingProcess.add(container.get(0), true, "localpeer", docList.size(), time); + event.rankingProcess.addFinalize(); + event.rankingProcess.addExpectedRemoteReferences(-count); + Network.log.logInfo("local search (solr): localpeer sent " + container.get(0).size() + "/" + docList.size() + " references"); + } else { + event.rankingProcess.add(container.get(0), false, target.getName() + "/" + target.hash, docList.size(), time); + event.rankingProcess.addFinalize(); + event.rankingProcess.addExpectedRemoteReferences(-count); + Network.log.logInfo("remote search (solr): peer " + target.getName() + " sent " + container.get(0).size() + "/" + docList.size() + " references"); + } } + return docList.size(); } public static Map permissionMessage(final SeedDB seedDB, final String targetHash) { diff --git a/source/net/yacy/peers/RemoteSearch.java b/source/net/yacy/peers/RemoteSearch.java index dca2656dc..8f0b452ce 100644 --- a/source/net/yacy/peers/RemoteSearch.java +++ b/source/net/yacy/peers/RemoteSearch.java @@ -24,6 +24,7 @@ package net.yacy.peers; +import java.util.HashSet; import java.util.Iterator; import java.util.Set; import java.util.SortedMap; @@ -162,6 +163,16 @@ public class RemoteSearch extends Thread { burstMultiwordPercent) : PeerSelection.selectClusterPeers(event.peers, clusterselection); if (targetPeers == null) return; + + // start solr searches + Set omit = new HashSet(); + for (Seed s: targetPeers) omit.add(s); + Seed[] nodes = PeerSelection.selectNodeSearchTargets(event.peers, 20, omit); + for (Seed s: nodes) { + solrRemoteSearch(event, count, event.getQuery().query_include_hashes, time, s, blacklist); + } + + // start search to YaCy peers final int targets = targetPeers.length; if (targets == 0) return; for (int i = 0; i < targets; i++) { @@ -244,6 +255,52 @@ public class RemoteSearch extends Thread { return secondary; } + public static Thread solrRemoteSearch( + final SearchEvent event, + final int count, + final HandleSet wordhashes, + final long time, + final Seed targetPeer, + final Blacklist blacklist) { + + // check own peer status + if (event.peers.mySeed() == null || event.peers.mySeed().getPublicAddress() == null) { return null; } + + // prepare seed targets and threads + if (targetPeer != null && targetPeer.hash != null && event.preselectedPeerHashes != null) targetPeer.setAlternativeAddress(event.preselectedPeerHashes.get(ASCII.getBytes(targetPeer.hash))); + Thread solr = new Thread() { + @Override + public void run() { + event.rankingProcess.oneFeederStarted(); + try { + int urls = Protocol.solrQuery( + event, + wordhashes, + 0, + count, + time, + targetPeer, + blacklist); + if (urls >= 0) { + // urls is an array of url hashes. this is only used for log output + event.peers.mySeed().incRI(urls); + event.peers.mySeed().incRU(urls); + } else { + if (targetPeer != null) { + Network.log.logInfo("REMOTE SEARCH - no answer from remote peer " + targetPeer.hash + ":" + targetPeer.getName()); + } + } + } catch (final Exception e) { + Log.logException(e); + } finally { + event.rankingProcess.oneFeederTerminated(); + } + } + }; + solr.start(); + return solr; + } + public static int remainingWaiting(final RemoteSearch[] searchThreads) { if (searchThreads == null) return 0; int alive = 0; diff --git a/source/net/yacy/search/query/RWIProcess.java b/source/net/yacy/search/query/RWIProcess.java index cf708ea5e..5514fa3fd 100644 --- a/source/net/yacy/search/query/RWIProcess.java +++ b/source/net/yacy/search/query/RWIProcess.java @@ -26,7 +26,6 @@ package net.yacy.search.query; -import java.io.IOException; import java.util.Comparator; import java.util.ConcurrentModificationException; import java.util.HashMap; @@ -59,7 +58,6 @@ import net.yacy.document.Condenser; import net.yacy.document.LibraryProvider; import net.yacy.kelondro.data.meta.DigestURI; import net.yacy.kelondro.data.meta.URIMetadata; -import net.yacy.kelondro.data.meta.URIMetadataNode; import net.yacy.kelondro.data.meta.URIMetadataRow; import net.yacy.kelondro.data.word.Word; import net.yacy.kelondro.data.word.WordReference; @@ -76,10 +74,6 @@ import net.yacy.search.index.Segment; import net.yacy.search.ranking.ReferenceOrder; import net.yacy.search.snippet.ResultEntry; -import org.apache.solr.common.SolrDocument; -import org.apache.solr.common.SolrDocumentList; -import org.apache.solr.common.SolrException; - import com.hp.hpl.jena.rdf.model.RDFNode; import com.hp.hpl.jena.rdf.model.Resource; @@ -200,6 +194,7 @@ public final class RWIProcess extends Thread @Override public void run() { + /* // start a concurrent solr search if (this.query.query_include_words != null) { Thread solrSearch = new Thread() { @@ -226,6 +221,7 @@ public final class RWIProcess extends Thread }; solrSearch.start(); } + */ // do a search oneFeederStarted(); diff --git a/source/net/yacy/search/query/SearchEvent.java b/source/net/yacy/search/query/SearchEvent.java index 22a3e18b7..3bd50cfd4 100644 --- a/source/net/yacy/search/query/SearchEvent.java +++ b/source/net/yacy/search/query/SearchEvent.java @@ -144,11 +144,15 @@ public final class SearchEvent { .getFlagAcceptRemoteIndex())); final long start = System.currentTimeMillis(); + // prepare a local RWI search // initialize a ranking process that is the target for data // that is generated concurrently from local and global search threads this.rankingProcess = new RWIProcess(this.query, this.order, remote); - // start a local search concurrently + // start a local solr search + RemoteSearch.solrRemoteSearch(this, 100, this.query.query_include_hashes, 10000, null, Switchboard.urlBlacklist); + + // start a local RWI search concurrently this.rankingProcess.start(); if ( remote ) {