|
|
|
@ -71,10 +71,31 @@ public class HostBalancer implements Balancer {
|
|
|
|
|
private final Set<String> roundRobinHostHashes;
|
|
|
|
|
private final int onDemandLimit;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Create a new instance and asynchronously fills the queue by scanning the hostsPath directory.
|
|
|
|
|
* @param hostsPath path with persisted hosts queues
|
|
|
|
|
* @param onDemandLimit
|
|
|
|
|
* @param exceed134217727
|
|
|
|
|
*/
|
|
|
|
|
public HostBalancer(
|
|
|
|
|
final File hostsPath,
|
|
|
|
|
final int onDemandLimit,
|
|
|
|
|
final boolean exceed134217727) {
|
|
|
|
|
this(hostsPath, onDemandLimit, exceed134217727, true);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Create a new instance and fills the queue by scanning the hostsPath directory.
|
|
|
|
|
* @param hostsPath
|
|
|
|
|
* @param onDemandLimit
|
|
|
|
|
* @param exceed134217727
|
|
|
|
|
* @param asyncInit when true, queue filling from file system is launched asynchronously
|
|
|
|
|
*/
|
|
|
|
|
public HostBalancer(
|
|
|
|
|
final File hostsPath,
|
|
|
|
|
final int onDemandLimit,
|
|
|
|
|
final boolean exceed134217727,
|
|
|
|
|
final boolean asyncInit) {
|
|
|
|
|
this.hostsPath = hostsPath;
|
|
|
|
|
this.onDemandLimit = onDemandLimit;
|
|
|
|
|
this.exceed134217727 = exceed134217727;
|
|
|
|
@ -83,38 +104,50 @@ public class HostBalancer implements Balancer {
|
|
|
|
|
if (!(hostsPath.exists())) hostsPath.mkdirs(); // make the path
|
|
|
|
|
this.queues = new ConcurrentHashMap<String, HostQueue>();
|
|
|
|
|
this.roundRobinHostHashes = new HashSet<String>();
|
|
|
|
|
init(); // return without wait but starts a thread to fill the queues
|
|
|
|
|
init(asyncInit); // return without wait but starts a thread to fill the queues
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* fills the queue by scanning the hostsPath directory in a thread to
|
|
|
|
|
* Fills the queue by scanning the hostsPath directory.
|
|
|
|
|
* @param async when true, launch in a dedicated thread to
|
|
|
|
|
* return immediately (as large unfinished crawls may take longer to load)
|
|
|
|
|
*/
|
|
|
|
|
private void init() {
|
|
|
|
|
Thread t = new Thread("HostBalancer.init") {
|
|
|
|
|
@Override
|
|
|
|
|
public void run() {
|
|
|
|
|
final String[] hostlist = hostsPath.list();
|
|
|
|
|
for (String hoststr : hostlist) {
|
|
|
|
|
try {
|
|
|
|
|
File queuePath = new File(hostsPath, hoststr);
|
|
|
|
|
HostQueue queue = new HostQueue(queuePath, queues.size() > onDemandLimit, exceed134217727);
|
|
|
|
|
if (queue.isEmpty()) {
|
|
|
|
|
queue.close();
|
|
|
|
|
FileUtils.deletedelete(queuePath);
|
|
|
|
|
} else {
|
|
|
|
|
queues.put(queue.getHostHash(), queue);
|
|
|
|
|
}
|
|
|
|
|
} catch (MalformedURLException | RuntimeException e) {
|
|
|
|
|
log.warn("delete queue due to init error for " + hostsPath.getName() + " host=" + hoststr + " " + e.getLocalizedMessage());
|
|
|
|
|
// if exception thrown we can't init the queue, maybe due to name violation. That won't get better, delete it.
|
|
|
|
|
FileUtils.deletedelete(new File(hostsPath, hoststr));
|
|
|
|
|
}
|
|
|
|
|
private void init(final boolean async) {
|
|
|
|
|
if(async) {
|
|
|
|
|
Thread t = new Thread("HostBalancer.init") {
|
|
|
|
|
@Override
|
|
|
|
|
public void run() {
|
|
|
|
|
runInit();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
t.start();
|
|
|
|
|
} else {
|
|
|
|
|
runInit();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
t.start();
|
|
|
|
|
/**
|
|
|
|
|
* Fills the queue by scanning the hostsPath directory.
|
|
|
|
|
*/
|
|
|
|
|
private void runInit() {
|
|
|
|
|
final String[] hostlist = hostsPath.list();
|
|
|
|
|
for (String hoststr : hostlist) {
|
|
|
|
|
try {
|
|
|
|
|
File queuePath = new File(hostsPath, hoststr);
|
|
|
|
|
HostQueue queue = new HostQueue(queuePath, queues.size() > onDemandLimit, exceed134217727);
|
|
|
|
|
if (queue.isEmpty()) {
|
|
|
|
|
queue.close();
|
|
|
|
|
FileUtils.deletedelete(queuePath);
|
|
|
|
|
} else {
|
|
|
|
|
queues.put(queue.getHostHash(), queue);
|
|
|
|
|
}
|
|
|
|
|
} catch (MalformedURLException | RuntimeException e) {
|
|
|
|
|
log.warn("delete queue due to init error for " + hostsPath.getName() + " host=" + hoststr + " " + e.getLocalizedMessage());
|
|
|
|
|
// if exception thrown we can't init the queue, maybe due to name violation. That won't get better, delete it.
|
|
|
|
|
FileUtils.deletedelete(new File(hostsPath, hoststr));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@ -196,6 +229,11 @@ public class HostBalancer implements Balancer {
|
|
|
|
|
return c;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* @return true when the URL is queued is this or any other HostBalancer
|
|
|
|
|
* instance (as {@link #depthCache} is shared between all HostBalancer
|
|
|
|
|
* instances)
|
|
|
|
|
*/
|
|
|
|
|
@Override
|
|
|
|
|
public boolean has(final byte[] urlhashb) {
|
|
|
|
|
if (depthCache.has(urlhashb)) return true;
|
|
|
|
|