diff --git a/source/de/anomic/crawler/Balancer.java b/source/de/anomic/crawler/Balancer.java
index 9c1c0dcd0..18b07d9ab 100644
--- a/source/de/anomic/crawler/Balancer.java
+++ b/source/de/anomic/crawler/Balancer.java
@@ -29,71 +29,46 @@
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Map;
-import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 import de.anomic.kelondro.index.Row;
 import de.anomic.kelondro.index.ObjectIndex;
-import de.anomic.kelondro.order.Base64Order;
+import de.anomic.kelondro.order.CloneableIterator;
 import de.anomic.kelondro.table.EcoTable;
-import de.anomic.kelondro.table.Stack;
 import de.anomic.kelondro.util.Log;
-import de.anomic.yacy.yacySeedDB;
 
 public class Balancer {
     
-    private static final String stackSuffix = "9.stack";
     private static final String indexSuffix = "9.db";
     private static final int EcoFSBufferSize = 200;
 
-    // definition of payload for fileStack
-    private static final Row stackrow = new Row("byte[] urlhash-" + yacySeedDB.commonHashLength, Base64Order.enhancedCoder);
    
     // class variables
     private final ConcurrentHashMap<String, LinkedList<String>>
-                 domainStacks;     // a map from domain name part to Lists with url hashs
-    private final ArrayList<String> urlRAMStack;   // a list that is flushed first
-    private Stack urlFileStack;    // a file with url hashes
-    private ObjectIndex urlFileIndex;
-    private final File cacheStacksPath;
-    private final String stackname;
-    private boolean top;           // to alternate between top and bottom of the file stack
-    private long minimumLocalDelta;
-    private long minimumGlobalDelta;
-    private long lastPrepare;
+                 domainStacks;     // a map from domain name part to Lists with url hashes
+    private ConcurrentLinkedQueue<String>
+                 top;
+    private ObjectIndex urlFileIndex;
+    private final File cacheStacksPath;
+    private long minimumLocalDelta;
+    private long minimumGlobalDelta;
+    private int profileErrors;
     
     public Balancer(final File cachePath, final String stackname, final boolean fullram,
                     final long minimumLocalDelta, final long minimumGlobalDelta) {
         this.cacheStacksPath = cachePath;
-        this.stackname = stackname;
-        final File stackFile = new File(cachePath, stackname + stackSuffix);
-        this.urlFileStack = Stack.open(stackFile, stackrow);
         this.domainStacks = new ConcurrentHashMap<String, LinkedList<String>>();
-        this.urlRAMStack = new ArrayList<String>();
-        this.top = true;
+        this.top = new ConcurrentLinkedQueue<String>();
         this.minimumLocalDelta = minimumLocalDelta;
         this.minimumGlobalDelta = minimumGlobalDelta;
-        this.lastPrepare = System.currentTimeMillis();
         
         // create a stack for newly entered entries
         if (!(cachePath.exists())) cachePath.mkdir(); // make the path
         cacheStacksPath.mkdirs();
-        urlFileIndex = new EcoTable(new File(cacheStacksPath, stackname + indexSuffix), CrawlEntry.rowdef, (fullram) ? EcoTable.tailCacheUsageAuto : EcoTable.tailCacheDenyUsage, EcoFSBufferSize, 0);
-        if (urlFileStack.size() != urlFileIndex.size() || (urlFileIndex.size() < 10000 && urlFileIndex.size() > 0)) {
-            // fix the file stack
-            Log.logInfo("Balancer", "re-creating the " + stackname + " balancer stack, size = " + urlFileIndex.size() + ((urlFileStack.size() == urlFileIndex.size()) ? "" : " (the old stack size was wrong)" ));
-            urlFileStack.clear();
-            try {
-                final Iterator<byte[]> i = urlFileIndex.keys(true, null);
-                byte[] hash;
-                while (i != null && i.hasNext()) {
-                    hash = i.next();
-                    pushHashToDomainStacks(new String(hash), true);
-                }
-            } catch (final IOException e) {
-                e.printStackTrace();
-            }
-        }
+        File f = new File(cacheStacksPath, stackname + indexSuffix);
+        urlFileIndex = new EcoTable(f, CrawlEntry.rowdef, (fullram) ? EcoTable.tailCacheUsageAuto : EcoTable.tailCacheDenyUsage, EcoFSBufferSize, 0);
+        profileErrors = 0;
+        Log.logInfo("Balancer", "opened balancer file with " + urlFileIndex.size() + " entries from " + f.toString());
     }
 
     public long getMinimumLocalDelta() {
@@ -110,35 +85,21 @@ public class Balancer {
     }
     
     public synchronized void close() {
-        while (domainStacksNotEmpty()) flushOnceDomStacks(true, false, Integer.MAX_VALUE); // flush to ram, because the ram flush is optimized
-        size();
-        try { flushAllRamStack(); } catch (final IOException e) {}
         if (urlFileIndex != null) {
             urlFileIndex.close();
             urlFileIndex = null;
         }
-        if (urlFileStack != null) {
-            urlFileStack.close();
-            urlFileStack = null;
-        }
-    }
-    
-    protected void finalize() {
-        if (urlFileStack != null) {
-            Log.logWarning("Balancer", "crawl stack " + stackname + " closed by finalizer");
-            close();
-        }
     }
     
     public synchronized void clear() {
+        Log.logInfo("Balancer", "clearing balancer with " + urlFileIndex.size() + " entries from " + urlFileIndex.filename());
         try {
             urlFileIndex.clear();
         } catch (IOException e) {
             e.printStackTrace();
         }
-        urlFileStack.clear();
         domainStacks.clear();
-        urlRAMStack.clear();
+        top.clear();
     }
     
     public synchronized CrawlEntry get(final String urlhash) throws IOException {
@@ -189,38 +150,28 @@ public class Balancer {
         }
         if (removedCounter == 0) return 0;
         assert urlFileIndex.size() + removedCounter == s : "urlFileIndex.size() = " + urlFileIndex.size() + ", s = " + s;
-        
-        // now delete these hashes also from the queues
-        // iterate through the RAM stack
-        Iterator<String> i = urlRAMStack.iterator();
-        String h;
-        while (i.hasNext()) {
-            h = i.next();
-            if (urlHashes.contains(h)) i.remove();
-        }
-        
-        // iterate through the file stack
-        // in general this is a bad idea. But this can only be avoided by avoidance of this method
-        final Iterator<Row.Entry> j = urlFileStack.stackIterator(true);
-        while (j.hasNext()) {
-            h = new String(j.next().getColBytes(0));
-            if (urlHashes.contains(h)) j.remove();
-        }
-        
-        // iterate through the domain stacks
-        final Iterator<Map.Entry<String, LinkedList<String>>> k = domainStacks.entrySet().iterator();
-        Map.Entry<String, LinkedList<String>> se;
-        LinkedList<String> stack;
-        while (k.hasNext()) {
-            se = k.next();
-            stack = se.getValue();
-            i = stack.iterator();
-            while (i.hasNext()) {
-                if (urlHashes.contains(i.next())) i.remove();
-            }
-            if (stack.size() == 0) k.remove();
-        }
+        // iterate through the top list
+        Iterator<String> j = top.iterator();
+        String urlhash;
+        while (j.hasNext()) {
+            urlhash = j.next();
+            if (urlHashes.contains(urlhash)) j.remove();
+        }
+        
+        // iterate through the domain stacks
+        final Iterator<Map.Entry<String, LinkedList<String>>> k = domainStacks.entrySet().iterator();
+        Map.Entry<String, LinkedList<String>> se;
+        LinkedList<String> stack;
+        while (k.hasNext()) {
+            se = k.next();
+            stack = se.getValue();
+            Iterator<String> i = stack.iterator();
+            while (i.hasNext()) {
+                if (urlHashes.contains(i.next())) i.remove();
+            }
+            if (stack.size() == 0) k.remove();
+        }
         
         return removedCounter;
     }
@@ -232,21 +183,11 @@ public class Balancer {
     public synchronized boolean notEmpty() {
         // alternative method to the property size() > 0
         // this is better because it may avoid synchronized access to domain stack summarization
-        return urlRAMStack.size() > 0 || urlFileStack.size() > 0 || domainStacksNotEmpty();
+        return domainStacksNotEmpty();
     }
     
     public synchronized int size() {
-        final int componentsize = urlFileIndex.size();
-        /*
-        assert componentsize == urlFileStack.size() + urlRAMStack.size() + sizeDomainStacks() :
-               "size wrong in " + stackname +
-               " - urlFileIndex = " + urlFileIndex.size() +
-               ", componentsize = " + urlFileStack.size() + urlRAMStack.size() + sizeDomainStacks() +
-               " = (urlFileStack = " + urlFileStack.size() +
-               ", urlRAMStack = " + urlRAMStack.size() +
-               ", sizeDomainStacks = " + sizeDomainStacks() + ")";
-        */
-        return componentsize;
+        return urlFileIndex.size();
     }
     
     private boolean domainStacksNotEmpty() {
@@ -260,102 +201,10 @@ public class Balancer {
         return false;
     }
     
-    private int sizeDomainStacks() {
-        if (domainStacks == null) return 0;
-        int sum = 0;
-        //synchronized (domainStacks) {
-            final Iterator<LinkedList<String>> i = domainStacks.values().iterator();
-            while (i.hasNext()) sum += i.next().size();
-        //}
-        return sum;
-    }
-    
-    /**
-     * removes the head element of all domain stacks and moves the element in either the ram stack or the file stack
-     * @param minimumleft
-     * @param ram
-     * @param onlyReadyForAccess
-     */
-    private void flushOnceDomStacks(final boolean ram, final boolean onlyReadyForAccess, int max) {
-        // takes one entry from every domain stack and puts it on the ram or file stack
-        // the minimumleft value is a limit for the number of entries that should be left
-        if (domainStacks.size() == 0) return;
-        synchronized (domainStacks) {
-            final Iterator<Map.Entry<String, LinkedList<String>>> i = domainStacks.entrySet().iterator();
-            Map.Entry<String, LinkedList<String>> entry;
-            LinkedList<String> list;
-            int c = 0;
-            while (i.hasNext() && c < max) {
-                entry = i.next();
-                list = entry.getValue();
-                if (onlyReadyForAccess && Latency.waitingRemainingGuessed(list.getFirst(), minimumLocalDelta, minimumGlobalDelta) > 0) continue;
-                if (ram) {
-                    urlRAMStack.add(list.removeFirst());
-                } else try {
-                    urlFileStack.push(urlFileStack.row().newEntry(new byte[][] { (list.removeFirst()).getBytes() }));
-                } catch (final IOException e) {
-                    e.printStackTrace();
-                }
-                if (list.size() == 0) i.remove();
-                c++;
-            }
-        }
-    }
-    
-    private void flushAllRamStack() throws IOException {
-        // this flushes only the ramStack to the fileStack, but does not flush the domainStacks
-        for (int i = 0; i < urlRAMStack.size() / 2; i++) {
-            urlFileStack.push(urlFileStack.row().newEntry(new byte[][]{(urlRAMStack.get(i)).getBytes()}));
-            urlFileStack.push(urlFileStack.row().newEntry(new byte[][]{(urlRAMStack.get(urlRAMStack.size() - i - 1)).getBytes()}));
-        }
-        if (urlRAMStack.size() % 2 != 0)
-            urlFileStack.push(urlFileStack.row().newEntry(new byte[][]{(urlRAMStack.get(urlRAMStack.size() / 2)).getBytes()}));
-    }
-    
-    private void shiftFileToDomStacks(final int wantedsize) {
-        int count = sizeDomainStacks() - wantedsize;
-        while ((urlFileStack != null) && (count > 0) && (urlFileStack.size() > 0)) {
-            // flush some entries from disc to ram stack
-            try {
-                // one from the top:
-                Row.Entry t = urlFileStack.pop();
-                if (t == null) break;
-                pushHashToDomainStacks(new String(t.getColBytes(0)), false);
-                count--;
-                if (urlFileStack.size() == 0) break;
-                // one from the bottom:
-                t = urlFileStack.pot();
-                if (t == null) break;
-                pushHashToDomainStacks(new String(t.getColBytes(0)), false);
-                count--;
-            } catch (final IOException e) {
-                break;
-            }
-        }
-    }
-    
-    private void shiftFileToRAM(final int wantedsize) {
-        while ((urlFileStack != null) && (urlRAMStack.size() <= wantedsize) && (urlFileStack.size() > 0)) {
-            // flush some entries from disc to ram stack
-            try {
-                // one from the top:
-                Row.Entry t = urlFileStack.pop();
-                if (t == null) break;
-                urlRAMStack.add(new String(t.getColBytes(0)));
-                if (urlFileStack.size() == 0) break;
-                // one from the bottom:
-                t = urlFileStack.pot();
-                if (t == null) break;
-                urlRAMStack.add(new String(t.getColBytes(0)));
-            } catch (final IOException e) {
-                break;
-            }
-        }
-    }
-    
     public synchronized void push(final CrawlEntry entry) throws IOException {
         assert entry != null;
-        if (urlFileIndex.has(entry.url().hash().getBytes())) {
+        String hash = entry.url().hash();
+        if (urlFileIndex.has(hash.getBytes())) {
             //Log.logWarning("BALANCER", "double-check has failed for urlhash " + entry.url().hash() + " in " + stackname + " - fixed");
             return;
         }
@@ -363,13 +212,14 @@ public class Balancer {
         // add to index
         int s = urlFileIndex.size();
         urlFileIndex.put(entry.toRow());
-        assert s < urlFileIndex.size();
+        assert s < urlFileIndex.size() : "hash = " + hash;
+        assert urlFileIndex.has(hash.getBytes()) : "hash = " + hash;
         
         // add the hash to a queue
-        pushHashToDomainStacks(entry.url().hash(), true);
+        pushHashToDomainStacks(entry.url().hash(), 10);
     }
     
-    private void pushHashToDomainStacks(final String hash, boolean flush) {
+    private void pushHashToDomainStacks(final String hash, int maxstacksize) {
         // extend domain stack
         final String dom = hash.substring(6);
         LinkedList<String> domainList = domainStacks.get(dom);
         if (domainList == null) {
             // create new list
             domainList = new LinkedList<String>();
             synchronized (domainStacks) {
                 domainList.add(hash);
                 domainStacks.put(dom, domainList);
             }
         } else {
             // extend existent domain list
-            domainList.addLast(hash);
+            if (domainList.size() < maxstacksize) domainList.addLast(hash);
         }
-        
-        // check size of domainStacks and flush
-        if (flush && (domainStacks.size() > 100) || (sizeDomainStacks() > 1000)) {
-            flushOnceDomStacks(urlRAMStack.size() < 100, true, 100); // when the ram stack is small, flush it there
+    }
+    
+    private void removeHashFromDomainStacks(final String hash) {
+        // reduce domain stack
+        final String dom = hash.substring(6);
+        LinkedList<String> domainList = domainStacks.get(dom);
+        if (domainList == null) return;
+        Iterator<String> i = domainList.iterator();
+        while (i.hasNext()) {
+            if (i.next().equals(hash)) {
+                i.remove();
+                return;
+            }
         }
     }
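The cap introduced here (maxstacksize, 10 as called from push()) keys every entry by hash.substring(6), the host part of the 12-character URL hash. A sketch of the capped insert under that reading (DomainStackSketch and pushCapped are hypothetical names, not patch code):

```java
import java.util.LinkedList;
import java.util.concurrent.ConcurrentHashMap;

class DomainStackSketch {
    final ConcurrentHashMap<String, LinkedList<String>> domainStacks =
            new ConcurrentHashMap<String, LinkedList<String>>();

    // hash is a 12-character YaCy URL hash; its last 6 characters name the host
    void pushCapped(String hash, int maxstacksize) {
        final String dom = hash.substring(6);
        LinkedList<String> list = domainStacks.get(dom);
        if (list == null) {
            list = new LinkedList<String>();
            list.addLast(hash);
            domainStacks.put(dom, list);
        } else if (list.size() < maxstacksize) {
            list.addLast(hash); // when full, the hash stays behind in the file index
        }
    }
}
```

Capping is safe because urlFileIndex still holds every entry; fillDomainStacks (further down) re-reads the index once the stacks run empty, so a hash that does not fit is deferred, not lost.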
@@ -404,178 +263,42 @@ public class Balancer {
      */
     public synchronized CrawlEntry pop(boolean delay, CrawlProfile profile) throws IOException {
         // returns a crawl entry from the stack and ensures minimum delta times
-        // we have 3 sources to choose from: the ramStack, the domainStacks and the fileStack
+        filltop(delay, 600000, false);
+        filltop(delay,  60000, false);
+        filltop(delay,  10000, false);
+        filltop(delay,   6000, false);
+        filltop(delay,   3000, false);
+        filltop(delay,   1000, false);
+        filltop(delay,      0, true);
+        
         String result = null; // the result
-        // 1st: check ramStack
-        if (urlRAMStack.size() > 0) {
-            //result = urlRAMStack.remove(0);
-            Iterator<String> i = urlRAMStack.iterator();
-            String urlhash;
-            long waitingtime, min = Long.MAX_VALUE;
-            String besthash = null;
-            while (i.hasNext()) {
-                urlhash = i.next();
-                waitingtime = Latency.waitingRemainingGuessed(urlhash, minimumLocalDelta, minimumGlobalDelta);
-                if (waitingtime == 0) {
-                    // zero waiting is a good one
-                    result = urlhash;
-                    i.remove();
-                    min = Long.MAX_VALUE; // that causes that the if at the end of this loop is not used
-                    besthash = null;
-                    break;
-                }
-                if (waitingtime < min) {
-                    min = waitingtime;
-                    besthash = urlhash;
-                }
-            }
-            if (min <= 500 && besthash != null) {
-                // find that entry that was best end remove it
-                i = urlRAMStack.iterator();
-                while (i.hasNext()) {
-                    urlhash = i.next();
-                    if (urlhash.equals(besthash)) {
-                        // zero waiting is a good one
-                        result = urlhash;
-                        i.remove();
-                        break;
-                    }
-                }
-            }
-        }
-        
-        // the next options use the domain stack. If this is not filled enough, they dont work at all
-        // so just fill them up with some stuff
-        if (result == null) shiftFileToDomStacks(1000);
-        
-        // 2nd-b: check domainStacks for best match between stack size and retrieval time
-        String maxhash = null;
-        if ((result == null) && (domainStacks.size() > 0)) synchronized (domainStacks) {
-            // we order all domains by the number of entries per domain
-            // then we iterate through these domains in descending entry order
-            // and take that one, that has a zero waiting time
-            final Iterator<Map.Entry<String, LinkedList<String>>> i = domainStacks.entrySet().iterator();
-            Map.Entry<String, LinkedList<String>> entry;
-            String domhash;
-            LinkedList<String> domlist;
-            final TreeMap<Integer, String> hitlist = new TreeMap<Integer, String>();
-            int count = 0;
-            // first collect information about sizes of the domain lists
-            while (i.hasNext()) {
-                entry = i.next();
-                domhash = entry.getKey();
-                domlist = entry.getValue();
-                hitlist.put(Integer.valueOf(domlist.size() * 100 + count++), domhash);
-            }
-            
-            // now iterate in descending order and fetch that one,
-            // that is acceptable by the minimumDelta constraint
-            long waitingtime;
-            while (hitlist.size() > 0) {
-                domhash = hitlist.remove(hitlist.lastKey());
-                if (maxhash == null) maxhash = domhash; // remember first entry
-                waitingtime = Latency.waitingRemainingGuessed(domhash, minimumLocalDelta, minimumGlobalDelta);
-                if (waitingtime < 100) {
-                    domlist = domainStacks.get(domhash);
-                    result = domlist.removeFirst();
-                    if (domlist.size() == 0) domainStacks.remove(domhash);
-                    break;
-                }
-            }
-            
-        }
-        
-        // 2nd-a: check domainStacks for latest arrivals
-        if ((result == null) && (domainStacks.size() > 0)) synchronized (domainStacks) {
-            // we select specific domains that have not been used for a long time
-            // Latest arrivals that have not yet been crawled fit also in that scheme
-            final Iterator<Map.Entry<String, LinkedList<String>>> i = domainStacks.entrySet().iterator();
-            Map.Entry<String, LinkedList<String>> entry;
-            String domhash;
-            long waitingtime, min = Long.MAX_VALUE;
-            String besthash = null;
-            LinkedList<String> domlist;
-            while (i.hasNext()) {
-                entry = i.next();
-                domhash = entry.getKey();
-                waitingtime = Latency.waitingRemainingGuessed(domhash, minimumLocalDelta, minimumGlobalDelta);
-                if (waitingtime == 0) {
-                    // zero waiting is a good one
-                    domlist = entry.getValue();
-                    result = domlist.removeFirst();
-                    if (domlist.size() == 0) i.remove();
-                    min = Long.MAX_VALUE; // that causes that the if at the end of this loop is not used
-                    besthash = null;
-                    break;
-                }
-                if (waitingtime < min) {
-                    min = waitingtime;
-                    besthash = domhash;
-                }
-            }
-            if (min <= 500 && besthash != null) {
-                domlist = domainStacks.get(besthash);
-                result = domlist.removeFirst();
-                if (domlist.size() == 0) domainStacks.remove(besthash);
-            }
-        }
-        
-        // 2nd-c: if we did yet not choose any entry, we simply take that one with the most entries
-        if ((result == null) && (maxhash != null)) {
-            LinkedList<String> domlist = domainStacks.get(maxhash);
-            if (domlist != null) {
-                result = domlist.removeFirst();
-                if (domlist.size() == 0) domainStacks.remove(maxhash);
-            }
-        }
-        
-        // 3rd: take entry from file
-        if ((result == null) && (urlFileStack.size() > 0)) {
-            Row.Entry nextentry = (top) ? urlFileStack.top() : urlFileStack.bot();
-            if (nextentry == null) nextentry = (top) ? urlFileStack.bot() : urlFileStack.top();
-            if (nextentry == null) {
-                // emergency case: this means that something with the stack organization is wrong
-                // the file appears to be broken. We kill the file.
-                urlFileStack.clear();
-                Log.logSevere("BALANCER", "get() failed to fetch entry from file stack. reset stack file.");
-            } else {
-                final String nexthash = new String(nextentry.getColBytes(0));
-                
-                // check if the time after retrieval of last hash from same
-                // domain is not shorter than the minimumDelta
-                long waitingtime = Latency.waitingRemainingGuessed(nexthash, minimumLocalDelta, minimumGlobalDelta);
-                if (waitingtime == 0) {
-                    // the entry is fine
-                    result = new String((top) ? urlFileStack.pop().getColBytes(0) : urlFileStack.pot().getColBytes(0));
-                } else {
-                    // try other entry
-                    result = new String((top) ? urlFileStack.pot().getColBytes(0) : urlFileStack.pop().getColBytes(0));
-                }
-            }
-            top = !top; // alternate top/bottom
-        }
-        
-        // check case where we did not found anything
-        if (result == null) {
-            Log.logSevere("BALANCER", "get() was not able to find a valid urlhash - total size = " + size() + ", fileStack.size() = " + urlFileStack.size() + ", ramStack.size() = " + urlRAMStack.size() + ", domainStacks.size() = " + domainStacks.size());
-            return null;
+        // first simply take one of the entries in the top list, that should be one without any delay
+        if (this.top.size() > 0) {
+            result = top.remove();
         }
         
         // finally: check minimumDelta and if necessary force a sleep
         final int s = urlFileIndex.size();
-        Row.Entry rowEntry = urlFileIndex.remove(result.getBytes());
+        Row.Entry rowEntry = (result == null) ? null : urlFileIndex.remove(result.getBytes());
+        if (rowEntry == null) rowEntry = urlFileIndex.removeOne();
         if (rowEntry == null) {
-            String error = "get() found a valid urlhash, but failed to fetch the corresponding url entry - total size = " + size() + ", fileStack.size() = " + urlFileStack.size() + ", ramStack.size() = " + urlRAMStack.size() + ", domainStacks.size() = " + domainStacks.size();
-            //this.clear();
-            throw new IOException(error + " - cleared the balancer");
+            Log.logWarning("Balancer", "removeOne() failed - size = " + this.size());
+            return null;
         }
         assert urlFileIndex.size() + 1 == s : "urlFileIndex.size() = " + urlFileIndex.size() + ", s = " + s + ", result = " + result;
+        
+        final CrawlEntry crawlEntry = new CrawlEntry(rowEntry);
+        //Log.logInfo("Balancer", "fetched next url: " + crawlEntry.url().toNormalform(true, false));
+        
         // at this point we must check if the crawlEntry has relevancy because the crawl profile still exists
         // if not: return null. A calling method must handle the null value and try again
-        if (profile != null && !profile.hasEntry(crawlEntry.profileHandle())) return null;
+        if (profile != null && !profile.hasEntry(crawlEntry.profileHandle())) {
+            profileErrors++;
+            if (profileErrors < 20) Log.logInfo("Balancer", "no profile entry for handle " + crawlEntry.profileHandle());
+            return null;
+        }
         
         long sleeptime = Latency.waitingRemaining(crawlEntry.url(), minimumLocalDelta, minimumGlobalDelta); // this uses the robots.txt database and may cause a loading of robots.txt from the server
         if (delay && sleeptime > 0) {
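The rewritten pop() replaces the old three-source selection with a cascade of filltop passes. Each pass admits every domain-stack head whose guessed remaining wait is at most the given bound and returns immediately once the top queue is non-empty, so later passes become no-ops; only the last pass may additionally accept the best head seen so far. The cascade could equally be written as a loop; this is a reformulation of the calls above with the same constants, not code from the patch:

```java
// equivalent to the seven filltop(...) calls at the start of pop()
final long[] bounds = {600000L, 60000L, 10000L, 6000L, 3000L, 1000L, 0L};
for (int k = 0; k < bounds.length; k++) {
    final boolean lastPass = (k == bounds.length - 1);
    filltop(delay, bounds[k], lastPass); // only the last pass may take the "best seen" head
}
```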
" (forced latency)" : "")); - if (System.currentTimeMillis() - this.lastPrepare > 10000) { - long t = System.currentTimeMillis(); - prepare(400); - this.lastPrepare = System.currentTimeMillis(); - sleeptime -= this.lastPrepare - t; - } if (sleeptime > 0) try {synchronized(this) { this.wait(sleeptime); }} catch (final InterruptedException e) {} } return crawlEntry; } - - /** - * return top-elements from the crawl stack - * we do not produce here more entries than exist on the stack - * because otherwise the balancing does not work properly - * @param count - * @return - * @throws IOException - */ - public synchronized ArrayList top(int count) throws IOException { - // construct a list using the urlRAMStack which was filled with this procedure - count = prepare(count); - final ArrayList list = new ArrayList(); - for (int i = 0; i < count; i++) { - final String urlhash = urlRAMStack.get(i); - final Row.Entry entry = urlFileIndex.get(urlhash.getBytes()); - if (entry == null) break; - list.add(new CrawlEntry(entry)); - } - return list; + + private void filltop(boolean delay, long maximumwaiting, boolean acceptonebest) { + if (this.top.size() > 0) return; + + // check if we need to get entries from the file index + try { + fillDomainStacks(400); + } catch (IOException e) { + e.printStackTrace(); + } + + // iterate over the domain stacks + Iterator>> i = this.domainStacks.entrySet().iterator(); + Map.Entry> entry; + long smallestWaiting = Long.MAX_VALUE; + String besthash = null; + while (i.hasNext()) { + entry = i.next(); + if (entry.getValue().size() == 0) { + i.remove(); + continue; + } + + String n = entry.getValue().getFirst(); + if (delay) { + long w = Latency.waitingRemainingGuessed(n, minimumLocalDelta, minimumGlobalDelta); + if (w > maximumwaiting) { + if (w < smallestWaiting) { + smallestWaiting = w; + besthash = n; + } + continue; + } + } + n = entry.getValue().removeFirst(); + this.top.add(n); + if (entry.getValue().size() == 0) i.remove(); + } + + // if we could not find any entry, then take the best we have seen so far + if (acceptonebest && this.top.size() > 0 && besthash != null) { + removeHashFromDomainStacks(besthash); + this.top.add(besthash); + } } - private int prepare(int count) throws IOException { - // if we need to flush anything, then flush the domain stack first, - // to avoid that new urls get hidden by old entries from the file stack - if (urlRAMStack == null) return 0; - - // ensure that the domain stacks are filled enough - shiftFileToDomStacks(count); - - // flush from the domain stacks first until they are empty - if ((domainStacksNotEmpty()) && (urlRAMStack.size() <= count)) { - flushOnceDomStacks(true, true, 100); - } - while ((domainStacksNotEmpty()) && (urlRAMStack.size() <= count)) { - // flush only that much as we need to display - flushOnceDomStacks(true, false, 100); - } - - // if the ram is still not full enough, use the file stack - shiftFileToRAM(count); - - return Math.min(count, urlRAMStack.size()); + private void fillDomainStacks(int maxdomstacksize) throws IOException { + if (this.domainStacks.size() > 0) return; + CloneableIterator i = this.urlFileIndex.keys(true, null); + while (i.hasNext()) { + pushHashToDomainStacks(new String(i.next()), 10); + if (this.domainStacks.size() > maxdomstacksize) break; + } + } + + public ArrayList top(int count) { + count = Math.min(count, top.size()); + ArrayList cel = new ArrayList(); + if (count == 0) return cel; + for (String n: top) { + try { + Row.Entry rowEntry = urlFileIndex.get(n.getBytes()); + if (rowEntry 
== null) continue; + final CrawlEntry crawlEntry = new CrawlEntry(rowEntry); + cel.add(crawlEntry); + count--; + if (count <= 0) break; + } catch (IOException e) { + } + } + return cel; } public synchronized Iterator iterator() throws IOException { diff --git a/source/de/anomic/crawler/CrawlQueues.java b/source/de/anomic/crawler/CrawlQueues.java index 6a3887c9e..412575e45 100644 --- a/source/de/anomic/crawler/CrawlQueues.java +++ b/source/de/anomic/crawler/CrawlQueues.java @@ -153,7 +153,13 @@ public class CrawlQueues { for (final crawlWorker w: workers.values()) { w.interrupt(); } - // TODO: wait some more time until all threads are finished + for (final crawlWorker w: workers.values()) { + try { + w.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } noticeURL.close(); errorURL.close(); delegatedURL.close(); @@ -470,9 +476,9 @@ public class CrawlQueues { // do nothing if either there are private processes to be done // or there is no global crawl on the stack - if(!crawlIsPossible(NoticedURL.STACK_TYPE_REMOTE, "Global")) return false; + if (!crawlIsPossible(NoticedURL.STACK_TYPE_REMOTE, "Global")) return false; - if(isPaused(plasmaSwitchboardConstants.CRAWLJOB_REMOTE_TRIGGERED_CRAWL)) return false; + if (isPaused(plasmaSwitchboardConstants.CRAWLJOB_REMOTE_TRIGGERED_CRAWL)) return false; // we don't want to crawl a global URL globally, since WE are the global part. (from this point of view) final String stats = "REMOTETRIGGEREDCRAWL[" + noticeURL.stackSize(NoticedURL.STACK_TYPE_CORE) + ", " + noticeURL.stackSize(NoticedURL.STACK_TYPE_LIMIT) + ", " + noticeURL.stackSize(NoticedURL.STACK_TYPE_OVERHANG) + ", " diff --git a/source/de/anomic/crawler/CrawlSwitchboard.java b/source/de/anomic/crawler/CrawlSwitchboard.java index 65e45cc64..dfd938059 100644 --- a/source/de/anomic/crawler/CrawlSwitchboard.java +++ b/source/de/anomic/crawler/CrawlSwitchboard.java @@ -30,6 +30,7 @@ import java.io.File; import java.io.IOException; import java.util.Iterator; +import de.anomic.crawler.CrawlProfile.entry; import de.anomic.kelondro.util.FileUtils; import de.anomic.kelondro.util.kelondroException; import de.anomic.kelondro.util.Log; @@ -93,6 +94,7 @@ public final class CrawlSwitchboard { try { this.profilesActiveCrawls = new CrawlProfile(profilesActiveFile); } catch (IOException e) { + e.printStackTrace(); FileUtils.deletedelete(profilesActiveFile); try { this.profilesActiveCrawls = new CrawlProfile(profilesActiveFile); @@ -103,8 +105,13 @@ public final class CrawlSwitchboard { } initActiveCrawlProfiles(); log.logInfo("Loaded active crawl profiles from file " + profilesActiveFile.getName() + - ", " + this.profilesActiveCrawls.size() + " entries" + - ", " + profilesActiveFile.length()/1024); + ", " + this.profilesActiveCrawls.size() + " entries"); + Iterator i = this.profilesActiveCrawls.profiles(true); + entry c; + while (i.hasNext()) { + c = i.next(); + log.logInfo("active crawl: " + c.handle() + " - " + c.name()); + } final File profilesPassiveFile = new File(queuesRoot, DBFILE_PASSIVE_CRAWL_PROFILES); if (!profilesPassiveFile.exists()) { // migrate old file @@ -261,8 +268,9 @@ public final class CrawlSwitchboard { public void close() { - profilesActiveCrawls.close(); - queuePreStack.close(); + this.profilesActiveCrawls.close(); + this.profilesPassiveCrawls.close(); + this.queuePreStack.close(); } } diff --git a/source/de/anomic/crawler/IndexingStack.java b/source/de/anomic/crawler/IndexingStack.java index 6a491e8c9..1a5b68990 100644 --- 
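The CrawlQueues change above replaces the shutdown TODO with a two-phase stop: interrupt every worker first, then join each one, so the workers wind down concurrently and the total wait is roughly the slowest worker rather than the sum of all of them. The same pattern with plain threads (closeWorkers is a hypothetical name; crawlWorker is assumed to be a Thread, as the interrupt()/join() calls suggest):

```java
import java.util.Map;

class ShutdownSketch {
    static void closeWorkers(Map<?, ? extends Thread> workers) {
        for (Thread w : workers.values()) {
            w.interrupt();                          // phase 1: ask every worker to stop
        }
        for (Thread w : workers.values()) {
            try {
                w.join();                           // phase 2: wait for each to finish
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
            }
        }
    }
}
```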
diff --git a/source/de/anomic/crawler/IndexingStack.java b/source/de/anomic/crawler/IndexingStack.java
index 6a491e8c9..1a5b68990 100644
--- a/source/de/anomic/crawler/IndexingStack.java
+++ b/source/de/anomic/crawler/IndexingStack.java
@@ -82,6 +82,10 @@ public class IndexingStack {
         return (sbQueueStack == null) ? 0 : sbQueueStack.size();
     }
     
+    public String file() {
+        return this.sbQueueStack.filename;
+    }
+    
     public synchronized void push(final QueueEntry entry) throws IOException {
         if (entry == null) return;
         if (sbQueueStack == null) return; // may occur during shutdown
@@ -104,21 +108,21 @@ public class IndexingStack {
         while ((sizeBefore = sbQueueStack.size()) > 0) {
             Row.Entry b = sbQueueStack.pot();
             if (b == null) {
-                Log.logInfo("IndexingStack", "sbQueueStack.pot() == null");
+                Log.logInfo("IndexingStack", "sbQueueStack.pot() == null, file = " + sbQueueStack.filename);
                 if (sbQueueStack.size() < sizeBefore) continue;
-                Log.logSevere("IndexingStack", "sbQueueStack does not shrink after pot() == null; trying pop()");
+                Log.logSevere("IndexingStack", "sbQueueStack " + sbQueueStack.filename + " does not shrink after pot() == null; trying pop()");
             }
             if (sbQueueStack.size() < sizeBefore) {
                 return new QueueEntry(b);
             } else {
-                Log.logSevere("IndexingStack", "sbQueueStack does not shrink after pot() != null; trying pop()");
+                Log.logSevere("IndexingStack", "sbQueueStack " + sbQueueStack.filename + " does not shrink after pot() != null; trying pop()");
             }
             sizeBefore = sbQueueStack.size();
             b = sbQueueStack.pop();
             if (b == null) {
-                Log.logInfo("IndexingStack", "sbQueueStack.pop() == null");
+                Log.logInfo("IndexingStack", "sbQueueStack.pop() == null, file = " + sbQueueStack.filename);
                 if (sbQueueStack.size() < sizeBefore) continue;
-                Log.logSevere("IndexingStack", "sbQueueStack does not shrink after pop() == null; failed");
+                Log.logSevere("IndexingStack", "sbQueueStack does not shrink after pop() == null; failed; file = " + sbQueueStack.filename);
                 return null;
             }
             return new QueueEntry(b);
diff --git a/source/de/anomic/crawler/Latency.java b/source/de/anomic/crawler/Latency.java
index b24314cf0..3984df4cd 100644
--- a/source/de/anomic/crawler/Latency.java
+++ b/source/de/anomic/crawler/Latency.java
@@ -96,24 +96,34 @@ public class Latency {
      * @param minimumGlobalDelta
      * @return the remaining waiting time in milliseconds
      */
-    public static long waitingRemainingGuessed(String urlhash, final long minimumLocalDelta, final long minimumGlobalDelta) {
-        assert urlhash.length() == 12 || urlhash.length() == 6;
-        Latency.Host latency = Latency.host((urlhash.length() == 6) ? urlhash : urlhash.substring(6));
-        if (latency == null) return 0;
+    public static long waitingRemainingGuessed(String hosthash, final long minimumLocalDelta, final long minimumGlobalDelta) {
+        assert hosthash.length() == 12 || hosthash.length() == 6;
+        Host host = Latency.host((hosthash.length() == 6) ? hosthash : hosthash.substring(6));
+        if (host == null) return 0;
+        
+        // the time since last access to the domain is the basis of the remaining calculation
+        final long timeSinceLastAccess = System.currentTimeMillis() - host.lastacc();
+        
+        // find the minimum waiting time based on the network domain (local or global)
+        final boolean local = yacyURL.isLocal(hosthash);
+        long waiting = (local) ? minimumLocalDelta : minimumGlobalDelta;
+        
+        // if we have accessed the domain many times, get slower (the flux factor)
+        if (!local) waiting += host.flux(waiting);
+        
+        // use the access latency as a rule for how fast we can access the server
+        // this applies also to localhost, but differently, because it is not necessary to
+        // consider so many external accesses
+        waiting = Math.max(waiting, (local) ? host.average() / 2 : host.average() * 2);
         
-        final long delta = System.currentTimeMillis() - latency.lastacc();
-        final boolean local = yacyURL.isLocal(urlhash);
-        long deltaBase = (local) ? minimumLocalDelta : minimumGlobalDelta;
-        final long genericDelta = Math.min(
-                60000,
-                Math.max(
-                    deltaBase + ((latency == null || local) ? 0 : latency.flux(deltaBase)),
-                    (local || latency == null) ? 0 : latency.robotsDelay())
-                ); // prevent that that robots file can stop our indexer completely
-        return (delta < genericDelta) ? genericDelta - delta : 0;
+        // prevent that a robots file can stop our indexer completely
+        waiting = Math.min(60000, waiting);
+        
+        // return the time that is remaining
+        //System.out.println("Latency: " + (waiting - timeSinceLastAccess));
+        return Math.max(0, waiting - timeSinceLastAccess);
     }
-    
     /**
      * calculates how long should be waited until the domain can be accessed again
      * this follows from:
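The rewritten waitingRemainingGuessed builds the delay from the configured minimum delta, a flux penalty for frequently accessed remote hosts, and the measured host latency, caps the result at 60 seconds, and subtracts the time already elapsed since the last access. A worked example with made-up numbers (the flux value is an assumption; LatencyExample is not YaCy code):

```java
public class LatencyExample {
    public static void main(String[] args) {
        final long minimumGlobalDelta = 500;  // configured global delta (ms)
        final long average = 800;             // measured host access latency (ms)
        final long flux = 200;                // assumed host.flux(waiting) penalty (ms)
        final long timeSinceLastAccess = 900; // ms since the host was last accessed

        long waiting = minimumGlobalDelta;        // remote host -> global delta:  500
        waiting += flux;                          // frequent-access penalty:      700
        waiting = Math.max(waiting, average * 2); // slow server -> back off:     1600
        waiting = Math.min(60000, waiting);       // never stall longer than 60 s
        // remaining wait = 1600 - 900 = 700 ms
        System.out.println(Math.max(0, waiting - timeSinceLastAccess));
    }
}
```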
file.size() + ", index.size() = " + index.size(); return lr; } diff --git a/source/de/anomic/kelondro/table/Stack.java b/source/de/anomic/kelondro/table/Stack.java index 02542f7d9..aa0b4a112 100644 --- a/source/de/anomic/kelondro/table/Stack.java +++ b/source/de/anomic/kelondro/table/Stack.java @@ -188,7 +188,7 @@ public final class Stack extends FullRecords { // return row on the bottom of the stack and remove record final Node n = botNode(); if (n == null) { - Log.logInfo("Stack", "botNode() == null"); + Log.logInfo("Stack", "botNode() == null in " + this.filename); return null; } final Row.Entry ret = row().newEntry(n.getValueRow()); @@ -251,7 +251,7 @@ public final class Stack extends FullRecords { } final RecordHandle h = getHandle(root); if (h == null) { - Log.logInfo("Stack", "getHandle(root) == null"); + Log.logInfo("Stack", "getHandle(root) == null in " + this.filename); return null; } return new EcoNode(h); diff --git a/source/de/anomic/net/ftpc.java b/source/de/anomic/net/ftpc.java index bd14c5b33..dbc56c6ec 100644 --- a/source/de/anomic/net/ftpc.java +++ b/source/de/anomic/net/ftpc.java @@ -90,7 +90,7 @@ public class ftpc { private Socket ControlSocket = null; // socket timeout - private static final int ControlSocketTimeout = 300000; + private static final int ControlSocketTimeout = 10000; // data socket timeout private int DataSocketTimeout = 0; // in seconds (default infinite) @@ -2235,7 +2235,7 @@ public class ftpc { } } else { if (DataSocketActive != null) { - DataSocketActive.setSoTimeout(DataSocketTimeout); + DataSocketActive.setSoTimeout(DataSocketTimeout * 1000); } } } @@ -2530,7 +2530,7 @@ public class ftpc { * @return timeout in seconds */ public void setDataTimeoutByMaxFilesize(final int maxFilesize) { - int timeout = 0; + int timeout = 1; if (DataSocketRate > 0) { // calculate by minDataRate and MaxFTPFileSize timeout = maxFilesize / DataSocketRate; diff --git a/source/de/anomic/plasma/plasmaSwitchboard.java b/source/de/anomic/plasma/plasmaSwitchboard.java index e8cd367a2..ba945fa5c 100644 --- a/source/de/anomic/plasma/plasmaSwitchboard.java +++ b/source/de/anomic/plasma/plasmaSwitchboard.java @@ -1193,6 +1193,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch