~ubuntu-branches/ubuntu/utopic/gridengine/utopic

« back to all changes in this revision

Viewing changes to source/libs/evm/sge_event_master.c

  • Committer: Bazaar Package Importer
  • Author(s): Mark Hymers
  • Date: 2008-06-25 22:36:13 UTC
  • Revision ID: james.westby@ubuntu.com-20080625223613-tvd9xlhuoct9kyhm
Tags: upstream-6.2~beta2
ImportĀ upstreamĀ versionĀ 6.2~beta2

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*___INFO__MARK_BEGIN__*/
 
2
/*************************************************************************
 
3
 * 
 
4
 *  The Contents of this file are made available subject to the terms of
 
5
 *  the Sun Industry Standards Source License Version 1.2
 
6
 * 
 
7
 *  Sun Microsystems Inc., March, 2001
 
8
 * 
 
9
 * 
 
10
 *  Sun Industry Standards Source License Version 1.2
 
11
 *  =================================================
 
12
 *  The contents of this file are subject to the Sun Industry Standards
 
13
 *  Source License Version 1.2 (the "License"); You may not use this file
 
14
 *  except in compliance with the License. You may obtain a copy of the
 
15
 *  License at http://gridengine.sunsource.net/Gridengine_SISSL_license.html
 
16
 * 
 
17
 *  Software provided under this License is provided on an "AS IS" basis,
 
18
 *  WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING,
 
19
 *  WITHOUT LIMITATION, WARRANTIES THAT THE SOFTWARE IS FREE OF DEFECTS,
 
20
 *  MERCHANTABLE, FIT FOR A PARTICULAR PURPOSE, OR NON-INFRINGING.
 
21
 *  See the License for the specific provisions governing your rights and
 
22
 *  obligations concerning the Software.
 
23
 * 
 
24
 *   The Initial Developer of the Original Code is: Sun Microsystems, Inc.
 
25
 * 
 
26
 *   Copyright: 2001 by Sun Microsystems, Inc.
 
27
 * 
 
28
 *   All Rights Reserved.
 
29
 * 
 
30
 ************************************************************************/
 
31
/*___INFO__MARK_END__*/
 
32
 
 
33
#include <pthread.h>
 
34
#include <stdio.h>
 
35
#include <stdlib.h>
 
36
#include <string.h>
 
37
#include <errno.h>
 
38
 
 
39
#include "sge.h"
 
40
#include "cull/cull.h"
 
41
#include "sgeobj/sge_feature.h"
 
42
#include "uti/sge_time.h"
 
43
#include "sgeobj/sge_host.h"
 
44
#include "sgeobj/sge_event.h"
 
45
#include "sgeobj/sge_all_listsL.h"
 
46
#include "uti/sge_prog.h"
 
47
#include "rmon/sgermon.h"
 
48
#include "uti/sge_log.h"
 
49
#include "sgeobj/sge_conf.h"
 
50
#include "sgeobj/sge_answer.h"
 
51
#include "sgeobj/sge_qinstance.h"
 
52
#include "sgeobj/sge_report.h"
 
53
#include "sgeobj/sge_ckpt.h"
 
54
#include "sgeobj/sge_pe.h"
 
55
#include "sgeobj/sge_userprj.h"
 
56
#include "sgeobj/sge_job.h"
 
57
#include "uti/sge_hostname.h"
 
58
#include "sgeobj/sge_userset.h"
 
59
#include "sgeobj/sge_manop.h"
 
60
#include "sgeobj/sge_calendar.h"
 
61
#include "sgeobj/sge_sharetree.h"
 
62
#include "sgeobj/sge_hgroup.h"
 
63
#include "sgeobj/sge_cuser.h"
 
64
#include "sgeobj/sge_centry.h"
 
65
#include "sgeobj/sge_cqueue.h"
 
66
#include "sgeobj/sge_object.h"
 
67
#include "sgeobj/sge_range.h"
 
68
#include "lck/sge_mtutil.h"
 
69
#include "configuration_qmaster.h"   /* bad dependency!! */
 
70
#include "comm/lists/cl_errors.h"
 
71
#include "comm/cl_commlib.h"
 
72
#include "uti/sge_profiling.h"
 
73
#include "uti/sge_spool.h"
 
74
#include "sgeobj/sge_event_requestL.h"
 
75
 
 
76
#include "lck/sge_lock.h"
 
77
 
 
78
#include "comm/lists/cl_thread.h"
 
79
 
 
80
#include "uti/sge_thread_ctrl.h"
 
81
 
 
82
#include "gdi/sge_gdi_ctx.h"
 
83
 
 
84
#include "msg_common.h"
 
85
#include "msg_sgeobjlib.h"
 
86
#include "msg_evmlib.h"
 
87
 
 
88
#include "sge_event_master.h"
 
89
 
 
90
/*
 
91
 ***** transaction handling implementation ************
 
92
 *
 
93
 * Well, one cannot really call the transaction implementation
 
94
 * transaction handling. First of all, there is no role
 
95
 * back. Second, there is only one transaction in the 
 
96
 * whole system, one no way to distinguish between the 
 
97
 * events added by the thread, which opend the transaction, 
 
98
 * and other threads just adding events.
 
99
 *
 
100
 * We need the current implementation for the scheduler
 
101
 * protocol. All gdi requets have to be atomic from the
 
102
 * scheduler point of view. Therefore we have a second
 
103
 * (the transaction) list to add events, and put them
 
104
 * into the send queue, when the gdi request has been
 
105
 * handled.
 
106
 * 
 
107
 * Important variables:
 
108
 *  pthread_mutex_t  transaction_mutex;  
 
109
 *  lList            *transaction_events;
 
110
 *  pthread_mutex_t  t_add_event_mutex;
 
111
 *  bool             is_transaction;   
 
112
 *
 
113
 * related methods:
 
114
 *  sge_set_commit_required()
 
115
 *  sge_commit() 
 
116
 *
 
117
 ******************************************************
 
118
 */
 
119
 
 
120
 
 
121
/*
 
122
 ***** subscription_t definition **********************
 
123
 *
 
124
 * This is a subscription entry in a two dimensional 
 
125
 * definition. The first dimension is for the event
 
126
 * clients and changes when the event client subscription
 
127
 * changes. The second dimension is of fixed size and 
 
128
 * contains one element for each posible event.
 
129
 *
 
130
 ******************************************************
 
131
 */
 
132
typedef struct {
 
133
      bool         subscription; /* true -> the event is subscribed           */
 
134
      bool         blocked;      /* true -> no events will be accepted before */
 
135
                                 /*   the total update is issued                */
 
136
      bool         flush;        /* true -> flush is set                      */
 
137
      u_long32     flush_time;   /* seconds how much the event can be delayed */
 
138
      lCondition   *where;       /* where filter                              */
 
139
      lDescr       *descr;       /* target list descriptor                    */
 
140
      lEnumeration *what;        /* limits the target element                 */
 
141
} subscription_t;
 
142
 
 
143
/****** Eventclient/Server/-Event_Client_Server_Defines ************************
 
144
*  NAME
 
145
*     Defines -- Constants used in the module
 
146
*
 
147
*  SYNOPSIS
 
148
*     #define EVENT_DELIVERY_INTERVAL_S 1 
 
149
*     #define EVENT_DELIVERY_INTERVAL_N 0 
 
150
*     #define EVENT_ACK_MIN_TIMEOUT 600
 
151
*     #define EVENT_ACK_MAX_TIMEOUT 1200
 
152
*
 
153
*  FUNCTION
 
154
*     EVENT_DELIVERY_INTERVAL_S is the event delivery interval. It is set in seconds.
 
155
*     EVENT_DELIVERY_INTERVAL_N same thing but in nano seconds.
 
156
*
 
157
*     EVENT_ACK_MIN/MAX_TIMEOUT is the minimum/maximum timeout value for an event
 
158
*     client sending the acknowledge for the delivery of events.
 
159
*     The real timeout value depends on the event delivery interval for the 
 
160
*     event client (10 * event delivery interval).
 
161
*******************************************************************************/
 
162
#define EVENT_DELIVERY_INTERVAL_S 1
 
163
#define EVENT_DELIVERY_INTERVAL_N 0
 
164
#define EVENT_ACK_MIN_TIMEOUT 600
 
165
#define EVENT_ACK_MAX_TIMEOUT 1200
 
166
 
 
167
/*
 
168
 *******************************************************************
 
169
 *
 
170
 * The next three array are important for lists, which can be
 
171
 * subscribed by the event client and which contain a sub-list
 
172
 * that can be subscribed by itself again (such as the: job list
 
173
 * with the JAT_Task sub list, or the cluster queue list with the
 
174
 * queue instances sub-list)
 
175
 * All lists, which follow the same structure have to be defined
 
176
 * in the special construct.
 
177
 *
 
178
 * EVENT_LIST:
 
179
 *  Contains all events for the main list, which delivers also
 
180
 *  the sub-list. 
 
181
 *
 
182
 * FIELD_LIST:
 
183
 *  Contains all attributes in the main list, which contain the
 
184
 *  sub-list in question.
 
185
 *
 
186
 * SOURCE_LIST:
 
187
 *  Contains the sub-scription events for the sub-list, which als
 
188
 *  contains the filter for the sub-list.
 
189
 *
 
190
 *
 
191
 * This construct and its functions are limited to one sub-scribable
 
192
 * sub-list per main list. If multiple sub-lists can be subsribed, the
 
193
 * construct has to be exetended.
 
194
 *
 
195
 * ISSUES:
 
196
 *  1416: the sgeE_PETASK_* list events are not handled correctly during
 
197
 *        total update. It works fine with all other events. The problem
 
198
 *        is, that an update of the jobs filters for job and ja_task, but
 
199
 *        not for PE tasks.
 
200
 *
 
201
 * SEE ALSO:
 
202
 *     evm/sge_event_master/list_select()
 
203
 *     evm/sge_event_master/elem_select() 
 
204
 *  and
 
205
 *     evm/sge_event_master/add_list_event
 
206
 *     evm/sge_event_master/add_event
 
207
 *
 
208
 ********************************************************************
 
209
 */
 
210
#define LIST_MAX 3
 
211
 
 
212
const int EVENT_LIST[LIST_MAX][6] = {
 
213
   {sgeE_JOB_LIST, sgeE_JOB_ADD, sgeE_JOB_DEL, sgeE_JOB_MOD, sgeE_JOB_MOD_SCHED_PRIORITY, -1},
 
214
   {sgeE_CQUEUE_LIST, sgeE_CQUEUE_ADD, sgeE_CQUEUE_DEL, sgeE_CQUEUE_MOD, -1, -1},
 
215
   {sgeE_JATASK_ADD, sgeE_JATASK_DEL, sgeE_JATASK_MOD, -1, -1, -1 }
 
216
};
 
217
 
 
218
const int FIELD_LIST[LIST_MAX][3] = {
 
219
   {JB_ja_tasks, JB_ja_template, -1},
 
220
   {CQ_qinstances, -1, -1},
 
221
   {JAT_task_list, -1, -1}
 
222
};
 
223
 
 
224
const int SOURCE_LIST[LIST_MAX][3] = {
 
225
   {sgeE_JATASK_MOD, sgeE_JATASK_ADD, -1},
 
226
   {sgeE_QINSTANCE_ADD, sgeE_QINSTANCE_MOD, -1},
 
227
   {sgeE_PETASK_ADD, -1, -1}
 
228
};
 
229
 
 
230
/*
 
231
 *****************************************************
 
232
 *
 
233
 * The next two array are needed for blocking events
 
234
 * for a given client, when a total update is pending
 
235
 * for it.
 
236
 *
 
237
 * The total_update_events array defines all list events,
 
238
 * which are used during a total update.
 
239
 *
 
240
 * The block_events contain all events, which are blocked
 
241
 * during a total update.
 
242
 *
 
243
 *  SEE ALSO:
 
244
 *   blockEvents
 
245
 *   total_update 
 
246
 *   add_list_event_direct
 
247
 *****************************************************
 
248
 */
 
249
 
 
250
#ifndef __SGE_NO_USERMAPPING__
 
251
#define total_update_eventsMAX 22
 
252
#else
 
253
#define total_update_eventsMAX 21
 
254
#endif
 
255
 
 
256
const int total_update_events[total_update_eventsMAX + 1] = {sgeE_ADMINHOST_LIST,
 
257
                                       sgeE_CALENDAR_LIST,
 
258
                                       sgeE_CKPT_LIST,
 
259
                                       sgeE_CENTRY_LIST,
 
260
                                       sgeE_CONFIG_LIST,
 
261
                                       sgeE_EXECHOST_LIST,
 
262
                                       sgeE_JOB_LIST,
 
263
                                       sgeE_JOB_SCHEDD_INFO_LIST,
 
264
                                       sgeE_MANAGER_LIST,
 
265
                                       sgeE_OPERATOR_LIST,
 
266
                                       sgeE_PE_LIST,
 
267
                                       sgeE_CQUEUE_LIST,
 
268
                                       sgeE_SCHED_CONF,
 
269
                                       sgeE_SUBMITHOST_LIST,
 
270
                                       sgeE_USERSET_LIST,
 
271
                                       sgeE_NEW_SHARETREE,
 
272
                                       sgeE_PROJECT_LIST,
 
273
                                       sgeE_USER_LIST,
 
274
                                       sgeE_HGROUP_LIST,
 
275
                                       sgeE_RQS_LIST,
 
276
                                       sgeE_AR_LIST,
 
277
#ifndef __SGE_NO_USERMAPPING__
 
278
                                       sgeE_CUSER_LIST,
 
279
#endif
 
280
                                       -1};
 
281
 
 
282
const int block_events[total_update_eventsMAX][9] = {
 
283
   {sgeE_ADMINHOST_ADD, sgeE_ADMINHOST_DEL, sgeE_ADMINHOST_MOD, -1, -1, -1, -1, -1, -1}, 
 
284
   {sgeE_CALENDAR_ADD,  sgeE_CALENDAR_DEL,  sgeE_CALENDAR_MOD,  -1, -1, -1, -1, -1, -1},
 
285
   {sgeE_CKPT_ADD,      sgeE_CKPT_DEL,      sgeE_CKPT_MOD,      -1, -1, -1, -1, -1, -1},
 
286
   {sgeE_CENTRY_ADD,    sgeE_CENTRY_DEL,    sgeE_CENTRY_MOD,    -1, -1, -1, -1, -1, -1}, 
 
287
   {sgeE_CONFIG_ADD,    sgeE_CONFIG_DEL,    sgeE_CONFIG_MOD,    -1, -1, -1, -1, -1, -1}, 
 
288
   {sgeE_EXECHOST_ADD,  sgeE_EXECHOST_DEL,  sgeE_EXECHOST_MOD,  -1, -1, -1, -1, -1, -1},
 
289
   {sgeE_JOB_ADD, sgeE_JOB_DEL, sgeE_JOB_MOD, sgeE_JOB_MOD_SCHED_PRIORITY, sgeE_JOB_USAGE, sgeE_JOB_FINAL_USAGE, sgeE_JOB_FINISH, -1, -1}, 
 
290
   {sgeE_JOB_SCHEDD_INFO_ADD, sgeE_JOB_SCHEDD_INFO_DEL, sgeE_JOB_SCHEDD_INFO_MOD, -1, -1, -1, -1, -1, -1},
 
291
   {sgeE_MANAGER_ADD,         sgeE_MANAGER_DEL,         sgeE_MANAGER_MOD,         -1, -1, -1, -1, -1, -1}, 
 
292
   {sgeE_OPERATOR_ADD,        sgeE_OPERATOR_DEL,        sgeE_OPERATOR_MOD,        -1, -1, -1, -1, -1, -1},
 
293
   {sgeE_PE_ADD,              sgeE_PE_DEL,              sgeE_PE_MOD,              -1, -1, -1, -1, -1, -1},
 
294
   {sgeE_CQUEUE_ADD,          sgeE_CQUEUE_DEL,          sgeE_CQUEUE_MOD, sgeE_QINSTANCE_ADD, sgeE_QINSTANCE_DEL, sgeE_QINSTANCE_MOD, sgeE_QINSTANCE_SOS, sgeE_QINSTANCE_USOS, -1}, 
 
295
   {-1, -1, -1, -1, -1, -1, -1, -1},
 
296
   {sgeE_SUBMITHOST_ADD,      sgeE_SUBMITHOST_DEL,      sgeE_SUBMITHOST_MOD,      -1, -1, -1, -1, -1, -1},
 
297
   {sgeE_USERSET_ADD,         sgeE_USERSET_DEL,         sgeE_USERSET_MOD,         -1, -1, -1, -1, -1, -1},
 
298
   {-1, -1, -1, -1, -1, -1, -1, -1}, 
 
299
   {sgeE_PROJECT_ADD,         sgeE_PROJECT_DEL,         sgeE_PROJECT_MOD,         -1, -1, -1, -1, -1, -1},
 
300
   {sgeE_USER_ADD,            sgeE_USER_DEL,            sgeE_USER_MOD,            -1, -1, -1, -1, -1, -1},
 
301
   {sgeE_HGROUP_ADD,          sgeE_HGROUP_DEL,          sgeE_HGROUP_MOD,          -1, -1, -1, -1, -1, -1},
 
302
   {sgeE_RQS_ADD,             sgeE_RQS_DEL,             sgeE_RQS_MOD,             -1, -1, -1, -1, -1, -1},
 
303
   {sgeE_AR_ADD,              sgeE_AR_DEL,              sgeE_AR_MOD,              -1, -1, -1, -1, -1, -1}
 
304
#ifndef __SGE_NO_USERMAPPING__
 
305
   ,{sgeE_CUSER_ADD,           sgeE_CUSER_DEL,           sgeE_CUSER_MOD,           -1, -1, -1, -1, -1, -1}
 
306
#endif
 
307
};
 
308
 
 
309
 
 
310
/*
 
311
 *******************************************************************
 
312
 *
 
313
 * Some events have to be delivered even so they have no data left
 
314
 * after filtering for them. These are for example all update list
 
315
 * events. 
 
316
 * The ensure, that is is done as fast as posible, we define an 
 
317
 * array of the size of the number of events, we have a init function
 
318
 * which sets the events which will be updated. To add a new event
 
319
 * one has only to update that function.
 
320
 * Events which do not contain any data are not affected. They are
 
321
 * always delivered.
 
322
 *
 
323
 * SEE ALSO:
 
324
 * - Array:
 
325
 *    SEND_EVENTS
 
326
 *
 
327
 * - Init function:
 
328
 *    evm/sge_event_master/sge_init_send_events()
 
329
 *
 
330
 ******************************************************************
 
331
 */
 
332
static bool SEND_EVENTS[sgeE_EVENTSIZE]; 
 
333
 
 
334
event_master_control_t Event_Master_Control = {
 
335
   PTHREAD_MUTEX_INITIALIZER,       /* mutex */
 
336
   PTHREAD_COND_INITIALIZER,        /* cond_var */
 
337
   PTHREAD_MUTEX_INITIALIZER,       /* cond_mutex */
 
338
   false,                           /* delivery_signaled */
 
339
   0,                               /* max_event_clients */
 
340
   false,                           /* is_prepare_shutdown */
 
341
   NULL,                            /* clients */
 
342
   NULL,                            /* client_ids */
 
343
   NULL,                            /* requests */
 
344
   PTHREAD_MUTEX_INITIALIZER,       /* request_mutex */
 
345
   0                                /* transaction_key */
 
346
};
 
347
 
 
348
static void       init_send_events(void); 
 
349
static void       flush_events(lListElem*, int);
 
350
static void       total_update(lListElem*, monitoring_t *monitor);
 
351
static void       build_subscription(lListElem*);
 
352
static void       remove_event_client(lListElem **client, int event_client_id, bool lock_event_master);
 
353
static void       check_send_new_subscribed_list(const subscription_t*, 
 
354
                                                 const subscription_t*, lListElem*,
 
355
                                                 ev_event event, object_description *master_table);
 
356
static int        eventclient_subscribed(const lListElem *, ev_event, const char*);
 
357
static int        purge_event_list(lList* aList, u_long32 event_number); 
 
358
static bool       add_list_event_for_client(u_long32, u_long32, ev_event, u_long32, u_long32, const char*, const char*, const char*, lList*, bool);
 
359
static void       add_list_event_direct(lListElem *event_client,
 
360
                                        lListElem *event, bool copy_event);
 
361
static void       total_update_event(lListElem*, ev_event, object_description *master_table, bool new_subscription);
 
362
static bool       list_select(subscription_t*, int, lList**, lList*, const lCondition*, const lEnumeration*, const lDescr*, bool);
 
363
static lListElem* elem_select(subscription_t*, lListElem*, const int[], const lCondition*, const lEnumeration*, const lDescr*, int);    
 
364
static lListElem* eventclient_list_locate_by_adress(const char*, const char*, u_long32);
 
365
static const lDescr* getDescriptorL(subscription_t*, const lList*, int);
 
366
static lListElem* get_event_client(u_long32 id);
 
367
static u_long32   allocate_new_dynamic_id(lList **answer_list);
 
368
static void       free_dynamic_id(lList **answer_list, u_long32 id);
 
369
static void       set_flush(void);
 
370
 
 
371
static void blockEvents(lListElem *event_client, ev_event ev_type, bool isBlock);
 
372
 
 
373
static void
 
374
sge_event_master_destroy_transaction_store(void *transaction_store)
 
375
{
 
376
   event_master_transaction_t *t_store = (event_master_transaction_t *)transaction_store;
 
377
   lFreeList(&(t_store->transaction_requests));
 
378
}
 
379
 
 
380
static void
 
381
sge_event_master_init_transaction_store(event_master_transaction_t *t_store)
 
382
{
 
383
   t_store->is_transaction = false;
 
384
   t_store->transaction_requests = lCreateListHash("Event Master Requests", EVR_Type, false);
 
385
}
 
386
 
 
387
/****** Eventclient/Server/sge_add_event_client() ******************************
 
388
*  NAME
 
389
*     sge_add_event_client() -- register a new event client
 
390
*
 
391
*  SYNOPSIS
 
392
*     #include "sge_event_master.h"
 
393
*
 
394
*     int 
 
395
*     sge_add_event_client(lListElem *clio, lList **alpp, lList **eclpp, 
 
396
*     char *ruser, char *rhost) 
 
397
*
 
398
*  FUNCTION
 
399
*     Registeres a new event client. 
 
400
*     If it requested a dynamic id, a new id is created and assigned.
 
401
*     If it is a special client(with fixed id) and an event client
 
402
*     with this id already exists, the old instance is deleted and the
 
403
*     new one registered.
 
404
*     If the registration succeeds, the event client is sent all data
 
405
*     (sgeE*_LIST events) according to its subscription.
 
406
*
 
407
*  INPUTS
 
408
*     lListElem *clio - the event client object used as registration data
 
409
*     lList **alpp    - answer list pointer for answer to event client
 
410
*     lList **eclpp   - list pointer to return new event client object
 
411
*     char *ruser     - user that tries to register an event client
 
412
*     char *rhost     - host on which the event client runs
 
413
*     event_client_update_func_t update_func - for internal event clients
 
414
*     monitoring_t monitor - monitoring handle
 
415
*
 
416
*  RESULT
 
417
*     int - AN_status value. STATUS_OK on success, else error code
 
418
*
 
419
*  NOTES
 
420
*     MT-NOTE: sge_add_event_client() is MT safe, it uses the global lock and
 
421
*              internal ones.
 
422
*
 
423
*******************************************************************************/
 
424
static void sge_event_master_process_add_event_client(lListElem *request, monitoring_t *monitor)
 
425
{
 
426
   /* to be implemented later on - handling the internal event clients could become a little bit tricky */
 
427
}
 
428
 
 
429
int sge_add_event_client(lListElem *clio, lList **alpp, lList **eclpp, char *ruser, 
 
430
                         char *rhost, event_client_update_func_t update_func, monitoring_t *monitor)
 
431
{
 
432
   lListElem *ep = NULL;
 
433
   u_long32 now;
 
434
   u_long32 id;
 
435
   u_long32 ed_time;
 
436
   const char *name;
 
437
   const char *host;
 
438
   const char *commproc;
 
439
   u_long32 commproc_id;
 
440
 
 
441
   DENTER(TOP_LAYER, "sge_add_event_client");
 
442
 
 
443
   id = lGetUlong(clio, EV_id);
 
444
   name = lGetString(clio, EV_name);
 
445
   ed_time = lGetUlong(clio, EV_d_time);
 
446
   host = lGetHost(clio, EV_host);
 
447
   commproc = lGetString(clio, EV_commproc);
 
448
   commproc_id = lGetUlong(clio, EV_commid);
 
449
 
 
450
   /* an event client must have a name */
 
451
   if (name == NULL) {
 
452
      name = "unnamed";
 
453
      lSetString(clio, EV_name, name);
 
454
   }
 
455
 
 
456
   /* check event client object structure */
 
457
   if (lCompListDescr(lGetElemDescr(clio), EV_Type) != 0) {
 
458
      ERROR((SGE_EVENT, MSG_EVE_INCOMPLETEEVENTCLIENT));
 
459
      answer_list_add(alpp, SGE_EVENT, STATUS_ESEMANTIC, ANSWER_QUALITY_ERROR);
 
460
      DRETURN(STATUS_DENIED);
 
461
   }
 
462
 
 
463
   /* EV_ID_ANY is 0, therefore the compare is always true (Irix complained) */
 
464
   if (id >= EV_ID_FIRST_DYNAMIC) { /* invalid request */
 
465
      ERROR((SGE_EVENT, MSG_EVE_ILLEGALIDREGISTERED_U, sge_u32c(id)));
 
466
      answer_list_add(alpp, SGE_EVENT, STATUS_ESEMANTIC, ANSWER_QUALITY_ERROR);
 
467
      DRETURN(STATUS_ESEMANTIC);
 
468
   }
 
469
 
 
470
   if (lGetBool(clio, EV_changed) && lGetList(clio, EV_subscribed) == NULL) {
 
471
      ERROR((SGE_EVENT, MSG_EVE_INVALIDSUBSCRIPTION));
 
472
      answer_list_add(alpp, SGE_EVENT, STATUS_ESEMANTIC, ANSWER_QUALITY_ERROR);
 
473
      DRETURN(STATUS_ESEMANTIC);
 
474
   }
 
475
 
 
476
   /* Acquire the event master mutex - we access the event client list */
 
477
   sge_mutex_lock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
 
478
 
 
479
   if (Event_Master_Control.is_prepare_shutdown) {
 
480
      sge_mutex_unlock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
 
481
      ERROR((SGE_EVENT, MSG_EVE_QMASTERISGOINGDOWN));
 
482
      answer_list_add(alpp, SGE_EVENT, STATUS_ESEMANTIC, ANSWER_QUALITY_ERROR);      
 
483
      DRETURN(STATUS_ESEMANTIC);
 
484
   }
 
485
 
 
486
   if (id == EV_ID_ANY) {   /* qmaster shall give id dynamically */
 
487
      /* Try to find an event client with the very same commd
 
488
         address triplet. If it exists the "new" event client must
 
489
         be the result of a reconnect after a timeout that happend at
 
490
         client side. We delete the old event client. */
 
491
      ep = eventclient_list_locate_by_adress(host, commproc, commproc_id);
 
492
 
 
493
      if (ep != NULL) {
 
494
         ERROR((SGE_EVENT, MSG_EVE_CLIENTREREGISTERED_SSSU, name, host, 
 
495
                commproc, sge_u32c(commproc_id)));
 
496
 
 
497
         /* delete old event client entry, and we already hold the lock! */
 
498
         remove_event_client(&ep, id, false);
 
499
      }
 
500
 
 
501
      /* Otherwise, get a new dynamic event client id */
 
502
      id = allocate_new_dynamic_id(alpp);
 
503
 
 
504
      if (id == 0) {
 
505
         sge_mutex_unlock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
 
506
         DRETURN(STATUS_ESEMANTIC);
 
507
      }
 
508
 
 
509
      INFO((SGE_EVENT, MSG_EVE_REG_SUU, name, sge_u32c(id), sge_u32c(ed_time)));
 
510
 
 
511
      /* Set the new id for this client. */
 
512
      lSetUlong(clio, EV_id, id);
 
513
   }
 
514
 
 
515
   /* special event clients: we allow only one instance */
 
516
   /* if it already exists, delete the old one and register the new one */
 
517
   if (id > EV_ID_ANY && id < EV_ID_FIRST_DYNAMIC) {
 
518
      /*
 
519
      ** we allow addition of a priviledged event client
 
520
      ** for internal clients (==> update_func != NULL)
 
521
      ** and manager/operator
 
522
      */
 
523
      if (update_func == NULL && !manop_is_manager(ruser)) {
 
524
         sge_mutex_unlock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
 
525
         ERROR((SGE_EVENT, MSG_WRONG_USER_FORFIXEDID ));
 
526
         answer_list_add(alpp, SGE_EVENT, STATUS_ESEMANTIC, ANSWER_QUALITY_ERROR);
 
527
         DRETURN(STATUS_ESEMANTIC);
 
528
      }
 
529
 
 
530
      if ((ep = get_event_client(id)) != NULL) {
 
531
         /* we already have this special client */
 
532
         ERROR((SGE_EVENT, MSG_EVE_CLIENTREREGISTERED_SSSU, name, host,
 
533
                commproc, sge_u32c(commproc_id)));
 
534
 
 
535
         /* delete old event client entry, and we already have the mutex! */
 
536
         remove_event_client(&ep, id, false);
 
537
      } else {
 
538
         INFO((SGE_EVENT, MSG_EVE_REG_SUU, name, sge_u32c(id), sge_u32c(ed_time)));
 
539
      }
 
540
   }
 
541
 
 
542
   ep = lCopyElem(clio);
 
543
   lSetRef(ep, EV_update_function, (void*) update_func);
 
544
   lSetBool(clio, EV_changed, false);
 
545
 
 
546
   lAppendElem(Event_Master_Control.clients, ep);
 
547
 
 
548
   lSetUlong(ep, EV_next_number, 1);
 
549
 
 
550
   /* register this contact */
 
551
   now = sge_get_gmt();
 
552
   lSetUlong(ep, EV_last_send_time, 0);
 
553
   lSetUlong(ep, EV_next_send_time, now + lGetUlong(ep, EV_d_time));
 
554
   lSetUlong(ep, EV_last_heard_from, now);
 
555
   lSetUlong(ep, EV_state, EV_connected);
 
556
 
 
557
   /* return new event client object to internal event client */
 
558
   if (eclpp != NULL) {
 
559
      lListElem *ret_el = lCopyElem(ep);
 
560
      if (*eclpp == NULL) {
 
561
         *eclpp = lCreateListHash("new event client", EV_Type, true);
 
562
      }
 
563
      lSetBool(ret_el, EV_changed, false);
 
564
      lAppendElem(*eclpp, ret_el);
 
565
   }
 
566
 
 
567
   /* Start with no pending events. */
 
568
   build_subscription(ep);
 
569
 
 
570
   /* build events for total update */
 
571
   total_update(ep, monitor);
 
572
 
 
573
   /* flush initial list events */
 
574
   flush_events(ep, 0);
 
575
 
 
576
   sge_mutex_unlock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
 
577
 
 
578
   INFO((SGE_EVENT, MSG_SGETEXT_ADDEDTOLIST_SSSS,
 
579
         ruser, rhost, name, MSG_EVE_EVENTCLIENT));
 
580
   answer_list_add(alpp, SGE_EVENT, STATUS_OK, ANSWER_QUALITY_INFO);
 
581
 
 
582
   DRETURN(STATUS_OK);
 
583
} /* sge_event_master_process_add_event_client() */
 
584
 
 
585
/****** Eventclient/Server/sge_mod_event_client() ******************************
 
586
*  NAME
 
587
*     sge_mod_event_client() -- modify event client
 
588
*
 
589
*  SYNOPSIS
 
590
*     #include "sge_event_master.h"
 
591
*
 
592
*     int 
 
593
*     sge_mod_event_client(lListElem *clio, lList **alpp, lList **eclpp, 
 
594
*     char *ruser, char *rhost) 
 
595
*
 
596
*  FUNCTION
 
597
*     An event client object is modified.
 
598
*     It is possible to modify the event delivery time and
 
599
*     the subscription.
 
600
*     If the subscription is changed, and new sgeE*_LIST events are subscribed,
 
601
*     these lists are sent to the event client.
 
602
*
 
603
*  INPUTS
 
604
*     lListElem *clio - object containing the data to change
 
605
*     lList **alpp    - answer list pointer
 
606
*     char *ruser     - user that triggered the modify action
 
607
*     char *rhost     - host that triggered the modify action
 
608
*
 
609
*  RESULT
 
610
*     int - AN_status code. STATUS_OK on success, else error code
 
611
*
 
612
*  NOTES
 
613
*     MT-NOTE: sge_mod_event_client() is MT safe, uses internal locks
 
614
*
 
615
*  SEE ALSO
 
616
*     evm_mod_func_t
 
617
*
 
618
*******************************************************************************/
 
619
int
 
620
sge_mod_event_client(lListElem *clio, lList **alpp, char *ruser, char *rhost)
 
621
{
 
622
   lListElem *evr = NULL;
 
623
 
 
624
   DENTER(TOP_LAYER,"sge_mod_event_client");
 
625
 
 
626
   if (clio == NULL) {
 
627
      ERROR((SGE_EVENT, "NULL element passed to sge_mod_event_client"));
 
628
      abort();
 
629
      DRETURN(STATUS_ESEMANTIC);
 
630
   }
 
631
 
 
632
   evr = lCreateElem(EVR_Type);
 
633
   lSetUlong(evr, EVR_operation, EVR_MOD_EVC);
 
634
   lSetUlong(evr, EVR_timestamp, sge_get_gmt());
 
635
   lSetObject(evr, EVR_event_client, lCopyElem(clio));
 
636
 
 
637
   sge_mutex_lock("event_master_request_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.request_mutex);
 
638
   lAppendElem(Event_Master_Control.requests, evr);
 
639
   sge_mutex_unlock("event_master_request_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.request_mutex);
 
640
 
 
641
   DEBUG((SGE_EVENT, MSG_SGETEXT_MODIFIEDINLIST_SSSS,
 
642
         ruser, rhost, lGetString(clio, EV_name), MSG_EVE_EVENTCLIENT));
 
643
   answer_list_add(alpp, SGE_EVENT, STATUS_OK, ANSWER_QUALITY_INFO);
 
644
 
 
645
   set_flush();
 
646
 
 
647
   DRETURN(STATUS_OK);
 
648
}
 
649
 
 
650
/****** Eventclient/Server/sge_event_master_process_mod_event_client() ********
 
651
*  NAME
 
652
*     sge_mod_event_client() -- modify event client
 
653
*
 
654
*  SYNOPSIS
 
655
*     #include "sge_event_master.h"
 
656
*
 
657
*     int 
 
658
*     sge_event_master_process_mod_event_client(lListElem *clio, lList **alpp, 
 
659
*                                               lList **eclpp, char *ruser, 
 
660
*                                               char *rhost) 
 
661
*
 
662
*  FUNCTION
 
663
*     An event client object is modified.
 
664
*     It is possible to modify the event delivery time and
 
665
*     the subscription.
 
666
*     If the subscription is changed, and new sgeE*_LIST events are subscribed,
 
667
*     these lists are sent to the event client.
 
668
*
 
669
*  INPUTS
 
670
*     lListElem *clio - object containing the data to change
 
671
*     lList **alpp    - answer list pointer
 
672
*     char *ruser     - user that triggered the modify action
 
673
*     char *rhost     - host that triggered the modify action
 
674
*
 
675
*  RESULT
 
676
*     int - AN_status code. STATUS_OK on success, else error code
 
677
*
 
678
*  NOTES
 
679
*     MT-NOTE: sge_mod_event_client() is NOT MT safe.
 
680
*
 
681
*******************************************************************************/
 
682
void
 
683
sge_event_master_process_mod_event_client(lListElem *request, monitoring_t *monitor)
 
684
{
 
685
   lListElem *event_client = NULL;
 
686
   u_long32 id;
 
687
   u_long32 busy;
 
688
   u_long32 busy_handling;
 
689
   u_long32 ev_d_time;
 
690
   lListElem *clio = NULL;
 
691
   cl_thread_settings_t *thread_config = NULL;
 
692
 
 
693
   DENTER(TOP_LAYER, "sge_event_master_process_mod_event_client");
 
694
 
 
695
   clio = lGetObject(request, EVR_event_client);
 
696
 
 
697
   /* try to find event_client */
 
698
   id = lGetUlong(clio, EV_id);
 
699
 
 
700
   sge_mutex_lock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
 
701
   event_client = get_event_client(id);
 
702
 
 
703
   if (event_client == NULL) {
 
704
      sge_mutex_unlock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
 
705
      ERROR((SGE_EVENT, MSG_EVE_UNKNOWNEVCLIENT_US, sge_u32c(id), "modify"));
 
706
      DRETURN_VOID;
 
707
   }
 
708
 
 
709
   /* these parameters can be changed */
 
710
   busy = lGetUlong(clio, EV_busy);
 
711
   ev_d_time = lGetUlong(clio, EV_d_time);
 
712
   busy_handling = lGetUlong(clio, EV_busy_handling);
 
713
 
 
714
   /* check for validity */
 
715
   if (ev_d_time < 1) {
 
716
      sge_mutex_unlock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
 
717
      ERROR((SGE_EVENT, MSG_EVE_INVALIDINTERVAL_U, sge_u32c(ev_d_time)));
 
718
      DRETURN_VOID;
 
719
   }
 
720
 
 
721
   if (lGetBool(clio, EV_changed) && lGetList(clio, EV_subscribed) == NULL) {
 
722
      sge_mutex_unlock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
 
723
      ERROR((SGE_EVENT, MSG_EVE_INVALIDSUBSCRIPTION));
 
724
      DRETURN_VOID;
 
725
   }
 
726
 
 
727
   /* event delivery interval changed.
 
728
    * We have to update the next delivery time to
 
729
    * next_delivery_time - old_interval + new_interval
 
730
    */
 
731
   if (ev_d_time != lGetUlong(event_client, EV_d_time)) {
 
732
      lSetUlong(event_client, EV_next_send_time,
 
733
                lGetUlong(event_client, EV_next_send_time) -
 
734
                lGetUlong(event_client, EV_d_time) + ev_d_time);
 
735
      lSetUlong(event_client, EV_d_time, ev_d_time);
 
736
   }
 
737
 
 
738
   /* subscription changed */
 
739
   if (lGetBool(clio, EV_changed)) {
 
740
      subscription_t *new_sub = NULL;
 
741
      subscription_t *old_sub = NULL;
 
742
      object_description *master_table = object_type_get_object_description();
 
743
      build_subscription(clio);
 
744
      new_sub = lGetRef(clio, EV_sub_array);
 
745
      old_sub = lGetRef(event_client, EV_sub_array);
 
746
 
 
747
      MONITOR_WAIT_TIME(SGE_LOCK(LOCK_GLOBAL, LOCK_READ), monitor);
 
748
 
 
749
      check_send_new_subscribed_list(old_sub, new_sub, event_client, sgeE_ADMINHOST_LIST, master_table);
 
750
      check_send_new_subscribed_list(old_sub, new_sub, event_client, sgeE_CALENDAR_LIST, master_table);
 
751
      check_send_new_subscribed_list(old_sub, new_sub, event_client, sgeE_CKPT_LIST, master_table);
 
752
      check_send_new_subscribed_list(old_sub, new_sub, event_client, sgeE_CENTRY_LIST, master_table);
 
753
      check_send_new_subscribed_list(old_sub, new_sub, event_client, sgeE_CONFIG_LIST, master_table);
 
754
      check_send_new_subscribed_list(old_sub, new_sub, event_client, sgeE_EXECHOST_LIST, master_table);
 
755
      check_send_new_subscribed_list(old_sub, new_sub, event_client, sgeE_JOB_LIST, master_table);
 
756
      check_send_new_subscribed_list(old_sub, new_sub, event_client, sgeE_JOB_SCHEDD_INFO_LIST, master_table);
 
757
      check_send_new_subscribed_list(old_sub, new_sub, event_client, sgeE_MANAGER_LIST, master_table);
 
758
      check_send_new_subscribed_list(old_sub, new_sub, event_client, sgeE_OPERATOR_LIST, master_table);
 
759
      check_send_new_subscribed_list(old_sub, new_sub, event_client, sgeE_NEW_SHARETREE, master_table);
 
760
      check_send_new_subscribed_list(old_sub, new_sub, event_client, sgeE_PE_LIST, master_table);
 
761
      check_send_new_subscribed_list(old_sub, new_sub, event_client, sgeE_PROJECT_LIST, master_table);
 
762
      check_send_new_subscribed_list(old_sub, new_sub, event_client, sgeE_CQUEUE_LIST, master_table);
 
763
      check_send_new_subscribed_list(old_sub, new_sub, event_client, sgeE_SCHED_CONF, master_table);
 
764
      check_send_new_subscribed_list(old_sub, new_sub, event_client, sgeE_SUBMITHOST_LIST, master_table);
 
765
      check_send_new_subscribed_list(old_sub, new_sub, event_client, sgeE_USER_LIST, master_table);
 
766
      check_send_new_subscribed_list(old_sub, new_sub, event_client, sgeE_USERSET_LIST, master_table);
 
767
      check_send_new_subscribed_list(old_sub, new_sub, event_client, sgeE_HGROUP_LIST, master_table);
 
768
      check_send_new_subscribed_list(old_sub, new_sub, event_client, sgeE_RQS_LIST, master_table);
 
769
      check_send_new_subscribed_list(old_sub, new_sub, event_client, sgeE_AR_LIST, master_table);
 
770
#ifndef __SGE_NO_USERMAPPING__
 
771
      check_send_new_subscribed_list(old_sub, new_sub, event_client, sgeE_CUSER_LIST, master_table);
 
772
#endif      
 
773
 
 
774
      SGE_UNLOCK(LOCK_GLOBAL, LOCK_READ);
 
775
 
 
776
#if 0
 
777
/* JG: TODO: better use lXchgList? */
 
778
      {
 
779
         lList *tmp_list = NULL;
 
780
         lXchgList(clio, EV_subscribed, &tmp_list);
 
781
         lXchgList(event_client, EV_subscribed, &tmp_list);
 
782
         lXchgList(clio, EV_subscribed, &tmp_list);
 
783
      }
 
784
#else
 
785
      lSetList(event_client, EV_subscribed, lCopyList("", lGetList(clio, EV_subscribed)));
 
786
#endif
 
787
      lSetRef(event_client, EV_sub_array, new_sub);
 
788
      lSetRef(clio, EV_sub_array, NULL);
 
789
      if (old_sub) {
 
790
         int i;
 
791
         for (i=0; i<sgeE_EVENTSIZE; i++){
 
792
            lFreeWhere(&(old_sub[i].where));
 
793
            lFreeWhat(&(old_sub[i].what));
 
794
            if (old_sub[i].descr){
 
795
               cull_hash_free_descr(old_sub[i].descr);
 
796
               free(old_sub[i].descr);
 
797
            }
 
798
         } 
 
799
         FREE(old_sub);
 
800
      }
 
801
   }
 
802
 
 
803
   /* busy state changed */
 
804
   if (busy != lGetUlong(event_client, EV_busy)) {
 
805
      lSetUlong(event_client, EV_busy, busy);
 
806
   }
 
807
   /* busy_handling changed */
 
808
   if (busy_handling != lGetUlong(event_client, EV_busy_handling)) {
 
809
      DPRINTF(("EVM: event client %s changes to "sge_U32CFormat"\n", 
 
810
         lGetString(event_client, EV_name), lGetUlong(event_client, EV_busy_handling)));
 
811
      lSetUlong(event_client, EV_busy_handling, busy_handling);
 
812
   }
 
813
 
 
814
   MONITOR_EDT_MOD(monitor);
 
815
 
 
816
   thread_config = cl_thread_get_thread_config();
 
817
   DEBUG((SGE_EVENT, MSG_SGETEXT_MODIFIEDINLIST_SSSS, thread_config ? thread_config->thread_name : "-NA-",
 
818
          "master host", lGetString(event_client, EV_name), MSG_EVE_EVENTCLIENT));
 
819
 
 
820
   sge_mutex_unlock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
 
821
 
 
822
   DRETURN_VOID;
 
823
} /* sge_event_master_process_mod_event_client() */
 
824
 
 
825
/****** evm/sge_event_master/sge_remove_event_client() *************************
 
826
*  NAME
 
827
*     sge_remove_event_client() -- remove event client 
 
828
*
 
829
*  SYNOPSIS
 
830
*     void sge_remove_event_client(u_long32 event_client_id) 
 
831
*
 
832
*  FUNCTION
 
833
*     Remove event client. Fetch event client from event client list.
 
834
*     Only sets status to "terminated",
 
835
*     it will be removed later on in ......................
 
836
*
 
837
*  INPUTS
 
838
*     u_long32 event_client_id - event client id 
 
839
*
 
840
*  RESULT
 
841
*     void - none
 
842
*
 
843
*  NOTES
 
844
*     MT-NOTE: sge_remove_event_client() is MT safe, uses internal locks 
 
845
*
 
846
*  SEE ALSO
 
847
*     evm_remove_func_t
 
848
*
 
849
*******************************************************************************/
 
850
void sge_remove_event_client(u_long32 event_client_id)
 
851
{
 
852
   lListElem *client;
 
853
 
 
854
   DENTER(TOP_LAYER, "sge_remove_event_client");
 
855
 
 
856
   sge_mutex_lock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
 
857
 
 
858
   DPRINTF(("sge_remove_event_client id = %d\n", (int) event_client_id));
 
859
 
 
860
   client = get_event_client(event_client_id);
 
861
 
 
862
   if (client == NULL) {
 
863
      sge_mutex_unlock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
 
864
      ERROR((SGE_EVENT, MSG_EVE_UNKNOWNEVCLIENT_US, sge_u32c(event_client_id), "remove"));
 
865
      DRETURN_VOID;
 
866
   }
 
867
   lSetUlong(client, EV_state, EV_terminated);
 
868
 
 
869
   sge_mutex_unlock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
 
870
   DRETURN_VOID;
 
871
} /* sge_remove_event_client() */
 
872
 
 
873
 
 
874
/****** sge_event_master/sge_set_max_dynamic_event_clients() *******************
 
875
*  NAME
 
876
*     sge_set_max_dynamic_event_clients() -- set max number of dyn. event clients
 
877
*
 
878
*  SYNOPSIS
 
879
*     void sge_set_max_dynamic_event_clients(u_long32 max) 
 
880
*
 
881
*  FUNCTION
 
882
*     Sets max number of dynamic event clients. If the new value is larger than
 
883
*     the maximum number of used file descriptors for communication this value
 
884
*     is set to the max. number of file descriptors minus some reserved file
 
885
*     descriptors. (10 for static event clients, 9 for execd, 10 for file 
 
886
*     descriptors used by application (to write files, etc.) ).
 
887
*
 
888
*     At least one dynamic event client is allowed.
 
889
*
 
890
*  INPUTS
 
891
*     u_long32 max - number of dynamic event clients
 
892
*
 
893
*  NOTES
 
894
*     MT-NOTE: sge_set_max_dynamic_event_clients() is MT safe 
 
895
*
 
896
*******************************************************************************/
 
897
u_long32 sge_set_max_dynamic_event_clients(u_long32 new_value)
 
898
{
 
899
   u_long32 max = new_value;
 
900
 
 
901
   DENTER(TOP_LAYER, "sge_set_max_dynamic_event_clients");
 
902
 
 
903
   sge_mutex_lock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
 
904
 
 
905
   /* Set the max event clients if it changed. */
 
906
   if (max != Event_Master_Control.max_event_clients) {
 
907
      /* check max. file descriptors of qmaster communication handle */
 
908
      cl_com_handle_t *handle = cl_com_get_handle("qmaster", 1);
 
909
      if (handle != NULL) {
 
910
         u_long32 max_allowed_value = 0;
 
911
         unsigned long max_file_handles = 0;
 
912
 
 
913
         cl_com_get_max_connections(handle, &max_file_handles);
 
914
         if (max_file_handles >= EVENT_MASTER_MIN_FREE_DESCRIPTORS) {
 
915
            max_allowed_value = (u_long32)max_file_handles - EVENT_MASTER_MIN_FREE_DESCRIPTORS;
 
916
         } else {
 
917
            max_allowed_value = 1;
 
918
         }
 
919
 
 
920
         if (max > max_allowed_value) {
 
921
            max = max_allowed_value;
 
922
            WARNING((SGE_EVENT, MSG_CONF_NR_DYNAMIC_EVENT_CLIENT_EXCEEDS_MAX_FILEDESCR_U, sge_u32c(max)));
 
923
         }
 
924
      }
 
925
   }
 
926
 
 
927
   /* check max again - it might have changed due to commlib max_file_handles restrictions */
 
928
   if (max != Event_Master_Control.max_event_clients) {
 
929
      lList *answer_list = NULL;
 
930
      lListElem *new_range;
 
931
      lListElem *event_client;
 
932
 
 
933
      /* If the new max is lower than the old max, then lowering the maximum
 
934
       * prevents new event clients, but allows the old ones there to drain off naturally.
 
935
       */
 
936
      Event_Master_Control.max_event_clients = max;
 
937
      INFO((SGE_EVENT, MSG_SET_MAXDYNEVENTCLIENT_U, sge_u32c(max)));
 
938
 
 
939
      /* we have to rebuild the event client id range list */
 
940
      lFreeList(&Event_Master_Control.client_ids);
 
941
      range_list_initialize(&Event_Master_Control.client_ids, &answer_list);
 
942
      new_range = lCreateElem(RN_Type);
 
943
      range_set_all_ids(new_range, EV_ID_FIRST_DYNAMIC, max - 1 + EV_ID_FIRST_DYNAMIC, 1);
 
944
      lAppendElem(Event_Master_Control.client_ids, new_range);
 
945
 
 
946
      /* and we have to remove the ids of our existing event clients from the range */
 
947
      for_each(event_client, Event_Master_Control.clients) {
 
948
         u_long32 event_client_id = lGetUlong(event_client, EV_id);
 
949
         /* only for dynamic event clients */
 
950
         if (event_client_id >= EV_ID_FIRST_DYNAMIC) {
 
951
            /* 
 
952
             * the event clients id might not be in the new range,
 
953
             * if the number of dynamic event clients has been reduced 
 
954
             */
 
955
            if (range_list_is_id_within(Event_Master_Control.client_ids, event_client_id)) {
 
956
               range_list_remove_id(&Event_Master_Control.client_ids, &answer_list, event_client_id);
 
957
            }
 
958
         }
 
959
      }
 
960
 
 
961
      /* compress the range list to reduce fragmentation */
 
962
      range_list_compress(Event_Master_Control.client_ids);
 
963
 
 
964
      /* output any errors that might have occured */
 
965
      answer_list_output(&answer_list);
 
966
 
 
967
   } /* if */
 
968
   sge_mutex_unlock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
 
969
 
 
970
   DRETURN(max);
 
971
}
 
972
 
 
973
/****** sge_event_master/sge_get_max_dynamic_event_clients() *******************
 
974
*  NAME
 
975
*     sge_get_max_dynamic_event_clients() -- get max dynamic event clients nr
 
976
*
 
977
*  SYNOPSIS
 
978
*     u_long32 sge_get_max_dynamic_event_clients(u_long32 max) 
 
979
*
 
980
*  FUNCTION
 
981
*     Returns the actual value of max. dynamic event clients allowed.
 
982
*
 
983
*  RESULT
 
984
*     u_long32 - max value
 
985
*
 
986
*  NOTES
 
987
*     MT-NOTE: sge_get_max_dynamic_event_clients() is MT save
 
988
*
 
989
*******************************************************************************/
 
990
u_long32 sge_get_max_dynamic_event_clients(void)
 
991
{
 
992
   u_long32 actual_value = 0;
 
993
 
 
994
   DENTER(TOP_LAYER, "sge_get_max_dynamic_event_clients");
 
995
 
 
996
   sge_mutex_lock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
 
997
   actual_value = Event_Master_Control.max_event_clients;
 
998
   sge_mutex_unlock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
 
999
 
 
1000
   DRETURN(actual_value);
 
1001
}
 
1002
 
 
1003
/****** Eventclient/Server/sge_has_event_client() ******************************
 
1004
*  NAME
 
1005
*     sge_has_event_client() -- Is a event client registered
 
1006
*
 
1007
*  SYNOPSIS
 
1008
*     #include "sge_event_master.h"
 
1009
*
 
1010
*    bool sge_has_event_client(u_long32 event_client_id)
 
1011
*
 
1012
*  FUNCTION
 
1013
*    Searches if the event client list, if a client
 
1014
*    with this id is available
 
1015
*
 
1016
*  INPUTS
 
1017
*     u_long32 event_client_id  - id of the event client
 
1018
*
 
1019
*  RESULT
 
1020
*     bool - TRUE if client is in the event client list
 
1021
*
 
1022
*  NOTES
 
1023
*     MT-NOTE: sge_has_event_client() is MT safe, it uses the internal locks
 
1024
*
 
1025
*******************************************************************************/
 
1026
bool sge_has_event_client(u_long32 event_client_id) {
 
1027
   bool ret;
 
1028
   
 
1029
   DENTER(TOP_LAYER, "sge_has_event_client");
 
1030
   
 
1031
   sge_mutex_lock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
 
1032
   ret = (get_event_client(event_client_id) != NULL) ? true : false;
 
1033
   sge_mutex_unlock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
 
1034
 
 
1035
   DRETURN(ret);
 
1036
}
 
1037
 
 
1038
/****** evm/sge_event_master/sge_select_event_clients() ************************
 
1039
*  NAME
 
1040
*     sge_select_event_clients() -- select event clients 
 
1041
*
 
1042
*  SYNOPSIS
 
1043
*     lList* sge_select_event_clients(const char *list_name, const lCondition 
 
1044
*     *where, const lEnumeration *what) 
 
1045
*
 
1046
*  FUNCTION
 
1047
*     Select event clients.  
 
1048
*
 
1049
*  INPUTS
 
1050
*     const char *list_name       - name of the result list returned. 
 
1051
*     const lCondition *where    - where condition 
 
1052
*     const lEnumeration *what - what enumeration
 
1053
*
 
1054
*  RESULT
 
1055
*     lList* - list with elements of type 'EV_Type'.
 
1056
*
 
1057
*  NOTES
 
1058
*     MT-NOTE: sge_select_event_clients() is MT safe 
 
1059
*     MT-NOTE:
 
1060
*     MT-NOTE: The elements contained in the result list are copies of the
 
1061
*     MT-NOTE: respective event client list elements.
 
1062
*
 
1063
*******************************************************************************/
 
1064
lList* sge_select_event_clients(const char *list_name, const lCondition *where, const lEnumeration *what)
 
1065
{
 
1066
   lList *lst = NULL;
 
1067
 
 
1068
   DENTER(TOP_LAYER, "sge_select_event_clients");
 
1069
 
 
1070
   sge_mutex_lock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
 
1071
   if (Event_Master_Control.clients != NULL) {
 
1072
      lst = lSelect(list_name, Event_Master_Control.clients, where, what);
 
1073
   }
 
1074
   sge_mutex_unlock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
 
1075
 
 
1076
   DRETURN(lst);
 
1077
} /* sge_select_event_clients() */
 
1078
 
 
1079
/****** evm/sge_event_master/sge_shutdown_event_client() ***********************
 
1080
*  NAME
 
1081
*     sge_shutdown_event_client() -- shutdown an event client 
 
1082
*
 
1083
*  SYNOPSIS
 
1084
*     int sge_shutdown_event_client(u_long32 event_client_id, const char* anUser, 
 
1085
*     uid_t anUID) 
 
1086
*
 
1087
*  FUNCTION
 
1088
*     Shutdown an event client. Send the event client denoted by 'event_client_id' 
 
1089
*     a shutdown event.
 
1090
*
 
1091
*     Shutting down an event client is only permitted if 'anUser' does have
 
1092
*     manager privileges OR is the owner of event client 'event_client_id'.
 
1093
*
 
1094
*  INPUTS
 
1095
*     u_long32 event_client_id - event client ID 
 
1096
*     const char* anUser - user which did request this operation 
 
1097
*     uid_t anUID        - user id of request user
 
1098
*     lList **alpp       - answer list for info and errors
 
1099
*
 
1100
*  RESULT
 
1101
*     EPERM - operation not permitted  
 
1102
*     ESRCH - client with given client id is unknown
 
1103
*     0     - otherwise
 
1104
*
 
1105
*  NOTES
 
1106
*     MT-NOTE: sge_shutdown_event_client() is MT safe, it uses the global lock
 
1107
*              and internal ones.
 
1108
*
 
1109
*******************************************************************************/
 
1110
int sge_shutdown_event_client(u_long32 event_client_id, const char* anUser,
 
1111
                              uid_t anUID, lList **alpp, monitoring_t *monitor)
 
1112
{
 
1113
   lListElem *client = NULL;
 
1114
   int ret = 0;
 
1115
 
 
1116
   DENTER(TOP_LAYER, "sge_shutdown_event_client");
 
1117
 
 
1118
   if (event_client_id <= EV_ID_ANY) {
 
1119
      SGE_ADD_MSG_ID(sprintf(SGE_EVENT, MSG_EVE_UNKNOWNEVCLIENT_US, sge_u32c(event_client_id), "shutdown"));
 
1120
      answer_list_add(alpp, SGE_EVENT, STATUS_ESYNTAX, ANSWER_QUALITY_ERROR);
 
1121
      DRETURN(EINVAL);
 
1122
   }
 
1123
 
 
1124
   sge_mutex_lock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
 
1125
   client = get_event_client(event_client_id);
 
1126
 
 
1127
   if (client != NULL) {
 
1128
      if (!manop_is_manager(anUser) && (anUID != lGetUlong(client, EV_uid))) {
 
1129
         sge_mutex_unlock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
 
1130
         answer_list_add(alpp, MSG_COM_NOSHUTDOWNPERMS, STATUS_DENIED,
 
1131
                         ANSWER_QUALITY_ERROR);
 
1132
         DRETURN(EPERM);
 
1133
      }
 
1134
 
 
1135
      add_list_event_for_client(event_client_id, 0, sgeE_SHUTDOWN, 0, 0, NULL, NULL,
 
1136
                                NULL, NULL, true);
 
1137
 
 
1138
      /* Print out a message about the event. */
 
1139
      if (event_client_id == EV_ID_SCHEDD) {
 
1140
         SGE_ADD_MSG_ID(sprintf(SGE_EVENT, MSG_COM_KILLED_SCHEDULER));
 
1141
      } else {
 
1142
         SGE_ADD_MSG_ID(sprintf(SGE_EVENT, MSG_COM_SHUTDOWNNOTIFICATION_SUS,
 
1143
                        lGetString(client, EV_name),
 
1144
                        sge_u32c(lGetUlong(client, EV_id)),
 
1145
                        lGetHost(client, EV_host)));
 
1146
      }
 
1147
      answer_list_add(alpp, SGE_EVENT, STATUS_OK, ANSWER_QUALITY_INFO);
 
1148
   } else {
 
1149
      SGE_ADD_MSG_ID(sprintf(SGE_EVENT,
 
1150
                        MSG_EVE_UNKNOWNEVCLIENT_US, sge_u32c(event_client_id), "shutdown"));
 
1151
      answer_list_add(alpp, SGE_EVENT, STATUS_ESYNTAX, ANSWER_QUALITY_ERROR);
 
1152
      ret = EINVAL;
 
1153
   }
 
1154
 
 
1155
   sge_mutex_unlock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
 
1156
 
 
1157
   DRETURN(ret);
 
1158
} /* sge_shutdown_event_client */
 
1159
 
 
1160
/****** evm/sge_event_master/sge_shutdown_dynamic_event_clients() **************
 
1161
*  NAME
 
1162
*     sge_shutdown_dynamic_event_clients() -- shutdown all dynamic event clients
 
1163
*
 
1164
*  SYNOPSIS
 
1165
*     int sge_shutdown_dynamic_event_clients(const char *anUser) 
 
1166
*
 
1167
*  FUNCTION
 
1168
*     Shutdown all dynamic event clients. Each dynamic event client known will
 
1169
*     be send a shutdown event.
 
1170
*
 
1171
*     An event client is a dynamic event client if it's client id is greater
 
1172
*     than or equal to 'EV_ID_FIRST_DYNAMIC'. 
 
1173
*
 
1174
*     Shutting down all dynamic event clients is only permitted if 'anUser' does
 
1175
*     have manager privileges.
 
1176
*
 
1177
*  INPUTS
 
1178
*     const char *anUser - user which did request this operation 
 
1179
*     lList **alpp       - answer list for info and errors
 
1180
*
 
1181
*  RESULT
 
1182
*     EPERM - operation not permitted 
 
1183
*     0     - otherwise
 
1184
*
 
1185
*  NOTES
 
1186
*     MT-NOTES: sge_shutdown_dynamic_event_clients() is MT safe, it uses the
 
1187
*               global_lock and internal ones.
 
1188
*
 
1189
*******************************************************************************/
 
1190
int sge_shutdown_dynamic_event_clients(const char *anUser, lList **alpp, monitoring_t *monitor)
 
1191
{
 
1192
   lListElem *client; 
 
1193
   int id = 0;
 
1194
 
 
1195
   DENTER(TOP_LAYER, "sge_shutdown_dynamic_event_clients");
 
1196
 
 
1197
   if (!manop_is_manager(anUser)) {
 
1198
      answer_list_add(alpp, MSG_COM_NOSHUTDOWNPERMS, STATUS_DENIED, ANSWER_QUALITY_ERROR);
 
1199
      DRETURN(EPERM);
 
1200
   }
 
1201
 
 
1202
   sge_mutex_lock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
 
1203
   for_each (client, Event_Master_Control.clients) {
 
1204
      id = lGetUlong(client, EV_id);
 
1205
 
 
1206
      /* Ignore clients with static ids. */
 
1207
      if (id < EV_ID_FIRST_DYNAMIC) {
 
1208
         continue;
 
1209
      }
 
1210
 
 
1211
      sge_add_event_for_client(id, 0, sgeE_SHUTDOWN, 0, 0, NULL, NULL, NULL, NULL);
 
1212
 
 
1213
      SGE_ADD_MSG_ID(sprintf(SGE_EVENT, MSG_COM_SHUTDOWNNOTIFICATION_SUS,
 
1214
                     lGetString(client, EV_name),
 
1215
                     sge_u32c(id), lGetHost(client, EV_host)));
 
1216
      answer_list_add(alpp, SGE_EVENT, STATUS_OK, ANSWER_QUALITY_INFO);
 
1217
   } /* for_each */
 
1218
 
 
1219
   sge_mutex_unlock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
 
1220
   DRETURN(0);
 
1221
} /* sge_shutdown_dynamic_event_clients() */
 
1222
 
 
1223
/****** Eventclient/Server/sge_add_event() *************************************
 
1224
*  NAME
 
1225
*     sge_add_event() -- add an object as event
 
1226
*
 
1227
*  SYNOPSIS
 
1228
*     #include "sge_event_master.h"
 
1229
*
 
1230
*     void 
 
1231
*     sge_add_event(u_long32 timestamp, ev_event type,
 
1232
*                   u_long32 intkey, u_long32 intkey2,
 
1233
*                   const char *strkey, const char *strkey2,
 
1234
*                   const char *session, lListElem *element) 
 
1235
*
 
1236
*  FUNCTION
 
1237
*     Adds an object to the list of events to deliver. Called, if an event 
 
1238
*     occurs to that object, e.g. it was added to Grid Engine, modified or 
 
1239
*     deleted.
 
1240
*  
 
1241
*     Internally, a list with that single object is created and passed to 
 
1242
*     sge_add_list_event().
 
1243
*
 
1244
*  INPUTS
 
1245
*     u_long32 timestamp      - event creation time, 0 -> use current time
 
1246
*     ev_event type           - the event id
 
1247
*     u_long32 intkey         - additional data
 
1248
*     u_long32 intkey2        - additional data
 
1249
*     const char *strkey      - additional data
 
1250
*     const char *strkey2     - additional data
 
1251
*     const char *session     - events session key
 
1252
*     lListElem *element      - the object to deliver as event
 
1253
*
 
1254
*  NOTES
 
1255
*     MT-NOTE: sge_add_event() is NOT MT safe.
 
1256
*
 
1257
*******************************************************************************/
 
1258
bool 
 
1259
sge_add_event(u_long32 timestamp, ev_event type, 
 
1260
              u_long32 intkey, u_long32 intkey2,
 
1261
              const char *strkey, const char *strkey2, 
 
1262
              const char *session, lListElem *element) 
 
1263
{
 
1264
   return sge_add_event_for_client(EV_ID_ANY, timestamp, type, intkey, intkey2, strkey, strkey2, session, element);
 
1265
}
 
1266
/****** sge_event_master/sge_add_event_for_client() ****************************
 
1267
*  NAME
 
1268
*     sge_add_event_for_client() -- add an event for a given object
 
1269
*
 
1270
*  SYNOPSIS
 
1271
*     bool
 
1272
*     sge_add_event_for_client(u_long32 event_client_id, u_long32 timestamp, ev_event type,
 
1273
*                              u_long32 intkey, u_long32 intkey2,
 
1274
*                              const char *strkey, const char *strkey2,
 
1275
*                              const char *session, lListElem *element)
 
1276
*
 
1277
*  FUNCTION
 
1278
*     Add an event for a given event client.
 
1279
*
 
1280
*  INPUTS
 
1281
*     u_long32 event_client_id   - event client id
 
1282
*     u_long32 timestamp         - event creation time, 0 -> use current time
 
1283
*     ev_event type              - event id
 
1284
*     u_long32 intkey            - 1st numeric key
 
1285
*     u_long32 intkey2           - 2nd numeric key
 
1286
*     const char *strkey         - 1st alphanumeric key
 
1287
*     const char *strkey2        - 2nd alphanumeric key
 
1288
*     const char *session        - event session
 
1289
*     lListElem *element         - object to be delivered with the event
 
1290
*
 
1291
*  NOTES
 
1292
*     MT-NOTE: sge_add_event_for_client() is MT safe
 
1293
*
 
1294
*******************************************************************************/
 
1295
bool sge_add_event_for_client(u_long32 event_client_id, u_long32 timestamp, ev_event type,
 
1296
                              u_long32 intkey, u_long32 intkey2,
 
1297
                              const char *strkey, const char *strkey2, 
 
1298
                              const char *session, lListElem *element)
 
1299
{
 
1300
   lList *lp = NULL;
 
1301
   bool ret = false;
 
1302
 
 
1303
   DENTER(TOP_LAYER, "sge_add_event_for_client");
 
1304
 
 
1305
   if (element != NULL) {
 
1306
      lList *temp_sub_lp = NULL;
 
1307
      int sub_list_elem = 0;
 
1308
 
 
1309
      /* ignore the sublist in case of the following events. We have
 
1310
       * extra events to handle the sub-lists 
 
1311
       */
 
1312
      if (type == sgeE_JOB_MOD) {
 
1313
         sub_list_elem = JB_ja_tasks;
 
1314
         lXchgList(element, sub_list_elem, &temp_sub_lp);
 
1315
      } else if (type == sgeE_CQUEUE_MOD) {
 
1316
         sub_list_elem = CQ_qinstances;
 
1317
         lXchgList(element, sub_list_elem, &temp_sub_lp);
 
1318
      } else if (type == sgeE_JATASK_MOD) {
 
1319
         sub_list_elem = JAT_task_list;
 
1320
         lXchgList(element, sub_list_elem, &temp_sub_lp);
 
1321
      }
 
1322
 
 
1323
      lp = lCreateListHash("Events", lGetElemDescr(element), false);
 
1324
      lAppendElem(lp, lCopyElemHash(element, false));
 
1325
 
 
1326
      /* restore the original event object */
 
1327
      if (temp_sub_lp != NULL) {
 
1328
         lXchgList(element, sub_list_elem, &temp_sub_lp);
 
1329
      }
 
1330
   }
 
1331
 
 
1332
   ret = add_list_event_for_client(event_client_id, timestamp, type, intkey, intkey2,
 
1333
                                   strkey, strkey2, session, lp, false);
 
1334
 
 
1335
   DRETURN(ret);
 
1336
}
 
1337
 
 
1338
/****** Eventclient/Server/sge_add_list_event() ********************************
 
1339
*  NAME
 
1340
*     sge_add_list_event() -- add a list as event
 
1341
*
 
1342
*  SYNOPSIS
 
1343
*     #include "sge_event_master.h"
 
1344
*
 
1345
*     void 
 
1346
*     sge_add_list_event(u_long32 timestamp, ev_event type, 
 
1347
*                        u_long32 intkey, u_long32 intkey2,
 
1348
*                        const char *strkey, const char *strkey2,
 
1349
*                        const char *session, lList *list) 
 
1350
*
 
1351
*  FUNCTION
 
1352
*     Adds a list of objects to the list of events to deliver, e.g. the 
 
1353
*     sgeE*_LIST events.
 
1354
*
 
1355
*  INPUTS
 
1356
*     u_long32 timestamp      - event creation time, 0 -> use current time
 
1357
*     ev_event type           - the event id
 
1358
*     u_long32 intkey         - additional data
 
1359
*     u_long32 intkey2        - additional data
 
1360
*     const char *strkey      - additional data
 
1361
*     const char *strkey2     - additional data
 
1362
*     const char *session     - events session key
 
1363
*     lList *list             - the list to deliver as event
 
1364
*
 
1365
*  NOTES
 
1366
*     MT-NOTE: sge_add_list_event() is MT safe.
 
1367
*
 
1368
*******************************************************************************/
 
1369
bool sge_add_list_event(u_long32 timestamp, ev_event type,
 
1370
                        u_long32 intkey, u_long32 intkey2,
 
1371
                        const char *strkey, const char *strkey2,
 
1372
                        const char *session, lList *list)
 
1373
{
 
1374
   bool ret;
 
1375
   lList *lp = NULL;
 
1376
 
 
1377
   if (list != NULL) {
 
1378
      lListElem *element = NULL;
 
1379
 
 
1380
      lp = lCreateListHash("Events", lGetListDescr(list), false);
 
1381
      if (lp == NULL) {
 
1382
         return false;
 
1383
      }
 
1384
      for_each(element, list) {
 
1385
         lList *temp_sub_lp = NULL;
 
1386
         int sub_list_elem = 0;
 
1387
 
 
1388
         /* ignore the sublist in case of the following events. We have
 
1389
          * extra events to handle the sub-lists
 
1390
          */
 
1391
         if (type == sgeE_JOB_MOD) {
 
1392
            sub_list_elem = JB_ja_tasks;
 
1393
            lXchgList(element, sub_list_elem, &temp_sub_lp);
 
1394
         } else if (type == sgeE_CQUEUE_MOD) {
 
1395
            sub_list_elem = CQ_qinstances;
 
1396
            lXchgList(element, sub_list_elem, &temp_sub_lp);
 
1397
         } else if (type == sgeE_JATASK_MOD) {
 
1398
            sub_list_elem = JAT_task_list;
 
1399
            lXchgList(element, sub_list_elem, &temp_sub_lp);
 
1400
         }
 
1401
 
 
1402
         lAppendElem(lp, lCopyElemHash(element, false));
 
1403
 
 
1404
         /* restore the original event object */
 
1405
         if (temp_sub_lp != NULL) {
 
1406
            lXchgList(element, sub_list_elem, &temp_sub_lp);
 
1407
         }
 
1408
      }
 
1409
   }
 
1410
 
 
1411
   ret = add_list_event_for_client(EV_ID_ANY, timestamp, type, intkey, intkey2,
 
1412
                                   strkey, strkey2, session, lp, false);
 
1413
   return ret;
 
1414
}
 
1415
 
 
1416
/****** Eventclient/Server/add_list_event_for_client() *************************
 
1417
*  NAME
 
1418
*     add_list_event_for_client() -- add a list as event
 
1419
*
 
1420
*  SYNOPSIS
 
1421
*     #include "sge_event_master.h"
 
1422
*
 
1423
*     void 
 
1424
*     add_list_event_for_client(u_long32 event_client_id, u_long32 timestamp,
 
1425
*                               ev_event type, u_long32 intkey,
 
1426
*                               u_long32 intkey2, const char *strkey,
 
1427
*                               const char *session, lList *list) 
 
1428
*
 
1429
*  FUNCTION
 
1430
*     Adds a list of objects to the list of events to deliver, e.g. the 
 
1431
*     sgeE*_LIST events, to a specific client.  No checking is done to make
 
1432
*     sure that the client id is valid.  That is the responsibility of the
 
1433
*     calling function.
 
1434
*
 
1435
*  INPUTS
 
1436
*     u_long32 event_client_id      - the id of the recipient
 
1437
*     u_long32 timestamp      - time stamp in gmt for the even; if 0 is passed,
 
1438
*                               sge_add_list_event will insert the actual time
 
1439
*     ev_event type           - the event id
 
1440
*     u_long32 intkey         - additional data
 
1441
*     u_long32 intkey2        - additional data
 
1442
*     const char *strkey      - additional data
 
1443
*     const char *session     - events session key
 
1444
*     lList *list             - the list to deliver as event
 
1445
*
 
1446
*  RESULTS
 
1447
*     Whether the event was added successfully.
 
1448
*
 
1449
*  NOTES
 
1450
*     MT-NOTE: add_list_event_for_client() is MT safe.
 
1451
*
 
1452
*******************************************************************************/
 
1453
static bool add_list_event_for_client(u_long32 event_client_id, u_long32 timestamp,
 
1454
                                      ev_event type, u_long32 intkey,
 
1455
                                      u_long32 intkey2, const char *strkey,
 
1456
                                      const char *strkey2, const char *session,
 
1457
                                      lList *list, bool has_lock)
 
1458
{
 
1459
   lListElem *evr = NULL;        /* event request object */
 
1460
   lList *etlp = NULL;           /* event list */
 
1461
   lListElem *etp = NULL;        /* event object */
 
1462
 
 
1463
   DENTER(TOP_LAYER, "add_list_event_for_client");
 
1464
 
 
1465
   /* an event needs a timestamp */
 
1466
   if (timestamp == 0) {
 
1467
      timestamp = sge_get_gmt();
 
1468
   }
 
1469
 
 
1470
   evr = lCreateElem(EVR_Type);
 
1471
   lSetUlong(evr, EVR_operation, EVR_ADD_EVENT);
 
1472
   lSetUlong(evr, EVR_timestamp, timestamp);
 
1473
   lSetUlong(evr, EVR_event_client_id, event_client_id);
 
1474
   lSetString(evr, EVR_session, session);
 
1475
 
 
1476
   etlp = lCreateListHash("Event_List", ET_Type, false);
 
1477
   lSetList(evr, EVR_event_list, etlp);
 
1478
 
 
1479
   etp = lCreateElem(ET_Type); /* actual event object */
 
1480
   lAppendElem(etlp, etp);
 
1481
 
 
1482
   lSetUlong(etp, ET_type, type);
 
1483
   lSetUlong(etp, ET_timestamp, timestamp);
 
1484
   lSetUlong(etp, ET_intkey, intkey);
 
1485
   lSetUlong(etp, ET_intkey2, intkey2);
 
1486
   lSetString(etp, ET_strkey, strkey);
 
1487
   lSetString(etp, ET_strkey2, strkey2);
 
1488
   lSetList(etp, ET_new_version, list);
 
1489
 
 
1490
   /* 
 
1491
    * if we have a transaction open, add to the transaction 
 
1492
    * otherwise into the event master request list
 
1493
    * need a new C block, as the GET_SPECIFIC macro declares new variables
 
1494
    */
 
1495
   {
 
1496
      GET_SPECIFIC(event_master_transaction_t, t_store, sge_event_master_init_transaction_store, Event_Master_Control.transaction_key, "t_store");
 
1497
      if (t_store->is_transaction) {
 
1498
         lAppendElem(t_store->transaction_requests, evr);
 
1499
      } else {
 
1500
         sge_mutex_lock("event_master_request_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.request_mutex);
 
1501
         lAppendElem(Event_Master_Control.requests, evr);
 
1502
         sge_mutex_unlock("event_master_request_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.request_mutex);
 
1503
 
 
1504
         set_flush();
 
1505
      }
 
1506
   }
 
1507
 
 
1508
   DRETURN(true);
 
1509
} /* end add_list_event_for_client */
 
1510
 
 
1511
/* add an event from the request list to the event clients which subscribed it */
 
1512
static void sge_event_master_process_send(lListElem *request, monitoring_t *monitor)
 
1513
{
 
1514
   lListElem *event_client = NULL;
 
1515
   lListElem *event = NULL;
 
1516
   lList *event_list = NULL;
 
1517
   u_long32 ec_id = 0;
 
1518
   const char *session = NULL;
 
1519
   ev_event type = sgeE_ALL_EVENTS;
 
1520
 
 
1521
   DENTER(TOP_LAYER, "sge_event_master_process_send");
 
1522
 
 
1523
   ec_id = lGetUlong(request, EVR_event_client_id);
 
1524
   session = lGetString(request, EVR_session);
 
1525
   event_list = lGetList(request, EVR_event_list);
 
1526
 
 
1527
   MONITOR_EDT_NEW(monitor);
 
1528
 
 
1529
   if (ec_id == EV_ID_ANY) {
 
1530
      DPRINTF(("Processing event for all clients\n"));
 
1531
 
 
1532
      event = lFirst(event_list);
 
1533
      while (event != NULL) {
 
1534
         bool added = false;
 
1535
         event = lDechainElem(event_list, event);
 
1536
         type = (ev_event)lGetUlong(event, ET_type);
 
1537
 
 
1538
         sge_mutex_lock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
 
1539
         for_each (event_client, Event_Master_Control.clients) {
 
1540
            ec_id = lGetUlong(event_client, EV_id);
 
1541
 
 
1542
            DPRINTF(("Preparing event for client %ld\n", ec_id));
 
1543
 
 
1544
            if (eventclient_subscribed(event_client, type, session)) {
 
1545
               added = true;
 
1546
               add_list_event_direct(event_client, event, true);
 
1547
               MONITOR_EDT_ADDED(monitor);
 
1548
            }
 
1549
         } /* for_each */
 
1550
         sge_mutex_unlock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
 
1551
 
 
1552
         if (!added) {
 
1553
            MONITOR_EDT_SKIP(monitor);
 
1554
         }
 
1555
         lFreeElem(&event);
 
1556
         event = lFirst(event_list);
 
1557
      } /* while */
 
1558
   } else {
 
1559
      DPRINTF(("Processing event for client %d.\n", ec_id));
 
1560
 
 
1561
      sge_mutex_lock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
 
1562
 
 
1563
      event_client = get_event_client(ec_id);
 
1564
 
 
1565
      /* Skip bad client ids.  Events with bad client ids will be freed
 
1566
       * when send is freed since we don't dechain them. */
 
1567
      if (event_client != NULL) {
 
1568
         event = lFirst(event_list);
 
1569
 
 
1570
         while (event != NULL) {
 
1571
            event = lDechainElem(event_list, event);
 
1572
            type = (ev_event)lGetUlong(event, ET_type);
 
1573
 
 
1574
            if (eventclient_subscribed(event_client, type, session)) {
 
1575
               add_list_event_direct(event_client, event, false);
 
1576
               MONITOR_EDT_ADDED(monitor);
 
1577
               /* We can't free the event when we're done because it now belongs
 
1578
                * to send_events(). */
 
1579
            } else {
 
1580
               MONITOR_EDT_SKIP(monitor);
 
1581
               lFreeElem(&event);
 
1582
            }
 
1583
            event = lFirst(event_list);
 
1584
         } /* while */
 
1585
      } /* if */
 
1586
 
 
1587
      sge_mutex_unlock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
 
1588
   } /* else */
 
1589
   DRETURN_VOID;
 
1590
} /* process_sends() */
 
1591
 
 
1592
/****** Eventclient/Server/sge_handle_event_ack() ******************************
 
1593
*  NAME
 
1594
*     sge_handle_event_ack() -- acknowledge event delivery
 
1595
*
 
1596
*  SYNOPSIS
 
1597
*     #include "sge_event_master.h"
 
1598
*
 
1599
*     void 
 
1600
*     sge_handle_event_ack(u_long32 event_client_id, ev_event event_number) 
 
1601
*
 
1602
*  FUNCTION
 
1603
*     After the server sent events to an event client, it has to acknowledge
 
1604
*     their receipt. 
 
1605
*     Acknowledged events are deleted from the list of events to deliver, 
 
1606
*     otherwise they will be resent after the next event delivery interval.
 
1607
*     If the handling of a busy state of the event client is enabled and set to 
 
1608
*     EV_BUSY_UNTIL_ACK, the event client will be set to "not busy".
 
1609
*
 
1610
*  INPUTS
 
1611
*     u_long32 event_client_id - event client sending acknowledge
 
1612
*     ev_event event_number   - serial number of the last event to acknowledge
 
1613
*
 
1614
*  RESULT
 
1615
*     void - none
 
1616
*
 
1617
*  NOTES
 
1618
*     MT-NOTE: sge_handle_event_ack() is MT safe.
 
1619
*
 
1620
*
 
1621
*******************************************************************************/
 
1622
void sge_handle_event_ack(u_long32 event_client_id, u_long32 event_number)
 
1623
{
 
1624
   lListElem *evr = NULL;
 
1625
 
 
1626
   DENTER(TOP_LAYER, "sge_handle_event_ack");
 
1627
 
 
1628
   evr = lCreateElem(EVR_Type);
 
1629
   lSetUlong(evr, EVR_operation, EVR_ACK_EVENT);
 
1630
   lSetUlong(evr, EVR_timestamp, sge_get_gmt());
 
1631
   lSetUlong(evr, EVR_event_client_id, event_client_id);
 
1632
   lSetUlong(evr, EVR_event_number, event_number);
 
1633
 
 
1634
   sge_mutex_lock("event_master_request_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.request_mutex);
 
1635
   lAppendElem(Event_Master_Control.requests, evr);
 
1636
   sge_mutex_unlock("event_master_request_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.request_mutex);
 
1637
 
 
1638
   set_flush();
 
1639
  
 
1640
   DRETURN_VOID;
 
1641
}
 
1642
 
 
1643
static void sge_event_master_process_ack(lListElem *request, monitoring_t *monitor)
 
1644
{
 
1645
   lListElem *client;
 
1646
   u_long32 event_client_id;
 
1647
 
 
1648
   DENTER(TOP_LAYER, "sge_event_master_process_ack");
 
1649
 
 
1650
   event_client_id = lGetUlong(request, EVR_event_client_id);
 
1651
 
 
1652
   sge_mutex_lock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
 
1653
 
 
1654
   client = get_event_client(event_client_id);
 
1655
   
 
1656
   if (client == NULL) {
 
1657
      ERROR((SGE_EVENT, MSG_EVE_UNKNOWNEVCLIENT_US, sge_u32c(event_client_id), "process acknowledgements"));
 
1658
   } else {
 
1659
      u_long32 event_number = lGetUlong(request, EVR_event_number);
 
1660
      u_long32 timestamp = lGetUlong(request, EVR_timestamp);
 
1661
      int res = 0;
 
1662
      lList *list = lGetList(client, EV_events);
 
1663
 
 
1664
      res = purge_event_list(list, event_number);
 
1665
 
 
1666
      MONITOR_EDT_ACK(monitor);
 
1667
      if (res > 0) {
 
1668
         DPRINTF(("%s: purged %d acknowledged events\n", SGE_FUNC, res));
 
1669
      }
 
1670
 
 
1671
      lSetUlong(client, EV_last_heard_from, timestamp); /* note time of ack */
 
1672
 
 
1673
      switch (lGetUlong(client, EV_busy_handling)) {
 
1674
         case EV_BUSY_UNTIL_ACK:
 
1675
         case EV_THROTTLE_FLUSH:
 
1676
            lSetUlong(client, EV_busy, 0); /* clear busy state */
 
1677
            break;
 
1678
         default:
 
1679
            break;
 
1680
      }
 
1681
   } /* else */
 
1682
 
 
1683
   sge_mutex_unlock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
 
1684
   DRETURN_VOID;
 
1685
} /* sge_handle_event_ack() */
 
1686
 
 
1687
/****** evm/sge_event_master/sge_deliver_events_immediately() ******************
 
1688
*  NAME
 
1689
*     sge_deliver_events_immediately() -- deliver events immediately 
 
1690
*
 
1691
*  SYNOPSIS
 
1692
*     void sge_deliver_events_immediately(u_long32 event_client_id) 
 
1693
*
 
1694
*  FUNCTION
 
1695
*     Deliver all events for the event client denoted by 'event_client_id'
 
1696
*     immediately.
 
1697
*
 
1698
*  INPUTS
 
1699
*     u_long32 event_client_id - event client id 
 
1700
*
 
1701
*  RESULT
 
1702
*     void - none
 
1703
*
 
1704
*  NOTES
 
1705
*     MT-NOTE: sge_deliver_events_immediately() is NOT MT safe. 
 
1706
*
 
1707
*******************************************************************************/
 
1708
void sge_deliver_events_immediately(u_long32 event_client_id)
 
1709
{
 
1710
   lListElem *client = NULL;
 
1711
 
 
1712
   DENTER(TOP_LAYER, "sge_event_immediate_delivery");
 
1713
 
 
1714
   sge_mutex_lock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
 
1715
 
 
1716
   if ((client = get_event_client(event_client_id)) == NULL) {
 
1717
      ERROR((SGE_EVENT, MSG_EVE_UNKNOWNEVCLIENT_US, sge_u32c(event_client_id), "deliver events immediately"));
 
1718
   } else {
 
1719
      flush_events(client, 0);
 
1720
 
 
1721
      sge_mutex_lock("event_master_cond_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.cond_mutex);
 
1722
      Event_Master_Control.delivery_signaled = true;
 
1723
      pthread_cond_signal(&Event_Master_Control.cond_var);
 
1724
      sge_mutex_unlock("event_master_cond_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.cond_mutex);
 
1725
   }
 
1726
 
 
1727
   sge_mutex_unlock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
 
1728
   DRETURN_VOID;
 
1729
} /* sge_deliver_event_immediately() */
 
1730
 
 
1731
/****** evm/sge_event_master/sge_resync_schedd() *******************************
 
1732
*  NAME
 
1733
*     sge_resync_schedd() -- resync schedd 
 
1734
*
 
1735
*  SYNOPSIS
 
1736
*     int sge_resync_schedd(void) 
 
1737
*
 
1738
*  FUNCTION
 
1739
*     Does a total update (send all lists) to schedd and outputs an error
 
1740
*     message.
 
1741
*
 
1742
*  INPUTS
 
1743
*     void - none 
 
1744
*
 
1745
*  RESULT
 
1746
*     0 - resync successful
 
1747
*    -1 - otherwise
 
1748
*
 
1749
*  NOTES
 
1750
*     MT-NOTE: sge_resync_schedd() in NOT MT safe. 
 
1751
*
 
1752
*******************************************************************************/
 
1753
int sge_resync_schedd(monitoring_t *monitor)
 
1754
{
 
1755
   lListElem *client;
 
1756
   int ret = -1;
 
1757
   DENTER(TOP_LAYER, "sge_sync_schedd");
 
1758
 
 
1759
   sge_mutex_lock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
 
1760
 
 
1761
   if ((client = get_event_client(EV_ID_SCHEDD)) != NULL) {
 
1762
      ERROR((SGE_EVENT, MSG_EVE_REINITEVENTCLIENT_S,
 
1763
             lGetString(client, EV_name)));
 
1764
 
 
1765
      total_update(client, monitor);
 
1766
 
 
1767
      ret = 0;
 
1768
   } else {
 
1769
      ERROR((SGE_EVENT, MSG_EVE_UNKNOWNEVCLIENT_US, sge_u32c(EV_ID_SCHEDD), "resynchronize"));
 
1770
      ret = -1;
 
1771
   }
 
1772
 
 
1773
   sge_mutex_unlock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
 
1774
   DRETURN(ret);
 
1775
} /* sge_resync_schedd() */
 
1776
 
 
1777
/****** evm/sge_event_master/sge_event_master_init() **************************
 
1778
*  NAME
 
1779
*     sge_event_master_init() -- event master initialization
 
1780
*
 
1781
*  SYNOPSIS
 
1782
*     static void sge_event_master_init(void) 
 
1783
*
 
1784
*  FUNCTION
 
1785
*     Initialize the event master control structure. Initialize permanent
 
1786
*     event array. 
 
1787
*
 
1788
*  INPUTS
 
1789
*     void - none 
 
1790
*
 
1791
*  RESULT
 
1792
*     void - none
 
1793
*
 
1794
*  NOTES
 
1795
*     MT-NOTE: sge_event_master_init() is not MT safe 
 
1796
*
 
1797
*******************************************************************************/
 
1798
void sge_event_master_init(void)
 
1799
{
 
1800
   DENTER(TOP_LAYER, "sge_event_master_init");
 
1801
 
 
1802
   Event_Master_Control.clients = lCreateListHash("EV_Clients", EV_Type, true);
 
1803
   Event_Master_Control.requests = lCreateListHash("Event Master Requests", EVR_Type, false);
 
1804
   pthread_key_create(&(Event_Master_Control.transaction_key), sge_event_master_destroy_transaction_store);
 
1805
   
 
1806
   init_send_events();
 
1807
 
 
1808
   /* Initialize the range list for event client ids */
 
1809
   {
 
1810
      lList *answer_list = NULL;
 
1811
      range_list_initialize(&Event_Master_Control.client_ids, &answer_list);
 
1812
      answer_list_output(&answer_list);
 
1813
   }
 
1814
 
 
1815
   DRETURN_VOID;
 
1816
}
 
1817
 
 
1818
/****** evm/sge_event_master/init_send_events() ********************************
 
1819
*  NAME
 
1820
*     init_send_events() -- sets the events, that should allways be delivered 
 
1821
*
 
1822
*  SYNOPSIS
 
1823
*     void init_send_events() 
 
1824
*
 
1825
*  FUNCTION
 
1826
*     sets the events, that should allways be delivered 
 
1827
*
 
1828
*  NOTES
 
1829
*     MT-NOTE: init_send_events() is not MT safe 
 
1830
*
 
1831
*******************************************************************************/
 
1832
static void init_send_events(void)
 
1833
{
 
1834
   DENTER(TOP_LAYER, "init_send_events");
 
1835
 
 
1836
   memset(SEND_EVENTS, false, sizeof(bool) * sgeE_EVENTSIZE);
 
1837
 
 
1838
   SEND_EVENTS[sgeE_ADMINHOST_LIST] = true;
 
1839
   SEND_EVENTS[sgeE_CALENDAR_LIST] = true;
 
1840
   SEND_EVENTS[sgeE_CKPT_LIST] = true;
 
1841
   SEND_EVENTS[sgeE_CENTRY_LIST] = true;
 
1842
   SEND_EVENTS[sgeE_CONFIG_LIST] = true;
 
1843
   SEND_EVENTS[sgeE_EXECHOST_LIST] = true;
 
1844
   SEND_EVENTS[sgeE_JOB_LIST] = true;
 
1845
   SEND_EVENTS[sgeE_JOB_SCHEDD_INFO_LIST] = true;
 
1846
   SEND_EVENTS[sgeE_MANAGER_LIST] = true;
 
1847
   SEND_EVENTS[sgeE_OPERATOR_LIST] = true;
 
1848
   SEND_EVENTS[sgeE_PE_LIST] = true;
 
1849
   SEND_EVENTS[sgeE_PROJECT_LIST] = true;
 
1850
   SEND_EVENTS[sgeE_QMASTER_GOES_DOWN] = true;
 
1851
   SEND_EVENTS[sgeE_CQUEUE_LIST] = true;
 
1852
   SEND_EVENTS[sgeE_SUBMITHOST_LIST] = true;
 
1853
   SEND_EVENTS[sgeE_USER_LIST] = true;
 
1854
   SEND_EVENTS[sgeE_USERSET_LIST] = true;
 
1855
   SEND_EVENTS[sgeE_HGROUP_LIST] = true;
 
1856
   SEND_EVENTS[sgeE_RQS_LIST] = true;
 
1857
   SEND_EVENTS[sgeE_AR_LIST] = true;
 
1858
#ifndef __SGE_NO_USERMAPPING__
 
1859
   SEND_EVENTS[sgeE_CUSER_LIST] = true;
 
1860
#endif
 
1861
 
 
1862
   DRETURN_VOID;
 
1863
} /* init_send_events() */
 
1864
 
 
1865
 
 
1866
/****** sge_event_master/sge_event_master_wait_next() ******************************
 
1867
*  NAME
 
1868
*     sge_event_master_wait_next() -- waits for a weakup
 
1869
*
 
1870
*  SYNOPSIS
 
1871
*     void sge_event_master_wait_next(void) 
 
1872
*
 
1873
*  FUNCTION
 
1874
*     waits for a weakup
 
1875
*
 
1876
*  NOTES
 
1877
*     MT-NOTE: is MT safe
 
1878
*
 
1879
*******************************************************************************/
 
1880
void sge_event_master_wait_next(void)
 
1881
{
 
1882
 
 
1883
   DENTER(TOP_LAYER, "sge_event_master_wait_next");
 
1884
 
 
1885
   sge_mutex_lock("event_master_cond_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.cond_mutex);
 
1886
 
 
1887
   if (!Event_Master_Control.delivery_signaled) {
 
1888
      u_long32 current_time = sge_get_gmt();
 
1889
      struct timespec ts;
 
1890
      ts.tv_sec = (time_t)(current_time + EVENT_DELIVERY_INTERVAL_S);
 
1891
      ts.tv_nsec = EVENT_DELIVERY_INTERVAL_N;
 
1892
      pthread_cond_timedwait(&Event_Master_Control.cond_var, &Event_Master_Control.cond_mutex, &ts);
 
1893
   }
 
1894
 
 
1895
   Event_Master_Control.delivery_signaled = false;
 
1896
 
 
1897
   sge_mutex_unlock("event_master_cond_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.cond_mutex);
 
1898
 
 
1899
 
 
1900
   DRETURN_VOID;
 
1901
}
 
1902
 
 
1903
/****** sge_event_master/remove_event_client() *********************************
 
1904
*  NAME
 
1905
*     remove_event_client() -- removes an event client
 
1906
*
 
1907
*  SYNOPSIS
 
1908
*     static void 
 
1909
*     remove_event_client(lListElem **client, int event_client_id, bool lock_event_master) 
 
1910
*
 
1911
*  FUNCTION
 
1912
*     removes an event client, marks the index as dirty and frees the memory
 
1913
*
 
1914
*  INPUTS
 
1915
*     lListElem **client     - event client to remove
 
1916
*     int event_client_id    - event client id to remove
 
1917
*     bool lock_event_master - shall the function aquire the event_master mutex
 
1918
*
 
1919
*  NOTES
 
1920
*     MT-NOTE: remove_event_client() is not MT safe
 
1921
*     - it locks the event master mutex to modify the event client list
 
1922
*     - it assums that the event client is locked before this method is called
 
1923
*
 
1924
*******************************************************************************/
 
1925
static void remove_event_client(lListElem **client, int event_client_id, bool lock_event_master) {
 
1926
   subscription_t *old_sub = NULL;
 
1927
   int i;
 
1928
 
 
1929
   DENTER(TOP_LAYER, "remove_event_client");
 
1930
 
 
1931
   INFO((SGE_EVENT, MSG_EVE_UNREG_SU, lGetString(*client, EV_name),
 
1932
         sge_u32c(lGetUlong(*client, EV_id))));
 
1933
 
 
1934
   old_sub = lGetRef(*client, EV_sub_array);
 
1935
   if (old_sub) {
 
1936
      /* now free event client subscription data */
 
1937
      for (i = 0; i < sgeE_EVENTSIZE; i++) {
 
1938
         lFreeWhere(&old_sub[i].where);
 
1939
         lFreeWhat(&old_sub[i].what);
 
1940
 
 
1941
         if (old_sub[i].descr) {
 
1942
            cull_hash_free_descr(old_sub[i].descr);
 
1943
            FREE(old_sub[i].descr);
 
1944
         }
 
1945
      }
 
1946
 
 
1947
      FREE(old_sub);
 
1948
      lSetRef(*client, EV_sub_array, NULL);
 
1949
   }
 
1950
 
 
1951
   if (lock_event_master) {
 
1952
      sge_mutex_lock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
 
1953
   }
 
1954
 
 
1955
   lRemoveElem(Event_Master_Control.clients, client);
 
1956
   if (event_client_id >= EV_ID_FIRST_DYNAMIC) {
 
1957
      lList *answer_list = NULL;
 
1958
      free_dynamic_id(&answer_list, event_client_id);
 
1959
      answer_list_output(&answer_list);
 
1960
   }
 
1961
 
 
1962
   if (lock_event_master) {
 
1963
      sge_mutex_unlock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
 
1964
   }
 
1965
 
 
1966
   DRETURN_VOID;
 
1967
}
 
1968
 
 
1969
/****** evm/sge_event_master/sge_event_master_send_events() ******************
 
1970
*  NAME
 
1971
*     sge_event_master_send_events() -- send events to event clients 
 
1972
*
 
1973
*  SYNOPSIS
 
1974
*     static void send_events(void) 
 
1975
*
 
1976
*  FUNCTION
 
1977
*     Loop over all event clients and send events due. If an event client did
 
1978
*     time out, it will be removed from the list of registered event clients.
 
1979
*
 
1980
*     Events will be delivered only, if the so called 'busy handling' of a 
 
1981
*     client does allow it. Events will be delivered as a report (REP_Type)
 
1982
*     with a report list of type ET_Type. 
 
1983
*
 
1984
*  INPUTS
 
1985
*     lListElem *report  - a report, has to be part of the report list. All
 
1986
*                          fields have to be init, except for REP_list element.   
 
1987
*     lList *report_list - a pre-init report list
 
1988
*
 
1989
*  RESULT
 
1990
*     void - none 
 
1991
*
 
1992
*  NOTES
 
1993
*     MT-NOTE: send_events() is MT safe 
 
1994
*     MT-NOTE:
 
1995
*     MT-NOTE: After all events for all clients have been sent. This function
 
1996
*     MT-NOTE: will wait on the condition variable 'Event_Master_Control.cond_var'
 
1997
*
 
1998
*******************************************************************************/
 
1999
void sge_event_master_send_events(sge_gdi_ctx_class_t *ctx, lListElem *report, lList *report_list,
 
2000
                                  monitoring_t *monitor)
 
2001
{
 
2002
   u_long32 timeout;
 
2003
   u_long32 busy_handling;
 
2004
   u_long32 scheduler_timeout = mconf_get_scheduler_timeout();
 
2005
   lListElem *event_client, *next_event_client;
 
2006
   int ret;
 
2007
   int commid; 
 
2008
   int deliver_interval;
 
2009
   time_t now = time(NULL);
 
2010
   u_long32 ec_id = 0;
 
2011
   event_client_update_func_t update_func = NULL;
 
2012
 
 
2013
   DENTER(TOP_LAYER, "sge_event_master_send_events");
 
2014
 
 
2015
   sge_mutex_lock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
 
2016
   event_client = lFirst(Event_Master_Control.clients);
 
2017
   while (event_client != NULL) {
 
2018
      const char *host = NULL;
 
2019
      const char *commproc = NULL;
 
2020
 
 
2021
      next_event_client = lNext(event_client);
 
2022
 
 
2023
      ec_id = lGetUlong(event_client, EV_id);
 
2024
 
 
2025
      /* is the event client in state terminated? remove it */
 
2026
      if (lGetUlong(event_client, EV_state) == EV_terminated) {
 
2027
         remove_event_client(&event_client, ec_id, false);
 
2028
         /* we removed a client, continue with the next one */
 
2029
         event_client = next_event_client;
 
2030
         continue;
 
2031
      }
 
2032
     
 
2033
      /* extract address of event client      */
 
2034
      /* Important:                           */
 
2035
      /*   host and commproc have to be freed */
 
2036
      update_func = (event_client_update_func_t) lGetRef(event_client, EV_update_function);
 
2037
 
 
2038
      host = lGetHost(event_client, EV_host);
 
2039
      commproc = lGetString(event_client, EV_commproc);
 
2040
      commid = lGetUlong(event_client, EV_commid);
 
2041
 
 
2042
      deliver_interval = lGetUlong(event_client, EV_d_time);
 
2043
      busy_handling = lGetUlong(event_client, EV_busy_handling);
 
2044
 
 
2045
      /* somone turned the clock back */
 
2046
      if (lGetUlong(event_client, EV_last_heard_from) > now) {
 
2047
         lSetUlong(event_client, EV_last_heard_from, now);
 
2048
         lSetUlong(event_client, EV_next_send_time, now + deliver_interval);
 
2049
      } else if (lGetUlong(event_client, EV_last_send_time)  > now) {
 
2050
         lSetUlong(event_client, EV_last_send_time, now);
 
2051
      }
 
2052
  
 
2053
      /* if set, use qmaster_params SCHEDULER_TIMEOUT */
 
2054
      if (scheduler_timeout > 0) {
 
2055
         timeout = scheduler_timeout;
 
2056
      } else {
 
2057
         /* is the ack timeout expired ? */
 
2058
         timeout = 10*deliver_interval;
 
2059
         
 
2060
         if (timeout < EVENT_ACK_MIN_TIMEOUT) {
 
2061
            timeout = EVENT_ACK_MIN_TIMEOUT;
 
2062
         } else if (timeout > EVENT_ACK_MAX_TIMEOUT) {
 
2063
            timeout = EVENT_ACK_MAX_TIMEOUT;
 
2064
         }
 
2065
      }
 
2066
 
 
2067
      if (now > (lGetUlong(event_client, EV_last_heard_from) + timeout)) {
 
2068
         ERROR((SGE_EVENT, MSG_COM_ACKTIMEOUT4EV_ISIS, (int) timeout, commproc, 
 
2069
                  (int) commid, host));
 
2070
         remove_event_client(&event_client, ec_id, false);
 
2071
         event_client = next_event_client;
 
2072
         continue; /* while */
 
2073
      }
 
2074
 
 
2075
      /* do we have to deliver events ? */
 
2076
      if (now >= lGetUlong(event_client, EV_next_send_time)) {
 
2077
         if ((busy_handling == EV_THROTTLE_FLUSH) || !lGetUlong(event_client, EV_busy)) {
 
2078
            lList *lp = NULL;
 
2079
 
 
2080
            /* put only pointer in report - dont copy */
 
2081
            lXchgList(event_client, EV_events, &lp);
 
2082
            lXchgList(report, REP_list, &lp);
 
2083
 
 
2084
            if (update_func != NULL) {
 
2085
               update_func(ec_id, NULL, report_list);
 
2086
               ret = CL_RETVAL_OK;
 
2087
            } else {
 
2088
               ret = report_list_send(ctx, report_list, host, commproc, commid, 0);
 
2089
               MONITOR_MESSAGES_OUT(monitor);
 
2090
            }
 
2091
 
 
2092
            /* on failure retry is triggered automatically */
 
2093
            if (ret == CL_RETVAL_OK) {
 
2094
               now = (time_t)sge_get_gmt();
 
2095
 
 
2096
               switch (busy_handling) {
 
2097
                  case EV_THROTTLE_FLUSH:
 
2098
                     /* increase busy counter */
 
2099
                     lSetUlong(event_client, EV_busy, lGetUlong(event_client, EV_busy) + 1);
 
2100
                     break;
 
2101
                  case EV_BUSY_UNTIL_RELEASED:
 
2102
                  case EV_BUSY_UNTIL_ACK:
 
2103
                     lSetUlong(event_client, EV_busy, 1);
 
2104
                     break;
 
2105
                  default:
 
2106
                     /* EV_BUSY_NO_HANDLING */
 
2107
                     break;
 
2108
               }
 
2109
 
 
2110
               lSetUlong(event_client, EV_last_send_time, now);
 
2111
            }
 
2112
 
 
2113
            /* We reset this time even if the report list send failed because we
 
2114
             * want to give failed clients a break before trying them again. */
 
2115
            lSetUlong(event_client, EV_next_send_time, now + deliver_interval);
 
2116
 
 
2117
            /* don't delete sent events - deletion is triggerd by ack's */
 
2118
            lXchgList(report, REP_list, &lp);
 
2119
            lXchgList(event_client, EV_events, &lp);
 
2120
         } else {
 
2121
            MONITOR_EDT_BUSY(monitor);
 
2122
         }
 
2123
      } /*if */
 
2124
 
 
2125
      event_client = next_event_client;
 
2126
   } /* while */
 
2127
   
 
2128
   sge_mutex_unlock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
 
2129
   DRETURN_VOID;
 
2130
} /* send_events() */
 
2131
 
 
2132
static void flush_events(lListElem *event_client, int interval)
 
2133
{
 
2134
   u_long32 next_send = 0;
 
2135
   u_long32 flush_delay = 0;
 
2136
   int now = sge_get_gmt();
 
2137
 
 
2138
   DENTER(TOP_LAYER, "flush_events");
 
2139
 
 
2140
   SGE_ASSERT(event_client != NULL);
 
2141
 
 
2142
   next_send = lGetUlong(event_client, EV_next_send_time);
 
2143
   next_send = MIN(next_send, now + interval);
 
2144
 
 
2145
   /* never send out two event packages in the very same second */
 
2146
   if (lGetUlong(event_client, EV_busy_handling) == EV_THROTTLE_FLUSH) {
 
2147
      u_long32 busy_counter = lGetUlong(event_client, EV_busy);
 
2148
      u_long32 ed_time = lGetUlong(event_client, EV_d_time);
 
2149
      u_long32 flush_delay_rate = MAX(lGetUlong(event_client, EV_flush_delay), 1);
 
2150
 
 
2151
      if (busy_counter >= flush_delay_rate) {
 
2152
         /* busy counters larger than flush delay cause events being
 
2153
            sent out in regular event delivery interval for alive protocol
 
2154
            purposes with event client */
 
2155
         flush_delay = MAX(flush_delay, ed_time);
 
2156
      } else {
 
2157
         /* for smaller busy counters event delivery interval is scaled
 
2158
            down with the busy counter */
 
2159
         flush_delay = MAX(flush_delay, ed_time * busy_counter / flush_delay_rate);
 
2160
      }
 
2161
   }
 
2162
 
 
2163
   next_send = MAX(next_send, lGetUlong(event_client, EV_last_send_time) + flush_delay);
 
2164
   lSetUlong(event_client, EV_next_send_time, next_send);
 
2165
 
 
2166
   if (now >= next_send) {
 
2167
      set_flush();
 
2168
   }
 
2169
 
 
2170
   DPRINTF(("%s: %s %d\tNOW: %d NEXT FLUSH: %d (%s,%s,%d)\n",
 
2171
            SGE_FUNC,
 
2172
            ((lGetString(event_client, EV_name) != NULL) ? lGetString(event_client, EV_name) : "<null>"),
 
2173
            lGetUlong(event_client, EV_id),
 
2174
            now,
 
2175
            next_send,
 
2176
            ((lGetHost(event_client, EV_host) != NULL) ? lGetHost(event_client, EV_host) : "<null>"),
 
2177
            ((lGetString(event_client, EV_commproc) != NULL) ? lGetString(event_client, EV_commproc) : "<null>"),
 
2178
            lGetUlong(event_client, EV_commid)));
 
2179
 
 
2180
   DRETURN_VOID;
 
2181
} /* flush_events() */
 
2182
 
 
2183
/****** Eventclient/Server/total_update() **************************************
 
2184
*  NAME
 
2185
*     total_update() -- send all data to eventclient
 
2186
*
 
2187
*  SYNOPSIS
 
2188
*     static void 
 
2189
*     total_update(lListElem *event_client) 
 
2190
*
 
2191
*  FUNCTION
 
2192
*     Sends all complete lists it subscribed to an eventclient.
 
2193
*     If the event client receives a complete list instead of single events,
 
2194
*     it should completely update it's database.
 
2195
*
 
2196
*  INPUTS
 
2197
*     lListElem *event_client - the event client to update
 
2198
*
 
2199
*  NOTES
 
2200
*     MT-NOTE: total_update() is MT safe, IF the function is invoked with
 
2201
*     MT-NOTE: 'LOCK_EVENT_CLIENT_LST' locked! This is in accordance with
 
2202
*     MT-NOTE: the acquire/release protocol as defined by the Grid Engine
 
2203
*     MT-NOTE: Locking API.
 
2204
*     MT-NOTE: the method also locks the global lock. One has to make sure,
 
2205
*     MT-NOTE: that no calling method has that lock already.
 
2206
*
 
2207
*  SEE ALSO
 
2208
*     libs/lck/sge_lock.h
 
2209
*     libs/lck/sge_lock.c
 
2210
*
 
2211
*******************************************************************************/
 
2212
static void total_update(lListElem *event_client, monitoring_t *monitor)
 
2213
{
 
2214
   object_description *master_table = NULL;
 
2215
 
 
2216
   DENTER(TOP_LAYER, "total_update");
 
2217
 
 
2218
   master_table = object_type_get_global_object_description();
 
2219
 
 
2220
   blockEvents(event_client, sgeE_ALL_EVENTS, true);
 
2221
 
 
2222
   sge_set_commit_required();
 
2223
 
 
2224
   total_update_event(event_client, sgeE_ADMINHOST_LIST, master_table, false);
 
2225
   total_update_event(event_client, sgeE_CALENDAR_LIST, master_table, false);
 
2226
   total_update_event(event_client, sgeE_CKPT_LIST, master_table, false);
 
2227
   total_update_event(event_client, sgeE_CENTRY_LIST, master_table, false);
 
2228
   total_update_event(event_client, sgeE_CONFIG_LIST, master_table, false);
 
2229
   total_update_event(event_client, sgeE_EXECHOST_LIST, master_table, false);
 
2230
   total_update_event(event_client, sgeE_JOB_LIST, master_table, false);
 
2231
   total_update_event(event_client, sgeE_JOB_SCHEDD_INFO_LIST, master_table, false);
 
2232
   total_update_event(event_client, sgeE_MANAGER_LIST, master_table, false);
 
2233
   total_update_event(event_client, sgeE_OPERATOR_LIST, master_table, false);
 
2234
   total_update_event(event_client, sgeE_PE_LIST, master_table, false);
 
2235
   total_update_event(event_client, sgeE_CQUEUE_LIST, master_table, false);
 
2236
   total_update_event(event_client, sgeE_SCHED_CONF, master_table, false);
 
2237
   total_update_event(event_client, sgeE_SUBMITHOST_LIST, master_table, false);
 
2238
   total_update_event(event_client, sgeE_USERSET_LIST, master_table, false);
 
2239
   total_update_event(event_client, sgeE_NEW_SHARETREE, master_table, false);
 
2240
   total_update_event(event_client, sgeE_PROJECT_LIST, master_table, false);
 
2241
   total_update_event(event_client, sgeE_USER_LIST, master_table, false);
 
2242
   total_update_event(event_client, sgeE_HGROUP_LIST, master_table, false);
 
2243
   total_update_event(event_client, sgeE_RQS_LIST, master_table, false);
 
2244
   total_update_event(event_client, sgeE_AR_LIST, master_table, false);
 
2245
#ifndef __SGE_NO_USERMAPPING__
 
2246
   total_update_event(event_client, sgeE_CUSER_LIST, master_table, false);
 
2247
#endif
 
2248
 
 
2249
   sge_commit();
 
2250
 
 
2251
   DRETURN_VOID;
 
2252
} /* total_update() */
 
2253
 
 
2254
 
 
2255
/****** evm/sge_event_master/build_subscription() ******************************
 
2256
*  NAME
 
2257
*     build_subscription() -- generates an array out of the cull registration
 
2258
*                                 structure
 
2259
*
 
2260
*  SYNOPSIS
 
2261
*     static void build_subscription(lListElem *event_el) 
 
2262
*
 
2263
*  FUNCTION
 
2264
*      generates an array out of the cull registration
 
2265
*      structure. The array contains all event elements and each of them 
 
2266
*      has an identifier, if it is subscribed or not. Before that is done, it is
 
2267
*      tested, the EV_changed flag is set. If not, the function simply returns.
 
2268
*
 
2269
*
 
2270
*  INPUTS
 
2271
*     lListElem *event_el - the event element, which event structure will be transformed 
 
2272
*
 
2273
*******************************************************************************/
 
2274
static void build_subscription(lListElem *event_el)
 
2275
{
 
2276
   lList *subscription = lGetList(event_el, EV_subscribed);
 
2277
   lListElem *sub_el = NULL;
 
2278
   subscription_t *sub_array = NULL;
 
2279
   subscription_t *old_sub_array = NULL;
 
2280
   int i = 0;
 
2281
 
 
2282
   DENTER(TOP_LAYER, "build_subscription");
 
2283
 
 
2284
   if (!lGetBool(event_el, EV_changed)) {
 
2285
      DRETURN_VOID;
 
2286
   }
 
2287
 
 
2288
   DPRINTF(("rebuild event mask for client(id): %s("sge_u32")\n", lGetString(event_el, EV_name), lGetUlong(event_el, EV_id)));
 
2289
 
 
2290
   sub_array = (subscription_t *) malloc(sizeof(subscription_t) * sgeE_EVENTSIZE);
 
2291
   memset(sub_array, 0, sizeof(subscription_t) * sgeE_EVENTSIZE); 
 
2292
 
 
2293
   for (i = 0; i < sgeE_EVENTSIZE; i++) {
 
2294
      sub_array[i].subscription = EV_NOT_SUBSCRIBED;
 
2295
      sub_array[i].blocked = false;
 
2296
   }
 
2297
 
 
2298
   for_each(sub_el, subscription) {
 
2299
      const lListElem *temp = NULL;
 
2300
      u_long32 event = lGetUlong(sub_el, EVS_id);
 
2301
 
 
2302
      sub_array[event].subscription = EV_SUBSCRIBED;
 
2303
      sub_array[event].flush = lGetBool(sub_el, EVS_flush) ? true : false;
 
2304
      sub_array[event].flush_time = lGetUlong(sub_el, EVS_interval);
 
2305
 
 
2306
      if ((temp = lGetObject(sub_el, EVS_where))) {
 
2307
         sub_array[event].where = lWhereFromElem(temp);
 
2308
      }
 
2309
 
 
2310
      if ((temp = lGetObject(sub_el, EVS_what))) {
 
2311
         sub_array[event].what = lWhatFromElem(temp);
 
2312
      }
 
2313
   }
 
2314
 
 
2315
   old_sub_array = lGetRef(event_el, EV_sub_array);
 
2316
 
 
2317
   if (old_sub_array) {
 
2318
      int i;
 
2319
      for (i = 0; i < sgeE_EVENTSIZE; i++) {
 
2320
         lFreeWhere(&(old_sub_array[i].where));
 
2321
         lFreeWhat(&(old_sub_array[i].what));
 
2322
         if (old_sub_array[i].descr){
 
2323
            cull_hash_free_descr(old_sub_array[i].descr);
 
2324
            free(old_sub_array[i].descr);
 
2325
         }
 
2326
      }
 
2327
      free(old_sub_array);
 
2328
   }
 
2329
 
 
2330
   lSetRef(event_el, EV_sub_array, sub_array);
 
2331
   lSetBool(event_el, EV_changed, false);
 
2332
 
 
2333
   DRETURN_VOID;
 
2334
} /* build_subscription() */
 
2335
 
 
2336
/****** Eventclient/Server/check_send_new_subscribed_list() ********************
 
2337
*  NAME
 
2338
*     check_send_new_subscribed_list() -- check suscription for new list events
 
2339
*
 
2340
*  SYNOPSIS
 
2341
*     static void 
 
2342
*     check_send_new_subscribed_list(const subscription_t *old_subscription, 
 
2343
*                                    const subscription_t *new_subscription, 
 
2344
*                                    lListElem *event_client, 
 
2345
*                                    ev_event event) 
 
2346
*
 
2347
*  FUNCTION
 
2348
*     Checks, if sgeE*_LIST events have been added to the subscription of a
 
2349
*     certain event client. If yes, send these lists to the event client.
 
2350
*
 
2351
*  INPUTS
 
2352
*     const subscription_t *old_subscription - former subscription
 
2353
*     const subscription_t *new_subscription - new subscription
 
2354
*     lListElem *event_client                - the event client object
 
2355
*     ev_event event                         - the event to check
 
2356
*     object_description *master_table       - master list table
 
2357
*
 
2358
*  SEE ALSO
 
2359
*     Eventclient/Server/total_update_event()
 
2360
*
 
2361
*******************************************************************************/
 
2362
static void 
 
2363
check_send_new_subscribed_list(const subscription_t *old_subscription,
 
2364
                               const subscription_t *new_subscription,
 
2365
                               lListElem *event_client, ev_event event,
 
2366
                               object_description *master_table)
 
2367
{
 
2368
   if ((new_subscription[event].subscription == EV_SUBSCRIBED) &&
 
2369
      (old_subscription[event].subscription == EV_NOT_SUBSCRIBED)) {
 
2370
      total_update_event(event_client, event, master_table, true);
 
2371
   }
 
2372
}
 
2373
 
 
2374
/****** Eventclient/Server/eventclient_subscribed() ************************
 
2375
*  NAME
 
2376
*     eventclient_subscribed() -- has event client subscribed an event?
 
2377
*
 
2378
*  SYNOPSIS
 
2379
*     #include "sge_event_master.h"
 
2380
*
 
2381
*     int 
 
2382
*     eventclient_subscribed(const lListElem *event_client, ev_event event) 
 
2383
*
 
2384
*  FUNCTION
 
2385
*     Checks if the given event client has a certain event subscribed.
 
2386
*     For event clients that use session filtering additional conditions
 
2387
*     must be fulfilled otherwise the event counts not as subscribed.
 
2388
*
 
2389
*  INPUTS
 
2390
*     const lListElem *event_client - event client to check
 
2391
*     ev_event event                - event to check
 
2392
*     const char *session           - session key of this event
 
2393
*
 
2394
*  RESULT
 
2395
*     int - 0 = not subscribed, 1 = subscribed
 
2396
*
 
2397
*  SEE ALSO
 
2398
*     Eventclient/-Session filtering
 
2399
*******************************************************************************/
 
2400
static int eventclient_subscribed(const lListElem *event_client, ev_event event,
 
2401
                                  const char *session)
 
2402
{
 
2403
   const subscription_t *subscription = NULL;
 
2404
   const char *ec_session = NULL;
 
2405
 
 
2406
   DENTER(TOP_LAYER, "eventclient_subscribed");
 
2407
 
 
2408
   SGE_ASSERT(event_client != NULL);
 
2409
 
 
2410
   if (event_client == NULL) {
 
2411
      DRETURN(0);
 
2412
   }
 
2413
 
 
2414
   subscription = lGetRef(event_client, EV_sub_array);
 
2415
   ec_session = lGetString(event_client, EV_session);
 
2416
 
 
2417
   if (subscription == NULL) {
 
2418
      DPRINTF(("No subscription!\n"));
 
2419
      DRETURN(0);
 
2420
   }
 
2421
 
 
2422
   if (ec_session) {
 
2423
      if (session) {
 
2424
         /* events that belong to a specific session are not subscribed
 
2425
            in case the event client is not interested in that session */
 
2426
         if (strcmp(session, ec_session)) {
 
2427
            DPRINTF(("Event session does not match client session\n"));
 
2428
            DRETURN(0);
 
2429
         }
 
2430
      } else {
 
2431
         /* events that do not belong to a specific session are not
 
2432
            subscribed if the event client is interested in events of a
 
2433
            specific session.
 
2434
            The only exception are list events, because list events do not
 
2435
            belong to a specific session. These events require filtering on
 
2436
            a more fine grained level */
 
2437
         if (!IS_TOTAL_UPDATE_EVENT(event)) {
 
2438
            DRETURN(0);
 
2439
         }
 
2440
      }
 
2441
   }
 
2442
   if ((subscription[event].subscription == EV_SUBSCRIBED) &&
 
2443
      (subscription[event].blocked == false)) {
 
2444
      DRETURN(1);
 
2445
   }
 
2446
 
 
2447
   DRETURN(0);
 
2448
}
 
2449
 
 
2450
/****** evm/sge_event_master/purge_event_list() ********************************
 
2451
*  NAME
 
2452
*     purge_event_list() -- purge event list
 
2453
*
 
2454
*  SYNOPSIS
 
2455
*     static int purge_event_list(lList* aList, ev_event event_number) 
 
2456
*
 
2457
*  FUNCTION
 
2458
*     Remove all events from 'aList' which do have an event id less than or
 
2459
*     equal to 'event_number'.
 
2460
*
 
2461
*  INPUTS
 
2462
*     lList* aList     - event list
 
2463
*     ev_event event_number - event
 
2464
*
 
2465
*  RESULT
 
2466
*     int - number of events purged.
 
2467
*
 
2468
*  NOTES
 
2469
*     MT-NOTE: purge_event_list() is NOT MT safe. 
 
2470
*     MT-NOTE: 
 
2471
*     MT-NOTE: Do not call this function without having 'aList' locked!
 
2472
*
 
2473
*  BUGS
 
2474
*     BUGBUG-AD: If 'event_number' == 0, not events will be purged. However zero is
 
2475
*     BUGBUG-AD: also the id of 'sgeE_ALL_EVENTS'. Is this behaviour correct?
 
2476
*
 
2477
*******************************************************************************/
 
2478
static int purge_event_list(lList *event_list, u_long32 event_number)
 
2479
{
 
2480
   int purged = 0, pos = 0;
 
2481
   lListElem *ev = NULL;
 
2482
 
 
2483
   DENTER(TOP_LAYER, "purge_event_list");
 
2484
 
 
2485
   if (event_number == 0) {
 
2486
      DRETURN(0);
 
2487
   }
 
2488
 
 
2489
   pos = lGetPosInDescr(ET_Type, ET_number);
 
2490
   ev = lFirst(event_list);
 
2491
 
 
2492
   while (ev != NULL) {
 
2493
      lListElem *tmp = ev;
 
2494
 
 
2495
      ev = lNext(ev); /* fetch next event, before the old one will be deleted */
 
2496
 
 
2497
      if (lGetPosUlong(tmp, pos) > event_number) {
 
2498
         break;
 
2499
      }
 
2500
 
 
2501
      lRemoveElem(event_list, &tmp);
 
2502
      purged++;
 
2503
   }
 
2504
 
 
2505
   DRETURN(purged);
 
2506
} /* remove_events_from_client() */
 
2507
 
 
2508
static void add_list_event_direct(lListElem *event_client, lListElem *event, 
 
2509
                                  bool copy_event)
 
2510
{
 
2511
   lList *lp = NULL;
 
2512
   lList *clp = NULL;
 
2513
   lListElem *ep = NULL;
 
2514
   ev_event type = (ev_event)lGetUlong(event, ET_type);
 
2515
   subscription_t *subscription = NULL;
 
2516
   char buffer[1024];
 
2517
   dstring buffer_wrapper;
 
2518
   u_long32 i = 0;
 
2519
   const lCondition *selection = NULL;
 
2520
   const lEnumeration *fields = NULL;
 
2521
   const lDescr *descr = NULL;
 
2522
   bool internal_client = false;
 
2523
 
 
2524
   DENTER(TOP_LAYER, "add_list_event_direct"); 
 
2525
 
 
2526
   SGE_ASSERT(event_client != NULL);
 
2527
 
 
2528
   if (lGetUlong(event_client, EV_state) != EV_connected) {
 
2529
      /* the event client is not connected anymore, so we are not
 
2530
         adding new events to it*/
 
2531
      if (!copy_event) {
 
2532
         lFreeElem(&event);
 
2533
      }
 
2534
      DRETURN_VOID;
 
2535
   }
 
2536
 
 
2537
   /* detect internal event clients */
 
2538
   if (lGetRef(event_client, EV_update_function) != NULL) {
 
2539
      internal_client = true;
 
2540
   }
 
2541
 
 
2542
   /* if the total updates blocked the client, we have to unblock this list */
 
2543
   blockEvents(event_client, type, false);
 
2544
 
 
2545
   sge_dstring_init(&buffer_wrapper, buffer, sizeof(buffer));
 
2546
 
 
2547
   /* Pull out payload for selecting */
 
2548
   lXchgList(event, ET_new_version, &lp);
 
2549
 
 
2550
   /* If the list is NULL, no need to bother with any of this.  Plus, if we
 
2551
    * did do this part with a NULL list, the check for a clp with no
 
2552
    * elements would kick us out. */
 
2553
   if (lp != NULL) {
 
2554
      subscription = lGetRef(event_client, EV_sub_array);
 
2555
      selection = subscription[type].where;
 
2556
      fields = subscription[type].what;
 
2557
 
 
2558
#if 1
 
2559
      DPRINTF(("deliver event: %d with where filter=%s and what filter=%s\n",
 
2560
               type, selection?"true":"false", fields?"true":"false"));
 
2561
#endif
 
2562
 
 
2563
      if (fields) {
 
2564
         descr = getDescriptorL(subscription, lp, type);
 
2565
 
 
2566
         DPRINTF(("Reducing event data\n"));
 
2567
         
 
2568
         if (!list_select(subscription, type, &clp, lp, selection, fields,
 
2569
                          descr, internal_client)) {
 
2570
            clp = lSelectDPack("updating list", lp, selection, descr,
 
2571
                               fields, internal_client, NULL, NULL);
 
2572
         }
 
2573
 
 
2574
         /* no elements in the event list, no need for an event */
 
2575
         if (!SEND_EVENTS[type] && lGetNumberOfElem(clp) == 0) {
 
2576
            if (clp != NULL) {
 
2577
               lFreeList(&clp);
 
2578
            }
 
2579
 
 
2580
            /* we are not making a copy, so we have to restore the old element */
 
2581
            lXchgList(event, ET_new_version, &lp);
 
2582
 
 
2583
            if (!copy_event) {
 
2584
               lFreeElem(&event);
 
2585
            }
 
2586
 
 
2587
            DPRINTF(("Skipping event because it has no content for this client.\n"));
 
2588
            DRETURN_VOID;
 
2589
         }
 
2590
 
 
2591
         /* If we're not making a copy, we have to free the original list.  If
 
2592
          * we are making a copy, freeing the list is the responsibility of the
 
2593
          * calling function. */
 
2594
         if (!copy_event) {
 
2595
            lFreeList(&lp);
 
2596
         }
 
2597
      } else if (copy_event) {
 
2598
         /* If there's no what clause, and we want a copy, we copy the list */
 
2599
         DPRINTF(("Copying event data\n"));
 
2600
         clp = lCopyListHash(lGetListName(lp), lp, internal_client);
 
2601
      } else {
 
2602
         /* If there's no what clause, and we don't want to copy, we just reuse
 
2603
          * the original list. */
 
2604
         clp = lp;
 
2605
         if (internal_client) {
 
2606
            cull_hash_create_hashtables(clp);
 
2607
         }
 
2608
         /* Make sure lp is clear for the next part. */
 
2609
         lp = NULL;
 
2610
      }
 
2611
   } /* if */
 
2612
 
 
2613
   /* If we're making a copy, copy the event and swap the orignial list
 
2614
    * back into the original event */
 
2615
   if (copy_event) {
 
2616
      DPRINTF(("Copying event\n"));
 
2617
      ep = lCopyElemHash(event, false);
 
2618
 
 
2619
      lXchgList(event, ET_new_version, &lp);
 
2620
   } else {
 
2621
      /* If we're not making a copy, reuse the original event. */
 
2622
      ep = event;
 
2623
   }
 
2624
 
 
2625
   /* Swap the new list into the working event. */
 
2626
   lXchgList(ep, ET_new_version, &clp);
 
2627
 
 
2628
   /* fill in event number and increment
 
2629
      EV_next_number of event recipient */
 
2630
   i = lGetUlong(event_client, EV_next_number);
 
2631
   lSetUlong(event_client, EV_next_number, (i + 1));
 
2632
   lSetUlong(ep, ET_number, i);
 
2633
 
 
2634
   /* build a new event list if not exists */
 
2635
   lp = lGetList(event_client, EV_events);
 
2636
 
 
2637
   if (lp == NULL) {
 
2638
      lp=lCreateListHash("Events", ET_Type, false);
 
2639
      lSetList(event_client, EV_events, lp);
 
2640
   }
 
2641
 
 
2642
   /* chain in new event */
 
2643
   lAppendElem(lp, ep);
 
2644
 
 
2645
   DPRINTF(("%d %s\n", lGetUlong(event_client, EV_id),
 
2646
            event_text(ep, &buffer_wrapper)));
 
2647
 
 
2648
   /* check if event clients wants flushing */
 
2649
   subscription = lGetRef(event_client, EV_sub_array);
 
2650
 
 
2651
   if (type == sgeE_QMASTER_GOES_DOWN) {
 
2652
      Event_Master_Control.is_prepare_shutdown = true;
 
2653
      lSetUlong(event_client, EV_busy, 0); /* can't be too busy for shutdown */
 
2654
      flush_events(event_client, 0);
 
2655
   } else if (type == sgeE_SHUTDOWN) {
 
2656
      flush_events(event_client, 0);
 
2657
      /* the event client should be shutdown, so we do not add any events to it, after
 
2658
         the shutdown event */
 
2659
      lSetUlong(event_client, EV_state, EV_closing); 
 
2660
   } else if (subscription[type].flush) {
 
2661
      DPRINTF(("flushing event client\n"));
 
2662
      flush_events(event_client, subscription[type].flush_time);
 
2663
   }
 
2664
 
 
2665
   DRETURN_VOID;
 
2666
}
 
2667
 
 
2668
/****** Eventclient/Server/total_update_event() *******************************
 
2669
*  NAME
 
2670
*     total_update_event() -- create a total update event
 
2671
*
 
2672
*  SYNOPSIS
 
2673
*     static void 
 
2674
*     total_update_event(lListElem *event_client, ev_event type) 
 
2675
*
 
2676
*  FUNCTION
 
2677
*     Creates an event delivering a certain list of objects for an event client.
 
2678
*     For event clients that have subscribed a session list filtering can be done
 
2679
*     here.
 
2680
*
 
2681
*  INPUTS
 
2682
*     lListElem *event_client          - event client to receive the list
 
2683
*     ev_event type                    - event describing the list to update
 
2684
*     object_description *object_base  - master list table
 
2685
*
 
2686
*******************************************************************************/
 
2687
static void total_update_event(lListElem *event_client, ev_event type, object_description *object_base, 
 
2688
                               bool new_subscription) 
 
2689
{
 
2690
   lList *lp = NULL; /* lp should be set, if we have to make a copy */
 
2691
   lList *copy_lp = NULL; /* copy_lp should be used for a copy of the org. list */
 
2692
   char buffer[1024];
 
2693
   dstring buffer_wrapper;
 
2694
   u_long32 id;
 
2695
 
 
2696
   DENTER(TOP_LAYER, "total_update_event");
 
2697
 
 
2698
   SGE_ASSERT(event_client != NULL);
 
2699
 
 
2700
   sge_dstring_init(&buffer_wrapper, buffer, sizeof(buffer));
 
2701
   id = lGetUlong(event_client, EV_id);
 
2702
 
 
2703
   /* This test bothers me.  Technically, the GDI thread should just drop the
 
2704
    * event in the send queue and forget about it.  However, doing this test
 
2705
    * here could prevent the queuing of events that will later be determined
 
2706
    * to be useless. */
 
2707
   if (new_subscription || eventclient_subscribed(event_client, type, NULL)) {
 
2708
      switch (type) {
 
2709
         case sgeE_ADMINHOST_LIST:
 
2710
            lp = *object_base[SGE_TYPE_ADMINHOST].list;
 
2711
            break;
 
2712
         case sgeE_CALENDAR_LIST:
 
2713
            lp = *object_base[SGE_TYPE_CALENDAR].list;
 
2714
            break;
 
2715
         case sgeE_CKPT_LIST:
 
2716
            lp = *object_base[SGE_TYPE_CKPT].list;
 
2717
            break;
 
2718
         case sgeE_CENTRY_LIST:
 
2719
            lp = *object_base[SGE_TYPE_CENTRY].list;
 
2720
            break;
 
2721
         case sgeE_CONFIG_LIST:
 
2722
            /* sge_get_configuration() returns a copy already, we do not need to make
 
2723
               one later */
 
2724
            lp = *object_base[SGE_TYPE_CONFIG].list;
 
2725
            break;
 
2726
         case sgeE_EXECHOST_LIST:
 
2727
            lp = *object_base[SGE_TYPE_EXECHOST].list;
 
2728
            break;
 
2729
         case sgeE_JOB_LIST:
 
2730
            lp = *object_base[SGE_TYPE_JOB].list;
 
2731
            break;
 
2732
         case sgeE_JOB_SCHEDD_INFO_LIST:
 
2733
            lp = *object_base[SGE_TYPE_JOB_SCHEDD_INFO].list;
 
2734
            break;
 
2735
         case sgeE_MANAGER_LIST:
 
2736
            lp = *object_base[SGE_TYPE_MANAGER].list;
 
2737
            break;
 
2738
         case sgeE_NEW_SHARETREE:
 
2739
            lp = *object_base[SGE_TYPE_SHARETREE].list;
 
2740
            break;
 
2741
         case sgeE_OPERATOR_LIST:
 
2742
            lp = *object_base[SGE_TYPE_OPERATOR].list;
 
2743
            break;
 
2744
         case sgeE_PE_LIST:
 
2745
            lp = *object_base[SGE_TYPE_PE].list;
 
2746
            break;
 
2747
         case sgeE_PROJECT_LIST:
 
2748
            lp = *object_base[SGE_TYPE_PROJECT].list;
 
2749
            break;
 
2750
         case sgeE_CQUEUE_LIST:
 
2751
            lp = *object_base[SGE_TYPE_CQUEUE].list;
 
2752
            break;
 
2753
         case sgeE_SCHED_CONF:
 
2754
            copy_lp = sconf_get_config_list();
 
2755
            break;
 
2756
         case sgeE_SUBMITHOST_LIST:
 
2757
            lp = *object_base[SGE_TYPE_SUBMITHOST].list;
 
2758
            break;
 
2759
         case sgeE_USER_LIST:
 
2760
            lp = *object_base[SGE_TYPE_USER].list;
 
2761
            break;
 
2762
         case sgeE_USERSET_LIST:
 
2763
            lp = *object_base[SGE_TYPE_USERSET].list;
 
2764
            break;
 
2765
         case sgeE_HGROUP_LIST:
 
2766
            lp = *object_base[SGE_TYPE_HGROUP].list;
 
2767
            break;
 
2768
         case sgeE_RQS_LIST:
 
2769
            lp = *object_base[SGE_TYPE_RQS].list;
 
2770
            break;
 
2771
         case sgeE_AR_LIST:
 
2772
            lp = *object_base[SGE_TYPE_AR].list;
 
2773
            break;
 
2774
#ifndef __SGE_NO_USERMAPPING__
 
2775
         case sgeE_CUSER_LIST:
 
2776
            lp = *object_base[SGE_TYPE_CUSER].list;
 
2777
            break;
 
2778
#endif
 
2779
         default:
 
2780
            WARNING((SGE_EVENT, MSG_EVE_TOTALUPDATENOTHANDLINGEVENT_I, type));
 
2781
            DRETURN_VOID;
 
2782
      } /* switch */
 
2783
 
 
2784
      if (lp != NULL) {
 
2785
         copy_lp = lCopyListHash(lGetListName(lp), lp, false);
 
2786
      }
 
2787
 
 
2788
      /* 'send_events()' will free the copy of 'lp' */
 
2789
      add_list_event_for_client(id, 0, type, 0, 0, NULL, NULL, NULL,
 
2790
                                copy_lp, false);
 
2791
   } /* if */
 
2792
 
 
2793
   DRETURN_VOID;
 
2794
} /* total_update_event() */
 
2795
 
 
2796
 
 
2797
/****** evm/sge_event_master/list_select() *************************************
 
2798
*  NAME
 
2799
*     list_select() -- makes a reduced job list dublication 
 
2800
*
 
2801
*  SYNOPSIS
 
2802
*     static bool list_select(subscription_t *subscription, int type, lList 
 
2803
*     **reduced_lp, lList *lp, const lCondition *selection, const lEnumeration 
 
2804
*     *fields, const lDescr *descr, bool do_hash) 
 
2805
*
 
2806
*  FUNCTION
 
2807
*     Only works on job events. All others are ignored. The job events
 
2808
*     need some special handling and this is done in this function. The
 
2809
*     JAT_Type list can be subscribed by its self and it is also part
 
2810
*     of the JB_Type. If a JAT_Type filter is set, this function also
 
2811
*     filters the JAT_Type lists in the JB_Type lists.
 
2812
*
 
2813
*  INPUTS
 
2814
*     subscription_t *subscription - subscription array 
 
2815
*     int type                     - event type 
 
2816
*     lList **reduced_lp           - target list (has to be an empty list) 
 
2817
*     lList *lp                    - source list (will be modified) 
 
2818
*     const lCondition *selection  - where filter 
 
2819
*     const lEnumeration *fields   - what filter 
 
2820
*     const lDescr *descr          - reduced descriptor 
 
2821
*     bool do_hash                 - create hash tables in the target list
 
2822
*
 
2823
*  RESULT
 
2824
*     static bool - true, if it was a job event 
 
2825
*
 
2826
*******************************************************************************/
 
2827
static bool list_select(subscription_t *subscription, int type,
 
2828
                        lList **reduced_lp, lList *lp,
 
2829
                        const lCondition *selection, const lEnumeration *fields,
 
2830
                        const lDescr *descr, bool do_hash)
 
2831
{
 
2832
   bool ret = false;
 
2833
   int entry_counter;
 
2834
   int event_counter;
 
2835
 
 
2836
   DENTER(TOP_LAYER, "list_select");
 
2837
   
 
2838
   for (entry_counter = 0; entry_counter < LIST_MAX; entry_counter++) {
 
2839
      event_counter = -1;
 
2840
      while (EVENT_LIST[entry_counter][++event_counter] != -1) {
 
2841
         if (type == EVENT_LIST[entry_counter][event_counter]) {
 
2842
            int sub_type = -1;
 
2843
            int i = -1;
 
2844
 
 
2845
            while (SOURCE_LIST[entry_counter][++i] != -1) {
 
2846
               if (subscription[SOURCE_LIST[entry_counter][i]].what) {
 
2847
                  sub_type = SOURCE_LIST[entry_counter][i];
 
2848
                  break;
 
2849
               }
 
2850
            }
 
2851
  
 
2852
            if (sub_type != -1) {
 
2853
               lListElem *element = NULL;
 
2854
               lListElem *reduced_el = NULL;
 
2855
 
 
2856
               ret = true;
 
2857
               *reduced_lp = lCreateListHash("update", descr, do_hash);
 
2858
 
 
2859
               for_each(element, lp) {
 
2860
                  reduced_el = elem_select(subscription, element,
 
2861
                               FIELD_LIST[entry_counter], selection,
 
2862
                               fields, descr, sub_type);
 
2863
 
 
2864
                  lAppendElem(*reduced_lp, reduced_el);
 
2865
               }
 
2866
            } else {
 
2867
               DPRINTF(("no sub type filter specified\n"));
 
2868
            }
 
2869
            goto end;
 
2870
         } /* end if */
 
2871
      } /* end while */
 
2872
   } /* end for */
 
2873
end:
 
2874
   DRETURN(ret);
 
2875
}
 
2876
 
 
2877
/****** sge_event_master/elem_select() ******************************************
 
2878
*  NAME
 
2879
*     elem_select() -- makes a reduced copy of an element with reducing sublists
 
2880
*                      as well
 
2881
*
 
2882
*  SYNOPSIS
 
2883
*     static lListElem *elem_select(subscription_t *subscription, lListElem *element, 
 
2884
*                              const int ids[], const lCondition *selection, 
 
2885
*                              const lEnumeration *fields, const lDescr *dp, int sub_type)
 
2886
*
 
2887
*  FUNCTION
 
2888
*     The function will apply the given filters for the element. Before the element
 
2889
*     is reduced, all attribute sub lists named in "ids" will be removed from the list and
 
2890
*     reduced. The reduced sub lists will be added the the reduced element and the original
 
2891
*     element will be restored. The sub-lists will only be reduced, if the reduced element
 
2892
*     still contains their attributes.
 
2893
*
 
2894
*  INPUTS
 
2895
*     subscription_t *subscription - subscription array 
 
2896
*     lListElem *element           - the element to reduce
 
2897
*     const int ids[]              - attribute with sublists to be reduced as well
 
2898
*     const lCondition *selection  - where filter 
 
2899
*     const lEnumeration *fields   - what filter 
 
2900
*     const lDescr *descr          - reduced descriptor 
 
2901
*     int sub_type                 - list type of the sublists.
 
2902
*
 
2903
*  RESULT
 
2904
*     bool - the reduced element, or NULL if something went wrong
 
2905
*
 
2906
*  NOTE:
 
2907
*  MT-NOTE: works only on the variables -> thread save
 
2908
*
 
2909
*******************************************************************************/
 
2910
static lListElem *elem_select(subscription_t *subscription, lListElem *element,
 
2911
                              const int ids[], const lCondition *selection,
 
2912
                              const lEnumeration *fields, const lDescr *dp, int sub_type)
 
2913
{
 
2914
   const lCondition *sub_selection = NULL;
 
2915
   const lEnumeration *sub_fields = NULL;
 
2916
   const lDescr *sub_descr = NULL;
 
2917
   lList **sub_list;
 
2918
   lListElem *el = NULL;
 
2919
   int counter;
 
2920
 
 
2921
   DENTER(TOP_LAYER, "elem_select");
 
2922
 
 
2923
   if (element == NULL) {
 
2924
      DRETURN(NULL);
 
2925
   }
 
2926
 
 
2927
   if (sub_type <= sgeE_ALL_EVENTS || sub_type >= sgeE_EVENTSIZE) {
 
2928
      /* TODO: SG: add error message */
 
2929
      DPRINTF(("wrong event sub type\n"));
 
2930
      DRETURN(NULL);
 
2931
   }
 
2932
 
 
2933
   /* get the filters for the sub lists */
 
2934
   if (sub_type >= 0) {
 
2935
      sub_selection = subscription[sub_type].where;
 
2936
      sub_fields = subscription[sub_type].what;
 
2937
   }
 
2938
 
 
2939
   if (sub_fields) { /* do we have a sub list filter, otherwise ... */
 
2940
      int ids_size = 0;
 
2941
 
 
2942
      /* allocate memory to store the sub-lists, which should be handeled special */
 
2943
      while (ids[ids_size] != -1) {
 
2944
         ids_size++;
 
2945
      }
 
2946
      sub_list = malloc(ids_size * sizeof(lList*));
 
2947
      memset(sub_list, 0 , ids_size * sizeof(lList*));
 
2948
 
 
2949
      /* remove the sub-lists from the main element */
 
2950
      for(counter = 0; counter < ids_size; counter ++) {
 
2951
         lXchgList(element, ids[counter], &(sub_list[counter]));
 
2952
      }
 
2953
 
 
2954
      /* get descriptor for reduced sub-lists */
 
2955
      if (!sub_descr) {
 
2956
         for(counter = 0; counter < ids_size; counter ++) {
 
2957
            if (sub_list[counter]) {
 
2958
               sub_descr = getDescriptorL(subscription, sub_list[counter], sub_type);
 
2959
               break;
 
2960
            }
 
2961
         }
 
2962
      }
 
2963
 
 
2964
      /* copy the main list */
 
2965
      if (!fields) {
 
2966
         /* there might be no filter for the main element, but for the sub-lists */
 
2967
         el = lCopyElemHash(element, false);
 
2968
      } else if (!dp) {
 
2969
         /* for some reason, we did not get a descriptor for the target element */
 
2970
         el = lSelectElemPack(element, selection, fields, false, NULL);
 
2971
      } else {
 
2972
         el = lSelectElemDPack(element, selection, dp, fields, false, NULL, NULL);
 
2973
      }
 
2974
 
 
2975
      /* if we have a new reduced main element */
 
2976
      if (el) {
 
2977
         /* copy the sub-lists, if they are still part of the reduced main element */
 
2978
         for (counter = 0; counter < ids_size; counter ++) {
 
2979
            if (sub_list[counter] && (lGetPosViaElem(el, ids[counter], SGE_NO_ABORT) != -1)) {
 
2980
               lSetList(el, ids[counter],
 
2981
                        lSelectDPack("", sub_list[counter], sub_selection,
 
2982
                                     sub_descr, sub_fields, false, NULL, NULL));
 
2983
            }
 
2984
         }
 
2985
      }
 
2986
 
 
2987
      /* restore the old sub_list */
 
2988
      for (counter = 0; counter < ids_size; counter ++) {
 
2989
         lXchgList(element, ids[counter], &(sub_list[counter]));
 
2990
      }
 
2991
 
 
2992
      FREE(sub_list);
 
2993
   } else {
 
2994
      DPRINTF(("no sub filter specified\n"));
 
2995
      el = lSelectElemDPack(element, selection, dp, fields, false, NULL, NULL);
 
2996
   }
 
2997
 
 
2998
   DRETURN(el);
 
2999
}
 
3000
 
 
3001
/****** Eventclient/Server/eventclient_list_locate() **************************
 
3002
*  NAME
 
3003
*     eventclient_list_locate_by_adress() -- search event client by adress
 
3004
*
 
3005
*  SYNOPSIS
 
3006
*     #include "sge_event_master.h"
 
3007
*
 
3008
*     lListElem *
 
3009
*     eventclient_list_locate_by_adress(const char *host, 
 
3010
*                     const char *commproc, u_long32 id) 
 
3011
*
 
3012
*  FUNCTION
 
3013
*     Searches the event client list for an event client with the
 
3014
*     specified commlib adress.
 
3015
*     Returns a pointer to the event client object or
 
3016
*     NULL, if no such event client is registered.
 
3017
*
 
3018
*  INPUTS
 
3019
*     const char *host     - hostname of the event client to search
 
3020
*     const char *commproc - commproc of the event client to search
 
3021
*     u_long32 id          - id of the event client to search
 
3022
*
 
3023
*  RESULT
 
3024
*     lListElem* - event client object or NULL.
 
3025
*
 
3026
*  NOTES
 
3027
*
 
3028
*******************************************************************************/
 
3029
static lListElem *
 
3030
eventclient_list_locate_by_adress(const char *host, const char *commproc,
 
3031
                                  u_long32 id)
 
3032
{
 
3033
   lListElem *ep;
 
3034
 
 
3035
   DENTER(TOP_LAYER, "eventclient_list_locate_by_adress");
 
3036
 
 
3037
   for_each(ep, Event_Master_Control.clients) {
 
3038
      if (lGetUlong(ep, EV_commid) == id &&
 
3039
          !sge_hostcmp(lGetHost(ep, EV_host), host) &&
 
3040
          !strcmp(lGetString(ep, EV_commproc), commproc)) {
 
3041
         break;
 
3042
      }
 
3043
   }
 
3044
 
 
3045
   DRETURN(ep);
 
3046
}
 
3047
 
 
3048
/****** sge_event_master/getDescriptorL() **************************************
 
3049
*  NAME
 
3050
*     getDescriptorL() -- returns a reduced desciptor 
 
3051
*
 
3052
*  SYNOPSIS
 
3053
*     static const lDescr* getDescriptorL(subscription_t *subscription, const 
 
3054
*     lList* list, int type) 
 
3055
*
 
3056
*  FUNCTION
 
3057
*     ??? 
 
3058
*
 
3059
*  INPUTS
 
3060
*     subscription_t *subscription - subscription array 
 
3061
*     const lList* list            - source list 
 
3062
*     int type                     - event type 
 
3063
*
 
3064
*  RESULT
 
3065
*     static const lDescr* - reduced descriptor or NULL, if no what exists 
 
3066
*
 
3067
*  NOTE
 
3068
*   MT-NOTE: thread save, works only on the submitted variables.
 
3069
*******************************************************************************/
 
3070
static const lDescr* getDescriptorL(subscription_t *subscription,
 
3071
                                    const lList* list, int type)
 
3072
{
 
3073
   const lDescr *dp = NULL;
 
3074
 
 
3075
   if (subscription[type].what) {
 
3076
      if (!(dp = subscription[type].descr)) {
 
3077
         subscription[type].descr = lGetReducedDescr(lGetListDescr(list),
 
3078
                                                     subscription[type].what);
 
3079
         dp = subscription[type].descr;
 
3080
      }
 
3081
   }
 
3082
 
 
3083
   return dp;
 
3084
}
 
3085
 
 
3086
/****** sge_event_master/get_event_client() ************************************
 
3087
*  NAME
 
3088
*     get_event_client() -- gets the event client from the list
 
3089
*
 
3090
*  SYNOPSIS
 
3091
*     static lListElem *get_event_client(u_long32 id)
 
3092
*
 
3093
*  FUNCTION
 
3094
*     Returns the event client with the given id, or NULL if no such event
 
3095
*     client exists.
 
3096
*
 
3097
*  INPUTS
 
3098
*     u_long32 id - the client id
 
3099
*
 
3100
*  RESULT
 
3101
*     The event client with the given id, or NULL if no such event client
 
3102
*     exists.
 
3103
*
 
3104
*  NOTE
 
3105
*     MT-NOTE: NOT thread safe.  Requires caller to hold Event_Master_Control.mutex.
 
3106
*******************************************************************************/
 
3107
static lListElem *get_event_client(u_long32 id)
 
3108
{
 
3109
   lListElem *client = NULL;
 
3110
 
 
3111
   client = lGetElemUlong(Event_Master_Control.clients, EV_id, id);
 
3112
 
 
3113
   return client;
 
3114
}
 
3115
 
 
3116
/****** sge_event_master/allocate_new_dynamic_id() *******************************
 
3117
*  NAME
 
3118
*     allocate_new_dynamic_id() -- gets a new dynamic id
 
3119
*
 
3120
*  SYNOPSIS
 
3121
*     static u_long32 allocate_new_dynamic_id(void)
 
3122
*
 
3123
*  FUNCTION
 
3124
*     Returns the next available dynamic event client id.  The id returned will
 
3125
*     be between EV_ID_FIRST_DYNAMIC and Event_Master_Control.max_event_clients +
 
3126
*     EV_ID_FIRST_DYNAMIC.
 
3127
*
 
3128
*  RESULTS
 
3129
*     The next available dynamic event client id.
 
3130
*
 
3131
*  NOTE
 
3132
*     MT-NOTE: allocate_new_dynamic_id() is thread safe,
 
3133
*              when the caller holds Event_Master_Control.mutex.
 
3134
*******************************************************************************/
 
3135
static u_long32
 
3136
allocate_new_dynamic_id(lList **answer_list)
 
3137
{
 
3138
   u_long32 id = 0;
 
3139
 
 
3140
   DENTER(TOP_LAYER, "allocate_new_dynamic_id");
 
3141
 
 
3142
   if (lGetNumberOfElem(Event_Master_Control.clients) < Event_Master_Control.max_event_clients) {
 
3143
      id = range_list_get_first_id(Event_Master_Control.client_ids, answer_list);
 
3144
      if (id != 0) {
 
3145
         range_list_remove_id(&Event_Master_Control.client_ids, answer_list, id);
 
3146
         /* compress the range list to reduce fragmentation */
 
3147
         range_list_compress(Event_Master_Control.client_ids);
 
3148
      }
 
3149
   } else {
 
3150
      ERROR((SGE_EVENT, MSG_TO_MANY_DYNAMIC_EC_U, sge_u32c( Event_Master_Control.max_event_clients)));
 
3151
      answer_list_add(answer_list, SGE_EVENT, STATUS_ESEMANTIC, ANSWER_QUALITY_ERROR);
 
3152
   }
 
3153
 
 
3154
   DRETURN(id);
 
3155
}
 
3156
 
 
3157
static void
 
3158
free_dynamic_id(lList **answer_list, u_long32 id)
 
3159
{
 
3160
   if (id < Event_Master_Control.max_event_clients + EV_ID_FIRST_DYNAMIC) {
 
3161
      range_list_insert_id(&Event_Master_Control.client_ids, answer_list, id);
 
3162
 
 
3163
      /* compress the range list to reduce fragmentation */
 
3164
      range_list_compress(Event_Master_Control.client_ids);
 
3165
   }
 
3166
}
 
3167
 
 
3168
/****** sge_event_master/sge_commit() ****************************************
 
3169
*  NAME
 
3170
*     sge_commit() -- Commit the queued events
 
3171
*
 
3172
*  SYNOPSIS
 
3173
*     bool sge_commit(void)
 
3174
*
 
3175
*  FUNCTION
 
3176
*     Sends any events that this thread currently has queued and clears the
 
3177
*     queue.
 
3178
*
 
3179
*  RESULTS
 
3180
*     Whether the call succeeded.
 
3181
*
 
3182
*  NOTE
 
3183
*     MT-NOTE: sge_commit is thread safe.
 
3184
*******************************************************************************/
 
3185
bool sge_commit(void)
 
3186
{
 
3187
   bool ret = true;
 
3188
 
 
3189
   DENTER(TOP_LAYER, "sge_commit");
 
3190
 
 
3191
   /* need a new C block, as the GET_SPECIFIC macro declares new variables */
 
3192
   {
 
3193
      GET_SPECIFIC(event_master_transaction_t, t_store, sge_event_master_init_transaction_store, Event_Master_Control.transaction_key, "t_store");
 
3194
      if (t_store->is_transaction) {
 
3195
         t_store->is_transaction = false;
 
3196
 
 
3197
         if (lGetNumberOfElem(t_store->transaction_requests) > 0) {
 
3198
            sge_mutex_lock("event_master_request_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.request_mutex);
 
3199
            lAppendList(Event_Master_Control.requests, t_store->transaction_requests);
 
3200
            sge_mutex_unlock("event_master_request_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.request_mutex);
 
3201
 
 
3202
            set_flush();
 
3203
         }
 
3204
      } else {
 
3205
         WARNING((SGE_EVENT, "attempting to commit an event master transaction, but no transaction is open"));
 
3206
         ret = false;
 
3207
      }
 
3208
   }
 
3209
 
 
3210
   DRETURN(ret);
 
3211
}
 
3212
 
 
3213
/****** sge_event_master/blockEvents() ****************************************
 
3214
*  NAME
 
3215
*     blockEvents() -- blocks or unblocks events
 
3216
*
 
3217
*  SYNOPSIS
 
3218
*     void blockEvents(lListElem *event_client, int ev_type, bool isBlock) 
 
3219
*
 
3220
*  FUNCTION
 
3221
*     In case that global update events have to be send, this function
 
3222
*     blocks all events for that list, or unblocks it.
 
3223
*     
 
3224
*  INPUT
 
3225
*    lListElem *event_client : event client to modify
 
3226
*    ev_event ev_type : the event_lists to unblock or -1
 
3227
*    bool isBlock : true: block events, false: unblock
 
3228
*
 
3229
*  NOTE
 
3230
*     MT-NOTE: sge_commit is thread safe.
 
3231
*******************************************************************************/
 
3232
static void blockEvents(lListElem *event_client, ev_event ev_type, bool isBlock) { 
 
3233
   subscription_t *sub_array = lGetRef(event_client, EV_sub_array);
 
3234
 
 
3235
   if (sub_array != NULL) {
 
3236
      int i = -1;
 
3237
      if (ev_type == sgeE_ALL_EVENTS) { /* block all subscribed events, for which are list events subscribed */
 
3238
         while (total_update_events[++i] != -1) {
 
3239
            if (sub_array[total_update_events[i]].subscription == EV_SUBSCRIBED) {
 
3240
               int y = -1;
 
3241
               while (block_events[i][++y] != -1) {
 
3242
                  sub_array[block_events[i][y]].blocked = isBlock;
 
3243
               }
 
3244
            }
 
3245
         }
 
3246
      } else {
 
3247
         while (total_update_events[++i] != -1) {
 
3248
            if (total_update_events[i] == ev_type) {
 
3249
               int y = -1;
 
3250
               while (block_events[i][++y] != -1) {
 
3251
                 sub_array[block_events[i][y]].blocked = isBlock;
 
3252
               }
 
3253
               break;
 
3254
            }
 
3255
         }
 
3256
      }
 
3257
   }
 
3258
}
 
3259
 
 
3260
/****** sge_event_master/set_flush() *******************************************
 
3261
*  NAME
 
3262
*     set_flush() -- Flush all events
 
3263
*
 
3264
*  SYNOPSIS
 
3265
*     void set_flush(void)
 
3266
*
 
3267
*  FUNCTION
 
3268
*     Flushes all pending events
 
3269
*
 
3270
*  NOTE
 
3271
*     MT-NOTE: set_flush is thread safe.
 
3272
*******************************************************************************/
 
3273
static void set_flush(void)
 
3274
{
 
3275
   DENTER(TOP_LAYER, "set_flush");
 
3276
 
 
3277
   sge_mutex_lock("event_master_cond_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.cond_mutex);
 
3278
   if (!Event_Master_Control.delivery_signaled) {
 
3279
      Event_Master_Control.delivery_signaled = true;
 
3280
      pthread_cond_signal(&Event_Master_Control.cond_var);
 
3281
   }
 
3282
   sge_mutex_unlock("event_master_cond_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.cond_mutex);
 
3283
 
 
3284
   DRETURN_VOID;
 
3285
}
 
3286
 
 
3287
/****** sge_event_master/sge_set_commit_required() *****************************
 
3288
*  NAME
 
3289
*     sge_set_commit_required() -- Require commits (make multipe object changes atomic)
 
3290
*
 
3291
*  SYNOPSIS
 
3292
*     void sge_set_commit_required()
 
3293
*
 
3294
*  FUNCTION
 
3295
*  Enables transactions on events. So far a rollback is not supported. It allows to accumulate
 
3296
*  events, while multiple objects are modified and events are issued and to submit them as
 
3297
*  one event package. There can only be one event session open at a time. The transaction_mutex
 
3298
*  will block multiple calles to this method.
 
3299
*  The method cannot be called recursivly, and sge_commit has to be called to close the transaction. 
 
3300
*
 
3301
*
 
3302
*  NOTE
 
3303
*     MT-NOTE: sge_set_commit_required is thread safe.  Transactional event
 
3304
*     processing is handled for each thread individually.
 
3305
*******************************************************************************/
 
3306
void sge_set_commit_required(void)
 
3307
{
 
3308
   DENTER(TOP_LAYER,"sge_set_commit_required");
 
3309
 
 
3310
   /* need a new C block, as the GET_SPECIFIC macro declares new variables */
 
3311
   {
 
3312
      GET_SPECIFIC(event_master_transaction_t, t_store, sge_event_master_init_transaction_store, Event_Master_Control.transaction_key, "t_store");
 
3313
      if (t_store->is_transaction) {
 
3314
         WARNING((SGE_EVENT, "attempting to open a new event master transaction, but we already have a transaction open"));
 
3315
      } else {
 
3316
         t_store->is_transaction = true;
 
3317
      }
 
3318
   }
 
3319
 
 
3320
   DRETURN_VOID;
 
3321
}
 
3322
 
 
3323
void sge_event_master_process_requests(monitoring_t *monitor)
 
3324
{
 
3325
   lList *requests = NULL;
 
3326
 
 
3327
   DENTER(TOP_LAYER, "sge_event_master_process_requests");
 
3328
 
 
3329
   /*
 
3330
    * get the request list
 
3331
    * put a new empty list in place to allow new requests while we process the old ones
 
3332
    */
 
3333
   sge_mutex_lock("event_master_request_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.request_mutex);
 
3334
 
 
3335
   if (lGetNumberOfElem(Event_Master_Control.requests) > 0) {
 
3336
      requests = Event_Master_Control.requests;
 
3337
      Event_Master_Control.requests = lCreateListHash("Event Master Requests", EVR_Type, false);
 
3338
   }
 
3339
 
 
3340
   sge_mutex_unlock("event_master_request_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.request_mutex);
 
3341
 
 
3342
   /* if there have been any requests - process them */
 
3343
   if (requests != NULL) {
 
3344
      lListElem *request = NULL;
 
3345
 
 
3346
      while ((request = lFirst(requests)) != NULL) {
 
3347
         DPRINTF(("processing event master request: %d\n", lGetUlong(request, EVR_operation)));
 
3348
         switch (lGetUlong(request, EVR_operation)) {
 
3349
            case EVR_ADD_EVC:
 
3350
               sge_event_master_process_add_event_client(request, monitor);
 
3351
               break;
 
3352
            case EVR_MOD_EVC:
 
3353
               sge_event_master_process_mod_event_client(request, monitor);
 
3354
               break;
 
3355
            case EVR_DEL_EVC:
 
3356
               break;
 
3357
            case EVR_ADD_EVENT:
 
3358
               sge_event_master_process_send(request, monitor);
 
3359
               break;
 
3360
            case EVR_ACK_EVENT:
 
3361
               sge_event_master_process_ack(request, monitor);
 
3362
               break;
 
3363
         }
 
3364
 
 
3365
         lRemoveElem(requests, &request);
 
3366
      }
 
3367
 
 
3368
      lFreeList(&requests);
 
3369
   }
 
3370
}
 
3371