diff --git a/defaults/yacy.logging b/defaults/yacy.logging
index ccb88a330..bfca8348e 100644
--- a/defaults/yacy.logging
+++ b/defaults/yacy.logging
@@ -29,6 +29,7 @@ sun.net.www.protocol.http.HttpURLConnection.level = INFO
 # Tray
 sun.awt.level = OFF
 java.awt.level = OFF
+TABLE.level = INFO
 
 # List of global handlers
 handlers = java.util.logging.FileHandler,\
diff --git a/htroot/ConfigPortal.java b/htroot/ConfigPortal.java
index 83abe30fa..f0dd5888e 100644
--- a/htroot/ConfigPortal.java
+++ b/htroot/ConfigPortal.java
@@ -29,6 +29,7 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.net.MalformedURLException;
 import java.util.Properties;
 
 import net.yacy.cora.document.id.DigestURL;
@@ -99,7 +100,12 @@ public class ConfigPortal {
             String excludehosts = post.get("search.excludehosts", "");
             sb.setConfig("search.excludehosts", excludehosts);
-            sb.setConfig("search.excludehosth", DigestURL.hosthashes(excludehosts));
+            try {
+                sb.setConfig("search.excludehosth", DigestURL.hosthashes(excludehosts));
+            } catch (MalformedURLException e) {
+                ConcurrentLog.logException(e);
+                sb.setConfig("search.excludehosth", "");
+            }
         }
         if (post.containsKey("searchpage_default")) {
             // load defaults from defaults/yacy.init file
diff --git a/htroot/Crawler_p.java b/htroot/Crawler_p.java
index 1253cc502..691af66c4 100644
--- a/htroot/Crawler_p.java
+++ b/htroot/Crawler_p.java
@@ -339,16 +339,20 @@
                     Set<String> hosthashes = new HashSet<String>();
                     for (DigestURL u: rootURLs) hosthashes.add(u.hosthash());
                     sb.index.fulltext().deleteStaleDomainHashes(hosthashes, deleteageDate);
+                    sb.crawlQueues.removeHosts(hosthashes);
                 }
             } else if (subPath) {
                 siteFilter = CrawlProfile.subpathFilter(rootURLs);
                 if (deleteold) {
+                    Set<String> hosthashes = new HashSet<String>();
                     for (DigestURL u: rootURLs) {
+                        hosthashes.add(u.hosthash());
                         String basepath = u.toNormalform(true);
                         if (!basepath.endsWith("/")) {int p = basepath.lastIndexOf("/"); if (p > 0) basepath = basepath.substring(0, p + 1);}
                         int count = sb.index.fulltext().remove(basepath, deleteageDate);
                         if (count > 0) ConcurrentLog.info("Crawler_p", "deleted " + count + " documents for host " + u.getHost());
                     }
+                    sb.crawlQueues.removeHosts(hosthashes);
                 }
             }
             if (CrawlProfile.MATCH_ALL_STRING.equals(newcrawlingMustMatch)) {
diff --git a/htroot/IndexCreateQueues_p.html b/htroot/IndexCreateQueues_p.html
index 333856553..2de295770 100644
--- a/htroot/IndexCreateQueues_p.html
+++ b/htroot/IndexCreateQueues_p.html
@@ -65,7 +65,7 @@
         #[hostcount]#
         #[hostdelta]#
-         #[hostname]#
+         #[hostnameport]#
         #{list}#
diff --git a/htroot/IndexCreateQueues_p.java b/htroot/IndexCreateQueues_p.java
index 40b832bdf..3adf3fa04 100644
--- a/htroot/IndexCreateQueues_p.java
+++ b/htroot/IndexCreateQueues_p.java
@@ -1,15 +1,19 @@
+import java.net.MalformedURLException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Date;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Set;
 import java.util.regex.Pattern;
 import java.util.regex.PatternSyntaxException;
 
 import net.yacy.cora.document.encoding.ASCII;
+import net.yacy.cora.document.id.DigestURL;
 import net.yacy.cora.protocol.RequestHeader;
 import net.yacy.cora.util.ConcurrentLog;
 import net.yacy.crawler.CrawlSwitchboard;
@@ -75,30 +79,45 @@ public class IndexCreateQueues_p {
                 }
             }
         } else {
-            // iterating through the list of URLs
-            final Iterator<Request> iter = sb.crawlQueues.noticeURL.iterator(stackType);
-            Request entry;
-            final List<byte[]> removehashes = new ArrayList<byte[]>();
-            while (iter.hasNext()) {
-                if ((entry = iter.next()) == null) continue;
-                String value = null;
-
-                location: switch (option) {
-                    case URL: value = (entry.url() == null) ? null : entry.url().toString(); break location;
-                    case ANCHOR: value = entry.name(); break location;
-                    case DEPTH: value = Integer.toString(entry.depth()); break location;
-                    case INITIATOR:
-                        value = (entry.initiator() == null || entry.initiator().length == 0) ? "proxy" : ASCII.String(entry.initiator());
-                        break location;
-                    case MODIFIED: value = daydate(entry.appdate()); break location;
-                    default: value = null; break location;
+            int removedByHosts = 0;
+            if (option == URL && deletepattern.startsWith(".*") && deletepattern.endsWith(".*")) {
+                // try to delete that using the host name
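+                // (illustrative example: the pattern ".*example.org.*" strips to "example.org",
+                // whose host hashes for port 80 (or 21 for ftp hosts) and port 443 select the
+                // matching host queues so they can be dropped in one step)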
"not accessed" : Integer.toString(host.getValue()[1])); - List domainStackReferences = sb.crawlQueues.noticeURL.getDomainStackReferences(stackType, host.getKey(), urlsPerHost, 10000); + List domainStackReferences = sb.crawlQueues.noticeURL.getDomainStackReferences(stackType, hostname, urlsPerHost, 10000); Seed initiator; String profileHandle; @@ -138,9 +161,11 @@ public class IndexCreateQueues_p { initiator = sb.peers.getConnected(request.initiator() == null ? "" : ASCII.String(request.initiator())); profileHandle = request.profileHandle(); profileEntry = profileHandle == null ? null : sb.crawler.getActive(profileHandle.getBytes()); + String depthString = Integer.toString(request.depth()); + while (depthString.length() < 4) depthString = "0" + depthString; prop.putHTML("crawler_host_" + hc + "_list_" + count + "_initiator", ((initiator == null) ? "proxy" : initiator.getName()) ); prop.put("crawler_host_" + hc + "_list_" + count + "_profile", ((profileEntry == null) ? "unknown" : profileEntry.collectionName())); - prop.put("crawler_host_" + hc + "_list_" + count + "_depth", request.depth()); + prop.putHTML("crawler_host_" + hc + "_list_" + count + "_depth", depthString); prop.put("crawler_host_" + hc + "_list_" + count + "_modified", daydate(request.appdate()) ); prop.putHTML("crawler_host_" + hc + "_list_" + count + "_anchor", request.name()); prop.putHTML("crawler_host_" + hc + "_list_" + count + "_url", request.url().toNormalform(true)); diff --git a/source/net/yacy/cora/document/id/DigestURL.java b/source/net/yacy/cora/document/id/DigestURL.java index 887aa8a7d..74a05e2b2 100644 --- a/source/net/yacy/cora/document/id/DigestURL.java +++ b/source/net/yacy/cora/document/id/DigestURL.java @@ -36,7 +36,6 @@ import net.yacy.cora.order.Digest; import net.yacy.cora.protocol.Domains; import net.yacy.cora.util.ByteArray; import net.yacy.cora.util.CommonPattern; -import net.yacy.cora.util.ConcurrentLog; /** * URI-object providing YaCy-hash computation @@ -57,18 +56,19 @@ public class DigestURL extends MultiProtocolURL implements Serializable { /** * Shortcut, calculate hash for shorted url/hostname * @param host + * @param port * @return */ - public static String hosthash(final String host) { + public static String hosthash(final String host, final int port) throws MalformedURLException { String h = host; - if (!h.startsWith("http://")) h = "http://" + h; - DigestURL url = null; - try { - url = new DigestURL(h); - } catch (final MalformedURLException e) { - ConcurrentLog.logException(e); - return null; + if (h.indexOf("//") < 0) { + if (port == 80 || port == 8080 || port == 8090) h = "http://" + h; + else if (port == 443) h = "https://" + h; + else if (port == 21 || port == 2121) h = "ftp://" + h; + else if (port > 999) h = "http://" + h + ":" + port; + else h = "http://" + h; } + DigestURL url = new DigestURL(h); return (url == null) ? null : ASCII.String(url.hash(), 6, 6); } @@ -77,15 +77,16 @@ public class DigestURL extends MultiProtocolURL implements Serializable { * the list is separated by comma * @param hostlist * @return list of host hashes without separation + * @throws MalformedURLException */ - public static String hosthashes(final String hostlist) { + public static String hosthashes(final String hostlist) throws MalformedURLException { String[] hs = CommonPattern.COMMA.split(hostlist); StringBuilder sb = new StringBuilder(hostlist.length()); for (String h: hs) { if (h == null) continue; h = h.trim(); if (h.isEmpty()) continue; - h = hosthash(h); + h = hosthash(h, h.startsWith("ftp.") ? 
         return (url == null) ? null : ASCII.String(url.hash(), 6, 6);
     }
 
@@ -77,15 +77,16 @@ public class DigestURL extends MultiProtocolURL implements Serializable {
      * the list is separated by comma
      * @param hostlist
      * @return list of host hashes without separation
+     * @throws MalformedURLException
      */
-    public static String hosthashes(final String hostlist) {
+    public static String hosthashes(final String hostlist) throws MalformedURLException {
         String[] hs = CommonPattern.COMMA.split(hostlist);
         StringBuilder sb = new StringBuilder(hostlist.length());
         for (String h: hs) {
             if (h == null) continue;
             h = h.trim();
             if (h.isEmpty()) continue;
-            h = hosthash(h);
+            h = hosthash(h, h.startsWith("ftp.") ? 21 : 80);
             if (h == null || h.length() != 6) continue;
             sb.append(h);
         }
diff --git a/source/net/yacy/crawler/Balancer.java b/source/net/yacy/crawler/Balancer.java
index a8b236579..35b70aecb 100644
--- a/source/net/yacy/crawler/Balancer.java
+++ b/source/net/yacy/crawler/Balancer.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import net.yacy.cora.storage.HandleSet;
 import net.yacy.cora.util.SpaceExceededException;
@@ -62,6 +63,13 @@ public interface Balancer {
      */
    public int removeAllByProfileHandle(final String profileHandle, final long timeout) throws IOException, SpaceExceededException;
 
+    /**
+     * delete all urls which are stored for given host hashes
+     * @param hosthashes
+     * @return number of deleted urls
+     */
+    public int removeAllByHostHashes(final Set<String> hosthashes);
+
     /**
      * @param urlHashes, a list of hashes that shall be removed
      * @return number of entries that had been removed
diff --git a/source/net/yacy/crawler/HostBalancer.java b/source/net/yacy/crawler/HostBalancer.java
new file mode 100644
index 000000000..49e234f9f
--- /dev/null
+++ b/source/net/yacy/crawler/HostBalancer.java
@@ -0,0 +1,372 @@
+/**
+ *  HostBalancer
+ *  Copyright 2013 by Michael Christen
+ *  First released 24.09.2013 at http://yacy.net
+ *
+ *  This library is free software; you can redistribute it and/or
+ *  modify it under the terms of the GNU Lesser General Public
+ *  License as published by the Free Software Foundation; either
+ *  version 2.1 of the License, or (at your option) any later version.
+ *
+ *  This library is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ *  Lesser General Public License for more details.
+ *
+ *  You should have received a copy of the GNU Lesser General Public License
+ *  along with this program in the file lgpl21.txt
+ *  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package net.yacy.crawler;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.util.ArrayList;
+import java.util.ConcurrentModificationException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+import net.yacy.cora.document.encoding.ASCII;
+import net.yacy.cora.document.id.DigestURL;
+import net.yacy.cora.order.Base64Order;
+import net.yacy.cora.protocol.ClientIdentification;
+import net.yacy.cora.storage.HandleSet;
+import net.yacy.cora.util.ConcurrentLog;
+import net.yacy.cora.util.SpaceExceededException;
+import net.yacy.crawler.data.CrawlProfile;
+import net.yacy.crawler.data.Latency;
+import net.yacy.crawler.retrieval.Request;
+import net.yacy.crawler.robots.RobotsTxt;
+import net.yacy.kelondro.data.word.Word;
+import net.yacy.kelondro.index.RowHandleSet;
+
+/**
+ * wrapper for single HostQueue queues; this is a collection of such queues.
+ * All these queues are stored in a common directory for the queue stacks.
+ *
+ * ATTENTION: the order of urls returned by this balancer must strictly follow the clickdepth order.
+ * That means that all links from a given host must be returned from the lowest crawldepth only.
+ * The crawldepth is interpreted as clickdepth and the crawler is producing that semantic using a
+ * correct crawl ordering.
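+ *
+ * A minimal usage sketch (illustrative only; the request, profile and robots
+ * objects are assumed to exist and the path is just an example):
+ *
+ *   Balancer balancer = new HostBalancer(new File("DATA/QUEUES/CrawlerCoreStacks"), false);
+ *   String rejectReason = balancer.push(request, profile, robots); // null means accepted
+ *   Request next = balancer.pop(true, crawlSwitchboard, robots);   // enforces crawl-delay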
+ */
+public class HostBalancer implements Balancer {
+
+    private final File hostsPath;
+    private final boolean exceed134217727;
+    private final Map<String, HostQueue> queues;
+    private final Set<String> roundRobinHostHashes;
+    private HandleSet urlHashDoubleCheck;
+
+    public HostBalancer(
+            final File hostsPath,
+            final boolean exceed134217727) {
+        this.hostsPath = hostsPath;
+        this.exceed134217727 = exceed134217727;
+        this.urlHashDoubleCheck = new RowHandleSet(Word.commonHashLength, Word.commonHashOrder, 0);
+
+        // create a stack for newly entered entries
+        if (!(hostsPath.exists())) hostsPath.mkdirs(); // make the path
+        this.queues = new ConcurrentHashMap<String, HostQueue>();
+        String[] list = this.hostsPath.list();
+        for (String address: list) try {
+            File queuePath = new File(this.hostsPath, address);
+            HostQueue queue = new HostQueue(queuePath, this.queues.size() > 100, this.exceed134217727);
+            if (queue.size() == 0) {
+                queue.close();
+                queuePath.delete();
+            } else {
+                this.queues.put(DigestURL.hosthash(queue.getHost(), queue.getPort()), queue);
+            }
+        } catch (MalformedURLException e) {
+            ConcurrentLog.logException(e);
+        }
+        this.roundRobinHostHashes = new HashSet<String>();
+    }
+
+    @Override
+    public synchronized void close() {
+        if (this.urlHashDoubleCheck != null) {
+            this.urlHashDoubleCheck.clear();
+        }
+        for (HostQueue queue: this.queues.values()) queue.close();
+        this.queues.clear();
+    }
+
+    @Override
+    public void clear() {
+        if (this.urlHashDoubleCheck != null) {
+            this.urlHashDoubleCheck.clear();
+        }
+        for (HostQueue queue: this.queues.values()) queue.clear();
+        this.queues.clear();
+    }
+
+    @Override
+    public Request get(final byte[] urlhash) throws IOException {
+        String hosthash = ASCII.String(urlhash, 6, 6);
+        HostQueue queue = this.queues.get(hosthash);
+        if (queue == null) return null;
+        return queue.get(urlhash);
+    }
+
+    @Override
+    public int removeAllByProfileHandle(final String profileHandle, final long timeout) throws IOException, SpaceExceededException {
+        int c = 0;
+        for (HostQueue queue: this.queues.values()) c += queue.removeAllByProfileHandle(profileHandle, timeout);
+        return c;
+    }
+
+    /**
+     * delete all urls which are stored for given host hashes
+     * @param hosthashes
+     * @return number of deleted urls
+     */
+    @Override
+    public int removeAllByHostHashes(final Set<String> hosthashes) {
+        int c = 0;
+        for (String h: hosthashes) {
+            HostQueue hq = this.queues.get(h);
+            if (hq != null) c += hq.removeAllByHostHashes(hosthashes);
+        }
+        // remove from cache
+        Iterator<byte[]> i = this.urlHashDoubleCheck.iterator();
+        ArrayList<String> deleteHashes = new ArrayList<String>();
+        while (i.hasNext()) {
+            String h = ASCII.String(i.next());
+            if (hosthashes.contains(h.substring(6))) deleteHashes.add(h);
+        }
+        for (String h: deleteHashes) this.urlHashDoubleCheck.remove(ASCII.getBytes(h));
+        return c;
+    }
+
+    @Override
+    public synchronized int remove(final HandleSet urlHashes) throws IOException {
+        Map<String, HandleSet> removeLists = new ConcurrentHashMap<String, HandleSet>();
+        for (byte[] urlhash: urlHashes) {
+            this.urlHashDoubleCheck.remove(urlhash);
+            String hosthash = ASCII.String(urlhash, 6, 6);
+            HandleSet removeList = removeLists.get(hosthash);
+            if (removeList == null) {
+                removeList = new RowHandleSet(Word.commonHashLength, Base64Order.enhancedCoder, 100);
+                removeLists.put(hosthash, removeList);
+            }
+            try {removeList.put(urlhash);} catch (SpaceExceededException e) {}
+        }
+        int c = 0;
+        for (Map.Entry<String, HandleSet> entry: removeLists.entrySet()) {
+            HostQueue queue = this.queues.get(entry.getKey());
+            if (queue != null) c += queue.remove(entry.getValue());
+        }
+        return c;
+    }
+
+    @Override
+    public boolean has(final byte[] urlhashb) {
+        if (this.urlHashDoubleCheck.has(urlhashb)) return true;
+        String hosthash = ASCII.String(urlhashb, 6, 6);
+        HostQueue queue = this.queues.get(hosthash);
+        if (queue == null) return false;
+        return queue.has(urlhashb);
+    }
+
+    @Override
+    public int size() {
+        int c = 0;
+        for (HostQueue queue: this.queues.values()) c += queue.size();
+        return c;
+    }
+
+    @Override
+    public boolean isEmpty() {
+        for (HostQueue queue: this.queues.values()) if (!queue.isEmpty()) return false;
+        return true;
+    }
+
+    /**
+     * push a request to one of the host queues. If the queue does not exist, it is created
+     * @param entry
+     * @param profile
+     * @param robots
+     * @return null if everything is ok or a string with an error message if the push is not allowed according to the crawl profile or robots
+     * @throws IOException
+     * @throws SpaceExceededException
+     */
+    @Override
+    public synchronized String push(final Request entry, CrawlProfile profile, final RobotsTxt robots) throws IOException, SpaceExceededException {
+        if (this.has(entry.url().hash())) return "double occurrence";
+        this.urlHashDoubleCheck.put(entry.url().hash());
+        String hosthash = ASCII.String(entry.url().hash(), 6, 6);
+        HostQueue queue = this.queues.get(hosthash);
+        if (queue == null) {
+            queue = new HostQueue(this.hostsPath, entry.url().getHost(), entry.url().getPort(), this.queues.size() > 100, this.exceed134217727);
+            this.queues.put(hosthash, queue);
+        }
+        return queue.push(entry, profile, robots);
+    }
+
+    /**
+     * get the next entry in this crawl queue in such a way that the domain access time delta is maximized
+     * and always above the given minimum delay time. An additional delay time is computed using the robots.txt
+     * crawl-delay time which is always respected. In case the minimum time cannot be ensured, this method pauses
+     * the necessary time until the url is released and returned as CrawlEntry object. In case that a profile
+     * for the computed Entry does not exist, null is returned
+     * @param delay true if the requester demands forced delays using explicit thread sleep
+     * @param profile
+     * @return a url in a CrawlEntry object
+     * @throws IOException
+     * @throws SpaceExceededException
+     */
+    @Override
+    public synchronized Request pop(boolean delay, CrawlSwitchboard cs, RobotsTxt robots) throws IOException {
+        tryagain: while (true) try {
+            if (this.roundRobinHostHashes.size() == 0) {
+                // select all queues on the lowest crawldepth level; that means: first look for the lowest level
+                int lowestCrawldepth = Integer.MAX_VALUE;
+                for (HostQueue hq: this.queues.values()) {
+                    int lsd = hq.getLowestStackDepth();
+                    if (lsd < lowestCrawldepth) lowestCrawldepth = lsd;
+                }
+                // now add only such stacks which have the lowest level
+                for (Map.Entry<String, HostQueue> entry: this.queues.entrySet()) {
+                    if (entry.getValue().getLowestStackDepth() == lowestCrawldepth) this.roundRobinHostHashes.add(entry.getKey());
+                }
+                // emergency case if this fails
+                if (this.roundRobinHostHashes.size() == 0) {
+                    //assert this.queues.size() == 0; // that's the only case where that should happen
+                    this.roundRobinHostHashes.addAll(this.queues.keySet());
+                }
+                // if there are stacks with less than 10 entries, remove all stacks with more than 10 entries
+                // this shall kick out small stacks to prevent that too many files are opened for very wide crawls
+                boolean smallStacksExist = false;
+                smallsearch: for (String s: this.roundRobinHostHashes) {
+                    HostQueue hq = this.queues.get(s);
+                    if (hq != null && hq.size() <= 10) {smallStacksExist = true; break smallsearch;}
+                }
+                if (smallStacksExist) {
+                    Iterator<String> i = this.roundRobinHostHashes.iterator();
+                    while (i.hasNext()) {
+                        String s = i.next();
+                        HostQueue hq = this.queues.get(s);
+                        if (hq != null && hq.size() > 10) {i.remove();}
+                    }
+                }
+            }
+            if (this.roundRobinHostHashes.size() == 0) return null;
+
+            // first strategy: get one entry which does not need sleep time
+            for (String nextHH: this.roundRobinHostHashes) {
+                HostQueue hq = this.queues.get(nextHH);
+                int delta = Latency.waitingRemainingGuessed(hq.getHost(), DigestURL.hosthash(hq.getHost(), hq.getPort()), robots, ClientIdentification.yacyInternetCrawlerAgent);
+                if (delta <= 10) {
+                    this.roundRobinHostHashes.remove(nextHH);
+                    Request request = hq == null ? null : hq.pop(delay, cs, robots);
+                    int size = hq == null ? 0 : hq.size();
+                    if (size == 0) {
+                        hq.close();
+                        this.queues.remove(nextHH);
+                    }
+                    if (request != null) return request;
+                }
+            }
+
+            // second strategy: take from the largest stack and clean round robin cache
+            int largest = Integer.MIN_VALUE;
+            String nextHH = null;
+            for (String h: this.roundRobinHostHashes) {
+                HostQueue hq = this.queues.get(h);
+                if (hq != null) {
+                    int s = hq.size();
+                    if (s > largest) {
+                        largest = s;
+                        nextHH = h;
+                    }
+                }
+            }
+            this.roundRobinHostHashes.clear(); // start from the beginning next time
+            HostQueue hq = this.queues.get(nextHH);
+            Request request = hq == null ? null : hq.pop(delay, cs, robots);
+            if (hq != null && hq.size() == 0) {
+                hq.close();
+                this.queues.remove(nextHH);
+            }
+            return request;
+        } catch (ConcurrentModificationException e) {
+            continue tryagain;
+        } catch (Throwable e) {
+            throw new IOException(e.getMessage());
+        }
+    }
+
+    @Override
+    public Iterator<Request> iterator() throws IOException {
+        final Iterator<HostQueue> hostsIterator = this.queues.values().iterator();
+        @SuppressWarnings("unchecked")
+        final Iterator<Request>[] hostIterator = new Iterator[1];
+        hostIterator[0] = null;
+        return new Iterator<Request>() {
+            @Override
+            public boolean hasNext() {
+                return hostsIterator.hasNext() || (hostIterator[0] != null && hostIterator[0].hasNext());
+            }
+            @Override
+            public Request next() {
+                synchronized (HostBalancer.this) {
+                    while (hostIterator[0] == null || !hostIterator[0].hasNext()) try {
+                        HostQueue entry = hostsIterator.next();
+                        hostIterator[0] = entry.iterator();
+                    } catch (IOException e) {}
+                    if (!hostIterator[0].hasNext()) return null;
+                    return hostIterator[0].next();
+                }
+            }
+            @Override
+            public void remove() {
+                hostIterator[0].remove();
+            }
+        };
+    }
+
+    /**
+     * get a list of domains that are currently maintained as domain stacks
+     * @return a map of clear text strings of host names to an integer array: {the size of the domain stack, guessed delta waiting time}
+     */
+    @Override
+    public Map<String, Integer[]> getDomainStackHosts(RobotsTxt robots) {
+        Map<String, Integer[]> map = new TreeMap<String, Integer[]>(); // we use a tree map to get a stable ordering
+        for (HostQueue hq: this.queues.values()) try {
+            int delta = Latency.waitingRemainingGuessed(hq.getHost(), DigestURL.hosthash(hq.getHost(), hq.getPort()), robots, ClientIdentification.yacyInternetCrawlerAgent);
+            map.put(hq.getHost() + ":" + hq.getPort(), new Integer[]{hq.size(), delta});
+        } catch (MalformedURLException e) {
+            ConcurrentLog.logException(e);
+        }
+        return map;
+    }
+
+    /**
+     * get lists of crawl request entries for a specific host
+     * @param host
+     * @param maxcount
+     * @param maxtime
+     * @return a list of crawl loader requests
+     */
+    @Override
+    public List<Request> getDomainStackReferences(String host, int maxcount, long maxtime) {
+        try {
+            HostQueue hq = this.queues.get(DigestURL.hosthash(host, host.startsWith("ftp.") ? 21 : 80));
+            if (hq == null) hq = this.queues.get(DigestURL.hosthash(host, 443));
+            return hq == null ? new ArrayList<Request>(0) : hq.getDomainStackReferences(host, maxcount, maxtime);
+        } catch (MalformedURLException e) {
+            ConcurrentLog.logException(e);
+            return null;
+        }
+    }
+
+}
diff --git a/source/net/yacy/crawler/HostQueue.java b/source/net/yacy/crawler/HostQueue.java
index 9d7c51df3..25e2ce3a4 100644
--- a/source/net/yacy/crawler/HostQueue.java
+++ b/source/net/yacy/crawler/HostQueue.java
@@ -22,114 +22,298 @@ package net.yacy.crawler;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.MalformedURLException;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
 
 import net.yacy.cora.document.encoding.ASCII;
+import net.yacy.cora.document.encoding.UTF8;
+import net.yacy.cora.document.id.DigestURL;
 import net.yacy.cora.order.Base64Order;
+import net.yacy.cora.protocol.ClientIdentification;
 import net.yacy.cora.storage.HandleSet;
 import net.yacy.cora.util.ConcurrentLog;
 import net.yacy.cora.util.SpaceExceededException;
 import net.yacy.crawler.data.CrawlProfile;
+import net.yacy.crawler.data.Latency;
 import net.yacy.crawler.retrieval.Request;
 import net.yacy.crawler.robots.RobotsTxt;
 import net.yacy.kelondro.data.word.Word;
 import net.yacy.kelondro.index.BufferedObjectIndex;
+import net.yacy.kelondro.index.Index;
+import net.yacy.kelondro.index.OnDemandOpenFileIndex;
 import net.yacy.kelondro.index.Row;
 import net.yacy.kelondro.index.RowHandleSet;
 import net.yacy.kelondro.table.Table;
-import net.yacy.kelondro.util.MemoryControl;
+import net.yacy.kelondro.util.kelondroException;
 import net.yacy.repository.Blacklist.BlacklistType;
 import net.yacy.search.Switchboard;
 
-public class HostQueue {
+public class HostQueue implements Balancer {
 
     public static final String indexSuffix = ".stack";
     private static final int EcoFSBufferSize = 1000;
     private static final int objectIndexBufferSize = 1000;
-    private static final int MAX_DOUBLE_PUSH_CHECK = 100000;
 
-    private final String hostHash;
-    private final File queuesPath;
-    private BufferedObjectIndex requestStack;
-    private HandleSet urlHashDoubleCheck;
+    private final File hostPath;
+    private final String hostName;
+    private String hostHash;
+    private final int port;
+    private final boolean exceed134217727;
+    private final boolean onDemand;
+    private TreeMap<Integer, Index> depthStacks;
 
-    public HostQueue(
-            final File queuesPath,
-            final String hostHash,
-            final boolean useTailCache,
+    public HostQueue(
+            final File hostsPath,
+            final String hostName,
+            final int port,
+            final boolean onDemand,
             final boolean exceed134217727) {
-        this.hostHash = hostHash;
-        this.queuesPath = queuesPath;
-        this.urlHashDoubleCheck = new RowHandleSet(Word.commonHashLength, Word.commonHashOrder, 0);
-
-        // create a stack for newly entered entries
-        if (!(this.queuesPath.exists())) this.queuesPath.mkdir(); // make the path
-        this.queuesPath.mkdirs();
-        final File f = new File(this.queuesPath, this.hostHash + indexSuffix);
+        this.onDemand = onDemand;
+        this.exceed134217727 = exceed134217727;
+        this.hostName = hostName;
+        this.port = port;
+        this.hostPath = new File(hostsPath, this.hostName + "." + this.port);
+        init();
+    }
+
+    public HostQueue(
+            final File hostPath,
+            final boolean onDemand,
+            final boolean exceed134217727) {
+        this.onDemand = onDemand;
+        this.exceed134217727 = exceed134217727;
+        this.hostPath = hostPath;
+        // parse the hostName and port from the file name
+        String filename = hostPath.getName();
+        int p = filename.lastIndexOf('.');
+        this.hostName = filename.substring(0, p);
+        this.port = Integer.parseInt(filename.substring(p + 1));
+        init();
+    }
+
+    private final void init() {
         try {
-            this.requestStack = new BufferedObjectIndex(new Table(f, Request.rowdef, EcoFSBufferSize, 0, useTailCache, exceed134217727, true), objectIndexBufferSize);
-        } catch (final SpaceExceededException e) {
-            try {
-                this.requestStack = new BufferedObjectIndex(new Table(f, Request.rowdef, 0, 0, false, exceed134217727, true), objectIndexBufferSize);
-            } catch (final SpaceExceededException e1) {
-                ConcurrentLog.logException(e1);
+            this.hostHash = DigestURL.hosthash(this.hostName, this.port);
+        } catch (MalformedURLException e) {
+            this.hostHash = "";
+        }
+        if (!(this.hostPath.exists())) this.hostPath.mkdirs();
+        this.depthStacks = new TreeMap<Integer, Index>();
+        int size = openAllStacks();
+        ConcurrentLog.info("Balancer", "opened HostQueue " + this.hostPath.getAbsolutePath() + " with " + size + " urls.");
+    }
+
+    public String getHost() {
+        return this.hostName;
+    }
+
+    public int getPort() {
+        return this.port;
+    }
+
+    private int openAllStacks() {
+        String[] l = this.hostPath.list();
+        int c = 0;
+        if (l != null) for (String s: l) {
+            if (s.endsWith(indexSuffix)) try {
+                int depth = Integer.parseInt(s.substring(0, s.length() - indexSuffix.length()));
+                File stackFile = new File(this.hostPath, s);
+                Index depthStack = openStack(stackFile, depth);
+                if (depthStack != null) {
+                    int sz = depthStack.size();
+                    if (sz == 0) {
+                        depthStack.close();
+                        stackFile.delete();
+                    } else {
+                        this.depthStacks.put(depth, depthStack);
+                        c += sz;
+                    }
+                }
+            } catch (NumberFormatException e) {}
+        }
+        return c;
+    }
+
+    public synchronized int getLowestStackDepth() {
+        while (this.depthStacks.size() > 0) {
+            Map.Entry<Integer, Index> entry;
+            synchronized (this) {
+                entry = this.depthStacks.firstEntry();
+            }
+            if (entry == null) return 0; // happens only if map is empty
+            if (entry.getValue().size() == 0) {
+                entry.getValue().close();
+                getFile(entry.getKey()).delete();
+                this.depthStacks.remove(entry.getKey());
+                continue;
             }
+            return entry.getKey();
         }
-        ConcurrentLog.info("Balancer", "opened balancer file with " + this.requestStack.size() + " entries from " + f.toString());
+        // this should not happen but it happens if a deletion is done
+        //assert false;
+        return 0;
     }
 
-    public synchronized void close() {
-        int sizeBeforeClose = this.size();
-        if (this.urlHashDoubleCheck != null) {
-            this.urlHashDoubleCheck.clear();
-            this.urlHashDoubleCheck = null;
+    private Index getLowestStack() {
+        while (this.depthStacks.size() > 0) {
+            Map.Entry<Integer, Index> entry;
+            synchronized (this) {
+                entry = this.depthStacks.firstEntry();
+            }
+            if (entry == null) return null; // happens only if map is empty
+            if (entry.getValue().size() == 0) {
+                entry.getValue().close();
+                getFile(entry.getKey()).delete();
+                this.depthStacks.remove(entry.getKey());
+                continue;
+            }
+            return entry.getValue();
         }
-        if (this.requestStack != null) {
-            this.requestStack.close();
-            this.requestStack = null;
+        // this should not happen
+        //assert false;
+        return null;
+    }
+
+    private Index getStack(int depth) {
+        Index depthStack;
+        synchronized (this) {
+            depthStack = this.depthStacks.get(depth);
+            if (depthStack != null) return depthStack;
+        }
+        // create a new stack
+        synchronized (this) {
+            // check again
+            depthStack = this.depthStacks.get(depth);
+            if (depthStack != null) return depthStack;
+            // now actually create a new stack
+            final File f = getFile(depth);
+            depthStack = openStack(f, depth);
+            if (depthStack != null) this.depthStacks.put(depth, depthStack);
         }
-        if (sizeBeforeClose == 0) {
-            // clean up
-            new File(this.queuesPath, this.hostHash + indexSuffix).delete();
+        return depthStack;
+    }
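+
+    // Note on the on-disk layout (illustrative): getFile() below zero-pads the depth,
+    // so the depth-2 stack of host example.org on port 80 lives in a file named
+    // "0002.stack" inside the per-host directory "example.org.80" (host + "." + port).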
+
+    private File getFile(int depth) {
+        String name = Integer.toString(depth);
+        while (name.length() < 4) name = "0" + name;
+        final File f = new File(this.hostPath, name + indexSuffix);
+        return f;
+    }
+
+    private Index openStack(File f, int depth) {
+        for (int i = 0; i < 10; i++) {
+            // we try that again if it fails because it shall not fail
+            if (this.onDemand && depth > 3 && (!f.exists() || f.length() < 10000)) {
+                try {
+                    return new BufferedObjectIndex(new OnDemandOpenFileIndex(f, Request.rowdef, exceed134217727), objectIndexBufferSize);
+                } catch (kelondroException e) {
+                    // possibly the file was closed meanwhile
+                    ConcurrentLog.logException(e);
+                }
+            } else {
+                try {
+                    return new BufferedObjectIndex(new Table(f, Request.rowdef, EcoFSBufferSize, 0, false, exceed134217727, true), objectIndexBufferSize);
+                } catch (final SpaceExceededException e) {
+                    try {
+                        return new BufferedObjectIndex(new Table(f, Request.rowdef, 0, 0, false, exceed134217727, true), objectIndexBufferSize);
+                    } catch (final SpaceExceededException e1) {
+                        ConcurrentLog.logException(e1);
+                    }
+                } catch (kelondroException e) {
+                    // possibly the file was closed meanwhile
+                    ConcurrentLog.logException(e);
+                }
+            }
+        }
+        return null;
+    }
 
-    public void clear() {
-        try {
-            this.requestStack.clear();
-        } catch (final IOException e) {
-            ConcurrentLog.logException(e);
+    @Override
+    public synchronized void close() {
+        for (Map.Entry<Integer, Index> entry: this.depthStacks.entrySet()) {
+            int size = entry.getValue().size();
+            entry.getValue().close();
+            if (size == 0) getFile(entry.getKey()).delete();
+        }
+        this.depthStacks.clear();
+        String[] l = this.hostPath.list();
+        if ((l == null || l.length == 0) && this.hostPath != null) this.hostPath.delete();
+    }
+
+    @Override
+    public synchronized void clear() {
+        for (Map.Entry<Integer, Index> entry: this.depthStacks.entrySet()) {
+            entry.getValue().close();
+            getFile(entry.getKey()).delete();
+        }
+        this.depthStacks.clear();
+        String[] l = this.hostPath.list();
+        if (l != null) for (String s: l) {
+            new File(this.hostPath, s).delete();
         }
-        this.urlHashDoubleCheck.clear();
+        this.hostPath.delete();
     }
 
+    @Override
     public Request get(final byte[] urlhash) throws IOException {
         assert urlhash != null;
-        if (this.requestStack == null) return null; // case occurs during shutdown
-        final Row.Entry entry = this.requestStack.get(urlhash, false);
-        if (entry == null) return null;
-        return new Request(entry);
+        if (this.depthStacks == null) return null; // case occurs during shutdown
+        for (Index depthStack: this.depthStacks.values()) {
+            final Row.Entry entry = depthStack.get(urlhash, false);
+            if (entry == null) continue;
+            return new Request(entry);
+        }
+        return null;
     }
 
+    @Override
     public int removeAllByProfileHandle(final String profileHandle, final long timeout) throws IOException, SpaceExceededException {
         // first find a list of url hashes that shall be deleted
-        final HandleSet urlHashes = new RowHandleSet(this.requestStack.row().primaryKeyLength, Base64Order.enhancedCoder, 100);
         final long terminate = timeout == Long.MAX_VALUE ? Long.MAX_VALUE : (timeout > 0) ? System.currentTimeMillis() + timeout : Long.MAX_VALUE;
+        int count = 0;
         synchronized (this) {
-            final Iterator<Row.Entry> i = this.requestStack.rows();
-            Row.Entry rowEntry;
-            Request crawlEntry;
-            while (i.hasNext() && (System.currentTimeMillis() < terminate)) {
-                rowEntry = i.next();
-                crawlEntry = new Request(rowEntry);
-                if (crawlEntry.profileHandle().equals(profileHandle)) {
-                    urlHashes.put(crawlEntry.url().hash());
+            for (Index depthStack: this.depthStacks.values()) {
+                final HandleSet urlHashes = new RowHandleSet(Word.commonHashLength, Base64Order.enhancedCoder, 100);
+                final Iterator<Row.Entry> i = depthStack.rows();
+                Row.Entry rowEntry;
+                Request crawlEntry;
+                while (i.hasNext() && (System.currentTimeMillis() < terminate)) {
+                    rowEntry = i.next();
+                    crawlEntry = new Request(rowEntry);
+                    if (crawlEntry.profileHandle().equals(profileHandle)) {
+                        urlHashes.put(crawlEntry.url().hash());
+                    }
+                    if (System.currentTimeMillis() > terminate) break;
+                }
+                for (final byte[] urlhash: urlHashes) {
+                    depthStack.remove(urlhash);
+                    count++;
+                }
             }
         }
-
-        // then delete all these urls from the queues and the file index
-        return remove(urlHashes);
+        return count;
+    }
+
+    /**
+     * delete all urls which are stored for given host hashes
+     * @param hosthashes
+     * @return number of deleted urls
+     */
+    @Override
+    public int removeAllByHostHashes(final Set<String> hosthashes) {
+        for (String h: hosthashes) {
+            if (this.hostHash.equals(h)) {
+                int s = this.size();
+                this.clear();
+                return s;
+            }
+        }
+        return 0;
     }
 
     /**
@@ -138,43 +322,53 @@ public class HostQueue {
      * @return number of entries that had been removed
      * @throws IOException
      */
+    @Override
     public synchronized int remove(final HandleSet urlHashes) throws IOException {
-        final int s = this.requestStack.size();
         int removedCounter = 0;
-        for (final byte[] urlhash: urlHashes) {
-            final Row.Entry entry = this.requestStack.remove(urlhash);
-            if (entry != null) removedCounter++;
-
-            // remove from double-check caches
-            this.urlHashDoubleCheck.remove(urlhash);
+        for (Index depthStack: this.depthStacks.values()) {
+            final int s = depthStack.size();
+            int removed = 0;
+            for (final byte[] urlhash: urlHashes) {
+                final Row.Entry entry = depthStack.remove(urlhash);
+                if (entry != null) removed++;
+            }
+            assert depthStack.size() + removed == s : "urlFileIndex.size() = " + depthStack.size() + ", s = " + s;
+            removedCounter += removed;
         }
-        if (removedCounter == 0) return 0;
-        assert this.requestStack.size() + removedCounter == s : "urlFileIndex.size() = " + this.requestStack.size() + ", s = " + s;
         return removedCounter;
     }
 
+    @Override
     public boolean has(final byte[] urlhashb) {
-        return this.requestStack.has(urlhashb) || this.urlHashDoubleCheck.has(urlhashb);
+        for (Index depthStack: this.depthStacks.values()) {
+            if (depthStack.has(urlhashb)) return true;
+        }
+        return false;
     }
 
+    @Override
     public int size() {
-        return this.requestStack.size();
+        int size = 0;
+        for (Index depthStack: this.depthStacks.values()) {
+            size += depthStack.size();
+        }
+        return size;
     }
 
+    @Override
     public boolean isEmpty() {
-        return this.requestStack.isEmpty();
+        for (Index depthStack: this.depthStacks.values()) {
+            if (!depthStack.isEmpty()) return false;
+        }
+        return true;
     }
 
+    @Override
     public String push(final Request entry, CrawlProfile profile, final RobotsTxt robots) throws IOException, SpaceExceededException {
         assert entry != null;
         final byte[] hash = entry.url().hash();
         synchronized (this) {
             // double-check
-            if (this.urlHashDoubleCheck.has(hash)) return "double occurrence in double_push_check";
-            if (this.requestStack.has(hash)) return "double occurrence in urlFileIndex";
-
-            if (this.urlHashDoubleCheck.size() > MAX_DOUBLE_PUSH_CHECK || MemoryControl.shortStatus()) this.urlHashDoubleCheck.clear();
-            this.urlHashDoubleCheck.put(hash);
+            if (this.has(hash)) return "double occurrence in urlFileIndex";
 
             // increase dom counter
             if (profile != null && profile.domMaxPages() != Integer.MAX_VALUE && profile.domMaxPages() > 0) {
@@ -182,75 +376,155 @@ public class HostQueue {
             }
 
             // add to index
-            final int s = this.requestStack.size();
-            this.requestStack.put(entry.toRow());
-            assert s < this.requestStack.size() : "hash = " + ASCII.String(hash) + ", s = " + s + ", size = " + this.requestStack.size();
-            assert this.requestStack.has(hash) : "hash = " + ASCII.String(hash);
-
-            // add the hash to a queue if the host is unknown to get this fast into the balancer
-            // now disabled to prevent that a crawl 'freezes' to a specific domain which hosts a lot of pages; the queues are filled anyway
-            //if (!this.domainStacks.containsKey(entry.url().getHost())) pushHashToDomainStacks(entry.url().getHost(), entry.url().hash());
+            Index depthStack = getStack(entry.depth());
+            final int s = depthStack.size();
+            depthStack.put(entry.toRow());
+            assert s < depthStack.size() : "hash = " + ASCII.String(hash) + ", s = " + s + ", size = " + depthStack.size();
+            assert depthStack.has(hash) : "hash = " + ASCII.String(hash);
         }
         robots.ensureExist(entry.url(), profile.getAgent(), true); // concurrently load all robots.txt
         return null;
     }
 
-    public Request pop() throws IOException {
-        // returns a crawl entry from the stack and ensures minimum delta times
+    @Override
+    public Request pop(boolean delay, CrawlSwitchboard cs, RobotsTxt robots) throws IOException {
+        // returns a crawl entry from the stack and ensures minimum delta times
+        long sleeptime = 0;
         Request crawlEntry = null;
-        while (!this.requestStack.isEmpty()) {
-            synchronized (this) {
-                Row.Entry rowEntry = this.requestStack.removeOne();
-                if (rowEntry == null) return null;
+        CrawlProfile profileEntry = null;
+        synchronized (this) {
+            mainloop: while (true) {
+                Index depthStack = getLowestStack();
+                if (depthStack == null) return null;
+                Row.Entry rowEntry = null;
+                while (depthStack.size() > 0) {
+                    rowEntry = depthStack.removeOne();
+                    if (rowEntry != null) break;
+                }
+                if (rowEntry == null) continue mainloop;
                 crawlEntry = new Request(rowEntry);
 
                 // check blacklist (again) because the user may have created blacklist entries after the queue has been filled
                 if (Switchboard.urlBlacklist.isListed(BlacklistType.CRAWLER, crawlEntry.url())) {
                     ConcurrentLog.fine("CRAWLER", "URL '" + crawlEntry.url() + "' is in blacklist.");
-                    continue;
+                    continue mainloop;
+                }
+
+                // at this point we must check if the crawlEntry has relevance because the crawl profile still exists
+                // if not: return null. A calling method must handle the null value and try again
+                profileEntry = cs.get(UTF8.getBytes(crawlEntry.profileHandle()));
+                if (profileEntry == null) {
+                    ConcurrentLog.warn("Balancer", "no profile entry for handle " + crawlEntry.profileHandle());
+                    continue mainloop;
                 }
+
+                // depending on the caching policy we need sleep time to avoid DoS-like situations
+                sleeptime = Latency.getDomainSleepTime(robots, profileEntry, crawlEntry.url());
                 break;
             }
         }
         if (crawlEntry == null) return null;
+        ClientIdentification.Agent agent = profileEntry == null ? ClientIdentification.yacyInternetCrawlerAgent : profileEntry.getAgent();
+        long robotsTime = Latency.getRobotsTime(robots, crawlEntry.url(), agent);
+        Latency.updateAfterSelection(crawlEntry.url(), profileEntry == null ? 0 : robotsTime);
+        if (delay && sleeptime > 0) {
+            // force a busy waiting here
+            // in best case, this should never happen if the balancer works properly
+            // this is only a protection against the worst case, where the crawler could
+            // behave in a DoS-manner
+            ConcurrentLog.info("BALANCER", "forcing crawl-delay of " + sleeptime + " milliseconds for " + crawlEntry.url().getHost() + ": " + Latency.waitingRemainingExplain(crawlEntry.url(), robots, agent));
+            long loops = sleeptime / 1000;
+            long rest = sleeptime % 1000;
+            if (loops < 3) {
+                rest = rest + 1000 * loops;
+                loops = 0;
+            }
+            Thread.currentThread().setName("Balancer waiting for " + crawlEntry.url().getHost() + ": " + sleeptime + " milliseconds");
+            synchronized(this) {
+                // must be synchronized here to avoid 'takeover' moves from other threads which then idle the same time which would not be enough
+                if (rest > 0) {try {this.wait(rest);} catch (final InterruptedException e) {}}
+                for (int i = 0; i < loops; i++) {
+                    ConcurrentLog.info("BALANCER", "waiting for " + crawlEntry.url().getHost() + ": " + (loops - i) + " seconds remaining...");
+                    try {this.wait(1000); } catch (final InterruptedException e) {}
+                }
+            }
+            Latency.updateAfterSelection(crawlEntry.url(), robotsTime);
+        }
         return crawlEntry;
     }
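 
+    // Worked example for the forced delay above (added for clarity): sleeptime = 2500
+    // gives loops = 2 and rest = 500; because loops < 3 the whole 2500 ms is waited in
+    // a single wait(2500). sleeptime = 3500 keeps loops = 3, so 500 ms are waited first
+    // and then three one-second rounds follow, each logging the remaining seconds.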
null : new Request(entry); - } catch (final IOException e) { - ConcurrentLog.logException(e); - this.rowIterator = null; - return null; + /** + * get lists of crawl request entries for a specific host + * @param host + * @param maxcount + * @param maxtime + * @return a list of crawl loader requests + */ + @Override + public List getDomainStackReferences(String host, int maxcount, long maxtime) { + if (host == null) return new ArrayList(0); + if (!this.hostName.equals(host)) return new ArrayList(0); + final ArrayList cel = new ArrayList(maxcount); + long timeout = maxtime == Long.MAX_VALUE ? Long.MAX_VALUE : System.currentTimeMillis() + maxtime; + Iterator i; + try { + i = this.iterator(); + while (i.hasNext()) { + Request r = i.next(); + if (r != null) cel.add(r); + if (System.currentTimeMillis() > timeout || cel.size() >= maxcount) break; } + } catch (IOException e) { } - - @Override - public void remove() { - if (this.rowIterator != null) this.rowIterator.remove(); - } - + return cel; } } diff --git a/source/net/yacy/crawler/HostQueues.java b/source/net/yacy/crawler/HostQueues.java deleted file mode 100644 index 37085a782..000000000 --- a/source/net/yacy/crawler/HostQueues.java +++ /dev/null @@ -1,169 +0,0 @@ -/** - * HostQueues - * Copyright 2013 by Michael Christen - * First released 24.09.2013 at http://yacy.net - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; either - * version 2.1 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public License - * along with this program in the file lgpl21.txt - * If not, see . - */ - -package net.yacy.crawler; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import net.yacy.cora.document.encoding.ASCII; -import net.yacy.cora.order.Base64Order; -import net.yacy.cora.storage.HandleSet; -import net.yacy.cora.util.SpaceExceededException; -import net.yacy.crawler.data.CrawlProfile; -import net.yacy.crawler.retrieval.Request; -import net.yacy.crawler.robots.RobotsTxt; -import net.yacy.kelondro.data.word.Word; -import net.yacy.kelondro.index.RowHandleSet; - -/** - * wrapper for single HostQueue queues; this is a collection of such queues. 
- * All these queues are stored in a common directory for the queue stacks - */ -public class HostQueues { - - private final File queuesPath; - private final boolean useTailCache; - private final boolean exceed134217727; - private final Map queues; - - public HostQueues( - final File queuesPath, - final boolean useTailCache, - final boolean exceed134217727) { - this.queuesPath = queuesPath; - this.useTailCache = useTailCache; - this.exceed134217727 = exceed134217727; - - // create a stack for newly entered entries - if (!(queuesPath.exists())) queuesPath.mkdir(); // make the path - this.queuesPath.mkdirs(); - this.queues = new HashMap(); - String[] list = this.queuesPath.list(); - for (String queuefile: list) { - if (queuefile.endsWith(HostQueue.indexSuffix)) { - String hosthash = queuefile.substring(0, queuefile.length() - HostQueue.indexSuffix.length()); - HostQueue queue = new HostQueue(this.queuesPath, hosthash, this.useTailCache, this.exceed134217727); - this.queues.put(hosthash, queue); - } - } - } - - public synchronized void close() { - for (HostQueue queue: this.queues.values()) queue.close(); - this.queues.clear(); - } - - public void clear() { - for (HostQueue queue: this.queues.values()) queue.clear(); - this.queues.clear(); - } - - public Request get(final byte[] urlhash) throws IOException { - String hosthash = ASCII.String(urlhash, 6, 6); - HostQueue queue = this.queues.get(hosthash); - if (queue == null) return null; - return queue.get(urlhash); - } - - public int removeAllByProfileHandle(final String profileHandle, final long timeout) throws IOException, SpaceExceededException { - int c = 0; - for (HostQueue queue: this.queues.values()) c += queue.removeAllByProfileHandle(profileHandle, timeout); - return c; - } - - public synchronized int remove(final HandleSet urlHashes) throws IOException { - Map removeLists = new HashMap(); - for (byte[] urlhash: urlHashes) { - String hosthash = ASCII.String(urlhash, 6, 6); - HandleSet removeList = removeLists.get(hosthash); - if (removeList == null) { - removeList = new RowHandleSet(Word.commonHashLength, Base64Order.enhancedCoder, 100); - removeLists.put(hosthash, removeList); - } - try {removeList.put(urlhash);} catch (SpaceExceededException e) {} - } - int c = 0; - for (Map.Entry entry: removeLists.entrySet()) { - HostQueue queue = this.queues.get(entry.getKey()); - if (queue != null) c += queue.remove(entry.getValue()); - } - return c; - } - - public boolean has(final byte[] urlhashb) { - String hosthash = ASCII.String(urlhashb, 6, 6); - HostQueue queue = this.queues.get(hosthash); - if (queue == null) return false; - return queue.has(urlhashb); - } - - public int size() { - int c = 0; - for (HostQueue queue: this.queues.values()) c += queue.size(); - return c; - } - - public boolean isEmpty() { - for (HostQueue queue: this.queues.values()) if (!queue.isEmpty()) return false; - return true; - } - - /** - * push a request to one of the host queues. 
If the queue does not exist, it is created - * @param entry - * @param profile - * @param robots - * @return null if everything is ok or a string with an error message if the push is not allowed according to the crawl profile or robots - * @throws IOException - * @throws SpaceExceededException - */ - public String push(final Request entry, CrawlProfile profile, final RobotsTxt robots) throws IOException, SpaceExceededException { - String hosthash = ASCII.String(entry.url().hash(), 6, 6); - HostQueue queue = this.queues.get(hosthash); - if (queue == null) { - queue = new HostQueue(this.queuesPath, hosthash, this.useTailCache, this.exceed134217727); - this.queues.put(hosthash, queue); - } - return queue.push(entry, profile, robots); - } - - /** - * remove one request from all stacks except from those as listed in notFromHost - * @param notFromHost do not collect from these hosts - * @return a list of requests - * @throws IOException - */ - public List pop(Set notFromHost) throws IOException { - ArrayList requests = new ArrayList(); - for (Map.Entry entry: this.queues.entrySet()) { - if (notFromHost.contains(entry.getKey())) continue; - Request r = entry.getValue().pop(); - if (r != null) requests.add(r); - } - return requests; - } - -} diff --git a/source/net/yacy/crawler/LegacyBalancer.java b/source/net/yacy/crawler/LegacyBalancer.java index eb771dcaf..dc8658f06 100644 --- a/source/net/yacy/crawler/LegacyBalancer.java +++ b/source/net/yacy/crawler/LegacyBalancer.java @@ -30,6 +30,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.Set; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -626,4 +627,9 @@ public class LegacyBalancer implements Balancer { } + @Override + public int removeAllByHostHashes(Set hosthashes) { + return 0; + } + } diff --git a/source/net/yacy/crawler/data/CrawlQueues.java b/source/net/yacy/crawler/data/CrawlQueues.java index e9431a016..7d66b039b 100644 --- a/source/net/yacy/crawler/data/CrawlQueues.java +++ b/source/net/yacy/crawler/data/CrawlQueues.java @@ -34,6 +34,7 @@ import java.util.Date; import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -181,7 +182,12 @@ public class CrawlQueues { this.noticeURL.removeByURLHash(hash); this.delegatedURL.remove(hash); } - + + public int removeHosts(final Set hosthashes) { + return this.noticeURL.removeByHostHash(hosthashes); + //this.delegatedURL.remove(hash); + } + public DigestURL getURL(final byte[] urlhash) { assert urlhash != null; if (urlhash == null || urlhash.length == 0) { @@ -304,7 +310,7 @@ public class CrawlQueues { return true; } catch (final IOException e) { CrawlQueues.log.severe(stats + ": CANNOT FETCH ENTRY: " + e.getMessage(), e); - if (e.getMessage().indexOf("hash is null",0) > 0) { + if (e.getMessage() != null && e.getMessage().indexOf("hash is null",0) > 0) { this.noticeURL.clear(NoticedURL.StackType.LOCAL); } } diff --git a/source/net/yacy/crawler/data/Latency.java b/source/net/yacy/crawler/data/Latency.java index c1c7842b6..4d1ddabb5 100644 --- a/source/net/yacy/crawler/data/Latency.java +++ b/source/net/yacy/crawler/data/Latency.java @@ -164,7 +164,7 @@ public class Latency { waiting = Math.max(waiting, (int) (host.average() * 
Switchboard.getSwitchboard().getConfigFloat(SwitchboardConstants.CRAWLER_LATENCY_FACTOR, 0.5f))); // if the number of same hosts as in the url in the loading queue is greater than MaxSameHostInQueue, then increase waiting - if (Switchboard.getSwitchboard().crawlQueues.hostcount(hostname) > Switchboard.getSwitchboard().getConfigInt(SwitchboardConstants.CRAWLER_MAX_SAME_HOST_IN_QUEUE, 20)) waiting += 5000; + if (Switchboard.getSwitchboard().crawlQueues.hostcount(hostname) > Switchboard.getSwitchboard().getConfigInt(SwitchboardConstants.CRAWLER_MAX_SAME_HOST_IN_QUEUE, 20)) waiting += 3000; // the time since last access to the domain is the basis of the remaining calculation final int timeSinceLastAccess = (int) (System.currentTimeMillis() - host.lastacc()); @@ -207,7 +207,7 @@ public class Latency { waiting = Math.max(waiting, (int) (host.average() * Switchboard.getSwitchboard().getConfigFloat(SwitchboardConstants.CRAWLER_LATENCY_FACTOR, 0.5f))); // if the number of same hosts as in the url in the loading queue is greater than MaxSameHostInQueue, then increase waiting - if (Switchboard.getSwitchboard().crawlQueues.hostcount(url.getHost()) > Switchboard.getSwitchboard().getConfigInt(SwitchboardConstants.CRAWLER_MAX_SAME_HOST_IN_QUEUE, 20)) waiting += 5000; + if (Switchboard.getSwitchboard().crawlQueues.hostcount(url.getHost()) > Switchboard.getSwitchboard().getConfigInt(SwitchboardConstants.CRAWLER_MAX_SAME_HOST_IN_QUEUE, 20)) waiting += 3000; // the time since last access to the domain is the basis of the remaining calculation final int timeSinceLastAccess = (int) (System.currentTimeMillis() - host.lastacc()); diff --git a/source/net/yacy/crawler/data/NoticedURL.java b/source/net/yacy/crawler/data/NoticedURL.java index a64cf9193..045290e9e 100644 --- a/source/net/yacy/crawler/data/NoticedURL.java +++ b/source/net/yacy/crawler/data/NoticedURL.java @@ -33,6 +33,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import net.yacy.cora.order.Base64Order; import net.yacy.cora.storage.HandleSet; @@ -40,7 +41,7 @@ import net.yacy.cora.util.ConcurrentLog; import net.yacy.cora.util.SpaceExceededException; import net.yacy.crawler.Balancer; import net.yacy.crawler.CrawlSwitchboard; -import net.yacy.crawler.LegacyBalancer; +import net.yacy.crawler.HostBalancer; import net.yacy.crawler.retrieval.Request; import net.yacy.crawler.robots.RobotsTxt; import net.yacy.kelondro.data.word.Word; @@ -59,14 +60,13 @@ public class NoticedURL { protected NoticedURL( final File cachePath, - final boolean useTailCache, + @SuppressWarnings("unused") final boolean useTailCache, final boolean exceed134217727) { ConcurrentLog.info("NoticedURL", "CREATING STACKS at " + cachePath.toString()); - this.coreStack = new LegacyBalancer(cachePath, "urlNoticeCoreStack", useTailCache, exceed134217727); - this.limitStack = new LegacyBalancer(cachePath, "urlNoticeLimitStack", useTailCache, exceed134217727); - //overhangStack = new plasmaCrawlBalancer(overhangStackFile); - this.remoteStack = new LegacyBalancer(cachePath, "urlNoticeRemoteStack", useTailCache, exceed134217727); - this.noloadStack = new LegacyBalancer(cachePath, "urlNoticeNoLoadStack", useTailCache, exceed134217727); + this.coreStack = new HostBalancer(new File(cachePath, "CrawlerCoreStacks"), exceed134217727); + this.limitStack = new HostBalancer(new File(cachePath, "CrawlerLimitStacks"), exceed134217727); + this.remoteStack = new HostBalancer(new File(cachePath, "CrawlerRemoteStacks"), 
diff --git a/source/net/yacy/crawler/data/NoticedURL.java b/source/net/yacy/crawler/data/NoticedURL.java
index a64cf9193..045290e9e 100644
--- a/source/net/yacy/crawler/data/NoticedURL.java
+++ b/source/net/yacy/crawler/data/NoticedURL.java
@@ -33,6 +33,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import net.yacy.cora.order.Base64Order;
 import net.yacy.cora.storage.HandleSet;
@@ -40,7 +41,7 @@ import net.yacy.cora.util.ConcurrentLog;
 import net.yacy.cora.util.SpaceExceededException;
 import net.yacy.crawler.Balancer;
 import net.yacy.crawler.CrawlSwitchboard;
-import net.yacy.crawler.LegacyBalancer;
+import net.yacy.crawler.HostBalancer;
 import net.yacy.crawler.retrieval.Request;
 import net.yacy.crawler.robots.RobotsTxt;
 import net.yacy.kelondro.data.word.Word;
@@ -59,14 +60,13 @@ public class NoticedURL {
 
     protected NoticedURL(
             final File cachePath,
-            final boolean useTailCache,
+            @SuppressWarnings("unused") final boolean useTailCache,
            final boolean exceed134217727) {
         ConcurrentLog.info("NoticedURL", "CREATING STACKS at " + cachePath.toString());
-        this.coreStack = new LegacyBalancer(cachePath, "urlNoticeCoreStack", useTailCache, exceed134217727);
-        this.limitStack = new LegacyBalancer(cachePath, "urlNoticeLimitStack", useTailCache, exceed134217727);
-        //overhangStack = new plasmaCrawlBalancer(overhangStackFile);
-        this.remoteStack = new LegacyBalancer(cachePath, "urlNoticeRemoteStack", useTailCache, exceed134217727);
-        this.noloadStack = new LegacyBalancer(cachePath, "urlNoticeNoLoadStack", useTailCache, exceed134217727);
+        this.coreStack = new HostBalancer(new File(cachePath, "CrawlerCoreStacks"), exceed134217727);
+        this.limitStack = new HostBalancer(new File(cachePath, "CrawlerLimitStacks"), exceed134217727);
+        this.remoteStack = new HostBalancer(new File(cachePath, "CrawlerRemoteStacks"), exceed134217727);
+        this.noloadStack = new HostBalancer(new File(cachePath, "CrawlerNoLoadStacks"), exceed134217727);
     }
 
     public void clear() {
@@ -206,6 +206,15 @@ public class NoticedURL {
         return removed;
     }
 
+    public int removeByHostHash(final Set<String> hosthashes) {
+        int removed = 0;
+        removed += this.noloadStack.removeAllByHostHashes(hosthashes);
+        removed += this.coreStack.removeAllByHostHashes(hosthashes);
+        removed += this.limitStack.removeAllByHostHashes(hosthashes);
+        removed += this.remoteStack.removeAllByHostHashes(hosthashes);
+        return removed;
+    }
+
     /**
      * get a list of domains that are currently maintained as domain stacks
      * @return a map of clear text strings of host names to two integers: the size of the domain stacks and the access delta time
diff --git a/source/net/yacy/crawler/retrieval/Response.java b/source/net/yacy/crawler/retrieval/Response.java
index 004af54a8..3ff9f8104 100644
--- a/source/net/yacy/crawler/retrieval/Response.java
+++ b/source/net/yacy/crawler/retrieval/Response.java
@@ -835,7 +835,7 @@ public class Response {
         final String supportError = TextParser.supports(url(), this.responseHeader == null ? null : this.responseHeader.mime());
         if (supportError != null) throw new Parser.Failure("no parser support:" + supportError, url());
         try {
-            return TextParser.parseSource(new AnchorURL(url()), this.responseHeader == null ? null : this.responseHeader.mime(), this.responseHeader == null ? "UTF-8" : this.responseHeader.getCharacterEncoding(), this.content);
+            return TextParser.parseSource(new AnchorURL(url()), this.responseHeader == null ? null : this.responseHeader.mime(), this.responseHeader == null ? "UTF-8" : this.responseHeader.getCharacterEncoding(), this.request.depth(), this.content);
         } catch (final Exception e) {
             return null;
         }
diff --git a/source/net/yacy/document/Document.java b/source/net/yacy/document/Document.java
index 8b8b3cdfc..71b8b79e2 100644
--- a/source/net/yacy/document/Document.java
+++ b/source/net/yacy/document/Document.java
@@ -515,7 +515,7 @@ dc_rights
         this.emaillinks = new LinkedHashMap<String, String>();
         final Map<DigestURL, ImageEntry> collectedImages = new HashMap<DigestURL, ImageEntry>(); // this is a set that is collected now and joined later to the imagelinks
         for (final Map.Entry<DigestURL, ImageEntry> entry: this.images.entrySet()) {
-            if (entry.getKey().getHost().equals(thishost)) this.inboundlinks.put(entry.getKey(), "image"); else this.outboundlinks.put(entry.getKey(), "image");
+            if (entry.getKey() != null && entry.getKey().getHost() != null && entry.getKey().getHost().equals(thishost)) this.inboundlinks.put(entry.getKey(), "image"); else this.outboundlinks.put(entry.getKey(), "image");
         }
         for (final AnchorURL url: this.anchors) {
             if (url == null) continue;
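The Document.java hunk guards against image URLs that have no host part (for example data: or file: URLs), which previously threw a NullPointerException during inbound/outbound classification. The test can be read as a small predicate; this is a restatement for clarity, not a helper that exists in the patch:

    // null-safe inbound test; hostless URLs fall through to outboundlinks
    static boolean isInbound(final DigestURL url, final String thishost) {
        return url != null && url.getHost() != null && url.getHost().equals(thishost);
    }
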
diff --git a/source/net/yacy/document/TextParser.java b/source/net/yacy/document/TextParser.java
index 1f867ec9c..023299ad9 100644
--- a/source/net/yacy/document/TextParser.java
+++ b/source/net/yacy/document/TextParser.java
@@ -159,6 +159,7 @@ public final class TextParser {
             final AnchorURL location,
             final String mimeType,
             final String charset,
+            final int depth,
             final File sourceFile
         ) throws InterruptedException, Parser.Failure {
 
@@ -172,7 +173,7 @@ public final class TextParser {
                 throw new Parser.Failure(errorMsg, location);
             }
             sourceStream = new BufferedInputStream(new FileInputStream(sourceFile));
-            docs = parseSource(location, mimeType, charset, sourceFile.length(), sourceStream);
+            docs = parseSource(location, mimeType, charset, depth, sourceFile.length(), sourceStream);
         } catch (final Exception e) {
             if (e instanceof InterruptedException) throw (InterruptedException) e;
             if (e instanceof Parser.Failure) throw (Parser.Failure) e;
@@ -189,6 +190,7 @@ public final class TextParser {
             final AnchorURL location,
             String mimeType,
             final String charset,
+            final int depth,
             final byte[] content
         ) throws Parser.Failure {
         if (AbstractParser.log.isFine()) AbstractParser.log.fine("Parsing '" + location + "' from byte-array");
@@ -203,7 +205,7 @@ public final class TextParser {
         }
         assert !idioms.isEmpty() : "no parsers applied for url " + location.toNormalform(true);
 
-        Document[] docs = parseSource(location, mimeType, idioms, charset, content);
+        Document[] docs = parseSource(location, mimeType, idioms, charset, depth, content);
 
         return docs;
     }
@@ -212,6 +214,7 @@ public final class TextParser {
             final AnchorURL location,
             String mimeType,
             final String charset,
+            final int depth,
             final long contentLength,
             final InputStream sourceStream
         ) throws Parser.Failure {
@@ -242,7 +245,7 @@ public final class TextParser {
         } catch (final IOException e) {
             throw new Parser.Failure(e.getMessage(), location);
         }
-        Document[] docs = parseSource(location, mimeType, idioms, charset, b);
+        Document[] docs = parseSource(location, mimeType, idioms, charset, depth, b);
 
         return docs;
     }
@@ -273,6 +276,7 @@ public final class TextParser {
             final String mimeType,
             final Set<Parser> parsers,
             final String charset,
+            final int depth,
             final byte[] sourceArray
         ) throws Parser.Failure {
         final String fileExt = MultiProtocolURL.getFileExtension(location.getFileName());
@@ -326,7 +330,10 @@ public final class TextParser {
             }
             throw new Parser.Failure("All parser failed: " + failedParsers, location);
         }
-        for (final Document d: docs) { assert d.getTextStream() != null : "mimeType = " + mimeType; } // verify docs
+        for (final Document d: docs) {
+            assert d.getTextStream() != null : "mimeType = " + mimeType;
+            d.setDepth(depth);
+        } // verify docs
 
         return docs;
     }
diff --git a/source/net/yacy/document/importer/MediawikiImporter.java b/source/net/yacy/document/importer/MediawikiImporter.java
index 56c4eea5a..061698349 100644
--- a/source/net/yacy/document/importer/MediawikiImporter.java
+++ b/source/net/yacy/document/importer/MediawikiImporter.java
@@ -522,7 +522,7 @@ public class MediawikiImporter extends Thread implements Importer {
         public void genDocument() throws Parser.Failure {
             try {
                 this.url = new AnchorURL(this.urlStub + this.title);
-                final Document[] parsed = TextParser.parseSource(this.url, "text/html", "UTF-8", UTF8.getBytes(this.html));
+                final Document[] parsed = TextParser.parseSource(this.url, "text/html", "UTF-8", 1, UTF8.getBytes(this.html));
                 this.document = Document.mergeDocuments(this.url, "text/html", parsed);
                 // the wiki parser is not able to find the proper title in the source text, so it must be set here
                 this.document.setTitle(this.title);
diff --git a/source/net/yacy/document/parser/bzipParser.java b/source/net/yacy/document/parser/bzipParser.java
index 6e088fafd..373bc955d 100644
--- a/source/net/yacy/document/parser/bzipParser.java
+++ b/source/net/yacy/document/parser/bzipParser.java
@@ -94,7 +94,7 @@ public class bzipParser extends AbstractParser implements Parser {
             out.close();
 
             // creating a new parser class to parse the unzipped content
-            docs = TextParser.parseSource(location, null, null, tempFile);
+            docs = TextParser.parseSource(location, null, null, 999, tempFile);
         } catch (final Exception e) {
             if (e instanceof InterruptedException) throw (InterruptedException) e;
             if (e instanceof Parser.Failure) throw (Parser.Failure) e;
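Every parseSource overload now threads a depth argument down to the innermost variant, which stamps it on each produced Document via d.setDepth(depth); CollectionConfiguration later writes that value into the crawldepth_i Solr field. The accessors on Document are not shown in this diff; presumably they are simple field access along these lines (an assumption, not patch code):

    // assumed Document members backing setDepth()/getDepth()
    private int crawldepth;
    public void setDepth(final int depth) { this.crawldepth = depth; }
    public int getDepth() { return this.crawldepth; }
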
diff --git a/source/net/yacy/document/parser/gzipParser.java b/source/net/yacy/document/parser/gzipParser.java
index d62022085..6f36eb541 100644
--- a/source/net/yacy/document/parser/gzipParser.java
+++ b/source/net/yacy/document/parser/gzipParser.java
@@ -79,7 +79,7 @@ public class gzipParser extends AbstractParser implements Parser {
             out.close();
 
             // creating a new parser class to parse the unzipped content
-            docs = TextParser.parseSource(location,null,null,tempFile);
+            docs = TextParser.parseSource(location, null, null, 999, tempFile);
         } catch (final Exception e) {
             if (e instanceof InterruptedException) throw (InterruptedException) e;
             if (e instanceof Parser.Failure) throw (Parser.Failure) e;
diff --git a/source/net/yacy/document/parser/sevenzipParser.java b/source/net/yacy/document/parser/sevenzipParser.java
index 245521540..2dacd8ba9 100644
--- a/source/net/yacy/document/parser/sevenzipParser.java
+++ b/source/net/yacy/document/parser/sevenzipParser.java
@@ -171,7 +171,7 @@ public class sevenzipParser extends AbstractParser implements Parser {
             // below for reversion of the effects
             final AnchorURL url = AnchorURL.newAnchor(this.doc.dc_source(), this.prefix + "/" + super.filePath);
             final String mime = TextParser.mimeOf(super.filePath.substring(super.filePath.lastIndexOf('.') + 1));
-            theDocs = TextParser.parseSource(url, mime, null, this.cfos.toByteArray());
+            theDocs = TextParser.parseSource(url, mime, null, this.doc.getDepth() + 1, this.cfos.toByteArray());
 
             this.doc.addSubDocuments(theDocs);
         }
diff --git a/source/net/yacy/document/parser/tarParser.java b/source/net/yacy/document/parser/tarParser.java
index 86ded03dd..79ccca964 100644
--- a/source/net/yacy/document/parser/tarParser.java
+++ b/source/net/yacy/document/parser/tarParser.java
@@ -90,7 +90,7 @@ public class tarParser extends AbstractParser implements Parser {
                 try {
                     tmp = FileUtils.createTempFile(this.getClass(), name);
                     FileUtils.copy(tis, tmp, entry.getSize());
-                    subDocs = TextParser.parseSource(AnchorURL.newAnchor(url, "#" + name), mime, null, tmp);
+                    subDocs = TextParser.parseSource(AnchorURL.newAnchor(url, "#" + name), mime, null, 999, tmp);
                     if (subDocs == null) continue;
                     for (final Document d: subDocs) docacc.add(d);
                 } catch (final Parser.Failure e) {
diff --git a/source/net/yacy/document/parser/zipParser.java b/source/net/yacy/document/parser/zipParser.java
index ff232d723..2f381270a 100644
--- a/source/net/yacy/document/parser/zipParser.java
+++ b/source/net/yacy/document/parser/zipParser.java
@@ -89,7 +89,7 @@ public class zipParser extends AbstractParser implements Parser {
                 FileUtils.copy(zis, tmp, entry.getSize());
                 final DigestURL virtualURL = DigestURL.newURL(url, "#" + name);
                 //this.log.logInfo("ZIP file parser: " + virtualURL.toNormalform(false, false));
-                docs = TextParser.parseSource(new AnchorURL(virtualURL), mime, null, tmp);
+                docs = TextParser.parseSource(new AnchorURL(virtualURL), mime, null, 999, tmp);
                 if (docs == null) continue;
                 for (final Document d: docs) docacc.add(d);
             } catch (final Parser.Failure e) {
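Note the two conventions side by side: sevenzipParser derives a real depth from its parent document (getDepth() + 1), while the gzip, tar, zip and bzip parsers pass the fixed sentinel 999, effectively "depth unknown". If the parent Document were available in those parsers as well, the per-entry loop could propagate depth the same way; a hedged sketch in which entryName, entryBytes and parentDoc are hypothetical names:

    final AnchorURL subURL = AnchorURL.newAnchor(url, "#" + entryName);
    final String mime = TextParser.mimeOf(entryName.substring(entryName.lastIndexOf('.') + 1));
    final Document[] subDocs = TextParser.parseSource(
            subURL, mime, null,
            parentDoc.getDepth() + 1, // instead of the 999 sentinel used above
            entryBytes);
    for (final Document d : subDocs) docacc.add(d);
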
diff --git a/source/net/yacy/kelondro/index/OnDemandOpenFileIndex.java b/source/net/yacy/kelondro/index/OnDemandOpenFileIndex.java
new file mode 100644
index 000000000..c8c8598aa
--- /dev/null
+++ b/source/net/yacy/kelondro/index/OnDemandOpenFileIndex.java
@@ -0,0 +1,420 @@
+/**
+ *  OnDemandOpenFileIndex
+ *  Copyright 2014 by Michael Christen
+ *  First released 16.04.2014 at http://yacy.net
+ *
+ *  This library is free software; you can redistribute it and/or
+ *  modify it under the terms of the GNU Lesser General Public
+ *  License as published by the Free Software Foundation; either
+ *  version 2.1 of the License, or (at your option) any later version.
+ *
+ *  This library is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ *  Lesser General Public License for more details.
+ *
+ *  You should have received a copy of the GNU Lesser General Public License
+ *  along with this program in the file lgpl21.txt
+ *  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+
+package net.yacy.kelondro.index;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import net.yacy.cora.order.CloneableIterator;
+import net.yacy.cora.util.ConcurrentLog;
+import net.yacy.cora.util.SpaceExceededException;
+import net.yacy.kelondro.index.Row.Entry;
+import net.yacy.kelondro.table.Table;
+import net.yacy.kelondro.util.kelondroException;
+
+/**
+ * an index that opens its backing table file on demand for each operation
+ * @author Michael Peter Christen
+ *
+ */
+public class OnDemandOpenFileIndex implements Index, Iterable<Row.Entry> {
+
+    private final File file;
+    private final Row rowdef;
+    private int sizecache;
+    private final boolean exceed134217727;
+
+    public OnDemandOpenFileIndex(final File file, Row rowdef, final boolean exceed134217727) {
+        this.file = file;
+        this.rowdef = rowdef;
+        this.exceed134217727 = exceed134217727;
+        this.sizecache = -1;
+    }
+
+    private Index getIndex() {
+        try {
+            return new Table(file, rowdef, 1000, 0, false, exceed134217727, false);
+        } catch (kelondroException e) {
+            ConcurrentLog.logException(e);
+            return null;
+        } catch (SpaceExceededException e) {
+            ConcurrentLog.logException(e);
+            return null;
+        }
+    }
+
+    @Override
+    public synchronized byte[] smallestKey() {
+        Index index = getIndex();
+        if (index == null) return null;
+        byte[] b = index.smallestKey();
+        index.close();
+        return b;
+    }
+
+    @Override
+    public synchronized byte[] largestKey() {
+        Index index = getIndex();
+        if (index == null) return null;
+        byte[] b = index.largestKey();
+        index.close();
+        return b;
+    }
+
+    @Override
+    public synchronized void optimize() {
+        Index index = getIndex();
+        if (index == null) return;
+        index.optimize();
+        index.close();
+    }
+
+    @Override
+    public synchronized long mem() {
+        Index index = getIndex();
+        if (index == null) return 0;
+        long l = index.mem();
+        index.close();
+        return l;
+    }
+
+    @Override
+    public synchronized void addUnique(final Entry row) throws SpaceExceededException, IOException {
+        Index index = getIndex();
+        if (index == null) return;
+        try {
+            index.addUnique(row);
+            if (this.sizecache >= 0) this.sizecache++;
+        } catch (IOException e) {
+            throw e;
+        } finally {
+            index.close();
+        }
+    }
+
+    @Override
+    public synchronized void clear() throws IOException {
+        Index index = getIndex();
+        if (index == null) return;
+        try {
+            index.clear();
+            this.sizecache = 0;
+        } catch (IOException e) {
+            throw e;
+        } finally {
+            index.close();
+        }
+    }
+
+    @Override
+    public synchronized void close() {
+    }
+
+    @Override
+    public synchronized void deleteOnExit() {
+        Index index = getIndex();
+        if (index == null) return;
+        index.deleteOnExit();
+        index.close();
+    }
+
+    @Override
+    public String filename() {
+        return this.file.toString();
+    }
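Every method above follows the same open-work-close discipline: getIndex() opens the Table file, the call is delegated, and the handle is closed again, so an idle host queue costs no RAM between accesses (a null index means the file could not be opened, and each method degrades to a neutral result). A hedged usage sketch; the table file, row definition and entry are caller-supplied and made up for illustration:

    static int putAndCount(final File tableFile, final Row rowdef, final Row.Entry entry)
            throws IOException, SpaceExceededException {
        final Index idx = new OnDemandOpenFileIndex(tableFile, rowdef, false);
        idx.put(entry);     // opens the table file, writes, closes it again
        return idx.size();  // first call scans the file; later calls hit sizecache
    }
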
+
+    @Override
+    public synchronized int size() {
+        if (sizecache >= 0) return sizecache;
+        Index index = getIndex();
+        if (index == null) return 0;
+        int i = index.size();
+        index.close();
+        this.sizecache = i;
+        return i;
+    }
+
+    @Override
+    public synchronized Entry get(final byte[] key, final boolean forcecopy) throws IOException {
+        Index index = getIndex();
+        if (index == null) return null;
+        try {
+            return index.get(key, forcecopy);
+        } catch (IOException e) {
+            throw e;
+        } finally {
+            index.close();
+        }
+    }
+
+    @Override
+    public synchronized Map<byte[], Entry> get(final Collection<byte[]> keys, final boolean forcecopy) throws IOException, InterruptedException {
+        final Map<byte[], Entry> map = new TreeMap<byte[], Entry>(row().objectOrder);
+        Row.Entry entry;
+        for (final byte[] key: keys) {
+            entry = get(key, forcecopy);
+            if (entry != null) map.put(key, entry);
+        }
+        return map;
+    }
+
+    @Override
+    public synchronized boolean has(final byte[] key) {
+        Index index = getIndex();
+        if (index == null) return false;
+        boolean b = index.has(key);
+        index.close();
+        return b;
+    }
+
+    @Override
+    public synchronized boolean isEmpty() {
+        Index index = getIndex();
+        if (index == null) return true;
+        boolean b = index.isEmpty();
+        if (b) this.sizecache = 0;
+        index.close();
+        return b;
+    }
+
+    /**
+     * Adds the row to the index. The row is identified by the primary key of the row.
+     * @param row an index row
+     * @return true if this set did _not_ already contain the given row.
+     * @throws IOException
+     * @throws SpaceExceededException
+     */
+    @Override
+    public synchronized boolean put(final Entry row) throws IOException, SpaceExceededException {
+        Index index = getIndex();
+        if (index == null) return false;
+        try {
+            boolean b = index.put(row);
+            if (this.sizecache >= 0 && b) this.sizecache++;
+            return b;
+        } catch (IOException e) {
+            throw e;
+        } finally {
+            index.close();
+        }
+    }
+
+    @Override
+    public synchronized Entry remove(final byte[] key) throws IOException {
+        Index index = getIndex();
+        if (index == null) return null;
+        try {
+            Entry e = index.remove(key);
+            if (this.sizecache >= 0 && e != null) this.sizecache--;
+            return e;
+        } catch (IOException e) {
+            throw e;
+        } finally {
+            index.close();
+        }
+    }
+
+    @Override
+    public synchronized boolean delete(final byte[] key) throws IOException {
+        Index index = getIndex();
+        if (index == null) return false;
+        try {
+            boolean b = index.delete(key);
+            if (this.sizecache >= 0 && b) this.sizecache--;
+            return b;
+        } catch (IOException e) {
+            throw e;
+        } finally {
+            index.close();
+        }
+    }
+
+    @Override
+    public synchronized List<RowCollection> removeDoubles() throws IOException, SpaceExceededException {
+        Index index = getIndex();
+        if (index == null) return null;
+        try {
+            List<RowCollection> l = index.removeDoubles();
+            this.sizecache = index.size();
+            return l;
+        } catch (IOException e) {
+            throw e;
+        } finally {
+            index.close();
+        }
+    }
+
+    @Override
+    public synchronized List<Entry> top(final int count) throws IOException {
+        Index index = getIndex();
+        if (index == null) return null;
+        try {
+            return index.top(count);
+        } catch (IOException e) {
+            throw e;
+        } finally {
+            index.close();
+        }
+    }
+
+    @Override
+    public synchronized List<Entry> random(final int count) throws IOException {
+        Index index = getIndex();
+        if (index == null) return null;
+        try {
+            return index.random(count);
+        } catch (IOException e) {
+            throw e;
+        } finally {
+            index.close();
+        }
+    }
+
+    @Override
+    public synchronized Entry removeOne() throws IOException {
+        Index index = getIndex();
+        if (index == null) return null;
+        try {
+            Entry e = index.removeOne();
+            if (this.sizecache >= 0 && e != null) this.sizecache--;
+            return e;
+        } catch (IOException e) {
+            throw e;
+        } finally {
+            index.close();
+        }
+    }
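All mutators above keep sizecache consistent under one rule: while it is -1 the size is unknown and the next size() call scans the file; once it is non-negative, each successful insert increments it and each successful removal decrements it. Extracted as a hypothetical helper for clarity (the patch inlines this logic instead):

    private void adjustSizeCache(final boolean changed, final int delta) {
        if (this.sizecache >= 0 && changed) this.sizecache += delta;
    }
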
+
+    @Override
+    public synchronized Entry replace(final Entry row) throws SpaceExceededException, IOException {
+        Index index = getIndex();
+        if (index == null) return null;
+        try {
+            Entry e = index.replace(row);
+            if (this.sizecache >= 0 && e == null) this.sizecache++;
+            return e;
+        } catch (IOException e) {
+            throw e;
+        } finally {
+            index.close();
+        }
+    }
+
+    @Override
+    public Row row() {
+        return this.rowdef;
+    }
+
+    @Override
+    public synchronized CloneableIterator<byte[]> keys(final boolean up, final byte[] firstKey) throws IOException {
+        Index index = getIndex();
+        if (index == null) return null;
+        try {
+            return index.keys(up, firstKey);
+        } catch (IOException e) {
+            throw e;
+        } finally {
+            index.close();
+        }
+    }
+
+    @Override
+    public synchronized Iterator<Entry> iterator() {
+        Index index = getIndex();
+        if (index == null) return null;
+        List<Entry> list = new ArrayList<Entry>();
+        Iterator<Entry> i = index.iterator();
+        while (i.hasNext()) list.add(i.next());
+        index.close();
+        return list.iterator();
+    }
+
+    @Override
+    public synchronized CloneableIterator<Entry> rows(final boolean up, final byte[] firstKey) throws IOException {
+        Index index = getIndex();
+        if (index == null) return null;
+        final List<Entry> list = new ArrayList<Entry>();
+        final Iterator<Entry> i = index.rows(up, firstKey);
+        while (i.hasNext()) list.add(i.next());
+        index.close();
+        final Iterator<Entry> li = list.iterator();
+        return new CloneableIterator<Entry>(){
+            @Override
+            public boolean hasNext() {
+                return li.hasNext();
+            }
+            @Override
+            public Entry next() {
+                return li.next();
+            }
+            @Override
+            public void remove() {
+                li.remove();
+            }
+            @Override
+            public CloneableIterator<Entry> clone(Object modifier) {
+                return null;
+            }
+            @Override
+            public void close() {
+            }
+        };
+    }
+
+    @Override
+    public synchronized CloneableIterator<Entry> rows() throws IOException {
+        Index index = getIndex();
+        if (index == null) return null;
+        final List<Entry> list = new ArrayList<Entry>();
+        final Iterator<Entry> i = index.rows();
+        while (i.hasNext()) list.add(i.next());
+        index.close();
+        final Iterator<Entry> li = list.iterator();
+        return new CloneableIterator<Entry>(){
+            @Override
+            public boolean hasNext() {
+                return li.hasNext();
+            }
+            @Override
+            public Entry next() {
+                return li.next();
+            }
+            @Override
+            public void remove() {
+                li.remove();
+            }
+            @Override
+            public CloneableIterator<Entry> clone(Object modifier) {
+                return null;
+            }
+            @Override
+            public void close() {
+            }
+        };
+    }
+
+}
diff --git a/source/net/yacy/kelondro/table/ChunkIterator.java b/source/net/yacy/kelondro/table/ChunkIterator.java
index 521dd72cc..1ddd7ba41 100644
--- a/source/net/yacy/kelondro/table/ChunkIterator.java
+++ b/source/net/yacy/kelondro/table/ChunkIterator.java
@@ -56,6 +56,7 @@ public class ChunkIterator extends LookAheadIterator<byte[]> implements Iterator<byte[]> {
     private final int recordsize;
 
     public ChunkIterator(final File file, final int recordsize, final int chunksize) throws FileNotFoundException {
+        if (!file.exists()) throw new FileNotFoundException(file.getAbsolutePath());
         assert (file.exists());
         assert file.length() % recordsize == 0;
         this.recordsize = recordsize;
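iterator() and both rows() variants materialize the whole table into an in-memory list before closing the file, so callers never iterate over a closed handle (keys() is the exception: it hands out the delegate's iterator after the close). The underlying copy-then-close idiom, as a generic sketch rather than code from the patch:

    static <T> Iterator<T> snapshot(final Iterator<T> live, final Runnable closer) {
        final List<T> copy = new ArrayList<T>();
        while (live.hasNext()) copy.add(live.next());
        closer.run(); // safe: iteration continues on the in-memory copy
        return copy.iterator();
    }
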
diff --git a/source/net/yacy/kelondro/table/Table.java b/source/net/yacy/kelondro/table/Table.java
index 06939c5b2..ed67e398e 100644
--- a/source/net/yacy/kelondro/table/Table.java
+++ b/source/net/yacy/kelondro/table/Table.java
@@ -71,6 +71,7 @@ import net.yacy.kelondro.util.kelondroException;
 public class Table implements Index, Iterable<Row.Entry> {
 
     // static tracker objects
+    private final static ConcurrentLog log = new ConcurrentLog("TABLE");
     private final static TreeMap<String, Table> tableTracker = new TreeMap<String, Table>();
     private final static long maxarraylength = 134217727L; // (2^27-1) that may be the maximum size of array length in some JVMs
@@ -89,7 +90,7 @@ public class Table implements Index, Iterable<Row.Entry> {
             final int initialSpace,
             boolean useTailCache,
             final boolean exceed134217727,
-            final boolean warmUp) throws SpaceExceededException {
+            final boolean warmUp) throws SpaceExceededException, kelondroException {
 
         this.rowdef = rowdef;
         this.buffersize = buffersize;
@@ -112,9 +113,10 @@ public class Table implements Index, Iterable<Row.Entry> {
                 fos = new FileOutputStream(tablefile);
             } catch (final FileNotFoundException e) {
                 // should not happen
-                ConcurrentLog.severe("Table", "", e);
+                log.severe("", e);
+            } finally {
+                if (fos != null) try { fos.close(); } catch (final IOException e) {}
             }
-            if (fos != null) try { fos.close(); } catch (final IOException e) {}
         }
 
         try {
@@ -136,22 +138,22 @@ public class Table implements Index, Iterable<Row.Entry> {
                 this.table = null;
             }
-            ConcurrentLog.info("TABLE", "initialization of " + tablefile.getName() + ". table copy: " + ((this.table == null) ? "no" : "yes") + ", available RAM: " + (MemoryControl.available() / 1024L / 1024L) + "MB, needed: " + (neededRAM4table / 1024L / 1024L) + "MB, allocating space for " + records + " entries");
+            if (log.isFine()) log.fine("initialization of " + tablefile.getName() + ". table copy: " + ((this.table == null) ? "no" : "yes") + ", available RAM: " + (MemoryControl.available() / 1024L / 1024L) + "MB, needed: " + (neededRAM4table / 1024L / 1024L) + "MB, allocating space for " + records + " entries");
             final long neededRAM4index = 100L * 1024L * 1024L + records * (rowdef.primaryKeyLength + 4L) * 3L / 2L;
             if (records > 0 && !MemoryControl.request(neededRAM4index, true)) {
                 // despite calculations seemed to show that there is enough memory for the table AND the index
                 // there is now not enough memory left for the index. So delete the table again to free the memory
                 // for the index
-                ConcurrentLog.severe("TABLE", tablefile.getName() + ": not enough RAM (" + (MemoryControl.available() / 1024L / 1024L) + "MB) left for index, deleting allocated table space to enable index space allocation (needed: " + (neededRAM4index / 1024L / 1024L) + "MB)");
+                log.severe(tablefile.getName() + ": not enough RAM (" + (MemoryControl.available() / 1024L / 1024L) + "MB) left for index, deleting allocated table space to enable index space allocation (needed: " + (neededRAM4index / 1024L / 1024L) + "MB)");
                 this.table = null;
                 System.gc();
-                ConcurrentLog.severe("TABLE", tablefile.getName() + ": RAM after releasing the table: " + (MemoryControl.available() / 1024L / 1024L) + "MB");
+                log.severe(tablefile.getName() + ": RAM after releasing the table: " + (MemoryControl.available() / 1024L / 1024L) + "MB");
             }
             this.index = new RowHandleMap(rowdef.primaryKeyLength, rowdef.objectOrder, 4, records, tablefile.getAbsolutePath());
             final RowHandleMap errors = new RowHandleMap(rowdef.primaryKeyLength, NaturalOrder.naturalOrder, 4, records, tablefile.getAbsolutePath() + ".errors");
"DISABLED" : "ENABLED")); // read all elements from the file into the copy table - ConcurrentLog.info("TABLE", "initializing RAM index for TABLE " + tablefile.getName() + ", please wait."); + if (log.isFine()) log.fine("initializing RAM index for TABLE " + tablefile.getName() + ", please wait."); int i = 0; byte[] key; if (this.table == null) { @@ -211,7 +213,7 @@ public class Table implements Index, Iterable { removeInFile(idx); key = entry.getKey(); if (key == null) continue; - ConcurrentLog.warn("Table", "removing not well-formed entry " + idx + " with key: " + NaturalOrder.arrayList(key, 0, key.length) + ", " + errorcc++ + "/" + errorc); + log.warn("removing not well-formed entry " + idx + " with key: " + NaturalOrder.arrayList(key, 0, key.length) + ", " + errorcc++ + "/" + errorc); } errors.close(); assert this.file.size() == this.index.size() : "file.size() = " + this.file.size() + ", index.size() = " + this.index.size() + ", file = " + filename(); @@ -220,10 +222,10 @@ public class Table implements Index, Iterable { if (!freshFile && warmUp) {warmUp0();} } catch (final FileNotFoundException e) { // should never happen - ConcurrentLog.severe("Table", "", e); + log.severe("", e); throw new kelondroException(e.getMessage()); } catch (final IOException e) { - ConcurrentLog.severe("Table", "", e); + log.severe("", e); throw new kelondroException(e.getMessage()); } @@ -242,7 +244,7 @@ public class Table implements Index, Iterable { //assert index.size() + doubles.size() == i; //System.out.println(" -removed " + doubles.size() + " doubles- done."); if (doubles.isEmpty()) return; - ConcurrentLog.info("TABLE", filename() + ": WARNING - TABLE " + filename() + " has " + doubles.size() + " doubles"); + log.info(filename() + ": WARNING - TABLE " + filename() + " has " + doubles.size() + " doubles"); // from all the doubles take one, put it back to the index and remove the others from the file // first put back one element each final byte[] record = new byte[this.rowdef.objectsize]; @@ -266,9 +268,9 @@ public class Table implements Index, Iterable { removeInFile(top.intValue()); } } catch (final SpaceExceededException e) { - ConcurrentLog.severe("Table", "", e); + log.severe("", e); } catch (final IOException e) { - ConcurrentLog.severe("Table", "", e); + log.severe("", e); } optimize(); } @@ -304,16 +306,16 @@ public class Table implements Index, Iterable { return Records.tableSize(tablefile, recordsize); } catch (final IOException e) { if (!fixIfCorrupted) { - ConcurrentLog.severe("Table", "table size broken for file " + tablefile.toString(), e); + log.severe("table size broken for file " + tablefile.toString(), e); throw new kelondroException(e.getMessage()); } - ConcurrentLog.severe("Table", "table size broken, try to fix " + tablefile.toString()); + log.severe("table size broken, try to fix " + tablefile.toString()); try { Records.fixTableSize(tablefile, recordsize); - ConcurrentLog.info("Table", "successfully fixed table file " + tablefile.toString()); + log.info("successfully fixed table file " + tablefile.toString()); return Records.tableSize(tablefile, recordsize); } catch (final IOException ee) { - ConcurrentLog.severe("Table", "table size fix did not work", ee); + log.severe("table size fix did not work", ee); throw new kelondroException(e.getMessage()); } } @@ -363,7 +365,7 @@ public class Table implements Index, Iterable { try { return this.file.size() == this.index.size(); } catch (final IOException e) { - ConcurrentLog.logException(e); + log.logException(e); return false; } } @@ 
@@ -461,7 +463,7 @@ public class Table implements Index, Iterable<Row.Entry> {
             d.remove(s);
             removeInFile(s.intValue());
             if (System.currentTimeMillis() - lastlog > 30000) {
-                ConcurrentLog.info("TABLE", "removing " + d.size() + " entries in " + filename());
+                log.info("removing " + d.size() + " entries in " + filename());
                 lastlog = System.currentTimeMillis();
             }
         }
@@ -515,7 +517,7 @@ public class Table implements Index, Iterable<Row.Entry> {
             this.file.get(i, b, 0);
         } catch (final IndexOutOfBoundsException e) {
             // there must be a problem with the table index
-            ConcurrentLog.severe("Table", "IndexOutOfBoundsException: " + e.getMessage(), e);
+            log.severe("IndexOutOfBoundsException: " + e.getMessage(), e);
             this.index.remove(key);
             if (this.table != null) this.table.remove(key);
             return null;
@@ -704,7 +706,7 @@ public class Table implements Index, Iterable<Row.Entry> {
             byte[] pk = lr.getPrimaryKeyBytes();
             if (pk == null) {
                 // Table file might be corrupt
-                ConcurrentLog.warn("TABLE", "Possible corruption found in table " + this.filename() + " detected. i=" + i + ",p=" + p);
+                log.warn("Possible corruption found in table " + this.filename() + " detected. i=" + i + ",p=" + p);
                 continue;
             }
             this.index.put(pk, i);
@@ -759,7 +761,7 @@ public class Table implements Index, Iterable<Row.Entry> {
             try {
                 this.index.put(k, i);
             } catch (final SpaceExceededException e) {
-                ConcurrentLog.logException(e);
+                log.logException(e);
                 throw new IOException("RowSpaceExceededException: " + e.getMessage());
             }
         }
@@ -789,7 +791,7 @@ public class Table implements Index, Iterable<Row.Entry> {
             try {
                 this.table.set(i, te);
             } catch (final SpaceExceededException e) {
-                ConcurrentLog.logException(e);
+                log.logException(e);
                 this.table = null;
             }
@@ -895,7 +897,7 @@ public class Table implements Index, Iterable<Row.Entry> {
 
     @Override
     public boolean isEmpty() {
-        return this.index.isEmpty();
+        return this.index == null || this.index.isEmpty();
     }
 
     @Override
@@ -1011,7 +1013,7 @@ public class Table implements Index, Iterable<Row.Entry> {
             try {
                 Table.this.file.get(this.c, b, 0);
             } catch (final IOException e) {
-                ConcurrentLog.severe("Table", "", e);
+                log.severe("", e);
                 return null;
             }
         } else {
@@ -1116,7 +1118,7 @@ public class Table implements Index, Iterable<Row.Entry> {
             }
             System.out.println("FINISHED test after " + ((System.currentTimeMillis() - start) / 1000) + " seconds.");
         } catch (final Exception e) {
-            ConcurrentLog.logException(e);
+            log.logException(e);
             System.out.println("TERMINATED");
         }
     }
diff --git a/source/net/yacy/repository/LoaderDispatcher.java b/source/net/yacy/repository/LoaderDispatcher.java
index 062e2b4bc..8c631e904 100644
--- a/source/net/yacy/repository/LoaderDispatcher.java
+++ b/source/net/yacy/repository/LoaderDispatcher.java
@@ -395,7 +395,7 @@ public final class LoaderDispatcher {
         final String supportError = TextParser.supports(url, responseHeader.mime());
         if (supportError != null) throw new IOException("no parser support: " + supportError);
         try {
-            documents = TextParser.parseSource(url, responseHeader.mime(), responseHeader.getCharacterEncoding(), response.getContent());
+            documents = TextParser.parseSource(url, responseHeader.mime(), responseHeader.getCharacterEncoding(), response.depth(), response.getContent());
             if (documents == null) throw new IOException("document == null");
         } catch (final Exception e) {
             throw new IOException("parser error: " + e.getMessage());
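LoaderDispatcher is the central call site through which crawled content reaches the parser, so passing response.depth() here covers the regular crawl path. The depth() accessor on Response is not part of this diff; consistent with the Response hunk earlier, which reads this.request.depth(), it is presumably a simple delegation (an assumption):

    // assumed Response accessor, not shown in this patch
    public int depth() {
        return this.request == null ? 0 : this.request.depth();
    }
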
diff --git a/source/net/yacy/search/Switchboard.java b/source/net/yacy/search/Switchboard.java
index 2d37411b3..05b91ae37 100644
--- a/source/net/yacy/search/Switchboard.java
+++ b/source/net/yacy/search/Switchboard.java
@@ -2546,6 +2546,7 @@ public final class Switchboard extends serverSwitch {
                     new AnchorURL(response.url()),
                     response.getMimeType(),
                     response.getCharacterEncoding(),
+                    response.depth(),
                     response.getContent());
             if ( documents == null ) {
                 throw new Parser.Failure("Parser returned null.", response.url());
diff --git a/source/net/yacy/search/index/DocumentIndex.java b/source/net/yacy/search/index/DocumentIndex.java
index ea392e561..a62338e1b 100644
--- a/source/net/yacy/search/index/DocumentIndex.java
+++ b/source/net/yacy/search/index/DocumentIndex.java
@@ -149,7 +149,7 @@ public class DocumentIndex extends Segment {
             length = -1;
         }
         try {
-            documents = TextParser.parseSource(url, null, null, length, url.getInputStream(ClientIdentification.yacyInternetCrawlerAgent, null, null));
+            documents = TextParser.parseSource(url, null, null, 999, length, url.getInputStream(ClientIdentification.yacyInternetCrawlerAgent, null, null));
         } catch (final Exception e ) {
             throw new IOException("cannot parse " + url.toString() + ": " + e.getMessage());
         }
diff --git a/source/net/yacy/search/index/Segment.java b/source/net/yacy/search/index/Segment.java
index b3e6207cc..7825e5b03 100644
--- a/source/net/yacy/search/index/Segment.java
+++ b/source/net/yacy/search/index/Segment.java
@@ -36,6 +36,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.regex.Pattern;
@@ -495,8 +496,13 @@ public class Segment {
             urlstub = null;
         } else {
             final String host = stub.getHost();
-            String hh = DigestURL.hosthash(host);
-            docQueue = this.fulltext.getDefaultConnector().concurrentDocumentsByQuery(CollectionSchema.host_id_s + ":\"" + hh + "\"", CollectionSchema.url_chars_i.getSolrFieldName() + " asc", 0, Integer.MAX_VALUE, maxtime, maxcount, 1, CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName());
+            String hh = null;
+            try {
+                hh = DigestURL.hosthash(host, stub.getPort());
+            } catch (MalformedURLException e) {
+                ConcurrentLog.logException(e);
+            }
+            docQueue = hh == null ? new ArrayBlockingQueue<SolrDocument>(0) : this.fulltext.getDefaultConnector().concurrentDocumentsByQuery(CollectionSchema.host_id_s + ":\"" + hh + "\"", CollectionSchema.url_chars_i.getSolrFieldName() + " asc", 0, Integer.MAX_VALUE, maxtime, maxcount, 1, CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName());
             urlstub = stub.toNormalform(true);
         }
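DigestURL.hosthash now requires an explicit port and may throw MalformedURLException, so every caller picks a default and handles the failure: Segment falls back to an empty result queue, IndexCreateQueues_p hashes both the plain and the 443 variant, and QueryModifier (next hunk) guesses 21 for ftp hosts and 80 otherwise. The shared convention, summarized as a hypothetical helper rather than a method from the patch:

    static String hosthashOrNull(final String host) {
        try {
            return DigestURL.hosthash(host, host.startsWith("ftp.") ? 21 : 80);
        } catch (final MalformedURLException e) {
            ConcurrentLog.logException(e);
            return null; // callers substitute "" or an empty queue
        }
    }
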
diff --git a/source/net/yacy/search/query/QueryModifier.java b/source/net/yacy/search/query/QueryModifier.java
index 9a792b90f..4f14161cd 100644
--- a/source/net/yacy/search/query/QueryModifier.java
+++ b/source/net/yacy/search/query/QueryModifier.java
@@ -20,6 +20,7 @@
 
 package net.yacy.search.query;
 
+import java.net.MalformedURLException;
 import java.util.ArrayList;
 
 import org.apache.solr.common.params.CommonParams;
@@ -27,6 +28,7 @@ import org.apache.solr.common.params.MultiMapSolrParams;
 
 import net.yacy.cora.document.id.DigestURL;
 import net.yacy.cora.util.CommonPattern;
+import net.yacy.cora.util.ConcurrentLog;
 import net.yacy.kelondro.util.ISO639;
 import net.yacy.search.schema.CollectionSchema;
 import net.yacy.server.serverObjects;
@@ -97,7 +99,12 @@ public class QueryModifier {
             while ( sitehost.endsWith(".") ) {
                 sitehost = sitehost.substring(0, sitehost.length() - 1);
             }
-            sitehash = DigestURL.hosthash(sitehost);
+            try {
+                sitehash = DigestURL.hosthash(sitehost, sitehost.startsWith("ftp.") ? 21 : 80);
+            } catch (MalformedURLException e) {
+                sitehash = "";
+                ConcurrentLog.logException(e);
+            }
             add("site:" + sitehost);
         }
diff --git a/source/net/yacy/search/schema/CollectionConfiguration.java b/source/net/yacy/search/schema/CollectionConfiguration.java
index a4812096c..abdb61c72 100644
--- a/source/net/yacy/search/schema/CollectionConfiguration.java
+++ b/source/net/yacy/search/schema/CollectionConfiguration.java
@@ -381,7 +381,8 @@ public class CollectionConfiguration extends SchemaConfiguration implements Serializable {
         }
 
         if ((allAttr || contains(CollectionSchema.crawldepth_i))) {
-            CollectionSchema.crawldepth_i.add(doc, document.getDepth());
+            int depth = document.getDepth();
+            CollectionSchema.crawldepth_i.add(doc, depth);
        }
 
        if (allAttr || (contains(CollectionSchema.cr_host_chance_d) && contains(CollectionSchema.cr_host_count_i) && contains(CollectionSchema.cr_host_norm_i))) {