remote search peer selection schema change:

- all non-dht targets (previously separated into 'robinson' for dht-like
queries and 'node' for solr queries) are now 'extra' peers, which are
queried using solr
- these extra-peers are now selected using a ranking on last-seen,
peer-tag-matches, node-peer flags, peer age, and link count. The ranking
is done using a weight and a random factor.
- the number of extra peers is 50% of the dht peers
- the dht peers now exclude too young peers to prevent bad results
during strong growth of the network
- the number of dht peers (and therefore extra-peers) is reduced when
the memory of the peer is low and/or some documents still appear in the
indexing-queue. This shall prevent a peer from deadlocking when p2p
queries are made in fast sequence on weak hardware.
pull/1/head
Michael Peter Christen 11 years ago
parent 47a82e471c
commit f8ce7040ab

@ -45,22 +45,6 @@ network.unit.dht.partitionExponent = 4
# to not-handshaking in a distributed way. It means to get data without using
# a dht transmission logic.
# robinson burst: percentage of the number of robinson peers that
# shall be accessed for every search. This includes also robinson peers
# that do not have a matching peer tag. If this is set to 100 then all robinson
# peers are always asked
network.unit.dht.burst.robinson = 50
# multi-word burst: percentage of the number of all peers that
# shall be accessed for multi-word searches. Multi-word search is
# a hard problem when the distributed search network is divided by
# term (as done with yacy, partly..).
# A scientific solution for this problem is to apply heuristics.
# This heuristic enables to switch on a full network scan to get also
# non-distributed multi-word positions. For a full scan set this value to 100.
# Attention: this may out-number the maxcount of available httpc network connections.
network.unit.dht.burst.multiword = 30
# switch to enable verification of search results
# must be set to true in untrusted networks and can be
# set to false in completely trusted networks

@ -61,7 +61,6 @@ import net.yacy.peers.Seed;
import net.yacy.repository.Blacklist;
import net.yacy.repository.Blacklist.BlacklistType;
import net.yacy.search.Switchboard;
import net.yacy.search.SwitchboardConstants;
import net.yacy.search.index.Segment;
import net.yacy.search.query.QueryGoal;
import net.yacy.search.query.QueryModifier;
@ -639,7 +638,7 @@ public class IndexControlRWIs_p {
false,
0.0d, 0.0d, 0.0d,
new String[0]);
final SearchEvent theSearch = SearchEventCache.getEvent(query, sb.peers, sb.tables, null, false, sb.loader, Integer.MAX_VALUE, Long.MAX_VALUE, (int) sb.getConfigLong(SwitchboardConstants.DHT_BURST_ROBINSON, 0), (int) sb.getConfigLong(SwitchboardConstants.DHT_BURST_MULTIWORD, 0));
final SearchEvent theSearch = SearchEventCache.getEvent(query, sb.peers, sb.tables, null, false, sb.loader, Integer.MAX_VALUE, Long.MAX_VALUE);
if (theSearch.rwiProcess != null && theSearch.rwiProcess.isAlive()) try {theSearch.rwiProcess.join();} catch (final InterruptedException e) {}
if (theSearch.local_rwi_available.get() == 0) {
prop.put("searchresult", 2);

@ -323,7 +323,7 @@ public final class search {
EventChannel.channels(EventChannel.REMOTESEARCH).addMessage(new RSSMessage("Remote Search Request from " + ((remoteSeed == null) ? "unknown" : remoteSeed.getName()), QueryParams.anonymizedQueryHashes(theQuery.getQueryGoal().getIncludeHashes()), ""));
// make event
theSearch = SearchEventCache.getEvent(theQuery, sb.peers, sb.tables, null, abstracts.length() > 0, sb.loader, count, maxtime, (int) sb.getConfigLong(SwitchboardConstants.DHT_BURST_ROBINSON, 0), (int) sb.getConfigLong(SwitchboardConstants.DHT_BURST_MULTIWORD, 0));
theSearch = SearchEventCache.getEvent(theQuery, sb.peers, sb.tables, null, abstracts.length() > 0, sb.loader, count, maxtime);
if (theSearch.rwiProcess != null && theSearch.rwiProcess.isAlive()) try {theSearch.rwiProcess.join();} catch (final InterruptedException e) {}
// set statistic details of search result and find best result index set

@ -718,9 +718,7 @@ public class yacysearch {
sb.getConfigLong(SwitchboardConstants.REMOTESEARCH_MAXCOUNT_DEFAULT, 10)),
sb.getConfigLong(
SwitchboardConstants.REMOTESEARCH_MAXTIME_USER,
sb.getConfigLong(SwitchboardConstants.REMOTESEARCH_MAXTIME_DEFAULT, 3000)),
(int) sb.getConfigLong(SwitchboardConstants.DHT_BURST_ROBINSON, 0),
(int) sb.getConfigLong(SwitchboardConstants.DHT_BURST_MULTIWORD, 0));
sb.getConfigLong(SwitchboardConstants.REMOTESEARCH_MAXTIME_DEFAULT, 3000)));
if ( startRecord == 0 ) {
if ( modifier.sitehost != null && sb.getConfigBool(SwitchboardConstants.HEURISTIC_SITE, false) && authenticated && !stealthmode) {

@ -61,6 +61,12 @@ public class CachedSolrConnector extends AbstractSolrConnector implements SolrCo
this.missCache = new ConcurrentARC<String, Object>(missCacheMax, partitions);
}
@Override
public int bufferSize() {
// delegate to the wrapped connector: the cache layer itself holds no pending writes
return solr.bufferSize();
}
@Override
public void clearCaches() {
this.hitCache.clear();
this.missCache.clear();

@ -139,6 +139,11 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
ensureAliveUpdateHandler();
}
@Override
public int bufferSize() {
// pending write requests = queued document updates plus queued deletions
return this.updateQueue.size() + this.deleteQueue.size();
}
@Override
public void clearCaches() {
this.connector.clearCaches();

@ -102,7 +102,13 @@ public class EmbeddedSolrConnector extends SolrServerConnector implements SolrCo
this.requestHandler.inform(this.core);
super.init(this.instance.getServer(coreName));
}
@Override
public int bufferSize() {
// this implementation keeps no write buffer of its own, so there are never pending writes
return 0;
}
@Override
public void clearCaches() {
SolrConfig solrConfig = this.core.getSolrConfig();
@SuppressWarnings("unchecked")

@ -53,6 +53,14 @@ public class MirrorSolrConnector extends AbstractSolrConnector implements SolrCo
this.solr0 = solr0;
this.solr1 = solr1;
}
@Override
public int bufferSize() {
// sum the pending-write buffers of both mirrored connectors; either side may be absent
int b = 0;
if (this.solr0 != null) b += this.solr0.bufferSize();
if (this.solr1 != null) b += this.solr1.bufferSize();
return b;
}
@Override
public void clearCaches() {

@ -74,6 +74,11 @@ public class RemoteSolrConnector extends SolrServerConnector implements SolrConn
public synchronized void close() {
super.close();
}
@Override
public int bufferSize() {
// remote connector sends writes through; no local write buffer is maintained here
return 0;
}
@Override
public void clearCaches() {

@ -36,12 +36,17 @@ import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.ModifiableSolrParams;
public interface SolrConnector extends Iterable<String> /* Iterable of document IDs */ {
/**
* clear all caches: inside solr and outside solr within the implementations of this interface
*/
public void clearCaches();
/**
* get the size of a write buffer (if any) of pending write requests
*/
public int bufferSize();
/**
* get the size of the index
* @return number of results if solr is queried with a catch-all pattern

@ -159,11 +159,16 @@ public class InstanceMirror {
this.connectorCache.put(corename, msc);
return msc;
}
public int bufferSize() {
// aggregate pending-write buffer sizes over all cached connectors (both wrapper and embedded caches)
int b = 0;
for (SolrConnector csc: this.connectorCache.values()) b += csc.bufferSize();
for (EmbeddedSolrConnector ssc: this.embeddedCache.values()) b += ssc.bufferSize();
return b;
}
public void clearCaches() {
for (SolrConnector csc: this.connectorCache.values()) {
csc.clearCaches();
}
for (SolrConnector csc: this.connectorCache.values()) csc.clearCaches();
for (EmbeddedSolrConnector ssc: this.embeddedCache.values()) ssc.commit(true);
}

@ -151,7 +151,10 @@ public class Distribution {
* @return a number from 0..verticalPartitions()
*/
public final int verticalDHTPosition(final byte[] urlHash) {
return (int) (Distribution.horizontalDHTPosition(urlHash) >> this.shiftLength); // take only the top-<partitionExponent> bits
int vdp = (int) (Distribution.horizontalDHTPosition(urlHash) >> this.shiftLength); // take only the top-<partitionExponent> bits
assert vdp >= 0;
assert vdp < this.partitionCount;
return vdp;
}
public static void main(String[] args) {

@ -25,7 +25,7 @@
package net.yacy.peers;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@ -40,11 +40,10 @@ import net.yacy.cora.document.encoding.ASCII;
import net.yacy.cora.federate.yacy.Distribution;
import net.yacy.cora.order.Base64Order;
import net.yacy.cora.order.Digest;
import net.yacy.cora.sorting.OrderedScoreMap;
import net.yacy.cora.storage.HandleSet;
import net.yacy.cora.util.ConcurrentLog;
import net.yacy.cora.util.SpaceExceededException;
import net.yacy.cora.util.LookAheadIterator;
import net.yacy.kelondro.data.word.Word;
import net.yacy.kelondro.index.RowHandleSet;
import net.yacy.kelondro.util.kelondroException;
import net.yacy.peers.operation.yacyVersion;
@ -56,10 +55,10 @@ import net.yacy.peers.operation.yacyVersion;
*/
public class DHTSelection {
public static List<Seed> selectClusterPeers(final SeedDB seedDB, final SortedMap<byte[], String> peerhashes) {
public static Set<Seed> selectClusterPeers(final SeedDB seedDB, final SortedMap<byte[], String> peerhashes) {
final Iterator<Map.Entry<byte[], String>> i = peerhashes.entrySet().iterator();
final List<Seed> l = new ArrayList<Seed>();
final Set<Seed> l = new HashSet<Seed>();
Map.Entry<byte[], String> entry;
Seed s;
while (i.hasNext()) {
@ -72,149 +71,96 @@ public class DHTSelection {
}
return l;
}
// Select up to maxCount connected peers as node-search targets, excluding 'omit'.
// Peers with the root-node flag are preferred; remaining slots are filled with
// randomly drawn non-root peers. Returns null if no seed database is available.
public static List<Seed> selectNodeSearchTargets(final SeedDB seedDB, int maxCount, Set<Seed> omit) {
if (seedDB == null) { return null; }
final List<Seed> goodSeeds = new ArrayList<Seed>();     // root-node peers, taken first
final List<Seed> optionalSeeds = new ArrayList<Seed>(); // non-root fallback candidates
Seed seed;
// start enumeration at a random hash so different callers hit different peers
Iterator<Seed> seedenum = seedDB.seedsConnected(true, true, Seed.randomHash(), 1.041f);
int c = seedDB.sizeConnected(); // hard cap on iterations over the connected-seed list
while (seedenum.hasNext() && c-- > 0 && goodSeeds.size() < maxCount) {
seed = seedenum.next();
if (seed == null || omit.contains(seed)) continue;
if (seed.getFlagRootNode()) {
goodSeeds.add(seed);
} else {
optionalSeeds.add(seed);
}
}
// fill up remaining slots with randomly chosen optional peers to spread load
Random r = new Random(System.currentTimeMillis());
while (goodSeeds.size() < maxCount && optionalSeeds.size() > 0) {
goodSeeds.add(optionalSeeds.remove(r.nextInt(optionalSeeds.size())));
}
return goodSeeds;
}
public static List<Seed> selectSearchTargets(
public static Collection<Seed> selectExtraTargets(
final SeedDB seedDB,
final HandleSet wordhashes,
int redundancy,
int burstRobinsonPercent,
int burstMultiwordPercent) {
// find out a specific number of seeds, that would be relevant for the given word hash(es)
// the result is ordered by relevance: [0] is most relevant
// the seedcount is the maximum number of wanted results
if (seedDB == null) { return null; }
// put in seeds according to dht
final Map<String, Seed> regularSeeds = new HashMap<String, Seed>(); // dht position seeds
Seed seed;
Iterator<Seed> dhtEnum;
Iterator<byte[]> iter = wordhashes.iterator();
while (iter.hasNext()) {
selectDHTPositions(seedDB, iter.next(), redundancy, regularSeeds);
}
//int minimumseeds = Math.min(seedDB.scheme.verticalPartitions(), regularSeeds.size()); // that should be the minimum number of seeds that are returned
//int maximumseeds = seedDB.scheme.verticalPartitions() * redundancy; // this is the maximum number of seeds according to dht and heuristics. It can be more using burst mode.
// put in some seeds according to size of peer.
// But not all, that would produce too much load on the largest peers
dhtEnum = seedDB.seedsSortedConnected(false, Seed.ICOUNT);
int c = Math.max(Math.min(5, seedDB.sizeConnected()), wordhashes.size() > 1 ? seedDB.sizeConnected() * burstMultiwordPercent / 100 : 0);
while (dhtEnum.hasNext() && c-- > 0) {
seed = dhtEnum.next();
if (seed == null) continue;
if (seed.isLastSeenTimeout(3600000)) continue;
if (seed.getAge() < 1) { // the 'workshop feature'
ConcurrentLog.info("DHT", "selectPeers/Age: " + seed.hash + ":" + seed.getName() + ", is newbie, age = " + seed.getAge());
regularSeeds.put(seed.hash, seed);
continue;
}
if (Math.random() * 100 + (wordhashes.size() > 1 ? burstMultiwordPercent : 25) >= 50) {
if (ConcurrentLog.isFine("DHT")) ConcurrentLog.fine("DHT", "selectPeers/CountBurst: " + seed.hash + ":" + seed.getName() + ", RWIcount=" + seed.getWordCount());
regularSeeds.put(seed.hash, seed);
continue;
final int minage,
final Set<Seed> omit,
final int maxcount) {
Collection<Seed> extraSeeds = new HashSet<Seed>();
if (seedDB != null) {
final OrderedScoreMap<Seed> seedSelection = new OrderedScoreMap<Seed>(null);
Random r = new Random(); // we must use a random factor for the selection to prevent that all peers do the same and therefore overload the same peers
// create sets that contains only robinson/node/large/young peers
Iterator<Seed> dhtEnum = seedDB.seedsConnected(true, false, null, 0.50f);
Seed seed;
while (dhtEnum.hasNext()) {
seed = dhtEnum.next();
if (seed == null) continue;
if (omit != null && omit.contains(seed)) continue; // sort out peers that are target for DHT
if (seed.isLastSeenTimeout(3600000)) continue; // do not ask peers that had not been seen more than one hour (happens during a startup situation)
if (!seed.getFlagAcceptRemoteIndex() && seed.matchPeerTags(wordhashes)) seedSelection.dec(seed, r.nextInt(10) + 10); // robinson peers with matching peer tags
if (seed.getFlagRootNode()) seedSelection.dec(seed, r.nextInt(15) + 15); // root nodes (fast peers)
if (seed.getAge() < minage) seedSelection.dec(seed, r.nextInt(25) + 25);// the 'workshop feature', fresh peers should be seen
if (seed.getLinkCount() > 1000000) {
int pf = 1 + (int) (20000000 / seed.getLinkCount());
seedSelection.dec(seed, r.nextInt(pf) + pf); // large peers
}
}
}
// create a set that contains only robinson peers because these get a special handling
dhtEnum = seedDB.seedsConnected(true, false, null, 0.50f);
Set<Seed> robinson = new HashSet<Seed>();
while (dhtEnum.hasNext()) {
seed = dhtEnum.next();
if (seed == null) continue;
if (seed.getFlagAcceptRemoteIndex()) continue;
if (seed.isLastSeenTimeout(3600000)) continue;
robinson.add(seed);
}
// add robinson peers according to robinson burst rate
dhtEnum = robinson.iterator();
c = robinson.size() * burstRobinsonPercent / 100;
while (dhtEnum.hasNext() && c-- > 0) {
seed = dhtEnum.next();
if (seed == null) continue;
if (seed.isLastSeenTimeout(3600000)) continue;
if (Math.random() * 100 + burstRobinsonPercent >= 100) {
if (ConcurrentLog.isFine("DHT")) ConcurrentLog.fine("DHT", "selectPeers/RobinsonBurst: " + seed.hash + ":" + seed.getName());
regularSeeds.put(seed.hash, seed);
continue;
// select at most maxcount peers
Iterator<Seed> i = seedSelection.iterator();
int count = 0;
while (i.hasNext() && count++ < maxcount) {
seed = i.next();
if (RemoteSearch.log.isInfo()) {
RemoteSearch.log.info("selectPeers/extra: " + seed.hash + ":" + seed.getName() + ", " + seed.getLinkCount() + " URLs" +
(!seed.getFlagAcceptRemoteIndex() && seed.matchPeerTags(wordhashes) ? " ROBINSON" : "") +
(seed.getFlagRootNode() ? " NODE" : "") +
(seed.getAge() < 1 ? " FRESH" : "")
);
}
extraSeeds.add(seed);
}
}
return extraSeeds;
}
public static Set<Seed> selectDHTSearchTargets(final SeedDB seedDB, final HandleSet wordhashes, final int minage, final int redundancy) {
// put in seeds that are public robinson peers and where the peer tags match with query
// or seeds that are newbies to ensure that private demonstrations always work
dhtEnum = robinson.iterator();
while (dhtEnum.hasNext()) {
seed = dhtEnum.next();
if (seed == null) continue;
if (seed.isLastSeenTimeout(3600000)) continue;
if (seed.matchPeerTags(wordhashes)) {
// peer tags match
String specialized = seed.getPeerTags().toString();
if (specialized.equals("[*]")) {
ConcurrentLog.info("DHT", "selectPeers/RobinsonTag: " + seed.hash + ":" + seed.getName() + " grants search for all");
} else {
ConcurrentLog.info("DHT", "selectPeers/RobinsonTag " + seed.hash + ":" + seed.getName() + " is specialized peer for " + specialized);
}
regularSeeds.put(seed.hash, seed);
// put in seeds according to dht
Set<Seed> seeds = new HashSet<Seed>(); // dht position seeds
if (seedDB != null) {
Iterator<byte[]> iter = wordhashes.iterator();
while (iter.hasNext()) {
seeds.addAll(selectDHTPositions(seedDB, iter.next(), minage, redundancy));
}
//int minimumseeds = Math.min(seedDB.scheme.verticalPartitions(), regularSeeds.size()); // that should be the minimum number of seeds that are returned
//int maximumseeds = seedDB.scheme.verticalPartitions() * redundancy; // this is the maximum number of seeds according to dht and heuristics. It can be more using burst mode.
}
// produce return set
List<Seed> result = new ArrayList<Seed>(regularSeeds.size());
result.addAll(regularSeeds.values());
return result;
return seeds;
}
private static void selectDHTPositions(
final SeedDB seedDB,
byte[] wordhash,
int redundancy,
Map<String, Seed> regularSeeds) {
private static List<Seed> selectDHTPositions(final SeedDB seedDB, final byte[] wordhash, final int minage, final int redundancy) {
// this method is called from the search target computation
ArrayList<Seed> seeds = new ArrayList<Seed>(redundancy);
Seed seed;
for (int verticalPosition = 0; verticalPosition < seedDB.scheme.verticalPartitions(); verticalPosition++) {
long dhtVerticalTarget = seedDB.scheme.verticalDHTPosition(wordhash, verticalPosition);
wordhash = Distribution.positionToHash(dhtVerticalTarget);
Iterator<Seed> dhtEnum = getAcceptRemoteIndexSeeds(seedDB, wordhash, redundancy, false);
final long dhtVerticalTarget = seedDB.scheme.verticalDHTPosition(wordhash, verticalPosition);
final byte[] verticalhash = Distribution.positionToHash(dhtVerticalTarget);
final Iterator<Seed> dhtEnum = getAcceptRemoteIndexSeeds(seedDB, verticalhash, redundancy, false);
int c = Math.min(seedDB.sizeConnected(), redundancy);
int cc = 2; // select a maximum of 3, this is enough redundancy
int cc = 20; // in case that the network grows rapidly, we may jump to several additional peers but that must have a limit
while (dhtEnum.hasNext() && c > 0 && cc-- > 0) {
seed = dhtEnum.next();
if (seed == null || seed.hash == null) continue;
if (!seed.getFlagAcceptRemoteIndex()) continue; // probably a robinson peer
if (ConcurrentLog.isFine("DHT")) ConcurrentLog.fine("DHT", "selectPeers/DHTorder: " + seed.hash + ":" + seed.getName() + "/ score " + c);
regularSeeds.put(seed.hash, seed);
if (seed.getAge() < minage) continue; // prevent bad results because of too strong network growth
if (RemoteSearch.log.isInfo()) RemoteSearch.log.info("selectPeers/DHTorder: " + seed.hash + ":" + seed.getName() + "/ score " + c);
seeds.add(seed);
c--;
}
}
return seeds;
}
public static byte[] selectTransferStart() {
public static byte[] selectRandomTransferStart() {
return ASCII.getBytes(Base64Order.enhancedCoder.encode(Digest.encodeMD5Raw(Long.toString(System.currentTimeMillis()))).substring(2, 2 + Word.commonHashLength));
}
@ -248,105 +194,59 @@ public class DHTSelection {
return new acceptRemoteIndexSeedEnum(seedDB, starthash, Math.min(max, seedDB.sizeConnected()), alsoMyOwn);
}
private static class acceptRemoteIndexSeedEnum implements Iterator<Seed> {
private static class acceptRemoteIndexSeedEnum extends LookAheadIterator<Seed> implements Iterator<Seed>, Iterable<Seed> {
private final Iterator<Seed> se;
private Seed nextSeed;
private final SeedDB seedDB;
private final HandleSet doublecheck;
private int remaining;
private final boolean alsoMyOwn;
private acceptRemoteIndexSeedEnum(SeedDB seedDB, final byte[] starthash, int max, boolean alsoMyOwn) {
this.seedDB = seedDB;
this.se = getDHTSeeds(seedDB, starthash, yacyVersion.YACY_HANDLES_COLLECTION_INDEX, alsoMyOwn);
this.se = new seedDHTEnum(seedDB, starthash, alsoMyOwn);
this.remaining = max;
this.doublecheck = new RowHandleSet(12, Base64Order.enhancedCoder, 0);
this.nextSeed = nextInternal();
this.alsoMyOwn = alsoMyOwn;
}
@Override
public boolean hasNext() {
return this.nextSeed != null;
}
private Seed nextInternal() {
protected Seed next0() {
if (this.remaining <= 0) return null;
Seed s;
Seed s = null;
try {
while (this.se.hasNext()) {
s = this.se.next();
if (s == null) return null;
byte[] hashb = ASCII.getBytes(s.hash);
if (this.doublecheck.has(hashb)) return null;
try {
this.doublecheck.put(hashb);
} catch (final SpaceExceededException e) {
ConcurrentLog.logException(e);
break;
}
if (s.getFlagAcceptRemoteIndex() ||
(this.alsoMyOwn && s.hash.equals(this.seedDB.mySeed().hash)) // Accept own peer regardless of FlagAcceptRemoteIndex
) {
this.remaining--;
return s;
break;
}
}
return s;
} catch (final kelondroException e) {
System.out.println("DEBUG acceptRemoteIndexSeedEnum:" + e.getMessage());
Network.log.severe("database inconsistency (" + e.getMessage() + "), re-set of db.");
this.seedDB.resetActiveTable();
return null;
}
return null;
}
@Override
public Seed next() {
final Seed next = this.nextSeed;
this.nextSeed = nextInternal();
return next;
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
}
/**
* enumerate seeds for DHT target positions
* @param seedDB
* @param firstHash
* @param minVersion
* @return
*/
protected static Iterator<Seed> getDHTSeeds(final SeedDB seedDB, final byte[] firstHash, final double minVersion) {
// enumerates seed-type objects: all seeds with starting point in the middle, rotating at the end/beginning
return new seedDHTEnum(seedDB, firstHash, minVersion, false);
}
protected static Iterator<Seed> getDHTSeeds(final SeedDB seedDB, final byte[] firstHash, final double minVersion, final boolean alsoMyOwn) {
// enumerates seed-type objects: all seeds with starting point in the middle, rotating at the end/beginning
return new seedDHTEnum(seedDB, firstHash, minVersion, alsoMyOwn);
}
private static class seedDHTEnum implements Iterator<Seed> {
private Iterator<Seed> e;
private int steps;
private final double minVersion;
private final SeedDB seedDB;
private boolean alsoMyOwn;
private int pass, insertOwnInPass;
private Seed nextSeed;
private seedDHTEnum(final SeedDB seedDB, final byte[] firstHash, final double minVersion, final boolean alsoMyOwn) {
private seedDHTEnum(final SeedDB seedDB, final byte[] firstHash, final boolean alsoMyOwn) {
this.seedDB = seedDB;
this.steps = seedDB.sizeConnected() + ((alsoMyOwn) ? 1 : 0);
this.minVersion = minVersion;
this.e = seedDB.seedsConnected(true, false, firstHash, minVersion);
this.e = seedDB.seedsConnected(true, false, firstHash, yacyVersion.YACY_HANDLES_COLLECTION_INDEX);
this.pass = 1;
this.alsoMyOwn = alsoMyOwn;
if (alsoMyOwn) {
@ -367,7 +267,8 @@ public class DHTSelection {
this.steps--;
if (!this.e.hasNext() && this.pass == 1) {
this.e = this.seedDB.seedsConnected(true, false, null, this.minVersion);
// rotate from the beginning; this closes the ordering of the DHT at the ends
this.e = this.seedDB.seedsConnected(true, false, null, yacyVersion.YACY_HANDLES_COLLECTION_INDEX);
this.pass = 2;
}
if (this.e.hasNext()) {
@ -408,24 +309,17 @@ public class DHTSelection {
return new providesRemoteCrawlURLsEnum(seedDB);
}
private static class providesRemoteCrawlURLsEnum implements Iterator<Seed> {
private static class providesRemoteCrawlURLsEnum extends LookAheadIterator<Seed> implements Iterator<Seed>, Iterable<Seed> {
private final Iterator<Seed> se;
private Seed nextSeed;
private final SeedDB seedDB;
private providesRemoteCrawlURLsEnum(final SeedDB seedDB) {
this.seedDB = seedDB;
this.se = getDHTSeeds(seedDB, null, yacyVersion.YACY_POVIDES_REMOTECRAWL_LISTS);
this.nextSeed = nextInternal();
this.se = seedDB.seedsConnected(true, false, null, yacyVersion.YACY_POVIDES_REMOTECRAWL_LISTS);
}
@Override
public boolean hasNext() {
return this.nextSeed != null;
}
private Seed nextInternal() {
protected Seed next0() {
Seed s;
try {
while (this.se.hasNext()) {
@ -442,18 +336,6 @@ public class DHTSelection {
return null;
}
@Override
public Seed next() {
final Seed next = this.nextSeed;
this.nextSeed = nextInternal();
return next;
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
}
/**
@ -485,6 +367,4 @@ public class DHTSelection {
}
}
}

@ -36,7 +36,6 @@ import net.yacy.cora.document.encoding.ASCII;
import net.yacy.cora.federate.yacy.Distribution;
import net.yacy.cora.order.Base64Order;
import net.yacy.cora.storage.HandleSet;
import net.yacy.cora.util.ByteArray;
import net.yacy.cora.util.ConcurrentLog;
import net.yacy.cora.util.SpaceExceededException;
import net.yacy.kelondro.data.meta.URIMetadataRow;
@ -83,7 +82,7 @@ public class Dispatcher {
// a cloud is a cache for the objects that wait to be transmitted
// the String-key is the primary target as contained in the Entry
private Map<ByteArray, Transmission.Chunk> transmissionCloud;
private Map<String, Transmission.Chunk> transmissionCloud;
// the segment backend is used to store the remaining indexContainers in case that the object is closed
private final Segment segment;
@ -106,7 +105,7 @@ public class Dispatcher {
final boolean gzipBody,
final int timeout
) {
this.transmissionCloud = new ConcurrentHashMap<ByteArray, Transmission.Chunk>();
this.transmissionCloud = new ConcurrentHashMap<String, Transmission.Chunk>();
this.segment = segment;
this.seeds = seeds;
this.log = new ConcurrentLog("INDEX-TRANSFER-DISPATCHER");
@ -238,21 +237,9 @@ public class Dispatcher {
for (int i = 0; i < partitions.length; i++) partitions[i] = new ArrayList<ReferenceContainer<WordReference>>();
// check all entries and split them to the partitions
final ReferenceContainer<WordReference>[] partitionBuffer = new ReferenceContainer[partitionCount];
WordReference re;
for (final ReferenceContainer<WordReference> container: containers) {
// init the new partitions
for (int j = 0; j < partitionBuffer.length; j++) {
partitionBuffer[j] = new ReferenceContainer<WordReference>(Segment.wordReferenceFactory, container.getTermHash(), container.size() / partitionCount);
}
// split the container
final Iterator<WordReference> i = container.entries();
while (i.hasNext()) {
re = i.next();
if (re == null) continue;
partitionBuffer[this.seeds.scheme.verticalDHTPosition(re.urlhash())].add(re);
}
final ReferenceContainer<WordReference>[] partitionBuffer = splitContainer(container);
// add the containers to the result vector
for (int j = 0; j < partitionBuffer.length; j++) {
@ -261,6 +248,31 @@ public class Dispatcher {
}
return partitions;
}
/**
 * Split one reference container into per-vertical-partition containers.
 * Each word reference is routed to the partition computed from the vertical
 * DHT position of its URL hash; all result containers share the source
 * container's term hash.
 * @param container the source container holding word references for one term
 * @return an array with one container per vertical partition
 * @throws SpaceExceededException if a partition buffer cannot accept a reference
 */
private ReferenceContainer<WordReference>[] splitContainer(final ReferenceContainer<WordReference> container) throws SpaceExceededException {
// init the result vector
final int partitionCount = this.seeds.scheme.verticalPartitions();
// check all entries and split them to the partitions
@SuppressWarnings("unchecked")
final ReferenceContainer<WordReference>[] partitionBuffer = new ReferenceContainer[partitionCount];
// init the new partitions; pre-size each with an even share of the source container
for (int j = 0; j < partitionBuffer.length; j++) {
partitionBuffer[j] = new ReferenceContainer<WordReference>(Segment.wordReferenceFactory, container.getTermHash(), container.size() / partitionCount);
}
// split the container: route each reference by the vertical DHT position of its url hash
final Iterator<WordReference> i = container.entries();
while (i.hasNext()) {
WordReference wordReference = i.next();
if (wordReference == null) continue;
partitionBuffer[this.seeds.scheme.verticalDHTPosition(wordReference.urlhash())].add(wordReference);
}
return partitionBuffer;
}
/**
* PROCESS(3) and PROCESS(4)
@ -275,39 +287,28 @@ public class Dispatcher {
private void enqueueContainersToCloud(final List<ReferenceContainer<WordReference>>[] containers) {
assert (containers.length == this.seeds.scheme.verticalPartitions());
if (this.transmissionCloud == null) return;
ReferenceContainer<WordReference> lastContainer;
byte[] primaryTarget;
ByteArray pTArray;
String primaryTargetString;
Transmission.Chunk entry;
for (int vertical = 0; vertical < containers.length; vertical++) {
// the 'new' primary target is the word hash of the last container in the array
lastContainer = containers[vertical].get(containers[vertical].size() - 1);
List<ReferenceContainer<WordReference>> verticalList = containers[vertical];
ReferenceContainer<WordReference> lastContainer = verticalList.get(verticalList.size() - 1);
primaryTarget = Distribution.positionToHash(this.seeds.scheme.verticalDHTPosition(lastContainer.getTermHash(), vertical));
assert primaryTarget[2] != '@';
pTArray = new ByteArray(primaryTarget);
primaryTargetString = ASCII.String(primaryTarget);
// get or make a entry object
entry = this.transmissionCloud.get(pTArray); // if this is not null, the entry is extended here
entry = this.transmissionCloud.get(primaryTargetString); // if this is not null, the entry is extended here
final List<Seed> targets = DHTSelection.getAcceptRemoteIndexSeedsList(
this.seeds,
primaryTarget,
this.seeds.redundancy() * 3,
true);
this.log.info("enqueueContainers: selected " + targets.size() + " targets for primary target key " + ASCII.String(primaryTarget) + "/" + vertical + " with " + containers[vertical].size() + " index containers.");
if (entry == null) entry = this.transmission.newChunk(primaryTarget, targets);
/*/ lookup targets
int sc = 1;
for (yacySeed seed : targets) {
if(seed == null) continue;
if(seed == seeds.mySeed()) this.log.logInfo("enqueueContainers: myself-target at position " + sc);
this.log.logInfo("enqueueContainers: primaryTarget distance at position " + sc + ": " + FlatWordPartitionScheme.std.dhtDistance(primaryTarget, null, seed));
this.log.logInfo("enqueueContainers: distance to first container at position " + sc + ": " + FlatWordPartitionScheme.std.dhtDistance(FlatWordPartitionScheme.positionToHash(this.seeds.scheme.dhtPosition(containers[vertical].get(0).getTermHash(), vertical)), null, seed));
sc++;
}*/
this.log.info("enqueueContainers: selected " + targets.size() + " targets for primary target key " + primaryTargetString + "/" + vertical + " with " + verticalList.size() + " index containers.");
if (entry == null) entry = this.transmission.newChunk(primaryTargetString, targets);
// fill the entry with the containers
for (final ReferenceContainer<WordReference> c: containers[vertical]) {
for (final ReferenceContainer<WordReference> c: verticalList) {
try {
entry.add(c);
} catch (final SpaceExceededException e) {
@ -317,7 +318,7 @@ public class Dispatcher {
}
// put the entry into the cloud
if (this.transmissionCloud != null && entry.containersSize() > 0) this.transmissionCloud.put(pTArray, entry);
if (this.transmissionCloud != null && entry.containersSize() > 0) this.transmissionCloud.put(primaryTargetString, entry);
}
}
@ -343,7 +344,7 @@ public class Dispatcher {
return false;
}
List<ReferenceContainer<WordReference>>[] splitContainerCache;
List<ReferenceContainer<WordReference>>[] splitContainerCache; // for each vertical partition a set of word references within a reference container
try {
splitContainerCache = splitContainers(selectedContainerCache);
} catch (final SpaceExceededException e) {
@ -375,9 +376,9 @@ public class Dispatcher {
public boolean dequeueContainer() {
if (this.transmissionCloud == null) return false;
if (this.indexingTransmissionProcessor.getQueueSize() > this.indexingTransmissionProcessor.getMaxConcurrency()) return false;
ByteArray maxtarget = null;
String maxtarget = null;
int maxsize = -1;
for (final Map.Entry<ByteArray, Transmission.Chunk> chunk: this.transmissionCloud.entrySet()) {
for (final Map.Entry<String, Transmission.Chunk> chunk: this.transmissionCloud.entrySet()) {
if (chunk.getValue().containersSize() > maxsize) {
maxsize = chunk.getValue().containersSize();
maxtarget = chunk.getKey();
@ -403,17 +404,17 @@ public class Dispatcher {
if (success && chunk.isFinished()) {
// finished with this queue!
this.log.info("STORE: Chunk " + ASCII.String(chunk.primaryTarget()) + " has FINISHED all transmissions!");
this.log.info("STORE: Chunk " + chunk.primaryTarget() + " has FINISHED all transmissions!");
return chunk;
}
if (!success) this.log.info("STORE: Chunk " + ASCII.String(chunk.primaryTarget()) + " has failed to transmit index; marked peer as busy");
if (!success) this.log.info("STORE: Chunk " + chunk.primaryTarget() + " has failed to transmit index; marked peer as busy");
if (chunk.canFinish()) {
if (this.indexingTransmissionProcessor != null) this.indexingTransmissionProcessor.enQueue(chunk);
return chunk;
}
this.log.info("STORE: Chunk " + ASCII.String(chunk.primaryTarget()) + " has not enough targets left. This transmission has failed, putting back index to backend");
this.log.info("STORE: Chunk " + chunk.primaryTarget() + " has not enough targets left. This transmission has failed, putting back index to backend");
chunk.restore();
return null;
}
@ -422,7 +423,7 @@ public class Dispatcher {
// removes all entries from the dispatcher and puts them back to a RAMRI
if (this.indexingTransmissionProcessor != null) this.indexingTransmissionProcessor.shutdown();
if (this.transmissionCloud != null) {
outerLoop: for (final Map.Entry<ByteArray, Transmission.Chunk> e : this.transmissionCloud.entrySet()) {
outerLoop: for (final Map.Entry<String, Transmission.Chunk> e : this.transmissionCloud.entrySet()) {
for (final ReferenceContainer<WordReference> i : e.getValue()) try {
this.segment.storeRWI(i);
} catch (final Exception e1) {

@ -24,9 +24,8 @@
package net.yacy.peers;
import java.util.HashSet;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.SortedMap;
@ -35,6 +34,7 @@ import org.apache.solr.client.solrj.SolrQuery;
import net.yacy.cora.document.encoding.ASCII;
import net.yacy.cora.storage.HandleSet;
import net.yacy.cora.util.ConcurrentLog;
import net.yacy.kelondro.util.MemoryControl;
import net.yacy.repository.Blacklist;
import net.yacy.search.Switchboard;
import net.yacy.search.SwitchboardConstants;
@ -46,7 +46,8 @@ import net.yacy.search.query.SecondarySearchSuperviser;
public class RemoteSearch extends Thread {
private static final ThreadGroup ysThreadGroup = new ThreadGroup("yacySearchThreadGroup");
public static final ConcurrentLog log = new ConcurrentLog("DHT");
final private SearchEvent event;
final private String wordhashes, excludehashes, contentdom;
final private int partitions;
@ -134,53 +135,58 @@ public class RemoteSearch extends Thread {
final int start, final int count,
final long time,
final Blacklist blacklist,
final SortedMap<byte[], String> clusterselection,
final int burstRobinsonPercent,
final int burstMultiwordPercent) {
final SortedMap<byte[], String> clusterselection) {
// check own peer status
//if (wordIndex.seedDB.mySeed() == null || wordIndex.seedDB.mySeed().getPublicAddress() == null) { return null; }
// check the peer memory and lifesign-situation to get a scaling for the number of remote search processes
final boolean shortmem = MemoryControl.shortStatus();
final int indexingQueueSize = event.query.getSegment().fulltext().bufferSize();
int redundancy = event.peers.redundancy();
if (indexingQueueSize > 10) redundancy = Math.max(1, redundancy - 1);
if (indexingQueueSize > 50) redundancy = Math.max(1, redundancy - 1);
int minage = 3;
int robinsoncount = event.peers.scheme.verticalPartitions() * redundancy / 2;
if (indexingQueueSize > 10) robinsoncount = Math.max(1, robinsoncount / 2);
if (indexingQueueSize > 50) robinsoncount = Math.max(1, robinsoncount / 2);
if (shortmem) {redundancy = 1; robinsoncount = 1;}
// prepare seed targets and threads
final List<Seed> dhtPeers =
final Set<Seed> dhtPeers =
(clusterselection == null) ?
DHTSelection.selectSearchTargets(
DHTSelection.selectDHTSearchTargets(
event.peers,
event.query.getQueryGoal().getIncludeHashes(),
event.peers.redundancy(),
burstRobinsonPercent,
burstMultiwordPercent)
minage,
redundancy)
: DHTSelection.selectClusterPeers(event.peers, clusterselection);
if (dhtPeers == null) return;
// find nodes
Set<Seed> omit = new HashSet<Seed>();
for (Seed s: dhtPeers) omit.add(s);
List<Seed> nodePeers = DHTSelection.selectNodeSearchTargets(event.peers, 20, omit);
// remove all robinson peers from the dhtPeers and put them to the nodes
Iterator<Seed> si = dhtPeers.iterator();
while (si.hasNext()) {
Seed s = si.next();
if (!s.getFlagAcceptRemoteIndex()) {
si.remove();
nodePeers.add(s);
// select node targets
final Collection<Seed> robinsonPeers = DHTSelection.selectExtraTargets(event.peers, event.query.getQueryGoal().getIncludeHashes(), minage, dhtPeers, robinsoncount);
if (event.peers != null) {
if (Switchboard.getSwitchboard().getConfigBool(SwitchboardConstants.DEBUG_SEARCH_REMOTE_DHT_TESTLOCAL, false)) {
dhtPeers.clear();
dhtPeers.add(event.peers.mySeed());
}
if (Switchboard.getSwitchboard().getConfigBool(SwitchboardConstants.DEBUG_SEARCH_REMOTE_SOLR_TESTLOCAL, false)) {
robinsonPeers.clear();
robinsonPeers.add(event.peers.mySeed());
}
}
if (Switchboard.getSwitchboard().getConfigBool(SwitchboardConstants.DEBUG_SEARCH_REMOTE_DHT_TESTLOCAL, false)) {
dhtPeers.clear();
dhtPeers.add(event.peers.mySeed());
}
log.info("preparing remote search: shortmem=" + (shortmem ? "true" : "false") + ", indexingQueueSize=" + indexingQueueSize +
", redundancy=" + redundancy + ", minage=" + minage + ", robinsoncount=" + robinsoncount + ", dhtPeers=" + dhtPeers.size() + ", robinsonpeers=" + robinsonPeers.size());
if (Switchboard.getSwitchboard().getConfigBool(SwitchboardConstants.DEBUG_SEARCH_REMOTE_SOLR_TESTLOCAL, false)) {
nodePeers.clear();
nodePeers.add(event.peers.mySeed());
}
// start solr searches
final int targets = dhtPeers.size() + nodePeers.size();
final int targets = dhtPeers.size() + robinsonPeers.size();
if (!Switchboard.getSwitchboard().getConfigBool(SwitchboardConstants.DEBUG_SEARCH_REMOTE_SOLR_OFF, false)) {
final SolrQuery solrQuery = event.query.solrQuery(event.getQuery().contentdom, start == 0, event.excludeintext_image);
for (Seed s: nodePeers) {
for (Seed s: robinsonPeers) {
Thread t = solrRemoteSearch(event, solrQuery, start, count, s, targets, blacklist);
event.nodeSearchThreads.add(t);
}
@ -188,8 +194,8 @@ public class RemoteSearch extends Thread {
// start search to YaCy DHT peers
if (!Switchboard.getSwitchboard().getConfigBool(SwitchboardConstants.DEBUG_SEARCH_REMOTE_DHT_OFF, false)) {
for (int i = 0; i < dhtPeers.size(); i++) {
if (dhtPeers.get(i) == null || dhtPeers.get(i).hash == null) continue;
for (Seed dhtPeer: dhtPeers) {
if (dhtPeer == null || dhtPeer.hash == null) continue;
try {
RemoteSearch rs = new RemoteSearch(
event,
@ -201,7 +207,7 @@ public class RemoteSearch extends Thread {
time,
event.query.maxDistance,
targets,
dhtPeers.get(i),
dhtPeer,
event.secondarySearchSuperviser,
blacklist);
rs.start();

@ -220,7 +220,7 @@ public final class SeedDB implements AlternativeDomainNames {
}
public int redundancy() {
if (this.mySeed.isJunior()) return 1;
if (this.mySeed.isVirgin()) return 1;
return this.netRedundancy;
}

@ -69,7 +69,7 @@ public class Transmission {
this.timeout4Transfer = timeout4Transfer;
}
public Chunk newChunk(final byte[] primaryTarget, final List<Seed> targets) {
public Chunk newChunk(final String primaryTarget, final List<Seed> targets) {
return new Chunk(primaryTarget, targets);
}
@ -84,7 +84,7 @@ public class Transmission {
* - a set of yacy seeds which will shrink as the containers are transmitted to them
* - a counter that gives the number of sucessful and unsuccessful transmissions so far
*/
private final byte[] primaryTarget;
private final String primaryTarget;
private final ReferenceContainerCache<WordReference> containers;
private final HandleSet references;
private final HandleSet badReferences;
@ -98,7 +98,7 @@ public class Transmission {
* @param primaryTarget
* @param targets
*/
public Chunk(final byte[] primaryTarget, final List<Seed> targets) {
public Chunk(final String primaryTarget, final List<Seed> targets) {
super();
this.primaryTarget = primaryTarget;
this.containers = new ReferenceContainerCache<WordReference>(Segment.wordReferenceFactory, Segment.wordOrder, Word.commonHashLength);
@ -204,7 +204,7 @@ public class Transmission {
return this.containers.size();
}
public byte[] primaryTarget() {
public String primaryTarget() {
return this.primaryTarget;
}
@ -245,7 +245,7 @@ public class Transmission {
Transmission.this.log.info("Transfer of chunk to myself-target");
return true;
}
Transmission.this.log.info("starting new index transmission request to " + ASCII.String(this.primaryTarget));
Transmission.this.log.info("starting new index transmission request to " + this.primaryTarget);
final long start = System.currentTimeMillis();
final String error = Protocol.transferIndex(target, this.containers, this.references, Transmission.this.segment, Transmission.this.gzipBody4Transfer, Transmission.this.timeout4Transfer);
if (error == null) {
@ -254,7 +254,7 @@ public class Transmission {
final Iterator<ReferenceContainer<WordReference>> i = this.containers.iterator();
final ReferenceContainer<WordReference> firstContainer = (i == null) ? null : i.next();
Transmission.this.log.info("Index transfer of " + this.containers.size() +
" words [" + ((firstContainer == null) ? null : ASCII.String(firstContainer.getTermHash())) + " .. " + ASCII.String(this.primaryTarget) + "]" +
" words [" + ((firstContainer == null) ? null : ASCII.String(firstContainer.getTermHash())) + " .. " + this.primaryTarget + "]" +
" and " + this.references.size() + " URLs" +
" to peer " + target.getName() + ":" + target.hash +
" in " + (transferTime / 1000) +

@ -2034,6 +2034,7 @@ public final class Switchboard extends serverSwitch {
public boolean cleanupJob() {
ConcurrentLog.ensureWorkerIsRunning();
try {
// flush caches in used libraries
pdfParser.clean_up_idiotic_PDFParser_font_cache_which_eats_up_tons_of_megabytes(); // eats up megabytes, see http://markmail.org/thread/quk5odee4hbsauhu
@ -3416,7 +3417,7 @@ public final class Switchboard extends serverSwitch {
byte[] startHash = null, limitHash = null;
int tries = 10;
while ( tries-- > 0 ) {
startHash = DHTSelection.selectTransferStart();
startHash = DHTSelection.selectRandomTransferStart();
assert startHash != null;
limitHash = DHTSelection.limitOver(this.peers, startHash);
if ( limitHash != null ) {

@ -303,8 +303,6 @@ public final class SwitchboardConstants {
public static final String DHT_ENABLED = "network.unit.dht";
public static final String DHT_BURST_ROBINSON = "network.unit.dht.burst.robinson";
public static final String DHT_BURST_MULTIWORD = "network.unit.dht.burst.multiword";
public static final String REMOTESEARCH_MAXCOUNT_DEFAULT = "network.unit.remotesearch.maxcount";
public static final String REMOTESEARCH_MAXTIME_DEFAULT = "network.unit.remotesearch.maxtime";

@ -227,7 +227,11 @@ public final class Fulltext {
return this.solrInstances.getMirrorConnector(WebgraphSchema.CORE_NAME);
}
}
public int bufferSize() {
return this.solrInstances.bufferSize();
}
public void clearCaches() {
if (this.urlIndexFile != null && this.urlIndexFile instanceof Cache) ((Cache) this.urlIndexFile).clearCache();
if (this.statsDump != null) this.statsDump.clear();

@ -194,8 +194,6 @@ public final class SearchEvent {
final LoaderDispatcher loader,
final int remote_maxcount,
final long remote_maxtime,
final int burstRobinsonPercent,
final int burstMultiwordPercent,
final boolean deleteIfSnippetFail) {
long ab = MemoryControl.available();
@ -321,9 +319,7 @@ public final class SearchEvent {
0, remote_maxcount,
remote_maxtime,
Switchboard.urlBlacklist,
(SearchEvent.this.query.domType == QueryParams.Searchdom.GLOBAL) ? null : preselectedPeerHashes,
burstRobinsonPercent,
burstMultiwordPercent);
(SearchEvent.this.query.domType == QueryParams.Searchdom.GLOBAL) ? null : preselectedPeerHashes);
}
}.start();
}

@ -139,9 +139,7 @@ public class SearchEventCache {
final boolean generateAbstracts,
final LoaderDispatcher loader,
final int remote_maxcount,
final long remote_maxtime,
final int burstRobinsonPercent,
final int burstMultiwordPercent) {
final long remote_maxtime) {
if (MemoryControl.shortStatus()) cleanupEvents(true);
final String id = query.id(false);
@ -173,7 +171,7 @@ public class SearchEventCache {
// start a new event
Switchboard sb = Switchboard.getSwitchboard();
final boolean delete = sb == null || Switchboard.getSwitchboard().getConfigBool(SwitchboardConstants.SEARCH_VERIFY_DELETE, true);
event = new SearchEvent(query, peers, workTables, preselectedPeerHashes, generateAbstracts, loader, remote_maxcount, remote_maxtime, burstRobinsonPercent, burstMultiwordPercent, delete);
event = new SearchEvent(query, peers, workTables, preselectedPeerHashes, generateAbstracts, loader, remote_maxcount, remote_maxtime, delete);
MemoryControl.request(100 * 1024 * 1024, false); // this may trigger a short memory status which causes a reducing of cache space of other threads
}

Loading…
Cancel
Save