diff --git a/defaults/yacy.init b/defaults/yacy.init index 809babaf0..ddd0b9a8a 100644 --- a/defaults/yacy.init +++ b/defaults/yacy.init @@ -586,15 +586,6 @@ filterOutStopwordsFromTopwords=true cleanup.deletionProcessedNews = true cleanup.deletionPublishedNews = true -# multiprocessor-settings -# you may want to run time-consuming processes on several processors -# the most time-consuming process is the indexing-Process -# We implemented an option to run several of these processes here -# setting the number of processes to Zero is not allowed -# If you have a double-processor system, -# a cluster value of '2' would be appropriate -80_indexing_cluster=1 - # default memory settings for startup of yacy # is valid in unix/shell and windows environments but # not for first startup of YaCy diff --git a/htroot/SettingsAck_p.java b/htroot/SettingsAck_p.java index 0c4879038..59c140532 100644 --- a/htroot/SettingsAck_p.java +++ b/htroot/SettingsAck_p.java @@ -64,6 +64,7 @@ import de.anomic.http.httpdProxyHandler; import de.anomic.kelondro.kelondroBase64Order; import de.anomic.plasma.plasmaParser; import de.anomic.plasma.plasmaSwitchboard; +import de.anomic.server.serverBusyThread; import de.anomic.server.serverCodings; import de.anomic.server.serverCore; import de.anomic.server.serverDate; @@ -256,8 +257,8 @@ public class SettingsAck_p { httpd.initPortForwarding(); // notifying publishSeed Thread - //serverThread peerPing = env.getThread("30_peerping"); - //peerPing.notifyThread(); + serverBusyThread peerPing = env.getThread("30_peerping"); + peerPing.notifyThread(); } catch (Exception e) { prop.put("info", "23"); prop.putHTML("info_errormsg",(e.getMessage() == null) ? "unknown" : e.getMessage().replaceAll("\n","
")); diff --git a/source/de/anomic/plasma/crawler/plasmaCrawlQueues.java b/source/de/anomic/plasma/crawler/plasmaCrawlQueues.java index d91f0b4d9..1e4f02c56 100644 --- a/source/de/anomic/plasma/crawler/plasmaCrawlQueues.java +++ b/source/de/anomic/plasma/crawler/plasmaCrawlQueues.java @@ -84,7 +84,7 @@ public class plasmaCrawlQueues { errorURL = new plasmaCrawlZURL(plasmaPath, "urlError2.db", false); delegatedURL = new plasmaCrawlZURL(plasmaPath, "urlDelegated2.db", true); } - + public String urlExists(String hash) { // tests if hash occurrs in any database // if it exists, the name of the database is returned, diff --git a/source/de/anomic/plasma/plasmaCrawlZURL.java b/source/de/anomic/plasma/plasmaCrawlZURL.java index 5848317ad..082bc3a47 100644 --- a/source/de/anomic/plasma/plasmaCrawlZURL.java +++ b/source/de/anomic/plasma/plasmaCrawlZURL.java @@ -202,7 +202,7 @@ public class plasmaCrawlZURL { newrow.setCol(4, this.anycause.getBytes()); newrow.setCol(5, this.bentry.toRow().bytes()); try { - urlIndex.put(newrow); + if (urlIndex != null) urlIndex.put(newrow); this.stored = true; } catch (IOException e) { System.out.println("INTERNAL ERROR AT plasmaEURL:url2hash:" + e.toString()); diff --git a/source/de/anomic/plasma/plasmaSwitchboard.java b/source/de/anomic/plasma/plasmaSwitchboard.java index fa4ae333e..c9ffc2f50 100644 --- a/source/de/anomic/plasma/plasmaSwitchboard.java +++ b/source/de/anomic/plasma/plasmaSwitchboard.java @@ -107,6 +107,8 @@ import java.util.Timer; import java.util.TimerTask; import java.util.TreeMap; import java.util.TreeSet; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import de.anomic.data.URLLicense; import de.anomic.data.blogBoard; @@ -143,6 +145,8 @@ import de.anomic.server.serverFileUtils; import de.anomic.server.serverInstantBusyThread; import de.anomic.server.serverMemory; import de.anomic.server.serverObjects; +import de.anomic.server.serverProcessor; +import 
de.anomic.server.serverProcessorJob; import de.anomic.server.serverProfiling; import de.anomic.server.serverSemaphore; import de.anomic.server.serverSwitch; @@ -238,6 +242,15 @@ public final class plasmaSwitchboard extends serverAbstractSwitch indexingDocumentProcessor; + public serverProcessor indexingCondensementProcessor; + public serverProcessor indexingAnalysisProcessor; + public serverProcessor indexingStorageProcessor; + public LinkedBlockingQueue indexingDocumentQueue; + public LinkedBlockingQueue indexingCondensementQueue; + public LinkedBlockingQueue indexingAnalysisQueue; + public ArrayBlockingQueue indexingStorageQueue; + /* * Remote Proxy configuration */ @@ -386,7 +399,6 @@ public final class plasmaSwitchboard extends serverAbstractSwitchName of the indexer thread, performing the actual indexing of a website

*/ public static final String INDEXER = "80_indexing"; - public static final String INDEXER_CLUSTER = "80_indexing_cluster"; public static final String INDEXER_MEMPREREQ = "80_indexing_memprereq"; public static final String INDEXER_IDLESLEEP = "80_indexing_idlesleep"; public static final String INDEXER_BUSYSLEEP = "80_indexing_busysleep"; @@ -1282,36 +1294,30 @@ public final class plasmaSwitchboard extends serverAbstractSwitch(); + indexingCondensementQueue = new LinkedBlockingQueue(); + indexingAnalysisQueue = new LinkedBlockingQueue(); + indexingStorageQueue = new ArrayBlockingQueue(1); + indexingDocumentProcessor = new serverProcessor(this, "parseDocument", indexingDocumentQueue, indexingCondensementQueue); + indexingCondensementProcessor = new serverProcessor(this, "condenseDocument", indexingCondensementQueue, indexingAnalysisQueue); + indexingAnalysisProcessor = new serverProcessor(this, "webStructureAnalysis", indexingAnalysisQueue, indexingStorageQueue); + indexingStorageProcessor = new serverProcessor(this, "storeDocumentIndex", indexingStorageQueue, null); + + // deploy busy threads log.logConfig("Starting Threads"); serverMemory.gc(1000, "plasmaSwitchboard, help for profiler"); // help for profiler - thq moreMemory = new Timer(); // init GC Thread - thq moreMemory.schedule(new MoreMemory(), 300000, 600000); - - int indexing_cluster = Integer.parseInt(getConfig(INDEXER_CLUSTER, "1")); - if (indexing_cluster < 1) indexing_cluster = 1; + deployThread(CLEANUP, "Cleanup", "simple cleaning process for monitoring information", null, new serverInstantBusyThread(this, CLEANUP_METHOD_START, CLEANUP_METHOD_JOBCOUNT, CLEANUP_METHOD_FREEMEM), 10000); // all 5 Minutes deployThread(CRAWLSTACK, "Crawl URL Stacker", "process that checks url for double-occurrences and for allowance/disallowance by robots.txt", null, new serverInstantBusyThread(crawlStacker, CRAWLSTACK_METHOD_START, CRAWLSTACK_METHOD_JOBCOUNT, CRAWLSTACK_METHOD_FREEMEM), 8000); - - //deployThread(PARSER, 
"Parsing", "thread that feeds a concurrent document parsing queue", "/IndexCreateIndexingQueue_p.html", - //new serverInstantThread(this, PARSER_METHOD_START, PARSER_METHOD_JOBCOUNT, PARSER_METHOD_FREEMEM), 10000); - - deployThread(INDEXER, "Indexing", "thread that either distributes the index into the DHT, stores parsed documents or flushes the index cache", "/IndexCreateIndexingQueue_p.html", + deployThread(INDEXER, "Indexing", "thread that either initiates a parsing/indexing queue, distributes the index into the DHT, stores parsed documents or flushes the index cache", "/IndexCreateIndexingQueue_p.html", new serverInstantBusyThread(this, INDEXER_METHOD_START, INDEXER_METHOD_JOBCOUNT, INDEXER_METHOD_FREEMEM), 10000); - - for (i = 1; i < indexing_cluster; i++) { - setConfig((i + 80) + "_indexing_idlesleep", getConfig(INDEXER_IDLESLEEP, "")); - setConfig((i + 80) + "_indexing_busysleep", getConfig(INDEXER_BUSYSLEEP, "")); - deployThread((i + 80) + "_indexing", "Parsing/Indexing (cluster job)", "thread that performes document parsing and indexing", null, - new serverInstantBusyThread(this, INDEXER_METHOD_START, INDEXER_METHOD_JOBCOUNT, INDEXER_METHOD_FREEMEM), 10000 + (i * 1000), - Long.parseLong(getConfig(INDEXER_IDLESLEEP , "5000")), - Long.parseLong(getConfig(INDEXER_BUSYSLEEP , "0")), - Long.parseLong(getConfig(INDEXER_MEMPREREQ , "1000000"))); - } - deployThread(PROXY_CACHE_ENQUEUE, "Proxy Cache Enqueue", "job takes new input files from RAM stack, stores them, and hands over to the Indexing Stack", null, new serverInstantBusyThread(this, PROXY_CACHE_ENQUEUE_METHOD_START, PROXY_CACHE_ENQUEUE_METHOD_JOBCOUNT, PROXY_CACHE_ENQUEUE_METHOD_FREEMEM), 10000); deployThread(CRAWLJOB_REMOTE_TRIGGERED_CRAWL, "Remote Crawl Job", "thread that performes a single crawl/indexing step triggered by a remote peer", null, @@ -1743,9 +1749,13 @@ public final class plasmaSwitchboard extends serverAbstractSwitch // parse and index the resource - 
queueEntry.updateStatus(plasmaSwitchboardQueue.QUEUE_STATE_PARSING_WAITING); - queueEntry.updateStatus(plasmaSwitchboardQueue.QUEUE_STATE_PARSING_RUNNING); - plasmaParserDocument document = parseDocument(queueEntry); - if (document == null) { - if (!queueEntry.profile().storeHTCache()) { - plasmaHTCache.filesInUse.remove(queueEntry.cacheFile()); - //plasmaHTCache.deleteURLfromCache(entry.url()); - } - queueEntry.close(); - return true; - } - queueEntry.updateStatus(plasmaSwitchboardQueue.QUEUE_STATE_PARSING_COMPLETE); + indexingQueueEntry document = parseDocument(new indexingQueueEntry(queueEntry, null, null)); // do condensing - queueEntry.updateStatus(plasmaSwitchboardQueue.QUEUE_STATE_CONDENSING_WAITING); - queueEntry.updateStatus(plasmaSwitchboardQueue.QUEUE_STATE_CONDENSING_RUNNING); - plasmaCondenser condensement = condenseDocument(queueEntry, document); - if (condensement == null) { - if (!queueEntry.profile().storeHTCache()) { - plasmaHTCache.filesInUse.remove(queueEntry.cacheFile()); - //plasmaHTCache.deleteURLfromCache(entry.url()); - } - queueEntry.close(); - return true; - } - queueEntry.updateStatus(plasmaSwitchboardQueue.QUEUE_STATE_CONDENSING_COMPLETE); + indexingQueueEntry condensement = condenseDocument(document); // do a web structure analysis - queueEntry.updateStatus(plasmaSwitchboardQueue.QUEUE_STATE_STRUCTUREANALYSIS_WAITING); - queueEntry.updateStatus(plasmaSwitchboardQueue.QUEUE_STATE_STRUCTUREANALYSIS_RUNNING); - document.notifyWebStructure(webStructure, condensement, queueEntry.getModificationDate()); - queueEntry.updateStatus(plasmaSwitchboardQueue.QUEUE_STATE_STRUCTUREANALYSIS_COMPLETE); + indexingQueueEntry analysis = webStructureAnalysis(condensement); // <- CONCURRENT UNTIL HERE, THEN SERIALIZE AGAIN // store the result - queueEntry.updateStatus(plasmaSwitchboardQueue.QUEUE_STATE_INDEXSTORAGE_WAITING); - queueEntry.updateStatus(plasmaSwitchboardQueue.QUEUE_STATE_INDEXSTORAGE_RUNNING); - storeDocumentIndex(queueEntry, document, 
condensement); - queueEntry.updateStatus(plasmaSwitchboardQueue.QUEUE_STATE_INDEXSTORAGE_COMPLETE); - - // finally close the queue process - if (!queueEntry.profile().storeHTCache()) { - plasmaHTCache.filesInUse.remove(queueEntry.cacheFile()); - //plasmaHTCache.deleteURLfromCache(entry.url()); - } - queueEntry.close(); - + storeDocumentIndex(analysis); + */ return true; } catch (InterruptedException e) { log.logInfo("DEQUEUE: Shutdown detected."); @@ -1931,6 +1912,20 @@ public final class plasmaSwitchboard extends serverAbstractSwitch 1000)) c++; @@ -2134,6 +2129,25 @@ public final class plasmaSwitchboard extends serverAbstractSwitch extends serverAbstractThread implements serverBlockingThread { +public abstract class serverAbstractBlockingThread extends serverAbstractThread implements serverBlockingThread { private BlockingQueue input = null; private BlockingQueue output = null; @@ -60,8 +60,16 @@ public abstract class serverAbstractBlockingThread extends serverAbstractT // do job timestamp = System.currentTimeMillis(); memstamp0 = serverMemory.used(); - O out = this.job(this.input.take()); - if (out != null) this.output.add(out); + I in = this.input.take(); + if ((in == null) || (in == serverProcessor.shutdownJob)) { + // the poison pill: shutdown + // a null element is pushed to the queue on purpose to signal + // that a termination should be made + this.running = false; + break; + } + O out = this.job(in); + if ((out != null) && (this.output != null)) this.output.put(out); // do memory and busy/idle-count/time monitoring memstamp1 = serverMemory.used(); if (memstamp1 >= memstamp0) { diff --git a/source/de/anomic/server/serverAbstractBusyThread.java b/source/de/anomic/server/serverAbstractBusyThread.java index 0242ed9b3..ffd90497f 100644 --- a/source/de/anomic/server/serverAbstractBusyThread.java +++ b/source/de/anomic/server/serverAbstractBusyThread.java @@ -32,6 +32,7 @@ public abstract class serverAbstractBusyThread extends serverAbstractThread impl private 
long idletime = 0, memprereq = 0; private long idleCycles = 0, busyCycles = 0, outofmemoryCycles = 0; private boolean intermissionObedient = true; + private Object syncObject = new Object(); protected final void announceMoreSleepTime(long millis) { this.idletime += millis; @@ -181,12 +182,31 @@ public abstract class serverAbstractBusyThread extends serverAbstractThread impl this.close(); logSystem("thread '" + this.getName() + "' terminated."); } - + private void ratz(long millis) { - try { - Thread.sleep(millis); + try {/* + if (this.syncObject != null) { + synchronized (this.syncObject) { + this.syncObject.wait(millis); + } + } else {*/ + Thread.sleep(millis); + //} } catch (InterruptedException e) { - if (this.log != null) this.log.logConfig("thread '" + this.getName() + "' interrupted because of shutdown."); + if (this.log != null) + this.log.logConfig("thread '" + this.getName() + "' interrupted because of shutdown."); + } + } + + public void notifyThread() { + if (this.syncObject != null) { + synchronized (this.syncObject) { + if (this.log != null) + this.log.logFine("thread '" + this.getName() + + "' has received a notification from thread '" + + Thread.currentThread().getName() + "'."); + this.syncObject.notifyAll(); + } } } diff --git a/source/de/anomic/server/serverBusyThread.java b/source/de/anomic/server/serverBusyThread.java index dc041c6ff..e11cef462 100644 --- a/source/de/anomic/server/serverBusyThread.java +++ b/source/de/anomic/server/serverBusyThread.java @@ -67,4 +67,5 @@ public interface serverBusyThread extends serverThread { // is called when an outOfMemoryCycle is performed // this method should try to free some memory, so that the job can be executed + public void notifyThread(); } diff --git a/source/de/anomic/server/serverInstantBlockingThread.java b/source/de/anomic/server/serverInstantBlockingThread.java index 48f8404fe..ac44fb360 100644 --- a/source/de/anomic/server/serverInstantBlockingThread.java +++ 
b/source/de/anomic/server/serverInstantBlockingThread.java @@ -26,21 +26,21 @@ package de.anomic.server; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -import java.util.TreeMap; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; import de.anomic.server.logging.serverLog; -public class serverInstantBlockingThread extends serverAbstractBlockingThread implements serverBlockingThread { +public class serverInstantBlockingThread extends serverAbstractBlockingThread implements serverBlockingThread { - private Method jobExecMethod, jobCountMethod; + private Method jobExecMethod; private Object environment; private Long handle; - + private static int handleCounter = 0; public static int instantThreadCounter = 0; - public static TreeMap jobs = new TreeMap(); + public static ConcurrentHashMap jobs = new ConcurrentHashMap(); - public serverInstantBlockingThread(Object env, String jobExec, String jobCount, BlockingQueue input, BlockingQueue output) { + public serverInstantBlockingThread(Object env, String jobExec, BlockingQueue input, BlockingQueue output) { // jobExec is the name of a method of the object 'env' that executes the one-step-run // jobCount is the name of a method that returns the size of the job @@ -51,50 +51,37 @@ public class serverInstantBlockingThread extends serverAbstractBlockingThr // define execution class Class theClass = (env instanceof Class) ? 
(Class) env : env.getClass(); try { - this.jobExecMethod = theClass.getMethod(jobExec, new Class[0]); + this.jobExecMethod = null; + Method[] methods = theClass.getMethods(); + for (int i = 0; i < methods.length; i++) { + if ((methods[i].getParameterTypes().length == 1) && (methods[i].getName().equals(jobExec))) { + this.jobExecMethod = methods[i]; + break; + } + } + if (this.jobExecMethod == null) throw new NoSuchMethodException(jobExec + " does not exist in " + env.getClass().getName()); } catch (NoSuchMethodException e) { throw new RuntimeException("serverInstantThread, wrong declaration of jobExec: " + e.getMessage()); } - try { - if (jobCount == null) - this.jobCountMethod = null; - else - this.jobCountMethod = theClass.getMethod(jobCount, new Class[0]); - - } catch (NoSuchMethodException e) { - throw new RuntimeException("serverInstantThread, wrong declaration of jobCount: " + e.getMessage()); - } + this.environment = (env instanceof Class) ? null : env; - this.setName(theClass.getName() + "." + jobExec); + this.setName(theClass.getName() + "." + jobExec + "." 
+ handleCounter++); this.handle = new Long(System.currentTimeMillis() + this.getName().hashCode()); } public int getJobCount() { - if (this.jobCountMethod == null) return Integer.MAX_VALUE; - try { - Object result = jobCountMethod.invoke(environment, new Object[0]); - if (result instanceof Integer) - return ((Integer) result).intValue(); - else - return -1; - } catch (IllegalAccessException e) { - return -1; - } catch (IllegalArgumentException e) { - return -1; - } catch (InvocationTargetException e) { - serverLog.logSevere("BLOCKINGTHREAD", "invocation serverInstantThread of thread '" + this.getName() + "': " + e.getMessage(), e); - return -1; - } + return this.getInputQueue().size(); } @SuppressWarnings("unchecked") public O job(I next) throws Exception { + if (next == null) return null; // poison pill: shutdown instantThreadCounter++; //System.out.println("started job " + this.handle + ": " + this.getName()); - synchronized(jobs) {jobs.put(this.handle, this.getName());} + jobs.put(this.handle, this.getName()); O out = null; try { - out = (O) jobExecMethod.invoke(environment, new Object[0]); + out = (O) jobExecMethod.invoke(environment, new Object[]{next}); } catch (IllegalAccessException e) { serverLog.logSevere("BLOCKINGTHREAD", "Internal Error in serverInstantThread.job: " + e.getMessage()); serverLog.logSevere("BLOCKINGTHREAD", "shutting down thread '" + this.getName() + "'"); @@ -115,7 +102,7 @@ public class serverInstantBlockingThread extends serverAbstractBlockingThr e.printStackTrace(); } instantThreadCounter--; - synchronized(jobs) {jobs.remove(this.handle);} + jobs.remove(this.handle); return out; } diff --git a/source/de/anomic/server/serverProcessor.java b/source/de/anomic/server/serverProcessor.java index b4ebf1bba..8a925c83a 100644 --- a/source/de/anomic/server/serverProcessor.java +++ b/source/de/anomic/server/serverProcessor.java @@ -24,9 +24,58 @@ package de.anomic.server; -public class serverProcessor { +import 
java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +public class serverProcessor { public static final int availableCPU = Runtime.getRuntime().availableProcessors(); public static int useCPU = availableCPU; + ExecutorService executor; + BlockingQueue input; + BlockingQueue output; + int poolsize; + + public serverProcessor(Object env, String jobExec, BlockingQueue input, BlockingQueue output) { + this(env, jobExec, input, output, useCPU + 1); + } + + public serverProcessor(Object env, String jobExec, BlockingQueue input, BlockingQueue output, int poolsize) { + // start a fixed number of executors that handle entries in the process queue + this.input = input; + this.output = output; + this.poolsize = poolsize; + executor = Executors.newCachedThreadPool(); + for (int i = 0; i < poolsize; i++) { + executor.submit(new serverInstantBlockingThread(env, jobExec, input, output)); + } + } + + @SuppressWarnings("unchecked") + public void shutdown(long millisTimeout) { + if (executor == null) return; + if (executor.isShutdown()) return; + // put poison pills into the queue + for (int i = 0; i < poolsize; i++) { + try { + input.put((I) shutdownJob); + } catch (InterruptedException e) { } + } + // request the orderly shutdown first: awaitTermination() can only return before the timeout after a shutdown request + executor.shutdown(); + try { + executor.awaitTermination(millisTimeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { Thread.currentThread().interrupt(); } + executor = null; + } + + public static class SpecialJob implements serverProcessorJob { + int type = 0; + } + + public static final serverProcessorJob shutdownJob = new SpecialJob(); + } diff --git a/source/de/anomic/server/serverProcessorJob.java b/source/de/anomic/server/serverProcessorJob.java new file mode 100644 index 000000000..ddc98c835 --- /dev/null +++ b/source/de/anomic/server/serverProcessorJob.java @@ -0,0 +1,5 @@ +package de.anomic.server; + +public interface serverProcessorJob { + +}