From 49be60a7c81d60265a70fc0f26599a8ab8c883f5 Mon Sep 17 00:00:00 2001 From: Michael Peter Christen Date: Tue, 10 Jan 2012 03:03:12 +0100 Subject: [PATCH] WorkflowProcess is forced to make small pauses if shortMemoryStatus is reached. --- source/net/yacy/kelondro/table/Table.java | 12 ++++++ .../workflow/AbstractBlockingThread.java | 43 +++++++++++++------ .../kelondro/workflow/AbstractThread.java | 28 ++++++++++-- .../kelondro/workflow/WorkflowProcessor.java | 36 ++++++++++++---- 4 files changed, 94 insertions(+), 25 deletions(-) diff --git a/source/net/yacy/kelondro/table/Table.java b/source/net/yacy/kelondro/table/Table.java index 4bbe283fc..4de6fc9ba 100644 --- a/source/net/yacy/kelondro/table/Table.java +++ b/source/net/yacy/kelondro/table/Table.java @@ -262,6 +262,7 @@ public class Table implements Index, Iterable { } } + @Override public long mem() { return this.index.mem() + ((this.table == null) ? 0 : this.table.mem()); } @@ -271,10 +272,12 @@ public class Table implements Index, Iterable { return MemoryControl.shortStatus() || MemoryControl.available() < this.minmemremaining; } + @Override public byte[] smallestKey() { return this.index.smallestKey(); } + @Override public byte[] largestKey() { return this.index.largestKey(); } @@ -348,6 +351,7 @@ public class Table implements Index, Iterable { } } + @Override public synchronized void addUnique(final Entry row) throws IOException, RowSpaceExceededException { assert this.file.size() == this.index.size() : "file.size() = " + this.file.size() + ", index.size() = " + this.index.size(); assert this.table == null || this.table.size() == this.index.size() : "table.size() = " + this.table.size() + ", index.size() = " + this.index.size(); @@ -395,6 +399,7 @@ public class Table implements Index, Iterable { * and * @throws */ + @Override public synchronized List removeDoubles() throws IOException, RowSpaceExceededException { assert this.file.size() == this.index.size() : "file.size() = " + this.file.size() + ", index.size() = " + this.index.size(); final List report = new ArrayList(); @@ -447,6 +452,7 @@ public class Table implements Index, Iterable { return report; } + @Override public void close() { if (this.file != null) this.file.close(); this.file = null; @@ -461,10 +467,12 @@ public class Table implements Index, Iterable { if (this.file != null) close(); } + @Override public String filename() { return this.file.filename().toString(); } + @Override public Entry get(final byte[] key, final boolean _forcecopy) throws IOException { if (this.file == null || this.index == null) return null; Entry e = get0(key); @@ -506,6 +514,7 @@ public class Table implements Index, Iterable { return this.rowdef.newEntry(b); } + @Override public Map get(final Collection keys, final boolean forcecopy) throws IOException, InterruptedException { final Map map = new TreeMap(row().objectOrder); Row.Entry entry; @@ -516,15 +525,18 @@ public class Table implements Index, Iterable { return map; } + @Override public boolean has(final byte[] key) { if (this.index == null) return false; return this.index.has(key); } + @Override public synchronized CloneableIterator keys(final boolean up, final byte[] firstKey) throws IOException { return this.index.keys(up, firstKey); } + @Override public Entry replace(final Entry row) throws IOException, RowSpaceExceededException { assert row != null; if (this.file == null || row == null) return null; diff --git a/source/net/yacy/kelondro/workflow/AbstractBlockingThread.java b/source/net/yacy/kelondro/workflow/AbstractBlockingThread.java index 105fd16a2..e207f3237 100644 --- a/source/net/yacy/kelondro/workflow/AbstractBlockingThread.java +++ b/source/net/yacy/kelondro/workflow/AbstractBlockingThread.java @@ -7,7 +7,7 @@ // $LastChangedBy$ // // LICENSE -// +// // This program is free software; you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation; either version 2 of the License, or @@ -31,10 +31,12 @@ public abstract class AbstractBlockingThread extends Abst private WorkflowProcessor manager = null; private final static Log log = new Log("BlockingThread"); - + + @Override public void setManager(final WorkflowProcessor manager) { this.manager = manager; } + @Override public WorkflowProcessor getManager() { return this.manager; } @@ -48,9 +50,19 @@ public abstract class AbstractBlockingThread extends Abst long timestamp; long memstamp0, memstamp1; long busyCycles = 0; - - while (running) { + + while (this.running) { try { + // check memory status + if (!shutdownInProgress() && MemoryControl.shortStatus()) { + // try to idle a bit to get out of that problem somehow without making it worse + for (int i = 0; i < 5; i++) { + try {Thread.sleep(200);} catch (final InterruptedException e) {break;} + if (shutdownInProgress() || !MemoryControl.shortStatus()) { + break; + } + } + } // do job timestamp = System.currentTimeMillis(); memstamp0 = MemoryControl.used(); @@ -64,17 +76,21 @@ public abstract class AbstractBlockingThread extends Abst break; } final J out = this.job(in); - if (out != null) this.manager.passOn(out); + if (out != null) { + this.manager.passOn(out); + } // do memory and busy/idle-count/time monitoring memstamp1 = MemoryControl.used(); if (memstamp1 >= memstamp0) { - // no GC in between. this is not shure but most probable - memuse += memstamp1 - memstamp0; + // no GC in between. this is not sure but most probable + this.memuse += memstamp1 - memstamp0; } else { // GC was obviously in between. Add an average as simple heuristic - if (busyCycles > 0) memuse += memuse / busyCycles; + if (busyCycles > 0) { + this.memuse += this.memuse / busyCycles; + } } - busytime += System.currentTimeMillis() - timestamp; + this.busytime += System.currentTimeMillis() - timestamp; } catch (final InterruptedException e) { // don't ignore this: shut down this.running = false; @@ -91,9 +107,12 @@ public abstract class AbstractBlockingThread extends Abst this.close(); logSystem("thread '" + this.getName() + "' terminated."); } - + private void logSystem(final String text) { - if (log == null) Log.logConfig("THREAD-CONTROL", text); - else log.logConfig(text); + if (log == null) { + Log.logConfig("THREAD-CONTROL", text); + } else { + log.logConfig(text); + } } } diff --git a/source/net/yacy/kelondro/workflow/AbstractThread.java b/source/net/yacy/kelondro/workflow/AbstractThread.java index bfcdb0963..53281091d 100644 --- a/source/net/yacy/kelondro/workflow/AbstractThread.java +++ b/source/net/yacy/kelondro/workflow/AbstractThread.java @@ -40,13 +40,18 @@ import net.yacy.kelondro.logging.Log; public abstract class AbstractThread extends Thread implements WorkflowThread { private static Log log = new Log("WorkflowThread"); - protected boolean running = true; + protected boolean running = true, announcedShutdown = false; protected long busytime = 0, memuse = 0; private long blockPause = 0; private String shortDescr = "", longDescr = ""; private String monitorURL = null; private long threadBlockTimestamp = System.currentTimeMillis(); + + protected final boolean announceShutdown() { + return this.announcedShutdown; + } + protected final void announceThreadBlockApply() { // shall only be used, if a thread blocks for an important reason // like a socket connect and must renew the timestamp to correct @@ -67,6 +72,7 @@ public abstract class AbstractThread extends Thread implements WorkflowThread { this.busytime += millis; } + @Override public final void setDescription(final String shortText, final String longText, final String monitorURL) { // sets a visible description string this.shortDescr = shortText; @@ -74,37 +80,45 @@ public abstract class AbstractThread extends Thread implements WorkflowThread { this.monitorURL = monitorURL; } + @Override public final String getShortDescription() { return this.shortDescr; } + @Override public final String getLongDescription() { return this.longDescr; } + @Override public String getMonitorURL() { return this.monitorURL; } + @Override public final long getBlockTime() { // returns the total time that this thread has been blocked so far return this.blockPause; } + @Override public final long getExecTime() { // returns the total time that this thread has worked so far return this.busytime; } + @Override public long getMemoryUse() { // returns the sum of all memory usage differences before and after one busy job return this.memuse; } + @Override public boolean shutdownInProgress() { - return !this.running || Thread.currentThread().isInterrupted(); + return !this.running || this.announcedShutdown || Thread.currentThread().isInterrupted(); } + @Override public void terminate(final boolean waitFor) { // after calling this method, the thread shall terminate this.running = false; @@ -121,10 +135,14 @@ public abstract class AbstractThread extends Thread implements WorkflowThread { } private final void logError(final String text,final Throwable thrown) { - if (log == null) Log.logSevere("THREAD-CONTROL", text, thrown); - else log.logSevere(text,thrown); + if (log == null) { + Log.logSevere("THREAD-CONTROL", text, thrown); + } else { + log.logSevere(text,thrown); + } } + @Override public void jobExceptionHandler(final Exception e) { if (!(e instanceof ClosedByInterruptException)) { // default handler for job exceptions. shall be overridden for own handler @@ -132,7 +150,9 @@ public abstract class AbstractThread extends Thread implements WorkflowThread { } } + @Override public void open() {}; // dummy definition; should be overriden + @Override public void close() {}; // dummy definition; should be overriden } diff --git a/source/net/yacy/kelondro/workflow/WorkflowProcessor.java b/source/net/yacy/kelondro/workflow/WorkflowProcessor.java index ba4805dbe..ff8118b71 100644 --- a/source/net/yacy/kelondro/workflow/WorkflowProcessor.java +++ b/source/net/yacy/kelondro/workflow/WorkflowProcessor.java @@ -97,7 +97,9 @@ public class WorkflowProcessor { public J take() throws InterruptedException { // read from the input queue - if (this.input == null) return null; + if (this.input == null) { + return null; + } final long t = System.currentTimeMillis(); final J j = this.input.take(); this.blockTime += System.currentTimeMillis() - t; @@ -107,24 +109,34 @@ public class WorkflowProcessor { public void passOn(final J next) throws InterruptedException { // don't mix this method up with enQueue()! // this method enqueues into the _next_ queue, not this queue! - if (this.output == null) return; + if (this.output == null) { + return; + } final long t = System.currentTimeMillis(); this.output.enQueue(next); this.passOnTime += System.currentTimeMillis() - t; } public void clear() { - if (this.input != null) this.input.clear(); + if (this.input != null) { + this.input.clear(); + } } public synchronized void relaxCapacity() { - if (this.input.isEmpty()) return; - if (this.input.remainingCapacity() > 1000) return; + if (this.input.isEmpty()) { + return; + } + if (this.input.remainingCapacity() > 1000) { + return; + } final BlockingQueue i = new LinkedBlockingQueue(); J e; while (!this.input.isEmpty()) { e = this.input.poll(); - if (e == null) break; + if (e == null) { + break; + } i.add(e); } this.input = i; @@ -138,7 +150,9 @@ public class WorkflowProcessor { //Log.logWarning("PROCESSOR", "executing job " + environment.getClass().getName() + "." + methodName + " serialized"); try { final J out = (J) InstantBlockingThread.execMethod(this.environment, this.methodName).invoke(this.environment, new Object[]{in}); - if (out != null && this.output != null) this.output.enQueue(out); + if (out != null && this.output != null) { + this.output.enQueue(out); + } } catch (final IllegalArgumentException e) { Log.logException(e); } catch (final IllegalAccessException e) { @@ -161,8 +175,12 @@ public class WorkflowProcessor { @SuppressWarnings("unchecked") public void announceShutdown() { - if (this.executor == null) return; - if (this.executor.isShutdown()) return; + if (this.executor == null) { + return; + } + if (this.executor.isShutdown()) { + return; + } // before we put pills into the queue, make sure that they will take them relaxCapacity(); // put poison pills into the queue