|
|
|
@ -55,9 +55,10 @@ public class Balancer {
|
|
|
|
|
|
|
|
|
|
// class variables
|
|
|
|
|
private final ConcurrentHashMap<String, HandleSet> domainStacks; // a map from host name to lists with url hashs
|
|
|
|
|
private final ConcurrentLinkedQueue<byte[]> top;
|
|
|
|
|
private final ConcurrentLinkedQueue<byte[]> top; // a list of url-hashes that shall be taken next
|
|
|
|
|
private final TreeMap<Long, byte[]> delayed;
|
|
|
|
|
private final HandleSet ddc;
|
|
|
|
|
private final HandleSet double_push_check; // for debugging
|
|
|
|
|
private BufferedObjectIndex urlFileIndex;
|
|
|
|
|
private final File cacheStacksPath;
|
|
|
|
|
private long minimumLocalDelta;
|
|
|
|
@ -80,6 +81,7 @@ public class Balancer {
|
|
|
|
|
this.minimumGlobalDelta = minimumGlobalDelta;
|
|
|
|
|
this.domStackInitSize = Integer.MAX_VALUE;
|
|
|
|
|
this.ddc = new HandleSet(URIMetadataRow.rowdef.primaryKeyLength, URIMetadataRow.rowdef.objectOrder, 0);
|
|
|
|
|
this.double_push_check = new HandleSet(URIMetadataRow.rowdef.primaryKeyLength, URIMetadataRow.rowdef.objectOrder, 0);
|
|
|
|
|
|
|
|
|
|
// create a stack for newly entered entries
|
|
|
|
|
if (!(cachePath.exists())) cachePath.mkdir(); // make the path
|
|
|
|
@ -245,7 +247,13 @@ public class Balancer {
|
|
|
|
|
assert entry != null;
|
|
|
|
|
final byte[] hash = entry.url().hash();
|
|
|
|
|
synchronized (this) {
|
|
|
|
|
if (this.urlFileIndex.has(hash) || this.ddc.has(hash)) return;
|
|
|
|
|
// double-check
|
|
|
|
|
if (this.double_push_check.has(hash) || this.ddc.has(hash) || this.urlFileIndex.has(hash)) {
|
|
|
|
|
//Log.logSevere("Balancer", "double push: " + new String(hash));
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
if (this.double_push_check.size() > 10000) this.double_push_check.clear();
|
|
|
|
|
this.double_push_check.put(hash);
|
|
|
|
|
|
|
|
|
|
// add to index
|
|
|
|
|
final int s = this.urlFileIndex.size();
|
|
|
|
@ -355,7 +363,7 @@ public class Balancer {
|
|
|
|
|
//final int s = urlFileIndex.size();
|
|
|
|
|
Row.Entry rowEntry = (nexthash == null) ? null : urlFileIndex.remove(nexthash);
|
|
|
|
|
if (rowEntry == null) {
|
|
|
|
|
//System.out.println("*** rowEntry=null, nexthash=" + nexthash);
|
|
|
|
|
System.out.println("*** rowEntry=null, nexthash=" + nexthash);
|
|
|
|
|
rowEntry = urlFileIndex.removeOne();
|
|
|
|
|
if (rowEntry == null) {
|
|
|
|
|
nexthash = null;
|
|
|
|
@ -496,6 +504,7 @@ public class Balancer {
|
|
|
|
|
private void fillDomainStacks() throws IOException {
|
|
|
|
|
if (!this.domainStacks.isEmpty() && System.currentTimeMillis() - lastDomainStackFill < 120000L) return;
|
|
|
|
|
this.domainStacks.clear();
|
|
|
|
|
this.top.clear();
|
|
|
|
|
this.lastDomainStackFill = System.currentTimeMillis();
|
|
|
|
|
final HandleSet handles = this.urlFileIndex.keysFromBuffer(objectIndexBufferSize / 2);
|
|
|
|
|
final CloneableIterator<byte[]> i = handles.keys(true, null);
|
|
|
|
|