~ubuntu-branches/ubuntu/utopic/gridengine/utopic

« back to all changes in this revision

Viewing changes to source/daemons/qmaster/job_exit.c

  • Committer: Bazaar Package Importer
  • Author(s): Mark Hymers
  • Date: 2008-06-25 22:36:13 UTC
  • Revision ID: james.westby@ubuntu.com-20080625223613-tvd9xlhuoct9kyhm
Tags: upstream-6.2~beta2
ImportĀ upstreamĀ versionĀ 6.2~beta2

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*___INFO__MARK_BEGIN__*/
 
2
/*************************************************************************
 
3
 * 
 
4
 *  The Contents of this file are made available subject to the terms of
 
5
 *  the Sun Industry Standards Source License Version 1.2
 
6
 * 
 
7
 *  Sun Microsystems Inc., March, 2001
 
8
 * 
 
9
 * 
 
10
 *  Sun Industry Standards Source License Version 1.2
 
11
 *  =================================================
 
12
 *  The contents of this file are subject to the Sun Industry Standards
 
13
 *  Source License Version 1.2 (the "License"); You may not use this file
 
14
 *  except in compliance with the License. You may obtain a copy of the
 
15
 *  License at http://gridengine.sunsource.net/Gridengine_SISSL_license.html
 
16
 * 
 
17
 *  Software provided under this License is provided on an "AS IS" basis,
 
18
 *  WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING,
 
19
 *  WITHOUT LIMITATION, WARRANTIES THAT THE SOFTWARE IS FREE OF DEFECTS,
 
20
 *  MERCHANTABLE, FIT FOR A PARTICULAR PURPOSE, OR NON-INFRINGING.
 
21
 *  See the License for the specific provisions governing your rights and
 
22
 *  obligations concerning the Software.
 
23
 * 
 
24
 *   The Initial Developer of the Original Code is: Sun Microsystems, Inc.
 
25
 * 
 
26
 *   Copyright: 2001 by Sun Microsystems, Inc.
 
27
 * 
 
28
 *   All Rights Reserved.
 
29
 * 
 
30
 ************************************************************************/
 
31
/*___INFO__MARK_END__*/
 
32
#include <string.h>
 
33
#include <errno.h>
 
34
#include <stdlib.h>
 
35
 
 
36
#include "sge.h"
 
37
#include "sge_ja_task.h"
 
38
#include "sge_job_refL.h"
 
39
#include "sge_job_qmaster.h"
 
40
#include "sge_pe_qmaster.h"
 
41
#include "sge_host.h"
 
42
#include "job_exit.h"
 
43
#include "sge_give_jobs.h"
 
44
#include "sge_event_master.h"
 
45
#include "sge_queue_event_master.h"
 
46
#include "sge_cqueue_qmaster.h"
 
47
#include "sge_subordinate_qmaster.h"
 
48
#include "execution_states.h"
 
49
#include "sge_feature.h"
 
50
#include "sge_rusage.h"
 
51
#include "sge_prog.h"
 
52
#include "sgermon.h"
 
53
#include "sge_log.h"
 
54
#include "symbols.h"
 
55
#include "category.h"
 
56
#include "setup_path.h"
 
57
#include "msg_common.h"
 
58
#include "msg_daemons_common.h"
 
59
#include "msg_qmaster.h"
 
60
#include "sge_string.h"
 
61
#include "sge_unistd.h"
 
62
#include "sge_time.h"
 
63
#include "sge_spool.h"
 
64
#include "sge_hostname.h"
 
65
#include "sgeobj/sge_qinstance.h"
 
66
#include "sgeobj/sge_qinstance_state.h"
 
67
#include "sge_job.h"
 
68
#include "sge_report.h"
 
69
#include "sge_report_execd.h"
 
70
#include "sge_userset.h"
 
71
#include "sge_cqueue.h"
 
72
#include "sge_answer.h"
 
73
 
 
74
#include "sge_reporting_qmaster.h"
 
75
#include "sge_advance_reservation_qmaster.h"
 
76
#include "sge_qinstance_qmaster.h"
 
77
 
 
78
#include "sge_persistence_qmaster.h"
 
79
#include "spool/sge_spooling.h"
 
80
 
 
81
/************************************************************************
 
82
 Master routine for job exit
 
83
 
 
84
 We need a rusage struct filled.
 
85
 In normal cases this is done by the execd, sending this structure
 
86
 to notify master about job finish.
 
87
 
 
88
 In case of an error noticed by the master which needs the job to be 
 
89
 removed we can fill this structure by hand. We need:
 
90
 
 
91
 rusage->job_number
 
92
 rusage->qname to clean up the queue (if we didn't find it we nevertheless
 
93
               clean up the job
 
94
 
 
95
 for functions regarding rusage see sge_rusage.c
 
96
 ************************************************************************/
 
97
void sge_job_exit(sge_gdi_ctx_class_t *ctx, lListElem *jr, lListElem *jep, lListElem *jatep, monitoring_t *monitor) 
 
98
{
 
99
   lListElem *queueep = NULL;
 
100
   const char *err_str = NULL;
 
101
   const char *qname = NULL;  
 
102
   const char *hostname = MSG_OBJ_UNKNOWNHOST;
 
103
   u_long32 jobid, jataskid;
 
104
   lListElem *hep = NULL;
 
105
   u_long32 timestamp;
 
106
   object_description *object_base = object_type_get_object_description();
 
107
 
 
108
   u_long32 failed, general_failure;
 
109
   lList *saved_gdil;
 
110
 
 
111
   DENTER(TOP_LAYER, "sge_job_exit");
 
112
   
 
113
   /* JG: TODO: we'd prefer some more precise timestamp, e.g. from jr */
 
114
   timestamp = sge_get_gmt();
 
115
                     
 
116
   qname = lGetString(jr, JR_queue_name);
 
117
   if (!qname) {
 
118
      qname = (char *)MSG_OBJ_UNKNOWNQ;
 
119
   }
 
120
   err_str = lGetString(jr, JR_err_str);
 
121
   if (!err_str) {
 
122
      err_str = MSG_UNKNOWNREASON;
 
123
   }
 
124
 
 
125
   jobid = lGetUlong(jr, JR_job_number);
 
126
   jataskid = lGetUlong(jr, JR_ja_task_number);
 
127
   failed = lGetUlong(jr, JR_failed);
 
128
   general_failure = lGetUlong(jr, JR_general_failure);
 
129
 
 
130
   cancel_job_resend(jobid, jataskid);
 
131
 
 
132
   /* This only has a meaning for Hibernator jobs. The job pid must
 
133
    * be saved accross restarts, since jobs get there old pid
 
134
    */
 
135
   lSetUlong(jatep, JAT_pvm_ckpt_pid, lGetUlong(jr, JR_job_pid));
 
136
 
 
137
   DPRINTF(("reaping job "sge_u32"."sge_u32" in queue >%s< job_pid %d\n", 
 
138
      jobid, jataskid, qname, (int) lGetUlong(jatep, JAT_pvm_ckpt_pid)));
 
139
 
 
140
   if (!(queueep = cqueue_list_locate_qinstance(*object_base[SGE_TYPE_CQUEUE].list, qname))) {
 
141
      ERROR((SGE_EVENT, MSG_JOB_WRITEJFINISH_S, qname));
 
142
   }
 
143
 
 
144
   /* retrieve hostname for later use */
 
145
   if (queueep != NULL) {
 
146
      hostname = lGetHost(queueep, QU_qhostname);
 
147
   }
 
148
 
 
149
   if (failed) {        /* a problem occured */
 
150
      WARNING((SGE_EVENT, MSG_JOB_FAILEDONHOST_UUSSSS, sge_u32c(jobid), 
 
151
               sge_u32c(jataskid), 
 
152
               hostname,
 
153
               general_failure ? MSG_GENERAL : "",
 
154
               get_sstate_description(failed), err_str));
 
155
   } else {
 
156
      INFO((SGE_EVENT, MSG_JOB_JFINISH_UUS,  sge_u32c(jobid), sge_u32c(jataskid), 
 
157
            hostname));
 
158
   }
 
159
 
 
160
 
 
161
   /*-------------------------------------------------*/
 
162
 
 
163
   /* test if this job is in state JRUNNING or JTRANSFERING */
 
164
   if (lGetUlong(jatep, JAT_status) != JRUNNING && 
 
165
       lGetUlong(jatep, JAT_status) != JTRANSFERING) {
 
166
      ERROR((SGE_EVENT, MSG_JOB_JEXITNOTRUN_UU, sge_u32c(lGetUlong(jep, JB_job_number)), sge_u32c(jataskid)));
 
167
      return;
 
168
   }
 
169
 
 
170
   saved_gdil = lCopyList("cpy", lGetList(jatep, JAT_granted_destin_identifier_list));
 
171
 
 
172
   /*
 
173
    * case 1: job being trashed because 
 
174
    *    --> failed starting interactive job
 
175
    *    --> job was deleted
 
176
    *    --> a failed batch job that explicitely shall not enter error state
 
177
    */
 
178
   if (((lGetUlong(jatep, JAT_state) & JDELETED) == JDELETED) ||
 
179
         (failed && !lGetString(jep, JB_exec_file)) ||
 
180
         (failed && general_failure==GFSTATE_JOB && JOB_TYPE_IS_NO_ERROR(lGetUlong(jep, JB_type)))) {
 
181
      reporting_create_acct_record(ctx, NULL, jr, jep, jatep, false);
 
182
      /* JG: TODO: we need more information in the log message */
 
183
      reporting_create_job_log(NULL, timestamp, JL_DELETED, MSG_EXECD, hostname, jr, jep, jatep, NULL, MSG_LOG_JREMOVED);
 
184
 
 
185
      sge_commit_job(ctx, jep, jatep, jr, COMMIT_ST_FINISHED_FAILED_EE, COMMIT_DEFAULT | COMMIT_NEVER_RAN, monitor);
 
186
 
 
187
      if (lGetUlong(jep, JB_ar) != 0 && (lGetUlong(jatep, JAT_state) & JDELETED) == JDELETED) {
 
188
         /* get AR and remove it if no other jobs are debited */
 
189
         lList *master_ar_list = *object_base[SGE_TYPE_AR].list;
 
190
         lListElem *ar = ar_list_locate(master_ar_list, lGetUlong(jep, JB_ar));
 
191
 
 
192
         if (ar != NULL && lGetUlong(ar, AR_state) == AR_DELETED) {
 
193
            lListElem *ar_queue;
 
194
            u_long32 ar_id = lGetUlong(ar, AR_id);
 
195
 
 
196
            for_each(ar_queue, lGetList(ar, AR_reserved_queues)) {
 
197
               if (qinstance_slots_used(ar_queue) != 0) {
 
198
                  break;
 
199
               }
 
200
            }
 
201
            if (ar_queue == NULL) {
 
202
               /* no jobs registered in advance reservation */
 
203
               dstring buffer = DSTRING_INIT;
 
204
 
 
205
               sge_dstring_sprintf(&buffer, sge_U32CFormat,
 
206
                                   sge_u32c(ar_id));
 
207
 
 
208
               ar_do_reservation(ar, false);
 
209
 
 
210
               reporting_create_ar_log_record(NULL, ar, ARL_DELETED, 
 
211
                                              "AR deleted",
 
212
                                              timestamp);
 
213
               reporting_create_ar_acct_records(NULL, ar, timestamp); 
 
214
 
 
215
               lRemoveElem(master_ar_list, &ar);
 
216
 
 
217
               sge_event_spool(ctx, NULL, 0, sgeE_AR_DEL, 
 
218
                      ar_id, 0, sge_dstring_get_string(&buffer), NULL, NULL,
 
219
                      NULL, NULL, NULL, true, true);
 
220
               sge_dstring_free(&buffer);
 
221
            }
 
222
         }
 
223
      }
 
224
   } 
 
225
     /*
 
226
      * case 2: set job in error state
 
227
      *    --> owner requested wrong 
 
228
      *        -e/-o/-S/-cwd
 
229
      *    --> user did not exist at the execution machine
 
230
      *    --> application controlled job error
 
231
      */
 
232
   else if ((failed && general_failure==GFSTATE_JOB)) {
 
233
      DPRINTF(("set job "sge_u32"."sge_u32" in ERROR state\n", 
 
234
               lGetUlong(jep, JB_job_number), jataskid));
 
235
      reporting_create_acct_record(ctx, NULL, jr, jep, jatep, false);
 
236
      /* JG: TODO: we need more information in the log message */
 
237
      reporting_create_job_log(NULL, timestamp, JL_ERROR, MSG_EXECD, hostname, 
 
238
                               jr, jep, jatep, NULL, MSG_LOG_JERRORSET);
 
239
      lSetUlong(jatep, JAT_start_time, 0);
 
240
      ja_task_message_add(jatep, 1, err_str);
 
241
      sge_commit_job(ctx, jep, jatep, jr, COMMIT_ST_FAILED_AND_ERROR, COMMIT_DEFAULT, monitor);
 
242
   }
 
243
      /*
 
244
       * case 3: job being rescheduled because it wasnt even started
 
245
       *                            or because it was a general error
 
246
       */
 
247
   else if (((failed && (failed <= SSTATE_BEFORE_JOB)) || 
 
248
        general_failure)) {
 
249
      /* JG: TODO: we need more information in the log message */
 
250
      reporting_create_job_log(NULL, timestamp, JL_RESTART, MSG_EXECD, 
 
251
                               hostname, jr, jep, jatep, NULL, 
 
252
                               MSG_LOG_JNOSTARTRESCHEDULE);
 
253
      ja_task_message_add(jatep, 1, err_str);
 
254
      sge_commit_job(ctx, jep, jatep, jr, COMMIT_ST_RESCHEDULED, COMMIT_DEFAULT, monitor);
 
255
      reporting_create_acct_record(ctx, NULL, jr, jep, jatep, false);
 
256
      lSetUlong(jatep, JAT_start_time, 0);
 
257
   }
 
258
      /*
 
259
       * case 4: job being rescheduled because rerun specified or ckpt job
 
260
       */
 
261
   else if (((failed == ESSTATE_NO_EXITSTATUS) || 
 
262
              failed == ESSTATE_DIED_THRU_SIGNAL) &&
 
263
            ((lGetUlong(jep, JB_restart) == 1 || 
 
264
             (lGetUlong(jep, JB_checkpoint_attr) & ~NO_CHECKPOINT)) ||
 
265
             (!lGetUlong(jep, JB_restart) && lGetBool(queueep, QU_rerun)))) {
 
266
      DTRACE;
 
267
      lSetUlong(jatep, JAT_job_restarted, 
 
268
                  MAX(lGetUlong(jatep, JAT_job_restarted), 
 
269
                      lGetUlong(jr, JR_ckpt_arena)));
 
270
      lSetString(jatep, JAT_osjobid, lGetString(jr, JR_osjobid));
 
271
      reporting_create_acct_record(ctx, NULL, jr, jep, jatep, false);
 
272
      /* JG: TODO: we need more information in the log message */
 
273
      reporting_create_job_log(NULL, timestamp, JL_RESTART, MSG_EXECD, hostname, jr, jep, jatep, NULL, MSG_LOG_JRERUNRESCHEDULE);
 
274
      lSetUlong(jatep, JAT_start_time, 0);
 
275
      sge_commit_job(ctx, jep, jatep, jr, COMMIT_ST_RESCHEDULED, COMMIT_DEFAULT, monitor);
 
276
   }
 
277
      /*
 
278
       * case 5: job being rescheduled because it was interrupted and a checkpoint exists
 
279
       */
 
280
   else if (failed == SSTATE_MIGRATE) {
 
281
      DTRACE;
 
282
      /* job_restarted == 2 means a checkpoint in the ckpt arena */
 
283
      lSetUlong(jatep, JAT_job_restarted, 
 
284
                  MAX(lGetUlong(jatep, JAT_job_restarted), 
 
285
                      lGetUlong(jr, JR_ckpt_arena)));
 
286
      lSetString(jatep, JAT_osjobid, lGetString(jr, JR_osjobid));
 
287
      reporting_create_acct_record(ctx, NULL, jr, jep, jatep, false);
 
288
      reporting_create_job_log(NULL, timestamp, JL_MIGRATE, MSG_EXECD, hostname, jr, jep, jatep, NULL, MSG_LOG_JCKPTRESCHEDULE);
 
289
      lSetUlong(jatep, JAT_start_time, 0);
 
290
      sge_commit_job(ctx, jep, jatep, jr, COMMIT_ST_RESCHEDULED, COMMIT_DEFAULT, monitor);
 
291
   }
 
292
      /*
 
293
       * case 6: job being rescheduled because of exit 99 
 
294
       *                            or because of a rerun e.g. triggered by qmod -r <jobid>
 
295
       */
 
296
   else if (failed == SSTATE_AGAIN) {
 
297
      lSetUlong(jatep, JAT_job_restarted, 
 
298
                  MAX(lGetUlong(jatep, JAT_job_restarted), 
 
299
                      lGetUlong(jr, JR_ckpt_arena)));
 
300
      lSetString(jatep, JAT_osjobid, lGetString(jr, JR_osjobid));
 
301
      reporting_create_acct_record(ctx, NULL, jr, jep, jatep, false);
 
302
      reporting_create_job_log(NULL, timestamp, JL_RESTART, MSG_EXECD, hostname, jr, jep, jatep, NULL, MSG_LOG_JNORESRESCHEDULE);
 
303
      lSetUlong(jatep, JAT_start_time, 0);
 
304
      sge_commit_job(ctx, jep, jatep, jr, COMMIT_ST_RESCHEDULED, COMMIT_DEFAULT, monitor);
 
305
   }
 
306
      /*
 
307
       * case 7: job finished 
 
308
       */
 
309
   else {
 
310
      reporting_create_acct_record(ctx, NULL, jr, jep, jatep, false);
 
311
      reporting_create_job_log(NULL, timestamp, JL_FINISHED, MSG_EXECD, hostname, jr, jep, jatep, NULL, MSG_LOG_EXITED);
 
312
      sge_commit_job(ctx, jep, jatep, jr, COMMIT_ST_FINISHED_FAILED_EE, COMMIT_DEFAULT, monitor);
 
313
   }
 
314
 
 
315
   if (queueep != NULL) {
 
316
      bool found_host = false;
 
317
      bool spool_queueep = false;
 
318
      lList *answer_list = NULL;
 
319
      /*
 
320
      ** in this case we have to halt all queues on this host
 
321
      */
 
322
      if (general_failure && general_failure == GFSTATE_HOST) {
 
323
         spool_queueep = true;
 
324
         hep = host_list_locate(*object_base[SGE_TYPE_EXECHOST].list, 
 
325
                                lGetHost(queueep, QU_qhostname));
 
326
         if (hep != NULL) {
 
327
            lListElem *cqueue = NULL;
 
328
            const char *host = lGetHost(hep, EH_name);
 
329
            dstring error = DSTRING_INIT;
 
330
 
 
331
            found_host = true;
 
332
 
 
333
            for_each(cqueue, *object_base[SGE_TYPE_CQUEUE].list) {
 
334
               lList *qinstance_list = lGetList(cqueue, CQ_qinstances);
 
335
               lListElem *qinstance = NULL;
 
336
 
 
337
               qinstance = lGetElemHost(qinstance_list, QU_qhostname, host);
 
338
               if (qinstance != NULL) {
 
339
 
 
340
                  sge_qmaster_qinstance_state_set_error(qinstance, true);
 
341
 
 
342
                  sge_dstring_sprintf(&error, MSG_LOG_QERRORBYJOBHOST_SUS, lGetString(qinstance, QU_qname), sge_u32c(jobid), host);
 
343
                  qinstance_message_add(qinstance, QI_ERROR, sge_dstring_get_string(&error)); 
 
344
                  ERROR((SGE_EVENT, sge_dstring_get_string(&error)));
 
345
                  if (qinstance != queueep) {
 
346
                     sge_event_spool(ctx, &answer_list, 0, sgeE_QINSTANCE_MOD, 
 
347
                                     0, 0, lGetString(qinstance, QU_qname), 
 
348
                                     lGetHost(qinstance, QU_qhostname), NULL,
 
349
                                     qinstance, NULL, NULL, true, true);
 
350
                  }
 
351
               }
 
352
            }
 
353
            sge_dstring_free(&error);
 
354
         }
 
355
      }
 
356
      /*
 
357
      ** to be sure this queue is halted even if the host 
 
358
      ** is not found in the next statement
 
359
      */
 
360
      if (general_failure && general_failure != GFSTATE_JOB && found_host == false) {  
 
361
         dstring error = DSTRING_INIT; 
 
362
 
 
363
         sge_dstring_sprintf(&error, MSG_LOG_QERRORBYJOBHOST_SUS,
 
364
                             lGetString(queueep, QU_qname), sge_u32c(jobid),
 
365
                             hostname);
 
366
         
 
367
         /* general error -> this queue cant run any job */
 
368
         sge_qmaster_qinstance_state_set_error(queueep, true);
 
369
         qinstance_message_add(queueep, QI_ERROR, sge_dstring_get_string(&error));
 
370
         spool_queueep = true;
 
371
         ERROR((SGE_EVENT, sge_dstring_get_string(&error)));      
 
372
         sge_dstring_free(&error);
 
373
      }
 
374
 
 
375
      sge_event_spool(ctx, &answer_list, 0, sgeE_QINSTANCE_MOD, 
 
376
                      0, 0, lGetString(queueep, QU_qname), 
 
377
                      lGetHost(queueep, QU_qhostname), NULL,
 
378
                      queueep, NULL, NULL, true, spool_queueep);
 
379
 
 
380
      gdil_del_all_orphaned(ctx, saved_gdil, &answer_list);
 
381
      answer_list_output(&answer_list);
 
382
   }
 
383
 
 
384
   lFreeList(&saved_gdil);
 
385
   DRETURN_VOID;
 
386
}