~ubuntu-branches/ubuntu/saucy/slurm-llnl/saucy

Viewing changes to src/srun/srun_job.c

  • Committer: Bazaar Package Importer
  • Author(s): Gennaro Oliva
  • Date: 2008-05-30 13:11:30 UTC
  • mfrom: (1.1.3 upstream)
  • Revision ID: james.westby@ubuntu.com-20080530131130-l6ko6aie7xhrlmxe
Tags: 1.3.3-1
* New upstream release
* Removed patches to src/slurmctld/controller.c, src/slurmdbd/slurmdbd.c
  and doc/man/man1/sacctmgr.1, now included upstream
* Edited watch file to look for 1.3 releases
* Patched doc/man/man1/salloc.1, doc/man/man1/sbatch.1 and
  doc/man/man5/slurm.conf.5 to improve formatting and avoid warnings from man

@@ -5,7 +5,7 @@
  *  Copyright (C) 2002 The Regents of the University of California.
  *  Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
  *  Written by Mark Grondona <grondona@llnl.gov>.
- *  UCRL-CODE-226842.
+ *  LLNL-CODE-402394.
  *  
  *  This file is part of SLURM, a resource management program.
  *  For details, see <http://www.llnl.gov/linux/slurm/>.
@@ -56,19 +56,16 @@
 #include "src/common/log.h"
 #include "src/common/read_config.h"
 #include "src/common/slurm_protocol_api.h"
-#include "src/common/slurm_cred.h"
 #include "src/common/xmalloc.h"
 #include "src/common/xstring.h"
 #include "src/common/io_hdr.h"
 #include "src/common/forward.h"
+#include "src/common/fd.h"
 
 #include "src/srun/srun_job.h"
 #include "src/srun/opt.h"
 #include "src/srun/fname.h"
-#include "src/srun/attach.h"
-#include "src/srun/msg.h"
-
-typedef enum {DSH_NEW, DSH_ACTIVE, DSH_DONE, DSH_FAILED} state_t;
+#include "src/srun/debugger.h"
 
 /*
  * allocation information structure used to store general information
@@ -85,13 +82,6 @@
         select_jobinfo_t select_jobinfo;
 } allocation_info_t;
 
-typedef struct thd {
-        pthread_t       thread;                 /* thread ID */
-        pthread_attr_t  attr;                   /* thread attributes */
-        state_t         state;                  /* thread state */
-} thd_t;
-
-int message_thread = 0;
 /*
  * Prototypes:
  */
@@ -99,9 +89,6 @@
 static int        _compute_task_count(allocation_info_t *info);
 static void       _set_nprocs(allocation_info_t *info);
 static srun_job_t *_job_create_structure(allocation_info_t *info);
-static void       _job_fake_cred(srun_job_t *job);
-static char *     _task_state_name(srun_task_state_t state_inx);
-static char *     _host_state_name(srun_host_state_t state_inx);
 static char *     _normalize_hostlist(const char *hostlist);
 
 
@@ -139,13 +126,9 @@
          * Create job, then fill in host addresses
          */
         job = _job_create_structure(ai);
-        job->step_layout = fake_slurm_step_layout_create(job->nodelist, 
-                                                         NULL, NULL,
-                                                         job->nhosts,
-                                                         job->ntasks);
-                
-        _job_fake_cred(job);
+        
         job_update_io_fnames(job);
+
    error:
         xfree(ai);
         return (job);
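
The lines removed in the hunk above built a throwaway step layout and
credential for srun's no-allocation path; after this change that setup no
longer happens at this point. At its core, a fake step layout is just a
distribution of ntasks over nhosts. The following standalone sketch (the
helper name fake_layout is hypothetical, and it assumes SLURM's default
block distribution, where the first hosts absorb the remainder) shows the
arithmetic involved:

    /*
     * Hedged sketch, not SLURM code: distribute ntasks over nhosts the
     * way a block step layout does, giving the first (ntasks % nhosts)
     * hosts one extra task each.
     */
    #include <stdio.h>

    static void fake_layout(int ntasks, int nhosts)
    {
            int base = ntasks / nhosts;     /* tasks every host gets */
            int rem  = ntasks % nhosts;     /* extras for the first hosts */
            int i;

            for (i = 0; i < nhosts; i++)
                    printf("host %d: %d tasks\n", i, base + (i < rem));
    }

    int main(void)
    {
            fake_layout(10, 4);             /* prints 3, 3, 2, 2 */
            return 0;
    }
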
@@ -378,106 +361,17 @@
         return (job);
 }
 
-/*
- * Create an srun job structure from a resource allocation response msg
- */
-static srun_job_t *
-_job_create_structure(allocation_info_t *ainfo)
-{
-        srun_job_t *job = xmalloc(sizeof(srun_job_t));
-        
-        _set_nprocs(ainfo);
-        debug2("creating job with %d tasks", opt.nprocs);
-
-        slurm_mutex_init(&job->state_mutex);
-        pthread_cond_init(&job->state_cond, NULL);
-        job->state = SRUN_JOB_INIT;
-
-        job->nodelist = xstrdup(ainfo->nodelist); 
-        job->stepid  = ainfo->stepid;
-        
-#ifdef HAVE_FRONT_END   /* Limited job step support */
-        opt.overcommit = true;
-        job->nhosts = 1;
-#else
-        job->nhosts   = ainfo->nnodes;
-#endif
-
-#ifndef HAVE_FRONT_END
-        if(opt.min_nodes > job->nhosts) {
-                error("Only allocated %d nodes asked for %d",
-                      job->nhosts, opt.min_nodes);
-                if (opt.exc_nodes) {
-                        /* When resources are pre-allocated and some nodes
-                         * are explicitly excluded, this error can occur. */
-                        error("Are required nodes explicitly excluded?");
-                }
-                return NULL;
-        }       
-#endif
-        job->select_jobinfo = ainfo->select_jobinfo;
-        job->jobid   = ainfo->jobid;
-        
-        job->ntasks  = opt.nprocs;
-        job->task_prolog = xstrdup(opt.task_prolog);
-        job->task_epilog = xstrdup(opt.task_epilog);
-        /* Compute number of file descriptors / Ports needed for Job 
-         * control info server
-         */
-        job->njfds = _estimate_nports(opt.nprocs, 48);
-        debug3("njfds = %d", job->njfds);
-        job->jfd = (slurm_fd *)
-                xmalloc(job->njfds * sizeof(slurm_fd));
-        job->jaddr = (slurm_addr *) 
-                xmalloc(job->njfds * sizeof(slurm_addr));
-
-        slurm_mutex_init(&job->task_mutex);
-        
-        job->old_job = false;
-        job->removed = false;
-        job->signaled = false;
-        job->rc       = -1;
-        
-        /* 
-         *  Initialize Launch and Exit timeout values
-         */
-        job->ltimeout = 0;
-        job->etimeout = 0;
-        
-        job->host_state =  xmalloc(job->nhosts * sizeof(srun_host_state_t));
-        
-        /* ntask task states and statii*/
-        job->task_state  =  xmalloc(opt.nprocs * sizeof(srun_task_state_t));
-        job->tstatus     =  xmalloc(opt.nprocs * sizeof(int));
-        
-        job_update_io_fnames(job);
-        
-        return (job);   
-}
-
 void
 update_job_state(srun_job_t *job, srun_job_state_t state)
 {
-        pipe_enum_t pipe_enum = PIPE_JOB_STATE;
         pthread_mutex_lock(&job->state_mutex);
         if (job->state < state) {
                 job->state = state;
-                if(message_thread) {
-                        safe_write(job->forked_msg->par_msg->msg_pipe[1],
-                                   &pipe_enum, sizeof(int));
-                        safe_write(job->forked_msg->par_msg->msg_pipe[1],
-                                   &job->state, sizeof(int));
-                }
                 pthread_cond_signal(&job->state_cond);
                 
         }
         pthread_mutex_unlock(&job->state_mutex);
         return;
-rwfail:
-        pthread_mutex_unlock(&job->state_mutex);
-        error("update_job_state: "
-              "write from srun message-handler process failed");
-
 }
 
 srun_job_state_t 
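
The rwfail path deleted above belonged to srun's old message-handler
process, which mirrored job-state changes to its parent over a pipe as an
int tag (a pipe_enum value) followed by an int payload, with safe_write
jumping to rwfail on error. A minimal, self-contained sketch of that
tag-then-payload protocol (the names here are illustrative, not the SLURM
definitions):

    /*
     * Hedged sketch of the removed pipe protocol: write an int tag, then
     * an int payload; on a short write, bail out through an error label
     * the way safe_write did.
     */
    #include <stdio.h>
    #include <unistd.h>

    enum { PIPE_JOB_STATE_DEMO = 1 };   /* stand-in for PIPE_JOB_STATE */

    int main(void)
    {
            int fds[2], tag, payload, state = 42;

            if (pipe(fds) < 0)
                    return 1;

            /* writer side: tag first, then the new state */
            tag = PIPE_JOB_STATE_DEMO;
            if (write(fds[1], &tag, sizeof(int)) != sizeof(int) ||
                write(fds[1], &state, sizeof(int)) != sizeof(int))
                    goto rwfail;

            /* reader side: consume tag, then payload */
            if (read(fds[0], &tag, sizeof(int)) != sizeof(int) ||
                read(fds[0], &payload, sizeof(int)) != sizeof(int))
                    goto rwfail;
            printf("tag=%d state=%d\n", tag, payload);
            return 0;

    rwfail:
            fprintf(stderr, "message pipe I/O failed\n");
            return 1;
    }
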
@@ -494,245 +388,8 @@
 void 
 job_force_termination(srun_job_t *job)
 {
-        if (mode == MODE_ATTACH) {
-                info ("forcing detach");
-                update_job_state(job, SRUN_JOB_DETACHED);
-        } else {
-                info ("forcing job termination");
-                update_job_state(job, SRUN_JOB_FORCETERM);
-        }
-
-        client_io_handler_finish(job->client_io);
-}
-
-
-int
-set_job_rc(srun_job_t *job)
-{
-        int i, rc = 0, task_failed = 0;
-
-        /*
-         *  return code set to at least one if any tasks failed launch
-         */
-        for (i = 0; i < opt.nprocs; i++) {
-                if (job->task_state[i] == SRUN_TASK_FAILED)
-                        task_failed = 1; 
-                if (job->rc < job->tstatus[i])
-                        job->rc = job->tstatus[i];
-        }
-        if (task_failed && (job->rc <= 0)) {
-                job->rc = 1;
-                return 1;
-        }
-
-        if ((rc = WEXITSTATUS(job->rc)))
-                return rc;
-        if (WIFSIGNALED(job->rc))
-                return (128 + WTERMSIG(job->rc));
-        return job->rc;
-}
-
-
-void job_fatal(srun_job_t *job, const char *msg)
-{
-        if (msg) error(msg);
-
-        srun_job_destroy(job, errno);
-
-        exit(1);
-}
-
-
-void 
-srun_job_destroy(srun_job_t *job, int error)
-{
-        if (job->removed)
-                return;
-
-        if (job->old_job) {
-                debug("cancelling job step %u.%u", job->jobid, job->stepid);
-                slurm_kill_job_step(job->jobid, job->stepid, SIGKILL);
-        } else if (!opt.no_alloc) {
-                debug("cancelling job %u", job->jobid);
-                slurm_complete_job(job->jobid, error);
-        } else {
-                debug("no allocation to cancel, killing remote tasks");
-                fwd_signal(job, SIGKILL, opt.max_threads); 
-                return;
-        }
-
-        if (error) debugger_launch_failure(job);
-
-        job->removed = true;
-}
-
-
-void
-srun_job_kill(srun_job_t *job)
-{
-        if (!opt.no_alloc) {
-                if (slurm_kill_job_step(job->jobid, job->stepid, SIGKILL) < 0)
-                        error ("slurm_kill_job_step: %m");
-        }
-        update_job_state(job, SRUN_JOB_FAILED);
-}
-        
-void 
-report_job_status(srun_job_t *job)
-{
-        int i;
-        hostlist_t hl = hostlist_create(job->nodelist);
-        char *name = NULL;
-
-        for (i = 0; i < job->nhosts; i++) {
-                name = hostlist_shift(hl);
-                info ("host:%s state:%s", name, 
-                      _host_state_name(job->host_state[i]));
-                free(name);
-        }
-}
-
-
-#define NTASK_STATES 6
-void 
-report_task_status(srun_job_t *job)
-{
-        int i;
-        char buf[MAXHOSTRANGELEN+2];
-        hostlist_t hl[NTASK_STATES];
-
-        for (i = 0; i < NTASK_STATES; i++)
-                hl[i] = hostlist_create(NULL);
-
-        for (i = 0; i < opt.nprocs; i++) {
-                int state = job->task_state[i];
-                debug3("  state of task %d is %d", i, state);
-                snprintf(buf, 256, "%d", i);
-                hostlist_push(hl[state], buf); 
-        }
-
-        for (i = 0; i< NTASK_STATES; i++) {
-                if (hostlist_count(hl[i]) > 0) {
-                        hostlist_ranged_string(hl[i], MAXHOSTRANGELEN, buf);
-                        info("task%s: %s", buf, _task_state_name(i));
-                }
-                hostlist_destroy(hl[i]);
-        }
-
-}
-
-void 
-fwd_signal(srun_job_t *job, int signo, int max_threads)
-{
-        int i;
-        slurm_msg_t req;
-        kill_tasks_msg_t msg;
-        static pthread_mutex_t sig_mutex = PTHREAD_MUTEX_INITIALIZER;
-        pipe_enum_t pipe_enum = PIPE_SIGNALED;
-        hostlist_t hl;
-        char *name = NULL;
-        char buf[8192];
-        List ret_list = NULL;
-        ListIterator itr;
-        ret_data_info_t *ret_data_info = NULL;
-        int rc = SLURM_SUCCESS;
-
-        slurm_mutex_lock(&sig_mutex);
-
-        if (signo == SIGKILL || signo == SIGINT || signo == SIGTERM) {
-                slurm_mutex_lock(&job->state_mutex);
-                job->signaled = true;
-                slurm_mutex_unlock(&job->state_mutex);
-                if(message_thread) {
-                        write(job->forked_msg->par_msg->msg_pipe[1],
-                              &pipe_enum,sizeof(int));
-                        write(job->forked_msg->par_msg->msg_pipe[1],
-                              &job->signaled,sizeof(int));
-                }
-        }
-
-        debug2("forward signal %d to job", signo);
-
-        /* common to all tasks */
-        msg.job_id      = job->jobid;
-        msg.job_step_id = job->stepid;
-        msg.signal      = (uint32_t) signo;
-        
-        hl = hostlist_create("");
-        for (i = 0; i < job->nhosts; i++) {
-                if (job->host_state[i] != SRUN_HOST_REPLIED) {
-                        name = nodelist_nth_host(
-                                job->step_layout->node_list, i);
-                        debug2("%s has not yet replied\n", name);
-                        free(name);
-                        continue;
-                }
-                if (job_active_tasks_on_host(job, i) == 0)
-                        continue;
-                name = nodelist_nth_host(job->step_layout->node_list, i);
-                hostlist_push(hl, name);
-                free(name);
-        }
-        if(!hostlist_count(hl)) {
-                hostlist_destroy(hl);
-                goto nothing_left;
-        }
-        hostlist_ranged_string(hl, sizeof(buf), buf);
-        hostlist_destroy(hl);
-        name = xstrdup(buf);
-
-        slurm_msg_t_init(&req); 
-        req.msg_type = REQUEST_SIGNAL_TASKS;
-        req.data     = &msg;
-        
-        debug3("sending signal to host %s", name);
-        
-        if (!(ret_list = slurm_send_recv_msgs(name, &req, 0))) { 
-                error("fwd_signal: slurm_send_recv_msgs really failed bad");
-                xfree(name);
-                slurm_mutex_unlock(&sig_mutex);
-                return;
-        }
-        xfree(name);
-        itr = list_iterator_create(ret_list);           
-        while((ret_data_info = list_next(itr))) {
-                rc = slurm_get_return_code(ret_data_info->type, 
-                                           ret_data_info->data);
-                /*
-                 *  Report error unless it is "Invalid job id" which 
-                 *    probably just means the tasks exited in the meanwhile.
-                 */
-                if ((rc != 0) && (rc != ESLURM_INVALID_JOB_ID)
-                    &&  (rc != ESLURMD_JOB_NOTRUNNING) && (rc != ESRCH)) {
-                        error("%s: signal: %s", 
-                              ret_data_info->node_name, 
-                              slurm_strerror(rc));
-                }
-        }
-        list_iterator_destroy(itr);
-        list_destroy(ret_list);
-nothing_left:
-        debug2("All tasks have been signalled");
-        
-        slurm_mutex_unlock(&sig_mutex);
-}
-
-int
-job_active_tasks_on_host(srun_job_t *job, int hostid)
-{
-        int i;
-        int retval = 0;
-
-        slurm_mutex_lock(&job->task_mutex);
-        for (i = 0; i < job->step_layout->tasks[hostid]; i++) {
-                uint32_t *tids = job->step_layout->tids[hostid];
-                xassert(tids != NULL);
-                debug("Task %d state: %d", tids[i], job->task_state[tids[i]]);
-                if (job->task_state[tids[i]] == SRUN_TASK_RUNNING) 
-                        retval++;
-        }
-        slurm_mutex_unlock(&job->task_mutex);
-        return retval;
+        info ("forcing job termination");
+        update_job_state(job, SRUN_JOB_FORCETERM);
 }
 
 static inline int
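
Among the functions removed in the hunk above, set_job_rc() mapped a
wait(2)-style status word to srun's exit code using the shell convention:
WEXITSTATUS for a normal exit, 128 plus the signal number for a death by
signal. A small runnable sketch of the same mapping (map_status is a
hypothetical stand-in, kept faithful to the removed logic, including its
unconditional first WEXITSTATUS check):

    /*
     * Hedged sketch of the status mapping the deleted set_job_rc()
     * applied: normal exit -> WEXITSTATUS, signal death -> 128 + signo.
     */
    #include <stdio.h>
    #include <signal.h>
    #include <sys/wait.h>
    #include <unistd.h>

    static int map_status(int status)
    {
            int rc;

            if ((rc = WEXITSTATUS(status)))
                    return rc;
            if (WIFSIGNALED(status))
                    return 128 + WTERMSIG(status);
            return status;
    }

    int main(void)
    {
            int status;
            pid_t pid = fork();

            if (pid == 0) {
                    raise(SIGTERM);     /* child dies from a signal */
                    _exit(0);           /* not reached */
            }
            waitpid(pid, &status, 0);
            printf("rc = %d\n", map_status(status));  /* 128 + 15 = 143 */
            return 0;
    }
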
@@ -767,6 +424,55 @@
         }
 }
 
+/*
+ * Create an srun job structure from a resource allocation response msg
+ */
+static srun_job_t *
+_job_create_structure(allocation_info_t *ainfo)
+{
+        srun_job_t *job = xmalloc(sizeof(srun_job_t));
+        
+        _set_nprocs(ainfo);
+        debug2("creating job with %d tasks", opt.nprocs);
+
+        slurm_mutex_init(&job->state_mutex);
+        pthread_cond_init(&job->state_cond, NULL);
+        job->state = SRUN_JOB_INIT;
+
+        job->nodelist = xstrdup(ainfo->nodelist); 
+        job->stepid  = ainfo->stepid;
+        
+#ifdef HAVE_FRONT_END   /* Limited job step support */
+        opt.overcommit = true;
+        job->nhosts = 1;
+#else
+        job->nhosts   = ainfo->nnodes;
+#endif
+
+#ifndef HAVE_FRONT_END
+        if(opt.min_nodes > job->nhosts) {
+                error("Only allocated %d nodes asked for %d",
+                      job->nhosts, opt.min_nodes);
+                if (opt.exc_nodes) {
+                        /* When resources are pre-allocated and some nodes
+                         * are explicitly excluded, this error can occur. */
+                        error("Are required nodes explicitly excluded?");
+                }
+                return NULL;
+        }       
+#endif
+        job->select_jobinfo = ainfo->select_jobinfo;
+        job->jobid   = ainfo->jobid;
+        
+        job->ntasks  = opt.nprocs;
+
+        job->rc       = -1;
+        
+        job_update_io_fnames(job);
+        
+        return (job);   
+}
+
 void
 job_update_io_fnames(srun_job_t *job)
 {
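
The relocated _job_create_structure() above still initializes state_mutex
and state_cond, which update_job_state() (earlier in this diff) uses as a
forward-only state machine: a transition is accepted only if it increases
the state, and waiters are woken through the condition variable. A
self-contained sketch of that pattern (enum values and function names here
are illustrative, not the SLURM definitions):

    /*
     * Hedged sketch of the forward-only state machine kept by
     * update_job_state(): ignore transitions that would move the state
     * backwards, and signal the condition variable on every real change.
     * Build with -lpthread.
     */
    #include <pthread.h>
    #include <stdio.h>

    enum demo_state { JOB_INIT, JOB_RUNNING, JOB_DONE };

    static pthread_mutex_t state_mutex = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t  state_cond  = PTHREAD_COND_INITIALIZER;
    static enum demo_state state = JOB_INIT;

    static void update_state(enum demo_state new_state)
    {
            pthread_mutex_lock(&state_mutex);
            if (state < new_state) {        /* never move backwards */
                    state = new_state;
                    pthread_cond_signal(&state_cond);
            }
            pthread_mutex_unlock(&state_mutex);
    }

    static void *waiter(void *arg)
    {
            pthread_mutex_lock(&state_mutex);
            while (state < JOB_DONE)
                    pthread_cond_wait(&state_cond, &state_mutex);
            pthread_mutex_unlock(&state_mutex);
            printf("job reached final state\n");
            return NULL;
    }

    int main(void)
    {
            pthread_t tid;

            pthread_create(&tid, NULL, waiter, NULL);
            update_state(JOB_RUNNING);
            update_state(JOB_INIT);         /* ignored: would go backwards */
            update_state(JOB_DONE);
            pthread_join(tid, NULL);
            return 0;
    }
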
@@ -775,57 +481,6 @@
         job->efname = opt.efname ? fname_create(job, opt.efname) : job->ofname;
 }
 
-static void
-_job_fake_cred(srun_job_t *job)
-{
-        slurm_cred_arg_t arg;
-        arg.jobid    = job->jobid;
-        arg.stepid   = job->stepid;
-        arg.uid      = opt.uid;
-        arg.hostlist = job->nodelist;
-        arg.alloc_lps_cnt = 0;    
-        arg.alloc_lps     =  NULL; 
-        job->cred = slurm_cred_faker(&arg);
-}
-
-static char *
-_task_state_name(srun_task_state_t state_inx)
-{
-        switch (state_inx) {
-                case SRUN_TASK_INIT:
-                        return "initializing";
-                case SRUN_TASK_RUNNING:
-                        return "running";
-                case SRUN_TASK_FAILED:
-                        return "failed";
-                case SRUN_TASK_EXITED:
-                        return "exited";
-                case SRUN_TASK_IO_WAIT:
-                        return "waiting for io";
-                case SRUN_TASK_ABNORMAL_EXIT:
-                        return "exited abnormally";
-                default:
-                        return "unknown";
-        }
-}
-
-static char *
-_host_state_name(srun_host_state_t state_inx)
-{
-        switch (state_inx) {
-                case SRUN_HOST_INIT:
-                        return "initial";
-                case SRUN_HOST_CONTACTED:
-                        return "contacted";
-                case SRUN_HOST_UNREACHABLE:
-                        return "unreachable";
-                case SRUN_HOST_REPLIED:
-                        return "replied";
-                default:
-                        return "unknown";
-        }
-}
-
 static char *
 _normalize_hostlist(const char *hostlist)
 {
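
Several of the removed reporting functions leaned on the hostlist API:
hostlist_push() collects names or task ids, and hostlist_ranged_string()
collapses consecutive runs into bracketed ranges, which is how
report_task_status() printed output like "task[0-5]: running". A toy
compressor (not the SLURM API; print_ranges is a hypothetical helper)
showing the idea on a sorted id list:

    /*
     * Hedged sketch of range compression in the style of
     * hostlist_ranged_string(): consecutive ids collapse to "a-b".
     */
    #include <stdio.h>

    static void print_ranges(const int *ids, int n)
    {
            int i = 0;

            printf("[");
            while (i < n) {
                    int start = i;
                    while (i + 1 < n && ids[i + 1] == ids[i] + 1)
                            i++;            /* extend the consecutive run */
                    if (start == i)
                            printf("%d", ids[start]);
                    else
                            printf("%d-%d", ids[start], ids[i]);
                    if (++i < n)
                            printf(",");
            }
            printf("]\n");
    }

    int main(void)
    {
            int ids[] = { 0, 1, 2, 3, 7, 9, 10 };

            print_ranges(ids, 7);           /* prints [0-3,7,9-10] */
            return 0;
    }
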