- added new monitoring servlet at

http://localhost:8080/PerformanceConcurrency_p.html
- used the new monitoring to do some fine-tuning of the indexing queue

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@5402 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 16 years ago
parent 449e697436
commit e34ac22fbd

@ -0,0 +1,50 @@
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
<title>YaCy '#[clientname]#': Performance of Concurrent Processes</title>
#%env/templates/metas.template%#
<script type="text/javascript" src="js/html.js"></script>
<!-- reload this page once per second so the table shows current values -->
<meta http-equiv="REFRESH" content="1; url=/PerformanceConcurrency_p.html" />
</head>
<body id="PerformanceConcurrency">
#%env/templates/header.template%#
#%env/templates/submenuConfig.template%#
<h2>Performance of Concurrent Processes</h2>
<fieldset><legend>serverProcessor Objects</legend>
<!-- one header row, then one row per entry of the "table" property list filled by the servlet -->
<table border="0" cellpadding="2" cellspacing="1">
<tr class="TableHeader" valign="bottom">
<td>Thread</td>
<td>Queue Size<br />Current</td>
<td>Queue Size<br />Maximum</td>
<td>Concurrency:<br />Number of Threads</td>
<td>Childs</td>
<td colspan="2">Average<br />Block Time<br />Reading</td>
<td colspan="2">Average<br />Exec Time</td>
<td colspan="2">Average<br />Block Time<br />Writing</td>
<td>Total<br />Cycles</td>
<td>Full Description</td>
</tr>
#{table}#
<tr #(class)#class="TableCellLight"::class="TableCellDark"::class="TableCellSummary"#(/class)#>
<td align="left">#[threadname]#</td>
<td align="right">#[queuesize]#</td>
<td align="right">#[queuesizemax]#</td>
<td align="right">#[concurrency]#</td>
<td align="right">#[childs]#</td>
<td align="right">#[blockreadtime]#&nbsp;ms</td>
<td align="right">#[blockreadpercent]#%</td>
<td align="right">#[exectime]#&nbsp;ms</td>
<td align="right">#[execpercent]#%</td>
<td align="right">#[blockwritetime]#&nbsp;ms</td>
<td align="right">#[blockwritepercent]#%</td>
<td align="right">#[totalcycles]#</td>
<td align="left">#[longdescr]#</td>
</tr>
#{/table}#
</table>
</fieldset>
#%env/templates/footer.template%#
</body>
</html>

@ -0,0 +1,93 @@
// PerformanceConcurrency_p.java
// -----------------------
// part of YaCy
// (C) by Michael Peter Christen; mc@yacy.net
// first published on http://www.yacy.net
// Frankfurt, Germany, 19.12.2008
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
import java.util.Iterator;
import de.anomic.http.httpRequestHeader;
import de.anomic.server.serverObjects;
import de.anomic.server.serverProcessor;
import de.anomic.server.serverSwitch;
/**
 * Servlet class that fills the template properties for the
 * "Performance of Concurrent Processes" page.
 *
 * For every {@code serverProcessor} returned by {@code serverProcessor.processes()}
 * one table row is produced with queue sizes, concurrency, average block/exec/pass-on
 * times, the share of each time in the sum over all processors, and a row class.
 */
public class PerformanceConcurrency_p {

    /**
     * Computes the table rows for all currently registered processors.
     *
     * @param header the request header; not read by this method
     * @param post   the request arguments; not read by this method
     * @param sb     the server switch; not read by this method
     * @return the properties {@code table} (number of rows) and
     *         {@code table_<n>_<column>} for every row {@code n}
     */
    public static serverObjects respond(final httpRequestHeader header, final serverObjects post, final serverSwitch<?> sb) {
        // return variable that accumulates replacements
        final serverObjects prop = new serverObjects();

        // first pass: sum the time counters over all processors; the sums are the
        // denominators of the percentage columns
        long blocktime_total = 0, exectime_total = 0, passontime_total = 0;
        Iterator<serverProcessor<?>> threads = serverProcessor.processes();
        while (threads.hasNext()) {
            final serverProcessor<?> p = threads.next();
            blocktime_total += p.getBlockTime();
            exectime_total += p.getExecTime();
            passontime_total += p.getPassOnTime();
        }
        // avoid division by zero in the percentage computation
        if (blocktime_total == 0) {
            blocktime_total = 1;
        }
        if (exectime_total == 0) {
            exectime_total = 1;
        }
        if (passontime_total == 0) {
            passontime_total = 1;
        }

        // second pass: one table row per processor
        threads = serverProcessor.processes();
        int c = 0;
        while (threads.hasNext()) {
            final serverProcessor<?> p = threads.next();
            final String row = "table_" + c + "_";

            // Read every live counter exactly once. The worker threads keep changing
            // them, so re-reading would let the displayed numbers and the row colour
            // be derived from different states of the processor.
            final int queueSize = p.queueSize();
            final int queueSizeMax = p.queueSizeMax();
            final long execCount = p.getExecCount();
            final long cycles = (execCount == 0) ? 1 : execCount; // avoid division by zero
            final long blocktime = p.getBlockTime();
            final long exectime = p.getExecTime();
            final long passontime = p.getPassOnTime();
            final long blockPercent = 100 * blocktime / blocktime_total;
            final long execPercent = 100 * exectime / exectime_total;
            final long passOnPercent = 100 * passontime / passontime_total;

            // set values to templates
            prop.put(row + "threadname", p.getName());
            prop.putHTML(row + "longdescr", p.getDescription());
            prop.put(row + "queuesize", queueSize);
            prop.put(row + "queuesizemax", queueSizeMax);
            prop.put(row + "concurrency", p.concurrency());
            prop.putHTML(row + "childs", p.getChilds());
            prop.putNum(row + "blockreadtime", blocktime / cycles);
            prop.putNum(row + "blockreadpercent", blockPercent);
            prop.putNum(row + "exectime", exectime / cycles);
            prop.putNum(row + "execpercent", execPercent);
            prop.putNum(row + "blockwritetime", passontime / cycles);
            prop.putNum(row + "blockwritepercent", passOnPercent);
            prop.putNum(row + "totalcycles", execCount);

            // set a color for the line to show problems:
            // problem = input queue is full,
            // warning = queue more than 80% full, or one time share above 80%
            final boolean problem = queueSize == queueSizeMax;
            final boolean warning =
                    queueSize > queueSizeMax * 8 / 10
                    || blockPercent > 80
                    || execPercent > 80
                    || passOnPercent > 80;
            // template alternatives: 0 = light, 1 = dark (warning), 2 = summary (problem)
            prop.put(row + "class", problem ? 2 : (warning ? 1 : 0));
            c++;
        }
        prop.put("table", c);

        // return rewrite values for templates
        return prop;
    }
}

@ -29,7 +29,8 @@
delete the file 'DATA/SETTINGS/yacy.conf' in the YaCy application root folder and start YaCy again.
</p>
<ul class="settingsMenu">
<li><a href="PerformanceQueues_p.html">Performance Settings of Queues and Processes</a></li>
<li><a href="PerformanceQueues_p.html">Performance Settings of Busy Queues</a></li>
<li><a href="PerformanceConcurrency_p.html">Performance of Concurrent Processes</a></li>
<li><a href="PerformanceMemory_p.html">Performance Settings for Memory</a></li>
<li><a href="PerformanceSearch_p.html">Performance Settings of Search Sequence</a></li>
</ul>

@ -65,8 +65,8 @@ public final class CrawlStacker {
this.acceptLocalURLs = acceptLocalURLs;
this.acceptGlobalURLs = acceptGlobalURLs;
this.fastQueue = new serverProcessor<CrawlEntry>(this, "job", 10000, null, 2);
this.slowQueue = new serverProcessor<CrawlEntry>(this, "job", 1000, null, 5);
this.fastQueue = new serverProcessor<CrawlEntry>("CrawlStackerFast", "This process checks new urls before they are enqueued into the balancer (proper, double-check, correct domain, filter)", new String[]{"Balancer"}, this, "job", 10000, null, 2);
this.slowQueue = new serverProcessor<CrawlEntry>("CrawlStackerSlow", "This is like CrawlStackerFast, but does additionaly a DNS lookup. The CrawlStackerFast does not need this because it can use the DNS cache.", new String[]{"Balancer"}, this, "job", 1000, null, 5);
this.log.logInfo("STACKCRAWL thread initialized.");
}
@ -80,8 +80,14 @@ public final class CrawlStacker {
this.slowQueue.clear();
}
/**
 * Logs the number of remaining stacker entries and then calls
 * {@code announceShutdown()} on the fast queue followed by the slow queue.
 * This method does not wait for the queues to terminate.
 */
public void announceClose() {
    final String message = "Flushing remaining " + size() + " crawl stacker job entries.";
    this.log.logInfo(message);
    fastQueue.announceShutdown();
    slowQueue.announceShutdown();
}
public void close() {
this.log.logInfo("Shutdown. Flushing remaining " + size() + " crawl stacker job entries. please wait.");
this.log.logInfo("Shutdown. waiting for remaining " + size() + " crawl stacker job entries. please wait.");
this.fastQueue.announceShutdown();
this.slowQueue.announceShutdown();
this.fastQueue.awaitShutdown(2000);

@ -564,10 +564,26 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<IndexingStack.
this.clusterhashes = this.webIndex.seedDB.clusterHashes(getConfig("cluster.peers.yacydomain", ""));
// deploy blocking threads
indexingStorageProcessor = new serverProcessor<indexingQueueEntry>(this, "storeDocumentIndex", serverProcessor.useCPU, null, 1);
indexingAnalysisProcessor = new serverProcessor<indexingQueueEntry>(this, "webStructureAnalysis", serverProcessor.useCPU + 1, indexingStorageProcessor);
indexingCondensementProcessor = new serverProcessor<indexingQueueEntry>(this, "condenseDocument", serverProcessor.useCPU + 2, indexingAnalysisProcessor);
indexingDocumentProcessor = new serverProcessor<indexingQueueEntry>(this, "parseDocument", serverProcessor.useCPU + 3, indexingCondensementProcessor);
indexingStorageProcessor = new serverProcessor<indexingQueueEntry>(
"storeDocumentIndex",
"This is the sequencing step of the indexing queue: no concurrency is wanted here, because the access of the indexer works better if it is not concurrent. Files are written as streams, councurrency would destroy IO performance. In this process the words are written to the RWI cache, which flushes if it is full.",
new String[]{"RWI/Cache/Collections"},
this, "storeDocumentIndex", serverProcessor.useCPU + 40, null, 1);
indexingAnalysisProcessor = new serverProcessor<indexingQueueEntry>(
"webStructureAnalysis",
"This just stores the link structure of the document into a web structure database.",
new String[]{"storeDocumentIndex"},
this, "webStructureAnalysis", serverProcessor.useCPU + 20, indexingStorageProcessor, serverProcessor.useCPU + 1);
indexingCondensementProcessor = new serverProcessor<indexingQueueEntry>(
"condenseDocument",
"This does a structural analysis of plain texts: markup of headlines, slicing into phrases (i.e. sentences), markup with position, counting of words, calculation of term frequency.",
new String[]{"webStructureAnalysis"},
this, "condenseDocument", serverProcessor.useCPU + 10, indexingAnalysisProcessor, serverProcessor.useCPU + 1);
indexingDocumentProcessor = new serverProcessor<indexingQueueEntry>(
"parseDocument",
"This does the parsing of the newly loaded documents from the web. The result is not only a plain text document, but also a list of URLs that are embedded into the document. The urls are handed over to the CrawlStacker. This process has two child process queues!",
new String[]{"condenseDocument", "CrawlStacker"},
this, "parseDocument", serverProcessor.useCPU + 1, indexingCondensementProcessor, serverProcessor.useCPU + 1);
// deploy busy threads
log.logConfig("Starting Threads");
@ -711,10 +727,14 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<IndexingStack.
plasmaSearchEvent.cleanupEvents(true);
// switch the networks
synchronized (this) {
// shut down
synchronized (this.webIndex) {
this.webIndex.close();
}
// TODO: restart CrawlStacker
this.crawlStacker.announceClose();
this.crawlStacker.close();
// start up
setConfig("network.unit.definition", networkDefinition);
overwriteNetworkDefinition();
final File indexPrimaryPath = getConfigPath(plasmaSwitchboardConstants.INDEX_PRIMARY_PATH, plasmaSwitchboardConstants.INDEX_PATH_DEFAULT);
@ -723,6 +743,12 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<IndexingStack.
final boolean useCommons = getConfigBool("index.storeCommons", false);
final int redundancy = (int) sb.getConfigLong("network.unit.dhtredundancy.senior", 1);
this.webIndex = new plasmaWordIndex(getConfig(plasmaSwitchboardConstants.NETWORK_NAME, ""), getLog(), indexPrimaryPath, indexSecondaryPath, wordCacheMaxCount, useCommons, redundancy);
// we need a new stacker, because this uses network-specific attributes to sort out urls (local, global)
this.crawlStacker = new CrawlStacker(
crawlQueues,
this.webIndex,
"local.any".indexOf(getConfig("network.unit.domain", "global")) >= 0,
"global.any".indexOf(getConfig("network.unit.domain", "global")) >= 0);
}
// start up crawl jobs
continueCrawlJob(plasmaSwitchboardConstants.CRAWLJOB_LOCAL_CRAWL);
@ -1033,14 +1059,15 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<IndexingStack.
log.logConfig("SWITCHBOARD SHUTDOWN STEP 2: sending termination signal to threaded indexing");
// closing all still running db importer jobs
indexingDocumentProcessor.announceShutdown();
crawlStacker.close();
indexingDocumentProcessor.awaitShutdown(4000);
crawlStacker.announceClose();
indexingCondensementProcessor.announceShutdown();
indexingAnalysisProcessor.announceShutdown();
indexingStorageProcessor.announceShutdown();
indexingCondensementProcessor.awaitShutdown(3000);
indexingAnalysisProcessor.awaitShutdown(2000);
indexingStorageProcessor.awaitShutdown(1000);
crawlStacker.close();
this.dbImportManager.close();
JakartaCommonsHttpClient.closeAllConnections();
wikiDB.close();

@ -24,26 +24,17 @@
package de.anomic.server;
import java.util.concurrent.BlockingQueue;
import de.anomic.server.logging.serverLog;
public abstract class serverAbstractBlockingThread<J extends serverProcessorJob> extends serverAbstractThread implements serverBlockingThread<J> {
private BlockingQueue<J> input = null;
private serverProcessor<J> output = null;
private serverProcessor<J> manager = null;
public void setInputQueue(final BlockingQueue<J> queue) {
this.input = queue;
}
public void setOutputProcess(final serverProcessor<J> processor) {
this.output = processor;
}
public BlockingQueue<J> getInputQueue() {
return this.input;
public void setManager(final serverProcessor<J> manager) {
this.manager = manager;
}
public serverProcessor<J> getOutputProcess() {
return this.output;
public serverProcessor<J> getManager() {
return this.manager;
}
@SuppressWarnings("unchecked")
@ -61,17 +52,17 @@ public abstract class serverAbstractBlockingThread<J extends serverProcessorJob>
// do job
timestamp = System.currentTimeMillis();
memstamp0 = serverMemory.used();
final J in = this.input.take();
final J in = this.manager.take();
if ((in == null) || (in == serverProcessorJob.poisonPill) || (in.status == serverProcessorJob.STATUS_POISON)) {
// the poison pill: shutdown
// a null element is pushed to the queue on purpose to signal
// that a termination should be made
if (this.output != null) this.output.enQueue((J) serverProcessorJob.poisonPill); // pass on the pill
//this.manager.enQueueNext((J) serverProcessorJob.poisonPill); // pass on the pill
this.running = false;
break;
}
final J out = this.job(in);
if ((out != null) && (this.output != null)) this.output.enQueue(out);
if (out != null) this.manager.passOn(out);
// do memory and busy/idle-count/time monitoring
memstamp1 = serverMemory.used();
if (memstamp1 >= memstamp0) {

@ -24,14 +24,10 @@
package de.anomic.server;
import java.util.concurrent.BlockingQueue;
public interface serverBlockingThread<J extends serverProcessorJob> extends serverThread {
public void setInputQueue(BlockingQueue<J> queue);
public void setOutputProcess(serverProcessor<J> queue);
public BlockingQueue<J> getInputQueue();
public serverProcessor<J> getOutputProcess();
public void setManager(serverProcessor<J> queue);
public serverProcessor<J> getManager();
public J job(J next) throws Exception;
// performs one job procedure; this loops until terminate() is called

@ -26,7 +26,6 @@ package de.anomic.server;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import de.anomic.server.logging.serverLog;
@ -40,13 +39,12 @@ public class serverInstantBlockingThread<J extends serverProcessorJob> extends s
public static int instantThreadCounter = 0;
public static final ConcurrentHashMap<Long, String> jobs = new ConcurrentHashMap<Long, String>();
public serverInstantBlockingThread(final Object env, final String jobExec, final BlockingQueue<J> input, final serverProcessor<J> output) {
public serverInstantBlockingThread(final Object env, final String jobExec, final serverProcessor<J> manager) {
// jobExec is the name of a method of the object 'env' that executes the one-step-run
// jobCount is the name of a method that returns the size of the job
// set the blocking queues for input and output
this.setInputQueue(input);
this.setOutputProcess(output);
// set the manager of blocking queues for input and output
this.setManager(manager);
// define execution class
this.jobExecMethod = execMethod(env, jobExec);
@ -71,7 +69,7 @@ public class serverInstantBlockingThread<J extends serverProcessorJob> extends s
}
public int getJobCount() {
return this.getInputQueue().size();
return this.getManager().queueSize();
}
@SuppressWarnings("unchecked")
@ -79,7 +77,8 @@ public class serverInstantBlockingThread<J extends serverProcessorJob> extends s
// see if we got a poison pill to tell us to shut down
if (next == null) return (J) serverProcessorJob.poisonPill;
if (next == serverProcessorJob.poisonPill || next.status == serverProcessorJob.STATUS_POISON) return next;
long t = System.currentTimeMillis();
instantThreadCounter++;
//System.out.println("started job " + this.handle + ": " + this.getName());
jobs.put(this.handle, this.getName());
@ -107,6 +106,7 @@ public class serverInstantBlockingThread<J extends serverProcessorJob> extends s
}
instantThreadCounter--;
jobs.remove(this.handle);
this.getManager().increaseJobTime(System.currentTimeMillis() - t);
return out;
}

@ -25,6 +25,8 @@
package de.anomic.server;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -37,35 +39,73 @@ public class serverProcessor<J extends serverProcessorJob> {
public static final int availableCPU = Runtime.getRuntime().availableProcessors();
public static int useCPU = availableCPU;
private static final ArrayList<serverProcessor<?>> processMonitor = new ArrayList<serverProcessor<?>>();
private ExecutorService executor;
private BlockingQueue<J> input;
private serverProcessor<J> output;
private int poolsize;
private Object environment;
private String methodName;
private String processName, methodName, description;
private String[] childs;
private long blockTime, execTime, passOnTime;
private long execCount;
public serverProcessor(final Object env, final String jobExec, final int inputQueueSize, final serverProcessor<J> output) {
this(env, jobExec, inputQueueSize, output, useCPU + 1);
}
public serverProcessor(final Object env, final String jobExec, final int inputQueueSize, final serverProcessor<J> output, final int poolsize) {
public serverProcessor(
String name, String description, String[] childnames,
final Object env, final String jobExecMethod, final int inputQueueSize, final serverProcessor<J> output, final int poolsize) {
// start a fixed number of executors that handle entries in the process queue
this.environment = env;
this.methodName = jobExec;
this.processName = name;
this.description = description;
this.methodName = jobExecMethod;
this.childs = childnames;
this.input = new LinkedBlockingQueue<J>(inputQueueSize);
this.output = output;
this.poolsize = poolsize;
executor = Executors.newCachedThreadPool(new NamePrefixThreadFactory(jobExec));
this.executor = Executors.newCachedThreadPool(new NamePrefixThreadFactory(jobExecMethod));
for (int i = 0; i < poolsize; i++) {
executor.submit(new serverInstantBlockingThread<J>(env, jobExec, input, output));
this.executor.submit(new serverInstantBlockingThread<J>(env, jobExecMethod, this));
}
// init statistics
blockTime = 0;
execTime = 0;
passOnTime = 0;
execCount = 0;
// store this object for easy monitoring
processMonitor.add(this);
}
/**
 * Returns the number of jobs currently waiting in the input queue.
 *
 * @return the current size of the input queue, or 0 if this processor has no
 *         input queue (the field is set to {@code null} during shutdown)
 */
public int queueSize() {
    // read the field once: another thread may set it to null while we are here
    final BlockingQueue<J> queue = this.input;
    if (queue == null) {
        return 0;
    }
    return queue.size();
}
/**
 * Returns the capacity of the input queue, computed as the current size plus
 * the remaining capacity. The two values are read one after the other, so the
 * sum is only approximate while jobs are being added or taken concurrently.
 *
 * @return the capacity of the input queue, or 0 if this processor has no
 *         input queue (the field is set to {@code null} during shutdown)
 */
public int queueSizeMax() {
    // read the field once: another thread may set it to null while we are here
    final BlockingQueue<J> queue = this.input;
    if (queue == null) {
        return 0;
    }
    return queue.size() + queue.remainingCapacity();
}
/**
 * Returns the pool size this processor was constructed with, i.e. the number
 * of worker jobs submitted to the executor in the constructor.
 *
 * @return the configured pool size
 */
public int concurrency() {
    return poolsize;
}
/**
 * Takes the next job from the input queue, blocking until one is available,
 * and adds the time spent waiting to the block-time statistic.
 *
 * NOTE(review): the statistic is updated without synchronization although
 * several workers call this method; individual updates may get lost.
 *
 * @return the job taken from the input queue
 * @throws InterruptedException if the thread is interrupted while waiting
 */
public J take() throws InterruptedException {
    final long start = System.currentTimeMillis();
    final J job = this.input.take();
    final long waited = System.currentTimeMillis() - start;
    this.blockTime += waited;
    return job;
}
/**
 * Hands a result over to the output processor, i.e. enqueues it into the
 * <em>next</em> queue. Not to be confused with enQueue(), which feeds this
 * processor's own queue. Does nothing if there is no output processor.
 * The time spent in the hand-over is added to the pass-on-time statistic.
 *
 * @param next the job to hand over to the output processor
 * @throws InterruptedException if the hand-over is interrupted
 */
public void passOn(J next) throws InterruptedException {
    if (this.output != null) {
        final long start = System.currentTimeMillis();
        this.output.enQueue(next);
        final long spent = System.currentTimeMillis() - start;
        this.passOnTime += spent;
    }
}
/**
 * Removes all waiting jobs from the input queue.
 * Does nothing if this processor has no input queue.
 */
public void clear() {
    if (this.input == null) {
        return;
    }
    this.input.clear();
}
@ -114,9 +154,9 @@ public class serverProcessor<J extends serverProcessorJob> {
// put poison pills into the queue
for (int i = 0; i < poolsize; i++) {
try {
serverLog.logInfo("serverProcessor", "putting poison pill in queue for " + this.methodName + ", thread " + i);
serverLog.logInfo("serverProcessor", "putting poison pill in queue " + this.processName + ", thread " + i);
input.put((J) serverProcessorJob.poisonPill); // put a poison pill into the queue which will kill the job
serverLog.logInfo("serverProcessor", ".. poison pill is in queue for " + this.methodName + ", thread " + i + ". awaiting termination");
serverLog.logInfo("serverProcessor", ".. poison pill is in queue " + this.processName + ", thread " + i + ". awaiting termination");
} catch (final InterruptedException e) { }
}
}
@ -130,9 +170,73 @@ public class serverProcessor<J extends serverProcessorJob> {
} catch (final InterruptedException e) {}
executor.shutdown();
}
serverLog.logInfo("serverProcessor", "queue for " + this.methodName + ": shutdown.");
serverLog.logInfo("serverProcessor", "queue " + this.processName + ": shutdown.");
this.executor = null;
this.input = null;
// remove entry from monitor
Iterator<serverProcessor<?>> i = processes();
serverProcessor<?> p;
while (i.hasNext()) {
p = i.next();
if (p == this) {
i.remove();
break;
}
}
}
/**
 * Returns an iterator over all processors registered in the process monitor.
 * The iterator works directly on the monitor list (it is not a copy), so
 * calling {@code remove()} on it unregisters the processor.
 *
 * NOTE(review): the monitor list is a plain ArrayList; iterating while
 * another thread registers or unregisters a processor may fail.
 *
 * @return a live iterator over the registered processors
 */
public static Iterator<serverProcessor<?>> processes() {
    final Iterator<serverProcessor<?>> registered = processMonitor.iterator();
    return registered;
}
/**
 * Records one finished job execution: counts the cycle and adds its duration
 * to the exec-time statistic.
 *
 * @param time the duration of the finished job, as measured by the caller
 */
protected void increaseJobTime(long time) {
    this.execCount += 1;
    this.execTime = this.execTime + time;
}
/**
 * @return the process name given to the constructor
 */
public String getName() {
    return processName;
}
/**
 * @return the description text given to the constructor
 */
public String getDescription() {
    return description;
}
/**
 * Returns the names of the child processes as one string. Every name is
 * followed by a single space, so a non-empty result ends with a space.
 *
 * @return the child names joined by spaces, or an empty string if no child
 *         names were given to the constructor
 */
public String getChilds() {
    // the constructor stores the array unchecked, so it may be null
    if (this.childs == null) {
        return "";
    }
    final StringBuilder names = new StringBuilder();
    for (final String child : this.childs) {
        names.append(child).append(' ');
    }
    return names.toString();
}
/**
 * The block time is the accumulated time that calls to take() spent waiting
 * until they received a value from the input queue.
 *
 * @return the accumulated block time, measured with System.currentTimeMillis()
 */
public long getBlockTime() {
    return this.blockTime;
}
/**
 * The exec time is the accumulated time reported through increaseJobTime(),
 * i.e. the time spent processing the values obtained from take().
 *
 * @return the accumulated execution time
 */
public long getExecTime() {
    return this.execTime;
}
/**
 * @return the number of job executions recorded through increaseJobTime()
 */
public long getExecCount() {
    return this.execCount;
}
/**
 * The pass-on time is the accumulated time that passOn() spent enqueueing
 * result values into the next processor. If the target queue is bounded and
 * full, the hand-over has to wait and this value grows.
 *
 * @return the accumulated pass-on time, measured with System.currentTimeMillis()
 */
public long getPassOnTime() {
    return this.passOnTime;
}
}

Loading…
Cancel
Save