/*
 * NOTE(review): this chunk is an extraction-garbled view of an srun
 * job-control source file.  The bare integers between statements are the
 * original file's line numbers fused into the text, and gaps in that
 * numbering (e.g. 400 -> 403) show that statements, braces, and this
 * function's tail (including its return) are missing from this view.
 * Code below is preserved byte-for-byte; only comments are added.
 */
382
* Create an srun job structure from a resource allocation response msg
385
/*
 * Build and initialize an srun_job_t from the allocation response in
 * ainfo.  Presumably returns the allocated job to the caller via the
 * elided function tail -- TODO confirm against the full source.
 */
_job_create_structure(allocation_info_t *ainfo)
387
srun_job_t *job = xmalloc(sizeof(srun_job_t));
390
debug2("creating job with %d tasks", opt.nprocs);
392
/* Job state is guarded by state_mutex and signaled on state_cond;
 * the job starts in the INIT state. */
slurm_mutex_init(&job->state_mutex);
393
pthread_cond_init(&job->state_cond, NULL);
394
job->state = SRUN_JOB_INIT;
396
/* Allocation identity: node list and step id come straight from the
 * allocation response message (nodelist is deep-copied). */
job->nodelist = xstrdup(ainfo->nodelist);
397
job->stepid = ainfo->stepid;
399
#ifdef HAVE_FRONT_END /* Limited job step support */
400
/* Front-end builds have limited job step support, so force overcommit. */
opt.overcommit = true;
403
job->nhosts = ainfo->nnodes;
406
#ifndef HAVE_FRONT_END
407
/* Sanity check: the allocation must cover the requested node count. */
if(opt.min_nodes > job->nhosts) {
408
error("Only allocated %d nodes asked for %d",
409
job->nhosts, opt.min_nodes);
411
/* When resources are pre-allocated and some nodes
412
* are explicitly excluded, this error can occur. */
413
error("Are required nodes explicitly excluded?");
418
job->select_jobinfo = ainfo->select_jobinfo;
419
job->jobid = ainfo->jobid;
421
job->ntasks = opt.nprocs;
422
job->task_prolog = xstrdup(opt.task_prolog);
423
job->task_epilog = xstrdup(opt.task_epilog);
424
/* Compute number of file descriptors / Ports needed for Job
425
* control info server
427
job->njfds = _estimate_nports(opt.nprocs, 48);
428
debug3("njfds = %d", job->njfds);
429
/* One fd and one address slot per estimated control port. */
job->jfd = (slurm_fd *)
430
xmalloc(job->njfds * sizeof(slurm_fd));
431
job->jaddr = (slurm_addr *)
432
xmalloc(job->njfds * sizeof(slurm_addr));
434
slurm_mutex_init(&job->task_mutex);
436
job->old_job = false;
437
job->removed = false;
438
job->signaled = false;
442
* Initialize Launch and Exit timeout values
447
/* Per-host and per-task state arrays, sized from the allocation. */
job->host_state = xmalloc(job->nhosts * sizeof(srun_host_state_t));
449
/* ntask task states and statii*/
450
job->task_state = xmalloc(opt.nprocs * sizeof(srun_task_state_t));
451
job->tstatus = xmalloc(opt.nprocs * sizeof(int));
453
job_update_io_fnames(job);
459
365
/*
 * Advance the job state machine to `state`.  States only move forward:
 * the update is applied only when job->state < state.  On a successful
 * update the new state is forwarded to the parent message-handler
 * process over its pipe (record type first, then the value) and any
 * waiters on state_cond are woken.  The trailing unlock + error() pair
 * below is presumably the safe_write() failure target (an `rwfail:`
 * label elided by the extraction) -- TODO confirm.
 */
update_job_state(srun_job_t *job, srun_job_state_t state)
461
pipe_enum_t pipe_enum = PIPE_JOB_STATE;
462
367
pthread_mutex_lock(&job->state_mutex);
463
368
if (job->state < state) {
464
369
job->state = state;
466
/* Notify the message-handler process: first the record type,
 * then the new state value. */
safe_write(job->forked_msg->par_msg->msg_pipe[1],
467
&pipe_enum, sizeof(int));
468
safe_write(job->forked_msg->par_msg->msg_pipe[1],
469
&job->state, sizeof(int));
471
370
pthread_cond_signal(&job->state_cond);
474
373
pthread_mutex_unlock(&job->state_mutex);
477
/* Failure path: release the lock before reporting the pipe error. */
pthread_mutex_unlock(&job->state_mutex);
478
error("update_job_state: "
479
"write from srun message-handler process failed");
495
389
/*
 * Force the job to wind down.  In attach mode, detach from the remote
 * job instead of killing it; otherwise mark the job force-terminated.
 * Either way, finish the client I/O handler.
 */
job_force_termination(srun_job_t *job)
497
if (mode == MODE_ATTACH) {
498
info ("forcing detach");
499
update_job_state(job, SRUN_JOB_DETACHED);
501
info ("forcing job termination");
502
update_job_state(job, SRUN_JOB_FORCETERM);
505
client_io_handler_finish(job->client_io);
510
/*
 * Derive srun's exit code from the per-task exit statuses: job->rc is
 * raised to the highest task status seen; a normal exit is mapped
 * through WEXITSTATUS, and death-by-signal follows the shell
 * convention of 128 + signal number.
 */
set_job_rc(srun_job_t *job)
512
int i, rc = 0, task_failed = 0;
515
* return code set to at least one if any tasks failed launch
517
for (i = 0; i < opt.nprocs; i++) {
518
if (job->task_state[i] == SRUN_TASK_FAILED)
520
/* Keep the maximum raw status across all tasks. */
if (job->rc < job->tstatus[i])
521
job->rc = job->tstatus[i];
523
/* A launch failure guarantees a nonzero exit code even if no task
 * reported one. */
if (task_failed && (job->rc <= 0)) {
528
if ((rc = WEXITSTATUS(job->rc)))
530
if (WIFSIGNALED(job->rc))
531
return (128 + WTERMSIG(job->rc));
536
/*
 * Fatal-error path: tear the job down, propagating errno as the job's
 * completion code.  `msg` is presumably reported by the elided part of
 * the body -- TODO confirm against the full source.
 */
void job_fatal(srun_job_t *job, const char *msg)
540
srun_job_destroy(job, errno);
547
/*
 * Tear down the remote side of the job.  Visible branches: kill the
 * job step with SIGKILL; else, if srun owns an allocation, complete
 * (cancel) the whole job with `error` as its completion code; else
 * (no allocation) just SIGKILL the remote tasks directly.  On error,
 * notify an attached parallel debugger of the launch failure.
 */
srun_job_destroy(srun_job_t *job, int error)
553
debug("cancelling job step %u.%u", job->jobid, job->stepid);
554
slurm_kill_job_step(job->jobid, job->stepid, SIGKILL);
555
} else if (!opt.no_alloc) {
556
debug("cancelling job %u", job->jobid);
557
slurm_complete_job(job->jobid, error);
559
debug("no allocation to cancel, killing remote tasks");
560
fwd_signal(job, SIGKILL, opt.max_threads);
564
if (error) debugger_launch_failure(job);
571
/*
 * Hard-kill the job step with SIGKILL and mark the job FAILED.
 * A kill failure is logged (%m expands errno) but does not stop the
 * state transition.
 */
srun_job_kill(srun_job_t *job)
574
if (slurm_kill_job_step(job->jobid, job->stepid, SIGKILL) < 0)
575
error ("slurm_kill_job_step: %m");
577
update_job_state(job, SRUN_JOB_FAILED);
581
/*
 * Print one info line per allocated host with its current state name.
 * hostlist_shift() hands back an allocated name; it is presumably
 * freed in the elided loop tail -- TODO confirm (otherwise this leaks
 * one string per host).
 */
report_job_status(srun_job_t *job)
584
hostlist_t hl = hostlist_create(job->nodelist);
587
for (i = 0; i < job->nhosts; i++) {
588
name = hostlist_shift(hl);
589
info ("host:%s state:%s", name,
590
_host_state_name(job->host_state[i]));
596
#define NTASK_STATES 6
598
/*
 * Summarize task states: bucket task ids (as strings) into one
 * hostlist per state, then print each non-empty bucket as a compact
 * ranged string, e.g. "task[0-3]: running".
 */
report_task_status(srun_job_t *job)
601
char buf[MAXHOSTRANGELEN+2];
602
hostlist_t hl[NTASK_STATES];
604
for (i = 0; i < NTASK_STATES; i++)
605
hl[i] = hostlist_create(NULL);
607
for (i = 0; i < opt.nprocs; i++) {
608
int state = job->task_state[i];
609
debug3(" state of task %d is %d", i, state);
610
/* NOTE(review): magic size 256 disagrees with sizeof(buf)
 * (MAXHOSTRANGELEN+2); harmless only if MAXHOSTRANGELEN+2 >= 256
 * -- verify, should probably be sizeof(buf). */
snprintf(buf, 256, "%d", i);
611
hostlist_push(hl[state], buf);
614
for (i = 0; i< NTASK_STATES; i++) {
615
if (hostlist_count(hl[i]) > 0) {
616
hostlist_ranged_string(hl[i], MAXHOSTRANGELEN, buf);
617
info("task%s: %s", buf, _task_state_name(i));
619
hostlist_destroy(hl[i]);
625
/*
 * Forward signal `signo` to every node of the step that still has
 * active tasks.  Serialized by a function-local static sig_mutex so
 * only one forwarding pass runs at a time.  For terminating signals
 * (SIGKILL/SIGINT/SIGTERM) the job is marked signaled under
 * state_mutex and the parent message-handler process is notified over
 * its pipe.  Hosts that never replied, or that report zero active
 * tasks, are skipped.  `max_threads` is unused in the visible lines
 * -- presumably consumed by elided send logic; TODO confirm.
 */
fwd_signal(srun_job_t *job, int signo, int max_threads)
629
kill_tasks_msg_t msg;
630
static pthread_mutex_t sig_mutex = PTHREAD_MUTEX_INITIALIZER;
631
pipe_enum_t pipe_enum = PIPE_SIGNALED;
635
List ret_list = NULL;
637
ret_data_info_t *ret_data_info = NULL;
638
int rc = SLURM_SUCCESS;
640
slurm_mutex_lock(&sig_mutex);
642
if (signo == SIGKILL || signo == SIGINT || signo == SIGTERM) {
643
slurm_mutex_lock(&job->state_mutex);
644
job->signaled = true;
645
slurm_mutex_unlock(&job->state_mutex);
647
/* NOTE(review): unchecked write() here, while update_job_state()
 * uses safe_write() for the same pipe -- inconsistent error
 * handling; verify whether failures can be ignored here. */
write(job->forked_msg->par_msg->msg_pipe[1],
648
&pipe_enum,sizeof(int));
649
write(job->forked_msg->par_msg->msg_pipe[1],
650
&job->signaled,sizeof(int));
654
debug2("forward signal %d to job", signo);
656
/* common to all tasks */
657
msg.job_id = job->jobid;
658
msg.job_step_id = job->stepid;
659
msg.signal = (uint32_t) signo;
661
/* Build the list of hosts that should receive the signal. */
hl = hostlist_create("");
662
for (i = 0; i < job->nhosts; i++) {
663
if (job->host_state[i] != SRUN_HOST_REPLIED) {
664
name = nodelist_nth_host(
665
job->step_layout->node_list, i);
666
debug2("%s has not yet replied\n", name);
670
if (job_active_tasks_on_host(job, i) == 0)
672
name = nodelist_nth_host(job->step_layout->node_list, i);
673
hostlist_push(hl, name);
676
/* Nothing left to signal: clean up and bail out. */
if(!hostlist_count(hl)) {
677
hostlist_destroy(hl);
680
hostlist_ranged_string(hl, sizeof(buf), buf);
681
hostlist_destroy(hl);
684
slurm_msg_t_init(&req);
685
req.msg_type = REQUEST_SIGNAL_TASKS;
688
debug3("sending signal to host %s", name);
690
if (!(ret_list = slurm_send_recv_msgs(name, &req, 0))) {
691
error("fwd_signal: slurm_send_recv_msgs really failed bad");
693
slurm_mutex_unlock(&sig_mutex);
697
/* Collect and check per-node return codes from the fan-out. */
itr = list_iterator_create(ret_list);
698
while((ret_data_info = list_next(itr))) {
699
rc = slurm_get_return_code(ret_data_info->type,
700
ret_data_info->data);
702
* Report error unless it is "Invalid job id" which
703
* probably just means the tasks exited in the meanwhile.
705
if ((rc != 0) && (rc != ESLURM_INVALID_JOB_ID)
706
&& (rc != ESLURMD_JOB_NOTRUNNING) && (rc != ESRCH)) {
707
error("%s: signal: %s",
708
ret_data_info->node_name,
712
list_iterator_destroy(itr);
713
list_destroy(ret_list);
715
debug2("All tasks have been signalled");
717
slurm_mutex_unlock(&sig_mutex);
721
/*
 * Count tasks still in the RUNNING state on host `hostid` of the step
 * layout; task_mutex guards the task_state array.  The counter
 * increment presumably lives in the elided if-body and the count is
 * returned by the elided tail -- TODO confirm against the full source.
 */
job_active_tasks_on_host(srun_job_t *job, int hostid)
726
slurm_mutex_lock(&job->task_mutex);
727
for (i = 0; i < job->step_layout->tasks[hostid]; i++) {
728
uint32_t *tids = job->step_layout->tids[hostid];
729
xassert(tids != NULL);
730
debug("Task %d state: %d", tids[i], job->task_state[tids[i]]);
731
if (job->task_state[tids[i]] == SRUN_TASK_RUNNING)
734
slurm_mutex_unlock(&job->task_mutex);
391
/* NOTE(review): the two lines below duplicate job_force_termination's
 * tail (original lines 391-392) and appear here only as extraction
 * residue -- they do not belong to this function. */
info ("forcing job termination");
392
update_job_state(job, SRUN_JOB_FORCETERM);
738
395
static inline int
428
* Create an srun job structure from a resource allocation response msg
431
/*
 * NOTE(review): second, shorter copy of _job_create_structure -- this
 * chunk evidently contains two garbled revisions of the same function
 * (the bare integers are original line numbers fused in by extraction,
 * and the `static inline int` line above likely belongs to a different
 * elided definition).  Code is preserved byte-for-byte; only comments
 * are added.  Builds an srun_job_t from the allocation response.
 */
_job_create_structure(allocation_info_t *ainfo)
433
srun_job_t *job = xmalloc(sizeof(srun_job_t));
436
debug2("creating job with %d tasks", opt.nprocs);
438
/* State machine plumbing: mutex + condvar, starting in INIT. */
slurm_mutex_init(&job->state_mutex);
439
pthread_cond_init(&job->state_cond, NULL);
440
job->state = SRUN_JOB_INIT;
442
/* Allocation identity from the response message. */
job->nodelist = xstrdup(ainfo->nodelist);
443
job->stepid = ainfo->stepid;
445
#ifdef HAVE_FRONT_END /* Limited job step support */
446
/* Front-end builds have limited job step support, so force overcommit. */
opt.overcommit = true;
449
job->nhosts = ainfo->nnodes;
452
#ifndef HAVE_FRONT_END
453
/* Sanity check: the allocation must cover the requested node count. */
if(opt.min_nodes > job->nhosts) {
454
error("Only allocated %d nodes asked for %d",
455
job->nhosts, opt.min_nodes);
457
/* When resources are pre-allocated and some nodes
458
* are explicitly excluded, this error can occur. */
459
error("Are required nodes explicitly excluded?");
464
job->select_jobinfo = ainfo->select_jobinfo;
465
job->jobid = ainfo->jobid;
467
job->ntasks = opt.nprocs;
471
job_update_io_fnames(job);
771
477
job_update_io_fnames(srun_job_t *job)