1
/*___INFO__MARK_BEGIN__*/
2
/*************************************************************************
4
* The Contents of this file are made available subject to the terms of
5
* the Sun Industry Standards Source License Version 1.2
7
* Sun Microsystems Inc., March, 2001
10
* Sun Industry Standards Source License Version 1.2
11
* =================================================
12
* The contents of this file are subject to the Sun Industry Standards
13
* Source License Version 1.2 (the "License"); You may not use this file
14
* except in compliance with the License. You may obtain a copy of the
15
* License at http://gridengine.sunsource.net/Gridengine_SISSL_license.html
17
* Software provided under this License is provided on an "AS IS" basis,
18
* WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING,
19
* WITHOUT LIMITATION, WARRANTIES THAT THE SOFTWARE IS FREE OF DEFECTS,
20
* MERCHANTABLE, FIT FOR A PARTICULAR PURPOSE, OR NON-INFRINGING.
21
* See the License for the specific provisions governing your rights and
22
* obligations concerning the Software.
24
* The Initial Developer of the Original Code is: Sun Microsystems, Inc.
26
* Copyright: 2001 by Sun Microsystems, Inc.
28
* All Rights Reserved.
30
************************************************************************/
31
/*___INFO__MARK_END__*/
37
#include "sge_answer.h"
39
#include "sge_ja_task.h"
40
#include "sge_pe_task.h"
41
#include "sge_usageL.h"
42
#include "sge_report_execd.h"
43
#include "sge_sched.h"
45
#include "execution_states.h"
46
#include "sge_feature.h"
47
#include "job_report_qmaster.h"
49
#include "sge_signal.h"
50
#include "sge_event_master.h"
51
#include "sge_job_qmaster.h"
53
#include "sge_give_jobs.h"
54
#include "sge_pe_qmaster.h"
56
#include "reschedule.h"
57
#include "msg_daemons_common.h"
58
#include "msg_qmaster.h"
59
#include "sge_string.h"
62
#include "sge_report.h"
64
#include "sge_reporting_qmaster.h"
66
#include "sge_persistence_qmaster.h"
67
#include "spool/sge_spooling.h"
68
#include "sgeobj/sge_ack.h"
70
/* Forward declaration of a file-local helper that returns a char * for the
 * given status value; process_job_report() passes a ja_task's JAT_status
 * through it when composing error log messages. */
static char *status2str(u_long32 status);
72
/* True when 'state' equals JWRITTEN, JRUNNING or JWAITING4OSJID.
 * Each use of the parameter is parenthesized so that expression arguments
 * (e.g. is_running(a & b)) are compared as a whole rather than being split
 * by operator precedence. 'state' is still evaluated up to three times, so
 * do not pass arguments with side effects. */
#define is_running(state) ((state)==JWRITTEN || (state)==JRUNNING || (state)==JWAITING4OSJID)
74
static char *status2str(
99
/* ----------------------------------------
105
Process 'report' containing a job report list from
106
'commproc' at 'rhost'.
108
The 'pb' may get used to collect requests that will be
109
generated in this process. The caller should reply it
110
to the sender of this job report list if 'pb' remains
114
void because all necessary state changes are done
115
in the appropriate objects
117
---------------------------------------- */
118
void process_job_report(sge_gdi_ctx_class_t *ctx, lListElem *report,
119
lListElem *hep, char *rhost, char *commproc,
120
sge_pack_buffer *pb, monitoring_t *monitor)
122
lList* jrl = lGetList(report, REP_list); /* JR_Type */
123
lListElem *jep, *jr, *ep, *jatep = NULL;
124
object_description *object_base = object_type_get_object_description();
126
DENTER(TOP_LAYER, "process_job_report");
128
DPRINTF(("received job report with %d elements:\n", lGetNumberOfElem(jrl)));
131
** first process job reports of sub tasks; to ensure this, we put all these
132
** job reports at the top of the 'jrl' list. This is necessary to ensure
133
** slave tasks get accounted on a shm machine
136
static lSortOrder *jr_sort_order = NULL;
137
if (!jr_sort_order) {
138
DPRINTF(("parsing job report sort order\n"));
139
jr_sort_order = lParseSortOrderVarArg(JR_Type, "%I-",
142
lSortList(jrl, jr_sort_order);
146
** now check all job reports found in step 1 are
147
** removed from job report list
150
const char *queue_name;
151
const char *pe_task_id_str = lGetString(jr, JR_pe_task_id_str);
153
lListElem *petask = NULL;
155
u_long32 jobid, rstate = 0, jataskid = 0;
157
jobid = lGetUlong(jr, JR_job_number);
158
jataskid = lGetUlong(jr, JR_ja_task_number);
159
rstate = lGetUlong(jr, JR_state);
161
/* handle protocol to execd for all jobs which are
162
already finished and maybe rescheduled */
164
fret = skip_restarted_job(hep, jr, jobid, jataskid);
167
pack_ack(pb, ACK_SIGNAL_JOB, jobid, jataskid, NULL);
168
} else if (fret == 3) {
169
pack_ack(pb, ACK_JOB_EXIT, jobid, jataskid, pe_task_id_str);
174
jep = job_list_locate(*object_base[SGE_TYPE_JOB].list, jobid);
176
jatep = lGetElemUlong(lGetList(jep, JB_ja_tasks), JAT_task_number, jataskid);
178
status = lGetUlong(jatep, JAT_status);
182
if ((queue_name = lGetString(jr, JR_queue_name)) == NULL) {
183
queue_name = MSG_OBJ_UNKNOWNQ;
186
if (pe_task_id_str != NULL && jep != NULL && jatep != NULL) {
187
petask = lGetSubStr(jatep, PET_id, pe_task_id_str, JAT_task_list);
195
lList *answer_list = NULL;
201
* If a ja_task was deleted while the execd was down, we'll
202
* get a "job running" report when the execd starts up again.
203
* The ja_task will be deleted by a timer triggered event
204
* (TYPE_SIGNAL_RESEND_EVENT), but this can take up to one
205
* minute - better send a kill signal immediately.
207
if (ISSET(lGetUlong(jatep, JAT_state), JDELETED)) {
208
DPRINTF(("Received report from "sge_u32"."sge_u32
209
" which is already in \"deleted\" state. "
210
"==> send kill signal\n", jobid, jataskid));
212
pack_ack(pb, ACK_SIGNAL_JOB, jobid, jataskid, NULL);
215
if (pe_task_id_str == NULL) {
217
/* store unscaled usage directly in job */
218
lXchgList(jr, JR_usage, lGetListRef(jatep, JAT_usage_list));
220
/* update jobs scaled usage list */
221
lSetList(jatep, JAT_scaled_usage_list,
222
lCopyList("scaled", lGetList(jatep, JAT_usage_list)));
223
scale_usage(lGetList(hep, EH_usage_scaling_list),
224
lGetList(jatep, JAT_previous_usage_list),
225
lGetList(jatep, JAT_scaled_usage_list));
227
if (status == JTRANSFERING) { /* got async ack for this job */
228
DPRINTF(("--- transfering job "sge_u32" is running\n", jobid));
229
sge_commit_job(ctx, jep, jatep, jr, COMMIT_ST_ARRIVED, COMMIT_DEFAULT, monitor); /* implicitly sending usage to schedd */
230
cancel_job_resend(jobid, jataskid);
232
/* need to generate a job event for new usage
233
* the timestamp should better come from report object
235
/* jatask usage is not spooled (?) */
236
sge_add_list_event( 0, sgeE_JOB_USAGE,
237
jobid, jataskid, NULL, NULL,
238
lGetString(jep, JB_session),
239
lGetList(jatep, JAT_scaled_usage_list));
240
lList_clear_changed_info(lGetList(jatep, JAT_scaled_usage_list));
243
/* register running task; qmaster will log accounting for all registered tasks */
245
bool new_task = false;
247
/* do we expect a pe task report from this host? */
248
if (lGetString(jatep, JAT_granted_pe)
249
&& (pe=pe_list_locate(*object_base[SGE_TYPE_PE].list, lGetString(jatep, JAT_granted_pe)))
250
&& lGetBool(pe, PE_control_slaves)
251
&& lGetElemHost(lGetList(jatep, JAT_granted_destin_identifier_list), JG_qhostname, rhost)) {
253
/* is the task already known (object was created earlier)? */
254
if (petask == NULL) {
255
/* here qmaster hears the first time about this task
256
and thus adds it to the task list of the appropriate job */
258
DPRINTF(("--- task (#%d) "sge_u32"/%s -> running\n",
259
lGetNumberOfElem(lGetList(jatep, JAT_task_list)), jobid, pe_task_id_str));
260
petask = lAddSubStr(jatep, PET_id, pe_task_id_str, JAT_task_list, PET_Type);
261
lSetUlong(petask, PET_status, JRUNNING);
262
/* JG: TODO: this should be delivered from execd! */
263
lSetUlong(petask, PET_start_time, sge_get_gmt());
264
lSetList(petask, PET_granted_destin_identifier_list, NULL);
265
if ((ep=lAddSubHost(petask, JG_qhostname, rhost, PET_granted_destin_identifier_list, JG_Type))) {
266
lSetString(ep, JG_qname, queue_name);
270
/* store unscaled usage directly in sub-task */
271
lXchgList(jr, JR_usage, lGetListRef(petask, PET_usage));
273
/* update task's scaled usage list */
274
lSetList(petask, PET_scaled_usage,
275
lCopyList("scaled", lGetList(petask, PET_usage)));
277
scale_usage(lGetList(hep, EH_usage_scaling_list),
278
lGetList(petask, PET_previous_usage),
279
lGetList(petask, PET_scaled_usage));
281
/* notify scheduler of task usage event */
284
&answer_list, 0, sgeE_PETASK_ADD,
285
jobid, jataskid, pe_task_id_str, NULL,
286
lGetString(jep, JB_session),
287
jep, jatep, petask, true, true);
289
sge_add_list_event( 0, sgeE_JOB_USAGE,
290
jobid, jataskid, pe_task_id_str,
291
NULL, lGetString(jep, JB_session),
292
lGetList(petask, PET_scaled_usage));
294
answer_list_output(&answer_list);
295
} else if (lGetUlong(jatep, JAT_status) != JFINISHED) {
297
const char *shouldbe_queue_name;
298
const char *shouldbe_host_name;
300
if (!(jg = lFirst(lGetList(jatep, JAT_granted_destin_identifier_list)))) {
301
shouldbe_queue_name = MSG_OBJ_NOTRUNNING;
302
shouldbe_host_name = MSG_OBJ_NOTRUNNING;
304
if ((shouldbe_queue_name = lGetString(jg, JG_qname)) == NULL) {
305
shouldbe_queue_name = MSG_OBJ_UNKNOWN;
307
if ((shouldbe_host_name = lGetString(jg, JG_qhostname)) == NULL) {
308
shouldbe_host_name = MSG_OBJ_UNKNOWN;
311
/* should never happen */
312
ERROR((SGE_EVENT, MSG_JOB_REPORTEXITQ_SUUSSSSS,
313
rhost, sge_u32c(jobid), sge_u32c(jataskid),
314
pe_task_id_str?pe_task_id_str:MSG_MASTER,
315
queue_name, shouldbe_queue_name,
317
status2str(lGetUlong(jatep, JAT_status))));
321
/* once a day write an intermediate usage record to the
322
* reporting file to have correct daily usage reporting with
323
* long running jobs */
324
if (reporting_is_intermediate_acct_required(jep, jatep, petask)) {
325
/* write intermediate usage */
326
reporting_create_acct_record(ctx, NULL, jr, jep, jatep, true);
328
/* this action has changed the ja_task/pe_task - spool */
329
if (pe_task_id_str != NULL) {
330
/* JG: TODO we would need a PETASK_MOD event here!
331
* for spooling only, the ADD event is OK
334
&answer_list, 0, sgeE_PETASK_ADD,
335
jobid, jataskid, pe_task_id_str, NULL,
336
lGetString(jep, JB_session),
337
jep, jatep, petask, false, true);
340
&answer_list, 0, sgeE_JATASK_MOD,
341
jobid, jataskid, NULL, NULL,
342
lGetString(jep, JB_session),
343
jep, jatep, NULL, false, true);
345
answer_list_output(&answer_list);
349
ERROR((SGE_EVENT, MSG_JOB_REPORTRUNQ_SUUSSU,
350
rhost, sge_u32c(jobid), sge_u32c(jataskid),
351
pe_task_id_str?pe_task_id_str:"master",
352
queue_name, sge_u32c(status)));
356
/* execd reports a running job that is unknown */
357
/* signal this job to kill it at execd
358
this can be caused by a qdel -f while
359
execd was unreachable or by deletion of
360
the job in qmasters spool dir + qmaster
362
retry is triggered if execd reports
363
this job again as running
365
ERROR((SGE_EVENT, MSG_JOB_REPORTRUNFALSE_SUUSS, rhost,
366
sge_u32c(jobid), sge_u32c(jataskid),
367
pe_task_id_str?pe_task_id_str:MSG_MASTER, queue_name));
368
pack_ack(pb, ACK_SIGNAL_JOB, jobid, jataskid, NULL);
373
/* we might get load reports of pe slaves, which have finished
374
during a load report interval. We do not have any flushing
375
for slave load reports or any finish reports for them. If
376
the scheduler is fast, it might have sent a remove order for
377
the job. We then get a load report for a job / task, which
378
does not exist anymore.
379
I do not know if we have to send a job exit request, but at
380
least we have to ignore the load report.
382
if (!jep || !jatep) {
383
DPRINTF(("send cleanup request for slave job "sge_u32"."sge_u32"\n",
385
pack_ack(pb, ACK_JOB_EXIT, jobid, jataskid, pe_task_id_str);
387
/* must be ack for slave job */
388
lListElem *first_at_host;
390
first_at_host = lGetElemHost(lGetList(jatep, JAT_granted_destin_identifier_list), JG_qhostname, rhost);
392
if (lGetUlong(first_at_host, JG_tag_slave_job) != 0) {
394
DPRINTF(("slave job "sge_u32" arrived at %s\n", jobid, rhost));
395
lSetUlong(first_at_host, JG_tag_slave_job, 0);
397
/* should trigger a fast delivery of the job to master execd
398
script but only when all other slaves have also arrived */
399
if (is_pe_master_task_send(jatep)) {
400
/* triggers direct job delivery to master execd */
401
lSetString(jatep, JAT_master_queue, lGetString( lFirst(lGetList(jatep, JAT_granted_destin_identifier_list)), JG_qname));
403
DPRINTF(("trigger retry of job delivery to master execd\n"));
404
lSetUlong(jatep, JAT_start_time, 0);
405
cancel_job_resend(jobid, jataskid);
406
trigger_job_resend(sge_get_gmt(), NULL, jobid, jataskid, 0);
410
/* clear state with regards to slave controlled container */
413
host = host_list_locate(*object_base[SGE_TYPE_EXECHOST].list, rhost);
414
update_reschedule_unknown_list_for_job(host, jobid, jataskid);
416
DPRINTF(("RU: CLEANUP FOR SLAVE JOB "sge_u32"."sge_u32" on host "SFN"\n",
417
jobid, jataskid, rhost));
420
pack_ack(pb, ACK_JOB_EXIT, jobid, jataskid, pe_task_id_str);
426
int skip_job_exit = 0;
428
if (!jep || !jatep || (jep && status==JFINISHED)) {
429
/* must be retry of execds job exit */
430
/* or job was deleted using "qdel -f" */
431
/* while execd was down or .. */
432
dstring buffer = DSTRING_INIT;
434
INFO((SGE_EVENT, "exiting job "SFQ": ja_task does not exist",
435
job_get_id_string(jobid, jataskid, pe_task_id_str, &buffer)));
437
INFO((SGE_EVENT, "exiting job "SFQ": job does not exist",
438
job_get_id_string(jobid, jataskid, pe_task_id_str, &buffer)));
440
sge_dstring_free(&buffer);
443
if (pe_task_id_str == NULL) {
444
/* store unscaled usage directly in job */
445
lXchgList(jr, JR_usage, lGetListRef(jatep, JAT_usage_list));
447
/* update jobs scaled usage list */
448
lSetList(jatep, JAT_scaled_usage_list,
449
lCopyList("scaled", lGetList(jatep, JAT_usage_list)));
450
scale_usage(lGetList(hep, EH_usage_scaling_list),
451
lGetList(jatep, JAT_previous_usage_list),
452
lGetList(jatep, JAT_scaled_usage_list));
453
/* skip sge_job_exit() and pack_job_exit() in case there
454
are still running tasks, since execd resends job exit */
455
for_each (petask, lGetList(jatep, JAT_task_list)) {
456
if (lGetUlong(petask, PET_status)==JRUNNING) {
457
DPRINTF(("job exit for job "sge_u32": still waiting for task %s\n",
458
jobid, lGetString(petask, PET_id)));
467
if (!skip_job_exit) {
468
DPRINTF(("--- running job "sge_u32"."sge_u32" is exiting\n",
469
jobid, jataskid, (status==JTRANSFERING)?"transfering":"running"));
471
sge_job_exit(ctx, jr, jep, jatep, monitor);
473
u_long32 failed = lGetUlong(jr, JR_failed);
475
if (failed == SSTATE_FAILURE_AFTER_JOB &&
476
!lGetString(jep, JB_checkpoint_name)) {
478
if (!ISSET(lGetUlong(jatep, JAT_state), JDELETED)) {
479
dstring id_dstring = DSTRING_INIT;
480
job_mark_job_as_deleted(ctx, jep, jatep);
481
ERROR((SGE_EVENT, MSG_JOB_MASTERTASKFAILED_S,
482
job_get_id_string(jobid, jataskid, NULL, &id_dstring)));
483
sge_dstring_free(&id_dstring);
493
ERROR((SGE_EVENT, MSG_JOB_REPORTEXITJ_UUU,
494
sge_u32c(jobid), sge_u32c(jataskid), sge_u32c(status)));
499
if (lGetString(jatep, JAT_granted_pe)
500
&& (pe=pe_list_locate(*object_base[SGE_TYPE_PE].list, lGetString(jatep, JAT_granted_pe)))
501
&& lGetBool(pe, PE_control_slaves)
502
&& lGetElemHost(lGetList(jatep, JAT_granted_destin_identifier_list), JG_qhostname, rhost)) {
504
* here we get usage of tasks that ran on slave/master execd's
505
* we store the pe task id of finished pe tasks in the ja task
506
* to prevent multiple handling of pe task finish in case
507
* execd resends job report.
510
if (ja_task_add_finished_pe_task(jatep, pe_task_id_str)) {
511
bool known_pe_task = true; /* did this pe task show up
512
earlier (USAGE report) */
514
if (petask == NULL) {
515
known_pe_task = false;
516
petask = lAddSubStr(jatep, PET_id, pe_task_id_str,
517
JAT_task_list, PET_Type);
518
lSetUlong(petask, PET_status, JRUNNING);
521
/* store unscaled usage directly in sub-task */
522
/* lXchgList(jr, JR_usage, lGetListRef(task, JB_usage_list)); */
523
/* copy list because we need to keep usage in jr for sge_log_dusage() */
524
lSetList(petask, PET_usage, lCopyList(NULL, lGetList(jr, JR_usage)));
526
/* update task's scaled usage list */
527
lSetList(petask, PET_scaled_usage,
528
lCopyList("scaled", lGetList(petask, PET_usage)));
529
scale_usage(lGetList(hep, EH_usage_scaling_list),
530
lGetList(petask, PET_previous_usage),
531
lGetList(petask, PET_scaled_usage));
534
if (lGetUlong(petask, PET_status)==JRUNNING ||
535
lGetUlong(petask, PET_status)==JTRANSFERING) {
536
u_long32 failed = lGetUlong(jr, JR_failed);
538
DPRINTF(("--- petask "sge_u32"."sge_u32"/%s -> final usage\n",
539
jobid, jataskid, pe_task_id_str));
540
lSetUlong(petask, PET_status, JFINISHED);
542
reporting_create_acct_record(ctx, NULL, jr, jep, jatep, false);
544
/* add tasks (scaled) usage to past usage container */
546
lListElem *container = lGetSubStr(jatep, PET_id, PE_TASK_PAST_USAGE_CONTAINER, JAT_task_list);
547
if (container == NULL) {
548
lList *answer_list = NULL;
549
container = pe_task_sum_past_usage_list(lGetList(jatep, JAT_task_list), petask);
550
/* usage container will be spooled */
552
&answer_list, 0, sgeE_PETASK_ADD,
553
jobid, jataskid, PE_TASK_PAST_USAGE_CONTAINER, NULL, lGetString(jep, JB_session),
554
jep, jatep, container, true, true);
555
answer_list_output(&answer_list);
557
lList *answer_list = NULL;
559
pe_task_sum_past_usage(container, petask);
560
/* create list event for the USAGE_CONTAINER */
561
sge_add_list_event(0, sgeE_JOB_USAGE,
563
PE_TASK_PAST_USAGE_CONTAINER,
565
lGetString(jep, JB_session),
566
lGetList(container, PET_scaled_usage));
567
/* usage container will be spooled */
568
/* JG: TODO: it is not really a sgeE_PETASK_ADD,
569
* but a sgeE_PETASK_MOD. We don't have this event
570
* yet. For spooling only, the add event will do
573
&answer_list, 0, sgeE_PETASK_ADD,
574
jobid, jataskid, PE_TASK_PAST_USAGE_CONTAINER, NULL, lGetString(jep, JB_session),
575
jep, jatep, container, false, true);
576
answer_list_output(&answer_list);
580
/* remove pe task from job/jatask */
582
lList *answer_list = NULL;
584
&answer_list, 0, sgeE_PETASK_DEL,
585
jobid, jataskid, pe_task_id_str,
586
NULL, NULL, NULL, NULL, NULL,
588
answer_list_output(&answer_list);
590
lRemoveElem(lGetList(jatep, JAT_task_list), &petask);
592
/* get rid of this job in case a task died from XCPU/XFSZ or
593
exited with a core dump */
594
if (failed==SSTATE_FAILURE_AFTER_JOB
595
&& (ep=lGetElemStr(lGetList(jr, JR_usage), UA_name, "signal"))) {
596
u_long32 sge_signo = (u_long32)lGetDouble(ep, UA_value);
600
INFO((SGE_EVENT, MSG_JOB_FILESIZEEXCEED_SSUU,
601
pe_task_id_str, rhost, sge_u32c(jobid), sge_u32c(jataskid)));
604
INFO((SGE_EVENT, MSG_JOB_CPULIMEXCEED_SSUU,
605
pe_task_id_str, rhost, sge_u32c(jobid), sge_u32c(jataskid)));
608
INFO((SGE_EVENT, MSG_JOB_DIEDTHROUGHSIG_SSUUS,
609
pe_task_id_str, rhost, sge_u32c(jobid), sge_u32c(jataskid), sge_sig2str(sge_signo)));
612
} else if (failed==0) {
613
INFO((SGE_EVENT, MSG_JOB_TASKFINISHED_SSUU,
614
pe_task_id_str, rhost, sge_u32c(jobid), sge_u32c(jataskid)));
616
INFO((SGE_EVENT, MSG_JOB_TASKFAILED_SSUUU,
617
pe_task_id_str, rhost, sge_u32c(jobid), sge_u32c(jataskid), sge_u32c(failed)));
620
if (failed == SSTATE_FAILURE_AFTER_JOB &&
621
!lGetString(jep, JB_checkpoint_name)) {
622
if (!ISSET(lGetUlong(jatep, JAT_state), JDELETED)) {
623
dstring id_dstring = DSTRING_INIT;
624
job_mark_job_as_deleted(ctx, jep, jatep);
625
ERROR((SGE_EVENT, MSG_JOB_JOBTASKFAILED_S,
626
job_get_id_string(jobid, jataskid, pe_task_id_str, &id_dstring)));
627
sge_dstring_free(&id_dstring);
632
} else if (status != JFINISHED) {
634
const char *shouldbe_queue_name;
635
const char *shouldbe_host_name;
637
if (!(jg = lFirst(lGetList(jatep, JAT_granted_destin_identifier_list)))) {
638
shouldbe_queue_name = MSG_OBJ_NOTRUNNING;
639
shouldbe_host_name = MSG_OBJ_NOTRUNNING;
641
if ((shouldbe_queue_name = lGetString(jg, JG_qname)) == NULL) {
642
shouldbe_queue_name = MSG_OBJ_UNKNOWN;
644
if ((shouldbe_host_name = lGetString(jg, JG_qhostname)) == NULL) {
645
shouldbe_host_name = MSG_OBJ_UNKNOWN;
648
/* should never happen */
649
ERROR((SGE_EVENT, MSG_JOB_REPORTEXITQ_SUUSSSSS,
650
rhost, sge_u32c(jobid), sge_u32c(jataskid),
651
pe_task_id_str?pe_task_id_str:MSG_MASTER, queue_name,
652
shouldbe_queue_name, shouldbe_host_name,
653
status2str(lGetUlong(jatep, JAT_status))));
657
/* pack ack to enable execd cleaning up */
658
if (!skip_job_exit) {
659
pack_ack(pb, ACK_JOB_EXIT, jobid, jataskid, pe_task_id_str);
664
ERROR((SGE_EVENT, MSG_EXECD_UNKNOWNJ_SUUSUS,
668
pe_task_id_str?pe_task_id_str:MSG_MASTER,
673
pack_ack(pb, ACK_JOB_EXIT, jobid, jataskid, pe_task_id_str);