Fixed shutdown for crawler.MaxActiveThreads value greater than 200

Shutdown was hanging in CrawlQueues.close() at
this.workerQueue.put(POISON_REQUEST) when config value
crawler.MaxActiveThreads was greater than 200.

Revealed by "Collision" Threads dumps in mantis 689
(http://mantis.tokeek.de/view.php?id=689#c1312)

Fixed consistency between this.worker.length and this.workerQueue
capacity, and made the process more reliable using non-blocking offer()
function.
pull/77/head
luccioman 8 years ago
parent ada473ced2
commit dcdea2d02f

@ -88,7 +88,9 @@ public class CrawlQueues {
this.sb = sb;
final int maxWorkers = (int) sb.getConfigLong(SwitchboardConstants.CRAWLER_THREADS_ACTIVE_MAX, 10);
this.worker = new Loader[maxWorkers];
this.workerQueue = new ArrayBlockingQueue<Request>(200);
/* We initialize workerQueue with the same capacity as worker array, because this same queue
* will be used to send POISON_REQUEST items consumed by all eventually running workers in the close() function*/
this.workerQueue = new ArrayBlockingQueue<Request>(maxWorkers);
this.remoteCrawlProviderHashes = null;
// start crawling management
@ -128,11 +130,19 @@ public class CrawlQueues {
}
public synchronized void close() {
/* We close first the noticeURL because it is used to fill the workerQueue.*/
this.noticeURL.close();
// removed pending requests
this.workerQueue.clear();
// wait for all workers to finish
for (int i = 0; i < this.worker.length; i++) {
try {this.workerQueue.put(POISON_REQUEST);} catch (InterruptedException e) {}
for (int i = 0; i < this.workerQueue.remainingCapacity(); i++) {
/* We use the non-blocking offer() function instead of the blocking put() in the unlikely eventual case another thread
* added an element to workerQueue during this loop.*/
try {
this.workerQueue.offer(POISON_REQUEST, 1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
CrawlQueues.log.warn("Interrupted while adding POISON_REQUEST to the workerQueue");
}
}
for (final Loader w: this.worker) {
if (w != null && w.isAlive()) {
@ -140,11 +150,10 @@ public class CrawlQueues {
w.join(1000);
if (w.isAlive()) w.interrupt();
} catch (final InterruptedException e) {
ConcurrentLog.logException(e);
CrawlQueues.log.warn("Interrupted while waiting for worker termination.");
}
}
}
this.noticeURL.close();
if (this.delegatedURL != null) this.delegatedURL.clear();
}

Loading…
Cancel
Save