1
/*___INFO__MARK_BEGIN__*/
2
/*************************************************************************
4
* The Contents of this file are made available subject to the terms of
5
* the Sun Industry Standards Source License Version 1.2
7
* Sun Microsystems Inc., March, 2001
10
* Sun Industry Standards Source License Version 1.2
11
* =================================================
12
* The contents of this file are subject to the Sun Industry Standards
13
* Source License Version 1.2 (the "License"); You may not use this file
14
* except in compliance with the License. You may obtain a copy of the
15
* License at http://gridengine.sunsource.net/Gridengine_SISSL_license.html
17
* Software provided under this License is provided on an "AS IS" basis,
18
* WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING,
19
* WITHOUT LIMITATION, WARRANTIES THAT THE SOFTWARE IS FREE OF DEFECTS,
20
* MERCHANTABLE, FIT FOR A PARTICULAR PURPOSE, OR NON-INFRINGING.
21
* See the License for the specific provisions governing your rights and
22
* obligations concerning the Software.
24
* The Initial Developer of the Original Code is: Sun Microsystems, Inc.
26
* Copyright: 2001 by Sun Microsystems, Inc.
28
* All Rights Reserved.
30
************************************************************************/
31
/*___INFO__MARK_END__*/
36
#include "sge_all_listsL.h"
38
#include "parse_qsub.h"
39
#include "parse_job_cull.h"
40
#include "read_defaults.h"
43
#include "sig_handlers.h"
46
#include "sge_afsutil.h"
47
#include "setup_path.h"
49
#include "sge_unistd.h"
50
#include "sge_security.h"
51
#include "sge_answer.h"
55
#include "lck/sge_mtutil.h"
56
#include "uti/sge_log.h"
57
#include "sge_profiling.h"
58
#include "gdi/sge_gdi_ctx.h"
60
#include "msg_clients_common.h"
62
#include "msg_qmaster.h"
63
#include "basis_types.h"
64
#include "msg_common.h"
67
/* GDI context of this client. It is defined outside this file; main() queries
 * it for the program number, uid, user name, host names and root paths. */
extern sge_gdi_ctx_class_t *ctx;
69
/* Process environment; main() hands it to the option parsers. */
extern char **environ;
70
/* exit_mutex protects exit_cv: qsub_terminate() signals the condition variable
 * while holding the mutex, and main() holds the mutex around its timed wait
 * on the condition variable before the process exits. */
static pthread_mutex_t exit_mutex = PTHREAD_MUTEX_INITIALIZER;
71
static pthread_cond_t exit_cv = PTHREAD_COND_INITIALIZER;
72
/* NOTE(review): presumably records that the termination path has completed;
 * no write to this flag is visible in this excerpt -- confirm. Be aware that
 * report_exit_status() declares a local int with the same name. */
static bool exited = false;
74
/* Prototypes of the file-local helpers that are defined after main(). */
static char *get_bulk_jobid_string(long job_id, int start, int end, int step);
75
static void qsub_setup_sig_handlers(void);
76
static void qsub_terminate(void);
77
static void *sig_thread(void *dummy);
78
static int report_exit_status(int stat, const char *jobid);
79
static void error_handler(const char *message);
81
int main(int argc, char **argv);
83
/************************************************************************/
88
lList *opts_cmdline = NULL;
89
lList *opts_defaults = NULL;
90
lList *opts_scriptfile = NULL;
91
lList *opts_all = NULL;
92
lListElem *job = NULL;
100
int wait_for_job = 0, is_immediate = 0;
101
dstring session_key_out = DSTRING_INIT;
102
dstring diag = DSTRING_INIT;
103
dstring jobid = DSTRING_INIT;
104
bool has_terse = false;
105
u_long32 start, end, step;
108
char *jobid_string = NULL;
109
drmaa_attr_values_t *jobids = NULL;
111
u_long32 prog_number = 0;
113
const char *sge_root = NULL;
114
const char *cell_root = NULL;
115
const char *username = NULL;
116
const char *qualified_hostname = NULL;
117
const char *unqualified_hostname = NULL;
118
const char *mastername = NULL;
120
DENTER_MAIN(TOP_LAYER, "qsub");
124
/* Set up the signal handlers for qsub */
125
sge_setup_sig_handlers(QSUB);
127
DPRINTF(("Initializing JAPI\n"));
129
if (japi_init(NULL, NULL, NULL, QSUB, false, NULL, &diag)
130
!= DRMAA_ERRNO_SUCCESS) {
131
fprintf(stderr, "\n");
132
fprintf(stderr, MSG_QSUB_COULDNOTINITIALIZEENV_S,
133
sge_dstring_get_string(&diag));
134
fprintf(stderr, "\n");
136
SGE_EXIT((void**)&ctx, 1);
139
prog_number = ctx->get_who(ctx);
140
myuid = ctx->get_uid(ctx);
141
sge_root = ctx->get_sge_root(ctx);
142
cell_root = ctx->get_cell_root(ctx);
143
username = ctx->get_username(ctx);
144
qualified_hostname = ctx->get_qualified_hostname(ctx);
145
unqualified_hostname = ctx->get_unqualified_hostname(ctx);
146
mastername = ctx->get_master(ctx, false);
149
* read switches from the various defaults files
151
opt_list_append_opts_from_default_files(prog_number, cell_root, username, &opts_defaults, &alp, environ);
152
tmp_ret = answer_list_print_err_warn(&alp, NULL, NULL, MSG_WARNING);
155
SGE_EXIT((void**)&ctx, tmp_ret);
159
* append the commandline switches to the list
161
opt_list_append_opts_from_qsub_cmdline(prog_number, &opts_cmdline, &alp,
163
tmp_ret = answer_list_print_err_warn(&alp, NULL, "qsub: ", MSG_QSUB_WARNING_S);
166
SGE_EXIT((void**)&ctx, tmp_ret);
170
* show usage if -help was in commandline
172
if (opt_list_has_X(opts_cmdline, "-help")) {
173
sge_usage(QSUB, stdout);
175
SGE_EXIT((void**)&ctx, 0);
179
* Check if -terse is requested
181
if (opt_list_has_X(opts_cmdline, "-terse")) {
186
* We will only read commandline options from the script file if the script
187
* itself should not be handled as binary
189
if (opt_list_is_X_true(opts_cmdline, "-b") ||
190
(!opt_list_has_X(opts_cmdline, "-b") &&
191
opt_list_is_X_true(opts_defaults, "-b"))) {
192
DPRINTF(("Skipping options from script due to -b option\n"));
194
opt_list_append_opts_from_script(prog_number,
195
&opts_scriptfile, &alp,
196
opts_cmdline, environ);
197
tmp_ret = answer_list_print_err_warn(&alp, NULL, MSG_QSUB_COULDNOTREADSCRIPT_S,
201
SGE_EXIT((void**)&ctx, tmp_ret);
206
* Merge all commandline options and interpret them
208
opt_list_merge_command_lines(&opts_all, &opts_defaults,
209
&opts_scriptfile, &opts_cmdline);
211
/* If "-sync y" is set, wait for the job to end. */
212
/* Remove all -sync switches since cull_parse_job_parameter()
213
* doesn't know what to do with them. */
214
while ((ep = lGetElemStr(opts_all, SPA_switch, "-sync"))) {
215
if (lGetInt(ep, SPA_argval_lIntT) == TRUE) {
219
lRemoveElem(opts_all, &ep);
222
i_opt = lGetElemStr(opts_all, SPA_switch, "-i");
223
o_opt = lGetElemStr(opts_all, SPA_switch, "-o");
225
if (opt_list_is_X_true(opts_cmdline, "-i")) {
226
if (i_opt == o_opt) {
227
fprintf(stderr, MSG_PARSE_SAMEPATHFORINPUTANDOUTPUT_SS,
229
fprintf(stderr, "\n");
231
SGE_EXIT((void**)&ctx, 1);
237
DPRINTF(("Wait for job end\n"));
240
alp = cull_parse_job_parameter(myuid, username, cell_root, unqualified_hostname, qualified_hostname, opts_all, &job);
242
tmp_ret = answer_list_print_err_warn(&alp, NULL, "qsub: ", MSG_WARNING);
245
SGE_EXIT((void**)&ctx, tmp_ret);
248
if (set_sec_cred(sge_root, mastername, job, &alp) != 0) {
249
answer_list_output(&alp);
251
SGE_EXIT((void**)&ctx, 1);
254
/* Check if we're just verifying the job */
255
just_verify = (lGetUlong(job, JB_verify_suitable_queues)==JUST_VERIFY);
256
DPRINTF(("Just verifying job\n"));
258
/* Check if job is immediate */
259
is_immediate = (int)JOB_TYPE_IS_IMMEDIATE(lGetUlong(job, JB_type));
260
DPRINTF(("Job is%s immediate\n", is_immediate ? "" : " not"));
262
DPRINTF(("Everything ok\n"));
264
if (lGetUlong(job, JB_verify)) {
265
cull_show_job(job, 0);
267
SGE_EXIT((void**)&ctx, 0);
270
if (is_immediate || wait_for_job) {
273
qsub_setup_sig_handlers();
275
if (pthread_create(&sigt, NULL, sig_thread, (void *)NULL) != 0) {
276
fprintf(stderr, "\n");
277
fprintf(stderr, MSG_QSUB_COULDNOTINITIALIZEENV_S,
278
" error preparing signal handling thread");
279
fprintf(stderr, "\n");
285
if (japi_enable_job_wait(username, unqualified_hostname, NULL, &session_key_out, error_handler, &diag) ==
286
DRMAA_ERRNO_DRM_COMMUNICATION_FAILURE) {
287
const char *msg = sge_dstring_get_string(&diag);
288
fprintf(stderr, "\n");
289
fprintf(stderr, MSG_QSUB_COULDNOTINITIALIZEENV_S,
290
msg?msg:" error starting event client thread");
291
fprintf(stderr, "\n");
298
job_get_submit_task_ids(job, &start, &end, &step);
299
num_tasks = (end - start) / step + 1;
302
int error = japi_run_bulk_jobs(&jobids, job, start, end, step, &diag);
303
if (error != DRMAA_ERRNO_SUCCESS) {
304
/* No active session here means that japi_enable_job_wait() was
305
* interrupted by the signal handler, in which case we just break out
307
if (error != DRMAA_ERRNO_NO_ACTIVE_SESSION) {
308
fprintf(stderr, MSG_QSUB_COULDNOTRUNJOB_S,
309
sge_dstring_get_string(&diag));
310
fprintf(stderr, "\n");
313
/* BUGFIX: Issuezilla #1013
314
* To quickly fix this issue, I'm mapping the JAPI/DRMAA error code
315
* back into a GDI error code. This is the easy solution. The
316
* correct solution would be to address issue #859, presumably by
317
* having JAPI reuse the GDI error codes instead of the JAPI error
319
if (error == DRMAA_ERRNO_TRY_LATER) {
320
exit_status = STATUS_NOTOK_DOAGAIN;
329
DPRINTF(("job id is: %ld\n", jobids->it.ji.jobid));
331
jobid_string = get_bulk_jobid_string((long)jobids->it.ji.jobid, start, end, step);
333
else if (num_tasks == 1) {
334
int error = japi_run_job(&jobid, job, &diag);
336
if (error != DRMAA_ERRNO_SUCCESS) {
337
if (error != DRMAA_ERRNO_NO_ACTIVE_SESSION) {
338
fprintf(stderr, MSG_QSUB_COULDNOTRUNJOB_S,
339
sge_dstring_get_string(&diag));
340
fprintf(stderr, "\n");
343
/* BUGFIX: Issuezilla #1013
344
* To quickly fix this issue, I'm mapping the JAPI/DRMAA error code
345
* back into a GDI error code. This is the easy solution. The
346
* correct solution would be to address issue #859, presumably by
347
* having JAPI reuse the GDI error codes instead of the DRMAA error
349
if (error == DRMAA_ERRNO_TRY_LATER) {
350
exit_status = STATUS_NOTOK_DOAGAIN;
359
jobid_string = strdup(sge_dstring_get_string(&jobid));
360
DPRINTF(("job id is: %s\n", jobid_string));
362
sge_dstring_free(&jobid);
365
fprintf(stderr, MSG_QSUB_COULDNOTRUNJOB_S, "invalid task structure");
366
fprintf(stderr, "\n");
372
/* only success message is printed to stdout */
374
const char *output = sge_dstring_get_string(&diag);
376
/* print the tersed output */
378
printf("%s", jobid_string);
379
} else if (output != NULL) {
380
printf("%s", output);
382
printf(MSG_QSUB_YOURJOBHASBEENSUBMITTED_SS, jobid_string, lGetString(job, JB_job_name));
386
printf(MSG_JOB_VERIFYFOUNDQ);
390
if ((wait_for_job || is_immediate) && !just_verify) {
394
fprintf(stderr, "%s\n", MSG_QSUB_WAITINGFORIMMEDIATEJOBTOBESCHEDULED);
396
/* We only need to wait for the first task to be scheduled to be able
397
* to say that the job is running. */
398
tmp_ret = japi_wait(DRMAA_JOB_IDS_SESSION_ANY, &jobid, &stat,
399
DRMAA_TIMEOUT_WAIT_FOREVER, JAPI_JOB_START, &event,
402
if ((tmp_ret == DRMAA_ERRNO_SUCCESS) && (event == JAPI_JOB_START)) {
403
fprintf(stderr, "\n");
404
fprintf(stderr, MSG_QSUB_YOURIMMEDIATEJOBXHASBEENSUCCESSFULLYSCHEDULED_S,
406
fprintf(stderr, "\n");
408
/* A job finish event here means that the job was rejected. */
409
else if ((tmp_ret == DRMAA_ERRNO_SUCCESS) &&
410
(event == JAPI_JOB_FINISH)) {
411
fprintf(stderr, "\n%s\n", MSG_QSUB_YOURQSUBREQUESTCOULDNOTBESCHEDULEDDTRYLATER);
417
/* Since we told japi_wait to wait forever, we know that if it gets
418
* a timeout, it's because it's been interrupted to exit, in which
419
* case we don't complain. Same for no active session. */
420
if ((tmp_ret != DRMAA_ERRNO_EXIT_TIMEOUT) &&
421
(tmp_ret != DRMAA_ERRNO_NO_ACTIVE_SESSION)) {
422
fprintf(stderr, "\n");
423
fprintf(stderr, MSG_QSUB_COULDNOTWAITFORJOB_S,
424
sge_dstring_get_string(&diag));
425
fprintf(stderr, "\n");
434
/* Rather than using japi_synchronize on ALL for bulk jobs, we use
435
* japi_wait on ANY num_tasks times because with synchronize, we would
436
* have to wait for all the tasks to finish before we know if any
438
for (count = 0; count < num_tasks; count++) {
439
/* Since there's only one running job in the session, we can just
441
if ((tmp_ret = japi_wait(DRMAA_JOB_IDS_SESSION_ANY, &jobid, &stat,
442
DRMAA_TIMEOUT_WAIT_FOREVER, JAPI_JOB_FINISH, &event,
443
NULL, &diag)) != DRMAA_ERRNO_SUCCESS) {
444
if ((tmp_ret != DRMAA_ERRNO_EXIT_TIMEOUT) &&
445
(tmp_ret != DRMAA_ERRNO_NO_ACTIVE_SESSION)) {
446
fprintf(stderr, "\n");
447
fprintf(stderr, MSG_QSUB_COULDNOTWAITFORJOB_S, sge_dstring_get_string(&diag));
448
fprintf(stderr, "\n");
455
/* report how job finished */
456
/* If the job is an array job, use the first non-zero exit code as
457
* the exit code for qsub. */
458
if (exit_status == 0) {
459
exit_status = report_exit_status(stat,
460
sge_dstring_get_string(&jobid));
462
/* If we've already found a non-zero exit code, just print the exit
463
* info for the task. */
465
report_exit_status(stat, sge_dstring_get_string(&jobid));
474
lFreeList(&opts_all);
476
if ((tmp_ret = japi_exit(JAPI_EXIT_NO_FLAG, &diag)) != DRMAA_ERRNO_SUCCESS) {
477
if (tmp_ret != DRMAA_ERRNO_NO_ACTIVE_SESSION) {
478
fprintf(stderr, "\n");
479
fprintf(stderr, MSG_QSUB_COULDNOTFINALIZEENV_S, sge_dstring_get_string(&diag));
480
fprintf(stderr, "\n");
484
/* We know that if we get a DRMAA_ERRNO_NO_ACTIVE_SESSION here, it's
485
* because the signal handler thread called japi_exit(). We know this
486
* because if the call to japi_init() fails, we just exit directly.
487
* If the call to japi_init() succeeds, then we have an active session,
488
* so coming here because of an error would not result in the
489
* DRMAA_ERRNO_NO_ACTIVE_SESSION error. */
490
DPRINTF(("Sleeping for 15 seconds to wait for the exit to finish.\n"));
492
sge_relative_timespec(15, &ts);
493
sge_mutex_lock("qsub_exit_mutex", SGE_FUNC, __LINE__, &exit_mutex);
496
if (pthread_cond_timedwait(&exit_cv, &exit_mutex, &ts) == ETIMEDOUT) {
497
DPRINTF(("Exit has not finished after 15 seconds. Exiting.\n"));
502
sge_mutex_unlock("qsub_exit_mutex", SGE_FUNC, __LINE__, &exit_mutex);
508
/* This is an exit() instead of an SGE_EXIT() because when the qmaster is
509
* suspended, SGE_EXIT() hangs. */
515
/****** get_bulk_jobid_string() ************************************************
*  NAME
*     get_bulk_jobid_string() -- Turn the job id and parameters into a string
*
*  SYNOPSIS
*     char *get_bulk_jobid_string(long job_id, int start, int end, int step)
*
*  FUNCTION
*     Creates a string of the form "<job_id>.<start>-<end>:<step>" from the
*     job id, start task, end task, and task step.
*     The returned job id string is heap allocated and must be freed by the
*     caller.
*
*  INPUTS
*     long job_id - The job's id number
*     int start   - The number of the first task in the job
*     int end     - The number of the last task in the job
*     int step    - The increment between job task numbers
*
*  RESULT
*     static char * - The job id string, or NULL if the string could not be
*                     formatted or no memory was available
*
*  NOTES
*     MT-NOTES: get_bulk_jobid_string() is MT safe
*******************************************************************************/
static char *get_bulk_jobid_string(long job_id, int start, int end, int step)
{
   /* Worst case is a 64 bit long (20 chars) plus three ints (11 chars each),
    * three separators and the terminating NUL: 57 bytes. */
   char buffer[64];
   char *ret_str = NULL;
   int length;

   length = snprintf(buffer, sizeof(buffer), "%ld.%d-%d:%d",
                     job_id, start, end, step);

   /* A negative result is an output error; a result that does not fit would
    * mean the string was truncated. Never hand out a partial job id. */
   if (length < 0 || (size_t)length >= sizeof(buffer)) {
      return NULL;
   }

   /* Allocate exactly what is needed and copy the text including its NUL. */
   ret_str = (char *)malloc((size_t)length + 1);
   if (ret_str != NULL) {
      memcpy(ret_str, buffer, (size_t)length + 1);
   }

   return ret_str;
}
550
/****** qsub_setup_sig_handlers() **********************************************
*  NAME
*     qsub_setup_sig_handlers() -- Set up the signal handlers
*
*  SYNOPSIS
*     void qsub_setup_sig_handlers(void)
*
*  FUNCTION
*     Blocks all signals so that the signal handler thread receives them.
*     main() calls this immediately before it creates the sig_thread() thread.
*
*  NOTES
*     MT-NOTES: qsub_setup_sig_handlers() is MT safe
*******************************************************************************/
563
static void qsub_setup_sig_handlers(void)
567
/* Put every signal into the set, then add the whole set to the calling
 * thread's mask of blocked signals. */
sigfillset(&sig_set);
568
pthread_sigmask(SIG_BLOCK, &sig_set, NULL);
571
/****** qsub_terminate() *******************************************************
*  NAME
*     qsub_terminate() -- Terminates qsub
*
*  SYNOPSIS
*     void qsub_terminate(void)
*
*  FUNCTION
*     Prints out messages that qsub is ending and exits JAPI. Afterwards the
*     exit condition variable is signaled under exit_mutex so that a thread
*     waiting on it (main() does so with a timeout) can continue.
*
*  NOTES
*     MT-NOTES: qsub_terminate() is MT safe
*******************************************************************************/
584
static void qsub_terminate(void)
586
dstring diag = DSTRING_INIT;
589
/* Tell the user on stderr that qsub was interrupted and is shutting down. */
fprintf(stderr, "\n%s\n", MSG_QSUB_INTERRUPTED);
590
fprintf(stderr, "%s\n", MSG_QSUB_TERMINATING);
592
/* Shut down the JAPI session; diagnosis text ends up in diag.
 * NOTE(review): JAPI_EXIT_KILL_PENDING presumably also removes jobs that are
 * still pending -- confirm against the JAPI documentation. */
tmp_ret = japi_exit(JAPI_EXIT_KILL_PENDING, &diag);
594
/* No active session here means that the main thread beat us to exiting,
595
in which case, we just quietly give up and go away. */
596
if ((tmp_ret != DRMAA_ERRNO_SUCCESS) &&
597
(tmp_ret != DRMAA_ERRNO_NO_ACTIVE_SESSION)) {
598
fprintf(stderr, "\n");
599
fprintf(stderr, MSG_QSUB_COULDNOTFINALIZEENV_S,
600
sge_dstring_get_string(&diag));
601
fprintf(stderr, "\n");
604
/* The diagnosis buffer is no longer needed once it has been reported. */
sge_dstring_free(&diag);
606
/* Signal exit_cv while holding exit_mutex to wake a waiter on it. */
sge_mutex_lock("qsub_exit_mutex", "qsub_terminate", __LINE__, &exit_mutex);
608
pthread_cond_signal(&exit_cv);
609
sge_mutex_unlock("qsub_exit_mutex", "qsub_terminate", __LINE__, &exit_mutex);
612
/****** sig_thread() ***********************************************************
*  NAME
*     sig_thread() -- Signal handler thread
*
*  SYNOPSIS
*     void *sig_thread(void *dummy)
*
*  FUNCTION
*     Waits for a SIGINT or SIGTERM and then calls qsub_terminate().
*     main() starts this function as a thread via pthread_create() after all
*     signals have been blocked with qsub_setup_sig_handlers().
*
*  INPUTS
*     void *dummy - Unused
*
*  RESULT
*     static void * - Always NULL
*
*  NOTES
*     MT-NOTES: sig_thread() is MT safe
*******************************************************************************/
631
static void *sig_thread(void *dummy)
635
dstring diag = DSTRING_INIT;
637
/* Build the set of signals this thread waits for: SIGINT and SIGTERM only. */
sigemptyset(&signal_set);
638
sigaddset(&signal_set, SIGINT);
639
sigaddset(&signal_set, SIGTERM);
641
/* Set up this thread so that when japi_exit() gets called, the GDI is
645
/* We don't care about sigwait's return(error) code because our response
646
* to an error would be the same thing we're doing anyway: shutting down. */
647
sigwait(&signal_set, &sig);
654
/****** report_exit_status() ***************************************************
*  NAME
*     report_exit_status() -- Prints a job's exit status
*
*  SYNOPSIS
*     static int report_exit_status(int stat, const char *jobid)
*
*  FUNCTION
*     Prints a job's exit status to stdout. The status is examined with the
*     japi_wif*() helpers and one message is printed for a job that never
*     ran, exited, received a signal, or finished in an unclear way.
*
*  INPUTS
*     int stat          - The job's exit status
*     const char *jobid - The job id string
*
*  RESULT
*     static int - The exit code of the job
*
*  NOTES
*     MT-NOTES: report_exit_status() is MT safe
*******************************************************************************/
674
static int report_exit_status(int stat, const char *jobid)
676
/* Note: the local 'exited' hides the file-scope flag of the same name. */
int aborted, exited, signaled;
679
/* NOTE(review): the conditions that select between the messages below are
 * not part of the visible excerpt; the calls are annotated in source order. */
japi_wifaborted(&aborted, stat, NULL);
682
printf(MSG_QSUB_JOBNEVERRAN_S, jobid);
684
japi_wifexited(&exited, stat, NULL);
686
/* Extract the job's exit code from stat and report it with the job id. */
japi_wexitstatus(&exit_status, stat, NULL);
687
printf(MSG_QSUB_JOBEXITED_SI, jobid, exit_status);
689
japi_wifsignaled(&signaled, stat, NULL);
692
/* The terminating signal is retrieved as text into a temporary dstring,
 * which is freed again right after it has been printed. */
dstring termsig = DSTRING_INIT;
693
japi_wtermsig(&termsig, stat, NULL);
694
printf(MSG_QSUB_JOBRECEIVEDSIGNAL_SS, jobid,
695
sge_dstring_get_string(&termsig));
696
sge_dstring_free(&termsig);
698
printf(MSG_QSUB_JOBFINISHUNCLEAR_S, jobid);
709
/****** error_handler() ********************************************************
*  NAME
*     error_handler() -- Prints JAPI error messages
*
*  SYNOPSIS
*     static void error_handler(const char *message)
*
*  FUNCTION
*     Prints error messages from JAPI event client thread to stderr.
*     The message is written verbatim: it is passed as an argument to a fixed
*     "%s" format and is never interpreted as a format string itself, so any
*     '%' characters contained in the message are harmless.
*
*  INPUTS
*     const char *message - The message to print; NULL is ignored
*
*  NOTES
*     MT-NOTES: error_handler() is MT safe
*******************************************************************************/
static void error_handler(const char *message)
{
   /* Nothing to report; passing NULL to "%s" would be undefined behavior. */
   if (message == NULL) {
      return;
   }

   fprintf(stderr, "%s", message);
}