From dcdea2d02f815bcece1713cd2ef8c56c39f889db Mon Sep 17 00:00:00 2001 From: luccioman Date: Thu, 29 Sep 2016 10:33:11 +0200 Subject: [PATCH] Fixed shutdown for crawler.MaxActiveThreads value greater than 200 Shutdown was hanging in CrawlQueues.close() at this.workerQueue.put(POISON_REQUEST) when the config value crawler.MaxActiveThreads was greater than 200. Revealed by the "Collision" thread dumps in Mantis issue 689 (http://mantis.tokeek.de/view.php?id=689#c1312). Fixed the inconsistency between this.worker.length and the this.workerQueue capacity, and made the process more reliable by using the timed offer() function instead of the blocking put(). --- source/net/yacy/crawler/data/CrawlQueues.java | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/source/net/yacy/crawler/data/CrawlQueues.java b/source/net/yacy/crawler/data/CrawlQueues.java index 025b5b112..35c244df1 100644 --- a/source/net/yacy/crawler/data/CrawlQueues.java +++ b/source/net/yacy/crawler/data/CrawlQueues.java @@ -88,7 +88,9 @@ public class CrawlQueues { this.sb = sb; final int maxWorkers = (int) sb.getConfigLong(SwitchboardConstants.CRAWLER_THREADS_ACTIVE_MAX, 10); this.worker = new Loader[maxWorkers]; - this.workerQueue = new ArrayBlockingQueue(200); + /* We initialize workerQueue with the same capacity as worker array, because this same queue + * will be used to send POISON_REQUEST items consumed by all eventually running workers in the close() function*/ + this.workerQueue = new ArrayBlockingQueue(maxWorkers); this.remoteCrawlProviderHashes = null; // start crawling management @@ -128,11 +130,19 @@ public class CrawlQueues { } public synchronized void close() { + /* We close first the noticeURL because it is used to fill the workerQueue.*/ + this.noticeURL.close(); // removed pending requests this.workerQueue.clear(); // wait for all workers to finish - for (int i = 0; i < this.worker.length; i++) { - try {this.workerQueue.put(POISON_REQUEST);} catch (InterruptedException e) {} + for (int i = 0; i < 
this.workerQueue.remainingCapacity(); i++) { + /* We use the non-blocking offer() function instead of the blocking put() in the unlikely eventual case another thread + * added an element to workerQueue during this loop.*/ + try { + this.workerQueue.offer(POISON_REQUEST, 1, TimeUnit.SECONDS); + } catch (InterruptedException e) { + CrawlQueues.log.warn("Interrupted while adding POISON_REQUEST to the workerQueue"); + } } for (final Loader w: this.worker) { if (w != null && w.isAlive()) { @@ -140,11 +150,10 @@ public class CrawlQueues { w.join(1000); if (w.isAlive()) w.interrupt(); } catch (final InterruptedException e) { - ConcurrentLog.logException(e); + CrawlQueues.log.warn("Interrupted while waiting for worker termination."); } } } - this.noticeURL.close(); if (this.delegatedURL != null) this.delegatedURL.clear(); }