Address and fix some concurrency issues reported in

https://github.com/yacy/yacy_search_server/issues/505
pull/533/head
Michael Peter Christen 2 years ago
parent a2a40a3096
commit 23f1dc3741

@ -70,7 +70,7 @@ public class HostBalancer implements Balancer {
private final File hostsPath;
private final boolean exceed134217727;
private final Map<String, HostQueue> queues;
private final ConcurrentHashMap<String, HostQueue> queues;
private final Set<String> roundRobinHostHashes;
private final int onDemandLimit;
@ -283,12 +283,18 @@ public class HostBalancer implements Balancer {
* @throws SpaceExceededException
*/
@Override
public String push(final Request entry, CrawlProfile profile, final RobotsTxt robots) throws IOException, SpaceExceededException {
public String push(final Request entry, final CrawlProfile profile, final RobotsTxt robots) throws IOException, SpaceExceededException {
if (this.has(entry.url().hash())) return "double occurrence";
depthCache.put(entry.url().hash(), entry.depth());
final String hosthash = entry.url().hosthash();
// try a concurrent push
HostQueue queue = this.queues.get(hosthash);
if (queue != null) return queue.push(entry, profile, robots);
// to prevent new double HostQueue creation, do this now synchronized
synchronized (this) {
HostQueue queue = this.queues.get(hosthash);
queue = this.queues.get(hosthash);
if (queue == null) {
queue = new HostQueue(this.hostsPath, entry.url(), this.queues.size() > this.onDemandLimit, this.exceed134217727);
this.queues.put(hosthash, queue);
@ -311,7 +317,7 @@ public class HostBalancer implements Balancer {
* @throws SpaceExceededException
*/
@Override
public Request pop(boolean delay, CrawlSwitchboard cs, RobotsTxt robots) throws IOException {
public Request pop(final boolean delay, final CrawlSwitchboard cs, final RobotsTxt robots) throws IOException {
tryagain: while (true) try {
HostQueue rhq = null;
String rhh = null;
@ -551,7 +557,7 @@ public class HostBalancer implements Balancer {
* @return a map of clear text strings of host names + ports to an integer array: {the size of the domain stack, guessed delta waiting time}
*/
@Override
public Map<String, Integer[]> getDomainStackHosts(RobotsTxt robots) {
public Map<String, Integer[]> getDomainStackHosts(final RobotsTxt robots) {
final Map<String, Integer[]> map = new TreeMap<>(); // we use a tree map to get a stable ordering
for (final HostQueue hq: this.queues.values()) {
final int delta = Latency.waitingRemainingGuessed(hq.getHost(), hq.getPort(), hq.getHostHash(), robots, ClientIdentification.yacyInternetCrawlerAgent);
@ -568,7 +574,7 @@ public class HostBalancer implements Balancer {
* @return a list of crawl loader requests
*/
@Override
public List<Request> getDomainStackReferences(String host, int maxcount, long maxtime) {
public List<Request> getDomainStackReferences(final String host, final int maxcount, final long maxtime) {
if (host == null) {
return Collections.emptyList();
}

@ -36,8 +36,10 @@ import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;
import net.yacy.cora.document.encoding.ASCII;
import net.yacy.cora.document.encoding.UTF8;
@ -76,7 +78,7 @@ public class HostQueue implements Balancer {
private final int port;
private final boolean exceed134217727;
private final boolean onDemand;
private TreeMap<Integer, Index> depthStacks;
private final NavigableMap<Integer, Index> depthStacks;
/**
* Create or open host queue. The host part of the hostUrl parameter is used
@ -112,6 +114,7 @@ public class HostQueue implements Balancer {
} else {
this.hostPath = new File(hostsPath, this.hostName + "-#"+ this.hostHash + "." + this.port);
}
this.depthStacks = new ConcurrentSkipListMap<>();
this.init();
}
@ -152,6 +155,7 @@ public class HostQueue implements Balancer {
}
this.hostHash = filename.substring(p1+2,pdot);
} else throw new RuntimeException("hostPath name must contain -# followd by hosthash: " + filename);
this.depthStacks = new ConcurrentSkipListMap<>();
this.init();
}
@ -166,7 +170,6 @@ public class HostQueue implements Balancer {
throw new MalformedURLException("hostPath could not be created: " + this.hostPath.toString());
}
}
this.depthStacks = new TreeMap<>();
final int size = this.openAllStacks();
if (log.isInfo()) log.info("opened HostQueue " + this.hostPath.getAbsolutePath() + " with " + size + " urls.");
}
@ -258,7 +261,7 @@ public class HostQueue implements Balancer {
return f;
}
private Index openStack(File f) {
private Index openStack(final File f) {
for (int i = 0; i < 10; i++) {
// we try that again if it fails because it shall not fail
if (this.onDemand && (!f.exists() || f.length() < 10000)) {
@ -300,12 +303,14 @@ public class HostQueue implements Balancer {
}
@Override
public synchronized void clear() {
for (final Map.Entry<Integer, Index> entry: this.depthStacks.entrySet()) {
entry.getValue().close();
deletedelete(this.getFile(entry.getKey()));
public void clear() {
final Set<Integer> keys = this.depthStacks.keySet(); // live key view, not a copy; the ConcurrentSkipListMap iterator is weakly consistent, so entries can be removed while iterating
for (final Integer key: keys) {
final Index index = this.depthStacks.get(key);
index.close();
deletedelete(this.getFile(key));
this.depthStacks.remove(key);
}
this.depthStacks.clear();
final String[] l = this.hostPath.list();
if (l != null) for (final String s: l) {
deletedelete(new File(this.hostPath, s));
@ -422,9 +427,10 @@ public class HostQueue implements Balancer {
}
@Override
public String push(final Request entry, CrawlProfile profile, final RobotsTxt robots) throws IOException, SpaceExceededException {
public String push(final Request entry, final CrawlProfile profile, final RobotsTxt robots) throws IOException, SpaceExceededException {
assert entry != null;
final byte[] hash = entry.url().hash();
if (this.has(hash)) return "double occurrence in urlFileIndex";
synchronized (this) {
// double-check
if (this.has(hash)) return "double occurrence in urlFileIndex";
@ -450,7 +456,7 @@ public class HostQueue implements Balancer {
@Override
public Request pop(boolean delay, CrawlSwitchboard cs, RobotsTxt robots) throws IOException {
public Request pop(final boolean delay, final CrawlSwitchboard cs, final RobotsTxt robots) throws IOException {
// returns a crawl entry from the stack and ensures minimum delta times
long sleeptime = 0;
Request crawlEntry = null;
@ -557,7 +563,7 @@ public class HostQueue implements Balancer {
* @return a map of clear text strings of host names to an integer array: {the size of the domain stack, guessed delta waiting time}
*/
@Override
public Map<String, Integer[]> getDomainStackHosts(RobotsTxt robots) {
public Map<String, Integer[]> getDomainStackHosts(final RobotsTxt robots) {
final Map<String, Integer[]> map = new TreeMap<>();
final int delta = Latency.waitingRemainingGuessed(this.hostName, this.port, this.hostHash, robots, ClientIdentification.yacyInternetCrawlerAgent);
map.put(this.hostName, new Integer[]{this.size(), delta});
@ -572,7 +578,7 @@ public class HostQueue implements Balancer {
* @return a list of crawl loader requests
*/
@Override
public List<Request> getDomainStackReferences(String host, int maxcount, long maxtime) {
public List<Request> getDomainStackReferences(final String host, final int maxcount, final long maxtime) {
if (host == null) return new ArrayList<>(0);
if (!this.hostName.equals(host)) return new ArrayList<>(0);
final ArrayList<Request> cel = new ArrayList<>(maxcount);

Loading…
Cancel
Save