refactoring of WorkflowProcessor, added process counter, update of

process counter if an blocking thread dies. Added also a new column in
PerformanceConcurrency_p servlet to show the actual number of concurrent
processes.
pull/1/head
Michael Peter Christen 12 years ago
parent 4058369288
commit 44e363f37f

@ -17,7 +17,8 @@
<td>Thread</td>
<td>Queue Size<br />Current</td>
<td>Queue Size<br />Maximum</td>
<td>Concurrency:<br />Number of Threads</td>
<td>Executors:<br />Current Number of Threads</td>
<td>Concurrency:<br />Maximum Number of Threads</td>
<td>Childs</td>
<td colspan="2">Average<br />Block Time<br />Reading</td>
<td colspan="2">Average<br />Exec Time</td>
@ -30,6 +31,7 @@
<td align="left">#[threadname]#</td>
<td align="right">#[queuesize]#</td>
<td align="right">#[queuesizemax]#</td>
<td align="right">#[executors]#</td>
<td align="right">#[concurrency]#</td>
<td align="right">#[childs]#</td>
<td align="right">#[blockreadtime]#&nbsp;ms</td>

@ -59,9 +59,10 @@ public class PerformanceConcurrency_p {
// set values to templates
prop.put("table_" + c + "_threadname", p.getName());
prop.putHTML("table_" + c + "_longdescr", p.getDescription());
prop.put("table_" + c + "_queuesize", p.queueSize());
prop.put("table_" + c + "_queuesizemax", p.queueSizeMax());
prop.put("table_" + c + "_concurrency", p.concurrency());
prop.put("table_" + c + "_queuesize", p.getQueueSize());
prop.put("table_" + c + "_queuesizemax", p.getMaxQueueSize());
prop.put("table_" + c + "_concurrency", p.getMaxConcurrency());
prop.put("table_" + c + "_executors", p.getExecutors());
prop.putHTML("table_" + c + "_childs", p.getChilds());
blocktime = p.getBlockTime();
@ -78,8 +79,8 @@ public class PerformanceConcurrency_p {
// set a color for the line to show problems
boolean problem = false;
boolean warning = false;
if (p.queueSize() == p.queueSizeMax()) problem = true;
if (p.queueSize() > p.queueSizeMax() * 8 / 10) warning = true;
if (p.getQueueSize() == p.getMaxQueueSize()) problem = true;
if (p.getQueueSize() > p.getMaxQueueSize() * 8 / 10) warning = true;
if (100 * blocktime / blocktime_total > 80) warning = true;
if (100 * exectime / exectime_total > 80) warning = true;
if (100 * passontime / passontime_total > 80) warning = true;

@ -104,7 +104,7 @@ public final class CrawlStacker {
public int size() {
return this.requestQueue.queueSize();
return this.requestQueue.getQueueSize();
}
public boolean isEmpty() {
if (!this.requestQueue.queueIsEmpty()) return false;

@ -104,6 +104,7 @@ public abstract class AbstractBlockingThread<J extends WorkflowJob> extends Abst
busyCycles++;
}
}
this.manager.decExecutors();
this.close();
logSystem("thread '" + this.getName() + "' terminated.");
}

@ -75,7 +75,7 @@ public class InstantBlockingThread<J extends WorkflowJob> extends AbstractBlocki
@Override
public int getJobCount() {
return getManager().queueSize();
return getManager().getQueueSize();
}
@Override

@ -93,7 +93,7 @@ public class WorkflowProcessor<J extends WorkflowJob> {
return this.methodName;
}
public int queueSize() {
public int getQueueSize() {
if (this.input == null) return 0;
return this.input.size();
}
@ -102,15 +102,26 @@ public class WorkflowProcessor<J extends WorkflowJob> {
return this.input == null || this.input.isEmpty();
}
public int queueSizeMax() {
public int getMaxQueueSize() {
if (this.input == null) return 0;
return this.input.size() + this.input.remainingCapacity();
}
public int concurrency() {
public int getMaxConcurrency() {
return this.maxpoolsize;
}
public int getExecutors() {
return this.executorRunning.get();
}
/**
* the decExecutors method may only be called within the AbstractBlockingThread while loop!!
*/
public void decExecutors() {
this.executorRunning.decrementAndGet();
}
public J take() throws InterruptedException {
// read from the input queue
if (this.input == null) {

@ -130,7 +130,7 @@ public class Dispatcher {
}
public int transmissionSize() {
return (this.indexingTransmissionProcessor == null) ? 0 : this.indexingTransmissionProcessor.queueSize();
return (this.indexingTransmissionProcessor == null) ? 0 : this.indexingTransmissionProcessor.getQueueSize();
}
/**
@ -374,7 +374,7 @@ public class Dispatcher {
*/
public boolean dequeueContainer() {
if (this.transmissionCloud == null) return false;
if (this.indexingTransmissionProcessor.queueSize() > this.indexingTransmissionProcessor.concurrency()) return false;
if (this.indexingTransmissionProcessor.getQueueSize() > this.indexingTransmissionProcessor.getMaxConcurrency()) return false;
ByteArray maxtarget = null;
int maxsize = -1;
for (final Map.Entry<ByteArray, Transmission.Chunk> chunk: this.transmissionCloud.entrySet()) {

@ -1132,10 +1132,10 @@ public final class Switchboard extends serverSwitch {
}
public int getIndexingProcessorsQueueSize() {
return this.indexingDocumentProcessor.queueSize()
+ this.indexingCondensementProcessor.queueSize()
+ this.indexingAnalysisProcessor.queueSize()
+ this.indexingStorageProcessor.queueSize();
return this.indexingDocumentProcessor.getQueueSize()
+ this.indexingCondensementProcessor.getQueueSize()
+ this.indexingAnalysisProcessor.getQueueSize()
+ this.indexingStorageProcessor.getQueueSize();
}
public void overwriteNetworkDefinition() throws FileNotFoundException, IOException {

Loading…
Cancel
Save