Added control over Robots.txt active threads maximum number.

When starting a crawl from a file containing thousands of links,
configuration setting "crawler.MaxActiveThreads" is effective to prevent
saturating the system with too many outgoing HTTP connections threads
launched by the crawler.
But robots.txt was not affected by this setting and was indefinitely
increasing the number of concurrently loading threads until most ot the
connections timed out.

To improve performance control, added a pool of threads for Robots.txt,
consistently used in its ensureExist() and massCrawlCheck() methods.
The Robots.txt threads pool max size can now be configured in the
/PerformanceQueus_p.html page, or with the new
"robots.txt.MaxActiveThreads" setting, initialized with the same default
value as the crawler.
pull/93/head
luccioman 8 years ago
parent 3092a8ced5
commit aa9ddf3c23

@ -1087,6 +1087,9 @@ thumbnailProgram =
# - wiki : the wiki-page
httpd.robots.txt = locked,dirs,bookmarks,network,news,status,profile
# maximum number of robots.txt loading threads
robots.txt.MaxActiveThreads = 200
# class to use for parsing wikicode
wikiParser.class = de.anomic.data.wikiCode

@ -71,8 +71,7 @@ public class CrawlCheck_p {
// mass check
final ClientIdentification.Agent agent = ClientIdentification.getAgent(post.get("agentName", ClientIdentification.yacyInternetCrawlerAgentName));
final int concurrency = Math.min(rootURLs.size(), 20);
Collection<CheckEntry> out = sb.robots.massCrawlCheck(rootURLs, agent, concurrency);
Collection<CheckEntry> out = sb.robots.massCrawlCheck(rootURLs, agent);
// evaluate the result from the concurrent computation
// make a string that is used to fill the starturls field again

@ -29,7 +29,6 @@ import java.util.Iterator;
import java.util.Map;
import net.yacy.cora.protocol.ConnectionInfo;
import net.yacy.cora.protocol.HeaderFramework;
import net.yacy.cora.protocol.RequestHeader;
import net.yacy.kelondro.data.word.WordReference;
import net.yacy.kelondro.rwi.IndexCell;
@ -235,7 +234,15 @@ public class PerformanceQueues_p {
// storing the new values into configfile
sb.setConfig(SwitchboardConstants.CRAWLER_THREADS_ACTIVE_MAX,maxBusy);
//switchboard.setConfig("crawler.MinIdleThreads",minIdle);
/*
* configuring the robots.txt loading pool
*/
// get the current crawler pool configuration
maxBusy = post.getInt("Robots.txt Pool_maxActive", SwitchboardConstants.ROBOTS_TXT_THREADS_ACTIVE_MAX_DEFAULT);
// storing the new values into configfile
sb.setConfig(SwitchboardConstants.ROBOTS_TXT_THREADS_ACTIVE_MAX, maxBusy);
/*
* configuring the http pool
@ -278,12 +285,16 @@ public class PerformanceQueues_p {
prop.put("pool_0_name","Crawler Pool");
prop.put("pool_0_maxActive", sb.getConfigLong(SwitchboardConstants.CRAWLER_THREADS_ACTIVE_MAX, 0));
prop.put("pool_0_numActive", sb.crawlQueues.activeWorkerEntries().size());
prop.put("pool_1_name","Robots.txt Pool");
prop.put("pool_1_maxActive", sb.getConfigInt(SwitchboardConstants.ROBOTS_TXT_THREADS_ACTIVE_MAX, SwitchboardConstants.ROBOTS_TXT_THREADS_ACTIVE_MAX_DEFAULT));
prop.put("pool_1_numActive", sb.crawlQueues.activeWorkerEntries().size());
prop.put("pool_1_name", "httpd Session Pool");
prop.put("pool_1_maxActive", ConnectionInfo.getServerMaxcount());
prop.put("pool_1_numActive", ConnectionInfo.getServerCount());
prop.put("pool_2_name", "httpd Session Pool");
prop.put("pool_2_maxActive", ConnectionInfo.getServerMaxcount());
prop.put("pool_2_numActive", ConnectionInfo.getServerCount());
prop.put("pool", "2");
prop.put("pool", "3");
// parse initialization memory settings
final String Xmx = sb.getConfig("javastart_Xmx", "Xmx600m").substring(3);

@ -31,11 +31,16 @@ import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import net.yacy.cora.document.id.DigestURL;
@ -49,8 +54,9 @@ import net.yacy.crawler.retrieval.Request;
import net.yacy.crawler.retrieval.Response;
import net.yacy.data.WorkTables;
import net.yacy.kelondro.blob.BEncodedHeap;
import net.yacy.repository.LoaderDispatcher;
import net.yacy.kelondro.util.NamePrefixThreadFactory;
import net.yacy.repository.Blacklist.BlacklistType;
import net.yacy.repository.LoaderDispatcher;
public class RobotsTxt {
@ -64,12 +70,24 @@ public class RobotsTxt {
//private static final HashSet<String> loadedRobots = new HashSet<String>(); // only for debugging
private final WorkTables tables;
private final LoaderDispatcher loader;
/** Thread pool used to launch concurrent tasks */
private ThreadPoolExecutor threadPool;
private static class DomSync {
private DomSync() {}
}
public RobotsTxt(final WorkTables worktables, LoaderDispatcher loader) {
/**
*
* @param worktables
* @param loader
* @param maxConcurrentTheads maximum active threads this instance is allowed to run for its concurrent tasks
*/
public RobotsTxt(final WorkTables worktables, LoaderDispatcher loader, final int maxActiveTheads) {
this.threadPool = new ThreadPoolExecutor(maxActiveTheads, maxActiveTheads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
new NamePrefixThreadFactory(RobotsTxt.class.getSimpleName()));
this.syncObjects = new ConcurrentHashMap<String, DomSync>();
this.tables = worktables;
this.loader = loader;
@ -89,6 +107,13 @@ public class RobotsTxt {
this.tables.getHeap(WorkTables.TABLE_ROBOTS_NAME).clear();
this.syncObjects.clear();
}
public void close() {
/* Shutdown all active robots.txt loading threads */
if(this.threadPool != null) {
this.threadPool.shutdownNow();
}
}
public int size() throws IOException {
return this.tables.getHeap(WorkTables.TABLE_ROBOTS_NAME).size();
@ -246,7 +271,19 @@ public class RobotsTxt {
}
}
};
if (concurrent) t.start(); else t.run();
if (concurrent) {
this.threadPool.execute(t);
} else {
t.run();
}
}
/**
* @return the approximate number of threads that are actively
* executing robots.txt loading tasks
*/
public int getActiveThreads() {
return this.threadPool != null ? this.threadPool.getActiveCount() : 0;
}
private void processOldEntry(RobotsTxtEntry robotsTxt4Host, DigestURL robotsURL, BEncodedHeap robotsTable) {
@ -374,45 +411,59 @@ public class RobotsTxt {
}
}
public Collection<CheckEntry> massCrawlCheck(final Collection<DigestURL> rootURLs, final ClientIdentification.Agent userAgent, final int concurrency) {
// put the rootURLs into a blocking queue as input for concurrent computation
final BlockingQueue<DigestURL> in = new LinkedBlockingQueue<DigestURL>();
try {
for (DigestURL u: rootURLs) in.put(u);
for (int i = 0; i < concurrency; i++) in.put(DigestURL.POISON);
} catch (InterruptedException e) {}
final BlockingQueue<CheckEntry> out = new LinkedBlockingQueue<CheckEntry>();
final Thread[] threads = new Thread[concurrency];
for (int i = 0; i < concurrency; i++) {
threads[i] = new Thread("RobotsTxt.massCrawlCheck-" + i) {
@Override
public void run() {
DigestURL u;
try {
while ((u = in.take()) != DigestURL.POISON) {
// try to load the robots
RobotsTxtEntry robotsEntry = getEntry(u, userAgent);
boolean robotsAllowed = robotsEntry == null ? true : !robotsEntry.isDisallowed(u);
if (robotsAllowed) {
try {
Request request = loader.request(u, true, false);
Response response = loader.load(request, CacheStrategy.NOCACHE,
BlacklistType.CRAWLER, userAgent);
out.put(new CheckEntry(u, robotsEntry, response, null));
} catch (final IOException e) {
out.put(new CheckEntry(u, robotsEntry, null, "error response: " + e.getMessage()));
}
} else {
out.put(new CheckEntry(u, robotsEntry, null, null));
}
}
} catch (InterruptedException e) {}
}
};
threads[i].start();
/**
* A unit task to load a robots.txt entry
*/
private class CrawlCheckTask implements Callable<CheckEntry> {
private final DigestURL url;
private final ClientIdentification.Agent userAgent;
public CrawlCheckTask(final DigestURL url, final ClientIdentification.Agent userAgent) {
this.url = url;
this.userAgent = userAgent;
}
@Override
public CheckEntry call() throws Exception {
// try to load the robots
RobotsTxtEntry robotsEntry = getEntry(this.url, this.userAgent);
boolean robotsAllowed = robotsEntry == null ? true : !robotsEntry.isDisallowed(this.url);
if (robotsAllowed) {
try {
Request request = loader.request(this.url, true, false);
Response response = loader.load(request, CacheStrategy.NOCACHE,
BlacklistType.CRAWLER, userAgent);
return new CheckEntry(this.url, robotsEntry, response, null);
} catch (final IOException e) {
return new CheckEntry(this.url, robotsEntry, null, "error response: " + e.getMessage());
}
}
return new CheckEntry(this.url, robotsEntry, null, null);
}
}
public Collection<CheckEntry> massCrawlCheck(final Collection<DigestURL> rootURLs, final ClientIdentification.Agent userAgent) {
final List<Future<CheckEntry>> futures = new ArrayList<>();
for (DigestURL u: rootURLs) {
futures.add(this.threadPool.submit(new CrawlCheckTask(u, userAgent)));
}
final Collection<CheckEntry> results = new ArrayList<>();
/* Now collect the results concurrently loaded */
for(Future<CheckEntry> future: futures) {
try {
results.add(future.get());
} catch (InterruptedException e) {
log.warn("massCrawlCheck was interrupted before retrieving all results.");
break;
} catch (ExecutionException e) {
/* A robots.txt loading failed : let's continue and try to get the next result
* (most of time this should not happen, as Exceptions are caught inside the concurrent task) */
continue;
}
}
// wait for termiation
try {for (Thread t: threads) t.join();} catch (InterruptedException e1) {}
return out;
return results;
}
}

@ -870,7 +870,8 @@ public final class Switchboard extends serverSwitch {
// load the robots.txt db
this.log.config("Initializing robots.txt DB");
this.robots = new RobotsTxt(this.tables, this.loader);
this.robots = new RobotsTxt(this.tables, this.loader,
this.getConfigInt(SwitchboardConstants.ROBOTS_TXT_THREADS_ACTIVE_MAX, SwitchboardConstants.ROBOTS_TXT_THREADS_ACTIVE_MAX_DEFAULT));
try {
this.log.config("Loaded robots.txt DB: " + this.robots.size() + " entries");
} catch (final IOException e) {
@ -1868,6 +1869,7 @@ public final class Switchboard extends serverSwitch {
this.crawlStacker.announceClose();
this.crawlStacker.close();
this.crawlQueues.close();
this.robots.close();
this.indexingDocumentProcessor.shutdown();
this.indexingCondensementProcessor.shutdown();
this.indexingAnalysisProcessor.shutdown();

@ -342,6 +342,10 @@ public final class SwitchboardConstants {
public static final String HTTPC_NAME_CACHE_CACHING_PATTERNS_NO = "httpc.nameCacheNoCachingPatterns";
public static final String ROBOTS_TXT = "httpd.robots.txt";
public static final String ROBOTS_TXT_DEFAULT = RobotsTxtConfig.LOCKED + "," + RobotsTxtConfig.DIRS;
/** Key of the setting configuring how many active robots.txt loading threads may be running on the same time at max */
public static final String ROBOTS_TXT_THREADS_ACTIVE_MAX = "robots.txt.MaxActiveThreads";
/** Default value of the setting configuring how many active robots.txt loading threads may be running on the same time at max */
public static final int ROBOTS_TXT_THREADS_ACTIVE_MAX_DEFAULT = 200;
/**
* <p><code>public static final String <strong>BLACKLIST_CLASS_DEFAULT</strong> = "de.anomic.plasma.urlPattern.defaultURLPattern"</code></p>

@ -42,7 +42,7 @@ public class HostBalancerTest {
assertEquals("After clear", 0, hb.size());
WorkTables wt = new WorkTables(datadir);
RobotsTxt rob = new RobotsTxt(wt, null);
RobotsTxt rob = new RobotsTxt(wt, null, 10);
String res = hb.push(req, null, rob); // push url
assertNull(res); // should have no error text

Loading…
Cancel
Save