|
|
|
@ -36,6 +36,7 @@ import java.util.Iterator;
|
|
|
|
|
import java.util.Map;
|
|
|
|
|
import java.util.concurrent.ArrayBlockingQueue;
|
|
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
|
|
|
|
import net.yacy.cora.document.encoding.ASCII;
|
|
|
|
|
import net.yacy.cora.document.encoding.UTF8;
|
|
|
|
@ -102,6 +103,8 @@ public class CrawlQueues {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public synchronized void close() {
|
|
|
|
|
// removed pending requests
|
|
|
|
|
this.workerQueue.clear();
|
|
|
|
|
// wait for all workers to finish
|
|
|
|
|
for (int i = 0; i < this.worker.length; i++) {
|
|
|
|
|
try {this.workerQueue.put(POISON_REQUEST);} catch (InterruptedException e) {}
|
|
|
|
@ -124,7 +127,6 @@ public class CrawlQueues {
|
|
|
|
|
// wait for all workers to finish
|
|
|
|
|
this.workerQueue.clear();
|
|
|
|
|
for (final Loader w: this.worker) w.interrupt();
|
|
|
|
|
for (final Loader w: this.worker) try {w.join(10);} catch (final InterruptedException e1) {}
|
|
|
|
|
this.remoteCrawlProviderHashes.clear();
|
|
|
|
|
this.noticeURL.clear();
|
|
|
|
|
this.delegatedURL.clear();
|
|
|
|
@ -615,7 +617,8 @@ public class CrawlQueues {
|
|
|
|
|
public void run() {
|
|
|
|
|
this.setPriority(Thread.MIN_PRIORITY); // http requests from the crawler should not cause that other functions work worse
|
|
|
|
|
try {
|
|
|
|
|
while ((request = CrawlQueues.this.workerQueue.take()) != POISON_REQUEST) {
|
|
|
|
|
while ((request = CrawlQueues.this.workerQueue.poll(10, TimeUnit.SECONDS)) != POISON_REQUEST) {
|
|
|
|
|
if (request == null) break; // we run this only for a specific time and then let the process die to clear up resources
|
|
|
|
|
request.setStatus("worker-initialized", WorkflowJob.STATUS_INITIATED);
|
|
|
|
|
this.setName("CrawlQueues.Loader(" + request.url() + ")");
|
|
|
|
|
CrawlProfile profile = CrawlQueues.this.sb.crawler.get(UTF8.getBytes(request.profileHandle()));
|
|
|
|
@ -672,7 +675,9 @@ public class CrawlQueues {
|
|
|
|
|
request.setStatus("worker-exception", WorkflowJob.STATUS_FINISHED);
|
|
|
|
|
} finally {
|
|
|
|
|
request = null;
|
|
|
|
|
this.setName("CrawlQueues.Loader(WAITING)");
|
|
|
|
|
}
|
|
|
|
|
profile = null;
|
|
|
|
|
}
|
|
|
|
|
} catch (InterruptedException e2) {
|
|
|
|
|
ConcurrentLog.logException(e2);
|
|
|
|
|