135
139
/* External Functions called */
137
141
extern void set_resc_assigned A_((job *, enum batch_op));
142
extern void cleanup_restart_file(job *);
139
144
/* Local public functions */
141
146
void req_jobobit A_((struct batch_request *));
143
/* Local private functions */
145
static struct batch_request *setup_cpyfiles A_((struct batch_request *, job *, char *, char *, int, int));
149
151
* setup_from - setup the "from" name for a standard job file:
150
* output, error, or chkpt
152
* output, error, or checkpoint
153
155
static char *setup_from(
339
static struct batch_request *return_stdfile(
340
struct batch_request *preq,
346
if (pjob->ji_wattr[(int)JOB_ATR_interactive].at_flags &&
347
pjob->ji_wattr[(int)JOB_ATR_interactive].at_val.at_long)
352
if ((pjob->ji_wattr[(int)JOB_ATR_checkpoint_name].at_flags & ATR_VFLAG_SET) == 0)
358
/* if this file is joined to another then it doesn't have to get copied back */
359
if (is_joined(pjob, ati))
366
preq = alloc_br(PBS_BATCH_ReturnFiles);
369
strcpy(preq->rq_ind.rq_returnfiles.rq_jobid, pjob->ji_qs.ji_jobid);
371
if (ati == JOB_ATR_errpath)
373
preq->rq_ind.rq_returnfiles.rq_return_stderr = TRUE;
375
else if (ati == JOB_ATR_outpath)
377
preq->rq_ind.rq_returnfiles.rq_return_stdout = TRUE;
797
860
JOB_STATE_EXITING,
798
JOB_SUBSTATE_STAGEOUT);
861
JOB_SUBSTATE_RETURNSTD);
800
863
ptask->wt_type = WORK_Immed;
802
865
/* NO BREAK, fall into stage out processing */
867
case JOB_SUBSTATE_RETURNSTD:
868
/* this is a new substate to TORQUE 2.4.0. The purpose is to provide a
869
* stage of the job exiting process that allows us to transfer stderr and
870
* stdout files from the mom's spool directory back to the server spool
871
* directory. This will only be done if the job has been checkpointed,
872
* and keep_completed is a positive value. This is so that a completed
873
* job can be restarted from a checkpoint file.
876
if (ptask->wt_type != WORK_Deferred_Reply)
878
/* this is the very first call, have mom return the files */
880
/* if job has been checkpointed and KeepSeconds > 0, copy the stderr
881
* and stdout files back so that we can restart a completed
882
* checkpointed job return_stdfile will only setup this request if
883
* the job has a checkpoint file and the file is not joined to another
889
if ((pque = pjob->ji_qhdr) && (pque->qu_attr != NULL))
891
KeepSeconds = attr_ifelse_long(
892
&pque->qu_attr[(int)QE_ATR_KeepCompleted],
893
&server.sv_attr[(int)SRV_ATR_KeepCompleted],
901
strcpy(namebuf, path_spool);
902
strcat(namebuf, pjob->ji_qs.ji_fileprefix);
903
strcat(namebuf, JOB_STDOUT_SUFFIX);
905
/* allocate space for the string name plus ".SAV" */
906
namebuf2 = malloc((strlen(namebuf) + 5) * sizeof(char));
908
if (pjob->ji_qs.ji_un.ji_exect.ji_momaddr != pbs_server_addr)
910
preq = return_stdfile(preq, pjob, JOB_ATR_outpath);
912
else if (access(namebuf, F_OK) == 0)
914
strcpy(namebuf2, namebuf);
915
strcat(namebuf2, ".SAV");
916
if (link(namebuf, namebuf2))
918
log_err(errno,id,strerror(errno));
923
namebuf[strlen(namebuf) - strlen(JOB_STDOUT_SUFFIX)] = '\0';
925
strcat(namebuf, JOB_STDERR_SUFFIX);
928
if (pjob->ji_qs.ji_un.ji_exect.ji_momaddr != pbs_server_addr)
930
preq = return_stdfile(preq, pjob, JOB_ATR_errpath);
932
else if (access(namebuf, F_OK) == 0)
934
strcpy(namebuf2, namebuf);
935
strcat(namebuf2, ".SAV");
936
if (link(namebuf, namebuf2))
938
log_err(errno,id,strerror(errno));
949
preq->rq_extra = (void *)pjob;
951
if (issue_Drequest(handle, preq, on_job_exit, 0) == 0)
953
/* success, we'll come back after mom replies */
957
/* set up as if mom returned error, if we fall through to
958
* here then we want to hit the error processing below
959
* because something bad happened */
963
preq->rq_reply.brp_code = PBSE_MOMREJECT;
965
preq->rq_reply.brp_choice = BATCH_REPLY_CHOICE_NULL;
967
preq->rq_reply.brp_un.brp_txt.brp_txtlen = 0;
973
/* we don't need to return files to the server spool,
974
move on to see if we need to delete files */
979
JOB_SUBSTATE_STAGEOUT);
986
pjob->ji_qs.ji_jobid,
987
"no spool files to return");
990
ptask = set_task(WORK_Immed, 0, on_job_exit, pjob);
994
append_link(&pjob->ji_svrtask, &ptask->wt_linkobj, ptask);
1002
/* here we have a reply (maybe faked) from MOM about the request */
1003
if (preq->rq_reply.brp_code != 0)
1007
snprintf(log_buffer, LOG_BUF_SIZE, "request to return spool files failed on node '%s' for job %s%s",
1008
pjob->ji_wattr[(int)JOB_ATR_exec_host].at_val.at_str,
1009
pjob->ji_qs.ji_jobid,
1010
(IsFaked == 1) ? "*" : "");
1013
PBSEVENT_ERROR | PBSEVENT_ADMIN | PBSEVENT_JOB,
1015
pjob->ji_qs.ji_jobid,
1018
} /* end if (preq->rq_reply.brp_code != 0) */
1022
* files (generally) moved ok, move on to the next phase by
1023
* "faking" the immediate work task and falling through to
1031
svr_setjobstate(pjob, JOB_STATE_EXITING, JOB_SUBSTATE_STAGEOUT);
1033
ptask->wt_type = WORK_Immed;
1036
/* NO BREAK -- FALL INTO NEXT CASE */
804
1038
case JOB_SUBSTATE_STAGEOUT:
806
1040
if (LOGLEVEL >= 4)
964
1199
* "faking" the immediate work task.
1203
/* check to see if we have saved the spool files, that means
1204
the mom and server are sharing this spool directory.
1205
pbs_server should take ownership of these files and rename them
1206
see JOB_SUBSTATE_RETURNSTD above*/
1208
strcpy(namebuf, path_spool);
1210
strcat(namebuf, pjob->ji_qs.ji_fileprefix);
1212
strcat(namebuf, JOB_STDOUT_SUFFIX);
1214
/* allocate space for the string name plus ".SAV" */
1215
namebuf2 = malloc((strlen(namebuf) + 5) * sizeof(char));
1217
strcpy(namebuf2, namebuf);
1219
strcat(namebuf2, ".SAV");
1221
spool_file_exists = access(namebuf2, F_OK);
1223
if (spool_file_exists == 0)
1225
if (link(namebuf2, namebuf))
1227
log_err(errno,id,strerror(errno));
1234
namebuf[strlen(namebuf) - strlen(JOB_STDOUT_SUFFIX)] = '\0';
1236
strcat(namebuf, JOB_STDERR_SUFFIX);
1237
strcpy(namebuf2, namebuf);
1238
strcat(namebuf2, ".SAV");
1240
spool_file_exists = access(namebuf2, F_OK);
1242
if (spool_file_exists == 0)
1245
if (link(namebuf2, namebuf))
1247
log_err(errno,id,strerror(errno));
1130
1432
if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_HERE) == 0)
1131
1433
issue_track(pjob);
1435
/* see if restarted job failed */
1437
if (pjob->ji_wattr[(int)JOB_ATR_checkpoint_restart_status].at_flags & ATR_VFLAG_SET)
1439
char *pfailtype = NULL;
1440
char *pfailure = NULL;
1445
pjob->ji_wattr[(int)JOB_ATR_checkpoint_restart_status].at_val.at_str, 20);
1447
pfailtype = strtok(errMsg," ");
1448
if (pfailtype != NULL)
1449
pfailure = strtok(NULL," ");
1451
if (pfailure != NULL)
1453
if (memcmp(pfailure,"failure",7) == 0)
1455
if (memcmp(pfailtype,"Temporary",9) == 0)
1459
svr_setjobstate(pjob, JOB_STATE_QUEUED, JOB_SUBSTATE_QUEUED);
1463
"Requeueing job after checkpoint restart failure: %s",
1464
pjob->ji_wattr[(int)JOB_ATR_checkpoint_restart_status].at_val.at_str);
1469
pjob->ji_qs.ji_jobid,
1475
* If we are deleting job after a failure then the first character
1476
* of the comment should no longer be uppercase
1478
else if (isupper(*pfailtype))
1480
/* put job on hold */
1482
hold_val = &pjob->ji_wattr[(int)JOB_ATR_hold].at_val.at_long;
1483
*hold_val |= HOLD_s;
1484
pjob->ji_wattr[(int)JOB_ATR_hold].at_flags |= ATR_VFLAG_SET;
1485
pjob->ji_modified = 1;
1486
svr_setjobstate(pjob, JOB_STATE_HELD, JOB_SUBSTATE_HELD);
1490
"Placing job on hold after checkpoint restart failure: %s",
1491
pjob->ji_wattr[(int)JOB_ATR_checkpoint_restart_status].at_val.at_str);
1496
pjob->ji_qs.ji_jobid,
1133
1505
svr_setjobstate(pjob, JOB_STATE_COMPLETE, JOB_SUBSTATE_COMPLETE);
1135
1507
if ((pque = pjob->ji_qhdr) && (pque != NULL))
1535
if (ptask->wt_type == WORK_Immed)
1539
if (((server.sv_attr[(int)SRV_ATR_JobMustReport].at_flags & ATR_VFLAG_SET) != 0)
1540
&& (server.sv_attr[(int)SRV_ATR_JobMustReport].at_val.at_long > 0))
1543
pjob->ji_wattr[(int)JOB_ATR_reported].at_val.at_long = 0;
1545
pjob->ji_wattr[(int)JOB_ATR_reported].at_flags =
1546
ATR_VFLAG_SET | ATR_VFLAG_MODIFY;
1547
job_save(pjob,SAVEJOB_FULL);
1550
else if (((pjob->ji_wattr[(int)JOB_ATR_reported].at_flags & ATR_VFLAG_SET) != 0)
1551
&& (pjob->ji_wattr[(int)JOB_ATR_reported].at_val.at_long == 0))
1163
1556
if (KeepSeconds <= 0)
1561
* If job must report is set and keep_completed is 0,
1562
* default to JOBMUSTREPORTDEFAULTKEEP seconds
1564
KeepSeconds = JOBMUSTREPORTDEFAULTKEEP;
1170
1574
if (ptask->wt_type == WORK_Immed)
1174
ptask = set_task(WORK_Timed, time_now + KeepSeconds, on_job_exit, pjob);
1576
/* is it first time in or server restart recovery */
1578
if ((handle == -1) &&
1579
(pjob->ji_wattr[(int)JOB_ATR_comp_time].at_flags & ATR_VFLAG_SET))
1582
* server restart - if we already have a completion_time then we
1583
* better be restarting.
1584
* use the comp_time to determine task invocation time
1586
ptask = set_task(WORK_Timed,
1587
pjob->ji_wattr[(int)JOB_ATR_comp_time].at_val.at_long + KeepSeconds,
1592
/* First time in - Set the job completion time */
1594
pjob->ji_wattr[(int)JOB_ATR_comp_time].at_val.at_long = (long)time(NULL);
1595
pjob->ji_wattr[(int)JOB_ATR_comp_time].at_flags |= ATR_VFLAG_SET;
1597
ptask = set_task(WORK_Timed, time_now + KeepSeconds, on_job_exit, pjob);
1598
job_save(pjob, SAVEJOB_FULL);
1176
1601
if (ptask != NULL)
2105
* Gets the latest stored used resource information (cput, mem, walltime, etc.)
2106
* about the given job.
2112
svrattrl *patlist, /* I */
2113
char *acctbuf) /* O */
2120
amt = RESC_USED_BUF - strlen(acctbuf);
2122
while (patlist != NULL)
2124
if (strcmp(patlist->al_name, ATTR_session) == 0)
2126
patlist = (svrattrl *)GET_NEXT(patlist->al_link);
2130
need = strlen(patlist->al_name) + strlen(patlist->al_value) + 3;
2132
if (patlist->al_resc)
2134
need += strlen(patlist->al_resc) + 3;
2139
strcat(acctbuf, "\n");
2140
strcat(acctbuf, patlist->al_name);
2142
if (patlist->al_resc)
2144
strcat(acctbuf, ".");
2145
strcat(acctbuf, patlist->al_resc);
2148
strcat(acctbuf, "=");
2150
strcat(acctbuf, patlist->al_value);
2157
patlist = (svrattrl *)GET_NEXT(patlist->al_link);
2161
} /* END get_used() */
2165
* Encodes the used resource information (cput, mem, walltime, etc.)
2166
* about the given job.
2170
#ifdef USESAVEDRESOURCES
2171
void encode_job_used(
2174
tlist_head *phead) /* O */
2181
at = &pjob->ji_wattr[JOB_ATR_resc_used];
2182
ad = &job_attr_def[JOB_ATR_resc_used];
2184
if ((at->at_flags & ATR_VFLAG_SET) == 0)
2189
for (rs = (resource *)GET_NEXT(at->at_val.at_list);
2191
rs = (resource *)GET_NEXT(rs->rs_link))
2193
resource_def *rd = rs->rs_defin;
2197
val = rs->rs_value; /* copy resource attribute */
2209
} /* END for (rs) */
2212
} /* END encode_job_used() */
2213
#endif /* USESAVEDRESOURCES */
1650
2226
struct batch_request *preq) /* I */
2229
#ifdef USESAVEDRESOURCES
2230
char id[] = "req_jobobit";
2231
#endif /* USESAVEDRESOURCES */
1653
2232
int alreadymailed = 0;
1656
char acctbuf[RESC_USED_BUF];
2234
char acctbuf[RESC_USED_BUF];
1658
2236
int exitstatus;
1659
char mailbuf[RESC_USED_BUF];
2237
int have_resc_used = FALSE;
2238
char mailbuf[RESC_USED_BUF];
1665
char jobid[PBS_MAXSVRJOBID+1];
2243
char jobid[PBS_MAXSVRJOBID+1];
1667
2245
struct work_task *ptask;
1668
2246
svrattrl *patlist;
1669
2247
unsigned int dummy;
1674
PBSEVENT_ERROR | PBSEVENT_JOB,
1676
preq->rq_ind.rq_jobobit.rq_jid,
1680
2249
strcpy(jobid, preq->rq_ind.rq_jobobit.rq_jid); /* This will be needed later for logging after preq is freed. */
1682
2250
pjob = find_job(preq->rq_ind.rq_jobobit.rq_jid);
1684
2252
if ((pjob == NULL) ||
1852
2388
accttail = strlen(acctbuf);
1854
amt = RESC_USED_BUF - accttail;
1856
while (patlist != NULL)
2390
have_resc_used = get_used(patlist, acctbuf);
2392
#ifdef USESAVEDRESOURCES
2394
/* if we don't have resources from the obit, use what the job already had */
2396
if (!have_resc_used)
1858
need = strlen(patlist->al_name) + strlen(patlist->al_value) + 3;
1860
if (patlist->al_resc)
1862
need += strlen(patlist->al_resc) + 3;
1867
strcat(acctbuf, "\n");
1868
strcat(acctbuf, patlist->al_name);
1870
if (patlist->al_resc)
1872
strcat(acctbuf, ".");
1873
strcat(acctbuf, patlist->al_resc);
1876
strcat(acctbuf, "=");
1878
strcat(acctbuf, patlist->al_value);
1883
patlist = (svrattrl *)GET_NEXT(patlist->al_link);
2398
struct batch_request *tmppreq;
2402
PBSEVENT_ERROR | PBSEVENT_JOB,
2405
"No resources used found");
2408
tmppreq = alloc_br(PBS_BATCH_JobObit);
2410
if (tmppreq == NULL)
2414
sprintf(log_buffer, "cannot allocate memory for temp obit message");
2418
PBS_EVENTCLASS_REQUEST,
2425
CLEAR_HEAD(tmppreq->rq_ind.rq_jobobit.rq_attr);
2427
encode_job_used(pjob, &tmppreq->rq_ind.rq_jobobit.rq_attr);
2429
patlist = (svrattrl *)GET_NEXT(tmppreq->rq_ind.rq_jobobit.rq_attr);
2431
have_resc_used = get_used(patlist, acctbuf);
2436
#endif /* USESAVEDRESOURCES */
1886
2438
strncat(mailbuf, (acctbuf + accttail), RESC_USED_BUF - strlen(mailbuf) - 1);
1888
2440
mailbuf[RESC_USED_BUF - 1] = '\0';
2659
/* remove checkpoint restart file if there is one */
2661
if (pjob->ji_wattr[(int)JOB_ATR_restart_name].at_flags & ATR_VFLAG_SET)
2663
cleanup_restart_file(pjob);
2107
2666
/* "on_job_exit()" will be dispatched out of the main loop */
2111
2671
/* Rerunning job, if not checkpointed, clear "resources_used and requeue job */
2113
if ((pjob->ji_qs.ji_svrflags & (JOB_SVFLG_CHKPT | JOB_SVFLG_ChkptMig)) == 0)
2673
if ((pjob->ji_qs.ji_svrflags & (JOB_SVFLG_CHECKPOINT_FILE | JOB_SVFLG_CHECKPOINT_MIGRATEABLE)) == 0)
2115
2675
job_attr_def[(int)JOB_ATR_resc_used].at_free(&pjob->ji_wattr[(int)JOB_ATR_resc_used]);
2117
else if (pjob->ji_qs.ji_svrflags & JOB_SVFLG_CHKPT)
2677
else if (pjob->ji_qs.ji_svrflags & JOB_SVFLG_CHECKPOINT_FILE)
2119
2679
/* non-migratable checkpoint (cray), leave there */
2120
2680
/* and just requeue the job */