1
/*___INFO__MARK_BEGIN__*/
2
/*************************************************************************
4
* The Contents of this file are made available subject to the terms of
5
* the Sun Industry Standards Source License Version 1.2
7
* Sun Microsystems Inc., March, 2001
10
* Sun Industry Standards Source License Version 1.2
11
* =================================================
12
* The contents of this file are subject to the Sun Industry Standards
13
* Source License Version 1.2 (the "License"); You may not use this file
14
* except in compliance with the License. You may obtain a copy of the
15
* License at http://gridengine.sunsource.net/Gridengine_SISSL_license.html
17
* Software provided under this License is provided on an "AS IS" basis,
18
* WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING,
19
* WITHOUT LIMITATION, WARRANTIES THAT THE SOFTWARE IS FREE OF DEFECTS,
20
* MERCHANTABLE, FIT FOR A PARTICULAR PURPOSE, OR NON-INFRINGING.
21
* See the License for the specific provisions governing your rights and
22
* obligations concerning the Software.
24
* The Initial Developer of the Original Code is: Sun Microsystems, Inc.
26
* Copyright: 2001 by Sun Microsystems, Inc.
28
* All Rights Reserved.
30
************************************************************************/
31
/*___INFO__MARK_END__*/
41
#include "msg_common.h"
42
#include "basis_types.h"
47
#include "sge_select_queue.h"
48
#include "sge_select_queueL.h"
49
#include "sge_parse_num_par.h"
50
#include "sge_complex_schedd.h"
51
#include "valid_queue_user.h"
52
#include "subordinate_schedd.h"
53
#include "sge_range_schedd.h"
54
#include "sge_pe_schedd.h"
55
#include "sge_range.h"
58
#include "sge_orderL.h"
61
#include "sge_qinstanceL.h"
66
#include "sort_hosts.h"
67
#include "schedd_monitor.h"
68
#include "schedd_message.h"
69
#include "msg_schedd.h"
70
#include "sge_schedd_text.h"
71
#include "sge_ja_task.h"
72
#include "sge_string.h"
73
#include "sge_hostname.h"
76
#include "sge_cqueue.h"
77
#include "sge_qinstance.h"
78
#include "sge_qinstance_type.h"
79
#include "sge_userprj.h"
81
#include "sge_centry.h"
82
#include "sge_object.h"
83
#include "sge_resource_utilization.h"
84
#include "sge_qinstance_state.h"
85
#include "sge_schedd_conf.h"
86
#include "sge_subordinate.h"
89
#include "sge_pqs_api.h"
91
#include "sge_calendarL.h"
92
#include "sge_attrL.h"
93
#include "sched/sge_resource_quota_schedd.h"
94
#include "sgeobj/sge_advance_reservation.h"
95
#include "sgeobj/sge_userset.h"
96
#include "sgeobj/sge_hgroup.h"
97
#include "uti/sge_time.h"
99
/* -- these implement helpers for the category optimization -------- */
102
lListElem *category; /* ref to the category */
103
lListElem *cache; /* ref to the cache object in th category */
104
bool use_category; /* if true: use the category
105
with immediate dispatch runs only and only if there is more than a single job of that category
106
prevents 'skip_host_list' and 'skip_queue_list' be used with reservation */
107
bool mod_category; /* if true: update the category with new messages, queues, and hosts */
108
u_long32 *posible_pe_slots; /* stores all posible slots settings for a pe job with ranges */
109
bool is_pe_slots_rev; /* if it is true, the posible_pe_slots are stored in the category */
113
fill_category_use_t(const sge_assignment_t *best, category_use_t *use_category, const char *pe_name);
116
add_pe_slots_to_category(category_use_t *use_category, u_long32 *max_slotsp, lListElem *pe,
117
int min_slots, int max_slots, lList *pe_range);
118
/* -- these implement parallel assignemnt ------------------------- */
121
parallel_reservation_max_time_slots(sge_assignment_t *best, int *available_slots);
124
parallel_maximize_slots_pe(sge_assignment_t *best, int *available_slots);
127
parallel_assignment(sge_assignment_t *a, category_use_t *use_category, int *available_slots);
130
#pragma no_inline(parallel_assignment)
134
parallel_available_slots(const sge_assignment_t *a, int *slots, int *slots_qend);
137
parallel_tag_queues_suitable4job(sge_assignment_t *assignment, category_use_t *use_category, int *available_slots);
140
parallel_global_slots(const sge_assignment_t *a, int *slots, int *slots_qend);
143
parallel_tag_hosts_queues(sge_assignment_t *a, lListElem *hep, int *slots,
144
int *slots_qend, bool *master_host, category_use_t *use_category,
145
lList **unclear_cqueue_list);
148
#pragma no_inline(parallel_tag_hosts_queues)
152
parallel_max_host_slots(sge_assignment_t *a, lListElem *host);
156
parallel_queue_slots(sge_assignment_t *a,lListElem *qep, int *slots, int *slots_qend,
157
bool allow_non_requestable);
160
void clean_up_parallel_job(sge_assignment_t *a);
162
/* -- these implement sequential assignemnt ---------------------- */
165
sequential_tag_queues_suitable4job(sge_assignment_t *a);
168
sequential_queue_time( u_long32 *start, const sge_assignment_t *a, int *violations, lListElem *qep);
171
sequential_host_time(u_long32 *start, const sge_assignment_t *a, int *violations, lListElem *hep);
174
sequential_global_time(u_long32 *start_time, const sge_assignment_t *a, int *violations);
177
match_static_advance_reservation(const sge_assignment_t *a);
180
sequential_update_host_order(lList *host_list, lList *queues);
182
/* -- base functions ---------------------------------------------- */
185
compute_soft_violations(const sge_assignment_t *a, lListElem *queue, int violation, lList *load_attr, lList *config_attr,
186
lList *actual_attr, u_long32 layer, double lc_factor, u_long32 tag);
189
rc_time_by_slots(const sge_assignment_t *a, lList *requested, lList *load_attr, lList *config_attr, lList *actual_attr,
190
lListElem *queue, bool allow_non_requestable, dstring *reason, int slots,
191
u_long32 layer, double lc_factor, u_long32 tag, u_long32 *start_time, const char *object_name);
196
ri_slots_by_time(const sge_assignment_t *a, int *slots, int *slots_qend,
197
lList *rue_list, lListElem *request, lList *load_attr, lList *total_list, lListElem *queue,
198
u_long32 layer, double lc_factor, dstring *reason, bool allow_non_requestable,
199
bool no_centry, const char *object_name);
202
match_static_resource(int slots, lListElem *req_cplx, lListElem *src_cplx, dstring *reason,
203
int is_threshold, int force_existence, bool allow_non_requestable);
206
resource_cmp(u_long32 relop, double req, double src_dl);
209
job_is_forced_centry_missing(const lListElem *job, const lList *master_centry_list, const lListElem *queue_or_host);
212
clear_resource_tags( lList *resources, u_long32 max_tag);
215
find_best_result(dispatch_t r1, dispatch_t r2);
217
/* ---- helpers for load computation ---------------------------------------------------------- */
220
*load_locate_elem(lList *load_list, lListElem *global_consumable, lListElem *host_consumable,
221
lListElem *queue_consumable, const char *limit);
225
sge_call_pe_qsort(sge_assignment_t *a, const char *qsort_args);
229
load_check_alarm(char *reason, const char *name, const char *load_value, const char *limit_value,
230
u_long32 relop, u_long32 type, lListElem *hep, lListElem *hlep, double lc_host,
231
double lc_global, const lList *load_adjustments, int load_is_value);
234
load_np_value_adjustment(const char* name, lListElem *hep, double *load_correction);
236
/* ---- Implementation ------------------------------------------------------------------------- */
238
/* Initialize an assignment structure from a job (and optionally its
   ja_task).  Only fills the fields derivable from the passed objects;
   list references (queue_list, host_list, ...) must be set by the
   caller.  When is_load_adj is true the scheduler's configured job
   load adjustments are attached (owned by 'a', released via
   assignment_release()). */
void assignment_init(sge_assignment_t *a, lListElem *job, lListElem *ja_task, bool is_load_adj)
{
   if (job != NULL) {
      a->job     = job;
      a->user    = lGetString(job, JB_owner);
      a->group   = lGetString(job, JB_group);
      a->project = lGetString(job, JB_project);
      a->job_id  = lGetUlong(job, JB_job_number);
      a->is_soft = job_has_soft_requests(job);
   }

   if (is_load_adj) {
      a->load_adjustments = sconf_get_job_load_adjustments();
   }

   if (ja_task != NULL) {
      a->ja_task    = ja_task;
      a->ja_task_id = lGetUlong(ja_task, JAT_task_number);
   }
}
259
/* Copy an assignment.  The destination's own lists are released first,
   then all scalar fields are copied.  Load adjustments are deep-copied.
   If move_gdil is true, ownership of gdil/limit/skip lists moves from
   src to dst (src's references are cleared); otherwise dst gets NULL
   lists and src keeps its own. */
void assignment_copy(sge_assignment_t *dst, sge_assignment_t *src, bool move_gdil)
{
   if (dst == NULL || src == NULL) {
      return;
   }

   if (dst->load_adjustments != NULL) {
      lFreeList(&dst->load_adjustments);
   }

   lFreeList(&(dst->gdil));
   lFreeList(&(dst->limit_list));
   lFreeList(&(dst->skip_cqueue_list));
   lFreeList(&(dst->skip_host_list));

   memcpy(dst, src, sizeof(sge_assignment_t));

   if (src->load_adjustments != NULL) {
      dst->load_adjustments = lCopyList("cpy_load_adj", src->load_adjustments);
   }

   if (!move_gdil)
      dst->gdil = dst->limit_list = dst->skip_cqueue_list = dst->skip_host_list = NULL;
   else
      src->gdil = src->limit_list = src->skip_cqueue_list = src->skip_host_list = NULL;
}
288
/* Release all lists owned by the assignment.  lFreeList() NULLs the
   passed reference, so calling this twice is safe. */
void assignment_release(sge_assignment_t *a)
{
   lFreeList(&(a->load_adjustments));
   lFreeList(&(a->gdil));
   lFreeList(&(a->limit_list));
   lFreeList(&(a->skip_cqueue_list));
   lFreeList(&(a->skip_host_list));
}
297
/* Drop the per-run cached lists (limit/skip lists) while keeping the
   rest of the assignment (gdil, load adjustments, ...) intact. */
void assignment_clear_cache(sge_assignment_t *a)
{
   lFreeList(&(a->limit_list));
   lFreeList(&(a->skip_cqueue_list));
   lFreeList(&(a->skip_host_list));
}
305
/* Combine two dispatch results into the "best" one.  Precedence (most
   significant first): NEVER, OK, NOT_AT_TIME, NEVER_JOB, NEVER_CAT,
   MISSING_ATTR.  Any combination outside these values is logged as a
   critical error and treated as NEVER. */
static dispatch_t
find_best_result(dispatch_t r1, dispatch_t r2)
{
   DENTER(BASIS_LAYER, "find_best_result");

   if (r1 == DISPATCH_NEVER ||
       r2 == DISPATCH_NEVER) {
      DRETURN(DISPATCH_NEVER);
   }
   else if (r1 == DISPATCH_OK ||
            r2 == DISPATCH_OK) {
      DRETURN(DISPATCH_OK);
   }
   else if (r1 == DISPATCH_NOT_AT_TIME ||
            r2 == DISPATCH_NOT_AT_TIME) {
      DRETURN(DISPATCH_NOT_AT_TIME);
   }
   else if (r1 == DISPATCH_NEVER_JOB ||
            r2 == DISPATCH_NEVER_JOB) {
      DRETURN(DISPATCH_NEVER_JOB);
   }
   else if (r1 == DISPATCH_NEVER_CAT ||
            r2 == DISPATCH_NEVER_CAT) {
      DRETURN(DISPATCH_NEVER_CAT);
   }
   else if (r1 == DISPATCH_MISSING_ATTR ||
            r2 == DISPATCH_MISSING_ATTR ) {
      DRETURN(DISPATCH_MISSING_ATTR);
   }

   CRITICAL((SGE_EVENT, MSG_JOBMATCHINGUNEXPECTEDRESULT));
   DRETURN(DISPATCH_NEVER);
}
340
is_not_better(sge_assignment_t *a, u_long32 viola_best, u_long32 tt_best, u_long32 viola_this, u_long32 tt_this)
342
/* earlier start time has higher preference than lower soft violations */
343
if (a->is_reservation && tt_this >= tt_best)
345
if (!a->is_reservation && viola_this >= viola_best)
350
/****** scheduler/sge_select_parallel_environment() ****************************
352
* sge_select_parallel_environment() -- Decide about a PE assignment
355
* static dispatch_t sge_select_parallel_environment(sge_assignment_t *best, lList
359
* When users use wildcard PE request such as -pe <pe_range> 'mpi8_*'
360
* more than a single parallel environment can match the wildcard expression.
361
* In case of 'now' assignments the PE that gives us the largest assignment
362
* is selected. When scheduling a reservation we search for the earliest
363
* assignment for each PE and then choose that one that finally gets us the
364
* maximum number of slots.
367
* The scheduler info messages are not cached. They are added globally and have
368
* to be added for each job in the category. When the messages are updated
369
* this has to be changed.
372
* sge_assignment_t *best - herein we keep all important in/out information
373
* lList *pe_list - the list of all parallel environments (PE_Type)
376
* dispatch_t - 0 ok got an assignment
377
* 1 no assignment at the specified time (???)
378
* -1 assignment will never be possible for all jobs of that category
379
* -2 assignment will never be possible for that particular job
382
* MT-NOTE: sge_select_parallel_environment() is not MT safe
383
*******************************************************************************/
385
sge_select_parallel_environment(sge_assignment_t *best, lList *pe_list)
387
int matched_pe_count = 0;
389
const char *pe_request, *pe_name;
390
dispatch_t result, best_result = DISPATCH_NEVER_CAT;
393
DENTER(TOP_LAYER, "sge_select_parallel_environment");
395
pe_request = lGetString(best->job, JB_pe);
397
DPRINTF(("handling parallel job "sge_u32"."sge_u32"\n", best->job_id, best->ja_task_id));
399
if (best->is_reservation) { /* reservation scheduling */
401
if (!best->is_advance_reservation) {
402
old_logging = schedd_mes_get_logging();
403
schedd_mes_set_logging(0);
407
for_each(pe, pe_list) {
408
int available_slots = 0;
410
if (!pe_is_matching(pe, pe_request)) {
416
pe_name = lGetString(pe, PE_name);
417
if (best->gdil == NULL) { /* first pe run */
419
best->pe_name = pe_name;
421
/* determine earliest start time with that PE */
422
result = parallel_reservation_max_time_slots(best, &available_slots);
423
if (result != DISPATCH_OK) {
424
schedd_mes_add(best->job_id, SCHEDD_INFO_PESLOTSNOTINRANGE_SI, pe_name, available_slots);
425
best_result = find_best_result(best_result, result);
428
DPRINTF(("### first ### reservation in PE \"%s\" at "sge_u32" with %d soft violations\n",
429
pe_name, best->start, best->soft_violations));
430
} else { /* test with all other pes */
431
sge_assignment_t tmp = SGE_ASSIGNMENT_INIT;
433
assignment_copy(&tmp, best, false);
435
tmp.pe_name = pe_name;
437
/* try to find earlier assignment again with minimum slot amount */
439
result = parallel_reservation_max_time_slots(&tmp, &available_slots);
441
if (result != DISPATCH_OK) {
442
schedd_mes_add(best->job_id, SCHEDD_INFO_PESLOTSNOTINRANGE_SI, pe_name, available_slots);
443
best_result = find_best_result(best_result, result);
444
assignment_release(&tmp);
448
if (tmp.start < best->start ||
449
(tmp.start == best->start && tmp.soft_violations < best->soft_violations)) {
450
assignment_copy(best, &tmp, true);
451
DPRINTF(("### better ### reservation in PE \"%s\" at "sge_u32" with %d soft violations\n",
452
pe_name, best->start, best->soft_violations));
455
assignment_release(&tmp);
460
/* now assignments */
462
result = match_static_advance_reservation(best);
463
if (result != DISPATCH_OK) {
467
if ((ar_id = lGetUlong(best->job, JB_ar)) != 0) {
468
lListElem *ar = lGetElemUlong(best->ar_list, AR_id, ar_id);
469
pe = lGetElemStr(pe_list, PE_name, lGetString(ar, AR_granted_pe));
472
DPRINTF(("Critical Error, ar references non existing PE\n"));
474
int available_slots = 0;
478
best->pe_name = lGetString(pe, PE_name);
480
best_result = parallel_maximize_slots_pe(best, &available_slots);
481
if (best_result != DISPATCH_OK) {
482
schedd_mes_add(best->job_id, SCHEDD_INFO_PESLOTSNOTINRANGE_SI, best->pe_name, available_slots);
484
DPRINTF(("### AR ### assignment in PE \"%s\" with %d soft violations and %d available slots\n",
485
best->pe_name, best->soft_violations, available_slots));
488
for_each(pe, pe_list) {
490
if (!pe_is_matching(pe, pe_request)) {
494
pe_name = lGetString(pe, PE_name);
497
if (best->gdil == NULL) {
498
int available_slots = 0;
500
best->pe_name = pe_name;
501
result = parallel_maximize_slots_pe(best, &available_slots);
503
if (result != DISPATCH_OK) {
504
schedd_mes_add(best->job_id, SCHEDD_INFO_PESLOTSNOTINRANGE_SI, pe_name, available_slots);
505
best_result = find_best_result(best_result, result);
508
DPRINTF(("### first ### assignment in PE \"%s\" with %d soft violations\n",
509
pe_name, best->soft_violations));
511
int available_slots = 0;
512
sge_assignment_t tmp = SGE_ASSIGNMENT_INIT;
514
assignment_copy(&tmp, best, false);
516
tmp.pe_name = pe_name;
518
result = parallel_maximize_slots_pe(&tmp, &available_slots);
520
if (result != DISPATCH_OK) {
521
assignment_release(&tmp);
522
schedd_mes_add(best->job_id, SCHEDD_INFO_PESLOTSNOTINRANGE_SI, pe_name, available_slots);
523
best_result = find_best_result(best_result, result);
527
if ((tmp.slots > best->slots) ||
528
(tmp.start == best->start &&
529
tmp.soft_violations < best->soft_violations)) {
530
assignment_copy(best, &tmp, true);
531
DPRINTF(("### better ### assignment in PE \"%s\" with %d soft violations\n",
532
pe_name, best->soft_violations));
535
assignment_release(&tmp);
541
if (matched_pe_count == 0) {
542
schedd_mes_add(best->job_id, SCHEDD_INFO_NOPEMATCH_ );
543
best_result = DISPATCH_NEVER_CAT;
544
} else if (best->is_reservation && best->gdil) {
545
int available_slots = 0;
546
result = parallel_maximize_slots_pe(best, &available_slots);
547
if (result != DISPATCH_OK) { /* ... should never happen */
548
best_result = DISPATCH_NEVER_CAT;
553
best_result = DISPATCH_OK;
556
switch (best_result) {
558
DPRINTF(("SELECT PE("sge_u32"."sge_u32") returns PE %s %d slots at "sge_u32")\n",
559
best->job_id, best->ja_task_id, best->pe_name, best->slots, best->start));
561
case DISPATCH_NOT_AT_TIME:
562
DPRINTF(("SELECT PE("sge_u32"."sge_u32") returns <later>\n",
563
best->job_id, best->ja_task_id));
565
case DISPATCH_NEVER_CAT:
566
DPRINTF(("SELECT PE("sge_u32"."sge_u32") returns <category_never>\n",
567
best->job_id, best->ja_task_id));
569
case DISPATCH_NEVER_JOB:
570
DPRINTF(("SELECT PE("sge_u32"."sge_u32") returns <job_never>\n",
571
best->job_id, best->ja_task_id));
573
case DISPATCH_MISSING_ATTR:
575
DPRINTF(("!!!!!!!! SELECT PE("sge_u32"."sge_u32") returns unexpected %d\n",
576
best->job_id, best->ja_task_id, best_result));
580
if (best->is_reservation && !best->is_advance_reservation) {
582
schedd_mes_set_logging(old_logging);
586
clean_up_parallel_job(best);
588
DRETURN(best_result);
591
/****** sge_select_queue/clean_up_parallel_job() *******************************
593
* clean_up_parallel_job() -- removes tags
596
* static void clean_up_parallel_job(sge_assignment_t *a)
599
* during pe job dispatch many queues and hosts are tagged. This
* function removes the tags.
603
* sge_assignment_t *a - the resource structure
608
* MT-NOTE: clean_up_parallel_job() is not MT safe
610
*******************************************************************************/
612
/* Remove the tags set on queues and hosts during a parallel job
   dispatch run, so the next run starts from a clean state. */
void clean_up_parallel_job(sge_assignment_t *a)
{
   lListElem *host = NULL;

   qinstance_list_set_tag(a->queue_list, 0);
   for_each(host, a->host_list) {
      lSetUlong(host, EH_tagged, 0);
      lSetUlong(host, EH_master_host, 0);
   }
}
623
/****** scheduler/parallel_reservation_max_time_slots() *****************************************
625
* parallel_reservation_max_time_slots() -- Search earliest possible assignment
628
* static dispatch_t parallel_reservation_max_time_slots(sge_assignment_t *best)
631
* The earliest possible assignment is searched for a job assuming a
632
* particular parallel environment be used with a particular slot
633
* number. If the slot number passed is 0 we start with the minimum
634
* possible slot number for that job. The search starts with the
635
* latest queue end time if DISPATCH_TIME_QUEUE_END was specified
636
* rather than a real time value.
639
* sge_assignment_t *best - herein we keep all important in/out information
642
* dispatch_t - 0 ok got an assignment
643
* 1 no assignment at the specified time (???)
644
* -1 assignment will never be possible for all jobs of that category
645
* -2 assignment will never be possible for that particular job
648
* MT-NOTE: parallel_reservation_max_time_slots() is not MT safe
649
*******************************************************************************/
651
parallel_reservation_max_time_slots(sge_assignment_t *best, int *available_slots)
653
u_long32 pe_time, first_time;
654
sge_assignment_t tmp_assignment = SGE_ASSIGNMENT_INIT;
655
dispatch_t result = DISPATCH_NEVER_CAT;
656
sge_qeti_t *qeti = NULL;
657
bool is_first = true;
659
category_use_t use_category;
661
DENTER(TOP_LAYER, "parallel_reservation_max_time_slots");
663
/* assemble job category information */
664
fill_category_use_t(best, &use_category, best->pe_name);
666
qeti = sge_qeti_allocate(best->job, best->pe, best->ckpt,
667
best->host_list, best->queue_list, best->centry_list, best->acl_list, best->hgrp_list, best->ar_list);
669
ERROR((SGE_EVENT, "could not allocate qeti object needed reservation "
670
"scheduling of parallel job "sge_U32CFormat, sge_u32c(best->job_id)));
671
DRETURN(DISPATCH_NEVER_CAT);
674
assignment_copy(&tmp_assignment, best, false);
675
if (best->slots == 0) {
676
tmp_assignment.slots = range_list_get_first_id(lGetList(best->job, JB_pe_range), NULL);
679
if (best->start == DISPATCH_TIME_QUEUE_END) {
680
first_time = sge_qeti_first(qeti);
681
if (first_time == 0) { /* we need at least one reservation run */
682
first_time = sconf_get_now();
685
/* the first iteration will be done using best->start
686
further ones will use earliert times */
687
first_time = best->start;
688
sge_qeti_next_before(qeti, best->start);
691
old_logging = schedd_mes_get_logging(); /* store logging mode */
692
for (pe_time = first_time ; pe_time; pe_time = sge_qeti_next(qeti)) {
693
DPRINTF(("SELECT PE TIME(%s, "sge_u32") tries at "sge_u32"\n",
694
best->pe_name, best->job_id, pe_time));
695
tmp_assignment.start = pe_time;
697
/* this is an additional run, we have already at least one posible match,
698
all additional scheduling information is not important, since we can
703
use_category.mod_category = false;
704
schedd_mes_set_logging(0);
708
result = parallel_assignment(&tmp_assignment, &use_category, available_slots);
709
assignment_clear_cache(&tmp_assignment);
711
if (result == DISPATCH_OK) {
712
if (tmp_assignment.gdil) {
713
DPRINTF(("SELECT PE TIME: earlier assignment at "sge_u32"\n", pe_time));
715
assignment_copy(best, &tmp_assignment, true);
716
assignment_release(&tmp_assignment);
718
DPRINTF(("SELECT PE TIME: no earlier assignment at "sge_u32"\n", pe_time));
722
schedd_mes_set_logging(old_logging); /* restore logging mode */
725
sge_qeti_release(&qeti);
726
assignment_release(&tmp_assignment);
729
result = DISPATCH_OK;
734
DPRINTF(("SELECT PE TIME(%s, %d) returns "sge_u32"\n",
735
best->pe_name, best->slots, best->start));
737
case DISPATCH_NEVER_CAT:
738
DPRINTF(("SELECT PE TIME(%s, %d) returns <category_never>\n",
739
best->pe_name, best->slots));
741
case DISPATCH_NEVER_JOB:
742
DPRINTF(("SELECT PE TIME(%s, %d) returns <job_never>\n",
743
best->pe_name, best->slots));
746
DPRINTF(("!!!!!!!! SELECT PE TIME(%s, %d) returns unexpected %d\n",
747
best->pe_name, best->slots, result));
754
/****** scheduler/parallel_maximize_slots_pe() *****************************************
756
* parallel_maximize_slots_pe() -- Maximize number of slots for an assignment
759
* static int parallel_maximize_slots_pe(sge_assignment_t *best, lList *host_list,
760
* lList *queue_list, lList *centry_list, lList *acl_list)
763
* The largest possible slot amount is searched for a job assuming a
764
* particular parallel environment be used at a particular start time.
765
* If the slot number passed is 0 we start with the minimum
766
* possible slot number for that job.
768
* To search most efficiently for the right slot value, it has three search
769
* strategies implemented:
771
* - least slot value first
772
* - highest slot value first
774
* To be able to use binary search all possible slot values are stored in
775
* one array. The slot values in this array are sorted ascending. After the
776
* right slot value was found, it is very easy to compute the best strategy
777
* from the result. For each strategy it will compute how many iterations
778
* would have been needed to compute the correct result. These steps will
779
* be stored for the next run and used to figure out the best algorithm.
780
* To ensure that we can adapt to rapid changes and also ignore spikes we
* are using the running average algorithm in a 80-20 setting. This means
782
* that the algorithm will need 4 (max 5) iterations to adopt to a new
785
* Further enhancements:
786
* It might be a good idea to store the derived values with the job categories
787
* and allow to find the best strategy per category.
790
* sge_assignment_t *best - herein we keep all important in/out information
791
* lList *host_list - a list of all available hosts
792
* lList *queue_list - a list of all available queues
793
* lList *centry_list - a list of all available complex attributes
794
* lList *acl_list - a list of all access lists
797
* int - 0 ok got an assignment (maybe without maximizing it)
798
* 1 no assignment at the specified time
799
* -1 assignment will never be possible for all jobs of that category
800
* -2 assignment will never be possible for that particular job
803
* MT-NOTE: parallel_maximize_slots_pe() is MT safe as long as the provided
804
* lists are owned be the caller
808
* sconf_update_pe_alg
809
* add_pe_slots_to_category
811
*******************************************************************************/
813
parallel_maximize_slots_pe(sge_assignment_t *best, int *available_slots)
816
int min_slots, max_slots;
821
sge_assignment_t tmp = SGE_ASSIGNMENT_INIT;
822
dispatch_t result = DISPATCH_NEVER_CAT;
823
const char *pe_name = best->pe_name;
824
bool is_first = true;
826
category_use_t use_category;
827
u_long32 max_slotsp = 0;
829
int match_current = 0;
831
schedd_pe_algorithm alg = sconf_best_pe_alg();
833
DENTER(TOP_LAYER, "parallel_maximize_slots_pe");
836
(pe_range=lGetList(best->job, JB_pe_range)) == NULL ||
837
(pe=best->pe) == NULL) {
838
DRETURN(DISPATCH_NEVER_CAT);
841
/* assemble job category information */
842
fill_category_use_t(best, &use_category, pe_name);
844
first = range_list_get_first_id(pe_range, NULL);
845
last = range_list_get_last_id(pe_range, NULL);
846
max_pe_slots = lGetUlong(pe, PE_slots);
849
min_slots = best->slots;
854
if (best->gdil || best->slots == max_pe_slots) { /* already found maximum */
855
DRETURN(DISPATCH_OK);
858
DPRINTF(("MAXIMIZE SLOT: FIRST %d LAST %d MAX SLOT %d\n", first, last, max_pe_slots));
860
/* this limits max range number RANGE_INFINITY (i.e. -pe pe 1-) to reasonable number */
861
max_slots = MIN(last, max_pe_slots);
863
DPRINTF(("MAXIMIZE SLOT FOR "sge_u32" using \"%s\" FROM %d TO %d\n",
864
best->job_id, pe_name, min_slots, max_slots));
866
old_logging = schedd_mes_get_logging(); /* store logging mode */
868
if ((max_slots < min_slots) ||
870
ERROR((SGE_EVENT, "invalid pe job range setting for job "sge_u32"\n", best->job_id));
871
DRETURN(DISPATCH_NEVER_CAT);
874
/* --- prepare the posible slots for the binary search */
875
max_slotsp = (max_slots - min_slots+1);
876
if (!add_pe_slots_to_category(&use_category, &max_slotsp, pe, min_slots, max_slots, pe_range)) {
877
ERROR((SGE_EVENT, MSG_SGETEXT_NOMEM));
878
DRETURN(DISPATCH_NEVER_CAT);
880
if (max_slotsp == 0) {
881
DPRINTF(("no slots in PE %s available for job "sge_u32"\n", pe_name, best->job_id));
882
DRETURN(DISPATCH_NEVER_CAT);
885
assignment_copy(&tmp, best, false);
887
/* --- work on the different slot ranges and try to find the best one --- */
888
if (alg == SCHEDD_PE_BINARY) {
890
int max = max_slotsp-1;
894
current = (min + max) / 2;
896
if (is_first) { /* first run, collect information */
898
} else { /* this is an additional run, we have already at least one posible match */
899
use_category.mod_category = false;
900
schedd_mes_set_logging(0);
904
/* we try that slot amount */
905
tmp.slots = use_category.posible_pe_slots[current];
906
result = parallel_assignment(&tmp, &use_category, available_slots);
907
assignment_clear_cache(&tmp);
909
if (result == DISPATCH_OK) {
910
assignment_copy(best, &tmp, true);
911
assignment_release(&tmp);
912
match_current = current;
918
} while (min <= max && max != -1);
921
/* compute how many runs the bin search might have needed */
922
/* This will not give us the correct answer in all cases, but
923
it is close enough. And on average it is corrent :-) */
924
int end = max_slotsp;
925
for ( runs=1; runs < end; runs++) {
930
if (alg == SCHEDD_PE_LOW_FIRST) {
931
for (current = 0; current < max_slotsp; current++) {
933
if (is_first) { /* first run, collect information */
935
} else { /* this is an additional run, we have already at least one posible match */
936
use_category.mod_category = false;
937
schedd_mes_set_logging(0);
941
/* we try that slot amount */
942
tmp.slots = use_category.posible_pe_slots[current];
943
result = parallel_assignment(&tmp, &use_category, available_slots);
944
assignment_clear_cache(&tmp);
946
if (result != DISPATCH_OK) { /* we have mismatch, stop */
947
break; /* the other runs will not work */
950
match_current = current;
951
assignment_copy(best, &tmp, true);
952
assignment_release(&tmp);
954
} else { /* optimistic search */
955
for (current = max_slotsp-1; current >= 0; current--) {
957
if (is_first) { /* first run, collect information */
959
} else { /* this is an additional run, we have already at least one posible match */
960
use_category.mod_category = false;
961
schedd_mes_set_logging(0);
965
/* we try that slot amount */
966
tmp.slots = use_category.posible_pe_slots[current];
967
result = parallel_assignment(&tmp, &use_category, available_slots);
968
assignment_clear_cache(&tmp);
970
if (result == DISPATCH_OK) { /* we have a match, stop */
971
assignment_copy(best, &tmp, true); /* all other runs will also match */
972
assignment_release(&tmp);
973
match_current = current;
981
sconf_update_pe_alg(runs, match_current, max_slotsp);
984
/* --- we are done --- */
985
if (!use_category.is_pe_slots_rev) {
986
FREE(use_category.posible_pe_slots);
989
assignment_release(&tmp);
991
schedd_mes_set_logging(old_logging); /* restore logging mode */
995
result = DISPATCH_OK;
1001
if (!best->is_reservation) {
1002
sconf_inc_comprehensive_jobs();
1005
DPRINTF(("MAXIMIZE SLOT(%s, %d) returns <ok>\n",
1006
pe_name, best->slots));
1008
case DISPATCH_NOT_AT_TIME:
1009
DPRINTF(("MAXIMIZE SLOT(%s, %d) returns <later>\n",
1010
pe_name, best->slots));
1012
case DISPATCH_NEVER_CAT:
1013
DPRINTF(("MAXIMIZE SLOT(%s, %d) returns <category_never>\n",
1014
pe_name, best->slots));
1016
case DISPATCH_NEVER_JOB:
1017
DPRINTF(("MAXIMIZE SLOT(%s, %d) returns <job_never>\n",
1018
pe_name, best->slots));
1021
DPRINTF(("!!!!!!!! MAXIMIZE SLOT(%d, %d) returns unexpected %d\n",
1022
best->slots, (int)best->start, result));
1029
/****** sge_select_queue/sge_select_queue() ************************************
1031
* sge_select_queue() -- checks whether a job matches a given queue or host
1034
* int sge_select_queue(lList *requested_attr, lListElem *queue, lListElem
1035
* *host, lList *exechost_list, lList *centry_list, bool
1036
* allow_non_requestable, int slots)
1039
* Takes the requested attributes from a job and checks if it matches the given
1040
* host or queue. One and only one should be specified. If both, the function
1041
* assumes, that the queue belongs to the given host.
1044
* lList *requested_attr - list of requested attributes
1045
* lListElem *queue - current queue or null if host is set
* lListElem *host - current host or null if queue is set
* lList *exechost_list - list of all hosts in the system
* lList *centry_list - system wide attribute config list
* bool allow_non_requestable - allow non requestable?
* int slots - number of requested slots
* lList *queue_user_list - list of users or null
* lList *acl_list - acl_list or null
* lListElem *job - job or null
*
* RESULT
* int - 1, if okay, QU_tag will be set if a queue is selected
*
* NOTES
* The caller is responsible for cleaning tags.
*
* No range is used. For serial jobs we will need a call for hard and one
* for soft requests. For parallel jobs we will call this function for each
* -l request, because in serial jobs requests can simply be added while in
* parallel jobs each -l requests a different set of queues.
*
* NOTE(review): this region of the file was damaged during extraction:
* the original editor's line numbers appear as bare tokens between the
* code lines, and gaps in that numbering show that statements, variable
* declarations (e.g. q_access, projects, ret) and closing braces are
* missing. The code below is kept byte-identical; restore the missing
* lines from the upstream sources before building.
*******************************************************************************/
1069
sge_select_queue(lList *requested_attr, lListElem *queue, lListElem *host,
1070
lList *exechost_list, lList *centry_list, bool allow_non_requestable,
1071
int slots, lList *queue_user_list, lList *acl_list, lListElem *job)
1074
lList *load_attr = NULL;
1075
lList *config_attr = NULL;
1076
lList *actual_attr = NULL;
1077
lListElem *global = NULL;
1078
lListElem *qu = NULL;
1081
const char *project;
1083
sge_assignment_t a = SGE_ASSIGNMENT_INIT;
1084
double lc_factor = 0; /* scaling for load correction */
1085
u_long32 ulc_factor;
1086
/* actually we don't care on start time here to this is just a dummy setting */
1087
u_long32 start_time = DISPATCH_TIME_NOW;
1089
DENTER(TOP_LAYER, "sge_select_queue");
1091
clear_resource_tags(requested_attr, MAX_TAG);
1093
assignment_init(&a, NULL, NULL, false);
1094
a.centry_list = centry_list;
1095
a.host_list = exechost_list;
1097
/* ACL check: every entry of queue_user_list is either a user name or an
   "@group" reference; q_access accumulates the access result */
if (acl_list != NULL) {
1098
/* check if job owner has access rights to the queue */
1099
DPRINTF(("testing queue access lists\n"));
1100
for_each(qu, queue_user_list) {
1101
const char *name = lGetString(qu, ST_name);
1102
DPRINTF(("-----> checking queue user: %s\n", name ));
1103
q_access |= (name[0]=='@')?
1104
sge_has_access(NULL, &name[1], queue, acl_list):
1105
sge_has_access(name, NULL, queue, acl_list);
1107
if (q_access == 0) {
1108
DPRINTF(("no access\n"));
1109
assignment_release(&a);
1117
/* check if job can run in queue based on project */
1118
DPRINTF(("testing job projects lists\n"));
1119
if ( (project = lGetString(job, JB_project)) ) {
1120
if ((!(projects = lGetList(queue, QU_projects)))) {
1121
DPRINTF(("no access because queue has no project\n"));
1122
assignment_release(&a);
1125
if ((!prj_list_locate(projects, project))) {
1126
DPRINTF(("no access because project not contained in queues project list"));
1127
assignment_release(&a);
1132
/* check if job can run in queue based on excluded projects */
1133
DPRINTF(("testing job xprojects lists\n"));
1134
if ((projects = lGetList(queue, QU_xprojects))) {
1135
if (((project = lGetString(job, JB_project)) &&
1136
prj_list_locate(projects, project))) {
1137
DPRINTF(("no access\n"));
1138
assignment_release(&a);
1146
*is queue contained in hard queue list ?
1148
DPRINTF(("queue contained in jobs hard queue list?\n"));
1149
if (lGetList(job, JB_hard_queue_list)) {
1150
lList *qref_list = lGetList(job, JB_hard_queue_list);
1151
const char *qname = NULL;
1152
const char *qinstance_name = NULL;
1154
qname = lGetString(queue, QU_qname);
1155
qinstance_name = lGetString(queue, QU_full_name);
1156
if ((lGetElemStr(qref_list, QR_name, qname) != NULL) ||
1157
(lGetElemStr(qref_list, QR_name, qinstance_name) != NULL)) {
1160
DPRINTF(("denied because queue \"%s\" is not contained in the hard "
1161
"queue list (-q) that was requested by job %d\n",
1162
qname, lGetUlong(job, JB_job_number)));
1163
assignment_release(&a);
1170
/* tier 1: match the requests against the global host (GLOBAL_TAG) */
global = host_list_locate(exechost_list, SGE_GLOBAL_NAME);
1171
load_attr = lGetList(global, EH_load_list);
1172
config_attr = lGetList(global, EH_consumable_config_list);
1173
actual_attr = lGetList(global, EH_resource_utilization);
1175
/* is there a multiplier for load correction (may be not in qstat, qmon etc) */
1176
if (lGetPosViaElem(global, EH_load_correction_factor, SGE_NO_ABORT) >= 0) {
1177
if ((ulc_factor=lGetUlong(global, EH_load_correction_factor)))
1178
lc_factor = ((double)ulc_factor)/100;
1181
ret = rc_time_by_slots(&a, requested_attr, load_attr, config_attr, actual_attr,
1182
NULL, allow_non_requestable, NULL, slots, DOMINANT_LAYER_HOST, lc_factor, GLOBAL_TAG,
1183
&start_time, SGE_GLOBAL_NAME);
1186
/* tier 2: match against the queue's execution host (HOST_TAG) */
if(ret == DISPATCH_OK || ret == DISPATCH_MISSING_ATTR){
1188
host = host_list_locate(exechost_list, lGetHost(queue, QU_qhostname));
1190
load_attr = lGetList(host, EH_load_list);
1191
config_attr = lGetList(host, EH_consumable_config_list);
1192
actual_attr = lGetList(host, EH_resource_utilization);
1194
if (lGetPosViaElem(host, EH_load_correction_factor, SGE_NO_ABORT) >= 0) {
1195
if ((ulc_factor=lGetUlong(host, EH_load_correction_factor)))
1196
lc_factor = ((double)ulc_factor)/100;
1199
ret = rc_time_by_slots(&a, requested_attr, load_attr, config_attr,
1200
actual_attr, NULL, allow_non_requestable, NULL,
1201
slots, DOMINANT_LAYER_HOST, lc_factor, HOST_TAG,
1202
&start_time, lGetHost(host, EH_name));
1205
/* tier 3: match against the queue itself (QUEUE_TAG, no load values) */
if((ret == DISPATCH_OK || ret == DISPATCH_MISSING_ATTR) && queue){
1206
config_attr = lGetList(queue, QU_consumable_config_list);
1207
actual_attr = lGetList(queue, QU_resource_utilization);
1209
ret = rc_time_by_slots(&a, requested_attr, NULL, config_attr, actual_attr,
1210
queue, allow_non_requestable, NULL, slots, DOMINANT_LAYER_QUEUE, 0, QUEUE_TAG,
1211
&start_time, lGetString(queue, QU_full_name));
1215
assignment_release(&a);
1217
DRETURN((ret == DISPATCH_OK) ? true : false);
1221
/****** sge_select_queue/rc_time_by_slots() **********************************
* rc_time_by_slots() -- checks whether all resource requests on one level
* static int rc_time_by_slots(lList *requested, lList *load_attr, lList
* *config_attr, lList *actual_attr, lList *centry_list, lListElem *queue,
* bool allow_non_requestable, char *reason, int reason_size, int slots,
* u_long32 layer, double lc_factor, u_long32 tag)
* Checks whether all requests, default requests and implicit requests on
* this level are fulfilled.
* With reservation scheduling the earliest start time due to resources of the
* resource container is the maximum of the earliest start times for all
* resources comprised by the resource container that are requested by a job.
* lList *requested - list of attribute requests
* lList *load_attr - list of load attributes or null on queue level
* lList *config_attr - list of user defined attributes
* lList *actual_attr - usage of all consumables (RUE_Type)
* lList *centry_list - system wide attribute config. list (CE_Type)
* lListElem *queue - current queue or NULL on global/host level
* bool allow_non_requestable - allow non-requestable attributes?
* char *reason - error message
* int reason_size - max error message size
* int slots - number of slots the job is looking for
* u_long32 layer - current layer flag
* double lc_factor - load correction factor
* u_long32 tag - current layer tag
* u_long32 *start_time - in/out argument for start time
* u_long32 duration - jobs estimated total run time
* const char *object_name - name of the object used for monitoring purposes
* MT-NOTES: is not thread safe. uses a static buffer
* we have some special behavior, when slots is set to -1.
*
* NOTE(review): extraction damage in this block — the return type of the
* function, several local declarations (ret, ff, tmp_start, name, attr,
* valtype, dval, tmp_reason) and a number of closing braces are missing
* (visible as gaps in the interleaved numbering). Code kept byte-identical.
*******************************************************************************/
1268
rc_time_by_slots(const sge_assignment_t *a, lList *requested, lList *load_attr, lList *config_attr,
1269
lList *actual_attr, lListElem *queue, bool allow_non_requestable,
1270
dstring *reason, int slots, u_long32 layer, double lc_factor, u_long32 tag,
1271
u_long32 *start_time, const char *object_name)
1273
static lListElem *implicit_slots_request = NULL;
1275
u_long32 latest_time = DISPATCH_TIME_NOW;
1278
bool is_not_found = false;
1280
DENTER(TOP_LAYER, "rc_time_by_slots");
1282
clear_resource_tags(requested, QUEUE_TAG);
1284
/* ensure availability of implicit slot request */
1285
if (!implicit_slots_request) {
1286
implicit_slots_request = lCreateElem(CE_Type);
1287
lSetString(implicit_slots_request, CE_name, SGE_ATTR_SLOTS);
1288
lSetString(implicit_slots_request, CE_stringval, "1");
1289
lSetDouble(implicit_slots_request, CE_doubleval, 1);
1292
/* match number of free slots */
1294
tmp_start = *start_time;
1295
ret = ri_time_by_slots(a, implicit_slots_request, load_attr, config_attr, actual_attr, queue,
1296
reason, allow_non_requestable, slots, layer, lc_factor, &tmp_start, object_name);
1298
if (ret == DISPATCH_OK && *start_time == DISPATCH_TIME_QUEUE_END) {
1299
DPRINTF(("%s: \"slot\" request delays start time from "sge_U32CFormat
1300
" to "sge_U32CFormat"\n", object_name, latest_time, MAX(latest_time, tmp_start)));
1301
latest_time = MAX(latest_time, tmp_start);
1304
/* we don't care if slots are not specified, except at queue level */
1305
if (ret == DISPATCH_MISSING_ATTR && tag != QUEUE_TAG) {
1308
if (ret != DISPATCH_OK) {
1313
/* ensure all default requests are fulfilled */
1314
if (slots != -1 && !allow_non_requestable) {
1321
for_each (attr, actual_attr) {
1322
name = lGetString(attr, RUE_name);
1323
if (!strcmp(name, "slots")) {
1327
/* consumable && used in this global/host/queue && not requested */
1328
if (!is_requested(requested, name)) {
1329
lListElem *default_request = lGetElemStr(a->centry_list, CE_name, name);
1330
const char *def_req = lGetString(default_request, CE_default);
1331
valtype = lGetUlong(default_request, CE_valtype);
1332
parse_ulong_val(&dval, NULL, valtype, def_req, NULL, 0);
1334
/* ignore default request if the value is 0 */
1335
if(def_req != NULL && dval != 0.0) {
1337
char tmp_reason_buf[2048];
1339
sge_dstring_init(&tmp_reason, tmp_reason_buf, sizeof(tmp_reason_buf));
1341
/* build the default request */
1342
lSetString(default_request, CE_stringval, def_req);
1343
lSetDouble(default_request, CE_doubleval, dval);
1345
tmp_start = *start_time;
1346
ff = ri_time_by_slots(a, default_request, load_attr, config_attr, actual_attr,
1347
queue, &tmp_reason, true, slots, layer, lc_factor, &tmp_start, object_name);
1349
if (ff != DISPATCH_OK) {
1350
/* useless to continue in these cases */
1351
sge_dstring_append(reason, MSG_SCHEDD_FORDEFAULTREQUEST);
1352
sge_dstring_append_dstring(reason, &tmp_reason);
1356
if (*start_time == DISPATCH_TIME_QUEUE_END) {
1357
DPRINTF(("%s: default request \"%s\" delays start time from "sge_U32CFormat
1358
" to "sge_U32CFormat"\n", object_name, name, latest_time, MAX(latest_time, tmp_start)));
1359
latest_time = MAX(latest_time, tmp_start);
1369
/* explicit requests */
1370
for_each (attr, requested) {
1371
const char *attr_name = lGetString(attr, CE_name);
1373
tmp_start = *start_time;
1374
switch (ri_time_by_slots(a, attr,load_attr, config_attr, actual_attr, queue,
1375
reason, allow_non_requestable, slots, layer, lc_factor, &tmp_start, object_name)) {
1377
case DISPATCH_NEVER_CAT : /* will never match */
1378
DRETURN(DISPATCH_NEVER_CAT);
1380
case DISPATCH_OK : /* a match was found */
1381
if (*start_time == DISPATCH_TIME_QUEUE_END) {
1382
DPRINTF(("%s: explicit request \"%s\" delays start time from "sge_U32CFormat
1383
"to "sge_U32CFormat"\n", object_name, attr_name, latest_time,
1384
MAX(latest_time, tmp_start)));
1385
latest_time = MAX(latest_time, tmp_start);
1387
if (lGetUlong(attr, CE_tagged) < tag && tag != RQS_TAG) {
1388
lSetUlong(attr, CE_tagged, tag);
1392
case DISPATCH_NOT_AT_TIME : /* will match later-on */
1393
DPRINTF(("%s: request for %s will match later-on\n", object_name, attr_name));
1394
DRETURN(DISPATCH_NOT_AT_TIME);
1396
case DISPATCH_MISSING_ATTR : /* the requested element does not exist */
1397
if (tag == QUEUE_TAG && lGetUlong(attr, CE_tagged) == NO_TAG) {
1398
sge_dstring_sprintf(reason, MSG_SCHEDD_JOBREQUESTSUNKOWNRESOURCE_S, attr_name);
1399
DRETURN(DISPATCH_NEVER_CAT);
1402
if (tag != QUEUE_TAG) {
1403
is_not_found = true;
1407
default: /* error */
1412
if (*start_time == DISPATCH_TIME_QUEUE_END) {
1413
*start_time = latest_time;
1417
/* NOTE(review): the condition guarding this return (presumably
   "if (is_not_found)") is not visible here — confirm against upstream */
DRETURN(DISPATCH_MISSING_ATTR);
1420
DRETURN(DISPATCH_OK);
1424
/* match_static_resource() -- time-independent check of one resource request
 * (req_cplx) against one complex attribute (src_cplx); on mismatch the
 * failure text is appended to "reason" and DISPATCH_NEVER_CAT is produced.
 * NOTE(review): extraction damage — the return type, the declaration of
 * "match", the branch testing the compare_complexes() result and the final
 * return are missing (numbering gaps). Code kept byte-identical. */
match_static_resource(int slots, lListElem *req_cplx, lListElem *src_cplx, dstring *reason,
1425
int is_threshold, int force_existence, bool allow_non_requestable)
1428
dispatch_t ret = DISPATCH_OK;
1429
char availability_text[2048];
1431
DENTER(TOP_LAYER, "match_static_resource");
1433
/* check whether attrib is requestable */
1434
if (!allow_non_requestable && lGetUlong(src_cplx, CE_requestable) == REQU_NO) {
1435
sge_dstring_append(reason, MSG_SCHEDD_JOBREQUESTSNONREQUESTABLERESOURCE);
1436
sge_dstring_append(reason, lGetString(src_cplx, CE_name));
1437
sge_dstring_append(reason, "\"");
1438
DRETURN(DISPATCH_NEVER_CAT);
1441
match = compare_complexes(slots, req_cplx, src_cplx, availability_text, false, false);
1444
sge_dstring_append(reason, MSG_SCHEDD_ITOFFERSONLY);
1445
sge_dstring_append(reason, availability_text);
1446
ret = DISPATCH_NEVER_CAT;
1452
/****** sge_select_queue/clear_resource_tags() *********************************
1454
* clear_resource_tags() -- removes the tags from a resouce request.
1457
* static void clear_resource_tags(lList *resouces, u_long32 max_tag)
1460
* Removes the tages from the given resouce list. A tag is only removed
1461
* if it is smaller or equal to the given tag value. The tag value "MAX_TAG" results
1462
* in removing all existing tags, or the value "HOST_TAG" removes queue and host
1463
* tags but keeps the global tags.
1466
* lList *resouces - list of job requests.
1467
* u_long32 max_tag - max tag element
1469
*******************************************************************************/
1470
static void clear_resource_tags(lList *resources, u_long32 max_tag) {
1472
lListElem *attr=NULL;
1474
for_each(attr, resources){
1475
if(lGetUlong(attr, CE_tagged) <= max_tag)
1476
lSetUlong(attr, CE_tagged, NO_TAG);
1481
/****** sge_select_queue/sge_queue_match_static() ************************
* sge_queue_match_static() -- Do matching that does not depend on time.
* static int sge_queue_match_static(lListElem *queue, lListElem *job,
* const lListElem *pe, const lListElem *ckpt, lList *centry_list, lList
* *host_list, lList *acl_list)
* Checks if a job fits on a queue or not. All checks that depend on the
* current load and resource situation must get handled outside.
* The queue also gets tagged in QU_tagged4schedule to indicate whether it
* is specified using -masterq queue_list.
* lListElem *queue - The queue we're matching
* lListElem *job - The job
* const lListElem *pe - The PE object
* const lListElem *ckpt - The ckpt object
* lList *centry_list - The centry list
* lList *acl_list - The ACL list
* dispatch_t - DISPATCH_OK, ok
* DISPATCH_NEVER_CAT, assignment will never be possible for all jobs of that category
*
* NOTE(review): extraction damage — the declarations of ar_id, ar_ep,
* job_id and projects, plus a number of closing braces and schedd_mes_add()
* argument lines, are missing (numbering gaps). Code kept byte-identical.
*******************************************************************************/
1510
dispatch_t sge_queue_match_static(lListElem *queue, lListElem *job, const lListElem *pe,
1511
const lListElem *ckpt, lList *centry_list, lList *acl_list,
1512
lList *hgrp_list, lList *ar_list)
1518
const char *project;
1519
const lList *hard_queue_list, *master_hard_queue_list;
1520
const char *qinstance_name = lGetString(queue, QU_full_name);
1521
bool could_be_master = false;
1523
DENTER(TOP_LAYER, "sge_queue_match_static");
1525
/* check if queue was reserved for AR job */
1526
ar_id = lGetUlong(job, JB_ar);
1527
ar_ep = lGetElemUlong(ar_list, AR_id, ar_id);
1529
if (ar_ep != NULL) {
1530
DPRINTF(("searching for queue %s\n", qinstance_name));
1532
if (lGetSubStr(ar_ep, QU_full_name, qinstance_name, AR_reserved_queues) == NULL) {
1533
schedd_mes_add_global(SCHEDD_INFO_QNOTARRESERVED_SI, qinstance_name, ar_id);
1534
DRETURN(DISPATCH_NEVER_CAT);
1537
/* this is not advance reservation job, we have to drop queues in orphaned state */
1538
if (lGetUlong(queue, QU_state) == QI_ORPHANED) {
1539
schedd_mes_add_global(SCHEDD_INFO_QUEUENOTAVAIL_, qinstance_name);
1540
DRETURN(DISPATCH_NEVER_CAT);
1544
job_id = lGetUlong(job, JB_job_number);
1545
/* check if job owner has access rights to the queue */
1546
if (!sge_has_access(lGetString(job, JB_owner), lGetString(job, JB_group), queue, acl_list)) {
1547
DPRINTF(("Job %d has no permission for queue %s\n", (int)job_id, qinstance_name));
1548
schedd_mes_add(job_id, SCHEDD_INFO_HASNOPERMISSION_SS, "queue", qinstance_name);
1549
DRETURN(DISPATCH_NEVER_CAT);
1552
/* check if job can run in queue based on project */
1553
if ((projects = lGetList(queue, QU_projects))) {
1554
if ((!(project = lGetString(job, JB_project)))) {
1555
schedd_mes_add(job_id, SCHEDD_INFO_HASNOPRJ_S,
1556
"queue", qinstance_name);
1557
DRETURN(DISPATCH_NEVER_CAT);
1559
if ((!prj_list_locate(projects, project))) {
1560
schedd_mes_add(job_id, SCHEDD_INFO_HASINCORRECTPRJ_SSS,
1561
project, "queue", qinstance_name);
1562
DRETURN(DISPATCH_NEVER_CAT);
1566
/* check if job can run in queue based on excluded projects */
1567
if ((projects = lGetList(queue, QU_xprojects))) {
1568
if (((project = lGetString(job, JB_project)) &&
1569
prj_list_locate(projects, project))) {
1570
schedd_mes_add(job_id, SCHEDD_INFO_EXCLPRJ_SSS,
1571
project, "queue", qinstance_name);
1572
DRETURN(DISPATCH_NEVER_CAT);
1576
hard_queue_list = lGetList(job, JB_hard_queue_list);
1577
master_hard_queue_list = lGetList(job, JB_master_hard_queue_list);
1578
if (hard_queue_list || master_hard_queue_list) {
1579
if (!centry_list_are_queues_requestable(centry_list)) {
1580
schedd_mes_add(job_id, SCHEDD_INFO_QUEUENOTREQUESTABLE_S,
1582
DRETURN(DISPATCH_NEVER_CAT);
1587
* is this queue a candidate for being the master queue?
1589
if (master_hard_queue_list) {
1590
bool is_in_list = true;
1591
if (qref_list_cq_rejected(master_hard_queue_list, lGetString(queue, QU_qname),
1592
lGetHost(queue, QU_qhostname), hgrp_list))
1598
lSetUlong(queue, QU_tagged4schedule, is_in_list ? 1 : 0);
1600
DPRINTF(("Queue \"%s\" is not contained in the master hard "
1601
"queue list (-masterq) that was requested by job %d\n",
1602
qinstance_name, (int) job_id));
1604
could_be_master = true;
1607
lSetUlong(queue, QU_tagged4schedule, 0);
1612
* is queue contained in hard queue list ?
1614
if (hard_queue_list) {
1615
if ((could_be_master == false) && qref_list_cq_rejected(hard_queue_list, lGetString(queue, QU_qname),
1616
lGetHost(queue, QU_qhostname), hgrp_list)) {
1617
DPRINTF(("Queue \"%s\" is not contained in the hard "
1618
"queue list (-q) that was requested by job %d\n",
1619
qinstance_name, (int) job_id));
1620
schedd_mes_add(job_id, SCHEDD_INFO_NOTINHARDQUEUELST_S,
1622
DRETURN(DISPATCH_NEVER_CAT);
1627
** different checks for different job types:
1630
if (pe) { /* parallel job */
1631
if (!qinstance_is_parallel_queue(queue)) {
1632
DPRINTF(("Queue \"%s\" is not a parallel queue as requested by "
1633
"job %d\n", qinstance_name, (int)job_id));
1634
schedd_mes_add(job_id, SCHEDD_INFO_NOTPARALLELQUEUE_S, qinstance_name);
1635
DRETURN(DISPATCH_NEVER_CAT);
1639
* check if the requested PE is named in the PE reference list of Queue
1641
if (!qinstance_is_pe_referenced(queue, pe)) {
1642
DPRINTF(("Queue "SFQ" does not reference PE "SFQ"\n",
1643
qinstance_name, lGetString(pe, PE_name)));
1644
schedd_mes_add(job_id, SCHEDD_INFO_NOTINQUEUELSTOFPE_SS,
1645
qinstance_name, lGetString(pe, PE_name));
1646
DRETURN(DISPATCH_NEVER_CAT);
1650
if (ckpt) { /* ckpt job */
1651
/* is it a ckpt queue ? */
1652
if (!qinstance_is_checkpointing_queue(queue)) {
1653
DPRINTF(("Queue \"%s\" is not a checkpointing queue as requested by "
1654
"job %d\n", qinstance_name, (int)job_id));
1655
schedd_mes_add(job_id, SCHEDD_INFO_NOTACKPTQUEUE_SS, qinstance_name);
1656
DRETURN(DISPATCH_NEVER_CAT);
1660
* check if the requested CKPT is named in the CKPT ref list of Queue
1662
if (!qinstance_is_ckpt_referenced(queue, ckpt)) {
1663
DPRINTF(("Queue \"%s\" does not reference checkpointing object "SFQ
1664
"\n", qinstance_name, lGetString(ckpt, CK_name)));
1665
schedd_mes_add(job_id, SCHEDD_INFO_NOTINQUEUELSTOFCKPT_SS,
1666
qinstance_name, lGetString(ckpt, CK_name));
1667
DRETURN(DISPATCH_NEVER_CAT);
1671
/* to be activated as soon as immediate jobs are available */
1672
if (JOB_TYPE_IS_IMMEDIATE(lGetUlong(job, JB_type))) {
1673
if (!qinstance_is_interactive_queue(queue)) {
1674
DPRINTF(("Queue \"%s\" is not an interactive queue as requested by "
1675
"job %d\n", qinstance_name, (int)job_id));
1676
schedd_mes_add(job_id, SCHEDD_INFO_QUEUENOTINTERACTIVE_S, qinstance_name);
1677
DRETURN(DISPATCH_NEVER_CAT);
1681
if (!pe && !ckpt && !JOB_TYPE_IS_IMMEDIATE(lGetUlong(job, JB_type))) { /* serial (batch) job */
1682
/* is it a batch or transfer queue */
1683
if (!qinstance_is_batch_queue(queue)) {
1684
DPRINTF(("Queue \"%s\" is not a batch queue as "
1685
"requested by job %d\n", qinstance_name, (int)job_id));
1686
schedd_mes_add(job_id, SCHEDD_INFO_NOTASERIALQUEUE_S, qinstance_name);
1687
DRETURN(DISPATCH_NEVER_CAT);
1691
if (ckpt && !pe && lGetString(job, JB_script_file) &&
1692
qinstance_is_parallel_queue(queue) && !qinstance_is_batch_queue(queue)) {
1693
DPRINTF(("Queue \"%s\" is not a serial queue as "
1694
"requested by job %d\n", qinstance_name, (int)job_id));
1695
schedd_mes_add(job_id, SCHEDD_INFO_NOTPARALLELJOB_S, qinstance_name);
1696
DRETURN(DISPATCH_NEVER_CAT);
1699
if (job_is_forced_centry_missing(job, centry_list, queue)) {
1700
DRETURN(DISPATCH_NEVER_CAT);
1703
DRETURN(DISPATCH_OK);
1707
/* job_is_forced_centry_missing() -- true-ish result when the job fails to
 * request a complex entry that is REQU_FORCED (or forced on the concrete
 * queue instance / host) — callers treat a nonzero result as "reject".
 * NOTE(review): extraction damage — the return type (presumably bool,
 * given callers use the result directly in if(); confirm upstream), the
 * declarations of "centry" and the result variable, the branch opening
 * "if (is_qinstance) {" and several closing braces/argument lines are
 * missing (numbering gaps). Code kept byte-identical. */
job_is_forced_centry_missing(const lListElem *job,
1708
const lList *master_centry_list,
1709
const lListElem *queue_or_host)
1714
DENTER(TOP_LAYER, "job_is_forced_centry_missing");
1715
if (job != NULL && master_centry_list != NULL && queue_or_host != NULL) {
1716
lList *res_list = lGetList(job, JB_hard_resource_list);
1718
for_each(centry, master_centry_list) {
1719
const char *name = lGetString(centry, CE_name);
1720
bool is_requ = is_requested(res_list, name);
1721
bool is_forced = lGetUlong(centry, CE_requestable) == REQU_FORCED ? true : false;
1722
const char *object_name = NULL;
1723
bool is_qinstance = object_has_type(queue_or_host, QU_Type);
1724
bool is_host = object_has_type(queue_or_host, EH_Type);
1728
is_forced = qinstance_is_centry_a_complex_value(queue_or_host, centry);
1729
object_name = lGetString(queue_or_host, QU_full_name);
1730
} else if (is_host) {
1731
is_forced = host_is_centry_a_complex_value(queue_or_host, centry);
1732
object_name = lGetHost(queue_or_host, EH_name);
1739
if (is_forced && !is_requ) {
1740
u_long32 job_id = lGetUlong(job, JB_job_number);
1742
DPRINTF(("job "sge_u32" does not request 'forced' resource "SFQ" of "
1743
SFN"\n", job_id, name, object_name));
1745
schedd_mes_add(job_id, SCHEDD_INFO_NOTREQFORCEDRES_SS,
1747
} else if (is_host) {
1748
schedd_mes_add(job_id, SCHEDD_INFO_NOFORCEDRES_SS,
1759
/****** sge_select_queue/compute_soft_violations() ********************************
* compute_soft_violations() -- counts the violations in the request for a given host or queue
* static int compute_soft_violations(lListElem *queue, int violation, lListElem *job,lList *load_attr, lList *config_attr,
* lList *actual_attr, lList *centry_list, u_long32 layer, double lc_factor, u_long32 tag)
* this function checks if the current resources can satisfy the requests. The resources come from the global host, a
* given host or the queue. The function returns the number of violations.
* const sge_assignment_t *a - job info structure
* lListElem *queue - should only be set, when one using this method on queue level
* int violation - the number of previous violations. This is needed to get a correct result on queue level.
* lList *load_attr - the load attributes, only when used on hosts or global
* lList *config_attr - a list of custom attributes (CE_Type)
* lList *actual_attr - a list of custom consumables, they contain the current usage of these attributes (RUE_Type)
* u_long32 layer - the current layer flag
* double lc_factor - should be set, when load correction has to be done.
* u_long32 tag - the current layer tag. (GLOBAL_TAG, HOST_TAG, QUEUE_TAG)
* static int - the number of violations ( = (prev. violations) + (new violations in this run)).
*
* NOTE(review): extraction damage — the return type, the declarations of
* "reason", "attr", "qr" and "job_id", the soft_violation++ statements,
* the "if (queue)" guards and several closing braces are missing
* (numbering gaps). Code kept byte-identical.
*******************************************************************************/
1787
compute_soft_violations(const sge_assignment_t *a, lListElem *queue, int violation, lList *load_attr, lList *config_attr,
1788
lList *actual_attr, u_long32 layer, double lc_factor, u_long32 tag)
1791
const char *queue_name = NULL;
1793
char reason_buf[1024 + 1];
1794
unsigned int soft_violation = violation;
1795
lList *soft_requests = NULL;
1797
u_long32 start_time = DISPATCH_TIME_NOW;
1799
DENTER(TOP_LAYER, "compute_soft_violations");
1801
sge_dstring_init(&reason, reason_buf, sizeof(reason_buf));
1803
soft_requests = lGetList(a->job, JB_soft_resource_list);
1804
clear_resource_tags(soft_requests, tag);
1808
queue_name = lGetString(queue, QU_full_name);
1811
/* count number of soft violations for _one_ slot of this job */
1813
for_each (attr, soft_requests) {
1814
switch (ri_time_by_slots(a, attr, load_attr, config_attr, actual_attr, queue,
1815
&reason, false, 1, layer, lc_factor, &start_time, queue_name?queue_name:"no queue")) {
1817
case DISPATCH_NEVER_CAT:
1820
/* element not found */
1821
case DISPATCH_MISSING_ATTR:
1822
case DISPATCH_NOT_AT_TIME:
1823
if (tag == QUEUE_TAG && lGetUlong(attr, CE_tagged) == NO_TAG) {
1827
/* everything is fine */
1829
if (lGetUlong(attr, CE_tagged) < tag)
1830
lSetUlong(attr, CE_tagged, tag);
1835
DPRINTF(("queue %s does not fulfill soft %d requests (first: %s)\n",
1836
queue_name, soft_violation, reason_buf));
1839
* check whether queue fulfills soft queue request of the job (-q)
1841
if (lGetList(a->job, JB_soft_queue_list)) {
1842
lList *qref_list = lGetList(a->job, JB_soft_queue_list);
1844
const char *qinstance_name = NULL;
1846
qinstance_name = lGetString(queue, QU_full_name);
1848
for_each (qr, qref_list) {
1849
if (qref_cq_rejected(lGetString(qr, QR_name), lGetString(queue, QU_qname),
1850
lGetHost(queue, QU_qhostname), a->hgrp_list)) {
1851
DPRINTF(("Queue \"%s\" is not contained in the soft "
1852
"queue list (-q) that was requested by job %d\n",
1853
qinstance_name, (int) job_id));
1859
/* store number of soft violations in queue */
1860
lSetUlong(queue, QU_soft_violation, soft_violation);
1863
DRETURN(soft_violation);
1866
/****** sge_select_queue/sge_host_match_static() ********************************
* sge_host_match_static() -- Static test whether job fits to host
* static int sge_host_match_static(lListElem *job, lListElem *ja_task,
* lListElem *host, lList *centry_list, lList *acl_list)
* lListElem *job - the job (JB_Type)
* lListElem *ja_task - the array task (JAT_Type)
* lListElem *host - the exec host to test (EH_Type)
* lList *centry_list - system wide complex entry list (CE_Type)
* lList *acl_list - ACL list
* -1 assignment will never be possible for all jobs of that category
* -2 assignment will never be possible for that particular job
*
* NOTE(review): extraction damage — the return type, the declarations of
* job_id, task_id, projects, rulp and ruep, the guard around the early
* DRETURN(DISPATCH_OK) directly after DENTER, several schedd_mes_add()
* argument lines and closing braces are missing (numbering gaps). Code
* kept byte-identical.
*******************************************************************************/
1889
sge_host_match_static(lListElem *job, lListElem *ja_task, lListElem *host,
1890
lList *centry_list, lList *acl_list)
1893
const char *project;
1895
const char *eh_name;
1897
DENTER(TOP_LAYER, "sge_host_match_static");
1900
DRETURN(DISPATCH_OK);
1903
job_id = lGetUlong(job, JB_job_number);
1904
eh_name = lGetHost(host, EH_name);
1906
/* check if job owner has access rights to the host */
1907
if (!sge_has_access_(lGetString(job, JB_owner),
1908
lGetString(job, JB_group), lGetList(host, EH_acl),
1909
lGetList(host, EH_xacl), acl_list)) {
1910
DPRINTF(("Job %d has no permission for host %s\n",
1911
(int)job_id, eh_name));
1912
schedd_mes_add(job_id, SCHEDD_INFO_HASNOPERMISSION_SS,
1914
DRETURN(DISPATCH_NEVER_CAT);
1917
/* check if job can run on host based on required projects */
1918
if ((projects = lGetList(host, EH_prj))) {
1920
if ((!(project = lGetString(job, JB_project)))) {
1921
schedd_mes_add(job_id, SCHEDD_INFO_HASNOPRJ_S,
1923
DRETURN(DISPATCH_NEVER_CAT);
1926
if ((!prj_list_locate(projects, project))) {
1927
schedd_mes_add(job_id, SCHEDD_INFO_HASINCORRECTPRJ_SSS,
1928
project, "host", eh_name);
1929
DRETURN(DISPATCH_NEVER_CAT);
1933
/* check if job can run on host based on excluded projects */
1934
if ((projects = lGetList(host, EH_xprj))) {
1935
if (((project = lGetString(job, JB_project)) &&
1936
prj_list_locate(projects, project))) {
1937
schedd_mes_add(job_id, SCHEDD_INFO_EXCLPRJ_SSS,
1938
project, "host", eh_name);
1939
DRETURN(DISPATCH_NEVER_CAT);
1943
if (job_is_forced_centry_missing(job, centry_list, host)) {
1944
DRETURN(DISPATCH_NEVER_CAT);
1949
** check if job can run on host based on the list of jids/taskids
1950
** contained in the reschedule_unknown-list
1957
task_id = lGetUlong(ja_task, JAT_task_number);
1958
rulp = lGetList(host, EH_reschedule_unknown_list);
1960
for_each(ruep, rulp) {
1961
if (lGetUlong(ruep, RU_job_number) == job_id
1962
&& lGetUlong(ruep, RU_task_number) == task_id) {
1963
DPRINTF(("RU: Job "sge_u32"."sge_u32" Host "SFN"\n", job_id,
1965
schedd_mes_add(job_id, SCHEDD_INFO_CLEANUPNECESSARY_S,
1967
DRETURN(DISPATCH_NEVER_JOB);
1972
DRETURN(DISPATCH_OK);
1975
/****** sge_select_queue/is_requested() ****************************************
1977
* is_requested() -- Returns true if specified resource is requested.
1980
* bool is_requested(lList *req, const char *attr)
1983
* Returns true if specified resource is requested. Both long name
1984
* and shortcut name are checked.
1987
* lList *req - The request list (CE_Type)
1988
* const char *attr - The resource name.
1991
* bool - true if requested, otherwise false
1994
* MT-NOTE: is_requested() is MT safe
1995
*******************************************************************************/
1996
bool is_requested(lList *req, const char *attr)
1998
if (lGetElemStr(req, CE_name, attr) ||
1999
lGetElemStr(req, CE_shortcut , attr)) {
2006
/* load_check_alarm() -- check one load value against one threshold and, if
 * the threshold is exceeded (or input is unparsable), format an explanatory
 * text into "reason". Applies host/global load correction ("np_*" values are
 * additionally scaled by the processor count, see load_np_value_adjustment()).
 * NOTE(review): extraction damage — the declarations of load, limit, match
 * and nproc, several branch/return lines (e.g. the type switch and the
 * non-numeric string comparison branch) and closing braces are missing
 * (numbering gaps). Code kept byte-identical. */
static int load_check_alarm(char *reason, const char *name, const char *load_value,
2007
const char *limit_value, u_long32 relop,
2008
u_long32 type, lListElem *hep,
2009
lListElem *hlep, double lc_host,
2010
double lc_global, const lList *load_adjustments, int load_is_value)
2012
lListElem *job_load;
2015
#define STR_LC_DIAGNOSIS 1024
2016
char lc_diagnosis1[STR_LC_DIAGNOSIS], lc_diagnosis2[STR_LC_DIAGNOSIS];
2018
DENTER(TOP_LAYER, "load_check_alarm");
2026
if (!parse_ulong_val(&load, NULL, type, load_value, NULL, 0)) {
2028
sprintf(reason, MSG_SCHEDD_WHYEXCEEDINVALIDLOAD_SS, load_value, name);
2031
if (!parse_ulong_val(&limit, NULL, type, limit_value, NULL, 0)) {
2033
sprintf(reason, MSG_SCHEDD_WHYEXCEEDINVALIDTHRESHOLD_SS, name, limit_value);
2036
if (load_is_value) { /* we got no load - this is just the complex value */
2037
sge_strlcpy(lc_diagnosis2, MSG_SCHEDD_LCDIAGNOLOAD, STR_LC_DIAGNOSIS);
2038
} else if (((hlep && lc_host) || lc_global) &&
2039
(job_load = lGetElemStr(load_adjustments, CE_name, name))) { /* load correction */
2040
const char *load_correction_str;
2041
double load_correction;
2043
load_correction_str = lGetString(job_load, CE_stringval);
2044
if (!parse_ulong_val(&load_correction, NULL, type, load_correction_str, NULL, 0)) {
2046
sprintf(reason, MSG_SCHEDD_WHYEXCEEDINVALIDLOADADJUST_SS, name, load_correction_str);
2052
load_correction *= lc_host;
2054
if ((nproc = load_np_value_adjustment(name, hep, &load_correction)) > 0) {
2055
sprintf(lc_diagnosis1, MSG_SCHEDD_LCDIAGHOSTNP_SFI,
2056
load_correction_str, lc_host, nproc);
2059
sprintf(lc_diagnosis1, MSG_SCHEDD_LCDIAGHOST_SF,
2060
load_correction_str, lc_host);
2066
load_correction *= lc_global;
2067
sprintf(lc_diagnosis1, MSG_SCHEDD_LCDIAGGLOBAL_SF,
2068
load_correction_str, lc_global);
2070
/* it depends on relop in complex config
2071
whether load_correction is pos/neg */
2075
load += load_correction;
2076
sprintf(lc_diagnosis2, MSG_SCHEDD_LCDIAGPOSITIVE_SS, load_value, lc_diagnosis1);
2084
load -= load_correction;
2085
sprintf(lc_diagnosis2, MSG_SCHEDD_LCDIAGNEGATIVE_SS, load_value, lc_diagnosis1);
2089
sge_strlcpy(lc_diagnosis2, MSG_SCHEDD_LCDIAGNONE, STR_LC_DIAGNOSIS);
2092
/* is threshold exceeded ? */
2093
if (resource_cmp(relop, load, limit)) {
2095
if (type == TYPE_BOO){
2096
sprintf(reason, MSG_SCHEDD_WHYEXCEEDBOOLVALUE_SSSSS,
2097
name, load?MSG_TRUE:MSG_FALSE, lc_diagnosis2, map_op2str(relop), limit_value);
2100
sprintf(reason, MSG_SCHEDD_WHYEXCEEDFLOATVALUE_SFSSS,
2101
name, load, lc_diagnosis2, map_op2str(relop), limit_value);
2112
match = string_base_cmp(type, limit_value, load_value);
2115
sprintf(reason, MSG_SCHEDD_WHYEXCEEDSTRINGVALUE_SSSS, name, load_value, map_op2str(relop), limit_value);
2121
sprintf(reason, MSG_SCHEDD_WHYEXCEEDCOMPLEXTYPE_S, name);
2128
/****** sge_select_queue/load_np_value_adjustment() ****************************
* load_np_value_adjustment() -- adjusts np load values for the number of processors
* static int load_np_value_adjustment(const char* name, lListElem *hep,
* double *load_correction)
* Tests the load value name for "np_*". If this pattern is found, it will
* retrieve the number of processors and adjusts the load_correction accordingly.
* If the pattern is not found, it does nothing and returns 0 for number of processors.
* const char* name - load value name
* lListElem *hep - host object
* double *load_correction - current load_correction for further corrections
* static int - number of processors, or 0 if it was called on a non-np load value
* MT-NOTE: load_np_value_adjustment() is MT safe
*
* NOTE(review): extraction damage — the declaration of nproc, the parsing
* of the HL_value string "cp" into nproc, the guard on nproc before the
* division, the return statement and closing braces are missing
* (numbering gaps). Code kept byte-identical.
*******************************************************************************/
2153
static int load_np_value_adjustment(const char* name, lListElem *hep, double *load_correction) {
2156
if (!strncmp(name, "np_", 3)) {
2158
lListElem *ep_nproc;
2160
if ((ep_nproc = lGetSubStr(hep, HL_name, LOAD_ATTR_NUM_PROC, EH_load_list))) {
2161
const char* cp = lGetString(ep_nproc, HL_value);
2166
*load_correction /= nproc;
2178
/* resource_cmp() -- compare requested value 'req' against supplied value
 * 'src_dl' under relational operator 'relop'.
 * NOTE(review): extraction artifact -- the switch(relop)/case framing and
 * the return statement were lost; only the per-operator assignments to
 * 'match' remain. Restore from upstream before compiling. */
static int resource_cmp(u_long32 relop, double req, double src_dl)
2184
match = ( req==src_dl);
2187
match = ( req<=src_dl);
2190
match = ( req<src_dl);
2193
match = ( req>src_dl);
2196
match = ( req>=src_dl);
2199
match = ( req!=src_dl);
2202
match = 0; /* default -> no match */
2208
/* NOTE(review): sge_load_alarm() -- returns non-zero when at least one
 * entry of 'threshold' is exceeded for queue 'qep'; load values are taken
 * from the exec host (host-level, then global, then queue consumable
 * config) and the comparison is delegated to load_check_alarm().
 * Extraction artifact: bare numeric lines are original line numbers and
 * many statements (braces, else branches, returns) are elided. */
/* ----------------------------------------
2212
checks given threshold of the queue;
2213
centry_list and exechost_list get used
2217
1 yes, the threshold is exceeded
2222
sge_load_alarm(char *reason, lListElem *qep, lList *threshold,
2223
const lList *exechost_list, const lList *centry_list,
2224
const lList *load_adjustments, bool is_check_consumable)
2226
lListElem *hep, *global_hep, *tep;
2227
u_long32 ulc_factor;
2228
const char *load_value = NULL;
2229
const char *limit_value = NULL;
2230
double lc_host = 0, lc_global = 0;
2231
int load_is_value = 0;
2233
DENTER(TOP_LAYER, "sge_load_alarm");
2236
/* no threshold -> no alarm */
2240
hep = host_list_locate(exechost_list, lGetHost(qep, QU_qhostname));
2244
sprintf(reason, MSG_SCHEDD_WHYEXCEEDNOHOST_S, lGetHost(qep, QU_qhostname));
2245
/* no host for queue -> ERROR */
2249
if ((lGetPosViaElem(hep, EH_load_correction_factor, SGE_NO_ABORT) >= 0)
2250
&& (ulc_factor=lGetUlong(hep, EH_load_correction_factor))) {
2251
lc_host = ((double)ulc_factor)/100;
2254
if ((global_hep = host_list_locate(exechost_list, SGE_GLOBAL_NAME)) != NULL) {
2255
if ((lGetPosViaElem(global_hep, EH_load_correction_factor, SGE_NO_ABORT) >= 0)
2256
&& (ulc_factor=lGetUlong(global_hep, EH_load_correction_factor)))
2257
lc_global = ((double)ulc_factor)/100;
2260
for_each (tep, threshold) {
2261
lListElem *hlep = NULL, *glep = NULL, *queue_ep = NULL, *cep = NULL;
2262
bool need_free_cep = false;
2264
u_long32 relop, type;
2266
name = lGetString(tep, CE_name);
2267
/* complex attriute definition */
2269
if (!(cep = centry_list_locate(centry_list, name))) {
2271
sprintf(reason, MSG_SCHEDD_WHYEXCEEDNOCOMPLEX_S, name);
2274
if (!is_check_consumable && lGetBool(cep, CE_consumable)) {
2279
hlep = lGetSubStr(hep, HL_name, name, EH_load_list);
2282
if (!lGetBool(cep, CE_consumable)) {
2284
load_value = lGetString(hlep, HL_value);
2286
} else if ((global_hep != NULL) &&
2287
((glep = lGetSubStr(global_hep, HL_name, name, EH_load_list)) != NULL)) {
2288
load_value = lGetString(glep, HL_value);
2292
queue_ep = lGetSubStr(qep, CE_name, name, QU_consumable_config_list);
2293
if (queue_ep != NULL) {
2294
load_value = lGetString(queue_ep, CE_stringval);
2298
sprintf(reason, MSG_SCHEDD_NOVALUEFORATTR_S, name);
2305
/* load thesholds... */
2306
if ((cep = get_attribute_by_name(global_hep, hep, qep, name, centry_list, DISPATCH_TIME_NOW, 0)) == NULL ) {
2308
sprintf(reason, MSG_SCHEDD_WHYEXCEEDNOCOMPLEX_S, name);
2311
need_free_cep = true;
2313
load_value = lGetString(cep, CE_pj_stringval);
2314
load_is_value = (lGetUlong(cep, CE_pj_dominant) & DOMINANT_TYPE_MASK) != DOMINANT_TYPE_CLOAD;
2317
relop = lGetUlong(cep, CE_relop);
2318
limit_value = lGetString(tep, CE_stringval);
2319
type = lGetUlong(cep, CE_valtype);
2321
if(load_check_alarm(reason, name, load_value, limit_value, relop,
2322
type, hep, hlep, lc_host, lc_global,
2323
load_adjustments, load_is_value)) {
2324
if (need_free_cep) {
2329
if (need_free_cep) {
2337
/* NOTE(review): sge_load_alarm_reason() -- appends one "alarm ..." line
 * per threshold entry to 'reason', resolving each complex attribute from
 * the queue's combined complex values. Extraction artifact: numeric lines
 * are original line numbers; several statements are elided.
 * NOTE(review): the visible strncat(reason, ..., reason_size) calls pass
 * the TOTAL buffer size as the strncat bound; strncat's third argument
 * must be the REMAINING space (excluding the terminator), so this can
 * overflow 'reason' -- confirm and fix upstream (cannot be fixed here
 * without the elided surrounding code). */
/* ----------------------------------------
2339
sge_load_alarm_reasons()
2341
checks given threshold of the queue;
2342
centry_list and exechost_list get used
2345
fills and returns string buffer containing reasons for alarm states
2348
char *sge_load_alarm_reason(lListElem *qep, lList *threshold,
2349
const lList *exechost_list, const lList *centry_list,
2350
char *reason, int reason_size,
2351
const char *threshold_type)
2353
DENTER(TOP_LAYER, "sge_load_alarm_reason");
2357
/* no threshold -> no alarm */
2358
if (threshold != NULL) {
2363
/* get actual complex values for queue */
2364
queue_complexes2scheduler(&rlp, qep, exechost_list, centry_list);
2366
/* check all thresholds */
2367
for_each (tep, threshold) {
2368
const char *name; /* complex attrib name */
2369
lListElem *cep; /* actual complex attribute */
2370
char dom_str[5]; /* dominance as string */
2371
u_long32 dom_val; /* dominance as u_long */
2372
char buffer[MAX_STRING_SIZE]; /* buffer for one line */
2373
const char *load_value; /* actual load value */
2374
const char *limit_value; /* limit defined by threshold */
2376
name = lGetString(tep, CE_name);
2378
if ( first == true ) {
2381
strncat(reason, "\n\t", reason_size);
2384
/* find actual complex attribute */
2385
if ((cep = lGetElemStr(rlp, CE_name, name)) == NULL) {
2386
/* no complex attribute for threshold -> ERROR */
2387
if (qinstance_state_is_unknown(qep)) {
2388
snprintf(buffer, MAX_STRING_SIZE, MSG_QINSTANCE_VALUEMISSINGMASTERDOWN_S, name);
2390
snprintf(buffer, MAX_STRING_SIZE, MSG_SCHEDD_NOCOMPLEXATTRIBUTEFORTHRESHOLD_S, name);
2392
strncat(reason, buffer, reason_size);
2396
limit_value = lGetString(tep, CE_stringval);
2398
if (!(lGetUlong(cep, CE_pj_dominant) & DOMINANT_TYPE_VALUE)) {
2399
dom_val = lGetUlong(cep, CE_pj_dominant);
2400
load_value = lGetString(cep, CE_pj_stringval);
2402
dom_val = lGetUlong(cep, CE_dominant);
2403
load_value = lGetString(cep, CE_stringval);
2406
monitor_dominance(dom_str, dom_val);
2408
snprintf(buffer, MAX_STRING_SIZE, "alarm %s:%s=%s %s-threshold=%s",
2416
strncat(reason, buffer, reason_size);
2425
/* NOTE(review): sge_split_queue_load() -- tags queues whose load/suspend
 * thresholds are exceeded (QU_tagged4schedule), then lSplit()s the input
 * list into unloaded and overloaded lists, clearing the tag on exit.
 * Extraction artifact: numeric lines are original line numbers; braces,
 * early returns and some statements are elided. */
/* ----------------------------------------
2427
sge_split_queue_load()
2429
splits the incoming queue list (1st arg) into an unloaded and
2430
overloaded (2nd arg) list according to the load values contained in
2431
the execution host list (3rd arg) and with respect to the definitions
2432
in the complex list (4th arg).
2434
temporarily sets QU_tagged4schedule but sets it to 0 on exit.
2438
-1 errors in functions called by sge_split_queue_load
2441
int sge_split_queue_load(
2442
lList **unloaded, /* QU_Type */
2443
lList **overloaded, /* QU_Type */
2444
lList *exechost_list, /* EH_Type */
2445
lList *centry_list, /* CE_Type */
2446
const lList *load_adjustments, /* CE_Type */
2447
lList *granted, /* JG_Type */
2448
bool is_consumable_load_alarm, /* is true, when the consumable evaluation
2450
bool is_comprehensive, /* do the load evaluation comprehensive (include consumables) */
2456
int ret, load_alarm, nverified = 0;
2459
DENTER(TOP_LAYER, "sge_split_queue_load");
2461
/* a job has been dispatched recently,
2462
but load correction is not in use at all */
2463
if (granted && !load_adjustments && !is_consumable_load_alarm) {
2467
if (!(granted && !load_adjustments)) {
2469
/* tag those queues being overloaded */
2470
for_each(qep, *unloaded) {
2471
thresholds = lGetList(qep, ttype);
2474
/* do not verify load alarm anew if a job has been dispatched recently
2475
but not to the host where this queue resides */
2476
if (!granted || (granted && (sconf_get_global_load_correction() ||
2477
lGetElemHost(granted, JG_qhostname, lGetHost(qep, QU_qhostname))))) {
2480
if (sge_load_alarm(reason, qep, thresholds, exechost_list, centry_list, load_adjustments, is_comprehensive) != 0) {
2482
if (ttype==QU_suspend_thresholds) {
2483
DPRINTF(("queue %s tagged to be in suspend alarm: %s\n",
2484
lGetString(qep, QU_full_name), reason));
2485
schedd_mes_add_global(SCHEDD_INFO_QUEUEINALARM_SS, lGetString(qep, QU_full_name), reason);
2487
DPRINTF(("queue %s tagged to be overloaded: %s\n",
2488
lGetString(qep, QU_full_name), reason));
2489
schedd_mes_add_global(SCHEDD_INFO_QUEUEOVERLOADED_SS, lGetString(qep, QU_full_name), reason);
2494
lSetUlong(qep, QU_tagged4schedule, load_alarm);
2499
DPRINTF(("verified threshold of %d queues\n", nverified));
2501
/* split queues in unloaded and overloaded lists */
2502
where = lWhere("%T(%I == %u)", lGetListDescr(*unloaded), QU_tagged4schedule, 0);
2503
ret = lSplit(unloaded, overloaded, "overloaded queues", where);
2507
for_each(qep, *overloaded) { /* make sure QU_tagged4schedule is 0 on exit */
2508
lSetUlong(qep, QU_tagged4schedule, 0);
2520
/* NOTE(review): extraction artifact -- numeric lines are original line
 * numbers; closing braces and some statements are elided. Note the DENTER
 * tag string ("sge_split_queue_nslots_free") does not match the function
 * name -- presumably a historical rename; confirm upstream. */
/****** sge_select_queue/sge_split_queue_slots_free() **************************
2522
* sge_split_queue_slots_free() -- ???
2525
* int sge_split_queue_slots_free(lList **free, lList **full)
2528
* Split queue list into queues with at least one slots and queues with
2529
* less than one free slot. The list optioally returned in full gets the
2530
* QNOSLOTS queue instance state set.
2533
* lList **free - Input queue instance list and return free slots.
2534
* lList **full - If non-NULL the full queue instances get returned here.
2537
* TODO: take a look into list hashing, not needed for temp list
2542
*******************************************************************************/
2543
int sge_split_queue_slots_free(lList **free, lList **full)
2545
lList *full_queues = NULL;
2546
lListElem *this = NULL;
2547
lListElem *next = NULL;
2549
DENTER(TOP_LAYER, "sge_split_queue_nslots_free");
2555
/* walk with a saved 'next' because 'this' may be dechained below */
for (this=lFirst(*free); ((next=lNext(this))), this ; this = next) {
2556
if (qinstance_slots_used(this) >= (int)lGetUlong(this, QU_job_slots)) {
2558
this = lDechainElem(*free, this);
2560
/* report the queue only on the transition into the "full" state */
if (!qinstance_state_is_full(this)) {
2561
schedd_mes_add_global(SCHEDD_INFO_QUEUEFULL_, lGetString(this, QU_full_name));
2562
qinstance_state_set_full(this, true);
2564
if (full_queues == NULL) {
2565
full_queues = lCreateListHash("full one", lGetListDescr(*free), false);
2567
lAppendElem(full_queues, this);
2569
else if (full != NULL) {
2570
if (*full == NULL) {
2571
*full = lCreateList("full one", lGetListDescr(*free));
2573
lAppendElem(*full, this);
2581
/* dump out the -tsm log and add the new queues to the disabled queue list */
2583
schedd_log_list(MSG_SCHEDD_LOGLIST_QUEUESFULLANDDROPPED , full_queues, QU_full_name);
2585
if (*full == NULL) {
2586
*full = full_queues;
2590
lAddList(*full, &full_queues);
2595
lFreeList(&full_queues);
2603
/* NOTE(review): sge_split_suspended() -- lSplit()s the queue list on the
 * four suspended-like states; newly split queues are marked manually
 * suspended and reported. Extraction artifact: numeric lines are original
 * line numbers; braces/returns elided. */
/* ----------------------------------------
2605
sge_split_suspended()
2607
splits the incoming queue list (1st arg) into non suspended queues and
2608
suspended queues (2nd arg)
2615
int sge_split_suspended(
2616
lList **queue_list, /* QU_Type */
2617
lList **suspended /* QU_Type */
2623
DENTER(TOP_LAYER, "sge_split_suspended");
2634
where = lWhere("%T(!(%I m= %u) && !(%I m= %u) && !(%I m= %u) && !(%I m= %u))",
2635
lGetListDescr(*queue_list),
2636
QU_state, QI_SUSPENDED,
2637
QU_state, QI_CAL_SUSPENDED,
2638
QU_state, QI_CAL_DISABLED,
2639
QU_state, QI_SUSPENDED_ON_SUBORDINATE);
2641
ret = lSplit(queue_list, &lp, "full queues", where);
2645
lListElem* mes_queue;
2647
for_each(mes_queue, lp) {
2648
/* report only on the transition into manual suspension */
if (!qinstance_state_is_manual_suspended(mes_queue)) {
2649
qinstance_state_set_manual_suspended(mes_queue, true);
2650
schedd_mes_add_global(SCHEDD_INFO_QUEUESUSP_, lGetString(mes_queue, QU_full_name));
2654
schedd_log_list(MSG_SCHEDD_LOGLIST_QUEUESSUSPENDEDANDDROPPED , lp, QU_full_name);
2657
if (suspended != NULL) {
2658
if (*suspended == NULL) {
2663
lAddList(*suspended, &lp);
2668
lFreeList(suspended);
2674
/* NOTE(review): sge_split_cal_disabled() -- lSplit()s the queue list on
 * the QI_CAL_DISABLED state; when the caller passes disabled == NULL the
 * split-off list is freed locally (do_free_list). Extraction artifact:
 * numeric lines are original line numbers; braces/returns elided. Note
 * the DENTER tag is "sge_split_disabled" -- copy/paste from the sibling
 * function; confirm upstream. */
/* ----------------------------------------
2676
sge_split_cal_disabled()
2678
splits the incoming queue list (1st arg) into non disabled queues and
2679
cal_disabled queues (2nd arg)
2681
lList **queue_list, QU_Type
2682
lList **disabled QU_Type
2686
-1 errors in functions called by sge_split_queue_load
2690
sge_split_cal_disabled(lList **queue_list, lList **disabled)
2695
bool do_free_list = false;
2697
DENTER(TOP_LAYER, "sge_split_disabled");
2703
if (disabled == NULL) {
2705
do_free_list = true;
2709
where = lWhere("%T(!(%I m= %u))", lGetListDescr(*queue_list),
2710
QU_state, QI_CAL_DISABLED);
2711
ret = lSplit(queue_list, disabled, "full queues", where);
2714
if (*disabled != NULL) {
2715
lListElem* mes_queue;
2717
for_each(mes_queue, *disabled) {
2718
schedd_mes_add_global(SCHEDD_INFO_QUEUEDISABLED_, lGetString(mes_queue, QU_full_name));
2721
schedd_log_list(MSG_SCHEDD_LOGLIST_QUEUESDISABLEDANDDROPPED , *disabled, QU_full_name);
2724
lFreeList(disabled);
2731
/* NOTE(review): sge_split_disabled() -- same pattern as
 * sge_split_cal_disabled() but splits on QI_DISABLED as well as
 * QI_CAL_DISABLED. Extraction artifact: numeric lines are original line
 * numbers; braces/returns elided. */
/* ----------------------------------------
2733
sge_split_disabled()
2735
splits the incoming queue list (1st arg) into non disabled queues and
2736
disabled queues (2nd arg)
2738
lList **queue_list, QU_Type
2739
lList **disabled QU_Type
2743
-1 errors in functions called by sge_split_queue_load
2747
sge_split_disabled(lList **queue_list, lList **disabled)
2752
bool do_free_list = false;
2754
DENTER(TOP_LAYER, "sge_split_disabled");
2760
if (disabled == NULL) {
2762
do_free_list = true;
2766
where = lWhere("%T(!(%I m= %u) && !(%I m= %u))", lGetListDescr(*queue_list),
2767
QU_state, QI_DISABLED, QU_state, QI_CAL_DISABLED);
2768
ret = lSplit(queue_list, disabled, "full queues", where);
2771
if (*disabled != NULL) {
2772
lListElem* mes_queue;
2774
for_each(mes_queue, *disabled) {
2775
schedd_mes_add_global(SCHEDD_INFO_QUEUEDISABLED_, lGetString(mes_queue, QU_full_name));
2778
schedd_log_list(MSG_SCHEDD_LOGLIST_QUEUESDISABLEDANDDROPPED , *disabled, QU_full_name);
2781
lFreeList(disabled);
2788
/* NOTE(review): extraction artifact -- numeric lines are original line
 * numbers; the return statements and DEXIT/DRETURN epilogue are elided.
 * The header's "const char *project" parameter name is stale doc (the
 * actual parameter is pe_name). */
/****** sge_select_queue/pe_cq_rejected() **************************************
2790
* pe_cq_rejected() -- Check, if -pe pe_name rejects cluster queue
2793
* static bool pe_cq_rejected(const char *pe_name, const lListElem *cq)
2796
* Match a jobs -pe 'pe_name' with pe_list cluster queue configuration.
2797
* True is returned if the parallel environment has no access.
2800
* const char *project - the pe request of a job (no wildcard)
2801
* const lListElem *cq - cluster queue (CQ_Type)
2804
* static bool - True, if rejected
2807
* MT-NOTE: pe_cq_rejected() is MT safe
2808
*******************************************************************************/
2809
static bool pe_cq_rejected(const char *pe_name, const lListElem *cq)
2811
const lListElem *alist;
2814
DENTER(TOP_LAYER, "pe_cq_rejected");
2821
/* accept as soon as any pe_list profile references pe_name */
for_each (alist, lGetList(cq, CQ_pe_list)) {
2822
if (lGetSubStr(alist, ST_name, pe_name, ASTRLIST_value)) {
2831
/* NOTE(review): extraction artifact -- numeric lines are original line
 * numbers; the branch that distinguishes the project == NULL case and all
 * return statements are elided. */
/****** sge_select_queue/project_cq_rejected() *********************************
2833
* project_cq_rejected() -- Check, if -P project rejects cluster queue
2836
* static bool project_cq_rejected(const char *project, const lListElem *cq)
2839
* Match a jobs -P 'project' with project/xproject cluster queue configuration.
2840
* True is returned if the project has no access.
2843
* const char *project - the project of a job or NULL
2844
* const lListElem *cq - cluster queue (CQ_Type)
2847
* static bool - True, if rejected
2850
* MT-NOTE: project_cq_rejected() is MT safe
2851
*******************************************************************************/
2852
static bool project_cq_rejected(const char *project, const lListElem *cq)
2854
const lList *projects;
2855
const lListElem *alist;
2858
DENTER(TOP_LAYER, "project_cq_rejected");
2861
/* without project: rejected, if each "project" profile
2862
does contain project references */
2863
for_each (alist, lGetList(cq, CQ_projects)) {
2864
if (!lGetList(alist, APRJLIST_value)) {
2872
/* with project: rejected, if project is excluded by each "xproject" profile */
2874
for_each (alist, lGetList(cq, CQ_xprojects)) {
2875
projects = lGetList(alist, APRJLIST_value);
2876
if (!projects || !prj_list_locate(projects, project)) {
2885
/* with project: rejected, if project is not included with each "project" profile */
2887
for_each (alist, lGetList(cq, CQ_projects)) {
2888
projects = lGetList(alist, APRJLIST_value);
2889
if (!projects || prj_list_locate(projects, project)) {
2901
/* NOTE(review): extraction artifact -- numeric lines are original line
 * numbers; the return statements are elided. */
/****** sge_select_queue/interactive_cq_rejected() *****************************
2903
* interactive_cq_rejected() -- Check, if -now yes rejects cluster queue
2906
* static bool interactive_cq_rejected(const lListElem *cq)
2909
* Returns true if -now yes jobs can not be run in cluster queue
2912
* const lListElem *cq - cluster queue (CQ_Type)
2915
* static bool - True, if rejected
2918
* MT-NOTE: interactive_cq_rejected() is MT safe
2919
*******************************************************************************/
2920
static bool interactive_cq_rejected(const lListElem *cq)
2922
const lListElem *alist;
2925
DENTER(TOP_LAYER, "interactive_cq_rejected");
2928
/* accept as soon as any qtype profile carries the INTERACTIVE (IQ) bit */
for_each (alist, lGetList(cq, CQ_qtype)) {
2929
if ((lGetUlong(alist, AQTLIST_value) & IQ)) {
2938
/* NOTE(review): extraction artifact -- numeric lines are original line
 * numbers; the return statements are elided. The inline comment on the
 * xacl loop reads "excluded by each" but the visible condition is
 * negated (!sge_contained_in_access_list_) -- the elided lines likely
 * invert this; confirm upstream before relying on either reading. */
/****** sge_select_queue/access_cq_rejected() **********************************
2940
* access_cq_rejected() -- Check, if cluster queue rejects user/project
2943
* static bool access_cq_rejected(const char *user, const char *group, const
2944
* lList *acl_list, const lListElem *cq)
2950
* const char *user - Username
2951
* const char *group - Groupname
2952
* const lList *acl_list - List of access list definitions
2953
* const lListElem *cq - Cluster queue
2956
* static bool - True, if rejected
2959
* MT-NOTE: access_cq_rejected() is MT safe
2960
*******************************************************************************/
2961
static bool access_cq_rejected(const char *user, const char *group,
2962
const lList *acl_list, const lListElem *cq)
2964
const lListElem *alist;
2967
DENTER(TOP_LAYER, "access_cq_rejected");
2969
/* rejected, if user/group is excluded by each "xacl" profile */
2971
for_each (alist, lGetList(cq, CQ_xacl)) {
2972
if (!sge_contained_in_access_list_(user, group, lGetList(alist, AUSRLIST_value), acl_list)) {
2981
/* rejected, if user/group is not included in any "acl" profile */
2983
for_each (alist, lGetList(cq, CQ_acl)) {
2984
const lList *t = lGetList(alist, AUSRLIST_value);
2985
if (!t || sge_contained_in_access_list_(user, group, t, acl_list)) {
2994
/* NOTE(review): extraction artifact -- numeric lines are original line
 * numbers and several statements are elided (e.g. the "if (ar_id != 0)"
 * guard around the AR branch, the free of ar_cqueue, and various closing
 * braces). The visible DRETURN paths and helper calls are intact. */
/****** sge_select_queue/cqueue_match_static() *********************************
2996
* cqueue_match_static() -- Does cluster queue match the job?
2999
* static dispatch_t cqueue_match_static(const char *cqname,
3000
* sge_assignment_t *a)
3003
* The function tries to find reasons (-q, -l and -P) why the
3004
* entire cluster is not suited for the job.
3007
* const char *cqname - Cluster queue name
3008
* sge_assignment_t *a - ???
3011
* static dispatch_t - Returns DISPATCH_OK or DISPATCH_NEVER_CAT
3014
* MT-NOTE: cqueue_match_static() is MT safe
3015
*******************************************************************************/
3016
dispatch_t cqueue_match_static(const char *cqname, sge_assignment_t *a)
3018
const lList *hard_resource_list;
3019
const lListElem *cq;
3020
const char *project, *pe_name;
3023
DENTER(TOP_LAYER, "cqueue_match_static");
3025
/* detect if entire cluster queue ruled out due to -q */
3026
if (qref_list_cq_rejected(lGetList(a->job, JB_hard_queue_list), cqname, NULL, NULL) &&
3027
(!a->pe_name || qref_list_cq_rejected(lGetList(a->job, JB_master_hard_queue_list), cqname, NULL, NULL))) {
3028
DPRINTF(("Cluster Queue \"%s\" is not contained in the hard queue list (-q) that "
3029
"was requested by job %d\n", cqname, (int)a->job_id));
3030
schedd_mes_add(a->job_id, SCHEDD_INFO_NOTINHARDQUEUELST_S, cqname);
3031
DRETURN(DISPATCH_NEVER_CAT);
3034
/* check if cqueue was reserved for AR job */
3035
ar_id = lGetUlong(a->job, JB_ar);
3037
lListElem *ar_ep = lGetElemUlong(a->ar_list, AR_id, ar_id);
3039
if (ar_ep != NULL) {
3040
lListElem *gep = NULL;
3042
/* accept the cluster queue if any granted AR slot lives in it */
for_each(gep, lGetList(ar_ep, AR_granted_slots)) {
3043
char *ar_cqueue = cqueue_get_name_from_qinstance(lGetString(gep, JG_qname));
3045
if (strcmp(cqname, ar_cqueue) == 0) {
3055
DPRINTF(("Cluster Queue \"%s\" was not reserved by advance reservation %d\n", cqname, ar_id));
3056
schedd_mes_add(a->job_id, SCHEDD_INFO_QINOTARRESERVED_SI, cqname, ar_id);
3057
DRETURN(DISPATCH_NEVER_CAT);
3060
/* should never happen */
3061
DRETURN(DISPATCH_NEVER_CAT);
3065
cq = lGetElemStr(*(object_type_get_master_list(SGE_TYPE_CQUEUE)), CQ_name, cqname);
3067
/* detect if entire cluster queue ruled out due to -l */
3068
if ((hard_resource_list = lGetList(a->job, JB_hard_resource_list))) {
3069
dstring unsatisfied = DSTRING_INIT;
3070
if (request_cq_rejected(hard_resource_list, cq, a->centry_list,
3071
(!a->pe_name || a->slots == 1)?true:false, &unsatisfied)) {
3072
DPRINTF(("Cluster Queue \"%s\" can not fulfill resource request (-l %s) that "
3073
"was requested by job %d\n", cqname, sge_dstring_get_string(&unsatisfied), (int)a->job_id));
3074
schedd_mes_add(a->job_id, SCHEDD_INFO_CANNOTRUNINQUEUE_SSS, sge_dstring_get_string(&unsatisfied),
3075
cqname, "of cluster queue");
3076
sge_dstring_free(&unsatisfied);
3077
DRETURN(DISPATCH_NEVER_CAT);
3081
/* detect if entire cluster queue ruled out due to -P */
3082
project = a->project;
3083
if (project_cq_rejected(project, cq)) {
3084
DPRINTF(("Cluster queue \"%s\" does not work for -P %s job %d\n",
3085
cqname, project?project:"<no project>", (int)a->job_id));
3086
schedd_mes_add(a->job_id, SCHEDD_INFO_HASNOPRJ_S, "cluster queue", cqname);
3087
DRETURN(DISPATCH_NEVER_CAT);
3090
/* detect if entire cluster queue ruled out due to user_list/xuser_lists */
3091
if (access_cq_rejected(a->user, a->group, a->acl_list, cq)) {
3092
DPRINTF(("Job %d has no permission for cluster queue %s\n", (int)a->job_id, cqname));
3093
schedd_mes_add(a->job_id, SCHEDD_INFO_HASNOPERMISSION_SS, "cluster queue", cqname);
3094
DRETURN(DISPATCH_NEVER_CAT);
3097
/* detect if entire cluster queue ruled out due to -pe */
3098
if (a->pe && (pe_name=a->pe_name) && pe_cq_rejected(pe_name, cq)) {
3099
DPRINTF(("Cluster queue "SFQ" does not reference PE "SFQ"\n", cqname, pe_name));
3100
schedd_mes_add(a->job_id, SCHEDD_INFO_NOTINQUEUELSTOFPE_SS, cqname, pe_name);
3101
DRETURN(DISPATCH_NEVER_CAT);
3104
/* detect if entire cluster queue ruled out due to -I y aka -now yes */
3105
if (JOB_TYPE_IS_IMMEDIATE(lGetUlong(a->job, JB_type)) && interactive_cq_rejected(cq)) {
3106
DPRINTF(("Queue \"%s\" is not an interactive queue as requested by job %d\n",
3107
cqname, (int)a->job_id));
3108
schedd_mes_add(a->job_id, SCHEDD_INFO_QUEUENOTINTERACTIVE_S, cqname);
3109
DRETURN(DISPATCH_NEVER_CAT);
3112
DRETURN(DISPATCH_OK);
3116
/****** sge_select_queue/sequential_tag_queues_suitable4job() **************
3118
* sequential_tag_queues_suitable4job() -- ???
3123
* The start time of a queue is always returned using the QU_available_at
3126
* The overall behaviour of this function is somewhat dependent on the
3127
* value that gets passed to assignment->start and whether soft requests
3128
* were specified with the job:
3130
* (1) In case of now assignemnts (DISPATCH_TIME_NOW) only the first queue
3131
* suitable for jobs without soft requests is tagged. When soft requests
3132
* are specified all queues must be verified and tagged in order to find
3133
* the queue that fits best.
3135
* (2) In case of reservation assignments (DISPATCH_TIME_QUEUE_END) the earliest
3136
* time is searched when the resources of global/host/queue are sufficient
3137
* for the job. The time-wise iteration is then done for each single resources
3140
* Actually there are cases when iterating through all queues were not
3141
* needed: (a) if there was a global limitation search could stop once
3142
* a queue is found that causes no further delay (b) if job has
3143
* a soft request search could stop once a queue is found with minimum (=0)
3147
* sge_assignment_t *assignment - job info structure
3150
* dispatch_t - 0 ok got an assignment
3151
* start time(s) and slots are tagged
3152
* 1 no assignment at the specified time
3153
* -1 assignment will never be possible for all jobs of that category
3154
* -2 assignment will never be possible for that particular job
3157
* MT-NOTE: sequential_tag_queues_suitable4job() is not MT safe
3158
*******************************************************************************/
3160
sequential_tag_queues_suitable4job(sge_assignment_t *a)
3162
lList *skip_host_list = NULL;
3163
lList *skip_queue_list = NULL;
3165
lList *unclear_cqueue_list = NULL;
3166
lList *unclear_host_list = NULL;
3167
dstring rule_name = DSTRING_INIT;
3168
dstring rue_string = DSTRING_INIT;
3169
dstring limit_name = DSTRING_INIT;
3170
u_long32 ar_id = lGetUlong(a->job, JB_ar);
3171
lListElem *ar_ep = lGetElemUlong(a->ar_list, AR_id, ar_id);
3173
category_use_t use_category;
3174
bool got_solution = false;
3175
u_long32 tt_best = U_LONG32_MAX;
3176
u_long32 violations_best = U_LONG32_MAX;
3179
u_long32 tt_global = a->start;
3180
dispatch_t best_queue_result = DISPATCH_NEVER_CAT;
3181
int global_violations = 0;
3182
int queue_violations = 0;
3185
DENTER(TOP_LAYER, "sequential_tag_queues_suitable4job");
3187
/* assemble job category information */
3188
fill_category_use_t(a, &use_category, "NONE");
3190
/* restore job messages from previous dispatch runs of jobs of the same category */
3191
if (use_category.use_category) {
3192
schedd_mes_set_tmp_list(use_category.cache, CCT_job_messages, a->job_id);
3193
skip_host_list = lGetList(use_category.cache, CCT_ignore_hosts);
3194
skip_queue_list = lGetList(use_category.cache, CCT_ignore_queues);
3197
if (ar_ep != NULL) {
3198
result = match_static_advance_reservation(a);
3199
if (result != DISPATCH_OK) {
3204
a->pi->seq_global++;
3205
result = sequential_global_time(&tt_global, a, &global_violations);
3206
if (result != DISPATCH_OK && result != DISPATCH_MISSING_ATTR) {
3211
for_each(qep, a->queue_list) {
3212
u_long32 tt_host = a->start;
3213
u_long32 tt_queue = a->start;
3214
const char *eh_name;
3215
const char *qname, *cqname;
3216
u_long32 tt_rqs = 0;
3221
qname = lGetString(qep, QU_full_name);
3222
cqname = lGetString(qep, QU_qname);
3223
eh_name = lGetHost(qep, QU_qhostname);
3225
/* try to foreclose the cluster queue */
3226
if (lGetElemStr(a->skip_cqueue_list, CTI_name, cqname)) {
3227
DPRINTF(("skip cluster queue %s\n", cqname));
3230
if (lGetElemStr(a->skip_host_list, CTI_name, eh_name)) {
3231
DPRINTF(("rqs skip host %s\n", eh_name));
3234
if (skip_host_list && lGetElemStr(skip_host_list, CTI_name, eh_name)){
3235
DPRINTF(("job category skip host %s\n", eh_name));
3239
if (skip_queue_list && lGetElemStr(skip_queue_list, CTI_name, qname)){
3240
DPRINTF(("job category skip queue %s\n", qname));
3245
/* resource quota matching */
3246
if ((result = rqs_by_slots(a, cqname, eh_name, &tt_rqs, &is_global,
3247
&rue_string, &limit_name, &rule_name, got_solution?tt_best:U_LONG32_MAX)) != DISPATCH_OK) {
3248
best_queue_result = find_best_result(result, best_queue_result);
3249
if (is_global == false) {
3250
DPRINTF(("no match due to GLOBAL RQS\n", eh_name));
3253
break; /* hit a global limit */
3255
if (got_solution && is_not_better(a, violations_best, tt_best, 0, tt_rqs)) {
3256
DPRINTF(("CUT TREE: Due to RQS for \"%s\"\n", qname));
3257
if (is_global == false)
3264
/* static cqueue matching */
3265
if (!lGetElemStr(unclear_cqueue_list, CTI_name, cqname)) {
3268
a->pi->seq_cqstat++;
3269
if (cqueue_match_static(cqname, a) != DISPATCH_OK) {
3270
lAddElemStr(&(a->skip_cqueue_list), CTI_name, cqname, CTI_Type);
3271
best_queue_result = find_best_result(DISPATCH_NEVER_CAT, best_queue_result);
3274
lAddElemStr(&unclear_cqueue_list, CTI_name, cqname, CTI_Type);
3277
/* static host matching */
3278
if (!(hep = lGetElemHost(a->host_list, EH_name, eh_name))) {
3279
ERROR((SGE_EVENT, MSG_SCHEDD_UNKNOWN_HOST_SS, qname, eh_name));
3280
if (skip_queue_list != NULL) {
3281
lAddElemStr(&skip_queue_list, CTI_name, qname, CTI_Type);
3285
if (!lGetElemStr(unclear_host_list, CTI_name, eh_name)) {
3288
if (qref_list_eh_rejected(lGetList(a->job, JB_hard_queue_list), eh_name, a->hgrp_list)) {
3289
schedd_mes_add(a->job_id, SCHEDD_INFO_NOTINHARDQUEUELST_S, eh_name);
3290
DPRINTF(("Host \"%s\" is not contained in the hard queue list (-q) that "
3291
"was requested by job %d\n", eh_name, (int)a->job_id));
3293
lAddElemStr(&skip_host_list, CTI_name, eh_name, CTI_Type);
3295
lAddElemStr(&(a->skip_host_list), CTI_name, eh_name, CTI_Type);
3296
best_queue_result = find_best_result(DISPATCH_NEVER_CAT, best_queue_result);
3300
if ((result=sge_host_match_static(a->job, a->ja_task, hep, a->centry_list, a->acl_list))) {
3301
if (result == DISPATCH_NEVER_JOB || !skip_host_list)
3302
lAddElemStr(&(a->skip_host_list), CTI_name, eh_name, CTI_Type);
3304
lAddElemStr(&(a->skip_host_list), CTI_name, eh_name, CTI_Type);
3305
best_queue_result = find_best_result(result, best_queue_result);
3308
lAddElemStr(&unclear_host_list, CTI_name, eh_name, CTI_Type);
3311
/* static queue matching */
3314
if (sge_queue_match_static(qep, a->job, NULL, a->ckpt, a->centry_list,
3315
a->acl_list, a->hgrp_list, a->ar_list) != DISPATCH_OK) {
3316
if (skip_queue_list)
3317
lAddElemStr(&skip_queue_list, CTI_name, qname, CTI_Type);
3318
best_queue_result = find_best_result(DISPATCH_NEVER_CAT, best_queue_result);
3323
if (ar_ep != NULL) {
3324
lListElem *ar_queue;
3326
dstring reason = DSTRING_INIT;
3328
ar_queue = lGetSubStr(ar_ep, QU_full_name, qname, AR_reserved_queues);
3331
* We are only interested in the static complexes on queue/host level.
3332
* These are tagged in this function. The result doesn't matter
3334
for_each(rep, lGetList(a->job, JB_hard_resource_list)) {
3335
const char *attrname = lGetString(rep, CE_name);
3336
lListElem *cplx_el = lGetElemStr(a->centry_list, CE_name, attrname);
3338
if (lGetBool(cplx_el, CE_consumable)) {
3341
sge_dstring_clear(&reason);
3342
cplx_el = get_attribute_by_name(a->gep, hep, qep, attrname, a->centry_list, a->start, a->duration);
3343
if (cplx_el == NULL) {
3344
result = DISPATCH_MISSING_ATTR;
3345
sge_dstring_sprintf(&reason, MSG_ATTRIB_MISSINGATTRIBUTEXINCOMPLEXES_S, attrname);
3349
result = match_static_resource(1, rep, cplx_el, &reason, false, false, false);
3350
lFreeElem(&cplx_el);
3351
if (result != DISPATCH_OK) {
3354
lSetUlong(rep, CE_tagged, HOST_TAG);
3356
if (result != DISPATCH_OK) {
3357
char buff[1024 + 1];
3358
centry_list_append_to_string(lGetList(a->job, JB_hard_resource_list), buff, sizeof(buff) - 1);
3359
if (*buff && (buff[strlen(buff) - 1] == '\n')) {
3360
buff[strlen(buff) - 1] = 0;
3362
schedd_mes_add(a->job_id, SCHEDD_INFO_CANNOTRUNATHOST_SSS, buff,
3363
lGetHost(hep, EH_name), sge_dstring_get_string(&reason));
3365
result = sequential_queue_time(&tt_queue, a, &queue_violations, ar_queue);
3366
if (result != DISPATCH_OK) {
3367
DPRINTF(("ar queue %s returned %d\n", qname, result));
3368
if (skip_queue_list) {
3369
lAddElemStr(&skip_queue_list, CTI_name, lGetString(ar_queue, QU_full_name), CTI_Type);
3371
best_queue_result = find_best_result(result, best_queue_result);
3374
tt_global = tt_host = tt_queue;
3376
sge_dstring_free(&reason);
3378
queue_violations = global_violations;
3380
/* dynamic host matching */
3383
result = sequential_host_time(&tt_host, a, &queue_violations, hep);
3384
if (result != DISPATCH_OK && result != DISPATCH_MISSING_ATTR) {
3386
lAddElemStr(&skip_host_list, CTI_name, eh_name, CTI_Type);
3388
lAddElemStr(&(a->skip_host_list), CTI_name, eh_name, CTI_Type);
3389
DPRINTF(("host %s returned %d\n", eh_name, result));
3390
best_queue_result = find_best_result(result, best_queue_result);
3393
if (got_solution && is_not_better(a, violations_best, tt_best, queue_violations, tt_host)) {
3394
lAddElemStr(&(a->skip_host_list), CTI_name, eh_name, CTI_Type);
3395
DPRINTF(("CUT TREE: Due to HOST for \"%s\"\n", qname));
3399
/* dynamic queue matching */
3402
result = sequential_queue_time(&tt_queue, a, &queue_violations, qep);
3403
if (result != DISPATCH_OK) {
3404
DPRINTF(("queue %s returned %d\n", qname, result));
3405
if (skip_queue_list) {
3406
lAddElemStr(&skip_queue_list, CTI_name, qname, CTI_Type);
3408
best_queue_result = find_best_result(result, best_queue_result);
3411
if (got_solution && is_not_better(a, violations_best, tt_best, queue_violations, tt_queue)) {
3412
DPRINTF(("CUT TREE: Due to QUEUE for \"%s\"\n", qname));
3417
if (result == DISPATCH_OK) {
3418
u_long32 this_tt = 0, this_violations = 0;
3419
lSetUlong(qep, QU_tag, 1); /* tag number of slots per queue and time when it will be available */
3422
if (a->start == DISPATCH_TIME_QUEUE_END) {
3425
DPRINTF((" global "sge_u32" host "sge_u32" queue "sge_u32" rqs "sge_u32"\n",
3426
tt_global, tt_host, tt_queue, tt_rqs));
3427
lSetUlong(qep, QU_available_at, this_tt = MAX(tt_queue, MAX(tt_host, MAX(tt_rqs, tt_global))));
3430
this_violations = lGetUlong(qep, QU_soft_violation);
3432
DPRINTF((" set Q: %s number=1 when="sge_u32" violations="sge_u32"\n", qname, this_tt, this_violations));
3433
best_queue_result = DISPATCH_OK;
3435
if (!a->is_reservation) {
3436
if (!a->is_soft || this_violations == 0)
3439
/* further search is pointless, if reservation depends from a global consumble */
3440
/* earlier start time has higher preference than lower soft violations */
3441
if (ar_ep == NULL && tt_global >= MAX(MAX(tt_queue, tt_host), tt_rqs) && (!a->is_soft || this_violations == 0))
3444
got_solution = true;
3445
violations_best = this_violations;
3448
DPRINTF(("queue %s reported %d\n", qname, result));
3449
if (skip_queue_list) {
3450
lAddElemStr(&skip_queue_list, CTI_name, qname, CTI_Type);
3452
best_queue_result = find_best_result(result, best_queue_result);
3456
lFreeList(&unclear_cqueue_list);
3457
lFreeList(&unclear_host_list);
3459
/* cache so far generated messages with the job category */
3460
if (use_category.use_category) {
3461
lList *temp = schedd_mes_get_tmp_list();
3463
lSetList(use_category.cache, CCT_job_messages, lCopyList(NULL, temp));
3467
sge_dstring_free(&rue_string);
3468
sge_dstring_free(&limit_name);
3469
sge_dstring_free(&rule_name);
3471
DRETURN(best_queue_result);
3476
/****** sge_select_queue/add_pe_slots_to_category() ****************************
3478
* add_pe_slots_to_category() -- defines an array of valid slot values
3481
* static bool add_pe_slots_to_category(category_use_t *use_category,
3482
* u_long32 *max_slotsp, lListElem *pe, int min_slots, int max_slots, lList
3486
* In case of pe ranges this function allocates memory and fills it with
3487
* valid pe slot values. If a category is set, it stores them in the category
3491
* category_use_t *use_category - category caching structure, must not be NULL
3492
* u_long32 *max_slotsp - number of different slot settings
3493
* lListElem *pe - pe, must not be NULL
3494
* int min_slots - min slot setting (pe range)
3495
* int max_slots - max slot setting (pe range)
3496
* lList *pe_range - pe range, must not be NULL
3499
* static bool - true, if successful
3502
* MT-NOTE: add_pe_slots_to_category() is MT safe
3504
*******************************************************************************/
3506
add_pe_slots_to_category(category_use_t *use_category, u_long32 *max_slotsp, lListElem *pe,
3507
int min_slots, int max_slots, lList *pe_range)
3509
if (use_category->cache != NULL) {
3510
use_category->posible_pe_slots = lGetRef(use_category->cache, CCT_pe_job_slots);
3513
/* --- prepare the posible slots for the binary search */
3514
if (use_category->posible_pe_slots == NULL) {
3518
for (slots = min_slots; slots <= max_slots; slots++) {
3520
/* sort out slot numbers that would conflict with allocation rule */
3521
if (sge_pe_slots_per_host(pe, slots) == 0) {
3525
/* only slot numbers from jobs PE range request */
3526
if (range_list_is_id_within(pe_range, slots) == 0) {
3530
if (use_category->posible_pe_slots == NULL) {
3531
use_category->posible_pe_slots = malloc((max_slots - min_slots + 1) * sizeof(u_long32));
3532
if (use_category->posible_pe_slots == NULL) {
3536
use_category->posible_pe_slots[(*max_slotsp)] = slots;
3540
if (use_category->cache != NULL) {
3541
lSetRef(use_category->cache, CCT_pe_job_slots, use_category->posible_pe_slots);
3542
lSetUlong(use_category->cache, CCT_pe_job_slot_count, *max_slotsp);
3543
use_category->is_pe_slots_rev = true;
3546
use_category->is_pe_slots_rev = false;
3550
use_category->is_pe_slots_rev = true;
3551
*max_slotsp = lGetUlong(use_category->cache, CCT_pe_job_slot_count);
3556
/****** sge_select_queue/fill_category_use_t() **************
3558
* fill_category_use_t() -- fills the category_use_t structure.
3561
* void fill_category_use_t(sge_assignment_t *a, category_use_t
3562
* *use_category, const char *pe_name)
3565
* If a cache structure for the given PE does not exist, it
3566
* will generate the necessary data structures.
3569
* sge_assignment_t *a - job info structure (in)
3570
* category_use_t *use_category - category info structure (out)
3571
* const char* pe_name - the current pe name or "NONE"
3574
* MT-NOTE: fill_category_use_t() is MT safe
3575
*******************************************************************************/
3576
static void fill_category_use_t(const sge_assignment_t *a, category_use_t *use_category, const char *pe_name)
3578
lListElem *job = a->job;
3580
DENTER(TOP_LAYER, "fill_category_use_t");
3582
use_category->category = lGetRef(job, JB_category);
3583
if (use_category->category != NULL) {
3584
use_category->cache = lGetElemStr(lGetList(use_category->category, CT_cache), CCT_pe_name, pe_name);
3585
if (use_category->cache == NULL) {
3586
use_category->cache = lCreateElem(CCT_Type);
3588
lSetString(use_category->cache, CCT_pe_name, pe_name);
3589
lSetList(use_category->cache, CCT_ignore_queues, lCreateList("", CTI_Type));
3590
lSetList(use_category->cache, CCT_ignore_hosts, lCreateList("", CTI_Type));
3591
lSetList(use_category->cache, CCT_job_messages, lCreateList("", MES_Type));
3593
if (lGetList(use_category->category, CT_cache) == NULL) {
3594
lSetList(use_category->category, CT_cache, lCreateList("pe_cache", CCT_Type));
3596
lAppendElem(lGetList(use_category->category, CT_cache), use_category->cache);
3599
use_category->mod_category = true;
3601
use_category->use_category = ((a->start == DISPATCH_TIME_NOW) &&
3602
lGetUlong(use_category->category, CT_refcount) > MIN_JOBS_IN_CATEGORY) ? true : false;
3605
use_category->cache = NULL;
3606
use_category->mod_category = false;
3607
use_category->use_category = false;
3609
use_category->posible_pe_slots = NULL;
3610
use_category->is_pe_slots_rev = false;
3615
static int get_soft_violations(sge_assignment_t *a, lListElem *hep, lListElem *qep)
3619
violations = compute_soft_violations(a, NULL, 0,
3620
lGetList(a->gep, EH_load_list),
3621
lGetList(a->gep, EH_consumable_config_list),
3622
lGetList(a->gep, EH_resource_utilization),
3623
DOMINANT_LAYER_GLOBAL, 0, GLOBAL_TAG);
3625
violations = compute_soft_violations(a, NULL, violations,
3626
lGetList(hep, EH_load_list),
3627
lGetList(hep, EH_consumable_config_list),
3628
lGetList(hep, EH_resource_utilization),
3629
DOMINANT_LAYER_HOST, 0, HOST_TAG);
3631
violations = compute_soft_violations(a, qep, violations,
3633
lGetList(qep, QU_consumable_config_list),
3634
lGetList(qep, QU_resource_utilization),
3635
DOMINANT_LAYER_QUEUE, 0, QUEUE_TAG);
3641
/****** sge_select_queue/parallel_tag_queues_suitable4job() *********
3643
* parallel_tag_queues_suitable4job() -- Tag queues/hosts for
3644
* a comprehensive/parallel assignment
3647
* static int parallel_tag_queues_suitable4job(sge_assignment_t
3651
* We tag the amount of available slots for that job at global, host and
3652
* queue level under consideration of all constraints of the job. We also
3653
* mark those queues that are suitable as a master queue as possible master
3654
* queues and count the number of violations of the job's soft request.
3655
* The method below is named comprehensive since it does the tagging game
3656
* for the whole parallel job and under consideration of all available
3657
* resources that could help to suffice the jobs request. This is necessary
3658
* to prevent consumable resource limited at host/global level multiple
3661
* While tagging we also set queues QU_host_seq_no based on the sort
3662
* order of each host. Assumption is the host list passed is sorted
3663
* according to the load formula.
3666
* sge_assignment_t *assignment - ???
3667
* category_use_t use_category - information on how to use the job category
3670
* static dispatch_t - 0 ok got an assignment
3671
* 1 no assignment at the specified time
3672
* -1 assignment will never be possible for all jobs of that category
3673
* -2 assignment will never be possible for that particular job
3676
* MT-NOTE: parallel_tag_queues_suitable4job() is not MT safe
3677
*******************************************************************************/
3679
parallel_tag_queues_suitable4job(sge_assignment_t *a, category_use_t *use_category, int *available_slots)
3681
lListElem *job = a->job;
3682
bool need_master_host = (lGetList(job, JB_master_hard_queue_list) != NULL) ? true : false;
3684
int accu_host_slots, accu_host_slots_qend;
3685
bool have_master_host, have_master_host_qend, suited_as_master_host, suited_as_master_host_qend = false;
3687
dispatch_t best_result = DISPATCH_NEVER_CAT;
3688
int gslots = a->slots;
3689
int gslots_qend = 0;
3690
int allocation_rule, minslots;
3691
dstring rule_name = DSTRING_INIT;
3692
dstring rue_name = DSTRING_INIT;
3693
dstring limit_name = DSTRING_INIT;
3696
DENTER(TOP_LAYER, "parallel_tag_queues_suitable4job");
3698
clean_up_parallel_job(a);
3700
if (use_category->use_category) {
3701
schedd_mes_set_tmp_list(use_category->cache, CCT_job_messages, a->job_id);
3704
/* remove reasons from last unsuccesful iteration */
3705
clean_monitor_alp();
3707
if (lGetUlong(a->job, JB_ar) == 0) {
3709
a->pi->par_global++;
3710
parallel_global_slots(a, &gslots, &gslots_qend);
3713
if (gslots < a->slots) {
3714
best_result = (gslots_qend < a->slots) ? DISPATCH_NEVER_CAT : DISPATCH_NOT_AT_TIME;
3715
*available_slots = gslots_qend;
3717
if (best_result == DISPATCH_NOT_AT_TIME) {
3718
DPRINTF(("GLOBAL will <category_later> get us %d slots (%d)\n",
3719
gslots, gslots_qend));
3721
DPRINTF(("GLOBAL will <category_never> get us %d slots (%d)\n",
3722
gslots, gslots_qend));
3725
lList *skip_host_list = NULL;
3726
lList *unclear_cqueue_list = NULL;
3728
int last_accu_host_slots, last_accu_host_slots_qend;
3730
if (use_category->use_category) {
3731
skip_host_list = lGetList(use_category->cache, CCT_ignore_hosts);
3734
accu_host_slots = accu_host_slots_qend = 0;
3735
have_master_host = have_master_host_qend = false;
3736
allocation_rule = sge_pe_slots_per_host(a->pe, a->slots);
3737
minslots = ALLOC_RULE_IS_BALANCED(allocation_rule)?allocation_rule:1;
3740
u_long32 host_seq_no = 0;
3741
const char *eh_name;
3745
a->soft_violations = 0;
3746
for_each (qep, a->queue_list) {
3747
lSetUlong(qep, QU_soft_violation, 0);
3748
if (!(hep = host_list_locate(a->host_list, lGetHost(qep, QU_qhostname))))
3750
get_soft_violations(a, hep, qep);
3751
DPRINTF(("EVALUATE: %s soft violations %d\n", lGetString(qep, QU_full_name), lGetUlong(qep, QU_soft_violation)));
3753
if (sconf_get_queue_sort_method() == QSM_LOAD)
3754
lPSortList(a->queue_list, "%I+ %I+ %I+", QU_soft_violation, QU_host_seq_no, QU_seq_no);
3756
lPSortList(a->queue_list, "%I+ %I+ %I+", QU_soft_violation, QU_seq_no, QU_host_seq_no);
3758
if (sconf_get_queue_sort_method() == QSM_LOAD)
3759
lPSortList(a->queue_list, "%I+ %I+", QU_host_seq_no, QU_seq_no);
3761
lPSortList(a->queue_list, "%I+ %I+", QU_seq_no, QU_host_seq_no);
3764
for_each (hep, a->host_list)
3765
lSetUlong(hep, EH_seq_no, -1);
3767
for_each (qep, a->queue_list) {
3768
DPRINTF(("AFTER SORT: %s soft violations %d\n", lGetString(qep, QU_full_name), lGetUlong(qep, QU_soft_violation)));
3769
eh_name = lGetHost(qep, QU_qhostname);
3770
if (!(hep = host_list_locate(a->host_list, eh_name)))
3772
if (lGetUlong(hep, EH_seq_no) == -1)
3773
lSetUlong(hep, EH_seq_no, host_seq_no++);
3775
lPSortList(a->host_list, "%I+", EH_seq_no);
3780
last_accu_host_slots = accu_host_slots;
3781
last_accu_host_slots_qend = accu_host_slots_qend;
3783
for_each (hep, a->host_list) {
3785
int hslots = 0, hslots_qend = 0;
3786
const char *eh_name = lGetHost(hep, EH_name);
3788
if (!strcasecmp(eh_name, SGE_GLOBAL_NAME) || !strcasecmp(eh_name, SGE_TEMPLATE_NAME)) {
3792
/* this host does not work for this category, skip it */
3793
if (skip_host_list && lGetElemStr(skip_host_list, CTI_name, eh_name)){
3796
if (lGetElemStr(a->skip_host_list, CTI_name, eh_name)){
3801
* do not perform expensive checks for this host if there
3802
* is not at least one free queue residing at this host:
3803
* see if there are queues which are not disbaled/suspended/calender;
3804
* which have at least one free slot, which are not unknown, several alarms
3807
if (lGetElemHost(a->queue_list, QU_qhostname, eh_name)) {
3809
parallel_tag_hosts_queues(a, hep, &hslots, &hslots_qend,
3810
&suited_as_master_host, use_category, &unclear_cqueue_list);
3812
if (hslots >= minslots) {
3813
/* Now RQ limit debitation can be performed for each queue instance based on QU_tag.
3814
While considering allocation_rule and master queue demand we try to get as
3815
much from each queue as needed, but not more than 'maxslots'. When we are through
3816
all queue instances and got not even 'minslots' the slots must be undebited. */
3817
bool got_master_queue = false;
3819
int rqs_hslots = 0, maxslots, slots, slots_qend;
3821
/* still broken for cases where round robin requires multiple rounds */
3822
if (ALLOC_RULE_IS_BALANCED(allocation_rule))
3823
maxslots = allocation_rule;
3824
else if (allocation_rule == ALLOC_RULE_FILLUP)
3825
maxslots = a->slots - accu_host_slots;
3826
else /* ALLOC_RULE_ROUNDROBIN */
3829
/* debit on RQS limits */
3830
for_each (qep, a->queue_list) {
3831
const char *qname = lGetString(qep, QU_full_name);
3833
if (sge_hostcmp(eh_name, lGetHost(qep, QU_qhostname)))
3836
DPRINTF(("tagged: %d maxslots: %d rqs_hslots: %d\n", (int)lGetUlong(qep, QU_tag), maxslots, rqs_hslots));
3837
DPRINTF(("SLOT HARVESTING: %s soft violations: %d master: %d\n",
3838
lGetString(qep, QU_full_name), (int)lGetUlong(qep, QU_soft_violation), (int)lGetUlong(qep, QU_tagged4schedule)));
3840
/* how much is still needed */
3841
slots = MIN(lGetUlong(qep, QU_tag), maxslots - rqs_hslots);
3843
if (need_master_host) {
3844
/* care for slave tasks assignments of -masterq jobs */
3845
if (!have_master_host && !got_master_queue && lGetUlong(qep, QU_tagged4schedule)==0
3846
&& accu_host_slots + rqs_hslots + slots == a->slots)
3848
/* care for master tasks assignments of -masterq jobs */
3849
if (lGetUlong(qep, QU_tagged4schedule)==1) {
3850
if (!have_master_host && !got_master_queue)
3851
slots = MIN(slots, 1);
3858
if (lGetUlong(a->job, JB_ar)==0 && !a->is_advance_reservation) {
3859
DPRINTF(("trying to debit %d slots in queue "SFQ"\n", slots, qname));
3860
parallel_check_and_debit_rqs_slots(a, eh_name, lGetString(qep, QU_qname),
3861
&slots, &slots_qend, &rule_name, &rue_name, &limit_name);
3862
DPRINTF(("could debiting %d slots in queue "SFQ"\n", slots, qname));
3867
/* add gdil element for this queue */
3868
if (!(gdil_ep=lGetElemStr(a->gdil, JG_qname, qname))) {
3869
gdil_ep = lAddElemStr(&(a->gdil), JG_qname, qname, JG_Type);
3870
lSetUlong(gdil_ep, JG_qversion, lGetUlong(qep, QU_version));
3871
lSetHost(gdil_ep, JG_qhostname, eh_name);
3872
lSetUlong(gdil_ep, JG_slots, slots);
3874
/* master queue must be at first position */
3875
if (need_master_host && !have_master_host && lGetUlong(qep, QU_tagged4schedule)==1) {
3876
lDechainElem(a->gdil, gdil_ep);
3877
lInsertElem(a->gdil, NULL, gdil_ep);
3878
got_master_queue = true;
3881
lAddUlong(gdil_ep, JG_slots, slots);
3884
a->soft_violations += slots * lGetUlong(qep, QU_soft_violation);
3886
lSetUlong(qep, QU_tag, slots);
3888
rqs_hslots += slots;
3890
if (rqs_hslots == maxslots)
3894
if (rqs_hslots < minslots) {
3895
const void *iter = NULL;
3896
DPRINTF(("reverting debitation since "SFQ" gets us only %d slots while min. %d are needed\n",
3897
eh_name, rqs_hslots, minslots));
3899
/* must revert all RQS debitations */
3900
for (qep = lGetElemHostFirst(a->queue_list, QU_qhostname, eh_name, &iter); qep;
3901
qep = lGetElemHostNext(a->queue_list, QU_qhostname, eh_name, &iter)) {
3902
slots = lGetUlong(qep, QU_tag);
3904
if (lGetUlong(a->job, JB_ar)==0 && !a->is_advance_reservation) {
3905
parallel_revert_rqs_slot_debitation(a, eh_name, lGetString(qep, QU_qname),
3906
slots, 0, &rule_name, &rue_name, &limit_name);
3908
if ((gdil_ep=lGetElemStr(a->gdil, JG_qname, lGetString(qep, QU_full_name)))) {
3909
if (lGetUlong(gdil_ep, JG_slots) - slots != 0)
3910
lSetUlong(gdil_ep, JG_slots, lGetUlong(gdil_ep, JG_slots) - slots);
3912
lDelElemStr(&(a->gdil), JG_qname, lGetString(qep, QU_full_name));
3914
a->soft_violations -= slots * lGetUlong(qep, QU_soft_violation);
3915
lSetUlong(qep, QU_tag, 0);
3919
suited_as_master_host = false;
3921
if (got_master_queue)
3922
suited_as_master_host = true;
3923
accu_host_slots += rqs_hslots;
3927
if ( hslots_qend >= minslots && a->care_reservation && !a->is_reservation) {
3929
bool got_master_queue = false;
3930
int rqs_hslots = 0, maxslots, slots, slots_qend;
3932
/* still broken for cases where round robin requires multiple rounds */
3933
if (ALLOC_RULE_IS_BALANCED(allocation_rule))
3934
maxslots = allocation_rule;
3935
else if (allocation_rule == ALLOC_RULE_FILLUP)
3936
maxslots = a->slots - accu_host_slots_qend;
3937
else /* ALLOC_RULE_ROUNDROBIN */
3940
/* debit on RQS limits */
3941
for_each (qep, a->queue_list) {
3942
if (sge_hostcmp(eh_name, lGetHost(qep, QU_qhostname)))
3946
slots_qend = MIN(lGetUlong(qep, QU_tag_qend), maxslots - rqs_hslots);
3948
if (need_master_host) {
3949
/* care for slave tasks assignments of -masterq jobs */
3950
if (!have_master_host_qend && !got_master_queue && lGetUlong(qep, QU_tagged4schedule)==0
3951
&& accu_host_slots_qend + rqs_hslots + slots_qend == a->slots)
3953
/* care for master tasks assignments of -masterq jobs */
3954
if (need_master_host && lGetUlong(qep, QU_tagged4schedule)==1) {
3955
if (!have_master_host_qend && !got_master_queue)
3956
slots_qend = MIN(slots_qend, 1);
3962
if (lGetUlong(a->job, JB_ar)==0 && !a->is_advance_reservation) {
3963
DPRINTF(("trying to debit %d slots_qend in queue "SFQ"\n", slots_qend, lGetString(qep, QU_full_name)));
3964
parallel_check_and_debit_rqs_slots(a, eh_name, lGetString(qep, QU_qname),
3965
&slots, &slots_qend, &rule_name, &rue_name, &limit_name);
3966
DPRINTF(("could debiting %d slots_qend in queue "SFQ"\n", slots_qend, lGetString(qep, QU_full_name)));
3968
lSetUlong(qep, QU_tag_qend, slots_qend);
3969
rqs_hslots += slots_qend;
3971
if (rqs_hslots == maxslots)
3974
if (rqs_hslots < minslots) {
3975
const void *iter = NULL;
3976
DPRINTF(("reverting debitation since "SFQ" gets us only %d slots while min. %d are needed\n",
3977
eh_name, rqs_hslots, minslots));
3979
/* must revert all RQS debitations */
3980
for (qep = lGetElemHostFirst(a->queue_list, QU_qhostname, eh_name, &iter); qep;
3981
qep = lGetElemHostNext(a->queue_list, QU_qhostname, eh_name, &iter)) {
3982
slots_qend = lGetUlong(qep, QU_tag_qend);
3983
if (slots_qend != 0) {
3984
if (lGetUlong(a->job, JB_ar)==0 && !a->is_advance_reservation)
3985
parallel_revert_rqs_slot_debitation(a, eh_name, lGetString(qep, QU_qname),
3986
0, slots_qend, &rule_name, &rue_name, &limit_name);
3987
lSetUlong(qep, QU_tag_qend, 0);
3992
if (got_master_queue)
3993
suited_as_master_host_qend = true;
3994
accu_host_slots_qend += rqs_hslots;
3998
/* tag full amount or zero */
3999
have_master_host |= suited_as_master_host;
4000
have_master_host_qend |= suited_as_master_host_qend;
4003
/* mark host as not suitable */
4004
/* at least when accu_host_slots and accu_host_slots_qend < a->slots */
4005
/* and only, if modify category is set */
4006
if (skip_host_list && use_category->mod_category ) {
4007
if (hslots < minslots && hslots_qend < minslots) {
4008
lAddElemStr(&skip_host_list, CTI_name, eh_name, CTI_Type);
4012
/* exit is possible when we (a) got the full slot amount or
4013
(b) got full slot_qend amount and know full slot amount is not achievable anymore */
4014
if ((accu_host_slots >= a->slots && (!need_master_host || have_master_host)) &&
4015
(!a->care_reservation || a->is_reservation ||
4016
(accu_host_slots_qend >= a->slots && (!need_master_host || have_master_host_qend)))) {
4017
DPRINTF(("WE ARE THROUGH!\n"));
4022
/* no need to collect further when slots_qend are complete */
4023
if (a->care_reservation && accu_host_slots_qend >= a->slots && (!need_master_host || have_master_host_qend))
4024
a->care_reservation = false;
4026
} /* for each host */
4028
} while (allocation_rule == ALLOC_RULE_ROUNDROBIN && !done &&
4029
(last_accu_host_slots != accu_host_slots || last_accu_host_slots_qend != accu_host_slots_qend));
4031
lFreeList(&unclear_cqueue_list);
4033
if (accu_host_slots >= a->slots &&
4034
(!need_master_host || have_master_host)) {
4035
/* stop looking for smaller slot amounts */
4036
DPRINTF(("--------------> BINGO %d slots %s at specified time <--------------\n",
4037
a->slots, need_master_host?"plus master host":""));
4038
best_result = DISPATCH_OK;
4039
} else if (accu_host_slots_qend >= a->slots && (!need_master_host || have_master_host)) {
4040
DPRINTF(("--------------> %d slots %s later <--------------\n",
4041
a->slots, need_master_host?"plus master host":""));
4042
best_result = DISPATCH_NOT_AT_TIME;
4044
DPRINTF(("--------------> NEVER ONLY %d but %d needed <--------------\n",
4045
accu_host_slots, a->slots));
4046
best_result = DISPATCH_NEVER_CAT;
4049
*available_slots = accu_host_slots;
4051
if (rmon_condition(xaybzc, INFOPRINT)) {
4052
switch (best_result) {
4054
DPRINTF(("COMPREHSENSIVE ASSIGNMENT(%d) returns "sge_u32"\n",
4055
a->slots, a->start));
4057
case DISPATCH_NOT_AT_TIME:
4058
DPRINTF(("COMPREHSENSIVE ASSIGNMENT(%d) returns <later>\n",
4061
case DISPATCH_NEVER_CAT:
4062
DPRINTF(("COMPREHSENSIVE ASSIGNMENT(%d) returns <category_never>\n",
4065
case DISPATCH_NEVER_JOB:
4066
DPRINTF(("COMPREHSENSIVE ASSIGNMENT(%d) returns <job_never>\n",
4070
DPRINTF(("!!!!!!!! COMPREHSENSIVE ASSIGNMENT(%d) returns unexpected %d\n",
4077
sge_dstring_free(&rule_name);
4078
sge_dstring_free(&rue_name);
4079
sge_dstring_free(&limit_name);
4081
if (use_category->use_category) {
4082
lList *temp = schedd_mes_get_tmp_list();
4084
lSetList(use_category->cache, CCT_job_messages, lCopyList(NULL, temp));
4088
if (best_result != DISPATCH_OK) {
4089
lFreeList(&(a->gdil));
4092
DRETURN(best_result);
4096
/****** sge_select_queue/parallel_host_slots() ******************************
4098
* parallel_host_slots() -- Return host slots available at time period
4102
* The maximum amount available at the host for the specified time period
4109
*******************************************************************************/
4111
parallel_host_slots(sge_assignment_t *a, int *slots, int *slots_qend,
4112
lListElem *hep, bool allow_non_requestable) {
4114
int hslots = 0, hslots_qend = 0;
4115
const char *eh_name;
4116
dispatch_t result = DISPATCH_NEVER_CAT;
4117
lList *hard_requests = lGetList(a->job, JB_hard_resource_list);
4118
lList *load_list = lGetList(hep, EH_load_list);
4119
lList *config_attr = lGetList(hep, EH_consumable_config_list);
4120
lList *actual_attr = lGetList(hep, EH_resource_utilization);
4121
double lc_factor = 0;
4123
DENTER(TOP_LAYER, "parallel_host_slots");
4125
eh_name = lGetHost(hep, EH_name);
4127
clear_resource_tags(hard_requests, HOST_TAG);
4132
if (!(qref_list_eh_rejected(lGetList(a->job, JB_hard_queue_list), eh_name, a->hgrp_list) &&
4133
qref_list_eh_rejected(lGetList(a->job, JB_master_hard_queue_list), eh_name, a->hgrp_list)) &&
4134
sge_host_match_static(a->job, a->ja_task, hep, a->centry_list, a->acl_list) == DISPATCH_OK) {
4136
/* cause load be raised artificially to reflect load correction when
4137
checking job requests */
4138
if (lGetPosViaElem(hep, EH_load_correction_factor, SGE_NO_ABORT) >= 0) {
4139
u_long32 ulc_factor;
4140
if ((ulc_factor=lGetUlong(hep, EH_load_correction_factor))) {
4141
lc_factor = ((double)ulc_factor)/100;
4148
result = parallel_rc_slots_by_time(a, hard_requests, &hslots, &hslots_qend,
4149
config_attr, actual_attr, load_list, false, NULL,
4150
DOMINANT_LAYER_HOST, lc_factor, HOST_TAG, false, eh_name, false);
4153
const void *iterator = NULL;
4155
int t_max = parallel_max_host_slots(a, hep);
4157
DPRINTF(("\tparallel_host_slots(%s) threshold load adjustment reduces slots"
4158
" from %d to %d\n", eh_name, hslots, t_max));
4162
for (gdil = lGetElemHostFirst(a->gdil, JG_qhostname, eh_name, &iterator);
4164
gdil = lGetElemHostNext(a->gdil, JG_qhostname, eh_name, &iterator)) {
4165
hslots -= lGetUlong(gdil, JG_slots);
4171
*slots_qend = hslots_qend;
4173
if (result == DISPATCH_OK) {
4174
DPRINTF(("\tparallel_host_slots(%s) returns %d/%d\n", eh_name, hslots, hslots_qend));
4177
DPRINTF(("\tparallel_host_slots(%s) returns <error>\n", eh_name));
4183
/****** sge_select_queue/parallel_tag_hosts_queues() **********************************
4185
* parallel_tag_hosts_queues() -- Determine host slots and tag queue(s) accordingly
4190
* For a particular job the maximum number of slots that could be served
4191
* at that host is determined in accordance with the allocation rule and
4192
* returned. The time of the assignment can be either DISPATCH_TIME_NOW
4193
* or a specific time, but never DISPATCH_TIME_QUEUE_END.
4195
* In those cases when the allocation rule allows more than one slot be
4196
* served per host it is necessary to also consider per queue possibly
4197
* specified load thresholds. This is because load is global/per host
4198
* concept while load thresholds are a queue attribute.
4200
* In those cases when the allocation rule gives us neither a fixed amount
4201
* of slots required nor an upper limit for the number per host slots (i.e.
4202
* $fill_up and $round_robin) we must iterate through all slot numbers from
4203
* 1 to the maximum number of slots "total_slots" and check with each slot
4204
* amount whether we can get it or not. Iteration stops when we can't get
4205
* more slots the host based on the queue limitations and load thresholds.
4207
* As long as only one single queue at the host is eligible for the job,
4208
* it is sufficient to check with each iteration whether the corresponding
4209
* number of slots can be served by the host and it's queue or not. The
4210
* really sick case however is when multiple queues are eligible for a host:
4211
* Here we have to determine in each iteration step also the maximum number
4212
* of slots each queue could get us by doing a per queue iteration from the
4213
* 1 up to the maximum number of slots we're testing. The optimization in
4214
* effect here is to check always only if we could get more slots than with
4215
* the former per host slot amount iteration.
4218
* sge_assignment_t *a -
4219
* lListElem *hep - current host
4220
* lListElem *global - global host
4221
* int *slots - out: # free slots
4222
* int *slots_qend - out: # free slots in the far far future
4223
* int global_soft_violations - # of global soft violations
4224
* bool *master_host - out: if true, found a master host
4225
* category_use_t *use_category - int/out : how to use the job category
4228
* static dispatch_t - 0 ok got an assignment
4229
* 1 no assignment at the specified time
4230
* -1 assignment will never be possible for all jobs of that category
4231
* -2 assignment will never be possible for that particular job
4234
* MT-NOTE: parallel_tag_hosts_queues() is not MT safe
4235
*******************************************************************************/
4237
parallel_tag_hosts_queues(sge_assignment_t *a, lListElem *hep, int *slots, int *slots_qend,
4238
bool *master_host, category_use_t *use_category,
4239
lList **unclear_cqueue_list)
4241
bool suited_as_master_host = false;
4242
int min_host_slots = 1;
4243
int max_host_slots = a->slots;
4244
int accu_queue_slots, accu_queue_slots_qend;
4245
int qslots, qslots_qend;
4247
int hslots_qend = 0;
4248
const char *cqname, *qname, *eh_name = lGetHost(hep, EH_name);
4249
lListElem *qep, *next_queue;
4250
dispatch_t result = DISPATCH_OK;
4251
const void *queue_iterator = NULL;
4252
int allocation_rule;
4254
DENTER(TOP_LAYER, "parallel_tag_hosts_queues");
4256
allocation_rule = sge_pe_slots_per_host(a->pe, a->slots);
4258
if (ALLOC_RULE_IS_BALANCED(allocation_rule)) {
4259
min_host_slots = max_host_slots = allocation_rule;
4262
if (lGetUlong(a->job, JB_ar) == 0) {
4263
parallel_host_slots(a, &hslots, &hslots_qend, hep, false);
4265
lList *config_attr = lGetList(hep, EH_consumable_config_list);
4266
lList *actual_attr = lGetList(hep, EH_resource_utilization);
4267
lList *load_attr = lGetList(hep, EH_load_list);
4268
lList *hard_resource_list = lGetList(a->job, JB_hard_resource_list);
4270
dstring reason = DSTRING_INIT;
4272
clear_resource_tags(hard_resource_list, HOST_TAG);
4274
for_each(rep, hard_resource_list) {
4275
const char *attrname = lGetString(rep, CE_name);
4276
lListElem *cplx_el = lGetElemStr(a->centry_list, CE_name, attrname);
4278
if (lGetBool(cplx_el, CE_consumable)) {
4281
sge_dstring_clear(&reason);
4282
cplx_el = get_attribute(attrname, config_attr, actual_attr, load_attr, a->centry_list, NULL,
4283
DOMINANT_LAYER_HOST, 0, &reason, false, DISPATCH_TIME_NOW, 0);
4284
if (cplx_el != NULL) {
4285
if (match_static_resource(1, rep, cplx_el, &reason, false, false, false) == DISPATCH_OK) {
4286
lSetUlong(rep, CE_tagged, HOST_TAG);
4288
lFreeElem(&cplx_el);
4291
sge_dstring_free(&reason);
4295
const void *iterator = NULL;
4296
lListElem *ar = ar_list_locate(a->ar_list, lGetUlong(a->job, JB_ar));
4297
lList *granted_list = lGetList(ar, AR_granted_slots);
4299
for (gdil_ep = lGetElemHostFirst(granted_list, JG_qhostname, eh_name, &iterator);
4301
gdil_ep = lGetElemHostNext(granted_list, JG_qhostname, eh_name, &iterator)) {
4302
hslots += lGetUlong(gdil_ep, JG_slots);
4304
hslots_qend = hslots;
4309
DPRINTF(("HOST %s itself (and queue threshold) will get us %d slots (%d later) ... "
4310
"we need %d\n", eh_name, hslots, hslots_qend, min_host_slots));
4312
hslots = MIN(hslots, max_host_slots);
4313
hslots_qend = MIN(hslots_qend, max_host_slots);
4316
if (hslots >= min_host_slots || hslots_qend >= min_host_slots) {
4317
lList *skip_queue_list = NULL;
4318
if (use_category->use_category) {
4319
skip_queue_list = lGetList(use_category->cache, CCT_ignore_queues);
4322
accu_queue_slots = accu_queue_slots_qend = 0;
4324
for (next_queue = lGetElemHostFirst(a->queue_list, QU_qhostname, eh_name, &queue_iterator);
4326
next_queue = lGetElemHostNext(a->queue_list, QU_qhostname, eh_name, &queue_iterator)) {
4328
qname = lGetString(qep, QU_full_name);
4329
cqname = lGetString(qep, QU_qname);
4331
/* try to foreclose the cluster queue */
4332
if (lGetElemStr(a->skip_cqueue_list, CTI_name, cqname)) {
4333
DPRINTF(("(1) skip cluster queue %s\n", cqname));
4337
if (!lGetElemStr(*unclear_cqueue_list, CTI_name, cqname)) {
4340
a->pi->par_cqstat++;
4341
if (cqueue_match_static(cqname, a) != DISPATCH_OK) {
4342
lAddElemStr(&(a->skip_cqueue_list), CTI_name, cqname, CTI_Type);
4343
DPRINTF(("add cluster queue %s to skip list\n", cqname));
4346
DPRINTF(("cqueue %s is not rejected\n", cqname));
4347
lAddElemStr(unclear_cqueue_list, CTI_name, cqname, CTI_Type);
4350
if (skip_queue_list && lGetElemStr(skip_queue_list, CTI_name, qname)){
4351
DPRINTF(("(2) skip cluster queue %s\n", cqname));
4355
DPRINTF(("checking queue %s because cqueue %s is not rejected\n", qname, cqname));
4356
result = parallel_queue_slots(a, qep, &qslots, &qslots_qend, false);
4358
if (result == DISPATCH_OK && (qslots > 0 || qslots_qend > 0)) {
4360
/* could this host be a master host */
4361
if (!suited_as_master_host && lGetUlong(qep, QU_tagged4schedule)) {
4362
DPRINTF(("HOST %s can be master host because of queue %s\n", eh_name, qname));
4363
suited_as_master_host = true;
4366
DPRINTF(("QUEUE %s TIME: %d + %d -> %d QEND: %d + %d -> %d (%d soft violations)\n", qname,
4367
accu_queue_slots, qslots, accu_queue_slots+ qslots,
4368
accu_queue_slots_qend, qslots_qend, accu_queue_slots_qend + qslots_qend,
4369
(int)lGetUlong(qep, QU_soft_violation)));
4371
accu_queue_slots += qslots;
4372
accu_queue_slots_qend += qslots_qend;
4373
lSetUlong(qep, QU_tag, qslots);
4374
lSetUlong(qep, QU_tag_qend, qslots_qend);
4376
if (skip_queue_list) {
4377
lAddElemStr(&skip_queue_list, CTI_name, qname, CTI_Type);
4379
DPRINTF(("HOST(1.5) %s will get us nothing\n", eh_name));
4382
} /* for each queue of the host */
4384
hslots = MIN(accu_queue_slots, hslots);
4385
hslots_qend = MIN(accu_queue_slots_qend, hslots_qend);
4387
DPRINTF(("HOST %s and it's queues will get us %d slots (%d later) ... we need %d\n",
4388
eh_name, hslots, hslots_qend, min_host_slots));
4392
*slots_qend = hslots_qend;
4393
*master_host = suited_as_master_host;
4395
DRETURN(DISPATCH_OK);
4399
* Determine maximum number of host_slots as limited
4400
* by queue load thresholds. The maximum only considers
4401
* thresholds with load adjustments
4403
* for each queue Q at this host {
4404
* for each threshold T of a queue {
4405
* if compare operator > or >=
4406
* avail(Q, T) = <threshold - load / adjustment) + 1
4408
* avail(Q, T) = (threshold - load / -adjustMent) + 1
4410
* avail(Q) = MIN(all avail(Q, T))
4412
* host_slot_max_by_T = MAX(all min(Q))
4415
parallel_max_host_slots(sge_assignment_t *a, lListElem *host) {
4416
int avail_h = 0, avail_q;
4418
lListElem *next_queue, *qep, *centry = NULL;
4419
lListElem *lv, *lc, *tr, *fv, *cep;
4420
const char *eh_name = lGetHost(host, EH_name);
4421
const void *queue_iterator = NULL;
4422
const char *load_value, *limit_value, *adj_value;
4424
bool is_np_adjustment = false;
4425
lList *requests = lGetList(a->job, JB_hard_resource_list);
4427
DENTER(TOP_LAYER, "parallel_max_host_slots");
4429
for (next_queue = lGetElemHostFirst(a->queue_list, QU_qhostname, eh_name, &queue_iterator);
4431
next_queue = lGetElemHostNext(a->queue_list, QU_qhostname, eh_name, &queue_iterator)) {
4432
lList *load_thresholds = lGetList(qep, QU_load_thresholds);
4435
for_each(tr, load_thresholds) {
4436
double load, threshold, adjustment;
4437
const char *name = lGetString(tr, CE_name);
4438
if ((cep = centry_list_locate(a->centry_list, name)) == NULL) {
4442
if (!lGetBool(cep, CE_consumable)) { /* work on the load values */
4443
if ((lc = lGetElemStr(a->load_adjustments, CE_name, name)) == NULL) {
4446
if ((lv=lGetSubStr(host, HL_name, name, EH_load_list)) != NULL) {
4447
load_value = lGetString(lv, HL_value);
4449
else if ((lv = lGetSubStr(a->gep, HL_name, name, EH_load_list)) != NULL) {
4450
load_value = lGetString(lv, HL_value);
4453
fv = lGetSubStr(qep, CE_name, name, QU_consumable_config_list);
4454
load_value = lGetString(fv, CE_stringval);
4456
adj_value = lGetString(lc, CE_stringval);
4457
is_np_adjustment = true;
4458
adj_value = lGetString(lc, CE_stringval);
4461
else { /* work on a consumable */
4463
if ((lc = lGetElemStr(requests, CE_name, name)) != NULL) {
4464
adj_value = lGetString(lc, CE_stringval);
4466
else { /* is default value */
4467
adj_value = lGetString(cep, CE_default);
4470
if ((centry = get_attribute_by_name(a->gep, host, qep, name, a->centry_list, a->start, a->duration)) == NULL) {
4471
/* no load value, no assigned consumable to queue, host, or global */
4472
DPRINTF(("the consumable "SFN" used in queue "SFN" as load threshold has no instance at queue, host or global level\n",
4473
name, lGetString(qep, QU_full_name)));
4477
load_value = lGetString(centry, CE_pj_stringval);
4480
limit_value = lGetString(tr, CE_stringval);
4481
type = lGetUlong(cep, CE_valtype);
4483
/* get the needed values. If the load value is not a number, ignore it */
4491
if (!parse_ulong_val(&load, NULL, type, load_value, NULL, 0) ||
4492
!parse_ulong_val(&threshold, NULL, type, limit_value, NULL, 0) ||
4493
!parse_ulong_val(&adjustment, NULL, type, adj_value, NULL, 0)) {
4504
/* the string in load_value is not needed anymore, we can free the element */
4507
/* a adjustment of NULL is ignored here. We can dispatch unlimited jobs based
4508
on the load value */
4509
if (adjustment == 0) {
4513
switch (lGetUlong(cep, CE_relop)) {
4515
case CMPLXNE_OP : continue; /* we cannot compute a usefull range */
4517
case CMPLXLE_OP : adjustment *= -1;
4521
if (is_np_adjustment) {
4522
load_np_value_adjustment(name, host, &adjustment);
4525
/* load alarm is defined in a way, that a host can go with one
4526
used slot into alarm and not that the dispatching prevents
4527
the alarm state. For this behavior we have to add 1 to the
4528
available slots. However, this is not true for consumables
4530
avail = (threshold - load)/adjustment;
4531
if (!lGetBool(cep, CE_consumable)) {
4534
avail_q = MIN(avail, avail_q);
4537
avail_h = MAX(avail_h, avail_q);
4542
/****** sge_select_queue/sge_sequential_assignment() ***************************
4544
* sge_sequential_assignment() -- Make an assignment for a sequential job.
4547
* int sge_sequential_assignment(sge_assignment_t *assignment)
4550
* For sequential job assignments all the earliest job start time
4551
* is determined with each queue instance and the earliest one gets
4552
* chosen. Secondary criterion for queue selection minimizing jobs
4555
* The overall behaviour of this function is somewhat dependent on the
4556
* value that gets passed to assignment->start and whether soft requests
4557
* were specified with the job:
4559
* (1) In case of now assignemnts (DISPATCH_TIME_NOW) only the first queue
4560
* suitable for jobs without soft requests is tagged. When soft requests
4561
* are specified all queues must be verified and tagged in order to find
4562
* the queue that fits best. On success the start time is set
4564
* (2) In case of queue end assignments (DISPATCH_TIME_QUEUE_END)
4568
* sge_assignment_t *assignment - ???
4571
* int - 0 ok got an assignment + time (DISPATCH_TIME_NOW and DISPATCH_TIME_QUEUE_END)
4572
* 1 no assignment at the specified time
4573
* -1 assignment will never be possible for all jobs of that category
4574
* -2 assignment will never be possible for that particular job
4577
* MT-NOTE: sge_sequential_assignment() is not MT safe
4578
*******************************************************************************/
4579
dispatch_t sge_sequential_assignment(sge_assignment_t *a)
4582
int old_logging = 0;
4584
DENTER(TOP_LAYER, "sge_sequential_assignment");
4587
DRETURN(DISPATCH_NEVER_CAT);
4590
if (a->is_reservation && !a->is_advance_reservation){
4591
/* turn off messages for reservation scheduling */
4592
old_logging = schedd_mes_get_logging();
4593
schedd_mes_set_logging(0);
4597
/* untag all queues */
4598
qinstance_list_set_tag(a->queue_list, 0);
4600
sequential_update_host_order(a->host_list, a->queue_list);
4602
if (sconf_get_qs_state() != QS_STATE_EMPTY) {
4603
/*------------------------------------------------------------------
4604
* There is no need to sort the queues after each dispatch in
4607
* 1. The last dispatch was also a sequential job without
4608
* soft requests. If not then the queues are sorted by
4609
* other criterions (soft violations, # of tagged slots,
4611
* 2. The hosts sort order has not changed since last dispatch.
4612
* Load correction or consumables in the load formula can
4613
* change the order of the hosts. We detect changings in the
4614
* host order by comparing the host sequence number with the
4615
* sequence number from previous run.
4616
* ------------------------------------------------------------------*/
4617
if (sconf_get_last_dispatch_type() != DISPATCH_TYPE_FAST || sconf_get_host_order_changed()) {
4618
DPRINTF(("SORTING HOSTS!\n"));
4619
if (sconf_get_queue_sort_method() == QSM_LOAD)
4620
lPSortList(a->queue_list, "%I+ %I+", QU_host_seq_no, QU_seq_no);
4622
lPSortList(a->queue_list, "%I+ %I+", QU_seq_no, QU_host_seq_no);
4625
sconf_set_last_dispatch_type(DISPATCH_TYPE_FAST_SOFT_REQ);
4628
sconf_set_last_dispatch_type(DISPATCH_TYPE_FAST);
4632
result = sequential_tag_queues_suitable4job(a);
4634
if (result == DISPATCH_OK) {
4636
u_long32 job_start_time = U_LONG32_MAX;
4637
u_long32 min_soft_violations = U_LONG32_MAX;
4638
lListElem *best_queue = NULL;
4640
if (a->is_reservation) {
4641
for_each (qep, a->queue_list) {
4642
if (lGetUlong(qep, QU_tag) != 0) {
4643
u_long32 temp_job_start_time = lGetUlong(qep, QU_available_at);
4645
DPRINTF((" Q: %s "sge_u32" "sge_u32" (jst: "sge_u32")\n", lGetString(qep, QU_full_name),
4646
lGetUlong(qep, QU_tag), temp_job_start_time, job_start_time));
4648
/* earlier start time has higher preference than lower soft violations */
4649
if ((job_start_time > temp_job_start_time) ||
4651
min_soft_violations > lGetUlong(qep, QU_soft_violation) &&
4652
job_start_time == temp_job_start_time)
4656
job_start_time = temp_job_start_time;
4657
min_soft_violations = lGetUlong(qep, QU_soft_violation);
4662
DPRINTF(("earliest queue \"%s\" at "sge_u32"\n", lGetString(best_queue, QU_full_name), job_start_time));
4664
DPRINTF(("no earliest queue found!\n"));
4668
if (sconf_get_queue_sort_method() == QSM_LOAD)
4669
lPSortList(a->queue_list, "%I+ %I+ %I+", QU_soft_violation, QU_host_seq_no, QU_seq_no);
4671
lPSortList(a->queue_list, "%I+ %I+ %I+", QU_soft_violation, QU_seq_no, QU_host_seq_no);
4674
for_each (qep, a->queue_list) {
4675
if (lGetUlong(qep, QU_tag)) {
4676
job_start_time = lGetUlong(qep, QU_available_at);
4684
DRETURN(DISPATCH_NEVER_CAT); /* should never happen */
4689
const char *qname = lGetString(best_queue, QU_full_name);
4690
const char *eh_name = lGetHost(best_queue, QU_qhostname);
4692
DPRINTF((sge_u32": 1 slot in queue %s user %s %s for "sge_u32"\n",
4693
a->job_id, qname, a->user? a->user:"<unknown>",
4694
!a->is_reservation?"scheduled":"reserved", job_start_time));
4696
gdil_ep = lAddElemStr(&gdil, JG_qname, qname, JG_Type);
4697
lSetUlong(gdil_ep, JG_qversion, lGetUlong(best_queue, QU_version));
4698
lSetHost(gdil_ep, JG_qhostname, eh_name);
4699
lSetUlong(gdil_ep, JG_slots, 1);
4701
if (!a->is_reservation) {
4702
sconf_inc_fast_jobs();
4705
lFreeList(&(a->gdil));
4707
if (a->start == DISPATCH_TIME_QUEUE_END) {
4708
a->start = job_start_time;
4711
result = DISPATCH_OK;
4717
DPRINTF(("SEQUENTIAL ASSIGNMENT("sge_u32"."sge_u32") returns <time> "sge_u32"\n",
4718
a->job_id, a->ja_task_id, a->start));
4720
case DISPATCH_NOT_AT_TIME:
4721
DPRINTF(("SEQUENTIAL ASSIGNMENT("sge_u32"."sge_u32") returns <later>\n",
4722
a->job_id, a->ja_task_id));
4724
case DISPATCH_NEVER_CAT:
4725
DPRINTF(("SEQUENTIAL ASSIGNMENT("sge_u32"."sge_u32") returns <category_never>\n",
4726
a->job_id, a->ja_task_id));
4728
case DISPATCH_NEVER_JOB:
4729
DPRINTF(("SEQUENTIAL ASSIGNMENT("sge_u32"."sge_u32") returns <job_never>\n",
4730
a->job_id, a->ja_task_id));
4732
case DISPATCH_MISSING_ATTR:
4734
DPRINTF(("!!!!!!!! SEQUENTIAL ASSIGNMENT("sge_u32"."sge_u32") returns unexpected %d\n",
4735
a->job_id, a->ja_task_id, result));
4739
if (a->is_reservation && !a->is_advance_reservation) {
4740
schedd_mes_set_logging(old_logging);
4747
/*------------------------------------------------------------------
4748
* FAST TRACK FOR SEQUENTIAL JOBS WITHOUT A SOFT REQUEST
4750
* It is much faster not to review slots in a comprehensive fashion
4751
* for jobs of this type.
4752
* ------------------------------------------------------------------*/
4753
static int sequential_update_host_order(lList *host_list, lList *queues)
4755
lListElem *hep, *qep;
4756
double previous_load = 0;
4757
int previous_load_inited = 0;
4759
const char *eh_name;
4760
const void *iterator = NULL;
4761
bool host_order_changed = false;
4763
DENTER(TOP_LAYER, "sequential_update_host_order");
4765
if (!sconf_get_host_order_changed()) {
4769
sconf_set_host_order_changed(host_order_changed);
4771
for_each (hep, host_list) { /* in share/load order */
4773
/* figure out host_seqno
4774
in case the load of two hosts is equal this
4775
must be also reflected by the sequence number */
4776
if (!previous_load_inited) {
4778
previous_load = lGetDouble(hep, EH_sort_value);
4779
previous_load_inited = 1;
4781
if (previous_load < lGetDouble(hep, EH_sort_value)) {
4783
previous_load = lGetDouble(hep, EH_sort_value);
4787
/* set host_seqno for all queues of this host */
4788
eh_name = lGetHost(hep, EH_name);
4790
qep = lGetElemHostFirst(queues, QU_qhostname, eh_name, &iterator);
4791
while (qep != NULL) {
4792
lSetUlong(qep, QU_host_seq_no, host_seqno);
4793
qep = lGetElemHostNext(queues, QU_qhostname, eh_name, &iterator);
4796
/* detect whether host_seqno has changed since last dispatch operation */
4797
if (host_seqno != lGetUlong(hep, EH_seq_no)) {
4798
DPRINTF(("HOST SORT ORDER CHANGED FOR HOST %s FROM %d to %d\n",
4799
eh_name, lGetUlong(hep, EH_seq_no), host_seqno));
4800
host_order_changed = true;
4801
lSetUlong(hep, EH_seq_no, host_seqno);
4805
sconf_set_host_order_changed(host_order_changed);
4810
/****** sge_select_queue/parallel_assignment() *****************************
4812
* parallel_assignment() -- Can we assign with a fixed PE/slot/time
4815
* int parallel_assignment(sge_assignment_t *assignment)
4818
* Returns if possible an assignment for a particular PE with a
4819
* fixed slot at a fixed time.
4822
* sge_assignment_t *a -
4823
* category_use_t *use_category - has information on how to use the job category
4826
* dispatch_t - 0 ok got an assignment
4827
* 1 no assignment at the specified time
4828
* -1 assignment will never be possible for all jobs of that category
4829
* -2 assignment will never be possible for that particular job
4832
* MT-NOTE: parallel_assignment() is not MT safe
4833
*******************************************************************************/
4835
parallel_assignment(sge_assignment_t *a, category_use_t *use_category, int *available_slots)
4838
int pslots = a->slots;
4839
int pslots_qend = 0;
4842
const char *qsort_args;
4845
DENTER(TOP_LAYER, "parallel_assignment");
4848
DRETURN(DISPATCH_NEVER_CAT);
4851
if ((lGetUlong(a->job, JB_ar) == 0) && (ret = parallel_available_slots(a, &pslots, &pslots_qend)) != DISPATCH_OK) {
4852
*available_slots = MIN(pslots, pslots_qend);
4855
if (a->slots > pslots) {
4856
*available_slots = MIN(pslots, pslots_qend);
4857
DRETURN((a->slots > pslots_qend)? DISPATCH_NEVER_CAT : DISPATCH_NOT_AT_TIME);
4861
/* depends on a correct host order */
4862
ret = parallel_tag_queues_suitable4job(a, use_category, available_slots);
4864
if (ret != DISPATCH_OK) {
4869
/* must be understood in the context of changing queue sort orders */
4870
sconf_set_last_dispatch_type(DISPATCH_TYPE_COMPREHENSIVE);
4873
/* if dynamic qsort function was supplied, call it */
4874
if ((qsort_args=lGetString(a->pe, PE_qsort_args)) != NULL) {
4876
ret = sge_call_pe_qsort(a, qsort_args);
4888
/****** sched/select_queue/parallel_queue_slots() *************************
4890
* parallel_queue_slots() --
4893
* int - 0 ok got an assignment + set time for DISPATCH_TIME_NOW and
4894
* DISPATCH_TIME_QUEUE_END (only with fixed_slot equals true)
4895
* 1 no assignment at the specified time
4896
* -1 assignment will never be possible for all jobs of that category
4897
******************************************************************************/
4899
parallel_queue_slots(sge_assignment_t *a, lListElem *qep, int *slots, int *slots_qend,
4900
bool allow_non_requestable)
4902
lList *hard_requests = lGetList(a->job, JB_hard_resource_list);
4903
lList *config_attr = lGetList(qep, QU_consumable_config_list);
4904
lList *actual_attr = lGetList(qep, QU_resource_utilization);
4905
const char *qname = lGetString(qep, QU_full_name);
4906
int qslots = 0, qslots_qend = 0;
4907
int lslots = INT_MAX, lslots_qend = INT_MAX;
4909
dispatch_t result = DISPATCH_NEVER_CAT;
4911
DENTER(TOP_LAYER, "parallel_queue_slots");
4916
if (sge_queue_match_static(qep, a->job, a->pe, a->ckpt, a->centry_list,
4917
a->acl_list, a->hgrp_list, a->ar_list) == DISPATCH_OK) {
4920
u_long32 ar_id = lGetUlong(a->job, JB_ar);
4923
lListElem *ar_queue;
4925
lList *ar_queue_config_attr;
4926
lList *ar_queue_actual_attr;
4927
lListElem *ar_ep = lGetElemUlong(a->ar_list, AR_id, ar_id);
4928
lList *hard_resource_list = lGetList(a->job, JB_hard_resource_list);
4929
dstring reason = DSTRING_INIT;
4931
clear_resource_tags(hard_resource_list, QUEUE_TAG);
4933
ar_queue_config_attr = lGetList(qep, QU_consumable_config_list);
4934
ar_queue_actual_attr = lGetList(qep, QU_resource_utilization);
4936
for_each(rep, hard_resource_list) {
4937
const char *attrname = lGetString(rep, CE_name);
4938
lListElem *cplx_el = lGetElemStr(a->centry_list, CE_name, attrname);
4940
if (lGetBool(cplx_el, CE_consumable)) {
4943
sge_dstring_clear(&reason);
4944
cplx_el = get_attribute(attrname, ar_queue_config_attr, ar_queue_actual_attr, NULL, a->centry_list, NULL,
4945
DOMINANT_LAYER_QUEUE, 0, &reason, false, DISPATCH_TIME_NOW, 0);
4946
if (cplx_el != NULL) {
4947
if (match_static_resource(1, rep, cplx_el, &reason, false, false, false) == DISPATCH_OK) {
4948
lSetUlong(rep, CE_tagged, PE_TAG);
4950
lFreeElem(&cplx_el);
4953
sge_dstring_free(&reason);
4955
ar_queue = lGetSubStr(ar_ep, QU_full_name, qname, AR_reserved_queues);
4956
ar_queue_config_attr = lGetList(ar_queue, QU_consumable_config_list);
4957
ar_queue_actual_attr = lGetList(ar_queue, QU_resource_utilization);
4959
DPRINTF(("verifing AR queue\n"));
4960
result = parallel_rc_slots_by_time(a, hard_requests, &qslots, &qslots_qend,
4961
ar_queue_config_attr, ar_queue_actual_attr, NULL, true, ar_queue,
4962
DOMINANT_LAYER_QUEUE, 0, QUEUE_TAG, false, lGetString(ar_queue, QU_full_name), false);
4964
if (a->is_advance_reservation
4965
|| (((a->pi)?a->pi->par_rqs++:0), result = parallel_rqs_slots_by_time(a, &lslots, &lslots_qend,
4966
lGetHost(qep, QU_qhostname), lGetString(qep, QU_qname))) == DISPATCH_OK) {
4967
DPRINTF(("verifing normal queue\n"));
4972
result = parallel_rc_slots_by_time(a, hard_requests, &qslots, &qslots_qend,
4973
config_attr, actual_attr, NULL, true, qep,
4974
DOMINANT_LAYER_QUEUE, 0, QUEUE_TAG, false, lGetString(qep, QU_full_name), false);
4978
if ((gdil=lGetElemStr(a->gdil, JG_qname, lGetString(qep, QU_full_name))))
4979
qslots -= lGetUlong(gdil, JG_slots);
4983
*slots = MIN(qslots, lslots);
4984
*slots_qend = MIN(qslots_qend, lslots_qend);
4986
if (result == DISPATCH_OK) {
4987
DPRINTF(("\tparallel_queue_slots(%s) returns %d/%d\n", qname, qslots, qslots_qend));
4989
DPRINTF(("\tparallel_queue_slots(%s) returns <error>\n", qname));
4995
/****** sched/select_queue/sequential_queue_time() *************************
4997
* sequential_queue_time() --
5000
* dispatch_t - 0 ok got an assignment + set time for DISPATCH_TIME_NOW and
5001
* DISPATCH_TIME_QUEUE_END (only with fixed_slot equals true)
5002
* 1 no assignment at the specified time
5003
* -1 assignment will never be possible for all jobs of that category
5004
******************************************************************************/
5006
sequential_queue_time(u_long32 *start, const sge_assignment_t *a,
5007
int *violations, lListElem *qep)
5010
char reason_buf[1024];
5012
u_long32 tmp_time = *start;
5013
lList *hard_requests = lGetList(a->job, JB_hard_resource_list);
5014
lList *config_attr = lGetList(qep, QU_consumable_config_list);
5015
lList *actual_attr = lGetList(qep, QU_resource_utilization);
5016
const char *qname = lGetString(qep, QU_full_name);
5018
DENTER(TOP_LAYER, "sequential_queue_time");
5020
sge_dstring_init(&reason, reason_buf, sizeof(reason_buf));
5022
/* match the resources */
5023
result = rc_time_by_slots(a, hard_requests, NULL, config_attr, actual_attr,
5024
qep, false, &reason, 1, DOMINANT_LAYER_QUEUE,
5025
0, QUEUE_TAG, &tmp_time, qname);
5027
if (result == DISPATCH_OK) {
5028
if (violations != NULL) {
5029
*violations = compute_soft_violations(a, qep, *violations, NULL, config_attr, actual_attr,
5030
DOMINANT_LAYER_QUEUE, 0, QUEUE_TAG);
5033
char buff[1024 + 1];
5034
centry_list_append_to_string(hard_requests, buff, sizeof(buff) - 1);
5035
if (*buff && (buff[strlen(buff) - 1] == '\n')) {
5036
buff[strlen(buff) - 1] = 0;
5038
schedd_mes_add(a->job_id, SCHEDD_INFO_CANNOTRUNINQUEUE_SSS, buff, qname, reason_buf);
5041
if (a->is_reservation && result == DISPATCH_OK) {
5043
DPRINTF(("queue_time_by_slots(%s) returns earliest start time "sge_u32"\n", qname, *start));
5044
} else if (result == DISPATCH_OK) {
5045
DPRINTF(("queue_time_by_slots(%s) returns <at specified time>\n", qname));
5047
DPRINTF(("queue_time_by_slots(%s) returns <later>\n", qname));
5056
/****** sge_select_queue/host_time_by_slots() ******************************
5058
* host_time_by_slots() -- Return time when host slots are available
5061
* int host_time_by_slots(int slots, u_long32 *start, u_long32 duration,
5062
* int *host_soft_violations, lListElem *job, lListElem *ja_task, lListElem
5063
* *hep, lList *centry_list, lList *acl_list)
5066
* The time when the specified slot amount is available at the host
5067
* is determined. Behaviour depends on input/output parameter start
5070
* 0 an assignment is possible now
5071
* 1 no assignment now but later
5072
* -1 assignment never possible for all jobs of the same category
5073
* -2 assignment never possible for that particular job
5076
* 0 an assignment is possible at the specified time
5077
* 1 no assignment at specified time but later
5078
* -1 assignment never possible for all jobs of the same category
5079
* -2 assignment never possible for that particular job
5081
* DISPATCH_TIME_QUEUE_END
5082
* 0 an assignment is possible and the start time is returned
5083
* -1 assignment never possible for all jobs of the same category
5084
* -2 assignment never possible for that particular job
5088
* u_long32 *start - ???
5089
* u_long32 duration - ???
5090
* int *host_soft_violations - ???
5091
* lListElem *job - ???
5092
* lListElem *ja_task - ???
5093
* lListElem *hep - ???
5094
* lList *centry_list - ???
5095
* lList *acl_list - ???
5098
*******************************************************************************/
5100
sequential_host_time(u_long32 *start, const sge_assignment_t *a,
5101
int *violations, lListElem *hep)
5103
lList *hard_requests = lGetList(a->job, JB_hard_resource_list);
5104
lList *load_attr = lGetList(hep, EH_load_list);
5105
lList *config_attr = lGetList(hep, EH_consumable_config_list);
5106
lList *actual_attr = lGetList(hep, EH_resource_utilization);
5107
double lc_factor = 0;
5108
u_long32 ulc_factor;
5110
u_long32 tmp_time = *start;
5111
const char *eh_name = lGetHost(hep, EH_name);
5112
dstring reason; char reason_buf[1024];
5114
DENTER(TOP_LAYER, "sequential_host_time");
5116
sge_dstring_init(&reason, reason_buf, sizeof(reason_buf));
5118
clear_resource_tags(hard_requests, HOST_TAG);
5120
/* cause load be raised artificially to reflect load correction when
5121
checking job requests */
5122
if (lGetPosViaElem(hep, EH_load_correction_factor, SGE_NO_ABORT) >= 0) {
5123
if ((ulc_factor=lGetUlong(hep, EH_load_correction_factor)))
5124
lc_factor = ((double)ulc_factor)/100;
5127
result = rc_time_by_slots(a, hard_requests, load_attr,
5128
config_attr, actual_attr, NULL, false,
5129
&reason, 1, DOMINANT_LAYER_HOST,
5130
lc_factor, HOST_TAG, &tmp_time, eh_name);
5132
if (result == DISPATCH_OK || result == DISPATCH_MISSING_ATTR) {
5133
if (violations != NULL) {
5134
*violations = compute_soft_violations(a, NULL, *violations, load_attr, config_attr,
5135
actual_attr, DOMINANT_LAYER_HOST, 0, HOST_TAG);
5138
char buff[1024 + 1];
5139
centry_list_append_to_string(hard_requests, buff, sizeof(buff) - 1);
5140
if (*buff && (buff[strlen(buff) - 1] == '\n'))
5141
buff[strlen(buff) - 1] = 0;
5142
schedd_mes_add(a->job_id, SCHEDD_INFO_CANNOTRUNATHOST_SSS, buff, eh_name, reason_buf);
5145
if (a->is_reservation && (result == DISPATCH_OK || result == DISPATCH_MISSING_ATTR)) {
5147
DPRINTF(("host_time_by_slots(%s) returns earliest start time "sge_u32"\n", eh_name, *start));
5148
} else if (result == DISPATCH_OK || result == DISPATCH_MISSING_ATTR) {
5149
DPRINTF(("host_time_by_slots(%s) returns <at specified time>\n", eh_name));
5151
DPRINTF(("host_time_by_slots(%s) returns <later>\n", eh_name));
5157
/****** sched/select_queue/sequential_global_time() ***************************
5159
* sequential_global_time() --
5162
* int - 0 ok got an assignment + set time for DISPATCH_TIME_QUEUE_END
5163
* 1 no assignment at the specified time
5164
* -1 assignment will never be possible for all jobs of that category
5165
******************************************************************************/
5167
sequential_global_time(u_long32 *start, const sge_assignment_t *a, int *violations)
5169
dstring reason; char reason_buf[1024];
5170
dispatch_t result = DISPATCH_NEVER_CAT;
5171
u_long32 tmp_time = *start;
5172
lList *hard_request = lGetList(a->job, JB_hard_resource_list);
5173
lList *load_attr = lGetList(a->gep, EH_load_list);
5174
lList *config_attr = lGetList(a->gep, EH_consumable_config_list);
5175
lList *actual_attr = lGetList(a->gep, EH_resource_utilization);
5176
double lc_factor=0.0;
5177
u_long32 ulc_factor;
5179
DENTER(TOP_LAYER, "sequential_global_time");
5181
sge_dstring_init(&reason, reason_buf, sizeof(reason_buf));
5183
clear_resource_tags(hard_request, GLOBAL_TAG);
5185
/* check if job has access to any hosts globally */
5186
if ((result=sge_host_match_static(a->job, NULL, a->gep, a->centry_list,
5187
a->acl_list)) != 0) {
5191
/* cause global load be raised artificially to reflect load correction when
5192
checking job requests */
5193
if (lGetPosViaElem(a->gep, EH_load_correction_factor, SGE_NO_ABORT) >= 0) {
5194
if ((ulc_factor=lGetUlong(a->gep, EH_load_correction_factor)))
5195
lc_factor = ((double)ulc_factor)/100;
5198
result = rc_time_by_slots(a, hard_request, load_attr, config_attr, actual_attr, NULL, false, &reason,
5199
1, DOMINANT_LAYER_GLOBAL, lc_factor, GLOBAL_TAG, &tmp_time, SGE_GLOBAL_NAME);
5201
if ((result == DISPATCH_OK) || (result == DISPATCH_MISSING_ATTR)) {
5202
if (violations != NULL) {
5203
*violations = compute_soft_violations(a, NULL, *violations, load_attr, config_attr,
5204
actual_attr, DOMINANT_LAYER_GLOBAL, 0, GLOBAL_TAG);
5207
char buff[1024 + 1];
5208
centry_list_append_to_string(hard_request, buff, sizeof(buff) - 1);
5209
if (*buff && (buff[strlen(buff) - 1] == '\n')) {
5210
buff[strlen(buff) - 1] = 0;
5212
schedd_mes_add (a->job_id, SCHEDD_INFO_CANNOTRUNGLOBALLY_SS, buff, reason_buf);
5215
if (a->is_reservation && (result == DISPATCH_OK || result == DISPATCH_MISSING_ATTR)) {
5217
DPRINTF(("global_time_by_slots() returns earliest start time "sge_u32"\n", *start));
5219
else if (result == DISPATCH_OK || result == DISPATCH_MISSING_ATTR) {
5220
DPRINTF(("global_time_by_slots() returns <at specified time>\n"));
5223
DPRINTF(("global_time_by_slots() returns <later>\n"));
5229
/****** sched/select_queue/parallel_global_slots() ***************************
5231
* parallel_global_slots() --
5234
* dispatch_t - 0 ok got an assignment + set time for DISPATCH_TIME_QUEUE_END
5235
* 1 no assignment at the specified time
5236
* -1 assignment will never be possible for all jobs of that category
5237
******************************************************************************/
5239
parallel_global_slots(const sge_assignment_t *a, int *slots, int *slots_qend)
5241
dispatch_t result = DISPATCH_NEVER_CAT;
5242
lList *hard_request = lGetList(a->job, JB_hard_resource_list);
5243
lList *load_attr = lGetList(a->gep, EH_load_list);
5244
lList *config_attr = lGetList(a->gep, EH_consumable_config_list);
5245
lList *actual_attr = lGetList(a->gep, EH_resource_utilization);
5246
double lc_factor=0.0;
5247
int gslots = 0, gslots_qend = 0;
5249
DENTER(TOP_LAYER, "parallel_global_slots");
5251
clear_resource_tags(hard_request, GLOBAL_TAG);
5253
/* check if job has access to any hosts globally */
5254
if (sge_host_match_static(a->job, NULL, a->gep, a->centry_list, a->acl_list) == DISPATCH_OK) {
5255
/* cause global load be raised artificially to reflect load correction when
5256
checking job requests */
5258
if (lGetPosViaElem(a->gep, EH_load_correction_factor, SGE_NO_ABORT) >= 0) {
5259
u_long32 ulc_factor;
5260
if ((ulc_factor=lGetUlong(a->gep, EH_load_correction_factor))) {
5261
lc_factor = ((double)ulc_factor)/100;
5265
result = parallel_rc_slots_by_time(a, hard_request, &gslots,
5266
&gslots_qend, config_attr, actual_attr, load_attr,
5267
false, NULL, DOMINANT_LAYER_GLOBAL, lc_factor, GLOBAL_TAG, false, SGE_GLOBAL_NAME, false);
5271
*slots_qend = gslots_qend;
5273
if (result == DISPATCH_OK) {
5274
DPRINTF(("\tparallel_global_slots() returns %d/%d\n", gslots, gslots_qend));
5277
DPRINTF(("\tparallel_global_slots() returns <error>\n"));
5283
/****** sge_select_queue/parallel_available_slots() **********************************
5285
* parallel_available_slots() -- Check if number of PE slots is available
5294
* dispatch_t - 0 ok got an assignment
5295
* 1 no assignment at the specified time
5296
* -1 assignment will never be possible for all jobs of that category
5299
* MT-NOTE: parallel_available_slots() is not MT safe
5300
*******************************************************************************/
5302
parallel_available_slots(const sge_assignment_t *a, int *slots, int *slots_qend)
5305
char reason_buf[1024];
5307
int total = lGetUlong(a->pe, PE_slots);
5310
static lListElem *implicit_slots_request = NULL;
5311
static lList *implicit_total_list = NULL;
5312
lListElem *tep = NULL;
5314
dstring slots_as_str;
5316
DENTER(TOP_LAYER, "parallel_available_slots");
5318
sge_dstring_init(&reason, reason_buf, sizeof(reason_buf));
5320
if ((result=pe_match_static(a->job, a->pe, a->acl_list)) != DISPATCH_OK) {
5324
if (!implicit_slots_request) {
5325
implicit_slots_request = lCreateElem(CE_Type);
5326
lSetString(implicit_slots_request, CE_name, SGE_ATTR_SLOTS);
5327
lSetString(implicit_slots_request, CE_stringval, "1");
5328
lSetDouble(implicit_slots_request, CE_doubleval, 1);
5331
/* PE slots should be stored in a PE_consumable_config_list ... */
5332
if (!implicit_total_list) {
5333
tep = lAddElemStr(&implicit_total_list, CE_name, SGE_ATTR_SLOTS, CE_Type);
5336
if (!tep && !(tep = lGetElemStr(implicit_total_list, CE_name, SGE_ATTR_SLOTS))) {
5337
DRETURN(DISPATCH_NEVER_CAT);
5340
total = lGetUlong(a->pe, PE_slots);
5341
lSetDouble(tep, CE_doubleval, total);
5342
sge_dstring_init(&slots_as_str, strbuf, sizeof(strbuf));
5343
sge_dstring_sprintf(&slots_as_str, "%d", total);
5344
lSetString(tep, CE_stringval, strbuf);
5346
if (ri_slots_by_time(a, &pslots, &pslots_qend,
5347
lGetList(a->pe, PE_resource_utilization), implicit_slots_request,
5348
NULL, implicit_total_list, NULL, 0, 0, &reason, true, true, a->pe_name)) {
5349
DRETURN(DISPATCH_NEVER_CAT);
5357
*slots_qend = pslots_qend;
5360
DPRINTF(("\tparallel_available_slots(%s) returns %d/%d\n", a->pe_name,
5361
pslots, pslots_qend));
5363
DRETURN(DISPATCH_OK);
5367
/* ----------------------------------------
5369
sge_get_double_qattr()
5371
writes actual value of the queue attriute into *uvalp
5374
0 ok, value in *uvalp is valid
5375
-1 the queue has no such attribute
5376
-2 type error: cant compute uval from actual string value
5380
sge_get_double_qattr(double *dvalp, char *attrname, lListElem *q,
5381
const lList *exechost_list, const lList *centry_list,
5382
bool *has_value_from_object)
5389
lListElem *global = NULL;
5390
lListElem *host = NULL;
5392
DENTER(TOP_LAYER, "sge_get_double_qattr");
5394
global = host_list_locate(exechost_list, SGE_GLOBAL_NAME);
5395
host = host_list_locate(exechost_list, lGetHost(q, QU_qhostname));
5398
*has_value_from_object = false;
5399
if (( ep = get_attribute_by_name(global, host, q, attrname, centry_list, DISPATCH_TIME_NOW, 0)) &&
5400
((type=lGetUlong(ep, CE_valtype)) != TYPE_STR) &&
5401
(type != TYPE_CSTR) && (type != TYPE_RESTR) && (type != TYPE_HOST) ) {
5403
if ((lGetUlong(ep, CE_pj_dominant)&DOMINANT_TYPE_MASK)!=DOMINANT_TYPE_VALUE ) {
5404
parse_ulong_val(&tmp_dval, NULL, type, lGetString(ep, CE_pj_stringval), NULL, 0);
5405
monitor_dominance(dom_str, lGetUlong(ep, CE_pj_dominant));
5406
*has_value_from_object = true;
5408
parse_ulong_val(&tmp_dval, NULL, type, lGetString(ep, CE_stringval), NULL, 0);
5409
monitor_dominance(dom_str, lGetUlong(ep, CE_dominant));
5410
*has_value_from_object = ((lGetUlong(ep, CE_dominant) & DOMINANT_TYPE_MASK) == DOMINANT_TYPE_VALUE) ? false : true;
5415
DPRINTF(("resource %s: %f\n", dom_str, tmp_dval));
5425
/* ----------------------------------------
5427
sge_get_string_qattr()
5429
writes string value into dst
5432
-1 if the queue has no such attribute
5435
int sge_get_string_qattr(
5440
const lList *exechost_list,
5441
const lList *centry_list
5444
lListElem *global = NULL;
5445
lListElem *host = NULL;
5448
DENTER(TOP_LAYER, "sge_get_string_qattr");
5450
global = host_list_locate(exechost_list, SGE_GLOBAL_NAME);
5451
host = host_list_locate(exechost_list, lGetHost(q, QU_qhostname));
5453
ep = get_attribute_by_name(global, host, q, attrname, centry_list, DISPATCH_TIME_NOW, 0);
5455
/* first copy ... */
5457
sge_strlcpy(dst, lGetString(ep, CE_stringval), dst_len);
5465
/* ... and then free */
5470
/****** sge_select_queue/ri_time_by_slots() ******************************************
5472
* ri_time_by_slots() -- Determine availability time through slot number
5475
* int ri_time_by_slots(lListElem *rep, lList *load_attr, lList
5476
* *config_attr, lList *actual_attr, lList *centry_list, lListElem *queue,
5477
* char *reason, int reason_size, bool allow_non_requestable, int slots,
5478
* u_long32 layer, double lc_factor)
5481
* Checks for one level, if one request is fulfilled or not.
5483
* With reservation scheduling the earliest start time due to
5484
* availability of the resource instance is determined by ensuring
5485
* non-consumable resource requests are fulfilled or by finding the
5486
* earliest time utilization of a consumable resource is below the
5487
* threshold required for the request.
5490
* sge_assignment_t *a - assignment object that holds job specific scheduling relevant data
5491
* lListElem *rep - requested attribute
5492
* lList *load_attr - list of load attributes or null on queue level
5493
* lList *config_attr - list of user defined attributes (CE_Type)
5494
* lList *actual_attr - usage of user consumables (RUE_Type)
5495
*i lListElem *queue - the current queue, or null on host level
5496
* dstring *reason - target for error message
5497
* bool allow_non_requestable - allow none requestable attributes?
5498
* int slots - the number of slotes the job is looking for?
5499
* u_long32 layer - the current layer
5500
* double lc_factor - load correction factor
5501
* u_long32 *start_time - in/out argument for start time
5502
* const char *object_name - name of the object used for monitoring purposes
5507
*******************************************************************************/
5509
ri_time_by_slots(const sge_assignment_t *a, lListElem *rep, lList *load_attr, lList *config_attr, lList *actual_attr,
5510
lListElem *queue, dstring *reason, bool allow_non_requestable,
5511
int slots, u_long32 layer, double lc_factor, u_long32 *start_time, const char *object_name)
5513
lListElem *cplx_el=NULL;
5514
const char *attrname;
5515
dispatch_t ret = DISPATCH_OK;
5516
lListElem *actual_el;
5517
u_long32 ready_time;
5518
double util, total, request = 0;
5519
lListElem *capacitiy_el;
5520
bool schedule_based = (a->is_advance_reservation || a->is_schedule_based) ? true : false;
5521
u_long32 now = sconf_get_now();
5524
DENTER(TOP_LAYER, "ri_time_by_slots");
5526
attrname = lGetString(rep, CE_name);
5527
actual_el = lGetElemStr(actual_attr, RUE_name, attrname);
5528
ready_time = *start_time;
5531
* Consumables are treated futher below in schedule based mode
5532
* thus we always assume zero consumable utilization here
5535
if (!(cplx_el = get_attribute(attrname, config_attr, actual_attr, load_attr, a->centry_list, queue, layer,
5536
lc_factor, reason, schedule_based, DISPATCH_TIME_NOW, 0))) {
5537
DRETURN(DISPATCH_MISSING_ATTR);
5540
ret = match_static_resource(slots, rep, cplx_el, reason, false, false, allow_non_requestable);
5541
if (actual_el != NULL) {
5542
utilized = lGetNumberOfElem(lGetList(actual_el, RUE_utilized));
5545
if (ret != DISPATCH_OK || (!schedule_based && utilized == 0)) {
5546
lFreeElem(&cplx_el);
5550
DPRINTF(("ri_time_by_slots(%s) consumable = %s\n",
5551
attrname, (lGetBool(cplx_el, CE_consumable)?"true":"false")));
5553
if (!lGetBool(cplx_el, CE_consumable)) {
5554
if (ready_time == DISPATCH_TIME_QUEUE_END) {
5557
DPRINTF(("%s: ri_time_by_slots(%s) <is no consumable>\n", object_name, attrname));
5558
lFreeElem(&cplx_el);
5559
DRETURN(DISPATCH_OK); /* already checked */
5562
/* we're done if there is no consumable capacity */
5563
if (!(capacitiy_el = lGetElemStr(config_attr, CE_name, attrname))) {
5564
DPRINTF(("%s: ri_time_by_slots(%s) <does not exist>\n", object_name, attrname));
5565
lFreeElem(&cplx_el);
5566
DRETURN(DISPATCH_MISSING_ATTR); /* does not exist */
5569
/* determine 'total' and 'request' values */
5570
total = lGetDouble(capacitiy_el, CE_doubleval);
5572
if (!parse_ulong_val(&request, NULL, lGetUlong(cplx_el, CE_valtype),
5573
lGetString(rep, CE_stringval), NULL, 0)) {
5574
sge_dstring_append(reason, "wrong type");
5575
lFreeElem(&cplx_el);
5576
DRETURN(DISPATCH_NEVER_CAT);
5578
lFreeElem(&cplx_el);
5580
if (ready_time == DISPATCH_TIME_QUEUE_END) {
5581
double threshold = total - request * slots;
5583
/* verify there are enough resources in principle */
5584
if (threshold < 0) {
5585
ret = DISPATCH_NEVER_CAT;
5587
/* seek for the time near queue end where resources are sufficient */
5588
u_long32 when = utilization_below(actual_el, threshold, object_name);
5590
/* may happen only if scheduler code is run outside scheduler with
5591
DISPATCH_TIME_QUEUE_END time spec */
5599
DPRINTF(("\t\t%s: time_by_slots: %d of %s=%f can be served %s\n",
5600
object_name, slots, attrname, request,
5601
ret == DISPATCH_OK ? "at time" : "never"));
5607
/* here we handle DISPATCH_TIME_NOW + any other time */
5608
if (*start_time == DISPATCH_TIME_NOW) {
5612
ready_time = *start_time;
5615
util = utilization_max(actual_el, ready_time, a->duration);
5617
DPRINTF(("\t\t%s: time_by_slots: %s total = %f util = %f from "sge_U32CFormat" plus "sge_U32CFormat" seconds\n",
5618
object_name, attrname, total, util, ready_time, a->duration));
5620
/* ensure resource is sufficient from now until finish */
5621
if (request * slots > total - util) {
5623
dstring availability; char availability_text[2048];
5625
sge_dstring_init(&availability, availability_text, sizeof(availability_text));
5627
/* we can't assign right now - maybe later ? */
5628
if (request * slots > total ) {
5629
DPRINTF(("\t\t%s: time_by_slots: %s %f > %f (never)\n", object_name, attrname,
5630
request * slots, total));
5631
ret = DISPATCH_NEVER_CAT; /* surely not */
5633
else if (request * slots > total - utilization_queue_end(actual_el)) {
5634
DPRINTF(("\t\t%s: time_by_slots: %s %f <= %f (but booked out!!)\n", object_name, attrname,
5635
request * slots, total));
5636
ret = DISPATCH_NEVER_CAT; /* booked out until infinity */
5639
DPRINTF(("\t\t%s: time_by_slots: %s %f > %f (later)\n", object_name, attrname,
5640
request * slots, total - util));
5641
ret = DISPATCH_NOT_AT_TIME;
5644
monitor_dominance(dom_str, DOMINANT_TYPE_CONSUMABLE | layer);
5645
sge_dstring_sprintf(&availability, "%s:%s=%f", dom_str, attrname, total - util);
5646
sge_dstring_append(reason, MSG_SCHEDD_ITOFFERSONLY);
5647
sge_dstring_append(reason, availability_text);
5649
if ((a->duration != DISPATCH_TIME_NOW) &&
5650
(request * slots <= total - utilization_max(actual_el, ready_time, DISPATCH_TIME_NOW))) {
5651
sge_dstring_append(reason, MSG_SCHEDD_DUETORR);
5658
DPRINTF(("\t\t%s: time_by_slots: %d of %s=%f can be served %s\n",
5659
object_name, slots, attrname, request,
5660
ret == DISPATCH_OK ? "at time" : ((ret == DISPATCH_NOT_AT_TIME)? "later":"never")));
5665
/****** sge_select_queue/ri_slots_by_time() ************************************
5667
* ri_slots_by_time() -- Determine number of slots avail. within time frame
5670
* static dispatch_t ri_slots_by_time(const sge_assignment_t *a, int *slots,
5671
* int *slots_qend, lList *rue_list, lListElem *request, lList *load_attr,
5672
* lList *total_list, lListElem *queue, u_long32 layer, double lc_factor,
5673
* dstring *reason, bool allow_non_requestable, bool no_centry, const char
5677
* The number of slots available with a resource can be zero for static
5678
* resources or is determined based on maximum utilization within the
5679
* specific time frame, the total amount of the resource and the per
5680
* task request of the parallel job (ri_slots_by_time())
5683
* const sge_assignment_t *a - ???
5684
* int *slots - Returns maximum slots that can be served
5685
* within the specified time frame.
5686
* int *slots_qend - Returns the maximum possible number of slots
5687
* lList *rue_list - Resource utilization (RUE_Type)
5688
* lListElem *request - Job request (CE_Type)
5689
* lList *load_attr - Load information for the resource
5690
* lList *total_list - Total resource amount (CE_Type)
5691
* lListElem *queue - Queue instance (QU_Type) for queue-based resources
5692
* u_long32 layer - DOMINANT_LAYER_{GLOBAL|HOST|QUEUE}
5693
* double lc_factor - load correction factor
5694
* dstring *reason - diagnosis information if no rsrc available
5695
* bool allow_non_requestable - ???
5696
* bool no_centry - ???
5697
* const char *object_name - ???
5700
* static dispatch_t -
5703
* MT-NOTE: ri_slots_by_time() is not MT safe
5704
*******************************************************************************/
5706
ri_slots_by_time(const sge_assignment_t *a, int *slots, int *slots_qend,
5707
lList *rue_list, lListElem *request, lList *load_attr, lList *total_list,
5708
lListElem *queue, u_long32 layer, double lc_factor, dstring *reason,
5709
bool allow_non_requestable, bool no_centry, const char *object_name)
5712
lListElem *cplx_el, *uep, *tep = NULL;
5713
u_long32 start = a->start;
5714
bool schedule_based = (a->is_advance_reservation || a->is_schedule_based) ? true : false;
5718
double used, total, request_val;
5720
DENTER(TOP_LAYER, "ri_slots_by_time");
5722
/* always assume zero consumable utilization in schedule based mode */
5724
name = lGetString(request, CE_name);
5725
uep = lGetElemStr(rue_list, RUE_name, name);
5727
DPRINTF(("\t\t%s: ri_slots_by_time(%s)\n", object_name, name));
5730
if (!(cplx_el = get_attribute(name, total_list, rue_list, load_attr, a->centry_list, queue, layer,
5731
lc_factor, reason, schedule_based, DISPATCH_TIME_NOW, 0))) {
5732
DRETURN(DISPATCH_MISSING_ATTR); /* does not exist */
5735
ret = match_static_resource(1, request, cplx_el, reason, false, false, allow_non_requestable);
5736
if (ret != DISPATCH_OK) {
5737
lFreeElem(&cplx_el);
5741
if (ret == DISPATCH_OK && !lGetBool(cplx_el, CE_consumable)) {
5742
lFreeElem(&cplx_el);
5744
*slots_qend = INT_MAX;
5745
DRETURN(DISPATCH_OK); /* no limitations */
5748
/* we're done if there is no consumable capacity */
5749
if (!(tep=lGetElemStr(total_list, CE_name, name))) {
5750
lFreeElem(&cplx_el);
5752
*slots_qend = INT_MAX;
5753
DRETURN(DISPATCH_OK);
5756
lFreeElem(&cplx_el);
5759
if (!tep && !(tep=lGetElemStr(total_list, CE_name, name))) {
5760
DRETURN(DISPATCH_NEVER_CAT);
5762
total = lGetDouble(tep, CE_doubleval);
5765
utilized = lGetNumberOfElem(lGetList(uep, RUE_utilized));
5768
if (!a->is_advance_reservation && sconf_get_qs_state() == QS_STATE_EMPTY) {
5769
DPRINTF(("QS_STATE is empty, skipping extensive checks!\n"));
5771
} else if ((schedule_based || utilized != 0)) {
5772
if (!a->is_reservation) {
5773
start = sconf_get_now();
5775
used = utilization_max(uep, start, a->duration);
5776
DPRINTF(("\t\t%s: ri_slots_by_time: utilization_max("sge_u32", "sge_u32") returns %f\n",
5777
object_name, start, a->duration, used));
5779
used = lGetDouble(lGetElemStr(rue_list, RUE_name, name), RUE_utilized_now);
5782
request_val = lGetDouble(request, CE_doubleval);
5784
if ((request_val != 0) && (total < DBL_MAX)) {
5785
*slots = (int)((total - used) / request_val);
5787
*slots_qend = (int)((total - utilization_queue_end(uep)) / request_val);
5789
*slots_qend = (int)(total / request_val);
5792
*slots_qend = INT_MAX;
5795
if (*slots == 0 || *slots_qend == 0) {
5797
dstring availability; char availability_text[1024];
5799
sge_dstring_init(&availability, availability_text, sizeof(availability_text));
5801
monitor_dominance(dom_str, DOMINANT_TYPE_CONSUMABLE | layer);
5802
sge_dstring_sprintf(&availability, "%s:%s=%f", dom_str, lGetString(request, CE_name), (total - used));
5803
sge_dstring_append(reason, MSG_SCHEDD_ITOFFERSONLY);
5804
sge_dstring_append(reason, availability_text);
5806
if ((a->duration != DISPATCH_TIME_NOW) &&
5807
(*slots != 0 && *slots_qend == 0)) {
5808
sge_dstring_append(reason, MSG_SCHEDD_DUETORR);
5812
DPRINTF(("\t\t%s: ri_slots_by_time: %s=%f has %d (%d) slots at time "sge_U32CFormat"%s (avail: %f total: %f)\n",
5813
object_name, lGetString(uep, RUE_name), request_val, *slots, *slots_qend, start,
5814
!a->is_reservation?" (= now)":"", total - used, total));
5816
DRETURN(DISPATCH_OK);
5820
/* Determine maximum number of host_slots as limited
5821
by job request to this host
5823
for each resource at this host requested by the job {
5824
avail(R) = (total - used) / request
5826
host_slot_max_by_R = MIN(all avail(R))
5828
host_slot = MIN(host_slot_max_by_T, host_slot_max_by_R)
5832
parallel_rc_slots_by_time(const sge_assignment_t *a, lList *requests, int *slots, int *slots_qend,
5833
lList *total_list, lList *rue_list, lList *load_attr, bool force_slots,
5834
lListElem *queue, u_long32 layer, double lc_factor, u_long32 tag,
5835
bool allow_non_requestable, const char *object_name, bool isRQ)
5838
char reason_buf[1024];
5841
int max_slots = INT_MAX, max_slots_qend = INT_MAX;
5843
static lListElem *implicit_slots_request = NULL;
5844
lListElem *tep, *cep, *actual, *req;
5847
DENTER(TOP_LAYER, "parallel_rc_slots_by_time");
5849
sge_dstring_init(&reason, reason_buf, sizeof(reason_buf));
5851
clear_resource_tags(requests, QUEUE_TAG);
5853
if (!implicit_slots_request) {
5854
implicit_slots_request = lCreateElem(CE_Type);
5855
lSetString(implicit_slots_request, CE_name, SGE_ATTR_SLOTS);
5856
lSetString(implicit_slots_request, CE_stringval, "1");
5857
lSetDouble(implicit_slots_request, CE_doubleval, 1);
5860
/* --- implicit slot request */
5861
name = SGE_ATTR_SLOTS;
5862
if (!(tep = lGetElemStr(total_list, CE_name, name)) && force_slots) {
5863
DRETURN(DISPATCH_OK); /* ??? Is this correct? Should be DISPATCH_NEVER_CAT */
5866
if (ri_slots_by_time(a, &avail, &avail_qend,
5867
rue_list, implicit_slots_request, load_attr, total_list, queue, layer, lc_factor,
5868
&reason, allow_non_requestable, false, object_name)) {
5869
/* If the request is made from the resource quota module, this error message
5870
* is not informative and should not be displayed */
5872
schedd_mes_add(a->job_id, SCHEDD_INFO_CANNOTRUNINQUEUE_SSS, "slots=1", object_name, reason_buf);
5874
DRETURN(DISPATCH_NEVER_CAT);
5876
max_slots = MIN(max_slots, avail);
5877
max_slots_qend = MIN(max_slots_qend, avail_qend);
5878
DPRINTF(("%s: parallel_rc_slots_by_time(%s) %d (%d later)\n", object_name, name,
5879
(int)max_slots, (int)max_slots_qend));
5883
/* --- default request */
5884
for_each (actual, rue_list) {
5885
name = lGetString(actual, RUE_name);
5886
if (!strcmp(name, SGE_ATTR_SLOTS)) {
5889
cep = centry_list_locate(a->centry_list, name);
5891
if (!is_requested(requests, name)) {
5893
const char *def_req = lGetString(cep, CE_default);
5895
parse_ulong_val(&request, NULL, lGetUlong(cep, CE_valtype), def_req, NULL, 0);
5899
lSetString(cep, CE_stringval, def_req);
5900
lSetDouble(cep, CE_doubleval, request);
5902
if (ri_slots_by_time(a, &avail, &avail_qend,
5903
rue_list, cep, load_attr, total_list, queue, layer, lc_factor,
5904
&reason, allow_non_requestable, false, object_name)==-1) {
5905
char buff[1024 + 1];
5906
centry_list_append_to_string(requests, buff, sizeof(buff) - 1);
5907
if (*buff && (buff[strlen(buff) - 1] == '\n')) {
5908
buff[strlen(buff) - 1] = 0;
5910
schedd_mes_add(a->job_id, SCHEDD_INFO_CANNOTRUNINQUEUE_SSS, buff, object_name, reason_buf);
5911
DRETURN(DISPATCH_NEVER_CAT);
5913
max_slots = MIN(max_slots, avail);
5914
max_slots_qend = MIN(max_slots_qend, avail_qend);
5915
DPRINTF(("%s: parallel_rc_slots_by_time(%s) %d (%d later)\n", object_name, name,
5916
(int)max_slots, (int)max_slots_qend));
5922
/* --- explicit requests */
5923
for_each (req, requests) {
5924
name = lGetString(req, CE_name);
5925
result = ri_slots_by_time(a, &avail, &avail_qend,
5926
rue_list, req, load_attr, total_list, queue, layer, lc_factor,
5927
&reason, allow_non_requestable, false, object_name);
5929
if (result == DISPATCH_NEVER_CAT || result == DISPATCH_NEVER_JOB) {
5930
char buff[1024 + 1];
5931
centry_list_append_to_string(requests, buff, sizeof(buff) - 1);
5932
if (*buff && (buff[strlen(buff) - 1] == '\n')) {
5933
buff[strlen(buff) - 1] = 0;
5935
schedd_mes_add(a->job_id, SCHEDD_INFO_CANNOTRUNINQUEUE_SSS, buff, object_name, reason_buf);
5939
case DISPATCH_OK: /* the requested element does not exist */
5940
case DISPATCH_NOT_AT_TIME: /* will match later-on */
5942
DPRINTF(("%s: explicit request for %s gets us %d slots (%d later)\n",
5943
object_name, name, avail, avail_qend));
5944
if (lGetUlong(req, CE_tagged) < tag && tag != RQS_TAG)
5945
lSetUlong(req, CE_tagged, tag);
5947
max_slots = MIN(max_slots, avail);
5948
max_slots_qend = MIN(max_slots_qend, avail_qend);
5949
DPRINTF(("%s: parallel_rc_slots_by_time(%s) %d (%d later)\n", object_name, name,
5950
(int)max_slots, (int)max_slots_qend));
5953
case DISPATCH_NEVER_CAT: /* the requested element does not exist */
5955
DPRINTF(("%s: parallel_rc_slots_by_time(%s) <never>\n", object_name, name));
5956
*slots = *slots_qend = 0;
5957
DRETURN(DISPATCH_NEVER_CAT);
5959
case DISPATCH_NEVER_JOB: /* the requested element does not exist */
5961
DPRINTF(("%s: parallel_rc_slots_by_time(%s) <never>\n", object_name, name));
5962
*slots = *slots_qend = 0;
5963
DRETURN(DISPATCH_NEVER_JOB);
5966
case DISPATCH_MISSING_ATTR: /* the requested element does not exist */
5967
if (tag == QUEUE_TAG && lGetUlong(req, CE_tagged) == NO_TAG) {
5968
DPRINTF(("%s: parallel_rc_slots_by_time(%s) <never found>\n", object_name, name));
5969
*slots = *slots_qend = 0;
5970
DRETURN(DISPATCH_NEVER_CAT);
5972
DPRINTF(("%s: parallel_rc_slots_by_time(%s) no such resource, but already satisfied\n",
5973
object_name, name));
5975
case DISPATCH_NEVER:
5977
DPRINTF(("unexpected return code\n"));
5982
*slots_qend = max_slots_qend;
5984
DRETURN(DISPATCH_OK);
5987
/****** sge_select_queue/sge_create_load_list() ********************************
5989
* sge_create_load_list() -- create the controll structure for consumables as
5993
* void sge_create_load_list(const lList *queue_list, const lList
5994
* *host_list, const lList *centry_list, lList **load_list)
5997
* scanes all queues for consumables as load thresholds. It builds a
5998
* consumable category for each queue which is using consumables as a load
6000
* If no consumables are used, the *load_list is set to NULL.
6003
* const lList *queue_list - a list of queue instances
6004
* const lList *host_list - a list of hosts
6005
* const lList *centry_list - a list of complex entries
6006
* lList **load_list - a ref to the target load list
6009
* MT-NOTE: sge_create_load_list() is MT safe
6012
* sge_create_load_list
6014
* sge_load_list_alarm
6015
* sge_remove_queue_from_load_list
6016
* sge_free_load_list
6018
*******************************************************************************/
6019
void sge_create_load_list(const lList *queue_list, const lList *host_list,
6020
const lList *centry_list, lList **load_list) {
6022
lListElem *load_threshold;
6024
lList * load_threshold_list;
6025
const char *load_threshold_name;
6026
const char *limit_value;
6030
DENTER(TOP_LAYER, "sge_create_load_list");
6032
if (load_list == NULL){
6033
CRITICAL((SGE_EVENT, "no load_list specified\n"));
6038
if (*load_list != NULL){
6039
sge_free_load_list(load_list);
6042
if ((global = host_list_locate(host_list, SGE_GLOBAL_NAME)) == NULL) {
6043
ERROR((SGE_EVENT, "no global host in sge_create_load_list"));
6047
for_each(queue, queue_list) {
6048
load_threshold_list = lGetList(queue, QU_load_thresholds);
6049
for_each(load_threshold, load_threshold_list) {
6050
load_threshold_name = lGetString(load_threshold, CE_name);
6051
limit_value = lGetString(load_threshold, CE_stringval);
6052
if ((centry = centry_list_locate(centry_list, load_threshold_name)) == NULL) {
6053
ERROR((SGE_EVENT, MSG_SCHEDD_WHYEXCEEDNOCOMPLEX_S, load_threshold_name));
6057
if (lGetBool(centry, CE_consumable)) {
6058
lListElem *global_consumable = NULL;
6059
lListElem *host_consumable = NULL;
6060
lListElem *queue_consumable = NULL;
6062
lListElem *load_elem = NULL;
6063
lListElem *queue_ref_elem = NULL;
6064
lList *queue_ref_list = NULL;
6067
if ((host = host_list_locate(host_list, lGetHost(queue, QU_qhostname))) == NULL){
6068
ERROR((SGE_EVENT, MSG_SGETEXT_INVALIDHOSTINQUEUE_SS,
6069
lGetHost(queue, QU_qhostname), lGetString(queue, QU_full_name)));
6073
global_consumable = lGetSubStr(global, RUE_name, load_threshold_name,
6074
EH_resource_utilization);
6075
host_consumable = lGetSubStr(host, RUE_name, load_threshold_name,
6076
EH_resource_utilization);
6077
queue_consumable = lGetSubStr(queue, RUE_name, load_threshold_name,
6078
QU_resource_utilization);
6080
if (*load_list == NULL) {
6081
*load_list = lCreateList("load_ref_list", LDR_Type);
6082
if (*load_list == NULL) {
6087
load_elem = load_locate_elem(*load_list, global_consumable,
6088
host_consumable, queue_consumable,
6091
if (load_elem == NULL) {
6092
load_elem = lCreateElem(LDR_Type);
6093
if (load_elem == NULL) {
6096
lSetPosRef(load_elem, LDR_global_pos, global_consumable);
6097
lSetPosRef(load_elem, LDR_host_pos, host_consumable);
6098
lSetPosRef(load_elem, LDR_queue_pos, queue_consumable);
6099
lSetPosString(load_elem, LDR_limit_pos, limit_value);
6100
lAppendElem(*load_list, load_elem);
6103
queue_ref_list = lGetPosList(load_elem, LDR_queue_ref_list_pos);
6104
if (queue_ref_list == NULL) {
6105
queue_ref_list = lCreateList("", QRL_Type);
6106
if (queue_ref_list == NULL) {
6109
lSetPosList(load_elem, LDR_queue_ref_list_pos, queue_ref_list);
6112
queue_ref_elem = lCreateElem(QRL_Type);
6113
if (queue_ref_elem == NULL) {
6116
lSetRef(queue_ref_elem, QRL_queue, queue);
6117
lAppendElem(queue_ref_list, queue_ref_elem);
6119
/* reset the changed bit in the consumables */
6120
if (global_consumable != NULL){
6121
sge_bitfield_reset(&(global_consumable->changed));
6123
if (host_consumable != NULL){
6124
sge_bitfield_reset(&(host_consumable->changed));
6126
if (queue_consumable != NULL){
6127
sge_bitfield_reset(&(queue_consumable->changed));
6137
DPRINTF(("error in sge_create_load_list!"));
6138
ERROR((SGE_EVENT, MSG_SGETEXT_CONSUMABLE_AS_LOAD));
6139
sge_free_load_list(load_list);
6144
/****** sge_select_queue/load_locate_elem() ************************************
6146
* load_locate_elem() -- locates a consumable category in the given load list
6149
* static lListElem* load_locate_elem(lList *load_list, lListElem
6150
* *global_consumable, lListElem *host_consumable, lListElem
6151
* *queue_consumable)
6154
* lList *load_list - the load list to work on
6155
* lListElem *global_consumable - a ref to the global consumable
6156
* lListElem *host_consumable - a ref to the host consumable
6157
* lListElem *queue_consumable - a ref to the qeue consumable
6160
* static lListElem* - NULL, or the category element from the load list
6163
* MT-NOTE: load_locate_elem() is MT safe
6166
* sge_create_load_list
6168
* sge_load_list_alarm
6169
* sge_remove_queue_from_load_list
6170
* sge_free_load_list
6172
*******************************************************************************/
6173
static lListElem *load_locate_elem(lList *load_list, lListElem *global_consumable,
6174
lListElem *host_consumable, lListElem *queue_consumable,
6175
const char *limit) {
6176
lListElem *load_elem = NULL;
6177
lListElem *load = NULL;
6179
for_each(load, load_list) {
6180
if ((lGetPosRef(load, LDR_global_pos) == global_consumable) &&
6181
(lGetPosRef(load, LDR_host_pos) == host_consumable) &&
6182
(lGetPosRef(load, LDR_queue_pos) == queue_consumable) &&
6183
( strcmp(lGetPosString(load, LDR_limit_pos), limit) == 0)) {
6192
/****** sge_select_queue/sge_load_list_alarm() *********************************
6194
* sge_load_list_alarm() -- checks if queues went into an alarm state
6197
* bool sge_load_list_alarm(lList *load_list, const lList *host_list, const
6198
* lList *centry_list)
6201
* The function uses the cull bitfield to identify modifications in one of
6202
* the consumable elements. If the consumption has changed, the load for all
6203
* queue referencing the consumable is recomputed. If a queue exceeds it
6204
* load threshold, QU_tagged4schedule is set to 1.
6207
* lList *load_list - ???
6208
* const lList *host_list - ???
6209
* const lList *centry_list - ???
6212
* bool - true, if at least one queue was set into alarm state
6215
* MT-NOTE: sge_load_list_alarm() is MT safe
6218
* sge_create_load_list
6220
* sge_load_list_alarm
6221
* sge_remove_queue_from_load_list
6222
* sge_free_load_list
6224
*******************************************************************************/
6225
bool sge_load_list_alarm(lList *load_list, const lList *host_list,
6226
const lList *centry_list) {
6229
lListElem *queue_ref;
6230
lList *queue_ref_list;
6232
bool is_alarm = false;
6234
DENTER(TOP_LAYER, "sge_load_list_alarm");
6236
if (load_list == NULL) {
6240
for_each(load, load_list) {
6241
bool is_recalc=false;
6244
elem = lGetPosRef(load, LDR_global_pos);
6246
if ( sge_bitfield_changed(&(elem->changed))) {
6248
sge_bitfield_reset(&(elem->changed));
6252
elem = lGetPosRef(load, LDR_host_pos);
6254
if ( sge_bitfield_changed(&(elem->changed))) {
6256
sge_bitfield_reset(&(elem->changed));
6260
elem = lGetPosRef(load, LDR_queue_pos);
6262
if ( sge_bitfield_changed(&(elem->changed))) {
6264
sge_bitfield_reset(&(elem->changed));
6269
bool is_category_alarm = false;
6270
queue_ref_list = lGetPosList(load, LDR_queue_ref_list_pos);
6271
for_each(queue_ref, queue_ref_list) {
6272
queue = lGetRef(queue_ref, QRL_queue);
6273
if (is_category_alarm) {
6274
lSetUlong(queue, QU_tagged4schedule, 1);
6276
else if (sge_load_alarm(reason, queue, lGetList(queue, QU_load_thresholds), host_list,
6277
centry_list, NULL, true)) {
6279
DPRINTF(("queue %s tagged to be overloaded: %s\n",
6280
lGetString(queue, QU_full_name), reason));
6281
schedd_mes_add_global(SCHEDD_INFO_QUEUEOVERLOADED_SS,
6282
lGetString(queue, QU_full_name), reason);
6283
lSetUlong(queue, QU_tagged4schedule, 1);
6285
is_category_alarm = true;
6297
/****** sge_select_queue/sge_remove_queue_from_load_list() *********************
6299
* sge_remove_queue_from_load_list() -- removes queues from the load list
6302
* void sge_remove_queue_from_load_list(lList **load_list, const lList
6306
* lList **load_list - load list structure
6307
* const lList *queue_list - queues to be removed from it.
6310
* MT-NOTE: sge_remove_queue_from_load_list() is MT safe
6313
* sge_create_load_list
6315
* sge_load_list_alarm
6316
* sge_remove_queue_from_load_list
6317
* sge_free_load_list
6319
*******************************************************************************/
6320
void sge_remove_queue_from_load_list(lList **load_list, const lList *queue_list){
6321
lListElem* queue = NULL;
6322
lListElem *load = NULL;
6324
DENTER(TOP_LAYER, "sge_remove_queue_from_load_list");
6326
if (load_list == NULL){
6327
CRITICAL((SGE_EVENT, "no load_list specified\n"));
6332
if (*load_list == NULL) {
6336
for_each(queue, queue_list) {
6337
bool is_found = false;
6338
lList *queue_ref_list = NULL;
6339
lListElem *queue_ref = NULL;
6341
for_each(load, *load_list) {
6342
queue_ref_list = lGetPosList(load, LDR_queue_ref_list_pos);
6343
for_each(queue_ref, queue_ref_list) {
6344
if (queue == lGetRef(queue_ref, QRL_queue)) {
6350
lRemoveElem(queue_ref_list, &queue_ref);
6352
if (lGetNumberOfElem(queue_ref_list) == 0) {
6353
lRemoveElem(*load_list, &load);
6359
if (lGetNumberOfElem(*load_list) == 0) {
6360
lFreeList(load_list);
6369
/****** sge_select_queue/sge_free_load_list() **********************************
6371
* sge_free_load_list() -- frees the load list and sets it to NULL
6374
* void sge_free_load_list(lList **load_list)
6377
* lList **load_list - the load list
6380
* MT-NOTE: sge_free_load_list() is MT safe
6383
* sge_create_load_list
6385
* sge_load_list_alarm
6386
* sge_remove_queue_from_load_list
6387
* sge_free_load_list
6389
*******************************************************************************/
6390
void sge_free_load_list(lList **load_list)
6392
DENTER(TOP_LAYER, "sge_free_load_list");
6394
lFreeList(load_list);
6401
/* One cached entry of a dynamically loaded library function; a singly
 * linked list of these is maintained by sge_dlib(). Field set is derived
 * from the accesses inside sge_dlib() (cache->key, ->lib_name, ->fn_name,
 * ->lib_handle, ->fn_handle, ->next). */
typedef struct lib_cache_s {
   char *key;                 /* unique lookup key (e.g. the PE name) */
   char *lib_name;            /* dynamic library name passed to dlopen() */
   char *fn_name;             /* function name passed to dlsym() */
   void *lib_handle;          /* handle returned by dlopen() */
   void *fn_handle;           /* function address returned by dlsym() */
   struct lib_cache_s *next;  /* next entry in the cache list */
} lib_cache_t;
6410
/****** sge_dlib() *************************************************************
6412
* sge_dlib() -- lookup, load, and cache function from a dynamic library
6415
* void *sge_dlib(const char *key, const char *lib_name, const char *fn_name,
6416
* lib_cache_t **lib_cache_list)
6419
* const char *key - unique key for identifying function
6420
* const char *lib_name - dynamic library name
6421
* const char *fn_nam - function name
6422
* lib_cache_t **lib_cache_list - cache list (if NULL, we use a global cache)
6425
* void * - the address of the function
6428
* MT-NOTE: sge_free_load_list() is not MT safe
6432
*******************************************************************************/
6434
sge_dlib(const char *key, const char *lib_name, const char *fn_name,
6435
lib_cache_t **lib_cache_list)
6437
static lib_cache_t *static_lib_cache_list = NULL;
6438
lib_cache_t **cache_list = NULL;
6439
lib_cache_t *cache = NULL;
6440
lib_cache_t *prev = NULL;
6441
lib_cache_t *new_cache = NULL;
6443
void *new_lib_handle = NULL;
6444
void *new_fn_handle = NULL;
6445
const char *error = NULL;
6447
DENTER(TOP_LAYER, "sge_dlib");
6449
/* Use user cache list if supplied */
6451
cache_list = lib_cache_list;
6453
cache_list = &static_lib_cache_list;
6456
* Search based on supplied key. If found and the library name and function
6457
* name match, return the function address. If the library or function
6458
* do not match, then we will reload the library.
6460
for (cache=*cache_list; cache; prev=cache, cache=cache->next) {
6461
if (strcmp(key, cache->key)==0) {
6462
if (strcmp(lib_name, cache->lib_name)==0 &&
6463
strcmp(fn_name, cache->fn_name)==0) {
6464
DRETURN(cache->fn_handle);
6472
/* open the library */
6473
new_lib_handle = dlopen(lib_name, RTLD_LAZY);
6474
if (!new_lib_handle) {
6476
ERROR((SGE_EVENT, "Unable to open library %s for %s - %s\n",
6477
lib_name, key, error));
6481
/* search library for the function name */
6482
new_fn_handle = dlsym(new_lib_handle, fn_name);
6483
if (((error = dlerror()) != NULL) || !new_fn_handle) {
6484
dlclose(new_lib_handle);
6485
ERROR((SGE_EVENT, "Unable to locate function %s in library %s for %s - %s\n",
6486
fn_name, lib_name, key, error));
6490
/* If we're replacing the old function, just delete it */
6492
dlclose(cache->lib_handle);
6494
FREE(cache->lib_name);
6495
FREE(cache->fn_name);
6497
*cache_list = cache->next;
6500
prev->next = cache->next;
6505
/* cache the new function address */
6506
if ((new_cache = (lib_cache_t *)malloc(sizeof(lib_cache_t))) == NULL ||
6507
(new_cache->key = strdup(key)) == NULL ||
6508
(new_cache->lib_name = strdup(lib_name)) == NULL ||
6509
(new_cache->fn_name = strdup(fn_name)) == NULL) {
6510
ERROR((SGE_EVENT, "Memory allocation problem in sge_dl\n"));
6513
new_cache->lib_handle = new_lib_handle;
6514
new_cache->fn_handle = new_fn_handle;
6515
new_cache->next = *cache_list;
6516
*cache_list = new_cache;
6518
/* return the cached function address */
6519
DRETURN(new_cache->fn_handle);
6523
/*
 * strcpy_replace() -- copy string sp into dp, expanding $varname,
 * ${varname}, ${varname:-value} and ${varname:+value} references with the
 * CE_stringval of the matching CE_name element of rlist (see the
 * lGetElemStr()/lGetString() calls below).
 *
 * NOTE(review): this region was damaged during extraction -- stray line
 * numbers are interleaved with the code and several statements (variable
 * declarations, loop framework, closing braces) are missing. The fragment
 * is left byte-identical; restore it from version control before building.
 */
strcpy_replace(char *dp, const char *sp, lList *rlist)
6528
const char *es = NULL;
6530
/* no resource list: presumably degenerates to a plain copy -- body missing */
if (rlist == NULL) {
6536
if (*sp == 0) /* done means we make one last pass */
6538
/* end of a $varname token: look the collected name up in rlist */
if (name && !isalnum(*sp) && *sp != '_') {
6542
if ((res = lGetElemStr(rlist, CE_name, name)))
6543
s = lGetString(res, CE_stringval);
6544
/* handle ${varname} */
6545
if (curly && *sp == '}')
6547
/* handle ${varname:-value} and ${varname:+value} */
6548
else if (curly && *sp == ':') {
6549
if (sp[1] == '-' || sp[1] == '+') {
6551
for (ep=sp+2; *ep; ep++) {
6553
/* ':-' substitutes when the value is empty, ':+' when it is set */
if ((!*s && sp[1] == '-') ||
6554
(*s && sp[1] == '+')) {
6555
s = sp+2; /* mark substitution string */
6556
es = ep; /* mark end of substitution string */
6558
sp = ep+1; /* point past varname */
6565
/* copy the (substituted) value into the destination buffer */
while (*s && s != es)
6571
} else if (*sp == '$') {
6583
/****** sge_select_queue/sge_call_pe_qsort() **********************************
*  NAME
*     sge_call_pe_qsort() -- call the Parallel Environment qsort plug-in
*
*  SYNOPSIS
*     void sge_call_pe_qsort(sge_assignment_t *a, const char *qsort_args)
*
*  INPUTS
*     sge_assignment_t *a - PE assignment
*     qsort_args - the PE qsort_args attribute
*
*  NOTES
*     MT-NOTE: sge_call_pe_qsort() is not MT safe
*******************************************************************************/
6601
/* Call the PE's dynamically loaded queue-sort plug-in for assignment 'a'.
 * NOTE(review): this block was damaged during extraction (stray line numbers
 * interleaved, declarations and loop closings dropped). The body below is a
 * reconstruction from the surviving statements - diff against version
 * control before merging. */
static int
sge_call_pe_qsort(sge_assignment_t *a, const char *qsort_args)
{
   int ret = 0;
   int argc = 0;
   char *tok = NULL;
   struct saved_vars_s *cntx = NULL;
   pqs_qsort_t pqs_qsort;
   const char *pe_name = a->pe_name;
   const char *lib_name;
   const char *fn_name;
   int num_queues = lGetNumberOfElem(a->queue_list);
   char qsort_args_buf[4096];
   const char *qsort_argv[1024];
   char err_str[4096];

   DENTER(TOP_LAYER, "sge_call_pe_qsort");

   err_str[0] = '\0';

   /*
    * Copy qsort_args substituting any references to $<resource>
    * with the corresponding requested value from the hard resource
    * list of the job
    */
   strcpy_replace(qsort_args_buf, qsort_args, lGetList(a->job, JB_hard_resource_list));

   if ((lib_name = sge_strtok_r(qsort_args_buf, " ", &cntx)) &&
       (fn_name = sge_strtok_r(NULL, " ", &cntx)) &&
       (pqs_qsort = sge_dlib(pe_name, lib_name, fn_name, NULL))) {

      pqs_params_t pqs_params;
      pqs_queue_t *qp;
      lListElem *q;

      /*
       * Build queue sort parameters
       */
      pqs_params.job_id = a->job_id;
      pqs_params.task_id = a->ja_task_id;
      pqs_params.slots = a->slots;
      pqs_params.pe_name = pe_name;
      pqs_params.pe_qsort_args = qsort_args_buf;
      pqs_params.pe_qsort_argv = qsort_argv;
      pqs_params.num_queues = num_queues;
      pqs_params.qlist = (pqs_queue_t *)malloc(num_queues * sizeof(pqs_queue_t));
      if (pqs_params.qlist == NULL) {
         /* fall back to the normal queue/host sort on OOM */
         ERROR((SGE_EVENT, "Memory allocation problem in sge_call_pe_qsort\n"));
         sge_free_saved_vars(cntx);
         DRETURN(0);
      }

      qp = pqs_params.qlist;
      for_each(q, a->queue_list) {
         qp->queue_name = lGetString(q, QU_qname);
         qp->host_name = lGetHost(q, QU_qhostname);
         qp->host_seqno = lGetUlong(q, QU_host_seq_no);
         qp->queue_seqno = lGetUlong(q, QU_seq_no);
         qp->available_slots = lGetUlong(q, QU_tag);
         qp->soft_violations = lGetUlong(q, QU_soft_violation);
         qp->new_seqno = 0;
         qp++;
      }

      /*
       * For convenience, convert qsort_args into an argument vector qsort_argv.
       */
      qsort_argv[argc++] = lib_name;
      qsort_argv[argc++] = fn_name;
      while ((tok = sge_strtok_r(NULL, " ", &cntx)) &&
             argc<(sizeof(qsort_argv)/sizeof(char *)-1)) {
         qsort_argv[argc++] = tok;
      }

      /*
       * Call the dynamic queue sort function
       */
      ret = (*pqs_qsort)(&pqs_params, 0, err_str, sizeof(err_str)-1);

      /* err_str comes from the plug-in: never use it as a format string */
      if (err_str[0] != '\0') {
         ERROR((SGE_EVENT, "%s", err_str));
      }

      /*
       * Update the queue list with the new sort order
       */
      if (ret == PQS_ASSIGNED) {
         qp = pqs_params.qlist;
         for_each(q, a->queue_list) {
            lSetUlong(q, QU_host_seq_no, qp->new_seqno);
            qp++;
         }
      }

      FREE(pqs_params.qlist);
   } else {
      ERROR((SGE_EVENT, "Unable to dynamically load PE qsort_args %s\n", qsort_args));
      ret = 0; /* schedule anyway with normal queue/host sort */
   }

   if (cntx != NULL) {
      sge_free_saved_vars(cntx);
   }

   DRETURN(ret);
}
6707
/****** sge_select_queue/match_static_advance_reservation() ********************
*  NAME
*     match_static_advance_reservation() -- Do matching that depends not on queue
*     or host attributes
*
*  SYNOPSIS
*     static dispatch_t match_static_advance_reservation(const sge_assignment_t
*     *a)
*
*  FUNCTION
*     Checks whether a job that requests an advance reservation can be scheduled.
*     The job can be scheduled if the advance reservation is in state "running".
*
*  INPUTS
*     const sge_assignment_t *a - assignment to match
*
*  RESULT
*     static dispatch_t - DISPATCH_OK on success
*                         DISPATCH_NEVER_CAT on error
*
*  NOTES
*     MT-NOTE: match_static_advance_reservation() is MT safe
*******************************************************************************/
6730
static dispatch_t match_static_advance_reservation(const sge_assignment_t *a)
6732
dispatch_t result = DISPATCH_OK;
6734
u_long32 ar_id = lGetUlong(a->job, JB_ar);
6736
DENTER(TOP_LAYER, "match_static_advance_reservation");
6740
if ((ar = lGetElemUlong(a->ar_list, AR_id, ar_id)) != NULL) {
6743
if (!(a->is_job_verify)) {
6744
/* is ar in error and error handling is not soft? */
6745
if (lGetUlong(ar, AR_state) == AR_ERROR && lGetUlong(ar, AR_error_handling) != 0) {
6746
schedd_mes_add(a->job_id, SCHEDD_INFO_ARISINERROR_I, ar_id);
6747
DRETURN(DISPATCH_NEVER_CAT);
6750
/* is ar running? */
6751
if (lGetUlong(ar, AR_state) != AR_RUNNING && lGetUlong(ar, AR_state) != AR_ERROR) {
6752
schedd_mes_add(a->job_id, SCHEDD_INFO_EXECTIME_);
6753
DRETURN(DISPATCH_NEVER_CAT);
6757
/* has user access? */
6758
if ((acl_list = lGetList(ar, AR_xacl_list))) {
6760
for_each(acl_ep, acl_list) {
6761
const char* user = lGetString(acl_ep, ARA_name);
6763
if (!is_hgroup_name(user)) {
6764
if (strcmp(a->user, user) == 0) {
6768
/* skip preattached \@ sign */
6769
const char *acl_name = ++user;
6770
lListElem *userset_list = lGetElemStr(a->acl_list, US_name, acl_name);
6772
if (sge_contained_in_access_list(a->user, a->group, userset_list, NULL) == 1) {
6777
if (acl_ep != NULL){
6778
dstring buffer = DSTRING_INIT;
6779
sge_dstring_sprintf(&buffer, sge_U32CFormat, sge_u32c(ar_id));
6780
schedd_mes_add(a->job_id, SCHEDD_INFO_HASNOPERMISSION_SS, SGE_OBJ_AR, sge_dstring_get_string(&buffer));
6781
sge_dstring_free(&buffer);
6782
DRETURN(DISPATCH_NEVER_CAT);
6786
if ((acl_list = lGetList(ar, AR_acl_list))) {
6788
for_each(acl_ep, acl_list) {
6789
const char *user = lGetString(acl_ep, ARA_name);
6791
if (!is_hgroup_name(user)) {
6792
if (strcmp(a->user, user) == 0) {
6796
/* skip preattached \@ sign */
6797
const char *acl_name = ++user;
6798
lListElem *userset_list = lGetElemStr(a->acl_list, US_name, acl_name);
6800
if (sge_contained_in_access_list(a->user, a->group, userset_list, NULL) == 1) {
6805
if (acl_ep == NULL){
6806
dstring buffer = DSTRING_INIT;
6807
sge_dstring_sprintf(&buffer, sge_U32CFormat, sge_u32c(ar_id));
6808
schedd_mes_add(a->job_id, SCHEDD_INFO_HASNOPERMISSION_SS, SGE_OBJ_AR, sge_dstring_get_string(&buffer));
6809
sge_dstring_free(&buffer);
6810
DRETURN(DISPATCH_NEVER_CAT);
6814
/* should never happen */
6815
DRETURN(DISPATCH_NEVER_CAT);