From d2f4926951a9c5392de854b4b4f772790003a80d Mon Sep 17 00:00:00 2001 From: orbiter Date: Mon, 31 Mar 2008 18:45:27 +0000 Subject: [PATCH] - more logging for balancer to get a hint where the problem is - fix for new concurrency method in kelondroSplitTable git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@4631 6c8d7289-2bf4-0310-a012-ef5d649a1542 --- .../anomic/kelondro/kelondroSplitTable.java | 50 +++++++++---------- .../de/anomic/plasma/plasmaCrawlBalancer.java | 9 +++- 2 files changed, 30 insertions(+), 29 deletions(-) diff --git a/source/de/anomic/kelondro/kelondroSplitTable.java b/source/de/anomic/kelondro/kelondroSplitTable.java index 3aa5e4576..d6d9966f6 100644 --- a/source/de/anomic/kelondro/kelondroSplitTable.java +++ b/source/de/anomic/kelondro/kelondroSplitTable.java @@ -36,13 +36,14 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -79,7 +80,7 @@ public class kelondroSplitTable implements kelondroIndex { public void init(boolean resetOnFail) { // init the thread pool for the keeperOf executor service - this.executor = new ThreadPoolExecutor(serverProcessor.useCPU + 1, serverProcessor.useCPU + 1, 10, TimeUnit.SECONDS, new ArrayBlockingQueue(serverProcessor.useCPU + 1)); + this.executor = new ThreadPoolExecutor(serverProcessor.useCPU + 1, serverProcessor.useCPU + 1, 10, TimeUnit.SECONDS, new LinkedBlockingQueue()); // initialized tables map this.tables = new HashMap(); @@ -264,21 +265,31 @@ public class kelondroSplitTable implements kelondroIndex { // start a concurrent query to database tables CompletionService cs = new ExecutorCompletionService(executor); int s = tables.size(); + int rejected = 0; for (final kelondroIndex table : tables.values()) { - cs.submit(new Callable() { - public kelondroIndex call() { - try { - if (table.has(key)) return table; else return dummyIndex; - } catch (IOException e) { - return dummyIndex; - } - } - }); + try { + cs.submit(new Callable() { + public kelondroIndex call() { + try { + if (table.has(key)) return table; else return dummyIndex; + } catch (IOException e) { + return dummyIndex; + } + } + }); + } catch (RejectedExecutionException e) { + // the executor is either shutting down or the blocking queue is full + // execute the search direct here without concurrency + try { + if (table.has(key)) return table; + } catch (IOException ee) {} + rejected++; + } } // read the result try { - for (int i = 0; i < s; i++) { + for (int i = 0, n = s - rejected; i < n; i++) { Future f = cs.take(); kelondroIndex index = f.get(); if (index != dummyIndex) { @@ -296,21 +307,6 @@ public class kelondroSplitTable implements kelondroIndex { //System.out.println("*DEBUG SplitTable fail.time = " + (System.currentTimeMillis() - start) + " ms"); return null; } - /* - public synchronized kelondroIndex keeperOf(byte[] key) throws IOException { - // TODO: apply concurrency here! - // because the index is stored only in one table, - // and the index is completely in RAM, a concurrency would create not concurrent File accesses - long start = System.currentTimeMillis(); - for (final kelondroIndex table : tables.values()) { - if (table.has(key)) { - System.out.println("*DEBUG SplitTable success.time = " + (System.currentTimeMillis() - start) + " ms"); - return table; - } - } - System.out.println("*DEBUG SplitTable fail.time = " + (System.currentTimeMillis() - start) + " ms"); - return null; - }*/ public synchronized void addUnique(kelondroRow.Entry row) throws IOException { addUnique(row, null); diff --git a/source/de/anomic/plasma/plasmaCrawlBalancer.java b/source/de/anomic/plasma/plasmaCrawlBalancer.java index c04b2e7ce..797d1182f 100644 --- a/source/de/anomic/plasma/plasmaCrawlBalancer.java +++ b/source/de/anomic/plasma/plasmaCrawlBalancer.java @@ -75,7 +75,7 @@ public class plasmaCrawlBalancer { // class variables private ArrayList urlRAMStack; // a list that is flushed first private kelondroStack urlFileStack; // a file with url hashes - kelondroIndex urlFileIndex; + kelondroIndex urlFileIndex; private HashMap> domainStacks; // a map from domain name part to Lists with url hashs private File cacheStacksPath; private String stackname; @@ -248,7 +248,12 @@ public class plasmaCrawlBalancer { if (componentsize != urlFileIndex.size()) { // here is urlIndexFile.size() always smaller. why? if (kelondroAbstractRecords.debugmode) { - serverLog.logWarning("PLASMA BALANCER", "size operation wrong in " + stackname + " - componentsize = " + componentsize + ", urlFileIndex.size() = " + urlFileIndex.size()); + serverLog.logWarning("BALANCER", "size wrong in " + stackname + + " - urlFileIndex = " + urlFileIndex.size() + + ", componentsize = " + componentsize + + " = (urlFileStack = " + urlFileStack.size() + + ", urlRAMStack = " + urlRAMStack.size() + + ", sizeDomainStacks = " + sizeDomainStacks() + ")"); } if ((componentsize == 0) && (urlFileIndex.size() > 0)) { resetFileIndex();