From 6d3d4c4ea63a9eae81ec69cf05787756245664d1 Mon Sep 17 00:00:00 2001 From: Michael Peter Christen Date: Wed, 17 Sep 2014 13:58:55 +0200 Subject: [PATCH] changed the concurrent enumeration of query results in such a way that it is now possible to get the results in two steps: - first retrieve all IDs as given for a query - then retieve each document individually This was necessary for very large result sets where a query may run for hours and is possibly terminated by a solr-internal timeout. This occurs regulary during postprocessing and therefore this commit may fix unwanted postprocessing terminations. --- htroot/HostBrowser.java | 2 +- htroot/IndexDeletion_p.java | 2 +- .../solr/connector/AbstractSolrConnector.java | 51 +++++++++++++++++-- .../ConcurrentUpdateSolrConnector.java | 4 +- .../solr/connector/SolrConnector.java | 2 + source/net/yacy/search/index/Fulltext.java | 4 +- source/net/yacy/search/index/Segment.java | 6 +-- .../schema/CollectionConfiguration.java | 6 +-- .../yacy/search/schema/HyperlinkGraph.java | 2 +- 9 files changed, 63 insertions(+), 16 deletions(-) diff --git a/htroot/HostBrowser.java b/htroot/HostBrowser.java index e171f74bb..65a07d324 100644 --- a/htroot/HostBrowser.java +++ b/htroot/HostBrowser.java @@ -297,7 +297,7 @@ public class HostBrowser { q.append(" AND ").append(CollectionSchema.url_paths_sxt.getSolrFieldName()).append(AbstractSolrConnector.CATCHALL_DTERM); } } - BlockingQueue docs = fulltext.getDefaultConnector().concurrentDocumentsByQuery(q.toString(), CollectionSchema.url_chars_i.getSolrFieldName() + " asc", 0, 100000, TIMEOUT, 100, 1, + BlockingQueue docs = fulltext.getDefaultConnector().concurrentDocumentsByQuery(q.toString(), CollectionSchema.url_chars_i.getSolrFieldName() + " asc", 0, 100000, TIMEOUT, 100, 1, false, CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName(), CollectionSchema.failreason_s.getSolrFieldName(), diff --git a/htroot/IndexDeletion_p.java b/htroot/IndexDeletion_p.java index 748633dc5..31a17f580 100644 --- a/htroot/IndexDeletion_p.java +++ b/htroot/IndexDeletion_p.java @@ -135,7 +135,7 @@ public class IndexDeletion_p { } try { DigestURL u = new DigestURL(urlStub); - BlockingQueue dq = defaultConnector.concurrentDocumentsByQuery(CollectionSchema.host_s.getSolrFieldName() + ":\"" + u.getHost() + "\"", null, 0, 100000000, Long.MAX_VALUE, 100, 1, CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName()); + BlockingQueue dq = defaultConnector.concurrentDocumentsByQuery(CollectionSchema.host_s.getSolrFieldName() + ":\"" + u.getHost() + "\"", null, 0, 100000000, Long.MAX_VALUE, 100, 1, false, CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName()); SolrDocument doc; try { while ((doc = dq.take()) != AbstractSolrConnector.POISON_DOCUMENT) { diff --git a/source/net/yacy/cora/federate/solr/connector/AbstractSolrConnector.java b/source/net/yacy/cora/federate/solr/connector/AbstractSolrConnector.java index 4d6c4bfdd..7b04f663b 100644 --- a/source/net/yacy/cora/federate/solr/connector/AbstractSolrConnector.java +++ b/source/net/yacy/cora/federate/solr/connector/AbstractSolrConnector.java @@ -146,6 +146,51 @@ public abstract class AbstractSolrConnector implements SolrConnector { */ @Override public BlockingQueue concurrentDocumentsByQuery( + final String querystring, + final String sort, + final int offset, + final int maxcount, + final long maxtime, + final int buffersize, + final int concurrency, + final boolean prefetchIDs, + final String ... fields) { + assert buffersize > 0; + if (!prefetchIDs) return concurrentDocumentsByQueryNoPrefetch(querystring, sort, offset, maxcount, maxtime, buffersize, concurrency, fields); + final BlockingQueue idQueue = concurrentIDsByQuery(querystring, sort, offset, maxcount, maxtime, Math.min(maxcount, 10000000), 1); + final BlockingQueue queue = buffersize <= 0 ? new LinkedBlockingQueue() : new ArrayBlockingQueue(buffersize); + final long endtime = maxtime == Long.MAX_VALUE ? Long.MAX_VALUE : System.currentTimeMillis() + maxtime; // we know infinity! + final Thread t = new Thread() { + @Override + public void run() { + this.setName("AbstractSolrConnector:concurrentDocumentsByQueryWithPrefetch(" + querystring + ")"); + String nextID; + try { + while (System.currentTimeMillis() < endtime && (nextID = idQueue.take()) != AbstractSolrConnector.POISON_ID) { + + try { + SolrDocument d = getDocumentById(nextID, fields); + try {queue.put(d);} catch (final InterruptedException e) {} + } catch (final SolrException | IOException e) { + ConcurrentLog.logException(e); + // fail + ConcurrentLog.severe("AbstractSolrConnector", "aborted concurrentDocumentsByQuery: " + e.getMessage()); + break; + } + } + } catch (InterruptedException e) { + ConcurrentLog.severe("AbstractSolrConnector", "interrupted concurrentDocumentsByQuery: " + e.getMessage()); + } + for (int i = 0; i < Math.max(1, concurrency); i++) { + try {queue.put(AbstractSolrConnector.POISON_DOCUMENT);} catch (final InterruptedException e1) {} + } + } + }; + t.start(); + return queue; + } + + private BlockingQueue concurrentDocumentsByQueryNoPrefetch( final String querystring, final String sort, final int offset, @@ -162,7 +207,7 @@ public abstract class AbstractSolrConnector implements SolrConnector { final Thread t = new Thread() { @Override public void run() { - this.setName("AbstractSolrConnector:concurrentDocumentsByQuery(" + querystring + ")"); + this.setName("AbstractSolrConnector:concurrentDocumentsByQueryNoPrefetch(" + querystring + ")"); int o = offset; int count = 0; int retry = 0; @@ -187,7 +232,7 @@ public abstract class AbstractSolrConnector implements SolrConnector { continue loop; } // fail - ConcurrentLog.severe("AbstractSolrConnector", "aborted concurrentDocumentsByQuery after " + maxretries + " retries: " + e.getMessage()); + ConcurrentLog.severe("AbstractSolrConnector", "aborted concurrentDocumentsByQueryNoPrefetch after " + maxretries + " retries: " + e.getMessage()); break; } } @@ -199,7 +244,7 @@ public abstract class AbstractSolrConnector implements SolrConnector { t.start(); return queue; } - + @Override public BlockingQueue concurrentIDsByQuery( final String querystring, diff --git a/source/net/yacy/cora/federate/solr/connector/ConcurrentUpdateSolrConnector.java b/source/net/yacy/cora/federate/solr/connector/ConcurrentUpdateSolrConnector.java index d484ce309..3bfc4f2e8 100644 --- a/source/net/yacy/cora/federate/solr/connector/ConcurrentUpdateSolrConnector.java +++ b/source/net/yacy/cora/federate/solr/connector/ConcurrentUpdateSolrConnector.java @@ -359,9 +359,9 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector { } @Override - public BlockingQueue concurrentDocumentsByQuery(String querystring, String sort, int offset, int maxcount, long maxtime, int buffersize, final int concurrency, String... fields) { + public BlockingQueue concurrentDocumentsByQuery(String querystring, String sort, int offset, int maxcount, long maxtime, int buffersize, final int concurrency, final boolean prefetchIDs, String... fields) { commitDocBuffer(); - return this.connector.concurrentDocumentsByQuery(querystring, sort, offset, maxcount, maxtime, buffersize, concurrency, fields); + return this.connector.concurrentDocumentsByQuery(querystring, sort, offset, maxcount, maxtime, buffersize, concurrency, prefetchIDs, fields); } @Override diff --git a/source/net/yacy/cora/federate/solr/connector/SolrConnector.java b/source/net/yacy/cora/federate/solr/connector/SolrConnector.java index f9cbf2135..b5ddc001c 100644 --- a/source/net/yacy/cora/federate/solr/connector/SolrConnector.java +++ b/source/net/yacy/cora/federate/solr/connector/SolrConnector.java @@ -213,6 +213,7 @@ public interface SolrConnector extends Iterable /* Iterable of document * @param maxtime the maximum time in milliseconds * @param buffersize the size of an ArrayBlockingQueue; if <= 0 then a LinkedBlockingQueue is used * @param concurrency is the number of AbstractSolrConnector.POISON_DOCUMENT entries to add at the end of the feed + * @param prefetchIDs if true, then first all IDs are fetched and then all documents are queries by the ID. If false then documents are retrieved directly * @param fields list of fields * @return a blocking queue which is terminated with AbstractSolrConnector.POISON_DOCUMENT as last element */ @@ -224,6 +225,7 @@ public interface SolrConnector extends Iterable /* Iterable of document final long maxtime, final int buffersize, final int concurrency, + final boolean prefetchIDs, final String ... fields); /** diff --git a/source/net/yacy/search/index/Fulltext.java b/source/net/yacy/search/index/Fulltext.java index dfea34473..e7d7be5a6 100644 --- a/source/net/yacy/search/index/Fulltext.java +++ b/source/net/yacy/search/index/Fulltext.java @@ -429,7 +429,7 @@ public final class Fulltext { final String collectionQuery = CollectionSchema.host_s.getSolrFieldName() + ":\"" + host + "\"" + ((freshdate != null && freshdate.before(new Date())) ? (" AND " + CollectionSchema.load_date_dt.getSolrFieldName() + ":[* TO " + ISO8601Formatter.FORMATTER.format(freshdate) + "]") : ""); final AtomicInteger count = new AtomicInteger(0); - final BlockingQueue docs = Fulltext.this.getDefaultConnector().concurrentDocumentsByQuery(collectionQuery, null, 0, 1000000, Long.MAX_VALUE, 100, 1, CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName()); + final BlockingQueue docs = Fulltext.this.getDefaultConnector().concurrentDocumentsByQuery(collectionQuery, null, 0, 1000000, Long.MAX_VALUE, 100, 1, false, CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName()); try { Set deleteIDs = new HashSet(); SolrDocument doc; @@ -656,7 +656,7 @@ public final class Fulltext { this.count++; } } else { - BlockingQueue docs = Fulltext.this.getDefaultConnector().concurrentDocumentsByQuery(this.query + " AND " + CollectionSchema.httpstatus_i.getSolrFieldName() + ":200", null, 0, 100000000, Long.MAX_VALUE, 100, 1, + BlockingQueue docs = Fulltext.this.getDefaultConnector().concurrentDocumentsByQuery(this.query + " AND " + CollectionSchema.httpstatus_i.getSolrFieldName() + ":200", null, 0, 100000000, Long.MAX_VALUE, 100, 1, true, CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName(), CollectionSchema.title.getSolrFieldName(), CollectionSchema.author.getSolrFieldName(), CollectionSchema.description_txt.getSolrFieldName(), CollectionSchema.size_i.getSolrFieldName(), CollectionSchema.last_modified.getSolrFieldName()); SolrDocument doc; diff --git a/source/net/yacy/search/index/Segment.java b/source/net/yacy/search/index/Segment.java index f58759307..ac580d7d1 100644 --- a/source/net/yacy/search/index/Segment.java +++ b/source/net/yacy/search/index/Segment.java @@ -266,7 +266,7 @@ public class Segment { if ((internalIDs.size() == 0 || !connectedCitation()) && Segment.this.fulltext.useWebgraph()) { // reqd the references from the webgraph SolrConnector webgraph = Segment.this.fulltext.getWebgraphConnector(); - BlockingQueue docs = webgraph.concurrentDocumentsByQuery("{!raw f=" + WebgraphSchema.target_id_s.getSolrFieldName() + "}" + ASCII.String(id), WebgraphSchema.source_chars_i.getSolrFieldName() + " asc", 0, 10000000, Long.MAX_VALUE, 100, 1, WebgraphSchema.source_id_s.getSolrFieldName()); + BlockingQueue docs = webgraph.concurrentDocumentsByQuery("{!raw f=" + WebgraphSchema.target_id_s.getSolrFieldName() + "}" + ASCII.String(id), WebgraphSchema.source_chars_i.getSolrFieldName() + " asc", 0, 10000000, Long.MAX_VALUE, 100, 1, false, WebgraphSchema.source_id_s.getSolrFieldName()); SolrDocument doc; try { while ((doc = docs.take()) != AbstractSolrConnector.POISON_DOCUMENT) { @@ -369,7 +369,7 @@ public class Segment { final BlockingQueue docQueue; final String urlstub; if (stub == null) { - docQueue = this.fulltext.getDefaultConnector().concurrentDocumentsByQuery(AbstractSolrConnector.CATCHALL_QUERY, CollectionSchema.url_chars_i.getSolrFieldName() + " asc", 0, Integer.MAX_VALUE, maxtime, maxcount, 1, CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName()); + docQueue = this.fulltext.getDefaultConnector().concurrentDocumentsByQuery(AbstractSolrConnector.CATCHALL_QUERY, CollectionSchema.url_chars_i.getSolrFieldName() + " asc", 0, Integer.MAX_VALUE, maxtime, maxcount, 1, false, CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName()); urlstub = null; } else { final String host = stub.getHost(); @@ -379,7 +379,7 @@ public class Segment { } catch (MalformedURLException e) { ConcurrentLog.logException(e); } - docQueue = hh == null ? new ArrayBlockingQueue(0) : this.fulltext.getDefaultConnector().concurrentDocumentsByQuery(CollectionSchema.host_id_s + ":\"" + hh + "\"", CollectionSchema.url_chars_i.getSolrFieldName() + " asc", 0, Integer.MAX_VALUE, maxtime, maxcount, 1, CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName()); + docQueue = hh == null ? new ArrayBlockingQueue(0) : this.fulltext.getDefaultConnector().concurrentDocumentsByQuery(CollectionSchema.host_id_s + ":\"" + hh + "\"", CollectionSchema.url_chars_i.getSolrFieldName() + " asc", 0, Integer.MAX_VALUE, maxtime, maxcount, 1, false, CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName()); urlstub = stub.toNormalform(true); } diff --git a/source/net/yacy/search/schema/CollectionConfiguration.java b/source/net/yacy/search/schema/CollectionConfiguration.java index a56f3c7ec..ee2a3225a 100644 --- a/source/net/yacy/search/schema/CollectionConfiguration.java +++ b/source/net/yacy/search/schema/CollectionConfiguration.java @@ -1056,7 +1056,7 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri // To do so, we first must collect all canonical links, find all references to them, get the anchor list of the documents and patch the citation reference of these links String patchquery = CollectionSchema.host_s.getSolrFieldName() + ":" + host + " AND " + CollectionSchema.canonical_s.getSolrFieldName() + AbstractSolrConnector.CATCHALL_DTERM; long patchquerycount = collectionConnector.getCountByQuery(patchquery); - BlockingQueue documents_with_canonical_tag = collectionConnector.concurrentDocumentsByQuery(patchquery, CollectionSchema.url_chars_i.getSolrFieldName() + " asc", 0, 100000000, Long.MAX_VALUE, 20, 1, + BlockingQueue documents_with_canonical_tag = collectionConnector.concurrentDocumentsByQuery(patchquery, CollectionSchema.url_chars_i.getSolrFieldName() + " asc", 0, 100000000, Long.MAX_VALUE, 20, 1, true, CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName(), CollectionSchema.canonical_s.getSolrFieldName()); SolrDocument doc_B; int patchquerycountcheck = 0; @@ -1152,7 +1152,7 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri final long count = segment.fulltext().getWebgraphConnector().getCountByQuery(patchquery); int concurrency = Math.min((int) count, Math.max(1, Runtime.getRuntime().availableProcessors() / 4)); ConcurrentLog.info("CollectionConfiguration", "collecting " + count + " documents from the webgraph, concurrency = " + concurrency); - final BlockingQueue docs = segment.fulltext().getWebgraphConnector().concurrentDocumentsByQuery(patchquery, WebgraphSchema.source_chars_i.getSolrFieldName() + " asc", 0, 100000000, Long.MAX_VALUE, concurrency + 1, concurrency); + final BlockingQueue docs = segment.fulltext().getWebgraphConnector().concurrentDocumentsByQuery(patchquery, WebgraphSchema.source_chars_i.getSolrFieldName() + " asc", 0, 100000000, Long.MAX_VALUE, concurrency + 1, concurrency, true); final AtomicInteger proccount = new AtomicInteger(0); Thread[] t = new Thread[concurrency]; for (final AtomicInteger i = new AtomicInteger(0); i.get() < t.length; i.incrementAndGet()) { @@ -1260,7 +1260,7 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri CollectionSchema.host_subdomain_s.getSolrFieldName() + " asc," + // sort on subdomain to get hosts without subdomain first; that gives an opportunity to set www_unique_b flag to false CollectionSchema.url_protocol_s.getSolrFieldName() + " asc" // sort on protocol to get http before https; that gives an opportunity to set http_unique_b flag to false : null, // null sort is faster! - 0, 100000000, Long.MAX_VALUE, concurrency + 1, concurrency); + 0, 100000000, Long.MAX_VALUE, concurrency + 1, concurrency, true); final AtomicInteger proccount = new AtomicInteger(); final AtomicInteger proccount_referencechange = new AtomicInteger(); final AtomicInteger proccount_citationchange = new AtomicInteger(); diff --git a/source/net/yacy/search/schema/HyperlinkGraph.java b/source/net/yacy/search/schema/HyperlinkGraph.java index 3a22464a9..884378977 100644 --- a/source/net/yacy/search/schema/HyperlinkGraph.java +++ b/source/net/yacy/search/schema/HyperlinkGraph.java @@ -62,7 +62,7 @@ public class HyperlinkGraph implements Iterable { if (hostname.startsWith("www.")) hostname = hostname.substring(4); StringBuilder q = new StringBuilder(); q.append(CollectionSchema.host_s.getSolrFieldName()).append(':').append(hostname).append(" OR ").append(CollectionSchema.host_s.getSolrFieldName()).append(':').append("www.").append(hostname); - BlockingQueue docs = solrConnector.concurrentDocumentsByQuery(q.toString(), CollectionSchema.url_chars_i.getSolrFieldName() + " asc", 0, maxnodes, maxtime, 100, 1, + BlockingQueue docs = solrConnector.concurrentDocumentsByQuery(q.toString(), CollectionSchema.url_chars_i.getSolrFieldName() + " asc", 0, maxnodes, maxtime, 100, 1, true, CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName(), CollectionSchema.failreason_s.getSolrFieldName(),