make sure that the postprocessing background thread never dies by any

exception
pull/1/head
orbiter 11 years ago
parent 29ccbf6491
commit d68438c3d9

@ -1163,52 +1163,59 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
Thread.currentThread().setName(name);
SolrDocument doc; String id;
try {
while ((doc = docs.take()) != AbstractSolrConnector.POISON_DOCUMENT) {
SolrInputDocument sid = webgraph.toSolrInputDocument(doc, omitFields);
Collection<Object> proctags = doc.getFieldValues(WebgraphSchema.process_sxt.getSolrFieldName());
for (Object tag: proctags) try {
// switch over tag types
ProcessType tagtype = ProcessType.valueOf((String) tag);
processloop: while ((doc = docs.take()) != AbstractSolrConnector.POISON_DOCUMENT) {
try {
SolrInputDocument sid = webgraph.toSolrInputDocument(doc, omitFields);
Collection<Object> proctags = doc.getFieldValues(WebgraphSchema.process_sxt.getSolrFieldName());
for (Object tag: proctags) try {
// set cr values
if (tagtype == ProcessType.CITATION) {
if (segment.fulltext().useWebgraph() && webgraph.contains(WebgraphSchema.source_id_s) && webgraph.contains(WebgraphSchema.source_cr_host_norm_i)) {
id = (String) doc.getFieldValue(WebgraphSchema.source_id_s.getSolrFieldName());
CRV crv = rankings.get(id);
if (crv != null) {
sid.setField(WebgraphSchema.source_cr_host_norm_i.getSolrFieldName(), crv.crn);
// switch over tag types
ProcessType tagtype = ProcessType.valueOf((String) tag);
// set cr values
if (tagtype == ProcessType.CITATION) {
if (segment.fulltext().useWebgraph() && webgraph.contains(WebgraphSchema.source_id_s) && webgraph.contains(WebgraphSchema.source_cr_host_norm_i)) {
id = (String) doc.getFieldValue(WebgraphSchema.source_id_s.getSolrFieldName());
CRV crv = rankings.get(id);
if (crv != null) {
sid.setField(WebgraphSchema.source_cr_host_norm_i.getSolrFieldName(), crv.crn);
}
}
}
if (webgraph.contains(WebgraphSchema.target_id_s) && webgraph.contains(WebgraphSchema.target_cr_host_norm_i)) {
id = (String) doc.getFieldValue(WebgraphSchema.target_id_s.getSolrFieldName());
CRV crv = rankings.get(id);
if (crv != null) {
sid.setField(WebgraphSchema.target_cr_host_norm_i.getSolrFieldName(), crv.crn);
if (webgraph.contains(WebgraphSchema.target_id_s) && webgraph.contains(WebgraphSchema.target_cr_host_norm_i)) {
id = (String) doc.getFieldValue(WebgraphSchema.target_id_s.getSolrFieldName());
CRV crv = rankings.get(id);
if (crv != null) {
sid.setField(WebgraphSchema.target_cr_host_norm_i.getSolrFieldName(), crv.crn);
}
}
}
} catch (IllegalArgumentException e) {
ConcurrentLog.logException(e);
}
} catch (IllegalArgumentException e) {}
// write document back to index
try {
sid.removeField(WebgraphSchema.process_sxt.getSolrFieldName());
sid.removeField(WebgraphSchema.harvestkey_s.getSolrFieldName());
//segment.fulltext().getWebgraphConnector().deleteById((String) sid.getFieldValue(WebgraphSchema.id.getSolrFieldName()));
segment.fulltext().getWebgraphConnector().add(sid);
} catch (SolrException e) {
ConcurrentLog.logException(e);
} catch (IOException e) {
// write document back to index
try {
sid.removeField(WebgraphSchema.process_sxt.getSolrFieldName());
sid.removeField(WebgraphSchema.harvestkey_s.getSolrFieldName());
//segment.fulltext().getWebgraphConnector().deleteById((String) sid.getFieldValue(WebgraphSchema.id.getSolrFieldName()));
segment.fulltext().getWebgraphConnector().add(sid);
} catch (SolrException e) {
ConcurrentLog.logException(e);
} catch (IOException e) {
ConcurrentLog.logException(e);
}
proccount.incrementAndGet();
allcount.incrementAndGet();
if (proccount.get() % 1000 == 0) {
postprocessingActivity = "writing cr values to webgraph for host " + hostfinal + "postprocessed " + proccount + " from " + count + " documents; " +
(proccount.get() * 1000 / (System.currentTimeMillis() - start)) + " docs/second; " +
((System.currentTimeMillis() - start) * (count - proccount.get()) / proccount.get() / 60000) + " minutes remaining";
ConcurrentLog.info("CollectionConfiguration", postprocessingActivity);
}
} catch (Throwable e) {
ConcurrentLog.logException(e);
}
proccount.incrementAndGet();
allcount.incrementAndGet();
if (proccount.get() % 1000 == 0) {
postprocessingActivity = "writing cr values to webgraph for host " + hostfinal + "postprocessed " + proccount + " from " + count + " documents; " +
(proccount.get() * 1000 / (System.currentTimeMillis() - start)) + " docs/second; " +
((System.currentTimeMillis() - start) * (count - proccount.get()) / proccount.get() / 60000) + " minutes remaining";
ConcurrentLog.info("CollectionConfiguration", postprocessingActivity);
continue processloop;
}
}
} catch (InterruptedException e) {
@ -1218,7 +1225,10 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
};
t[i.get()].start();
}
for (int i = 0; i < t.length; i++) try {t[i].join();} catch (InterruptedException e) {}
for (int i = 0; i < t.length; i++) try {
t[i].join(10000);
if (t[i].isAlive()) t[i].interrupt();
} catch (InterruptedException e) {}
if (count != proccount.get()) ConcurrentLog.warn("CollectionConfiguration", "ambiguous webgraph document count for host " + host + ": expected=" + count + ", counted=" + proccount);
}

Loading…
Cancel
Save