From 42f45760ed51862ad8d93e571b22a81902b3c174 Mon Sep 17 00:00:00 2001
From: luccioman
Date: Wed, 31 Aug 2016 12:16:25 +0200
Subject: [PATCH] Refactored postprocessing

For easier understanding and performance profiling.
---
 .../schema/CollectionConfiguration.java       | 441 +++++++++---------
 1 file changed, 231 insertions(+), 210 deletions(-)
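Notes (below the "---" line, so not part of the commit message): after this
refactoring the body of postprocessing() reduces to roughly the following
sequence. This is a simplified sketch of the hunks below; error handling,
progress reporting and the counter bookkeeping are omitted:

    final Map<String, CRV> rankings = createRankingMap(segment, rrCache,
            collectionConnector, collection1hosts, shallComputeCR);
    final AtomicInteger allcount = new AtomicInteger(0);
    if (segment.fulltext().useWebgraph() && shallComputeCR) {
        postprocessWebgraph(segment, webgraph, webgraphquery, rankings, allcount);
    }
    postprocessDocuments(segment, rrCache, harvestkey, byPartialUpdate,
            collectionConnector, collection, collection1query, rankings, allcount);
    return allcount.get();
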
diff --git a/source/net/yacy/search/schema/CollectionConfiguration.java b/source/net/yacy/search/schema/CollectionConfiguration.java
index 7fda7067c..0ea4a5747 100644
--- a/source/net/yacy/search/schema/CollectionConfiguration.java
+++ b/source/net/yacy/search/schema/CollectionConfiguration.java
@@ -48,6 +48,13 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.SolrInputField;
+import org.eclipse.jetty.util.ConcurrentHashSet;
+
 import net.yacy.cora.document.analysis.Classification;
 import net.yacy.cora.document.analysis.Classification.ContentDomain;
 import net.yacy.cora.document.analysis.EnhancedTextProfileSignature;
@@ -100,13 +107,6 @@ import net.yacy.search.index.Segment.ReferenceReport;
 import net.yacy.search.index.Segment.ReferenceReportCache;
 import net.yacy.search.query.QueryParams;
 
-import org.apache.solr.common.SolrDocument;
-import org.apache.solr.common.SolrDocumentList;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.SolrInputField;
-import org.eclipse.jetty.util.ConcurrentHashSet;
-
 public class CollectionConfiguration extends SchemaConfiguration implements Serializable {
 
@@ -1131,212 +1131,38 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
                 collection.contains(CollectionSchema.cr_host_chance_d) &&
                 collection.contains(CollectionSchema.cr_host_norm_i)));
 
         // create the ranking map
-        final Map<String, CRV> rankings = new ConcurrentHashMap<String, CRV>();
-        if (shallComputeCR) try {
-            int concurrency = Math.min(collection1hosts.size(), Runtime.getRuntime().availableProcessors());
-            postprocessingActivity = "collecting cr for " + collection1hosts.size() + " hosts, concurrency = " + concurrency;
-            ConcurrentLog.info("CollectionConfiguration", postprocessingActivity);
-            int countcheck = 0;
-            for (String host: collection1hosts.keyList(true)) {
-                // Patch the citation index for links with canonical tags.
-                // This shall fulfill the following requirement:
-                // If a document A links to B and B contains a 'canonical C', then the citation rank computation shall consider that A links to C and B does not link to C.
-                // To do so, we first must collect all canonical links, find all references to them, get the anchor list of the documents and patch the citation reference of these links
-                String patchquery = CollectionSchema.host_s.getSolrFieldName() + ":" + host + " AND " + CollectionSchema.canonical_s.getSolrFieldName() + AbstractSolrConnector.CATCHALL_DTERM;
-                long patchquerycount = collectionConnector.getCountByQuery("{!cache=false}" + patchquery);
-                BlockingQueue<SolrDocument> documents_with_canonical_tag = collectionConnector.concurrentDocumentsByQuery(patchquery, CollectionSchema.url_chars_i.getSolrFieldName() + " asc", 0, 100000000, Long.MAX_VALUE, 20, 1, true,
-                        CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName(), CollectionSchema.canonical_s.getSolrFieldName());
-                SolrDocument doc_B;
-                int patchquerycountcheck = 0;
-                try {
-                    while ((doc_B = documents_with_canonical_tag.take()) != AbstractSolrConnector.POISON_DOCUMENT) {
-                        // find all documents which link to the canonical doc
-                        DigestURL doc_C_url = new DigestURL((String) doc_B.getFieldValue(CollectionSchema.canonical_s.getSolrFieldName()));
-                        byte[] doc_B_id = ASCII.getBytes(((String) doc_B.getFieldValue(CollectionSchema.id.getSolrFieldName())));
-                        // we remove all references to B, because these become references to C
-                        if (segment.connectedCitation()) {
-                            ReferenceContainer<CitationReference> doc_A_ids = segment.urlCitation().remove(doc_B_id);
-                            if (doc_A_ids == null) {
-                                //System.out.println("*** document with canonical but no referrer: " + doc_B.getFieldValue(CollectionSchema.sku.getSolrFieldName()));
-                                continue; // the document has a canonical tag but no referrer?
-                            }
-                            Iterator<CitationReference> doc_A_ids_iterator = doc_A_ids.entries();
-                            // for each referrer A of B, set A as a referrer of C
-                            while (doc_A_ids_iterator.hasNext()) {
-                                CitationReference doc_A_citation = doc_A_ids_iterator.next();
-                                segment.urlCitation().add(doc_C_url.hash(), doc_A_citation);
-                            }
-                        }
-                        patchquerycountcheck++;
-                        if (MemoryControl.shortStatus()) {
-                            ConcurrentLog.warn("CollectionConfiguration", "terminated canonical collection during postprocessing because of short memory");
-                            break;
-                        }
-                    }
-                } catch (InterruptedException e) {
-                    ConcurrentLog.logException(e);
-                } catch (SpaceExceededException e) {
-                    ConcurrentLog.logException(e);
-                }
-                if (patchquerycount != patchquerycountcheck) ConcurrentLog.warn("CollectionConfiguration", "ambiguous patchquery count for host " + host + ": expected=" + patchquerycount + ", counted=" + patchquerycountcheck);
-
-                // do the citation rank computation
-                if (collection1hosts.get(host) <= 0) continue;
-                // select all documents for each host
-                CRHost crh = new CRHost(segment, rrCache, host, 0.85d, 6);
-                int convergence_attempts = 0;
-                while (convergence_attempts++ < 30) {
-                    ConcurrentLog.info("CollectionConfiguration", "convergence step " + convergence_attempts + " for host " + host + " ...");
-                    if (crh.convergenceStep()) break;
-                    if (MemoryControl.shortStatus()) {
-                        ConcurrentLog.warn("CollectionConfiguration", "terminated convergenceStep during postprocessing because of short memory");
-                        break;
-                    }
-                }
-                ConcurrentLog.info("CollectionConfiguration", "convergence for host " + host + " after " + convergence_attempts + " steps");
-                // we have now the cr for all documents of a specific host; we store them for later use
-                Map<String, CRV> crn = crh.normalize();
-                //crh.log(crn);
-                rankings.putAll(crn); // accumulate this here for usage in document update later
-                if (MemoryControl.shortStatus()) {
-                    ConcurrentLog.warn("CollectionConfiguration", "terminated crn accumulation during postprocessing because of short memory");
-                    break;
-                }
-                countcheck++;
-            }
-            if (collection1hosts.size() != countcheck) ConcurrentLog.warn("CollectionConfiguration", "ambiguous host count: expected=" + collection1hosts.size() + ", counted=" + countcheck);
-        } catch (final IOException e2) {
-            ConcurrentLog.logException(e2);
-            collection1hosts = new ClusteredScoreMap<String>(true);
-        }
+        final Map<String, CRV> rankings = createRankingMap(segment, rrCache, collectionConnector, collection1hosts,
+                shallComputeCR);
 
         // process all documents at the webgraph for the outgoing links of this document
         final AtomicInteger allcount = new AtomicInteger(0);
         if (segment.fulltext().useWebgraph() && shallComputeCR) {
-            postprocessingActivity = "collecting host facets for webgraph cr calculation";
-            ConcurrentLog.info("CollectionConfiguration", postprocessingActivity);
-            final Set<String> omitFields = new HashSet<String>();
-            omitFields.add(WebgraphSchema.process_sxt.getSolrFieldName());
-            omitFields.add(WebgraphSchema.harvestkey_s.getSolrFieldName());
-
-            // collect hosts from index which shall take part in citation computation
-            ReversibleScoreMap<String> webgraphhosts;
-            try {
-                Map<String, ReversibleScoreMap<String>> hostfacet = segment.fulltext().getWebgraphConnector().getFacets(webgraphquery, 10000000, WebgraphSchema.source_host_s.getSolrFieldName());
-                webgraphhosts = hostfacet.get(WebgraphSchema.source_host_s.getSolrFieldName());
-            } catch (final IOException e2) {
-                ConcurrentLog.logException(e2);
-                webgraphhosts = new ClusteredScoreMap<String>(true);
-            }
-            try {
-                final long start = System.currentTimeMillis();
-                for (String host: webgraphhosts.keyList(true)) {
-                    if (webgraphhosts.get(host) <= 0) continue;
-                    final String hostfinal = host;
-                    // select all webgraph edges and modify their cr value
-                    postprocessingActivity = "writing cr values to webgraph for host " + host;
-                    ConcurrentLog.info("CollectionConfiguration", postprocessingActivity);
-                    String patchquery = WebgraphSchema.source_host_s.getSolrFieldName() + ":\"" + host + "\" AND " + WebgraphSchema.process_sxt.getSolrFieldName() + AbstractSolrConnector.CATCHALL_DTERM;
-                    final long count = segment.fulltext().getWebgraphConnector().getCountByQuery("{!cache=false}" + patchquery);
-                    int concurrency = Math.min((int) count, Math.max(1, Runtime.getRuntime().availableProcessors() / 4));
-                    ConcurrentLog.info("CollectionConfiguration", "collecting " + count + " documents from the webgraph, concurrency = " + concurrency);
-                    final BlockingQueue<SolrDocument> docs = segment.fulltext().getWebgraphConnector().concurrentDocumentsByQuery(
-                            patchquery,
-                            WebgraphSchema.source_chars_i.getSolrFieldName() + " asc",
-                            0, 100000000, Long.MAX_VALUE, concurrency + 1, concurrency, true
-                            // TODO: add field list and do partial updates
-                            );
-                    final AtomicInteger proccount = new AtomicInteger(0);
-                    Thread[] t = new Thread[concurrency];
-                    for (final AtomicInteger i = new AtomicInteger(0); i.get() < t.length; i.incrementAndGet()) {
-                        t[i.get()] = new Thread() {
-                            private String name = "CollectionConfiguration.postprocessing.webgraph-" + i.get();
-                            @Override
-                            public void run() {
-                                Thread.currentThread().setName(name);
-                                SolrDocument doc; String id;
-                                try {
-                                    processloop: while ((doc = docs.take()) != AbstractSolrConnector.POISON_DOCUMENT) {
-                                        try {
-                                            SolrInputDocument sid = webgraph.toSolrInputDocument(doc, omitFields);
-                                            Collection<Object> proctags = doc.getFieldValues(WebgraphSchema.process_sxt.getSolrFieldName());
-
-                                            for (Object tag: proctags) try {
-
-                                                // switch over tag types
-                                                ProcessType tagtype = ProcessType.valueOf((String) tag);
-
-                                                // set cr values
-                                                if (tagtype == ProcessType.CITATION) {
-                                                    if (segment.fulltext().useWebgraph() && webgraph.contains(WebgraphSchema.source_id_s) && webgraph.contains(WebgraphSchema.source_cr_host_norm_i)) {
-                                                        id = (String) doc.getFieldValue(WebgraphSchema.source_id_s.getSolrFieldName());
-                                                        CRV crv = rankings.get(id);
-                                                        if (crv != null) {
-                                                            sid.setField(WebgraphSchema.source_cr_host_norm_i.getSolrFieldName(), crv.crn);
-                                                        }
-                                                    }
-                                                    if (webgraph.contains(WebgraphSchema.target_id_s) && webgraph.contains(WebgraphSchema.target_cr_host_norm_i)) {
-                                                        id = (String) doc.getFieldValue(WebgraphSchema.target_id_s.getSolrFieldName());
-                                                        CRV crv = rankings.get(id);
-                                                        if (crv != null) {
-                                                            sid.setField(WebgraphSchema.target_cr_host_norm_i.getSolrFieldName(), crv.crn);
-                                                        }
-                                                    }
-                                                }
-                                            } catch (IllegalArgumentException e) {
-                                                ConcurrentLog.logException(e);
-                                            }
-
-                                            // write document back to index
-                                            try {
-                                                sid.removeField(WebgraphSchema.process_sxt.getSolrFieldName());
-                                                sid.removeField(WebgraphSchema.harvestkey_s.getSolrFieldName());
-                                                //segment.fulltext().getWebgraphConnector().deleteById((String) sid.getFieldValue(WebgraphSchema.id.getSolrFieldName()));
-                                                segment.fulltext().getWebgraphConnector().add(sid);
-                                            } catch (SolrException e) {
-                                                ConcurrentLog.logException(e);
-                                            } catch (IOException e) {
-                                                ConcurrentLog.logException(e);
-                                            }
-                                            proccount.incrementAndGet();
-                                            allcount.incrementAndGet();
-                                            if (proccount.get() % 1000 == 0) {
-                                                postprocessingActivity = "writing cr values to webgraph for host " + hostfinal + "; postprocessed " + proccount + " from " + count + " documents; " +
-                                                        (proccount.get() * 1000 / (System.currentTimeMillis() - start)) + " docs/second; " +
-                                                        ((System.currentTimeMillis() - start) * (count - proccount.get()) / proccount.get() / 60000) + " minutes remaining";
-                                                ConcurrentLog.info("CollectionConfiguration", postprocessingActivity);
-                                            }
-                                        } catch (Throwable e) {
-                                            ConcurrentLog.logException(e);
-                                            continue processloop;
-                                        }
-                                    }
-                                } catch (InterruptedException e) {
-                                    ConcurrentLog.warn("CollectionConfiguration", e.getMessage(), e);
-                                }
-                            }
-                        };
-                        t[i.get()].start();
-                    }
-                    for (int i = 0; i < t.length; i++) try {
-                        t[i].join(10000);
-                        if (t[i].isAlive()) t[i].interrupt();
-                    } catch (InterruptedException e) {}
-
-                    if (count != proccount.get()) ConcurrentLog.warn("CollectionConfiguration", "ambiguous webgraph document count for host " + host + ": expected=" + count + ", counted=" + proccount);
-                }
-            } catch (final IOException e2) {
-                ConcurrentLog.warn("CollectionConfiguration", e2.getMessage(), e2);
-            }
+            postprocessWebgraph(segment, webgraph, webgraphquery, rankings, allcount);
         }
 
         // process all documents in collection
-        final Map<String, Long> hostExtentCache = new HashMap<String, Long>(); // a mapping from the host id to the number of documents which contain this host-id
+        postprocessDocuments(segment, rrCache, harvestkey, byPartialUpdate, collectionConnector, collection,
+                collection1query, rankings, allcount);
+
+
+        postprocessingCollection1Count = 0;
+        postprocessingWebgraphCount = 0;
+        postprocessingActivity = "postprocessing terminated";
+        ConcurrentLog.info("CollectionConfiguration", postprocessingActivity);
+        postprocessingRunning = false;
+        return allcount.get();
+    }
+
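+    /**
+     * Updates the documents of the main collection index that carry a
+     * process_sxt postprocessing tag: concurrent rewrite threads load the
+     * tagged documents, patch them (using the given citation rankings where
+     * applicable) and write them back, then the changes are committed.
+     */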
+    private void postprocessDocuments(final Segment segment, final ReferenceReportCache rrCache,
+            final String harvestkey, final boolean byPartialUpdate, final SolrConnector collectionConnector,
+            final CollectionConfiguration collection, final String collection1query, final Map<String, CRV> rankings,
+            final AtomicInteger allcount) {
+        final Map<String, Long> hostExtentCache = new HashMap<String, Long>(); // a mapping from the host id to the number of documents which contain this host-id
         final Set<String> uniqueURLs = new ConcurrentHashSet<String>(); // will be used in a concurrent environment
         final Set<String> omitFields = new HashSet<String>();
         omitFields.add(CollectionSchema.process_sxt.getSolrFieldName());
         omitFields.add(CollectionSchema.harvestkey_s.getSolrFieldName());
-        final Collection<String> failids = new ArrayList<String>();
+        final Collection<String> failids = new ConcurrentHashSet<String>();
         final AtomicInteger countcheck = new AtomicInteger(0);
         final AtomicInteger proccount = new AtomicInteger();
         final AtomicInteger proccount_referencechange = new AtomicInteger();
@@ -1400,7 +1226,7 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
                     this.allFields());
             final Thread rewriteThread[] = new Thread[concurrency];
             for (int rewrite_start = 0; rewrite_start < concurrency; rewrite_start++) {
-                rewriteThread[rewrite_start] = new Thread() {
+                rewriteThread[rewrite_start] = new Thread("CollectionConfiguration.postprocessing.rewriteThread-" + rewrite_start) {
                     @Override
                     public void run() {
                         SolrDocument doc;
@@ -1516,13 +1342,208 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
             ConcurrentLog.warn("CollectionConfiguration", e3.getMessage(), e3);
         }
         collectionConnector.commit(true); // make changes available directly to prevent that the process repeats again
-        postprocessingCollection1Count = 0;
-        postprocessingWebgraphCount = 0;
-        postprocessingActivity = "postprocessing terminated";
-        ConcurrentLog.info("CollectionConfiguration", postprocessingActivity);
-        postprocessingRunning = false;
-        return allcount.get();
-    }
+    }
+
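+    /**
+     * Writes the citation rank (cr) values from the rankings map to the
+     * webgraph index: for each host taken from the source_host_s facet, the
+     * edges tagged with process_sxt are loaded, their source and target
+     * cr_host_norm_i fields are filled, and the edges are written back.
+     */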
+    private void postprocessWebgraph(final Segment segment, final WebgraphConfiguration webgraph, String webgraphquery,
+            final Map<String, CRV> rankings, final AtomicInteger allcount) {
+        postprocessingActivity = "collecting host facets for webgraph cr calculation";
+        ConcurrentLog.info("CollectionConfiguration", postprocessingActivity);
+        final Set<String> omitFields = new HashSet<String>();
+        omitFields.add(WebgraphSchema.process_sxt.getSolrFieldName());
+        omitFields.add(WebgraphSchema.harvestkey_s.getSolrFieldName());
+
+        // collect hosts from index which shall take part in citation computation
+        ReversibleScoreMap<String> webgraphhosts;
+        try {
+            Map<String, ReversibleScoreMap<String>> hostfacet = segment.fulltext().getWebgraphConnector().getFacets(webgraphquery, 10000000, WebgraphSchema.source_host_s.getSolrFieldName());
+            webgraphhosts = hostfacet.get(WebgraphSchema.source_host_s.getSolrFieldName());
+        } catch (final IOException e2) {
+            ConcurrentLog.logException(e2);
+            webgraphhosts = new ClusteredScoreMap<String>(true);
+        }
+        try {
+            final long start = System.currentTimeMillis();
+            for (String host: webgraphhosts.keyList(true)) {
+                if (webgraphhosts.get(host) <= 0) continue;
+                final String hostfinal = host;
+                // select all webgraph edges and modify their cr value
+                postprocessingActivity = "writing cr values to webgraph for host " + host;
+                ConcurrentLog.info("CollectionConfiguration", postprocessingActivity);
+                String patchquery = WebgraphSchema.source_host_s.getSolrFieldName() + ":\"" + host + "\" AND " + WebgraphSchema.process_sxt.getSolrFieldName() + AbstractSolrConnector.CATCHALL_DTERM;
+                final long count = segment.fulltext().getWebgraphConnector().getCountByQuery("{!cache=false}" + patchquery);
+                int concurrency = Math.min((int) count, Math.max(1, Runtime.getRuntime().availableProcessors() / 4));
+                ConcurrentLog.info("CollectionConfiguration", "collecting " + count + " documents from the webgraph, concurrency = " + concurrency);
+                final BlockingQueue<SolrDocument> docs = segment.fulltext().getWebgraphConnector().concurrentDocumentsByQuery(
+                        patchquery,
+                        WebgraphSchema.source_chars_i.getSolrFieldName() + " asc",
+                        0, 100000000, Long.MAX_VALUE, concurrency + 1, concurrency, true
+                        // TODO: add field list and do partial updates
+                        );
+                final AtomicInteger proccount = new AtomicInteger(0);
+                Thread[] t = new Thread[concurrency];
+                for (int i = 0; i < t.length; i++) {
+                    t[i] = new Thread("CollectionConfiguration.postprocessing.webgraph-" + i) {
+                        @Override
+                        public void run() {
+                            SolrDocument doc; String id;
+                            try {
+                                processloop: while ((doc = docs.take()) != AbstractSolrConnector.POISON_DOCUMENT) {
+                                    try {
+                                        SolrInputDocument sid = webgraph.toSolrInputDocument(doc, omitFields);
+                                        Collection<Object> proctags = doc.getFieldValues(WebgraphSchema.process_sxt.getSolrFieldName());
+
+                                        for (Object tag: proctags) try {
+
+                                            // switch over tag types
+                                            ProcessType tagtype = ProcessType.valueOf((String) tag);
+
+                                            // set cr values
+                                            if (tagtype == ProcessType.CITATION) {
+                                                if (segment.fulltext().useWebgraph() && webgraph.contains(WebgraphSchema.source_id_s) && webgraph.contains(WebgraphSchema.source_cr_host_norm_i)) {
+                                                    id = (String) doc.getFieldValue(WebgraphSchema.source_id_s.getSolrFieldName());
+                                                    CRV crv = rankings.get(id);
+                                                    if (crv != null) {
+                                                        sid.setField(WebgraphSchema.source_cr_host_norm_i.getSolrFieldName(), crv.crn);
+                                                    }
+                                                }
+                                                if (webgraph.contains(WebgraphSchema.target_id_s) && webgraph.contains(WebgraphSchema.target_cr_host_norm_i)) {
+                                                    id = (String) doc.getFieldValue(WebgraphSchema.target_id_s.getSolrFieldName());
+                                                    CRV crv = rankings.get(id);
+                                                    if (crv != null) {
+                                                        sid.setField(WebgraphSchema.target_cr_host_norm_i.getSolrFieldName(), crv.crn);
+                                                    }
+                                                }
+                                            }
+                                        } catch (IllegalArgumentException e) {
+                                            ConcurrentLog.logException(e);
+                                        }
+
+                                        // write document back to index
+                                        try {
+                                            sid.removeField(WebgraphSchema.process_sxt.getSolrFieldName());
+                                            sid.removeField(WebgraphSchema.harvestkey_s.getSolrFieldName());
+                                            //segment.fulltext().getWebgraphConnector().deleteById((String) sid.getFieldValue(WebgraphSchema.id.getSolrFieldName()));
+                                            segment.fulltext().getWebgraphConnector().add(sid);
+                                        } catch (SolrException e) {
+                                            ConcurrentLog.logException(e);
+                                        } catch (IOException e) {
+                                            ConcurrentLog.logException(e);
+                                        }
+                                        proccount.incrementAndGet();
+                                        allcount.incrementAndGet();
+                                        if (proccount.get() % 1000 == 0) {
+                                            postprocessingActivity = "writing cr values to webgraph for host " + hostfinal + "; postprocessed " + proccount + " from " + count + " documents; " +
+                                                    (proccount.get() * 1000 / (System.currentTimeMillis() - start)) + " docs/second; " +
+                                                    ((System.currentTimeMillis() - start) * (count - proccount.get()) / proccount.get() / 60000) + " minutes remaining";
+                                            ConcurrentLog.info("CollectionConfiguration", postprocessingActivity);
+                                        }
+                                    } catch (Throwable e) {
+                                        ConcurrentLog.logException(e);
+                                        continue processloop;
+                                    }
+                                }
+                            } catch (InterruptedException e) {
+                                ConcurrentLog.warn("CollectionConfiguration", e.getMessage(), e);
+                            }
+                        }
+                    };
+                    t[i].start();
+                }
+                for (int i = 0; i < t.length; i++) try {
+                    t[i].join(10000);
+                    if (t[i].isAlive()) t[i].interrupt();
+                } catch (InterruptedException e) {}
+
+                if (count != proccount.get()) ConcurrentLog.warn("CollectionConfiguration", "ambiguous webgraph document count for host " + host + ": expected=" + count + ", counted=" + proccount);
+            }
+        } catch (final IOException e2) {
+            ConcurrentLog.warn("CollectionConfiguration", e2.getMessage(), e2);
+        }
+    }
+
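+    /**
+     * Computes the citation rank (cr) of all documents for each host in the
+     * collection1hosts score map: the citation index is first patched for
+     * documents that carry canonical tags, then a CRHost instance iterates
+     * convergence steps (at most 30) per host and the normalized results are
+     * accumulated into the returned rankings map, keyed by document id.
+     */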
+    private Map<String, CRV> createRankingMap(final Segment segment, final ReferenceReportCache rrCache,
+            final SolrConnector collectionConnector, ReversibleScoreMap<String> collection1hosts,
+            boolean shallComputeCR) {
+        final Map<String, CRV> rankings = new ConcurrentHashMap<String, CRV>();
+        if (shallComputeCR) try {
+            int concurrency = Math.min(collection1hosts.size(), Runtime.getRuntime().availableProcessors());
+            postprocessingActivity = "collecting cr for " + collection1hosts.size() + " hosts, concurrency = " + concurrency;
+            ConcurrentLog.info("CollectionConfiguration", postprocessingActivity);
+            int countcheck = 0;
+            for (String host: collection1hosts.keyList(true)) {
+                // Patch the citation index for links with canonical tags.
+                // This shall fulfill the following requirement:
+                // If a document A links to B and B contains a 'canonical C', then the citation rank computation shall consider that A links to C and B does not link to C.
+                // To do so, we first must collect all canonical links, find all references to them, get the anchor list of the documents and patch the citation reference of these links
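+                // Example: when A links to B and B declares C as its canonical address,
+                // the stored citation reference A -> B is re-attached below as A -> C,
+                // so that B keeps no referrers of its own.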
+                String patchquery = CollectionSchema.host_s.getSolrFieldName() + ":" + host + " AND " + CollectionSchema.canonical_s.getSolrFieldName() + AbstractSolrConnector.CATCHALL_DTERM;
+                long patchquerycount = collectionConnector.getCountByQuery("{!cache=false}" + patchquery);
+                BlockingQueue<SolrDocument> documents_with_canonical_tag = collectionConnector.concurrentDocumentsByQuery(patchquery, CollectionSchema.url_chars_i.getSolrFieldName() + " asc", 0, 100000000, Long.MAX_VALUE, 20, 1, true,
+                        CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName(), CollectionSchema.canonical_s.getSolrFieldName());
+                SolrDocument doc_B;
+                int patchquerycountcheck = 0;
+                try {
+                    while ((doc_B = documents_with_canonical_tag.take()) != AbstractSolrConnector.POISON_DOCUMENT) {
+                        // find all documents which link to the canonical doc
+                        DigestURL doc_C_url = new DigestURL((String) doc_B.getFieldValue(CollectionSchema.canonical_s.getSolrFieldName()));
+                        byte[] doc_B_id = ASCII.getBytes(((String) doc_B.getFieldValue(CollectionSchema.id.getSolrFieldName())));
+                        // we remove all references to B, because these become references to C
+                        if (segment.connectedCitation()) {
+                            ReferenceContainer<CitationReference> doc_A_ids = segment.urlCitation().remove(doc_B_id);
+                            if (doc_A_ids == null) {
+                                //System.out.println("*** document with canonical but no referrer: " + doc_B.getFieldValue(CollectionSchema.sku.getSolrFieldName()));
+                                continue; // the document has a canonical tag but no referrer?
+                            }
+                            Iterator<CitationReference> doc_A_ids_iterator = doc_A_ids.entries();
+                            // for each referrer A of B, set A as a referrer of C
+                            while (doc_A_ids_iterator.hasNext()) {
+                                CitationReference doc_A_citation = doc_A_ids_iterator.next();
+                                segment.urlCitation().add(doc_C_url.hash(), doc_A_citation);
+                            }
+                        }
+                        patchquerycountcheck++;
+                        if (MemoryControl.shortStatus()) {
+                            ConcurrentLog.warn("CollectionConfiguration", "terminated canonical collection during postprocessing because of short memory");
+                            break;
+                        }
+                    }
+                } catch (InterruptedException e) {
+                    ConcurrentLog.logException(e);
+                } catch (SpaceExceededException e) {
+                    ConcurrentLog.logException(e);
+                }
+                if (patchquerycount != patchquerycountcheck) ConcurrentLog.warn("CollectionConfiguration", "ambiguous patchquery count for host " + host + ": expected=" + patchquerycount + ", counted=" + patchquerycountcheck);
+
+                // do the citation rank computation
+                if (collection1hosts.get(host) <= 0) continue;
+                // select all documents for each host
+                CRHost crh = new CRHost(segment, rrCache, host, 0.85d, 6);
+                int convergence_attempts = 0;
+                while (convergence_attempts++ < 30) {
+                    ConcurrentLog.info("CollectionConfiguration", "convergence step " + convergence_attempts + " for host " + host + " ...");
+                    if (crh.convergenceStep()) break;
+                    if (MemoryControl.shortStatus()) {
+                        ConcurrentLog.warn("CollectionConfiguration", "terminated convergenceStep during postprocessing because of short memory");
+                        break;
+                    }
+                }
+                ConcurrentLog.info("CollectionConfiguration", "convergence for host " + host + " after " + convergence_attempts + " steps");
+                // we have now the cr for all documents of a specific host; we store them for later use
+                Map<String, CRV> crn = crh.normalize();
+                //crh.log(crn);
+                rankings.putAll(crn); // accumulate this here for usage in document update later
+                if (MemoryControl.shortStatus()) {
+                    ConcurrentLog.warn("CollectionConfiguration", "terminated crn accumulation during postprocessing because of short memory");
+                    break;
+                }
+                countcheck++;
+            }
+            if (collection1hosts.size() != countcheck) ConcurrentLog.warn("CollectionConfiguration", "ambiguous host count: expected=" + collection1hosts.size() + ", counted=" + countcheck);
+        } catch (final IOException e2) {
+            ConcurrentLog.logException(e2);
+            collection1hosts = new ClusteredScoreMap<String>(true);
+        }
+        return rankings;
+    }
 
     public void postprocessing_http_unique(final Segment segment, final SolrDocument doc, final SolrInputDocument sid, final DigestURL url) {
         if (!this.contains(CollectionSchema.http_unique_b)) return;