1
/*___INFO__MARK_BEGIN__*/
2
/*************************************************************************
4
* The Contents of this file are made available subject to the terms of
5
* the Sun Industry Standards Source License Version 1.2
7
* Sun Microsystems Inc., March, 2001
10
* Sun Industry Standards Source License Version 1.2
11
* =================================================
12
* The contents of this file are subject to the Sun Industry Standards
13
* Source License Version 1.2 (the "License"); You may not use this file
14
* except in compliance with the License. You may obtain a copy of the
15
* License at http://gridengine.sunsource.net/Gridengine_SISSL_license.html
17
* Software provided under this License is provided on an "AS IS" basis,
18
* WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING,
19
* WITHOUT LIMITATION, WARRANTIES THAT THE SOFTWARE IS FREE OF DEFECTS,
20
* MERCHANTABLE, FIT FOR A PARTICULAR PURPOSE, OR NON-INFRINGING.
21
* See the License for the specific provisions governing your rights and
22
* obligations concerning the Software.
24
* The Initial Developer of the Original Code is: Sun Microsystems, Inc.
26
* Copyright: 2001 by Sun Microsystems, Inc.
28
* All Rights Reserved.
30
************************************************************************/
31
/*___INFO__MARK_END__*/
39
#include "sge_ja_task.h"
43
#include "sge_signal.h"
45
#include "sge_queue_event_master.h"
46
#include "sge_qmod_qmaster.h"
47
#include "sge_job_qmaster.h"
48
#include "sge_give_jobs.h"
50
#include "sge_parse_num_par.h"
51
#include "sge_pe_qmaster.h"
52
#include "sge_string.h"
57
#include "reschedule.h"
58
#include "sge_security.h"
60
#include "sge_answer.h"
62
#include "sge_hostname.h"
63
#include "sge_manop.h"
64
#include "sge_qinstance.h"
65
#include "sge_qinstance_state.h"
66
#include "sge_qinstance_qmaster.h"
67
#include "sge_cqueue_qmaster.h"
68
#include "sge_range.h"
69
#include "sge_centry.h"
70
#include "sge_calendar.h"
71
#include "sge_cqueue.h"
75
#include "sge_persistence_qmaster.h"
76
#include "sge_reporting_qmaster.h"
77
#include "spool/sge_spooling.h"
79
#include "msg_common.h"
80
#include "msg_qmaster.h"
83
/*-------------------------------------------------------------------------*/
84
/*
 * Forward declarations of the helpers that are private to this file
 * (all of them are declared static).
 *
 * signal_slave_jobs_in_queue / signal_slave_tasks_of_job  signal fan-out
 * sge_change_queue_state / sge_change_job_state           per-object dispatch
 * qmod_queue_weakclean / qmod_queue_clean                 queue level actions
 * qmod_job_suspend / qmod_job_unsuspend / qmod_job_reschedule
 *                                                         job level actions
 */
static void signal_slave_jobs_in_queue(sge_gdi_ctx_class_t *ctx, int how, lListElem *jep, monitoring_t *monitor);
86
static void signal_slave_tasks_of_job(sge_gdi_ctx_class_t *ctx, int how, lListElem *jep, lListElem *jatep,
87
monitoring_t *monitor);
89
static int sge_change_queue_state(sge_gdi_ctx_class_t *ctx,
90
char *user, char *host, lListElem *qep,
91
u_long32 action, u_long32 force, lList **answer,
92
monitoring_t *monitor);
94
static int sge_change_job_state(sge_gdi_ctx_class_t *ctx,
95
char *user, char *host, lListElem *jep, lListElem *jatep,
96
u_long32 task_id, u_long32 action, u_long32 force,
97
lList **answer, monitoring_t *monitor);
99
static int qmod_queue_weakclean(sge_gdi_ctx_class_t *ctx,
100
lListElem *qep, u_long32 force, lList **answer,
101
char *user, char *host, int isoperator, int isowner,
102
monitoring_t *monitor);
104
static int qmod_queue_clean(sge_gdi_ctx_class_t *ctx,
105
lListElem *qep, u_long32 force, lList **answer,
106
char *user, char *host, int isoperator, int isowner,
107
monitoring_t *monitor);
109
static void qmod_job_suspend(sge_gdi_ctx_class_t *ctx,
110
lListElem *jep, lListElem *jatep, lListElem *queueep,
111
u_long32 force, lList **answer, char *user, char *host,
112
monitoring_t *monitor);
114
static void qmod_job_unsuspend(sge_gdi_ctx_class_t *ctx,
115
lListElem *jep, lListElem *jatep, lListElem *queueep,
116
u_long32 force, lList **answer, char *user, char *host,
117
monitoring_t *monitor);
119
static void qmod_job_reschedule(sge_gdi_ctx_class_t *ctx,
120
lListElem *jep, lListElem *jatep, lListElem *queueep,
121
u_long32 force, lList **answer, char *user, char *host,
122
monitoring_t *monitor);
124
/*-------------------------------------------------------------------------*/
127
/*
 * sge_gdi_qmod - entry point for a qmod request delivered via GDI.
 *
 * NOTE(review): this copy of the function is missing lines (return type,
 * braces, several declarations and else-branches); the summary below only
 * describes the statements that are visible here.
 *
 * ctx      GDI context, handed through to the state change helpers
 * packet   request packet; host, user and commproc are validated first
 * task     task->data_list holds the id elements to process,
 *          task->answer_list receives the collected answers
 * monitor  monitoring handle, handed through to the helpers
 *
 * A request with a NULL host, an empty user name or a NULL commproc is
 * answered with STATUS_EUNKNOWN.
 *
 * For every element of task->data_list:
 *  - if JOB_DO_ACTION is not set in ID_action, ID_str is resolved as a
 *    queue reference and sge_change_queue_state() is called for each
 *    resolved queue instance (QUEUE_DO_ACTION is masked out of the action);
 *  - if ID_str is an integer and the action is suspend, unsuspend,
 *    reschedule or clear-error, it is taken as a job id. Enrolled tasks
 *    that fall into the requested range (or all tasks) are passed to
 *    sge_change_job_state(); task ids found in the JB_ja_n_h_ids,
 *    JB_ja_u_h_ids, JB_ja_s_h_ids and JB_ja_o_h_ids range lists are passed
 *    with a NULL task element so that an answer is generated for them;
 *  - if JOB_DO_ACTION is set and QUEUE_DO_ACTION is not, ID_str is used as
 *    an fnmatch() pattern against JB_job_name; for each matching job a copy
 *    of the id element carrying the numeric job id is appended to
 *    task->data_list (presumably so the loop handles it later - confirm);
 *  - otherwise MSG_QUEUE_INVALIDQORJOB_S or MSG_QUEUE_INVALIDQ_S is
 *    reported with STATUS_EEXIST.
 *
 * Before leaving, both dstring buffers are freed and the local answer list
 * is stored in task->answer_list.
 */
sge_gdi_qmod(sge_gdi_ctx_class_t *ctx, sge_gdi_packet_class_t *packet, sge_gdi_task_class_t *task,
128
monitoring_t *monitor)
132
lListElem *jatask = NULL, *rn, *job, *tmp_task;
135
u_long32 start = 0, end = 0, step = 0;
137
lList *master_hgroup_list = *(object_type_get_master_list(SGE_TYPE_HGROUP));
138
lList *cqueue_list = *(object_type_get_master_list(SGE_TYPE_CQUEUE));
139
dstring cqueue_buffer = DSTRING_INIT;
140
dstring hostname_buffer = DSTRING_INIT;
142
DENTER(TOP_LAYER, "sge_gdi_qmod");
145
if (!packet->host || (strlen(packet->user) == 0) || !packet->commproc) {
146
CRITICAL((SGE_EVENT, MSG_SGETEXT_NULLPTRPASSED_S, SGE_FUNC));
147
answer_list_add(&(task->answer_list), SGE_EVENT,
148
STATUS_EUNKNOWN, ANSWER_QUALITY_ERROR);
149
sge_dstring_free(&cqueue_buffer);
150
sge_dstring_free(&hostname_buffer);
156
** loop over the ids and change queue or job state and signal them
159
for_each(dep, task->data_list) {
160
lList *tmp_list = NULL;
161
lList *qref_list = NULL;
162
bool found_something = true;
163
u_long32 id_action = lGetUlong(dep, ID_action);
167
if ((id_action & JOB_DO_ACTION) == 0) {
168
qref_list_add(&qref_list, NULL, lGetString(dep, ID_str));
169
qref_list_resolve_hostname(qref_list);
170
qref_list_resolve(qref_list, NULL, &tmp_list,
171
&found_something, cqueue_list,
174
if (found_something) {
175
lListElem *qref = NULL;
177
id_action = (id_action & (~QUEUE_DO_ACTION));
179
for_each(qref, tmp_list) {
180
const char *full_name = NULL;
181
const char *cqueue_name = NULL;
182
const char *hostname = NULL;
183
bool has_hostname = false;
184
bool has_domain = false;
185
lListElem *cqueue = NULL;
186
lListElem *qinstance = NULL;
187
lList *qinstance_list = NULL;
189
full_name = lGetString(qref, QR_name);
190
cqueue_name_split(full_name, &cqueue_buffer, &hostname_buffer,
191
&has_hostname, &has_domain);
192
cqueue_name = sge_dstring_get_string(&cqueue_buffer);
193
hostname = sge_dstring_get_string(&hostname_buffer);
194
cqueue = lGetElemStr(cqueue_list, CQ_name, cqueue_name);
195
qinstance_list = lGetList(cqueue, CQ_qinstances);
196
qinstance = lGetElemHost(qinstance_list, QU_qhostname, hostname);
198
sge_change_queue_state(ctx, packet->user, packet->host, qinstance,
199
id_action, lGetUlong(dep, ID_force),
204
lFreeList(&qref_list);
205
lFreeList(&tmp_list);
208
bool is_jobName_suport = false;
209
u_long action = lGetUlong(dep, ID_action);
210
if ((action & JOB_DO_ACTION) > 0 &&
211
(action & QUEUE_DO_ACTION) == 0) {
212
action = (action & (~JOB_DO_ACTION));
213
is_jobName_suport = true;
217
** We found no queue so look for a job. This only makes sense for
218
** suspend, unsuspend and reschedule
220
if (sge_strisint(lGetString(dep, ID_str)) &&
221
(action == QI_DO_SUSPEND ||
222
action == QI_DO_RESCHEDULE ||
223
action == QI_DO_CLEARERROR ||
224
action == QI_DO_UNSUSPEND)) {
225
jobid = strtol(lGetString(dep, ID_str), NULL, 10);
227
rn = lFirst(lGetList(dep, ID_ja_structure));
229
start = lGetUlong(rn, RN_min);
231
end = lGetUlong(rn, RN_max);
232
step = lGetUlong(rn, RN_step);
238
end = (u_long32)LONG_MAX;
249
job = job_list_locate(*(object_type_get_master_list(SGE_TYPE_JOB)), jobid);
251
jatask = lFirst(lGetList(job, JB_ja_tasks));
253
while ((tmp_task = jatask)) {
254
u_long32 task_number;
256
jatask = lNext(tmp_task);
257
task_number = lGetUlong(tmp_task, JAT_task_number);
258
if ((task_number >= start && task_number <= end &&
259
((task_number-start)%step) == 0) || alltasks) {
260
DPRINTF(("Modify job: "sge_u32"."sge_u32"\n", jobid,
263
/* this specifies no queue, so lets probe for a job */
264
/* change state of job: */
265
sge_change_job_state(ctx, packet->user, packet->host, job, tmp_task, 0,
266
action, lGetUlong(dep, ID_force), &alp, monitor);
271
/* create more precise GDI answers also for pending jobs/tasks and jobs/tasks in hold state
272
When the operation is to be applied on the whole job array but no task is enrolled so far
273
(i.e. not found) only one single GDI answer is created. Otherwise one message is created
275
if (alltasks && job_is_array(job)) {
277
sge_change_job_state(ctx, packet->user, packet->host, job, NULL, 0,
278
action, lGetUlong(dep, ID_force), &alp, monitor);
283
u_long32 min, max, step;
286
/* handle all pending tasks */
287
for_each (range, lGetList(job, JB_ja_n_h_ids)) {
288
range_get_all_ids(range, &min, &max, &step);
289
for (taskid=min; taskid<=max; taskid+= step) {
290
if ((taskid >= start && taskid <= end &&
291
((taskid-start)%step) == 0) || alltasks) {
292
DPRINTF(("Modify job: "sge_u32"."sge_u32"\n", jobid,
294
sge_change_job_state(ctx, packet->user, packet->host, job, NULL, taskid,
295
action, lGetUlong(dep, ID_force), &alp, monitor);
301
/* handle all tasks in user hold */
302
for_each (range, lGetList(job, JB_ja_u_h_ids)) {
303
range_get_all_ids(range, &min, &max, &step);
304
for (taskid=min; taskid<=max; taskid+= step) {
305
if ((taskid >= start && taskid <= end &&
306
((taskid-start)%step) == 0) || alltasks) {
307
DPRINTF(("Modify job: "sge_u32"."sge_u32"\n", jobid,
309
sge_change_job_state(ctx, packet->user, packet->host, job, NULL, taskid,
310
action, lGetUlong(dep, ID_force), &alp, monitor);
316
/* handle all tasks in system hold that are not in user hold */
317
for_each (range, lGetList(job, JB_ja_s_h_ids)) {
318
range_get_all_ids(range, &min, &max, &step);
319
for (taskid=min; taskid<=max; taskid+= step) {
320
if (range_list_is_id_within(lGetList(job, JB_ja_u_h_ids), taskid)) {
323
if ((taskid >= start && taskid <= end &&
324
((taskid-start)%step) == 0) || alltasks) {
325
DPRINTF(("Modify job: "sge_u32"."sge_u32"\n", jobid,
327
sge_change_job_state(ctx, packet->user, packet->host, job, NULL, taskid,
328
action, lGetUlong(dep, ID_force), &alp, monitor);
334
/* handle all tasks in operator hold that are not in user hold or system hold */
335
for_each (range, lGetList(job, JB_ja_o_h_ids)) {
336
range_get_all_ids(range, &min, &max, &step);
337
for (taskid=min; taskid<=max; taskid+= step) {
338
if (range_list_is_id_within(lGetList(job, JB_ja_u_h_ids), taskid) ||
339
range_list_is_id_within(lGetList(job, JB_ja_s_h_ids), taskid)) {
342
if ((taskid >= start && taskid <= end &&
343
((taskid-start)%step) == 0) || alltasks) {
344
DPRINTF(("Modify job: "sge_u32"."sge_u32"\n", jobid,
346
sge_change_job_state(ctx, packet->user, packet->host, job, NULL, taskid,
347
action, lGetUlong(dep, ID_force), &alp, monitor);
355
/* job name or pattern was submitted */
356
else if (is_jobName_suport && (
357
action == QI_DO_SUSPEND ||
358
action == QI_DO_RESCHEDULE ||
359
action == QI_DO_CLEARERROR ||
360
action == QI_DO_UNSUSPEND)) {
362
const char *job_name = lGetString(dep, ID_str);
363
const lListElem *job;
364
lListElem *mod = NULL;
365
for_each(job, *(object_type_get_master_list(SGE_TYPE_JOB))) {
366
if (!fnmatch(job_name, lGetString(job, JB_job_name), 0)) {
368
mod = lCopyElem(dep);
369
sprintf(job_id, sge_u32, lGetUlong(job, JB_job_number));
370
lSetString(mod, ID_str, job_id);
371
lAppendElem(task->data_list, mod);
377
/* job id invalid or action invalid for jobs */
383
u_long action = lGetUlong(dep, ID_action);
385
if ((action & JOB_DO_ACTION)) {
386
action = action - JOB_DO_ACTION;
390
** If the action is QI_DO_UNSUSPEND or QI_DO_SUSPEND,
391
** 'invalid queue or job' will be printed,
392
** otherwise 'invalid queue' will be printed, because these actions
393
** are not suitable for jobs.
395
if ((action & QUEUE_DO_ACTION) == 0 && (
396
(action & JOB_DO_ACTION) != 0 ||
397
(action & QI_DO_SUSPEND) != 0 ||
398
(action & QI_DO_UNSUSPEND) != 0||
399
(action & QI_DO_CLEAN) != 0))
400
ERROR((SGE_EVENT, MSG_QUEUE_INVALIDQORJOB_S, lGetString(dep, ID_str)));
402
ERROR((SGE_EVENT, MSG_QUEUE_INVALIDQ_S, lGetString(dep, ID_str)));
403
answer_list_add(&alp, SGE_EVENT, STATUS_EEXIST, ANSWER_QUALITY_ERROR);
407
sge_dstring_free(&cqueue_buffer);
408
sge_dstring_free(&hostname_buffer);
410
task->answer_list = alp;
416
/*
 * sge_change_queue_state - apply one qmod action to a queue instance.
 *
 * NOTE(review): lines of this function are missing in this copy (return
 * type, braces, the permission condition, some case labels); only the
 * visible statements are described.
 *
 * user, host  originator of the request (used for permission checks and
 *             passed on to the helpers)
 * qep         queue instance to modify
 * action      QI_DO_* command
 * force       force flag, passed on to the helpers
 * answer      answer list receiving error/info messages
 *
 * Ownership (qinstance_check_owner) and operator status
 * (manop_is_operator) of the user are determined first; a permission
 * failure is reported as MSG_QUEUE_NOCHANGEQPERMS_SS with STATUS_ESEMANTIC.
 *
 * The state changing commands are forwarded to
 * qinstance_change_state_on_command(), whose boolean result is mapped to
 * 0 (true) or -1 (false). Cleaning goes to qmod_queue_clean() and
 * QI_DO_RESCHEDULE to qmod_queue_weakclean(). An unknown command is logged
 * with MSG_LOG_QUNKNOWNQMODCMD_U; the answer is added with STATUS_OK but
 * ANSWER_QUALITY_ERROR.
 *
 * In a later switch, QI_DO_RESCHEDULE additionally calls
 * cqueue_list_del_all_orphaned() for the queue name and host of qep.
 */
sge_change_queue_state(sge_gdi_ctx_class_t *ctx,
417
char *user, char *host, lListElem *qep, u_long32 action,
418
u_long32 force, lList **answer, monitoring_t *monitor)
423
const char *ehname = lGetHost(qep, QU_qhostname);
425
DENTER(TOP_LAYER, "sge_change_queue_state");
427
isowner = qinstance_check_owner(qep, user);
428
isoperator = manop_is_operator(user);
431
ERROR((SGE_EVENT, MSG_QUEUE_NOCHANGEQPERMS_SS, user, lGetString(qep, QU_full_name)));
432
answer_list_add(answer, SGE_EVENT, STATUS_ESEMANTIC, ANSWER_QUALITY_ERROR);
438
case QI_DO_CLEARERROR:
442
case QI_DO_UNSUSPEND:
443
#ifdef __SGE_QINSTANCE_STATE_DEBUG__
445
case QI_DO_SETORPHANED:
446
case QI_DO_CLEARORPHANED:
447
case QI_DO_SETUNKNOWN:
448
case QI_DO_CLEARUNKNOWN:
449
case QI_DO_SETAMBIGUOUS:
450
case QI_DO_CLEARAMBIGUOUS:
452
result = qinstance_change_state_on_command(ctx, qep, answer, action, force ? true : false, user, host, isoperator, isowner, monitor) ? 0 : -1;
455
result = qmod_queue_clean(ctx, qep, force, answer, user, host, isoperator, isowner, monitor);
458
case QI_DO_RESCHEDULE:
459
result = qmod_queue_weakclean(ctx, qep, force, answer, user, host, isoperator, isowner, monitor);
462
INFO((SGE_EVENT, MSG_LOG_QUNKNOWNQMODCMD_U, sge_u32c(action)));
463
answer_list_add(answer, SGE_EVENT, STATUS_OK, ANSWER_QUALITY_ERROR);
468
answer, 0, sgeE_QINSTANCE_MOD,
469
0, 0, lGetString(qep, QU_qname),
471
qep, NULL, NULL, true, true);
475
case QI_DO_RESCHEDULE:
476
cqueue_list_del_all_orphaned(ctx, *(object_type_get_master_list(SGE_TYPE_CQUEUE)), answer,
477
lGetString(qep, QU_qname), ehname);
487
/*
 * sge_change_job_state - apply one qmod action to a job / array task.
 *
 * NOTE(review): the parameter list and several statements are only partly
 * present in this copy; the prototype earlier in the file lists the
 * parameters as (ctx, user, host, jep, jatep, task_id, action, force,
 * answer, monitor).
 *
 * Visible behaviour:
 *  - the user must be the job owner (JB_owner) or an operator, otherwise
 *    MSG_JOB_NOMODJOBPERMS_SU is reported with STATUS_ENOTOWNER;
 *  - for a task that is not enrolled a warning
 *    (MSG_QMODJOB_NOTENROLLED_UU / MSG_QMODJOB_NOTENROLLED_U) is reported
 *    with STATUS_ESEMANTIC;
 *  - otherwise the task number is read from jatep and, if
 *    JAT_master_queue is set, the master queue instance is looked up;
 *  - the action is dispatched to qmod_job_reschedule(), qmod_job_suspend()
 *    or qmod_job_unsuspend();
 *  - QI_DO_CLEARERROR clears the JERROR bit in JAT_state if it is set and
 *    calls ja_task_message_trash_all_of_type_X(jatep, 1); either way an
 *    informational message is logged, and the answer is added with
 *    STATUS_OK / ANSWER_QUALITY_ERROR;
 *  - an unknown action is logged with MSG_LOG_JOBUNKNOWNQMODCMD_U.
 */
static int sge_change_job_state(
488
sge_gdi_ctx_class_t *ctx,
497
monitoring_t *monitor
502
DENTER(TOP_LAYER, "sge_change_job_state");
504
job_id = lGetUlong(jep, JB_job_number);
506
if (strcmp(user, lGetString(jep, JB_owner)) && !manop_is_operator(user)) {
507
ERROR((SGE_EVENT, MSG_JOB_NOMODJOBPERMS_SU, user, sge_u32c(job_id)));
508
answer_list_add(answer, SGE_EVENT, STATUS_ENOTOWNER, ANSWER_QUALITY_ERROR);
514
/* unenrolled tasks always are not-running pending/hold */
516
WARNING((SGE_EVENT, MSG_QMODJOB_NOTENROLLED_UU, sge_u32c(job_id), sge_u32c(task_id)));
518
WARNING((SGE_EVENT, MSG_QMODJOB_NOTENROLLED_U, sge_u32c(job_id)));
519
answer_list_add(answer, SGE_EVENT, STATUS_ESEMANTIC, ANSWER_QUALITY_WARNING);
524
task_id = lGetUlong(jatep, JAT_task_number);
526
if (lGetString(jatep, JAT_master_queue)) {
527
queueep = cqueue_list_locate_qinstance(
528
*(object_type_get_master_list(SGE_TYPE_CQUEUE)),
529
lGetString(jatep, JAT_master_queue));
535
case QI_DO_RESCHEDULE:
536
qmod_job_reschedule(ctx, jep, jatep, queueep, force, answer, user, host, monitor);
540
qmod_job_suspend(ctx, jep, jatep, queueep, force, answer, user, host, monitor);
544
qmod_job_unsuspend(ctx, jep, jatep, queueep, force, answer, user, host, monitor);
547
case QI_DO_CLEARERROR:
548
if (VALID(JERROR, lGetUlong(jatep, JAT_state))) {
549
lSetUlong(jatep, JAT_state, lGetUlong(jatep, JAT_state) & ~JERROR);
550
ja_task_message_trash_all_of_type_X(jatep, 1);
551
/* lWriteElemTo(jatep, stderr); */
553
answer, 0, sgeE_JATASK_MOD,
554
job_id, task_id, NULL, NULL, NULL,
555
jep, jatep, NULL, true, true);
556
if (job_is_array(jep)) {
557
INFO((SGE_EVENT, MSG_JOB_CLEARERRORTASK_SSUU, user, host, sge_u32c(job_id), sge_u32c(task_id)));
559
INFO((SGE_EVENT, MSG_JOB_CLEARERRORJOB_SSU, user, host, sge_u32c(job_id)));
562
if (job_is_array(jep)) {
563
INFO((SGE_EVENT, MSG_JOB_NOERRORSTATETASK_UU, sge_u32c(job_id), sge_u32c(task_id)));
565
INFO((SGE_EVENT, MSG_JOB_NOERRORSTATEJOB_UU, sge_u32c(job_id)));
568
answer_list_add(answer, SGE_EVENT, STATUS_OK, ANSWER_QUALITY_ERROR);
572
INFO((SGE_EVENT, MSG_LOG_JOBUNKNOWNQMODCMD_U, sge_u32c(action)));
573
answer_list_add(answer, SGE_EVENT, STATUS_OK, ANSWER_QUALITY_ERROR);
582
**** qmod_queue_weakclean (static)
584
/*
 * qmod_queue_weakclean - reschedule the jobs of a queue instance.
 *
 * The caller has to be operator or owner of the queue (isoperator /
 * isowner); otherwise MSG_QUEUE_NORESCHEDULEQPERMS_SS is logged and added
 * to `answer` with STATUS_ESEMANTIC. The actual work is delegated to
 * reschedule_jobs(ctx, qep, force, answer, monitor).
 *
 * NOTE(review): the return statements and most of the parameter list are
 * not present in this copy; see the prototype earlier in the file for the
 * full signature.
 */
static int qmod_queue_weakclean(
585
sge_gdi_ctx_class_t *ctx,
593
monitoring_t *monitor
595
DENTER(TOP_LAYER, "qmod_queue_weakclean");
597
if (!isoperator && !isowner) {
598
ERROR((SGE_EVENT, MSG_QUEUE_NORESCHEDULEQPERMS_SS, user,
599
lGetString(qep, QU_full_name)));
600
answer_list_add(answer, SGE_EVENT, STATUS_ESEMANTIC, ANSWER_QUALITY_ERROR);
605
reschedule_jobs(ctx, qep, force, answer, monitor);
612
**** qmod_queue_clean (static)
614
**** cleans the specified queue (every job will be deleted)
615
**** The user will do this via qconf -cq <qname>
617
/*
 * qmod_queue_clean - remove every job task that holds a slot in the queue.
 *
 * Only a manager (manop_is_manager(user)) may do this; anyone else gets
 * MSG_QUEUE_NOCLEANQPERMS with STATUS_ESEMANTIC.
 *
 * All jobs of the master job list and all of their array tasks are
 * visited. A task whose JAT_granted_destin_identifier_list contains an
 * entry with JG_qname equal to the queue's QU_full_name is committed via
 * sge_commit_job() with COMMIT_ST_FINISHED_FAILED_EE and
 * COMMIT_DEFAULT | COMMIT_NEVER_RAN. The successor of the current job/task
 * is fetched before the commit because the commit may modify the lists.
 *
 * On completion MSG_QUEUE_PURGEQ_SSS is logged and added to `answer` with
 * STATUS_OK / ANSWER_QUALITY_INFO.
 *
 * NOTE(review): return statements and most of the parameter list are not
 * present in this copy; see the prototype earlier in the file.
 */
static int qmod_queue_clean(
618
sge_gdi_ctx_class_t *ctx,
626
monitoring_t *monitor
628
lListElem *nextjep, *jep;
629
const char *qname = NULL;
630
DENTER(TOP_LAYER, "qmod_queue_clean");
632
qname = lGetString(qep, QU_full_name);
634
DPRINTF(("cleaning queue >%s<\n", qname ));
636
if (!manop_is_manager(user)) {
637
ERROR((SGE_EVENT, MSG_QUEUE_NOCLEANQPERMS));
638
answer_list_add(answer, SGE_EVENT, STATUS_ESEMANTIC, ANSWER_QUALITY_ERROR);
643
/* using sge_commit_job(j, COMMIT_ST_FINISHED_FAILED) q->job_list
644
could get modified so we have to be careful when iterating through the job list */
645
nextjep = lFirst(*(object_type_get_master_list(SGE_TYPE_JOB)));
646
while ((jep=nextjep)) {
647
lListElem *jatep, *nexttep;
648
nextjep = lNext(jep);
650
nexttep = lFirst(lGetList(jep, JB_ja_tasks));
651
while ((jatep=nexttep)) {
652
nexttep = lNext(jatep);
654
if (lGetSubStr(jatep, JG_qname, qname, JAT_granted_destin_identifier_list) != NULL) {
655
/* 3: JOB_FINISH reports aborted */
656
sge_commit_job(ctx, jep, jatep, NULL, COMMIT_ST_FINISHED_FAILED_EE, COMMIT_DEFAULT | COMMIT_NEVER_RAN, monitor);
660
INFO((SGE_EVENT, MSG_QUEUE_PURGEQ_SSS, user, host, qname ));
661
answer_list_add(answer, SGE_EVENT, STATUS_OK, ANSWER_QUALITY_INFO);
667
**** qmod_job_reschedule (static)
669
/*
 * qmod_job_reschedule - reschedule a single job / array task.
 *
 * The visible body only forwards to
 * reschedule_job(ctx, jep, jatep, queueep, force, answer, monitor);
 * `user` and `host` from the prototype are not used in the visible code.
 *
 * NOTE(review): most of the parameter list is not present in this copy;
 * see the prototype earlier in the file.
 */
static void qmod_job_reschedule(
670
sge_gdi_ctx_class_t *ctx,
678
monitoring_t *monitor
680
DENTER(TOP_LAYER, "qmod_job_reschedule");
682
reschedule_job(ctx, jep, jatep, queueep, force, answer, monitor);
687
**** qmod_job_suspend (static)
689
/*
 * qmod_job_suspend - suspend a job / array task on behalf of `user`.
 *
 * NOTE(review): braces, else-branches and some declarations are missing in
 * this copy; the description covers the visible statements only.
 *
 * migrate_on_suspend is set when JB_checkpoint_attr contains
 * CHECKPOINT_SUSPEND; in that case JAT_stop_initiate_time is set to `now`
 * whenever the task is put into the suspended state.
 *
 * Task already has JSUSPENDED set:
 *  - with `force` and a known queue the SGE_SIGSTOP signal is sent again
 *    via sge_signal_queue(); failure and success are both reported as
 *    warnings (MSG_JOB_NOFORCESUSPEND... / MSG_JOB_FORCESUSPEND...);
 *  - otherwise MSG_JOB_ALREADYSUSPENDED_... is reported;
 *  - JRUNNING is cleared and JSUSPENDED set in JAT_state.
 *
 * Task not yet suspended:
 *  - SGE_SIGSTOP is sent via sge_signal_queue(); a non-zero result is
 *    reported as MSG_JOB_NOSUSPEND... warning;
 *  - the state is switched to suspended (JRUNNING cleared, JSUSPENDED
 *    set) and an info answer (MSG_JOB_FORCESUSPEND... or
 *    MSG_JOB_SUSPEND...) is added.
 *
 * Messages come in a task variant for array jobs (job_is_array) and a job
 * variant otherwise. Finally a JL_SUSPENDED job log record is created via
 * reporting_create_job_log().
 */
static void qmod_job_suspend(
690
sge_gdi_ctx_class_t *ctx,
698
monitoring_t *monitor
702
u_long32 jataskid = 0;
704
bool migrate_on_suspend = false;
707
DENTER(TOP_LAYER, "qmod_job_suspend");
711
jobid = lGetUlong(jep, JB_job_number);
712
jataskid = lGetUlong(jatep, JAT_task_number);
714
/* determine whether we actually migrate upon suspend */
715
if (lGetUlong(jep, JB_checkpoint_attr) & CHECKPOINT_SUSPEND)
716
migrate_on_suspend = true;
718
if (VALID(JSUSPENDED, lGetUlong(jatep, JAT_state))) {
719
/* this job is already suspended or lives in a suspended queue */
720
if (force && queueep) {
721
/* here force means to send the suspend signal again
722
this can only be done if we know the queue this job
724
if (sge_signal_queue(ctx, SGE_SIGSTOP, queueep, jep, jatep, monitor)) {
725
if (job_is_array(jep)) {
726
WARNING((SGE_EVENT, MSG_JOB_NOFORCESUSPENDTASK_SUU, user, sge_u32c(jobid), sge_u32c(jataskid)));
728
WARNING((SGE_EVENT, MSG_JOB_NOFORCESUSPENDJOB_SU, user, sge_u32c(jobid)));
730
answer_list_add(answer, SGE_EVENT, STATUS_ESEMANTIC, ANSWER_QUALITY_WARNING);
733
if (job_is_array(jep)) {
734
WARNING((SGE_EVENT, MSG_JOB_FORCESUSPENDTASK_SUU, user, sge_u32c(jobid), sge_u32c(jataskid)));
736
WARNING((SGE_EVENT, MSG_JOB_FORCESUSPENDJOB_SU, user, sge_u32c(jobid)));
738
answer_list_add(answer, SGE_EVENT, STATUS_ESEMANTIC, ANSWER_QUALITY_WARNING);
742
if (job_is_array(jep)) {
743
WARNING((SGE_EVENT, MSG_JOB_ALREADYSUSPENDED_SUU, user, sge_u32c(jobid), sge_u32c(jataskid)));
745
WARNING((SGE_EVENT, MSG_JOB_ALREADYSUSPENDED_SU, user, sge_u32c(jobid)));
747
answer_list_add(answer, SGE_EVENT, STATUS_ESEMANTIC, ANSWER_QUALITY_WARNING);
751
** may be the queue is suspended, than the job might not be
753
state = lGetUlong(jatep, JAT_state);
754
CLEARBIT(JRUNNING, state);
755
SETBIT(JSUSPENDED, state);
756
lSetUlong(jatep, JAT_state, state);
757
if (migrate_on_suspend)
758
lSetUlong(jatep, JAT_stop_initiate_time, now);
761
answer, 0, sgeE_JATASK_MOD,
762
jobid, jataskid, NULL, NULL, NULL,
763
jep, jatep, NULL, true, true);
765
else { /* job wasn't suspended yet */
767
if ((i = sge_signal_queue(ctx, SGE_SIGSTOP, queueep, jep, jatep, monitor))) {
768
if (job_is_array(jep)) {
769
WARNING((SGE_EVENT, MSG_JOB_NOSUSPENDTASK_SUU, user, sge_u32c(jobid), sge_u32c(jataskid)));
771
WARNING((SGE_EVENT, MSG_JOB_NOSUSPENDJOB_SU, user, sge_u32c(jobid)));
773
answer_list_add(answer, SGE_EVENT, STATUS_ESEMANTIC, ANSWER_QUALITY_WARNING);
780
/* set jobs state to suspend in all cases */
782
if (job_is_array(jep)) {
783
INFO((SGE_EVENT, MSG_JOB_FORCESUSPENDTASK_SUU, user, sge_u32c(jobid), sge_u32c(jataskid)));
785
INFO((SGE_EVENT, MSG_JOB_FORCESUSPENDJOB_SU, user, sge_u32c(jobid)));
787
answer_list_add(answer, SGE_EVENT, STATUS_OK, ANSWER_QUALITY_INFO);
790
state = lGetUlong(jatep, JAT_state);
791
CLEARBIT(JRUNNING, state);
792
SETBIT(JSUSPENDED, state);
793
lSetUlong(jatep, JAT_state, state);
794
if (migrate_on_suspend)
795
lSetUlong(jatep, JAT_stop_initiate_time, now);
797
answer, 0, sgeE_JATASK_MOD,
798
jobid, jataskid, NULL, NULL, NULL,
799
jep, jatep, NULL, true, true);
803
if (job_is_array(jep)) {
804
INFO((SGE_EVENT, MSG_JOB_SUSPENDTASK_SUU, user, sge_u32c(jobid), sge_u32c(jataskid)));
806
INFO((SGE_EVENT, MSG_JOB_SUSPENDJOB_SU, user, sge_u32c(jobid)));
808
answer_list_add(answer, SGE_EVENT, STATUS_OK, ANSWER_QUALITY_INFO);
810
state = lGetUlong(jatep, JAT_state);
811
CLEARBIT(JRUNNING, state);
812
SETBIT(JSUSPENDED, state);
813
lSetUlong(jatep, JAT_state, state);
814
if (migrate_on_suspend)
815
lSetUlong(jatep, JAT_stop_initiate_time, now);
817
answer, 0, sgeE_JATASK_MOD,
818
jobid, jataskid, NULL, NULL, NULL,
819
jep, jatep, NULL, true, true);
822
reporting_create_job_log(NULL, now, JL_SUSPENDED, user, host, NULL, jep, jatep, NULL, NULL);
828
**** qmod_job_unsuspend (static)
830
/*
 * qmod_job_unsuspend - release the admin suspension of a job / array task.
 *
 * NOTE(review): braces, else-branches and some declarations are missing in
 * this copy; the description covers the visible statements only.
 *
 * Task is suspended on threshold (JSUSPENDED_ON_THRESHOLD):
 *  - if it is also admin suspended (JSUSPENDED), only the JSUSPENDED bit
 *    is cleared, MSG_JOB_RMADMSUSPEND... is reported and a JL_UNSUSPENDED
 *    job log record is written;
 *  - otherwise MSG_JOB_NOADMSUSPEND... is reported as a warning.
 *
 * Task is already running (JRUNNING):
 *  - with `force` and a known queue SGE_SIGCONT is sent again via
 *    sge_signal_queue(); failure and success are both reported as warnings;
 *  - otherwise MSG_JOB_ALREADYUNSUSPENDED_... is reported.
 *
 * Otherwise SGE_SIGCONT is sent via sge_signal_queue(); a non-zero result
 * is reported as MSG_JOB_NOUNSUSPEND... warning. Where the state is
 * changed, JRUNNING is set and JSUSPENDED cleared in JAT_state, and an
 * info message (MSG_JOB_FORCEUNSUSP... or MSG_JOB_UNSUSPEND...) is added.
 * Those success answers carry STATUS_OK together with ANSWER_QUALITY_ERROR.
 *
 * Messages come in a task variant for array jobs (job_is_array) and a job
 * variant otherwise. Finally a JL_UNSUSPENDED job log record is created.
 */
static void qmod_job_unsuspend(
831
sge_gdi_ctx_class_t *ctx,
839
monitoring_t *monitor
843
u_long32 jobid, jataskid;
846
DENTER(TOP_LAYER, "qmod_job_unsuspend");
850
jobid = lGetUlong(jep, JB_job_number);
851
jataskid = lGetUlong(jatep, JAT_task_number);
853
/* admin suspend may not override suspend from threshold */
854
if (VALID(JSUSPENDED_ON_THRESHOLD, lGetUlong(jatep, JAT_state))) {
855
if (VALID(JSUSPENDED, lGetUlong(jatep, JAT_state))) {
856
if (job_is_array(jep)) {
857
INFO((SGE_EVENT, MSG_JOB_RMADMSUSPENDTASK_SSUU, user, host, sge_u32c(jobid), sge_u32c(jataskid)));
859
INFO((SGE_EVENT, MSG_JOB_RMADMSUSPENDJOB_SSU, user, host, sge_u32c(jobid)));
861
answer_list_add(answer, SGE_EVENT, STATUS_OK, ANSWER_QUALITY_ERROR);
863
state = lGetUlong(jatep, JAT_state);
864
CLEARBIT(JSUSPENDED, state);
865
lSetUlong(jatep, JAT_state, state);
867
answer, 0, sgeE_JATASK_MOD,
868
jobid, jataskid, NULL, NULL, NULL,
869
jep, jatep, NULL, true, true);
870
reporting_create_job_log(NULL, now, JL_UNSUSPENDED, user, host, NULL, jep, jatep, NULL, NULL);
875
/* guess admin tries to remove threshold suspension by qmon -us <jobid> */
876
if (job_is_array(jep)) {
877
WARNING((SGE_EVENT, MSG_JOB_NOADMSUSPENDTASK_SUU, user, sge_u32c(jobid), sge_u32c(jataskid)));
879
WARNING((SGE_EVENT, MSG_JOB_NOADMSUSPENDJOB_SU, user, sge_u32c(jobid)));
881
answer_list_add(answer, SGE_EVENT, STATUS_ESEMANTIC, ANSWER_QUALITY_WARNING);
887
if (VALID(JRUNNING, lGetUlong(jatep, JAT_state))) {
888
/* this job is already running */
889
if (force && queueep) {
891
** here force means to send the cont signal again
892
** this can only be done if we know the queue this job
895
if (sge_signal_queue(ctx, SGE_SIGCONT, queueep, jep, jatep, monitor)) {
896
if (job_is_array(jep)) {
897
WARNING((SGE_EVENT, MSG_JOB_NOFORCEENABLETASK_SUU, user, sge_u32c(jobid), sge_u32c(jataskid)));
899
WARNING((SGE_EVENT, MSG_JOB_NOFORCEENABLEJOB_SU, user, sge_u32c(jobid)));
901
answer_list_add(answer, SGE_EVENT, STATUS_ESEMANTIC, ANSWER_QUALITY_WARNING);
904
if (job_is_array(jep)) {
905
WARNING((SGE_EVENT, MSG_JOB_FORCEENABLETASK_SUU, user, sge_u32c(jobid), sge_u32c(jataskid)));
907
WARNING((SGE_EVENT, MSG_JOB_FORCEENABLEJOB_SU, user, sge_u32c(jobid)));
909
answer_list_add(answer, SGE_EVENT, STATUS_ESEMANTIC, ANSWER_QUALITY_WARNING);
913
if (job_is_array(jep)) {
914
WARNING((SGE_EVENT, MSG_JOB_ALREADYUNSUSPENDED_SUU, user, sge_u32c(jobid), sge_u32c(jataskid)));
916
WARNING((SGE_EVENT, MSG_JOB_ALREADYUNSUSPENDED_SU, user, sge_u32c(jobid)));
918
answer_list_add(answer, SGE_EVENT, STATUS_ESEMANTIC, ANSWER_QUALITY_WARNING);
921
** job is already running, so no job information has to be changed
924
else { /* job wasn't suspended till now */
926
if ((i = sge_signal_queue(ctx, SGE_SIGCONT, queueep, jep, jatep, monitor))) {
927
if (job_is_array(jep)) {
928
WARNING((SGE_EVENT, MSG_JOB_NOUNSUSPENDTASK_SUU, user, sge_u32c(jobid), sge_u32c(jataskid)));
930
WARNING((SGE_EVENT, MSG_JOB_NOUNSUSPENDJOB_SU, user, sge_u32c(jobid)));
932
answer_list_add(answer, SGE_EVENT, STATUS_ESEMANTIC, ANSWER_QUALITY_WARNING);
939
/* mark the job as running again (clear the suspend bit) in all cases */
941
if (job_is_array(jep)) {
942
INFO((SGE_EVENT, MSG_JOB_FORCEUNSUSPTASK_SSUU, user, host, sge_u32c(jobid), sge_u32c(jataskid)));
944
INFO((SGE_EVENT, MSG_JOB_FORCEUNSUSPJOB_SSU, user, host, sge_u32c(jobid)));
946
answer_list_add(answer, SGE_EVENT, STATUS_OK, ANSWER_QUALITY_ERROR);
949
state = lGetUlong(jatep, JAT_state);
950
SETBIT(JRUNNING, state);
951
CLEARBIT(JSUSPENDED, state);
952
lSetUlong(jatep, JAT_state, state);
954
answer, 0, sgeE_JATASK_MOD,
955
jobid, jataskid, NULL, NULL, NULL,
956
jep, jatep, NULL, true, true);
959
/* set job state only if communication works */
961
if (job_is_array(jep)) {
962
INFO((SGE_EVENT, MSG_JOB_UNSUSPENDTASK_SUU, user, sge_u32c(jobid), sge_u32c(jataskid)));
964
INFO((SGE_EVENT, MSG_JOB_UNSUSPENDJOB_SU, user, sge_u32c(jobid)));
966
answer_list_add(answer, SGE_EVENT, STATUS_OK, ANSWER_QUALITY_ERROR);
968
state = lGetUlong(jatep, JAT_state);
969
SETBIT(JRUNNING, state);
970
CLEARBIT(JSUSPENDED, state);
971
lSetUlong(jatep, JAT_state, state);
973
answer, 0, sgeE_JATASK_MOD,
974
jobid, jataskid, NULL, NULL, NULL,
975
jep, jatep, NULL, true, true);
979
reporting_create_job_log(NULL, now, JL_UNSUSPENDED, user, host, NULL, jep, jatep, NULL, NULL);
984
/*
 * rebuild_signal_events - recreate the timer events for pending signals.
 *
 * Walks all array tasks of all jobs in the master job list and all queue
 * instances of all cluster queues in the master cqueue list. For each
 * object that has a pending signal (JAT_pending_signal /
 * QU_pending_signal != 0) and a delivery time greater than 0, a one-time
 * TYPE_SIGNAL_RESEND_EVENT is created with te_new_event():
 *  - tasks are keyed by job number and task number,
 *  - queue instances are keyed by their QU_full_name.
 *
 * NOTE(review): what happens with the created event afterwards is not
 * visible in this copy (lines are missing after each te_new_event call).
 */
void rebuild_signal_events()
986
lListElem *cqueue, *jep, *jatep;
988
DENTER(TOP_LAYER, "rebuild_signal_events");
991
for_each(jep, *(object_type_get_master_list(SGE_TYPE_JOB)))
993
for_each (jatep, lGetList(jep, JB_ja_tasks))
995
time_t when = (time_t)lGetUlong(jatep, JAT_pending_signal_delivery_time);
997
if (lGetUlong(jatep, JAT_pending_signal) && (when > 0))
999
u_long32 key1 = lGetUlong(jep, JB_job_number);
1000
u_long32 key2 = lGetUlong(jatep, JAT_task_number);
1001
te_event_t ev = NULL;
1003
ev = te_new_event(when, TYPE_SIGNAL_RESEND_EVENT, ONE_TIME_EVENT, key1, key2, NULL);
1011
for_each(cqueue, *(object_type_get_master_list(SGE_TYPE_CQUEUE)))
1013
lList *qinstance_list = lGetList(cqueue, CQ_qinstances);
1014
lListElem *qinstance;
1016
for_each(qinstance, qinstance_list)
1018
time_t when = (time_t)lGetUlong(qinstance, QU_pending_signal_delivery_time);
1020
if (lGetUlong(qinstance, QU_pending_signal) && (when > 0))
1022
const char* str_key = lGetString(qinstance, QU_full_name);
1023
te_event_t ev = NULL;
1025
ev = te_new_event(when, TYPE_SIGNAL_RESEND_EVENT, ONE_TIME_EVENT, 0, 0, str_key);
1034
} /* rebuild_signal_events() */
1036
/* this function is called by our timer mechanism for resending signals */
1037
/*
 * resend_signal_event - timer callback that delivers a pending signal again.
 *
 * anEvent carries either a job number / task number pair (numeric keys) or
 * a queue instance name (alphanumeric key). The global lock is taken in
 * write mode for the duration of the work.
 *
 *  - No alphanumeric key: the job and task are looked up; if either is
 *    missing, MSG_EVE_RESENTSIGNALTASK_UU is logged and the lock released.
 *    Otherwise, if the task's master queue instance can be located, the
 *    task's JAT_pending_signal is sent via sge_signal_queue().
 *  - Alphanumeric key: the queue instance is looked up; if it is missing,
 *    MSG_EVE_RESENTSIGNALQ_S is logged, the lock released and the key
 *    string freed. Otherwise the instance's QU_pending_signal is sent via
 *    sge_signal_queue() with no job/task.
 *
 * The key string returned by te_get_alphanumeric_key() is released with
 * sge_free() here.
 *
 * NOTE(review): braces, else and return statements are missing in this
 * copy, so the exact exit paths cannot be confirmed from here.
 */
void resend_signal_event(sge_gdi_ctx_class_t *ctx, te_event_t anEvent, monitoring_t *monitor)
1039
lListElem *qep, *jep, *jatep;
1040
u_long32 jobid = te_get_first_numeric_key(anEvent);
1041
u_long32 jataskid = te_get_second_numeric_key(anEvent);
1042
const char* queue = te_get_alphanumeric_key(anEvent);
1044
DENTER(TOP_LAYER, "resend_signal_event");
1046
MONITOR_WAIT_TIME(SGE_LOCK(LOCK_GLOBAL, LOCK_WRITE), monitor);
1048
if (queue == NULL) {
1049
if (!(jep = job_list_locate(*(object_type_get_master_list(SGE_TYPE_JOB)), jobid)) || !(jatep=job_search_task(jep, NULL, jataskid)))
1051
ERROR((SGE_EVENT, MSG_EVE_RESENTSIGNALTASK_UU, sge_u32c(jobid), sge_u32c(jataskid)));
1052
SGE_UNLOCK(LOCK_GLOBAL, LOCK_WRITE);
1057
if ((qep = cqueue_list_locate_qinstance(*(object_type_get_master_list(SGE_TYPE_CQUEUE)), lGetString(jatep, JAT_master_queue)))) {
1058
sge_signal_queue(ctx, lGetUlong(jatep, JAT_pending_signal), qep, jep, jatep, monitor);
1061
if (!(qep = cqueue_list_locate_qinstance(*(object_type_get_master_list(SGE_TYPE_CQUEUE)), queue))) {
1062
ERROR((SGE_EVENT, MSG_EVE_RESENTSIGNALQ_S, queue));
1063
SGE_UNLOCK(LOCK_GLOBAL, LOCK_WRITE);
1064
sge_free((char *)queue);
1069
sge_signal_queue(ctx, lGetUlong(qep, QU_pending_signal), qep, NULL, NULL, monitor);
1072
sge_free((char *)queue);
1074
SGE_UNLOCK(LOCK_GLOBAL, LOCK_WRITE);
1080
/*
 * sge_propagate_queue_suspension - mirror a queue suspend/unsuspend in the
 * state of the tasks that hold a slot in that queue.
 *
 * qnm  full name of the queue instance
 * how  signal being applied to the queue; SGE_SIGSTOP sets
 *      JSUSPENDED_ON_SUBORDINATE, the other visible branch clears it
 *
 * Every array task of every job in the master job list is checked; a task
 * is affected when its JAT_granted_destin_identifier_list contains an
 * entry whose JG_qname equals qnm. The updated bit mask is written back to
 * JAT_state.
 *
 * NOTE(review): the `else` between the two assignments and anything that
 * follows the lSetUlong() call are not present in this copy.
 */
static void sge_propagate_queue_suspension(const char *qnm, int how)
1082
lListElem *jep, *jatep;
1084
DENTER(TOP_LAYER, "sge_propagate_queue_suspension");
1086
DPRINTF(("searching for all jobs in queue %s due to %s\n", qnm, sge_sig2str(how)));
1087
for_each (jep, *object_type_get_master_list(SGE_TYPE_JOB)) {
1088
for_each (jatep, lGetList(jep, JB_ja_tasks)) {
1089
if (lGetElemStr(lGetList(jatep, JAT_granted_destin_identifier_list), JG_qname, qnm)) {
1091
DPRINTF(("found "sge_u32"."sge_u32"\n", lGetUlong(jep, JB_job_number), lGetUlong(jatep, JAT_task_number)));
1092
jstate = lGetUlong(jatep, JAT_state);
1093
if (how == SGE_SIGSTOP)
1094
jstate |= JSUSPENDED_ON_SUBORDINATE;
1096
jstate &= ~JSUSPENDED_ON_SUBORDINATE;
1097
lSetUlong(jatep, JAT_state, jstate);
1105
/************************************************************************
1106
This is called by the qmaster to:
1107
- send a signal to all jobs in a queue (job_number == 0);
1108
- send a signal to one job
1109
************************************************************************/
1110
/*
 * sge_signal_queue - send signal `how` to a queue instance or to one job.
 *
 * NOTE(review): parts of the parameter list, braces and several statements
 * are missing in this copy. Callers in this file pass
 * (ctx, how, qep, jep, jatep, monitor); jep/jatep are NULL when the whole
 * queue is signalled.
 *
 * Visible steps:
 *  1. For a queue level SGE_SIGSTOP / SGE_SIGCONT (no jep) the suspension
 *     is propagated to the tasks in the queue via
 *     sge_propagate_queue_suspension().
 *  2. Unless the queue instance is in unknown state, a pack buffer is
 *     filled (job number and task number for a job, the queue's
 *     QU_full_name) and sent to the execd on the queue's host with tag
 *     TAG_SIGJOB or TAG_SIGQUEUE. For jobs the ids are only packed when
 *     the task has no master queue or is_pe_master_task_send() is true
 *     (see the IZ 1619 note in the body). With simulated execds a
 *     SGE_SIGKILL for a job triggers trigger_job_resend() instead.
 *     A failed pack buffer initialisation is treated as CL_RETVAL_MALLOC.
 *  3. A result other than CL_RETVAL_OK is logged with
 *     MSG_COM_NOUPDATEQSTATE_IS.
 *  4. The resend time is now + next_delivery_time (initialised to 60). Any
 *     existing TYPE_SIGNAL_RESEND_EVENT for the job/task or queue is
 *     deleted; unless execds are simulated, the pending signal and its
 *     delivery time are stored in the task (JAT_pending_signal...) or
 *     queue instance (QU_pending_signal...) and a new one-time resend
 *     event is created.
 *  5. For a queue level signal, signal_slave_jobs_in_queue() is called.
 *     For a job level signal, signal_slave_tasks_of_job() is called when
 *     qep is the first queue of the task's granted destination list.
 */
int sge_signal_queue(
1111
sge_gdi_ctx_class_t *ctx,
1112
int how, /* signal */
1116
monitoring_t *monitor
1119
u_long32 next_delivery_time = 60;
1124
DENTER(TOP_LAYER, "sge_signal_queue");
1126
now = sge_get_gmt();
1128
DEBUG((SGE_EVENT, "queue_signal: %d, queue: %s, job: %d, jatask: %d", how,
1129
(qep?lGetString(qep, QU_full_name):"none"),
1130
(int)(jep?lGetUlong(jep,JB_job_number):-1),
1131
(int)(jatep?lGetUlong(jatep,JAT_task_number):-1)
1134
if (!jep && (how == SGE_SIGSTOP || how == SGE_SIGCONT))
1135
sge_propagate_queue_suspension(lGetString(qep, QU_full_name), how);
1137
/* don't try to signal unheard queues */
1138
if (!qinstance_state_is_unknown(qep)) {
1139
const char *hnm, *pnm;
1141
pnm = prognames[EXECD];
1142
hnm = lGetHost(qep, QU_qhostname);
1144
if ((i = init_packbuffer(&pb, 256, 0)) == PACK_SUCCESS) {
1145
/* identifier for acknowledgement */
1148
* Due to IZ 1619: pack signal only if
1149
* job is a non-parallel job
1150
* or all slaves of the parallel job have been acknowledged
1152
if (!lGetString(jatep, JAT_master_queue) ||
1153
is_pe_master_task_send(jatep)) {
1155
packint(&pb, lGetUlong(jep, JB_job_number));
1156
packint(&pb, lGetUlong(jatep, JAT_task_number));
1164
packstr(&pb, lGetString(qep, QU_full_name));
1168
if (mconf_get_simulate_execds()) {
1170
if (jep && how == SGE_SIGKILL)
1171
trigger_job_resend(sge_get_gmt(), NULL, lGetUlong(jep, JB_job_number), lGetUlong(jatep, JAT_task_number), 1);
1173
if (pb_filled(&pb)) {
1175
i = gdi2_send_message_pb(ctx, 0, pnm, 1, hnm, jep ? TAG_SIGJOB: TAG_SIGQUEUE,
1180
MONITOR_MESSAGES_OUT(monitor);
1181
clear_packbuffer(&pb);
1183
i = CL_RETVAL_MALLOC; /* an error */
1186
if (i != CL_RETVAL_OK) {
1187
ERROR((SGE_EVENT, MSG_COM_NOUPDATEQSTATE_IS, how, lGetString(qep, QU_full_name)));
1193
next_delivery_time += now;
1195
/* If this is a operation on one job we enter the signal request in the
1196
job structure. If the operation is not acknowledged in time we can do
1199
te_event_t ev = NULL;
1201
DPRINTF(("JOB "sge_u32": %s signal %s (retry after "sge_u32" seconds) host: %s\n",
1202
lGetUlong(jep, JB_job_number), sent?"sent":"queued", sge_sig2str(how), next_delivery_time - now,
1203
lGetHost(qep, QU_qhostname)));
1204
te_delete_one_time_event(TYPE_SIGNAL_RESEND_EVENT, lGetUlong(jep, JB_job_number),
1205
lGetUlong(jatep, JAT_task_number), NULL);
1207
if (!mconf_get_simulate_execds()) {
1208
lSetUlong(jatep, JAT_pending_signal, how);
1209
ev = te_new_event((time_t)next_delivery_time, TYPE_SIGNAL_RESEND_EVENT, ONE_TIME_EVENT,
1210
lGetUlong(jep, JB_job_number), lGetUlong(jatep, JAT_task_number), NULL);
1213
lSetUlong(jatep, JAT_pending_signal_delivery_time, next_delivery_time);
1216
te_event_t ev = NULL;
1218
DPRINTF(("QUEUE %s: %s signal %s (retry after "sge_u32" seconds) host %s\n",
1219
lGetString(qep, QU_full_name), sent?"sent":"queued", sge_sig2str(how), next_delivery_time - now,
1220
lGetHost(qep, QU_qhostname)));
1221
te_delete_one_time_event(TYPE_SIGNAL_RESEND_EVENT, 0, 0, lGetString(qep, QU_full_name));
1223
if (!mconf_get_simulate_execds()) {
1224
lSetUlong(qep, QU_pending_signal, how);
1225
ev = te_new_event((time_t)next_delivery_time, TYPE_SIGNAL_RESEND_EVENT, ONE_TIME_EVENT, 0, 0,
1226
lGetString(qep, QU_full_name));
1229
lSetUlong(qep, QU_pending_signal_delivery_time, next_delivery_time);
1233
if (!jep) {/* signalling a queue ? - handle slave jobs in this queue */
1234
signal_slave_jobs_in_queue(ctx, how, qep, monitor);
1236
else {/* is this the master queue of this job to signal ? - then decide whether slave tasks also
1237
must get signalled */
1238
if (!strcmp(lGetString(lFirst(lGetList(jatep, JAT_granted_destin_identifier_list)),
1239
JG_qname), lGetString(qep, QU_full_name))) {
1240
signal_slave_tasks_of_job(ctx, how, jep, jatep, monitor);
1246
} /* sge_signal_queue() */
1248
/* in case we have to signal a queue
1249
in which slave tasks are running
1250
we have to notify the master execd
1251
where the master task of this job is running
1253
static void signal_slave_jobs_in_queue(
1254
sge_gdi_ctx_class_t *ctx,
1255
int how, /* signal */
1257
monitoring_t *monitor
1260
lListElem *mq, *jep, *jatep;
1261
const char *qname, *mqname, *pe_name;
1263
DENTER(TOP_LAYER, "signal_slave_jobs_in_queue");
1265
qname = lGetString(qep, QU_full_name);
1266
/* test whether there are parallel jobs
1267
with a slave slot in this queue
1268
if so then signal this job */
1269
for_each (jep, *(object_type_get_master_list(SGE_TYPE_JOB))) {
1270
for_each (jatep, lGetList(jep, JB_ja_tasks)) {
1272
/* skip sequential and not running jobs */
1273
if (lGetNumberOfElem( gdil_lp =
1274
lGetList(jatep, JAT_granted_destin_identifier_list))<=1)
1277
/* signalling of not "slave controlled" parallel jobs will not work
1278
since they are not known to the apropriate execd - we should
1279
omit signalling in this case to prevent waste of communication bandwith */
1280
if (!(pe_name=lGetString(jatep, JAT_granted_pe)) ||
1281
!pe_list_locate(*object_type_get_master_list(SGE_TYPE_PE), pe_name))
1284
if (lGetElemStr(gdil_lp, JG_qname, qname) != NULL) {
1286
/* search master queue - needed for signalling of a job */
1287
if ((mq = cqueue_list_locate_qinstance(*(object_type_get_master_list(SGE_TYPE_CQUEUE)), mqname = lGetString(
1288
lFirst(gdil_lp), JG_qname)))) {
1289
DPRINTF(("found slave job "sge_u32" in queue %s master queue is %s\n",
1290
lGetUlong(jep, JB_job_number), qname, mqname));
1291
sge_signal_queue(ctx, how, mq, jep, jatep, monitor);
1293
ERROR((SGE_EVENT, MSG_JOB_UNABLE2FINDMQ_SU, mqname, sge_u32c(lGetUlong(jep, JB_job_number))));
1302
static void signal_slave_tasks_of_job(sge_gdi_ctx_class_t *ctx, int how, lListElem *jep, lListElem *jatep,
1303
monitoring_t *monitor)
1306
lListElem *mq, *pe, *gdil_ep;
1307
const char *qname, *pe_name;
1309
DENTER(TOP_LAYER, "signal_slave_tasks_of_job");
1311
/* do not signal slave tasks in case of checkpointing jobs with
1312
STOP/CONT when suspending means migration */
1313
if ((how==SGE_SIGCONT || how==SGE_SIGSTOP) &&
1314
(lGetUlong(jep, JB_checkpoint_attr)|CHECKPOINT_SUSPEND)!=0) {
1315
DPRINTF(("omit signaling - checkpoint script does action for whole job\n"));
1319
/* forward signal to slave exec hosts
1320
in case of slave controlled jobs */
1321
if ( !((lGetNumberOfElem(gdil_lp=lGetList(jatep, JAT_granted_destin_identifier_list)))<=1 ||
1322
!(pe_name=lGetString(jatep, JAT_granted_pe)) ||
1323
!(pe=pe_list_locate(*object_type_get_master_list(SGE_TYPE_PE), pe_name)) ||
1324
!lGetBool(pe, PE_control_slaves)))
1325
for (gdil_ep=lNext(lFirst(gdil_lp)); gdil_ep; gdil_ep=lNext(gdil_ep))
1326
if ((mq = cqueue_list_locate_qinstance(*(object_type_get_master_list(SGE_TYPE_CQUEUE)), qname = lGetString(gdil_ep, JG_qname)))) {
1327
DPRINTF(("found slave job "sge_u32" in queue %s\n",
1328
lGetUlong(jep, JB_job_number), qname));
1329
sge_signal_queue(ctx, how, mq, jep, jatep, monitor);