From e1bfe9d07ad3d67293d441d03deec08c68dc19b8 Mon Sep 17 00:00:00 2001 From: orbiter Date: Thu, 25 Apr 2013 11:33:17 +0200 Subject: [PATCH] - reduction of the concurrently running processes to make YaCy more adjusted to smaller and 1-core devices. - the workflow processor now starts no process at all. these are started as soon as parser/condenser/indexing queues are filled. - better abstraction --- htroot/ViewImage.java | 3 +- source/net/yacy/cora/protocol/Domains.java | 2 +- source/net/yacy/crawler/CrawlStacker.java | 5 +- source/net/yacy/crawler/data/CrawlQueues.java | 8 +-- source/net/yacy/kelondro/blob/MapHeap.java | 2 +- source/net/yacy/kelondro/data/word/Word.java | 4 +- .../kelondro/data/word/WordReferenceVars.java | 4 +- .../net/yacy/kelondro/index/RowHandleMap.java | 3 +- .../workflow/AbstractBlockingThread.java | 2 +- .../kelondro/workflow/AbstractThread.java | 2 +- .../workflow/InstantBlockingThread.java | 15 +++-- .../kelondro/workflow/WorkflowProcessor.java | 57 ++++++++++++------- source/net/yacy/peers/Dispatcher.java | 15 +---- source/net/yacy/search/Switchboard.java | 26 +++------ .../net/yacy/search/index/DocumentIndex.java | 8 +-- .../net/yacy/search/snippet/TextSnippet.java | 2 +- 16 files changed, 76 insertions(+), 82 deletions(-) diff --git a/htroot/ViewImage.java b/htroot/ViewImage.java index 4dba97075..9c45f7783 100644 --- a/htroot/ViewImage.java +++ b/htroot/ViewImage.java @@ -44,6 +44,7 @@ import net.yacy.kelondro.data.meta.DigestURI; import net.yacy.kelondro.logging.Log; import net.yacy.kelondro.util.FileUtils; import net.yacy.kelondro.util.MemoryControl; +import net.yacy.kelondro.workflow.WorkflowProcessor; import net.yacy.repository.Blacklist.BlacklistType; import net.yacy.search.Switchboard; import net.yacy.server.serverObjects; @@ -51,7 +52,7 @@ import net.yacy.server.serverSwitch; public class ViewImage { - private static Map iconcache = new ConcurrentARC(1000, Math.max(10, Math.min(32, Runtime.getRuntime().availableProcessors() * 2))); + private static Map iconcache = new ConcurrentARC(1000, Math.max(10, Math.min(32, WorkflowProcessor.availableCPU * 2))); private static String defaulticon = "htroot/env/grafics/dfltfvcn.ico"; private static byte[] defaulticonb; static { diff --git a/source/net/yacy/cora/protocol/Domains.java b/source/net/yacy/cora/protocol/Domains.java index ba098d95a..389fb71ca 100644 --- a/source/net/yacy/cora/protocol/Domains.java +++ b/source/net/yacy/cora/protocol/Domains.java @@ -77,7 +77,7 @@ public class Domains { private static final int MAX_NAME_CACHE_HIT_SIZE = 100000; private static final int MAX_NAME_CACHE_MISS_SIZE = 100000; - private static final int CONCURRENCY_LEVEL = Runtime.getRuntime().availableProcessors() + 1; + private static final int CONCURRENCY_LEVEL = Runtime.getRuntime().availableProcessors() * 2; // a dns cache private static final ARC NAME_CACHE_HIT = new ConcurrentARC(MAX_NAME_CACHE_HIT_SIZE, CONCURRENCY_LEVEL); diff --git a/source/net/yacy/crawler/CrawlStacker.java b/source/net/yacy/crawler/CrawlStacker.java index f24293bf2..1f413e9b3 100644 --- a/source/net/yacy/crawler/CrawlStacker.java +++ b/source/net/yacy/crawler/CrawlStacker.java @@ -151,10 +151,7 @@ public final class CrawlStacker { // DEBUG if (this.log.isFinest()) this.log.logFinest("ENQUEUE " + entry.url() + ", referer=" + entry.referrerhash() + ", initiator=" + ((entry.initiator() == null) ? "" : ASCII.String(entry.initiator())) + ", name=" + entry.name() + ", appdate=" + entry.appdate() + ", depth=" + entry.depth()); - try { - this.requestQueue.enQueue(entry); - } catch (InterruptedException e) { - } + this.requestQueue.enQueue(entry); } public void enqueueEntriesAsynchronous(final byte[] initiator, final String profileHandle, final Map hyperlinks) { new Thread() { diff --git a/source/net/yacy/crawler/data/CrawlQueues.java b/source/net/yacy/crawler/data/CrawlQueues.java index 0c93b99cc..e6785dd8f 100644 --- a/source/net/yacy/crawler/data/CrawlQueues.java +++ b/source/net/yacy/crawler/data/CrawlQueues.java @@ -275,12 +275,8 @@ public class CrawlQueues { this.log.logSevere(stats + ": NULL PROFILE HANDLE '" + urlEntry.profileHandle() + "' for URL " + urlEntry.url()); return true; } - try { - this.sb.indexingDocumentProcessor.enQueue(new IndexingQueueEntry(new Response(urlEntry, profile), null, null)); - Log.logInfo("CrawlQueues", "placed NOLOAD URL on indexing queue: " + urlEntry.url().toNormalform(true)); - } catch (final InterruptedException e) { - Log.logException(e); - } + this.sb.indexingDocumentProcessor.enQueue(new IndexingQueueEntry(new Response(urlEntry, profile), null, null)); + Log.logInfo("CrawlQueues", "placed NOLOAD URL on indexing queue: " + urlEntry.url().toNormalform(true)); return true; } diff --git a/source/net/yacy/kelondro/blob/MapHeap.java b/source/net/yacy/kelondro/blob/MapHeap.java index 9a67a57e8..849f3cc49 100644 --- a/source/net/yacy/kelondro/blob/MapHeap.java +++ b/source/net/yacy/kelondro/blob/MapHeap.java @@ -72,7 +72,7 @@ public class MapHeap implements Map> { final int cachesize, final char fillchar) throws IOException { this.blob = new Heap(heapFile, keylength, ordering, buffermax); - this.cache = new ConcurrentARC>(cachesize, Math.max(32, 4 * Runtime.getRuntime().availableProcessors()), ordering); + this.cache = new ConcurrentARC>(cachesize, Math.min(32, 2 * Runtime.getRuntime().availableProcessors()), ordering); this.fillchar = fillchar; } diff --git a/source/net/yacy/kelondro/data/word/Word.java b/source/net/yacy/kelondro/data/word/Word.java index 44f5a2bdf..d9829fcc4 100644 --- a/source/net/yacy/kelondro/data/word/Word.java +++ b/source/net/yacy/kelondro/data/word/Word.java @@ -57,10 +57,10 @@ public class Word { private static ARC hashCache = null; static { try { - hashCache = new ConcurrentARC(hashCacheSize, Math.max(32, 4 * Runtime.getRuntime().availableProcessors())); + hashCache = new ConcurrentARC(hashCacheSize, Math.min(32, 2 * Runtime.getRuntime().availableProcessors())); Log.logInfo("Word", "hashCache.size = " + hashCacheSize); } catch (final OutOfMemoryError e) { - hashCache = new ConcurrentARC(1000, Math.max(8, 2 * Runtime.getRuntime().availableProcessors())); + hashCache = new ConcurrentARC(1000, Math.min(8, 1 + Runtime.getRuntime().availableProcessors())); Log.logInfo("Word", "hashCache.size = " + 1000); } } diff --git a/source/net/yacy/kelondro/data/word/WordReferenceVars.java b/source/net/yacy/kelondro/data/word/WordReferenceVars.java index aebc82263..433179f35 100644 --- a/source/net/yacy/kelondro/data/word/WordReferenceVars.java +++ b/source/net/yacy/kelondro/data/word/WordReferenceVars.java @@ -46,6 +46,7 @@ import net.yacy.kelondro.rwi.Reference; import net.yacy.kelondro.rwi.ReferenceContainer; import net.yacy.kelondro.util.Bitfield; import net.yacy.kelondro.util.ByteArray; +import net.yacy.kelondro.workflow.WorkflowProcessor; public class WordReferenceVars extends AbstractReference implements WordReference, Reference, Cloneable, Comparable, Comparator { @@ -54,7 +55,6 @@ public class WordReferenceVars extends AbstractReference implements WordReferenc * object for termination of concurrent blocking queue processing */ public static final WordReferenceVars poison = new WordReferenceVars(); - private static int cores = Runtime.getRuntime().availableProcessors(); protected static final byte[] default_language = UTF8.getBytes("en"); private final Bitfield flags; @@ -494,7 +494,7 @@ public class WordReferenceVars extends AbstractReference implements WordReferenc @Override public void run() { // start the transformation threads - final int cores0 = Math.min(cores, this.container.size() / 100) + 1; + final int cores0 = Math.min(WorkflowProcessor.availableCPU, this.container.size() / 100) + 1; final TransformWorker[] worker = new TransformWorker[cores0]; for (int i = 0; i < cores0; i++) { worker[i] = new TransformWorker(this.out, this.maxtime, this.local); diff --git a/source/net/yacy/kelondro/index/RowHandleMap.java b/source/net/yacy/kelondro/index/RowHandleMap.java index 0cf8f825d..26accedb9 100644 --- a/source/net/yacy/kelondro/index/RowHandleMap.java +++ b/source/net/yacy/kelondro/index/RowHandleMap.java @@ -53,6 +53,7 @@ import net.yacy.cora.order.CloneableIterator; import net.yacy.cora.storage.HandleMap; import net.yacy.cora.util.SpaceExceededException; import net.yacy.kelondro.logging.Log; +import net.yacy.kelondro.workflow.WorkflowProcessor; public final class RowHandleMap implements HandleMap, Iterable> { @@ -116,7 +117,7 @@ public final class RowHandleMap implements HandleMap, Iterable extends AbstractThread implements BlockingThread { private WorkflowProcessor manager = null; - private final static Log log = new Log("BlockingThread"); + private final static Log log = new Log("AbstractBlockingThread"); @Override public void setManager(final WorkflowProcessor manager) { diff --git a/source/net/yacy/kelondro/workflow/AbstractThread.java b/source/net/yacy/kelondro/workflow/AbstractThread.java index b52e246ef..3fbd42de7 100644 --- a/source/net/yacy/kelondro/workflow/AbstractThread.java +++ b/source/net/yacy/kelondro/workflow/AbstractThread.java @@ -39,7 +39,7 @@ import net.yacy.kelondro.logging.Log; public abstract class AbstractThread extends Thread implements WorkflowThread { - private static Log log = new Log("WorkflowThread"); + private static Log log = new Log("AbstractThread"); protected boolean running = true; private boolean announcedShutdown = false; protected long busytime = 0, memuse = 0; diff --git a/source/net/yacy/kelondro/workflow/InstantBlockingThread.java b/source/net/yacy/kelondro/workflow/InstantBlockingThread.java index 20870f90f..5ddc81615 100644 --- a/source/net/yacy/kelondro/workflow/InstantBlockingThread.java +++ b/source/net/yacy/kelondro/workflow/InstantBlockingThread.java @@ -28,6 +28,7 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; import net.yacy.kelondro.logging.Log; @@ -38,11 +39,11 @@ public class InstantBlockingThread extends AbstractBlocki private final Method jobExecMethod; private final Object environment; private final Long handle; - private static int handleCounter = 0; - private static int instantThreadCounter = 0; + private static AtomicInteger handleCounter = new AtomicInteger(0); + private static AtomicInteger instantThreadCounter = new AtomicInteger(0); private static final ConcurrentMap jobs = new ConcurrentHashMap(); - public InstantBlockingThread(final Object env, final String jobExec, final WorkflowProcessor manager) { + public InstantBlockingThread(final WorkflowProcessor manager) { // jobExec is the name of a method of the object 'env' that executes the one-step-run // jobCount is the name of a method that returns the size of the job @@ -50,9 +51,11 @@ public class InstantBlockingThread extends AbstractBlocki setManager(manager); // define execution class + final Object env = manager.getEnvironment(); + final String jobExec = manager.getMethodName(); this.jobExecMethod = execMethod(env, jobExec); this.environment = (env instanceof Class) ? null : env; - setName(this.jobExecMethod.getClass().getName() + "." + this.jobExecMethod.getName() + "." + handleCounter++); + setName(this.jobExecMethod.getClass().getName() + "." + this.jobExecMethod.getName() + "." + handleCounter.getAndIncrement()); this.handle = Long.valueOf(System.currentTimeMillis() + getName().hashCode()); } @@ -88,7 +91,7 @@ public class InstantBlockingThread extends AbstractBlocki } else { final long t = System.currentTimeMillis(); - instantThreadCounter++; + instantThreadCounter.incrementAndGet(); //System.out.println("started job " + this.handle + ": " + this.getName()); jobs.put(this.handle, getName()); @@ -103,7 +106,7 @@ public class InstantBlockingThread extends AbstractBlocki if (targetException != null) Log.logException(targetException); Log.logSevere(BLOCKINGTHREAD, "Runtime Error in serverInstantThread.job, thread '" + getName() + "': " + e.getMessage()); } - instantThreadCounter--; + instantThreadCounter.decrementAndGet(); jobs.remove(this.handle); getManager().increaseJobTime(System.currentTimeMillis() - t); } diff --git a/source/net/yacy/kelondro/workflow/WorkflowProcessor.java b/source/net/yacy/kelondro/workflow/WorkflowProcessor.java index b4ec8261f..d3fe0d361 100644 --- a/source/net/yacy/kelondro/workflow/WorkflowProcessor.java +++ b/source/net/yacy/kelondro/workflow/WorkflowProcessor.java @@ -24,7 +24,6 @@ package net.yacy.kelondro.workflow; -import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.Iterator; import java.util.concurrent.BlockingQueue; @@ -32,6 +31,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import net.yacy.kelondro.logging.Log; import net.yacy.kelondro.util.NamePrefixThreadFactory; @@ -43,9 +43,10 @@ public class WorkflowProcessor { private static final ArrayList> processMonitor = new ArrayList>(); private ExecutorService executor; + private AtomicInteger executorRunning; private BlockingQueue input; private final WorkflowProcessor output; - private final int poolsize; + private final int maxpoolsize; private final Object environment; private final String processName, methodName, description; private final String[] childs; @@ -55,20 +56,25 @@ public class WorkflowProcessor { public WorkflowProcessor( final String name, final String description, final String[] childnames, final Object env, final String jobExecMethod, - final int inputQueueSize, final WorkflowProcessor output, final int poolsize) { + final int inputQueueSize, final WorkflowProcessor output, + final int maxpoolsize) { // start a fixed number of executors that handle entries in the process queue this.environment = env; this.processName = name; this.description = description; this.methodName = jobExecMethod; this.childs = childnames; - this.input = new LinkedBlockingQueue(inputQueueSize); + this.maxpoolsize = maxpoolsize; + this.input = new LinkedBlockingQueue(Math.max(maxpoolsize + 1, inputQueueSize)); this.output = output; - this.poolsize = poolsize; - this.executor = Executors.newCachedThreadPool(new NamePrefixThreadFactory(jobExecMethod)); - for (int i = 0; i < poolsize; i++) { - this.executor.submit(new InstantBlockingThread(env, jobExecMethod, this)); + this.executor = Executors.newCachedThreadPool(new NamePrefixThreadFactory(this.methodName)); + this.executorRunning = new AtomicInteger(0); + /* + for (int i = 0; i < this.maxpoolsize; i++) { + this.executor.submit(new InstantBlockingThread(this)); + this.executorRunning++; } + */ // init statistics this.blockTime = 0; this.execTime = 0; @@ -79,6 +85,14 @@ public class WorkflowProcessor { processMonitor.add(this); } + public Object getEnvironment() { + return this.environment; + } + + public String getMethodName() { + return this.methodName; + } + public int queueSize() { if (this.input == null) return 0; return this.input.size(); @@ -94,7 +108,7 @@ public class WorkflowProcessor { } public int concurrency() { - return this.poolsize; + return this.maxpoolsize; } public J take() throws InterruptedException { @@ -108,7 +122,7 @@ public class WorkflowProcessor { return j; } - public void passOn(final J next) throws InterruptedException { + public void passOn(final J next) { // don't mix this method up with enQueue()! // this method enqueues into the _next_ queue, not this queue! if (this.output == null) { @@ -145,9 +159,9 @@ public class WorkflowProcessor { } @SuppressWarnings("unchecked") - public void enQueue(final J in) throws InterruptedException { + public void enQueue(final J in) { // ensure that enough job executors are running - if ((this.input == null) || (this.executor == null) || (this.executor.isShutdown()) || (this.executor.isTerminated())) { + if (this.input == null || this.executor == null || this.executor.isShutdown() || this.executor.isTerminated()) { // execute serialized without extra thread //Log.logWarning("PROCESSOR", "executing job " + environment.getClass().getName() + "." + methodName + " serialized"); try { @@ -155,21 +169,23 @@ public class WorkflowProcessor { if (out != null && this.output != null) { this.output.enQueue(out); } - } catch (final IllegalArgumentException e) { - Log.logException(e); - } catch (final IllegalAccessException e) { - Log.logException(e); - } catch (final InvocationTargetException e) { + } catch (final Throwable e) { Log.logException(e); } return; - } + } // execute concurrent in thread while (this.input != null) { try { + if (this.input.size() > this.executorRunning.get() && this.executorRunning.get() < this.maxpoolsize) synchronized (executor) { + if (this.input.size() > this.executorRunning.get() && this.executorRunning.get() < this.maxpoolsize) { + this.executorRunning.incrementAndGet(); + this.executor.submit(new InstantBlockingThread(this)); + } + } this.input.put(in); break; - } catch (final InterruptedException e) { + } catch (final Throwable e) { try {Thread.sleep(10);} catch (final InterruptedException ee) {} } } @@ -186,7 +202,7 @@ public class WorkflowProcessor { // before we put pills into the queue, make sure that they will take them relaxCapacity(); // put poison pills into the queue - for (int i = 0; i < this.poolsize; i++) { + for (int i = 0; i < this.executorRunning.get(); i++) { try { Log.logInfo("serverProcessor", "putting poison pill in queue " + this.processName + ", thread " + i); this.input.put((J) WorkflowJob.poisonPill); // put a poison pill into the queue which will kill the job @@ -200,6 +216,7 @@ public class WorkflowProcessor { Log.logInfo("WorkflowProcess", "waiting for queue " + this.processName + " to shut down; input.size = " + this.input.size()); try {Thread.sleep(1000);} catch (InterruptedException e) {} } + this.executorRunning.set(0); // shut down executors if (this.executor != null & !this.executor.isShutdown()) { diff --git a/source/net/yacy/peers/Dispatcher.java b/source/net/yacy/peers/Dispatcher.java index 20fce9b38..c00297300 100644 --- a/source/net/yacy/peers/Dispatcher.java +++ b/source/net/yacy/peers/Dispatcher.java @@ -117,7 +117,7 @@ public class Dispatcher { gzipBody, timeout); - final int concurrentSender = Math.min(32, Math.max(10, WorkflowProcessor.availableCPU)); + final int concurrentSender = Math.min(32, WorkflowProcessor.availableCPU * 2); this.indexingTransmissionProcessor = new WorkflowProcessor( "transferDocumentIndex", "This is the RWI transmission process", @@ -385,11 +385,7 @@ public class Dispatcher { } if (maxsize < 0) return false; final Transmission.Chunk chunk = this.transmissionCloud.remove(maxtarget); - try { - this.indexingTransmissionProcessor.enQueue(chunk); - } catch (final InterruptedException e) { - Log.logException(e); - } + this.indexingTransmissionProcessor.enQueue(chunk); return true; } @@ -414,12 +410,7 @@ public class Dispatcher { if (!success) this.log.logInfo("STORE: Chunk " + ASCII.String(chunk.primaryTarget()) + " has failed to transmit index; marked peer as busy"); if (chunk.canFinish()) { - try { - if (this.indexingTransmissionProcessor != null) this.indexingTransmissionProcessor.enQueue(chunk); - } catch (final InterruptedException e) { - Log.logException(e); - return null; - } + if (this.indexingTransmissionProcessor != null) this.indexingTransmissionProcessor.enQueue(chunk); return chunk; } this.log.logInfo("STORE: Chunk " + ASCII.String(chunk.primaryTarget()) + " has not enough targets left. This transmission has failed, putting back index to backend"); diff --git a/source/net/yacy/search/Switchboard.java b/source/net/yacy/search/Switchboard.java index 3affb3e82..2b341575d 100644 --- a/source/net/yacy/search/Switchboard.java +++ b/source/net/yacy/search/Switchboard.java @@ -884,7 +884,7 @@ public final class Switchboard extends serverSwitch { "storeDocumentIndex", 2, null, - 1 /*Math.max(1, WorkflowProcessor.availableCPU / 2)*/); + 1); this.indexingAnalysisProcessor = new WorkflowProcessor( "webStructureAnalysis", @@ -1790,17 +1790,11 @@ public final class Switchboard extends serverSwitch { return "not allowed: " + noIndexReason; } - // put document into the concurrent processing queue - try { - this.indexingDocumentProcessor.enQueue(new IndexingQueueEntry( - response, - null, - null)); - return null; - } catch ( final InterruptedException e ) { - Log.logException(e); - return "interrupted: " + e.getMessage(); - } + this.indexingDocumentProcessor.enQueue(new IndexingQueueEntry( + response, + null, + null)); + return null; } public boolean processSurrogate(final String s) { @@ -1912,13 +1906,7 @@ public final class Switchboard extends serverSwitch { final IndexingQueueEntry queueEntry = new IndexingQueueEntry(response, new Document[] {document}, null); - // place the queue entry into the concurrent process of the condenser (document analysis) - try { - this.indexingCondensementProcessor.enQueue(queueEntry); - } catch ( final InterruptedException e ) { - Log.logException(e); - break; - } + this.indexingCondensementProcessor.enQueue(queueEntry); if (shallTerminate()) break; } } diff --git a/source/net/yacy/search/index/DocumentIndex.java b/source/net/yacy/search/index/DocumentIndex.java index 755cc2083..1c9a6e0c2 100644 --- a/source/net/yacy/search/index/DocumentIndex.java +++ b/source/net/yacy/search/index/DocumentIndex.java @@ -40,6 +40,7 @@ import net.yacy.document.LibraryProvider; import net.yacy.document.TextParser; import net.yacy.kelondro.data.meta.DigestURI; import net.yacy.kelondro.logging.Log; +import net.yacy.kelondro.workflow.WorkflowProcessor; import net.yacy.search.schema.CollectionConfiguration; import net.yacy.search.schema.WebgraphConfiguration; @@ -77,11 +78,10 @@ public class DocumentIndex extends Segment { ); super.fulltext().connectLocalSolr(); super.writeWebgraph(true); - final int cores = Runtime.getRuntime().availableProcessors() + 1; this.callback = callback; - this.queue = new LinkedBlockingQueue(cores * 300); - this.worker = new Worker[cores]; - for ( int i = 0; i < cores; i++ ) { + this.queue = new LinkedBlockingQueue(WorkflowProcessor.availableCPU * 300); + this.worker = new Worker[WorkflowProcessor.availableCPU]; + for ( int i = 0; i < WorkflowProcessor.availableCPU; i++ ) { this.worker[i] = new Worker(i); this.worker[i].start(); } diff --git a/source/net/yacy/search/snippet/TextSnippet.java b/source/net/yacy/search/snippet/TextSnippet.java index ab1967a47..8ef802a1b 100644 --- a/source/net/yacy/search/snippet/TextSnippet.java +++ b/source/net/yacy/search/snippet/TextSnippet.java @@ -88,7 +88,7 @@ public class TextSnippet implements Comparable, Comparator cache; public Cache() { - this.cache = new ConcurrentARC(MAX_CACHE, Math.max(32, 4 * Runtime.getRuntime().availableProcessors())); + this.cache = new ConcurrentARC(MAX_CACHE, Math.min(32, 2 * Runtime.getRuntime().availableProcessors())); } public void put(final String wordhashes, final String urlhash, final String snippet) { // generate key