|
|
|
@ -52,7 +52,7 @@ public class Balancer {
|
|
|
|
|
private static final int objectIndexBufferSize = 1000;
|
|
|
|
|
|
|
|
|
|
// class variables
|
|
|
|
|
private final ConcurrentHashMap<String, LinkedList<byte[]>> domainStacks; // a map from domain name hash part to Lists with url hashs
|
|
|
|
|
private final ConcurrentHashMap<String, LinkedList<byte[]>> domainStacks; // a map from host name to lists with url hashs
|
|
|
|
|
private final ConcurrentLinkedQueue<byte[]> top;
|
|
|
|
|
private final TreeMap<Long, byte[]> delayed;
|
|
|
|
|
private BufferedObjectIndex urlFileIndex;
|
|
|
|
@ -70,7 +70,7 @@ public class Balancer {
|
|
|
|
|
final boolean useTailCache,
|
|
|
|
|
final boolean exceed134217727) {
|
|
|
|
|
this.cacheStacksPath = cachePath;
|
|
|
|
|
this.domainStacks = new ConcurrentHashMap<String, LinkedList<byte[]>>();
|
|
|
|
|
this.domainStacks = new ConcurrentHashMap<String, LinkedList<byte[]>>();
|
|
|
|
|
this.top = new ConcurrentLinkedQueue<byte[]>();
|
|
|
|
|
this.delayed = new TreeMap<Long, byte[]>();
|
|
|
|
|
this.minimumLocalDelta = minimumLocalDelta;
|
|
|
|
@ -233,9 +233,8 @@ public class Balancer {
|
|
|
|
|
private boolean domainStacksNotEmpty() {
|
|
|
|
|
if (domainStacks == null) return false;
|
|
|
|
|
synchronized (domainStacks) {
|
|
|
|
|
final Iterator<LinkedList<byte[]>> i = domainStacks.values().iterator();
|
|
|
|
|
while (i.hasNext()) {
|
|
|
|
|
if (!i.next().isEmpty()) return true;
|
|
|
|
|
for (LinkedList<byte[]> l: domainStacks.values()) {
|
|
|
|
|
if (!l.isEmpty()) return true;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return false;
|
|
|
|
@ -256,33 +255,31 @@ public class Balancer {
|
|
|
|
|
assert urlFileIndex.has(hash) : "hash = " + new String(hash);
|
|
|
|
|
|
|
|
|
|
// add the hash to a queue
|
|
|
|
|
pushHashToDomainStacks(entry.url().hash(), 50);
|
|
|
|
|
pushHashToDomainStacks(entry.url().getHost(), entry.url().hash(), 50);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void pushHashToDomainStacks(final byte[] hash, final int maxstacksize) {
|
|
|
|
|
private void pushHashToDomainStacks(final String host, final byte[] urlhash, final int maxstacksize) {
|
|
|
|
|
// extend domain stack
|
|
|
|
|
final String dom = new String(hash).substring(6);
|
|
|
|
|
LinkedList<byte[]> domainList = domainStacks.get(dom);
|
|
|
|
|
LinkedList<byte[]> domainList = domainStacks.get(host);
|
|
|
|
|
if (domainList == null) {
|
|
|
|
|
// create new list
|
|
|
|
|
domainList = new LinkedList<byte[]>();
|
|
|
|
|
domainList.add(hash);
|
|
|
|
|
domainStacks.put(dom, domainList);
|
|
|
|
|
domainList.add(urlhash);
|
|
|
|
|
domainStacks.put(host, domainList);
|
|
|
|
|
} else {
|
|
|
|
|
// extend existent domain list
|
|
|
|
|
if (domainList.size() < maxstacksize) domainList.addLast(hash);
|
|
|
|
|
if (domainList.size() < maxstacksize) domainList.addLast(urlhash);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void removeHashFromDomainStacks(final byte[] hash) {
|
|
|
|
|
private void removeHashFromDomainStacks(final String host, final byte[] urlhash) {
|
|
|
|
|
// extend domain stack
|
|
|
|
|
final String dom = new String(hash).substring(6);
|
|
|
|
|
final LinkedList<byte[]> domainList = domainStacks.get(dom);
|
|
|
|
|
final LinkedList<byte[]> domainList = domainStacks.get(host);
|
|
|
|
|
if (domainList == null) return;
|
|
|
|
|
final Iterator<byte[]> i = domainList.iterator();
|
|
|
|
|
while (i.hasNext()) {
|
|
|
|
|
if (Base64Order.enhancedCoder.equal(i.next(), hash)) {
|
|
|
|
|
if (Base64Order.enhancedCoder.equal(i.next(), urlhash)) {
|
|
|
|
|
i.remove();
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
@ -403,7 +400,7 @@ public class Balancer {
|
|
|
|
|
}
|
|
|
|
|
try {
|
|
|
|
|
this.urlFileIndex.put(rowEntry);
|
|
|
|
|
this.domainStacks.remove(new String(nexthash).substring(6));
|
|
|
|
|
this.domainStacks.remove(crawlEntry.url().getHost());
|
|
|
|
|
failhash = nexthash;
|
|
|
|
|
} catch (RowSpaceExceededException e) {
|
|
|
|
|
Log.logException(e);
|
|
|
|
@ -452,8 +449,9 @@ public class Balancer {
|
|
|
|
|
// iterate over the domain stacks
|
|
|
|
|
final Iterator<Map.Entry<String, LinkedList<byte[]>>> i = this.domainStacks.entrySet().iterator();
|
|
|
|
|
Map.Entry<String, LinkedList<byte[]>> entry;
|
|
|
|
|
//long smallestWaiting = Long.MAX_VALUE;
|
|
|
|
|
byte[] besthash = null;
|
|
|
|
|
long smallestWaiting = Long.MAX_VALUE;
|
|
|
|
|
byte[] besturlhash = null;
|
|
|
|
|
String besthost = null;
|
|
|
|
|
while (i.hasNext()) {
|
|
|
|
|
entry = i.next();
|
|
|
|
|
|
|
|
|
@ -465,28 +463,28 @@ public class Balancer {
|
|
|
|
|
|
|
|
|
|
byte[] n = entry.getValue().getFirst();
|
|
|
|
|
if (n == null) continue;
|
|
|
|
|
/*
|
|
|
|
|
besthost = entry.getKey();
|
|
|
|
|
if (delay) {
|
|
|
|
|
final long w = Latency.waitingRemainingGuessed(n, minimumLocalDelta, minimumGlobalDelta);
|
|
|
|
|
final long w = Latency.waitingRemainingGuessed(besthost, minimumLocalDelta, minimumGlobalDelta);
|
|
|
|
|
if (w > maximumwaiting) {
|
|
|
|
|
if (w < smallestWaiting) {
|
|
|
|
|
smallestWaiting = w;
|
|
|
|
|
besthash = n;
|
|
|
|
|
besturlhash = n;
|
|
|
|
|
besthost = entry.getKey();
|
|
|
|
|
}
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
//System.out.println("*** accepting " + n + " : " + w);
|
|
|
|
|
}
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
n = entry.getValue().removeFirst();
|
|
|
|
|
this.top.add(n);
|
|
|
|
|
if (entry.getValue().isEmpty()) i.remove();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// if we could not find any entry, then take the best we have seen so far
|
|
|
|
|
if (acceptonebest && !this.top.isEmpty() && besthash != null) {
|
|
|
|
|
removeHashFromDomainStacks(besthash);
|
|
|
|
|
this.top.add(besthash);
|
|
|
|
|
if (acceptonebest && !this.top.isEmpty() && besturlhash != null) {
|
|
|
|
|
removeHashFromDomainStacks(besthost, besturlhash);
|
|
|
|
|
this.top.add(besturlhash);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -497,8 +495,14 @@ public class Balancer {
|
|
|
|
|
this.lastDomainStackFill = System.currentTimeMillis();
|
|
|
|
|
final HandleSet handles = this.urlFileIndex.keysFromBuffer(objectIndexBufferSize / 2);
|
|
|
|
|
final CloneableIterator<byte[]> i = handles.keys(true, null);
|
|
|
|
|
byte[] handle;
|
|
|
|
|
String host;
|
|
|
|
|
Request request;
|
|
|
|
|
while (i.hasNext()) {
|
|
|
|
|
pushHashToDomainStacks(i.next(), 1000);
|
|
|
|
|
handle = i.next();
|
|
|
|
|
request = new Request(this.urlFileIndex.get(handle));
|
|
|
|
|
host = request.url().getHost();
|
|
|
|
|
pushHashToDomainStacks(host, handle, 1000);
|
|
|
|
|
if (this.domainStacks.size() > maxdomstacksize) break;
|
|
|
|
|
}
|
|
|
|
|
Log.logInfo("BALANCER", "re-fill of domain stacks; fileIndex.size() = " + this.urlFileIndex.size() + ", domainStacks.size = " + domainStacks.size() + ", collection time = " + (System.currentTimeMillis() - this.lastDomainStackFill) + " ms");
|
|
|
|
|