changed the concurrent enumeration of query results in such a way that
it is now possible to get the results in two steps:
- first retrieve all IDs returned for a query
- then retrieve each document individually

This was necessary for very large result sets, where a query may run for
hours and may be terminated by a Solr-internal timeout. This occurs
regularly during postprocessing, so this commit may fix unwanted
postprocessing terminations.
Michael Peter Christen 11 years ago
parent ad35d9294f
commit 6d3d4c4ea6
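
The heart of the change is the prefetch path added to AbstractSolrConnector below: instead of streaming documents out of one long-running query, the connector first drains a cheap ID-only query and then resolves every ID into a full document. The following is a minimal, self-contained sketch of that two-step pattern, using plain strings in place of the Solr types and hypothetical string sentinels where the real code uses the POISON_ID/POISON_DOCUMENT marker objects:

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class TwoStepRetrieval {
    // stand-ins for the connector's poison marker objects
    static final String POISON_ID = "";
    static final String POISON_DOCUMENT = "\u0000";

    public static void main(String[] args) throws InterruptedException {
        // step one: run the query once, collecting only document IDs;
        // this finishes quickly, so a query timeout cannot hit it
        List<String> ids = List.of("doc-1", "doc-2", "doc-3"); // stand-in for the ID query
        BlockingQueue<String> idQueue = new ArrayBlockingQueue<>(ids.size() + 1);
        for (String id : ids) idQueue.put(id);
        idQueue.put(POISON_ID);

        // step two: a worker resolves each ID with an independent short request
        BlockingQueue<String> docQueue = new ArrayBlockingQueue<>(16);
        Thread fetcher = new Thread(() -> {
            try {
                String id;
                while (!(id = idQueue.take()).equals(POISON_ID)) {
                    docQueue.put("document(" + id + ")"); // stand-in for getDocumentById
                }
                docQueue.put(POISON_DOCUMENT); // terminate the feed
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        fetcher.start();

        // consumer drains the queue until the poison marker arrives
        String doc;
        while (!(doc = docQueue.take()).equals(POISON_DOCUMENT)) {
            System.out.println(doc);
        }
    }
}
```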

@@ -297,7 +297,7 @@ public class HostBrowser {
q.append(" AND ").append(CollectionSchema.url_paths_sxt.getSolrFieldName()).append(AbstractSolrConnector.CATCHALL_DTERM);
}
}
BlockingQueue<SolrDocument> docs = fulltext.getDefaultConnector().concurrentDocumentsByQuery(q.toString(), CollectionSchema.url_chars_i.getSolrFieldName() + " asc", 0, 100000, TIMEOUT, 100, 1,
BlockingQueue<SolrDocument> docs = fulltext.getDefaultConnector().concurrentDocumentsByQuery(q.toString(), CollectionSchema.url_chars_i.getSolrFieldName() + " asc", 0, 100000, TIMEOUT, 100, 1, false,
CollectionSchema.id.getSolrFieldName(),
CollectionSchema.sku.getSolrFieldName(),
CollectionSchema.failreason_s.getSolrFieldName(),

@@ -135,7 +135,7 @@ public class IndexDeletion_p {
}
try {
DigestURL u = new DigestURL(urlStub);
BlockingQueue<SolrDocument> dq = defaultConnector.concurrentDocumentsByQuery(CollectionSchema.host_s.getSolrFieldName() + ":\"" + u.getHost() + "\"", null, 0, 100000000, Long.MAX_VALUE, 100, 1, CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName());
BlockingQueue<SolrDocument> dq = defaultConnector.concurrentDocumentsByQuery(CollectionSchema.host_s.getSolrFieldName() + ":\"" + u.getHost() + "\"", null, 0, 100000000, Long.MAX_VALUE, 100, 1, false, CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName());
SolrDocument doc;
try {
while ((doc = dq.take()) != AbstractSolrConnector.POISON_DOCUMENT) {

@@ -146,6 +146,51 @@ public abstract class AbstractSolrConnector implements SolrConnector {
*/
@Override
public BlockingQueue<SolrDocument> concurrentDocumentsByQuery(
final String querystring,
final String sort,
final int offset,
final int maxcount,
final long maxtime,
final int buffersize,
final int concurrency,
final boolean prefetchIDs,
final String ... fields) {
assert buffersize > 0;
if (!prefetchIDs) return concurrentDocumentsByQueryNoPrefetch(querystring, sort, offset, maxcount, maxtime, buffersize, concurrency, fields);
final BlockingQueue<String> idQueue = concurrentIDsByQuery(querystring, sort, offset, maxcount, maxtime, Math.min(maxcount, 10000000), 1);
final BlockingQueue<SolrDocument> queue = buffersize <= 0 ? new LinkedBlockingQueue<SolrDocument>() : new ArrayBlockingQueue<SolrDocument>(buffersize);
final long endtime = maxtime == Long.MAX_VALUE ? Long.MAX_VALUE : System.currentTimeMillis() + maxtime; // we know infinity!
final Thread t = new Thread() {
@Override
public void run() {
this.setName("AbstractSolrConnector:concurrentDocumentsByQueryWithPrefetch(" + querystring + ")");
String nextID;
try {
while (System.currentTimeMillis() < endtime && (nextID = idQueue.take()) != AbstractSolrConnector.POISON_ID) {
try {
SolrDocument d = getDocumentById(nextID, fields);
try {queue.put(d);} catch (final InterruptedException e) {}
} catch (final SolrException | IOException e) {
ConcurrentLog.logException(e);
// fail
ConcurrentLog.severe("AbstractSolrConnector", "aborted concurrentDocumentsByQuery: " + e.getMessage());
break;
}
}
} catch (InterruptedException e) {
ConcurrentLog.severe("AbstractSolrConnector", "interrupted concurrentDocumentsByQuery: " + e.getMessage());
}
for (int i = 0; i < Math.max(1, concurrency); i++) {
try {queue.put(AbstractSolrConnector.POISON_DOCUMENT);} catch (final InterruptedException e1) {}
}
}
};
t.start();
return queue;
}
private BlockingQueue<SolrDocument> concurrentDocumentsByQueryNoPrefetch(
final String querystring,
final String sort,
final int offset,
@@ -162,7 +207,7 @@ public abstract class AbstractSolrConnector implements SolrConnector {
final Thread t = new Thread() {
@Override
public void run() {
this.setName("AbstractSolrConnector:concurrentDocumentsByQuery(" + querystring + ")");
this.setName("AbstractSolrConnector:concurrentDocumentsByQueryNoPrefetch(" + querystring + ")");
int o = offset;
int count = 0;
int retry = 0;
@@ -187,7 +232,7 @@ public abstract class AbstractSolrConnector implements SolrConnector {
continue loop;
}
// fail
ConcurrentLog.severe("AbstractSolrConnector", "aborted concurrentDocumentsByQuery after " + maxretries + " retries: " + e.getMessage());
ConcurrentLog.severe("AbstractSolrConnector", "aborted concurrentDocumentsByQueryNoPrefetch after " + maxretries + " retries: " + e.getMessage());
break;
}
}
@@ -199,7 +244,7 @@ public abstract class AbstractSolrConnector implements SolrConnector {
t.start();
return queue;
}
@Override
public BlockingQueue<String> concurrentIDsByQuery(
final String querystring,
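
The prefetch branch above composes two primitives the connector already provides: concurrentIDsByQuery (whose declaration begins in the trailing context lines) and getDocumentById. A hedged sketch of driving them directly, assuming a SolrConnector instance named connector; the query string, limits, and field are illustrative only, and exception handling is elided:

```java
// Step one: stream only the document IDs. This query completes quickly,
// so the Solr-internal timeout described in the commit message cannot hit it.
// Arguments: querystring, sort, offset, maxcount, maxtime, buffersize, concurrency.
BlockingQueue<String> ids = connector.concurrentIDsByQuery(
        "host_s:\"example.org\"", null, 0, 100000, Long.MAX_VALUE, 10000, 1);

// Step two: resolve each ID with an independent, short-lived request.
String id;
while ((id = ids.take()) != AbstractSolrConnector.POISON_ID) {
    SolrDocument doc = connector.getDocumentById(id,
            CollectionSchema.sku.getSolrFieldName());
    // ... process doc ...
}
```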

@@ -359,9 +359,9 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
}
@Override
public BlockingQueue<SolrDocument> concurrentDocumentsByQuery(String querystring, String sort, int offset, int maxcount, long maxtime, int buffersize, final int concurrency, String... fields) {
public BlockingQueue<SolrDocument> concurrentDocumentsByQuery(String querystring, String sort, int offset, int maxcount, long maxtime, int buffersize, final int concurrency, final boolean prefetchIDs, String... fields) {
commitDocBuffer();
return this.connector.concurrentDocumentsByQuery(querystring, sort, offset, maxcount, maxtime, buffersize, concurrency, fields);
return this.connector.concurrentDocumentsByQuery(querystring, sort, offset, maxcount, maxtime, buffersize, concurrency, prefetchIDs, fields);
}
@Override

@@ -213,6 +213,7 @@ public interface SolrConnector extends Iterable<String> /* Iterable of document
* @param maxtime the maximum time in milliseconds
* @param buffersize the size of an ArrayBlockingQueue; if <= 0 then a LinkedBlockingQueue is used
* @param concurrency is the number of AbstractSolrConnector.POISON_DOCUMENT entries to add at the end of the feed
* @param prefetchIDs if true, all IDs are fetched first and each document is then queried by its ID. If false, documents are retrieved directly
* @param fields list of fields
* @return a blocking queue which is terminated with AbstractSolrConnector.POISON_DOCUMENT as last element
*/
@@ -224,6 +225,7 @@ public interface SolrConnector extends Iterable<String> /* Iterable of document
final long maxtime,
final int buffersize,
final int concurrency,
final boolean prefetchIDs,
final String ... fields);
/**
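
The new flag slots in directly before the field list at every call site, as the surrounding hunks show. Below is a sketch of a single-consumer call against the updated interface, with the query and fields borrowed from the IndexDeletion_p call site above; note that with concurrency set to n, the feed is terminated by n POISON_DOCUMENT markers, so a consumer group must collectively take all n:

```java
// Hypothetical call site; "connector" is any SolrConnector implementation.
BlockingQueue<SolrDocument> docs = connector.concurrentDocumentsByQuery(
        "host_s:\"example.org\"",   // querystring (illustrative)
        null,                       // sort: null sort is faster
        0,                          // offset
        100000000,                  // maxcount
        Long.MAX_VALUE,             // maxtime: no deadline
        100,                        // buffersize of the ArrayBlockingQueue
        1,                          // concurrency: one POISON_DOCUMENT ends the feed
        false,                      // prefetchIDs: stream documents directly
        CollectionSchema.id.getSolrFieldName(),
        CollectionSchema.sku.getSolrFieldName());
SolrDocument doc;
while ((doc = docs.take()) != AbstractSolrConnector.POISON_DOCUMENT) {
    // ... process doc ...
}
```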

@@ -429,7 +429,7 @@ public final class Fulltext {
final String collectionQuery = CollectionSchema.host_s.getSolrFieldName() + ":\"" + host + "\"" +
((freshdate != null && freshdate.before(new Date())) ? (" AND " + CollectionSchema.load_date_dt.getSolrFieldName() + ":[* TO " + ISO8601Formatter.FORMATTER.format(freshdate) + "]") : "");
final AtomicInteger count = new AtomicInteger(0);
final BlockingQueue<SolrDocument> docs = Fulltext.this.getDefaultConnector().concurrentDocumentsByQuery(collectionQuery, null, 0, 1000000, Long.MAX_VALUE, 100, 1, CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName());
final BlockingQueue<SolrDocument> docs = Fulltext.this.getDefaultConnector().concurrentDocumentsByQuery(collectionQuery, null, 0, 1000000, Long.MAX_VALUE, 100, 1, false, CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName());
try {
Set<String> deleteIDs = new HashSet<String>();
SolrDocument doc;
@@ -656,7 +656,7 @@ public final class Fulltext {
this.count++;
}
} else {
BlockingQueue<SolrDocument> docs = Fulltext.this.getDefaultConnector().concurrentDocumentsByQuery(this.query + " AND " + CollectionSchema.httpstatus_i.getSolrFieldName() + ":200", null, 0, 100000000, Long.MAX_VALUE, 100, 1,
BlockingQueue<SolrDocument> docs = Fulltext.this.getDefaultConnector().concurrentDocumentsByQuery(this.query + " AND " + CollectionSchema.httpstatus_i.getSolrFieldName() + ":200", null, 0, 100000000, Long.MAX_VALUE, 100, 1, true,
CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName(), CollectionSchema.title.getSolrFieldName(),
CollectionSchema.author.getSolrFieldName(), CollectionSchema.description_txt.getSolrFieldName(), CollectionSchema.size_i.getSolrFieldName(), CollectionSchema.last_modified.getSolrFieldName());
SolrDocument doc;

@@ -266,7 +266,7 @@ public class Segment {
if ((internalIDs.size() == 0 || !connectedCitation()) && Segment.this.fulltext.useWebgraph()) {
// read the references from the webgraph
SolrConnector webgraph = Segment.this.fulltext.getWebgraphConnector();
BlockingQueue<SolrDocument> docs = webgraph.concurrentDocumentsByQuery("{!raw f=" + WebgraphSchema.target_id_s.getSolrFieldName() + "}" + ASCII.String(id), WebgraphSchema.source_chars_i.getSolrFieldName() + " asc", 0, 10000000, Long.MAX_VALUE, 100, 1, WebgraphSchema.source_id_s.getSolrFieldName());
BlockingQueue<SolrDocument> docs = webgraph.concurrentDocumentsByQuery("{!raw f=" + WebgraphSchema.target_id_s.getSolrFieldName() + "}" + ASCII.String(id), WebgraphSchema.source_chars_i.getSolrFieldName() + " asc", 0, 10000000, Long.MAX_VALUE, 100, 1, false, WebgraphSchema.source_id_s.getSolrFieldName());
SolrDocument doc;
try {
while ((doc = docs.take()) != AbstractSolrConnector.POISON_DOCUMENT) {
@@ -369,7 +369,7 @@ public class Segment {
final BlockingQueue<SolrDocument> docQueue;
final String urlstub;
if (stub == null) {
docQueue = this.fulltext.getDefaultConnector().concurrentDocumentsByQuery(AbstractSolrConnector.CATCHALL_QUERY, CollectionSchema.url_chars_i.getSolrFieldName() + " asc", 0, Integer.MAX_VALUE, maxtime, maxcount, 1, CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName());
docQueue = this.fulltext.getDefaultConnector().concurrentDocumentsByQuery(AbstractSolrConnector.CATCHALL_QUERY, CollectionSchema.url_chars_i.getSolrFieldName() + " asc", 0, Integer.MAX_VALUE, maxtime, maxcount, 1, false, CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName());
urlstub = null;
} else {
final String host = stub.getHost();
@@ -379,7 +379,7 @@ public class Segment {
} catch (MalformedURLException e) {
ConcurrentLog.logException(e);
}
docQueue = hh == null ? new ArrayBlockingQueue<SolrDocument>(0) : this.fulltext.getDefaultConnector().concurrentDocumentsByQuery(CollectionSchema.host_id_s + ":\"" + hh + "\"", CollectionSchema.url_chars_i.getSolrFieldName() + " asc", 0, Integer.MAX_VALUE, maxtime, maxcount, 1, CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName());
docQueue = hh == null ? new ArrayBlockingQueue<SolrDocument>(0) : this.fulltext.getDefaultConnector().concurrentDocumentsByQuery(CollectionSchema.host_id_s + ":\"" + hh + "\"", CollectionSchema.url_chars_i.getSolrFieldName() + " asc", 0, Integer.MAX_VALUE, maxtime, maxcount, 1, false, CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName());
urlstub = stub.toNormalform(true);
}

@@ -1056,7 +1056,7 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
// To do so, we first must collect all canonical links, find all references to them, get the anchor list of the documents and patch the citation reference of these links
String patchquery = CollectionSchema.host_s.getSolrFieldName() + ":" + host + " AND " + CollectionSchema.canonical_s.getSolrFieldName() + AbstractSolrConnector.CATCHALL_DTERM;
long patchquerycount = collectionConnector.getCountByQuery(patchquery);
BlockingQueue<SolrDocument> documents_with_canonical_tag = collectionConnector.concurrentDocumentsByQuery(patchquery, CollectionSchema.url_chars_i.getSolrFieldName() + " asc", 0, 100000000, Long.MAX_VALUE, 20, 1,
BlockingQueue<SolrDocument> documents_with_canonical_tag = collectionConnector.concurrentDocumentsByQuery(patchquery, CollectionSchema.url_chars_i.getSolrFieldName() + " asc", 0, 100000000, Long.MAX_VALUE, 20, 1, true,
CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName(), CollectionSchema.canonical_s.getSolrFieldName());
SolrDocument doc_B;
int patchquerycountcheck = 0;
@@ -1152,7 +1152,7 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
final long count = segment.fulltext().getWebgraphConnector().getCountByQuery(patchquery);
int concurrency = Math.min((int) count, Math.max(1, Runtime.getRuntime().availableProcessors() / 4));
ConcurrentLog.info("CollectionConfiguration", "collecting " + count + " documents from the webgraph, concurrency = " + concurrency);
final BlockingQueue<SolrDocument> docs = segment.fulltext().getWebgraphConnector().concurrentDocumentsByQuery(patchquery, WebgraphSchema.source_chars_i.getSolrFieldName() + " asc", 0, 100000000, Long.MAX_VALUE, concurrency + 1, concurrency);
final BlockingQueue<SolrDocument> docs = segment.fulltext().getWebgraphConnector().concurrentDocumentsByQuery(patchquery, WebgraphSchema.source_chars_i.getSolrFieldName() + " asc", 0, 100000000, Long.MAX_VALUE, concurrency + 1, concurrency, true);
final AtomicInteger proccount = new AtomicInteger(0);
Thread[] t = new Thread[concurrency];
for (final AtomicInteger i = new AtomicInteger(0); i.get() < t.length; i.incrementAndGet()) {
@@ -1260,7 +1260,7 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
CollectionSchema.host_subdomain_s.getSolrFieldName() + " asc," + // sort on subdomain to get hosts without subdomain first; that gives an opportunity to set www_unique_b flag to false
CollectionSchema.url_protocol_s.getSolrFieldName() + " asc" // sort on protocol to get http before https; that gives an opportunity to set http_unique_b flag to false
: null, // null sort is faster!
0, 100000000, Long.MAX_VALUE, concurrency + 1, concurrency);
0, 100000000, Long.MAX_VALUE, concurrency + 1, concurrency, true);
final AtomicInteger proccount = new AtomicInteger();
final AtomicInteger proccount_referencechange = new AtomicInteger();
final AtomicInteger proccount_citationchange = new AtomicInteger();
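
Both postprocessing call sites above size their consumer pool from the machine and the result count, and pass a buffersize of concurrency + 1 so the feeder always has room for the terminating poison markers. A worked example of that arithmetic, assuming an 8-core machine and 1000 matching documents:

```java
long count = 1000L;  // e.g. getCountByQuery(patchquery)
int cores = 8;       // e.g. Runtime.getRuntime().availableProcessors()
int concurrency = Math.min((int) count, Math.max(1, cores / 4)); // min(1000, 2) = 2
int buffersize = concurrency + 1;                                // 3
// The feeder appends `concurrency` POISON_DOCUMENT markers, so each of the
// two consumer threads takes exactly one marker and terminates.
```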

@@ -62,7 +62,7 @@ public class HyperlinkGraph implements Iterable<HyperlinkEdge> {
if (hostname.startsWith("www.")) hostname = hostname.substring(4);
StringBuilder q = new StringBuilder();
q.append(CollectionSchema.host_s.getSolrFieldName()).append(':').append(hostname).append(" OR ").append(CollectionSchema.host_s.getSolrFieldName()).append(':').append("www.").append(hostname);
BlockingQueue<SolrDocument> docs = solrConnector.concurrentDocumentsByQuery(q.toString(), CollectionSchema.url_chars_i.getSolrFieldName() + " asc", 0, maxnodes, maxtime, 100, 1,
BlockingQueue<SolrDocument> docs = solrConnector.concurrentDocumentsByQuery(q.toString(), CollectionSchema.url_chars_i.getSolrFieldName() + " asc", 0, maxnodes, maxtime, 100, 1, true,
CollectionSchema.id.getSolrFieldName(),
CollectionSchema.sku.getSolrFieldName(),
CollectionSchema.failreason_s.getSolrFieldName(),
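
For orientation, a sketch of the query string the StringBuilder above assembles, assuming CollectionSchema.host_s resolves to the field name host_s and the leading "www." has already been stripped by the preceding check:

```java
String hostname = "example.org";
String q = "host_s:" + hostname + " OR host_s:www." + hostname;
// -> host_s:example.org OR host_s:www.example.org
```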
