diff --git a/defaults/solr.collection.schema b/defaults/solr.collection.schema
index a8e814f93..8382892ed 100644
--- a/defaults/solr.collection.schema
+++ b/defaults/solr.collection.schema
@@ -90,6 +90,9 @@ clickdepth_i
 
 ## needed (post-)processing steps on this metadata set
 process_sxt
+## key of the harvest process (i.e. the crawl profile hash) which is needed for near-real-time postprocessing; the field is deleted as soon as postprocessing has finished
+harvestkey_s
+
 ### optional but highly recommended values, part of the index distribution process
 
diff --git a/defaults/solr.webgraph.schema b/defaults/solr.webgraph.schema
index 25afe61b3..ceae4cd87 100644
--- a/defaults/solr.webgraph.schema
+++ b/defaults/solr.webgraph.schema
@@ -26,6 +26,9 @@ collection_sxt
 
 ## needed (post-)processing steps on this metadata set, used i.e. for clickdepth-computation.
 #process_sxt
+
+## key of the harvest process (i.e. the crawl profile hash) which is needed for near-real-time postprocessing; the field is deleted as soon as postprocessing has finished
+harvestkey_s
 
 ##
diff --git a/source/net/yacy/crawler/data/CrawlProfile.java b/source/net/yacy/crawler/data/CrawlProfile.java
index 03efd21b9..b393c08fe 100644
--- a/source/net/yacy/crawler/data/CrawlProfile.java
+++ b/source/net/yacy/crawler/data/CrawlProfile.java
@@ -148,7 +148,7 @@ public class CrawlProfile extends ConcurrentHashMap<String, String> implements Map<String, String> {
         }
         if (name.length() > 256) name = name.substring(256);
         this.doms = new ConcurrentHashMap<String, DomProfile>();
-        final String handle = Base64Order.enhancedCoder.encode(Digest.encodeMD5Raw(name)).substring(0, Word.commonHashLength);
+        final String handle = Base64Order.enhancedCoder.encode(Digest.encodeMD5Raw(name + crawlerUrlMustMatch + depth + crawlerUrlMustNotMatch + domMaxPages)).substring(0, Word.commonHashLength);
         put(HANDLE,     handle);
         put(NAME,       name);
         put(AGENT_NAME, userAgentName);
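
The widened hash input above is what makes the rest of this patch work: the profile handle now changes whenever the crawl parameters change, so two crawls with the same name but different match patterns, depth, or page limits get distinct handles, and the handle can double as a per-crawl harvest key. A minimal sketch of the derivation, assuming a hash length of 12 (Word.commonHashLength) and substituting the JDK Base64 codec for YaCy's Base64Order.enhancedCoder, which uses its own alphabet and ordering:

    import java.nio.charset.StandardCharsets;
    import java.security.MessageDigest;
    import java.util.Base64;

    // Sketch of the handle derivation: MD5 over name + parameters, Base64-encoded, truncated.
    public class HandleSketch {
        static String handle(String name, String mustMatch, int depth,
                             String mustNotMatch, int domMaxPages) throws Exception {
            String seed = name + mustMatch + depth + mustNotMatch + domMaxPages;
            byte[] md5 = MessageDigest.getInstance("MD5").digest(seed.getBytes(StandardCharsets.UTF_8));
            return Base64.getUrlEncoder().withoutPadding().encodeToString(md5).substring(0, 12);
        }
        public static void main(String[] args) throws Exception {
            System.out.println(handle("mycrawl", ".*", 3, "", 10000)); // deterministic 12-char key
            System.out.println(handle("mycrawl", ".*", 4, "", 10000)); // different depth, different handle
        }
    }
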
diff --git a/source/net/yacy/search/Switchboard.java b/source/net/yacy/search/Switchboard.java
index 36f4930bc..f0debff28 100644
--- a/source/net/yacy/search/Switchboard.java
+++ b/source/net/yacy/search/Switchboard.java
@@ -2135,8 +2135,17 @@ public final class Switchboard extends serverSwitch {
         Set<String> deletionCandidates = this.crawler.getFinishesProfiles(this.crawlQueues);
         int cleanup = deletionCandidates.size();
         if (cleanup > 0) {
+            // run postprocessing on these profiles
+            postprocessingRunning = true;
+            int proccount = 0;
+            for (String profileHash: deletionCandidates) {
+                proccount += index.fulltext().getDefaultConfiguration().postprocessing(index, profileHash);
+                proccount += index.fulltext().getWebgraphConfiguration().postprocessing(index, profileHash);
+            }
+            postprocessingRunning = false;
+
             this.crawler.cleanProfiles(deletionCandidates);
-            log.info("cleanup removed " + cleanup + " crawl profiles");
+            log.info("cleanup removed " + cleanup + " crawl profiles, post-processed " + proccount + " documents");
         }
     }
 
@@ -2277,8 +2286,8 @@
             if (this.crawlQueues.noticeURL.isEmpty()) this.crawlQueues.noticeURL.clear(); // flushes more caches
             postprocessingRunning = true;
             int proccount = 0;
-            proccount += index.fulltext().getDefaultConfiguration().postprocessing(index);
-            proccount += index.fulltext().getWebgraphConfiguration().postprocessing(index);
+            proccount += index.fulltext().getDefaultConfiguration().postprocessing(index, null);
+            proccount += index.fulltext().getWebgraphConfiguration().postprocessing(index, null);
             long idleSearch = System.currentTimeMillis() - this.localSearchLastAccess;
             long idleAdmin  = System.currentTimeMillis() - this.adminAuthenticationLastAccess;
             long deltaOptimize = System.currentTimeMillis() - this.optimizeLastRun;
@@ -2665,18 +2674,28 @@ public final class Switchboard extends serverSwitch {
         // the condenser may be null in case that an indexing is not wanted (there may be a no-indexing flag in the file)
         if ( in.condenser != null ) {
             for ( int i = 0; i < in.documents.length; i++ ) {
+                CrawlProfile profile = in.queueEntry.profile();
                 storeDocumentIndex(
                     in.queueEntry,
                     in.queueEntry.profile().collections(),
                     in.documents[i],
                     in.condenser[i],
                     null,
-                    "crawler/indexing queue");
+                    profile == null ? "crawler" : profile.handle());
             }
         }
         in.queueEntry.updateStatus(Response.QUEUE_STATE_FINISHED);
     }
 
+    /**
+     *
+     * @param queueEntry
+     * @param collections
+     * @param document
+     * @param condenser
+     * @param searchEvent
+     * @param sourceName if this document was created by a crawl, then the sourceName contains the crawl hash
+     */
     private void storeDocumentIndex(
         final Response queueEntry,
         final Map<String, Pattern> collections,
diff --git a/source/net/yacy/search/index/Segment.java b/source/net/yacy/search/index/Segment.java
index 548b513af..6ebb21dc1 100644
--- a/source/net/yacy/search/index/Segment.java
+++ b/source/net/yacy/search/index/Segment.java
@@ -599,7 +599,7 @@ public class Segment {
             final Document document,
             final Condenser condenser,
             final SearchEvent searchEvent,
-            final String sourceName,
+            final String sourceName, // contains the crawl profile hash if this comes from a web crawl
             final boolean storeToRWI
             ) {
         final long startTime = System.currentTimeMillis();
@@ -619,7 +619,7 @@ public class Segment {
         char docType = Response.docType(document.dc_format());
 
         // CREATE SOLR DOCUMENT
-        final CollectionConfiguration.SolrVector vector = this.fulltext.getDefaultConfiguration().yacy2solr(collections, responseHeader, document, condenser, referrerURL, language, urlCitationIndex, this.fulltext.getWebgraphConfiguration());
+        final CollectionConfiguration.SolrVector vector = this.fulltext.getDefaultConfiguration().yacy2solr(collections, responseHeader, document, condenser, referrerURL, language, urlCitationIndex, this.fulltext.getWebgraphConfiguration(), sourceName);
 
         // ENRICH DOCUMENT WITH RANKING INFORMATION
         if (this.connectedCitation()) {
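
The net effect of the Switchboard and Segment changes: a document that enters the index through the crawler now carries its crawl profile hash as sourceName all the way into yacy2solr, where it is persisted next to the pending work markers. Roughly, the document that reaches Solr gains this shape (field values are illustrative and the helper is hypothetical; assumes solr-solrj on the classpath):

    import org.apache.solr.common.SolrInputDocument;

    // Illustration of the tagging that yacy2solr now performs.
    public class HarvestKeyTagging {
        static SolrInputDocument tag(SolrInputDocument doc, String sourceName) {
            doc.setField("process_sxt", java.util.Arrays.asList("CLICKDEPTH", "CITATION")); // pending steps (example values)
            if (sourceName != null) doc.setField("harvestkey_s", sourceName);               // crawl profile hash
            return doc;
        }
    }

Documents that arrive without a crawl profile fall back to the constant "crawler" as sourceName and are only picked up by the full pass with harvestkey == null, which is why that mode remains in place.
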
diff --git a/source/net/yacy/search/schema/CollectionConfiguration.java b/source/net/yacy/search/schema/CollectionConfiguration.java
index b78400f05..4a978f067 100644
--- a/source/net/yacy/search/schema/CollectionConfiguration.java
+++ b/source/net/yacy/search/schema/CollectionConfiguration.java
@@ -360,7 +360,7 @@ public class CollectionConfiguration extends SchemaConfiguration implements Serializable {
             final Map<String, Pattern> collections, final ResponseHeader responseHeader,
             final Document document, final Condenser condenser, final DigestURL referrerURL,
             final String language, final IndexCell<CitationReference> citations,
-            final WebgraphConfiguration webgraph) {
+            final WebgraphConfiguration webgraph, final String sourceName) {
         // we use the SolrCell design as index schema
         SolrVector doc = new SolrVector();
         final DigestURL digestURL = document.dc_source();
@@ -822,7 +822,7 @@ public class CollectionConfiguration extends SchemaConfiguration implements Serializable {
         // create a subgraph
         if (!containsCanonical) {
             // a document with canonical tag should not get a webgraph relation, because that belongs to the canonical document
-            webgraph.addEdges(subgraph, digestURL, responseHeader, collections, clickdepth, images, true, document.getAnchors(), citations);
+            webgraph.addEdges(subgraph, digestURL, responseHeader, collections, clickdepth, images, true, document.getAnchors(), citations, sourceName);
         }
 
         // list all links
@@ -871,6 +871,9 @@ public class CollectionConfiguration extends SchemaConfiguration implements Serializable {
             List<String> p = new ArrayList<String>();
             for (ProcessType t: processTypes) p.add(t.name());
             add(doc, CollectionSchema.process_sxt, p);
+            if (allAttr || contains(CollectionSchema.harvestkey_s)) {
+                add(doc, CollectionSchema.harvestkey_s, sourceName);
+            }
         }
         return doc;
     }
@@ -882,7 +885,7 @@ public class CollectionConfiguration extends SchemaConfiguration implements Serializable {
      * @param urlCitation
      * @return
      */
-    public int postprocessing(final Segment segment) {
+    public int postprocessing(final Segment segment, String harvestkey) {
         if (!this.contains(CollectionSchema.process_sxt)) return 0;
         if (!segment.connectedCitation()) return 0;
         SolrConnector connector = segment.fulltext().getDefaultConnector();
@@ -891,7 +894,10 @@
        Map ranking = new TreeMap(Base64Order.enhancedCoder);
         try {
             // collect hosts from index which shall take part in citation computation
-            ReversibleScoreMap<String> hostscore = connector.getFacets(CollectionSchema.process_sxt.getSolrFieldName() + ":" + ProcessType.CITATION.toString(), 10000, CollectionSchema.host_s.getSolrFieldName()).get(CollectionSchema.host_s.getSolrFieldName());
+            ReversibleScoreMap<String> hostscore = connector.getFacets(
+                    (harvestkey == null ? "" : CollectionSchema.harvestkey_s.getSolrFieldName() + ":\"" + harvestkey + "\" AND ") +
+                    CollectionSchema.process_sxt.getSolrFieldName() + ":" + ProcessType.CITATION.toString(),
+                    10000, CollectionSchema.host_s.getSolrFieldName()).get(CollectionSchema.host_s.getSolrFieldName());
             if (hostscore == null) hostscore = new ClusteredScoreMap<String>();
             // for each host, do a citation rank computation
             for (String host: hostscore.keyList(true)) {
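
The same query-prefix pattern appears at all three query sites in this patch: a non-null harvest key narrows the selection from "every document with pending work" to "pending work from this one crawl". For a made-up profile hash, the facet query above expands as follows:

    // Sketch of the assembled Solr query, with a hypothetical harvest key.
    String harvestkey = "QxZ0Ab3kR2pD"; // hypothetical crawl profile hash
    String q = (harvestkey == null ? "" : "harvestkey_s:\"" + harvestkey + "\" AND ")
             + "process_sxt:CITATION";
    // harvestkey != null: harvestkey_s:"QxZ0Ab3kR2pD" AND process_sxt:CITATION
    // harvestkey == null: process_sxt:CITATION  (the old behaviour: a full-index pass)
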
"" : CollectionSchema.harvestkey_s.getSolrFieldName() + ":\"" + harvestkey + "\" AND ") + + CollectionSchema.process_sxt.getSolrFieldName() + ":[* TO *]", + 0, 10000, 60000, 50); SolrDocument doc; int proccount = 0, proccount_clickdepthchange = 0, proccount_referencechange = 0, proccount_citationchange = 0, proccount_uniquechange = 0; Map hostExtentCache = new HashMap(); // a mapping from the host id to the number of documents which contain this host-id @@ -961,8 +970,9 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri } if (postprocessing_references(rrCache, doc, sid, url, hostExtentCache)) proccount_referencechange++; - // all processing steps checked, remove the processing tag + // all processing steps checked, remove the processing and harvesting key sid.removeField(CollectionSchema.process_sxt.getSolrFieldName()); + sid.removeField(CollectionSchema.harvestkey_s.getSolrFieldName()); // send back to index //connector.deleteById(ASCII.String(id)); diff --git a/source/net/yacy/search/schema/CollectionSchema.java b/source/net/yacy/search/schema/CollectionSchema.java index 9b4009717..ccd75f343 100644 --- a/source/net/yacy/search/schema/CollectionSchema.java +++ b/source/net/yacy/search/schema/CollectionSchema.java @@ -59,6 +59,7 @@ public enum CollectionSchema implements SchemaDeclaration { references_exthosts_i(SolrType.num_integer, true, true, false, false, false, "number of external hosts which provide http references"), clickdepth_i(SolrType.num_integer, true, true, false, false, false, "depth of web page according to number of clicks from the 'main' page, which is the page that appears if only the host is entered as url"), process_sxt(SolrType.string, true, true, true, false, false, "needed (post-)processing steps on this metadata set"), + harvestkey_s(SolrType.string, true, true, false, false, false, "key from a harvest process (i.e. the crawl profile hash key) which is needed for near-realtime postprocessing. 
diff --git a/source/net/yacy/search/schema/CollectionSchema.java b/source/net/yacy/search/schema/CollectionSchema.java
index 9b4009717..ccd75f343 100644
--- a/source/net/yacy/search/schema/CollectionSchema.java
+++ b/source/net/yacy/search/schema/CollectionSchema.java
@@ -59,6 +59,7 @@
     references_exthosts_i(SolrType.num_integer, true, true, false, false, false, "number of external hosts which provide http references"),
     clickdepth_i(SolrType.num_integer, true, true, false, false, false, "depth of web page according to number of clicks from the 'main' page, which is the page that appears if only the host is entered as url"),
     process_sxt(SolrType.string, true, true, true, false, false, "needed (post-)processing steps on this metadata set"),
+    harvestkey_s(SolrType.string, true, true, false, false, false, "key of the harvest process (i.e. the crawl profile hash) which is needed for near-real-time postprocessing; the field is deleted as soon as postprocessing has finished"),
 
     // optional but recommended, part of index distribution
     load_date_dt(SolrType.date, true, true, false, false, false, "time when resource was loaded"),
@@ -231,6 +232,23 @@
         this.omitNorms = omitNorms;
         this.searchable = searchable;
         this.comment = comment;
+        // verify our naming scheme
+        String name = this.name();
+        int p = name.indexOf('_');
+        if (p > 0) {
+            String ext = name.substring(p + 1);
+            assert !ext.equals("i") || (type == SolrType.num_integer && !multiValued) : name;
+            assert !ext.equals("l") || (type == SolrType.num_long && !multiValued) : name;
+            assert !ext.equals("b") || (type == SolrType.bool && !multiValued) : name;
+            assert !ext.equals("s") || (type == SolrType.string && !multiValued) : name;
+            assert !ext.equals("sxt") || (type == SolrType.string && multiValued) : name;
+            assert !ext.equals("dt") || (type == SolrType.date && !multiValued) : name;
+            assert !ext.equals("t") || (type == SolrType.text_general && !multiValued) : name;
+            assert !ext.equals("coordinate") || (type == SolrType.coordinate && !multiValued) : name;
+            assert !ext.equals("txt") || (type == SolrType.text_general && multiValued) : name;
+            assert !ext.equals("val") || (type == SolrType.num_integer && multiValued) : name;
+            assert !ext.equals("d") || (type == SolrType.num_double && !multiValued) : name;
+        }
         assert type.appropriateName(this) : "bad configuration: " + this.name();
     }
 
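
The constructor block added to both enums encodes the Solr dynamic-field suffix convention the schema names already follow (_i = single-valued integer, _sxt = multi-valued string, and so on), so a mis-typed declaration such as the new harvestkey_s would fail as soon as the enum class is loaded with assertions enabled (java -ea). A standalone restatement of the rules for illustration; the SolrType names follow the patch, but this enum is a stand-in, not YaCy's:

    enum SolrType { num_integer, num_long, num_double, bool, string, date, text_general, coordinate }

    final class SuffixCheck {
        static boolean ok(String fieldName, SolrType type, boolean multiValued) {
            int p = fieldName.indexOf('_');
            if (p <= 0) return true; // no suffix, nothing to verify
            switch (fieldName.substring(p + 1)) {
                case "i":          return type == SolrType.num_integer  && !multiValued;
                case "l":          return type == SolrType.num_long     && !multiValued;
                case "b":          return type == SolrType.bool         && !multiValued;
                case "s":          return type == SolrType.string       && !multiValued;
                case "sxt":        return type == SolrType.string       &&  multiValued;
                case "dt":         return type == SolrType.date         && !multiValued;
                case "t":          return type == SolrType.text_general && !multiValued;
                case "txt":        return type == SolrType.text_general &&  multiValued;
                case "val":        return type == SolrType.num_integer  &&  multiValued;
                case "d":          return type == SolrType.num_double   && !multiValued;
                case "coordinate": return type == SolrType.coordinate   && !multiValued;
                default:           return true; // unknown suffix: not asserted
            }
        }

        public static void main(String[] args) {
            System.out.println(ok("harvestkey_s", SolrType.string, false)); // true
            System.out.println(ok("harvestkey_s", SolrType.string, true));  // false: _s must be single-valued
        }
    }
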
diff --git a/source/net/yacy/search/schema/WebgraphConfiguration.java b/source/net/yacy/search/schema/WebgraphConfiguration.java
index 0faa2f780..09667711f 100644
--- a/source/net/yacy/search/schema/WebgraphConfiguration.java
+++ b/source/net/yacy/search/schema/WebgraphConfiguration.java
@@ -117,7 +117,7 @@
             final Subgraph subgraph,
             final DigestURL source, final ResponseHeader responseHeader, Map<String, Pattern> collections, int clickdepth_source,
             final List images, final boolean inbound, final Collection<AnchorURL> links,
-            final IndexCell<CitationReference> citations) {
+            final IndexCell<CitationReference> citations, final String sourceName) {
         boolean allAttr = this.isEmpty();
         int target_order = 0;
         boolean generalNofollow = responseHeader.get("X-Robots-Tag", "").indexOf("nofollow") >= 0;
@@ -284,6 +284,9 @@ public class WebgraphConfiguration extends SchemaConfiguration implements Serializable {
             List<String> pr = new ArrayList<String>();
             for (ProcessType t: processTypes) pr.add(t.name());
             add(edge, WebgraphSchema.process_sxt, pr);
+            if (allAttr || contains(CollectionSchema.harvestkey_s)) {
+                add(edge, CollectionSchema.harvestkey_s, sourceName);
+            }
         }
 
         // add the edge to the subgraph
@@ -291,7 +294,7 @@
         }
     }
 
-    public int postprocessing(Segment segment) {
+    public int postprocessing(final Segment segment, final String harvestkey) {
         if (!this.contains(WebgraphSchema.process_sxt)) return 0;
         if (!segment.connectedCitation()) return 0;
         if (!segment.fulltext().writeToWebgraph()) return 0;
@@ -299,7 +302,10 @@
         // that means we must search for those entries.
         connector.commit(true); // make sure that we have latest information that can be found
         //BlockingQueue docs = index.fulltext().getSolr().concurrentQuery("*:*", 0, 1000, 60000, 10);
-        BlockingQueue<SolrDocument> docs = connector.concurrentDocumentsByQuery(WebgraphSchema.process_sxt.getSolrFieldName() + ":[* TO *]", 0, 100000, 60000, 50);
+        BlockingQueue<SolrDocument> docs = connector.concurrentDocumentsByQuery(
+                (harvestkey == null ? "" : CollectionSchema.harvestkey_s.getSolrFieldName() + ":\"" + harvestkey + "\" AND ") +
+                WebgraphSchema.process_sxt.getSolrFieldName() + ":[* TO *]",
+                0, 100000, 60000, 50);
 
         SolrDocument doc;
         String protocol, urlstub, id;
@@ -335,6 +341,7 @@
 
                 // all processing steps checked, remove the processing tag
                 sid.removeField(WebgraphSchema.process_sxt.getSolrFieldName());
+                sid.removeField(WebgraphSchema.harvestkey_s.getSolrFieldName());
 
                 // send back to index
                 connector.add(sid);
diff --git a/source/net/yacy/search/schema/WebgraphSchema.java b/source/net/yacy/search/schema/WebgraphSchema.java
index 096a15d9a..2cd80994c 100644
--- a/source/net/yacy/search/schema/WebgraphSchema.java
+++ b/source/net/yacy/search/schema/WebgraphSchema.java
@@ -36,6 +36,7 @@
     load_date_dt(SolrType.date, true, true, false, false, false, "time when resource was loaded"),
     collection_sxt(SolrType.string, true, true, true, false, false, "tags that are attached to crawls/index generation to separate the search result into user-defined subsets"),
     process_sxt(SolrType.string, true, true, true, false, false, "needed (post-)processing steps on this metadata set, used i.e. for clickdepth-computation."),
+    harvestkey_s(SolrType.string, true, true, false, false, false, "key of the harvest process (i.e. the crawl profile hash) which is needed for near-real-time postprocessing; the field is deleted as soon as postprocessing has finished"),
 
     // source information
     source_id_s(SolrType.string, true, true, false, false, false, "primary key of document, the URL hash (source)"),
@@ -114,6 +115,23 @@
         this.omitNorms = omitNorms;
         this.searchable = searchable;
         this.comment = comment;
+        // verify our naming scheme
+        String name = this.name();
+        int p = name.indexOf('_');
+        if (p > 0) {
+            String ext = name.substring(p + 1);
+            assert !ext.equals("i") || (type == SolrType.num_integer && !multiValued) : name;
+            assert !ext.equals("l") || (type == SolrType.num_long && !multiValued) : name;
+            assert !ext.equals("b") || (type == SolrType.bool && !multiValued) : name;
+            assert !ext.equals("s") || (type == SolrType.string && !multiValued) : name;
+            assert !ext.equals("sxt") || (type == SolrType.string && multiValued) : name;
+            assert !ext.equals("dt") || (type == SolrType.date && !multiValued) : name;
+            assert !ext.equals("t") || (type == SolrType.text_general && !multiValued) : name;
+            assert !ext.equals("coordinate") || (type == SolrType.coordinate && !multiValued) : name;
+            assert !ext.equals("txt") || (type == SolrType.text_general && multiValued) : name;
+            assert !ext.equals("val") || (type == SolrType.num_integer && multiValued) : name;
+            assert !ext.equals("d") || (type == SolrType.num_double && !multiValued) : name;
+        }
         assert type.appropriateName(this) : "bad configuration: " + this.name();
     }
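
Because both postprocessing() implementations clear harvestkey_s on completion, a populated index should only show harvest keys while a crawl's postprocessing is still pending. A quick audit query for that invariant, assuming a recent solr-solrj on the classpath and a hypothetical Solr endpoint (YaCy embeds its Solr core, so adjust the URL to your setup):

    import org.apache.solr.client.solrj.SolrQuery;
    import org.apache.solr.client.solrj.impl.HttpSolrClient;
    import org.apache.solr.client.solrj.response.QueryResponse;

    public class HarvestKeyAudit {
        public static void main(String[] args) throws Exception {
            try (HttpSolrClient solr = new HttpSolrClient.Builder("http://localhost:8983/solr/collection1").build()) {
                QueryResponse rsp = solr.query(new SolrQuery("harvestkey_s:[* TO *]").setRows(0));
                System.out.println("documents awaiting postprocessing: " + rsp.getResults().getNumFound());
            }
        }
    }
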