more enhancements to postprocessing speed

pull/1/head
Michael Peter Christen 10 years ago
parent 9a7fe9e0d1
commit 92007e5d2d

@@ -138,7 +138,7 @@ public abstract class AbstractSolrConnector implements SolrConnector {
}
/**
* Get a query result from solr as a stream of documents.
* Get results from a solr query as a stream of documents.
* The result queue is considered terminated when AbstractSolrConnector.POISON_DOCUMENT is returned.
* The method returns immediately and feeds the search results into the queue.
* @param querystring the solr query string
@@ -148,7 +148,9 @@ public abstract class AbstractSolrConnector implements SolrConnector {
* @param maxtime the maximum time in milliseconds
* @param buffersize the size of an ArrayBlockingQueue; if <= 0 then a LinkedBlockingQueue is used
* @param concurrency is the number of AbstractSolrConnector.POISON_DOCUMENT entries to add at the end of the feed
* @return a blocking queue which is terminated with AbstractSolrConnector.POISON_DOCUMENT as last element
* @param prefetchIDs if true, all IDs are fetched first and then the documents are queried by ID; if false, documents are retrieved directly
* @param fields list of fields
* @return a blocking queue which is terminated with AbstractSolrConnector.POISON_DOCUMENT as last element
*/
@Override
public BlockingQueue<SolrDocument> concurrentDocumentsByQuery(
@@ -161,17 +163,52 @@ public abstract class AbstractSolrConnector implements SolrConnector {
final int concurrency,
final boolean prefetchIDs,
final String ... fields) {
List<String> querystrings = new ArrayList<>(1);
querystrings.add(querystring);
return concurrentDocumentsByQueries(querystrings, sort, offset, maxcount, maxtime, buffersize, concurrency, prefetchIDs, fields);
}
/**
* Get results from solr queries as a stream of documents.
* The result queue is considered terminated when AbstractSolrConnector.POISON_DOCUMENT is returned.
* The method returns immediately and feeds the search results into the queue.
* @param querystrings the list of solr query strings
* @param sort the solr sort string, may be null if no sorting is required
* @param offset first result offset
* @param maxcount the maximum number of results
* @param maxtime the maximum time in milliseconds
* @param buffersize the size of an ArrayBlockingQueue; if <= 0 then a LinkedBlockingQueue is used
* @param concurrency is the number of AbstractSolrConnector.POISON_DOCUMENT entries to add at the end of the feed
* @param prefetchIDs if true, all IDs are fetched first and then the documents are queried by ID; if false, documents are retrieved directly
* @param fields list of fields
* @return a blocking queue which is terminated with AbstractSolrConnector.POISON_DOCUMENT as last element
*/
@Override
public BlockingQueue<SolrDocument> concurrentDocumentsByQueries(
final List<String> querystrings,
final String sort,
final int offset,
final int maxcount,
final long maxtime,
final int buffersize,
final int concurrency,
final boolean prefetchIDs,
final String ... fields) {
assert buffersize > 0;
if (!prefetchIDs) return concurrentDocumentsByQueryNoPrefetch(querystring, sort, offset, maxcount, maxtime, buffersize, concurrency, fields);
final BlockingQueue<String> idQueue = concurrentIDsByQuery(querystring, sort, offset, maxcount, maxtime, Math.min(maxcount, 10000000), concurrency);
if (!prefetchIDs) return concurrentDocumentsByQueriesNoPrefetch(querystrings, sort, offset, maxcount, maxtime, buffersize, concurrency, fields);
final BlockingQueue<SolrDocument> queue = buffersize <= 0 ? new LinkedBlockingQueue<SolrDocument>() : new ArrayBlockingQueue<SolrDocument>(Math.max(buffersize, concurrency));
final long endtime = maxtime == Long.MAX_VALUE ? Long.MAX_VALUE : System.currentTimeMillis() + maxtime; // we know infinity!
if (querystrings.size() == 0) {
for (int i = 0; i < Math.max(1, concurrency); i++) try {queue.put(AbstractSolrConnector.POISON_DOCUMENT);} catch (final InterruptedException e1) {}
return queue;
}
final BlockingQueue<String> idQueue = concurrentIDsByQueries(querystrings, sort, offset, maxcount, maxtime, Math.min(maxcount, 10000000), concurrency);
final long endtime = maxtime < 0 || maxtime == Long.MAX_VALUE ? Long.MAX_VALUE : System.currentTimeMillis() + maxtime; // we know infinity!
final Thread[] t = new Thread[concurrency];
for (int i = 0; i < concurrency; i++) {
for (int i = 0; i < Math.max(1, concurrency); i++) {
t[i] = new Thread() {
@Override
public void run() {
this.setName("AbstractSolrConnector:concurrentDocumentsByQueryWithPrefetch(" + querystring + ")");
this.setName("AbstractSolrConnector:concurrentDocumentsByQueriesWithPrefetch(" + querystrings.size() + " queries, first: " + querystrings.iterator().next() + ")");
String nextID;
try {
while (System.currentTimeMillis() < endtime && (nextID = idQueue.take()) != AbstractSolrConnector.POISON_ID) {
@@ -197,9 +234,9 @@ public abstract class AbstractSolrConnector implements SolrConnector {
}
return queue;
}
private BlockingQueue<SolrDocument> concurrentDocumentsByQueryNoPrefetch(
final String querystring,
private BlockingQueue<SolrDocument> concurrentDocumentsByQueriesNoPrefetch(
final List<String> querystrings,
final String sort,
final int offset,
final int maxcount,
@@ -209,43 +246,52 @@ public abstract class AbstractSolrConnector implements SolrConnector {
final String ... fields) {
assert buffersize > 0;
final BlockingQueue<SolrDocument> queue = buffersize <= 0 ? new LinkedBlockingQueue<SolrDocument>() : new ArrayBlockingQueue<SolrDocument>(buffersize);
final long endtime = maxtime == Long.MAX_VALUE ? Long.MAX_VALUE : System.currentTimeMillis() + maxtime; // we know infinity!
final int ps = Math.min(pagesize_docs, buffersize);
final int maxretries = 60;
if (querystrings.size() == 0) {
for (int i = 0; i < Math.max(1, concurrency); i++) try {queue.put(AbstractSolrConnector.POISON_DOCUMENT);} catch (final InterruptedException e1) {}
return queue;
}
final long endtime = maxtime < 0 || maxtime == Long.MAX_VALUE ? Long.MAX_VALUE : System.currentTimeMillis() + maxtime; // we know infinity!
final int ps = buffersize < 0 ? pagesize_docs : Math.min(pagesize_docs, buffersize);
final int maxretries = 6;
final Thread t = new Thread() {
@Override
public void run() {
this.setName("AbstractSolrConnector:concurrentDocumentsByQueryNoPrefetch(" + querystring + ")");
int o = offset;
int count = 0;
int retry = 0;
loop: while (System.currentTimeMillis() < endtime && count < maxcount) {
try {
SolrDocumentList sdl = getDocumentListByQuery(querystring, sort, o, Math.min(maxcount, ps), fields);
for (SolrDocument d: sdl) {
try {queue.put(d);} catch (final InterruptedException e) {break;}
count++;
}
if (sdl.size() < ps) {
//System.out.println("sdl.size() = " + sdl.size() + ", pagesize = " + pagesize);
break loop; // finished
}
o += sdl.size();
retry = 0;
} catch (final SolrException | IOException e) {
ConcurrentLog.logException(e);
if (retry++ < maxretries) {
// remote Solr may be temporarily down, so we wait a bit
try {Thread.sleep(100);} catch (InterruptedException e1) {}
continue loop;
try {
for (String querystring: querystrings) {
this.setName("AbstractSolrConnector:concurrentDocumentsByQueryNoPrefetch(" + querystring + ")");
int o = offset;
int count = 0;
int retry = 0;
loop: while (System.currentTimeMillis() < endtime && count < maxcount) {
try {
SolrDocumentList sdl = getDocumentListByQuery(querystring, sort, o, Math.min(maxcount, ps), fields);
for (SolrDocument d: sdl) {
try {queue.put(d);} catch (final InterruptedException e) {break;}
count++;
}
if (sdl.size() < ps) {
//System.out.println("sdl.size() = " + sdl.size() + ", pagesize = " + pagesize);
break loop; // finished
}
o += sdl.size();
retry = 0;
} catch (final SolrException | IOException e) {
ConcurrentLog.logException(e);
if (retry++ < maxretries) {
// remote Solr may be temporarily down, so we wait a bit
try {Thread.sleep(100);} catch (InterruptedException e1) {}
continue loop;
}
// fail
ConcurrentLog.severe("AbstractSolrConnector", "aborted concurrentDocumentsByQueryNoPrefetch after " + maxretries + " retries: " + e.getMessage());
break;
}
}
// fail
ConcurrentLog.severe("AbstractSolrConnector", "aborted concurrentDocumentsByQueryNoPrefetch after " + maxretries + " retries: " + e.getMessage());
break;
}
}
for (int i = 0; i < Math.max(1, concurrency); i++) {
try {queue.put(AbstractSolrConnector.POISON_DOCUMENT);} catch (final InterruptedException e1) {}
} catch (Throwable e) {} finally {
for (int i = 0; i < Math.max(1, concurrency); i++) {
try {queue.put(AbstractSolrConnector.POISON_DOCUMENT);} catch (final InterruptedException e1) {}
}
}
}
};
@@ -253,6 +299,18 @@ public abstract class AbstractSolrConnector implements SolrConnector {
return queue;
}
/**
* Get a document id result stream from a solr query.
* The result queue is considered terminated when AbstractSolrConnector.POISON_ID is returned.
* The method returns immediately and feeds the search results into the queue.
* @param querystring the solr query string
* @param sort the solr sort string, may be null if no sorting is required
* @param offset the first result offset
* @param maxcount the maximum number of results
* @param maxtime the maximum time in milliseconds
* @param buffersize the size of an ArrayBlockingQueue; if <= 0 then a LinkedBlockingQueue is used
* @param concurrency is the number of AbstractSolrConnector.POISON_ID entries to add at the end of the feed
* @return a list of ids in a blocking queue which is terminated with a number of AbstractSolrConnector.POISON_ID entries
*/
@Override
public BlockingQueue<String> concurrentIDsByQuery(
final String querystring,
@@ -262,28 +320,57 @@ public abstract class AbstractSolrConnector implements SolrConnector {
final long maxtime,
final int buffersize,
final int concurrency) {
List<String> querystrings = new ArrayList<>(1);
querystrings.add(querystring);
return concurrentIDsByQueries(querystrings, sort, offset, maxcount, maxtime, buffersize, concurrency);
}
/**
* Get a document id result stream from a set of solr queries.
* The result queue is considered terminated when AbstractSolrConnector.POISON_ID is returned.
* The method returns immediately and feeds the search results into the queue.
* @param querystrings a list of query strings
* @param sort the solr sort string, may be null if no sorting is required
* @param offset common offset of all queries
* @param maxcount maximum count for each query
* @param maxtime the maximum time in milliseconds
* @param buffersize the size of an ArrayBlockingQueue; if <= 0 then a LinkedBlockingQueue is used
* @param concurrency is the number of AbstractSolrConnector.POISON_ID entries to add at the end of the feed
* @return a list of ids in a blocking queue which is terminated with a number of AbstractSolrConnector.POISON_ID entries
*/
@Override
public BlockingQueue<String> concurrentIDsByQueries(
final List<String> querystrings,
final String sort,
final int offset,
final int maxcount,
final long maxtime,
final int buffersize,
final int concurrency) {
final BlockingQueue<String> queue = buffersize <= 0 ? new LinkedBlockingQueue<String>() : new ArrayBlockingQueue<String>(buffersize);
final long endtime = maxtime == Long.MAX_VALUE ? Long.MAX_VALUE : System.currentTimeMillis() + maxtime; // we know infinity!
final long endtime = maxtime < 0 || maxtime == Long.MAX_VALUE ? Long.MAX_VALUE : System.currentTimeMillis() + maxtime; // we know infinity!
final Thread t = new Thread() {
@Override
public void run() {
this.setName("AbstractSolrConnector:concurrentIDsByQuery(" + querystring + ")");
int o = offset;
try {
while (System.currentTimeMillis() < endtime) {
try {
SolrDocumentList sdl = getDocumentListByQuery(querystring, sort, o, Math.min(maxcount, pagesize_ids), CollectionSchema.id.getSolrFieldName());
int count = 0;
for (SolrDocument d: sdl) {
try {queue.put((String) d.getFieldValue(CollectionSchema.id.getSolrFieldName()));} catch (final InterruptedException e) {break;}
count++;
for (String querystring: querystrings) {
this.setName("AbstractSolrConnector:concurrentIDsByQueries(" + querystring + ")");
int o = offset;
while (System.currentTimeMillis() < endtime) {
try {
SolrDocumentList sdl = getDocumentListByQuery(querystring, sort, o, maxcount < 0 ? pagesize_ids : Math.min(maxcount, pagesize_ids), CollectionSchema.id.getSolrFieldName());
int count = 0;
for (SolrDocument d: sdl) {
try {queue.put((String) d.getFieldValue(CollectionSchema.id.getSolrFieldName()));} catch (final InterruptedException e) {break;}
count++;
}
if (count < pagesize_ids) break;
o += count;
if (o > maxcount && maxcount > 0) break;
} catch (final SolrException e) {
break;
} catch (final IOException e) {
break;
}
if (count < pagesize_ids) break;
o += pagesize_ids;
} catch (final SolrException e) {
break;
} catch (final IOException e) {
break;
}
}
} catch (Throwable e) {} finally {
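For orientation (this note and sketch are not part of the commit): the connector produces results into a BlockingQueue and terminates the feed with `concurrency` poison markers, one per producer thread. A minimal consumer of the ID feed, assuming `connector` is any SolrConnector implementation and `handleId` is a hypothetical callback, could look like this:

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import net.yacy.cora.federate.solr.connector.AbstractSolrConnector;
import net.yacy.cora.federate.solr.connector.SolrConnector;

public class IdFeedConsumerSketch {
    // Drain the ID feed; the producer ends it with `concurrency` POISON_ID markers.
    public static void consume(final SolrConnector connector, final int concurrency) throws InterruptedException {
        final List<String> queries = Arrays.asList("*:*"); // hypothetical query list
        final BlockingQueue<String> ids = connector.concurrentIDsByQueries(
                queries, null, 0, 10000, 60000L, 1000, concurrency);
        int poisoned = 0;
        while (poisoned < concurrency) {
            final String id = ids.take();                 // blocks until an element is available
            if (id == AbstractSolrConnector.POISON_ID) {  // identity comparison, as in the code above
                poisoned++;
            } else {
                handleId(id);
            }
        }
    }
    private static void handleId(final String id) { /* hypothetical: process one document id */ }
}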

@@ -24,6 +24,7 @@ import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
@@ -364,11 +365,28 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
return this.connector.concurrentDocumentsByQuery(querystring, sort, offset, maxcount, maxtime, buffersize, concurrency, prefetchIDs, fields);
}
@Override
public BlockingQueue<SolrDocument> concurrentDocumentsByQueries(
List<String> querystrings, String sort, int offset, int maxcount,
long maxtime, int buffersize, int concurrency, boolean prefetchIDs,
String... fields) {
commitDocBuffer();
return this.connector.concurrentDocumentsByQueries(querystrings, sort, offset, maxcount, maxtime, buffersize, concurrency, prefetchIDs, fields);
}
@Override
public BlockingQueue<String> concurrentIDsByQuery(String querystring, String sort, int offset, int maxcount, long maxtime, int buffersize, final int concurrency) {
commitDocBuffer();
return this.connector.concurrentIDsByQuery(querystring, sort, offset, maxcount, maxtime, buffersize, concurrency);
}
@Override
public BlockingQueue<String> concurrentIDsByQueries(
List<String> querystrings, String sort, int offset, int maxcount,
long maxtime, int buffersize, int concurrency) {
commitDocBuffer();
return this.connector.concurrentIDsByQueries(querystrings, sort, offset, maxcount, maxtime, buffersize, concurrency);
}
@Override
public void update(final SolrInputDocument solrdoc) throws IOException, SolrException {
@@ -381,4 +399,5 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
commitDocBuffer();
this.connector.update(solrdoc);
}
}

@@ -22,6 +22,7 @@ package net.yacy.cora.federate.solr.connector;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
@@ -223,7 +224,7 @@ public interface SolrConnector extends Iterable<String> /* Iterable of document
public Map<String, ReversibleScoreMap<String>> getFacets(String query, int maxresults, final String ... fields) throws IOException;
/**
* Get a query result from solr as a stream of documents.
* Get results from a solr query as a stream of documents.
* The result queue is considered terminated when AbstractSolrConnector.POISON_DOCUMENT is returned.
* The method returns immediately and feeds the search results into the queue.
* @param querystring the solr query string
@@ -235,7 +236,7 @@ public interface SolrConnector extends Iterable<String> /* Iterable of document
* @param concurrency is the number of AbstractSolrConnector.POISON_DOCUMENT entries to add at the end of the feed
* @param prefetchIDs if true, all IDs are fetched first and then the documents are queried by ID; if false, documents are retrieved directly
* @param fields list of fields
* @return a blocking queue which is terminated with AbstractSolrConnector.POISON_DOCUMENT as last element
* @return a blocking queue which is terminated with AbstractSolrConnector.POISON_DOCUMENT as last element
*/
public BlockingQueue<SolrDocument> concurrentDocumentsByQuery(
final String querystring,
@@ -247,7 +248,33 @@ public interface SolrConnector extends Iterable<String> /* Iterable of document
final int concurrency,
final boolean prefetchIDs,
final String ... fields);
/**
* Get results from solr queries as a stream of documents.
* The result queue is considered terminated when AbstractSolrConnector.POISON_DOCUMENT is returned.
* The method returns immediately and feeds the search results into the queue.
* @param querystrings the list of solr query strings
* @param sort the solr sort string, may be null if no sorting is required
* @param offset first result offset
* @param maxcount the maximum number of results
* @param maxtime the maximum time in milliseconds
* @param buffersize the size of an ArrayBlockingQueue; if <= 0 then a LinkedBlockingQueue is used
* @param concurrency is the number of AbstractSolrConnector.POISON_DOCUMENT entries to add at the end of the feed
* @param prefetchIDs if true, all IDs are fetched first and then the documents are queried by ID; if false, documents are retrieved directly
* @param fields list of fields
* @return a blocking queue which is terminated with AbstractSolrConnector.POISON_DOCUMENT as last element
*/
public BlockingQueue<SolrDocument> concurrentDocumentsByQueries(
final List<String> querystrings,
final String sort,
final int offset,
final int maxcount,
final long maxtime,
final int buffersize,
final int concurrency,
final boolean prefetchIDs,
final String ... fields);
/**
* get a document id result stream from a solr query.
* The result queue is considered terminated when AbstractSolrConnector.POISON_ID is returned.
@@ -258,7 +285,7 @@ public interface SolrConnector extends Iterable<String> /* Iterable of document
* @param maxcount the maximum number of results
* @param buffersize the size of an ArrayBlockingQueue; if <= 0 then a LinkedBlockingQueue is used
* @param concurrency is the number of AbstractSolrConnector.POISON_ID entries to add at the end of the feed
* @return
* @return a list of ids in a blocking queue which is terminated with a number of AbstractSolrConnector.POISON_ID entries
*/
public BlockingQueue<String> concurrentIDsByQuery(
final String querystring,
@@ -269,4 +296,24 @@ public interface SolrConnector extends Iterable<String> /* Iterable of document
final int buffersize,
final int concurrency);
/**
* Get a document id result stream from a set of solr queries.
* The result queue is considered terminated when AbstractSolrConnector.POISON_ID is returned.
* The method returns immediately and feeds the search results into the queue.
* @param querystrings a list of query strings
* @param sort the solr sort string, may be null if no sorting is required
* @param offset common offset of all queries
* @param maxcount maximum count for each query
* @param maxtime the maximum time in milliseconds
* @param buffersize the size of an ArrayBlockingQueue; if <= 0 then a LinkedBlockingQueue is used
* @param concurrency is the number of AbstractSolrConnector.POISON_ID entries to add at the end of the feed
* @return a list of ids in a blocking queue which is terminated with a number of AbstractSolrConnector.POISON_ID entries
*/
public BlockingQueue<String> concurrentIDsByQueries(
final List<String> querystrings,
final String sort,
final int offset,
final int maxcount,
final long maxtime,
final int buffersize,
final int concurrency);
}
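For illustration (not part of the commit), here is a consumer of the document feed declared by this interface; a sketch assuming `connector` is any SolrConnector implementation and `handleDoc` is a hypothetical callback:

import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import org.apache.solr.common.SolrDocument;
import net.yacy.cora.federate.solr.connector.AbstractSolrConnector;
import net.yacy.cora.federate.solr.connector.SolrConnector;

public class DocumentFeedConsumerSketch {
    public static void consume(final SolrConnector connector, final int concurrency) throws InterruptedException {
        final BlockingQueue<SolrDocument> docs = connector.concurrentDocumentsByQueries(
                Arrays.asList("*:*"),  // hypothetical query list
                null,                  // no sort
                0, 10000, 60000L,      // offset, maxcount, maxtime
                100,                   // buffersize > 0 -> ArrayBlockingQueue
                concurrency,
                false,                 // retrieve documents directly, no ID prefetch
                "id");                 // fields to load
        int poisons = 0;
        while (poisons < concurrency) {
            final SolrDocument doc = docs.take();
            if (doc == AbstractSolrConnector.POISON_DOCUMENT) poisons++; // feed terminator
            else handleDoc(doc);
        }
    }
    private static void handleDoc(final SolrDocument doc) { /* hypothetical: process one document */ }
}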

@@ -1264,17 +1264,20 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
// partitioning of the index, get a facet for a partitioning key
final long count = collectionConnector.getCountByQuery(collection1query);
String partitioningKey = CollectionSchema.responsetime_i.getSolrFieldName();
Map<String, ReversibleScoreMap<String>> partitioningFacet = collectionConnector.getFacets(collection1query, 100000, partitioningKey);
ReversibleScoreMap<String> partitioning = partitioningFacet.get(partitioningKey);
long emptyCount = collectionConnector.getCountByQuery("-" + partitioningKey + ":[* TO *] AND (" + collection1query + ")");
if (emptyCount > 0) partitioning.inc("", (int) emptyCount);
final long start = System.currentTimeMillis();
for (String partitioningValue: partitioning) {
String partitioningQuery = (partitioningValue.length() == 0) ?
"-" + partitioningKey + ":[* TO *] AND (" + collection1query + ")" :
partitioningKey + ":" + partitioningValue + " AND (" + collection1query + ")";
postprocessingActivity = "collecting " + partitioning.get(partitioningValue) + " documents from partition \"" + partitioningValue + "\" (averall " + count + ") from the collection for harvestkey " + harvestkey + ", partitioned by " + partitioningKey;
postprocessingActivity = "collecting " + count + " documents from the collection for harvestkey " + harvestkey + ", partitioned by " + partitioningKey;
if (count > 0) {
Map<String, ReversibleScoreMap<String>> partitioningFacet = collectionConnector.getFacets(collection1query, 100000, partitioningKey);
ReversibleScoreMap<String> partitioning = partitioningFacet.get(partitioningKey);
long emptyCount = collectionConnector.getCountByQuery("-" + partitioningKey + ":[* TO *] AND (" + collection1query + ")");
if (emptyCount > 0) partitioning.inc("", (int) emptyCount);
final long start = System.currentTimeMillis();
List<String> querystrings = new ArrayList<>(partitioning.size());
for (String partitioningValue: partitioning) {
String partitioningQuery = (partitioningValue.length() == 0) ?
"-" + partitioningKey + ":[* TO *] AND (" + collection1query + ")" :
partitioningKey + ":" + partitioningValue + " AND (" + collection1query + ")";
querystrings.add(partitioningQuery);
}
// start collection of documents
final int concurrency = Math.max(1, Math.min((int) (MemoryControl.available() / (100L * 1024L * 1024L)), Runtime.getRuntime().availableProcessors()));
//final int concurrency = 1;
@@ -1283,8 +1286,8 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
this.contains(CollectionSchema.references_external_i) &&
this.contains(CollectionSchema.references_exthosts_i);
ConcurrentLog.info("CollectionConfiguration", postprocessingActivity);
final BlockingQueue<SolrDocument> docs = collectionConnector.concurrentDocumentsByQuery(
partitioningQuery,
final BlockingQueue<SolrDocument> docs = collectionConnector.concurrentDocumentsByQueries(
querystrings,
(this.contains(CollectionSchema.http_unique_b) || this.contains(CollectionSchema.www_unique_b)) ?
CollectionSchema.host_subdomain_s.getSolrFieldName() + " asc," + // sort on subdomain to get hosts without subdomain first; that gives an opportunity to set www_unique_b flag to false
CollectionSchema.url_protocol_s.getSolrFieldName() + " asc" // sort on protocol to get http before https; that gives an opportunity to set http_unique_b flag to false
@@ -1415,19 +1418,17 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
}
// wait for termination
for (int rewrite_start = 0; rewrite_start < concurrency; rewrite_start++) rewriteThread[rewrite_start].join();
if (failids.size() > 0) {
ConcurrentLog.info("CollectionConfiguration", "cleanup_processing: deleting " + failids.size() + " documents which have permanent execution fails");
collectionConnector.deleteByIds(failids);
}
if (count != countcheck.get()) ConcurrentLog.warn("CollectionConfiguration", "ambiguous collection document count for harvestkey " + harvestkey + ": expected=" + count + ", counted=" + countcheck + "; countquery=" + collection1query); // big gap for harvestkey = null
ConcurrentLog.info("CollectionConfiguration", "cleanup_processing: re-calculated " + proccount + " new documents, " +
proccount_referencechange + " reference-count changes, " +
proccount_citationchange + " citation ranking changes.");
}
if (failids.size() > 0) {
ConcurrentLog.info("CollectionConfiguration", "cleanup_processing: deleting " + failids.size() + " documents which have permanent execution fails");
collectionConnector.deleteByIds(failids);
}
if (count != countcheck.get()) ConcurrentLog.warn("CollectionConfiguration", "ambiguous collection document count for harvestkey " + harvestkey + ": expected=" + count + ", counted=" + countcheck + "; countquery=" + collection1query); // big gap for harvestkey = null
ConcurrentLog.info("CollectionConfiguration", "cleanup_processing: re-calculated " + proccount + " new documents, " +
proccount_referencechange + " reference-count changes, " +
proccount_citationchange + " citation ranking changes.");
} catch (final InterruptedException e2) {
ConcurrentLog.warn("CollectionConfiguration", e2.getMessage(), e2);
} catch (IOException e3) {
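To make the CollectionConfiguration change concrete (not part of the commit): the partitioning loop now collects one Solr query per facet value and hands the whole list to a single concurrentDocumentsByQueries call, instead of issuing one feed per partition. A reduced sketch with hypothetical values:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PartitionQuerySketch {
    public static List<String> buildQueries() {
        final String collection1query = "process_sxt:[* TO *]"; // hypothetical base query
        final String partitioningKey = "responsetime_i";        // as in the diff
        final List<String> querystrings = new ArrayList<>();
        // the empty string stands for documents where the partitioning field is missing
        for (final String value : Arrays.asList("", "120", "250")) {
            querystrings.add(value.isEmpty()
                    ? "-" + partitioningKey + ":[* TO *] AND (" + collection1query + ")"
                    : partitioningKey + ":" + value + " AND (" + collection1query + ")");
        }
        return querystrings; // one concurrent feed now serves all partitions
    }
}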
