718
760
req->job_step_id, req->uid, req->gid, host, port);
720
762
first_job_run = !slurm_cred_jobid_cached(conf->vctx, req->job_id);
721
if (_check_job_credential(req->cred, jobid, stepid, req_uid,
722
req->tasks_to_launch[nodeid],
763
if (_check_job_credential(req, req_uid, req->tasks_to_launch[nodeid],
723
764
&step_hset) < 0) {
725
766
error("Invalid job credential from %ld@%s: %m",
726
767
(long) req_uid, host);
730
771
#ifndef HAVE_FRONT_END
731
772
if (first_job_run) {
732
if (_run_prolog(req->job_id, req->uid, NULL) != 0) {
733
error("[job %u] prolog failed", req->job_id);
774
rc = _run_prolog(req->job_id, req->uid, NULL);
776
int term_sig, exit_status;
777
if (WIFSIGNALED(rc)) {
779
term_sig = WTERMSIG(rc);
781
exit_status = WEXITSTATUS(rc);
784
error("[job %u] prolog failed status=%d:%d",
785
req->job_id, exit_status, term_sig);
734
786
errnum = ESLURMD_PROLOG_FAILED;
793
slurm_mutex_lock(&job_limits_mutex);
794
if (!job_limits_list)
795
job_limits_list = list_create(_job_limits_free);
796
job_limits_ptr = list_find_first (job_limits_list,
799
if (!job_limits_ptr) {
800
//info("AddLim job:%u mem:%u",req->job_id,req->job_mem);
801
job_limits_ptr = xmalloc(sizeof(job_mem_limits_t));
802
job_limits_ptr->job_id = req->job_id;
803
list_append(job_limits_list, job_limits_ptr);
805
job_limits_ptr->job_mem = req->job_mem; /* reset limit */
806
slurm_mutex_unlock(&job_limits_mutex);
739
809
adlen = sizeof(self);
740
810
_slurm_getsockname(msg->conn_fd, (struct sockaddr *)&self, &adlen);
971
1050
/* Never return a message, slurmctld does not expect one */
1053
static void _job_limits_free(void *x)
1059
static int _job_limits_match(void *x, void *key)
1061
job_mem_limits_t *job_limits_ptr = (job_mem_limits_t *) x;
1062
uint32_t *job_id = (uint32_t *) key;
1063
if (job_limits_ptr->job_id == *job_id)
1068
/* Call only with job_limits_mutex locked */
1070
_load_job_limits(void)
1073
ListIterator step_iter;
1076
job_mem_limits_t *job_limits_ptr;
1077
slurmstepd_info_t *stepd_info_ptr;
1079
if (!job_limits_list)
1080
job_limits_list = list_create(_job_limits_free);
1081
job_limits_loaded = true;
1083
steps = stepd_available(conf->spooldir, conf->node_name);
1084
step_iter = list_iterator_create(steps);
1085
while ((stepd = list_next(step_iter))) {
1086
job_limits_ptr = list_find_first(job_limits_list,
1089
if (job_limits_ptr) /* already processed */
1091
fd = stepd_connect(stepd->directory, stepd->nodename,
1092
stepd->jobid, stepd->stepid);
1094
continue; /* step completed */
1095
stepd_info_ptr = stepd_get_info(fd);
1096
if (stepd_info_ptr && stepd_info_ptr->job_mem_limit) {
1097
/* create entry for this job */
1098
job_limits_ptr = xmalloc(sizeof(job_mem_limits_t));
1099
job_limits_ptr->job_id = stepd->jobid;
1100
job_limits_ptr->job_mem = stepd_info_ptr->job_mem_limit;
1101
debug("RecLim job:%u mem:%u",
1102
stepd->jobid, stepd_info_ptr->job_mem_limit);
1103
list_append(job_limits_list, job_limits_ptr);
1105
xfree(stepd_info_ptr);
1108
list_iterator_destroy(step_iter);
1109
list_destroy(steps);
1113
_enforce_job_mem_limit(void)
1116
ListIterator step_iter, job_limits_iter;
1117
job_mem_limits_t *job_limits_ptr;
1119
int fd, i, job_inx, job_cnt = 0;
1121
stat_jobacct_msg_t acct_req;
1122
stat_jobacct_msg_t *resp = NULL;
1123
struct job_mem_info {
1125
uint32_t mem_limit; /* MB */
1126
uint32_t mem_used; /* KB */
1128
struct job_mem_info *job_mem_info_ptr = NULL;
1130
job_notify_msg_t notify_req;
1131
job_step_kill_msg_t kill_req;
1133
slurm_mutex_lock(&job_limits_mutex);
1134
if (!job_limits_loaded)
1136
if (list_count(job_limits_list) == 0) {
1137
slurm_mutex_unlock(&job_limits_mutex);
1141
job_mem_info_ptr = xmalloc((list_count(job_limits_list) + 1) *
1142
sizeof(struct job_mem_info));
1144
job_limits_iter = list_iterator_create(job_limits_list);
1145
while ((job_limits_ptr = list_next(job_limits_iter))) {
1146
job_mem_info_ptr[job_cnt].job_id = job_limits_ptr->job_id;
1147
job_mem_info_ptr[job_cnt].mem_limit = job_limits_ptr->job_mem;
1150
list_iterator_destroy(job_limits_iter);
1151
slurm_mutex_unlock(&job_limits_mutex);
1153
steps = stepd_available(conf->spooldir, conf->node_name);
1154
step_iter = list_iterator_create(steps);
1155
while ((stepd = list_next(step_iter))) {
1156
for (job_inx=0; job_inx<job_cnt; job_inx++) {
1157
if (job_mem_info_ptr[job_inx].job_id == stepd->jobid)
1160
if (job_inx >= job_cnt)
1161
continue; /* job not being tracked */
1163
fd = stepd_connect(stepd->directory, stepd->nodename,
1164
stepd->jobid, stepd->stepid);
1166
continue; /* step completed */
1167
acct_req.job_id = stepd->jobid;
1168
acct_req.step_id = stepd->stepid;
1169
resp = xmalloc(sizeof(stat_jobacct_msg_t));
1170
if ((!stepd_stat_jobacct(fd, &acct_req, resp)) &&
1172
/* resp->jobacct is NULL if account is disabled */
1173
jobacct_common_getinfo((struct jobacctinfo *)
1175
JOBACCT_DATA_TOT_RSS,
1177
//info("job %u.%u rss:%u",stepd->jobid, stepd->stepid, step_rss);
1178
step_rss = MAX(step_rss, 1);
1179
job_mem_info_ptr[job_inx].mem_used += step_rss;
1181
slurm_free_stat_jobacct_msg(resp);
1184
list_iterator_destroy(step_iter);
1185
list_destroy(steps);
1187
for (i=0; i<job_cnt; i++) {
1188
if ((job_mem_info_ptr[i].mem_limit == 0) ||
1189
(job_mem_info_ptr[i].mem_used == 0)) {
1190
/* no memory limit or no steps found, purge record */
1191
slurm_mutex_lock(&job_limits_mutex);
1192
list_delete_all(job_limits_list, _job_limits_match,
1193
&job_mem_info_ptr[i].job_id);
1194
slurm_mutex_unlock(&job_limits_mutex);
1197
job_mem_info_ptr[i].mem_used /= 1024; /* KB to MB */
1198
if (job_mem_info_ptr[i].mem_used <=
1199
job_mem_info_ptr[i].mem_limit)
1202
info("Job %u exceeded memory limit (%u>%u), cancelling it",
1203
job_mem_info_ptr[i].job_id, job_mem_info_ptr[i].mem_used,
1204
job_mem_info_ptr[i].mem_limit);
1205
/* NOTE: Batch jobs may have no srun to get this message */
1206
slurm_msg_t_init(&msg);
1207
notify_req.job_id = job_mem_info_ptr[i].job_id;
1208
notify_req.job_step_id = NO_VAL;
1209
notify_req.message = "Exceeded job memory limit";
1210
msg.msg_type = REQUEST_JOB_NOTIFY;
1211
msg.data = ¬ify_req;
1212
slurm_send_only_controller_msg(&msg);
1214
kill_req.job_id = job_mem_info_ptr[i].job_id;
1215
kill_req.job_step_id = NO_VAL;
1216
kill_req.signal = SIGKILL;
1217
kill_req.batch_flag = (uint16_t) 0;
1218
msg.msg_type = REQUEST_CANCEL_JOB_STEP;
1219
msg.data = &kill_req;
1220
slurm_send_only_controller_msg(&msg);
1222
xfree(job_mem_info_ptr);
975
1226
_rpc_ping(slurm_msg_t *msg)
977
1228
int rc = SLURM_SUCCESS;
978
uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred);
980
if (!_slurm_authorized_user(req_uid)) {
981
error("Security violation, ping RPC from uid %u",
982
(unsigned int) req_uid);
983
rc = ESLURM_USER_ID_MISSING; /* or bad in this case */
986
/* Return result. If the reply can't be sent this indicates that
987
* 1. The network is broken OR
988
* 2. slurmctld has died OR
989
* 3. slurmd was paged out due to full memory
990
* If the reply request fails, we send an registration message to
991
* slurmctld in hopes of avoiding having the node set DOWN due to
992
* slurmd paging and not being able to respond in a timely fashion. */
993
if (slurm_send_rc_msg(msg, rc) < 0) {
994
error("Error responding to ping: %m");
995
send_registration_msg(SLURM_SUCCESS, false);
1229
uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred, NULL);
1231
if (!_slurm_authorized_user(req_uid)) {
1232
error("Security violation, ping RPC from uid %u",
1233
(unsigned int) req_uid);
1234
rc = ESLURM_USER_ID_MISSING; /* or bad in this case */
1237
/* Return result. If the reply can't be sent this indicates that
1238
* 1. The network is broken OR
1239
* 2. slurmctld has died OR
1240
* 3. slurmd was paged out due to full memory
1241
* If the reply request fails, we send an registration message to
1242
* slurmctld in hopes of avoiding having the node set DOWN due to
1243
* slurmd paging and not being able to respond in a timely fashion. */
1244
if (slurm_send_rc_msg(msg, rc) < 0) {
1245
error("Error responding to ping: %m");
1246
send_registration_msg(SLURM_SUCCESS, false);
1249
/* Take this opportunity to enforce any job memory limits */
1250
_enforce_job_mem_limit();
1255
_rpc_health_check(slurm_msg_t *msg)
1257
int rc = SLURM_SUCCESS;
1258
uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred, NULL);
1260
if (!_slurm_authorized_user(req_uid)) {
1261
error("Security violation, ping RPC from uid %u",
1262
(unsigned int) req_uid);
1263
rc = ESLURM_USER_ID_MISSING; /* or bad in this case */
1266
/* Return result. If the reply can't be sent this indicates that
1267
* 1. The network is broken OR
1268
* 2. slurmctld has died OR
1269
* 3. slurmd was paged out due to full memory
1270
* If the reply request fails, we send an registration message to
1271
* slurmctld in hopes of avoiding having the node set DOWN due to
1272
* slurmd paging and not being able to respond in a timely fashion. */
1273
if (slurm_send_rc_msg(msg, rc) < 0) {
1274
error("Error responding to ping: %m");
1275
send_registration_msg(SLURM_SUCCESS, false);
1278
if ((rc == SLURM_SUCCESS) && (conf->health_check_program)) {
1279
char *env[1] = { NULL };
1280
rc = run_script("health_check", conf->health_check_program,
1284
/* Take this opportunity to enforce any job memory limits */
1285
_enforce_job_mem_limit();
1959
2334
_rpc_suspend_job(slurm_msg_t *msg)
1961
2336
suspend_msg_t *req = msg->data;
1962
uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred);
2337
uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred, NULL);
1965
2339
ListIterator i;
1966
2340
step_loc_t *stepd;
1967
2341
int step_cnt = 0;
1968
int fd, rc = SLURM_SUCCESS;
2342
int first_time, rc = SLURM_SUCCESS;
1970
2344
if (req->op != SUSPEND_JOB && req->op != RESUME_JOB) {
1971
2345
error("REQUEST_SUSPEND: bad op code %u", req->op);
1972
2346
rc = ESLURM_NOT_SUPPORTED;
1975
debug("_rpc_suspend_job jobid=%u uid=%d",
1976
req->job_id, req_uid);
1977
job_uid = _get_job_uid(req->job_id);
1981
2350
* check that requesting user ID is the SLURM UID or root
1983
2352
if (!_slurm_authorized_user(req_uid)) {
1984
error("Security violation: signal_job(%u) from uid %ld",
2353
error("Security violation: suspend_job(%u) from uid %ld",
1985
2354
req->job_id, (long) req_uid);
1986
2355
rc = ESLURM_USER_ID_MISSING;
2358
/* send a response now, which will include any errors
2359
* detected with the request */
2360
if (msg->conn_fd >= 0) {
2361
slurm_send_rc_msg(msg, rc);
2362
if (slurm_close_accepted_conn(msg->conn_fd) < 0)
2363
error ("_rpc_suspend_job: close(%d): %m", msg->conn_fd);
2366
if (rc != SLURM_SUCCESS)
2369
/* now we can focus on performing the requested action,
2370
* which could take a few seconds to complete */
2371
debug("_rpc_suspend_job jobid=%u uid=%d action=%s", req->job_id,
2372
req_uid, req->op == SUSPEND_JOB ? "suspend" : "resume");
2374
/* Try to get a thread lock for this job. If the lock
2375
* is not available then sleep and try again */
2377
while (!_get_suspend_job_lock(req->job_id)) {
2379
debug3("suspend lock sleep for %u", req->job_id);
2383
/* If suspending and you got the lock on the first try then
2384
* sleep for 1 second to give any launch requests a chance
2385
* to get started and avoid a race condition that would
2386
* effectively cause the suspend request to get ignored
2387
* because "there's no job to suspend" */
2388
if (first_time && req->op == SUSPEND_JOB) {
2389
debug3("suspend first sleep for %u", req->job_id);
2393
/* Release or reclaim resources bound to these tasks (task affinity) */
2394
if (req->op == SUSPEND_JOB)
2395
(void) slurmd_suspend_job(req->job_id);
2397
(void) slurmd_resume_job(req->job_id);
1991
2400
* Loop through all job steps and call stepd_suspend or stepd_resume
2401
* as appropriate. Since the "suspend" action contains a 'sleep 1',
2402
* suspend multiple jobsteps in parallel.
1994
2404
steps = stepd_available(conf->spooldir, conf->node_name);
1995
2405
i = list_iterator_create(steps);
1996
while ((stepd = list_next(i))) {
1997
if (stepd->jobid != req->job_id) {
1998
/* multiple jobs expected on shared nodes */
1999
debug3("Step from other job: jobid=%u (this jobid=%u)",
2000
stepd->jobid, req->job_id);
2005
fd = stepd_connect(stepd->directory, stepd->nodename,
2006
stepd->jobid, stepd->stepid);
2008
debug3("Unable to connect to step %u.%u",
2009
stepd->jobid, stepd->stepid);
2408
int x, fdi, fd[NUM_PARALLEL_SUSPEND];
2410
while ((stepd = list_next(i))) {
2411
if (stepd->jobid != req->job_id) {
2412
/* multiple jobs expected on shared nodes */
2413
debug3("Step from other job: jobid=%u (this jobid=%u)",
2414
stepd->jobid, req->job_id);
2419
fd[fdi] = stepd_connect(stepd->directory,
2420
stepd->nodename, stepd->jobid,
2422
if (fd[fdi] == -1) {
2423
debug3("Unable to connect to step %u.%u",
2424
stepd->jobid, stepd->stepid);
2430
if (fdi >= NUM_PARALLEL_SUSPEND)
2433
/* check for open connections */
2013
2437
if (req->op == SUSPEND_JOB) {
2014
debug2("Suspending job step %u.%u",
2015
stepd->jobid, stepd->stepid);
2016
if (stepd_suspend(fd) < 0)
2017
debug(" suspend failed: %m", stepd->jobid);
2438
stepd_suspend(fd, fdi, req->job_id);
2019
debug2("Resuming job step %u.%u",
2020
stepd->jobid, stepd->stepid);
2021
if (stepd_resume(fd) < 0)
2022
debug(" resume failed: %m", stepd->jobid);
2440
/* "resume" remains a serial action (for now) */
2441
for (x = 0; x < fdi; x++) {
2442
debug2("Resuming job %u (cached step count %d)",
2444
if (stepd_resume(fd[x]) < 0)
2445
debug(" resume failed: %m");
2448
for (x = 0; x < fdi; x++)
2449
/* fd may have been closed by stepd_suspend */
2453
/* check for no more jobs */
2454
if (fdi < NUM_PARALLEL_SUSPEND)
2027
2457
list_iterator_destroy(i);
2028
2458
list_destroy(steps);
2459
_unlock_suspend_job(req->job_id);
2031
2461
if (step_cnt == 0) {
2032
2462
debug2("No steps in jobid %u to suspend/resume",
2037
* At this point, if connection still open, we send controller
2040
fini: if (msg->conn_fd >= 0) {
2041
slurm_send_rc_msg(msg, rc);
2042
if (slurm_close_accepted_conn(msg->conn_fd) < 0)
2043
error ("_rpc_signal_job: close(%d): %m", msg->conn_fd);