diff --git a/source/net/yacy/search/Switchboard.java b/source/net/yacy/search/Switchboard.java
index df543f3a3..8a48991e9 100644
--- a/source/net/yacy/search/Switchboard.java
+++ b/source/net/yacy/search/Switchboard.java
@@ -69,7 +69,6 @@ import java.util.SortedMap;
 import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -99,8 +98,6 @@ import net.yacy.cora.document.WordCache;
 import net.yacy.cora.document.analysis.Classification;
 import net.yacy.cora.federate.solr.Ranking;
 import net.yacy.cora.federate.solr.SchemaConfiguration;
-import net.yacy.cora.federate.solr.ProcessType;
-import net.yacy.cora.federate.solr.connector.AbstractSolrConnector;
 import net.yacy.cora.federate.solr.instance.RemoteInstance;
 import net.yacy.cora.federate.yacy.CacheStrategy;
 import net.yacy.cora.lod.JenaTripleStore;
@@ -2296,64 +2293,8 @@ public final class Switchboard extends serverSwitch {
 
         // if no crawl is running and processing is activated:
         // execute the (post-) processing steps for all entries that have a process tag assigned
-        if (this.crawlQueues.coreCrawlJobSize() == 0 && index.connectedCitation() && index.fulltext().getDefaultConfiguration().contains(CollectionSchema.process_sxt)) {
-            // that means we must search for those entries.
-            index.fulltext().getDefaultConnector().commit(true); // make sure that we have latest information that can be found
-            //BlockingQueue<SolrDocument> docs = index.fulltext().getSolr().concurrentQuery("*:*", 0, 1000, 60000, 10);
-            BlockingQueue<SolrDocument> docs = index.fulltext().getDefaultConnector().concurrentQuery(CollectionSchema.process_sxt.getSolrFieldName() + ":[* TO *]", 0, 10000, 60000, 50);
-
-            SolrDocument doc;
-            int proccount_clickdepth = 0;
-            int proccount_clickdepthchange = 0;
-            int proccount_referencechange = 0;
-            while ((doc = docs.take()) != AbstractSolrConnector.POISON_DOCUMENT) {
-                // for each to-be-processed entry work on the process tag
-                Collection<Object> proctags = doc.getFieldValues(CollectionSchema.process_sxt.getSolrFieldName());
-                for (Object tag: proctags) {
-                    String tagname = (String) tag;
-                    ProcessType tagtype = ProcessType.valueOf(tagname);
-
-                    // switch over tag types
-                    if (tagtype == ProcessType.CLICKDEPTH) {
-                        //proctags.remove(tag);
-                        if (index.fulltext().getDefaultConfiguration().contains(CollectionSchema.clickdepth_i)) {
-                            DigestURI url;
-                            try {
-                                // get new click depth and compare with old
-                                Integer oldclickdepth = (Integer) doc.getFieldValue(CollectionSchema.clickdepth_i.getSolrFieldName());
-                                url = new DigestURI((String) doc.getFieldValue(CollectionSchema.sku.getSolrFieldName()), ASCII.getBytes((String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName())));
-                                int clickdepth = CollectionConfiguration.getClickDepth(index.urlCitation(), url);
-                                if (oldclickdepth == null || oldclickdepth.intValue() != clickdepth) {
-                                    //log.logInfo("new clickdepth " + clickdepth + " for " + url.toNormalform(true));
-                                    proccount_clickdepthchange++;
-                                }
-                                SolrInputDocument sid = index.fulltext().getDefaultConfiguration().toSolrInputDocument(doc);
-                                sid.setField(CollectionSchema.clickdepth_i.getSolrFieldName(), clickdepth);
-
-                                // refresh the link count; it's 'cheap' to do this here
-                                if (index.fulltext().getDefaultConfiguration().contains(CollectionSchema.references_i)) {
-                                    Integer oldreferences = (Integer) doc.getFieldValue(CollectionSchema.references_i.getSolrFieldName());
-                                    int references = index.urlCitation().count(url.hash());
-                                    if (references > 0) {
-                                        if (oldreferences == null || oldreferences.intValue() != references) proccount_referencechange++;
-                                        sid.setField(CollectionSchema.references_i.getSolrFieldName(), references);
-                                    }
-                                }
-
-                                // remove the processing tag
-                                sid.removeField(CollectionSchema.process_sxt.getSolrFieldName());
-
-                                // send back to index
-                                index.fulltext().getDefaultConnector().add(sid);
-                                proccount_clickdepth++;
-                            } catch (Throwable e) {
-                                Log.logException(e);
-                            }
-                        }
-                    }
-                }
-            }
-            log.logInfo("cleanup_processing: re-calculated " + proccount_clickdepth + " new clickdepth values, " + proccount_clickdepthchange + " clickdepth values changed, " + proccount_referencechange + " reference-count values changed.");
+        if (this.crawlQueues.coreCrawlJobSize() == 0) {
+            index.fulltext().getDefaultConfiguration().postprocessing(index);
         }
 
         return true;
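
Note (illustration, not part of the patch): the loop removed above, and re-added as CollectionConfiguration.postprocessing() further down, consumes a BlockingQueue that concurrentQuery() fills from a background thread, and it stops when take() returns a sentinel ("poison") object compared by identity. A minimal, self-contained sketch of that producer/consumer idiom with illustrative names; YaCy's real sentinel is AbstractSolrConnector.POISON_DOCUMENT:

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    public class PoisonPillSketch {
        // sentinel with a unique identity; consumers compare with != , not equals()
        private static final String POISON = new String("POISON");

        public static void main(String[] args) throws InterruptedException {
            final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
            new Thread(new Runnable() {
                public void run() {
                    try {
                        for (String result : new String[] {"doc1", "doc2", "doc3"}) queue.put(result);
                        queue.put(POISON); // signal: no more results
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }).start();
            String doc;
            while ((doc = queue.take()) != POISON) { // same shape as the loop in the patch
                System.out.println("processing " + doc);
            }
        }
    }
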
diff --git a/source/net/yacy/search/index/Segment.java b/source/net/yacy/search/index/Segment.java
index b2e1bc7a2..358d3cf28 100644
--- a/source/net/yacy/search/index/Segment.java
+++ b/source/net/yacy/search/index/Segment.java
@@ -61,15 +61,18 @@ import net.yacy.document.Parser;
 import net.yacy.kelondro.data.citation.CitationReference;
 import net.yacy.kelondro.data.citation.CitationReferenceFactory;
 import net.yacy.kelondro.data.meta.DigestURI;
+import net.yacy.kelondro.data.meta.URIMetadataRow;
 import net.yacy.kelondro.data.word.Word;
 import net.yacy.kelondro.data.word.WordReference;
 import net.yacy.kelondro.data.word.WordReferenceFactory;
 import net.yacy.kelondro.data.word.WordReferenceRow;
+import net.yacy.kelondro.index.RowHandleSet;
 import net.yacy.kelondro.logging.Log;
 import net.yacy.kelondro.rwi.IndexCell;
 import net.yacy.kelondro.rwi.ReferenceContainer;
 import net.yacy.kelondro.rwi.ReferenceFactory;
 import net.yacy.kelondro.util.Bitfield;
+import net.yacy.kelondro.util.ByteBuffer;
 import net.yacy.kelondro.util.ISO639;
 import net.yacy.kelondro.util.MemoryControl;
 import net.yacy.repository.LoaderDispatcher;
@@ -201,6 +204,64 @@ public class Segment {
         return this.urlCitationIndex;
     }
 
+    /**
+     * compute the click level using the citation reference database
+     * @param url the url to be checked
+     * @return the clickdepth level or 999 if the root url cannot be found or a recursion limit is reached
+     * @throws IOException
+     */
+    public int getClickDepth(final DigestURI url) throws IOException {
+
+        final byte[] searchhash = url.hash();
+        RowHandleSet rootCandidates = url.getPossibleRootHashes();
+
+        RowHandleSet ignore = new RowHandleSet(URIMetadataRow.rowdef.primaryKeyLength, URIMetadataRow.rowdef.objectOrder, 100); // a set of urlhashes to be ignored. This is generated from all hashes that are seen during recursion to prevent endless loops
+        RowHandleSet levelhashes = new RowHandleSet(URIMetadataRow.rowdef.primaryKeyLength, URIMetadataRow.rowdef.objectOrder, 1); // all hashes of a clickdepth. The first call contains the target hash only and therefore just one entry
+        try {levelhashes.put(searchhash);} catch (SpaceExceededException e) {throw new IOException(e);}
+        int leveldepth = 0; // the recursion depth and therefore the result depth-1. Shall be 0 for the first call
+        final byte[] hosthash = new byte[6]; // the host of the url to be checked
+        System.arraycopy(searchhash, 6, hosthash, 0, 6);
+
+        long timeout = System.currentTimeMillis() + 10000;
+        for (int maxdepth = 0; maxdepth < 10 && System.currentTimeMillis() < timeout; maxdepth++) {
+
+            RowHandleSet checknext = new RowHandleSet(URIMetadataRow.rowdef.primaryKeyLength, URIMetadataRow.rowdef.objectOrder, 100);
+
+            // loop over all hashes at this clickdepth; the first call to this loop should contain only one hash and a leveldepth = 0
+            checkloop: for (byte[] urlhash: levelhashes) {
+
+                // get all the citations for this url and iterate
+                ReferenceContainer<CitationReference> references = this.urlCitationIndex.get(urlhash, null);
+                if (references == null || references.size() == 0) continue checkloop; // don't know
+                Iterator<CitationReference> i = references.entries();
+                nextloop: while (i.hasNext()) {
+                    CitationReference ref = i.next();
+                    if (ref == null) continue nextloop;
+                    byte[] u = ref.urlhash();
+
+                    // check ignore
+                    if (ignore.has(u)) continue nextloop;
+
+                    // check if this is from the same host
+                    if (!ByteBuffer.equals(u, 6, hosthash, 0, 6)) continue nextloop;
+
+                    // check if the url is a root url
+                    if (rootCandidates.has(u)) {
+                        return leveldepth + 1;
+                    }
+
+                    // step to next depth level
+                    try {checknext.put(u);} catch (SpaceExceededException e) {}
+                    try {ignore.put(u);} catch (SpaceExceededException e) {}
+                }
+            }
+            leveldepth++;
+            levelhashes = checknext;
+
+        }
+        return 999;
+    }
+
     public long URLCount() {
         return this.fulltext.collectionSize();
     }
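
Note (illustration, not part of the patch): getClickDepth() above performs an iterative breadth-first search backwards through the citation index, from the target url over the pages that cite it, until it reaches one of the url's possible root urls; the depth limit of 10, the 10 second timeout and the same-host filter (bytes 6..11 of a YaCy url hash encode the host) bound the search. The same walk on plain Java collections, with hypothetical names and without the timeout and host filter:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    public class ClickDepthSketch {

        /** incomingLinks maps a url (hash) to the urls that cite it, i.e. link to it */
        static int clickDepth(Map<String, List<String>> incomingLinks, String target, Set<String> rootCandidates) {
            Set<String> ignore = new HashSet<String>();              // urls seen so far, prevents endless loops
            Set<String> levelhashes = Collections.singleton(target); // the current BFS frontier
            for (int leveldepth = 0; leveldepth < 10; leveldepth++) { // same depth limit as the patch
                Set<String> checknext = new HashSet<String>();
                for (String urlhash : levelhashes) {
                    List<String> references = incomingLinks.get(urlhash);
                    if (references == null) continue; // don't know
                    for (String u : references) {
                        if (!ignore.add(u)) continue;                           // already visited
                        if (rootCandidates.contains(u)) return leveldepth + 1;  // a root page cites us
                        checknext.add(u);                                       // step to next depth level
                    }
                }
                levelhashes = checknext;
            }
            return 999; // root not reachable within the limits
        }

        public static void main(String[] args) {
            Map<String, List<String>> incoming = new HashMap<String, List<String>>();
            incoming.put("b", Arrays.asList("a"));    // a links to b
            incoming.put("a", Arrays.asList("root")); // the root page links to a
            // b is reached root -> a -> b, i.e. two clicks deep
            System.out.println(clickDepth(incoming, "b", Collections.singleton("root"))); // prints 2
        }
    }
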
diff --git a/source/net/yacy/search/schema/CollectionConfiguration.java b/source/net/yacy/search/schema/CollectionConfiguration.java
index 6df5eaf23..d3d0bceda 100644
--- a/source/net/yacy/search/schema/CollectionConfiguration.java
+++ b/source/net/yacy/search/schema/CollectionConfiguration.java
@@ -39,6 +39,7 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.BlockingQueue;
 
 import net.yacy.cora.document.ASCII;
 import net.yacy.cora.document.MultiProtocolURI;
@@ -48,11 +49,12 @@ import net.yacy.cora.federate.solr.SchemaConfiguration;
 import net.yacy.cora.federate.solr.FailType;
 import net.yacy.cora.federate.solr.ProcessType;
 import net.yacy.cora.federate.solr.SchemaDeclaration;
+import net.yacy.cora.federate.solr.connector.AbstractSolrConnector;
+import net.yacy.cora.federate.solr.connector.SolrConnector;
 import net.yacy.cora.protocol.Domains;
 import net.yacy.cora.protocol.HeaderFramework;
 import net.yacy.cora.protocol.ResponseHeader;
 import net.yacy.cora.util.CommonPattern;
-import net.yacy.cora.util.SpaceExceededException;
 import net.yacy.crawler.data.CrawlProfile;
 import net.yacy.crawler.retrieval.Response;
 import net.yacy.document.Condenser;
@@ -62,12 +64,10 @@ import net.yacy.document.parser.html.ImageEntry;
 import net.yacy.kelondro.data.citation.CitationReference;
 import net.yacy.kelondro.data.meta.DigestURI;
 import net.yacy.kelondro.data.meta.URIMetadataRow;
-import net.yacy.kelondro.index.RowHandleSet;
 import net.yacy.kelondro.logging.Log;
 import net.yacy.kelondro.rwi.IndexCell;
-import net.yacy.kelondro.rwi.ReferenceContainer;
 import net.yacy.kelondro.util.Bitfield;
-import net.yacy.kelondro.util.ByteBuffer;
+import net.yacy.search.index.Segment;
 
 import org.apache.solr.common.SolrDocument;
 import org.apache.solr.common.SolrInputDocument;
@@ -761,63 +761,75 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
         return doc;
     }
 
+
     /**
-     * compute the click level using the citation reference database
-     * @param citations the citation database
-     * @param searchhash the hash of the url to be checked
-     * @return the clickdepth level or 999 if the root url cannot be found or a recursion limit is reached
-     * @throws IOException
+     * post-processing steps for all entries that have a process tag assigned
+     * @param segment the index segment that provides the Solr connector and the citation index
      */
-    public static int getClickDepth(final IndexCell<CitationReference> citations, final DigestURI url) throws IOException {
-
-        final byte[] searchhash = url.hash();
-        RowHandleSet rootCandidates = url.getPossibleRootHashes();
+    public void postprocessing(Segment segment) {
+        if (!this.contains(CollectionSchema.process_sxt)) return;
+        if (!segment.connectedCitation()) return;
+        SolrConnector connector = segment.fulltext().getDefaultConnector();
+        // that means we must search for those entries.
+        connector.commit(true); // make sure that we have latest information that can be found
+        //BlockingQueue<SolrDocument> docs = index.fulltext().getSolr().concurrentQuery("*:*", 0, 1000, 60000, 10);
+        BlockingQueue<SolrDocument> docs = connector.concurrentQuery(CollectionSchema.process_sxt.getSolrFieldName() + ":[* TO *]", 0, 10000, 60000, 50);
 
-        RowHandleSet ignore = new RowHandleSet(URIMetadataRow.rowdef.primaryKeyLength, URIMetadataRow.rowdef.objectOrder, 100); // a set of urlhashes to be ignored. This is generated from all hashes that are seen during recursion to prevent enless loops
-        RowHandleSet levelhashes = new RowHandleSet(URIMetadataRow.rowdef.primaryKeyLength, URIMetadataRow.rowdef.objectOrder, 1); // all hashes of a clickdepth. The first call contains the target hash only and therefore just one entry
-        try {levelhashes.put(searchhash);} catch (SpaceExceededException e) {throw new IOException(e);}
-        int leveldepth = 0; // the recursion depth and therefore the result depth-1. Shall be 0 for the first call
-        final byte[] hosthash = new byte[6]; // the host of the url to be checked
-        System.arraycopy(searchhash, 6, hosthash, 0, 6);
-
-        long timeout = System.currentTimeMillis() + 10000;
-        for (int maxdepth = 0; maxdepth < 10 && System.currentTimeMillis() < timeout; maxdepth++) {
-
-            RowHandleSet checknext = new RowHandleSet(URIMetadataRow.rowdef.primaryKeyLength, URIMetadataRow.rowdef.objectOrder, 100);
-
-            // loop over all hashes at this clickdepth; the first call to this loop should contain only one hash and a leveldepth = 0
-            checkloop: for (byte[] urlhash: levelhashes) {
-
-                // get all the citations for this url and iterate
-                ReferenceContainer<CitationReference> references = citations.get(urlhash, null);
-                if (references == null || references.size() == 0) continue checkloop; // don't know
-                Iterator<CitationReference> i = references.entries();
-                nextloop: while (i.hasNext()) {
-                    CitationReference ref = i.next();
-                    if (ref == null) continue nextloop;
-                    byte[] u = ref.urlhash();
-
-                    // check ignore
-                    if (ignore.has(u)) continue nextloop;
-
-                    // check if this is from the same host
-                    if (!ByteBuffer.equals(u, 6, hosthash, 0, 6)) continue nextloop;
+        SolrDocument doc;
+        int proccount_clickdepth = 0;
+        int proccount_clickdepthchange = 0;
+        int proccount_referencechange = 0;
+        try {
+            while ((doc = docs.take()) != AbstractSolrConnector.POISON_DOCUMENT) {
+                // for each to-be-processed entry work on the process tag
+                Collection<Object> proctags = doc.getFieldValues(CollectionSchema.process_sxt.getSolrFieldName());
+                for (Object tag: proctags) {
+                    String tagname = (String) tag;
+                    ProcessType tagtype = ProcessType.valueOf(tagname);
 
-                    // check if the url is a root url
-                    if (rootCandidates.has(u)) {
-                        return leveldepth + 1;
+                    // switch over tag types
+                    if (tagtype == ProcessType.CLICKDEPTH) {
+                        //proctags.remove(tag);
+                        if (this.contains(CollectionSchema.clickdepth_i)) {
+                            DigestURI url;
+                            try {
+                                // get new click depth and compare with old
+                                Integer oldclickdepth = (Integer) doc.getFieldValue(CollectionSchema.clickdepth_i.getSolrFieldName());
+                                url = new DigestURI((String) doc.getFieldValue(CollectionSchema.sku.getSolrFieldName()), ASCII.getBytes((String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName())));
+                                int clickdepth = segment.getClickDepth(url);
+                                if (oldclickdepth == null || oldclickdepth.intValue() != clickdepth) {
+                                    //log.logInfo("new clickdepth " + clickdepth + " for " + url.toNormalform(true));
+                                    proccount_clickdepthchange++;
+                                }
+                                SolrInputDocument sid = this.toSolrInputDocument(doc);
+                                sid.setField(CollectionSchema.clickdepth_i.getSolrFieldName(), clickdepth);
+
+                                // refresh the link count; it's 'cheap' to do this here
+                                if (this.contains(CollectionSchema.references_i)) {
+                                    Integer oldreferences = (Integer) doc.getFieldValue(CollectionSchema.references_i.getSolrFieldName());
+                                    int references = segment.urlCitation().count(url.hash());
+                                    if (references > 0) {
+                                        if (oldreferences == null || oldreferences.intValue() != references) proccount_referencechange++;
+                                        sid.setField(CollectionSchema.references_i.getSolrFieldName(), references);
+                                    }
+                                }
+
+                                // remove the processing tag
+                                sid.removeField(CollectionSchema.process_sxt.getSolrFieldName());
+
+                                // send back to index
+                                connector.add(sid);
+                                proccount_clickdepth++;
+                            } catch (Throwable e) {
+                                Log.logException(e);
+                            }
+                        }
                     }
-
-                    // step to next depth level
-                    try {checknext.put(u);} catch (SpaceExceededException e) {}
-                    try {ignore.put(u);} catch (SpaceExceededException e) {}
                 }
             }
-            leveldepth++;
-            levelhashes = checknext;
-
+            Log.logInfo("CollectionConfiguration", "cleanup_processing: re-calculated " + proccount_clickdepth + " new clickdepth values, " + proccount_clickdepthchange + " clickdepth values changed, " + proccount_referencechange + " reference-count values changed.");
+        } catch (InterruptedException e) {
         }
-        return 999;
     }
 
     /**