- better 'extra'-peer selection

- logging of health status for 'extra'-peer selection
- concurrency for remote peer IO and interrupting the threads if a
time-out occurs
pull/1/head
Michael Peter Christen 11 years ago
parent e3c4456c8e
commit 0bf3cab8c7

@ -28,6 +28,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
@ -72,18 +73,28 @@ public class DHTSelection {
return l;
}
/**
*
* @param seedDB
* @param wordhashes
* @param minage
* @param omit
* @param maxcount
* @param r we must use a random factor for the selection to prevent that all peers do the same and therefore overload the same peers
* @return
*/
public static Collection<Seed> selectExtraTargets(
final SeedDB seedDB,
final HandleSet wordhashes,
final int minage,
final Set<Seed> omit,
final int maxcount) {
final int maxcount,
final Random r) {
Collection<Seed> extraSeeds = new HashSet<Seed>();
if (seedDB != null) {
final OrderedScoreMap<Seed> seedSelection = new OrderedScoreMap<Seed>(null);
Random r = new Random(); // we must use a random factor for the selection to prevent that all peers do the same and therefore overload the same peers
final OrderedScoreMap<Seed> seedSelection = new OrderedScoreMap<Seed>(null);
// create sets that contains only robinson/node/large/young peers
Iterator<Seed> dhtEnum = seedDB.seedsConnected(true, false, null, 0.50f);
@ -93,12 +104,16 @@ public class DHTSelection {
if (seed == null) continue;
if (omit != null && omit.contains(seed)) continue; // sort out peers that are target for DHT
if (seed.isLastSeenTimeout(3600000)) continue; // do not ask peers that had not been seen more than one hour (happens during a startup situation)
if (!seed.getFlagAcceptRemoteIndex() && seed.matchPeerTags(wordhashes)) seedSelection.dec(seed, r.nextInt(10) + 10); // robinson peers with matching peer tags
if (seed.getFlagRootNode()) seedSelection.dec(seed, r.nextInt(15) + 15); // root nodes (fast peers)
if (seed.getAge() < minage) seedSelection.dec(seed, r.nextInt(25) + 25);// the 'workshop feature', fresh peers should be seen
if (seed.getLinkCount() > 1000000) {
if (!seed.getFlagAcceptRemoteIndex() && seed.matchPeerTags(wordhashes)) seedSelection.dec(seed, r.nextInt(10) + 2); // robinson peers with matching peer tags
if (seed.getFlagRootNode()) seedSelection.dec(seed, r.nextInt(30) + 6); // root nodes (fast peers)
if (seed.getAge() < minage) seedSelection.dec(seed, r.nextInt(15) + 3); // young peers (with fresh info)
if (seed.getAge() < 1) seedSelection.dec(seed, r.nextInt(40) + 8); // the 'workshop feature', fresh peers should be seen
if (seed.getLinkCount() >= 100000 && seed.getLinkCount() < 1000000) { // peers above 100.000 links take part on a selection of medium-size peers
seedSelection.dec(seed, r.nextInt(25) + 5);
}
if (seed.getLinkCount() >= 1000000) { // peers above 1 million links take part on a selection of large peers
int pf = 1 + (int) (20000000 / seed.getLinkCount());
seedSelection.dec(seed, r.nextInt(pf) + pf); // large peers
seedSelection.dec(seed, r.nextInt(pf) + pf / 5); // large peers; choose large one less frequent to reduce load on their peer
}
}
@ -109,9 +124,11 @@ public class DHTSelection {
seed = i.next();
if (RemoteSearch.log.isInfo()) {
RemoteSearch.log.info("selectPeers/extra: " + seed.hash + ":" + seed.getName() + ", " + seed.getLinkCount() + " URLs" +
(!seed.getFlagAcceptRemoteIndex() && seed.matchPeerTags(wordhashes) ? " ROBINSON" : "") +
(seed.getFlagRootNode() ? " NODE" : "") +
(seed.getAge() < 1 ? " FRESH" : "")
(seed.getLinkCount() >= 1000000 ? " LARGE-SIZE" : "") +
(seed.getLinkCount() >= 100000 && seed.getLinkCount() < 1000000 ? " MEDIUM-SIZE" : "") +
(!seed.getFlagAcceptRemoteIndex() && seed.matchPeerTags(wordhashes) ? " ROBINSON" : "") +
(seed.getFlagRootNode() ? " NODE" : "") +
(seed.getAge() < 1 ? " FRESH" : "")
);
}
extraSeeds.add(seed);
@ -121,42 +138,65 @@ public class DHTSelection {
return extraSeeds;
}
public static Set<Seed> selectDHTSearchTargets(final SeedDB seedDB, final HandleSet wordhashes, final int minage, final int redundancy) {
public static Set<Seed> selectDHTSearchTargets(final SeedDB seedDB, final HandleSet wordhashes, final int minage, final int redundancy, final int maxredundancy, final Random random) {
// put in seeds according to dht
Set<Seed> seeds = new HashSet<Seed>(); // dht position seeds
Set<Seed> seeds = new LinkedHashSet<Seed>(); // dht position seeds
if (seedDB != null) {
Iterator<byte[]> iter = wordhashes.iterator();
while (iter.hasNext()) {
seeds.addAll(selectDHTPositions(seedDB, iter.next(), minage, redundancy));
seeds.addAll(collectHorizontalDHTPositions(seedDB, iter.next(), minage, redundancy, maxredundancy, random));
}
//int minimumseeds = Math.min(seedDB.scheme.verticalPartitions(), regularSeeds.size()); // that should be the minimum number of seeds that are returned
//int maximumseeds = seedDB.scheme.verticalPartitions() * redundancy; // this is the maximum number of seeds according to dht and heuristics. It can be more using burst mode.
}
return seeds;
}
private static List<Seed> selectDHTPositions(final SeedDB seedDB, final byte[] wordhash, final int minage, final int redundancy) {
private static ArrayList<Seed> collectHorizontalDHTPositions(final SeedDB seedDB, final byte[] wordhash, final int minage, final int redundancy, final int maxredundancy, Random random) {
// this method is called from the search target computation
ArrayList<Seed> seeds = new ArrayList<Seed>(redundancy);
Seed seed;
ArrayList<Seed> collectedSeeds = new ArrayList<Seed>(redundancy * seedDB.scheme.verticalPartitions());
for (int verticalPosition = 0; verticalPosition < seedDB.scheme.verticalPartitions(); verticalPosition++) {
final long dhtVerticalTarget = seedDB.scheme.verticalDHTPosition(wordhash, verticalPosition);
final byte[] verticalhash = Distribution.positionToHash(dhtVerticalTarget);
final Iterator<Seed> dhtEnum = getAcceptRemoteIndexSeeds(seedDB, verticalhash, redundancy, false);
int c = Math.min(seedDB.sizeConnected(), redundancy);
int cc = 20; // in case that the network grows rapidly, we may jump to several additional peers but that must have a limit
while (dhtEnum.hasNext() && c > 0 && cc-- > 0) {
seed = dhtEnum.next();
if (seed == null || seed.hash == null) continue;
if (!seed.getFlagAcceptRemoteIndex()) continue; // probably a robinson peer
if (seed.getAge() < minage) continue; // prevent bad results because of too strong network growth
if (RemoteSearch.log.isInfo()) RemoteSearch.log.info("selectPeers/DHTorder: " + seed.hash + ":" + seed.getName() + "/ score " + c);
seeds.add(seed);
c--;
ArrayList<Seed> seeds = selectVerticalDHTPositions(seedDB, wordhash, minage, maxredundancy, verticalPosition);
if (seeds.size() <= redundancy) {
collectedSeeds.addAll(seeds);
} else {
// we pick some random peers from the vertical position.
// All of them should be valid, but picking a random subset is a distributed load balancing on the whole YaCy network.
// without picking a random subset, always the same peers would be targeted for the same word resulting in (possible) DoS on the target.
for (int i = 0; i < redundancy; i++) {
collectedSeeds.add(seeds.remove(random.nextInt(seeds.size())));
}
}
}
return collectedSeeds;
}
/**
* collecting vertical positions: chooses for each DHT partition a collection of redundant storage positions
* @param seedDB the database of seeds
* @param wordhash the word we are searching for
* @param minage the minimum age of a seed (to prevent that too young seeds which cannot have results yet are asked)
* @param redundancy the number of redundant peer positions for this partition, minimum is 1
* @param verticalPosition the vertical position, that is the number of the partition, 0 &lt;= verticalPosition &lt; seedDB.scheme.verticalPartitions()
* @return a list of seeds for the redundant positions
*/
private static ArrayList<Seed> selectVerticalDHTPositions(final SeedDB seedDB, final byte[] wordhash, final int minage, final int redundancy, int verticalPosition) {
// this method is called from the search target computation
ArrayList<Seed> seeds = new ArrayList<Seed>(redundancy);
final long dhtVerticalTarget = seedDB.scheme.verticalDHTPosition(wordhash, verticalPosition);
final byte[] verticalhash = Distribution.positionToHash(dhtVerticalTarget);
final Iterator<Seed> dhtEnum = getAcceptRemoteIndexSeeds(seedDB, verticalhash, redundancy, false);
int c = Math.min(seedDB.sizeConnected(), redundancy);
int cc = 20; // in case that the network grows rapidly, we may jump to several additional peers but that must have a limit
while (dhtEnum.hasNext() && c > 0 && cc-- > 0) {
Seed seed = dhtEnum.next();
if (seed == null || seed.hash == null) continue;
if (!seed.getFlagAcceptRemoteIndex()) continue; // probably a robinson peer
if (seed.getAge() < minage) continue; // prevent bad results because of too strong network growth
if (RemoteSearch.log.isInfo()) RemoteSearch.log.info("selectPeers/DHTorder: " + seed.hash + ":" + seed.getName() + "/ score " + c);
seeds.add(seed);
c--;
}
return seeds;
}

@ -1005,7 +1005,7 @@ public final class Protocol {
final SolrQuery solrQuery,
final int offset,
final int count,
Seed target,
final Seed target,
final int partitions,
final Blacklist blacklist) {
@ -1030,47 +1030,65 @@ public final class Protocol {
solrQuery.setHighlight(false);
}
boolean localsearch = target == null || target.equals(event.peers.mySeed());
if (localsearch && Switchboard.getSwitchboard().getConfigBool(SwitchboardConstants.DEBUG_SEARCH_REMOTE_SOLR_TESTLOCAL, false)) {
target = event.peers.mySeed();
localsearch = false;
}
RemoteInstance instance = null;
SolrConnector solrConnector = null;
SolrDocumentList docList = null;
Map<String, ReversibleScoreMap<String>> facets = new HashMap<String, ReversibleScoreMap<String>>(event.query.facetfields.size());
Map<String, String> snippets = new HashMap<String, String>(); // this will be a list of urlhash-snippet entries
final QueryResponse[] rsp = new QueryResponse[]{null};
final SolrDocumentList[] docList = new SolrDocumentList[]{null};
{// encapsulate expensive solr QueryResponse object
QueryResponse rsp = null;
if (localsearch) {
if (localsearch && !Switchboard.getSwitchboard().getConfigBool(SwitchboardConstants.DEBUG_SEARCH_REMOTE_SOLR_TESTLOCAL, false)) {
// search the local index
try {
rsp = event.getQuery().getSegment().fulltext().getDefaultConnector().getResponseByParams(solrQuery);
docList = rsp.getResults();
rsp[0] = event.getQuery().getSegment().fulltext().getDefaultConnector().getResponseByParams(solrQuery);
docList[0] = rsp[0].getResults();
} catch (final Throwable e) {
Network.log.info("SEARCH failed (solr), localpeer (" + e.getMessage() + ")", e);
return -1;
}
} else {
try {
boolean myseed = target == event.peers.mySeed();
String address = myseed ? "localhost:" + target.getPort() : target.getPublicAddress();
final boolean myseed = target == event.peers.mySeed();
final String address = myseed ? "localhost:" + target.getPort() : target.getPublicAddress();
final int solrtimeout = Switchboard.getSwitchboard().getConfigInt(SwitchboardConstants.FEDERATED_SERVICE_SOLR_INDEXING_TIMEOUT, 6000);
instance = new RemoteInstance("http://" + address, null, "solr", solrtimeout); // this is a 'patch configuration' which considers 'solr' as default collection
solrConnector = new RemoteSolrConnector(instance, myseed ? true : target.getVersion() >= 1.63, "solr");
rsp = solrConnector.getResponseByParams(solrQuery);
docList = rsp.getResults();
solrConnector.close();
instance.close();
Thread remoteRequest = new Thread() {
public void run() {
try {
RemoteInstance instance = new RemoteInstance("http://" + address, null, "solr", solrtimeout); // this is a 'patch configuration' which considers 'solr' as default collection
try {
SolrConnector solrConnector = new RemoteSolrConnector(instance, myseed ? true : target.getVersion() >= 1.63, "solr");
try {
rsp[0] = solrConnector.getResponseByParams(solrQuery);
docList[0] = rsp[0].getResults();
} catch (Throwable e) {} finally {
solrConnector.close();
}
} catch (Throwable ee) {} finally {
instance.close();
}
} catch (Throwable eee) {}
}
};
remoteRequest.start();
remoteRequest.join(solrtimeout); // just wait until timeout appears
if (remoteRequest.isAlive()) {
try {remoteRequest.interrupt();} catch (Throwable e) {}
Network.log.info("SEARCH failed (solr), remote Peer: " + target.getName() + "/" + target.getPublicAddress() + " does not answer (time-out)");
return -1; // give up, leave remoteRequest abandoned.
}
// no need to close this here because that sends a commit to remote solr which is not wanted here
} catch (final Throwable e) {
Network.log.info("SEARCH failed (solr), remote Peer: " +target.getName() + "/" + target.getPublicAddress() + " (" + e.getMessage() + ")");
Network.log.info("SEARCH failed (solr), remote Peer: " + target.getName() + "/" + target.getPublicAddress() + " (" + e.getMessage() + ")");
return -1;
}
}
if (rsp[0] == null || docList[0] == null) {
Network.log.info("SEARCH failed (solr), remote Peer: " + target.getName() + "/" + target.getPublicAddress() + " returned null");
return -1;
}
// evaluate facets
for (String field: event.query.facetfields) {
FacetField facet = rsp.getFacetField(field);
FacetField facet = rsp[0].getFacetField(field);
ReversibleScoreMap<String> result = new ClusteredScoreMap<String>(UTF8.insensitiveUTF8Comparator);
List<Count> values = facet == null ? null : facet.getValues();
if (values == null) continue;
@ -1083,7 +1101,7 @@ public final class Protocol {
}
// evaluate snippets
Map<String, Map<String, List<String>>> rawsnippets = rsp.getHighlighting(); // a map from the urlhash to a map with key=field and value = list of snippets
Map<String, Map<String, List<String>>> rawsnippets = rsp[0].getHighlighting(); // a map from the urlhash to a map with key=field and value = list of snippets
if (rawsnippets != null) {
nextsnippet: for (Map.Entry<String, Map<String, List<String>>> re: rawsnippets.entrySet()) {
Map<String, List<String>> rs = re.getValue();
@ -1099,20 +1117,20 @@ public final class Protocol {
// no snippet found :( --we don't assign a value here by default; that can be done as an evaluation outside this method
}
}
rsp = null;
rsp[0] = null;
}
// evaluate result
List<URIMetadataNode> container = new ArrayList<URIMetadataNode>();
if (docList == null || docList.size() == 0) {
if (docList == null || docList[0].size() == 0) {
Network.log.info("SEARCH (solr), returned 0 out of 0 documents from " + (target == null ? "shard" : ("peer " + target.hash + ":" + target.getName())) + " query = " + solrQuery.toString()) ;
return 0;
}
Network.log.info("SEARCH (solr), returned " + docList.size() + " out of " + docList.getNumFound() + " documents and " + facets.size() + " facets " + facets.keySet().toString() + " from " + (target == null ? "shard" : ("peer " + target.hash + ":" + target.getName())));
Network.log.info("SEARCH (solr), returned " + docList[0].size() + " out of " + docList[0].getNumFound() + " documents and " + facets.size() + " facets " + facets.keySet().toString() + " from " + (target == null ? "shard" : ("peer " + target.hash + ":" + target.getName())));
int term = count;
Collection<SolrInputDocument> docs = new ArrayList<SolrInputDocument>(docList.size());
for (final SolrDocument doc: docList) {
Collection<SolrInputDocument> docs = new ArrayList<SolrInputDocument>(docList[0].size());
for (final SolrDocument doc: docList[0]) {
if ( term-- <= 0 ) {
break; // do not process more that requested (in case that evil peers fill us up with rubbish)
}
@ -1168,10 +1186,10 @@ public final class Protocol {
// add the url entry to the word indexes
container.add(urlEntry);
}
final int dls = docList.size();
final int numFound = (int) docList.getNumFound();
docList.clear();
docList = null;
final int dls = docList[0].size();
final int numFound = (int) docList[0].getNumFound();
docList[0].clear();
docList[0] = null;
if (localsearch) {
event.addNodes(container, facets, snippets, true, "localpeer", numFound);
event.addFinalize();
@ -1187,8 +1205,6 @@ public final class Protocol {
event.addExpectedRemoteReferences(-count);
Network.log.info("remote search (solr): peer " + target.getName() + " sent " + (container.size() == 0 ? 0 : container.size()) + "/" + numFound + " references");
}
if (solrConnector != null) solrConnector.close();
if (instance != null) instance.close();
return dls;
}

@ -24,8 +24,11 @@
package net.yacy.peers;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;
import java.util.SortedMap;
@ -144,31 +147,48 @@ public class RemoteSearch extends Thread {
final boolean shortmem = MemoryControl.shortStatus();
final int indexingQueueSize = event.query.getSegment().fulltext().bufferSize();
int redundancy = event.peers.redundancy();
if (indexingQueueSize > 10) redundancy = Math.max(1, redundancy - 1);
if (indexingQueueSize > 50) redundancy = Math.max(1, redundancy - 1);
if (Memory.load() > 2.0) redundancy = Math.max(1, redundancy - 1);
if (Memory.cores() < 4) redundancy = Math.max(1, redundancy - 1);
if (Memory.cores() == 1) redundancy = 1;
StringBuilder healthMessage = new StringBuilder(50);
if (indexingQueueSize > 0) {redundancy = Math.max(1, redundancy - 1); healthMessage.append(", indexingQueueSize > 0");}
if (indexingQueueSize > 10) {redundancy = Math.max(1, redundancy - 1); healthMessage.append(", indexingQueueSize > 10");}
if (indexingQueueSize > 50) {redundancy = Math.max(1, redundancy - 1); healthMessage.append(", indexingQueueSize > 50");}
if (Memory.load() > 2.0) {redundancy = Math.max(1, redundancy - 1); healthMessage.append(", load() > 2.0");}
if (Memory.cores() < 4) {redundancy = Math.max(1, redundancy - 1); healthMessage.append(", cores() < 4");}
if (Memory.cores() == 1) {redundancy = 1; healthMessage.append(", cores() == 1");}
int minage = 3;
int robinsoncount = event.peers.scheme.verticalPartitions() * redundancy / 2;
if (indexingQueueSize > 0) robinsoncount = Math.max(1, robinsoncount / 2);
if (indexingQueueSize > 10) robinsoncount = Math.max(1, robinsoncount / 2);
if (indexingQueueSize > 50) robinsoncount = Math.max(1, robinsoncount / 2);
if (shortmem) {redundancy = 1; robinsoncount = 1;}
if (shortmem) {redundancy = 1; robinsoncount = 1; healthMessage.append(", shortmem");}
// prepare seed targets and threads
final Set<Seed> dhtPeers =
(clusterselection == null) ?
DHTSelection.selectDHTSearchTargets(
Random random = new Random(System.currentTimeMillis());
Set<Seed> dhtPeers = null;
if (clusterselection != null) {
dhtPeers = DHTSelection.selectClusterPeers(event.peers, clusterselection);
} else {
dhtPeers = DHTSelection.selectDHTSearchTargets(
event.peers,
event.query.getQueryGoal().getIncludeHashes(),
minage,
redundancy)
: DHTSelection.selectClusterPeers(event.peers, clusterselection);
if (dhtPeers == null) return;
redundancy, event.peers.redundancy(),
random);
// this set of peers may be too large and consume too many threads if more than one word is searched.
// to prevent overloading, we do a subset collection based on random to prevent the death of the own peer
// and to do a distributed load-balancing on the target peers
long targetSize = 1 + redundancy * event.peers.scheme.verticalPartitions(); // this is the maximum for one word plus one
if (dhtPeers.size() > targetSize) {
ArrayList<Seed> pa = new ArrayList<Seed>(dhtPeers.size());
pa.addAll(dhtPeers);
dhtPeers.clear();
for (int i = 0; i < targetSize; i++) dhtPeers.add(pa.remove(random.nextInt(pa.size())));
}
}
if (dhtPeers == null) dhtPeers = new HashSet<Seed>();
// select node targets
final Collection<Seed> robinsonPeers = DHTSelection.selectExtraTargets(event.peers, event.query.getQueryGoal().getIncludeHashes(), minage, dhtPeers, robinsoncount);
final Collection<Seed> robinsonPeers = DHTSelection.selectExtraTargets(event.peers, event.query.getQueryGoal().getIncludeHashes(), minage, dhtPeers, robinsoncount, random);
if (event.peers != null) {
if (Switchboard.getSwitchboard().getConfigBool(SwitchboardConstants.DEBUG_SEARCH_REMOTE_DHT_TESTLOCAL, false)) {
@ -183,7 +203,7 @@ public class RemoteSearch extends Thread {
}
log.info("preparing remote search: shortmem=" + (shortmem ? "true" : "false") + ", indexingQueueSize=" + indexingQueueSize +
", redundancy=" + redundancy + ", minage=" + minage + ", robinsoncount=" + robinsoncount + ", dhtPeers=" + dhtPeers.size() + ", robinsonpeers=" + robinsonPeers.size());
", redundancy=" + redundancy + ", minage=" + minage + ", dhtPeers=" + dhtPeers.size() + ", robinsonpeers=" + robinsonPeers.size() + ", health: " + (healthMessage.length() > 0 ? healthMessage.substring(2) : "perfect"));
// start solr searches
@ -296,12 +316,13 @@ public class RemoteSearch extends Thread {
int urls = 0;
try {
event.oneFeederStarted();
boolean localsearch = (targetPeer == null || targetPeer.equals(event.peers.mySeed())) && Switchboard.getSwitchboard().getConfigBool(SwitchboardConstants.DEBUG_SEARCH_REMOTE_SOLR_TESTLOCAL, false);
urls = Protocol.solrQuery(
event,
solrQuery,
start,
count,
targetPeer,
localsearch ? event.peers.mySeed() : targetPeer,
partitions,
blacklist);
if (urls >= 0) {

Loading…
Cancel
Save