diff --git a/source/net/yacy/cora/federate/solr/SchemaConfiguration.java b/source/net/yacy/cora/federate/solr/SchemaConfiguration.java index 6035fb139..4900086d1 100644 --- a/source/net/yacy/cora/federate/solr/SchemaConfiguration.java +++ b/source/net/yacy/cora/federate/solr/SchemaConfiguration.java @@ -28,9 +28,12 @@ import java.util.Iterator; import java.util.List; import org.apache.log4j.Logger; +import org.apache.solr.common.SolrDocument; import org.apache.solr.common.SolrInputDocument; import net.yacy.cora.storage.Configuration; +import net.yacy.kelondro.data.meta.DigestURI; +import net.yacy.search.index.Segment; public class SchemaConfiguration extends Configuration implements Serializable { @@ -66,6 +69,33 @@ public class SchemaConfiguration extends Configuration implements Serializable { } } + public boolean postprocessing_clickdepth(Segment segment, SolrDocument doc, SolrInputDocument sid, DigestURI url, SchemaDeclaration clickdepthfield) { + if (!this.contains(clickdepthfield)) return false; + // get new click depth and compare with old + Integer oldclickdepth = (Integer) doc.getFieldValue(clickdepthfield.getSolrFieldName()); + if (oldclickdepth != null && oldclickdepth.intValue() != 999) return false; // we do not want to compute that again + try { + int clickdepth = segment.getClickDepth(url); + if (oldclickdepth == null || oldclickdepth.intValue() != clickdepth) { + sid.setField(clickdepthfield.getSolrFieldName(), clickdepth); + return true; + } + } catch (IOException e) { + } + return false; + } + + public boolean postprocessing_references(Segment segment, SolrDocument doc, SolrInputDocument sid, DigestURI url, SchemaDeclaration referencesfield) { + if (!this.contains(referencesfield)) return false; + Integer oldreferences = (Integer) doc.getFieldValue(referencesfield.getSolrFieldName()); + int references = segment.urlCitation().count(url.hash()); + if (references > 0 && (oldreferences == null || oldreferences.intValue() != references)) { + sid.setField(referencesfield.getSolrFieldName(), references); + return true; + } + return false; + } + public boolean contains(SchemaDeclaration field) { return this.contains(field.name()); } diff --git a/source/net/yacy/search/Switchboard.java b/source/net/yacy/search/Switchboard.java index 8a48991e9..eb069eac2 100644 --- a/source/net/yacy/search/Switchboard.java +++ b/source/net/yacy/search/Switchboard.java @@ -2295,6 +2295,7 @@ public final class Switchboard extends serverSwitch { // execute the (post-) processing steps for all entries that have a process tag assigned if (this.crawlQueues.coreCrawlJobSize() == 0) { index.fulltext().getDefaultConfiguration().postprocessing(index); + index.fulltext().getWebgraphConfiguration().postprocessing(index); } return true; diff --git a/source/net/yacy/search/schema/CollectionConfiguration.java b/source/net/yacy/search/schema/CollectionConfiguration.java index d3d0bceda..9157a64ef 100644 --- a/source/net/yacy/search/schema/CollectionConfiguration.java +++ b/source/net/yacy/search/schema/CollectionConfiguration.java @@ -778,58 +778,38 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri BlockingQueue docs = connector.concurrentQuery(CollectionSchema.process_sxt.getSolrFieldName() + ":[* TO *]", 0, 10000, 60000, 50); SolrDocument doc; - int proccount_clickdepth = 0; - int proccount_clickdepthchange = 0; - int proccount_referencechange = 0; + int proccount = 0, proccount_clickdepthchange = 0, proccount_referencechange = 0; try { while ((doc = docs.take()) != AbstractSolrConnector.POISON_DOCUMENT) { // for each to-be-processed entry work on the process tag Collection proctags = doc.getFieldValues(CollectionSchema.process_sxt.getSolrFieldName()); for (Object tag: proctags) { - String tagname = (String) tag; - ProcessType tagtype = ProcessType.valueOf(tagname); - // switch over tag types - if (tagtype == ProcessType.CLICKDEPTH) { - //proctags.remove(tag); - if (this.contains(CollectionSchema.clickdepth_i)) { - DigestURI url; - try { - // get new click depth and compare with old - Integer oldclickdepth = (Integer) doc.getFieldValue(CollectionSchema.clickdepth_i.getSolrFieldName()); - url = new DigestURI((String) doc.getFieldValue(CollectionSchema.sku.getSolrFieldName()), ASCII.getBytes((String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName()))); - int clickdepth = segment.getClickDepth(url); - if (oldclickdepth == null || oldclickdepth.intValue() != clickdepth) { - //log.logInfo("new clickdepth " + clickdepth + " for " + url.toNormalform(true)); - proccount_clickdepthchange++; - } - SolrInputDocument sid = this.toSolrInputDocument(doc); - sid.setField(CollectionSchema.clickdepth_i.getSolrFieldName(), clickdepth); - - // refresh the link count; it's 'cheap' to do this here - if (this.contains(CollectionSchema.references_i)) { - Integer oldreferences = (Integer) doc.getFieldValue(CollectionSchema.references_i.getSolrFieldName()); - int references = segment.urlCitation().count(url.hash()); - if (references > 0) { - if (oldreferences == null || oldreferences.intValue() != references) proccount_referencechange++; - sid.setField(CollectionSchema.references_i.getSolrFieldName(), references); - } - } - - // remove the processing tag - sid.removeField(CollectionSchema.process_sxt.getSolrFieldName()); - - // send back to index - connector.add(sid); - proccount_clickdepth++; - } catch (Throwable e) { - Log.logException(e); - } + try { + DigestURI url = new DigestURI((String) doc.getFieldValue(CollectionSchema.sku.getSolrFieldName()), ASCII.getBytes((String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName()))); + SolrInputDocument sid = this.toSolrInputDocument(doc); + + // switch over tag types + ProcessType tagtype = ProcessType.valueOf((String) tag); + if (tagtype == ProcessType.CLICKDEPTH) { + if (postprocessing_clickdepth(segment, doc, sid, url, CollectionSchema.clickdepth_i)) proccount_clickdepthchange++; } + + // refresh the link count; it's 'cheap' to do this here + if (postprocessing_references(segment, doc, sid, url, CollectionSchema.references_i)) proccount_referencechange++; + + // all processing steps checked, remove the processing tag + sid.removeField(CollectionSchema.process_sxt.getSolrFieldName()); + + // send back to index + connector.add(sid); + proccount++; + } catch (Throwable e1) { } + } } - Log.logInfo("CollectionConfiguration", "cleanup_processing: re-calculated " + proccount_clickdepth + " new clickdepth values, " + proccount_clickdepthchange + " clickdepth values changed, " + proccount_referencechange + " reference-count values changed."); + Log.logInfo("CollectionConfiguration", "cleanup_processing: re-calculated " + proccount+ " new documents, " + proccount_clickdepthchange + " clickdepth values changed, " + proccount_referencechange + " reference-count values changed."); } catch (InterruptedException e) { } } diff --git a/source/net/yacy/search/schema/WebgraphConfiguration.java b/source/net/yacy/search/schema/WebgraphConfiguration.java index 415a34da7..3fc706db1 100644 --- a/source/net/yacy/search/schema/WebgraphConfiguration.java +++ b/source/net/yacy/search/schema/WebgraphConfiguration.java @@ -28,6 +28,7 @@ import java.io.File; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; +import java.util.Collection; import java.util.Date; import java.util.Iterator; import java.util.LinkedHashSet; @@ -35,6 +36,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.concurrent.BlockingQueue; import org.apache.solr.common.SolrDocument; import org.apache.solr.common.SolrInputDocument; @@ -43,6 +45,8 @@ import net.yacy.cora.document.ASCII; import net.yacy.cora.federate.solr.ProcessType; import net.yacy.cora.federate.solr.SchemaConfiguration; import net.yacy.cora.federate.solr.SchemaDeclaration; +import net.yacy.cora.federate.solr.connector.AbstractSolrConnector; +import net.yacy.cora.federate.solr.connector.SolrConnector; import net.yacy.cora.protocol.Domains; import net.yacy.cora.protocol.ResponseHeader; import net.yacy.cora.util.CommonPattern; @@ -51,6 +55,7 @@ import net.yacy.kelondro.data.citation.CitationReference; import net.yacy.kelondro.data.meta.DigestURI; import net.yacy.kelondro.logging.Log; import net.yacy.kelondro.rwi.IndexCell; +import net.yacy.search.index.Segment; public class WebgraphConfiguration extends SchemaConfiguration implements Serializable { @@ -196,9 +201,11 @@ public class WebgraphConfiguration extends SchemaConfiguration implements Serial add(edge, WebgraphSchema.source_path_folders_count_i, paths.length); add(edge, WebgraphSchema.source_path_folders_sxt, paths); } - add(edge, WebgraphSchema.source_clickdepth_i, clickdepth_source); - if (clickdepth_source < 0 || clickdepth_source > 1) processTypes.add(ProcessType.CLICKDEPTH); - + if (this.contains(WebgraphSchema.source_protocol_s) && this.contains(WebgraphSchema.source_urlstub_s) && this.contains(WebgraphSchema.source_id_s)) { + add(edge, WebgraphSchema.source_clickdepth_i, clickdepth_source); + if (clickdepth_source < 0 || clickdepth_source > 1) processTypes.add(ProcessType.CLICKDEPTH); + } + // add the source attributes about the target if (allAttr || contains(WebgraphSchema.target_inbound_b)) add(edge, WebgraphSchema.target_inbound_b, inbound); if (allAttr || contains(WebgraphSchema.target_name_t)) add(edge, WebgraphSchema.target_name_t, name.length() > 0 ? name : ""); @@ -252,14 +259,16 @@ public class WebgraphConfiguration extends SchemaConfiguration implements Serial add(edge, WebgraphSchema.target_path_folders_sxt, paths); } - if ((allAttr || contains(WebgraphSchema.target_clickdepth_i)) && citations != null) { - if (target_url.probablyRootURL()) { - boolean lc = this.lazy; this.lazy = false; - add(edge, WebgraphSchema.target_clickdepth_i, 0); - this.lazy = lc; - } else { - add(edge, WebgraphSchema.target_clickdepth_i, 999); - processTypes.add(ProcessType.CLICKDEPTH); // postprocessing needed; this is also needed if the depth is positive; there could be a shortcut + if (this.contains(WebgraphSchema.target_protocol_s) && this.contains(WebgraphSchema.target_urlstub_s) && this.contains(WebgraphSchema.target_id_s)) { + if ((allAttr || contains(WebgraphSchema.target_clickdepth_i)) && citations != null) { + if (target_url.probablyRootURL()) { + boolean lc = this.lazy; this.lazy = false; + add(edge, WebgraphSchema.target_clickdepth_i, 0); + this.lazy = lc; + } else { + add(edge, WebgraphSchema.target_clickdepth_i, 999); + processTypes.add(ProcessType.CLICKDEPTH); // postprocessing needed; this is also needed if the depth is positive; there could be a shortcut + } } } @@ -273,6 +282,63 @@ public class WebgraphConfiguration extends SchemaConfiguration implements Serial subgraph.edges.add(edge); } } + + public void postprocessing(Segment segment) { + if (!this.contains(WebgraphSchema.process_sxt)) return; + if (!segment.connectedCitation()) return; + SolrConnector connector = segment.fulltext().getWebgraphConnector(); + // that means we must search for those entries. + connector.commit(true); // make sure that we have latest information that can be found + //BlockingQueue docs = index.fulltext().getSolr().concurrentQuery("*:*", 0, 1000, 60000, 10); + BlockingQueue docs = connector.concurrentQuery(WebgraphSchema.process_sxt.getSolrFieldName() + ":[* TO *]", 0, 100000, 60000, 50); + + SolrDocument doc; + String protocol, urlstub, id; + DigestURI url; + int proccount = 0, proccount_clickdepthchange = 0, proccount_referencechange = 0; + try { + while ((doc = docs.take()) != AbstractSolrConnector.POISON_DOCUMENT) { + // for each to-be-processed entry work on the process tag + Collection proctags = doc.getFieldValues(WebgraphSchema.process_sxt.getSolrFieldName()); + for (Object tag: proctags) { + + try { + SolrInputDocument sid = this.toSolrInputDocument(doc); + + // switch over tag types + ProcessType tagtype = ProcessType.valueOf((String) tag); + if (tagtype == ProcessType.CLICKDEPTH) { + if (this.contains(WebgraphSchema.source_protocol_s) && this.contains(WebgraphSchema.source_urlstub_s) && this.contains(WebgraphSchema.source_id_s)) { + protocol = (String) doc.getFieldValue(WebgraphSchema.source_protocol_s.getSolrFieldName()); + urlstub = (String) doc.getFieldValue(WebgraphSchema.source_urlstub_s.getSolrFieldName()); + id = (String) doc.getFieldValue(WebgraphSchema.source_id_s.getSolrFieldName()); + url = new DigestURI(protocol + "://" + urlstub, ASCII.getBytes(id)); + if (postprocessing_clickdepth(segment, doc, sid, url, WebgraphSchema.source_clickdepth_i)) proccount_clickdepthchange++; + } + if (this.contains(WebgraphSchema.target_protocol_s) && this.contains(WebgraphSchema.target_urlstub_s) && this.contains(WebgraphSchema.target_id_s)) { + protocol = (String) doc.getFieldValue(WebgraphSchema.target_protocol_s.getSolrFieldName()); + urlstub = (String) doc.getFieldValue(WebgraphSchema.target_urlstub_s.getSolrFieldName()); + id = (String) doc.getFieldValue(WebgraphSchema.target_id_s.getSolrFieldName()); + url = new DigestURI(protocol + "://" + urlstub, ASCII.getBytes(id)); + if (postprocessing_clickdepth(segment, doc, sid, url, WebgraphSchema.target_clickdepth_i)) proccount_clickdepthchange++; + } + } + + // all processing steps checked, remove the processing tag + sid.removeField(WebgraphSchema.process_sxt.getSolrFieldName()); + + // send back to index + connector.add(sid); + proccount++; + } catch (Throwable e1) { + } + + } + } + Log.logInfo("WebgraphConfiguration", "cleanup_processing: re-calculated " + proccount+ " new documents, " + proccount_clickdepthchange + " clickdepth values changed, " + proccount_referencechange + " reference-count values changed."); + } catch (InterruptedException e) { + } + } /** * encode a string containing attributes from anchor rel properties binary: