From e34ac22fbd38645b1b25132d7f90f5f6d26c6de9 Mon Sep 17 00:00:00 2001
From: orbiter
Date: Fri, 19 Dec 2008 15:26:01 +0000
Subject: [PATCH] - added new monitoring servlet at
http://localhost:8080/PerformanceConcurrency_p.html - used the new monitoring
to do some fine-tuning of the indexing queue
git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@5402 6c8d7289-2bf4-0310-a012-ef5d649a1542
---
htroot/PerformanceConcurrency_p.html | 50 +++++++
htroot/PerformanceConcurrency_p.java | 93 +++++++++++++
htroot/Settings_p.html | 3 +-
source/de/anomic/crawler/CrawlStacker.java | 12 +-
.../de/anomic/plasma/plasmaSwitchboard.java | 39 +++++-
.../server/serverAbstractBlockingThread.java | 25 ++--
.../anomic/server/serverBlockingThread.java | 8 +-
.../server/serverInstantBlockingThread.java | 14 +-
source/de/anomic/server/serverProcessor.java | 128 ++++++++++++++++--
9 files changed, 320 insertions(+), 52 deletions(-)
create mode 100644 htroot/PerformanceConcurrency_p.html
create mode 100644 htroot/PerformanceConcurrency_p.java
diff --git a/htroot/PerformanceConcurrency_p.html b/htroot/PerformanceConcurrency_p.html
new file mode 100644
index 000000000..a635c6fba
--- /dev/null
+++ b/htroot/PerformanceConcurrency_p.html
@@ -0,0 +1,50 @@
+
+
+
+ YaCy '#[clientname]#': Performance of Concurrent Processes
+ #%env/templates/metas.template%#
+
+
+
+
+ #%env/templates/header.template%#
+ #%env/templates/submenuConfig.template%#
+
Performance of Concurrent Processes
+
+
+
+ #%env/templates/footer.template%#
+
+
diff --git a/htroot/PerformanceConcurrency_p.java b/htroot/PerformanceConcurrency_p.java
new file mode 100644
index 000000000..43bfd4de8
--- /dev/null
+++ b/htroot/PerformanceConcurrency_p.java
@@ -0,0 +1,93 @@
+// PerformanceConcurrency_p.java
+// -----------------------
+// part of YaCy
+// (C) by Michael Peter Christen; mc@yacy.net
+// first published on http:// www.yacy.net
+// Frankfurt, Germany, 19.12.2008
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation; either version 2 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program; if not, write to the Free Software
+// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+import java.util.Iterator;
+
+import de.anomic.http.httpRequestHeader;
+import de.anomic.server.serverObjects;
+import de.anomic.server.serverProcessor;
+import de.anomic.server.serverSwitch;
+
+/**
+ * Servlet behind PerformanceConcurrency_p.html. It fills the template with one table
+ * row per processor that is registered in {@link serverProcessor#processes()}.
+ */
+public class PerformanceConcurrency_p {
+
+    /**
+     * Builds the template replacements for the concurrency monitoring page.
+     *
+     * @param header the request header (not read by this servlet)
+     * @param post   the request arguments (not read by this servlet)
+     * @param sb     the switchboard (not read by this servlet)
+     * @return the replacements: "table" holds the number of rows, the values of row N
+     *         are stored under keys starting with "table_N_"
+     */
+    public static serverObjects respond(final httpRequestHeader header, final serverObjects post, final serverSwitch<?> sb) {
+        // return variable that accumulates replacements
+        final serverObjects prop = new serverObjects();
+
+        // first pass: sum up the times of all processes, they are the base of the percent values
+        long blocktime_total = 0, exectime_total = 0, passontime_total = 0;
+        Iterator<serverProcessor<?>> threads = serverProcessor.processes();
+        serverProcessor<?> p;
+        while (threads.hasNext()) {
+            p = threads.next();
+            blocktime_total += p.getBlockTime();
+            exectime_total += p.getExecTime();
+            passontime_total += p.getPassOnTime();
+        }
+        // avoid division by zero as long as nothing has been measured
+        if (blocktime_total == 0) blocktime_total = 1;
+        if (exectime_total == 0) exectime_total = 1;
+        if (passontime_total == 0) passontime_total = 1;
+
+        // second pass: write one table row per process
+        threads = serverProcessor.processes();
+        int c = 0;
+        while (threads.hasNext()) {
+            p = threads.next();
+
+            // read every counter exactly once, so that all values of a row and its
+            // colour class are derived from the same numbers
+            final long execCount = p.getExecCount();
+            final long cycles = (execCount == 0) ? 1 : execCount; // avoid division by zero
+            final int queueSize = p.queueSize();
+            final int queueSizeMax = p.queueSizeMax();
+            final long blocktime = p.getBlockTime();
+            final long exectime = p.getExecTime();
+            final long passontime = p.getPassOnTime();
+            final long blockpercent = 100 * blocktime / blocktime_total;
+            final long execpercent = 100 * exectime / exectime_total;
+            final long passonpercent = 100 * passontime / passontime_total;
+            final String prefix = "table_" + c + "_";
+
+            // set values to templates
+            prop.put(prefix + "threadname", p.getName());
+            prop.putHTML(prefix + "longdescr", p.getDescription());
+            prop.put(prefix + "queuesize", queueSize);
+            prop.put(prefix + "queuesizemax", queueSizeMax);
+            prop.put(prefix + "concurrency", p.concurrency());
+            prop.putHTML(prefix + "childs", p.getChilds());
+
+            prop.putNum(prefix + "blockreadtime", blocktime / cycles);
+            prop.putNum(prefix + "blockreadpercent", blockpercent);
+            prop.putNum(prefix + "exectime", exectime / cycles);
+            prop.putNum(prefix + "execpercent", execpercent);
+            prop.putNum(prefix + "blockwritetime", passontime / cycles);
+            prop.putNum(prefix + "blockwritepercent", passonpercent);
+            prop.putNum(prefix + "totalcycles", execCount);
+
+            // set a color for the line to show problems:
+            // 2 = the queue is full, 1 = the queue is more than 80% filled or the process
+            // consumes more than 80% of one of the total times, 0 = nothing to report
+            final boolean problem = queueSize == queueSizeMax;
+            final boolean warning = queueSize > queueSizeMax * 8 / 10
+                    || blockpercent > 80
+                    || execpercent > 80
+                    || passonpercent > 80;
+            prop.put(prefix + "class", problem ? 2 : (warning ? 1 : 0));
+            c++;
+        }
+        prop.put("table", c);
+        // return rewrite values for templates
+        return prop;
+    }
+}
diff --git a/htroot/Settings_p.html b/htroot/Settings_p.html
index 337b97306..8be9e92ac 100644
--- a/htroot/Settings_p.html
+++ b/htroot/Settings_p.html
@@ -29,7 +29,8 @@
delete the file 'DATA/SETTINGS/yacy.conf' in the YaCy application root folder and start YaCy again.
diff --git a/source/de/anomic/crawler/CrawlStacker.java b/source/de/anomic/crawler/CrawlStacker.java
index 2fbc90b28..ee157d0ad 100644
--- a/source/de/anomic/crawler/CrawlStacker.java
+++ b/source/de/anomic/crawler/CrawlStacker.java
@@ -65,8 +65,8 @@ public final class CrawlStacker {
this.acceptLocalURLs = acceptLocalURLs;
this.acceptGlobalURLs = acceptGlobalURLs;
- this.fastQueue = new serverProcessor(this, "job", 10000, null, 2);
- this.slowQueue = new serverProcessor(this, "job", 1000, null, 5);
+ this.fastQueue = new serverProcessor("CrawlStackerFast", "This process checks new urls before they are enqueued into the balancer (proper, double-check, correct domain, filter)", new String[]{"Balancer"}, this, "job", 10000, null, 2);
+ this.slowQueue = new serverProcessor("CrawlStackerSlow", "This is like CrawlStackerFast, but does additionaly a DNS lookup. The CrawlStackerFast does not need this because it can use the DNS cache.", new String[]{"Balancer"}, this, "job", 1000, null, 5);
this.log.logInfo("STACKCRAWL thread initialized.");
}
@@ -80,8 +80,14 @@ public final class CrawlStacker {
this.slowQueue.clear();
}
+ /**
+  * Logs the number of pending stacker jobs and announces the shutdown to both
+  * queues, first the fast one, then the slow one. This method does not wait for
+  * the queues to terminate.
+  */
+ public void announceClose() {
+     final String message = "Flushing remaining " + size() + " crawl stacker job entries.";
+     this.log.logInfo(message);
+
+     this.fastQueue.announceShutdown();
+     this.slowQueue.announceShutdown();
+ }
+
public void close() {
- this.log.logInfo("Shutdown. Flushing remaining " + size() + " crawl stacker job entries. please wait.");
+ this.log.logInfo("Shutdown. waiting for remaining " + size() + " crawl stacker job entries. please wait.");
this.fastQueue.announceShutdown();
this.slowQueue.announceShutdown();
this.fastQueue.awaitShutdown(2000);
diff --git a/source/de/anomic/plasma/plasmaSwitchboard.java b/source/de/anomic/plasma/plasmaSwitchboard.java
index d3189cb4b..ee507cc1c 100644
--- a/source/de/anomic/plasma/plasmaSwitchboard.java
+++ b/source/de/anomic/plasma/plasmaSwitchboard.java
@@ -564,10 +564,26 @@ public final class plasmaSwitchboard extends serverAbstractSwitch(this, "storeDocumentIndex", serverProcessor.useCPU, null, 1);
- indexingAnalysisProcessor = new serverProcessor(this, "webStructureAnalysis", serverProcessor.useCPU + 1, indexingStorageProcessor);
- indexingCondensementProcessor = new serverProcessor(this, "condenseDocument", serverProcessor.useCPU + 2, indexingAnalysisProcessor);
- indexingDocumentProcessor = new serverProcessor(this, "parseDocument", serverProcessor.useCPU + 3, indexingCondensementProcessor);
+ indexingStorageProcessor = new serverProcessor(
+ "storeDocumentIndex",
+ "This is the sequencing step of the indexing queue: no concurrency is wanted here, because the access of the indexer works better if it is not concurrent. Files are written as streams, councurrency would destroy IO performance. In this process the words are written to the RWI cache, which flushes if it is full.",
+ new String[]{"RWI/Cache/Collections"},
+ this, "storeDocumentIndex", serverProcessor.useCPU + 40, null, 1);
+ indexingAnalysisProcessor = new serverProcessor(
+ "webStructureAnalysis",
+ "This just stores the link structure of the document into a web structure database.",
+ new String[]{"storeDocumentIndex"},
+ this, "webStructureAnalysis", serverProcessor.useCPU + 20, indexingStorageProcessor, serverProcessor.useCPU + 1);
+ indexingCondensementProcessor = new serverProcessor(
+ "condenseDocument",
+ "This does a structural analysis of plain texts: markup of headlines, slicing into phrases (i.e. sentences), markup with position, counting of words, calculation of term frequency.",
+ new String[]{"webStructureAnalysis"},
+ this, "condenseDocument", serverProcessor.useCPU + 10, indexingAnalysisProcessor, serverProcessor.useCPU + 1);
+ indexingDocumentProcessor = new serverProcessor(
+ "parseDocument",
+ "This does the parsing of the newly loaded documents from the web. The result is not only a plain text document, but also a list of URLs that are embedded into the document. The urls are handed over to the CrawlStacker. This process has two child process queues!",
+ new String[]{"condenseDocument", "CrawlStacker"},
+ this, "parseDocument", serverProcessor.useCPU + 1, indexingCondensementProcessor, serverProcessor.useCPU + 1);
// deploy busy threads
log.logConfig("Starting Threads");
@@ -711,10 +727,14 @@ public final class plasmaSwitchboard extends serverAbstractSwitch= 0,
+ "global.any".indexOf(getConfig("network.unit.domain", "global")) >= 0);
}
// start up crawl jobs
continueCrawlJob(plasmaSwitchboardConstants.CRAWLJOB_LOCAL_CRAWL);
@@ -1033,14 +1059,15 @@ public final class plasmaSwitchboard extends serverAbstractSwitch extends serverAbstractThread implements serverBlockingThread {
- private BlockingQueue input = null;
- private serverProcessor output = null;
+ private serverProcessor manager = null;
- public void setInputQueue(final BlockingQueue queue) {
- this.input = queue;
- }
- public void setOutputProcess(final serverProcessor processor) {
- this.output = processor;
- }
- public BlockingQueue getInputQueue() {
- return this.input;
+ public void setManager(final serverProcessor manager) {
+ this.manager = manager;
}
- public serverProcessor getOutputProcess() {
- return this.output;
+ public serverProcessor getManager() {
+ return this.manager;
}
@SuppressWarnings("unchecked")
@@ -61,17 +52,17 @@ public abstract class serverAbstractBlockingThread
// do job
timestamp = System.currentTimeMillis();
memstamp0 = serverMemory.used();
- final J in = this.input.take();
+ final J in = this.manager.take();
if ((in == null) || (in == serverProcessorJob.poisonPill) || (in.status == serverProcessorJob.STATUS_POISON)) {
// the poison pill: shutdown
// a null element is pushed to the queue on purpose to signal
// that a termination should be made
- if (this.output != null) this.output.enQueue((J) serverProcessorJob.poisonPill); // pass on the pill
+ //this.manager.enQueueNext((J) serverProcessorJob.poisonPill); // pass on the pill
this.running = false;
break;
}
final J out = this.job(in);
- if ((out != null) && (this.output != null)) this.output.enQueue(out);
+ if (out != null) this.manager.passOn(out);
// do memory and busy/idle-count/time monitoring
memstamp1 = serverMemory.used();
if (memstamp1 >= memstamp0) {
diff --git a/source/de/anomic/server/serverBlockingThread.java b/source/de/anomic/server/serverBlockingThread.java
index 49be2cd23..221c82ccf 100644
--- a/source/de/anomic/server/serverBlockingThread.java
+++ b/source/de/anomic/server/serverBlockingThread.java
@@ -24,14 +24,10 @@
package de.anomic.server;
-import java.util.concurrent.BlockingQueue;
-
public interface serverBlockingThread extends serverThread {
- public void setInputQueue(BlockingQueue queue);
- public void setOutputProcess(serverProcessor queue);
- public BlockingQueue getInputQueue();
- public serverProcessor getOutputProcess();
+ public void setManager(serverProcessor queue);
+ public serverProcessor getManager();
public J job(J next) throws Exception;
// performes one job procedure; this loopes until terminate() is called
diff --git a/source/de/anomic/server/serverInstantBlockingThread.java b/source/de/anomic/server/serverInstantBlockingThread.java
index fc81eeb37..26221b19a 100644
--- a/source/de/anomic/server/serverInstantBlockingThread.java
+++ b/source/de/anomic/server/serverInstantBlockingThread.java
@@ -26,7 +26,6 @@ package de.anomic.server;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import de.anomic.server.logging.serverLog;
@@ -40,13 +39,12 @@ public class serverInstantBlockingThread extends s
public static int instantThreadCounter = 0;
public static final ConcurrentHashMap jobs = new ConcurrentHashMap();
- public serverInstantBlockingThread(final Object env, final String jobExec, final BlockingQueue input, final serverProcessor output) {
+ public serverInstantBlockingThread(final Object env, final String jobExec, final serverProcessor manager) {
// jobExec is the name of a method of the object 'env' that executes the one-step-run
// jobCount is the name of a method that returns the size of the job
- // set the blocking queues for input and output
- this.setInputQueue(input);
- this.setOutputProcess(output);
+ // set the manager of blocking queues for input and output
+ this.setManager(manager);
// define execution class
this.jobExecMethod = execMethod(env, jobExec);
@@ -71,7 +69,7 @@ public class serverInstantBlockingThread extends s
}
public int getJobCount() {
- return this.getInputQueue().size();
+ return this.getManager().queueSize();
}
@SuppressWarnings("unchecked")
@@ -79,7 +77,8 @@ public class serverInstantBlockingThread extends s
// see if we got a poison pill to tell us to shut down
if (next == null) return (J) serverProcessorJob.poisonPill;
if (next == serverProcessorJob.poisonPill || next.status == serverProcessorJob.STATUS_POISON) return next;
-
+ long t = System.currentTimeMillis();
+
instantThreadCounter++;
//System.out.println("started job " + this.handle + ": " + this.getName());
jobs.put(this.handle, this.getName());
@@ -107,6 +106,7 @@ public class serverInstantBlockingThread extends s
}
instantThreadCounter--;
jobs.remove(this.handle);
+ this.getManager().increaseJobTime(System.currentTimeMillis() - t);
return out;
}
diff --git a/source/de/anomic/server/serverProcessor.java b/source/de/anomic/server/serverProcessor.java
index 1b9fa2f86..27081fe39 100644
--- a/source/de/anomic/server/serverProcessor.java
+++ b/source/de/anomic/server/serverProcessor.java
@@ -25,6 +25,8 @@
package de.anomic.server;
import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -37,35 +39,73 @@ public class serverProcessor {
public static final int availableCPU = Runtime.getRuntime().availableProcessors();
public static int useCPU = availableCPU;
+ // registry of all processors: the constructor adds each new instance, processes() exposes the list for monitoring
+ private static final ArrayList<serverProcessor<?>> processMonitor = new ArrayList<serverProcessor<?>>();
private ExecutorService executor;
private BlockingQueue input;
private serverProcessor output;
private int poolsize;
private Object environment;
- private String methodName;
+ private String processName, methodName, description;
+ private String[] childs;
+ private long blockTime, execTime, passOnTime;
+ private long execCount;
- public serverProcessor(final Object env, final String jobExec, final int inputQueueSize, final serverProcessor output) {
- this(env, jobExec, inputQueueSize, output, useCPU + 1);
- }
-
- public serverProcessor(final Object env, final String jobExec, final int inputQueueSize, final serverProcessor output, final int poolsize) {
+ public serverProcessor(
+ String name, String description, String[] childnames,
+ final Object env, final String jobExecMethod, final int inputQueueSize, final serverProcessor output, final int poolsize) {
// start a fixed number of executors that handle entries in the process queue
this.environment = env;
- this.methodName = jobExec;
+ this.processName = name;
+ this.description = description;
+ this.methodName = jobExecMethod;
+ this.childs = childnames;
this.input = new LinkedBlockingQueue(inputQueueSize);
this.output = output;
this.poolsize = poolsize;
- executor = Executors.newCachedThreadPool(new NamePrefixThreadFactory(jobExec));
+ this.executor = Executors.newCachedThreadPool(new NamePrefixThreadFactory(jobExecMethod));
for (int i = 0; i < poolsize; i++) {
- executor.submit(new serverInstantBlockingThread(env, jobExec, input, output));
+ this.executor.submit(new serverInstantBlockingThread(env, jobExecMethod, this));
}
+ // init statistics
+ blockTime = 0;
+ execTime = 0;
+ passOnTime = 0;
+ execCount = 0;
+
+ // store this object for easy monitoring
+ processMonitor.add(this);
}
public int queueSize() {
return this.input.size();
}
+ /**
+  * Computes the capacity of the input queue as its current size plus its remaining
+  * capacity. Both numbers are read one after the other without locking, so the sum
+  * can be slightly off while other threads add or remove entries.
+  *
+  * @return the capacity of the input queue, or 0 if the queue has already been
+  *         released by the shutdown
+  */
+ public int queueSizeMax() {
+     // the shutdown sets the input queue to null; read the field once and guard
+     // against that in the same way as clear() does
+     final BlockingQueue<?> queue = this.input;
+     if (queue == null) return 0;
+     return queue.size() + queue.remainingCapacity();
+ }
+
+ /**
+  * Returns the pool size that was given to the constructor. The constructor
+  * submits exactly that many worker threads to the executor.
+  *
+  * @return the number of workers of this processor
+  */
+ public int concurrency() {
+ return this.poolsize;
+ }
+
+ /**
+  * Fetches the next job from the input queue, blocking until one is available,
+  * and adds the time spent waiting (in milliseconds) to the block time statistics.
+  * NOTE(review): the constructor starts 'poolsize' workers that all use this
+  * instance as their manager, but the counter is updated without synchronization.
+  * Concurrent updates can get lost, so the statistic should be read as approximate.
+  *
+  * @return the job that was taken from the input queue
+  * @throws InterruptedException if the calling thread is interrupted while waiting
+  */
+ public J take() throws InterruptedException {
+     final long waitStart = System.currentTimeMillis();
+     final J job = this.input.take();
+     final long waited = System.currentTimeMillis() - waitStart;
+     this.blockTime += waited;
+     return job;
+ }
+
+ /**
+  * Hands a result over to the following processor and adds the time that the
+  * hand-over took (in milliseconds) to the pass-on time statistics.
+  * Do not confuse this with enQueue(): this method feeds the queue of the
+  * _next_ processor, not the queue of this one. Without a following processor
+  * the result is dropped and nothing is measured.
+  * NOTE(review): the counter is updated without synchronization although several
+  * workers share this instance; the statistic should be read as approximate.
+  *
+  * @param next the result of a job that shall be processed by the following processor
+  * @throws InterruptedException declared for the hand-over to the next queue
+  */
+ public void passOn(J next) throws InterruptedException {
+     if (this.output != null) {
+         final long start = System.currentTimeMillis();
+         this.output.enQueue(next);
+         final long elapsed = System.currentTimeMillis() - start;
+         this.passOnTime += elapsed;
+     }
+ }
+
public void clear() {
if (this.input != null) this.input.clear();
}
@@ -114,9 +154,9 @@ public class serverProcessor {
// put poison pills into the queue
for (int i = 0; i < poolsize; i++) {
try {
- serverLog.logInfo("serverProcessor", "putting poison pill in queue for " + this.methodName + ", thread " + i);
+ serverLog.logInfo("serverProcessor", "putting poison pill in queue " + this.processName + ", thread " + i);
input.put((J) serverProcessorJob.poisonPill); // put a poison pill into the queue which will kill the job
- serverLog.logInfo("serverProcessor", ".. poison pill is in queue for " + this.methodName + ", thread " + i + ". awaiting termination");
+ serverLog.logInfo("serverProcessor", ".. poison pill is in queue " + this.processName + ", thread " + i + ". awaiting termination");
} catch (final InterruptedException e) { }
}
}
@@ -130,9 +170,73 @@ public class serverProcessor {
} catch (final InterruptedException e) {}
executor.shutdown();
}
- serverLog.logInfo("serverProcessor", "queue for " + this.methodName + ": shutdown.");
+ serverLog.logInfo("serverProcessor", "queue " + this.processName + ": shutdown.");
this.executor = null;
this.input = null;
+ // remove entry from monitor
+ Iterator> i = processes();
+ serverProcessor> p;
+ while (i.hasNext()) {
+ p = i.next();
+ if (p == this) {
+ i.remove();
+ break;
+ }
+ }
+ }
+
+ /**
+  * Gives access to all processors that are registered in the process monitor.
+  * The iterator works directly on the monitor list, not on a copy: a call to
+  * remove() takes the processor out of the monitor.
+  * NOTE(review): the list is a plain ArrayList; iterating while another thread
+  * creates or shuts down a processor may fail with a ConcurrentModificationException.
+  *
+  * @return an iterator over the registered processors
+  */
+ public static Iterator<serverProcessor<?>> processes() {
+     return processMonitor.iterator();
+ }
+
+ /**
+  * Accounts one executed job: adds its duration to the exec time and increases
+  * the exec counter by one.
+  * NOTE(review): both counters are updated without synchronization although several
+  * workers share this instance, so the statistics should be read as approximate.
+  *
+  * @param time the duration of the job; the caller in serverInstantBlockingThread
+  *        passes a difference of System.currentTimeMillis() values
+  */
+ protected void increaseJobTime(long time) {
+ this.execTime += time;
+ this.execCount++;
+ }
+
+ /**
+  * @return the process name that was given to the constructor; it is also used
+  *         in the shutdown log messages
+  */
+ public String getName() {
+ return this.processName;
+ }
+
+ /**
+  * @return the description text that was given to the constructor
+  */
+ public String getDescription() {
+ return this.description;
+ }
+
+ /**
+  * Concatenates the names of the child processes that were given to the constructor.
+  *
+  * @return all child names in their given order, each one followed by a single
+  *         space; the empty string if there are no child names
+  */
+ public String getChilds() {
+     // the constructor stores the array as it is given, so it may be null
+     if (this.childs == null) return "";
+     final StringBuilder s = new StringBuilder();
+     for (final String child : this.childs) {
+         s.append(child);
+         s.append(' ');
+     }
+     return s.toString();
+ }
+
+ /**
+ * the block time is the time that a take() blocks until it gets a value
+ * @return the sum of all waiting times in take(), measured in milliseconds
+ */
+ public long getBlockTime() {
+ return blockTime;
+ }
+
+ /**
+ * the exec time is the complete time of the execution and processing of the value from take()
+ * @return the sum of all times that were reported through increaseJobTime()
+ */
+ public long getExecTime() {
+ return execTime;
+ }
+ /**
+ * the exec count is the number of jobs that have been accounted with increaseJobTime()
+ * @return the number of calls of increaseJobTime() so far
+ */
+ public long getExecCount() {
+ return execCount;
+ }
+
+ /**
+ * the passOn time is the time that a put() takes to enqueue a result value to the next queue
+ * in case that the target queue is limited and may be full, this value may increase
+ * @return the sum of all hand-over times measured in passOn(), in milliseconds
+ */
+ public long getPassOnTime() {
+ return passOnTime;
}
}