/*****************************************************************************\
 *  src/slurmd/slurmstepd/mgr.c - job manager functions for slurmstepd
 *  $Id: mgr.c 18638 2009-09-08 21:54:27Z jette $
 *****************************************************************************
 *  Copyright (C) 2002-2007 The Regents of the University of California.
 *  Copyright (C) 2008-2009 Lawrence Livermore National Security.
 *  Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
 *  Written by Mark Grondona <mgrondona@llnl.gov>.
 *  CODE-OCEC-09-009. All rights reserved.
 *
 *  This file is part of SLURM, a resource management program.
 *  For details, see <https://computing.llnl.gov/linux/slurm/>.
 *  Please also read the included file: DISCLAIMER.
 *
 *  SLURM is free software; you can redistribute it and/or modify it under
 *  the terms of the GNU General Public License as published by the Free
#include <slurm/slurm_errno.h>

#include "src/common/basil_resv_conf.h"
#include "src/common/cbuf.h"
#include "src/common/env.h"
#include "src/common/fd.h"
#include "src/common/forward.h"
#include "src/common/hostlist.h"
#include "src/common/log.h"
#include "src/common/mpi.h"
#include "src/common/node_select.h"
#include "src/common/plugstack.h"
#include "src/common/safeopen.h"
#include "src/common/slurm_jobacct_gather.h"
#include "src/common/switch.h"
#include "src/common/util-net.h"
#include "src/common/xmalloc.h"
#include "src/common/xsignal.h"
#include "src/common/xstring.h"

#include "src/slurmd/slurmd/slurmd.h"
static int  _access(const char *path, int modes, uid_t uid, gid_t gid);
static void _send_launch_failure(launch_tasks_request_msg_t *,
				 slurm_addr *, int);
static int  _fork_all_tasks(slurmd_job_t *job);
static int  _become_user(slurmd_job_t *job, struct priv_state *ps);
static void _set_prio_process (slurmd_job_t *job);
static void _set_job_log_prefix(slurmd_job_t *job);
static int  _setup_normal_io(slurmd_job_t *job);
static int  _drop_privileges(slurmd_job_t *job, bool do_setuid,
			     struct priv_state *state);
static int  _reclaim_privileges(struct priv_state *state);
static void _send_launch_resp(slurmd_job_t *job, int rc);
static int  _slurmd_job_log_init(slurmd_job_t *job);
static void _wait_for_io(slurmd_job_t *job);
static int  _send_exit_msg(slurmd_job_t *job, uint32_t *tid, int n,
			   int status);
static void _wait_for_children_slurmstepd(slurmd_job_t *job);
static int  _send_pending_exit_msgs(slurmd_job_t *job);
static void _send_step_complete_msgs(slurmd_job_t *job);
	if (job->aborted)
		verbose("job %u abort complete", job->jobid);
	else if (msg->step_id == SLURM_BATCH_SCRIPT) {
		_send_complete_batch_script_msg(
			job, ESLURMD_CREATE_BATCH_DIR_ERROR, -1);
	} else
		_send_step_complete_msgs(job);
	if (_drop_privileges(job, true, &sprivs) < 0)
		return ESLURMD_SET_UID_OR_GID_ERROR;

	if (io_init_tasks_stdio(job) != SLURM_SUCCESS) {
		rc = ESLURMD_IO_ERROR;
		/* ... */
	}

	/*
	 * MUST create the initial client object before starting
	 * ...
	 */
	if (!job->batch) {
		srun_info_t *srun = list_peek(job->sruns);

		/* local id of task that sends to srun, -1 for all tasks,
		   any other value for no tasks */
		int srun_stdout_tasks = -1;
		int srun_stderr_tasks = -1;

		xassert(srun != NULL);

		/* If I/O is labelled with task num, and if a separate file is
		   written per node or per task, the I/O needs to be sent
		   back to the stepd, get a label appended, and written from
		   the stepd rather than sent back to srun or written directly
		   from the node. When a task has ofname or efname == NULL, it
		   means data gets sent back to the client. */

		slurmd_filename_pattern_t outpattern, errpattern;
		/* ... */
		io_find_filename_pattern(job, &outpattern, &errpattern,
					 &same);
		/* ... */
		file_flags = io_get_file_flags(job);

		/* Make eio objects to write from the slurmstepd */
		if (outpattern == SLURMD_ALL_UNIQUE) {
			/* Open a separate file per task */
			for (ii = 0; ii < job->ntasks; ii++) {
				rc = io_create_local_client(
					job->task[ii]->ofname,
					file_flags, job, job->labelio,
					/* ... */
					same ? job->task[ii]->id : -2);
				if (rc != SLURM_SUCCESS) {
					error("Could not open output "
					      "file %s: %m",
					      job->task[ii]->ofname);
					rc = ESLURMD_IO_ERROR;
					/* ... */
				}
			}
			srun_stdout_tasks = -2;
			/* ... */
			srun_stderr_tasks = -2;
		} else if (outpattern == SLURMD_ALL_SAME) {
			/* Open a file for all tasks */
			rc = io_create_local_client(
				job->task[0]->ofname,
				file_flags, job, job->labelio,
				/* ... */);
			if (rc != SLURM_SUCCESS) {
				error("Could not open output "
				      "file %s: %m",
				      job->task[0]->ofname);
				rc = ESLURMD_IO_ERROR;
				/* ... */
			}
			srun_stdout_tasks = -2;
			/* ... */
			srun_stderr_tasks = -2;
		}

		if (errpattern == SLURMD_ALL_UNIQUE) {
			/* Open a separate file per task */
			for (ii = 0; ii < job->ntasks; ii++) {
				rc = io_create_local_client(
					job->task[ii]->efname,
					/* ... */
					-2, job->task[ii]->id);
				if (rc != SLURM_SUCCESS) {
					/* ... */
					rc = ESLURMD_IO_ERROR;
					/* ... */
				}
			}
			/* ... */
			srun_stderr_tasks = -2;
		} else if (errpattern == SLURMD_ALL_SAME) {
			/* Open a file for all tasks */
			rc = io_create_local_client(
				job->task[0]->efname,
				file_flags, job, job->labelio,
				/* ... */);
			if (rc != SLURM_SUCCESS) {
				error("Could not open error "
				      "file %s: %m",
				      job->task[0]->efname);
				rc = ESLURMD_IO_ERROR;
				/* ... */
			}
			/* ... */
			srun_stderr_tasks = -2;
		}

		if (io_initial_client_connect(srun, job, srun_stdout_tasks,
					      srun_stderr_tasks) < 0) {
			rc = ESLURMD_IO_ERROR;
			/* ... */
		}
	}

	/* ... */
	if (_reclaim_privileges(&sprivs) < 0) {
		error("sete{u/g}id(%lu/%lu): %m",
		      (u_long) sprivs.saved_uid, (u_long) sprivs.saved_gid);
		/* ... */
	}

	if (!rc && !job->batch) {
		if (io_thread_start(job) < 0)
			rc = ESLURMD_IO_ERROR;
		/* ... */
	}

	debug2("Leaving _setup_normal_io");
	return rc;
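/*
 * The sentinel values threaded through the routing above: -1 means
 * "forward this stream from every task to srun", -2 means "forward it
 * from no task" (the stepd already wrote it to a local file, per-task
 * or shared).  A hypothetical helper, not SLURM API, condensing the
 * stdout half of that decision:
 */
#if 0
static int _stdout_tasks_for_srun(slurmd_filename_pattern_t outpattern)
{
	switch (outpattern) {
	case SLURMD_ALL_UNIQUE:		/* one file per task */
	case SLURMD_ALL_SAME:		/* one shared file */
		return -2;		/* nothing goes back to srun */
	default:
		return -1;		/* no local file: send all tasks */
	}
}
#endif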
	debug3("sending task exit msg for %d tasks", n);

	msg.task_id_list = tid;
	msg.num_tasks    = n;
	msg.return_code  = status;
	msg.job_id       = job->jobid;
	msg.step_id      = job->stepid;
	slurm_msg_t_init(&resp);
	resp.data        = &msg;
	resp.msg_type    = MESSAGE_TASK_EXIT;

	/*
	 * XXX Hack for TCP timeouts on exit of large, synchronized
	 * ...
	 */
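/*
 * The hack referred to above must spread the exit traffic out in time:
 * when thousands of stepds finish a synchronized step at once, their
 * simultaneous TCP connects can overwhelm srun.  A minimal
 * de-synchronizing sketch (threshold and spread are illustrative, not
 * SLURM's actual values; assumes <stdlib.h> and <unistd.h>):
 */
#if 0
	if (job->nnodes > 500)			/* "large" step */
		usleep((rand() % 500) * 1000);	/* random 0-499 ms delay */
#endif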
	pthread_mutex_unlock(&step_complete.lock);
/* This dummy function is provided so that the checkpoint functions can
 * resolve this symbol name (as needed for some of the checkpoint
 * functions used by slurmctld). */
extern void agent_queue_request(void *dummy)
{
	fatal("Invalid agent_queue_request function call, likely from "
	      "checkpoint plugin");
}
/*
 * Executes the functions of the slurmd job manager process,
 * which runs as root and performs shared memory and interconnect
 * ...
 */
	/* ... */
	bool io_initialized = false;
	char *ckpt_type = slurm_get_checkpoint_type();

	debug3("Entered job_manager for %u.%u pid=%lu",
	       job->jobid, job->stepid, (unsigned long) job->jmgr_pid);
	/*
	 * Preload plugins.
	 */
	if ((switch_init() != SLURM_SUCCESS) ||
	    (slurmd_task_init() != SLURM_SUCCESS) ||
	    (slurm_proctrack_init() != SLURM_SUCCESS) ||
	    (checkpoint_init(ckpt_type) != SLURM_SUCCESS) ||
	    (slurm_jobacct_gather_init() != SLURM_SUCCESS)) {
		rc = SLURM_PLUGIN_NAME_INVALID;
		/* ... */
	}
		if (conf->propagate_prio == 1)
			_set_prio_process(job);
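/*
 * With PropagatePrioProcess=1 the tasks inherit the scheduling priority
 * of the submitting process.  A minimal sketch of such a propagation,
 * assuming the value travels in the job environment as
 * SLURM_PRIO_PROCESS (assumes <sys/resource.h> and <stdlib.h>; the real
 * _set_prio_process() may differ in detail):
 */
#if 0
	char *val = getenvp(job->env, "SLURM_PRIO_PROCESS");

	if (val && (setpriority(PRIO_PROCESS, 0, atoi(val)) < 0))
		error("setpriority(PRIO_PROCESS): %m");
#endif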
		/*
		 * Reclaim privileges and call any plugin hooks
		 * that may require elevated privs
		 */
		if (_spank_task_privileged(job, i, &sprivs) < 0)
			/* ... */;

		if (_become_user(job, &sprivs) < 0) {
			error("_become_user failed: %m");
			/* child process, should not return */
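/*
 * Order matters in the permanent privilege drop that _become_user()
 * performs: the gid and supplementary groups must be set while the
 * process is still root, and setuid() must come last.  A minimal
 * sketch (pw_name stands in for the user name from the job's passwd
 * entry; assumes <unistd.h> and <grp.h>):
 */
#if 0
	if (setgid(job->gid) < 0) {
		error("setgid: %m");
		return -1;
	}
	if (initgroups(pw_name, job->gid) < 0) {
		error("initgroups: %m");
		return -1;
	}
	if (setuid(job->uid) < 0) {	/* root privileges end here */
		error("setuid: %m");
		return -1;
	}
#endif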
			/* ... */
			i, job->task[i]->pid, job->pgid);

		if (slurm_container_add(job, job->task[i]->pid)
		    == SLURM_ERROR) {
			error("slurm_container_add: %m");
			/* ... */
		}
		jobacct_id.nodeid = job->nodeid;
		jobacct_id.taskid = job->task[i]->gtid;
		jobacct_gather_g_add_task(job->task[i]->pid,
					  &jobacct_id);
static void
_log_task_exit(unsigned long taskid, unsigned long pid, int status)
{
	/*
	 * Print a nice message to the log describing the task exit status.
	 *
	 * The final else is there just in case there is ever an exit status
	 * that isn't WIFEXITED || WIFSIGNALED. We'll probably never reach
	 * that code, but it is better than dropping a potentially useful
	 * exit status.
	 */
	if (WIFEXITED(status))
		verbose("task %lu (%lu) exited with exit code %d.",
			taskid, pid, WEXITSTATUS(status));
	else if (WIFSIGNALED(status))
		/* WCOREDUMP isn't available on AIX */
		verbose("task %lu (%lu) exited. Killed by signal %d%s.",
			taskid, pid, WTERMSIG(status),
#ifdef WCOREDUMP
			WCOREDUMP(status) ? " (core dumped)" : ""
#else
			""
#endif
			);
	else
		verbose("task %lu (%lu) exited with status 0x%04x.",
			taskid, pid, status);
}
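/*
 * Worked example of the decoding above, under Linux's traditional wait
 * status layout (low 7 bits = signal, bit 7 = core flag, high byte =
 * exit code; assumes <assert.h>, <signal.h>, <sys/wait.h>): a raw
 * status of 0x0900 is a normal exit with code 9, while 0x0009 is death
 * by signal 9 (SIGKILL) with no core dump.
 */
#if 0
	int status = 0x0900;
	assert(WIFEXITED(status) && (WEXITSTATUS(status) == 9));

	status = 0x0009;
	assert(WIFSIGNALED(status) && (WTERMSIG(status) == SIGKILL));
#endif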
/*
 * If waitflag is true, perform a blocking wait for a single process
 * and then return.
 * ...
 */
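/*
 * The waitflag semantics above map directly onto waitpid(2): an
 * options value of 0 blocks until a child exits, while WNOHANG polls.
 * A minimal sketch (assumes <sys/wait.h> and <errno.h>; retrying on
 * EINTR matters for the blocking case):
 */
#if 0
	int status;
	pid_t pid;

	do {
		pid = waitpid(-1, &status, waitflag ? 0 : WNOHANG);
	} while ((pid == -1) && (errno == EINTR));
#endif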
	debug("Sending launch resp rc=%d", rc);

	slurm_msg_t_init(&resp_msg);
	resp_msg.address  = srun->resp_addr;
	resp_msg.data     = &resp;
	resp_msg.msg_type = RESPONSE_LAUNCH_TASKS;

	resp.node_name     = xstrdup(job->node_name);
	resp.return_code   = rc;
	resp.count_of_pids = job->ntasks;

	resp.local_pids = xmalloc(job->ntasks * sizeof(*resp.local_pids));
	resp.task_ids = xmalloc(job->ntasks * sizeof(*resp.task_ids));
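/*
 * The two arrays just allocated presumably carry one entry per task; a
 * hedged sketch of how they would be filled and the response sent
 * (slurm_send_only_node_msg() is the usual fire-and-forget sender; the
 * elided original may differ):
 */
#if 0
	for (i = 0; i < job->ntasks; i++) {
		resp.local_pids[i] = job->task[i]->pid;
		resp.task_ids[i]   = job->task[i]->gtid;
	}
	slurm_send_only_node_msg(&resp_msg);
#endif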