From 0241d070bc757b980f265a4463ed520c77ba8d50 Mon Sep 17 00:00:00 2001
From: orbiter
Date: Fri, 28 Mar 2008 11:56:28 +0000
Subject: [PATCH] added concurrency to indexing process: - the methods
{parsing, semantic analysis (condensing), structure analysis (web structure)}
in the serialized indexing path had been made concurrent. - four
BlockingQueues handle concurrency and hand-over of the indexing objects, the
last object in the queue is stored into a blockingQueue of maximum size 1 to
serialize the process for storage (which uses IO and therefore here should
not be deserialized) - a concurrency of (CPUs + 1) is default. Single-CPU
users will profil from the change because large files cannot block the
indexing process any more. - removed the secondary indexing thread, which is
superfluous now. Concurrency is default for all users.
git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@4609 6c8d7289-2bf4-0310-a012-ef5d649a1542
---
defaults/yacy.init | 9 -
htroot/SettingsAck_p.java | 5 +-
.../plasma/crawler/plasmaCrawlQueues.java | 2 +-
source/de/anomic/plasma/plasmaCrawlZURL.java | 2 +-
.../de/anomic/plasma/plasmaSwitchboard.java | 172 +++++++++++-------
.../anomic/plasma/plasmaSwitchboardQueue.java | 21 +--
.../server/serverAbstractBlockingThread.java | 14 +-
.../server/serverAbstractBusyThread.java | 28 ++-
source/de/anomic/server/serverBusyThread.java | 1 +
.../server/serverInstantBlockingThread.java | 57 +++---
source/de/anomic/server/serverProcessor.java | 51 +++++-
.../de/anomic/server/serverProcessorJob.java | 5 +
12 files changed, 236 insertions(+), 131 deletions(-)
create mode 100644 source/de/anomic/server/serverProcessorJob.java
diff --git a/defaults/yacy.init b/defaults/yacy.init
index 809babaf0..ddd0b9a8a 100644
--- a/defaults/yacy.init
+++ b/defaults/yacy.init
@@ -586,15 +586,6 @@ filterOutStopwordsFromTopwords=true
cleanup.deletionProcessedNews = true
cleanup.deletionPublishedNews = true
-# multiprocessor-settings
-# you may want to run time-consuming processes on several processors
-# the most time-consuming process is the indexing-Process
-# We implemented an option to run several of these processes here
-# setting the number of processes to Zero is not allowed
-# If you have a double-processor system,
-# a cluster value of '2' would be appropriate
-80_indexing_cluster=1
-
# default memory settings for startup of yacy
# is valid in unix/shell and windows environments but
# not for first startup of YaCy
diff --git a/htroot/SettingsAck_p.java b/htroot/SettingsAck_p.java
index 0c4879038..59c140532 100644
--- a/htroot/SettingsAck_p.java
+++ b/htroot/SettingsAck_p.java
@@ -64,6 +64,7 @@ import de.anomic.http.httpdProxyHandler;
import de.anomic.kelondro.kelondroBase64Order;
import de.anomic.plasma.plasmaParser;
import de.anomic.plasma.plasmaSwitchboard;
+import de.anomic.server.serverBusyThread;
import de.anomic.server.serverCodings;
import de.anomic.server.serverCore;
import de.anomic.server.serverDate;
@@ -256,8 +257,8 @@ public class SettingsAck_p {
httpd.initPortForwarding();
// notifying publishSeed Thread
- //serverThread peerPing = env.getThread("30_peerping");
- //peerPing.notifyThread();
+ serverBusyThread peerPing = env.getThread("30_peerping");
+ peerPing.notifyThread();
} catch (Exception e) {
prop.put("info", "23");
prop.putHTML("info_errormsg",(e.getMessage() == null) ? "unknown" : e.getMessage().replaceAll("\n","
"));
diff --git a/source/de/anomic/plasma/crawler/plasmaCrawlQueues.java b/source/de/anomic/plasma/crawler/plasmaCrawlQueues.java
index d91f0b4d9..1e4f02c56 100644
--- a/source/de/anomic/plasma/crawler/plasmaCrawlQueues.java
+++ b/source/de/anomic/plasma/crawler/plasmaCrawlQueues.java
@@ -84,7 +84,7 @@ public class plasmaCrawlQueues {
errorURL = new plasmaCrawlZURL(plasmaPath, "urlError2.db", false);
delegatedURL = new plasmaCrawlZURL(plasmaPath, "urlDelegated2.db", true);
}
-
+
public String urlExists(String hash) {
// tests if hash occurrs in any database
// if it exists, the name of the database is returned,
diff --git a/source/de/anomic/plasma/plasmaCrawlZURL.java b/source/de/anomic/plasma/plasmaCrawlZURL.java
index 5848317ad..082bc3a47 100644
--- a/source/de/anomic/plasma/plasmaCrawlZURL.java
+++ b/source/de/anomic/plasma/plasmaCrawlZURL.java
@@ -202,7 +202,7 @@ public class plasmaCrawlZURL {
newrow.setCol(4, this.anycause.getBytes());
newrow.setCol(5, this.bentry.toRow().bytes());
try {
- urlIndex.put(newrow);
+ if (urlIndex != null) urlIndex.put(newrow);
this.stored = true;
} catch (IOException e) {
System.out.println("INTERNAL ERROR AT plasmaEURL:url2hash:" + e.toString());
diff --git a/source/de/anomic/plasma/plasmaSwitchboard.java b/source/de/anomic/plasma/plasmaSwitchboard.java
index fa4ae333e..c9ffc2f50 100644
--- a/source/de/anomic/plasma/plasmaSwitchboard.java
+++ b/source/de/anomic/plasma/plasmaSwitchboard.java
@@ -107,6 +107,8 @@ import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeMap;
import java.util.TreeSet;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import de.anomic.data.URLLicense;
import de.anomic.data.blogBoard;
@@ -143,6 +145,8 @@ import de.anomic.server.serverFileUtils;
import de.anomic.server.serverInstantBusyThread;
import de.anomic.server.serverMemory;
import de.anomic.server.serverObjects;
+import de.anomic.server.serverProcessor;
+import de.anomic.server.serverProcessorJob;
import de.anomic.server.serverProfiling;
import de.anomic.server.serverSemaphore;
import de.anomic.server.serverSwitch;
@@ -238,6 +242,15 @@ public final class plasmaSwitchboard extends serverAbstractSwitch indexingDocumentProcessor;
+ public serverProcessor indexingCondensementProcessor;
+ public serverProcessor indexingAnalysisProcessor;
+ public serverProcessor indexingStorageProcessor;
+ public LinkedBlockingQueue indexingDocumentQueue;
+ public LinkedBlockingQueue indexingCondensementQueue;
+ public LinkedBlockingQueue indexingAnalysisQueue;
+ public ArrayBlockingQueue indexingStorageQueue;
+
/*
* Remote Proxy configuration
*/
@@ -386,7 +399,6 @@ public final class plasmaSwitchboard extends serverAbstractSwitchName of the indexer thread, performing the actual indexing of a website
*/
public static final String INDEXER = "80_indexing";
- public static final String INDEXER_CLUSTER = "80_indexing_cluster";
public static final String INDEXER_MEMPREREQ = "80_indexing_memprereq";
public static final String INDEXER_IDLESLEEP = "80_indexing_idlesleep";
public static final String INDEXER_BUSYSLEEP = "80_indexing_busysleep";
@@ -1282,36 +1294,30 @@ public final class plasmaSwitchboard extends serverAbstractSwitch();
+ indexingCondensementQueue = new LinkedBlockingQueue();
+ indexingAnalysisQueue = new LinkedBlockingQueue();
+ indexingStorageQueue = new ArrayBlockingQueue(1);
+ indexingDocumentProcessor = new serverProcessor(this, "parseDocument", indexingDocumentQueue, indexingCondensementQueue);
+ indexingCondensementProcessor = new serverProcessor(this, "condenseDocument", indexingCondensementQueue, indexingAnalysisQueue);
+ indexingAnalysisProcessor = new serverProcessor(this, "webStructureAnalysis", indexingAnalysisQueue, indexingStorageQueue);
+ indexingStorageProcessor = new serverProcessor(this, "storeDocumentIndex", indexingStorageQueue, null);
+
+ // deploy busy threads
log.logConfig("Starting Threads");
serverMemory.gc(1000, "plasmaSwitchboard, help for profiler"); // help for profiler - thq
moreMemory = new Timer(); // init GC Thread - thq
moreMemory.schedule(new MoreMemory(), 300000, 600000);
-
- int indexing_cluster = Integer.parseInt(getConfig(INDEXER_CLUSTER, "1"));
- if (indexing_cluster < 1) indexing_cluster = 1;
+
deployThread(CLEANUP, "Cleanup", "simple cleaning process for monitoring information", null,
new serverInstantBusyThread(this, CLEANUP_METHOD_START, CLEANUP_METHOD_JOBCOUNT, CLEANUP_METHOD_FREEMEM), 10000); // all 5 Minutes
deployThread(CRAWLSTACK, "Crawl URL Stacker", "process that checks url for double-occurrences and for allowance/disallowance by robots.txt", null,
new serverInstantBusyThread(crawlStacker, CRAWLSTACK_METHOD_START, CRAWLSTACK_METHOD_JOBCOUNT, CRAWLSTACK_METHOD_FREEMEM), 8000);
-
- //deployThread(PARSER, "Parsing", "thread that feeds a concurrent document parsing queue", "/IndexCreateIndexingQueue_p.html",
- //new serverInstantThread(this, PARSER_METHOD_START, PARSER_METHOD_JOBCOUNT, PARSER_METHOD_FREEMEM), 10000);
-
- deployThread(INDEXER, "Indexing", "thread that either distributes the index into the DHT, stores parsed documents or flushes the index cache", "/IndexCreateIndexingQueue_p.html",
+ deployThread(INDEXER, "Indexing", "thread that either initiates a parsing/indexing queue, distributes the index into the DHT, stores parsed documents or flushes the index cache", "/IndexCreateIndexingQueue_p.html",
new serverInstantBusyThread(this, INDEXER_METHOD_START, INDEXER_METHOD_JOBCOUNT, INDEXER_METHOD_FREEMEM), 10000);
-
- for (i = 1; i < indexing_cluster; i++) {
- setConfig((i + 80) + "_indexing_idlesleep", getConfig(INDEXER_IDLESLEEP, ""));
- setConfig((i + 80) + "_indexing_busysleep", getConfig(INDEXER_BUSYSLEEP, ""));
- deployThread((i + 80) + "_indexing", "Parsing/Indexing (cluster job)", "thread that performes document parsing and indexing", null,
- new serverInstantBusyThread(this, INDEXER_METHOD_START, INDEXER_METHOD_JOBCOUNT, INDEXER_METHOD_FREEMEM), 10000 + (i * 1000),
- Long.parseLong(getConfig(INDEXER_IDLESLEEP , "5000")),
- Long.parseLong(getConfig(INDEXER_BUSYSLEEP , "0")),
- Long.parseLong(getConfig(INDEXER_MEMPREREQ , "1000000")));
- }
-
deployThread(PROXY_CACHE_ENQUEUE, "Proxy Cache Enqueue", "job takes new input files from RAM stack, stores them, and hands over to the Indexing Stack", null,
new serverInstantBusyThread(this, PROXY_CACHE_ENQUEUE_METHOD_START, PROXY_CACHE_ENQUEUE_METHOD_JOBCOUNT, PROXY_CACHE_ENQUEUE_METHOD_FREEMEM), 10000);
deployThread(CRAWLJOB_REMOTE_TRIGGERED_CRAWL, "Remote Crawl Job", "thread that performes a single crawl/indexing step triggered by a remote peer", null,
@@ -1743,9 +1749,13 @@ public final class plasmaSwitchboard extends serverAbstractSwitch
// parse and index the resource
- queueEntry.updateStatus(plasmaSwitchboardQueue.QUEUE_STATE_PARSING_WAITING);
- queueEntry.updateStatus(plasmaSwitchboardQueue.QUEUE_STATE_PARSING_RUNNING);
- plasmaParserDocument document = parseDocument(queueEntry);
- if (document == null) {
- if (!queueEntry.profile().storeHTCache()) {
- plasmaHTCache.filesInUse.remove(queueEntry.cacheFile());
- //plasmaHTCache.deleteURLfromCache(entry.url());
- }
- queueEntry.close();
- return true;
- }
- queueEntry.updateStatus(plasmaSwitchboardQueue.QUEUE_STATE_PARSING_COMPLETE);
+ indexingQueueEntry document = parseDocument(new indexingQueueEntry(queueEntry, null, null));
// do condensing
- queueEntry.updateStatus(plasmaSwitchboardQueue.QUEUE_STATE_CONDENSING_WAITING);
- queueEntry.updateStatus(plasmaSwitchboardQueue.QUEUE_STATE_CONDENSING_RUNNING);
- plasmaCondenser condensement = condenseDocument(queueEntry, document);
- if (condensement == null) {
- if (!queueEntry.profile().storeHTCache()) {
- plasmaHTCache.filesInUse.remove(queueEntry.cacheFile());
- //plasmaHTCache.deleteURLfromCache(entry.url());
- }
- queueEntry.close();
- return true;
- }
- queueEntry.updateStatus(plasmaSwitchboardQueue.QUEUE_STATE_CONDENSING_COMPLETE);
+ indexingQueueEntry condensement = condenseDocument(document);
// do a web structure analysis
- queueEntry.updateStatus(plasmaSwitchboardQueue.QUEUE_STATE_STRUCTUREANALYSIS_WAITING);
- queueEntry.updateStatus(plasmaSwitchboardQueue.QUEUE_STATE_STRUCTUREANALYSIS_RUNNING);
- document.notifyWebStructure(webStructure, condensement, queueEntry.getModificationDate());
- queueEntry.updateStatus(plasmaSwitchboardQueue.QUEUE_STATE_STRUCTUREANALYSIS_COMPLETE);
+ indexingQueueEntry analysis = webStructureAnalysis(condensement);
// <- CONCURRENT UNTIL HERE, THEN SERIALIZE AGAIN
// store the result
- queueEntry.updateStatus(plasmaSwitchboardQueue.QUEUE_STATE_INDEXSTORAGE_WAITING);
- queueEntry.updateStatus(plasmaSwitchboardQueue.QUEUE_STATE_INDEXSTORAGE_RUNNING);
- storeDocumentIndex(queueEntry, document, condensement);
- queueEntry.updateStatus(plasmaSwitchboardQueue.QUEUE_STATE_INDEXSTORAGE_COMPLETE);
-
- // finally close the queue process
- if (!queueEntry.profile().storeHTCache()) {
- plasmaHTCache.filesInUse.remove(queueEntry.cacheFile());
- //plasmaHTCache.deleteURLfromCache(entry.url());
- }
- queueEntry.close();
-
+ storeDocumentIndex(analysis);
+ */
return true;
} catch (InterruptedException e) {
log.logInfo("DEQUEUE: Shutdown detected.");
@@ -1931,6 +1912,20 @@ public final class plasmaSwitchboard extends serverAbstractSwitch 1000)) c++;
@@ -2134,6 +2129,25 @@ public final class plasmaSwitchboard extends serverAbstractSwitch extends serverAbstractThread implements serverBlockingThread {
+public abstract class serverAbstractBlockingThread extends serverAbstractThread implements serverBlockingThread {
private BlockingQueue input = null;
private BlockingQueue output = null;
@@ -60,8 +60,16 @@ public abstract class serverAbstractBlockingThread extends serverAbstractT
// do job
timestamp = System.currentTimeMillis();
memstamp0 = serverMemory.used();
- O out = this.job(this.input.take());
- if (out != null) this.output.add(out);
+ I in = this.input.take();
+ if ((in == null) || (in == serverProcessor.shutdownJob)) {
+ // the poison pill: shutdown
+ // a null element is pushed to the queue on purpose to signal
+ // that a termination should be made
+ this.running = false;
+ break;
+ }
+ O out = this.job(in);
+ if ((out != null) && (this.output != null)) this.output.put(out);
// do memory and busy/idle-count/time monitoring
memstamp1 = serverMemory.used();
if (memstamp1 >= memstamp0) {
diff --git a/source/de/anomic/server/serverAbstractBusyThread.java b/source/de/anomic/server/serverAbstractBusyThread.java
index 0242ed9b3..ffd90497f 100644
--- a/source/de/anomic/server/serverAbstractBusyThread.java
+++ b/source/de/anomic/server/serverAbstractBusyThread.java
@@ -32,6 +32,7 @@ public abstract class serverAbstractBusyThread extends serverAbstractThread impl
private long idletime = 0, memprereq = 0;
private long idleCycles = 0, busyCycles = 0, outofmemoryCycles = 0;
private boolean intermissionObedient = true;
+ private Object syncObject = new Object();
protected final void announceMoreSleepTime(long millis) {
this.idletime += millis;
@@ -181,12 +182,31 @@ public abstract class serverAbstractBusyThread extends serverAbstractThread impl
this.close();
logSystem("thread '" + this.getName() + "' terminated.");
}
-
+
private void ratz(long millis) {
- try {
- Thread.sleep(millis);
+ try {/*
+ if (this.syncObject != null) {
+ synchronized (this.syncObject) {
+ this.syncObject.wait(millis);
+ }
+ } else {*/
+ Thread.sleep(millis);
+ //}
} catch (InterruptedException e) {
- if (this.log != null) this.log.logConfig("thread '" + this.getName() + "' interrupted because of shutdown.");
+ if (this.log != null)
+ this.log.logConfig("thread '" + this.getName() + "' interrupted because of shutdown.");
+ }
+ }
+
+ public void notifyThread() {
+ if (this.syncObject != null) {
+ synchronized (this.syncObject) {
+ if (this.log != null)
+ this.log.logFine("thread '" + this.getName()
+ + "' has received a notification from thread '"
+ + Thread.currentThread().getName() + "'.");
+ this.syncObject.notifyAll();
+ }
}
}
diff --git a/source/de/anomic/server/serverBusyThread.java b/source/de/anomic/server/serverBusyThread.java
index dc041c6ff..e11cef462 100644
--- a/source/de/anomic/server/serverBusyThread.java
+++ b/source/de/anomic/server/serverBusyThread.java
@@ -67,4 +67,5 @@ public interface serverBusyThread extends serverThread {
// is called when an outOfMemoryCycle is performed
// this method should try to free some memory, so that the job can be executed
+ public void notifyThread();
}
diff --git a/source/de/anomic/server/serverInstantBlockingThread.java b/source/de/anomic/server/serverInstantBlockingThread.java
index 48f8404fe..ac44fb360 100644
--- a/source/de/anomic/server/serverInstantBlockingThread.java
+++ b/source/de/anomic/server/serverInstantBlockingThread.java
@@ -26,21 +26,21 @@ package de.anomic.server;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
-import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
import de.anomic.server.logging.serverLog;
-public class serverInstantBlockingThread extends serverAbstractBlockingThread implements serverBlockingThread {
+public class serverInstantBlockingThread extends serverAbstractBlockingThread implements serverBlockingThread {
- private Method jobExecMethod, jobCountMethod;
+ private Method jobExecMethod;
private Object environment;
private Long handle;
-
+ private static int handleCounter = 0;
public static int instantThreadCounter = 0;
- public static TreeMap jobs = new TreeMap();
+ public static ConcurrentHashMap jobs = new ConcurrentHashMap();
- public serverInstantBlockingThread(Object env, String jobExec, String jobCount, BlockingQueue input, BlockingQueue output) {
+ public serverInstantBlockingThread(Object env, String jobExec, BlockingQueue input, BlockingQueue output) {
// jobExec is the name of a method of the object 'env' that executes the one-step-run
// jobCount is the name of a method that returns the size of the job
@@ -51,50 +51,37 @@ public class serverInstantBlockingThread extends serverAbstractBlockingThr
// define execution class
Class> theClass = (env instanceof Class) ? (Class>) env : env.getClass();
try {
- this.jobExecMethod = theClass.getMethod(jobExec, new Class[0]);
+ this.jobExecMethod = null;
+ Method[] methods = theClass.getMethods();
+ for (int i = 0; i < methods.length; i++) {
+ if ((methods[i].getParameterTypes().length == 1) && (methods[i].getName().equals(jobExec))) {
+ this.jobExecMethod = methods[i];
+ break;
+ }
+ }
+ if (this.jobExecMethod == null) throw new NoSuchMethodException(jobExec + " does not exist in " + env.getClass().getName());
} catch (NoSuchMethodException e) {
throw new RuntimeException("serverInstantThread, wrong declaration of jobExec: " + e.getMessage());
}
- try {
- if (jobCount == null)
- this.jobCountMethod = null;
- else
- this.jobCountMethod = theClass.getMethod(jobCount, new Class[0]);
-
- } catch (NoSuchMethodException e) {
- throw new RuntimeException("serverInstantThread, wrong declaration of jobCount: " + e.getMessage());
- }
+
this.environment = (env instanceof Class) ? null : env;
- this.setName(theClass.getName() + "." + jobExec);
+ this.setName(theClass.getName() + "." + jobExec + "." + handleCounter++);
this.handle = new Long(System.currentTimeMillis() + this.getName().hashCode());
}
public int getJobCount() {
- if (this.jobCountMethod == null) return Integer.MAX_VALUE;
- try {
- Object result = jobCountMethod.invoke(environment, new Object[0]);
- if (result instanceof Integer)
- return ((Integer) result).intValue();
- else
- return -1;
- } catch (IllegalAccessException e) {
- return -1;
- } catch (IllegalArgumentException e) {
- return -1;
- } catch (InvocationTargetException e) {
- serverLog.logSevere("BLOCKINGTHREAD", "invocation serverInstantThread of thread '" + this.getName() + "': " + e.getMessage(), e);
- return -1;
- }
+ return this.getInputQueue().size();
}
@SuppressWarnings("unchecked")
public O job(I next) throws Exception {
+ if (next == null) return null; // poison pill: shutdown
instantThreadCounter++;
//System.out.println("started job " + this.handle + ": " + this.getName());
- synchronized(jobs) {jobs.put(this.handle, this.getName());}
+ jobs.put(this.handle, this.getName());
O out = null;
try {
- out = (O) jobExecMethod.invoke(environment, new Object[0]);
+ out = (O) jobExecMethod.invoke(environment, new Object[]{next});
} catch (IllegalAccessException e) {
serverLog.logSevere("BLOCKINGTHREAD", "Internal Error in serverInstantThread.job: " + e.getMessage());
serverLog.logSevere("BLOCKINGTHREAD", "shutting down thread '" + this.getName() + "'");
@@ -115,7 +102,7 @@ public class serverInstantBlockingThread extends serverAbstractBlockingThr
e.printStackTrace();
}
instantThreadCounter--;
- synchronized(jobs) {jobs.remove(this.handle);}
+ jobs.remove(this.handle);
return out;
}
diff --git a/source/de/anomic/server/serverProcessor.java b/source/de/anomic/server/serverProcessor.java
index b4ebf1bba..8a925c83a 100644
--- a/source/de/anomic/server/serverProcessor.java
+++ b/source/de/anomic/server/serverProcessor.java
@@ -24,9 +24,58 @@
package de.anomic.server;
-public class serverProcessor {
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+public class serverProcessor {
public static final int availableCPU = Runtime.getRuntime().availableProcessors();
public static int useCPU = availableCPU;
+ ExecutorService executor;
+ BlockingQueue input;
+ BlockingQueue output;
+ int poolsize;
+
+ public serverProcessor(Object env, String jobExec, BlockingQueue input, BlockingQueue output) {
+ this(env, jobExec, input, output, useCPU + 1);
+ }
+
+ public serverProcessor(Object env, String jobExec, BlockingQueue input, BlockingQueue output, int poolsize) {
+ // start a fixed number of executors that handle entries in the process queue
+ this.input = input;
+ this.output = output;
+ this.poolsize = poolsize;
+ executor = Executors.newCachedThreadPool();
+ for (int i = 0; i < poolsize; i++) {
+ executor.submit(new serverInstantBlockingThread(env, jobExec, input, output));
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public void shutdown(long millisTimeout) {
+ if (executor == null) return;
+ if (executor.isShutdown()) return;
+ // put poison pills into the queue
+ for (int i = 0; i < poolsize; i++) {
+ try {
+ input.put((I) shutdownJob);
+ } catch (InterruptedException e) { }
+ }
+ // wait for shutdown
+ try {
+ executor.awaitTermination(millisTimeout, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {}
+ executor.shutdown();
+ executor = null;
+ }
+
+ public static class SpecialJob implements serverProcessorJob {
+ int type = 0;
+ }
+
+ public static final serverProcessorJob shutdownJob = new SpecialJob();
+
}
diff --git a/source/de/anomic/server/serverProcessorJob.java b/source/de/anomic/server/serverProcessorJob.java
new file mode 100644
index 000000000..ddc98c835
--- /dev/null
+++ b/source/de/anomic/server/serverProcessorJob.java
@@ -0,0 +1,5 @@
+package de.anomic.server;
+
+public interface serverProcessorJob {
+
+}