enhanced and fixed postprocessing

pull/1/head
Michael Peter Christen 11 years ago
parent f86fe90eda
commit 9d5895f643

@@ -35,12 +35,13 @@ import net.yacy.cora.federate.yacy.CacheStrategy;
import net.yacy.cora.order.Base64Order;
import net.yacy.cora.protocol.ClientIdentification;
import net.yacy.cora.protocol.RequestHeader;
import net.yacy.cora.storage.HandleSet;
import net.yacy.cora.util.ConcurrentLog;
import net.yacy.kelondro.data.citation.CitationReference;
import net.yacy.kelondro.rwi.IndexCell;
import net.yacy.kelondro.rwi.ReferenceContainer;
import net.yacy.cora.util.SpaceExceededException;
import net.yacy.peers.graphics.WebStructureGraph;
import net.yacy.search.Switchboard;
import net.yacy.search.index.Segment.ReferenceReport;
import net.yacy.search.index.Segment.ReferenceReportCache;
import net.yacy.server.serverObjects;
import net.yacy.server.serverSwitch;
@@ -138,31 +139,28 @@ public class webstructure {
// citations
prop.put("citations", 1);
IndexCell<CitationReference> citationReferences = sb.index.urlCitation();
ReferenceContainer<CitationReference> citations = null;
// citationReferences.count(urlhash) would give the number of references, which is good for ranking
try {
citations = citationReferences != null ? citationReferences.get(urlhash, null) : null;
} catch (final IOException e) {
}
if (citations != null) {
ReferenceReportCache rrc = sb.index.getReferenceReportCache();
ReferenceReport rr = null;
try {rr = rrc.getReferenceReport(urlhash, true);} catch (IOException e) {}
if (rr != null && rr.getInternalCount() > 0 && rr.getExternalCount() > 0) {
prop.put("citations_count", 1);
prop.put("citations_documents", 1);
prop.put("citations_documents_0_hash", urlhash);
prop.put("citations_documents_0_count", citations.size());
prop.put("citations_documents_0_date", GenericFormatter.SHORT_DAY_FORMATTER.format(new Date(citations.lastWrote())));
prop.put("citations_documents_0_count", rr.getInternalCount() + rr.getExternalCount());
prop.put("citations_documents_0_date", GenericFormatter.SHORT_DAY_FORMATTER.format(new Date())); // superfluous?
prop.put("citations_documents_0_urle", url == null ? 0 : 1);
if (url != null) prop.putXML("citations_documents_0_urle_url", url.toNormalform(true));
int d = 0;
Iterator<CitationReference> i = citations.entries();
HandleSet ids = rr.getInternallIDs();
try {ids.putAll(rr.getExternalIDs());} catch (SpaceExceededException e) {}
Iterator<byte[]> i = ids.iterator();
while (i.hasNext()) {
CitationReference cr = i.next();
byte[] refhash = cr.urlhash();
byte[] refhash = i.next();
DigestURL refurl = authenticated ? sb.getURL(refhash) : null;
prop.put("citations_documents_0_anchors_" + d + "_urle", refurl == null ? 0 : 1);
if (refurl != null) prop.putXML("citations_documents_0_anchors_" + d + "_urle_url", refurl.toNormalform(true));
prop.put("citations_documents_0_anchors_" + d + "_urle_hash", refhash);
prop.put("citations_documents_0_anchors_" + d + "_urle_date", GenericFormatter.SHORT_DAY_FORMATTER.format(new Date(cr.lastModified())));
prop.put("citations_documents_0_anchors_" + d + "_urle_date", GenericFormatter.SHORT_DAY_FORMATTER.format(new Date())); // superfluous?
d++;
}
prop.put("citations_documents_0_count", d);

@@ -41,6 +41,7 @@ import net.yacy.cora.storage.Configuration;
import net.yacy.cora.storage.HandleSet;
import net.yacy.cora.util.ConcurrentLog;
import net.yacy.search.index.Segment;
import net.yacy.search.index.Segment.ClickdepthCache;
import net.yacy.search.index.Segment.ReferenceReport;
import net.yacy.search.index.Segment.ReferenceReportCache;
import net.yacy.search.schema.CollectionSchema;
@@ -178,13 +179,13 @@ public class SchemaConfiguration extends Configuration implements Serializable {
return changed;
}
public boolean postprocessing_clickdepth(Segment segment, SolrDocument doc, SolrInputDocument sid, DigestURL url, SchemaDeclaration clickdepthfield) {
public boolean postprocessing_clickdepth(ClickdepthCache clickdepthCache, SolrInputDocument sid, DigestURL url, SchemaDeclaration clickdepthfield, int maxtime) {
if (!this.contains(clickdepthfield)) return false;
// get new click depth and compare with old
Integer oldclickdepth = (Integer) doc.getFieldValue(clickdepthfield.getSolrFieldName());
Integer oldclickdepth = (Integer) sid.getFieldValue(clickdepthfield.getSolrFieldName());
if (oldclickdepth != null && oldclickdepth.intValue() != 999) return false; // we do not want to compute that again
try {
int clickdepth = segment.getClickDepth(url);
int clickdepth = clickdepthCache.getClickdepth(url, maxtime);
if (oldclickdepth == null || oldclickdepth.intValue() != clickdepth) {
sid.setField(clickdepthfield.getSolrFieldName(), clickdepth);
return true;
@@ -194,15 +195,15 @@ public class SchemaConfiguration extends Configuration implements Serializable {
return false;
}
public boolean postprocessing_references(ReferenceReportCache rrCache, SolrDocument doc, SolrInputDocument sid, DigestURL url, Map<String, Long> hostExtentCount) {
public boolean postprocessing_references(ReferenceReportCache rrCache, SolrInputDocument sid, DigestURL url, Map<String, Long> hostExtentCount) {
if (!(this.contains(CollectionSchema.references_i) ||
this.contains(CollectionSchema.references_internal_i) ||
this.contains(CollectionSchema.references_external_i) || this.contains(CollectionSchema.references_exthosts_i))) return false;
Integer all_old = doc == null ? null : (Integer) doc.getFieldValue(CollectionSchema.references_i.getSolrFieldName());
Integer internal_old = doc == null ? null : (Integer) doc.getFieldValue(CollectionSchema.references_internal_i.getSolrFieldName());
Integer external_old = doc == null ? null : (Integer) doc.getFieldValue(CollectionSchema.references_external_i.getSolrFieldName());
Integer exthosts_old = doc == null ? null : (Integer) doc.getFieldValue(CollectionSchema.references_exthosts_i.getSolrFieldName());
Integer hostextc_old = doc == null ? null : (Integer) doc.getFieldValue(CollectionSchema.host_extent_i.getSolrFieldName());
Integer all_old = sid == null ? null : (Integer) sid.getFieldValue(CollectionSchema.references_i.getSolrFieldName());
Integer internal_old = sid == null ? null : (Integer) sid.getFieldValue(CollectionSchema.references_internal_i.getSolrFieldName());
Integer external_old = sid == null ? null : (Integer) sid.getFieldValue(CollectionSchema.references_external_i.getSolrFieldName());
Integer exthosts_old = sid == null ? null : (Integer) sid.getFieldValue(CollectionSchema.references_exthosts_i.getSolrFieldName());
Integer hostextc_old = sid == null ? null : (Integer) sid.getFieldValue(CollectionSchema.host_extent_i.getSolrFieldName());
try {
ReferenceReport rr = rrCache.getReferenceReport(url.hash(), false);
List<String> internalIDs = new ArrayList<String>();
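
Both postprocessing_clickdepth and postprocessing_references now read the previous values from the SolrInputDocument that will be written back, and postprocessing_clickdepth only recomputes when the stored value is missing or still carries the 999 "unknown" placeholder. A minimal sketch of that guard; the field name is illustrative and only SolrJ's SolrInputDocument is assumed:

import org.apache.solr.common.SolrInputDocument;

public final class RecomputeGuard {
    // true when the clickdepth must still be computed: either no value is stored yet,
    // or the 999 placeholder marks it as "unknown"
    static boolean needsClickdepth(final SolrInputDocument sid, final String fieldName) {
        final Integer old = (Integer) sid.getFieldValue(fieldName);
        return old == null || old.intValue() == 999;
    }

    public static void main(final String[] args) {
        final SolrInputDocument sid = new SolrInputDocument();
        sid.setField("clickdepth_i", Integer.valueOf(999));
        System.out.println(needsClickdepth(sid, "clickdepth_i")); // true: placeholder, recompute
        sid.setField("clickdepth_i", Integer.valueOf(2));
        System.out.println(needsClickdepth(sid, "clickdepth_i")); // false: already computed
    }
}
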

@@ -187,6 +187,8 @@ import net.yacy.repository.Blacklist.BlacklistType;
import net.yacy.repository.FilterEngine;
import net.yacy.repository.LoaderDispatcher;
import net.yacy.search.index.Segment;
import net.yacy.search.index.Segment.ClickdepthCache;
import net.yacy.search.index.Segment.ReferenceReportCache;
import net.yacy.search.query.AccessTracker;
import net.yacy.search.query.SearchEvent;
import net.yacy.search.query.SearchEventCache;
@@ -2279,6 +2281,8 @@ public final class Switchboard extends serverSwitch {
// execute the (post-) processing steps for all entries that have a process tag assigned
if (!this.crawlJobIsPaused(SwitchboardConstants.CRAWLJOB_LOCAL_CRAWL)) {
int proccount = 0;
ReferenceReportCache rrCache = index.getReferenceReportCache();
ClickdepthCache clickdepthCache = index.getClickdepthCache(rrCache);
if (index.fulltext().getDefaultConfiguration().contains(CollectionSchema.harvestkey_s.getSolrFieldName())) {
Set<String> deletionCandidates = this.crawler.getFinishesProfiles(this.crawlQueues);
int cleanup = deletionCandidates.size();
@@ -2286,8 +2290,8 @@ public final class Switchboard extends serverSwitch {
// run postprocessing on these profiles
postprocessingRunning = true;
for (String profileHash: deletionCandidates) {
proccount += index.fulltext().getDefaultConfiguration().postprocessing(index, profileHash);
proccount += index.fulltext().getWebgraphConfiguration().postprocessing(index, profileHash);
proccount += index.fulltext().getDefaultConfiguration().postprocessing(index, rrCache, clickdepthCache, profileHash);
proccount += index.fulltext().getWebgraphConfiguration().postprocessing(index, clickdepthCache, profileHash);
}
this.crawler.cleanProfiles(deletionCandidates);
@@ -2297,8 +2301,8 @@ public final class Switchboard extends serverSwitch {
if (this.crawler.allCrawlsFinished(this.crawlQueues)) {
// run postprocessing on all profiles
postprocessingRunning = true;
proccount += index.fulltext().getDefaultConfiguration().postprocessing(index, null);
proccount += index.fulltext().getWebgraphConfiguration().postprocessing(index, null);
proccount += index.fulltext().getDefaultConfiguration().postprocessing(index, rrCache, clickdepthCache, null);
proccount += index.fulltext().getWebgraphConfiguration().postprocessing(index, clickdepthCache, null);
this.crawler.cleanProfiles(this.crawler.getActiveProfiles());
log.info("cleanup post-processed " + proccount + " documents");

@@ -35,6 +35,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.regex.Pattern;
@@ -51,6 +52,7 @@ import net.yacy.cora.federate.solr.connector.SolrConnector;
import net.yacy.cora.federate.yacy.CacheStrategy;
import net.yacy.cora.order.Base64Order;
import net.yacy.cora.order.ByteOrder;
import net.yacy.cora.order.NaturalOrder;
import net.yacy.cora.protocol.ClientIdentification;
import net.yacy.cora.protocol.ResponseHeader;
import net.yacy.cora.storage.HandleSet;
@@ -217,49 +219,48 @@ public class Segment {
* @return the clickdepth level or 999 if the root url cannot be found or a recursion limit is reached
* @throws IOException
*/
public int getClickDepth(final DigestURL url) throws IOException {
private int getClickDepth(ReferenceReportCache rrc, final DigestURL url, int maxtime) throws IOException {
final byte[] searchhash = url.hash();
RowHandleSet rootCandidates = getPossibleRootHashes(url);
RowHandleSet ignore = new RowHandleSet(URIMetadataRow.rowdef.primaryKeyLength, URIMetadataRow.rowdef.objectOrder, 100); // a set of urlhashes to be ignored. This is generated from all hashes that are seen during recursion to prevent endless loops
RowHandleSet levelhashes = new RowHandleSet(URIMetadataRow.rowdef.primaryKeyLength, URIMetadataRow.rowdef.objectOrder, 1); // all hashes of a clickdepth. The first call contains the target hash only and therefore just one entry
try {levelhashes.put(searchhash);} catch (final SpaceExceededException e) {throw new IOException(e);}
Set<byte[]> ignore = new TreeSet<byte[]>(NaturalOrder.naturalOrder); // a set of urlhashes to be ignored. This is generated from all hashes that are seen during recursion to prevent endless loops
Set<byte[]> levelhashes = new TreeSet<byte[]>(NaturalOrder.naturalOrder); // all hashes of a clickdepth. The first call contains the target hash only and therefore just one entry
levelhashes.add(searchhash);
int leveldepth = 0; // the recursion depth and therefore the result depth-1. Shall be 0 for the first call
final byte[] hosthash = new byte[6]; // the host of the url to be checked
System.arraycopy(searchhash, 6, hosthash, 0, 6);
long timeout = System.currentTimeMillis() + 1000;
long timeout = System.currentTimeMillis() + maxtime;
mainloop: for (int maxdepth = 0; maxdepth < 6 && System.currentTimeMillis() < timeout; maxdepth++) {
RowHandleSet checknext = new RowHandleSet(URIMetadataRow.rowdef.primaryKeyLength, URIMetadataRow.rowdef.objectOrder, 100);
Set<byte[]> checknext = new TreeSet<byte[]>(NaturalOrder.naturalOrder);
// loop over all hashes at this clickdepth; the first call to this loop should contain only one hash and a leveldepth = 0
checkloop: for (byte[] urlhash: levelhashes) {
// get all the citations for this url and iterate
ReferenceContainer<CitationReference> references = this.urlCitationIndex.get(urlhash, null);
if (references == null || references.size() == 0) continue checkloop; // don't know
Iterator<CitationReference> i = references.entries();
ReferenceReport rr = rrc.getReferenceReport(urlhash, false);
//ReferenceContainer<CitationReference> references = this.urlCitationIndex.get(urlhash, null);
if (rr == null || rr.getInternalCount() == 0) continue checkloop; // don't know
Iterator<byte[]> i = rr.getInternallIDs().iterator();
nextloop: while (i.hasNext()) {
CitationReference ref = i.next();
if (ref == null) continue nextloop;
byte[] u = ref.urlhash();
byte[] u = i.next();
if (u == null) continue nextloop;
// check if this is from the same host
if (!ByteBuffer.equals(u, 6, hosthash, 0, 6)) continue nextloop;
assert (ByteBuffer.equals(u, 6, hosthash, 0, 6));
// check ignore
if (ignore.has(u)) continue nextloop;
if (ignore.contains(u)) continue nextloop;
// check if the url is a root url
if (rootCandidates.has(u)) {
return leveldepth + 1;
}
// step to next depth level
try {checknext.put(u);} catch (final SpaceExceededException e) {}
try {ignore.put(u);} catch (final SpaceExceededException e) {}
checknext.add(u);
ignore.add(u);
}
if (System.currentTimeMillis() > timeout) break mainloop;
}
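
The rewritten getClickDepth walks the in-link graph breadth-first: starting from the target hash it follows same-host referrers level by level, stops at depth 6 or when the time budget runs out, and reports the depth at which a root page of the host is first reached (999 otherwise). A simplified, self-contained sketch of that search, using plain String hashes and a Map in place of the ReferenceReport sets:

import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public final class ClickDepthSketch {
    private static final int UNKNOWN = 999; // same sentinel value the method above returns

    static int clickDepth(final String targetHash, final Set<String> rootCandidates,
                          final Map<String, Set<String>> inlinks, final long maxtime) {
        final long timeout = System.currentTimeMillis() + maxtime;
        final Set<String> ignore = new HashSet<String>(); // hashes already seen, prevents endless loops
        Set<String> level = new HashSet<String>();        // all hashes at the current click depth
        level.add(targetHash);
        for (int depth = 0; depth < 6 && System.currentTimeMillis() < timeout; depth++) {
            final Set<String> next = new HashSet<String>();
            for (final String hash : level) {
                final Set<String> referrers = inlinks.get(hash);
                if (referrers == null) continue; // no known in-links for this hash
                for (final String referrer : referrers) {
                    if (ignore.contains(referrer)) continue;
                    if (rootCandidates.contains(referrer)) return depth + 1; // reached a root page
                    next.add(referrer);
                    ignore.add(referrer);
                }
            }
            level = next;
        }
        return UNKNOWN; // no root reachable within the depth and time limits
    }
}
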
@@ -284,6 +285,7 @@ public class Segment {
rootCandidates.put(new DigestURL(rootStub + "/default.htm").hash());
rootCandidates.put(new DigestURL(rootStub + "/default.html").hash());
rootCandidates.put(new DigestURL(rootStub + "/default.php").hash());
rootCandidates.optimize();
} catch (final Throwable e) {}
return rootCandidates;
}
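
getPossibleRootHashes collects the hashes of URLs that count as the host's root page: the host root plus common index file names (the lines above show default.htm, default.html and default.php; index.* variants are presumably added just before the shown lines). The new optimize() call compacts the RowHandleSet once all candidates are in. An illustrative, non-YaCy sketch of the candidate generation:

import java.util.ArrayList;
import java.util.List;

public final class RootCandidatesSketch {
    // hypothetical candidate list; the real code hashes each DigestURL into a RowHandleSet
    static List<String> rootCandidates(final String rootStub) {
        final List<String> candidates = new ArrayList<String>();
        candidates.add(rootStub + "/");
        for (final String name : new String[] {"index.html", "index.htm", "index.php",
                                               "default.html", "default.htm", "default.php"}) {
            candidates.add(rootStub + "/" + name);
        }
        return candidates;
    }

    public static void main(final String[] args) {
        System.out.println(rootCandidates("http://example.org"));
    }
}
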
@@ -311,6 +313,30 @@ public class Segment {
}
}
public ClickdepthCache getClickdepthCache(ReferenceReportCache rrc) {
return new ClickdepthCache(rrc);
}
public class ClickdepthCache {
ReferenceReportCache rrc;
Map<byte[], Integer> cache;
public ClickdepthCache(ReferenceReportCache rrc) {
this.rrc = rrc;
this.cache = new TreeMap<byte[], Integer>(Base64Order.enhancedCoder);
}
public int getClickdepth(final DigestURL url, int maxtime) throws IOException {
Integer clickdepth = cache.get(url.hash());
if (clickdepth != null) {
//ConcurrentLog.info("Segment", "get clickdepth of url " + url.toNormalform(true) + ": " + clickdepth + " CACHE HIT");
return clickdepth.intValue();
}
clickdepth = Segment.this.getClickDepth(this.rrc, url, maxtime);
//ConcurrentLog.info("Segment", "get clickdepth of url " + url.toNormalform(true) + ": " + clickdepth);
this.cache.put(url.hash(), clickdepth);
return clickdepth.intValue();
}
}
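
ClickdepthCache memoizes the result per URL hash. Note that the cache is a TreeMap with Base64Order.enhancedCoder as comparator: Java byte arrays compare by object identity, so a plain HashMap keyed by byte[] would never return a hit for a freshly computed hash. A small standalone demonstration of that pitfall, using a simple content comparator in place of Base64Order:

import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public final class ByteArrayKeyDemo {
    public static void main(final String[] args) {
        final byte[] k1 = "AAAAAAAAAAAA".getBytes();
        final byte[] k2 = "AAAAAAAAAAAA".getBytes(); // same content, different array object

        final Map<byte[], Integer> hashed = new HashMap<byte[], Integer>();
        hashed.put(k1, 1);
        System.out.println(hashed.get(k2)); // null: identity-based hashCode/equals, no hit

        final Map<byte[], Integer> ordered = new TreeMap<byte[], Integer>(new Comparator<byte[]>() {
            @Override
            public int compare(final byte[] a, final byte[] b) {
                final int n = Math.min(a.length, b.length);
                for (int i = 0; i < n; i++) {
                    final int c = (a[i] & 0xff) - (b[i] & 0xff);
                    if (c != 0) return c;
                }
                return a.length - b.length;
            }
        });
        ordered.put(k1, 1);
        System.out.println(ordered.get(k2)); // 1: content comparison, cache hit
    }
}
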
/**
* A ReferenceReport object is a container for all references to a specific url.
* The class stores the number of links from domain-internal and domain-external backlinks,
@@ -326,12 +352,29 @@ public class Segment {
this.externalHosts = new RowHandleSet(6, Base64Order.enhancedCoder, 0);
this.internalIDs = new RowHandleSet(12, Base64Order.enhancedCoder, 0);
this.externalIDs = new RowHandleSet(12, Base64Order.enhancedCoder, 0);
boolean useWebgraph = Segment.this.fulltext.writeToWebgraph();
if (useWebgraph) {
if (connectedCitation()) {
// read the references from the citation index
ReferenceContainer<CitationReference> references;
references = urlCitation().get(id, null);
if (references == null) return; // no references at all
Iterator<CitationReference> ri = references.entries();
while (ri.hasNext()) {
CitationReference ref = ri.next();
byte[] hh = ref.hosthash(); // host hash
if (ByteBuffer.equals(hh, 0, id, 6, 6)) {
internalIDs.put(ref.urlhash());
internal++;
} else {
externalHosts.put(hh);
externalIDs.put(ref.urlhash());
external++;
}
}
}
if ((internalIDs.size() == 0 || !connectedCitation()) && Segment.this.fulltext.writeToWebgraph()) {
// read the references from the webgraph
SolrConnector webgraph = Segment.this.fulltext.getWebgraphConnector();
webgraph.commit(true);
BlockingQueue<SolrDocument> docs = webgraph.concurrentDocumentsByQuery(WebgraphSchema.target_id_s.getSolrFieldName() + ":\"" + ASCII.String(id) + "\"", 0, 10000000, 600000, 100, WebgraphSchema.source_id_s.getSolrFieldName());
BlockingQueue<SolrDocument> docs = webgraph.concurrentDocumentsByQuery("{!raw f=" + WebgraphSchema.target_id_s.getSolrFieldName() + "}" + ASCII.String(id), 0, 10000000, 1000, 100, WebgraphSchema.source_id_s.getSolrFieldName());
SolrDocument doc;
try {
while ((doc = docs.take()) != AbstractSolrConnector.POISON_DOCUMENT) {
@@ -355,25 +398,6 @@ public class Segment {
ConcurrentLog.logException(e);
}
}
if ((!useWebgraph || (internalIDs.size() == 0 && externalIDs.size() == 0)) && connectedCitation()) {
// read the references from the citation index
ReferenceContainer<CitationReference> references;
references = urlCitation().get(id, null);
if (references == null) return; // no references at all
Iterator<CitationReference> ri = references.entries();
while (ri.hasNext()) {
CitationReference ref = ri.next();
byte[] hh = ref.hosthash(); // host hash
if (ByteBuffer.equals(hh, 0, id, 6, 6)) {
internalIDs.put(ref.urlhash());
internal++;
} else {
externalHosts.put(hh);
externalIDs.put(ref.urlhash());
external++;
}
}
}
}
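
The ReferenceReport constructor now reads from the citation index first and queries the webgraph core only when the citation index is disconnected or returned nothing; the block removed above had used the opposite order. A hypothetical sketch of that lookup order (method and variable names are illustrative, not YaCy API):

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public final class FallbackLookupSketch {
    // primary source: the citation index (if connected); secondary source: the webgraph core,
    // queried only when the primary attempt produced nothing
    static Set<String> references(final boolean citationConnected, final boolean webgraphEnabled,
                                  final Set<String> fromCitationIndex, final Set<String> fromWebgraph) {
        Set<String> refs = citationConnected ? fromCitationIndex : Collections.<String>emptySet();
        if (refs.isEmpty() && webgraphEnabled) refs = fromWebgraph;
        return refs;
    }

    public static void main(final String[] args) {
        final Set<String> webgraphRefs = new HashSet<String>();
        webgraphRefs.add("referrerHashA");
        // the citation index knows nothing about this id, so the webgraph supplies the referrers
        System.out.println(references(true, true, Collections.<String>emptySet(), webgraphRefs));
    }
}
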
public int getInternalCount() {
return this.internal;
@@ -627,7 +651,7 @@ public class Segment {
// ENRICH DOCUMENT WITH RANKING INFORMATION
if (this.connectedCitation()) {
this.fulltext.getDefaultConfiguration().postprocessing_references(this.getReferenceReportCache(), null, vector, url, null);
this.fulltext.getDefaultConfiguration().postprocessing_references(this.getReferenceReportCache(), vector, url, null);
}
// STORE TO SOLR
String error = null;

@@ -78,6 +78,7 @@ import net.yacy.kelondro.index.RowHandleMap;
import net.yacy.kelondro.rwi.ReferenceContainer;
import net.yacy.kelondro.util.Bitfield;
import net.yacy.search.index.Segment;
import net.yacy.search.index.Segment.ClickdepthCache;
import net.yacy.search.index.Segment.ReferenceReport;
import net.yacy.search.index.Segment.ReferenceReportCache;
import net.yacy.search.query.QueryParams;
@@ -884,13 +885,13 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
* @param urlCitation
* @return
*/
public int postprocessing(final Segment segment, String harvestkey) {
public int postprocessing(final Segment segment, ReferenceReportCache rrCache, ClickdepthCache clickdepthCache, String harvestkey) {
if (!this.contains(CollectionSchema.process_sxt)) return 0;
if (!segment.connectedCitation()) return 0;
if (!segment.connectedCitation() && !segment.fulltext().writeToWebgraph()) return 0;
SolrConnector collectionConnector = segment.fulltext().getDefaultConnector();
SolrConnector webgraphConnector = segment.fulltext().getWebgraphConnector();
collectionConnector.commit(true); // make sure that we have latest information that can be found
ReferenceReportCache rrCache = segment.getReferenceReportCache();
if (webgraphConnector != null) webgraphConnector.commit(true);
Map<byte[], CRV> ranking = new TreeMap<byte[], CRV>(Base64Order.enhancedCoder);
ReversibleScoreMap<String> hostscore = null;
try {
@@ -907,7 +908,7 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
// To do so, we first must collect all canonical links, find all references to them, get the anchor list of the documents and patch the citation reference of these links
String patchquery = CollectionSchema.host_s.getSolrFieldName() + ":" + host + " AND " + CollectionSchema.canonical_s.getSolrFieldName() + ":[* TO *]";
long patchquerycount = collectionConnector.getCountByQuery(patchquery);
BlockingQueue<SolrDocument> documents_with_canonical_tag = collectionConnector.concurrentDocumentsByQuery(patchquery, 0, 10000000, 60000L, 50,
BlockingQueue<SolrDocument> documents_with_canonical_tag = collectionConnector.concurrentDocumentsByQuery(patchquery, 0, 10000000, 600000, 100,
CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName(), CollectionSchema.canonical_s.getSolrFieldName());
SolrDocument doc_B;
int patchquerycountcheck = 0;
@@ -917,6 +918,7 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
DigestURL doc_C_url = new DigestURL((String) doc_B.getFieldValue(CollectionSchema.canonical_s.getSolrFieldName()));
byte[] doc_B_id = ASCII.getBytes(((String) doc_B.getFieldValue(CollectionSchema.id.getSolrFieldName())));
// we remove all references to B, because these become references to C
if (segment.connectedCitation()) {
ReferenceContainer<CitationReference> doc_A_ids = segment.urlCitation().remove(doc_B_id);
if (doc_A_ids == null) {
//System.out.println("*** document with canonical but no referrer: " + doc_B.getFieldValue(CollectionSchema.sku.getSolrFieldName()));
@@ -928,10 +930,13 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
CitationReference doc_A_citation = doc_A_ids_iterator.next();
segment.urlCitation().add(doc_C_url.hash(), doc_A_citation);
}
}
patchquerycountcheck++;
}
} catch (InterruptedException e) {
ConcurrentLog.logException(e);
} catch (SpaceExceededException e) {
ConcurrentLog.logException(e);
}
if (patchquerycount != patchquerycountcheck) ConcurrentLog.warn("CollectionConfiguration", "ambiguous patchquery count for host " + host + ": expected=" + patchquerycount + ", counted=" + patchquerycountcheck);
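
The host and ID look-ups below switch from a parsed field query (field:"value") to Solr's raw query parser ({!raw f=field}value), which matches the indexed term verbatim and therefore needs no query-parser escaping of host names or IDs. A small self-contained illustration of the two query strings; field and value are example data, not the schema:

public final class RawQueryExample {
    // old style: goes through the Lucene query parser and must be quoted/escaped
    static String parsedQuery(final String field, final String value) {
        return field + ":\"" + value + "\"";
    }

    // new style: Solr's raw query parser matches the stored term exactly, without analysis
    static String rawQuery(final String field, final String value) {
        return "{!raw f=" + field + "}" + value;
    }

    public static void main(final String[] args) {
        System.out.println(parsedQuery("source_host_s", "www.example.org")); // source_host_s:"www.example.org"
        System.out.println(rawQuery("source_host_s", "www.example.org"));    // {!raw f=source_host_s}www.example.org
    }
}
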
@@ -962,10 +967,10 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
for (String host: hostscore.keyList(true)) {
if (hostscore.get(host) <= 0) continue;
// select all webgraph edges and modify their cr value
String query = WebgraphSchema.source_host_s.getSolrFieldName() + ":\"" + host + "\"";
String query = "{!raw f=" + WebgraphSchema.source_host_s.getSolrFieldName() + "}" + host;
long count = webgraphConnector.getCountByQuery(query);
ConcurrentLog.info("CollectionConfiguration", "collecting " + count + " documents from the webgraph");
BlockingQueue<SolrDocument> docs = webgraphConnector.concurrentDocumentsByQuery(query, 0, 10000000, 60000, 50);
BlockingQueue<SolrDocument> docs = webgraphConnector.concurrentDocumentsByQuery(query, 0, 10000000, 600000, 100);
int countcheck = 0;
while ((doc = docs.take()) != AbstractSolrConnector.POISON_DOCUMENT) {
boolean changed = false;
@@ -983,9 +988,13 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
changed = true;
}
if (changed) try {
sid.removeField(WebgraphSchema.process_sxt.getSolrFieldName());
sid.removeField(WebgraphSchema.harvestkey_s.getSolrFieldName());
webgraphConnector.add(sid);
} catch (SolrException e) {
ConcurrentLog.logException(e);
} catch (IOException e) {
ConcurrentLog.logException(e);
}
countcheck++;
}
@@ -1007,7 +1016,7 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
try {
long count = collectionConnector.getCountByQuery(query);
ConcurrentLog.info("CollectionConfiguration", "collecting " + count + " documents from the collection for harvestkey " + harvestkey);
BlockingQueue<SolrDocument> docs = collectionConnector.concurrentDocumentsByQuery(query, 0, 10000, 60000, 50);
BlockingQueue<SolrDocument> docs = collectionConnector.concurrentDocumentsByQuery(query, 0, 10000000, 600000, 100);
int countcheck = 0;
while ((doc = docs.take()) != AbstractSolrConnector.POISON_DOCUMENT) {
// for each to-be-processed entry work on the process tag
@@ -1023,7 +1032,7 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
// switch over tag types
ProcessType tagtype = ProcessType.valueOf((String) tag);
if (tagtype == ProcessType.CLICKDEPTH) {
if (postprocessing_clickdepth(segment, doc, sid, url, CollectionSchema.clickdepth_i)) proccount_clickdepthchange++;
if (postprocessing_clickdepth(clickdepthCache, sid, url, CollectionSchema.clickdepth_i, 100)) proccount_clickdepthchange++;
}
if (tagtype == ProcessType.CITATION) {
@@ -1050,7 +1059,7 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
long hostExtentCount = segment.fulltext().getDefaultConnector().getCountByQuery(q.toString());
hostExtentCache.put(hosthash, hostExtentCount);
}
if (postprocessing_references(rrCache, doc, sid, url, hostExtentCache)) proccount_referencechange++;
if (postprocessing_references(rrCache, sid, url, hostExtentCache)) proccount_referencechange++;
// all processing steps checked, remove the processing and harvesting key
sid.removeField(CollectionSchema.process_sxt.getSolrFieldName());
@@ -1062,10 +1071,11 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
proccount++;
} catch (final Throwable e1) {
ConcurrentLog.logException(e1);
}
countcheck++;
}
if (count != countcheck) ConcurrentLog.warn("CollectionConfiguration", "ambiguous collection document count for harvestkey " + harvestkey + ": expected=" + count + ", counted=" + countcheck);
if (count != countcheck) ConcurrentLog.warn("CollectionConfiguration", "ambiguous collection document count for harvestkey " + harvestkey + ": expected=" + count + ", counted=" + countcheck); // big gap for harvestkey = null
ConcurrentLog.info("CollectionConfiguration", "cleanup_processing: re-calculated " + proccount+ " new documents, " +
proccount_clickdepthchange + " clickdepth changes, " +
proccount_referencechange + " reference-count changes, " +
@@ -1113,7 +1123,7 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
this.crt = new TreeMap<byte[], double[]>(Base64Order.enhancedCoder);
try {
// select all documents for each host
BlockingQueue<String> ids = connector.concurrentIDsByQuery("{!raw f=" + CollectionSchema.host_s.getSolrFieldName() + "}" + host, 0, 1000000, 600000);
BlockingQueue<String> ids = connector.concurrentIDsByQuery("{!raw f=" + CollectionSchema.host_s.getSolrFieldName() + "}" + host, 0, 10000000, 600000);
String id;
while ((id = ids.take()) != AbstractSolrConnector.POISON_ID) {
this.crt.put(ASCII.getBytes(id), new double[]{0.0d,0.0d}); //{old value, new value}
@@ -1226,7 +1236,7 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
if (ilc > 0) { // if (ilc == 0) then the reference report is wrong!
double[] d = this.crt.get(iid);
// d[] could be empty in some situations
if (d.length > 0) {
if (d != null && d.length > 0) {
ncr += d[0] / ilc;
} else {
// Output a warning that d[] is empty

@@ -57,6 +57,7 @@ import net.yacy.cora.util.CommonPattern;
import net.yacy.cora.util.ConcurrentLog;
import net.yacy.document.parser.html.ImageEntry;
import net.yacy.search.index.Segment;
import net.yacy.search.index.Segment.ClickdepthCache;
public class WebgraphConfiguration extends SchemaConfiguration implements Serializable {
@@ -295,17 +296,15 @@ public class WebgraphConfiguration extends SchemaConfiguration implements Serial
}
}
public int postprocessing(final Segment segment, final String harvestkey) {
public int postprocessing(final Segment segment, ClickdepthCache clickdepthCache, final String harvestkey) {
if (!this.contains(WebgraphSchema.process_sxt)) return 0;
if (!segment.connectedCitation()) return 0;
if (!segment.fulltext().writeToWebgraph()) return 0;
SolrConnector connector = segment.fulltext().getWebgraphConnector();
SolrConnector webgraphConnector = segment.fulltext().getWebgraphConnector();
// that means we must search for those entries.
connector.commit(true); // make sure that we have latest information that can be found
webgraphConnector.commit(true); // make sure that we have latest information that can be found
//BlockingQueue<SolrDocument> docs = index.fulltext().getSolr().concurrentQuery("*:*", 0, 1000, 60000, 10);
String query = (harvestkey == null || !this.contains(WebgraphSchema.harvestkey_s) ? "" : WebgraphSchema.harvestkey_s.getSolrFieldName() + ":\"" + harvestkey + "\" AND ") +
WebgraphSchema.process_sxt.getSolrFieldName() + ":[* TO *]";
BlockingQueue<SolrDocument> docs = connector.concurrentDocumentsByQuery(query, 0, 100000, 60000, 50);
String query = (harvestkey == null || !this.contains(WebgraphSchema.harvestkey_s) ? "" : WebgraphSchema.harvestkey_s.getSolrFieldName() + ":\"" + harvestkey + "\" AND ") + WebgraphSchema.process_sxt.getSolrFieldName() + ":[* TO *]";
BlockingQueue<SolrDocument> docs = webgraphConnector.concurrentDocumentsByQuery(query, 0, 10000000, 600000, 100);
SolrDocument doc;
String protocol, urlstub, id;
@@ -318,7 +317,7 @@ public class WebgraphConfiguration extends SchemaConfiguration implements Serial
try {
SolrInputDocument sid = this.toSolrInputDocument(doc);
//boolean changed = false;
for (Object tag: proctags) {
// switch over tag types
@@ -329,23 +328,30 @@ public class WebgraphConfiguration extends SchemaConfiguration implements Serial
urlstub = (String) doc.getFieldValue(WebgraphSchema.source_urlstub_s.getSolrFieldName());
id = (String) doc.getFieldValue(WebgraphSchema.source_id_s.getSolrFieldName());
url = new DigestURL(protocol + "://" + urlstub, ASCII.getBytes(id));
if (postprocessing_clickdepth(segment, doc, sid, url, WebgraphSchema.source_clickdepth_i)) proccount_clickdepthchange++;
if (postprocessing_clickdepth(clickdepthCache, sid, url, WebgraphSchema.source_clickdepth_i, 100)) {
proccount_clickdepthchange++;
//changed = true;
}
//ConcurrentLog.info("WebgraphConfiguration", "postprocessing webgraph source id " + id + ", url=" + protocol + "://" + urlstub + ", result: " + (changed ? "changed" : "not changed"));
}
if (this.contains(WebgraphSchema.target_protocol_s) && this.contains(WebgraphSchema.target_urlstub_s) && this.contains(WebgraphSchema.target_id_s)) {
protocol = (String) doc.getFieldValue(WebgraphSchema.target_protocol_s.getSolrFieldName());
urlstub = (String) doc.getFieldValue(WebgraphSchema.target_urlstub_s.getSolrFieldName());
id = (String) doc.getFieldValue(WebgraphSchema.target_id_s.getSolrFieldName());
url = new DigestURL(protocol + "://" + urlstub, ASCII.getBytes(id));
if (postprocessing_clickdepth(segment, doc, sid, url, WebgraphSchema.target_clickdepth_i)) proccount_clickdepthchange++;
if (postprocessing_clickdepth(clickdepthCache, sid, url, WebgraphSchema.target_clickdepth_i, 100)) {
proccount_clickdepthchange++;
//changed = true;
}
//ConcurrentLog.info("WebgraphConfiguration", "postprocessing webgraph target id " + id + ", url=" + protocol + "://" + urlstub + ", result: " + (changed ? "changed" : "not changed"));
}
}
}
// all processing steps checked, remove the processing tag
sid.removeField(WebgraphSchema.process_sxt.getSolrFieldName());
if (this.contains(WebgraphSchema.harvestkey_s)) sid.removeField(WebgraphSchema.harvestkey_s.getSolrFieldName());
// send back to index
connector.add(sid);
webgraphConnector.add(sid);
proccount++;
} catch (Throwable e1) {
Log.warn(WebgraphConfiguration.class, "postprocessing failed", e1);
