fixes and enhancements for postprocessing

pull/1/head
Michael Peter Christen 11 years ago
parent 7c1b968378
commit d325cb8912

@ -2295,7 +2295,7 @@ public final class Switchboard extends serverSwitch {
Fulltext fulltext = index.fulltext();
CollectionConfiguration collection1Configuration = fulltext.getDefaultConfiguration();
WebgraphConfiguration webgraphConfiguration = fulltext.getWebgraphConfiguration();
if (!this.crawlJobIsPaused(SwitchboardConstants.CRAWLJOB_LOCAL_CRAWL) && MemoryControl.available() > 512L * 1024L * 1024L && Memory.load() < 2.0f) {
if (!this.crawlJobIsPaused(SwitchboardConstants.CRAWLJOB_LOCAL_CRAWL) && MemoryControl.available() > 512L * 1024L * 1024L && Memory.load() < 2.5f) {
// we optimize first because that is useful for postprocessing
int proccount = 0;

@ -936,7 +936,7 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
}
// create the ranking map
Map<byte[], CRV> ranking = null;
Map<String, CRV> rankings = null;
if ((segment.fulltext().useWebgraph() &&
((webgraph.contains(WebgraphSchema.source_id_s) && webgraph.contains(WebgraphSchema.source_cr_host_norm_i)) ||
(webgraph.contains(WebgraphSchema.target_id_s) && webgraph.contains(WebgraphSchema.target_cr_host_norm_i))) ||
@ -944,7 +944,7 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
collection.contains(CollectionSchema.cr_host_chance_d) &&
collection.contains(CollectionSchema.cr_host_norm_i)))) try {
ConcurrentLog.info("CollectionConfiguration", "collecting " + hostscore.size() + " hosts");
ranking = new TreeMap<byte[], CRV>(Base64Order.enhancedCoder);
rankings = new HashMap<String, CRV>();
int countcheck = 0;
for (String host: hostscore.keyList(true)) {
// Patch the citation index for links with canonical tags.
@ -1004,9 +1004,9 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
}
ConcurrentLog.info("CollectionConfiguration", "convergence for host " + host + " after " + convergence_attempts + " steps");
// we have now the cr for all documents of a specific host; we store them for later use
Map<byte[], CRV> crn = crh.normalize();
Map<String, CRV> crn = crh.normalize();
//crh.log(crn);
ranking.putAll(crn); // accumulate this here for usage in document update later
rankings.putAll(crn); // accumulate this here for usage in document update later
if (MemoryControl.shortStatus()) {
ConcurrentLog.warn("CollectionConfiguration", "terminated crn akkumulation during postprocessing because of short memory");
break;
@ -1032,7 +1032,7 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
for (String host: hostscore.keyList(true)) {
if (hostscore.get(host) <= 0) continue;
// select all webgraph edges and modify their cr value
query = "{!raw f=" + WebgraphSchema.source_host_s.getSolrFieldName() + "}" + host;
query = WebgraphSchema.source_host_s.getSolrFieldName() + ":\"" + host + "\" AND " + WebgraphSchema.process_sxt.getSolrFieldName() + AbstractSolrConnector.CATCHALL_DTERM;
long count = segment.fulltext().getWebgraphConnector().getCountByQuery(query);
ConcurrentLog.info("CollectionConfiguration", "collecting " + count + " documents from the webgraph");
BlockingQueue<SolrDocument> docs = segment.fulltext().getWebgraphConnector().concurrentDocumentsByQuery(query, 0, 10000000, 1800000, 100);
@ -1040,15 +1040,15 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
while ((doc = docs.take()) != AbstractSolrConnector.POISON_DOCUMENT) {
SolrInputDocument sid = webgraph.toSolrInputDocument(doc, omitFields);
if (webgraph.contains(WebgraphSchema.source_id_s) && webgraph.contains(WebgraphSchema.source_cr_host_norm_i)) {
byte[] id = ASCII.getBytes((String) doc.getFieldValue(WebgraphSchema.source_id_s.getSolrFieldName()));
CRV crv = ranking.get(id);
String id = (String) doc.getFieldValue(WebgraphSchema.source_id_s.getSolrFieldName());
CRV crv = rankings.get(id);
if (crv != null) {
sid.setField(WebgraphSchema.source_cr_host_norm_i.getSolrFieldName(), crv.crn);
}
}
if (webgraph.contains(WebgraphSchema.target_id_s) && webgraph.contains(WebgraphSchema.target_cr_host_norm_i)) {
byte[] id = ASCII.getBytes((String) doc.getFieldValue(WebgraphSchema.target_id_s.getSolrFieldName()));
CRV crv = ranking.get(id);
String id = (String) doc.getFieldValue(WebgraphSchema.target_id_s.getSolrFieldName());
CRV crv = rankings.get(id);
if (crv != null) {
sid.setField(WebgraphSchema.target_cr_host_norm_i.getSolrFieldName(), crv.crn);
}
@ -1118,7 +1118,7 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
collection.contains(CollectionSchema.cr_host_count_i) &&
collection.contains(CollectionSchema.cr_host_chance_d) &&
collection.contains(CollectionSchema.cr_host_norm_i)) {
CRV crv = ranking.get(id);
CRV crv = rankings.get(ASCII.String(id));
if (crv != null) {
sid.setField(CollectionSchema.cr_host_count_i.getSolrFieldName(), crv.count);
sid.setField(CollectionSchema.cr_host_chance_d.getSolrFieldName(), crv.cr);
@ -1199,7 +1199,7 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
*/
private static final class CRHost {
private final Segment segment;
private final Map<byte[], double[]> crt;
private final Map<String, double[]> crt;
private final int cr_host_count;
private final RowHandleMap internal_links_counter;
private double damping;
@ -1211,13 +1211,13 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
this.rrCache = rrCache;
this.converge_eq_factor = (int) Math.pow(10.0d, converge_digits);
SolrConnector connector = segment.fulltext().getDefaultConnector();
this.crt = new TreeMap<byte[], double[]>(Base64Order.enhancedCoder);
this.crt = new HashMap<String, double[]>();
try {
// select all documents for each host
BlockingQueue<String> ids = connector.concurrentIDsByQuery("{!raw f=" + CollectionSchema.host_s.getSolrFieldName() + "}" + host, 0, 10000000, 600000);
String id;
while ((id = ids.take()) != AbstractSolrConnector.POISON_ID) {
this.crt.put(ASCII.getBytes(id), new double[]{0.0d,0.0d}); //{old value, new value}
this.crt.put(id, new double[]{0.0d,0.0d}); //{old value, new value}
if (MemoryControl.shortStatus()) {
ConcurrentLog.warn("CollectionConfiguration", "terminated CRHost collection during postprocessing because of short memory");
break;
@ -1227,24 +1227,24 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
}
this.cr_host_count = this.crt.size();
double initval = 1.0d / cr_host_count;
for (Map.Entry<byte[], double[]> entry: this.crt.entrySet()) entry.getValue()[0] = initval;
for (Map.Entry<String, double[]> entry: this.crt.entrySet()) entry.getValue()[0] = initval;
this.internal_links_counter = new RowHandleMap(12, Base64Order.enhancedCoder, 8, 100, "internal_links_counter");
}
/**
* produce a map from IDs to CRV records, normalization entries containing the values that are stored to solr.
* @return
*/
public Map<byte[], CRV> normalize() {
public Map<String, CRV> normalize() {
final TreeMap<Double, List<byte[]>> reorder = new TreeMap<Double, List<byte[]>>();
for (Map.Entry<byte[], double[]> entry: this.crt.entrySet()) {
for (Map.Entry<String, double[]> entry: this.crt.entrySet()) {
Double d = entry.getValue()[0];
List<byte[]> ds = reorder.get(d);
if (ds == null) {ds = new ArrayList<byte[]>(); reorder.put(d, ds);}
ds.add(entry.getKey());
ds.add(ASCII.getBytes(entry.getKey()));
}
int nextcount = (this.cr_host_count + 1) / 2;
int nextcrn = 0;
Map<byte[], CRV> r = new TreeMap<byte[], CRV>(Base64Order.enhancedCoder);
Map<String, CRV> r = new HashMap<String, CRV>();
while (reorder.size() > 0) {
int count = nextcount;
while (reorder.size() > 0 && count > 0) {
@ -1252,14 +1252,14 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
List<byte[]> ids = next.getValue();
count -= ids.size();
double cr = next.getKey();
for (byte[] id: ids) r.put(id, new CRV(this.cr_host_count, cr, nextcrn));
for (byte[] id: ids) r.put(ASCII.String(id), new CRV(this.cr_host_count, cr, nextcrn));
}
nextcrn++;
nextcount = Math.max(1, (nextcount + count + 1) / 2);
}
// finally, increase the crn number in such a way that the maximum is always 10
int inc = 11 - nextcrn; // nextcrn is +1
for (Map.Entry<byte[], CRV> entry: r.entrySet()) entry.getValue().crn += inc;
for (Map.Entry<String, CRV> entry: r.entrySet()) entry.getValue().crn += inc;
return r;
}
/**
@ -1320,16 +1320,16 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
boolean convergence = true;
double df = (1.0d - damping) / this.cr_host_count;
try {
for (Map.Entry<byte[], double[]> entry: this.crt.entrySet()) {
byte[] id = entry.getKey();
ReferenceReport rr = this.rrCache.getReferenceReport(id, false);
for (Map.Entry<String, double[]> entry: this.crt.entrySet()) {
String id = entry.getKey();
ReferenceReport rr = this.rrCache.getReferenceReport(ASCII.getBytes(id), false);
// sum up the cr of the internal links
HandleSet iids = rr.getInternallIDs();
double ncr = 0.0d;
for (byte[] iid: iids) {
int ilc = getInternalLinks(iid);
if (ilc > 0) { // if (ilc == 0) then the reference report is wrong!
double[] d = this.crt.get(iid);
double[] d = this.crt.get(ASCII.String(iid));
// d[] could be empty at some situations
if (d != null && d.length > 0) {
ncr += d[0] / ilc;
@ -1345,7 +1345,7 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
entry.getValue()[1] = ncr;
}
// after the loop, replace the old value with the new value in crt
for (Map.Entry<byte[], double[]> entry: this.crt.entrySet()) {
for (Map.Entry<String, double[]> entry: this.crt.entrySet()) {
entry.getValue()[0] = entry.getValue()[1];
}
} catch (final IOException e) {

Loading…
Cancel
Save