From f8ce7040abb969b0c623ef21399f5dae4ceeb762 Mon Sep 17 00:00:00 2001 From: Michael Peter Christen Date: Thu, 16 Jan 2014 17:27:14 +0100 Subject: [PATCH] remote search peer selection schema change: - all non-dht targets (previously separated into 'robinson' for dht-like queries and 'node' for solr queries) are now 'extra' peers, which are queried using solr - these extra peers are now selected using a ranking on last-seen, peer-tag matches, node-peer flags, peer age, and link count. The ranking is done using a weight and a random factor. - the number of extra peers is 50% of the number of dht peers - the dht peers now exclude too-young peers to prevent bad results during strong growth of the network - the number of dht peers (and therefore extra peers) is reduced when the memory of the peer is low and/or documents still appear in the indexing queue. This shall prevent a peer from deadlocking when p2p queries are made in fast sequence on weak hardware. --- defaults/yacy.network.freeworld.unit | 16 - htroot/IndexControlRWIs_p.java | 3 +- htroot/yacy/search.java | 2 +- htroot/yacysearch.java | 4 +- .../solr/connector/CachedSolrConnector.java | 6 + .../ConcurrentUpdateSolrConnector.java | 5 + .../solr/connector/EmbeddedSolrConnector.java | 6 + .../solr/connector/MirrorSolrConnector.java | 8 + .../solr/connector/RemoteSolrConnector.java | 5 + .../solr/connector/SolrConnector.java | 7 +- .../solr/instance/InstanceMirror.java | 11 +- .../yacy/cora/federate/yacy/Distribution.java | 5 +- source/net/yacy/peers/DHTSelection.java | 292 ++++++------------ source/net/yacy/peers/Dispatcher.java | 87 +++--- source/net/yacy/peers/RemoteSearch.java | 80 ++--- source/net/yacy/peers/SeedDB.java | 2 +- source/net/yacy/peers/Transmission.java | 12 +- source/net/yacy/search/Switchboard.java | 3 +- .../net/yacy/search/SwitchboardConstants.java | 2 - source/net/yacy/search/index/Fulltext.java | 6 +- source/net/yacy/search/query/SearchEvent.java | 6 +- .../yacy/search/query/SearchEventCache.java | 6 +- 22 files changed, 241 insertions(+), 333 deletions(-) diff --git a/defaults/yacy.network.freeworld.unit b/defaults/yacy.network.freeworld.unit index f411e2d0d..49cf670ac 100644 --- a/defaults/yacy.network.freeworld.unit +++ b/defaults/yacy.network.freeworld.unit @@ -45,22 +45,6 @@ network.unit.dht.partitionExponent = 4 # to not-handshaking in a distributed way. It means to get data without using # a dht transmission logic. -# robinson burst: percentage of the number of robinson peers that -# shall be accessed for every search. This includes also robinson peers -# that do not have a matching peer tag. If this is set to 100 then all robinson -# peers are always asked -network.unit.dht.burst.robinson = 50 - -# multi-word burst: percentage of the number of all peers that -# shall be accessed for multi-word searches. Multi-word search is -# a hard problem when the distributed search network is divided by -# term (as done with yacy, partly..). -# Scientific solutions for this problem is to apply heuristics. -# This heuristic enables to switch on a full network scan to get also -# non-distributed multi-word positions. For a full scan set this value to 100. -# Attention: this may out-number the maxcount of available httpc network connections.
-network.unit.dht.burst.multiword = 30 - # switch to enable verification of search results # must be set to true in untrusted networks and can be # set to false in completely trusted networks diff --git a/htroot/IndexControlRWIs_p.java b/htroot/IndexControlRWIs_p.java index 7dead7440..33b63489a 100644 --- a/htroot/IndexControlRWIs_p.java +++ b/htroot/IndexControlRWIs_p.java @@ -61,7 +61,6 @@ import net.yacy.peers.Seed; import net.yacy.repository.Blacklist; import net.yacy.repository.Blacklist.BlacklistType; import net.yacy.search.Switchboard; -import net.yacy.search.SwitchboardConstants; import net.yacy.search.index.Segment; import net.yacy.search.query.QueryGoal; import net.yacy.search.query.QueryModifier; @@ -639,7 +638,7 @@ public class IndexControlRWIs_p { false, 0.0d, 0.0d, 0.0d, new String[0]); - final SearchEvent theSearch = SearchEventCache.getEvent(query, sb.peers, sb.tables, null, false, sb.loader, Integer.MAX_VALUE, Long.MAX_VALUE, (int) sb.getConfigLong(SwitchboardConstants.DHT_BURST_ROBINSON, 0), (int) sb.getConfigLong(SwitchboardConstants.DHT_BURST_MULTIWORD, 0)); + final SearchEvent theSearch = SearchEventCache.getEvent(query, sb.peers, sb.tables, null, false, sb.loader, Integer.MAX_VALUE, Long.MAX_VALUE); if (theSearch.rwiProcess != null && theSearch.rwiProcess.isAlive()) try {theSearch.rwiProcess.join();} catch (final InterruptedException e) {} if (theSearch.local_rwi_available.get() == 0) { prop.put("searchresult", 2); diff --git a/htroot/yacy/search.java b/htroot/yacy/search.java index 59ca3f59c..9078d26c5 100644 --- a/htroot/yacy/search.java +++ b/htroot/yacy/search.java @@ -323,7 +323,7 @@ public final class search { EventChannel.channels(EventChannel.REMOTESEARCH).addMessage(new RSSMessage("Remote Search Request from " + ((remoteSeed == null) ? 
"unknown" : remoteSeed.getName()), QueryParams.anonymizedQueryHashes(theQuery.getQueryGoal().getIncludeHashes()), "")); // make event - theSearch = SearchEventCache.getEvent(theQuery, sb.peers, sb.tables, null, abstracts.length() > 0, sb.loader, count, maxtime, (int) sb.getConfigLong(SwitchboardConstants.DHT_BURST_ROBINSON, 0), (int) sb.getConfigLong(SwitchboardConstants.DHT_BURST_MULTIWORD, 0)); + theSearch = SearchEventCache.getEvent(theQuery, sb.peers, sb.tables, null, abstracts.length() > 0, sb.loader, count, maxtime); if (theSearch.rwiProcess != null && theSearch.rwiProcess.isAlive()) try {theSearch.rwiProcess.join();} catch (final InterruptedException e) {} // set statistic details of search result and find best result index set diff --git a/htroot/yacysearch.java b/htroot/yacysearch.java index 2e8992814..3c059907d 100644 --- a/htroot/yacysearch.java +++ b/htroot/yacysearch.java @@ -718,9 +718,7 @@ public class yacysearch { sb.getConfigLong(SwitchboardConstants.REMOTESEARCH_MAXCOUNT_DEFAULT, 10)), sb.getConfigLong( SwitchboardConstants.REMOTESEARCH_MAXTIME_USER, - sb.getConfigLong(SwitchboardConstants.REMOTESEARCH_MAXTIME_DEFAULT, 3000)), - (int) sb.getConfigLong(SwitchboardConstants.DHT_BURST_ROBINSON, 0), - (int) sb.getConfigLong(SwitchboardConstants.DHT_BURST_MULTIWORD, 0)); + sb.getConfigLong(SwitchboardConstants.REMOTESEARCH_MAXTIME_DEFAULT, 3000))); if ( startRecord == 0 ) { if ( modifier.sitehost != null && sb.getConfigBool(SwitchboardConstants.HEURISTIC_SITE, false) && authenticated && !stealthmode) { diff --git a/source/net/yacy/cora/federate/solr/connector/CachedSolrConnector.java b/source/net/yacy/cora/federate/solr/connector/CachedSolrConnector.java index 10fec1993..a22ecf4ed 100644 --- a/source/net/yacy/cora/federate/solr/connector/CachedSolrConnector.java +++ b/source/net/yacy/cora/federate/solr/connector/CachedSolrConnector.java @@ -61,6 +61,12 @@ public class CachedSolrConnector extends AbstractSolrConnector implements SolrCo this.missCache = new ConcurrentARC(missCacheMax, partitions); } + @Override + public int bufferSize() { + return solr.bufferSize(); + } + + @Override public void clearCaches() { this.hitCache.clear(); this.missCache.clear(); diff --git a/source/net/yacy/cora/federate/solr/connector/ConcurrentUpdateSolrConnector.java b/source/net/yacy/cora/federate/solr/connector/ConcurrentUpdateSolrConnector.java index 1775859b4..c63a0663d 100644 --- a/source/net/yacy/cora/federate/solr/connector/ConcurrentUpdateSolrConnector.java +++ b/source/net/yacy/cora/federate/solr/connector/ConcurrentUpdateSolrConnector.java @@ -139,6 +139,11 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector { ensureAliveUpdateHandler(); } + @Override + public int bufferSize() { + return this.updateQueue.size() + this.deleteQueue.size(); + } + @Override public void clearCaches() { this.connector.clearCaches(); diff --git a/source/net/yacy/cora/federate/solr/connector/EmbeddedSolrConnector.java b/source/net/yacy/cora/federate/solr/connector/EmbeddedSolrConnector.java index dd5c88cec..c0eb2432e 100644 --- a/source/net/yacy/cora/federate/solr/connector/EmbeddedSolrConnector.java +++ b/source/net/yacy/cora/federate/solr/connector/EmbeddedSolrConnector.java @@ -102,7 +102,13 @@ public class EmbeddedSolrConnector extends SolrServerConnector implements SolrCo this.requestHandler.inform(this.core); super.init(this.instance.getServer(coreName)); } + + @Override + public int bufferSize() { + return 0; + } + @Override public void clearCaches() { SolrConfig solrConfig = 
this.core.getSolrConfig(); @SuppressWarnings("unchecked") diff --git a/source/net/yacy/cora/federate/solr/connector/MirrorSolrConnector.java b/source/net/yacy/cora/federate/solr/connector/MirrorSolrConnector.java index 3ee1e8519..c9ebd74b4 100644 --- a/source/net/yacy/cora/federate/solr/connector/MirrorSolrConnector.java +++ b/source/net/yacy/cora/federate/solr/connector/MirrorSolrConnector.java @@ -53,6 +53,14 @@ public class MirrorSolrConnector extends AbstractSolrConnector implements SolrCo this.solr0 = solr0; this.solr1 = solr1; } + + @Override + public int bufferSize() { + int b = 0; + if (this.solr0 != null) b += this.solr0.bufferSize(); + if (this.solr1 != null) b += this.solr1.bufferSize(); + return b; + } @Override public void clearCaches() { diff --git a/source/net/yacy/cora/federate/solr/connector/RemoteSolrConnector.java b/source/net/yacy/cora/federate/solr/connector/RemoteSolrConnector.java index f2ad6c6dc..e620cee9b 100644 --- a/source/net/yacy/cora/federate/solr/connector/RemoteSolrConnector.java +++ b/source/net/yacy/cora/federate/solr/connector/RemoteSolrConnector.java @@ -74,6 +74,11 @@ public class RemoteSolrConnector extends SolrServerConnector implements SolrConn public synchronized void close() { super.close(); } + + @Override + public int bufferSize() { + return 0; + } @Override public void clearCaches() { diff --git a/source/net/yacy/cora/federate/solr/connector/SolrConnector.java b/source/net/yacy/cora/federate/solr/connector/SolrConnector.java index 1ccd28e37..1efd9ecca 100644 --- a/source/net/yacy/cora/federate/solr/connector/SolrConnector.java +++ b/source/net/yacy/cora/federate/solr/connector/SolrConnector.java @@ -36,12 +36,17 @@ import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.params.ModifiableSolrParams; public interface SolrConnector extends Iterable /* Iterable of document IDs */ { - + /** * clear all caches: inside solr and ouside solr within the implementations of this interface */ public void clearCaches(); + /** + * get the size of a write buffer (if any) of pending write requests + */ + public int bufferSize(); + /** * get the size of the index * @return number of results if solr is queries with a catch-all pattern diff --git a/source/net/yacy/cora/federate/solr/instance/InstanceMirror.java b/source/net/yacy/cora/federate/solr/instance/InstanceMirror.java index c11375de8..7831dba79 100644 --- a/source/net/yacy/cora/federate/solr/instance/InstanceMirror.java +++ b/source/net/yacy/cora/federate/solr/instance/InstanceMirror.java @@ -159,11 +159,16 @@ public class InstanceMirror { this.connectorCache.put(corename, msc); return msc; } + + public int bufferSize() { + int b = 0; + for (SolrConnector csc: this.connectorCache.values()) b += csc.bufferSize(); + for (EmbeddedSolrConnector ssc: this.embeddedCache.values()) b += ssc.bufferSize(); + return b; + } public void clearCaches() { - for (SolrConnector csc: this.connectorCache.values()) { - csc.clearCaches(); - } + for (SolrConnector csc: this.connectorCache.values()) csc.clearCaches(); for (EmbeddedSolrConnector ssc: this.embeddedCache.values()) ssc.commit(true); } diff --git a/source/net/yacy/cora/federate/yacy/Distribution.java b/source/net/yacy/cora/federate/yacy/Distribution.java index 9382dd133..a35de9aca 100644 --- a/source/net/yacy/cora/federate/yacy/Distribution.java +++ b/source/net/yacy/cora/federate/yacy/Distribution.java @@ -151,7 +151,10 @@ public class Distribution { * @return a number from 0..verticalPartitions() */ public final int verticalDHTPosition(final 
byte[] urlHash) { - return (int) (Distribution.horizontalDHTPosition(urlHash) >> this.shiftLength); // take only the top- bits + int vdp = (int) (Distribution.horizontalDHTPosition(urlHash) >> this.shiftLength); // take only the top- bits + assert vdp >= 0; + assert vdp < this.partitionCount; + return vdp; } public static void main(String[] args) { diff --git a/source/net/yacy/peers/DHTSelection.java b/source/net/yacy/peers/DHTSelection.java index cfc05a1d7..33482e82a 100644 --- a/source/net/yacy/peers/DHTSelection.java +++ b/source/net/yacy/peers/DHTSelection.java @@ -25,7 +25,7 @@ package net.yacy.peers; import java.util.ArrayList; -import java.util.HashMap; +import java.util.Collection; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -40,11 +40,10 @@ import net.yacy.cora.document.encoding.ASCII; import net.yacy.cora.federate.yacy.Distribution; import net.yacy.cora.order.Base64Order; import net.yacy.cora.order.Digest; +import net.yacy.cora.sorting.OrderedScoreMap; import net.yacy.cora.storage.HandleSet; -import net.yacy.cora.util.ConcurrentLog; -import net.yacy.cora.util.SpaceExceededException; +import net.yacy.cora.util.LookAheadIterator; import net.yacy.kelondro.data.word.Word; -import net.yacy.kelondro.index.RowHandleSet; import net.yacy.kelondro.util.kelondroException; import net.yacy.peers.operation.yacyVersion; @@ -56,10 +55,10 @@ import net.yacy.peers.operation.yacyVersion; */ public class DHTSelection { - - public static List selectClusterPeers(final SeedDB seedDB, final SortedMap peerhashes) { + + public static Set selectClusterPeers(final SeedDB seedDB, final SortedMap peerhashes) { final Iterator> i = peerhashes.entrySet().iterator(); - final List l = new ArrayList(); + final Set l = new HashSet(); Map.Entry entry; Seed s; while (i.hasNext()) { @@ -72,149 +71,96 @@ public class DHTSelection { } return l; } - - public static List selectNodeSearchTargets(final SeedDB seedDB, int maxCount, Set omit) { - if (seedDB == null) { return null; } - - final List goodSeeds = new ArrayList(); - final List optionalSeeds = new ArrayList(); - Seed seed; - Iterator seedenum = seedDB.seedsConnected(true, true, Seed.randomHash(), 1.041f); - int c = seedDB.sizeConnected(); - while (seedenum.hasNext() && c-- > 0 && goodSeeds.size() < maxCount) { - seed = seedenum.next(); - if (seed == null || omit.contains(seed)) continue; - if (seed.getFlagRootNode()) { - goodSeeds.add(seed); - } else { - optionalSeeds.add(seed); - } - } - Random r = new Random(System.currentTimeMillis()); - while (goodSeeds.size() < maxCount && optionalSeeds.size() > 0) { - goodSeeds.add(optionalSeeds.remove(r.nextInt(optionalSeeds.size()))); - } - - return goodSeeds; - } - public static List selectSearchTargets( + public static Collection selectExtraTargets( final SeedDB seedDB, final HandleSet wordhashes, - int redundancy, - int burstRobinsonPercent, - int burstMultiwordPercent) { - // find out a specific number of seeds, that would be relevant for the given word hash(es) - // the result is ordered by relevance: [0] is most relevant - // the seedcount is the maximum number of wanted results - if (seedDB == null) { return null; } - - // put in seeds according to dht - final Map regularSeeds = new HashMap(); // dht position seeds - Seed seed; - Iterator dhtEnum; - Iterator iter = wordhashes.iterator(); - while (iter.hasNext()) { - selectDHTPositions(seedDB, iter.next(), redundancy, regularSeeds); - } - //int minimumseeds = Math.min(seedDB.scheme.verticalPartitions(), regularSeeds.size()); // that 
should be the minimum number of seeds that are returned - //int maximumseeds = seedDB.scheme.verticalPartitions() * redundancy; // this is the maximum number of seeds according to dht and heuristics. It can be more using burst mode. - - // put in some seeds according to size of peer. - // But not all, that would produce too much load on the largest peers - dhtEnum = seedDB.seedsSortedConnected(false, Seed.ICOUNT); - int c = Math.max(Math.min(5, seedDB.sizeConnected()), wordhashes.size() > 1 ? seedDB.sizeConnected() * burstMultiwordPercent / 100 : 0); - while (dhtEnum.hasNext() && c-- > 0) { - seed = dhtEnum.next(); - if (seed == null) continue; - if (seed.isLastSeenTimeout(3600000)) continue; - if (seed.getAge() < 1) { // the 'workshop feature' - ConcurrentLog.info("DHT", "selectPeers/Age: " + seed.hash + ":" + seed.getName() + ", is newbie, age = " + seed.getAge()); - regularSeeds.put(seed.hash, seed); - continue; - } - if (Math.random() * 100 + (wordhashes.size() > 1 ? burstMultiwordPercent : 25) >= 50) { - if (ConcurrentLog.isFine("DHT")) ConcurrentLog.fine("DHT", "selectPeers/CountBurst: " + seed.hash + ":" + seed.getName() + ", RWIcount=" + seed.getWordCount()); - regularSeeds.put(seed.hash, seed); - continue; + final int minage, + final Set omit, + final int maxcount) { + + Collection extraSeeds = new HashSet(); + + if (seedDB != null) { + final OrderedScoreMap seedSelection = new OrderedScoreMap(null); + Random r = new Random(); // we must use a random factor in the selection to prevent all peers from making the same choice and thereby overloading the same targets + + // create a set that contains only robinson/node/large/young peers + Iterator dhtEnum = seedDB.seedsConnected(true, false, null, 0.50f); + Seed seed; + while (dhtEnum.hasNext()) { + seed = dhtEnum.next(); + if (seed == null) continue; + if (omit != null && omit.contains(seed)) continue; // sort out peers that are target for DHT + if (seed.isLastSeenTimeout(3600000)) continue; // do not ask peers that have not been seen for more than one hour (happens during a startup situation) + if (!seed.getFlagAcceptRemoteIndex() && seed.matchPeerTags(wordhashes)) seedSelection.dec(seed, r.nextInt(10) + 10); // robinson peers with matching peer tags + if (seed.getFlagRootNode()) seedSelection.dec(seed, r.nextInt(15) + 15); // root nodes (fast peers) + if (seed.getAge() < minage) seedSelection.dec(seed, r.nextInt(25) + 25); // the 'workshop feature': fresh peers should be seen + if (seed.getLinkCount() > 1000000) { + int pf = 1 + (int) (20000000 / seed.getLinkCount()); + seedSelection.dec(seed, r.nextInt(pf) + pf); // large peers + } } - } - - // create a set that contains only robinson peers because these get a special handling - dhtEnum = seedDB.seedsConnected(true, false, null, 0.50f); - Set robinson = new HashSet(); - while (dhtEnum.hasNext()) { - seed = dhtEnum.next(); - if (seed == null) continue; - if (seed.getFlagAcceptRemoteIndex()) continue; - if (seed.isLastSeenTimeout(3600000)) continue; - robinson.add(seed); - } - - // add robinson peers according to robinson burst rate - dhtEnum = robinson.iterator(); - c = robinson.size() * burstRobinsonPercent / 100; - while (dhtEnum.hasNext() && c-- > 0) { - seed = dhtEnum.next(); - if (seed == null) continue; - if (seed.isLastSeenTimeout(3600000)) continue; - if (Math.random() * 100 + burstRobinsonPercent >= 100) { - if (ConcurrentLog.isFine("DHT")) ConcurrentLog.fine("DHT", "selectPeers/RobinsonBurst: " + seed.hash + ":" + seed.getName()); - regularSeeds.put(seed.hash, seed); - continue; + + // select
the maxcount Iterator i = seedSelection.iterator(); int count = 0; while (i.hasNext() && count++ < maxcount) { seed = i.next(); if (RemoteSearch.log.isInfo()) { RemoteSearch.log.info("selectPeers/extra: " + seed.hash + ":" + seed.getName() + ", " + seed.getLinkCount() + " URLs" + + (!seed.getFlagAcceptRemoteIndex() && seed.matchPeerTags(wordhashes) ? " ROBINSON" : "") + + (seed.getFlagRootNode() ? " NODE" : "") + + (seed.getAge() < 1 ? " FRESH" : "") + ); + } + extraSeeds.add(seed); } } + + return extraSeeds; + } + + public static Set selectDHTSearchTargets(final SeedDB seedDB, final HandleSet wordhashes, final int minage, final int redundancy) { - // put in seeds that are public robinson peers and where the peer tags match with query - // or seeds that are newbies to ensure that private demonstrations always work - dhtEnum = robinson.iterator(); - while (dhtEnum.hasNext()) { - seed = dhtEnum.next(); - if (seed == null) continue; - if (seed.isLastSeenTimeout(3600000)) continue; - if (seed.matchPeerTags(wordhashes)) { - // peer tags match - String specialized = seed.getPeerTags().toString(); - if (specialized.equals("[*]")) { - ConcurrentLog.info("DHT", "selectPeers/RobinsonTag: " + seed.hash + ":" + seed.getName() + " grants search for all"); - } else { - ConcurrentLog.info("DHT", "selectPeers/RobinsonTag " + seed.hash + ":" + seed.getName() + " is specialized peer for " + specialized); - } - regularSeeds.put(seed.hash, seed); + // put in seeds according to dht + Set seeds = new HashSet(); // dht position seeds + if (seedDB != null) { + Iterator iter = wordhashes.iterator(); + while (iter.hasNext()) { + seeds.addAll(selectDHTPositions(seedDB, iter.next(), minage, redundancy)); } + //int minimumseeds = Math.min(seedDB.scheme.verticalPartitions(), regularSeeds.size()); // that should be the minimum number of seeds that are returned + //int maximumseeds = seedDB.scheme.verticalPartitions() * redundancy; // this is the maximum number of seeds according to dht and heuristics. It can be more using burst mode.
} - - // produce return set - List result = new ArrayList(regularSeeds.size()); - result.addAll(regularSeeds.values()); - return result; + + return seeds; } - private static void selectDHTPositions( - final SeedDB seedDB, - byte[] wordhash, - int redundancy, - Map regularSeeds) { + private static List selectDHTPositions(final SeedDB seedDB, final byte[] wordhash, final int minage, final int redundancy) { // this method is called from the search target computation + ArrayList seeds = new ArrayList(redundancy); Seed seed; for (int verticalPosition = 0; verticalPosition < seedDB.scheme.verticalPartitions(); verticalPosition++) { - long dhtVerticalTarget = seedDB.scheme.verticalDHTPosition(wordhash, verticalPosition); - wordhash = Distribution.positionToHash(dhtVerticalTarget); - Iterator dhtEnum = getAcceptRemoteIndexSeeds(seedDB, wordhash, redundancy, false); + final long dhtVerticalTarget = seedDB.scheme.verticalDHTPosition(wordhash, verticalPosition); + final byte[] verticalhash = Distribution.positionToHash(dhtVerticalTarget); + final Iterator dhtEnum = getAcceptRemoteIndexSeeds(seedDB, verticalhash, redundancy, false); int c = Math.min(seedDB.sizeConnected(), redundancy); - int cc = 2; // select a maximum of 3, this is enough redundancy + int cc = 20; // in case that the network grows rapidly, we may jump to several additional peers but that must have a limit while (dhtEnum.hasNext() && c > 0 && cc-- > 0) { seed = dhtEnum.next(); if (seed == null || seed.hash == null) continue; if (!seed.getFlagAcceptRemoteIndex()) continue; // probably a robinson peer - if (ConcurrentLog.isFine("DHT")) ConcurrentLog.fine("DHT", "selectPeers/DHTorder: " + seed.hash + ":" + seed.getName() + "/ score " + c); - regularSeeds.put(seed.hash, seed); + if (seed.getAge() < minage) continue; // prevent bad results because of too strong network growth + if (RemoteSearch.log.isInfo()) RemoteSearch.log.info("selectPeers/DHTorder: " + seed.hash + ":" + seed.getName() + "/ score " + c); + seeds.add(seed); c--; } } + return seeds; } - public static byte[] selectTransferStart() { + public static byte[] selectRandomTransferStart() { return ASCII.getBytes(Base64Order.enhancedCoder.encode(Digest.encodeMD5Raw(Long.toString(System.currentTimeMillis()))).substring(2, 2 + Word.commonHashLength)); } @@ -248,105 +194,59 @@ public class DHTSelection { return new acceptRemoteIndexSeedEnum(seedDB, starthash, Math.min(max, seedDB.sizeConnected()), alsoMyOwn); } - private static class acceptRemoteIndexSeedEnum implements Iterator { + private static class acceptRemoteIndexSeedEnum extends LookAheadIterator implements Iterator, Iterable { private final Iterator se; - private Seed nextSeed; private final SeedDB seedDB; - private final HandleSet doublecheck; private int remaining; private final boolean alsoMyOwn; private acceptRemoteIndexSeedEnum(SeedDB seedDB, final byte[] starthash, int max, boolean alsoMyOwn) { this.seedDB = seedDB; - this.se = getDHTSeeds(seedDB, starthash, yacyVersion.YACY_HANDLES_COLLECTION_INDEX, alsoMyOwn); + this.se = new seedDHTEnum(seedDB, starthash, alsoMyOwn); this.remaining = max; - this.doublecheck = new RowHandleSet(12, Base64Order.enhancedCoder, 0); - this.nextSeed = nextInternal(); this.alsoMyOwn = alsoMyOwn; } @Override - public boolean hasNext() { - return this.nextSeed != null; - } - - private Seed nextInternal() { + protected Seed next0() { if (this.remaining <= 0) return null; - Seed s; + Seed s = null; try { while (this.se.hasNext()) { s = this.se.next(); if (s == null) return null; - byte[] hashb 
= ASCII.getBytes(s.hash); - if (this.doublecheck.has(hashb)) return null; - try { - this.doublecheck.put(hashb); - } catch (final SpaceExceededException e) { - ConcurrentLog.logException(e); - break; - } if (s.getFlagAcceptRemoteIndex() || (this.alsoMyOwn && s.hash.equals(this.seedDB.mySeed().hash)) // Accept own peer regardless of FlagAcceptRemoteIndex ) { this.remaining--; - return s; + break; } } + return s; } catch (final kelondroException e) { System.out.println("DEBUG acceptRemoteIndexSeedEnum:" + e.getMessage()); Network.log.severe("database inconsistency (" + e.getMessage() + "), re-set of db."); this.seedDB.resetActiveTable(); return null; } - return null; } - @Override - public Seed next() { - final Seed next = this.nextSeed; - this.nextSeed = nextInternal(); - return next; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - - } - - /** - * enumerate seeds for DHT target positions - * @param seedDB - * @param firstHash - * @param minVersion - * @return - */ - protected static Iterator getDHTSeeds(final SeedDB seedDB, final byte[] firstHash, final double minVersion) { - // enumerates seed-type objects: all seeds with starting point in the middle, rotating at the end/beginning - return new seedDHTEnum(seedDB, firstHash, minVersion, false); - } - - protected static Iterator getDHTSeeds(final SeedDB seedDB, final byte[] firstHash, final double minVersion, final boolean alsoMyOwn) { - // enumerates seed-type objects: all seeds with starting point in the middle, rotating at the end/beginning - return new seedDHTEnum(seedDB, firstHash, minVersion, alsoMyOwn); } + private static class seedDHTEnum implements Iterator { private Iterator e; private int steps; - private final double minVersion; private final SeedDB seedDB; private boolean alsoMyOwn; private int pass, insertOwnInPass; private Seed nextSeed; - private seedDHTEnum(final SeedDB seedDB, final byte[] firstHash, final double minVersion, final boolean alsoMyOwn) { + private seedDHTEnum(final SeedDB seedDB, final byte[] firstHash, final boolean alsoMyOwn) { this.seedDB = seedDB; this.steps = seedDB.sizeConnected() + ((alsoMyOwn) ? 
1 : 0); - this.minVersion = minVersion; - this.e = seedDB.seedsConnected(true, false, firstHash, minVersion); + this.e = seedDB.seedsConnected(true, false, firstHash, yacyVersion.YACY_HANDLES_COLLECTION_INDEX); this.pass = 1; this.alsoMyOwn = alsoMyOwn; if (alsoMyOwn) { @@ -367,7 +267,8 @@ public class DHTSelection { this.steps--; if (!this.e.hasNext() && this.pass == 1) { - this.e = this.seedDB.seedsConnected(true, false, null, this.minVersion); + // rotate from the beginning; this closes the ordering of the DHT at the ends + this.e = this.seedDB.seedsConnected(true, false, null, yacyVersion.YACY_HANDLES_COLLECTION_INDEX); this.pass = 2; } if (this.e.hasNext()) { @@ -408,24 +309,17 @@ public class DHTSelection { return new providesRemoteCrawlURLsEnum(seedDB); } - private static class providesRemoteCrawlURLsEnum implements Iterator { + private static class providesRemoteCrawlURLsEnum extends LookAheadIterator implements Iterator, Iterable { private final Iterator se; - private Seed nextSeed; private final SeedDB seedDB; private providesRemoteCrawlURLsEnum(final SeedDB seedDB) { this.seedDB = seedDB; - this.se = getDHTSeeds(seedDB, null, yacyVersion.YACY_POVIDES_REMOTECRAWL_LISTS); - this.nextSeed = nextInternal(); + this.se = seedDB.seedsConnected(true, false, null, yacyVersion.YACY_POVIDES_REMOTECRAWL_LISTS); } - @Override - public boolean hasNext() { - return this.nextSeed != null; - } - - private Seed nextInternal() { + protected Seed next0() { Seed s; try { while (this.se.hasNext()) { @@ -442,18 +336,6 @@ public class DHTSelection { return null; } - @Override - public Seed next() { - final Seed next = this.nextSeed; - this.nextSeed = nextInternal(); - return next; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - } /** @@ -485,6 +367,4 @@ public class DHTSelection { } } - - } \ No newline at end of file diff --git a/source/net/yacy/peers/Dispatcher.java b/source/net/yacy/peers/Dispatcher.java index 427549e88..f30b58cc5 100644 --- a/source/net/yacy/peers/Dispatcher.java +++ b/source/net/yacy/peers/Dispatcher.java @@ -36,7 +36,6 @@ import net.yacy.cora.document.encoding.ASCII; import net.yacy.cora.federate.yacy.Distribution; import net.yacy.cora.order.Base64Order; import net.yacy.cora.storage.HandleSet; -import net.yacy.cora.util.ByteArray; import net.yacy.cora.util.ConcurrentLog; import net.yacy.cora.util.SpaceExceededException; import net.yacy.kelondro.data.meta.URIMetadataRow; @@ -83,7 +82,7 @@ public class Dispatcher { // a cloud is a cache for the objects that wait to be transmitted // the String-key is the primary target as contained in the Entry - private Map transmissionCloud; + private Map transmissionCloud; // the segment backend is used to store the remaining indexContainers in case that the object is closed private final Segment segment; @@ -106,7 +105,7 @@ public class Dispatcher { final boolean gzipBody, final int timeout ) { - this.transmissionCloud = new ConcurrentHashMap(); + this.transmissionCloud = new ConcurrentHashMap(); this.segment = segment; this.seeds = seeds; this.log = new ConcurrentLog("INDEX-TRANSFER-DISPATCHER"); @@ -238,21 +237,9 @@ public class Dispatcher { for (int i = 0; i < partitions.length; i++) partitions[i] = new ArrayList>(); // check all entries and split them to the partitions - final ReferenceContainer[] partitionBuffer = new ReferenceContainer[partitionCount]; - WordReference re; for (final ReferenceContainer container: containers) { // init the new partitions - for (int j = 0; j < 
partitionBuffer.length; j++) { - partitionBuffer[j] = new ReferenceContainer(Segment.wordReferenceFactory, container.getTermHash(), container.size() / partitionCount); - } - - // split the container - final Iterator i = container.entries(); - while (i.hasNext()) { - re = i.next(); - if (re == null) continue; - partitionBuffer[this.seeds.scheme.verticalDHTPosition(re.urlhash())].add(re); - } + final ReferenceContainer[] partitionBuffer = splitContainer(container); // add the containers to the result vector for (int j = 0; j < partitionBuffer.length; j++) { @@ -261,6 +248,31 @@ public class Dispatcher { } return partitions; } + + private ReferenceContainer[] splitContainer(final ReferenceContainer container) throws SpaceExceededException { + + // init the result vector + final int partitionCount = this.seeds.scheme.verticalPartitions(); + + // check all entries and split them to the partitions + @SuppressWarnings("unchecked") + final ReferenceContainer[] partitionBuffer = new ReferenceContainer[partitionCount]; + + // init the new partitions + for (int j = 0; j < partitionBuffer.length; j++) { + partitionBuffer[j] = new ReferenceContainer(Segment.wordReferenceFactory, container.getTermHash(), container.size() / partitionCount); + } + + // split the container + final Iterator i = container.entries(); + while (i.hasNext()) { + WordReference wordReference = i.next(); + if (wordReference == null) continue; + partitionBuffer[this.seeds.scheme.verticalDHTPosition(wordReference.urlhash())].add(wordReference); + } + + return partitionBuffer; + } /** * PROCESS(3) and PROCESS(4) @@ -275,39 +287,28 @@ public class Dispatcher { private void enqueueContainersToCloud(final List>[] containers) { assert (containers.length == this.seeds.scheme.verticalPartitions()); if (this.transmissionCloud == null) return; - ReferenceContainer lastContainer; byte[] primaryTarget; - ByteArray pTArray; + String primaryTargetString; Transmission.Chunk entry; for (int vertical = 0; vertical < containers.length; vertical++) { - // the 'new' primary target is the word hash of the last container in the array - lastContainer = containers[vertical].get(containers[vertical].size() - 1); + List> verticalList = containers[vertical]; + ReferenceContainer lastContainer = verticalList.get(verticalList.size() - 1); primaryTarget = Distribution.positionToHash(this.seeds.scheme.verticalDHTPosition(lastContainer.getTermHash(), vertical)); assert primaryTarget[2] != '@'; - pTArray = new ByteArray(primaryTarget); + primaryTargetString = ASCII.String(primaryTarget); // get or make a entry object - entry = this.transmissionCloud.get(pTArray); // if this is not null, the entry is extended here + entry = this.transmissionCloud.get(primaryTargetString); // if this is not null, the entry is extended here final List targets = DHTSelection.getAcceptRemoteIndexSeedsList( this.seeds, primaryTarget, this.seeds.redundancy() * 3, true); - this.log.info("enqueueContainers: selected " + targets.size() + " targets for primary target key " + ASCII.String(primaryTarget) + "/" + vertical + " with " + containers[vertical].size() + " index containers."); - if (entry == null) entry = this.transmission.newChunk(primaryTarget, targets); - - /*/ lookup targets - int sc = 1; - for (yacySeed seed : targets) { - if(seed == null) continue; - if(seed == seeds.mySeed()) this.log.logInfo("enqueueContainers: myself-target at position " + sc); - this.log.logInfo("enqueueContainers: primaryTarget distance at position " + sc + ": " + 
FlatWordPartitionScheme.std.dhtDistance(primaryTarget, null, seed)); - this.log.logInfo("enqueueContainers: distance to first container at position " + sc + ": " + FlatWordPartitionScheme.std.dhtDistance(FlatWordPartitionScheme.positionToHash(this.seeds.scheme.dhtPosition(containers[vertical].get(0).getTermHash(), vertical)), null, seed)); - sc++; - }*/ + this.log.info("enqueueContainers: selected " + targets.size() + " targets for primary target key " + primaryTargetString + "/" + vertical + " with " + verticalList.size() + " index containers."); + if (entry == null) entry = this.transmission.newChunk(primaryTargetString, targets); // fill the entry with the containers - for (final ReferenceContainer c: containers[vertical]) { + for (final ReferenceContainer c: verticalList) { try { entry.add(c); } catch (final SpaceExceededException e) { @@ -317,7 +318,7 @@ public class Dispatcher { } // put the entry into the cloud - if (this.transmissionCloud != null && entry.containersSize() > 0) this.transmissionCloud.put(pTArray, entry); + if (this.transmissionCloud != null && entry.containersSize() > 0) this.transmissionCloud.put(primaryTargetString, entry); } } @@ -343,7 +344,7 @@ public class Dispatcher { return false; } - List>[] splitContainerCache; + List>[] splitContainerCache; // for each vertical partition a set of word references within a reference container try { splitContainerCache = splitContainers(selectedContainerCache); } catch (final SpaceExceededException e) { @@ -375,9 +376,9 @@ public class Dispatcher { public boolean dequeueContainer() { if (this.transmissionCloud == null) return false; if (this.indexingTransmissionProcessor.getQueueSize() > this.indexingTransmissionProcessor.getMaxConcurrency()) return false; - ByteArray maxtarget = null; + String maxtarget = null; int maxsize = -1; - for (final Map.Entry chunk: this.transmissionCloud.entrySet()) { + for (final Map.Entry chunk: this.transmissionCloud.entrySet()) { if (chunk.getValue().containersSize() > maxsize) { maxsize = chunk.getValue().containersSize(); maxtarget = chunk.getKey(); @@ -403,17 +404,17 @@ public class Dispatcher { if (success && chunk.isFinished()) { // finished with this queue! - this.log.info("STORE: Chunk " + ASCII.String(chunk.primaryTarget()) + " has FINISHED all transmissions!"); + this.log.info("STORE: Chunk " + chunk.primaryTarget() + " has FINISHED all transmissions!"); return chunk; } - if (!success) this.log.info("STORE: Chunk " + ASCII.String(chunk.primaryTarget()) + " has failed to transmit index; marked peer as busy"); + if (!success) this.log.info("STORE: Chunk " + chunk.primaryTarget() + " has failed to transmit index; marked peer as busy"); if (chunk.canFinish()) { if (this.indexingTransmissionProcessor != null) this.indexingTransmissionProcessor.enQueue(chunk); return chunk; } - this.log.info("STORE: Chunk " + ASCII.String(chunk.primaryTarget()) + " has not enough targets left. This transmission has failed, putting back index to backend"); + this.log.info("STORE: Chunk " + chunk.primaryTarget() + " has not enough targets left. 
This transmission has failed, putting back index to backend"); chunk.restore(); return null; } @@ -422,7 +423,7 @@ public class Dispatcher { // removes all entries from the dispatcher and puts them back to a RAMRI if (this.indexingTransmissionProcessor != null) this.indexingTransmissionProcessor.shutdown(); if (this.transmissionCloud != null) { - outerLoop: for (final Map.Entry e : this.transmissionCloud.entrySet()) { + outerLoop: for (final Map.Entry e : this.transmissionCloud.entrySet()) { for (final ReferenceContainer i : e.getValue()) try { this.segment.storeRWI(i); } catch (final Exception e1) { diff --git a/source/net/yacy/peers/RemoteSearch.java b/source/net/yacy/peers/RemoteSearch.java index a41b43639..08e24cea0 100644 --- a/source/net/yacy/peers/RemoteSearch.java +++ b/source/net/yacy/peers/RemoteSearch.java @@ -24,9 +24,8 @@ package net.yacy.peers; -import java.util.HashSet; +import java.util.Collection; import java.util.Iterator; -import java.util.List; import java.util.Set; import java.util.SortedMap; @@ -35,6 +34,7 @@ import org.apache.solr.client.solrj.SolrQuery; import net.yacy.cora.document.encoding.ASCII; import net.yacy.cora.storage.HandleSet; import net.yacy.cora.util.ConcurrentLog; +import net.yacy.kelondro.util.MemoryControl; import net.yacy.repository.Blacklist; import net.yacy.search.Switchboard; import net.yacy.search.SwitchboardConstants; @@ -46,7 +46,8 @@ import net.yacy.search.query.SecondarySearchSuperviser; public class RemoteSearch extends Thread { private static final ThreadGroup ysThreadGroup = new ThreadGroup("yacySearchThreadGroup"); - + public static final ConcurrentLog log = new ConcurrentLog("DHT"); + final private SearchEvent event; final private String wordhashes, excludehashes, contentdom; final private int partitions; @@ -134,53 +135,58 @@ public class RemoteSearch extends Thread { final int start, final int count, final long time, final Blacklist blacklist, - final SortedMap clusterselection, - final int burstRobinsonPercent, - final int burstMultiwordPercent) { + final SortedMap clusterselection) { // check own peer status //if (wordIndex.seedDB.mySeed() == null || wordIndex.seedDB.mySeed().getPublicAddress() == null) { return null; } + + // check the peer's memory and indexing-queue situation to scale the number of remote search processes + final boolean shortmem = MemoryControl.shortStatus(); + final int indexingQueueSize = event.query.getSegment().fulltext().bufferSize(); + int redundancy = event.peers.redundancy(); + if (indexingQueueSize > 10) redundancy = Math.max(1, redundancy - 1); + if (indexingQueueSize > 50) redundancy = Math.max(1, redundancy - 1); + int minage = 3; + int robinsoncount = event.peers.scheme.verticalPartitions() * redundancy / 2; + if (indexingQueueSize > 10) robinsoncount = Math.max(1, robinsoncount / 2); + if (indexingQueueSize > 50) robinsoncount = Math.max(1, robinsoncount / 2); + if (shortmem) {redundancy = 1; robinsoncount = 1;} + + // prepare seed targets and threads - final List dhtPeers = + final Set dhtPeers = (clusterselection == null) ?
- DHTSelection.selectSearchTargets( + DHTSelection.selectDHTSearchTargets( event.peers, event.query.getQueryGoal().getIncludeHashes(), - event.peers.redundancy(), - burstRobinsonPercent, - burstMultiwordPercent) + minage, + redundancy) : DHTSelection.selectClusterPeers(event.peers, clusterselection); if (dhtPeers == null) return; - - // find nodes - Set omit = new HashSet(); - for (Seed s: dhtPeers) omit.add(s); - List nodePeers = DHTSelection.selectNodeSearchTargets(event.peers, 20, omit); - // remove all robinson peers from the dhtPeers and put them to the nodes - Iterator si = dhtPeers.iterator(); - while (si.hasNext()) { - Seed s = si.next(); - if (!s.getFlagAcceptRemoteIndex()) { - si.remove(); - nodePeers.add(s); + // select node targets + final Collection robinsonPeers = DHTSelection.selectExtraTargets(event.peers, event.query.getQueryGoal().getIncludeHashes(), minage, dhtPeers, robinsoncount); + + if (event.peers != null) { + if (Switchboard.getSwitchboard().getConfigBool(SwitchboardConstants.DEBUG_SEARCH_REMOTE_DHT_TESTLOCAL, false)) { + dhtPeers.clear(); + dhtPeers.add(event.peers.mySeed()); + } + + if (Switchboard.getSwitchboard().getConfigBool(SwitchboardConstants.DEBUG_SEARCH_REMOTE_SOLR_TESTLOCAL, false)) { + robinsonPeers.clear(); + robinsonPeers.add(event.peers.mySeed()); } } - if (Switchboard.getSwitchboard().getConfigBool(SwitchboardConstants.DEBUG_SEARCH_REMOTE_DHT_TESTLOCAL, false)) { - dhtPeers.clear(); - dhtPeers.add(event.peers.mySeed()); - } + log.info("preparing remote search: shortmem=" + (shortmem ? "true" : "false") + ", indexingQueueSize=" + indexingQueueSize + + ", redundancy=" + redundancy + ", minage=" + minage + ", robinsoncount=" + robinsoncount + ", dhtPeers=" + dhtPeers.size() + ", robinsonpeers=" + robinsonPeers.size()); + - if (Switchboard.getSwitchboard().getConfigBool(SwitchboardConstants.DEBUG_SEARCH_REMOTE_SOLR_TESTLOCAL, false)) { - nodePeers.clear(); - nodePeers.add(event.peers.mySeed()); - } - // start solr searches - final int targets = dhtPeers.size() + nodePeers.size(); + final int targets = dhtPeers.size() + robinsonPeers.size(); if (!Switchboard.getSwitchboard().getConfigBool(SwitchboardConstants.DEBUG_SEARCH_REMOTE_SOLR_OFF, false)) { final SolrQuery solrQuery = event.query.solrQuery(event.getQuery().contentdom, start == 0, event.excludeintext_image); - for (Seed s: nodePeers) { + for (Seed s: robinsonPeers) { Thread t = solrRemoteSearch(event, solrQuery, start, count, s, targets, blacklist); event.nodeSearchThreads.add(t); } @@ -188,8 +194,8 @@ public class RemoteSearch extends Thread { // start search to YaCy DHT peers if (!Switchboard.getSwitchboard().getConfigBool(SwitchboardConstants.DEBUG_SEARCH_REMOTE_DHT_OFF, false)) { - for (int i = 0; i < dhtPeers.size(); i++) { - if (dhtPeers.get(i) == null || dhtPeers.get(i).hash == null) continue; + for (Seed dhtPeer: dhtPeers) { + if (dhtPeer == null || dhtPeer.hash == null) continue; try { RemoteSearch rs = new RemoteSearch( event, @@ -201,7 +207,7 @@ public class RemoteSearch extends Thread { time, event.query.maxDistance, targets, - dhtPeers.get(i), + dhtPeer, event.secondarySearchSuperviser, blacklist); rs.start(); diff --git a/source/net/yacy/peers/SeedDB.java b/source/net/yacy/peers/SeedDB.java index d769eece4..47afa59a9 100644 --- a/source/net/yacy/peers/SeedDB.java +++ b/source/net/yacy/peers/SeedDB.java @@ -220,7 +220,7 @@ public final class SeedDB implements AlternativeDomainNames { } public int redundancy() { - if (this.mySeed.isJunior()) return 1; + if (this.mySeed.isVirgin()) 
return 1; return this.netRedundancy; } diff --git a/source/net/yacy/peers/Transmission.java b/source/net/yacy/peers/Transmission.java index 44fd4a807..0b7055871 100644 --- a/source/net/yacy/peers/Transmission.java +++ b/source/net/yacy/peers/Transmission.java @@ -69,7 +69,7 @@ public class Transmission { this.timeout4Transfer = timeout4Transfer; } - public Chunk newChunk(final byte[] primaryTarget, final List targets) { + public Chunk newChunk(final String primaryTarget, final List targets) { return new Chunk(primaryTarget, targets); } @@ -84,7 +84,7 @@ public class Transmission { * - a set of yacy seeds which will shrink as the containers are transmitted to them * - a counter that gives the number of sucessful and unsuccessful transmissions so far */ - private final byte[] primaryTarget; + private final String primaryTarget; private final ReferenceContainerCache containers; private final HandleSet references; private final HandleSet badReferences; @@ -98,7 +98,7 @@ public class Transmission { * @param primaryTarget * @param targets */ - public Chunk(final byte[] primaryTarget, final List targets) { + public Chunk(final String primaryTarget, final List targets) { super(); this.primaryTarget = primaryTarget; this.containers = new ReferenceContainerCache(Segment.wordReferenceFactory, Segment.wordOrder, Word.commonHashLength); @@ -204,7 +204,7 @@ public class Transmission { return this.containers.size(); } - public byte[] primaryTarget() { + public String primaryTarget() { return this.primaryTarget; } @@ -245,7 +245,7 @@ public class Transmission { Transmission.this.log.info("Transfer of chunk to myself-target"); return true; } - Transmission.this.log.info("starting new index transmission request to " + ASCII.String(this.primaryTarget)); + Transmission.this.log.info("starting new index transmission request to " + this.primaryTarget); final long start = System.currentTimeMillis(); final String error = Protocol.transferIndex(target, this.containers, this.references, Transmission.this.segment, Transmission.this.gzipBody4Transfer, Transmission.this.timeout4Transfer); if (error == null) { @@ -254,7 +254,7 @@ public class Transmission { final Iterator> i = this.containers.iterator(); final ReferenceContainer firstContainer = (i == null) ? null : i.next(); Transmission.this.log.info("Index transfer of " + this.containers.size() + - " words [" + ((firstContainer == null) ? null : ASCII.String(firstContainer.getTermHash())) + " .. " + ASCII.String(this.primaryTarget) + "]" + + " words [" + ((firstContainer == null) ? null : ASCII.String(firstContainer.getTermHash())) + " .. 
" + this.primaryTarget + "]" + " and " + this.references.size() + " URLs" + " to peer " + target.getName() + ":" + target.hash + " in " + (transferTime / 1000) + diff --git a/source/net/yacy/search/Switchboard.java b/source/net/yacy/search/Switchboard.java index 8e15ab0a8..b45b3190b 100644 --- a/source/net/yacy/search/Switchboard.java +++ b/source/net/yacy/search/Switchboard.java @@ -2034,6 +2034,7 @@ public final class Switchboard extends serverSwitch { public boolean cleanupJob() { + ConcurrentLog.ensureWorkerIsRunning(); try { // flush caches in used libraries pdfParser.clean_up_idiotic_PDFParser_font_cache_which_eats_up_tons_of_megabytes(); // eats up megabytes, see http://markmail.org/thread/quk5odee4hbsauhu @@ -3416,7 +3417,7 @@ public final class Switchboard extends serverSwitch { byte[] startHash = null, limitHash = null; int tries = 10; while ( tries-- > 0 ) { - startHash = DHTSelection.selectTransferStart(); + startHash = DHTSelection.selectRandomTransferStart(); assert startHash != null; limitHash = DHTSelection.limitOver(this.peers, startHash); if ( limitHash != null ) { diff --git a/source/net/yacy/search/SwitchboardConstants.java b/source/net/yacy/search/SwitchboardConstants.java index bb4fe9ded..a0853842d 100644 --- a/source/net/yacy/search/SwitchboardConstants.java +++ b/source/net/yacy/search/SwitchboardConstants.java @@ -303,8 +303,6 @@ public final class SwitchboardConstants { public static final String DHT_ENABLED = "network.unit.dht"; - public static final String DHT_BURST_ROBINSON = "network.unit.dht.burst.robinson"; - public static final String DHT_BURST_MULTIWORD = "network.unit.dht.burst.multiword"; public static final String REMOTESEARCH_MAXCOUNT_DEFAULT = "network.unit.remotesearch.maxcount"; public static final String REMOTESEARCH_MAXTIME_DEFAULT = "network.unit.remotesearch.maxtime"; diff --git a/source/net/yacy/search/index/Fulltext.java b/source/net/yacy/search/index/Fulltext.java index 0eb84686a..77afc1d87 100644 --- a/source/net/yacy/search/index/Fulltext.java +++ b/source/net/yacy/search/index/Fulltext.java @@ -227,7 +227,11 @@ public final class Fulltext { return this.solrInstances.getMirrorConnector(WebgraphSchema.CORE_NAME); } } - + + public int bufferSize() { + return this.solrInstances.bufferSize(); + } + public void clearCaches() { if (this.urlIndexFile != null && this.urlIndexFile instanceof Cache) ((Cache) this.urlIndexFile).clearCache(); if (this.statsDump != null) this.statsDump.clear(); diff --git a/source/net/yacy/search/query/SearchEvent.java b/source/net/yacy/search/query/SearchEvent.java index f0f538cfd..db5c69e52 100644 --- a/source/net/yacy/search/query/SearchEvent.java +++ b/source/net/yacy/search/query/SearchEvent.java @@ -194,8 +194,6 @@ public final class SearchEvent { final LoaderDispatcher loader, final int remote_maxcount, final long remote_maxtime, - final int burstRobinsonPercent, - final int burstMultiwordPercent, final boolean deleteIfSnippetFail) { long ab = MemoryControl.available(); @@ -321,9 +319,7 @@ public final class SearchEvent { 0, remote_maxcount, remote_maxtime, Switchboard.urlBlacklist, - (SearchEvent.this.query.domType == QueryParams.Searchdom.GLOBAL) ? null : preselectedPeerHashes, - burstRobinsonPercent, - burstMultiwordPercent); + (SearchEvent.this.query.domType == QueryParams.Searchdom.GLOBAL) ? 
null : preselectedPeerHashes); } }.start(); } diff --git a/source/net/yacy/search/query/SearchEventCache.java b/source/net/yacy/search/query/SearchEventCache.java index 83d61cb3a..b55ec8682 100644 --- a/source/net/yacy/search/query/SearchEventCache.java +++ b/source/net/yacy/search/query/SearchEventCache.java @@ -139,9 +139,7 @@ public class SearchEventCache { final boolean generateAbstracts, final LoaderDispatcher loader, final int remote_maxcount, - final long remote_maxtime, - final int burstRobinsonPercent, - final int burstMultiwordPercent) { + final long remote_maxtime) { if (MemoryControl.shortStatus()) cleanupEvents(true); final String id = query.id(false); @@ -173,7 +171,7 @@ public class SearchEventCache { // start a new event Switchboard sb = Switchboard.getSwitchboard(); final boolean delete = sb == null || Switchboard.getSwitchboard().getConfigBool(SwitchboardConstants.SEARCH_VERIFY_DELETE, true); - event = new SearchEvent(query, peers, workTables, preselectedPeerHashes, generateAbstracts, loader, remote_maxcount, remote_maxtime, burstRobinsonPercent, burstMultiwordPercent, delete); + event = new SearchEvent(query, peers, workTables, preselectedPeerHashes, generateAbstracts, loader, remote_maxcount, remote_maxtime, delete); MemoryControl.request(100 * 1024 * 1024, false); // this may trigger a short memory status which causes a reducing of cache space of other threads }
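
A note for readers of this patch: the ranking in DHTSelection.selectExtraTargets() and the load-dependent scaling in RemoteSearch.primaryRemoteSearches() can be read in isolation. The following is a minimal, self-contained Java sketch of that policy, not part of the patch: the PeerStub class and all example values are hypothetical stand-ins for net.yacy.peers.Seed and live peer data, while the weights and thresholds are the ones used in the diff above.

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

public class ExtraPeerSelectionSketch {

    // illustrative stand-in for net.yacy.peers.Seed; only the fields the ranking reads
    static final class PeerStub {
        final String name;
        final boolean robinsonTagMatch; // robinson peer whose peer tags match the query words
        final boolean rootNode;         // 'node' peer, assumed to be fast
        final int age;                  // peer age in days
        final long linkCount;           // number of URLs the peer has indexed
        PeerStub(final String name, final boolean robinsonTagMatch, final boolean rootNode, final int age, final long linkCount) {
            this.name = name; this.robinsonTagMatch = robinsonTagMatch;
            this.rootNode = rootNode; this.age = age; this.linkCount = linkCount;
        }
    }

    // score a peer the way selectExtraTargets() feeds OrderedScoreMap.dec(): each
    // desirable property subtracts a fixed weight plus a random component, so that
    // ascending score order lists the most attractive peers first while the random
    // part spreads repeated queries over different targets
    static int score(final PeerStub p, final int minage, final Random r) {
        int s = 0;
        if (p.robinsonTagMatch) s -= r.nextInt(10) + 10; // robinson peers with matching peer tags
        if (p.rootNode) s -= r.nextInt(15) + 15;         // root nodes (fast peers)
        if (p.age < minage) s -= r.nextInt(25) + 25;     // fresh peers should be seen
        if (p.linkCount > 1000000L) {                    // large peers
            final int pf = 1 + (int) (20000000L / p.linkCount);
            s -= r.nextInt(pf) + pf;
        }
        return s;
    }

    public static void main(final String[] args) {
        // scaling as in RemoteSearch: fewer remote targets when the indexing queue is
        // filled or memory is short; the extra-peer count is 50% of the dht peer count
        final int verticalPartitions = 4, indexingQueueSize = 30; // assumed example values
        final boolean shortmem = false;
        int redundancy = 3; // assumed network redundancy
        if (indexingQueueSize > 10) redundancy = Math.max(1, redundancy - 1);
        if (indexingQueueSize > 50) redundancy = Math.max(1, redundancy - 1);
        int extracount = verticalPartitions * redundancy / 2;
        if (indexingQueueSize > 10) extracount = Math.max(1, extracount / 2);
        if (indexingQueueSize > 50) extracount = Math.max(1, extracount / 2);
        if (shortmem) { redundancy = 1; extracount = 1; }
        System.out.println("redundancy=" + redundancy + ", extracount=" + extracount);

        // rank the candidates; scores are precomputed so the sort order stays consistent
        final Random r = new Random();
        final int minage = 3;
        final List<PeerStub> candidates = new ArrayList<PeerStub>();
        candidates.add(new PeerStub("tagged-robinson", true, false, 200, 50000L));
        candidates.add(new PeerStub("root-node", false, true, 400, 5000000L));
        candidates.add(new PeerStub("fresh-peer", false, false, 1, 1000L));
        candidates.add(new PeerStub("plain-peer", false, false, 300, 10000L));
        final Map<PeerStub, Integer> scores = new HashMap<PeerStub, Integer>();
        for (final PeerStub p : candidates) scores.put(p, score(p, minage, r));
        Collections.sort(candidates, new Comparator<PeerStub>() {
            @Override
            public int compare(final PeerStub a, final PeerStub b) {
                return scores.get(a).compareTo(scores.get(b));
            }
        });
        for (int i = 0; i < extracount && i < candidates.size(); i++) {
            System.out.println("selected extra peer: " + candidates.get(i).name);
        }
    }
}

Because every weight carries a random component, two peers issuing the same query rank the candidates differently, which spreads the extra-peer load across the network instead of concentrating it on the best-known peers.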