~ubuntu-branches/ubuntu/utopic/gridengine/utopic

« back to all changes in this revision

Viewing changes to source/daemons/qmaster/sge_qmod_qmaster.c

  • Committer: Bazaar Package Importer
  • Author(s): Mark Hymers
  • Date: 2008-06-25 22:36:13 UTC
  • Revision ID: james.westby@ubuntu.com-20080625223613-tvd9xlhuoct9kyhm
Tags: upstream-6.2~beta2
ImportĀ upstreamĀ versionĀ 6.2~beta2

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*___INFO__MARK_BEGIN__*/
 
2
/*************************************************************************
 
3
 * 
 
4
 *  The Contents of this file are made available subject to the terms of
 
5
 *  the Sun Industry Standards Source License Version 1.2
 
6
 * 
 
7
 *  Sun Microsystems Inc., March, 2001
 
8
 * 
 
9
 * 
 
10
 *  Sun Industry Standards Source License Version 1.2
 
11
 *  =================================================
 
12
 *  The contents of this file are subject to the Sun Industry Standards
 
13
 *  Source License Version 1.2 (the "License"); You may not use this file
 
14
 *  except in compliance with the License. You may obtain a copy of the
 
15
 *  License at http://gridengine.sunsource.net/Gridengine_SISSL_license.html
 
16
 * 
 
17
 *  Software provided under this License is provided on an "AS IS" basis,
 
18
 *  WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING,
 
19
 *  WITHOUT LIMITATION, WARRANTIES THAT THE SOFTWARE IS FREE OF DEFECTS,
 
20
 *  MERCHANTABLE, FIT FOR A PARTICULAR PURPOSE, OR NON-INFRINGING.
 
21
 *  See the License for the specific provisions governing your rights and
 
22
 *  obligations concerning the Software.
 
23
 * 
 
24
 *   The Initial Developer of the Original Code is: Sun Microsystems, Inc.
 
25
 * 
 
26
 *   Copyright: 2001 by Sun Microsystems, Inc.
 
27
 * 
 
28
 *   All Rights Reserved.
 
29
 * 
 
30
 ************************************************************************/
 
31
/*___INFO__MARK_END__*/
 
32
#include <stdlib.h>
 
33
#include <string.h>
 
34
#include <limits.h>
 
35
#include <fnmatch.h>
 
36
 
 
37
#include "sge.h"
 
38
#include "symbols.h"
 
39
#include "sge_ja_task.h"
 
40
#include "sge_str.h"
 
41
#include "sge_idL.h"
 
42
#include "sge_pe.h"
 
43
#include "sge_signal.h"
 
44
#include "sge_prog.h"
 
45
#include "sge_queue_event_master.h"
 
46
#include "sge_qmod_qmaster.h"
 
47
#include "sge_job_qmaster.h"
 
48
#include "sge_give_jobs.h"
 
49
#include "sge_host.h"
 
50
#include "sge_parse_num_par.h"
 
51
#include "sge_pe_qmaster.h"
 
52
#include "sge_string.h"
 
53
#include "commlib.h"
 
54
#include "sgermon.h"
 
55
#include "sge_log.h"
 
56
#include "sge_time.h"
 
57
#include "reschedule.h"
 
58
#include "sge_security.h"
 
59
#include "sge_job.h"
 
60
#include "sge_answer.h"
 
61
#include "sge_conf.h"
 
62
#include "sge_hostname.h"
 
63
#include "sge_manop.h"
 
64
#include "sge_qinstance.h"
 
65
#include "sge_qinstance_state.h"
 
66
#include "sge_qinstance_qmaster.h"
 
67
#include "sge_cqueue_qmaster.h"
 
68
#include "sge_range.h"
 
69
#include "sge_centry.h"
 
70
#include "sge_calendar.h"
 
71
#include "sge_cqueue.h"
 
72
#include "sge_qref.h"
 
73
#include "sge_lock.h"
 
74
 
 
75
#include "sge_persistence_qmaster.h"
 
76
#include "sge_reporting_qmaster.h"
 
77
#include "spool/sge_spooling.h"
 
78
 
 
79
#include "msg_common.h"
 
80
#include "msg_qmaster.h"
 
81
 
 
82
 
 
83
/*-------------------------------------------------------------------------*/
 
84
static void signal_slave_jobs_in_queue(sge_gdi_ctx_class_t *ctx, int how, lListElem *jep, monitoring_t *monitor);
 
85
 
 
86
static void signal_slave_tasks_of_job(sge_gdi_ctx_class_t *ctx, int how, lListElem *jep, lListElem *jatep, 
 
87
                                      monitoring_t *monitor);
 
88
 
 
89
static int sge_change_queue_state(sge_gdi_ctx_class_t *ctx,
 
90
                                  char *user, char *host, lListElem *qep, 
 
91
                                  u_long32 action, u_long32 force, lList **answer, 
 
92
                                  monitoring_t *monitor);
 
93
 
 
94
static int sge_change_job_state(sge_gdi_ctx_class_t *ctx,
 
95
                                char *user, char *host, lListElem *jep, lListElem *jatep, 
 
96
                                u_long32 task_id, u_long32 action, u_long32 force, 
 
97
                                lList **answer, monitoring_t *monitor);
 
98
 
 
99
static int qmod_queue_weakclean(sge_gdi_ctx_class_t *ctx, 
 
100
                                lListElem *qep, u_long32 force, lList **answer, 
 
101
                                char *user, char *host, int isoperator, int isowner,
 
102
                                monitoring_t *monitor);  
 
103
 
 
104
static int qmod_queue_clean(sge_gdi_ctx_class_t *ctx,
 
105
                            lListElem *qep, u_long32 force, lList **answer, 
 
106
                            char *user, char *host, int isoperator, int isowner,
 
107
                            monitoring_t *monitor);
 
108
 
 
109
static void qmod_job_suspend(sge_gdi_ctx_class_t *ctx,
 
110
                             lListElem *jep, lListElem *jatep, lListElem *queueep, 
 
111
                             u_long32 force, lList **answer, char *user, char *host,
 
112
                             monitoring_t *monitor);
 
113
 
 
114
static void qmod_job_unsuspend(sge_gdi_ctx_class_t *ctx,
 
115
                               lListElem *jep, lListElem *jatep, lListElem *queueep, 
 
116
                               u_long32 force, lList **answer, char *user, char *host,
 
117
                               monitoring_t *monitor);
 
118
 
 
119
static void qmod_job_reschedule(sge_gdi_ctx_class_t *ctx, 
 
120
                                lListElem *jep, lListElem *jatep, lListElem *queueep, 
 
121
                                u_long32 force, lList **answer, char *user, char *host,
 
122
                                monitoring_t *monitor);
 
123
 
 
124
/*-------------------------------------------------------------------------*/
 
125
 
 
126
void 
 
127
sge_gdi_qmod(sge_gdi_ctx_class_t *ctx, sge_gdi_packet_class_t *packet, sge_gdi_task_class_t *task,
 
128
             monitoring_t *monitor) 
 
129
{
 
130
   lList *alp = NULL;
 
131
   lListElem *dep;
 
132
   lListElem *jatask = NULL, *rn, *job, *tmp_task;
 
133
   bool found;
 
134
   u_long32 jobid;
 
135
   u_long32 start = 0, end = 0, step = 0;
 
136
   int alltasks;
 
137
   lList *master_hgroup_list = *(object_type_get_master_list(SGE_TYPE_HGROUP));
 
138
   lList *cqueue_list = *(object_type_get_master_list(SGE_TYPE_CQUEUE));
 
139
   dstring cqueue_buffer = DSTRING_INIT;
 
140
   dstring hostname_buffer = DSTRING_INIT;
 
141
   
 
142
   DENTER(TOP_LAYER, "sge_gdi_qmod");
 
143
 
 
144
 
 
145
   if (!packet->host || (strlen(packet->user) == 0) || !packet->commproc) {
 
146
      CRITICAL((SGE_EVENT, MSG_SGETEXT_NULLPTRPASSED_S, SGE_FUNC));
 
147
      answer_list_add(&(task->answer_list), SGE_EVENT, 
 
148
                      STATUS_EUNKNOWN, ANSWER_QUALITY_ERROR);
 
149
      sge_dstring_free(&cqueue_buffer);
 
150
      sge_dstring_free(&hostname_buffer);
 
151
      DEXIT;
 
152
      return;
 
153
   }
 
154
 
 
155
   /*
 
156
   ** loop over the ids and change queue or job state and signal them
 
157
   ** if necessary
 
158
   */
 
159
   for_each(dep, task->data_list) {
 
160
      lList *tmp_list = NULL;
 
161
      lList *qref_list = NULL;
 
162
      bool found_something = true;
 
163
      u_long32 id_action = lGetUlong(dep, ID_action);
 
164
 
 
165
      found = false;
 
166
      
 
167
      if ((id_action & JOB_DO_ACTION) == 0) {
 
168
         qref_list_add(&qref_list, NULL, lGetString(dep, ID_str));
 
169
         qref_list_resolve_hostname(qref_list);
 
170
         qref_list_resolve(qref_list, NULL, &tmp_list, 
 
171
                           &found_something, cqueue_list,
 
172
                           master_hgroup_list,
 
173
                           true, true);
 
174
         if (found_something) { 
 
175
            lListElem *qref = NULL;
 
176
 
 
177
            id_action = (id_action & (~QUEUE_DO_ACTION));
 
178
 
 
179
            for_each(qref, tmp_list) {
 
180
               const char *full_name = NULL;
 
181
               const char *cqueue_name = NULL;
 
182
               const char *hostname = NULL;
 
183
               bool has_hostname = false;
 
184
               bool has_domain = false;
 
185
               lListElem *cqueue = NULL;
 
186
               lListElem *qinstance = NULL;
 
187
               lList *qinstance_list = NULL;
 
188
 
 
189
               full_name = lGetString(qref, QR_name);
 
190
               cqueue_name_split(full_name, &cqueue_buffer, &hostname_buffer,
 
191
                                 &has_hostname, &has_domain);
 
192
               cqueue_name = sge_dstring_get_string(&cqueue_buffer);
 
193
               hostname = sge_dstring_get_string(&hostname_buffer);
 
194
               cqueue = lGetElemStr(cqueue_list, CQ_name, cqueue_name);
 
195
               qinstance_list = lGetList(cqueue, CQ_qinstances);
 
196
               qinstance = lGetElemHost(qinstance_list, QU_qhostname, hostname);
 
197
 
 
198
               sge_change_queue_state(ctx, packet->user, packet->host, qinstance,
 
199
                     id_action, lGetUlong(dep, ID_force),
 
200
                     &alp, monitor);
 
201
               found = true;
 
202
            }
 
203
      }
 
204
      lFreeList(&qref_list);
 
205
      lFreeList(&tmp_list);
 
206
      }
 
207
      if (!found) {
 
208
         bool is_jobName_suport = false; 
 
209
         u_long action = lGetUlong(dep, ID_action);
 
210
         if ((action & JOB_DO_ACTION) > 0 && 
 
211
             (action & QUEUE_DO_ACTION) == 0) {
 
212
            action = (action & (~JOB_DO_ACTION));
 
213
            is_jobName_suport = true;
 
214
         }
 
215
         
 
216
         /* 
 
217
         ** We found no queue so look for a job. This only makes sense for
 
218
         ** suspend, unsuspend and reschedule
 
219
         */
 
220
         if (sge_strisint(lGetString(dep, ID_str)) && 
 
221
               (action == QI_DO_SUSPEND || 
 
222
                action == QI_DO_RESCHEDULE ||
 
223
                action == QI_DO_CLEARERROR || 
 
224
                action == QI_DO_UNSUSPEND)) {
 
225
            jobid = strtol(lGetString(dep, ID_str), NULL, 10);
 
226
 
 
227
            rn = lFirst(lGetList(dep, ID_ja_structure));
 
228
            if (rn) {
 
229
               start = lGetUlong(rn, RN_min);
 
230
               if (start) {
 
231
                  end = lGetUlong(rn, RN_max);
 
232
                  step = lGetUlong(rn, RN_step);
 
233
                  if (!step)
 
234
                     step = 1;
 
235
                  alltasks = 0;
 
236
               } else {
 
237
                  start = 1;
 
238
                  end = (u_long32)LONG_MAX;
 
239
                  step = 1;
 
240
                  alltasks = 1;
 
241
               }
 
242
               if (start > end)
 
243
                  end = start;
 
244
 
 
245
            } else {
 
246
               alltasks = 1;
 
247
            }
 
248
 
 
249
            job = job_list_locate(*(object_type_get_master_list(SGE_TYPE_JOB)), jobid);
 
250
            if (job) {
 
251
               jatask = lFirst(lGetList(job, JB_ja_tasks));
 
252
 
 
253
               while ((tmp_task = jatask)) {
 
254
                  u_long32 task_number;
 
255
 
 
256
                  jatask = lNext(tmp_task);
 
257
                  task_number = lGetUlong(tmp_task, JAT_task_number);
 
258
                  if ((task_number >= start && task_number <= end &&
 
259
                     ((task_number-start)%step) == 0) || alltasks) {
 
260
                     DPRINTF(("Modify job: "sge_u32"."sge_u32"\n", jobid,
 
261
                        task_number));
 
262
 
 
263
                     /* this specifies no queue, so lets probe for a job */
 
264
                     /* change state of job: */
 
265
                     sge_change_job_state(ctx, packet->user, packet->host, job, tmp_task, 0,
 
266
                         action, lGetUlong(dep, ID_force), &alp, monitor);   
 
267
                     found = true;
 
268
                  }
 
269
               }
 
270
 
 
271
               /* create more precise GDI answers also for pending jobs/tasks and jobs/tasks in hold state 
 
272
                  When the operation is to be applied on the whole job array but no task is enrolled so far 
 
273
                  (i.e. not found) only one single GDI answer is created. Otherwise one message is created 
 
274
                  per task */
 
275
               if (alltasks && job_is_array(job)) {
 
276
                  if (!found) {
 
277
                     sge_change_job_state(ctx, packet->user, packet->host, job, NULL, 0,
 
278
                         action, lGetUlong(dep, ID_force), &alp, monitor);   
 
279
                     found = true;
 
280
                  }
 
281
               } else {
 
282
                  lListElem *range;
 
283
                  u_long32 min, max, step;
 
284
                  u_long32 taskid;
 
285
 
 
286
                  /* handle all pending tasks */
 
287
                  for_each (range, lGetList(job, JB_ja_n_h_ids)) {
 
288
                     range_get_all_ids(range, &min, &max, &step);
 
289
                     for (taskid=min; taskid<=max; taskid+= step) {
 
290
                        if ((taskid >= start && taskid <= end &&
 
291
                           ((taskid-start)%step) == 0) || alltasks) {
 
292
                           DPRINTF(("Modify job: "sge_u32"."sge_u32"\n", jobid,
 
293
                              taskid));
 
294
                           sge_change_job_state(ctx, packet->user, packet->host, job, NULL, taskid,
 
295
                               action, lGetUlong(dep, ID_force), &alp, monitor);   
 
296
                           found = true;
 
297
                        }
 
298
                     }
 
299
                  }
 
300
 
 
301
                  /* handle all tasks in user hold */
 
302
                  for_each (range, lGetList(job, JB_ja_u_h_ids)) {
 
303
                     range_get_all_ids(range, &min, &max, &step);
 
304
                     for (taskid=min; taskid<=max; taskid+= step) {
 
305
                        if ((taskid >= start && taskid <= end &&
 
306
                           ((taskid-start)%step) == 0) || alltasks) {
 
307
                           DPRINTF(("Modify job: "sge_u32"."sge_u32"\n", jobid,
 
308
                                    taskid));
 
309
                           sge_change_job_state(ctx, packet->user, packet->host, job, NULL, taskid,
 
310
                                                action, lGetUlong(dep, ID_force), &alp, monitor);   
 
311
                           found = true;
 
312
                        }
 
313
                     }
 
314
                  }
 
315
 
 
316
                  /* handle all tasks in system hold that are not in user hold */
 
317
                  for_each (range, lGetList(job, JB_ja_s_h_ids)) {
 
318
                     range_get_all_ids(range, &min, &max, &step);
 
319
                     for (taskid=min; taskid<=max; taskid+= step) {
 
320
                        if (range_list_is_id_within(lGetList(job, JB_ja_u_h_ids), taskid)) {
 
321
                           continue;
 
322
                        }
 
323
                        if ((taskid >= start && taskid <= end &&
 
324
                           ((taskid-start)%step) == 0) || alltasks) {
 
325
                           DPRINTF(("Modify job: "sge_u32"."sge_u32"\n", jobid,
 
326
                                    taskid));
 
327
                           sge_change_job_state(ctx, packet->user, packet->host, job, NULL, taskid,
 
328
                                                action, lGetUlong(dep, ID_force), &alp, monitor);   
 
329
                           found = true;
 
330
                        }
 
331
                     }
 
332
                  }
 
333
 
 
334
                  /* handle all tasks in operator hold that are not in user hold or system hold */
 
335
                  for_each (range, lGetList(job, JB_ja_o_h_ids)) {
 
336
                     range_get_all_ids(range, &min, &max, &step);
 
337
                     for (taskid=min; taskid<=max; taskid+= step) {
 
338
                        if (range_list_is_id_within(lGetList(job, JB_ja_u_h_ids), taskid) ||
 
339
                            range_list_is_id_within(lGetList(job, JB_ja_s_h_ids), taskid)) {
 
340
                           continue;
 
341
                        }
 
342
                        if ((taskid >= start && taskid <= end &&
 
343
                           ((taskid-start)%step) == 0) || alltasks) {
 
344
                           DPRINTF(("Modify job: "sge_u32"."sge_u32"\n", jobid,
 
345
                                    taskid));
 
346
                           sge_change_job_state(ctx, packet->user, packet->host, job, NULL, taskid,
 
347
                                                action, lGetUlong(dep, ID_force), &alp, monitor);   
 
348
                           found = true;
 
349
                        }
 
350
                     }
 
351
                  }
 
352
               }
 
353
            }
 
354
         }
 
355
         /* job name or pattern was submitted */
 
356
         else if (is_jobName_suport && (
 
357
                  action == QI_DO_SUSPEND || 
 
358
                  action == QI_DO_RESCHEDULE ||
 
359
                  action == QI_DO_CLEARERROR || 
 
360
                  action == QI_DO_UNSUSPEND)) {
 
361
                
 
362
            const char *job_name = lGetString(dep, ID_str);
 
363
            const lListElem *job;
 
364
            lListElem *mod = NULL;
 
365
            for_each(job, *(object_type_get_master_list(SGE_TYPE_JOB))) {
 
366
               if (!fnmatch(job_name, lGetString(job, JB_job_name), 0)) {
 
367
                  char job_id[40];
 
368
                  mod = lCopyElem(dep);
 
369
                  sprintf(job_id, sge_u32, lGetUlong(job, JB_job_number));
 
370
                  lSetString(mod, ID_str, job_id);
 
371
                  lAppendElem(task->data_list, mod);
 
372
                  found = true;
 
373
               }
 
374
            }
 
375
         }
 
376
         else {
 
377
            /* job id invalid or action invalid for jobs */
 
378
 
 
379
         }
 
380
      }
 
381
 
 
382
      if (!found) {
 
383
         u_long action = lGetUlong(dep, ID_action);
 
384
/*
 
385
         if ((action & JOB_DO_ACTION)) {
 
386
            action = action - JOB_DO_ACTION;
 
387
         }
 
388
*/         
 
389
         /*
 
390
         ** If the action is QI_DO_UNSUSPEND or QI_DO_SUSPEND, 
 
391
         ** 'invalid queue or job' will be printed,
 
392
         ** otherwise 'invalid queue' will be printed, because these actions
 
393
         ** are not suitable for jobs.
 
394
         */
 
395
         if ((action & QUEUE_DO_ACTION) == 0 && (
 
396
             (action & JOB_DO_ACTION) != 0 ||
 
397
             (action & QI_DO_SUSPEND) != 0 || 
 
398
             (action & QI_DO_UNSUSPEND) != 0|| 
 
399
             (action & QI_DO_CLEAN) != 0))
 
400
            ERROR((SGE_EVENT, MSG_QUEUE_INVALIDQORJOB_S, lGetString(dep, ID_str)));
 
401
         else
 
402
            ERROR((SGE_EVENT, MSG_QUEUE_INVALIDQ_S, lGetString(dep, ID_str)));  
 
403
         answer_list_add(&alp, SGE_EVENT, STATUS_EEXIST, ANSWER_QUALITY_ERROR);
 
404
      }
 
405
   }
 
406
 
 
407
   sge_dstring_free(&cqueue_buffer);
 
408
   sge_dstring_free(&hostname_buffer);
 
409
 
 
410
   task->answer_list = alp;
 
411
   
 
412
   DEXIT;
 
413
}
 
414
 
 
415
static int 
 
416
sge_change_queue_state(sge_gdi_ctx_class_t *ctx,
 
417
                       char *user, char *host, lListElem *qep, u_long32 action, 
 
418
                       u_long32 force, lList **answer, monitoring_t *monitor) 
 
419
{
 
420
   bool isoperator;
 
421
   bool isowner;
 
422
   int result = 0;
 
423
   const char *ehname = lGetHost(qep, QU_qhostname);
 
424
   
 
425
   DENTER(TOP_LAYER, "sge_change_queue_state");
 
426
 
 
427
   isowner = qinstance_check_owner(qep, user);
 
428
   isoperator = manop_is_operator(user);
 
429
 
 
430
   if (!isowner) {
 
431
      ERROR((SGE_EVENT, MSG_QUEUE_NOCHANGEQPERMS_SS, user, lGetString(qep, QU_full_name)));
 
432
      answer_list_add(answer, SGE_EVENT, STATUS_ESEMANTIC, ANSWER_QUALITY_ERROR);
 
433
      DEXIT;
 
434
      return -1;
 
435
   }
 
436
 
 
437
   switch (action) {
 
438
      case QI_DO_CLEARERROR:
 
439
      case QI_DO_ENABLE:
 
440
      case QI_DO_DISABLE:
 
441
      case QI_DO_SUSPEND:
 
442
      case QI_DO_UNSUSPEND:
 
443
#ifdef __SGE_QINSTANCE_STATE_DEBUG__
 
444
      case QI_DO_SETERROR:
 
445
      case QI_DO_SETORPHANED:
 
446
      case QI_DO_CLEARORPHANED:
 
447
      case QI_DO_SETUNKNOWN:
 
448
      case QI_DO_CLEARUNKNOWN:
 
449
      case QI_DO_SETAMBIGUOUS:
 
450
      case QI_DO_CLEARAMBIGUOUS:
 
451
#endif
 
452
         result = qinstance_change_state_on_command(ctx, qep, answer, action, force ? true : false, user, host, isoperator, isowner, monitor) ? 0 : -1;
 
453
         break;
 
454
      case QI_DO_CLEAN:
 
455
         result = qmod_queue_clean(ctx, qep, force, answer, user, host, isoperator, isowner, monitor);
 
456
         break;
 
457
 
 
458
      case QI_DO_RESCHEDULE:
 
459
         result = qmod_queue_weakclean(ctx, qep, force, answer, user, host, isoperator, isowner, monitor);
 
460
         break;
 
461
      default:
 
462
         INFO((SGE_EVENT, MSG_LOG_QUNKNOWNQMODCMD_U, sge_u32c(action)));
 
463
         answer_list_add(answer, SGE_EVENT, STATUS_OK, ANSWER_QUALITY_ERROR);
 
464
         break;
 
465
   }
 
466
 
 
467
   sge_event_spool(ctx,
 
468
                   answer, 0, sgeE_QINSTANCE_MOD,
 
469
                   0, 0, lGetString(qep, QU_qname),
 
470
                   ehname, NULL,
 
471
                   qep, NULL, NULL, true, true);
 
472
 
 
473
   switch (action) {
 
474
      case QI_DO_CLEAN:
 
475
      case QI_DO_RESCHEDULE:
 
476
         cqueue_list_del_all_orphaned(ctx, *(object_type_get_master_list(SGE_TYPE_CQUEUE)), answer, 
 
477
               lGetString(qep, QU_qname), ehname);
 
478
         break;
 
479
      default:
 
480
         break;
 
481
   }
 
482
 
 
483
   DEXIT;
 
484
   return result;
 
485
}
 
486
 
 
487
static int sge_change_job_state(
 
488
sge_gdi_ctx_class_t *ctx,
 
489
char *user,
 
490
char *host,
 
491
lListElem *jep,
 
492
lListElem *jatep,
 
493
u_long32 task_id,
 
494
u_long32 action,
 
495
u_long32 force,
 
496
lList **answer,
 
497
monitoring_t *monitor
 
498
) {
 
499
   lListElem *queueep;
 
500
   u_long32 job_id;
 
501
 
 
502
   DENTER(TOP_LAYER, "sge_change_job_state");
 
503
  
 
504
   job_id = lGetUlong(jep, JB_job_number);
 
505
 
 
506
   if (strcmp(user, lGetString(jep, JB_owner)) && !manop_is_operator(user)) {
 
507
      ERROR((SGE_EVENT, MSG_JOB_NOMODJOBPERMS_SU, user, sge_u32c(job_id)));
 
508
      answer_list_add(answer, SGE_EVENT, STATUS_ENOTOWNER, ANSWER_QUALITY_ERROR);
 
509
      DEXIT;
 
510
      return -1;
 
511
   }
 
512
 
 
513
   if (!jatep) {
 
514
      /* unenrolled tasks always are not-running pending/hold */
 
515
      if (task_id) 
 
516
         WARNING((SGE_EVENT, MSG_QMODJOB_NOTENROLLED_UU, sge_u32c(job_id), sge_u32c(task_id)));
 
517
      else 
 
518
         WARNING((SGE_EVENT, MSG_QMODJOB_NOTENROLLED_U, sge_u32c(job_id)));
 
519
      answer_list_add(answer, SGE_EVENT, STATUS_ESEMANTIC, ANSWER_QUALITY_WARNING);
 
520
      DEXIT;
 
521
      return -1;
 
522
   }
 
523
 
 
524
   task_id = lGetUlong(jatep, JAT_task_number);
 
525
 
 
526
   if (lGetString(jatep, JAT_master_queue)) {
 
527
      queueep = cqueue_list_locate_qinstance(
 
528
                              *(object_type_get_master_list(SGE_TYPE_CQUEUE)), 
 
529
                              lGetString(jatep, JAT_master_queue));
 
530
   } else {
 
531
      queueep = NULL;
 
532
   }
 
533
 
 
534
   switch (action) {
 
535
      case QI_DO_RESCHEDULE:
 
536
         qmod_job_reschedule(ctx, jep, jatep, queueep, force, answer, user, host, monitor);
 
537
         break;
 
538
 
 
539
      case JSUSPENDED:
 
540
         qmod_job_suspend(ctx, jep, jatep, queueep, force, answer, user, host, monitor);
 
541
         break;
 
542
 
 
543
      case JRUNNING:
 
544
         qmod_job_unsuspend(ctx, jep, jatep, queueep, force, answer, user, host, monitor);
 
545
         break;
 
546
         
 
547
      case QI_DO_CLEARERROR:
 
548
         if (VALID(JERROR, lGetUlong(jatep, JAT_state))) {
 
549
            lSetUlong(jatep, JAT_state, lGetUlong(jatep, JAT_state) & ~JERROR);
 
550
            ja_task_message_trash_all_of_type_X(jatep, 1); 
 
551
/* lWriteElemTo(jatep, stderr); */
 
552
            sge_event_spool(ctx,
 
553
                            answer, 0, sgeE_JATASK_MOD,
 
554
                            job_id, task_id, NULL, NULL, NULL,
 
555
                            jep, jatep, NULL, true, true);
 
556
            if (job_is_array(jep)) {
 
557
               INFO((SGE_EVENT, MSG_JOB_CLEARERRORTASK_SSUU, user, host, sge_u32c(job_id), sge_u32c(task_id)));
 
558
            } else {
 
559
               INFO((SGE_EVENT, MSG_JOB_CLEARERRORJOB_SSU, user, host, sge_u32c(job_id)));
 
560
            }
 
561
         } else {
 
562
            if (job_is_array(jep)) {
 
563
               INFO((SGE_EVENT, MSG_JOB_NOERRORSTATETASK_UU, sge_u32c(job_id), sge_u32c(task_id)));
 
564
            } else {
 
565
               INFO((SGE_EVENT, MSG_JOB_NOERRORSTATEJOB_UU, sge_u32c(job_id)));
 
566
            }
 
567
         }
 
568
         answer_list_add(answer, SGE_EVENT, STATUS_OK, ANSWER_QUALITY_ERROR);
 
569
         break;
 
570
         
 
571
      default:
 
572
         INFO((SGE_EVENT, MSG_LOG_JOBUNKNOWNQMODCMD_U, sge_u32c(action)));
 
573
         answer_list_add(answer, SGE_EVENT, STATUS_OK, ANSWER_QUALITY_ERROR);
 
574
         break;
 
575
   }
 
576
 
 
577
   DEXIT;
 
578
   return 0;
 
579
}
 
580
 
 
581
/****
 
582
 **** qmod_queue_weakclean (static)
 
583
 ****/
 
584
static int qmod_queue_weakclean(
 
585
sge_gdi_ctx_class_t *ctx,
 
586
lListElem *qep,
 
587
u_long32 force,
 
588
lList **answer,
 
589
char *user,
 
590
char *host,
 
591
int isoperator,
 
592
int isowner, 
 
593
monitoring_t *monitor
 
594
) {
 
595
   DENTER(TOP_LAYER, "qmod_queue_weakclean");
 
596
 
 
597
   if (!isoperator && !isowner) {
 
598
      ERROR((SGE_EVENT, MSG_QUEUE_NORESCHEDULEQPERMS_SS, user, 
 
599
         lGetString(qep, QU_full_name)));
 
600
      answer_list_add(answer, SGE_EVENT, STATUS_ESEMANTIC, ANSWER_QUALITY_ERROR);
 
601
      DEXIT;
 
602
      return -1;
 
603
   }
 
604
 
 
605
   reschedule_jobs(ctx, qep, force, answer, monitor);
 
606
 
 
607
   DEXIT;
 
608
   return 0;
 
609
}
 
610
 
 
611
/****
 
612
 **** qmod_queue_clean (static)
 
613
 ****
 
614
 **** cleans the specified queue (every job will be deleted)
 
615
 **** The user will do this via qconf -cq <qname>
 
616
 ****/
 
617
static int qmod_queue_clean(
 
618
sge_gdi_ctx_class_t *ctx,
 
619
lListElem *qep,
 
620
u_long32 force,
 
621
lList **answer,
 
622
char *user,
 
623
char *host,
 
624
int isoperator,
 
625
int isowner,
 
626
monitoring_t *monitor
 
627
) {
 
628
   lListElem *nextjep, *jep;
 
629
   const char *qname = NULL;
 
630
   DENTER(TOP_LAYER, "qmod_queue_clean");
 
631
 
 
632
   qname = lGetString(qep, QU_full_name);
 
633
 
 
634
   DPRINTF(("cleaning queue >%s<\n", qname ));
 
635
   
 
636
   if (!manop_is_manager(user)) {
 
637
      ERROR((SGE_EVENT, MSG_QUEUE_NOCLEANQPERMS)); 
 
638
      answer_list_add(answer, SGE_EVENT, STATUS_ESEMANTIC, ANSWER_QUALITY_ERROR);
 
639
      DEXIT;
 
640
      return -1;
 
641
   }
 
642
 
 
643
   /* using sge_commit_job(j, COMMIT_ST_FINISHED_FAILED) q->job_list
 
644
      could get modified so we have to be careful when iterating through the job list */
 
645
   nextjep = lFirst(*(object_type_get_master_list(SGE_TYPE_JOB)));
 
646
   while ((jep=nextjep)) {
 
647
      lListElem *jatep, *nexttep;
 
648
      nextjep = lNext(jep);
 
649
 
 
650
      nexttep = lFirst(lGetList(jep, JB_ja_tasks));
 
651
      while ((jatep=nexttep)) {
 
652
         nexttep = lNext(jatep);
 
653
 
 
654
         if (lGetSubStr(jatep, JG_qname, qname, JAT_granted_destin_identifier_list) != NULL) {
 
655
            /* 3: JOB_FINISH reports aborted */
 
656
            sge_commit_job(ctx, jep, jatep, NULL, COMMIT_ST_FINISHED_FAILED_EE, COMMIT_DEFAULT | COMMIT_NEVER_RAN, monitor);
 
657
         }
 
658
      }
 
659
   }
 
660
   INFO((SGE_EVENT, MSG_QUEUE_PURGEQ_SSS, user, host, qname ));
 
661
   answer_list_add(answer, SGE_EVENT, STATUS_OK, ANSWER_QUALITY_INFO);
 
662
 
 
663
   DRETURN(0);
 
664
}
 
665
 
 
666
/****
 
667
 **** qmod_job_reschedule (static)
 
668
 ****/
 
669
static void qmod_job_reschedule(
 
670
sge_gdi_ctx_class_t *ctx,
 
671
lListElem *jep,
 
672
lListElem *jatep,
 
673
lListElem *queueep,
 
674
u_long32 force,
 
675
lList **answer,
 
676
char *user,
 
677
char *host,
 
678
monitoring_t *monitor
 
679
) {
 
680
   DENTER(TOP_LAYER, "qmod_job_reschedule");
 
681
 
 
682
   reschedule_job(ctx, jep, jatep, queueep, force, answer, monitor);
 
683
 
 
684
   DEXIT;
 
685
}
 
686
/****
 
687
 **** qmod_job_suspend (static)
 
688
 ****/
 
689
static void qmod_job_suspend(
 
690
sge_gdi_ctx_class_t *ctx,
 
691
lListElem *jep,
 
692
lListElem *jatep,
 
693
lListElem *queueep,
 
694
u_long32 force,
 
695
lList **answer,
 
696
char *user,
 
697
char *host,
 
698
monitoring_t *monitor
 
699
) {
 
700
   int i;
 
701
   u_long32 state = 0;
 
702
   u_long32 jataskid = 0;
 
703
   u_long32 jobid = 0;
 
704
   bool migrate_on_suspend = false;
 
705
   u_long32 now;
 
706
 
 
707
   DENTER(TOP_LAYER, "qmod_job_suspend");
 
708
 
 
709
   now = sge_get_gmt();
 
710
 
 
711
   jobid = lGetUlong(jep, JB_job_number);
 
712
   jataskid = lGetUlong(jatep, JAT_task_number);
 
713
 
 
714
   /* determine whether we actually migrate upon suspend */
 
715
   if (lGetUlong(jep, JB_checkpoint_attr) & CHECKPOINT_SUSPEND)
 
716
      migrate_on_suspend = true;
 
717
 
 
718
   if (VALID(JSUSPENDED, lGetUlong(jatep, JAT_state))) {
 
719
      /* this job is already suspended or lives in a suspended queue */
 
720
      if (force && queueep) {
 
721
         /* here force means to send the suspend signal again 
 
722
            this can only be done if we know the queue this job
 
723
            runs in */
 
724
         if (sge_signal_queue(ctx, SGE_SIGSTOP, queueep, jep, jatep, monitor)) {
 
725
            if (job_is_array(jep)) {
 
726
               WARNING((SGE_EVENT, MSG_JOB_NOFORCESUSPENDTASK_SUU, user, sge_u32c(jobid), sge_u32c(jataskid)));
 
727
            } else {
 
728
               WARNING((SGE_EVENT, MSG_JOB_NOFORCESUSPENDJOB_SU, user, sge_u32c(jobid)));
 
729
            }
 
730
            answer_list_add(answer, SGE_EVENT, STATUS_ESEMANTIC, ANSWER_QUALITY_WARNING);
 
731
         }
 
732
         else {
 
733
            if (job_is_array(jep)) {
 
734
               WARNING((SGE_EVENT, MSG_JOB_FORCESUSPENDTASK_SUU, user, sge_u32c(jobid), sge_u32c(jataskid)));
 
735
            } else {
 
736
               WARNING((SGE_EVENT, MSG_JOB_FORCESUSPENDJOB_SU, user, sge_u32c(jobid)));
 
737
            }
 
738
            answer_list_add(answer, SGE_EVENT, STATUS_ESEMANTIC, ANSWER_QUALITY_WARNING);
 
739
         }
 
740
      }
 
741
      else {
 
742
         if (job_is_array(jep)) {
 
743
            WARNING((SGE_EVENT, MSG_JOB_ALREADYSUSPENDED_SUU, user, sge_u32c(jobid), sge_u32c(jataskid)));
 
744
         } else {
 
745
            WARNING((SGE_EVENT, MSG_JOB_ALREADYSUSPENDED_SU, user, sge_u32c(jobid)));
 
746
         }
 
747
         answer_list_add(answer, SGE_EVENT, STATUS_ESEMANTIC, ANSWER_QUALITY_WARNING);
 
748
      }
 
749
 
 
750
      /* 
 
751
      ** may be the queue is suspended, than the job might not be 
 
752
      */
 
753
      state = lGetUlong(jatep, JAT_state);
 
754
      CLEARBIT(JRUNNING, state);
 
755
      SETBIT(JSUSPENDED, state);
 
756
      lSetUlong(jatep, JAT_state, state);
 
757
      if (migrate_on_suspend)
 
758
         lSetUlong(jatep, JAT_stop_initiate_time, now);
 
759
 
 
760
      sge_event_spool(ctx,
 
761
                      answer, 0, sgeE_JATASK_MOD, 
 
762
                      jobid, jataskid, NULL, NULL, NULL,
 
763
                      jep, jatep, NULL, true, true);
 
764
   }
 
765
   else {   /* job wasn't suspended yet */
 
766
      if (queueep) {
 
767
         if ((i = sge_signal_queue(ctx, SGE_SIGSTOP, queueep, jep, jatep, monitor))) {
 
768
            if (job_is_array(jep)) {
 
769
               WARNING((SGE_EVENT, MSG_JOB_NOSUSPENDTASK_SUU, user, sge_u32c(jobid), sge_u32c(jataskid)));
 
770
            } else {
 
771
               WARNING((SGE_EVENT, MSG_JOB_NOSUSPENDJOB_SU, user, sge_u32c(jobid)));
 
772
            }
 
773
            answer_list_add(answer, SGE_EVENT, STATUS_ESEMANTIC, ANSWER_QUALITY_WARNING);
 
774
         }
 
775
      }
 
776
      else
 
777
         i = 1;
 
778
 
 
779
      if (force) {
 
780
         /* set jobs state to suspend in all cases */
 
781
         if (!i) {
 
782
            if (job_is_array(jep)) {
 
783
               INFO((SGE_EVENT, MSG_JOB_FORCESUSPENDTASK_SUU, user, sge_u32c(jobid), sge_u32c(jataskid)));
 
784
            } else {
 
785
               INFO((SGE_EVENT, MSG_JOB_FORCESUSPENDJOB_SU, user, sge_u32c(jobid)));
 
786
            }
 
787
            answer_list_add(answer, SGE_EVENT, STATUS_OK, ANSWER_QUALITY_INFO);
 
788
         }
 
789
 
 
790
         state = lGetUlong(jatep, JAT_state);
 
791
         CLEARBIT(JRUNNING, state);
 
792
         SETBIT(JSUSPENDED, state);
 
793
         lSetUlong(jatep, JAT_state, state);
 
794
         if (migrate_on_suspend)
 
795
            lSetUlong(jatep, JAT_stop_initiate_time, now);
 
796
         sge_event_spool(ctx,
 
797
                         answer, 0, sgeE_JATASK_MOD,
 
798
                         jobid, jataskid, NULL, NULL, NULL,
 
799
                         jep, jatep, NULL, true, true);
 
800
      }
 
801
      else {
 
802
         if (!i) {
 
803
            if (job_is_array(jep)) {
 
804
               INFO((SGE_EVENT, MSG_JOB_SUSPENDTASK_SUU, user, sge_u32c(jobid), sge_u32c(jataskid)));
 
805
            } else {
 
806
               INFO((SGE_EVENT, MSG_JOB_SUSPENDJOB_SU, user, sge_u32c(jobid)));
 
807
            }
 
808
            answer_list_add(answer, SGE_EVENT, STATUS_OK, ANSWER_QUALITY_INFO);
 
809
 
 
810
            state = lGetUlong(jatep, JAT_state);
 
811
            CLEARBIT(JRUNNING, state);
 
812
            SETBIT(JSUSPENDED, state);
 
813
            lSetUlong(jatep, JAT_state, state);
 
814
            if (migrate_on_suspend)
 
815
               lSetUlong(jatep, JAT_stop_initiate_time, now);
 
816
            sge_event_spool(ctx,
 
817
                            answer, 0, sgeE_JATASK_MOD, 
 
818
                            jobid, jataskid, NULL, NULL, NULL,
 
819
                            jep, jatep, NULL, true, true);
 
820
         }
 
821
      }
 
822
      reporting_create_job_log(NULL, now, JL_SUSPENDED, user, host, NULL, jep, jatep, NULL, NULL);
 
823
   }
 
824
   DEXIT;
 
825
}
 
826
 
 
827
/****
 
828
 **** qmod_job_unsuspend (static)
 
829
 ****/
 
830
static void qmod_job_unsuspend(
 
831
sge_gdi_ctx_class_t *ctx,
 
832
lListElem *jep,
 
833
lListElem *jatep,
 
834
lListElem *queueep,
 
835
u_long32 force,
 
836
lList **answer,
 
837
char *user,
 
838
char *host,
 
839
monitoring_t *monitor
 
840
) {
 
841
   int i;
 
842
   u_long32 state = 0;
 
843
   u_long32 jobid, jataskid;
 
844
   u_long32 now;
 
845
 
 
846
   DENTER(TOP_LAYER, "qmod_job_unsuspend");
 
847
 
 
848
   now = sge_get_gmt();
 
849
 
 
850
   jobid = lGetUlong(jep, JB_job_number);
 
851
   jataskid = lGetUlong(jatep, JAT_task_number);
 
852
 
 
853
   /* admin suspend may not override suspend from threshold */ 
 
854
   if (VALID(JSUSPENDED_ON_THRESHOLD, lGetUlong(jatep, JAT_state))) {
 
855
      if (VALID(JSUSPENDED, lGetUlong(jatep, JAT_state))) {
 
856
         if (job_is_array(jep)) {
 
857
            INFO((SGE_EVENT, MSG_JOB_RMADMSUSPENDTASK_SSUU, user, host, sge_u32c(jobid), sge_u32c(jataskid)));
 
858
         } else {
 
859
            INFO((SGE_EVENT, MSG_JOB_RMADMSUSPENDJOB_SSU, user, host, sge_u32c(jobid)));
 
860
         }
 
861
         answer_list_add(answer, SGE_EVENT, STATUS_OK, ANSWER_QUALITY_ERROR);
 
862
 
 
863
         state = lGetUlong(jatep, JAT_state);
 
864
         CLEARBIT(JSUSPENDED, state);
 
865
         lSetUlong(jatep, JAT_state, state);
 
866
         sge_event_spool(ctx,
 
867
                         answer, 0, sgeE_JATASK_MOD,
 
868
                         jobid, jataskid, NULL, NULL, NULL,
 
869
                         jep, jatep, NULL, true, true);
 
870
         reporting_create_job_log(NULL, now, JL_UNSUSPENDED, user, host, NULL, jep, jatep, NULL, NULL);
 
871
         DEXIT;
 
872
         return;
 
873
      } 
 
874
      else {
 
875
         /* guess admin tries to remove threshold suspension by qmon -us <jobid> */
 
876
         if (job_is_array(jep)) {
 
877
            WARNING((SGE_EVENT, MSG_JOB_NOADMSUSPENDTASK_SUU, user, sge_u32c(jobid), sge_u32c(jataskid)));
 
878
         } else {
 
879
            WARNING((SGE_EVENT, MSG_JOB_NOADMSUSPENDJOB_SU, user, sge_u32c(jobid)));
 
880
         }
 
881
         answer_list_add(answer, SGE_EVENT, STATUS_ESEMANTIC, ANSWER_QUALITY_WARNING);
 
882
         DEXIT;
 
883
         return;
 
884
      }
 
885
   }
 
886
 
 
887
   if (VALID(JRUNNING, lGetUlong(jatep, JAT_state))) {
 
888
      /* this job is already running */
 
889
      if (force && queueep) {
 
890
         /* 
 
891
         ** here force means to send the cont signal again 
 
892
         ** this can only be done if we know the queue this job
 
893
         ** runs in 
 
894
         */
 
895
         if (sge_signal_queue(ctx, SGE_SIGCONT, queueep, jep, jatep, monitor)) {
 
896
            if (job_is_array(jep)) {
 
897
               WARNING((SGE_EVENT, MSG_JOB_NOFORCEENABLETASK_SUU, user, sge_u32c(jobid), sge_u32c(jataskid)));
 
898
            } else {
 
899
               WARNING((SGE_EVENT, MSG_JOB_NOFORCEENABLEJOB_SU, user, sge_u32c(jobid)));
 
900
            }
 
901
            answer_list_add(answer, SGE_EVENT, STATUS_ESEMANTIC, ANSWER_QUALITY_WARNING);
 
902
         }
 
903
         else {
 
904
            if (job_is_array(jep)) {
 
905
               WARNING((SGE_EVENT, MSG_JOB_FORCEENABLETASK_SUU, user, sge_u32c(jobid), sge_u32c(jataskid)));
 
906
            } else {
 
907
               WARNING((SGE_EVENT, MSG_JOB_FORCEENABLEJOB_SU, user, sge_u32c(jobid)));
 
908
            }
 
909
            answer_list_add(answer, SGE_EVENT, STATUS_ESEMANTIC, ANSWER_QUALITY_WARNING);
 
910
         }
 
911
      }
 
912
      else {
 
913
         if (job_is_array(jep)) {
 
914
            WARNING((SGE_EVENT, MSG_JOB_ALREADYUNSUSPENDED_SUU, user, sge_u32c(jobid), sge_u32c(jataskid)));
 
915
         } else {
 
916
            WARNING((SGE_EVENT, MSG_JOB_ALREADYUNSUSPENDED_SU, user, sge_u32c(jobid))); 
 
917
         }
 
918
         answer_list_add(answer, SGE_EVENT, STATUS_ESEMANTIC, ANSWER_QUALITY_WARNING);
 
919
      }
 
920
      /* 
 
921
      ** job is already running, so no job information has to be changed 
 
922
      */
 
923
   }
 
924
   else {   /* job wasn't suspended till now */
 
925
      if (queueep) {
 
926
         if ((i = sge_signal_queue(ctx, SGE_SIGCONT, queueep, jep, jatep, monitor))) {
 
927
            if (job_is_array(jep)) {
 
928
               WARNING((SGE_EVENT, MSG_JOB_NOUNSUSPENDTASK_SUU, user, sge_u32c(jobid), sge_u32c(jataskid)));
 
929
            } else {
 
930
               WARNING((SGE_EVENT, MSG_JOB_NOUNSUSPENDJOB_SU, user, sge_u32c(jobid)));
 
931
            }
 
932
            answer_list_add(answer, SGE_EVENT, STATUS_ESEMANTIC, ANSWER_QUALITY_WARNING);
 
933
         }
 
934
      }
 
935
      else
 
936
         i = 1;
 
937
 
 
938
      if (force) {
 
939
         /* set jobs state to suspend in all cases */
 
940
         if (!i) {
 
941
            if (job_is_array(jep)) {
 
942
               INFO((SGE_EVENT, MSG_JOB_FORCEUNSUSPTASK_SSUU, user, host, sge_u32c(jobid), sge_u32c(jataskid)));
 
943
            } else {
 
944
               INFO((SGE_EVENT, MSG_JOB_FORCEUNSUSPJOB_SSU, user, host, sge_u32c(jobid)));
 
945
            }
 
946
            answer_list_add(answer, SGE_EVENT, STATUS_OK, ANSWER_QUALITY_ERROR);
 
947
         }
 
948
 
 
949
         state = lGetUlong(jatep, JAT_state);
 
950
         SETBIT(JRUNNING, state);
 
951
         CLEARBIT(JSUSPENDED, state);
 
952
         lSetUlong(jatep, JAT_state, state);
 
953
         sge_event_spool(ctx,
 
954
                         answer, 0, sgeE_JATASK_MOD,
 
955
                         jobid, jataskid, NULL, NULL, NULL,
 
956
                         jep, jatep, NULL, true, true);
 
957
      }
 
958
      else {
 
959
         /* set job state only if communication works */
 
960
         if (!i) {
 
961
            if (job_is_array(jep)) {
 
962
               INFO((SGE_EVENT, MSG_JOB_UNSUSPENDTASK_SUU, user, sge_u32c(jobid), sge_u32c(jataskid)));
 
963
            } else {
 
964
               INFO((SGE_EVENT, MSG_JOB_UNSUSPENDJOB_SU, user, sge_u32c(jobid)));
 
965
            }
 
966
            answer_list_add(answer, SGE_EVENT, STATUS_OK, ANSWER_QUALITY_ERROR);
 
967
            
 
968
            state = lGetUlong(jatep, JAT_state);
 
969
            SETBIT(JRUNNING, state);
 
970
            CLEARBIT(JSUSPENDED, state);
 
971
            lSetUlong(jatep, JAT_state, state);
 
972
            sge_event_spool(ctx,
 
973
                            answer, 0, sgeE_JATASK_MOD,
 
974
                            jobid, jataskid, NULL, NULL, NULL,
 
975
                            jep, jatep, NULL, true, true);
 
976
         }
 
977
      }
 
978
   }
 
979
   reporting_create_job_log(NULL, now, JL_UNSUSPENDED, user, host, NULL, jep, jatep, NULL, NULL);
 
980
   DEXIT;
 
981
}
 
982
 
 
983
 
 
984
void rebuild_signal_events()
 
985
{
 
986
   lListElem *cqueue, *jep, *jatep;
 
987
 
 
988
   DENTER(TOP_LAYER, "rebuild_signal_events");
 
989
 
 
990
   /* J O B */
 
991
   for_each(jep, *(object_type_get_master_list(SGE_TYPE_JOB)))
 
992
   {
 
993
      for_each (jatep, lGetList(jep, JB_ja_tasks))
 
994
      { 
 
995
         time_t when = (time_t)lGetUlong(jatep, JAT_pending_signal_delivery_time);
 
996
 
 
997
         if (lGetUlong(jatep, JAT_pending_signal) && (when > 0))
 
998
         {
 
999
            u_long32 key1 = lGetUlong(jep, JB_job_number);
 
1000
            u_long32 key2 = lGetUlong(jatep, JAT_task_number);
 
1001
            te_event_t ev = NULL;
 
1002
            
 
1003
            ev = te_new_event(when, TYPE_SIGNAL_RESEND_EVENT, ONE_TIME_EVENT, key1, key2, NULL);
 
1004
            te_add_event(ev);
 
1005
            te_free_event(&ev);
 
1006
         }
 
1007
      }
 
1008
   }
 
1009
 
 
1010
   /* Q U E U E */
 
1011
   for_each(cqueue, *(object_type_get_master_list(SGE_TYPE_CQUEUE)))
 
1012
   { 
 
1013
      lList *qinstance_list = lGetList(cqueue, CQ_qinstances);
 
1014
      lListElem *qinstance;
 
1015
 
 
1016
      for_each(qinstance, qinstance_list)
 
1017
      {
 
1018
         time_t when = (time_t)lGetUlong(qinstance, QU_pending_signal_delivery_time);
 
1019
 
 
1020
         if (lGetUlong(qinstance, QU_pending_signal) && (when > 0))
 
1021
         {
 
1022
            const char* str_key = lGetString(qinstance, QU_full_name); 
 
1023
            te_event_t ev = NULL;
 
1024
 
 
1025
            ev = te_new_event(when, TYPE_SIGNAL_RESEND_EVENT, ONE_TIME_EVENT, 0, 0, str_key);
 
1026
            te_add_event(ev);
 
1027
            te_free_event(&ev);
 
1028
         }
 
1029
      }
 
1030
   }
 
1031
 
 
1032
   DEXIT;
 
1033
   return;
 
1034
} /* rebuild_signal_events() */
 
1035
 
 
1036
/* this function is called by our timer mechanism for resending signals */  
 
1037
void resend_signal_event(sge_gdi_ctx_class_t *ctx, te_event_t anEvent, monitoring_t *monitor)
 
1038
{
 
1039
   lListElem *qep, *jep, *jatep;
 
1040
   u_long32 jobid = te_get_first_numeric_key(anEvent);
 
1041
   u_long32 jataskid = te_get_second_numeric_key(anEvent);
 
1042
   const char* queue = te_get_alphanumeric_key(anEvent);
 
1043
 
 
1044
   DENTER(TOP_LAYER, "resend_signal_event");
 
1045
 
 
1046
   MONITOR_WAIT_TIME(SGE_LOCK(LOCK_GLOBAL, LOCK_WRITE), monitor);
 
1047
 
 
1048
   if (queue == NULL) {
 
1049
      if (!(jep = job_list_locate(*(object_type_get_master_list(SGE_TYPE_JOB)), jobid)) || !(jatep=job_search_task(jep, NULL, jataskid)))
 
1050
      {
 
1051
         ERROR((SGE_EVENT, MSG_EVE_RESENTSIGNALTASK_UU, sge_u32c(jobid), sge_u32c(jataskid)));
 
1052
         SGE_UNLOCK(LOCK_GLOBAL, LOCK_WRITE);
 
1053
         DEXIT;
 
1054
         return;
 
1055
      }
 
1056
      
 
1057
      if ((qep = cqueue_list_locate_qinstance(*(object_type_get_master_list(SGE_TYPE_CQUEUE)), lGetString(jatep, JAT_master_queue)))) {
 
1058
         sge_signal_queue(ctx, lGetUlong(jatep, JAT_pending_signal), qep, jep, jatep, monitor);
 
1059
      }
 
1060
   } else {
 
1061
      if (!(qep = cqueue_list_locate_qinstance(*(object_type_get_master_list(SGE_TYPE_CQUEUE)), queue))) {
 
1062
         ERROR((SGE_EVENT, MSG_EVE_RESENTSIGNALQ_S, queue));
 
1063
         SGE_UNLOCK(LOCK_GLOBAL, LOCK_WRITE);
 
1064
         sge_free((char *)queue);
 
1065
         DEXIT;
 
1066
         return;
 
1067
      }
 
1068
      
 
1069
      sge_signal_queue(ctx, lGetUlong(qep, QU_pending_signal), qep, NULL, NULL, monitor);
 
1070
   }
 
1071
 
 
1072
   sge_free((char *)queue);
 
1073
 
 
1074
   SGE_UNLOCK(LOCK_GLOBAL, LOCK_WRITE);
 
1075
 
 
1076
   DEXIT;
 
1077
   return;
 
1078
}
 
1079
 
 
1080
static void sge_propagate_queue_suspension(const char *qnm, int how)
 
1081
{
 
1082
   lListElem *jep, *jatep;
 
1083
 
 
1084
   DENTER(TOP_LAYER, "sge_propagate_queue_suspension");
 
1085
 
 
1086
   DPRINTF(("searching for all jobs in queue %s due to %s\n", qnm, sge_sig2str(how)));
 
1087
   for_each (jep, *object_type_get_master_list(SGE_TYPE_JOB)) {
 
1088
      for_each (jatep, lGetList(jep, JB_ja_tasks)) {
 
1089
         if (lGetElemStr(lGetList(jatep, JAT_granted_destin_identifier_list), JG_qname, qnm)) {
 
1090
            u_long32 jstate;
 
1091
            DPRINTF(("found "sge_u32"."sge_u32"\n", lGetUlong(jep, JB_job_number), lGetUlong(jatep, JAT_task_number)));
 
1092
            jstate = lGetUlong(jatep, JAT_state);
 
1093
            if (how == SGE_SIGSTOP)
 
1094
               jstate |= JSUSPENDED_ON_SUBORDINATE;
 
1095
            else
 
1096
               jstate &= ~JSUSPENDED_ON_SUBORDINATE;
 
1097
            lSetUlong(jatep, JAT_state, jstate);
 
1098
         }
 
1099
      }
 
1100
   }
 
1101
 
 
1102
   DRETURN_VOID;
 
1103
}
 
1104
 
 
1105
/************************************************************************
 
1106
 This is called by the qmaster to:
 
1107
 - send a signal to all jobs in a queue (job_number == 0);
 
1108
 - send a signal to one job
 
1109
 ************************************************************************/
 
1110
int sge_signal_queue(
 
1111
sge_gdi_ctx_class_t *ctx,
 
1112
int how, /* signal */
 
1113
lListElem *qep,
 
1114
lListElem *jep,
 
1115
lListElem *jatep,
 
1116
monitoring_t *monitor
 
1117
) {
 
1118
   int i;
 
1119
   u_long32 next_delivery_time = 60;
 
1120
   u_long32 now;
 
1121
   sge_pack_buffer pb;
 
1122
   int sent = 0;
 
1123
 
 
1124
   DENTER(TOP_LAYER, "sge_signal_queue");
 
1125
 
 
1126
   now = sge_get_gmt();
 
1127
 
 
1128
   DEBUG((SGE_EVENT, "queue_signal: %d, queue: %s, job: %d, jatask: %d", how, 
 
1129
            (qep?lGetString(qep, QU_full_name):"none"),
 
1130
            (int)(jep?lGetUlong(jep,JB_job_number):-1),
 
1131
            (int)(jatep?lGetUlong(jatep,JAT_task_number):-1)
 
1132
        ));
 
1133
 
 
1134
   if (!jep && (how == SGE_SIGSTOP || how == SGE_SIGCONT))
 
1135
      sge_propagate_queue_suspension(lGetString(qep, QU_full_name), how);
 
1136
 
 
1137
   /* don't try to signal unheard queues */
 
1138
   if (!qinstance_state_is_unknown(qep)) {
 
1139
      const char *hnm, *pnm;
 
1140
 
 
1141
      pnm = prognames[EXECD]; 
 
1142
      hnm = lGetHost(qep, QU_qhostname);
 
1143
 
 
1144
      if ((i = init_packbuffer(&pb, 256, 0)) == PACK_SUCCESS) {
 
1145
         /* identifier for acknowledgement */
 
1146
         if (jep) {
 
1147
            /*
 
1148
             * Due to IZ 1619: pack signal only if
 
1149
             *    job is a non-parallel job
 
1150
             *    or all slaves of the parallel job have been acknowledged
 
1151
             */
 
1152
            if (!lGetString(jatep, JAT_master_queue) ||
 
1153
                is_pe_master_task_send(jatep)) {
 
1154
               /* TAG_SIGJOB */
 
1155
               packint(&pb, lGetUlong(jep, JB_job_number));   
 
1156
               packint(&pb, lGetUlong(jatep, JAT_task_number));
 
1157
               packstr(&pb, NULL);
 
1158
               packint(&pb, how); 
 
1159
            } 
 
1160
         } else {
 
1161
            /* TAG_SIGQUEUE */
 
1162
            packint(&pb, 0);
 
1163
            packint(&pb, 0); 
 
1164
            packstr(&pb, lGetString(qep, QU_full_name));
 
1165
            packint(&pb, how); 
 
1166
         }
 
1167
 
 
1168
         if (mconf_get_simulate_execds()) {
 
1169
            i = CL_RETVAL_OK;
 
1170
            if (jep && how == SGE_SIGKILL)
 
1171
               trigger_job_resend(sge_get_gmt(), NULL, lGetUlong(jep, JB_job_number), lGetUlong(jatep, JAT_task_number), 1);
 
1172
         } else {
 
1173
            if (pb_filled(&pb)) {
 
1174
               u_long32 dummy = 0;
 
1175
               i = gdi2_send_message_pb(ctx, 0, pnm, 1, hnm, jep ? TAG_SIGJOB: TAG_SIGQUEUE, 
 
1176
                             &pb, &dummy);
 
1177
            }
 
1178
         }
 
1179
 
 
1180
         MONITOR_MESSAGES_OUT(monitor);                          
 
1181
         clear_packbuffer(&pb);
 
1182
      } else {
 
1183
         i = CL_RETVAL_MALLOC;  /* an error */
 
1184
      }
 
1185
 
 
1186
      if (i != CL_RETVAL_OK) {
 
1187
         ERROR((SGE_EVENT, MSG_COM_NOUPDATEQSTATE_IS, how, lGetString(qep, QU_full_name)));
 
1188
         DRETURN(i);
 
1189
      }
 
1190
      sent = 1;
 
1191
   }
 
1192
 
 
1193
   next_delivery_time += now;
 
1194
 
 
1195
   /* If this is a operation on one job we enter the signal request in the
 
1196
      job structure. If the operation is not acknowledged in time we can do
 
1197
      further steps */
 
1198
   if (jep) {
 
1199
      te_event_t ev = NULL;
 
1200
 
 
1201
      DPRINTF(("JOB "sge_u32": %s signal %s (retry after "sge_u32" seconds) host: %s\n", 
 
1202
            lGetUlong(jep, JB_job_number), sent?"sent":"queued", sge_sig2str(how), next_delivery_time - now, 
 
1203
            lGetHost(qep, QU_qhostname)));
 
1204
      te_delete_one_time_event(TYPE_SIGNAL_RESEND_EVENT, lGetUlong(jep, JB_job_number),
 
1205
         lGetUlong(jatep, JAT_task_number), NULL);
 
1206
 
 
1207
      if (!mconf_get_simulate_execds()) {
 
1208
         lSetUlong(jatep, JAT_pending_signal, how);
 
1209
         ev = te_new_event((time_t)next_delivery_time, TYPE_SIGNAL_RESEND_EVENT, ONE_TIME_EVENT,
 
1210
            lGetUlong(jep, JB_job_number), lGetUlong(jatep, JAT_task_number), NULL);
 
1211
         te_add_event(ev);
 
1212
         te_free_event(&ev);
 
1213
         lSetUlong(jatep, JAT_pending_signal_delivery_time, next_delivery_time); 
 
1214
      }
 
1215
   } else {
 
1216
      te_event_t ev = NULL;
 
1217
 
 
1218
      DPRINTF(("QUEUE %s: %s signal %s (retry after "sge_u32" seconds) host %s\n", 
 
1219
            lGetString(qep, QU_full_name), sent?"sent":"queued", sge_sig2str(how), next_delivery_time - now,
 
1220
            lGetHost(qep, QU_qhostname)));
 
1221
      te_delete_one_time_event(TYPE_SIGNAL_RESEND_EVENT, 0, 0, lGetString(qep, QU_full_name));
 
1222
 
 
1223
      if (!mconf_get_simulate_execds()) {
 
1224
         lSetUlong(qep, QU_pending_signal, how);
 
1225
         ev = te_new_event((time_t)next_delivery_time, TYPE_SIGNAL_RESEND_EVENT, ONE_TIME_EVENT, 0, 0,
 
1226
            lGetString(qep, QU_full_name));
 
1227
         te_add_event(ev);
 
1228
         te_free_event(&ev);
 
1229
         lSetUlong(qep, QU_pending_signal_delivery_time, next_delivery_time);
 
1230
      }
 
1231
   }
 
1232
 
 
1233
   if (!jep) {/* signalling a queue ? - handle slave jobs in this queue */
 
1234
      signal_slave_jobs_in_queue(ctx, how, qep, monitor); 
 
1235
   }   
 
1236
   else {/* is this the master queue of this job to signal ? - then decide whether slave tasks also 
 
1237
           must get signalled */
 
1238
      if (!strcmp(lGetString(lFirst(lGetList(jatep, JAT_granted_destin_identifier_list)), 
 
1239
            JG_qname), lGetString(qep, QU_full_name))) {
 
1240
         signal_slave_tasks_of_job(ctx, how, jep, jatep, monitor); 
 
1241
      }
 
1242
   }   
 
1243
 
 
1244
   DEXIT;
 
1245
   return 0;
 
1246
} /* sge_signal_queue() */
 
1247
 
 
1248
/* in case we have to signal a queue 
 
1249
   in which slave tasks are running 
 
1250
   we have to notify the master execd 
 
1251
   where the master task of this job is running
 
1252
*/  
 
1253
static void signal_slave_jobs_in_queue(
 
1254
sge_gdi_ctx_class_t *ctx,
 
1255
int how, /* signal */
 
1256
lListElem *qep,
 
1257
monitoring_t *monitor
 
1258
) {
 
1259
   lList *gdil_lp;
 
1260
   lListElem *mq, *jep, *jatep;
 
1261
   const char *qname, *mqname, *pe_name;
 
1262
 
 
1263
   DENTER(TOP_LAYER, "signal_slave_jobs_in_queue");
 
1264
 
 
1265
   qname = lGetString(qep, QU_full_name);
 
1266
   /* test whether there are parallel jobs 
 
1267
      with a slave slot in this queue 
 
1268
      if so then signal this job */
 
1269
   for_each (jep, *(object_type_get_master_list(SGE_TYPE_JOB))) {
 
1270
      for_each (jatep, lGetList(jep, JB_ja_tasks)) {
 
1271
 
 
1272
         /* skip sequential and not running jobs */
 
1273
         if (lGetNumberOfElem( gdil_lp =
 
1274
               lGetList(jatep, JAT_granted_destin_identifier_list))<=1)
 
1275
            continue;
 
1276
       
 
1277
         /* signalling of not "slave controlled" parallel jobs will not work
 
1278
            since they are not known to the apropriate execd - we should
 
1279
            omit signalling in this case to prevent waste of communication bandwith */ 
 
1280
         if (!(pe_name=lGetString(jatep, JAT_granted_pe)) ||
 
1281
             !pe_list_locate(*object_type_get_master_list(SGE_TYPE_PE), pe_name))
 
1282
            continue;
 
1283
 
 
1284
         if (lGetElemStr(gdil_lp, JG_qname, qname) != NULL) {
 
1285
 
 
1286
            /* search master queue - needed for signalling of a job */
 
1287
            if ((mq = cqueue_list_locate_qinstance(*(object_type_get_master_list(SGE_TYPE_CQUEUE)), mqname = lGetString(
 
1288
                  lFirst(gdil_lp), JG_qname)))) {
 
1289
               DPRINTF(("found slave job "sge_u32" in queue %s master queue is %s\n", 
 
1290
                  lGetUlong(jep, JB_job_number), qname, mqname));
 
1291
               sge_signal_queue(ctx, how, mq, jep, jatep, monitor);
 
1292
            } else {
 
1293
               ERROR((SGE_EVENT, MSG_JOB_UNABLE2FINDMQ_SU, mqname, sge_u32c(lGetUlong(jep, JB_job_number))));
 
1294
            }
 
1295
         }
 
1296
      }
 
1297
   }
 
1298
 
 
1299
   DRETURN_VOID;
 
1300
}
 
1301
 
 
1302
static void signal_slave_tasks_of_job(sge_gdi_ctx_class_t *ctx, int how, lListElem *jep, lListElem *jatep, 
 
1303
                                      monitoring_t *monitor) 
 
1304
{
 
1305
   lList *gdil_lp;
 
1306
   lListElem *mq, *pe, *gdil_ep;
 
1307
   const char *qname, *pe_name;
 
1308
 
 
1309
   DENTER(TOP_LAYER, "signal_slave_tasks_of_job");
 
1310
 
 
1311
   /* do not signal slave tasks in case of checkpointing jobs with 
 
1312
      STOP/CONT when suspending means migration */
 
1313
   if ((how==SGE_SIGCONT || how==SGE_SIGSTOP) &&
 
1314
      (lGetUlong(jep, JB_checkpoint_attr)|CHECKPOINT_SUSPEND)!=0) {
 
1315
      DPRINTF(("omit signaling - checkpoint script does action for whole job\n"));
 
1316
      return;
 
1317
   }
 
1318
 
 
1319
   /* forward signal to slave exec hosts 
 
1320
      in case of slave controlled jobs */
 
1321
   if ( !((lGetNumberOfElem(gdil_lp=lGetList(jatep, JAT_granted_destin_identifier_list)))<=1 || 
 
1322
         !(pe_name=lGetString(jatep, JAT_granted_pe)) ||
 
1323
         !(pe=pe_list_locate(*object_type_get_master_list(SGE_TYPE_PE), pe_name)) ||
 
1324
         !lGetBool(pe, PE_control_slaves)))
 
1325
      for (gdil_ep=lNext(lFirst(gdil_lp)); gdil_ep; gdil_ep=lNext(gdil_ep))
 
1326
         if ((mq = cqueue_list_locate_qinstance(*(object_type_get_master_list(SGE_TYPE_CQUEUE)), qname = lGetString(gdil_ep, JG_qname)))) {
 
1327
            DPRINTF(("found slave job "sge_u32" in queue %s\n", 
 
1328
               lGetUlong(jep, JB_job_number), qname));
 
1329
            sge_signal_queue(ctx, how, mq, jep, jatep, monitor);
 
1330
         }
 
1331
 
 
1332
   DEXIT;
 
1333
   return;
 
1334
}
 
1335