1
1
/*****************************************************************************\
2
2
* src/slurmd/slurmstepd/io.c - Standard I/O handling routines for slurmstepd
3
* $Id: io.c 13672 2008-03-19 23:10:58Z jette $
3
* $Id: io.c 17962 2009-06-24 19:41:49Z da $
4
4
*****************************************************************************
5
5
* Copyright (C) 2002 The Regents of the University of California.
6
6
* Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
7
7
* Written by Mark Grondona <mgrondona@llnl.gov>.
8
* CODE-OCEC-09-009. All rights reserved.
10
10
* This file is part of SLURM, a resource management program.
11
* For details, see <http://www.llnl.gov/linux/slurm/>.
11
* For details, see <https://computing.llnl.gov/linux/slurm/>.
12
* Please also read the included file: DISCLAIMER.
13
14
* SLURM is free software; you can redistribute it and/or modify it under
14
15
* the terms of the GNU General Public License as published by the Free
118
120
struct io_buf *out_msg;
119
121
int32_t out_remaining;
124
/* For clients that only write stdout or stderr, and/or only
125
write for one task. -1 means accept output from any task. */
126
int ltaskid_stdout, ltaskid_stderr;
130
/* true if writing to a file, false if writing to a socket */
135
static bool _local_file_writable(eio_obj_t *);
136
static int _local_file_write(eio_obj_t *, List);
138
struct io_operations local_file_ops = {
139
writable: &_local_file_writable,
140
handle_write: &_local_file_write,
123
144
/**********************************************************************
124
145
* Task write declarations
452
473
return SLURM_SUCCESS;
478
_local_file_writable(eio_obj_t *obj)
480
struct client_io_info *client = (struct client_io_info *) obj->arg;
482
xassert(client->magic == CLIENT_IO_MAGIC);
484
if (client->out_eof == true)
487
if (client->out_msg != NULL || !list_is_empty(client->msg_queue))
495
* The slurmstepd writes I/O to a file, possibly adding a label.
498
_local_file_write(eio_obj_t *obj, List objs)
500
struct client_io_info *client = (struct client_io_info *) obj->arg;
503
struct slurm_io_header header;
506
xassert(client->magic == CLIENT_IO_MAGIC);
508
* If we aren't already in the middle of sending a message, get the
509
* next message from the queue.
511
if (client->out_msg == NULL) {
512
client->out_msg = list_dequeue(client->msg_queue);
513
if (client->out_msg == NULL) {
514
return SLURM_SUCCESS;
516
client->out_remaining = client->out_msg->length -
517
io_hdr_packed_size();
520
/* This code to make a buffer, fill it, unpack its contents, and free
521
it is just used to read the header to get the global task id. */
522
header_tmp_buf = create_buf(client->out_msg->data,
523
client->out_msg->length);
524
io_hdr_unpack(&header, header_tmp_buf);
525
header_tmp_buf->head = NULL;
526
free_buf(header_tmp_buf);
528
/* A zero-length message indicates the end of a stream from one
529
of the tasks. Just free the message and return. */
530
if (header.length == 0) {
531
_free_outgoing_msg(client->out_msg, client->job);
532
client->out_msg = NULL;
533
return SLURM_SUCCESS;
536
/* Write the message to the file. */
537
buf = client->out_msg->data +
538
(client->out_msg->length - client->out_remaining);
540
n = write_labelled_message(obj->fd, buf, client->out_remaining,
541
header.gtaskid, client->labelio,
542
client->label_width);
544
client->out_eof = true;
545
_free_all_outgoing_msgs(client->msg_queue, client->job);
549
client->out_remaining -= n;
550
if (client->out_remaining == 0) {
551
_free_outgoing_msg(client->out_msg, client->job);
552
client->out_msg = NULL;
554
return SLURM_SUCCESS;
455
560
/**********************************************************************
456
561
* Task write functions
457
562
**********************************************************************/
899
989
fd_set_close_on_exec(task->stdout_fd);
900
990
task->from_stdout = -1; /* not used */
902
} else if (task->ofname != NULL) {
992
} else if (task->ofname != NULL &&
993
(!job->labelio || strcmp(task->ofname, "/dev/null")==0)) {
904
if (task->ofname != NULL) {
995
if (task->ofname != NULL &&
996
(!job->labelio || strcmp(task->ofname, "/dev/null")==0) ) {
906
998
/* open file on task's stdout */
907
999
debug5(" stdout file name = %s", task->ofname);
908
1000
task->stdout_fd = open(task->ofname, file_flags, 0666);
909
1001
if (task->stdout_fd == -1) {
910
error("Could not open stdout file: %m");
912
task->ofname = fname_create(job, "slurm-%J.out", 0);
913
task->stdout_fd = open(task->ofname, file_flags, 0666);
914
if (task->stdout_fd == -1)
1002
error("Could not open stdout file %s: %m",
917
1006
fd_set_close_on_exec(task->stdout_fd);
918
1007
task->from_stdout = -1; /* not used */
941
1030
#ifdef HAVE_PTY_H
943
1032
if (task->gtid == 0) {
1033
/* Make a file descriptor for the task to write to, but
1034
don't make a separate one to read from, because in pty
1035
mode we can't distinguish between stdout and stderr
1036
coming from the remote shell. Both streams from the
1037
shell will go to task->stdout_fd, which is okay in
1038
pty mode because any output routed through the stepd
1039
will be displayed. */
944
1040
task->stderr_fd = dup(task->stdin_fd);
945
1041
fd_set_close_on_exec(task->stderr_fd);
946
task->from_stderr = dup(task->to_stdin);
947
fd_set_close_on_exec(task->from_stderr);
948
fd_set_nonblocking(task->from_stderr);
949
task->err = _create_task_out_eio(task->from_stderr,
950
SLURM_IO_STDERR, job, task);
951
list_append(job->stderr_eio_objs, (void *)task->err);
952
eio_new_initial_obj(job->eio, (void *)task->err);
1042
task->from_stderr = -1;
954
1044
xfree(task->efname);
955
1045
task->efname = xstrdup("/dev/null");
957
1047
fd_set_close_on_exec(task->stderr_fd);
958
1048
task->from_stderr = -1; /* not used */
960
} else if (task->efname != NULL) {
1050
} else if (task->efname != NULL &&
1051
(!job->labelio || strcmp(task->efname, "/dev/null")==0)) {
962
if (task->efname != NULL) {
1053
if (task->efname != NULL &&
1054
(!job->labelio || strcmp(task->efname, "/dev/null")==0) ) {
964
1056
/* open file on task's stderr */
965
1057
debug5(" stderr file name = %s", task->efname);
966
1058
task->stderr_fd = open(task->efname, file_flags, 0666);
967
1059
if (task->stderr_fd == -1) {
968
error("Could not open stderr file: %m");
970
task->efname = fname_create(job, "slurm-%J.err", 0);
971
task->stderr_fd = open(task->efname, file_flags, 0666);
972
if (task->stderr_fd == -1)
1060
error("Could not open stderr file %s: %m",
975
1064
fd_set_close_on_exec(task->stderr_fd);
976
1065
task->from_stderr = -1; /* not used */
1070
1161
if (msg == NULL)
1073
/* debug5("\"%s\"", msg->data + io_hdr_packed_size()); */
1075
1164
/* Add message to the msg_queue of all clients */
1076
1165
clients = list_iterator_create(out->job->clients);
1077
1166
while((eio = list_next(clients))) {
1078
1167
client = (struct client_io_info *)eio->arg;
1079
1168
if (client->out_eof == true)
1171
/* Some clients only take certain I/O streams */
1172
if (out->type==SLURM_IO_STDOUT) {
1173
if (client->ltaskid_stdout != -1 &&
1174
client->ltaskid_stdout != out->ltaskid)
1177
if (out->type==SLURM_IO_STDERR) {
1178
if (client->ltaskid_stderr != -1 &&
1179
client->ltaskid_stderr != out->ltaskid)
1081
1183
debug5("======================== Enqueued message");
1082
1184
xassert(client->magic == CLIENT_IO_MAGIC);
1083
1185
if (list_enqueue(client->msg_queue, msg))
1208
1338
pthread_sigmask(SIG_BLOCK, &set, NULL);
1210
1340
debug("IO handler started pid=%lu", (unsigned long) getpid());
1211
eio_handle_mainloop(job->eio);
1212
debug("IO handler exited");
1341
rc = eio_handle_mainloop(job->eio);
1342
debug("IO handler exited, rc=%d", rc);
1213
1343
return (void *)1;
1347
* Add a client to the job's client list that will write stdout and/or
1348
* stderr from the slurmstepd. The slurmstepd handles the write when
1349
* a file is created per node or per task, and the output needs to be
1350
* modified in some way, like labelling lines with the task number.
1353
io_create_local_client(const char *filename, int file_flags,
1354
slurmd_job_t *job, bool labelio,
1355
int stdout_tasks, int stderr_tasks)
1358
struct client_io_info *client;
1362
fd = open(filename, file_flags, 0666);
1364
return ESLURMD_IO_ERROR;
1366
fd_set_close_on_exec(fd);
1368
/* Now set up the eio object */
1369
client = xmalloc(sizeof(struct client_io_info));
1371
client->magic = CLIENT_IO_MAGIC;
1374
client->msg_queue = list_create(NULL); /* FIXME - destructor */
1376
client->ltaskid_stdout = stdout_tasks;
1377
client->ltaskid_stderr = stderr_tasks;
1378
client->labelio = labelio;
1379
client->is_local_file = true;
1381
client->label_width = 1;
1382
tmp = job->ntasks-1;
1383
while ((tmp /= 10) > 0)
1384
client->label_width++;
1387
obj = eio_obj_create(fd, &local_file_ops, (void *)client);
1388
list_append(job->clients, (void *)obj);
1389
eio_new_initial_obj(job->eio, (void *)obj);
1390
debug5("Now handling %d IO Client object(s)", list_count(job->clients));
1392
return SLURM_SUCCESS;
1217
1396
* Create the initial TCP connection back to a waiting client (e.g. srun).
1624
1830
return SLURM_SUCCESS;
1835
io_find_filename_pattern( slurmd_job_t *job,
1836
slurmd_filename_pattern_t *outpattern,
1837
slurmd_filename_pattern_t *errpattern,
1838
bool *same_out_err_files )
1841
int of_num_null = 0, ef_num_null = 0;
1842
int of_num_devnull = 0, ef_num_devnull = 0;
1843
int of_lastnull = -1, ef_lastnull = -1;
1844
bool of_all_same = true, ef_all_same = true;
1845
bool of_all_unique = true, ef_all_unique = true;
1847
*outpattern = SLURMD_UNKNOWN;
1848
*errpattern = SLURMD_UNKNOWN;
1849
*same_out_err_files = false;
1851
for (ii = 0; ii < job->ntasks; ii++) {
1852
if (job->task[ii]->ofname == NULL) {
1855
} else if (strcmp(job->task[ii]->ofname, "/dev/null")==0) {
1859
if (job->task[ii]->efname == NULL) {
1862
} else if (strcmp(job->task[ii]->efname, "/dev/null")==0) {
1866
if (of_num_null == job->ntasks)
1867
*outpattern = SLURMD_ALL_NULL;
1869
if (ef_num_null == job->ntasks)
1870
*errpattern = SLURMD_ALL_NULL;
1872
if (of_num_null == 1 && of_num_devnull == job->ntasks-1)
1873
*outpattern = SLURMD_ONE_NULL;
1875
if (ef_num_null == 1 && ef_num_devnull == job->ntasks-1)
1876
*errpattern = SLURMD_ONE_NULL;
1878
if (*outpattern == SLURMD_ALL_NULL && *errpattern == SLURMD_ALL_NULL)
1879
*same_out_err_files = true;
1881
if (*outpattern == SLURMD_ONE_NULL && *errpattern == SLURMD_ONE_NULL &&
1882
of_lastnull == ef_lastnull)
1883
*same_out_err_files = true;
1885
if (*outpattern != SLURMD_UNKNOWN && *errpattern != SLURMD_UNKNOWN)
1888
for (ii = 1; ii < job->ntasks; ii++) {
1889
if (!job->task[ii]->ofname || !job->task[0]->ofname ||
1890
strcmp(job->task[ii]->ofname, job->task[0]->ofname) != 0)
1891
of_all_same = false;
1893
if (!job->task[ii]->efname || !job->task[0]->efname ||
1894
strcmp(job->task[ii]->efname, job->task[0]->efname) != 0)
1895
ef_all_same = false;
1898
if (of_all_same && *outpattern == SLURMD_UNKNOWN)
1899
*outpattern = SLURMD_ALL_SAME;
1901
if (ef_all_same && *errpattern == SLURMD_UNKNOWN)
1902
*errpattern = SLURMD_ALL_SAME;
1904
if (job->task[0]->ofname && job->task[0]->efname &&
1905
strcmp(job->task[0]->ofname, job->task[0]->efname)==0)
1906
*same_out_err_files = true;
1908
if (*outpattern != SLURMD_UNKNOWN && *errpattern != SLURMD_UNKNOWN)
1911
for (ii = 0; ii < job->ntasks-1; ii++) {
1912
for (jj = ii+1; jj < job->ntasks; jj++) {
1914
if (!job->task[ii]->ofname || !job->task[jj]->ofname ||
1915
strcmp(job->task[ii]->ofname,
1916
job->task[jj]->ofname) == 0)
1917
of_all_unique = false;
1919
if (!job->task[ii]->efname || !job->task[jj]->efname ||
1920
strcmp(job->task[ii]->efname,
1921
job->task[jj]->efname) == 0)
1922
ef_all_unique = false;
1927
*outpattern = SLURMD_ALL_UNIQUE;
1930
*errpattern = SLURMD_ALL_UNIQUE;
1932
if (of_all_unique && ef_all_unique) {
1933
*same_out_err_files = true;
1934
for (ii = 0; ii < job->ntasks; ii++) {
1935
if (job->task[ii]->ofname &&
1936
job->task[ii]->efname &&
1937
strcmp(job->task[ii]->ofname,
1938
job->task[ii]->efname) != 0) {
1939
*same_out_err_files = false;
1948
io_get_file_flags(slurmd_job_t *job)
1950
slurm_ctl_conf_t *conf;
1953
/* set file flags for opening stdout/err */
1954
if (job->open_mode == OPEN_MODE_APPEND)
1955
file_flags = O_CREAT|O_WRONLY|O_APPEND;
1956
else if (job->open_mode == OPEN_MODE_TRUNCATE)
1957
file_flags = O_CREAT|O_WRONLY|O_APPEND|O_TRUNC;
1959
conf = slurm_conf_lock();
1960
if (conf->job_file_append)
1961
file_flags = O_CREAT|O_WRONLY|O_APPEND;
1963
file_flags = O_CREAT|O_WRONLY|O_APPEND|O_TRUNC;
1964
slurm_conf_unlock();