package net.yacy.crawler;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.logging.LogManager;

import org.junit.Test;

import net.yacy.cora.document.encoding.ASCII;
import net.yacy.cora.document.id.DigestURL;
import net.yacy.cora.federate.yacy.CacheStrategy;
import net.yacy.cora.order.Base64Order;
import net.yacy.cora.protocol.ClientIdentification;
import net.yacy.cora.protocol.Domains;
import net.yacy.cora.storage.HandleSet;
import net.yacy.cora.util.ConcurrentLog;
import net.yacy.cora.util.SpaceExceededException;
import net.yacy.crawler.data.CrawlProfile;
import net.yacy.crawler.retrieval.Request;
import net.yacy.crawler.robots.RobotsTxt;
import net.yacy.data.WorkTables;
import net.yacy.kelondro.blob.ArrayStack;
import net.yacy.kelondro.data.word.Word;
import net.yacy.kelondro.index.RowHandleSet;
import net.yacy.kelondro.util.FileUtils;
import net.yacy.search.SwitchboardConstants;

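/**
 * Unit test and standalone stress measurement tool for the {@link HostBalancer}
 * crawler queue.
 */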
public class HostBalancerTest {

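    /** Folder holding the persistent crawler queues under test */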
    private static final File QUEUES_ROOT = new File("test/DATA/INDEX/QUEUES");
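    /** Data folder used to initialize the WorkTables and RobotsTxt instances */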
    private static final File DATA_DIR = new File("test/DATA");

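    /** HostBalancer flag: allow the underlying kelondro structures to exceed the 134217727 (2^27 - 1) size limit */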
    private static final boolean EXCEED_134217727 = true;
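    /** HostBalancer onDemandLimit parameter (presumably the number of hosts above which queues are only opened on demand) */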
    private static final int ON_DEMAND_LIMIT = 1000;

    /**
     * Reopens an existing HostBalancer cache to test/demonstrate an issue with
     * the HostQueue for the file: protocol.
     */
    @Test
    public void testReopen() throws IOException, SpaceExceededException, InterruptedException {
        String hostDir = "C:\\filedirectory";

        // prepare one url for the push test
        String urlstr = "file:///" + hostDir;
        DigestURL url = new DigestURL(urlstr);
        Request req = new Request(url, null);

        FileUtils.deletedelete(QUEUES_ROOT); // start with a clean test folder

        HostBalancer hb = new HostBalancer(QUEUES_ROOT, ON_DEMAND_LIMIT, EXCEED_134217727, false);
        hb.clear();

        Thread.sleep(100);
        assertEquals("After clear", 0, hb.size());

        WorkTables wt = new WorkTables(DATA_DIR);
        RobotsTxt rob = new RobotsTxt(wt, null, 10);

        String res = hb.push(req, null, rob); // push url
        assertNull(res); // should have no error text
        assertTrue(hb.has(url.hash())); // check existence
        assertEquals("first push of one url", 1, hb.size()); // expected size=1

        res = hb.push(req, null, rob); // push same url (should be rejected as a double occurrence)
        assertNotNull(res); // should state the double occurrence
        assertTrue(hb.has(url.hash()));
        assertEquals("second push of same url", 1, hb.size());

        hb.close();

        Thread.sleep(200); // wait a bit for file operations

        hb = new HostBalancer(QUEUES_ROOT, ON_DEMAND_LIMIT, EXCEED_134217727, false); // reopen balancer

        assertEquals("size after reopen (with one existing url)", 1, hb.size()); // expect size=1 from the previous push
        assertTrue("check existence of pushed url", hb.has(url.hash())); // check url exists (fails because the internal queue.hosthash is wrong after reopen)

        res = hb.push(req, null, rob); // push same url as before (should be rejected, but isn't, due to the hosthash mismatch after reopen)
        assertNotNull("should state double occurrence", res);
        assertEquals("first push of same url after reopen", 1, hb.size()); // should stay at size=1
        assertTrue("check existence of pushed url", hb.has(url.hash()));

        res = hb.push(req, null, rob);
        assertNotNull("should state double occurrence", res);
        assertTrue("check existence of pushed url", hb.has(url.hash()));
        assertEquals("second push of same url after reopen", 1, hb.size()); // double check, should stay at size=1

        // list all urls in the host balancer
        Iterator<Request> it = hb.iterator();
        while (it.hasNext()) {
            Request rres = it.next();
            System.out.println(rres.toString());
        }
        hb.close();
    }

    /**
     * A test task performing operations to be profiled on the HostBalancer,
     * designed to run concurrently.
     */
    private static class ProfilingTask extends Thread {

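        /** Shared crawl profile used to build the test requests */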
        private static final CrawlProfile CRAWL_PROFILE = new CrawlProfile(
                CrawlSwitchboard.CRAWL_PROFILE_SNIPPET_GLOBAL_TEXT, CrawlProfile.MATCH_ALL_STRING, // crawlerUrlMustMatch
                CrawlProfile.MATCH_NEVER_STRING, // crawlerUrlMustNotMatch
                CrawlProfile.MATCH_ALL_STRING, // crawlerIpMustMatch
                CrawlProfile.MATCH_NEVER_STRING, // crawlerIpMustNotMatch
                CrawlProfile.MATCH_NEVER_STRING, // crawlerCountryMustMatch
                CrawlProfile.MATCH_NEVER_STRING, // crawlerNoDepthLimitMatch
                CrawlProfile.MATCH_ALL_STRING, // indexUrlMustMatch
                CrawlProfile.MATCH_NEVER_STRING, // indexUrlMustNotMatch
                CrawlProfile.MATCH_ALL_STRING, // indexContentMustMatch
                CrawlProfile.MATCH_NEVER_STRING, // indexContentMustNotMatch
                0, false, CrawlProfile.getRecrawlDate(CrawlSwitchboard.CRAWL_PROFILE_SNIPPET_GLOBAL_TEXT_RECRAWL_CYCLE),
                -1, true, true, true, false, // crawlingQ, followFrames, obeyHtmlRobotsNoindex, obeyHtmlRobotsNofollow,
                true, true, true, false, -1, false, true, CrawlProfile.MATCH_NEVER_STRING, CacheStrategy.IFEXIST,
                "robot_" + CrawlSwitchboard.CRAWL_PROFILE_SNIPPET_GLOBAL_TEXT,
                ClientIdentification.yacyIntranetCrawlerAgentName, null, null, 0);

        /** RobotsTxt instance */
        private final RobotsTxt robots;

        /** The HostBalancer instance under test */
        private final HostBalancer balancer;

        /** The test URLs for this task */
        private final List<DigestURL> urls;

        /** Number of steps to run */
        private final int maxSteps;

        /** Number of steps effectively run */
        private int steps;

        /** Sleep time (in milliseconds) between each operation */
        private final long sleepTime;

        /** Number of HostBalancer.push() failures */
        private int pushFailures;

        /** Total time spent (in nanoseconds) on the HostBalancer.push() operation */
        private long pushTime;

        /** Maximum time spent (in nanoseconds) on the HostBalancer.push() operation */
        private long maxPushTime;

        /** Total time spent (in nanoseconds) on the HostBalancer.has() operation */
        private long hasTime;

        /** Maximum time spent (in nanoseconds) on the HostBalancer.has() operation */
        private long maxHasTime;

        /** Total time spent (in nanoseconds) on the HostBalancer.remove() operation */
        private long removeTime;

        /** Maximum time spent (in nanoseconds) on the HostBalancer.remove() operation */
        private long maxRemoveTime;

        /**
         * @param balancer the HostBalancer instance to be tested
         * @param robots the RobotsTxt instance to use
         * @param urls the test URLs
         * @param steps number of loops
         * @param sleepTime sleep time (in milliseconds) between each operation
         */
        public ProfilingTask(final HostBalancer balancer, final RobotsTxt robots, final List<DigestURL> urls,
                final int steps, final long sleepTime) {
            this.balancer = balancer;
            this.robots = robots;
            this.urls = urls;
            this.maxSteps = steps;
            this.sleepTime = sleepTime;
        }

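        /** Pauses this task for the configured sleep time; interruptions are silently ignored. */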
        private void sleep() {
            if (this.sleepTime > 0) {
                try {
                    Thread.sleep(this.sleepTime);
                } catch (InterruptedException ignored) {
                }
            }
        }

        @Override
        public void run() {
            try {
                this.pushTime = 0;
                this.maxPushTime = 0;
                this.hasTime = 0;
                this.maxHasTime = 0;
                this.maxRemoveTime = 0;
                this.removeTime = 0;
                this.steps = 0;
                long time;
                while (this.steps < this.maxSteps) {
                    int processedURLs = 0;
                    /* Run the same steps for each test URL */
                    for (final DigestURL url : this.urls) {
                        if (this.steps >= this.maxSteps) {
                            break;
                        }
                        final byte[] urlHash = url.hash();
                        final Request req = new Request(ASCII.getBytes("testPeer"), url, null, "", new Date(),
                                CRAWL_PROFILE.handle(), 0, CRAWL_PROFILE.timezoneOffset());

                        /* Measure push() */
                        time = System.nanoTime();
                        try {
                            if (this.balancer.push(req, CRAWL_PROFILE, this.robots) != null) {
                                this.pushFailures++;
                            }
                        } catch (final SpaceExceededException e) {
                            this.pushFailures++;
                        }
                        time = (System.nanoTime() - time);

                        this.pushTime += time;
                        this.maxPushTime = Math.max(time, this.maxPushTime);

                        sleep();

                        /* Measure has() */
                        time = System.nanoTime();
                        this.balancer.has(urlHash);
                        time = (System.nanoTime() - time);

                        this.hasTime += time;
                        this.maxHasTime = Math.max(time, this.maxHasTime);

                        sleep();

                        this.steps++;
                        processedURLs++;
                    }

                    /* Now delete each previously inserted URL */
                    for (int i = 0; i < processedURLs; i++) {
                        final DigestURL url = this.urls.get(i);
                        final byte[] urlHash = url.hash();
                        final HandleSet urlHashes = new RowHandleSet(Word.commonHashLength, Base64Order.enhancedCoder, 1);
                        try {
                            urlHashes.put(urlHash);

                            /* Measure remove() */
                            time = System.nanoTime();
                            this.balancer.remove(urlHashes);
                            time = (System.nanoTime() - time);

                            this.removeTime += time;
                            this.maxRemoveTime = Math.max(time, this.maxRemoveTime);
                        } catch (final SpaceExceededException e) {
                            // should not happen
                            e.printStackTrace();
                        }

                        sleep();
                    }
                }
            } catch (final MalformedURLException e) {
                e.printStackTrace();
            } catch (final IOException e) {
                e.printStackTrace();
            }
        }

        public int getSteps() {
            return this.steps;
        }

        public int getPushFailures() {
            return this.pushFailures;
        }

        public long getPushTime() {
            return this.pushTime;
        }

        public long getMaxPushTime() {
            return this.maxPushTime;
        }

        public long getHasTime() {
            return this.hasTime;
        }

        public long getMaxHasTime() {
            return this.maxHasTime;
        }

        public long getRemoveTime() {
            return this.removeTime;
        }

        public long getMaxRemoveTime() {
            return this.maxRemoveTime;
        }

    }

    /**
     * Run a stress test on the HostBalancer
     *
     * @param args main arguments
     * @throws IOException when an error occurs
     */
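    // Hypothetical launch example (the classpath layout below is an assumption, not the project's build layout):
    //   java -cp build/classes:lib/* net.yacy.crawler.HostBalancerTest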
    public static void main(final String args[]) throws IOException {
        System.out.println("Stress test on HostBalancer");

        /*
         * Set the root log level to WARNING to prevent filling the console with
         * too many informational log messages
         */
        LogManager.getLogManager()
                .readConfiguration(new ByteArrayInputStream(".level=WARNING".getBytes(StandardCharsets.ISO_8859_1)));

        /* Main control parameters. Modify these values for different scenarios. */

        /* Number of concurrent test tasks */
        final int threads = 50;
        /* Number of steps in each task */
        final int steps = 100;
        /* Number of test URLs in each task */
        final int urlsPerThread = 5;
        /* Sleep time between each measured operation on the balancer */
        final long sleepTime = 0;

        final RobotsTxt robots = new RobotsTxt(new WorkTables(DATA_DIR), null,
                SwitchboardConstants.ROBOTS_TXT_THREADS_ACTIVE_MAX_DEFAULT);

        FileUtils.deletedelete(QUEUES_ROOT);

        final HostBalancer hb = new HostBalancer(QUEUES_ROOT, ON_DEMAND_LIMIT, EXCEED_134217727, false);
        hb.clear();

        System.out.println("HostBalancer initialized with persistent queues folder " + QUEUES_ROOT);

        try {
            System.out.println("Starting " + threads + " threads ...");
            long time = System.nanoTime();
            final List<ProfilingTask> tasks = new ArrayList<>();
            for (int count = 0; count < threads; count++) {
                final List<DigestURL> urls = new ArrayList<>();
                for (int i = 0; i < urlsPerThread; i++) {
                    /* We use local test URLs here to prevent running RobotsTxt internals */
                    urls.add(new DigestURL("http://localhost/" + i + "/" + count));
                }
                final ProfilingTask thread = new ProfilingTask(hb, robots, urls, steps, sleepTime);
                thread.start();
                tasks.add(thread);
            }
            /* Wait for tasks termination */
            for (final ProfilingTask task : tasks) {
                try {
                    task.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            /*
             * Check consistency: the balancer cache should be empty when all tasks have
             * terminated without error
             */
            final int depthCacheSize = HostBalancer.depthCache.size();
            if (depthCacheSize > 0) {
                System.out.println("Depth cache is not empty! Actual URLs count: " + depthCacheSize);
            }

            System.out.println("All threads terminated in " + TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - time)
                    + "s. Computing statistics...");
            long pushTime = 0;
            long maxPushTime = 0;
            long hasTime = 0;
            long maxHasTime = 0;
            int pushFailures = 0;
            long removeTime = 0;
            long maxRemoveTime = 0;
            long totalSteps = 0;
            for (final ProfilingTask task : tasks) {
                pushTime += task.getPushTime();
                maxPushTime = Math.max(task.getMaxPushTime(), maxPushTime);
                hasTime += task.getHasTime();
                maxHasTime = Math.max(task.getMaxHasTime(), maxHasTime);
                pushFailures += task.getPushFailures();
                removeTime += task.getRemoveTime();
                maxRemoveTime = Math.max(task.getMaxRemoveTime(), maxRemoveTime);
                totalSteps += task.getSteps();
            }
            System.out.println("HostBalancer.push() total time (ms) : " + TimeUnit.NANOSECONDS.toMillis(pushTime));
            System.out.println("HostBalancer.push() maximum time (ms) : " + TimeUnit.NANOSECONDS.toMillis(maxPushTime));
            System.out.println("HostBalancer.push() mean time (ms) : " + TimeUnit.NANOSECONDS.toMillis(pushTime / totalSteps));
            System.out.println("HostBalancer.push() failures : " + pushFailures);
            System.out.println("");
            System.out.println("HostBalancer.has() total time (ms) : " + TimeUnit.NANOSECONDS.toMillis(hasTime));
            System.out.println("HostBalancer.has() maximum time (ms) : " + TimeUnit.NANOSECONDS.toMillis(maxHasTime));
            System.out.println("HostBalancer.has() mean time (ms) : " + TimeUnit.NANOSECONDS.toMillis(hasTime / totalSteps));
            System.out.println("");
            System.out.println("HostBalancer.remove() total time (ms) : " + TimeUnit.NANOSECONDS.toMillis(removeTime));
            System.out.println("HostBalancer.remove() maximum time (ms) : " + TimeUnit.NANOSECONDS.toMillis(maxRemoveTime));
            System.out.println("HostBalancer.remove() mean time (ms) : " + TimeUnit.NANOSECONDS.toMillis(removeTime / totalSteps));
        } finally {
            try {
                hb.close();
            } finally {
                /* Shutdown running threads */
                ArrayStack.shutdownDeleteService();

                robots.close();

                try {
                    Domains.close();
                } finally {
                    ConcurrentLog.shutdown();
                }
            }
        }
    }

}