parent
76afcccaaf
commit
dbfa865700
@ -0,0 +1,324 @@
|
||||
/**
|
||||
* CrawlQueue
|
||||
* Copyright 2013 by Michael Christen
|
||||
* First released 30.08.2013 at http://yacy.net
|
||||
*
|
||||
* This library is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU Lesser General Public
|
||||
* License as published by the Free Software Foundation; either
|
||||
* version 2.1 of the License, or (at your option) any later version.
|
||||
*
|
||||
* This library is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
* Lesser General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Lesser General Public License
|
||||
* along with this program in the file lgpl21.txt
|
||||
* If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package net.yacy.crawler;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.Iterator;
|
||||
import java.util.Set;
|
||||
|
||||
import net.yacy.cora.document.ASCII;
|
||||
import net.yacy.cora.document.UTF8;
|
||||
import net.yacy.cora.federate.yacy.CacheStrategy;
|
||||
import net.yacy.cora.order.Base64Order;
|
||||
import net.yacy.cora.storage.HandleSet;
|
||||
import net.yacy.cora.util.ConcurrentLog;
|
||||
import net.yacy.cora.util.SpaceExceededException;
|
||||
import net.yacy.crawler.data.Cache;
|
||||
import net.yacy.crawler.data.CrawlProfile;
|
||||
import net.yacy.crawler.data.Latency;
|
||||
import net.yacy.crawler.retrieval.Request;
|
||||
import net.yacy.crawler.robots.RobotsTxt;
|
||||
import net.yacy.kelondro.data.meta.DigestURI;
|
||||
import net.yacy.kelondro.data.meta.URIMetadataRow;
|
||||
import net.yacy.kelondro.index.BufferedObjectIndex;
|
||||
import net.yacy.kelondro.index.Row;
|
||||
import net.yacy.kelondro.index.RowHandleSet;
|
||||
import net.yacy.kelondro.table.Table;
|
||||
import net.yacy.kelondro.util.MemoryControl;
|
||||
import net.yacy.repository.Blacklist.BlacklistType;
|
||||
import net.yacy.search.Switchboard;
|
||||
|
||||
public class CrawlQueue {
|
||||
|
||||
private static final int EcoFSBufferSize = 1000;
|
||||
private static final int objectIndexBufferSize = 1000;
|
||||
private static final int MAX_DOUBLE_PUSH_CHECK = 100000;
|
||||
|
||||
private BufferedObjectIndex urlFileIndex;
|
||||
private final HandleSet double_push_check;
|
||||
private final Set<String> myAgentIDs;
|
||||
private final RobotsTxt robots;
|
||||
private final int minimumLocalDelta;
|
||||
private final int minimumGlobalDelta;
|
||||
|
||||
public CrawlQueue(
|
||||
final File cachePath,
|
||||
final String filename,
|
||||
final int minimumLocalDelta,
|
||||
final int minimumGlobalDelta,
|
||||
final Set<String> myAgentIDs,
|
||||
final RobotsTxt robots,
|
||||
final boolean useTailCache,
|
||||
final boolean exceed134217727) {
|
||||
|
||||
this.myAgentIDs = myAgentIDs;
|
||||
this.robots = robots;
|
||||
this.minimumLocalDelta = minimumLocalDelta;
|
||||
this.minimumGlobalDelta = minimumGlobalDelta;
|
||||
// create a stack for newly entered entries
|
||||
if (!(cachePath.exists())) cachePath.mkdir(); // make the path
|
||||
cachePath.mkdirs();
|
||||
final File f = new File(cachePath, filename);
|
||||
try {
|
||||
this.urlFileIndex = new BufferedObjectIndex(new Table(f, Request.rowdef, EcoFSBufferSize, 0, useTailCache, exceed134217727, true), objectIndexBufferSize);
|
||||
} catch (final SpaceExceededException e) {
|
||||
try {
|
||||
this.urlFileIndex = new BufferedObjectIndex(new Table(f, Request.rowdef, 0, 0, false, exceed134217727, true), objectIndexBufferSize);
|
||||
} catch (final SpaceExceededException e1) {
|
||||
ConcurrentLog.logException(e1);
|
||||
}
|
||||
}
|
||||
this.double_push_check = new RowHandleSet(URIMetadataRow.rowdef.primaryKeyLength, URIMetadataRow.rowdef.objectOrder, 0);
|
||||
ConcurrentLog.info("CrawlQueue", "opened queue file with " + this.urlFileIndex.size() + " entries from " + f.toString());
|
||||
}
|
||||
|
||||
public synchronized void close() {
|
||||
if (this.urlFileIndex != null) {
|
||||
this.urlFileIndex.close();
|
||||
this.urlFileIndex = null;
|
||||
}
|
||||
}
|
||||
|
||||
public void clear() {
|
||||
ConcurrentLog.info("CrawlQueue", "cleaning CrawlQueue with " + this.urlFileIndex.size() + " entries from " + this.urlFileIndex.filename());
|
||||
try {
|
||||
this.urlFileIndex.clear();
|
||||
} catch (final IOException e) {
|
||||
ConcurrentLog.logException(e);
|
||||
}
|
||||
this.double_push_check.clear();
|
||||
}
|
||||
|
||||
public Request get(final byte[] urlhash) throws IOException {
|
||||
assert urlhash != null;
|
||||
if (this.urlFileIndex == null) return null; // case occurs during shutdown
|
||||
final Row.Entry entry = this.urlFileIndex.get(urlhash, false);
|
||||
if (entry == null) return null;
|
||||
return new Request(entry);
|
||||
}
|
||||
|
||||
public int removeAllByProfileHandle(final String profileHandle, final long timeout) throws IOException, SpaceExceededException {
|
||||
// removes all entries with a specific profile hash.
|
||||
// this may last some time
|
||||
// returns number of deletions
|
||||
|
||||
// first find a list of url hashes that shall be deleted
|
||||
final HandleSet urlHashes = new RowHandleSet(this.urlFileIndex.row().primaryKeyLength, Base64Order.enhancedCoder, 100);
|
||||
final long terminate = timeout == Long.MAX_VALUE ? Long.MAX_VALUE : (timeout > 0) ? System.currentTimeMillis() + timeout : Long.MAX_VALUE;
|
||||
synchronized (this) {
|
||||
final Iterator<Row.Entry> i = this.urlFileIndex.rows();
|
||||
Row.Entry rowEntry;
|
||||
Request crawlEntry;
|
||||
while (i.hasNext() && (System.currentTimeMillis() < terminate)) {
|
||||
rowEntry = i.next();
|
||||
crawlEntry = new Request(rowEntry);
|
||||
if (crawlEntry.profileHandle().equals(profileHandle)) {
|
||||
urlHashes.put(crawlEntry.url().hash());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// then delete all these urls from the queues and the file index
|
||||
return remove(urlHashes);
|
||||
}
|
||||
|
||||
/**
|
||||
* this method is only here, because so many import/export methods need it
|
||||
and it was implemented in the previous architecture
|
||||
however, usage is not recommended
|
||||
* @param urlHashes, a list of hashes that shall be removed
|
||||
* @return number of entries that had been removed
|
||||
* @throws IOException
|
||||
*/
|
||||
public synchronized int remove(final HandleSet urlHashes) throws IOException {
|
||||
final int s = this.urlFileIndex.size();
|
||||
int removedCounter = 0;
|
||||
for (final byte[] urlhash: urlHashes) {
|
||||
final Row.Entry entry = this.urlFileIndex.remove(urlhash);
|
||||
if (entry != null) removedCounter++;
|
||||
|
||||
// remove from double-check caches
|
||||
this.double_push_check.remove(urlhash);
|
||||
}
|
||||
if (removedCounter == 0) return 0;
|
||||
assert this.urlFileIndex.size() + removedCounter == s : "urlFileIndex.size() = " + this.urlFileIndex.size() + ", s = " + s;
|
||||
|
||||
return removedCounter;
|
||||
}
|
||||
|
||||
public boolean has(final byte[] urlhashb) {
|
||||
return this.urlFileIndex.has(urlhashb) || this.double_push_check.has(urlhashb);
|
||||
}
|
||||
|
||||
public int size() {
|
||||
return this.urlFileIndex.size();
|
||||
}
|
||||
|
||||
public boolean isEmpty() {
|
||||
return this.urlFileIndex.isEmpty();
|
||||
}
|
||||
|
||||
/**
|
||||
* push a crawl request on the balancer stack
|
||||
* @param entry
|
||||
* @return null if this was successful or a String explaining what went wrong in case of an error
|
||||
* @throws IOException
|
||||
* @throws SpaceExceededException
|
||||
*/
|
||||
public String push(final Request entry, CrawlProfile profile) throws IOException, SpaceExceededException {
|
||||
assert entry != null;
|
||||
final byte[] hash = entry.url().hash();
|
||||
synchronized (this) {
|
||||
// double-check
|
||||
if (this.double_push_check.has(hash)) return "double occurrence in double_push_check";
|
||||
if (this.urlFileIndex.has(hash)) return "double occurrence in urlFileIndex";
|
||||
|
||||
if (this.double_push_check.size() > MAX_DOUBLE_PUSH_CHECK || MemoryControl.shortStatus()) this.double_push_check.clear();
|
||||
this.double_push_check.put(hash);
|
||||
|
||||
// increase dom counter
|
||||
if (profile != null && profile.domMaxPages() != Integer.MAX_VALUE && profile.domMaxPages() > 0) {
|
||||
profile.domInc(entry.url().getHost());
|
||||
}
|
||||
|
||||
// add to index
|
||||
final int s = this.urlFileIndex.size();
|
||||
this.urlFileIndex.put(entry.toRow());
|
||||
assert s < this.urlFileIndex.size() : "hash = " + ASCII.String(hash) + ", s = " + s + ", size = " + this.urlFileIndex.size();
|
||||
assert this.urlFileIndex.has(hash) : "hash = " + ASCII.String(hash);
|
||||
|
||||
// add the hash to a queue if the host is unknown to get this fast into the balancer
|
||||
// now disabled to prevent that a crawl 'freezes' to a specific domain which hosts a lot of pages; the queues are filled anyway
|
||||
//if (!this.domainStacks.containsKey(entry.url().getHost())) pushHashToDomainStacks(entry.url().getHost(), entry.url().hash());
|
||||
}
|
||||
this.robots.ensureExist(entry.url(), CrawlQueue.this.myAgentIDs, true); // concurrently load all robots.txt
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the minimum sleep time for a given url. The result can also be negative to reflect the time since the last access
|
||||
* The time can be as low as Integer.MIN_VALUE to show that there should not be any limitation at all.
|
||||
* @param robots
|
||||
* @param profileEntry
|
||||
* @param crawlURL
|
||||
* @return the sleep time in milliseconds; may be negative for no sleep time
|
||||
*/
|
||||
private long getDomainSleepTime(final CrawlProfile profileEntry, final DigestURI crawlURL) {
|
||||
if (profileEntry == null) return 0;
|
||||
long sleeptime = (
|
||||
profileEntry.cacheStrategy() == CacheStrategy.CACHEONLY ||
|
||||
(profileEntry.cacheStrategy() == CacheStrategy.IFEXIST && Cache.has(crawlURL.hash()))
|
||||
) ? Integer.MIN_VALUE : Latency.waitingRemaining(crawlURL, robots, this.myAgentIDs, this.minimumLocalDelta, this.minimumGlobalDelta); // this uses the robots.txt database and may cause a loading of robots.txt from the server
|
||||
return sleeptime;
|
||||
}
|
||||
|
||||
/**
|
||||
* load a robots.txt to get the robots time.
|
||||
* ATTENTION: this method causes that a robots.txt is loaded from the web which may cause a longer delay in execution.
|
||||
* This shall therefore not be called in synchronized environments.
|
||||
* @param robots
|
||||
* @param profileEntry
|
||||
* @param crawlURL
|
||||
* @return
|
||||
*/
|
||||
private long getRobotsTime(final RobotsTxt robots, final DigestURI crawlURL) {
|
||||
long sleeptime = Latency.waitingRobots(crawlURL, robots, this.myAgentIDs); // this uses the robots.txt database and may cause a loading of robots.txt from the server
|
||||
return sleeptime < 0 ? 0 : sleeptime;
|
||||
}
|
||||
|
||||
/**
|
||||
* get the next entry in this crawl queue in such a way that the domain access time delta is maximized
|
||||
* and always above the given minimum delay time. An additional delay time is computed using the robots.txt
|
||||
* crawl-delay time which is always respected. In case the minimum time cannot ensured, this method pauses
|
||||
* the necessary time until the url is released and returned as CrawlEntry object. In case that a profile
|
||||
* for the computed Entry does not exist, null is returned
|
||||
* @param delay true if the requester demands forced delays using explicit thread sleep
|
||||
* @param profile
|
||||
* @return a url in a CrawlEntry object
|
||||
* @throws IOException
|
||||
* @throws SpaceExceededException
|
||||
*/
|
||||
public Request pop(final boolean delay, final CrawlSwitchboard cs, final RobotsTxt robots) throws IOException {
|
||||
// returns a crawl entry from the stack and ensures minimum delta times
|
||||
|
||||
if (this.urlFileIndex.isEmpty()) return null;
|
||||
long sleeptime = 0;
|
||||
Request crawlEntry = null;
|
||||
CrawlProfile profileEntry = null;
|
||||
while (this.urlFileIndex.size() > 0) {
|
||||
synchronized (this) {
|
||||
Row.Entry rowEntry = this.urlFileIndex.removeOne();
|
||||
if (rowEntry == null) return null;
|
||||
crawlEntry = new Request(rowEntry);
|
||||
profileEntry = cs.getActive(UTF8.getBytes(crawlEntry.profileHandle()));
|
||||
if (profileEntry == null) {
|
||||
ConcurrentLog.warn("CrawlQueue", "no profile entry for handle " + crawlEntry.profileHandle());
|
||||
return null;
|
||||
}
|
||||
|
||||
// check blacklist (again) because the user may have created blacklist entries after the queue has been filled
|
||||
if (Switchboard.urlBlacklist.isListed(BlacklistType.CRAWLER, crawlEntry.url())) {
|
||||
ConcurrentLog.fine("CrawlQueue", "URL '" + crawlEntry.url() + "' is in blacklist.");
|
||||
continue;
|
||||
}
|
||||
|
||||
// at this point we must check if the crawlEntry has relevance because the crawl profile still exists
|
||||
// if not: return null. A calling method must handle the null value and try again
|
||||
profileEntry = cs.getActive(UTF8.getBytes(crawlEntry.profileHandle()));
|
||||
if (profileEntry == null) {
|
||||
ConcurrentLog.warn("CrawlQueue", "no profile entry for handle " + crawlEntry.profileHandle());
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
// depending on the caching policy we need sleep time to avoid DoS-like situations
|
||||
sleeptime = getDomainSleepTime(profileEntry, crawlEntry.url());
|
||||
|
||||
|
||||
long robotsTime = getRobotsTime(robots, crawlEntry.url());
|
||||
Latency.updateAfterSelection(crawlEntry.url(), profileEntry == null ? 0 : robotsTime);
|
||||
if (delay && sleeptime > 0) {
|
||||
// force a busy waiting here
|
||||
// in best case, this should never happen if the balancer works propertly
|
||||
// this is only to protection against the worst case, where the crawler could
|
||||
// behave in a DoS-manner
|
||||
ConcurrentLog.info("CrawlQueue", "forcing crawl-delay of " + sleeptime + " milliseconds for " + crawlEntry.url().getHost() + ": " + Latency.waitingRemainingExplain(crawlEntry.url(), robots, this.myAgentIDs, this.minimumLocalDelta, this.minimumGlobalDelta));
|
||||
long loops = sleeptime / 1000;
|
||||
long rest = sleeptime % 1000;
|
||||
if (loops < 3) {
|
||||
rest = rest + 1000 * loops;
|
||||
loops = 0;
|
||||
}
|
||||
Thread.currentThread().setName("CrawlQueue waiting for " +crawlEntry.url().getHost() + ": " + sleeptime + " milliseconds");
|
||||
synchronized(this) {
|
||||
// must be synchronized here to avoid 'takeover' moves from other threads which then idle the same time which would not be enough
|
||||
if (rest > 0) {try {this.wait(rest);} catch (final InterruptedException e) {}}
|
||||
for (int i = 0; i < loops; i++) {
|
||||
ConcurrentLog.info("CrawlQueue", "waiting for " + crawlEntry.url().getHost() + ": " + (loops - i) + " seconds remaining...");
|
||||
try {this.wait(1000); } catch (final InterruptedException e) {}
|
||||
}
|
||||
}
|
||||
Latency.updateAfterSelection(crawlEntry.url(), robotsTime);
|
||||
}
|
||||
return crawlEntry;
|
||||
}
|
||||
}
|
Loading…
Reference in new issue