From 0cf9e9580b874ae79d6480b5b7a0be54da1982c6 Mon Sep 17 00:00:00 2001 From: Michael Peter Christen Date: Wed, 6 Nov 2013 15:01:40 +0100 Subject: [PATCH] added clickdepth and CR computation debug code to verify that the process is complete --- .../schema/CollectionConfiguration.java | 65 ++++++++++++------- 1 file changed, 43 insertions(+), 22 deletions(-) diff --git a/source/net/yacy/search/schema/CollectionConfiguration.java b/source/net/yacy/search/schema/CollectionConfiguration.java index adda277b2..31b3c35e2 100644 --- a/source/net/yacy/search/schema/CollectionConfiguration.java +++ b/source/net/yacy/search/schema/CollectionConfiguration.java @@ -898,17 +898,19 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri String query = (harvestkey == null || !segment.fulltext().getDefaultConfiguration().contains(CollectionSchema.harvestkey_s) ? "" : CollectionSchema.harvestkey_s.getSolrFieldName() + ":\"" + harvestkey + "\" AND ") + CollectionSchema.process_sxt.getSolrFieldName() + ":" + ProcessType.CITATION.toString(); hostscore = collectionConnector.getFacets(query, 10000000, CollectionSchema.host_s.getSolrFieldName()).get(CollectionSchema.host_s.getSolrFieldName()); - if (hostscore == null) hostscore = new ClusteredScoreMap(); - + ConcurrentLog.info("CollectionConfiguration", "collecting " + hostscore.size() + " hosts"); + int countcheck = 0; for (String host: hostscore.keyList(true)) { // Patch the citation index for links with canonical tags. // This shall fulfill the following requirement: - // If a document A links to B and B contains a 'canonical C', then the citation rank coputation shall consider that A links to C and B does not link to C. + // If a document A links to B and B contains a 'canonical C', then the citation rank computation shall consider that A links to C and B does not link to C. // To do so, we first must collect all canonical links, find all references to them, get the anchor list of the documents and patch the citation reference of these links String patchquery = CollectionSchema.host_s.getSolrFieldName() + ":" + host + " AND " + CollectionSchema.canonical_s.getSolrFieldName() + ":[* TO *]"; + long patchquerycount = collectionConnector.getCountByQuery(patchquery); BlockingQueue documents_with_canonical_tag = collectionConnector.concurrentDocumentsByQuery(patchquery, 0, 10000000, 60000L, 50, CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName(), CollectionSchema.canonical_s.getSolrFieldName()); SolrDocument doc_B; + int patchquerycountcheck = 0; try { while ((doc_B = documents_with_canonical_tag.take()) != AbstractSolrConnector.POISON_DOCUMENT) { // find all documents which link to the canonical doc @@ -926,10 +928,12 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri CitationReference doc_A_citation = doc_A_ids_iterator.next(); segment.urlCitation().add(doc_C_url.hash(), doc_A_citation); } + patchquerycountcheck++; } } catch (InterruptedException e) { } catch (SpaceExceededException e) { } + if (patchquerycount != patchquerycountcheck) ConcurrentLog.warn("CollectionConfiguration", "ambiguous patchquery count for host " + host + ": expected=" + patchquerycount + ", counted=" + patchquerycountcheck); // do the citation rank computation if (hostscore.get(host) <= 0) continue; @@ -939,12 +943,14 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri while (convergence_attempts++ < 30) { if (crh.convergenceStep()) break; } - ConcurrentLog.info("CollectionConfiguration.CRHost", "convergence for host " + host + " after " + convergence_attempts + " steps"); + ConcurrentLog.info("CollectionConfiguration", "convergence for host " + host + " after " + convergence_attempts + " steps"); // we have now the cr for all documents of a specific host; we store them for later use Map crn = crh.normalize(); //crh.log(crn); ranking.putAll(crn); // accumulate this here for usage in document update later + countcheck++; } + if (hostscore.size() != countcheck) ConcurrentLog.warn("CollectionConfiguration", "ambiguous host count: expected=" + hostscore.size() + ", counted=" + countcheck); } catch (final IOException e2) { hostscore = new ClusteredScoreMap(); } @@ -952,13 +958,15 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri // process all documents at the webgraph for the outgoing links of this document SolrDocument doc; if (webgraphConnector != null) { - for (String host: hostscore.keyList(true)) { - if (hostscore.get(host) <= 0) continue; - // select all webgraph edges and modify their cr value - BlockingQueue docs = webgraphConnector.concurrentDocumentsByQuery( - WebgraphSchema.source_host_s.getSolrFieldName() + ":\"" + host + "\"", - 0, 10000000, 60000, 50); - try { + try { + for (String host: hostscore.keyList(true)) { + if (hostscore.get(host) <= 0) continue; + // select all webgraph edges and modify their cr value + String query = WebgraphSchema.source_host_s.getSolrFieldName() + ":\"" + host + "\""; + long count = webgraphConnector.getCountByQuery(query); + ConcurrentLog.info("CollectionConfiguration", "collecting " + count + " documents from the webgraph"); + BlockingQueue docs = webgraphConnector.concurrentDocumentsByQuery(query, 0, 10000000, 60000, 50); + int countcheck = 0; while ((doc = docs.take()) != AbstractSolrConnector.POISON_DOCUMENT) { boolean changed = false; SolrInputDocument sid = segment.fulltext().getWebgraphConfiguration().toSolrInputDocument(doc, null); @@ -978,21 +986,29 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri webgraphConnector.add(sid); } catch (SolrException e) { } catch (IOException e) { - } + } + countcheck++; } - } catch (final InterruptedException e) {} + if (count != countcheck) ConcurrentLog.warn("CollectionConfiguration", "ambiguous webgraph document count for host " + host + ": expected=" + count + ", counted=" + countcheck); + } + } catch (final IOException e2) { + ConcurrentLog.warn("CollectionConfiguration", e2.getMessage(), e2); + } catch (final InterruptedException e3) { + ConcurrentLog.warn("CollectionConfiguration", e3.getMessage(), e3); } } // process all documents in collection - BlockingQueue docs = collectionConnector.concurrentDocumentsByQuery( - (harvestkey == null ? "" : CollectionSchema.harvestkey_s.getSolrFieldName() + ":\"" + harvestkey + "\" AND ") + - CollectionSchema.process_sxt.getSolrFieldName() + ":[* TO *]", - 0, 10000, 60000, 50); + String query = (harvestkey == null ? "" : CollectionSchema.harvestkey_s.getSolrFieldName() + ":\"" + harvestkey + "\" AND ") + + CollectionSchema.process_sxt.getSolrFieldName() + ":[* TO *]"; int proccount = 0, proccount_clickdepthchange = 0, proccount_referencechange = 0, proccount_citationchange = 0, proccount_uniquechange = 0; Map hostExtentCache = new HashMap(); // a mapping from the host id to the number of documents which contain this host-id Set uniqueURLs = new HashSet(); try { + long count = collectionConnector.getCountByQuery(query); + ConcurrentLog.info("CollectionConfiguration", "collecting " + count + " documents from the collection for harvestkey " + harvestkey); + BlockingQueue docs = collectionConnector.concurrentDocumentsByQuery(query, 0, 10000, 60000, 50); + int countcheck = 0; while ((doc = docs.take()) != AbstractSolrConnector.POISON_DOCUMENT) { // for each to-be-processed entry work on the process tag Collection proctags = doc.getFieldValues(CollectionSchema.process_sxt.getSolrFieldName()); @@ -1031,8 +1047,8 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri if (!hostExtentCache.containsKey(hosthash)) { StringBuilder q = new StringBuilder(); q.append(CollectionSchema.host_id_s.getSolrFieldName()).append(":\"").append(hosthash).append("\" AND ").append(CollectionSchema.httpstatus_i.getSolrFieldName()).append(":200"); - long count = segment.fulltext().getDefaultConnector().getCountByQuery(q.toString()); - hostExtentCache.put(hosthash, count); + long hostExtentCount = segment.fulltext().getDefaultConnector().getCountByQuery(q.toString()); + hostExtentCache.put(hosthash, hostExtentCount); } if (postprocessing_references(rrCache, doc, sid, url, hostExtentCache)) proccount_referencechange++; @@ -1047,13 +1063,18 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri proccount++; } catch (final Throwable e1) { } + countcheck++; } + if (count != countcheck) ConcurrentLog.warn("CollectionConfiguration", "ambiguous collection document count for harvestkey " + harvestkey + ": expected=" + count + ", counted=" + countcheck); ConcurrentLog.info("CollectionConfiguration", "cleanup_processing: re-calculated " + proccount+ " new documents, " + proccount_clickdepthchange + " clickdepth changes, " + proccount_referencechange + " reference-count changes, " + proccount_uniquechange + " unique field changes, " + proccount_citationchange + " citation ranking changes."); - } catch (final InterruptedException e) { + } catch (final InterruptedException e2) { + ConcurrentLog.warn("CollectionConfiguration", e2.getMessage(), e2); + } catch (IOException e3) { + ConcurrentLog.warn("CollectionConfiguration", e3.getMessage(), e3); } return proccount; } @@ -1148,8 +1169,8 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri if (entry == null || entry.getValue() == null) continue; try { String url = (String) connector.getDocumentById(ASCII.String(entry.getKey()), CollectionSchema.sku.getSolrFieldName()).getFieldValue(CollectionSchema.sku.getSolrFieldName()); - ConcurrentLog.info("CollectionConfiguration.CRHost", "CR for " + url); - ConcurrentLog.info("CollectionConfiguration.CRHost", ">> " + entry.getValue().toString()); + ConcurrentLog.info("CollectionConfiguration", "CR for " + url); + ConcurrentLog.info("CollectionConfiguration", ">> " + entry.getValue().toString()); } catch (final IOException e) { ConcurrentLog.logException(e); }