From dbfa8657003b96610afe09d9cc9981c02b64901d Mon Sep 17 00:00:00 2001
From: Michael Peter Christen
Date: Wed, 31 Jul 2013 13:16:32 +0200
Subject: [PATCH] added a stub of a class for crawler redesign

---
 source/net/yacy/crawler/CrawlQueue.java | 324 ++++++++++++++++++++++++
 1 file changed, 324 insertions(+)
 create mode 100644 source/net/yacy/crawler/CrawlQueue.java

diff --git a/source/net/yacy/crawler/CrawlQueue.java b/source/net/yacy/crawler/CrawlQueue.java
new file mode 100644
index 000000000..1923d1e48
--- /dev/null
+++ b/source/net/yacy/crawler/CrawlQueue.java
@@ -0,0 +1,324 @@
+/**
+ * CrawlQueue
+ * Copyright 2013 by Michael Christen
+ * First released 30.08.2013 at http://yacy.net
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program in the file lgpl21.txt
+ * If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package net.yacy.crawler;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Set;
+
+import net.yacy.cora.document.ASCII;
+import net.yacy.cora.document.UTF8;
+import net.yacy.cora.federate.yacy.CacheStrategy;
+import net.yacy.cora.order.Base64Order;
+import net.yacy.cora.storage.HandleSet;
+import net.yacy.cora.util.ConcurrentLog;
+import net.yacy.cora.util.SpaceExceededException;
+import net.yacy.crawler.data.Cache;
+import net.yacy.crawler.data.CrawlProfile;
+import net.yacy.crawler.data.Latency;
+import net.yacy.crawler.retrieval.Request;
+import net.yacy.crawler.robots.RobotsTxt;
+import net.yacy.kelondro.data.meta.DigestURI;
+import net.yacy.kelondro.data.meta.URIMetadataRow;
+import net.yacy.kelondro.index.BufferedObjectIndex;
+import net.yacy.kelondro.index.Row;
+import net.yacy.kelondro.index.RowHandleSet;
+import net.yacy.kelondro.table.Table;
+import net.yacy.kelondro.util.MemoryControl;
+import net.yacy.repository.Blacklist.BlacklistType;
+import net.yacy.search.Switchboard;
+
+public class CrawlQueue {
+
+    private static final int EcoFSBufferSize = 1000;
+    private static final int objectIndexBufferSize = 1000;
+    private static final int MAX_DOUBLE_PUSH_CHECK = 100000;
+
+    private BufferedObjectIndex urlFileIndex;
+    private final HandleSet double_push_check;
+    private final Set<String> myAgentIDs;
+    private final RobotsTxt robots;
+    private final int minimumLocalDelta;
+    private final int minimumGlobalDelta;
+
+    public CrawlQueue(
+            final File cachePath,
+            final String filename,
+            final int minimumLocalDelta,
+            final int minimumGlobalDelta,
+            final Set<String> myAgentIDs,
+            final RobotsTxt robots,
+            final boolean useTailCache,
+            final boolean exceed134217727) {
+
+        this.myAgentIDs = myAgentIDs;
+        this.robots = robots;
+        this.minimumLocalDelta = minimumLocalDelta;
+        this.minimumGlobalDelta = minimumGlobalDelta;
+        // create a stack for newly entered entries
+        if (!(cachePath.exists())) cachePath.mkdir(); // make the path
+        cachePath.mkdirs();
+        final File f = new File(cachePath, filename);
+        try {
+            this.urlFileIndex = new BufferedObjectIndex(new Table(f, Request.rowdef, EcoFSBufferSize, 0,
+                    useTailCache, exceed134217727, true), objectIndexBufferSize);
+        } catch (final SpaceExceededException e) {
+            try {
+                this.urlFileIndex = new BufferedObjectIndex(new Table(f, Request.rowdef, 0, 0, false, exceed134217727, true), objectIndexBufferSize);
+            } catch (final SpaceExceededException e1) {
+                ConcurrentLog.logException(e1);
+            }
+        }
+        this.double_push_check = new RowHandleSet(URIMetadataRow.rowdef.primaryKeyLength, URIMetadataRow.rowdef.objectOrder, 0);
+        ConcurrentLog.info("CrawlQueue", "opened queue file with " + this.urlFileIndex.size() + " entries from " + f.toString());
+    }
+
+    public synchronized void close() {
+        if (this.urlFileIndex != null) {
+            this.urlFileIndex.close();
+            this.urlFileIndex = null;
+        }
+    }
+
+    public void clear() {
+        ConcurrentLog.info("CrawlQueue", "cleaning CrawlQueue with " + this.urlFileIndex.size() + " entries from " + this.urlFileIndex.filename());
+        try {
+            this.urlFileIndex.clear();
+        } catch (final IOException e) {
+            ConcurrentLog.logException(e);
+        }
+        this.double_push_check.clear();
+    }
+
+    public Request get(final byte[] urlhash) throws IOException {
+        assert urlhash != null;
+        if (this.urlFileIndex == null) return null; // case occurs during shutdown
+        final Row.Entry entry = this.urlFileIndex.get(urlhash, false);
+        if (entry == null) return null;
+        return new Request(entry);
+    }
+
+    public int removeAllByProfileHandle(final String profileHandle, final long timeout) throws IOException, SpaceExceededException {
+        // removes all entries with a specific profile hash.
+        // this may take some time
+        // returns the number of deletions
+
+        // first find a list of url hashes that shall be deleted
+        final HandleSet urlHashes = new RowHandleSet(this.urlFileIndex.row().primaryKeyLength, Base64Order.enhancedCoder, 100);
+        final long terminate = timeout == Long.MAX_VALUE ? Long.MAX_VALUE : (timeout > 0) ?
+                System.currentTimeMillis() + timeout : Long.MAX_VALUE;
+        synchronized (this) {
+            final Iterator<Row.Entry> i = this.urlFileIndex.rows();
+            Row.Entry rowEntry;
+            Request crawlEntry;
+            while (i.hasNext() && (System.currentTimeMillis() < terminate)) {
+                rowEntry = i.next();
+                crawlEntry = new Request(rowEntry);
+                if (crawlEntry.profileHandle().equals(profileHandle)) {
+                    urlHashes.put(crawlEntry.url().hash());
+                }
+            }
+        }
+
+        // then delete all these urls from the queues and the file index
+        return remove(urlHashes);
+    }
+
+    /**
+     * This method is only here because so many import/export methods need it
+     * and it was implemented in the previous architecture;
+     * however, its usage is not recommended.
+     * @param urlHashes a list of hashes that shall be removed
+     * @return number of entries that had been removed
+     * @throws IOException
+     */
+    public synchronized int remove(final HandleSet urlHashes) throws IOException {
+        final int s = this.urlFileIndex.size();
+        int removedCounter = 0;
+        for (final byte[] urlhash: urlHashes) {
+            final Row.Entry entry = this.urlFileIndex.remove(urlhash);
+            if (entry != null) removedCounter++;
+
+            // remove from double-check caches
+            this.double_push_check.remove(urlhash);
+        }
+        if (removedCounter == 0) return 0;
+        assert this.urlFileIndex.size() + removedCounter == s : "urlFileIndex.size() = " + this.urlFileIndex.size() + ", s = " + s;
+
+        return removedCounter;
+    }
+
+    public boolean has(final byte[] urlhashb) {
+        return this.urlFileIndex.has(urlhashb) || this.double_push_check.has(urlhashb);
+    }
+
+    public int size() {
+        return this.urlFileIndex.size();
+    }
+
+    public boolean isEmpty() {
+        return this.urlFileIndex.isEmpty();
+    }
+
+    /**
+     * push a crawl request onto this crawl queue
+     * @param entry the crawl request
+     * @param profile the crawl profile that produced the request
+     * @return null if this was successful or a String explaining what went wrong in case of an error
+     * @throws IOException
+     * @throws SpaceExceededException
+     */
+    public String push(final Request entry, CrawlProfile profile) throws IOException, SpaceExceededException {
+        assert entry != null;
+        final byte[] hash = entry.url().hash();
+        synchronized (this) {
+            // double-check
+            if (this.double_push_check.has(hash)) return "double occurrence in double_push_check";
+            if (this.urlFileIndex.has(hash)) return "double occurrence in urlFileIndex";
+
+            if (this.double_push_check.size() > MAX_DOUBLE_PUSH_CHECK || MemoryControl.shortStatus()) this.double_push_check.clear();
+            this.double_push_check.put(hash);
+
+            // increase dom counter
+            if (profile != null && profile.domMaxPages() != Integer.MAX_VALUE && profile.domMaxPages() > 0) {
+                profile.domInc(entry.url().getHost());
+            }
+
+            // add to index
+            final int s = this.urlFileIndex.size();
+            this.urlFileIndex.put(entry.toRow());
+            assert s < this.urlFileIndex.size() : "hash = " + ASCII.String(hash) + ", s = " + s + ", size = " + this.urlFileIndex.size();
+            assert this.urlFileIndex.has(hash) : "hash = " + ASCII.String(hash);
+
+            // add the hash to a queue if the host is unknown to get this fast into the balancer
+            // now disabled to prevent that a crawl 'freezes' to a specific domain which hosts a lot of pages; the queues are filled anyway
+            //if (!this.domainStacks.containsKey(entry.url().getHost())) pushHashToDomainStacks(entry.url().getHost(), entry.url().hash());
+        }
+        this.robots.ensureExist(entry.url(), CrawlQueue.this.myAgentIDs, true); // concurrently load all robots.txt
+        return null;
+    }
+
+    /**
+     * Get the minimum sleep time for a given url.
+     * The result can also be negative to reflect the time since the last access.
+     * The time can be as low as Integer.MIN_VALUE to show that there should not be any limitation at all.
+     * @param profileEntry
+     * @param crawlURL
+     * @return the sleep time in milliseconds; may be negative for no sleep time
+     */
+    private long getDomainSleepTime(final CrawlProfile profileEntry, final DigestURI crawlURL) {
+        if (profileEntry == null) return 0;
+        long sleeptime = (
+            profileEntry.cacheStrategy() == CacheStrategy.CACHEONLY ||
+            (profileEntry.cacheStrategy() == CacheStrategy.IFEXIST && Cache.has(crawlURL.hash()))
+            ) ? Integer.MIN_VALUE : Latency.waitingRemaining(crawlURL, this.robots, this.myAgentIDs, this.minimumLocalDelta, this.minimumGlobalDelta); // this uses the robots.txt database and may cause a loading of robots.txt from the server
+        return sleeptime;
+    }
+
+    /**
+     * load a robots.txt to get the robots crawl-delay time.
+     * ATTENTION: this method may cause a robots.txt to be loaded from the web, which can delay execution considerably.
+     * It shall therefore not be called in synchronized environments.
+     * @param robots
+     * @param crawlURL
+     * @return the robots.txt crawl-delay time in milliseconds, never negative
+     */
+    private long getRobotsTime(final RobotsTxt robots, final DigestURI crawlURL) {
+        long sleeptime = Latency.waitingRobots(crawlURL, robots, this.myAgentIDs); // this uses the robots.txt database and may cause a loading of robots.txt from the server
+        return sleeptime < 0 ? 0 : sleeptime;
+    }
+
+    /**
+     * get the next entry in this crawl queue in such a way that the domain access time delta is maximized
+     * and always above the given minimum delay time. An additional delay time is computed using the robots.txt
+     * crawl-delay time which is always respected. In case the minimum time cannot be ensured, this method pauses
+     * for the necessary time until the url is released and returned as a Request object. In case that a profile
+     * for the computed entry does not exist, null is returned.
+     * @param delay true if the requester demands forced delays using explicit thread sleep
+     * @param cs the crawl switchboard that holds the active crawl profiles
+     * @param robots the robots.txt database
+     * @return a url in a Request object
+     * @throws IOException
+     */
+    public Request pop(final boolean delay, final CrawlSwitchboard cs, final RobotsTxt robots) throws IOException {
+        // returns a crawl entry from the stack and ensures minimum delta times
+
+        if (this.urlFileIndex.isEmpty()) return null;
+        long sleeptime = 0;
+        Request crawlEntry = null;
+        CrawlProfile profileEntry = null;
+        while (this.urlFileIndex.size() > 0) {
+            synchronized (this) {
+                Row.Entry rowEntry = this.urlFileIndex.removeOne();
+                if (rowEntry == null) return null;
+                crawlEntry = new Request(rowEntry);
+                profileEntry = cs.getActive(UTF8.getBytes(crawlEntry.profileHandle()));
+                if (profileEntry == null) {
+                    ConcurrentLog.warn("CrawlQueue", "no profile entry for handle " + crawlEntry.profileHandle());
+                    return null;
+                }
+
+                // check blacklist (again) because the user may have created blacklist entries after the queue has been filled
+                if (Switchboard.urlBlacklist.isListed(BlacklistType.CRAWLER, crawlEntry.url())) {
+                    ConcurrentLog.fine("CrawlQueue", "URL '" + crawlEntry.url() + "' is in blacklist.");
+                    continue;
+                }
+
+                // at this point we must check if the crawlEntry is still relevant, i.e. whether the crawl profile still exists;
+                // if not: return null.
+                // A calling method must handle the null value and try again.
+                profileEntry = cs.getActive(UTF8.getBytes(crawlEntry.profileHandle()));
+                if (profileEntry == null) {
+                    ConcurrentLog.warn("CrawlQueue", "no profile entry for handle " + crawlEntry.profileHandle());
+                    continue;
+                }
+            }
+        }
+        // depending on the caching policy we need sleep time to avoid DoS-like situations
+        sleeptime = getDomainSleepTime(profileEntry, crawlEntry.url());
+
+        long robotsTime = getRobotsTime(robots, crawlEntry.url());
+        Latency.updateAfterSelection(crawlEntry.url(), profileEntry == null ? 0 : robotsTime);
+        if (delay && sleeptime > 0) {
+            // force a busy waiting here
+            // in the best case, this should never happen if the balancer works properly
+            // this is only a protection against the worst case, where the crawler could
+            // behave in a DoS manner
+            ConcurrentLog.info("CrawlQueue", "forcing crawl-delay of " + sleeptime + " milliseconds for " + crawlEntry.url().getHost() + ": " + Latency.waitingRemainingExplain(crawlEntry.url(), robots, this.myAgentIDs, this.minimumLocalDelta, this.minimumGlobalDelta));
+            long loops = sleeptime / 1000;
+            long rest = sleeptime % 1000;
+            if (loops < 3) {
+                rest = rest + 1000 * loops;
+                loops = 0;
+            }
+            Thread.currentThread().setName("CrawlQueue waiting for " + crawlEntry.url().getHost() + ": " + sleeptime + " milliseconds");
+            synchronized (this) {
+                // must be synchronized here to avoid 'takeover' moves from other threads which would then idle for the same time, which would not be enough
+                if (rest > 0) {try {this.wait(rest);} catch (final InterruptedException e) {}}
+                for (int i = 0; i < loops; i++) {
+                    ConcurrentLog.info("CrawlQueue", "waiting for " + crawlEntry.url().getHost() + ": " + (loops - i) + " seconds remaining...");
+                    try {this.wait(1000); } catch (final InterruptedException e) {}
+                }
+            }
+            Latency.updateAfterSelection(crawlEntry.url(), robotsTime);
+        }
+        return crawlEntry;
+    }
+}
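
For orientation, a small usage sketch follows (not part of the patch itself): it shows how a caller might drive the new CrawlQueue stub. Only methods that appear in the patch are used (the constructor, push, pop, size and close); the queue file name, the delta values and the sketch class itself are made up for illustration, and the collaborators (RobotsTxt, CrawlSwitchboard, Request, CrawlProfile) are assumed to be supplied by the surrounding YaCy crawler code.

import java.io.File;
import java.io.IOException;
import java.util.Set;

import net.yacy.cora.util.SpaceExceededException;
import net.yacy.crawler.CrawlQueue;
import net.yacy.crawler.CrawlSwitchboard;
import net.yacy.crawler.data.CrawlProfile;
import net.yacy.crawler.retrieval.Request;
import net.yacy.crawler.robots.RobotsTxt;

public class CrawlQueueUsageSketch {

    /**
     * Queue a single request and then drain the queue with enforced crawl delays.
     * All collaborators are passed in; constructing them requires a running YaCy peer.
     */
    public static void queueAndDrain(
            final File queuePath,        // directory that will hold the queue table file
            final Set<String> agentIDs,  // robots.txt agent names of this peer
            final RobotsTxt robots,
            final CrawlSwitchboard cs,
            final Request entry,         // a crawl request produced elsewhere (e.g. by the crawl stacker)
            final CrawlProfile profile   // the active profile that produced the request
    ) throws IOException, SpaceExceededException {

        // file name and delta values (minimum local/global host access gaps in ms) are illustrative only
        final CrawlQueue queue = new CrawlQueue(
                queuePath, "crawlqueue.sketch.db",
                500, 2000,
                agentIDs, robots,
                true /* useTailCache */, true /* exceed134217727 */);

        // push() returns null on success, otherwise a string explaining the rejection
        final String rejectReason = queue.push(entry, profile);
        if (rejectReason != null) {
            System.out.println("not queued: " + rejectReason);
        }

        // pop() with delay=true sleeps as needed so the per-host and robots.txt delays are respected
        while (queue.size() > 0) {
            final Request next = queue.pop(true, cs, robots);
            if (next == null) break; // profile vanished or queue was shut down
            System.out.println("next URL to fetch: " + next.url());
        }

        queue.close();
    }
}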