|
|
|
@ -77,7 +77,7 @@ public class Balancer {
|
|
|
|
|
// create a stack for newly entered entries
|
|
|
|
|
if (!(cachePath.exists())) cachePath.mkdir(); // make the path
|
|
|
|
|
cacheStacksPath.mkdirs();
|
|
|
|
|
File f = new File(cacheStacksPath, stackname + indexSuffix);
|
|
|
|
|
final File f = new File(cacheStacksPath, stackname + indexSuffix);
|
|
|
|
|
try {
|
|
|
|
|
urlFileIndex = new Table(f, Request.rowdef, EcoFSBufferSize, 0, useTailCache, exceed134217727);
|
|
|
|
|
} catch (RowSpaceExceededException e) {
|
|
|
|
@ -177,7 +177,7 @@ public class Balancer {
|
|
|
|
|
assert urlFileIndex.size() + removedCounter == s : "urlFileIndex.size() = " + urlFileIndex.size() + ", s = " + s;
|
|
|
|
|
|
|
|
|
|
// iterate through the top list
|
|
|
|
|
Iterator<String> j = top.iterator();
|
|
|
|
|
final Iterator<String> j = top.iterator();
|
|
|
|
|
String urlhash;
|
|
|
|
|
while (j.hasNext()) {
|
|
|
|
|
urlhash = j.next();
|
|
|
|
@ -186,7 +186,7 @@ public class Balancer {
|
|
|
|
|
|
|
|
|
|
// remove from delayed
|
|
|
|
|
synchronized (this.delayed) {
|
|
|
|
|
Iterator<Map.Entry<Long, String>> k = this.delayed.entrySet().iterator();
|
|
|
|
|
final Iterator<Map.Entry<Long, String>> k = this.delayed.entrySet().iterator();
|
|
|
|
|
while (k.hasNext()) {
|
|
|
|
|
if (urlHashes.contains(k.next().getValue())) k.remove();
|
|
|
|
|
}
|
|
|
|
@ -199,7 +199,7 @@ public class Balancer {
|
|
|
|
|
while (q.hasNext()) {
|
|
|
|
|
se = q.next();
|
|
|
|
|
stack = se.getValue();
|
|
|
|
|
Iterator<String> i = stack.iterator();
|
|
|
|
|
final Iterator<String> i = stack.iterator();
|
|
|
|
|
while (i.hasNext()) {
|
|
|
|
|
if (urlHashes.contains(i.next())) i.remove();
|
|
|
|
|
}
|
|
|
|
@ -240,7 +240,7 @@ public class Balancer {
|
|
|
|
|
|
|
|
|
|
public void push(final Request entry) throws IOException, RowSpaceExceededException {
|
|
|
|
|
assert entry != null;
|
|
|
|
|
String hash = entry.url().hash();
|
|
|
|
|
final String hash = entry.url().hash();
|
|
|
|
|
synchronized (this) {
|
|
|
|
|
if (urlFileIndex.has(hash.getBytes())) {
|
|
|
|
|
//Log.logWarning("BALANCER", "double-check has failed for urlhash " + entry.url().hash() + " in " + stackname + " - fixed");
|
|
|
|
@ -248,7 +248,7 @@ public class Balancer {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// add to index
|
|
|
|
|
int s = urlFileIndex.size();
|
|
|
|
|
final int s = urlFileIndex.size();
|
|
|
|
|
urlFileIndex.put(entry.toRow());
|
|
|
|
|
assert s < urlFileIndex.size() : "hash = " + hash;
|
|
|
|
|
assert urlFileIndex.has(hash.getBytes()) : "hash = " + hash;
|
|
|
|
@ -258,7 +258,7 @@ public class Balancer {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void pushHashToDomainStacks(final String hash, int maxstacksize) {
|
|
|
|
|
private void pushHashToDomainStacks(final String hash, final int maxstacksize) {
|
|
|
|
|
// extend domain stack
|
|
|
|
|
final String dom = hash.substring(6);
|
|
|
|
|
LinkedList<String> domainList = domainStacks.get(dom);
|
|
|
|
@ -276,9 +276,9 @@ public class Balancer {
|
|
|
|
|
private void removeHashFromDomainStacks(final String hash) {
|
|
|
|
|
// extend domain stack
|
|
|
|
|
final String dom = hash.substring(6);
|
|
|
|
|
LinkedList<String> domainList = domainStacks.get(dom);
|
|
|
|
|
final LinkedList<String> domainList = domainStacks.get(dom);
|
|
|
|
|
if (domainList == null) return;
|
|
|
|
|
Iterator<String> i = domainList.iterator();
|
|
|
|
|
final Iterator<String> i = domainList.iterator();
|
|
|
|
|
while (i.hasNext()) {
|
|
|
|
|
if (i.next().equals(hash)) {
|
|
|
|
|
i.remove();
|
|
|
|
@ -289,7 +289,7 @@ public class Balancer {
|
|
|
|
|
|
|
|
|
|
private String nextFromDelayed() {
|
|
|
|
|
if (this.delayed.isEmpty()) return null;
|
|
|
|
|
Long first = this.delayed.firstKey();
|
|
|
|
|
final Long first = this.delayed.firstKey();
|
|
|
|
|
if (first.longValue() < System.currentTimeMillis()) {
|
|
|
|
|
return this.delayed.remove(first);
|
|
|
|
|
}
|
|
|
|
@ -298,7 +298,7 @@ public class Balancer {
|
|
|
|
|
|
|
|
|
|
private String anyFromDelayed() {
|
|
|
|
|
if (this.delayed.isEmpty()) return null;
|
|
|
|
|
Long first = this.delayed.firstKey();
|
|
|
|
|
final Long first = this.delayed.firstKey();
|
|
|
|
|
return this.delayed.remove(first);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -376,7 +376,7 @@ public class Balancer {
|
|
|
|
|
|
|
|
|
|
// at this point we must check if the crawlEntry has relevancy because the crawl profile still exists
|
|
|
|
|
// if not: return null. A calling method must handle the null value and try again
|
|
|
|
|
CrawlProfile.entry profileEntry = (profile == null) ? null : profile.getEntry(crawlEntry.profileHandle());
|
|
|
|
|
final CrawlProfile.entry profileEntry = (profile == null) ? null : profile.getEntry(crawlEntry.profileHandle());
|
|
|
|
|
if (profileEntry == null) {
|
|
|
|
|
Log.logWarning("Balancer", "no profile entry for handle " + crawlEntry.profileHandle());
|
|
|
|
|
return null;
|
|
|
|
@ -435,7 +435,7 @@ public class Balancer {
|
|
|
|
|
return crawlEntry;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void filltop(boolean delay, long maximumwaiting, boolean acceptonebest) {
|
|
|
|
|
private void filltop(final boolean delay, final long maximumwaiting, final boolean acceptonebest) {
|
|
|
|
|
if (!this.top.isEmpty()) return;
|
|
|
|
|
|
|
|
|
|
//System.out.println("*** DEBUG started filltop delay=" + ((delay) ? "true":"false") + ", maximumwaiting=" + maximumwaiting + ", acceptonebest=" + ((acceptonebest) ? "true":"false"));
|
|
|
|
@ -448,7 +448,7 @@ public class Balancer {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// iterate over the domain stacks
|
|
|
|
|
Iterator<Map.Entry<String, LinkedList<String>>> i = this.domainStacks.entrySet().iterator();
|
|
|
|
|
final Iterator<Map.Entry<String, LinkedList<String>>> i = this.domainStacks.entrySet().iterator();
|
|
|
|
|
Map.Entry<String, LinkedList<String>> entry;
|
|
|
|
|
long smallestWaiting = Long.MAX_VALUE;
|
|
|
|
|
String besthash = null;
|
|
|
|
@ -463,7 +463,7 @@ public class Balancer {
|
|
|
|
|
|
|
|
|
|
String n = entry.getValue().getFirst();
|
|
|
|
|
if (delay) {
|
|
|
|
|
long w = Latency.waitingRemainingGuessed(n, minimumLocalDelta, minimumGlobalDelta);
|
|
|
|
|
final long w = Latency.waitingRemainingGuessed(n, minimumLocalDelta, minimumGlobalDelta);
|
|
|
|
|
if (w > maximumwaiting) {
|
|
|
|
|
if (w < smallestWaiting) {
|
|
|
|
|
smallestWaiting = w;
|
|
|
|
@ -485,12 +485,12 @@ public class Balancer {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void fillDomainStacks(int maxdomstacksize) throws IOException {
|
|
|
|
|
private void fillDomainStacks(final int maxdomstacksize) throws IOException {
|
|
|
|
|
if (!this.domainStacks.isEmpty() && System.currentTimeMillis() - lastDomainStackFill < 120000L) return;
|
|
|
|
|
this.domainStacks.clear();
|
|
|
|
|
//synchronized (this.delayed) { delayed.clear(); }
|
|
|
|
|
this.lastDomainStackFill = System.currentTimeMillis();
|
|
|
|
|
CloneableIterator<byte[]> i = this.urlFileIndex.keys(true, null);
|
|
|
|
|
final CloneableIterator<byte[]> i = this.urlFileIndex.keys(true, null);
|
|
|
|
|
while (i.hasNext()) {
|
|
|
|
|
pushHashToDomainStacks(new String(i.next()), 50);
|
|
|
|
|
if (this.domainStacks.size() > maxdomstacksize) break;
|
|
|
|
@ -501,12 +501,12 @@ public class Balancer {
|
|
|
|
|
|
|
|
|
|
public ArrayList<Request> top(int count) {
|
|
|
|
|
count = Math.min(count, top.size());
|
|
|
|
|
ArrayList<Request> cel = new ArrayList<Request>();
|
|
|
|
|
final ArrayList<Request> cel = new ArrayList<Request>();
|
|
|
|
|
if (count == 0) return cel;
|
|
|
|
|
synchronized (this) {
|
|
|
|
|
for (String n: top) {
|
|
|
|
|
try {
|
|
|
|
|
Row.Entry rowEntry = urlFileIndex.get(n.getBytes());
|
|
|
|
|
final Row.Entry rowEntry = urlFileIndex.get(n.getBytes());
|
|
|
|
|
if (rowEntry == null) continue;
|
|
|
|
|
final Request crawlEntry = new Request(rowEntry);
|
|
|
|
|
cel.add(crawlEntry);
|
|
|
|
|