From 42f45760ed51862ad8d93e571b22a81902b3c174 Mon Sep 17 00:00:00 2001
From: luccioman
Date: Wed, 31 Aug 2016 12:16:25 +0200
Subject: [PATCH 1/3] Refactored postprocessing

For easier understanding and performance profiling.
---
 .../schema/CollectionConfiguration.java | 441 +++++++++---------
 1 file changed, 231 insertions(+), 210 deletions(-)

diff --git a/source/net/yacy/search/schema/CollectionConfiguration.java b/source/net/yacy/search/schema/CollectionConfiguration.java
index 7fda7067c..0ea4a5747 100644
--- a/source/net/yacy/search/schema/CollectionConfiguration.java
+++ b/source/net/yacy/search/schema/CollectionConfiguration.java
@@ -48,6 +48,13 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.SolrInputField;
+import org.eclipse.jetty.util.ConcurrentHashSet;
+
 import net.yacy.cora.document.analysis.Classification;
 import net.yacy.cora.document.analysis.Classification.ContentDomain;
 import net.yacy.cora.document.analysis.EnhancedTextProfileSignature;
@@ -100,13 +107,6 @@ import net.yacy.search.index.Segment.ReferenceReport;
 import net.yacy.search.index.Segment.ReferenceReportCache;
 import net.yacy.search.query.QueryParams;
 
-import org.apache.solr.common.SolrDocument;
-import org.apache.solr.common.SolrDocumentList;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.SolrInputField;
-import org.eclipse.jetty.util.ConcurrentHashSet;
-
 public class CollectionConfiguration extends SchemaConfiguration implements Serializable {
 
@@ -1131,212 +1131,38 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
                 collection.contains(CollectionSchema.cr_host_chance_d) &&
                 collection.contains(CollectionSchema.cr_host_norm_i)));
         // create the ranking map
-        final Map<String, CRV> rankings = new ConcurrentHashMap<String, CRV>();
-        if (shallComputeCR) try {
-            int concurrency = Math.min(collection1hosts.size(), Runtime.getRuntime().availableProcessors());
-            postprocessingActivity = "collecting cr for " + collection1hosts.size() + " hosts, concurrency = " + concurrency;
-            ConcurrentLog.info("CollectionConfiguration", postprocessingActivity);
-            int countcheck = 0;
-            for (String host: collection1hosts.keyList(true)) {
-                // Patch the citation index for links with canonical tags.
-                // This shall fulfill the following requirement:
-                // If a document A links to B and B contains a 'canonical C', then the citation rank computation shall consider that A links to C and B does not link to C.
-                // To do so, we first must collect all canonical links, find all references to them, get the anchor list of the documents and patch the citation reference of these links
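[Annotation] The comment above states the invariant behind the canonical patching step. As background, here is a minimal, self-contained sketch of that rewiring on a plain in-memory map — hypothetical names and structures, not YaCy's actual citation index API:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    /** Minimal sketch: rewire the referrers of B to B's canonical target C. */
    public class CanonicalRewireSketch {
        /** citations maps a document id to the set of ids of documents linking to it. */
        static void rewire(Map<String, Set<String>> citations, String docB, String canonicalC) {
            // remove all references to B ...
            Set<String> referrersOfB = citations.remove(docB);
            if (referrersOfB == null) return; // B has a canonical tag but no referrer
            // ... because these become references to C
            citations.computeIfAbsent(canonicalC, k -> new HashSet<>()).addAll(referrersOfB);
        }

        public static void main(String[] args) {
            Map<String, Set<String>> citations = new HashMap<>();
            citations.put("B", new HashSet<>(Arrays.asList("A")));
            rewire(citations, "B", "C");
            System.out.println(citations); // prints {C=[A]}
        }
    }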
-                String patchquery = CollectionSchema.host_s.getSolrFieldName() + ":" + host + " AND " + CollectionSchema.canonical_s.getSolrFieldName() + AbstractSolrConnector.CATCHALL_DTERM;
-                long patchquerycount = collectionConnector.getCountByQuery("{!cache=false}" + patchquery);
-                BlockingQueue<SolrDocument> documents_with_canonical_tag = collectionConnector.concurrentDocumentsByQuery(patchquery, CollectionSchema.url_chars_i.getSolrFieldName() + " asc", 0, 100000000, Long.MAX_VALUE, 20, 1, true,
-                        CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName(), CollectionSchema.canonical_s.getSolrFieldName());
-                SolrDocument doc_B;
-                int patchquerycountcheck = 0;
-                try {
-                    while ((doc_B = documents_with_canonical_tag.take()) != AbstractSolrConnector.POISON_DOCUMENT) {
-                        // find all documents which link to the canonical doc
-                        DigestURL doc_C_url = new DigestURL((String) doc_B.getFieldValue(CollectionSchema.canonical_s.getSolrFieldName()));
-                        byte[] doc_B_id = ASCII.getBytes(((String) doc_B.getFieldValue(CollectionSchema.id.getSolrFieldName())));
-                        // we remove all references to B, because these become references to C
-                        if (segment.connectedCitation()) {
-                            ReferenceContainer<CitationReference> doc_A_ids = segment.urlCitation().remove(doc_B_id);
-                            if (doc_A_ids == null) {
-                                //System.out.println("*** document with canonical but no referrer: " + doc_B.getFieldValue(CollectionSchema.sku.getSolrFieldName()));
-                                continue; // the document has a canonical tag but no referrer?
-                            }
-                            Iterator<CitationReference> doc_A_ids_iterator = doc_A_ids.entries();
-                            // for each of the referrer A of B, set A as a referrer of C
-                            while (doc_A_ids_iterator.hasNext()) {
-                                CitationReference doc_A_citation = doc_A_ids_iterator.next();
-                                segment.urlCitation().add(doc_C_url.hash(), doc_A_citation);
-                            }
-                        }
-                        patchquerycountcheck++;
-                        if (MemoryControl.shortStatus()) {
-                            ConcurrentLog.warn("CollectionConfiguration", "terminated canonical collection during postprocessing because of short memory");
-                            break;
-                        }
-                    }
-                } catch (InterruptedException e) {
-                    ConcurrentLog.logException(e);
-                } catch (SpaceExceededException e) {
-                    ConcurrentLog.logException(e);
-                }
-                if (patchquerycount != patchquerycountcheck) ConcurrentLog.warn("CollectionConfiguration", "ambiguous patchquery count for host " + host + ": expected=" + patchquerycount + ", counted=" + patchquerycountcheck);
-
-                // do the citation rank computation
-                if (collection1hosts.get(host) <= 0) continue;
-                // select all documents for each host
-                CRHost crh = new CRHost(segment, rrCache, host, 0.85d, 6);
-                int convergence_attempts = 0;
-                while (convergence_attempts++ < 30) {
-                    ConcurrentLog.info("CollectionConfiguration", "convergence step " + convergence_attempts + " for host " + host + " ...");
-                    if (crh.convergenceStep()) break;
-                    if (MemoryControl.shortStatus()) {
-                        ConcurrentLog.warn("CollectionConfiguration", "terminated convergenceStep during postprocessing because of short memory");
-                        break;
-                    }
-                }
-                ConcurrentLog.info("CollectionConfiguration", "convergence for host " + host + " after " + convergence_attempts + " steps");
-                // we have now the cr for all documents of a specific host; we store them for later use
-                Map<String, CRV> crn = crh.normalize();
-                //crh.log(crn);
-                rankings.putAll(crn); // accumulate this here for usage in document update later
-                if (MemoryControl.shortStatus()) {
-                    ConcurrentLog.warn("CollectionConfiguration", "terminated crn akkumulation during postprocessing because of short memory");
-                    break;
-                }
-                countcheck++;
-            }
-            if (collection1hosts.size() != countcheck) ConcurrentLog.warn("CollectionConfiguration", "ambiguous host count: expected=" + collection1hosts.size() + ", counted=" + countcheck);
-        } catch (final IOException e2) {
-            ConcurrentLog.logException(e2);
-            collection1hosts = new ClusteredScoreMap<String>(true);
-        }
+        final Map<String, CRV> rankings = createRankingMap(segment, rrCache, collectionConnector, collection1hosts,
+                shallComputeCR);
 
         // process all documents at the webgraph for the outgoing links of this document
         final AtomicInteger allcount = new AtomicInteger(0);
         if (segment.fulltext().useWebgraph() && shallComputeCR) {
-            postprocessingActivity = "collecting host facets for webgraph cr calculation";
-            ConcurrentLog.info("CollectionConfiguration", postprocessingActivity);
-            final Set<String> omitFields = new HashSet<String>();
-            omitFields.add(WebgraphSchema.process_sxt.getSolrFieldName());
-            omitFields.add(WebgraphSchema.harvestkey_s.getSolrFieldName());
-
-            // collect hosts from index which shall take part in citation computation
-            ReversibleScoreMap<String> webgraphhosts;
-            try {
-                Map<String, ReversibleScoreMap<String>> hostfacet = segment.fulltext().getWebgraphConnector().getFacets(webgraphquery, 10000000, WebgraphSchema.source_host_s.getSolrFieldName());
-                webgraphhosts = hostfacet.get(WebgraphSchema.source_host_s.getSolrFieldName());
-            } catch (final IOException e2) {
-                ConcurrentLog.logException(e2);
-                webgraphhosts = new ClusteredScoreMap<String>(true);
-            }
-            try {
-                final long start = System.currentTimeMillis();
-                for (String host: webgraphhosts.keyList(true)) {
-                    if (webgraphhosts.get(host) <= 0) continue;
-                    final String hostfinal = host;
-                    // select all webgraph edges and modify their cr value
-                    postprocessingActivity = "writing cr values to webgraph for host " + host;
-                    ConcurrentLog.info("CollectionConfiguration", postprocessingActivity);
-                    String patchquery = WebgraphSchema.source_host_s.getSolrFieldName() + ":\"" + host + "\" AND " + WebgraphSchema.process_sxt.getSolrFieldName() + AbstractSolrConnector.CATCHALL_DTERM;
-                    final long count = segment.fulltext().getWebgraphConnector().getCountByQuery("{!cache=false}" + patchquery);
-                    int concurrency = Math.min((int) count, Math.max(1, Runtime.getRuntime().availableProcessors() / 4));
-                    ConcurrentLog.info("CollectionConfiguration", "collecting " + count + " documents from the webgraph, concurrency = " + concurrency);
-                    final BlockingQueue<SolrDocument> docs = segment.fulltext().getWebgraphConnector().concurrentDocumentsByQuery(
-                            patchquery,
-                            WebgraphSchema.source_chars_i.getSolrFieldName() + " asc",
-                            0, 100000000, Long.MAX_VALUE, concurrency + 1, concurrency, true
-                            // TODO: add field list and do partial updates
-                            );
-                    final AtomicInteger proccount = new AtomicInteger(0);
-                    Thread[] t = new Thread[concurrency];
-                    for (final AtomicInteger i = new AtomicInteger(0); i.get() < t.length; i.incrementAndGet()) {
-                        t[i.get()] = new Thread() {
-                            private String name = "CollectionConfiguration.postprocessing.webgraph-" + i.get();
-                            @Override
-                            public void run() {
-                                Thread.currentThread().setName(name);
-                                SolrDocument doc; String id;
-                                try {
-                                    processloop: while ((doc = docs.take()) != AbstractSolrConnector.POISON_DOCUMENT) {
-                                        try {
-                                            SolrInputDocument sid = webgraph.toSolrInputDocument(doc, omitFields);
-                                            Collection<Object> proctags = doc.getFieldValues(WebgraphSchema.process_sxt.getSolrFieldName());
-
-                                            for (Object tag: proctags) try {
-
-                                                // switch over tag types
-                                                ProcessType tagtype = ProcessType.valueOf((String) tag);
-
-                                                // set cr values
-                                                if (tagtype == ProcessType.CITATION) {
-                                                    if (segment.fulltext().useWebgraph() && webgraph.contains(WebgraphSchema.source_id_s) && webgraph.contains(WebgraphSchema.source_cr_host_norm_i)) {
-                                                        id = (String) doc.getFieldValue(WebgraphSchema.source_id_s.getSolrFieldName());
-                                                        CRV crv = rankings.get(id);
-                                                        if (crv != null) {
-                                                            sid.setField(WebgraphSchema.source_cr_host_norm_i.getSolrFieldName(), crv.crn);
-                                                        }
-                                                    }
-                                                    if (webgraph.contains(WebgraphSchema.target_id_s) && webgraph.contains(WebgraphSchema.target_cr_host_norm_i)) {
-                                                        id = (String) doc.getFieldValue(WebgraphSchema.target_id_s.getSolrFieldName());
-                                                        CRV crv = rankings.get(id);
-                                                        if (crv != null) {
-                                                            sid.setField(WebgraphSchema.target_cr_host_norm_i.getSolrFieldName(), crv.crn);
-                                                        }
-                                                    }
-                                                }
-                                            } catch (IllegalArgumentException e) {
-                                                ConcurrentLog.logException(e);
-                                            }
-
-                                            // write document back to index
-                                            try {
-                                                sid.removeField(WebgraphSchema.process_sxt.getSolrFieldName());
-                                                sid.removeField(WebgraphSchema.harvestkey_s.getSolrFieldName());
-                                                //segment.fulltext().getWebgraphConnector().deleteById((String) sid.getFieldValue(WebgraphSchema.id.getSolrFieldName()));
-                                                segment.fulltext().getWebgraphConnector().add(sid);
-                                            } catch (SolrException e) {
-                                                ConcurrentLog.logException(e);
-                                            } catch (IOException e) {
-                                                ConcurrentLog.logException(e);
-                                            }
-                                            proccount.incrementAndGet();
-                                            allcount.incrementAndGet();
-                                            if (proccount.get() % 1000 == 0) {
-                                                postprocessingActivity = "writing cr values to webgraph for host " + hostfinal + "postprocessed " + proccount + " from " + count + " documents; " +
-                                                        (proccount.get() * 1000 / (System.currentTimeMillis() - start)) + " docs/second; " +
-                                                        ((System.currentTimeMillis() - start) * (count - proccount.get()) / proccount.get() / 60000) + " minutes remaining";
-                                                ConcurrentLog.info("CollectionConfiguration", postprocessingActivity);
-                                            }
-                                        } catch (Throwable e) {
-                                            ConcurrentLog.logException(e);
-                                            continue processloop;
-                                        }
-                                    }
-                                } catch (InterruptedException e) {
-                                    ConcurrentLog.warn("CollectionConfiguration", e.getMessage(), e);
-                                }
-                            }
-                        };
-                        t[i.get()].start();
-                    }
-                    for (int i = 0; i < t.length; i++) try {
-                        t[i].join(10000);
-                        if (t[i].isAlive()) t[i].interrupt();
-                    } catch (InterruptedException e) {}
-
-                    if (count != proccount.get()) ConcurrentLog.warn("CollectionConfiguration", "ambiguous webgraph document count for host " + host + ": expected=" + count + ", counted=" + proccount);
-                }
-            } catch (final IOException e2) {
-                ConcurrentLog.warn("CollectionConfiguration", e2.getMessage(), e2);
-            }
+            postprocessWebgraph(segment, webgraph, webgraphquery, rankings, allcount);
         }
 
         // process all documents in collection
+        postprocessDocuments(segment, rrCache, harvestkey, byPartialUpdate, collectionConnector, collection,
+                collection1query, rankings, allcount);
+
+
+        postprocessingCollection1Count = 0;
+        postprocessingWebgraphCount = 0;
+        postprocessingActivity = "postprocessing terminated";
+        ConcurrentLog.info("CollectionConfiguration", postprocessingActivity);
+        postprocessingRunning = false;
+        return allcount.get();
+    }
+
+    private void postprocessDocuments(final Segment segment, final ReferenceReportCache rrCache,
+            final String harvestkey, final boolean byPartialUpdate, final SolrConnector collectionConnector,
+            final CollectionConfiguration collection, final String collection1query, final Map<String, CRV> rankings,
+            final AtomicInteger allcount) {
         final Map<String, Long> hostExtentCache = new HashMap<String, Long>(); // a mapping from the host id to the number of documents which contain this host-id
         final Set<String> uniqueURLs = new ConcurrentHashSet<String>(); // will be used in a concurrent environment
         final Set<String> omitFields = new HashSet<String>();
         omitFields.add(CollectionSchema.process_sxt.getSolrFieldName());
         omitFields.add(CollectionSchema.harvestkey_s.getSolrFieldName());
-        final Collection<String> failids = new ArrayList<String>();
+        final Collection<String> failids = new ConcurrentHashSet<String>();
         final AtomicInteger countcheck = new AtomicInteger(0);
         final AtomicInteger proccount = new AtomicInteger();
         final AtomicInteger proccount_referencechange = new AtomicInteger();
@@ -1400,7 +1226,7 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
                 this.allFields());
         final Thread rewriteThread[] = new Thread[concurrency];
         for (int rewrite_start = 0; rewrite_start < concurrency; rewrite_start++) {
-            rewriteThread[rewrite_start] = new Thread() {
+            rewriteThread[rewrite_start] = new Thread("CollectionConfiguration.postprocessing.rewriteThread-" + rewrite_start) {
                 @Override
                 public void run() {
                     SolrDocument doc;
@@ -1516,13 +1342,208 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
             ConcurrentLog.warn("CollectionConfiguration", e3.getMessage(), e3);
         }
         collectionConnector.commit(true); // make changes available directly to prevent that the process repeats again
-        postprocessingCollection1Count = 0;
-        postprocessingWebgraphCount = 0;
-        postprocessingActivity = "postprocessing terminated";
-        ConcurrentLog.info("CollectionConfiguration", postprocessingActivity);
-        postprocessingRunning = false;
-        return allcount.get();
-    }
+    }
+
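[Annotation] Here and in the webgraph threads below, the patch replaces anonymous Thread subclasses that called setName() inside run() with the Thread(String) constructor. A tiny illustration of the difference, with a hypothetical thread name:

    // the name is set before the thread starts, so thread dumps and early
    // log lines already show it; no setName() call needed inside run()
    Thread worker = new Thread("app.postprocessing.worker-0") {
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName());
        }
    };
    worker.start();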
+    private void postprocessWebgraph(final Segment segment, final WebgraphConfiguration webgraph, String webgraphquery,
+            final Map<String, CRV> rankings, final AtomicInteger allcount) {
+        postprocessingActivity = "collecting host facets for webgraph cr calculation";
+        ConcurrentLog.info("CollectionConfiguration", postprocessingActivity);
+        final Set<String> omitFields = new HashSet<String>();
+        omitFields.add(WebgraphSchema.process_sxt.getSolrFieldName());
+        omitFields.add(WebgraphSchema.harvestkey_s.getSolrFieldName());
+
+        // collect hosts from index which shall take part in citation computation
+        ReversibleScoreMap<String> webgraphhosts;
+        try {
+            Map<String, ReversibleScoreMap<String>> hostfacet = segment.fulltext().getWebgraphConnector().getFacets(webgraphquery, 10000000, WebgraphSchema.source_host_s.getSolrFieldName());
+            webgraphhosts = hostfacet.get(WebgraphSchema.source_host_s.getSolrFieldName());
+        } catch (final IOException e2) {
+            ConcurrentLog.logException(e2);
+            webgraphhosts = new ClusteredScoreMap<String>(true);
+        }
+        try {
+            final long start = System.currentTimeMillis();
+            for (String host: webgraphhosts.keyList(true)) {
+                if (webgraphhosts.get(host) <= 0) continue;
+                final String hostfinal = host;
+                // select all webgraph edges and modify their cr value
+                postprocessingActivity = "writing cr values to webgraph for host " + host;
+                ConcurrentLog.info("CollectionConfiguration", postprocessingActivity);
+                String patchquery = WebgraphSchema.source_host_s.getSolrFieldName() + ":\"" + host + "\" AND " + WebgraphSchema.process_sxt.getSolrFieldName() + AbstractSolrConnector.CATCHALL_DTERM;
+                final long count = segment.fulltext().getWebgraphConnector().getCountByQuery("{!cache=false}" + patchquery);
+                int concurrency = Math.min((int) count, Math.max(1, Runtime.getRuntime().availableProcessors() / 4));
+                ConcurrentLog.info("CollectionConfiguration", "collecting " + count + " documents from the webgraph, concurrency = " + concurrency);
+                final BlockingQueue<SolrDocument> docs = segment.fulltext().getWebgraphConnector().concurrentDocumentsByQuery(
+                        patchquery,
+                        WebgraphSchema.source_chars_i.getSolrFieldName() + " asc",
+                        0, 100000000, Long.MAX_VALUE, concurrency + 1, concurrency, true
+                        // TODO: add field list and do partial updates
+                        );
+                final AtomicInteger proccount = new AtomicInteger(0);
+                Thread[] t = new Thread[concurrency];
+                for (int i = 0; i < t.length; i++) {
+                    t[i] = new Thread("CollectionConfiguration.postprocessing.webgraph-" + i) {
+                        @Override
+                        public void run() {
+                            SolrDocument doc; String id;
+                            try {
+                                processloop: while ((doc = docs.take()) != AbstractSolrConnector.POISON_DOCUMENT) {
+                                    try {
+                                        SolrInputDocument sid = webgraph.toSolrInputDocument(doc, omitFields);
+                                        Collection<Object> proctags = doc.getFieldValues(WebgraphSchema.process_sxt.getSolrFieldName());
+
+                                        for (Object tag: proctags) try {
+
+                                            // switch over tag types
+                                            ProcessType tagtype = ProcessType.valueOf((String) tag);
+
+                                            // set cr values
+                                            if (tagtype == ProcessType.CITATION) {
+                                                if (segment.fulltext().useWebgraph() && webgraph.contains(WebgraphSchema.source_id_s) && webgraph.contains(WebgraphSchema.source_cr_host_norm_i)) {
+                                                    id = (String) doc.getFieldValue(WebgraphSchema.source_id_s.getSolrFieldName());
+                                                    CRV crv = rankings.get(id);
+                                                    if (crv != null) {
+                                                        sid.setField(WebgraphSchema.source_cr_host_norm_i.getSolrFieldName(), crv.crn);
+                                                    }
+                                                }
+                                                if (webgraph.contains(WebgraphSchema.target_id_s) && webgraph.contains(WebgraphSchema.target_cr_host_norm_i)) {
+                                                    id = (String) doc.getFieldValue(WebgraphSchema.target_id_s.getSolrFieldName());
+                                                    CRV crv = rankings.get(id);
+                                                    if (crv != null) {
+                                                        sid.setField(WebgraphSchema.target_cr_host_norm_i.getSolrFieldName(), crv.crn);
+                                                    }
+                                                }
+                                            }
+                                        } catch (IllegalArgumentException e) {
+                                            ConcurrentLog.logException(e);
+                                        }
+
+                                        // write document back to index
+                                        try {
+                                            sid.removeField(WebgraphSchema.process_sxt.getSolrFieldName());
+                                            sid.removeField(WebgraphSchema.harvestkey_s.getSolrFieldName());
+                                            //segment.fulltext().getWebgraphConnector().deleteById((String) sid.getFieldValue(WebgraphSchema.id.getSolrFieldName()));
+                                            segment.fulltext().getWebgraphConnector().add(sid);
+                                        } catch (SolrException e) {
+                                            ConcurrentLog.logException(e);
+                                        } catch (IOException e) {
+                                            ConcurrentLog.logException(e);
+                                        }
+                                        proccount.incrementAndGet();
+                                        allcount.incrementAndGet();
+                                        if (proccount.get() % 1000 == 0) {
+                                            postprocessingActivity = "writing cr values to webgraph for host " + hostfinal + "; postprocessed " + proccount + " from " + count + " documents; " +
+                                                    (proccount.get() * 1000 / (System.currentTimeMillis() - start)) + " docs/second; " +
+                                                    ((System.currentTimeMillis() - start) * (count - proccount.get()) / proccount.get() / 60000) + " minutes remaining";
+                                            ConcurrentLog.info("CollectionConfiguration", postprocessingActivity);
+                                        }
+                                    } catch (Throwable e) {
+                                        ConcurrentLog.logException(e);
+                                        continue processloop;
+                                    }
+                                }
+                            } catch (InterruptedException e) {
+                                ConcurrentLog.warn("CollectionConfiguration", e.getMessage(), e);
+                            }
+                        }
+                    };
+                    t[i].start();
+                }
+                for (int i = 0; i < t.length; i++) try {
+                    t[i].join(10000);
+                    if (t[i].isAlive()) t[i].interrupt();
+                } catch (InterruptedException e) {}
+
+                if (count != proccount.get()) ConcurrentLog.warn("CollectionConfiguration", "ambiguous webgraph document count for host " + host + ": expected=" + count + ", counted=" + proccount);
+            }
+        } catch (final IOException e2) {
+            ConcurrentLog.warn("CollectionConfiguration", e2.getMessage(), e2);
+        }
+    }
+
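[Annotation] Both helper methods drain their Solr result queues until they see AbstractSolrConnector.POISON_DOCUMENT, a poison-pill sentinel that signals the end of the stream. For readers new to the pattern, a minimal generic sketch with hypothetical names (not YaCy code):

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    public class PoisonPillDemo {
        // a unique sentinel instance marking the end of the stream;
        // consumers compare by reference identity, not equals()
        static final String POISON = new String("POISON");

        public static void main(String[] args) throws InterruptedException {
            BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
            Thread consumer = new Thread("demo.consumer") {
                @Override
                public void run() {
                    try {
                        String item;
                        // take() blocks, so the producer must enqueue one POISON
                        // per consumer to guarantee termination
                        while ((item = queue.take()) != POISON) {
                            System.out.println("consumed: " + item);
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            };
            consumer.start();
            queue.put("doc-1");
            queue.put("doc-2");
            queue.put(POISON); // signal end of work
            consumer.join();
        }
    }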
+    private Map<String, CRV> createRankingMap(final Segment segment, final ReferenceReportCache rrCache,
+            final SolrConnector collectionConnector, ReversibleScoreMap<String> collection1hosts,
+            boolean shallComputeCR) {
+        final Map<String, CRV> rankings = new ConcurrentHashMap<String, CRV>();
+        if (shallComputeCR) try {
+            int concurrency = Math.min(collection1hosts.size(), Runtime.getRuntime().availableProcessors());
+            postprocessingActivity = "collecting cr for " + collection1hosts.size() + " hosts, concurrency = " + concurrency;
+            ConcurrentLog.info("CollectionConfiguration", postprocessingActivity);
+            int countcheck = 0;
+            for (String host: collection1hosts.keyList(true)) {
+                // Patch the citation index for links with canonical tags.
+                // This shall fulfill the following requirement:
+                // If a document A links to B and B contains a 'canonical C', then the citation rank computation shall consider that A links to C and B does not link to C.
+                // To do so, we first must collect all canonical links, find all references to them, get the anchor list of the documents and patch the citation reference of these links
+                String patchquery = CollectionSchema.host_s.getSolrFieldName() + ":" + host + " AND " + CollectionSchema.canonical_s.getSolrFieldName() + AbstractSolrConnector.CATCHALL_DTERM;
+                long patchquerycount = collectionConnector.getCountByQuery("{!cache=false}" + patchquery);
+                BlockingQueue<SolrDocument> documents_with_canonical_tag = collectionConnector.concurrentDocumentsByQuery(patchquery, CollectionSchema.url_chars_i.getSolrFieldName() + " asc", 0, 100000000, Long.MAX_VALUE, 20, 1, true,
+                        CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName(), CollectionSchema.canonical_s.getSolrFieldName());
+                SolrDocument doc_B;
+                int patchquerycountcheck = 0;
+                try {
+                    while ((doc_B = documents_with_canonical_tag.take()) != AbstractSolrConnector.POISON_DOCUMENT) {
+                        // find all documents which link to the canonical doc
+                        DigestURL doc_C_url = new DigestURL((String) doc_B.getFieldValue(CollectionSchema.canonical_s.getSolrFieldName()));
+                        byte[] doc_B_id = ASCII.getBytes(((String) doc_B.getFieldValue(CollectionSchema.id.getSolrFieldName())));
+                        // we remove all references to B, because these become references to C
+                        if (segment.connectedCitation()) {
+                            ReferenceContainer<CitationReference> doc_A_ids = segment.urlCitation().remove(doc_B_id);
+                            if (doc_A_ids == null) {
+                                //System.out.println("*** document with canonical but no referrer: " + doc_B.getFieldValue(CollectionSchema.sku.getSolrFieldName()));
+                                continue; // the document has a canonical tag but no referrer?
+                            }
+                            Iterator<CitationReference> doc_A_ids_iterator = doc_A_ids.entries();
+                            // for each of the referrer A of B, set A as a referrer of C
+                            while (doc_A_ids_iterator.hasNext()) {
+                                CitationReference doc_A_citation = doc_A_ids_iterator.next();
+                                segment.urlCitation().add(doc_C_url.hash(), doc_A_citation);
+                            }
+                        }
+                        patchquerycountcheck++;
+                        if (MemoryControl.shortStatus()) {
+                            ConcurrentLog.warn("CollectionConfiguration", "terminated canonical collection during postprocessing because of short memory");
+                            break;
+                        }
+                    }
+                } catch (InterruptedException e) {
+                    ConcurrentLog.logException(e);
+                } catch (SpaceExceededException e) {
+                    ConcurrentLog.logException(e);
+                }
+                if (patchquerycount != patchquerycountcheck) ConcurrentLog.warn("CollectionConfiguration", "ambiguous patchquery count for host " + host + ": expected=" + patchquerycount + ", counted=" + patchquerycountcheck);
+
+                // do the citation rank computation
+                if (collection1hosts.get(host) <= 0) continue;
+                // select all documents for each host
+                CRHost crh = new CRHost(segment, rrCache, host, 0.85d, 6);
+                int convergence_attempts = 0;
+                while (convergence_attempts++ < 30) {
+                    ConcurrentLog.info("CollectionConfiguration", "convergence step " + convergence_attempts + " for host " + host + " ...");
+                    if (crh.convergenceStep()) break;
+                    if (MemoryControl.shortStatus()) {
+                        ConcurrentLog.warn("CollectionConfiguration", "terminated convergenceStep during postprocessing because of short memory");
+                        break;
+                    }
+                }
+                ConcurrentLog.info("CollectionConfiguration", "convergence for host " + host + " after " + convergence_attempts + " steps");
+                // we have now the cr for all documents of a specific host; we store them for later use
+                Map<String, CRV> crn = crh.normalize();
+                //crh.log(crn);
+                rankings.putAll(crn); // accumulate this here for usage in document update later
+                if (MemoryControl.shortStatus()) {
+                    ConcurrentLog.warn("CollectionConfiguration", "terminated crn accumulation during postprocessing because of short memory");
+                    break;
+                }
+                countcheck++;
+            }
+            if (collection1hosts.size() != countcheck) ConcurrentLog.warn("CollectionConfiguration", "ambiguous host count: expected=" + collection1hosts.size() + ", counted=" + countcheck);
+        } catch (final IOException e2) {
+            ConcurrentLog.logException(e2);
+            collection1hosts = new ClusteredScoreMap<String>(true);
+        }
+        return rankings;
+    }
 
     public void postprocessing_http_unique(final Segment segment, final SolrDocument doc, final SolrInputDocument sid, final DigestURL url) {
         if (!this.contains(CollectionSchema.http_unique_b)) return;
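[Annotation] Patch 1 moves the CRHost convergence loop into createRankingMap unchanged; CRHost's internals are not shown in this series. For orientation, a minimal sketch of a damped, PageRank-style iteration of the kind convergenceStep() presumably performs — hypothetical in-memory structures, damping 0.85 as in the CRHost constructor call above:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class RankIterationSketch {
        /**
         * One damped rank iteration over a host-local link graph; returns true
         * when no rank changed by more than a small epsilon (converged).
         * incoming maps a document id to the ids of documents linking to it;
         * outDegree holds each referrer's number of outgoing links.
         */
        static boolean convergenceStep(Map<String, List<String>> incoming,
                                       Map<String, Integer> outDegree,
                                       Map<String, Double> rank, double damping) {
            double maxDelta = 0.0;
            Map<String, Double> next = new HashMap<>();
            for (Map.Entry<String, List<String>> e : incoming.entrySet()) {
                double sum = 0.0;
                for (String referrer : e.getValue()) {
                    sum += rank.getOrDefault(referrer, 0.0) / outDegree.getOrDefault(referrer, 1);
                }
                double r = (1.0 - damping) + damping * sum;
                next.put(e.getKey(), r);
                maxDelta = Math.max(maxDelta, Math.abs(r - rank.getOrDefault(e.getKey(), 0.0)));
            }
            rank.putAll(next);
            return maxDelta < 1e-6;
        }
    }

CollectionConfiguration caps this at 30 attempts per host and aborts early under memory pressure.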
From 8c49a755da40f3191dfeff3cba4b36b5c6681576 Mon Sep 17 00:00:00 2001
From: luccioman
Date: Thu, 1 Sep 2016 15:40:28 +0200
Subject: [PATCH 2/3] Postprocessing refactoring

Added Javadocs to refactored methods.
Added log warnings instead of silently failing some errors.
Only fill collection1hosts when required (shallComputeCR true).
---
 .../schema/CollectionConfiguration.java | 99 ++++++++++++++-----
 1 file changed, 76 insertions(+), 23 deletions(-)

diff --git a/source/net/yacy/search/schema/CollectionConfiguration.java b/source/net/yacy/search/schema/CollectionConfiguration.java
index 0ea4a5747..fa281c946 100644
--- a/source/net/yacy/search/schema/CollectionConfiguration.java
+++ b/source/net/yacy/search/schema/CollectionConfiguration.java
@@ -1081,10 +1081,12 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
     }
 
     /**
-     * post-processing steps for all entries that have a process tag assigned
-     * @param connector
-     * @param urlCitation
-     * @return
+     * Performs post-processing steps for all entries that have a process tag assigned.
+     * @param segment Solr segment. Must not be null.
+     * @param rrCache reference report cache for the segment.
+     * @param harvestkey key from a harvest process, used to mark documents needing post-processing
+     * @param byPartialUpdate when true, perform partial updates on documents
+     * @return the number of post-processed documents
      */
     public int postprocessing(final Segment segment, final ReferenceReportCache rrCache, final String harvestkey, final boolean byPartialUpdate) {
         if (!this.contains(CollectionSchema.process_sxt)) return 0;
@@ -1109,18 +1111,6 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
             postprocessingCollection1Count = -1;
             postprocessingWebgraphCount = -1;
         }
-
-        // collect hosts from index which shall take part in citation computation
-        postprocessingActivity = "collecting host facets for collection";
-        ConcurrentLog.info("CollectionConfiguration", postprocessingActivity);
-        ReversibleScoreMap<String> collection1hosts;
-        try {
-            Map<String, ReversibleScoreMap<String>> hostfacet = collectionConnector.getFacets("{!cache=false}" + collection1query, 10000000, CollectionSchema.host_s.getSolrFieldName());
-            collection1hosts = hostfacet.get(CollectionSchema.host_s.getSolrFieldName());
-        } catch (final IOException e2) {
-            ConcurrentLog.logException(e2);
-            collection1hosts = new ClusteredScoreMap<String>(true);
-        }
 
         postprocessingActivity = "create ranking map";
         ConcurrentLog.info("CollectionConfiguration", postprocessingActivity);
@@ -1131,8 +1121,24 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
                 collection.contains(CollectionSchema.cr_host_chance_d) &&
                 collection.contains(CollectionSchema.cr_host_norm_i)));
         // create the ranking map
-        final Map<String, CRV> rankings = createRankingMap(segment, rrCache, collectionConnector, collection1hosts,
-                shallComputeCR);
+        final Map<String, CRV> rankings;
+        if (shallComputeCR) {
+            // collect hosts from index which shall take part in citation computation
+            postprocessingActivity = "collecting host facets for collection";
+            ConcurrentLog.info("CollectionConfiguration", postprocessingActivity);
+            ReversibleScoreMap<String> collection1hosts;
+            try {
+                Map<String, ReversibleScoreMap<String>> hostfacet = collectionConnector.getFacets("{!cache=false}" + collection1query, 10000000, CollectionSchema.host_s.getSolrFieldName());
+                collection1hosts = hostfacet.get(CollectionSchema.host_s.getSolrFieldName());
+            } catch (final IOException e2) {
+                ConcurrentLog.logException(e2);
+                collection1hosts = new ClusteredScoreMap<String>(true);
+            }
+
+            rankings = createRankingMap(segment, rrCache, collectionConnector, collection1hosts);
+        } else {
+            rankings = new ConcurrentHashMap<String, CRV>();
+        }
 
         // process all documents at the webgraph for the outgoing links of this document
         final AtomicInteger allcount = new AtomicInteger(0);
@@ -1153,6 +1159,18 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
         return allcount.get();
     }
 
+    /**
+     * Performs postprocessing steps on the main documents collection.
+     * @param segment Solr segment.
+     * @param rrCache reference report cache for the segment.
+     * @param harvestkey key from a harvest process, used to mark documents needing post-processing
+     * @param byPartialUpdate when true, perform partial updates on documents
+     * @param collectionConnector connector to the main Solr collection
+     * @param collection schema configuration for the collection
+     * @param collection1query query used to harvest items to postprocess in the main collection
+     * @param rankings postprocessed rankings
+     * @param allcount global postprocessed documents count
+     */
     private void postprocessDocuments(final Segment segment, final ReferenceReportCache rrCache,
             final String harvestkey, final boolean byPartialUpdate, final SolrConnector collectionConnector,
             final CollectionConfiguration collection, final String collection1query, final Map<String, CRV> rankings,
             final AtomicInteger allcount) {
@@ -1344,6 +1362,14 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
         collectionConnector.commit(true); // make changes available directly to prevent that the process repeats again
     }
 
+    /**
+     * Performs postprocessing steps on the webgraph core.
+     * @param segment Solr segment.
+     * @param webgraph webgraph schema configuration
+     * @param webgraphquery query used to harvest items to postprocess in the webgraph collection
+     * @param rankings postprocessed rankings
+     * @param allcount global postprocessed documents count
+     */
     private void postprocessWebgraph(final Segment segment, final WebgraphConfiguration webgraph, String webgraphquery,
             final Map<String, CRV> rankings, final AtomicInteger allcount) {
         postprocessingActivity = "collecting host facets for webgraph cr calculation";
@@ -1461,11 +1487,18 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
         }
     }
 
+    /**
+     * Patches the citation index for links with canonical tags and performs the citation rank computation.
+     * @param segment Solr segment
+     * @param rrCache reference report cache for the segment
+     * @param collectionConnector default connector to the Solr segment
+     * @param collection1hosts hosts from index which shall take part in citation computation
+     * @return the ranking map
+     */
     private Map<String, CRV> createRankingMap(final Segment segment, final ReferenceReportCache rrCache,
-            final SolrConnector collectionConnector, ReversibleScoreMap<String> collection1hosts,
-            boolean shallComputeCR) {
+            final SolrConnector collectionConnector, ReversibleScoreMap<String> collection1hosts) {
         final Map<String, CRV> rankings = new ConcurrentHashMap<String, CRV>();
-        if (shallComputeCR) try {
+        try {
             int concurrency = Math.min(collection1hosts.size(), Runtime.getRuntime().availableProcessors());
             postprocessingActivity = "collecting cr for " + collection1hosts.size() + " hosts, concurrency = " + concurrency;
             ConcurrentLog.info("CollectionConfiguration", postprocessingActivity);
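[Annotation] The byPartialUpdate flag documented above decides whether documents are rewritten whole or patched with Solr atomic updates. For context, a hedged sketch of what such a partial update looks like at the plain SolrJ level — illustrative only; YaCy wraps this behind its SolrConnector interface:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.solr.common.SolrInputDocument;

    public class PartialUpdateSketch {
        /** Builds an atomic-update document that sets one field without resending the rest. */
        static SolrInputDocument partialUpdate(String id, String field, Object value) {
            SolrInputDocument sid = new SolrInputDocument();
            sid.addField("id", id);
            Map<String, Object> modifier = new HashMap<>();
            modifier.put("set", value); // "set" replaces the value; "add" and "inc" are other modifiers
            sid.addField(field, modifier);
            return sid; // pass to SolrClient.add(...) as usual
        }
    }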
@@ -1545,6 +1578,14 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
         return rankings;
     }
 
+    /**
+     * Searches the segment for any document having the same URL as doc but with the opposite
+     * secure/unsecure version of the protocol (https or http), then updates the document's
+     * http_unique_b field accordingly.
+     * @param segment Solr segment
+     * @param doc document to process
+     * @param sid updatable version of the document
+     * @param url document's url
+     */
     public void postprocessing_http_unique(final Segment segment, final SolrDocument doc, final SolrInputDocument sid, final DigestURL url) {
         if (!this.contains(CollectionSchema.http_unique_b)) return;
         if (!url.isHTTPS() && !url.isHTTP()) return;
@@ -1552,9 +1593,19 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
             DigestURL u = new DigestURL((url.isHTTP() ? "https://" : "http://") + url.urlstub(true, true));
             SolrDocument d = segment.fulltext().getDefaultConnector().getDocumentById(ASCII.String(u.hash()), CollectionSchema.http_unique_b.getSolrFieldName());
             set_unique_flag(CollectionSchema.http_unique_b, doc, sid, d);
-        } catch (final IOException e) {}
+        } catch (final IOException e) {
+            ConcurrentLog.warn("CollectionConfiguration", "Failed to postProcess http_unique_b field" + (e.getMessage() != null ? " : " + e.getMessage() : "."));
+        }
     }
 
+    /**
+     * Searches the segment for any document having the same URL as doc but with or without
+     * the www prefix, then updates the document's www_unique_b field accordingly.
+     * @param segment Solr segment
+     * @param doc document to process
+     * @param sid updatable version of the document
+     * @param url document's url
+     */
     public void postprocessing_www_unique(final Segment segment, final SolrDocument doc, final SolrInputDocument sid, final DigestURL url) {
         if (!this.contains(CollectionSchema.www_unique_b)) return;
         final String us = url.urlstub(true, true);
@@ -1562,7 +1613,9 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
             DigestURL u = new DigestURL(url.getProtocol() + (us.startsWith("www.") ? "://" + us.substring(4) : "://www." + us));
             SolrDocument d = segment.fulltext().getDefaultConnector().getDocumentById(ASCII.String(u.hash()), CollectionSchema.www_unique_b.getSolrFieldName());
             set_unique_flag(CollectionSchema.www_unique_b, doc, sid, d);
-        } catch (final IOException e) {}
+        } catch (final IOException e) {
+            ConcurrentLog.warn("CollectionConfiguration", "Failed to postProcess www_unique_b field" + (e.getMessage() != null ? " : " + e.getMessage() : "."));
+        }
     }
 
     private void set_unique_flag(CollectionSchema field, final SolrDocument doc, final SolrInputDocument sid, final SolrDocument d) {
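[Annotation] The parentheses around the ternary in the two log calls above matter: in Java, + binds tighter than !=, so an unparenthesized "…field" + e.getMessage() != null ? … : … compares the concatenated string against null (always true) instead of testing the message itself. A two-line illustration:

    public class PrecedenceDemo {
        public static void main(String[] args) {
            String msg = null;
            // + binds tighter than !=, so this compares "prefixnull" against null
            // (always true) and prints " : null"
            System.out.println("prefix" + msg != null ? " : " + msg : ".");
            // parenthesized ternary tests msg itself and prints "prefix."
            System.out.println("prefix" + (msg != null ? " : " + msg : "."));
        }
    }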
" : " + e.getMessage() : "."); + } } private void set_unique_flag(CollectionSchema field, final SolrDocument doc, final SolrInputDocument sid, final SolrDocument d) { From 0a9ff14d96714eacfd759447fa1cfa58a1e8f257 Mon Sep 17 00:00:00 2001 From: luccioman Date: Wed, 7 Sep 2016 10:03:48 +0200 Subject: [PATCH 3/3] Fixed NullPointerException case and added Javadoc --- .../solr/responsewriter/OpensearchResponseWriter.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/source/net/yacy/cora/federate/solr/responsewriter/OpensearchResponseWriter.java b/source/net/yacy/cora/federate/solr/responsewriter/OpensearchResponseWriter.java index d6f728ca3..7445cddf5 100644 --- a/source/net/yacy/cora/federate/solr/responsewriter/OpensearchResponseWriter.java +++ b/source/net/yacy/cora/federate/solr/responsewriter/OpensearchResponseWriter.java @@ -343,13 +343,20 @@ public class OpensearchResponseWriter implements QueryResponseWriter { return; } + /** + * @param snippets snippets list eventually empty + * @return the largest snippet containing at least a space character among the list, or null + */ public static String getLargestSnippet(LinkedHashSet snippets) { if (snippets == null || snippets.size() == 0) return null; String l = null; for (String s: snippets) { if ((l == null || s.length() > l.length()) && s.indexOf(' ') > 0) l = s; } - return l.replaceAll("\"", "'"); + if(l != null) { + l = l.replaceAll("\"", "'"); + } + return l; } public static void openTag(final Writer writer, final String tag) throws IOException {