|
|
@ -6,6 +6,10 @@
|
|
|
|
// Frankfurt, Germany, 2005
|
|
|
|
// Frankfurt, Germany, 2005
|
|
|
|
// created: 24.09.2005
|
|
|
|
// created: 24.09.2005
|
|
|
|
//
|
|
|
|
//
|
|
|
|
|
|
|
|
//$LastChangedDate$
|
|
|
|
|
|
|
|
//$LastChangedRevision$
|
|
|
|
|
|
|
|
//$LastChangedBy$
|
|
|
|
|
|
|
|
//
|
|
|
|
// This program is free software; you can redistribute it and/or modify
|
|
|
|
// This program is free software; you can redistribute it and/or modify
|
|
|
|
// it under the terms of the GNU General Public License as published by
|
|
|
|
// it under the terms of the GNU General Public License as published by
|
|
|
|
// the Free Software Foundation; either version 2 of the License, or
|
|
|
|
// the Free Software Foundation; either version 2 of the License, or
|
|
|
@ -28,6 +32,7 @@ import java.util.ArrayList;
|
|
|
|
import java.util.Iterator;
|
|
|
|
import java.util.Iterator;
|
|
|
|
import java.util.List;
|
|
|
|
import java.util.List;
|
|
|
|
import java.util.Map;
|
|
|
|
import java.util.Map;
|
|
|
|
|
|
|
|
import java.util.SortedMap;
|
|
|
|
import java.util.TreeMap;
|
|
|
|
import java.util.TreeMap;
|
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
|
|
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
|
|
@ -45,6 +50,7 @@ import net.yacy.kelondro.util.ByteBuffer;
|
|
|
|
|
|
|
|
|
|
|
|
import de.anomic.crawler.retrieval.Request;
|
|
|
|
import de.anomic.crawler.retrieval.Request;
|
|
|
|
import de.anomic.http.client.Cache;
|
|
|
|
import de.anomic.http.client.Cache;
|
|
|
|
|
|
|
|
import java.util.concurrent.ConcurrentMap;
|
|
|
|
|
|
|
|
|
|
|
|
public class Balancer {
|
|
|
|
public class Balancer {
|
|
|
|
|
|
|
|
|
|
|
@ -54,9 +60,9 @@ public class Balancer {
|
|
|
|
private static final String localhost = "localhost";
|
|
|
|
private static final String localhost = "localhost";
|
|
|
|
|
|
|
|
|
|
|
|
// class variables
|
|
|
|
// class variables
|
|
|
|
private final ConcurrentHashMap<String, HandleSet> domainStacks; // a map from host name to lists with url hashs
|
|
|
|
private final ConcurrentMap<String, HandleSet> domainStacks; // a map from host name to lists with url hashs
|
|
|
|
private final ConcurrentLinkedQueue<byte[]> top; // a list of url-hashes that shall be taken next
|
|
|
|
private final ConcurrentLinkedQueue<byte[]> top; // a list of url-hashes that shall be taken next
|
|
|
|
private final TreeMap<Long, byte[]> delayed;
|
|
|
|
private final SortedMap<Long, byte[]> delayed;
|
|
|
|
private final HandleSet ddc;
|
|
|
|
private final HandleSet ddc;
|
|
|
|
private final HandleSet double_push_check; // for debugging
|
|
|
|
private final HandleSet double_push_check; // for debugging
|
|
|
|
private BufferedObjectIndex urlFileIndex;
|
|
|
|
private BufferedObjectIndex urlFileIndex;
|
|
|
@ -67,8 +73,8 @@ public class Balancer {
|
|
|
|
private int domStackInitSize;
|
|
|
|
private int domStackInitSize;
|
|
|
|
|
|
|
|
|
|
|
|
public Balancer(
|
|
|
|
public Balancer(
|
|
|
|
final File cachePath,
|
|
|
|
final File cachePath,
|
|
|
|
final String stackname,
|
|
|
|
final String stackname,
|
|
|
|
final long minimumLocalDelta,
|
|
|
|
final long minimumLocalDelta,
|
|
|
|
final long minimumGlobalDelta,
|
|
|
|
final long minimumGlobalDelta,
|
|
|
|
final boolean useTailCache,
|
|
|
|
final boolean useTailCache,
|
|
|
@ -189,16 +195,16 @@ public class Balancer {
|
|
|
|
final Iterator<byte[]> j = top.iterator();
|
|
|
|
final Iterator<byte[]> j = top.iterator();
|
|
|
|
byte[] urlhash;
|
|
|
|
byte[] urlhash;
|
|
|
|
while (j.hasNext()) {
|
|
|
|
while (j.hasNext()) {
|
|
|
|
urlhash = j.next();
|
|
|
|
urlhash = j.next();
|
|
|
|
if (urlHashes.has(urlhash)) j.remove();
|
|
|
|
if (urlHashes.has(urlhash)) j.remove();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// remove from delayed
|
|
|
|
// remove from delayed
|
|
|
|
synchronized (this.delayed) {
|
|
|
|
synchronized (this.delayed) {
|
|
|
|
final Iterator<Map.Entry<Long, byte[]>> k = this.delayed.entrySet().iterator();
|
|
|
|
final Iterator<Map.Entry<Long, byte[]>> k = this.delayed.entrySet().iterator();
|
|
|
|
while (k.hasNext()) {
|
|
|
|
while (k.hasNext()) {
|
|
|
|
if (urlHashes.has(k.next().getValue())) k.remove();
|
|
|
|
if (urlHashes.has(k.next().getValue())) k.remove();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// iterate through the domain stacks
|
|
|
|
// iterate through the domain stacks
|
|
|
@ -206,7 +212,7 @@ public class Balancer {
|
|
|
|
HandleSet stack;
|
|
|
|
HandleSet stack;
|
|
|
|
while (q.hasNext()) {
|
|
|
|
while (q.hasNext()) {
|
|
|
|
stack = q.next().getValue();
|
|
|
|
stack = q.next().getValue();
|
|
|
|
for (byte[] handle: urlHashes) stack.remove(handle);
|
|
|
|
for (final byte[] handle: urlHashes) stack.remove(handle);
|
|
|
|
if (stack.isEmpty()) q.remove();
|
|
|
|
if (stack.isEmpty()) q.remove();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
@ -234,7 +240,7 @@ public class Balancer {
|
|
|
|
private boolean domainStacksNotEmpty() {
|
|
|
|
private boolean domainStacksNotEmpty() {
|
|
|
|
if (domainStacks == null) return false;
|
|
|
|
if (domainStacks == null) return false;
|
|
|
|
synchronized (domainStacks) {
|
|
|
|
synchronized (domainStacks) {
|
|
|
|
for (HandleSet l: domainStacks.values()) {
|
|
|
|
for (final HandleSet l: domainStacks.values()) {
|
|
|
|
if (!l.isEmpty()) return true;
|
|
|
|
if (!l.isEmpty()) return true;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -255,7 +261,7 @@ public class Balancer {
|
|
|
|
|
|
|
|
|
|
|
|
// add to index
|
|
|
|
// add to index
|
|
|
|
final int s = this.urlFileIndex.size();
|
|
|
|
final int s = this.urlFileIndex.size();
|
|
|
|
this.urlFileIndex.put(entry.toRow());
|
|
|
|
this.urlFileIndex.put(entry.toRow());
|
|
|
|
assert s < this.urlFileIndex.size() : "hash = " + new String(hash) + ", s = " + s + ", size = " + this.urlFileIndex.size();
|
|
|
|
assert s < this.urlFileIndex.size() : "hash = " + new String(hash) + ", s = " + s + ", size = " + this.urlFileIndex.size();
|
|
|
|
assert this.urlFileIndex.has(hash) : "hash = " + new String(hash);
|
|
|
|
assert this.urlFileIndex.has(hash) : "hash = " + new String(hash);
|
|
|
|
|
|
|
|
|
|
|
@ -288,15 +294,15 @@ public class Balancer {
|
|
|
|
return;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
domainList.remove(urlhash);
|
|
|
|
domainList.remove(urlhash);
|
|
|
|
if (domainList.size() == 0) domainStacks.remove(host);
|
|
|
|
if (domainList.isEmpty()) domainStacks.remove(host);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
private byte[] nextFromDelayed() {
|
|
|
|
private byte[] nextFromDelayed() {
|
|
|
|
if (this.delayed.isEmpty()) return null;
|
|
|
|
if (this.delayed.isEmpty()) return null;
|
|
|
|
final Long first = this.delayed.firstKey();
|
|
|
|
final Long first = this.delayed.firstKey();
|
|
|
|
if (first.longValue() < System.currentTimeMillis()) {
|
|
|
|
if (first.longValue() < System.currentTimeMillis()) {
|
|
|
|
return this.delayed.remove(first);
|
|
|
|
return this.delayed.remove(first);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return null;
|
|
|
|
return null;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
@ -465,37 +471,37 @@ public class Balancer {
|
|
|
|
byte[] besturlhash = null;
|
|
|
|
byte[] besturlhash = null;
|
|
|
|
String besthost = null;
|
|
|
|
String besthost = null;
|
|
|
|
while (i.hasNext()) {
|
|
|
|
while (i.hasNext()) {
|
|
|
|
entry = i.next();
|
|
|
|
entry = i.next();
|
|
|
|
|
|
|
|
|
|
|
|
// clean up empty entries
|
|
|
|
// clean up empty entries
|
|
|
|
if (entry.getValue().isEmpty()) {
|
|
|
|
if (entry.getValue().isEmpty()) {
|
|
|
|
i.remove();
|
|
|
|
i.remove();
|
|
|
|
continue;
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
byte[] n = entry.getValue().removeOne();
|
|
|
|
byte[] n = entry.getValue().removeOne();
|
|
|
|
if (n == null) continue;
|
|
|
|
if (n == null) continue;
|
|
|
|
if (delay) {
|
|
|
|
if (delay) {
|
|
|
|
final long w = Latency.waitingRemainingGuessed(entry.getKey(), minimumLocalDelta, minimumGlobalDelta);
|
|
|
|
final long w = Latency.waitingRemainingGuessed(entry.getKey(), minimumLocalDelta, minimumGlobalDelta);
|
|
|
|
if (w > maximumwaiting) {
|
|
|
|
if (w > maximumwaiting) {
|
|
|
|
if (w < smallestWaiting) {
|
|
|
|
if (w < smallestWaiting) {
|
|
|
|
smallestWaiting = w;
|
|
|
|
smallestWaiting = w;
|
|
|
|
besturlhash = n;
|
|
|
|
besturlhash = n;
|
|
|
|
besthost = entry.getKey();
|
|
|
|
besthost = entry.getKey();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
entry.getValue().put(n); // put entry back
|
|
|
|
entry.getValue().put(n); // put entry back
|
|
|
|
continue;
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
this.top.add(n);
|
|
|
|
this.top.add(n);
|
|
|
|
if (entry.getValue().isEmpty()) i.remove();
|
|
|
|
if (entry.getValue().isEmpty()) i.remove();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// if we could not find any entry, then take the best we have seen so far
|
|
|
|
// if we could not find any entry, then take the best we have seen so far
|
|
|
|
if (acceptonebest && !this.top.isEmpty() && besturlhash != null) {
|
|
|
|
if (acceptonebest && !this.top.isEmpty() && besturlhash != null) {
|
|
|
|
removeHashFromDomainStacks(besthost, besturlhash);
|
|
|
|
removeHashFromDomainStacks(besthost, besturlhash);
|
|
|
|
this.top.add(besturlhash);
|
|
|
|
this.top.add(besturlhash);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
@ -525,21 +531,21 @@ public class Balancer {
|
|
|
|
this.domStackInitSize = this.domainStacks.size();
|
|
|
|
this.domStackInitSize = this.domainStacks.size();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
public ArrayList<Request> top(int count) {
|
|
|
|
public List<Request> top(int count) {
|
|
|
|
final ArrayList<Request> cel = new ArrayList<Request>();
|
|
|
|
final List<Request> cel = new ArrayList<Request>();
|
|
|
|
if (count == 0) return cel;
|
|
|
|
if (count == 0) return cel;
|
|
|
|
byte[][] ta = new byte[Math.min(count, top.size())][];
|
|
|
|
byte[][] ta = new byte[Math.min(count, top.size())][];
|
|
|
|
ta = top.toArray(ta);
|
|
|
|
ta = top.toArray(ta);
|
|
|
|
for (byte[] n: ta) {
|
|
|
|
for (final byte[] n: ta) {
|
|
|
|
if (n == null) break;
|
|
|
|
if (n == null) break;
|
|
|
|
try {
|
|
|
|
try {
|
|
|
|
final Row.Entry rowEntry = urlFileIndex.get(n);
|
|
|
|
final Row.Entry rowEntry = urlFileIndex.get(n);
|
|
|
|
if (rowEntry == null) continue;
|
|
|
|
if (rowEntry == null) continue;
|
|
|
|
final Request crawlEntry = new Request(rowEntry);
|
|
|
|
final Request crawlEntry = new Request(rowEntry);
|
|
|
|
cel.add(crawlEntry);
|
|
|
|
cel.add(crawlEntry);
|
|
|
|
count--;
|
|
|
|
count--;
|
|
|
|
if (count <= 0) break;
|
|
|
|
if (count <= 0) break;
|
|
|
|
} catch (IOException e) {}
|
|
|
|
} catch (IOException e) {}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
int depth = 0;
|
|
|
|
int depth = 0;
|
|
|
@ -565,7 +571,7 @@ public class Balancer {
|
|
|
|
|
|
|
|
|
|
|
|
if (cel.size() < count) try {
|
|
|
|
if (cel.size() < count) try {
|
|
|
|
List<Row.Entry> list = urlFileIndex.top(count - cel.size());
|
|
|
|
List<Row.Entry> list = urlFileIndex.top(count - cel.size());
|
|
|
|
for (Row.Entry entry: list) cel.add(new Request(entry));
|
|
|
|
for (final Row.Entry entry: list) cel.add(new Request(entry));
|
|
|
|
} catch (IOException e) { }
|
|
|
|
} catch (IOException e) { }
|
|
|
|
return cel;
|
|
|
|
return cel;
|
|
|
|
}
|
|
|
|
}
|
|
|
|