diff --git a/source/net/yacy/search/schema/CollectionConfiguration.java b/source/net/yacy/search/schema/CollectionConfiguration.java
index 99c38c74e..bec249e35 100644
--- a/source/net/yacy/search/schema/CollectionConfiguration.java
+++ b/source/net/yacy/search/schema/CollectionConfiguration.java
@@ -1227,105 +1227,125 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
         }
 
         // process all documents in collection
-        Map<String, Long> hostExtentCache = new HashMap<String, Long>(); // a mapping from the host id to the number of documents which contain this host-id
-        Set<String> uniqueURLs = new HashSet<String>();
+        final Map<String, Long> hostExtentCache = new HashMap<String, Long>(); // a mapping from the host id to the number of documents which contain this host-id
+        final Set<String> uniqueURLs = new HashSet<String>();
         try {
-            Set<String> omitFields = new HashSet<String>();
+            final Set<String> omitFields = new HashSet<String>();
             omitFields.add(CollectionSchema.process_sxt.getSolrFieldName());
             omitFields.add(CollectionSchema.harvestkey_s.getSolrFieldName());
-            int proccount = 0, proccount_referencechange = 0, proccount_citationchange = 0;
-            long count = collectionConnector.getCountByQuery(collection1query);
-            long start = System.currentTimeMillis();
+            final long count = collectionConnector.getCountByQuery(collection1query);
+            final long start = System.currentTimeMillis();
+            final int concurrency = Runtime.getRuntime().availableProcessors();
+            final boolean reference_computation = this.contains(CollectionSchema.references_i) &&
+                    this.contains(CollectionSchema.references_internal_i) &&
+                    this.contains(CollectionSchema.references_external_i) &&
+                    this.contains(CollectionSchema.references_exthosts_i);
             postprocessingActivity = "collecting " + count + " documents from the collection for harvestkey " + harvestkey;
             ConcurrentLog.info("CollectionConfiguration", postprocessingActivity);
-            BlockingQueue<SolrDocument> docs = collectionConnector.concurrentDocumentsByQuery(
+            final BlockingQueue<SolrDocument> docs = collectionConnector.concurrentDocumentsByQuery(
                     collection1query,
                     (this.contains(CollectionSchema.http_unique_b) || this.contains(CollectionSchema.www_unique_b)) ?
                         CollectionSchema.host_subdomain_s.getSolrFieldName() + " asc," + // sort on subdomain to get hosts without subdomain first; that gives an opportunity to set www_unique_b flag to false
                         CollectionSchema.url_protocol_s.getSolrFieldName() + " asc" // sort on protocol to get http before https; that gives an opportunity to set http_unique_b flag to false
                         : null, // null sort is faster!
-                    0, 100000000, Long.MAX_VALUE, 200, 1);
-            int countcheck = 0;
-            Collection<String> failids = new ArrayList<String>();
-            SolrDocument doc;
-            while ((doc = docs.take()) != AbstractSolrConnector.POISON_DOCUMENT) {
-                // for each to-be-processed entry work on the process tag
-                Collection<Object> proctags = doc.getFieldValues(CollectionSchema.process_sxt.getSolrFieldName());
-                final String u = (String) doc.getFieldValue(CollectionSchema.sku.getSolrFieldName());
-                final String i = (String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName());
-                try {
-                    DigestURL url = new DigestURL(u, ASCII.getBytes(i));
-                    byte[] id = url.hash();
-                    SolrInputDocument sid = collection.toSolrInputDocument(doc, omitFields);
-
-                    for (Object tag: proctags) try {
-
-                        // switch over tag types
-                        ProcessType tagtype = ProcessType.valueOf((String) tag);
-
-                        if (tagtype == ProcessType.CITATION &&
-                            collection.contains(CollectionSchema.cr_host_count_i) &&
-                            collection.contains(CollectionSchema.cr_host_chance_d) &&
-                            collection.contains(CollectionSchema.cr_host_norm_i)) {
-                            CRV crv = rankings.remove(ASCII.String(id)); // instead of 'get'ting the CRV, we also remove it because we will not need it again and free some memory here
-                            if (crv != null) {
-                                sid.setField(CollectionSchema.cr_host_count_i.getSolrFieldName(), crv.count);
-                                sid.setField(CollectionSchema.cr_host_chance_d.getSolrFieldName(), crv.cr);
-                                sid.setField(CollectionSchema.cr_host_norm_i.getSolrFieldName(), crv.crn);
-                                proccount_citationchange++;
+                    0, 100000000, Long.MAX_VALUE, 100, concurrency);
+            final AtomicInteger proccount = new AtomicInteger();
+            final AtomicInteger proccount_referencechange = new AtomicInteger();
+            final AtomicInteger proccount_citationchange = new AtomicInteger();
+            final AtomicInteger countcheck = new AtomicInteger(0);
+            final Collection<String> failids = new ArrayList<String>();
+            final Thread rewriteThread[] = new Thread[concurrency];
+            for (int rewrite_start = 0; rewrite_start < concurrency; rewrite_start++) {
+                rewriteThread[rewrite_start] = new Thread() {
+                    @Override
+                    public void run() {
+                        SolrDocument doc;
+                        try {
+                            while ((doc = docs.take()) != AbstractSolrConnector.POISON_DOCUMENT) {
+                                // for each to-be-processed entry work on the process tag
+                                Collection<Object> proctags = doc.getFieldValues(CollectionSchema.process_sxt.getSolrFieldName());
+                                final String u = (String) doc.getFieldValue(CollectionSchema.sku.getSolrFieldName());
+                                final String i = (String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName());
+                                try {
+                                    DigestURL url = new DigestURL(u, ASCII.getBytes(i));
+                                    byte[] id = url.hash();
+                                    SolrInputDocument sid = collection.toSolrInputDocument(doc, omitFields);
+
+                                    for (Object tag: proctags) try {
+
+                                        // switch over tag types
+                                        ProcessType tagtype = ProcessType.valueOf((String) tag);
+
+                                        if (tagtype == ProcessType.CITATION &&
+                                            collection.contains(CollectionSchema.cr_host_count_i) &&
+                                            collection.contains(CollectionSchema.cr_host_chance_d) &&
+                                            collection.contains(CollectionSchema.cr_host_norm_i)) {
+                                            CRV crv = rankings.remove(ASCII.String(id)); // instead of 'get'ting the CRV, we also remove it because we will not need it again and free some memory here
+                                            if (crv != null) {
+                                                sid.setField(CollectionSchema.cr_host_count_i.getSolrFieldName(), crv.count);
+                                                sid.setField(CollectionSchema.cr_host_chance_d.getSolrFieldName(), crv.cr);
+                                                sid.setField(CollectionSchema.cr_host_norm_i.getSolrFieldName(), crv.crn);
+                                                proccount_citationchange.incrementAndGet();
+                                            }
+                                        }
+
+                                        if (tagtype == ProcessType.UNIQUE) {
+                                            postprocessing_http_unique(segment, sid, url);
+                                            postprocessing_www_unique(segment, sid, url);
+                                            postprocessing_doublecontent(segment, uniqueURLs, sid, url);
+                                        }
+
+                                    } catch (IllegalArgumentException e) {}
+
+                                    // compute references
+                                    if (reference_computation) {
+                                        String hosthash = url.hosthash();
+                                        if (!hostExtentCache.containsKey(hosthash)) {
+                                            StringBuilder q = new StringBuilder();
+                                            q.append(CollectionSchema.host_id_s.getSolrFieldName()).append(":\"").append(hosthash).append("\" AND ").append(CollectionSchema.httpstatus_i.getSolrFieldName()).append(":200");
+                                            long hostExtentCount = segment.fulltext().getDefaultConnector().getCountByQuery(q.toString());
+                                            hostExtentCache.put(hosthash, hostExtentCount);
+                                        }
+                                        if (postprocessing_references(rrCache, sid, url, hostExtentCache)) proccount_referencechange.incrementAndGet();
+                                    }
+
+                                    // all processing steps checked, remove the processing and harvesting key
+                                    sid.removeField(CollectionSchema.process_sxt.getSolrFieldName());
+                                    sid.removeField(CollectionSchema.harvestkey_s.getSolrFieldName());
+
+                                    // send back to index
+                                    //collectionConnector.deleteById(i);
+                                    collectionConnector.add(sid);
+
+                                    int thiscount = proccount.incrementAndGet(); allcount.incrementAndGet();
+                                    if (thiscount % 100 == 0) {
+                                        postprocessingActivity = "postprocessed " + thiscount + " from " + count + " collection documents; " +
+                                                (thiscount * 60000 / (System.currentTimeMillis() - start)) + " ppm; " +
+                                                ((System.currentTimeMillis() - start) * (count - thiscount) / thiscount / 60000) + " minutes remaining";
+                                        ConcurrentLog.info("CollectionConfiguration", postprocessingActivity);
+                                    }
+                                } catch (final Throwable e1) {
+                                    ConcurrentLog.logException(e1);
+                                    failids.add(i);
+                                }
+                                countcheck.incrementAndGet();
                             }
+                        } catch (InterruptedException e) {
+                            ConcurrentLog.logException(e);
                         }
-
-                        if (tagtype == ProcessType.UNIQUE) {
-                            postprocessing_http_unique(segment, sid, url);
-                            postprocessing_www_unique(segment, sid, url);
-                            postprocessing_doublecontent(segment, uniqueURLs, sid, url);
-                        }
-
-                    } catch (IllegalArgumentException e) {}
-
-                    // refresh the link count; it's 'cheap' to do this here
-                    String hosthash = url.hosthash();
-                    if (!hostExtentCache.containsKey(hosthash)) {
-                        StringBuilder q = new StringBuilder();
-                        q.append(CollectionSchema.host_id_s.getSolrFieldName()).append(":\"").append(hosthash).append("\" AND ").append(CollectionSchema.httpstatus_i.getSolrFieldName()).append(":200");
-                        long hostExtentCount = segment.fulltext().getDefaultConnector().getCountByQuery(q.toString());
-                        hostExtentCache.put(hosthash, hostExtentCount);
-                    }
-                    if (this.contains(CollectionSchema.references_i) &&
-                        this.contains(CollectionSchema.references_internal_i) &&
-                        this.contains(CollectionSchema.references_external_i) &&
-                        this.contains(CollectionSchema.references_exthosts_i)) {
-                        if (postprocessing_references(rrCache, sid, url, hostExtentCache)) proccount_referencechange++;
                     }
-
-                    // all processing steps checked, remove the processing and harvesting key
-                    sid.removeField(CollectionSchema.process_sxt.getSolrFieldName());
-                    sid.removeField(CollectionSchema.harvestkey_s.getSolrFieldName());
-
-                    // send back to index
-                    //collectionConnector.deleteById(i);
-                    collectionConnector.add(sid);
-
-                    proccount++; allcount.incrementAndGet();
-                    if (proccount % 100 == 0) {
-                        postprocessingActivity = "postprocessed " + proccount + " from " + count + " collection documents; " +
-                                (proccount * 60000 / (System.currentTimeMillis() - start)) + " ppm; " +
-                                ((System.currentTimeMillis() - start) * (count - proccount) / proccount / 60000) + " minutes remaining";
-                        ConcurrentLog.info("CollectionConfiguration", postprocessingActivity);
-                    }
-                } catch (final Throwable e1) {
-                    ConcurrentLog.logException(e1);
-                    failids.add(i);
-                }
-                countcheck++;
+                };
+                rewriteThread[rewrite_start].start();
             }
+            // wait for termination
+            for (int rewrite_start = 0; rewrite_start < concurrency; rewrite_start++) rewriteThread[rewrite_start].join();
+
             if (failids.size() > 0) {
                 ConcurrentLog.info("CollectionConfiguration", "cleanup_processing: deleting " + failids.size() + " documents which have permanent execution fails");
                 collectionConnector.deleteByIds(failids);
             }
-            if (count != countcheck) ConcurrentLog.warn("CollectionConfiguration", "ambiguous collection document count for harvestkey " + harvestkey + ": expected=" + count + ", counted=" + countcheck); // big gap for harvestkey = null
-            ConcurrentLog.info("CollectionConfiguration", "cleanup_processing: re-calculated " + proccount+ " new documents, " +
+            if (count != countcheck.get()) ConcurrentLog.warn("CollectionConfiguration", "ambiguous collection document count for harvestkey " + harvestkey + ": expected=" + count + ", counted=" + countcheck); // big gap for harvestkey = null
+            ConcurrentLog.info("CollectionConfiguration", "cleanup_processing: re-calculated " + proccount + " new documents, " +
                     proccount_referencechange + " reference-count changes, " +
                     proccount_citationchange + " citation ranking changes.");
         } catch (final InterruptedException e2) {