diff --git a/source/net/yacy/cora/federate/solr/connector/AbstractSolrConnector.java b/source/net/yacy/cora/federate/solr/connector/AbstractSolrConnector.java
index bb0cc98a3..df7f9ecc5 100644
--- a/source/net/yacy/cora/federate/solr/connector/AbstractSolrConnector.java
+++ b/source/net/yacy/cora/federate/solr/connector/AbstractSolrConnector.java
@@ -269,24 +269,27 @@ public abstract class AbstractSolrConnector implements SolrConnector {
             public void run() {
                 this.setName("AbstractSolrConnector:concurrentIDsByQuery(" + querystring + ")");
                 int o = offset;
-                while (System.currentTimeMillis() < endtime) {
-                    try {
-                        SolrDocumentList sdl = getDocumentListByQuery(querystring, sort, o, Math.min(maxcount, pagesize_ids), CollectionSchema.id.getSolrFieldName());
-                        int count = 0;
-                        for (SolrDocument d: sdl) {
-                            try {queue.put((String) d.getFieldValue(CollectionSchema.id.getSolrFieldName()));} catch (final InterruptedException e) {break;}
-                            count++;
+                try {
+                    while (System.currentTimeMillis() < endtime) {
+                        try {
+                            SolrDocumentList sdl = getDocumentListByQuery(querystring, sort, o, Math.min(maxcount, pagesize_ids), CollectionSchema.id.getSolrFieldName());
+                            int count = 0;
+                            for (SolrDocument d: sdl) {
+                                try {queue.put((String) d.getFieldValue(CollectionSchema.id.getSolrFieldName()));} catch (final InterruptedException e) {break;}
+                                count++;
+                            }
+                            if (count < pagesize_ids) break;
+                            o += pagesize_ids;
+                        } catch (final SolrException e) {
+                            break;
+                        } catch (final IOException e) {
+                            break;
                         }
-                        if (count < pagesize_ids) break;
-                        o += pagesize_ids;
-                    } catch (final SolrException e) {
-                        break;
-                    } catch (final IOException e) {
-                        break;
                     }
-                }
-                for (int i = 0; i < concurrency; i++) {
-                    try {queue.put(AbstractSolrConnector.POISON_ID);} catch (final InterruptedException e1) {}
+                } catch (Throwable e) {} finally {
+                    for (int i = 0; i < concurrency; i++) {
+                        try {queue.put(AbstractSolrConnector.POISON_ID);} catch (final InterruptedException e1) {}
+                    }
                 }
             }
         };
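Note on the hunk above: it changes the producer side of concurrentIDsByQuery so that the paging loop is wrapped in try/catch(Throwable)/finally, and the poison-pill loop moves into the finally block. The pills are therefore delivered on every exit path; previously an unexpected RuntimeException would have skipped the pill loop and left every consumer blocked forever in queue.take(). A minimal self-contained sketch of that invariant follows; the names, queue size, and counts are illustrative, not YaCy code:

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    public class PoisonPillSketch {
        // sentinel compared by identity, in the spirit of AbstractSolrConnector.POISON_ID
        private static final String POISON = new String("POISON");

        public static void main(String[] args) {
            final int concurrency = 3;
            final BlockingQueue<String> queue = new ArrayBlockingQueue<>(16);

            new Thread(() -> {
                try {
                    for (int i = 0; i < 10; i++) queue.put("id-" + i); // produce
                } catch (Throwable e) {
                    // swallow: shutdown of the consumers is guaranteed below
                } finally {
                    // one pill per consumer, on every exit path
                    for (int i = 0; i < concurrency; i++) {
                        try { queue.put(POISON); } catch (InterruptedException e1) {}
                    }
                }
            }).start();

            for (int c = 0; c < concurrency; c++) {
                new Thread(() -> {
                    try {
                        String id;
                        while ((id = queue.take()) != POISON) { // identity check ends the loop
                            System.out.println(Thread.currentThread().getName() + " got " + id);
                        }
                    } catch (InterruptedException e) {}
                }).start();
            }
        }
    }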
"" : ", sort = " + sort) + "; retry = " + retry + "; fl = " + fl); // for debugging in Threaddump rsp = this.server.query(params); diff --git a/source/net/yacy/search/schema/CollectionConfiguration.java b/source/net/yacy/search/schema/CollectionConfiguration.java index 8c7b8bd7d..988334538 100644 --- a/source/net/yacy/search/schema/CollectionConfiguration.java +++ b/source/net/yacy/search/schema/CollectionConfiguration.java @@ -1252,166 +1252,180 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri // process all documents in collection final Map hostExtentCache = new HashMap(); // a mapping from the host id to the number of documents which contain this host-id final Set uniqueURLs = new ConcurrentHashSet(); // will be used in a concurrent environment + final Set omitFields = new HashSet(); + omitFields.add(CollectionSchema.process_sxt.getSolrFieldName()); + omitFields.add(CollectionSchema.harvestkey_s.getSolrFieldName()); + final Collection failids = new ArrayList(); + final AtomicInteger countcheck = new AtomicInteger(0); + final AtomicInteger proccount = new AtomicInteger(); + final AtomicInteger proccount_referencechange = new AtomicInteger(); + final AtomicInteger proccount_citationchange = new AtomicInteger(); try { - final Set omitFields = new HashSet(); - omitFields.add(CollectionSchema.process_sxt.getSolrFieldName()); - omitFields.add(CollectionSchema.harvestkey_s.getSolrFieldName()); + // partitioning of the index, get a facet for a partitioning key final long count = collectionConnector.getCountByQuery(collection1query); - final long start = System.currentTimeMillis(); - final int concurrency = Math.max(1, Math.min((int) (MemoryControl.available() / (100L * 1024L * 1024L)), Runtime.getRuntime().availableProcessors())); - //final int concurrency = 1; - final boolean reference_computation = this.contains(CollectionSchema.references_i) && - this.contains(CollectionSchema.references_internal_i) && - this.contains(CollectionSchema.references_external_i) && - this.contains(CollectionSchema.references_exthosts_i); - postprocessingActivity = "collecting " + count + " documents from the collection for harvestkey " + harvestkey; - ConcurrentLog.info("CollectionConfiguration", postprocessingActivity); - final BlockingQueue docs = collectionConnector.concurrentDocumentsByQuery( - collection1query, - (this.contains(CollectionSchema.http_unique_b) || this.contains(CollectionSchema.www_unique_b)) ? - CollectionSchema.host_subdomain_s.getSolrFieldName() + " asc," + // sort on subdomain to get hosts without subdomain first; that gives an opportunity to set www_unique_b flag to false - CollectionSchema.url_protocol_s.getSolrFieldName() + " asc" // sort on protocol to get http before https; that gives an opportunity to set http_unique_b flag to false - : null, // null sort is faster! - 0, 100000000, Long.MAX_VALUE, concurrency + 1, concurrency, true, - byPartialUpdate ? 
diff --git a/source/net/yacy/search/schema/CollectionConfiguration.java b/source/net/yacy/search/schema/CollectionConfiguration.java
index 8c7b8bd7d..988334538 100644
--- a/source/net/yacy/search/schema/CollectionConfiguration.java
+++ b/source/net/yacy/search/schema/CollectionConfiguration.java
@@ -1252,166 +1252,180 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
             // process all documents in collection
             final Map<String, Long> hostExtentCache = new HashMap<String, Long>(); // a mapping from the host id to the number of documents which contain this host-id
             final Set<String> uniqueURLs = new ConcurrentHashSet<String>(); // will be used in a concurrent environment
+            final Set<String> omitFields = new HashSet<String>();
+            omitFields.add(CollectionSchema.process_sxt.getSolrFieldName());
+            omitFields.add(CollectionSchema.harvestkey_s.getSolrFieldName());
+            final Collection<String> failids = new ArrayList<String>();
+            final AtomicInteger countcheck = new AtomicInteger(0);
+            final AtomicInteger proccount = new AtomicInteger();
+            final AtomicInteger proccount_referencechange = new AtomicInteger();
+            final AtomicInteger proccount_citationchange = new AtomicInteger();
             try {
-                final Set<String> omitFields = new HashSet<String>();
-                omitFields.add(CollectionSchema.process_sxt.getSolrFieldName());
-                omitFields.add(CollectionSchema.harvestkey_s.getSolrFieldName());
+                // partitioning of the index, get a facet for a partitioning key
                 final long count = collectionConnector.getCountByQuery(collection1query);
-                final long start = System.currentTimeMillis();
-                final int concurrency = Math.max(1, Math.min((int) (MemoryControl.available() / (100L * 1024L * 1024L)), Runtime.getRuntime().availableProcessors()));
-                //final int concurrency = 1;
-                final boolean reference_computation = this.contains(CollectionSchema.references_i) &&
-                        this.contains(CollectionSchema.references_internal_i) &&
-                        this.contains(CollectionSchema.references_external_i) &&
-                        this.contains(CollectionSchema.references_exthosts_i);
-                postprocessingActivity = "collecting " + count + " documents from the collection for harvestkey " + harvestkey;
-                ConcurrentLog.info("CollectionConfiguration", postprocessingActivity);
-                final BlockingQueue<SolrDocument> docs = collectionConnector.concurrentDocumentsByQuery(
-                        collection1query,
-                        (this.contains(CollectionSchema.http_unique_b) || this.contains(CollectionSchema.www_unique_b)) ?
-                            CollectionSchema.host_subdomain_s.getSolrFieldName() + " asc," + // sort on subdomain to get hosts without subdomain first; that gives an opportunity to set www_unique_b flag to false
-                            CollectionSchema.url_protocol_s.getSolrFieldName() + " asc" // sort on protocol to get http before https; that gives an opportunity to set http_unique_b flag to false
-                            : null, // null sort is faster!
-                        0, 100000000, Long.MAX_VALUE, concurrency + 1, concurrency, true,
-                        byPartialUpdate ?
-                        new String[]{
-                            // the following fields are needed to perform the postprocessing
-                            // and should only be used for partial updates; for full updates use a
-                            // full list of fields to avoid LazyInstantiation which has poor performace
-                            CollectionSchema.id.getSolrFieldName(),
-                            CollectionSchema.sku.getSolrFieldName(),
-                            CollectionSchema.harvestkey_s.getSolrFieldName(),
-                            CollectionSchema.process_sxt.getSolrFieldName(),
-                            CollectionSchema.canonical_equal_sku_b.getSolrFieldName(),
-                            CollectionSchema.canonical_s.getSolrFieldName(),
-                            CollectionSchema.exact_signature_l.getSolrFieldName(),
-                            CollectionSchema.fuzzy_signature_l.getSolrFieldName(),
-                            CollectionSchema.title_exact_signature_l.getSolrFieldName(),
-                            CollectionSchema.description_exact_signature_l.getSolrFieldName(),
-                            CollectionSchema.host_id_s.getSolrFieldName(),
-                            CollectionSchema.host_s.getSolrFieldName(),
-                            CollectionSchema.host_subdomain_s.getSolrFieldName(),
-                            CollectionSchema.url_chars_i.getSolrFieldName(),
-                            CollectionSchema.url_protocol_s.getSolrFieldName(),
-                            CollectionSchema.httpstatus_i.getSolrFieldName(),
-                            CollectionSchema.inboundlinkscount_i.getSolrFieldName(),
-                            CollectionSchema.robots_i.getSolrFieldName()} :
-                        this.allFields());
-                final AtomicInteger proccount = new AtomicInteger();
-                final AtomicInteger proccount_referencechange = new AtomicInteger();
-                final AtomicInteger proccount_citationchange = new AtomicInteger();
-                final AtomicInteger countcheck = new AtomicInteger(0);
-                final Collection<String> failids = new ArrayList<String>();
-                final Thread rewriteThread[] = new Thread[concurrency];
-                for (int rewrite_start = 0; rewrite_start < concurrency; rewrite_start++) {
-                    rewriteThread[rewrite_start] = new Thread() {
-                        @Override
-                        public void run() {
-                            SolrDocument doc;
-                            try {
-                                while ((doc = docs.take()) != AbstractSolrConnector.POISON_DOCUMENT) {
-                                    // for each to-be-processed entry work on the process tag
-                                    Collection<Object> proctags = doc.getFieldValues(CollectionSchema.process_sxt.getSolrFieldName());
-                                    final String u = (String) doc.getFieldValue(CollectionSchema.sku.getSolrFieldName());
-                                    final String i = (String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName());
-                                    if (proctags == null || proctags.size() == 0) {
-                                        // this should not happen since we collected the documents using a process_sxt:[* TO *] term
-                                        ConcurrentLog.warn("CollectionConfiguration", "no process_sxt entry for url " + u + ", id=" + i);
-                                        continue;
-                                    }
-                                    try {
-                                        DigestURL url = new DigestURL(u, ASCII.getBytes(i));
-                                        byte[] id = url.hash();
-                                        SolrInputDocument sid = byPartialUpdate ? new SolrInputDocument() : collection.toSolrInputDocument(doc, omitFields);
-                                        sid.setField(CollectionSchema.id.getSolrFieldName(), i);
-                                        for (Object tag: proctags) try {
+                String partitioningKey = CollectionSchema.responsetime_i.getSolrFieldName();
+                Map<String, ReversibleScoreMap<String>> partitioningFacet = collectionConnector.getFacets(collection1query, 100000, partitioningKey);
+                ReversibleScoreMap<String> partitioning = partitioningFacet.get(partitioningKey);
+                long emptyCount = collectionConnector.getCountByQuery(partitioningKey + ":\"\" AND (" + collection1query + ")");
+                if (emptyCount > 0) partitioning.inc("", (int) emptyCount);
+                for (String partitioningValue: partitioning) {
+                    String partitioningQuery = partitioningKey + ":\"" + partitioningValue + "\" AND (" + collection1query + ")";
+                    postprocessingActivity = "collecting " + partitioning.get(partitioningValue) + " documents from partition \"" + partitioningValue + "\" (overall " + count + ") from the collection for harvestkey " + harvestkey + ", partitioned by " + partitioningKey;
+
+                    // start collection of documents
+                    final long start = System.currentTimeMillis();
+                    final int concurrency = Math.max(1, Math.min((int) (MemoryControl.available() / (100L * 1024L * 1024L)), Runtime.getRuntime().availableProcessors()));
+                    //final int concurrency = 1;
+                    final boolean reference_computation = this.contains(CollectionSchema.references_i) &&
+                            this.contains(CollectionSchema.references_internal_i) &&
+                            this.contains(CollectionSchema.references_external_i) &&
+                            this.contains(CollectionSchema.references_exthosts_i);
+                    ConcurrentLog.info("CollectionConfiguration", postprocessingActivity);
+                    final BlockingQueue<SolrDocument> docs = collectionConnector.concurrentDocumentsByQuery(
+                            partitioningQuery,
+                            (this.contains(CollectionSchema.http_unique_b) || this.contains(CollectionSchema.www_unique_b)) ?
+                                CollectionSchema.host_subdomain_s.getSolrFieldName() + " asc," + // sort on subdomain to get hosts without subdomain first; that gives an opportunity to set www_unique_b flag to false
+                                CollectionSchema.url_protocol_s.getSolrFieldName() + " asc" // sort on protocol to get http before https; that gives an opportunity to set http_unique_b flag to false
+                                : null, // null sort is faster!
+                            0, 100000000, Long.MAX_VALUE, concurrency + 1, concurrency, true,
+                            byPartialUpdate ?
+                            new String[]{
+                                // the following fields are needed to perform the postprocessing
+                                // and should only be used for partial updates; for full updates use a
+                                // full list of fields to avoid LazyInstantiation which has poor performance
+                                CollectionSchema.id.getSolrFieldName(),
+                                CollectionSchema.sku.getSolrFieldName(),
+                                CollectionSchema.harvestkey_s.getSolrFieldName(),
+                                CollectionSchema.process_sxt.getSolrFieldName(),
+                                CollectionSchema.canonical_equal_sku_b.getSolrFieldName(),
+                                CollectionSchema.canonical_s.getSolrFieldName(),
+                                CollectionSchema.exact_signature_l.getSolrFieldName(),
+                                CollectionSchema.fuzzy_signature_l.getSolrFieldName(),
+                                CollectionSchema.title_exact_signature_l.getSolrFieldName(),
+                                CollectionSchema.description_exact_signature_l.getSolrFieldName(),
+                                CollectionSchema.host_id_s.getSolrFieldName(),
+                                CollectionSchema.host_s.getSolrFieldName(),
+                                CollectionSchema.host_subdomain_s.getSolrFieldName(),
+                                CollectionSchema.url_chars_i.getSolrFieldName(),
+                                CollectionSchema.url_protocol_s.getSolrFieldName(),
+                                CollectionSchema.httpstatus_i.getSolrFieldName(),
+                                CollectionSchema.inboundlinkscount_i.getSolrFieldName(),
+                                CollectionSchema.robots_i.getSolrFieldName()} :
+                            this.allFields());
+                    final Thread rewriteThread[] = new Thread[concurrency];
+                    for (int rewrite_start = 0; rewrite_start < concurrency; rewrite_start++) {
+                        rewriteThread[rewrite_start] = new Thread() {
+                            @Override
+                            public void run() {
+                                SolrDocument doc;
+                                try {
+                                    while ((doc = docs.take()) != AbstractSolrConnector.POISON_DOCUMENT) {
+                                        // for each to-be-processed entry work on the process tag
+                                        Collection<Object> proctags = doc.getFieldValues(CollectionSchema.process_sxt.getSolrFieldName());
+                                        final String u = (String) doc.getFieldValue(CollectionSchema.sku.getSolrFieldName());
+                                        final String i = (String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName());
+                                        if (proctags == null || proctags.size() == 0) {
+                                            // this should not happen since we collected the documents using a process_sxt:[* TO *] term
+                                            ConcurrentLog.warn("CollectionConfiguration", "no process_sxt entry for url " + u + ", id=" + i);
+                                            continue;
+                                        }
+                                        try {
+                                            DigestURL url = new DigestURL(u, ASCII.getBytes(i));
+                                            byte[] id = url.hash();
+                                            SolrInputDocument sid = byPartialUpdate ? new SolrInputDocument() : collection.toSolrInputDocument(doc, omitFields);
+                                            sid.setField(CollectionSchema.id.getSolrFieldName(), i);
+                                            for (Object tag: proctags) try {
+
+                                                // switch over tag types
+                                                ProcessType tagtype = ProcessType.valueOf((String) tag);
+
+                                                if (tagtype == ProcessType.CITATION &&
+                                                    collection.contains(CollectionSchema.cr_host_count_i) &&
+                                                    collection.contains(CollectionSchema.cr_host_chance_d) &&
+                                                    collection.contains(CollectionSchema.cr_host_norm_i)) {
+                                                    CRV crv = rankings.remove(ASCII.String(id)); // instead of 'get'ting the CRV, we also remove it because we will not need it again and free some memory here
+                                                    if (crv != null) {
+                                                        sid.setField(CollectionSchema.cr_host_count_i.getSolrFieldName(), crv.count);
+                                                        sid.setField(CollectionSchema.cr_host_chance_d.getSolrFieldName(), crv.cr);
+                                                        sid.setField(CollectionSchema.cr_host_norm_i.getSolrFieldName(), crv.crn);
+                                                        proccount_citationchange.incrementAndGet();
+                                                    }
+                                                }
+
+                                                if (tagtype == ProcessType.UNIQUE) {
+                                                    postprocessing_http_unique(segment, doc, sid, url);
+                                                    postprocessing_www_unique(segment, doc, sid, url);
+                                                    postprocessing_doublecontent(segment, uniqueURLs, doc, sid, url);
+                                                }
+
+                                            } catch (IllegalArgumentException e) {}
-                                        // switch over tag types
-                                        ProcessType tagtype = ProcessType.valueOf((String) tag);
-
-                                        if (tagtype == ProcessType.CITATION &&
-                                            collection.contains(CollectionSchema.cr_host_count_i) &&
-                                            collection.contains(CollectionSchema.cr_host_chance_d) &&
-                                            collection.contains(CollectionSchema.cr_host_norm_i)) {
-                                            CRV crv = rankings.remove(ASCII.String(id)); // instead of 'get'ting the CRV, we also remove it because we will not need it again and free some memory here
-                                            if (crv != null) {
-                                                sid.setField(CollectionSchema.cr_host_count_i.getSolrFieldName(), crv.count);
-                                                sid.setField(CollectionSchema.cr_host_chance_d.getSolrFieldName(), crv.cr);
-                                                sid.setField(CollectionSchema.cr_host_norm_i.getSolrFieldName(), crv.crn);
-                                                proccount_citationchange.incrementAndGet();
+                                            // compute references
+                                            if (reference_computation) {
+                                                String hosthash = url.hosthash();
+                                                if (!hostExtentCache.containsKey(hosthash)) {
+                                                    StringBuilder q = new StringBuilder();
+                                                    q.append(CollectionSchema.host_id_s.getSolrFieldName()).append(":\"").append(hosthash).append("\" AND ").append(CollectionSchema.httpstatus_i.getSolrFieldName()).append(":200");
+                                                    long hostExtentCount = segment.fulltext().getDefaultConnector().getCountByQuery(q.toString());
+                                                    hostExtentCache.put(hosthash, hostExtentCount);
                                                 }
-                                        if (tagtype == ProcessType.UNIQUE) {
-                                            postprocessing_http_unique(segment, doc, sid, url);
-                                            postprocessing_www_unique(segment, doc, sid, url);
-                                            postprocessing_doublecontent(segment, uniqueURLs, doc, sid, url);
+                                                if (postprocessing_references(rrCache, sid, url, hostExtentCache)) proccount_referencechange.incrementAndGet();
                                             }
-
+
+                                            // all processing steps checked, remove the processing and harvesting key
+                                            if (byPartialUpdate) {
+                                                sid.setField(CollectionSchema.process_sxt.getSolrFieldName(), null); // setting this to null will cause a removal when doing a partial update
+                                                sid.setField(CollectionSchema.harvestkey_s.getSolrFieldName(), null);
+                                            } else {
+                                                sid.removeField(CollectionSchema.process_sxt.getSolrFieldName());
+                                                sid.removeField(CollectionSchema.harvestkey_s.getSolrFieldName());
+                                            }
+                                            // with standard solr fields selected, the sid now contains the fields
+                                            // id, http_unique_b, www_unique_b, references_i, references_internal_i, references_external_i, references_exthosts_i, host_extent_i
+                                            // and the value for host_extent_i is by default 2147483647
-                                        } catch (IllegalArgumentException e) {}
-
-                                        // compute references
-                                        if (reference_computation) {
-                                            String hosthash = url.hosthash();
-                                            if (!hostExtentCache.containsKey(hosthash)) {
-                                                StringBuilder q = new StringBuilder();
-                                                q.append(CollectionSchema.host_id_s.getSolrFieldName()).append(":\"").append(hosthash).append("\" AND ").append(CollectionSchema.httpstatus_i.getSolrFieldName()).append(":200");
-                                                long hostExtentCount = segment.fulltext().getDefaultConnector().getCountByQuery(q.toString());
-                                                hostExtentCache.put(hosthash, hostExtentCount);
+                                            // send back to index
+                                            //collectionConnector.deleteById(i);
+                                            if (byPartialUpdate) {
+                                                collectionConnector.update(sid);
+                                            } else {
+                                                collectionConnector.add(sid);
                                             }
-                                            if (postprocessing_references(rrCache, sid, url, hostExtentCache)) proccount_referencechange.incrementAndGet();
-                                        }
-
-                                        // all processing steps checked, remove the processing and harvesting key
-                                        if (byPartialUpdate) {
-                                            sid.setField(CollectionSchema.process_sxt.getSolrFieldName(), null); // setting this to null will cause a removal when doing a partial update
-                                            sid.setField(CollectionSchema.harvestkey_s.getSolrFieldName(), null);
-                                        } else {
-                                            sid.removeField(CollectionSchema.process_sxt.getSolrFieldName());
-                                            sid.removeField(CollectionSchema.harvestkey_s.getSolrFieldName());
-                                        }
-                                        // with standard solr fields selected, the sid now contains the fields
-                                        // id, http_unique_b, www_unique_b, references_i, references_internal_i, references_external_i, references_exthosts_i, host_extent_i
-                                        // and the value for host_extent_i is by default 2147483647
-
-                                        // send back to index
-                                        //collectionConnector.deleteById(i);
-                                        if (byPartialUpdate) {
-                                            collectionConnector.update(sid);
-                                        } else {
-                                            collectionConnector.add(sid);
-                                        }
-                                        long thiscount = proccount.incrementAndGet(); allcount.incrementAndGet();
-                                        if (thiscount % 100 == 0) {
-                                            postprocessingActivity = "postprocessed " + thiscount + " from " + count + " collection documents; " +
-                                                (thiscount * 60000L / (System.currentTimeMillis() - start)) + " ppm; " +
-                                                ((System.currentTimeMillis() - start) * (count - thiscount) / thiscount / 60000) + " minutes remaining";
-                                            ConcurrentLog.info("CollectionConfiguration", postprocessingActivity);
+                                            long thiscount = proccount.incrementAndGet(); allcount.incrementAndGet();
+                                            if (thiscount % 100 == 0) {
+                                                postprocessingActivity = "postprocessed " + thiscount + " from " + count + " collection documents; " +
+                                                    (thiscount * 60000L / (System.currentTimeMillis() - start)) + " ppm; " +
+                                                    ((System.currentTimeMillis() - start) * (count - thiscount) / thiscount / 60000) + " minutes remaining";
+                                                ConcurrentLog.info("CollectionConfiguration", postprocessingActivity);
+                                            }
+                                        } catch (final Throwable e1) {
+                                            ConcurrentLog.logException(e1);
+                                            failids.add(i);
                                         }
-                                    } catch (final Throwable e1) {
-                                        ConcurrentLog.logException(e1);
-                                        failids.add(i);
+                                        countcheck.incrementAndGet();
                                     }
-                                    countcheck.incrementAndGet();
+                                } catch (InterruptedException e) {
+                                    ConcurrentLog.logException(e);
                                 }
-                            } catch (InterruptedException e) {
-                                ConcurrentLog.logException(e);
                             }
-                        }
-                    };
-                    rewriteThread[rewrite_start].start();
+                        };
+                        rewriteThread[rewrite_start].start();
+                    }
+                    // wait for termination
+                    for (int rewrite_start = 0; rewrite_start < concurrency; rewrite_start++) rewriteThread[rewrite_start].join();
                 }
-                // wait for termination
-                for (int rewrite_start = 0; rewrite_start < concurrency; rewrite_start++) rewriteThread[rewrite_start].join();
                 if (failids.size() > 0) {
                     ConcurrentLog.info("CollectionConfiguration", "cleanup_processing: deleting " + failids.size() + " documents which have permanent execution fails");
failids.size() + " documents which have permanent execution fails"); collectionConnector.deleteByIds(failids); } - if (count != countcheck.get()) ConcurrentLog.warn("CollectionConfiguration", "ambiguous collection document count for harvestkey " + harvestkey + ": expected=" + count + ", counted=" + countcheck); // big gap for harvestkey = null + if (count != countcheck.get()) ConcurrentLog.warn("CollectionConfiguration", "ambiguous collection document count for harvestkey " + harvestkey + ": expected=" + count + ", counted=" + countcheck + "; countquery=" + collection1query); // big gap for harvestkey = null ConcurrentLog.info("CollectionConfiguration", "cleanup_processing: re-calculated " + proccount + " new documents, " + proccount_referencechange + " reference-count changes, " + proccount_citationchange + " citation ranking changes."); + + + } catch (final InterruptedException e2) { ConcurrentLog.warn("CollectionConfiguration", e2.getMessage(), e2); } catch (IOException e3) {