Postprocessing refactoring

Added Javadocs to refactored methods.
Added log warnings instead of silently swallowing some errors.
Only fill collection1hosts when required (shallComputeCR is true).
pull/71/head
luccioman 9 years ago
parent 42f45760ed
commit 8c49a755da
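
The second bullet of the commit message refers to catch blocks that used to swallow IOExceptions silently and now emit a warning through ConcurrentLog. A minimal sketch of the pattern, with riskyOperation() standing in as a placeholder for the Solr lookup done in the real code:

    try {
        riskyOperation(); // placeholder for the actual Solr document lookup
    } catch (final IOException e) {
        // keep the context string and append the exception message only when one is present
        ConcurrentLog.warn("CollectionConfiguration", "Failed to postprocess field"
                + (e.getMessage() != null ? " : " + e.getMessage() : "."));
    }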

@@ -1081,10 +1081,12 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
     }
     /**
-     * post-processing steps for all entries that have a process tag assigned
-     * @param connector
-     * @param urlCitation
-     * @return
+     * Performs post-processing steps for all entries that have a process tag assigned.
+     * @param segment Solr segment. Must not be null.
+     * @param rrCache reference report cache for the segment.
+     * @param harvestkey key from a harvest process, used to mark documents needing post-processing
+     * @param byPartialUpdate when true, perform partial updates on documents
+     * @return the number of post-processed documents
      */
     public int postprocessing(final Segment segment, final ReferenceReportCache rrCache, final String harvestkey, final boolean byPartialUpdate) {
         if (!this.contains(CollectionSchema.process_sxt)) return 0;
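
For orientation, a hypothetical caller of the refactored entry point; this is a sketch only, assuming the YaCy collaborators (Segment, ReferenceReportCache) are already available, with an illustrative harvest key and byPartialUpdate set to true:

    int runPostprocessing(final CollectionConfiguration config, final Segment segment,
            final ReferenceReportCache rrCache, final String harvestkey) {
        // byPartialUpdate = true requests partial (atomic) document updates instead of full rewrites
        final int processed = config.postprocessing(segment, rrCache, harvestkey, true);
        ConcurrentLog.info("CollectionConfiguration", "post-processed " + processed + " documents");
        return processed;
    }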
@@ -1110,6 +1112,17 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
             postprocessingWebgraphCount = -1;
         }
+        postprocessingActivity = "create ranking map";
+        ConcurrentLog.info("CollectionConfiguration", postprocessingActivity);
+        boolean shallComputeCR = (segment.fulltext().useWebgraph() &&
+                ((webgraph.contains(WebgraphSchema.source_id_s) && webgraph.contains(WebgraphSchema.source_cr_host_norm_i)) ||
+                 (webgraph.contains(WebgraphSchema.target_id_s) && webgraph.contains(WebgraphSchema.target_cr_host_norm_i))) ||
+                (collection.contains(CollectionSchema.cr_host_count_i) &&
+                 collection.contains(CollectionSchema.cr_host_chance_d) &&
+                 collection.contains(CollectionSchema.cr_host_norm_i)));
+        // create the ranking map
+        final Map<String, CRV> rankings;
+        if(shallComputeCR) {
             // collect hosts from index which shall take part in citation computation
             postprocessingActivity = "collecting host facets for collection";
             ConcurrentLog.info("CollectionConfiguration", postprocessingActivity);
@@ -1122,17 +1135,10 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
                 collection1hosts = new ClusteredScoreMap<String>(true);
             }
-        postprocessingActivity = "create ranking map";
-        ConcurrentLog.info("CollectionConfiguration", postprocessingActivity);
-        boolean shallComputeCR = (segment.fulltext().useWebgraph() &&
-                ((webgraph.contains(WebgraphSchema.source_id_s) && webgraph.contains(WebgraphSchema.source_cr_host_norm_i)) ||
-                 (webgraph.contains(WebgraphSchema.target_id_s) && webgraph.contains(WebgraphSchema.target_cr_host_norm_i))) ||
-                (collection.contains(CollectionSchema.cr_host_count_i) &&
-                 collection.contains(CollectionSchema.cr_host_chance_d) &&
-                 collection.contains(CollectionSchema.cr_host_norm_i)));
-        // create the ranking map
-        final Map<String, CRV> rankings = createRankingMap(segment, rrCache, collectionConnector, collection1hosts,
-                shallComputeCR);
+            rankings = createRankingMap(segment, rrCache, collectionConnector, collection1hosts);
+        } else {
+            rankings = new ConcurrentHashMap<String, CRV>();
+        }
         // process all documents at the webgraph for the outgoing links of this document
         final AtomicInteger allcount = new AtomicInteger(0);
@@ -1153,6 +1159,18 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
         return allcount.get();
     }
+    /**
+     * Performs post-processing steps on the main documents collection.
+     * @param segment Solr segment.
+     * @param rrCache reference report cache for the segment.
+     * @param harvestkey key from a harvest process, used to mark documents needing post-processing
+     * @param byPartialUpdate when true, perform partial updates on documents
+     * @param collectionConnector connector to the main Solr collection
+     * @param collection schema configuration for the collection
+     * @param collection1query query used to harvest items to postprocess in the main collection
+     * @param rankings postprocessed rankings
+     * @param allcount global postprocessed documents count
+     */
     private void postprocessDocuments(final Segment segment, final ReferenceReportCache rrCache,
             final String harvestkey, final boolean byPartialUpdate, final SolrConnector collectionConnector,
             final CollectionConfiguration collection, final String collection1query, final Map<String, CRV> rankings,
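
The byPartialUpdate flag maps to Solr's atomic update mechanism: only the listed fields of a stored document are changed. A hedged SolrJ illustration (the document id, field names and client are placeholders, not necessarily the exact fields this method touches):

    final SolrInputDocument partialUpdate = new SolrInputDocument();
    partialUpdate.setField("id", documentId);                                       // document to patch
    partialUpdate.setField("cr_host_norm_i", Collections.singletonMap("set", 7));   // "set" replaces just this field
    partialUpdate.setField("process_sxt", Collections.singletonMap("set", null));   // setting null removes the field
    solrClient.add(partialUpdate);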
@@ -1344,6 +1362,14 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
         collectionConnector.commit(true); // make changes available directly to prevent that the process repeats again
     }
+    /**
+     * Performs post-processing steps on the webgraph core.
+     * @param segment Solr segment.
+     * @param webgraph webgraph schema configuration
+     * @param webgraphquery query used to harvest items to postprocess in the webgraph collection
+     * @param rankings postprocessed rankings
+     * @param allcount global postprocessed documents count
+     */
     private void postprocessWebgraph(final Segment segment, final WebgraphConfiguration webgraph, String webgraphquery,
             final Map<String, CRV> rankings, final AtomicInteger allcount) {
         postprocessingActivity = "collecting host facets for webgraph cr calculation";
@@ -1461,11 +1487,18 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
         }
     }
+    /**
+     * Patches the citation index for links with canonical tags and performs the citation rank computation.
+     * @param segment Solr segment
+     * @param rrCache reference report cache for the segment
+     * @param collectionConnector default connector to the Solr segment
+     * @param collection1hosts hosts from index which shall take part in citation computation
+     * @return the ranking map
+     */
     private Map<String, CRV> createRankingMap(final Segment segment, final ReferenceReportCache rrCache,
-            final SolrConnector collectionConnector, ReversibleScoreMap<String> collection1hosts,
-            boolean shallComputeCR) {
+            final SolrConnector collectionConnector, ReversibleScoreMap<String> collection1hosts) {
         final Map<String, CRV> rankings = new ConcurrentHashMap<String, CRV>();
-        if (shallComputeCR) try {
+        try {
             int concurrency = Math.min(collection1hosts.size(), Runtime.getRuntime().availableProcessors());
             postprocessingActivity = "collecting cr for " + collection1hosts.size() + " hosts, concurrency = " + concurrency;
             ConcurrentLog.info("CollectionConfiguration", postprocessingActivity);
@@ -1545,6 +1578,14 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
         return rankings;
     }
+    /**
+     * Searches the segment for any document with the same URL as doc but the opposite secure/unsecure protocol (https vs. http),
+     * then updates the document's http_unique_b field accordingly.
+     * @param segment Solr segment
+     * @param doc document to process
+     * @param sid updatable version of the document
+     * @param url document's url
+     */
     public void postprocessing_http_unique(final Segment segment, final SolrDocument doc, final SolrInputDocument sid, final DigestURL url) {
         if (!this.contains(CollectionSchema.http_unique_b)) return;
         if (!url.isHTTPS() && !url.isHTTP()) return;
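
The implementation in the next hunk builds the counterpart URL by swapping the scheme while keeping the url stub. A condensed illustration with an example URL (DigestURL's String constructor may throw MalformedURLException):

    DigestURL protocolCounterpart() throws MalformedURLException {
        final DigestURL url = new DigestURL("https://example.org/page.html"); // example value
        // same document, opposite protocol: https -> http (and vice versa)
        return new DigestURL((url.isHTTP() ? "https://" : "http://") + url.urlstub(true, true));
    }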
@@ -1552,9 +1593,19 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
             DigestURL u = new DigestURL((url.isHTTP() ? "https://" : "http://") + url.urlstub(true, true));
             SolrDocument d = segment.fulltext().getDefaultConnector().getDocumentById(ASCII.String(u.hash()), CollectionSchema.http_unique_b.getSolrFieldName());
             set_unique_flag(CollectionSchema.http_unique_b, doc, sid, d);
-        } catch (final IOException e) {}
+        } catch (final IOException e) {
+            ConcurrentLog.warn("CollectionConfiguration", "Failed to postProcess http_unique_b field" + (e.getMessage() != null ? " : " + e.getMessage() : "."));
+        }
     }
+    /**
+     * Searches the segment for any document with the same URL as doc but with or without the www prefix,
+     * then updates the document's www_unique_b field accordingly.
+     * @param segment Solr segment
+     * @param doc document to process
+     * @param sid updatable version of the document
+     * @param url document's url
+     */
     public void postprocessing_www_unique(final Segment segment, final SolrDocument doc, final SolrInputDocument sid, final DigestURL url) {
         if (!this.contains(CollectionSchema.www_unique_b)) return;
         final String us = url.urlstub(true, true);
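
The www_unique_b counterpart is built the same way, except that the "www." prefix of the url stub is toggled instead of the scheme; in plain strings:

    final String stub = "www.example.org/page.html"; // url.urlstub(true, true), example value
    final String counterpartStub = stub.startsWith("www.") ? stub.substring(4) : "www." + stub;
    // -> "example.org/page.html"; the protocol is prepended again and the counterpart looked up by hash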
@@ -1562,7 +1613,9 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
             DigestURL u = new DigestURL(url.getProtocol() + (us.startsWith("www.") ? "://" + us.substring(4) : "://www." + us));
             SolrDocument d = segment.fulltext().getDefaultConnector().getDocumentById(ASCII.String(u.hash()), CollectionSchema.www_unique_b.getSolrFieldName());
             set_unique_flag(CollectionSchema.www_unique_b, doc, sid, d);
-        } catch (final IOException e) {}
+        } catch (final IOException e) {
+            ConcurrentLog.warn("CollectionConfiguration", "Failed to postProcess www_unique_b field" + (e.getMessage() != null ? " : " + e.getMessage() : "."));
+        }
     }
     private void set_unique_flag(CollectionSchema field, final SolrDocument doc, final SolrInputDocument sid, final SolrDocument d) {
