reducing the concurrent query stack size and the concurrency of postprocessing to avoid OOM situations
Michael Peter Christen 11 years ago
parent eca9380e3d
commit 6344718f8b
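
The two throttling ideas in this commit can be condensed into a small standalone sketch (hypothetical class and method names, not part of the YaCy codebase): clamp the Solr paging size to the consumer queue capacity so one fetched page never overflows the blocking queue, and derive the postprocessing concurrency from the currently available heap (here a plain stand-in for what MemoryControl.available() reports) rather than from the CPU count alone.

// Hypothetical sketch only; names are illustrative, not YaCy API.
public final class ThrottleSketch {

    /** One fetched page should never exceed what the blocking queue can buffer. */
    static int effectivePageSize(final int pagesize, final int buffersize) {
        return Math.min(pagesize, buffersize);
    }

    /** Roughly one postprocessing worker per 100 MB of free heap,
     *  capped by the processor count and never fewer than one. */
    static int concurrencyForAvailableMemory(final long availableBytes) {
        final int byMemory = (int) (availableBytes / (100L * 1024L * 1024L));
        return Math.max(1, Math.min(byMemory, Runtime.getRuntime().availableProcessors()));
    }

    public static void main(final String[] args) {
        final Runtime rt = Runtime.getRuntime();
        final long available = rt.maxMemory() - rt.totalMemory() + rt.freeMemory();
        System.out.println("effective page size = " + effectivePageSize(100, 20));
        System.out.println("concurrency         = " + concurrencyForAvailableMemory(available));
    }
}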

@@ -152,8 +152,10 @@ public abstract class AbstractSolrConnector implements SolrConnector {
             final int buffersize,
             final int concurrency,
             final String ... fields) {
+        assert buffersize > 0;
         final BlockingQueue<SolrDocument> queue = buffersize <= 0 ? new LinkedBlockingQueue<SolrDocument>() : new ArrayBlockingQueue<SolrDocument>(buffersize);
         final long endtime = maxtime == Long.MAX_VALUE ? Long.MAX_VALUE : System.currentTimeMillis() + maxtime; // we know infinity!
+        final int ps = Math.min(pagesize, buffersize);
         final Thread t = new Thread() {
             @Override
             public void run() {
@@ -162,12 +164,12 @@ public abstract class AbstractSolrConnector implements SolrConnector {
                 int count = 0;
                 while (System.currentTimeMillis() < endtime && count < maxcount) {
                     try {
-                        SolrDocumentList sdl = getDocumentListByQuery(querystring, sort, o, Math.min(maxcount, pagesize), fields);
+                        SolrDocumentList sdl = getDocumentListByQuery(querystring, sort, o, Math.min(maxcount, ps), fields);
                         for (SolrDocument d: sdl) {
                             try {queue.put(d);} catch (final InterruptedException e) {break;}
                             count++;
                         }
-                        if (sdl.size() < pagesize) {
+                        if (sdl.size() < ps) {
                             //System.out.println("sdl.size() = " + sdl.size() + ", pagesize = " + pagesize);
                             break;
                         }

@@ -239,18 +239,24 @@ public class EmbeddedSolrConnector extends SolrServerConnector implements SolrCo
         DocList response = resultContext == null ? new DocSlice(0, 0, new int[0], new float[0], 0, 0.0f) : resultContext.docs;
         sdl.setNumFound(response == null ? 0 : response.matches());
         sdl.setStart(response == null ? 0 : response.offset());
+        String originalName = Thread.currentThread().getName();
         if (response != null) {
             try {
                 SolrIndexSearcher searcher = req.getSearcher();
                 final int responseCount = response.size();
                 DocIterator iterator = response.iterator();
                 for (int i = 0; i < responseCount; i++) {
-                    sdl.add(doc2SolrDoc(searcher.doc(iterator.nextDoc(), (Set<String>) null)));
+                    int docid = iterator.nextDoc();
+                    Thread.currentThread().setName("EmbeddedSolrConnector.SolrQueryResponse2SolrDocumentList: " + docid);
+                    Document responsedoc = searcher.doc(docid, (Set<String>) null);
+                    SolrDocument sordoc = doc2SolrDoc(responsedoc);
+                    sdl.add(sordoc);
                 }
             } catch (IOException e) {
                 ConcurrentLog.logException(e);
             }
         }
+        Thread.currentThread().setName(originalName);
         return sdl;
     }

@@ -762,7 +762,7 @@ public final class Protocol {
                 // the search-result-url transports all the attributes of word indexes
                 if ( !Base64Order.enhancedCoder.equal(entry.urlhash(), urlEntry.hash()) ) {
-                    Network.log.info("remote search: url-hash " + ASCII.String(urlEntry.hash()) + " does not belong to word-attached-hash " + ASCII.String(entry.urlhash()) + "; url = " + urlEntry.url() + " from peer " + target.getName());
+                    Network.log.info("remote search: url-hash " + ASCII.String(urlEntry.hash()) + " does not belong to word-attached-hash " + ASCII.String(entry.urlhash()) + "; url = " + urlEntry.url().toNormalform(true) + " from peer " + target.getName());
                     continue; // spammed
                 }

@@ -1055,7 +1055,7 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
         // To do so, we first must collect all canonical links, find all references to them, get the anchor list of the documents and patch the citation reference of these links
         String patchquery = CollectionSchema.host_s.getSolrFieldName() + ":" + host + " AND " + CollectionSchema.canonical_s.getSolrFieldName() + AbstractSolrConnector.CATCHALL_DTERM;
         long patchquerycount = collectionConnector.getCountByQuery(patchquery);
-        BlockingQueue<SolrDocument> documents_with_canonical_tag = collectionConnector.concurrentDocumentsByQuery(patchquery, CollectionSchema.url_chars_i.getSolrFieldName() + " asc", 0, 100000000, Long.MAX_VALUE, 200, 1,
+        BlockingQueue<SolrDocument> documents_with_canonical_tag = collectionConnector.concurrentDocumentsByQuery(patchquery, CollectionSchema.url_chars_i.getSolrFieldName() + " asc", 0, 100000000, Long.MAX_VALUE, 20, 1,
                 CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName(), CollectionSchema.canonical_s.getSolrFieldName());
         SolrDocument doc_B;
         int patchquerycountcheck = 0;
@@ -1151,7 +1151,7 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
             final long count = segment.fulltext().getWebgraphConnector().getCountByQuery(patchquery);
             int concurrency = Math.min((int) count, Math.max(1, Runtime.getRuntime().availableProcessors() / 4));
             ConcurrentLog.info("CollectionConfiguration", "collecting " + count + " documents from the webgraph, concurrency = " + concurrency);
-            final BlockingQueue<SolrDocument> docs = segment.fulltext().getWebgraphConnector().concurrentDocumentsByQuery(patchquery, WebgraphSchema.source_chars_i.getSolrFieldName() + " asc", 0, 100000000, Long.MAX_VALUE, 200, concurrency);
+            final BlockingQueue<SolrDocument> docs = segment.fulltext().getWebgraphConnector().concurrentDocumentsByQuery(patchquery, WebgraphSchema.source_chars_i.getSolrFieldName() + " asc", 0, 100000000, Long.MAX_VALUE, concurrency + 1, concurrency);
             final AtomicInteger proccount = new AtomicInteger(0);
             Thread[] t = new Thread[concurrency];
             for (final AtomicInteger i = new AtomicInteger(0); i.get() < t.length; i.incrementAndGet()) {
@@ -1235,7 +1235,8 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
         omitFields.add(CollectionSchema.harvestkey_s.getSolrFieldName());
         final long count = collectionConnector.getCountByQuery(collection1query);
         final long start = System.currentTimeMillis();
-        final int concurrency = Runtime.getRuntime().availableProcessors();
+        final int concurrency = Math.max(1, Math.min((int) (MemoryControl.available() / (100L * 1024L * 1024L)), Runtime.getRuntime().availableProcessors()));
+        //final int concurrency = 1;
         final boolean reference_computation = this.contains(CollectionSchema.references_i) &&
             this.contains(CollectionSchema.references_internal_i) &&
             this.contains(CollectionSchema.references_external_i) &&
@@ -1248,7 +1249,7 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
                 CollectionSchema.host_subdomain_s.getSolrFieldName() + " asc," + // sort on subdomain to get hosts without subdomain first; that gives an opportunity to set www_unique_b flag to false
                 CollectionSchema.url_protocol_s.getSolrFieldName() + " asc" // sort on protocol to get http before https; that gives an opportunity to set http_unique_b flag to false
                 : null, // null sort is faster!
-                0, 100000000, Long.MAX_VALUE, 100, concurrency);
+                0, 100000000, Long.MAX_VALUE, concurrency + 1, concurrency);
         final AtomicInteger proccount = new AtomicInteger();
         final AtomicInteger proccount_referencechange = new AtomicInteger();
         final AtomicInteger proccount_citationchange = new AtomicInteger();
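
For context (an illustrative sketch, not part of the commit): concurrentDocumentsByQuery hands its results to consumer threads through a BlockingQueue, and this commit shrinks that queue from a fixed 100 to 200 entries down to concurrency + 1 (and to 20 for the canonical-tag pass). The pattern below shows why a small bound suffices: the producer blocks once the queue is full, so the buffered backlog, and with it the memory footprint, stays proportional to the number of workers. Class, variable, and marker names are invented for the example.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch, not YaCy code: a bounded queue sized to concurrency + 1
// feeds a fixed set of consumer threads.
public final class BoundedFeedSketch {

    private static final String POISON = "<<EOF>>"; // end-of-stream marker

    public static void main(final String[] args) throws InterruptedException {
        final int concurrency = 2;
        final BlockingQueue<String> queue = new ArrayBlockingQueue<>(concurrency + 1);

        final Thread[] consumers = new Thread[concurrency];
        for (int i = 0; i < concurrency; i++) {
            consumers[i] = new Thread(() -> {
                try {
                    String doc;
                    while (!(doc = queue.take()).equals(POISON)) {
                        // postprocessing work on the document would happen here
                        System.out.println(Thread.currentThread().getName() + " processed " + doc);
                    }
                } catch (final InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            consumers[i].start();
        }

        // the producer blocks in put() as soon as concurrency + 1 items are waiting,
        // which keeps the in-memory backlog small
        for (int i = 0; i < 10; i++) queue.put("doc-" + i);
        for (int i = 0; i < concurrency; i++) queue.put(POISON);
        for (final Thread c : consumers) c.join();
    }
}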
