From da86f150ab8339a1ae516b732a40220578979069 Mon Sep 17 00:00:00 2001 From: Michael Peter Christen Date: Wed, 16 Apr 2014 21:34:28 +0200 Subject: [PATCH] - added a new Crawler Balancer: HostBalancer and HostQueues: This organizes all urls to be loaded in separate queues for each host. Each host separates the crawl depth into it's own queue. The primary rule for urls taken from any queue is, that the crawl depth is minimal. This produces a crawl depth which is identical to the clickdepth. Furthermorem the crawl is able to create a much better balancing over all hosts which is fair to all hosts that are in the queue. This process will create a very large number of files for wide crawls in the QUEUES folder: for each host a directory, for each crawl depth a file inside the directory. A crawl with maxdepth = 4 will be able to create 10.000s of files. To be able to use that many file readers, it was necessary to implement a new index data structure which opens the file only if an access is wanted (OnDemandOpenFileIndex). The usage of such on-demand file reader shall prevent that the number of file pointers is over the system limit, which is usually about 10.000 open files. Some parts of YaCy had to be adopted to handle the crawl depth number correctly. The logging and the IndexCreateQueues servlet had to be adopted to show the crawl queues differently, because the host name is attached to the port on the host to differentiate between http, https, and ftp services. --- defaults/yacy.logging | 1 + htroot/ConfigPortal.java | 8 +- htroot/Crawler_p.java | 4 + htroot/IndexCreateQueues_p.html | 2 +- htroot/IndexCreateQueues_p.java | 75 ++- .../net/yacy/cora/document/id/DigestURL.java | 23 +- source/net/yacy/crawler/Balancer.java | 8 + source/net/yacy/crawler/HostBalancer.java | 372 +++++++++++++ source/net/yacy/crawler/HostQueue.java | 516 ++++++++++++++---- source/net/yacy/crawler/HostQueues.java | 169 ------ source/net/yacy/crawler/LegacyBalancer.java | 6 + source/net/yacy/crawler/data/CrawlQueues.java | 10 +- source/net/yacy/crawler/data/Latency.java | 4 +- source/net/yacy/crawler/data/NoticedURL.java | 23 +- .../net/yacy/crawler/retrieval/Response.java | 2 +- source/net/yacy/document/Document.java | 2 +- source/net/yacy/document/TextParser.java | 15 +- .../document/importer/MediawikiImporter.java | 2 +- .../net/yacy/document/parser/bzipParser.java | 2 +- .../net/yacy/document/parser/gzipParser.java | 2 +- .../yacy/document/parser/sevenzipParser.java | 2 +- .../net/yacy/document/parser/tarParser.java | 2 +- .../net/yacy/document/parser/zipParser.java | 2 +- .../kelondro/index/OnDemandOpenFileIndex.java | 420 ++++++++++++++ .../yacy/kelondro/table/ChunkIterator.java | 1 + source/net/yacy/kelondro/table/Table.java | 56 +- .../net/yacy/repository/LoaderDispatcher.java | 2 +- source/net/yacy/search/Switchboard.java | 1 + .../net/yacy/search/index/DocumentIndex.java | 2 +- source/net/yacy/search/index/Segment.java | 10 +- .../net/yacy/search/query/QueryModifier.java | 9 +- .../schema/CollectionConfiguration.java | 3 +- 32 files changed, 1372 insertions(+), 384 deletions(-) create mode 100644 source/net/yacy/crawler/HostBalancer.java delete mode 100644 source/net/yacy/crawler/HostQueues.java create mode 100644 source/net/yacy/kelondro/index/OnDemandOpenFileIndex.java diff --git a/defaults/yacy.logging b/defaults/yacy.logging index ccb88a330..bfca8348e 100644 --- a/defaults/yacy.logging +++ b/defaults/yacy.logging @@ -29,6 +29,7 @@ sun.net.www.protocol.http.HttpURLConnection.level = INFO # Tray sun.awt.level = OFF java.awt.level = OFF +TABLE.level = INFO # List of global handlers handlers = java.util.logging.FileHandler,\ diff --git a/htroot/ConfigPortal.java b/htroot/ConfigPortal.java index 83abe30fa..f0dd5888e 100644 --- a/htroot/ConfigPortal.java +++ b/htroot/ConfigPortal.java @@ -29,6 +29,7 @@ import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; +import java.net.MalformedURLException; import java.util.Properties; import net.yacy.cora.document.id.DigestURL; @@ -99,7 +100,12 @@ public class ConfigPortal { String excludehosts = post.get("search.excludehosts", ""); sb.setConfig("search.excludehosts", excludehosts); - sb.setConfig("search.excludehosth", DigestURL.hosthashes(excludehosts)); + try { + sb.setConfig("search.excludehosth", DigestURL.hosthashes(excludehosts)); + } catch (MalformedURLException e) { + ConcurrentLog.logException(e); + sb.setConfig("search.excludehosth", ""); + } } if (post.containsKey("searchpage_default")) { // load defaults from defaults/yacy.init file diff --git a/htroot/Crawler_p.java b/htroot/Crawler_p.java index 1253cc502..691af66c4 100644 --- a/htroot/Crawler_p.java +++ b/htroot/Crawler_p.java @@ -339,16 +339,20 @@ public class Crawler_p { Set hosthashes = new HashSet(); for (DigestURL u: rootURLs) hosthashes.add(u.hosthash()); sb.index.fulltext().deleteStaleDomainHashes(hosthashes, deleteageDate); + sb.crawlQueues.removeHosts(hosthashes); } } else if (subPath) { siteFilter = CrawlProfile.subpathFilter(rootURLs); if (deleteold) { + Set hosthashes = new HashSet(); for (DigestURL u: rootURLs) { + hosthashes.add(u.hosthash()); String basepath = u.toNormalform(true); if (!basepath.endsWith("/")) {int p = basepath.lastIndexOf("/"); if (p > 0) basepath = basepath.substring(0, p + 1);} int count = sb.index.fulltext().remove(basepath, deleteageDate); if (count > 0) ConcurrentLog.info("Crawler_p", "deleted " + count + " documents for host " + u.getHost()); } + sb.crawlQueues.removeHosts(hosthashes); } } if (CrawlProfile.MATCH_ALL_STRING.equals(newcrawlingMustMatch)) { diff --git a/htroot/IndexCreateQueues_p.html b/htroot/IndexCreateQueues_p.html index 333856553..2de295770 100644 --- a/htroot/IndexCreateQueues_p.html +++ b/htroot/IndexCreateQueues_p.html @@ -65,7 +65,7 @@ #[hostcount]# #[hostdelta]# -  #[hostname]# +  #[hostnameport]# #{list}# diff --git a/htroot/IndexCreateQueues_p.java b/htroot/IndexCreateQueues_p.java index 40b832bdf..3adf3fa04 100644 --- a/htroot/IndexCreateQueues_p.java +++ b/htroot/IndexCreateQueues_p.java @@ -1,15 +1,19 @@ +import java.net.MalformedURLException; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Set; import java.util.regex.Pattern; import java.util.regex.PatternSyntaxException; import net.yacy.cora.document.encoding.ASCII; +import net.yacy.cora.document.id.DigestURL; import net.yacy.cora.protocol.RequestHeader; import net.yacy.cora.util.ConcurrentLog; import net.yacy.crawler.CrawlSwitchboard; @@ -75,30 +79,45 @@ public class IndexCreateQueues_p { } } } else { - // iterating through the list of URLs - final Iterator iter = sb.crawlQueues.noticeURL.iterator(stackType); - Request entry; - final List removehashes = new ArrayList(); - while (iter.hasNext()) { - if ((entry = iter.next()) == null) continue; - String value = null; - - location: switch (option) { - case URL: value = (entry.url() == null) ? null : entry.url().toString(); break location; - case ANCHOR: value = entry.name(); break location; - case DEPTH: value = Integer.toString(entry.depth()); break location; - case INITIATOR: - value = (entry.initiator() == null || entry.initiator().length == 0) ? "proxy" : ASCII.String(entry.initiator()); - break location; - case MODIFIED: value = daydate(entry.appdate()); break location; - default: value = null; break location; + int removedByHosts = 0; + if (option == URL && deletepattern.startsWith(".*") && deletepattern.endsWith(".*")) { + // try to delete that using the host name + Set hosthashes = new HashSet(); + String hn = deletepattern.substring(2, deletepattern.length() - 2); + try { + hosthashes.add(DigestURL.hosthash(hn, hn.startsWith("ftp") ? 21 : 80)); + hosthashes.add(DigestURL.hosthash(hn, 443)); + removedByHosts = sb.crawlQueues.removeHosts(hosthashes); + } catch (MalformedURLException e) { } - - if (value != null && compiledPattern.matcher(value).matches()) removehashes.add(entry.url().hash()); } - ConcurrentLog.info("IndexCreateQueues_p", "created a remove list with " + removehashes.size() + " entries for pattern '" + deletepattern + "'"); - for (final byte[] b: removehashes) { - sb.crawlQueues.noticeURL.removeByURLHash(b); + + if (removedByHosts == 0) { + // iterating through the list of URLs + final Iterator iter = sb.crawlQueues.noticeURL.iterator(stackType); + Request entry; + final List removehashes = new ArrayList(); + while (iter.hasNext()) { + if ((entry = iter.next()) == null) continue; + String value = null; + + location: switch (option) { + case URL: value = (entry.url() == null) ? null : entry.url().toString(); break location; + case ANCHOR: value = entry.name(); break location; + case DEPTH: value = Integer.toString(entry.depth()); break location; + case INITIATOR: + value = (entry.initiator() == null || entry.initiator().length == 0) ? "proxy" : ASCII.String(entry.initiator()); + break location; + case MODIFIED: value = daydate(entry.appdate()); break location; + default: value = null; break location; + } + + if (value != null && compiledPattern.matcher(value).matches()) removehashes.add(entry.url().hash()); + } + ConcurrentLog.info("IndexCreateQueues_p", "created a remove list with " + removehashes.size() + " entries for pattern '" + deletepattern + "'"); + for (final byte[] b: removehashes) { + sb.crawlQueues.noticeURL.removeByURLHash(b); + } } } } catch (final PatternSyntaxException e) { @@ -121,13 +140,17 @@ public class IndexCreateQueues_p { int hc = 0; for (Map.Entry host: hosts.entrySet()) { - prop.putHTML("crawler_host_" + hc + "_hostname", host.getKey()); + String hostnameport = host.getKey(); + int p = hostnameport.lastIndexOf(':'); + String hostname = p < 0 ? hostnameport : hostnameport.substring(0, p); + prop.putHTML("crawler_host_" + hc + "_hostnameport", hostnameport); + prop.putHTML("crawler_host_" + hc + "_hostname", hostname); prop.put("crawler_host_" + hc + "_embed", embed ? 1 : 0); prop.put("crawler_host_" + hc + "_urlsPerHost", urlsPerHost); prop.putHTML("crawler_host_" + hc + "_queuename", stackType.name()); prop.put("crawler_host_" + hc + "_hostcount", host.getValue()[0]); prop.put("crawler_host_" + hc + "_hostdelta", host.getValue()[1] == Integer.MIN_VALUE ? "not accessed" : Integer.toString(host.getValue()[1])); - List domainStackReferences = sb.crawlQueues.noticeURL.getDomainStackReferences(stackType, host.getKey(), urlsPerHost, 10000); + List domainStackReferences = sb.crawlQueues.noticeURL.getDomainStackReferences(stackType, hostname, urlsPerHost, 10000); Seed initiator; String profileHandle; @@ -138,9 +161,11 @@ public class IndexCreateQueues_p { initiator = sb.peers.getConnected(request.initiator() == null ? "" : ASCII.String(request.initiator())); profileHandle = request.profileHandle(); profileEntry = profileHandle == null ? null : sb.crawler.getActive(profileHandle.getBytes()); + String depthString = Integer.toString(request.depth()); + while (depthString.length() < 4) depthString = "0" + depthString; prop.putHTML("crawler_host_" + hc + "_list_" + count + "_initiator", ((initiator == null) ? "proxy" : initiator.getName()) ); prop.put("crawler_host_" + hc + "_list_" + count + "_profile", ((profileEntry == null) ? "unknown" : profileEntry.collectionName())); - prop.put("crawler_host_" + hc + "_list_" + count + "_depth", request.depth()); + prop.putHTML("crawler_host_" + hc + "_list_" + count + "_depth", depthString); prop.put("crawler_host_" + hc + "_list_" + count + "_modified", daydate(request.appdate()) ); prop.putHTML("crawler_host_" + hc + "_list_" + count + "_anchor", request.name()); prop.putHTML("crawler_host_" + hc + "_list_" + count + "_url", request.url().toNormalform(true)); diff --git a/source/net/yacy/cora/document/id/DigestURL.java b/source/net/yacy/cora/document/id/DigestURL.java index 887aa8a7d..74a05e2b2 100644 --- a/source/net/yacy/cora/document/id/DigestURL.java +++ b/source/net/yacy/cora/document/id/DigestURL.java @@ -36,7 +36,6 @@ import net.yacy.cora.order.Digest; import net.yacy.cora.protocol.Domains; import net.yacy.cora.util.ByteArray; import net.yacy.cora.util.CommonPattern; -import net.yacy.cora.util.ConcurrentLog; /** * URI-object providing YaCy-hash computation @@ -57,18 +56,19 @@ public class DigestURL extends MultiProtocolURL implements Serializable { /** * Shortcut, calculate hash for shorted url/hostname * @param host + * @param port * @return */ - public static String hosthash(final String host) { + public static String hosthash(final String host, final int port) throws MalformedURLException { String h = host; - if (!h.startsWith("http://")) h = "http://" + h; - DigestURL url = null; - try { - url = new DigestURL(h); - } catch (final MalformedURLException e) { - ConcurrentLog.logException(e); - return null; + if (h.indexOf("//") < 0) { + if (port == 80 || port == 8080 || port == 8090) h = "http://" + h; + else if (port == 443) h = "https://" + h; + else if (port == 21 || port == 2121) h = "ftp://" + h; + else if (port > 999) h = "http://" + h + ":" + port; + else h = "http://" + h; } + DigestURL url = new DigestURL(h); return (url == null) ? null : ASCII.String(url.hash(), 6, 6); } @@ -77,15 +77,16 @@ public class DigestURL extends MultiProtocolURL implements Serializable { * the list is separated by comma * @param hostlist * @return list of host hashes without separation + * @throws MalformedURLException */ - public static String hosthashes(final String hostlist) { + public static String hosthashes(final String hostlist) throws MalformedURLException { String[] hs = CommonPattern.COMMA.split(hostlist); StringBuilder sb = new StringBuilder(hostlist.length()); for (String h: hs) { if (h == null) continue; h = h.trim(); if (h.isEmpty()) continue; - h = hosthash(h); + h = hosthash(h, h.startsWith("ftp.") ? 21 : 80); if (h == null || h.length() != 6) continue; sb.append(h); } diff --git a/source/net/yacy/crawler/Balancer.java b/source/net/yacy/crawler/Balancer.java index a8b236579..35b70aecb 100644 --- a/source/net/yacy/crawler/Balancer.java +++ b/source/net/yacy/crawler/Balancer.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import net.yacy.cora.storage.HandleSet; import net.yacy.cora.util.SpaceExceededException; @@ -62,6 +63,13 @@ public interface Balancer { */ public int removeAllByProfileHandle(final String profileHandle, final long timeout) throws IOException, SpaceExceededException; + /** + * delete all urls which are stored for given host hashes + * @param hosthashes + * @return number of deleted urls + */ + public int removeAllByHostHashes(final Set hosthashes); + /** * @param urlHashes, a list of hashes that shall be removed * @return number of entries that had been removed diff --git a/source/net/yacy/crawler/HostBalancer.java b/source/net/yacy/crawler/HostBalancer.java new file mode 100644 index 000000000..49e234f9f --- /dev/null +++ b/source/net/yacy/crawler/HostBalancer.java @@ -0,0 +1,372 @@ +/** + * HostQueues + * Copyright 2013 by Michael Christen + * First released 24.09.2013 at http://yacy.net + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program in the file lgpl21.txt + * If not, see . + */ + +package net.yacy.crawler; + +import java.io.File; +import java.io.IOException; +import java.net.MalformedURLException; +import java.util.ArrayList; +import java.util.ConcurrentModificationException; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; + +import net.yacy.cora.document.encoding.ASCII; +import net.yacy.cora.document.id.DigestURL; +import net.yacy.cora.order.Base64Order; +import net.yacy.cora.protocol.ClientIdentification; +import net.yacy.cora.storage.HandleSet; +import net.yacy.cora.util.ConcurrentLog; +import net.yacy.cora.util.SpaceExceededException; +import net.yacy.crawler.data.CrawlProfile; +import net.yacy.crawler.data.Latency; +import net.yacy.crawler.retrieval.Request; +import net.yacy.crawler.robots.RobotsTxt; +import net.yacy.kelondro.data.word.Word; +import net.yacy.kelondro.index.RowHandleSet; + +/** + * wrapper for single HostQueue queues; this is a collection of such queues. + * All these queues are stored in a common directory for the queue stacks. + * + * ATTENTION: the order of urls returned by this balancer must strictly follow the clickdepth order. + * That means that all links from a given host must be returned from the lowest crawldepth only. + * The crawldepth is interpreted as clickdepth and the crawler is producing that semantic using a + * correct crawl ordering. + */ +public class HostBalancer implements Balancer { + + + private final File hostsPath; + private final boolean exceed134217727; + private final Map queues; + private final Set roundRobinHostHashes; + private HandleSet urlHashDoubleCheck; + + public HostBalancer( + final File hostsPath, + final boolean exceed134217727) { + this.hostsPath = hostsPath; + this.exceed134217727 = exceed134217727; + this.urlHashDoubleCheck = new RowHandleSet(Word.commonHashLength, Word.commonHashOrder, 0); + + // create a stack for newly entered entries + if (!(hostsPath.exists())) hostsPath.mkdirs(); // make the path + this.queues = new ConcurrentHashMap(); + String[] list = this.hostsPath.list(); + for (String address: list) try { + File queuePath = new File(this.hostsPath, address); + HostQueue queue = new HostQueue(queuePath, this.queues.size() > 100, this.exceed134217727); + if (queue.size() == 0) { + queue.close(); + queuePath.delete(); + } else { + this.queues.put(DigestURL.hosthash(queue.getHost(), queue.getPort()), queue); + } + } catch (MalformedURLException e) { + ConcurrentLog.logException(e); + } + this.roundRobinHostHashes = new HashSet(); + } + + @Override + public synchronized void close() { + if (this.urlHashDoubleCheck != null) { + this.urlHashDoubleCheck.clear(); + } + for (HostQueue queue: this.queues.values()) queue.close(); + this.queues.clear(); + } + + @Override + public void clear() { + if (this.urlHashDoubleCheck != null) { + this.urlHashDoubleCheck.clear(); + } + for (HostQueue queue: this.queues.values()) queue.clear(); + this.queues.clear(); + } + + @Override + public Request get(final byte[] urlhash) throws IOException { + String hosthash = ASCII.String(urlhash, 6, 6); + HostQueue queue = this.queues.get(hosthash); + if (queue == null) return null; + return queue.get(urlhash); + } + + @Override + public int removeAllByProfileHandle(final String profileHandle, final long timeout) throws IOException, SpaceExceededException { + int c = 0; + for (HostQueue queue: this.queues.values()) c += queue.removeAllByProfileHandle(profileHandle, timeout); + return c; + } + + /** + * delete all urls which are stored for given host hashes + * @param hosthashes + * @return number of deleted urls + */ + @Override + public int removeAllByHostHashes(final Set hosthashes) { + int c = 0; + for (String h: hosthashes) { + HostQueue hq = this.queues.get(h); + if (hq != null) c += hq.removeAllByHostHashes(hosthashes); + } + // remove from cache + Iterator i = this.urlHashDoubleCheck.iterator(); + ArrayList deleteHashes = new ArrayList(); + while (i.hasNext()) { + String h = ASCII.String(i.next()); + if (hosthashes.contains(h.substring(6))) deleteHashes.add(h); + } + for (String h: deleteHashes) this.urlHashDoubleCheck.remove(ASCII.getBytes(h)); + return c; + } + + @Override + public synchronized int remove(final HandleSet urlHashes) throws IOException { + Map removeLists = new ConcurrentHashMap(); + for (byte[] urlhash: urlHashes) { + this.urlHashDoubleCheck.remove(urlhash); + String hosthash = ASCII.String(urlhash, 6, 6); + HandleSet removeList = removeLists.get(hosthash); + if (removeList == null) { + removeList = new RowHandleSet(Word.commonHashLength, Base64Order.enhancedCoder, 100); + removeLists.put(hosthash, removeList); + } + try {removeList.put(urlhash);} catch (SpaceExceededException e) {} + } + int c = 0; + for (Map.Entry entry: removeLists.entrySet()) { + HostQueue queue = this.queues.get(entry.getKey()); + if (queue != null) c += queue.remove(entry.getValue()); + } + return c; + } + + @Override + public boolean has(final byte[] urlhashb) { + if (this.urlHashDoubleCheck.has(urlhashb)) return true; + String hosthash = ASCII.String(urlhashb, 6, 6); + HostQueue queue = this.queues.get(hosthash); + if (queue == null) return false; + return queue.has(urlhashb); + } + + @Override + public int size() { + int c = 0; + for (HostQueue queue: this.queues.values()) c += queue.size(); + return c; + } + + @Override + public boolean isEmpty() { + for (HostQueue queue: this.queues.values()) if (!queue.isEmpty()) return false; + return true; + } + + /** + * push a request to one of the host queues. If the queue does not exist, it is created + * @param entry + * @param profile + * @param robots + * @return null if everything is ok or a string with an error message if the push is not allowed according to the crawl profile or robots + * @throws IOException + * @throws SpaceExceededException + */ + @Override + public synchronized String push(final Request entry, CrawlProfile profile, final RobotsTxt robots) throws IOException, SpaceExceededException { + if (this.has(entry.url().hash())) return "double occurrence"; + this.urlHashDoubleCheck.put(entry.url().hash()); + String hosthash = ASCII.String(entry.url().hash(), 6, 6); + HostQueue queue = this.queues.get(hosthash); + if (queue == null) { + queue = new HostQueue(this.hostsPath, entry.url().getHost(), entry.url().getPort(), this.queues.size() > 100, this.exceed134217727); + this.queues.put(hosthash, queue); + } + return queue.push(entry, profile, robots); + } + + /** + * get the next entry in this crawl queue in such a way that the domain access time delta is maximized + * and always above the given minimum delay time. An additional delay time is computed using the robots.txt + * crawl-delay time which is always respected. In case the minimum time cannot ensured, this method pauses + * the necessary time until the url is released and returned as CrawlEntry object. In case that a profile + * for the computed Entry does not exist, null is returned + * @param delay true if the requester demands forced delays using explicit thread sleep + * @param profile + * @return a url in a CrawlEntry object + * @throws IOException + * @throws SpaceExceededException + */ + @Override + public synchronized Request pop(boolean delay, CrawlSwitchboard cs, RobotsTxt robots) throws IOException { + tryagain: while (true) try { + if (this.roundRobinHostHashes.size() == 0) { + // select all queues on the lowest crawldepth level; that means: first look for the lowest level + int lowestCrawldepth = Integer.MAX_VALUE; + for (HostQueue hq: this.queues.values()) { + int lsd = hq.getLowestStackDepth(); + if (lsd < lowestCrawldepth) lowestCrawldepth = lsd; + } + // now add only such stacks which have the lowest level + for (Map.Entry entry: this.queues.entrySet()) { + if (entry.getValue().getLowestStackDepth() == lowestCrawldepth) this.roundRobinHostHashes.add(entry.getKey()); + } + // emergency case if this fails + if (this.roundRobinHostHashes.size() == 0) { + //assert this.queues.size() == 0; // thats the only case where that should happen + this.roundRobinHostHashes.addAll(this.queues.keySet()); + } + // if there are stacks with less than 10 entries, remove all stacks with more than 10 entries + // this shall kick out small stacks to prevent that too many files are opened for very wide crawls + boolean smallStacksExist = false; + smallsearch: for (String s: this.roundRobinHostHashes) { + HostQueue hq = this.queues.get(s); + if (hq != null && hq.size() <= 10) {smallStacksExist = true; break smallsearch;} + } + if (smallStacksExist) { + Iterator i = this.roundRobinHostHashes.iterator(); + while (i.hasNext()) { + String s = i.next(); + HostQueue hq = this.queues.get(s); + if (hq != null && hq.size() > 10) {i.remove();} + } + } + } + if (this.roundRobinHostHashes.size() == 0) return null; + + // first strategy: get one entry which does not need sleep time + for (String nextHH: this.roundRobinHostHashes) { + HostQueue hq = this.queues.get(nextHH); + int delta = Latency.waitingRemainingGuessed(hq.getHost(), DigestURL.hosthash(hq.getHost(), hq.getPort()), robots, ClientIdentification.yacyInternetCrawlerAgent); + if (delta <= 10) { + this.roundRobinHostHashes.remove(nextHH); + Request request = hq == null ? null : hq.pop(delay, cs, robots); + int size = hq == null ? 0 : hq.size(); + if (size == 0) { + hq.close(); + this.queues.remove(nextHH); + } + if (request != null) return request; + } + } + + // second strategy: take from the largest stack and clean round robin cache + int largest = Integer.MIN_VALUE; + String nextHH = null; + for (String h: this.roundRobinHostHashes) { + HostQueue hq = this.queues.get(h); + if (hq != null) { + int s = hq.size(); + if (s > largest) { + largest = s; + nextHH = h; + } + } + } + this.roundRobinHostHashes.clear(); // start from the beginning next time + HostQueue hq = this.queues.get(nextHH); + Request request = hq == null ? null : hq.pop(delay, cs, robots); + if (hq != null && hq.size() == 0) { + hq.close(); + this.queues.remove(nextHH); + } + return request; + } catch (ConcurrentModificationException e) { + continue tryagain; + } catch (Throwable e) { + throw new IOException(e.getMessage()); + } + } + + @Override + public Iterator iterator() throws IOException { + final Iterator hostsIterator = this.queues.values().iterator(); + @SuppressWarnings("unchecked") + final Iterator[] hostIterator = new Iterator[1]; + hostIterator[0] = null; + return new Iterator() { + @Override + public boolean hasNext() { + return hostsIterator.hasNext() || (hostIterator[0] != null && hostIterator[0].hasNext()); + } + @Override + public Request next() { + synchronized (HostBalancer.this) { + while (hostIterator[0] == null || !hostIterator[0].hasNext()) try { + HostQueue entry = hostsIterator.next(); + hostIterator[0] = entry.iterator(); + } catch (IOException e) {} + if (!hostIterator[0].hasNext()) return null; + return hostIterator[0].next(); + } + } + @Override + public void remove() { + hostIterator[0].remove(); + } + }; + } + + /** + * get a list of domains that are currently maintained as domain stacks + * @return a map of clear text strings of host names to an integer array: {the size of the domain stack, guessed delta waiting time} + */ + @Override + public Map getDomainStackHosts(RobotsTxt robots) { + Map map = new TreeMap(); // we use a tree map to get a stable ordering + for (HostQueue hq: this.queues.values()) try { + int delta = Latency.waitingRemainingGuessed(hq.getHost(), DigestURL.hosthash(hq.getHost(), hq.getPort()), robots, ClientIdentification.yacyInternetCrawlerAgent); + map.put(hq.getHost() + ":" + hq.getPort(), new Integer[]{hq.size(), delta}); + } catch (MalformedURLException e) { + ConcurrentLog.logException(e); + } + return map; + } + + /** + * get lists of crawl request entries for a specific host + * @param host + * @param maxcount + * @param maxtime + * @return a list of crawl loader requests + */ + @Override + public List getDomainStackReferences(String host, int maxcount, long maxtime) { + try { + HostQueue hq = this.queues.get(DigestURL.hosthash(host, host.startsWith("ftp.") ? 21 : 80)); + if (hq == null) hq = this.queues.get(DigestURL.hosthash(host, 443)); + return hq == null ? new ArrayList(0) : hq.getDomainStackReferences(host, maxcount, maxtime); + } catch (MalformedURLException e) { + ConcurrentLog.logException(e); + return null; + } + } + +} diff --git a/source/net/yacy/crawler/HostQueue.java b/source/net/yacy/crawler/HostQueue.java index 9d7c51df3..25e2ce3a4 100644 --- a/source/net/yacy/crawler/HostQueue.java +++ b/source/net/yacy/crawler/HostQueue.java @@ -22,114 +22,298 @@ package net.yacy.crawler; import java.io.File; import java.io.IOException; +import java.net.MalformedURLException; +import java.util.ArrayList; import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; import net.yacy.cora.document.encoding.ASCII; +import net.yacy.cora.document.encoding.UTF8; +import net.yacy.cora.document.id.DigestURL; import net.yacy.cora.order.Base64Order; +import net.yacy.cora.protocol.ClientIdentification; import net.yacy.cora.storage.HandleSet; import net.yacy.cora.util.ConcurrentLog; import net.yacy.cora.util.SpaceExceededException; import net.yacy.crawler.data.CrawlProfile; +import net.yacy.crawler.data.Latency; import net.yacy.crawler.retrieval.Request; import net.yacy.crawler.robots.RobotsTxt; import net.yacy.kelondro.data.word.Word; import net.yacy.kelondro.index.BufferedObjectIndex; +import net.yacy.kelondro.index.Index; +import net.yacy.kelondro.index.OnDemandOpenFileIndex; import net.yacy.kelondro.index.Row; import net.yacy.kelondro.index.RowHandleSet; import net.yacy.kelondro.table.Table; -import net.yacy.kelondro.util.MemoryControl; +import net.yacy.kelondro.util.kelondroException; import net.yacy.repository.Blacklist.BlacklistType; import net.yacy.search.Switchboard; -public class HostQueue { +public class HostQueue implements Balancer { public static final String indexSuffix = ".stack"; private static final int EcoFSBufferSize = 1000; private static final int objectIndexBufferSize = 1000; - private static final int MAX_DOUBLE_PUSH_CHECK = 100000; - private final String hostHash; - private final File queuesPath; - private BufferedObjectIndex requestStack; - private HandleSet urlHashDoubleCheck; + private final File hostPath; + private final String hostName; + private String hostHash; + private final int port; + private final boolean exceed134217727; + private final boolean onDemand; + private TreeMap depthStacks; - public HostQueue( - final File queuesPath, - final String hostHash, - final boolean useTailCache, + public HostQueue ( + final File hostsPath, + final String hostName, + final int port, + final boolean onDemand, final boolean exceed134217727) { - this.hostHash = hostHash; - this.queuesPath = queuesPath; - this.urlHashDoubleCheck = new RowHandleSet(Word.commonHashLength, Word.commonHashOrder, 0); - - // create a stack for newly entered entries - if (!(this.queuesPath.exists())) this.queuesPath.mkdir(); // make the path - this.queuesPath.mkdirs(); - final File f = new File(this.queuesPath, this.hostHash + indexSuffix); + this.onDemand = onDemand; + this.exceed134217727 = exceed134217727; + this.hostName = hostName; + this.port = port; + this.hostPath = new File(hostsPath, this.hostName + "." + this.port); + init(); + } + + public HostQueue ( + final File hostPath, + final boolean onDemand, + final boolean exceed134217727) { + this.onDemand = onDemand; + this.exceed134217727 = exceed134217727; + this.hostPath = hostPath; + // parse the hostName and port from the file name + String filename = hostPath.getName(); + int p = filename.lastIndexOf('.'); + this.hostName = filename.substring(0, p); + this.port = Integer.parseInt(filename.substring(p + 1)); + init(); + } + + private final void init() { try { - this.requestStack = new BufferedObjectIndex(new Table(f, Request.rowdef, EcoFSBufferSize, 0, useTailCache, exceed134217727, true), objectIndexBufferSize); - } catch (final SpaceExceededException e) { - try { - this.requestStack = new BufferedObjectIndex(new Table(f, Request.rowdef, 0, 0, false, exceed134217727, true), objectIndexBufferSize); - } catch (final SpaceExceededException e1) { - ConcurrentLog.logException(e1); + this.hostHash = DigestURL.hosthash(this.hostName, this.port); + } catch (MalformedURLException e) { + this.hostHash = ""; + } + if (!(this.hostPath.exists())) this.hostPath.mkdirs(); + this.depthStacks = new TreeMap(); + int size = openAllStacks(); + ConcurrentLog.info("Balancer", "opened HostQueue " + this.hostPath.getAbsolutePath() + " with " + size + " urls."); + } + + public String getHost() { + return this.hostName; + } + + public int getPort() { + return this.port; + } + + private int openAllStacks() { + String[] l = this.hostPath.list(); + int c = 0; + if (l != null) for (String s: l) { + if (s.endsWith(indexSuffix)) try { + int depth = Integer.parseInt(s.substring(0, s.length() - indexSuffix.length())); + File stackFile = new File(this.hostPath, s); + Index depthStack = openStack(stackFile, depth); + if (depthStack != null) { + int sz = depthStack.size(); + if (sz == 0) { + depthStack.close(); + stackFile.delete(); + } else { + this.depthStacks.put(depth, depthStack); + c += sz; + } + } + } catch (NumberFormatException e) {} + } + return c; + } + + public synchronized int getLowestStackDepth() { + while (this.depthStacks.size() > 0) { + Map.Entry entry; + synchronized (this) { + entry = this.depthStacks.firstEntry(); + } + if (entry == null) return 0; // happens only if map is empty + if (entry.getValue().size() == 0) { + entry.getValue().close(); + getFile(entry.getKey()).delete(); + this.depthStacks.remove(entry.getKey()); + continue; } + return entry.getKey(); } - ConcurrentLog.info("Balancer", "opened balancer file with " + this.requestStack.size() + " entries from " + f.toString()); + // this should not happen but it happens if a deletion is done + //assert false; + return 0; } - public synchronized void close() { - int sizeBeforeClose = this.size(); - if (this.urlHashDoubleCheck != null) { - this.urlHashDoubleCheck.clear(); - this.urlHashDoubleCheck = null; + private Index getLowestStack() { + while (this.depthStacks.size() > 0) { + Map.Entry entry; + synchronized (this) { + entry = this.depthStacks.firstEntry(); + } + if (entry == null) return null; // happens only if map is empty + if (entry.getValue().size() == 0) { + entry.getValue().close(); + getFile(entry.getKey()).delete(); + this.depthStacks.remove(entry.getKey()); + continue; + } + return entry.getValue(); } - if (this.requestStack != null) { - this.requestStack.close(); - this.requestStack = null; + // this should not happen + //assert false; + return null; + } + + private Index getStack(int depth) { + Index depthStack; + synchronized (this) { + depthStack = this.depthStacks.get(depth); + if (depthStack != null) return depthStack; + } + // create a new stack + synchronized (this) { + // check again + depthStack = this.depthStacks.get(depth); + if (depthStack != null) return depthStack; + // now actually create a new stack + final File f = getFile(depth); + depthStack = openStack(f, depth); + if (depthStack != null) this.depthStacks.put(depth, depthStack); } - if (sizeBeforeClose == 0) { - // clean up - new File(this.queuesPath, this.hostHash + indexSuffix).delete(); + return depthStack; + } + + private File getFile(int depth) { + String name = Integer.toString(depth); + while (name.length() < 4) name = "0" + name; + final File f = new File(this.hostPath, name + indexSuffix); + return f; + } + + private Index openStack(File f, int depth) { + for (int i = 0; i < 10; i++) { + // we try that again if it fails because it shall not fail + if (this.onDemand && depth > 3 && (!f.exists() || f.length() < 10000)) { + try { + return new BufferedObjectIndex(new OnDemandOpenFileIndex(f, Request.rowdef, exceed134217727), objectIndexBufferSize); + } catch (kelondroException e) { + // possibly the file was closed meanwhile + ConcurrentLog.logException(e); + } + } else { + try { + return new BufferedObjectIndex(new Table(f, Request.rowdef, EcoFSBufferSize, 0, false, exceed134217727, true), objectIndexBufferSize); + } catch (final SpaceExceededException e) { + try { + return new BufferedObjectIndex(new Table(f, Request.rowdef, 0, 0, false, exceed134217727, true), objectIndexBufferSize); + } catch (final SpaceExceededException e1) { + ConcurrentLog.logException(e1); + } + } catch (kelondroException e) { + // possibly the file was closed meanwhile + ConcurrentLog.logException(e); + } + } } + return null; } - public void clear() { - try { - this.requestStack.clear(); - } catch (final IOException e) { - ConcurrentLog.logException(e); + @Override + public synchronized void close() { + for (Map.Entry entry: this.depthStacks.entrySet()) { + int size = entry.getValue().size(); + entry.getValue().close(); + if (size == 0) getFile(entry.getKey()).delete(); + } + this.depthStacks.clear(); + String[] l = this.hostPath.list(); + if ((l == null || l.length == 0) && this.hostPath != null) this.hostPath.delete(); + } + + @Override + public synchronized void clear() { + for (Map.Entry entry: this.depthStacks.entrySet()) { + entry.getValue().close(); + getFile(entry.getKey()).delete(); + } + this.depthStacks.clear(); + String[] l = this.hostPath.list(); + for (String s: l) { + new File(this.hostPath, s).delete(); } - this.urlHashDoubleCheck.clear(); + this.hostPath.delete(); } + @Override public Request get(final byte[] urlhash) throws IOException { assert urlhash != null; - if (this.requestStack == null) return null; // case occurs during shutdown - final Row.Entry entry = this.requestStack.get(urlhash, false); - if (entry == null) return null; - return new Request(entry); + if (this.depthStacks == null) return null; // case occurs during shutdown + for (Index depthStack: this.depthStacks.values()) { + final Row.Entry entry = depthStack.get(urlhash, false); + if (entry == null) return null; + return new Request(entry); + } + return null; } + @Override public int removeAllByProfileHandle(final String profileHandle, final long timeout) throws IOException, SpaceExceededException { // first find a list of url hashes that shall be deleted - final HandleSet urlHashes = new RowHandleSet(this.requestStack.row().primaryKeyLength, Base64Order.enhancedCoder, 100); final long terminate = timeout == Long.MAX_VALUE ? Long.MAX_VALUE : (timeout > 0) ? System.currentTimeMillis() + timeout : Long.MAX_VALUE; + int count = 0; synchronized (this) { - final Iterator i = this.requestStack.rows(); - Row.Entry rowEntry; - Request crawlEntry; - while (i.hasNext() && (System.currentTimeMillis() < terminate)) { - rowEntry = i.next(); - crawlEntry = new Request(rowEntry); - if (crawlEntry.profileHandle().equals(profileHandle)) { - urlHashes.put(crawlEntry.url().hash()); + for (Index depthStack: this.depthStacks.values()) { + final HandleSet urlHashes = new RowHandleSet(Word.commonHashLength, Base64Order.enhancedCoder, 100); + final Iterator i = depthStack.rows(); + Row.Entry rowEntry; + Request crawlEntry; + while (i.hasNext() && (System.currentTimeMillis() < terminate)) { + rowEntry = i.next(); + crawlEntry = new Request(rowEntry); + if (crawlEntry.profileHandle().equals(profileHandle)) { + urlHashes.put(crawlEntry.url().hash()); + } + if (System.currentTimeMillis() > terminate) break; + } + for (final byte[] urlhash: urlHashes) { + depthStack.remove(urlhash); + count++; } } } - - // then delete all these urls from the queues and the file index - return remove(urlHashes); + return count; + } + + /** + * delete all urls which are stored for given host hashes + * @param hosthashes + * @return number of deleted urls + */ + @Override + public int removeAllByHostHashes(final Set hosthashes) { + for (String h: hosthashes) { + if (this.hostHash.equals(h)) { + int s = this.size(); + this.clear(); + return s; + } + } + return 0; } /** @@ -138,43 +322,53 @@ public class HostQueue { * @return number of entries that had been removed * @throws IOException */ + @Override public synchronized int remove(final HandleSet urlHashes) throws IOException { - final int s = this.requestStack.size(); int removedCounter = 0; - for (final byte[] urlhash: urlHashes) { - final Row.Entry entry = this.requestStack.remove(urlhash); - if (entry != null) removedCounter++; - - // remove from double-check caches - this.urlHashDoubleCheck.remove(urlhash); + for (Index depthStack: this.depthStacks.values()) { + final int s = depthStack.size(); + for (final byte[] urlhash: urlHashes) { + final Row.Entry entry = depthStack.remove(urlhash); + if (entry != null) removedCounter++; + } + if (removedCounter == 0) return 0; + assert depthStack.size() + removedCounter == s : "urlFileIndex.size() = " + depthStack.size() + ", s = " + s; } - if (removedCounter == 0) return 0; - assert this.requestStack.size() + removedCounter == s : "urlFileIndex.size() = " + this.requestStack.size() + ", s = " + s; return removedCounter; } + @Override public boolean has(final byte[] urlhashb) { - return this.requestStack.has(urlhashb) || this.urlHashDoubleCheck.has(urlhashb); + for (Index depthStack: this.depthStacks.values()) { + if (depthStack.has(urlhashb)) return true; + } + return false; } + @Override public int size() { - return this.requestStack.size(); + int size = 0; + for (Index depthStack: this.depthStacks.values()) { + size += depthStack.size(); + } + return size; } + @Override public boolean isEmpty() { - return this.requestStack.isEmpty(); + for (Index depthStack: this.depthStacks.values()) { + if (!depthStack.isEmpty()) return false; + } + return true; } + @Override public String push(final Request entry, CrawlProfile profile, final RobotsTxt robots) throws IOException, SpaceExceededException { assert entry != null; final byte[] hash = entry.url().hash(); synchronized (this) { // double-check - if (this.urlHashDoubleCheck.has(hash)) return "double occurrence in double_push_check"; - if (this.requestStack.has(hash)) return "double occurrence in urlFileIndex"; - - if (this.urlHashDoubleCheck.size() > MAX_DOUBLE_PUSH_CHECK || MemoryControl.shortStatus()) this.urlHashDoubleCheck.clear(); - this.urlHashDoubleCheck.put(hash); + if (this.has(hash)) return "double occurrence in urlFileIndex"; // increase dom counter if (profile != null && profile.domMaxPages() != Integer.MAX_VALUE && profile.domMaxPages() > 0) { @@ -182,75 +376,155 @@ public class HostQueue { } // add to index - final int s = this.requestStack.size(); - this.requestStack.put(entry.toRow()); - assert s < this.requestStack.size() : "hash = " + ASCII.String(hash) + ", s = " + s + ", size = " + this.requestStack.size(); - assert this.requestStack.has(hash) : "hash = " + ASCII.String(hash); - - // add the hash to a queue if the host is unknown to get this fast into the balancer - // now disabled to prevent that a crawl 'freezes' to a specific domain which hosts a lot of pages; the queues are filled anyway - //if (!this.domainStacks.containsKey(entry.url().getHost())) pushHashToDomainStacks(entry.url().getHost(), entry.url().hash()); + Index depthStack = getStack(entry.depth()); + final int s = depthStack.size(); + depthStack.put(entry.toRow()); + assert s < depthStack.size() : "hash = " + ASCII.String(hash) + ", s = " + s + ", size = " + depthStack.size(); + assert depthStack.has(hash) : "hash = " + ASCII.String(hash); } robots.ensureExist(entry.url(), profile.getAgent(), true); // concurrently load all robots.txt return null; } - - public Request pop() throws IOException { - // returns a crawl entry from the stack and ensures minimum delta times + + @Override + public Request pop(boolean delay, CrawlSwitchboard cs, RobotsTxt robots) throws IOException { + // returns a crawl entry from the stack and ensures minimum delta times + long sleeptime = 0; Request crawlEntry = null; - while (!this.requestStack.isEmpty()) { - synchronized (this) { - Row.Entry rowEntry = this.requestStack.removeOne(); - if (rowEntry == null) return null; + CrawlProfile profileEntry = null; + synchronized (this) { + mainloop: while (true) { + Index depthStack = getLowestStack(); + if (depthStack == null) return null; + Row.Entry rowEntry = null; + while (depthStack.size() > 0) { + rowEntry = depthStack.removeOne(); + if (rowEntry != null) break; + } + if (rowEntry == null) continue mainloop; crawlEntry = new Request(rowEntry); - + // check blacklist (again) because the user may have created blacklist entries after the queue has been filled if (Switchboard.urlBlacklist.isListed(BlacklistType.CRAWLER, crawlEntry.url())) { ConcurrentLog.fine("CRAWLER", "URL '" + crawlEntry.url() + "' is in blacklist."); - continue; + continue mainloop; + } + + // at this point we must check if the crawlEntry has relevance because the crawl profile still exists + // if not: return null. A calling method must handle the null value and try again + profileEntry = cs.get(UTF8.getBytes(crawlEntry.profileHandle())); + if (profileEntry == null) { + ConcurrentLog.warn("Balancer", "no profile entry for handle " + crawlEntry.profileHandle()); + continue mainloop; } + + // depending on the caching policy we need sleep time to avoid DoS-like situations + sleeptime = Latency.getDomainSleepTime(robots, profileEntry, crawlEntry.url()); break; } } if (crawlEntry == null) return null; + ClientIdentification.Agent agent = profileEntry == null ? ClientIdentification.yacyInternetCrawlerAgent : profileEntry.getAgent(); + long robotsTime = Latency.getRobotsTime(robots, crawlEntry.url(), agent); + Latency.updateAfterSelection(crawlEntry.url(), profileEntry == null ? 0 : robotsTime); + if (delay && sleeptime > 0) { + // force a busy waiting here + // in best case, this should never happen if the balancer works properly + // this is only to protection against the worst case, where the crawler could + // behave in a DoS-manner + ConcurrentLog.info("BALANCER", "forcing crawl-delay of " + sleeptime + " milliseconds for " + crawlEntry.url().getHost() + ": " + Latency.waitingRemainingExplain(crawlEntry.url(), robots, agent)); + long loops = sleeptime / 1000; + long rest = sleeptime % 1000; + if (loops < 3) { + rest = rest + 1000 * loops; + loops = 0; + } + Thread.currentThread().setName("Balancer waiting for " + crawlEntry.url().getHost() + ": " + sleeptime + " milliseconds"); + synchronized(this) { + // must be synchronized here to avoid 'takeover' moves from other threads which then idle the same time which would not be enough + if (rest > 0) {try {this.wait(rest);} catch (final InterruptedException e) {}} + for (int i = 0; i < loops; i++) { + ConcurrentLog.info("BALANCER", "waiting for " + crawlEntry.url().getHost() + ": " + (loops - i) + " seconds remaining..."); + try {this.wait(1000); } catch (final InterruptedException e) {} + } + } + Latency.updateAfterSelection(crawlEntry.url(), robotsTime); + } return crawlEntry; } + @Override public Iterator iterator() throws IOException { - return new EntryIterator(); + final Iterator> depthIterator = this.depthStacks.entrySet().iterator(); + @SuppressWarnings("unchecked") + final Iterator[] rowIterator = new Iterator[1]; + rowIterator[0] = null; + return new Iterator() { + @Override + public boolean hasNext() { + return depthIterator.hasNext() || (rowIterator[0] != null && rowIterator[0].hasNext()); + } + @Override + public Request next() { + synchronized (HostQueue.this) { + try { + while (rowIterator[0] == null || !rowIterator[0].hasNext()) { + Map.Entry entry = depthIterator.next(); + rowIterator[0] = entry.getValue().iterator(); + } + if (!rowIterator[0].hasNext()) return null; + Row.Entry rowEntry = rowIterator[0].next(); + if (rowEntry == null) return null; + return new Request(rowEntry); + } catch (Throwable e) { + return null; + } + } + } + @Override + public void remove() { + rowIterator[0].remove(); + } + }; } - private class EntryIterator implements Iterator { - - private Iterator rowIterator; - - public EntryIterator() throws IOException { - this.rowIterator = HostQueue.this.requestStack.rows(); - } - - @Override - public boolean hasNext() { - return (this.rowIterator == null) ? false : this.rowIterator.hasNext(); - } + /** + * get a list of domains that are currently maintained as domain stacks + * @return a map of clear text strings of host names to an integer array: {the size of the domain stack, guessed delta waiting time} + */ + @Override + public Map getDomainStackHosts(RobotsTxt robots) { + Map map = new TreeMap(); + int delta = Latency.waitingRemainingGuessed(this.hostName, this.hostHash, robots, ClientIdentification.yacyInternetCrawlerAgent); + map.put(this.hostName, new Integer[]{this.size(), delta}); + return map; + } - @Override - public Request next() { - final Row.Entry entry = this.rowIterator.next(); - try { - return (entry == null) ? null : new Request(entry); - } catch (final IOException e) { - ConcurrentLog.logException(e); - this.rowIterator = null; - return null; + /** + * get lists of crawl request entries for a specific host + * @param host + * @param maxcount + * @param maxtime + * @return a list of crawl loader requests + */ + @Override + public List getDomainStackReferences(String host, int maxcount, long maxtime) { + if (host == null) return new ArrayList(0); + if (!this.hostName.equals(host)) return new ArrayList(0); + final ArrayList cel = new ArrayList(maxcount); + long timeout = maxtime == Long.MAX_VALUE ? Long.MAX_VALUE : System.currentTimeMillis() + maxtime; + Iterator i; + try { + i = this.iterator(); + while (i.hasNext()) { + Request r = i.next(); + if (r != null) cel.add(r); + if (System.currentTimeMillis() > timeout || cel.size() >= maxcount) break; } + } catch (IOException e) { } - - @Override - public void remove() { - if (this.rowIterator != null) this.rowIterator.remove(); - } - + return cel; } } diff --git a/source/net/yacy/crawler/HostQueues.java b/source/net/yacy/crawler/HostQueues.java deleted file mode 100644 index 37085a782..000000000 --- a/source/net/yacy/crawler/HostQueues.java +++ /dev/null @@ -1,169 +0,0 @@ -/** - * HostQueues - * Copyright 2013 by Michael Christen - * First released 24.09.2013 at http://yacy.net - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; either - * version 2.1 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public License - * along with this program in the file lgpl21.txt - * If not, see . - */ - -package net.yacy.crawler; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import net.yacy.cora.document.encoding.ASCII; -import net.yacy.cora.order.Base64Order; -import net.yacy.cora.storage.HandleSet; -import net.yacy.cora.util.SpaceExceededException; -import net.yacy.crawler.data.CrawlProfile; -import net.yacy.crawler.retrieval.Request; -import net.yacy.crawler.robots.RobotsTxt; -import net.yacy.kelondro.data.word.Word; -import net.yacy.kelondro.index.RowHandleSet; - -/** - * wrapper for single HostQueue queues; this is a collection of such queues. - * All these queues are stored in a common directory for the queue stacks - */ -public class HostQueues { - - private final File queuesPath; - private final boolean useTailCache; - private final boolean exceed134217727; - private final Map queues; - - public HostQueues( - final File queuesPath, - final boolean useTailCache, - final boolean exceed134217727) { - this.queuesPath = queuesPath; - this.useTailCache = useTailCache; - this.exceed134217727 = exceed134217727; - - // create a stack for newly entered entries - if (!(queuesPath.exists())) queuesPath.mkdir(); // make the path - this.queuesPath.mkdirs(); - this.queues = new HashMap(); - String[] list = this.queuesPath.list(); - for (String queuefile: list) { - if (queuefile.endsWith(HostQueue.indexSuffix)) { - String hosthash = queuefile.substring(0, queuefile.length() - HostQueue.indexSuffix.length()); - HostQueue queue = new HostQueue(this.queuesPath, hosthash, this.useTailCache, this.exceed134217727); - this.queues.put(hosthash, queue); - } - } - } - - public synchronized void close() { - for (HostQueue queue: this.queues.values()) queue.close(); - this.queues.clear(); - } - - public void clear() { - for (HostQueue queue: this.queues.values()) queue.clear(); - this.queues.clear(); - } - - public Request get(final byte[] urlhash) throws IOException { - String hosthash = ASCII.String(urlhash, 6, 6); - HostQueue queue = this.queues.get(hosthash); - if (queue == null) return null; - return queue.get(urlhash); - } - - public int removeAllByProfileHandle(final String profileHandle, final long timeout) throws IOException, SpaceExceededException { - int c = 0; - for (HostQueue queue: this.queues.values()) c += queue.removeAllByProfileHandle(profileHandle, timeout); - return c; - } - - public synchronized int remove(final HandleSet urlHashes) throws IOException { - Map removeLists = new HashMap(); - for (byte[] urlhash: urlHashes) { - String hosthash = ASCII.String(urlhash, 6, 6); - HandleSet removeList = removeLists.get(hosthash); - if (removeList == null) { - removeList = new RowHandleSet(Word.commonHashLength, Base64Order.enhancedCoder, 100); - removeLists.put(hosthash, removeList); - } - try {removeList.put(urlhash);} catch (SpaceExceededException e) {} - } - int c = 0; - for (Map.Entry entry: removeLists.entrySet()) { - HostQueue queue = this.queues.get(entry.getKey()); - if (queue != null) c += queue.remove(entry.getValue()); - } - return c; - } - - public boolean has(final byte[] urlhashb) { - String hosthash = ASCII.String(urlhashb, 6, 6); - HostQueue queue = this.queues.get(hosthash); - if (queue == null) return false; - return queue.has(urlhashb); - } - - public int size() { - int c = 0; - for (HostQueue queue: this.queues.values()) c += queue.size(); - return c; - } - - public boolean isEmpty() { - for (HostQueue queue: this.queues.values()) if (!queue.isEmpty()) return false; - return true; - } - - /** - * push a request to one of the host queues. If the queue does not exist, it is created - * @param entry - * @param profile - * @param robots - * @return null if everything is ok or a string with an error message if the push is not allowed according to the crawl profile or robots - * @throws IOException - * @throws SpaceExceededException - */ - public String push(final Request entry, CrawlProfile profile, final RobotsTxt robots) throws IOException, SpaceExceededException { - String hosthash = ASCII.String(entry.url().hash(), 6, 6); - HostQueue queue = this.queues.get(hosthash); - if (queue == null) { - queue = new HostQueue(this.queuesPath, hosthash, this.useTailCache, this.exceed134217727); - this.queues.put(hosthash, queue); - } - return queue.push(entry, profile, robots); - } - - /** - * remove one request from all stacks except from those as listed in notFromHost - * @param notFromHost do not collect from these hosts - * @return a list of requests - * @throws IOException - */ - public List pop(Set notFromHost) throws IOException { - ArrayList requests = new ArrayList(); - for (Map.Entry entry: this.queues.entrySet()) { - if (notFromHost.contains(entry.getKey())) continue; - Request r = entry.getValue().pop(); - if (r != null) requests.add(r); - } - return requests; - } - -} diff --git a/source/net/yacy/crawler/LegacyBalancer.java b/source/net/yacy/crawler/LegacyBalancer.java index eb771dcaf..dc8658f06 100644 --- a/source/net/yacy/crawler/LegacyBalancer.java +++ b/source/net/yacy/crawler/LegacyBalancer.java @@ -30,6 +30,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.Set; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -626,4 +627,9 @@ public class LegacyBalancer implements Balancer { } + @Override + public int removeAllByHostHashes(Set hosthashes) { + return 0; + } + } diff --git a/source/net/yacy/crawler/data/CrawlQueues.java b/source/net/yacy/crawler/data/CrawlQueues.java index e9431a016..7d66b039b 100644 --- a/source/net/yacy/crawler/data/CrawlQueues.java +++ b/source/net/yacy/crawler/data/CrawlQueues.java @@ -34,6 +34,7 @@ import java.util.Date; import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -181,7 +182,12 @@ public class CrawlQueues { this.noticeURL.removeByURLHash(hash); this.delegatedURL.remove(hash); } - + + public int removeHosts(final Set hosthashes) { + return this.noticeURL.removeByHostHash(hosthashes); + //this.delegatedURL.remove(hash); + } + public DigestURL getURL(final byte[] urlhash) { assert urlhash != null; if (urlhash == null || urlhash.length == 0) { @@ -304,7 +310,7 @@ public class CrawlQueues { return true; } catch (final IOException e) { CrawlQueues.log.severe(stats + ": CANNOT FETCH ENTRY: " + e.getMessage(), e); - if (e.getMessage().indexOf("hash is null",0) > 0) { + if (e.getMessage() != null && e.getMessage().indexOf("hash is null",0) > 0) { this.noticeURL.clear(NoticedURL.StackType.LOCAL); } } diff --git a/source/net/yacy/crawler/data/Latency.java b/source/net/yacy/crawler/data/Latency.java index c1c7842b6..4d1ddabb5 100644 --- a/source/net/yacy/crawler/data/Latency.java +++ b/source/net/yacy/crawler/data/Latency.java @@ -164,7 +164,7 @@ public class Latency { waiting = Math.max(waiting, (int) (host.average() * Switchboard.getSwitchboard().getConfigFloat(SwitchboardConstants.CRAWLER_LATENCY_FACTOR, 0.5f))); // if the number of same hosts as in the url in the loading queue is greater than MaxSameHostInQueue, then increase waiting - if (Switchboard.getSwitchboard().crawlQueues.hostcount(hostname) > Switchboard.getSwitchboard().getConfigInt(SwitchboardConstants.CRAWLER_MAX_SAME_HOST_IN_QUEUE, 20)) waiting += 5000; + if (Switchboard.getSwitchboard().crawlQueues.hostcount(hostname) > Switchboard.getSwitchboard().getConfigInt(SwitchboardConstants.CRAWLER_MAX_SAME_HOST_IN_QUEUE, 20)) waiting += 3000; // the time since last access to the domain is the basis of the remaining calculation final int timeSinceLastAccess = (int) (System.currentTimeMillis() - host.lastacc()); @@ -207,7 +207,7 @@ public class Latency { waiting = Math.max(waiting, (int) (host.average() * Switchboard.getSwitchboard().getConfigFloat(SwitchboardConstants.CRAWLER_LATENCY_FACTOR, 0.5f))); // if the number of same hosts as in the url in the loading queue is greater than MaxSameHostInQueue, then increase waiting - if (Switchboard.getSwitchboard().crawlQueues.hostcount(url.getHost()) > Switchboard.getSwitchboard().getConfigInt(SwitchboardConstants.CRAWLER_MAX_SAME_HOST_IN_QUEUE, 20)) waiting += 5000; + if (Switchboard.getSwitchboard().crawlQueues.hostcount(url.getHost()) > Switchboard.getSwitchboard().getConfigInt(SwitchboardConstants.CRAWLER_MAX_SAME_HOST_IN_QUEUE, 20)) waiting += 3000; // the time since last access to the domain is the basis of the remaining calculation final int timeSinceLastAccess = (int) (System.currentTimeMillis() - host.lastacc()); diff --git a/source/net/yacy/crawler/data/NoticedURL.java b/source/net/yacy/crawler/data/NoticedURL.java index a64cf9193..045290e9e 100644 --- a/source/net/yacy/crawler/data/NoticedURL.java +++ b/source/net/yacy/crawler/data/NoticedURL.java @@ -33,6 +33,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import net.yacy.cora.order.Base64Order; import net.yacy.cora.storage.HandleSet; @@ -40,7 +41,7 @@ import net.yacy.cora.util.ConcurrentLog; import net.yacy.cora.util.SpaceExceededException; import net.yacy.crawler.Balancer; import net.yacy.crawler.CrawlSwitchboard; -import net.yacy.crawler.LegacyBalancer; +import net.yacy.crawler.HostBalancer; import net.yacy.crawler.retrieval.Request; import net.yacy.crawler.robots.RobotsTxt; import net.yacy.kelondro.data.word.Word; @@ -59,14 +60,13 @@ public class NoticedURL { protected NoticedURL( final File cachePath, - final boolean useTailCache, + @SuppressWarnings("unused") final boolean useTailCache, final boolean exceed134217727) { ConcurrentLog.info("NoticedURL", "CREATING STACKS at " + cachePath.toString()); - this.coreStack = new LegacyBalancer(cachePath, "urlNoticeCoreStack", useTailCache, exceed134217727); - this.limitStack = new LegacyBalancer(cachePath, "urlNoticeLimitStack", useTailCache, exceed134217727); - //overhangStack = new plasmaCrawlBalancer(overhangStackFile); - this.remoteStack = new LegacyBalancer(cachePath, "urlNoticeRemoteStack", useTailCache, exceed134217727); - this.noloadStack = new LegacyBalancer(cachePath, "urlNoticeNoLoadStack", useTailCache, exceed134217727); + this.coreStack = new HostBalancer(new File(cachePath, "CrawlerCoreStacks"), exceed134217727); + this.limitStack = new HostBalancer(new File(cachePath, "CrawlerLimitStacks"), exceed134217727); + this.remoteStack = new HostBalancer(new File(cachePath, "CrawlerRemoteStacks"), exceed134217727); + this.noloadStack = new HostBalancer(new File(cachePath, "CrawlerNoLoadStacks"), exceed134217727); } public void clear() { @@ -206,6 +206,15 @@ public class NoticedURL { return removed; } + public int removeByHostHash(final Set hosthashes) { + int removed = 0; + removed += this.noloadStack.removeAllByHostHashes(hosthashes); + removed += this.coreStack.removeAllByHostHashes(hosthashes); + removed += this.limitStack.removeAllByHostHashes(hosthashes); + removed += this.remoteStack.removeAllByHostHashes(hosthashes); + return removed; + } + /** * get a list of domains that are currently maintained as domain stacks * @return a map of clear text strings of host names to two integers: the size of the domain stacks and the access delta time diff --git a/source/net/yacy/crawler/retrieval/Response.java b/source/net/yacy/crawler/retrieval/Response.java index 004af54a8..3ff9f8104 100644 --- a/source/net/yacy/crawler/retrieval/Response.java +++ b/source/net/yacy/crawler/retrieval/Response.java @@ -835,7 +835,7 @@ public class Response { final String supportError = TextParser.supports(url(), this.responseHeader == null ? null : this.responseHeader.mime()); if (supportError != null) throw new Parser.Failure("no parser support:" + supportError, url()); try { - return TextParser.parseSource(new AnchorURL(url()), this.responseHeader == null ? null : this.responseHeader.mime(), this.responseHeader == null ? "UTF-8" : this.responseHeader.getCharacterEncoding(), this.content); + return TextParser.parseSource(new AnchorURL(url()), this.responseHeader == null ? null : this.responseHeader.mime(), this.responseHeader == null ? "UTF-8" : this.responseHeader.getCharacterEncoding(), this.request.depth(), this.content); } catch (final Exception e) { return null; } diff --git a/source/net/yacy/document/Document.java b/source/net/yacy/document/Document.java index 8b8b3cdfc..71b8b79e2 100644 --- a/source/net/yacy/document/Document.java +++ b/source/net/yacy/document/Document.java @@ -515,7 +515,7 @@ dc_rights this.emaillinks = new LinkedHashMap(); final Map collectedImages = new HashMap(); // this is a set that is collected now and joined later to the imagelinks for (final Map.Entry entry: this.images.entrySet()) { - if (entry.getKey().getHost().equals(thishost)) this.inboundlinks.put(entry.getKey(), "image"); else this.outboundlinks.put(entry.getKey(), "image"); + if (entry.getKey() != null && entry.getKey().getHost() != null && entry.getKey().getHost().equals(thishost)) this.inboundlinks.put(entry.getKey(), "image"); else this.outboundlinks.put(entry.getKey(), "image"); } for (final AnchorURL url: this.anchors) { if (url == null) continue; diff --git a/source/net/yacy/document/TextParser.java b/source/net/yacy/document/TextParser.java index 1f867ec9c..023299ad9 100644 --- a/source/net/yacy/document/TextParser.java +++ b/source/net/yacy/document/TextParser.java @@ -159,6 +159,7 @@ public final class TextParser { final AnchorURL location, final String mimeType, final String charset, + final int depth, final File sourceFile ) throws InterruptedException, Parser.Failure { @@ -172,7 +173,7 @@ public final class TextParser { throw new Parser.Failure(errorMsg, location); } sourceStream = new BufferedInputStream(new FileInputStream(sourceFile)); - docs = parseSource(location, mimeType, charset, sourceFile.length(), sourceStream); + docs = parseSource(location, mimeType, charset, depth, sourceFile.length(), sourceStream); } catch (final Exception e) { if (e instanceof InterruptedException) throw (InterruptedException) e; if (e instanceof Parser.Failure) throw (Parser.Failure) e; @@ -189,6 +190,7 @@ public final class TextParser { final AnchorURL location, String mimeType, final String charset, + final int depth, final byte[] content ) throws Parser.Failure { if (AbstractParser.log.isFine()) AbstractParser.log.fine("Parsing '" + location + "' from byte-array"); @@ -203,7 +205,7 @@ public final class TextParser { } assert !idioms.isEmpty() : "no parsers applied for url " + location.toNormalform(true); - Document[] docs = parseSource(location, mimeType, idioms, charset, content); + Document[] docs = parseSource(location, mimeType, idioms, charset, depth, content); return docs; } @@ -212,6 +214,7 @@ public final class TextParser { final AnchorURL location, String mimeType, final String charset, + final int depth, final long contentLength, final InputStream sourceStream ) throws Parser.Failure { @@ -242,7 +245,7 @@ public final class TextParser { } catch (final IOException e) { throw new Parser.Failure(e.getMessage(), location); } - Document[] docs = parseSource(location, mimeType, idioms, charset, b); + Document[] docs = parseSource(location, mimeType, idioms, charset, depth, b); return docs; } @@ -273,6 +276,7 @@ public final class TextParser { final String mimeType, final Set parsers, final String charset, + final int depth, final byte[] sourceArray ) throws Parser.Failure { final String fileExt = MultiProtocolURL.getFileExtension(location.getFileName()); @@ -326,7 +330,10 @@ public final class TextParser { } throw new Parser.Failure("All parser failed: " + failedParsers, location); } - for (final Document d: docs) { assert d.getTextStream() != null : "mimeType = " + mimeType; } // verify docs + for (final Document d: docs) { + assert d.getTextStream() != null : "mimeType = " + mimeType; + d.setDepth(depth); + } // verify docs return docs; } diff --git a/source/net/yacy/document/importer/MediawikiImporter.java b/source/net/yacy/document/importer/MediawikiImporter.java index 56c4eea5a..061698349 100644 --- a/source/net/yacy/document/importer/MediawikiImporter.java +++ b/source/net/yacy/document/importer/MediawikiImporter.java @@ -522,7 +522,7 @@ public class MediawikiImporter extends Thread implements Importer { public void genDocument() throws Parser.Failure { try { this.url = new AnchorURL(this.urlStub + this.title); - final Document[] parsed = TextParser.parseSource(this.url, "text/html", "UTF-8", UTF8.getBytes(this.html)); + final Document[] parsed = TextParser.parseSource(this.url, "text/html", "UTF-8", 1, UTF8.getBytes(this.html)); this.document = Document.mergeDocuments(this.url, "text/html", parsed); // the wiki parser is not able to find the proper title in the source text, so it must be set here this.document.setTitle(this.title); diff --git a/source/net/yacy/document/parser/bzipParser.java b/source/net/yacy/document/parser/bzipParser.java index 6e088fafd..373bc955d 100644 --- a/source/net/yacy/document/parser/bzipParser.java +++ b/source/net/yacy/document/parser/bzipParser.java @@ -94,7 +94,7 @@ public class bzipParser extends AbstractParser implements Parser { out.close(); // creating a new parser class to parse the unzipped content - docs = TextParser.parseSource(location, null, null, tempFile); + docs = TextParser.parseSource(location, null, null, 999, tempFile); } catch (final Exception e) { if (e instanceof InterruptedException) throw (InterruptedException) e; if (e instanceof Parser.Failure) throw (Parser.Failure) e; diff --git a/source/net/yacy/document/parser/gzipParser.java b/source/net/yacy/document/parser/gzipParser.java index d62022085..6f36eb541 100644 --- a/source/net/yacy/document/parser/gzipParser.java +++ b/source/net/yacy/document/parser/gzipParser.java @@ -79,7 +79,7 @@ public class gzipParser extends AbstractParser implements Parser { out.close(); // creating a new parser class to parse the unzipped content - docs = TextParser.parseSource(location,null,null,tempFile); + docs = TextParser.parseSource(location, null, null, 999, tempFile); } catch (final Exception e) { if (e instanceof InterruptedException) throw (InterruptedException) e; if (e instanceof Parser.Failure) throw (Parser.Failure) e; diff --git a/source/net/yacy/document/parser/sevenzipParser.java b/source/net/yacy/document/parser/sevenzipParser.java index 245521540..2dacd8ba9 100644 --- a/source/net/yacy/document/parser/sevenzipParser.java +++ b/source/net/yacy/document/parser/sevenzipParser.java @@ -171,7 +171,7 @@ public class sevenzipParser extends AbstractParser implements Parser { // below for reversion of the effects final AnchorURL url = AnchorURL.newAnchor(this.doc.dc_source(), this.prefix + "/" + super.filePath); final String mime = TextParser.mimeOf(super.filePath.substring(super.filePath.lastIndexOf('.') + 1)); - theDocs = TextParser.parseSource(url, mime, null, this.cfos.toByteArray()); + theDocs = TextParser.parseSource(url, mime, null, this.doc.getDepth() + 1, this.cfos.toByteArray()); this.doc.addSubDocuments(theDocs); } diff --git a/source/net/yacy/document/parser/tarParser.java b/source/net/yacy/document/parser/tarParser.java index 86ded03dd..79ccca964 100644 --- a/source/net/yacy/document/parser/tarParser.java +++ b/source/net/yacy/document/parser/tarParser.java @@ -90,7 +90,7 @@ public class tarParser extends AbstractParser implements Parser { try { tmp = FileUtils.createTempFile(this.getClass(), name); FileUtils.copy(tis, tmp, entry.getSize()); - subDocs = TextParser.parseSource(AnchorURL.newAnchor(url, "#" + name), mime, null, tmp); + subDocs = TextParser.parseSource(AnchorURL.newAnchor(url, "#" + name), mime, null, 999, tmp); if (subDocs == null) continue; for (final Document d: subDocs) docacc.add(d); } catch (final Parser.Failure e) { diff --git a/source/net/yacy/document/parser/zipParser.java b/source/net/yacy/document/parser/zipParser.java index ff232d723..2f381270a 100644 --- a/source/net/yacy/document/parser/zipParser.java +++ b/source/net/yacy/document/parser/zipParser.java @@ -89,7 +89,7 @@ public class zipParser extends AbstractParser implements Parser { FileUtils.copy(zis, tmp, entry.getSize()); final DigestURL virtualURL = DigestURL.newURL(url, "#" + name); //this.log.logInfo("ZIP file parser: " + virtualURL.toNormalform(false, false)); - docs = TextParser.parseSource(new AnchorURL(virtualURL), mime, null, tmp); + docs = TextParser.parseSource(new AnchorURL(virtualURL), mime, null, 999, tmp); if (docs == null) continue; for (final Document d: docs) docacc.add(d); } catch (final Parser.Failure e) { diff --git a/source/net/yacy/kelondro/index/OnDemandOpenFileIndex.java b/source/net/yacy/kelondro/index/OnDemandOpenFileIndex.java new file mode 100644 index 000000000..c8c8598aa --- /dev/null +++ b/source/net/yacy/kelondro/index/OnDemandOpenFileIndex.java @@ -0,0 +1,420 @@ +/** + * OnDemandOpenFileIndex + * Copyright 2014 by Michael Christen + * First released 16.04.2014 at http://yacy.net + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program in the file lgpl21.txt + * If not, see . + */ + + +package net.yacy.kelondro.index; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import net.yacy.cora.order.CloneableIterator; +import net.yacy.cora.util.ConcurrentLog; +import net.yacy.cora.util.SpaceExceededException; +import net.yacy.kelondro.index.Row.Entry; +import net.yacy.kelondro.table.Table; +import net.yacy.kelondro.util.kelondroException; + +/** + * a write buffer for ObjectIndex entries + * @author Michael Peter Christen + * + */ +public class OnDemandOpenFileIndex implements Index, Iterable { + + private final File file; + private final Row rowdef; + private int sizecache; + private final boolean exceed134217727; + + public OnDemandOpenFileIndex(final File file, Row rowdef, final boolean exceed134217727) { + this.file = file; + this.rowdef = rowdef; + this.exceed134217727 = exceed134217727; + this.sizecache = -1; + } + + private Index getIndex() { + try { + return new Table(file, rowdef, 1000, 0, false, exceed134217727, false); + } catch (kelondroException e) { + ConcurrentLog.logException(e); + return null; + } catch (SpaceExceededException e) { + ConcurrentLog.logException(e); + return null; + } + } + + @Override + public synchronized byte[] smallestKey() { + Index index = getIndex(); + if (index == null) return null; + byte[] b = index.smallestKey(); + index.close(); + return b; + } + + @Override + public synchronized byte[] largestKey() { + Index index = getIndex(); + if (index == null) return null; + byte[] b = index.largestKey(); + index.close(); + return b; + } + + @Override + public synchronized void optimize() { + Index index = getIndex(); + if (index == null) return; + index.optimize(); + index.close(); + } + + @Override + public synchronized long mem() { + Index index = getIndex(); + if (index == null) return 0; + long l = index.mem(); + index.close(); + return l; + } + + @Override + public synchronized void addUnique(final Entry row) throws SpaceExceededException, IOException { + Index index = getIndex(); + if (index == null) return; + try { + index.addUnique(row); + if (this.sizecache >= 0) this.sizecache++; + } catch (IOException e) { + throw e; + } finally { + index.close(); + } + } + + @Override + public synchronized void clear() throws IOException { + Index index = getIndex(); + if (index == null) return; + try { + index.clear(); + this.sizecache = 0; + } catch (IOException e) { + throw e; + } finally { + index.close(); + } + } + + @Override + public synchronized void close() { + } + + @Override + public synchronized void deleteOnExit() { + Index index = getIndex(); + index.deleteOnExit(); + index.close(); + } + + @Override + public String filename() { + return this.file.toString(); + } + + @Override + public synchronized int size() { + if (sizecache >= 0) return sizecache; + Index index = getIndex(); + if (index == null) return 0; + int i = index.size(); + index.close(); + this.sizecache = i; + return i; + } + + @Override + public synchronized Entry get(final byte[] key, final boolean forcecopy) throws IOException { + Index index = getIndex(); + if (index == null) return null; + try { + return index.get(key, forcecopy); + } catch (IOException e) { + throw e; + } finally { + index.close(); + } + } + + @Override + public synchronized Map get(final Collection keys, final boolean forcecopy) throws IOException, InterruptedException { + final Map map = new TreeMap(row().objectOrder); + Row.Entry entry; + for (final byte[] key: keys) { + entry = get(key, forcecopy); + if (entry != null) map.put(key, entry); + } + return map; + } + + @Override + public synchronized boolean has(final byte[] key) { + Index index = getIndex(); + if (index == null) return false; + boolean b = index.has(key); + index.close(); + return b; + } + + @Override + public synchronized boolean isEmpty() { + Index index = getIndex(); + if (index == null) return true; + boolean b = index.isEmpty(); + if (b) this.sizecache = 0; + index.close(); + return b; + } + + /** + * Adds the row to the index. The row is identified by the primary key of the row. + * @param row a index row + * @return true if this set did _not_ already contain the given row. + * @throws IOException + * @throws SpaceExceededException + */ + @Override + public synchronized boolean put(final Entry row) throws IOException, SpaceExceededException { + Index index = getIndex(); + if (index == null) return false; + try { + boolean b = index.put(row); + if (this.sizecache >= 0 && b) this.sizecache++; + return b; + } catch (IOException e) { + throw e; + } finally { + index.close(); + } + } + + @Override + public synchronized Entry remove(final byte[] key) throws IOException { + Index index = getIndex(); + if (index == null) return null; + try { + Entry e = index.remove(key); + if (this.sizecache >= 0 && e != null) this.sizecache--; + return e; + } catch (IOException e) { + throw e; + } finally { + index.close(); + } + } + + @Override + public synchronized boolean delete(final byte[] key) throws IOException { + Index index = getIndex(); + if (index == null) return false; + try { + boolean b = index.delete(key); + if (this.sizecache >= 0 && b) this.sizecache--; + return b; + } catch (IOException e) { + throw e; + } finally { + index.close(); + } + } + + @Override + public synchronized List removeDoubles() throws IOException, SpaceExceededException { + Index index = getIndex(); + if (index == null) return null; + try { + List l = index.removeDoubles(); + this.sizecache = index.size(); + return l; + } catch (IOException e) { + throw e; + } finally { + index.close(); + } + } + + @Override + public synchronized List top(final int count) throws IOException { + Index index = getIndex(); + if (index == null) return null; + try { + return index.top(count); + } catch (IOException e) { + throw e; + } finally { + index.close(); + } + } + + @Override + public synchronized List random(final int count) throws IOException { + Index index = getIndex(); + if (index == null) return null; + try { + return index.random(count); + } catch (IOException e) { + throw e; + } finally { + index.close(); + } + } + + @Override + public synchronized Entry removeOne() throws IOException { + Index index = getIndex(); + if (index == null) return null; + try { + Entry e = index.removeOne(); + if (this.sizecache >= 0 && e != null) this.sizecache--; + return e; + } catch (IOException e) { + throw e; + } finally { + index.close(); + } + } + + @Override + public synchronized Entry replace(final Entry row) throws SpaceExceededException, IOException { + Index index = getIndex(); + if (index == null) return null; + try { + Entry e = index.replace(row); + if (this.sizecache >= 0 && e == null) this.sizecache++; + return e; + } catch (IOException e) { + throw e; + } finally { + index.close(); + } + } + + @Override + public Row row() { + return this.rowdef; + } + + @Override + public synchronized CloneableIterator keys(final boolean up, final byte[] firstKey) throws IOException { + Index index = getIndex(); + if (index == null) return null; + try { + return index.keys(up, firstKey); + } catch (IOException e) { + throw e; + } finally { + index.close(); + } + } + + @Override + public synchronized Iterator iterator() { + Index index = getIndex(); + if (index == null) return null; + List list = new ArrayList(); + Iterator i = index.iterator(); + while (i.hasNext()) list.add(i.next()); + index.close(); + return list.iterator(); + } + + @Override + public synchronized CloneableIterator rows(final boolean up, final byte[] firstKey) throws IOException { + Index index = getIndex(); + if (index == null) return null; + final List list = new ArrayList(); + final Iterator i = index.rows(up, firstKey); + while (i.hasNext()) list.add(i.next()); + index.close(); + final Iterator li = list.iterator(); + return new CloneableIterator(){ + @Override + public boolean hasNext() { + return li.hasNext(); + } + @Override + public Entry next() { + return li.next(); + } + @Override + public void remove() { + li.remove(); + } + @Override + public CloneableIterator clone(Object modifier) { + return null; + } + @Override + public void close() { + } + }; + } + + @Override + public synchronized CloneableIterator rows() throws IOException { + Index index = getIndex(); + if (index == null) return null; + final List list = new ArrayList(); + final Iterator i = index.rows(); + while (i.hasNext()) list.add(i.next()); + index.close(); + final Iterator li = list.iterator(); + return new CloneableIterator(){ + @Override + public boolean hasNext() { + return li.hasNext(); + } + @Override + public Entry next() { + return li.next(); + } + @Override + public void remove() { + li.remove(); + } + @Override + public CloneableIterator clone(Object modifier) { + return null; + } + @Override + public void close() { + } + }; + } + +} diff --git a/source/net/yacy/kelondro/table/ChunkIterator.java b/source/net/yacy/kelondro/table/ChunkIterator.java index 521dd72cc..1ddd7ba41 100644 --- a/source/net/yacy/kelondro/table/ChunkIterator.java +++ b/source/net/yacy/kelondro/table/ChunkIterator.java @@ -56,6 +56,7 @@ public class ChunkIterator extends LookAheadIterator implements Iterator private final int recordsize; public ChunkIterator(final File file, final int recordsize, final int chunksize) throws FileNotFoundException { + if (!file.exists()) throw new FileNotFoundException(file.getAbsolutePath()); assert (file.exists()); assert file.length() % recordsize == 0; this.recordsize = recordsize; diff --git a/source/net/yacy/kelondro/table/Table.java b/source/net/yacy/kelondro/table/Table.java index 06939c5b2..ed67e398e 100644 --- a/source/net/yacy/kelondro/table/Table.java +++ b/source/net/yacy/kelondro/table/Table.java @@ -71,6 +71,7 @@ import net.yacy.kelondro.util.kelondroException; public class Table implements Index, Iterable { // static tracker objects + private final static ConcurrentLog log = new ConcurrentLog("TABLE"); private final static TreeMap tableTracker = new TreeMap(); private final static long maxarraylength = 134217727L; // (2^27-1) that may be the maximum size of array length in some JVMs @@ -89,7 +90,7 @@ public class Table implements Index, Iterable { final int initialSpace, boolean useTailCache, final boolean exceed134217727, - final boolean warmUp) throws SpaceExceededException { + final boolean warmUp) throws SpaceExceededException, kelondroException { this.rowdef = rowdef; this.buffersize = buffersize; @@ -112,9 +113,10 @@ public class Table implements Index, Iterable { fos = new FileOutputStream(tablefile); } catch (final FileNotFoundException e) { // should not happen - ConcurrentLog.severe("Table", "", e); + log.severe("", e); + } finally { + if (fos != null) try { fos.close(); } catch (final IOException e) {} } - if (fos != null) try { fos.close(); } catch (final IOException e) {} } try { @@ -136,22 +138,22 @@ public class Table implements Index, Iterable { this.table = null; } - ConcurrentLog.info("TABLE", "initialization of " + tablefile.getName() + ". table copy: " + ((this.table == null) ? "no" : "yes") + ", available RAM: " + (MemoryControl.available() / 1024L / 1024L) + "MB, needed: " + (neededRAM4table / 1024L / 1024L) + "MB, allocating space for " + records + " entries"); + if (log.isFine()) log.fine("initialization of " + tablefile.getName() + ". table copy: " + ((this.table == null) ? "no" : "yes") + ", available RAM: " + (MemoryControl.available() / 1024L / 1024L) + "MB, needed: " + (neededRAM4table / 1024L / 1024L) + "MB, allocating space for " + records + " entries"); final long neededRAM4index = 100L * 1024L * 1024L + records * (rowdef.primaryKeyLength + 4L) * 3L / 2L; if (records > 0 && !MemoryControl.request(neededRAM4index, true)) { // despite calculations seemed to show that there is enough memory for the table AND the index // there is now not enough memory left for the index. So delete the table again to free the memory // for the index - ConcurrentLog.severe("TABLE", tablefile.getName() + ": not enough RAM (" + (MemoryControl.available() / 1024L / 1024L) + "MB) left for index, deleting allocated table space to enable index space allocation (needed: " + (neededRAM4index / 1024L / 1024L) + "MB)"); + log.severe(tablefile.getName() + ": not enough RAM (" + (MemoryControl.available() / 1024L / 1024L) + "MB) left for index, deleting allocated table space to enable index space allocation (needed: " + (neededRAM4index / 1024L / 1024L) + "MB)"); this.table = null; System.gc(); - ConcurrentLog.severe("TABLE", tablefile.getName() + ": RAM after releasing the table: " + (MemoryControl.available() / 1024L / 1024L) + "MB"); + log.severe(tablefile.getName() + ": RAM after releasing the table: " + (MemoryControl.available() / 1024L / 1024L) + "MB"); } this.index = new RowHandleMap(rowdef.primaryKeyLength, rowdef.objectOrder, 4, records, tablefile.getAbsolutePath()); final RowHandleMap errors = new RowHandleMap(rowdef.primaryKeyLength, NaturalOrder.naturalOrder, 4, records, tablefile.getAbsolutePath() + ".errors"); - ConcurrentLog.info("TABLE", tablefile + ": TABLE " + tablefile.toString() + " has table copy " + ((this.table == null) ? "DISABLED" : "ENABLED")); + if (log.isFine()) log.fine(tablefile + ": TABLE " + tablefile.toString() + " has table copy " + ((this.table == null) ? "DISABLED" : "ENABLED")); // read all elements from the file into the copy table - ConcurrentLog.info("TABLE", "initializing RAM index for TABLE " + tablefile.getName() + ", please wait."); + if (log.isFine()) log.fine("initializing RAM index for TABLE " + tablefile.getName() + ", please wait."); int i = 0; byte[] key; if (this.table == null) { @@ -211,7 +213,7 @@ public class Table implements Index, Iterable { removeInFile(idx); key = entry.getKey(); if (key == null) continue; - ConcurrentLog.warn("Table", "removing not well-formed entry " + idx + " with key: " + NaturalOrder.arrayList(key, 0, key.length) + ", " + errorcc++ + "/" + errorc); + log.warn("removing not well-formed entry " + idx + " with key: " + NaturalOrder.arrayList(key, 0, key.length) + ", " + errorcc++ + "/" + errorc); } errors.close(); assert this.file.size() == this.index.size() : "file.size() = " + this.file.size() + ", index.size() = " + this.index.size() + ", file = " + filename(); @@ -220,10 +222,10 @@ public class Table implements Index, Iterable { if (!freshFile && warmUp) {warmUp0();} } catch (final FileNotFoundException e) { // should never happen - ConcurrentLog.severe("Table", "", e); + log.severe("", e); throw new kelondroException(e.getMessage()); } catch (final IOException e) { - ConcurrentLog.severe("Table", "", e); + log.severe("", e); throw new kelondroException(e.getMessage()); } @@ -242,7 +244,7 @@ public class Table implements Index, Iterable { //assert index.size() + doubles.size() == i; //System.out.println(" -removed " + doubles.size() + " doubles- done."); if (doubles.isEmpty()) return; - ConcurrentLog.info("TABLE", filename() + ": WARNING - TABLE " + filename() + " has " + doubles.size() + " doubles"); + log.info(filename() + ": WARNING - TABLE " + filename() + " has " + doubles.size() + " doubles"); // from all the doubles take one, put it back to the index and remove the others from the file // first put back one element each final byte[] record = new byte[this.rowdef.objectsize]; @@ -266,9 +268,9 @@ public class Table implements Index, Iterable { removeInFile(top.intValue()); } } catch (final SpaceExceededException e) { - ConcurrentLog.severe("Table", "", e); + log.severe("", e); } catch (final IOException e) { - ConcurrentLog.severe("Table", "", e); + log.severe("", e); } optimize(); } @@ -304,16 +306,16 @@ public class Table implements Index, Iterable { return Records.tableSize(tablefile, recordsize); } catch (final IOException e) { if (!fixIfCorrupted) { - ConcurrentLog.severe("Table", "table size broken for file " + tablefile.toString(), e); + log.severe("table size broken for file " + tablefile.toString(), e); throw new kelondroException(e.getMessage()); } - ConcurrentLog.severe("Table", "table size broken, try to fix " + tablefile.toString()); + log.severe("table size broken, try to fix " + tablefile.toString()); try { Records.fixTableSize(tablefile, recordsize); - ConcurrentLog.info("Table", "successfully fixed table file " + tablefile.toString()); + log.info("successfully fixed table file " + tablefile.toString()); return Records.tableSize(tablefile, recordsize); } catch (final IOException ee) { - ConcurrentLog.severe("Table", "table size fix did not work", ee); + log.severe("table size fix did not work", ee); throw new kelondroException(e.getMessage()); } } @@ -363,7 +365,7 @@ public class Table implements Index, Iterable { try { return this.file.size() == this.index.size(); } catch (final IOException e) { - ConcurrentLog.logException(e); + log.logException(e); return false; } } @@ -461,7 +463,7 @@ public class Table implements Index, Iterable { d.remove(s); removeInFile(s.intValue()); if (System.currentTimeMillis() - lastlog > 30000) { - ConcurrentLog.info("TABLE", "removing " + d.size() + " entries in " + filename()); + log.info("removing " + d.size() + " entries in " + filename()); lastlog = System.currentTimeMillis(); } } @@ -515,7 +517,7 @@ public class Table implements Index, Iterable { this.file.get(i, b, 0); } catch (final IndexOutOfBoundsException e) { // there must be a problem with the table index - ConcurrentLog.severe("Table", "IndexOutOfBoundsException: " + e.getMessage(), e); + log.severe("IndexOutOfBoundsException: " + e.getMessage(), e); this.index.remove(key); if (this.table != null) this.table.remove(key); return null; @@ -704,7 +706,7 @@ public class Table implements Index, Iterable { byte[] pk = lr.getPrimaryKeyBytes(); if (pk == null) { // Table file might be corrupt - ConcurrentLog.warn("TABLE", "Possible corruption found in table " + this.filename() + " detected. i=" + i + ",p=" + p); + log.warn("Possible corruption found in table " + this.filename() + " detected. i=" + i + ",p=" + p); continue; } this.index.put(pk, i); @@ -759,7 +761,7 @@ public class Table implements Index, Iterable { try { this.index.put(k, i); } catch (final SpaceExceededException e) { - ConcurrentLog.logException(e); + log.logException(e); throw new IOException("RowSpaceExceededException: " + e.getMessage()); } } @@ -789,7 +791,7 @@ public class Table implements Index, Iterable { try { this.table.set(i, te); } catch (final SpaceExceededException e) { - ConcurrentLog.logException(e); + log.logException(e); this.table = null; } @@ -895,7 +897,7 @@ public class Table implements Index, Iterable { @Override public boolean isEmpty() { - return this.index.isEmpty(); + return this.index == null || this.index.isEmpty(); } @Override @@ -1011,7 +1013,7 @@ public class Table implements Index, Iterable { try { Table.this.file.get(this.c, b, 0); } catch (final IOException e) { - ConcurrentLog.severe("Table", "", e); + log.severe("", e); return null; } } else { @@ -1116,7 +1118,7 @@ public class Table implements Index, Iterable { } System.out.println("FINISHED test after " + ((System.currentTimeMillis() - start) / 1000) + " seconds."); } catch (final Exception e) { - ConcurrentLog.logException(e); + log.logException(e); System.out.println("TERMINATED"); } } diff --git a/source/net/yacy/repository/LoaderDispatcher.java b/source/net/yacy/repository/LoaderDispatcher.java index 062e2b4bc..8c631e904 100644 --- a/source/net/yacy/repository/LoaderDispatcher.java +++ b/source/net/yacy/repository/LoaderDispatcher.java @@ -395,7 +395,7 @@ public final class LoaderDispatcher { final String supportError = TextParser.supports(url, responseHeader.mime()); if (supportError != null) throw new IOException("no parser support: " + supportError); try { - documents = TextParser.parseSource(url, responseHeader.mime(), responseHeader.getCharacterEncoding(), response.getContent()); + documents = TextParser.parseSource(url, responseHeader.mime(), responseHeader.getCharacterEncoding(), response.depth(), response.getContent()); if (documents == null) throw new IOException("document == null"); } catch (final Exception e) { throw new IOException("parser error: " + e.getMessage()); diff --git a/source/net/yacy/search/Switchboard.java b/source/net/yacy/search/Switchboard.java index 2d37411b3..05b91ae37 100644 --- a/source/net/yacy/search/Switchboard.java +++ b/source/net/yacy/search/Switchboard.java @@ -2546,6 +2546,7 @@ public final class Switchboard extends serverSwitch { new AnchorURL(response.url()), response.getMimeType(), response.getCharacterEncoding(), + response.depth(), response.getContent()); if ( documents == null ) { throw new Parser.Failure("Parser returned null.", response.url()); diff --git a/source/net/yacy/search/index/DocumentIndex.java b/source/net/yacy/search/index/DocumentIndex.java index ea392e561..a62338e1b 100644 --- a/source/net/yacy/search/index/DocumentIndex.java +++ b/source/net/yacy/search/index/DocumentIndex.java @@ -149,7 +149,7 @@ public class DocumentIndex extends Segment { length = -1; } try { - documents = TextParser.parseSource(url, null, null, length, url.getInputStream(ClientIdentification.yacyInternetCrawlerAgent, null, null)); + documents = TextParser.parseSource(url, null, null, 999, length, url.getInputStream(ClientIdentification.yacyInternetCrawlerAgent, null, null)); } catch (final Exception e ) { throw new IOException("cannot parse " + url.toString() + ": " + e.getMessage()); } diff --git a/source/net/yacy/search/index/Segment.java b/source/net/yacy/search/index/Segment.java index b3e6207cc..7825e5b03 100644 --- a/source/net/yacy/search/index/Segment.java +++ b/source/net/yacy/search/index/Segment.java @@ -36,6 +36,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.regex.Pattern; @@ -495,8 +496,13 @@ public class Segment { urlstub = null; } else { final String host = stub.getHost(); - String hh = DigestURL.hosthash(host); - docQueue = this.fulltext.getDefaultConnector().concurrentDocumentsByQuery(CollectionSchema.host_id_s + ":\"" + hh + "\"", CollectionSchema.url_chars_i.getSolrFieldName() + " asc", 0, Integer.MAX_VALUE, maxtime, maxcount, 1, CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName()); + String hh = null; + try { + hh = DigestURL.hosthash(host, stub.getPort()); + } catch (MalformedURLException e) { + ConcurrentLog.logException(e); + } + docQueue = hh == null ? new ArrayBlockingQueue(0) : this.fulltext.getDefaultConnector().concurrentDocumentsByQuery(CollectionSchema.host_id_s + ":\"" + hh + "\"", CollectionSchema.url_chars_i.getSolrFieldName() + " asc", 0, Integer.MAX_VALUE, maxtime, maxcount, 1, CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName()); urlstub = stub.toNormalform(true); } diff --git a/source/net/yacy/search/query/QueryModifier.java b/source/net/yacy/search/query/QueryModifier.java index 9a792b90f..4f14161cd 100644 --- a/source/net/yacy/search/query/QueryModifier.java +++ b/source/net/yacy/search/query/QueryModifier.java @@ -20,6 +20,7 @@ package net.yacy.search.query; +import java.net.MalformedURLException; import java.util.ArrayList; import org.apache.solr.common.params.CommonParams; @@ -27,6 +28,7 @@ import org.apache.solr.common.params.MultiMapSolrParams; import net.yacy.cora.document.id.DigestURL; import net.yacy.cora.util.CommonPattern; +import net.yacy.cora.util.ConcurrentLog; import net.yacy.kelondro.util.ISO639; import net.yacy.search.schema.CollectionSchema; import net.yacy.server.serverObjects; @@ -97,7 +99,12 @@ public class QueryModifier { while ( sitehost.endsWith(".") ) { sitehost = sitehost.substring(0, sitehost.length() - 1); } - sitehash = DigestURL.hosthash(sitehost); + try { + sitehash = DigestURL.hosthash(sitehost, sitehost.startsWith("ftp.") ? 21 : 80); + } catch (MalformedURLException e) { + sitehash = ""; + ConcurrentLog.logException(e); + } add("site:" + sitehost); } diff --git a/source/net/yacy/search/schema/CollectionConfiguration.java b/source/net/yacy/search/schema/CollectionConfiguration.java index a4812096c..abdb61c72 100644 --- a/source/net/yacy/search/schema/CollectionConfiguration.java +++ b/source/net/yacy/search/schema/CollectionConfiguration.java @@ -381,7 +381,8 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri } if ((allAttr || contains(CollectionSchema.crawldepth_i))) { - CollectionSchema.crawldepth_i.add(doc, document.getDepth()); + int depth = document.getDepth(); + CollectionSchema.crawldepth_i.add(doc, depth); } if (allAttr || (contains(CollectionSchema.cr_host_chance_d) && contains(CollectionSchema.cr_host_count_i) && contains(CollectionSchema.cr_host_norm_i))) {