77
const int MIN_SECONDS_TO_SEND = 0;  // lower clamp bound for requested work duration — presumably paired with MAX_SECONDS_TO_SEND below; clamp site not visible in this chunk, confirm
78
const int MAX_SECONDS_TO_SEND = (28*SECONDS_IN_DAY);  // upper clamp bound for requested work duration: 28 days, in seconds
80
// return a number that
81
// - is the # of CPUs in EDF simulation
82
// - scales the daily result quota
83
// - scales max_wus_in_progress
85
inline int effective_ncpus(SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply) {
86
int ncpus = reply.host.p_ncpus;
87
if (ncpus > config.max_ncpus) ncpus = config.max_ncpus;
88
if (ncpus < 1) ncpus = 1;
89
if (config.have_cuda_apps) {
90
COPROC* cp = sreq.coprocs.lookup("cuda");
91
if (cp && cp->count > ncpus) {
98
const double DEFAULT_RAM_SIZE = 64000000;  // ~64 MB fallback, substituted when the host reports an impossible RAM size
99
// if host sends us an impossible RAM size, use this instead
101
bool SCHEDULER_REQUEST::has_version(APP& app) {
104
for (i=0; i<client_app_versions.size(); i++) {
105
CLIENT_APP_VERSION& cav = client_app_versions[i];
106
if (!strcmp(cav.app_name, app.name) && cav.version_num >= app.min_version) {
113
// return BEST_APP_VERSION for the given host, or NULL if none
116
BEST_APP_VERSION* get_app_version(
117
SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, WORKUNIT& wu
122
BEST_APP_VERSION* bavp;
126
// see if app is already in memoized array
128
for (i=0; i<reply.wreq.best_app_versions.size(); i++) {
129
bavp = reply.wreq.best_app_versions[i];
130
if (bavp->appid == wu.appid) {
131
if (!bavp->avp) return NULL;
136
APP* app = ssp->lookup_app(wu.appid);
138
log_messages.printf(MSG_CRITICAL, "WU refers to nonexistent app: %d\n", wu.appid);
142
bavp = new BEST_APP_VERSION;
143
bavp->appid = wu.appid;
144
if (anonymous(sreq.platforms.list[0])) {
145
found = sreq.has_version(*app);
147
if (config.debug_send) {
148
log_messages.printf(MSG_DEBUG,
149
"Didn't find anonymous platform app for %s\n", app->name
152
"Your app_info.xml file doesn't have a version of %s.",
153
app->user_friendly_name
155
USER_MESSAGE um(message, "high");
156
reply.wreq.insert_no_work_message(um);
157
reply.wreq.no_app_version = true;
161
if (config.debug_send) {
162
log_messages.printf(MSG_DEBUG,
163
"Found anonymous platform app for %s\n", app->name
166
// TODO: anonymous platform apps should be able to tell us
167
// how fast they are and how many CPUs and coprocs they use.
168
// For now, assume they use 1 CPU
170
bavp->host_usage.sequential_app(reply.host.p_fpops);
171
bavp->avp = (APP_VERSION*)1; // arbitrary nonzero value;
172
// means the client already has the app version
174
reply.wreq.best_app_versions.push_back(bavp);
175
if (!bavp->avp) return NULL;
179
// Go through the client's platforms.
180
// Scan the app versions for each platform.
181
// Find the one with highest expected FLOPS
183
bavp->host_usage.flops = 0;
185
for (i=0; i<sreq.platforms.list.size(); i++) {
186
PLATFORM* p = sreq.platforms.list[i];
187
for (j=0; j<ssp->napp_versions; j++) {
188
HOST_USAGE host_usage;
189
APP_VERSION& av = ssp->app_versions[j];
190
if (av.appid != wu.appid) continue;
191
if (av.platformid != p->id) continue;
192
if (sreq.core_client_version < av.min_core_version) {
193
log_messages.printf(MSG_NORMAL,
194
"outdated client version %d < min core version %d\n",
195
sreq.core_client_version, av.min_core_version
197
reply.wreq.outdated_core = true;
200
if (strlen(av.plan_class)) {
201
if (!sreq.client_cap_plan_class) continue;
202
if (!app_plan(sreq, av.plan_class, host_usage)) {
206
host_usage.sequential_app(reply.host.p_fpops);
208
if (host_usage.flops > bavp->host_usage.flops) {
209
bavp->host_usage = host_usage;
214
reply.wreq.best_app_versions.push_back(bavp);
216
if (config.debug_version_select) {
217
log_messages.printf(MSG_DEBUG,
218
"Best version of app %s is %d (%.2f GFLOPS)\n",
219
app->name, bavp->avp->id, bavp->host_usage.flops/1e9
83
const double MIN_REQ_SECS = 0;  // lower bound applied by clamp_req_sec() to client-requested seconds of work
84
const double MAX_REQ_SECS = (28*SECONDS_IN_DAY);  // upper bound applied by clamp_req_sec(): 28 days, in seconds
86
const int MAX_GPUS = 8;  // cap on the reported GPU count (cuda + ati) used in WORK_REQ::get_job_limits()
87
// don't believe clients who claim they have more GPUs than this
94
void WORK_REQ::get_job_limits() {
96
n = g_reply->host.p_ncpus;
97
if (g_request->global_prefs.max_ncpus_pct && g_request->global_prefs.max_ncpus_pct < 100) {
98
n = (int)((n*g_request->global_prefs.max_ncpus_pct)/100.);
100
if (n > config.max_ncpus) n = config.max_ncpus;
104
n = g_request->coprocs.cuda.count + g_request->coprocs.ati.count;
105
if (n > MAX_GPUS) n = MAX_GPUS;
108
int mult = effective_ncpus + config.gpu_multiplier * effective_ngpus;
109
if (config.non_cpu_intensive) {
112
if (effective_ngpus) effective_ngpus = 1;
115
if (config.max_wus_to_send) {
116
g_wreq->max_jobs_per_rpc = mult * config.max_wus_to_send;
223
// here if no app version exists
225
if (config.debug_version_select) {
226
log_messages.printf(MSG_DEBUG,
227
"no app version available: APP#%d PLATFORM#%d min_version %d\n",
228
app->id, sreq.platforms.list[0]->id, app->min_version
232
"%s is not available for your type of computer.",
233
app->user_friendly_name
118
g_wreq->max_jobs_per_rpc = 999999;
121
config.max_jobs_in_progress.reset(g_reply->host, g_request->coprocs);
123
if (config.debug_quota) {
124
log_messages.printf(MSG_NORMAL,
125
"[quota] max jobs per RPC: %d\n",
126
g_wreq->max_jobs_per_rpc
235
USER_MESSAGE um(message, "high");
236
reply.wreq.insert_no_work_message(um);
237
reply.wreq.no_app_version = true;
128
config.max_jobs_in_progress.print_log();
243
132
static const char* find_user_friendly_name(int appid) {
244
APP* app = ssp->lookup_app(appid);
245
if (app) return app->user_friendly_name;
133
APP* app = ssp->lookup_app(appid);
134
if (app) return app->user_friendly_name;
246
135
return "deprecated application";
304
193
// We can only honor the min_free pref.
306
195
x = host.d_free - prefs.disk_min_free_gb*GIGA; // may be negative
307
reply.disk_limits.min_free = x;
196
g_reply->disk_limits.min_free = x;
308
197
x1 = x2 = x3 = 0;
312
201
if (config.debug_send) {
313
log_messages.printf(MSG_DEBUG,
314
"Insufficient disk: disk_max_used_gb %f disk_max_used_pct %f disk_min_free_gb %f\n",
315
prefs.disk_max_used_gb, prefs.disk_max_used_pct,
316
prefs.disk_min_free_gb
318
log_messages.printf(MSG_DEBUG,
319
"Insufficient disk: host.d_total %f host.d_free %f host.d_boinc_used_total %f\n",
320
host.d_total, host.d_free, host.d_boinc_used_total
322
log_messages.printf(MSG_DEBUG,
323
"Insufficient disk: x1 %f x2 %f x3 %f x %f\n",
202
log_messages.printf(MSG_NORMAL,
203
"[send] No disk space available: disk_max_used_gb %.2fGB disk_max_used_pct %.2f disk_min_free_gb %.2fGB\n",
204
prefs.disk_max_used_gb/GIGA,
205
prefs.disk_max_used_pct,
206
prefs.disk_min_free_gb/GIGA
208
log_messages.printf(MSG_NORMAL,
209
"[send] No disk space available: host.d_total %.2fGB host.d_free %.2fGB host.d_boinc_used_total %.2fGB\n",
212
host.d_boinc_used_total/GIGA
214
log_messages.printf(MSG_NORMAL,
215
"[send] No disk space available: x1 %.2fGB x2 %.2fGB x3 %.2fGB x %.2fGB\n",
216
x1/GIGA, x2/GIGA, x3/GIGA, x/GIGA
327
reply.wreq.disk.set_insufficient(-x);
219
g_wreq->disk.set_insufficient(-x);
332
// if a host has active_frac < 0.1, assume 0.1 so we don't deprive it of work.
334
const double HOST_ACTIVE_FRAC_MIN = 0.1;  // floor for running_frac so hosts reporting active_frac < 0.1 still get work
336
// estimate the number of CPU seconds that a workunit requires
337
// running on this host.
339
double estimate_cpu_duration(WORKUNIT& wu, SCHEDULER_REPLY& reply) {
340
double p_fpops = reply.host.p_fpops;
341
if (p_fpops <= 0) p_fpops = 1e9;
225
static double estimate_duration_unscaled(WORKUNIT& wu, BEST_APP_VERSION& bav) {
342
226
double rsc_fpops_est = wu.rsc_fpops_est;
343
227
if (rsc_fpops_est <= 0) rsc_fpops_est = 1e12;
344
return rsc_fpops_est/p_fpops;
228
return rsc_fpops_est/bav.host_usage.projected_flops;
231
static inline void get_running_frac() {
233
if (g_request->core_client_version<=41900) {
234
rf = g_reply->host.on_frac;
236
rf = g_reply->host.active_frac * g_reply->host.on_frac;
239
// clamp running_frac to a reasonable range
242
if (config.debug_send) {
243
log_messages.printf(MSG_NORMAL, "[send] running_frac=%f; setting to 1\n", rf);
246
} else if (rf < .1) {
247
if (config.debug_send) {
248
log_messages.printf(MSG_NORMAL, "[send] running_frac=%f; setting to 0.1\n", rf);
252
g_wreq->running_frac = rf;
347
255
// estimate the amount of real time to complete this WU,
348
256
// taking into account active_frac etc.
349
257
// Note: don't factor in resource_share_fraction.
350
// The core client no longer necessarily does round-robin
351
// across all projects.
258
// The core client doesn't necessarily round-robin across all projects.
353
static double estimate_wallclock_duration(
354
WORKUNIT& wu, SCHEDULER_REQUEST&, SCHEDULER_REPLY& reply
356
double ecd = estimate_cpu_duration(wu, reply);
357
double ewd = ecd/reply.wreq.running_frac;
358
if (reply.host.duration_correction_factor) {
359
ewd *= reply.host.duration_correction_factor;
260
double estimate_duration(WORKUNIT& wu, BEST_APP_VERSION& bav) {
261
double edu = estimate_duration_unscaled(wu, bav);
262
double ed = edu/g_wreq->running_frac;
361
263
if (config.debug_send) {
362
log_messages.printf(MSG_DEBUG,
363
"est cpu dur %f; est wall dur %f\n", ecd, ewd
264
log_messages.printf(MSG_NORMAL,
265
"[send] est. duration for WU %d: unscaled %.2f scaled %.2f\n",
369
// Find or compute various info about the host;
370
// this info affects which jobs are sent to the host.
372
static int get_host_info(SCHEDULER_REPLY& reply) {
272
static void get_prefs_info() {
375
275
unsigned int pos = 0;
379
extract_venue(reply.user.project_prefs, reply.host.venue, buf);
279
extract_venue(g_reply->user.project_prefs, g_reply->host.venue, buf);
382
282
// scan user's project prefs for elements of the form <app_id>N</app_id>,
383
283
// indicating the apps they want to run.
385
reply.wreq.host_info.preferred_apps.clear();
285
g_wreq->preferred_apps.clear();
386
286
while (parse_int(str.substr(pos,str.length()-pos).c_str(), "<app_id>", temp_int)) {
388
288
ai.appid = temp_int;
389
289
ai.work_available = false;
390
reply.wreq.host_info.preferred_apps.push_back(ai);
290
g_wreq->preferred_apps.push_back(ai);
392
292
pos = str.find("<app_id>", pos) + 1;
394
if (parse_bool(buf,"allow_non_preferred_apps", flag)) {
395
reply.wreq.host_info.allow_non_preferred_apps = flag;
397
if (parse_bool(buf,"allow_beta_work", flag)) {
398
reply.wreq.host_info.allow_beta_work = flag;
401
// Decide whether or not this computer is 'reliable'
402
// A computer is reliable if the following conditions are true
403
// (for those that are set in the config file)
404
// 1) The host average turnaround is less than the config
405
// max average turnaround
406
// 2) The host error rate is less then the config max error rate
407
// 3) The host results per day is equal to the config file value
409
double expavg_credit = reply.host.expavg_credit;
410
double expavg_time = reply.host.expavg_time;
411
update_average(0, 0, CREDIT_HALF_LIFE, expavg_credit, expavg_time);
413
// Platforms other then Windows, Linux and Intel Macs need a
294
if (parse_bool(buf,"allow_non_preferred_apps", flag)) {
295
g_wreq->allow_non_preferred_apps = flag;
297
if (parse_bool(buf,"allow_beta_work", flag)) {
298
g_wreq->allow_beta_work = flag;
300
if (parse_bool(buf,"no_gpus", flag)) {
301
// deprecated, but need to handle
303
g_wreq->no_cuda = true;
304
g_wreq->no_ati = true;
307
if (parse_bool(buf,"no_cpu", flag)) {
308
g_wreq->no_cpu = flag;
310
if (parse_bool(buf,"no_cuda", flag)) {
311
g_wreq->no_cuda = flag;
313
if (parse_bool(buf,"no_ati", flag)) {
314
g_wreq->no_ati = flag;
318
// Decide whether or not this app version is 'reliable'
319
// An app version is reliable if the following conditions are true
320
// (for those that are set in the config file)
321
// 1) The host average turnaround is less than a threshold
322
// 2) consecutive_valid is above a threshold
323
// 3) The host results per day is equal to the max value
325
void get_reliability_version(HOST_APP_VERSION& hav, double multiplier) {
326
if (hav.turnaround.n > MIN_HOST_SAMPLES && config.reliable_max_avg_turnaround) {
328
if (hav.turnaround.get_avg() > config.reliable_max_avg_turnaround*multiplier) {
329
if (config.debug_send) {
330
log_messages.printf(MSG_NORMAL,
331
"[send] [AV#%d] not reliable; avg turnaround: %.3f > %.3f hrs\n",
333
hav.turnaround.get_avg()/3600,
334
config.reliable_max_avg_turnaround*multiplier/3600
337
hav.reliable = false;
341
if (hav.consecutive_valid < CONS_VALID_RELIABLE) {
342
if (config.debug_send) {
343
log_messages.printf(MSG_NORMAL,
344
"[send] [AV#%d] not reliable; cons valid %d < %d\n",
346
hav.consecutive_valid, CONS_VALID_RELIABLE
349
hav.reliable = false;
352
if (config.daily_result_quota) {
353
if (hav.max_jobs_per_day < config.daily_result_quota) {
354
if (config.debug_send) {
355
log_messages.printf(MSG_NORMAL,
356
"[send] [AV#%d] not reliable; max_jobs_per_day %d>%d\n",
358
hav.max_jobs_per_day,
359
config.daily_result_quota
362
hav.reliable = false;
367
if (config.debug_send) {
368
log_messages.printf(MSG_NORMAL,
369
"[send] [HOST#%d] app version %d is reliable\n",
370
g_reply->host.id, hav.app_version_id
373
g_wreq->has_reliable_version = true;
376
// decide whether do unreplicated jobs with this app version
378
static void set_trust(DB_HOST_APP_VERSION& hav) {
380
if (hav.consecutive_valid < CONS_VALID_UNREPLICATED) {
381
if (config.debug_send) {
382
log_messages.printf(MSG_NORMAL,
383
"[send] set_trust: cons valid %d < %d, don't use single replication\n",
384
hav.consecutive_valid, CONS_VALID_UNREPLICATED
389
double x = 1./hav.consecutive_valid;
390
if (drand() > x) hav.trusted = true;
391
if (config.debug_send) {
392
log_messages.printf(MSG_NORMAL,
393
"[send] set_trust: random choice for cons valid %d: %s\n",
394
hav.consecutive_valid, hav.trusted?"yes":"no"
399
static void update_quota(DB_HOST_APP_VERSION& hav) {
400
if (config.daily_result_quota) {
401
if (hav.max_jobs_per_day == 0) {
402
hav.max_jobs_per_day = config.daily_result_quota;
403
if (config.debug_quota) {
404
log_messages.printf(MSG_NORMAL,
405
"[quota] [HAV#%d] Initializing max_results_day to %d\n",
407
config.daily_result_quota
413
if (g_request->last_rpc_dayofyear != g_request->current_rpc_dayofyear) {
414
if (config.debug_quota) {
415
log_messages.printf(MSG_NORMAL,
416
"[quota] [HOST#%d] [HAV#%d] Resetting n_jobs_today\n",
417
g_reply->host.id, hav.app_version_id
420
hav.n_jobs_today = 0;
424
void update_n_jobs_today() {
425
for (unsigned int i=0; i<g_wreq->host_app_versions.size(); i++) {
426
DB_HOST_APP_VERSION& hav = g_wreq->host_app_versions[i];
431
static void get_reliability_and_trust() {
432
// Platforms other than Windows, Linux and Intel Macs need a
414
433
// larger set of computers to be marked reliable
416
435
double multiplier = 1.0;
417
if (strstr(reply.host.os_name,"Windows")
418
|| strstr(reply.host.os_name,"Linux")
419
|| (strstr(reply.host.os_name,"Darwin")
420
&& !(strstr(reply.host.p_vendor,"Power Macintosh"))
436
if (strstr(g_reply->host.os_name,"Windows")
437
|| strstr(g_reply->host.os_name,"Linux")
438
|| (strstr(g_reply->host.os_name,"Darwin")
439
&& !(strstr(g_reply->host.p_vendor,"Power Macintosh"))
427
if ((config.reliable_max_avg_turnaround == 0 || reply.host.avg_turnaround < config.reliable_max_avg_turnaround*multiplier)
428
&& (config.reliable_max_error_rate == 0 || reply.host.error_rate < config.reliable_max_error_rate*multiplier)
429
&& (config.daily_result_quota == 0 || reply.host.max_results_day >= config.daily_result_quota)
431
reply.wreq.host_info.reliable = true;
433
if (config.debug_send) {
434
log_messages.printf(MSG_DEBUG,
435
"[HOST#%d] is%s reliable (OS = %s) error_rate = %.6f avg_turn_hrs = %.3f \n",
437
reply.wreq.host_info.reliable?"":" not",
438
reply.host.os_name, reply.host.error_rate,
439
reply.host.avg_turnaround/3600
446
for (unsigned int i=0; i<g_wreq->host_app_versions.size(); i++) {
447
DB_HOST_APP_VERSION& hav = g_wreq->host_app_versions[i];
448
get_reliability_version(hav, multiplier);
445
453
// Return true if the user has set application preferences,
446
454
// and this job is not for a selected app
448
bool app_not_selected(
449
WORKUNIT& wu, SCHEDULER_REQUEST& , SCHEDULER_REPLY& reply
456
bool app_not_selected(WORKUNIT& wu) {
453
if (reply.wreq.host_info.preferred_apps.size() == 0) return false;
454
for (i=0; i<reply.wreq.host_info.preferred_apps.size(); i++) {
455
if (wu.appid == reply.wreq.host_info.preferred_apps[i].appid) {
456
reply.wreq.host_info.preferred_apps[i].work_available = true;
459
if (g_wreq->preferred_apps.size() == 0) return false;
460
for (i=0; i<g_wreq->preferred_apps.size(); i++) {
461
if (wu.appid == g_wreq->preferred_apps[i].appid) {
462
g_wreq->preferred_apps[i].work_available = true;
841
957
// and we haven't exceeded result per RPC limit,
842
958
// and we haven't exceeded results per day limit
845
SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, bool locality_sched
960
bool work_needed(bool locality_sched) {
847
961
if (locality_sched) {
848
962
// if we've failed to send a result because of a transient condition,
849
963
// return false to preserve invariant
851
if (reply.wreq.disk.insufficient || reply.wreq.speed.insufficient || reply.wreq.mem.insufficient || reply.wreq.no_allowed_apps_available) {
855
if (reply.wreq.seconds_to_fill <= 0) return false;
856
if (reply.wreq.disk_available <= 0) {
859
if (reply.wreq.nresults >= config.max_wus_to_send) return false;
861
int ncpus = effective_ncpus(sreq, reply);
863
// host.max_results_day is between 1 and config.daily_result_quota inclusive
864
// wreq.daily_result_quota is between ncpus
865
// and ncpus*host.max_results_day inclusive
867
if (config.daily_result_quota) {
868
if (reply.host.max_results_day == 0 || reply.host.max_results_day>config.daily_result_quota) {
869
reply.host.max_results_day = config.daily_result_quota;
871
reply.wreq.daily_result_quota = ncpus*reply.host.max_results_day;
872
if (reply.host.nresults_today >= reply.wreq.daily_result_quota) {
873
reply.wreq.daily_result_quota_exceeded = true;
878
if (config.max_wus_in_progress) {
879
if (reply.wreq.nresults_on_host >= config.max_wus_in_progress*ncpus) {
965
if (g_wreq->disk.insufficient || g_wreq->speed.insufficient || g_wreq->mem.insufficient || g_wreq->no_allowed_apps_available) {
880
966
if (config.debug_send) {
881
log_messages.printf(MSG_DEBUG,
882
"in-progress job limit exceeded; %d > %d*%d\n",
883
reply.wreq.nresults_on_host, config.max_wus_in_progress, ncpus
967
log_messages.printf(MSG_NORMAL,
968
"[send] stopping work search - locality condition\n"
886
reply.wreq.cache_size_exceeded = true;
893
void SCHEDULER_REPLY::got_good_result() {
894
host.max_results_day *= 2;
895
if (host.max_results_day > config.daily_result_quota) {
896
host.max_results_day = config.daily_result_quota;
900
void SCHEDULER_REPLY::got_bad_result() {
901
host.max_results_day -= 1;
902
if (host.max_results_day < 1) {
903
host.max_results_day = 1;
975
// see if we've reached limits on in-progress jobs
977
bool some_type_allowed = false;
978
if (config.max_jobs_in_progress.exceeded(NULL, true)) {
979
if (config.debug_quota) {
980
log_messages.printf(MSG_NORMAL,
981
"[quota] reached limit on GPU jobs in progress\n"
984
g_wreq->clear_gpu_req();
985
if (g_wreq->effective_ngpus) {
986
g_wreq->max_jobs_on_host_gpu_exceeded = true;
989
some_type_allowed = true;
991
if (config.max_jobs_in_progress.exceeded(NULL, false)) {
992
if (config.debug_quota) {
993
log_messages.printf(MSG_NORMAL,
994
"[quota] reached limit on CPU jobs in progress\n"
997
g_wreq->clear_cpu_req();
998
g_wreq->max_jobs_on_host_cpu_exceeded = true;
1000
some_type_allowed = true;
1002
if (!some_type_allowed) {
1003
if (config.debug_send) {
1004
log_messages.printf(MSG_NORMAL,
1005
"[send] in-progress job limit exceeded\n"
1008
g_wreq->max_jobs_on_host_exceeded = true;
1012
// see if we've reached max jobs per RPC
1014
if (g_wreq->njobs_sent >= g_wreq->max_jobs_per_rpc) {
1015
if (config.debug_quota) {
1016
log_messages.printf(MSG_NORMAL,
1017
"[quota] stopping work search - njobs %d >= max_jobs_per_rpc %d\n",
1018
g_wreq->njobs_sent, g_wreq->max_jobs_per_rpc
1025
log_messages.printf(MSG_NORMAL,
1026
"work_needed: spec req %d sec to fill %.2f; CPU (%.2f, %.2f) CUDA (%.2f, %.2f) ATI(%.2f, %.2f)\n",
1027
g_wreq->rsc_spec_request,
1028
g_wreq->seconds_to_fill,
1029
g_wreq->cpu_req_secs, g_wreq->cpu_req_instances,
1030
g_wreq->cuda_req_secs, g_wreq->cuda_req_instances,
1031
g_wreq->ati_req_secs, g_wreq->ati_req_instances
1034
if (g_wreq->rsc_spec_request) {
1035
if (g_wreq->need_cpu()) {
1038
if (g_wreq->need_cuda()) {
1041
if (g_wreq->need_ati()) {
1045
if (g_wreq->seconds_to_fill > 0) {
1049
if (config.debug_send) {
1050
log_messages.printf(MSG_NORMAL, "[send] don't need more work\n");
1055
// return the app version ID, or -2/-3/-4 if anonymous platform
1057
inline static int get_app_version_id(BEST_APP_VERSION* bavp) {
1059
return bavp->avp->id;
1061
return bavp->cavp->host_usage.resource_type();
907
1065
int add_result_to_reply(
908
DB_RESULT& result, WORKUNIT& wu, SCHEDULER_REQUEST& request,
909
SCHEDULER_REPLY& reply, BEST_APP_VERSION* bavp
1066
DB_RESULT& result, WORKUNIT& wu, BEST_APP_VERSION* bavp,
1067
bool locality_scheduling
912
double wu_seconds_filled;
913
1070
bool resent_result = false;
914
1071
APP* app = ssp->lookup_app(wu.appid);
916
retval = add_wu_to_reply(wu, reply, app, bavp);
1073
retval = add_wu_to_reply(wu, *g_reply, app, bavp);
917
1074
if (retval) return retval;
919
// in the scheduling locality case,
920
// reduce the available space by LESS than the workunit rsc_disk_bound,
921
// IF the host already has the file OR the file was not already sent.
1076
// Adjust available disk space.
1077
// In the scheduling locality case,
1078
// reduce the available space by less than the workunit rsc_disk_bound,
1079
// if the host already has the file or the file was not already sent.
923
if (!config.locality_scheduling ||
924
decrement_disk_space_locality(wu, request, reply)
926
reply.wreq.disk_available -= wu.rsc_disk_bound;
1081
if (!locality_scheduling || decrement_disk_space_locality(wu)) {
1082
g_wreq->disk_available -= wu.rsc_disk_bound;
929
1085
// update the result in DB
931
result.hostid = reply.host.id;
932
result.userid = reply.user.id;
1087
result.hostid = g_reply->host.id;
1088
result.userid = g_reply->user.id;
933
1089
result.sent_time = time(0);
1090
result.report_deadline = result.sent_time + wu.delay_bound;
1091
result.flops_estimate = bavp->host_usage.peak_flops;
1092
result.app_version_id = get_app_version_id(bavp);
934
1093
int old_server_state = result.server_state;
936
int delay_bound = wu.delay_bound;
937
1095
if (result.server_state != RESULT_SERVER_STATE_IN_PROGRESS) {
938
// We are sending this result for the first time
940
// If the workunit needs reliable and is being sent to a reliable host,
941
// then shorten the delay bound by the percent specified
943
if (config.reliable_on_priority && result.priority >= config.reliable_on_priority && config.reliable_reduced_delay_bound > 0.01
945
double reduced_delay_bound = delay_bound*config.reliable_reduced_delay_bound;
946
double est_wallclock_duration = estimate_wallclock_duration(wu, request, reply);
947
// Check to see how reasonable this reduced time is.
948
// Increase it to twice the estimated delay bound
949
// if all the following apply:
951
// 1) Twice the estimate is longer then the reduced delay bound
952
// 2) Twice the estimate is less then the original delay bound
953
// 3) Twice the estimate is less then the twice the reduced delay bound
954
if (est_wallclock_duration*2 > reduced_delay_bound && est_wallclock_duration*2 < delay_bound && est_wallclock_duration*2 < delay_bound*config.reliable_reduced_delay_bound*2 ) {
955
reduced_delay_bound = est_wallclock_duration*2;
957
delay_bound = (int) reduced_delay_bound;
960
result.report_deadline = result.sent_time + delay_bound;
1096
// We're sending this result for the first time
961
1098
result.server_state = RESULT_SERVER_STATE_IN_PROGRESS;
963
1100
// Result was already sent to this host but was lost,
964
// so we are resending it.
1101
// so we're resending it.
966
1103
resent_result = true;
968
// TODO: explain the following
970
if (result.report_deadline < result.sent_time) {
971
result.report_deadline = result.sent_time + 10;
973
if (result.report_deadline > result.sent_time + delay_bound) {
974
result.report_deadline = result.sent_time + delay_bound;
977
1105
if (config.debug_send) {
978
log_messages.printf(MSG_DEBUG,
979
"[RESULT#%d] [HOST#%d] (resend lost work)\n",
980
result.id, reply.host.id
1106
log_messages.printf(MSG_NORMAL,
1107
"[send] [RESULT#%d] [HOST#%d] (resend lost work)\n",
1108
result.id, g_reply->host.id
1138
1374
// Tell the user about applications they didn't qualify for
1140
1376
for (j=0; j<preferred_app_message_index; j++){
1141
reply.insert_message(reply.wreq.no_work_messages.at(j));
1377
g_reply->insert_message(g_wreq->no_work_messages.at(j));
1144
"You have selected to receive work from other applications if no work is available for the applications you selected",
1147
reply.insert_message(um1);
1148
USER_MESSAGE um2("Sending work from other applications", "high");
1149
reply.insert_message(um2);
1379
g_reply->insert_message(
1380
"Your preferences allow work from applications other than those selected",
1383
g_reply->insert_message(
1384
"Sending work from other applications", "low"
1153
1389
// if client asked for work and we're not sending any, explain why
1155
if (reply.wreq.nresults == 0) {
1156
reply.set_delay(DELAY_NO_WORK_TEMP);
1157
USER_MESSAGE um2("No work sent", "high");
1158
reply.insert_message(um2);
1159
// Inform the user about applications with no work
1160
for (i=0; i<reply.wreq.host_info.preferred_apps.size(); i++) {
1161
if (!reply.wreq.host_info.preferred_apps[i].work_available) {
1162
APP* app = ssp->lookup_app(reply.wreq.host_info.preferred_apps[i].appid);
1163
// don't write message if the app is deprecated
1165
char explanation[256];
1166
sprintf(explanation, "No work is available for %s",
1167
find_user_friendly_name(reply.wreq.host_info.preferred_apps[i].appid)
1391
if (g_wreq->njobs_sent == 0) {
1392
g_reply->set_delay(DELAY_NO_WORK_TEMP);
1393
g_reply->insert_message("No work sent", "low");
1395
// Tell the user about applications with no work
1397
for (i=0; i<g_wreq->preferred_apps.size(); i++) {
1398
if (!g_wreq->preferred_apps[i].work_available) {
1399
APP* app = ssp->lookup_app(g_wreq->preferred_apps[i].appid);
1400
// don't write message if the app is deprecated
1402
sprintf(buf, "No work is available for %s",
1403
find_user_friendly_name(
1404
g_wreq->preferred_apps[i].appid
1169
USER_MESSAGE um(explanation, "high");
1170
reply.insert_message(um);
1174
// Inform the user about applications they didn't qualify for
1175
for (i=0; i<reply.wreq.no_work_messages.size(); i++){
1176
reply.insert_message(reply.wreq.no_work_messages.at(i));
1178
if (reply.wreq.no_app_version) {
1179
reply.set_delay(DELAY_NO_WORK_PERM);
1181
if (reply.wreq.no_allowed_apps_available) {
1183
"No work available for the applications you have selected. Please check your settings on the web site.",
1407
g_reply->insert_message(buf, "low");
1412
// Tell the user about applications they didn't qualify for
1414
for (i=0; i<g_wreq->no_work_messages.size(); i++){
1415
g_reply->insert_message(g_wreq->no_work_messages.at(i));
1417
if (g_wreq->no_allowed_apps_available) {
1418
g_reply->insert_message(
1419
_("No work available for the applications you have selected. Please check your project preferences on the web site."),
1186
reply.insert_message(um);
1188
if (reply.wreq.speed.insufficient) {
1189
if (sreq.core_client_version>419) {
1191
"(won't finish in time) "
1192
"BOINC runs %.1f%% of time, computation enabled %.1f%% of that",
1193
100.0*reply.host.on_frac, 100.0*reply.host.active_frac
1423
if (g_wreq->speed.insufficient) {
1424
if (g_request->core_client_version>41900) {
1426
"Tasks won't finish in time: BOINC runs %.1f%% of the time; computation is enabled %.1f%% of that",
1427
100*g_reply->host.on_frac, 100*g_reply->host.active_frac
1197
"(won't finish in time) "
1198
"Computer available %.1f%% of time",
1199
100.0*reply.host.on_frac
1202
USER_MESSAGE um(helpful, "high");
1203
reply.insert_message(um);
1205
if (reply.wreq.hr_reject_temp) {
1207
"(there was work but it was committed to other platforms)",
1210
reply.insert_message(um);
1212
if (reply.wreq.hr_reject_perm) {
1214
"(your platform is not supported by this project)",
1217
reply.insert_message(um);
1219
if (reply.wreq.outdated_core) {
1221
" (your BOINC client is old - please install current version)",
1224
reply.insert_message(um);
1225
reply.set_delay(DELAY_NO_WORK_PERM);
1226
log_messages.printf(MSG_NORMAL,
1227
"Not sending work because client is outdated\n"
1230
if (reply.wreq.excessive_work_buf) {
1232
"(Your network connection interval is longer than WU deadline)",
1235
reply.insert_message(um);
1237
if (reply.wreq.no_jobs_available) {
1239
"(Project has no jobs available)",
1242
reply.insert_message(um);
1244
if (reply.wreq.daily_result_quota_exceeded) {
1245
struct tm *rpc_time_tm;
1248
sprintf(helpful, "(reached daily quota of %d results)", reply.wreq.daily_result_quota);
1249
USER_MESSAGE um(helpful, "high");
1250
reply.insert_message(um);
1251
log_messages.printf(MSG_NORMAL,
1252
"Daily result quota exceeded for host %d\n",
1256
// set delay so host won't return until a random time in
1257
// the first hour of the next day.
1258
// This is to prevent a lot of hosts from flooding the scheduler
1259
// with requests at the same time of day.
1261
time_t t = reply.host.rpc_time;
1262
rpc_time_tm = localtime(&t);
1263
delay_time = (23 - rpc_time_tm->tm_hour) * 3600
1264
+ (59 - rpc_time_tm->tm_min) * 60
1265
+ (60 - rpc_time_tm->tm_sec)
1266
+ (int)(3600*(double)rand()/(double)RAND_MAX);
1267
reply.set_delay(delay_time);
1269
if (reply.wreq.cache_size_exceeded) {
1270
sprintf(helpful, "(reached per-CPU limit of %d tasks)",
1271
config.max_wus_in_progress
1273
USER_MESSAGE um(helpful, "high");
1274
reply.insert_message(um);
1275
reply.set_delay(DELAY_NO_WORK_CACHE);
1276
log_messages.printf(MSG_NORMAL,
1277
"host %d already has %d result(s) in progress\n",
1278
reply.host.id, reply.wreq.nresults_on_host
1284
static void get_running_frac(SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply) {
1285
if (sreq.core_client_version<=419) {
1286
reply.wreq.running_frac = reply.host.on_frac;
1431
"Tasks won't finish in time: Computer available %.1f%% of the time",
1432
100*g_reply->host.on_frac
1435
g_reply->insert_message(buf, "low");
1437
if (g_wreq->hr_reject_temp) {
1438
g_reply->insert_message(
1439
"Tasks are committed to other platforms",
1443
if (g_wreq->hr_reject_perm) {
1444
g_reply->insert_message(
1445
_("Your computer type is not supported by this project"),
1449
if (g_wreq->outdated_client) {
1450
g_reply->insert_message(
1451
_("Newer BOINC version required; please install current version"),
1454
g_reply->set_delay(DELAY_NO_WORK_PERM);
1455
log_messages.printf(MSG_NORMAL,
1456
"Not sending work because newer client version required\n"
1459
if (g_wreq->no_cuda_prefs) {
1460
g_reply->insert_message(
1461
_("Tasks for NVIDIA GPU are available, but your preferences are set to not accept them"),
1465
if (g_wreq->no_ati_prefs) {
1466
g_reply->insert_message(
1467
_("Tasks for ATI GPU are available, but your preferences are set to not accept them"),
1471
if (g_wreq->no_cpu_prefs) {
1472
g_reply->insert_message(
1473
_("Tasks for CPU are available, but your preferences are set to not accept them"),
1477
DB_HOST_APP_VERSION* havp = quota_exceeded_version();
1479
sprintf(buf, "This computer has finished a daily quota of %d tasks)",
1480
havp->max_jobs_per_day
1482
g_reply->insert_message(buf, "low");
1483
if (config.debug_quota) {
1484
log_messages.printf(MSG_NORMAL,
1485
"[quota] Daily quota %d exceeded for app version %d\n",
1486
havp->max_jobs_per_day, havp->app_version_id
1489
g_reply->set_delay(DELAY_NO_WORK_CACHE);
1491
if (g_wreq->max_jobs_on_host_exceeded
1492
|| g_wreq->max_jobs_on_host_cpu_exceeded
1493
|| g_wreq->max_jobs_on_host_gpu_exceeded
1495
sprintf(buf, "This computer has reached a limit on tasks in progress");
1496
g_reply->insert_message(buf, "low");
1497
g_reply->set_delay(DELAY_NO_WORK_CACHE);
1502
static double clamp_req_sec(double x) {
1503
if (x < MIN_REQ_SECS) return MIN_REQ_SECS;
1504
if (x > MAX_REQ_SECS) return MAX_REQ_SECS;
1508
// prepare to send jobs, both resent and new;
1509
// decipher request type, fill in WORK_REQ
1511
void send_work_setup() {
1514
g_wreq->seconds_to_fill = clamp_req_sec(g_request->work_req_seconds);
1515
g_wreq->cpu_req_secs = clamp_req_sec(g_request->cpu_req_secs);
1516
g_wreq->cpu_req_instances = g_request->cpu_req_instances;
1517
g_wreq->anonymous_platform = is_anonymous(g_request->platforms.list[0]);
1519
if (g_wreq->anonymous_platform) {
1520
estimate_flops_anon_platform();
1522
cuda_requirements.clear();
1523
ati_requirements.clear();
1525
g_wreq->disk_available = max_allowable_disk();
1528
g_wreq->get_job_limits();
1530
if (g_request->coprocs.cuda.count) {
1531
g_wreq->cuda_req_secs = clamp_req_sec(g_request->coprocs.cuda.req_secs);
1532
g_wreq->cuda_req_instances = g_request->coprocs.cuda.req_instances;
1533
if (g_request->coprocs.cuda.estimated_delay < 0) {
1534
g_request->coprocs.cuda.estimated_delay = g_request->cpu_estimated_delay;
1537
if (g_request->coprocs.ati.count) {
1538
g_wreq->ati_req_secs = clamp_req_sec(g_request->coprocs.ati.req_secs);
1539
g_wreq->ati_req_instances = g_request->coprocs.ati.req_instances;
1540
if (g_request->coprocs.ati.estimated_delay < 0) {
1541
g_request->coprocs.ati.estimated_delay = g_request->cpu_estimated_delay;
1544
if (g_wreq->cpu_req_secs || g_wreq->cuda_req_secs || g_wreq->ati_req_secs) {
1545
g_wreq->rsc_spec_request = true;
1288
reply.wreq.running_frac = reply.host.active_frac * reply.host.on_frac;
1290
if (reply.wreq.running_frac < HOST_ACTIVE_FRAC_MIN) {
1291
reply.wreq.running_frac = HOST_ACTIVE_FRAC_MIN;
1293
if (reply.wreq.running_frac > 1) reply.wreq.running_frac = 1;
1296
static void send_work_old(SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply) {
1297
reply.wreq.beta_only = false;
1298
reply.wreq.user_apps_only = true;
1300
// give top priority to results that require a 'reliable host'
1302
if (reply.wreq.host_info.reliable) {
1303
reply.wreq.reliable_only = true;
1304
reply.wreq.infeasible_only = false;
1305
scan_work_array(sreq, reply);
1307
reply.wreq.reliable_only = false;
1309
// give 2nd priority to results for a beta app
1310
// (projects should load beta work with care,
1311
// otherwise your users won't get production work done!
1313
if (reply.wreq.host_info.allow_beta_work) {
1314
reply.wreq.beta_only = true;
1315
if (config.debug_send) {
1316
log_messages.printf(MSG_DEBUG,
1317
"[HOST#%d] will accept beta work. Scanning for beta work.\n",
1321
scan_work_array(sreq, reply);
1323
reply.wreq.beta_only = false;
1325
// give next priority to results that were infeasible for some other host
1327
reply.wreq.infeasible_only = true;
1328
scan_work_array(sreq, reply);
1330
reply.wreq.infeasible_only = false;
1331
scan_work_array(sreq, reply);
1333
// If user has selected apps but will accept any,
1334
// and we haven't found any jobs for selected apps, try others
1336
if (!reply.wreq.nresults && reply.wreq.host_info.allow_non_preferred_apps ) {
1337
reply.wreq.user_apps_only = false;
1338
preferred_app_message_index = reply.wreq.no_work_messages.size();
1339
if (config.debug_send) {
1340
log_messages.printf(MSG_DEBUG,
1341
"[HOST#%d] is looking for work from a non-preferred application\n",
1345
scan_work_array(sreq, reply);
1350
// decide whether to unreplicated jobs to this host
1352
void set_trust(SCHEDULER_REPLY& reply) {
1353
reply.wreq.trust = false;
1354
if (reply.host.error_rate > ER_MAX) {
1355
if (config.debug_send) {
1356
log_messages.printf(MSG_DEBUG,
1357
"set_trust: error rate %f > %f, don't trust\n",
1358
reply.host.error_rate, ER_MAX
1363
double x = sqrt(reply.host.error_rate/ER_MAX);
1364
if (drand() > x) reply.wreq.trust = true;
1365
if (config.debug_send) {
1366
log_messages.printf(MSG_DEBUG,
1367
"set_trust: random choice for error rate %f: %s\n",
1368
reply.host.error_rate, reply.wreq.trust?"yes":"no"
1373
void send_work(SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply) {
1374
if (sreq.work_req_seconds <= 0) return;
1376
reply.wreq.disk_available = max_allowable_disk(sreq, reply);
1378
if (hr_unknown_platform(sreq.host)) {
1379
reply.wreq.hr_reject_perm = true;
1383
get_host_info(reply); // parse project prefs for app details
1387
get_running_frac(sreq, reply);
1389
if (config.debug_send) {
1390
log_messages.printf(MSG_DEBUG,
1391
"%s matchmaker scheduling; %s EDF sim\n",
1547
g_wreq->rsc_spec_request = false;
1550
for (i=0; i<g_request->other_results.size(); i++) {
1551
OTHER_RESULT& r = g_request->other_results[i];
1553
bool uses_gpu = false;
1554
bool have_cav = false;
1555
if (r.app_version >= 0
1556
&& r.app_version < (int)g_request->client_app_versions.size()
1558
CLIENT_APP_VERSION& cav = g_request->client_app_versions[r.app_version];
1562
uses_gpu = cav.host_usage.uses_gpu();
1566
if (r.have_plan_class && app_plan_uses_gpu(r.plan_class)) {
1570
config.max_jobs_in_progress.register_job(app, uses_gpu);
1573
// print details of request to log
1575
if (config.debug_send) {
1576
log_messages.printf(MSG_NORMAL,
1577
"[send] %s matchmaker scheduling; %s EDF sim\n",
1392
1578
config.matchmaker?"Using":"Not using",
1393
1579
config.workload_sim?"Using":"Not using"
1395
log_messages.printf(MSG_DEBUG,
1396
"available disk %f GB, work_buf_min %d\n",
1397
reply.wreq.disk_available/GIGA,
1398
(int)sreq.global_prefs.work_buf_min()
1400
log_messages.printf(MSG_DEBUG,
1401
"running frac %f DCF %f est delay %d\n",
1402
reply.wreq.running_frac,
1403
reply.host.duration_correction_factor,
1404
(int)sreq.estimated_delay
1408
reply.wreq.seconds_to_fill = sreq.work_req_seconds;
1409
if (reply.wreq.seconds_to_fill > MAX_SECONDS_TO_SEND) {
1410
reply.wreq.seconds_to_fill = MAX_SECONDS_TO_SEND;
1412
if (reply.wreq.seconds_to_fill < MIN_SECONDS_TO_SEND) {
1413
reply.wreq.seconds_to_fill = MIN_SECONDS_TO_SEND;
1581
log_messages.printf(MSG_NORMAL,
1582
"[send] CPU: req %.2f sec, %.2f instances; est delay %.2f\n",
1583
g_wreq->cpu_req_secs, g_wreq->cpu_req_instances,
1584
g_request->cpu_estimated_delay
1586
if (g_request->coprocs.cuda.count) {
1587
log_messages.printf(MSG_NORMAL,
1588
"[send] CUDA: req %.2f sec, %.2f instances; est delay %.2f\n",
1589
g_wreq->cuda_req_secs, g_wreq->cuda_req_instances,
1590
g_request->coprocs.cuda.estimated_delay
1593
if (g_request->coprocs.ati.count) {
1594
log_messages.printf(MSG_NORMAL,
1595
"[send] ATI: req %.2f sec, %.2f instances; est delay %.2f\n",
1596
g_wreq->ati_req_secs, g_wreq->ati_req_instances,
1597
g_request->coprocs.ati.estimated_delay
1600
log_messages.printf(MSG_NORMAL,
1601
"[send] work_req_seconds: %.2f secs\n",
1602
g_wreq->seconds_to_fill
1604
log_messages.printf(MSG_NORMAL,
1605
"[send] available disk %.2f GB, work_buf_min %d\n",
1606
g_wreq->disk_available/GIGA,
1607
(int)g_request->global_prefs.work_buf_min()
1609
log_messages.printf(MSG_NORMAL,
1610
"[send] active_frac %f on_frac %f\n",
1611
g_reply->host.active_frac,
1612
g_reply->host.on_frac
1614
if (g_wreq->anonymous_platform) {
1615
log_messages.printf(MSG_NORMAL,
1616
"Anonymous platform app versions:\n"
1618
for (i=0; i<g_request->client_app_versions.size(); i++) {
1619
CLIENT_APP_VERSION& cav = g_request->client_app_versions[i];
1620
log_messages.printf(MSG_NORMAL,
1621
" app: %s version %d cpus %.2f cudas %.2f atis %.2f flops %fG\n",
1624
cav.host_usage.avg_ncpus,
1625
cav.host_usage.ncudas,
1626
cav.host_usage.natis,
1627
cav.host_usage.projected_flops/1e9
1634
// If a record is not in DB, create it.
1636
int update_host_app_versions(vector<RESULT>& results, int hostid) {
1637
vector<DB_HOST_APP_VERSION> new_havs;
1641
for (i=0; i<results.size(); i++) {
1642
RESULT& r = results[i];
1643
int gavid = generalized_app_version_id(r.app_version_id, r.appid);
1644
DB_HOST_APP_VERSION* havp = gavid_to_havp(gavid);
1647
for (j=0; j<new_havs.size(); j++) {
1648
DB_HOST_APP_VERSION& hav = new_havs[j];
1649
if (hav.app_version_id == gavid) {
1654
DB_HOST_APP_VERSION hav;
1656
hav.host_id = hostid;
1657
hav.app_version_id = gavid;
1658
new_havs.push_back(hav);
1663
// create new records
1665
for (i=0; i<new_havs.size(); i++) {
1666
DB_HOST_APP_VERSION& hav = new_havs[i];
1668
retval = hav.insert();
1670
log_messages.printf(MSG_CRITICAL,
1671
"hav.insert(): %d\n", retval
1674
if (config.debug_credit) {
1675
log_messages.printf(MSG_NORMAL,
1676
"[credit] created host_app_version record (%d, %d)\n",
1677
hav.host_id, hav.app_version_id
1688
if (!work_needed(false)) {
1689
send_user_messages();
1692
g_wreq->no_jobs_available = true;
1694
if (!g_wreq->rsc_spec_request && g_wreq->seconds_to_fill == 0) {
1698
if (all_apps_use_hr && hr_unknown_platform(g_request->host)) {
1699
log_messages.printf(MSG_NORMAL,
1700
"Not sending work because unknown HR class\n"
1702
g_wreq->hr_reject_perm = true;
1706
// decide on attributes of HOST_APP_VERSIONS
1708
get_reliability_and_trust();
1416
1712
if (config.enable_assignment) {
1417
if (send_assigned_jobs(sreq, reply)) {
1713
if (send_assigned_jobs()) {
1418
1714
if (config.debug_assignment) {
1419
log_messages.printf(MSG_DEBUG,
1420
"[HOST#%d] sent assigned jobs\n", reply.host.id
1715
log_messages.printf(MSG_NORMAL,
1716
"[assign] [HOST#%d] sent assigned jobs\n", g_reply->host.id
1427
if (config.workload_sim && sreq.have_other_results_list) {
1723
if (config.workload_sim && g_request->have_other_results_list) {
1428
1724
init_ip_results(
1429
sreq.global_prefs.work_buf_min(), effective_ncpus(sreq, reply), sreq.ip_results
1725
g_request->global_prefs.work_buf_min(),
1726
g_wreq->effective_ncpus, g_request->ip_results
1433
if (config.locality_scheduling) {
1434
reply.wreq.infeasible_only = false;
1435
send_work_locality(sreq, reply);
1730
if (config.locality_scheduler_fraction > 0) {
1731
if (drand() < config.locality_scheduler_fraction) {
1732
if (config.debug_locality) {
1733
log_messages.printf(MSG_NORMAL,
1734
"[mixed] sending locality work first\n"
1737
send_work_locality();
1738
if (config.debug_locality) {
1739
log_messages.printf(MSG_NORMAL,
1740
"[mixed] sending non-locality work second\n"
1745
if (config.debug_locality) {
1746
log_messages.printf(MSG_NORMAL,
1747
"[mixed] sending non-locality work first\n"
1751
if (config.debug_locality) {
1752
log_messages.printf(MSG_NORMAL,
1753
"[mixed] sending locality work second\n"
1756
send_work_locality();
1758
} else if (config.locality_scheduling) {
1759
send_work_locality();
1436
1760
} else if (config.matchmaker) {
1437
send_work_matchmaker(sreq, reply);
1439
send_work_old(sreq, reply);
1442
explain_to_user(sreq, reply);
1445
// Matchmaker scheduling code follows
1453
BEST_APP_VERSION* bavp;
1455
bool get_score(SCHEDULER_REQUEST&, SCHEDULER_REPLY&);
1464
std::list<JOB> jobs; // sorted high to low
1466
JOB_SET(SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply) {
1467
work_req = sreq.work_req_seconds;
1470
disk_limit = reply.wreq.disk_available;
1471
max_jobs = config.max_wus_to_send;
1472
int ncpus = effective_ncpus(sreq, reply), n;
1474
if (config.daily_result_quota) {
1475
if (reply.host.max_results_day == 0 || reply.host.max_results_day>config.daily_result_quota) {
1476
reply.host.max_results_day = config.daily_result_quota;
1478
reply.wreq.daily_result_quota = ncpus*reply.host.max_results_day;
1479
n = reply.wreq.daily_result_quota - reply.host.nresults_today;
1481
if (n < max_jobs) max_jobs = n;
1484
if (config.max_wus_in_progress) {
1485
n = config.max_wus_in_progress*ncpus - reply.wreq.nresults_on_host;
1487
if (n < max_jobs) max_jobs = n;
1491
double higher_score_disk_usage(double);
1492
double lowest_score();
1493
inline bool request_satisfied() {
1494
return est_time >= work_req;
1496
void send(SCHEDULER_REQUEST&, SCHEDULER_REPLY&);
1499
// reread result from DB, make sure it's still unsent
1500
// TODO: from here to add_result_to_reply()
1501
// (which updates the DB record) should be a transaction
1503
int read_sendable_result(DB_RESULT& result) {
1504
int retval = result.lookup_id(result.id);
1506
log_messages.printf(MSG_CRITICAL,
1507
"[RESULT#%d] result.lookup_id() failed %d\n",
1510
return ERR_NOT_FOUND;
1512
if (result.server_state != RESULT_SERVER_STATE_UNSENT) {
1513
log_messages.printf(MSG_NORMAL,
1514
"[RESULT#%d] expected to be unsent; instead, state is %d\n",
1515
result.id, result.server_state
1517
return ERR_BAD_RESULT_STATE;
1522
// compute a "score" for sending this job to this host.
1523
// Return false if the WU is infeasible.
1524
// Otherwise set est_time and disk_usage.
1526
bool JOB::get_score(SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply) {
1530
WU_RESULT& wu_result = ssp->wu_results[index];
1531
wu = wu_result.workunit;
1532
app = ssp->lookup_app(wu.appid);
1536
// Find the app_version for the client's platform.
1538
bavp = get_app_version(sreq, reply, wu);
1539
if (!bavp) return false;
1541
retval = wu_is_infeasible_fast(wu, sreq, reply, *app);
1543
if (config.debug_send) {
1544
log_messages.printf(MSG_DEBUG,
1545
"[HOST#%d] [WU#%d %s] WU is infeasible: %s\n",
1546
reply.host.id, wu.id, wu.name, infeasible_string(retval)
1554
// check if user has selected apps,
1555
// and send beta work to beta users
1557
if (app->beta && !config.distinct_beta_apps) {
1558
if (reply.wreq.host_info.allow_beta_work) {
1564
if (app_not_selected(wu, sreq, reply)) {
1565
if (!reply.wreq.host_info.allow_non_preferred_apps) {
1568
// Allow work to be sent, but it will not get a bump in its score
1575
// if job needs to get done fast, send to fast/reliable host
1577
if (reply.wreq.host_info.reliable && (wu_result.need_reliable)) {
1581
// if job already committed to an HR class,
1582
// try to send to host in that class
1584
if (wu_result.infeasible_count) {
1588
// Favor jobs that will run fast
1590
score += bavp->host_usage.flops/1e9;
1592
// match large jobs to fast hosts
1594
if (config.job_size_matching) {
1595
double host_stdev = (reply.host.p_fpops - ssp->perf_info.host_fpops_mean)/ ssp->perf_info.host_fpops_stdev;
1596
double diff = host_stdev - wu_result.fpops_size;
1600
// TODO: If user has selected some apps but will accept jobs from others,
1601
// try to send them jobs from the selected apps
1604
est_time = estimate_wallclock_duration(wu, sreq, reply);
1605
disk_usage = wu.rsc_disk_bound;
1609
bool wu_is_infeasible_slow(
1610
WU_RESULT& wu_result, SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply
1617
// Don't send if we've already sent a result of this WU to this user.
1619
if (config.one_result_per_user_per_wu) {
1621
"where workunitid=%d and userid=%d",
1622
wu_result.workunit.id, reply.user.id
1624
retval = result.count(n, buf);
1626
log_messages.printf(MSG_CRITICAL,
1627
"send_work: can't get result count (%d)\n", retval
1632
if (config.debug_send) {
1633
log_messages.printf(MSG_DEBUG,
1634
"send_work: user %d already has %d result(s) for WU %d\n",
1635
reply.user.id, n, wu_result.workunit.id
1641
} else if (config.one_result_per_host_per_wu) {
1642
// Don't send if we've already sent a result
1643
// of this WU to this host.
1644
// We only have to check this
1645
// if we don't send one result per user.
1648
"where workunitid=%d and hostid=%d",
1649
wu_result.workunit.id, reply.host.id
1651
retval = result.count(n, buf);
1653
log_messages.printf(MSG_CRITICAL,
1654
"send_work: can't get result count (%d)\n", retval
1659
if (config.debug_send) {
1660
log_messages.printf(MSG_DEBUG,
1661
"send_work: host %d already has %d result(s) for WU %d\n",
1662
reply.host.id, n, wu_result.workunit.id
1670
APP* app = ssp->lookup_app(wu_result.workunit.appid);
1671
WORKUNIT wu = wu_result.workunit;
1672
if (app_hr_type(*app)) {
1673
if (already_sent_to_different_platform_careful(
1674
sreq, reply.wreq, wu, *app
1676
if (config.debug_send) {
1677
log_messages.printf(MSG_DEBUG,
1678
"[HOST#%d] [WU#%d %s] WU is infeasible (assigned to different platform)\n",
1679
reply.host.id, wu.id, wu.name
1682
// Mark the workunit as infeasible.
1683
// This ensures that jobs already assigned to a platform
1684
// are processed first.
1686
wu_result.infeasible_count++;
1693
double JOB_SET::lowest_score() {
1694
if (jobs.empty()) return 0;
1695
return jobs.back().score;
1698
// add the given job, and remove lowest-score jobs that
1699
// - are in excess of work request
1700
// - are in excess of per-request or per-day limits
1701
// - cause the disk limit to be exceeded
1703
void JOB_SET::add_job(JOB& job) {
1704
while (!jobs.empty()) {
1705
JOB& worst_job = jobs.back();
1706
if (est_time + job.est_time - worst_job.est_time > work_req) {
1707
est_time -= worst_job.est_time;
1708
disk_usage -= worst_job.disk_usage;
1710
ssp->wu_results[worst_job.index].state = WR_STATE_PRESENT;
1715
while (!jobs.empty()) {
1716
JOB& worst_job = jobs.back();
1717
if (disk_usage + job.disk_usage > disk_limit) {
1718
est_time -= worst_job.est_time;
1719
disk_usage -= worst_job.disk_usage;
1721
ssp->wu_results[worst_job.index].state = WR_STATE_PRESENT;
1727
if (jobs.size() == max_jobs) {
1728
JOB& worst_job = jobs.back();
1730
ssp->wu_results[worst_job.index].state = WR_STATE_PRESENT;
1733
list<JOB>::iterator i = jobs.begin();
1734
while (i != jobs.end()) {
1735
if (i->score < job.score) {
1736
jobs.insert(i, job);
1741
if (i == jobs.end()) {
1742
jobs.push_back(job);
1744
est_time += job.est_time;
1745
disk_usage += job.disk_usage;
1746
if (config.debug_send) {
1747
log_messages.printf(MSG_DEBUG,
1748
"added job to set. est_time %f disk_usage %f\n",
1749
est_time, disk_usage
1754
// return the disk usage of jobs above the given score
1756
double JOB_SET::higher_score_disk_usage(double v) {
1758
list<JOB>::iterator i = jobs.begin();
1759
while (i != jobs.end()) {
1760
if (i->score < v) break;
1761
sum += i->disk_usage;
1767
void JOB_SET::send(SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply) {
1772
list<JOB>::iterator i = jobs.begin();
1773
while (i != jobs.end()) {
1775
WU_RESULT wu_result = ssp->wu_results[job.index];
1776
ssp->wu_results[job.index].state = WR_STATE_EMPTY;
1777
wu = wu_result.workunit;
1778
result.id = wu_result.resultid;
1779
retval = read_sendable_result(result);
1781
add_result_to_reply(result, wu, sreq, reply, job.bavp);
1786
void send_work_matchmaker(SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply) {
1787
int i, slots_locked=0, slots_nonempty=0;
1788
JOB_SET jobs (sreq, reply);
1789
int min_slots = config.mm_min_slots;
1790
if (!min_slots) min_slots = ssp->max_wu_results/2;
1791
int max_slots = config.mm_max_slots;
1792
if (!max_slots) max_slots = ssp->max_wu_results;
1793
int max_locked = 10;
1796
i = rand() % ssp->max_wu_results;
1798
// scan through the job cache, maintaining a JOB_SET of jobs
1799
// that we can send to this client, ordered by score.
1801
for (int slots_scanned=0; slots_scanned<max_slots; slots_scanned++) {
1802
i = (i+1) % ssp->max_wu_results;
1803
WU_RESULT& wu_result = ssp->wu_results[i];
1804
switch (wu_result.state) {
1805
case WR_STATE_EMPTY:
1807
case WR_STATE_PRESENT:
1812
if (wu_result.state == g_pid) break;
1820
// get score for this job, and skip it if it fails quick check.
1821
// NOTE: the EDF check done in get_score()
1822
// includes only in-progress jobs.
1824
if (!job.get_score(sreq, reply)) {
1827
if (config.debug_send) {
1828
log_messages.printf(MSG_DEBUG,
1829
"score for %s: %f\n", wu_result.workunit.name, job.score
1833
if (job.score > jobs.lowest_score() || !jobs.request_satisfied()) {
1834
ssp->wu_results[i].state = g_pid;
1836
if (wu_is_infeasible_slow(wu_result, sreq, reply)) {
1837
// if we can't use this job, put it back in pool
1840
ssp->wu_results[i].state = WR_STATE_PRESENT;
1847
if (jobs.request_satisfied() && slots_scanned>=min_slots) break;
1850
if (!slots_nonempty) {
1851
log_messages.printf(MSG_CRITICAL,
1852
"Job cache is empty - check feeder\n"
1854
reply.wreq.no_jobs_available = true;
1857
// TODO: trim jobs from tail of list until we pass the EDF check
1859
jobs.send(sreq, reply);
1861
if (slots_locked > max_locked) {
1862
log_messages.printf(MSG_CRITICAL,
1863
"Found too many locked slots (%d>%d) - increase array size",
1864
slots_locked, max_locked
1869
const char *BOINC_RCSID_32dcd335e7 = "$Id: sched_send.cpp 16611 2008-12-03 20:55:22Z romw $";
1761
send_work_matchmaker();
1767
retval = update_host_app_versions(g_reply->results, g_reply->host.id);
1769
log_messages.printf(MSG_CRITICAL,
1770
"update_host_app_versions() failed: %d\n", retval
1773
send_user_messages();
1776
const char *BOINC_RCSID_32dcd335e7 = "$Id: sched_send.cpp 22651 2010-11-08 17:57:13Z romw $";