15
15
// To view the GNU Lesser General Public License visit
16
16
// http://www.gnu.org/copyleft/lesser.html
17
17
// or write to the Free Software Foundation, Inc.,
18
// 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20
20
// CPU scheduling logic.
64
70
#define DEADLINE_CUSHION 0
65
71
// try to finish jobs this much in advance of their deadline
73
bool CLIENT_STATE::sufficient_coprocs(APP_VERSION& av) {
74
for (unsigned int i=0; i<av.coprocs.coprocs.size(); i++) {
75
COPROC* cp = av.coprocs.coprocs[i];
76
COPROC* cp2 = coprocs.lookup(cp->type);
78
msg_printf(av.project, MSG_INFO,
79
"Missing a %s coprocessor", cp->type
83
if (cp2->used + cp->count > cp2->count) {
84
if (log_flags.cpu_sched_debug) {
85
msg_printf(NULL, MSG_INFO,
86
"[cpu_sched_debug] insufficient coproc %s (%d + %d > %d)",
87
cp2->type, cp2->used, cp->count, cp2->count
96
void CLIENT_STATE::reserve_coprocs(APP_VERSION& av) {
97
for (unsigned int i=0; i<av.coprocs.coprocs.size(); i++) {
98
COPROC* cp = av.coprocs.coprocs[i];
99
COPROC* cp2 = coprocs.lookup(cp->type);
101
if (log_flags.cpu_sched_debug) {
102
msg_printf(NULL, MSG_INFO,
103
"[cpu_sched_debug] reserving %d of coproc %s",
107
cp2->used += cp->count;
111
void CLIENT_STATE::free_coprocs(APP_VERSION& av) {
112
for (unsigned int i=0; i<av.coprocs.coprocs.size(); i++) {
113
COPROC* cp = av.coprocs.coprocs[i];
114
COPROC* cp2 = coprocs.lookup(cp->type);
116
if (log_flags.cpu_sched_debug) {
117
msg_printf(NULL, MSG_INFO,
118
"[cpu_sched_debug] freeing %d of coproc %s",
122
cp2->used -= cp->count;
67
126
static bool more_preemptable(ACTIVE_TASK* t0, ACTIVE_TASK* t1) {
68
127
// returning true means t1 is more preemptable than t0,
69
128
// the "largest" result is at the front of a heap,
205
264
if (!rp->runnable()) continue;
206
265
if (rp->project->non_cpu_intensive) continue;
207
266
if (rp->already_selected) continue;
208
if (!rp->project->deadlines_missed) continue;
267
if (!rp->project->deadlines_missed && rp->project->duration_correction_factor < 90.0) continue;
268
// treat projects with DCF>90 as if they had deadline misses
210
if (!best_result || rp->report_deadline<best_result->report_deadline) {
270
bool new_best = false;
272
if (rp->report_deadline < best_result->report_deadline) {
211
279
best_result = rp;
212
280
best_atp = lookup_active_task_by_result(rp);
397
466
// Decide whether to run the CPU scheduler.
398
467
// This is called periodically.
399
// Scheduled tasks are placed in order of urgency for scheduling in the ordered_scheduled_results vector
468
// Scheduled tasks are placed in order of urgency for scheduling
469
// in the ordered_scheduled_results vector
401
471
bool CLIENT_STATE::possibly_schedule_cpus() {
402
472
double elapsed_time;
522
static bool schedule_if_possible(
523
RESULT* rp, double& ncpus_used, double& ram_left, double rrs, double expected_payoff
527
atp = gstate.lookup_active_task_by_result(rp);
528
if (!atp || atp->task_state() == PROCESS_UNINITIALIZED) {
529
if (!gstate.sufficient_coprocs(*rp->avp)) {
530
if (log_flags.cpu_sched_debug) {
531
msg_printf(rp->project, MSG_INFO,
532
"[cpu_sched_debug] insufficient coprocessors for %s", rp->name
539
// see if it fits in available RAM
541
if (atp->procinfo.working_set_size_smoothed > ram_left) {
542
if (log_flags.cpu_sched_debug) {
543
msg_printf(rp->project, MSG_INFO,
544
"[cpu_sched_debug] %s misses deadline but too large: %.2fMB",
545
rp->name, atp->procinfo.working_set_size_smoothed/MEGA
548
atp->too_large = true;
551
atp->too_large = false;
553
if (gstate.retry_shmem_time > gstate.now) {
554
if (atp->app_client_shm.shm == NULL) {
555
atp->needs_shmem = true;
558
atp->needs_shmem = false;
560
ram_left -= atp->procinfo.working_set_size_smoothed;
562
if (log_flags.cpu_sched_debug) {
563
msg_printf(rp->project, MSG_INFO,
564
"[cpu_sched_debug] scheduling %s",
568
ncpus_used += rp->avp->avg_ncpus;
569
rp->project->anticipated_debt -= (rp->project->resource_share / rrs) * expected_payoff;
452
573
// CPU scheduler - decide which results to run.
453
574
// output: sets ordered_scheduled_results.
455
576
void CLIENT_STATE::schedule_cpus() {
459
double expected_pay_off;
579
double expected_payoff;
461
581
double rrs = runnable_resource_share();
463
584
if (log_flags.cpu_sched_debug) {
464
585
msg_printf(0, MSG_INFO, "[cpu_sched_debug] schedule_cpus(): start");
467
// do round-robin simulation to find what results miss deadline,
588
// do round-robin simulation to find what results miss deadline
470
591
if (log_flags.cpu_sched_debug) {
490
611
active_tasks.active_tasks[i]->too_large = false;
493
expected_pay_off = global_prefs.cpu_scheduling_period();
614
expected_payoff = global_prefs.cpu_scheduling_period();
494
615
ordered_scheduled_results.clear();
495
616
double ram_left = available_ram();
500
621
if (!cpu_sched_rr_only) {
502
while ((int)ordered_scheduled_results.size() < ncpus) {
624
while (ncpus_used < ncpus) {
503
625
rp = earliest_deadline_result();
505
627
rp->already_selected = true;
507
// see if it fits in available RAM
509
atp = lookup_active_task_by_result(rp);
511
if (atp->procinfo.working_set_size_smoothed > ram_left) {
512
if (log_flags.cpu_sched_debug) {
513
msg_printf(rp->project, MSG_INFO,
514
"[cpu_sched_debug] %s misses deadline but too large: %.2fMB",
515
rp->name, atp->procinfo.working_set_size_smoothed/MEGA
518
atp->too_large = true;
521
atp->too_large = false;
523
// TODO: merge this chunk of code with its clone
524
if (gstate.retry_shmem_time > gstate.now) {
525
if (atp->app_client_shm.shm == NULL) {
526
atp->needs_shmem = true;
529
atp->needs_shmem = false;
531
ram_left -= atp->procinfo.working_set_size_smoothed;
629
if (!schedule_if_possible(rp, ncpus_used, ram_left, rrs, expected_payoff)) continue;
534
rp->project->anticipated_debt -= (rp->project->resource_share / rrs) * expected_pay_off;
535
631
rp->project->deadlines_missed--;
536
632
rp->edf_scheduled = true;
537
if (log_flags.cpu_sched_debug) {
538
msg_printf(rp->project, MSG_INFO,
539
"[cpu_sched_debug] scheduling (deadline) %s",
543
633
ordered_scheduled_results.push_back(rp);
549
639
// Next, choose results from projects with large debt
551
while ((int)ordered_scheduled_results.size() < ncpus) {
641
while (ncpus_used < ncpus) {
552
642
assign_results_to_projects();
553
643
rp = largest_debt_project_best_result();
555
atp = lookup_active_task_by_result(rp);
557
if (atp->procinfo.working_set_size_smoothed > ram_left) {
558
if (log_flags.cpu_sched_debug) {
559
msg_printf(NULL, MSG_INFO,
560
"[cpu_sched_debug] %s too large: %.2fMB",
561
rp->name, atp->procinfo.working_set_size_smoothed/MEGA
564
atp->too_large = true;
567
atp->too_large = false;
569
// don't select if it would need a new shared-mem seg
570
// and we're out of them
572
if (gstate.retry_shmem_time > gstate.now) {
573
if (atp->app_client_shm.shm == NULL) {
574
atp->needs_shmem = true;
577
atp->needs_shmem = false;
579
ram_left -= atp->procinfo.working_set_size_smoothed;
581
double xx = (rp->project->resource_share / rrs) * expected_pay_off;
582
rp->project->anticipated_debt -= xx;
583
if (log_flags.cpu_sched_debug) {
584
msg_printf(NULL, MSG_INFO, "[cpu_sched_debug] scheduling (regular) %s", rp->name);
645
if (!schedule_if_possible(rp, ncpus_used, ram_left, rrs, expected_payoff)) continue;
586
646
ordered_scheduled_results.push_back(rp);
590
650
set_client_state_dirty("schedule_cpus");
593
// make a list of preemptable tasks, ordered by their preemptability.
653
// make a list of running tasks, ordered by their preemptability.
595
655
void CLIENT_STATE::make_running_task_heap(
596
vector<ACTIVE_TASK*> &running_tasks
656
vector<ACTIVE_TASK*> &running_tasks, double& ncpus_used
599
659
ACTIVE_TASK* atp;
601
662
for (i=0; i<active_tasks.active_tasks.size(); i++) {
602
663
atp = active_tasks.active_tasks[i];
603
664
if (atp->result->project->non_cpu_intensive) continue;
604
665
if (!atp->result->runnable()) continue;
605
666
if (atp->scheduler_state != CPU_SCHED_SCHEDULED) continue;
606
667
running_tasks.push_back(atp);
668
ncpus_used += atp->app_version->avg_ncpus;
674
// make list of currently running tasks
737
// make heap of currently running tasks, ordered by preemptability
676
make_running_task_heap(running_tasks);
739
make_running_task_heap(running_tasks, ncpus_used);
678
741
// if there are more running tasks than ncpus,
679
742
// then mark the extras for preemption
681
while (running_tasks.size() > (unsigned int)ncpus) {
682
running_tasks[0]->next_scheduler_state = CPU_SCHED_PREEMPTED;
744
while (ncpus_used > ncpus) {
745
atp = running_tasks[0];
746
atp->next_scheduler_state = CPU_SCHED_PREEMPTED;
747
ncpus_used -= atp->app_version->avg_ncpus;
684
749
running_tasks.begin(),
685
750
running_tasks.end(),
700
// keep track of how many tasks we plan on running
701
// (i.e. have next_scheduler_state = SCHEDULED)
703
int nrunning = (int)running_tasks.size();
705
765
// Loop through the scheduled results
706
// to see if they should preempt a running task
708
767
for (i=0; i<ordered_scheduled_results.size(); i++) {
709
768
RESULT* rp = ordered_scheduled_results[i];
794
// if it's already running, see if it fits in mem;
795
// If not, flag for preemption
735
798
if (log_flags.cpu_sched_debug) {
736
799
msg_printf(rp->project, MSG_INFO,
741
// the scheduled result is already running.
742
// see if it fits in mem
744
804
if (atp->procinfo.working_set_size_smoothed > ram_left) {
745
805
atp->next_scheduler_state = CPU_SCHED_PREEMPTED;
746
806
atp->too_large = true;
807
ncpus_used -= atp->app_version->avg_ncpus;
748
808
if (log_flags.mem_usage_debug) {
749
809
msg_printf(rp->project, MSG_INFO,
750
810
"[mem_usage_debug] enforce: result %s can't continue, too big %.2fMB > %.2fMB",
761
821
// Here if the result is not already running.
762
// If it already has a (non-running) active task,
763
// see if it fits in mem
822
// If it already has an active task and won't fit in mem, skip it
765
824
atp = lookup_active_task_by_result(rp);
781
// The scheduled result is not already running.
782
// Preempt something if needed and possible.
840
// Preempt something if needed (and possible).
784
842
bool run_task = false;
785
bool need_to_preempt = (nrunning==ncpus) && running_tasks.size();
843
bool need_to_preempt = (ncpus_used >= ncpus) && running_tasks.size();
786
844
// the 2nd half of the above is redundant
787
845
if (need_to_preempt) {
788
846
// examine the most preemptable task.
831
889
atp = get_task(rp);
832
890
atp->next_scheduler_state = CPU_SCHED_SCHEDULED;
891
ncpus_used += atp->app_version->avg_ncpus;
834
892
ram_left -= atp->procinfo.working_set_size_smoothed;
837
895
if (log_flags.cpu_sched_debug) {
838
896
msg_printf(0, MSG_INFO,
839
"[cpu_sched_debug] finished preempt loop, nrunning %d",
897
"[cpu_sched_debug] finished preempt loop, ncpus_used %f",
863
if (log_flags.cpu_sched_debug && nrunning < ncpus) {
864
msg_printf(0, MSG_INFO, "[cpu_sched_debug] Some CPUs idle (%d<%d)",
867
request_work_fetch("CPUs idle");
869
if (log_flags.cpu_sched_debug && nrunning > ncpus) {
870
msg_printf(0, MSG_INFO, "[cpu_sched_debug] Too many tasks started (%d>%d)",
921
if (log_flags.cpu_sched_debug && ncpus_used < ncpus) {
922
msg_printf(0, MSG_INFO, "[cpu_sched_debug] using %f out of %d CPUs",
925
if (ncpus_used < ncpus) {
926
request_work_fetch("CPUs idle");
875
930
// schedule new non CPU intensive tasks
927
982
case CPU_SCHED_SCHEDULED:
928
983
switch (atp->task_state()) {
929
984
case PROCESS_UNINITIALIZED:
985
if (!sufficient_coprocs(*atp->app_version)) continue;
930
986
case PROCESS_SUSPENDED:
932
988
retval = atp->resume_or_start(
1463
1519
if (config.ncpus>0) {
1464
1520
ncpus = config.ncpus;
1465
1521
} else if (host_info.p_ncpus>0) {
1466
ncpus = host_info.p_ncpus;
1522
ncpus = (int)((host_info.p_ncpus * global_prefs.max_ncpus_pct)/100);
1523
if (ncpus == 0) ncpus = 1;
1470
if (ncpus > global_prefs.max_cpus) ncpus = global_prefs.max_cpus;
1472
1528
if (initialized && ncpus != ncpus_old) {
1473
1529
msg_printf(0, MSG_INFO,