From bca87f1e383587a248cd43476d9835b6b31c378e Mon Sep 17 00:00:00 2001 From: orbiter Date: Thu, 27 Mar 2008 12:03:16 +0000 Subject: [PATCH] - refactoring of serverThreads: renaming to distinguish busy-threads and blocking-threads - added blockingThreads which are threads that are not driven by pause times but by BlockingQueue lookup git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@4606 6c8d7289-2bf4-0310-a012-ef5d649a1542 --- htroot/ConfigBasic.java | 4 +- htroot/ConfigNetwork_p.java | 4 +- htroot/PeerLoadPicture.java | 4 +- htroot/PerformanceQueues_p.java | 3 +- htroot/SettingsAck_p.java | 5 +- source/dbtest.java | 12 +- source/de/anomic/plasma/plasmaHTCache.java | 4 +- .../de/anomic/plasma/plasmaSwitchboard.java | 31 ++- .../sch/serverPortForwardingSch.java | 7 +- .../server/serverAbstractBlockingThread.java | 91 +++++++ .../server/serverAbstractBusyThread.java | 197 ++++++++++++++ .../anomic/server/serverAbstractSwitch.java | 16 +- .../anomic/server/serverAbstractThread.java | 250 ++---------------- ...Process.java => serverBlockingThread.java} | 20 +- source/de/anomic/server/serverBusyThread.java | 70 +++++ source/de/anomic/server/serverCore.java | 2 +- .../server/serverInstantBlockingThread.java | 122 +++++++++ ...read.java => serverInstantBusyThread.java} | 34 +-- source/de/anomic/server/serverProcessor.java | 22 +- source/de/anomic/server/serverSwitch.java | 4 +- source/de/anomic/server/serverThread.java | 81 +----- 21 files changed, 586 insertions(+), 397 deletions(-) create mode 100644 source/de/anomic/server/serverAbstractBlockingThread.java create mode 100644 source/de/anomic/server/serverAbstractBusyThread.java rename source/de/anomic/server/{serverProcess.java => serverBlockingThread.java} (59%) create mode 100644 source/de/anomic/server/serverBusyThread.java create mode 100644 source/de/anomic/server/serverInstantBlockingThread.java rename source/de/anomic/server/{serverInstantThread.java => serverInstantBusyThread.java} (79%) diff --git a/htroot/ConfigBasic.java b/htroot/ConfigBasic.java index e60b547e6..9f8a5de3c 100644 --- a/htroot/ConfigBasic.java +++ b/htroot/ConfigBasic.java @@ -59,7 +59,7 @@ import de.anomic.plasma.plasmaSwitchboard; import de.anomic.server.serverCodings; import de.anomic.server.serverCore; import de.anomic.server.serverDomains; -import de.anomic.server.serverInstantThread; +import de.anomic.server.serverInstantBusyThread; import de.anomic.server.serverObjects; import de.anomic.server.serverSwitch; import de.anomic.yacy.yacyCore; @@ -96,7 +96,7 @@ public class ConfigBasic { //boolean doPeerPing = false; if ((yacyCore.seedDB.mySeed().isVirgin()) || (yacyCore.seedDB.mySeed().isJunior())) { - serverInstantThread.oneTimeJob(sb.yc, "peerPing", null, 0); + serverInstantBusyThread.oneTimeJob(sb.yc, "peerPing", null, 0); //doPeerPing = true; } diff --git a/htroot/ConfigNetwork_p.java b/htroot/ConfigNetwork_p.java index a57566c4b..644535c11 100644 --- a/htroot/ConfigNetwork_p.java +++ b/htroot/ConfigNetwork_p.java @@ -27,10 +27,10 @@ import de.anomic.http.httpHeader; import de.anomic.plasma.plasmaSwitchboard; +import de.anomic.server.serverBusyThread; import de.anomic.server.serverCodings; import de.anomic.server.serverObjects; import de.anomic.server.serverSwitch; -import de.anomic.server.serverThread; import de.anomic.yacy.yacyCore; public class ConfigNetwork_p { @@ -122,7 +122,7 @@ public class ConfigNetwork_p { newppm = Math.max(1, Integer.parseInt(post.get("acceptCrawlLimit", "1"))); } catch (NumberFormatException e) {} long newBusySleep = Math.max(100, 60000 / newppm); - serverThread rct = sb.getThread(plasmaSwitchboard.CRAWLJOB_REMOTE_TRIGGERED_CRAWL); + serverBusyThread rct = sb.getThread(plasmaSwitchboard.CRAWLJOB_REMOTE_TRIGGERED_CRAWL); rct.setBusySleep(newBusySleep); sb.setConfig(plasmaSwitchboard.CRAWLJOB_REMOTE_TRIGGERED_CRAWL_BUSYSLEEP, Long.toString(newBusySleep)); diff --git a/htroot/PeerLoadPicture.java b/htroot/PeerLoadPicture.java index ab39d9bef..400060b9b 100644 --- a/htroot/PeerLoadPicture.java +++ b/htroot/PeerLoadPicture.java @@ -7,9 +7,9 @@ import de.anomic.http.httpHeader; import de.anomic.plasma.plasmaGrafics; import de.anomic.plasma.plasmaSwitchboard; import de.anomic.plasma.plasmaGrafics.CircleThreadPiece; +import de.anomic.server.serverBusyThread; import de.anomic.server.serverObjects; import de.anomic.server.serverSwitch; -import de.anomic.server.serverThread; public class PeerLoadPicture { @@ -36,7 +36,7 @@ public class PeerLoadPicture { Iterator threads = env.threadNames(); String threadname; - serverThread thread; + serverBusyThread thread; long busy_time = 0; diff --git a/htroot/PerformanceQueues_p.java b/htroot/PerformanceQueues_p.java index 36ae36679..537aa03e7 100644 --- a/htroot/PerformanceQueues_p.java +++ b/htroot/PerformanceQueues_p.java @@ -49,6 +49,7 @@ import java.util.Map; import de.anomic.http.httpHeader; import de.anomic.plasma.plasmaSwitchboard; +import de.anomic.server.serverBusyThread; import de.anomic.server.serverCore; import de.anomic.server.serverFileUtils; import de.anomic.server.serverObjects; @@ -66,7 +67,7 @@ public class PerformanceQueues_p { Map defaultSettings = ((post == null) || (!(post.containsKey("submitdefault")))) ? null : serverFileUtils.loadHashMap(defaultSettingsFile); Iterator threads = switchboard.threadNames(); String threadName; - serverThread thread; + serverBusyThread thread; boolean xml = ((String)header.get("PATH")).endsWith(".xml"); prop.setLocalized(!xml); diff --git a/htroot/SettingsAck_p.java b/htroot/SettingsAck_p.java index 2fbbf7a93..0c4879038 100644 --- a/htroot/SettingsAck_p.java +++ b/htroot/SettingsAck_p.java @@ -70,7 +70,6 @@ import de.anomic.server.serverDate; import de.anomic.server.serverMemory; import de.anomic.server.serverObjects; import de.anomic.server.serverSwitch; -import de.anomic.server.serverThread; import de.anomic.yacy.yacyCore; import de.anomic.yacy.yacySeed; import de.anomic.yacy.yacySeedUploader; @@ -257,8 +256,8 @@ public class SettingsAck_p { httpd.initPortForwarding(); // notifying publishSeed Thread - serverThread peerPing = env.getThread("30_peerping"); - peerPing.notifyThread(); + //serverThread peerPing = env.getThread("30_peerping"); + //peerPing.notifyThread(); } catch (Exception e) { prop.put("info", "23"); prop.putHTML("info_errormsg",(e.getMessage() == null) ? "unknown" : e.getMessage().replaceAll("\n","
")); diff --git a/source/dbtest.java b/source/dbtest.java index 788ad15c8..758c99888 100644 --- a/source/dbtest.java +++ b/source/dbtest.java @@ -25,7 +25,7 @@ import de.anomic.kelondro.kelondroRowSet; import de.anomic.kelondro.kelondroSQLTable; import de.anomic.kelondro.kelondroSplitTable; import de.anomic.kelondro.kelondroTree; -import de.anomic.server.serverInstantThread; +import de.anomic.server.serverInstantBusyThread; import de.anomic.server.serverMemory; import de.anomic.server.logging.serverLog; import de.anomic.ymage.ymageChart; @@ -449,10 +449,10 @@ public class dbtest { r = Math.abs(random.nextLong() % 1000); jcontrol.add(new Long(r)); kcontrol.putb((int) r, "x".getBytes()); - serverInstantThread.oneTimeJob(new WriteJob(table_test, table_reference, r), 0, 50); + serverInstantBusyThread.oneTimeJob(new WriteJob(table_test, table_reference, r), 0, 50); if (random.nextLong() % 5 == 0) ra.add(new Long(r)); for (int j = 0; j < readCount; j++) { - serverInstantThread.oneTimeJob(new ReadJob(table_test, table_reference, random.nextLong() % writeCount), random.nextLong() % 1000, 20); + serverInstantBusyThread.oneTimeJob(new ReadJob(table_test, table_reference, random.nextLong() % writeCount), random.nextLong() % 1000, 20); } if ((ra.size() > 0) && (random.nextLong() % 7 == 0)) { rc++; @@ -461,13 +461,13 @@ public class dbtest { jcontrol.remove(R); kcontrol.removeb((int) R.longValue()); System.out.println("remove: " + R.longValue()); - serverInstantThread.oneTimeJob(new RemoveJob(table_test, table_reference, ((Long) ra.remove(p)).longValue()), 0, 50); + serverInstantBusyThread.oneTimeJob(new RemoveJob(table_test, table_reference, ((Long) ra.remove(p)).longValue()), 0, 50); } } System.out.println("removed: " + rc + ", size of jcontrol set: " + jcontrol.size() + ", size of kcontrol set: " + kcontrol.size()); - while (serverInstantThread.instantThreadCounter > 0) { + while (serverInstantBusyThread.instantThreadCounter > 0) { try {Thread.sleep(1000);} catch (InterruptedException e) {} // wait for all tasks to finish - System.out.println("count: " + serverInstantThread.instantThreadCounter + ", jobs: " + serverInstantThread.jobs.toString()); + System.out.println("count: " + serverInstantBusyThread.instantThreadCounter + ", jobs: " + serverInstantBusyThread.jobs.toString()); } try {Thread.sleep(6000);} catch (InterruptedException e) {} } diff --git a/source/de/anomic/plasma/plasmaHTCache.java b/source/de/anomic/plasma/plasmaHTCache.java index 2e40c0891..75114444d 100644 --- a/source/de/anomic/plasma/plasmaHTCache.java +++ b/source/de/anomic/plasma/plasmaHTCache.java @@ -83,7 +83,7 @@ import de.anomic.plasma.cache.UnsupportedProtocolException; import de.anomic.server.serverCodings; import de.anomic.server.serverDomains; import de.anomic.server.serverFileUtils; -import de.anomic.server.serverInstantThread; +import de.anomic.server.serverInstantBusyThread; import de.anomic.server.serverSystem; import de.anomic.server.serverThread; import de.anomic.server.logging.serverLog; @@ -292,7 +292,7 @@ public final class plasmaHTCache { // start the cache startup thread // this will collect information about the current cache size and elements try { - cacheScanThread = serverInstantThread.oneTimeJob(Class.forName("de.anomic.plasma.plasmaHTCache"), "cacheScan", log, 120000); + cacheScanThread = serverInstantBusyThread.oneTimeJob(Class.forName("de.anomic.plasma.plasmaHTCache"), "cacheScan", log, 120000); } catch (ClassNotFoundException e) { e.printStackTrace(); } diff --git a/source/de/anomic/plasma/plasmaSwitchboard.java b/source/de/anomic/plasma/plasmaSwitchboard.java index 51cc13c4e..fa4ae333e 100644 --- a/source/de/anomic/plasma/plasmaSwitchboard.java +++ b/source/de/anomic/plasma/plasmaSwitchboard.java @@ -137,9 +137,10 @@ import de.anomic.plasma.crawler.plasmaProtocolLoader; import de.anomic.plasma.dbImport.dbImportManager; import de.anomic.plasma.parser.ParserException; import de.anomic.server.serverAbstractSwitch; +import de.anomic.server.serverBusyThread; import de.anomic.server.serverDomains; import de.anomic.server.serverFileUtils; -import de.anomic.server.serverInstantThread; +import de.anomic.server.serverInstantBusyThread; import de.anomic.server.serverMemory; import de.anomic.server.serverObjects; import de.anomic.server.serverProfiling; @@ -975,7 +976,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch= 1000) wantedPPM = 1000; int newBusySleep = 60000 / wantedPPM; // for wantedPPM = 10: 6000; for wantedPPM = 1000: 60 - serverThread thread; + serverBusyThread thread; thread = getThread(INDEX_DIST); if (thread != null) { diff --git a/source/de/anomic/server/portForwarding/sch/serverPortForwardingSch.java b/source/de/anomic/server/portForwarding/sch/serverPortForwardingSch.java index c5c7ad4fe..f03e4c015 100644 --- a/source/de/anomic/server/portForwarding/sch/serverPortForwardingSch.java +++ b/source/de/anomic/server/portForwarding/sch/serverPortForwardingSch.java @@ -54,7 +54,7 @@ import com.jcraft.jsch.Session; import com.jcraft.jsch.UIKeyboardInteractive; import com.jcraft.jsch.UserInfo; -import de.anomic.server.serverInstantThread; +import de.anomic.server.serverInstantBusyThread; import de.anomic.server.serverSwitch; import de.anomic.server.logging.serverLog; import de.anomic.server.portForwarding.serverPortForwarding; @@ -95,7 +95,7 @@ public class serverPortForwardingSch implements serverPortForwarding{ private int localHostPort; private static Session session; - private static serverInstantThread sessionWatcher; + private static serverInstantBusyThread sessionWatcher; private serverLog log; @@ -190,8 +190,7 @@ public class serverPortForwardingSch implements serverPortForwarding{ if (sessionWatcher == null) { this.log.logFine("Deploying port forwarding session watcher thread."); this.switchboard.deployThread("portForwardingWatcher", "Remote Port Forwarding Watcher", "this thread is used to detect broken connections and to re-establish it if necessary.", null, - sessionWatcher = new serverInstantThread(this, "reconnect", null, null), 30000,30000,30000,1000); - sessionWatcher.setSyncObject(new Object()); + sessionWatcher = new serverInstantBusyThread(this, "reconnect", null, null), 30000,30000,30000,1000); } this.log.logInfo("Remote port forwarding connection established: " + diff --git a/source/de/anomic/server/serverAbstractBlockingThread.java b/source/de/anomic/server/serverAbstractBlockingThread.java new file mode 100644 index 000000000..49e4988d9 --- /dev/null +++ b/source/de/anomic/server/serverAbstractBlockingThread.java @@ -0,0 +1,91 @@ +// serverAbstractBlockingThread.java +// (C) 2008 by Michael Peter Christen; mc@yacy.net, Frankfurt a. M., Germany +// first published 27.03.2008 on http://yacy.net +// +// $LastChangedDate: 2006-04-02 22:40:07 +0200 (So, 02 Apr 2006) $ +// $LastChangedRevision: 1986 $ +// $LastChangedBy: orbiter $ +// +// LICENSE +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation; either version 2 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +package de.anomic.server; + +import java.util.concurrent.BlockingQueue; + +import de.anomic.server.logging.serverLog; + +public abstract class serverAbstractBlockingThread extends serverAbstractThread implements serverBlockingThread { + + private BlockingQueue input = null; + private BlockingQueue output = null; + + public void setInputQueue(BlockingQueue queue) { + this.input = queue; + } + public void setOutputQueue(BlockingQueue queue) { + this.output = queue; + } + public BlockingQueue getInputQueue() { + return this.input; + } + public BlockingQueue getOutputQueue() { + return this.output; + } + + public void run() { + this.open(); + if (log != null) { + logSystem("thread '" + this.getName() + "' deployed, starting loop."); + } + long timestamp; + long memstamp0, memstamp1; + long busyCycles = 0; + + while (running) { + try { + // do job + timestamp = System.currentTimeMillis(); + memstamp0 = serverMemory.used(); + O out = this.job(this.input.take()); + if (out != null) this.output.add(out); + // do memory and busy/idle-count/time monitoring + memstamp1 = serverMemory.used(); + if (memstamp1 >= memstamp0) { + // no GC in between. this is not shure but most probable + memuse += memstamp1 - memstamp0; + } else { + // GC was obviously in between. Add an average as simple heuristic + if (busyCycles > 0) memuse += memuse / busyCycles; + } + busytime += System.currentTimeMillis() - timestamp; + } catch (Exception e) { + // handle exceptions: thread must not die on any unexpected exceptions + // if the exception is too bad it should call terminate() + this.jobExceptionHandler(e); + } finally { + busyCycles++; + } + } + this.close(); + logSystem("thread '" + this.getName() + "' terminated."); + } + + private void logSystem(String text) { + if (log == null) serverLog.logConfig("THREAD-CONTROL", text); + else log.logConfig(text); + } +} diff --git a/source/de/anomic/server/serverAbstractBusyThread.java b/source/de/anomic/server/serverAbstractBusyThread.java new file mode 100644 index 000000000..0242ed9b3 --- /dev/null +++ b/source/de/anomic/server/serverAbstractBusyThread.java @@ -0,0 +1,197 @@ +// serverAbstractBusyThread.java +// (C) 2005 by Michael Peter Christen; mc@yacy.net, Frankfurt a. M., Germany +// first published 14.03.2005 on http://yacy.net +// +// $LastChangedDate: 2006-04-02 22:40:07 +0200 (So, 02 Apr 2006) $ +// $LastChangedRevision: 1986 $ +// $LastChangedBy: orbiter $ +// +// LICENSE +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation; either version 2 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +package de.anomic.server; + +import de.anomic.server.logging.serverLog; + +public abstract class serverAbstractBusyThread extends serverAbstractThread implements serverBusyThread { + + private long startup = 0, intermission = 0, idlePause = 0, busyPause = 0; + private long idletime = 0, memprereq = 0; + private long idleCycles = 0, busyCycles = 0, outofmemoryCycles = 0; + private boolean intermissionObedient = true; + + protected final void announceMoreSleepTime(long millis) { + this.idletime += millis; + } + + public final void setStartupSleep(long milliseconds) { + // sets a sleep time before execution of the job-loop + startup = milliseconds; + } + + public final long setIdleSleep(long milliseconds) { + // sets a sleep time for pauses between two jobs + idlePause = milliseconds; + return milliseconds; + } + + public final long setBusySleep(long milliseconds) { + // sets a sleep time for pauses between two jobs + busyPause = milliseconds; + return milliseconds; + } + + public void setMemPreReqisite(long freeBytes) { + // sets minimum required amount of memory for the job execution + memprereq = freeBytes; + } + + public void setObeyIntermission(boolean obey) { + // defines if the thread should obey the intermission command + intermissionObedient = obey; + } + + public final long getIdleCycles() { + // returns the total number of cycles of job execution with idle-result + return this.idleCycles; + } + + public final long getBusyCycles() { + // returns the total number of cycles of job execution with busy-result + return this.busyCycles; + } + + public long getOutOfMemoryCycles() { + // returns the total number of cycles where + // a job execution was omitted because of memory shortage + return this.outofmemoryCycles; + } + + public final long getSleepTime() { + // returns the total time that this thread has slept so far + return this.idletime; + } + + public void intermission(long pause) { + if (pause == Long.MAX_VALUE) + this.intermission = Long.MAX_VALUE; + else + this.intermission = System.currentTimeMillis() + pause; + } + + + public void run() { + if (startup > 0) { + // do a startup-delay + logSystem("thread '" + this.getName() + "' deployed, delaying start-up."); + ratz(startup); + if (!(running)) return; + } + this.open(); + if (log != null) { + if (startup > 0) + logSystem("thread '" + this.getName() + "' delayed, " + ((this.busyPause < 0) ? "starting now job." : "starting now loop.")); + else + logSystem("thread '" + this.getName() + "' deployed, " + ((this.busyPause < 0) ? "starting job." : "starting loop.")); + } + long timestamp; + long memstamp0, memstamp1; + boolean isBusy; + //Runtime rt = Runtime.getRuntime(); + + while (running) { + if ((this.intermissionObedient) && (this.intermission > 0) && (this.intermission != Long.MAX_VALUE)) { + long itime = this.intermission - System.currentTimeMillis(); + if (itime > 0) { + if (itime > this.idlePause) itime = this.idlePause; + logSystem("thread '" + this.getName() + + "' breaks for intermission: " + (itime / 1000) + + " seconds"); + ratz(itime); + } + this.intermission = 0; + } + + if (this.intermission == Long.MAX_VALUE) { + // omit Job, paused + logSystem("thread '" + this.getName() + "' paused"); + timestamp = System.currentTimeMillis(); + ratz(this.idlePause); + idletime += System.currentTimeMillis() - timestamp; + //} else if ((memnow = serverMemory.available()) > memprereq) try { + } else if (serverMemory.request(memprereq, false)) try { + // do job + timestamp = System.currentTimeMillis(); + memstamp0 = serverMemory.used(); + isBusy = this.job(); + // do memory and busy/idle-count/time monitoring + if (isBusy) { + memstamp1 = serverMemory.used(); + if (memstamp1 >= memstamp0) { + // no GC in between. this is not shure but most probable + memuse += memstamp1 - memstamp0; + } else { + // GC was obviously in between. Add an average as simple heuristic + if (busyCycles > 0) memuse += memuse / busyCycles; + } + busytime += System.currentTimeMillis() - timestamp; + busyCycles++; + } else { + idleCycles++; + } + // interrupt loop if this is interrupted or supposed to be a one-time job + if ((!running) || (this.isInterrupted())) break; + if ((this.idlePause < 0) || (this.busyPause < 0)) break; // for one-time jobs + // process scheduled pause + timestamp = System.currentTimeMillis(); + ratz((isBusy) ? this.busyPause : this.idlePause); + idletime += System.currentTimeMillis() - timestamp; + } catch (Exception e) { + // handle exceptions: thread must not die on any unexpected exceptions + // if the exception is too bad it should call terminate() + this.jobExceptionHandler(e); + busyCycles++; + } else { + log.logWarning("Thread '" + this.getName() + "' runs short memory cycle. Free mem: " + + (serverMemory.available() / 1024) + " KB, needed: " + (memprereq / 1024) + " KB"); + // omit job, not enough memory + // process scheduled pause + timestamp = System.currentTimeMillis(); + // do a clean-up + this.freemem(); + // sleep a while + ratz(this.idlePause); + idletime += System.currentTimeMillis() - timestamp; + outofmemoryCycles++; + } + } + this.close(); + logSystem("thread '" + this.getName() + "' terminated."); + } + + private void ratz(long millis) { + try { + Thread.sleep(millis); + } catch (InterruptedException e) { + if (this.log != null) this.log.logConfig("thread '" + this.getName() + "' interrupted because of shutdown."); + } + } + + private void logSystem(String text) { + if (log == null) serverLog.logConfig("THREAD-CONTROL", text); + else log.logConfig(text); + } +} diff --git a/source/de/anomic/server/serverAbstractSwitch.java b/source/de/anomic/server/serverAbstractSwitch.java index ea5ad604f..ca516a7e1 100644 --- a/source/de/anomic/server/serverAbstractSwitch.java +++ b/source/de/anomic/server/serverAbstractSwitch.java @@ -66,7 +66,7 @@ public abstract class serverAbstractSwitch implements serverSwitch { private Map configProps; private Map configRemoved; private HashMap authorization; - private TreeMap workerThreads; + private TreeMap workerThreads; private TreeMap switchActions; protected HashMap> accessTracker; // mappings from requesting host to an ArrayList of serverTrack-entries private LinkedBlockingQueue cacheStack; @@ -151,7 +151,7 @@ public abstract class serverAbstractSwitch implements serverSwitch { accessTracker = new HashMap>(); // init thread control - workerThreads = new TreeMap(); + workerThreads = new TreeMap(); // init switch actions switchActions = new TreeMap(); @@ -379,7 +379,7 @@ public abstract class serverAbstractSwitch implements serverSwitch { String threadShortDescription, String threadLongDescription, String threadMonitorURL, - serverThread newThread, + serverBusyThread newThread, long startupDelay) { deployThread(threadName, threadShortDescription, threadLongDescription, threadMonitorURL, newThread, startupDelay, @@ -393,7 +393,7 @@ public abstract class serverAbstractSwitch implements serverSwitch { String threadShortDescription, String threadLongDescription, String threadMonitorURL, - serverThread newThread, + serverBusyThread newThread, long startupDelay, long initialIdleSleep, long initialBusySleep, @@ -429,12 +429,12 @@ public abstract class serverAbstractSwitch implements serverSwitch { if (workerThreads.containsKey(threadName)) newThread.start(); } - public serverThread getThread(String threadName) { - return (serverThread) workerThreads.get(threadName); + public serverBusyThread getThread(String threadName) { + return workerThreads.get(threadName); } public void setThreadPerformance(String threadName, long idleMillis, long busyMillis, long memprereqBytes) { - serverThread thread = (serverThread) workerThreads.get(threadName); + serverBusyThread thread = workerThreads.get(threadName); if (thread != null) { thread.setIdleSleep(idleMillis); thread.setBusySleep(busyMillis); @@ -452,7 +452,7 @@ public abstract class serverAbstractSwitch implements serverSwitch { public void intermissionAllThreads(long pause) { Iterator e = workerThreads.keySet().iterator(); while (e.hasNext()) { - ((serverThread) workerThreads.get(e.next())).intermission(pause); + workerThreads.get(e.next()).intermission(pause); } } diff --git a/source/de/anomic/server/serverAbstractThread.java b/source/de/anomic/server/serverAbstractThread.java index efaceba45..ede4f4540 100644 --- a/source/de/anomic/server/serverAbstractThread.java +++ b/source/de/anomic/server/serverAbstractThread.java @@ -1,10 +1,13 @@ -// serverAbstractThread.java -// ----------------------- -// (C) by Michael Peter Christen; mc@anomic.de -// first published on http://www.yacy.net -// Frankfurt, Germany, 2005 -// last major change: 14.03.2005 +// serverAbstractThread.java +// (C) 2005 by Michael Peter Christen; mc@yacy.net, Frankfurt a. M., Germany +// first published 14.03.2005 on http://yacy.net // +// $LastChangedDate: 2006-04-02 22:40:07 +0200 (So, 02 Apr 2006) $ +// $LastChangedRevision: 1986 $ +// $LastChangedBy: orbiter $ +// +// LICENSE +// // This program is free software; you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation; either version 2 of the License, or @@ -18,28 +21,9 @@ // You should have received a copy of the GNU General Public License // along with this program; if not, write to the Free Software // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -// -// Using this software in any meaning (reading, learning, copying, compiling, -// running) means that you agree that the Author(s) is (are) not responsible -// for cost, loss of data or any harm that may be caused directly or indirectly -// by usage of this softare or this documentation. The usage of this software -// is on your own risk. The installation and usage (starting/running) of this -// software may allow other people or application to access your computer and -// any attached devices and is highly dependent on the configuration of the -// software which must be done by the user of the software; the author(s) is -// (are) also not responsible for proper configuration and usage of the -// software, even if provoked by documentation provided together with -// the software. -// -// Any changes to this file according to the GPL as documented in the file -// gpl.txt aside this file in the shipment you received can be done to the -// lines that follows this copyright notice here, but changes must not be -// done inside the copyright notive above. A re-distribution must contain -// the intact and unchanged copyright notice. -// Contributions and changes to the program code must be marked as such. /* - an Implementation of a serverRunnable must only extend this class and impement + an Implementation of a serverThread must only extend this class and implement the methods: open(), job() and @@ -54,16 +38,13 @@ import de.anomic.server.logging.serverLog; public abstract class serverAbstractThread extends Thread implements serverThread { - private long startup = 0, intermission = 0, idlePause = 0, busyPause = 0, blockPause = 0; - private boolean running = true; - private serverLog log = null; - private long idletime = 0, busytime = 0, memprereq = 0, memuse = 0; - private String shortDescr = "", longDescr = ""; - private String monitorURL = null; - private long threadBlockTimestamp = System.currentTimeMillis(); - private long idleCycles = 0, busyCycles = 0, outofmemoryCycles = 0; - private Object syncObject = null; - private boolean intermissionObedient = true; + protected boolean running = true; + protected serverLog log = null; + protected long busytime = 0, memuse = 0; + private long blockPause = 0; + private String shortDescr = "", longDescr = ""; + private String monitorURL = null; + private long threadBlockTimestamp = System.currentTimeMillis(); protected final void announceThreadBlockApply() { // shall only be used, if a thread blocks for an important reason @@ -85,10 +66,6 @@ public abstract class serverAbstractThread extends Thread implements serverThrea this.busytime += millis; } - protected final void announceMoreSleepTime(long millis) { - this.idletime += millis; - } - public final void setDescription(String shortText, String longText, String monitorURL) { // sets a visible description string this.shortDescr = shortText; @@ -96,33 +73,6 @@ public abstract class serverAbstractThread extends Thread implements serverThrea this.monitorURL = monitorURL; } - public final void setStartupSleep(long milliseconds) { - // sets a sleep time before execution of the job-loop - startup = milliseconds; - } - - public final long setIdleSleep(long milliseconds) { - // sets a sleep time for pauses between two jobs - idlePause = milliseconds; - return milliseconds; - } - - public final long setBusySleep(long milliseconds) { - // sets a sleep time for pauses between two jobs - busyPause = milliseconds; - return milliseconds; - } - - public void setMemPreReqisite(long freeBytes) { - // sets minimum required amount of memory for the job execution - memprereq = freeBytes; - } - - public void setObeyIntermission(boolean obey) { - // defines if the thread should obey the intermission command - intermissionObedient = obey; - } - public final String getShortDescription() { return this.shortDescr; } @@ -135,32 +85,11 @@ public abstract class serverAbstractThread extends Thread implements serverThrea return this.monitorURL; } - public final long getIdleCycles() { - // returns the total number of cycles of job execution with idle-result - return this.idleCycles; - } - - public final long getBusyCycles() { - // returns the total number of cycles of job execution with busy-result - return this.busyCycles; - } - - public long getOutOfMemoryCycles() { - // returns the total number of cycles where - // a job execution was omitted because of memory shortage - return this.outofmemoryCycles; - } - public final long getBlockTime() { // returns the total time that this thread has been blocked so far return this.blockPause; } - public final long getSleepTime() { - // returns the total time that this thread has slept so far - return this.idletime; - } - public final long getExecTime() { // returns the total time that this thread has worked so far return this.busytime; @@ -181,13 +110,6 @@ public abstract class serverAbstractThread extends Thread implements serverThrea return !this.running || Thread.currentThread().isInterrupted(); } - public void intermission(long pause) { - if (pause == Long.MAX_VALUE) - this.intermission = Long.MAX_VALUE; - else - this.intermission = System.currentTimeMillis() + pause; - } - public void terminate(boolean waitFor) { // after calling this method, the thread shall terminate this.running = false; @@ -204,30 +126,11 @@ public abstract class serverAbstractThread extends Thread implements serverThrea // If we reach this point, the process is closed } - /* - private final void logError(String text) { - if (log == null) serverLog.logSevere("THREAD-CONTROL", text); - else log.logSevere(text); - } - */ - private final void logError(String text,Throwable thrown) { if (log == null) serverLog.logSevere("THREAD-CONTROL", text, thrown); else log.logSevere(text,thrown); } - private void logSystem(String text) { - if (log == null) serverLog.logConfig("THREAD-CONTROL", text); - else log.logConfig(text); - } - - /* - private void logSystem(String text, Throwable thrown) { - if (log == null) serverLog.logConfig("THREAD-CONTROL", text, thrown); - else log.logConfig(text,thrown); - } - */ - public void jobExceptionHandler(Exception e) { if (!(e instanceof ClosedByInterruptException)) { // default handler for job exceptions. shall be overridden for own handler @@ -235,126 +138,7 @@ public abstract class serverAbstractThread extends Thread implements serverThrea } } - public void run() { - if (startup > 0) { - // do a startup-delay - logSystem("thread '" + this.getName() + "' deployed, delaying start-up."); - ratz(startup); - if (!(running)) return; - } - this.open(); - if (log != null) { - if (startup > 0) - logSystem("thread '" + this.getName() + "' delayed, " + ((this.busyPause < 0) ? "starting now job." : "starting now loop.")); - else - logSystem("thread '" + this.getName() + "' deployed, " + ((this.busyPause < 0) ? "starting job." : "starting loop.")); - } - long timestamp; - long memstamp0, memstamp1; - boolean isBusy; - //Runtime rt = Runtime.getRuntime(); - - while (running) { - if ((this.intermissionObedient) && (this.intermission > 0) && (this.intermission != Long.MAX_VALUE)) { - long itime = this.intermission - System.currentTimeMillis(); - if (itime > 0) { - if (itime > this.idlePause) itime = this.idlePause; - logSystem("thread '" + this.getName() - + "' breaks for intermission: " + (itime / 1000) - + " seconds"); - ratz(itime); - } - this.intermission = 0; - } - - if (this.intermission == Long.MAX_VALUE) { - // omit Job, paused - logSystem("thread '" + this.getName() + "' paused"); - timestamp = System.currentTimeMillis(); - ratz(this.idlePause); - idletime += System.currentTimeMillis() - timestamp; - //} else if ((memnow = serverMemory.available()) > memprereq) try { - } else if (serverMemory.request(memprereq, false)) try { - // do job - timestamp = System.currentTimeMillis(); - memstamp0 = serverMemory.used(); - isBusy = this.job(); - // do memory and busy/idle-count/time monitoring - if (isBusy) { - memstamp1 = serverMemory.used(); - if (memstamp1 >= memstamp0) { - // no GC in between. this is not shure but most probable - memuse += memstamp1 - memstamp0; - } else { - // GC was obviously in between. Add an average as simple heuristic - if (busyCycles > 0) memuse += memuse / busyCycles; - } - busytime += System.currentTimeMillis() - timestamp; - busyCycles++; - } else { - idleCycles++; - } - // interrupt loop if this is interrupted or supposed to be a one-time job - if ((!running) || (this.isInterrupted())) break; - if ((this.idlePause < 0) || (this.busyPause < 0)) break; // for one-time jobs - // process scheduled pause - timestamp = System.currentTimeMillis(); - ratz((isBusy) ? this.busyPause : this.idlePause); - idletime += System.currentTimeMillis() - timestamp; - } catch (Exception e) { - // handle exceptions: thread must not die on any unexpected exceptions - // if the exception is too bad it should call terminate() - this.jobExceptionHandler(e); - busyCycles++; - } else { - log.logWarning("Thread '" + this.getName() + "' runs short memory cycle. Free mem: " + - (serverMemory.available() / 1024) + " KB, needed: " + (memprereq / 1024) + " KB"); - // omit job, not enough memory - // process scheduled pause - timestamp = System.currentTimeMillis(); - // do a clean-up - this.freemem(); - // sleep a while - ratz(this.idlePause); - idletime += System.currentTimeMillis() - timestamp; - outofmemoryCycles++; - } - } - this.close(); - logSystem("thread '" + this.getName() + "' terminated."); - } - - private void ratz(long millis) { - try { - if (this.syncObject != null) { - synchronized(this.syncObject) { - this.syncObject.wait(millis); - } - } else { - Thread.sleep(millis); - } - } catch (InterruptedException e) { - if (this.log != null) this.log.logConfig("thread '" + this.getName() + "' interrupted because of shutdown."); - } - } - public void open() {} // dummy definition; should be overriden public void close() {} // dummy definition; should be overriden - public void setSyncObject(Object sync) { - this.syncObject = sync; - } - - public Object getSyncObject() { - return this.syncObject; - } - - public void notifyThread() { - if (this.syncObject != null) { - synchronized(this.syncObject) { - if (this.log != null) this.log.logFine("thread '" + this.getName() + "' has received a notification from thread '" + Thread.currentThread().getName() + "'."); - this.syncObject.notifyAll(); - } - } - } } diff --git a/source/de/anomic/server/serverProcess.java b/source/de/anomic/server/serverBlockingThread.java similarity index 59% rename from source/de/anomic/server/serverProcess.java rename to source/de/anomic/server/serverBlockingThread.java index f99c00782..15615b952 100644 --- a/source/de/anomic/server/serverProcess.java +++ b/source/de/anomic/server/serverBlockingThread.java @@ -1,6 +1,6 @@ -// serverProcess.java +// serverBlockingThread.java // (C) 2008 by Michael Peter Christen; mc@yacy.net, Frankfurt a. M., Germany -// first published 27.02.2008 on http://yacy.net +// first published 27.03.2008 on http://yacy.net // // $LastChangedDate: 2006-04-02 22:40:07 +0200 (So, 02 Apr 2006) $ // $LastChangedRevision: 1986 $ @@ -24,8 +24,18 @@ package de.anomic.server; -public interface serverProcess { +import java.util.concurrent.BlockingQueue; + +public interface serverBlockingThread extends serverThread { + + public void setInputQueue(BlockingQueue queue); + public void setOutputQueue(BlockingQueue queue); + public BlockingQueue getInputQueue(); + public BlockingQueue getOutputQueue(); + + public O job(I next) throws Exception; + // performes one job procedure; this loopes until terminate() is called + // job returns true if it has done something + // it returns false if it is idle and does not expect to work on more for a longer time - public O process(I input); - } diff --git a/source/de/anomic/server/serverBusyThread.java b/source/de/anomic/server/serverBusyThread.java new file mode 100644 index 000000000..dc041c6ff --- /dev/null +++ b/source/de/anomic/server/serverBusyThread.java @@ -0,0 +1,70 @@ +// serverBusyThread.java +// (C) 2008 by Michael Peter Christen; mc@yacy.net, Frankfurt a. M., Germany +// first published 27.03.2008 on http://yacy.net +// +// $LastChangedDate: 2006-04-02 22:40:07 +0200 (So, 02 Apr 2006) $ +// $LastChangedRevision: 1986 $ +// $LastChangedBy: orbiter $ +// +// LICENSE +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation; either version 2 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +package de.anomic.server; + +public interface serverBusyThread extends serverThread { + + public void setStartupSleep(long milliseconds); + // sets a sleep time before execution of the job-loop + + public long setIdleSleep(long milliseconds); + // sets a sleep time for pauses between two jobs if the job returns false (idle) + + public long setBusySleep(long milliseconds); + // sets a sleep time for pauses between two jobs if the job returns true (busy) + + public void setMemPreReqisite(long freeBytes); + // sets minimum required amount of memory for the job execution + + public void setObeyIntermission(boolean obey); + // defines if the thread should obey the intermission command + + public long getIdleCycles(); + // returns the total number of cycles of job execution with idle-result + + public long getBusyCycles(); + // returns the total number of cycles of job execution with busy-result + + public long getOutOfMemoryCycles(); + // returns the total number of cycles where + // a job execution was omitted because of memory shortage + + public long getSleepTime(); + // returns the total time that this thread has slept so far + + public void intermission(long pause); + // the thread is forced to pause for a specific time + // if the thread is busy meanwhile, the pause is ommitted + + public boolean job() throws Exception; + // performes one job procedure; this loopes until terminate() is called + // job returns true if it has done something + // it returns false if it is idle and does not expect to work on more for a longer time + + public void freemem(); + // is called when an outOfMemoryCycle is performed + // this method should try to free some memory, so that the job can be executed + +} diff --git a/source/de/anomic/server/serverCore.java b/source/de/anomic/server/serverCore.java index b91cf6b5a..bdda76262 100644 --- a/source/de/anomic/server/serverCore.java +++ b/source/de/anomic/server/serverCore.java @@ -83,7 +83,7 @@ import de.anomic.urlRedirector.urlRedirectord; import de.anomic.yacy.yacyCore; import de.anomic.yacy.yacySeed; -public final class serverCore extends serverAbstractThread implements serverThread { +public final class serverCore extends serverAbstractBusyThread implements serverBusyThread { // special ASCII codes used for protocol handling public static final byte HT = 9; // Horizontal Tab diff --git a/source/de/anomic/server/serverInstantBlockingThread.java b/source/de/anomic/server/serverInstantBlockingThread.java new file mode 100644 index 000000000..48f8404fe --- /dev/null +++ b/source/de/anomic/server/serverInstantBlockingThread.java @@ -0,0 +1,122 @@ +// serverInstantBlockingThread.java +// (C) 2008 by Michael Peter Christen; mc@yacy.net, Frankfurt a. M., Germany +// first published 27.03.2008 on http://yacy.net +// +// $LastChangedDate: 2006-04-02 22:40:07 +0200 (So, 02 Apr 2006) $ +// $LastChangedRevision: 1986 $ +// $LastChangedBy: orbiter $ +// +// LICENSE +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation; either version 2 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +package de.anomic.server; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.TreeMap; +import java.util.concurrent.BlockingQueue; + +import de.anomic.server.logging.serverLog; + +public class serverInstantBlockingThread extends serverAbstractBlockingThread implements serverBlockingThread { + + private Method jobExecMethod, jobCountMethod; + private Object environment; + private Long handle; + + public static int instantThreadCounter = 0; + public static TreeMap jobs = new TreeMap(); + + public serverInstantBlockingThread(Object env, String jobExec, String jobCount, BlockingQueue input, BlockingQueue output) { + // jobExec is the name of a method of the object 'env' that executes the one-step-run + // jobCount is the name of a method that returns the size of the job + + // set the blocking queues for input and output + this.setInputQueue(input); + this.setOutputQueue(output); + + // define execution class + Class theClass = (env instanceof Class) ? (Class) env : env.getClass(); + try { + this.jobExecMethod = theClass.getMethod(jobExec, new Class[0]); + } catch (NoSuchMethodException e) { + throw new RuntimeException("serverInstantThread, wrong declaration of jobExec: " + e.getMessage()); + } + try { + if (jobCount == null) + this.jobCountMethod = null; + else + this.jobCountMethod = theClass.getMethod(jobCount, new Class[0]); + + } catch (NoSuchMethodException e) { + throw new RuntimeException("serverInstantThread, wrong declaration of jobCount: " + e.getMessage()); + } + this.environment = (env instanceof Class) ? null : env; + this.setName(theClass.getName() + "." + jobExec); + this.handle = new Long(System.currentTimeMillis() + this.getName().hashCode()); + } + + public int getJobCount() { + if (this.jobCountMethod == null) return Integer.MAX_VALUE; + try { + Object result = jobCountMethod.invoke(environment, new Object[0]); + if (result instanceof Integer) + return ((Integer) result).intValue(); + else + return -1; + } catch (IllegalAccessException e) { + return -1; + } catch (IllegalArgumentException e) { + return -1; + } catch (InvocationTargetException e) { + serverLog.logSevere("BLOCKINGTHREAD", "invocation serverInstantThread of thread '" + this.getName() + "': " + e.getMessage(), e); + return -1; + } + } + + @SuppressWarnings("unchecked") + public O job(I next) throws Exception { + instantThreadCounter++; + //System.out.println("started job " + this.handle + ": " + this.getName()); + synchronized(jobs) {jobs.put(this.handle, this.getName());} + O out = null; + try { + out = (O) jobExecMethod.invoke(environment, new Object[0]); + } catch (IllegalAccessException e) { + serverLog.logSevere("BLOCKINGTHREAD", "Internal Error in serverInstantThread.job: " + e.getMessage()); + serverLog.logSevere("BLOCKINGTHREAD", "shutting down thread '" + this.getName() + "'"); + this.terminate(false); + } catch (IllegalArgumentException e) { + serverLog.logSevere("BLOCKINGTHREAD", "Internal Error in serverInstantThread.job: " + e.getMessage()); + serverLog.logSevere("BLOCKINGTHREAD", "shutting down thread '" + this.getName() + "'"); + this.terminate(false); + } catch (InvocationTargetException e) { + String targetException = e.getTargetException().getMessage(); + e.getTargetException().printStackTrace(); + e.printStackTrace(); + if ((targetException != null) && ((targetException.indexOf("heap space") > 0) || (targetException.indexOf("NullPointerException") > 0))) e.getTargetException().printStackTrace(); + serverLog.logSevere("BLOCKINGTHREAD", "Runtime Error in serverInstantThread.job, thread '" + this.getName() + "': " + e.getMessage() + "; target exception: " + targetException, e.getTargetException()); + e.getTargetException().printStackTrace(); + } catch (OutOfMemoryError e) { + serverLog.logSevere("BLOCKINGTHREAD", "OutOfMemory Error in serverInstantThread.job, thread '" + this.getName() + "': " + e.getMessage()); + e.printStackTrace(); + } + instantThreadCounter--; + synchronized(jobs) {jobs.remove(this.handle);} + return out; + } + +} \ No newline at end of file diff --git a/source/de/anomic/server/serverInstantThread.java b/source/de/anomic/server/serverInstantBusyThread.java similarity index 79% rename from source/de/anomic/server/serverInstantThread.java rename to source/de/anomic/server/serverInstantBusyThread.java index 0ba1ef3e1..85c9ec672 100644 --- a/source/de/anomic/server/serverInstantThread.java +++ b/source/de/anomic/server/serverInstantBusyThread.java @@ -46,7 +46,7 @@ import java.util.TreeMap; import de.anomic.server.logging.serverLog; -public final class serverInstantThread extends serverAbstractThread implements serverThread { +public final class serverInstantBusyThread extends serverAbstractBusyThread implements serverBusyThread { private Method jobExecMethod, jobCountMethod, freememExecMethod; private Object environment; @@ -55,7 +55,7 @@ public final class serverInstantThread extends serverAbstractThread implements s public static int instantThreadCounter = 0; public static TreeMap jobs = new TreeMap(); - public serverInstantThread(Object env, String jobExec, String jobCount, String freemem) { + public serverInstantBusyThread(Object env, String jobExec, String jobCount, String freemem) { // jobExec is the name of a method of the object 'env' that executes the one-step-run // jobCount is the name of a method that returns the size of the job // freemem is the name of a method that tries to free memory and returns void @@ -101,7 +101,7 @@ public final class serverInstantThread extends serverAbstractThread implements s } catch (IllegalArgumentException e) { return -1; } catch (InvocationTargetException e) { - serverLog.logSevere("SERVER", "invocation serverInstantThread of thread '" + this.getName() + "': " + e.getMessage(), e); + serverLog.logSevere("BUSYTHREAD", "invocation serverInstantThread of thread '" + this.getName() + "': " + e.getMessage(), e); return -1; } } @@ -116,22 +116,22 @@ public final class serverInstantThread extends serverAbstractThread implements s if (result == null) jobHasDoneSomething = true; else if (result instanceof Boolean) jobHasDoneSomething = ((Boolean) result).booleanValue(); } catch (IllegalAccessException e) { - serverLog.logSevere("SERVER", "Internal Error in serverInstantThread.job: " + e.getMessage()); - serverLog.logSevere("SERVER", "shutting down thread '" + this.getName() + "'"); + serverLog.logSevere("BUSYTHREAD", "Internal Error in serverInstantThread.job: " + e.getMessage()); + serverLog.logSevere("BUSYTHREAD", "shutting down thread '" + this.getName() + "'"); this.terminate(false); } catch (IllegalArgumentException e) { - serverLog.logSevere("SERVER", "Internal Error in serverInstantThread.job: " + e.getMessage()); - serverLog.logSevere("SERVER", "shutting down thread '" + this.getName() + "'"); + serverLog.logSevere("BUSYTHREAD", "Internal Error in serverInstantThread.job: " + e.getMessage()); + serverLog.logSevere("BUSYTHREAD", "shutting down thread '" + this.getName() + "'"); this.terminate(false); } catch (InvocationTargetException e) { String targetException = e.getTargetException().getMessage(); e.getTargetException().printStackTrace(); e.printStackTrace(); if ((targetException != null) && ((targetException.indexOf("heap space") > 0) || (targetException.indexOf("NullPointerException") > 0))) e.getTargetException().printStackTrace(); - serverLog.logSevere("SERVER", "Runtime Error in serverInstantThread.job, thread '" + this.getName() + "': " + e.getMessage() + "; target exception: " + targetException, e.getTargetException()); + serverLog.logSevere("BUSYTHREAD", "Runtime Error in serverInstantThread.job, thread '" + this.getName() + "': " + e.getMessage() + "; target exception: " + targetException, e.getTargetException()); e.getTargetException().printStackTrace(); } catch (OutOfMemoryError e) { - serverLog.logSevere("SERVER", "OutOfMemory Error in serverInstantThread.job, thread '" + this.getName() + "': " + e.getMessage()); + serverLog.logSevere("BUSYTHREAD", "OutOfMemory Error in serverInstantThread.job, thread '" + this.getName() + "': " + e.getMessage()); e.printStackTrace(); freemem(); } @@ -145,27 +145,27 @@ public final class serverInstantThread extends serverAbstractThread implements s try { freememExecMethod.invoke(environment, new Object[0]); } catch (IllegalAccessException e) { - serverLog.logSevere("SERVER", "Internal Error in serverInstantThread.freemem: " + e.getMessage()); - serverLog.logSevere("SERVER", "shutting down thread '" + this.getName() + "'"); + serverLog.logSevere("BUSYTHREAD", "Internal Error in serverInstantThread.freemem: " + e.getMessage()); + serverLog.logSevere("BUSYTHREAD", "shutting down thread '" + this.getName() + "'"); this.terminate(false); } catch (IllegalArgumentException e) { - serverLog.logSevere("SERVER", "Internal Error in serverInstantThread.freemem: " + e.getMessage()); - serverLog.logSevere("SERVER", "shutting down thread '" + this.getName() + "'"); + serverLog.logSevere("BUSYTHREAD", "Internal Error in serverInstantThread.freemem: " + e.getMessage()); + serverLog.logSevere("BUSYTHREAD", "shutting down thread '" + this.getName() + "'"); this.terminate(false); } catch (InvocationTargetException e) { String targetException = e.getTargetException().getMessage(); if (targetException.indexOf("heap space") > 0) e.getTargetException().printStackTrace(); - serverLog.logSevere("SERVER", "Runtime Error in serverInstantThread.freemem, thread '" + this.getName() + "': " + e.getMessage() + "; target exception: " + targetException, e.getTargetException()); + serverLog.logSevere("BUSYTHREAD", "Runtime Error in serverInstantThread.freemem, thread '" + this.getName() + "': " + e.getMessage() + "; target exception: " + targetException, e.getTargetException()); e.getTargetException().printStackTrace(); } catch (OutOfMemoryError e) { - serverLog.logSevere("SERVER", "OutOfMemory Error in serverInstantThread.freemem, thread '" + this.getName() + "': " + e.getMessage()); + serverLog.logSevere("BUSYTHREAD", "OutOfMemory Error in serverInstantThread.freemem, thread '" + this.getName() + "': " + e.getMessage()); e.printStackTrace(); } } - public static serverThread oneTimeJob(Object env, String jobExec, serverLog log, long startupDelay) { + public static serverBusyThread oneTimeJob(Object env, String jobExec, serverLog log, long startupDelay) { // start the job and execute it once as background process - serverThread thread = new serverInstantThread(env, jobExec, null, null); + serverBusyThread thread = new serverInstantBusyThread(env, jobExec, null, null); thread.setStartupSleep(startupDelay); thread.setIdleSleep(-1); thread.setBusySleep(-1); diff --git a/source/de/anomic/server/serverProcessor.java b/source/de/anomic/server/serverProcessor.java index 75efc9b84..b4ebf1bba 100644 --- a/source/de/anomic/server/serverProcessor.java +++ b/source/de/anomic/server/serverProcessor.java @@ -24,29 +24,9 @@ package de.anomic.server; -import java.util.concurrent.LinkedBlockingQueue; - - public class serverProcessor { public static final int availableCPU = Runtime.getRuntime().availableProcessors(); public static int useCPU = availableCPU; - - public static class queue { - String nickname; - int priority; - serverProcess implementation; - LinkedBlockingQueue inputQueue; - LinkedBlockingQueue outputQueue; - - public queue(String nickname, int priority, serverProcess implementation) { - this.nickname = nickname; - this.priority = priority; - this.implementation = implementation; - this.inputQueue = new LinkedBlockingQueue(); - this.outputQueue = new LinkedBlockingQueue(); - } - } - - + } diff --git a/source/de/anomic/server/serverSwitch.java b/source/de/anomic/server/serverSwitch.java index c58a09f5e..43b774406 100644 --- a/source/de/anomic/server/serverSwitch.java +++ b/source/de/anomic/server/serverSwitch.java @@ -84,11 +84,11 @@ public interface serverSwitch { String threadShortDescription, String threadLongDescription, String threadMonitorURL, - serverThread newThread, + serverBusyThread newThread, long startupDelay, long initialIdleSleep, long initialBusySleep, long initialMemoryPreRequisite); - public serverThread getThread(String threadName); + public serverBusyThread getThread(String threadName); public void setThreadPerformance(String threadName, long idleMillis, long busyMillis, long memprereq); public void terminateThread(String threadName, boolean waitFor); public void intermissionAllThreads(long pause); diff --git a/source/de/anomic/server/serverThread.java b/source/de/anomic/server/serverThread.java index 228a4ec38..b69199bef 100644 --- a/source/de/anomic/server/serverThread.java +++ b/source/de/anomic/server/serverThread.java @@ -1,10 +1,13 @@ -// serverThread.java -// ----------------------- -// (C) by Michael Peter Christen; mc@anomic.de -// first published on http://www.yacy.net -// Frankfurt, Germany, 2005 -// last major change: 14.03.2005 +// serverThread.java +// (C) 2005 by Michael Peter Christen; mc@yacy.net, Frankfurt a. M., Germany +// first published 13.03.2005 on http://yacy.net // +// $LastChangedDate: 2006-04-02 22:40:07 +0200 (So, 02 Apr 2006) $ +// $LastChangedRevision: 1986 $ +// $LastChangedBy: orbiter $ +// +// LICENSE +// // This program is free software; you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation; either version 2 of the License, or @@ -18,25 +21,6 @@ // You should have received a copy of the GNU General Public License // along with this program; if not, write to the Free Software // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -// -// Using this software in any meaning (reading, learning, copying, compiling, -// running) means that you agree that the Author(s) is (are) not responsible -// for cost, loss of data or any harm that may be caused directly or indirectly -// by usage of this softare or this documentation. The usage of this software -// is on your own risk. The installation and usage (starting/running) of this -// software may allow other people or application to access your computer and -// any attached devices and is highly dependent on the configuration of the -// software which must be done by the user of the software; the author(s) is -// (are) also not responsible for proper configuration and usage of the -// software, even if provoked by documentation provided together with -// the software. -// -// Any changes to this file according to the GPL as documented in the file -// gpl.txt aside this file in the shipment you received can be done to the -// lines that follows this copyright notice here, but changes must not be -// done inside the copyright notive above. A re-distribution must contain -// the intact and unchanged copyright notice. -// Contributions and changes to the program code must be marked as such. package de.anomic.server; @@ -56,21 +40,6 @@ public interface serverThread { public void setDescription(String shortText, String longText, String monitorURL); // sets a visible description string - public void setStartupSleep(long milliseconds); - // sets a sleep time before execution of the job-loop - - public long setIdleSleep(long milliseconds); - // sets a sleep time for pauses between two jobs if the job returns false (idle) - - public long setBusySleep(long milliseconds); - // sets a sleep time for pauses between two jobs if the job returns true (busy) - - public void setMemPreReqisite(long freeBytes); - // sets minimum required amount of memory for the job execution - - public void setObeyIntermission(boolean obey); - // defines if the thread should obey the intermission command - public String getShortDescription(); // returns short description string for online display @@ -80,22 +49,9 @@ public interface serverThread { public String getMonitorURL(); // returns an URL that can be used to monitor the thread and it's queue - public long getIdleCycles(); - // returns the total number of cycles of job execution with idle-result - - public long getBusyCycles(); - // returns the total number of cycles of job execution with busy-result - - public long getOutOfMemoryCycles(); - // returns the total number of cycles where - // a job execution was omitted because of memory shortage - public long getBlockTime(); // returns the total time that this thread has been blocked so far - public long getSleepTime(); - // returns the total time that this thread has slept so far - public long getExecTime(); // returns the total time that this thread has worked so far @@ -108,10 +64,6 @@ public interface serverThread { public void jobExceptionHandler(Exception e); // handles any action necessary during job execution - public void intermission(long pause); - // the thread is forced to pause for a specific time - // if the thread is busy meanwhile, the pause is ommitted - public boolean shutdownInProgress(); public void terminate(boolean waitFor); @@ -124,15 +76,6 @@ public interface serverThread { public void open(); // this is called right before the job queue is started - public boolean job() throws Exception; - // performes one job procedure; this loopes until terminate() is called - // job returns true if it has done something - // it returns false if it is idle and does not expect to work on more for a longer time - - public void freemem(); - // is called when an outOfMemoryCycle is performed - // this method should try to free some memory, so that the job can be executed - public int getJobCount(); // returns how many jobs are in the queue // can be used to calculate a busy-state @@ -140,10 +83,4 @@ public interface serverThread { public void close(); // jobs that need to be done after termination // terminate must be called before - - - public void setSyncObject(Object sync); - public Object getSyncObject(); - public void notifyThread(); - }