~ubuntu-branches/ubuntu/precise/boinc/precise

« back to all changes in this revision

Viewing changes to client/cpu_sched.cpp

Tags: 6.12.8+dfsg-1
* New upstream release.
* Simplified debian/rules

Show diffs side-by-side

added added

removed removed

Lines of Context:
35
35
// If an app is running (not suspended), the interval
36
36
// during which it's been running.
37
37
 
 
38
#include "cpp.h"
 
39
 
38
40
#ifdef _WIN32
39
41
#include "boinc_win.h"
40
 
#endif
41
 
 
 
42
#include "win_util.h"
 
43
#else
 
44
#include "config.h"
42
45
#include <string>
43
46
#include <cstring>
44
 
 
 
47
#include <list>
 
48
#endif
 
49
 
 
50
 
 
51
#include "coproc.h"
 
52
#include "error_numbers.h"
 
53
#include "filesys.h"
45
54
#include "str_util.h"
46
55
#include "util.h"
47
 
#include "error_numbers.h"
48
 
#include "coproc.h"
49
56
 
50
57
#include "client_msgs.h"
51
58
#include "log_flags.h"
 
59
#include "app.h"
52
60
 
53
 
#ifdef SIM
54
 
#include "sim.h"
55
 
#else
56
61
#include "client_state.h"
57
 
#endif
58
62
 
59
63
using std::vector;
60
 
 
61
 
#define MAX_STD   (86400)
62
 
    // maximum short-term debt
 
64
using std::list;
63
65
 
64
66
#define DEADLINE_CUSHION    0
65
67
    // try to finish jobs this much in advance of their deadline
66
68
 
67
 
bool COPROCS::sufficient_coprocs(COPROCS& needed, bool log_flag, const char* prefix) {
68
 
    for (unsigned int i=0; i<needed.coprocs.size(); i++) {
69
 
        COPROC* cp = needed.coprocs[i];
70
 
        COPROC* cp2 = lookup(cp->type);
71
 
        if (!cp2) {
72
 
            msg_printf(NULL, MSG_INTERNAL_ERROR,
73
 
                "Missing a %s coprocessor", cp->type
74
 
            );
75
 
            return false;
76
 
        }
77
 
        if (cp2->used + cp->count > cp2->count) {
78
 
                        if (log_flag) {
79
 
                                msg_printf(NULL, MSG_INFO,
80
 
                                        "[%s] rr_sim: insufficient coproc %s (%d + %d > %d)",
81
 
                                        prefix, cp2->type, cp2->used, cp->count, cp2->count
82
 
                                );
83
 
                        }
84
 
            return false;
85
 
        }
86
 
    }
87
 
    return true;
88
 
}
89
 
 
90
 
void COPROCS::reserve_coprocs(COPROCS& needed, void* owner, bool log_flag, const char* prefix) {
91
 
    for (unsigned int i=0; i<needed.coprocs.size(); i++) {
92
 
        COPROC* cp = needed.coprocs[i];
93
 
        COPROC* cp2 = lookup(cp->type);
94
 
        if (!cp2) {
95
 
            msg_printf(NULL, MSG_INTERNAL_ERROR,
96
 
                "Coproc type %s not found", cp->type
97
 
            );
98
 
            continue;
99
 
        }
100
 
                if (log_flag) {
101
 
                        msg_printf(NULL, MSG_INFO,
102
 
                                "[%s] reserving %d of coproc %s", prefix, cp->count, cp2->type
103
 
                        );
104
 
                }
105
 
        cp2->used += cp->count;
106
 
        int n = cp->count;
107
 
        for (int j=0; j<cp2->count; j++) {
108
 
            if (!cp2->owner[j]) {
109
 
                cp2->owner[j] = owner;
110
 
                n--;
111
 
                if (!n) break;
112
 
            }
113
 
        }
114
 
    }
115
 
}
116
 
 
117
 
void COPROCS::free_coprocs(COPROCS& needed, void* owner, bool log_flag, const char* prefix) {
118
 
    for (unsigned int i=0; i<needed.coprocs.size(); i++) {
119
 
        COPROC* cp = needed.coprocs[i];
120
 
        COPROC* cp2 = lookup(cp->type);
121
 
        if (!cp2) continue;
122
 
                if (log_flag) {
123
 
                        msg_printf(NULL, MSG_INFO,
124
 
                                "[%s] freeing %d of coproc %s", prefix, cp->count, cp2->type
125
 
                        );
126
 
                }
127
 
        cp2->used -= cp->count;
128
 
        for (int j=0; j<cp2->count; j++) {
129
 
            if (cp2->owner[j] == owner) {
130
 
                cp2->owner[j] = 0;
131
 
            }
132
 
        }
133
 
    }
134
 
}
 
69
// used in schedule_cpus() to keep track of resources used
 
70
// by jobs tentatively scheduled so far
 
71
//
 
72
struct PROC_RESOURCES {
 
73
    int ncpus;
 
74
    double ncpus_used_st;   // #CPUs of GPU or single-thread jobs
 
75
    double ncpus_used_mt;   // #CPUs of multi-thread jobs
 
76
    COPROCS coprocs;
 
77
 
 
78
    // should we stop scanning jobs?
 
79
    //
 
80
    inline bool stop_scan_cpu() {
 
81
        return ncpus_used_st >= ncpus;
 
82
    }
 
83
 
 
84
    inline bool stop_scan_coproc(int rsc_type) {
 
85
        if (rsc_type == RSC_TYPE_CUDA) {
 
86
            return coprocs.cuda.used >= coprocs.cuda.count;
 
87
        }
 
88
        return coprocs.ati.used >= coprocs.ati.count;
 
89
    }
 
90
 
 
91
    // should we consider scheduling this job?
 
92
    //
 
93
    bool can_schedule(RESULT* rp) {
 
94
        if (rp->schedule_backoff > gstate.now) return false;
 
95
        if (rp->uses_coprocs()) {
 
96
            if (gpu_suspend_reason) return false;
 
97
            if (sufficient_coprocs(*rp->avp, log_flags.cpu_sched_debug)) {
 
98
                return true;
 
99
            } else {
 
100
                if (log_flags.cpu_sched_debug) {
 
101
                    msg_printf(rp->project, MSG_INFO,
 
102
                        "[cpu_sched] insufficient coprocessors for %s", rp->name
 
103
                    );
 
104
                }
 
105
                return false;
 
106
            }
 
107
        } else if (rp->avp->avg_ncpus > 1) {
 
108
            return (ncpus_used_mt + rp->avp->avg_ncpus <= ncpus);
 
109
        } else {
 
110
            return (ncpus_used_st < ncpus);
 
111
        }
 
112
    }
 
113
 
 
114
    // we've decided to run this - update bookkeeping
 
115
    //
 
116
    void schedule(RESULT* rp) {
 
117
        reserve_coprocs(
 
118
            *rp->avp, log_flags.cpu_sched_debug, "cpu_sched_debug"
 
119
        );
 
120
        if (rp->uses_coprocs()) {
 
121
            ncpus_used_st += rp->avp->avg_ncpus;
 
122
        } else if (rp->avp->avg_ncpus > 1) {
 
123
            ncpus_used_mt += rp->avp->avg_ncpus;
 
124
        } else {
 
125
            ncpus_used_st += rp->avp->avg_ncpus;
 
126
        }
 
127
    }
 
128
 
 
129
    bool sufficient_coprocs(APP_VERSION& av, bool log_flag) {
 
130
        double x;
 
131
        COPROC* cp2;
 
132
        if (av.ncudas) {
 
133
            x = av.ncudas;
 
134
            cp2 = &coprocs.cuda;
 
135
        } else if (av.natis) {
 
136
            x = av.natis;
 
137
            cp2 = &coprocs.ati;
 
138
        } else {
 
139
            return true;
 
140
        }
 
141
        if (!cp2->count) {
 
142
            msg_printf(NULL, MSG_INTERNAL_ERROR,
 
143
                "Missing a %s coprocessor", cp2->type
 
144
            );
 
145
            return false;
 
146
        }
 
147
        if (cp2->used + x > cp2->count) {
 
148
            if (log_flag) {
 
149
                msg_printf(NULL, MSG_INFO,
 
150
                    "[cpu_sched] insufficient coproc %s (%f + %f > %d)",
 
151
                    cp2->type, cp2->used, x, cp2->count
 
152
                );
 
153
            }
 
154
            return false;
 
155
        }
 
156
        return true;
 
157
    }
 
158
 
 
159
    void reserve_coprocs(
 
160
        APP_VERSION& av, bool log_flag, const char* prefix
 
161
    ) {
 
162
        double x;
 
163
        COPROC* cp2;
 
164
        if (av.ncudas) {
 
165
            x = av.ncudas;
 
166
            cp2 = &coprocs.cuda;
 
167
        } else if (av.natis) {
 
168
            x = av.natis;
 
169
            cp2 = &coprocs.ati;
 
170
        } else {
 
171
            return;
 
172
        }
 
173
        if (!cp2) {
 
174
            msg_printf(NULL, MSG_INTERNAL_ERROR,
 
175
                "Coproc type %s not found", cp2->type
 
176
            );
 
177
            return;
 
178
        }
 
179
        if (log_flag) {
 
180
            msg_printf(NULL, MSG_INFO,
 
181
                "[%s] reserving %f of coproc %s", prefix, x, cp2->type
 
182
            );
 
183
        }
 
184
        cp2->used += x;
 
185
    }
 
186
};
 
187
 
 
188
bool gpus_usable = true;
 
189
#ifndef SIM
 
190
// see whether there's been a change in coproc usability;
 
191
// if so set or clear "coproc_missing" flags and return true.
 
192
//
 
193
bool check_coprocs_usable() {
 
194
#ifdef _WIN32
 
195
    unsigned int i;
 
196
    bool new_usable = !is_remote_desktop();
 
197
    if (gpus_usable) {
 
198
        if (!new_usable) {
 
199
            gpus_usable = false;
 
200
            for (i=0; i<gstate.results.size(); i++) {
 
201
                RESULT* rp = gstate.results[i];
 
202
                if (rp->avp->ncudas || rp->avp->natis) {
 
203
                    rp->coproc_missing = true;
 
204
                }
 
205
            }
 
206
            msg_printf(NULL, MSG_INFO,
 
207
                "GPUs have become unusable; disabling tasks"
 
208
            );
 
209
            return true;
 
210
        }
 
211
    } else {
 
212
        if (new_usable) {
 
213
            gpus_usable = true;
 
214
            for (i=0; i<gstate.results.size(); i++) {
 
215
                RESULT* rp = gstate.results[i];
 
216
                if (rp->avp->ncudas || rp->avp->natis) {
 
217
                    rp->coproc_missing = false;
 
218
                }
 
219
            }
 
220
            msg_printf(NULL, MSG_INFO,
 
221
                "GPUs have become usable; enabling tasks"
 
222
            );
 
223
            return true;
 
224
        }
 
225
    }
 
226
#endif
 
227
    return false;
 
228
}
 
229
#endif
135
230
 
136
231
// return true if the task has finished its time slice
137
232
// and has checkpointed in last 10 secs
144
239
    return (running_beyond_sched_period && checkpointed_recently);
145
240
}
146
241
 
147
 
// Choose a "best" runnable result for each project
 
242
// Choose a "best" runnable CPU job for each project
148
243
//
149
244
// Values are returned in project->next_runnable_result
150
245
// (skip projects for which this is already non-NULL)
173
268
        if (!atp->runnable()) continue;
174
269
        rp = atp->result;
175
270
        if (rp->already_selected) continue;
 
271
        if (rp->uses_coprocs()) continue;
176
272
        if (!rp->runnable()) continue;
177
273
        project = rp->project;
178
274
        if (!project->next_runnable_result) {
200
296
    for (i=0; i<results.size(); i++) {
201
297
        rp = results[i];
202
298
        if (rp->already_selected) continue;
 
299
        if (rp->uses_coprocs()) continue;
203
300
        if (lookup_active_task_by_result(rp)) continue;
204
301
        if (!rp->runnable()) continue;
205
302
 
225
322
//
226
323
RESULT* CLIENT_STATE::largest_debt_project_best_result() {
227
324
    PROJECT *best_project = NULL;
228
 
    double best_debt = -MAX_STD;
 
325
    double best_debt = 0;
229
326
    bool first = true;
230
327
    unsigned int i;
231
328
 
233
330
        PROJECT* p = projects[i];
234
331
        if (!p->next_runnable_result) continue;
235
332
        if (p->non_cpu_intensive) continue;
236
 
        if (first || p->anticipated_debt > best_debt) {
237
 
            first = false;
238
 
            best_project = p;
239
 
            best_debt = p->anticipated_debt;
240
 
        }
 
333
#ifdef USE_REC
 
334
        if (first || project_priority(p)> best_debt) {
 
335
            first = false;
 
336
            best_project = p;
 
337
            best_debt = project_priority(p);
 
338
        }
 
339
#else
 
340
        if (first || p->cpu_pwf.anticipated_debt > best_debt) {
 
341
            first = false;
 
342
            best_project = p;
 
343
            best_debt = p->cpu_pwf.anticipated_debt;
 
344
        }
 
345
#endif
241
346
    }
242
347
    if (!best_project) return NULL;
243
348
 
 
349
#ifndef USE_REC
244
350
    if (log_flags.cpu_sched_debug) {
245
351
        msg_printf(best_project, MSG_INFO,
246
 
            "[cpu_sched_debug] highest debt: %f %s",
247
 
            best_project->anticipated_debt,
 
352
            "[cpu_sched] highest debt: %f %s",
 
353
            best_project->cpu_pwf.anticipated_debt,
248
354
            best_project->next_runnable_result->name
249
355
        );
250
356
    }
 
357
#endif
251
358
    RESULT* rp = best_project->next_runnable_result;
252
359
    best_project->next_runnable_result = 0;
253
360
    return rp;
254
361
}
255
362
 
256
 
// Return earliest-deadline result from a project with deadlines_missed>0
257
 
//
258
 
RESULT* CLIENT_STATE::earliest_deadline_result() {
 
363
// Return a job of the given type according to the following criteria
 
364
// (desc priority):
 
365
//  - from project with higher STD for that resource
 
366
//  - already-started job
 
367
//  - earlier received_time
 
368
//  - lexicographically earlier name
 
369
//
 
370
// Give priority to already-started jobs because of the following scenario:
 
371
// - client gets several jobs in a sched reply and starts downloading files
 
372
// - a later job finishes downloading and starts
 
373
// - an earlier finishes downloading and preempts
 
374
//
 
375
RESULT* first_coproc_result(int rsc_type) {
 
376
    unsigned int i;
 
377
    RESULT* best = NULL;
 
378
    double best_std=0;
 
379
    for (i=0; i<gstate.results.size(); i++) {
 
380
        RESULT* rp = gstate.results[i];
 
381
        if (rp->resource_type() != rsc_type) continue;
 
382
        if (!rp->runnable()) continue;
 
383
        if (rp->project->non_cpu_intensive) continue;
 
384
        if (rp->already_selected) continue;
 
385
#ifdef USE_REC
 
386
        double std = project_priority(rp->project);
 
387
#else
 
388
        double std = rp->project->anticipated_debt(rsc_type);
 
389
#endif
 
390
        if (!best) {
 
391
            best = rp;
 
392
            best_std = std;
 
393
            continue;
 
394
        }
 
395
 
 
396
        if (std < best_std) {
 
397
            continue;
 
398
        }
 
399
        if (std > best_std) {
 
400
            best = rp;
 
401
            best_std = std;
 
402
            continue;
 
403
        }
 
404
 
 
405
        bool bs = !best->not_started();
 
406
        bool rs = !rp->not_started();
 
407
        if (rs && !bs) {
 
408
            best = rp;
 
409
            best_std = std;
 
410
            continue;
 
411
        }
 
412
        if (!rs && bs) {
 
413
            continue;
 
414
        }
 
415
        if (rp->received_time < best->received_time) {
 
416
            best = rp;
 
417
            best_std = std;
 
418
        } else if (rp->received_time == best->received_time) {
 
419
            // make it deterministic by looking at name
 
420
            //
 
421
            if (strcmp(rp->name, best->name) > 0) {
 
422
                best = rp;
 
423
                best_std = std;
 
424
            }
 
425
        }
 
426
    }
 
427
    return best;
 
428
}
 
429
 
 
430
// Return earliest-deadline result for given resource type;
 
431
// return only results projected to miss their deadline,
 
432
// or from projects with extreme DCF
 
433
//
 
434
static RESULT* earliest_deadline_result(int rsc_type) {
259
435
    RESULT *best_result = NULL;
260
436
    ACTIVE_TASK* best_atp = NULL;
261
437
    unsigned int i;
262
438
 
263
 
    for (i=0; i<results.size(); i++) {
264
 
        RESULT* rp = results[i];
 
439
    for (i=0; i<gstate.results.size(); i++) {
 
440
        RESULT* rp = gstate.results[i];
 
441
        if (rp->resource_type() != rsc_type) continue;
 
442
        if (rp->already_selected) continue;
265
443
        if (!rp->runnable()) continue;
266
 
        if (rp->project->non_cpu_intensive) continue;
267
 
        if (rp->already_selected) continue;
268
 
        if (!rp->project->deadlines_missed && rp->project->duration_correction_factor < 90.0) continue;
269
 
            // treat projects with DCF>90 as if they had deadline misses
270
 
 
 
444
        PROJECT* p = rp->project;
 
445
        if (p->non_cpu_intensive) continue;
 
446
 
 
447
        bool only_deadline_misses = true;
 
448
 
 
449
        // treat projects with DCF>90 as if they had deadline misses
 
450
        //
 
451
        if (p->duration_correction_factor < 90.0) {
 
452
            int d;
 
453
            switch (rsc_type) {
 
454
            case RSC_TYPE_CUDA:
 
455
                d = p->cuda_pwf.deadlines_missed_copy;
 
456
                break;
 
457
            case RSC_TYPE_ATI:
 
458
                d = p->ati_pwf.deadlines_missed_copy;
 
459
                break;
 
460
            default:
 
461
                d = p->cpu_pwf.deadlines_missed_copy;
 
462
            }
 
463
            if (!d) {
 
464
                continue;
 
465
            }
 
466
        } else {
 
467
            only_deadline_misses = false;
 
468
        }
 
469
        
 
470
        if (only_deadline_misses && !rp->rr_sim_misses_deadline) {
 
471
            continue;
 
472
        }
271
473
        bool new_best = false;
272
474
        if (best_result) {
273
475
            if (rp->report_deadline < best_result->report_deadline) {
278
480
        }
279
481
        if (new_best) {
280
482
            best_result = rp;
281
 
            best_atp = lookup_active_task_by_result(rp);
 
483
            best_atp = gstate.lookup_active_task_by_result(rp);
282
484
            continue;
283
485
        }
284
486
        if (rp->report_deadline > best_result->report_deadline) {
285
487
            continue;
286
488
        }
287
489
 
288
 
        // If there's a tie, pick the job with the least remaining CPU time
 
490
        // If there's a tie, pick the job with the least remaining time
289
491
        // (but don't pick an unstarted job over one that's started)
290
492
        //
291
 
        ACTIVE_TASK* atp = lookup_active_task_by_result(rp);
 
493
        ACTIVE_TASK* atp = gstate.lookup_active_task_by_result(rp);
292
494
        if (best_atp && !atp) continue;
293
 
        if (rp->estimated_time_remaining(false)
294
 
            < best_result->estimated_time_remaining(false)
 
495
        if (rp->estimated_time_remaining() < best_result->estimated_time_remaining()
295
496
            || (!best_atp && atp)
296
497
        ) {
297
498
            best_result = rp;
302
503
 
303
504
    if (log_flags.cpu_sched_debug) {
304
505
        msg_printf(best_result->project, MSG_INFO,
305
 
            "[cpu_sched_debug] earliest deadline: %f %s",
 
506
            "[cpu_sched] earliest deadline: %.0f %s",
306
507
            best_result->report_deadline, best_result->name
307
508
        );
308
509
    }
314
515
    unsigned int i;
315
516
    for (i=0; i<projects.size(); i++) {
316
517
        PROJECT* p = projects[i];
317
 
        p->wall_cpu_time_this_debt_interval = 0.0;
318
 
    }
319
 
    total_wall_cpu_time_this_debt_interval = 0.0;
 
518
        p->cpu_pwf.reset_debt_accounting();
 
519
        if (host_info.have_cuda()) {
 
520
            p->cuda_pwf.reset_debt_accounting();
 
521
        }
 
522
        if (host_info.have_ati()) {
 
523
            p->ati_pwf.reset_debt_accounting();
 
524
        }
 
525
    }
 
526
    cpu_work_fetch.reset_debt_accounting();
 
527
    if (host_info.have_cuda()) {
 
528
        cuda_work_fetch.reset_debt_accounting();
 
529
    }
 
530
    if (host_info.have_ati()) {
 
531
        ati_work_fetch.reset_debt_accounting();
 
532
    }
320
533
    debt_interval_start = now;
321
534
}
322
535
 
 
536
#ifdef USE_REC
 
537
 
 
538
// update REC (recent estimated credit)
 
539
//
 
540
static void update_rec() {
 
541
    double f = gstate.host_info.p_fpops;
 
542
 
 
543
    for (unsigned int i=0; i<gstate.projects.size(); i++) {
 
544
        PROJECT* p = gstate.projects[i];
 
545
        double x = p->cpu_pwf.secs_this_debt_interval * f;
 
546
        if (gstate.host_info.have_cuda()) {
 
547
            x += p->cuda_pwf.secs_this_debt_interval * f * cuda_work_fetch.relative_speed;
 
548
        }
 
549
        if (gstate.host_info.have_ati()) {
 
550
            x += p->ati_pwf.secs_this_debt_interval * f * ati_work_fetch.relative_speed;
 
551
        }
 
552
        x /= 1e9;
 
553
        double old = p->pwf.rec;
 
554
 
 
555
        // start averages at zero
 
556
        //
 
557
        if (p->pwf.rec_time == 0) {
 
558
            p->pwf.rec_time = gstate.debt_interval_start;
 
559
        }
 
560
 
 
561
        update_average(
 
562
            gstate.now,
 
563
            gstate.debt_interval_start,
 
564
            x,
 
565
            REC_HALF_LIFE,
 
566
            p->pwf.rec,
 
567
            p->pwf.rec_time
 
568
        );
 
569
 
 
570
        if (log_flags.debt_debug) {
 
571
            double dt = gstate.now - gstate.debt_interval_start;
 
572
            msg_printf(p, MSG_INFO,
 
573
                "[debt] recent est credit: %.2fG in %.2f sec, %f + %f ->%f",
 
574
                x, dt, old, p->pwf.rec-old, p->pwf.rec
 
575
            );
 
576
        }
 
577
    }
 
578
}
 
579
 
 
580
double peak_flops(APP_VERSION* avp) {
 
581
    double f = gstate.host_info.p_fpops;
 
582
    return f * avp->avg_ncpus
 
583
        + f * avp->ncudas * cuda_work_fetch.relative_speed
 
584
        + f * avp->natis * ati_work_fetch.relative_speed
 
585
    ;
 
586
}
 
587
 
 
588
static double rec_sum;
 
589
 
 
590
// Initialize project "priorities" based on REC:
 
591
// compute resource share and REC fractions
 
592
// among compute-intensive, non-suspended projects
 
593
//
 
594
void project_priority_init() {
 
595
    double rs_sum = 0;
 
596
    for (unsigned int i=0; i<gstate.projects.size(); i++) {
 
597
        PROJECT* p = gstate.projects[i];
 
598
        if (p->non_cpu_intensive) continue;
 
599
        if (p->suspended_via_gui) continue;
 
600
        rs_sum += p->resource_share;
 
601
        rec_sum += p->pwf.rec;
 
602
    }
 
603
    if (rec_sum == 0) {
 
604
        rec_sum = 1;
 
605
    }
 
606
    for (unsigned int i=0; i<gstate.projects.size(); i++) {
 
607
        PROJECT* p = gstate.projects[i];
 
608
        if (p->non_cpu_intensive || p->suspended_via_gui || rs_sum==0) {
 
609
            p->resource_share_frac = 0;
 
610
            continue;
 
611
        }
 
612
        p->resource_share_frac = p->resource_share/rs_sum;
 
613
        p->pwf.rec_temp = p->pwf.rec;
 
614
 
 
615
    }
 
616
}
 
617
 
 
618
double project_priority(PROJECT* p) {
 
619
    double x = p->resource_share_frac - p->pwf.rec_temp/rec_sum;
 
620
#if 0
 
621
    printf("%s: rs frac %.3f rec_temp %.3f rec_sum %.3f prio %.3f\n",
 
622
        p->project_name, p->resource_share_frac, p->pwf.rec_temp, rec_sum, x
 
623
    );
 
624
#endif
 
625
    return x;
 
626
}
 
627
 
 
628
// we plan to run this job.
 
629
// bump the project's REC accordingly
 
630
//
 
631
void adjust_rec_temp(RESULT* rp) {
 
632
    PROJECT* p = rp->project;
 
633
    p->pwf.rec_temp += peak_flops(rp->avp);
 
634
}
 
635
 
 
636
#endif
 
637
 
323
638
// adjust project debts (short, long-term)
324
639
//
325
640
void CLIENT_STATE::adjust_debts() {
326
641
    unsigned int i;
327
 
    double total_long_term_debt = 0;
328
 
    double total_short_term_debt = 0;
329
 
    double prrs, rrs;
330
 
    int nprojects=0, nrprojects=0;
331
 
    PROJECT *p;
332
 
    double share_frac;
333
 
    double wall_cpu_time = now - debt_interval_start;
334
 
 
335
 
    if (wall_cpu_time < 1) {
336
 
        return;
337
 
    }
338
 
 
339
 
    // if the elapsed time is more than the scheduling period,
 
642
    double elapsed_time = now - debt_interval_start;
 
643
 
 
644
    // If the elapsed time is more than 2*DEBT_ADJUST_PERIOD
340
645
    // it must be because the host was suspended for a long time.
341
 
    // Currently we don't have a way to estimate how long this was for,
342
 
    // so ignore the last period and reset counters.
 
646
    // In this case, ignore the last period
343
647
    //
344
 
    if (wall_cpu_time > global_prefs.cpu_scheduling_period()*2) {
 
648
    if (elapsed_time > 2*DEBT_ADJUST_PERIOD || elapsed_time < 0) {
345
649
        if (log_flags.debt_debug) {
346
650
            msg_printf(NULL, MSG_INFO,
347
 
                "[debt_debug] adjust_debt: elapsed time (%d) longer than sched period (%d).  Ignoring this period.",
348
 
                (int)wall_cpu_time, (int)global_prefs.cpu_scheduling_period()
 
651
                "[debt] adjust_debt: elapsed time (%d) longer than sched enforce period(%d).  Ignoring this period.",
 
652
                (int)elapsed_time, (int)DEBT_ADJUST_PERIOD
349
653
            );
350
654
        }
351
655
        reset_debt_accounting();
352
656
        return;
353
657
    }
354
658
 
355
 
    // Total up total and per-project "wall CPU" since last CPU reschedule.
356
 
    // "Wall CPU" is the wall time during which a task was
357
 
    // runnable (at the OS level).
 
659
    // skip small intervals
358
660
    //
359
 
    // We use wall CPU for debt calculation
360
 
    // (instead of reported actual CPU) for two reasons:
361
 
    // 1) the process might have paged a lot, so the actual CPU
362
 
    //    may be a lot less than wall CPU
363
 
    // 2) BOINC relies on apps to report their CPU time.
364
 
    //    Sometimes there are bugs and apps report zero CPU.
365
 
    //    It's safer not to trust them.
 
661
    if (elapsed_time < 1) {
 
662
        return;
 
663
    }
 
664
 
 
665
    // total up how many instance-seconds projects got
366
666
    //
367
667
    for (i=0; i<active_tasks.active_tasks.size(); i++) {
368
668
        ACTIVE_TASK* atp = active_tasks.active_tasks[i];
369
669
        if (atp->scheduler_state != CPU_SCHED_SCHEDULED) continue;
370
 
        if (atp->wup->project->non_cpu_intensive) continue;
371
 
 
372
 
        atp->result->project->wall_cpu_time_this_debt_interval += wall_cpu_time;
373
 
        total_wall_cpu_time_this_debt_interval += wall_cpu_time;
374
 
    }
375
 
 
376
 
    rrs = runnable_resource_share();
377
 
    prrs = potentially_runnable_resource_share();
378
 
 
379
 
    for (i=0; i<projects.size(); i++) {
380
 
        p = projects[i];
381
 
 
382
 
        // potentially_runnable() can be false right after a result completes,
383
 
        // but we still need to update its LTD.
384
 
        // In this case its wall_cpu_time_this_debt_interval will be nonzero.
385
 
        //
386
 
        if (!(p->potentially_runnable()) && p->wall_cpu_time_this_debt_interval) {
387
 
            prrs += p->resource_share;
388
 
        }
389
 
    }
390
 
 
391
 
    for (i=0; i<projects.size(); i++) {
392
 
        p = projects[i];
393
 
        if (p->non_cpu_intensive) continue;
394
 
        nprojects++;
395
 
 
396
 
        // adjust long-term debts
397
 
        //
398
 
        if (p->potentially_runnable() || p->wall_cpu_time_this_debt_interval) {
399
 
            share_frac = p->resource_share/prrs;
400
 
            p->long_term_debt += share_frac*total_wall_cpu_time_this_debt_interval
401
 
                - p->wall_cpu_time_this_debt_interval;
402
 
        }
403
 
        total_long_term_debt += p->long_term_debt;
404
 
 
405
 
        // adjust short term debts
406
 
        //
407
 
        if (p->runnable()) {
408
 
            nrprojects++;
409
 
            share_frac = p->resource_share/rrs;
410
 
            p->short_term_debt += share_frac*total_wall_cpu_time_this_debt_interval
411
 
                - p->wall_cpu_time_this_debt_interval;
412
 
            total_short_term_debt += p->short_term_debt;
413
 
        } else {
414
 
            p->short_term_debt = 0;
415
 
            p->anticipated_debt = 0;
416
 
        }
417
 
    }
418
 
 
419
 
    if (nprojects==0) return;
420
 
 
421
 
    // long-term debt:
422
 
    //  normalize so mean is zero,
423
 
    // short-term debt:
424
 
    //  normalize so mean is zero, and limit abs value at MAX_STD
425
 
    //
426
 
    double avg_long_term_debt = total_long_term_debt / nprojects;
427
 
    double avg_short_term_debt = 0;
428
 
    if (nrprojects) {
429
 
        avg_short_term_debt = total_short_term_debt / nrprojects;
430
 
    }
431
 
    for (i=0; i<projects.size(); i++) {
432
 
        p = projects[i];
433
 
        if (p->non_cpu_intensive) continue;
434
 
        if (p->runnable()) {
435
 
            p->short_term_debt -= avg_short_term_debt;
436
 
            if (p->short_term_debt > MAX_STD) {
437
 
                p->short_term_debt = MAX_STD;
438
 
            }
439
 
            if (p->short_term_debt < -MAX_STD) {
440
 
                p->short_term_debt = -MAX_STD;
441
 
            }
442
 
        }
443
 
 
444
 
        p->long_term_debt -= avg_long_term_debt;
445
 
        if (log_flags.debt_debug) {
446
 
            msg_printf(0, MSG_INFO,
447
 
                "[debt_debug] adjust_debts(): project %s: STD %f, LTD %f",
448
 
                p->project_name, p->short_term_debt, p->long_term_debt
449
 
            );
450
 
        }
451
 
    }
 
670
        PROJECT* p = atp->result->project;
 
671
        if (p->non_cpu_intensive) continue;
 
672
        work_fetch.accumulate_inst_sec(atp, elapsed_time);
 
673
    }
 
674
 
 
675
#ifdef USE_REC
 
676
    update_rec();
 
677
#else
 
678
    cpu_work_fetch.update_long_term_debts();
 
679
    cpu_work_fetch.update_short_term_debts();
 
680
    if (host_info.have_cuda()) {
 
681
        cuda_work_fetch.update_long_term_debts();
 
682
        cuda_work_fetch.update_short_term_debts();
 
683
    }
 
684
    if (host_info.have_ati()) {
 
685
        ati_work_fetch.update_long_term_debts();
 
686
        ati_work_fetch.update_short_term_debts();
 
687
    }
 
688
#endif
452
689
 
453
690
    reset_debt_accounting();
454
691
}
466
703
    if (projects.size() == 0) return false;
467
704
    if (results.size() == 0) return false;
468
705
 
469
 
    // Reschedule every cpu_sched_period seconds,
 
706
    // Reschedule every CPU_SCHED_PERIOD seconds,
470
707
    // or if must_schedule_cpus is set
471
708
    // (meaning a new result is available, or a CPU has been freed).
472
709
    //
473
710
    elapsed_time = now - last_reschedule;
474
 
    if (elapsed_time >= global_prefs.cpu_scheduling_period()) {
475
 
        request_schedule_cpus("Scheduling period elapsed.");
 
711
    if (elapsed_time >= CPU_SCHED_PERIOD) {
 
712
        request_schedule_cpus("periodic CPU scheduling");
476
713
    }
477
714
 
478
715
    if (!must_schedule_cpus) return false;
482
719
    return true;
483
720
}
484
721
 
485
 
struct PROC_RESOURCES {
486
 
    int ncpus;
487
 
    double ncpus_used;
488
 
    double ram_left;
489
 
    int ncoproc_jobs;   // # of runnable jobs that use coprocs
490
 
    COPROCS coprocs;
491
 
 
492
 
    // should we stop scanning jobs?
493
 
    //
494
 
    bool stop_scan() {
495
 
                if (ncpus_used >= ncpus) {
496
 
                        if (!ncoproc_jobs) return true;
497
 
                        if (coprocs.fully_used()) return true;
498
 
                }
499
 
                return false;
500
 
    }
501
 
 
502
 
    // should we consider scheduling this job?
503
 
    //
504
 
    bool can_schedule(RESULT* rp, ACTIVE_TASK* atp) {
505
 
        if (rp->uses_coprocs()) {
506
 
 
507
 
            // if it uses coprocs, and they're available, yes
508
 
            //
509
 
                        if (atp && atp->coprocs_reserved) {
510
 
                                if (log_flags.cpu_sched_debug) {
511
 
                                        msg_printf(rp->project, MSG_INFO,
512
 
                                                "[cpu_sched_debug] already reserved coprocessors for %s", rp->name
513
 
                                        );
514
 
                                }
515
 
                                return true;
516
 
                        }
517
 
            if (coprocs.sufficient_coprocs(
518
 
                rp->avp->coprocs, log_flags.cpu_sched_debug, "cpu_sched_debug")
519
 
            ) {
520
 
                return true;
521
 
            } else {
522
 
                if (log_flags.cpu_sched_debug) {
523
 
                    msg_printf(rp->project, MSG_INFO,
524
 
                        "[cpu_sched_debug] insufficient coprocessors for %s", rp->name
525
 
                    );
526
 
                }
527
 
                return false;
528
 
            }
529
 
        } else {
530
 
            // otherwise, only if CPUs are available
531
 
            //
532
 
            return (ncpus_used < ncpus);
533
 
        }
534
 
    }
535
 
};
536
 
 
537
722
// Check whether the job can be run:
538
 
// - it will fit in RAM
539
723
// - we have enough shared-mem segments (old Mac problem)
540
 
// If so, update proc_rsc accordingly and return true
 
724
// If so, update proc_rsc and anticipated debts, and return true
541
725
//
542
726
static bool schedule_if_possible(
543
 
    RESULT* rp, ACTIVE_TASK* atp, PROC_RESOURCES& proc_rsc, double rrs, double expected_payoff
 
727
    RESULT* rp, ACTIVE_TASK* atp, PROC_RESOURCES& proc_rsc,
 
728
    const char* description
544
729
) {
545
730
    if (atp) {
546
 
        // see if it fits in available RAM
547
 
        //
548
 
        if (atp->procinfo.working_set_size_smoothed > proc_rsc.ram_left) {
549
 
            if (log_flags.cpu_sched_debug) {
550
 
                msg_printf(rp->project, MSG_INFO,
551
 
                    "[cpu_sched_debug]  %s misses deadline but too large: %.2fMB",
552
 
                    rp->name, atp->procinfo.working_set_size_smoothed/MEGA
553
 
                );
554
 
            }
555
 
            atp->too_large = true;
556
 
            return false;
557
 
        }
558
 
        atp->too_large = false;
559
 
        
560
731
        if (gstate.retry_shmem_time > gstate.now) {
561
732
            if (atp->app_client_shm.shm == NULL) {
562
733
                if (log_flags.cpu_sched_debug) {
563
734
                    msg_printf(rp->project, MSG_INFO,
564
 
                        "[cpu_sched_debug] waiting for shared mem: %s",
 
735
                        "[cpu_sched] waiting for shared mem: %s",
565
736
                        rp->name
566
737
                    );
567
738
                }
570
741
            }
571
742
            atp->needs_shmem = false;
572
743
        }
573
 
        proc_rsc.ram_left -= atp->procinfo.working_set_size_smoothed;
574
744
    }
 
745
 
575
746
    if (log_flags.cpu_sched_debug) {
576
747
        msg_printf(rp->project, MSG_INFO,
577
 
            "[cpu_sched_debug] scheduling %s", rp->name
 
748
            "[cpu_sched] scheduling %s (%s)", rp->name, description
578
749
        );
579
750
    }
580
 
        if (!atp || !atp->coprocs_reserved) {
581
 
                proc_rsc.coprocs.reserve_coprocs(
582
 
                        rp->avp->coprocs, rp, log_flags.cpu_sched_debug, "cpu_sched_debug"
583
 
                );
584
 
        }
585
 
    proc_rsc.ncpus_used += rp->avp->avg_ncpus;
586
 
    if (rp->uses_coprocs()) {
587
 
        proc_rsc.ncoproc_jobs--;
588
 
    }
589
 
    rp->project->anticipated_debt -= (rp->project->resource_share / rrs) * expected_payoff;
 
751
    proc_rsc.schedule(rp);
 
752
 
 
753
#ifdef USE_REC
 
754
    adjust_rec_temp(rp);
 
755
#else
 
756
    // project STD at end of time slice
 
757
    //
 
758
    double dt = gstate.global_prefs.cpu_scheduling_period();
 
759
    rp->project->cpu_pwf.anticipated_debt -= dt*rp->avp->avg_ncpus/cpu_work_fetch.ninstances;
 
760
    rp->project->cuda_pwf.anticipated_debt -= dt*rp->avp->ncudas/cuda_work_fetch.ninstances;
 
761
    rp->project->ati_pwf.anticipated_debt -= dt*rp->avp->natis/ati_work_fetch.ninstances;
 
762
#endif
590
763
    return true;
591
764
}
592
765
 
 
766
// If a job J once ran in EDF,
 
767
// and its project has another job of the same resource type
 
768
// marked as deadline miss, mark J as deadline miss.
 
769
// This avoids domino-effect preemption
 
770
//
 
771
static void promote_once_ran_edf() {
 
772
    for (unsigned int i=0; i<gstate.active_tasks.active_tasks.size(); i++) {
 
773
        ACTIVE_TASK* atp = gstate.active_tasks.active_tasks[i];
 
774
        if (atp->once_ran_edf) {
 
775
            RESULT* rp = atp->result;
 
776
            PROJECT* p = rp->project;
 
777
            if (p->deadlines_missed(rp->avp->rsc_type())) {
 
778
                rp->rr_sim_misses_deadline = true;
 
779
            }
 
780
        }
 
781
    }
 
782
}
 
783
 
 
784
void add_coproc_jobs(int rsc_type, PROC_RESOURCES& proc_rsc) {
 
785
    ACTIVE_TASK* atp;
 
786
    RESULT* rp;
 
787
    bool can_run;
 
788
 
 
789
#ifdef SIM
 
790
    if (!cpu_sched_rr_only) {
 
791
#endif
 
792
    // choose coproc jobs from projects with coproc deadline misses
 
793
    //
 
794
    while (!proc_rsc.stop_scan_coproc(rsc_type)) {
 
795
        rp = earliest_deadline_result(rsc_type);
 
796
        if (!rp) break;
 
797
        rp->already_selected = true;
 
798
        if (!proc_rsc.can_schedule(rp)) continue;
 
799
        atp = gstate.lookup_active_task_by_result(rp);
 
800
        can_run = schedule_if_possible(
 
801
            rp, atp, proc_rsc, "coprocessor job, EDF"
 
802
        );
 
803
        if (!can_run) continue;
 
804
        if (rsc_type == RSC_TYPE_CUDA) {
 
805
            rp->project->cuda_pwf.deadlines_missed_copy--;
 
806
        } else {
 
807
            rp->project->ati_pwf.deadlines_missed_copy--;
 
808
        }
 
809
        rp->edf_scheduled = true;
 
810
        gstate.ordered_scheduled_results.push_back(rp);
 
811
    }
 
812
#ifdef SIM
 
813
    }
 
814
#endif
 
815
 
 
816
    // then coproc jobs in FIFO order
 
817
    //
 
818
    while (!proc_rsc.stop_scan_coproc(rsc_type)) {
 
819
        rp = first_coproc_result(rsc_type);
 
820
        if (!rp) break;
 
821
        rp->already_selected = true;
 
822
        if (!proc_rsc.can_schedule(rp)) continue;
 
823
        atp = gstate.lookup_active_task_by_result(rp);
 
824
        can_run = schedule_if_possible(
 
825
            rp, atp, proc_rsc, "coprocessor job, FIFO"
 
826
        );
 
827
        if (!can_run) continue;
 
828
        gstate.ordered_scheduled_results.push_back(rp);
 
829
    }
 
830
}
 
831
 
593
832
// CPU scheduler - decide which results to run.
594
833
// output: sets ordered_scheduled_result.
595
834
//
596
835
void CLIENT_STATE::schedule_cpus() {
597
836
    RESULT* rp;
598
837
    PROJECT* p;
599
 
    double expected_payoff;
600
838
    unsigned int i;
601
 
    double rrs = runnable_resource_share();
602
839
    PROC_RESOURCES proc_rsc;
603
840
    ACTIVE_TASK* atp;
 
841
    bool can_run;
604
842
 
605
843
    proc_rsc.ncpus = ncpus;
606
 
    proc_rsc.ncpus_used = 0;
607
 
    proc_rsc.ram_left = available_ram();
608
 
    proc_rsc.coprocs.clone(coprocs, true);
609
 
    proc_rsc.ncoproc_jobs = 0;
 
844
    proc_rsc.ncpus_used_st = 0;
 
845
    proc_rsc.ncpus_used_mt = 0;
 
846
    proc_rsc.coprocs.clone(host_info.coprocs, false);
610
847
 
611
848
    if (log_flags.cpu_sched_debug) {
612
 
        msg_printf(0, MSG_INFO, "[cpu_sched_debug] schedule_cpus(): start");
 
849
        msg_printf(0, MSG_INFO, "[cpu_sched] schedule_cpus(): start");
613
850
    }
614
851
 
615
852
    // do round-robin simulation to find what results miss deadline
619
856
        print_deadline_misses();
620
857
    }
621
858
 
622
 
    adjust_debts();
 
859
    // avoid preemption of jobs that once ran EDF
 
860
    //
 
861
    promote_once_ran_edf();
623
862
 
624
863
    // set temporary variables
625
864
    //
 
865
#ifdef USE_REC
 
866
    project_priority_init();
 
867
#endif
626
868
    for (i=0; i<results.size(); i++) {
627
869
        rp = results[i];
628
870
        rp->already_selected = false;
629
871
        rp->edf_scheduled = false;
630
 
        if (rp->uses_coprocs()) proc_rsc.ncoproc_jobs++;
631
872
    }
632
873
    for (i=0; i<projects.size(); i++) {
633
874
        p = projects[i];
634
875
        p->next_runnable_result = NULL;
635
 
        p->anticipated_debt = p->short_term_debt;
636
 
        p->deadlines_missed = p->rr_sim_status.deadlines_missed;
 
876
#ifndef USE_REC
 
877
        p->cpu_pwf.anticipated_debt = p->cpu_pwf.short_term_debt;
 
878
        p->cuda_pwf.anticipated_debt = p->cuda_pwf.short_term_debt;
 
879
        p->ati_pwf.anticipated_debt = p->ati_pwf.short_term_debt;
 
880
#endif
 
881
        p->cpu_pwf.deadlines_missed_copy = p->cpu_pwf.deadlines_missed;
 
882
        p->cuda_pwf.deadlines_missed_copy = p->cuda_pwf.deadlines_missed;
 
883
        p->ati_pwf.deadlines_missed_copy = p->ati_pwf.deadlines_missed;
 
884
    }
 
885
    for (i=0; i<app_versions.size(); i++) {
 
886
        app_versions[i]->max_working_set_size = 0;
637
887
    }
638
888
    for (i=0; i<active_tasks.active_tasks.size(); i++) {
639
 
        active_tasks.active_tasks[i]->too_large = false;
 
889
        atp = active_tasks.active_tasks[i];
 
890
        atp->too_large = false;
 
891
        double w = atp->procinfo.working_set_size_smoothed;
 
892
        APP_VERSION* avp = atp->app_version;
 
893
        if (w > avp->max_working_set_size) {
 
894
            avp->max_working_set_size = w;
 
895
        }
640
896
    }
641
897
 
642
 
    expected_payoff = global_prefs.cpu_scheduling_period();
643
898
    ordered_scheduled_results.clear();
644
899
 
645
 
    // First choose results from projects with P.deadlines_missed>0
 
900
    // first, add GPU jobs
 
901
 
 
902
    add_coproc_jobs(RSC_TYPE_CUDA, proc_rsc);
 
903
    add_coproc_jobs(RSC_TYPE_ATI, proc_rsc);
 
904
 
 
905
    // then add CPU jobs.
 
906
    // Note: the jobs that actually get run are not necessarily
 
907
    // an initial segment of this list;
 
908
    // e.g. a multithread job may not get run because it has
 
909
    // a high-priority single-thread job ahead of it.
 
910
 
 
911
    // choose CPU jobs from projects with CPU deadline misses
646
912
    //
647
913
#ifdef SIM
648
914
    if (!cpu_sched_rr_only) {
649
915
#endif
650
 
    while (!proc_rsc.stop_scan()) {
651
 
        rp = earliest_deadline_result();
 
916
    while (!proc_rsc.stop_scan_cpu()) {
 
917
        rp = earliest_deadline_result(RSC_TYPE_CPU);
652
918
        if (!rp) break;
653
919
        rp->already_selected = true;
654
 
                atp = lookup_active_task_by_result(rp);
655
 
 
656
 
        if (!proc_rsc.can_schedule(rp, atp)) continue;
657
 
        if (!schedule_if_possible(rp, atp, proc_rsc, rrs, expected_payoff)) continue;
658
 
 
659
 
        rp->project->deadlines_missed--;
 
920
        if (!proc_rsc.can_schedule(rp)) continue;
 
921
        atp = lookup_active_task_by_result(rp);
 
922
        can_run = schedule_if_possible(
 
923
            rp, atp, proc_rsc, "CPU job, EDF"
 
924
        );
 
925
        if (!can_run) continue;
 
926
        rp->project->cpu_pwf.deadlines_missed_copy--;
660
927
        rp->edf_scheduled = true;
661
928
        ordered_scheduled_results.push_back(rp);
662
929
    }
664
931
    }
665
932
#endif
666
933
 
667
 
    // Next, choose results from projects with large debt
 
934
    // Next, choose CPU jobs from projects with large debt
668
935
    //
669
 
    while (!proc_rsc.stop_scan()) {
 
936
    while (!proc_rsc.stop_scan_cpu()) {
670
937
        assign_results_to_projects();
671
938
        rp = largest_debt_project_best_result();
672
939
        if (!rp) break;
673
 
                atp = lookup_active_task_by_result(rp);
674
 
        if (!proc_rsc.can_schedule(rp, atp)) continue;
675
 
        if (!schedule_if_possible(rp, atp, proc_rsc, rrs, expected_payoff)) continue;
 
940
        atp = lookup_active_task_by_result(rp);
 
941
        if (!proc_rsc.can_schedule(rp)) continue;
 
942
        can_run = schedule_if_possible(
 
943
            rp, atp, proc_rsc, "CPU job, debt order"
 
944
        );
 
945
        if (!can_run) continue;
676
946
        ordered_scheduled_results.push_back(rp);
677
947
    }
678
948
 
679
 
    request_enforce_schedule("schedule_cpus");
680
 
    set_client_state_dirty("schedule_cpus");
 
949
    enforce_schedule();
681
950
}
682
951
 
683
952
static inline bool in_ordered_scheduled_results(ACTIVE_TASK* atp) {
684
 
        for (unsigned int i=0; i<gstate.ordered_scheduled_results.size(); i++) {
685
 
                if (atp->result == gstate.ordered_scheduled_results[i]) return true;
686
 
        }
687
 
        return false;
688
 
}
689
 
 
690
 
// return true if t1 is more preemptable than t0
691
 
//
692
 
static inline bool more_preemptable(ACTIVE_TASK* t0, ACTIVE_TASK* t1) {
693
 
    if (t0->result->project->deadlines_missed && !t1->result->project->deadlines_missed) return true;
694
 
    if (!t0->result->project->deadlines_missed && t1->result->project->deadlines_missed) return false;
695
 
    if (t0->result->project->deadlines_missed && t1->result->project->deadlines_missed) {
696
 
        if (t0->result->report_deadline < t1->result->report_deadline) return true;
697
 
        if (t0->result->report_deadline > t1->result->report_deadline) return false;
698
 
        return (t0 < t1);
699
 
    } else {
700
 
                bool fin0 = finished_time_slice(t0);
701
 
                bool fin1 = finished_time_slice(t1);
702
 
                if (fin1 && !fin0) return true;
703
 
                if (fin0 && !fin1) return false;
704
 
        if (t0->result->report_deadline < t1->result->report_deadline) return true;
705
 
        if (t0->result->report_deadline > t1->result->report_deadline) return false;
706
 
        return (t0 < t1);
707
 
    }
708
 
}
709
 
 
710
 
// Make a list of preemptable tasks, in increasing order of preemptability.
711
 
// "Preemptable" means: running, non-GPU, not non-CPU-intensive,
712
 
// not in the scheduled results list.
713
 
//
714
 
void CLIENT_STATE::make_preemptable_task_list(
715
 
        vector<ACTIVE_TASK*> &preemptable_tasks, double& ncpus_used
 
953
    for (unsigned int i=0; i<gstate.ordered_scheduled_results.size(); i++) {
 
954
        if (atp->result == gstate.ordered_scheduled_results[i]) return true;
 
955
    }
 
956
    return false;
 
957
}
 
958
 
 
959
// scan the runnable list, keeping track of CPU usage X.
 
960
// if find a MT job J, and X < ncpus, move J before all non-MT jobs
 
961
// But don't promote a MT job ahead of a job in EDF
 
962
//
 
963
// This is needed because there may always be a 1-CPU jobs
 
964
// in the middle of its time-slice, and MT jobs could starve.
 
965
//
 
966
static void promote_multi_thread_jobs(vector<RESULT*>& runnable_jobs) {
 
967
    double cpus_used = 0;
 
968
    vector<RESULT*>::iterator first_non_mt = runnable_jobs.end();
 
969
    vector<RESULT*>::iterator cur = runnable_jobs.begin();
 
970
    while(1) {
 
971
        if (cur == runnable_jobs.end()) break;
 
972
        if (cpus_used >= gstate.ncpus) break;
 
973
        RESULT* rp = *cur;
 
974
        if (rp->rr_sim_misses_deadline) break;
 
975
        double nc = rp->avp->avg_ncpus;
 
976
        if (nc > 1) {
 
977
            if (first_non_mt != runnable_jobs.end()) {
 
978
                cur = runnable_jobs.erase(cur);
 
979
                runnable_jobs.insert(first_non_mt, rp);
 
980
                cpus_used = 0;
 
981
                first_non_mt = runnable_jobs.end();
 
982
                cur = runnable_jobs.begin();
 
983
                continue;
 
984
            }
 
985
        } else {
 
986
            if (first_non_mt == runnable_jobs.end()) {
 
987
                first_non_mt = cur;
 
988
            }
 
989
        }
 
990
        cpus_used += nc;
 
991
        cur++;
 
992
    }
 
993
}
 
994
 
 
995
// return true if r0 is more important to run than r1
 
996
//
 
997
static inline bool more_important(RESULT* r0, RESULT* r1) {
 
998
    // favor jobs in danger of deadline miss
 
999
    //
 
1000
    bool miss0 = r0->edf_scheduled;
 
1001
    bool miss1 = r1->edf_scheduled;
 
1002
    if (miss0 && !miss1) return true;
 
1003
    if (!miss0 && miss1) return false;
 
1004
 
 
1005
    // favor coproc jobs, so that e.g. if we're RAM-limited
 
1006
    // we'll use the GPU instead of the CPU
 
1007
    //
 
1008
    bool cp0 = r0->uses_coprocs();
 
1009
    bool cp1 = r1->uses_coprocs();
 
1010
    if (cp0 && !cp1) return true;
 
1011
    if (!cp0 && cp1) return false;
 
1012
 
 
1013
    // favor jobs in the middle of time slice
 
1014
    //
 
1015
    bool unfin0 = r0->unfinished_time_slice;
 
1016
    bool unfin1 = r1->unfinished_time_slice;
 
1017
    if (unfin0 && !unfin1) return true;
 
1018
    if (!unfin0 && unfin1) return false;
 
1019
 
 
1020
    // favor jobs selected first by schedule_cpus()
 
1021
    // (e.g., because their project has high STD)
 
1022
    //
 
1023
    if (r0->seqno < r1->seqno) return true;
 
1024
    if (r0->seqno > r1->seqno) return false;
 
1025
 
 
1026
    // tie breaker
 
1027
    return (r0 < r1);
 
1028
}
 
1029
 
 
1030
static void print_job_list(vector<RESULT*>& jobs) {
 
1031
    for (unsigned int i=0; i<jobs.size(); i++) {
 
1032
        RESULT* rp = jobs[i];
 
1033
        msg_printf(rp->project, MSG_INFO,
 
1034
            "[cpu_sched] %d: %s (MD: %s; UTS: %s)",
 
1035
            i, rp->name,
 
1036
            rp->edf_scheduled?"yes":"no",
 
1037
            rp->unfinished_time_slice?"yes":"no"
 
1038
        );
 
1039
    }
 
1040
}
 
1041
 
 
1042
// find running jobs that haven't finished their time slice.
 
1043
// Mark them as such, and add to list if not already there
 
1044
//
 
1045
void CLIENT_STATE::append_unfinished_time_slice(
 
1046
    vector<RESULT*> &runnable_jobs
716
1047
) {
717
1048
    unsigned int i;
718
 
    ACTIVE_TASK* atp;
 
1049
    int seqno = (int)runnable_jobs.size();
719
1050
 
720
 
        ncpus_used = 0;
721
1051
    for (i=0; i<active_tasks.active_tasks.size(); i++) {
722
 
        atp = active_tasks.active_tasks[i];
723
 
                if (in_ordered_scheduled_results(atp)) continue;
724
 
                if (!atp->result->runnable()) continue;
 
1052
        ACTIVE_TASK* atp = active_tasks.active_tasks[i];
 
1053
        if (!atp->result->runnable()) continue;
 
1054
        if (atp->result->uses_coprocs() && gpu_suspend_reason) continue;
725
1055
        if (atp->result->project->non_cpu_intensive) continue;
726
1056
        if (atp->scheduler_state != CPU_SCHED_SCHEDULED) continue;
727
 
                ncpus_used += atp->app_version->avg_ncpus;
728
 
                atp->next_scheduler_state = CPU_SCHED_SCHEDULED;
729
 
        if (atp->result->uses_coprocs()) continue;
730
 
        preemptable_tasks.push_back(atp);
731
 
#if 0
732
 
                msg_printf(0, MSG_INFO, "%s: misses %d deadline %f finished %d ptr %x",
733
 
                        atp->result->name,
734
 
                        atp->result->project->deadlines_missed,
735
 
                        atp->result->report_deadline,
736
 
                        finished_time_slice(atp), atp
737
 
                );
738
 
#endif
739
 
    }
740
 
 
741
 
    std::sort(
742
 
        preemptable_tasks.begin(),
743
 
        preemptable_tasks.end(),
744
 
        more_preemptable
 
1057
        if (finished_time_slice(atp)) continue;
 
1058
        atp->result->unfinished_time_slice = true;
 
1059
        if (in_ordered_scheduled_results(atp)) continue;
 
1060
        runnable_jobs.push_back(atp->result);
 
1061
        atp->result->seqno = seqno;
 
1062
    }
 
1063
}
 
1064
 
 
1065
////////// Coprocessor scheduling ////////////////
 
1066
//
 
1067
// theory of operations:
 
1068
//
 
1069
// Jobs can use one or more integral instances, or a fractional instance
 
1070
//
 
1071
// RESULT::coproc_indices
 
1072
//    for a running job, the coprocessor instances it's using
 
1073
// COPROC::pending_usage[]: for each instance, its usage by running jobs
 
1074
// CORPOC::usage[]: for each instance, its usage
 
1075
//
 
1076
// enforce_schedule() calls assign_coprocs(),
 
1077
// which assigns coproc instances to scheduled jobs,
 
1078
// and prunes jobs for which we can't make an assignment
 
1079
// (the job list is in order of decreasing priority)
 
1080
//
 
1081
// assign_coprocs():
 
1082
//     clear usage and pending_usage of all instances
 
1083
//     for each running job J
 
1084
//         increment pending_usage for the instances assigned to J
 
1085
//     for each scheduled job J
 
1086
//         if J is running
 
1087
//             if J's assignment fits
 
1088
//                 confirm assignment: dev pending_usage, inc usage
 
1089
//             else
 
1090
//                 prune J
 
1091
//         else
 
1092
//             if J.usage is fractional
 
1093
//                look for an instance that's already fractionally assigned
 
1094
//                if that fails, look for a free instance
 
1095
//                if that fails, prune J
 
1096
//             else
 
1097
//                if there are enough instances with usage=0
 
1098
//                    assign instances with pending_usage = usage = 0
 
1099
//                        (avoid preempting running jobs)
 
1100
//                    if need more, assign instances with usage = 0
 
1101
//                else
 
1102
//                    prune J
 
1103
 
 
1104
static inline void increment_pending_usage(
 
1105
    RESULT* rp, double usage, COPROC* cp
 
1106
) {
 
1107
    double x = (usage<1)?usage:1;
 
1108
    for (int i=0; i<usage; i++) {
 
1109
        int j = rp->coproc_indices[i];
 
1110
        cp->pending_usage[j] += x;
 
1111
        if (cp->pending_usage[j] > 1) {
 
1112
            if (log_flags.coproc_debug) {
 
1113
                msg_printf(rp->project, MSG_INFO,
 
1114
                    "[coproc] huh? %s %d %s pending usage > 1",
 
1115
                    cp->type, i, rp->name
 
1116
                );
 
1117
            }
 
1118
        }
 
1119
    }
 
1120
}
 
1121
 
 
1122
// check the GPU assignment for a currently-running app.
 
1123
// Note: don't check available RAM.
 
1124
// It may not be known (e.g. NVIDIA) and in any case, 
 
1125
// if the app is still running, it has enough RAM
 
1126
//
 
1127
static inline bool current_assignment_ok(
 
1128
    RESULT* rp, double usage, COPROC* cp, bool& defer_sched
 
1129
) {
 
1130
    defer_sched = false;
 
1131
    double x = (usage<1)?usage:1;
 
1132
    for (int i=0; i<usage; i++) {
 
1133
        int j = rp->coproc_indices[i];
 
1134
        if (cp->usage[j] + x > 1) {
 
1135
            if (log_flags.coproc_debug) {
 
1136
                msg_printf(rp->project, MSG_INFO,
 
1137
                    "[coproc] %s device %d already assigned: task %s",
 
1138
                    cp->type, j, rp->name
 
1139
                );
 
1140
            }
 
1141
            return false;
 
1142
        }
 
1143
    }
 
1144
    return true;
 
1145
}
 
1146
 
 
1147
static inline void confirm_current_assignment(
 
1148
    RESULT* rp, double usage, COPROC* cp
 
1149
) {
 
1150
    double x = (usage<1)?usage:1;
 
1151
    for (int i=0; i<usage; i++) {
 
1152
        int j = rp->coproc_indices[i];
 
1153
        cp->usage[j] +=x;
 
1154
        cp->pending_usage[j] -=x;
 
1155
        if (log_flags.coproc_debug) {
 
1156
            msg_printf(rp->project, MSG_INFO,
 
1157
                "[coproc] %s instance %d: confirming for %s",
 
1158
                cp->type, j, rp->name
 
1159
            );
 
1160
        }
 
1161
        cp->available_ram[j] -= rp->avp->gpu_ram;
 
1162
    }
 
1163
}
 
1164
 
 
1165
static inline bool get_fractional_assignment(
 
1166
    RESULT* rp, double usage, COPROC* cp, bool& defer_sched
 
1167
) {
 
1168
    int i;
 
1169
    defer_sched = false;
 
1170
 
 
1171
    // try to assign an instance that's already fractionally assigned
 
1172
    //
 
1173
    for (i=0; i<cp->count; i++) {
 
1174
        if (cp->available_ram_unknown[i]) {
 
1175
            continue;
 
1176
        }
 
1177
        if ((cp->usage[i] || cp->pending_usage[i])
 
1178
            && (cp->usage[i] + cp->pending_usage[i] + usage <= 1)
 
1179
        ) {
 
1180
            if (rp->avp->gpu_ram > cp->available_ram[i]) {
 
1181
                defer_sched = true;
 
1182
                continue;
 
1183
            }
 
1184
            rp->coproc_indices[0] = i;
 
1185
            cp->usage[i] += usage;
 
1186
            cp->available_ram[i] -= rp->avp->gpu_ram;
 
1187
            if (log_flags.coproc_debug) {
 
1188
                msg_printf(rp->project, MSG_INFO,
 
1189
                    "[coproc] Assigning %f of %s instance %d to %s",
 
1190
                    usage, cp->type, i, rp->name
 
1191
                );
 
1192
            }
 
1193
            return true;
 
1194
        }
 
1195
    }
 
1196
 
 
1197
    // failing that, assign an unreserved instance
 
1198
    //
 
1199
    for (i=0; i<cp->count; i++) {
 
1200
        if (cp->available_ram_unknown[i]) {
 
1201
            continue;
 
1202
        }
 
1203
        if (!cp->usage[i]) {
 
1204
            if (rp->avp->gpu_ram > cp->available_ram[i]) {
 
1205
                defer_sched = true;
 
1206
                continue;
 
1207
            }
 
1208
            rp->coproc_indices[0] = i;
 
1209
            cp->usage[i] += usage;
 
1210
            cp->available_ram[i] -= rp->avp->gpu_ram;
 
1211
            if (log_flags.coproc_debug) {
 
1212
                msg_printf(rp->project, MSG_INFO,
 
1213
                    "[coproc] Assigning %f of %s free instance %d to %s",
 
1214
                    usage, cp->type, i, rp->name
 
1215
                );
 
1216
            }
 
1217
            return true;
 
1218
        }
 
1219
    }
 
1220
    msg_printf(rp->project, MSG_INFO,
 
1221
        "[coproc] Insufficient %s for %s: need %f",
 
1222
        cp->type, rp->name, usage
745
1223
    );
 
1224
 
 
1225
    return false;
 
1226
}
 
1227
 
 
1228
static inline bool get_integer_assignment(
 
1229
    RESULT* rp, double usage, COPROC* cp, bool& defer_sched
 
1230
) {
 
1231
    int i;
 
1232
    defer_sched = false;
 
1233
 
 
1234
    // make sure we have enough free instances
 
1235
    //
 
1236
    int nfree = 0;
 
1237
    for (i=0; i<cp->count; i++) {
 
1238
        if (cp->available_ram_unknown[i]) {
 
1239
            continue;
 
1240
        }
 
1241
        if (!cp->usage[i]) {
 
1242
            if (rp->avp->gpu_ram > cp->available_ram[i]) {
 
1243
                defer_sched = true;
 
1244
                continue;
 
1245
            };
 
1246
            nfree++;
 
1247
        }
 
1248
    }
 
1249
    if (nfree < usage) {
 
1250
        if (log_flags.coproc_debug) {
 
1251
            msg_printf(rp->project, MSG_INFO,
 
1252
                "[coproc] Insufficient %s for %s; need %d, available %d",
 
1253
                cp->type, rp->name, (int)usage, nfree
 
1254
            );
 
1255
            if (defer_sched) {
 
1256
                msg_printf(rp->project, MSG_INFO,
 
1257
                    "[coproc] some instances lack available memory"
 
1258
                );
 
1259
            }
 
1260
        }
 
1261
        return false;
 
1262
    }
 
1263
 
 
1264
    int n = 0;
 
1265
 
 
1266
    // assign non-pending instances first
 
1267
 
 
1268
    for (i=0; i<cp->count; i++) {
 
1269
        if (cp->available_ram_unknown[i]) {
 
1270
            continue;
 
1271
        }
 
1272
        if (!cp->usage[i]
 
1273
            && !cp->pending_usage[i]
 
1274
            && (rp->avp->gpu_ram <= cp->available_ram[i])
 
1275
        ) {
 
1276
            cp->usage[i] = 1;
 
1277
            cp->available_ram[i] -= rp->avp->gpu_ram;
 
1278
            rp->coproc_indices[n++] = i;
 
1279
            if (log_flags.coproc_debug) {
 
1280
                msg_printf(rp->project, MSG_INFO,
 
1281
                    "[coproc] Assigning %s instance %d to %s",
 
1282
                    cp->type, i, rp->name
 
1283
                );
 
1284
            }
 
1285
            if (n == usage) return true;
 
1286
        }
 
1287
    }
 
1288
 
 
1289
    // if needed, assign pending instances
 
1290
 
 
1291
    for (i=0; i<cp->count; i++) {
 
1292
        if (cp->available_ram_unknown[i]) {
 
1293
            continue;
 
1294
        }
 
1295
        if (!cp->usage[i]
 
1296
            && (rp->avp->gpu_ram <= cp->available_ram[i])
 
1297
        ) {
 
1298
            cp->usage[i] = 1;
 
1299
            cp->available_ram[i] -= rp->avp->gpu_ram;
 
1300
            rp->coproc_indices[n++] = i;
 
1301
            if (log_flags.coproc_debug) {
 
1302
                msg_printf(rp->project, MSG_INFO,
 
1303
                    "[coproc] Assigning %s pending instance %d to %s",
 
1304
                    cp->type, i, rp->name
 
1305
                );
 
1306
            }
 
1307
            if (n == usage) return true;
 
1308
        }
 
1309
    }
 
1310
    if (log_flags.coproc_debug) {
 
1311
        msg_printf(rp->project, MSG_INFO,
 
1312
            "[coproc] huh??? ran out of %s instances for %s",
 
1313
            cp->type, rp->name
 
1314
        );
 
1315
    }
 
1316
    return false;
 
1317
}
 
1318
 
 
1319
static inline void mark_as_defer_sched(RESULT* rp) {
 
1320
    if (rp->uses_cuda()) {
 
1321
        rp->project->cuda_defer_sched = true;
 
1322
    } else if (rp->uses_ati()) {
 
1323
        rp->project->ati_defer_sched = true;
 
1324
    }
 
1325
    rp->schedule_backoff = gstate.now + 300; // try again in 5 minutes
 
1326
    gstate.request_schedule_cpus("insufficient GPU RAM");
 
1327
}
 
1328
 
 
1329
static inline void assign_coprocs(vector<RESULT*>& jobs) {
 
1330
    unsigned int i;
 
1331
    COPROC* cp;
 
1332
    double usage;
 
1333
 
 
1334
    gstate.host_info.coprocs.clear_usage();
 
1335
#ifndef SIM
 
1336
    if (gstate.host_info.have_cuda()) {
 
1337
        gstate.host_info.coprocs.cuda.get_available_ram();
 
1338
        if (log_flags.coproc_debug) {
 
1339
            gstate.host_info.coprocs.cuda.print_available_ram();
 
1340
        }
 
1341
    }
 
1342
    if (gstate.host_info.have_ati()) {
 
1343
        gstate.host_info.coprocs.ati.get_available_ram();
 
1344
        if (log_flags.coproc_debug) {
 
1345
            gstate.host_info.coprocs.ati.print_available_ram();
 
1346
        }
 
1347
    }
 
1348
#endif
 
1349
 
 
1350
    // fill in pending usage
 
1351
    //
 
1352
    for (i=0; i<jobs.size(); i++) {
 
1353
        RESULT* rp = jobs[i];
 
1354
        APP_VERSION* avp = rp->avp;
 
1355
        if (avp->ncudas) {
 
1356
            usage = avp->ncudas;
 
1357
            cp = &gstate.host_info.coprocs.cuda;
 
1358
        } else if (avp->natis) {
 
1359
            usage = avp->natis;
 
1360
            cp = &gstate.host_info.coprocs.ati;
 
1361
        } else {
 
1362
            continue;
 
1363
        }
 
1364
        ACTIVE_TASK* atp = gstate.lookup_active_task_by_result(rp);
 
1365
        if (!atp) continue;
 
1366
        if (atp->task_state() != PROCESS_EXECUTING) continue;
 
1367
        increment_pending_usage(rp, usage, cp);
 
1368
    }
 
1369
 
 
1370
    vector<RESULT*>::iterator job_iter;
 
1371
    job_iter = jobs.begin();
 
1372
    while (job_iter != jobs.end()) {
 
1373
        RESULT* rp = *job_iter;
 
1374
        APP_VERSION* avp = rp->avp;
 
1375
        if (avp->ncudas) {
 
1376
            usage = avp->ncudas;
 
1377
            cp = &gstate.host_info.coprocs.cuda;
 
1378
        } else if (avp->natis) {
 
1379
            usage = avp->natis;
 
1380
            cp = &gstate.host_info.coprocs.ati;
 
1381
        } else {
 
1382
            job_iter++;
 
1383
            continue;
 
1384
        }
 
1385
 
 
1386
        ACTIVE_TASK* atp = gstate.lookup_active_task_by_result(rp);
 
1387
        bool defer_sched;
 
1388
        if (atp && atp->task_state() == PROCESS_EXECUTING) {
 
1389
            if (current_assignment_ok(rp, usage, cp, defer_sched)) {
 
1390
                confirm_current_assignment(rp, usage, cp);
 
1391
                job_iter++;
 
1392
            } else {
 
1393
                if (defer_sched) {
 
1394
                    mark_as_defer_sched(rp);
 
1395
                }
 
1396
                job_iter = jobs.erase(job_iter);
 
1397
            }
 
1398
        } else {
 
1399
            if (usage < 1) {
 
1400
                if (get_fractional_assignment(rp, usage, cp, defer_sched)) {
 
1401
                    job_iter++;
 
1402
                } else {
 
1403
                    if (defer_sched) {
 
1404
                        mark_as_defer_sched(rp);
 
1405
                    }
 
1406
                    job_iter = jobs.erase(job_iter);
 
1407
                }
 
1408
            } else {
 
1409
                if (get_integer_assignment(rp, usage, cp, defer_sched)) {
 
1410
                    job_iter++;
 
1411
                } else {
 
1412
                    if (defer_sched) {
 
1413
                        mark_as_defer_sched(rp);
 
1414
                    }
 
1415
                    job_iter = jobs.erase(job_iter);
 
1416
                }
 
1417
            }
 
1418
        }
 
1419
    }
 
1420
 
746
1421
#if 0
747
 
        for (i=0; i<preemptable_tasks.size(); i++) {
748
 
                atp = preemptable_tasks[i];
749
 
                msg_printf(0, MSG_INFO, "list %d: %s", i, atp->result->name);
750
 
        }
 
1422
    // enforce "don't use GPUs while active" pref in NVIDIA case;
 
1423
    // it applies only to GPUs running a graphics app
 
1424
    //
 
1425
    if (gstate.host_info.coprocs.cuda.count && gstate.user_active && !gstate.global_prefs.run_gpu_if_user_active) {
 
1426
        job_iter = jobs.begin();
 
1427
        while (job_iter != jobs.end()) {
 
1428
            RESULT* rp = *job_iter;
 
1429
            if (!rp->avp->ncudas) {
 
1430
                job_iter++;
 
1431
                continue;
 
1432
            }
 
1433
            ACTIVE_TASK* atp = gstate.lookup_active_task_by_result(rp);
 
1434
            bool some_gpu_busy = false;
 
1435
            for (i=0; i<rp->avp->ncudas; i++) {
 
1436
                int dev = atp->coproc_indices[i];
 
1437
                if (gstate.host_info.coprocs.cuda.running_graphics_app[dev]) {
 
1438
                    some_gpu_busy = true;
 
1439
                    break;
 
1440
                }
 
1441
            }
 
1442
            if (some_gpu_busy) {
 
1443
                job_iter = jobs.erase(job_iter);
 
1444
            } else {
 
1445
                job_iter++;
 
1446
            }
 
1447
        }
 
1448
    }
751
1449
#endif
752
1450
}
753
1451
 
768
1466
// 
769
1467
bool CLIENT_STATE::enforce_schedule() {
770
1468
    unsigned int i;
771
 
    ACTIVE_TASK* atp, *preempt_atp;
772
1469
    vector<ACTIVE_TASK*> preemptable_tasks;
773
1470
    static double last_time = 0;
774
1471
    int retval;
775
 
    double ncpus_used;
776
 
    bool preempt_by_quit;
 
1472
    double ncpus_used=0, ncpus_used_non_gpu=0;
 
1473
    ACTIVE_TASK* atp;
777
1474
 
778
 
    // Do this when requested, and once a minute as a safety net
 
1475
    // NOTE: there's an assumption that debt is adjusted at
 
1476
    // least as often as the CPU sched period (see client_state.h).
 
1477
    // If you remove the following, make changes accordingly
779
1478
    //
780
 
    if (now - last_time > 60) {
781
 
        must_enforce_cpu_schedule = true;
782
 
    }
783
 
    if (!must_enforce_cpu_schedule) return false;
784
 
    must_enforce_cpu_schedule = false;
 
1479
    adjust_debts();
785
1480
    last_time = now;
786
1481
    bool action = false;
787
1482
 
 
1483
#ifndef SIM
 
1484
    // check whether GPUs are usable
 
1485
    //
 
1486
    if (check_coprocs_usable()) {
 
1487
        request_schedule_cpus("GPU usability change");
 
1488
        return true;
 
1489
    }
 
1490
#endif
 
1491
 
788
1492
    if (log_flags.cpu_sched_debug) {
789
 
        msg_printf(0, MSG_INFO, "[cpu_sched_debug] enforce_schedule(): start");
790
 
        for (i=0; i<ordered_scheduled_results.size(); i++) {
791
 
            RESULT* rp = ordered_scheduled_results[i];
792
 
            msg_printf(rp->project, MSG_INFO,
793
 
                "[cpu_sched_debug] want to run: %s",
794
 
                rp->name
795
 
            );
796
 
        }
797
 
    }
798
 
 
799
 
    // set temporary variables
800
 
    //
801
 
    for (i=0; i<projects.size(); i++){
802
 
        projects[i]->deadlines_missed = projects[i]->rr_sim_status.deadlines_missed;
803
 
    }
804
 
 
805
 
    // Set next_scheduler_state to preempt
 
1493
        msg_printf(0, MSG_INFO, "[cpu_sched] enforce_schedule(): start");
 
1494
        msg_printf(0, MSG_INFO, "[cpu_sched] preliminary job list:");
 
1495
        print_job_list(ordered_scheduled_results);
 
1496
    }
 
1497
 
 
1498
    // Set next_scheduler_state to PREEMPT for all tasks
806
1499
    //
807
1500
    for (i=0; i< active_tasks.active_tasks.size(); i++) {
808
1501
        atp = active_tasks.active_tasks[i];
809
1502
        atp->next_scheduler_state = CPU_SCHED_PREEMPTED;
810
1503
    }
811
1504
 
812
 
    make_preemptable_task_list(preemptable_tasks, ncpus_used);
 
1505
    // make initial "to-run" list
 
1506
    //
 
1507
    vector<RESULT*>runnable_jobs;
 
1508
    for (i=0; i<ordered_scheduled_results.size(); i++) {
 
1509
        RESULT* rp = ordered_scheduled_results[i];
 
1510
        rp->seqno = i;
 
1511
        rp->unfinished_time_slice = false;
 
1512
        runnable_jobs.push_back(rp);
 
1513
    }
 
1514
 
 
1515
    // append running jobs not done with time slice to the to-run list
 
1516
    //
 
1517
    append_unfinished_time_slice(runnable_jobs);
 
1518
 
 
1519
    // sort to-run list by decreasing importance
 
1520
    //
 
1521
    std::sort(
 
1522
        runnable_jobs.begin(),
 
1523
        runnable_jobs.end(),
 
1524
        more_important
 
1525
    );
 
1526
 
 
1527
    promote_multi_thread_jobs(runnable_jobs);
 
1528
 
 
1529
    if (log_flags.cpu_sched_debug) {
 
1530
        msg_printf(0, MSG_INFO, "[cpu_sched] final job list:");
 
1531
        print_job_list(runnable_jobs);
 
1532
    }
813
1533
 
814
1534
    double ram_left = available_ram();
 
1535
    double swap_left = (global_prefs.vm_max_used_frac)*host_info.m_swap;
815
1536
 
816
1537
    if (log_flags.mem_usage_debug) {
817
1538
        msg_printf(0, MSG_INFO,
818
 
            "[mem_usage_debug] enforce: available RAM %.2fMB",
819
 
            ram_left/MEGA
 
1539
            "[mem_usage] enforce: available RAM %.2fMB swap %.2fMB",
 
1540
            ram_left/MEGA, swap_left/MEGA
820
1541
        );
821
1542
    }
822
1543
 
823
 
    // schedule all non CPU intensive tasks
 
1544
    for (i=0; i<projects.size(); i++) {
 
1545
        projects[i]->cuda_defer_sched = false;
 
1546
        projects[i]->ati_defer_sched = false;
 
1547
    }
 
1548
 
 
1549
    // schedule non-CPU-intensive tasks,
 
1550
    // and look for backed-off GPU jobs
824
1551
    //
825
1552
    for (i=0; i<results.size(); i++) {
826
1553
        RESULT* rp = results[i];
828
1555
            atp = get_task(rp);
829
1556
            atp->next_scheduler_state = CPU_SCHED_SCHEDULED;
830
1557
            ram_left -= atp->procinfo.working_set_size_smoothed;
831
 
        }
832
 
    }
833
 
 
834
 
    double swap_left = (global_prefs.vm_max_used_frac)*host_info.m_swap;
835
 
 
836
 
#ifdef _WIN32
837
 
    // see whether we have any coproc jobs, and total their CPU usage
838
 
    //
839
 
        double new_ncpus_used = 0;
840
 
    bool have_coproc_job = false;
841
 
    for (i=0; i<ordered_scheduled_results.size(); i++) {
842
 
        RESULT* rp = ordered_scheduled_results[i];
843
 
        if (rp->uses_coprocs()) {
844
 
            have_coproc_job = true;
845
 
            new_ncpus_used += rp->avp->avg_ncpus;
846
 
        }
847
 
    }
848
 
#endif
849
 
 
850
 
    // Loop through the jobs we want to schedule.
851
 
    // Invariant: "ncpus_used" is the sum of CPU usage
852
 
    // of tasks with next_scheduler_state == CPU_SCHED_SCHEDULED
853
 
        // (including preemptable jobs).
854
 
        // Win: "new_ncpus_used" is the sum excluding preemptable jobs.
855
 
    //
856
 
    for (i=0; i<ordered_scheduled_results.size(); i++) {
857
 
        RESULT* rp = ordered_scheduled_results[i];
858
 
        if (log_flags.cpu_sched_debug) {
859
 
            msg_printf(rp->project, MSG_INFO,
860
 
                "[cpu_sched_debug] processing %s", rp->name
861
 
            );
862
 
        }
863
 
 
864
 
#ifdef _WIN32
865
 
                // Windows: if we have a coproc job, don't saturate CPUs
866
 
                //
867
 
                if (have_coproc_job && !rp->uses_coprocs()) {
868
 
                        if (new_ncpus_used + rp->avp->avg_ncpus >= ncpus) continue;
869
 
                }
870
 
#endif
871
 
 
 
1558
            swap_left -= atp->procinfo.swap_size;
 
1559
        }
 
1560
        if (rp->schedule_backoff) {
 
1561
            if (rp->schedule_backoff > gstate.now) {
 
1562
                if (rp->uses_cuda()) {
 
1563
                    rp->project->cuda_defer_sched = true;
 
1564
                } else if (rp->uses_ati()) {
 
1565
                    rp->project->ati_defer_sched = true;
 
1566
                }
 
1567
            } else {
 
1568
                rp->schedule_backoff = 0;
 
1569
                request_schedule_cpus("schedule backoff finished");
 
1570
            }
 
1571
        }
 
1572
    }
 
1573
 
 
1574
    // assign coprocessors to coproc jobs,
 
1575
    // and prune those that can't be assigned
 
1576
    //
 
1577
    assign_coprocs(runnable_jobs);
 
1578
 
 
1579
    // prune jobs that don't fit in RAM or that exceed CPU usage limits.
 
1580
    // Mark the rest as SCHEDULED
 
1581
    //
 
1582
    bool running_multithread = false;
 
1583
    for (i=0; i<runnable_jobs.size(); i++) {
 
1584
        RESULT* rp = runnable_jobs[i];
872
1585
        atp = lookup_active_task_by_result(rp);
 
1586
 
 
1587
        if (!rp->uses_coprocs()) {
 
1588
            // see if we're already using too many CPUs to run this job
 
1589
            //
 
1590
            if (ncpus_used >= ncpus) {
 
1591
                if (log_flags.cpu_sched_debug) {
 
1592
                    msg_printf(rp->project, MSG_INFO,
 
1593
                        "[cpu_sched] all CPUs used (%.2f > %d), skipping %s",
 
1594
                        ncpus_used, ncpus,
 
1595
                        rp->name
 
1596
                    );
 
1597
                }
 
1598
                continue;
 
1599
            }
 
1600
        
 
1601
            // Don't run a multithread app if usage would be #CPUS+1 or more.
 
1602
            // Multithread apps don't run well on an overcommitted system.
 
1603
            // Allow usage of #CPUS + fraction,
 
1604
            // so that a GPU app and a multithread app can run together.
 
1605
            //
 
1606
            if (rp->avp->avg_ncpus > 1) {
 
1607
                if (ncpus_used_non_gpu && (ncpus_used_non_gpu + rp->avp->avg_ncpus >= ncpus+1)) {
 
1608
                    // the "ncpus_used &&" is to allow running a job that uses
 
1609
                    // more than ncpus (this can happen in pathological cases)
 
1610
 
 
1611
                    if (log_flags.cpu_sched_debug) {
 
1612
                        msg_printf(rp->project, MSG_INFO,
 
1613
                            "[cpu_sched] not enough CPUs for multithread job, skipping %s",
 
1614
                            rp->name
 
1615
                        );
 
1616
                    }
 
1617
                    continue;
 
1618
                }
 
1619
                running_multithread = true;
 
1620
            } else {
 
1621
                // here for a single-thread app.
 
1622
                // Don't run if we're running a multithread app,
 
1623
                // and running this app would overcommit CPUs.
 
1624
                //
 
1625
                if (running_multithread) {
 
1626
                    if (ncpus_used + 1 > ncpus) {
 
1627
                        if (log_flags.cpu_sched_debug) {
 
1628
                            msg_printf(rp->project, MSG_INFO,
 
1629
                                "[cpu_sched] avoiding overcommit with multithread job, skipping %s",
 
1630
                                rp->name
 
1631
                            );
 
1632
                        }
 
1633
                        continue;
 
1634
                    }
 
1635
                }
 
1636
            }
 
1637
        }
 
1638
 
873
1639
        if (atp) {
874
1640
            atp->too_large = false;
875
1641
            if (atp->procinfo.working_set_size_smoothed > ram_left) {
876
1642
                atp->too_large = true;
877
1643
                if (log_flags.mem_usage_debug) {
878
1644
                    msg_printf(rp->project, MSG_INFO,
879
 
                        "[mem_usage_debug] enforce: result %s can't run, too big %.2fMB > %.2fMB",
 
1645
                        "[mem_usage] enforce: result %s can't run, too big %.2fMB > %.2fMB",
880
1646
                        rp->name,  atp->procinfo.working_set_size_smoothed/MEGA, ram_left/MEGA
881
1647
                    );
882
1648
                }
884
1650
            }
885
1651
        }
886
1652
 
887
 
        // Preempt tasks if needed (and possible).
888
 
        //
889
 
        bool failed_to_preempt = false;
890
 
                while (1) {
891
 
                        double next_ncpus_used = ncpus_used + rp->avp->avg_ncpus;
892
 
                                // the # of CPUs used if we run this job
893
 
 
894
 
                        if (log_flags.cpu_sched_debug) {
895
 
                                msg_printf(0, MSG_INFO, "ncpus_used %f next_ncpus_used %f",
896
 
                                        ncpus_used, next_ncpus_used
897
 
                                );
898
 
                        }
899
 
                        if (!preemptable_tasks.size()) break;
900
 
#ifdef _WIN32
901
 
                        if (have_coproc_job) {
902
 
                                if (ncpus_used + rp->avp->avg_ncpus < ncpus) break;
903
 
                        } else {
904
 
                                if (ncpus_used < ncpus) break;
905
 
                        }
906
 
#else
907
 
            if (ncpus_used < ncpus) break;
908
 
#endif
909
 
            // Preempt the most preemptable task if either
910
 
            // 1) it's completed its time slice and has checkpointed recently
911
 
            // 2) the scheduled result is in deadline trouble
912
 
            //
913
 
            preempt_atp = preemptable_tasks.back();
914
 
            if (rp->project->deadlines_missed || finished_time_slice(preempt_atp)) {
915
 
                if (rp->project->deadlines_missed) {
916
 
                    rp->project->deadlines_missed--;
917
 
                }
918
 
                preempt_atp->next_scheduler_state = CPU_SCHED_PREEMPTED;
919
 
                ncpus_used -= preempt_atp->app_version->avg_ncpus;
920
 
                preemptable_tasks.pop_back();
921
 
                if (log_flags.cpu_sched_debug) {
922
 
                    msg_printf(rp->project, MSG_INFO,
923
 
                        "[cpu_sched_debug] preempting %s",
924
 
                        preempt_atp->result->name
925
 
                    );
926
 
                }
927
 
            } else {
928
 
                if (log_flags.cpu_sched_debug) {
929
 
                    msg_printf(rp->project, MSG_INFO,
930
 
                        "[cpu_sched_debug] didn't preempt %s: tr %f tsc %f",
931
 
                        preempt_atp->result->name,
932
 
                                                now - preempt_atp->run_interval_start_wall_time,
933
 
                                                now - preempt_atp->checkpoint_wall_time
934
 
                    );
935
 
                }
936
 
                failed_to_preempt = true;
937
 
                break;
938
 
            }
939
 
        }
940
 
 
941
 
        if (failed_to_preempt && !rp->uses_coprocs()) {
942
 
            continue;
 
1653
        if (log_flags.cpu_sched_debug) {
 
1654
            msg_printf(rp->project, MSG_INFO,
 
1655
                "[cpu_sched] scheduling %s", rp->name
 
1656
            );
943
1657
        }
944
1658
 
945
1659
        // We've decided to run this job; create an ACTIVE_TASK if needed.
946
1660
        //
947
1661
        if (!atp) {
948
 
                        atp = get_task(rp);
 
1662
            atp = get_task(rp);
 
1663
        }
 
1664
 
 
1665
        // don't count CPU usage by GPU jobs
 
1666
        if (!rp->uses_coprocs()) {
 
1667
            ncpus_used_non_gpu += rp->avp->avg_ncpus;
949
1668
        }
950
1669
        ncpus_used += rp->avp->avg_ncpus;
951
 
#ifdef _WIN32
952
 
                if (!rp->uses_coprocs()) {
953
 
                        new_ncpus_used += rp->avp->avg_ncpus;
954
 
                }
955
 
#endif
956
1670
        atp->next_scheduler_state = CPU_SCHED_SCHEDULED;
957
1671
        ram_left -= atp->procinfo.working_set_size_smoothed;
958
1672
    }
959
 
    if (log_flags.cpu_sched_debug) {
960
 
        msg_printf(0, MSG_INFO,
961
 
            "[cpu_sched_debug] finished preempt loop, ncpus_used %f",
962
 
            ncpus_used
963
 
        );
964
 
    }
965
 
 
966
 
    // any jobs still in the preemptable list at this point are runnable;
967
 
    // make sure they don't exceed RAM limits
968
 
    //
969
 
    for (i=0; i<preemptable_tasks.size(); i++) {
970
 
        atp = preemptable_tasks[i];
971
 
        if (atp->procinfo.working_set_size_smoothed > ram_left) {
972
 
            atp->next_scheduler_state = CPU_SCHED_PREEMPTED;
973
 
            atp->too_large = true;
974
 
            if (log_flags.mem_usage_debug) {
975
 
                msg_printf(atp->result->project, MSG_INFO,
976
 
                    "[mem_usage_debug] enforce: result %s can't keep, too big %.2fMB > %.2fMB",
977
 
                    atp->result->name, atp->procinfo.working_set_size_smoothed/MEGA, ram_left/MEGA
978
 
                );
979
 
            }
980
 
        } else {
981
 
            atp->too_large = false;
982
 
            ram_left -= atp->procinfo.working_set_size_smoothed;
983
 
        }
984
 
    }
985
1673
 
986
1674
    if (log_flags.cpu_sched_debug && ncpus_used < ncpus) {
987
 
        msg_printf(0, MSG_INFO, "[cpu_sched_debug] using %f out of %d CPUs",
 
1675
        msg_printf(0, MSG_INFO, "[cpu_sched] using %.2f out of %d CPUs",
988
1676
            ncpus_used, ncpus
989
1677
        );
990
1678
        if (ncpus_used < ncpus) {
995
1683
    bool check_swap = (host_info.m_swap != 0);
996
1684
        // in case couldn't measure swap on this host
997
1685
 
998
 
    // preempt and start tasks as needed
 
1686
    // TODO: enforcement of swap space is broken right now
 
1687
 
 
1688
    // preempt tasks as needed, and note whether there are any coproc jobs
 
1689
    // in QUIT_PENDING state (in which case we won't start new coproc jobs)
999
1690
    //
 
1691
    bool coproc_quit_pending = false;
1000
1692
    for (i=0; i<active_tasks.active_tasks.size(); i++) {
1001
1693
        atp = active_tasks.active_tasks[i];
1002
1694
        if (log_flags.cpu_sched_debug) {
1003
1695
            msg_printf(atp->result->project, MSG_INFO,
1004
 
                "[cpu_sched_debug] %s sched state %d next %d task state %d",
 
1696
                "[cpu_sched] %s sched state %d next %d task state %d",
1005
1697
                atp->result->name, atp->scheduler_state,
1006
1698
                atp->next_scheduler_state, atp->task_state()
1007
1699
            );
1008
1700
        }
 
1701
        int preempt_type = REMOVE_MAYBE_SCHED;
1009
1702
        switch (atp->next_scheduler_state) {
1010
1703
        case CPU_SCHED_PREEMPTED:
1011
1704
            switch (atp->task_state()) {
1012
1705
            case PROCESS_EXECUTING:
1013
1706
                action = true;
1014
 
                preempt_by_quit = !global_prefs.leave_apps_in_memory;
1015
1707
                if (check_swap && swap_left < 0) {
1016
1708
                    if (log_flags.mem_usage_debug) {
1017
1709
                        msg_printf(atp->result->project, MSG_INFO,
1018
 
                            "[mem_usage_debug] out of swap space, will preempt by quit"
 
1710
                            "[mem_usage] out of swap space, will preempt by quit"
1019
1711
                        );
1020
1712
                    }
1021
 
                    preempt_by_quit = true;
 
1713
                    preempt_type = REMOVE_ALWAYS;
1022
1714
                }
1023
1715
                if (atp->too_large) {
1024
1716
                    if (log_flags.mem_usage_debug) {
1025
1717
                        msg_printf(atp->result->project, MSG_INFO,
1026
 
                            "[mem_usage_debug] job using too much memory, will preempt by quit"
 
1718
                            "[mem_usage] job using too much memory, will preempt by quit"
1027
1719
                        );
1028
1720
                    }
1029
 
                    preempt_by_quit = true;
 
1721
                    preempt_type = REMOVE_ALWAYS;
1030
1722
                }
1031
 
                atp->preempt(preempt_by_quit);
 
1723
                atp->preempt(preempt_type);
1032
1724
                break;
1033
1725
            case PROCESS_SUSPENDED:
1034
1726
                // Handle the case where user changes prefs from
1035
 
                // "leave in memory" to "remove from memory".
1036
 
                // Need to quit suspended tasks.
1037
 
                                if (atp->checkpoint_cpu_time && !global_prefs.leave_apps_in_memory) {
1038
 
                    atp->preempt(true);
 
1727
                // "leave in memory" to "remove from memory";
 
1728
                // need to quit suspended tasks.
 
1729
                //
 
1730
                if (atp->checkpoint_cpu_time && !global_prefs.leave_apps_in_memory) {
 
1731
                    atp->preempt(REMOVE_ALWAYS);
1039
1732
                }
1040
1733
                break;
1041
1734
            }
1042
1735
            atp->scheduler_state = CPU_SCHED_PREEMPTED;
1043
1736
            break;
1044
 
        case CPU_SCHED_SCHEDULED:
1045
 
            switch (atp->task_state()) {
1046
 
            case PROCESS_UNINITIALIZED:
1047
 
                if (!coprocs.sufficient_coprocs(
1048
 
                    atp->app_version->coprocs, log_flags.cpu_sched_debug, "cpu_sched_debug"
1049
 
                )){
1050
 
                    continue;
 
1737
        }
 
1738
        if (atp->result->uses_coprocs() && atp->task_state() == PROCESS_QUIT_PENDING) {
 
1739
            coproc_quit_pending = true;
 
1740
        }
 
1741
    }
 
1742
 
 
1743
    bool coproc_start_deferred = false;
 
1744
    for (i=0; i<active_tasks.active_tasks.size(); i++) {
 
1745
        atp = active_tasks.active_tasks[i];
 
1746
        if (atp->next_scheduler_state != CPU_SCHED_SCHEDULED) continue;
 
1747
        int ts = atp->task_state();
 
1748
        if (ts == PROCESS_UNINITIALIZED || ts == PROCESS_SUSPENDED) {
 
1749
            // If there's a quit pending for a coproc job,
 
1750
            // don't start new ones since they may bomb out
 
1751
            // on memory allocation.  Instead, trigger a retry
 
1752
            //
 
1753
            if (atp->result->uses_coprocs() && coproc_quit_pending) {
 
1754
                coproc_start_deferred = true;
 
1755
                continue;
 
1756
            }
 
1757
            action = true;
 
1758
 
 
1759
            bool first_time;
 
1760
            // GPU tasks can get suspended before they're ever run,
 
1761
            // so the only safe way of telling whether this is the
 
1762
            // first time the app is run is to check
 
1763
            // whether the slot dir is empty
 
1764
            //
 
1765
#ifdef SIM
 
1766
            first_time = atp->scheduler_state == CPU_SCHED_UNINITIALIZED;
 
1767
#else
 
1768
            first_time = is_dir_empty(atp->slot_dir);
 
1769
#endif
 
1770
            retval = atp->resume_or_start(first_time);
 
1771
            if ((retval == ERR_SHMGET) || (retval == ERR_SHMAT)) {
 
1772
                // Assume no additional shared memory segs
 
1773
                // will be available in the next 10 seconds
 
1774
                // (run only tasks which are already attached to shared memory).
 
1775
                //
 
1776
                if (gstate.retry_shmem_time < gstate.now) {
 
1777
                    request_schedule_cpus("no more shared memory");
1051
1778
                }
1052
 
            case PROCESS_SUSPENDED:
1053
 
                action = true;
1054
 
                retval = atp->resume_or_start(
1055
 
                    atp->scheduler_state == CPU_SCHED_UNINITIALIZED
 
1779
                gstate.retry_shmem_time = gstate.now + 10.0;
 
1780
                continue;
 
1781
            }
 
1782
            if (retval) {
 
1783
                report_result_error(
 
1784
                    *(atp->result), "Couldn't start or resume: %d", retval
1056
1785
                );
1057
 
                if ((retval == ERR_SHMGET) || (retval == ERR_SHMAT)) {
1058
 
                    // Assume no additional shared memory segs
1059
 
                    // will be available in the next 10 seconds
1060
 
                    // (run only tasks which are already attached to shared memory).
1061
 
                    //
1062
 
                    if (gstate.retry_shmem_time < gstate.now) {
1063
 
                        request_schedule_cpus("no more shared memory");
1064
 
                    }
1065
 
                    gstate.retry_shmem_time = gstate.now + 10.0;
1066
 
                    continue;
1067
 
                }
1068
 
                if (retval) {
1069
 
                    report_result_error(
1070
 
                        *(atp->result), "Couldn't start or resume: %d", retval
1071
 
                    );
1072
 
                    request_schedule_cpus("start failed");
1073
 
                    continue;
1074
 
                }
1075
 
                atp->run_interval_start_wall_time = now;
1076
 
                app_started = now;
1077
 
            }
1078
 
            atp->scheduler_state = CPU_SCHED_SCHEDULED;
1079
 
            swap_left -= atp->procinfo.swap_size;
1080
 
            break;
1081
 
        }
 
1786
                request_schedule_cpus("start failed");
 
1787
                continue;
 
1788
            }
 
1789
            if (atp->result->rr_sim_misses_deadline) {
 
1790
                atp->once_ran_edf = true;
 
1791
            }
 
1792
            atp->run_interval_start_wall_time = now;
 
1793
            app_started = now;
 
1794
        }
 
1795
        if (log_flags.cpu_sched_status) {
 
1796
            msg_printf(atp->result->project, MSG_INFO,
 
1797
                "[css] running %s (%s)",
 
1798
                atp->result->name, atp->result->resources
 
1799
            );
 
1800
        }
 
1801
        atp->scheduler_state = CPU_SCHED_SCHEDULED;
 
1802
        swap_left -= atp->procinfo.swap_size;
1082
1803
    }
1083
1804
    if (action) {
1084
1805
        set_client_state_dirty("enforce_cpu_schedule");
1085
1806
    }
1086
1807
    if (log_flags.cpu_sched_debug) {
1087
 
        msg_printf(0, MSG_INFO, "[cpu_sched_debug] enforce_schedule: end");
 
1808
        msg_printf(0, MSG_INFO, "[cpu_sched] enforce_schedule: end");
 
1809
    }
 
1810
    if (coproc_start_deferred) {
 
1811
        if (log_flags.cpu_sched_debug) {
 
1812
            msg_printf(0, MSG_INFO,
 
1813
                "[cpu_sched] coproc quit pending, deferring start"
 
1814
            );
 
1815
        }
 
1816
        request_schedule_cpus("coproc quit retry");
1088
1817
    }
1089
1818
    return action;
1090
1819
}
1091
1820
 
1092
 
// return true if we don't have enough runnable tasks to keep all CPUs busy
1093
 
//
1094
 
bool CLIENT_STATE::no_work_for_a_cpu() {
1095
 
    unsigned int i;
1096
 
    int count = 0;
1097
 
 
1098
 
    for (i=0; i< results.size(); i++){
1099
 
        RESULT* rp = results[i];
1100
 
        if (!rp->nearly_runnable()) continue;
1101
 
        if (rp->project->non_cpu_intensive) continue;
1102
 
        count++;
1103
 
    }
1104
 
    return ncpus > count;
1105
 
}
1106
 
 
1107
 
// trigger CPU schedule enforcement.
1108
 
// Called when a new schedule is computed,
1109
 
// and when an app checkpoints.
1110
 
//
1111
 
void CLIENT_STATE::request_enforce_schedule(const char* where) {
1112
 
    if (log_flags.cpu_sched_debug) {
1113
 
        msg_printf(0, MSG_INFO, "[cpu_sched_debug] Request enforce CPU schedule: %s", where);
1114
 
    }
1115
 
    must_enforce_cpu_schedule = true;
1116
 
}
1117
 
 
1118
1821
// trigger CPU scheduling.
1119
1822
// Called when a result is completed, 
1120
1823
// when new results become runnable, 
1123
1826
//
1124
1827
void CLIENT_STATE::request_schedule_cpus(const char* where) {
1125
1828
    if (log_flags.cpu_sched_debug) {
1126
 
        msg_printf(0, MSG_INFO, "[cpu_sched_debug] Request CPU reschedule: %s", where);
 
1829
        msg_printf(0, MSG_INFO, "[cpu_sched] Request CPU reschedule: %s", where);
1127
1830
    }
1128
1831
    must_schedule_cpus = true;
1129
1832
}
1139
1842
    return NULL;
1140
1843
}
1141
1844
 
1142
 
bool RESULT::computing_done() {
1143
 
    return (state() >= RESULT_COMPUTE_ERROR || ready_to_report);
 
1845
bool RESULT::not_started() {
 
1846
    if (computing_done()) return false;
 
1847
    if (gstate.lookup_active_task_by_result(this)) return false;
 
1848
    return true;
1144
1849
}
1145
1850
 
1146
1851
// find total resource shares of all projects
1157
1862
 
1158
1863
// same, but only runnable projects (can use CPU right now)
1159
1864
//
1160
 
double CLIENT_STATE::runnable_resource_share() {
 
1865
double CLIENT_STATE::runnable_resource_share(int rsc_type) {
1161
1866
    double x = 0;
1162
1867
    for (unsigned int i=0; i<projects.size(); i++) {
1163
1868
        PROJECT* p = projects[i];
1164
1869
        if (p->non_cpu_intensive) continue;
1165
 
        if (p->runnable()) {
 
1870
        if (p->runnable(rsc_type)) {
1166
1871
            x += p->resource_share;
1167
1872
        }
1168
1873
    }
1183
1888
    return x;
1184
1889
}
1185
1890
 
1186
 
double CLIENT_STATE::fetchable_resource_share() {
1187
 
    double x = 0;
1188
 
    for (unsigned int i=0; i<projects.size(); i++) {
1189
 
        PROJECT* p = projects[i];
1190
 
        if (p->non_cpu_intensive) continue;
1191
 
        if (p->long_term_debt < -global_prefs.cpu_scheduling_period()) continue;
1192
 
        if (p->contactable()) {
1193
 
            x += p->resource_share;
1194
 
        }
1195
 
    }
1196
 
    return x;
1197
 
}
1198
 
 
1199
1891
// same, but nearly runnable (could be downloading work right now)
1200
1892
//
1201
1893
double CLIENT_STATE::nearly_runnable_resource_share() {
1227
1919
    ACTIVE_TASK *atp = lookup_active_task_by_result(rp);
1228
1920
    if (!atp) {
1229
1921
        atp = new ACTIVE_TASK;
1230
 
        atp->slot = active_tasks.get_free_slot();
 
1922
        atp->get_free_slot(rp);
1231
1923
        atp->init(rp);
1232
1924
        active_tasks.active_tasks.push_back(atp);
1233
1925
    }
1241
1933
    return report_deadline - (
1242
1934
        gstate.work_buf_min()
1243
1935
            // Seconds that the host will not be connected to the Internet
1244
 
        + gstate.global_prefs.cpu_scheduling_period()
1245
 
            // Seconds that the CPU may be busy with some other result
1246
1936
        + DEADLINE_CUSHION
1247
1937
    );
1248
1938
}
1264
1954
    _state = val;
1265
1955
    if (log_flags.task_debug) {
1266
1956
        msg_printf(project, MSG_INFO,
1267
 
            "[task_debug] result state=%s for %s from %s",
 
1957
            "[task] result state=%s for %s from %s",
1268
1958
            result_state_name(val), name, where
1269
1959
        );
1270
1960
    }
1271
1961
}
1272
1962
 
1273
1963
// called at startup (after get_host_info())
1274
 
// and when general prefs have been parsed
 
1964
// and when general prefs have been parsed.
 
1965
// NOTE: GSTATE.NCPUS MUST BE 1 OR MORE; WE DIVIDE BY IT IN A COUPLE OF PLACES
1275
1966
//
1276
1967
void CLIENT_STATE::set_ncpus() {
1277
1968
    int ncpus_old = ncpus;
1293
1984
 
1294
1985
    if (initialized && ncpus != ncpus_old) {
1295
1986
        msg_printf(0, MSG_INFO,
1296
 
            "Number of usable CPUs has changed from %d to %d.  Running benchmarks.",
 
1987
            "Number of usable CPUs has changed from %d to %d.",
1297
1988
            ncpus_old, ncpus
1298
1989
        );
1299
 
        run_cpu_benchmarks = true;
1300
1990
        request_schedule_cpus("Number of usable CPUs has changed");
1301
1991
        request_work_fetch("Number of usable CPUs has changed");
1302
 
    }
1303
 
}
1304
 
 
1305
 
// preempt this task
1306
 
// called from the CLIENT_STATE::schedule_cpus()
1307
 
// if quit_task is true, do this by quitting
1308
 
//
1309
 
int ACTIVE_TASK::preempt(bool quit_task) {
1310
 
    int retval;
1311
 
 
1312
 
    // If the app hasn't checkpoint yet, suspend instead of quit
1313
 
    // (accommodate apps that never checkpoint)
1314
 
    //
1315
 
    if (quit_task && (checkpoint_cpu_time>0)) {
1316
 
        if (log_flags.cpu_sched) {
1317
 
            msg_printf(result->project, MSG_INFO,
1318
 
                "[cpu_sched] Preempting %s (removed from memory)",
1319
 
                result->name
1320
 
            );
1321
 
        }
1322
 
        set_task_state(PROCESS_QUIT_PENDING, "preempt");
1323
 
        retval = request_exit();
1324
 
    } else {
1325
 
        if (log_flags.cpu_sched) {
1326
 
            if (quit_task) {
1327
 
                msg_printf(result->project, MSG_INFO,
1328
 
                    "[cpu_sched] Preempting %s (left in memory because no checkpoint yet)",
1329
 
                    result->name
1330
 
                );
1331
 
            } else {
1332
 
                msg_printf(result->project, MSG_INFO,
1333
 
                    "[cpu_sched] Preempting %s (left in memory)",
1334
 
                    result->name
1335
 
                );
1336
 
            }
1337
 
        }
1338
 
        retval = suspend();
1339
 
    }
1340
 
    return 0;
 
1992
        work_fetch.init();
 
1993
    }
1341
1994
}
1342
1995
 
1343
1996
// The given result has just completed successfully.
1345
1998
// completion time for this project's results
1346
1999
//
1347
2000
void PROJECT::update_duration_correction_factor(ACTIVE_TASK* atp) {
1348
 
        RESULT* rp = atp->result;
 
2001
    RESULT* rp = atp->result;
1349
2002
#ifdef SIM
1350
2003
    if (dcf_dont_use) {
1351
2004
        duration_correction_factor = 1.0;
1352
2005
        return;
1353
2006
    }
1354
2007
    if (dcf_stats) {
1355
 
        ((SIM_PROJECT*)this)->update_dcf_stats(rp);
 
2008
        update_dcf_stats(rp);
1356
2009
        return;
1357
2010
    }
1358
2011
#endif
1359
2012
    double raw_ratio = atp->elapsed_time/rp->estimated_duration_uncorrected();
1360
 
    double adj_ratio = atp->elapsed_time/rp->estimated_duration(false);
 
2013
    double adj_ratio = atp->elapsed_time/rp->estimated_duration();
1361
2014
    double old_dcf = duration_correction_factor;
1362
2015
 
1363
2016
    // it's OK to overestimate completion time,
1382
2035
    if (duration_correction_factor > 100) duration_correction_factor = 100;
1383
2036
    if (duration_correction_factor < 0.01) duration_correction_factor = 0.01;
1384
2037
 
1385
 
    if (log_flags.cpu_sched_debug || log_flags.work_fetch_debug) {
 
2038
    if (log_flags.dcf_debug) {
1386
2039
        msg_printf(this, MSG_INFO,
1387
 
            "[csd|wfd] DCF: %f->%f, raw_ratio %f, adj_ratio %f",
 
2040
            "[dcf] DCF: %f->%f, raw_ratio %f, adj_ratio %f",
1388
2041
            old_dcf, duration_correction_factor, raw_ratio, adj_ratio
1389
2042
        );
1390
2043
    }
1391
2044
}
1392
2045
 
1393
 
const char *BOINC_RCSID_e830ee1 = "$Id: cpu_sched.cpp 16649 2008-12-08 19:58:02Z romw $";