added remote search to solr on YaCy peers!

- when doing a remote search, node peers are selected for solr queries
- the solr query runs concurrently with the standard YaCy rwi search
- the solr search result is fed into the same data structure that
prepares the rwi search result
- the same remote search that is done to several outside peers is done to
the local solr index
- the search process now also works without any 'old' RWI data, using
solr
pull/1/head
Michael Peter Christen 13 years ago
parent a06123aec6
commit ff3eaa21b0

@ -39,8 +39,8 @@
<classpathentry kind="lib" path="lib/jcl-over-slf4j-1.6.1.jar"/>
<classpathentry kind="lib" path="lib/wstx-asl-3.2.7.jar"/>
<classpathentry kind="lib" path="lib/commons-codec-1.6.jar"/>
<classpathentry kind="lib" path="lib/httpcore-4.2.1.jar"/>
<classpathentry kind="lib" path="lib/httpclient-4.2.1.jar"/>
<classpathentry kind="lib" path="lib/httpcore-4.2.1.jar" sourcepath="/Volumes/Raptor/Data/sourcecode/httpcore/src/main/java"/>
<classpathentry kind="lib" path="lib/httpclient-4.2.1.jar" sourcepath="/Volumes/Raptor/Data/sourcecode/httpclient/src/main/java"/>
<classpathentry kind="lib" path="lib/httpmime-4.2.1.jar"/>
<classpathentry kind="lib" path="lib/commons-io-2.1.jar"/>
<classpathentry kind="lib" path="lib/commons-compress-1.4.1.jar"/>

@ -28,10 +28,13 @@ import java.util.List;
import net.yacy.kelondro.logging.Log;
import org.apache.solr.client.solrj.ResponseParser;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.XMLResponseParser;
import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
@ -202,27 +205,21 @@ public class SolrServerConnector extends AbstractSolrConnector implements SolrCo
//query.addSortField( "price", SolrQuery.ORDER.asc );
// query the server
//SearchResult result = new SearchResult(count);
Throwable error = null; // well this is a bad hack; see https://issues.apache.org/jira/browse/LUCENE-2239
// also: https://issues.apache.org/jira/browse/SOLR-2247
// we might try also: $JAVA_OPTS -Dsolr.directoryFactory=solr.MMapDirectoryFactory
for (int retry = 30; retry > 0; retry--) {
try {
final QueryResponse rsp = this.server.query(query);
final SolrDocumentList docs = rsp.getResults();
if (error != null) Log.logWarning("AbstractSolrConnector", "produced search result by silently ignoring an error before, message = " + error.getMessage());
return docs;
} catch (final Throwable e) {
Log.logWarning("AbstractSolrConnection", "problem with query=" + querystring, e);
error = e;
continue;
}
try {
QueryRequest request = new QueryRequest(query);
ResponseParser responseParser = new XMLResponseParser();
request.setResponseParser(responseParser);
final QueryResponse rsp = request.process(this.server);
final SolrDocumentList docs = rsp.getResults();
return docs;
} catch (final Throwable e) {
//Log.logException(e);
throw new IOException(e.getMessage(), e);
}
throw new IOException(error.getMessage(), error);
}
private final char[] queryIDTemplate = "id:\" \"".toCharArray();
/**
* get a document from solr by given id
* @param id
@ -234,7 +231,7 @@ public class SolrServerConnector extends AbstractSolrConnector implements SolrCo
assert id.length() == 12;
// construct query
char[] q = new char[17];
System.arraycopy(queryIDTemplate, 0, q, 0, 17);
System.arraycopy(this.queryIDTemplate, 0, q, 0, 17);
System.arraycopy(id.toCharArray(), 0, q, 4, 12);
final SolrQuery query = new SolrQuery();
query.setQuery(new String(q));
@ -242,23 +239,14 @@ public class SolrServerConnector extends AbstractSolrConnector implements SolrCo
query.setStart(0);
// query the server
Throwable error = null; // well this is a bad hack; see https://issues.apache.org/jira/browse/LUCENE-2239
// also: https://issues.apache.org/jira/browse/SOLR-2247
// we might try also: $JAVA_OPTS -Dsolr.directoryFactory=solr.MMapDirectoryFactory
for (int retry = 30; retry > 0; retry--) {
try {
final QueryResponse rsp = this.server.query(query);
final SolrDocumentList docs = rsp.getResults();
if (docs.isEmpty()) return null;
if (error != null) Log.logWarning("AbstractSolrConnector", "produced search result by silently ignoring an error before, message = " + error.getMessage());
return docs.get(0);
} catch (final Throwable e) {
Log.logWarning("AbstractSolrConnection", "problem with id=" + id, e);
error = e;
continue;
}
try {
final QueryResponse rsp = this.server.query(query);
final SolrDocumentList docs = rsp.getResults();
if (docs.isEmpty()) return null;
return docs.get(0);
} catch (final Throwable e) {
throw new IOException(e.getMessage(), e);
}
throw new IOException(error.getMessage(), error);
}
}

@ -76,12 +76,15 @@ import net.yacy.cora.services.federated.opensearch.SRURSSConnector;
import net.yacy.cora.services.federated.solr.RemoteSolrConnector;
import net.yacy.cora.services.federated.solr.SolrConnector;
import net.yacy.cora.services.federated.yacy.CacheStrategy;
import net.yacy.cora.storage.HandleSet;
import net.yacy.cora.util.SpaceExceededException;
import net.yacy.kelondro.data.meta.URIMetadata;
import net.yacy.kelondro.data.meta.URIMetadataNode;
import net.yacy.kelondro.data.meta.URIMetadataRow;
import net.yacy.kelondro.data.word.Word;
import net.yacy.kelondro.data.word.WordReference;
import net.yacy.kelondro.data.word.WordReferenceFactory;
import net.yacy.kelondro.data.word.WordReferenceVars;
import net.yacy.kelondro.logging.Log;
import net.yacy.kelondro.order.Base64Order;
import net.yacy.kelondro.order.Digest;
@ -100,12 +103,15 @@ import net.yacy.search.EventTracker;
import net.yacy.search.Switchboard;
import net.yacy.search.SwitchboardConstants;
import net.yacy.search.index.Segment;
import net.yacy.search.query.RWIProcess;
import net.yacy.search.index.YaCySchema;
import net.yacy.search.query.SearchEvent;
import net.yacy.search.snippet.TextSnippet;
import org.apache.http.entity.mime.content.ContentBody;
import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException;
import de.anomic.crawler.ResultURLs;
import de.anomic.crawler.ResultURLs.EventOrigin;
@ -642,7 +648,7 @@ public final class Protocol
);
} catch ( final IOException e ) {
Network.log.logInfo("SEARCH failed, Peer: " + target.hash + ":" + target.getName() + " (" + e.getMessage() + ")");
//yacyCore.peerActions.peerDeparture(target, "search request to peer created io exception: " + e.getMessage());
event.peers.peerActions.peerDeparture(target, "search request to peer created io exception: " + e.getMessage());
return -1;
}
// computation time
@ -722,7 +728,7 @@ public final class Protocol
);
} catch ( final IOException e ) {
Network.log.logInfo("SEARCH failed, Peer: " + target.hash + ":" + target.getName() + " (" + e.getMessage() + ")");
//yacyCore.peerActions.peerDeparture(target, "search request to peer created io exception: " + e.getMessage());
event.peers.peerActions.peerDeparture(target, "search request to peer created io exception: " + e.getMessage());
return -1;
}
// computation time
@ -1012,20 +1018,132 @@ public final class Protocol
}
}
public static void solrQuery(
final RWIProcess containerCache,
final String solrURL, final String querystring, final int offset, final int count) {
containerCache.oneFeederStarted();
SolrConnector solrConnector;
try {
solrConnector = new RemoteSolrConnector(solrURL);
SolrDocumentList docList = solrConnector.query(querystring, offset, count);
} catch (IOException e) {
Log.logException(e);
} finally {
containerCache.oneFeederTerminated();
/**
 * Runs a solr query for the given search event and feeds the hits into the
 * ranking process of that event.
 * <p>
 * The query string is built from the clear-text query of the event and is
 * matched against the {@code text_t} field. If {@code target} is
 * {@code null} or equals the own seed, the local solr index is queried;
 * otherwise the solr interface of the target peer is queried over http.
 * Every hit is checked against the blacklist and the accepted-domain filter.
 * Hits from remote peers are additionally stored in the local fulltext index
 * and stacked as query results. For each accepted hit a virtual word
 * reference is added to one container per word hash; the first container is
 * then handed to the ranking process.
 *
 * @param event      the search event whose ranking process receives the hits
 * @param wordhashes the word hashes of the query; one reference container is
 *                   created per hash
 * @param offset     index of the first document requested from solr
 * @param count      number of documents requested; also the upper bound of
 *                   documents that are processed from the answer
 * @param time       passed through unchanged to the ranking process
 *                   (presumably a time budget in milliseconds - TODO confirm)
 * @param target     the peer to query, or {@code null} for the local index
 * @param blacklist  blacklist used to filter hits
 * @return the number of documents in the solr answer (0 if the answer has no
 *         result list), or -1 if the query itself failed
 */
public static int solrQuery(
        final SearchEvent event,
        final HandleSet wordhashes,
        final int offset,
        final int count,
        final long time,
        final Seed target,
        final Blacklist blacklist) {
    // NOTE(review): the failure and empty-result paths below return without
    // withdrawing this expectation again - confirm that this is intended.
    event.rankingProcess.addExpectedRemoteReferences(count);
    SolrDocumentList docList = null;
    final String solrQuerystring = YaCySchema.text_t + ":" + event.getQuery().queryString(true).replace(' ', '+');
    final boolean localsearch = target == null || target.equals(event.peers.mySeed());
    if (localsearch) {
        // search the local index
        try {
            docList = event.rankingProcess.getQuery().getSegment().fulltext().getSolr().query(solrQuerystring, offset, count);
        } catch (SolrException e) {
            Network.log.logInfo("SEARCH failed (solr), localpeer (" + e.getMessage() + ")");
            return -1;
        } catch (IOException e) {
            Network.log.logInfo("SEARCH failed (solr), localpeer (" + e.getMessage() + ")");
            return -1;
        }
    } else {
        final String solrURL = "http://" + target.getPublicAddress() + "/solr";
        try {
            final SolrConnector solrConnector = new RemoteSolrConnector(solrURL);
            docList = solrConnector.query(solrQuerystring, offset, count);
            // no need to close this here because that sends a commit to remote solr which is not wanted here
        } catch (IOException e) {
            Network.log.logInfo("SEARCH failed (solr), Peer: " + target.hash + ":" + target.getName() + " (" + e.getMessage() + ")");
            event.peers.peerActions.peerDeparture(target, "search request to peer created io exception: " + e.getMessage());
            return -1;
        }
    }

    // an answer without a result list carries no hits; treat it like an empty answer
    if (docList == null || docList.isEmpty()) {
        return 0;
    }

    // create one (initially empty) container per query word hash
    final List<ReferenceContainer<WordReference>> container = new ArrayList<ReferenceContainer<WordReference>>(wordhashes.size());
    for (byte[] hash : wordhashes) {
        try {
            container.add(ReferenceContainer.emptyContainer(
                    Segment.wordReferenceFactory,
                    hash,
                    count));
        } catch (SpaceExceededException e) {
            // the container for this hash is left out, but the failure must be visible
            Log.logException(e);
        }
    }

    // evaluate result
    int term = count;
    for (final SolrDocument doc : docList) {
        if (term-- <= 0) {
            break; // do not process more than requested (in case that evil peers fill us up with rubbish)
        }
        // get one single search result
        if (doc == null) {
            continue;
        }
        URIMetadataNode urlEntry = new URIMetadataNode(doc);
        if (blacklist.isListed(BlacklistType.SEARCH, urlEntry)) {
            if (Network.log.isInfo()) {
                if (localsearch) {
                    Network.log.logInfo("local search (solr): filtered blacklisted url " + urlEntry.url());
                } else {
                    Network.log.logInfo("remote search (solr): filtered blacklisted url " + urlEntry.url() + " from peer " + target.getName());
                }
            }
            continue; // blocked by blacklist
        }
        final String urlRejectReason =
                Switchboard.getSwitchboard().crawlStacker.urlInAcceptedDomain(urlEntry.url());
        if (urlRejectReason != null) {
            if (Network.log.isInfo()) {
                if (localsearch) {
                    Network.log.logInfo("local search (solr): rejected url '" + urlEntry.url() + "' (" + urlRejectReason + ")");
                } else {
                    Network.log.logInfo("remote search (solr): rejected url '" + urlEntry.url() + "' (" + urlRejectReason + ") from peer " + target.getName());
                }
            }
            continue; // reject url outside of our domain
        }

        // passed all checks, store url (only hits of remote peers; local hits are in the index already)
        if (!localsearch) {
            try {
                event.getQuery().getSegment().fulltext().putDocument(ClientUtils.toSolrInputDocument(doc));
                ResultURLs.stack(
                        urlEntry,
                        event.peers.mySeed().hash.getBytes(),
                        UTF8.getBytes(target.hash),
                        EventOrigin.QUERIES);
            } catch (final IOException e) {
                Network.log.logWarning("could not store search result", e);
                continue; // db-error
            }
        }

        // we create virtual word references here which are necessary to feed search results into retrieval process
        Reference entry = new WordReferenceVars(urlEntry);

        // add the url entry to the word indexes
        for (final ReferenceContainer<WordReference> c : container) {
            try {
                c.add(entry);
            } catch (final SpaceExceededException e) {
                Log.logException(e);
                break;
            }
        }
    }

    // without any container (no word hashes, or every allocation failed) there is nothing
    // that could be handed to the ranking process; container.get(0) would throw here
    if (container.isEmpty()) {
        Network.log.logInfo("SEARCH (solr): no reference container available, " + docList.size() + " documents not fed into ranking");
        return docList.size();
    }

    final ReferenceContainer<WordReference> first = container.get(0);
    if (localsearch) {
        event.rankingProcess.add(first, true, "localpeer", docList.size(), time);
        event.rankingProcess.addFinalize();
        event.rankingProcess.addExpectedRemoteReferences(-count);
        Network.log.logInfo("local search (solr): localpeer sent " + first.size() + "/" + docList.size() + " references");
    } else {
        event.rankingProcess.add(first, false, target.getName() + "/" + target.hash, docList.size(), time);
        event.rankingProcess.addFinalize();
        event.rankingProcess.addExpectedRemoteReferences(-count);
        Network.log.logInfo("remote search (solr): peer " + target.getName() + " sent " + first.size() + "/" + docList.size() + " references");
    }
    return docList.size();
}
public static Map<String, String> permissionMessage(final SeedDB seedDB, final String targetHash) {

@ -24,6 +24,7 @@
package net.yacy.peers;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.SortedMap;
@ -162,6 +163,16 @@ public class RemoteSearch extends Thread {
burstMultiwordPercent)
: PeerSelection.selectClusterPeers(event.peers, clusterselection);
if (targetPeers == null) return;
// start solr searches
Set<Seed> omit = new HashSet<Seed>();
for (Seed s: targetPeers) omit.add(s);
Seed[] nodes = PeerSelection.selectNodeSearchTargets(event.peers, 20, omit);
for (Seed s: nodes) {
solrRemoteSearch(event, count, event.getQuery().query_include_hashes, time, s, blacklist);
}
// start search to YaCy peers
final int targets = targetPeers.length;
if (targets == 0) return;
for (int i = 0; i < targets; i++) {
@ -244,6 +255,52 @@ public class RemoteSearch extends Thread {
return secondary;
}
/**
 * Starts a thread that runs {@link Protocol#solrQuery} for the given search
 * event and returns that thread.
 * <p>
 * The thread registers itself as a feeder at the ranking process of the event
 * when it begins to run and always deregisters when it ends. If the query
 * reports a non-negative document count, that count is added to the RI and RU
 * counters of the own seed; a negative count is logged as a missing answer
 * when a target peer was given.
 *
 * @param event      the search event that receives the results
 * @param count      number of documents to request
 * @param wordhashes word hashes of the query, handed on to the solr query
 * @param time       handed on unchanged to the solr query
 * @param targetPeer the peer to query; {@code null} is handed on as-is
 * @param blacklist  blacklist handed on to the solr query
 * @return the started thread, or {@code null} if the own seed or its public
 *         address is not available
 */
public static Thread solrRemoteSearch(
        final SearchEvent event,
        final int count,
        final HandleSet wordhashes,
        final long time,
        final Seed targetPeer,
        final Blacklist blacklist) {

    // check own peer status
    if (event.peers.mySeed() == null || event.peers.mySeed().getPublicAddress() == null) {
        return null;
    }

    // apply a preselected alternative address to the target, if there is one to look up
    if (targetPeer != null && targetPeer.hash != null && event.preselectedPeerHashes != null) {
        targetPeer.setAlternativeAddress(event.preselectedPeerHashes.get(ASCII.getBytes(targetPeer.hash)));
    }

    final Thread worker = new Thread() {
        @Override
        public void run() {
            event.rankingProcess.oneFeederStarted();
            try {
                // number of documents in the answer, negative if the query failed
                final int resultCount = Protocol.solrQuery(event, wordhashes, 0, count, time, targetPeer, blacklist);
                if (resultCount < 0) {
                    if (targetPeer != null) {
                        Network.log.logInfo("REMOTE SEARCH - no answer from remote peer " + targetPeer.hash + ":" + targetPeer.getName());
                    }
                    return;
                }
                event.peers.mySeed().incRI(resultCount);
                event.peers.mySeed().incRU(resultCount);
            } catch (final Exception e) {
                Log.logException(e);
            } finally {
                // runs on every exit path, including the early return above
                event.rankingProcess.oneFeederTerminated();
            }
        }
    };
    worker.start();
    return worker;
}
public static int remainingWaiting(final RemoteSearch[] searchThreads) {
if (searchThreads == null) return 0;
int alive = 0;

@ -26,7 +26,6 @@
package net.yacy.search.query;
import java.io.IOException;
import java.util.Comparator;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
@ -59,7 +58,6 @@ import net.yacy.document.Condenser;
import net.yacy.document.LibraryProvider;
import net.yacy.kelondro.data.meta.DigestURI;
import net.yacy.kelondro.data.meta.URIMetadata;
import net.yacy.kelondro.data.meta.URIMetadataNode;
import net.yacy.kelondro.data.meta.URIMetadataRow;
import net.yacy.kelondro.data.word.Word;
import net.yacy.kelondro.data.word.WordReference;
@ -76,10 +74,6 @@ import net.yacy.search.index.Segment;
import net.yacy.search.ranking.ReferenceOrder;
import net.yacy.search.snippet.ResultEntry;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException;
import com.hp.hpl.jena.rdf.model.RDFNode;
import com.hp.hpl.jena.rdf.model.Resource;
@ -200,6 +194,7 @@ public final class RWIProcess extends Thread
@Override
public void run() {
/*
// start a concurrent solr search
if (this.query.query_include_words != null) {
Thread solrSearch = new Thread() {
@ -226,6 +221,7 @@ public final class RWIProcess extends Thread
};
solrSearch.start();
}
*/
// do a search
oneFeederStarted();

@ -144,11 +144,15 @@ public final class SearchEvent {
.getFlagAcceptRemoteIndex()));
final long start = System.currentTimeMillis();
// prepare a local RWI search
// initialize a ranking process that is the target for data
// that is generated concurrently from local and global search threads
this.rankingProcess = new RWIProcess(this.query, this.order, remote);
// start a local search concurrently
// start a local solr search
RemoteSearch.solrRemoteSearch(this, 100, this.query.query_include_hashes, 10000, null, Switchboard.urlBlacklist);
// start a local RWI search concurrently
this.rankingProcess.start();
if ( remote ) {

Loading…
Cancel
Save