1
/*___INFO__MARK_BEGIN__*/
2
/*************************************************************************
4
* The Contents of this file are made available subject to the terms of
5
* the Sun Industry Standards Source License Version 1.2
7
* Sun Microsystems Inc., March, 2001
10
* Sun Industry Standards Source License Version 1.2
11
* =================================================
12
* The contents of this file are subject to the Sun Industry Standards
13
* Source License Version 1.2 (the "License"); You may not use this file
14
* except in compliance with the License. You may obtain a copy of the
15
* License at http://gridengine.sunsource.net/Gridengine_SISSL_license.html
17
* Software provided under this License is provided on an "AS IS" basis,
18
* WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING,
19
* WITHOUT LIMITATION, WARRANTIES THAT THE SOFTWARE IS FREE OF DEFECTS,
20
* MERCHANTABLE, FIT FOR A PARTICULAR PURPOSE, OR NON-INFRINGING.
21
* See the License for the specific provisions governing your rights and
22
* obligations concerning the Software.
24
* The Initial Developer of the Original Code is: Sun Microsystems, Inc.
26
* Copyright: 2001 by Sun Microsystems, Inc.
28
* All Rights Reserved.
30
************************************************************************/
31
/*___INFO__MARK_END__*/
36
# include <sys/stream.h>
40
#include "basis_types.h"
41
#include "sge_mt_init.h"
44
/* daemons/qmaster/ */
45
#include "setup_qmaster.h"
46
#include "sge_sched_process_events.h"
47
#include "sge_qmaster_threads.h"
48
#include "sge_follow.h"
51
#include "rmon/sgermon.h"
52
#include "sgeobj/sge_answer.h"
53
#include "sgeobj/sge_conf.h"
54
#include "sgeobj/sge_reportL.h"
55
#include "sgeobj/sge_schedd_conf.h"
56
#include "sgeobj/sge_job.h"
57
#include "sgeobj//sge_jobL.h"
58
#include "sgeobj/sge_cqueueL.h"
59
#include "sgeobj/sge_qinstanceL.h"
60
#include "sgeobj/sge_ja_taskL.h"
61
#include "sgeobj/sge_usersetL.h"
62
#include "sgeobj/sge_qinstance_state.h"
63
#include "sgeobj/sge_userprj.h"
64
#include "sgeobj/sge_sharetree.h"
65
#include "sgeobj/sge_host.h"
66
#include "sgeobj/sge_centry.h"
67
#include "sgeobj/sge_ctL.h"
68
#include "sgeobj/sge_ckpt.h"
69
#include "sgeobj/sge_pe.h"
70
#include "sgeobj/sge_range.h"
71
#include "lck/sge_mtutil.h"
72
#include "mir/sge_mirror.h"
73
#include "evc/sge_event_client.h"
74
#include "evm/sge_event_master.h"
76
#include "uti/sge_unistd.h"
77
#include "uti/sge_prog.h"
78
#include "uti/sge_log.h"
79
#include "uti/sge_profiling.h"
80
#include "uti/sge_time.h"
81
#include "uti/sge_thread_ctrl.h"
83
#include "sge_sched_prepare_data.h"
84
#include "sge_sched_job_category.h"
86
#include "sge_orders.h"
87
#include "sge_job_schedd.h"
89
#include "schedd_message.h"
90
#include "msg_schedd.h"
91
#include "sge_schedd_text.h"
92
#include "schedd_monitor.h"
93
#include "sge_interactive_sched.h"
94
#include "sge_orderL.h"
96
#include "load_correction.h"
97
#include "sge_resource_utilization.h"
98
#include "suspend_thresholds.h"
99
#include "sge_support.h"
100
#include "sort_hosts.h"
102
#include "sge_follow.h"
103
#include "sge_qmaster_threads.h"
104
#include "sge_sched_process_events.h"
105
#include "sge_sched_thread.h"
106
#include "sge_sched_order.h"
109
/* NOTE(review): presumably the seconds / nanoseconds parts of the scheduler
   thread's wait timeout; they are not used within this excerpt - confirm */
#define SCHEDULER_TIMEOUT_S 10
110
#define SCHEDULER_TIMEOUT_N 0
113
/*
 * Global control data of the scheduler thread.
 * The mutex is held around every access to new_global_conf (see
 * st_set_flag_new_global_conf() and st_get_flag_new_global_conf()).
 * NOTE(review): the condition variable is not used within this excerpt
 * (presumably wait_for_events() waits on it - confirm), and the closing
 * brace of this initializer is not part of this excerpt.
 */
scheduler_control_t Scheduler_Control = {
114
PTHREAD_MUTEX_INITIALIZER,
115
PTHREAD_COND_INITIALIZER,
117
true, /* rebuild_categories */
118
false /* new_global_conf */
122
/* forward declarations of file-local helpers */
static void rest_busy(sge_evc_class_t *evc);
124
static void wait_for_events(void);
127
static int dispatch_jobs(sge_evc_class_t *evc, scheduler_all_data_t *lists, order_t *orders,
128
lList **splitted_job_lists[]);
131
/* NOTE(review): the return type of this prototype is not part of this
   excerpt; the definition's result is assigned to a dispatch_t */
select_assign_debit(lList **queue_list, lList **dis_queue_list, lListElem *job, lListElem *ja_task,
132
lList *pe_list, lList *ckpt_list, lList *centry_list, lList *host_list,
133
lList *acl_list, lList **user_list, lList **group_list, order_t *orders,
134
double *total_running_job_tickets, int *sort_hostlist, bool is_start,
135
bool is_reserve, bool is_schedule_based, lList **load_list, lList *hgrp_list, lList *rqs_list,
136
lList *ar_list, sched_prof_t *pi);
138
/*
 * st_set_flag_new_global_conf() -- sets Scheduler_Control.new_global_conf
 *
 * new_value - value to store in the flag
 *
 * The flag is written while Scheduler_Control.mutex is held; the getter
 * st_get_flag_new_global_conf() takes the same mutex.
 * NOTE(review): the flag's meaning is not visible here; presumably it marks
 * that a new global configuration has to be taken over - confirm.
 * NOTE(review): the function braces and its DRETURN are not part of this
 * excerpt.
 */
void st_set_flag_new_global_conf(bool new_value)
140
DENTER(TOP_LAYER, "st_set_flag_new_global_conf");
141
sge_mutex_lock("event_control_mutex", SGE_FUNC, __LINE__, &Scheduler_Control.mutex);
142
Scheduler_Control.new_global_conf = new_value;
143
sge_mutex_unlock("event_control_mutex", SGE_FUNC, __LINE__, &Scheduler_Control.mutex);
147
/*
 * st_get_flag_new_global_conf() -- reads Scheduler_Control.new_global_conf
 *
 * The flag is copied into 'ret' while Scheduler_Control.mutex is held, the
 * same mutex st_set_flag_new_global_conf() uses for writing.
 * NOTE(review): the declaration of 'ret', the function braces and the
 * DRETURN are not part of this excerpt; presumably 'ret' is returned.
 */
bool st_get_flag_new_global_conf(void)
151
DENTER(TOP_LAYER, "st_get_flag_new_global_conf");
152
sge_mutex_lock("event_control_mutex", SGE_FUNC, __LINE__, &Scheduler_Control.mutex);
153
ret = Scheduler_Control.new_global_conf;
154
sge_mutex_unlock("event_control_mutex", SGE_FUNC, __LINE__, &Scheduler_Control.mutex);
158
/*
 * scheduler_method() -- performs one scheduling pass
 *
 * Splits lists->job_list into per-state job lists, records "queue not
 * available" messages, lets dispatch_jobs() start or reserve pending jobs,
 * builds orders for the jobs that were not dispatched and sends the config,
 * job start and pending-ticket orders through the GDI context of 'evc'.
 * Finally the per-state job lists and the scheduler messages are released.
 *
 * evc         - scheduler event client; evc->get_gdi_ctx(evc) is used to
 *               send the orders
 * answer_list - receives the answers of the final ("D:") order sends
 * lists       - all input lists of the scheduler
 * order       - *order is taken over as the initial pending-ticket order list
 *
 * NOTE(review): this excerpt does not show the declarations of 'i', 'qlp',
 * 'job' and 'max', the block braces or the return statement.
 */
int scheduler_method(sge_evc_class_t *evc, lList **answer_list, scheduler_all_data_t *lists, lList **order)
160
order_t orders = ORDER_INIT;
161
/* one slot per SPLIT_* job state; the slots are pointed at the local list
   heads declared below */
lList **splitted_job_lists[SPLIT_LAST]; /* JB_Type */
162
lList *waiting_due_to_pedecessor_list = NULL; /* JB_Type */
163
lList *waiting_due_to_time_list = NULL; /* JB_Type */
164
lList *pending_excluded_list = NULL; /* JB_Type */
165
lList *suspended_list = NULL; /* JB_Type */
166
lList *finished_list = NULL; /* JB_Type */
167
lList *pending_list = NULL; /* JB_Type */
168
lList *pending_excludedlist = NULL; /* JB_Type */
169
lList *running_list = NULL; /* JB_Type */
170
lList *error_list = NULL; /* JB_Type */
171
lList *hold_list = NULL; /* JB_Type */
172
lList *not_started_list = NULL; /* JB_Type */
173
int prof_job_count, global_mes_count, job_mes_count;
177
DENTER(TOP_LAYER, "scheduler_method");
179
/* SGE_PROF_CUSTOM0 measures the whole pass; it is reported further below */
PROF_START_MEASUREMENT(SGE_PROF_CUSTOM0);
181
serf_new_interval(sge_get_gmt());
182
/* the caller's order list becomes the initial pending-ticket order list */
orders.pendingOrderList = *order;
185
prof_job_count = lGetNumberOfElem(lists->job_list);
188
/* collect scheduler messages for this pass; released at the end */
schedd_mes_initialize();
189
schedd_mes_set_logging(1);
191
/* reset all slots, then wire the known states to their local list heads */
for (i = SPLIT_FIRST; i < SPLIT_LAST; i++) {
192
splitted_job_lists[i] = NULL;
194
splitted_job_lists[SPLIT_WAITING_DUE_TO_PREDECESSOR] =
195
&waiting_due_to_pedecessor_list;
196
splitted_job_lists[SPLIT_WAITING_DUE_TO_TIME] = &waiting_due_to_time_list;
197
splitted_job_lists[SPLIT_PENDING_EXCLUDED] = &pending_excluded_list;
198
splitted_job_lists[SPLIT_SUSPENDED] = &suspended_list;
199
splitted_job_lists[SPLIT_FINISHED] = &finished_list;
200
splitted_job_lists[SPLIT_PENDING] = &pending_list;
201
splitted_job_lists[SPLIT_PENDING_EXCLUDED_INSTANCES] = &pending_excludedlist;
202
splitted_job_lists[SPLIT_RUNNING] = &running_list;
203
splitted_job_lists[SPLIT_ERROR] = &error_list;
204
splitted_job_lists[SPLIT_HOLD] = &hold_list;
205
/* NOTE(review): the right-hand side below looks like a mis-encoded
   '&not_started_list' ('&not' rendered as the character U+00AC); as written
   it does not compile - restore it from the original file */
splitted_job_lists[SPLIT_NOT_STARTED] = ¬_started_list;
207
/* split lists->job_list into the per-state lists wired up above */
split_jobs(&(lists->job_list), NULL, lists->all_queue_list,
208
mconf_get_max_aj_instances(), splitted_job_lists, false, false);
210
if (lists->all_queue_list != NULL) { /* add global queue messages */
212
lCondition *where = NULL;
213
lEnumeration *what = NULL;
214
const lDescr *dp = lGetListDescr(lists->all_queue_list);
215
lListElem *mes_queues;
217
what = lWhat("%T(ALL)", dp);
218
/* select queues by QU_state flags (part of the format string is not
   visible in this excerpt) */
where = lWhere("%T(%I m= %u "
224
QU_state, QI_SUSPENDED, /* only not suspended queues */
225
QU_state, QI_SUSPENDED_ON_SUBORDINATE,
226
QU_state, QI_ERROR, /* no queues in error state */
227
QU_state, QI_AMBIGUOUS,
228
QU_state, QI_UNKNOWN); /* only known queues */
230
if (what == NULL || where == NULL) {
231
DPRINTF(("failed creating where or what describing non available queues\n"));
233
qlp = lSelect("", lists->all_queue_list, where, what);
235
/* every selected queue is reported as not available */
for_each(mes_queues, qlp) {
236
schedd_mes_add_global(SCHEDD_INFO_QUEUENOTAVAIL_,
237
lGetString(mes_queues, QU_full_name));
242
/* queues in dis_queue_list are reported as not available as well */
for_each(mes_queues, lists->dis_queue_list) {
243
schedd_mes_add_global(SCHEDD_INFO_QUEUENOTAVAIL_,
244
lGetString(mes_queues, QU_full_name));
247
schedd_log_list(MSG_SCHEDD_LOGLIST_QUEUESTEMPORARLYNOTAVAILABLEDROPPED,
255
* the actual scheduling
257
dispatch_jobs(evc, lists, &orders, splitted_job_lists);
262
/* NOTE(review): presumably creates removal orders for immediate jobs that
   are still pending after dispatching - confirm */
remove_immediate_jobs(*(splitted_job_lists[SPLIT_PENDING]),
263
*(splitted_job_lists[SPLIT_RUNNING]), &orders);
265
/* send the orders collected so far ("C:"), unless the thread is shutting
   down. NOTE(review): "peding" in the last label is a typo in a runtime
   string and is left unchanged here. */
if (sge_thread_has_shutdown_started() == false) {
266
sge_schedd_send_orders(evc->get_gdi_ctx(evc), &orders, &(orders.configOrderList), NULL, "C: config orders");
267
sge_schedd_send_orders(evc->get_gdi_ctx(evc), &orders, &(orders.jobStartOrderList), NULL, "C: job start orders");
268
sge_schedd_send_orders(evc->get_gdi_ctx(evc), &orders, &(orders.pendingOrderList), NULL, "C: peding ticket orders");
271
/* SGE_PROF_SCHEDLIB4 measures the creation of orders for jobs that were
   not dispatched */
PROF_START_MEASUREMENT(SGE_PROF_SCHEDLIB4);
273
int clean_jobs[] = {SPLIT_WAITING_DUE_TO_PREDECESSOR,
274
SPLIT_WAITING_DUE_TO_TIME,
275
SPLIT_PENDING_EXCLUDED,
276
SPLIT_PENDING_EXCLUDED_INSTANCES,
283
for (i = 0; i < max; i++) {
284
/* clear SGEEE fields for queued jobs */
285
for_each(job, *(splitted_job_lists[clean_jobs[i]])) {
286
orders.pendingOrderList = sge_create_orders(orders.pendingOrderList,
288
job, NULL, NULL, false);
294
/* build ticket orders for the still pending and the not started jobs */
sge_build_sgeee_orders(lists, NULL,*(splitted_job_lists[SPLIT_PENDING]), NULL,
295
&orders, false, 0, false);
297
sge_build_sgeee_orders(lists, NULL,*(splitted_job_lists[SPLIT_NOT_STARTED]), NULL,
298
&orders, false, 0, false);
300
/* generated scheduler messages, thus we have to call it */
301
trash_splitted_jobs(splitted_job_lists);
303
/* attach the collected scheduler info to the job start order list; the
   numbers of global and job messages are returned in the two counters */
orders.jobStartOrderList= sge_add_schedd_info(orders.jobStartOrderList, &global_mes_count, &job_mes_count);
305
if (prof_is_active(SGE_PROF_SCHEDLIB4)) {
306
prof_stop_measurement(SGE_PROF_SCHEDLIB4, NULL);
307
PROFILING((SGE_EVENT, "PROF: create pending job orders: %.3f s",
308
prof_get_measurement_utime(SGE_PROF_SCHEDLIB4,false, NULL)));
311
/* final send ("D:"); answers go to the caller's answer_list */
sge_schedd_send_orders(evc->get_gdi_ctx(evc), &orders, &(orders.configOrderList), answer_list, "D: config orders");
312
sge_schedd_send_orders(evc->get_gdi_ctx(evc), &orders, &(orders.jobStartOrderList), answer_list, "D: job start orders");
313
sge_schedd_send_orders(evc->get_gdi_ctx(evc), &orders, &(orders.pendingOrderList), answer_list, "D: pendig ticket orders");
315
/* hand the queued order requests over to the GDI context, if any */
if (Master_Request_Queue.order_list != NULL) {
316
sge_schedd_add_gdi_order_request(evc->get_gdi_ctx(evc), &orders, answer_list, &Master_Request_Queue.order_list);
319
/* report timing and list sizes of the whole pass */
if(prof_is_active(SGE_PROF_CUSTOM0)) {
320
prof_stop_measurement(SGE_PROF_CUSTOM0, NULL);
322
PROFILING((SGE_EVENT, "PROF: scheduled in %.3f (u %.3f + s %.3f = %.3f): %d sequential, %d parallel, %d orders, %d H, %d Q, %d QA, %d J(qw), %d J(r), %d J(s), %d J(h), %d J(e), %d J(x), %d J(all), %d C, %d ACL, %d PE, %d U, %d D, %d PRJ, %d ST, %d CKPT, %d RU, %d gMes, %d jMes, %d/%d pre-send, %d/%d/%d pe-alg\n",
323
prof_get_measurement_wallclock(SGE_PROF_CUSTOM0, true, NULL),
324
prof_get_measurement_utime(SGE_PROF_CUSTOM0, true, NULL),
325
prof_get_measurement_stime(SGE_PROF_CUSTOM0, true, NULL),
326
prof_get_measurement_utime(SGE_PROF_CUSTOM0, true, NULL) +
327
prof_get_measurement_stime(SGE_PROF_CUSTOM0, true, NULL),
328
sconf_get_fast_jobs(),
329
sconf_get_comprehensive_jobs(),
330
(int)orders.numberSendOrders,
331
lGetNumberOfElem(lists->host_list),
332
lGetNumberOfElem(lists->queue_list),
333
lGetNumberOfElem(lists->all_queue_list),
334
(lGetNumberOfElem(*(splitted_job_lists[SPLIT_PENDING])) + lGetNumberOfElem(*(splitted_job_lists[SPLIT_NOT_STARTED]))),
335
lGetNumberOfElem(*(splitted_job_lists[SPLIT_RUNNING])),
336
lGetNumberOfElem(*(splitted_job_lists[SPLIT_SUSPENDED])),
337
lGetNumberOfElem(*(splitted_job_lists[SPLIT_HOLD])),
338
lGetNumberOfElem(*(splitted_job_lists[SPLIT_ERROR])),
339
lGetNumberOfElem(*(splitted_job_lists[SPLIT_FINISHED])),
341
lGetNumberOfElem(lists->centry_list),
342
lGetNumberOfElem(lists->acl_list),
343
lGetNumberOfElem(lists->pe_list),
344
lGetNumberOfElem(lists->user_list),
345
lGetNumberOfElem(lists->dept_list),
346
lGetNumberOfElem(lists->project_list),
347
lGetNumberOfElem(lists->share_tree),
348
lGetNumberOfElem(lists->ckpt_list),
349
lGetNumberOfElem(lists->running_per_user),
352
(int)orders.numberSendOrders,
353
(int)orders.numberSendPackages,
354
sconf_get_pe_alg_value(SCHEDD_PE_LOW_FIRST),
355
sconf_get_pe_alg_value(SCHEDD_PE_BINARY),
356
sconf_get_pe_alg_value(SCHEDD_PE_HIGH_FIRST)
360
/* SGE_PROF_CUSTOM5 measures the final cleanup */
PROF_START_MEASUREMENT(SGE_PROF_CUSTOM5);
362
/* free all job lists; each non-NULL slot points at one of the local list
   heads above, which lFreeList() frees */
363
for (i = SPLIT_FIRST; i < SPLIT_LAST; i++) {
364
if (splitted_job_lists[i]) {
365
lFreeList(splitted_job_lists[i]);
366
splitted_job_lists[i] = NULL;
370
/* release the scheduler messages collected during this pass */
schedd_mes_release();
371
schedd_mes_set_logging(0);
373
if(prof_is_active(SGE_PROF_CUSTOM5)) {
374
prof_stop_measurement(SGE_PROF_CUSTOM5, NULL);
376
PROFILING((SGE_EVENT, "PROF: send orders and cleanup took: %.3f (u %.3f,s %.3f) s",
377
prof_get_measurement_wallclock(SGE_PROF_CUSTOM5, true, NULL),
378
prof_get_measurement_utime(SGE_PROF_CUSTOM5, true, NULL),
379
prof_get_measurement_stime(SGE_PROF_CUSTOM5, true, NULL) ));
385
/****** schedd/scheduler/dispatch_jobs() **************************************
*  NAME
*     dispatch_jobs() -- dispatches pending jobs to queues
*
*  SYNOPSIS
*     static int dispatch_jobs(sge_evc_class_t *evc, scheduler_all_data_t *lists,
*                              order_t *orders, lList **splitted_job_lists[])
*
*  FUNCTION
*     Prepares the queue lists for dispatching (load correction, resource
*     schedules, capacity correction, suspend thresholds, and splitting off
*     overloaded, calendar-disabled, disabled and full queues), lets the
*     SGEEE scheduler compute tickets, sorts the pending jobs and then tries
*     to start or reserve resources for each pending job in priority order.
*     Orders generated on the way are collected in 'orders'; some of them
*     are already sent to qmaster during this function.
*
*  INPUTS
*     sge_evc_class_t *evc           - event client; its GDI context is used
*                                      to send orders
*     scheduler_all_data_t *lists    - all scheduler lists
*     order_t *orders                - collects the generated orders
*     lList **splitted_job_lists[]   - job lists split by SPLIT_* state;
*                                      jobs leave SPLIT_PENDING when they are
*                                      dispatched or given up (the latter are
*                                      appended to SPLIT_NOT_STARTED)
*
*  RESULT
*     int - -1 got inconsistent data
*           NOTE(review): the other return paths are not visible in this
*           excerpt; presumably 0 means ok - confirm
*******************************************************************************/
407
static int dispatch_jobs(sge_evc_class_t *evc, scheduler_all_data_t *lists, order_t *orders,
408
lList **splitted_job_lists[])
410
lList *user_list=NULL, *group_list=NULL;
411
lListElem *orig_job, *cat=NULL;
412
/* scratch list receiving the queues split off by the sge_split_* calls */
lList *none_avail_queues = NULL;
413
lList *consumable_load_list = NULL;
416
u_long32 queue_sort_method;
418
lList *job_load_adjustments = NULL;
419
double total_running_job_tickets=0;
420
/* number of reservations made in this pass; compared against max_reserve */
int nreservation = 0;
421
int fast_runs = 0; /* sequential jobs */
422
int fast_soft_runs = 0; /* sequential jobs with soft requests */
423
int comprehensive_runs = 0; /* all kinds of pe jobs */
426
int sort_hostlist = 1; /* hostlist has to be sorted. Info returned by select_assign_debit */
427
/* e.g. if load correction was computed */
428
int nr_pending_jobs=0;
429
int max_reserve = sconf_get_max_reservations();
430
/* a positive max_reservation switches on schedule based (reservation) scheduling */
bool is_schedule_based = (max_reserve > 0) ? true: false;
432
DENTER(TOP_LAYER, "dispatch_jobs");
434
queue_sort_method = sconf_get_queue_sort_method();
435
maxujobs = sconf_get_maxujobs();
436
job_load_adjustments = sconf_get_job_load_adjustments();
437
sconf_set_host_order_changed(false);
439
/*---------------------------------------------------------------------
442
* Load sensors report load delayed. So we have to increase the load
443
* of an exechost for each job started before load adjustment decay time.
444
* load_adjustment_decay_time is a configuration value.
445
*---------------------------------------------------------------------*/
447
u_long32 decay_time = sconf_get_load_adjustment_decay_time();
449
correct_load(*(splitted_job_lists[SPLIT_RUNNING]),
454
/* is there a "global" load value in the job_load_adjustments?
455
if so this will make it necessary to check load thresholds
456
of each queue after each dispatched job */
458
lListElem *gep, *lcep;
459
if ((gep = host_list_locate(lists->host_list, "global"))) {
460
for_each (lcep, job_load_adjustments) {
461
const char *attr = lGetString(lcep, CE_name);
462
if (lGetSubStr(gep, HL_name, attr, EH_load_list)) {
463
DPRINTF(("GLOBAL LOAD CORRECTION \"%s\"\n", attr));
473
sconf_set_global_load_correction((global_lc != 0) ? true : false);
475
/* we will assume this time as start time for now assignments */
476
sconf_set_now(sge_get_gmt());
478
if (max_reserve != 0 || lGetNumberOfElem(lists->ar_list) > 0) {
479
lListElem *dis_queue_elem = lFirst(lists->dis_queue_list);
480
/*----------------------------------------------------------------------
481
* ENSURE RUNNING JOBS ARE REFLECTED IN PER RESOURCE SCHEDULE
482
*---------------------------------------------------------------------*/
484
/* Temporarily append the disabled queues to queue_list so that
   prepare_resource_schedules() sees them too; afterwards the list is split
   again at dis_queue_elem (the former first disabled queue), presumably
   moving that tail back into dis_queue_list. */
if (dis_queue_elem != NULL) {
485
lAppendList(lists->queue_list, lists->dis_queue_list);
488
prepare_resource_schedules(*(splitted_job_lists[SPLIT_RUNNING]),
489
*(splitted_job_lists[SPLIT_SUSPENDED]),
490
lists->pe_list, lists->host_list, lists->queue_list,
491
lists->rqs_list, lists->centry_list, lists->acl_list,
492
lists->hgrp_list, lists->ar_list, true);
494
if (dis_queue_elem != NULL) {
495
lDechainList(lists->queue_list, &(lists->dis_queue_list), dis_queue_elem);
499
/*---------------------------------------------------------------------
500
* CAPACITY CORRECTION
501
*---------------------------------------------------------------------*/
502
correct_capacities(lists->host_list, lists->centry_list);
504
/*---------------------------------------------------------------------
505
* KEEP SUSPEND THRESHOLD QUEUES
506
*---------------------------------------------------------------------*/
507
if (sge_split_queue_load(
508
&(lists->queue_list), /* queue list */
509
&none_avail_queues, /* list of queues in suspend alarm state */
512
job_load_adjustments,
514
QU_suspend_thresholds)) {
515
lFreeList(&none_avail_queues);
516
lFreeList(&job_load_adjustments);
517
DPRINTF(("couldn't split queue list with regard to suspend thresholds\n"));
521
/* unsuspend jobs in queues that are no longer in suspend alarm state and
   suspend jobs in queues that are (none_avail_queues) */
unsuspend_job_in_queues(lists->queue_list,
522
*(splitted_job_lists[SPLIT_SUSPENDED]),
524
suspend_job_in_queues(none_avail_queues,
525
*(splitted_job_lists[SPLIT_RUNNING]),
527
lFreeList(&none_avail_queues);
529
/*---------------------------------------------------------------------
* SPLIT OFF OVERLOADED, CALENDAR-DISABLED, DISABLED AND FULL QUEUES
531
*---------------------------------------------------------------------*/
532
/* split queues into overloaded and non-overloaded queues */
533
if (sge_split_queue_load(&(lists->queue_list), NULL,
534
lists->host_list, lists->centry_list, job_load_adjustments,
535
NULL, false, true, QU_load_thresholds)) {
536
lFreeList(&job_load_adjustments);
537
DPRINTF(("couldn't split queue list concerning load\n"));
541
/* remove cal_disabled queues - needed them for implementing suspend thresholds */
542
if (sge_split_cal_disabled(&(lists->queue_list), &lists->dis_queue_list)) {
543
DPRINTF(("couldn't split queue list concerning cal_disabled state\n"));
544
lFreeList(&job_load_adjustments);
548
/* trash disabled queues - needed them for implementing suspend thresholds */
549
if (sge_split_disabled(&(lists->queue_list), &none_avail_queues)) {
550
DPRINTF(("couldn't split queue list concerning disabled state\n"));
551
lFreeList(&none_avail_queues);
552
lFreeList(&job_load_adjustments);
556
lFreeList(&none_avail_queues);
557
/* split off queues without free slots into none_avail_queues */
if (sge_split_queue_slots_free(&(lists->queue_list), &none_avail_queues)) {
558
DPRINTF(("couldn't split queue list concerning free slots\n"));
559
lFreeList(&none_avail_queues);
560
lFreeList(&job_load_adjustments);
563
/* the full queues are handed over to dis_queue_list, either appended to
   the existing list or becoming the list itself */
if (lists->dis_queue_list != NULL) {
564
lAddList(lists->dis_queue_list, &none_avail_queues);
566
lists->dis_queue_list = none_avail_queues;
567
none_avail_queues = NULL;
571
/*---------------------------------------------------------------------
573
*---------------------------------------------------------------------*/
575
DPRINTF(("STARTING PASS 1 WITH %d PENDING JOBS\n",
576
lGetNumberOfElem(*(splitted_job_lists[SPLIT_PENDING]))));
578
user_list_init_jc(&user_list, splitted_job_lists);
580
nr_pending_jobs = lGetNumberOfElem(*(splitted_job_lists[SPLIT_PENDING]));
582
DPRINTF(("STARTING PASS 2 WITH %d PENDING JOBS\n",nr_pending_jobs ));
584
/*--------------------------------------------------------------------
585
* CALL SGEEE SCHEDULER TO
586
* CALCULATE TICKETS FOR EACH JOB - IN SUPPORT OF SGEEE
587
*------------------------------------------------------------------*/
591
PROF_START_MEASUREMENT(SGE_PROF_CUSTOM1);
593
ret = sgeee_scheduler(lists,
594
*(splitted_job_lists[SPLIT_RUNNING]),
595
*(splitted_job_lists[SPLIT_FINISHED]),
596
*(splitted_job_lists[SPLIT_PENDING]),
599
/* send the orders collected so far ("A:"); answers are not collected */
sge_schedd_send_orders(evc->get_gdi_ctx(evc), orders, &(orders->configOrderList), NULL, "A: config orders");
600
sge_schedd_send_orders(evc->get_gdi_ctx(evc), orders, &(orders->jobStartOrderList), NULL, "A: job start orders");
601
sge_schedd_send_orders(evc->get_gdi_ctx(evc), orders, &(orders->pendingOrderList), NULL, "A: pendig ticket orders");
603
if (prof_is_active(SGE_PROF_CUSTOM1)) {
604
prof_stop_measurement(SGE_PROF_CUSTOM1, NULL);
606
PROFILING((SGE_EVENT, "PROF: job-order calculation took %.3f s",
607
prof_get_measurement_wallclock(SGE_PROF_CUSTOM1, true, NULL)));
611
lFreeList(&user_list);
612
lFreeList(&group_list);
613
lFreeList(&job_load_adjustments);
618
/* Nothing can be dispatched when no queue is usable: without reservation
   only queue_list counts, with reservation dis_queue_list counts as well. */
if ( ((max_reserve == 0) && (lGetNumberOfElem(lists->queue_list) == 0)) || /* no reservation and no queues avail */
619
((lGetNumberOfElem(lists->queue_list) == 0) && (lGetNumberOfElem(lists->dis_queue_list) == 0))) { /* reservation and no queues avail */
620
DPRINTF(("queues dropped because of overload or full: ALL\n"));
621
schedd_mes_add_global(SCHEDD_INFO_ALLALARMOVERLOADED_);
622
lFreeList(&user_list);
623
lFreeList(&group_list);
624
lFreeList(&job_load_adjustments);
628
job_lists_split_with_reference_to_max_running(splitted_job_lists,
633
nr_pending_jobs = lGetNumberOfElem(*(splitted_job_lists[SPLIT_PENDING]));
635
if (nr_pending_jobs == 0) {
636
/* no jobs to schedule */
637
schedd_log(MSG_SCHEDD_MON_NOPENDJOBSTOPERFORMSCHEDULINGON);
638
lFreeList(&user_list);
639
lFreeList(&group_list);
640
lFreeList(&job_load_adjustments);
645
* Order Jobs in descending order according to tickets and
648
PROF_START_MEASUREMENT(SGE_PROF_CUSTOM3);
650
sgeee_sort_jobs(splitted_job_lists[SPLIT_PENDING]);
652
if (prof_is_active(SGE_PROF_CUSTOM3)) {
653
prof_stop_measurement(SGE_PROF_CUSTOM3, NULL);
655
PROFILING((SGE_EVENT, "PROF: job sorting took %.3f s",
656
prof_get_measurement_wallclock(SGE_PROF_CUSTOM3, false, NULL)));
659
/*---------------------------------------------------------------------
661
*---------------------------------------------------------------------*/
663
there are two possibilities for SGE administrators
667
the sequence number from configuration is used for sorting
669
sort by load (using a load formula)
670
the least loaded queue gets filled first
672
to do this we sort the hosts using the load formula
673
because there may be more queues than hosts and
674
the queue load is identically to the host load
677
switch (queue_sort_method) {
682
DPRINTF(("sorting hosts by load\n"));
683
sort_host_list(lists->host_list, lists->centry_list);
689
/* generate a consumable load list structure. It stores which queues
690
are using consumables in their load threshold. */
691
sge_create_load_list(lists->queue_list, lists->host_list, lists->centry_list,
692
&consumable_load_list);
695
/*---------------------------------------------------------------------
696
* DISPATCH JOBS TO QUEUES
697
*---------------------------------------------------------------------*/
699
PROF_START_MEASUREMENT(SGE_PROF_CUSTOM4);
702
* loop over the jobs that are left in priority order
705
bool is_immediate_array_job = false;
706
bool do_prof = prof_is_active(SGE_PROF_CUSTOM4);
708
struct timeval now, later;
710
gettimeofday(&now, NULL);
712
/* reset the matching statistics that are reported after the loop */
memset(&pi, 0, sizeof(sched_prof_t));
714
/* Always take the first pending job: SPLIT_PENDING was sorted above and is
   resorted (sgeee_resort_pending_jobs) after a job that still has pending
   tasks was handled. */
while ( (orig_job = lFirst(*(splitted_job_lists[SPLIT_PENDING]))) != NULL) {
715
dispatch_t result = DISPATCH_NEVER_CAT;
717
bool is_pjob_resort = false;
721
job_id = lGetUlong(orig_job, JB_job_number);
724
* We don't try to get a reservation, if
725
* - reservation is generally disabled
726
* - maximum number of reservations is exceeded
727
* - it's not desired for the job
728
* - the job is an immediate one
730
if (nreservation < max_reserve &&
731
lGetBool(orig_job, JB_reserve) &&
732
!JOB_TYPE_IS_IMMEDIATE(lGetUlong(orig_job, JB_type))) {
739
/* Don't need to look for a 'now' assignment if the last job
740
of this category got no 'now' assignment either */
741
is_start = (sge_is_job_category_rejected(orig_job))?false:true;
743
if (is_start || is_reserve) {
744
lListElem *job = NULL;
749
/* sort the hostlist */
751
lPSortList(lists->host_list, "%I+", EH_sort_value);
753
sconf_set_host_order_changed(true);
756
sconf_set_host_order_changed(false);
759
/* sticky flag: once an immediate array job was seen, intermediate
   order sends (see below) are suppressed for the rest of the loop */
is_immediate_array_job = (is_immediate_array_job ||
760
(JOB_TYPE_IS_ARRAY(lGetUlong(orig_job, JB_type)) &&
761
JOB_TYPE_IS_IMMEDIATE(lGetUlong(orig_job, JB_type)))) ? true : false;
764
/* the assignment is computed on a copy; orig_job stays in SPLIT_PENDING */
if ((job = lCopyElem(orig_job)) == NULL) {
768
if (job_get_next_task(job, &ja_task, &ja_task_id) != 0) {
769
DPRINTF(("Found job "sge_u32" with no job array tasks\n", job_id));
772
DPRINTF(("Found pending job "sge_u32"."sge_u32". Try %sto start and %sto reserve\n",
773
job_id, ja_task_id, is_start?"":"not ", is_reserve?"":"not "));
774
DPRINTF(("-----------------------------------------\n"));
776
result = select_assign_debit(
777
&(lists->queue_list),
778
&(lists->dis_queue_list),
788
&total_running_job_tickets,
793
&consumable_load_list,
802
/* collect profiling data */
803
switch (sconf_get_last_dispatch_type()) {
804
case DISPATCH_TYPE_FAST : fast_runs++;
806
case DISPATCH_TYPE_FAST_SOFT_REQ : fast_soft_runs++;
808
case DISPATCH_TYPE_COMPREHENSIVE : comprehensive_runs++;
814
case DISPATCH_OK: /* now assignment */
816
/* NOTE(review): strdup(NULL) is undefined behavior - confirm JB_owner is
   always set; the matching free() is not visible in this excerpt */
char *owner = strdup(lGetString(orig_job, JB_owner));
817
/* here the job got an assignment that will cause it to be started immediately */
819
DPRINTF(("Found NOW assignment\n"));
821
schedd_mes_rollback();
822
/* a resort is only needed if tasks other than the current one remain */
if (job_count_pending_tasks(orig_job, true) < 2)
823
is_pjob_resort = false;
825
is_pjob_resort = true;
827
job_move_first_pending_to_running(&orig_job, splitted_job_lists);
830
* after sge_move_to_running() orig_job can be removed and job
831
* should be used instead
836
* drop idle jobs that exceed maxujobs limit
837
* should be done after resort_job() 'cause job is referenced
839
job_lists_split_with_reference_to_max_running(splitted_job_lists,
845
/* NOTE(review): the comment opened on the next line is not closed in this
   excerpt; as written it swallows everything up to the end of the
   "case DISPATCH_NOT_AT_TIME:" line below - restore the missing "*" "/" */
/* do not send job start orders inbetween, if we have an immediate array
847
if (!is_immediate_array_job && (lGetNumberOfElem(orders->jobStartOrderList) > 10)) {
848
gettimeofday(&later, NULL);
849
time = later.tv_usec - now.tv_usec;
850
time = (time / 1000000.0) + (later.tv_sec - now.tv_sec);
853
lList *answer_list = NULL;
854
sge_schedd_send_orders(evc->get_gdi_ctx(evc), orders, &(orders->configOrderList), &answer_list, "B: config orders");
855
sge_schedd_send_orders(evc->get_gdi_ctx(evc), orders, &(orders->jobStartOrderList), &answer_list, "B: job start orders");
856
sge_schedd_send_orders(evc->get_gdi_ctx(evc), orders, &(orders->pendingOrderList), &answer_list, "B: pendig ticket orders");
857
answer_list_output(&answer_list);
858
gettimeofday(&now, NULL);
864
case DISPATCH_NOT_AT_TIME: /* reservation */
865
/* here the job got a reservation but can't be started now */
866
DPRINTF(("Got a RESERVATION\n"));
869
/* mark the category as rejected */
870
if ((cat = lGetRef(orig_job, JB_category))) {
871
DPRINTF(("SKIP JOB " sge_u32 " of category '%s' (rc: "sge_u32 ")\n", job_id,
872
lGetString(cat, CT_str), lGetUlong(cat, CT_refcount)));
873
sge_reject_category(cat);
875
/* commit the scheduler messages of this job: it got a reservation
876
but can't be started now */
877
schedd_mes_commit(*(splitted_job_lists[SPLIT_PENDING]), 0, cat);
879
/* Move the job from SPLIT_PENDING to SPLIT_NOT_STARTED if it has no pending
   tasks besides the current one, or if the reservation limit is reached */
880
if (job_count_pending_tasks(orig_job, true) < 2 || (nreservation >= max_reserve )) {
882
lDechainElem(*(splitted_job_lists[SPLIT_PENDING]), orig_job);
883
if ((*(splitted_job_lists[SPLIT_NOT_STARTED])) == NULL) {
884
*(splitted_job_lists[SPLIT_NOT_STARTED]) = lCreateList("", lGetListDescr(*(splitted_job_lists[SPLIT_PENDING])));
886
lAppendElem(*(splitted_job_lists[SPLIT_NOT_STARTED]), orig_job);
887
is_pjob_resort = false;
890
/* otherwise drop only the first id from JB_ja_n_h_ids (presumably the
   task that was just handled) so the job stays pending for its
   remaining tasks and gets resorted */
u_long32 ja_task_number = range_list_get_first_id(lGetList(orig_job, JB_ja_n_h_ids), NULL);
891
object_delete_range_id(orig_job, NULL, JB_ja_n_h_ids, ja_task_number);
892
is_pjob_resort = true;
897
case DISPATCH_NEVER_CAT: /* never this category */
898
/* before deleting the element mark the category as rejected */
899
if ((cat = lGetRef(orig_job, JB_category))) {
900
DPRINTF(("SKIP JOB " sge_u32 " of category '%s' (rc: "sge_u32 ")\n", job_id,
901
lGetString(cat, CT_str), lGetUlong(cat, CT_refcount)));
902
sge_reject_category(cat);
905
/* NOTE(review): no break is visible before this case; presumably
   DISPATCH_NEVER_CAT intentionally falls through - confirm */
case DISPATCH_NEVER_JOB: /* never this particular job */
907
/* here no reservation was made for a job that couldn't be started now
908
or the job is not dispatchable at all */
909
schedd_mes_commit(*(splitted_job_lists[SPLIT_PENDING]), 0, cat);
911
if (JOB_TYPE_IS_IMMEDIATE(lGetUlong(orig_job, JB_type))) { /* immediate job */
912
lCondition *where = NULL;
914
/* remove job from pending list and generate remove immediate orders
915
for all tasks including already assigned ones */
916
/* NOTE(review): the matching lFreeWhere(&where) is not visible in this
   excerpt - confirm it is not leaked */
where = lWhere("%T(%I==%u)", JB_Type, JB_job_number, job_id);
917
remove_immediate_job(*(splitted_job_lists[SPLIT_PENDING]), orig_job, orders, 0);
918
if ((rjob = lFindFirst(*(splitted_job_lists[SPLIT_RUNNING]), where)) != NULL)
919
remove_immediate_job(*(splitted_job_lists[SPLIT_RUNNING]), rjob, orders, 1);
922
/* prevent that we get the same job next time again */
923
lDechainElem(*(splitted_job_lists[SPLIT_PENDING]),orig_job);
924
if ((*(splitted_job_lists[SPLIT_NOT_STARTED])) == NULL) {
925
*(splitted_job_lists[SPLIT_NOT_STARTED]) = lCreateList("", lGetListDescr(*(splitted_job_lists[SPLIT_PENDING])));
927
lAppendElem(*(splitted_job_lists[SPLIT_NOT_STARTED]), orig_job);
932
case DISPATCH_MISSING_ATTR : /* should not happen */
937
/*------------------------------------------------------------------
938
* SGEEE mode - if we dispatch a job sub-task and the job has more
939
* sub-tasks, then the job is still first in the job list.
940
* We need to remove and reinsert the job back into the sorted job
941
* list in case another job is higher priority (i.e. has more tickets)
942
*------------------------------------------------------------------*/
944
/* no more free queues - then exit */
945
if (lGetNumberOfElem(lists->queue_list)==0) {
949
if (is_pjob_resort) {
950
sgeee_resort_pending_jobs(splitted_job_lists[SPLIT_PENDING]);
956
/* report dispatch timing and the per-level matching statistics in 'pi' */
if (prof_is_active(SGE_PROF_CUSTOM4)) {
957
static bool first_time = true;
958
prof_stop_measurement(SGE_PROF_CUSTOM4, NULL);
959
PROFILING((SGE_EVENT, "PROF: job dispatching took %.3f s (%d fast, %d comp, %d pe, %d res)",
960
prof_get_measurement_wallclock(SGE_PROF_CUSTOM4, false, NULL),
967
PROFILING((SGE_EVENT, "PROF: parallel matching %12.12s %12.12s %12.12s %12.12s %12.12s %12.12s %12.12s",
968
"global", "rqs", "cqstatic", "hstatic",
969
"qstatic", "hdynamic", "qdyn"));
970
PROFILING((SGE_EVENT, "PROF: sequential matching %12.12s %12.12s %12.12s %12.12s %12.12s %12.12s %12.12s",
971
"global", "rqs", "cqstatic", "hstatic",
972
"qstatic", "hdynamic", "qdyn"));
975
PROFILING((SGE_EVENT, "PROF: parallel matching %12d %12d %12d %12d %12d %12d %12d",
976
pi.par_global, pi.par_rqs, pi.par_cqstat, pi.par_hstat,
977
pi.par_qstat, pi.par_hdyn, pi.par_qdyn));
978
PROFILING((SGE_EVENT, "PROF: sequential matching %12d %12d %12d %12d %12d %12d %12d",
979
pi.seq_global, pi.seq_rqs, pi.seq_cqstat, pi.seq_hstat,
980
pi.seq_qstat, pi.seq_hdyn, pi.seq_qdyn));
983
/* release the lists owned by this function */
lFreeList(&user_list);
984
lFreeList(&group_list);
985
sge_free_load_list(&consumable_load_list);
986
lFreeList(&job_load_adjustments);
991
/****** schedd/scheduler/select_assign_debit() ********************************
993
* select_assign_debit()
996
* Selects resources for 'job', add appropriate order to the 'orders_list',
997
* debits resources of this job for the next dispatch and sort out no longer
998
* available queues from the 'queue_list'. If no assignment can be made and
999
* reservation scheduling is enabled a reservation assignment is made if
1000
* possible. This is done to prevent lower-priority jobs from eating up resources
1001
* and thus preventing this job from being started at the earliest point in
1005
* bool is_start - try to find a now assignment
1006
* bool is_reserve - try to find a reservation assignment
1009
* dispatch_t - DISPATCH_OK           got an assignment now
1010
*              DISPATCH_NOT_AT_TIME  got a reservation assignment
1011
*              DISPATCH_NEVER_CAT    will never get an assignment for that category
1012
*              DISPATCH_NEVER_JOB    will never get an assignment for that particular job
1013
******************************************************************************/
1015
select_assign_debit(lList **queue_list, lList **dis_queue_list, lListElem *job, lListElem *ja_task,
1016
lList *pe_list, lList *ckpt_list, lList *centry_list, lList *host_list, lList *acl_list,
1017
lList **user_list, lList **group_list, order_t *orders, double *total_running_job_tickets,
1018
int *sort_hostlist, bool is_start, bool is_reserve, bool is_schedule_based, lList **load_list, lList *hgrp_list,
1019
lList *rqs_list, lList *ar_list, sched_prof_t *pi)
1021
lListElem *granted_el;
1022
dispatch_t result = DISPATCH_NOT_AT_TIME;
1023
const char *pe_name, *ckpt_name;
1024
sge_assignment_t a = SGE_ASSIGNMENT_INIT;
1025
bool is_computed_reservation = false;
1027
DENTER(TOP_LAYER, "select_assign_debit");
1029
assignment_init(&a, job, ja_task, true);
1030
a.queue_list = *queue_list;
1031
a.host_list = host_list;
1032
a.centry_list = centry_list;
1033
a.acl_list = acl_list;
1034
a.hgrp_list = hgrp_list;
1035
a.rqs_list = rqs_list;
1036
a.ar_list = ar_list;
1039
/* in reservation scheduling mode a non-zero duration always must be defined */
1040
if ( !job_get_duration(&a.duration, job) ) {
1041
schedd_mes_add(a.job_id, SCHEDD_INFO_CKPTNOTFOUND_);
1042
assignment_release(&a);
1043
DRETURN(DISPATCH_NEVER_CAT);
1047
if (*queue_list == NULL) {
1048
*queue_list = lCreateList("temp queue", lGetListDescr(*dis_queue_list));
1049
a.queue_list = *queue_list;
1051
a.care_reservation = is_computed_reservation = true;
1052
lAppendList(*queue_list, *dis_queue_list);
1055
a.duration = duration_add_offset(a.duration, sconf_get_duration_offset());
1056
a.is_schedule_based = is_schedule_based;
1058
/*------------------------------------------------------------------
1059
* seek for ckpt interface definition requested by the job
1060
* in case of ckpt jobs
1061
*------------------------------------------------------------------*/
1062
if ((ckpt_name = lGetString(job, JB_checkpoint_name))) {
1063
a.ckpt = ckpt_list_locate(ckpt_list, ckpt_name);
1065
schedd_mes_add(a.job_id, SCHEDD_INFO_CKPTNOTFOUND_);
1066
assignment_release(&a);
1067
DRETURN(DISPATCH_NEVER_CAT);
1071
/*------------------------------------------------------------------
1072
* if a global host pointer exists it is needed everywhere
1073
* down in the assignment code (tired of searching/passing it).
1074
*------------------------------------------------------------------*/
1075
a.gep = host_list_locate(host_list, SGE_GLOBAL_NAME);
1077
if ((pe_name = lGetString(job, JB_pe)) != NULL) {
1078
/*------------------------------------------------------------------
1079
* SELECT POSSIBLE QUEUE(S) FOR THIS PE JOB
1080
*------------------------------------------------------------------*/
1084
DPRINTF(("looking for immediate parallel assignment for job "
1085
sge_U32CFormat"."sge_U32CFormat" requesting pe \"%s\" duration "sge_U32CFormat"\n",
1086
a.job_id, a.ja_task_id, pe_name, a.duration));
1088
a.start = DISPATCH_TIME_NOW;
1089
a.is_reservation = false;
1090
result = sge_select_parallel_environment(&a, pe_list);
1093
if (result == DISPATCH_NOT_AT_TIME) {
1095
DPRINTF(("looking for parallel reservation for job "
1096
sge_U32CFormat"."sge_U32CFormat" requesting pe \"%s\" duration "sge_U32CFormat"\n",
1097
a.job_id, a.ja_task_id, pe_name, a.duration));
1098
is_computed_reservation = true;
1099
a.start = DISPATCH_TIME_QUEUE_END;
1100
a.is_reservation = true;
1101
assignment_clear_cache(&a);
1103
result = sge_select_parallel_environment(&a, pe_list);
1105
if (result == DISPATCH_OK) {
1106
result = DISPATCH_NOT_AT_TIME; /* this job got a reservation */
1109
result = DISPATCH_NEVER_CAT;
1114
/*------------------------------------------------------------------
1115
* SELECT POSSIBLE QUEUE(S) FOR THIS SEQUENTIAL JOB
1116
*------------------------------------------------------------------*/
1122
DPRINTF(("looking for immediate sequential assignment for job "
1123
sge_U32CFormat"."sge_U32CFormat" duration "sge_U32CFormat"\n", a.job_id,
1124
a.ja_task_id, a.duration));
1126
a.start = DISPATCH_TIME_NOW;
1127
a.is_reservation = false;
1128
result = sge_sequential_assignment(&a);
1130
DPRINTF(("sge_sequential_assignment(immediate) returned %d\n", result));
1133
/* try to reserve for jobs that can be dispatched with the current configuration */
1134
if (result == DISPATCH_NOT_AT_TIME) {
1136
DPRINTF(("looking for sequential reservation for job "
1137
sge_U32CFormat"."sge_U32CFormat" duration "sge_U32CFormat"\n",
1138
a.job_id, a.ja_task_id, a.duration));
1139
a.start = DISPATCH_TIME_QUEUE_END;
1140
a.is_reservation = true;
1141
assignment_clear_cache(&a);
1143
result = sge_sequential_assignment(&a);
1144
if (result == DISPATCH_OK) {
1145
result = DISPATCH_NOT_AT_TIME; /* this job got a reservation */
1149
result = DISPATCH_NEVER_CAT;
1154
/*------------------------------------------------------------------
1155
* BUILD ORDERS LIST THAT TRANSFERS OUR DECISIONS TO QMASTER
1156
*------------------------------------------------------------------*/
1157
if (result == DISPATCH_NEVER_CAT || result == DISPATCH_NEVER_JOB) {
1158
/* failed scheduling this job */
1159
if (JOB_TYPE_IS_IMMEDIATE(lGetUlong(job, JB_type))) { /* immediate job */
1160
/* generate order for removing it at qmaster */
1161
order_remove_immediate(job, ja_task, orders);
1163
assignment_release(&a);
1167
if (result == DISPATCH_OK) {
1168
/* in SGEEE we must account for job tickets on hosts due to parallel jobs */
1170
double job_tickets_per_slot;
1171
double job_ftickets_per_slot;
1172
double job_otickets_per_slot;
1173
double job_stickets_per_slot;
1175
job_tickets_per_slot =(double)lGetDouble(ja_task, JAT_tix)/a.slots;
1176
job_ftickets_per_slot =(double)lGetDouble(ja_task, JAT_fticket)/a.slots;
1177
job_otickets_per_slot =(double)lGetDouble(ja_task, JAT_oticket)/a.slots;
1178
job_stickets_per_slot =(double)lGetDouble(ja_task, JAT_sticket)/a.slots;
1181
for_each(granted_el, a.gdil) {
1182
u_long32 granted_slots = lGetUlong(granted_el, JG_slots);
1183
lSetDouble(granted_el, JG_ticket, job_tickets_per_slot * granted_slots);
1184
lSetDouble(granted_el, JG_oticket, job_otickets_per_slot * granted_slots);
1185
lSetDouble(granted_el, JG_fticket, job_ftickets_per_slot * granted_slots);
1186
lSetDouble(granted_el, JG_sticket, job_stickets_per_slot * granted_slots);
1188
*total_running_job_tickets += lGetDouble(ja_task, JAT_tix);
1192
DPRINTF(("got PE %s\n", lGetString(a.pe, PE_name)));
1193
lSetString(ja_task, JAT_granted_pe, lGetString(a.pe, PE_name));
1196
orders->jobStartOrderList = sge_create_orders(orders->jobStartOrderList, ORT_start_job,
1197
job, ja_task, a.gdil, true);
1199
/* increase the number of jobs a user/group has running */
1200
sge_inc_jc(user_list, lGetString(job, JB_owner), 1);
1203
/*------------------------------------------------------------------
1204
* DEBIT JOBS RESOURCES IN DIFFERENT OBJECTS
1206
* We have not enough time to wait till qmaster updates our lists.
1207
* So we do the work by ourselfs for being able to send more than
1208
* one order per scheduling epoch.
1209
*------------------------------------------------------------------*/
1211
/* when a job is started *now* the RUE_utilized_now must always change */
1212
/* if jobs with reservations are ahead then we also must change the RUE_utilized */
1213
if (result == DISPATCH_OK) {
1214
if (a.start == DISPATCH_TIME_NOW) {
1215
a.start = sconf_get_now();
1217
debit_scheduled_job(&a, sort_hostlist, orders, true,
1218
SCHEDULING_RECORD_ENTRY_TYPE_STARTING, true);
1220
debit_scheduled_job(&a, sort_hostlist, orders, false,
1221
SCHEDULING_RECORD_ENTRY_TYPE_RESERVING, true);
1224
/*------------------------------------------------------------------
1225
* REMOVE QUEUES THAT ARE NO LONGER USEFUL FOR FURTHER SCHEDULING
1226
*------------------------------------------------------------------*/
1227
if (result == DISPATCH_OK || is_computed_reservation) {
1229
lList *disabled_queues = NULL;
1230
bool is_consumable_load_alarm = false;
1232
if (*load_list == NULL) {
1233
sge_split_suspended(queue_list, dis_queue_list);
1234
sge_split_queue_slots_free(queue_list, dis_queue_list);
1237
sge_split_suspended(queue_list, &disabled_queues);
1238
sge_split_queue_slots_free(queue_list, &disabled_queues);
1240
sge_remove_queue_from_load_list(load_list, disabled_queues);
1241
if (*dis_queue_list == NULL) {
1242
*dis_queue_list = disabled_queues;
1245
lAddList(*dis_queue_list, &disabled_queues);
1247
disabled_queues = NULL;
1251
/* remove all taggs */
1252
for_each(queue, *queue_list) {
1253
lSetUlong(queue, QU_tagged4schedule, 0);
1256
is_consumable_load_alarm = sge_load_list_alarm(*load_list, host_list,
1259
/* split queues into overloaded and non-overloaded queues */
1260
if (sge_split_queue_load(
1261
queue_list, /* source list */
1263
host_list, /* host list contains load values */
1267
is_consumable_load_alarm,
1268
false, QU_load_thresholds)) { /* use load thresholds here */
1270
DPRINTF(("couldn't split queue list concerning load\n"));
1271
assignment_release(&a);
1272
DRETURN(DISPATCH_NEVER);
1274
if (*load_list != NULL) {
1275
sge_remove_queue_from_load_list(load_list, disabled_queues);
1278
if (*dis_queue_list == NULL) {
1279
*dis_queue_list = disabled_queues;
1282
lAddList(*dis_queue_list, &disabled_queues);
1284
disabled_queues = NULL;
1287
/* no longer needed - having built the order
1288
and debited the job everywhere */
1289
assignment_release(&a);