Refactored postprocessing

For easier understanding and performance profiling.
luccioman 9 years ago
parent de663be48b
commit 42f45760ed

@ -48,6 +48,13 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.SolrInputField;
import org.eclipse.jetty.util.ConcurrentHashSet;
import net.yacy.cora.document.analysis.Classification;
import net.yacy.cora.document.analysis.Classification.ContentDomain;
import net.yacy.cora.document.analysis.EnhancedTextProfileSignature;
@ -100,13 +107,6 @@ import;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.SolrInputField;
import org.eclipse.jetty.util.ConcurrentHashSet;
public class CollectionConfiguration extends SchemaConfiguration implements Serializable {
@ -1131,212 +1131,38 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
collection.contains(CollectionSchema.cr_host_chance_d) &&
// create the ranking map
final Map<String, CRV> rankings = new ConcurrentHashMap<String, CRV>();
if (shallComputeCR) try {
int concurrency = Math.min(collection1hosts.size(), Runtime.getRuntime().availableProcessors());
postprocessingActivity = "collecting cr for " + collection1hosts.size() + " hosts, concurrency = " + concurrency;"CollectionConfiguration", postprocessingActivity);
int countcheck = 0;
for (String host: collection1hosts.keyList(true)) {
// Patch the citation index for links with canonical tags.
// This shall fulfill the following requirement:
// If a document A links to B and B contains a 'canonical C', then the citation rank computation shall consider that A links to C and B does not link to C.
// To do so, we first must collect all canonical links, find all references to them, get the anchor list of the documents and patch the citation reference of these links
String patchquery = CollectionSchema.host_s.getSolrFieldName() + ":" + host + " AND " + CollectionSchema.canonical_s.getSolrFieldName() + AbstractSolrConnector.CATCHALL_DTERM;
long patchquerycount = collectionConnector.getCountByQuery("{!cache=false}" + patchquery);
BlockingQueue<SolrDocument> documents_with_canonical_tag = collectionConnector.concurrentDocumentsByQuery(patchquery, CollectionSchema.url_chars_i.getSolrFieldName() + " asc", 0, 100000000, Long.MAX_VALUE, 20, 1, true,, CollectionSchema.sku.getSolrFieldName(), CollectionSchema.canonical_s.getSolrFieldName());
SolrDocument doc_B;
int patchquerycountcheck = 0;
try {
while ((doc_B = documents_with_canonical_tag.take()) != AbstractSolrConnector.POISON_DOCUMENT) {
// find all documents which link to the canonical doc
DigestURL doc_C_url = new DigestURL((String) doc_B.getFieldValue(CollectionSchema.canonical_s.getSolrFieldName()));
byte[] doc_B_id = ASCII.getBytes(((String) doc_B.getFieldValue(;
// we remove all references to B, because these become references to C
if (segment.connectedCitation()) {
ReferenceContainer<CitationReference> doc_A_ids = segment.urlCitation().remove(doc_B_id);
if (doc_A_ids == null) {
//System.out.println("*** document with canonical but no referrer: " + doc_B.getFieldValue(CollectionSchema.sku.getSolrFieldName()));
continue; // the document has a canonical tag but no referrer?
Iterator<CitationReference> doc_A_ids_iterator = doc_A_ids.entries();
// for each of the referrer A of B, set A as a referrer of C
while (doc_A_ids_iterator.hasNext()) {
CitationReference doc_A_citation =;
segment.urlCitation().add(doc_C_url.hash(), doc_A_citation);
if (MemoryControl.shortStatus()) {
ConcurrentLog.warn("CollectionConfiguration", "terminated canonical collection during postprocessing because of short memory");
} catch (InterruptedException e) {
} catch (SpaceExceededException e) {
if (patchquerycount != patchquerycountcheck) ConcurrentLog.warn("CollectionConfiguration", "ambiguous patchquery count for host " + host + ": expected=" + patchquerycount + ", counted=" + patchquerycountcheck);
// do the citation rank computation
if (collection1hosts.get(host) <= 0) continue;
// select all documents for each host
CRHost crh = new CRHost(segment, rrCache, host, 0.85d, 6);
int convergence_attempts = 0;
while (convergence_attempts++ < 30) {"CollectionConfiguration", "convergence step " + convergence_attempts + " for host " + host + " ...");
if (crh.convergenceStep()) break;
if (MemoryControl.shortStatus()) {
ConcurrentLog.warn("CollectionConfiguration", "terminated convergenceStep during postprocessing because of short memory");
}"CollectionConfiguration", "convergence for host " + host + " after " + convergence_attempts + " steps");
// we have now the cr for all documents of a specific host; we store them for later use
Map<String, CRV> crn = crh.normalize();
rankings.putAll(crn); // accumulate this here for usage in document update later
if (MemoryControl.shortStatus()) {
ConcurrentLog.warn("CollectionConfiguration", "terminated crn akkumulation during postprocessing because of short memory");
if (collection1hosts.size() != countcheck) ConcurrentLog.warn("CollectionConfiguration", "ambiguous host count: expected=" + collection1hosts.size() + ", counted=" + countcheck);
} catch (final IOException e2) {
collection1hosts = new ClusteredScoreMap<String>(true);
final Map<String, CRV> rankings = createRankingMap(segment, rrCache, collectionConnector, collection1hosts,
// process all documents at the webgraph for the outgoing links of this document
final AtomicInteger allcount = new AtomicInteger(0);
if (segment.fulltext().useWebgraph() && shallComputeCR) {
postprocessingActivity = "collecting host facets for webgraph cr calculation";"CollectionConfiguration", postprocessingActivity);
final Set<String> omitFields = new HashSet<String>();
// collect hosts from index which shall take part in citation computation
ReversibleScoreMap<String> webgraphhosts;
try {
Map<String, ReversibleScoreMap<String>> hostfacet = segment.fulltext().getWebgraphConnector().getFacets(webgraphquery, 10000000, WebgraphSchema.source_host_s.getSolrFieldName());
webgraphhosts = hostfacet.get(WebgraphSchema.source_host_s.getSolrFieldName());
} catch (final IOException e2) {
webgraphhosts = new ClusteredScoreMap<String>(true);
postprocessWebgraph(segment, webgraph, webgraphquery, rankings, allcount);
try {
final long start = System.currentTimeMillis();
for (String host: webgraphhosts.keyList(true)) {
if (webgraphhosts.get(host) <= 0) continue;
final String hostfinal = host;
// select all webgraph edges and modify their cr value
postprocessingActivity = "writing cr values to webgraph for host " + host;"CollectionConfiguration", postprocessingActivity);
String patchquery = WebgraphSchema.source_host_s.getSolrFieldName() + ":\"" + host + "\" AND " + WebgraphSchema.process_sxt.getSolrFieldName() + AbstractSolrConnector.CATCHALL_DTERM;
final long count = segment.fulltext().getWebgraphConnector().getCountByQuery("{!cache=false}" + patchquery);
int concurrency = Math.min((int) count, Math.max(1, Runtime.getRuntime().availableProcessors() / 4));"CollectionConfiguration", "collecting " + count + " documents from the webgraph, concurrency = " + concurrency);
final BlockingQueue<SolrDocument> docs = segment.fulltext().getWebgraphConnector().concurrentDocumentsByQuery(
WebgraphSchema.source_chars_i.getSolrFieldName() + " asc",
0, 100000000, Long.MAX_VALUE, concurrency + 1, concurrency, true
// TODO: add field list and do partial updates
final AtomicInteger proccount = new AtomicInteger(0);
Thread[] t = new Thread[concurrency];
for (final AtomicInteger i = new AtomicInteger(0); i.get() < t.length; i.incrementAndGet()) {
t[i.get()] = new Thread() {
private String name = "CollectionConfiguration.postprocessing.webgraph-" + i.get();
public void run() {
SolrDocument doc; String id;
try {
processloop: while ((doc = docs.take()) != AbstractSolrConnector.POISON_DOCUMENT) {
try {
SolrInputDocument sid = webgraph.toSolrInputDocument(doc, omitFields);
Collection<Object> proctags = doc.getFieldValues(WebgraphSchema.process_sxt.getSolrFieldName());
for (Object tag: proctags) try {
// switch over tag types
ProcessType tagtype = ProcessType.valueOf((String) tag);
// process all documents in collection
postprocessDocuments(segment, rrCache, harvestkey, byPartialUpdate, collectionConnector, collection,
collection1query, rankings, allcount);
// set cr values
if (tagtype == ProcessType.CITATION) {
if (segment.fulltext().useWebgraph() && webgraph.contains(WebgraphSchema.source_id_s) && webgraph.contains(WebgraphSchema.source_cr_host_norm_i)) {
id = (String) doc.getFieldValue(WebgraphSchema.source_id_s.getSolrFieldName());
CRV crv = rankings.get(id);
if (crv != null) {
sid.setField(WebgraphSchema.source_cr_host_norm_i.getSolrFieldName(), crv.crn);
if (webgraph.contains(WebgraphSchema.target_id_s) && webgraph.contains(WebgraphSchema.target_cr_host_norm_i)) {
id = (String) doc.getFieldValue(WebgraphSchema.target_id_s.getSolrFieldName());
CRV crv = rankings.get(id);
if (crv != null) {
sid.setField(WebgraphSchema.target_cr_host_norm_i.getSolrFieldName(), crv.crn);
} catch (IllegalArgumentException e) {
// write document back to index
try {
//segment.fulltext().getWebgraphConnector().deleteById((String) sid.getFieldValue(;
} catch (SolrException e) {
} catch (IOException e) {
if (proccount.get() % 1000 == 0) {
postprocessingActivity = "writing cr values to webgraph for host " + hostfinal + "postprocessed " + proccount + " from " + count + " documents; " +
(proccount.get() * 1000 / (System.currentTimeMillis() - start)) + " docs/second; " +
((System.currentTimeMillis() - start) * (count - proccount.get()) / proccount.get() / 60000) + " minutes remaining";
postprocessingCollection1Count = 0;
postprocessingWebgraphCount = 0;
postprocessingActivity = "postprocessing terminated";"CollectionConfiguration", postprocessingActivity);
} catch (Throwable e) {
continue processloop;
} catch (InterruptedException e) {
ConcurrentLog.warn("CollectionConfiguration", e.getMessage(), e);
for (int i = 0; i < t.length; i++) try {
if (t[i].isAlive()) t[i].interrupt();
} catch (InterruptedException e) {}
if (count != proccount.get()) ConcurrentLog.warn("CollectionConfiguration", "ambiguous webgraph document count for host " + host + ": expected=" + count + ", counted=" + proccount);
} catch (final IOException e2) {
ConcurrentLog.warn("CollectionConfiguration", e2.getMessage(), e2);
postprocessingRunning = false;
return allcount.get();
// process all documents in collection
private void postprocessDocuments(final Segment segment, final ReferenceReportCache rrCache,
final String harvestkey, final boolean byPartialUpdate, final SolrConnector collectionConnector,
final CollectionConfiguration collection, final String collection1query, final Map<String, CRV> rankings,
final AtomicInteger allcount) {
final Map<String, Long> hostExtentCache = new HashMap<String, Long>(); // a mapping from the host id to the number of documents which contain this host-id
final Set<String> uniqueURLs = new ConcurrentHashSet<String>(); // will be used in a concurrent environment
final Set<String> omitFields = new HashSet<String>();
final Collection<String> failids = new ArrayList<String>();
final Collection<String> failids = new ConcurrentHashSet<String>();
final AtomicInteger countcheck = new AtomicInteger(0);
final AtomicInteger proccount = new AtomicInteger();
final AtomicInteger proccount_referencechange = new AtomicInteger();
@ -1400,7 +1226,7 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
final Thread rewriteThread[] = new Thread[concurrency];
for (int rewrite_start = 0; rewrite_start < concurrency; rewrite_start++) {
rewriteThread[rewrite_start] = new Thread() {
rewriteThread[rewrite_start] = new Thread("CollectionConfiguration.postprocessing.rewriteThread-" + rewrite_start) {
public void run() {
SolrDocument doc;
@ -1516,12 +1342,207 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
ConcurrentLog.warn("CollectionConfiguration", e3.getMessage(), e3);
collectionConnector.commit(true); // make changes available directly to prevent that the process repeats again
postprocessingCollection1Count = 0;
postprocessingWebgraphCount = 0;
postprocessingActivity = "postprocessing terminated";
/**
 * Writes citation-rank values from {@code rankings} into webgraph edge documents, host by host.
 *
 * Steps visible in this listing:
 * - the hosts are read from a facet query on the source_host_s field; if that query throws an
 *   IOException an empty ClusteredScoreMap is used instead, so no host is processed;
 * - hosts whose facet value is <= 0 are skipped;
 * - for each remaining host, the documents matching that host and carrying a process_sxt value
 *   are counted and then fetched through a BlockingQueue;
 * - an array of {@code concurrency} worker threads is created; each worker takes documents from
 *   the queue until it takes AbstractSolrConnector.POISON_DOCUMENT;
 * - for every process tag equal to ProcessType.CITATION the worker copies {@code crv.crn} of the
 *   CRV found under the source id / target id into source_cr_host_norm_i / target_cr_host_norm_i,
 *   provided the webgraph schema contains the respective fields and a CRV exists for that id;
 * - a warning is logged when the number of processed documents differs from the counted number.
 *
 * NOTE(review): this listing is a rendered diff. Closing braces, the logging call prefix, and the
 * statements that start/join the threads, increment {@code proccount} and write the document back
 * are not visible here, so they are deliberately not described - confirm against the real file.
 *
 * @param segment       used to reach the webgraph connector via {@code segment.fulltext()}
 * @param webgraph      webgraph schema configuration; queried with {@code contains(...)} and used
 *                      to convert each SolrDocument into a SolrInputDocument
 * @param webgraphquery query passed to the facet request that selects the hosts
 * @param rankings      lookup from an id (read from source_id_s / target_id_s) to its CRV
 * @param allcount      counter supplied by the caller; its use is not visible in this excerpt
 *                      apart from the statement flagged below - TODO confirm
 */
private void postprocessWebgraph(final Segment segment, final WebgraphConfiguration webgraph, String webgraphquery,
final Map<String, CRV> rankings, final AtomicInteger allcount) {
postprocessingActivity = "collecting host facets for webgraph cr calculation";"CollectionConfiguration", postprocessingActivity);
// NOTE(review): the next two statements look like removed lines of the calling method that the
// diff view interleaved here (this method is declared void) - verify against the real file.
postprocessingRunning = false;
return allcount.get();
// no fields are omitted when converting documents below (the set stays empty in the visible code)
final Set<String> omitFields = new HashSet<String>();
// collect hosts from index which shall take part in citation computation
ReversibleScoreMap<String> webgraphhosts;
try {
Map<String, ReversibleScoreMap<String>> hostfacet = segment.fulltext().getWebgraphConnector().getFacets(webgraphquery, 10000000, WebgraphSchema.source_host_s.getSolrFieldName());
webgraphhosts = hostfacet.get(WebgraphSchema.source_host_s.getSolrFieldName());
} catch (final IOException e2) {
// fall back to an empty host map so that the loop below does nothing
webgraphhosts = new ClusteredScoreMap<String>(true);
try {
// start time, used for the docs/second and minutes-remaining figures in the progress message
final long start = System.currentTimeMillis();
for (String host: webgraphhosts.keyList(true)) {
if (webgraphhosts.get(host) <= 0) continue;
// final copy of the loop variable so the anonymous thread class can reference it
final String hostfinal = host;
// select all webgraph edges and modify their cr value
postprocessingActivity = "writing cr values to webgraph for host " + host;"CollectionConfiguration", postprocessingActivity);
String patchquery = WebgraphSchema.source_host_s.getSolrFieldName() + ":\"" + host + "\" AND " + WebgraphSchema.process_sxt.getSolrFieldName() + AbstractSolrConnector.CATCHALL_DTERM;
final long count = segment.fulltext().getWebgraphConnector().getCountByQuery("{!cache=false}" + patchquery);
// thread count: at most a quarter of the available processors (but at least 1), and never more than count
int concurrency = Math.min((int) count, Math.max(1, Runtime.getRuntime().availableProcessors() / 4));"CollectionConfiguration", "collecting " + count + " documents from the webgraph, concurrency = " + concurrency);
final BlockingQueue<SolrDocument> docs = segment.fulltext().getWebgraphConnector().concurrentDocumentsByQuery(
WebgraphSchema.source_chars_i.getSolrFieldName() + " asc",
0, 100000000, Long.MAX_VALUE, concurrency + 1, concurrency, true
// TODO: add field list and do partial updates
final AtomicInteger proccount = new AtomicInteger(0);
Thread[] t = new Thread[concurrency];
for (int i = 0; i < t.length; i++) {
// each worker gets a descriptive thread name carrying its index
t[i] = new Thread("CollectionConfiguration.postprocessing.webgraph-" + i) {
public void run() {
SolrDocument doc; String id;
try {
// consume the queue until the poison marker is taken
processloop: while ((doc = docs.take()) != AbstractSolrConnector.POISON_DOCUMENT) {
try {
SolrInputDocument sid = webgraph.toSolrInputDocument(doc, omitFields);
Collection<Object> proctags = doc.getFieldValues(WebgraphSchema.process_sxt.getSolrFieldName());
for (Object tag: proctags) try {
// switch over tag types
ProcessType tagtype = ProcessType.valueOf((String) tag);
// set cr values
if (tagtype == ProcessType.CITATION) {
if (segment.fulltext().useWebgraph() && webgraph.contains(WebgraphSchema.source_id_s) && webgraph.contains(WebgraphSchema.source_cr_host_norm_i)) {
id = (String) doc.getFieldValue(WebgraphSchema.source_id_s.getSolrFieldName());
CRV crv = rankings.get(id);
if (crv != null) {
sid.setField(WebgraphSchema.source_cr_host_norm_i.getSolrFieldName(), crv.crn);
if (webgraph.contains(WebgraphSchema.target_id_s) && webgraph.contains(WebgraphSchema.target_cr_host_norm_i)) {
id = (String) doc.getFieldValue(WebgraphSchema.target_id_s.getSolrFieldName());
CRV crv = rankings.get(id);
if (crv != null) {
sid.setField(WebgraphSchema.target_cr_host_norm_i.getSolrFieldName(), crv.crn);
} catch (IllegalArgumentException e) {
// write document back to index
try {
//segment.fulltext().getWebgraphConnector().deleteById((String) sid.getFieldValue(;
} catch (SolrException e) {
} catch (IOException e) {
// refresh the progress message whenever the processed count is a multiple of 1000
if (proccount.get() % 1000 == 0) {
postprocessingActivity = "writing cr values to webgraph for host " + hostfinal + "postprocessed " + proccount + " from " + count + " documents; " +
(proccount.get() * 1000 / (System.currentTimeMillis() - start)) + " docs/second; " +
((System.currentTimeMillis() - start) * (count - proccount.get()) / proccount.get() / 60000) + " minutes remaining";"CollectionConfiguration", postprocessingActivity);
} catch (Throwable e) {
// any failure on a single document moves on to the next document
continue processloop;
} catch (InterruptedException e) {
ConcurrentLog.warn("CollectionConfiguration", e.getMessage(), e);
// interrupt every worker that is still alive
for (int i = 0; i < t.length; i++) try {
if (t[i].isAlive()) t[i].interrupt();
} catch (InterruptedException e) {}
if (count != proccount.get()) ConcurrentLog.warn("CollectionConfiguration", "ambiguous webgraph document count for host " + host + ": expected=" + count + ", counted=" + proccount);
} catch (final IOException e2) {
ConcurrentLog.warn("CollectionConfiguration", e2.getMessage(), e2);
/**
 * Builds the map of citation-rank values (CRV) that is later used when documents are updated.
 *
 * When {@code shallComputeCR} is false the returned map is empty. Otherwise, for every host in
 * {@code collection1hosts} the visible code does two things:
 * 1. Canonical patch: documents of that host which carry a canonical_s value are fetched; for each
 *    such document B, the citation references to B are removed from {@code segment.urlCitation()}
 *    and re-added under the hash of the canonical URL C (only if the segment has a connected
 *    citation index).
 * 2. Rank computation: if the host's value in {@code collection1hosts} is positive, a CRHost is
 *    created and {@code convergenceStep()} is called until it returns true, at most 30 times; the
 *    result of {@code normalize()} is then added to the returned map.
 * Warnings are logged when MemoryControl reports short memory and when the counted documents or
 * hosts differ from the expected numbers.
 *
 * NOTE(review): this listing is a rendered diff. Closing braces, the logging call prefix, the
 * counter increments and any break statements after the short-memory warnings are not visible
 * here, so they are deliberately not described - confirm against the real file.
 *
 * @param segment             provides {@code connectedCitation()} and {@code urlCitation()}; also passed to CRHost
 * @param rrCache             passed through to the CRHost constructor
 * @param collectionConnector connector used for the count query and the document query
 * @param collection1hosts    hosts to process; iterated via {@code keyList(true)}
 * @param shallComputeCR      when false, no host is processed
 * @return a ConcurrentHashMap filled from {@code CRHost.normalize()}; keys are presumably document
 *         ids - verify against CRHost
 */
private Map<String, CRV> createRankingMap(final Segment segment, final ReferenceReportCache rrCache,
final SolrConnector collectionConnector, ReversibleScoreMap<String> collection1hosts,
boolean shallComputeCR) {
final Map<String, CRV> rankings = new ConcurrentHashMap<String, CRV>();
if (shallComputeCR) try {
// NOTE(review): concurrency is only reported in the activity message in the visible code
int concurrency = Math.min(collection1hosts.size(), Runtime.getRuntime().availableProcessors());
postprocessingActivity = "collecting cr for " + collection1hosts.size() + " hosts, concurrency = " + concurrency;"CollectionConfiguration", postprocessingActivity);
int countcheck = 0;
for (String host: collection1hosts.keyList(true)) {
// Patch the citation index for links with canonical tags.
// This shall fulfill the following requirement:
// If a document A links to B and B contains a 'canonical C', then the citation rank computation shall consider that A links to C and B does not link to C.
// To do so, we first must collect all canonical links, find all references to them, get the anchor list of the documents and patch the citation reference of these links
String patchquery = CollectionSchema.host_s.getSolrFieldName() + ":" + host + " AND " + CollectionSchema.canonical_s.getSolrFieldName() + AbstractSolrConnector.CATCHALL_DTERM;
long patchquerycount = collectionConnector.getCountByQuery("{!cache=false}" + patchquery);
BlockingQueue<SolrDocument> documents_with_canonical_tag = collectionConnector.concurrentDocumentsByQuery(patchquery, CollectionSchema.url_chars_i.getSolrFieldName() + " asc", 0, 100000000, Long.MAX_VALUE, 20, 1, true,, CollectionSchema.sku.getSolrFieldName(), CollectionSchema.canonical_s.getSolrFieldName());
SolrDocument doc_B;
int patchquerycountcheck = 0;
try {
// consume the queue until the poison marker is taken
while ((doc_B = documents_with_canonical_tag.take()) != AbstractSolrConnector.POISON_DOCUMENT) {
// find all documents which link to the canonical doc
DigestURL doc_C_url = new DigestURL((String) doc_B.getFieldValue(CollectionSchema.canonical_s.getSolrFieldName()));
byte[] doc_B_id = ASCII.getBytes(((String) doc_B.getFieldValue(;
// we remove all references to B, because these become references to C
if (segment.connectedCitation()) {
ReferenceContainer<CitationReference> doc_A_ids = segment.urlCitation().remove(doc_B_id);
if (doc_A_ids == null) {
//System.out.println("*** document with canonical but no referrer: " + doc_B.getFieldValue(CollectionSchema.sku.getSolrFieldName()));
continue; // the document has a canonical tag but no referrer?
Iterator<CitationReference> doc_A_ids_iterator = doc_A_ids.entries();
// for each of the referrer A of B, set A as a referrer of C
while (doc_A_ids_iterator.hasNext()) {
CitationReference doc_A_citation =;
segment.urlCitation().add(doc_C_url.hash(), doc_A_citation);
if (MemoryControl.shortStatus()) {
ConcurrentLog.warn("CollectionConfiguration", "terminated canonical collection during postprocessing because of short memory");
} catch (InterruptedException e) {
} catch (SpaceExceededException e) {
if (patchquerycount != patchquerycountcheck) ConcurrentLog.warn("CollectionConfiguration", "ambiguous patchquery count for host " + host + ": expected=" + patchquerycount + ", counted=" + patchquerycountcheck);
// do the citation rank computation
if (collection1hosts.get(host) <= 0) continue;
// select all documents for each host
CRHost crh = new CRHost(segment, rrCache, host, 0.85d, 6);
int convergence_attempts = 0;
// at most 30 convergence steps per host; stops earlier once convergenceStep() returns true
while (convergence_attempts++ < 30) {"CollectionConfiguration", "convergence step " + convergence_attempts + " for host " + host + " ...");
if (crh.convergenceStep()) break;
if (MemoryControl.shortStatus()) {
ConcurrentLog.warn("CollectionConfiguration", "terminated convergenceStep during postprocessing because of short memory");
}"CollectionConfiguration", "convergence for host " + host + " after " + convergence_attempts + " steps");
// we have now the cr for all documents of a specific host; we store them for later use
Map<String, CRV> crn = crh.normalize();
rankings.putAll(crn); // accumulate this here for usage in document update later
if (MemoryControl.shortStatus()) {
ConcurrentLog.warn("CollectionConfiguration", "terminated crn akkumulation during postprocessing because of short memory");
if (collection1hosts.size() != countcheck) ConcurrentLog.warn("CollectionConfiguration", "ambiguous host count: expected=" + collection1hosts.size() + ", counted=" + countcheck);
} catch (final IOException e2) {
// NOTE(review): this reassigns only the method parameter; the caller's map is not replaced
collection1hosts = new ClusteredScoreMap<String>(true);
return rankings;
public void postprocessing_http_unique(final Segment segment, final SolrDocument doc, final SolrInputDocument sid, final DigestURL url) {
