From b6e274f2112c027ae78af42d629c8f4d44c377b7 Mon Sep 17 00:00:00 2001
From: orbiter
Date: Sat, 6 Jun 2009 16:20:27 +0000
Subject: [PATCH] omit most of forced crawl delays by using a separate delay
 table which flushes delayed URLs at the correct time

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@6029 6c8d7289-2bf4-0310-a012-ef5d649a1542
---
 source/de/anomic/crawler/Balancer.java | 160 ++++++++++++++++---------
 1 file changed, 104 insertions(+), 56 deletions(-)

diff --git a/source/de/anomic/crawler/Balancer.java b/source/de/anomic/crawler/Balancer.java
index 57bade44b..54fd8f5ca 100644
--- a/source/de/anomic/crawler/Balancer.java
+++ b/source/de/anomic/crawler/Balancer.java
@@ -29,6 +29,7 @@
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Map;
+import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -48,6 +49,8 @@ public class Balancer {
               domainStacks; // a map from domain name part to Lists with url hashs
     private ConcurrentLinkedQueue<String> top;
+    private TreeMap<Long, String>
+              delayed;
     private ObjectIndex urlFileIndex;
     private final File cacheStacksPath;
     private long minimumLocalDelta;
@@ -60,6 +63,7 @@ public class Balancer {
         this.cacheStacksPath = cachePath;
         this.domainStacks = new ConcurrentHashMap<String, LinkedList<String>>();
         this.top = new ConcurrentLinkedQueue<String>();
+        this.delayed = new TreeMap<Long, String>();
         this.minimumLocalDelta = minimumLocalDelta;
         this.minimumGlobalDelta = minimumGlobalDelta;
@@ -93,7 +97,7 @@ public class Balancer {
         }
     }
-    public synchronized void clear() {
+    public void clear() {
         Log.logInfo("Balancer", "cleaing balancer with " + urlFileIndex.size() + " entries from " + urlFileIndex.filename());
         try {
             urlFileIndex.clear();
@@ -102,9 +106,12 @@
         }
         domainStacks.clear();
         top.clear();
+        synchronized (this.delayed) {
+            delayed.clear();
+        }
     }
-    public synchronized CrawlEntry get(final String urlhash) throws IOException {
+    public CrawlEntry get(final String urlhash) throws IOException {
         assert urlhash != null;
         if (urlFileIndex == null) return null; // case occurs during shutdown
         final Row.Entry entry = urlFileIndex.get(urlhash.getBytes());
@@ -112,7 +119,7 @@
         return new CrawlEntry(entry);
     }
-    public synchronized int removeAllByProfileHandle(final String profileHandle, final long timeout) throws IOException {
+    public int removeAllByProfileHandle(final String profileHandle, final long timeout) throws IOException {
         // removes all entries with a specific profile hash.
         // this may last some time
         // returns number of deletions
@@ -143,7 +150,7 @@
      * @return number of entries that had been removed
      * @throws IOException
      */
-    public synchronized int remove(final HashSet<String> urlHashes) throws IOException {
+    public int remove(final HashSet<String> urlHashes) throws IOException {
         final int s = urlFileIndex.size();
         int removedCounter = 0;
         for (final String urlhash: urlHashes) {
@@ -161,34 +168,42 @@
             if (urlHashes.contains(urlhash)) j.remove();
         }
+        // remove from delayed
+        synchronized (this.delayed) {
+            Iterator<Map.Entry<Long, String>> k = this.delayed.entrySet().iterator();
+            while (k.hasNext()) {
+                if (urlHashes.contains(k.next().getValue())) k.remove();
+            }
+        }
+
         // iterate through the domain stacks
-        final Iterator<Map.Entry<String, LinkedList<String>>> k = domainStacks.entrySet().iterator();
+        final Iterator<Map.Entry<String, LinkedList<String>>> q = domainStacks.entrySet().iterator();
         Map.Entry<String, LinkedList<String>> se;
         LinkedList<String> stack;
-        while (k.hasNext()) {
-            se = k.next();
+        while (q.hasNext()) {
+            se = q.next();
             stack = se.getValue();
             Iterator<String> i = stack.iterator();
             while (i.hasNext()) {
                 if (urlHashes.contains(i.next())) i.remove();
             }
-            if (stack.size() == 0) k.remove();
+            if (stack.size() == 0) q.remove();
         }
         return removedCounter;
     }
-    public synchronized boolean has(final String urlhash) {
+    public boolean has(final String urlhash) {
         return urlFileIndex.has(urlhash.getBytes());
     }
-    public synchronized boolean notEmpty() {
+    public boolean notEmpty() {
         // alternative method to the property size() > 0
         // this is better because it may avoid synchronized access to domain stack summarization
         return domainStacksNotEmpty();
     }
-    public synchronized int size() {
+    public int size() {
         return urlFileIndex.size();
     }
@@ -203,22 +218,24 @@
         return false;
     }
-    public synchronized void push(final CrawlEntry entry) throws IOException {
+    public void push(final CrawlEntry entry) throws IOException {
         assert entry != null;
         String hash = entry.url().hash();
-        if (urlFileIndex.has(hash.getBytes())) {
-            //Log.logWarning("BALANCER", "double-check has failed for urlhash " + entry.url().hash() + " in " + stackname + " - fixed");
-            return;
-        }
-
-        // add to index
-        int s = urlFileIndex.size();
-        urlFileIndex.put(entry.toRow());
-        assert s < urlFileIndex.size() : "hash = " + hash;
-        assert urlFileIndex.has(hash.getBytes()) : "hash = " + hash;
+        synchronized (this) {
+            if (urlFileIndex.has(hash.getBytes())) {
+                //Log.logWarning("BALANCER", "double-check has failed for urlhash " + entry.url().hash() + " in " + stackname + " - fixed");
+                return;
+            }
-        // add the hash to a queue
-        pushHashToDomainStacks(entry.url().hash(), 50);
+            // add to index
+            int s = urlFileIndex.size();
+            urlFileIndex.put(entry.toRow());
+            assert s < urlFileIndex.size() : "hash = " + hash;
+            assert urlFileIndex.has(hash.getBytes()) : "hash = " + hash;
+
+            // add the hash to a queue
+            pushHashToDomainStacks(entry.url().hash(), 50);
+        }
     }
     private void pushHashToDomainStacks(final String hash, int maxstacksize) {
@@ -252,6 +269,18 @@
         }
     }
+    private String nextFromDelayed() {
+        if (this.delayed.size() == 0) return null;
+        synchronized (this.delayed) {
+            if (this.delayed.size() == 0) return null;
+            Long first = this.delayed.firstKey();
+            if (first.longValue() < System.currentTimeMillis()) {
+                return this.delayed.remove(first);
+            }
+        }
+        return null;
+    }
+
    /**
     * get the next entry in this crawl queue in such a way that the domain access time delta is maximized
     * and always above the given minimum delay time. An additional delay time is computed using the robots.txt
@@ -263,7 +292,7 @@
     * @return a url in a CrawlEntry object
     * @throws IOException
     */
-    public synchronized CrawlEntry pop(boolean delay, CrawlProfile profile) throws IOException {
+    public CrawlEntry pop(boolean delay, CrawlProfile profile) throws IOException {
         // returns a crawl entry from the stack and ensures minimum delta times
         filltop(delay, -600000, false);
@@ -277,35 +306,53 @@
         filltop(delay, -500, false);
         filltop(delay, 0, true);
-        String result = null; // the result
-
-        // first simply take one of the entries in the top list, that should be one without any delay
-        if (this.top.size() > 0) {
-            result = top.remove();
-        }
-
-        // finally: check minimumDelta and if necessary force a sleep
-        final int s = urlFileIndex.size();
-        Row.Entry rowEntry = (result == null) ? null : urlFileIndex.remove(result.getBytes());
-        if (rowEntry == null) rowEntry = urlFileIndex.removeOne();
-        if (rowEntry == null) {
-            Log.logWarning("Balancer", "removeOne() failed - size = " + this.size());
-            return null;
-        }
-        assert urlFileIndex.size() + 1 == s : "urlFileIndex.size() = " + urlFileIndex.size() + ", s = " + s + ", result = " + result;
-
-        final CrawlEntry crawlEntry = new CrawlEntry(rowEntry);
-        //Log.logInfo("Balancer", "fetched next url: " + crawlEntry.url().toNormalform(true, false));
-
-        // at this point we must check if the crawlEntry has relevancy because the crawl profile still exists
-        // if not: return null. A calling method must handle the null value and try again
-        if (profile != null && !profile.hasEntry(crawlEntry.profileHandle())) {
-            profileErrors++;
-            if (profileErrors < 20) Log.logInfo("Balancer", "no profile entry for handle " + crawlEntry.profileHandle());
-            return null;
-        }
-        long sleeptime = Latency.waitingRemaining(crawlEntry.url(), minimumLocalDelta, minimumGlobalDelta); // this uses the robots.txt database and may cause a loading of robots.txt from the server
-
+        long sleeptime = 0;
+        CrawlEntry crawlEntry = null;
+        while (this.urlFileIndex.size() > 0) {
+            // first simply take one of the entries in the top list, that should be one without any delay
+            String result = nextFromDelayed();
+            if (result == null && this.top.size() > 0) result = top.remove();
+
+            // check minimumDelta and if necessary force a sleep
+            //final int s = urlFileIndex.size();
+            Row.Entry rowEntry = (result == null) ? null : urlFileIndex.remove(result.getBytes());
+            if (rowEntry == null) {
+                rowEntry = urlFileIndex.removeOne();
+                result = (rowEntry == null) ? null : new String(rowEntry.getPrimaryKeyBytes());
+            }
+            if (rowEntry == null) {
+                Log.logWarning("Balancer", "removeOne() failed - size = " + this.size());
+                return null;
+            }
+            //assert urlFileIndex.size() + 1 == s : "urlFileIndex.size() = " + urlFileIndex.size() + ", s = " + s + ", result = " + result;
+
+            crawlEntry = new CrawlEntry(rowEntry);
+            //Log.logInfo("Balancer", "fetched next url: " + crawlEntry.url().toNormalform(true, false));
+
+            // at this point we must check if the crawlEntry has relevancy because the crawl profile still exists
+            // if not: return null. A calling method must handle the null value and try again
+            if (profile != null && !profile.hasEntry(crawlEntry.profileHandle())) {
+                profileErrors++;
+                if (profileErrors < 20) Log.logInfo("Balancer", "no profile entry for handle " + crawlEntry.profileHandle());
+                return null;
+            }
+            sleeptime = Latency.waitingRemaining(crawlEntry.url(), minimumLocalDelta, minimumGlobalDelta); // this uses the robots.txt database and may cause a loading of robots.txt from the server
+
+            assert result.equals(new String(rowEntry.getPrimaryKeyBytes())) : "result = " + result + ", rowEntry.getPrimaryKeyBytes() = " + new String(rowEntry.getPrimaryKeyBytes());
+            assert result.equals(crawlEntry.url().hash()) : "result = " + result + ", crawlEntry.url().hash() = " + crawlEntry.url().hash();
+            if (this.domainStacks.size() <= 1) break;
+
+            if (delay && sleeptime > 0) {
+                // put that thing back to omit a delay here
+                this.delayed.put(new Long(System.currentTimeMillis() + sleeptime + 1), result);
+                this.urlFileIndex.put(rowEntry);
+                this.domainStacks.remove(result.substring(6));
+                continue;
+            }
+            break;
+        }
+        if (crawlEntry == null) return null;
+
         if (delay && sleeptime > 0) {
             // force a busy waiting here
             // in best case, this should never happen if the balancer works propertly
@@ -378,8 +425,9 @@
     }
     private void fillDomainStacks(int maxdomstacksize) throws IOException {
-        if (this.domainStacks.size() > 0 && System.currentTimeMillis() - lastDomainStackFill < 600000L) return;
+        if (this.domainStacks.size() > 0 && System.currentTimeMillis() - lastDomainStackFill < 200000L) return;
         this.domainStacks.clear();
+        //synchronized (this.delayed) { delayed.clear(); }
         this.lastDomainStackFill = System.currentTimeMillis();
         CloneableIterator i = this.urlFileIndex.keys(true, null);
         while (i.hasNext()) {
@@ -406,7 +454,7 @@
             return cel;
         }
-    public synchronized Iterator iterator() throws IOException {
+    public Iterator iterator() throws IOException {
        return new EntryIterator();
    }
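
The heart of this patch is the delayed table: a TreeMap that maps the earliest allowed fetch time (epoch milliseconds) to a URL hash, so pop() can park an entry that is still under its robots.txt or politeness delay and pick it up again once the delay has expired, instead of forcing a sleep. Below is a minimal, self-contained sketch of that mechanism; DelayTableSketch, its method names, and the sample hash are hypothetical and not part of the YaCy sources, and the duplicate-key guard in park() is a small safeguard added here that the patch itself does not include.

import java.util.TreeMap;

// Hypothetical illustration class, not part of the YaCy sources.
public class DelayTableSketch {

    // release time (epoch millis) -> URL hash, mirroring "TreeMap<Long, String> delayed"
    private final TreeMap<Long, String> delayed = new TreeMap<Long, String>();

    // Park a URL hash until now + sleeptime, as pop() does when a delay would be forced.
    public synchronized void park(final String urlhash, final long sleeptime) {
        long release = System.currentTimeMillis() + sleeptime + 1;
        // safeguard (not in the patch): avoid overwriting an entry that shares the same key
        while (delayed.containsKey(release)) release++;
        delayed.put(release, urlhash);
    }

    // Return a URL hash whose delay has expired, or null if none is due yet,
    // analogous to nextFromDelayed() in the patch.
    public synchronized String nextDue() {
        if (delayed.isEmpty()) return null;
        final Long first = delayed.firstKey();
        if (first.longValue() < System.currentTimeMillis()) return delayed.remove(first);
        return null;
    }

    // minimal usage example
    public static void main(final String[] args) throws InterruptedException {
        final DelayTableSketch table = new DelayTableSketch();
        table.park("AAAAAAAAAAAA", 50);       // pretend the crawl-delay for this host is 50 ms
        System.out.println(table.nextDue());  // null: the delay has not elapsed yet
        Thread.sleep(60);
        System.out.println(table.nextDue());  // prints the parked hash, now released
    }
}

Because TreeMap keeps its keys sorted, firstKey() always yields the entry that becomes due soonest, so the caller can poll the table cheaply on every pop() and only falls back to busy waiting when nothing undelayed is available.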