Added the new field harvestkey_s to the collection index and the
webgraph index; it is temporarily filled with the crawl profile key.
This is used to select a set of documents for post-processing as soon as
a crawl is finished. The post-processing for a specific crawl is now
started when that specific crawl is finished, not only at the end of all
crawls.
Michael Peter Christen 11 years ago
parent 14442efa6d
commit 4f83d5f18c
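
For illustration, here is a minimal sketch (not part of the commit) of how the harvest key narrows the post-processing selection. It mirrors the Solr query strings built in the postprocessing() methods in the diff below; the class and method names are hypothetical, the field names and the getSolrFieldName() accessor come from the diff, and the import path assumes the usual YaCy package layout.

import net.yacy.search.schema.CollectionSchema;

public class HarvestKeyQueryExample {

    /** Build the Solr query that selects the documents of one finished crawl
     *  which still carry outstanding (post-)processing steps. */
    public static String postprocessingQuery(final String harvestkey) {
        final StringBuilder q = new StringBuilder();
        if (harvestkey != null) {
            // harvestkey_s holds the crawl profile hash until postprocessing has run
            q.append(CollectionSchema.harvestkey_s.getSolrFieldName())
             .append(":\"").append(harvestkey).append("\" AND ");
        }
        // process_sxt lists the pending processing steps; [* TO *] matches any value
        q.append(CollectionSchema.process_sxt.getSolrFieldName()).append(":[* TO *]");
        return q.toString();
    }
}

Passing null selects all pending documents, which matches the unchanged global cleanup path that now calls postprocessing(index, null).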

@@ -90,6 +90,9 @@ clickdepth_i
## needed (post-)processing steps on this metadata set
process_sxt
## key from a harvest process (i.e. the crawl profile hash key) which is needed for near-realtime postprocessing. This shall be deleted as soon as postprocessing has been terminated.
harvestkey_s
### optional but highly recommended values, part of the index distribution process

@@ -26,6 +26,9 @@ collection_sxt
## needed (post-)processing steps on this metadata set, used i.e. for clickdepth-computation.
#process_sxt
## key from a harvest process (i.e. the crawl profile hash key) which is needed for near-realtime postprocessing. This shall be deleted as soon as postprocessing has been terminated.
harvestkey_s
##

@@ -148,7 +148,7 @@ public class CrawlProfile extends ConcurrentHashMap<String, String> implements M
}
if (name.length() > 256) name = name.substring(256);
this.doms = new ConcurrentHashMap<String, AtomicInteger>();
final String handle = Base64Order.enhancedCoder.encode(Digest.encodeMD5Raw(name)).substring(0, Word.commonHashLength);
final String handle = Base64Order.enhancedCoder.encode(Digest.encodeMD5Raw(name + crawlerUrlMustMatch + depth + crawlerUrlMustNotMatch + domMaxPages)).substring(0, Word.commonHashLength);
put(HANDLE, handle);
put(NAME, name);
put(AGENT_NAME, userAgentName);

@@ -2135,8 +2135,17 @@ public final class Switchboard extends serverSwitch {
Set<String> deletionCandidates = this.crawler.getFinishesProfiles(this.crawlQueues);
int cleanup = deletionCandidates.size();
if (cleanup > 0) {
// run postprocessing on these profiles
postprocessingRunning = true;
int proccount = 0;
for (String profileHash: deletionCandidates) {
proccount += index.fulltext().getDefaultConfiguration().postprocessing(index, profileHash);
proccount += index.fulltext().getWebgraphConfiguration().postprocessing(index, profileHash);
}
postprocessingRunning = false;
this.crawler.cleanProfiles(deletionCandidates);
log.info("cleanup removed " + cleanup + " crawl profiles");
log.info("cleanup removed " + cleanup + " crawl profiles, post-processed " + proccount + " documents");
}
}
@@ -2277,8 +2286,8 @@ public final class Switchboard extends serverSwitch {
if (this.crawlQueues.noticeURL.isEmpty()) this.crawlQueues.noticeURL.clear(); // flushes more caches
postprocessingRunning = true;
int proccount = 0;
proccount += index.fulltext().getDefaultConfiguration().postprocessing(index);
proccount += index.fulltext().getWebgraphConfiguration().postprocessing(index);
proccount += index.fulltext().getDefaultConfiguration().postprocessing(index, null);
proccount += index.fulltext().getWebgraphConfiguration().postprocessing(index, null);
long idleSearch = System.currentTimeMillis() - this.localSearchLastAccess;
long idleAdmin = System.currentTimeMillis() - this.adminAuthenticationLastAccess;
long deltaOptimize = System.currentTimeMillis() - this.optimizeLastRun;
@@ -2665,18 +2674,28 @@ public final class Switchboard extends serverSwitch {
// the condenser may be null in case that an indexing is not wanted (there may be a no-indexing flag in the file)
if ( in.condenser != null ) {
for ( int i = 0; i < in.documents.length; i++ ) {
CrawlProfile profile = in.queueEntry.profile();
storeDocumentIndex(
in.queueEntry,
in.queueEntry.profile().collections(),
in.documents[i],
in.condenser[i],
null,
"crawler/indexing queue");
profile == null ? "crawler" : profile.handle());
}
}
in.queueEntry.updateStatus(Response.QUEUE_STATE_FINISHED);
}
/**
*
* @param queueEntry
* @param collections
* @param document
* @param condenser
* @param searchEvent
* @param sourceName if this document was created by a crawl, then the sourceName contains the crawl hash
*/
private void storeDocumentIndex(
final Response queueEntry,
final Map<String, Pattern> collections,

@@ -599,7 +599,7 @@ public class Segment {
final Document document,
final Condenser condenser,
final SearchEvent searchEvent,
final String sourceName,
final String sourceName, // contains the crawl profile hash if this comes from a web crawl
final boolean storeToRWI
) {
final long startTime = System.currentTimeMillis();
@@ -619,7 +619,7 @@ public class Segment {
char docType = Response.docType(document.dc_format());
// CREATE SOLR DOCUMENT
final CollectionConfiguration.SolrVector vector = this.fulltext.getDefaultConfiguration().yacy2solr(collections, responseHeader, document, condenser, referrerURL, language, urlCitationIndex, this.fulltext.getWebgraphConfiguration());
final CollectionConfiguration.SolrVector vector = this.fulltext.getDefaultConfiguration().yacy2solr(collections, responseHeader, document, condenser, referrerURL, language, urlCitationIndex, this.fulltext.getWebgraphConfiguration(), sourceName);
// ENRICH DOCUMENT WITH RANKING INFORMATION
if (this.connectedCitation()) {

@@ -360,7 +360,7 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
final Map<String, Pattern> collections, final ResponseHeader responseHeader,
final Document document, final Condenser condenser, final DigestURL referrerURL, final String language,
final IndexCell<CitationReference> citations,
final WebgraphConfiguration webgraph) {
final WebgraphConfiguration webgraph, final String sourceName) {
// we use the SolrCell design as index schema
SolrVector doc = new SolrVector();
final DigestURL digestURL = document.dc_source();
@@ -822,7 +822,7 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
// create a subgraph
if (!containsCanonical) {
// a document with canonical tag should not get a webgraph relation, because that belongs to the canonical document
webgraph.addEdges(subgraph, digestURL, responseHeader, collections, clickdepth, images, true, document.getAnchors(), citations);
webgraph.addEdges(subgraph, digestURL, responseHeader, collections, clickdepth, images, true, document.getAnchors(), citations, sourceName);
}
// list all links
@@ -871,6 +871,9 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
List<String> p = new ArrayList<String>();
for (ProcessType t: processTypes) p.add(t.name());
add(doc, CollectionSchema.process_sxt, p);
if (allAttr || contains(CollectionSchema.harvestkey_s)) {
add(doc, CollectionSchema.harvestkey_s, sourceName);
}
}
return doc;
}
@@ -882,7 +885,7 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
* @param urlCitation
* @return
*/
public int postprocessing(final Segment segment) {
public int postprocessing(final Segment segment, String harvestkey) {
if (!this.contains(CollectionSchema.process_sxt)) return 0;
if (!segment.connectedCitation()) return 0;
SolrConnector connector = segment.fulltext().getDefaultConnector();
@@ -891,7 +894,10 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
Map<byte[], CRV> ranking = new TreeMap<byte[], CRV>(Base64Order.enhancedCoder);
try {
// collect hosts from index which shall take part in citation computation
ReversibleScoreMap<String> hostscore = connector.getFacets(CollectionSchema.process_sxt.getSolrFieldName() + ":" + ProcessType.CITATION.toString(), 10000, CollectionSchema.host_s.getSolrFieldName()).get(CollectionSchema.host_s.getSolrFieldName());
ReversibleScoreMap<String> hostscore = connector.getFacets(
(harvestkey == null ? "" : CollectionSchema.harvestkey_s.getSolrFieldName() + ":\"" + harvestkey + "\" AND ") +
CollectionSchema.process_sxt.getSolrFieldName() + ":" + ProcessType.CITATION.toString(),
10000, CollectionSchema.host_s.getSolrFieldName()).get(CollectionSchema.host_s.getSolrFieldName());
if (hostscore == null) hostscore = new ClusteredScoreMap<String>();
// for each host, do a citation rank computation
for (String host: hostscore.keyList(true)) {
@@ -912,7 +918,10 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
}
// process all documents
BlockingQueue<SolrDocument> docs = connector.concurrentDocumentsByQuery(CollectionSchema.process_sxt.getSolrFieldName() + ":[* TO *]", 0, 10000, 60000, 50);
BlockingQueue<SolrDocument> docs = connector.concurrentDocumentsByQuery(
(harvestkey == null ? "" : CollectionSchema.harvestkey_s.getSolrFieldName() + ":\"" + harvestkey + "\" AND ") +
CollectionSchema.process_sxt.getSolrFieldName() + ":[* TO *]",
0, 10000, 60000, 50);
SolrDocument doc;
int proccount = 0, proccount_clickdepthchange = 0, proccount_referencechange = 0, proccount_citationchange = 0, proccount_uniquechange = 0;
Map<String, Long> hostExtentCache = new HashMap<String, Long>(); // a mapping from the host id to the number of documents which contain this host-id
@@ -961,8 +970,9 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
}
if (postprocessing_references(rrCache, doc, sid, url, hostExtentCache)) proccount_referencechange++;
// all processing steps checked, remove the processing tag
// all processing steps checked, remove the processing and harvesting key
sid.removeField(CollectionSchema.process_sxt.getSolrFieldName());
sid.removeField(CollectionSchema.harvestkey_s.getSolrFieldName());
// send back to index
//connector.deleteById(ASCII.String(id));

@@ -59,6 +59,7 @@ public enum CollectionSchema implements SchemaDeclaration {
references_exthosts_i(SolrType.num_integer, true, true, false, false, false, "number of external hosts which provide http references"),
clickdepth_i(SolrType.num_integer, true, true, false, false, false, "depth of web page according to number of clicks from the 'main' page, which is the page that appears if only the host is entered as url"),
process_sxt(SolrType.string, true, true, true, false, false, "needed (post-)processing steps on this metadata set"),
harvestkey_s(SolrType.string, true, true, false, false, false, "key from a harvest process (i.e. the crawl profile hash key) which is needed for near-realtime postprocessing. This shall be deleted as soon as postprocessing has been terminated."),
// optional but recommended, part of index distribution
load_date_dt(SolrType.date, true, true, false, false, false, "time when resource was loaded"),
@@ -231,6 +232,23 @@ public enum CollectionSchema implements SchemaDeclaration {
this.omitNorms = omitNorms;
this.searchable = searchable;
this.comment = comment;
// verify our naming scheme
String name = this.name();
int p = name.indexOf('_');
if (p > 0) {
String ext = name.substring(p + 1);
assert !ext.equals("i") || (type == SolrType.num_integer && !multiValued) : name;
assert !ext.equals("l") || (type == SolrType.num_long && !multiValued) : name;
assert !ext.equals("b") || (type == SolrType.bool && !multiValued) : name;
assert !ext.equals("s") || (type == SolrType.string && !multiValued) : name;
assert !ext.equals("sxt") || (type == SolrType.string && multiValued) : name;
assert !ext.equals("dt") || (type == SolrType.date && !multiValued) : name;
assert !ext.equals("t") || (type == SolrType.text_general && !multiValued) : name;
assert !ext.equals("coordinate") || (type == SolrType.coordinate && !multiValued) : name;
assert !ext.equals("txt") || (type == SolrType.text_general && multiValued) : name;
assert !ext.equals("val") || (type == SolrType.num_integer && multiValued) : name;
assert !ext.equals("d") || (type == SolrType.num_double && !multiValued) : name;
}
assert type.appropriateName(this) : "bad configuration: " + this.name();
}

@@ -117,7 +117,7 @@ public class WebgraphConfiguration extends SchemaConfiguration implements Serial
final Subgraph subgraph,
final DigestURL source, final ResponseHeader responseHeader, Map<String, Pattern> collections, int clickdepth_source,
final List<ImageEntry> images, final boolean inbound, final Collection<AnchorURL> links,
final IndexCell<CitationReference> citations) {
final IndexCell<CitationReference> citations, final String sourceName) {
boolean allAttr = this.isEmpty();
int target_order = 0;
boolean generalNofollow = responseHeader.get("X-Robots-Tag", "").indexOf("nofollow") >= 0;
@@ -284,6 +284,9 @@ public class WebgraphConfiguration extends SchemaConfiguration implements Serial
List<String> pr = new ArrayList<String>();
for (ProcessType t: processTypes) pr.add(t.name());
add(edge, WebgraphSchema.process_sxt, pr);
if (allAttr || contains(CollectionSchema.harvestkey_s)) {
add(edge, CollectionSchema.harvestkey_s, sourceName);
}
}
// add the edge to the subgraph
@@ -291,7 +294,7 @@ public class WebgraphConfiguration extends SchemaConfiguration implements Serial
}
}
public int postprocessing(Segment segment) {
public int postprocessing(final Segment segment, final String harvestkey) {
if (!this.contains(WebgraphSchema.process_sxt)) return 0;
if (!segment.connectedCitation()) return 0;
if (!segment.fulltext().writeToWebgraph()) return 0;
@@ -299,7 +302,10 @@ public class WebgraphConfiguration extends SchemaConfiguration implements Serial
// that means we must search for those entries.
connector.commit(true); // make sure that we have latest information that can be found
//BlockingQueue<SolrDocument> docs = index.fulltext().getSolr().concurrentQuery("*:*", 0, 1000, 60000, 10);
BlockingQueue<SolrDocument> docs = connector.concurrentDocumentsByQuery(WebgraphSchema.process_sxt.getSolrFieldName() + ":[* TO *]", 0, 100000, 60000, 50);
BlockingQueue<SolrDocument> docs = connector.concurrentDocumentsByQuery(
(harvestkey == null ? "" : CollectionSchema.harvestkey_s.getSolrFieldName() + ":\"" + harvestkey + "\" AND ") +
WebgraphSchema.process_sxt.getSolrFieldName() + ":[* TO *]",
0, 100000, 60000, 50);
SolrDocument doc;
String protocol, urlstub, id;
@@ -335,6 +341,7 @@ public class WebgraphConfiguration extends SchemaConfiguration implements Serial
// all processing steps checked, remove the processing tag
sid.removeField(WebgraphSchema.process_sxt.getSolrFieldName());
sid.removeField(WebgraphSchema.harvestkey_s.getSolrFieldName());
// send back to index
connector.add(sid);

@@ -36,6 +36,7 @@ public enum WebgraphSchema implements SchemaDeclaration {
load_date_dt(SolrType.date, true, true, false, false, false, "time when resource was loaded"),
collection_sxt(SolrType.string, true, true, true, false, false, "tags that are attached to crawls/index generation to separate the search result into user-defined subsets"),
process_sxt(SolrType.string, true, true, true, false, false, "needed (post-)processing steps on this metadata set, used i.e. for clickdepth-computation."),
harvestkey_s(SolrType.string, true, true, false, false, false, "key from a harvest process (i.e. the crawl profile hash key) which is needed for near-realtime postprocessing. This shall be deleted as soon as postprocessing has been terminated."),
// source information
source_id_s(SolrType.string, true, true, false, false, false, "primary key of document, the URL hash (source)"),
@@ -114,6 +115,23 @@ public enum WebgraphSchema implements SchemaDeclaration {
this.omitNorms = omitNorms;
this.searchable = searchable;
this.comment = comment;
// verify our naming scheme
String name = this.name();
int p = name.indexOf('_');
if (p > 0) {
String ext = name.substring(p + 1);
assert !ext.equals("i") || (type == SolrType.num_integer && !multiValued) : name;
assert !ext.equals("l") || (type == SolrType.num_long && !multiValued) : name;
assert !ext.equals("b") || (type == SolrType.bool && !multiValued) : name;
assert !ext.equals("s") || (type == SolrType.string && !multiValued) : name;
assert !ext.equals("sxt") || (type == SolrType.string && multiValued) : name;
assert !ext.equals("dt") || (type == SolrType.date && !multiValued) : name;
assert !ext.equals("t") || (type == SolrType.text_general && !multiValued) : name;
assert !ext.equals("coordinate") || (type == SolrType.coordinate && !multiValued) : name;
assert !ext.equals("txt") || (type == SolrType.text_general && multiValued) : name;
assert !ext.equals("val") || (type == SolrType.num_integer && multiValued) : name;
assert !ext.equals("d") || (type == SolrType.num_double && !multiValued) : name;
}
assert type.appropriateName(this) : "bad configuration: " + this.name();
}
