diff --git a/source/net/yacy/peers/DHTSelection.java b/source/net/yacy/peers/DHTSelection.java index 33482e82a..4d4ab6b4a 100644 --- a/source/net/yacy/peers/DHTSelection.java +++ b/source/net/yacy/peers/DHTSelection.java @@ -28,6 +28,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Random; @@ -72,18 +73,28 @@ public class DHTSelection { return l; } + /** + * + * @param seedDB + * @param wordhashes + * @param minage + * @param omit + * @param maxcount + * @param r we must use a random factor for the selection to prevent that all peers do the same and therefore overload the same peers + * @return + */ public static Collection selectExtraTargets( final SeedDB seedDB, final HandleSet wordhashes, final int minage, final Set omit, - final int maxcount) { + final int maxcount, + final Random r) { Collection extraSeeds = new HashSet(); if (seedDB != null) { - final OrderedScoreMap seedSelection = new OrderedScoreMap(null); - Random r = new Random(); // we must use a random factor for the selection to prevent that all peers do the same and therefore overload the same peers + final OrderedScoreMap seedSelection = new OrderedScoreMap(null); // create sets that contains only robinson/node/large/young peers Iterator dhtEnum = seedDB.seedsConnected(true, false, null, 0.50f); @@ -93,12 +104,16 @@ public class DHTSelection { if (seed == null) continue; if (omit != null && omit.contains(seed)) continue; // sort out peers that are target for DHT if (seed.isLastSeenTimeout(3600000)) continue; // do not ask peers that had not been seen more than one hour (happens during a startup situation) - if (!seed.getFlagAcceptRemoteIndex() && seed.matchPeerTags(wordhashes)) seedSelection.dec(seed, r.nextInt(10) + 10); // robinson peers with matching peer tags - if (seed.getFlagRootNode()) seedSelection.dec(seed, r.nextInt(15) + 15); // 
root nodes (fast peers) - if (seed.getAge() < minage) seedSelection.dec(seed, r.nextInt(25) + 25);// the 'workshop feature', fresh peers should be seen - if (seed.getLinkCount() > 1000000) { + if (!seed.getFlagAcceptRemoteIndex() && seed.matchPeerTags(wordhashes)) seedSelection.dec(seed, r.nextInt(10) + 2); // robinson peers with matching peer tags + if (seed.getFlagRootNode()) seedSelection.dec(seed, r.nextInt(30) + 6); // root nodes (fast peers) + if (seed.getAge() < minage) seedSelection.dec(seed, r.nextInt(15) + 3); // young peers (with fresh info) + if (seed.getAge() < 1) seedSelection.dec(seed, r.nextInt(40) + 8); // the 'workshop feature', fresh peers should be seen + if (seed.getLinkCount() >= 100000 && seed.getLinkCount() < 1000000) { // peers above 100.000 links take part on a selection of medium-size peers + seedSelection.dec(seed, r.nextInt(25) + 5); + } + if (seed.getLinkCount() >= 1000000) { // peers above 1 million links take part on a selection of large peers int pf = 1 + (int) (20000000 / seed.getLinkCount()); - seedSelection.dec(seed, r.nextInt(pf) + pf); // large peers + seedSelection.dec(seed, r.nextInt(pf) + pf / 5); // large peers; choose large one less frequent to reduce load on their peer } } @@ -109,9 +124,11 @@ public class DHTSelection { seed = i.next(); if (RemoteSearch.log.isInfo()) { RemoteSearch.log.info("selectPeers/extra: " + seed.hash + ":" + seed.getName() + ", " + seed.getLinkCount() + " URLs" + - (!seed.getFlagAcceptRemoteIndex() && seed.matchPeerTags(wordhashes) ? " ROBINSON" : "") + - (seed.getFlagRootNode() ? " NODE" : "") + - (seed.getAge() < 1 ? " FRESH" : "") + (seed.getLinkCount() >= 1000000 ? " LARGE-SIZE" : "") + + (seed.getLinkCount() >= 100000 && seed.getLinkCount() < 1000000 ? " MEDIUM-SIZE" : "") + + (!seed.getFlagAcceptRemoteIndex() && seed.matchPeerTags(wordhashes) ? " ROBINSON" : "") + + (seed.getFlagRootNode() ? " NODE" : "") + + (seed.getAge() < 1 ? 
" FRESH" : "") ); } extraSeeds.add(seed); @@ -121,42 +138,65 @@ public class DHTSelection { return extraSeeds; } - public static Set selectDHTSearchTargets(final SeedDB seedDB, final HandleSet wordhashes, final int minage, final int redundancy) { + public static Set selectDHTSearchTargets(final SeedDB seedDB, final HandleSet wordhashes, final int minage, final int redundancy, final int maxredundancy, final Random random) { // put in seeds according to dht - Set seeds = new HashSet(); // dht position seeds + Set seeds = new LinkedHashSet(); // dht position seeds if (seedDB != null) { Iterator iter = wordhashes.iterator(); while (iter.hasNext()) { - seeds.addAll(selectDHTPositions(seedDB, iter.next(), minage, redundancy)); + seeds.addAll(collectHorizontalDHTPositions(seedDB, iter.next(), minage, redundancy, maxredundancy, random)); } - //int minimumseeds = Math.min(seedDB.scheme.verticalPartitions(), regularSeeds.size()); // that should be the minimum number of seeds that are returned - //int maximumseeds = seedDB.scheme.verticalPartitions() * redundancy; // this is the maximum number of seeds according to dht and heuristics. It can be more using burst mode. 
} return seeds; } - - private static List selectDHTPositions(final SeedDB seedDB, final byte[] wordhash, final int minage, final int redundancy) { + + private static ArrayList collectHorizontalDHTPositions(final SeedDB seedDB, final byte[] wordhash, final int minage, final int redundancy, final int maxredundancy, Random random) { // this method is called from the search target computation - ArrayList seeds = new ArrayList(redundancy); - Seed seed; + ArrayList collectedSeeds = new ArrayList(redundancy * seedDB.scheme.verticalPartitions()); for (int verticalPosition = 0; verticalPosition < seedDB.scheme.verticalPartitions(); verticalPosition++) { - final long dhtVerticalTarget = seedDB.scheme.verticalDHTPosition(wordhash, verticalPosition); - final byte[] verticalhash = Distribution.positionToHash(dhtVerticalTarget); - final Iterator dhtEnum = getAcceptRemoteIndexSeeds(seedDB, verticalhash, redundancy, false); - int c = Math.min(seedDB.sizeConnected(), redundancy); - int cc = 20; // in case that the network grows rapidly, we may jump to several additional peers but that must have a limit - while (dhtEnum.hasNext() && c > 0 && cc-- > 0) { - seed = dhtEnum.next(); - if (seed == null || seed.hash == null) continue; - if (!seed.getFlagAcceptRemoteIndex()) continue; // probably a robinson peer - if (seed.getAge() < minage) continue; // prevent bad results because of too strong network growth - if (RemoteSearch.log.isInfo()) RemoteSearch.log.info("selectPeers/DHTorder: " + seed.hash + ":" + seed.getName() + "/ score " + c); - seeds.add(seed); - c--; + ArrayList seeds = selectVerticalDHTPositions(seedDB, wordhash, minage, maxredundancy, verticalPosition); + if (seeds.size() <= redundancy) { + collectedSeeds.addAll(seeds); + } else { + // we pick some random peers from the vertical position. + // All of them should be valid, but picking a random subset is a distributed load balancing on the whole YaCy network. 
+ // without picking a random subset, always the same peers would be targeted for the same word resulting in (possible) DoS on the target. + for (int i = 0; i < redundancy; i++) { + collectedSeeds.add(seeds.remove(random.nextInt(seeds.size()))); + } } } + return collectedSeeds; + } + + /** + * collecting vertical positions: that chooses for each of the DHT partitions a collection of redundant storage positions + * @param seedDB the database of seeds + * @param wordhash the word we are searching for + * @param minage the minimum age of a seed (to prevent that too young seeds which cannot have results yet are asked) + * @param redundancy the number of redundant peer positions for this partition, minimum is 1 + * @param verticalPosition the vertical position, that is the number of the partition 0 <= verticalPosition < seedDB.scheme.verticalPartitions() + * @return a list of seeds for the redundant positions + */ + private static ArrayList selectVerticalDHTPositions(final SeedDB seedDB, final byte[] wordhash, final int minage, final int redundancy, int verticalPosition) { + // this method is called from the search target computation + ArrayList seeds = new ArrayList(redundancy); + final long dhtVerticalTarget = seedDB.scheme.verticalDHTPosition(wordhash, verticalPosition); + final byte[] verticalhash = Distribution.positionToHash(dhtVerticalTarget); + final Iterator dhtEnum = getAcceptRemoteIndexSeeds(seedDB, verticalhash, redundancy, false); + int c = Math.min(seedDB.sizeConnected(), redundancy); + int cc = 20; // in case that the network grows rapidly, we may jump to several additional peers but that must have a limit + while (dhtEnum.hasNext() && c > 0 && cc-- > 0) { + Seed seed = dhtEnum.next(); + if (seed == null || seed.hash == null) continue; + if (!seed.getFlagAcceptRemoteIndex()) continue; // probably a robinson peer + if (seed.getAge() < minage) continue; // prevent bad results because of too strong network growth + if (RemoteSearch.log.isInfo()) 
RemoteSearch.log.info("selectPeers/DHTorder: " + seed.hash + ":" + seed.getName() + "/ score " + c); + seeds.add(seed); + c--; + } return seeds; } diff --git a/source/net/yacy/peers/Protocol.java b/source/net/yacy/peers/Protocol.java index aa9619390..e63d74e07 100644 --- a/source/net/yacy/peers/Protocol.java +++ b/source/net/yacy/peers/Protocol.java @@ -1005,7 +1005,7 @@ public final class Protocol { final SolrQuery solrQuery, final int offset, final int count, - Seed target, + final Seed target, final int partitions, final Blacklist blacklist) { @@ -1030,47 +1030,65 @@ public final class Protocol { solrQuery.setHighlight(false); } boolean localsearch = target == null || target.equals(event.peers.mySeed()); - if (localsearch && Switchboard.getSwitchboard().getConfigBool(SwitchboardConstants.DEBUG_SEARCH_REMOTE_SOLR_TESTLOCAL, false)) { - target = event.peers.mySeed(); - localsearch = false; - } - RemoteInstance instance = null; - SolrConnector solrConnector = null; - SolrDocumentList docList = null; Map> facets = new HashMap>(event.query.facetfields.size()); Map snippets = new HashMap(); // this will be a list of urlhash-snippet entries + final QueryResponse[] rsp = new QueryResponse[]{null}; + final SolrDocumentList[] docList = new SolrDocumentList[]{null}; {// encapsulate expensive solr QueryResponse object - QueryResponse rsp = null; - if (localsearch) { + if (localsearch && !Switchboard.getSwitchboard().getConfigBool(SwitchboardConstants.DEBUG_SEARCH_REMOTE_SOLR_TESTLOCAL, false)) { // search the local index try { - rsp = event.getQuery().getSegment().fulltext().getDefaultConnector().getResponseByParams(solrQuery); - docList = rsp.getResults(); + rsp[0] = event.getQuery().getSegment().fulltext().getDefaultConnector().getResponseByParams(solrQuery); + docList[0] = rsp[0].getResults(); } catch (final Throwable e) { Network.log.info("SEARCH failed (solr), localpeer (" + e.getMessage() + ")", e); return -1; } } else { try { - boolean myseed = target == 
event.peers.mySeed(); - String address = myseed ? "localhost:" + target.getPort() : target.getPublicAddress(); + final boolean myseed = target == event.peers.mySeed(); + final String address = myseed ? "localhost:" + target.getPort() : target.getPublicAddress(); final int solrtimeout = Switchboard.getSwitchboard().getConfigInt(SwitchboardConstants.FEDERATED_SERVICE_SOLR_INDEXING_TIMEOUT, 6000); - instance = new RemoteInstance("http://" + address, null, "solr", solrtimeout); // this is a 'patch configuration' which considers 'solr' as default collection - solrConnector = new RemoteSolrConnector(instance, myseed ? true : target.getVersion() >= 1.63, "solr"); - rsp = solrConnector.getResponseByParams(solrQuery); - docList = rsp.getResults(); - solrConnector.close(); - instance.close(); + Thread remoteRequest = new Thread() { + public void run() { + try { + RemoteInstance instance = new RemoteInstance("http://" + address, null, "solr", solrtimeout); // this is a 'patch configuration' which considers 'solr' as default collection + try { + SolrConnector solrConnector = new RemoteSolrConnector(instance, myseed ? true : target.getVersion() >= 1.63, "solr"); + try { + rsp[0] = solrConnector.getResponseByParams(solrQuery); + docList[0] = rsp[0].getResults(); + } catch (Throwable e) {} finally { + solrConnector.close(); + } + } catch (Throwable ee) {} finally { + instance.close(); + } + } catch (Throwable eee) {} + } + }; + remoteRequest.start(); + remoteRequest.join(solrtimeout); // just wait until timeout appears + if (remoteRequest.isAlive()) { + try {remoteRequest.interrupt();} catch (Throwable e) {} + Network.log.info("SEARCH failed (solr), remote Peer: " + target.getName() + "/" + target.getPublicAddress() + " does not answer (time-out)"); + return -1; // give up, leave remoteRequest abandoned. 
+ } // no need to close this here because that sends a commit to remote solr which is not wanted here } catch (final Throwable e) { - Network.log.info("SEARCH failed (solr), remote Peer: " +target.getName() + "/" + target.getPublicAddress() + " (" + e.getMessage() + ")"); + Network.log.info("SEARCH failed (solr), remote Peer: " + target.getName() + "/" + target.getPublicAddress() + " (" + e.getMessage() + ")"); return -1; } } + + if (rsp[0] == null || docList[0] == null) { + Network.log.info("SEARCH failed (solr), remote Peer: " + target.getName() + "/" + target.getPublicAddress() + " returned null"); + return -1; + } // evaluate facets for (String field: event.query.facetfields) { - FacetField facet = rsp.getFacetField(field); + FacetField facet = rsp[0].getFacetField(field); ReversibleScoreMap result = new ClusteredScoreMap(UTF8.insensitiveUTF8Comparator); List values = facet == null ? null : facet.getValues(); if (values == null) continue; @@ -1083,7 +1101,7 @@ public final class Protocol { } // evaluate snippets - Map>> rawsnippets = rsp.getHighlighting(); // a map from the urlhash to a map with key=field and value = list of snippets + Map>> rawsnippets = rsp[0].getHighlighting(); // a map from the urlhash to a map with key=field and value = list of snippets if (rawsnippets != null) { nextsnippet: for (Map.Entry>> re: rawsnippets.entrySet()) { Map> rs = re.getValue(); @@ -1099,20 +1117,20 @@ public final class Protocol { // no snippet found :( --we don't assign a value here by default; that can be done as an evaluation outside this method } } - rsp = null; + rsp[0] = null; } // evaluate result List container = new ArrayList(); - if (docList == null || docList.size() == 0) { + if (docList == null || docList[0].size() == 0) { Network.log.info("SEARCH (solr), returned 0 out of 0 documents from " + (target == null ? 
"shard" : ("peer " + target.hash + ":" + target.getName())) + " query = " + solrQuery.toString()) ; return 0; } - Network.log.info("SEARCH (solr), returned " + docList.size() + " out of " + docList.getNumFound() + " documents and " + facets.size() + " facets " + facets.keySet().toString() + " from " + (target == null ? "shard" : ("peer " + target.hash + ":" + target.getName()))); + Network.log.info("SEARCH (solr), returned " + docList[0].size() + " out of " + docList[0].getNumFound() + " documents and " + facets.size() + " facets " + facets.keySet().toString() + " from " + (target == null ? "shard" : ("peer " + target.hash + ":" + target.getName()))); int term = count; - Collection docs = new ArrayList(docList.size()); - for (final SolrDocument doc: docList) { + Collection docs = new ArrayList(docList[0].size()); + for (final SolrDocument doc: docList[0]) { if ( term-- <= 0 ) { break; // do not process more that requested (in case that evil peers fill us up with rubbish) } @@ -1168,10 +1186,10 @@ public final class Protocol { // add the url entry to the word indexes container.add(urlEntry); } - final int dls = docList.size(); - final int numFound = (int) docList.getNumFound(); - docList.clear(); - docList = null; + final int dls = docList[0].size(); + final int numFound = (int) docList[0].getNumFound(); + docList[0].clear(); + docList[0] = null; if (localsearch) { event.addNodes(container, facets, snippets, true, "localpeer", numFound); event.addFinalize(); @@ -1187,8 +1205,6 @@ public final class Protocol { event.addExpectedRemoteReferences(-count); Network.log.info("remote search (solr): peer " + target.getName() + " sent " + (container.size() == 0 ? 
0 : container.size()) + "/" + numFound + " references"); } - if (solrConnector != null) solrConnector.close(); - if (instance != null) instance.close(); return dls; } diff --git a/source/net/yacy/peers/RemoteSearch.java b/source/net/yacy/peers/RemoteSearch.java index f316a768f..bcf5d39e3 100644 --- a/source/net/yacy/peers/RemoteSearch.java +++ b/source/net/yacy/peers/RemoteSearch.java @@ -24,8 +24,11 @@ package net.yacy.peers; +import java.util.ArrayList; import java.util.Collection; +import java.util.HashSet; import java.util.Iterator; +import java.util.Random; import java.util.Set; import java.util.SortedMap; @@ -144,31 +147,48 @@ public class RemoteSearch extends Thread { final boolean shortmem = MemoryControl.shortStatus(); final int indexingQueueSize = event.query.getSegment().fulltext().bufferSize(); int redundancy = event.peers.redundancy(); - if (indexingQueueSize > 10) redundancy = Math.max(1, redundancy - 1); - if (indexingQueueSize > 50) redundancy = Math.max(1, redundancy - 1); - if (Memory.load() > 2.0) redundancy = Math.max(1, redundancy - 1); - if (Memory.cores() < 4) redundancy = Math.max(1, redundancy - 1); - if (Memory.cores() == 1) redundancy = 1; + StringBuilder healthMessage = new StringBuilder(50); + if (indexingQueueSize > 0) {redundancy = Math.max(1, redundancy - 1); healthMessage.append(", indexingQueueSize > 0");} + if (indexingQueueSize > 10) {redundancy = Math.max(1, redundancy - 1); healthMessage.append(", indexingQueueSize > 10");} + if (indexingQueueSize > 50) {redundancy = Math.max(1, redundancy - 1); healthMessage.append(", indexingQueueSize > 50");} + if (Memory.load() > 2.0) {redundancy = Math.max(1, redundancy - 1); healthMessage.append(", load() > 2.0");} + if (Memory.cores() < 4) {redundancy = Math.max(1, redundancy - 1); healthMessage.append(", cores() < 4");} + if (Memory.cores() == 1) {redundancy = 1; healthMessage.append(", cores() == 1");} int minage = 3; int robinsoncount = event.peers.scheme.verticalPartitions() * 
redundancy / 2; + if (indexingQueueSize > 0) robinsoncount = Math.max(1, robinsoncount / 2); if (indexingQueueSize > 10) robinsoncount = Math.max(1, robinsoncount / 2); if (indexingQueueSize > 50) robinsoncount = Math.max(1, robinsoncount / 2); - if (shortmem) {redundancy = 1; robinsoncount = 1;} + if (shortmem) {redundancy = 1; robinsoncount = 1; healthMessage.append(", shortmem");} // prepare seed targets and threads - final Set dhtPeers = - (clusterselection == null) ? - DHTSelection.selectDHTSearchTargets( + Random random = new Random(System.currentTimeMillis()); + Set dhtPeers = null; + if (clusterselection != null) { + dhtPeers = DHTSelection.selectClusterPeers(event.peers, clusterselection); + } else { + dhtPeers = DHTSelection.selectDHTSearchTargets( event.peers, event.query.getQueryGoal().getIncludeHashes(), minage, - redundancy) - : DHTSelection.selectClusterPeers(event.peers, clusterselection); - if (dhtPeers == null) return; + redundancy, event.peers.redundancy(), + random); + // this set of peers may be too large and consume too many threads if more than one word is searched. 
+ // to prevent overloading, we do a subset collection based on random to prevent the death of the own peer + // and to do a distributed load-balancing on the target peers + long targetSize = 1 + redundancy * event.peers.scheme.verticalPartitions(); // this is the maximum for one word plus one + if (dhtPeers.size() > targetSize) { + ArrayList pa = new ArrayList(dhtPeers.size()); + pa.addAll(dhtPeers); + dhtPeers.clear(); + for (int i = 0; i < targetSize; i++) dhtPeers.add(pa.remove(random.nextInt(pa.size()))); + } + } + if (dhtPeers == null) dhtPeers = new HashSet(); // select node targets - final Collection robinsonPeers = DHTSelection.selectExtraTargets(event.peers, event.query.getQueryGoal().getIncludeHashes(), minage, dhtPeers, robinsoncount); + final Collection robinsonPeers = DHTSelection.selectExtraTargets(event.peers, event.query.getQueryGoal().getIncludeHashes(), minage, dhtPeers, robinsoncount, random); if (event.peers != null) { if (Switchboard.getSwitchboard().getConfigBool(SwitchboardConstants.DEBUG_SEARCH_REMOTE_DHT_TESTLOCAL, false)) { @@ -183,7 +203,7 @@ public class RemoteSearch extends Thread { } log.info("preparing remote search: shortmem=" + (shortmem ? "true" : "false") + ", indexingQueueSize=" + indexingQueueSize + - ", redundancy=" + redundancy + ", minage=" + minage + ", robinsoncount=" + robinsoncount + ", dhtPeers=" + dhtPeers.size() + ", robinsonpeers=" + robinsonPeers.size()); + ", redundancy=" + redundancy + ", minage=" + minage + ", dhtPeers=" + dhtPeers.size() + ", robinsonpeers=" + robinsonPeers.size() + ", health: " + (healthMessage.length() > 0 ? 
healthMessage.substring(2) : "perfect")); // start solr searches @@ -296,12 +316,13 @@ public class RemoteSearch extends Thread { int urls = 0; try { event.oneFeederStarted(); + boolean localsearch = (targetPeer == null || targetPeer.equals(event.peers.mySeed())) && Switchboard.getSwitchboard().getConfigBool(SwitchboardConstants.DEBUG_SEARCH_REMOTE_SOLR_TESTLOCAL, false); urls = Protocol.solrQuery( event, solrQuery, start, count, - targetPeer, + localsearch ? event.peers.mySeed() : targetPeer, partitions, blacklist); if (urls >= 0) {