~ubuntu-branches/ubuntu/utopic/gridengine/utopic

« back to all changes in this revision

Viewing changes to source/clients/qsub/qsub.c

  • Committer: Bazaar Package Importer
  • Author(s): Mark Hymers
  • Date: 2008-06-25 22:36:13 UTC
  • Revision ID: james.westby@ubuntu.com-20080625223613-tvd9xlhuoct9kyhm
Tags: upstream-6.2~beta2
ImportĀ upstreamĀ versionĀ 6.2~beta2

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*___INFO__MARK_BEGIN__*/
 
2
/*************************************************************************
 
3
 * 
 
4
 *  The Contents of this file are made available subject to the terms of
 
5
 *  the Sun Industry Standards Source License Version 1.2
 
6
 * 
 
7
 *  Sun Microsystems Inc., March, 2001
 
8
 * 
 
9
 * 
 
10
 *  Sun Industry Standards Source License Version 1.2
 
11
 *  =================================================
 
12
 *  The contents of this file are subject to the Sun Industry Standards
 
13
 *  Source License Version 1.2 (the "License"); You may not use this file
 
14
 *  except in compliance with the License. You may obtain a copy of the
 
15
 *  License at http://gridengine.sunsource.net/Gridengine_SISSL_license.html
 
16
 * 
 
17
 *  Software provided under this License is provided on an "AS IS" basis,
 
18
 *  WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING,
 
19
 *  WITHOUT LIMITATION, WARRANTIES THAT THE SOFTWARE IS FREE OF DEFECTS,
 
20
 *  MERCHANTABLE, FIT FOR A PARTICULAR PURPOSE, OR NON-INFRINGING.
 
21
 *  See the License for the specific provisions governing your rights and
 
22
 *  obligations concerning the Software.
 
23
 * 
 
24
 *   The Initial Developer of the Original Code is: Sun Microsystems, Inc.
 
25
 * 
 
26
 *   Copyright: 2001 by Sun Microsystems, Inc.
 
27
 * 
 
28
 *   All Rights Reserved.
 
29
 * 
 
30
 ************************************************************************/
 
31
/*___INFO__MARK_END__*/
 
32
#include <string.h>
 
33
#include <sys/stat.h>
 
34
#include <errno.h>
 
35
 
 
36
#include "sge_all_listsL.h"
 
37
#include "usage.h"
 
38
#include "parse_qsub.h"
 
39
#include "parse_job_cull.h"
 
40
#include "read_defaults.h"
 
41
#include "show_job.h"
 
42
#include "commlib.h"
 
43
#include "sig_handlers.h"
 
44
#include "sge_prog.h"
 
45
#include "sgermon.h"
 
46
#include "sge_afsutil.h"
 
47
#include "setup_path.h"
 
48
#include "qm_name.h"
 
49
#include "sge_unistd.h"
 
50
#include "sge_security.h"
 
51
#include "sge_answer.h"
 
52
#include "sge_job.h"
 
53
#include "japi.h"
 
54
#include "japiP.h"
 
55
#include "lck/sge_mtutil.h"
 
56
#include "uti/sge_log.h"
 
57
#include "sge_profiling.h"
 
58
#include "gdi/sge_gdi_ctx.h"
 
59
 
 
60
#include "msg_clients_common.h"
 
61
#include "msg_qsub.h"
 
62
#include "msg_qmaster.h"
 
63
#include "basis_types.h"
 
64
#include "msg_common.h"
 
65
 
 
66
 
 
67
extern sge_gdi_ctx_class_t *ctx;
 
68
 
 
69
extern char **environ;
 
70
static pthread_mutex_t exit_mutex = PTHREAD_MUTEX_INITIALIZER;
 
71
static pthread_cond_t exit_cv = PTHREAD_COND_INITIALIZER;
 
72
static bool exited = false;
 
73
 
 
74
static char *get_bulk_jobid_string(long job_id, int start, int end, int step);
 
75
static void qsub_setup_sig_handlers(void);
 
76
static void qsub_terminate(void);
 
77
static void *sig_thread(void *dummy);
 
78
static int report_exit_status(int stat, const char *jobid);
 
79
static void error_handler(const char *message);
 
80
 
 
81
int main(int argc, char **argv);
 
82
 
 
83
/************************************************************************/
 
84
int main(
 
85
int argc,
 
86
char **argv 
 
87
) {
 
88
   lList *opts_cmdline = NULL;
 
89
   lList *opts_defaults = NULL;
 
90
   lList *opts_scriptfile = NULL;
 
91
   lList *opts_all = NULL;
 
92
   lListElem *job = NULL;
 
93
   lList *alp = NULL;
 
94
   lListElem *ep;
 
95
   lListElem *i_opt;
 
96
   lListElem *o_opt;
 
97
   int exit_status = 0;
 
98
   int just_verify;
 
99
   int tmp_ret;
 
100
   int wait_for_job = 0, is_immediate = 0;
 
101
   dstring session_key_out = DSTRING_INIT;
 
102
   dstring diag = DSTRING_INIT;
 
103
   dstring jobid = DSTRING_INIT;
 
104
   bool has_terse = false;
 
105
   u_long32 start, end, step;
 
106
   u_long32 num_tasks;
 
107
   int count, stat;
 
108
   char *jobid_string = NULL;
 
109
   drmaa_attr_values_t *jobids = NULL;
 
110
 
 
111
   u_long32 prog_number = 0;
 
112
   u_long32 myuid = 0;
 
113
   const char *sge_root = NULL;
 
114
   const char *cell_root = NULL;
 
115
   const char *username = NULL;
 
116
   const char *qualified_hostname = NULL;
 
117
   const char *unqualified_hostname = NULL;
 
118
   const char *mastername = NULL;
 
119
 
 
120
   DENTER_MAIN(TOP_LAYER, "qsub");
 
121
 
 
122
   prof_mt_init();
 
123
 
 
124
   /* Set up the program information name */
 
125
   sge_setup_sig_handlers(QSUB);
 
126
 
 
127
   DPRINTF(("Initializing JAPI\n"));
 
128
 
 
129
   if (japi_init(NULL, NULL, NULL, QSUB, false, NULL, &diag)
 
130
                                                      != DRMAA_ERRNO_SUCCESS) {
 
131
      fprintf(stderr, "\n");
 
132
      fprintf(stderr, MSG_QSUB_COULDNOTINITIALIZEENV_S,
 
133
              sge_dstring_get_string(&diag));
 
134
      fprintf(stderr, "\n");
 
135
      DEXIT;
 
136
      SGE_EXIT((void**)&ctx, 1);
 
137
   }
 
138
 
 
139
   prog_number = ctx->get_who(ctx);
 
140
   myuid = ctx->get_uid(ctx);
 
141
   sge_root = ctx->get_sge_root(ctx);
 
142
   cell_root = ctx->get_cell_root(ctx);
 
143
   username = ctx->get_username(ctx);
 
144
   qualified_hostname = ctx->get_qualified_hostname(ctx);
 
145
   unqualified_hostname = ctx->get_unqualified_hostname(ctx);
 
146
   mastername = ctx->get_master(ctx, false);
 
147
 
 
148
   /*
 
149
    * read switches from the various defaults files
 
150
    */
 
151
   opt_list_append_opts_from_default_files(prog_number, cell_root, username, &opts_defaults, &alp, environ);
 
152
   tmp_ret = answer_list_print_err_warn(&alp, NULL, NULL, MSG_WARNING);
 
153
   if (tmp_ret > 0) {
 
154
      DEXIT;
 
155
      SGE_EXIT((void**)&ctx, tmp_ret);
 
156
   }
 
157
 
 
158
   /*
 
159
    * append the commandline switches to the list
 
160
    */
 
161
   opt_list_append_opts_from_qsub_cmdline(prog_number, &opts_cmdline, &alp,
 
162
                                          argv + 1, environ);
 
163
   tmp_ret = answer_list_print_err_warn(&alp, NULL, "qsub: ", MSG_QSUB_WARNING_S);
 
164
   if (tmp_ret > 0) {
 
165
      DEXIT;
 
166
      SGE_EXIT((void**)&ctx, tmp_ret);
 
167
   }
 
168
 
 
169
   /*
 
170
    * show usage if -help was in commandline
 
171
    */
 
172
   if (opt_list_has_X(opts_cmdline, "-help")) {
 
173
      sge_usage(QSUB, stdout);
 
174
      DEXIT;
 
175
      SGE_EXIT((void**)&ctx, 0);
 
176
   }
 
177
 
 
178
   /*
 
179
    * Check if -terse is requested
 
180
    */
 
181
   if (opt_list_has_X(opts_cmdline, "-terse")) {
 
182
      has_terse = true;
 
183
   }
 
184
 
 
185
   /*
 
186
    * We will only read commandline options from scripfile if the script
 
187
    * itself should not be handled as binary
 
188
    */
 
189
   if (opt_list_is_X_true(opts_cmdline, "-b") ||
 
190
       (!opt_list_has_X(opts_cmdline, "-b") &&
 
191
        opt_list_is_X_true(opts_defaults, "-b"))) {
 
192
      DPRINTF(("Skipping options from script due to -b option\n"));
 
193
   } else {
 
194
      opt_list_append_opts_from_script(prog_number,
 
195
                                       &opts_scriptfile, &alp, 
 
196
                                       opts_cmdline, environ);
 
197
      tmp_ret = answer_list_print_err_warn(&alp, NULL, MSG_QSUB_COULDNOTREADSCRIPT_S,
 
198
                                           MSG_WARNING);
 
199
      if (tmp_ret > 0) {
 
200
         DEXIT;
 
201
         SGE_EXIT((void**)&ctx, tmp_ret);
 
202
      }
 
203
   }
 
204
 
 
205
   /*
 
206
    * Merge all commandline options and interprete them
 
207
    */
 
208
   opt_list_merge_command_lines(&opts_all, &opts_defaults, 
 
209
                                &opts_scriptfile, &opts_cmdline);
 
210
 
 
211
   /* If "-sync y" is set, wait for the job to end. */   
 
212
   /* Remove all -sync switches since cull_parse_job_parameter()
 
213
    * doesn't know what to do with them. */
 
214
   while ((ep = lGetElemStr(opts_all, SPA_switch, "-sync"))) {
 
215
      if (lGetInt(ep, SPA_argval_lIntT) == TRUE) {
 
216
         wait_for_job = 1;
 
217
      }
 
218
      
 
219
      lRemoveElem(opts_all, &ep);
 
220
   }
 
221
 
 
222
   i_opt = lGetElemStr(opts_all, SPA_switch, "-i");
 
223
   o_opt = lGetElemStr(opts_all, SPA_switch, "-o");
 
224
 
 
225
   if (opt_list_is_X_true(opts_cmdline, "-i")) {
 
226
      if (i_opt == o_opt)  {
 
227
         fprintf(stderr, MSG_PARSE_SAMEPATHFORINPUTANDOUTPUT_SS,
 
228
             "", "");
 
229
         fprintf(stderr, "\n");
 
230
         DEXIT;
 
231
         SGE_EXIT((void**)&ctx, 1);
 
232
      }
 
233
   }
 
234
 
 
235
   
 
236
   if (wait_for_job) {
 
237
      DPRINTF(("Wait for job end\n"));
 
238
   }
 
239
 
 
240
   alp = cull_parse_job_parameter(myuid, username, cell_root, unqualified_hostname, qualified_hostname, opts_all, &job);
 
241
 
 
242
   tmp_ret = answer_list_print_err_warn(&alp, NULL, "qsub: ", MSG_WARNING);
 
243
   if (tmp_ret > 0) {
 
244
      DEXIT;
 
245
      SGE_EXIT((void**)&ctx, tmp_ret);
 
246
   }
 
247
 
 
248
   if (set_sec_cred(sge_root, mastername, job, &alp) != 0) {
 
249
      answer_list_output(&alp);
 
250
      DEXIT;
 
251
      SGE_EXIT((void**)&ctx, 1);
 
252
   }
 
253
 
 
254
   /* Check is we're just verifying the job */
 
255
   just_verify = (lGetUlong(job, JB_verify_suitable_queues)==JUST_VERIFY);
 
256
   DPRINTF(("Just verifying job\n"));
 
257
 
 
258
   /* Check if job is immediate */
 
259
   is_immediate = (int)JOB_TYPE_IS_IMMEDIATE(lGetUlong(job, JB_type));
 
260
   DPRINTF(("Job is%s immediate\n", is_immediate ? "" : " not"));
 
261
 
 
262
   DPRINTF(("Everything ok\n"));
 
263
 
 
264
   if (lGetUlong(job, JB_verify)) {
 
265
      cull_show_job(job, 0);
 
266
      DEXIT;
 
267
      SGE_EXIT((void**)&ctx, 0);
 
268
   }
 
269
 
 
270
   if (is_immediate || wait_for_job) {
 
271
      pthread_t sigt;
 
272
      
 
273
      qsub_setup_sig_handlers(); 
 
274
 
 
275
      if (pthread_create(&sigt, NULL, sig_thread, (void *)NULL) != 0) {
 
276
         fprintf(stderr, "\n");
 
277
         fprintf(stderr, MSG_QSUB_COULDNOTINITIALIZEENV_S,
 
278
                 " error preparing signal handling thread");
 
279
         fprintf(stderr, "\n");
 
280
         
 
281
         exit_status = 1;
 
282
         goto Error;
 
283
      }
 
284
      
 
285
      if (japi_enable_job_wait(username, unqualified_hostname, NULL, &session_key_out, error_handler, &diag) ==
 
286
                                       DRMAA_ERRNO_DRM_COMMUNICATION_FAILURE) {
 
287
         const char *msg = sge_dstring_get_string(&diag);
 
288
         fprintf(stderr, "\n");
 
289
         fprintf(stderr, MSG_QSUB_COULDNOTINITIALIZEENV_S,
 
290
                 msg?msg:" error starting event client thread");
 
291
         fprintf(stderr, "\n");
 
292
         
 
293
         exit_status = 1;
 
294
         goto Error;
 
295
      }
 
296
   }
 
297
   
 
298
   job_get_submit_task_ids(job, &start, &end, &step);
 
299
   num_tasks = (end - start) / step + 1;
 
300
 
 
301
   if (num_tasks > 1) {
 
302
      int error = japi_run_bulk_jobs(&jobids, job, start, end, step, &diag);
 
303
      if (error != DRMAA_ERRNO_SUCCESS) {
 
304
         /* No active session here means that japi_enable_job_wait() was
 
305
          * interrupted by the signal handler, in which case we just break out
 
306
          * quietly. */
 
307
         if (error != DRMAA_ERRNO_NO_ACTIVE_SESSION) {
 
308
            fprintf(stderr, MSG_QSUB_COULDNOTRUNJOB_S,
 
309
                    sge_dstring_get_string(&diag));
 
310
            fprintf(stderr, "\n");
 
311
         }
 
312
         
 
313
         /* BUGFIX: Issuezilla #1013
 
314
          * To quickly fix this issue, I'm mapping the JAPI/DRMAA error code
 
315
          * back into a GDI error code.  This is the easy solution.  The
 
316
          * correct solution would be to address issue #859, presumably by
 
317
          * having JAPI reuse the GDI error codes instead of the JAPI error
 
318
          * codes. */
 
319
         if (error == DRMAA_ERRNO_TRY_LATER) {
 
320
            exit_status = STATUS_NOTOK_DOAGAIN;
 
321
         }
 
322
         else {
 
323
            exit_status = 1;
 
324
         }
 
325
         
 
326
         goto Error;
 
327
      }
 
328
 
 
329
      DPRINTF(("job id is: %ld\n", jobids->it.ji.jobid));
 
330
      
 
331
      jobid_string = get_bulk_jobid_string((long)jobids->it.ji.jobid, start, end, step);
 
332
   }
 
333
   else if (num_tasks == 1) {
 
334
      int error = japi_run_job(&jobid, job, &diag);
 
335
      
 
336
      if (error != DRMAA_ERRNO_SUCCESS) {
 
337
         if (error != DRMAA_ERRNO_NO_ACTIVE_SESSION) {
 
338
            fprintf(stderr, MSG_QSUB_COULDNOTRUNJOB_S,
 
339
                    sge_dstring_get_string(&diag));
 
340
            fprintf(stderr, "\n");
 
341
         }
 
342
         
 
343
         /* BUGFIX: Issuezilla #1013
 
344
          * To quickly fix this issue, I'm mapping the JAPI/DRMAA error code
 
345
          * back into a GDI error code.  This is the easy solution.  The
 
346
          * correct solution would be to address issue #859, presumably by
 
347
          * having JAPI reuse the GDI error codes instead of the DRMAA error
 
348
          * codes. */
 
349
         if (error == DRMAA_ERRNO_TRY_LATER) {
 
350
            exit_status = STATUS_NOTOK_DOAGAIN;
 
351
         }
 
352
         else {
 
353
            exit_status = 1;
 
354
         }
 
355
         
 
356
         goto Error;
 
357
      }
 
358
 
 
359
      jobid_string = strdup(sge_dstring_get_string(&jobid));
 
360
      DPRINTF(("job id is: %s\n", jobid_string));
 
361
 
 
362
      sge_dstring_free(&jobid);
 
363
   }
 
364
   else {
 
365
      fprintf(stderr, MSG_QSUB_COULDNOTRUNJOB_S, "invalid task structure");
 
366
      fprintf(stderr, "\n");
 
367
      
 
368
      exit_status = 1;
 
369
      goto Error;
 
370
   }
 
371
  
 
372
   /* only success message is printed to stdout */
 
373
   if (!just_verify) {
 
374
      const char *output = sge_dstring_get_string(&diag); 
 
375
 
 
376
      /* print the tersed output */
 
377
      if (has_terse) {
 
378
        printf("%s", jobid_string);
 
379
      } else if (output != NULL) {
 
380
        printf("%s", output);
 
381
      } else {
 
382
        printf(MSG_QSUB_YOURJOBHASBEENSUBMITTED_SS, jobid_string, lGetString(job, JB_job_name));
 
383
      }
 
384
      printf("\n");
 
385
   } else {
 
386
      printf(MSG_JOB_VERIFYFOUNDQ);
 
387
      printf("\n");
 
388
   }   
 
389
 
 
390
   if ((wait_for_job || is_immediate) && !just_verify) {
 
391
      int event;
 
392
 
 
393
      if (is_immediate) {
 
394
         fprintf(stderr, "%s\n", MSG_QSUB_WAITINGFORIMMEDIATEJOBTOBESCHEDULED);
 
395
 
 
396
         /* We only need to wait for the first task to be scheduled to be able
 
397
          * to say that the job is running. */
 
398
         tmp_ret = japi_wait(DRMAA_JOB_IDS_SESSION_ANY, &jobid, &stat,
 
399
                             DRMAA_TIMEOUT_WAIT_FOREVER, JAPI_JOB_START, &event,
 
400
                             NULL, &diag);
 
401
 
 
402
         if ((tmp_ret == DRMAA_ERRNO_SUCCESS) && (event == JAPI_JOB_START)) {
 
403
            fprintf(stderr, "\n");
 
404
            fprintf(stderr, MSG_QSUB_YOURIMMEDIATEJOBXHASBEENSUCCESSFULLYSCHEDULED_S,
 
405
                  jobid_string);
 
406
            fprintf(stderr, "\n");
 
407
         }
 
408
         /* A job finish event here means that the job was rejected. */
 
409
         else if ((tmp_ret == DRMAA_ERRNO_SUCCESS) &&
 
410
                  (event == JAPI_JOB_FINISH)) {
 
411
            fprintf(stderr, "\n%s\n", MSG_QSUB_YOURQSUBREQUESTCOULDNOTBESCHEDULEDDTRYLATER);
 
412
            
 
413
            exit_status = 1;
 
414
            goto Error;
 
415
         }
 
416
         else {
 
417
         /* Since we told japi_wait to wait forever, we know that if it gets
 
418
          * a timeout, it's because it's been interrupted to exit, in which
 
419
          * case we don't complain.  Same for no active session. */
 
420
            if ((tmp_ret != DRMAA_ERRNO_EXIT_TIMEOUT) &&
 
421
                (tmp_ret != DRMAA_ERRNO_NO_ACTIVE_SESSION)) {
 
422
               fprintf(stderr, "\n");
 
423
               fprintf(stderr, MSG_QSUB_COULDNOTWAITFORJOB_S,
 
424
                       sge_dstring_get_string(&diag));
 
425
               fprintf(stderr, "\n");
 
426
            }
 
427
 
 
428
            exit_status = 1;
 
429
            goto Error;
 
430
         }
 
431
      }
 
432
         
 
433
      if (wait_for_job) {
 
434
         /* Rather than using japi_synchronize on ALL for bulk jobs, we use
 
435
          * japi_wait on ANY num_tasks times because with synchronize, we would
 
436
          * have to wait for all the tasks to finish before we know if any
 
437
          * finished. */
 
438
         for (count = 0; count < num_tasks; count++) {
 
439
            /* Since there's only one running job in the session, we can just
 
440
             * wait for ANY. */
 
441
            if ((tmp_ret = japi_wait(DRMAA_JOB_IDS_SESSION_ANY, &jobid, &stat,
 
442
                          DRMAA_TIMEOUT_WAIT_FOREVER, JAPI_JOB_FINISH, &event,
 
443
                          NULL, &diag)) != DRMAA_ERRNO_SUCCESS) {
 
444
               if ((tmp_ret != DRMAA_ERRNO_EXIT_TIMEOUT) &&
 
445
                   (tmp_ret != DRMAA_ERRNO_NO_ACTIVE_SESSION)) {
 
446
                  fprintf(stderr, "\n");
 
447
                  fprintf(stderr, MSG_QSUB_COULDNOTWAITFORJOB_S, sge_dstring_get_string(&diag));
 
448
                  fprintf(stderr, "\n");
 
449
               }
 
450
               
 
451
               exit_status = 1;
 
452
               goto Error;
 
453
            }
 
454
            
 
455
            /* report how job finished */
 
456
            /* If the job is an array job, use the first non-zero exit code as
 
457
             * the exit code for qsub. */
 
458
            if (exit_status == 0) {
 
459
               exit_status = report_exit_status(stat,
 
460
                                              sge_dstring_get_string(&jobid));
 
461
            }
 
462
            /* If we've already found a non-zero exit code, just print the exit
 
463
             * info for the task. */
 
464
            else {
 
465
               report_exit_status(stat, sge_dstring_get_string(&jobid));
 
466
            }               
 
467
         }
 
468
      }
 
469
   }
 
470
 
 
471
Error:
 
472
   FREE(jobid_string);
 
473
   lFreeList(&alp);
 
474
   lFreeList(&opts_all);
 
475
   
 
476
   if ((tmp_ret = japi_exit(JAPI_EXIT_NO_FLAG, &diag)) != DRMAA_ERRNO_SUCCESS) {
 
477
      if (tmp_ret != DRMAA_ERRNO_NO_ACTIVE_SESSION) {
 
478
         fprintf(stderr, "\n");
 
479
         fprintf(stderr, MSG_QSUB_COULDNOTFINALIZEENV_S, sge_dstring_get_string(&diag));
 
480
         fprintf(stderr, "\n");
 
481
      }
 
482
      else {
 
483
         struct timespec ts;
 
484
         /* We know that if we get a DRMAA_ERRNO_NO_ACTIVE_SESSION here, it's
 
485
          * because the signal handler thread called japi_exit().  We know this
 
486
          * because if the call to japi_init() fails, we just exit directly.
 
487
          * If the call to japi_init() succeeds, then we have an active session,
 
488
          * so coming here because of an error would not result in the
 
489
          * DRMAA_ERRNO_NO_ACTIVE_SESSION error. */
 
490
         DPRINTF(("Sleeping for 15 seconds to wait for the exit to finish.\n"));
 
491
         
 
492
         sge_relative_timespec(15, &ts);
 
493
         sge_mutex_lock("qsub_exit_mutex", SGE_FUNC, __LINE__, &exit_mutex);
 
494
         
 
495
         while (!exited) {
 
496
            if (pthread_cond_timedwait(&exit_cv, &exit_mutex, &ts) == ETIMEDOUT) {
 
497
               DPRINTF(("Exit has not finished after 15 seconds.  Exiting.\n"));
 
498
               break;
 
499
            }
 
500
         }
 
501
         
 
502
         sge_mutex_unlock("qsub_exit_mutex", SGE_FUNC, __LINE__, &exit_mutex);
 
503
      }
 
504
   }
 
505
 
 
506
   sge_prof_cleanup();
 
507
 
 
508
   /* This is an exit() instead of an SGE_EXIT() because when the qmaster is
 
509
    * supended, SGE_EXIT() hangs. */
 
510
   exit(exit_status);
 
511
   DEXIT;
 
512
   return exit_status;
 
513
}
 
514
 
 
515
/****** get_bulk_jobid_string() ************************************************
 
516
*  NAME
 
517
*     get_bulk_jobid_string() -- Turn the job id and parameters into a string
 
518
*
 
519
*  SYNOPSIS
 
520
*     char *get_bulk_jobid_string(long job_id, int start, int end, int step)
 
521
*
 
522
*  FUNCTION
 
523
*     Creates a string from the job id, start task, end task, and task step.
 
524
*     The return job id string must be freed by the caller.
 
525
*
 
526
*  INPUT
 
527
*     long job_id   - The job's id number
 
528
*     int start     - The number of the first task in the job
 
529
*     int end       - The number of the last task in the job
 
530
*     int step      - The increment between job task numbers
 
531
*
 
532
*  RESULT
 
533
*     static char * - The job id string
 
534
*
 
535
*  NOTES
 
536
*     MT-NOTES: get_bulk_jobid_string() is MT safe
 
537
*******************************************************************************/
 
538
static char *get_bulk_jobid_string(long job_id, int start, int end, int step)
 
539
{
 
540
   char *jobid_str = (char *)malloc(sizeof(char) * 1024);
 
541
   char *ret_str = NULL;
 
542
   
 
543
   sprintf(jobid_str, "%ld.%d-%d:%d", job_id, start, end, step);
 
544
   ret_str = strdup(jobid_str);
 
545
   FREE(jobid_str);
 
546
   
 
547
   return ret_str;
 
548
}
 
549
 
 
550
/****** qsub_setup_sig_handlers() **********************************************
 
551
*  NAME
 
552
*     qsub_setup_sig_handlers() -- Set up the signal handlers
 
553
*
 
554
*  SYNOPSIS
 
555
*     void qsub_setup_sig_handlers(void)
 
556
*
 
557
*  FUNCTION
 
558
*     Blocks all signals so that the signal handler thread receives them.
 
559
*
 
560
*  NOTES
 
561
*     MT-NOTES: get_bulk_jobid_string() is MT safe
 
562
*******************************************************************************/
 
563
static void qsub_setup_sig_handlers(void)
 
564
{
 
565
   sigset_t sig_set;
 
566
 
 
567
   sigfillset(&sig_set);
 
568
   pthread_sigmask(SIG_BLOCK, &sig_set, NULL);
 
569
}
 
570
 
 
571
/****** qsub_terminate() *******************************************************
 
572
*  NAME
 
573
*     qsub_terminate() -- Terminates qsub
 
574
*
 
575
*  SYNOPSIS
 
576
*     void qsub_terminate(void)
 
577
*
 
578
*  FUNCTION
 
579
*     Prints out messages that qsub is ending and exits JAPI.
 
580
*
 
581
*  NOTES
 
582
*     MT-NOTES: qsub_terminate() is MT safe
 
583
*******************************************************************************/
 
584
static void qsub_terminate(void)
 
585
{
 
586
   dstring diag = DSTRING_INIT;
 
587
   int tmp_ret;
 
588
   
 
589
   fprintf(stderr, "\n%s\n", MSG_QSUB_INTERRUPTED);
 
590
   fprintf(stderr, "%s\n", MSG_QSUB_TERMINATING);
 
591
 
 
592
   tmp_ret = japi_exit(JAPI_EXIT_KILL_PENDING, &diag);
 
593
   
 
594
   /* No active session here means that the main thread beat us to exiting,
 
595
      in which case, we just quietly give up and go away. */
 
596
   if ((tmp_ret != DRMAA_ERRNO_SUCCESS) &&
 
597
       (tmp_ret != DRMAA_ERRNO_NO_ACTIVE_SESSION)) {
 
598
      fprintf(stderr, "\n");
 
599
      fprintf(stderr, MSG_QSUB_COULDNOTFINALIZEENV_S,
 
600
              sge_dstring_get_string(&diag));
 
601
      fprintf(stderr, "\n");
 
602
   }
 
603
 
 
604
   sge_dstring_free(&diag);
 
605
 
 
606
   sge_mutex_lock("qsub_exit_mutex", "qsub_terminate", __LINE__, &exit_mutex);
 
607
   exited = true;
 
608
   pthread_cond_signal(&exit_cv);
 
609
   sge_mutex_unlock("qsub_exit_mutex", "qsub_terminate", __LINE__, &exit_mutex);
 
610
}
 
611
 
 
612
/****** sig_thread() ***********************************************************
 
613
*  NAME
 
614
*     sig_thread() -- Signal handler thread
 
615
*
 
616
*  SYNOPSIS
 
617
*     void *sig_thread(void *dummy)
 
618
*
 
619
*  FUNCTION
 
620
*     Waits for a SIGINT or SIGTERM and then calls qsub_terminate().
 
621
*
 
622
*  INPUT
 
623
*     void *dummy - Unused
 
624
*
 
625
*  RESULT
 
626
*     static void * - Always NULL
 
627
*
 
628
*  NOTES
 
629
*     MT-NOTES: sig_thread() is MT safe
 
630
*******************************************************************************/
 
631
static void *sig_thread(void *dummy)
 
632
{
 
633
   int sig;
 
634
   sigset_t signal_set;
 
635
   dstring diag = DSTRING_INIT;
 
636
 
 
637
   sigemptyset(&signal_set);
 
638
   sigaddset(&signal_set, SIGINT);
 
639
   sigaddset(&signal_set, SIGTERM);
 
640
 
 
641
   /* Set up this thread so that when japi_exit() gets called, the GDI is
 
642
    * ready for use. */
 
643
   japi_init_mt(&diag);
 
644
 
 
645
   /* We don't care about sigwait's return(error) code because our response
 
646
    * to an error would be the same thing we're doing anyway: shutting down. */
 
647
   sigwait(&signal_set, &sig);
 
648
   
 
649
   qsub_terminate();
 
650
   
 
651
   return (void *)NULL;
 
652
}
 
653
 
 
654
/****** report_exit_status() ***************************************************
 
655
*  NAME
 
656
*     report_exit_status() -- Prints a job's exit status
 
657
*
 
658
*  SYNOPSIS
 
659
*     static int report_exit_status(int stat, const char *jobid)
 
660
*
 
661
*  FUNCTION
 
662
*     Prints a job's exit status to stdout.
 
663
*
 
664
*  INPUT
 
665
*     int stat          - The job's exit status
 
666
*     const char *jobid - The job id string
 
667
*
 
668
*  RESULT
 
669
*     static int        - The exit code of the job
 
670
*
 
671
*  NOTES
 
672
*     MT-NOTES: report_exit_status() is MT safe
 
673
*******************************************************************************/
 
674
static int report_exit_status(int stat, const char *jobid)
 
675
{
 
676
   int aborted, exited, signaled;
 
677
   int exit_status = 0;
 
678
   
 
679
   japi_wifaborted(&aborted, stat, NULL);
 
680
 
 
681
   if (aborted) {
 
682
      printf(MSG_QSUB_JOBNEVERRAN_S, jobid);
 
683
   } else {
 
684
      japi_wifexited(&exited, stat, NULL);
 
685
      if (exited) {
 
686
         japi_wexitstatus(&exit_status, stat, NULL);
 
687
         printf(MSG_QSUB_JOBEXITED_SI, jobid, exit_status);
 
688
      } else {
 
689
         japi_wifsignaled(&signaled, stat, NULL);
 
690
         
 
691
         if (signaled) {
 
692
            dstring termsig = DSTRING_INIT;
 
693
            japi_wtermsig(&termsig, stat, NULL);
 
694
            printf(MSG_QSUB_JOBRECEIVEDSIGNAL_SS, jobid,
 
695
                    sge_dstring_get_string(&termsig));
 
696
            sge_dstring_free(&termsig);
 
697
         } else {
 
698
            printf(MSG_QSUB_JOBFINISHUNCLEAR_S, jobid);
 
699
         }
 
700
 
 
701
         exit_status = 1;
 
702
      }
 
703
   }
 
704
   printf("\n");
 
705
   
 
706
   return exit_status;
 
707
}
 
708
 
 
709
/****** error_handler() ********************************************************
 
710
*  NAME
 
711
*     error_handler() -- Prints JAPI error messages
 
712
*
 
713
*  SYNOPSIS
 
714
*     static void error_handler(const char *message)
 
715
*
 
716
*  FUNCTION
 
717
*     Prints error messages from JAPI event client thread to stderr
 
718
*
 
719
*  INPUT
 
720
*     const char *message - The message to print
 
721
*
 
722
*  NOTES
 
723
*     MT-NOTES: error_handler() is MT safe
 
724
*******************************************************************************/
 
725
static void error_handler(const char *message)
 
726
{
 
727
   fprintf(stderr, message);
 
728
}