83
// Find an app and app_version for the client's platform(s).
86
WORKUNIT& wu, APP* &app, APP_VERSION* &avp,
87
SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, PLATFORM_LIST& platforms,
113
// return BEST_APP_VERSION for the given host, or NULL if none
116
BEST_APP_VERSION* get_app_version(
117
SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, WORKUNIT& wu
91
if (anonymous(platforms.list[0])) {
92
app = ss.lookup_app(wu.appid);
123
BEST_APP_VERSION* bavp;
126
// see if app is already in memoized array
128
for (i=0; i<reply.wreq.best_app_versions.size(); i++) {
129
bavp = reply.wreq.best_app_versions[i];
130
if (bavp->appid == wu.appid) {
131
if (!bavp->avp) return NULL;
136
APP* app = ssp->lookup_app(wu.appid);
138
log_messages.printf(MSG_CRITICAL, "WU refers to nonexistent app: %d\n", wu.appid);
142
bavp = new BEST_APP_VERSION;
143
bavp->appid = wu.appid;
144
if (anonymous(sreq.platforms.list[0])) {
93
145
found = sreq.has_version(*app);
95
log_messages.printf(SCHED_MSG_LOG::MSG_DEBUG,
96
"Didn't find anonymous app\n"
147
if (config.debug_send) {
148
log_messages.printf(MSG_DEBUG, "Didn't find anonymous app for %s\n", app->name);
152
bavp->avp = (APP_VERSION*)1; // arbitrary nonzero value
154
reply.wreq.best_app_versions.push_back(bavp);
159
// go through the client's platforms.
160
// Scan the app versions for each platform.
161
// Find the one with highest expected FLOPS
163
bavp->host_usage.flops = 0;
165
for (i=0; i<sreq.platforms.list.size(); i++) {
166
PLATFORM* p = sreq.platforms.list[i];
167
for (j=0; j<ssp->napp_versions; j++) {
168
HOST_USAGE host_usage;
169
APP_VERSION& av = ssp->app_versions[j];
170
if (av.appid != wu.appid) continue;
171
if (av.platformid != p->id) continue;
172
if (reply.wreq.core_client_version < av.min_core_version) {
173
reply.wreq.outdated_core = true;
176
if (strlen(av.plan_class) && sreq.client_cap_plan_class) {
177
if (!app_plan(sreq, av.plan_class, host_usage)) {
181
host_usage.init_seq(reply.host.p_fpops);
183
if (host_usage.flops > bavp->host_usage.flops) {
184
bavp->host_usage = host_usage;
189
reply.wreq.best_app_versions.push_back(bavp);
191
if (config.debug_version_select) {
192
log_messages.printf(MSG_DEBUG,
193
"Best version of app %s is %d (%.2f GFLOPS)\n",
194
app->name, bavp->avp->id, bavp->host_usage.flops/1e9
98
return ERR_NO_APP_VERSION;
102
found = find_app_version(reply.wreq, wu, platforms, ss, app, avp);
104
log_messages.printf(SCHED_MSG_LOG::MSG_DEBUG, "Didn't find app version\n");
105
return ERR_NO_APP_VERSION;
108
// see if the core client is too old.
198
// here if no app version exists
110
if (!app_core_compatible(reply.wreq, *avp)) {
111
log_messages.printf(SCHED_MSG_LOG::MSG_DEBUG, "Didn't find app version: core client too old\n");
112
return ERR_NO_APP_VERSION;
200
if (config.debug_version_select) {
201
log_messages.printf(MSG_DEBUG,
202
"no app version available: APP#%d PLATFORM#%d min_version %d\n",
203
app->id, sreq.platforms.list[0]->id, app->min_version
208
"%s is not available for your type of computer.",
209
app->user_friendly_name
211
USER_MESSAGE um(message, "high");
212
reply.wreq.insert_no_work_message(um);
213
reply.wreq.no_app_version = true;
219
static char* find_user_friendly_name(int appid) {
220
APP* app = ssp->lookup_app(appid);
221
if (app) return app->user_friendly_name;
222
return "deprecated application";
118
226
// Compute the max additional disk usage we can impose on the host.
119
227
// Depending on the client version, it can either send us
222
330
static double estimate_wallclock_duration(
223
331
WORKUNIT& wu, SCHEDULER_REQUEST& request, SCHEDULER_REPLY& reply
226
if (reply.wreq.core_client_version<=419) {
227
running_frac = reply.host.on_frac;
229
running_frac = reply.host.active_frac * reply.host.on_frac;
231
if (running_frac < HOST_ACTIVE_FRAC_MIN) {
232
running_frac = HOST_ACTIVE_FRAC_MIN;
234
if (running_frac > 1) running_frac = 1;
235
333
double ecd = estimate_cpu_duration(wu, reply);
236
double ewd = ecd/running_frac;
334
double ewd = ecd/reply.wreq.running_frac;
237
335
if (reply.host.duration_correction_factor) {
238
336
ewd *= reply.host.duration_correction_factor;
240
338
if (reply.host.cpu_efficiency) {
241
339
ewd /= reply.host.cpu_efficiency;
244
SCHED_MSG_LOG::MSG_DEBUG,
245
"est cpu dur %f; running_frac %f; rsf %f; est %f\n",
246
ecd, running_frac, request.resource_share_fraction, ewd
341
if (config.debug_send) {
342
log_messages.printf(MSG_DEBUG,
343
"est cpu dur %f; est wall dur %f\n",
344
ecd, reply.wreq.running_frac, ewd
251
// Find or compute various info about the host.
252
// These parameters affect how work is sent to the host
350
// Find or compute various info about the host;
351
// this info affects which jobs are sent to the host.
254
353
static int get_host_info(SCHEDULER_REPLY& reply) {
257
extract_venue(reply.user.project_prefs, reply.host.venue, buf);
259
unsigned int pos = 0;
356
unsigned int pos = 0;
360
extract_venue(reply.user.project_prefs, reply.host.venue, buf);
262
363
// scan user's project prefs for elements of the form <app_id>N</app_id>,
263
364
// indicating the apps they want to run.
265
366
reply.wreq.host_info.preferred_apps.clear();
266
while (parse_int(str.substr(pos,str.length()-pos).c_str(), "<app_id>", temp_int)) {
367
while (parse_int(str.substr(pos,str.length()-pos).c_str(), "<app_id>", temp_int)) {
268
369
ai.appid = temp_int;
370
ai.work_available = false;
269
371
reply.wreq.host_info.preferred_apps.push_back(ai);
271
pos = str.find("<app_id>", pos) + 1;
373
pos = str.find("<app_id>", pos) + 1;
375
if (parse_bool(buf,"allow_non_preferred_apps", flag)) {
376
reply.wreq.host_info.allow_non_preferred_apps = flag;
378
if (parse_bool(buf,"allow_beta_work", flag)) {
379
reply.wreq.host_info.allow_beta_work = flag;
273
temp_int = parse_int(buf,"<allow_beta_work>", temp_int);
274
reply.wreq.host_info.allow_beta_work = temp_int;
276
// Decide whether or not this computer is a 'reliable' computer
382
// Decide whether or not this computer is 'reliable'
383
// A computer is reliable if the following conditions are true
384
// (for those that are set in the config file)
385
// 1) The host average turnaround is less than the config
386
// max average turnaround
387
// 2) The host error rate is less then the config max error rate
388
// 3) The host results per day is equal to the config file value
278
390
double expavg_credit = reply.host.expavg_credit;
279
391
double expavg_time = reply.host.expavg_time;
280
392
double avg_turnaround = reply.host.avg_turnaround;
281
393
update_average(0, 0, CREDIT_HALF_LIFE, expavg_credit, expavg_time);
282
double credit_scale, turnaround_scale;
283
if (strstr(reply.host.os_name,"Windows") || strstr(reply.host.os_name,"Linux")
286
turnaround_scale = 1;
395
// Platforms other then Windows, Linux and Intel Macs need a
396
// larger set of computers to be marked reliable
398
double multiplier = 1.0;
399
if (strstr(reply.host.os_name,"Windows")
400
|| strstr(reply.host.os_name,"Linux")
401
|| (strstr(reply.host.os_name,"Darwin")
402
&& !(strstr(reply.host.p_vendor,"Power Macintosh"))
289
turnaround_scale = 1.25;
292
if (((expavg_credit/reply.host.p_ncpus) > config.reliable_min_avg_credit*credit_scale || config.reliable_min_avg_credit == 0)
293
&& (avg_turnaround < config.reliable_max_avg_turnaround*turnaround_scale || config.reliable_max_avg_turnaround == 0)
409
if ((config.reliable_max_avg_turnaround == 0 || reply.host.avg_turnaround < config.reliable_max_avg_turnaround*multiplier)
410
&& (config.reliable_max_error_rate == 0 || reply.host.error_rate < config.reliable_max_error_rate*multiplier)
411
&& (config.daily_result_quota == 0 || reply.host.max_results_day >= config.daily_result_quota)
295
413
reply.wreq.host_info.reliable = true;
296
log_messages.printf(SCHED_MSG_LOG::MSG_DEBUG,
297
"[HOST#%d] is reliable (OS = %s) expavg_credit = %.0f avg_turnaround(hours) = %.0f \n",
298
reply.host.id, reply.host.os_name, expavg_credit,
415
if (config.debug_send) {
416
log_messages.printf(MSG_DEBUG,
417
"[HOST#%d] is%s reliable (OS = %s) error_rate = %.3f avg_turn_hrs = %.0f \n",
419
reply.wreq.host_info.reliable?"":" not",
420
reply.host.os_name, reply.host.error_rate,
421
reply.host.avg_turnaround/3600
305
// Check to see if the user has set application preferences.
306
// If they have, then only send work for the allowed applications
427
// Return true if the user has set application preferences,
428
// and this job is not for a selected app
308
static inline int check_app_filter(
309
WORKUNIT& wu, SCHEDULER_REQUEST& request, SCHEDULER_REPLY& reply
430
bool app_not_selected(
431
WORKUNIT& wu, SCHEDULER_REQUEST& , SCHEDULER_REPLY& reply
313
if (reply.wreq.host_info.preferred_apps.size() == 0) return 0;
314
bool app_allowed = false;
435
if (reply.wreq.host_info.preferred_apps.size() == 0) return false;
315
436
for (i=0; i<reply.wreq.host_info.preferred_apps.size(); i++) {
316
if (wu.appid==reply.wreq.host_info.preferred_apps[i].appid) {
437
if (wu.appid == reply.wreq.host_info.preferred_apps[i].appid) {
438
reply.wreq.host_info.preferred_apps[i].work_available = true;
321
if (!app_allowed && !reply.wreq.beta_only) {
322
reply.wreq.no_allowed_apps_available = true;
323
log_messages.printf(SCHED_MSG_LOG::MSG_DEBUG,
324
"[USER#%d] [WU#%d] user doesn't want work for this application\n",
327
return INFEASIBLE_APP_SETTING;
332
static inline int check_memory(
333
WORKUNIT& wu, SCHEDULER_REQUEST& request, SCHEDULER_REPLY& reply
445
// see how much RAM we can use on this machine
446
// TODO: compute this once, not once per job
448
static inline void get_mem_sizes(
449
SCHEDULER_REQUEST& request, SCHEDULER_REPLY& reply,
450
double& ram, double& usable_ram
335
// see how much RAM we can use on this machine
337
double ram = reply.host.m_nbytes;
452
ram = reply.host.m_nbytes;
338
453
if (ram <= 0) ram = DEFAULT_RAM_SIZE;
339
double usable_ram = ram;
340
455
double busy_frac = request.global_prefs.ram_max_used_busy_frac;
341
456
double idle_frac = request.global_prefs.ram_max_used_idle_frac;
885
1006
IP_RESULT ipr ("", time(0)+wu.delay_bound, est_cpu);
886
1007
request.ip_results.push_back(ipr);
1010
// mark job as done if debugging flag is set
1012
if (mark_jobs_done) {
1016
"server_state=%d outcome=%d",
1017
RESULT_SERVER_STATE_OVER, RESULT_OUTCOME_SUCCESS
1019
result.update_field(buf);
1022
sprintf(buf, "transition_time=%d", time(0));
1023
dbwu.update_field(buf);
892
SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, PLATFORM_LIST& platforms,
895
reply.wreq.disk_available = max_allowable_disk(sreq, reply);
896
reply.wreq.insufficient_disk = false;
897
reply.wreq.insufficient_mem = false;
898
reply.wreq.insufficient_speed = false;
899
reply.wreq.excessive_work_buf = false;
900
reply.wreq.no_app_version = false;
901
reply.wreq.hr_reject_temp = false;
902
reply.wreq.hr_reject_perm = false;
903
reply.wreq.daily_result_quota_exceeded = false;
904
reply.wreq.core_client_version = sreq.core_client_major_version*100
905
+ sreq.core_client_minor_version;
906
reply.wreq.nresults = 0;
907
get_host_info(reply); // parse project prefs for app details
908
reply.wreq.beta_only = false;
911
SCHED_MSG_LOG::MSG_NORMAL,
912
"[HOST#%d] got request for %f seconds of work; available disk %f GB\n",
913
reply.host.id, sreq.work_req_seconds, reply.wreq.disk_available/1e9
916
if (sreq.work_req_seconds <= 0) return 0;
918
reply.wreq.seconds_to_fill = sreq.work_req_seconds;
919
if (reply.wreq.seconds_to_fill > MAX_SECONDS_TO_SEND) {
920
reply.wreq.seconds_to_fill = MAX_SECONDS_TO_SEND;
922
if (reply.wreq.seconds_to_fill < MIN_SECONDS_TO_SEND) {
923
reply.wreq.seconds_to_fill = MIN_SECONDS_TO_SEND;
926
if (config.workload_sim && sreq.have_other_results_list) {
928
sreq.global_prefs.work_buf_min(), reply.host.p_ncpus, sreq.ip_results
932
if (config.locality_scheduling) {
933
reply.wreq.infeasible_only = false;
934
send_work_locality(sreq, reply, platforms, ss);
936
// give top priority to results that require a 'reliable host'
938
if (reply.wreq.host_info.reliable) {
939
reply.wreq.reliable_only = true;
940
reply.wreq.infeasible_only = false;
941
scan_work_array(sreq, reply, platforms, ss);
943
reply.wreq.reliable_only = false;
945
// give 2nd priority to results for a beta app
946
// (projects should load beta work with care,
947
// otherwise your users won't get production work done!
949
if (reply.wreq.host_info.allow_beta_work) {
950
reply.wreq.beta_only = true;
952
SCHED_MSG_LOG::MSG_DEBUG,
953
"[HOST#%d] will accept beta work. Scanning for beta work.\n",
956
scan_work_array(sreq, reply, platforms, ss);
958
reply.wreq.beta_only = false;
960
// give next priority to results that were infeasible for some other host
962
reply.wreq.infeasible_only = true;
963
scan_work_array(sreq, reply, platforms, ss);
965
reply.wreq.infeasible_only = false;
966
scan_work_array(sreq, reply, platforms, ss);
970
SCHED_MSG_LOG::MSG_NORMAL,
971
"[HOST#%d] Sent %d results [scheduler ran %f seconds]\n",
972
reply.host.id, reply.wreq.nresults, elapsed_wallclock_time()
1029
// send messages to user about why jobs were or weren't sent
1031
static void explain_to_user(SCHEDULER_REPLY& reply) {
1035
// If work was sent from apps // the user did not select, explain
1037
if (reply.wreq.nresults && !reply.wreq.user_apps_only) {
1038
USER_MESSAGE um("No work can be sent for the applications you have selected", "high");
1039
reply.insert_message(um);
1041
// Inform the user about applications with no work
1042
for (i=0; i<reply.wreq.host_info.preferred_apps.size(); i++) {
1043
if (!reply.wreq.host_info.preferred_apps[i].work_available) {
1044
APP* app = ssp->lookup_app(reply.wreq.host_info.preferred_apps[i].appid);
1045
// don't write message if the app is deprecated
1047
char explanation[256];
1048
sprintf(explanation,
1049
"No work is available for %s",
1050
find_user_friendly_name(reply.wreq.host_info.preferred_apps[i].appid)
1052
USER_MESSAGE um(explanation, "high");
1053
reply.insert_message(um);
1058
// Tell the user about applications they didn't qualify for
1060
for(i=0;i<preferred_app_message_index;i++){
1061
reply.insert_message(reply.wreq.no_work_messages.at(i));
1063
USER_MESSAGE um1("You have selected to receive work from other applications if no work is available for the applications you selected", "high");
1064
reply.insert_message(um1);
1065
USER_MESSAGE um2("Sending work from other applications", "high");
1066
reply.insert_message(um2);
1069
// if client asked for work and we're not sending any, explain why
975
1071
if (reply.wreq.nresults == 0) {
976
1072
reply.set_delay(DELAY_NO_WORK_TEMP);
977
1073
USER_MESSAGE um2("No work sent", "high");
978
1074
reply.insert_message(um2);
1075
// Inform the user about applications with no work
1076
for(i=0; i<reply.wreq.host_info.preferred_apps.size(); i++) {
1077
if (!reply.wreq.host_info.preferred_apps[i].work_available) {
1078
APP* app = ssp->lookup_app(reply.wreq.host_info.preferred_apps[i].appid);
1079
// don't write message if the app is deprecated
1080
if ( app != NULL ) {
1081
char explanation[256];
1082
sprintf(explanation,"No work is available for %s",find_user_friendly_name(reply.wreq.host_info.preferred_apps[i].appid));
1083
USER_MESSAGE um(explanation, "high");
1084
reply.insert_message(um);
1088
// Inform the user about applications they didn't qualify for
1089
for(int i=0;i<reply.wreq.no_work_messages.size();i++){
1090
reply.insert_message(reply.wreq.no_work_messages.at(i));
979
1092
if (reply.wreq.no_app_version) {
980
USER_MESSAGE um("(there was work for other platforms)", "high");
981
reply.insert_message(um);
982
1093
reply.set_delay(DELAY_NO_WORK_PERM);
984
1095
if (reply.wreq.no_allowed_apps_available) {
985
1096
USER_MESSAGE um(
986
"(There was work but not for the applications you have allowed. Please check your settings on the website.)",
989
reply.insert_message(um);
991
if (reply.wreq.insufficient_disk) {
993
"(there was work but you don't have enough disk space allocated)",
996
reply.insert_message(um);
998
if (reply.wreq.insufficient_mem) {
1000
"(there was work but your computer doesn't have enough memory)",
1003
reply.insert_message(um);
1005
if (reply.wreq.insufficient_speed) {
1097
"No work available for the applications you have selected. Please check your settings on the web site.",
1100
reply.insert_message(um);
1102
if (reply.wreq.speed.insufficient) {
1007
1103
if (reply.wreq.core_client_version>419) {
1008
1104
sprintf(helpful,
1009
1105
"(won't finish in time) "
1010
"Computer on %.1f%% of time, BOINC on %.1f%% of that, this project gets %.1f%% of that",
1011
100.0*reply.host.on_frac, 100.0*reply.host.active_frac, 100.0*sreq.resource_share_fraction
1106
"Computer on %.1f%% of time, BOINC on %.1f%% of that",
1107
100.0*reply.host.on_frac, 100.0*reply.host.active_frac
1015
1110
sprintf(helpful,
1016
1111
"(won't finish in time) "
1017
"Computer available %.1f%% of time, this project gets %.1f%% of that",
1018
100.0*reply.host.on_frac, 100.0*sreq.resource_share_fraction
1112
"Computer available %.1f%% of time",
1113
100.0*reply.host.on_frac
1021
1116
USER_MESSAGE um(helpful, "high");
1080
1172
reply.set_delay(delay_time);
1082
1174
if (reply.wreq.cache_size_exceeded) {
1084
sprintf(helpful, "(reached per-host limit of %d tasks)",
1175
sprintf(helpful, "(reached per-CPU limit of %d tasks)",
1085
1176
config.max_wus_in_progress
1087
1178
USER_MESSAGE um(helpful, "high");
1088
1179
reply.insert_message(um);
1089
1180
reply.set_delay(DELAY_NO_WORK_CACHE);
1090
log_messages.printf(
1091
SCHED_MSG_LOG::MSG_NORMAL,
1092
"host %d already has %d result(s) on cache\n",
1181
log_messages.printf(MSG_NORMAL,
1182
"host %d already has %d result(s) in progress\n",
1093
1183
reply.host.id, reply.wreq.nresults_on_host
1189
static void get_running_frac(SCHEDULER_REPLY& reply) {
1190
if (reply.wreq.core_client_version<=419) {
1191
reply.wreq.running_frac = reply.host.on_frac;
1193
reply.wreq.running_frac = reply.host.active_frac * reply.host.on_frac;
1195
if (reply.wreq.running_frac < HOST_ACTIVE_FRAC_MIN) {
1196
reply.wreq.running_frac = HOST_ACTIVE_FRAC_MIN;
1198
if (reply.wreq.running_frac > 1) reply.wreq.running_frac = 1;
1201
void send_work(SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply) {
1202
if (sreq.work_req_seconds <= 0) return;
1204
reply.wreq.core_client_version =
1205
sreq.core_client_major_version*100 + sreq.core_client_minor_version;
1206
reply.wreq.disk_available = max_allowable_disk(sreq, reply);
1207
reply.wreq.core_client_version = sreq.core_client_major_version*100
1208
+ sreq.core_client_minor_version;
1210
if (hr_unknown_platform(sreq.host)) {
1211
reply.wreq.hr_reject_perm = true;
1215
get_host_info(reply); // parse project prefs for app details
1217
get_running_frac(reply);
1219
if (config.debug_send) {
1220
log_messages.printf(MSG_DEBUG,
1221
"%s matchmaking scheduling; %s EDF sim\n",
1227
config.workload_sim?"Using":"Not using"
1229
log_messages.printf(MSG_DEBUG,
1230
"available disk %f GB, work_buf_min %d\n",
1231
reply.wreq.disk_available/1e9,
1232
(int)sreq.global_prefs.work_buf_min()
1234
log_messages.printf(MSG_DEBUG,
1235
"running frac %f DCF %f CPU effic %f est delay %d\n",
1236
reply.wreq.running_frac,
1237
reply.host.duration_correction_factor,
1238
reply.host.cpu_efficiency,
1239
(int)sreq.estimated_delay
1243
reply.wreq.seconds_to_fill = sreq.work_req_seconds;
1244
if (reply.wreq.seconds_to_fill > MAX_SECONDS_TO_SEND) {
1245
reply.wreq.seconds_to_fill = MAX_SECONDS_TO_SEND;
1247
if (reply.wreq.seconds_to_fill < MIN_SECONDS_TO_SEND) {
1248
reply.wreq.seconds_to_fill = MIN_SECONDS_TO_SEND;
1251
if (config.enable_assignment) {
1252
if (send_assigned_jobs(sreq, reply)) {
1253
if (config.debug_assignment) {
1254
log_messages.printf(MSG_DEBUG,
1255
"[HOST#%d] sent assigned jobs\n", reply.host.id
1262
if (config.workload_sim && sreq.have_other_results_list) {
1264
sreq.global_prefs.work_buf_min(), effective_ncpus(reply.host), sreq.ip_results
1268
if (config.locality_scheduling) {
1269
reply.wreq.infeasible_only = false;
1270
send_work_locality(sreq, reply);
1273
send_work_matchmaker(sreq, reply);
1275
reply.wreq.beta_only = false;
1276
reply.wreq.user_apps_only = true;
1278
// give top priority to results that require a 'reliable host'
1280
if (reply.wreq.host_info.reliable) {
1281
reply.wreq.reliable_only = true;
1282
reply.wreq.infeasible_only = false;
1283
scan_work_array(sreq, reply);
1285
reply.wreq.reliable_only = false;
1287
// give 2nd priority to results for a beta app
1288
// (projects should load beta work with care,
1289
// otherwise your users won't get production work done!
1291
if (reply.wreq.host_info.allow_beta_work) {
1292
reply.wreq.beta_only = true;
1293
if (config.debug_send) {
1294
log_messages.printf(MSG_DEBUG,
1295
"[HOST#%d] will accept beta work. Scanning for beta work.\n",
1299
scan_work_array(sreq, reply);
1301
reply.wreq.beta_only = false;
1303
// give next priority to results that were infeasible for some other host
1305
reply.wreq.infeasible_only = true;
1306
scan_work_array(sreq, reply);
1308
reply.wreq.infeasible_only = false;
1309
scan_work_array(sreq, reply);
1311
// If user has selected apps but will accept any,
1312
// and we haven't found any jobs for selected apps, try others
1314
if (!reply.wreq.nresults && reply.wreq.host_info.allow_non_preferred_apps ) {
1315
reply.wreq.user_apps_only = false;
1316
preferred_app_message_index = reply.wreq.no_work_messages.size();
1317
if (config.debug_send) {
1318
log_messages.printf(MSG_DEBUG,
1319
"[HOST#%d] is looking for work from a non-preferred application\n",
1323
scan_work_array(sreq, reply);
1328
explain_to_user(reply);
1339
BEST_APP_VERSION* bavp;
1341
void get_score(SCHEDULER_REQUEST&, SCHEDULER_REPLY&);
1349
std::list<JOB> jobs; // sorted high to low
1352
double higher_score_disk_usage(double);
1353
double lowest_score();
1354
inline bool request_satisfied() {
1355
return est_time >= work_req;
1357
void send(SCHEDULER_REQUEST&, SCHEDULER_REPLY&);
1360
// reread result from DB, make sure it's still unsent
1361
// TODO: from here to add_result_to_reply()
1362
// (which updates the DB record) should be a transaction
1364
int read_sendable_result(DB_RESULT& result) {
1365
int retval = result.lookup_id(result.id);
1367
log_messages.printf(MSG_CRITICAL,
1368
"[RESULT#%d] result.lookup_id() failed %d\n",
1371
return ERR_NOT_FOUND;
1373
if (result.server_state != RESULT_SERVER_STATE_UNSENT) {
1374
log_messages.printf(MSG_NORMAL,
1375
"[RESULT#%d] expected to be unsent; instead, state is %d\n",
1376
result.id, result.server_state
1378
return ERR_BAD_RESULT_STATE;
1101
const char *BOINC_RCSID_32dcd335e7 = "$Id: sched_send.C 12791 2007-05-31 18:14:45Z boincadm $";
1383
// compute a "score" for sending this job to this host.
1384
// return 0 if the WU is infeasible
1386
void JOB::get_score(SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply) {
1391
WU_RESULT& wu_result = ssp->wu_results[index];
1392
wu = wu_result.workunit;
1393
app = ssp->lookup_app(wu.appid);
1397
// Find the app and app_version for the client's platform.
1399
if (anonymous(sreq.platforms.list[0])) {
1400
found = sreq.has_version(*app);
1404
bavp = get_app_version(sreq, reply, wu);
1408
retval = wu_is_infeasible_fast(wu, sreq, reply, *app);
1410
if (config.debug_send) {
1411
log_messages.printf(MSG_DEBUG,
1412
"[HOST#%d] [WU#%d %s] WU is infeasible: %s\n",
1413
reply.host.id, wu.id, wu.name, infeasible_string(retval)
1421
// check if user has selected apps
1423
if (!reply.wreq.host_info.allow_beta_work || config.distinct_beta_apps) {
1424
if (app_not_selected(wu, sreq, reply)) {
1425
if (!reply.wreq.host_info.allow_non_preferred_apps) {
1430
if (reply.wreq.host_info.allow_non_preferred_apps) {
1436
// if it's a beta user, try to send beta jobs
1439
if (reply.wreq.host_info.allow_beta_work) {
1447
// if job needs to get done fast, send to fast/reliable host
1449
if (reply.wreq.host_info.reliable && (wu_result.need_reliable)) {
1453
// if job already committed to an HR class,
1454
// try to send to host in that class
1456
if (wu_result.infeasible_count) {
1460
// If user has selected some apps but will accept jobs from others,
1461
// try to send them jobs from the selected apps
1465
bool wu_is_infeasible_slow(
1466
WU_RESULT& wu_result, SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply
1473
// Don't send if we've already sent a result of this WU to this user.
1475
if (config.one_result_per_user_per_wu) {
1477
"where workunitid=%d and userid=%d",
1478
wu_result.workunit.id, reply.user.id
1480
retval = result.count(n, buf);
1482
log_messages.printf(MSG_CRITICAL,
1483
"send_work: can't get result count (%d)\n", retval
1488
if (config.debug_send) {
1489
log_messages.printf(MSG_DEBUG,
1490
"send_work: user %d already has %d result(s) for WU %d\n",
1491
reply.user.id, n, wu_result.workunit.id
1497
} else if (config.one_result_per_host_per_wu) {
1498
// Don't send if we've already sent a result
1499
// of this WU to this host.
1500
// We only have to check this
1501
// if we don't send one result per user.
1504
"where workunitid=%d and hostid=%d",
1505
wu_result.workunit.id, reply.host.id
1507
retval = result.count(n, buf);
1509
log_messages.printf(MSG_CRITICAL,
1510
"send_work: can't get result count (%d)\n", retval
1515
if (config.debug_send) {
1516
log_messages.printf(MSG_DEBUG,
1517
"send_work: host %d already has %d result(s) for WU %d\n",
1518
reply.host.id, n, wu_result.workunit.id
1526
APP* app = ssp->lookup_app(wu_result.workunit.appid);
1527
WORKUNIT wu = wu_result.workunit;
1528
if (app_hr_type(*app)) {
1529
if (already_sent_to_different_platform_careful(
1530
sreq, reply.wreq, wu, *app
1532
if (config.debug_send) {
1533
log_messages.printf(MSG_DEBUG,
1534
"[HOST#%d] [WU#%d %s] WU is infeasible (assigned to different platform)\n",
1535
reply.host.id, wu.id, wu.name
1538
// Mark the workunit as infeasible.
1539
// This ensures that jobs already assigned to a platform
1540
// are processed first.
1542
wu_result.infeasible_count++;
1549
double JOB_SET::lowest_score() {
1550
if (jobs.empty()) return 0;
1551
return jobs.back().score;
1554
// add the given job, and remove lowest-score jobs
1555
// that are in excess of work request
1556
// or that cause the disk limit to be exceeded
1558
void JOB_SET::add_job(JOB& job) {
1559
while (!jobs.empty()) {
1560
JOB& worst_job = jobs.back();
1561
if (est_time + job.est_time - worst_job.est_time > work_req) {
1562
est_time -= worst_job.est_time;
1563
disk_usage -= worst_job.disk_usage;
1567
while (!jobs.empty()) {
1568
JOB& worst_job = jobs.back();
1569
if (disk_usage + job.disk_usage > disk_limit) {
1570
est_time -= worst_job.est_time;
1571
disk_usage -= worst_job.disk_usage;
1575
list<JOB>::iterator i = jobs.begin();
1576
while (i != jobs.end()) {
1577
if (i->score < job.score) {
1578
jobs.insert(i, job);
1583
if (i == jobs.end()) {
1584
jobs.push_back(job);
1586
est_time += job.est_time;
1587
disk_usage += job.disk_usage;
1590
// return the disk usage of jobs above the given score
1592
double JOB_SET::higher_score_disk_usage(double v) {
1594
list<JOB>::iterator i = jobs.begin();
1595
while (i != jobs.end()) {
1596
if (i->score < v) break;
1597
sum += i->disk_usage;
1603
void JOB_SET::send(SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply) {
1608
list<JOB>::iterator i = jobs.begin();
1609
while (i != jobs.end()) {
1611
WU_RESULT wu_result = ssp->wu_results[job.index];
1612
ssp->wu_results[job.index].state = WR_STATE_EMPTY;
1613
wu = wu_result.workunit;
1614
result.id = wu_result.resultid;
1615
retval = read_sendable_result(result);
1616
if (retval) continue;
1617
add_result_to_reply(result, wu, sreq, reply, job.bavp);
1622
void send_work_matchmaker(SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply) {
1623
int i, slots_scanned=0, slots_locked=0;
1627
int max_locked = 10;
1631
i = rand() % ssp->max_wu_results;
1633
i = (i+1) % ssp->max_wu_results;
1635
if (slots_scanned >= max_slots) break;
1636
WU_RESULT& wu_result = ssp->wu_results[i];
1637
switch (wu_result.state) {
1638
case WR_STATE_EMPTY:
1640
case WR_STATE_PRESENT:
1649
job.get_score(sreq, reply);
1650
if (config.debug_send) {
1651
log_messages.printf(MSG_DEBUG,
1652
"score for %s: %f\n", wu_result.workunit.name, job.score
1655
if (job.score > jobs.lowest_score()) {
1656
ssp->wu_results[i].state = pid;
1658
if (wu_is_infeasible_slow(wu_result, sreq, reply)) {
1660
ssp->wu_results[i].state = WR_STATE_EMPTY;
1667
if (jobs.request_satisfied() && slots_scanned>=min_slots) break;
1670
jobs.send(sreq, reply);
1672
if (slots_locked > max_locked) {
1673
log_messages.printf(MSG_CRITICAL,
1674
"Found too many locked slots (%d>%d) - increase array size",
1675
slots_locked, max_locked
1681
const char *BOINC_RCSID_32dcd335e7 = "$Id: sched_send.C 15176 2008-05-12 20:25:35Z romw $";