enhanced strategy in host browser

limit number of fresh hosts in round robin hashes
pull/402/head
Michael Peter Christen 4 years ago
parent 9be36800a4
commit 63f58e4785

@ -65,7 +65,7 @@ public class HostBalancer implements Balancer {
private final static ConcurrentLog log = new ConcurrentLog("HostBalancer");
public final static HandleMap depthCache = new RowHandleMap(Word.commonHashLength, Word.commonHashOrder, 2, 8 * 1024 * 1024, "HostBalancer.DepthCache");
private final File hostsPath;
private final boolean exceed134217727;
private final Map<String, HostQueue> queues;
@ -84,7 +84,7 @@ public class HostBalancer implements Balancer {
final boolean exceed134217727) {
this(hostsPath, onDemandLimit, exceed134217727, true);
}
/**
* Create a new instance and fills the queue by scanning the hostsPath directory.
* @param hostsPath
@ -100,7 +100,7 @@ public class HostBalancer implements Balancer {
this.hostsPath = hostsPath;
this.onDemandLimit = onDemandLimit;
this.exceed134217727 = exceed134217727;
// create a stack for newly entered entries
if (!(hostsPath.exists())) hostsPath.mkdirs(); // make the path
this.queues = new ConcurrentHashMap<String, HostQueue>();
@ -114,7 +114,7 @@ public class HostBalancer implements Balancer {
* return immediately (as large unfinished crawls may take longer to load)
*/
private void init(final boolean async) {
if(async) {
if(async) {
Thread t = new Thread("HostBalancer.init") {
@Override
public void run() {
@ -122,10 +122,10 @@ public class HostBalancer implements Balancer {
}
};
t.start();
} else {
runInit();
}
t.start();
} else {
runInit();
}
}
/**
@ -185,7 +185,7 @@ public class HostBalancer implements Balancer {
}
return c;
}
/**
* delete all urls which are stored for given host hashes
* @param hosthashes
@ -230,11 +230,11 @@ public class HostBalancer implements Balancer {
return c;
}
/**
* @return true when the URL is queued is this or any other HostBalancer
* instance (as {@link #depthCache} is shared between all HostBalancer
* instances)
*/
/**
* @return true when the URL is queued is this or any other HostBalancer
* instance (as {@link #depthCache} is shared between all HostBalancer
* instances)
*/
@Override
public boolean has(final byte[] urlhashb) {
if (depthCache.has(urlhashb)) return true;
@ -313,7 +313,7 @@ public class HostBalancer implements Balancer {
tryagain: while (true) try {
HostQueue rhq = null;
String rhh = null;
synchronized (this) {
if (this.roundRobinHostHashes.size() == 0) {
// refresh the round-robin cache
@ -331,14 +331,21 @@ public class HostBalancer implements Balancer {
if (size <= 10) {smallStacksExist = true; break smallsearch;}
}
}
if (singletonStacksExist || smallStacksExist) {
Iterator<String> i = this.roundRobinHostHashes.iterator();
smallstacks: while (i.hasNext()) {
if (this.roundRobinHostHashes.size() <= 10) break smallstacks; // don't shrink the hosts until nothing is left
String s = i.next();
HostQueue hq = this.queues.get(s);
if (hq == null) {i.remove(); continue smallstacks;}
int delta = Latency.waitingRemainingGuessed(hq.getHost(), hq.getPort(), s, robots, ClientIdentification.yacyInternetCrawlerAgent);
Set<String> freshhosts = new HashSet<>();
Iterator<String> i = this.roundRobinHostHashes.iterator();
smallstacks: while (i.hasNext()) {
if (this.roundRobinHostHashes.size() <= 10) break smallstacks; // don't shrink the hosts until nothing is left
String hosthash = i.next();
HostQueue hq = this.queues.get(hosthash);
if (hq == null) {i.remove(); continue smallstacks;}
int delta = Latency.waitingRemainingGuessed(hq.getHost(), hq.getPort(), hosthash, robots, ClientIdentification.yacyInternetCrawlerAgent);
if (delta == Integer.MIN_VALUE) {
// never-crawled hosts; we do not want to have too many of them in here. Loading new hosts means: waiting for robots.txt to load
freshhosts.add(hosthash);
i.remove();
continue smallstacks;
}
if (singletonStacksExist || smallStacksExist) {
if (delta < 0) continue; // keep all non-waiting stacks; they are useful to speed up things
// to protect all small stacks which have a fast throughput, remove all with long waiting time
if (delta >= 1000) {i.remove(); continue smallstacks;}
@ -350,6 +357,10 @@ public class HostBalancer implements Balancer {
}
}
}
// put at least one of the fresh hosts back
if (freshhosts.size() > 0) this.roundRobinHostHashes.add(freshhosts.iterator().next());
// result
if (this.roundRobinHostHashes.size() == 1) {
if (log.isFine()) log.fine("(re-)initialized the round-robin queue with one host");
} else {
@ -357,13 +368,13 @@ public class HostBalancer implements Balancer {
}
}
if (this.roundRobinHostHashes.size() == 0) return null;
// if the queue size is 1, just take that
if (this.roundRobinHostHashes.size() == 1) {
rhh = this.roundRobinHostHashes.iterator().next();
rhq = this.queues.get(rhh);
}
if (rhq == null) {
// mixed minimum sleep time / largest queue strategy:
// create a map of sleep time / queue relations with a fuzzy sleep time (ms / 500).
@ -449,7 +460,7 @@ public class HostBalancer implements Balancer {
}
*/
}
if (rhq == null) {
this.roundRobinHostHashes.clear(); // force re-initialization
continue tryagain;
@ -458,7 +469,7 @@ public class HostBalancer implements Balancer {
long timestamp = System.currentTimeMillis();
Request request = rhq.pop(delay, cs, robots); // this pop is outside of synchronization to prevent blocking of pushes
long actualwaiting = System.currentTimeMillis() - timestamp;
if (actualwaiting > 1000) {
synchronized (this) {
// to prevent that this occurs again, remove all stacks with positive delay times (which may be less after that waiting)
@ -473,7 +484,7 @@ public class HostBalancer implements Balancer {
}
}
}
if (rhq.isEmpty()) {
synchronized (this) {
this.queues.remove(rhh);
@ -545,7 +556,7 @@ public class HostBalancer implements Balancer {
@Override
public List<Request> getDomainStackReferences(String host, int maxcount, long maxtime) {
if (host == null) {
return Collections.emptyList();
return Collections.emptyList();
}
try {
HostQueue hq = this.queues.get(DigestURL.hosthash(host, host.startsWith("ftp.") ? 21 : 80));

Loading…
Cancel
Save