added concurrency to indexing process:

- the methods {parsing, semantic analysis (condensing), structure analysis (web structure)} in the serialized indexing path had been made concurrent.
- four BlockingQueues handle concurrency and hand-over of the indexing objects, the last object in the queue is stored into a blockingQueue of maximum size 1 to serialize the process for storage (which uses IO and therefore here should not be deserialized)
- a concurrency of (CPUs + 1) is default. Single-CPU users will profil from the change because large files cannot block the indexing process any more.
- removed the secondary indexing thread, which is superfluous now. Concurrency is default for all users.

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@4609 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 17 years ago
parent 9fb5d661f2
commit 0241d070bc

@ -586,15 +586,6 @@ filterOutStopwordsFromTopwords=true
cleanup.deletionProcessedNews = true cleanup.deletionProcessedNews = true
cleanup.deletionPublishedNews = true cleanup.deletionPublishedNews = true
# multiprocessor-settings
# you may want to run time-consuming processes on several processors
# the most time-consuming process is the indexing-Process
# We implemented an option to run several of these processes here
# setting the number of processes to Zero is not allowed
# If you have a double-processor system,
# a cluster value of '2' would be appropriate
80_indexing_cluster=1
# default memory settings for startup of yacy # default memory settings for startup of yacy
# is valid in unix/shell and windows environments but # is valid in unix/shell and windows environments but
# not for first startup of YaCy # not for first startup of YaCy

@ -64,6 +64,7 @@ import de.anomic.http.httpdProxyHandler;
import de.anomic.kelondro.kelondroBase64Order; import de.anomic.kelondro.kelondroBase64Order;
import de.anomic.plasma.plasmaParser; import de.anomic.plasma.plasmaParser;
import de.anomic.plasma.plasmaSwitchboard; import de.anomic.plasma.plasmaSwitchboard;
import de.anomic.server.serverBusyThread;
import de.anomic.server.serverCodings; import de.anomic.server.serverCodings;
import de.anomic.server.serverCore; import de.anomic.server.serverCore;
import de.anomic.server.serverDate; import de.anomic.server.serverDate;
@ -256,8 +257,8 @@ public class SettingsAck_p {
httpd.initPortForwarding(); httpd.initPortForwarding();
// notifying publishSeed Thread // notifying publishSeed Thread
//serverThread peerPing = env.getThread("30_peerping"); serverBusyThread peerPing = env.getThread("30_peerping");
//peerPing.notifyThread(); peerPing.notifyThread();
} catch (Exception e) { } catch (Exception e) {
prop.put("info", "23"); prop.put("info", "23");
prop.putHTML("info_errormsg",(e.getMessage() == null) ? "unknown" : e.getMessage().replaceAll("\n","<br>")); prop.putHTML("info_errormsg",(e.getMessage() == null) ? "unknown" : e.getMessage().replaceAll("\n","<br>"));

@ -202,7 +202,7 @@ public class plasmaCrawlZURL {
newrow.setCol(4, this.anycause.getBytes()); newrow.setCol(4, this.anycause.getBytes());
newrow.setCol(5, this.bentry.toRow().bytes()); newrow.setCol(5, this.bentry.toRow().bytes());
try { try {
urlIndex.put(newrow); if (urlIndex != null) urlIndex.put(newrow);
this.stored = true; this.stored = true;
} catch (IOException e) { } catch (IOException e) {
System.out.println("INTERNAL ERROR AT plasmaEURL:url2hash:" + e.toString()); System.out.println("INTERNAL ERROR AT plasmaEURL:url2hash:" + e.toString());

@ -107,6 +107,8 @@ import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import de.anomic.data.URLLicense; import de.anomic.data.URLLicense;
import de.anomic.data.blogBoard; import de.anomic.data.blogBoard;
@ -143,6 +145,8 @@ import de.anomic.server.serverFileUtils;
import de.anomic.server.serverInstantBusyThread; import de.anomic.server.serverInstantBusyThread;
import de.anomic.server.serverMemory; import de.anomic.server.serverMemory;
import de.anomic.server.serverObjects; import de.anomic.server.serverObjects;
import de.anomic.server.serverProcessor;
import de.anomic.server.serverProcessorJob;
import de.anomic.server.serverProfiling; import de.anomic.server.serverProfiling;
import de.anomic.server.serverSemaphore; import de.anomic.server.serverSemaphore;
import de.anomic.server.serverSwitch; import de.anomic.server.serverSwitch;
@ -238,6 +242,15 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<plasmaSwitchbo
public URLLicense licensedURLs; public URLLicense licensedURLs;
public Timer moreMemory; public Timer moreMemory;
public serverProcessor<indexingQueueEntry, indexingQueueEntry> indexingDocumentProcessor;
public serverProcessor<indexingQueueEntry, indexingQueueEntry> indexingCondensementProcessor;
public serverProcessor<indexingQueueEntry, indexingQueueEntry> indexingAnalysisProcessor;
public serverProcessor<indexingQueueEntry, indexingQueueEntry> indexingStorageProcessor;
public LinkedBlockingQueue<indexingQueueEntry> indexingDocumentQueue;
public LinkedBlockingQueue<indexingQueueEntry> indexingCondensementQueue;
public LinkedBlockingQueue<indexingQueueEntry> indexingAnalysisQueue;
public ArrayBlockingQueue<indexingQueueEntry> indexingStorageQueue;
/* /*
* Remote Proxy configuration * Remote Proxy configuration
*/ */
@ -386,7 +399,6 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<plasmaSwitchbo
* <p>Name of the indexer thread, performing the actual indexing of a website</p> * <p>Name of the indexer thread, performing the actual indexing of a website</p>
*/ */
public static final String INDEXER = "80_indexing"; public static final String INDEXER = "80_indexing";
public static final String INDEXER_CLUSTER = "80_indexing_cluster";
public static final String INDEXER_MEMPREREQ = "80_indexing_memprereq"; public static final String INDEXER_MEMPREREQ = "80_indexing_memprereq";
public static final String INDEXER_IDLESLEEP = "80_indexing_idlesleep"; public static final String INDEXER_IDLESLEEP = "80_indexing_idlesleep";
public static final String INDEXER_BUSYSLEEP = "80_indexing_busysleep"; public static final String INDEXER_BUSYSLEEP = "80_indexing_busysleep";
@ -1282,36 +1294,30 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<plasmaSwitchbo
try {Thread.sleep(1000);} catch (InterruptedException e) {} try {Thread.sleep(1000);} catch (InterruptedException e) {}
this.clusterhashes = yacyCore.seedDB.clusterHashes(getConfig("cluster.peers.yacydomain", "")); this.clusterhashes = yacyCore.seedDB.clusterHashes(getConfig("cluster.peers.yacydomain", ""));
// deploy threads // deploy blocking threads
indexingDocumentQueue = new LinkedBlockingQueue<indexingQueueEntry>();
indexingCondensementQueue = new LinkedBlockingQueue<indexingQueueEntry>();
indexingAnalysisQueue = new LinkedBlockingQueue<indexingQueueEntry>();
indexingStorageQueue = new ArrayBlockingQueue<indexingQueueEntry>(1);
indexingDocumentProcessor = new serverProcessor<indexingQueueEntry, indexingQueueEntry>(this, "parseDocument", indexingDocumentQueue, indexingCondensementQueue);
indexingCondensementProcessor = new serverProcessor<indexingQueueEntry, indexingQueueEntry>(this, "condenseDocument", indexingCondensementQueue, indexingAnalysisQueue);
indexingAnalysisProcessor = new serverProcessor<indexingQueueEntry, indexingQueueEntry>(this, "webStructureAnalysis", indexingAnalysisQueue, indexingStorageQueue);
indexingStorageProcessor = new serverProcessor<indexingQueueEntry, indexingQueueEntry>(this, "storeDocumentIndex", indexingStorageQueue, null);
// deploy busy threads
log.logConfig("Starting Threads"); log.logConfig("Starting Threads");
serverMemory.gc(1000, "plasmaSwitchboard, help for profiler"); // help for profiler - thq serverMemory.gc(1000, "plasmaSwitchboard, help for profiler"); // help for profiler - thq
moreMemory = new Timer(); // init GC Thread - thq moreMemory = new Timer(); // init GC Thread - thq
moreMemory.schedule(new MoreMemory(), 300000, 600000); moreMemory.schedule(new MoreMemory(), 300000, 600000);
int indexing_cluster = Integer.parseInt(getConfig(INDEXER_CLUSTER, "1"));
if (indexing_cluster < 1) indexing_cluster = 1;
deployThread(CLEANUP, "Cleanup", "simple cleaning process for monitoring information", null, deployThread(CLEANUP, "Cleanup", "simple cleaning process for monitoring information", null,
new serverInstantBusyThread(this, CLEANUP_METHOD_START, CLEANUP_METHOD_JOBCOUNT, CLEANUP_METHOD_FREEMEM), 10000); // all 5 Minutes new serverInstantBusyThread(this, CLEANUP_METHOD_START, CLEANUP_METHOD_JOBCOUNT, CLEANUP_METHOD_FREEMEM), 10000); // all 5 Minutes
deployThread(CRAWLSTACK, "Crawl URL Stacker", "process that checks url for double-occurrences and for allowance/disallowance by robots.txt", null, deployThread(CRAWLSTACK, "Crawl URL Stacker", "process that checks url for double-occurrences and for allowance/disallowance by robots.txt", null,
new serverInstantBusyThread(crawlStacker, CRAWLSTACK_METHOD_START, CRAWLSTACK_METHOD_JOBCOUNT, CRAWLSTACK_METHOD_FREEMEM), 8000); new serverInstantBusyThread(crawlStacker, CRAWLSTACK_METHOD_START, CRAWLSTACK_METHOD_JOBCOUNT, CRAWLSTACK_METHOD_FREEMEM), 8000);
deployThread(INDEXER, "Indexing", "thread that either initiates a parsing/indexing queue, distributes the index into the DHT, stores parsed documents or flushes the index cache", "/IndexCreateIndexingQueue_p.html",
//deployThread(PARSER, "Parsing", "thread that feeds a concurrent document parsing queue", "/IndexCreateIndexingQueue_p.html",
//new serverInstantThread(this, PARSER_METHOD_START, PARSER_METHOD_JOBCOUNT, PARSER_METHOD_FREEMEM), 10000);
deployThread(INDEXER, "Indexing", "thread that either distributes the index into the DHT, stores parsed documents or flushes the index cache", "/IndexCreateIndexingQueue_p.html",
new serverInstantBusyThread(this, INDEXER_METHOD_START, INDEXER_METHOD_JOBCOUNT, INDEXER_METHOD_FREEMEM), 10000); new serverInstantBusyThread(this, INDEXER_METHOD_START, INDEXER_METHOD_JOBCOUNT, INDEXER_METHOD_FREEMEM), 10000);
for (i = 1; i < indexing_cluster; i++) {
setConfig((i + 80) + "_indexing_idlesleep", getConfig(INDEXER_IDLESLEEP, ""));
setConfig((i + 80) + "_indexing_busysleep", getConfig(INDEXER_BUSYSLEEP, ""));
deployThread((i + 80) + "_indexing", "Parsing/Indexing (cluster job)", "thread that performes document parsing and indexing", null,
new serverInstantBusyThread(this, INDEXER_METHOD_START, INDEXER_METHOD_JOBCOUNT, INDEXER_METHOD_FREEMEM), 10000 + (i * 1000),
Long.parseLong(getConfig(INDEXER_IDLESLEEP , "5000")),
Long.parseLong(getConfig(INDEXER_BUSYSLEEP , "0")),
Long.parseLong(getConfig(INDEXER_MEMPREREQ , "1000000")));
}
deployThread(PROXY_CACHE_ENQUEUE, "Proxy Cache Enqueue", "job takes new input files from RAM stack, stores them, and hands over to the Indexing Stack", null, deployThread(PROXY_CACHE_ENQUEUE, "Proxy Cache Enqueue", "job takes new input files from RAM stack, stores them, and hands over to the Indexing Stack", null,
new serverInstantBusyThread(this, PROXY_CACHE_ENQUEUE_METHOD_START, PROXY_CACHE_ENQUEUE_METHOD_JOBCOUNT, PROXY_CACHE_ENQUEUE_METHOD_FREEMEM), 10000); new serverInstantBusyThread(this, PROXY_CACHE_ENQUEUE_METHOD_START, PROXY_CACHE_ENQUEUE_METHOD_JOBCOUNT, PROXY_CACHE_ENQUEUE_METHOD_FREEMEM), 10000);
deployThread(CRAWLJOB_REMOTE_TRIGGERED_CRAWL, "Remote Crawl Job", "thread that performes a single crawl/indexing step triggered by a remote peer", null, deployThread(CRAWLJOB_REMOTE_TRIGGERED_CRAWL, "Remote Crawl Job", "thread that performes a single crawl/indexing step triggered by a remote peer", null,
@ -1743,9 +1749,13 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<plasmaSwitchbo
parser.close(); parser.close();
plasmaHTCache.close(); plasmaHTCache.close();
sbQueue.close(); sbQueue.close();
indexingDocumentProcessor.shutdown(1000);
indexingCondensementProcessor.shutdown(1000);
indexingAnalysisProcessor.shutdown(1000);
webStructure.flushCitationReference("crg"); webStructure.flushCitationReference("crg");
webStructure.close(); webStructure.close();
log.logConfig("SWITCHBOARD SHUTDOWN STEP 3: sending termination signal to database manager (stand by...)"); log.logConfig("SWITCHBOARD SHUTDOWN STEP 3: sending termination signal to database manager (stand by...)");
indexingStorageProcessor.shutdown(1000);
wordIndex.close(); wordIndex.close();
yc.close(); yc.close();
log.logConfig("SWITCHBOARD SHUTDOWN TERMINATED"); log.logConfig("SWITCHBOARD SHUTDOWN TERMINATED");
@ -1873,57 +1883,28 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<plasmaSwitchbo
} }
sbQueue.enQueueToActive(queueEntry); sbQueue.enQueueToActive(queueEntry);
// check for interruption
checkInterruption();
this.indexingDocumentQueue.put(new indexingQueueEntry(queueEntry, null, null));
/*
// THE FOLLOWING CAN BE CONCURRENT -> // THE FOLLOWING CAN BE CONCURRENT ->
// parse and index the resource // parse and index the resource
queueEntry.updateStatus(plasmaSwitchboardQueue.QUEUE_STATE_PARSING_WAITING); indexingQueueEntry document = parseDocument(new indexingQueueEntry(queueEntry, null, null));
queueEntry.updateStatus(plasmaSwitchboardQueue.QUEUE_STATE_PARSING_RUNNING);
plasmaParserDocument document = parseDocument(queueEntry);
if (document == null) {
if (!queueEntry.profile().storeHTCache()) {
plasmaHTCache.filesInUse.remove(queueEntry.cacheFile());
//plasmaHTCache.deleteURLfromCache(entry.url());
}
queueEntry.close();
return true;
}
queueEntry.updateStatus(plasmaSwitchboardQueue.QUEUE_STATE_PARSING_COMPLETE);
// do condensing // do condensing
queueEntry.updateStatus(plasmaSwitchboardQueue.QUEUE_STATE_CONDENSING_WAITING); indexingQueueEntry condensement = condenseDocument(document);
queueEntry.updateStatus(plasmaSwitchboardQueue.QUEUE_STATE_CONDENSING_RUNNING);
plasmaCondenser condensement = condenseDocument(queueEntry, document);
if (condensement == null) {
if (!queueEntry.profile().storeHTCache()) {
plasmaHTCache.filesInUse.remove(queueEntry.cacheFile());
//plasmaHTCache.deleteURLfromCache(entry.url());
}
queueEntry.close();
return true;
}
queueEntry.updateStatus(plasmaSwitchboardQueue.QUEUE_STATE_CONDENSING_COMPLETE);
// do a web structure analysis // do a web structure analysis
queueEntry.updateStatus(plasmaSwitchboardQueue.QUEUE_STATE_STRUCTUREANALYSIS_WAITING); indexingQueueEntry analysis = webStructureAnalysis(condensement);
queueEntry.updateStatus(plasmaSwitchboardQueue.QUEUE_STATE_STRUCTUREANALYSIS_RUNNING);
document.notifyWebStructure(webStructure, condensement, queueEntry.getModificationDate());
queueEntry.updateStatus(plasmaSwitchboardQueue.QUEUE_STATE_STRUCTUREANALYSIS_COMPLETE);
// <- CONCURRENT UNTIL HERE, THEN SERIALIZE AGAIN // <- CONCURRENT UNTIL HERE, THEN SERIALIZE AGAIN
// store the result // store the result
queueEntry.updateStatus(plasmaSwitchboardQueue.QUEUE_STATE_INDEXSTORAGE_WAITING); storeDocumentIndex(analysis);
queueEntry.updateStatus(plasmaSwitchboardQueue.QUEUE_STATE_INDEXSTORAGE_RUNNING); */
storeDocumentIndex(queueEntry, document, condensement);
queueEntry.updateStatus(plasmaSwitchboardQueue.QUEUE_STATE_INDEXSTORAGE_COMPLETE);
// finally close the queue process
if (!queueEntry.profile().storeHTCache()) {
plasmaHTCache.filesInUse.remove(queueEntry.cacheFile());
//plasmaHTCache.deleteURLfromCache(entry.url());
}
queueEntry.close();
return true; return true;
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.logInfo("DEQUEUE: Shutdown detected."); log.logInfo("DEQUEUE: Shutdown detected.");
@ -1931,6 +1912,20 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<plasmaSwitchbo
} }
} }
public static class indexingQueueEntry implements serverProcessorJob {
public plasmaSwitchboardQueue.QueueEntry queueEntry;
public plasmaParserDocument document;
public plasmaCondenser condenser;
public indexingQueueEntry(
plasmaSwitchboardQueue.QueueEntry queueEntry,
plasmaParserDocument document,
plasmaCondenser condenser) {
this.queueEntry = queueEntry;
this.document = document;
this.condenser = condenser;
}
}
public int cleanupJobSize() { public int cleanupJobSize() {
int c = 0; int c = 0;
if ((crawlQueues.delegatedURL.stackSize() > 1000)) c++; if ((crawlQueues.delegatedURL.stackSize() > 1000)) c++;
@ -2134,6 +2129,25 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<plasmaSwitchbo
} }
} }
public indexingQueueEntry parseDocument(indexingQueueEntry in) {
in.queueEntry.updateStatus(plasmaSwitchboardQueue.QUEUE_STATE_PARSING);
plasmaParserDocument document = null;
try {
document = parseDocument(in.queueEntry);
} catch (InterruptedException e) {
document = null;
}
if (document == null) {
if (!in.queueEntry.profile().storeHTCache()) {
plasmaHTCache.filesInUse.remove(in.queueEntry.cacheFile());
//plasmaHTCache.deleteURLfromCache(entry.url());
}
in.queueEntry.close();
return null;
}
return new indexingQueueEntry(in.queueEntry, document, null);
}
private plasmaParserDocument parseDocument(plasmaSwitchboardQueue.QueueEntry entry) throws InterruptedException { private plasmaParserDocument parseDocument(plasmaSwitchboardQueue.QueueEntry entry) throws InterruptedException {
plasmaParserDocument document = null; plasmaParserDocument document = null;
int processCase = entry.processCase(); int processCase = entry.processCase();
@ -2198,6 +2212,25 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<plasmaSwitchbo
return document; return document;
} }
public indexingQueueEntry condenseDocument(indexingQueueEntry in) {
in.queueEntry.updateStatus(plasmaSwitchboardQueue.QUEUE_STATE_CONDENSING);
plasmaCondenser condenser = null;
try {
condenser = condenseDocument(in.queueEntry, in.document);
} catch (InterruptedException e) {
condenser = null;
}
if (condenser == null) {
if (!in.queueEntry.profile().storeHTCache()) {
plasmaHTCache.filesInUse.remove(in.queueEntry.cacheFile());
//plasmaHTCache.deleteURLfromCache(entry.url());
}
in.queueEntry.close();
return null;
}
return new indexingQueueEntry(in.queueEntry, in.document, condenser);
}
private plasmaCondenser condenseDocument(plasmaSwitchboardQueue.QueueEntry entry, plasmaParserDocument document) throws InterruptedException { private plasmaCondenser condenseDocument(plasmaSwitchboardQueue.QueueEntry entry, plasmaParserDocument document) throws InterruptedException {
// CREATE INDEX // CREATE INDEX
String dc_title = document.dc_title(); String dc_title = document.dc_title();
@ -2242,6 +2275,23 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<plasmaSwitchbo
return condenser; return condenser;
} }
public indexingQueueEntry webStructureAnalysis(indexingQueueEntry in) {
in.queueEntry.updateStatus(plasmaSwitchboardQueue.QUEUE_STATE_STRUCTUREANALYSIS);
in.document.notifyWebStructure(webStructure, in.condenser, in.queueEntry.getModificationDate());
return in;
}
public void storeDocumentIndex(indexingQueueEntry in) {
in.queueEntry.updateStatus(plasmaSwitchboardQueue.QUEUE_STATE_INDEXSTORAGE);
storeDocumentIndex(in.queueEntry, in.document, in.condenser);
if (!in.queueEntry.profile().storeHTCache()) {
plasmaHTCache.filesInUse.remove(in.queueEntry.cacheFile());
//plasmaHTCache.deleteURLfromCache(entry.url());
}
in.queueEntry.updateStatus(plasmaSwitchboardQueue.QUEUE_STATE_FINISHED);
in.queueEntry.close();
}
private void storeDocumentIndex(plasmaSwitchboardQueue.QueueEntry entry, plasmaParserDocument document, plasmaCondenser condenser) { private void storeDocumentIndex(plasmaSwitchboardQueue.QueueEntry entry, plasmaParserDocument document, plasmaCondenser condenser) {
// CREATE INDEX // CREATE INDEX

@ -214,18 +214,11 @@ public class plasmaSwitchboardQueue {
} }
public static final int QUEUE_STATE_FRESH = 0; public static final int QUEUE_STATE_FRESH = 0;
public static final int QUEUE_STATE_PARSING_WAITING = 1; public static final int QUEUE_STATE_PARSING = 1;
public static final int QUEUE_STATE_PARSING_RUNNING = 2; public static final int QUEUE_STATE_CONDENSING = 2;
public static final int QUEUE_STATE_PARSING_COMPLETE = 3; public static final int QUEUE_STATE_STRUCTUREANALYSIS = 3;
public static final int QUEUE_STATE_CONDENSING_WAITING = 4; public static final int QUEUE_STATE_INDEXSTORAGE = 4;
public static final int QUEUE_STATE_CONDENSING_RUNNING = 5; public static final int QUEUE_STATE_FINISHED = 5;
public static final int QUEUE_STATE_CONDENSING_COMPLETE = 6;
public static final int QUEUE_STATE_STRUCTUREANALYSIS_WAITING = 7;
public static final int QUEUE_STATE_STRUCTUREANALYSIS_RUNNING = 8;
public static final int QUEUE_STATE_STRUCTUREANALYSIS_COMPLETE = 9;
public static final int QUEUE_STATE_INDEXSTORAGE_WAITING = 10;
public static final int QUEUE_STATE_INDEXSTORAGE_RUNNING = 11;
public static final int QUEUE_STATE_INDEXSTORAGE_COMPLETE = 12;
public class QueueEntry { public class QueueEntry {
yacyURL url; // plasmaURL.urlStringLength yacyURL url; // plasmaURL.urlStringLength

@ -28,7 +28,7 @@ import java.util.concurrent.BlockingQueue;
import de.anomic.server.logging.serverLog; import de.anomic.server.logging.serverLog;
public abstract class serverAbstractBlockingThread<I, O> extends serverAbstractThread implements serverBlockingThread<I, O> { public abstract class serverAbstractBlockingThread<I extends serverProcessorJob, O extends serverProcessorJob> extends serverAbstractThread implements serverBlockingThread<I, O> {
private BlockingQueue<I> input = null; private BlockingQueue<I> input = null;
private BlockingQueue<O> output = null; private BlockingQueue<O> output = null;
@ -60,8 +60,16 @@ public abstract class serverAbstractBlockingThread<I, O> extends serverAbstractT
// do job // do job
timestamp = System.currentTimeMillis(); timestamp = System.currentTimeMillis();
memstamp0 = serverMemory.used(); memstamp0 = serverMemory.used();
O out = this.job(this.input.take()); I in = this.input.take();
if (out != null) this.output.add(out); if ((in == null) || (in == serverProcessor.shutdownJob)) {
// the poison pill: shutdown
// a null element is pushed to the queue on purpose to signal
// that a termination should be made
this.running = false;
break;
}
O out = this.job(in);
if ((out != null) && (this.output != null)) this.output.put(out);
// do memory and busy/idle-count/time monitoring // do memory and busy/idle-count/time monitoring
memstamp1 = serverMemory.used(); memstamp1 = serverMemory.used();
if (memstamp1 >= memstamp0) { if (memstamp1 >= memstamp0) {

@ -32,6 +32,7 @@ public abstract class serverAbstractBusyThread extends serverAbstractThread impl
private long idletime = 0, memprereq = 0; private long idletime = 0, memprereq = 0;
private long idleCycles = 0, busyCycles = 0, outofmemoryCycles = 0; private long idleCycles = 0, busyCycles = 0, outofmemoryCycles = 0;
private boolean intermissionObedient = true; private boolean intermissionObedient = true;
private Object syncObject = new Object();
protected final void announceMoreSleepTime(long millis) { protected final void announceMoreSleepTime(long millis) {
this.idletime += millis; this.idletime += millis;
@ -183,10 +184,29 @@ public abstract class serverAbstractBusyThread extends serverAbstractThread impl
} }
private void ratz(long millis) { private void ratz(long millis) {
try { try {/*
if (this.syncObject != null) {
synchronized (this.syncObject) {
this.syncObject.wait(millis);
}
} else {*/
Thread.sleep(millis); Thread.sleep(millis);
//}
} catch (InterruptedException e) { } catch (InterruptedException e) {
if (this.log != null) this.log.logConfig("thread '" + this.getName() + "' interrupted because of shutdown."); if (this.log != null)
this.log.logConfig("thread '" + this.getName() + "' interrupted because of shutdown.");
}
}
public void notifyThread() {
if (this.syncObject != null) {
synchronized (this.syncObject) {
if (this.log != null)
this.log.logFine("thread '" + this.getName()
+ "' has received a notification from thread '"
+ Thread.currentThread().getName() + "'.");
this.syncObject.notifyAll();
}
} }
} }

@ -67,4 +67,5 @@ public interface serverBusyThread extends serverThread {
// is called when an outOfMemoryCycle is performed // is called when an outOfMemoryCycle is performed
// this method should try to free some memory, so that the job can be executed // this method should try to free some memory, so that the job can be executed
public void notifyThread();
} }

@ -26,21 +26,21 @@ package de.anomic.server;
import java.lang.reflect.InvocationTargetException; import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import de.anomic.server.logging.serverLog; import de.anomic.server.logging.serverLog;
public class serverInstantBlockingThread<I, O> extends serverAbstractBlockingThread<I, O> implements serverBlockingThread<I, O> { public class serverInstantBlockingThread<I extends serverProcessorJob, O extends serverProcessorJob> extends serverAbstractBlockingThread<I, O> implements serverBlockingThread<I, O> {
private Method jobExecMethod, jobCountMethod; private Method jobExecMethod;
private Object environment; private Object environment;
private Long handle; private Long handle;
private static int handleCounter = 0;
public static int instantThreadCounter = 0; public static int instantThreadCounter = 0;
public static TreeMap<Long, String> jobs = new TreeMap<Long, String>(); public static ConcurrentHashMap<Long, String> jobs = new ConcurrentHashMap<Long, String>();
public serverInstantBlockingThread(Object env, String jobExec, String jobCount, BlockingQueue<I> input, BlockingQueue<O> output) { public serverInstantBlockingThread(Object env, String jobExec, BlockingQueue<I> input, BlockingQueue<O> output) {
// jobExec is the name of a method of the object 'env' that executes the one-step-run // jobExec is the name of a method of the object 'env' that executes the one-step-run
// jobCount is the name of a method that returns the size of the job // jobCount is the name of a method that returns the size of the job
@ -51,50 +51,37 @@ public class serverInstantBlockingThread<I, O> extends serverAbstractBlockingThr
// define execution class // define execution class
Class<?> theClass = (env instanceof Class) ? (Class<?>) env : env.getClass(); Class<?> theClass = (env instanceof Class) ? (Class<?>) env : env.getClass();
try { try {
this.jobExecMethod = theClass.getMethod(jobExec, new Class[0]); this.jobExecMethod = null;
Method[] methods = theClass.getMethods();
for (int i = 0; i < methods.length; i++) {
if ((methods[i].getParameterTypes().length == 1) && (methods[i].getName().equals(jobExec))) {
this.jobExecMethod = methods[i];
break;
}
}
if (this.jobExecMethod == null) throw new NoSuchMethodException(jobExec + " does not exist in " + env.getClass().getName());
} catch (NoSuchMethodException e) { } catch (NoSuchMethodException e) {
throw new RuntimeException("serverInstantThread, wrong declaration of jobExec: " + e.getMessage()); throw new RuntimeException("serverInstantThread, wrong declaration of jobExec: " + e.getMessage());
} }
try {
if (jobCount == null)
this.jobCountMethod = null;
else
this.jobCountMethod = theClass.getMethod(jobCount, new Class[0]);
} catch (NoSuchMethodException e) {
throw new RuntimeException("serverInstantThread, wrong declaration of jobCount: " + e.getMessage());
}
this.environment = (env instanceof Class) ? null : env; this.environment = (env instanceof Class) ? null : env;
this.setName(theClass.getName() + "." + jobExec); this.setName(theClass.getName() + "." + jobExec + "." + handleCounter++);
this.handle = new Long(System.currentTimeMillis() + this.getName().hashCode()); this.handle = new Long(System.currentTimeMillis() + this.getName().hashCode());
} }
public int getJobCount() { public int getJobCount() {
if (this.jobCountMethod == null) return Integer.MAX_VALUE; return this.getInputQueue().size();
try {
Object result = jobCountMethod.invoke(environment, new Object[0]);
if (result instanceof Integer)
return ((Integer) result).intValue();
else
return -1;
} catch (IllegalAccessException e) {
return -1;
} catch (IllegalArgumentException e) {
return -1;
} catch (InvocationTargetException e) {
serverLog.logSevere("BLOCKINGTHREAD", "invocation serverInstantThread of thread '" + this.getName() + "': " + e.getMessage(), e);
return -1;
}
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public O job(I next) throws Exception { public O job(I next) throws Exception {
if (next == null) return null; // poison pill: shutdown
instantThreadCounter++; instantThreadCounter++;
//System.out.println("started job " + this.handle + ": " + this.getName()); //System.out.println("started job " + this.handle + ": " + this.getName());
synchronized(jobs) {jobs.put(this.handle, this.getName());} jobs.put(this.handle, this.getName());
O out = null; O out = null;
try { try {
out = (O) jobExecMethod.invoke(environment, new Object[0]); out = (O) jobExecMethod.invoke(environment, new Object[]{next});
} catch (IllegalAccessException e) { } catch (IllegalAccessException e) {
serverLog.logSevere("BLOCKINGTHREAD", "Internal Error in serverInstantThread.job: " + e.getMessage()); serverLog.logSevere("BLOCKINGTHREAD", "Internal Error in serverInstantThread.job: " + e.getMessage());
serverLog.logSevere("BLOCKINGTHREAD", "shutting down thread '" + this.getName() + "'"); serverLog.logSevere("BLOCKINGTHREAD", "shutting down thread '" + this.getName() + "'");
@ -115,7 +102,7 @@ public class serverInstantBlockingThread<I, O> extends serverAbstractBlockingThr
e.printStackTrace(); e.printStackTrace();
} }
instantThreadCounter--; instantThreadCounter--;
synchronized(jobs) {jobs.remove(this.handle);} jobs.remove(this.handle);
return out; return out;
} }

@ -24,9 +24,58 @@
package de.anomic.server; package de.anomic.server;
public class serverProcessor { import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class serverProcessor<I extends serverProcessorJob, O extends serverProcessorJob> {
public static final int availableCPU = Runtime.getRuntime().availableProcessors(); public static final int availableCPU = Runtime.getRuntime().availableProcessors();
public static int useCPU = availableCPU; public static int useCPU = availableCPU;
ExecutorService executor;
BlockingQueue<I> input;
BlockingQueue<O> output;
int poolsize;
public serverProcessor(Object env, String jobExec, BlockingQueue<I> input, BlockingQueue<O> output) {
this(env, jobExec, input, output, useCPU + 1);
}
public serverProcessor(Object env, String jobExec, BlockingQueue<I> input, BlockingQueue<O> output, int poolsize) {
// start a fixed number of executors that handle entries in the process queue
this.input = input;
this.output = output;
this.poolsize = poolsize;
executor = Executors.newCachedThreadPool();
for (int i = 0; i < poolsize; i++) {
executor.submit(new serverInstantBlockingThread<I, O>(env, jobExec, input, output));
}
}
@SuppressWarnings("unchecked")
public void shutdown(long millisTimeout) {
if (executor == null) return;
if (executor.isShutdown()) return;
// put poison pills into the queue
for (int i = 0; i < poolsize; i++) {
try {
input.put((I) shutdownJob);
} catch (InterruptedException e) { }
}
// wait for shutdown
try {
executor.awaitTermination(millisTimeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {}
executor.shutdown();
executor = null;
}
public static class SpecialJob implements serverProcessorJob {
int type = 0;
}
public static final serverProcessorJob shutdownJob = new SpecialJob();
} }

@ -0,0 +1,5 @@
package de.anomic.server;
public interface serverProcessorJob {
}
Loading…
Cancel
Save