From 2bc6199408b00eb367409935fcedd0c3230aa70d Mon Sep 17 00:00:00 2001 From: orbiter Date: Thu, 30 Oct 2014 21:52:52 +0100 Subject: [PATCH] more concurrency for postprocessing --- .../solr/connector/AbstractSolrConnector.java | 54 +++++++++---------- 1 file changed, 27 insertions(+), 27 deletions(-) diff --git a/source/net/yacy/cora/federate/solr/connector/AbstractSolrConnector.java b/source/net/yacy/cora/federate/solr/connector/AbstractSolrConnector.java index 30f3aaa28..bb0cc98a3 100644 --- a/source/net/yacy/cora/federate/solr/connector/AbstractSolrConnector.java +++ b/source/net/yacy/cora/federate/solr/connector/AbstractSolrConnector.java @@ -163,38 +163,38 @@ public abstract class AbstractSolrConnector implements SolrConnector { final String ... fields) { assert buffersize > 0; if (!prefetchIDs) return concurrentDocumentsByQueryNoPrefetch(querystring, sort, offset, maxcount, maxtime, buffersize, concurrency, fields); - final BlockingQueue idQueue = concurrentIDsByQuery(querystring, sort, offset, maxcount, maxtime, Math.min(maxcount, 10000000), 1); - final BlockingQueue queue = buffersize <= 0 ? new LinkedBlockingQueue() : new ArrayBlockingQueue(buffersize); + final BlockingQueue idQueue = concurrentIDsByQuery(querystring, sort, offset, maxcount, maxtime, Math.min(maxcount, 10000000), concurrency); + final BlockingQueue queue = buffersize <= 0 ? new LinkedBlockingQueue() : new ArrayBlockingQueue(Math.max(buffersize, concurrency)); final long endtime = maxtime == Long.MAX_VALUE ? Long.MAX_VALUE : System.currentTimeMillis() + maxtime; // we know infinity! - final Thread t = new Thread() { - @Override - public void run() { - this.setName("AbstractSolrConnector:concurrentDocumentsByQueryWithPrefetch(" + querystring + ")"); - String nextID; - try { - while (System.currentTimeMillis() < endtime && (nextID = idQueue.take()) != AbstractSolrConnector.POISON_ID) { - - try { - SolrDocument d = getDocumentById(nextID, fields); - // document may be null if another process has deleted the document meanwhile - // in case that the document is absent then, we silently ignore that case - if (d != null) try {queue.put(d);} catch (final InterruptedException e) {} - } catch (final SolrException | IOException e) { - ConcurrentLog.logException(e); - // fail - ConcurrentLog.severe("AbstractSolrConnector", "aborted concurrentDocumentsByQuery: " + e.getMessage()); - break; + final Thread[] t = new Thread[concurrency]; + for (int i = 0; i < concurrency; i++) { + t[i] = new Thread() { + @Override + public void run() { + this.setName("AbstractSolrConnector:concurrentDocumentsByQueryWithPrefetch(" + querystring + ")"); + String nextID; + try { + while (System.currentTimeMillis() < endtime && (nextID = idQueue.take()) != AbstractSolrConnector.POISON_ID) { + try { + SolrDocument d = getDocumentById(nextID, fields); + // document may be null if another process has deleted the document meanwhile + // in case that the document is absent then, we silently ignore that case + if (d != null) try {queue.put(d);} catch (final InterruptedException e) {} + } catch (final SolrException | IOException e) { + ConcurrentLog.logException(e); + // fail + ConcurrentLog.severe("AbstractSolrConnector", "aborted concurrentDocumentsByQuery: " + e.getMessage()); + break; + } } + } catch (InterruptedException e) { + ConcurrentLog.severe("AbstractSolrConnector", "interrupted concurrentDocumentsByQuery: " + e.getMessage()); } - } catch (InterruptedException e) { - ConcurrentLog.severe("AbstractSolrConnector", "interrupted concurrentDocumentsByQuery: " + e.getMessage()); - } - for (int i = 0; i < Math.max(1, concurrency); i++) { try {queue.put(AbstractSolrConnector.POISON_DOCUMENT);} catch (final InterruptedException e1) {} } - } - }; - t.start(); + }; + t[i].start(); + } return queue; }