1
/*___INFO__MARK_BEGIN__*/
2
/*************************************************************************
4
* The Contents of this file are made available subject to the terms of
5
* the Sun Industry Standards Source License Version 1.2
7
* Sun Microsystems Inc., March, 2001
10
* Sun Industry Standards Source License Version 1.2
11
* =================================================
12
* The contents of this file are subject to the Sun Industry Standards
13
* Source License Version 1.2 (the "License"); You may not use this file
14
* except in compliance with the License. You may obtain a copy of the
15
* License at http://gridengine.sunsource.net/Gridengine_SISSL_license.html
17
* Software provided under this License is provided on an "AS IS" basis,
18
* WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING,
19
* WITHOUT LIMITATION, WARRANTIES THAT THE SOFTWARE IS FREE OF DEFECTS,
20
* MERCHANTABLE, FIT FOR A PARTICULAR PURPOSE, OR NON-INFRINGING.
21
* See the License for the specific provisions governing your rights and
22
* obligations concerning the Software.
24
* The Initial Developer of the Original Code is: Sun Microsystems, Inc.
26
* Copyright: 2001 by Sun Microsystems, Inc.
28
* All Rights Reserved.
30
************************************************************************/
31
/*___INFO__MARK_END__*/
34
#include <sys/types.h>
39
#include <sys/types.h>
41
#include <sys/resource.h>
49
#include "sge_ja_task.h"
50
#include "sge_pe_task.h"
51
#include "sge_qinstance.h"
54
#include "sge_usage.h"
56
#include "admin_mail.h"
59
#include "config_file.h"
60
#include "sge_signal.h"
61
#include "dispatcher.h"
63
#include "sge_job_qmaster.h"
64
#include "execution_states.h"
65
#include "sge_load_sensor.h"
66
#include "reaper_execd.h"
67
#include "job_report_execd.h"
69
#include "sge_qexec.h"
70
#include "sge_string.h"
71
#include "sge_afsutil.h"
72
#include "sge_parse_num_par.h"
73
#include "setup_path.h"
75
#include "msg_common.h"
76
#include "msg_daemons_common.h"
77
#include "msg_execd.h"
78
#include "sge_security.h"
79
#include "sge_feature.h"
80
#include "sge_spool.h"
81
#include "spool/classic/read_write_job.h"
83
#include "sge_unistd.h"
84
#include "sge_uidgid.h"
86
#include "sge_report.h"
87
#include "sge_ulong.h"
88
#include "sgeobj/sge_object.h"
89
#include "uti/sge_stdio.h"
93
/*
 * Forward declarations of helpers private to this file (all static).
 * unregister_from_ptf(), clean_up_job() and execd_job_failure() are defined
 * further down in this file.
 */
static void unregister_from_ptf(u_long32 jobid, u_long32 jataskid, const char *pe_task_id, lListElem *jr);
96
/* NOTE(review): the third parameter is named signal in this prototype but
 * shepherd_exit_status in the definition of clean_up_job(). */
static int clean_up_job(lListElem *jr, int failed, int signal, int is_array, const lListElem *pe, const char *job_owner);
97
static void convert_attribute(lList **cflpp, lListElem *jr, char *name, u_long32 udefau);
98
static int extract_ulong_attribute(lList **cflpp, char *name, u_long32 *valuep);
100
/* common implementation called by execd_job_start_failure() and execd_job_run_failure() */
static lListElem *execd_job_failure(lListElem *jep, lListElem *jatep, lListElem *petep, const char *error_string, int general, int failed);
101
static int read_dusage(lListElem *jr, const char *jobdir, u_long32 jobid, u_long32 jataskid, int failed, int usage_mul_factor);
102
static void build_derived_final_usage(lListElem *jr, int usage_mul_factor);
104
static void examine_job_task_from_file(int startup, char *dir, lListElem *jep, lListElem *jatep, lListElem *petep, pid_t *pids, int npids);
107
/*****************************************************************************
108
This code is used only by execd.
109
Execd has gotten a SIGCLD or execd starts up.
110
Look whether there are finished jobs. Report them to the qmaster.
111
Our input is what shepherd wrote to the directory of the job:
112
"<execd_spool>/active_jobs/<jobnumber>/".
114
"error"-file errors detected by shepherd
115
"trace"-file information only interesting for sge programmers
116
"usage"-file usage of job (man getrusage)
117
"pid"-file pid of shepherd; can be used to look into the process table
119
If everything is done we can remove the job directory.
120
****************************************************************************/
121
/*
 * Collects terminated child processes with waitpid(-1, &status, WNOHANG) and
 * works out what each one was: the shepherd of a job or pe task, a load
 * sensor, or (going by the log messages) a mailer process.
 * For a shepherd the job report is looked up with get_job_report() or created
 * with add_job_report(), set to JEXITING, handed to clean_up_job() and
 * flushed with flush_job_report().
 *
 * max_count: once reap_count has reached this value the function logs that
 *            the maximum reap count is reached.
 */
int sge_reap_children_execd(int max_count)
124
int exit_status, child_signal, core_dumped, failed;
125
u_long32 jobid, jataskid;
126
lListElem *jep, *petep = NULL, *jatep = NULL;
131
DENTER(TOP_LAYER, "sge_reap_children_execd");
132
DPRINTF(("========================REAPER======================\n"));
140
if (reap_count >= max_count) {
141
DPRINTF(("max. reap count is reached - returning. reaped "sge_U32CFormat" childs.\n", sge_u32c(reap_count)));
145
exit_status = child_signal = core_dumped = failed = 0;
147
pid = waitpid(-1, &status, WNOHANG);
150
DPRINTF(("pid==0 - no stopped or exited children\n"));
155
DPRINTF(("pid==-1 - no children not previously waited for\n"));
159
if (WIFSTOPPED(status)) {
160
DPRINTF(("PID %d WIFSTOPPED\n", pid));
165
if (WIFCONTINUED(status)) {
166
DPRINTF(("PID %d WIFCONTINUED\n", pid));
171
if (WIFSIGNALED(status)) {
172
child_signal = WTERMSIG(status);
174
core_dumped = WCOREDUMP(status);
176
/* NOTE(review): core_dumped is assigned twice in a row here; presumably the
 * two assignments are alternatives of a preprocessor conditional - confirm.
 * The mask below is decimal 80 (0x50), while the core dump flag of a
 * traditional wait status is 0x80 - confirm the intended value. */
core_dumped = status & 80;
178
failed = ESSTATE_DIED_THRU_SIGNAL;
179
} else if (WIFEXITED(status)) {
180
exit_status = WEXITSTATUS(status);
182
failed = ESSTATE_SHEPHERD_EXIT;
184
/* not signaled and not exited - so what else happend with this guy? */
185
WARNING((SGE_EVENT, MSG_WAITPIDNOSIGNOEXIT_UI, sge_u32c(pid), status));
189
/* increase reaped job counter */
192
/* search whether it was a job or one of its tasks */
193
for_each(jep, *(object_type_get_master_list(SGE_TYPE_JOB))) {
197
for_each (jatep, lGetList(jep, JB_ja_tasks)) {
198
if (lGetUlong(jatep, JAT_pid) == pid) {
204
for_each(petep, lGetList(jatep, JAT_task_list)) {
205
if (lGetUlong(petep, PET_pid) == pid) {
224
jobid = lGetUlong(jep, JB_job_number);
225
jataskid = lGetUlong(jatep, JAT_task_number);
228
ERROR((SGE_EVENT, MSG_SHEPHERD_VSHEPHERDOFJOBWXDIEDTHROUGHSIGNALYZ_SUUSI,
229
(petep ? MSG_SLAVE : "" ),
232
core_dumped ? MSG_COREDUMPED: "",
236
ERROR((SGE_EVENT, MSG_SHEPHERD_WSHEPHERDOFJOBXYEXITEDWITHSTATUSZ_SUUI,
237
(petep ? MSG_SLAVE : "" ),
244
* seek job report for this job it must be contained in job report
245
* if not it should be a job kept with SGE_KEEP_ACTIVE (without
246
* keeping job object itself)
248
DPRINTF(("Job: "sge_u32", JA-Task: "sge_u32", PE-Task: %s\n", jobid, jataskid,
249
petep != NULL ? lGetString(petep, PET_id) : ""));
250
if (!(jr=get_job_report(jobid, jataskid, petep != NULL ? lGetString(petep, PET_id) : NULL))) {
251
ERROR((SGE_EVENT, MSG_JOB_MISSINGJOBXYINJOBREPORTFOREXITINGJOBADDINGIT_UU,
252
sge_u32c(jobid), sge_u32c(jataskid)));
253
jr = add_job_report(jobid, jataskid, petep != NULL ? lGetString(petep, PET_id) : NULL, jep);
256
/* when restarting execd it happens that cleanup_old_jobs()
257
has already cleaned up this job */
258
if (lGetUlong(jr, JR_state)==JEXITING) {
259
DPRINTF(("State of job "sge_u32" already changed to JEXITING\n", jobid));
263
lSetUlong(jr, JR_state, JEXITING);
266
lSetUlong(petep, PET_status, JEXITING);
268
lSetUlong(jatep, JAT_status, JEXITING);
271
clean_up_job(jr, failed, exit_status, job_is_array(jep),
272
lGetObject(jatep, JAT_pe_object), lGetString(jep, JB_owner));
274
flush_job_report(jr);
276
} else if (sge_ls_stop_if_pid(pid, 1)) {
278
ERROR((SGE_EVENT, MSG_STATUS_LOADSENSORDIEDWITHSIGNALXY_SI,
279
core_dumped ? MSG_COREDUMPED: "",
282
WARNING((SGE_EVENT, MSG_STATUS_LOADSENSOREXITEDWITHEXITSTATUS_I,
287
ERROR((SGE_EVENT, MSG_STATUS_MAILERDIEDTHROUGHSIGNALXY_SI,
288
core_dumped ? MSG_COREDUMPED: "",
290
else if (exit_status)
291
ERROR((SGE_EVENT, MSG_STATUS_MAILEREXITEDWITHEXITSTATUS_I,
295
DPRINTF(("reaped "sge_U32CFormat" childs - no child remaining\n", sge_u32c(reap_count)));
306
unregisters job from ptf and fills usage values
307
into appropriate job report element
311
0 everything worked fine
312
1 got only zero usage from ptf
315
/*
 * Reports completion of the job/task to the ptf via ptf_job_complete() and
 * exchanges the usage list obtained from it with the JR_usage list of the
 * job report jr. A warning containing ptf_errstr(ptf_error) is logged,
 * presumably only when ptf_job_complete() reported a problem.
 * NOTE(review): the function is declared void although the comment block in
 * front of it describes return values 0 and 1.
 */
static void unregister_from_ptf(
318
const char *pe_task_id,
324
DENTER(TOP_LAYER, "unregister_from_ptf");
326
ptf_error = ptf_job_complete(job_id, ja_task_id, pe_task_id, &usage);
328
WARNING((SGE_EVENT, MSG_JOB_REAPINGJOBXPTFCOMPLAINSY_US,
329
sge_u32c(job_id), ptf_errstr(ptf_error)));
332
lXchgList(jr, JR_usage, &usage);
342
/************************************************************************
343
This is the job clean up and report function. We design this function
344
to be as independent from the execd as possible. Maybe we want to
345
make an extra executable later. We have another function cleaning up jobs
346
(execd_job_start_failure). This function is called if the job starting
347
failed. At this time there is no config-file present.
349
jobid = id of job to reap
350
failed = indicates a failure of job execution, see shepherd_states.h
351
************************************************************************/
352
/*
 * Evaluates what the shepherd left in the active directory of a job/task and
 * stores the outcome in the job report jr.
 *
 * The function calls unregister_from_ptf(), checks the active job directory
 * with SGE_STAT(), reads the config file with read_config(), and then looks
 * at the exit_status, error and checkpointed files as well as the usage data
 * (read_dusage()). The result is written to JR_failed, JR_err_str,
 * JR_job_pid, JR_ckpt_arena and JR_general_failure; for the failure states
 * listed as case labels job_related_adminmail() is called.
 *
 * jr                   - job report of the job/task being cleaned up
 * failed               - failure state determined by the caller; it can be
 *                        replaced by the number read from the exit_status
 *                        file
 * shepherd_exit_status - becomes the value of failed when failed equals
 *                        ESSTATE_SHEPHERD_EXIT and the exit_status file
 *                        cannot be opened or holds no number
 * is_array, job_owner  - passed on to job_related_adminmail()
 * pe                   - passed on to execd_get_acct_multiplication_factor()
 *
 * Returns -1 when SGE_STAT() fails on the active job directory.
 */
static int clean_up_job(lListElem *jr, int failed, int shepherd_exit_status,
353
int is_array, const lListElem *pe, const char* job_owner)
355
dstring jobdir = DSTRING_INIT;
356
dstring fname = DSTRING_INIT;
358
char id_buffer[MAX_STRING_SIZE];
361
SGE_STRUCT_STAT statbuf;
364
u_long32 job_id, job_pid, ckpt_arena, general_failure = 0, ja_task_id;
365
const char *pe_task_id = NULL;
367
int usage_mul_factor;
369
DENTER(TOP_LAYER, "clean_up_job");
371
sge_dstring_init(&id_dstring, id_buffer, MAX_STRING_SIZE);
374
CRITICAL((SGE_EVENT, MSG_JOB_CLEANUPJOBCALLEDWITHINVALIDPARAMETERS));
379
job_id = lGetUlong(jr, JR_job_number);
380
ja_task_id = lGetUlong(jr, JR_ja_task_number);
381
pe_task_id = lGetString(jr, JR_pe_task_id_str);
383
DPRINTF(("cleanup for job %s\n",
384
job_get_id_string(job_id, ja_task_id, pe_task_id, &id_dstring)));
387
unregister_from_ptf(job_id, ja_task_id, pe_task_id, jr);
389
lDelSubStr(jr, UA_name, USAGE_ATTR_CPU, JR_usage);
390
lDelSubStr(jr, UA_name, USAGE_ATTR_MEM, JR_usage);
391
lDelSubStr(jr, UA_name, USAGE_ATTR_IO, JR_usage);
392
lDelSubStr(jr, UA_name, USAGE_ATTR_IOW, JR_usage);
393
lDelSubStr(jr, UA_name, USAGE_ATTR_VMEM, JR_usage);
394
lDelSubStr(jr, UA_name, USAGE_ATTR_MAXVMEM, JR_usage);
398
/* moved to remove_acked_job_exit() */
399
krb_destroy_forwarded_tgt(job_id);
402
/* set directory for job */
403
sge_get_active_job_file_path(&jobdir, job_id, ja_task_id, pe_task_id, NULL);
405
if (SGE_STAT(sge_dstring_get_string(&jobdir), &statbuf)) {
406
/* This never should happen, cause if we cant create this directory on
407
startup we report the job finish immediately */
408
ERROR((SGE_EVENT, MSG_JOB_CANTFINDDIRXFORREAPINGJOBYZ_SS,
409
sge_dstring_get_string(&jobdir),
410
job_get_id_string(job_id, ja_task_id, pe_task_id, &id_dstring)));
411
sge_dstring_free(&jobdir);
412
return -1; /* nothing can be done without information */
415
/* read config written by exec_job */
416
sge_get_active_job_file_path(&fname, job_id, ja_task_id, pe_task_id,
418
if (read_config(sge_dstring_get_string(&fname))) {
419
/* This should happen very rarely. exec_job() should avoid this
420
condition as far as possible. One possibility for this case is,
421
that the execd dies just after making the jobs active directory.
422
The pain with this case is, that we have not much information
423
to report this job to qmaster. */
424
ERROR((SGE_EVENT, MSG_JOB_CANTREADCONFIGFILEFORJOBXY_S,
425
job_get_id_string(job_id, ja_task_id, pe_task_id, &id_dstring)));
426
lSetUlong(jr, JR_failed, ESSTATE_NO_CONFIG);
427
lSetString(jr, JR_err_str, (char *) MSG_SHEPHERD_EXECDWENTDOWNDURINGJOBSTART);
429
sge_dstring_free(&fname);
430
sge_dstring_free(&jobdir);
441
* look for exit status of shepherd This is the last file the shepherd
442
* creates. So if we can find this shepherd terminated normal.
444
sge_get_active_job_file_path(&fname,
445
job_id, ja_task_id, pe_task_id, "exit_status");
446
if (!(fp = fopen(sge_dstring_get_string(&fname), "r"))) {
448
* we trust the exit status of the shepherd if it exited regularly
449
* otherwise we assume it died before starting the job (if died through signal or
450
* job_dir was found during execd startup)
452
if (failed == ESSTATE_SHEPHERD_EXIT)
453
failed = shepherd_exit_status;
455
failed = SSTATE_BEFORE_PROLOG;
457
sprintf(error, MSG_STATUS_ABNORMALTERMINATIONOFSHEPHERDFORJOBXY_S,
458
job_get_id_string(job_id, ja_task_id, pe_task_id, &id_dstring));
459
/* NOTE(review): the buffer error is passed in the position where the other
 * ERROR() calls pass a format string, so presumably a percent sign in its
 * content would be interpreted as a conversion - confirm. The same pattern
 * recurs further down in this function. */
ERROR((SGE_EVENT, error));
462
* failed = ESSTATE_SHEPHERD_EXIT or exit status of shepherd if we are
463
* parent in case we can't open the exit_status file
466
int fscanf_count, shepherd_exit_status_file;
468
fscanf_count = fscanf(fp, "%d", &shepherd_exit_status_file);
469
FCLOSE_IGNORE_ERROR(fp);
470
if (fscanf_count != 1) {
471
sprintf(error, MSG_STATUS_ABNORMALTERMINATIONFOSHEPHERDFORJOBXYEXITSTATEFILEISEMPTY_S,
472
job_get_id_string(job_id, ja_task_id, pe_task_id, &id_dstring));
473
ERROR((SGE_EVENT, error));
475
* If shepherd died through signal assume job was started, else
478
if (failed == ESSTATE_SHEPHERD_EXIT)
479
failed = shepherd_exit_status;
481
failed = ESSTATE_NO_EXITSTATUS;
482
} else if (failed != ESSTATE_NO_PID) /* only set during execd startup */
483
failed = shepherd_exit_status_file;
486
* failed is content of exit_status file or ESSTATE_NO_PID or real
487
* exit status if file can't be read
494
* exit status of shepherd (one of SSTATE_* values)
495
* ESSTATE_DIED_THRU_SIGNAL
500
if (failed == ESSTATE_DIED_THRU_SIGNAL)
501
sprintf(error, MSG_SHEPHERD_DIEDTHROUGHSIGNAL);
502
else if (failed == ESSTATE_NO_PID)
503
sprintf(error, MSG_SHEPHERD_NOPIDFILE);
505
sprintf(error, MSG_SHEPHERD_EXITEDWISSTATUS_IS, failed,
506
get_sstate_description(failed));
509
/* look for error file this overrules errors found yet */
510
sge_get_active_job_file_path(&fname,
511
job_id, ja_task_id, pe_task_id, "error");
512
if ((fp = fopen(sge_dstring_get_string(&fname), "r"))) {
515
if ((n = fread(error, 1, sizeof(error), fp))) {
516
/* Non empty error file. The shepherd encounterd a problem. */
518
failed = ESSTATE_UNEXP_ERRORFILE;
520
/* ensure only first line of error file is in 'error' */
521
if ((new_line=strchr(error, '\n')))
523
/* NOTE(review): assuming DPRINTF follows printf rules, 256 is a minimum
 * field width here (output is padded, never cut); a precision (.256) would
 * be needed to limit the printed length - confirm the intent. */
DPRINTF(("ERRORFILE: %256s\n", error));
526
DPRINTF(("empty error file\n"));
528
ERROR((SGE_EVENT, MSG_JOB_CANTREADERRORFILEFORJOBXY_S,
529
job_get_id_string(job_id, ja_task_id, pe_task_id, &id_dstring)));
531
FCLOSE_IGNORE_ERROR(fp);
534
ERROR((SGE_EVENT, MSG_FILE_NOOPEN_SS, sge_dstring_get_string(&fname), strerror(errno)));
535
/* There is no error file. */
541
** now that shepherd stops on errors there is no usage file
542
** if the job never ran
543
** but we need a dusage struct because information that has nothing
544
** to do with the job also goes in there, this should be changed
545
** read_dusage gets the failed parameter to decide what should be there
548
/* to report correct usage for loosly integrated parallel jobs,
549
* we have to compute a multiplication factor for acct_reserved_usage
556
slots = (s=get_conf_val("pe_slots"))?atoi(s):1;
557
usage_mul_factor = execd_get_acct_multiplication_factor(pe, slots,
558
(pe_task_id != NULL) ? true : false);
561
if (read_dusage(jr, sge_dstring_get_string(&jobdir), job_id, ja_task_id, failed, usage_mul_factor)) {
562
if (error[0] == '\0') {
563
sprintf(error, MSG_JOB_CANTREADUSAGEFILEFORJOBXY_S,
564
job_get_id_string(job_id, ja_task_id, pe_task_id, &id_dstring));
567
ERROR((SGE_EVENT, error));
570
failed = SSTATE_FAILURE_AFTER_JOB;
578
/* map system signals into sge signals to make signo's exchangable */
579
du=lGetSubStr(jr, UA_name, "signal", JR_usage);
581
int signo = (int)lGetDouble(du, UA_value);
586
/* Job died through a signal */
587
sprintf(error, MSG_JOB_WXDIEDTHROUGHSIGNALYZ_SSI,
588
job_get_id_string(job_id, ja_task_id, pe_task_id, &id_dstring),
589
sge_sys_sig2str(signo), signo);
591
DPRINTF(("%s\n", error));
592
failed = SSTATE_FAILURE_AFTER_JOB;
594
if ((sge_signo=sge_map_signal(signo)) != -1)
595
lSetDouble(du, UA_value, (double)sge_signo);
598
if ((u_long32)lGetDouble(du, UA_value) == 0xffffffff) {
600
failed = SSTATE_FAILURE_AFTER_JOB;
602
sprintf(error, MSG_JOB_CANTREADUSEDRESOURCESFORJOB);
609
/* Be careful: the checkpointing checking is done at the end. It will
610
* often override other failure states.
611
* If the job finishes, the shepherd must remove the "checkpointed" file
614
sge_get_active_job_file_path(&fname,
615
job_id, ja_task_id, pe_task_id, "checkpointed");
616
ckpt_arena = 1; /* 1 job will be restarted in case of failure *
617
* 2 job will be restarted from ckpt arena */
618
if (!SGE_STAT(sge_dstring_get_string(&fname), &statbuf)) {
621
failed = SSTATE_MIGRATE;
622
if ((fp = fopen(sge_dstring_get_string(&fname), "r"))) {
623
DPRINTF(("found checkpointed file\n"));
624
if (fscanf(fp, "%d", &dummy)==1) {
625
DPRINTF(("need restart from ckpt arena\n"));
628
FCLOSE_IGNORE_ERROR(fp);
631
sge_get_active_job_file_path(&fname, job_id, ja_task_id, pe_task_id,
633
if (!SGE_STAT(sge_dstring_get_string(&fname), &statbuf)) {
634
if ((fp = fopen(sge_dstring_get_string(&fname), "r"))) {
635
/* NOTE(review): fscanf() returns EOF (a negative value) when the input ends
 * before the first conversion, so this negated test only catches a return
 * value of 0 - confirm that an empty file is handled as intended. */
if (!fscanf(fp, sge_u32 , &job_pid))
637
FCLOSE_IGNORE_ERROR(fp);
641
ERROR((SGE_EVENT, MSG_JOB_CANTOPENJOBPIDFILEFORJOBXY_S,
642
job_get_id_string(job_id, ja_task_id, pe_task_id, &id_dstring)));
649
lSetUlong(jr, JR_job_pid, job_pid);
651
/* Only used for ckpt jobs: 1 checkpointed, 2 checkpoint in the arena */
652
lSetUlong(jr, JR_ckpt_arena, ckpt_arena);
654
/* Currently the shepherd doesn't create this file */
655
sge_get_active_job_file_path(&fname, job_id, ja_task_id, pe_task_id,
657
if (!SGE_STAT(sge_dstring_get_string(&fname), &statbuf))
658
failed = SSTATE_AGAIN;
661
lSetUlong(jr, JR_failed, failed);
662
DPRINTF(("job report for job "SFN": failed = %ld\n",
663
job_get_id_string(job_id, ja_task_id, pe_task_id, &id_dstring),
667
lSetString(jr, JR_err_str, error);
668
DPRINTF(("job report for job "SFN": err_str = %s\n",
669
job_get_id_string(job_id, ja_task_id, pe_task_id, &id_dstring),
673
/* general_failure */
675
case SSTATE_BEFORE_PROLOG:
676
case SSTATE_HELPER_SERVICE_ERROR:
677
case SSTATE_CHECK_DAEMON_CONFIG:
679
/* for CR 6252457 this block should be removed, but for a real fix the
680
host or queue error detection must be improved
682
general_failure = GFSTATE_HOST;
683
lSetUlong(jr, JR_general_failure, general_failure);
684
job_related_adminmail(EXECD, jr, is_array, job_owner);
686
case SSTATE_PROCSET_NOTSET:
687
case SSTATE_READ_CONFIG:
688
case SSTATE_PROLOG_FAILED:
689
case SSTATE_BEFORE_PESTART:
690
case SSTATE_PESTART_FAILED:
691
general_failure = GFSTATE_QUEUE;
692
lSetUlong(jr, JR_general_failure, general_failure);
693
job_related_adminmail(EXECD, jr, is_array, job_owner);
695
case SSTATE_BEFORE_JOB:
696
case SSTATE_NO_SHELL:
698
int job_caused_failure = 0;
699
lListElem *job = NULL;
700
lListElem *ja_task = NULL;
701
lListElem *master_queue = NULL;
702
const void *iterator = NULL;
704
/* Bugfix: Issuezilla 1031/1034
705
* The problem in 1031 is that each task got added as its own job
706
* structure, but the reaper was only looking at the first job
707
* structure in the list. Instead, we have to iterate through the
708
* list by hand to make sure we find every instance. */
710
job = lGetElemUlongFirst(*(object_type_get_master_list(SGE_TYPE_JOB)), JB_job_number, job_id, &iterator);
712
ja_task = job_search_task(job, NULL, ja_task_id);
713
if(ja_task != NULL) {
716
job = lGetElemUlongNext(*(object_type_get_master_list(SGE_TYPE_JOB)), JB_job_number, job_id,
720
if ((job != NULL) && (ja_task != NULL)) {
721
master_queue = responsible_queue(job, ja_task, NULL);
724
if ((failed == SSTATE_NO_SHELL) && (job != NULL) &&
725
((lGetList(job, JB_shell_list) != NULL) ||
726
((master_queue != NULL) &&
727
JOB_TYPE_IS_BINARY(lGetUlong(job, JB_type)) &&
728
JOB_TYPE_IS_NO_SHELL(lGetUlong(job, JB_type))))) {
729
job_caused_failure = 1;
730
} else if ((failed == SSTATE_NO_SHELL) && (master_queue != NULL)) {
731
char* shell_start_mode = mconf_get_shell_start_mode();
732
const char *mode = job_get_shell_start_mode(job, master_queue,
734
FREE(shell_start_mode);
736
/* NOTE(review): mode is read after FREE(shell_start_mode); if
 * job_get_shell_start_mode() can hand back the string it was given, this
 * reads freed memory - verify. The trailing comparison with 0 is redundant:
 * the test is true exactly when strcmp() returns 0. */
if (!strcmp(mode, "unix_behavior") != 0) {
737
job_caused_failure = 1;
739
} else if ((failed == SSTATE_BEFORE_JOB) && (job != NULL) &&
740
JOB_TYPE_IS_BINARY(lGetUlong(job, JB_type)) &&
741
!sge_is_file(lGetString(job, JB_script_file))) {
742
job_caused_failure = 1;
745
general_failure = job_caused_failure ? GFSTATE_JOB : GFSTATE_QUEUE;
746
lSetUlong(jr, JR_general_failure, general_failure);
747
job_related_adminmail(EXECD, jr, is_array, job_owner);
751
** if an error occurred where the job
752
** is source of the failure
754
case SSTATE_OPEN_OUTPUT:
756
case SSTATE_AFS_PROBLEM:
757
case SSTATE_APPERROR:
758
case SSTATE_PASSWD_MISSING:
759
case SSTATE_PASSWD_WRONG:
760
case SSTATE_HELPER_SERVICE_BEFORE_JOB:
761
general_failure = GFSTATE_JOB;
762
lSetUlong(jr, JR_general_failure, general_failure);
763
job_related_adminmail(EXECD, jr, is_array, job_owner);
766
** if an error occurred after the job has been run
767
** it is not as serious
769
case SSTATE_BEFORE_PESTOP:
770
case SSTATE_PESTOP_FAILED:
771
case SSTATE_BEFORE_EPILOG:
772
case SSTATE_EPILOG_FAILED:
773
case SSTATE_PROCSET_NOTFREED:
774
general_failure = GFSTATE_NO_HALT;
775
lSetUlong(jr, JR_general_failure, general_failure);
776
job_related_adminmail(EXECD, jr, is_array, job_owner);
779
** these are shepherd error conditions met by the execd
781
case ESSTATE_NO_CONFIG:
783
case ESSTATE_DIED_THRU_SIGNAL:
784
case ESSTATE_SHEPHERD_EXIT:
785
case ESSTATE_NO_EXITSTATUS:
786
case ESSTATE_UNEXP_ERRORFILE:
787
case ESSTATE_UNKNOWN_JOB:
789
** test for admin mail here
791
general_failure = GFSTATE_NO_HALT;
792
lSetUlong(jr, JR_general_failure, general_failure);
793
job_related_adminmail(EXECD, jr, is_array, job_owner);
796
general_failure = GFSTATE_NO_HALT;
798
** this is no error, because not all failures apply to general_failure
803
lSetUlong(jr, JR_general_failure, general_failure);
804
DPRINTF(("job report for job "SFN": general_failure = %ld\n",
805
job_get_id_string(job_id, ja_task_id, pe_task_id, &id_dstring),
808
sge_dstring_free(&fname);
809
sge_dstring_free(&jobdir);
815
/* ------------------------- */
816
/*
 * Removes what is left of a job or pe task on this host; judging by the
 * function name and the log messages it runs once the job exit has been
 * acknowledged.
 *
 * When the job and its array task are found in the master job list (debug
 * output REMOVING WITH jep && jatep) the function calls reaper_sendmail()
 * unless the report state is JSLAVE, destroys credentials, removes the
 * active directory unless keep_active or SGE_KEEP_ACTIVE is set, lowers the
 * used slot count of the responsible queue, removes the spool file and
 * possibly the exec file, and removes the pe task or the job element from
 * its list.
 * In the other branch (debug output REMOVING WITHOUT jep && jatep) the
 * cleanup relies on values read from the config file in the active job
 * directory (script_file, exec_file, queue_tmpdir, queue, job_owner).
 * cleanup_job_report() and sge_dstring_free(&jobdir) are the last calls.
 */
void remove_acked_job_exit(
817
sge_gdi_ctx_class_t *ctx,
820
const char *pe_task_id,
823
char *exec_file, *script_file, *tmpdir, *job_owner, *qname;
824
dstring jobdir = DSTRING_INIT;
825
char fname[SGE_PATH_MAX];
826
char err_str_buffer[1024];
828
SGE_STRUCT_STAT statbuf;
829
lListElem *jep, *petep = NULL, *jatep = NULL;
831
const char *pe_task_id_str;
832
const void *iterator;
833
const char *sge_root = ctx->get_sge_root(ctx);
835
DENTER(TOP_LAYER, "remove_acked_job_exit");
837
sge_dstring_init(&err_str, err_str_buffer, sizeof(err_str_buffer));
839
if (ja_task_id == 0) {
840
ERROR((SGE_EVENT, MSG_SHEPHERD_REMOVEACKEDJOBEXITCALLEDWITHX_U, sge_u32c(job_id)));
845
pe_task_id_str = jr?lGetString(jr, JR_pe_task_id_str):NULL;
847
/* try to find this job in our job list */
849
jep = lGetElemUlongFirst(*(object_type_get_master_list(SGE_TYPE_JOB)), JB_job_number, job_id, &iterator);
851
jatep = job_search_task(jep, NULL, ja_task_id);
855
jep = lGetElemUlongNext(*(object_type_get_master_list(SGE_TYPE_JOB)), JB_job_number, job_id, &iterator);
861
DPRINTF(("REMOVING WITH jep && jatep\n"));
862
if (pe_task_id_str) {
863
petep = lGetElemStr(lGetList(jatep, JAT_task_list), PET_id, pe_task_id_str);
866
ERROR((SGE_EVENT, MSG_JOB_XYHASNOTASKZ_UUS,
867
sge_u32c(job_id), sge_u32c(ja_task_id), pe_task_id_str));
873
if (lGetUlong(jr, JR_state) != JEXITING) {
874
WARNING((SGE_EVENT, MSG_EXECD_GOTACKFORPETASKBUTISNOTINSTATEEXITING_S, pe_task_id_str));
879
master_q = responsible_queue(jep, jatep, petep);
881
master_q = responsible_queue(jep, jatep, NULL);
884
/* use mail list of job instead of tasks one */
885
if (jr && lGetUlong(jr, JR_state) != JSLAVE) {
886
reaper_sendmail(ctx, jep, jr);
894
/* destroy credentials cache of job */
896
krb_destroy_forwarded_tgt(job_id);
899
** Execute command to delete the client's DCE or Kerberos credentials.
901
if (mconf_get_do_credentials())
902
delete_credentials(sge_root, jep);
904
/* remove job/task active dir */
905
if (!mconf_get_keep_active() && !getenv("SGE_KEEP_ACTIVE")) {
906
sge_get_active_job_file_path(&jobdir,
907
job_id, ja_task_id, pe_task_id,
909
DPRINTF(("removing active dir: %s\n", sge_dstring_get_string(&jobdir)));
910
if (sge_rmdir(sge_dstring_get_string(&jobdir), &err_str)) {
911
ERROR((SGE_EVENT, MSG_FILE_CANTREMOVEDIRECTORY_SS,
912
sge_dstring_get_string(&jobdir), err_str_buffer));
916
/* increment # of free slots. In case no slot is used any longer
917
we have to remove queues tmpdir for this job */
918
used_slots = qinstance_slots_used(master_q) - 1;
919
qinstance_set_slots_used(master_q, used_slots);
921
sge_remove_tmpdir(lGetString(master_q, QU_tmpdir),
922
lGetString(jep, JB_owner), lGetUlong(jep, JB_job_number),
923
ja_task_id, lGetString(master_q, QU_qname));
926
if (!pe_task_id_str) {
927
if (!mconf_get_simulate_jobs()) {
928
job_remove_spool_file(job_id, ja_task_id, NULL, SPOOL_WITHIN_EXECD);
930
if (!JOB_TYPE_IS_BINARY(lGetUlong(jep, JB_type)) &&
931
lGetString(jep, JB_exec_file)) {
933
lListElem *tmp_job = NULL;
935
/* it is possible to remove the exec_file if
936
less than one task of a job is running */
937
tmp_job = lGetElemUlongFirst(*(object_type_get_master_list(SGE_TYPE_JOB)), JB_job_number, job_id, &iterator);
938
while (tmp_job != NULL && task_number <= 2) {
940
tmp_job = lGetElemUlongNext(*(object_type_get_master_list(SGE_TYPE_JOB)), JB_job_number, job_id, &iterator);
943
if (task_number <= 1) {
944
DPRINTF(("unlinking script file %s\n", lGetString(jep, JB_exec_file)));
945
unlink(lGetString(jep, JB_exec_file));
950
DPRINTF(("not removing job file: pe_task_id_str = %s\n",
956
if (pe_task_id_str) {
957
/* unchain pe task element from task list */
958
lRemoveElem(lGetList(jatep, JAT_task_list), &petep);
960
/* check if job has queue limits and decrease global flag if necessary */
961
modify_queue_limits_flag_for_job(ctx->get_unqualified_hostname(ctx), jep, false);
963
lRemoveElem(*(object_type_get_master_list(SGE_TYPE_JOB)), &jep);
967
} else { /* must be an ack of an ask job request from qmaster */
969
DPRINTF(("REMOVING WITHOUT jep && jatep\n"));
970
/* clean up active jobs entry */
971
if (!pe_task_id_str) {
972
/* NOTE(review): this call sits under the test that pe_task_id_str is NULL,
 * so the conditional expression in its last argument always yields
 * MSG_MASTER. */
ERROR((SGE_EVENT, MSG_SHEPHERD_ACKNOWLEDGEFORUNKNOWNJOBXYZ_UUS,
973
sge_u32c(job_id), sge_u32c(ja_task_id),
974
(pe_task_id_str ? pe_task_id_str : MSG_MASTER)));
980
krb_destroy_forwarded_tgt(job_id);
982
sge_get_active_job_file_path(&jobdir,
983
job_id, ja_task_id, pe_task_id,
985
if (SGE_STAT(sge_dstring_get_string(&jobdir), &statbuf)) {
986
ERROR((SGE_EVENT, MSG_SHEPHERD_CANTFINDACTIVEJOBSDIRXFORREAPINGJOBY_SU,
987
sge_dstring_get_string(&jobdir), sge_u32c(job_id)));
989
/*** read config file written by exec_job ***/
990
/* NOTE(review): sprintf() does not check the length of the result against
 * the SGE_PATH_MAX bytes of fname; a bounded call such as snprintf() with
 * sizeof(fname) would rule out an overflow for long directory names. */
sprintf(fname, "%s/config", sge_dstring_get_string(&jobdir));
991
if (read_config(fname)) {
992
/* This should happen very rarely. exec_job() should avoid this
993
condition as far as possible. One possibility for this case is,
994
that the execd dies just after making the jobs active directory.
995
The pain with this case is, that we have not much information
996
to report this job to qmaster. */
998
if (sge_rmdir(sge_dstring_get_string(&jobdir), &err_str)) {
999
ERROR((SGE_EVENT, MSG_FILE_CANTREMOVEDIRECTORY_SS,
1000
sge_dstring_get_string(&jobdir), err_str_buffer));
1004
/* do not remove xterm or qlogin starter ! */
1005
if ((script_file = get_conf_val("script_file"))
1006
&& strcasecmp(script_file, "INTERACTIVE")
1007
&& strcasecmp(script_file, "QLOGIN")) {
1008
if ((exec_file = get_conf_val("exec_file"))) {
1009
DPRINTF(("removing exec_file %s\n", exec_file));
1015
if ((!(tmpdir = get_conf_val("queue_tmpdir"))) ||
1016
(!(qname = get_conf_val("queue"))) ||
1017
(!(job_owner = get_conf_val("job_owner")))) {
1018
ERROR((SGE_EVENT, MSG_SHEPHERD_INCORRECTCONFIGFILEFORJOBXY_UU,
1019
sge_u32c(job_id), sge_u32c(ja_task_id)));
1021
DPRINTF(("removing queue_tmpdir %s\n", tmpdir));
1022
sge_remove_tmpdir(tmpdir, job_owner, job_id, ja_task_id, qname);
1027
if (!mconf_get_simulate_jobs()) {
1028
job_remove_spool_file(job_id, ja_task_id, NULL, SPOOL_WITHIN_EXECD);
1032
if (!mconf_get_keep_active() && !getenv("SGE_KEEP_ACTIVE")) {
1033
DPRINTF(("removing active dir: %s\n", sge_dstring_get_string(&jobdir)));
1034
if (sge_rmdir(sge_dstring_get_string(&jobdir), &err_str)) {
1035
ERROR((SGE_EVENT, MSG_FILE_CANTREMOVEDIRECTORY_SS,
1036
sge_dstring_get_string(&jobdir), err_str_buffer));
1041
cleanup_job_report(job_id, ja_task_id);
1044
sge_dstring_free(&jobdir);
1050
/**************************************************************************
1051
This function is called if we fail to start a job.
1052
Returns the appropriate job report.
1054
We clean up the job.
1055
We report the job_number and the error to the master.
1056
If general is set we have a general problem starting jobs.
1057
The master should not give us jobs in the future and the administrator
1058
should be informed about the problem (e.g. we can't write to a filesystem).
1059
**************************************************************************/
1060
lListElem *execd_job_start_failure(
1064
const char *error_string,
1067
/* all work is done by execd_job_failure(); this wrapper only fixes the
 * failure state to SSTATE_FAILURE_BEFORE_JOB */
return execd_job_failure(jep, jatep, petep, error_string, general, SSTATE_FAILURE_BEFORE_JOB);
1070
/*
 * Counterpart of execd_job_start_failure(): forwards its arguments to
 * execd_job_failure() and passes SSTATE_FAILURE_AFTER_JOB as the failure
 * state (judging by that name, for problems after the job was started).
 * Returns whatever execd_job_failure() returns.
 */
lListElem *execd_job_run_failure(
1074
const char *error_string,
1077
return execd_job_failure(jep, jatep, petep, error_string, general, SSTATE_FAILURE_AFTER_JOB);
1080
/*
 * Shared implementation of execd_job_start_failure() and
 * execd_job_run_failure().
 *
 * Logs a message selected by failed (SSTATE_FAILURE_BEFORE_JOB selects
 * MSG_SHEPHERD_CANTSTARTJOBXY_US, any other value
 * MSG_SHEPHERD_PROBLEMSAFTERSTART_DS), obtains the job report through
 * get_job_report() or add_job_report(), and fills in JR_queue_name,
 * JR_failed, JR_general_failure, JR_err_str and JR_state (JEXITING) before
 * calling job_related_adminmail().
 *
 * error_string - text stored in JR_err_str and used in the log message
 * general      - value stored in JR_general_failure
 * failed       - value stored in JR_failed
 *
 * Presumably returns the job report jr - TODO confirm.
 */
static lListElem *execd_job_failure(
1084
const char *error_string,
1089
u_long32 jobid, jataskid;
1090
const char *petaskid = NULL;
1092
DENTER(TOP_LAYER, "execd_job_failure");
1094
jobid = lGetUlong(jep, JB_job_number);
1095
jataskid = lGetUlong(jatep, JAT_task_number);
1097
petaskid = lGetString(petep, PET_id);
1101
(failed==SSTATE_FAILURE_BEFORE_JOB)?
1102
MSG_SHEPHERD_CANTSTARTJOBXY_US:
1103
MSG_SHEPHERD_PROBLEMSAFTERSTART_DS, sge_u32c(jobid), error_string));
1105
jr = get_job_report(jobid, jataskid, petaskid);
1107
DPRINTF(("no job report found to report job start failure!\n"));
1108
jr = add_job_report(jobid, jataskid, petaskid, jep);
1111
/* the queue name is taken from the first granted destination identifier:
 * of the pe task when one is given, otherwise presumably of the array task */
if (petep != NULL) {
1112
ep = lFirst(lGetList(petep, PET_granted_destin_identifier_list));
1114
ep = lFirst(lGetList(jatep, JAT_granted_destin_identifier_list));
1118
lSetString(jr, JR_queue_name, lGetString(ep, JG_qname));
1121
lSetUlong(jr, JR_failed, failed);
1122
lSetUlong(jr, JR_general_failure, general);
1123
lSetString(jr, JR_err_str, error_string);
1125
lSetUlong(jr, JR_state, JEXITING);
1127
job_related_adminmail(EXECD, jr, job_is_array(jep), lGetString(jep, JB_owner));
1134
/**************************************************************************
1135
This function is called if we are asked by the master concerning a job
1136
we don't know anything about. We have to tell this to the qmaster, so that
1137
he can clean up this job.
1138
This is done very like the normal job finish and runs into the same
1139
functions in the qmaster.
1140
**************************************************************************/
1148
DENTER(TOP_LAYER, "job_unknown");
1150
ERROR((SGE_EVENT, MSG_SHEPHERD_JATASKXYISKNOWNREPORTINGITTOQMASTER,
1151
sge_u32c(jobid), sge_u32c(jataskid)));
1153
jr = add_job_report(jobid, jataskid, NULL, NULL);
1155
lSetString(jr, JR_queue_name, qname);
1156
lSetUlong(jr, JR_failed, ESSTATE_UNKNOWN_JOB);
1157
lSetUlong(jr, JR_state, JEXITING);
1158
lSetString(jr, JR_err_str, (char*) MSG_JR_ERRSTR_EXECDDONTKNOWJOB);
1165
/************************************************************************
1166
Look for old jobs hanging around on disk and report them to the qmaster.
1167
This is used at execd startup time.
1168
We have to call it cyclically because there may be jobs alive while execd went
1169
down and up. If such a job exits we get no SIGCLD from shepherd.
1170
If startup is true this is the first call of the execd. We produce more
1171
output for the administrator the first time.
1172
************************************************************************/
1173
int clean_up_old_jobs(int startup)
1175
SGE_STRUCT_DIRENT *dent = NULL;
1177
char dir[SGE_PATH_MAX];
1178
pid_t pids[10000]; /* a bunch of processes */
1179
int npids; /* number of running processes */
1181
u_long32 jobid, jataskid;
1182
static int lost_children = 1;
1183
lListElem *jep, *petep, *jatep = NULL;
1185
DENTER(TOP_LAYER, "clean_up_old_jobs");
1188
INFO((SGE_EVENT, MSG_SHEPHERD_CKECKINGFOROLDJOBS));
1192
If we get an empty Master_Job_List we know that
1193
it is no longer necessary to pass this code
1195
Getting job information by parsing ps-output
1196
is very expensive. The aim is to get informed
1197
about jobs that were started by the execd
1198
running before we were started. Jobs that
1199
were started by us are our children and we
1200
get a cheap SIGCLD informing us about the
1203
So if we arrive here and an empty Master_Job_List
1204
we know all jobs that were our "lost children"
1205
exited and there is no need for ps-commands.
1208
if (mconf_get_simulate_jobs() ||
1209
lGetNumberOfElem(*(object_type_get_master_list(SGE_TYPE_JOB))) == 0 ||
1211
if (lost_children) {
1212
INFO((SGE_EVENT, MSG_SHEPHERD_NOOLDJOBSATSTARTUP));
1215
/* all children exited */
1220
/* Get pids of running jobs. So we can look for running shepherds. */
1221
/* Derive the capacity from the array itself so it cannot drift from the
   declaration of pids[]. */
npids = sge_get_pids(pids, (int)(sizeof(pids) / sizeof(pids[0])), SGE_SHEPHERD, PSCMD);
1223
ERROR((SGE_EVENT, MSG_SHEPHERD_CANTGETPROCESSESFROMPSCOMMAND));
1228
DPRINTF(("found %d running processes\n", npids));
1230
/* We read the job information from active dir. There is one subdir for each
1231
started job. Shepherd writes exit_status and usage to this directory.
1232
Because the shepherd may have been killed while execd was down, we have to look into
1233
the process table too. */
1235
if (!(cwd=opendir(ACTIVE_DIR))) {
1236
ERROR((SGE_EVENT, MSG_FILE_CANTOPENDIRECTORYX_SS, ACTIVE_DIR, strerror(errno)));
1241
while ((dent=SGE_READDIR(cwd))) {
1242
char string[256], *token, *endp;
1244
const void *iterator;
1246
jobdir = dent->d_name; /* jobdir is the jobid.jataskid converted to string */
1247
strcpy(string, jobdir);
1249
if (!strcmp(jobdir, ".") || !strcmp(jobdir, "..") )
1252
jobid = jataskid = 0;
1253
if ((token = strtok(string, " ."))) {
1254
tmp_id = strtol(token, &endp, 10);
1255
if (*endp == '\0') {
1257
if ((token = strtok(NULL, " \n\t"))) {
1258
tmp_id = strtol(token, &endp, 10);
1259
if (*endp == '\0') {
1266
if (!jobid || !jataskid) {
1267
/* someone left his garbage in our directory */
1268
WARNING((SGE_EVENT, MSG_SHEPHERD_XISNOTAJOBDIRECTORY_S, jobdir));
1272
/* seek job to this jobdir */
1273
jep = lGetElemUlongFirst(*(object_type_get_master_list(SGE_TYPE_JOB)), JB_job_number, jobid, &iterator);
1274
while(jep != NULL) {
1275
jatep = job_search_task(jep, NULL, jataskid);
1279
jep = lGetElemUlongNext(*(object_type_get_master_list(SGE_TYPE_JOB)), JB_job_number, jobid, &iterator);
1282
if (!jep || !jatep) {
1283
/* active job dir found, but no matching job/task in the master job list */
1285
ERROR((SGE_EVENT, MSG_SHEPHERD_FOUNDACTIVEJOBDIRXWHILEMISSINGJOBDIRREMOVING_S, jobdir));
1287
/* remove active jobs directory */
1288
DPRINTF(("+++++++++++++++++++ remove active jobs directory ++++++++++++++++++\n"));
1290
char path[SGE_PATH_MAX];
1291
sprintf(path, ACTIVE_DIR"/%s", jobdir);
1292
sge_rmdir(path, NULL);
1296
if (lGetUlong(jatep, JAT_status) != JSLAVE) {
1297
sprintf(dir, "%s/%s", ACTIVE_DIR, jobdir);
1298
examine_job_task_from_file(startup, dir, jep, jatep, NULL, pids, npids);
1300
for_each(petep, lGetList(jatep, JAT_task_list)) {
1301
sprintf(dir, "%s/%s/%s", ACTIVE_DIR, jobdir, lGetString(petep, PET_id));
1302
examine_job_task_from_file(startup, dir, jep, jatep, petep, pids, npids);
1304
} /* while (dent=SGE_READDIR(cwd)) */
1313
examine_job_task_from_file(int startup, char *dir, lListElem *jep,
1314
lListElem *jatep, lListElem *petep, pid_t *pids,
1317
lListElem *jr = NULL;
1318
int shepherd_alive; /* =1 -> this shepherd is in the process table */
1320
SGE_STRUCT_STAT statbuf;
1321
char fname[SGE_PATH_MAX];
1322
pid_t pid; /* pid of shepherd */
1324
u_long32 jobid, jataskid;
1325
const char *pe_task_id_str = NULL;
1326
static u_long32 startup_time = 0;
1328
DENTER(TOP_LAYER, "examine_job_task_from_file");
1330
if (!startup_time) {
1331
startup_time = sge_get_gmt();
1334
jobid = lGetUlong(jep, JB_job_number);
1335
jataskid = lGetUlong(jatep, JAT_task_number);
1337
pe_task_id_str = lGetString(petep, PET_id);
1340
if (SGE_STAT(dir, &statbuf)) {
1341
ERROR((SGE_EVENT, MSG_SHEPHERD_CANTSTATXY_SS, dir, strerror(errno)));
1346
/* Test the file-type bits. The former logical '&&' was true for any
   non-zero st_mode, so a non-directory was never rejected here. */
if (!S_ISDIR(statbuf.st_mode)) {
1347
ERROR((SGE_EVENT, MSG_FILE_XISNOTADIRECTORY_S, dir));
1352
DPRINTF(("Found job directory: %s\n", dir));
1354
INFO((SGE_EVENT, MSG_SHEPHERD_FOUNDDIROFJOBX_S, dir));
1357
/* Look for pid of shepherd */
1358
sprintf(fname, "%s/pid", dir);
1359
if (!(fp = fopen(fname, "r"))) {
1361
1. a job started before startup of execd
1362
In this case the job was started by the old execd
1363
and there must be a pid file -> Logging
1365
2. a newly started job
1366
In this case the shepherd had not enough time
1367
to write the pid file -> No Logging */
1368
if (startup && startup_time >= lGetUlong(jatep, JAT_start_time)) {
1369
ERROR((SGE_EVENT, MSG_SHEPHERD_CANTREADPIDFILEXFORJOBYSTARTTIMEZX_SSUS,
1370
fname, dir, sge_u32c(lGetUlong(jatep, JAT_start_time)), strerror(errno)));
1371
/* seek job report for this job - it must be contained in job report
1372
If this is a newly started execd we can assume the execd was broken
1373
in the interval between making the jobs active directory and writing
1374
the shepherds pid (done by the started shepherd). So we just remove
1375
and report this job. */
1376
if (!(jr=get_job_report(jobid, jataskid, pe_task_id_str))) {
1377
CRITICAL((SGE_EVENT, MSG_SHEPHERD_MISSINGJOBXINJOBREPORTFOREXITINGJOB_U, sge_u32c(jobid)));
1378
jr = add_job_report(jobid, jataskid, pe_task_id_str, NULL);
1380
lSetUlong(jr, JR_state, JEXITING);
1381
clean_up_job(jr, ESSTATE_NO_PID, 0, job_is_array(jep),
1382
lGetObject(jatep, JAT_pe_object), lGetString(jep, JB_owner)); /* failed before execution */
1388
if (fscanf(fp, pid_t_fmt, &pid) != 1) {
1389
/* most probably a newly started job
1390
shepherd usually just had not enough time for writing the pid file
1391
if these warnings do appear frequently one might consider having
1392
execd (and thus the shepherds) do local spooling instead of via NFS */
1393
WARNING((SGE_EVENT, MSG_SHEPHERD_CANTREADPIDFROMPIDFILEXFORJOBY_SS,
1395
FCLOSE_IGNORE_ERROR(fp);
1399
FCLOSE_IGNORE_ERROR(fp);
1401
/* look whether shepherd is still alive */
1402
shepherd_alive = sge_contains_pid(pid, pids, npids);
1404
/* report this information */
1405
sprintf(err_str, MSG_SHEPHERD_SHEPHERDFORJOBXHASPIDYANDISZALIVE_SUS,
1406
dir, sge_u32c(pid), (shepherd_alive ? "": MSG_NOT));
1408
/* err_str is data, not a format: pass it through "%s" so a '%' in the
   directory name cannot be interpreted as a conversion. */
INFO((SGE_EVENT, "%s", err_str));
1414
if (shepherd_alive) { /* shepherd alive -> nothing to do */
1417
at startup we need to change the
1418
state of not exited jobs from JWRITTEN
1421
if ((jr=get_job_report(jobid, jataskid, pe_task_id_str))) {
1423
lSetUlong(jr, JR_state, JRUNNING);
1424
/* here we will call a ptf function to get */
1425
/* the first usage data after restart */
1427
/* found job in active jobs directory
1428
but not in spool directory of execd */
1429
ERROR((SGE_EVENT, MSG_SHEPHERD_INCONSISTENTDATAFORJOBX_U, sge_u32c(jobid)));
1430
jr = add_job_report(jobid, jataskid, pe_task_id_str, NULL);
1431
lSetUlong(jr, JR_state, JEXITING);
1438
/* seek job report for this job - it must be contained in job report */
1439
if (!(jr=get_job_report(jobid, jataskid, pe_task_id_str))) {
1440
CRITICAL((SGE_EVENT, MSG_SHEPHERD_MISSINGJOBXYINJOBREPORT_UU, sge_u32c(jobid), sge_u32c(jataskid)));
1441
jr = add_job_report(jobid, jataskid, pe_task_id_str, jep);
1446
/* if the state is already JEXITING work is done
1447
for this job and we wait for ACK from qmaster */
1448
if (lGetUlong(jr, JR_state)==JEXITING) {
1449
DPRINTF(("State of job "sge_u32"."sge_u32" already changed to JEXITING\n", jobid, jataskid));
1454
clean_up_job(jr, 0, 0, job_is_array(jep), lGetObject(jatep, JAT_pe_object), lGetString(jep, JB_owner));
1455
lSetUlong(jr, JR_state, JEXITING);
1457
flush_job_report(jr);
1463
/************************************************************/
1464
/* fill dusage with: */
1465
/* - data retrieved from config - file */
1466
/* - data retrieved from "usage" as shepherd has written it */
1467
/* if we can't read "usage" exit_status = 0xffffffff */
1468
/************************************************************/
1470
read_dusage(lListElem *jr, const char *jobdir, u_long32 jobid,
1471
u_long32 jataskid, int failed, int usage_mul_factor)
1473
char pid_file[SGE_PATH_MAX];
1477
DENTER(TOP_LAYER, "read_dusage");
1481
sprintf(pid_file, "%s/pid", jobdir);
1483
if (failed != ESSTATE_NO_PID) {
1484
fp = fopen(pid_file, "r");
1486
fscanf(fp, sge_u32 , &pid);
1489
ERROR((SGE_EVENT, MSG_SHEPHERD_CANTOPENPIDFILEXFORJOBYZ_SUU,
1490
pid_file, sge_u32c(jobid), sge_u32c(jataskid)));
1494
if (failed != ESSTATE_NO_CONFIG) {
1495
dstring buffer = DSTRING_INIT;
1496
const char *qinstance_name = NULL;
1498
qinstance_name = sge_dstring_sprintf(&buffer, SFN"@"SFN,
1499
get_conf_val("queue"),
1500
get_conf_val("host"));
1501
lSetString(jr, JR_queue_name, qinstance_name);
1502
qinstance_name = NULL;
1503
sge_dstring_free(&buffer);
1505
add_usage(jr, "submission_time", get_conf_val("submission_time"), (double)0);
1506
add_usage(jr, "priority", get_conf_val("priority"), (double)0);
1509
/* read "usage" file */
1511
** there may be more failure states where there is no usage file
1512
** but it is best if we try to read it and ignore the error if reading failed
1513
** if we have an error already
1515
if (!failed || (failed > SSTATE_BEFORE_JOB)) {
1516
char usage_file[SGE_PATH_MAX];
1517
sprintf(usage_file, "%s/usage", jobdir);
1518
fp = fopen(usage_file, "r");
1522
u_long32 wait_status;
1524
read_config_list(fp, &cflp, NULL, CF_Type, CF_name, CF_value, 0, "=", 0, buf, sizeof(buf));
1527
if (extract_ulong_attribute(&cflp, "wait_status", &wait_status)==0)
1528
lSetUlong(jr, JR_wait_status, wait_status);
1530
convert_attribute(&cflp, jr, "exit_status", 1);
1531
convert_attribute(&cflp, jr, "signal", 0);
1532
convert_attribute(&cflp, jr, "start_time", 0);
1533
convert_attribute(&cflp, jr, "end_time", 0);
1535
convert_attribute(&cflp, jr, "ru_wallclock", 0);
1537
convert_attribute(&cflp, jr, "ru_utime", 0);
1538
convert_attribute(&cflp, jr, "ru_stime", 0);
1539
convert_attribute(&cflp, jr, "ru_maxrss", 0);
1540
convert_attribute(&cflp, jr, "ru_ixrss", 0);
1541
convert_attribute(&cflp, jr, "ru_ismrss", 0);
1542
convert_attribute(&cflp, jr, "ru_idrss", 0);
1543
convert_attribute(&cflp, jr, "ru_isrss", 0);
1544
convert_attribute(&cflp, jr, "ru_minflt", 0);
1545
convert_attribute(&cflp, jr, "ru_majflt", 0);
1546
convert_attribute(&cflp, jr, "ru_nswap", 0);
1547
convert_attribute(&cflp, jr, "ru_inblock", 0);
1548
convert_attribute(&cflp, jr, "ru_oublock", 0);
1549
convert_attribute(&cflp, jr, "ru_msgsnd", 0);
1550
convert_attribute(&cflp, jr, "ru_msgrcv", 0);
1551
convert_attribute(&cflp, jr, "ru_nsignals", 0);
1552
convert_attribute(&cflp, jr, "ru_nvcsw", 0);
1553
convert_attribute(&cflp, jr, "ru_nivcsw", 0);
1556
#ifdef NEC_ACCOUNTING_ENTRIES
1557
/* Additional accounting information for NEC SX-4 SX-5 */
1558
#if defined(NECSX4) || defined(NECSX5)
1560
convert_attribute(&cflp, jr, "necsx_necsx4", 0);
1561
#elif defined(NECSX5)
1562
convert_attribute(&cflp, jr, "necsx_necsx5", 0);
1564
convert_attribute(&cflp, jr, "necsx_base_prty", 0);
1565
convert_attribute(&cflp, jr, "necsx_time_slice", 0);
1566
convert_attribute(&cflp, jr, "necsx_num_procs", 0);
1567
convert_attribute(&cflp, jr, "necsx_kcore_min", 0);
1568
convert_attribute(&cflp, jr, "necsx_mean_size", 0);
1569
convert_attribute(&cflp, jr, "necsx_maxmem_size", 0);
1570
convert_attribute(&cflp, jr, "necsx_chars_trnsfd", 0);
1571
convert_attribute(&cflp, jr, "necsx_blocks_rw", 0);
1572
convert_attribute(&cflp, jr, "necsx_inst", 0);
1573
convert_attribute(&cflp, jr, "necsx_vector_inst", 0);
1574
convert_attribute(&cflp, jr, "necsx_vector_elmt", 0);
1575
convert_attribute(&cflp, jr, "necsx_vec_exe", 0);
1576
convert_attribute(&cflp, jr, "necsx_flops", 0);
1577
convert_attribute(&cflp, jr, "necsx_conc_flops", 0);
1578
convert_attribute(&cflp, jr, "necsx_fpec", 0);
1579
convert_attribute(&cflp, jr, "necsx_cmcc", 0);
1580
convert_attribute(&cflp, jr, "necsx_bccc", 0);
1581
convert_attribute(&cflp, jr, "necsx_mt_open", 0);
1582
convert_attribute(&cflp, jr, "necsx_io_blocks", 0);
1583
convert_attribute(&cflp, jr, "necsx_multi_single", 0);
1584
convert_attribute(&cflp, jr, "necsx_max_nproc", 0);
1588
build_derived_final_usage(jr, usage_mul_factor);
1592
ERROR((SGE_EVENT, MSG_SHEPHERD_CANTOPENUSAGEFILEXFORJOBYZX_SUUS,
1593
usage_file, sge_u32c(jobid), sge_u32c(jataskid), strerror(errno)));
1604
if (lGetList(jr, JR_usage)) {
1605
DPRINTF(("resulting usage attributes:\n"));
1607
DPRINTF(("empty usage list\n"));
1610
for_each (ep, lGetList(jr, JR_usage)) {
1611
DPRINTF((" \"%s\" = %f\n",
1612
lGetString(ep, UA_name),
1613
lGetDouble(ep, UA_value)));
1620
ERROR((SGE_EVENT, MSG_FILE_ERRORCLOSEINGXY_SS, "usage or pid",
1627
static void build_derived_final_usage(lListElem *jr, int usage_mul_factor)
1630
double ru_cpu, pdc_cpu;
1633
io, iow, r_io, r_iow, maxvmem, r_maxvmem;
1634
double h_vmem = 0, s_vmem = 0;
1636
DENTER(TOP_LAYER, "build_derived_final_usage");
1638
parse_ulong_val(&h_vmem, NULL, TYPE_MEM, get_conf_val("h_vmem"), NULL, 0);
1639
parse_ulong_val(&s_vmem, NULL, TYPE_MEM, get_conf_val("s_vmem"), NULL, 0);
1640
h_vmem = MIN(s_vmem, h_vmem);
1642
usage_list = lGetList(jr, JR_usage);
1644
/* cpu = MAX(sum of "ru_utime" and "ru_stime" , PDC "cpu" usage) */
1645
ru_cpu = usage_list_get_double_usage(usage_list, "ru_utime", 0) +
1646
usage_list_get_double_usage(usage_list, "ru_stime", 0);
1647
pdc_cpu = usage_list_get_double_usage(usage_list, USAGE_ATTR_CPU, 0);
1648
cpu = MAX(ru_cpu, pdc_cpu);
1650
/* r_cpu = h_rt * usage_mul_factor
1651
* (see execd_get_acct_multiplication_factor)
1653
r_cpu = (usage_list_get_double_usage(usage_list, "end_time", 0) -
1654
usage_list_get_double_usage(usage_list, "start_time", 0)) *
1657
/* mem = PDC "mem" usage or zero */
1658
mem = usage_list_get_double_usage(usage_list, USAGE_ATTR_MEM, 0);
1660
/* r_mem = r_cpu * h_vmem */
1661
if (h_vmem != DBL_MAX)
1662
r_mem = (r_cpu * h_vmem)/(1024*1024*1024);
1666
/* io = PDC "io" usage or zero */
1667
io = usage_list_get_double_usage(usage_list, USAGE_ATTR_IO, 0);
1672
/* iow = PDC "io wait time" or zero */
1673
iow = usage_list_get_double_usage(usage_list, USAGE_ATTR_IOW, 0);
1679
r_maxvmem = maxvmem = usage_list_get_double_usage(usage_list, USAGE_ATTR_MAXVMEM, 0);
1681
DPRINTF(("CPU/MEM/IO: M(%f/%f/%f) R(%f/%f/%f) acct: %s stree: %s\n",
1682
cpu, mem, io, r_cpu, r_mem, r_io,
1683
mconf_get_acct_reserved_usage()?"R":"M", mconf_get_sharetree_reserved_usage()?"R":"M"));
1685
if (mconf_get_acct_reserved_usage()) {
1686
add_usage(jr, USAGE_ATTR_CPU_ACCT, NULL, r_cpu);
1687
add_usage(jr, USAGE_ATTR_MEM_ACCT, NULL, r_mem);
1688
add_usage(jr, USAGE_ATTR_IO_ACCT, NULL, r_io);
1689
add_usage(jr, USAGE_ATTR_IOW_ACCT, NULL, r_iow);
1690
if (r_maxvmem != DBL_MAX)
1691
add_usage(jr, USAGE_ATTR_MAXVMEM_ACCT, NULL, r_maxvmem);
1693
add_usage(jr, USAGE_ATTR_CPU_ACCT, NULL, cpu);
1694
add_usage(jr, USAGE_ATTR_MEM_ACCT, NULL, mem);
1695
add_usage(jr, USAGE_ATTR_IO_ACCT, NULL, io);
1696
add_usage(jr, USAGE_ATTR_IOW_ACCT, NULL, iow);
1697
if (maxvmem != DBL_MAX)
1698
add_usage(jr, USAGE_ATTR_MAXVMEM_ACCT, NULL, maxvmem);
1701
if (mconf_get_sharetree_reserved_usage()) {
1702
add_usage(jr, USAGE_ATTR_CPU, NULL, r_cpu);
1703
add_usage(jr, USAGE_ATTR_MEM, NULL, r_mem);
1704
add_usage(jr, USAGE_ATTR_IO, NULL, r_io);
1705
add_usage(jr, USAGE_ATTR_IOW, NULL, r_iow);
1706
if (r_maxvmem!= DBL_MAX)
1707
add_usage(jr, USAGE_ATTR_MAXVMEM, NULL, r_maxvmem);
1709
add_usage(jr, USAGE_ATTR_CPU, NULL, cpu);
1710
add_usage(jr, USAGE_ATTR_MEM, NULL, mem);
1711
add_usage(jr, USAGE_ATTR_IO, NULL, io);
1712
add_usage(jr, USAGE_ATTR_IOW, NULL, iow);
1713
if (maxvmem!= DBL_MAX)
1714
add_usage(jr, USAGE_ATTR_MAXVMEM, NULL, maxvmem);
1721
/*****************************************************************/
1722
static void convert_attribute(
1730
s = get_conf_value(NULL, *cflpp, CF_name, CF_value, name);
1731
add_usage(jr, name, s, (double)udefault);
1732
lDelElemStr(cflpp, CF_name, name);
1736
/*****************************************************************/
1738
static int extract_ulong_attribute(
1746
if (!(s = get_conf_value(NULL, *cflpp, CF_name, CF_value, name)))
1748
ret = sscanf(s, sge_u32, valuep);
1749
lDelElemStr(cflpp, CF_name, name);
1750
return (ret == 1)?0:-1;
1754
/* send mail to users if requested */
1755
void reaper_sendmail(
1756
sge_gdi_ctx_class_t *ctx,
1761
u_long32 mail_options;
1762
char sge_mail_subj[1024];
1763
char sge_mail_body[10*2048];
1764
char sge_mail_start[128];
1765
char sge_mail_end[128];
1766
u_long32 jobid, taskid, failed;
1767
double ru_utime, ru_stime, ru_wallclock;
1768
double ru_cpu = 0.0, ru_maxvmem = 0.0;
1769
int exit_status = -1, signo = -1;
1770
const char *q, *h, *u;
1772
const char *pe_task_id_str;
1775
dstring cpu_string = DSTRING_INIT;
1776
dstring maxvmem_string = DSTRING_INIT;
1777
const char *qualified_hostname = ctx->get_qualified_hostname(ctx);
1779
DENTER(TOP_LAYER, "reaper_sendmail");
1781
sge_dstring_init(&ds, buffer, sizeof(buffer));
1782
mail_users = lGetList(jep, JB_mail_list);
1783
mail_options = lGetUlong(jep, JB_mail_options);
1784
pe_task_id_str = lGetString(jr, JR_pe_task_id_str);
1786
if (!(q=lGetString(jr, JR_queue_name)))
1787
q = MSG_MAIL_UNKNOWN_NAME;
1789
h = qualified_hostname;
1791
if (!(u=lGetString(jep, JB_owner)))
1792
u = MSG_MAIL_UNKNOWN_NAME;
1794
/* JG: TODO (397): Extend usage module: usage_list_get_ctime_usage()
1795
* and use the other usage_list_get functions.
1798
if ((ep=lGetSubStr(jr, UA_name, "start_time", JR_usage)))
1799
strcpy(sge_mail_start, sge_ctime((time_t)lGetDouble(ep, UA_value), &ds));
1801
strcpy(sge_mail_start, MSG_MAIL_UNKNOWN_NAME);
1803
if ((ep=lGetSubStr(jr, UA_name, "end_time", JR_usage)))
1804
strcpy(sge_mail_end, sge_ctime((time_t)lGetDouble(ep, UA_value), &ds));
1806
strcpy(sge_mail_end, MSG_MAIL_UNKNOWN_NAME);
1808
if ((ep=lGetSubStr(jr, UA_name, "ru_utime", JR_usage)))
1809
ru_utime = lGetDouble(ep, UA_value);
1813
if ((ep=lGetSubStr(jr, UA_name, "ru_stime", JR_usage)))
1814
ru_stime = lGetDouble(ep, UA_value);
1818
if ((ep=lGetSubStr(jr, UA_name, "ru_wallclock", JR_usage)))
1819
ru_wallclock = lGetDouble(ep, UA_value);
1823
if ((ep=lGetSubStr(jr, UA_name, USAGE_ATTR_CPU_ACCT, JR_usage)))
1824
ru_cpu = lGetDouble(ep, UA_value);
1825
if ((ep=lGetSubStr(jr, UA_name, USAGE_ATTR_MAXVMEM_ACCT, JR_usage)))
1826
ru_maxvmem = lGetDouble(ep, UA_value);
1828
jobid = lGetUlong(jr, JR_job_number);
1829
taskid = lGetUlong(jr, JR_ja_task_number);
1831
failed = lGetUlong(jr, JR_failed);
1833
if ((ep=lGetSubStr(jr, UA_name, "exit_status", JR_usage)))
1834
exit_status = (int)lGetDouble(ep, UA_value);
1836
double_print_time_to_dstring(ru_cpu, &cpu_string);
1837
double_print_memory_to_dstring(ru_maxvmem, &maxvmem_string);
1839
/* send job exit mail only for master task */
1840
if ((VALID(MAIL_AT_EXIT, mail_options)) && !failed && !pe_task_id_str) {
1841
dstring utime_string = DSTRING_INIT;
1842
dstring stime_string = DSTRING_INIT;
1843
dstring wtime_string = DSTRING_INIT;
1845
DPRINTF(("mail VALID at EXIT\n"));
1846
double_print_time_to_dstring(ru_utime, &utime_string);
1847
double_print_time_to_dstring(ru_stime, &stime_string);
1848
double_print_time_to_dstring(ru_wallclock, &wtime_string);
1849
if (job_is_array(jep)) {
1850
sprintf(sge_mail_subj, MSG_MAIL_SUBJECT_JA_TASK_COMP_UUS,
1851
sge_u32c(jobid), sge_u32c(taskid), lGetString(jep, JB_job_name));
1852
sprintf(sge_mail_body,
1853
MSG_MAIL_BODY_COMP_SSSSSSSSSSSI,
1860
sge_dstring_get_string(&utime_string),
1861
sge_dstring_get_string(&stime_string),
1862
sge_dstring_get_string(&wtime_string),
1863
(ru_cpu == 0.0) ? "NA":sge_dstring_get_string(&cpu_string),
1864
(ru_maxvmem == 0.0) ? "NA":sge_dstring_get_string(&maxvmem_string),
1867
sprintf(sge_mail_subj, MSG_MAIL_SUBJECT_JOB_COMP_US,
1868
sge_u32c(jobid), lGetString(jep, JB_job_name));
1869
sprintf(sge_mail_body,
1870
MSG_MAIL_BODY_COMP_SSSSSSSSSSSI,
1877
sge_dstring_get_string(&utime_string),
1878
sge_dstring_get_string(&stime_string),
1879
sge_dstring_get_string(&wtime_string),
1880
(ru_cpu == 0.0) ? "NA":sge_dstring_get_string(&cpu_string),
1881
(ru_maxvmem == 0.0) ? "NA":sge_dstring_get_string(&maxvmem_string),
1885
cull_mail(EXECD, mail_users, sge_mail_subj, sge_mail_body, MSG_MAIL_TYPE_COMP);
1886
sge_dstring_free(&utime_string);
1887
sge_dstring_free(&stime_string);
1888
sge_dstring_free(&wtime_string);
1891
if (((VALID(MAIL_AT_ABORT, mail_options))
1892
|| (VALID(MAIL_AT_EXIT, mail_options))) &&
1893
(failed || lGetUlong(jr, JR_general_failure)==GFSTATE_JOB)) {
1895
const char *err_str;
1896
const char *action, *comment = "";
1898
if (failed==SSTATE_MIGRATE) {
1899
action = MSG_MAIL_ACTION_MIGR;
1900
} else if (failed==SSTATE_AGAIN) {
1901
action = MSG_MAIL_ACTION_RESCH;
1902
} else if (failed==SSTATE_APPERROR) {
1903
action = MSG_MAIL_ACTION_APPERROR;
1904
} else if (lGetUlong(jr, JR_general_failure)==GFSTATE_JOB) {
1905
action = MSG_MAIL_ACTION_ERR;
1906
comment = MSG_MAIL_ACTION_ERR_COMMENT;
1908
action = MSG_MAIL_ACTION_ABORT;
1911
if ((ep=lGetSubStr(jr, UA_name, "signal", JR_usage)))
1912
signo = (u_long32)lGetDouble(ep, UA_value);
1914
if (!(err_str=lGetString(jr, JR_err_str)))
1915
err_str = MSG_UNKNOWNREASON;
1917
DPRINTF(("MAIL VALID at ABORT\n"));
1918
sprintf(exitstr, "%d", exit_status);
1919
if (pe_task_id_str == NULL) {
1920
if (job_is_array(jep)) {
1921
sprintf(sge_mail_subj,
1922
MSG_MAIL_SUBJECT_JA_TASK_STATE_UUSS,
1925
lGetString(jep, JB_job_name),
1927
sprintf(sge_mail_body,
1928
MSG_MAIL_BODY_STATE_SSSSSSSSSSSSS,
1932
u, q, h, sge_mail_start, sge_mail_end,
1933
(ru_cpu == 0.0) ? "NA":sge_dstring_get_string(&cpu_string),
1934
(ru_maxvmem == 0.0) ? "NA":sge_dstring_get_string(&maxvmem_string),
1935
get_sstate_description(failed),
1939
sprintf(sge_mail_subj,
1940
MSG_MAIL_SUBJECT_JOB_STATE_USS,
1942
lGetString(jep, JB_job_name),
1944
sprintf(sge_mail_body,
1945
MSG_MAIL_BODY_STATE_SSSSSSSSSSSSS,
1949
u, q, h, sge_mail_start, sge_mail_end,
1950
(ru_cpu == 0.0) ? "NA":sge_dstring_get_string(&cpu_string),
1951
(ru_maxvmem == 0.0) ? "NA":sge_dstring_get_string(&maxvmem_string),
1952
get_sstate_description(failed),
1956
cull_mail(EXECD, mail_users, sge_mail_subj,
1957
sge_mail_body, MSG_MAIL_TYPE_STATE);
1961
sge_dstring_free(&cpu_string);
1962
sge_dstring_free(&maxvmem_string);