// If an app is running (not suspended), the interval
// during which it's been running.
#include "boinc_win.h"
#include "error_numbers.h"
#include "str_util.h"
#include "client_msgs.h"
#include "log_flags.h"
#include "client_state.h"
#define MAX_STD (86400)
    // maximum short-term debt (86400 seconds = one day)

#define DEADLINE_CUSHION 0
    // try to finish jobs this much in advance of their deadline
bool COPROCS::sufficient_coprocs(COPROCS& needed, bool log_flag, const char* prefix) {
    for (unsigned int i=0; i<needed.coprocs.size(); i++) {
        COPROC* cp = needed.coprocs[i];
        COPROC* cp2 = lookup(cp->type);
        if (!cp2) {
            msg_printf(NULL, MSG_INTERNAL_ERROR,
                "Missing a %s coprocessor", cp->type
            );
            return false;
        }
        if (cp2->used + cp->count > cp2->count) {
            if (log_flag) {
                msg_printf(NULL, MSG_INFO,
                    "[%s] rr_sim: insufficient coproc %s (%d + %d > %d)",
                    prefix, cp2->type, cp2->used, cp->count, cp2->count
                );
            }
            return false;
        }
    }
    return true;
}
void COPROCS::reserve_coprocs(COPROCS& needed, void* owner, bool log_flag, const char* prefix) {
91
for (unsigned int i=0; i<needed.coprocs.size(); i++) {
92
COPROC* cp = needed.coprocs[i];
93
COPROC* cp2 = lookup(cp->type);
95
msg_printf(NULL, MSG_INTERNAL_ERROR,
96
"Coproc type %s not found", cp->type
101
msg_printf(NULL, MSG_INFO,
102
"[%s] reserving %d of coproc %s", prefix, cp->count, cp2->type
105
cp2->used += cp->count;
107
for (int j=0; j<cp2->count; j++) {
108
if (!cp2->owner[j]) {
109
cp2->owner[j] = owner;
117
void COPROCS::free_coprocs(COPROCS& needed, void* owner, bool log_flag, const char* prefix) {
118
for (unsigned int i=0; i<needed.coprocs.size(); i++) {
119
COPROC* cp = needed.coprocs[i];
120
COPROC* cp2 = lookup(cp->type);
123
msg_printf(NULL, MSG_INFO,
124
"[%s] freeing %d of coproc %s", prefix, cp->count, cp2->type
127
cp2->used -= cp->count;
128
for (int j=0; j<cp2->count; j++) {
129
if (cp2->owner[j] == owner) {
// used in schedule_cpus() to keep track of resources used
// by jobs tentatively scheduled so far
//
struct PROC_RESOURCES {
    int ncpus;
    double ncpus_used_st;   // #CPUs of GPU or single-thread jobs
    double ncpus_used_mt;   // #CPUs of multi-thread jobs
    COPROCS coprocs;
    double ram_left;

    // should we stop scanning jobs?
    //
    inline bool stop_scan_cpu() {
        return ncpus_used_st >= ncpus;
    }

    inline bool stop_scan_coproc(int rsc_type) {
        if (rsc_type == RSC_TYPE_CUDA) {
            return coprocs.cuda.used >= coprocs.cuda.count;
        }
        return coprocs.ati.used >= coprocs.ati.count;
    }

    // should we consider scheduling this job?
    //
    bool can_schedule(RESULT* rp) {
        if (rp->schedule_backoff > gstate.now) return false;
        if (rp->uses_coprocs()) {
            if (gpu_suspend_reason) return false;
            if (sufficient_coprocs(*rp->avp, log_flags.cpu_sched_debug)) {
                return true;
            }
            if (log_flags.cpu_sched_debug) {
                msg_printf(rp->project, MSG_INFO,
                    "[cpu_sched] insufficient coprocessors for %s", rp->name
                );
            }
            return false;
        } else if (rp->avp->avg_ncpus > 1) {
            return (ncpus_used_mt + rp->avp->avg_ncpus <= ncpus);
        } else {
            return (ncpus_used_st < ncpus);
        }
    }

    // we've decided to run this - update bookkeeping
    //
    void schedule(RESULT* rp) {
        reserve_coprocs(
            *rp->avp, log_flags.cpu_sched_debug, "cpu_sched_debug"
        );
        if (rp->uses_coprocs()) {
            // a GPU job's CPU usage counts against the single-thread pool
            ncpus_used_st += rp->avp->avg_ncpus;
        } else if (rp->avp->avg_ncpus > 1) {
            ncpus_used_mt += rp->avp->avg_ncpus;
        } else {
            ncpus_used_st += rp->avp->avg_ncpus;
        }
    }
    bool sufficient_coprocs(APP_VERSION& av, bool log_flag) {
        double x; COPROC* cp2;
        if (av.ncudas) { x = av.ncudas; cp2 = &coprocs.cuda; }
        else if (av.natis) { x = av.natis; cp2 = &coprocs.ati; }
        else return true;
        if (!cp2->count) {
            msg_printf(NULL, MSG_INTERNAL_ERROR,
                "Missing a %s coprocessor", av.ncudas?"CUDA":"ATI"
            );
            return false;
        }
        if (cp2->used + x > cp2->count) {
            if (log_flag) msg_printf(NULL, MSG_INFO,
                "[cpu_sched] insufficient coproc %s (%f + %f > %d)",
                cp2->type, cp2->used, x, cp2->count
            );
            return false;
        }
        return true;
    }

    void reserve_coprocs(APP_VERSION& av, bool log_flag, const char* prefix) {
        double x; COPROC* cp2;
        if (av.ncudas) { x = av.ncudas; cp2 = &coprocs.cuda; }
        else if (av.natis) { x = av.natis; cp2 = &coprocs.ati; }
        else return;
        if (!cp2->count) {
            msg_printf(NULL, MSG_INTERNAL_ERROR,
                "Coproc type %s not found", av.ncudas?"CUDA":"ATI"
            );
            return;
        }
        if (log_flag) msg_printf(NULL, MSG_INFO,
            "[%s] reserving %f of coproc %s", prefix, x, cp2->type
        );
        cp2->used += x;
    }
};
bool gpus_usable = true;

// see whether there's been a change in coproc usability;
// if so set or clear "coproc_missing" flags and return true.
//
bool check_coprocs_usable() {
#ifdef _WIN32
    unsigned int i;
    bool new_usable = !is_remote_desktop();
    if (gpus_usable) {
        if (!new_usable) {
            gpus_usable = false;
            for (i=0; i<gstate.results.size(); i++) {
                RESULT* rp = gstate.results[i];
                if (rp->avp->ncudas || rp->avp->natis) {
                    rp->coproc_missing = true;
                }
            }
            msg_printf(NULL, MSG_INFO,
                "GPUs have become unusable; disabling tasks"
            );
            return true;
        }
    } else {
        if (new_usable) {
            gpus_usable = true;
            for (i=0; i<gstate.results.size(); i++) {
                RESULT* rp = gstate.results[i];
                if (rp->avp->ncudas || rp->avp->natis) {
                    rp->coproc_missing = false;
                }
            }
            msg_printf(NULL, MSG_INFO,
                "GPUs have become usable; enabling tasks"
            );
            return true;
        }
    }
#endif
    return false;
}
// return true if the task has finished its time slice
// and has checkpointed in last 10 secs
//
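// A minimal sketch of that predicate, whose body is elided from this
// excerpt (the field names follow their use later in this file; the
// 10-second checkpoint window is from the comment above):
//
static inline bool finished_time_slice(ACTIVE_TASK* atp) {
    double time_running = gstate.now - atp->run_interval_start_wall_time;
    bool ran_full_slice =
        time_running >= gstate.global_prefs.cpu_scheduling_period();
    double time_since_checkpoint = gstate.now - atp->checkpoint_wall_time;
    bool checkpointed_recently = time_since_checkpoint < 10;
    return ran_full_slice && checkpointed_recently;
}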
// choose the best result to run from the project with the greatest
// anticipated debt
//
RESULT* CLIENT_STATE::largest_debt_project_best_result() {
    PROJECT* best_project = NULL;
    double best_debt = 0;
    bool first = true;
    unsigned int i;

    for (i=0; i<projects.size(); i++) {
        PROJECT* p = projects[i];
        if (!p->next_runnable_result) continue;
        if (p->non_cpu_intensive) continue;
#ifdef USE_REC
        if (first || project_priority(p) > best_debt) {
            first = false;
            best_project = p;
            best_debt = project_priority(p);
        }
#else
        if (first || p->cpu_pwf.anticipated_debt > best_debt) {
            first = false;
            best_project = p;
            best_debt = p->cpu_pwf.anticipated_debt;
        }
#endif
    }
    if (!best_project) return NULL;

    if (log_flags.cpu_sched_debug) {
        msg_printf(best_project, MSG_INFO,
            "[cpu_sched] highest debt: %f %s",
            best_project->cpu_pwf.anticipated_debt,
            best_project->next_runnable_result->name
        );
    }
    RESULT* rp = best_project->next_runnable_result;
    best_project->next_runnable_result = 0;
    return rp;
}
// Return a job of the given type according to the following criteria
// (in descending priority):
// - from project with higher STD for that resource
// - already-started job
// - earlier received_time
// - lexicographically earlier name
//
// Give priority to already-started jobs because of the following scenario:
// - client gets several jobs in a sched reply and starts downloading files
// - a later job finishes downloading and starts
// - an earlier job finishes downloading and preempts
//
RESULT* first_coproc_result(int rsc_type) {
    unsigned int i;
    RESULT* best = NULL;
    double best_std = 0;
    for (i=0; i<gstate.results.size(); i++) {
        RESULT* rp = gstate.results[i];
        if (rp->resource_type() != rsc_type) continue;
        if (!rp->runnable()) continue;
        if (rp->project->non_cpu_intensive) continue;
        if (rp->already_selected) continue;
#ifdef USE_REC
        double std = project_priority(rp->project);
#else
        double std = rp->project->anticipated_debt(rsc_type);
#endif
        if (!best) {
            best = rp;
            best_std = std;
            continue;
        }
        if (std < best_std) {
            continue;
        }
        if (std > best_std) {
            best = rp;
            best_std = std;
            continue;
        }
        bool bs = !best->not_started();
        bool rs = !rp->not_started();
        if (rs && !bs) {
            best = rp;
            best_std = std;
            continue;
        }
        if (!rs && bs) continue;
        if (rp->received_time < best->received_time) {
            best = rp;
            best_std = std;
        } else if (rp->received_time == best->received_time) {
            // make it deterministic by looking at name
            //
            if (strcmp(rp->name, best->name) > 0) {
                best = rp;
                best_std = std;
            }
        }
    }
    return best;
}
// Return earliest-deadline result for given resource type;
// return only results projected to miss their deadline,
// or from projects with extreme DCF
//
static RESULT* earliest_deadline_result(int rsc_type) {
    RESULT *best_result = NULL;
    ACTIVE_TASK* best_atp = NULL;
    unsigned int i;

    for (i=0; i<gstate.results.size(); i++) {
        RESULT* rp = gstate.results[i];
        if (rp->resource_type() != rsc_type) continue;
        if (rp->already_selected) continue;
        if (!rp->runnable()) continue;
        PROJECT* p = rp->project;
        if (p->non_cpu_intensive) continue;

        bool only_deadline_misses = true;

        // treat projects with DCF>90 as if they had deadline misses
        //
        if (p->duration_correction_factor < 90.0) {
            int d;
            switch (rsc_type) {
            case RSC_TYPE_CUDA:
                d = p->cuda_pwf.deadlines_missed_copy;
                break;
            case RSC_TYPE_ATI:
                d = p->ati_pwf.deadlines_missed_copy;
                break;
            default:
                d = p->cpu_pwf.deadlines_missed_copy;
            }
            if (!d) continue;
        } else {
            only_deadline_misses = false;
        }

        if (only_deadline_misses && !rp->rr_sim_misses_deadline) {
            continue;
        }
        bool new_best = false;
        if (best_result) {
            if (rp->report_deadline < best_result->report_deadline) {
                new_best = true;
            }
        } else {
            new_best = true;
        }
        if (new_best) {
            best_result = rp;
            best_atp = gstate.lookup_active_task_by_result(rp);
        }
    }
    return best_result;
}
void CLIENT_STATE::reset_debt_accounting() {
    unsigned int i;
    for (i=0; i<projects.size(); i++) {
        PROJECT* p = projects[i];
        p->cpu_pwf.reset_debt_accounting();
        if (host_info.have_cuda()) {
            p->cuda_pwf.reset_debt_accounting();
        }
        if (host_info.have_ati()) {
            p->ati_pwf.reset_debt_accounting();
        }
    }
    cpu_work_fetch.reset_debt_accounting();
    if (host_info.have_cuda()) {
        cuda_work_fetch.reset_debt_accounting();
    }
    if (host_info.have_ati()) {
        ati_work_fetch.reset_debt_accounting();
    }
    debt_interval_start = now;
}
// update REC (recent estimated credit)
//
static void update_rec() {
    double f = gstate.host_info.p_fpops;

    for (unsigned int i=0; i<gstate.projects.size(); i++) {
        PROJECT* p = gstate.projects[i];

        double x = p->cpu_pwf.secs_this_debt_interval * f;
        if (gstate.host_info.have_cuda()) {
            x += p->cuda_pwf.secs_this_debt_interval * f * cuda_work_fetch.relative_speed;
        }
        if (gstate.host_info.have_ati()) {
            x += p->ati_pwf.secs_this_debt_interval * f * ati_work_fetch.relative_speed;
        }

        double old = p->pwf.rec;

        // start averages at zero
        //
        if (p->pwf.rec_time == 0) {
            p->pwf.rec_time = gstate.debt_interval_start;
        }
        update_average(
            gstate.now,
            gstate.debt_interval_start,
            x,
            rec_half_life,
            p->pwf.rec,
            p->pwf.rec_time
        );

        if (log_flags.debt_debug) {
            double dt = gstate.now - gstate.debt_interval_start;
            msg_printf(p, MSG_INFO,
                "[debt] recent est credit: %.2fG in %.2f sec, %f + %f ->%f",
                x, dt, old, p->pwf.rec-old, p->pwf.rec
            );
        }
    }
}
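// Illustrative sketch (not from the original file): update_average()
// above maintains an exponentially-decaying average. With half-life H,
// a previous average A decays by 2^(-dt/H) over dt seconds before the
// new work x is added in. A standalone version of that idea (assumes
// <cmath> for exp() and M_LN2):
//
static double decayed_average_sketch(
    double old_avg, double new_work, double dt, double half_life
) {
    double decay = exp(-dt * M_LN2 / half_life);    // == 2^(-dt/half_life)
    return old_avg * decay + new_work;
}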
double peak_flops(APP_VERSION* avp) {
    double f = gstate.host_info.p_fpops;
    return f * avp->avg_ncpus
        + f * avp->ncudas * cuda_work_fetch.relative_speed
        + f * avp->natis * ati_work_fetch.relative_speed;
}
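// Worked example (hypothetical numbers): with p_fpops = 2e9, an app
// version using 0.2 CPUs plus one CUDA GPU whose relative_speed is 10
// has peak_flops = 2e9*0.2 + 2e9*1*10 = 20.4e9 FLOPS.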
static double rec_sum;

// Initialize project "priorities" based on REC:
// compute resource share and REC fractions
// among compute-intensive, non-suspended projects
//
void project_priority_init() {
    double rs_sum = 0;
    rec_sum = 0;
    for (unsigned int i=0; i<gstate.projects.size(); i++) {
        PROJECT* p = gstate.projects[i];
        if (p->non_cpu_intensive) continue;
        if (p->suspended_via_gui) continue;
        rs_sum += p->resource_share;
        rec_sum += p->pwf.rec;
    }
    for (unsigned int i=0; i<gstate.projects.size(); i++) {
        PROJECT* p = gstate.projects[i];
        if (p->non_cpu_intensive || p->suspended_via_gui || rs_sum==0) {
            p->resource_share_frac = 0;
        } else {
            p->resource_share_frac = p->resource_share/rs_sum;
        }
        p->pwf.rec_temp = p->pwf.rec;
    }
}
double project_priority(PROJECT* p) {
    double x = p->resource_share_frac - p->pwf.rec_temp/rec_sum;
    printf("%s: rs frac %.3f rec_temp %.3f rec_sum %.3f prio %.3f\n",
        p->project_name, p->resource_share_frac, p->pwf.rec_temp, rec_sum, x
    );
    return x;
}
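// Worked example (hypothetical numbers): a project with half the total
// resource share (resource_share_frac = 0.5) and rec_temp = 30 when
// rec_sum = 100 gets priority 0.5 - 30/100 = 0.2. An overserved project
// (rec_temp/rec_sum > resource_share_frac) gets a negative priority.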
// we plan to run this job.
// bump the project's REC accordingly
//
void adjust_rec_temp(RESULT* rp) {
    PROJECT* p = rp->project;
    p->pwf.rec_temp += peak_flops(rp->avp);
}
// adjust project debts (short, long-term)
//
void CLIENT_STATE::adjust_debts() {
    unsigned int i;
    double elapsed_time = now - debt_interval_start;

    // If the elapsed time is more than 2*DEBT_ADJUST_PERIOD,
    // it must be because the host was suspended for a long time.
    // Currently we don't have a way to estimate how long this was for,
    // so ignore the last period and reset counters.
    //
    if (elapsed_time > 2*DEBT_ADJUST_PERIOD || elapsed_time < 0) {
        if (log_flags.debt_debug) {
            msg_printf(NULL, MSG_INFO,
                "[debt] adjust_debt: elapsed time (%d) longer than sched enforce period (%d). Ignoring this period.",
                (int)elapsed_time, (int)DEBT_ADJUST_PERIOD
            );
        }
        reset_debt_accounting();
        return;
    }

    // skip small intervals
    //
    if (elapsed_time < 1) {
        return;
    }

    // total up how many instance-seconds projects got
    //
    for (i=0; i<active_tasks.active_tasks.size(); i++) {
        ACTIVE_TASK* atp = active_tasks.active_tasks[i];
        if (atp->scheduler_state != CPU_SCHED_SCHEDULED) continue;
        PROJECT* p = atp->result->project;
        if (p->non_cpu_intensive) continue;
        work_fetch.accumulate_inst_sec(atp, elapsed_time);
    }

    cpu_work_fetch.update_long_term_debts();
    cpu_work_fetch.update_short_term_debts();
    if (host_info.have_cuda()) {
        cuda_work_fetch.update_long_term_debts();
        cuda_work_fetch.update_short_term_debts();
    }
    if (host_info.have_ati()) {
        ati_work_fetch.update_long_term_debts();
        ati_work_fetch.update_short_term_debts();
    }

    reset_debt_accounting();
}

#if 0
// Earlier version of adjust_debts(): per-project "wall CPU" accounting,
// now handled by the update_*_debts() calls above.
//
void CLIENT_STATE::adjust_debts() {
    unsigned int i;
    double total_long_term_debt = 0;
    double total_short_term_debt = 0;
    double rrs, prrs, share_frac;
    int nprojects=0, nrprojects=0;
    PROJECT* p;

    double wall_cpu_time = now - debt_interval_start;

    // if the elapsed time is more than the scheduling period,
    // it must be because the host was suspended for a long time.
    // Currently we don't have a way to estimate how long this was for,
    // so ignore the last period and reset counters.
    //
    if (wall_cpu_time > global_prefs.cpu_scheduling_period()*2) {
        if (log_flags.debt_debug) {
            msg_printf(NULL, MSG_INFO,
                "[debt_debug] adjust_debt: elapsed time (%d) longer than sched period (%d). Ignoring this period.",
                (int)wall_cpu_time, (int)global_prefs.cpu_scheduling_period()
            );
        }
        reset_debt_accounting();
        return;
    }
    if (wall_cpu_time < 1) {
        return;
    }

    // Total up total and per-project "wall CPU" since last CPU reschedule.
    // "Wall CPU" is the wall time during which a task was
    // runnable (at the OS level).
    //
    // We use wall CPU for debt calculation
    // (instead of reported actual CPU) for two reasons:
    // 1) the process might have paged a lot, so the actual CPU
    //    may be a lot less than wall CPU
    // 2) BOINC relies on apps to report their CPU time.
    //    Sometimes there are bugs and apps report zero CPU.
    //    It's safer not to trust them.
    //
    for (i=0; i<active_tasks.active_tasks.size(); i++) {
        ACTIVE_TASK* atp = active_tasks.active_tasks[i];
        if (atp->scheduler_state != CPU_SCHED_SCHEDULED) continue;
        if (atp->wup->project->non_cpu_intensive) continue;
        atp->result->project->wall_cpu_time_this_debt_interval += wall_cpu_time;
        total_wall_cpu_time_this_debt_interval += wall_cpu_time;
    }

    rrs = runnable_resource_share();
    prrs = potentially_runnable_resource_share();

    for (i=0; i<projects.size(); i++) {
        p = projects[i];

        // potentially_runnable() can be false right after a result completes,
        // but we still need to update its LTD.
        // In this case its wall_cpu_time_this_debt_interval will be nonzero.
        //
        if (!(p->potentially_runnable()) && p->wall_cpu_time_this_debt_interval) {
            prrs += p->resource_share;
        }
    }

    for (i=0; i<projects.size(); i++) {
        p = projects[i];
        if (p->non_cpu_intensive) continue;
        nprojects++;

        // adjust long-term debts
        //
        if (p->potentially_runnable() || p->wall_cpu_time_this_debt_interval) {
            share_frac = p->resource_share/prrs;
            p->long_term_debt += share_frac*total_wall_cpu_time_this_debt_interval
                - p->wall_cpu_time_this_debt_interval;
        }
        total_long_term_debt += p->long_term_debt;

        // adjust short term debts
        //
        if (p->runnable()) {
            nrprojects++;
            share_frac = p->resource_share/rrs;
            p->short_term_debt += share_frac*total_wall_cpu_time_this_debt_interval
                - p->wall_cpu_time_this_debt_interval;
            total_short_term_debt += p->short_term_debt;
        } else {
            p->short_term_debt = 0;
            p->anticipated_debt = 0;
        }
    }

    if (nprojects==0) return;

    // normalize so mean is zero, and limit abs value at MAX_STD
    //
    double avg_long_term_debt = total_long_term_debt / nprojects;
    double avg_short_term_debt = 0;
    if (nrprojects) {
        avg_short_term_debt = total_short_term_debt / nrprojects;
    }
    for (i=0; i<projects.size(); i++) {
        p = projects[i];
        if (p->non_cpu_intensive) continue;
        if (p->runnable()) {
            p->short_term_debt -= avg_short_term_debt;
            if (p->short_term_debt > MAX_STD) {
                p->short_term_debt = MAX_STD;
            }
            if (p->short_term_debt < -MAX_STD) {
                p->short_term_debt = -MAX_STD;
            }
        }
        p->long_term_debt -= avg_long_term_debt;
        if (log_flags.debt_debug) {
            msg_printf(0, MSG_INFO,
                "[debt_debug] adjust_debts(): project %s: STD %f, LTD %f",
                p->project_name, p->short_term_debt, p->long_term_debt
            );
        }
    }

    reset_debt_accounting();
}
#endif
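// Worked example (hypothetical numbers) of the mean-zero normalization
// in the debt code above: with two runnable projects at STD +500 and
// -300, the mean is +100; subtracting it leaves +400 and -400, which
// are then clamped to [-MAX_STD, MAX_STD].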
        atp->needs_shmem = false;
    }
    proc_rsc.ram_left -= atp->procinfo.working_set_size_smoothed;

    if (log_flags.cpu_sched_debug) {
        msg_printf(rp->project, MSG_INFO,
            "[cpu_sched] scheduling %s (%s)", rp->name, description
        );
    }
    proc_rsc.schedule(rp);

    // project STD at end of time slice
    //
    double dt = gstate.global_prefs.cpu_scheduling_period();
    rp->project->cpu_pwf.anticipated_debt -= dt*rp->avp->avg_ncpus/cpu_work_fetch.ninstances;
    rp->project->cuda_pwf.anticipated_debt -= dt*rp->avp->ncudas/cuda_work_fetch.ninstances;
    rp->project->ati_pwf.anticipated_debt -= dt*rp->avp->natis/ati_work_fetch.ninstances;
    return true;
}
// If a job J once ran in EDF,
// and its project has another job of the same resource type
// marked as deadline miss, mark J as deadline miss.
// This avoids domino-effect preemption
//
static void promote_once_ran_edf() {
    for (unsigned int i=0; i<gstate.active_tasks.active_tasks.size(); i++) {
        ACTIVE_TASK* atp = gstate.active_tasks.active_tasks[i];
        if (atp->once_ran_edf) {
            RESULT* rp = atp->result;
            PROJECT* p = rp->project;
            if (p->deadlines_missed(rp->avp->rsc_type())) {
                rp->rr_sim_misses_deadline = true;
            }
        }
    }
}
void add_coproc_jobs(int rsc_type, PROC_RESOURCES& proc_rsc) {
    ACTIVE_TASK* atp;
    RESULT* rp;
    bool can_run;

    if (!cpu_sched_rr_only) {
        // choose coproc jobs from projects with coproc deadline misses
        //
        while (!proc_rsc.stop_scan_coproc(rsc_type)) {
            rp = earliest_deadline_result(rsc_type);
            if (!rp) break;
            rp->already_selected = true;
            if (!proc_rsc.can_schedule(rp)) continue;
            atp = gstate.lookup_active_task_by_result(rp);
            can_run = schedule_if_possible(
                rp, atp, proc_rsc, "coprocessor job, EDF"
            );
            if (!can_run) continue;
            if (rsc_type == RSC_TYPE_CUDA) {
                rp->project->cuda_pwf.deadlines_missed_copy--;
            } else {
                rp->project->ati_pwf.deadlines_missed_copy--;
            }
            rp->edf_scheduled = true;
            gstate.ordered_scheduled_results.push_back(rp);
        }
    }

    // then coproc jobs in FIFO order
    //
    while (!proc_rsc.stop_scan_coproc(rsc_type)) {
        rp = first_coproc_result(rsc_type);
        if (!rp) break;
        rp->already_selected = true;
        if (!proc_rsc.can_schedule(rp)) continue;
        atp = gstate.lookup_active_task_by_result(rp);
        can_run = schedule_if_possible(
            rp, atp, proc_rsc, "coprocessor job, FIFO"
        );
        if (!can_run) continue;
        gstate.ordered_scheduled_results.push_back(rp);
    }
}
// CPU scheduler - decide which results to run.
// output: sets ordered_scheduled_results.
//
void CLIENT_STATE::schedule_cpus() {
    RESULT* rp;
    ACTIVE_TASK* atp;
    bool can_run;

    PROC_RESOURCES proc_rsc;
    proc_rsc.ncpus = ncpus;
    proc_rsc.ncpus_used_st = 0;
    proc_rsc.ncpus_used_mt = 0;
    proc_rsc.ram_left = available_ram();
    proc_rsc.coprocs.clone(host_info.coprocs, false);

    if (log_flags.cpu_sched_debug) {
        msg_printf(0, MSG_INFO, "[cpu_sched] schedule_cpus(): start");
    }

    // do round-robin simulation to find what results miss deadline
    //
    // Next, choose CPU jobs from projects with large debt
    //
    while (!proc_rsc.stop_scan_cpu()) {
        assign_results_to_projects();
        rp = largest_debt_project_best_result();
        if (!rp) break;
        atp = lookup_active_task_by_result(rp);
        if (!proc_rsc.can_schedule(rp)) continue;
        can_run = schedule_if_possible(
            rp, atp, proc_rsc, "CPU job, debt order"
        );
        if (!can_run) continue;
        ordered_scheduled_results.push_back(rp);
    }

    request_enforce_schedule("schedule_cpus");
    set_client_state_dirty("schedule_cpus");
}
static inline bool in_ordered_scheduled_results(ACTIVE_TASK* atp) {
    for (unsigned int i=0; i<gstate.ordered_scheduled_results.size(); i++) {
        if (atp->result == gstate.ordered_scheduled_results[i]) return true;
    }
    return false;
}

// return true if t1 is more preemptable than t0
//
static inline bool more_preemptable(ACTIVE_TASK* t0, ACTIVE_TASK* t1) {
    if (t0->result->project->deadlines_missed && !t1->result->project->deadlines_missed) return true;
    if (!t0->result->project->deadlines_missed && t1->result->project->deadlines_missed) return false;
    if (t0->result->project->deadlines_missed && t1->result->project->deadlines_missed) {
        if (t0->result->report_deadline < t1->result->report_deadline) return true;
        if (t0->result->report_deadline > t1->result->report_deadline) return false;
        return false;
    }
    bool fin0 = finished_time_slice(t0);
    bool fin1 = finished_time_slice(t1);
    if (fin1 && !fin0) return true;
    if (fin0 && !fin1) return false;
    if (t0->result->report_deadline < t1->result->report_deadline) return true;
    if (t0->result->report_deadline > t1->result->report_deadline) return false;
    return false;
}
// scan the runnable list, keeping track of CPU usage X.
// if we find a MT job J, and X < ncpus, move J before all non-MT jobs.
// But don't promote a MT job ahead of a job in EDF.
//
// This is needed because there may always be a 1-CPU job
// in the middle of its time-slice, and MT jobs could starve.
//
static void promote_multi_thread_jobs(vector<RESULT*>& runnable_jobs) {
    double cpus_used = 0;
    vector<RESULT*>::iterator first_non_mt = runnable_jobs.end();
    vector<RESULT*>::iterator cur = runnable_jobs.begin();
    while (1) {
        if (cur == runnable_jobs.end()) break;
        if (cpus_used >= gstate.ncpus) break;
        RESULT* rp = *cur;
        if (rp->rr_sim_misses_deadline) break;
        double nc = rp->avp->avg_ncpus;
        if (nc > 1) {
            if (first_non_mt != runnable_jobs.end()) {
                cur = runnable_jobs.erase(cur);
                runnable_jobs.insert(first_non_mt, rp);
                cpus_used = 0;
                first_non_mt = runnable_jobs.end();
                cur = runnable_jobs.begin();
                continue;
            }
        } else {
            if (first_non_mt == runnable_jobs.end()) {
                first_non_mt = cur;
            }
        }
        cpus_used += nc;
        cur++;
    }
}
// return true if r0 is more important to run than r1
//
static inline bool more_important(RESULT* r0, RESULT* r1) {
    // favor jobs in danger of deadline miss
    //
    bool miss0 = r0->edf_scheduled;
    bool miss1 = r1->edf_scheduled;
    if (miss0 && !miss1) return true;
    if (!miss0 && miss1) return false;

    // favor coproc jobs, so that e.g. if we're RAM-limited
    // we'll use the GPU instead of the CPU
    //
    bool cp0 = r0->uses_coprocs();
    bool cp1 = r1->uses_coprocs();
    if (cp0 && !cp1) return true;
    if (!cp0 && cp1) return false;

    // favor jobs in the middle of time slice
    //
    bool unfin0 = r0->unfinished_time_slice;
    bool unfin1 = r1->unfinished_time_slice;
    if (unfin0 && !unfin1) return true;
    if (!unfin0 && unfin1) return false;

    // favor jobs selected first by schedule_cpus()
    // (e.g., because their project has high STD)
    //
    if (r0->seqno < r1->seqno) return true;
    if (r0->seqno > r1->seqno) return false;

    return false;
}
static void print_job_list(vector<RESULT*>& jobs) {
    for (unsigned int i=0; i<jobs.size(); i++) {
        RESULT* rp = jobs[i];
        msg_printf(rp->project, MSG_INFO,
            "[cpu_sched] %d: %s (MD: %s; UTS: %s)",
            i, rp->name,
            rp->edf_scheduled?"yes":"no",
            rp->unfinished_time_slice?"yes":"no"
        );
    }
}
// find running jobs that haven't finished their time slice.
// Mark them as such, and add to list if not already there
//
void CLIENT_STATE::append_unfinished_time_slice(
    vector<RESULT*> &runnable_jobs
) {
    unsigned int i;
    int seqno = (int)runnable_jobs.size();

    for (i=0; i<active_tasks.active_tasks.size(); i++) {
        ACTIVE_TASK* atp = active_tasks.active_tasks[i];
        if (!atp->result->runnable()) continue;
        if (atp->result->uses_coprocs() && gpu_suspend_reason) continue;
        if (atp->result->project->non_cpu_intensive) continue;
        if (atp->scheduler_state != CPU_SCHED_SCHEDULED) continue;
        if (finished_time_slice(atp)) continue;
        atp->result->unfinished_time_slice = true;
        if (in_ordered_scheduled_results(atp)) continue;
        runnable_jobs.push_back(atp->result);
        atp->result->seqno = seqno;
    }
}

// Make a list of preemptable tasks, in increasing order of preemptability.
// "Preemptable" means: running, non-GPU, not non-CPU-intensive,
// not in the scheduled results list.
//
void CLIENT_STATE::make_preemptable_task_list(
    vector<ACTIVE_TASK*> &preemptable_tasks, double& ncpus_used
) {
    unsigned int i;
    ACTIVE_TASK* atp;

    for (i=0; i<active_tasks.active_tasks.size(); i++) {
        atp = active_tasks.active_tasks[i];
        if (in_ordered_scheduled_results(atp)) continue;
        if (!atp->result->runnable()) continue;
        if (atp->result->project->non_cpu_intensive) continue;
        if (atp->scheduler_state != CPU_SCHED_SCHEDULED) continue;
        ncpus_used += atp->app_version->avg_ncpus;
        atp->next_scheduler_state = CPU_SCHED_SCHEDULED;
        if (atp->result->uses_coprocs()) continue;
        preemptable_tasks.push_back(atp);
    }

    if (log_flags.cpu_sched_debug) {
        for (i=0; i<preemptable_tasks.size(); i++) {
            atp = preemptable_tasks[i];
            msg_printf(0, MSG_INFO, "%s: misses %d deadline %f finished %d ptr %x",
                atp->result->name,
                atp->result->project->deadlines_missed,
                atp->result->report_deadline,
                finished_time_slice(atp), atp
            );
        }
    }

    std::sort(
        preemptable_tasks.begin(),
        preemptable_tasks.end(),
        more_preemptable
    );
}
////////// Coprocessor scheduling ////////////////
//
// theory of operations:
//
// Jobs can use one or more integral instances, or a fractional instance
//
// RESULT::coproc_indices
//    for a running job, the coprocessor instances it's using
// COPROC::pending_usage[]: for each instance, its usage by running jobs
// COPROC::usage[]: for each instance, its usage
//
// enforce_schedule() calls assign_coprocs(),
// which assigns coproc instances to scheduled jobs,
// and prunes jobs for which we can't make an assignment
// (the job list is in order of decreasing priority)
//
// assign_coprocs():
//     clear usage and pending_usage of all instances
//     for each running job J
//         increment pending_usage for the instances assigned to J
//     for each scheduled job J
//         if J is running
//             if J's assignment fits
//                 confirm assignment: dec pending_usage, inc usage
//             else
//                 prune J
//         else
//             if J.usage is fractional
//                 look for an instance that's already fractionally assigned
//                 if that fails, look for a free instance
//                 if that fails, prune J
//             else
//                 if there are enough instances with usage=0
//                     assign instances with pending_usage = usage = 0
//                         (avoid preempting running jobs)
//                     if need more, assign instances with usage = 0
//                 else
//                     prune J
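// Illustrative walk-through (hypothetical numbers, not from the original
// file): suppose cuda.count == 2 and one running job holds 0.5 of
// instance 0. After the running-job pass, pending_usage[] == {0.5, 0}.
// A scheduled job with usage 0.5 takes the fractional branch and shares
// instance 0; a scheduled job with usage 1 takes the integral branch and
// gets instance 1, the instance with pending_usage == usage == 0.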
static inline void increment_pending_usage(
    RESULT* rp, double usage, COPROC* cp
) {
    double x = (usage<1)?usage:1;
    for (int i=0; i<usage; i++) {
        int j = rp->coproc_indices[i];
        cp->pending_usage[j] += x;
        if (cp->pending_usage[j] > 1) {
            if (log_flags.coproc_debug) {
                msg_printf(rp->project, MSG_INFO,
                    "[coproc] huh? %s %d %s pending usage > 1",
                    cp->type, i, rp->name
                );
            }
        }
    }
}
// check the GPU assignment for a currently-running app.
// Note: don't check available RAM.
// It may not be known (e.g. NVIDIA) and in any case,
// if the app is still running, it has enough RAM
//
static inline bool current_assignment_ok(
    RESULT* rp, double usage, COPROC* cp, bool& defer_sched
) {
    defer_sched = false;
    double x = (usage<1)?usage:1;
    for (int i=0; i<usage; i++) {
        int j = rp->coproc_indices[i];
        if (cp->usage[j] + x > 1) {
            if (log_flags.coproc_debug) {
                msg_printf(rp->project, MSG_INFO,
                    "[coproc] %s device %d already assigned: task %s",
                    cp->type, j, rp->name
                );
            }
            return false;
        }
    }
    return true;
}
static inline void confirm_current_assignment(
    RESULT* rp, double usage, COPROC* cp
) {
    double x = (usage<1)?usage:1;
    for (int i=0; i<usage; i++) {
        int j = rp->coproc_indices[i];
        cp->usage[j] += x;
        cp->pending_usage[j] -= x;
        if (log_flags.coproc_debug) {
            msg_printf(rp->project, MSG_INFO,
                "[coproc] %s instance %d: confirming for %s",
                cp->type, j, rp->name
            );
        }
        cp->available_ram[j] -= rp->avp->gpu_ram;
    }
}
static inline bool get_fractional_assignment(
    RESULT* rp, double usage, COPROC* cp, bool& defer_sched
) {
    int i;
    defer_sched = false;

    // try to assign an instance that's already fractionally assigned
    //
    for (i=0; i<cp->count; i++) {
        if (cp->available_ram_unknown[i]) continue;
        if ((cp->usage[i] || cp->pending_usage[i])
            && (cp->usage[i] + cp->pending_usage[i] + usage <= 1)
        ) {
            if (rp->avp->gpu_ram > cp->available_ram[i]) {
                defer_sched = true;
                continue;
            }
            rp->coproc_indices[0] = i;
            cp->usage[i] += usage;
            cp->available_ram[i] -= rp->avp->gpu_ram;
            if (log_flags.coproc_debug) {
                msg_printf(rp->project, MSG_INFO,
                    "[coproc] Assigning %f of %s instance %d to %s",
                    usage, cp->type, i, rp->name
                );
            }
            return true;
        }
    }

    // failing that, assign an unreserved instance
    //
    for (i=0; i<cp->count; i++) {
        if (cp->available_ram_unknown[i]) continue;
        if (!cp->usage[i]) {
            if (rp->avp->gpu_ram > cp->available_ram[i]) {
                defer_sched = true;
                continue;
            }
            rp->coproc_indices[0] = i;
            cp->usage[i] += usage;
            cp->available_ram[i] -= rp->avp->gpu_ram;
            if (log_flags.coproc_debug) {
                msg_printf(rp->project, MSG_INFO,
                    "[coproc] Assigning %f of %s free instance %d to %s",
                    usage, cp->type, i, rp->name
                );
            }
            return true;
        }
    }

    if (log_flags.coproc_debug) {
        msg_printf(rp->project, MSG_INFO,
            "[coproc] Insufficient %s for %s: need %f",
            cp->type, rp->name, usage
        );
    }
    return false;
}
static inline bool get_integer_assignment(
    RESULT* rp, double usage, COPROC* cp, bool& defer_sched
) {
    int i;
    defer_sched = false;

    // make sure we have enough free instances
    //
    int nfree = 0;
    for (i=0; i<cp->count; i++) {
        if (cp->available_ram_unknown[i]) continue;
        if (!cp->usage[i]) {
            if (rp->avp->gpu_ram > cp->available_ram[i]) {
                defer_sched = true;
                continue;
            }
            nfree++;
        }
    }
    if (nfree < usage) {
        if (log_flags.coproc_debug) {
            msg_printf(rp->project, MSG_INFO,
                "[coproc] Insufficient %s for %s; need %d, available %d",
                cp->type, rp->name, (int)usage, nfree
            );
            if (defer_sched) {
                msg_printf(rp->project, MSG_INFO,
                    "[coproc] some instances lack available memory"
                );
            }
        }
        return false;
    }

    int n = 0;

    // assign non-pending instances first
    //
    for (i=0; i<cp->count; i++) {
        if (cp->available_ram_unknown[i]) continue;
        if (!cp->usage[i]
            && !cp->pending_usage[i]
            && (rp->avp->gpu_ram <= cp->available_ram[i])
        ) {
            cp->usage[i] = 1;
            cp->available_ram[i] -= rp->avp->gpu_ram;
            rp->coproc_indices[n++] = i;
            if (log_flags.coproc_debug) {
                msg_printf(rp->project, MSG_INFO,
                    "[coproc] Assigning %s instance %d to %s",
                    cp->type, i, rp->name
                );
            }
            if (n == usage) return true;
        }
    }

    // if needed, assign pending instances
    //
    for (i=0; i<cp->count; i++) {
        if (cp->available_ram_unknown[i]) continue;
        if (!cp->usage[i]
            && (rp->avp->gpu_ram <= cp->available_ram[i])
        ) {
            cp->usage[i] = 1;
            cp->available_ram[i] -= rp->avp->gpu_ram;
            rp->coproc_indices[n++] = i;
            if (log_flags.coproc_debug) {
                msg_printf(rp->project, MSG_INFO,
                    "[coproc] Assigning %s pending instance %d to %s",
                    cp->type, i, rp->name
                );
            }
            if (n == usage) return true;
        }
    }

    if (log_flags.coproc_debug) {
        msg_printf(rp->project, MSG_INFO,
            "[coproc] huh??? ran out of %s instances for %s",
            cp->type, rp->name
        );
    }
    return false;
}
static inline void mark_as_defer_sched(RESULT* rp) {
    if (rp->uses_cuda()) {
        rp->project->cuda_defer_sched = true;
    } else if (rp->uses_ati()) {
        rp->project->ati_defer_sched = true;
    }
    rp->schedule_backoff = gstate.now + 300;    // try again in 5 minutes
    gstate.request_schedule_cpus("insufficient GPU RAM");
}
static inline void assign_coprocs(vector<RESULT*>& jobs) {
    unsigned int i;
    COPROC* cp;
    double usage;

    gstate.host_info.coprocs.clear_usage();
    if (gstate.host_info.have_cuda()) {
        gstate.host_info.coprocs.cuda.get_available_ram();
        if (log_flags.coproc_debug) {
            gstate.host_info.coprocs.cuda.print_available_ram();
        }
    }
    if (gstate.host_info.have_ati()) {
        gstate.host_info.coprocs.ati.get_available_ram();
        if (log_flags.coproc_debug) {
            gstate.host_info.coprocs.ati.print_available_ram();
        }
    }

    // fill in pending usage
    //
    for (i=0; i<jobs.size(); i++) {
        RESULT* rp = jobs[i];
        APP_VERSION* avp = rp->avp;
        if (avp->ncudas) {
            usage = avp->ncudas;
            cp = &gstate.host_info.coprocs.cuda;
        } else if (avp->natis) {
            usage = avp->natis;
            cp = &gstate.host_info.coprocs.ati;
        } else {
            continue;
        }
        ACTIVE_TASK* atp = gstate.lookup_active_task_by_result(rp);
        if (!atp) continue;
        if (atp->task_state() != PROCESS_EXECUTING) continue;
        increment_pending_usage(rp, usage, cp);
    }

    vector<RESULT*>::iterator job_iter;
    job_iter = jobs.begin();
    while (job_iter != jobs.end()) {
        RESULT* rp = *job_iter;
        APP_VERSION* avp = rp->avp;
        if (avp->ncudas) {
            usage = avp->ncudas;
            cp = &gstate.host_info.coprocs.cuda;
        } else if (avp->natis) {
            usage = avp->natis;
            cp = &gstate.host_info.coprocs.ati;
        } else {
            job_iter++;
            continue;
        }

        ACTIVE_TASK* atp = gstate.lookup_active_task_by_result(rp);
        bool defer_sched;
        if (atp && atp->task_state() == PROCESS_EXECUTING) {
            if (current_assignment_ok(rp, usage, cp, defer_sched)) {
                confirm_current_assignment(rp, usage, cp);
                job_iter++;
            } else {
                if (defer_sched) mark_as_defer_sched(rp);
                job_iter = jobs.erase(job_iter);
            }
        } else if (usage < 1) {
            if (get_fractional_assignment(rp, usage, cp, defer_sched)) {
                job_iter++;
            } else {
                if (defer_sched) mark_as_defer_sched(rp);
                job_iter = jobs.erase(job_iter);
            }
        } else {
            if (get_integer_assignment(rp, usage, cp, defer_sched)) {
                job_iter++;
            } else {
                if (defer_sched) mark_as_defer_sched(rp);
                job_iter = jobs.erase(job_iter);
            }
        }
    }
    // enforce "don't use GPUs while active" pref in NVIDIA case;
    // it applies only to GPUs running a graphics app
    //
    if (gstate.host_info.coprocs.cuda.count && gstate.user_active && !gstate.global_prefs.run_gpu_if_user_active) {
        job_iter = jobs.begin();
        while (job_iter != jobs.end()) {
            RESULT* rp = *job_iter;
            if (!rp->avp->ncudas) {
                job_iter++;
                continue;
            }
            ACTIVE_TASK* atp = gstate.lookup_active_task_by_result(rp);
            bool some_gpu_busy = false;
            for (i=0; i<rp->avp->ncudas; i++) {
                int dev = atp->coproc_indices[i];
                if (gstate.host_info.coprocs.cuda.running_graphics_app[dev]) {
                    some_gpu_busy = true;
                    break;
                }
            }
            if (some_gpu_busy) {
                job_iter = jobs.erase(job_iter);
            } else {
                job_iter++;
            }
        }
    }
}
bool CLIENT_STATE::enforce_schedule() {
    unsigned int i;
    ACTIVE_TASK* atp;
    ACTIVE_TASK* preempt_atp;
    vector<ACTIVE_TASK*> preemptable_tasks;
    static double last_time = 0;
    int retval;
    double ncpus_used=0, ncpus_used_non_gpu=0;

    // Do this when requested, and once a minute as a safety net.
    // NOTE: there's an assumption that debt is adjusted at
    // least as often as the CPU sched period (see client_state.h).
    // If you remove the following, make changes accordingly
    //
    if (now - last_time > 60) {
        must_enforce_cpu_schedule = true;
    }
    if (!must_enforce_cpu_schedule) return false;
    must_enforce_cpu_schedule = false;
    last_time = now;
    bool action = false;

    // check whether GPUs are usable
    //
    if (check_coprocs_usable()) {
        request_schedule_cpus("GPU usability change");
        return false;
    }
1492
if (log_flags.cpu_sched_debug) {
789
msg_printf(0, MSG_INFO, "[cpu_sched_debug] enforce_schedule(): start");
790
for (i=0; i<ordered_scheduled_results.size(); i++) {
791
RESULT* rp = ordered_scheduled_results[i];
792
msg_printf(rp->project, MSG_INFO,
793
"[cpu_sched_debug] want to run: %s",
799
// set temporary variables
801
for (i=0; i<projects.size(); i++){
802
projects[i]->deadlines_missed = projects[i]->rr_sim_status.deadlines_missed;
805
// Set next_scheduler_state to preempt
1493
msg_printf(0, MSG_INFO, "[cpu_sched] enforce_schedule(): start");
1494
msg_printf(0, MSG_INFO, "[cpu_sched] preliminary job list:");
1495
print_job_list(ordered_scheduled_results);
1498
// Set next_scheduler_state to PREEMPT for all tasks
    for (i=0; i< active_tasks.active_tasks.size(); i++) {
        atp = active_tasks.active_tasks[i];
        atp->next_scheduler_state = CPU_SCHED_PREEMPTED;
    }

    // make initial "to-run" list
    //
    vector<RESULT*> runnable_jobs;
    for (i=0; i<ordered_scheduled_results.size(); i++) {
        RESULT* rp = ordered_scheduled_results[i];
        rp->seqno = i;
        rp->unfinished_time_slice = false;
        runnable_jobs.push_back(rp);
    }

    // append running jobs not done with time slice to the to-run list
    //
    append_unfinished_time_slice(runnable_jobs);

    // sort to-run list by decreasing importance
    //
    std::sort(
        runnable_jobs.begin(),
        runnable_jobs.end(),
        more_important
    );

    promote_multi_thread_jobs(runnable_jobs);

    if (log_flags.cpu_sched_debug) {
        msg_printf(0, MSG_INFO, "[cpu_sched] final job list:");
        print_job_list(runnable_jobs);
    }
    double ram_left = available_ram();
    double swap_left = (global_prefs.vm_max_used_frac)*host_info.m_swap;

    if (log_flags.mem_usage_debug) {
        msg_printf(0, MSG_INFO,
            "[mem_usage] enforce: available RAM %.2fMB swap %.2fMB",
            ram_left/MEGA, swap_left/MEGA
        );
    }
    for (i=0; i<projects.size(); i++) {
        projects[i]->cuda_defer_sched = false;
        projects[i]->ati_defer_sched = false;
    }

    // schedule all non-CPU-intensive tasks,
    // and look for backed-off GPU jobs
    //
    for (i=0; i<results.size(); i++) {
        RESULT* rp = results[i];
        if (rp->project->non_cpu_intensive && rp->runnable()) {
            atp = get_task(rp);
            atp->next_scheduler_state = CPU_SCHED_SCHEDULED;
            ram_left -= atp->procinfo.working_set_size_smoothed;
            swap_left -= atp->procinfo.swap_size;
        }
        if (rp->schedule_backoff) {
            if (rp->schedule_backoff > gstate.now) {
                if (rp->uses_cuda()) {
                    rp->project->cuda_defer_sched = true;
                } else if (rp->uses_ati()) {
                    rp->project->ati_defer_sched = true;
                }
            } else {
                rp->schedule_backoff = 0;
                request_schedule_cpus("schedule backoff finished");
            }
        }
    }

    // assign coprocessors to coproc jobs,
    // and prune those that can't be assigned
    //
    assign_coprocs(runnable_jobs);

#if 0
    // Earlier enforcement pass over ordered_scheduled_results,
    // superseded by the runnable_jobs loop below.

    double swap_left = (global_prefs.vm_max_used_frac)*host_info.m_swap;

    // see whether we have any coproc jobs, and total their CPU usage
    //
    double new_ncpus_used = 0;
    bool have_coproc_job = false;
    for (i=0; i<ordered_scheduled_results.size(); i++) {
        RESULT* rp = ordered_scheduled_results[i];
        if (rp->uses_coprocs()) {
            have_coproc_job = true;
            new_ncpus_used += rp->avp->avg_ncpus;
        }
    }

    // Loop through the jobs we want to schedule.
    // Invariant: "ncpus_used" is the sum of CPU usage
    // of tasks with next_scheduler_state == CPU_SCHED_SCHEDULED
    // (including preemptable jobs).
    // "new_ncpus_used" is the sum excluding preemptable jobs.
    //
    for (i=0; i<ordered_scheduled_results.size(); i++) {
        RESULT* rp = ordered_scheduled_results[i];
        if (log_flags.cpu_sched_debug) {
            msg_printf(rp->project, MSG_INFO,
                "[cpu_sched_debug] processing %s", rp->name
            );
        }

        // Windows: if we have a coproc job, don't saturate CPUs
        //
        if (have_coproc_job && !rp->uses_coprocs()) {
            if (new_ncpus_used + rp->avp->avg_ncpus >= ncpus) continue;
        }
    }
#endif

    // prune jobs that don't fit in RAM or that exceed CPU usage limits.
    // Mark the rest as SCHEDULED
    //
    bool running_multithread = false;
    for (i=0; i<runnable_jobs.size(); i++) {
        RESULT* rp = runnable_jobs[i];
        atp = lookup_active_task_by_result(rp);

        if (!rp->uses_coprocs()) {
            // see if we're already using too many CPUs to run this job
            //
            if (ncpus_used >= ncpus) {
                if (log_flags.cpu_sched_debug) {
                    msg_printf(rp->project, MSG_INFO,
                        "[cpu_sched] all CPUs used (%.2f > %d), skipping %s",
                        ncpus_used, ncpus, rp->name
                    );
                }
                continue;
            }

            // Don't run a multithread app if usage would be #CPUS+1 or more.
            // Multithread apps don't run well on an overcommitted system.
            // Allow usage of #CPUS + fraction,
            // so that a GPU app and a multithread app can run together.
            //
            if (rp->avp->avg_ncpus > 1) {
                if (ncpus_used_non_gpu && (ncpus_used_non_gpu + rp->avp->avg_ncpus >= ncpus+1)) {
                    // the "ncpus_used_non_gpu &&" is to allow running a job that uses
                    // more than ncpus (this can happen in pathological cases)
                    //
                    if (log_flags.cpu_sched_debug) {
                        msg_printf(rp->project, MSG_INFO,
                            "[cpu_sched] not enough CPUs for multithread job, skipping %s",
                            rp->name
                        );
                    }
                    continue;
                }
                running_multithread = true;
            } else {
                // here for a single-thread app.
                // Don't run if we're running a multithread app,
                // and running this app would overcommit CPUs.
                //
                if (running_multithread) {
                    if (ncpus_used + 1 > ncpus) {
                        if (log_flags.cpu_sched_debug) {
                            msg_printf(rp->project, MSG_INFO,
                                "[cpu_sched] avoiding overcommit with multithread job, skipping %s",
                                rp->name
                            );
                        }
                        continue;
                    }
                }
            }
        }

        if (atp) {
            atp->too_large = false;
            if (atp->procinfo.working_set_size_smoothed > ram_left) {
                atp->too_large = true;
                if (log_flags.mem_usage_debug) {
                    msg_printf(rp->project, MSG_INFO,
                        "[mem_usage] enforce: result %s can't run, too big %.2fMB > %.2fMB",
                        rp->name, atp->procinfo.working_set_size_smoothed/MEGA, ram_left/MEGA
                    );
                }
                continue;
            }
        }

#if 0
        // Earlier preemptable-task-list machinery.

        // Preempt tasks if needed (and possible).
        //
        bool failed_to_preempt = false;
        double next_ncpus_used = ncpus_used + rp->avp->avg_ncpus;
            // the # of CPUs used if we run this job
        if (log_flags.cpu_sched_debug) {
            msg_printf(0, MSG_INFO, "ncpus_used %f next_ncpus_used %f",
                ncpus_used, next_ncpus_used
            );
        }
        while (1) {
            if (!preemptable_tasks.size()) break;
            if (have_coproc_job) {
                if (ncpus_used + rp->avp->avg_ncpus < ncpus) break;
            } else {
                if (ncpus_used < ncpus) break;
            }

            // Preempt the most preemptable task if either
            // 1) it's completed its time slice and has checkpointed recently
            // 2) the scheduled result is in deadline trouble
            //
            preempt_atp = preemptable_tasks.back();
            if (rp->project->deadlines_missed || finished_time_slice(preempt_atp)) {
                if (rp->project->deadlines_missed) {
                    rp->project->deadlines_missed--;
                }
                preempt_atp->next_scheduler_state = CPU_SCHED_PREEMPTED;
                ncpus_used -= preempt_atp->app_version->avg_ncpus;
                preemptable_tasks.pop_back();
                if (log_flags.cpu_sched_debug) {
                    msg_printf(rp->project, MSG_INFO,
                        "[cpu_sched_debug] preempting %s",
                        preempt_atp->result->name
                    );
                }
            } else {
                if (log_flags.cpu_sched_debug) {
                    msg_printf(rp->project, MSG_INFO,
                        "[cpu_sched_debug] didn't preempt %s: tr %f tsc %f",
                        preempt_atp->result->name,
                        now - preempt_atp->run_interval_start_wall_time,
                        now - preempt_atp->checkpoint_wall_time
                    );
                }
                failed_to_preempt = true;
                break;
            }
        }
        if (failed_to_preempt && !rp->uses_coprocs()) {
            continue;
        }
#endif

        if (log_flags.cpu_sched_debug) {
            msg_printf(rp->project, MSG_INFO,
                "[cpu_sched] scheduling %s", rp->name
            );
        }

        // We've decided to run this job; create an ACTIVE_TASK if needed.
        //
        if (!atp) {
            atp = get_task(rp);
        }

        // don't count CPU usage by GPU jobs
        //
        if (!rp->uses_coprocs()) {
            ncpus_used_non_gpu += rp->avp->avg_ncpus;
        }
        ncpus_used += rp->avp->avg_ncpus;
        atp->next_scheduler_state = CPU_SCHED_SCHEDULED;
        ram_left -= atp->procinfo.working_set_size_smoothed;
    }

#if 0
    // Earlier bookkeeping for the preemptable-task list.

    if (log_flags.cpu_sched_debug) {
        msg_printf(0, MSG_INFO,
            "[cpu_sched_debug] finished preempt loop, ncpus_used %f",
            ncpus_used
        );
    }

    // any jobs still in the preemptable list at this point are runnable;
    // make sure they don't exceed RAM limits
    //
    for (i=0; i<preemptable_tasks.size(); i++) {
        atp = preemptable_tasks[i];
        if (atp->procinfo.working_set_size_smoothed > ram_left) {
            atp->next_scheduler_state = CPU_SCHED_PREEMPTED;
            atp->too_large = true;
            if (log_flags.mem_usage_debug) {
                msg_printf(atp->result->project, MSG_INFO,
                    "[mem_usage_debug] enforce: result %s can't keep, too big %.2fMB > %.2fMB",
                    atp->result->name, atp->procinfo.working_set_size_smoothed/MEGA, ram_left/MEGA
                );
            }
        } else {
            atp->too_large = false;
            ram_left -= atp->procinfo.working_set_size_smoothed;
        }
    }
#endif

    if (log_flags.cpu_sched_debug && ncpus_used < ncpus) {
        msg_printf(0, MSG_INFO, "[cpu_sched] using %.2f out of %d CPUs",
            ncpus_used, ncpus
        );
    }
    bool check_swap = (host_info.m_swap != 0);
        // in case we couldn't measure swap on this host

    // TODO: enforcement of swap space is broken right now

    // preempt tasks as needed, and note whether there are any coproc jobs
    // in QUIT_PENDING state (in which case we won't start new coproc jobs)
    //
    bool coproc_quit_pending = false;
    for (i=0; i<active_tasks.active_tasks.size(); i++) {
        atp = active_tasks.active_tasks[i];
        if (log_flags.cpu_sched_debug) {
            msg_printf(atp->result->project, MSG_INFO,
                "[cpu_sched] %s sched state %d next %d task state %d",
                atp->result->name, atp->scheduler_state,
                atp->next_scheduler_state, atp->task_state()
            );
        }
        int preempt_type = REMOVE_MAYBE_SCHED;
        switch (atp->next_scheduler_state) {
        case CPU_SCHED_PREEMPTED:
            switch (atp->task_state()) {
            case PROCESS_EXECUTING:
                action = true;
                if (check_swap && swap_left < 0) {
                    if (log_flags.mem_usage_debug) {
                        msg_printf(atp->result->project, MSG_INFO,
                            "[mem_usage] out of swap space, will preempt by quit"
                        );
                    }
                    preempt_type = REMOVE_ALWAYS;
                }
                if (atp->too_large) {
                    if (log_flags.mem_usage_debug) {
                        msg_printf(atp->result->project, MSG_INFO,
                            "[mem_usage] job using too much memory, will preempt by quit"
                        );
                    }
                    preempt_type = REMOVE_ALWAYS;
                }
                atp->preempt(preempt_type);
                break;
            case PROCESS_SUSPENDED:
                // Handle the case where user changes prefs from
                // "leave in memory" to "remove from memory";
                // need to quit suspended tasks.
                //
                if (atp->checkpoint_cpu_time && !global_prefs.leave_apps_in_memory) {
                    atp->preempt(REMOVE_ALWAYS);
                }
                break;
            }
            atp->scheduler_state = CPU_SCHED_PREEMPTED;
            break;
#if 0
        // Earlier start logic, since moved to the loop below.
        case CPU_SCHED_SCHEDULED:
            switch (atp->task_state()) {
            case PROCESS_UNINITIALIZED:
                if (!coprocs.sufficient_coprocs(
                    atp->app_version->coprocs, log_flags.cpu_sched_debug, "cpu_sched_debug"
                )) {
                    continue;
                }
                break;
            case PROCESS_SUSPENDED:
                retval = atp->resume_or_start(
                    atp->scheduler_state == CPU_SCHED_UNINITIALIZED
                );
                break;
            }
            break;
#endif
        }
        if (atp->result->uses_coprocs() && atp->task_state() == PROCESS_QUIT_PENDING) {
            coproc_quit_pending = true;
        }
    }
    bool coproc_start_deferred = false;
    for (i=0; i<active_tasks.active_tasks.size(); i++) {
        atp = active_tasks.active_tasks[i];
        if (atp->next_scheduler_state != CPU_SCHED_SCHEDULED) continue;
        int ts = atp->task_state();
        if (ts == PROCESS_UNINITIALIZED || ts == PROCESS_SUSPENDED) {
            // If there's a quit pending for a coproc job,
            // don't start new ones since they may bomb out
            // on memory allocation. Instead, trigger a retry
            //
            if (atp->result->uses_coprocs() && coproc_quit_pending) {
                coproc_start_deferred = true;
                continue;
            }
            action = true;

            bool first_time;
            // GPU tasks can get suspended before they're ever run,
            // so the only safe way of telling whether this is the
            // first time the app is run is to check
            // whether the slot dir is empty
            //
            if (!atp->result->uses_coprocs()) {
                first_time = atp->scheduler_state == CPU_SCHED_UNINITIALIZED;
            } else {
                first_time = is_dir_empty(atp->slot_dir);
            }
            retval = atp->resume_or_start(first_time);
            if ((retval == ERR_SHMGET) || (retval == ERR_SHMAT)) {
                // Assume no additional shared memory segs
                // will be available in the next 10 seconds
                // (run only tasks which are already attached to shared memory).
                //
                if (gstate.retry_shmem_time < gstate.now) {
                    request_schedule_cpus("no more shared memory");
                }
                gstate.retry_shmem_time = gstate.now + 10.0;
                continue;
            }
            if (retval) {
                report_result_error(
                    *(atp->result), "Couldn't start or resume: %d", retval
                );
                request_schedule_cpus("start failed");
                continue;
            }
            if (atp->result->rr_sim_misses_deadline) {
                atp->once_ran_edf = true;
            }
            atp->run_interval_start_wall_time = now;
        }
        if (log_flags.cpu_sched_status) {
            msg_printf(atp->result->project, MSG_INFO,
                "[css] running %s (%s)",
                atp->result->name, atp->result->resources
            );
        }
        atp->scheduler_state = CPU_SCHED_SCHEDULED;
        swap_left -= atp->procinfo.swap_size;
    }
    if (action) {
        set_client_state_dirty("enforce_cpu_schedule");
    }
    if (log_flags.cpu_sched_debug) {
        msg_printf(0, MSG_INFO, "[cpu_sched] enforce_schedule: end");
    }
    if (coproc_start_deferred) {
        if (log_flags.cpu_sched_debug) {
            msg_printf(0, MSG_INFO,
                "[cpu_sched] coproc quit pending, deferring start"
            );
        }
        request_schedule_cpus("coproc quit retry");
    }
    return action;
}
// return true if we don't have enough runnable tasks to keep all CPUs busy
//
bool CLIENT_STATE::no_work_for_a_cpu() {
    unsigned int i;
    int count = 0;
    for (i=0; i< results.size(); i++){
        RESULT* rp = results[i];
        if (!rp->nearly_runnable()) continue;
        if (rp->project->non_cpu_intensive) continue;
        count++;
    }
    return ncpus > count;
}
// trigger CPU schedule enforcement.
// Called when a new schedule is computed,
// and when an app checkpoints.
//
void CLIENT_STATE::request_enforce_schedule(const char* where) {
    if (log_flags.cpu_sched_debug) {
        msg_printf(0, MSG_INFO, "[cpu_sched_debug] Request enforce CPU schedule: %s", where);
    }
    must_enforce_cpu_schedule = true;
}

// trigger CPU scheduling.
// Called when a result is completed,
// when new results become runnable,