refactoring: removed dependency from switchboard in Balancer/CrawlQueues

pull/1/head
Michael Peter Christen 13 years ago
parent 33d1062c79
commit a5d7da68a0
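This commit removes the crawler classes' dependency on the Switchboard singleton: instead of fetching the robots.txt database via Switchboard.getSwitchboard().robots, Balancer, Latency, and NoticedURL now receive a RobotsTxt handle as an explicit method parameter, threaded down from CrawlQueues (sb.robots). A minimal sketch of the pattern, with simplified stand-in types (RobotsDb, GlobalState, SleepCalculator are hypothetical, not YaCy classes):

    // Before: hidden dependency on a process-wide singleton.
    final class GlobalState {
        static final GlobalState INSTANCE = new GlobalState();
        final RobotsDb robots = new RobotsDb();
    }

    final class RobotsDb {
        long crawlDelayMillis(String host) { return 500; } // stand-in lookup
    }

    final class SleepCalculator {
        // before: reaches into the singleton, so tests must boot the whole application
        long sleepTimeBefore(String host) {
            return GlobalState.INSTANCE.robots.crawlDelayMillis(host);
        }
        // after: the caller supplies the collaborator explicitly
        long sleepTimeAfter(String host, RobotsDb robots) {
            return robots.crawlDelayMillis(host);
        }
    }

    public class RefactorSketch {
        public static void main(String[] args) {
            RobotsDb robots = new RobotsDb(); // owned by the caller (CrawlQueues in YaCy)
            System.out.println(new SleepCalculator().sleepTimeAfter("example.org", robots));
        }
    }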

@@ -148,7 +148,7 @@ public class IndexCreateQueues_p {
                     prop.put("crawler_host_" + hc + "_list_" + count + "_depth", request.depth());
                     prop.put("crawler_host_" + hc + "_list_" + count + "_modified", daydate(request.appdate()) );
                     prop.putHTML("crawler_host_" + hc + "_list_" + count + "_anchor", request.name());
-                    prop.put("crawler_host_" + hc + "_list_" + count + "_delta", sb.crawlQueues.noticeURL.getDomainSleepTime(stackType, sb.crawler, request));
+                    prop.put("crawler_host_" + hc + "_list_" + count + "_delta", sb.crawlQueues.noticeURL.getDomainSleepTime(stackType, sb.robots, sb.crawler, request));
                     prop.putHTML("crawler_host_" + hc + "_list_" + count + "_url", request.url().toNormalform(false, true));
                     prop.put("crawler_host_" + hc + "_list_" + count + "_hash", request.url().hash());
                     count++;

@@ -71,7 +71,7 @@ public class urls {
                     (System.currentTimeMillis() < timeout) &&
                     (sb.crawlQueues.noticeURL.stackSize(stackType) > 0)) {
                 try {
-                    entry = sb.crawlQueues.noticeURL.pop(stackType, false, sb.crawler);
+                    entry = sb.crawlQueues.noticeURL.pop(stackType, false, sb.crawler, sb.robots);
                 } catch (final IOException e) {
                     break;
                 }
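The urls servlet hunk above pops entries inside a deadline-bounded loop: it keeps popping while the wall clock is below a timeout and the stack is non-empty, and a pop failure breaks out. A self-contained sketch of that deadline pattern, using a plain queue in place of the YaCy notice-URL stack:

    import java.util.ArrayDeque;
    import java.util.Queue;

    public class DeadlinePopSketch {
        // drain entries until either the queue empties or the deadline passes
        static int drainUntil(Queue<String> stack, long timeoutMillis) {
            final long timeout = System.currentTimeMillis() + timeoutMillis;
            int popped = 0;
            while (System.currentTimeMillis() < timeout && !stack.isEmpty()) {
                String entry = stack.poll();
                if (entry == null) break; // mirrors the catch-and-break on IOException
                popped++;
            }
            return popped;
        }

        public static void main(String[] args) {
            Queue<String> stack = new ArrayDeque<>();
            stack.add("http://example.org/a");
            stack.add("http://example.org/b");
            System.out.println("popped " + drainUntil(stack, 100));
        }
    }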

@@ -280,19 +280,19 @@ public class Balancer {
      * @param crawlEntry
      * @return
      */
-    public long getDomainSleepTime(final CrawlSwitchboard cs, Request crawlEntry) {
+    public long getDomainSleepTime(final CrawlSwitchboard cs, final RobotsTxt robots, Request crawlEntry) {
         final CrawlProfile profileEntry = cs.getActive(UTF8.getBytes(crawlEntry.profileHandle()));
-        return getDomainSleepTime(cs, profileEntry, crawlEntry.url());
+        return getDomainSleepTime(cs, robots, profileEntry, crawlEntry.url());
     }

-    private long getDomainSleepTime(final CrawlSwitchboard cs, final CrawlProfile profileEntry, final DigestURI crawlURL) {
+    private long getDomainSleepTime(final CrawlSwitchboard cs, final RobotsTxt robots, final CrawlProfile profileEntry, final DigestURI crawlURL) {
         if (profileEntry == null) {
             return 0;
         }
         long sleeptime = (
             profileEntry.cacheStrategy() == CacheStrategy.CACHEONLY ||
             (profileEntry.cacheStrategy() == CacheStrategy.IFEXIST && Cache.has(crawlURL.hash()))
-            ) ? 0 : Latency.waitingRemaining(crawlURL, this.myAgentIDs, this.minimumLocalDelta, this.minimumGlobalDelta); // this uses the robots.txt database and may cause a loading of robots.txt from the server
+            ) ? 0 : Latency.waitingRemaining(crawlURL, robots, this.myAgentIDs, this.minimumLocalDelta, this.minimumGlobalDelta); // this uses the robots.txt database and may cause a loading of robots.txt from the server
         return sleeptime;
     }

@@ -367,7 +367,7 @@ public class Balancer {
      * @throws IOException
      * @throws RowSpaceExceededException
      */
-    public Request pop(final boolean delay, final CrawlSwitchboard cs) throws IOException {
+    public Request pop(final boolean delay, final CrawlSwitchboard cs, final RobotsTxt robots) throws IOException {
         // returns a crawl entry from the stack and ensures minimum delta times
         long sleeptime = 0;

@@ -409,7 +409,7 @@
                     return null;
                 }
                 // depending on the caching policy we need sleep time to avoid DoS-like situations
-                sleeptime = getDomainSleepTime(cs, profileEntry, crawlEntry.url());
+                sleeptime = getDomainSleepTime(cs, robots, profileEntry, crawlEntry.url());
                 assert Base64Order.enhancedCoder.equal(nexthash, rowEntry.getPrimaryKeyBytes()) : "result = " + ASCII.String(nexthash) + ", rowEntry.getPrimaryKeyBytes() = " + ASCII.String(rowEntry.getPrimaryKeyBytes());
                 assert Base64Order.enhancedCoder.equal(nexthash, crawlEntry.url().hash()) : "result = " + ASCII.String(nexthash) + ", crawlEntry.url().hash() = " + ASCII.String(crawlEntry.url().hash());

@@ -425,7 +425,7 @@
                 // in best case, this should never happen if the balancer works propertly
                 // this is only to protection against the worst case, where the crawler could
                 // behave in a DoS-manner
-                Log.logInfo("BALANCER", "forcing crawl-delay of " + sleeptime + " milliseconds for " + crawlEntry.url().getHost() + ": " + Latency.waitingRemainingExplain(crawlEntry.url(), this.myAgentIDs, this.minimumLocalDelta, this.minimumGlobalDelta) + ", domainStacks.size() = " + this.domainStacks.size() + ", domainStacksInitSize = " + this.domStackInitSize);
+                Log.logInfo("BALANCER", "forcing crawl-delay of " + sleeptime + " milliseconds for " + crawlEntry.url().getHost() + ": " + Latency.waitingRemainingExplain(crawlEntry.url(), robots, this.myAgentIDs, this.minimumLocalDelta, this.minimumGlobalDelta) + ", domainStacks.size() = " + this.domainStacks.size() + ", domainStacksInitSize = " + this.domStackInitSize);
                 long loops = sleeptime / 1000;
                 long rest = sleeptime % 1000;
                 if (loops < 3) {
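With RobotsTxt passed in, the sleep-time logic can be exercised without a running Switchboard: a cache-only fetch needs no politeness delay, otherwise the robots/latency database decides. A hedged sketch of that decision and how an injected stub makes it testable (RobotsLookup and the constants are illustrative, not YaCy APIs):

    public class SleepTimeSketch {
        interface RobotsLookup {
            long waitingRemainingMillis(String host);
        }

        // mirrors the shape of getDomainSleepTime above: cache-only fetches get
        // zero delay; everything else asks the robots/latency lookup
        static long sleepTime(boolean servedFromCache, String host, RobotsLookup robots) {
            return servedFromCache ? 0 : robots.waitingRemainingMillis(host);
        }

        public static void main(String[] args) {
            RobotsLookup stub = host -> 1000; // no network, no singleton required
            assert sleepTime(true, "example.org", stub) == 0;
            System.out.println("delay = " + sleepTime(false, "example.org", stub) + " ms");
        }
    }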

@@ -226,7 +226,7 @@ public class CrawlQueues {
             // move some tasks to the core crawl job so we have something to do
             final int toshift = Math.min(10, limitCrawlJobSize()); // this cannot be a big number because the balancer makes a forced waiting if it cannot balance
             for (int i = 0; i < toshift; i++) {
-                this.noticeURL.shift(NoticedURL.StackType.GLOBAL, NoticedURL.StackType.LOCAL, this.sb.crawler);
+                this.noticeURL.shift(NoticedURL.StackType.GLOBAL, NoticedURL.StackType.LOCAL, this.sb.crawler, this.sb.robots);
             }
             this.log.logInfo("shifted " + toshift + " jobs from global crawl to local crawl (coreCrawlJobSize()=" + coreCrawlJobSize() +
                     ", limitCrawlJobSize()=" + limitCrawlJobSize() + ", cluster.mode=" + this.sb.getConfig(SwitchboardConstants.CLUSTER_MODE, "") +

@@ -261,7 +261,7 @@
             try {
                 if (this.noticeURL.stackSize(NoticedURL.StackType.NOLOAD) > 0) {
                     // get one entry that will not be loaded, just indexed
-                    urlEntry = this.noticeURL.pop(NoticedURL.StackType.NOLOAD, true, this.sb.crawler);
+                    urlEntry = this.noticeURL.pop(NoticedURL.StackType.NOLOAD, true, this.sb.crawler, this.sb.robots);
                     if (urlEntry == null) {
                         continue;
                     }

@@ -284,7 +284,7 @@
                     return true;
                 }
-                urlEntry = this.noticeURL.pop(NoticedURL.StackType.LOCAL, true, this.sb.crawler);
+                urlEntry = this.noticeURL.pop(NoticedURL.StackType.LOCAL, true, this.sb.crawler, this.sb.robots);
                 if (urlEntry == null) {
                     continue;
                 }

@@ -582,7 +582,7 @@
         final String stats = "REMOTETRIGGEREDCRAWL[" + this.noticeURL.stackSize(NoticedURL.StackType.LOCAL) + ", " + this.noticeURL.stackSize(NoticedURL.StackType.GLOBAL) + ", " + this.noticeURL.stackSize(NoticedURL.StackType.OVERHANG) + ", "
                 + this.noticeURL.stackSize(NoticedURL.StackType.REMOTE) + "]";
         try {
-            final Request urlEntry = this.noticeURL.pop(NoticedURL.StackType.REMOTE, true, this.sb.crawler);
+            final Request urlEntry = this.noticeURL.pop(NoticedURL.StackType.REMOTE, true, this.sb.crawler, this.sb.robots);
             final String profileHandle = urlEntry.profileHandle();
             // System.out.println("DEBUG plasmaSwitchboard.processCrawling:
             // profileHandle = " + profileHandle + ", urlEntry.url = " +
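The first CrawlQueues hunk shifts at most ten entries per call from the global to the local stack, since an oversized batch would trip the balancer's forced waiting. A small sketch of that bounded-batch transfer, with plain queues standing in for the YaCy stacks:

    import java.util.ArrayDeque;
    import java.util.Queue;

    public class BoundedShiftSketch {
        // move at most maxBatch entries from one queue to another, as in
        // CrawlQueues' Math.min(10, limitCrawlJobSize()) cap
        static <T> int shift(Queue<T> from, Queue<T> to, int maxBatch) {
            int moved = 0;
            while (moved < maxBatch && !from.isEmpty()) {
                to.add(from.poll());
                moved++;
            }
            return moved;
        }

        public static void main(String[] args) {
            Queue<String> global = new ArrayDeque<>();
            for (int i = 0; i < 25; i++) global.add("url-" + i);
            Queue<String> local = new ArrayDeque<>();
            System.out.println("shifted " + shift(global, local, 10)); // prints 10
        }
    }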

@@ -32,7 +32,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import net.yacy.cora.document.MultiProtocolURI;
 import net.yacy.cora.protocol.Domains;
 import net.yacy.kelondro.util.MemoryControl;
-import net.yacy.search.Switchboard;

 public class Latency {

@@ -161,7 +160,7 @@ public class Latency {
      * @param minimumGlobalDelta
      * @return the remaining waiting time in milliseconds
      */
-    public static long waitingRemaining(final MultiProtocolURI url, final Set<String> thisAgents, final long minimumLocalDelta, final long minimumGlobalDelta) {
+    public static long waitingRemaining(final MultiProtocolURI url, final RobotsTxt robots, final Set<String> thisAgents, final long minimumLocalDelta, final long minimumGlobalDelta) {

         // first check if the domain was _ever_ accessed before
         final Host host = host(url);

@@ -188,7 +187,7 @@
         if (!local) {
             RobotsTxtEntry robotsEntry;
             try {
-                robotsEntry = Switchboard.getSwitchboard().robots.getEntry(url, thisAgents);
+                robotsEntry = robots.getEntry(url, thisAgents);
             } catch (final IOException e) {
                 robotsEntry = null;
             }

@@ -211,7 +210,7 @@
     }

-    public static String waitingRemainingExplain(final MultiProtocolURI url, final Set<String> thisAgents, final long minimumLocalDelta, final long minimumGlobalDelta) {
+    public static String waitingRemainingExplain(final MultiProtocolURI url, final RobotsTxt robots, final Set<String> thisAgents, final long minimumLocalDelta, final long minimumGlobalDelta) {

         // first check if the domain was _ever_ accessed before
         final Host host = host(url);

@@ -241,7 +240,7 @@
         if (!local) {
             RobotsTxtEntry robotsEntry;
             try {
-                robotsEntry = Switchboard.getSwitchboard().robots.getEntry(url, thisAgents);
+                robotsEntry = robots.getEntry(url, thisAgents);
             } catch (final IOException e) {
                 robotsEntry = null;
             }
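With the RobotsTxt handle now arriving as an argument, Latency drops its import of net.yacy.search.Switchboard entirely, and a failed robots.txt lookup degrades to "no entry" rather than aborting. A sketch of that fallback shape (RobotsStore and Entry are simplified stand-ins for the YaCy RobotsTxt/RobotsTxtEntry types, and the delays are illustrative):

    import java.io.IOException;

    public class RobotsFallbackSketch {
        interface RobotsStore {
            Entry getEntry(String url) throws IOException;
        }

        static final class Entry {
            final long crawlDelayMillis;
            Entry(long crawlDelayMillis) { this.crawlDelayMillis = crawlDelayMillis; }
        }

        // mirrors the pattern in waitingRemaining: an IOException during the
        // robots.txt lookup is treated as "no entry", falling back to a default
        static long delayFor(String url, RobotsStore robots, long defaultDelay) {
            Entry entry;
            try {
                entry = robots.getEntry(url);
            } catch (final IOException e) {
                entry = null;
            }
            return entry == null ? defaultDelay : entry.crawlDelayMillis;
        }

        public static void main(String[] args) {
            RobotsStore failing = url -> { throw new IOException("fetch failed"); };
            System.out.println(delayFor("http://example.org/", failing, 250)); // 250
        }
    }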

@@ -247,12 +247,12 @@ public class NoticedURL {
      * get a list of domains that are currently maintained as domain stacks
      * @return a collection of clear text strings of host names
      */
-    public long getDomainSleepTime(final StackType stackType, final CrawlSwitchboard cs, Request crawlEntry) {
+    public long getDomainSleepTime(final StackType stackType, final RobotsTxt robots, final CrawlSwitchboard cs, Request crawlEntry) {
         switch (stackType) {
-            case LOCAL: return this.coreStack.getDomainSleepTime(cs, crawlEntry);
-            case GLOBAL: return this.limitStack.getDomainSleepTime(cs, crawlEntry);
-            case REMOTE: return this.remoteStack.getDomainSleepTime(cs, crawlEntry);
-            case NOLOAD: return this.noloadStack.getDomainSleepTime(cs, crawlEntry);
+            case LOCAL: return this.coreStack.getDomainSleepTime(cs, robots, crawlEntry);
+            case GLOBAL: return this.limitStack.getDomainSleepTime(cs, robots, crawlEntry);
+            case REMOTE: return this.remoteStack.getDomainSleepTime(cs, robots, crawlEntry);
+            case NOLOAD: return this.noloadStack.getDomainSleepTime(cs, robots, crawlEntry);
             default: return 0;
         }
     }

@@ -273,19 +273,19 @@
         }
     }

-    public Request pop(final StackType stackType, final boolean delay, final CrawlSwitchboard cs) throws IOException {
+    public Request pop(final StackType stackType, final boolean delay, final CrawlSwitchboard cs, final RobotsTxt robots) throws IOException {
         switch (stackType) {
-            case LOCAL: return pop(this.coreStack, delay, cs);
-            case GLOBAL: return pop(this.limitStack, delay, cs);
-            case REMOTE: return pop(this.remoteStack, delay, cs);
-            case NOLOAD: return pop(this.noloadStack, false, cs);
+            case LOCAL: return pop(this.coreStack, delay, cs, robots);
+            case GLOBAL: return pop(this.limitStack, delay, cs, robots);
+            case REMOTE: return pop(this.remoteStack, delay, cs, robots);
+            case NOLOAD: return pop(this.noloadStack, false, cs, robots);
             default: return null;
         }
     }

-    public void shift(final StackType fromStack, final StackType toStack, final CrawlSwitchboard cs) {
+    public void shift(final StackType fromStack, final StackType toStack, final CrawlSwitchboard cs, final RobotsTxt robots) {
         try {
-            final Request entry = pop(fromStack, false, cs);
+            final Request entry = pop(fromStack, false, cs, robots);
             if (entry != null) {
                 final String warning = push(toStack, entry);
                 if (warning != null) {

@@ -308,14 +308,14 @@
         }
     }

-    private Request pop(final Balancer balancer, final boolean delay, final CrawlSwitchboard cs) throws IOException {
+    private Request pop(final Balancer balancer, final boolean delay, final CrawlSwitchboard cs, final RobotsTxt robots) throws IOException {
         // this is a filo - pop
         int s;
         Request entry;
         int errors = 0;
         synchronized (balancer) {
             while ((s = balancer.size()) > 0) {
-                entry = balancer.pop(delay, cs);
+                entry = balancer.pop(delay, cs, robots);
                 if (entry == null) {
                     if (s > balancer.size()) continue;
                     errors++;
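The private NoticedURL.pop in the last hunk loops while the balancer still reports entries, retrying a null result only if the stack actually shrank and counting it as an error otherwise. A stand-alone sketch of that guarded retry loop, with a plain queue standing in for the Balancer:

    import java.util.ArrayDeque;
    import java.util.Queue;

    public class GuardedPopSketch {
        // retry popping while the structure is non-empty; a null result is retried
        // only if the size actually shrank, otherwise it counts toward the error cap
        static <T> T pop(Queue<T> balancer, int maxErrors) {
            int errors = 0;
            int s;
            while ((s = balancer.size()) > 0) {
                T entry = balancer.poll();
                if (entry == null) {
                    if (s > balancer.size()) continue; // something was consumed, retry
                    if (++errors >= maxErrors) break;  // give up on repeated failures
                    continue;
                }
                return entry;
            }
            return null;
        }

        public static void main(String[] args) {
            Queue<String> q = new ArrayDeque<>();
            q.add("http://example.org/");
            System.out.println(pop(q, 3)); // http://example.org/
        }
    }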
