- fixed more cluster routing problems

- fixed a problem in remote search when balancer caused shift process to wait too long

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@3627 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 18 years ago
parent 304ed3f4d2
commit 81844e85b2

@ -118,6 +118,7 @@ public class ViewProfile {
} catch (IOException e) {}
// try to get the profile from remote peer
seed.setAlternativeAddress((String) switchboard.clusterhashes.get(seed.hash));
profile = yacyClient.getProfile(seed);
// if profile did not arrive, say that peer is disconnected

@ -95,12 +95,14 @@ public final class hello {
final String reportedPeerType = remoteSeed.get(yacySeed.PEERTYPE, yacySeed.PEERTYPE_JUNIOR);
final float clientversion = remoteSeed.getVersion();
if ((sb.isRobinsonMode()) && (!sb.isOpenRobinsonCluster())) {
if ((sb.isRobinsonMode()) && (!sb.isPublicRobinson())) {
// if we are a robinson cluster, answer only if this client is known by our network definition
return prop;
}
int urls = -1;
remoteSeed.setAlternativeAddress((String) sb.clusterhashes.get(remoteSeed.hash));
// if the remote client has reported its own IP address and the client supports
// the port forwarding feature (if client version >= 0.383) then we try to
// connect to the reported IP address first

@ -97,7 +97,7 @@ public final class message {
}
if ((sb.isRobinsonMode()) &&
(!((sb.isOpenRobinsonCluster()) ||
(!((sb.isPublicRobinson()) ||
(sb.isInMyCluster((String)header.get(httpHeader.CONNECTION_PROP_CLIENTIP)))))) {
// if we are a robinson cluster, answer only if this client is known by our network definition
prop.putASIS("response", "-1"); // request rejected

@ -69,8 +69,8 @@ public final class profile {
if (prop == null) { return null; }
if ((sb.isRobinsonMode()) &&
(!((sb.isOpenRobinsonCluster()) ||
(sb.isInMyCluster((String)header.get(httpHeader.CONNECTION_PROP_CLIENTIP)))))) {
(!sb.isPublicRobinson()) &&
(!sb.isInMyCluster((String)header.get(httpHeader.CONNECTION_PROP_CLIENTIP)))) {
// if we are a robinson cluster, answer only if this client is known by our network definition
prop.put("list", 0);
return prop;

@ -65,7 +65,7 @@ public final class query {
if (prop == null || sb == null) { return null; }
if ((sb.isRobinsonMode()) &&
(!((sb.isOpenRobinsonCluster()) ||
(!((sb.isPublicRobinson()) ||
(sb.isInMyCluster((String)header.get(httpHeader.CONNECTION_PROP_CLIENTIP)))))) {
// if we are a robinson cluster, answer only if this client is known by our network definition
prop.putASIS("response", "-1"); // request rejected

@ -117,7 +117,7 @@ public final class search {
serverObjects prop = new serverObjects();
if ((sb.isRobinsonMode()) &&
(!((sb.isOpenRobinsonCluster()) ||
(!((sb.isPublicRobinson()) ||
(sb.isInMyCluster((String)header.get(httpHeader.CONNECTION_PROP_CLIENTIP)))))) {
// if we are a robinson cluster, answer only if this client is known by our network definition
prop.putASIS("links", "");

@ -1,4 +1,4 @@
// kelondroCloneableSetIterator.java
// kelondroCloneableMapIterator.java
// (C) 2007 by Michael Peter Christen; mc@yacy.net, Frankfurt a. M., Germany
// first published 25.04.2007 on http://yacy.net
//
@ -28,33 +28,33 @@
package de.anomic.kelondro;
import java.util.Iterator;
import java.util.TreeSet;
import java.util.TreeMap;
public class kelondroCloneableSetIterator implements kelondroCloneableIterator {
public class kelondroCloneableMapIterator implements kelondroCloneableIterator {
TreeSet set;
TreeMap map;
Object next, last;
Object start;
Iterator iter;
public kelondroCloneableSetIterator(TreeSet set, Object start) {
public kelondroCloneableMapIterator(TreeMap map, Object start) {
// set must contain byte[] elements or String elements.
// start must be either of type byte[] or String
this.set = set;
this.map = map;
this.start = start;
this.iter = set.iterator();
this.iter = map.keySet().iterator();
if (this.start == null) {
if (iter.hasNext()) this.next = iter.next(); else this.next = null;
} else while (iter.hasNext()) {
this.next = iter.next();
if (set.comparator().compare(next, start) > 1) break;
if (map.comparator().compare(next, start) > 1) break;
}
this.last = null;
}
public Object clone(Object modifier) {
return new kelondroCloneableSetIterator(set, modifier);
return new kelondroCloneableMapIterator(map, modifier);
}
public boolean hasNext() {
@ -62,6 +62,7 @@ public class kelondroCloneableSetIterator implements kelondroCloneableIterator {
}
public Object next() {
// returns key-elements, not entry-elements
this.last = this.next;
if (this.iter.hasNext()) {
this.next = this.iter.next();
@ -72,7 +73,7 @@ public class kelondroCloneableSetIterator implements kelondroCloneableIterator {
}
public void remove() {
this.set.remove(this.last);
this.map.remove(this.last);
}
}

@ -397,6 +397,7 @@ public class plasmaCrawlBalancer {
// finally: check minimumDelta and if necessary force a sleep
long delta = lastAccessDelta(result);
assert delta >= 0: "delta = " + delta;
if (delta < minimumDelta) {
// force a busy waiting here
// ideally, this should never happen if the balancer works properly

@ -299,7 +299,7 @@ public final class plasmaSearchEvent extends Thread implements Runnable {
System.out.println("DEBUG-INDEXABSTRACT ***: peer " + peer + " from words: " + words);
secondarySearchThreads[c++] = yacySearch.secondaryRemoteSearch(
words, "", urls, urlStore, wordIndex, rcContainers, peer, plasmaSwitchboard.urlBlacklist, snippetCache,
profileGlobal, ranking, query.constraint);
profileGlobal, ranking, query.constraint, preselectedPeerHashes);
}
}

@ -1336,7 +1336,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
return !getConfigBool(plasmaSwitchboard.INDEX_DIST_ALLOW, false) && !getConfigBool(plasmaSwitchboard.INDEX_RECEIVE_ALLOW, false);
}
public boolean isOpenRobinsonCluster() {
public boolean isPublicRobinson() {
// robinson peers may be member of robinson clusters, which can be public or private
// this does not check the robinson attribute, only the specific subtype of the cluster
String clustermode = getConfig("cluster.mode", "publicpeer");
@ -1999,24 +1999,20 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
//log.logDebug("LimitCrawl: queue is empty");
return false;
}
boolean robinsonPrivateCase = ((isRobinsonMode()) &&
(!getConfig("cluster.mode", "").equals("publiccluster")) &&
(!getConfig("cluster.mode", "").equals("privatecluster")));
if ((isRobinsonMode()) &&
(!getConfig("cluster.mode", "").equals("publicpeer")) &&
(!getConfig("cluster.mode", "").equals("privatepeer"))) {
// not-clustered robinson peers do not do remote crawling
return false;
}
if ((coreCrawlJobSize() <= 20) && (limitCrawlTriggerJobSize() > 100)) {
if ((robinsonPrivateCase) || ((coreCrawlJobSize() <= 20) && (limitCrawlTriggerJobSize() > 10))) {
// it is not efficient if the core crawl job is empty and we have too much to do
// move some tasks to the core crawl job
int toshift = limitCrawlTriggerJobSize() / 5;
if (toshift > 1000) toshift = 1000;
int toshift = 10; // this cannot be a big number because the balancer makes a forced waiting if it cannot balance
if (toshift > limitCrawlTriggerJobSize()) toshift = limitCrawlTriggerJobSize();
for (int i = 0; i < toshift; i++) {
noticeURL.shift(plasmaCrawlNURL.STACK_TYPE_LIMIT, plasmaCrawlNURL.STACK_TYPE_CORE);
}
log.logInfo("shifted " + toshift + " jobs from global crawl to local crawl");
log.logInfo("shifted " + toshift + " jobs from global crawl to local crawl (coreCrawlJobSize()=" + coreCrawlJobSize() + ", limitCrawlTriggerJobSize()=" + limitCrawlTriggerJobSize() + ", cluster.mode=" + getConfig("cluster.mode", "") + ", robinsonMode=" + ((isRobinsonMode()) ? "on" : "off"));
if (robinsonPrivateCase) return false;
}
// if the server is busy, we do crawling more slowly
@ -2484,6 +2480,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
// if this was performed for a remote crawl request, notify requester
if ((processCase == PROCESSCASE_6_GLOBAL_CRAWLING) && (initiatorPeer != null)) {
log.logInfo("Sending crawl receipt for '" + entry.normalizedURLString() + "' to " + initiatorPeer.getName());
initiatorPeer.setAlternativeAddress((String) clusterhashes.get(initiatorPeer.hash));
yacyClient.crawlReceipt(initiatorPeer, "crawl", "fill", "indexed", newEntry, "");
}
} else {
@ -2498,6 +2495,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
log.logSevere("Could not index URL " + entry.url() + ": " + ee.getMessage(), ee);
if ((processCase == PROCESSCASE_6_GLOBAL_CRAWLING) && (initiatorPeer != null)) {
initiatorPeer.setAlternativeAddress((String) clusterhashes.get(initiatorPeer.hash));
yacyClient.crawlReceipt(initiatorPeer, "crawl", "exception", ee.getMessage(), null, "");
}
addURLtoErrorDB(entry.url(), referrerUrlHash, initiatorPeerHash, docDescription, plasmaCrawlEURL.DENIED_UNSPECIFIED_INDEXING_ERROR, new kelondroBitfield());
@ -2510,6 +2508,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
log.logInfo("Not indexed any word in URL " + entry.url() + "; cause: " + noIndexReason);
addURLtoErrorDB(entry.url(), referrerUrlHash, initiatorPeerHash, docDescription, noIndexReason, new kelondroBitfield());
if ((processCase == PROCESSCASE_6_GLOBAL_CRAWLING) && (initiatorPeer != null)) {
initiatorPeer.setAlternativeAddress((String) clusterhashes.get(initiatorPeer.hash));
yacyClient.crawlReceipt(initiatorPeer, "crawl", "rejected", noIndexReason, null, "");
}
}
@ -2670,7 +2669,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
}
// check if peer for remote crawl is available
yacySeed remoteSeed = ((this.isOpenRobinsonCluster()) && (getConfig("cluster.mode", "").equals("publiccluster"))) ?
yacySeed remoteSeed = ((this.isPublicRobinson()) && (getConfig("cluster.mode", "").equals("publiccluster"))) ?
yacyCore.dhtAgent.getPublicClusterCrawlSeed(urlEntry.urlhash(), this.clusterhashes) :
yacyCore.dhtAgent.getGlobalCrawlSeed(urlEntry.urlhash());
if (remoteSeed == null) {
@ -2694,6 +2693,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
return false; // no response from peer, we will crawl this ourself
}
String response = (String) page.get("response");
log.logFine("plasmaSwitchboard.processRemoteCrawlTrigger: remoteSeed="
+ remoteSeed.getName() + ", url=" + urlEntry.url().toString()
+ ", response=" + page.toString()); // DEBUG
@ -2701,7 +2701,6 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
// we received an answer and we are told to wait a specific time until we shall ask again for another crawl
int newdelay = Integer.parseInt((String) page.get("delay"));
yacyCore.dhtAgent.setCrawlDelay(remoteSeed.hash, newdelay);
String response = (String) page.get("response");
if (response.equals("stacked")) {
// success, the remote peer accepted the crawl
log.logInfo(STR_REMOTECRAWLTRIGGER + remoteSeed.getName()

@ -349,10 +349,10 @@ public class yacyCore {
seedDB.sizeConnected() + " different peers");
}
private boolean canReachMyself() {
private boolean canReachMyself() { // TODO: check if this method is necessary - depending on the used router it will not work
// returns true if we can reach ourselves under our known peer address
// if we cannot reach ourselves, we call a forced publishMySeed and return false
final int urlc = yacyClient.queryUrlCount(seedDB.mySeed);
final int urlc = yacyClient.queryUrlCount(seedDB.mySeed);
if (urlc >= 0) {
seedDB.mySeed.setLastSeenUTC();
return true;

@ -47,11 +47,10 @@ import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Hashtable;
import java.util.TreeMap;
import java.util.TreeSet;
import de.anomic.kelondro.kelondroBase64Order;
import de.anomic.kelondro.kelondroCloneableIterator;
import de.anomic.kelondro.kelondroCloneableSetIterator;
import de.anomic.kelondro.kelondroCloneableMapIterator;
import de.anomic.kelondro.kelondroException;
import de.anomic.kelondro.kelondroMScoreCluster;
import de.anomic.kelondro.kelondroRotateIterator;
@ -219,9 +218,10 @@ public class yacyDHTAction implements yacyPeerAction {
}
public synchronized yacySeed getPublicClusterCrawlSeed(String urlHash, TreeMap clusterhashes) {
kelondroCloneableIterator i = new kelondroRotateIterator(new kelondroCloneableSetIterator((TreeSet) (clusterhashes.keySet()), urlHash), null);
if (i.hasNext()) {
kelondroCloneableIterator i = new kelondroRotateIterator(new kelondroCloneableMapIterator(clusterhashes, urlHash), null);
while (i.hasNext()) {
yacySeed seed = seedDB.getConnected((String) i.next());
if (seed == null) continue;
seed.setAlternativeAddress((String) clusterhashes.get(seed.hash));
return seed;
}

@ -271,13 +271,14 @@ public class yacySearch extends Thread {
indexContainer containerCache,
String targethash, plasmaURLPattern blacklist, plasmaSnippetCache snippetCache,
plasmaSearchTimingProfile timingProfile, plasmaSearchRankingProfile rankingProfile,
kelondroBitfield constraint) {
kelondroBitfield constraint, TreeMap clusterselection) {
// check own peer status
if (yacyCore.seedDB.mySeed == null || yacyCore.seedDB.mySeed.getPublicAddress() == null) { return null; }
// prepare seed targets and threads
final yacySeed targetPeer = yacyCore.seedDB.getConnected(targethash);
if (targetPeer == null) return null;
targetPeer.setAlternativeAddress((String) clusterselection.get(targetPeer.hash));
yacySearch searchThread = new yacySearch(wordhashes, excludehashes, urlhashes, "", "", 9999, true, 0, targetPeer,
urlManager, wordIndex, containerCache, new TreeMap(), blacklist, snippetCache, timingProfile, rankingProfile, constraint);
searchThread.start();

@ -59,19 +59,15 @@ import java.util.Hashtable;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;
import de.anomic.http.httpHeader;
import de.anomic.http.httpc;
import de.anomic.http.httpd;
import de.anomic.kelondro.kelondroCloneableIterator;
import de.anomic.kelondro.kelondroCloneableSetIterator;
import de.anomic.kelondro.kelondroDyn;
import de.anomic.kelondro.kelondroException;
import de.anomic.kelondro.kelondroMScoreCluster;
import de.anomic.kelondro.kelondroMapObjects;
import de.anomic.kelondro.kelondroBase64Order;
import de.anomic.kelondro.kelondroRotateIterator;
import de.anomic.net.URL;
import de.anomic.plasma.plasmaSwitchboard;
import de.anomic.server.serverCore;
@ -296,21 +292,6 @@ public final class yacySeedDB {
return clustermap;
}
public Iterator /*of yacySeed*/ seedsInCluster(String firstHash, TreeSet clusterhashes) {
    // Looks up the seed for each peer hash in the given set and returns an
    // iterator over the collected seeds; the walk over the hashes starts at firstHash.
    final ArrayList seeds = new ArrayList();
    final kelondroCloneableIterator hashes = new kelondroRotateIterator(
            new kelondroCloneableSetIterator(clusterhashes, firstHash), null);
    while (hashes.hasNext()) {
        final Object hash = hashes.next();
        // hashes may be stored either as String or as byte[]
        if (hash instanceof String) {
            seeds.add(get((String) hash));
        } else if (hash instanceof byte[]) {
            seeds.add(get(new String((byte[]) hash)));
        }
        // stop once as many seeds were collected as the set contains hashes
        if (seeds.size() >= clusterhashes.size()) {
            break;
        }
    }
    return seeds.iterator();
}
public Enumeration seedsConnected(boolean up, boolean rot, String firstHash, float minVersion) {
// enumerates seed-type objects: all seeds sequentially without order
return new seedEnum(up, rot, (firstHash == null) ? null : firstHash.getBytes(), null, seedActiveDB, minVersion);

Loading…
Cancel
Save