added missing synchronization in crawl balancer

To avoid triggering the synchronization during the frequently used size() operation,
a notEmpty() method was added that avoids the synchronization in many cases.

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@4025 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 18 years ago
parent 9628db6cdc
commit 69d640b041

@ -96,7 +96,7 @@ public class plasmaCrawlBalancer {
} }
public synchronized void close() { public synchronized void close() {
while (sizeDomainStacks() > 0) flushOnceDomStacks(0, true); // flush to ram, because the ram flush is optimized while (domainStacksNotEmpty()) flushOnceDomStacks(0, true); // flush to ram, because the ram flush is optimized
try { flushAllRamStack(); } catch (IOException e) {} try { flushAllRamStack(); } catch (IOException e) {}
if (urlFileIndex != null) { if (urlFileIndex != null) {
urlFileIndex.close(); urlFileIndex.close();
@ -190,6 +190,12 @@ public class plasmaCrawlBalancer {
} }
} }
public boolean notEmpty() {
    // emptiness probe that callers can use in place of size() > 0;
    // the RAM stack and file stack are checked first, so the synchronized
    // walk over the domain stacks is only reached when both report zero entries
    if (urlRAMStack.size() > 0) return true;
    if (urlFileStack.size() > 0) return true;
    return domainStacksNotEmpty();
}
public int size() { public int size() {
int componentsize = urlFileStack.size() + urlRAMStack.size() + sizeDomainStacks(); int componentsize = urlFileStack.size() + urlRAMStack.size() + sizeDomainStacks();
if (componentsize != urlFileIndex.size()) { if (componentsize != urlFileIndex.size()) {
@ -204,6 +210,17 @@ public class plasmaCrawlBalancer {
return componentsize; return componentsize;
} }
private boolean domainStacksNotEmpty() {
    // reports whether at least one per-domain list still contains an entry;
    // a missing map counts as empty
    if (domainStacks == null) return false;
    synchronized (domainStacks) {
        // the scan holds the map's monitor and stops at the first non-empty list
        for (Iterator stacks = domainStacks.values().iterator(); stacks.hasNext();) {
            LinkedList stack = (LinkedList) stacks.next();
            if (!stack.isEmpty()) return true;
        }
    }
    return false;
}
private int sizeDomainStacks() { private int sizeDomainStacks() {
if (domainStacks == null) return 0; if (domainStacks == null) return 0;
int sum = 0; int sum = 0;
@ -218,22 +235,24 @@ public class plasmaCrawlBalancer {
// takes one entry from every domain stack and puts it on the ram or file stack // takes one entry from every domain stack and puts it on the ram or file stack
// the minimumleft value is a limit for the number of entries that should be left // the minimumleft value is a limit for the number of entries that should be left
if (domainStacks.size() == 0) return; if (domainStacks.size() == 0) return;
Iterator i = domainStacks.entrySet().iterator(); synchronized (domainStacks) {
Map.Entry entry; Iterator i = domainStacks.entrySet().iterator();
LinkedList list; Map.Entry entry;
while (i.hasNext()) { LinkedList list;
entry = (Map.Entry) i.next(); while (i.hasNext()) {
list = (LinkedList) entry.getValue(); entry = (Map.Entry) i.next();
if (list.size() > minimumleft) { list = (LinkedList) entry.getValue();
if (ram) { if (list.size() > minimumleft) {
urlRAMStack.add(list.removeFirst()); if (ram) {
} else try { urlRAMStack.add(list.removeFirst());
urlFileStack.push(urlFileStack.row().newEntry(new byte[][]{((String) list.removeFirst()).getBytes()})); } else try {
} catch (IOException e) { urlFileStack.push(urlFileStack.row().newEntry(new byte[][] { ((String) list.removeFirst()).getBytes() }));
e.printStackTrace(); } catch (IOException e) {
e.printStackTrace();
}
} }
if (list.size() == 0) i.remove();
} }
if (list.size() == 0) i.remove();
} }
} }
@ -260,8 +279,10 @@ public class plasmaCrawlBalancer {
if (domainList == null) { if (domainList == null) {
// create new list // create new list
domainList = new LinkedList(); domainList = new LinkedList();
domainList.add(entry.urlhash()); synchronized (domainStacks) {
domainStacks.put(dom, domainList); domainList.add(entry.urlhash());
domainStacks.put(dom, domainList);
}
} else { } else {
// extend existent domain list // extend existent domain list
domainList.addLast(entry.urlhash()); domainList.addLast(entry.urlhash());
@ -288,7 +309,7 @@ public class plasmaCrawlBalancer {
} }
// 2nd-a: check domainStacks for latest arrivals // 2nd-a: check domainStacks for latest arrivals
if ((result == null) && (domainStacks.size() > 0)) { if ((result == null) && (domainStacks.size() > 0)) synchronized (domainStacks) {
// we select specific domains that have not been used for a long time // we select specific domains that have not been used for a long time
// i.e. 60 seconds. Latest arrivals that have not yet been crawled // i.e. 60 seconds. Latest arrivals that have not yet been crawled
// fit also in that scheme // fit also in that scheme
@ -323,7 +344,7 @@ public class plasmaCrawlBalancer {
} }
// 2nd-b: check domainStacks for best match between stack size and retrieval time // 2nd-b: check domainStacks for best match between stack size and retrieval time
if ((result == null) && (domainStacks.size() > 0)) { if ((result == null) && (domainStacks.size() > 0)) synchronized (domainStacks) {
// we order all domains by the number of entries per domain // we order all domains by the number of entries per domain
// then we iterate through these domains in descending entry order // then we iterate through these domains in descending entry order
// and that that one, that has a delta > minimumDelta // and that that one, that has a delta > minimumDelta
@ -437,7 +458,7 @@ public class plasmaCrawlBalancer {
public synchronized plasmaCrawlEntry top(int dist) throws IOException { public synchronized plasmaCrawlEntry top(int dist) throws IOException {
// if we need to flush anything, then flush the domain stack first, // if we need to flush anything, then flush the domain stack first,
// to avoid that new urls get hidden by old entries from the file stack // to avoid that new urls get hidden by old entries from the file stack
while ((sizeDomainStacks() > 0) && (urlRAMStack.size() <= dist)) { while ((domainStacksNotEmpty()) && (urlRAMStack.size() <= dist)) {
// flush only that much as we need to display // flush only that much as we need to display
flushOnceDomStacks(0, true); flushOnceDomStacks(0, true);
} }

@ -86,6 +86,10 @@ public class plasmaCrawlNURL {
remoteStack.close(); remoteStack.close();
} }
public boolean notEmpty() {
    // true as soon as one of the three stacks reports content;
    // the stacks are asked in the order core, limit, remote
    if (coreStack.notEmpty()) return true;
    if (limitStack.notEmpty()) return true;
    return remoteStack.notEmpty();
}
public int size() { public int size() {
// this does not count the overhang stack size // this does not count the overhang stack size
return coreStack.size() + limitStack.size() + remoteStack.size(); return coreStack.size() + limitStack.size() + remoteStack.size();

@ -1620,7 +1620,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
* shutdown procedure * shutdown procedure
*/ */
public boolean cleanProfiles() throws InterruptedException { public boolean cleanProfiles() throws InterruptedException {
if ((sbQueue.size() > 0) || (cacheLoader.size() > 0) || (noticeURL.size() > 0)) return false; if ((sbQueue.size() > 0) || (cacheLoader.size() > 0) || (noticeURL.notEmpty())) return false;
final Iterator iter = profiles.profiles(true); final Iterator iter = profiles.profiles(true);
plasmaCrawlProfile.entry entry; plasmaCrawlProfile.entry entry;
boolean hasDoneSomething = false; boolean hasDoneSomething = false;
@ -3260,7 +3260,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
if (wordIndex.size() < 100) { if (wordIndex.size() < 100) {
return "no DHT distribution: not enough words - wordIndex.size() = " + wordIndex.size(); return "no DHT distribution: not enough words - wordIndex.size() = " + wordIndex.size();
} }
if ((getConfig(INDEX_DIST_ALLOW_WHILE_CRAWLING, "false").equalsIgnoreCase("false")) && (noticeURL.size() > 0)) { if ((getConfig(INDEX_DIST_ALLOW_WHILE_CRAWLING, "false").equalsIgnoreCase("false")) && (noticeURL.notEmpty())) {
return "no DHT distribution: crawl in progress: noticeURL.stackSize() = " + noticeURL.size() + ", sbQueue.size() = " + sbQueue.size(); return "no DHT distribution: crawl in progress: noticeURL.stackSize() = " + noticeURL.size() + ", sbQueue.size() = " + sbQueue.size();
} }
if ((getConfig(INDEX_DIST_ALLOW_WHILE_INDEXING, "false").equalsIgnoreCase("false")) && (sbQueue.size() > 1)) { if ((getConfig(INDEX_DIST_ALLOW_WHILE_INDEXING, "false").equalsIgnoreCase("false")) && (sbQueue.size() > 1)) {

Loading…
Cancel
Save