~ubuntu-branches/ubuntu/utopic/gridengine/utopic

« back to all changes in this revision

Viewing changes to source/clients/qevent/qevent.c

  • Committer: Bazaar Package Importer
  • Author(s): Mark Hymers
  • Date: 2008-06-25 22:36:13 UTC
  • Revision ID: james.westby@ubuntu.com-20080625223613-tvd9xlhuoct9kyhm
Tags: upstream-6.2~beta2
ImportĀ upstreamĀ versionĀ 6.2~beta2

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*___INFO__MARK_BEGIN__*/
 
2
/*************************************************************************
 
3
 * 
 
4
 *  The Contents of this file are made available subject to the terms of
 
5
 *  the Sun Industry Standards Source License Version 1.2
 
6
 * 
 
7
 *  Sun Microsystems Inc., March, 2001
 
8
 * 
 
9
 * 
 
10
 *  Sun Industry Standards Source License Version 1.2
 
11
 *  =================================================
 
12
 *  The contents of this file are subject to the Sun Industry Standards
 
13
 *  Source License Version 1.2 (the "License"); You may not use this file
 
14
 *  except in compliance with the License. You may obtain a copy of the
 
15
 *  License at http://gridengine.sunsource.net/Gridengine_SISSL_license.html
 
16
 * 
 
17
 *  Software provided under this License is provided on an "AS IS" basis,
 
18
 *  WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING,
 
19
 *  WITHOUT LIMITATION, WARRANTIES THAT THE SOFTWARE IS FREE OF DEFECTS,
 
20
 *  MERCHANTABLE, FIT FOR A PARTICULAR PURPOSE, OR NON-INFRINGING.
 
21
 *  See the License for the specific provisions governing your rights and
 
22
 *  obligations concerning the Software.
 
23
 * 
 
24
 *   The Initial Developer of the Original Code is: Sun Microsystems, Inc.
 
25
 * 
 
26
 *   Copyright: 2001 by Sun Microsystems, Inc.
 
27
 * 
 
28
 *   All Rights Reserved.
 
29
 * 
 
30
 ************************************************************************/
 
31
/*___INFO__MARK_END__*/
 
32
#include <unistd.h>
 
33
#include <stdio.h>
 
34
#include <string.h>
 
35
#include <stdlib.h>
 
36
#include <sys/types.h>
 
37
 
 
38
#if 0
 
39
#define QEVENT_SHOW_ALL
 
40
#endif
 
41
 
 
42
#if defined(FREEBSD) || defined(NETBSD) || defined(DARWIN)
 
43
#include <sys/time.h>
 
44
#endif
 
45
 
 
46
#include <sys/resource.h>
 
47
#include <sys/wait.h>
 
48
 
 
49
#include "sge_string.h"
 
50
#include "sge_unistd.h"
 
51
#include "sge_all_listsL.h"
 
52
#include "usage.h"
 
53
#include "sig_handlers.h"
 
54
#include "commlib.h"
 
55
#include "sge_prog.h"
 
56
#include "sgermon.h"
 
57
#include "sge_log.h"
 
58
 
 
59
#include "msg_clients_common.h"
 
60
#include "msg_common.h"
 
61
 
 
62
#include "sge_answer.h"
 
63
#include "sge_mirror.h"
 
64
#include "sge_event.h"
 
65
#include "sge_time.h"
 
66
#include "sge_feature.h"
 
67
#include "sge_spool.h"
 
68
#include "qevent.h"
 
69
#include "sge_profiling.h"
 
70
#include "sge_mt_init.h"
 
71
 
 
72
#include "gdi/sge_gdi_ctx.h"
 
73
 
 
74
#if defined(SOLARIS) || defined(ALPHA)
 
75
/* ALPHA4 only has wait3() prototype if _XOPEN_SOURCE_EXTENDED is defined */
 
76
pid_t wait3(int *, int, struct rusage *);
 
77
#endif
 
78
 
 
79
 
 
80
u_long Global_jobs_running = 0;
 
81
u_long Global_jobs_registered = 0;
 
82
qevent_options *Global_qevent_options;
 
83
 
 
84
 
 
85
static void qevent_show_usage(void);
 
86
static void qevent_testsuite_mode(sge_evc_class_t *evc);
 
87
static void qevent_subscribe_mode(sge_evc_class_t *evc);
 
88
static char* qevent_get_event_name(int event);
 
89
static void qevent_trigger_scripts(int qevent_event, qevent_options *option_struct, lListElem *event);
 
90
static void qevent_start_trigger_script(int qevent_event, const char* script_file, lListElem *event);
 
91
static qevent_options* qevent_get_option_struct(void);
 
92
static void qevent_set_option_struct(qevent_options *option_struct);
 
93
 
 
94
 
 
95
static void  qevent_set_option_struct(qevent_options *option_struct) {
 
96
   Global_qevent_options=option_struct;
 
97
}
 
98
 
 
99
 
 
100
static qevent_options* qevent_get_option_struct(void) {
 
101
   return Global_qevent_options;
 
102
}
 
103
 
 
104
static void qevent_dump_pid_file(void) {
 
105
   sge_write_pid("qevent.pid");
 
106
}
 
107
 
 
108
static sge_callback_result 
 
109
print_event(sge_evc_class_t *evc, object_description *object_base, sge_object_type type, 
 
110
            sge_event_action action, lListElem *event, void *clientdata)
 
111
{
 
112
   char buffer[1024];
 
113
   dstring buffer_wrapper;
 
114
 
 
115
   DENTER(TOP_LAYER, "print_event");
 
116
 
 
117
   sge_dstring_init(&buffer_wrapper, buffer, sizeof(buffer));
 
118
 
 
119
   fprintf(stdout, "%s\n", event_text(event, &buffer_wrapper));
 
120
   fflush(stdout);
 
121
   /* create a callback error to test error handling */
 
122
   if(type == SGE_TYPE_GLOBAL_CONFIG) {
 
123
      DEXIT;
 
124
      return SGE_EMA_FAILURE;
 
125
   }
 
126
   
 
127
   DEXIT;
 
128
   return SGE_EMA_OK;
 
129
}
 
130
 
 
131
#ifndef QEVENT_SHOW_ALL
 
132
static sge_callback_result
 
133
print_jatask_event(sge_evc_class_t *evc, object_description *object_base, sge_object_type type, 
 
134
                   sge_event_action action, lListElem *event, void *clientdata)
 
135
{
 
136
   char buffer[1024];
 
137
   dstring buffer_wrapper;
 
138
 
 
139
   DENTER(TOP_LAYER, "print_jatask_event");
 
140
 
 
141
   sge_dstring_init(&buffer_wrapper, buffer, sizeof(buffer));
 
142
   
 
143
   DPRINTF(("%s\n", event_text(event, &buffer_wrapper)));
 
144
   if (lGetPosViaElem(event, ET_type, SGE_NO_ABORT) >= 0) {
 
145
      u_long32 type = lGetUlong(event, ET_type);
 
146
      u_long32 timestamp = lGetUlong(event, ET_timestamp);
 
147
      
 
148
      if (type == sgeE_JATASK_MOD) { 
 
149
         lList *jat = lGetList(event,ET_new_version);
 
150
         u_long job_id  = lGetUlong(event, ET_intkey);
 
151
         u_long task_id = lGetUlong(event, ET_intkey2);
 
152
         lListElem *ep = lFirst(jat);
 
153
         u_long job_status = lGetUlong(ep, JAT_status);
 
154
         int task_running = (job_status==JRUNNING || job_status==JTRANSFERING);
 
155
 
 
156
         if (task_running) {
 
157
            fprintf(stdout,"JOB_START (%ld.%ld:ECL_TIME="sge_U32CFormat")\n", job_id ,task_id,sge_u32c(timestamp));
 
158
            fflush(stdout);  
 
159
            Global_jobs_running++;
 
160
         }
 
161
      }
 
162
   
 
163
      if (type == sgeE_JOB_FINAL_USAGE) { 
 
164
         /* lList *jat = lGetList(event,ET_new_version); */
 
165
         u_long job_id = lGetUlong(event, ET_intkey);
 
166
         u_long task_id = lGetUlong(event, ET_intkey2);
 
167
         /* lWriteElemTo(event, stdout); */
 
168
         fprintf(stdout,"JOB_FINISH (%ld.%ld:ECL_TIME="sge_U32CFormat")\n", job_id, task_id,sge_u32c(timestamp));
 
169
         Global_jobs_running--;
 
170
         fflush(stdout);  
 
171
      }
 
172
      if (type == sgeE_JOB_ADD) { 
 
173
         lList *jat = lGetList(event,ET_new_version);
 
174
         u_long job_id  = lGetUlong(event, ET_intkey);
 
175
         u_long task_id = lGetUlong(event, ET_intkey2);
 
176
         lListElem *ep = lFirst(jat);
 
177
         const char* job_project = lGetString(ep, JB_project);
 
178
         if (job_project == NULL) {
 
179
            job_project = "NONE";
 
180
         }
 
181
         fprintf(stdout,"JOB_ADD (%ld.%ld:ECL_TIME="sge_U32CFormat":project=%s)\n", job_id, task_id, sge_u32c(timestamp),job_project);
 
182
         Global_jobs_registered++;
 
183
         fflush(stdout);  
 
184
      }
 
185
      if (type == sgeE_JOB_DEL) { 
 
186
         u_long job_id  = lGetUlong(event, ET_intkey);
 
187
         u_long task_id = lGetUlong(event, ET_intkey2);
 
188
         fprintf(stdout,"JOB_DEL (%ld.%ld:ECL_TIME="sge_U32CFormat")\n", job_id, task_id,sge_u32c(timestamp));
 
189
         Global_jobs_registered--;
 
190
         fflush(stdout);  
 
191
      }
 
192
 
 
193
   }
 
194
   /* create a callback error to test error handling */
 
195
   if(type == SGE_TYPE_GLOBAL_CONFIG) {
 
196
      DEXIT;
 
197
      return SGE_EMA_FAILURE;
 
198
   }
 
199
   
 
200
   DEXIT;
 
201
   return SGE_EMA_OK;
 
202
}
 
203
#endif
 
204
 
 
205
static sge_callback_result
 
206
analyze_jatask_event(sge_evc_class_t *evc, object_description *object_base,sge_object_type type, 
 
207
                     sge_event_action action, lListElem *event, void *clientdata)
 
208
{
 
209
   char buffer[1024];
 
210
   dstring buffer_wrapper;
 
211
 
 
212
   sge_dstring_init(&buffer_wrapper, buffer, sizeof(buffer));
 
213
   
 
214
   if (lGetPosViaElem(event, ET_type, SGE_NO_ABORT) >= 0) {
 
215
      u_long32 type = lGetUlong(event, ET_type);
 
216
 
 
217
      if (type == sgeE_JATASK_MOD) { 
 
218
         lList *jat = lGetList(event,ET_new_version);
 
219
         lListElem *ep = lFirst(jat);
 
220
         u_long job_status = lGetUlong(ep, JAT_status);
 
221
         int task_running = (job_status==JRUNNING || job_status==JTRANSFERING);
 
222
         if (task_running) {
 
223
         }
 
224
      }
 
225
 
 
226
      if (type == sgeE_JOB_FINAL_USAGE) { 
 
227
      }
 
228
 
 
229
      if (type == sgeE_JOB_ADD) { 
 
230
         /* lList *jat = lGetList(event,ET_new_version);
 
231
         u_long job_id  = lGetUlong(event, ET_intkey);
 
232
         u_long task_id = lGetUlong(event, ET_intkey2);
 
233
         lListElem *ep = lFirst(jat); */
 
234
      }
 
235
 
 
236
      if (type == sgeE_JOB_DEL) { 
 
237
         qevent_trigger_scripts(QEVENT_JB_END, qevent_get_option_struct(), event);
 
238
      }
 
239
 
 
240
      if (type == sgeE_JATASK_DEL) { 
 
241
         qevent_trigger_scripts(QEVENT_JB_TASK_END,qevent_get_option_struct() , event);
 
242
      }
 
243
 
 
244
 
 
245
   }
 
246
   /* create a callback error to test error handling */
 
247
   if(type == SGE_TYPE_GLOBAL_CONFIG) {
 
248
      return SGE_EMA_FAILURE;
 
249
   }
 
250
   
 
251
   return SGE_EMA_OK;
 
252
}
 
253
 
 
254
 
 
255
 
 
256
static void qevent_trigger_scripts( int qevent_event, qevent_options *option_struct, lListElem *event) {
 
257
 
 
258
   int i=0;
 
259
   DENTER(TOP_LAYER, "qevent_trigger_scripts");
 
260
   if (option_struct->trigger_option_count > 0) {
 
261
      INFO((SGE_EVENT, "trigger for event "SFN"\n", qevent_get_event_name(qevent_event) ));
 
262
      for (i=0;i<option_struct->trigger_option_count;i++) {
 
263
         if ( (option_struct->trigger_option_events)[i] == qevent_event ) {
 
264
            qevent_start_trigger_script(qevent_event ,(option_struct->trigger_option_scripts)[i], event);
 
265
         }
 
266
      }
 
267
   }
 
268
   DEXIT;
 
269
}
 
270
 
 
271
static void qevent_start_trigger_script(int qevent_event, const char* script_file, lListElem *event ) {
 
272
   u_long jobid, taskid;
 
273
   const char* event_name;
 
274
   int pid;
 
275
   char buffer[MAX_STRING_SIZE];
 
276
   char buffer2[MAX_STRING_SIZE];
 
277
 
 
278
   DENTER(TOP_LAYER, "qevent_start_trigger_script");
 
279
 
 
280
   jobid  = lGetUlong(event, ET_intkey);
 
281
   taskid = lGetUlong(event, ET_intkey2);
 
282
   event_name = qevent_get_event_name(qevent_event);
 
283
   
 
284
 
 
285
   /* test if script is executable and valid file */
 
286
   if (!sge_is_file(script_file)) {
 
287
      ERROR((SGE_EVENT, "no script file: "SFQ"\n", script_file));
 
288
      DEXIT;
 
289
      return;
 
290
   }
 
291
 
 
292
   /* is file executable ? */
 
293
   if (!sge_is_executable(script_file)) {  
 
294
      ERROR((SGE_EVENT, "file not executable: "SFQ"\n", script_file));
 
295
      DEXIT;
 
296
      return;
 
297
   } 
 
298
 
 
299
   pid = fork();
 
300
   if (pid < 0) {
 
301
      ERROR((SGE_EVENT, "fork() error\n"));
 
302
      DEXIT;
 
303
      return;
 
304
   }
 
305
 
 
306
   if (pid > 0) {
 
307
      int exit_status;
 
308
 
 
309
#if !(defined(CRAY) || defined(INTERIX))
 
310
         struct rusage rusage;
 
311
#endif
 
312
 
 
313
#if defined(SVR3) || defined(_BSD)
 
314
         union wait status;
 
315
#else
 
316
         int status;
 
317
#endif
 
318
#if defined(CRAY) || defined(INTERIX)
 
319
         waitpid(pid, &status, 0);
 
320
#else
 
321
         wait3(&status, 0, &rusage);
 
322
#endif
 
323
#if defined(SVR3) || defined(_BSD)
 
324
         exit_status = status.w_retcode;
 
325
#else
 
326
         exit_status = status;
 
327
#endif
 
328
 
 
329
      if ( WEXITSTATUS(exit_status) == 0 ) {
 
330
         INFO((SGE_EVENT,"exit status of script: "sge_U32CFormat"\n", sge_u32c(WEXITSTATUS(exit_status))));
 
331
      } else {
 
332
         ERROR((SGE_EVENT,"exit status of script: "sge_U32CFormat"\n", sge_u32c(WEXITSTATUS(exit_status))));
 
333
      }
 
334
      DEXIT;
 
335
      return;
 
336
   } else {
 
337
      const char *basename = sge_basename( script_file, '/' );
 
338
      /*      SETPGRP;  */
 
339
      /*      sge_close_all_fds(NULL); */
 
340
      sprintf(buffer  ,sge_U32CFormat,sge_u32c(jobid));
 
341
      sprintf(buffer2 ,sge_U32CFormat,sge_u32c(taskid)); 
 
342
      execlp(script_file, basename, event_name, buffer, buffer2, (char *)0);
 
343
   }
 
344
   exit(1);
 
345
}
 
346
 
 
347
static void qevent_show_usage(void) {
 
348
   dstring ds;
 
349
   char buffer[256];
 
350
   
 
351
   sge_dstring_init(&ds, buffer, sizeof(buffer));
 
352
 
 
353
   fprintf(stdout, "%s\n", feature_get_product_name(FS_SHORT_VERSION, &ds));
 
354
   fprintf(stdout, "%s\n", MSG_SRC_USAGE );
 
355
 
 
356
   fprintf(stdout,"qevent [-h|-help] -ts|-testsuite\n");
 
357
   fprintf(stdout,"qevent [-h|-help] -sm|-subscribe\n");
 
358
   fprintf(stdout,"qevent [-h|-help] -trigger EVENT SCRIPT [ -trigger EVENT SCRIPT, ... ]\n\n");
 
359
   
 
360
   fprintf(stdout,"   -h,  -help             show usage\n");
 
361
   fprintf(stdout,"   -ts, -testsuite        run in testsuite mode\n");
 
362
   fprintf(stdout,"   -sm, -subscribe        run in subscribe mode\n");
 
363
   fprintf(stdout,"   -trigger EVENT SCRIPT  start SCRIPT (executable) when EVENT occurs\n");
 
364
   fprintf(stdout,"\n");
 
365
   fprintf(stdout,"SCRIPT - path to a executable shell script\n");
 
366
   fprintf(stdout,"         1. command line argument: event name\n");
 
367
   fprintf(stdout,"         2. command line argument: jobid\n");
 
368
   fprintf(stdout,"         3. command line argument: taskid\n");
 
369
   fprintf(stdout,"EVENT  - One of the following event category:\n");
 
370
   fprintf(stdout,"         %s      - job end event\n", qevent_get_event_name(QEVENT_JB_END));
 
371
   fprintf(stdout,"         %s - job task end event\n", qevent_get_event_name(QEVENT_JB_TASK_END));
 
372
}
 
373
 
 
374
 
 
375
static void qevent_parse_command_line(int argc, char **argv, qevent_options *option_struct) {
 
376
 
 
377
   
 
378
   DENTER(TOP_LAYER, "qevent_parse_command_line");
 
379
 
 
380
   option_struct->help_option = 0;
 
381
   option_struct->testsuite_option = 0;
 
382
   option_struct->subscribe_option = 0;
 
383
   option_struct->trigger_option_count =0;
 
384
 
 
385
   while (*(++argv)) {
 
386
      if (!strcmp("-h", *argv) || !strcmp("-help", *argv)) {
 
387
         option_struct->help_option = 1;
 
388
         continue;
 
389
      }
 
390
      if (!strcmp("-ts", *argv) || !strcmp("-testsuite", *argv)) {
 
391
         option_struct->testsuite_option = 1;
 
392
         continue;
 
393
      }
 
394
      if (!strcmp("-sm", *argv) || !strcmp("-subscribe", *argv)) {
 
395
         option_struct->subscribe_option = 1;
 
396
         continue;
 
397
      }
 
398
      if (!strcmp("-trigger", *argv)) {
 
399
         int ok = 0;
 
400
         if (option_struct->trigger_option_count >= MAX_TRIGGER_SCRIPTS ) {
 
401
            sge_dstring_sprintf(option_struct->error_message,
 
402
                                "option \"-trigger\": only "sge_U32CFormat" trigger arguments supported\n",
 
403
                                sge_u32c(MAX_TRIGGER_SCRIPTS) );
 
404
            break; 
 
405
         }
 
406
 
 
407
         ++argv;
 
408
         if (*argv) {
 
409
            /* get EVENT argument */
 
410
            if (strcmp(qevent_get_event_name(QEVENT_JB_END),*argv) == 0) {
 
411
               ok = 1;
 
412
               (option_struct->trigger_option_events)[option_struct->trigger_option_count] = QEVENT_JB_END;
 
413
            } 
 
414
            if (strcmp(qevent_get_event_name(QEVENT_JB_TASK_END),*argv) == 0) {
 
415
               ok = 1;
 
416
               (option_struct->trigger_option_events)[option_struct->trigger_option_count] = QEVENT_JB_TASK_END;
 
417
            } 
 
418
 
 
419
            if (!ok) {
 
420
               sge_dstring_append(option_struct->error_message,"option \"-trigger\": undefined EVENT type\n");
 
421
               break; 
 
422
            }
 
423
         } else {
 
424
            sge_dstring_append(option_struct->error_message,"option \"-trigger\": found no EVENT argument\n");
 
425
            break;
 
426
         }
 
427
         ++argv;
 
428
         if (*argv) {
 
429
            /* get SCRIPT argument */
 
430
 
 
431
            /* check for SCRIPT file */
 
432
            if (!sge_is_file(*argv)) {
 
433
               sge_dstring_sprintf(option_struct->error_message,
 
434
                                   "option \"-trigger\": SCRIPT file %s not found\n",
 
435
                                   (*argv));
 
436
               break;
 
437
            }
 
438
 
 
439
            /* is file executable ? */
 
440
            if (!sge_is_executable(*argv)) {  
 
441
               sge_dstring_sprintf(option_struct->error_message,
 
442
                                   "option \"-trigger\": SCRIPT file %s not executable\n",
 
443
                                   (*argv));
 
444
               break;
 
445
 
 
446
            } 
 
447
 
 
448
            (option_struct->trigger_option_scripts)[option_struct->trigger_option_count] = *argv;
 
449
            (option_struct->trigger_option_count)++;
 
450
         } else {
 
451
            sge_dstring_append(option_struct->error_message,"option \"-trigger\": found no SCRIPT argument\n");
 
452
            break;
 
453
         }
 
454
         continue;
 
455
      }
 
456
 
 
457
 
 
458
      /* unkown option */
 
459
      if ( *argv[0] == '-' ) {  
 
460
         sge_dstring_append(option_struct->error_message,"unkown option: ");
 
461
         sge_dstring_append(option_struct->error_message,*argv);
 
462
         sge_dstring_append(option_struct->error_message,"\n");
 
463
      } else {
 
464
         sge_dstring_append(option_struct->error_message,"unkown argument: ");
 
465
         sge_dstring_append(option_struct->error_message,*argv);
 
466
         sge_dstring_append(option_struct->error_message,"\n");
 
467
      }
 
468
   } 
 
469
   DEXIT;
 
470
}
 
471
 
 
472
int main(int argc, char *argv[])
 
473
{
 
474
   qevent_options enabled_options;
 
475
   dstring errors = DSTRING_INIT;
 
476
   int i, gdi_setup;
 
477
   lList *alp = NULL;
 
478
   sge_gdi_ctx_class_t *ctx = NULL; 
 
479
   sge_evc_class_t *evc = NULL;
 
480
 
 
481
   DENTER_MAIN(TOP_LAYER, "qevent");
 
482
 
 
483
/*    sge_mt_init(); */
 
484
 
 
485
   /* dump pid to file */
 
486
   qevent_dump_pid_file();
 
487
 
 
488
   /* parse command line */
 
489
   enabled_options.error_message = &errors;
 
490
   qevent_set_option_struct(&enabled_options);
 
491
   qevent_parse_command_line(argc, argv, &enabled_options);
 
492
 
 
493
   
 
494
 
 
495
   /* check if help option is set */
 
496
   if (enabled_options.help_option) {
 
497
      qevent_show_usage();
 
498
      sge_dstring_free(enabled_options.error_message);
 
499
      SGE_EXIT((void**)&ctx, 0);
 
500
   }
 
501
 
 
502
   /* are there command line parsing errors ? */
 
503
   if (sge_dstring_get_string(enabled_options.error_message)) {
 
504
      ERROR((SGE_EVENT, "%s", sge_dstring_get_string(enabled_options.error_message) ));
 
505
      qevent_show_usage();
 
506
      sge_dstring_free(enabled_options.error_message);
 
507
      SGE_EXIT((void**)&ctx, 1);
 
508
   }
 
509
 
 
510
 
 
511
   log_state_set_log_gui(1);
 
512
   sge_setup_sig_handlers(QEVENT);
 
513
 
 
514
   /* setup event client */
 
515
   gdi_setup = sge_gdi2_setup(&ctx, QEVENT, MAIN_THREAD, &alp);
 
516
   if (gdi_setup != AE_OK) {
 
517
      answer_list_output(&alp);
 
518
      sge_dstring_free(enabled_options.error_message);
 
519
      SGE_EXIT((void**)&ctx, 1);
 
520
   }
 
521
   /* TODO: how is the memory we allocate here released ???, SGE_EXIT doesn't */
 
522
   if (false == sge_gdi2_evc_setup(&evc, ctx, EV_ID_ANY, &alp, NULL)) {
 
523
      answer_list_output(&alp);
 
524
      sge_dstring_free(enabled_options.error_message);
 
525
      SGE_EXIT((void**)&ctx, 1);
 
526
   }
 
527
 
 
528
   /* ok, start over ... */
 
529
   /* check for testsuite option */
 
530
   
 
531
   if (enabled_options.testsuite_option) {
 
532
      /* only for testsuite */
 
533
      qevent_testsuite_mode(evc);
 
534
      sge_dstring_free(enabled_options.error_message);
 
535
      SGE_EXIT((void**)&ctx, 0);
 
536
   }
 
537
 
 
538
   /* check for subscribe option */
 
539
   if (enabled_options.subscribe_option) {
 
540
      /* only for testsuite */
 
541
      qevent_subscribe_mode(evc);
 
542
      sge_dstring_free(enabled_options.error_message);
 
543
      SGE_EXIT((void**)&ctx, 0);
 
544
   }
 
545
 
 
546
   if (enabled_options.trigger_option_count > 0) {
 
547
      lCondition *where =NULL;
 
548
      lEnumeration *what = NULL;
 
549
 
 
550
      sge_mirror_initialize(evc, EV_ID_ANY, "sge_mirror -trigger", true, 
 
551
                            NULL, NULL, NULL, NULL, NULL);
 
552
      evc->ec_set_busy_handling(evc, EV_BUSY_UNTIL_ACK);
 
553
 
 
554
      /* put out information about -trigger option */
 
555
      for (i=0;i<enabled_options.trigger_option_count;i++) {
 
556
         INFO((SGE_EVENT, "trigger script for %s events: %s\n",
 
557
                         qevent_get_event_name((enabled_options.trigger_option_events)[i]), 
 
558
                         (enabled_options.trigger_option_scripts)[i]));
 
559
         switch((enabled_options.trigger_option_events)[i]) {
 
560
            case QEVENT_JB_END:
 
561
                  
 
562
                  /* build mask for the job structure to contain only the needed elements */
 
563
                  where = NULL; 
 
564
                  what = lWhat("%T(%I %I %I %I %I %I %I %I)", JB_Type, JB_job_number, JB_ja_tasks, 
 
565
                                                              JB_ja_structure, JB_ja_n_h_ids, JB_ja_u_h_ids, 
 
566
                                                              JB_ja_s_h_ids,JB_ja_o_h_ids, JB_ja_template);
 
567
                  
 
568
                  /* register for job events */ 
 
569
                  sge_mirror_subscribe(evc, SGE_TYPE_JOB, analyze_jatask_event, NULL, NULL, where, what);
 
570
                  evc->ec_set_flush(evc, sgeE_JOB_DEL,true, 1);
 
571
 
 
572
                  /* the mirror interface registers more events, than we need,
 
573
                     thus we free the ones, we do not need */
 
574
                /*  evc->ec_unsubscribe(evc, sgeE_JOB_LIST); */
 
575
                  evc->ec_unsubscribe(evc, sgeE_JOB_MOD);
 
576
                  evc->ec_unsubscribe(evc, sgeE_JOB_MOD_SCHED_PRIORITY);
 
577
                  evc->ec_unsubscribe(evc, sgeE_JOB_USAGE);
 
578
                  evc->ec_unsubscribe(evc, sgeE_JOB_FINAL_USAGE);
 
579
               /*   evc->ec_unsubscribe(evc, sgeE_JOB_ADD); */
 
580
 
 
581
                  /* free the what and where mask */
 
582
                  lFreeWhere(&where);
 
583
                  lFreeWhat(&what);
 
584
               break;
 
585
            case QEVENT_JB_TASK_END:
 
586
            
 
587
                  /* build mask for the job structure to contain only the needed elements */
 
588
                  where = NULL; 
 
589
                  what = lWhat("%T(%I)", JAT_Type, JAT_status);
 
590
                  /* register for JAT events */ 
 
591
                  sge_mirror_subscribe(evc, SGE_TYPE_JATASK, analyze_jatask_event, NULL, NULL, where, what);
 
592
                  evc->ec_set_flush(evc, sgeE_JATASK_DEL,true, 1);
 
593
                  
 
594
                  /* the mirror interface registers more events, than we need,
 
595
                     thus we free the ones, we do not need */ 
 
596
                  evc->ec_unsubscribe(evc, sgeE_JATASK_ADD);
 
597
                  evc->ec_unsubscribe(evc, sgeE_JATASK_MOD);
 
598
                  /* free the what and where mask */
 
599
                  lFreeWhere(&where);
 
600
                  lFreeWhat(&what);
 
601
               break;
 
602
         }        
 
603
      }
 
604
 
 
605
      while(!shut_me_down) {
 
606
         sge_mirror_error error = sge_mirror_process_events(evc);
 
607
         if (error == SGE_EM_TIMEOUT && !shut_me_down ) {
 
608
            sleep(10);
 
609
            continue;
 
610
         }
 
611
      }
 
612
 
 
613
      sge_mirror_shutdown(evc);
 
614
 
 
615
      sge_dstring_free(enabled_options.error_message);
 
616
      sge_prof_cleanup();
 
617
      SGE_EXIT((void**)&ctx, 0);
 
618
      return 0;
 
619
   }
 
620
 
 
621
 
 
622
   ERROR((SGE_EVENT, "no option selected\n" ));
 
623
   qevent_show_usage();
 
624
   sge_dstring_free(enabled_options.error_message);
 
625
   sge_prof_cleanup();
 
626
   SGE_EXIT((void**)&ctx, 1);
 
627
   return 1;
 
628
}
 
629
 
 
630
static char* qevent_get_event_name(int event) {
 
631
  
 
632
   switch(event) {
 
633
      case QEVENT_JB_END:
 
634
         return "JB_END";
 
635
      case QEVENT_JB_TASK_END:
 
636
         return "JB_TASK_END";
 
637
   }
 
638
   return "unexpected event id";
 
639
}
 
640
 
 
641
 
 
642
 
 
643
static void qevent_testsuite_mode(sge_evc_class_t *evc) 
 
644
{
 
645
#ifndef QEVENT_SHOW_ALL
 
646
   u_long32 timestamp;
 
647
   lCondition *where =NULL;
 
648
   lEnumeration *what = NULL;
 
649
 
 
650
   const int job_nm[] = {       
 
651
         JB_job_number,
 
652
         JB_host,
 
653
         JB_category,            
 
654
         JB_project, 
 
655
         JB_ja_tasks,
 
656
         JB_ja_structure,
 
657
         JB_ja_n_h_ids,
 
658
         JB_ja_u_h_ids,
 
659
         JB_ja_s_h_ids,
 
660
         JB_ja_o_h_ids,   
 
661
         JB_ja_template,
 
662
         NoName
 
663
      };
 
664
 
 
665
   const int jat_nm[] = {     
 
666
      JAT_status, 
 
667
      JAT_task_number,
 
668
      NoName
 
669
   };  
 
670
#endif
 
671
   
 
672
   DENTER(TOP_LAYER, "qevent_testsuite_mode");
 
673
 
 
674
   sge_mirror_initialize(evc, EV_ID_ANY, "qevent", true,
 
675
                         NULL, NULL, NULL, NULL, NULL);
 
676
 
 
677
#ifdef QEVENT_SHOW_ALL
 
678
   sge_mirror_subscribe(evc, SGE_TYPE_ALL, print_event, NULL, NULL, NULL, NULL);
 
679
#else /* QEVENT_SHOW_ALL */
 
680
   where = NULL; 
 
681
   what =  lIntVector2What(JB_Type, job_nm); 
 
682
   sge_mirror_subscribe(evc, SGE_TYPE_JOB, print_jatask_event, NULL, NULL, where, what);
 
683
   lFreeWhere(&where);
 
684
   lFreeWhat(&what);
 
685
   
 
686
   where = NULL; 
 
687
   what = lIntVector2What(JAT_Type, jat_nm); 
 
688
   sge_mirror_subscribe(evc, SGE_TYPE_JATASK, print_jatask_event, NULL, NULL, where, what);
 
689
   lFreeWhere(&where);
 
690
   lFreeWhat(&what);
 
691
 
 
692
   /* we want a 5 second event delivery interval */
 
693
   evc->ec_set_edtime(evc, 5);
 
694
 
 
695
   /* and have our events flushed immediately */
 
696
   evc->ec_set_flush(evc, sgeE_JATASK_MOD, true, 1);
 
697
   evc->ec_set_flush(evc, sgeE_JOB_FINAL_USAGE, true, 1);
 
698
   evc->ec_set_flush(evc, sgeE_JOB_ADD, true, 1);
 
699
   evc->ec_set_flush(evc, sgeE_JOB_DEL, true, 1);
 
700
 
 
701
#endif /* QEVENT_SHOW_ALL */
 
702
   
 
703
   while (!shut_me_down) {
 
704
      sge_mirror_error error = sge_mirror_process_events(evc);
 
705
      if (error == SGE_EM_TIMEOUT && !shut_me_down) {
 
706
         sleep(10);
 
707
         continue;
 
708
      }
 
709
 
 
710
#ifndef QEVENT_SHOW_ALL
 
711
      timestamp = sge_get_gmt();
 
712
      fprintf(stdout,"ECL_STATE (jobs_running=%ld:jobs_registered=%ld:ECL_TIME="sge_U32CFormat")\n",
 
713
              Global_jobs_running,Global_jobs_registered,sge_u32c(timestamp));
 
714
      fflush(stdout);  
 
715
#endif
 
716
   }
 
717
 
 
718
   sge_mirror_shutdown(evc);
 
719
 
 
720
   DEXIT;
 
721
}
 
722
 
 
723
/****** qevent/qevent_subscribe_mode() *****************************************
 
724
*  NAME
 
725
*     qevent_subscribe_mode() -- ??? 
 
726
*
 
727
*  SYNOPSIS
 
728
*     static void qevent_subscribe_mode(sge_evc_class_t *evc) 
 
729
*
 
730
*  FUNCTION
 
731
*     ??? 
 
732
*
 
733
*  INPUTS
 
734
*     sge_evc_class_t *evc - ??? 
 
735
*
 
736
*  RESULT
 
737
*     static void - 
 
738
*
 
739
*  EXAMPLE
 
740
*     ??? 
 
741
*
 
742
*  NOTES
 
743
*     MT-NOTE: qevent_subscribe_mode() is not MT safe 
 
744
*
 
745
*  BUGS
 
746
*     ??? 
 
747
*
 
748
*  SEE ALSO
 
749
*     ???/???
 
750
*******************************************************************************/
 
751
static void qevent_subscribe_mode(sge_evc_class_t *evc) 
 
752
{
 
753
   sge_object_type event_type = SGE_TYPE_ADMINHOST;
 
754
   
 
755
   DENTER(TOP_LAYER, "qevent_subscribe_mode");
 
756
 
 
757
   sge_mirror_initialize(evc, EV_ID_ANY, "qevent", true,
 
758
                         NULL, NULL, NULL, NULL, NULL);
 
759
   sge_mirror_subscribe(evc, SGE_TYPE_SHUTDOWN, print_event, NULL, NULL, NULL, NULL);
 
760
   sge_mirror_subscribe(evc, SGE_TYPE_ADMINHOST, print_event, NULL, NULL, NULL, NULL);
 
761
 
 
762
   while(!shut_me_down) {
 
763
      sge_mirror_error error = sge_mirror_process_events(evc);
 
764
      if (evc != NULL) {
 
765
         if (event_type < SGE_TYPE_NONE) {
 
766
            event_type++;
 
767
            printf("Subscribe event_type: %d\n", event_type);
 
768
            error = sge_mirror_subscribe(evc, event_type, print_event, NULL, NULL, NULL, NULL);
 
769
         } else {   
 
770
            event_type = SGE_TYPE_ADMINHOST;
 
771
            printf("Unsubscribe all event_types\n");
 
772
            error = sge_mirror_unsubscribe(evc, SGE_TYPE_ALL);
 
773
         }
 
774
      }   
 
775
      if (error == SGE_EM_TIMEOUT && !shut_me_down) {
 
776
         printf("error was SGE_EM_TIMEOUT\n");
 
777
         sleep(10);
 
778
         continue;
 
779
      }
 
780
   }
 
781
 
 
782
   sge_mirror_shutdown(evc);
 
783
 
 
784
   DEXIT;
 
785
}
 
786