From 9a7fe9e0d103fb750d0eec85dccb4d6b50bd9536 Mon Sep 17 00:00:00 2001
From: Michael Peter Christen
Date: Fri, 31 Oct 2014 23:17:56 +0100
Subject: [PATCH 1/3] fix for bad timing computation in postprocessing

---
 source/net/yacy/search/schema/CollectionConfiguration.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/source/net/yacy/search/schema/CollectionConfiguration.java b/source/net/yacy/search/schema/CollectionConfiguration.java
index f849a0337..6cb589df2 100644
--- a/source/net/yacy/search/schema/CollectionConfiguration.java
+++ b/source/net/yacy/search/schema/CollectionConfiguration.java
@@ -1268,6 +1268,7 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
         ReversibleScoreMap<String> partitioning = partitioningFacet.get(partitioningKey);
         long emptyCount = collectionConnector.getCountByQuery("-" + partitioningKey + ":[* TO *] AND (" + collection1query + ")");
         if (emptyCount > 0) partitioning.inc("", (int) emptyCount);
+        final long start = System.currentTimeMillis();
         for (String partitioningValue: partitioning) {
             String partitioningQuery = (partitioningValue.length() == 0) ?
                     "-" + partitioningKey + ":[* TO *] AND (" + collection1query + ")" :
@@ -1275,7 +1276,6 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
             postprocessingActivity = "collecting " + partitioning.get(partitioningValue) + " documents from partition \"" + partitioningValue + "\" (averall " + count + ") from the collection for harvestkey " + harvestkey + ", partitioned by " + partitioningKey;
 
             // start collection of documents
-            final long start = System.currentTimeMillis();
             final int concurrency = Math.max(1, Math.min((int) (MemoryControl.available() / (100L * 1024L * 1024L)), Runtime.getRuntime().availableProcessors()));
             //final int concurrency = 1;
             final boolean reference_computation = this.contains(CollectionSchema.references_i) &&

From 92007e5d2d39d2778177f1877458a1958cd6c7d2 Mon Sep 17 00:00:00 2001
From: Michael Peter Christen
Date: Sun, 2 Nov 2014 12:52:23 +0100
Subject: [PATCH 2/3] more enhancements to postprocessing speed

---
 .../solr/connector/AbstractSolrConnector.java | 203 +++++++++++++-----
 .../ConcurrentUpdateSolrConnector.java        |  19 ++
 .../solr/connector/SolrConnector.java         |  55 ++++-
 .../schema/CollectionConfiguration.java       |  49 ++---
 4 files changed, 240 insertions(+), 86 deletions(-)

diff --git a/source/net/yacy/cora/federate/solr/connector/AbstractSolrConnector.java b/source/net/yacy/cora/federate/solr/connector/AbstractSolrConnector.java
index df7f9ecc5..b9db0e3c7 100644
--- a/source/net/yacy/cora/federate/solr/connector/AbstractSolrConnector.java
+++ b/source/net/yacy/cora/federate/solr/connector/AbstractSolrConnector.java
@@ -138,7 +138,7 @@ public abstract class AbstractSolrConnector implements SolrConnector {
     }
 
     /**
-     * Get a query result from solr as a stream of documents.
+     * Get results from a solr query as a stream of documents.
      * The result queue is considered as terminated if AbstractSolrConnector.POISON_DOCUMENT is returned.
     * The method returns immediately and feeds the search results into the queue
     * @param querystring the solr query string
@@ -148,7 +148,9 @@ public abstract class AbstractSolrConnector implements SolrConnector {
     * @param maxtime the maximum time in milliseconds
     * @param buffersize the size of an ArrayBlockingQueue; if <= 0 then a LinkedBlockingQueue is used
     * @param concurrency is the number of AbstractSolrConnector.POISON_DOCUMENT entries to add at the end of the feed
-     * @return a blocking queue which is terminated with AbstractSolrConnector.POISON_DOCUMENT as last element
+     * @param prefetchIDs if true, then first all IDs are fetched and then all documents are queried by the ID. If false then documents are retrieved directly
+     * @param fields list of fields
+     * @return a blocking queue which is terminated with AbstractSolrConnector.POISON_DOCUMENT as last element
      */
     @Override
     public BlockingQueue<SolrDocument> concurrentDocumentsByQuery(
@@ -161,17 +163,52 @@ public abstract class AbstractSolrConnector implements SolrConnector {
             final int concurrency,
             final boolean prefetchIDs,
             final String ... fields) {
+        List<String> querystrings = new ArrayList<>(1);
+        querystrings.add(querystring);
+        return concurrentDocumentsByQueries(querystrings, sort, offset, maxcount, maxtime, buffersize, concurrency, prefetchIDs, fields);
+    }
+
+    /**
+     * Get results from solr queries as a stream of documents.
+     * The result queue is considered as terminated if AbstractSolrConnector.POISON_DOCUMENT is returned.
+     * The method returns immediately and feeds the search results into the queue
+     * @param querystrings the list of solr query strings
+     * @param sort the solr sort string, may be null to be not used
+     * @param offset first result offset
+     * @param maxcount the maximum number of results
+     * @param maxtime the maximum time in milliseconds
+     * @param buffersize the size of an ArrayBlockingQueue; if <= 0 then a LinkedBlockingQueue is used
+     * @param concurrency is the number of AbstractSolrConnector.POISON_DOCUMENT entries to add at the end of the feed
+     * @param prefetchIDs if true, then first all IDs are fetched and then all documents are queried by the ID. If false then documents are retrieved directly
+     * @param fields list of fields
+     * @return a blocking queue which is terminated with AbstractSolrConnector.POISON_DOCUMENT as last element
+     */
+    @Override
+    public BlockingQueue<SolrDocument> concurrentDocumentsByQueries(
+            final List<String> querystrings,
+            final String sort,
+            final int offset,
+            final int maxcount,
+            final long maxtime,
+            final int buffersize,
+            final int concurrency,
+            final boolean prefetchIDs,
+            final String ... fields) {
         assert buffersize > 0;
-        if (!prefetchIDs) return concurrentDocumentsByQueryNoPrefetch(querystring, sort, offset, maxcount, maxtime, buffersize, concurrency, fields);
-        final BlockingQueue<String> idQueue = concurrentIDsByQuery(querystring, sort, offset, maxcount, maxtime, Math.min(maxcount, 10000000), concurrency);
+        if (!prefetchIDs) return concurrentDocumentsByQueriesNoPrefetch(querystrings, sort, offset, maxcount, maxtime, buffersize, concurrency, fields);
        final BlockingQueue<SolrDocument> queue = buffersize <= 0 ? new LinkedBlockingQueue<SolrDocument>() : new ArrayBlockingQueue<SolrDocument>(Math.max(buffersize, concurrency));
-        final long endtime = maxtime == Long.MAX_VALUE ? Long.MAX_VALUE : System.currentTimeMillis() + maxtime; // we know infinity!
+ if (querystrings.size() == 0) { + for (int i = 0; i < Math.max(1, concurrency); i++) try {queue.put(AbstractSolrConnector.POISON_DOCUMENT);} catch (final InterruptedException e1) {} + return queue; + } + final BlockingQueue idQueue = concurrentIDsByQueries(querystrings, sort, offset, maxcount, maxtime, Math.min(maxcount, 10000000), concurrency); + final long endtime = maxtime < 0 || maxtime == Long.MAX_VALUE ? Long.MAX_VALUE : System.currentTimeMillis() + maxtime; // we know infinity! final Thread[] t = new Thread[concurrency]; - for (int i = 0; i < concurrency; i++) { + for (int i = 0; i < Math.max(1, concurrency); i++) { t[i] = new Thread() { @Override public void run() { - this.setName("AbstractSolrConnector:concurrentDocumentsByQueryWithPrefetch(" + querystring + ")"); + this.setName("AbstractSolrConnector:concurrentDocumentsByQueriesWithPrefetch(" + querystrings.size() + " queries, first: " + querystrings.iterator().next() + ")"); String nextID; try { while (System.currentTimeMillis() < endtime && (nextID = idQueue.take()) != AbstractSolrConnector.POISON_ID) { @@ -197,9 +234,9 @@ public abstract class AbstractSolrConnector implements SolrConnector { } return queue; } - - private BlockingQueue concurrentDocumentsByQueryNoPrefetch( - final String querystring, + + private BlockingQueue concurrentDocumentsByQueriesNoPrefetch( + final List querystrings, final String sort, final int offset, final int maxcount, @@ -209,43 +246,52 @@ public abstract class AbstractSolrConnector implements SolrConnector { final String ... fields) { assert buffersize > 0; final BlockingQueue queue = buffersize <= 0 ? new LinkedBlockingQueue() : new ArrayBlockingQueue(buffersize); - final long endtime = maxtime == Long.MAX_VALUE ? Long.MAX_VALUE : System.currentTimeMillis() + maxtime; // we know infinity! - final int ps = Math.min(pagesize_docs, buffersize); - final int maxretries = 60; + if (querystrings.size() == 0) { + for (int i = 0; i < Math.max(1, concurrency); i++) try {queue.put(AbstractSolrConnector.POISON_DOCUMENT);} catch (final InterruptedException e1) {} + return queue; + } + final long endtime = maxtime < 0 || maxtime == Long.MAX_VALUE ? Long.MAX_VALUE : System.currentTimeMillis() + maxtime; // we know infinity! + final int ps = buffersize < 0 ? 
pagesize_docs : Math.min(pagesize_docs, buffersize); + final int maxretries = 6; final Thread t = new Thread() { @Override public void run() { - this.setName("AbstractSolrConnector:concurrentDocumentsByQueryNoPrefetch(" + querystring + ")"); - int o = offset; - int count = 0; - int retry = 0; - loop: while (System.currentTimeMillis() < endtime && count < maxcount) { - try { - SolrDocumentList sdl = getDocumentListByQuery(querystring, sort, o, Math.min(maxcount, ps), fields); - for (SolrDocument d: sdl) { - try {queue.put(d);} catch (final InterruptedException e) {break;} - count++; - } - if (sdl.size() < ps) { - //System.out.println("sdl.size() = " + sdl.size() + ", pagesize = " + pagesize); - break loop; // finished - } - o += sdl.size(); - retry = 0; - } catch (final SolrException | IOException e) { - ConcurrentLog.logException(e); - if (retry++ < maxretries) { - // remote Solr may be temporary down, so we wait a bit - try {Thread.sleep(100);} catch (InterruptedException e1) {} - continue loop; + try { + for (String querystring: querystrings) { + this.setName("AbstractSolrConnector:concurrentDocumentsByQueryNoPrefetch(" + querystring + ")"); + int o = offset; + int count = 0; + int retry = 0; + loop: while (System.currentTimeMillis() < endtime && count < maxcount) { + try { + SolrDocumentList sdl = getDocumentListByQuery(querystring, sort, o, Math.min(maxcount, ps), fields); + for (SolrDocument d: sdl) { + try {queue.put(d);} catch (final InterruptedException e) {break;} + count++; + } + if (sdl.size() < ps) { + //System.out.println("sdl.size() = " + sdl.size() + ", pagesize = " + pagesize); + break loop; // finished + } + o += sdl.size(); + retry = 0; + } catch (final SolrException | IOException e) { + ConcurrentLog.logException(e); + if (retry++ < maxretries) { + // remote Solr may be temporary down, so we wait a bit + try {Thread.sleep(100);} catch (InterruptedException e1) {} + continue loop; + } + // fail + ConcurrentLog.severe("AbstractSolrConnector", "aborted concurrentDocumentsByQueryNoPrefetch after " + maxretries + " retries: " + e.getMessage()); + break; + } } - // fail - ConcurrentLog.severe("AbstractSolrConnector", "aborted concurrentDocumentsByQueryNoPrefetch after " + maxretries + " retries: " + e.getMessage()); - break; } - } - for (int i = 0; i < Math.max(1, concurrency); i++) { - try {queue.put(AbstractSolrConnector.POISON_DOCUMENT);} catch (final InterruptedException e1) {} + } catch (Throwable e) {} finally { + for (int i = 0; i < Math.max(1, concurrency); i++) { + try {queue.put(AbstractSolrConnector.POISON_DOCUMENT);} catch (final InterruptedException e1) {} + } } } }; @@ -253,6 +299,18 @@ public abstract class AbstractSolrConnector implements SolrConnector { return queue; } + /** + * get a document id result stream from a solr query. + * The result queue is considered as terminated if AbstractSolrConnector.POISON_ID is returned. 
+ * The method returns immediately and feeds the search results into the queue + * @param querystring + * @param sort the solr sort string, may be null to be not used + * @param offset + * @param maxcount + * @param buffersize the size of an ArrayBlockingQueue; if <= 0 then a LinkedBlockingQueue is used + * @param concurrency is the number of AbstractSolrConnector.POISON_ID entries to add at the end of the feed + * @return a list of ids in q blocking queue which is terminated with a number of AbstractSolrConnector.POISON_ID + */ @Override public BlockingQueue concurrentIDsByQuery( final String querystring, @@ -262,28 +320,57 @@ public abstract class AbstractSolrConnector implements SolrConnector { final long maxtime, final int buffersize, final int concurrency) { + List querystrings = new ArrayList<>(1); + querystrings.add(querystring); + return concurrentIDsByQueries(querystrings, sort, offset, maxcount, maxtime, buffersize, concurrency); + } + + /** + * get a document id result stream from a set of solr queries. + * The result queue is considered as terminated if AbstractSolrConnector.POISON_ID is returned. + * The method returns immediately and feeds the search results into the queue + * @param querystring a list of query strings + * @param sort the solr sort string, may be null to be not used + * @param offset common offset of all queries + * @param maxcount maximum count for each query + * @param buffersize the size of an ArrayBlockingQueue; if <= 0 then a LinkedBlockingQueue is used + * @param concurrency is the number of AbstractSolrConnector.POISON_ID entries to add at the end of the feed + * @return a list of ids in q blocking queue which is terminated with a number of AbstractSolrConnector.POISON_ID + */ + @Override + public BlockingQueue concurrentIDsByQueries( + final List querystrings, + final String sort, + final int offset, + final int maxcount, + final long maxtime, + final int buffersize, + final int concurrency) { final BlockingQueue queue = buffersize <= 0 ? new LinkedBlockingQueue() : new ArrayBlockingQueue(buffersize); - final long endtime = maxtime == Long.MAX_VALUE ? Long.MAX_VALUE : System.currentTimeMillis() + maxtime; // we know infinity! + final long endtime = maxtime < 0 || maxtime == Long.MAX_VALUE ? Long.MAX_VALUE : System.currentTimeMillis() + maxtime; // we know infinity! final Thread t = new Thread() { @Override public void run() { - this.setName("AbstractSolrConnector:concurrentIDsByQuery(" + querystring + ")"); - int o = offset; try { - while (System.currentTimeMillis() < endtime) { - try { - SolrDocumentList sdl = getDocumentListByQuery(querystring, sort, o, Math.min(maxcount, pagesize_ids), CollectionSchema.id.getSolrFieldName()); - int count = 0; - for (SolrDocument d: sdl) { - try {queue.put((String) d.getFieldValue(CollectionSchema.id.getSolrFieldName()));} catch (final InterruptedException e) {break;} - count++; + for (String querystring: querystrings) { + this.setName("AbstractSolrConnector:concurrentIDsByQueries(" + querystring + ")"); + int o = offset; + while (System.currentTimeMillis() < endtime) { + try { + SolrDocumentList sdl = getDocumentListByQuery(querystring, sort, o, maxcount < 0 ? 
pagesize_ids : Math.min(maxcount, pagesize_ids), CollectionSchema.id.getSolrFieldName()); + int count = 0; + for (SolrDocument d: sdl) { + try {queue.put((String) d.getFieldValue(CollectionSchema.id.getSolrFieldName()));} catch (final InterruptedException e) {break;} + count++; + } + if (count < pagesize_ids) break; + o += count; + if (o > maxcount && maxcount > 0) break; + } catch (final SolrException e) { + break; + } catch (final IOException e) { + break; } - if (count < pagesize_ids) break; - o += pagesize_ids; - } catch (final SolrException e) { - break; - } catch (final IOException e) { - break; } } } catch (Throwable e) {} finally { diff --git a/source/net/yacy/cora/federate/solr/connector/ConcurrentUpdateSolrConnector.java b/source/net/yacy/cora/federate/solr/connector/ConcurrentUpdateSolrConnector.java index 9bf6f29a2..f6c91203a 100644 --- a/source/net/yacy/cora/federate/solr/connector/ConcurrentUpdateSolrConnector.java +++ b/source/net/yacy/cora/federate/solr/connector/ConcurrentUpdateSolrConnector.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.util.Collection; import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; @@ -364,11 +365,28 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector { return this.connector.concurrentDocumentsByQuery(querystring, sort, offset, maxcount, maxtime, buffersize, concurrency, prefetchIDs, fields); } + @Override + public BlockingQueue concurrentDocumentsByQueries( + List querystrings, String sort, int offset, int maxcount, + long maxtime, int buffersize, int concurrency, boolean prefetchIDs, + String... fields) { + commitDocBuffer(); + return this.connector.concurrentDocumentsByQueries(querystrings, sort, offset, maxcount, maxtime, buffersize, concurrency, prefetchIDs, fields); + } + @Override public BlockingQueue concurrentIDsByQuery(String querystring, String sort, int offset, int maxcount, long maxtime, int buffersize, final int concurrency) { commitDocBuffer(); return this.connector.concurrentIDsByQuery(querystring, sort, offset, maxcount, maxtime, buffersize, concurrency); } + + @Override + public BlockingQueue concurrentIDsByQueries( + List querystrings, String sort, int offset, int maxcount, + long maxtime, int buffersize, int concurrency) { + commitDocBuffer(); + return this.connector.concurrentIDsByQueries(querystrings, sort, offset, maxcount, maxtime, buffersize, concurrency); + } @Override public void update(final SolrInputDocument solrdoc) throws IOException, SolrException { @@ -381,4 +399,5 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector { commitDocBuffer(); this.connector.update(solrdoc); } + } diff --git a/source/net/yacy/cora/federate/solr/connector/SolrConnector.java b/source/net/yacy/cora/federate/solr/connector/SolrConnector.java index f4c1dfd0f..527668456 100644 --- a/source/net/yacy/cora/federate/solr/connector/SolrConnector.java +++ b/source/net/yacy/cora/federate/solr/connector/SolrConnector.java @@ -22,6 +22,7 @@ package net.yacy.cora.federate.solr.connector; import java.io.IOException; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; @@ -223,7 +224,7 @@ public interface SolrConnector extends Iterable /* Iterable of document public Map> getFacets(String query, int maxresults, final String ... fields) throws IOException; /** - * Get a query result from solr as a stream of documents. 
+ * Get results from a solr query as a stream of documents. * The result queue is considered as terminated if AbstractSolrConnector.POISON_DOCUMENT is returned. * The method returns immediately and feeds the search results into the queue * @param querystring the solr query string @@ -235,7 +236,7 @@ public interface SolrConnector extends Iterable /* Iterable of document * @param concurrency is the number of AbstractSolrConnector.POISON_DOCUMENT entries to add at the end of the feed * @param prefetchIDs if true, then first all IDs are fetched and then all documents are queries by the ID. If false then documents are retrieved directly * @param fields list of fields - * @return a blocking queue which is terminated with AbstractSolrConnector.POISON_DOCUMENT as last element + * @return a blocking queue which is terminated with AbstractSolrConnector.POISON_DOCUMENT as last element */ public BlockingQueue concurrentDocumentsByQuery( final String querystring, @@ -247,7 +248,33 @@ public interface SolrConnector extends Iterable /* Iterable of document final int concurrency, final boolean prefetchIDs, final String ... fields); - + + /** + * Get results from solr queries as a stream of documents. + * The result queue is considered as terminated if AbstractSolrConnector.POISON_DOCUMENT is returned. + * The method returns immediately and feeds the search results into the queue + * @param querystrings the list of solr query strings + * @param sort the solr sort string, may be null to be not used + * @param offset first result offset + * @param maxcount the maximum number of results + * @param maxtime the maximum time in milliseconds + * @param buffersize the size of an ArrayBlockingQueue; if <= 0 then a LinkedBlockingQueue is used + * @param concurrency is the number of AbstractSolrConnector.POISON_DOCUMENT entries to add at the end of the feed + * @param prefetchIDs if true, then first all IDs are fetched and then all documents are queries by the ID. If false then documents are retrieved directly + * @param fields list of fields + * @return a blocking queue which is terminated with AbstractSolrConnector.POISON_DOCUMENT as last element + */ + public BlockingQueue concurrentDocumentsByQueries( + final List querystrings, + final String sort, + final int offset, + final int maxcount, + final long maxtime, + final int buffersize, + final int concurrency, + final boolean prefetchIDs, + final String ... fields); + /** * get a document id result stream from a solr query. * The result queue is considered as terminated if AbstractSolrConnector.POISON_ID is returned. @@ -258,7 +285,7 @@ public interface SolrConnector extends Iterable /* Iterable of document * @param maxcount * @param buffersize the size of an ArrayBlockingQueue; if <= 0 then a LinkedBlockingQueue is used * @param concurrency is the number of AbstractSolrConnector.POISON_ID entries to add at the end of the feed - * @return + * @return a list of ids in q blocking queue which is terminated with a number of AbstractSolrConnector.POISON_ID */ public BlockingQueue concurrentIDsByQuery( final String querystring, @@ -269,4 +296,24 @@ public interface SolrConnector extends Iterable /* Iterable of document final int buffersize, final int concurrency); + /** + * get a document id result stream from a set of solr queries. + * The result queue is considered as terminated if AbstractSolrConnector.POISON_ID is returned. 
+ * The method returns immediately and feeds the search results into the queue + * @param querystring a list of query strings + * @param sort the solr sort string, may be null to be not used + * @param offset common offset of all queries + * @param maxcount maximum count for each query + * @param buffersize the size of an ArrayBlockingQueue; if <= 0 then a LinkedBlockingQueue is used + * @param concurrency is the number of AbstractSolrConnector.POISON_ID entries to add at the end of the feed + * @return a list of ids in q blocking queue which is terminated with a number of AbstractSolrConnector.POISON_ID + */ + public BlockingQueue concurrentIDsByQueries( + final List querystrings, + final String sort, + final int offset, + final int maxcount, + final long maxtime, + final int buffersize, + final int concurrency); } diff --git a/source/net/yacy/search/schema/CollectionConfiguration.java b/source/net/yacy/search/schema/CollectionConfiguration.java index 6cb589df2..37453486c 100644 --- a/source/net/yacy/search/schema/CollectionConfiguration.java +++ b/source/net/yacy/search/schema/CollectionConfiguration.java @@ -1264,17 +1264,20 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri // partitioning of the index, get a facet for a partitioning key final long count = collectionConnector.getCountByQuery(collection1query); String partitioningKey = CollectionSchema.responsetime_i.getSolrFieldName(); - Map> partitioningFacet = collectionConnector.getFacets(collection1query, 100000, partitioningKey); - ReversibleScoreMap partitioning = partitioningFacet.get(partitioningKey); - long emptyCount = collectionConnector.getCountByQuery("-" + partitioningKey + ":[* TO *] AND (" + collection1query + ")"); - if (emptyCount > 0) partitioning.inc("", (int) emptyCount); - final long start = System.currentTimeMillis(); - for (String partitioningValue: partitioning) { - String partitioningQuery = (partitioningValue.length() == 0) ? - "-" + partitioningKey + ":[* TO *] AND (" + collection1query + ")" : - partitioningKey + ":" + partitioningValue + " AND (" + collection1query + ")"; - postprocessingActivity = "collecting " + partitioning.get(partitioningValue) + " documents from partition \"" + partitioningValue + "\" (averall " + count + ") from the collection for harvestkey " + harvestkey + ", partitioned by " + partitioningKey; - + postprocessingActivity = "collecting " + count + " documents from the collection for harvestkey " + harvestkey + ", partitioned by " + partitioningKey; + if (count > 0) { + Map> partitioningFacet = collectionConnector.getFacets(collection1query, 100000, partitioningKey); + ReversibleScoreMap partitioning = partitioningFacet.get(partitioningKey); + long emptyCount = collectionConnector.getCountByQuery("-" + partitioningKey + ":[* TO *] AND (" + collection1query + ")"); + if (emptyCount > 0) partitioning.inc("", (int) emptyCount); + final long start = System.currentTimeMillis(); + List querystrings = new ArrayList<>(partitioning.size()); + for (String partitioningValue: partitioning) { + String partitioningQuery = (partitioningValue.length() == 0) ? 
+ "-" + partitioningKey + ":[* TO *] AND (" + collection1query + ")" : + partitioningKey + ":" + partitioningValue + " AND (" + collection1query + ")"; + querystrings.add(partitioningQuery); + } // start collection of documents final int concurrency = Math.max(1, Math.min((int) (MemoryControl.available() / (100L * 1024L * 1024L)), Runtime.getRuntime().availableProcessors())); //final int concurrency = 1; @@ -1283,8 +1286,8 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri this.contains(CollectionSchema.references_external_i) && this.contains(CollectionSchema.references_exthosts_i); ConcurrentLog.info("CollectionConfiguration", postprocessingActivity); - final BlockingQueue docs = collectionConnector.concurrentDocumentsByQuery( - partitioningQuery, + final BlockingQueue docs = collectionConnector.concurrentDocumentsByQueries( + querystrings, (this.contains(CollectionSchema.http_unique_b) || this.contains(CollectionSchema.www_unique_b)) ? CollectionSchema.host_subdomain_s.getSolrFieldName() + " asc," + // sort on subdomain to get hosts without subdomain first; that gives an opportunity to set www_unique_b flag to false CollectionSchema.url_protocol_s.getSolrFieldName() + " asc" // sort on protocol to get http before https; that gives an opportunity to set http_unique_b flag to false @@ -1415,19 +1418,17 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri } // wait for termination for (int rewrite_start = 0; rewrite_start < concurrency; rewrite_start++) rewriteThread[rewrite_start].join(); + + if (failids.size() > 0) { + ConcurrentLog.info("CollectionConfiguration", "cleanup_processing: deleting " + failids.size() + " documents which have permanent execution fails"); + collectionConnector.deleteByIds(failids); + } + if (count != countcheck.get()) ConcurrentLog.warn("CollectionConfiguration", "ambiguous collection document count for harvestkey " + harvestkey + ": expected=" + count + ", counted=" + countcheck + "; countquery=" + collection1query); // big gap for harvestkey = null + ConcurrentLog.info("CollectionConfiguration", "cleanup_processing: re-calculated " + proccount + " new documents, " + + proccount_referencechange + " reference-count changes, " + + proccount_citationchange + " citation ranking changes."); } - if (failids.size() > 0) { - ConcurrentLog.info("CollectionConfiguration", "cleanup_processing: deleting " + failids.size() + " documents which have permanent execution fails"); - collectionConnector.deleteByIds(failids); - } - if (count != countcheck.get()) ConcurrentLog.warn("CollectionConfiguration", "ambiguous collection document count for harvestkey " + harvestkey + ": expected=" + count + ", counted=" + countcheck + "; countquery=" + collection1query); // big gap for harvestkey = null - ConcurrentLog.info("CollectionConfiguration", "cleanup_processing: re-calculated " + proccount + " new documents, " + - proccount_referencechange + " reference-count changes, " + - proccount_citationchange + " citation ranking changes."); - - - } catch (final InterruptedException e2) { ConcurrentLog.warn("CollectionConfiguration", e2.getMessage(), e2); } catch (IOException e3) { From fe8b1d137d3cfb1cc05b6d4b51abc94ba53f2c93 Mon Sep 17 00:00:00 2001 From: Michael Peter Christen Date: Sun, 2 Nov 2014 13:28:10 +0100 Subject: [PATCH 3/3] emergency bugfix for 100% CPU in image drawing --- source/net/yacy/visualization/ChartPlotter.java | 1 + 1 file changed, 1 insertion(+) diff --git 
a/source/net/yacy/visualization/ChartPlotter.java b/source/net/yacy/visualization/ChartPlotter.java
index 58af8e5d1..da4816176 100644
--- a/source/net/yacy/visualization/ChartPlotter.java
+++ b/source/net/yacy/visualization/ChartPlotter.java
@@ -159,6 +159,7 @@ public class ChartPlotter extends RasterPlotter {
     private void drawVerticalScale(final boolean left, final int scale, final int pixelperscale, final int offset, final Long colorNaming, final Long colorScale, final String name) {
         assert pixelperscale > 0;
         assert scale > 0;
+        if (pixelperscale <= 0) return; // this would not meet the termination condition in the while loop
         final int x = (left) ? this.leftborder : this.width - this.rightborder;
         int y = this.height - this.bottomborder;
         int s = offset;
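
The guard added in PATCH 3/3 matters because drawVerticalScale advances a y coordinate by pixelperscale per iteration until it leaves the chart area; with a step of zero the loop makes no progress and spins forever, which is the 100% CPU symptom named in the commit message. The following self-contained schematic shows that failure mode; the method name, parameters and loop bound are illustrative and not copied from ChartPlotter:

    /** Schematic of the bug fixed in PATCH 3/3; names and the loop bound are hypothetical. */
    public class ScaleLoopSketch {
        static void drawScale(final int height, final int bottomborder, final int topborder, final int pixelperscale) {
            if (pixelperscale <= 0) return; // the added guard: a non-positive step can never reach the bound
            int y = height - bottomborder;  // start at the bottom of the chart
            while (y > topborder) {         // hypothetical termination condition
                // draw one horizontal scale line and its label at y ...
                y -= pixelperscale;         // without the guard, a step of 0 keeps y constant forever
            }
        }
    }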
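For the streaming API added in PATCH 2/3, concurrentDocumentsByQueries feeds the results of a whole list of queries into one BlockingQueue and keeps the poison-pill contract of the single-query variant: the producer appends as many POISON_DOCUMENT markers as there are feeding threads (the concurrency argument), so a consumer has to count that many sentinels before it stops reading. Below is a minimal consumer sketch, assuming a SolrConnector instance is available; the query strings, limits, timeout and buffer size are illustrative values, not taken from the patches:

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;

    import org.apache.solr.common.SolrDocument;

    import net.yacy.cora.federate.solr.connector.AbstractSolrConnector;
    import net.yacy.cora.federate.solr.connector.SolrConnector;

    public class QueryStreamSketch {

        /** Drains the poison-terminated stream and returns the number of documents seen. */
        public static long drain(final SolrConnector connector) throws InterruptedException {
            final List<String> queries = Arrays.asList("text_t:yacy", "text_t:solr"); // hypothetical partition queries
            final int concurrency = Runtime.getRuntime().availableProcessors();
            final BlockingQueue<SolrDocument> docs = connector.concurrentDocumentsByQueries(
                    queries,
                    null,               // no sort
                    0,                  // offset
                    10000,              // maxcount: maximum number of results
                    60000L,             // maxtime: give up after one minute
                    100,                // buffersize > 0 selects an ArrayBlockingQueue
                    concurrency,        // also the number of POISON_DOCUMENT markers at the end of the feed
                    true);              // prefetch IDs first, then load documents concurrently
            long seen = 0;
            int poisoned = 0;
            while (poisoned < concurrency) {
                final SolrDocument doc = docs.take();
                if (doc == AbstractSolrConnector.POISON_DOCUMENT) {
                    poisoned++;         // one sentinel per feeding thread
                } else {
                    seen++;             // process the document here
                }
            }
            return seen;
        }
    }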