2
/*___INFO__MARK_BEGIN__*/
3
/*************************************************************************
5
* The Contents of this file are made available subject to the terms of
6
* the Sun Industry Standards Source License Version 1.2
8
* Sun Microsystems Inc., March, 2001
11
* Sun Industry Standards Source License Version 1.2
12
* =================================================
13
* The contents of this file are subject to the Sun Industry Standards
14
* Source License Version 1.2 (the "License"); You may not use this file
15
* except in compliance with the License. You may obtain a copy of the
16
* License at http://gridengine.sunsource.net/Gridengine_SISSL_license.html
18
* Software provided under this License is provided on an "AS IS" basis,
19
* WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING,
20
* WITHOUT LIMITATION, WARRANTIES THAT THE SOFTWARE IS FREE OF DEFECTS,
21
* MERCHANTABLE, FIT FOR A PARTICULAR PURPOSE, OR NON-INFRINGING.
22
* See the License for the specific provisions governing your rights and
23
* obligations concerning the Software.
25
* The Initial Developer of the Original Code is: Sun Microsystems, Inc.
27
* Copyright: 2001 by Sun Microsystems, Inc.
29
* All Rights Reserved.
31
************************************************************************/
32
/*___INFO__MARK_END__*/
34
#if defined(ALPHA) || defined(SOLARIS)
35
# include <sys/param.h> /* for MAX() macro */
38
#if defined(NECSX4) || defined(NECSX5)
39
# include <sys/types.h>
40
# include <sys/disp.h>
43
#include <sys/types.h>
45
#if defined(COMPILE_DC) || defined(MODULE_TEST)
47
#if defined(IRIX) || defined(ALPHA) || defined(LINUX) || defined(SOLARIS) || defined(NECSX4) || defined(NECSX5) || !defined(MODULE_TEST) || defined(HP1164) || defined(HP1164) || defined(FREEBSD) || defined(DARWIN)
56
#include <sys/types.h>
62
# include <sys/resource.h>
63
# include <sys/systeminfo.h>
64
# include <sys/sched.h>
65
# include <sys/sysmp.h>
66
# include <sys/schedctl.h>
70
# include <sys/category.h>
73
#if defined(ALPHA) || defined(SOLARIS) || defined(LINUX) || defined(FREEBSD) || defined(DARWIN)
74
# include <sys/resource.h>
78
# include <sys/stat.h>
80
# include <sys/signal.h>
81
# if ( IRIX || ALPHA || SOLARIS )
82
# include <sys/fault.h>
84
# include <sys/syscall.h>
85
# include <sys/procfs.h>
88
#include "sge_all_listsL.h"
93
#include "uti/sge_stdio.h"
98
#include "basis_types.h"
99
#include "sge_language.h"
100
#include "sge_conf.h"
101
#include "msg_common.h"
102
#include "msg_execd.h"
104
#include "exec_ifm.h"
106
#include "sge_feature.h"
108
#include "sge_uidgid.h"
109
#include "sge_pe_task.h"
110
#include "sge_ja_task.h"
114
* PTF Data Structures
116
* job_ticket_list list of jobs and their associated tickets which is
117
* sent from the SGE scheduler to the PTF.
119
* job_ticket_entry contains the job ID and job tickets for each job.
121
* job_list local list of jobs and all the associated attributes
123
* job entry contained in the job list.
128
* Make sure JL_usage is always set to at least PTF_MIN_JOB_USAGE
130
* Where do we create the usage list? Probably in the routine that
131
* we get the usage from the Data Collector.
133
* When does usage get reset to PTF_MIN_JOB_USAGE? Possibly, upon
134
* receiving a new job tickets list.
136
* When do we update the usage in the qmaster? Possibly, after
137
* receiving a new job tickets list.
139
* Make sure we don't delete a job from the job list until all the
140
* usage has been collected and reported back to the qmaster.
144
/*
 * Pointer-stepping helpers for walking variable-length records.
 *
 * INCPTR(type, ptr, nbyte) advances the lvalue "ptr" by "nbyte" BYTES
 * (not elements) and stores the result back into "ptr" as a "type *".
 * Every argument and the whole expansion are parenthesized so that
 * expression arguments (for example a conditional expression) keep their
 * intended meaning.  "ptr" is evaluated twice, so it must not have side
 * effects.
 */
#define INCPTR(type, ptr, nbyte) ((ptr) = (type *)((char *)(ptr) + (nbyte)))

/* Advance a struct psJob_s pointer by nbyte bytes. */
#define INCJOBPTR(ptr, nbyte) INCPTR(struct psJob_s, ptr, nbyte)

/* Advance a struct psProc_s pointer by nbyte bytes. */
#define INCPROCPTR(ptr, nbyte) INCPTR(struct psProc_s, ptr, nbyte)
150
static void ptf_calc_job_proportion_pass0(lListElem *job,
151
u_long * sum_of_job_tickets,
152
double *sum_of_last_usage);
154
static void ptf_calc_job_proportion_pass1(lListElem *job,
155
u_long sum_of_job_tickets,
156
double sum_of_last_usage,
157
double *sum_proportion);
159
static void ptf_calc_job_proportion_pass2(lListElem *job,
160
u_long sum_of_job_tickets,
161
double sum_proportion,
162
double *sum_adjusted_proportion,
163
double *sum_last_interval_usage);
165
static void ptf_calc_job_proportion_pass3(lListElem *job,
166
double sum_adjusted_proportion,
167
double sum_last_interval_usage,
170
double *max_ticket_share);
172
static void ptf_set_OS_scheduling_parameters(lList *job_list, double min_share,
174
double max_ticket_share);
176
static void ptf_get_usage_from_data_collector(void);
178
static lListElem *ptf_process_job(osjobid_t os_job_id,
179
const char *task_id_str,
180
lListElem *new_job, u_long32 jataskid);
182
static lListElem *ptf_get_job_os(lList *job_list, osjobid_t os_job_id,
183
lListElem **job_elem);
185
static void ptf_set_job_priority(lListElem *job);
187
static lList *_ptf_get_job_usage(lListElem *job, u_long ja_task_id,
188
const char *task_id);
190
static osjobid_t ptf_get_osjobid(lListElem *osjob);
193
static void ptf_set_osjobid(lListElem *osjob, osjobid_t osjobid);
195
static void ptf_set_native_job_priority(lListElem *job, lListElem *osjob,
198
static lList *ptf_build_usage_list(char *name, lList *old_usage_list);
200
static lListElem *ptf_get_job(u_long job_id);
204
static void ptf_setpriority_ash(lListElem *job, lListElem *osjob, long pri);
206
#elif defined(CRAY) || defined(NECSX4) || defined(NECSX5)
208
static void ptf_setpriority_jobid(lListElem *job, lListElem *osjob, long pri);
210
#elif defined(ALPHA) || defined(SOLARIS) || defined(LINUX) || defined(FREEBSD) || defined(DARWIN)
212
static void ptf_setpriority_addgrpid(lListElem *job, lListElem *osjob,
217
/* List of jobs known to the PTF; other functions in this file walk it with
 * for_each and search it by JL_job_ID. */
static lList *ptf_jobs = NULL;
219
/* NOTE(review): presumably a running/stopped flag for the PTF; no use of it
 * is visible in this part of the file - confirm. */
static int is_ptf_running = 0;
223
/* Release string buffer with a preset value; ptf_setpriority_ash passes it
 * to sysinfo(SI_RELEASE, ...). */
static char irix_release[257] = "6.2";
225
/* NOTE(review): presumably records that irix_release has been fetched; the
 * code that reads or sets it is not visible here - confirm. */
static int got_release = 0;
229
/****** execd/ptf/ptf_get_osjobid() *******************************************
231
* ptf_get_osjobid() -- return the job id
234
* static osjobid_t ptf_get_osjobid(lListElem *osjob)
237
* The function returns the os job id contained in the CULL element
240
* lListElem *osjob - element of type JO_Type
243
* static osjobid_t - os job id (job id / ash / supplementary gid)
244
******************************************************************************/
245
/*
 * Read the OS job id stored in the JO_Type element osjob.
 *
 * Two variants are selected by the preprocessor condition below.  On the
 * platforms NOT named in the condition the id is assembled from two Ulong
 * fields: JO_OS_job_ID2 shifted left by 32 plus JO_OS_job_ID.  The last
 * assignment reads JO_OS_job_ID alone.
 * NOTE(review): the braces, the local declaration, the else/endif lines and
 * the return statement are not visible in this extract.
 */
static osjobid_t ptf_get_osjobid(lListElem *osjob)
249
#if !defined(LINUX) && !defined(SOLARIS) && !defined(ALPHA5) && !defined(NECSX4) && !defined(NECSX5) && !defined(DARWIN) && !defined(FREEBSD) && !defined(NETBSD) && !defined(INTERIX) && !defined(HP1164) && !defined(AIX)
251
/* high 32 bits first, then shift and add the low 32 bits */
osjobid = lGetUlong(osjob, JO_OS_job_ID2);
252
osjobid = (osjobid << 32) + lGetUlong(osjob, JO_OS_job_ID);
256
osjobid = lGetUlong(osjob, JO_OS_job_ID);
263
/****** execd/ptf/ptf_set_osjobid() *******************************************
265
* ptf_set_osjobid() -- set os job id
268
* static void ptf_set_osjobid(lListElem *osjob, osjobid_t osjobid)
271
* Set the attribute of "osjob" containing the os job id
274
* lListElem *osjob - element of type JO_Type
275
* osjobid_t osjobid - os job id (job id / ash / supplementary gid)
276
******************************************************************************/
277
/*
 * Store osjobid in the JO_Type element osjob.
 *
 * Counterpart of ptf_get_osjobid: on the platforms NOT named in the
 * condition below the id is split, the upper 32 bits (after a cast to
 * u_osjobid_t) go to JO_OS_job_ID2 and the lower 32 bits to JO_OS_job_ID.
 * The last call writes the id to JO_OS_job_ID alone.
 * NOTE(review): braces and the else/endif lines are not visible in this
 * extract.
 */
static void ptf_set_osjobid(lListElem *osjob, osjobid_t osjobid)
279
#if !defined(LINUX) && !defined(SOLARIS) && !defined(ALPHA5) && !defined(NECSX4) && !defined(NECSX5) && !defined(DARWIN) && !defined(FREEBSD) && !defined(NETBSD) && !defined(INTERIX) && !defined(HP1164) && !defined(AIX)
281
lSetUlong(osjob, JO_OS_job_ID2, ((u_osjobid_t) osjobid) >> 32);
282
lSetUlong(osjob, JO_OS_job_ID, osjobid & 0xffffffff);
286
lSetUlong(osjob, JO_OS_job_ID, osjobid);
291
/****** execd/ptf/ptf_build_usage_list() **************************************
293
* ptf_build_usage_list() -- create a new usage list from an existing list
296
* static lList* ptf_build_usage_list(char *name, lList *old_usage_list)
299
* This method creates a new usage list or makes a copy of an old
300
* usage list and zeros out the usage values.
303
* char *name - name of the new list
304
* lList *old_usage_list - old usage list or NULL
307
* static lList* - copy of "old_usage_list" or a real new one
308
******************************************************************************/
309
/*
 * Build a usage list (elements of UA_Type) named name.
 *
 * With a non-NULL old_usage_list the list is copied with lCopyList and every
 * UA_value in the copy is set to 0.  Otherwise a new list is created and one
 * zero-valued element is appended for each of USAGE_ATTR_IO, USAGE_ATTR_IOW,
 * USAGE_ATTR_MEM and USAGE_ATTR_CPU; on the platforms named in the
 * preprocessor condition USAGE_ATTR_VMEM and USAGE_ATTR_MAXVMEM are added
 * as well.
 * NOTE(review): local declarations, closing braces, the else line and the
 * return statement are not visible in this extract.
 */
static lList *ptf_build_usage_list(char *name, lList *old_usage_list)
314
DENTER(TOP_LAYER, "ptf_build_usage_list");
316
if (old_usage_list) {
317
usage_list = lCopyList(name, old_usage_list);
318
for_each(usage, usage_list) {
319
lSetDouble(usage, UA_value, 0);
322
/* no old list given: start from an empty list of UA_Type */
usage_list = lCreateList(name, UA_Type);
324
usage = lCreateElem(UA_Type);
325
lSetString(usage, UA_name, USAGE_ATTR_IO);
326
lSetDouble(usage, UA_value, 0);
327
lAppendElem(usage_list, usage);
329
usage = lCreateElem(UA_Type);
330
lSetString(usage, UA_name, USAGE_ATTR_IOW);
331
lSetDouble(usage, UA_value, 0);
332
lAppendElem(usage_list, usage);
334
usage = lCreateElem(UA_Type);
335
lSetString(usage, UA_name, USAGE_ATTR_MEM);
336
lSetDouble(usage, UA_value, 0);
337
lAppendElem(usage_list, usage);
339
usage = lCreateElem(UA_Type);
340
lSetString(usage, UA_name, USAGE_ATTR_CPU);
341
lSetDouble(usage, UA_value, 0);
342
lAppendElem(usage_list, usage);
344
#if defined(ALPHA) || defined(LINUX) || defined(SOLARIS) || defined(HP1164) || defined(AIX) || defined(FREEBSD) || defined(DARWIN)
345
usage = lCreateElem(UA_Type);
346
lSetString(usage, UA_name, USAGE_ATTR_VMEM);
347
lSetDouble(usage, UA_value, 0);
348
DPRINTF(("adding usage attribute %s\n", USAGE_ATTR_VMEM));
349
lAppendElem(usage_list, usage);
351
usage = lCreateElem(UA_Type);
352
lSetString(usage, UA_name, USAGE_ATTR_MAXVMEM);
353
lSetDouble(usage, UA_value, 0);
354
DPRINTF(("adding usage attribute %s\n", USAGE_ATTR_MAXVMEM));
355
lAppendElem(usage_list, usage);
363
/****** execd/ptf/ptf_reinit_queue_priority() *********************************
365
* ptf_reinit_queue_priority() -- set static priority
368
* void ptf_reinit_queue_priority(u_long32 job_id, u_long32 ja_task_id,
369
* const char *pe_task_id_str, int priority)
372
* If execd switches from SGEEE to SGE mode this function is used to
373
* reinitialize static priorities of all jobs currently running.
376
* u_long32 job_id - job id
377
* u_long32 ja_task_id - task number
378
* const char *pe_task_id_str - pe task id string or NULL
379
* int priority - new static priority
380
******************************************************************************/
381
/*
 * Re-apply a static priority to the processes of one job task.
 *
 * For each element of ptf_jobs whose JL_job_ID equals job_id, the
 * JL_OS_job_list is walked.  An os job entry is selected when its
 * JO_ja_task_ID equals ja_task_id and its JO_task_id_str matches
 * pe_task_id_str: either both are NULL, or both are set and strcmp reports
 * them equal.  For every selected entry ptf_set_native_job_priority is
 * called.
 * The first test checks for a zero job_id or ja_task_id; its body is not
 * visible in this extract.
 * NOTE(review): the priority argument of the final call is truncated here,
 * only the macro name PTF_PRIORITY_TO_NATIVE_PRIORITY is visible.
 */
void ptf_reinit_queue_priority(u_long32 job_id, u_long32 ja_task_id,
382
const char *pe_task_id_str, int priority)
385
DENTER(TOP_LAYER, "ptf_reinit_queue_priority");
387
if (!job_id || !ja_task_id) {
392
for_each(job_elem, ptf_jobs) {
396
if (lGetUlong(job_elem, JL_job_ID) == job_id) {
397
DPRINTF(("\tjob id: " sge_u32 "\n", lGetUlong(job_elem, JL_job_ID)));
398
os_job_list = lGetList(job_elem, JL_OS_job_list);
399
for_each(os_job, os_job_list) {
400
/* same array task, and the pe task ids are both absent or both equal */
if (lGetUlong(os_job, JO_ja_task_ID) == ja_task_id &&
401
((!pe_task_id_str && !lGetString(os_job, JO_task_id_str))
402
|| (pe_task_id_str && lGetString(os_job, JO_task_id_str) &&
403
!strcmp(pe_task_id_str,
404
lGetString(os_job, JO_task_id_str))))) {
406
DPRINTF(("\t\tChanging priority for osjobid: " sge_u32 " jatask "
407
sge_u32 " petask %s\n",
408
lGetUlong(os_job, JO_OS_job_ID),
409
lGetUlong(os_job, JO_ja_task_ID),
410
pe_task_id_str ? pe_task_id_str : ""));
412
ptf_set_native_job_priority(job_elem, os_job,
413
PTF_PRIORITY_TO_NATIVE_PRIORITY
422
/****** execd/ptf/ptf_set_job_priority() **************************************
424
* ptf_set_job_priority() -- Update job priority
427
* static void ptf_set_job_priority(lListElem *job)
430
* The function updates the priority of each process which belongs
431
* to "job". The attribute JL_pri of "job" specifies the new priority.
434
* lListElem *job - JL_Type
435
******************************************************************************/
436
/*
 * Apply the priority stored in JL_pri of job to every os job of that job.
 *
 * The value is read once with lGetLong and then handed, together with each
 * element of JL_OS_job_list, to ptf_set_native_job_priority.
 * NOTE(review): the declaration of osjob and the closing lines are not
 * visible in this extract.
 */
static void ptf_set_job_priority(lListElem *job)
439
long pri = lGetLong(job, JL_pri);
441
DENTER(TOP_LAYER, "ptf_set_job_priority");
443
for_each(osjob, lGetList(job, JL_OS_job_list)) {
444
ptf_set_native_job_priority(job, osjob, pri);
449
/****** execd/ptf/ptf_set_native_job_priority() *******************************
451
* ptf_set_native_job_priority() -- Change job priority
454
* static void ptf_set_native_job_priority(lListElem *job, lListElem *osjob,
458
* The function updates the priority of each process which belongs
459
* to "job" and "osjob".
462
* lListElem *job - job
463
* lListElem *osjob - one of the os jobs of "job"
464
* long pri - new priority value
465
******************************************************************************/
466
/*
 * Dispatch a priority change to the platform specific implementation.
 *
 * Exactly one of ptf_setpriority_ash, ptf_setpriority_jobid or
 * ptf_setpriority_addgrpid is compiled in, chosen by the preprocessor chain
 * below; each receives job, osjob and pri unchanged.
 * NOTE(review): the rest of the parameter list and the opening condition of
 * the preprocessor chain are not visible in this extract.
 */
static void ptf_set_native_job_priority(lListElem *job, lListElem *osjob,
470
ptf_setpriority_ash(job, osjob, pri);
471
#elif defined(CRAY) || defined(NECSX4) || defined(NECSX5)
472
ptf_setpriority_jobid(job, osjob, pri);
473
#elif defined(ALPHA) || defined(SOLARIS) || defined(LINUX) || defined(FREEBSD) || defined(DARWIN)
474
ptf_setpriority_addgrpid(job, osjob, pri);
480
/****** execd/ptf/ptf_setpriority_ash() ***************************************
482
* ptf_setpriority_ash() -- Change priority of processes
485
* static void ptf_setpriority_ash(lListElem *job, lListElem *osjob,
489
* This function is only available for the architecture IRIX.
490
* All processes belonging to "job" and "osjob" will get a new priority.
493
* lListElem *job - job
494
* lListElem *osjob - one of the os jobs of "job"
495
* long pri - new priority
496
******************************************************************************/
497
/*
 * Set the priority of every process in JO_pid_list of osjob.
 *
 * Setup visible here: nprocs comes from sysmp(MP_NPROCS), bg_flag is true
 * when the environment variable PTF_NO_BACKGROUND_PRI is unset, and the
 * release string is fetched with sysinfo(SI_RELEASE) into irix_release.
 *
 * Per process, with PTF_NICE_BASED defined: when bg_flag is set, the job is
 * not interactive and JL_adjusted_current_proportion is below
 * PTF_BACKGROUND_JOB_PROPORTION divided by nprocs, the process gets
 * PTF_BACKGROUND_NICE_VALUE through setpriority and JP_background is set
 * to 1.  The other visible path uses sched_setscheduler with SCHED_TS and
 * sched_priority = pri and sets JP_background to 0.
 * With PTF_NDPRI_BASED defined schedctl(NDPRI, pid, pri) is used.
 * A failure with errno ESRCH is not reported for setpriority and schedctl.
 * NOTE(review): several declarations, conditions and closing lines are not
 * visible in this extract, so the exact branch structure cannot be confirmed.
 */
static void ptf_setpriority_ash(lListElem *job, lListElem *osjob, long pri)
499
static int got_bg_flag = 0;
505
static int first = 1;
508
DENTER(TOP_LAYER, "ptf_setpriority_ash");
512
nprocs = sysmp(MP_NPROCS);
520
bg_flag = (getenv("PTF_NO_BACKGROUND_PRI") == NULL);
526
if (sysinfo(SI_RELEASE, irix_release, sizeof(irix_release)) < 0) {
527
ERROR((SGE_EVENT, MSG_SYSTEM_SYSINFO_SI_RELEASE_CALL_FAILED_S,
534
for_each(pid, lGetList(osjob, JO_pid_list)) {
535
# ifdef PTF_NICE_BASED
536
/* IRIX 6.4 also has some scheduler bugs. Namely, when you use
537
* setpriority and assign a "weightless" priority of 20, setting
538
* a new priority doesn't bring the process out of the "weightless"
539
* class. The only way to bring the process out is to force a
540
* reset of its scheduling class using sched_setscheduler.
542
if (bg_flag && lGetUlong(job, JL_interactive) == 0 &&
543
lGetDouble(job, JL_adjusted_current_proportion) <
544
(PTF_BACKGROUND_JOB_PROPORTION / (double) nprocs)) {
546
/* set the background "weightless" priority */
548
if (setpriority(PRIO_PROCESS, (id_t)lGetUlong(pid, JP_pid),
549
PTF_BACKGROUND_NICE_VALUE) < 0 && errno != ESRCH) {
551
ERROR((SGE_EVENT, MSG_PRIO_JOBXPIDYSETPRIORITYFAILURE_UUS,
552
sge_u32c(lGetUlong(job, JL_job_ID)),
553
sge_u32c(lGetUlong(pid, JP_pid)), strerror(errno)));
556
lSetUlong(pid, JP_background, 1);
560
struct sched_param sp;
562
sp.sched_priority = pri;
564
if (sched_setscheduler((id_t)lGetUlong(pid, JP_pid), SCHED_TS, &sp) < 0
566
ERROR((SGE_EVENT, MSG_SCHEDD_JOBXPIDYSCHEDSETSCHEDULERFAILURE_UUS,
567
sge_u32c(lGetUlong(job, JL_job_ID)),
568
sge_u32c(lGetUlong(pid, JP_pid)), strerror(errno)));
571
lSetUlong(pid, JP_background, 0);
574
# ifdef PTF_NDPRI_BASED
575
if (schedctl(NDPRI, lGetUlong(pid, JP_pid), pri) < 0 && errno != ESRCH) {
576
ERROR((SGE_EVENT, MSG_SCHEDD_JOBXPIDYSCHEDCTLFAILUREX_UUS,
577
sge_u32c(lGetUlong(job, JL_job_ID)), sge_u32c(lGetUlong(pid, JP_pid)),
586
#elif defined(CRAY) || defined(NECSX4) || defined(NECSX5)
588
/****** execd/ptf/ptf_setpriority_jobid() *************************************
590
* ptf_setpriority_jobid() -- Change priority of processes
593
* static void ptf_setpriority_jobid(lListElem *job, lListElem *osjob,
597
* This function is only available for the architecture CRAY and NECSX 4/5.
598
* All processes belonging to "job" and "osjob" will get a new priority.
601
* lListElem *job - job
602
* lListElem *osjob - one of the os jobs of "job"
603
* long pri - new priority
604
******************************************************************************/
605
/*
 * Change the priority of an os job addressed by its OS job id.
 *
 * Two code paths are visible.  The first reads the current nice value with
 * nicem(C_JOB, id, 0) and then applies the difference pri - nice with a
 * second nicem call, logging MSG_PRIO_JOBXNICEMFAILURE_S on failure; a test
 * on JL_interactive together with pri == PTF_MIN_PRIORITY precedes it.
 * The second, after the NECSX4/NECSX5 condition, optionally writes
 * JL_timeslice into the tmslice field through dispcntl (only for jobs with
 * a positive timeslice that are not interactive) and then adjusts the nice
 * value with nicej, passing pri - (nice + 20).  In that path an error is
 * logged only when pri is not 19 and errno is not ESRCH.
 * NOTE(review): declarations, else branches, comment delimiters and the
 * preprocessor lines that separate the two paths are not visible in this
 * extract.
 */
static void ptf_setpriority_jobid(lListElem *job, lListElem *osjob, long pri)
608
DENTER(TOP_LAYER, "ptf_setpriority_jobid");
611
nice = nicem(C_JOB, ptf_get_osjobid(osjob), 0);
613
/* for interactive jobs, don't set "background" priority */
614
if (lGetUlong(job, JL_interactive) && pri == PTF_MIN_PRIORITY) {
617
if (nicem(C_JOB, ptf_get_osjobid(osjob), pri - nice) < 0) {
618
ERROR((SGE_EVENT, MSG_PRIO_JOBXNICEMFAILURE_S,
619
sge_u32c(lGetUlong(job, JL_job_ID)), strerror(errno)));
623
ERROR((SGE_EVENT, MSG_PRIO_JOBXNICEMFAILURE_S,
624
sge_u32c(lGetUlong(job, JL_job_ID)), strerror(errno)));
628
# if defined(NECSX4) || defined(NECSX5)
634
* If the timeslice value is set then set the timeslice
635
* scheduling parameter. This gives nice proportional
636
* scheduling for SX jobs. PTF_MIN_PRIORITY and
637
* PTF_MAX_PRIORITY define the time slice range to use.
638
* Values are in 1/HZ seconds, where HZ is 200.
644
if (lGetDouble(job, JL_timeslice) > 0 && lGetUlong(job, JL_interactive) == 0) {
647
osjobid_t jobid = ptf_get_osjobid(osjob);
649
if (dispcntl(SG_JID, jobid, DCNTL_GET2, &attr) != -1) {
650
attr.tmslice = (int) lGetDouble(job, JL_timeslice);
651
if (dispcntl(SG_JID, jobid, DCNTL_SET2, &attr) == -1) {
652
ERROR((SGE_EVENT, MSG_PRIO_JOBXNICEJFAILURE_S,
653
sge_u32c(lGetUlong(job, JL_job_ID)), strerror(errno)));
662
* NEC nice values range from 0 to 39.
663
* According to the nicej(2) man page, nicej returns the new
664
* nice value minus 20, so -1 is a valid return code. Errno
665
* is not set upon a successful call, so we can't tell the
666
* difference between success and failure if we are setting
667
* the nice value to 19. We don't generate an error message
668
* if the nicej(2) call fails and we are trying to set the
669
* nice value to 19 (no big deal).
673
nice = nicej(ptf_get_osjobid(osjob), 0);
674
if (nice != -1 || pri == 19) {
675
if (nicej(ptf_get_osjobid(osjob), pri - (nice+20)) == -1 && pri != 19) {
676
if (errno != ESRCH) {
677
ERROR((SGE_EVENT, MSG_PRIO_JOBXNICEJFAILURE_S,
678
sge_u32c(lGetUlong(job, JL_job_ID)), strerror(errno)));
681
DPRINTF(("NICEJ(" sge_u32 ", " sge_u32 ")\n",
682
(u_long32) ptf_get_osjobid(osjob), (u_long32) pri));
692
#elif defined(ALPHA) || defined(SOLARIS) || defined(LINUX) || defined(FREEBSD) || defined(DARWIN)
694
/****** execd/ptf/ptf_setpriority_addgrpid() **********************************
696
* ptf_setpriority_addgrpid() -- Change priority of processes
699
* static void ptf_setpriority_addgrpid(lListElem *job, lListElem *osjob,
703
* This function is only available for the architecture SOLARIS, ALPHA,
704
* LINUX, DARWIN and FREEBSD. All processes belonging to "job" and "osjob" will
705
* get a new priority.
707
* This function assumes that the "max" priority is smaller than the "min"
711
* lListElem *job - job
712
* lListElem *osjob - one of the os jobs of "job"
713
* long pri - new priority
714
******************************************************************************/
715
/*
 * Set the priority pri for the processes of osjob using setpriority.
 *
 * With USE_ALPHA_PGRPS defined the whole process group is addressed:
 * setpriority(PRIO_PGRP, os job id, pri).  The other visible path walks
 * JO_pid_list and calls setpriority(PRIO_PROCESS, pid, pri) for each entry,
 * followed by a debug message.  Failures are logged through ERROR.
 * NOTE(review): the second half of each failure condition, the else/endif
 * lines and the remaining parameter list are not visible in this extract.
 * NOTE(review): the debug message casts pri to u_long32, so a negative pri
 * would be printed as a large unsigned number.
 */
static void ptf_setpriority_addgrpid(lListElem *job, lListElem *osjob,
720
DENTER(TOP_LAYER, "ptf_setpriority_addgrpid");
722
# ifdef USE_ALPHA_PGRPS
725
* set the priority for the entire process group
727
if (setpriority(PRIO_PGRP, ptf_get_osjobid(osjob), pri) < 0 &&
729
ERROR((SGE_EVENT, MSG_PRIO_JOBXSETPRIORITYFAILURE_DS,
730
sge_u32c(lGetUlong(job, JL_job_ID)), strerror(errno)));
736
* set the priority for each active process
738
for_each(pid, lGetList(osjob, JO_pid_list)) {
739
if (setpriority(PRIO_PROCESS, lGetUlong(pid, JP_pid), pri) < 0 &&
741
ERROR((SGE_EVENT, MSG_PRIO_JOBXPIDYSETPRIORITYFAILURE_UUS,
742
sge_u32c(lGetUlong(job, JL_job_ID)), sge_u32c(lGetUlong(pid, JP_pid)),
745
DPRINTF(("Changing Priority of process " sge_u32 " to " sge_u32 "\n",
746
sge_u32c(lGetUlong(pid, JP_pid)), sge_u32c((u_long32) pri)));
755
/****** execd/ptf/ptf_get_job() ***********************************************
757
* ptf_get_job() -- look up the job for the supplied job_id and return it
760
* static lListElem* ptf_get_job(u_long job_id)
763
* look up the job for the supplied job_id and return it
766
* u_long job_id - SGE job id
769
* static lListElem* - JL_Type
770
******************************************************************************/
771
/*
 * Find the element of ptf_jobs whose JL_job_ID equals job_id.
 *
 * A where condition on JL_Type is built with lWhere and the first match is
 * taken with lFindFirst.
 * NOTE(review): the declarations, the release of the where condition and
 * the return statement are not visible in this extract.
 */
static lListElem *ptf_get_job(u_long job_id)
776
where = lWhere("%T(%I == %u)", JL_Type, JL_job_ID, job_id);
777
job = lFindFirst(ptf_jobs, where);
782
/****** execd/ptf/ptf_get_job_os() ********************************************
784
* ptf_get_job_os() -- look up the job for the supplied OS job_id
787
* static lListElem* ptf_get_job_os(lList *job_list,
788
* osjobid_t os_job_id,
789
* lListElem **job_elem)
792
* This function tries to find an os job element (JO_Type) with
793
* "os_job_id" within the list of os jobs of "job_elem". If "job_elem"
794
* is not provided the function will look up the whole "job_list".
797
* lList *job_list - List of all known jobs (JL_Type)
798
* osjobid_t os_job_id - os job id (ash, job id, supplementary gid)
799
* lListElem **job_elem - Pointer to a job element pointer (JL_Type)
800
* - pointer to a NULL pointer
801
* => *job_elem will contain the corresponding
802
* job element pointer when the function
803
* returns successfully
807
* static lListElem* - osjob (JO_Type)
808
* or NULL if it was not found.
809
******************************************************************************/
810
/*
 * Find the os job element (JO_Type) that carries os_job_id.
 *
 * The where condition compares JO_OS_job_ID only on the platforms named in
 * the preprocessor condition; the other variant compares the low 32 bits
 * against JO_OS_job_ID and the high 32 bits against JO_OS_job_ID2.
 * When job_elem points to a non-NULL job, only the JL_OS_job_list of that
 * job is searched.  Otherwise every job in job_list is searched, and the
 * last visible statement stores the owning job (or NULL) in *job_elem.
 * NOTE(review): the declaration of where, the loop exit, the else lines and
 * the return statement are not visible in this extract.
 */
static lListElem *ptf_get_job_os(lList *job_list, osjobid_t os_job_id,
811
lListElem **job_elem)
813
lListElem *job, *osjob = NULL;
816
DENTER(TOP_LAYER, "ptf_get_job_os");
818
#if defined(LINUX) || defined(SOLARIS) || defined(ALPHA5) || defined(NECSX4) || defined(NECSX5) || defined(DARWIN) || defined(FREEBSD) || defined(NETBSD) || defined(INTERIX) || defined(HP1164) || defined(AIX)
819
where = lWhere("%T(%I == %u)", JO_Type, JO_OS_job_ID, (u_long32) os_job_id);
821
where = lWhere("%T(%I == %u && %I == %u)", JO_Type,
822
JO_OS_job_ID, (u_long) (os_job_id & 0xffffffff),
823
JO_OS_job_ID2, (u_long) (((u_osjobid_t) os_job_id) >> 32));
827
CRITICAL((SGE_EVENT, MSG_WHERE_FAILEDTOBUILDWHERECONDITION));
831
/* a caller supplied job narrows the search to that job */
if (job_elem && (*job_elem)) {
832
osjob = lFindFirst(lGetList(*job_elem, JL_OS_job_list), where);
834
for_each(job, job_list) {
835
osjob = lFindFirst(lGetList(job, JL_OS_job_list), where);
841
*job_elem = osjob ? job : NULL;
850
/*--------------------------------------------------------------------
851
* ptf_process_job - process a job received from the SGE scheduler.
852
* This assumes that new jobs can be received after a job ticket
853
* list has been sent. The new jobs will have an associated number
854
* of tickets which will be updated in the job_list maintained by
856
*--------------------------------------------------------------------*/
858
/*
 * Register a job (and one of its os jobs) in ptf_jobs and refresh its
 * ticket count.
 *
 * Values taken from new_job: the job number (JB_job_number), JAT_tix of the
 * first element of JB_ja_tasks, and an interactive flag that is true when
 * JB_script_file is NULL.
 * Visible steps: look the job up with ptf_get_job; create a JL_Type element
 * and append it to the job list; look up or create the JL_OS_job_list;
 * create a JO_Type element carrying jataskid, a fresh usage list, the os
 * job id and task_id_str; store MAX(job_tickets, 1) in JL_tickets; set
 * JL_interactive to 1.
 * NOTE(review): the conditions guarding these steps, the declaration that
 * receives the JAT_tix value and the return statement are not visible in
 * this extract.
 */
static lListElem *ptf_process_job(osjobid_t os_job_id, const char *task_id_str,
859
lListElem *new_job, u_long32 jataskid)
861
lListElem *job, *osjob;
862
lList *job_list = ptf_jobs;
863
u_long job_id = lGetUlong(new_job, JB_job_number);
865
lGetDouble(lFirst(lGetList(new_job, JB_ja_tasks)), JAT_tix);
866
u_long interactive = (lGetString(new_job, JB_script_file) == NULL);
868
DENTER(TOP_LAYER, "ptf_process_job");
871
* Add the job to the job list, if it does not already exist
877
* job == NULL && osjobid == 0
879
* job == NULL && osjobid > 0
880
* add osjob job && osjobid > 0 search by osjobid if osjob
883
* add osjob job && osjobid == 0 skip
885
job = ptf_get_job(job_id);
886
if (os_job_id == 0) {
895
job = lCreateElem(JL_Type);
896
lAppendElem(job_list, job);
897
lSetUlong(job, JL_job_ID, job_id);
899
osjoblist = lGetList(job, JL_OS_job_list);
900
osjob = ptf_get_job_os(osjoblist, os_job_id, &job);
903
osjoblist = lCreateList("osjoblist", JO_Type);
904
lSetList(job, JL_OS_job_list, osjoblist);
906
osjob = lCreateElem(JO_Type);
907
lSetUlong(osjob, JO_ja_task_ID, jataskid);
908
lAppendElem(osjoblist, osjob);
909
lSetList(osjob, JO_usage_list,
910
ptf_build_usage_list("usagelist", NULL));
911
ptf_set_osjobid(osjob, os_job_id);
914
lSetString(osjob, JO_task_id_str, task_id_str);
919
* set number of tickets in job entry
921
lSetUlong(job, JL_tickets, (u_long32)MAX(job_tickets, 1));
924
* set interactive job flag
927
lSetUlong(job, JL_interactive, 1);
934
/****** execd/ptf/ptf_get_usage_from_data_collector() *************************
936
* ptf_get_usage_from_data_collector() -- get usage from PDC
939
* static void ptf_get_usage_from_data_collector(void)
942
* get the usage for all the jobs in the job ticket list and update
945
* call data collector routine with a list of all the jobs in the job_list
946
* for each job in the job_list
947
* update job.usage with data collector info
948
* update list of process IDs associated with job end
950
******************************************************************************/
951
/*
 * Refresh per job usage and pid lists.
 *
 * Two alternative bodies are visible.
 *
 * First body (data collector): psGetAllJobs returns a buffer that starts
 * with a uint64 job count followed by variable-length job records, each
 * followed by its process records.  For every job record the matching os
 * job is looked up with ptf_get_job_os.  For a known os job the function
 * updates JO_state (JL_JOB_COMPLETE is cleared while jd_refcnt is non-zero),
 * fills the usage attributes CPU, MEM, IO, IOW, VMEM and MAXVMEM, and
 * rebuilds JO_pid_list from the process records whose pd_state is 1.  The
 * other visible path calls psIgnoreJob and steps over the process records.
 * Afterwards, for each job in ptf_jobs the CPU usage of all its os jobs is
 * summed and JL_old_usage_value, JL_usage (decayed by
 * PTF_USAGE_DECAY_FACTOR, floored at PTF_MIN_JOB_USAGE) and JL_last_usage
 * are updated.
 *
 * Second body (ends at the MODULE_TEST endif): for each job the first os
 * job is used, usage is derived from ioctl(PIOCSTATUS) on JL_procfd by
 * summing the pr_utime, pr_stime, pr_cutime and pr_cstime fields, and a pid
 * list holding JO_OS_job_ID as its only pid is built.
 *
 * NOTE(review): many declarations, conditions, closing braces and the
 * preprocessor lines that select between the two bodies are not visible in
 * this extract; the assignment of tmp_jobs used by the debug output is
 * among the missing lines.
 */
static void ptf_get_usage_from_data_collector(void)
955
lListElem *job, *osjob;
956
lList *pidlist, *oldpidlist;
957
struct psJob_s *jobs, *ojobs, *tmp_jobs;
958
struct psProc_s *procs;
964
DENTER(TOP_LAYER, "ptf_get_usage_from_data_collector");
966
ojobs = jobs = psGetAllJobs();
968
/* the buffer starts with the job count; step past it to the first record */
jobcount = *(uint64 *) jobs;
971
INCJOBPTR(jobs, sizeof(uint64));
976
for (i = 0; i < jobcount; i++) {
979
double cpu_usage_value;
981
/* look up job in job list */
984
osjob = ptf_get_job_os(ptf_jobs, jobs->jd_jid, &job);
987
u_long job_state = lGetUlong(osjob, JO_state);
991
/* fill in job completion state */
992
lSetUlong(osjob, JO_state, jobs->jd_refcnt ?
993
(job_state & ~JL_JOB_COMPLETE) : (job_state |
996
/* fill in usage for job */
997
usage_list = lGetList(osjob, JO_usage_list);
999
usage_list = ptf_build_usage_list("usagelist", NULL);
1000
lSetList(osjob, JO_usage_list, usage_list);
1004
cpu_usage_value = jobs->jd_utime_c + jobs->jd_utime_a +
1005
jobs->jd_stime_c + jobs->jd_stime_a;
1006
if ((usage = lGetElemStr(usage_list, UA_name, USAGE_ATTR_CPU))) {
1007
lSetDouble(usage, UA_value,
1008
MAX(cpu_usage_value, lGetDouble(usage, UA_value)));
1011
/* set mem usage (in GB seconds) */
1012
if ((usage = lGetElemStr(usage_list, UA_name, USAGE_ATTR_MEM))) {
1013
lSetDouble(usage, UA_value, (double) jobs->jd_mem / 1048576.0);
1016
/* set I/O usage (in GB) */
1017
if ((usage = lGetElemStr(usage_list, UA_name, USAGE_ATTR_IO))) {
1018
lSetDouble(usage, UA_value,
1019
(double) jobs->jd_chars / 1073741824.0);
1022
/* set I/O wait time */
1023
if ((usage = lGetElemStr(usage_list, UA_name, USAGE_ATTR_IOW))) {
1024
lSetDouble(usage, UA_value,
1025
(double) jobs->jd_bwtime_c + jobs->jd_bwtime_a +
1026
jobs->jd_rwtime_c + jobs->jd_rwtime_a);
1029
/* set vmem usage */
1030
if ((usage = lGetElemStr(usage_list, UA_name, USAGE_ATTR_VMEM))) {
1031
lSetDouble(usage, UA_value, jobs->jd_vmem);
1034
/* set himem usage */
1035
if ((usage = lGetElemStr(usage_list, UA_name, USAGE_ATTR_MAXVMEM))) {
1036
lSetDouble(usage, UA_value, jobs->jd_himem);
1039
/* build new pid list */
1040
proccount = jobs->jd_proccount;
1041
/* skip the job record itself; its process records follow */
INCJOBPTR(jobs, jobs->jd_length);
1043
if (proccount > 0) {
1044
oldpidlist = lGetList(osjob, JO_pid_list);
1045
pidlist = lCreateList("pidlist", JP_Type);
1047
procs = (struct psProc_s *) jobs;
1048
for (j = 0; j < proccount; j++) {
1051
if (procs->pd_state == 1) {
1052
if ((pid = lGetElemUlong(oldpidlist, JP_pid,
1054
lAppendElem(pidlist, lCopyElem(pid));
1056
pid = lCreateElem(JP_Type);
1058
lSetUlong(pid, JP_pid, procs->pd_pid);
1059
lAppendElem(pidlist, pid);
1062
INCPROCPTR(procs, procs->pd_length);
1065
jobs = (struct psJob_s *)procs;
1066
lSetList(osjob, JO_pid_list, pidlist);
1068
lSetList(osjob, JO_pid_list, NULL);
1071
tid = lGetString(osjob, JO_task_id_str);
1072
DPRINTF(("JOB " sge_u32 "." sge_u32 ": %s: (cpu = %8.3lf / mem = "
1073
UINT64_FMT " / io = " UINT64_FMT " / vmem = "
1074
UINT64_FMT " / himem = " UINT64_FMT ")\n",
1075
lGetUlong(job, JL_job_ID),
1076
lGetUlong(osjob, JO_ja_task_ID), tid ? tid : "",
1077
tmp_jobs->jd_utime_c + tmp_jobs->jd_utime_a +
1078
tmp_jobs->jd_stime_c + tmp_jobs->jd_stime_a,
1079
tmp_jobs->jd_mem, tmp_jobs->jd_chars,
1080
tmp_jobs->jd_vmem, tmp_jobs->jd_himem));
1083
* NOTE: Under what conditions would DC have a job
1084
* that the PTF doesn't know about?
1086
psIgnoreJob(jobs->jd_jid);
1088
proccount = jobs->jd_proccount;
1089
INCJOBPTR(jobs, jobs->jd_length);
1090
procs = (struct psProc_s *) jobs;
1091
for (j = 0; j < proccount; j++) {
1092
INCPROCPTR(procs, procs->pd_length);
1094
jobs = (struct psJob_s *) procs;
1099
for_each(job, ptf_jobs) {
1100
double usage_value, old_usage_value;
1101
double cpu_usage_value = 0;
1102
int active_jobs = 0;
1104
for_each(osjob, lGetList(job, JL_OS_job_list)) {
1107
if ((usage = lGetElemStr(lGetList(osjob, JO_usage_list),
1108
UA_name, USAGE_ATTR_CPU))) {
1109
cpu_usage_value += lGetDouble(usage, UA_value);
1111
if (!(lGetUlong(osjob, JO_state) & JL_JOB_COMPLETE)) {
1116
/* calculate JL_usage */
1117
usage_value = cpu_usage_value;
1118
old_usage_value = lGetDouble(job, JL_old_usage_value);
1119
lSetDouble(job, JL_old_usage_value, usage_value);
1120
lSetDouble(job, JL_usage,
1121
MAX(PTF_MIN_JOB_USAGE, lGetDouble(job, JL_usage) *
1122
PTF_USAGE_DECAY_FACTOR + (usage_value - old_usage_value)));
1123
lSetDouble(job, JL_last_usage, usage_value - old_usage_value);
1127
lSetUlong(job, JL_state, lGetUlong(job, JL_state)
1137
lListElem *job, *proc, *usage;
1142
DENTER(TOP_LAYER, "ptf_get_usage_from_data_collector");
1145
for_each(job, ptf_jobs) {
1147
lListElem *osjob = lFirst(lGetList(job, JL_OS_job_list));
1149
usage_list = lGetList(osjob, JO_usage_list);
1151
usage_list = ptf_build_usage_list("usagelist", NULL);
1152
lSetList(osjob, JO_usage_list, usage_list);
1157
for_each(usage, usage_list) {
1160
value = lGetDouble(usage, UA_value);
1162
lSetDouble(usage, UA_value, value);
1165
/* calculate JL_usage */
1170
procfd = lGetUlong(job, JL_procfd);
1173
double usage_value, old_usage_value;
1175
if (ioctl(procfd, PIOCSTATUS, &pinfo) < 0) {
1176
perror("ioctl on /proc");
1180
pinfo.pr_utime.tv_sec +
1181
pinfo.pr_utime.tv_nsec / 1000000000.0L +
1182
pinfo.pr_stime.tv_sec +
1183
pinfo.pr_stime.tv_nsec / 1000000000.0L +
1184
pinfo.pr_cutime.tv_sec +
1185
pinfo.pr_cutime.tv_nsec / 1000000000.0L +
1186
pinfo.pr_cstime.tv_sec +
1187
pinfo.pr_cstime.tv_nsec / 1000000000.0L;
1189
old_usage_value = lGetDouble(job, JL_old_usage_value);
1191
lSetDouble(job, JL_old_usage_value, usage_value);
1193
lSetDouble(job, JL_usage,
1194
lGetDouble(job, JL_usage) *
1195
PTF_USAGE_DECAY_FACTOR +
1196
(usage_value - old_usage_value));
1198
lSetDouble(job, JL_last_usage, usage_value - old_usage_value);
1202
/* build a fake pid list with the OS job ID as the pid */
1204
pid_list = lGetList(osjob, JO_pid_list);
1206
pid_list = lCreateList("pidlist", JP_Type);
1207
lSetList(osjob, JO_pid_list, pid_list);
1208
proc = lCreateElem(JP_Type);
1209
lSetUlong(proc, JP_pid, lGetUlong(osjob, JO_OS_job_ID));
1210
lAppendElem(pid_list, proc);
1216
# endif /* MODULE_TEST */
1224
/*--------------------------------------------------------------------
1225
* ptf_calc_job_proportion_pass0
1226
*--------------------------------------------------------------------*/
1228
/*
 * Pass 0: accumulate totals over all jobs.
 *
 * Adds JL_tickets of job to *sum_of_job_tickets and JL_last_usage of job to
 * *sum_of_last_usage.  Both outputs are accumulators: the function only adds
 * to them, so the caller has to initialize them.
 */
static void ptf_calc_job_proportion_pass0(lListElem *job,
1229
u_long *sum_of_job_tickets,
1230
double *sum_of_last_usage)
1232
*sum_of_job_tickets += lGetUlong(job, JL_tickets);
1233
*sum_of_last_usage += lGetDouble(job, JL_last_usage);
1237
/*--------------------------------------------------------------------
1238
* ptf_calc_job_proportion_pass1
1239
*--------------------------------------------------------------------*/
1241
/*
 * Pass 1: derive share and proportion of one job from the ticket total.
 *
 * Stores JL_share and JL_ticket_share (tickets of the job divided by
 * sum_of_job_tickets), computes job_proportion = share * share / u with
 * u = MAX(JL_usage, PTF_MIN_JOB_USAGE), stores it in JL_proportion and adds
 * it to the accumulator *sum_proportion.
 * NOTE(review): the second factor of the share expression and the
 * declaration of u are not visible in this extract.
 * NOTE(review): no guard against sum_of_job_tickets == 0 is visible here;
 * confirm that the caller rules that out.
 */
static void ptf_calc_job_proportion_pass1(lListElem *job,
1242
u_long sum_of_job_tickets,
1243
double sum_of_last_usage,
1244
double *sum_proportion)
1246
double share, job_proportion;
1249
share = ((double) lGetUlong(job, JL_tickets) / sum_of_job_tickets) *
1251
lSetDouble(job, JL_share, share);
1252
lSetDouble(job, JL_ticket_share, (double) lGetUlong(job, JL_tickets) / sum_of_job_tickets);
1255
* Calculate each jobs proportion value based on the job's tickets
1257
* job.proportion = (job.tickets/sum_of_job_tickets)^2 / job.usage
1259
u = MAX(lGetDouble(job, JL_usage), PTF_MIN_JOB_USAGE);
1260
job_proportion = share * share / u;
1261
*sum_proportion += job_proportion;
1262
lSetDouble(job, JL_proportion, job_proportion);
1266
/*--------------------------------------------------------------------
1267
* ptf_calc_job_proportion_pass2
*
* Derives an adjusted usage for the job and recomputes its proportion:
*  - job_current_proportion is JL_proportion / sum_proportion when
*    sum_proportion > 0, otherwise it stays 0.
*  - compensate_proportion is PTF_COMPENSATION_FACTOR times the
*    targetted proportion.
*  - When the current proportion exceeds compensate_proportion, JL_usage
*    is scaled by current / compensate; the other visible assignment
*    uses JL_usage unchanged.
*  - The result is stored in JL_adjusted_usage.
*  - share * share / MAX(adjusted usage, PTF_MIN_JOB_USAGE) is stored in
*    JL_adjusted_proportion and added to *sum_adjusted_proportion.
*
* sum_last_interval_usage is not referenced in the lines visible here.
*
* NOTE(review): this listing has lost short source lines (braces, the
* divisor of the targetted proportion, the else between the two
* job_adjusted_usage assignments). Confirm against the complete file.
1268
*--------------------------------------------------------------------*/
1270
static void ptf_calc_job_proportion_pass2(lListElem *job,
1271
u_long sum_of_job_tickets,
1272
double sum_proportion, double
1273
*sum_adjusted_proportion, double
1274
*sum_last_interval_usage)
1276
double job_current_proportion = 0;
1277
double compensate_proportion;
1278
double job_targetted_proportion, job_adjusted_usage, job_adjusted_proportion, share;
1280
/* The divisor is on a line missing from this listing (presumably
 * sum_of_job_tickets - confirm). */
job_targetted_proportion = (double) lGetUlong(job, JL_tickets) /
1283
if (sum_proportion > 0) {
1284
job_current_proportion = lGetDouble(job, JL_proportion) / sum_proportion;
1287
compensate_proportion = PTF_COMPENSATION_FACTOR * job_targetted_proportion;
1288
if (job_current_proportion > compensate_proportion) {
1289
job_adjusted_usage = lGetDouble(job, JL_usage) *
1290
(job_current_proportion / compensate_proportion);
1292
job_adjusted_usage = lGetDouble(job, JL_usage);
1294
lSetDouble(job, JL_adjusted_usage, job_adjusted_usage);
1297
* Recalculate proportions based on adjusted job usage
1299
share = lGetDouble(job, JL_share);
1300
/* Floor the usage at PTF_MIN_JOB_USAGE before it is used as a divisor. */
job_adjusted_usage = MAX(job_adjusted_usage, PTF_MIN_JOB_USAGE);
1301
job_adjusted_proportion = share * share / job_adjusted_usage;
1302
*sum_adjusted_proportion += job_adjusted_proportion;
1303
lSetDouble(job, JL_adjusted_proportion, job_adjusted_proportion);
1306
/*--------------------------------------------------------------------
1307
* ptf_calc_job_proportion_pass3
*
* Normalises the job's JL_adjusted_proportion by
* sum_adjusted_proportion (the result stays 0 when that sum is not
* positive). The previous JL_adjusted_current_proportion is first saved
* into JL_last_proportion, then the new value is stored. Also widens
* *min_share / *max_share with the new value and *max_ticket_share with
* the job's JL_ticket_share.
*
* sum_last_interval_usage is not referenced in the lines visible here.
*
* NOTE(review): this listing has lost short source lines (braces, the
* tail of the DPRINTF call and any preprocessor guard around it).
1308
*--------------------------------------------------------------------*/
1310
static void ptf_calc_job_proportion_pass3(lListElem *job,
1311
double sum_adjusted_proportion,
1312
double sum_last_interval_usage,
1313
double *min_share, double *max_share,
1314
double *max_ticket_share)
1316
double job_adjusted_current_proportion = 0;
1318
if (sum_adjusted_proportion > 0)
1319
job_adjusted_current_proportion =
1320
lGetDouble(job, JL_adjusted_proportion) / sum_adjusted_proportion;
1322
/* Save the value from the previous call before it is overwritten. */
lSetDouble(job, JL_last_proportion,
1323
lGetDouble(job, JL_adjusted_current_proportion));
1325
lSetDouble(job, JL_adjusted_current_proportion,
1326
job_adjusted_current_proportion);
1328
*max_share = MAX(*max_share, job_adjusted_current_proportion);
1329
*min_share = MIN(*min_share, job_adjusted_current_proportion);
1330
*max_ticket_share = MAX(*max_ticket_share, lGetDouble(job, JL_ticket_share));
1333
DPRINTF(("XXXXXXX minshare: %f, max_share: %f XXXXXXX\n", *min_share,
1338
/*--------------------------------------------------------------------
1339
* ptf_set_OS_scheduling_parameters
*
* Determines the priority range and assigns a priority to every job in
* job_list:
*  - Reads the configured limits via mconf_get_ptf_min_priority() and
*    mconf_get_ptf_max_priority(); a value of -999 is replaced by
*    PTF_MIN_PRIORITY / PTF_MAX_PRIORITY.
*  - With ENFORCE_PRI_RANGE the limits are bounded by
*    PTF_OS_MAX_PRIORITY / PTF_OS_MIN_PRIORITY.
*  - pri_max_tmp is lowered to pri_min_tmp if it is larger.
*  - When the limits differ from the cached static values, pri_max,
*    pri_min and pri_range (pri_min - pri_max) are updated and an INFO
*    message is logged with the log level temporarily set to LOG_INFO.
*  - For each job a priority is computed, JL_timeslice is set in the
*    branch guarded by pri_min > 50 && pri_max > 50, JL_pri is stored
*    and ptf_set_job_priority() is called.
*
* NOTE(review): three alternative priority formulas are visible; the
* preprocessor lines that select between them, the max_share parameter
* line, several braces and the declarations of pri and job are missing
* from this listing.
1341
* This function assumes that the "max" priority is smaller than the "min"
1343
*--------------------------------------------------------------------*/
1344
static void ptf_set_OS_scheduling_parameters(lList *job_list, double min_share,
1346
double max_ticket_share)
1349
/* Cached across calls; -999 marks the limits as not yet initialised so
 * that the first call always takes the update-and-log branch below. */
static long pri_range, pri_min = -999, pri_max = -999;
1350
long pri_min_tmp, pri_max_tmp;
1353
DENTER(TOP_LAYER, "ptf_set_OS_scheduling_parameters");
1356
pri_min_tmp = mconf_get_ptf_min_priority();
1357
if (pri_min_tmp == -999) {
1358
pri_min_tmp = PTF_MIN_PRIORITY;
1361
pri_max_tmp = mconf_get_ptf_max_priority();
1362
if (pri_max_tmp == -999) {
1363
pri_max_tmp = PTF_MAX_PRIORITY;
1367
* For OS'es where we enforce the values of max and min priority verify
1368
* the the values are in the boundaries of PTF_OS_MAX_PRIORITY and
1369
* PTF_OS_MIN_PRIORITY.
1371
#if ENFORCE_PRI_RANGE
1372
pri_max_tmp = MAX(pri_max_tmp, PTF_OS_MAX_PRIORITY);
1373
pri_min_tmp = MIN(pri_min_tmp, PTF_OS_MIN_PRIORITY);
1377
* Ensure that the max priority can't get a bigger value than
1378
* the min priority (otherwise the "range" gets wrongly calculated
1380
if (pri_max_tmp > pri_min_tmp) {
1381
pri_max_tmp = pri_min_tmp;
1384
/* If the value has changed set pri_max/pri_min/pri_range and log */
1385
if (pri_max != pri_max_tmp || pri_min != pri_min_tmp) {
1386
u_long32 old_ll = log_state_get_log_level();
1388
pri_max = pri_max_tmp;
1389
pri_min = pri_min_tmp;
1390
pri_range = pri_min - pri_max;
1392
/* Raise the log level only for this one message, then restore it. */
log_state_set_log_level(LOG_INFO);
1393
INFO((SGE_EVENT, MSG_PRIO_PTFMINMAX_II, (int) pri_max, (int) pri_min));
1394
log_state_set_log_level(old_ll);
1397
/* Set the priority for each job */
1398
for_each(job, job_list) {
1400
/* Variant 1: priority from the adjusted current proportion; a
 * proportion of 1.0 maps to pri_max, 0.0 to pri_max + pri_range. */
pri = pri_max + (long)(pri_range * (1.0 -
1401
lGetDouble(job, JL_adjusted_current_proportion)));
1404
* Note: Should calculate targetted proportion and if it is below
1405
* a certain % then set a background priority. This is because
1406
* nice seems to always give at least some minimal % to all
1411
/* Variant 2: priority from JL_curr_pri relative to max_share. */
pri = pri_max + (pri_range * (lGetDouble(job, JL_curr_pri) / max_share));
1414
/* Variant 3: as variant 2, offset by min_share and guarded against a
 * non-positive max_share. */
if (max_share > 0) {
1415
pri = pri_max + (long)(pri_range * ((lGetDouble(job, JL_curr_pri)
1416
- min_share) / max_share));
1421
if (pri_min > 50 && pri_max > 50) {
1422
if (max_ticket_share > 0) {
1423
lSetDouble(job, JL_timeslice,
1424
MAX(pri_max, lGetDouble(job, JL_ticket_share) *
1425
pri_min / max_ticket_share));
1427
lSetDouble(job, JL_timeslice, pri_min);
1430
lSetLong(job, JL_pri, pri);
1431
ptf_set_job_priority(job);
1438
/*--------------------------------------------------------------------
1439
* ptf_job_started - process new job
*
* Registers the job through ptf_process_job() and, when os_job_id > 0,
* asks the data collector to watch it through psWatchJob().
* A second os_job_id > 0 branch formats a /proc path, opens it read-only
* and stores the descriptor in JL_procfd.
*
* NOTE(review): the preprocessor guards around the two branches, the
* declarations of fname, fd and job, the open() failure test and the
* return statement are missing from this listing.
* NOTE(review): sprintf() into fname is unbounded; the size of fname is
* not visible here - confirm it can hold the formatted path.
1440
*--------------------------------------------------------------------*/
1441
int ptf_job_started(osjobid_t os_job_id, const char *task_id_str,
1442
lListElem *new_job, u_long32 jataskid)
1444
DENTER(TOP_LAYER, "ptf_job_started");
1447
* Add new job to job list
1449
ptf_process_job(os_job_id, task_id_str, new_job, jataskid);
1452
* Tell data collector to start collecting data for this job
1455
if (os_job_id > 0) {
1456
psWatchJob(os_job_id);
1461
if (os_job_id > 0) {
1465
sprintf(fname, "/proc/%05d", os_job_id);
1467
fd = open(fname, O_RDONLY);
1469
fprintf(stderr, "opening of %s failed\n", fname);
1471
lSetUlong(job, JL_procfd, fd);
1481
/*--------------------------------------------------------------------
1482
* ptf_job_complete - process completed job
*
* Looks up the PTF job for job_id and returns PTF_ERROR_JOB_NOT_FOUND
* when there is none. If the job is not flagged JL_JOB_COMPLETE, usage
* is refreshed from the data collector while switched to the start
* user. The usage list built by _ptf_get_job_usage() is stored in
* *usage. The matching osjob is then located: the first osjob when
* pe_task_id is NULL; otherwise the visible loop compares
* JO_ja_task_ID with ja_task_id and JO_task_id_str with pe_task_id.
* That osjob is flagged JL_JOB_DELETED, the data collector is told to
* ignore it, and it is removed from the osjob list. When the list
* becomes empty the PTF job is removed from ptf_jobs.
*
* NOTE(review): braces, else branches, the declaration of osjobs, the
* loop exit and the final return are missing from this listing.
1483
*--------------------------------------------------------------------*/
1485
int ptf_job_complete(u_long32 job_id, u_long32 ja_task_id, const char *pe_task_id, lList **usage)
1487
lListElem *ptf_job, *osjob;
1490
DENTER(TOP_LAYER, "ptf_job_complete");
1492
ptf_job = ptf_get_job(job_id);
1494
if (ptf_job == NULL) {
1495
DRETURN(PTF_ERROR_JOB_NOT_FOUND);
1498
osjobs = lGetList(ptf_job, JL_OS_job_list);
1501
* if job is not complete, go get latest job usage info
1503
if (!(lGetUlong(ptf_job, JL_state) & JL_JOB_COMPLETE)) {
1504
/* The collector call is bracketed by a switch to the start user and
 * back to the admin user. */
sge_switch2start_user();
1505
ptf_get_usage_from_data_collector();
1506
sge_switch2admin_user();
1510
* Get usage for completed job
1511
* (If this for a sub-task then we return the sub-task usage)
1513
*usage = _ptf_get_job_usage(ptf_job, ja_task_id, pe_task_id);
1515
/* Search ja/pe ptf task */
1516
if (pe_task_id == NULL) {
1517
osjob = lFirst(osjobs);
1519
for_each(osjob, osjobs) {
1520
if (lGetUlong(osjob, JO_ja_task_ID) == ja_task_id) {
1521
const char *osjob_pe_task_id = lGetString(osjob, JO_task_id_str);
1523
if (osjob_pe_task_id != NULL &&
1524
strcmp(pe_task_id, osjob_pe_task_id) == 0) {
1531
if (osjob == NULL) {
1532
DRETURN(PTF_ERROR_JOB_NOT_FOUND);
1536
* Mark osjob as complete and see if all tracked osjobs are done
1537
* Tell data collector to stop collecting data for this job
1539
lSetUlong(osjob, JO_state, lGetUlong(osjob, JO_state) | JL_JOB_DELETED);
1541
psIgnoreJob(ptf_get_osjobid(osjob));
1548
int procfd = lGetUlong(ptf_job, JL_procfd);
1557
* Remove job/task from job/task list
1560
DPRINTF(("PTF: Removing job " sge_u32 "." sge_u32 ", petask %s\n",
1561
job_id, ja_task_id, pe_task_id == NULL ? "none" : pe_task_id));
1562
lRemoveElem(osjobs, &osjob);
1564
/* Drop the PTF job itself once its last osjob has been removed. */
if (lGetNumberOfElem(osjobs) == 0) {
1565
DPRINTF(("PTF: Removing job\n"));
1566
lRemoveElem(ptf_jobs, &ptf_job);
1573
/*--------------------------------------------------------------------
1574
* ptf_process_job_ticket_list - Process the job ticket list sent
1575
* from the SGE scheduler.
*
* For every element of job_ticket_list: sets JB_script_file to the
* string "dummy", passes the element to ptf_process_job() with OS job
* id 0 and a NULL task id string, then sets JL_usage of the returned
* job to MAX(PTF_MIN_JOB_USAGE, JL_usage * 0.1) and JL_curr_pri to 0.
*
* NOTE(review): the inner comment below says usage is reset to the
* minimum value, but the visible code keeps one tenth of the previous
* usage when that is larger than PTF_MIN_JOB_USAGE.
* NOTE(review): braces, the last argument of the ptf_process_job()
* call, any check of its result and the return statement are missing
* from this listing.
1576
*--------------------------------------------------------------------*/
1578
int ptf_process_job_ticket_list(lList *job_ticket_list)
1580
lListElem *jte, *job;
1582
DENTER(TOP_LAYER, "ptf_process_job_ticket_list");
1585
* Update the job entries in the job list with the number of
1586
* tickets from the job ticket list. Reset the usage to the
1587
* minimum usage value.
1589
for_each(jte, job_ticket_list) {
1592
* set JB_script_file because we don't know if this is
1593
* an interactive job
1595
lSetString(jte, JB_script_file, "dummy");
1597
job = ptf_process_job(0, NULL, jte,
1598
lGetUlong(lFirst(lGetList(jte, JB_ja_tasks)),
1601
/* reset temporary usage and priority */
1602
lSetDouble(job, JL_usage, MAX(PTF_MIN_JOB_USAGE,
1603
lGetDouble(job, JL_usage) * 0.1));
1605
lSetDouble(job, JL_curr_pri, 0);
1613
/*
 * ptf_update_job_usage - refresh job usage by calling
 * ptf_get_usage_from_data_collector().
 *
 * NOTE(review): the braces and the function exit are missing from this
 * listing.
 */
void ptf_update_job_usage()
1615
DENTER(TOP_LAYER, "ptf_update_job_usage");
1617
ptf_get_usage_from_data_collector();
1622
/*--------------------------------------------------------------------
1623
* ptf_adjust_job_priorities - routine to adjust the priorities of
1624
* executing jobs. Called whenever the PTF interval timer expires.
*
* Compares the current time with the static timestamp "next"; the
* action taken when it is too early is on a line missing here.
* Otherwise it runs over ptf_jobs in stages:
*  pass 0  sums tickets and last usage (a zero ticket sum becomes 1),
*  pass 1  computes share and proportion per job,
*  pass 2  computes adjusted usage and adjusted proportion,
*  pass 3  normalises the proportion and tracks min/max shares,
* followed by a loop that counts the processes of each job, advances
* JL_curr_pri and tracks its minimum and maximum. Finally
* ptf_set_OS_scheduling_parameters() is called and "next" is set to
* now + PTF_SCHEDULE_TIME.
*
* NOTE(review): pass 3 is called with sum_proportion, while its
* parameter is named sum_adjusted_proportion and the value accumulated
* by pass 2 in sum_adjusted_proportion is not read in the visible
* lines - confirm which sum is intended.
* NOTE(review): braces, several declarations (now, shr, num_procs),
* parts of the JL_curr_pri update, any reset of min_share/max_share
* before the last loop and the return statements are missing from this
* listing.
1625
*--------------------------------------------------------------------*/
1627
int ptf_adjust_job_priorities(void)
1629
/* Earliest time at which the next adjustment is performed. */
static u_long32 next = 0;
1631
lList *job_list, *pid_list;
1632
lListElem *job, *osjob;
1633
u_long sum_of_job_tickets = 0;
1635
double sum_proportion = 0;
1636
double sum_adjusted_proportion = 0;
1637
double min_share = 100.0;
1638
double max_share = 0;
1639
double max_ticket_share = 0;
1640
double sum_interval_usage = 0;
1641
double sum_of_last_usage = 0;
1643
DENTER(TOP_LAYER, "ptf_adjust_job_priorities");
1645
if ((now = sge_get_gmt()) < next) {
1649
job_list = ptf_jobs;
1652
* Do pass 0 of calculating job proportional share of resources
1654
for_each(job, job_list) {
1655
ptf_calc_job_proportion_pass0(job, &sum_of_job_tickets,
1656
&sum_of_last_usage);
1658
/* Avoid dividing by zero in the later passes. */
if (sum_of_job_tickets == 0) {
1659
sum_of_job_tickets = 1;
1663
* Do pass 1 of calculating job proportional share of resources
1665
for_each(job, job_list) {
1666
ptf_calc_job_proportion_pass1(job, sum_of_job_tickets,
1667
sum_of_last_usage, &sum_proportion);
1671
* Do pass 2 of calculating job proportional share of resources
1673
for_each(job, job_list) {
1674
ptf_calc_job_proportion_pass2(job, sum_of_job_tickets,
1676
&sum_adjusted_proportion,
1677
&sum_interval_usage);
1681
* Do pass 3 of calculating job proportional share of resources
1683
for_each(job, job_list) {
1684
ptf_calc_job_proportion_pass3(job, sum_proportion,
1685
sum_interval_usage, &min_share,
1686
&max_share, &max_ticket_share);
1692
for_each(job, job_list) {
1696
* calculate share based on tickets only
1698
shr = lGetDouble(job, JL_share);
1701
/* Count the processes of all OS jobs belonging to this job. */
for_each(osjob, lGetList(job, JL_OS_job_list)) {
1702
if ((pid_list = lGetList(osjob, JO_pid_list))) {
1703
num_procs += lGetNumberOfElem(pid_list);
1706
num_procs = MAX(1, num_procs);
1709
* NOTE: share algo only adjusts priority when a process runs
1711
lSetDouble(job, JL_curr_pri, lGetDouble(job, JL_curr_pri)
1712
+ ((lGetDouble(job, JL_adjusted_usage) * num_procs)
1715
max_share = MAX(max_share, lGetDouble(job, JL_curr_pri));
1716
if (min_share < 0) {
1717
min_share = lGetDouble(job, JL_curr_pri);
1719
min_share = MIN(min_share, lGetDouble(job, JL_curr_pri));
1724
* Set the O.S. scheduling parameters for the jobs
1726
ptf_set_OS_scheduling_parameters(job_list, min_share, max_share,
1729
next = now + PTF_SCHEDULE_TIME;
1735
/*--------------------------------------------------------------------
1736
* ptf_get_job_usage - routine to return a single usage list for the
1738
*--------------------------------------------------------------------*/
1739
lList *ptf_get_job_usage(u_long job_id, u_long ja_task_id,
1740
const char *task_id)
1742
return _ptf_get_job_usage(ptf_get_job(job_id), ja_task_id, task_id);
1745
/*
 * _ptf_get_job_usage - build a usage list for one job / array task.
 *
 * Walks the JL_OS_job_list of "job" and selects the OS jobs whose
 * JO_ja_task_ID equals ja_task_id and whose task id string matches:
 *  - task_id is NULL or empty and the OS job has no task id string, or
 *  - task_id is the string "*", or
 *  - task_id equals the task id string of the OS job.
 * For the selected OS jobs the JO_usage_list entries are merged into
 * job_usage: a value is added to an existing entry with the same
 * UA_name, or the entry is copied and appended. The visible lCopyList
 * call copies a whole usage list into job_usage.
 *
 * NOTE(review): braces, else branches, the condition under which the
 * list is copied instead of merged, and the return statement are
 * missing from this listing.
 */
static lList *_ptf_get_job_usage(lListElem *job, u_long ja_task_id,
1746
const char *task_id)
1748
lListElem *osjob, *usrc, *udst;
1749
lList *job_usage = NULL;
1750
const char *task_id_str;
1756
for_each(osjob, lGetList(job, JL_OS_job_list)) {
1757
task_id_str = lGetString(osjob, JO_task_id_str);
1759
if ((((!task_id || !task_id[0]) && !task_id_str)
1760
|| (task_id && !strcmp(task_id, "*"))
1761
|| (task_id && task_id_str && !strcmp(task_id, task_id_str)))
1762
&& (lGetUlong(osjob, JO_ja_task_ID) == ja_task_id)) {
1765
for_each(usrc, lGetList(osjob, JO_usage_list)) {
1766
/* Sum into an existing entry with the same usage name. */
if ((udst = lGetElemStr(job_usage, UA_name,
1767
lGetString(usrc, UA_name)))) {
1768
lSetDouble(udst, UA_value, lGetDouble(udst, UA_value)
1769
+ lGetDouble(usrc, UA_value));
1771
lAppendElem(job_usage, lCopyElem(usrc));
1775
job_usage = lCopyList(NULL, lGetList(osjob, JO_usage_list));
1782
/*--------------------------------------------------------------------
1783
* ptf_get_usage - routine to return the current job usage for
1784
* all executing jobs. Returns a job list with the job_ID and usage
1785
* filled in. A separate job entry is returned for each tracked
*
* Builds a temporary JB_Type list from ptf_jobs. For each OS job it
* finds or creates the job entry (by job number), then the array task
* entry, and - when the OS job has a pe task id - the pe task entry.
* A copy of the JO_usage_list of the OS job is stored in PET_usage of
* the pe task or in JAT_usage_list of the array task. The result
* returned in *job_usage_list is an lSelect() of the temporary list
* restricted to JB_job_number and JB_ja_tasks; the temporary list is
* freed afterwards.
*
* NOTE(review): the statement controlled by the JL_JOB_DELETED test,
* the else between the two lSetList calls, the declaration and release
* of "what", braces and the return statement are missing from this
* listing.
1787
*--------------------------------------------------------------------*/
1789
int ptf_get_usage(lList **job_usage_list)
1791
lList *job_list, *temp_usage_list;
1792
lListElem *job, *osjob;
1795
DENTER(TOP_LAYER, "ptf_get_usage");
1797
what = lWhat("%T(%I %I)", JB_Type, JB_job_number, JB_ja_tasks);
1799
temp_usage_list = lCreateList("PtfJobUsageList", JB_Type);
1800
job_list = ptf_jobs;
1801
for_each(job, job_list) {
1802
lListElem *tmp_job = NULL;
1803
u_long32 job_id = lGetUlong(job, JL_job_ID);
1805
for_each(osjob, lGetList(job, JL_OS_job_list)) {
1806
lListElem *tmp_ja_task;
1807
lListElem *tmp_pe_task;
1808
u_long32 ja_task_id = lGetUlong(osjob, JO_ja_task_ID);
1809
const char *pe_task_id = lGetString(osjob, JO_task_id_str);
1811
if (lGetUlong(osjob, JO_state) & JL_JOB_DELETED) {
1815
/* Find the output entry for this job, creating it on first use. */
tmp_job = job_list_locate(temp_usage_list, job_id);
1816
if (tmp_job == NULL) {
1817
tmp_job = lCreateElem(JB_Type);
1818
lSetUlong(tmp_job, JB_job_number, job_id);
1819
lAppendElem(temp_usage_list, tmp_job);
1822
/* Same find-or-create step for the array task entry. */
tmp_ja_task = job_search_task(tmp_job, NULL, ja_task_id);
1823
if (tmp_ja_task == NULL) {
1824
tmp_ja_task = lAddSubUlong(tmp_job, JAT_task_number, ja_task_id, JB_ja_tasks, JAT_Type);
1827
if (pe_task_id != NULL) {
1828
tmp_pe_task = ja_task_search_pe_task(tmp_ja_task, pe_task_id);
1829
if (tmp_pe_task == NULL) {
1830
tmp_pe_task = lAddSubStr(tmp_ja_task, PET_id, pe_task_id, JAT_task_list, PET_Type);
1832
lSetList(tmp_pe_task, PET_usage, lCopyList(NULL, lGetList(osjob, JO_usage_list)));
1834
lSetList(tmp_ja_task, JAT_usage_list, lCopyList(NULL, lGetList(osjob, JO_usage_list)));
1836
} /* for each osjob */
1837
} /* for each ptf job */
1839
*job_usage_list = lSelect("PtfJobUsageList", temp_usage_list, NULL, what);
1841
lFreeList(&temp_usage_list);
1848
/*--------------------------------------------------------------------
1849
* ptf_init - initialize the Priority Translation Facility
*
* Creates the global ptf_jobs list, switches to the start user and
* starts the data collector with psStartCollector(). On a non-zero
* result from psStartCollector() it switches back to the admin user.
* It then raises the priority of the calling process with a
* platform-specific call (schedctl, nicem, nicex or setpriority
* with PTF_MAX_PRIORITY), logging an ERROR when that call fails, and
* finally switches back to the admin user.
*
* NOTE(review): the function signature, the first preprocessor
* conditionals, the handling of a NULL ptf_jobs, braces and the return
* statements are missing from this listing.
1850
*--------------------------------------------------------------------*/
1854
DENTER(TOP_LAYER, "ptf_init");
1857
ptf_jobs = lCreateList("ptf_job_list", JL_Type);
1858
if (ptf_jobs == NULL) {
1863
sge_switch2start_user();
1864
if (psStartCollector()) {
1865
sge_switch2admin_user();
1870
schedctl(RENICE, 0, 0);
1873
/* The priority calls below are only attempted when running as root. */
if (getuid() == 0) {
1876
if ((nice = nicem(C_PGRP, 0, 0)) > 0) {
1877
if (nicem(C_PGRP, 0, 0 - nice) < 0) {
1878
ERROR((SGE_EVENT, MSG_PRIO_NICEMFAILED_S, strerror(errno)));
1882
#elif defined(NECSX5) || defined(NECSX6)
1884
if (getuid() == 0) {
1885
if (nicex(0, -10) == -1) {
1886
ERROR((SGE_EVENT, MSG_PRIO_NICEMFAILED_S, strerror(errno)));
1890
#elif defined(ALPHA) || defined(SOLARIS) || defined(LINUX) || defined(FREEBSD) || defined(DARWIN)
1891
if (getuid() == 0) {
1892
if (setpriority(PRIO_PROCESS, getpid(), PTF_MAX_PRIORITY) < 0) {
1893
ERROR((SGE_EVENT, MSG_PRIO_SETPRIOFAILED_S, strerror(errno)));
1897
sge_switch2admin_user();
1902
/*
 * ptf_start - acts only while the is_ptf_running flag is not set.
 *
 * NOTE(review): the statements controlled by the test (presumably the
 * initialisation and the flag update - confirm), the braces and the
 * function exit are missing from this listing.
 */
void ptf_start(void)
1904
DENTER(TOP_LAYER, "ptf_start");
1905
if (!is_ptf_running) {
1914
/*
 * ptf_stop (body only; the signature line is missing from this listing).
 * While is_ptf_running is set it unregisters all registered jobs via
 * ptf_unregister_registered_jobs(); the visible lines also free the
 * ptf_jobs list.
 *
 * NOTE(review): statements between the two visible calls, the braces
 * and the function exit are missing from this listing.
 */
DENTER(TOP_LAYER, "ptf_stop");
1915
if (is_ptf_running) {
1916
ptf_unregister_registered_jobs();
1921
lFreeList(&ptf_jobs);
1925
/*
 * ptf_show_registered_jobs - debug dump of the PTF job list.
 *
 * For every job in ptf_jobs it prints the job id via DPRINTF, then for
 * every OS job the OS job id, the array task id and the pe task id
 * string (the text <null> when there is none), and finally every pid
 * found in the JO_pid_list of the OS job.
 *
 * NOTE(review): several declarations (job_list, os_job_list, os_job,
 * process, pid), the last DPRINTF argument line, braces and the
 * function exit are missing from this listing.
 */
void ptf_show_registered_jobs(void)
1928
lListElem *job_elem;
1930
DENTER(TOP_LAYER, "ptf_show_registered_jobs");
1932
job_list = ptf_jobs;
1933
for_each(job_elem, job_list) {
1937
DPRINTF(("\tjob id: " sge_u32 "\n", lGetUlong(job_elem, JL_job_ID)));
1938
os_job_list = lGetList(job_elem, JL_OS_job_list);
1939
for_each(os_job, os_job_list) {
1940
lList *process_list;
1942
const char *pe_task_id_str;
1943
u_long32 ja_task_id;
1945
pe_task_id_str = lGetString(os_job, JO_task_id_str);
1946
/* Substitute a printable placeholder for a missing task id string. */
pe_task_id_str = pe_task_id_str ? pe_task_id_str : "<null>";
1947
ja_task_id = lGetUlong(os_job, JO_ja_task_ID);
1949
DPRINTF(("\t\tosjobid: "sge_u32" ja_task_id: "sge_u32" petaskid: %s\n",
1950
lGetUlong(os_job, JO_OS_job_ID), ja_task_id,
1952
process_list = lGetList(os_job, JO_pid_list);
1953
for_each(process, process_list) {
1956
pid = lGetUlong(process, JP_pid);
1957
DPRINTF(("\t\t\tpid: " sge_u32 "\n", pid));
1964
/*
 * ptf_unregister_registered_job - remove one array task of a job from
 * the PTF.
 *
 * Walks ptf_jobs looking for entries whose JL_job_ID equals job_id.
 * Within such a job, every OS job whose JO_ja_task_ID equals ja_task_id
 * is reported to the data collector via psIgnoreJob() and removed from
 * the OS job list. When the OS job list has no first element the job
 * itself is removed from ptf_jobs.
 *
 * Both loops fetch the successor (lNext) before the current element
 * may be removed, so removal does not disturb the iteration.
 *
 * NOTE(review): declarations of job, os_job and os_job_list, braces
 * and the function exit are missing from this listing.
 */
void ptf_unregister_registered_job(u_long32 job_id, u_long32 ja_task_id ) {
1966
lListElem *next_job;
1968
DENTER(TOP_LAYER, "ptf_unregister_registered_job");
1970
next_job = lFirst(ptf_jobs);
1971
while ((job = next_job)) {
1974
lListElem *next_os_job;
1976
next_job = lNext(job);
1978
if (lGetUlong(job, JL_job_ID) == job_id) {
1979
DPRINTF(("PTF: found job id "sge_U32CFormat"\n", job_id));
1980
os_job_list = lGetList(job, JL_OS_job_list);
1981
next_os_job = lFirst(os_job_list);
1982
while ((os_job = next_os_job)) {
1983
next_os_job = lNext(os_job);
1984
if (lGetUlong(os_job, JO_ja_task_ID ) == ja_task_id) {
1985
DPRINTF(("PTF: found job task id "sge_U32CFormat"\n", ja_task_id));
1986
psIgnoreJob(ptf_get_osjobid(os_job));
1987
DPRINTF(("PTF: Notify PDC to remove data for osjobid " sge_u32 "\n",
1988
lGetUlong(os_job, JO_OS_job_ID)));
1989
lRemoveElem(os_job_list, &os_job);
1993
if (lFirst(os_job_list) == NULL) {
1994
DPRINTF(("PTF: No more os_job_list entries, removing job\n"));
1995
DPRINTF(("PTF: Removing job " sge_u32 "\n", lGetUlong(job, JL_job_ID)));
1996
lRemoveElem(ptf_jobs, &job);
2003
/*
 * ptf_unregister_registered_jobs - remove every job from the PTF.
 *
 * Calls psIgnoreJob() for each OS job of each job in ptf_jobs, then
 * frees the whole ptf_jobs list with lFreeList().
 *
 * NOTE(review): declarations of job and os_job, braces and the
 * function exit are missing from this listing.
 */
void ptf_unregister_registered_jobs(void)
2007
DENTER(TOP_LAYER, "ptf_unregister_registered_jobs");
2009
for_each(job, ptf_jobs) {
2011
for_each(os_job, lGetList(job, JL_OS_job_list)) {
2012
psIgnoreJob(ptf_get_osjobid(os_job));
2013
DPRINTF(("PTF: Notify PDC to remove data for osjobid " sge_u32 "\n",
2014
lGetUlong(os_job, JO_OS_job_ID)));
2018
lFreeList(&ptf_jobs);
2019
DPRINTF(("PTF: All jobs unregistered from PTF\n"));
2023
int ptf_is_running(void)
2025
return is_ptf_running;
2028
/*--------------------------------------------------------------------
2029
* ptf_errstr - return PTF error string
*
* Maps ptf_error_code to a message constant. The message is
* initialised to MSG_ERROR_UNKNOWNERRORCODE, so codes without a
* matching case yield that text. Visible mappings:
*   PTF_ERROR_NONE             -> MSG_ERROR_NOERROROCCURED
*   PTF_ERROR_INVALID_ARGUMENT -> MSG_ERROR_INVALIDARGUMENT
*   PTF_ERROR_JOB_NOT_FOUND    -> MSG_ERROR_JOBDOESNOTEXIST
*
* NOTE(review): the break statements, any further cases, braces and
* the return statement are missing from this listing.
2030
*--------------------------------------------------------------------*/
2032
const char *ptf_errstr(int ptf_error_code)
2034
const char *errmsg = MSG_ERROR_UNKNOWNERRORCODE;
2036
DENTER(TOP_LAYER, "ptf_errstr");
2038
switch (ptf_error_code) {
2039
case PTF_ERROR_NONE:
2040
errmsg = MSG_ERROR_NOERROROCCURED;
2043
case PTF_ERROR_INVALID_ARGUMENT:
2044
errmsg = MSG_ERROR_INVALIDARGUMENT;
2047
case PTF_ERROR_JOB_NOT_FOUND:
2048
errmsg = MSG_ERROR_JOBDOESNOTEXIST;
2060
/*
 * Module test driver; the closing #endif below is labelled MODULE_TEST.
 *
 * Visible flow:
 *  1. optional gettext initialisation,
 *  2. build a JB_Type job ticket list: four fixed entries with 100,
 *     500, 300 and 100 tickets, and a loop adding one entry per
 *     command line argument (ticket count parsed with atoi),
 *  3. start the TESTJOB program for each entry and report it to the
 *     PTF with ptf_job_started(),
 *  4. pass the ticket list to ptf_process_job_ticket_list(), dump
 *     ptf_jobs and fetch usage with ptf_get_usage(),
 *  5. loop 100 times: adjust priorities and print a per-job table,
 *  6. dump the job list and job usage list, then send SIGTERM to every
 *     job through ptf_kill().
 *
 * NOTE(review): the opening conditional, many declarations (job_id,
 * argn, i, f), the fork/exec scaffolding, braces, sleeps and return
 * statements are missing from this listing, so the exact control flow
 * cannot be confirmed here.
 */
#define TESTJOB "./cpu_bound"
2062
int main(int argc, char **argv)
2066
lList *job_ticket_list, *job_usage_list;
2067
lListElem *jte, *job;
2069
osjobid_t os_job_id = 100;
2071
#ifdef __SGE_COMPILE_WITH_GETTEXT__
2072
/* init language output for gettext() , it will use the right language */
2073
sge_init_language_func((gettext_func_type) gettext,
2074
(setlocale_func_type) setlocale,
2075
(bindtextdomain_func_type) bindtextdomain,
2076
(textdomain_func_type) textdomain);
2077
sge_init_language(NULL, NULL);
2078
#endif /* __SGE_COMPILE_WITH_GETTEXT__ */
2081
#if defined(ALPHA) && defined(notdef)
2095
/* build job ticket list */
2097
job_ticket_list = lCreateList("jobticketlist", JB_Type);
2100
jte = lCreateElem(JB_Type);
2101
lSetUlong(jte, JB_job_number, ++job_id);
2102
lSetUlong(jte, JB_ticket, 100);
2103
lSetString(jte, JB_script_file, TESTJOB);
2104
lAppendElem(job_ticket_list, jte);
2106
jte = lCreateElem(JB_Type);
2107
lSetUlong(jte, JB_job_number, ++job_id);
2108
lSetUlong(jte, JB_ticket, 500);
2109
lSetString(jte, JB_script_file, TESTJOB);
2110
lAppendElem(job_ticket_list, jte);
2112
jte = lCreateElem(JB_Type);
2113
lSetUlong(jte, JB_job_number, ++job_id);
2114
lSetUlong(jte, JB_ticket, 300);
2115
lSetString(jte, JB_script_file, TESTJOB);
2116
lAppendElem(job_ticket_list, jte);
2118
jte = lCreateElem(JB_Type);
2119
lSetUlong(jte, JB_job_number, ++job_id);
2120
lSetUlong(jte, JB_ticket, 100);
2121
lSetString(jte, JB_script_file, TESTJOB);
2122
lAppendElem(job_ticket_list, jte);
2126
/* One ticket-list entry per command line argument. */
for (argn = 1; argn < argc; argn++) {
2127
int tickets = atoi(argv[argn]);
2131
jte = lCreateElem(JB_Type);
2132
lSetUlong(jte, JB_job_number, ++job_id);
2133
lSetUlong(jte, JB_ticket, tickets);
2134
lSetString(jte, JB_script_file, TESTJOB);
2135
lAppendElem(job_ticket_list, jte);
2139
/* Start jobs and tell PTF jobs have started */
2141
for_each(jte, job_ticket_list) {
2146
if (newarraysess() < 0) {
2147
perror("newarraysess");
2151
os_job_id = getash();
2152
printf(MSG_JOB_THEASHFORJOBXISY_DX,
2153
sge_u32c(lGetUlong(jte, JB_job_number)), u64c(os_job_id));
2159
char *jobname = lGetString(jte, JB_script_file);
2161
/* schedctl(NDPRI, 0, 0); */
2168
execl(jobname, jobname, NULL);
2175
/* NOTE(review): three arguments are passed here, while the definition
 * of ptf_job_started in this file takes four (os_job_id, task_id_str,
 * new_job, jataskid) - this test call looks stale. */
ptf_job_started(os_job_id, jte, 0);
2181
if (newarraysess() < 0) {
2182
perror("newarraysess");
2185
printf("My ash is "u64"\n", u64c(getash()));
2189
ptf_process_job_ticket_list(job_ticket_list);
2193
if (!(f = fdopen(1, "w"))) {
2194
fprintf(stderr, MSG_ERROR_COULDNOTOPENSTDOUTASFILE);
2198
if (lDumpList(f, ptf_jobs, 0) == EOF) {
2199
fprintf(stderr, MSG_ERROR_UNABLETODUMPJOBLIST);
2205
ptf_get_usage(&job_usage_list);
2207
for (i = 0; i < 100; i++) {
2208
u_long sum_of_tickets = 0;
2209
double sum_of_usage = 0;
2210
double sum_of_last_usage = 0;
2212
/* adjust priorities */
2214
ptf_adjust_job_priorities();
2216
/* Totals used as denominators for the percentage columns below. */
for_each(job, ptf_jobs) {
2217
sum_of_tickets += lGetUlong(job, JL_tickets);
2218
sum_of_usage += lGetDouble(job, JL_usage);
2219
sum_of_last_usage += lGetDouble(job, JL_last_usage);
2222
# if defined(CRAY) || defined(ALPHA)
2223
# define XFMT "%20d"
2225
# define XFMT "%20lld"
2228
puts(" adj total curr"
2230
puts("job_id os_job_id tickets usage usage usage"
2231
" target% actual% actual% target% diff% curr_pri pri");
2232
for_each(job, ptf_jobs) {
2233
printf("%6d " XFMT " %7d %8.3lf %8.3lf %8.3lf %8.3lf"
2234
" %8.3lf %8.3lf %8.3lf %8.3lf %8.3lf %5d\n",
2235
lGetUlong(job, JL_job_ID), ptf_get_osjobid(job),
2236
lGetUlong(job, JL_tickets), lGetDouble(job, JL_usage),
2237
lGetDouble(job, JL_adjusted_usage),
2238
lGetDouble(job, JL_last_usage),
2239
sum_of_tickets ? lGetUlong(job, JL_tickets) /
2240
(double) sum_of_tickets : 0,
2241
sum_of_usage ? lGetDouble(job, JL_usage) / sum_of_usage : 0,
2242
sum_of_last_usage ? lGetDouble(job, JL_last_usage) /
2243
sum_of_last_usage : 0,
2244
lGetDouble(job, JL_adjusted_current_proportion),
2245
lGetDouble(job, JL_diff_proportion),
2246
lGetDouble(job, JL_curr_pri), lGetLong(job, JL_pri));
2253
if (!(f = fdopen(1, "w"))) {
2254
fprintf(stderr, MSG_ERROR_COULDNOTOPENSTDOUTASFILE);
2258
if (lDumpList(f, ptf_jobs, 0) == EOF) {
2259
fprintf(stderr, MSG_ERROR_UNABLETODUMPJOBLIST);
2263
/* dump job usage list */
2265
if (lDumpList(f, job_usage_list, 0) == EOF) {
2266
fprintf(stderr, "%s\n", MSG_ERROR_UNABLETODUMPJOBUSAGELIST);
2274
for_each(job, ptf_jobs) {
2275
ptf_kill(ptf_get_jobid(job), SIGTERM);
2285
#endif /* MODULE_TEST */