From 68e8039fd1f04171eefd9eb0c6199a5d48ad844f Mon Sep 17 00:00:00 2001 From: Michael Peter Christen Date: Fri, 14 Nov 2014 10:02:50 +0100 Subject: [PATCH] added high-precision scheduler for API processes. This allows also to make the execution in dependency of available RAM or CPU load. The default value for CPU load is 4.0 and the check runs once a minute. --- defaults/yacy.init | 4 + htroot/Table_API_p.java | 4 +- source/net/yacy/search/Switchboard.java | 188 ++++++++++-------- .../net/yacy/search/SwitchboardConstants.java | 11 + 4 files changed, 120 insertions(+), 87 deletions(-) diff --git a/defaults/yacy.init b/defaults/yacy.init index e436b7465..37911223e 100644 --- a/defaults/yacy.init +++ b/defaults/yacy.init @@ -638,6 +638,10 @@ collection=user 80_searchresult_busysleep=200 80_searchresult_memprereq=0 80_searchresult_loadprereq=2.0 +85_scheduler_idlesleep=60000 +85_scheduler_busysleep=60000 +85_scheduler_memprereq=1048576 +85_scheduler_loadprereq=4.0 90_cleanup_idlesleep=300000 90_cleanup_busysleep=300000 90_cleanup_memprereq=0 diff --git a/htroot/Table_API_p.java b/htroot/Table_API_p.java index badc9787e..e86601925 100644 --- a/htroot/Table_API_p.java +++ b/htroot/Table_API_p.java @@ -126,8 +126,8 @@ public class Table_API_p { if (action.equals("selminutes") && time > 0 && time < 10) { row.put(WorkTables.TABLE_API_COL_APICALL_SCHEDULE_TIME, 10); } - if (action.equals("selminutes") && time > 50) { - row.put(WorkTables.TABLE_API_COL_APICALL_SCHEDULE_TIME, 50); + if (action.equals("selminutes") && time > 59) { + row.put(WorkTables.TABLE_API_COL_APICALL_SCHEDULE_TIME, 59); } if (action.equals("selhours") && time > 23) { row.put(WorkTables.TABLE_API_COL_APICALL_SCHEDULE_TIME, 23); diff --git a/source/net/yacy/search/Switchboard.java b/source/net/yacy/search/Switchboard.java index a634ab755..f687a1940 100644 --- a/source/net/yacy/search/Switchboard.java +++ b/source/net/yacy/search/Switchboard.java @@ -980,18 +980,31 @@ public final class Switchboard extends serverSwitch { MemoryControl.gc(10000, "plasmaSwitchboard, help for profiler"); // help for profiler - thq deployThread( - SwitchboardConstants.CLEANUP, - "Cleanup", - "simple cleaning process for monitoring information", - null, - new InstantBusyThread( - this, - SwitchboardConstants.CLEANUP_METHOD_START, - SwitchboardConstants.CLEANUP_METHOD_JOBCOUNT, - SwitchboardConstants.CLEANUP_METHOD_FREEMEM, - 30000, - 10000), - 60000); // all 10 minutes, wait 1 minute until first run + SwitchboardConstants.CLEANUP, + "Cleanup", + "cleaning process", + null, + new InstantBusyThread( + this, + SwitchboardConstants.CLEANUP_METHOD_START, + SwitchboardConstants.CLEANUP_METHOD_JOBCOUNT, + SwitchboardConstants.CLEANUP_METHOD_FREEMEM, + 30000, + 10000), + 60000); // all 10 minutes, wait 1 minute until first run + deployThread( + SwitchboardConstants.SCHEDULER, + "Scheduler", + "starts scheduled processes from the API Processing table", + null, + new InstantBusyThread( + this, + SwitchboardConstants.SCHEDULER_METHOD_START, + SwitchboardConstants.SCHEDULER_METHOD_JOBCOUNT, + SwitchboardConstants.SCHEDULER_METHOD_FREEMEM, + 30000, + 10000), + 60000); // all 10 minutes, wait 1 minute until first run deployThread( SwitchboardConstants.SURROGATES, "Surrogates", @@ -1149,7 +1162,7 @@ public final class Switchboard extends serverSwitch { public void run() { Thread.currentThread().setName("Switchboard.setHttpServer"); try {Thread.sleep(10000);} catch (final InterruptedException e) {} // needs httpd up - execAPIActions(); // trigger startup actions + schedulerJob(); // trigger startup actions } }.start(); } @@ -2007,9 +2020,85 @@ public final class Switchboard extends serverSwitch { CircleTool.clearcache(); NetworkGraph.clearcache(); } + + public int schedulerJobSize() { + try { + return this.tables.size(WorkTables.TABLE_API_NAME); + } catch (IOException e) { + ConcurrentLog.logException(e); + return 0; + } + } + + public boolean schedulerJob() { + + // execute scheduled API actions + Tables.Row row; + final Collection pks = new LinkedHashSet(); + final Date now = new Date(); + try { + final Iterator plainIterator = this.tables.iterator(WorkTables.TABLE_API_NAME); + final Iterator mapIterator = Tables.orderBy(plainIterator, -1, WorkTables.TABLE_API_COL_DATE_RECORDING).iterator(); + while (mapIterator.hasNext()) { + row = mapIterator.next(); + if (row == null) continue; + + // select api calls according to scheduler settings + final Date date_next_exec = row.get(WorkTables.TABLE_API_COL_DATE_NEXT_EXEC, (Date) null); + if (date_next_exec != null && now.after(date_next_exec)) pks.add(UTF8.String(row.getPK())); + + // select api calls according to event settings + final String kind = row.get(WorkTables.TABLE_API_COL_APICALL_EVENT_KIND, "off"); + if (!"off".equals(kind)) { + String action = row.get(WorkTables.TABLE_API_COL_APICALL_EVENT_ACTION, "startup"); + if ("startup".equals(action)) { + if (startupAction) { + pks.add(UTF8.String(row.getPK())); + if ("once".equals(kind)) { + row.put(WorkTables.TABLE_API_COL_APICALL_EVENT_KIND, "off"); + sb.tables.update(WorkTables.TABLE_API_NAME, row); + } + } + } else try { + SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmm"); + long d = dateFormat.parse(dateFormat.format(new Date()).substring(0, 8) + action).getTime(); + long cycle = getThread(SwitchboardConstants.CLEANUP).getBusySleep(); + if (d < System.currentTimeMillis() && System.currentTimeMillis() - d < cycle) { + pks.add(UTF8.String(row.getPK())); + if ("once".equals(kind)) { + row.put(WorkTables.TABLE_API_COL_APICALL_EVENT_KIND, "off"); + row.put(WorkTables.TABLE_API_COL_DATE_NEXT_EXEC, ""); + sb.tables.update(WorkTables.TABLE_API_NAME, row); + } + } + } catch (final ParseException e) {} + } + } + } catch (final IOException e) { + ConcurrentLog.logException(e); + } + for (final String pk : pks) { + try { + row = this.tables.select(WorkTables.TABLE_API_NAME, UTF8.getBytes(pk)); + WorkTables.calculateAPIScheduler(row, true); // calculate next update time + this.tables.update(WorkTables.TABLE_API_NAME, row); + } catch (final Throwable e ) { + ConcurrentLog.logException(e); + continue; + } + } + startupAction = false; + + // execute api calls + final Map callResult = this.tables.execAPICalls("localhost", getLocalPort("port", 8090), pks, getConfig(SwitchboardConstants.ADMIN_ACCOUNT_USER_NAME, "admin"), getConfig(SwitchboardConstants.ADMIN_ACCOUNT_B64MD5, "")); + for ( final Map.Entry call : callResult.entrySet() ) { + this.log.info("Scheduler executed api call, response " + call.getValue() + ": " + call.getKey()); + } + return pks.size() > 0; + } public int cleanupJobSize() { - int c = 1; // "es gibt immer was zu tun" + int c = 1; // run this always! if ( (this.crawlQueues.delegatedURL.size() > 1000) ) { c++; } @@ -2359,10 +2448,6 @@ public final class Switchboard extends serverSwitch { this.lastStats = System.currentTimeMillis(); } catch (IOException e) {} - // execute api actions; this must be done after postprocessing because - // these actions may also influence the search index/ call optimize steps - execAPIActions(); - // show deadlocks if there are any in the log if (Memory.deadlocks() > 0) Memory.logDeadlocks(); @@ -2373,73 +2458,6 @@ public final class Switchboard extends serverSwitch { } } - private void execAPIActions() { - - // execute scheduled API actions - Tables.Row row; - final Collection pks = new LinkedHashSet(); - final Date now = new Date(); - - try { - final Iterator plainIterator = this.tables.iterator(WorkTables.TABLE_API_NAME); - final Iterator mapIterator = Tables.orderBy(plainIterator, -1, WorkTables.TABLE_API_COL_DATE_RECORDING).iterator(); - while (mapIterator.hasNext()) { - row = mapIterator.next(); - if (row == null) continue; - - // select api calls according to scheduler settings - final Date date_next_exec = row.get(WorkTables.TABLE_API_COL_DATE_NEXT_EXEC, (Date) null); - if (date_next_exec != null && now.after(date_next_exec)) pks.add(UTF8.String(row.getPK())); - - // select api calls according to event settings - final String kind = row.get(WorkTables.TABLE_API_COL_APICALL_EVENT_KIND, "off"); - if (!"off".equals(kind)) { - String action = row.get(WorkTables.TABLE_API_COL_APICALL_EVENT_ACTION, "startup"); - if ("startup".equals(action)) { - if (startupAction) { - pks.add(UTF8.String(row.getPK())); - if ("once".equals(kind)) { - row.put(WorkTables.TABLE_API_COL_APICALL_EVENT_KIND, "off"); - sb.tables.update(WorkTables.TABLE_API_NAME, row); - } - } - } else try { - SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmm"); - long d = dateFormat.parse(dateFormat.format(new Date()).substring(0, 8) + action).getTime(); - long cycle = getThread(SwitchboardConstants.CLEANUP).getBusySleep(); - if (d < System.currentTimeMillis() && System.currentTimeMillis() - d < cycle) { - pks.add(UTF8.String(row.getPK())); - if ("once".equals(kind)) { - row.put(WorkTables.TABLE_API_COL_APICALL_EVENT_KIND, "off"); - row.put(WorkTables.TABLE_API_COL_DATE_NEXT_EXEC, ""); - sb.tables.update(WorkTables.TABLE_API_NAME, row); - } - } - } catch (final ParseException e) {} - } - } - } catch (final IOException e) { - ConcurrentLog.logException(e); - } - for (final String pk : pks) { - try { - row = this.tables.select(WorkTables.TABLE_API_NAME, UTF8.getBytes(pk)); - WorkTables.calculateAPIScheduler(row, true); // calculate next update time - this.tables.update(WorkTables.TABLE_API_NAME, row); - } catch (final Throwable e ) { - ConcurrentLog.logException(e); - continue; - } - } - startupAction = false; - - // execute api calls - final Map callResult = this.tables.execAPICalls("localhost", getLocalPort("port", 8090), pks, getConfig(SwitchboardConstants.ADMIN_ACCOUNT_USER_NAME, "admin"), getConfig(SwitchboardConstants.ADMIN_ACCOUNT_B64MD5, "")); - for ( final Map.Entry call : callResult.entrySet() ) { - this.log.info("Scheduler executed api call, response " + call.getValue() + ": " + call.getKey()); - } - } - /** * With this function the crawling process can be paused * diff --git a/source/net/yacy/search/SwitchboardConstants.java b/source/net/yacy/search/SwitchboardConstants.java index fbf162028..88c4873fa 100644 --- a/source/net/yacy/search/SwitchboardConstants.java +++ b/source/net/yacy/search/SwitchboardConstants.java @@ -156,6 +156,17 @@ public final class SwitchboardConstants { public static final String SEARCHRESULT_METHOD_START = "searchresultProcess"; public static final String SEARCHRESULT_METHOD_JOBCOUNT = "searchresultQueueSize"; public static final String SEARCHRESULT_METHOD_FREEMEM = "searchresultFreeMem"; + // 85_scheduler + /** + *

public static final String SCHEDULER = "85_scheduler"

+ *

The cleanup thread which is responsible for the start of scheduled processes from the API table

+ */ + public static final String SCHEDULER = "85_scheduler"; + public static final String SCHEDULER_METHOD_START = "schedulerJob"; + public static final String SCHEDULER_METHOD_JOBCOUNT = "schedulerJobSize"; + public static final String SCHEDULER_METHOD_FREEMEM = null; + public static final String SCHEDULER_IDLESLEEP = "85_scheduler_idlesleep"; + public static final String SCHEDULER_BUSYSLEEP = "85_scheduler_busysleep"; // 90_cleanup /** *

public static final String CLEANUP = "90_cleanup"