89
89
* job_set_wait() - set event for when job's wait time ends
90
90
* get_variable() - get value of a single environ variable of a job
91
91
* prefix_std_file() - build the fully prefixed default name for std e/o
92
* add_std_filename() - add the default name for std e/o
93
92
* get_jobowner() - get job owner name without @host suffix
94
93
* set_resc_deflt() - set unspecified resource_limit to default values
95
94
* set_statechar() - set the job state attribute character value
97
96
* Private functions
98
97
* chk_svr_resc_limit() - check job requirements against queue/server limits
99
* default_std() - make the default name for standard out/error
100
* eval_chkpnt() - ensure job checkpoint .ge. queue's min. time
98
* default_std() - make the default name for standard out/error
99
* eval_checkpoint() - ensure job checkpoint .ge. queue's min. time
101
100
* set_deflt_resc() - set unspecified resource_limit to default values
102
101
* job_wait_over() - event handler for job_set_wait()
250
249
"RERUN1", /* job is rerun, stageout phase */
251
250
"RERUN2", /* job is rerun, delete files stage */
252
251
"RERUN3", /* job is rerun, mom delete job */
258
"RETURNSTD", /* returning stderr/stdout files to server spool */
350
356
/* link first in list */
352
insert_link(&pque->qu_jobs,&pjob->ji_jobque,pjob,LINK_INSET_AFTER);
358
insert_link(&pque->qu_jobs, &pjob->ji_jobque, pjob, LINK_INSET_AFTER);
356
362
/* link after 'current' job in list */
358
insert_link(&pjcur->ji_jobque,&pjob->ji_jobque,pjob,LINK_INSET_AFTER);
364
insert_link(&pjcur->ji_jobque, &pjob->ji_jobque, pjob, LINK_INSET_AFTER);
361
367
/* update counts: queue and queue by state */
387
393
/* issue enqueued accounting record */
389
sprintf(log_buffer,"queue=%s",
395
sprintf(log_buffer, "queue=%s",
390
396
pque->qu_qs.qu_name);
392
account_record(PBS_ACCT_QUEUE,pjob,log_buffer);
398
account_record(PBS_ACCT_QUEUE, pjob, log_buffer);
415
421
/* check the job checkpoint against the queue's min */
418
&pjob->ji_wattr[(int)JOB_ATR_chkpnt],
419
&pque->qu_attr[(int)QE_ATR_ChkptMim]);
424
&pjob->ji_wattr[(int)JOB_ATR_checkpoint],
425
&pque->qu_attr[(int)QE_ATR_checkpoint_min]);
421
427
/* do anything needed doing regarding job dependencies */
439
445
pjob->ji_wattr[(int)JOB_ATR_etime].at_val.at_long = time_now;
440
446
pjob->ji_wattr[(int)JOB_ATR_etime].at_flags |= ATR_VFLAG_SET;
442
/* notify the scheduler we have a new job */
444
svr_do_schedule = SCH_SCHEDULE_NEW;
449
/* notify the scheduler we have a new job */
450
svr_do_schedule = SCH_SCHEDULE_NEW;
447
452
else if (pque->qu_qs.qu_type == QTYPE_RoutePush)
775
* get_resource - find a resource (value) entry in the queue or server list
777
* Returns: pointer to struct resource or NULL
780
resource *get_resource(
782
attribute *p_queattr, /* I */
783
attribute *p_svrattr, /* I */
784
resource_def *rscdf, /* I */
785
int *fromQueue) /* O */
790
pr = find_resc_entry(p_queattr, rscdf);
792
if ((pr == NULL) || ((pr->rs_value.at_flags & ATR_VFLAG_SET) == 0))
794
/* queue limit not set, check server's */
796
pr = find_resc_entry(p_svrattr, rscdf);
798
if ((pr != NULL) && (pr->rs_value.at_flags & ATR_VFLAG_SET))
805
/* queue limit is set, use it */
811
} /* END get_resource() */
770
817
* compare the job resource limit against the system limit
771
818
* unless a queue limit exists, it takes priority
801
847
int MPPWidth = 0;
853
resource *mppnodect_resource = NULL;
804
855
static resource_def *noderesc = NULL;
805
856
static resource_def *needresc = NULL;
806
857
static resource_def *nodectresc = NULL;
807
858
static resource_def *mppwidthresc = NULL;
859
static resource_def *mppnppn = NULL;
809
861
static time_t UpdateTime = 0;
810
862
static time_t now;
823
875
/* NOTE: to optimize, only update once per 30 seconds */
825
noderesc = find_resc_def(svr_resc_def,"nodes", svr_resc_size);
826
needresc = find_resc_def(svr_resc_def,"neednodes", svr_resc_size);
827
nodectresc = find_resc_def(svr_resc_def,"nodect", svr_resc_size);
828
mppwidthresc = find_resc_def(svr_resc_def,"mppwidth", svr_resc_size);
877
noderesc = find_resc_def(svr_resc_def, "nodes", svr_resc_size);
878
needresc = find_resc_def(svr_resc_def, "neednodes", svr_resc_size);
879
nodectresc = find_resc_def(svr_resc_def, "nodect", svr_resc_size);
880
mppwidthresc = find_resc_def(svr_resc_def, "mppwidth", svr_resc_size);
881
mppnppn = find_resc_def(svr_resc_def, "mppnppn", svr_resc_size);
868
921
if ((jbrc->rs_value.at_flags & (ATR_VFLAG_SET | ATR_VFLAG_DEFLT)) == ATR_VFLAG_SET)
870
qurc = find_resc_entry(queatr, jbrc->rs_defin);
923
qurc = find_resc_entry(&pque->qu_attr[QA_ATR_ResourceMax], jbrc->rs_defin);
872
LimitIsFromQueue = 0;
873
925
LimitName = jbrc->rs_defin->rs_name;
875
if ((qurc == NULL) || ((qurc->rs_value.at_flags & ATR_VFLAG_SET) == 0))
927
cmpwith = get_resource(&pque->qu_attr[QA_ATR_ResourceMax],
928
&server.sv_attr[SRV_ATR_ResourceMax],
932
if (strcmp(LimitName,"mppnppn") == 0)
877
/* queue limit not set, check server's */
879
svrc = find_resc_entry(svratr, jbrc->rs_defin);
881
if ((svrc != NULL) && (svrc->rs_value.at_flags & ATR_VFLAG_SET))
934
mpp_nppn = jbrc->rs_value.at_val.at_long;
936
if (strcmp(LimitName,"mppwidth") == 0)
888
/* queue limit is set, use it */
890
LimitIsFromQueue = 1;
938
mpp_width = jbrc->rs_value.at_val.at_long;
895
941
if ((jbrc->rs_defin == noderesc) && (qtype == QTYPE_Execution))
897
943
/* NOTE: process after loop so SvrNodeCt is loaded */
944
/* can check pure nodes violation right here */
899
948
jbrc_nodes = jbrc;
949
if ((jbrc_nodes != NULL) &&
952
if ((isdigit(*(jbrc_nodes->rs_value.at_val.at_str))) &&
953
(isdigit(*(qurc->rs_value.at_val.at_str))))
955
job_nodes = atoi(jbrc_nodes->rs_value.at_val.at_str);
956
queue_nodes = atoi(qurc->rs_value.at_val.at_str);
958
if (queue_nodes < job_nodes)
911
973
#endif /* NERSCDEV */
974
else if ((strcmp(LimitName,"mppnodect") == 0)
975
&& (jbrc->rs_value.at_val.at_long == -1))
978
* mppnodect is a special attribute. It gets set based upon the
979
* values of mppwidth and mppnppn. -1 signifies the case where mppwidth
980
* and mppnppn were not both specified for the job. We will need to
981
* check mppnodect limits against queue/server defaults, if any.
984
mppnodect_resource = jbrc;
912
986
else if ((cmpwith != NULL) && (jbrc->rs_defin != needresc))
914
988
/* don't check neednodes */
926
if ((EMsg != NULL) && (EMsg[0] == '\0'))
1001
* is_transit flag is not set
1002
* or is_transit is set, but not to true
1003
* or the value comes from queue limit
1005
if ((!(pque->qu_attr[(int)QE_ATR_is_transit].at_flags & ATR_VFLAG_SET)) ||
1006
(!pque->qu_attr[(int)QE_ATR_is_transit].at_val.at_long) ||
928
sprintf(EMsg,"cannot satisfy %s max %s requirement",
929
(LimitIsFromQueue == 1) ? "queue" : "server",
930
(LimitName != NULL) ? LimitName : "resource");
1009
if ((EMsg != NULL) && (EMsg[0] == '\0'))
1011
sprintf(EMsg, "cannot satisfy %s max %s requirement",
1012
(LimitIsFromQueue == 1) ? "queue" : "server",
1013
(LimitName != NULL) ? LimitName : "resource");
936
1020
} /* END if () */
938
1022
jbrc = (resource *)GET_NEXT(jbrc->rs_link);
939
1023
} /* END while (jbrc != NULL) */
1025
if (mppnodect_resource != NULL)
1028
* special case where mppnodect was not specified for the job, we need to
1029
* check max against recalculated value using queue/server resources_defaults
1034
/* get queue/server default value */
1036
if (mppnppn != NULL)
1038
cmpwith = get_resource(&pque->qu_attr[QA_ATR_ResourceDefault],
1039
&server.sv_attr[SRV_ATR_resource_deflt],
1043
if (cmpwith != NULL)
1045
mpp_nppn = cmpwith->rs_value.at_val.at_long;
1052
/* get queue/server default value */
1054
if (mppwidthresc != NULL)
1056
cmpwith = get_resource(&pque->qu_attr[QA_ATR_ResourceDefault],
1057
&server.sv_attr[SRV_ATR_resource_deflt],
1061
if (cmpwith != NULL)
1063
mpp_width = cmpwith->rs_value.at_val.at_long;
1068
/* Uses same way of calculating as set_mppnodect */
1069
/* Check for width less than a node */
1071
if ((mpp_width) && (mpp_width < mpp_nppn))
1073
mpp_nppn = mpp_width;
1076
/* Compute an estimate for the number of nodes needed */
1078
mpp_nodect = mpp_width;
1081
mpp_nodect = (mpp_nodect + mpp_nppn - 1) / mpp_nppn;
1084
LimitIsFromQueue = 0;
1085
LimitName = mppnodect_resource->rs_defin->rs_name;
1087
cmpwith = get_resource(&pque->qu_attr[QA_ATR_ResourceMax],
1088
&server.sv_attr[SRV_ATR_ResourceMax],
1089
mppnodect_resource->rs_defin,
1092
if (cmpwith != NULL)
1099
"chk_svr_resc_limit: comparing calculated mppnodect %ld, %s limit %s %ld\n",
1101
(LimitIsFromQueue == 1) ? "queue" : "server",
1103
cmpwith->rs_value.at_val.at_long);
1105
log_event(PBSEVENT_DEBUG, PBS_EVENTCLASS_SERVER, msg_daemonname,
1109
nodect_orig = mppnodect_resource->rs_value.at_val.at_long;
1110
mppnodect_resource->rs_value.at_val.at_long = mpp_nodect;
1112
rc = mppnodect_resource->rs_defin->rs_comp(
1114
&mppnodect_resource->rs_value);
1116
mppnodect_resource->rs_value.at_val.at_long = nodect_orig;
1125
* is_transit flag is not set
1126
* or is_transit is set, but not to true
1127
* or the value comes from queue limit
1129
if ((!(pque->qu_attr[(int)QE_ATR_is_transit].at_flags & ATR_VFLAG_SET)) ||
1130
(!pque->qu_attr[(int)QE_ATR_is_transit].at_val.at_long) ||
1133
if ((EMsg != NULL) && (EMsg[0] == '\0'))
1135
sprintf(EMsg, "cannot satisfy %s max %s requirement",
1136
(LimitIsFromQueue == 1) ? "queue" : "server",
1137
(LimitName != NULL) ? LimitName : "resource");
1144
} /* END if (mppnodect_resource != NULL) */
941
1146
if (jbrc_nodes != NULL)
978
if ((EMsg != NULL) && (EMsg[0] == '\0'))
979
strcpy(EMsg, "cannot locate feasible nodes");
1184
* is_transit flag is not set
1185
* or is_transit is set, but not to true
1187
if ((!(pque->qu_attr[(int)QE_ATR_is_transit].at_flags & ATR_VFLAG_SET)) ||
1188
(!pque->qu_attr[(int)QE_ATR_is_transit].at_val.at_long))
1190
if ((EMsg != NULL) && (EMsg[0] == '\0'))
1191
strcpy(EMsg, "cannot locate feasible nodes");
984
1197
} /* END if (jbrc_nodes != NULL) */
1218
1431
return(PBSE_NONONRERUNABLE);
1433
if (strcmp(Q_DT_fault_tolerant,
1434
pque->qu_attr[QA_ATR_DisallowedTypes].at_val.at_arst->as_string[i]) == 0
1435
&& ((pjob->ji_wattr[(int)JOB_ATR_fault_tolerant].at_flags & ATR_VFLAG_SET) &&
1436
pjob->ji_wattr[(int)JOB_ATR_fault_tolerant].at_val.at_long != 0))
1439
snprintf(EMsg, 1024,
1440
"fault_tolerant jobs are not allowed for queue: user %s, queue %s",
1441
pjob->ji_wattr[(int)JOB_ATR_job_owner].at_val.at_str,
1442
pque->qu_qs.qu_name);
1444
return(PBSE_NOFAULTTOLERANT);
1447
if (strcmp(Q_DT_fault_intolerant,
1448
pque->qu_attr[QA_ATR_DisallowedTypes].at_val.at_arst->as_string[i]) == 0
1449
&& (!(pjob->ji_wattr[(int)JOB_ATR_fault_tolerant].at_flags & ATR_VFLAG_SET) ||
1450
pjob->ji_wattr[(int)JOB_ATR_fault_tolerant].at_val.at_long == 0))
1453
snprintf(EMsg, 1024,
1454
"only fault_tolerant jobs are allowed for queue: user %s, queue %s",
1455
pjob->ji_wattr[(int)JOB_ATR_job_owner].at_val.at_str,
1456
pque->qu_qs.qu_name);
1458
return(PBSE_NOFAULTINTOLERANT);
1460
if (strcmp(Q_DT_job_array,
1461
pque->qu_attr[QA_ATR_DisallowedTypes].at_val.at_arst->as_string[i]) == 0
1462
&& (pjob->ji_wattr[(int)JOB_ATR_job_array_request].at_flags & ATR_VFLAG_SET))
1465
snprintf(EMsg, 1024,
1466
"job arrays are not allowed for queue: queue %s",
1467
pque->qu_qs.qu_name);
1468
return(PBSE_NOJOBARRAYS);
1221
1472
} /* END if (pque->qu_attr[QA_ATR_DisallowedTypes].at_flags & ATR_VFLAG_SET) */
1325
1576
if (pque->qu_attr[QA_ATR_Enabled].at_val.at_long == 0)
1327
if (EMsg) snprintf(EMsg, 1024,
1328
"queue is disabled: user %s, queue %s",
1329
pjob->ji_wattr[(int)JOB_ATR_job_owner].at_val.at_str,
1330
pque->qu_qs.qu_name);
1579
snprintf(EMsg, 1024,
1580
"queue is disabled: user %s, queue %s",
1581
pjob->ji_wattr[(int)JOB_ATR_job_owner].at_val.at_str,
1582
pque->qu_qs.qu_name);
1332
1584
return(PBSE_QUNOENB);
1692
1945
if (qsubhost != NULL)
1948
/* If just the host name portion was specified
1949
* then we use it instead of qsubhost
1952
if ((key == (int)'e') &&
1953
(pjob->ji_wattr[(int)JOB_ATR_errpath].at_flags & ATR_VFLAG_SET) &&
1954
(pjob->ji_wattr[(int)JOB_ATR_errpath].at_val.at_str[strlen(pjob->ji_wattr[(int)JOB_ATR_errpath].at_val.at_str) - 1] == ':'))
1957
pjob->ji_wattr[(int)JOB_ATR_errpath].at_val.at_str[strlen(pjob->ji_wattr[(int)JOB_ATR_errpath].at_val.at_str) - 1] = '\0';
1958
qsubhost = pjob->ji_wattr[(int)JOB_ATR_errpath].at_val.at_str;
1961
else if ((key == (int)'o') &&
1962
(pjob->ji_wattr[(int)JOB_ATR_outpath].at_flags & ATR_VFLAG_SET) &&
1963
(pjob->ji_wattr[(int)JOB_ATR_outpath].at_val.at_str[strlen(pjob->ji_wattr[(int)JOB_ATR_outpath].at_val.at_str) - 1] == ':'))
1966
pjob->ji_wattr[(int)JOB_ATR_outpath].at_val.at_str[strlen(pjob->ji_wattr[(int)JOB_ATR_outpath].at_val.at_str) - 1] = '\0';
1967
qsubhost = pjob->ji_wattr[(int)JOB_ATR_outpath].at_val.at_str;
1694
1971
len = strlen(qsubhost) +
1695
1972
strlen(pjob->ji_wattr[(int)JOB_ATR_jobname].at_val.at_str) +
1696
1973
PBS_MAXSEQNUM +
1888
2164
/* apply queue defaults first since they take precedence */
1890
set_deflt_resc(ja,&pque->qu_attr[(int)QA_ATR_ResourceDefault]);
2166
set_deflt_resc(ja, &pque->qu_attr[(int)QA_ATR_ResourceDefault]);
1892
2168
/* server defaults will only be applied to attributes which have
1893
2169
not yet been set */
1895
set_deflt_resc(ja,&server.sv_attr[(int)SRV_ATR_resource_deflt]);
2171
set_deflt_resc(ja, &server.sv_attr[(int)SRV_ATR_resource_deflt]);
1897
2173
/* apply queue max limits first since they take precedence */
1899
#ifndef RESOURCEMAXNOTDEFAULT
1900
set_deflt_resc(ja,&pque->qu_attr[(int)QA_ATR_ResourceMax]);
2175
#ifdef RESOURCEMAXDEFAULT
2176
set_deflt_resc(ja, &pque->qu_attr[(int)QA_ATR_ResourceMax]);
1902
2178
/* server max limits will only be applied to attributes which have
1903
2179
not yet been set */
1905
set_deflt_resc(ja,&server.sv_attr[(int)SRV_ATR_ResourceMax]);
2181
set_deflt_resc(ja, &server.sv_attr[(int)SRV_ATR_ResourceMax]);
1972
* eval_chkpnt - if the job's checkpoint attribute is "c=nnnn" and
2248
* eval_checkpoint - if the job's checkpoint attribute is "c=nnnn" and
1973
2249
* nnnn is less than the queue's minimum checkpoint time, reset
1974
2250
* to the queue min time.
1977
static void eval_chkpnt(
2253
static void eval_checkpoint(
1979
2255
attribute *jobckp, /* job's checkpoint attribute */
1980
2256
attribute *queckp) /* queue's checkpoint attribute */
2084
2361
pque->qu_njstate[i] = 0;
2087
for (pjob = (job *)GET_NEXT(svr_alljobs);pjob != NULL;
2364
for (pjob = (job *)GET_NEXT(svr_alljobs);
2088
2366
pjob = (job *)GET_NEXT(pjob->ji_alljobs))
2090
2368
server.sv_qs.sv_numjobs++;
2092
2370
server.sv_jobstates[pjob->ji_qs.ji_state]++;
2094
(pjob->ji_qhdr)->qu_numjobs++;
2095
(pjob->ji_qhdr)->qu_njstate[pjob->ji_qs.ji_state]++;
2372
pque = pjob->ji_qhdr;
2374
pque = find_queuebyname(pjob->ji_qs.ji_queue);
2097
if (pjob->ji_qs.ji_state == JOB_STATE_COMPLETE)
2099
pque = pjob->ji_qhdr;
2102
pque = find_queuebyname(pjob->ji_qs.ji_queue);
2379
pque->qu_njstate[pjob->ji_qs.ji_state]++;
2381
if (pjob->ji_qs.ji_state == JOB_STATE_COMPLETE)
2105
2382
pque->qu_numcompleted++;
2384
pjob->ji_qhdr = pque;
2108
2387
} /* END for (pjob) */