enhanced the concurrency handling of indexing process (better queue size control, better data concept, better shutdown behavior)

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@4617 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 17 years ago
parent 4c3f1b67ad
commit 368593e449

@ -111,15 +111,15 @@ public class plasmaCrawlNURL {
public int size() {
// this does not count the overhang stack size
return coreStack.size() + limitStack.size() + remoteStack.size();
return ((coreStack == null) ? 0 : coreStack.size()) + ((limitStack == null) ? 0 : limitStack.size()) + ((remoteStack == null) ? 0 : remoteStack.size());
}
public int stackSize(int stackType) {
switch (stackType) {
case STACK_TYPE_CORE: return coreStack.size();
case STACK_TYPE_LIMIT: return limitStack.size();
case STACK_TYPE_CORE: return (coreStack == null) ? 0 : coreStack.size();
case STACK_TYPE_LIMIT: return (limitStack == null) ? 0 : limitStack.size();
case STACK_TYPE_OVERHANG: return 0;
case STACK_TYPE_REMOTE: return remoteStack.size();
case STACK_TYPE_REMOTE: return (remoteStack == null) ? 0 : remoteStack.size();
default: return -1;
}
}

@ -453,13 +453,14 @@ public final class plasmaSearchEvent {
return (plasmaSearchEvent) lastEvents.get(eventID);
}
public static plasmaSearchEvent getEvent(plasmaSearchQuery query,
plasmaSearchRankingProfile ranking,
plasmaWordIndex wordIndex,
plasmaCrawlResults crawlResults,
TreeMap<String, String> preselectedPeerHashes,
boolean generateAbstracts) {
plasmaSearchEvent event = (plasmaSearchEvent) lastEvents.get(query.id(false));
public static plasmaSearchEvent getEvent(
plasmaSearchQuery query,
plasmaSearchRankingProfile ranking,
plasmaWordIndex wordIndex,
plasmaCrawlResults crawlResults,
TreeMap<String, String> preselectedPeerHashes,
boolean generateAbstracts) {
plasmaSearchEvent event = lastEvents.get(query.id(false));
if (event == null) {
event = new plasmaSearchEvent(query, wordIndex, crawlResults, preselectedPeerHashes, generateAbstracts);
} else {

@ -107,8 +107,6 @@ import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import de.anomic.data.URLLicense;
import de.anomic.data.blogBoard;
@ -242,14 +240,10 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<plasmaSwitchbo
public URLLicense licensedURLs;
public Timer moreMemory;
public serverProcessor<indexingQueueEntry, indexingQueueEntry> indexingDocumentProcessor;
public serverProcessor<indexingQueueEntry, indexingQueueEntry> indexingCondensementProcessor;
public serverProcessor<indexingQueueEntry, indexingQueueEntry> indexingAnalysisProcessor;
public serverProcessor<indexingQueueEntry, indexingQueueEntry> indexingStorageProcessor;
public LinkedBlockingQueue<indexingQueueEntry> indexingDocumentQueue;
public LinkedBlockingQueue<indexingQueueEntry> indexingCondensementQueue;
public LinkedBlockingQueue<indexingQueueEntry> indexingAnalysisQueue;
public ArrayBlockingQueue<indexingQueueEntry> indexingStorageQueue;
public serverProcessor<indexingQueueEntry> indexingDocumentProcessor;
public serverProcessor<indexingQueueEntry> indexingCondensementProcessor;
public serverProcessor<indexingQueueEntry> indexingAnalysisProcessor;
public serverProcessor<indexingQueueEntry> indexingStorageProcessor;
/*
* Remote Proxy configuration
@ -1295,15 +1289,10 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<plasmaSwitchbo
this.clusterhashes = yacyCore.seedDB.clusterHashes(getConfig("cluster.peers.yacydomain", ""));
// deploy blocking threads
indexingDocumentQueue = new LinkedBlockingQueue<indexingQueueEntry>();
indexingCondensementQueue = new LinkedBlockingQueue<indexingQueueEntry>();
indexingAnalysisQueue = new LinkedBlockingQueue<indexingQueueEntry>();
indexingStorageQueue = new ArrayBlockingQueue<indexingQueueEntry>(1);
indexingDocumentProcessor = new serverProcessor<indexingQueueEntry, indexingQueueEntry>(this, "parseDocument", indexingDocumentQueue, indexingCondensementQueue);
indexingCondensementProcessor = new serverProcessor<indexingQueueEntry, indexingQueueEntry>(this, "condenseDocument", indexingCondensementQueue, indexingAnalysisQueue);
indexingAnalysisProcessor = new serverProcessor<indexingQueueEntry, indexingQueueEntry>(this, "webStructureAnalysis", indexingAnalysisQueue, indexingStorageQueue);
indexingStorageProcessor = new serverProcessor<indexingQueueEntry, indexingQueueEntry>(this, "storeDocumentIndex", indexingStorageQueue, null);
indexingStorageProcessor = new serverProcessor<indexingQueueEntry>(this, "storeDocumentIndex", 1, null);
indexingAnalysisProcessor = new serverProcessor<indexingQueueEntry>(this, "webStructureAnalysis", serverProcessor.useCPU + 1, indexingStorageProcessor);
indexingCondensementProcessor = new serverProcessor<indexingQueueEntry>(this, "condenseDocument", serverProcessor.useCPU + 1, indexingAnalysisProcessor);
indexingDocumentProcessor = new serverProcessor<indexingQueueEntry>(this, "parseDocument", serverProcessor.useCPU + 1, indexingCondensementProcessor);
// deploy busy threads
log.logConfig("Starting Threads");
@ -1733,9 +1722,12 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<plasmaSwitchbo
if (transferIdxThread != null) stopTransferWholeIndex(false);
log.logConfig("SWITCHBOARD SHUTDOWN STEP 2: sending termination signal to threaded indexing");
// closing all still running db importer jobs
indexingDocumentProcessor.shutdown(4000);
indexingCondensementProcessor.shutdown(3000);
indexingAnalysisProcessor.shutdown(2000);
indexingStorageProcessor.shutdown(1000);
this.dbImportManager.close();
httpc.closeAllConnections();
crawlQueues.close();
wikiDB.close();
blogDB.close();
blogCommentDB.close();
@ -1749,13 +1741,10 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<plasmaSwitchbo
parser.close();
plasmaHTCache.close();
sbQueue.close();
indexingDocumentProcessor.shutdown(1000);
indexingCondensementProcessor.shutdown(1000);
indexingAnalysisProcessor.shutdown(1000);
webStructure.flushCitationReference("crg");
webStructure.close();
crawlQueues.close();
log.logConfig("SWITCHBOARD SHUTDOWN STEP 3: sending termination signal to database manager (stand by...)");
indexingStorageProcessor.shutdown(1000);
wordIndex.close();
yc.close();
log.logConfig("SWITCHBOARD SHUTDOWN TERMINATED");
@ -1872,7 +1861,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<plasmaSwitchbo
log.logFine("deQueue: thread was interrupted");
return false;
}
// get next queue entry and start a queue processing
plasmaSwitchboardQueue.QueueEntry queueEntry = deQueue();
assert queueEntry != null;
@ -1886,7 +1875,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<plasmaSwitchbo
// check for interruption
checkInterruption();
this.indexingDocumentQueue.put(new indexingQueueEntry(queueEntry, null, null));
this.indexingDocumentProcessor.enQueue(new indexingQueueEntry(queueEntry, null, null));
/*
// THE FOLLOWING CAN BE CONCURRENT ->
@ -1912,7 +1901,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<plasmaSwitchbo
}
}
public static class indexingQueueEntry implements serverProcessorJob {
public static class indexingQueueEntry extends serverProcessorJob {
public plasmaSwitchboardQueue.QueueEntry queueEntry;
public plasmaParserDocument document;
public plasmaCondenser condenser;
@ -1920,6 +1909,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<plasmaSwitchbo
plasmaSwitchboardQueue.QueueEntry queueEntry,
plasmaParserDocument document,
plasmaCondenser condenser) {
super();
this.queueEntry = queueEntry;
this.document = document;
this.condenser = condenser;

@ -28,21 +28,21 @@ import java.util.concurrent.BlockingQueue;
import de.anomic.server.logging.serverLog;
public abstract class serverAbstractBlockingThread<I extends serverProcessorJob, O extends serverProcessorJob> extends serverAbstractThread implements serverBlockingThread<I, O> {
public abstract class serverAbstractBlockingThread<J extends serverProcessorJob> extends serverAbstractThread implements serverBlockingThread<J> {
private BlockingQueue<I> input = null;
private BlockingQueue<O> output = null;
private BlockingQueue<J> input = null;
private serverProcessor<J> output = null;
public void setInputQueue(BlockingQueue<I> queue) {
public void setInputQueue(BlockingQueue<J> queue) {
this.input = queue;
}
public void setOutputQueue(BlockingQueue<O> queue) {
this.output = queue;
public void setOutputProcess(serverProcessor<J> processor) {
this.output = processor;
}
public BlockingQueue<I> getInputQueue() {
public BlockingQueue<J> getInputQueue() {
return this.input;
}
public BlockingQueue<O> getOutputQueue() {
public serverProcessor<J> getOutputProcess() {
return this.output;
}
@ -60,16 +60,16 @@ public abstract class serverAbstractBlockingThread<I extends serverProcessorJob,
// do job
timestamp = System.currentTimeMillis();
memstamp0 = serverMemory.used();
I in = this.input.take();
if ((in == null) || (in == serverProcessor.shutdownJob)) {
J in = this.input.take();
if ((in == null) || (in.status == serverProcessorJob.STATUS_POISON)) {
// the poison pill: shutdown
// a null element is pushed to the queue on purpose to signal
// that a termination should be made
this.running = false;
break;
}
O out = this.job(in);
if ((out != null) && (this.output != null)) this.output.put(out);
J out = this.job(in);
if ((out != null) && (this.output != null)) this.output.enQueue(out);
// do memory and busy/idle-count/time monitoring
memstamp1 = serverMemory.used();
if (memstamp1 >= memstamp0) {
@ -80,6 +80,10 @@ public abstract class serverAbstractBlockingThread<I extends serverProcessorJob,
if (busyCycles > 0) memuse += memuse / busyCycles;
}
busytime += System.currentTimeMillis() - timestamp;
} catch (InterruptedException e) {
// don't ignore this: shut down
this.running = false;
break;
} catch (Exception e) {
// handle exceptions: thread must not die on any unexpected exceptions
// if the exception is too bad it should call terminate()

@ -26,14 +26,14 @@ package de.anomic.server;
import java.util.concurrent.BlockingQueue;
public interface serverBlockingThread<I, O> extends serverThread {
public interface serverBlockingThread<J extends serverProcessorJob> extends serverThread {
public void setInputQueue(BlockingQueue<I> queue);
public void setOutputQueue(BlockingQueue<O> queue);
public BlockingQueue<I> getInputQueue();
public BlockingQueue<O> getOutputQueue();
public void setInputQueue(BlockingQueue<J> queue);
public void setOutputProcess(serverProcessor<J> queue);
public BlockingQueue<J> getInputQueue();
public serverProcessor<J> getOutputProcess();
public O job(I next) throws Exception;
public J job(J next) throws Exception;
// performes one job procedure; this loopes until terminate() is called
// job returns true if it has done something
// it returns false if it is idle and does not expect to work on more for a longer time

@ -114,7 +114,6 @@ public final class serverCore extends serverAbstractBusyThread implements server
private SSLSocketFactory sslSocketFactory = null;
private ServerSocket socket; // listener
serverLog log; // log object
private int timeout; // connection time-out of the socket
serverHandler handlerPrototype; // the command class (a serverHandler)

@ -31,7 +31,7 @@ import java.util.concurrent.ConcurrentHashMap;
import de.anomic.server.logging.serverLog;
public class serverInstantBlockingThread<I extends serverProcessorJob, O extends serverProcessorJob> extends serverAbstractBlockingThread<I, O> implements serverBlockingThread<I, O> {
public class serverInstantBlockingThread<J extends serverProcessorJob> extends serverAbstractBlockingThread<J> implements serverBlockingThread<J> {
private Method jobExecMethod;
private Object environment;
@ -40,33 +40,34 @@ public class serverInstantBlockingThread<I extends serverProcessorJob, O extends
public static int instantThreadCounter = 0;
public static ConcurrentHashMap<Long, String> jobs = new ConcurrentHashMap<Long, String>();
public serverInstantBlockingThread(Object env, String jobExec, BlockingQueue<I> input, BlockingQueue<O> output) {
public serverInstantBlockingThread(Object env, String jobExec, BlockingQueue<J> input, serverProcessor<J> output) {
// jobExec is the name of a method of the object 'env' that executes the one-step-run
// jobCount is the name of a method that returns the size of the job
// set the blocking queues for input and output
this.setInputQueue(input);
this.setOutputQueue(output);
this.setOutputProcess(output);
// define execution class
this.jobExecMethod = execMethod(env, jobExec);
this.environment = (env instanceof Class) ? null : env;
this.setName(jobExecMethod.getClass().getName() + "." + jobExecMethod.getName() + "." + handleCounter++);
this.handle = new Long(System.currentTimeMillis() + this.getName().hashCode());
}
protected static Method execMethod(Object env, String jobExec) {
Class<?> theClass = (env instanceof Class) ? (Class<?>) env : env.getClass();
try {
this.jobExecMethod = null;
Method[] methods = theClass.getMethods();
for (int i = 0; i < methods.length; i++) {
if ((methods[i].getParameterTypes().length == 1) && (methods[i].getName().equals(jobExec))) {
this.jobExecMethod = methods[i];
break;
return methods[i];
}
}
if (this.jobExecMethod == null) throw new NoSuchMethodException(jobExec + " does not exist in " + env.getClass().getName());
throw new NoSuchMethodException(jobExec + " does not exist in " + env.getClass().getName());
} catch (NoSuchMethodException e) {
throw new RuntimeException("serverInstantThread, wrong declaration of jobExec: " + e.getMessage());
}
this.environment = (env instanceof Class) ? null : env;
this.setName(theClass.getName() + "." + jobExec + "." + handleCounter++);
this.handle = new Long(System.currentTimeMillis() + this.getName().hashCode());
}
public int getJobCount() {
@ -74,14 +75,14 @@ public class serverInstantBlockingThread<I extends serverProcessorJob, O extends
}
@SuppressWarnings("unchecked")
public O job(I next) throws Exception {
public J job(J next) throws Exception {
if (next == null) return null; // poison pill: shutdown
instantThreadCounter++;
//System.out.println("started job " + this.handle + ": " + this.getName());
jobs.put(this.handle, this.getName());
O out = null;
J out = null;
try {
out = (O) jobExecMethod.invoke(environment, new Object[]{next});
out = (J) jobExecMethod.invoke(environment, new Object[]{next});
} catch (IllegalAccessException e) {
serverLog.logSevere("BLOCKINGTHREAD", "Internal Error in serverInstantThread.job: " + e.getMessage());
serverLog.logSevere("BLOCKINGTHREAD", "shutting down thread '" + this.getName() + "'");

@ -24,36 +24,70 @@
package de.anomic.server;
import java.lang.reflect.InvocationTargetException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class serverProcessor<I extends serverProcessorJob, O extends serverProcessorJob> {
import de.anomic.server.logging.serverLog;
public class serverProcessor<J extends serverProcessorJob> {
public static final int availableCPU = Runtime.getRuntime().availableProcessors();
public static int useCPU = availableCPU;
ExecutorService executor;
BlockingQueue<I> input;
BlockingQueue<O> output;
int poolsize;
private ExecutorService executor;
private BlockingQueue<J> input;
private serverProcessor<J> output;
private int poolsize;
private Object environment;
private String methodName;
public serverProcessor(Object env, String jobExec, BlockingQueue<I> input, BlockingQueue<O> output) {
this(env, jobExec, input, output, useCPU + 1);
public serverProcessor(Object env, String jobExec, int inputQueueSize, serverProcessor<J> output) {
this(env, jobExec, inputQueueSize, output, useCPU + 1);
}
public serverProcessor(Object env, String jobExec, BlockingQueue<I> input, BlockingQueue<O> output, int poolsize) {
public serverProcessor(Object env, String jobExec, int inputQueueSize, serverProcessor<J> output, int poolsize) {
// start a fixed number of executors that handle entries in the process queue
this.input = input;
this.environment = env;
this.methodName = jobExec;
this.input = new LinkedBlockingQueue<J>(inputQueueSize);
this.output = output;
this.poolsize = poolsize;
executor = Executors.newCachedThreadPool();
for (int i = 0; i < poolsize; i++) {
executor.submit(new serverInstantBlockingThread<I, O>(env, jobExec, input, output));
executor.submit(new serverInstantBlockingThread<J>(env, jobExec, input, output));
}
}
public int queueSize() {
return input.size();
}
@SuppressWarnings("unchecked")
public void enQueue(J in) throws InterruptedException {
// ensure that enough job executors are running
if ((this.input == null) || (executor == null) || (executor.isShutdown()) || (executor.isTerminated())) {
// execute serialized without extra thread
serverLog.logWarning("PROCESSOR", "executing job " + environment.getClass().getName() + "." + methodName + " serialized");
try {
J out = (J) serverInstantBlockingThread.execMethod(this.environment, this.methodName).invoke(environment, new Object[]{in});
if ((out != null) && (output != null)) output.enQueue(out);
} catch (IllegalArgumentException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}
return;
}
// execute concurrent in thread
this.input.put(in);
}
@SuppressWarnings("unchecked")
public void shutdown(long millisTimeout) {
if (executor == null) return;
@ -61,7 +95,7 @@ public class serverProcessor<I extends serverProcessorJob, O extends serverProce
// put poison pills into the queue
for (int i = 0; i < poolsize; i++) {
try {
input.put((I) shutdownJob);
input.put((J) serverProcessorJob.poisonPill); // put a poison pill into the queue which will kill the job
} catch (InterruptedException e) { }
}
// wait for shutdown
@ -69,13 +103,8 @@ public class serverProcessor<I extends serverProcessorJob, O extends serverProce
executor.awaitTermination(millisTimeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {}
executor.shutdown();
executor = null;
this.executor = null;
this.input = null;
}
public static class SpecialJob implements serverProcessorJob {
int type = 0;
}
public static final serverProcessorJob shutdownJob = new SpecialJob();
}

@ -1,5 +1,48 @@
// serverProcessor.java
// (C) 2008 by Michael Peter Christen; mc@yacy.net, Frankfurt a. M., Germany
// first published 29.02.2008 on http://yacy.net
//
// $LastChangedDate: 2006-04-02 22:40:07 +0200 (So, 02 Apr 2006) $
// $LastChangedRevision: 1986 $
// $LastChangedBy: orbiter $
//
// LICENSE
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
package de.anomic.server;
public interface serverProcessorJob {
public class serverProcessorJob {
public final static int STATUS_INITIATED = 0;
public final static int STATUS_STARTED = 1;
public final static int STATUS_RUNNING = 2;
public final static int STATUS_FINISHED = 3;
public final static int STATUS_POISON = 99;
public int status = 0;
public serverProcessorJob() {
this.status = STATUS_INITIATED;
}
public serverProcessorJob(int status) {
this.status = status;
}
public static final serverProcessorJob poisonPill = new serverProcessorJob(serverProcessorJob.STATUS_POISON); // kills job queue executors
}

Loading…
Cancel
Save