added high-precision scheduler for API processes. This allows also to

make the execution in dependency of available RAM or CPU load. The
default value for CPU load is 4.0 and the check runs once a minute.
pull/1/head
Michael Peter Christen 10 years ago
parent 8aee7f940e
commit 68e8039fd1

@ -638,6 +638,10 @@ collection=user
80_searchresult_busysleep=200 80_searchresult_busysleep=200
80_searchresult_memprereq=0 80_searchresult_memprereq=0
80_searchresult_loadprereq=2.0 80_searchresult_loadprereq=2.0
85_scheduler_idlesleep=60000
85_scheduler_busysleep=60000
85_scheduler_memprereq=1048576
85_scheduler_loadprereq=4.0
90_cleanup_idlesleep=300000 90_cleanup_idlesleep=300000
90_cleanup_busysleep=300000 90_cleanup_busysleep=300000
90_cleanup_memprereq=0 90_cleanup_memprereq=0

@ -126,8 +126,8 @@ public class Table_API_p {
if (action.equals("selminutes") && time > 0 && time < 10) { if (action.equals("selminutes") && time > 0 && time < 10) {
row.put(WorkTables.TABLE_API_COL_APICALL_SCHEDULE_TIME, 10); row.put(WorkTables.TABLE_API_COL_APICALL_SCHEDULE_TIME, 10);
} }
if (action.equals("selminutes") && time > 50) { if (action.equals("selminutes") && time > 59) {
row.put(WorkTables.TABLE_API_COL_APICALL_SCHEDULE_TIME, 50); row.put(WorkTables.TABLE_API_COL_APICALL_SCHEDULE_TIME, 59);
} }
if (action.equals("selhours") && time > 23) { if (action.equals("selhours") && time > 23) {
row.put(WorkTables.TABLE_API_COL_APICALL_SCHEDULE_TIME, 23); row.put(WorkTables.TABLE_API_COL_APICALL_SCHEDULE_TIME, 23);

@ -980,18 +980,31 @@ public final class Switchboard extends serverSwitch {
MemoryControl.gc(10000, "plasmaSwitchboard, help for profiler"); // help for profiler - thq MemoryControl.gc(10000, "plasmaSwitchboard, help for profiler"); // help for profiler - thq
deployThread( deployThread(
SwitchboardConstants.CLEANUP, SwitchboardConstants.CLEANUP,
"Cleanup", "Cleanup",
"simple cleaning process for monitoring information", "cleaning process",
null, null,
new InstantBusyThread( new InstantBusyThread(
this, this,
SwitchboardConstants.CLEANUP_METHOD_START, SwitchboardConstants.CLEANUP_METHOD_START,
SwitchboardConstants.CLEANUP_METHOD_JOBCOUNT, SwitchboardConstants.CLEANUP_METHOD_JOBCOUNT,
SwitchboardConstants.CLEANUP_METHOD_FREEMEM, SwitchboardConstants.CLEANUP_METHOD_FREEMEM,
30000, 30000,
10000), 10000),
60000); // all 10 minutes, wait 1 minute until first run 60000); // all 10 minutes, wait 1 minute until first run
deployThread(
SwitchboardConstants.SCHEDULER,
"Scheduler",
"starts scheduled processes from the API Processing table",
null,
new InstantBusyThread(
this,
SwitchboardConstants.SCHEDULER_METHOD_START,
SwitchboardConstants.SCHEDULER_METHOD_JOBCOUNT,
SwitchboardConstants.SCHEDULER_METHOD_FREEMEM,
30000,
10000),
60000); // all 10 minutes, wait 1 minute until first run
deployThread( deployThread(
SwitchboardConstants.SURROGATES, SwitchboardConstants.SURROGATES,
"Surrogates", "Surrogates",
@ -1149,7 +1162,7 @@ public final class Switchboard extends serverSwitch {
public void run() { public void run() {
Thread.currentThread().setName("Switchboard.setHttpServer"); Thread.currentThread().setName("Switchboard.setHttpServer");
try {Thread.sleep(10000);} catch (final InterruptedException e) {} // needs httpd up try {Thread.sleep(10000);} catch (final InterruptedException e) {} // needs httpd up
execAPIActions(); // trigger startup actions schedulerJob(); // trigger startup actions
} }
}.start(); }.start();
} }
@ -2007,9 +2020,85 @@ public final class Switchboard extends serverSwitch {
CircleTool.clearcache(); CircleTool.clearcache();
NetworkGraph.clearcache(); NetworkGraph.clearcache();
} }
public int schedulerJobSize() {
try {
return this.tables.size(WorkTables.TABLE_API_NAME);
} catch (IOException e) {
ConcurrentLog.logException(e);
return 0;
}
}
public boolean schedulerJob() {
// execute scheduled API actions
Tables.Row row;
final Collection<String> pks = new LinkedHashSet<String>();
final Date now = new Date();
try {
final Iterator<Tables.Row> plainIterator = this.tables.iterator(WorkTables.TABLE_API_NAME);
final Iterator<Tables.Row> mapIterator = Tables.orderBy(plainIterator, -1, WorkTables.TABLE_API_COL_DATE_RECORDING).iterator();
while (mapIterator.hasNext()) {
row = mapIterator.next();
if (row == null) continue;
// select api calls according to scheduler settings
final Date date_next_exec = row.get(WorkTables.TABLE_API_COL_DATE_NEXT_EXEC, (Date) null);
if (date_next_exec != null && now.after(date_next_exec)) pks.add(UTF8.String(row.getPK()));
// select api calls according to event settings
final String kind = row.get(WorkTables.TABLE_API_COL_APICALL_EVENT_KIND, "off");
if (!"off".equals(kind)) {
String action = row.get(WorkTables.TABLE_API_COL_APICALL_EVENT_ACTION, "startup");
if ("startup".equals(action)) {
if (startupAction) {
pks.add(UTF8.String(row.getPK()));
if ("once".equals(kind)) {
row.put(WorkTables.TABLE_API_COL_APICALL_EVENT_KIND, "off");
sb.tables.update(WorkTables.TABLE_API_NAME, row);
}
}
} else try {
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmm");
long d = dateFormat.parse(dateFormat.format(new Date()).substring(0, 8) + action).getTime();
long cycle = getThread(SwitchboardConstants.CLEANUP).getBusySleep();
if (d < System.currentTimeMillis() && System.currentTimeMillis() - d < cycle) {
pks.add(UTF8.String(row.getPK()));
if ("once".equals(kind)) {
row.put(WorkTables.TABLE_API_COL_APICALL_EVENT_KIND, "off");
row.put(WorkTables.TABLE_API_COL_DATE_NEXT_EXEC, "");
sb.tables.update(WorkTables.TABLE_API_NAME, row);
}
}
} catch (final ParseException e) {}
}
}
} catch (final IOException e) {
ConcurrentLog.logException(e);
}
for (final String pk : pks) {
try {
row = this.tables.select(WorkTables.TABLE_API_NAME, UTF8.getBytes(pk));
WorkTables.calculateAPIScheduler(row, true); // calculate next update time
this.tables.update(WorkTables.TABLE_API_NAME, row);
} catch (final Throwable e ) {
ConcurrentLog.logException(e);
continue;
}
}
startupAction = false;
// execute api calls
final Map<String, Integer> callResult = this.tables.execAPICalls("localhost", getLocalPort("port", 8090), pks, getConfig(SwitchboardConstants.ADMIN_ACCOUNT_USER_NAME, "admin"), getConfig(SwitchboardConstants.ADMIN_ACCOUNT_B64MD5, ""));
for ( final Map.Entry<String, Integer> call : callResult.entrySet() ) {
this.log.info("Scheduler executed api call, response " + call.getValue() + ": " + call.getKey());
}
return pks.size() > 0;
}
public int cleanupJobSize() { public int cleanupJobSize() {
int c = 1; // "es gibt immer was zu tun" int c = 1; // run this always!
if ( (this.crawlQueues.delegatedURL.size() > 1000) ) { if ( (this.crawlQueues.delegatedURL.size() > 1000) ) {
c++; c++;
} }
@ -2359,10 +2448,6 @@ public final class Switchboard extends serverSwitch {
this.lastStats = System.currentTimeMillis(); this.lastStats = System.currentTimeMillis();
} catch (IOException e) {} } catch (IOException e) {}
// execute api actions; this must be done after postprocessing because
// these actions may also influence the search index/ call optimize steps
execAPIActions();
// show deadlocks if there are any in the log // show deadlocks if there are any in the log
if (Memory.deadlocks() > 0) Memory.logDeadlocks(); if (Memory.deadlocks() > 0) Memory.logDeadlocks();
@ -2373,73 +2458,6 @@ public final class Switchboard extends serverSwitch {
} }
} }
private void execAPIActions() {
// execute scheduled API actions
Tables.Row row;
final Collection<String> pks = new LinkedHashSet<String>();
final Date now = new Date();
try {
final Iterator<Tables.Row> plainIterator = this.tables.iterator(WorkTables.TABLE_API_NAME);
final Iterator<Tables.Row> mapIterator = Tables.orderBy(plainIterator, -1, WorkTables.TABLE_API_COL_DATE_RECORDING).iterator();
while (mapIterator.hasNext()) {
row = mapIterator.next();
if (row == null) continue;
// select api calls according to scheduler settings
final Date date_next_exec = row.get(WorkTables.TABLE_API_COL_DATE_NEXT_EXEC, (Date) null);
if (date_next_exec != null && now.after(date_next_exec)) pks.add(UTF8.String(row.getPK()));
// select api calls according to event settings
final String kind = row.get(WorkTables.TABLE_API_COL_APICALL_EVENT_KIND, "off");
if (!"off".equals(kind)) {
String action = row.get(WorkTables.TABLE_API_COL_APICALL_EVENT_ACTION, "startup");
if ("startup".equals(action)) {
if (startupAction) {
pks.add(UTF8.String(row.getPK()));
if ("once".equals(kind)) {
row.put(WorkTables.TABLE_API_COL_APICALL_EVENT_KIND, "off");
sb.tables.update(WorkTables.TABLE_API_NAME, row);
}
}
} else try {
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmm");
long d = dateFormat.parse(dateFormat.format(new Date()).substring(0, 8) + action).getTime();
long cycle = getThread(SwitchboardConstants.CLEANUP).getBusySleep();
if (d < System.currentTimeMillis() && System.currentTimeMillis() - d < cycle) {
pks.add(UTF8.String(row.getPK()));
if ("once".equals(kind)) {
row.put(WorkTables.TABLE_API_COL_APICALL_EVENT_KIND, "off");
row.put(WorkTables.TABLE_API_COL_DATE_NEXT_EXEC, "");
sb.tables.update(WorkTables.TABLE_API_NAME, row);
}
}
} catch (final ParseException e) {}
}
}
} catch (final IOException e) {
ConcurrentLog.logException(e);
}
for (final String pk : pks) {
try {
row = this.tables.select(WorkTables.TABLE_API_NAME, UTF8.getBytes(pk));
WorkTables.calculateAPIScheduler(row, true); // calculate next update time
this.tables.update(WorkTables.TABLE_API_NAME, row);
} catch (final Throwable e ) {
ConcurrentLog.logException(e);
continue;
}
}
startupAction = false;
// execute api calls
final Map<String, Integer> callResult = this.tables.execAPICalls("localhost", getLocalPort("port", 8090), pks, getConfig(SwitchboardConstants.ADMIN_ACCOUNT_USER_NAME, "admin"), getConfig(SwitchboardConstants.ADMIN_ACCOUNT_B64MD5, ""));
for ( final Map.Entry<String, Integer> call : callResult.entrySet() ) {
this.log.info("Scheduler executed api call, response " + call.getValue() + ": " + call.getKey());
}
}
/** /**
* With this function the crawling process can be paused * With this function the crawling process can be paused
* *

@ -156,6 +156,17 @@ public final class SwitchboardConstants {
public static final String SEARCHRESULT_METHOD_START = "searchresultProcess"; public static final String SEARCHRESULT_METHOD_START = "searchresultProcess";
public static final String SEARCHRESULT_METHOD_JOBCOUNT = "searchresultQueueSize"; public static final String SEARCHRESULT_METHOD_JOBCOUNT = "searchresultQueueSize";
public static final String SEARCHRESULT_METHOD_FREEMEM = "searchresultFreeMem"; public static final String SEARCHRESULT_METHOD_FREEMEM = "searchresultFreeMem";
// 85_scheduler
/**
* <p><code>public static final String <strong>SCHEDULER</strong> = "85_scheduler"</code></p>
* <p>The cleanup thread which is responsible for the start of scheduled processes from the API table</p>
*/
public static final String SCHEDULER = "85_scheduler";
public static final String SCHEDULER_METHOD_START = "schedulerJob";
public static final String SCHEDULER_METHOD_JOBCOUNT = "schedulerJobSize";
public static final String SCHEDULER_METHOD_FREEMEM = null;
public static final String SCHEDULER_IDLESLEEP = "85_scheduler_idlesleep";
public static final String SCHEDULER_BUSYSLEEP = "85_scheduler_busysleep";
// 90_cleanup // 90_cleanup
/** /**
* <p><code>public static final String <strong>CLEANUP</strong> = "90_cleanup"</code></p> * <p><code>public static final String <strong>CLEANUP</strong> = "90_cleanup"</code></p>

Loading…
Cancel
Save