- refactoring of serverThreads: renaming to distinguish busy-threads and blocking-threads

- added blockingThreads which are threads that are not driven by pause times but by BlockingQueue lookup

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@4606 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 17 years ago
parent 968c775025
commit bca87f1e38

@ -59,7 +59,7 @@ import de.anomic.plasma.plasmaSwitchboard;
import de.anomic.server.serverCodings;
import de.anomic.server.serverCore;
import de.anomic.server.serverDomains;
import de.anomic.server.serverInstantThread;
import de.anomic.server.serverInstantBusyThread;
import de.anomic.server.serverObjects;
import de.anomic.server.serverSwitch;
import de.anomic.yacy.yacyCore;
@ -96,7 +96,7 @@ public class ConfigBasic {
//boolean doPeerPing = false;
if ((yacyCore.seedDB.mySeed().isVirgin()) || (yacyCore.seedDB.mySeed().isJunior())) {
serverInstantThread.oneTimeJob(sb.yc, "peerPing", null, 0);
serverInstantBusyThread.oneTimeJob(sb.yc, "peerPing", null, 0);
//doPeerPing = true;
}

@ -27,10 +27,10 @@
import de.anomic.http.httpHeader;
import de.anomic.plasma.plasmaSwitchboard;
import de.anomic.server.serverBusyThread;
import de.anomic.server.serverCodings;
import de.anomic.server.serverObjects;
import de.anomic.server.serverSwitch;
import de.anomic.server.serverThread;
import de.anomic.yacy.yacyCore;
public class ConfigNetwork_p {
@ -122,7 +122,7 @@ public class ConfigNetwork_p {
newppm = Math.max(1, Integer.parseInt(post.get("acceptCrawlLimit", "1")));
} catch (NumberFormatException e) {}
long newBusySleep = Math.max(100, 60000 / newppm);
serverThread rct = sb.getThread(plasmaSwitchboard.CRAWLJOB_REMOTE_TRIGGERED_CRAWL);
serverBusyThread rct = sb.getThread(plasmaSwitchboard.CRAWLJOB_REMOTE_TRIGGERED_CRAWL);
rct.setBusySleep(newBusySleep);
sb.setConfig(plasmaSwitchboard.CRAWLJOB_REMOTE_TRIGGERED_CRAWL_BUSYSLEEP, Long.toString(newBusySleep));

@ -7,9 +7,9 @@ import de.anomic.http.httpHeader;
import de.anomic.plasma.plasmaGrafics;
import de.anomic.plasma.plasmaSwitchboard;
import de.anomic.plasma.plasmaGrafics.CircleThreadPiece;
import de.anomic.server.serverBusyThread;
import de.anomic.server.serverObjects;
import de.anomic.server.serverSwitch;
import de.anomic.server.serverThread;
public class PeerLoadPicture {
@ -36,7 +36,7 @@ public class PeerLoadPicture {
Iterator<String> threads = env.threadNames();
String threadname;
serverThread thread;
serverBusyThread thread;
long busy_time = 0;

@ -49,6 +49,7 @@ import java.util.Map;
import de.anomic.http.httpHeader;
import de.anomic.plasma.plasmaSwitchboard;
import de.anomic.server.serverBusyThread;
import de.anomic.server.serverCore;
import de.anomic.server.serverFileUtils;
import de.anomic.server.serverObjects;
@ -66,7 +67,7 @@ public class PerformanceQueues_p {
Map<String, String> defaultSettings = ((post == null) || (!(post.containsKey("submitdefault")))) ? null : serverFileUtils.loadHashMap(defaultSettingsFile);
Iterator<String> threads = switchboard.threadNames();
String threadName;
serverThread thread;
serverBusyThread thread;
boolean xml = ((String)header.get("PATH")).endsWith(".xml");
prop.setLocalized(!xml);

@ -70,7 +70,6 @@ import de.anomic.server.serverDate;
import de.anomic.server.serverMemory;
import de.anomic.server.serverObjects;
import de.anomic.server.serverSwitch;
import de.anomic.server.serverThread;
import de.anomic.yacy.yacyCore;
import de.anomic.yacy.yacySeed;
import de.anomic.yacy.yacySeedUploader;
@ -257,8 +256,8 @@ public class SettingsAck_p {
httpd.initPortForwarding();
// notifying publishSeed Thread
serverThread peerPing = env.getThread("30_peerping");
peerPing.notifyThread();
//serverThread peerPing = env.getThread("30_peerping");
//peerPing.notifyThread();
} catch (Exception e) {
prop.put("info", "23");
prop.putHTML("info_errormsg",(e.getMessage() == null) ? "unknown" : e.getMessage().replaceAll("\n","<br>"));

@ -25,7 +25,7 @@ import de.anomic.kelondro.kelondroRowSet;
import de.anomic.kelondro.kelondroSQLTable;
import de.anomic.kelondro.kelondroSplitTable;
import de.anomic.kelondro.kelondroTree;
import de.anomic.server.serverInstantThread;
import de.anomic.server.serverInstantBusyThread;
import de.anomic.server.serverMemory;
import de.anomic.server.logging.serverLog;
import de.anomic.ymage.ymageChart;
@ -449,10 +449,10 @@ public class dbtest {
r = Math.abs(random.nextLong() % 1000);
jcontrol.add(new Long(r));
kcontrol.putb((int) r, "x".getBytes());
serverInstantThread.oneTimeJob(new WriteJob(table_test, table_reference, r), 0, 50);
serverInstantBusyThread.oneTimeJob(new WriteJob(table_test, table_reference, r), 0, 50);
if (random.nextLong() % 5 == 0) ra.add(new Long(r));
for (int j = 0; j < readCount; j++) {
serverInstantThread.oneTimeJob(new ReadJob(table_test, table_reference, random.nextLong() % writeCount), random.nextLong() % 1000, 20);
serverInstantBusyThread.oneTimeJob(new ReadJob(table_test, table_reference, random.nextLong() % writeCount), random.nextLong() % 1000, 20);
}
if ((ra.size() > 0) && (random.nextLong() % 7 == 0)) {
rc++;
@ -461,13 +461,13 @@ public class dbtest {
jcontrol.remove(R);
kcontrol.removeb((int) R.longValue());
System.out.println("remove: " + R.longValue());
serverInstantThread.oneTimeJob(new RemoveJob(table_test, table_reference, ((Long) ra.remove(p)).longValue()), 0, 50);
serverInstantBusyThread.oneTimeJob(new RemoveJob(table_test, table_reference, ((Long) ra.remove(p)).longValue()), 0, 50);
}
}
System.out.println("removed: " + rc + ", size of jcontrol set: " + jcontrol.size() + ", size of kcontrol set: " + kcontrol.size());
while (serverInstantThread.instantThreadCounter > 0) {
while (serverInstantBusyThread.instantThreadCounter > 0) {
try {Thread.sleep(1000);} catch (InterruptedException e) {} // wait for all tasks to finish
System.out.println("count: " + serverInstantThread.instantThreadCounter + ", jobs: " + serverInstantThread.jobs.toString());
System.out.println("count: " + serverInstantBusyThread.instantThreadCounter + ", jobs: " + serverInstantBusyThread.jobs.toString());
}
try {Thread.sleep(6000);} catch (InterruptedException e) {}
}

@ -83,7 +83,7 @@ import de.anomic.plasma.cache.UnsupportedProtocolException;
import de.anomic.server.serverCodings;
import de.anomic.server.serverDomains;
import de.anomic.server.serverFileUtils;
import de.anomic.server.serverInstantThread;
import de.anomic.server.serverInstantBusyThread;
import de.anomic.server.serverSystem;
import de.anomic.server.serverThread;
import de.anomic.server.logging.serverLog;
@ -292,7 +292,7 @@ public final class plasmaHTCache {
// start the cache startup thread
// this will collect information about the current cache size and elements
try {
cacheScanThread = serverInstantThread.oneTimeJob(Class.forName("de.anomic.plasma.plasmaHTCache"), "cacheScan", log, 120000);
cacheScanThread = serverInstantBusyThread.oneTimeJob(Class.forName("de.anomic.plasma.plasmaHTCache"), "cacheScan", log, 120000);
} catch (ClassNotFoundException e) {
e.printStackTrace();
}

@ -137,9 +137,10 @@ import de.anomic.plasma.crawler.plasmaProtocolLoader;
import de.anomic.plasma.dbImport.dbImportManager;
import de.anomic.plasma.parser.ParserException;
import de.anomic.server.serverAbstractSwitch;
import de.anomic.server.serverBusyThread;
import de.anomic.server.serverDomains;
import de.anomic.server.serverFileUtils;
import de.anomic.server.serverInstantThread;
import de.anomic.server.serverInstantBusyThread;
import de.anomic.server.serverMemory;
import de.anomic.server.serverObjects;
import de.anomic.server.serverProfiling;
@ -975,7 +976,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<plasmaSwitchbo
// start yacy core
log.logConfig("Starting YaCy Protocol Core");
this.yc = new yacyCore(this);
serverInstantThread.oneTimeJob(yacyCore.peerActions, "loadSeedLists", yacyCore.log, 0);
serverInstantBusyThread.oneTimeJob(yacyCore.peerActions, "loadSeedLists", yacyCore.log, 0);
long startedSeedListAquisition = System.currentTimeMillis();
// set up local robots.txt
@ -1291,43 +1292,41 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<plasmaSwitchbo
int indexing_cluster = Integer.parseInt(getConfig(INDEXER_CLUSTER, "1"));
if (indexing_cluster < 1) indexing_cluster = 1;
deployThread(CLEANUP, "Cleanup", "simple cleaning process for monitoring information", null,
new serverInstantThread(this, CLEANUP_METHOD_START, CLEANUP_METHOD_JOBCOUNT, CLEANUP_METHOD_FREEMEM), 10000); // all 5 Minutes
new serverInstantBusyThread(this, CLEANUP_METHOD_START, CLEANUP_METHOD_JOBCOUNT, CLEANUP_METHOD_FREEMEM), 10000); // all 5 Minutes
deployThread(CRAWLSTACK, "Crawl URL Stacker", "process that checks url for double-occurrences and for allowance/disallowance by robots.txt", null,
new serverInstantThread(crawlStacker, CRAWLSTACK_METHOD_START, CRAWLSTACK_METHOD_JOBCOUNT, CRAWLSTACK_METHOD_FREEMEM), 8000);
new serverInstantBusyThread(crawlStacker, CRAWLSTACK_METHOD_START, CRAWLSTACK_METHOD_JOBCOUNT, CRAWLSTACK_METHOD_FREEMEM), 8000);
//deployThread(PARSER, "Parsing", "thread that feeds a concurrent document parsing queue", "/IndexCreateIndexingQueue_p.html",
//new serverInstantThread(this, PARSER_METHOD_START, PARSER_METHOD_JOBCOUNT, PARSER_METHOD_FREEMEM), 10000);
deployThread(INDEXER, "Indexing", "thread that either distributes the index into the DHT, stores parsed documents or flushes the index cache", "/IndexCreateIndexingQueue_p.html",
new serverInstantThread(this, INDEXER_METHOD_START, INDEXER_METHOD_JOBCOUNT, INDEXER_METHOD_FREEMEM), 10000);
new serverInstantBusyThread(this, INDEXER_METHOD_START, INDEXER_METHOD_JOBCOUNT, INDEXER_METHOD_FREEMEM), 10000);
for (i = 1; i < indexing_cluster; i++) {
setConfig((i + 80) + "_indexing_idlesleep", getConfig(INDEXER_IDLESLEEP, ""));
setConfig((i + 80) + "_indexing_busysleep", getConfig(INDEXER_BUSYSLEEP, ""));
deployThread((i + 80) + "_indexing", "Parsing/Indexing (cluster job)", "thread that performes document parsing and indexing", null,
new serverInstantThread(this, INDEXER_METHOD_START, INDEXER_METHOD_JOBCOUNT, INDEXER_METHOD_FREEMEM), 10000 + (i * 1000),
new serverInstantBusyThread(this, INDEXER_METHOD_START, INDEXER_METHOD_JOBCOUNT, INDEXER_METHOD_FREEMEM), 10000 + (i * 1000),
Long.parseLong(getConfig(INDEXER_IDLESLEEP , "5000")),
Long.parseLong(getConfig(INDEXER_BUSYSLEEP , "0")),
Long.parseLong(getConfig(INDEXER_MEMPREREQ , "1000000")));
}
deployThread(PROXY_CACHE_ENQUEUE, "Proxy Cache Enqueue", "job takes new input files from RAM stack, stores them, and hands over to the Indexing Stack", null,
new serverInstantThread(this, PROXY_CACHE_ENQUEUE_METHOD_START, PROXY_CACHE_ENQUEUE_METHOD_JOBCOUNT, PROXY_CACHE_ENQUEUE_METHOD_FREEMEM), 10000);
new serverInstantBusyThread(this, PROXY_CACHE_ENQUEUE_METHOD_START, PROXY_CACHE_ENQUEUE_METHOD_JOBCOUNT, PROXY_CACHE_ENQUEUE_METHOD_FREEMEM), 10000);
deployThread(CRAWLJOB_REMOTE_TRIGGERED_CRAWL, "Remote Crawl Job", "thread that performes a single crawl/indexing step triggered by a remote peer", null,
new serverInstantThread(crawlQueues, CRAWLJOB_REMOTE_TRIGGERED_CRAWL_METHOD_START, CRAWLJOB_REMOTE_TRIGGERED_CRAWL_METHOD_JOBCOUNT, CRAWLJOB_REMOTE_TRIGGERED_CRAWL_METHOD_FREEMEM), 30000);
new serverInstantBusyThread(crawlQueues, CRAWLJOB_REMOTE_TRIGGERED_CRAWL_METHOD_START, CRAWLJOB_REMOTE_TRIGGERED_CRAWL_METHOD_JOBCOUNT, CRAWLJOB_REMOTE_TRIGGERED_CRAWL_METHOD_FREEMEM), 30000);
deployThread(CRAWLJOB_REMOTE_CRAWL_LOADER, "Remote Crawl URL Loader", "thread that loads remote crawl lists from other peers", "",
new serverInstantThread(crawlQueues, CRAWLJOB_REMOTE_CRAWL_LOADER_METHOD_START, CRAWLJOB_REMOTE_CRAWL_LOADER_METHOD_JOBCOUNT, CRAWLJOB_REMOTE_CRAWL_LOADER_METHOD_FREEMEM), 30000); // error here?
new serverInstantBusyThread(crawlQueues, CRAWLJOB_REMOTE_CRAWL_LOADER_METHOD_START, CRAWLJOB_REMOTE_CRAWL_LOADER_METHOD_JOBCOUNT, CRAWLJOB_REMOTE_CRAWL_LOADER_METHOD_FREEMEM), 30000); // error here?
deployThread(CRAWLJOB_LOCAL_CRAWL, "Local Crawl", "thread that performes a single crawl step from the local crawl queue", "/IndexCreateWWWLocalQueue_p.html",
new serverInstantThread(crawlQueues, CRAWLJOB_LOCAL_CRAWL_METHOD_START, CRAWLJOB_LOCAL_CRAWL_METHOD_JOBCOUNT, CRAWLJOB_LOCAL_CRAWL_METHOD_FREEMEM), 10000);
new serverInstantBusyThread(crawlQueues, CRAWLJOB_LOCAL_CRAWL_METHOD_START, CRAWLJOB_LOCAL_CRAWL_METHOD_JOBCOUNT, CRAWLJOB_LOCAL_CRAWL_METHOD_FREEMEM), 10000);
deployThread(SEED_UPLOAD, "Seed-List Upload", "task that a principal peer performes to generate and upload a seed-list to a ftp account", null,
new serverInstantThread(yc, SEED_UPLOAD_METHOD_START, SEED_UPLOAD_METHOD_JOBCOUNT, SEED_UPLOAD_METHOD_FREEMEM), 180000);
serverInstantThread peerPing = null;
new serverInstantBusyThread(yc, SEED_UPLOAD_METHOD_START, SEED_UPLOAD_METHOD_JOBCOUNT, SEED_UPLOAD_METHOD_FREEMEM), 180000);
deployThread(PEER_PING, "YaCy Core", "this is the p2p-control and peer-ping task", null,
peerPing = new serverInstantThread(yc, PEER_PING_METHOD_START, PEER_PING_METHOD_JOBCOUNT, PEER_PING_METHOD_FREEMEM), 2000);
peerPing.setSyncObject(new Object());
new serverInstantBusyThread(yc, PEER_PING_METHOD_START, PEER_PING_METHOD_JOBCOUNT, PEER_PING_METHOD_FREEMEM), 2000);
deployThread(INDEX_DIST, "DHT Distribution", "selection, transfer and deletion of index entries that are not searched on your peer, but on others", null,
new serverInstantThread(this, INDEX_DIST_METHOD_START, INDEX_DIST_METHOD_JOBCOUNT, INDEX_DIST_METHOD_FREEMEM), 60000,
new serverInstantBusyThread(this, INDEX_DIST_METHOD_START, INDEX_DIST_METHOD_JOBCOUNT, INDEX_DIST_METHOD_FREEMEM), 60000,
Long.parseLong(getConfig(INDEX_DIST_IDLESLEEP , "5000")),
Long.parseLong(getConfig(INDEX_DIST_BUSYSLEEP , "0")),
Long.parseLong(getConfig(INDEX_DIST_MEMPREREQ , "1000000")));
@ -2422,7 +2421,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<plasmaSwitchbo
if (wantedPPM >= 1000) wantedPPM = 1000;
int newBusySleep = 60000 / wantedPPM; // for wantedPPM = 10: 6000; for wantedPPM = 1000: 60
serverThread thread;
serverBusyThread thread;
thread = getThread(INDEX_DIST);
if (thread != null) {

@ -54,7 +54,7 @@ import com.jcraft.jsch.Session;
import com.jcraft.jsch.UIKeyboardInteractive;
import com.jcraft.jsch.UserInfo;
import de.anomic.server.serverInstantThread;
import de.anomic.server.serverInstantBusyThread;
import de.anomic.server.serverSwitch;
import de.anomic.server.logging.serverLog;
import de.anomic.server.portForwarding.serverPortForwarding;
@ -95,7 +95,7 @@ public class serverPortForwardingSch implements serverPortForwarding{
private int localHostPort;
private static Session session;
private static serverInstantThread sessionWatcher;
private static serverInstantBusyThread sessionWatcher;
private serverLog log;
@ -190,8 +190,7 @@ public class serverPortForwardingSch implements serverPortForwarding{
if (sessionWatcher == null) {
this.log.logFine("Deploying port forwarding session watcher thread.");
this.switchboard.deployThread("portForwardingWatcher", "Remote Port Forwarding Watcher", "this thread is used to detect broken connections and to re-establish it if necessary.", null,
sessionWatcher = new serverInstantThread(this, "reconnect", null, null), 30000,30000,30000,1000);
sessionWatcher.setSyncObject(new Object());
sessionWatcher = new serverInstantBusyThread(this, "reconnect", null, null), 30000,30000,30000,1000);
}
this.log.logInfo("Remote port forwarding connection established: " +

@ -0,0 +1,91 @@
// serverAbstractBlockingThread.java
// (C) 2008 by Michael Peter Christen; mc@yacy.net, Frankfurt a. M., Germany
// first published 27.03.2008 on http://yacy.net
//
// $LastChangedDate: 2006-04-02 22:40:07 +0200 (So, 02 Apr 2006) $
// $LastChangedRevision: 1986 $
// $LastChangedBy: orbiter $
//
// LICENSE
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
package de.anomic.server;
import java.util.concurrent.BlockingQueue;
import de.anomic.server.logging.serverLog;
public abstract class serverAbstractBlockingThread<I, O> extends serverAbstractThread implements serverBlockingThread<I, O> {
private BlockingQueue<I> input = null;
private BlockingQueue<O> output = null;
public void setInputQueue(BlockingQueue<I> queue) {
this.input = queue;
}
public void setOutputQueue(BlockingQueue<O> queue) {
this.output = queue;
}
public BlockingQueue<I> getInputQueue() {
return this.input;
}
public BlockingQueue<O> getOutputQueue() {
return this.output;
}
public void run() {
this.open();
if (log != null) {
logSystem("thread '" + this.getName() + "' deployed, starting loop.");
}
long timestamp;
long memstamp0, memstamp1;
long busyCycles = 0;
while (running) {
try {
// do job
timestamp = System.currentTimeMillis();
memstamp0 = serverMemory.used();
O out = this.job(this.input.take());
if (out != null) this.output.add(out);
// do memory and busy/idle-count/time monitoring
memstamp1 = serverMemory.used();
if (memstamp1 >= memstamp0) {
// no GC in between. this is not shure but most probable
memuse += memstamp1 - memstamp0;
} else {
// GC was obviously in between. Add an average as simple heuristic
if (busyCycles > 0) memuse += memuse / busyCycles;
}
busytime += System.currentTimeMillis() - timestamp;
} catch (Exception e) {
// handle exceptions: thread must not die on any unexpected exceptions
// if the exception is too bad it should call terminate()
this.jobExceptionHandler(e);
} finally {
busyCycles++;
}
}
this.close();
logSystem("thread '" + this.getName() + "' terminated.");
}
private void logSystem(String text) {
if (log == null) serverLog.logConfig("THREAD-CONTROL", text);
else log.logConfig(text);
}
}

@ -0,0 +1,197 @@
// serverAbstractBusyThread.java
// (C) 2005 by Michael Peter Christen; mc@yacy.net, Frankfurt a. M., Germany
// first published 14.03.2005 on http://yacy.net
//
// $LastChangedDate: 2006-04-02 22:40:07 +0200 (So, 02 Apr 2006) $
// $LastChangedRevision: 1986 $
// $LastChangedBy: orbiter $
//
// LICENSE
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
package de.anomic.server;
import de.anomic.server.logging.serverLog;
public abstract class serverAbstractBusyThread extends serverAbstractThread implements serverBusyThread {
private long startup = 0, intermission = 0, idlePause = 0, busyPause = 0;
private long idletime = 0, memprereq = 0;
private long idleCycles = 0, busyCycles = 0, outofmemoryCycles = 0;
private boolean intermissionObedient = true;
protected final void announceMoreSleepTime(long millis) {
this.idletime += millis;
}
public final void setStartupSleep(long milliseconds) {
// sets a sleep time before execution of the job-loop
startup = milliseconds;
}
public final long setIdleSleep(long milliseconds) {
// sets a sleep time for pauses between two jobs
idlePause = milliseconds;
return milliseconds;
}
public final long setBusySleep(long milliseconds) {
// sets a sleep time for pauses between two jobs
busyPause = milliseconds;
return milliseconds;
}
public void setMemPreReqisite(long freeBytes) {
// sets minimum required amount of memory for the job execution
memprereq = freeBytes;
}
public void setObeyIntermission(boolean obey) {
// defines if the thread should obey the intermission command
intermissionObedient = obey;
}
public final long getIdleCycles() {
// returns the total number of cycles of job execution with idle-result
return this.idleCycles;
}
public final long getBusyCycles() {
// returns the total number of cycles of job execution with busy-result
return this.busyCycles;
}
public long getOutOfMemoryCycles() {
// returns the total number of cycles where
// a job execution was omitted because of memory shortage
return this.outofmemoryCycles;
}
public final long getSleepTime() {
// returns the total time that this thread has slept so far
return this.idletime;
}
public void intermission(long pause) {
if (pause == Long.MAX_VALUE)
this.intermission = Long.MAX_VALUE;
else
this.intermission = System.currentTimeMillis() + pause;
}
public void run() {
if (startup > 0) {
// do a startup-delay
logSystem("thread '" + this.getName() + "' deployed, delaying start-up.");
ratz(startup);
if (!(running)) return;
}
this.open();
if (log != null) {
if (startup > 0)
logSystem("thread '" + this.getName() + "' delayed, " + ((this.busyPause < 0) ? "starting now job." : "starting now loop."));
else
logSystem("thread '" + this.getName() + "' deployed, " + ((this.busyPause < 0) ? "starting job." : "starting loop."));
}
long timestamp;
long memstamp0, memstamp1;
boolean isBusy;
//Runtime rt = Runtime.getRuntime();
while (running) {
if ((this.intermissionObedient) && (this.intermission > 0) && (this.intermission != Long.MAX_VALUE)) {
long itime = this.intermission - System.currentTimeMillis();
if (itime > 0) {
if (itime > this.idlePause) itime = this.idlePause;
logSystem("thread '" + this.getName()
+ "' breaks for intermission: " + (itime / 1000)
+ " seconds");
ratz(itime);
}
this.intermission = 0;
}
if (this.intermission == Long.MAX_VALUE) {
// omit Job, paused
logSystem("thread '" + this.getName() + "' paused");
timestamp = System.currentTimeMillis();
ratz(this.idlePause);
idletime += System.currentTimeMillis() - timestamp;
//} else if ((memnow = serverMemory.available()) > memprereq) try {
} else if (serverMemory.request(memprereq, false)) try {
// do job
timestamp = System.currentTimeMillis();
memstamp0 = serverMemory.used();
isBusy = this.job();
// do memory and busy/idle-count/time monitoring
if (isBusy) {
memstamp1 = serverMemory.used();
if (memstamp1 >= memstamp0) {
// no GC in between. this is not shure but most probable
memuse += memstamp1 - memstamp0;
} else {
// GC was obviously in between. Add an average as simple heuristic
if (busyCycles > 0) memuse += memuse / busyCycles;
}
busytime += System.currentTimeMillis() - timestamp;
busyCycles++;
} else {
idleCycles++;
}
// interrupt loop if this is interrupted or supposed to be a one-time job
if ((!running) || (this.isInterrupted())) break;
if ((this.idlePause < 0) || (this.busyPause < 0)) break; // for one-time jobs
// process scheduled pause
timestamp = System.currentTimeMillis();
ratz((isBusy) ? this.busyPause : this.idlePause);
idletime += System.currentTimeMillis() - timestamp;
} catch (Exception e) {
// handle exceptions: thread must not die on any unexpected exceptions
// if the exception is too bad it should call terminate()
this.jobExceptionHandler(e);
busyCycles++;
} else {
log.logWarning("Thread '" + this.getName() + "' runs short memory cycle. Free mem: " +
(serverMemory.available() / 1024) + " KB, needed: " + (memprereq / 1024) + " KB");
// omit job, not enough memory
// process scheduled pause
timestamp = System.currentTimeMillis();
// do a clean-up
this.freemem();
// sleep a while
ratz(this.idlePause);
idletime += System.currentTimeMillis() - timestamp;
outofmemoryCycles++;
}
}
this.close();
logSystem("thread '" + this.getName() + "' terminated.");
}
private void ratz(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
if (this.log != null) this.log.logConfig("thread '" + this.getName() + "' interrupted because of shutdown.");
}
}
private void logSystem(String text) {
if (log == null) serverLog.logConfig("THREAD-CONTROL", text);
else log.logConfig(text);
}
}

@ -66,7 +66,7 @@ public abstract class serverAbstractSwitch<E> implements serverSwitch<E> {
private Map<String, String> configProps;
private Map<String, String> configRemoved;
private HashMap<InetAddress, String> authorization;
private TreeMap<String, serverThread> workerThreads;
private TreeMap<String, serverBusyThread> workerThreads;
private TreeMap<String, serverSwitchAction> switchActions;
protected HashMap<String, TreeMap<Long, String>> accessTracker; // mappings from requesting host to an ArrayList of serverTrack-entries
private LinkedBlockingQueue<E> cacheStack;
@ -151,7 +151,7 @@ public abstract class serverAbstractSwitch<E> implements serverSwitch<E> {
accessTracker = new HashMap<String, TreeMap<Long, String>>();
// init thread control
workerThreads = new TreeMap<String, serverThread>();
workerThreads = new TreeMap<String, serverBusyThread>();
// init switch actions
switchActions = new TreeMap<String, serverSwitchAction>();
@ -379,7 +379,7 @@ public abstract class serverAbstractSwitch<E> implements serverSwitch<E> {
String threadShortDescription,
String threadLongDescription,
String threadMonitorURL,
serverThread newThread,
serverBusyThread newThread,
long startupDelay) {
deployThread(threadName, threadShortDescription, threadLongDescription, threadMonitorURL,
newThread, startupDelay,
@ -393,7 +393,7 @@ public abstract class serverAbstractSwitch<E> implements serverSwitch<E> {
String threadShortDescription,
String threadLongDescription,
String threadMonitorURL,
serverThread newThread,
serverBusyThread newThread,
long startupDelay,
long initialIdleSleep,
long initialBusySleep,
@ -429,12 +429,12 @@ public abstract class serverAbstractSwitch<E> implements serverSwitch<E> {
if (workerThreads.containsKey(threadName)) newThread.start();
}
public serverThread getThread(String threadName) {
return (serverThread) workerThreads.get(threadName);
public serverBusyThread getThread(String threadName) {
return workerThreads.get(threadName);
}
public void setThreadPerformance(String threadName, long idleMillis, long busyMillis, long memprereqBytes) {
serverThread thread = (serverThread) workerThreads.get(threadName);
serverBusyThread thread = workerThreads.get(threadName);
if (thread != null) {
thread.setIdleSleep(idleMillis);
thread.setBusySleep(busyMillis);
@ -452,7 +452,7 @@ public abstract class serverAbstractSwitch<E> implements serverSwitch<E> {
public void intermissionAllThreads(long pause) {
Iterator<String> e = workerThreads.keySet().iterator();
while (e.hasNext()) {
((serverThread) workerThreads.get(e.next())).intermission(pause);
workerThreads.get(e.next()).intermission(pause);
}
}

@ -1,10 +1,13 @@
// serverAbstractThread.java
// -----------------------
// (C) by Michael Peter Christen; mc@anomic.de
// first published on http://www.yacy.net
// Frankfurt, Germany, 2005
// last major change: 14.03.2005
// serverAbstractThread.java
// (C) 2005 by Michael Peter Christen; mc@yacy.net, Frankfurt a. M., Germany
// first published 14.03.2005 on http://yacy.net
//
// $LastChangedDate: 2006-04-02 22:40:07 +0200 (So, 02 Apr 2006) $
// $LastChangedRevision: 1986 $
// $LastChangedBy: orbiter $
//
// LICENSE
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
@ -18,28 +21,9 @@
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
//
// Using this software in any meaning (reading, learning, copying, compiling,
// running) means that you agree that the Author(s) is (are) not responsible
// for cost, loss of data or any harm that may be caused directly or indirectly
// by usage of this softare or this documentation. The usage of this software
// is on your own risk. The installation and usage (starting/running) of this
// software may allow other people or application to access your computer and
// any attached devices and is highly dependent on the configuration of the
// software which must be done by the user of the software; the author(s) is
// (are) also not responsible for proper configuration and usage of the
// software, even if provoked by documentation provided together with
// the software.
//
// Any changes to this file according to the GPL as documented in the file
// gpl.txt aside this file in the shipment you received can be done to the
// lines that follows this copyright notice here, but changes must not be
// done inside the copyright notive above. A re-distribution must contain
// the intact and unchanged copyright notice.
// Contributions and changes to the program code must be marked as such.
/*
an Implementation of a serverRunnable must only extend this class and impement
an Implementation of a serverThread must only extend this class and implement
the methods:
open(),
job() and
@ -54,16 +38,13 @@ import de.anomic.server.logging.serverLog;
public abstract class serverAbstractThread extends Thread implements serverThread {
private long startup = 0, intermission = 0, idlePause = 0, busyPause = 0, blockPause = 0;
private boolean running = true;
private serverLog log = null;
private long idletime = 0, busytime = 0, memprereq = 0, memuse = 0;
private String shortDescr = "", longDescr = "";
private String monitorURL = null;
private long threadBlockTimestamp = System.currentTimeMillis();
private long idleCycles = 0, busyCycles = 0, outofmemoryCycles = 0;
private Object syncObject = null;
private boolean intermissionObedient = true;
protected boolean running = true;
protected serverLog log = null;
protected long busytime = 0, memuse = 0;
private long blockPause = 0;
private String shortDescr = "", longDescr = "";
private String monitorURL = null;
private long threadBlockTimestamp = System.currentTimeMillis();
protected final void announceThreadBlockApply() {
// shall only be used, if a thread blocks for an important reason
@ -85,10 +66,6 @@ public abstract class serverAbstractThread extends Thread implements serverThrea
this.busytime += millis;
}
protected final void announceMoreSleepTime(long millis) {
this.idletime += millis;
}
public final void setDescription(String shortText, String longText, String monitorURL) {
// sets a visible description string
this.shortDescr = shortText;
@ -96,33 +73,6 @@ public abstract class serverAbstractThread extends Thread implements serverThrea
this.monitorURL = monitorURL;
}
public final void setStartupSleep(long milliseconds) {
// sets a sleep time before execution of the job-loop
startup = milliseconds;
}
public final long setIdleSleep(long milliseconds) {
// sets a sleep time for pauses between two jobs
idlePause = milliseconds;
return milliseconds;
}
public final long setBusySleep(long milliseconds) {
// sets a sleep time for pauses between two jobs
busyPause = milliseconds;
return milliseconds;
}
public void setMemPreReqisite(long freeBytes) {
// sets minimum required amount of memory for the job execution
memprereq = freeBytes;
}
public void setObeyIntermission(boolean obey) {
// defines if the thread should obey the intermission command
intermissionObedient = obey;
}
public final String getShortDescription() {
return this.shortDescr;
}
@ -135,32 +85,11 @@ public abstract class serverAbstractThread extends Thread implements serverThrea
return this.monitorURL;
}
public final long getIdleCycles() {
// returns the total number of cycles of job execution with idle-result
return this.idleCycles;
}
public final long getBusyCycles() {
// returns the total number of cycles of job execution with busy-result
return this.busyCycles;
}
public long getOutOfMemoryCycles() {
// returns the total number of cycles where
// a job execution was omitted because of memory shortage
return this.outofmemoryCycles;
}
public final long getBlockTime() {
// returns the total time that this thread has been blocked so far
return this.blockPause;
}
public final long getSleepTime() {
// returns the total time that this thread has slept so far
return this.idletime;
}
public final long getExecTime() {
// returns the total time that this thread has worked so far
return this.busytime;
@ -181,13 +110,6 @@ public abstract class serverAbstractThread extends Thread implements serverThrea
return !this.running || Thread.currentThread().isInterrupted();
}
public void intermission(long pause) {
if (pause == Long.MAX_VALUE)
this.intermission = Long.MAX_VALUE;
else
this.intermission = System.currentTimeMillis() + pause;
}
public void terminate(boolean waitFor) {
// after calling this method, the thread shall terminate
this.running = false;
@ -204,30 +126,11 @@ public abstract class serverAbstractThread extends Thread implements serverThrea
// If we reach this point, the process is closed
}
/*
private final void logError(String text) {
if (log == null) serverLog.logSevere("THREAD-CONTROL", text);
else log.logSevere(text);
}
*/
private final void logError(String text,Throwable thrown) {
if (log == null) serverLog.logSevere("THREAD-CONTROL", text, thrown);
else log.logSevere(text,thrown);
}
private void logSystem(String text) {
if (log == null) serverLog.logConfig("THREAD-CONTROL", text);
else log.logConfig(text);
}
/*
private void logSystem(String text, Throwable thrown) {
if (log == null) serverLog.logConfig("THREAD-CONTROL", text, thrown);
else log.logConfig(text,thrown);
}
*/
public void jobExceptionHandler(Exception e) {
if (!(e instanceof ClosedByInterruptException)) {
// default handler for job exceptions. shall be overridden for own handler
@ -235,126 +138,7 @@ public abstract class serverAbstractThread extends Thread implements serverThrea
}
}
public void run() {
if (startup > 0) {
// do a startup-delay
logSystem("thread '" + this.getName() + "' deployed, delaying start-up.");
ratz(startup);
if (!(running)) return;
}
this.open();
if (log != null) {
if (startup > 0)
logSystem("thread '" + this.getName() + "' delayed, " + ((this.busyPause < 0) ? "starting now job." : "starting now loop."));
else
logSystem("thread '" + this.getName() + "' deployed, " + ((this.busyPause < 0) ? "starting job." : "starting loop."));
}
long timestamp;
long memstamp0, memstamp1;
boolean isBusy;
//Runtime rt = Runtime.getRuntime();
while (running) {
if ((this.intermissionObedient) && (this.intermission > 0) && (this.intermission != Long.MAX_VALUE)) {
long itime = this.intermission - System.currentTimeMillis();
if (itime > 0) {
if (itime > this.idlePause) itime = this.idlePause;
logSystem("thread '" + this.getName()
+ "' breaks for intermission: " + (itime / 1000)
+ " seconds");
ratz(itime);
}
this.intermission = 0;
}
if (this.intermission == Long.MAX_VALUE) {
// omit Job, paused
logSystem("thread '" + this.getName() + "' paused");
timestamp = System.currentTimeMillis();
ratz(this.idlePause);
idletime += System.currentTimeMillis() - timestamp;
//} else if ((memnow = serverMemory.available()) > memprereq) try {
} else if (serverMemory.request(memprereq, false)) try {
// do job
timestamp = System.currentTimeMillis();
memstamp0 = serverMemory.used();
isBusy = this.job();
// do memory and busy/idle-count/time monitoring
if (isBusy) {
memstamp1 = serverMemory.used();
if (memstamp1 >= memstamp0) {
// no GC in between. this is not shure but most probable
memuse += memstamp1 - memstamp0;
} else {
// GC was obviously in between. Add an average as simple heuristic
if (busyCycles > 0) memuse += memuse / busyCycles;
}
busytime += System.currentTimeMillis() - timestamp;
busyCycles++;
} else {
idleCycles++;
}
// interrupt loop if this is interrupted or supposed to be a one-time job
if ((!running) || (this.isInterrupted())) break;
if ((this.idlePause < 0) || (this.busyPause < 0)) break; // for one-time jobs
// process scheduled pause
timestamp = System.currentTimeMillis();
ratz((isBusy) ? this.busyPause : this.idlePause);
idletime += System.currentTimeMillis() - timestamp;
} catch (Exception e) {
// handle exceptions: thread must not die on any unexpected exceptions
// if the exception is too bad it should call terminate()
this.jobExceptionHandler(e);
busyCycles++;
} else {
log.logWarning("Thread '" + this.getName() + "' runs short memory cycle. Free mem: " +
(serverMemory.available() / 1024) + " KB, needed: " + (memprereq / 1024) + " KB");
// omit job, not enough memory
// process scheduled pause
timestamp = System.currentTimeMillis();
// do a clean-up
this.freemem();
// sleep a while
ratz(this.idlePause);
idletime += System.currentTimeMillis() - timestamp;
outofmemoryCycles++;
}
}
this.close();
logSystem("thread '" + this.getName() + "' terminated.");
}
private void ratz(long millis) {
try {
if (this.syncObject != null) {
synchronized(this.syncObject) {
this.syncObject.wait(millis);
}
} else {
Thread.sleep(millis);
}
} catch (InterruptedException e) {
if (this.log != null) this.log.logConfig("thread '" + this.getName() + "' interrupted because of shutdown.");
}
}
public void open() {} // dummy definition; should be overriden
public void close() {} // dummy definition; should be overriden
public void setSyncObject(Object sync) {
this.syncObject = sync;
}
public Object getSyncObject() {
return this.syncObject;
}
public void notifyThread() {
if (this.syncObject != null) {
synchronized(this.syncObject) {
if (this.log != null) this.log.logFine("thread '" + this.getName() + "' has received a notification from thread '" + Thread.currentThread().getName() + "'.");
this.syncObject.notifyAll();
}
}
}
}

@ -1,6 +1,6 @@
// serverProcess.java
// serverBlockingThread.java
// (C) 2008 by Michael Peter Christen; mc@yacy.net, Frankfurt a. M., Germany
// first published 27.02.2008 on http://yacy.net
// first published 27.03.2008 on http://yacy.net
//
// $LastChangedDate: 2006-04-02 22:40:07 +0200 (So, 02 Apr 2006) $
// $LastChangedRevision: 1986 $
@ -24,8 +24,18 @@
package de.anomic.server;
public interface serverProcess<I, O> {
import java.util.concurrent.BlockingQueue;
public interface serverBlockingThread<I, O> extends serverThread {
public void setInputQueue(BlockingQueue<I> queue);
public void setOutputQueue(BlockingQueue<O> queue);
public BlockingQueue<I> getInputQueue();
public BlockingQueue<O> getOutputQueue();
public O job(I next) throws Exception;
// performes one job procedure; this loopes until terminate() is called
// job returns true if it has done something
// it returns false if it is idle and does not expect to work on more for a longer time
public O process(I input);
}

@ -0,0 +1,70 @@
// serverBusyThread.java
// (C) 2008 by Michael Peter Christen; mc@yacy.net, Frankfurt a. M., Germany
// first published 27.03.2008 on http://yacy.net
//
// $LastChangedDate: 2006-04-02 22:40:07 +0200 (So, 02 Apr 2006) $
// $LastChangedRevision: 1986 $
// $LastChangedBy: orbiter $
//
// LICENSE
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
package de.anomic.server;
public interface serverBusyThread extends serverThread {
public void setStartupSleep(long milliseconds);
// sets a sleep time before execution of the job-loop
public long setIdleSleep(long milliseconds);
// sets a sleep time for pauses between two jobs if the job returns false (idle)
public long setBusySleep(long milliseconds);
// sets a sleep time for pauses between two jobs if the job returns true (busy)
public void setMemPreReqisite(long freeBytes);
// sets minimum required amount of memory for the job execution
public void setObeyIntermission(boolean obey);
// defines if the thread should obey the intermission command
public long getIdleCycles();
// returns the total number of cycles of job execution with idle-result
public long getBusyCycles();
// returns the total number of cycles of job execution with busy-result
public long getOutOfMemoryCycles();
// returns the total number of cycles where
// a job execution was omitted because of memory shortage
public long getSleepTime();
// returns the total time that this thread has slept so far
public void intermission(long pause);
// the thread is forced to pause for a specific time
// if the thread is busy meanwhile, the pause is ommitted
public boolean job() throws Exception;
// performes one job procedure; this loopes until terminate() is called
// job returns true if it has done something
// it returns false if it is idle and does not expect to work on more for a longer time
public void freemem();
// is called when an outOfMemoryCycle is performed
// this method should try to free some memory, so that the job can be executed
}

@ -83,7 +83,7 @@ import de.anomic.urlRedirector.urlRedirectord;
import de.anomic.yacy.yacyCore;
import de.anomic.yacy.yacySeed;
public final class serverCore extends serverAbstractThread implements serverThread {
public final class serverCore extends serverAbstractBusyThread implements serverBusyThread {
// special ASCII codes used for protocol handling
public static final byte HT = 9; // Horizontal Tab

@ -0,0 +1,122 @@
// serverInstantBlockingThread.java
// (C) 2008 by Michael Peter Christen; mc@yacy.net, Frankfurt a. M., Germany
// first published 27.03.2008 on http://yacy.net
//
// $LastChangedDate: 2006-04-02 22:40:07 +0200 (So, 02 Apr 2006) $
// $LastChangedRevision: 1986 $
// $LastChangedBy: orbiter $
//
// LICENSE
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
package de.anomic.server;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import de.anomic.server.logging.serverLog;
public class serverInstantBlockingThread<I, O> extends serverAbstractBlockingThread<I, O> implements serverBlockingThread<I, O> {
private Method jobExecMethod, jobCountMethod;
private Object environment;
private Long handle;
public static int instantThreadCounter = 0;
public static TreeMap<Long, String> jobs = new TreeMap<Long, String>();
public serverInstantBlockingThread(Object env, String jobExec, String jobCount, BlockingQueue<I> input, BlockingQueue<O> output) {
// jobExec is the name of a method of the object 'env' that executes the one-step-run
// jobCount is the name of a method that returns the size of the job
// set the blocking queues for input and output
this.setInputQueue(input);
this.setOutputQueue(output);
// define execution class
Class<?> theClass = (env instanceof Class) ? (Class<?>) env : env.getClass();
try {
this.jobExecMethod = theClass.getMethod(jobExec, new Class[0]);
} catch (NoSuchMethodException e) {
throw new RuntimeException("serverInstantThread, wrong declaration of jobExec: " + e.getMessage());
}
try {
if (jobCount == null)
this.jobCountMethod = null;
else
this.jobCountMethod = theClass.getMethod(jobCount, new Class[0]);
} catch (NoSuchMethodException e) {
throw new RuntimeException("serverInstantThread, wrong declaration of jobCount: " + e.getMessage());
}
this.environment = (env instanceof Class) ? null : env;
this.setName(theClass.getName() + "." + jobExec);
this.handle = new Long(System.currentTimeMillis() + this.getName().hashCode());
}
public int getJobCount() {
if (this.jobCountMethod == null) return Integer.MAX_VALUE;
try {
Object result = jobCountMethod.invoke(environment, new Object[0]);
if (result instanceof Integer)
return ((Integer) result).intValue();
else
return -1;
} catch (IllegalAccessException e) {
return -1;
} catch (IllegalArgumentException e) {
return -1;
} catch (InvocationTargetException e) {
serverLog.logSevere("BLOCKINGTHREAD", "invocation serverInstantThread of thread '" + this.getName() + "': " + e.getMessage(), e);
return -1;
}
}
@SuppressWarnings("unchecked")
public O job(I next) throws Exception {
instantThreadCounter++;
//System.out.println("started job " + this.handle + ": " + this.getName());
synchronized(jobs) {jobs.put(this.handle, this.getName());}
O out = null;
try {
out = (O) jobExecMethod.invoke(environment, new Object[0]);
} catch (IllegalAccessException e) {
serverLog.logSevere("BLOCKINGTHREAD", "Internal Error in serverInstantThread.job: " + e.getMessage());
serverLog.logSevere("BLOCKINGTHREAD", "shutting down thread '" + this.getName() + "'");
this.terminate(false);
} catch (IllegalArgumentException e) {
serverLog.logSevere("BLOCKINGTHREAD", "Internal Error in serverInstantThread.job: " + e.getMessage());
serverLog.logSevere("BLOCKINGTHREAD", "shutting down thread '" + this.getName() + "'");
this.terminate(false);
} catch (InvocationTargetException e) {
String targetException = e.getTargetException().getMessage();
e.getTargetException().printStackTrace();
e.printStackTrace();
if ((targetException != null) && ((targetException.indexOf("heap space") > 0) || (targetException.indexOf("NullPointerException") > 0))) e.getTargetException().printStackTrace();
serverLog.logSevere("BLOCKINGTHREAD", "Runtime Error in serverInstantThread.job, thread '" + this.getName() + "': " + e.getMessage() + "; target exception: " + targetException, e.getTargetException());
e.getTargetException().printStackTrace();
} catch (OutOfMemoryError e) {
serverLog.logSevere("BLOCKINGTHREAD", "OutOfMemory Error in serverInstantThread.job, thread '" + this.getName() + "': " + e.getMessage());
e.printStackTrace();
}
instantThreadCounter--;
synchronized(jobs) {jobs.remove(this.handle);}
return out;
}
}

@ -46,7 +46,7 @@ import java.util.TreeMap;
import de.anomic.server.logging.serverLog;
public final class serverInstantThread extends serverAbstractThread implements serverThread {
public final class serverInstantBusyThread extends serverAbstractBusyThread implements serverBusyThread {
private Method jobExecMethod, jobCountMethod, freememExecMethod;
private Object environment;
@ -55,7 +55,7 @@ public final class serverInstantThread extends serverAbstractThread implements s
public static int instantThreadCounter = 0;
public static TreeMap<Long, String> jobs = new TreeMap<Long, String>();
public serverInstantThread(Object env, String jobExec, String jobCount, String freemem) {
public serverInstantBusyThread(Object env, String jobExec, String jobCount, String freemem) {
// jobExec is the name of a method of the object 'env' that executes the one-step-run
// jobCount is the name of a method that returns the size of the job
// freemem is the name of a method that tries to free memory and returns void
@ -101,7 +101,7 @@ public final class serverInstantThread extends serverAbstractThread implements s
} catch (IllegalArgumentException e) {
return -1;
} catch (InvocationTargetException e) {
serverLog.logSevere("SERVER", "invocation serverInstantThread of thread '" + this.getName() + "': " + e.getMessage(), e);
serverLog.logSevere("BUSYTHREAD", "invocation serverInstantThread of thread '" + this.getName() + "': " + e.getMessage(), e);
return -1;
}
}
@ -116,22 +116,22 @@ public final class serverInstantThread extends serverAbstractThread implements s
if (result == null) jobHasDoneSomething = true;
else if (result instanceof Boolean) jobHasDoneSomething = ((Boolean) result).booleanValue();
} catch (IllegalAccessException e) {
serverLog.logSevere("SERVER", "Internal Error in serverInstantThread.job: " + e.getMessage());
serverLog.logSevere("SERVER", "shutting down thread '" + this.getName() + "'");
serverLog.logSevere("BUSYTHREAD", "Internal Error in serverInstantThread.job: " + e.getMessage());
serverLog.logSevere("BUSYTHREAD", "shutting down thread '" + this.getName() + "'");
this.terminate(false);
} catch (IllegalArgumentException e) {
serverLog.logSevere("SERVER", "Internal Error in serverInstantThread.job: " + e.getMessage());
serverLog.logSevere("SERVER", "shutting down thread '" + this.getName() + "'");
serverLog.logSevere("BUSYTHREAD", "Internal Error in serverInstantThread.job: " + e.getMessage());
serverLog.logSevere("BUSYTHREAD", "shutting down thread '" + this.getName() + "'");
this.terminate(false);
} catch (InvocationTargetException e) {
String targetException = e.getTargetException().getMessage();
e.getTargetException().printStackTrace();
e.printStackTrace();
if ((targetException != null) && ((targetException.indexOf("heap space") > 0) || (targetException.indexOf("NullPointerException") > 0))) e.getTargetException().printStackTrace();
serverLog.logSevere("SERVER", "Runtime Error in serverInstantThread.job, thread '" + this.getName() + "': " + e.getMessage() + "; target exception: " + targetException, e.getTargetException());
serverLog.logSevere("BUSYTHREAD", "Runtime Error in serverInstantThread.job, thread '" + this.getName() + "': " + e.getMessage() + "; target exception: " + targetException, e.getTargetException());
e.getTargetException().printStackTrace();
} catch (OutOfMemoryError e) {
serverLog.logSevere("SERVER", "OutOfMemory Error in serverInstantThread.job, thread '" + this.getName() + "': " + e.getMessage());
serverLog.logSevere("BUSYTHREAD", "OutOfMemory Error in serverInstantThread.job, thread '" + this.getName() + "': " + e.getMessage());
e.printStackTrace();
freemem();
}
@ -145,27 +145,27 @@ public final class serverInstantThread extends serverAbstractThread implements s
try {
freememExecMethod.invoke(environment, new Object[0]);
} catch (IllegalAccessException e) {
serverLog.logSevere("SERVER", "Internal Error in serverInstantThread.freemem: " + e.getMessage());
serverLog.logSevere("SERVER", "shutting down thread '" + this.getName() + "'");
serverLog.logSevere("BUSYTHREAD", "Internal Error in serverInstantThread.freemem: " + e.getMessage());
serverLog.logSevere("BUSYTHREAD", "shutting down thread '" + this.getName() + "'");
this.terminate(false);
} catch (IllegalArgumentException e) {
serverLog.logSevere("SERVER", "Internal Error in serverInstantThread.freemem: " + e.getMessage());
serverLog.logSevere("SERVER", "shutting down thread '" + this.getName() + "'");
serverLog.logSevere("BUSYTHREAD", "Internal Error in serverInstantThread.freemem: " + e.getMessage());
serverLog.logSevere("BUSYTHREAD", "shutting down thread '" + this.getName() + "'");
this.terminate(false);
} catch (InvocationTargetException e) {
String targetException = e.getTargetException().getMessage();
if (targetException.indexOf("heap space") > 0) e.getTargetException().printStackTrace();
serverLog.logSevere("SERVER", "Runtime Error in serverInstantThread.freemem, thread '" + this.getName() + "': " + e.getMessage() + "; target exception: " + targetException, e.getTargetException());
serverLog.logSevere("BUSYTHREAD", "Runtime Error in serverInstantThread.freemem, thread '" + this.getName() + "': " + e.getMessage() + "; target exception: " + targetException, e.getTargetException());
e.getTargetException().printStackTrace();
} catch (OutOfMemoryError e) {
serverLog.logSevere("SERVER", "OutOfMemory Error in serverInstantThread.freemem, thread '" + this.getName() + "': " + e.getMessage());
serverLog.logSevere("BUSYTHREAD", "OutOfMemory Error in serverInstantThread.freemem, thread '" + this.getName() + "': " + e.getMessage());
e.printStackTrace();
}
}
public static serverThread oneTimeJob(Object env, String jobExec, serverLog log, long startupDelay) {
public static serverBusyThread oneTimeJob(Object env, String jobExec, serverLog log, long startupDelay) {
// start the job and execute it once as background process
serverThread thread = new serverInstantThread(env, jobExec, null, null);
serverBusyThread thread = new serverInstantBusyThread(env, jobExec, null, null);
thread.setStartupSleep(startupDelay);
thread.setIdleSleep(-1);
thread.setBusySleep(-1);

@ -24,29 +24,9 @@
package de.anomic.server;
import java.util.concurrent.LinkedBlockingQueue;
public class serverProcessor {
public static final int availableCPU = Runtime.getRuntime().availableProcessors();
public static int useCPU = availableCPU;
public static class queue<I, O> {
String nickname;
int priority;
serverProcess<I, O> implementation;
LinkedBlockingQueue<I> inputQueue;
LinkedBlockingQueue<O> outputQueue;
public queue(String nickname, int priority, serverProcess<I, O> implementation) {
this.nickname = nickname;
this.priority = priority;
this.implementation = implementation;
this.inputQueue = new LinkedBlockingQueue<I>();
this.outputQueue = new LinkedBlockingQueue<O>();
}
}
}

@ -84,11 +84,11 @@ public interface serverSwitch<E> {
String threadShortDescription,
String threadLongDescription,
String threadMonitorURL,
serverThread newThread,
serverBusyThread newThread,
long startupDelay,
long initialIdleSleep, long initialBusySleep,
long initialMemoryPreRequisite);
public serverThread getThread(String threadName);
public serverBusyThread getThread(String threadName);
public void setThreadPerformance(String threadName, long idleMillis, long busyMillis, long memprereq);
public void terminateThread(String threadName, boolean waitFor);
public void intermissionAllThreads(long pause);

@ -1,10 +1,13 @@
// serverThread.java
// -----------------------
// (C) by Michael Peter Christen; mc@anomic.de
// first published on http://www.yacy.net
// Frankfurt, Germany, 2005
// last major change: 14.03.2005
// serverThread.java
// (C) 2005 by Michael Peter Christen; mc@yacy.net, Frankfurt a. M., Germany
// first published 13.03.2005 on http://yacy.net
//
// $LastChangedDate: 2006-04-02 22:40:07 +0200 (So, 02 Apr 2006) $
// $LastChangedRevision: 1986 $
// $LastChangedBy: orbiter $
//
// LICENSE
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
@ -18,25 +21,6 @@
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
//
// Using this software in any meaning (reading, learning, copying, compiling,
// running) means that you agree that the Author(s) is (are) not responsible
// for cost, loss of data or any harm that may be caused directly or indirectly
// by usage of this softare or this documentation. The usage of this software
// is on your own risk. The installation and usage (starting/running) of this
// software may allow other people or application to access your computer and
// any attached devices and is highly dependent on the configuration of the
// software which must be done by the user of the software; the author(s) is
// (are) also not responsible for proper configuration and usage of the
// software, even if provoked by documentation provided together with
// the software.
//
// Any changes to this file according to the GPL as documented in the file
// gpl.txt aside this file in the shipment you received can be done to the
// lines that follows this copyright notice here, but changes must not be
// done inside the copyright notive above. A re-distribution must contain
// the intact and unchanged copyright notice.
// Contributions and changes to the program code must be marked as such.
package de.anomic.server;
@ -56,21 +40,6 @@ public interface serverThread {
public void setDescription(String shortText, String longText, String monitorURL);
// sets a visible description string
public void setStartupSleep(long milliseconds);
// sets a sleep time before execution of the job-loop
public long setIdleSleep(long milliseconds);
// sets a sleep time for pauses between two jobs if the job returns false (idle)
public long setBusySleep(long milliseconds);
// sets a sleep time for pauses between two jobs if the job returns true (busy)
public void setMemPreReqisite(long freeBytes);
// sets minimum required amount of memory for the job execution
public void setObeyIntermission(boolean obey);
// defines if the thread should obey the intermission command
public String getShortDescription();
// returns short description string for online display
@ -80,22 +49,9 @@ public interface serverThread {
public String getMonitorURL();
// returns an URL that can be used to monitor the thread and it's queue
public long getIdleCycles();
// returns the total number of cycles of job execution with idle-result
public long getBusyCycles();
// returns the total number of cycles of job execution with busy-result
public long getOutOfMemoryCycles();
// returns the total number of cycles where
// a job execution was omitted because of memory shortage
public long getBlockTime();
// returns the total time that this thread has been blocked so far
public long getSleepTime();
// returns the total time that this thread has slept so far
public long getExecTime();
// returns the total time that this thread has worked so far
@ -108,10 +64,6 @@ public interface serverThread {
public void jobExceptionHandler(Exception e);
// handles any action necessary during job execution
public void intermission(long pause);
// the thread is forced to pause for a specific time
// if the thread is busy meanwhile, the pause is ommitted
public boolean shutdownInProgress();
public void terminate(boolean waitFor);
@ -124,15 +76,6 @@ public interface serverThread {
public void open();
// this is called right before the job queue is started
public boolean job() throws Exception;
// performes one job procedure; this loopes until terminate() is called
// job returns true if it has done something
// it returns false if it is idle and does not expect to work on more for a longer time
public void freemem();
// is called when an outOfMemoryCycle is performed
// this method should try to free some memory, so that the job can be executed
public int getJobCount();
// returns how many jobs are in the queue
// can be used to calculate a busy-state
@ -140,10 +83,4 @@ public interface serverThread {
public void close();
// jobs that need to be done after termination
// terminate must be called before
public void setSyncObject(Object sync);
public Object getSyncObject();
public void notifyThread();
}

Loading…
Cancel
Save