From fd0976c0a7c44d42f2e1bbb76520e43d2d12b011 Mon Sep 17 00:00:00 2001 From: borg-0300 Date: Mon, 16 Mar 2009 18:08:43 +0000 Subject: [PATCH] refactoring git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@5723 6c8d7289-2bf4-0310-a012-ef5d649a1542 --- source/de/anomic/crawler/CrawlQueues.java | 1207 ++++++++++---------- source/de/anomic/crawler/CrawlStacker.java | 133 +-- 2 files changed, 667 insertions(+), 673 deletions(-) diff --git a/source/de/anomic/crawler/CrawlQueues.java b/source/de/anomic/crawler/CrawlQueues.java index 167c3e6a0..ccf9c303c 100644 --- a/source/de/anomic/crawler/CrawlQueues.java +++ b/source/de/anomic/crawler/CrawlQueues.java @@ -1,599 +1,608 @@ -// CrawlQueues.java -// (C) 2007 by Michael Peter Christen; mc@yacy.net, Frankfurt a. M., Germany -// first published 29.10.2007 on http://yacy.net -// -// This is a part of YaCy, a peer-to-peer based web search engine -// -// $LastChangedDate: 2006-04-02 22:40:07 +0200 (So, 02 Apr 2006) $ -// $LastChangedRevision: 1986 $ -// $LastChangedBy: orbiter $ -// -// LICENSE -// -// This program is free software; you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation; either version 2 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program; if not, write to the Free Software -// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - -package de.anomic.crawler; - -import java.io.File; -import java.io.IOException; -import java.net.MalformedURLException; -import java.text.ParseException; -import java.util.ArrayList; -import java.util.Date; -import java.util.Iterator; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import de.anomic.kelondro.table.FlexWidthArray; -import de.anomic.kelondro.text.Document; -import de.anomic.kelondro.util.DateFormatter; -import de.anomic.kelondro.util.Log; -import de.anomic.plasma.plasmaParser; -import de.anomic.plasma.plasmaSwitchboard; -import de.anomic.plasma.plasmaSwitchboardConstants; -import de.anomic.server.serverProcessorJob; -import de.anomic.xml.RSSFeed; -import de.anomic.xml.RSSMessage; -import de.anomic.yacy.yacyClient; -import de.anomic.yacy.yacySeed; -import de.anomic.yacy.yacyURL; -import de.anomic.yacy.dht.PeerSelection; - -public class CrawlQueues { - - private plasmaSwitchboard sb; - private Log log; - private Map workers; // mapping from url hash to Worker thread object - private ProtocolLoader loader; - private final ArrayList remoteCrawlProviderHashes; - - public NoticedURL noticeURL; - public ZURL errorURL, delegatedURL; - - public CrawlQueues(final plasmaSwitchboard sb, final File plasmaPath) { - this.sb = sb; - this.log = new Log("CRAWLER"); - this.workers = new ConcurrentHashMap(); - this.loader = new ProtocolLoader(sb, log); - this.remoteCrawlProviderHashes = new ArrayList(); - - // start crawling management - log.logConfig("Starting Crawling Management"); - noticeURL = new NoticedURL(plasmaPath); - //errorURL = new plasmaCrawlZURL(); // fresh error DB each startup; can be hold in RAM and reduces IO; - final File errorDBFile = new File(plasmaPath, "urlError2.db"); - if (errorDBFile.exists()) { - // delete the error db to get a fresh each time on startup - // this is useful because there is currently no re-use of the data in this table. - if (errorDBFile.isDirectory()) FlexWidthArray.delete(plasmaPath, "urlError2.db"); else errorDBFile.delete(); - } - errorURL = new ZURL(plasmaPath, "urlError3.db", false); - delegatedURL = new ZURL(plasmaPath, "urlDelegated3.db", true); - } - - public String urlExists(final String hash) { - // tests if hash occurrs in any database - // if it exists, the name of the database is returned, - // if it not exists, null is returned - if (noticeURL.existsInStack(hash)) return "crawler"; - if (delegatedURL.exists(hash)) return "delegated"; - if (errorURL.exists(hash)) return "errors"; - for (final crawlWorker worker: workers.values()) { - if (worker.entry.url().hash().equals(hash)) return "worker"; - } - return null; - } - - public void urlRemove(final String hash) { - noticeURL.removeByURLHash(hash); - delegatedURL.remove(hash); - errorURL.remove(hash); - } - - public yacyURL getURL(final String urlhash) { - assert urlhash != null; - if (urlhash == null || urlhash.length() == 0) return null; - final CrawlEntry ne = noticeURL.get(urlhash); - if (ne != null) return ne.url(); - ZURL.Entry ee = delegatedURL.getEntry(urlhash); - if (ee != null) return ee.url(); - ee = errorURL.getEntry(urlhash); - if (ee != null) return ee.url(); - for (final crawlWorker w: workers.values()) { - if (w.entry.url().hash().equals(urlhash)) return w.entry.url(); - } - return null; - } - - public void cleanup() { - // wait for all workers to finish - int timeout = (int) sb.getConfigLong("crawler.clientTimeout", 10000); - for (final crawlWorker w: workers.values()) { - if (w.age() > timeout) w.interrupt(); - } - } - - public void clear() { - // wait for all workers to finish - for (final crawlWorker w: workers.values()) { - w.interrupt(); - } - // TODO: wait some more time until all threads are finished - workers.clear(); - remoteCrawlProviderHashes.clear(); - noticeURL.clear(); - try { - errorURL.clear(); - } catch (final IOException e) { - e.printStackTrace(); - } - try { - delegatedURL.clear(); - } catch (final IOException e) { - e.printStackTrace(); - } - } - - public void close() { - // wait for all workers to finish - for (final crawlWorker w: workers.values()) { - w.interrupt(); - } - // TODO: wait some more time until all threads are finished - noticeURL.close(); - errorURL.close(); - delegatedURL.close(); - } - - public CrawlEntry[] activeWorkerEntries() { - synchronized (workers) { - final CrawlEntry[] e = new CrawlEntry[workers.size()]; - int i = 0; - for (final crawlWorker w: workers.values()) e[i++] = w.entry; - return e; - } - } - - public boolean isSupportedProtocol(final String protocol) { - return loader.isSupportedProtocol(protocol); - } - - public int coreCrawlJobSize() { - return noticeURL.stackSize(NoticedURL.STACK_TYPE_CORE); - } - - public boolean coreCrawlJob() { - - final boolean robinsonPrivateCase = ((sb.isRobinsonMode()) && - (!sb.getConfig(plasmaSwitchboardConstants.CLUSTER_MODE, "").equals(plasmaSwitchboardConstants.CLUSTER_MODE_PUBLIC_CLUSTER)) && - (!sb.getConfig(plasmaSwitchboardConstants.CLUSTER_MODE, "").equals(plasmaSwitchboardConstants.CLUSTER_MODE_PRIVATE_CLUSTER))); - - if (((robinsonPrivateCase) || (coreCrawlJobSize() <= 20)) && (limitCrawlJobSize() > 0)) { - // move some tasks to the core crawl job so we have something to do - final int toshift = Math.min(10, limitCrawlJobSize()); // this cannot be a big number because the balancer makes a forced waiting if it cannot balance - for (int i = 0; i < toshift; i++) { - noticeURL.shift(NoticedURL.STACK_TYPE_LIMIT, NoticedURL.STACK_TYPE_CORE, sb.webIndex.profilesActiveCrawls); - } - log.logInfo("shifted " + toshift + " jobs from global crawl to local crawl (coreCrawlJobSize()=" + coreCrawlJobSize() + - ", limitCrawlJobSize()=" + limitCrawlJobSize() + ", cluster.mode=" + sb.getConfig(plasmaSwitchboardConstants.CLUSTER_MODE, "") + - ", robinsonMode=" + ((sb.isRobinsonMode()) ? "on" : "off")); - } - - if(!crawlIsPossible(NoticedURL.STACK_TYPE_CORE, "Core")) return false; - - if(isPaused(plasmaSwitchboardConstants.CRAWLJOB_LOCAL_CRAWL)) return false; - - // do a local crawl - CrawlEntry urlEntry = null; - while (urlEntry == null && noticeURL.stackSize(NoticedURL.STACK_TYPE_CORE) > 0) { - final String stats = "LOCALCRAWL[" + noticeURL.stackSize(NoticedURL.STACK_TYPE_CORE) + ", " + noticeURL.stackSize(NoticedURL.STACK_TYPE_LIMIT) + ", " + noticeURL.stackSize(NoticedURL.STACK_TYPE_OVERHANG) + ", " + noticeURL.stackSize(NoticedURL.STACK_TYPE_REMOTE) + "]"; - try { - urlEntry = noticeURL.pop(NoticedURL.STACK_TYPE_CORE, true, sb.webIndex.profilesActiveCrawls); - if (urlEntry == null) continue; - final String profileHandle = urlEntry.profileHandle(); - // System.out.println("DEBUG plasmaSwitchboard.processCrawling: - // profileHandle = " + profileHandle + ", urlEntry.url = " + urlEntry.url()); - if (profileHandle == null) { - log.logSevere(stats + ": NULL PROFILE HANDLE '" + urlEntry.profileHandle() + "' for URL " + urlEntry.url()); - return true; - } - generateCrawl(urlEntry, stats, profileHandle); - return true; - } catch (final IOException e) { - log.logSevere(stats + ": CANNOT FETCH ENTRY: " + e.getMessage(), e); - if (e.getMessage().indexOf("hash is null") > 0) noticeURL.clear(NoticedURL.STACK_TYPE_CORE); - } - } - return true; - } - - /** - * Make some checks if crawl is valid and start it - * - * @param urlEntry - * @param profileHandle - * @param stats String for log prefixing - * @return - */ - private void generateCrawl(CrawlEntry urlEntry, final String stats, final String profileHandle) { - final CrawlProfile.entry profile = sb.webIndex.profilesActiveCrawls.getEntry(profileHandle); - if (profile != null) { - - // check if the protocol is supported - final yacyURL url = urlEntry.url(); - final String urlProtocol = url.getProtocol(); - if (this.sb.crawlQueues.isSupportedProtocol(urlProtocol)) { - - if (this.log.isFine()) - log.logFine(stats + ": URL=" + urlEntry.url() - + ", initiator=" + urlEntry.initiator() - + ", crawlOrder=" + ((profile.remoteIndexing()) ? "true" : "false") - + ", depth=" + urlEntry.depth() - + ", crawlDepth=" + profile.depth() - + ", must-match=" + profile.mustMatchPattern().toString() - + ", must-not-match=" + profile.mustNotMatchPattern().toString() - + ", permission=" + ((sb.webIndex.peers() == null) ? "undefined" : (((sb.webIndex.peers().mySeed().isSenior()) || (sb.webIndex.peers().mySeed().isPrincipal())) ? "true" : "false"))); - - processLocalCrawling(urlEntry, stats); - } else { - this.log.logSevere("Unsupported protocol in URL '" + url.toString()); - } - } else { - log.logWarning(stats + ": LOST PROFILE HANDLE '" + urlEntry.profileHandle() + "' for URL " + urlEntry.url()); - } - } - - /** - * if crawling was paused we have to wait until we were notified to continue - * blocks until pause is ended - * @param crawljob - * @return - */ - private boolean isPaused(String crawljob) { - final Object[] status = sb.crawlJobsStatus.get(crawljob); - boolean pauseEnded = false; - synchronized(status[plasmaSwitchboardConstants.CRAWLJOB_SYNC]) { - if (((Boolean)status[plasmaSwitchboardConstants.CRAWLJOB_STATUS]).booleanValue()) { - try { - status[plasmaSwitchboardConstants.CRAWLJOB_SYNC].wait(); - } - catch (final InterruptedException e) { pauseEnded = true;} - } - } - return pauseEnded; - } - - /** - * Checks if crawl queue has elements and new crawl will not exceed thread-limit - * @param stackType - * @param type - * @return - */ - private boolean crawlIsPossible(int stackType, final String type) { - if (noticeURL.stackSize(stackType) == 0) { - //log.logDebug("GlobalCrawl: queue is empty"); - return false; - } - - if (sb.webIndex.queuePreStack.size() >= (int) sb.getConfigLong(plasmaSwitchboardConstants.INDEXER_SLOTS, 30)) { - if (this.log.isFine()) log.logFine(type + "Crawl: too many processes in indexing queue, dismissed (" + "sbQueueSize=" + sb.webIndex.queuePreStack.size() + ")"); - return false; - } - if (this.size() >= sb.getConfigLong(plasmaSwitchboardConstants.CRAWLER_THREADS_ACTIVE_MAX, 10)) { - // try a cleanup - this.cleanup(); - } - // check again - if (this.size() >= sb.getConfigLong(plasmaSwitchboardConstants.CRAWLER_THREADS_ACTIVE_MAX, 10)) { - if (this.log.isFine()) log.logFine(type + "Crawl: too many processes in loader queue, dismissed (" + "cacheLoader=" + this.size() + ")"); - return false; - } - - if (sb.onlineCaution()) { - if (this.log.isFine()) log.logFine(type + "Crawl: online caution, omitting processing"); - return false; - } - return true; - } - - public boolean remoteCrawlLoaderJob() { - // check if we are allowed to crawl urls provided by other peers - if (!sb.webIndex.peers().mySeed().getFlagAcceptRemoteCrawl()) { - //this.log.logInfo("remoteCrawlLoaderJob: not done, we are not allowed to do that"); - return false; - } - - // check if we are a senior peer - if (!sb.webIndex.peers().mySeed().isActive()) { - //this.log.logInfo("remoteCrawlLoaderJob: not done, this should be a senior or principal peer"); - return false; - } - - if (sb.webIndex.queuePreStack.size() >= (int) sb.getConfigLong(plasmaSwitchboardConstants.INDEXER_SLOTS, 30) / 2) { - if (this.log.isFine()) log.logFine("remoteCrawlLoaderJob: too many processes in indexing queue, dismissed (" + "sbQueueSize=" + sb.webIndex.queuePreStack.size() + ")"); - return false; - } - - if (this.size() >= sb.getConfigLong(plasmaSwitchboardConstants.CRAWLER_THREADS_ACTIVE_MAX, 10)) { - // try a cleanup - cleanup(); - } - // check again - if (this.size() >= sb.getConfigLong(plasmaSwitchboardConstants.CRAWLER_THREADS_ACTIVE_MAX, 10)) { - if (this.log.isFine()) log.logFine("remoteCrawlLoaderJob: too many processes in loader queue, dismissed (" + "cacheLoader=" + this.size() + ")"); - return false; - } - - if (sb.onlineCaution()) { - if (this.log.isFine()) log.logFine("remoteCrawlLoaderJob: online caution, omitting processing"); - return false; - } - - if (remoteTriggeredCrawlJobSize() > 100) { - if (this.log.isFine()) log.logFine("remoteCrawlLoaderJob: the remote-triggered crawl job queue is filled, omitting processing"); - return false; - } - - if (coreCrawlJobSize() > 0) { - if (this.log.isFine()) log.logFine("remoteCrawlLoaderJob: a local crawl is running, omitting processing"); - return false; - } - - // check if we have an entry in the provider list, otherwise fill the list - yacySeed seed; - if (remoteCrawlProviderHashes.size() == 0) { - if (sb.webIndex.peers() != null && sb.webIndex.peers().sizeConnected() > 0) { - final Iterator e = PeerSelection.getProvidesRemoteCrawlURLs(sb.webIndex.peers()); - while (e.hasNext()) { - seed = e.next(); - if (seed != null) { - remoteCrawlProviderHashes.add(seed.hash); - } - } - } - } - if (remoteCrawlProviderHashes.size() == 0) return false; - - // take one entry from the provider list and load the entries from the remote peer - seed = null; - String hash = null; - while ((seed == null) && (remoteCrawlProviderHashes.size() > 0)) { - hash = remoteCrawlProviderHashes.remove(remoteCrawlProviderHashes.size() - 1); - if (hash == null) continue; - seed = sb.webIndex.peers().get(hash); - if (seed == null) continue; - // check if the peer is inside our cluster - if ((sb.isRobinsonMode()) && (!sb.isInMyCluster(seed))) { - seed = null; - continue; - } - } - if (seed == null) return false; - - // we know a peer which should provide remote crawl entries. load them now. - final RSSFeed feed = yacyClient.queryRemoteCrawlURLs(sb.webIndex.peers(), seed, 30, 60000); - if (feed == null || feed.size() == 0) { - // something is wrong with this provider. To prevent that we get not stuck with this peer - // we remove it from the peer list - sb.webIndex.peers().peerActions.peerDeparture(seed, "no results from provided remote crawls"); - // ask another peer - return remoteCrawlLoaderJob(); - } - - // parse the rss - yacyURL url, referrer; - Date loaddate; - for (final RSSMessage item: feed) { - //System.out.println("URL=" + item.getLink() + ", desc=" + item.getDescription() + ", pubDate=" + item.getPubDate()); - - // put url on remote crawl stack - try { - url = new yacyURL(item.getLink(), null); - } catch (final MalformedURLException e) { - url = null; - } - try { - referrer = new yacyURL(item.getReferrer(), null); - } catch (final MalformedURLException e) { - referrer = null; - } - try { - loaddate = DateFormatter.parseShortSecond(item.getPubDate()); - } catch (final ParseException e) { - loaddate = new Date(); - } - final String urlRejectReason = sb.crawlStacker.urlInAcceptedDomain(url); - if (urlRejectReason == null) { - // stack url - if (sb.getLog().isFinest()) sb.getLog().logFinest("crawlOrder: stack: url='" + url + "'"); - sb.crawlStacker.enqueueEntry(new CrawlEntry( - hash, - url, - (referrer == null) ? null : referrer.hash(), - item.getDescription(), - null, - loaddate, - sb.webIndex.defaultRemoteProfile.handle(), - 0, - 0, - 0 - )); - } else { - log.logWarning("crawlOrder: Rejected URL '" + urlToString(url) + "': " + urlRejectReason); - } - } - return true; - } - - /** - * @param url - * @return - */ - private String urlToString(final yacyURL url) { - return (url == null ? "null" : url.toNormalform(true, false)); - } - - public int limitCrawlJobSize() { - return noticeURL.stackSize(NoticedURL.STACK_TYPE_LIMIT); - } - - public int remoteTriggeredCrawlJobSize() { - return noticeURL.stackSize(NoticedURL.STACK_TYPE_REMOTE); - } - - public boolean remoteTriggeredCrawlJob() { - // work off crawl requests that had been placed by other peers to our crawl stack - - // do nothing if either there are private processes to be done - // or there is no global crawl on the stack - if(!crawlIsPossible(NoticedURL.STACK_TYPE_REMOTE, "Global")) return false; - - if(isPaused(plasmaSwitchboardConstants.CRAWLJOB_REMOTE_TRIGGERED_CRAWL)) return false; - - // we don't want to crawl a global URL globally, since WE are the global part. (from this point of view) - final String stats = "REMOTETRIGGEREDCRAWL[" + noticeURL.stackSize(NoticedURL.STACK_TYPE_CORE) + ", " + noticeURL.stackSize(NoticedURL.STACK_TYPE_LIMIT) + ", " + noticeURL.stackSize(NoticedURL.STACK_TYPE_OVERHANG) + ", " - + noticeURL.stackSize(NoticedURL.STACK_TYPE_REMOTE) + "]"; - try { - final CrawlEntry urlEntry = noticeURL.pop(NoticedURL.STACK_TYPE_REMOTE, true, sb.webIndex.profilesActiveCrawls); - final String profileHandle = urlEntry.profileHandle(); - // System.out.println("DEBUG plasmaSwitchboard.processCrawling: - // profileHandle = " + profileHandle + ", urlEntry.url = " + - // urlEntry.url()); - generateCrawl(urlEntry, stats, profileHandle); - return true; - } catch (final IOException e) { - log.logSevere(stats + ": CANNOT FETCH ENTRY: " + e.getMessage(), e); - if (e.getMessage().indexOf("hash is null") > 0) noticeURL.clear(NoticedURL.STACK_TYPE_REMOTE); - return true; - } - } - - private void processLocalCrawling(final CrawlEntry entry, final String stats) { - // work off one Crawl stack entry - if ((entry == null) || (entry.url() == null)) { - log.logInfo(stats + ": urlEntry = null"); - return; - } - new crawlWorker(entry); - - log.logInfo(stats + ": enqueued for load " + entry.url() + " [" + entry.url().hash() + "]"); - return; - } - - public Document loadResourceFromWeb( - final yacyURL url, - final int socketTimeout, - final boolean keepInMemory, - final boolean forText, - final boolean global - ) throws IOException { - - final CrawlEntry centry = new CrawlEntry( - sb.webIndex.peers().mySeed().hash, - url, - "", - "", - new Date(), - new Date(), - (forText) ? - ((global) ? - sb.webIndex.defaultTextSnippetGlobalProfile.handle() : - sb.webIndex.defaultTextSnippetLocalProfile.handle()) - : - ((global) ? - sb.webIndex.defaultMediaSnippetGlobalProfile.handle() : - sb.webIndex.defaultMediaSnippetLocalProfile.handle()), // crawl profile - 0, - 0, - 0); - - return loader.load(centry, (forText) ? plasmaParser.PARSER_MODE_CRAWLER : plasmaParser.PARSER_MODE_IMAGE); - } - - public int size() { - return workers.size(); - } - - protected final class crawlWorker extends Thread { - - private CrawlEntry entry; - private final Integer code; - private long start; - - public crawlWorker(final CrawlEntry entry) { - this.start = System.currentTimeMillis(); - this.entry = entry; - this.entry.setStatus("worker-initialized", serverProcessorJob.STATUS_INITIATED); - this.code = Integer.valueOf(entry.hashCode()); - if (!workers.containsKey(code)) { - workers.put(code, this); - this.start(); - } - } - - public long age() { - return System.currentTimeMillis() - start; - } - - public void run() { - try { - // checking robots.txt for http(s) resources - this.entry.setStatus("worker-checkingrobots", serverProcessorJob.STATUS_STARTED); - if ((entry.url().getProtocol().equals("http") || entry.url().getProtocol().equals("https")) && sb.robots.isDisallowed(entry.url())) { - if (log.isFine()) log.logFine("Crawling of URL '" + entry.url().toString() + "' disallowed by robots.txt."); - final ZURL.Entry eentry = errorURL.newEntry( - this.entry, - sb.webIndex.peers().mySeed().hash, - new Date(), - 1, - "denied by robots.txt"); - eentry.store(); - errorURL.push(eentry); - } else { - // starting a load from the internet - this.entry.setStatus("worker-loading", serverProcessorJob.STATUS_RUNNING); - final String result = loader.process(this.entry, plasmaParser.PARSER_MODE_CRAWLER); - if (result != null) { - final ZURL.Entry eentry = errorURL.newEntry( - this.entry, - sb.webIndex.peers().mySeed().hash, - new Date(), - 1, - "cannot load: " + result); - eentry.store(); - errorURL.push(eentry); - } else { - this.entry.setStatus("worker-processed", serverProcessorJob.STATUS_FINISHED); - } - } - } catch (final Exception e) { - final ZURL.Entry eentry = errorURL.newEntry( - this.entry, - sb.webIndex.peers().mySeed().hash, - new Date(), - 1, - e.getMessage() + " - in worker"); - eentry.store(); - errorURL.push(eentry); - e.printStackTrace(); - } finally { - workers.remove(code); - this.entry.setStatus("worker-finalized", serverProcessorJob.STATUS_FINISHED); - } - } - - } - -} +// CrawlQueues.java +// (C) 2007 by Michael Peter Christen; mc@yacy.net, Frankfurt a. M., Germany +// first published 29.10.2007 on http://yacy.net +// +// This is a part of YaCy, a peer-to-peer based web search engine +// +// $LastChangedDate$ +// $LastChangedRevision$ +// $LastChangedBy$ +// +// LICENSE +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation; either version 2 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +package de.anomic.crawler; + +import java.io.File; +import java.io.IOException; +import java.net.MalformedURLException; +import java.text.ParseException; +import java.util.ArrayList; +import java.util.Date; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import de.anomic.kelondro.table.FlexWidthArray; +import de.anomic.kelondro.text.Document; +import de.anomic.kelondro.util.DateFormatter; +import de.anomic.kelondro.util.Log; +import de.anomic.plasma.plasmaParser; +import de.anomic.plasma.plasmaSwitchboard; +import de.anomic.plasma.plasmaSwitchboardConstants; +import de.anomic.server.serverProcessorJob; +import de.anomic.xml.RSSFeed; +import de.anomic.xml.RSSMessage; +import de.anomic.yacy.yacyClient; +import de.anomic.yacy.yacySeed; +import de.anomic.yacy.yacyURL; +import de.anomic.yacy.dht.PeerSelection; + +public class CrawlQueues { + + private plasmaSwitchboard sb; + private Log log; + private Map workers; // mapping from url hash to Worker thread object + private ProtocolLoader loader; + private final ArrayList remoteCrawlProviderHashes; + + public NoticedURL noticeURL; + public ZURL errorURL, delegatedURL; + + public CrawlQueues(final plasmaSwitchboard sb, final File plasmaPath) { + this.sb = sb; + this.log = new Log("CRAWLER"); + this.workers = new ConcurrentHashMap(); + this.loader = new ProtocolLoader(sb, log); + this.remoteCrawlProviderHashes = new ArrayList(); + + // start crawling management + log.logConfig("Starting Crawling Management"); + noticeURL = new NoticedURL(plasmaPath); + //errorURL = new plasmaCrawlZURL(); // fresh error DB each startup; can be hold in RAM and reduces IO; + final File errorDBFile = new File(plasmaPath, "urlError2.db"); + if (errorDBFile.exists()) { + // delete the error db to get a fresh each time on startup + // this is useful because there is currently no re-use of the data in this table. + if (errorDBFile.isDirectory()) FlexWidthArray.delete(plasmaPath, "urlError2.db"); else errorDBFile.delete(); + } + errorURL = new ZURL(plasmaPath, "urlError3.db", false); + delegatedURL = new ZURL(plasmaPath, "urlDelegated3.db", true); + } + + public String urlExists(final String hash) { + // tests if hash occurrs in any database + // if it exists, the name of the database is returned, + // if it not exists, null is returned + if (noticeURL.existsInStack(hash)) return "crawler"; + if (delegatedURL.exists(hash)) return "delegated"; + if (errorURL.exists(hash)) return "errors"; + for (final crawlWorker worker: workers.values()) { + if (worker.entry.url().hash().equals(hash)) return "worker"; + } + return null; + } + + public void urlRemove(final String hash) { + noticeURL.removeByURLHash(hash); + delegatedURL.remove(hash); + errorURL.remove(hash); + } + + public yacyURL getURL(final String urlhash) { + assert urlhash != null; + if (urlhash == null || urlhash.length() == 0) return null; + final CrawlEntry ne = noticeURL.get(urlhash); + if (ne != null) return ne.url(); + ZURL.Entry ee = delegatedURL.getEntry(urlhash); + if (ee != null) return ee.url(); + ee = errorURL.getEntry(urlhash); + if (ee != null) return ee.url(); + for (final crawlWorker w: workers.values()) { + if (w.entry.url().hash().equals(urlhash)) return w.entry.url(); + } + return null; + } + + public void cleanup() { + // wait for all workers to finish + int timeout = (int) sb.getConfigLong("crawler.clientTimeout", 10000); + for (final crawlWorker w: workers.values()) { + if (w.age() > timeout) w.interrupt(); + } + } + + public void clear() { + // wait for all workers to finish + for (final crawlWorker w: workers.values()) { + w.interrupt(); + } + // TODO: wait some more time until all threads are finished + workers.clear(); + remoteCrawlProviderHashes.clear(); + noticeURL.clear(); + try { + errorURL.clear(); + } catch (final IOException e) { + e.printStackTrace(); + } + try { + delegatedURL.clear(); + } catch (final IOException e) { + e.printStackTrace(); + } + } + + public void close() { + // wait for all workers to finish + for (final crawlWorker w: workers.values()) { + w.interrupt(); + } + // TODO: wait some more time until all threads are finished + noticeURL.close(); + errorURL.close(); + delegatedURL.close(); + } + + public CrawlEntry[] activeWorkerEntries() { + synchronized (workers) { + final CrawlEntry[] e = new CrawlEntry[workers.size()]; + int i = 0; + for (final crawlWorker w: workers.values()) e[i++] = w.entry; + return e; + } + } + + public boolean isSupportedProtocol(final String protocol) { + return loader.isSupportedProtocol(protocol); + } + + public int coreCrawlJobSize() { + return noticeURL.stackSize(NoticedURL.STACK_TYPE_CORE); + } + + public boolean coreCrawlJob() { + + final boolean robinsonPrivateCase = ((sb.isRobinsonMode()) && + (!sb.getConfig(plasmaSwitchboardConstants.CLUSTER_MODE, "").equals(plasmaSwitchboardConstants.CLUSTER_MODE_PUBLIC_CLUSTER)) && + (!sb.getConfig(plasmaSwitchboardConstants.CLUSTER_MODE, "").equals(plasmaSwitchboardConstants.CLUSTER_MODE_PRIVATE_CLUSTER))); + + if (((robinsonPrivateCase) || (coreCrawlJobSize() <= 20)) && (limitCrawlJobSize() > 0)) { + // move some tasks to the core crawl job so we have something to do + final int toshift = Math.min(10, limitCrawlJobSize()); // this cannot be a big number because the balancer makes a forced waiting if it cannot balance + for (int i = 0; i < toshift; i++) { + noticeURL.shift(NoticedURL.STACK_TYPE_LIMIT, NoticedURL.STACK_TYPE_CORE, sb.webIndex.profilesActiveCrawls); + } + log.logInfo("shifted " + toshift + " jobs from global crawl to local crawl (coreCrawlJobSize()=" + coreCrawlJobSize() + + ", limitCrawlJobSize()=" + limitCrawlJobSize() + ", cluster.mode=" + sb.getConfig(plasmaSwitchboardConstants.CLUSTER_MODE, "") + + ", robinsonMode=" + ((sb.isRobinsonMode()) ? "on" : "off")); + } + + if(!crawlIsPossible(NoticedURL.STACK_TYPE_CORE, "Core")) return false; + + if(isPaused(plasmaSwitchboardConstants.CRAWLJOB_LOCAL_CRAWL)) return false; + + // do a local crawl + CrawlEntry urlEntry = null; + while (urlEntry == null && noticeURL.stackSize(NoticedURL.STACK_TYPE_CORE) > 0) { + final String stats = "LOCALCRAWL[" + noticeURL.stackSize(NoticedURL.STACK_TYPE_CORE) + ", " + noticeURL.stackSize(NoticedURL.STACK_TYPE_LIMIT) + ", " + noticeURL.stackSize(NoticedURL.STACK_TYPE_OVERHANG) + ", " + noticeURL.stackSize(NoticedURL.STACK_TYPE_REMOTE) + "]"; + try { + urlEntry = noticeURL.pop(NoticedURL.STACK_TYPE_CORE, true, sb.webIndex.profilesActiveCrawls); + if (urlEntry == null) continue; + final String profileHandle = urlEntry.profileHandle(); + // System.out.println("DEBUG plasmaSwitchboard.processCrawling: + // profileHandle = " + profileHandle + ", urlEntry.url = " + urlEntry.url()); + if (profileHandle == null) { + log.logSevere(stats + ": NULL PROFILE HANDLE '" + urlEntry.profileHandle() + "' for URL " + urlEntry.url()); + return true; + } + generateCrawl(urlEntry, stats, profileHandle); + return true; + } catch (final IOException e) { + log.logSevere(stats + ": CANNOT FETCH ENTRY: " + e.getMessage(), e); + if (e.getMessage().indexOf("hash is null") > 0) noticeURL.clear(NoticedURL.STACK_TYPE_CORE); + } + } + return true; + } + + /** + * Make some checks if crawl is valid and start it + * + * @param urlEntry + * @param profileHandle + * @param stats String for log prefixing + * @return + */ + private void generateCrawl(CrawlEntry urlEntry, final String stats, final String profileHandle) { + final CrawlProfile.entry profile = sb.webIndex.profilesActiveCrawls.getEntry(profileHandle); + if (profile != null) { + + // check if the protocol is supported + final yacyURL url = urlEntry.url(); + final String urlProtocol = url.getProtocol(); + if (this.sb.crawlQueues.isSupportedProtocol(urlProtocol)) { + + if (this.log.isFine()) + log.logFine(stats + ": URL=" + urlEntry.url() + + ", initiator=" + urlEntry.initiator() + + ", crawlOrder=" + ((profile.remoteIndexing()) ? "true" : "false") + + ", depth=" + urlEntry.depth() + + ", crawlDepth=" + profile.depth() + + ", must-match=" + profile.mustMatchPattern().toString() + + ", must-not-match=" + profile.mustNotMatchPattern().toString() + + ", permission=" + ((sb.webIndex.peers() == null) ? "undefined" : (((sb.webIndex.peers().mySeed().isSenior()) || (sb.webIndex.peers().mySeed().isPrincipal())) ? "true" : "false"))); + + processLocalCrawling(urlEntry, stats); + } else { + this.log.logSevere("Unsupported protocol in URL '" + url.toString()); + } + } else { + log.logWarning(stats + ": LOST PROFILE HANDLE '" + urlEntry.profileHandle() + "' for URL " + urlEntry.url()); + } + } + + /** + * if crawling was paused we have to wait until we were notified to continue + * blocks until pause is ended + * @param crawljob + * @return + */ + private boolean isPaused(String crawljob) { + final Object[] status = sb.crawlJobsStatus.get(crawljob); + boolean pauseEnded = false; + synchronized(status[plasmaSwitchboardConstants.CRAWLJOB_SYNC]) { + if (((Boolean)status[plasmaSwitchboardConstants.CRAWLJOB_STATUS]).booleanValue()) { + try { + status[plasmaSwitchboardConstants.CRAWLJOB_SYNC].wait(); + } + catch (final InterruptedException e) { pauseEnded = true;} + } + } + return pauseEnded; + } + + /** + * Checks if crawl queue has elements and new crawl will not exceed thread-limit + * @param stackType + * @param type + * @return + */ + private boolean crawlIsPossible(int stackType, final String type) { + int value; + if (noticeURL.stackSize(stackType) == 0) { + //log.logDebug("GlobalCrawl: queue is empty"); + return false; + } + + value = (int) sb.getConfigLong(plasmaSwitchboardConstants.INDEXER_SLOTS, 30); + if (sb.webIndex.queuePreStack.size() >= value) { + if (this.log.isFine()) { + log.logFine(type + "Crawl: too many processes in indexing queue, dismissed (" + "sbQueueSize=" + sb.webIndex.queuePreStack.size() + ")"); + } + return false; + } + value = (int) sb.getConfigLong(plasmaSwitchboardConstants.CRAWLER_THREADS_ACTIVE_MAX, 10); + if (this.size() >= value) { + // try a cleanup + this.cleanup(); + } + // check again + if (this.size() >= value) { + if (this.log.isFine()) { + log.logFine(type + "Crawl: too many processes in loader queue, dismissed (" + "cacheLoader=" + this.size() + ")"); + } + return false; + } + + if (sb.onlineCaution()) { + if (this.log.isFine()) { + log.logFine(type + "Crawl: online caution, omitting processing"); + } + return false; + } + return true; + } + + public boolean remoteCrawlLoaderJob() { + // check if we are allowed to crawl urls provided by other peers + if (!sb.webIndex.peers().mySeed().getFlagAcceptRemoteCrawl()) { + //this.log.logInfo("remoteCrawlLoaderJob: not done, we are not allowed to do that"); + return false; + } + + // check if we are a senior peer + if (!sb.webIndex.peers().mySeed().isActive()) { + //this.log.logInfo("remoteCrawlLoaderJob: not done, this should be a senior or principal peer"); + return false; + } + + if (sb.webIndex.queuePreStack.size() >= (int) sb.getConfigLong(plasmaSwitchboardConstants.INDEXER_SLOTS, 30) / 2) { + if (this.log.isFine()) log.logFine("remoteCrawlLoaderJob: too many processes in indexing queue, dismissed (" + "sbQueueSize=" + sb.webIndex.queuePreStack.size() + ")"); + return false; + } + + if (this.size() >= sb.getConfigLong(plasmaSwitchboardConstants.CRAWLER_THREADS_ACTIVE_MAX, 10)) { + // try a cleanup + cleanup(); + } + // check again + if (this.size() >= sb.getConfigLong(plasmaSwitchboardConstants.CRAWLER_THREADS_ACTIVE_MAX, 10)) { + if (this.log.isFine()) log.logFine("remoteCrawlLoaderJob: too many processes in loader queue, dismissed (" + "cacheLoader=" + this.size() + ")"); + return false; + } + + if (sb.onlineCaution()) { + if (this.log.isFine()) log.logFine("remoteCrawlLoaderJob: online caution, omitting processing"); + return false; + } + + if (remoteTriggeredCrawlJobSize() > 100) { + if (this.log.isFine()) log.logFine("remoteCrawlLoaderJob: the remote-triggered crawl job queue is filled, omitting processing"); + return false; + } + + if (coreCrawlJobSize() > 0) { + if (this.log.isFine()) log.logFine("remoteCrawlLoaderJob: a local crawl is running, omitting processing"); + return false; + } + + // check if we have an entry in the provider list, otherwise fill the list + yacySeed seed; + if (remoteCrawlProviderHashes.size() == 0) { + if (sb.webIndex.peers() != null && sb.webIndex.peers().sizeConnected() > 0) { + final Iterator e = PeerSelection.getProvidesRemoteCrawlURLs(sb.webIndex.peers()); + while (e.hasNext()) { + seed = e.next(); + if (seed != null) { + remoteCrawlProviderHashes.add(seed.hash); + } + } + } + } + if (remoteCrawlProviderHashes.size() == 0) return false; + + // take one entry from the provider list and load the entries from the remote peer + seed = null; + String hash = null; + while ((seed == null) && (remoteCrawlProviderHashes.size() > 0)) { + hash = remoteCrawlProviderHashes.remove(remoteCrawlProviderHashes.size() - 1); + if (hash == null) continue; + seed = sb.webIndex.peers().get(hash); + if (seed == null) continue; + // check if the peer is inside our cluster + if ((sb.isRobinsonMode()) && (!sb.isInMyCluster(seed))) { + seed = null; + continue; + } + } + if (seed == null) return false; + + // we know a peer which should provide remote crawl entries. load them now. + final RSSFeed feed = yacyClient.queryRemoteCrawlURLs(sb.webIndex.peers(), seed, 30, 60000); + if (feed == null || feed.size() == 0) { + // something is wrong with this provider. To prevent that we get not stuck with this peer + // we remove it from the peer list + sb.webIndex.peers().peerActions.peerDeparture(seed, "no results from provided remote crawls"); + // ask another peer + return remoteCrawlLoaderJob(); + } + + // parse the rss + yacyURL url, referrer; + Date loaddate; + for (final RSSMessage item: feed) { + //System.out.println("URL=" + item.getLink() + ", desc=" + item.getDescription() + ", pubDate=" + item.getPubDate()); + + // put url on remote crawl stack + try { + url = new yacyURL(item.getLink(), null); + } catch (final MalformedURLException e) { + url = null; + } + try { + referrer = new yacyURL(item.getReferrer(), null); + } catch (final MalformedURLException e) { + referrer = null; + } + try { + loaddate = DateFormatter.parseShortSecond(item.getPubDate()); + } catch (final ParseException e) { + loaddate = new Date(); + } + final String urlRejectReason = sb.crawlStacker.urlInAcceptedDomain(url); + if (urlRejectReason == null) { + // stack url + if (sb.getLog().isFinest()) sb.getLog().logFinest("crawlOrder: stack: url='" + url + "'"); + sb.crawlStacker.enqueueEntry(new CrawlEntry( + hash, + url, + (referrer == null) ? null : referrer.hash(), + item.getDescription(), + null, + loaddate, + sb.webIndex.defaultRemoteProfile.handle(), + 0, + 0, + 0 + )); + } else { + log.logWarning("crawlOrder: Rejected URL '" + urlToString(url) + "': " + urlRejectReason); + } + } + return true; + } + + /** + * @param url + * @return + */ + private String urlToString(final yacyURL url) { + return (url == null ? "null" : url.toNormalform(true, false)); + } + + public int limitCrawlJobSize() { + return noticeURL.stackSize(NoticedURL.STACK_TYPE_LIMIT); + } + + public int remoteTriggeredCrawlJobSize() { + return noticeURL.stackSize(NoticedURL.STACK_TYPE_REMOTE); + } + + public boolean remoteTriggeredCrawlJob() { + // work off crawl requests that had been placed by other peers to our crawl stack + + // do nothing if either there are private processes to be done + // or there is no global crawl on the stack + if(!crawlIsPossible(NoticedURL.STACK_TYPE_REMOTE, "Global")) return false; + + if(isPaused(plasmaSwitchboardConstants.CRAWLJOB_REMOTE_TRIGGERED_CRAWL)) return false; + + // we don't want to crawl a global URL globally, since WE are the global part. (from this point of view) + final String stats = "REMOTETRIGGEREDCRAWL[" + noticeURL.stackSize(NoticedURL.STACK_TYPE_CORE) + ", " + noticeURL.stackSize(NoticedURL.STACK_TYPE_LIMIT) + ", " + noticeURL.stackSize(NoticedURL.STACK_TYPE_OVERHANG) + ", " + + noticeURL.stackSize(NoticedURL.STACK_TYPE_REMOTE) + "]"; + try { + final CrawlEntry urlEntry = noticeURL.pop(NoticedURL.STACK_TYPE_REMOTE, true, sb.webIndex.profilesActiveCrawls); + final String profileHandle = urlEntry.profileHandle(); + // System.out.println("DEBUG plasmaSwitchboard.processCrawling: + // profileHandle = " + profileHandle + ", urlEntry.url = " + + // urlEntry.url()); + generateCrawl(urlEntry, stats, profileHandle); + return true; + } catch (final IOException e) { + log.logSevere(stats + ": CANNOT FETCH ENTRY: " + e.getMessage(), e); + if (e.getMessage().indexOf("hash is null") > 0) noticeURL.clear(NoticedURL.STACK_TYPE_REMOTE); + return true; + } + } + + private void processLocalCrawling(final CrawlEntry entry, final String stats) { + // work off one Crawl stack entry + if ((entry == null) || (entry.url() == null)) { + log.logInfo(stats + ": urlEntry = null"); + return; + } + new crawlWorker(entry); + + log.logInfo(stats + ": enqueued for load " + entry.url() + " [" + entry.url().hash() + "]"); + return; + } + + public Document loadResourceFromWeb( + final yacyURL url, + final int socketTimeout, + final boolean keepInMemory, + final boolean forText, + final boolean global + ) throws IOException { + + final CrawlEntry centry = new CrawlEntry( + sb.webIndex.peers().mySeed().hash, + url, + "", + "", + new Date(), + new Date(), + (forText) ? + ((global) ? + sb.webIndex.defaultTextSnippetGlobalProfile.handle() : + sb.webIndex.defaultTextSnippetLocalProfile.handle()) + : + ((global) ? + sb.webIndex.defaultMediaSnippetGlobalProfile.handle() : + sb.webIndex.defaultMediaSnippetLocalProfile.handle()), // crawl profile + 0, + 0, + 0); + + return loader.load(centry, (forText) ? plasmaParser.PARSER_MODE_CRAWLER : plasmaParser.PARSER_MODE_IMAGE); + } + + public int size() { + return workers.size(); + } + + protected final class crawlWorker extends Thread { + + private CrawlEntry entry; + private final Integer code; + private long start; + + public crawlWorker(final CrawlEntry entry) { + this.start = System.currentTimeMillis(); + this.entry = entry; + this.entry.setStatus("worker-initialized", serverProcessorJob.STATUS_INITIATED); + this.code = Integer.valueOf(entry.hashCode()); + if (!workers.containsKey(code)) { + workers.put(code, this); + this.start(); + } + } + + public long age() { + return System.currentTimeMillis() - start; + } + + public void run() { + try { + // checking robots.txt for http(s) resources + this.entry.setStatus("worker-checkingrobots", serverProcessorJob.STATUS_STARTED); + if ((entry.url().getProtocol().equals("http") || entry.url().getProtocol().equals("https")) && sb.robots.isDisallowed(entry.url())) { + if (log.isFine()) log.logFine("Crawling of URL '" + entry.url().toString() + "' disallowed by robots.txt."); + final ZURL.Entry eentry = errorURL.newEntry( + this.entry, + sb.webIndex.peers().mySeed().hash, + new Date(), + 1, + "denied by robots.txt"); + eentry.store(); + errorURL.push(eentry); + } else { + // starting a load from the internet + this.entry.setStatus("worker-loading", serverProcessorJob.STATUS_RUNNING); + final String result = loader.process(this.entry, plasmaParser.PARSER_MODE_CRAWLER); + if (result != null) { + final ZURL.Entry eentry = errorURL.newEntry( + this.entry, + sb.webIndex.peers().mySeed().hash, + new Date(), + 1, + "cannot load: " + result); + eentry.store(); + errorURL.push(eentry); + } else { + this.entry.setStatus("worker-processed", serverProcessorJob.STATUS_FINISHED); + } + } + } catch (final Exception e) { + final ZURL.Entry eentry = errorURL.newEntry( + this.entry, + sb.webIndex.peers().mySeed().hash, + new Date(), + 1, + e.getMessage() + " - in worker"); + eentry.store(); + errorURL.push(eentry); + e.printStackTrace(); + } finally { + workers.remove(code); + this.entry.setStatus("worker-finalized", serverProcessorJob.STATUS_FINISHED); + } + } + + } + +} diff --git a/source/de/anomic/crawler/CrawlStacker.java b/source/de/anomic/crawler/CrawlStacker.java index 66f356be6..b3b6d6536 100644 --- a/source/de/anomic/crawler/CrawlStacker.java +++ b/source/de/anomic/crawler/CrawlStacker.java @@ -41,17 +41,17 @@ import de.anomic.server.serverProcessor; import de.anomic.yacy.yacyURL; public final class CrawlStacker { - - final Log log = new Log("STACKCRAWL"); - + + private Log log = new Log("STACKCRAWL"); + private serverProcessor fastQueue, slowQueue; private long dnsHit, dnsMiss; private CrawlQueues nextQueue; private plasmaWordIndex wordIndex; private boolean acceptLocalURLs, acceptGlobalURLs; - + // this is the process that checks url for double-occurrences and for allowance/disallowance by robots.txt - + public CrawlStacker(CrawlQueues cq, plasmaWordIndex wordIndex, boolean acceptLocalURLs, boolean acceptGlobalURLs) { this.nextQueue = cq; this.wordIndex = wordIndex; @@ -59,10 +59,10 @@ public final class CrawlStacker { this.dnsMiss = 0; this.acceptLocalURLs = acceptLocalURLs; this.acceptGlobalURLs = acceptGlobalURLs; - + this.fastQueue = new serverProcessor("CrawlStackerFast", "This process checks new urls before they are enqueued into the balancer (proper, double-check, correct domain, filter)", new String[]{"Balancer"}, this, "job", 10000, null, 2); this.slowQueue = new serverProcessor("CrawlStackerSlow", "This is like CrawlStackerFast, but does additionaly a DNS lookup. The CrawlStackerFast does not need this because it can use the DNS cache.", new String[]{"Balancer"}, this, "job", 1000, null, 5); - + this.log.logInfo("STACKCRAWL thread initialized."); } @@ -74,20 +74,20 @@ public final class CrawlStacker { this.fastQueue.clear(); this.slowQueue.clear(); } - + public void announceClose() { this.log.logInfo("Flushing remaining " + size() + " crawl stacker job entries."); this.fastQueue.announceShutdown(); this.slowQueue.announceShutdown(); } - + public void close() { this.log.logInfo("Shutdown. waiting for remaining " + size() + " crawl stacker job entries. please wait."); this.fastQueue.announceShutdown(); this.slowQueue.announceShutdown(); this.fastQueue.awaitShutdown(2000); this.slowQueue.awaitShutdown(2000); - + this.log.logInfo("Shutdown. Closing stackCrawl queue."); clear(); @@ -105,7 +105,7 @@ public final class CrawlStacker { // we just don't know anything about that host return false; } - + /* public boolean job() { if (this.fastQueue.queueSize() > 0 && job(this.fastQueue)) return true; @@ -113,7 +113,7 @@ public final class CrawlStacker { return job(this.slowQueue); } */ - + public CrawlEntry job(CrawlEntry entry) { // this is the method that is called by the busy thread from outside if (entry == null) return null; @@ -133,11 +133,11 @@ public final class CrawlStacker { } return null; } - + public void enqueueEntry(final CrawlEntry entry) { - + // DEBUG - if (log.isFinest()) log.logFinest("ENQUEUE "+ entry.url() +", referer="+entry.referrerhash() +", initiator="+entry.initiator() +", name="+entry.name() +", load="+entry.loaddate() +", depth="+entry.depth()); + if (log.isFinest()) log.logFinest("ENQUEUE " + entry.url() + ", referer=" + entry.referrerhash() + ", initiator=" + entry.initiator() + ", name=" + entry.name() + ", load=" + entry.loaddate() + ", depth=" + entry.depth()); if (prefetchHost(entry.url().getHost())) { try { @@ -149,89 +149,79 @@ public final class CrawlStacker { } else { try { this.slowQueue.enQueue(entry); - this.dnsMiss++; + this.dnsMiss++; } catch (InterruptedException e) { e.printStackTrace(); } } } - + public String stackCrawl(final CrawlEntry entry) { // stacks a crawl item. The position can also be remote // returns null if successful, a reason string if not successful //this.log.logFinest("stackCrawl: nexturlString='" + nexturlString + "'"); - + final long startTime = System.currentTimeMillis(); - String reason = null; // failure reason // check if the protocol is supported final String urlProtocol = entry.url().getProtocol(); if (!nextQueue.isSupportedProtocol(urlProtocol)) { - reason = "unsupported protocol"; - this.log.logSevere("Unsupported protocol in URL '" + entry.url().toString() + "'. " + - "Stack processing time: " + (System.currentTimeMillis()-startTime) + "ms"); - return reason; + this.log.logSevere("Unsupported protocol in URL '" + entry.url().toString() + "'. " + + "Stack processing time: " + (System.currentTimeMillis() - startTime) + "ms"); + return "unsupported protocol"; } // check if ip is local ip address final String urlRejectReason = urlInAcceptedDomain(entry.url()); if (urlRejectReason != null) { - reason = "denied_(" + urlRejectReason + ")"; - if (this.log.isFine()) this.log.logFine(reason + "Stack processing time: " + (System.currentTimeMillis()-startTime) + "ms"); - return reason; + if (this.log.isFine()) this.log.logFine("denied_(" + urlRejectReason + ") Stack processing time: " + (System.currentTimeMillis() - startTime) + "ms"); + return "denied_(" + urlRejectReason + ")"; } - + // check blacklist if (plasmaSwitchboard.urlBlacklist.isListed(Blacklist.BLACKLIST_CRAWLER, entry.url())) { - reason = "url in blacklist"; if (this.log.isFine()) this.log.logFine("URL '" + entry.url().toString() + "' is in blacklist. " + - "Stack processing time: " + (System.currentTimeMillis()-startTime) + "ms"); - return reason; + "Stack processing time: " + (System.currentTimeMillis() - startTime) + "ms"); + return "url in blacklist"; } - + final CrawlProfile.entry profile = wordIndex.profilesActiveCrawls.getEntry(entry.profileHandle()); if (profile == null) { final String errorMsg = "LOST STACKER PROFILE HANDLE '" + entry.profileHandle() + "' for URL " + entry.url(); log.logWarning(errorMsg); return errorMsg; } - + // filter with must-match if ((entry.depth() > 0) && !profile.mustMatchPattern().matcher(entry.url().toString()).matches()) { - reason = "url does not match must-match filter"; if (this.log.isFine()) this.log.logFine("URL '" + entry.url().toString() + "' does not match must-match crawling filter '" + profile.mustMatchPattern().toString() + "'. " + - "Stack processing time: " + (System.currentTimeMillis()-startTime) + "ms"); - return reason; + "Stack processing time: " + (System.currentTimeMillis() - startTime) + "ms"); + return "url does not match must-match filter"; } // filter with must-not-match if ((entry.depth() > 0) && profile.mustNotMatchPattern().matcher(entry.url().toString()).matches()) { - reason = "url matches must-not-match filter"; if (this.log.isFine()) this.log.logFine("URL '" + entry.url().toString() + "' does matches do-not-match crawling filter '" + profile.mustNotMatchPattern().toString() + "'. " + - "Stack processing time: " + (System.currentTimeMillis()-startTime) + "ms"); - return reason; + "Stack processing time: " + (System.currentTimeMillis() - startTime) + "ms"); + return "url matches must-not-match filter"; } // deny cgi if (entry.url().isCGI()) { - reason = "cgi url not allowed"; - - if (this.log.isFine()) this.log.logFine("URL '" + entry.url().toString() + "' is CGI URL. " + - "Stack processing time: " + (System.currentTimeMillis()-startTime) + "ms"); - return reason; + if (this.log.isFine()) this.log.logFine("URL '" + entry.url().toString() + "' is CGI URL. " + + "Stack processing time: " + (System.currentTimeMillis() - startTime) + "ms"); + return "cgi url not allowed"; } - + // deny post properties if (entry.url().isPOST() && !(profile.crawlingQ())) { - reason = "post url not allowed"; - - if (this.log.isFine()) this.log.logFine("URL '" + entry.url().toString() + "' is post URL. " + - "Stack processing time: " + (System.currentTimeMillis()-startTime) + "ms"); - return reason; + if (this.log.isFine()) this.log.logFine("URL '" + entry.url().toString() + "' is post URL. " + + "Stack processing time: " + (System.currentTimeMillis() - startTime) + "ms"); + return "post url not allowed"; } - + final yacyURL referrerURL = (entry.referrerhash() == null) ? null : nextQueue.getURL(entry.referrerhash()); - + // add domain to profile domain list if ((profile.domFilterDepth() != Integer.MAX_VALUE) || (profile.domMaxPages() != Integer.MAX_VALUE)) { profile.domInc(entry.url().getHost(), (referrerURL == null) ? null : referrerURL.getHost().toLowerCase(), entry.depth()); @@ -239,18 +229,16 @@ public final class CrawlStacker { // deny urls that do not match with the profile domain list if (!(profile.grantedDomAppearance(entry.url().getHost()))) { - reason = "url does not match domain filter"; - if (this.log.isFine()) this.log.logFine("URL '" + entry.url().toString() + "' is not listed in granted domains. " + - "Stack processing time: " + (System.currentTimeMillis()-startTime) + "ms"); - return reason; + if (this.log.isFine()) this.log.logFine("URL '" + entry.url().toString() + "' is not listed in granted domains. " + + "Stack processing time: " + (System.currentTimeMillis() - startTime) + "ms"); + return "url does not match domain filter"; } // deny urls that exceed allowed number of occurrences if (!(profile.grantedDomCount(entry.url().getHost()))) { - reason = "domain counter exceeded"; - if (this.log.isFine()) this.log.logFine("URL '" + entry.url().toString() + "' appeared too often, a maximum of " + profile.domMaxPages() + " is allowed. "+ - "Stack processing time: " + (System.currentTimeMillis()-startTime) + "ms"); - return reason; + if (this.log.isFine()) this.log.logFine("URL '" + entry.url().toString() + "' appeared too often, a maximum of " + profile.domMaxPages() + " is allowed. " + + "Stack processing time: " + (System.currentTimeMillis() - startTime) + "ms"); + return "domain counter exceeded"; } // check if the url is double registered @@ -260,36 +248,34 @@ public final class CrawlStacker { final boolean recrawl = (oldEntry != null) && (profile.recrawlIfOlder() > oldEntry.loaddate().getTime()); // do double-check if ((dbocc != null) && (!recrawl)) { - reason = "double " + dbocc; - if (this.log.isFine()) this.log.logFine("URL '" + entry.url().toString() + "' is double registered in '" + dbocc + "'. " + "Stack processing time: " + (System.currentTimeMillis()-startTime) + "ms"); - return reason; + if (this.log.isFine()) this.log.logFine("URL '" + entry.url().toString() + "' is double registered in '" + dbocc + "'. " + "Stack processing time: " + (System.currentTimeMillis() - startTime) + "ms"); + return "double " + dbocc; } if ((oldEntry != null) && (!recrawl)) { - reason = "double LURL"; - if (this.log.isFine()) this.log.logFine("URL '" + entry.url().toString() + "' is double registered in 'LURL'. " + "Stack processing time: " + (System.currentTimeMillis()-startTime) + "ms"); - return reason; + if (this.log.isFine()) this.log.logFine("URL '" + entry.url().toString() + "' is double registered in 'LURL'. " + "Stack processing time: " + (System.currentTimeMillis() - startTime) + "ms"); + return "double LURL"; } - + // show potential re-crawl if (recrawl && oldEntry != null) { if (this.log.isFine()) this.log.logFine("RE-CRAWL of URL '" + entry.url().toString() + "': this url was crawled " + ((System.currentTimeMillis() - oldEntry.loaddate().getTime()) / 60000 / 60 / 24) + " days ago."); } } - + // store information final boolean local = entry.initiator().equals(wordIndex.peers().mySeed().hash); final boolean proxy = (entry.initiator() == null || entry.initiator().equals("------------")) && profile.handle().equals(wordIndex.defaultProxyProfile.handle()); final boolean remote = profile.handle().equals(wordIndex.defaultRemoteProfile.handle()); - final boolean global = + final boolean global = (profile.remoteIndexing()) /* granted */ && - (entry.depth() == profile.depth()) /* leaf node */ && + (entry.depth() == profile.depth()) /* leaf node */ && //(initiatorHash.equals(yacyCore.seedDB.mySeed.hash)) /* not proxy */ && ( (wordIndex.peers().mySeed().isSenior()) || (wordIndex.peers().mySeed().isPrincipal()) ) /* qualified */; - + if (!local && !global && !remote && !proxy) { this.log.logSevere("URL '" + entry.url().toString() + "' cannot be crawled. initiator = " + entry.initiator() + ", profile.handle = " + profile.handle()); } else { @@ -309,10 +295,9 @@ public final class CrawlStacker { nextQueue.noticeURL.push(NoticedURL.STACK_TYPE_REMOTE, entry); } } - + return null; } - /** * Test a url if it can be used for crawling/indexing @@ -344,11 +329,11 @@ public final class CrawlStacker { ("the host '" + host + "' is local, but local addresses are not accepted") : ("the host '" + host + "' is global, but global addresses are not accepted"); } - + public boolean acceptLocalURLs() { return this.acceptLocalURLs; } - + public boolean acceptGlobalURLs() { return this.acceptGlobalURLs; }