added concurrency to postprocess rewrite process

Michael Peter Christen 11 years ago
parent a1e8bdd5e9
commit 191ec8c82a
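In short: the Solr document rewrite loop in postprocessing no longer runs single-threaded. One rewrite thread per available processor core now drains the shared BlockingQueue, and the trailing concurrency argument of concurrentDocumentsByQuery changes from 1 to concurrency, which presumably makes the connector enqueue one poison document per consumer so that every worker terminates cleanly. Below is a minimal, self-contained sketch of that poison-pill consumer pattern; all names in it are illustrative stand-ins, not YaCy's actual classes.

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.atomic.AtomicInteger;

    public class PoisonPillSketch {

        // sentinel with unique object identity, in the spirit of
        // AbstractSolrConnector.POISON_DOCUMENT; consumers compare with != (identity)
        private static final String POISON = new String("POISON");

        public static void main(String[] args) throws InterruptedException {
            final int concurrency = Runtime.getRuntime().availableProcessors();
            final BlockingQueue<String> docs = new ArrayBlockingQueue<String>(100);
            final AtomicInteger proccount = new AtomicInteger();

            // one consumer thread per core, as in the commit
            final Thread[] rewriteThread = new Thread[concurrency];
            for (int t = 0; t < concurrency; t++) {
                rewriteThread[t] = new Thread() {
                    @Override
                    public void run() {
                        try {
                            String doc;
                            // each consumer swallows exactly one poison element, then exits
                            while ((doc = docs.take()) != POISON) {
                                proccount.incrementAndGet(); // thread-safe shared counter
                            }
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                };
                rewriteThread[t].start();
            }

            // producer: real work items, then one poison element per consumer
            for (int i = 0; i < 1000; i++) docs.put("doc-" + i);
            for (int t = 0; t < concurrency; t++) docs.put(POISON);

            // wait for termination
            for (Thread w : rewriteThread) w.join();
            System.out.println("processed " + proccount.get() + " documents");
        }
    }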

@@ -1227,105 +1227,125 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
         }
         // process all documents in collection
-        Map<String, Long> hostExtentCache = new HashMap<String, Long>(); // a mapping from the host id to the number of documents which contain this host-id
-        Set<String> uniqueURLs = new HashSet<String>();
+        final Map<String, Long> hostExtentCache = new HashMap<String, Long>(); // a mapping from the host id to the number of documents which contain this host-id
+        final Set<String> uniqueURLs = new HashSet<String>();
         try {
-            Set<String> omitFields = new HashSet<String>();
+            final Set<String> omitFields = new HashSet<String>();
             omitFields.add(CollectionSchema.process_sxt.getSolrFieldName());
             omitFields.add(CollectionSchema.harvestkey_s.getSolrFieldName());
-            int proccount = 0, proccount_referencechange = 0, proccount_citationchange = 0;
-            long count = collectionConnector.getCountByQuery(collection1query);
-            long start = System.currentTimeMillis();
+            final long count = collectionConnector.getCountByQuery(collection1query);
+            final long start = System.currentTimeMillis();
+            final int concurrency = Runtime.getRuntime().availableProcessors();
+            final boolean reference_computation = this.contains(CollectionSchema.references_i) &&
+                    this.contains(CollectionSchema.references_internal_i) &&
+                    this.contains(CollectionSchema.references_external_i) &&
+                    this.contains(CollectionSchema.references_exthosts_i);
             postprocessingActivity = "collecting " + count + " documents from the collection for harvestkey " + harvestkey;
             ConcurrentLog.info("CollectionConfiguration", postprocessingActivity);
-            BlockingQueue<SolrDocument> docs = collectionConnector.concurrentDocumentsByQuery(
+            final BlockingQueue<SolrDocument> docs = collectionConnector.concurrentDocumentsByQuery(
                     collection1query,
                     (this.contains(CollectionSchema.http_unique_b) || this.contains(CollectionSchema.www_unique_b)) ?
                     CollectionSchema.host_subdomain_s.getSolrFieldName() + " asc," + // sort on subdomain to get hosts without subdomain first; that gives an opportunity to set www_unique_b flag to false
                    CollectionSchema.url_protocol_s.getSolrFieldName() + " asc" // sort on protocol to get http before https; that gives an opportunity to set http_unique_b flag to false
                     : null, // null sort is faster!
-                    0, 100000000, Long.MAX_VALUE, 200, 1);
-            int countcheck = 0;
-            Collection<String> failids = new ArrayList<String>();
-            SolrDocument doc;
-            while ((doc = docs.take()) != AbstractSolrConnector.POISON_DOCUMENT) {
-                // for each to-be-processed entry work on the process tag
-                Collection<Object> proctags = doc.getFieldValues(CollectionSchema.process_sxt.getSolrFieldName());
-                final String u = (String) doc.getFieldValue(CollectionSchema.sku.getSolrFieldName());
-                final String i = (String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName());
-                try {
-                    DigestURL url = new DigestURL(u, ASCII.getBytes(i));
-                    byte[] id = url.hash();
-                    SolrInputDocument sid = collection.toSolrInputDocument(doc, omitFields);
-                    for (Object tag: proctags) try {
-                        // switch over tag types
-                        ProcessType tagtype = ProcessType.valueOf((String) tag);
-                        if (tagtype == ProcessType.CITATION &&
-                            collection.contains(CollectionSchema.cr_host_count_i) &&
-                            collection.contains(CollectionSchema.cr_host_chance_d) &&
-                            collection.contains(CollectionSchema.cr_host_norm_i)) {
-                            CRV crv = rankings.remove(ASCII.String(id)); // instead of 'get'ting the CRV, we also remove it because we will not need it again and free some memory here
-                            if (crv != null) {
-                                sid.setField(CollectionSchema.cr_host_count_i.getSolrFieldName(), crv.count);
-                                sid.setField(CollectionSchema.cr_host_chance_d.getSolrFieldName(), crv.cr);
-                                sid.setField(CollectionSchema.cr_host_norm_i.getSolrFieldName(), crv.crn);
-                                proccount_citationchange++;
+                    0, 100000000, Long.MAX_VALUE, 100, concurrency);
+            final AtomicInteger proccount = new AtomicInteger();
+            final AtomicInteger proccount_referencechange = new AtomicInteger();
+            final AtomicInteger proccount_citationchange = new AtomicInteger();
+            final AtomicInteger countcheck = new AtomicInteger(0);
+            final Collection<String> failids = new ArrayList<String>();
+            final Thread rewriteThread[] = new Thread[concurrency];
+            for (int rewrite_start = 0; rewrite_start < concurrency; rewrite_start++) {
+                rewriteThread[rewrite_start] = new Thread() {
+                    @Override
+                    public void run() {
+                        SolrDocument doc;
+                        try {
+                            while ((doc = docs.take()) != AbstractSolrConnector.POISON_DOCUMENT) {
+                                // for each to-be-processed entry work on the process tag
+                                Collection<Object> proctags = doc.getFieldValues(CollectionSchema.process_sxt.getSolrFieldName());
+                                final String u = (String) doc.getFieldValue(CollectionSchema.sku.getSolrFieldName());
+                                final String i = (String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName());
+                                try {
+                                    DigestURL url = new DigestURL(u, ASCII.getBytes(i));
+                                    byte[] id = url.hash();
+                                    SolrInputDocument sid = collection.toSolrInputDocument(doc, omitFields);
+                                    for (Object tag: proctags) try {
+                                        // switch over tag types
+                                        ProcessType tagtype = ProcessType.valueOf((String) tag);
+                                        if (tagtype == ProcessType.CITATION &&
+                                            collection.contains(CollectionSchema.cr_host_count_i) &&
+                                            collection.contains(CollectionSchema.cr_host_chance_d) &&
+                                            collection.contains(CollectionSchema.cr_host_norm_i)) {
+                                            CRV crv = rankings.remove(ASCII.String(id)); // instead of 'get'ting the CRV, we also remove it because we will not need it again and free some memory here
+                                            if (crv != null) {
+                                                sid.setField(CollectionSchema.cr_host_count_i.getSolrFieldName(), crv.count);
+                                                sid.setField(CollectionSchema.cr_host_chance_d.getSolrFieldName(), crv.cr);
+                                                sid.setField(CollectionSchema.cr_host_norm_i.getSolrFieldName(), crv.crn);
+                                                proccount_citationchange.incrementAndGet();
+                                            }
+                                        }
+                                        if (tagtype == ProcessType.UNIQUE) {
+                                            postprocessing_http_unique(segment, sid, url);
+                                            postprocessing_www_unique(segment, sid, url);
+                                            postprocessing_doublecontent(segment, uniqueURLs, sid, url);
+                                        }
+                                    } catch (IllegalArgumentException e) {}
+                                    // compute references
+                                    if (reference_computation) {
+                                        String hosthash = url.hosthash();
+                                        if (!hostExtentCache.containsKey(hosthash)) {
+                                            StringBuilder q = new StringBuilder();
+                                            q.append(CollectionSchema.host_id_s.getSolrFieldName()).append(":\"").append(hosthash).append("\" AND ").append(CollectionSchema.httpstatus_i.getSolrFieldName()).append(":200");
+                                            long hostExtentCount = segment.fulltext().getDefaultConnector().getCountByQuery(q.toString());
+                                            hostExtentCache.put(hosthash, hostExtentCount);
+                                        }
+                                        if (postprocessing_references(rrCache, sid, url, hostExtentCache)) proccount_referencechange.incrementAndGet();
+                                    }
+                                    // all processing steps checked, remove the processing and harvesting key
+                                    sid.removeField(CollectionSchema.process_sxt.getSolrFieldName());
+                                    sid.removeField(CollectionSchema.harvestkey_s.getSolrFieldName());
+                                    // send back to index
+                                    //collectionConnector.deleteById(i);
+                                    collectionConnector.add(sid);
+                                    int thiscount = proccount.incrementAndGet(); allcount.incrementAndGet();
+                                    if (thiscount % 100 == 0) {
+                                        postprocessingActivity = "postprocessed " + thiscount + " from " + count + " collection documents; " +
+                                                (thiscount * 60000 / (System.currentTimeMillis() - start)) + " ppm; " +
+                                                ((System.currentTimeMillis() - start) * (count - thiscount) / thiscount / 60000) + " minutes remaining";
+                                        ConcurrentLog.info("CollectionConfiguration", postprocessingActivity);
+                                    }
+                                } catch (final Throwable e1) {
+                                    ConcurrentLog.logException(e1);
+                                    failids.add(i);
+                                }
+                                countcheck.incrementAndGet();
+                            }
+                        } catch (InterruptedException e) {
+                            ConcurrentLog.logException(e);
+                        }
+                    }
-                            }
-                        }
-                        if (tagtype == ProcessType.UNIQUE) {
-                            postprocessing_http_unique(segment, sid, url);
-                            postprocessing_www_unique(segment, sid, url);
-                            postprocessing_doublecontent(segment, uniqueURLs, sid, url);
-                        }
-                    } catch (IllegalArgumentException e) {}
-                    // refresh the link count; it's 'cheap' to do this here
-                    String hosthash = url.hosthash();
-                    if (!hostExtentCache.containsKey(hosthash)) {
-                        StringBuilder q = new StringBuilder();
-                        q.append(CollectionSchema.host_id_s.getSolrFieldName()).append(":\"").append(hosthash).append("\" AND ").append(CollectionSchema.httpstatus_i.getSolrFieldName()).append(":200");
-                        long hostExtentCount = segment.fulltext().getDefaultConnector().getCountByQuery(q.toString());
-                        hostExtentCache.put(hosthash, hostExtentCount);
-                    }
-                    if (this.contains(CollectionSchema.references_i) &&
-                        this.contains(CollectionSchema.references_internal_i) &&
-                        this.contains(CollectionSchema.references_external_i) &&
-                        this.contains(CollectionSchema.references_exthosts_i)) {
-                        if (postprocessing_references(rrCache, sid, url, hostExtentCache)) proccount_referencechange++;
-                    }
-                    // all processing steps checked, remove the processing and harvesting key
-                    sid.removeField(CollectionSchema.process_sxt.getSolrFieldName());
-                    sid.removeField(CollectionSchema.harvestkey_s.getSolrFieldName());
-                    // send back to index
-                    //collectionConnector.deleteById(i);
-                    collectionConnector.add(sid);
-                    proccount++; allcount.incrementAndGet();
-                    if (proccount % 100 == 0) {
-                        postprocessingActivity = "postprocessed " + proccount + " from " + count + " collection documents; " +
-                                (proccount * 60000 / (System.currentTimeMillis() - start)) + " ppm; " +
-                                ((System.currentTimeMillis() - start) * (count - proccount) / proccount / 60000) + " minutes remaining";
-                        ConcurrentLog.info("CollectionConfiguration", postprocessingActivity);
-                    }
-                } catch (final Throwable e1) {
-                    ConcurrentLog.logException(e1);
-                    failids.add(i);
-                }
-                countcheck++;
-            }
+                };
+                rewriteThread[rewrite_start].start();
+            }
+            // wait for termination
+            for (int rewrite_start = 0; rewrite_start < concurrency; rewrite_start++) rewriteThread[rewrite_start].join();
             if (failids.size() > 0) {
                 ConcurrentLog.info("CollectionConfiguration", "cleanup_processing: deleting " + failids.size() + " documents which have permanent execution fails");
                 collectionConnector.deleteByIds(failids);
             }
-            if (count != countcheck) ConcurrentLog.warn("CollectionConfiguration", "ambiguous collection document count for harvestkey " + harvestkey + ": expected=" + count + ", counted=" + countcheck); // big gap for harvestkey = null
-            ConcurrentLog.info("CollectionConfiguration", "cleanup_processing: re-calculated " + proccount+ " new documents, " +
+            if (count != countcheck.get()) ConcurrentLog.warn("CollectionConfiguration", "ambiguous collection document count for harvestkey " + harvestkey + ": expected=" + count + ", counted=" + countcheck); // big gap for harvestkey = null
+            ConcurrentLog.info("CollectionConfiguration", "cleanup_processing: re-calculated " + proccount + " new documents, " +
                     proccount_referencechange + " reference-count changes, " +
                     proccount_citationchange + " citation ranking changes.");
        } catch (final InterruptedException e2) {
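A note on the other mechanical changes in this diff: locals captured by the anonymous Thread classes must be (effectively) final in Java, which is why hostExtentCache, uniqueURLs, omitFields, count, start, docs and failids all gain the final keyword, and the counters turn into AtomicInteger because a plain int++ executed from several threads is a non-atomic read-modify-write that silently loses updates. A small stand-alone demonstration of that lost-update effect (illustrative code, not part of the commit):

    import java.util.concurrent.atomic.AtomicInteger;

    public class CounterRaceDemo {

        static int plain = 0;                                     // racy: ++ is read-modify-write
        static final AtomicInteger atomic = new AtomicInteger();  // safe: atomic increment

        public static void main(String[] args) throws InterruptedException {
            Thread[] threads = new Thread[4];
            for (int t = 0; t < threads.length; t++) {
                threads[t] = new Thread(() -> {
                    for (int i = 0; i < 100000; i++) {
                        plain++;                  // increments can be lost under contention
                        atomic.incrementAndGet(); // never loses an increment
                    }
                });
                threads[t].start();
            }
            for (Thread th : threads) th.join();
            // typical output: plain well below 400000, atomic exactly 400000
            System.out.println("plain=" + plain + " atomic=" + atomic.get());
        }
    }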
