~ubuntu-branches/ubuntu/utopic/gridengine/utopic

« back to all changes in this revision

Viewing changes to source/daemons/qmaster/sge_sched_thread.c

  • Committer: Bazaar Package Importer
  • Author(s): Mark Hymers
  • Date: 2008-06-25 22:36:13 UTC
  • Revision ID: james.westby@ubuntu.com-20080625223613-tvd9xlhuoct9kyhm
Tags: upstream-6.2~beta2
Import upstream version 6.2~beta2

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*___INFO__MARK_BEGIN__*/
 
2
/*************************************************************************
 
3
 * 
 
4
 *  The Contents of this file are made available subject to the terms of
 
5
 *  the Sun Industry Standards Source License Version 1.2
 
6
 * 
 
7
 *  Sun Microsystems Inc., March, 2001
 
8
 * 
 
9
 * 
 
10
 *  Sun Industry Standards Source License Version 1.2
 
11
 *  =================================================
 
12
 *  The contents of this file are subject to the Sun Industry Standards
 
13
 *  Source License Version 1.2 (the "License"); You may not use this file
 
14
 *  except in compliance with the License. You may obtain a copy of the
 
15
 *  License at http://gridengine.sunsource.net/Gridengine_SISSL_license.html
 
16
 * 
 
17
 *  Software provided under this License is provided on an "AS IS" basis,
 
18
 *  WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING,
 
19
 *  WITHOUT LIMITATION, WARRANTIES THAT THE SOFTWARE IS FREE OF DEFECTS,
 
20
 *  MERCHANTABLE, FIT FOR A PARTICULAR PURPOSE, OR NON-INFRINGING.
 
21
 *  See the License for the specific provisions governing your rights and
 
22
 *  obligations concerning the Software.
 
23
 * 
 
24
 *   The Initial Developer of the Original Code is: Sun Microsystems, Inc.
 
25
 * 
 
26
 *   Copyright: 2001 by Sun Microsystems, Inc.
 
27
 * 
 
28
 *   All Rights Reserved.
 
29
 * 
 
30
 ************************************************************************/
 
31
/*___INFO__MARK_END__*/
 
32
#include <string.h>
 
33
#include <pthread.h>
 
34
 
 
35
#ifdef SOLARISAMD64
 
36
#  include <sys/stream.h>
 
37
#endif  
 
38
 
 
39
/* common/ */
 
40
#include "basis_types.h" 
 
41
#include "sge_mt_init.h" 
 
42
#include "sge.h"
 
43
 
 
44
/* daemons/qmaster/ */
 
45
#include "setup_qmaster.h"
 
46
#include "sge_sched_process_events.h"
 
47
#include "sge_qmaster_threads.h"
 
48
#include "sge_follow.h"
 
49
 
 
50
/* lib/ */
 
51
#include "rmon/sgermon.h"
 
52
#include "sgeobj/sge_answer.h"
 
53
#include "sgeobj/sge_conf.h"
 
54
#include "sgeobj/sge_reportL.h"
 
55
#include "sgeobj/sge_schedd_conf.h"
 
56
#include "sgeobj/sge_job.h"
 
57
#include "sgeobj//sge_jobL.h"
 
58
#include "sgeobj/sge_cqueueL.h"
 
59
#include "sgeobj/sge_qinstanceL.h"
 
60
#include "sgeobj/sge_ja_taskL.h"
 
61
#include "sgeobj/sge_usersetL.h"
 
62
#include "sgeobj/sge_qinstance_state.h"
 
63
#include "sgeobj/sge_userprj.h"
 
64
#include "sgeobj/sge_sharetree.h"
 
65
#include "sgeobj/sge_host.h"
 
66
#include "sgeobj/sge_centry.h"
 
67
#include "sgeobj/sge_ctL.h"
 
68
#include "sgeobj/sge_ckpt.h"
 
69
#include "sgeobj/sge_pe.h"
 
70
#include "sgeobj/sge_range.h"
 
71
#include "lck/sge_mtutil.h"
 
72
#include "mir/sge_mirror.h"
 
73
#include "evc/sge_event_client.h"
 
74
#include "evm/sge_event_master.h"
 
75
 
 
76
#include "uti/sge_unistd.h"
 
77
#include "uti/sge_prog.h"
 
78
#include "uti/sge_log.h"
 
79
#include "uti/sge_profiling.h"
 
80
#include "uti/sge_time.h"
 
81
#include "uti/sge_thread_ctrl.h"
 
82
 
 
83
#include "sge_sched_prepare_data.h"
 
84
#include "sge_sched_job_category.h"
 
85
 
 
86
#include "sge_orders.h"
 
87
#include "sge_job_schedd.h"
 
88
#include "sge_serf.h"
 
89
#include "schedd_message.h"
 
90
#include "msg_schedd.h"
 
91
#include "sge_schedd_text.h"
 
92
#include "schedd_monitor.h"
 
93
#include "sge_interactive_sched.h"
 
94
#include "sge_orderL.h"
 
95
#include "sgeee.h"
 
96
#include "load_correction.h"
 
97
#include "sge_resource_utilization.h"
 
98
#include "suspend_thresholds.h"
 
99
#include "sge_support.h"
 
100
#include "sort_hosts.h"
 
101
#include "debit.h"
 
102
#include "sge_follow.h"
 
103
#include "sge_qmaster_threads.h"
 
104
#include "sge_sched_process_events.h"
 
105
#include "sge_sched_thread.h"
 
106
#include "sge_sched_order.h"
 
107
 
 
108
#if 0
 
109
#define SCHEDULER_TIMEOUT_S 10
 
110
#define SCHEDULER_TIMEOUT_N 0
 
111
#endif
 
112
 
 
113
/*
 * Global control block of the scheduler thread.
 *
 * The mutex is the one taken by st_set_flag_new_global_conf() and
 * st_get_flag_new_global_conf() to guard the new_global_conf flag.
 * The initializer is positional; the exact member order is defined by
 * scheduler_control_t in sge_sched_thread.h (not visible here), so the
 * remarks on the unnamed members below are NOTE(review): confirm against
 * the header.
 */
scheduler_control_t Scheduler_Control = {
   PTHREAD_MUTEX_INITIALIZER,   /* mutex guarding this structure */
   PTHREAD_COND_INITIALIZER,    /* condition variable paired with the mutex */
   false, false, NULL,          /* two flags and a pointer - see header for names */
   true,  /* rebuild_categories */
   false /* new_global_config */
};
 
120
 
 
121
#if 0
 
122
static void rest_busy(sge_evc_class_t *evc);
 
123
 
 
124
static void wait_for_events(void);
 
125
#endif
 
126
 
 
127
static int dispatch_jobs(sge_evc_class_t *evc, scheduler_all_data_t *lists, order_t *orders,
 
128
                         lList **splitted_job_lists[]);
 
129
 
 
130
static dispatch_t
 
131
select_assign_debit(lList **queue_list, lList **dis_queue_list, lListElem *job, lListElem *ja_task,
 
132
                    lList *pe_list, lList *ckpt_list, lList *centry_list, lList *host_list,
 
133
                    lList *acl_list, lList **user_list, lList **group_list, order_t *orders,
 
134
                    double *total_running_job_tickets, int *sort_hostlist, bool is_start,
 
135
                    bool is_reserve, bool is_schedule_based, lList **load_list, lList *hgrp_list, lList *rqs_list,
 
136
                    lList *ar_list, sched_prof_t *pi);
 
137
 
 
138
void st_set_flag_new_global_conf(bool new_value)
 
139
{
 
140
   DENTER(TOP_LAYER, "st_set_flag_new_global_conf");
 
141
   sge_mutex_lock("event_control_mutex", SGE_FUNC, __LINE__, &Scheduler_Control.mutex);
 
142
   Scheduler_Control.new_global_conf = new_value;
 
143
   sge_mutex_unlock("event_control_mutex", SGE_FUNC, __LINE__, &Scheduler_Control.mutex);
 
144
   DRETURN_VOID;
 
145
 
146
 
 
147
bool st_get_flag_new_global_conf(void)
 
148
{
 
149
   bool ret = false;
 
150
 
 
151
   DENTER(TOP_LAYER, "st_get_flag_new_global_conf");
 
152
   sge_mutex_lock("event_control_mutex", SGE_FUNC, __LINE__, &Scheduler_Control.mutex);
 
153
   ret = Scheduler_Control.new_global_conf;
 
154
   sge_mutex_unlock("event_control_mutex", SGE_FUNC, __LINE__, &Scheduler_Control.mutex);
 
155
   DRETURN(ret);
 
156
 
157
 
 
158
/*
 * scheduler_method() -- perform one complete scheduling run
 *
 * Steps, in order:
 *   1. Takes over the caller's order list (*order is moved into
 *      orders.pendingOrderList and *order is set to NULL).
 *   2. Splits lists->job_list into per-state sublists (split_jobs()).
 *   3. Adds a global scheduler message for every queue that is selected by
 *      the state filter below and for every queue in lists->dis_queue_list.
 *   4. Runs dispatch_jobs() and remove_immediate_jobs().
 *   5. Sends the accumulated orders (skipped if thread shutdown has started).
 *   6. Creates ORT_clear_pri_info orders for the jobs of the "waiting",
 *      "excluded", "error" and "hold" sublists and builds the SGEEE orders
 *      for the pending and not-started sublists.
 *   7. Adds the scheduler info, sends the orders again (this time reporting
 *      into answer_list) and forwards Master_Request_Queue.order_list if set.
 *   8. Emits profiling output (if active) and frees all job sublists.
 *
 * INPUTS
 *    sge_evc_class_t *evc        - event client; used to obtain the GDI context
 *    lList **answer_list         - passed to the final order-sending calls
 *    scheduler_all_data_t *lists - all object lists used for scheduling
 *    lList **order               - in: pending orders to take over; out: NULL
 *
 * RESULT
 *    int - always 0; the return value of dispatch_jobs() is not evaluated
 */
int scheduler_method(sge_evc_class_t *evc, lList **answer_list, scheduler_all_data_t *lists, lList **order) 
{
   order_t orders = ORDER_INIT;
   lList **splitted_job_lists[SPLIT_LAST];         /* JB_Type */
   lList *waiting_due_to_pedecessor_list = NULL;   /* JB_Type */
   lList *waiting_due_to_time_list = NULL;         /* JB_Type */
   lList *pending_excluded_list = NULL;            /* JB_Type */
   lList *suspended_list = NULL;                   /* JB_Type */
   lList *finished_list = NULL;                    /* JB_Type */
   lList *pending_list = NULL;                     /* JB_Type */
   lList *pending_excludedlist = NULL;             /* JB_Type */
   lList *running_list = NULL;                     /* JB_Type */
   lList *error_list = NULL;                       /* JB_Type */
   lList *hold_list = NULL;                        /* JB_Type */
   lList *not_started_list = NULL;                 /* JB_Type */
   int prof_job_count;
   /* initialized so the profiling output below never prints indeterminate
      values, whatever sge_add_schedd_info() does with them */
   int global_mes_count = 0;
   int job_mes_count = 0;

   int i;
   
   DENTER(TOP_LAYER, "scheduler_method");
   
   PROF_START_MEASUREMENT(SGE_PROF_CUSTOM0);
   
   serf_new_interval(sge_get_gmt());

   /* take over the caller's pending orders; the caller's pointer is cleared */
   orders.pendingOrderList = *order;
   *order = NULL;
   
   /* remember the total job count before the job list gets split up */
   prof_job_count = lGetNumberOfElem(lists->job_list);
   
   sconf_reset_jobs();
   schedd_mes_initialize();
   schedd_mes_set_logging(1);
   
   /* map each split category we are interested in to its local list head */
   for (i = SPLIT_FIRST; i < SPLIT_LAST; i++) {
      splitted_job_lists[i] = NULL;
   }
   splitted_job_lists[SPLIT_WAITING_DUE_TO_PREDECESSOR] =
                                               &waiting_due_to_pedecessor_list;
   splitted_job_lists[SPLIT_WAITING_DUE_TO_TIME] = &waiting_due_to_time_list;
   splitted_job_lists[SPLIT_PENDING_EXCLUDED] = &pending_excluded_list;
   splitted_job_lists[SPLIT_SUSPENDED] = &suspended_list;
   splitted_job_lists[SPLIT_FINISHED] = &finished_list;
   splitted_job_lists[SPLIT_PENDING] = &pending_list;
   splitted_job_lists[SPLIT_PENDING_EXCLUDED_INSTANCES] = &pending_excludedlist;
   splitted_job_lists[SPLIT_RUNNING] = &running_list;
   splitted_job_lists[SPLIT_ERROR] = &error_list;
   splitted_job_lists[SPLIT_HOLD] = &hold_list;
   splitted_job_lists[SPLIT_NOT_STARTED] = &not_started_list;

   split_jobs(&(lists->job_list), NULL, lists->all_queue_list,
              mconf_get_max_aj_instances(), splitted_job_lists, false, false);

   if (lists->all_queue_list != NULL) { /* add global queue messages */
      lList *qlp = NULL;
      lCondition *where = NULL;
      lEnumeration *what = NULL;
      const lDescr *dp = lGetListDescr(lists->all_queue_list);
      lListElem *mes_queues;

      /*
       * Select the queues whose QU_state matches one of the flags below.
       * NOTE(review): "m=" is presumably the CULL bitmask-match operator -
       * confirm against the CULL documentation.
       */
      what = lWhat("%T(ALL)", dp);
      where = lWhere("%T(%I m= %u "
         "|| %I m= %u "
         "|| %I m= %u "
         "|| %I m= %u "
         "|| %I m= %u)",
         dp,
         QU_state, QI_SUSPENDED,        /* only not suspended queues      */
         QU_state, QI_SUSPENDED_ON_SUBORDINATE,
         QU_state, QI_ERROR,            /* no queues in error state       */
         QU_state, QI_AMBIGUOUS,
         QU_state, QI_UNKNOWN);         /* only known queues              */

      if (what == NULL || where == NULL) {
         DPRINTF(("failed creating where or what describing non available queues\n"));
      } else {
         qlp = lSelect("", lists->all_queue_list, where, what);

         for_each(mes_queues, qlp) {
            schedd_mes_add_global(SCHEDD_INFO_QUEUENOTAVAIL_,
                                      lGetString(mes_queues, QU_full_name));
         }

      }

      /* the queues in dis_queue_list get the same message */
      for_each(mes_queues, lists->dis_queue_list) {
         schedd_mes_add_global(SCHEDD_INFO_QUEUENOTAVAIL_,
                                   lGetString(mes_queues, QU_full_name));
      }

      schedd_log_list(MSG_SCHEDD_LOGLIST_QUEUESTEMPORARLYNOTAVAILABLEDROPPED,
                      qlp, QU_full_name);
      lFreeList(&qlp);
      lFreeWhere(&where);
      lFreeWhat(&what);
   }

   /**
    * the actual scheduling 
    */
   dispatch_jobs(evc, lists, &orders, splitted_job_lists);

   /**
    * post processing 
    */
   remove_immediate_jobs(*(splitted_job_lists[SPLIT_PENDING]),
                         *(splitted_job_lists[SPLIT_RUNNING]), &orders);

   /* do not send the intermediate orders once shutdown has been initiated */
   if (sge_thread_has_shutdown_started() == false) {
      sge_schedd_send_orders(evc->get_gdi_ctx(evc), &orders, &(orders.configOrderList), NULL, "C: config orders");
      sge_schedd_send_orders(evc->get_gdi_ctx(evc), &orders, &(orders.jobStartOrderList), NULL, "C: job start orders");
      sge_schedd_send_orders(evc->get_gdi_ctx(evc), &orders, &(orders.pendingOrderList), NULL, "C: peding ticket orders");
   }

   PROF_START_MEASUREMENT(SGE_PROF_SCHEDLIB4);
   {
      /* split categories whose jobs get an ORT_clear_pri_info order */
      int clean_jobs[] = {SPLIT_WAITING_DUE_TO_PREDECESSOR,
                          SPLIT_WAITING_DUE_TO_TIME,
                          SPLIT_PENDING_EXCLUDED,
                          SPLIT_PENDING_EXCLUDED_INSTANCES,
                          SPLIT_ERROR,
                          SPLIT_HOLD};
      /* derived from the array so the two cannot get out of sync */
      const int clean_count = (int)(sizeof(clean_jobs) / sizeof(clean_jobs[0]));
      int idx;
      lListElem *job;

      for (idx = 0; idx < clean_count; idx++) {
         /* clear SGEEE fields for queued jobs */
         for_each(job, *(splitted_job_lists[clean_jobs[idx]])) {
            orders.pendingOrderList = sge_create_orders(orders.pendingOrderList,
                                                        ORT_clear_pri_info,
                                                        job, NULL, NULL, false);

         }
      }

   }
   sge_build_sgeee_orders(lists, NULL,*(splitted_job_lists[SPLIT_PENDING]), NULL,
                          &orders, false, 0, false);

   sge_build_sgeee_orders(lists, NULL,*(splitted_job_lists[SPLIT_NOT_STARTED]), NULL,
                          &orders, false, 0, false);

   /* generated scheduler messages, thus we have to call it */
   trash_splitted_jobs(splitted_job_lists);

   orders.jobStartOrderList= sge_add_schedd_info(orders.jobStartOrderList, &global_mes_count, &job_mes_count);

   if (prof_is_active(SGE_PROF_SCHEDLIB4)) {
      prof_stop_measurement(SGE_PROF_SCHEDLIB4, NULL);
      PROFILING((SGE_EVENT, "PROF: create pending job orders: %.3f s",
               prof_get_measurement_utime(SGE_PROF_SCHEDLIB4,false, NULL)));
   }

   /* final send; unlike the sends above this one reports into answer_list */
   sge_schedd_send_orders(evc->get_gdi_ctx(evc), &orders, &(orders.configOrderList), answer_list, "D: config orders");
   sge_schedd_send_orders(evc->get_gdi_ctx(evc), &orders, &(orders.jobStartOrderList), answer_list, "D: job start orders");
   sge_schedd_send_orders(evc->get_gdi_ctx(evc), &orders, &(orders.pendingOrderList), answer_list, "D: pendig ticket orders");

   if (Master_Request_Queue.order_list != NULL) {
      sge_schedd_add_gdi_order_request(evc->get_gdi_ctx(evc), &orders, answer_list, &Master_Request_Queue.order_list);
   }

   if(prof_is_active(SGE_PROF_CUSTOM0)) {
      prof_stop_measurement(SGE_PROF_CUSTOM0, NULL);

      PROFILING((SGE_EVENT, "PROF: scheduled in %.3f (u %.3f + s %.3f = %.3f): %d sequential, %d parallel, %d orders, %d H, %d Q, %d QA, %d J(qw), %d J(r), %d J(s), %d J(h), %d J(e), %d J(x), %d J(all), %d C, %d ACL, %d PE, %d U, %d D, %d PRJ, %d ST, %d CKPT, %d RU, %d gMes, %d jMes, %d/%d pre-send, %d/%d/%d pe-alg\n",
         prof_get_measurement_wallclock(SGE_PROF_CUSTOM0, true, NULL),
         prof_get_measurement_utime(SGE_PROF_CUSTOM0, true, NULL),
         prof_get_measurement_stime(SGE_PROF_CUSTOM0, true, NULL),
         prof_get_measurement_utime(SGE_PROF_CUSTOM0, true, NULL) +
         prof_get_measurement_stime(SGE_PROF_CUSTOM0, true, NULL),
         sconf_get_fast_jobs(),
         sconf_get_comprehensive_jobs(),
         (int)orders.numberSendOrders,
         lGetNumberOfElem(lists->host_list),
         lGetNumberOfElem(lists->queue_list),
         lGetNumberOfElem(lists->all_queue_list),
         (lGetNumberOfElem(*(splitted_job_lists[SPLIT_PENDING])) + lGetNumberOfElem(*(splitted_job_lists[SPLIT_NOT_STARTED]))),
         lGetNumberOfElem(*(splitted_job_lists[SPLIT_RUNNING])),
         lGetNumberOfElem(*(splitted_job_lists[SPLIT_SUSPENDED])),
         lGetNumberOfElem(*(splitted_job_lists[SPLIT_HOLD])),
         lGetNumberOfElem(*(splitted_job_lists[SPLIT_ERROR])),
         lGetNumberOfElem(*(splitted_job_lists[SPLIT_FINISHED])),
         prof_job_count,
         lGetNumberOfElem(lists->centry_list),
         lGetNumberOfElem(lists->acl_list),
         lGetNumberOfElem(lists->pe_list),
         lGetNumberOfElem(lists->user_list),
         lGetNumberOfElem(lists->dept_list),
         lGetNumberOfElem(lists->project_list),
         lGetNumberOfElem(lists->share_tree),
         lGetNumberOfElem(lists->ckpt_list),
         lGetNumberOfElem(lists->running_per_user),
         global_mes_count,
         job_mes_count,
         (int)orders.numberSendOrders,
         (int)orders.numberSendPackages,
         sconf_get_pe_alg_value(SCHEDD_PE_LOW_FIRST),
         sconf_get_pe_alg_value(SCHEDD_PE_BINARY),
         sconf_get_pe_alg_value(SCHEDD_PE_HIGH_FIRST)
      ));
   }

   PROF_START_MEASUREMENT(SGE_PROF_CUSTOM5);

   /* free all job lists */
   for (i = SPLIT_FIRST; i < SPLIT_LAST; i++) {
      if (splitted_job_lists[i]) {
         lFreeList(splitted_job_lists[i]);
         splitted_job_lists[i] = NULL;
      }
   }

   schedd_mes_release();
   schedd_mes_set_logging(0);

   if(prof_is_active(SGE_PROF_CUSTOM5)) {
      prof_stop_measurement(SGE_PROF_CUSTOM5, NULL);

      PROFILING((SGE_EVENT, "PROF: send orders and cleanup took: %.3f (u %.3f,s %.3f) s",
         prof_get_measurement_wallclock(SGE_PROF_CUSTOM5, true, NULL),
         prof_get_measurement_utime(SGE_PROF_CUSTOM5, true, NULL),
         prof_get_measurement_stime(SGE_PROF_CUSTOM5, true, NULL) ));
   }

   DRETURN(0);
}
 
384
 
 
385
/****** schedd/scheduler/dispatch_jobs() **************************************
*  NAME
*     dispatch_jobs() -- dispatches jobs to queues
*
*  SYNOPSIS
*     static int dispatch_jobs(sge_evc_class_t *evc, scheduler_all_data_t *lists,
*                              order_t *orders, lList **splitted_job_lists[])
*
*  FUNCTION
*     dispatch_jobs() performs the actual scheduling pass: load adjustment,
*     capacity correction, filtering of queues and jobs, host sorting and
*     finally the dispatching of the pending jobs to queues.
*     Running and finished jobs are taken from
*     splitted_job_lists[SPLIT_RUNNING] and splitted_job_lists[SPLIT_FINISHED].
*
*  INPUTS
*     sge_evc_class_t *evc           - event client, used to obtain the GDI
*                                      context for sending orders
*     scheduler_all_data_t *lists    - all lists
*     order_t *orders                - collects the orders to be sent to qmaster
*     lList **splitted_job_lists[]   - job lists indexed by SPLIT_* category
*                                      (running, suspended, finished, pending, ...)
*
*  RESULT
*     0   ok
*     -1  got inconsistent data
******************************************************************************/
 
407
static int dispatch_jobs(sge_evc_class_t *evc, scheduler_all_data_t *lists, order_t *orders,
 
408
                         lList **splitted_job_lists[])
 
409
{
 
410
   lList *user_list=NULL, *group_list=NULL;
 
411
   lListElem *orig_job, *cat=NULL;
 
412
   lList *none_avail_queues = NULL;
 
413
   lList *consumable_load_list = NULL;
 
414
   sched_prof_t pi;
 
415
 
 
416
   u_long32 queue_sort_method;
 
417
   u_long32 maxujobs;
 
418
   lList *job_load_adjustments = NULL;
 
419
   double total_running_job_tickets=0;
 
420
   int nreservation = 0;
 
421
   int fast_runs = 0;         /* sequential jobs */
 
422
   int fast_soft_runs = 0;    /* sequential jobs with soft requests */
 
423
   int comprehensive_runs = 0;      /* all kind of pe jobs */
 
424
 
 
425
   int global_lc = 0;
 
426
   int sort_hostlist = 1;       /* hostlist has to be sorted. Info returned by select_assign_debit */
 
427
                                /* e.g. if load correction was computed */
 
428
   int nr_pending_jobs=0;
 
429
   int max_reserve = sconf_get_max_reservations();
 
430
   bool is_schedule_based = (max_reserve > 0) ? true: false;
 
431
 
 
432
   DENTER(TOP_LAYER, "dispatch_jobs");
 
433
 
 
434
   queue_sort_method =  sconf_get_queue_sort_method();
 
435
   maxujobs = sconf_get_maxujobs();
 
436
   job_load_adjustments = sconf_get_job_load_adjustments();
 
437
   sconf_set_host_order_changed(false);
 
438
 
 
439
   /*---------------------------------------------------------------------
 
440
    * LOAD ADJUSTMENT
 
441
    *
 
442
    * Load sensors report load delayed. So we have to increase the load
 
443
    * of an exechost for each job started before load adjustment decay time.
 
444
    * load_adjustment_decay_time is a configuration value. 
 
445
    *---------------------------------------------------------------------*/
 
446
   {
 
447
      u_long32 decay_time = sconf_get_load_adjustment_decay_time();
 
448
      if ( decay_time ) {
 
449
         correct_load(*(splitted_job_lists[SPLIT_RUNNING]),
 
450
            lists->queue_list,
 
451
            lists->host_list,
 
452
            decay_time);
 
453
 
 
454
         /* is there a "global" load value in the job_load_adjustments?
 
455
            if so this will make it necessary to check load thresholds
 
456
            of each queue after each dispatched job */
 
457
         {
 
458
            lListElem *gep, *lcep;
 
459
            if ((gep = host_list_locate(lists->host_list, "global"))) {
 
460
               for_each (lcep, job_load_adjustments) {
 
461
                  const char *attr = lGetString(lcep, CE_name);
 
462
                  if (lGetSubStr(gep, HL_name, attr, EH_load_list)) {
 
463
                     DPRINTF(("GLOBAL LOAD CORRECTION \"%s\"\n", attr));
 
464
                     global_lc = 1;
 
465
                     break;
 
466
                  }
 
467
               }
 
468
            }
 
469
         }
 
470
      }
 
471
   }
 
472
 
 
473
   sconf_set_global_load_correction((global_lc != 0) ? true : false);
 
474
 
 
475
   /* we will assume this time as start time for now assignments */
 
476
   sconf_set_now(sge_get_gmt());
 
477
 
 
478
   if (max_reserve != 0 || lGetNumberOfElem(lists->ar_list) > 0) {
 
479
      lListElem *dis_queue_elem = lFirst(lists->dis_queue_list);
 
480
      /*----------------------------------------------------------------------
 
481
       * ENSURE RUNNING JOBS ARE REFLECTED IN PER RESOURCE SCHEDULE
 
482
       *---------------------------------------------------------------------*/
 
483
 
 
484
      if (dis_queue_elem != NULL) {
 
485
         lAppendList(lists->queue_list, lists->dis_queue_list);
 
486
      }
 
487
 
 
488
      prepare_resource_schedules(*(splitted_job_lists[SPLIT_RUNNING]),
 
489
                                 *(splitted_job_lists[SPLIT_SUSPENDED]),
 
490
                                 lists->pe_list, lists->host_list, lists->queue_list,
 
491
                                 lists->rqs_list, lists->centry_list, lists->acl_list,
 
492
                                 lists->hgrp_list, lists->ar_list, true);
 
493
 
 
494
      if (dis_queue_elem != NULL) {
 
495
         lDechainList(lists->queue_list, &(lists->dis_queue_list), dis_queue_elem);
 
496
      }
 
497
   }
 
498
 
 
499
   /*---------------------------------------------------------------------
 
500
    * CAPACITY CORRECTION
 
501
    *---------------------------------------------------------------------*/
 
502
   correct_capacities(lists->host_list, lists->centry_list);
 
503
 
 
504
   /*---------------------------------------------------------------------
 
505
    * KEEP SUSPEND THRESHOLD QUEUES
 
506
    *---------------------------------------------------------------------*/
 
507
   if (sge_split_queue_load(
 
508
         &(lists->queue_list),    /* queue list                             */
 
509
         &none_avail_queues,            /* list of queues in suspend alarm state  */
 
510
         lists->host_list,
 
511
         lists->centry_list,
 
512
         job_load_adjustments,
 
513
         NULL, false, false,
 
514
         QU_suspend_thresholds)) {
 
515
      lFreeList(&none_avail_queues);
 
516
      lFreeList(&job_load_adjustments);
 
517
      DPRINTF(("couldn't split queue list with regard to suspend thresholds\n"));
 
518
      DRETURN(-1);
 
519
   }
 
520
 
 
521
   unsuspend_job_in_queues(lists->queue_list,
 
522
                           *(splitted_job_lists[SPLIT_SUSPENDED]),
 
523
                           orders);
 
524
   suspend_job_in_queues(none_avail_queues,
 
525
                         *(splitted_job_lists[SPLIT_RUNNING]),
 
526
                         orders);
 
527
   lFreeList(&none_avail_queues);
 
528
 
 
529
   /*---------------------------------------------------------------------
 
530
    * FILTER QUEUES
 
531
    *---------------------------------------------------------------------*/
 
532
   /* split queues into overloaded and non-overloaded queues */
 
533
   if (sge_split_queue_load(&(lists->queue_list), NULL,
 
534
         lists->host_list, lists->centry_list, job_load_adjustments,
 
535
         NULL, false, true, QU_load_thresholds)) {
 
536
      lFreeList(&job_load_adjustments);
 
537
      DPRINTF(("couldn't split queue list concerning load\n"));
 
538
      DRETURN(-1);
 
539
   }
 
540
 
 
541
   /* remove cal_disabled queues - needed them for implementing suspend thresholds */
 
542
   if (sge_split_cal_disabled(&(lists->queue_list), &lists->dis_queue_list)) {
 
543
      DPRINTF(("couldn't split queue list concerning cal_disabled state\n"));
 
544
      lFreeList(&job_load_adjustments);
 
545
      DRETURN(-1);
 
546
   }
 
547
 
 
548
   /* trash disabled queues - needed them for implementing suspend thresholds */
 
549
   if (sge_split_disabled(&(lists->queue_list), &none_avail_queues)) {
 
550
      DPRINTF(("couldn't split queue list concerning disabled state\n"));
 
551
      lFreeList(&none_avail_queues);
 
552
      lFreeList(&job_load_adjustments);
 
553
      DRETURN(-1);
 
554
   }
 
555
 
 
556
   lFreeList(&none_avail_queues);
 
557
   if (sge_split_queue_slots_free(&(lists->queue_list), &none_avail_queues)) {
 
558
      DPRINTF(("couldn't split queue list concerning free slots\n"));
 
559
      lFreeList(&none_avail_queues);
 
560
      lFreeList(&job_load_adjustments);
 
561
      DRETURN(-1);
 
562
   }
 
563
   if (lists->dis_queue_list != NULL) {
 
564
      lAddList(lists->dis_queue_list, &none_avail_queues);
 
565
   } else {
 
566
      lists->dis_queue_list = none_avail_queues;
 
567
      none_avail_queues = NULL;
 
568
   }
 
569
 
 
570
 
 
571
   /*---------------------------------------------------------------------
 
572
    * FILTER JOBS
 
573
    *---------------------------------------------------------------------*/
 
574
 
 
575
   DPRINTF(("STARTING PASS 1 WITH %d PENDING JOBS\n",
 
576
            lGetNumberOfElem(*(splitted_job_lists[SPLIT_PENDING]))));
 
577
 
 
578
   user_list_init_jc(&user_list, splitted_job_lists);
 
579
 
 
580
   nr_pending_jobs = lGetNumberOfElem(*(splitted_job_lists[SPLIT_PENDING]));
 
581
 
 
582
   DPRINTF(("STARTING PASS 2 WITH %d PENDING JOBS\n",nr_pending_jobs ));
 
583
 
 
584
   /*--------------------------------------------------------------------
 
585
    * CALL SGEEE SCHEDULER TO
 
586
    * CALCULATE TICKETS FOR EACH JOB  - IN SUPPORT OF SGEEE
 
587
    *------------------------------------------------------------------*/
 
588
 
 
589
   {
 
590
      int ret;
 
591
      PROF_START_MEASUREMENT(SGE_PROF_CUSTOM1);
 
592
 
 
593
      ret = sgeee_scheduler(lists,
 
594
                    *(splitted_job_lists[SPLIT_RUNNING]),
 
595
                    *(splitted_job_lists[SPLIT_FINISHED]),
 
596
                    *(splitted_job_lists[SPLIT_PENDING]),
 
597
                    orders);
 
598
 
 
599
      sge_schedd_send_orders(evc->get_gdi_ctx(evc), orders, &(orders->configOrderList), NULL, "A: config orders");
 
600
      sge_schedd_send_orders(evc->get_gdi_ctx(evc), orders, &(orders->jobStartOrderList), NULL, "A: job start orders");
 
601
      sge_schedd_send_orders(evc->get_gdi_ctx(evc), orders, &(orders->pendingOrderList), NULL, "A: pendig ticket orders");
 
602
 
 
603
      if (prof_is_active(SGE_PROF_CUSTOM1)) {
 
604
         prof_stop_measurement(SGE_PROF_CUSTOM1, NULL);
 
605
 
 
606
         PROFILING((SGE_EVENT, "PROF: job-order calculation took %.3f s",
 
607
               prof_get_measurement_wallclock(SGE_PROF_CUSTOM1, true, NULL)));
 
608
      }
 
609
 
 
610
      if (ret != 0) {
 
611
         lFreeList(&user_list);
 
612
         lFreeList(&group_list);
 
613
         lFreeList(&job_load_adjustments);
 
614
         DRETURN(-1);
 
615
      }
 
616
   }
 
617
 
 
618
   if ( ((max_reserve == 0) && (lGetNumberOfElem(lists->queue_list) == 0)) || /* no reservation and no queues avail */
 
619
        ((lGetNumberOfElem(lists->queue_list) == 0) && (lGetNumberOfElem(lists->dis_queue_list) == 0))) { /* reservation and no queues avail */
 
620
      DPRINTF(("queues dropped because of overload or full: ALL\n"));
 
621
      schedd_mes_add_global(SCHEDD_INFO_ALLALARMOVERLOADED_);
 
622
      lFreeList(&user_list);
 
623
      lFreeList(&group_list);
 
624
      lFreeList(&job_load_adjustments);
 
625
      DRETURN(0);
 
626
   }
 
627
 
 
628
   job_lists_split_with_reference_to_max_running(splitted_job_lists,
 
629
                                                 &user_list,
 
630
                                                 NULL,
 
631
                                                 maxujobs);
 
632
 
 
633
   nr_pending_jobs = lGetNumberOfElem(*(splitted_job_lists[SPLIT_PENDING]));
 
634
 
 
635
   if (nr_pending_jobs == 0) {
 
636
      /* no jobs to schedule */
 
637
      schedd_log(MSG_SCHEDD_MON_NOPENDJOBSTOPERFORMSCHEDULINGON);
 
638
      lFreeList(&user_list);
 
639
      lFreeList(&group_list);
 
640
      lFreeList(&job_load_adjustments);
 
641
      DRETURN(0);
 
642
   }
 
643
 
 
644
   /* 
 
645
    * Order Jobs in descending order according to tickets and 
 
646
    * then job number 
 
647
    */
 
648
   PROF_START_MEASUREMENT(SGE_PROF_CUSTOM3);
 
649
 
 
650
   sgeee_sort_jobs(splitted_job_lists[SPLIT_PENDING]);
 
651
 
 
652
   if (prof_is_active(SGE_PROF_CUSTOM3)) {
 
653
      prof_stop_measurement(SGE_PROF_CUSTOM3, NULL);
 
654
 
 
655
      PROFILING((SGE_EVENT, "PROF: job sorting took %.3f s",
 
656
            prof_get_measurement_wallclock(SGE_PROF_CUSTOM3, false, NULL)));
 
657
   }
 
658
 
 
659
   /*---------------------------------------------------------------------
 
660
    * SORT HOSTS
 
661
    *---------------------------------------------------------------------*/
 
662
   /* 
 
663
      there are two possibilities for SGE administrators 
 
664
      selecting queues:
 
665
 
 
666
      sort by seq_no
 
667
         the sequence number from configuration is used for sorting
 
668
     
 
669
      sort by load (using a load formula)
 
670
         the least loaded queue gets filled first
 
671
 
 
672
         to do this we sort the hosts using the load formula
 
673
         because there may be more queues than hosts and
 
674
         the queue load is identically to the host load
 
675
 
 
676
   */
 
677
   switch (queue_sort_method) {
 
678
   case QSM_LOAD:
 
679
   case QSM_SEQNUM:  
 
680
   default:
 
681
 
 
682
      DPRINTF(("sorting hosts by load\n"));
 
683
      sort_host_list(lists->host_list, lists->centry_list);
 
684
 
 
685
 
 
686
      break;
 
687
   }
 
688
 
 
689
   /* generate a consumable laod list structure. It stores which queues
 
690
      are using consumables in their load threshold. */
 
691
   sge_create_load_list(lists->queue_list, lists->host_list, lists->centry_list,
 
692
                        &consumable_load_list);
 
693
 
 
694
 
 
695
   /*---------------------------------------------------------------------
 
696
    * DISPATCH JOBS TO QUEUES
 
697
    *---------------------------------------------------------------------*/
 
698
 
 
699
   PROF_START_MEASUREMENT(SGE_PROF_CUSTOM4);
 
700
 
 
701
   /*
 
702
    * loop over the jobs that are left in priority order
 
703
    */
 
704
   {
 
705
      bool is_immediate_array_job = false;
 
706
      bool do_prof = prof_is_active(SGE_PROF_CUSTOM4);
 
707
 
 
708
      struct timeval now, later;
 
709
      double time;
 
710
      gettimeofday(&now, NULL);
 
711
 
 
712
      memset(&pi, 0, sizeof(sched_prof_t));
 
713
 
 
714
      while ( (orig_job = lFirst(*(splitted_job_lists[SPLIT_PENDING]))) != NULL) {
 
715
         dispatch_t result = DISPATCH_NEVER_CAT;
 
716
         u_long32 job_id;
 
717
         bool is_pjob_resort = false;
 
718
         bool is_reserve;
 
719
         bool is_start;
 
720
 
 
721
         job_id = lGetUlong(orig_job, JB_job_number);
 
722
 
 
723
         /* 
 
724
          * We don't try to get a reservation, if 
 
725
          * - reservation is generally disabled 
 
726
          * - maximum number of reservations is exceeded 
 
727
          * - it's not desired for the job
 
728
          * - the job is an immediate one
 
729
          */
 
730
         if (nreservation < max_reserve &&
 
731
             lGetBool(orig_job, JB_reserve) &&
 
732
             !JOB_TYPE_IS_IMMEDIATE(lGetUlong(orig_job, JB_type))) {
 
733
            is_reserve = true;
 
734
         }  
 
735
         else {
 
736
            is_reserve = false;
 
737
         }  
 
738
 
 
739
         /* Don't need to look for a 'now' assignment if the last job 
 
740
            of this category got no 'now' assignment either */
 
741
         is_start = (sge_is_job_category_rejected(orig_job))?false:true;
 
742
 
 
743
         if (is_start || is_reserve) {
 
744
            lListElem *job = NULL;
 
745
            u_long32 ja_task_id;
 
746
            lListElem *ja_task;
 
747
 
 
748
 
 
749
            /* sort the hostlist */
 
750
            if(sort_hostlist) {
 
751
               lPSortList(lists->host_list, "%I+", EH_sort_value);
 
752
               sort_hostlist      = 0;
 
753
               sconf_set_host_order_changed(true);
 
754
            }
 
755
            else {
 
756
               sconf_set_host_order_changed(false);
 
757
            } 
 
758
 
 
759
            is_immediate_array_job = (is_immediate_array_job ||
 
760
                                    (JOB_TYPE_IS_ARRAY(lGetUlong(orig_job, JB_type)) &&
 
761
                                     JOB_TYPE_IS_IMMEDIATE(lGetUlong(orig_job, JB_type)))) ? true : false;
 
762
 
 
763
 
 
764
            if ((job = lCopyElem(orig_job)) == NULL) {
 
765
               break;
 
766
            }
 
767
 
 
768
            if (job_get_next_task(job, &ja_task, &ja_task_id) != 0) {
 
769
               DPRINTF(("Found job "sge_u32" with no job array tasks\n", job_id));
 
770
            }
 
771
            else {
 
772
               DPRINTF(("Found pending job "sge_u32"."sge_u32". Try %sto start and %sto reserve\n",
 
773
                     job_id, ja_task_id, is_start?"":"not ", is_reserve?"":"not "));
 
774
               DPRINTF(("-----------------------------------------\n"));
 
775
 
 
776
               result = select_assign_debit(
 
777
                  &(lists->queue_list),
 
778
                  &(lists->dis_queue_list),
 
779
                  job, ja_task,
 
780
                  lists->pe_list,
 
781
                  lists->ckpt_list,
 
782
                  lists->centry_list,
 
783
                  lists->host_list,
 
784
                  lists->acl_list,
 
785
                  &user_list,
 
786
                  &group_list,
 
787
                  orders,
 
788
                  &total_running_job_tickets,
 
789
                  &sort_hostlist,
 
790
                  is_start,
 
791
                  is_reserve,
 
792
                  is_schedule_based,
 
793
                  &consumable_load_list,
 
794
                  lists->hgrp_list,
 
795
                  lists->rqs_list,
 
796
                  lists->ar_list, 
 
797
                  do_prof?&pi:NULL);
 
798
            }
 
799
            lFreeElem(&job);
 
800
         }    
 
801
 
 
802
         /* collect profiling data */
 
803
         switch (sconf_get_last_dispatch_type()) {
 
804
            case DISPATCH_TYPE_FAST : fast_runs++;
 
805
               break;
 
806
            case DISPATCH_TYPE_FAST_SOFT_REQ : fast_soft_runs++;
 
807
               break;
 
808
            case DISPATCH_TYPE_COMPREHENSIVE : comprehensive_runs++;
 
809
               break;
 
810
         }
 
811
 
 
812
 
 
813
         switch (result) {
 
814
         case DISPATCH_OK: /* now assignment */
 
815
            {
 
816
               char *owner = strdup(lGetString(orig_job, JB_owner));
 
817
               /* here the job got an assignment that will cause it be started immediately */
 
818
 
 
819
               DPRINTF(("Found NOW assignment\n"));
 
820
 
 
821
               schedd_mes_rollback();
 
822
               if (job_count_pending_tasks(orig_job, true) < 2)
 
823
                  is_pjob_resort = false;
 
824
               else
 
825
                  is_pjob_resort = true;
 
826
 
 
827
               job_move_first_pending_to_running(&orig_job, splitted_job_lists);
 
828
 
 
829
               /* 
 
830
                * after sge_move_to_running() orig_job can be removed and job 
 
831
                * should be used instead 
 
832
                */
 
833
               orig_job = NULL;
 
834
 
 
835
               /* 
 
836
                * drop idle jobs that exceed maxujobs limit 
 
837
                * should be done after resort_job() 'cause job is referenced 
 
838
                */
 
839
               job_lists_split_with_reference_to_max_running(splitted_job_lists,
 
840
                                                   &user_list,
 
841
                                                   owner,
 
842
                                                   maxujobs);
 
843
               FREE(owner);
 
844
 
 
845
               /* do not send job start orders inbetween, if we have an immediate array
 
846
                  job. */
 
847
               if (!is_immediate_array_job && (lGetNumberOfElem(orders->jobStartOrderList) > 10)) {
 
848
                  gettimeofday(&later, NULL);
 
849
                  time = later.tv_usec - now.tv_usec;
 
850
                  time = (time / 1000000.0) + (later.tv_sec - now.tv_sec);
 
851
 
 
852
                  if (time > 0.5) {
 
853
                     lList *answer_list = NULL;
 
854
                     sge_schedd_send_orders(evc->get_gdi_ctx(evc), orders, &(orders->configOrderList), &answer_list, "B: config orders");
 
855
                     sge_schedd_send_orders(evc->get_gdi_ctx(evc), orders, &(orders->jobStartOrderList), &answer_list, "B: job start orders");
 
856
                     sge_schedd_send_orders(evc->get_gdi_ctx(evc), orders, &(orders->pendingOrderList), &answer_list, "B: pendig ticket orders");
 
857
                     answer_list_output(&answer_list);
 
858
                     gettimeofday(&now, NULL);
 
859
                  }
 
860
               }
 
861
            }
 
862
            break;
 
863
 
 
864
         case DISPATCH_NOT_AT_TIME: /* reservation */
 
865
            /* here the job got a reservation but can't be started now */
 
866
            DPRINTF(("Got a RESERVATION\n"));
 
867
            nreservation++;
 
868
 
 
869
            /* mark the category as rejected */
 
870
            if ((cat = lGetRef(orig_job, JB_category))) {
 
871
               DPRINTF(("SKIP JOB " sge_u32 " of category '%s' (rc: "sge_u32 ")\n", job_id,
 
872
                           lGetString(cat, CT_str), lGetUlong(cat, CT_refcount)));
 
873
               sge_reject_category(cat);
 
874
            }
 
875
            /* here no reservation was made for a job that couldn't be started now 
 
876
               or the job is not dispatchable at all */
 
877
            schedd_mes_commit(*(splitted_job_lists[SPLIT_PENDING]), 0, cat);
 
878
 
 
879
            /* Remove pending job if there are no pending tasks anymore (including the current) */
 
880
            if (job_count_pending_tasks(orig_job, true) < 2 || (nreservation >= max_reserve )) {
 
881
 
 
882
               lDechainElem(*(splitted_job_lists[SPLIT_PENDING]), orig_job);
 
883
               if ((*(splitted_job_lists[SPLIT_NOT_STARTED])) == NULL) {
 
884
                  *(splitted_job_lists[SPLIT_NOT_STARTED]) = lCreateList("", lGetListDescr(*(splitted_job_lists[SPLIT_PENDING])));
 
885
               }
 
886
               lAppendElem(*(splitted_job_lists[SPLIT_NOT_STARTED]), orig_job);
 
887
               is_pjob_resort = false;
 
888
            }
 
889
            else {
 
890
               u_long32 ja_task_number = range_list_get_first_id(lGetList(orig_job, JB_ja_n_h_ids), NULL);
 
891
               object_delete_range_id(orig_job, NULL, JB_ja_n_h_ids, ja_task_number);
 
892
               is_pjob_resort = true;
 
893
            }
 
894
            orig_job = NULL;
 
895
            break;
 
896
 
 
897
         case DISPATCH_NEVER_CAT: /* never this category */
 
898
            /* before deleting the element mark the category as rejected */
 
899
            if ((cat = lGetRef(orig_job, JB_category))) {
 
900
               DPRINTF(("SKIP JOB " sge_u32 " of category '%s' (rc: "sge_u32 ")\n", job_id,
 
901
                        lGetString(cat, CT_str), lGetUlong(cat, CT_refcount)));
 
902
               sge_reject_category(cat);
 
903
            }
 
904
 
 
905
         case DISPATCH_NEVER_JOB: /* never this particular job */
 
906
 
 
907
            /* here no reservation was made for a job that couldn't be started now 
 
908
               or the job is not dispatchable at all */
 
909
            schedd_mes_commit(*(splitted_job_lists[SPLIT_PENDING]), 0, cat);
 
910
 
 
911
            if (JOB_TYPE_IS_IMMEDIATE(lGetUlong(orig_job, JB_type))) { /* immediate job */
 
912
               lCondition *where = NULL;
 
913
               lListElem *rjob;
 
914
               /* remove job from pending list and generate remove immediate orders 
 
915
                  for all tasks including already assigned ones */
 
916
               where = lWhere("%T(%I==%u)", JB_Type, JB_job_number, job_id);
 
917
               remove_immediate_job(*(splitted_job_lists[SPLIT_PENDING]), orig_job, orders, 0);
 
918
               if ((rjob = lFindFirst(*(splitted_job_lists[SPLIT_RUNNING]), where)) != NULL)
 
919
                  remove_immediate_job(*(splitted_job_lists[SPLIT_RUNNING]), rjob, orders, 1);
 
920
               lFreeWhere(&where);
 
921
            } else {
 
922
            /* prevent that we get the same job next time again */
 
923
               lDechainElem(*(splitted_job_lists[SPLIT_PENDING]),orig_job);
 
924
               if ((*(splitted_job_lists[SPLIT_NOT_STARTED])) == NULL) {
 
925
                  *(splitted_job_lists[SPLIT_NOT_STARTED]) = lCreateList("", lGetListDescr(*(splitted_job_lists[SPLIT_PENDING])));
 
926
               }
 
927
               lAppendElem(*(splitted_job_lists[SPLIT_NOT_STARTED]), orig_job);
 
928
            }
 
929
            orig_job = NULL;
 
930
            break;
 
931
 
 
932
         case DISPATCH_MISSING_ATTR :  /* should not happen */
 
933
         default:
 
934
            break;
 
935
         }
 
936
 
 
937
         /*------------------------------------------------------------------ 
 
938
          * SGEEE mode - if we dispatch a job sub-task and the job has more
 
939
          * sub-tasks, then the job is still first in the job list.
 
940
          * We need to remove and reinsert the job back into the sorted job
 
941
          * list in case another job is higher priority (i.e. has more tickets)
 
942
          *------------------------------------------------------------------*/
 
943
 
 
944
         /* no more free queues - then exit */
 
945
         if (lGetNumberOfElem(lists->queue_list)==0) {
 
946
            break;
 
947
         }  
 
948
 
 
949
         if (is_pjob_resort) {
 
950
            sgeee_resort_pending_jobs(splitted_job_lists[SPLIT_PENDING]);
 
951
         }
 
952
 
 
953
      } /* end of while */
 
954
   }
 
955
 
 
956
   if (prof_is_active(SGE_PROF_CUSTOM4)) {
 
957
      static bool first_time = true;
 
958
      prof_stop_measurement(SGE_PROF_CUSTOM4, NULL);
 
959
      PROFILING((SGE_EVENT, "PROF: job dispatching took %.3f s (%d fast, %d comp, %d pe, %d res)",
 
960
                 prof_get_measurement_wallclock(SGE_PROF_CUSTOM4, false, NULL),
 
961
                 fast_runs,
 
962
                 fast_soft_runs,
 
963
                 comprehensive_runs,
 
964
                 nreservation));
 
965
 
 
966
      if (first_time) {
 
967
         PROFILING((SGE_EVENT, "PROF: parallel matching   %12.12s %12.12s %12.12s %12.12s %12.12s %12.12s %12.12s", 
 
968
            "global", "rqs", "cqstatic", "hstatic", 
 
969
               "qstatic", "hdynamic", "qdyn"));
 
970
         PROFILING((SGE_EVENT, "PROF: sequential matching %12.12s %12.12s %12.12s %12.12s %12.12s %12.12s %12.12s", 
 
971
            "global", "rqs", "cqstatic", "hstatic", 
 
972
               "qstatic", "hdynamic", "qdyn"));
 
973
         first_time = false;
 
974
      }
 
975
      PROFILING((SGE_EVENT, "PROF: parallel matching   %12d %12d %12d %12d %12d %12d %12d",
 
976
         pi.par_global, pi.par_rqs, pi.par_cqstat, pi.par_hstat, 
 
977
         pi.par_qstat, pi.par_hdyn, pi.par_qdyn));
 
978
      PROFILING((SGE_EVENT, "PROF: sequential matching %12d %12d %12d %12d %12d %12d %12d",
 
979
         pi.seq_global, pi.seq_rqs, pi.seq_cqstat, pi.seq_hstat, 
 
980
         pi.seq_qstat, pi.seq_hdyn, pi.seq_qdyn));
 
981
   }
 
982
 
 
983
   lFreeList(&user_list);
 
984
   lFreeList(&group_list);
 
985
   sge_free_load_list(&consumable_load_list);
 
986
   lFreeList(&job_load_adjustments);
 
987
 
 
988
   DRETURN(0);
 
989
}
 
990
 
 
991
/****** schedd/scheduler/select_assign_debit() ********************************
 
992
*  NAME
 
993
*     select_assign_debit()
 
994
*
 
995
*  FUNCTION
 
996
*     Selects resources for 'job', add appropriate order to the 'orders_list',
 
997
*     debits resources of this job for the next dispatch and sort out no longer
 
998
*     available queues from the 'queue_list'. If no assignment can be made and 
 
999
*     reservation scheduling is enabled a reservation assignment is made if 
 
1000
*     possible. This is done to prevent lower prior jobs eating up resources 
 
1001
*     and thus preventing this job from being started at the earliest point in 
 
1002
*     time.
 
1003
*
 
1004
*  INPUTS
 
1005
*     bool is_start  -   try to find a now assignment 
 
1006
*     bool is_reserve -  try to find a reservation assignment 
 
1007
*
 
1008
*  RESULT
 
1009
*     int - 0 ok got an assignment now
 
1010
*           1 got a reservation assignment
 
1011
*          -1 will never get an assignment for that category
 
1012
*          -2 will never get an assignment for that particular job
 
1013
******************************************************************************/
 
1014
static dispatch_t
 
1015
select_assign_debit(lList **queue_list, lList **dis_queue_list, lListElem *job, lListElem *ja_task,
 
1016
                    lList *pe_list, lList *ckpt_list, lList *centry_list, lList *host_list, lList *acl_list,
 
1017
                    lList **user_list, lList **group_list, order_t *orders, double *total_running_job_tickets,
 
1018
                    int *sort_hostlist, bool is_start,  bool is_reserve, bool is_schedule_based, lList **load_list, lList *hgrp_list,
 
1019
                    lList *rqs_list, lList *ar_list, sched_prof_t *pi)
 
1020
{
 
1021
   lListElem *granted_el;
 
1022
   dispatch_t result = DISPATCH_NOT_AT_TIME;
 
1023
   const char *pe_name, *ckpt_name;
 
1024
   sge_assignment_t a = SGE_ASSIGNMENT_INIT;
 
1025
   bool is_computed_reservation = false;
 
1026
 
 
1027
   DENTER(TOP_LAYER, "select_assign_debit");
 
1028
 
 
1029
   assignment_init(&a, job, ja_task, true);
 
1030
   a.queue_list       = *queue_list;
 
1031
   a.host_list        = host_list;
 
1032
   a.centry_list      = centry_list;
 
1033
   a.acl_list         = acl_list;
 
1034
   a.hgrp_list        = hgrp_list;
 
1035
   a.rqs_list         = rqs_list;
 
1036
   a.ar_list          = ar_list;
 
1037
   a.pi               = pi;
 
1038
 
 
1039
   /* in reservation scheduling mode a non-zero duration always must be defined */
 
1040
   if ( !job_get_duration(&a.duration, job) ) {
 
1041
      schedd_mes_add(a.job_id, SCHEDD_INFO_CKPTNOTFOUND_);
 
1042
      assignment_release(&a);
 
1043
      DRETURN(DISPATCH_NEVER_CAT);
 
1044
   }
 
1045
 
 
1046
   if (is_reserve) {
 
1047
      if (*queue_list == NULL) { 
 
1048
         *queue_list = lCreateList("temp queue", lGetListDescr(*dis_queue_list));
 
1049
         a.queue_list       = *queue_list;
 
1050
      }
 
1051
      a.care_reservation = is_computed_reservation = true;
 
1052
      lAppendList(*queue_list, *dis_queue_list);
 
1053
   }
 
1054
 
 
1055
   a.duration = duration_add_offset(a.duration, sconf_get_duration_offset());
 
1056
   a.is_schedule_based = is_schedule_based;
 
1057
 
 
1058
   /*------------------------------------------------------------------ 
 
1059
    * seek for ckpt interface definition requested by the job 
 
1060
    * in case of ckpt jobs 
 
1061
    *------------------------------------------------------------------*/
 
1062
   if ((ckpt_name = lGetString(job, JB_checkpoint_name))) {
 
1063
      a.ckpt = ckpt_list_locate(ckpt_list, ckpt_name);
 
1064
      if (!a.ckpt) {
 
1065
         schedd_mes_add(a.job_id, SCHEDD_INFO_CKPTNOTFOUND_);
 
1066
         assignment_release(&a);
 
1067
         DRETURN(DISPATCH_NEVER_CAT);
 
1068
      }
 
1069
   }
 
1070
 
 
1071
   /*------------------------------------------------------------------ 
 
1072
    * if a global host pointer exists it is needed everywhere 
 
1073
    * down in the assignment code (tired of searching/passing it).
 
1074
    *------------------------------------------------------------------*/
 
1075
   a.gep = host_list_locate(host_list, SGE_GLOBAL_NAME);
 
1076
 
 
1077
   if ((pe_name = lGetString(job, JB_pe)) != NULL) {
 
1078
   /*------------------------------------------------------------------
 
1079
    * SELECT POSSIBLE QUEUE(S) FOR THIS PE JOB
 
1080
    *------------------------------------------------------------------*/
 
1081
 
 
1082
      if (is_start) {
 
1083
 
 
1084
         DPRINTF(("looking for immediate parallel assignment for job "
 
1085
                  sge_U32CFormat"."sge_U32CFormat" requesting pe \"%s\" duration "sge_U32CFormat"\n",
 
1086
                  a.job_id, a.ja_task_id, pe_name, a.duration));
 
1087
 
 
1088
         a.start = DISPATCH_TIME_NOW;
 
1089
         a.is_reservation = false;
 
1090
         result = sge_select_parallel_environment(&a, pe_list);
 
1091
      }
 
1092
 
 
1093
      if (result == DISPATCH_NOT_AT_TIME) {
 
1094
         if (is_reserve) {
 
1095
            DPRINTF(("looking for parallel reservation for job "
 
1096
               sge_U32CFormat"."sge_U32CFormat" requesting pe \"%s\" duration "sge_U32CFormat"\n",
 
1097
                  a.job_id, a.ja_task_id, pe_name, a.duration));
 
1098
            is_computed_reservation = true;
 
1099
            a.start = DISPATCH_TIME_QUEUE_END;
 
1100
            a.is_reservation = true;
 
1101
            assignment_clear_cache(&a);
 
1102
 
 
1103
            result = sge_select_parallel_environment(&a, pe_list);
 
1104
 
 
1105
            if (result == DISPATCH_OK) {
 
1106
               result = DISPATCH_NOT_AT_TIME; /* this job got a reservation */
 
1107
            }  
 
1108
         } else {
 
1109
            result = DISPATCH_NEVER_CAT;
 
1110
         }  
 
1111
      }
 
1112
 
 
1113
   } else {
 
1114
      /*------------------------------------------------------------------
 
1115
       * SELECT POSSIBLE QUEUE(S) FOR THIS SEQUENTIAL JOB
 
1116
       *------------------------------------------------------------------*/
 
1117
 
 
1118
      a.slots = 1;
 
1119
 
 
1120
      if (is_start) {
 
1121
 
 
1122
         DPRINTF(("looking for immediate sequential assignment for job "
 
1123
                  sge_U32CFormat"."sge_U32CFormat" duration "sge_U32CFormat"\n", a.job_id,
 
1124
                  a.ja_task_id, a.duration));
 
1125
 
 
1126
         a.start = DISPATCH_TIME_NOW;
 
1127
         a.is_reservation = false;
 
1128
         result = sge_sequential_assignment(&a);
 
1129
 
 
1130
         DPRINTF(("sge_sequential_assignment(immediate) returned %d\n", result));
 
1131
      }
 
1132
 
 
1133
      /* try to reserve for jobs that can be dispatched with the current configuration */
 
1134
      if (result == DISPATCH_NOT_AT_TIME) {
 
1135
         if (is_reserve) {
 
1136
            DPRINTF(("looking for sequential reservation for job "
 
1137
               sge_U32CFormat"."sge_U32CFormat" duration "sge_U32CFormat"\n",
 
1138
                  a.job_id, a.ja_task_id, a.duration));
 
1139
            a.start = DISPATCH_TIME_QUEUE_END;
 
1140
            a.is_reservation = true;
 
1141
            assignment_clear_cache(&a);
 
1142
 
 
1143
            result = sge_sequential_assignment(&a);
 
1144
            if (result == DISPATCH_OK) {
 
1145
               result = DISPATCH_NOT_AT_TIME; /* this job got a reservation */
 
1146
            }  
 
1147
         }
 
1148
         else {
 
1149
            result = DISPATCH_NEVER_CAT;
 
1150
         }  
 
1151
      }
 
1152
   }
 
1153
 
 
1154
   /*------------------------------------------------------------------
 
1155
    * BUILD ORDERS LIST THAT TRANSFERS OUR DECISIONS TO QMASTER
 
1156
    *------------------------------------------------------------------*/
 
1157
   if (result == DISPATCH_NEVER_CAT || result == DISPATCH_NEVER_JOB) {
 
1158
      /* failed scheduling this job */
 
1159
      if (JOB_TYPE_IS_IMMEDIATE(lGetUlong(job, JB_type))) { /* immediate job */
 
1160
         /* generate order for removing it at qmaster */
 
1161
         order_remove_immediate(job, ja_task, orders);
 
1162
      }
 
1163
      assignment_release(&a);
 
1164
      DRETURN(result);
 
1165
   }
 
1166
 
 
1167
   if (result == DISPATCH_OK) {
 
1168
      /* in SGEEE we must account for job tickets on hosts due to parallel jobs */
 
1169
      {
 
1170
         double job_tickets_per_slot;
 
1171
         double job_ftickets_per_slot;
 
1172
         double job_otickets_per_slot;
 
1173
         double job_stickets_per_slot;
 
1174
 
 
1175
         job_tickets_per_slot =(double)lGetDouble(ja_task, JAT_tix)/a.slots;
 
1176
         job_ftickets_per_slot =(double)lGetDouble(ja_task, JAT_fticket)/a.slots;
 
1177
         job_otickets_per_slot =(double)lGetDouble(ja_task, JAT_oticket)/a.slots;
 
1178
         job_stickets_per_slot =(double)lGetDouble(ja_task, JAT_sticket)/a.slots;
 
1179
 
 
1180
 
 
1181
         for_each(granted_el, a.gdil) {
 
1182
            u_long32 granted_slots = lGetUlong(granted_el, JG_slots);
 
1183
            lSetDouble(granted_el, JG_ticket, job_tickets_per_slot * granted_slots);
 
1184
            lSetDouble(granted_el, JG_oticket, job_otickets_per_slot  * granted_slots);
 
1185
            lSetDouble(granted_el, JG_fticket, job_ftickets_per_slot  * granted_slots);
 
1186
            lSetDouble(granted_el, JG_sticket, job_stickets_per_slot  * granted_slots);
 
1187
         }
 
1188
         *total_running_job_tickets += lGetDouble(ja_task, JAT_tix);
 
1189
      }
 
1190
 
 
1191
      if (a.pe) {
 
1192
         DPRINTF(("got PE %s\n", lGetString(a.pe, PE_name)));
 
1193
         lSetString(ja_task, JAT_granted_pe, lGetString(a.pe, PE_name));
 
1194
      }
 
1195
 
 
1196
      orders->jobStartOrderList = sge_create_orders(orders->jobStartOrderList, ORT_start_job,
 
1197
            job, ja_task, a.gdil, true);
 
1198
 
 
1199
      /* increase the number of jobs a user/group has running */
 
1200
      sge_inc_jc(user_list, lGetString(job, JB_owner), 1);
 
1201
   }
 
1202
 
 
1203
   /*------------------------------------------------------------------
 
1204
    * DEBIT JOBS RESOURCES IN DIFFERENT OBJECTS
 
1205
    *
 
1206
    * We have not enough time to wait till qmaster updates our lists.
 
1207
    * So we do the work by ourselfs for being able to send more than
 
1208
    * one order per scheduling epoch.
 
1209
    *------------------------------------------------------------------*/
 
1210
 
 
1211
   /* when a job is started *now* the RUE_utilized_now must always change */
 
1212
   /* if jobs with reservations are ahead then we also must change the RUE_utilized */
 
1213
   if (result == DISPATCH_OK) {
 
1214
      if (a.start == DISPATCH_TIME_NOW) {
 
1215
         a.start = sconf_get_now();
 
1216
      }  
 
1217
      debit_scheduled_job(&a, sort_hostlist, orders, true,
 
1218
               SCHEDULING_RECORD_ENTRY_TYPE_STARTING, true);
 
1219
   } else {
 
1220
      debit_scheduled_job(&a, sort_hostlist, orders, false,
 
1221
            SCHEDULING_RECORD_ENTRY_TYPE_RESERVING, true);
 
1222
   }  
 
1223
 
 
1224
   /*------------------------------------------------------------------
 
1225
    * REMOVE QUEUES THAT ARE NO LONGER USEFUL FOR FURTHER SCHEDULING
 
1226
    *------------------------------------------------------------------*/
 
1227
   if (result == DISPATCH_OK || is_computed_reservation) {
 
1228
      lListElem *queue;
 
1229
      lList *disabled_queues = NULL;
 
1230
      bool is_consumable_load_alarm = false;
 
1231
 
 
1232
      if (*load_list == NULL) {
 
1233
         sge_split_suspended(queue_list, dis_queue_list);
 
1234
         sge_split_queue_slots_free(queue_list, dis_queue_list);
 
1235
      }
 
1236
      else {
 
1237
         sge_split_suspended(queue_list, &disabled_queues);
 
1238
         sge_split_queue_slots_free(queue_list, &disabled_queues);
 
1239
 
 
1240
         sge_remove_queue_from_load_list(load_list, disabled_queues);
 
1241
         if (*dis_queue_list == NULL) {
 
1242
            *dis_queue_list = disabled_queues;
 
1243
         }
 
1244
         else {
 
1245
            lAddList(*dis_queue_list, &disabled_queues);
 
1246
         }  
 
1247
         disabled_queues = NULL;
 
1248
      }
 
1249
 
 
1250
 
 
1251
      /* remove all taggs */
 
1252
      for_each(queue, *queue_list) {
 
1253
         lSetUlong(queue, QU_tagged4schedule, 0);
 
1254
      }
 
1255
 
 
1256
      is_consumable_load_alarm = sge_load_list_alarm(*load_list, host_list,
 
1257
                                                     centry_list);
 
1258
 
 
1259
      /* split queues into overloaded and non-overloaded queues */
 
1260
      if (sge_split_queue_load(
 
1261
            queue_list,                                     /* source list                              */
 
1262
            &disabled_queues,
 
1263
            host_list,                                      /* host list contains load values           */
 
1264
            centry_list,
 
1265
            a.load_adjustments,
 
1266
            a.gdil,
 
1267
            is_consumable_load_alarm,
 
1268
            false, QU_load_thresholds)) {   /* use load thresholds here */
 
1269
 
 
1270
         DPRINTF(("couldn't split queue list concerning load\n"));
 
1271
         assignment_release(&a);
 
1272
         DRETURN(DISPATCH_NEVER);
 
1273
      }
 
1274
      if (*load_list != NULL) {
 
1275
         sge_remove_queue_from_load_list(load_list, disabled_queues);
 
1276
      }
 
1277
 
 
1278
      if (*dis_queue_list == NULL) {
 
1279
         *dis_queue_list = disabled_queues;
 
1280
      }
 
1281
      else {
 
1282
         lAddList(*dis_queue_list, &disabled_queues);
 
1283
      }
 
1284
      disabled_queues = NULL;
 
1285
   }
 
1286
 
 
1287
   /* no longer needed - having built the order 
 
1288
      and debited the job everywhere */
 
1289
   assignment_release(&a);
 
1290
   DRETURN(result);
 
1291
}
 
1292
 
 
1293