@@ -63,7 +63,7 @@ public class CrawlQueues {
     public NoticedURL noticeURL;
     public ZURL errorURL, delegatedURL;
 
-    public CrawlQueues(final Switchboard sb, final File plasmaPath) {
+    public CrawlQueues(final Switchboard sb, final File queuePath) {
         this.sb = sb;
         this.log = new Log("CRAWLER");
         this.workers = new ConcurrentHashMap<Integer, crawlWorker>();
@@ -71,16 +71,69 @@ public class CrawlQueues {
 
         // start crawling management
         log.logConfig("Starting Crawling Management");
-        noticeURL = new NoticedURL(plasmaPath);
+        noticeURL = new NoticedURL(queuePath);
         //errorURL = new plasmaCrawlZURL(); // fresh error DB each startup; can be hold in RAM and reduces IO;
-        final File errorDBFile = new File(plasmaPath, "urlError2.db");
+        final File errorDBFile = new File(queuePath, "urlError2.db");
         if (errorDBFile.exists()) {
             // delete the error db to get a fresh each time on startup
             // this is useful because there is currently no re-use of the data in this table.
-            if (errorDBFile.isDirectory()) SplitTable.delete(plasmaPath, "urlError2.db"); else FileUtils.deletedelete(errorDBFile);
+            if (errorDBFile.isDirectory()) SplitTable.delete(queuePath, "urlError2.db"); else FileUtils.deletedelete(errorDBFile);
         }
-        errorURL = new ZURL(plasmaPath, "urlError3.db", false);
-        delegatedURL = new ZURL(plasmaPath, "urlDelegated3.db", true);
+        errorURL = new ZURL(queuePath, "urlError3.db", false);
+        delegatedURL = new ZURL(queuePath, "urlDelegated3.db", true);
     }
 
+    public void relocate(final File newQueuePath) {
+        this.close();
+
+        this.workers = new ConcurrentHashMap<Integer, crawlWorker>();
+        this.remoteCrawlProviderHashes.clear();
+
+        noticeURL = new NoticedURL(newQueuePath);
+        final File errorDBFile = new File(newQueuePath, "urlError2.db");
+        if (errorDBFile.exists()) {
+            if (errorDBFile.isDirectory()) SplitTable.delete(newQueuePath, "urlError2.db"); else FileUtils.deletedelete(errorDBFile);
+        }
+        errorURL = new ZURL(newQueuePath, "urlError3.db", false);
+        delegatedURL = new ZURL(newQueuePath, "urlDelegated3.db", true);
+    }
+
+    public void close() {
+        // wait for all workers to finish
+        for (final crawlWorker w: workers.values()) {
+            w.interrupt();
+        }
+        for (final crawlWorker w: workers.values()) {
+            try {
+                w.join();
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
+        noticeURL.close();
+        errorURL.close();
+        delegatedURL.close();
+    }
+
+    public void clear() {
+        // wait for all workers to finish
+        for (final crawlWorker w: workers.values()) {
+            w.interrupt();
+        }
+        // TODO: wait some more time until all threads are finished
+        workers.clear();
+        remoteCrawlProviderHashes.clear();
+        noticeURL.clear();
+        try {
+            errorURL.clear();
+        } catch (final IOException e) {
+            e.printStackTrace();
+        }
+        try {
+            delegatedURL.clear();
+        } catch (final IOException e) {
+            e.printStackTrace();
+        }
+        errorURL = new ZURL(plasmaPath, "urlError3.db", false);
+        delegatedURL = new ZURL(plasmaPath, "urlDelegated3.db", true);
+    }
+
     /**
@@ -127,44 +180,6 @@ public class CrawlQueues {
         }
     }
 
-    public void clear() {
-        // wait for all workers to finish
-        for (final crawlWorker w: workers.values()) {
-            w.interrupt();
-        }
-        // TODO: wait some more time until all threads are finished
-        workers.clear();
-        remoteCrawlProviderHashes.clear();
-        noticeURL.clear();
-        try {
-            errorURL.clear();
-        } catch (final IOException e) {
-            e.printStackTrace();
-        }
-        try {
-            delegatedURL.clear();
-        } catch (final IOException e) {
-            e.printStackTrace();
-        }
-    }
-
-    public void close() {
-        // wait for all workers to finish
-        for (final crawlWorker w: workers.values()) {
-            w.interrupt();
-        }
-        for (final crawlWorker w: workers.values()) {
-            try {
-                w.join();
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-            }
-        }
-        noticeURL.close();
-        errorURL.close();
-        delegatedURL.close();
-    }
-
     public Request[] activeWorkerEntries() {
         synchronized (workers) {
             final Request[] e = new Request[workers.size()];
@@ -195,9 +210,16 @@ public class CrawlQueues {
                     ", robinsonMode=" + ((sb.isRobinsonMode()) ? "on" : "off"));
         }
 
-        if(!crawlIsPossible(NoticedURL.STACK_TYPE_CORE, "Core")) return false;
+        String queueCheck = crawlIsPossible(NoticedURL.STACK_TYPE_CORE, "Core");
+        if (queueCheck != null) {
+            if (log.isFine()) log.logFine("omitting de-queue/local: " + queueCheck);
+            return false;
+        }
 
-        if(isPaused(SwitchboardConstants.CRAWLJOB_LOCAL_CRAWL)) return false;
+        if (isPaused(SwitchboardConstants.CRAWLJOB_LOCAL_CRAWL)) {
+            if (log.isFinest()) log.logFinest("omitting de-queue/local: paused");
+            return false;
+        }
 
         // do a local crawl
         Request urlEntry = null;
@@ -291,34 +313,29 @@ public class CrawlQueues {
     * @param type
     * @return
     */
-    private boolean crawlIsPossible(int stackType, final String type) {
-        int value;
+    private String crawlIsPossible(int stackType, final String type) {
        //System.out.println("stacksize = " + noticeURL.stackSize(stackType));
        if (noticeURL.stackSize(stackType) == 0) {
            //log.logDebug("GlobalCrawl: queue is empty");
-            return false;
+            return "stack is empty";
        }
-        value = (int) sb.getConfigLong(SwitchboardConstants.CRAWLER_THREADS_ACTIVE_MAX, 10);
-        if (this.size() >= value) {
-            // try a cleanup
+
+        // check the worker threads
+        int maxWorkers = (int) sb.getConfigLong(SwitchboardConstants.CRAWLER_THREADS_ACTIVE_MAX, 10);
+        if (this.workers.size() >= maxWorkers) {
+            // too many worker threads, try a cleanup
            this.cleanup();
        }
        // check again
-        if (this.size() >= value) {
-            if (this.log.isFine()) {
-                log.logFine(type + "Crawl: too many processes in loader queue, dismissed (" + "cacheLoader=" + this.size() + "), httpClients = " + Client.connectionCount());
-            }
-            return false;
+        if (this.workers.size() >= maxWorkers) {
+            return "too many workers active: " + this.workers.size();
        }
 
        String cautionCause = sb.onlineCaution();
        if (cautionCause != null) {
-            if (this.log.isFine()) {
-                log.logFine(type + "Crawl: online caution for " + cautionCause + ", omitting processing");
-            }
-            return false;
+            return "online caution: " + cautionCause;
        }
-        return true;
+        return null;
    }
 
    public boolean remoteCrawlLoaderJob() {
@@ -467,9 +484,16 @@ public class CrawlQueues {
 
        // do nothing if either there are private processes to be done
        // or there is no global crawl on the stack
-        if (!crawlIsPossible(NoticedURL.STACK_TYPE_REMOTE, "Global")) return false;
+        String queueCheck = crawlIsPossible(NoticedURL.STACK_TYPE_REMOTE, "Global");
+        if (queueCheck != null) {
+            if (log.isFine()) log.logFine("omitting de-queue/remote: " + queueCheck);
+            return false;
+        }
 
-        if (isPaused(SwitchboardConstants.CRAWLJOB_REMOTE_TRIGGERED_CRAWL)) return false;
+        if (isPaused(SwitchboardConstants.CRAWLJOB_REMOTE_TRIGGERED_CRAWL)) {
+            if (log.isFinest()) log.logFinest("omitting de-queue/remote: paused");
+            return false;
+        }
 
        // we don't want to crawl a global URL globally, since WE are the global part. (from this point of view)
        final String stats = "REMOTETRIGGEREDCRAWL[" + noticeURL.stackSize(NoticedURL.STACK_TYPE_CORE) + ", " + noticeURL.stackSize(NoticedURL.STACK_TYPE_LIMIT) + ", " + noticeURL.stackSize(NoticedURL.STACK_TYPE_OVERHANG) + ", "