to prevent crawlers from concurrently accessing and altering the same crawl queue

after a restart, put the hosthash into the queue's filename (which is used as the primary
key for the crawl queue). Hint: the initial hosthash taken from the url and the hosthash
recalculated from just hostname:port are not the same.
fixes http://mantis.tokeek.de/view.php?id=668 (partially)
pull/62/head
reger 9 years ago
parent 2cc4e56010
commit 22db449f2a
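
To see why the hosthash has to be persisted rather than recalculated, the sketch below uses the two DigestURL calls that appear in the diff (the static hosthash(host, port) and the instance hosthash()); the String constructor, the package path and the example URL are assumptions made only for illustration.

import java.net.MalformedURLException;
import net.yacy.cora.document.id.DigestURL; // package path assumed from YaCy's source layout

public class HosthashMismatchSketch {
    public static void main(String[] args) throws MalformedURLException {
        DigestURL url = new DigestURL("ftp://example.net:2121/pub/file.txt"); // constructor assumed
        String initial    = url.hosthash();                                   // protocol + hostname + port
        String recomputed = DigestURL.hosthash(url.getHost(), url.getPort()); // hostname + port only
        // The two values can differ (e.g. for non-http protocols), which is why the queue
        // directory name now stores the initial hosthash instead of recalculating it.
        System.out.println(initial + " vs " + recomputed);
    }
}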

@@ -103,7 +103,7 @@ public class HostBalancer implements Balancer {
queue.close();
FileUtils.deletedelete(queuePath);
} else {
queues.put(DigestURL.hosthash(queue.getHost(), queue.getPort()), queue);
queues.put(queue.getHostHash(), queue);
}
} catch (MalformedURLException | RuntimeException e) {
log.warn("delete queue due to init error for " + hostsPath.getName() + " host=" + hoststr + " " + e.getLocalizedMessage());
@@ -244,11 +244,11 @@ public class HostBalancer implements Balancer {
public String push(final Request entry, CrawlProfile profile, final RobotsTxt robots) throws IOException, SpaceExceededException {
if (this.has(entry.url().hash())) return "double occurrence";
depthCache.put(entry.url().hash(), entry.depth());
String hosthash = ASCII.String(entry.url().hash(), 6, 6);
String hosthash = entry.url().hosthash();
synchronized (this) {
HostQueue queue = this.queues.get(hosthash);
if (queue == null) {
queue = new HostQueue(this.hostsPath, entry.url().getHost(), entry.url().getPort(), this.queues.size() > this.onDemandLimit, this.exceed134217727);
queue = new HostQueue(this.hostsPath, entry.url(), this.queues.size() > this.onDemandLimit, this.exceed134217727);
this.queues.put(hosthash, queue);
// profile might be null when continuing crawls after a YaCy restart
robots.ensureExist(entry.url(), profile == null ? ClientIdentification.yacyInternetCrawlerAgent : profile.getAgent(), true); // concurrently load all robots.txt
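
The push() change above keys the per-host queue map by the url's hosthash and lazily creates the HostQueue from the full url. A stripped-down sketch of that get-or-create pattern (HostStack and QueueRegistry are hypothetical stand-ins, not YaCy classes):

import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for HostQueue: the precomputed hosthash is its only identity here.
final class HostStack {
    final String hostHash;
    HostStack(final String hostHash) { this.hostHash = hostHash; }
}

final class QueueRegistry {
    private final Map<String, HostStack> queues = new HashMap<>();

    // Mirrors the synchronized block in push(): the hosthash is taken from the url once
    // and reused both as the map key and as the queue's primary key.
    synchronized HostStack queueFor(final String hostHash) {
        HostStack q = this.queues.get(hostHash);
        if (q == null) {
            q = new HostStack(hostHash);
            this.queues.put(hostHash, q);
        }
        return q;
    }
}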
@@ -376,7 +376,7 @@ public class HostBalancer implements Balancer {
for (String h: lastEntries) this.roundRobinHostHashes.remove(h);
}
}
/*
// first strategy: get one entry which does not need sleep time
Iterator<String> nhhi = this.roundRobinHostHashes.iterator();
@@ -386,7 +386,7 @@ public class HostBalancer implements Balancer {
if (rhq == null) {
nhhi.remove();
continue nosleep;
}
}
int delta = Latency.waitingRemainingGuessed(rhq.getHost(), rhh, robots, ClientIdentification.yacyInternetCrawlerAgent);
if (delta <= 10 || this.roundRobinHostHashes.size() == 1 || rhq.size() == 1) {
nhhi.remove();
@@ -494,7 +494,7 @@ public class HostBalancer implements Balancer {
map.put(hq.getHost() + ":" + hq.getPort(), new Integer[]{hq.size(), delta});
} catch (MalformedURLException e) {
ConcurrentLog.logException(e);
}
}
return map;
}

@@ -64,28 +64,48 @@ public class HostQueue implements Balancer {
private static final int EcoFSBufferSize = 1000;
private static final int objectIndexBufferSize = 1000;
private final File hostPath;
private final File hostPath; // path to the stack files
private final String hostName;
private String hostHash;
private final String hostHash;
private final int port;
private final boolean exceed134217727;
private final boolean onDemand;
private TreeMap<Integer, Index> depthStacks;
/**
* Create or open host queue. The host part of the hostUrl parameter is used
* to calculate the stack directory name.
*
* @param hostsPath
* @param hostUrl
* @param onDemand
* @param exceed134217727
* @throws MalformedURLException
*/
public HostQueue (
final File hostsPath,
final String hostName,
final int port,
final DigestURL hostUrl, // any url from that host (only host data are extracted)
final boolean onDemand,
final boolean exceed134217727) throws MalformedURLException {
this.onDemand = onDemand;
this.exceed134217727 = exceed134217727;
this.hostName = (hostName == null) ? "localhost" : hostName; // might be null (file://) but hostqueue needs a name (for queue file)
this.port = port;
this.hostPath = new File(hostsPath, this.hostName + "." + this.port);
this.hostName = (hostUrl.getHost() == null) ? "localhost" : hostUrl.getHost(); // might be null (file://) but hostqueue needs a name (for queue file)
this.port = hostUrl.getPort();
this.hostHash = hostUrl.hosthash(); // hosthash is calculated by protocol + hostname + port
// hostName/port included just for human readability (& historically); the "-#" marker defines where the hosthash begins in the directory name
this.hostPath = new File(hostsPath, this.hostName + "-#"+ this.hostHash + "." + this.port);
init();
}
/**
* Initializes a host queue from cache files. The internal id of the queue is
* extracted from the path name and must match the key initially generated;
* currently the hosthash is used as the id.
* @param hostPath path of the stack directory (containing the primary key/id of the queue)
* @param onDemand
* @param exceed134217727
* @throws MalformedURLException
*/
public HostQueue (
final File hostPath,
final boolean onDemand,
@@ -95,10 +115,14 @@ public class HostQueue implements Balancer {
this.hostPath = hostPath;
// parse the hostName and port from the file name
String filename = hostPath.getName();
int p = filename.lastIndexOf('.');
if (p < 0) throw new RuntimeException("hostPath name must contain a dot: " + filename);
this.hostName = filename.substring(0, p);
this.port = Integer.parseInt(filename.substring(p + 1)); // consider "host.com" contains dot but no required port -> will throw exception
int pdot = filename.lastIndexOf('.');
if (pdot < 0) throw new RuntimeException("hostPath name must contain a dot: " + filename);
this.port = Integer.parseInt(filename.substring(pdot + 1)); // consider "host.com" contains dot but no required port -> will throw exception
int p1 = filename.lastIndexOf("-#");
if (p1 >= 0) {
this.hostName = filename.substring(0,p1);
this.hostHash = filename.substring(p1+2,pdot);
} else throw new RuntimeException("hostPath name must contain -# followed by the hosthash: " + filename);
init();
}
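
A self-contained sketch of the directory naming scheme used by the two constructors above: encode "<hostname>-#<hosthash>.<port>" on creation and parse it back after a restart. Only the layout is taken from the commit; the helper names and example values are hypothetical.

// Hypothetical helpers illustrating the "<hostname>-#<hosthash>.<port>" directory name
// that serves as the queue's primary key across restarts.
public final class QueueDirName {

    static String encode(final String hostName, final String hostHash, final int port) {
        return hostName + "-#" + hostHash + "." + port;
    }

    // Parse the way the path-based HostQueue constructor does: port after the last dot,
    // hosthash between "-#" and that dot, hostname before "-#".
    static String[] decode(final String filename) {
        final int pdot = filename.lastIndexOf('.');
        if (pdot < 0) throw new RuntimeException("name must contain a dot: " + filename);
        final int p1 = filename.lastIndexOf("-#");
        if (p1 < 0) throw new RuntimeException("name must contain -# followed by the hosthash: " + filename);
        return new String[] {
            filename.substring(0, p1),        // hostname
            filename.substring(p1 + 2, pdot), // hosthash (6 characters in YaCy)
            filename.substring(pdot + 1)      // port
        };
    }

    public static void main(String[] args) {
        final String name = encode("example.org", "AbCdEf", 8090); // -> "example.org-#AbCdEf.8090"
        System.out.println(String.join(" | ", decode(name)));      // example.org | AbCdEf | 8090
    }
}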
@@ -107,14 +131,6 @@ public class HostQueue implements Balancer {
* @throws MalformedURLException if directory for the host could not be created
*/
private final void init() throws MalformedURLException {
try {
if (this.hostName == null)
this.hostHash="";
else
this.hostHash = DigestURL.hosthash(this.hostName, this.port);
} catch (MalformedURLException e) {
this.hostHash = "";
}
if (!(this.hostPath.exists())) {
this.hostPath.mkdirs();
if (!this.hostPath.exists()) { // check if directory created (if not, likely a name violation)
@@ -134,6 +150,15 @@ public class HostQueue implements Balancer {
return this.port;
}
/**
* Get the hosthash of this queue, as determined in the constructor.
*
* @return
*/
public String getHostHash() {
return this.hostHash;
}
private int openAllStacks() {
String[] l = this.hostPath.list();
int c = 0;
@@ -157,26 +182,6 @@ public class HostQueue implements Balancer {
return c;
}
public synchronized int getLowestStackDepth() {
while (this.depthStacks.size() > 0) {
Map.Entry<Integer, Index> entry;
synchronized (this) {
entry = this.depthStacks.firstEntry();
}
if (entry == null) return 0; // happens only if map is empty
if (entry.getValue().size() == 0) {
entry.getValue().close();
deletedelete(getFile(entry.getKey()));
this.depthStacks.remove(entry.getKey());
continue;
}
return entry.getKey();
}
// this should not happen but it happens if a deletion is done
//assert false;
return 0;
}
private Index getLowestStack() {
while (this.depthStacks.size() > 0) {
Map.Entry<Integer, Index> entry;
@@ -196,16 +201,17 @@ public class HostQueue implements Balancer {
//assert false;
return null;
}
/**
* Get existing url stack with crawl depth or create a new (empty) stack
*
* @param depth
* @return existing or new/empty stack
*/
private Index getStack(int depth) {
Index depthStack;
synchronized (this) {
depthStack = this.depthStacks.get(depth);
if (depthStack != null) return depthStack;
}
// create a new stack
synchronized (this) {
// check again
depthStack = this.depthStacks.get(depth);
if (depthStack != null) return depthStack;
// now actually create a new stack
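
The truncated getStack() body above checks for the stack, then re-checks under the lock before actually creating it. A generic sketch of that pattern (DepthStack is a hypothetical placeholder for the file-backed Index the real code opens):

import java.util.TreeMap;

final class DepthStacksSketch {
    // Hypothetical placeholder for the per-depth index.
    static final class DepthStack {
        final int depth;
        DepthStack(final int depth) { this.depth = depth; }
    }

    private final TreeMap<Integer, DepthStack> depthStacks = new TreeMap<>();

    DepthStack getStack(final int depth) {
        DepthStack s;
        synchronized (this) {
            s = this.depthStacks.get(depth);
            if (s != null) return s;          // fast path: the stack already exists
        }
        // slow path: create a new stack, but re-check first because another thread
        // may have created it between the two synchronized blocks
        synchronized (this) {
            s = this.depthStacks.get(depth);
            if (s != null) return s;
            s = new DepthStack(depth);        // now actually create a new stack
            this.depthStacks.put(depth, s);
            return s;
        }
    }
}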
