~ubuntu-branches/ubuntu/utopic/gridengine/utopic

« back to all changes in this revision

Viewing changes to source/daemons/qmaster/job_report_qmaster.c

  • Committer: Bazaar Package Importer
  • Author(s): Mark Hymers
  • Date: 2008-06-25 22:36:13 UTC
  • Revision ID: james.westby@ubuntu.com-20080625223613-tvd9xlhuoct9kyhm
Tags: upstream-6.2~beta2
Import upstream version 6.2~beta2

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*___INFO__MARK_BEGIN__*/
 
2
/*************************************************************************
 
3
 * 
 
4
 *  The Contents of this file are made available subject to the terms of
 
5
 *  the Sun Industry Standards Source License Version 1.2
 
6
 * 
 
7
 *  Sun Microsystems Inc., March, 2001
 
8
 * 
 
9
 * 
 
10
 *  Sun Industry Standards Source License Version 1.2
 
11
 *  =================================================
 
12
 *  The contents of this file are subject to the Sun Industry Standards
 
13
 *  Source License Version 1.2 (the "License"); You may not use this file
 
14
 *  except in compliance with the License. You may obtain a copy of the
 
15
 *  License at http://gridengine.sunsource.net/Gridengine_SISSL_license.html
 
16
 * 
 
17
 *  Software provided under this License is provided on an "AS IS" basis,
 
18
 *  WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING,
 
19
 *  WITHOUT LIMITATION, WARRANTIES THAT THE SOFTWARE IS FREE OF DEFECTS,
 
20
 *  MERCHANTABLE, FIT FOR A PARTICULAR PURPOSE, OR NON-INFRINGING.
 
21
 *  See the License for the specific provisions governing your rights and
 
22
 *  obligations concerning the Software.
 
23
 * 
 
24
 *   The Initial Developer of the Original Code is: Sun Microsystems, Inc.
 
25
 * 
 
26
 *   Copyright: 2001 by Sun Microsystems, Inc.
 
27
 * 
 
28
 *   All Rights Reserved.
 
29
 * 
 
30
 ************************************************************************/
 
31
/*___INFO__MARK_END__*/
 
32
#include <string.h>
 
33
 
 
34
#include "sgermon.h"
 
35
#include "sge_log.h"
 
36
#include "sge.h"
 
37
#include "sge_answer.h"
 
38
#include "sge_pe.h"
 
39
#include "sge_ja_task.h"
 
40
#include "sge_pe_task.h"
 
41
#include "sge_usageL.h"
 
42
#include "sge_report_execd.h"
 
43
#include "sge_sched.h"
 
44
#include "sge_prog.h"
 
45
#include "execution_states.h"
 
46
#include "sge_feature.h"
 
47
#include "job_report_qmaster.h"
 
48
#include "job_exit.h"
 
49
#include "sge_signal.h"
 
50
#include "sge_event_master.h"
 
51
#include "sge_job_qmaster.h"
 
52
#include "sge_host.h"
 
53
#include "sge_give_jobs.h"
 
54
#include "sge_pe_qmaster.h"
 
55
#include "sge_time.h"
 
56
#include "reschedule.h"
 
57
#include "msg_daemons_common.h"
 
58
#include "msg_qmaster.h"
 
59
#include "sge_string.h"
 
60
#include "sge_var.h"
 
61
#include "sge_job.h"
 
62
#include "sge_report.h"
 
63
 
 
64
#include "sge_reporting_qmaster.h"
 
65
 
 
66
#include "sge_persistence_qmaster.h"
 
67
#include "spool/sge_spooling.h"
 
68
#include "sgeobj/sge_ack.h"
 
69
 
 
70
static char *status2str(u_long32 status);
 
71
 
 
72
/*
 * True when a reported job state is one of the three "job is alive on the
 * execd" states handled together by process_job_report(): JWRITTEN,
 * JRUNNING or JWAITING4OSJID.
 *
 * The argument is parenthesized so that expressions such as `x & mask`
 * are compared as a whole. Note that `state` is still evaluated up to
 * three times, so do not pass an expression with side effects.
 */
#define is_running(state) \
   ((state) == JWRITTEN || (state) == JRUNNING || (state) == JWAITING4OSJID)
 
73
 
 
74
static char *status2str(
 
75
u_long32 status 
 
76
) {
 
77
   char *s;
 
78
 
 
79
   switch (status) {
 
80
   case JTRANSFERING:
 
81
      s = "JTRANSFERING";
 
82
      break;
 
83
   case JRUNNING:
 
84
      s = "JRUNNING";
 
85
      break;
 
86
   case JFINISHED:
 
87
      s = "JFINISHED";
 
88
      break;
 
89
   case JIDLE:
 
90
      s = "JIDLE";
 
91
      break;
 
92
   default:
 
93
      s = "<unknown>";
 
94
      break;
 
95
   }
 
96
 
 
97
   return s;
 
98
}
 
99
/* ----------------------------------------
 
100
 
 
101
NAME 
 
102
   process_job_report
 
103
 
 
104
DESCR
 
105
   Process 'report' containing a job report list from 
 
106
   'commproc' at 'rhost'.
 
107
 
 
108
   The 'pb' may get used to collect requests that will be 
 
109
   generated in this process. The caller should reply it
 
110
   to the sender of this job report list if 'pb' remains
 
111
   not empty.
 
112
 
 
113
RETURN
 
114
   void  because all necessary state changings are done 
 
115
         in the apropriate objects
 
116
 
 
117
   ---------------------------------------- */
 
118
void process_job_report(sge_gdi_ctx_class_t *ctx, lListElem *report,
 
119
                       lListElem *hep, char *rhost, char *commproc,
 
120
                       sge_pack_buffer *pb, monitoring_t *monitor)
 
121
{
 
122
   lList* jrl = lGetList(report, REP_list); /* JR_Type */
 
123
   lListElem *jep, *jr, *ep, *jatep = NULL; 
 
124
   object_description *object_base = object_type_get_object_description();
 
125
 
 
126
   DENTER(TOP_LAYER, "process_job_report");
 
127
 
 
128
   DPRINTF(("received job report with %d elements:\n", lGetNumberOfElem(jrl)));
 
129
 
 
130
   /* 
 
131
   ** first process job reports of sub tasks to ensure this we put all these 
 
132
   ** job reports to the top of the 'jrl' list this is necessary to ensure 
 
133
   ** slave tasks get accounted on a shm machine 
 
134
   */
 
135
   {
 
136
      static lSortOrder *jr_sort_order = NULL;
 
137
      if (!jr_sort_order) {
 
138
         DPRINTF(("parsing job report sort order\n"));
 
139
         jr_sort_order = lParseSortOrderVarArg(JR_Type, "%I-", 
 
140
            JR_pe_task_id_str);
 
141
      }
 
142
      lSortList(jrl, jr_sort_order);
 
143
   }
 
144
 
 
145
   /*
 
146
   ** now check all job reports found in step 1 are 
 
147
   ** removed from job report list
 
148
   */
 
149
   for_each(jr, jrl) {
 
150
      const char *queue_name;
 
151
      const char *pe_task_id_str = lGetString(jr, JR_pe_task_id_str);
 
152
      u_long32 status = 0;
 
153
      lListElem *petask = NULL;
 
154
      int fret;
 
155
      u_long32 jobid, rstate = 0, jataskid = 0;
 
156
 
 
157
      jobid = lGetUlong(jr, JR_job_number);
 
158
      jataskid = lGetUlong(jr, JR_ja_task_number);
 
159
      rstate = lGetUlong(jr, JR_state);
 
160
 
 
161
      /* handle protocol to execd for all jobs which are
 
162
         already finished and maybe rescheduled */
 
163
      /* RU: */
 
164
      fret = skip_restarted_job(hep, jr, jobid, jataskid);
 
165
      if (fret > 0) {
 
166
         if (fret == 2) {
 
167
            pack_ack(pb, ACK_SIGNAL_JOB, jobid, jataskid, NULL);
 
168
         } else if (fret == 3) {
 
169
            pack_ack(pb, ACK_JOB_EXIT, jobid, jataskid, pe_task_id_str);
 
170
         }
 
171
         continue;
 
172
      }
 
173
 
 
174
      jep = job_list_locate(*object_base[SGE_TYPE_JOB].list, jobid);
 
175
      if (jep != NULL) {
 
176
         jatep = lGetElemUlong(lGetList(jep, JB_ja_tasks), JAT_task_number, jataskid);
 
177
         if (jatep != NULL) {
 
178
            status = lGetUlong(jatep, JAT_status);
 
179
         }
 
180
      }
 
181
 
 
182
      if ((queue_name = lGetString(jr, JR_queue_name)) == NULL) {
 
183
         queue_name = MSG_OBJ_UNKNOWNQ;
 
184
      }
 
185
      
 
186
      if (pe_task_id_str != NULL && jep != NULL && jatep != NULL) {
 
187
         petask = lGetSubStr(jatep, PET_id, pe_task_id_str, JAT_task_list); 
 
188
      }
 
189
      
 
190
      switch (rstate) {
 
191
      case JWRITTEN:
 
192
      case JRUNNING:   
 
193
      case JWAITING4OSJID:
 
194
         if (jep && jatep) {
 
195
            lList *answer_list = NULL;
 
196
 
 
197
            switch (status) {
 
198
            case JTRANSFERING:
 
199
            case JRUNNING:   
 
200
               /* 
 
201
                * If a ja_task was deleted while the execd was down, we'll
 
202
                * get a "job running" report when the execd starts up again.
 
203
                * The ja_task will be deleted by a timer triggered event
 
204
                * (TYPE_SIGNAL_RESEND_EVENT), but this can take up to one
 
205
                * minute - better send a kill signal immediately.
 
206
                */
 
207
               if (ISSET(lGetUlong(jatep, JAT_state), JDELETED)) {
 
208
                  DPRINTF(("Received report from "sge_u32"."sge_u32
 
209
                           " which is already in \"deleted\" state. "
 
210
                           "==> send kill signal\n", jobid, jataskid));
 
211
 
 
212
                  pack_ack(pb, ACK_SIGNAL_JOB, jobid, jataskid, NULL);
 
213
               }
 
214
 
 
215
               if (pe_task_id_str == NULL) {
 
216
                    
 
217
                  /* store unscaled usage directly in job */
 
218
                  lXchgList(jr, JR_usage, lGetListRef(jatep, JAT_usage_list));
 
219
 
 
220
                  /* update jobs scaled usage list */
 
221
                  lSetList(jatep, JAT_scaled_usage_list, 
 
222
                      lCopyList("scaled", lGetList(jatep, JAT_usage_list)));
 
223
                  scale_usage(lGetList(hep, EH_usage_scaling_list), 
 
224
                              lGetList(jatep, JAT_previous_usage_list),
 
225
                              lGetList(jatep, JAT_scaled_usage_list));
 
226
                 
 
227
                  if (status == JTRANSFERING) { /* got async ack for this job */
 
228
                     DPRINTF(("--- transfering job "sge_u32" is running\n", jobid));
 
229
                     sge_commit_job(ctx, jep, jatep, jr, COMMIT_ST_ARRIVED, COMMIT_DEFAULT, monitor); /* implicitly sending usage to schedd */
 
230
                     cancel_job_resend(jobid, jataskid);
 
231
                  } else {
 
232
                     /* need to generate a job event for new usage 
 
233
                      * the timestamp should better come from report object
 
234
                      */
 
235
                     /* jatask usage is not spooled (?) */
 
236
                     sge_add_list_event( 0, sgeE_JOB_USAGE, 
 
237
                                        jobid, jataskid, NULL, NULL,
 
238
                                        lGetString(jep, JB_session),
 
239
                                        lGetList(jatep, JAT_scaled_usage_list));
 
240
                     lList_clear_changed_info(lGetList(jatep, JAT_scaled_usage_list));
 
241
                  }
 
242
               } else {
 
243
                  /* register running task qmaster will log accounting for all registered tasks */
 
244
                  lListElem *pe;
 
245
                  bool new_task = false;
 
246
 
 
247
                  /* do we expect a pe task report from this host? */
 
248
                  if (lGetString(jatep, JAT_granted_pe)
 
249
                        && (pe=pe_list_locate(*object_base[SGE_TYPE_PE].list, lGetString(jatep, JAT_granted_pe)))
 
250
                        && lGetBool(pe, PE_control_slaves)
 
251
                        && lGetElemHost(lGetList(jatep, JAT_granted_destin_identifier_list), JG_qhostname, rhost)) {
 
252
                    
 
253
                    /* is the task already known (object was created earlier)? */
 
254
                    if (petask == NULL) {
 
255
                        /* here qmaster hears the first time about this task
 
256
                           and thus adds it to the task list of the appropriate job */
 
257
                        new_task = true;
 
258
                        DPRINTF(("--- task (#%d) "sge_u32"/%s -> running\n", 
 
259
                           lGetNumberOfElem(lGetList(jatep, JAT_task_list)), jobid, pe_task_id_str));
 
260
                        petask = lAddSubStr(jatep, PET_id, pe_task_id_str, JAT_task_list, PET_Type);
 
261
                        lSetUlong(petask, PET_status, JRUNNING);
 
262
                        /* JG: TODO: this should be delivered from execd! */
 
263
                        lSetUlong(petask, PET_start_time, sge_get_gmt());
 
264
                        lSetList(petask, PET_granted_destin_identifier_list, NULL);
 
265
                        if ((ep=lAddSubHost(petask, JG_qhostname, rhost, PET_granted_destin_identifier_list, JG_Type))) {
 
266
                           lSetString(ep, JG_qname, queue_name);
 
267
                        }
 
268
                    }
 
269
 
 
270
                    /* store unscaled usage directly in sub-task */
 
271
                    lXchgList(jr, JR_usage, lGetListRef(petask, PET_usage));
 
272
 
 
273
                    /* update task's scaled usage list */
 
274
                    lSetList(petask, PET_scaled_usage,
 
275
                             lCopyList("scaled", lGetList(petask, PET_usage)));
 
276
 
 
277
                    scale_usage(lGetList(hep, EH_usage_scaling_list), 
 
278
                                lGetList(petask, PET_previous_usage),
 
279
                                lGetList(petask, PET_scaled_usage));
 
280
 
 
281
                    /* notify scheduler of task usage event */
 
282
                    if (new_task) {
 
283
                       sge_event_spool(ctx,
 
284
                                       &answer_list, 0, sgeE_PETASK_ADD, 
 
285
                                       jobid, jataskid, pe_task_id_str, NULL,
 
286
                                       lGetString(jep, JB_session),
 
287
                                       jep, jatep, petask, true, true);
 
288
                    } else {
 
289
                       sge_add_list_event( 0, sgeE_JOB_USAGE, 
 
290
                                          jobid, jataskid, pe_task_id_str, 
 
291
                                          NULL, lGetString(jep, JB_session),
 
292
                                          lGetList(petask, PET_scaled_usage));
 
293
                    }
 
294
                    answer_list_output(&answer_list);
 
295
                  } else if (lGetUlong(jatep, JAT_status) != JFINISHED) {
 
296
                     lListElem *jg;
 
297
                     const char *shouldbe_queue_name;
 
298
                     const char *shouldbe_host_name;
 
299
 
 
300
                     if (!(jg = lFirst(lGetList(jatep, JAT_granted_destin_identifier_list)))) {
 
301
                        shouldbe_queue_name = MSG_OBJ_NOTRUNNING;
 
302
                        shouldbe_host_name = MSG_OBJ_NOTRUNNING;
 
303
                     } else {
 
304
                        if ((shouldbe_queue_name = lGetString(jg, JG_qname)) == NULL) {
 
305
                           shouldbe_queue_name = MSG_OBJ_UNKNOWN;
 
306
                        }
 
307
                        if ((shouldbe_host_name = lGetString(jg, JG_qhostname)) == NULL) {
 
308
                           shouldbe_host_name = MSG_OBJ_UNKNOWN;
 
309
                        }
 
310
                     }
 
311
                     /* should never happen */
 
312
                     ERROR((SGE_EVENT, MSG_JOB_REPORTEXITQ_SUUSSSSS, 
 
313
                            rhost, sge_u32c(jobid), sge_u32c(jataskid), 
 
314
                            pe_task_id_str?pe_task_id_str:MSG_MASTER, 
 
315
                            queue_name, shouldbe_queue_name, 
 
316
                            shouldbe_host_name, 
 
317
                            status2str(lGetUlong(jatep, JAT_status))));
 
318
                  }
 
319
               }
 
320
 
 
321
               /* once a day write an intermediate usage record to the 
 
322
                * reporting file to have correct daily usage reporting with
 
323
                * long running jobs */
 
324
               if (reporting_is_intermediate_acct_required(jep, jatep, petask)) {
 
325
                  /* write intermediate usage */
 
326
                  reporting_create_acct_record(ctx, NULL, jr, jep, jatep, true);
 
327
 
 
328
                  /* this action has changed the ja_task/pe_task - spool */
 
329
                  if (pe_task_id_str != NULL) {
 
330
                     /* JG: TODO we would need a PETASK_MOD event here!
 
331
                      * for spooling only, the ADD event is OK
 
332
                      */
 
333
                     sge_event_spool(ctx,
 
334
                                     &answer_list, 0, sgeE_PETASK_ADD, 
 
335
                                     jobid, jataskid, pe_task_id_str, NULL,
 
336
                                     lGetString(jep, JB_session),
 
337
                                     jep, jatep, petask, false, true);
 
338
                  } else {
 
339
                     sge_event_spool(ctx,
 
340
                                     &answer_list, 0, sgeE_JATASK_MOD, 
 
341
                                     jobid, jataskid, NULL, NULL,
 
342
                                     lGetString(jep, JB_session),
 
343
                                     jep, jatep, NULL, false, true);
 
344
                  }
 
345
                  answer_list_output(&answer_list);
 
346
               }
 
347
               break;
 
348
            default:
 
349
               ERROR((SGE_EVENT, MSG_JOB_REPORTRUNQ_SUUSSU, 
 
350
                     rhost, sge_u32c(jobid), sge_u32c(jataskid), 
 
351
                     pe_task_id_str?pe_task_id_str:"master", 
 
352
                     queue_name, sge_u32c(status)));
 
353
               break;
 
354
            } 
 
355
         } else {
 
356
            /* execd reports a running job that is unknown */
 
357
            /* signal this job to kill it at execd 
 
358
               this can be caused by a qdel -f while 
 
359
               execd was unreachable or by deletion of 
 
360
               the job in qmasters spool dir + qmaster 
 
361
               restart  
 
362
               retry is triggered if execd reports
 
363
               this job again as running
 
364
            */
 
365
            ERROR((SGE_EVENT, MSG_JOB_REPORTRUNFALSE_SUUSS, rhost, 
 
366
                   sge_u32c(jobid), sge_u32c(jataskid), 
 
367
                   pe_task_id_str?pe_task_id_str:MSG_MASTER, queue_name));
 
368
            pack_ack(pb, ACK_SIGNAL_JOB, jobid, jataskid, NULL);
 
369
         }
 
370
         break;
 
371
         
 
372
      case JSLAVE:
 
373
         /* we might get load reports of pe slaves, which have finished
 
374
            during a load report interval. We do not have any flushing
 
375
            for slave load reports or any finish reports for them. If
 
376
            the scheduler is fast, it might have send a remove order for
 
377
            the job. We then get a load report for a job / task, which
 
378
            does not exist anymore. 
 
379
            I do nto know, if we have to send a job exit request, but at
 
380
            least we have to ignore the load report.
 
381
          */  
 
382
         if (!jep || !jatep) {
 
383
            DPRINTF(("send cleanup request for slave job "sge_u32"."sge_u32"\n", 
 
384
               jobid, jataskid));
 
385
            pack_ack(pb, ACK_JOB_EXIT, jobid, jataskid, pe_task_id_str);
 
386
         } else {
 
387
            /* must be ack for slave job */
 
388
            lListElem *first_at_host;
 
389
 
 
390
            first_at_host = lGetElemHost(lGetList(jatep, JAT_granted_destin_identifier_list), JG_qhostname, rhost);
 
391
            if (first_at_host) {
 
392
               if (lGetUlong(first_at_host, JG_tag_slave_job) != 0) {
 
393
 
 
394
                  DPRINTF(("slave job "sge_u32" arrived at %s\n", jobid, rhost));
 
395
                  lSetUlong(first_at_host, JG_tag_slave_job, 0);
 
396
 
 
397
                  /* should trigger a fast delivery of the job to master execd 
 
398
                     script but only when all other slaves have also arrived */ 
 
399
                  if (is_pe_master_task_send(jatep)) {
 
400
                     /* triggers direct job delivery to master execd */
 
401
                     lSetString(jatep, JAT_master_queue, lGetString( lFirst(lGetList(jatep, JAT_granted_destin_identifier_list)), JG_qname));
 
402
 
 
403
                     DPRINTF(("trigger retry of job delivery to master execd\n"));
 
404
                     lSetUlong(jatep, JAT_start_time, 0);
 
405
                     cancel_job_resend(jobid, jataskid);
 
406
                     trigger_job_resend(sge_get_gmt(), NULL, jobid, jataskid, 0);
 
407
                  }
 
408
               }
 
409
            } else {
 
410
               /* clear state with regards to slave controlled container */
 
411
               lListElem *host;
 
412
 
 
413
               host = host_list_locate(*object_base[SGE_TYPE_EXECHOST].list, rhost);
 
414
               update_reschedule_unknown_list_for_job(host, jobid, jataskid);
 
415
 
 
416
               DPRINTF(("RU: CLEANUP FOR SLAVE JOB "sge_u32"."sge_u32" on host "SFN"\n", 
 
417
                  jobid, jataskid, rhost));
 
418
 
 
419
               /* clean up */
 
420
               pack_ack(pb, ACK_JOB_EXIT, jobid, jataskid, pe_task_id_str);
 
421
            }
 
422
         }
 
423
         break;
 
424
      case JEXITING:
 
425
      {
 
426
         int skip_job_exit = 0;
 
427
 
 
428
         if (!jep || !jatep || (jep && status==JFINISHED)) {
 
429
            /* must be retry of execds job exit */
 
430
            /* or job was deleted using "qdel -f" */
 
431
            /* while execd was down or .. */
 
432
            dstring buffer = DSTRING_INIT;
 
433
            if (jatep == NULL) {
 
434
               INFO((SGE_EVENT, "exiting job "SFQ": ja_task does not exist",
 
435
                     job_get_id_string(jobid, jataskid, pe_task_id_str, &buffer)));
 
436
            } else {
 
437
               INFO((SGE_EVENT, "exiting job "SFQ": job does not exist",
 
438
                     job_get_id_string(jobid, jataskid, pe_task_id_str, &buffer)));
 
439
            }
 
440
            sge_dstring_free(&buffer);
 
441
         } else {
 
442
            /* job exited */
 
443
            if (pe_task_id_str == NULL) {
 
444
               /* store unscaled usage directly in job */
 
445
               lXchgList(jr, JR_usage, lGetListRef(jatep, JAT_usage_list));
 
446
 
 
447
               /* update jobs scaled usage list */
 
448
               lSetList(jatep, JAT_scaled_usage_list,
 
449
                  lCopyList("scaled", lGetList(jatep, JAT_usage_list)));
 
450
               scale_usage(lGetList(hep, EH_usage_scaling_list),
 
451
                           lGetList(jatep, JAT_previous_usage_list),
 
452
                           lGetList(jatep, JAT_scaled_usage_list));
 
453
               /* skip sge_job_exit() and pack_job_exit() in case there
 
454
                  are still running tasks, since execd resends job exit */
 
455
               for_each (petask, lGetList(jatep, JAT_task_list)) {
 
456
                  if (lGetUlong(petask, PET_status)==JRUNNING) {
 
457
                     DPRINTF(("job exit for job "sge_u32": still waiting for task %s\n", 
 
458
                        jobid, lGetString(petask, PET_id)));
 
459
                     skip_job_exit = 1;
 
460
                     break;
 
461
                  }
 
462
               }
 
463
 
 
464
               switch (status) {
 
465
               case JRUNNING:
 
466
               case JTRANSFERING:
 
467
                  if (!skip_job_exit) {
 
468
                     DPRINTF(("--- running job "sge_u32"."sge_u32" is exiting\n", 
 
469
                        jobid, jataskid, (status==JTRANSFERING)?"transfering":"running"));
 
470
 
 
471
                     sge_job_exit(ctx, jr, jep, jatep, monitor);
 
472
                  } else {
 
473
                     u_long32 failed = lGetUlong(jr, JR_failed);
 
474
 
 
475
                     if (failed == SSTATE_FAILURE_AFTER_JOB && 
 
476
                           !lGetString(jep, JB_checkpoint_name)) {
 
477
 
 
478
                        if (!ISSET(lGetUlong(jatep, JAT_state), JDELETED)) {
 
479
                           dstring id_dstring = DSTRING_INIT;
 
480
                           job_mark_job_as_deleted(ctx, jep, jatep);
 
481
                           ERROR((SGE_EVENT, MSG_JOB_MASTERTASKFAILED_S, 
 
482
                                  job_get_id_string(jobid, jataskid, NULL, &id_dstring)));
 
483
                           sge_dstring_free(&id_dstring);
 
484
                        }
 
485
                     }
 
486
                  }
 
487
                  break;
 
488
               case JFINISHED:
 
489
                  /* must be retry */
 
490
                  skip_job_exit = 1;
 
491
                  break;
 
492
               default:
 
493
                  ERROR((SGE_EVENT, MSG_JOB_REPORTEXITJ_UUU,
 
494
                        sge_u32c(jobid), sge_u32c(jataskid), sge_u32c(status)));
 
495
                  break;
 
496
               }
 
497
            } else {
 
498
               lListElem *pe;
 
499
               if (lGetString(jatep, JAT_granted_pe)
 
500
                  && (pe=pe_list_locate(*object_base[SGE_TYPE_PE].list, lGetString(jatep, JAT_granted_pe)))
 
501
                  && lGetBool(pe, PE_control_slaves)
 
502
                  && lGetElemHost(lGetList(jatep, JAT_granted_destin_identifier_list), JG_qhostname, rhost)) {
 
503
                  /* 
 
504
                   * here we get usage of tasks that ran on slave/master execd's
 
505
                   * we store the pe task id of finished pe tasks in the ja task
 
506
                   * to prevent multiple handling of pe task finish in case 
 
507
                   * execd resends job report.
 
508
                   */
 
509
 
 
510
                  if (ja_task_add_finished_pe_task(jatep, pe_task_id_str)) {
 
511
                     bool known_pe_task = true; /* did this pe task show up
 
512
                                                   earlier (USAGE report) */
 
513
 
 
514
                     if (petask == NULL) {
 
515
                        known_pe_task = false;
 
516
                        petask = lAddSubStr(jatep, PET_id, pe_task_id_str, 
 
517
                                            JAT_task_list, PET_Type);
 
518
                        lSetUlong(petask, PET_status, JRUNNING);
 
519
                     }
 
520
 
 
521
                     /* store unscaled usage directly in sub-task */
 
522
                     /* lXchgList(jr, JR_usage, lGetListRef(task, JB_usage_list)); */
 
523
                     /* copy list because we need to keep usage in jr for sge_log_dusage() */
 
524
                     lSetList(petask, PET_usage, lCopyList(NULL, lGetList(jr, JR_usage)));
 
525
 
 
526
                     /* update task's scaled usage list */
 
527
                     lSetList(petask, PET_scaled_usage,
 
528
                              lCopyList("scaled", lGetList(petask, PET_usage)));
 
529
                     scale_usage(lGetList(hep, EH_usage_scaling_list), 
 
530
                                 lGetList(petask, PET_previous_usage),
 
531
                                 lGetList(petask, PET_scaled_usage));
 
532
 
 
533
 
 
534
                     if (lGetUlong(petask, PET_status)==JRUNNING ||
 
535
                         lGetUlong(petask, PET_status)==JTRANSFERING) {
 
536
                        u_long32 failed = lGetUlong(jr, JR_failed);
 
537
 
 
538
                        DPRINTF(("--- petask "sge_u32"."sge_u32"/%s -> final usage\n", 
 
539
                           jobid, jataskid, pe_task_id_str));
 
540
                        lSetUlong(petask, PET_status, JFINISHED);
 
541
 
 
542
                        reporting_create_acct_record(ctx, NULL, jr, jep, jatep, false);
 
543
 
 
544
                        /* add tasks (scaled) usage to past usage container */
 
545
                        {
 
546
                           lListElem *container = lGetSubStr(jatep, PET_id, PE_TASK_PAST_USAGE_CONTAINER, JAT_task_list);
 
547
                           if (container == NULL) {
 
548
                              lList *answer_list = NULL;
 
549
                              container = pe_task_sum_past_usage_list(lGetList(jatep, JAT_task_list), petask);
 
550
                              /* usage container will be spooled */
 
551
                              sge_event_spool(ctx,
 
552
                                            &answer_list, 0, sgeE_PETASK_ADD, 
 
553
                                            jobid, jataskid, PE_TASK_PAST_USAGE_CONTAINER, NULL, lGetString(jep, JB_session),  
 
554
                                            jep, jatep, container, true, true);
 
555
                              answer_list_output(&answer_list);
 
556
                           } else {
 
557
                              lList *answer_list = NULL;
 
558
 
 
559
                              pe_task_sum_past_usage(container, petask);
 
560
                              /* create list event for the USAGE_CONTAINER */
 
561
                              sge_add_list_event(0, sgeE_JOB_USAGE, 
 
562
                                                 jobid, jataskid, 
 
563
                                                 PE_TASK_PAST_USAGE_CONTAINER, 
 
564
                                                 NULL,
 
565
                                                 lGetString(jep, JB_session),
 
566
                                                 lGetList(container, PET_scaled_usage));
 
567
                              /* usage container will be spooled */
 
568
                              /* JG: TODO: it is not really a sgeE_PETASK_ADD,
 
569
                               * but a sgeE_PETASK_MOD. We don't have this event
 
570
                               * yet. For spooling only, the add event will do
 
571
                               */
 
572
                              sge_event_spool(ctx,
 
573
                                            &answer_list, 0, sgeE_PETASK_ADD, 
 
574
                                            jobid, jataskid, PE_TASK_PAST_USAGE_CONTAINER, NULL, lGetString(jep, JB_session),  
 
575
                                            jep, jatep, container, false, true);
 
576
                              answer_list_output(&answer_list);
 
577
                           }
 
578
                        }
 
579
 
 
580
                        /* remove pe task from job/jatask */
 
581
                        if (known_pe_task) {
 
582
                           lList *answer_list = NULL;
 
583
                           sge_event_spool(ctx,
 
584
                                           &answer_list, 0, sgeE_PETASK_DEL, 
 
585
                                          jobid, jataskid, pe_task_id_str, 
 
586
                                          NULL, NULL, NULL, NULL, NULL, 
 
587
                                          true, true);
 
588
                           answer_list_output(&answer_list);
 
589
                        }
 
590
                        lRemoveElem(lGetList(jatep, JAT_task_list), &petask);
 
591
                        
 
592
                        /* get rid of this job in case a task died from XCPU/XFSZ or 
 
593
                           exited with a core dump */
 
594
                        if (failed==SSTATE_FAILURE_AFTER_JOB
 
595
                              && (ep=lGetElemStr(lGetList(jr, JR_usage), UA_name, "signal"))) {
 
596
                           u_long32 sge_signo = (u_long32)lGetDouble(ep, UA_value);
 
597
 
 
598
                           switch (sge_signo) {
 
599
                           case SGE_SIGXFSZ:
 
600
                              INFO((SGE_EVENT, MSG_JOB_FILESIZEEXCEED_SSUU, 
 
601
                                   pe_task_id_str, rhost, sge_u32c(jobid), sge_u32c(jataskid)));
 
602
                              break;
 
603
                           case SGE_SIGXCPU:
 
604
                              INFO((SGE_EVENT, MSG_JOB_CPULIMEXCEED_SSUU, 
 
605
                                    pe_task_id_str, rhost, sge_u32c(jobid), sge_u32c(jataskid)));
 
606
                              break;
 
607
                           default: 
 
608
                              INFO((SGE_EVENT, MSG_JOB_DIEDTHROUGHSIG_SSUUS, 
 
609
                                   pe_task_id_str, rhost, sge_u32c(jobid), sge_u32c(jataskid), sge_sig2str(sge_signo)));
 
610
                              break;
 
611
                           }   
 
612
                        } else if (failed==0) {
 
613
                           INFO((SGE_EVENT, MSG_JOB_TASKFINISHED_SSUU, 
 
614
                              pe_task_id_str, rhost, sge_u32c(jobid), sge_u32c(jataskid)));
 
615
                        } else {
 
616
                           INFO((SGE_EVENT, MSG_JOB_TASKFAILED_SSUUU,
 
617
                              pe_task_id_str, rhost, sge_u32c(jobid), sge_u32c(jataskid), sge_u32c(failed)));
 
618
                        }
 
619
 
 
620
                        if (failed == SSTATE_FAILURE_AFTER_JOB && 
 
621
                              !lGetString(jep, JB_checkpoint_name)) {
 
622
                           if (!ISSET(lGetUlong(jatep, JAT_state), JDELETED)) {
 
623
                              dstring id_dstring = DSTRING_INIT;
 
624
                              job_mark_job_as_deleted(ctx, jep, jatep);
 
625
                              ERROR((SGE_EVENT, MSG_JOB_JOBTASKFAILED_S, 
 
626
                                     job_get_id_string(jobid, jataskid, pe_task_id_str, &id_dstring)));
 
627
                              sge_dstring_free(&id_dstring);
 
628
                           }
 
629
                        }
 
630
                     }
 
631
                  }
 
632
               } else if (status != JFINISHED) {
 
633
                  lListElem *jg;
 
634
                  const char *shouldbe_queue_name;
 
635
                  const char *shouldbe_host_name;
 
636
 
 
637
                  if (!(jg = lFirst(lGetList(jatep, JAT_granted_destin_identifier_list)))) {
 
638
                     shouldbe_queue_name = MSG_OBJ_NOTRUNNING;
 
639
                     shouldbe_host_name = MSG_OBJ_NOTRUNNING;
 
640
                  } else {
 
641
                     if ((shouldbe_queue_name = lGetString(jg, JG_qname)) == NULL) {
 
642
                        shouldbe_queue_name = MSG_OBJ_UNKNOWN;
 
643
                     }
 
644
                     if ((shouldbe_host_name = lGetString(jg, JG_qhostname)) == NULL) {
 
645
                        shouldbe_host_name = MSG_OBJ_UNKNOWN;
 
646
                     }
 
647
                  }
 
648
                  /* should never happen */
 
649
                  ERROR((SGE_EVENT, MSG_JOB_REPORTEXITQ_SUUSSSSS, 
 
650
                        rhost, sge_u32c(jobid), sge_u32c(jataskid), 
 
651
                        pe_task_id_str?pe_task_id_str:MSG_MASTER, queue_name, 
 
652
                        shouldbe_queue_name, shouldbe_host_name, 
 
653
                        status2str(lGetUlong(jatep, JAT_status))));
 
654
               }
 
655
            }
 
656
         }
 
657
         /* pack ack to enable execd cleaning up */
 
658
         if (!skip_job_exit) {
 
659
            pack_ack(pb, ACK_JOB_EXIT, jobid, jataskid, pe_task_id_str);
 
660
         }
 
661
      }
 
662
         break;
 
663
      default:
 
664
         ERROR((SGE_EVENT, MSG_EXECD_UNKNOWNJ_SUUSUS, 
 
665
                rhost, 
 
666
                sge_u32c(jobid), 
 
667
                sge_u32c(jataskid), 
 
668
                pe_task_id_str?pe_task_id_str:MSG_MASTER, 
 
669
                sge_u32c(rstate), 
 
670
                queue_name));
 
671
 
 
672
 
 
673
         pack_ack(pb, ACK_JOB_EXIT, jobid, jataskid, pe_task_id_str);
 
674
         break;
 
675
      }
 
676
   }
 
677
 
 
678
 
 
679
   DRETURN_VOID;
 
680
}
 
681