robinson cluster: added client-side protocol implementation

- the network configuration page shows a new option: robinson clusters
- when a global search is made, all robinson peers are excluded, but:
- robinson peers/clusters that provide peer tags are included in the global search
  when the search words match those tags. Therefore, robinson peers/clusters
  support the global yacy network with their indexes, without doing DHT exchange


git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@3598 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 18 years ago
parent 794eefe5b1
commit f8de19fb2f

@ -3,7 +3,7 @@ javacSource=1.4
javacTarget=1.4
# Release Configuration
releaseVersion=0.514
releaseVersion=0.515
releaseFile=yacy_dev_v${releaseVersion}_${DSTAMP}_${releaseNr}.tar.gz
#releaseFile=yacy_v${releaseVersion}_${DSTAMP}_${releaseNr}.tar.gz
releaseDir=yacy_dev_v${releaseVersion}_${DSTAMP}_${releaseNr}

@ -71,14 +71,15 @@
List of ip:port - addresses of the cluster: (comma-separated)<br>
<input type="text" name="cluster.peers.ipport" value="#[cluster.peers.ipport]#" size="80" maxlength="800" />
</dd>
-->
<dt>Public Cluster<input type="radio" value="publiccluster" name="cluster.mode" #(publicclusterChecked)#::checked="checked" #(/publicclusterChecked)#/></dt>
<dd>Your peer is part of a public cluster within the YaCy network.<br>
Index data is not distributed, but remote crawl requests are distributed and accepted<br>
Search requests are spread over all peers of the cluster, and answered from all peers of the cluster.<br>
List of .yacy or .yacyh - domains of the cluster: (comma-separated)<br>
<input type="text" name="cluster.peers.yacydomain" value="#[cluster.peers.yacydomain]#" size="80" maxlength="800" />
<input type="text" name="cluster.peers.yacydomain" value="#[cluster.peers.yacydomain]#" size="80" maxlength="800" /><br>
#[cluster.peers.yacydomain.hashes]#
</dd>
-->
<dt>Public Peer<input type="radio" value="publicpeer" name="cluster.mode" #(publicpeerChecked)#::checked="checked" #(/publicpeerChecked)#/></dt>
<dd>You are visible to other peers and contact them to distribute your presence.<br>
Your peer does not accept any outside index data, but responds on all remote search requests.

@ -120,6 +120,9 @@ public class ConfigNetwork_p {
sb.setConfig("cluster.peers.ipport", checkIPPortList(post.get("cluster.peers.ipport", "")));
sb.setConfig("cluster.peers.yacydomain", checkYaCyDomainList(post.get("cluster.peers.yacydomain", "")));
// update the cluster hash set
sb.clusterhashes = yacyCore.seedDB.clusterHashes(sb.getConfig("cluster.peers.yacydomain", ""));
}
// write answer code
@ -150,7 +153,7 @@ public class ConfigNetwork_p {
prop.put("robinson.checked", (indexDistribute || indexReceive) ? 0 : 1);
prop.put("cluster.peers.ipport", sb.getConfig("cluster.peers.ipport", ""));
prop.put("cluster.peers.yacydomain", sb.getConfig("cluster.peers.yacydomain", ""));
prop.put("cluster.peers.yacydomain.hashes", (sb.clusterhashes.size() == 0) ? "" : sb.clusterhashes.toString());
// set p2p mode flags
prop.put("privatepeerChecked", (sb.getConfig("cluster.mode", "").equals("privatepeer")) ? 1 : 0);
prop.put("privateclusterChecked", (sb.getConfig("cluster.mode", "").equals("privatecluster")) ? 1 : 0);

@ -160,7 +160,7 @@ public final class search {
plasmaSearchTimingProfile localTiming = new plasmaSearchTimingProfile(squery.maximumTime, squery.wantedResults);
plasmaSearchTimingProfile remoteTiming = null;
theSearch = new plasmaSearchEvent(squery, rankingProfile, localTiming, remoteTiming, true, yacyCore.log, sb.wordIndex, sb.wordIndex.loadedURL, sb.snippetCache);
theSearch = new plasmaSearchEvent(squery, rankingProfile, localTiming, remoteTiming, true, yacyCore.log, sb.wordIndex, sb.wordIndex.loadedURL, sb.snippetCache, null);
Map[] containers = theSearch.localSearchContainers(plasmaSearchQuery.hashes2Set(urls));
if (containers != null) {
Iterator ci = containers[0].entrySet().iterator();
@ -190,7 +190,7 @@ public final class search {
theSearch = new plasmaSearchEvent(squery,
rankingProfile, localTiming, remoteTiming, true,
yacyCore.log, sb.wordIndex, sb.wordIndex.loadedURL,
sb.snippetCache);
sb.snippetCache, null);
Map[] containers = theSearch.localSearchContainers(plasmaSearchQuery.hashes2Set(urls));
// set statistic details of search result and find best result index set

@ -183,7 +183,11 @@ public class yacysearch {
final boolean indexDistributeGranted = sb.getConfig(plasmaSwitchboard.INDEX_DIST_ALLOW, "true").equals("true");
final boolean indexReceiveGranted = sb.getConfig("allowReceiveIndex", "true").equals("true");
final boolean offline = yacyCore.seedDB.mySeed.isVirgin();
// a robinson peer that is configured as cluster member searches its cluster;
// the mode is stored under the key "cluster.mode" (the name of the radio group
// in the network configuration form), not "clustermode"
final boolean clustersearch = sb.isRobinsonMode() &&
(sb.getConfig("cluster.mode", "").equals("privatecluster") ||
sb.getConfig("cluster.mode", "").equals("publiccluster"));
if (offline || !indexDistributeGranted || !indexReceiveGranted) { global = false; }
if (clustersearch) global = true; // switches search on, but search target is limited to cluster nodes
// find search domain
int contentdomCode = plasmaSearchQuery.CONTENTDOM_TEXT;
@ -268,7 +272,8 @@ public class yacysearch {
count,
searchtime,
urlmask,
(globalsearch) ? plasmaSearchQuery.SEARCHDOM_GLOBALDHT : plasmaSearchQuery.SEARCHDOM_LOCAL,
(clustersearch && globalsearch) ? plasmaSearchQuery.SEARCHDOM_CLUSTERALL :
((globalsearch) ? plasmaSearchQuery.SEARCHDOM_GLOBALDHT : plasmaSearchQuery.SEARCHDOM_LOCAL),
"",
20,
constraint);

@ -240,6 +240,8 @@ public class kelondroRowSet extends kelondroRowCollection implements kelondroInd
}
public boolean hasNext() {
if (p < 0) return false;
if (p >= size()) return false;
if (up) {
return p < bound;
} else {

@ -48,6 +48,7 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import de.anomic.index.indexContainer;
import de.anomic.index.indexRWIEntry;
@ -78,6 +79,7 @@ public final class plasmaSearchEvent extends Thread implements Runnable {
private yacySearch[] primarySearchThreads, secondarySearchThreads;
private long searchtime;
private int searchcount;
private TreeSet preselectedPeerHashes;
public plasmaSearchEvent(plasmaSearchQuery query,
plasmaSearchRankingProfile ranking,
@ -87,7 +89,8 @@ public final class plasmaSearchEvent extends Thread implements Runnable {
serverLog log,
plasmaWordIndex wordIndex,
plasmaCrawlLURL urlStore,
plasmaSnippetCache snippetCache) {
plasmaSnippetCache snippetCache,
TreeSet preselectedPeerHashes) {
this.log = log;
this.wordIndex = wordIndex;
this.query = query;
@ -104,6 +107,7 @@ public final class plasmaSearchEvent extends Thread implements Runnable {
this.secondarySearchThreads = null;
this.searchtime = -1;
this.searchcount = -1;
this.preselectedPeerHashes = preselectedPeerHashes;
}
public plasmaSearchQuery getQuery() {
@ -141,7 +145,8 @@ public final class plasmaSearchEvent extends Thread implements Runnable {
synchronized (flushThreads) {
long start = System.currentTimeMillis();
plasmaSearchPostOrder result;
if (query.domType == plasmaSearchQuery.SEARCHDOM_GLOBALDHT) {
if ((query.domType == plasmaSearchQuery.SEARCHDOM_GLOBALDHT) ||
(query.domType == plasmaSearchQuery.SEARCHDOM_CLUSTERALL)) {
int fetchpeers = (int) (query.maximumTime / 500L); // number of target peers; means 10 peers in 10 seconds
if (fetchpeers > 50) fetchpeers = 50;
if (fetchpeers < 30) fetchpeers = 30;
@ -153,7 +158,8 @@ public final class plasmaSearchEvent extends Thread implements Runnable {
long primaryTimeout = System.currentTimeMillis() + profileGlobal.duetime();
primarySearchThreads = yacySearch.primaryRemoteSearches(plasmaSearchQuery.hashSet2hashString(query.queryHashes), plasmaSearchQuery.hashSet2hashString(query.excludeHashes), "",
query.prefer, query.urlMask, query.maxDistance, urlStore, wordIndex, rcContainers, rcAbstracts,
fetchpeers, plasmaSwitchboard.urlBlacklist, snippetCache, profileGlobal, ranking, query.constraint);
fetchpeers, plasmaSwitchboard.urlBlacklist, snippetCache, profileGlobal, ranking, query.constraint,
(query.domType == plasmaSearchQuery.SEARCHDOM_GLOBALDHT) ? null : preselectedPeerHashes);
// meanwhile do a local search
Map[] searchContainerMaps = localSearchContainers(null);

@ -57,8 +57,8 @@ import de.anomic.yacy.yacySeedDB;
public final class plasmaSearchQuery {
public static final int SEARCHDOM_LOCAL = 0;
public static final int SEARCHDOM_GROUPDHT = 1;
public static final int SEARCHDOM_GROUPALL = 2;
public static final int SEARCHDOM_CLUSTERDHT = 1;
public static final int SEARCHDOM_CLUSTERALL = 2;
public static final int SEARCHDOM_GLOBALDHT = 3;
public static final int SEARCHDOM_GLOBALALL = 4;

@ -251,6 +251,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
public double lastrequestedQueries = 0d;
public int totalPPM = 0;
public double totalQPM = 0d;
public TreeSet clusterhashes;
/*
* Remote Proxy configuration
@ -1272,6 +1273,9 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
this.dbImportManager = new dbImportManager(this);
// init robinson cluster
this.clusterhashes = yacyCore.seedDB.clusterHashes(getConfig("cluster.peers.yacydomain", ""));
sb=this;
log.logConfig("Finished Switchboard Initialization");
}
@ -1881,6 +1885,9 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
kelondroRecords.setCacheGrowStati(memprereq + (memprereq / 8) + 2 * 1024 * 1024, memprereq);
kelondroCache.setCacheGrowStati(memprereq + (memprereq / 8) + 2 * 1024 * 1024, memprereq);
// update the cluster set
this.clusterhashes = yacyCore.seedDB.clusterHashes(getConfig("cluster.peers.yacydomain", ""));
return hasDoneSomething;
} catch (InterruptedException e) {
this.log.logInfo("cleanupJob: Shutdown detected");
@ -2009,6 +2016,13 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
return false;
}
if ((isRobinsonMode()) &&
((getConfig("cluster.mode", "").equals("publicpeer")) ||
(getConfig("cluster.mode", "").equals("privatepeer")))){
// not-clustered robinson peers do not do remote crawling
return false;
}
if ((coreCrawlJobSize() <= 20) && (limitCrawlTriggerJobSize() > 100)) {
// it is not efficient if the core crawl job is empty and we have too much to do
// move some tasks to the core crawl job
@ -2672,7 +2686,9 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
}
// check if peer for remote crawl is available
yacySeed remoteSeed = yacyCore.dhtAgent.getCrawlSeed(urlEntry.urlhash());
yacySeed remoteSeed = ((this.isOpenRobinsonCluster()) && (getConfig("cluster.mode", "").equals("publiccluster"))) ?
yacyCore.dhtAgent.getPublicClusterCrawlSeed(urlEntry.urlhash(), this.clusterhashes) :
yacyCore.dhtAgent.getGlobalCrawlSeed(urlEntry.urlhash());
if (remoteSeed == null) {
log.logFine("plasmaSwitchboard.processRemoteCrawlTrigger: no remote crawl seed available");
return false;
@ -2788,7 +2804,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
//}
// create a new search event
plasmaSearchEvent theSearch = new plasmaSearchEvent(query, ranking, localTiming, remoteTiming, postsort, log, wordIndex, wordIndex.loadedURL, snippetCache);
plasmaSearchEvent theSearch = new plasmaSearchEvent(query, ranking, localTiming, remoteTiming, postsort, log, wordIndex, wordIndex.loadedURL, snippetCache, (isRobinsonMode()) ? this.clusterhashes : null);
plasmaSearchPostOrder acc = theSearch.search();
// fetch snippets

@ -78,7 +78,7 @@ public class ShareService extends AbstractService {
private static final int FILEINFO_COMMENT = 1;
private static final int GENMD5_MD5_ARRAY = 0;
private static final int GENMD5_MD5_STRING = 1;
//private static final int GENMD5_MD5_STRING = 1;
/* =====================================================================
* Used XML Templates

@ -326,6 +326,14 @@ public class yacyCore {
public void peerPing() {
if (!online()) return;
if ((switchboard.isRobinsonMode()) && (switchboard.getConfig("cluster.mode", "").equals("privatepeer"))) {
// in case this peer is a private peer we omit the peer ping
// all other robinson peer types do a peer ping:
// the privatecluster does the ping to the other cluster members
// the publiccluster does the ping to all peers, but prefers the own peer
// the publicpeer does the ping to all peers
return;
}
// before publishing, update some seed data
peerActions.updateMySeed();

@ -46,10 +46,14 @@ package de.anomic.yacy;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Hashtable;
import java.util.TreeSet;
import de.anomic.kelondro.kelondroBase64Order;
import de.anomic.kelondro.kelondroCloneableIterator;
import de.anomic.kelondro.kelondroCloneableSetIterator;
import de.anomic.kelondro.kelondroException;
import de.anomic.kelondro.kelondroMScoreCluster;
import de.anomic.kelondro.kelondroRotateIterator;
import de.anomic.server.logging.serverLog;
public class yacyDHTAction implements yacyPeerAction {
@ -205,7 +209,7 @@ public class yacyDHTAction implements yacyPeerAction {
}
public synchronized yacySeed getCrawlSeed(String urlHash) {
public synchronized yacySeed getGlobalCrawlSeed(String urlHash) {
Enumeration e = getAcceptRemoteCrawlSeeds(urlHash, true);
yacySeed seed;
if (e.hasMoreElements()) seed = (yacySeed) e.nextElement(); else seed = null;
@ -213,6 +217,14 @@ public class yacyDHTAction implements yacyPeerAction {
return seed;
}
/**
 * Selects a remote crawl target from the peers of a public cluster.
 *
 * A rotating iterator over the cluster hash set is created with urlHash as
 * its start argument (presumably the position where iteration begins --
 * confirm against kelondroCloneableSetIterator). Only the first hash it
 * delivers is used.
 *
 * @param urlHash       hash of the url that shall be crawled remotely
 * @param clusterhashes set of peer hashes that form the cluster
 * @return whatever seedDB.getConnected yields for the first hash, or null
 *         if the iterator has no element at all
 */
public synchronized yacySeed getPublicClusterCrawlSeed(String urlHash, TreeSet clusterhashes) {
    final kelondroCloneableSetIterator clusterIterator = new kelondroCloneableSetIterator(clusterhashes, urlHash);
    final kelondroCloneableIterator rotation = new kelondroRotateIterator(clusterIterator, null);
    if (!rotation.hasNext()) return null;
    final String peerHash = (String) rotation.next();
    return seedDB.getConnected(peerHash);
}
public void setCrawlTime(String seedHash, int newYacyTime) {
if (newYacyTime < yacyCore.yacyTime()) newYacyTime = yacyCore.yacyTime();
seedCrawlReady.setScore(seedHash, newYacyTime);

@ -43,12 +43,14 @@
package de.anomic.yacy;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import de.anomic.index.indexContainer;
import de.anomic.kelondro.kelondroBitfield;
@ -145,7 +147,18 @@ public class yacySearch extends Thread {
return targetPeer;
}
private static yacySeed[] selectPeers(Set wordhashes, int seedcount) {
/**
 * Resolves a set of preselected peer hashes to the seeds of those peers.
 *
 * Every hash is looked up with yacyCore.seedDB.getConnected; hashes for
 * which that lookup returns null are left out of the result.
 *
 * @param peerhashes set of peer hashes (Strings) that shall be searched
 * @return the seeds that were found, in iteration order of the set;
 *         an empty array if none was found
 */
private static yacySeed[] selectClusterPeers(TreeSet peerhashes) {
    Iterator i = peerhashes.iterator();
    ArrayList l = new ArrayList();
    yacySeed s;
    while (i.hasNext()) {
        s = yacyCore.seedDB.getConnected((String) i.next());
        if (s != null) l.add(s);
    }
    // toArray() without argument always creates an Object[]; casting that to
    // yacySeed[] fails with a ClassCastException, so a typed array is passed in
    return (yacySeed[]) l.toArray(new yacySeed[l.size()]);
}
private static yacySeed[] selectDHTPeers(Set wordhashes, int seedcount) {
// find out a specific number of seeds, that would be relevant for the given word hash(es)
// the result is ordered by relevance: [0] is most relevant
// the seedcount is the maximum number of wanted results
@ -167,9 +180,10 @@ public class yacySearch extends Thread {
c = seedcount;
while (dhtEnum.hasMoreElements() && c > 0) {
seed = (yacySeed) dhtEnum.nextElement();
if (seed == null) { continue; }
if (seed == null) continue;
distance = yacyDHTAction.dhtDistance(seed.hash, wordhash);
if (distance > 0.9) { continue; } // catch bug in peer selection
if (distance > 0.9) continue; // catch bug in peer selection
if (!seed.getFlagAcceptRemoteIndex()) continue; // probably a robinson peer
serverLog.logFine("PLASMA", "selectPeers/DHTorder: " + seed.hash + ":" + seed.getName() + "/" + distance + " for wordhash " + wordhash + ", score " + c);
ranking.addScore(seed.hash, c--);
seeds.put(seed.hash, seed);
@ -183,7 +197,8 @@ public class yacySearch extends Thread {
if (c > yacyCore.seedDB.sizeConnected()) { c = yacyCore.seedDB.sizeConnected(); }
while (dhtEnum.hasMoreElements() && c > 0) {
seed = (yacySeed) dhtEnum.nextElement();
if (seed == null) { continue; }
if (seed == null) continue;
if (!seed.getFlagAcceptRemoteIndex()) continue; // probably a robinson peer
score = (int) Math.round(Math.random() * ((c / 3) + 3));
serverLog.logFine("PLASMA", "selectPeers/RWIcount: " + seed.hash + ":" + seed.getName() + ", RWIcount=" + seed.get(yacySeed.ICOUNT,"") + ", score " + score);
ranking.addScore(seed.hash, score);
@ -191,6 +206,17 @@ public class yacySearch extends Thread {
c--;
}
// put in seeds that are public robinson peers and where the peer tags match with query
dhtEnum = yacyCore.seedDB.seedsConnected(true, false, null, (float) 0.50);
while (dhtEnum.hasMoreElements()) {
seed = (yacySeed) dhtEnum.nextElement();
if (seed == null) continue;
if (!seed.matchPeerTags(wordhashes)) continue;
serverLog.logInfo("PLASMA", "selectPeers/RWIcount: " + seed.hash + ":" + seed.getName() + ", is specialized peer for " + seed.getPeerTags().toString());
ranking.addScore(seed.hash, seedcount);
seeds.put(seed.hash, seed);
}
// evaluate the ranking score and select seeds
if (ranking.size() < seedcount) { seedcount = ranking.size(); }
yacySeed[] result = new yacySeed[seedcount];
@ -212,12 +238,12 @@ public class yacySearch extends Thread {
indexContainer containerCache, Map abstractCache,
int targets, plasmaURLPattern blacklist, plasmaSnippetCache snippetCache,
plasmaSearchTimingProfile timingProfile, plasmaSearchRankingProfile rankingProfile,
kelondroBitfield constraint) {
kelondroBitfield constraint, TreeSet clusterselection) {
// check own peer status
if (yacyCore.seedDB.mySeed == null || yacyCore.seedDB.mySeed.getAddress() == null) { return null; }
// prepare seed targets and threads
final yacySeed[] targetPeers = selectPeers(plasmaSearchQuery.hashes2Set(wordhashes), targets);
final yacySeed[] targetPeers = (clusterselection == null) ? selectDHTPeers(plasmaSearchQuery.hashes2Set(wordhashes), targets) : selectClusterPeers(clusterselection);
if (targetPeers == null) return null;
targets = targetPeers.length;
if (targets == 0) return null;

@ -67,12 +67,14 @@ import java.net.InetAddress;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import de.anomic.kelondro.kelondroBase64Order;
import de.anomic.net.natLib;
import de.anomic.plasma.plasmaCondenser;
import de.anomic.plasma.plasmaSwitchboard;
import de.anomic.server.serverCodings;
import de.anomic.server.serverCore;
@ -461,6 +463,15 @@ public class yacySeed {
return serverCodings.string2set(get(PEERTAGS, ""), "|");
}
/**
 * Tests whether one of the peer tags of this seed is part of a search.
 *
 * The PEERTAGS entry of the seed is split at '|' into single tags; each
 * tag is converted with plasmaCondenser.word2hash and compared against
 * the given hashes.
 *
 * @param searchHashes the word hashes of the search query
 * @return true as soon as the hash of one tag is contained in searchHashes,
 *         false if no tag matches
 */
public boolean matchPeerTags(Set searchHashes) {
    String tag;
    for (Iterator tagIterator = serverCodings.string2set(get(PEERTAGS, ""), "|").iterator(); tagIterator.hasNext();) {
        tag = (String) tagIterator.next();
        if (searchHashes.contains(plasmaCondenser.word2hash(tag))) {
            return true;
        }
    }
    return false;
}
public int getPPM() {
try {
return Integer.parseInt(get(yacySeed.ISPEED, yacySeed.ZERO));

@ -57,14 +57,19 @@ import java.util.Enumeration;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeSet;
import de.anomic.http.httpHeader;
import de.anomic.http.httpc;
import de.anomic.http.httpd;
import de.anomic.kelondro.kelondroCloneableIterator;
import de.anomic.kelondro.kelondroCloneableSetIterator;
import de.anomic.kelondro.kelondroDyn;
import de.anomic.kelondro.kelondroException;
import de.anomic.kelondro.kelondroMScoreCluster;
import de.anomic.kelondro.kelondroMapObjects;
import de.anomic.kelondro.kelondroBase64Order;
import de.anomic.kelondro.kelondroRotateIterator;
import de.anomic.net.URL;
import de.anomic.plasma.plasmaSwitchboard;
import de.anomic.server.serverCore;
@ -242,6 +247,53 @@ public final class yacySeedDB {
return new seedEnum(up, field, seedPotentialDB);
}
/**
 * Collects the hashes of all peers that are named in a cluster definition.
 *
 * The definition is a comma-separated list of .yacy (peer name) or .yacyh
 * (hex peer hash) domains. White space around an entry is ignored and empty
 * entries are skipped, so an empty definition gives an empty set without
 * any warning. Entries that cannot be resolved to a seed, or that have
 * neither of the two suffixes, are reported as warning and left out.
 *
 * @param clusterdefinition comma-separated .yacy or .yacyh domains
 * @return set of b64 peer hashes, ordered by kelondroBase64Order.enhancedCoder
 */
public TreeSet /* peer-b64-hashes */ clusterHashes(String clusterdefinition) {
    String[] cluster = clusterdefinition.split(",");
    TreeSet clusterset = new TreeSet(kelondroBase64Order.enhancedCoder);
    yacySeed seed;
    String entry;
    String hash;
    for (int i = 0; i < cluster.length; i++) {
        // "a.yacy, b.yacy" must resolve both peers; "" splits into one empty
        // entry, which is not a syntax error but simply 'no cluster defined'
        entry = cluster[i].trim();
        if (entry.length() == 0) continue;
        if (entry.endsWith(".yacyh")) {
            // find a peer with its hexhash
            hash = yacySeed.hexHash2b64Hash(entry.substring(0, entry.length() - 6));
            seed = get(hash);
            if (seed == null) {
                yacyCore.log.logWarning("cluster peer '" + entry + "' was not found.");
            } else {
                clusterset.add(hash);
            }
        } else if (entry.endsWith(".yacy")) {
            // find a peer with its name
            seed = lookupByName(entry.substring(0, entry.length() - 5));
            if (seed == null) {
                yacyCore.log.logWarning("cluster peer '" + entry + "' was not found.");
            } else {
                clusterset.add(seed.hash);
            }
        } else {
            yacyCore.log.logWarning("cluster peer '" + entry + "' has wrong syntax. the name must end with .yacy or .yacyh");
        }
    }
    return clusterset;
}
/**
 * Returns an iterator over the seeds of all peer hashes in the given set.
 *
 * The hashes are read through a rotating iterator that starts at firstHash.
 * Elements may be Strings or byte arrays; each one is looked up with get()
 * and the lookup result is stored as it is. Reading stops when the rotating
 * iterator ends or when as many entries were stored as the set has members.
 *
 * @param firstHash     the hash where the iteration starts
 * @param clusterhashes the peer hashes of the cluster
 * @return iterator over the collected lookup results
 */
public Iterator /*of yacySeed*/ seedsInCluster(String firstHash, TreeSet clusterhashes) {
    final ArrayList seeds = new ArrayList();
    final kelondroCloneableIterator rotation = new kelondroRotateIterator(new kelondroCloneableSetIterator(clusterhashes, firstHash), null);
    while (rotation.hasNext()) {
        final Object element = rotation.next();
        if (element instanceof String) {
            seeds.add(get((String) element));
        } else if (element instanceof byte[]) {
            seeds.add(get(new String((byte[]) element)));
        }
        // the rotation would deliver the same hashes again, one round is enough
        if (seeds.size() >= clusterhashes.size()) break;
    }
    return seeds.iterator();
}
public Enumeration seedsConnected(boolean up, boolean rot, String firstHash, float minVersion) {
// enumerates seed-type objects: all seeds sequentially without order
return new seedEnum(up, rot, (firstHash == null) ? null : firstHash.getBytes(), null, seedActiveDB, minVersion);

Loading…
Cancel
Save