1
/*___INFO__MARK_BEGIN__*/
2
/*************************************************************************
4
* The Contents of this file are made available subject to the terms of
5
* the Sun Industry Standards Source License Version 1.2
7
* Sun Microsystems Inc., March, 2001
10
* Sun Industry Standards Source License Version 1.2
11
* =================================================
12
* The contents of this file are subject to the Sun Industry Standards
13
* Source License Version 1.2 (the "License"); You may not use this file
14
* except in compliance with the License. You may obtain a copy of the
15
* License at http://gridengine.sunsource.net/Gridengine_SISSL_license.html
17
* Software provided under this License is provided on an "AS IS" basis,
18
* WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING,
19
* WITHOUT LIMITATION, WARRANTIES THAT THE SOFTWARE IS FREE OF DEFECTS,
20
* MERCHANTABLE, FIT FOR A PARTICULAR PURPOSE, OR NON-INFRINGING.
21
* See the License for the specific provisions governing your rights and
22
* obligations concerning the Software.
24
* The Initial Developer of the Original Code is: Sun Microsystems, Inc.
26
* Copyright: 2001 by Sun Microsystems, Inc.
28
* All Rights Reserved.
30
************************************************************************/
31
/*___INFO__MARK_END__*/
40
#include "cull/cull.h"
41
#include "sgeobj/sge_feature.h"
42
#include "uti/sge_time.h"
43
#include "sgeobj/sge_host.h"
44
#include "sgeobj/sge_event.h"
45
#include "sgeobj/sge_all_listsL.h"
46
#include "uti/sge_prog.h"
47
#include "rmon/sgermon.h"
48
#include "uti/sge_log.h"
49
#include "sgeobj/sge_conf.h"
50
#include "sgeobj/sge_answer.h"
51
#include "sgeobj/sge_qinstance.h"
52
#include "sgeobj/sge_report.h"
53
#include "sgeobj/sge_ckpt.h"
54
#include "sgeobj/sge_pe.h"
55
#include "sgeobj/sge_userprj.h"
56
#include "sgeobj/sge_job.h"
57
#include "uti/sge_hostname.h"
58
#include "sgeobj/sge_userset.h"
59
#include "sgeobj/sge_manop.h"
60
#include "sgeobj/sge_calendar.h"
61
#include "sgeobj/sge_sharetree.h"
62
#include "sgeobj/sge_hgroup.h"
63
#include "sgeobj/sge_cuser.h"
64
#include "sgeobj/sge_centry.h"
65
#include "sgeobj/sge_cqueue.h"
66
#include "sgeobj/sge_object.h"
67
#include "sgeobj/sge_range.h"
68
#include "lck/sge_mtutil.h"
69
#include "configuration_qmaster.h" /* bad dependency!! */
70
#include "comm/lists/cl_errors.h"
71
#include "comm/cl_commlib.h"
72
#include "uti/sge_profiling.h"
73
#include "uti/sge_spool.h"
74
#include "sgeobj/sge_event_requestL.h"
76
#include "lck/sge_lock.h"
78
#include "comm/lists/cl_thread.h"
80
#include "uti/sge_thread_ctrl.h"
82
#include "gdi/sge_gdi_ctx.h"
84
#include "msg_common.h"
85
#include "msg_sgeobjlib.h"
86
#include "msg_evmlib.h"
88
#include "sge_event_master.h"
91
***** transaction handling implementation ************
93
* Well, one cannot really call the transaction implementation
94
* transaction handling. First of all, there is no roll
95
* back. Second, there is only one transaction in the
96
* whole system, and no way to distinguish between the
97
* events added by the thread that opened the transaction,
98
* and other threads just adding events.
100
* We need the current implementation for the scheduler
101
* protocol. All gdi requests have to be atomic from the
102
* scheduler point of view. Therefore we have a second
103
* (the transaction) list to add events, and put them
104
* into the send queue, when the gdi request has been
107
* Important variables:
108
* pthread_mutex_t transaction_mutex;
109
* lList *transaction_events;
110
* pthread_mutex_t t_add_event_mutex;
111
* bool is_transaction;
114
* sge_set_commit_required()
117
******************************************************
122
***** subscription_t definition **********************
124
* This is a subscription entry in a two dimensional
125
* definition. The first dimension is for the event
126
* clients and changes when the event client subscription
127
* changes. The second dimension is of fixed size and
128
* contains one element for each possible event.
130
******************************************************
133
bool subscription; /* true -> the event is subscribed */
134
bool blocked; /* true -> no events will be accepted before */
135
/* the total update is issued */
136
bool flush; /* true -> flush is set */
137
u_long32 flush_time; /* seconds how much the event can be delayed */
138
lCondition *where; /* where filter */
139
lDescr *descr; /* target list descriptor */
140
lEnumeration *what; /* limits the target element */
143
/****** Eventclient/Server/-Event_Client_Server_Defines ************************
145
* Defines -- Constants used in the module
148
* #define EVENT_DELIVERY_INTERVAL_S 1
149
* #define EVENT_DELIVERY_INTERVAL_N 0
150
* #define EVENT_ACK_MIN_TIMEOUT 600
151
* #define EVENT_ACK_MAX_TIMEOUT 1200
154
* EVENT_DELIVERY_INTERVAL_S is the event delivery interval. It is set in seconds.
155
* EVENT_DELIVERY_INTERVAL_N same thing but in nano seconds.
157
* EVENT_ACK_MIN/MAX_TIMEOUT is the minimum/maximum timeout value for an event
158
* client sending the acknowledge for the delivery of events.
159
* The real timeout value depends on the event delivery interval for the
160
* event client (10 * event delivery interval).
161
*******************************************************************************/
162
#define EVENT_DELIVERY_INTERVAL_S 1
163
#define EVENT_DELIVERY_INTERVAL_N 0
164
#define EVENT_ACK_MIN_TIMEOUT 600
165
#define EVENT_ACK_MAX_TIMEOUT 1200
168
*******************************************************************
170
* The next three arrays are important for lists which can be
171
* subscribed by the event client and which contain a sub-list
172
* that can itself be subscribed separately (such as the job list
173
* with the JAT_Task sub list, or the cluster queue list with the
174
* queue instances sub-list)
175
* All lists, which follow the same structure have to be defined
176
* in the special construct.
179
* Contains all events for the main list, which delivers also
183
* Contains all attributes in the main list, which contain the
184
* sub-list in question.
187
* Contains the subscription events for the sub-list, which also
188
* contains the filter for the sub-list.
191
* This construct and its functions are limited to one subscribable
192
* sub-list per main list. If multiple sub-lists can be subscribed, the
193
* construct has to be extended.
196
* 1416: the sgeE_PETASK_* list events are not handled correctly during
197
* total update. It works fine with all other events. The problem
198
* is, that an update of the jobs filters for job and ja_task, but
202
* evm/sge_event_master/list_select()
203
* evm/sge_event_master/elem_select()
205
* evm/sge_event_master/add_list_event
206
* evm/sge_event_master/add_event
208
********************************************************************
212
const int EVENT_LIST[LIST_MAX][6] = {
213
{sgeE_JOB_LIST, sgeE_JOB_ADD, sgeE_JOB_DEL, sgeE_JOB_MOD, sgeE_JOB_MOD_SCHED_PRIORITY, -1},
214
{sgeE_CQUEUE_LIST, sgeE_CQUEUE_ADD, sgeE_CQUEUE_DEL, sgeE_CQUEUE_MOD, -1, -1},
215
{sgeE_JATASK_ADD, sgeE_JATASK_DEL, sgeE_JATASK_MOD, -1, -1, -1 }
218
const int FIELD_LIST[LIST_MAX][3] = {
219
{JB_ja_tasks, JB_ja_template, -1},
220
{CQ_qinstances, -1, -1},
221
{JAT_task_list, -1, -1}
224
const int SOURCE_LIST[LIST_MAX][3] = {
225
{sgeE_JATASK_MOD, sgeE_JATASK_ADD, -1},
226
{sgeE_QINSTANCE_ADD, sgeE_QINSTANCE_MOD, -1},
227
{sgeE_PETASK_ADD, -1, -1}
231
*****************************************************
233
* The next two arrays are needed for blocking events
234
* for a given client, when a total update is pending
237
* The total_update_events array defines all list events,
238
* which are used during a total update.
240
* The block_events contain all events, which are blocked
241
* during a total update.
246
* add_list_event_direct
247
*****************************************************
250
#ifndef __SGE_NO_USERMAPPING__
251
#define total_update_eventsMAX 22
253
#define total_update_eventsMAX 21
256
const int total_update_events[total_update_eventsMAX + 1] = {sgeE_ADMINHOST_LIST,
263
sgeE_JOB_SCHEDD_INFO_LIST,
269
sgeE_SUBMITHOST_LIST,
277
#ifndef __SGE_NO_USERMAPPING__
282
const int block_events[total_update_eventsMAX][9] = {
283
{sgeE_ADMINHOST_ADD, sgeE_ADMINHOST_DEL, sgeE_ADMINHOST_MOD, -1, -1, -1, -1, -1, -1},
284
{sgeE_CALENDAR_ADD, sgeE_CALENDAR_DEL, sgeE_CALENDAR_MOD, -1, -1, -1, -1, -1, -1},
285
{sgeE_CKPT_ADD, sgeE_CKPT_DEL, sgeE_CKPT_MOD, -1, -1, -1, -1, -1, -1},
286
{sgeE_CENTRY_ADD, sgeE_CENTRY_DEL, sgeE_CENTRY_MOD, -1, -1, -1, -1, -1, -1},
287
{sgeE_CONFIG_ADD, sgeE_CONFIG_DEL, sgeE_CONFIG_MOD, -1, -1, -1, -1, -1, -1},
288
{sgeE_EXECHOST_ADD, sgeE_EXECHOST_DEL, sgeE_EXECHOST_MOD, -1, -1, -1, -1, -1, -1},
289
{sgeE_JOB_ADD, sgeE_JOB_DEL, sgeE_JOB_MOD, sgeE_JOB_MOD_SCHED_PRIORITY, sgeE_JOB_USAGE, sgeE_JOB_FINAL_USAGE, sgeE_JOB_FINISH, -1, -1},
290
{sgeE_JOB_SCHEDD_INFO_ADD, sgeE_JOB_SCHEDD_INFO_DEL, sgeE_JOB_SCHEDD_INFO_MOD, -1, -1, -1, -1, -1, -1},
291
{sgeE_MANAGER_ADD, sgeE_MANAGER_DEL, sgeE_MANAGER_MOD, -1, -1, -1, -1, -1, -1},
292
{sgeE_OPERATOR_ADD, sgeE_OPERATOR_DEL, sgeE_OPERATOR_MOD, -1, -1, -1, -1, -1, -1},
293
{sgeE_PE_ADD, sgeE_PE_DEL, sgeE_PE_MOD, -1, -1, -1, -1, -1, -1},
294
{sgeE_CQUEUE_ADD, sgeE_CQUEUE_DEL, sgeE_CQUEUE_MOD, sgeE_QINSTANCE_ADD, sgeE_QINSTANCE_DEL, sgeE_QINSTANCE_MOD, sgeE_QINSTANCE_SOS, sgeE_QINSTANCE_USOS, -1},
295
{-1, -1, -1, -1, -1, -1, -1, -1},
296
{sgeE_SUBMITHOST_ADD, sgeE_SUBMITHOST_DEL, sgeE_SUBMITHOST_MOD, -1, -1, -1, -1, -1, -1},
297
{sgeE_USERSET_ADD, sgeE_USERSET_DEL, sgeE_USERSET_MOD, -1, -1, -1, -1, -1, -1},
298
{-1, -1, -1, -1, -1, -1, -1, -1},
299
{sgeE_PROJECT_ADD, sgeE_PROJECT_DEL, sgeE_PROJECT_MOD, -1, -1, -1, -1, -1, -1},
300
{sgeE_USER_ADD, sgeE_USER_DEL, sgeE_USER_MOD, -1, -1, -1, -1, -1, -1},
301
{sgeE_HGROUP_ADD, sgeE_HGROUP_DEL, sgeE_HGROUP_MOD, -1, -1, -1, -1, -1, -1},
302
{sgeE_RQS_ADD, sgeE_RQS_DEL, sgeE_RQS_MOD, -1, -1, -1, -1, -1, -1},
303
{sgeE_AR_ADD, sgeE_AR_DEL, sgeE_AR_MOD, -1, -1, -1, -1, -1, -1}
304
#ifndef __SGE_NO_USERMAPPING__
305
,{sgeE_CUSER_ADD, sgeE_CUSER_DEL, sgeE_CUSER_MOD, -1, -1, -1, -1, -1, -1}
311
*******************************************************************
313
* Some events have to be delivered even if they have no data left
314
* after filtering for them. These are for example all update list
316
* To ensure that this is done as fast as possible, we define an
317
* array of the size of the number of events and an init function
318
* which sets the events which will be updated. To add a new event
319
* one has only to update that function.
320
* Events which do not contain any data are not affected. They are
328
* evm/sge_event_master/sge_init_send_events()
330
******************************************************************
332
/* One flag per event type: true if the event has to be delivered even when
 * no data is left after filtering it for a client (e.g. the list update
 * events). Indexed by event number, sized to sgeE_EVENTSIZE. */
static bool SEND_EVENTS[sgeE_EVENTSIZE];
334
event_master_control_t Event_Master_Control = {
335
PTHREAD_MUTEX_INITIALIZER, /* mutex */
336
PTHREAD_COND_INITIALIZER, /* cond_var */
337
PTHREAD_MUTEX_INITIALIZER, /* cond_mutex */
338
false, /* delivery_signaled */
339
0, /* max_event_clients */
340
false, /* is_prepare_shutdown */
342
NULL, /* client_ids */
344
PTHREAD_MUTEX_INITIALIZER, /* request_mutex */
345
0 /* transaction_key */
348
static void init_send_events(void);
349
static void flush_events(lListElem*, int);
350
static void total_update(lListElem*, monitoring_t *monitor);
351
static void build_subscription(lListElem*);
352
static void remove_event_client(lListElem **client, int event_client_id, bool lock_event_master);
353
static void check_send_new_subscribed_list(const subscription_t*,
354
const subscription_t*, lListElem*,
355
ev_event event, object_description *master_table);
356
static int eventclient_subscribed(const lListElem *, ev_event, const char*);
357
static int purge_event_list(lList* aList, u_long32 event_number);
358
static bool add_list_event_for_client(u_long32, u_long32, ev_event, u_long32, u_long32, const char*, const char*, const char*, lList*, bool);
359
static void add_list_event_direct(lListElem *event_client,
360
lListElem *event, bool copy_event);
361
static void total_update_event(lListElem*, ev_event, object_description *master_table, bool new_subscription);
362
static bool list_select(subscription_t*, int, lList**, lList*, const lCondition*, const lEnumeration*, const lDescr*, bool);
363
static lListElem* elem_select(subscription_t*, lListElem*, const int[], const lCondition*, const lEnumeration*, const lDescr*, int);
364
static lListElem* eventclient_list_locate_by_adress(const char*, const char*, u_long32);
365
static const lDescr* getDescriptorL(subscription_t*, const lList*, int);
366
static lListElem* get_event_client(u_long32 id);
367
static u_long32 allocate_new_dynamic_id(lList **answer_list);
368
static void free_dynamic_id(lList **answer_list, u_long32 id);
369
static void set_flush(void);
371
static void blockEvents(lListElem *event_client, ev_event ev_type, bool isBlock);
374
sge_event_master_destroy_transaction_store(void *transaction_store)
376
event_master_transaction_t *t_store = (event_master_transaction_t *)transaction_store;
377
lFreeList(&(t_store->transaction_requests));
381
sge_event_master_init_transaction_store(event_master_transaction_t *t_store)
383
t_store->is_transaction = false;
384
t_store->transaction_requests = lCreateListHash("Event Master Requests", EVR_Type, false);
387
/****** Eventclient/Server/sge_add_event_client() ******************************
389
* sge_add_event_client() -- register a new event client
392
* #include "sge_event_master.h"
395
* sge_add_event_client(lListElem *clio, lList **alpp, lList **eclpp,
396
* char *ruser, char *rhost)
399
* Registers a new event client.
400
* If it requested a dynamic id, a new id is created and assigned.
401
* If it is a special client(with fixed id) and an event client
402
* with this id already exists, the old instance is deleted and the
403
* new one registered.
404
* If the registration succeeds, the event client is sent all data
405
* (sgeE*_LIST events) according to its subscription.
408
* lListElem *clio - the event client object used as registration data
409
* lList **alpp - answer list pointer for answer to event client
410
* lList **eclpp - list pointer to return new event client object
411
* char *ruser - user that tries to register an event client
412
* char *rhost - host on which the event client runs
413
* event_client_update_func_t update_func - for internal event clients
414
* monitoring_t *monitor - monitoring handle
417
* int - AN_status value. STATUS_OK on success, else error code
420
* MT-NOTE: sge_add_event_client() is MT safe, it uses the global lock and
423
*******************************************************************************/
424
static void sge_event_master_process_add_event_client(lListElem *request, monitoring_t *monitor)
426
/* to be implemented later on - handling the internal event clients could become a little bit tricky */
429
int sge_add_event_client(lListElem *clio, lList **alpp, lList **eclpp, char *ruser,
430
char *rhost, event_client_update_func_t update_func, monitoring_t *monitor)
432
lListElem *ep = NULL;
438
const char *commproc;
439
u_long32 commproc_id;
441
DENTER(TOP_LAYER, "sge_add_event_client");
443
id = lGetUlong(clio, EV_id);
444
name = lGetString(clio, EV_name);
445
ed_time = lGetUlong(clio, EV_d_time);
446
host = lGetHost(clio, EV_host);
447
commproc = lGetString(clio, EV_commproc);
448
commproc_id = lGetUlong(clio, EV_commid);
450
/* an event client must have a name */
453
lSetString(clio, EV_name, name);
456
/* check event client object structure */
457
if (lCompListDescr(lGetElemDescr(clio), EV_Type) != 0) {
458
ERROR((SGE_EVENT, MSG_EVE_INCOMPLETEEVENTCLIENT));
459
answer_list_add(alpp, SGE_EVENT, STATUS_ESEMANTIC, ANSWER_QUALITY_ERROR);
460
DRETURN(STATUS_DENIED);
463
/* EV_ID_ANY is 0, therefore the compare is always true (Irix complained) */
464
if (id >= EV_ID_FIRST_DYNAMIC) { /* invalid request */
465
ERROR((SGE_EVENT, MSG_EVE_ILLEGALIDREGISTERED_U, sge_u32c(id)));
466
answer_list_add(alpp, SGE_EVENT, STATUS_ESEMANTIC, ANSWER_QUALITY_ERROR);
467
DRETURN(STATUS_ESEMANTIC);
470
if (lGetBool(clio, EV_changed) && lGetList(clio, EV_subscribed) == NULL) {
471
ERROR((SGE_EVENT, MSG_EVE_INVALIDSUBSCRIPTION));
472
answer_list_add(alpp, SGE_EVENT, STATUS_ESEMANTIC, ANSWER_QUALITY_ERROR);
473
DRETURN(STATUS_ESEMANTIC);
476
/* Acquire the event master mutex - we access the event client list */
477
sge_mutex_lock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
479
if (Event_Master_Control.is_prepare_shutdown) {
480
sge_mutex_unlock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
481
ERROR((SGE_EVENT, MSG_EVE_QMASTERISGOINGDOWN));
482
answer_list_add(alpp, SGE_EVENT, STATUS_ESEMANTIC, ANSWER_QUALITY_ERROR);
483
DRETURN(STATUS_ESEMANTIC);
486
if (id == EV_ID_ANY) { /* qmaster shall give id dynamically */
487
/* Try to find an event client with the very same commd
488
address triplet. If it exists the "new" event client must
489
be the result of a reconnect after a timeout that happened at
490
client side. We delete the old event client. */
491
ep = eventclient_list_locate_by_adress(host, commproc, commproc_id);
494
ERROR((SGE_EVENT, MSG_EVE_CLIENTREREGISTERED_SSSU, name, host,
495
commproc, sge_u32c(commproc_id)));
497
/* delete old event client entry, and we already hold the lock! */
498
remove_event_client(&ep, id, false);
501
/* Otherwise, get a new dynamic event client id */
502
id = allocate_new_dynamic_id(alpp);
505
sge_mutex_unlock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
506
DRETURN(STATUS_ESEMANTIC);
509
INFO((SGE_EVENT, MSG_EVE_REG_SUU, name, sge_u32c(id), sge_u32c(ed_time)));
511
/* Set the new id for this client. */
512
lSetUlong(clio, EV_id, id);
515
/* special event clients: we allow only one instance */
516
/* if it already exists, delete the old one and register the new one */
517
if (id > EV_ID_ANY && id < EV_ID_FIRST_DYNAMIC) {
519
** we allow addition of a privileged event client
520
** for internal clients (==> update_func != NULL)
521
** and manager/operator
523
if (update_func == NULL && !manop_is_manager(ruser)) {
524
sge_mutex_unlock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
525
ERROR((SGE_EVENT, MSG_WRONG_USER_FORFIXEDID ));
526
answer_list_add(alpp, SGE_EVENT, STATUS_ESEMANTIC, ANSWER_QUALITY_ERROR);
527
DRETURN(STATUS_ESEMANTIC);
530
if ((ep = get_event_client(id)) != NULL) {
531
/* we already have this special client */
532
ERROR((SGE_EVENT, MSG_EVE_CLIENTREREGISTERED_SSSU, name, host,
533
commproc, sge_u32c(commproc_id)));
535
/* delete old event client entry, and we already have the mutex! */
536
remove_event_client(&ep, id, false);
538
INFO((SGE_EVENT, MSG_EVE_REG_SUU, name, sge_u32c(id), sge_u32c(ed_time)));
542
ep = lCopyElem(clio);
543
lSetRef(ep, EV_update_function, (void*) update_func);
544
lSetBool(clio, EV_changed, false);
546
lAppendElem(Event_Master_Control.clients, ep);
548
lSetUlong(ep, EV_next_number, 1);
550
/* register this contact */
552
lSetUlong(ep, EV_last_send_time, 0);
553
lSetUlong(ep, EV_next_send_time, now + lGetUlong(ep, EV_d_time));
554
lSetUlong(ep, EV_last_heard_from, now);
555
lSetUlong(ep, EV_state, EV_connected);
557
/* return new event client object to internal event client */
559
lListElem *ret_el = lCopyElem(ep);
560
if (*eclpp == NULL) {
561
*eclpp = lCreateListHash("new event client", EV_Type, true);
563
lSetBool(ret_el, EV_changed, false);
564
lAppendElem(*eclpp, ret_el);
567
/* Start with no pending events. */
568
build_subscription(ep);
570
/* build events for total update */
571
total_update(ep, monitor);
573
/* flush initial list events */
576
sge_mutex_unlock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
578
INFO((SGE_EVENT, MSG_SGETEXT_ADDEDTOLIST_SSSS,
579
ruser, rhost, name, MSG_EVE_EVENTCLIENT));
580
answer_list_add(alpp, SGE_EVENT, STATUS_OK, ANSWER_QUALITY_INFO);
583
} /* sge_event_master_process_add_event_client() */
585
/****** Eventclient/Server/sge_mod_event_client() ******************************
587
* sge_mod_event_client() -- modify event client
590
* #include "sge_event_master.h"
593
* sge_mod_event_client(lListElem *clio, lList **alpp,
594
* char *ruser, char *rhost)
597
* An event client object is modified.
598
* It is possible to modify the event delivery time and
600
* If the subscription is changed, and new sgeE*_LIST events are subscribed,
601
* these lists are sent to the event client.
604
* lListElem *clio - object containing the data to change
605
* lList **alpp - answer list pointer
606
* char *ruser - user that triggered the modify action
607
* char *rhost - host that triggered the modify action
610
* int - AN_status code. STATUS_OK on success, else error code
613
* MT-NOTE: sge_mod_event_client() is MT safe, uses internal locks
618
*******************************************************************************/
620
sge_mod_event_client(lListElem *clio, lList **alpp, char *ruser, char *rhost)
622
lListElem *evr = NULL;
624
DENTER(TOP_LAYER,"sge_mod_event_client");
627
ERROR((SGE_EVENT, "NULL element passed to sge_mod_event_client"));
629
DRETURN(STATUS_ESEMANTIC);
632
evr = lCreateElem(EVR_Type);
633
lSetUlong(evr, EVR_operation, EVR_MOD_EVC);
634
lSetUlong(evr, EVR_timestamp, sge_get_gmt());
635
lSetObject(evr, EVR_event_client, lCopyElem(clio));
637
sge_mutex_lock("event_master_request_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.request_mutex);
638
lAppendElem(Event_Master_Control.requests, evr);
639
sge_mutex_unlock("event_master_request_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.request_mutex);
641
DEBUG((SGE_EVENT, MSG_SGETEXT_MODIFIEDINLIST_SSSS,
642
ruser, rhost, lGetString(clio, EV_name), MSG_EVE_EVENTCLIENT));
643
answer_list_add(alpp, SGE_EVENT, STATUS_OK, ANSWER_QUALITY_INFO);
650
/****** Eventclient/Server/sge_event_master_process_mod_event_client() ********
652
* sge_event_master_process_mod_event_client() -- modify event client
655
* #include "sge_event_master.h"
658
* sge_event_master_process_mod_event_client(lListElem *clio, lList **alpp,
659
* lList **eclpp, char *ruser,
663
* An event client object is modified.
664
* It is possible to modify the event delivery time and
666
* If the subscription is changed, and new sgeE*_LIST events are subscribed,
667
* these lists are sent to the event client.
670
* lListElem *clio - object containing the data to change
671
* lList **alpp - answer list pointer
672
* char *ruser - user that triggered the modify action
673
* char *rhost - host that triggered the modify action
676
* int - AN_status code. STATUS_OK on success, else error code
679
* MT-NOTE: sge_event_master_process_mod_event_client() is NOT MT safe.
681
*******************************************************************************/
683
sge_event_master_process_mod_event_client(lListElem *request, monitoring_t *monitor)
685
lListElem *event_client = NULL;
688
u_long32 busy_handling;
690
lListElem *clio = NULL;
691
cl_thread_settings_t *thread_config = NULL;
693
DENTER(TOP_LAYER, "sge_event_master_process_mod_event_client");
695
clio = lGetObject(request, EVR_event_client);
697
/* try to find event_client */
698
id = lGetUlong(clio, EV_id);
700
sge_mutex_lock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
701
event_client = get_event_client(id);
703
if (event_client == NULL) {
704
sge_mutex_unlock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
705
ERROR((SGE_EVENT, MSG_EVE_UNKNOWNEVCLIENT_US, sge_u32c(id), "modify"));
709
/* these parameters can be changed */
710
busy = lGetUlong(clio, EV_busy);
711
ev_d_time = lGetUlong(clio, EV_d_time);
712
busy_handling = lGetUlong(clio, EV_busy_handling);
714
/* check for validity */
716
sge_mutex_unlock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
717
ERROR((SGE_EVENT, MSG_EVE_INVALIDINTERVAL_U, sge_u32c(ev_d_time)));
721
if (lGetBool(clio, EV_changed) && lGetList(clio, EV_subscribed) == NULL) {
722
sge_mutex_unlock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
723
ERROR((SGE_EVENT, MSG_EVE_INVALIDSUBSCRIPTION));
727
/* event delivery interval changed.
728
* We have to update the next delivery time to
729
* next_delivery_time - old_interval + new_interval
731
if (ev_d_time != lGetUlong(event_client, EV_d_time)) {
732
lSetUlong(event_client, EV_next_send_time,
733
lGetUlong(event_client, EV_next_send_time) -
734
lGetUlong(event_client, EV_d_time) + ev_d_time);
735
lSetUlong(event_client, EV_d_time, ev_d_time);
738
/* subscription changed */
739
if (lGetBool(clio, EV_changed)) {
740
subscription_t *new_sub = NULL;
741
subscription_t *old_sub = NULL;
742
object_description *master_table = object_type_get_object_description();
743
build_subscription(clio);
744
new_sub = lGetRef(clio, EV_sub_array);
745
old_sub = lGetRef(event_client, EV_sub_array);
747
MONITOR_WAIT_TIME(SGE_LOCK(LOCK_GLOBAL, LOCK_READ), monitor);
749
check_send_new_subscribed_list(old_sub, new_sub, event_client, sgeE_ADMINHOST_LIST, master_table);
750
check_send_new_subscribed_list(old_sub, new_sub, event_client, sgeE_CALENDAR_LIST, master_table);
751
check_send_new_subscribed_list(old_sub, new_sub, event_client, sgeE_CKPT_LIST, master_table);
752
check_send_new_subscribed_list(old_sub, new_sub, event_client, sgeE_CENTRY_LIST, master_table);
753
check_send_new_subscribed_list(old_sub, new_sub, event_client, sgeE_CONFIG_LIST, master_table);
754
check_send_new_subscribed_list(old_sub, new_sub, event_client, sgeE_EXECHOST_LIST, master_table);
755
check_send_new_subscribed_list(old_sub, new_sub, event_client, sgeE_JOB_LIST, master_table);
756
check_send_new_subscribed_list(old_sub, new_sub, event_client, sgeE_JOB_SCHEDD_INFO_LIST, master_table);
757
check_send_new_subscribed_list(old_sub, new_sub, event_client, sgeE_MANAGER_LIST, master_table);
758
check_send_new_subscribed_list(old_sub, new_sub, event_client, sgeE_OPERATOR_LIST, master_table);
759
check_send_new_subscribed_list(old_sub, new_sub, event_client, sgeE_NEW_SHARETREE, master_table);
760
check_send_new_subscribed_list(old_sub, new_sub, event_client, sgeE_PE_LIST, master_table);
761
check_send_new_subscribed_list(old_sub, new_sub, event_client, sgeE_PROJECT_LIST, master_table);
762
check_send_new_subscribed_list(old_sub, new_sub, event_client, sgeE_CQUEUE_LIST, master_table);
763
check_send_new_subscribed_list(old_sub, new_sub, event_client, sgeE_SCHED_CONF, master_table);
764
check_send_new_subscribed_list(old_sub, new_sub, event_client, sgeE_SUBMITHOST_LIST, master_table);
765
check_send_new_subscribed_list(old_sub, new_sub, event_client, sgeE_USER_LIST, master_table);
766
check_send_new_subscribed_list(old_sub, new_sub, event_client, sgeE_USERSET_LIST, master_table);
767
check_send_new_subscribed_list(old_sub, new_sub, event_client, sgeE_HGROUP_LIST, master_table);
768
check_send_new_subscribed_list(old_sub, new_sub, event_client, sgeE_RQS_LIST, master_table);
769
check_send_new_subscribed_list(old_sub, new_sub, event_client, sgeE_AR_LIST, master_table);
770
#ifndef __SGE_NO_USERMAPPING__
771
check_send_new_subscribed_list(old_sub, new_sub, event_client, sgeE_CUSER_LIST, master_table);
774
SGE_UNLOCK(LOCK_GLOBAL, LOCK_READ);
777
/* JG: TODO: better use lXchgList? */
779
lList *tmp_list = NULL;
780
lXchgList(clio, EV_subscribed, &tmp_list);
781
lXchgList(event_client, EV_subscribed, &tmp_list);
782
lXchgList(clio, EV_subscribed, &tmp_list);
785
lSetList(event_client, EV_subscribed, lCopyList("", lGetList(clio, EV_subscribed)));
787
lSetRef(event_client, EV_sub_array, new_sub);
788
lSetRef(clio, EV_sub_array, NULL);
791
for (i=0; i<sgeE_EVENTSIZE; i++){
792
lFreeWhere(&(old_sub[i].where));
793
lFreeWhat(&(old_sub[i].what));
794
if (old_sub[i].descr){
795
cull_hash_free_descr(old_sub[i].descr);
796
free(old_sub[i].descr);
803
/* busy state changed */
804
if (busy != lGetUlong(event_client, EV_busy)) {
805
lSetUlong(event_client, EV_busy, busy);
807
/* busy_handling changed */
808
if (busy_handling != lGetUlong(event_client, EV_busy_handling)) {
809
DPRINTF(("EVM: event client %s changes to "sge_U32CFormat"\n",
810
lGetString(event_client, EV_name), lGetUlong(event_client, EV_busy_handling)));
811
lSetUlong(event_client, EV_busy_handling, busy_handling);
814
MONITOR_EDT_MOD(monitor);
816
thread_config = cl_thread_get_thread_config();
817
DEBUG((SGE_EVENT, MSG_SGETEXT_MODIFIEDINLIST_SSSS, thread_config ? thread_config->thread_name : "-NA-",
818
"master host", lGetString(event_client, EV_name), MSG_EVE_EVENTCLIENT));
820
sge_mutex_unlock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
823
} /* sge_event_master_process_mod_event_client() */
825
/****** evm/sge_event_master/sge_remove_event_client() *************************
827
* sge_remove_event_client() -- remove event client
830
* void sge_remove_event_client(u_long32 event_client_id)
833
* Remove event client. Fetch event client from event client list.
834
* Only sets status to "terminated",
835
* it will be removed later on in ......................
838
* u_long32 event_client_id - event client id
844
* MT-NOTE: sge_remove_event_client() is MT safe, uses internal locks
849
*******************************************************************************/
850
void sge_remove_event_client(u_long32 event_client_id)
854
DENTER(TOP_LAYER, "sge_remove_event_client");
856
sge_mutex_lock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
858
DPRINTF(("sge_remove_event_client id = %d\n", (int) event_client_id));
860
client = get_event_client(event_client_id);
862
if (client == NULL) {
863
sge_mutex_unlock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
864
ERROR((SGE_EVENT, MSG_EVE_UNKNOWNEVCLIENT_US, sge_u32c(event_client_id), "remove"));
867
lSetUlong(client, EV_state, EV_terminated);
869
sge_mutex_unlock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
871
} /* sge_remove_event_client() */
874
/****** sge_event_master/sge_set_max_dynamic_event_clients() *******************
876
* sge_set_max_dynamic_event_clients() -- set max number of dyn. event clients
879
* u_long32 sge_set_max_dynamic_event_clients(u_long32 new_value)
882
* Sets max number of dynamic event clients. If the new value is larger than
883
* the maximum number of used file descriptors for communication this value
884
* is set to the max. number of file descriptors minus some reserved file
885
* descriptors. (10 for static event clients, 9 for execd, 10 for file
886
* descriptors used by application (to write files, etc.) ).
888
* At least one dynamic event client is allowed.
891
* u_long32 new_value - requested number of dynamic event clients
894
* MT-NOTE: sge_set_max_dynamic_event_clients() is MT safe
896
*******************************************************************************/
897
u_long32 sge_set_max_dynamic_event_clients(u_long32 new_value)
899
u_long32 max = new_value;
901
DENTER(TOP_LAYER, "sge_set_max_dynamic_event_clients");
903
sge_mutex_lock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
905
/* Set the max event clients if it changed. */
906
if (max != Event_Master_Control.max_event_clients) {
907
/* check max. file descriptors of qmaster communication handle */
908
cl_com_handle_t *handle = cl_com_get_handle("qmaster", 1);
909
if (handle != NULL) {
910
u_long32 max_allowed_value = 0;
911
unsigned long max_file_handles = 0;
913
cl_com_get_max_connections(handle, &max_file_handles);
914
if (max_file_handles >= EVENT_MASTER_MIN_FREE_DESCRIPTORS) {
915
max_allowed_value = (u_long32)max_file_handles - EVENT_MASTER_MIN_FREE_DESCRIPTORS;
917
max_allowed_value = 1;
920
if (max > max_allowed_value) {
921
max = max_allowed_value;
922
WARNING((SGE_EVENT, MSG_CONF_NR_DYNAMIC_EVENT_CLIENT_EXCEEDS_MAX_FILEDESCR_U, sge_u32c(max)));
927
/* check max again - it might have changed due to commlib max_file_handles restrictions */
928
if (max != Event_Master_Control.max_event_clients) {
929
lList *answer_list = NULL;
930
lListElem *new_range;
931
lListElem *event_client;
933
/* If the new max is lower than the old max, then lowering the maximum
934
* prevents new event clients, but allows the old ones there to drain off naturally.
936
Event_Master_Control.max_event_clients = max;
937
INFO((SGE_EVENT, MSG_SET_MAXDYNEVENTCLIENT_U, sge_u32c(max)));
939
/* we have to rebuild the event client id range list */
940
lFreeList(&Event_Master_Control.client_ids);
941
range_list_initialize(&Event_Master_Control.client_ids, &answer_list);
942
new_range = lCreateElem(RN_Type);
943
range_set_all_ids(new_range, EV_ID_FIRST_DYNAMIC, max - 1 + EV_ID_FIRST_DYNAMIC, 1);
944
lAppendElem(Event_Master_Control.client_ids, new_range);
946
/* and we have to remove the ids of our existing event clients from the range */
947
for_each(event_client, Event_Master_Control.clients) {
948
u_long32 event_client_id = lGetUlong(event_client, EV_id);
949
/* only for dynamic event clients */
950
if (event_client_id >= EV_ID_FIRST_DYNAMIC) {
952
* the event clients id might not be in the new range,
953
* if the number of dynamic event clients has been reduced
955
if (range_list_is_id_within(Event_Master_Control.client_ids, event_client_id)) {
956
range_list_remove_id(&Event_Master_Control.client_ids, &answer_list, event_client_id);
961
/* compress the range list to reduce fragmentation */
962
range_list_compress(Event_Master_Control.client_ids);
964
/* output any errors that might have occured */
965
answer_list_output(&answer_list);
968
sge_mutex_unlock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
973
/****** sge_event_master/sge_get_max_dynamic_event_clients() *******************
975
* sge_get_max_dynamic_event_clients() -- get max dynamic event clients nr
978
* u_long32 sge_get_max_dynamic_event_clients(u_long32 max)
981
* Returns the actual value of max. dynamic event clients allowed.
984
* u_long32 - max value
987
* MT-NOTE: sge_get_max_dynamic_event_clients() is MT save
989
*******************************************************************************/
990
u_long32 sge_get_max_dynamic_event_clients(void)
992
u_long32 actual_value = 0;
994
DENTER(TOP_LAYER, "sge_get_max_dynamic_event_clients");
996
sge_mutex_lock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
997
actual_value = Event_Master_Control.max_event_clients;
998
sge_mutex_unlock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
1000
DRETURN(actual_value);
1003
/****** Eventclient/Server/sge_has_event_client() ******************************
1005
* sge_has_event_client() -- Is a event client registered
1008
* #include "sge_event_master.h"
1010
* bool sge_has_event_client(u_long32 event_client_id)
1013
* Searches if the event client list, if a client
1014
* with this id is available
1017
* u_long32 event_client_id - id of the event client
1020
* bool - TRUE if client is in the event client list
1023
* MT-NOTE: sge_has_event_client() is MT safe, it uses the internal locks
1025
*******************************************************************************/
1026
bool sge_has_event_client(u_long32 event_client_id) {
1029
DENTER(TOP_LAYER, "sge_has_event_client");
1031
sge_mutex_lock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
1032
ret = (get_event_client(event_client_id) != NULL) ? true : false;
1033
sge_mutex_unlock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
1038
/****** evm/sge_event_master/sge_select_event_clients() ************************
1040
* sge_select_event_clients() -- select event clients
1043
* lList* sge_select_event_clients(const char *list_name, const lCondition
1044
* *where, const lEnumeration *what)
1047
* Select event clients.
1050
* const char *list_name - name of the result list returned.
1051
* const lCondition *where - where condition
1052
* const lEnumeration *what - what enumeration
1055
* lList* - list with elements of type 'EV_Type'.
1058
* MT-NOTE: sge_select_event_clients() is MT safe
1060
* MT-NOTE: The elements contained in the result list are copies of the
1061
* MT-NOTE: respective event client list elements.
1063
*******************************************************************************/
1064
lList* sge_select_event_clients(const char *list_name, const lCondition *where, const lEnumeration *what)
1068
DENTER(TOP_LAYER, "sge_select_event_clients");
1070
sge_mutex_lock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
1071
if (Event_Master_Control.clients != NULL) {
1072
lst = lSelect(list_name, Event_Master_Control.clients, where, what);
1074
sge_mutex_unlock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
1077
} /* sge_select_event_clients() */
1079
/****** evm/sge_event_master/sge_shutdown_event_client() ***********************
*  NAME
*     sge_shutdown_event_client() -- shutdown an event client
*
*  SYNOPSIS
*     int sge_shutdown_event_client(u_long32 event_client_id, const char* anUser,
*                                   uid_t anUID, lList **alpp,
*                                   monitoring_t *monitor)
*
*  FUNCTION
*     Shutdown an event client. Queues an sgeE_SHUTDOWN event addressed only
*     to the event client denoted by 'event_client_id' and adds an info
*     message to 'alpp' (a dedicated message is used for the scheduler,
*     EV_ID_SCHEDD).
*
*     Shutting down an event client is only permitted if 'anUser' does have
*     manager privileges OR 'anUID' equals the uid of the owner of event
*     client 'event_client_id' (EV_uid).
*
*     Ids less than or equal to EV_ID_ANY and ids of unregistered clients
*     are rejected with an MSG_EVE_UNKNOWNEVCLIENT_US error in 'alpp'.
*
*  INPUTS
*     u_long32 event_client_id - event client ID
*     const char* anUser       - user which did request this operation
*     uid_t anUID              - user id of request user
*     lList **alpp             - answer list for info and errors
*     monitoring_t *monitor    - monitoring context
*
*  RESULT
*     int - 0 on success (TODO confirm), otherwise
*           EPERM - operation not permitted
*           ESRCH - client with given client id is unknown
*           NOTE(review): confirm these codes against the DRETURN statements
*           of the rejection paths.
*
*  NOTES
*     MT-NOTE: sge_shutdown_event_client() is MT safe, it uses the global lock
*              and internal ones. The event master mutex is released before
*              the permission error is added to 'alpp'.
*******************************************************************************/
1110
int sge_shutdown_event_client(u_long32 event_client_id, const char* anUser,
1111
uid_t anUID, lList **alpp, monitoring_t *monitor)
1113
lListElem *client = NULL;
1116
DENTER(TOP_LAYER, "sge_shutdown_event_client");
1118
/* ids up to and including EV_ID_ANY can never name a single event client */
if (event_client_id <= EV_ID_ANY) {
1119
SGE_ADD_MSG_ID(sprintf(SGE_EVENT, MSG_EVE_UNKNOWNEVCLIENT_US, sge_u32c(event_client_id), "shutdown"));
1120
answer_list_add(alpp, SGE_EVENT, STATUS_ESYNTAX, ANSWER_QUALITY_ERROR);
1124
sge_mutex_lock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
1125
client = get_event_client(event_client_id);
1127
if (client != NULL) {
1128
/* only a manager or the owner of the client (EV_uid) may shut it down */
if (!manop_is_manager(anUser) && (anUID != lGetUlong(client, EV_uid))) {
1129
sge_mutex_unlock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
1130
answer_list_add(alpp, MSG_COM_NOSHUTDOWNPERMS, STATUS_DENIED,
1131
ANSWER_QUALITY_ERROR);
1135
/* queue an sgeE_SHUTDOWN event addressed to this client only */
add_list_event_for_client(event_client_id, 0, sgeE_SHUTDOWN, 0, 0, NULL, NULL,
1138
/* Print out a message about the event. */
1139
if (event_client_id == EV_ID_SCHEDD) {
1140
SGE_ADD_MSG_ID(sprintf(SGE_EVENT, MSG_COM_KILLED_SCHEDULER));
1142
SGE_ADD_MSG_ID(sprintf(SGE_EVENT, MSG_COM_SHUTDOWNNOTIFICATION_SUS,
1143
lGetString(client, EV_name),
1144
sge_u32c(lGetUlong(client, EV_id)),
1145
lGetHost(client, EV_host)));
1147
answer_list_add(alpp, SGE_EVENT, STATUS_OK, ANSWER_QUALITY_INFO);
1149
SGE_ADD_MSG_ID(sprintf(SGE_EVENT,
1150
MSG_EVE_UNKNOWNEVCLIENT_US, sge_u32c(event_client_id), "shutdown"));
1151
answer_list_add(alpp, SGE_EVENT, STATUS_ESYNTAX, ANSWER_QUALITY_ERROR);
1155
sge_mutex_unlock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
1158
} /* sge_shutdown_event_client */
1160
/****** evm/sge_event_master/sge_shutdown_dynamic_event_clients() **************
1162
* sge_shutdown_dynamic_event_clients() -- shutdown all dynamic event clients
1165
* int sge_shutdown_dynamic_event_clients(const char *anUser)
1168
* Shutdown all dynamic event clients. Each dynamic event client known will
1169
* be send a shutdown event.
1171
* An event client is a dynamic event client if it's client id is greater
1172
* than or equal to 'EV_ID_FIRST_DYNAMIC'.
1174
* Shutting down all dynamic event clients is only permitted if 'anUser' does
1175
* have manager privileges.
1178
* const char *anUser - user which did request this operation
1179
* lList **alpp - answer list for info and errors
1182
* EPERM - operation not permitted
1186
* MT-NOTES: sge_shutdown_dynamic_event_clients() is MT safe, it uses the
1187
* global_lock and internal ones.
1189
*******************************************************************************/
1190
int sge_shutdown_dynamic_event_clients(const char *anUser, lList **alpp, monitoring_t *monitor)
1195
DENTER(TOP_LAYER, "sge_shutdown_dynamic_event_clients");
1197
if (!manop_is_manager(anUser)) {
1198
answer_list_add(alpp, MSG_COM_NOSHUTDOWNPERMS, STATUS_DENIED, ANSWER_QUALITY_ERROR);
1202
sge_mutex_lock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
1203
for_each (client, Event_Master_Control.clients) {
1204
id = lGetUlong(client, EV_id);
1206
/* Ignore clients with static ids. */
1207
if (id < EV_ID_FIRST_DYNAMIC) {
1211
sge_add_event_for_client(id, 0, sgeE_SHUTDOWN, 0, 0, NULL, NULL, NULL, NULL);
1213
SGE_ADD_MSG_ID(sprintf(SGE_EVENT, MSG_COM_SHUTDOWNNOTIFICATION_SUS,
1214
lGetString(client, EV_name),
1215
sge_u32c(id), lGetHost(client, EV_host)));
1216
answer_list_add(alpp, SGE_EVENT, STATUS_OK, ANSWER_QUALITY_INFO);
1219
sge_mutex_unlock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
1221
} /* sge_shutdown_dynamic_event_clients() */
1223
/****** Eventclient/Server/sge_add_event() *************************************
1225
* sge_add_event() -- add an object as event
1228
* #include "sge_event_master.h"
1231
* sge_add_event(u_long32 timestamp, ev_event type,
1232
* u_long32 intkey, u_long32 intkey2,
1233
* const char *strkey, const char *strkey2,
1234
* const char *session, lListElem *element)
1237
* Adds an object to the list of events to deliver. Called, if an event
1238
* occurs to that object, e.g. it was added to Grid Engine, modified or
1241
* Internally, a list with that single object is created and passed to
1242
* sge_add_list_event().
1245
* u_long32 timestamp - event creation time, 0 -> use current time
1246
* ev_event type - the event id
1247
* u_long32 intkey - additional data
1248
* u_long32 intkey2 - additional data
1249
* const char *strkey - additional data
1250
* const char *strkey2 - additional data
1251
* const char *session - events session key
1252
* lListElem *element - the object to deliver as event
1255
* MT-NOTE: sge_add_event() is NOT MT safe.
1257
*******************************************************************************/
1259
sge_add_event(u_long32 timestamp, ev_event type,
1260
u_long32 intkey, u_long32 intkey2,
1261
const char *strkey, const char *strkey2,
1262
const char *session, lListElem *element)
1264
return sge_add_event_for_client(EV_ID_ANY, timestamp, type, intkey, intkey2, strkey, strkey2, session, element);
1266
/****** sge_event_master/sge_add_event_for_client() ****************************
1268
* sge_add_event_for_client() -- add an event for a given object
1272
* sge_add_event_for_client(u_long32 event_client_id, u_long32 timestamp, ev_event type,
1273
* u_long32 intkey, u_long32 intkey2,
1274
* const char *strkey, const char *strkey2,
1275
* const char *session, lListElem *element)
1278
* Add an event for a given event client.
1281
* u_long32 event_client_id - event client id
1282
* u_long32 timestamp - event creation time, 0 -> use current time
1283
* ev_event type - event id
1284
* u_long32 intkey - 1st numeric key
1285
* u_long32 intkey2 - 2nd numeric key
1286
* const char *strkey - 1st alphanumeric key
1287
* const char *strkey2 - 2nd alphanumeric key
1288
* const char *session - event session
1289
* lListElem *element - object to be delivered with the event
1292
* MT-NOTE: sge_add_event_for_client() is MT safe
1294
*******************************************************************************/
1295
bool sge_add_event_for_client(u_long32 event_client_id, u_long32 timestamp, ev_event type,
1296
u_long32 intkey, u_long32 intkey2,
1297
const char *strkey, const char *strkey2,
1298
const char *session, lListElem *element)
1303
DENTER(TOP_LAYER, "sge_add_event_for_client");
1305
if (element != NULL) {
1306
lList *temp_sub_lp = NULL;
1307
int sub_list_elem = 0;
1309
/* ignore the sublist in case of the following events. We have
1310
* extra events to handle the sub-lists
1312
if (type == sgeE_JOB_MOD) {
1313
sub_list_elem = JB_ja_tasks;
1314
lXchgList(element, sub_list_elem, &temp_sub_lp);
1315
} else if (type == sgeE_CQUEUE_MOD) {
1316
sub_list_elem = CQ_qinstances;
1317
lXchgList(element, sub_list_elem, &temp_sub_lp);
1318
} else if (type == sgeE_JATASK_MOD) {
1319
sub_list_elem = JAT_task_list;
1320
lXchgList(element, sub_list_elem, &temp_sub_lp);
1323
lp = lCreateListHash("Events", lGetElemDescr(element), false);
1324
lAppendElem(lp, lCopyElemHash(element, false));
1326
/* restore the original event object */
1327
if (temp_sub_lp != NULL) {
1328
lXchgList(element, sub_list_elem, &temp_sub_lp);
1332
ret = add_list_event_for_client(event_client_id, timestamp, type, intkey, intkey2,
1333
strkey, strkey2, session, lp, false);
1338
/****** Eventclient/Server/sge_add_list_event() ********************************
*  NAME
*     sge_add_list_event() -- add a list as event
*
*  SYNOPSIS
*     #include "sge_event_master.h"
*
*     bool sge_add_list_event(u_long32 timestamp, ev_event type,
*                             u_long32 intkey, u_long32 intkey2,
*                             const char *strkey, const char *strkey2,
*                             const char *session, lList *list)
*
*  FUNCTION
*     Adds a list of objects to the list of events to deliver, e.g. the
*     sgeE*_LIST events. The event is addressed to all event clients
*     (EV_ID_ANY).
*
*     Every element of 'list' is copied into a new "Events" list. For
*     sgeE_JOB_MOD, sgeE_CQUEUE_MOD and sgeE_JATASK_MOD the sub-list
*     (JB_ja_tasks, CQ_qinstances resp. JAT_task_list) is detached while the
*     element is copied and reattached afterwards, so it is not part of the
*     event and 'list' is left as it was.
*
*  INPUTS
*     u_long32 timestamp  - event creation time, 0 -> use current time
*     ev_event type       - the event id
*     u_long32 intkey     - additional data
*     u_long32 intkey2    - additional data
*     const char *strkey  - additional data
*     const char *strkey2 - additional data
*     const char *session - events session key
*     lList *list         - the list to deliver as event
*
*  RESULT
*     bool - presumably the result of add_list_event_for_client() (TODO
*            confirm against the return statement)
*
*  NOTES
*     MT-NOTE: sge_add_list_event() is MT safe.
*******************************************************************************/
1369
bool sge_add_list_event(u_long32 timestamp, ev_event type,
1370
u_long32 intkey, u_long32 intkey2,
1371
const char *strkey, const char *strkey2,
1372
const char *session, lList *list)
1378
lListElem *element = NULL;
1380
/* the event carries copies of the elements, never 'list' itself;
 * NOTE(review): behaviour for list == NULL depends on lGetListDescr() and
 * lCreateListHash() - confirm */
lp = lCreateListHash("Events", lGetListDescr(list), false);
1384
for_each(element, list) {
1385
lList *temp_sub_lp = NULL;
1386
int sub_list_elem = 0;
1388
/* ignore the sublist in case of the following events. We have
1389
* extra events to handle the sub-lists
1391
if (type == sgeE_JOB_MOD) {
1392
sub_list_elem = JB_ja_tasks;
1393
lXchgList(element, sub_list_elem, &temp_sub_lp);
1394
} else if (type == sgeE_CQUEUE_MOD) {
1395
sub_list_elem = CQ_qinstances;
1396
lXchgList(element, sub_list_elem, &temp_sub_lp);
1397
} else if (type == sgeE_JATASK_MOD) {
1398
sub_list_elem = JAT_task_list;
1399
lXchgList(element, sub_list_elem, &temp_sub_lp);
1402
lAppendElem(lp, lCopyElemHash(element, false));
1404
/* restore the original event object */
1405
if (temp_sub_lp != NULL) {
1406
lXchgList(element, sub_list_elem, &temp_sub_lp);
1411
/* lp is stored in the event as ET_new_version by add_list_event_for_client() */
ret = add_list_event_for_client(EV_ID_ANY, timestamp, type, intkey, intkey2,
1412
strkey, strkey2, session, lp, false);
1416
/****** Eventclient/Server/add_list_event_for_client() *************************
*  NAME
*     add_list_event_for_client() -- add a list as event
*
*  SYNOPSIS
*     static bool add_list_event_for_client(u_long32 event_client_id,
*                                           u_long32 timestamp,
*                                           ev_event type, u_long32 intkey,
*                                           u_long32 intkey2,
*                                           const char *strkey,
*                                           const char *strkey2,
*                                           const char *session,
*                                           lList *list, bool has_lock)
*
*  FUNCTION
*     Adds a list of objects to the list of events to deliver, e.g. the
*     sgeE*_LIST events, to a specific client. The event is not delivered
*     here: an EVR_ADD_EVENT request holding a single ET_Type event is built
*     and appended either to the calling thread's open transaction
*     (t_store->transaction_requests) or, outside a transaction, to
*     Event_Master_Control.requests while holding request_mutex.
*
*     No checking is done to make sure that the client id is valid;
*     sge_event_master_process_send() skips events whose client id is
*     unknown.
*
*  INPUTS
*     u_long32 event_client_id - the id of the recipient (EV_ID_ANY: all
*                                subscribed event clients)
*     u_long32 timestamp       - time stamp in gmt for the event; if 0 is
*                                passed, the current time (sge_get_gmt())
*                                is used
*     ev_event type            - the event id
*     u_long32 intkey          - additional data
*     u_long32 intkey2         - additional data
*     const char *strkey       - additional data
*     const char *strkey2      - additional data
*     const char *session      - events session key
*     lList *list              - the list to deliver as event; it is stored
*                                in the event as ET_new_version (lSetList),
*                                so the event takes it over
*     bool has_lock            - NOTE(review): does not appear to be used by
*                                this function; confirm whether it is needed
*
*  RESULT
*     Whether the event was added successfully.
*
*  NOTES
*     MT-NOTE: add_list_event_for_client() is MT safe.
*******************************************************************************/
1453
static bool add_list_event_for_client(u_long32 event_client_id, u_long32 timestamp,
1454
ev_event type, u_long32 intkey,
1455
u_long32 intkey2, const char *strkey,
1456
const char *strkey2, const char *session,
1457
lList *list, bool has_lock)
1459
lListElem *evr = NULL; /* event request object */
1460
lList *etlp = NULL; /* event list */
1461
lListElem *etp = NULL; /* event object */
1463
DENTER(TOP_LAYER, "add_list_event_for_client");
1465
/* an event needs a timestamp */
1466
if (timestamp == 0) {
1467
timestamp = sge_get_gmt();
1470
/* build an EVR_ADD_EVENT request that carries exactly one ET_Type event */
evr = lCreateElem(EVR_Type);
1471
lSetUlong(evr, EVR_operation, EVR_ADD_EVENT);
1472
lSetUlong(evr, EVR_timestamp, timestamp);
1473
lSetUlong(evr, EVR_event_client_id, event_client_id);
1474
lSetString(evr, EVR_session, session);
1476
etlp = lCreateListHash("Event_List", ET_Type, false);
1477
lSetList(evr, EVR_event_list, etlp);
1479
etp = lCreateElem(ET_Type); /* actual event object */
1480
lAppendElem(etlp, etp);
1482
lSetUlong(etp, ET_type, type);
1483
lSetUlong(etp, ET_timestamp, timestamp);
1484
lSetUlong(etp, ET_intkey, intkey);
1485
lSetUlong(etp, ET_intkey2, intkey2);
1486
lSetString(etp, ET_strkey, strkey);
1487
lSetString(etp, ET_strkey2, strkey2);
1488
/* the event takes over the caller's list */
lSetList(etp, ET_new_version, list);
1491
* if we have a transaction open, add to the transaction
1492
* otherwise into the event master request list
1493
* need a new C block, as the GET_SPECIFIC macro declares new variables
1496
GET_SPECIFIC(event_master_transaction_t, t_store, sge_event_master_init_transaction_store, Event_Master_Control.transaction_key, "t_store");
1497
/* transaction requests are thread specific and need no lock; the shared
 * request list is protected by request_mutex */
if (t_store->is_transaction) {
1498
lAppendElem(t_store->transaction_requests, evr);
1500
sge_mutex_lock("event_master_request_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.request_mutex);
1501
lAppendElem(Event_Master_Control.requests, evr);
1502
sge_mutex_unlock("event_master_request_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.request_mutex);
1509
} /* end add_list_event_for_client */
1511
/*
 * sge_event_master_process_send() -- distribute the events of a request
 *
 * Adds the events of a request from the request list to the event clients
 * which subscribed them (eventclient_subscribed(), checked with the
 * request's session).
 *
 * - EVR_event_client_id == EV_ID_ANY: every event is dechained from the
 *   request and offered to every registered client; the event master mutex
 *   is taken once per event.
 * - otherwise only the addressed client is considered; if that client is
 *   unknown, the events stay in the request's event list.
 *
 * The MONITOR_EDT_* macros record the processed events in 'monitor'.
 */
1512
static void sge_event_master_process_send(lListElem *request, monitoring_t *monitor)
1514
lListElem *event_client = NULL;
1515
lListElem *event = NULL;
1516
lList *event_list = NULL;
1518
const char *session = NULL;
1519
ev_event type = sgeE_ALL_EVENTS;
1521
DENTER(TOP_LAYER, "sge_event_master_process_send");
1523
ec_id = lGetUlong(request, EVR_event_client_id);
1524
session = lGetString(request, EVR_session);
1525
event_list = lGetList(request, EVR_event_list);
1527
MONITOR_EDT_NEW(monitor);
1529
/* broadcast: add_list_event_direct() gets 'true' here and 'false' in the
 * single-client branch, presumably because the same event is handed to
 * several clients (TODO confirm the meaning of the flag) */
if (ec_id == EV_ID_ANY) {
1530
DPRINTF(("Processing event for all clients\n"));
1532
event = lFirst(event_list);
1533
while (event != NULL) {
1535
event = lDechainElem(event_list, event);
1536
type = (ev_event)lGetUlong(event, ET_type);
1538
sge_mutex_lock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
1539
for_each (event_client, Event_Master_Control.clients) {
1540
ec_id = lGetUlong(event_client, EV_id);
1542
/* NOTE(review): %ld here and %d below may not match the u_long32 ec_id;
 * elsewhere u_long32 values are printed via sge_u32c() */
DPRINTF(("Preparing event for client %ld\n", ec_id));
1544
if (eventclient_subscribed(event_client, type, session)) {
1546
add_list_event_direct(event_client, event, true);
1547
MONITOR_EDT_ADDED(monitor);
1550
sge_mutex_unlock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
1553
MONITOR_EDT_SKIP(monitor);
1556
event = lFirst(event_list);
1559
DPRINTF(("Processing event for client %d.\n", ec_id));
1561
/* single recipient: one lock for all events of the request */
sge_mutex_lock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
1563
event_client = get_event_client(ec_id);
1565
/* Skip bad client ids. Events with bad client ids will be freed
1566
* when send is freed since we don't dechain them. */
1567
if (event_client != NULL) {
1568
event = lFirst(event_list);
1570
while (event != NULL) {
1571
event = lDechainElem(event_list, event);
1572
type = (ev_event)lGetUlong(event, ET_type);
1574
if (eventclient_subscribed(event_client, type, session)) {
1575
add_list_event_direct(event_client, event, false);
1576
MONITOR_EDT_ADDED(monitor);
1577
/* We can't free the event when we're done because it now belongs
1578
* to send_events(). */
1580
MONITOR_EDT_SKIP(monitor);
1583
event = lFirst(event_list);
1587
sge_mutex_unlock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
1590
} /* sge_event_master_process_send() */
1592
/****** Eventclient/Server/sge_handle_event_ack() ******************************
*  NAME
*     sge_handle_event_ack() -- acknowledge event delivery
*
*  SYNOPSIS
*     #include "sge_event_master.h"
*
*     void sge_handle_event_ack(u_long32 event_client_id, u_long32 event_number)
*
*  FUNCTION
*     After the server sent events to an event client, the event client has to
*     acknowledge their receipt.
*
*     This function does not touch the event client itself: it queues an
*     EVR_ACK_EVENT request, stamped with the current time, on
*     Event_Master_Control.requests. When the request is processed (see
*     sge_event_master_process_ack()) the acknowledged events are deleted from
*     the list of events to deliver; otherwise they will be resent after the
*     next event delivery interval. If the busy handling of the event client
*     is EV_BUSY_UNTIL_ACK or EV_THROTTLE_FLUSH, the client is then set to
*     "not busy".
*
*  INPUTS
*     u_long32 event_client_id - event client sending acknowledge
*     u_long32 event_number    - serial number of the last event to acknowledge
*
*  NOTES
*     MT-NOTE: sge_handle_event_ack() is MT safe.
*******************************************************************************/
1622
void sge_handle_event_ack(u_long32 event_client_id, u_long32 event_number)
1624
lListElem *evr = NULL;
1626
DENTER(TOP_LAYER, "sge_handle_event_ack");
1628
/* the ack is only recorded as a request; the timestamp becomes the
 * client's EV_last_heard_from when the request is processed */
evr = lCreateElem(EVR_Type);
1629
lSetUlong(evr, EVR_operation, EVR_ACK_EVENT);
1630
lSetUlong(evr, EVR_timestamp, sge_get_gmt());
1631
lSetUlong(evr, EVR_event_client_id, event_client_id);
1632
lSetUlong(evr, EVR_event_number, event_number);
1634
/* the shared request list is protected by request_mutex */
sge_mutex_lock("event_master_request_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.request_mutex);
1635
lAppendElem(Event_Master_Control.requests, evr);
1636
sge_mutex_unlock("event_master_request_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.request_mutex);
1643
/****** sge_event_master/sge_event_master_process_ack() ************************
*  NAME
*     sge_event_master_process_ack() -- process an EVR_ACK_EVENT request
*
*  SYNOPSIS
*     static void sge_event_master_process_ack(lListElem *request,
*                                              monitoring_t *monitor)
*
*  FUNCTION
*     Handles an acknowledge request created by sge_handle_event_ack():
*     - the acknowledged events (EVR_event_number, as interpreted by
*       purge_event_list()) are purged from the client's EV_events list,
*     - EV_last_heard_from is set to the request's EVR_timestamp, i.e. the
*       time the ack was queued, not the time it is processed,
*     - for busy handling EV_BUSY_UNTIL_ACK or EV_THROTTLE_FLUSH the busy
*       state (EV_busy) is cleared.
*     An unknown client id is logged as an error.
*
*  INPUTS
*     lListElem *request    - EVR_Type request
*     monitoring_t *monitor - monitoring context (MONITOR_EDT_ACK)
*
*  NOTES
*     MT-NOTE: holds the event master mutex while accessing the client.
*******************************************************************************/
static void sge_event_master_process_ack(lListElem *request, monitoring_t *monitor)
1646
u_long32 event_client_id;
1648
DENTER(TOP_LAYER, "sge_event_master_process_ack");
1650
event_client_id = lGetUlong(request, EVR_event_client_id);
1652
sge_mutex_lock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
1654
client = get_event_client(event_client_id);
1656
if (client == NULL) {
1657
ERROR((SGE_EVENT, MSG_EVE_UNKNOWNEVCLIENT_US, sge_u32c(event_client_id), "process acknowledgements"));
1659
u_long32 event_number = lGetUlong(request, EVR_event_number);
1660
u_long32 timestamp = lGetUlong(request, EVR_timestamp);
1662
/* drop the acknowledged events from the client's pending event list */
lList *list = lGetList(client, EV_events);
1664
res = purge_event_list(list, event_number);
1666
MONITOR_EDT_ACK(monitor);
1668
DPRINTF(("%s: purged %d acknowledged events\n", SGE_FUNC, res));
1671
lSetUlong(client, EV_last_heard_from, timestamp); /* note time of ack */
1673
/* an ack ends the busy state for these busy handling modes */
switch (lGetUlong(client, EV_busy_handling)) {
1674
case EV_BUSY_UNTIL_ACK:
1675
case EV_THROTTLE_FLUSH:
1676
lSetUlong(client, EV_busy, 0); /* clear busy state */
1683
sge_mutex_unlock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
1685
} /* sge_event_master_process_ack() */
1687
/****** evm/sge_event_master/sge_deliver_events_immediately() ******************
1689
* sge_deliver_events_immediately() -- deliver events immediately
1692
* void sge_deliver_events_immediately(u_long32 event_client_id)
1695
* Deliver all events for the event client denoted by 'event_client_id'
1699
* u_long32 event_client_id - event client id
1705
* MT-NOTE: sge_deliver_events_immediately() is NOT MT safe.
1707
*******************************************************************************/
1708
void sge_deliver_events_immediately(u_long32 event_client_id)
1710
lListElem *client = NULL;
1712
DENTER(TOP_LAYER, "sge_event_immediate_delivery");
1714
sge_mutex_lock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
1716
if ((client = get_event_client(event_client_id)) == NULL) {
1717
ERROR((SGE_EVENT, MSG_EVE_UNKNOWNEVCLIENT_US, sge_u32c(event_client_id), "deliver events immediately"));
1719
flush_events(client, 0);
1721
sge_mutex_lock("event_master_cond_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.cond_mutex);
1722
Event_Master_Control.delivery_signaled = true;
1723
pthread_cond_signal(&Event_Master_Control.cond_var);
1724
sge_mutex_unlock("event_master_cond_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.cond_mutex);
1727
sge_mutex_unlock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
1729
} /* sge_deliver_event_immediately() */
1731
/****** evm/sge_event_master/sge_resync_schedd() *******************************
*  NAME
*     sge_resync_schedd() -- resync schedd
*
*  SYNOPSIS
*     int sge_resync_schedd(monitoring_t *monitor)
*
*  FUNCTION
*     Does a total update (send all lists) to the scheduler event client
*     (EV_ID_SCHEDD) via total_update() and logs MSG_EVE_REINITEVENTCLIENT_S
*     as an error. If no scheduler is registered, MSG_EVE_UNKNOWNEVCLIENT_US
*     is logged instead.
*
*  INPUTS
*     monitoring_t *monitor - monitoring context, passed on to total_update()
*
*  RESULT
*     int - 0 - resync successful
*           NOTE(review): confirm the value returned when no scheduler is
*           registered.
*
*  NOTES
*     MT-NOTE: sge_resync_schedd() is NOT MT safe.
*     It holds the event master mutex while calling total_update().
*******************************************************************************/
1753
int sge_resync_schedd(monitoring_t *monitor)
1757
/* NOTE(review): the DENTER name "sge_sync_schedd" differs from the function
 * name, so SGE_FUNC in the lock calls reports the wrong name */
DENTER(TOP_LAYER, "sge_sync_schedd");
1759
sge_mutex_lock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
1761
if ((client = get_event_client(EV_ID_SCHEDD)) != NULL) {
1762
ERROR((SGE_EVENT, MSG_EVE_REINITEVENTCLIENT_S,
1763
lGetString(client, EV_name)));
1765
/* send all lists to the scheduler again */
total_update(client, monitor);
1769
ERROR((SGE_EVENT, MSG_EVE_UNKNOWNEVCLIENT_US, sge_u32c(EV_ID_SCHEDD), "resynchronize"));
1773
sge_mutex_unlock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
1775
} /* sge_resync_schedd() */
1777
/****** evm/sge_event_master/sge_event_master_init() **************************
*  NAME
*     sge_event_master_init() -- event master initialization
*
*  SYNOPSIS
*     void sge_event_master_init(void)
*
*  FUNCTION
*     Initialize the event master control structure: creates the event
*     client list (EV_Type) and the request list (EVR_Type), creates the
*     thread specific key for the per-thread transaction store and
*     initializes the range list of event client ids.
*
*  NOTES
*     MT-NOTE: sge_event_master_init() is not MT safe
*******************************************************************************/
1798
void sge_event_master_init(void)
1800
DENTER(TOP_LAYER, "sge_event_master_init");
1802
/* registered event clients and the queue of pending requests */
Event_Master_Control.clients = lCreateListHash("EV_Clients", EV_Type, true);
1803
Event_Master_Control.requests = lCreateListHash("Event Master Requests", EVR_Type, false);
1804
/* per-thread transaction store; sge_event_master_destroy_transaction_store
 * is registered as key destructor (run at thread exit for non-NULL values).
 * NOTE(review): the return value of pthread_key_create() is ignored */
pthread_key_create(&(Event_Master_Control.transaction_key), sge_event_master_destroy_transaction_store);
1808
/* Initialize the range list for event client ids */
1810
lList *answer_list = NULL;
1811
range_list_initialize(&Event_Master_Control.client_ids, &answer_list);
1812
answer_list_output(&answer_list);
1818
/****** evm/sge_event_master/init_send_events() ********************************
1820
* init_send_events() -- sets the events, that should allways be delivered
1823
* void init_send_events()
1826
* sets the events, that should allways be delivered
1829
* MT-NOTE: init_send_events() is not MT safe
1831
*******************************************************************************/
1832
static void init_send_events(void)
1834
DENTER(TOP_LAYER, "init_send_events");
1836
memset(SEND_EVENTS, false, sizeof(bool) * sgeE_EVENTSIZE);
1838
SEND_EVENTS[sgeE_ADMINHOST_LIST] = true;
1839
SEND_EVENTS[sgeE_CALENDAR_LIST] = true;
1840
SEND_EVENTS[sgeE_CKPT_LIST] = true;
1841
SEND_EVENTS[sgeE_CENTRY_LIST] = true;
1842
SEND_EVENTS[sgeE_CONFIG_LIST] = true;
1843
SEND_EVENTS[sgeE_EXECHOST_LIST] = true;
1844
SEND_EVENTS[sgeE_JOB_LIST] = true;
1845
SEND_EVENTS[sgeE_JOB_SCHEDD_INFO_LIST] = true;
1846
SEND_EVENTS[sgeE_MANAGER_LIST] = true;
1847
SEND_EVENTS[sgeE_OPERATOR_LIST] = true;
1848
SEND_EVENTS[sgeE_PE_LIST] = true;
1849
SEND_EVENTS[sgeE_PROJECT_LIST] = true;
1850
SEND_EVENTS[sgeE_QMASTER_GOES_DOWN] = true;
1851
SEND_EVENTS[sgeE_CQUEUE_LIST] = true;
1852
SEND_EVENTS[sgeE_SUBMITHOST_LIST] = true;
1853
SEND_EVENTS[sgeE_USER_LIST] = true;
1854
SEND_EVENTS[sgeE_USERSET_LIST] = true;
1855
SEND_EVENTS[sgeE_HGROUP_LIST] = true;
1856
SEND_EVENTS[sgeE_RQS_LIST] = true;
1857
SEND_EVENTS[sgeE_AR_LIST] = true;
1858
#ifndef __SGE_NO_USERMAPPING__
1859
SEND_EVENTS[sgeE_CUSER_LIST] = true;
1863
} /* init_send_events() */
1866
/****** sge_event_master/sge_event_master_wait_next() ******************************
1868
* sge_event_master_wait_next() -- waits for a weakup
1871
* void sge_event_master_wait_next(void)
1874
* waits for a weakup
1877
* MT-NOTE: is MT safe
1879
*******************************************************************************/
1880
void sge_event_master_wait_next(void)
1883
DENTER(TOP_LAYER, "sge_event_master_wait_next");
1885
sge_mutex_lock("event_master_cond_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.cond_mutex);
1887
if (!Event_Master_Control.delivery_signaled) {
1888
u_long32 current_time = sge_get_gmt();
1890
ts.tv_sec = (time_t)(current_time + EVENT_DELIVERY_INTERVAL_S);
1891
ts.tv_nsec = EVENT_DELIVERY_INTERVAL_N;
1892
pthread_cond_timedwait(&Event_Master_Control.cond_var, &Event_Master_Control.cond_mutex, &ts);
1895
Event_Master_Control.delivery_signaled = false;
1897
sge_mutex_unlock("event_master_cond_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.cond_mutex);
1903
/****** sge_event_master/remove_event_client() *********************************
*  NAME
*     remove_event_client() -- removes an event client
*
*  SYNOPSIS
*     static void remove_event_client(lListElem **client, int event_client_id,
*                                     bool lock_event_master)
*
*  FUNCTION
*     Removes an event client:
*     - frees the subscription data of every entry in EV_sub_array (where
*       condition, what enumeration and hash descriptor) and clears the
*       EV_sub_array reference,
*     - removes the client from Event_Master_Control.clients (lRemoveElem),
*     - for dynamic clients (id >= EV_ID_FIRST_DYNAMIC) calls
*       free_dynamic_id(), presumably returning the id to the pool of free
*       dynamic ids.
*
*  INPUTS
*     lListElem **client     - event client to remove
*     int event_client_id    - event client id to remove
*     bool lock_event_master - shall the function acquire the event_master mutex
*
*  NOTES
*     MT-NOTE: remove_event_client() is not MT safe
*              - the event master mutex is taken around the list removal
*                only if lock_event_master is true;
*                sge_event_master_send_events() passes false because it
*                already holds that mutex
*              - it assumes that the event client is locked before this
*                method is called
*******************************************************************************/
1925
static void remove_event_client(lListElem **client, int event_client_id, bool lock_event_master) {
1926
subscription_t *old_sub = NULL;
1929
DENTER(TOP_LAYER, "remove_event_client");
1931
INFO((SGE_EVENT, MSG_EVE_UNREG_SU, lGetString(*client, EV_name),
1932
sge_u32c(lGetUlong(*client, EV_id))));
1934
/* NOTE(review): old_sub is indexed below without a NULL check, and only
 * its members are freed here - confirm the array itself is released */
old_sub = lGetRef(*client, EV_sub_array);
1936
/* now free event client subscription data */
1937
for (i = 0; i < sgeE_EVENTSIZE; i++) {
1938
lFreeWhere(&old_sub[i].where);
1939
lFreeWhat(&old_sub[i].what);
1941
if (old_sub[i].descr) {
1942
cull_hash_free_descr(old_sub[i].descr);
1943
FREE(old_sub[i].descr);
1948
lSetRef(*client, EV_sub_array, NULL);
1951
if (lock_event_master) {
1952
sge_mutex_lock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
1955
lRemoveElem(Event_Master_Control.clients, client);
1956
/* static client ids are not managed by the dynamic id range */
if (event_client_id >= EV_ID_FIRST_DYNAMIC) {
1957
lList *answer_list = NULL;
1958
free_dynamic_id(&answer_list, event_client_id);
1959
answer_list_output(&answer_list);
1962
if (lock_event_master) {
1963
sge_mutex_unlock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
1969
/****** evm/sge_event_master/sge_event_master_send_events() ******************
1971
* sge_event_master_send_events() -- send events to event clients
1974
* void sge_event_master_send_events(sge_gdi_ctx_class_t *ctx,
*                                   lListElem *report, lList *report_list,
*                                   monitoring_t *monitor)
1977
* Loop over all event clients and send events due. If an event client did
1978
* time out, it will be removed from the list of registered event clients.
* Event clients in state EV_terminated are removed as well.
1980
* Events will only be delivered if the so-called 'busy handling' of a
1981
* client allows it. Events will be delivered as a report (REP_Type)
1982
* with a report list of type ET_Type.
1985
* lListElem *report - a report, has to be part of the report list. All
1986
* fields have to be init, except for REP_list element.
1987
* lList *report_list - a pre-init report list
* sge_gdi_ctx_class_t *ctx - context handed on to report_list_send()
* monitoring_t *monitor - monitoring data (MONITOR_MESSAGES_OUT,
*                         MONITOR_EDT_BUSY)
1993
* MT-NOTE: sge_event_master_send_events() locks
* MT-NOTE: 'Event_Master_Control.mutex' for the whole walk over the client
* MT-NOTE: list; presumably the caller must not hold it already.
1995
* MT-NOTE: After all events for all clients have been sent. This function
1996
* MT-NOTE: will wait on the condition variable 'Event_Master_Control.cond_var'
* NOTE(review): no wait on 'Event_Master_Control.cond_var' is visible in
* NOTE(review): this function body; the MT-NOTE above may be stale - confirm.
1998
*******************************************************************************/
1999
void sge_event_master_send_events(sge_gdi_ctx_class_t *ctx, lListElem *report, lList *report_list,
2000
monitoring_t *monitor)
2003
u_long32 busy_handling;
2004
u_long32 scheduler_timeout = mconf_get_scheduler_timeout();
2005
lListElem *event_client, *next_event_client;
2008
int deliver_interval;
2009
time_t now = time(NULL);
2011
event_client_update_func_t update_func = NULL;
2013
DENTER(TOP_LAYER, "sge_event_master_send_events");
/* The event master mutex is held for the whole walk over the client list. */
2015
sge_mutex_lock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
2016
event_client = lFirst(Event_Master_Control.clients);
2017
while (event_client != NULL) {
2018
const char *host = NULL;
2019
const char *commproc = NULL;
/* Fetch the successor first: remove_event_client() unlinks the current
 * element from Event_Master_Control.clients. */
2021
next_event_client = lNext(event_client);
2023
ec_id = lGetUlong(event_client, EV_id);
2025
/* is the event client in state terminated? remove it */
2026
if (lGetUlong(event_client, EV_state) == EV_terminated) {
/* 'false': the mutex is already held here, so remove_event_client()
 * must not lock it again. */
2027
remove_event_client(&event_client, ec_id, false);
2028
/* we removed a client, continue with the next one */
2029
event_client = next_event_client;
2033
/* extract address of event client */
2035
/* NOTE(review): host and commproc below come from lGetHost()/lGetString()
 * and are not freed in this function; the former remark that they "have to
 * be freed" looks stale - confirm. */
2036
update_func = (event_client_update_func_t) lGetRef(event_client, EV_update_function);
2038
host = lGetHost(event_client, EV_host);
2039
commproc = lGetString(event_client, EV_commproc);
2040
commid = lGetUlong(event_client, EV_commid);
2042
deliver_interval = lGetUlong(event_client, EV_d_time);
2043
busy_handling = lGetUlong(event_client, EV_busy_handling);
2045
/* someone turned the clock back */
2046
if (lGetUlong(event_client, EV_last_heard_from) > now) {
2047
lSetUlong(event_client, EV_last_heard_from, now);
2048
lSetUlong(event_client, EV_next_send_time, now + deliver_interval);
2049
} else if (lGetUlong(event_client, EV_last_send_time) > now) {
2050
lSetUlong(event_client, EV_last_send_time, now);
2053
/* if set, use qmaster_params SCHEDULER_TIMEOUT */
2054
if (scheduler_timeout > 0) {
2055
timeout = scheduler_timeout;
/* NOTE(review): the line separating these two assignments is not visible
 * here; presumably the 10 * EV_d_time fallback, clamped to
 * [EVENT_ACK_MIN_TIMEOUT, EVENT_ACK_MAX_TIMEOUT], only applies when
 * SCHEDULER_TIMEOUT is not set - confirm. */
2057
/* is the ack timeout expired ? */
2058
timeout = 10*deliver_interval;
2060
if (timeout < EVENT_ACK_MIN_TIMEOUT) {
2061
timeout = EVENT_ACK_MIN_TIMEOUT;
2062
} else if (timeout > EVENT_ACK_MAX_TIMEOUT) {
2063
timeout = EVENT_ACK_MAX_TIMEOUT;
/* A client not heard from within 'timeout' is removed (the mutex is
 * already held, hence 'false'). */
2067
if (now > (lGetUlong(event_client, EV_last_heard_from) + timeout)) {
2068
ERROR((SGE_EVENT, MSG_COM_ACKTIMEOUT4EV_ISIS, (int) timeout, commproc,
2069
(int) commid, host));
2070
remove_event_client(&event_client, ec_id, false);
2071
event_client = next_event_client;
2072
continue; /* while */
2075
/* do we have to deliver events ? */
2076
if (now >= lGetUlong(event_client, EV_next_send_time)) {
/* EV_THROTTLE_FLUSH clients are served even while busy; all other
 * clients only while EV_busy is 0. */
2077
if ((busy_handling == EV_THROTTLE_FLUSH) || !lGetUlong(event_client, EV_busy)) {
2080
/* put only pointer in report - don't copy */
2081
lXchgList(event_client, EV_events, &lp);
2082
lXchgList(report, REP_list, &lp);
/* internal event clients (EV_update_function set) are handed the
 * report list directly */
2084
if (update_func != NULL) {
2085
update_func(ec_id, NULL, report_list);
2088
ret = report_list_send(ctx, report_list, host, commproc, commid, 0);
2089
MONITOR_MESSAGES_OUT(monitor);
2092
/* on failure retry is triggered automatically */
2093
if (ret == CL_RETVAL_OK) {
/* NOTE(review): 'now' was initialised from time(NULL) and is refreshed
 * here from sge_get_gmt(); both are assumed to use the same clock. */
2094
now = (time_t)sge_get_gmt();
2096
switch (busy_handling) {
2097
case EV_THROTTLE_FLUSH:
2098
/* increase busy counter */
2099
lSetUlong(event_client, EV_busy, lGetUlong(event_client, EV_busy) + 1);
2101
case EV_BUSY_UNTIL_RELEASED:
2102
case EV_BUSY_UNTIL_ACK:
2103
lSetUlong(event_client, EV_busy, 1);
2106
/* EV_BUSY_NO_HANDLING */
2110
lSetUlong(event_client, EV_last_send_time, now);
2113
/* We reset this time even if the report list send failed because we
2114
* want to give failed clients a break before trying them again. */
2115
lSetUlong(event_client, EV_next_send_time, now + deliver_interval);
2117
/* don't delete sent events - deletion is triggered by acks; swap the
 * event list back from the report into the client */
2118
lXchgList(report, REP_list, &lp);
2119
lXchgList(event_client, EV_events, &lp);
2121
MONITOR_EDT_BUSY(monitor);
2125
event_client = next_event_client;
2128
sge_mutex_unlock("event_master_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.mutex);
2130
} /* sge_event_master_send_events() */
2132
/* flush_events() -- pull the next delivery time of an event client forward
*
* Sets EV_next_send_time to at most 'now + interval'. For clients using
* EV_THROTTLE_FLUSH busy handling, the next send time is additionally held
* back to at least EV_last_send_time plus a flush delay derived from
* EV_d_time, the EV_busy counter and EV_flush_delay, so that a busy client
* is not flooded with event packages. For other clients the flush delay
* stays 0.
*
* lListElem *event_client - event client to flush, must not be NULL
* int interval            - maximum delay from now until the next delivery,
*                           in the time unit returned by sge_get_gmt()
*
* NOTE(review): 'now' is an int while the EV_* times are u_long32, so the
* MIN/MAX expressions and the '>=' test below mix signed and unsigned
* operands.
*/
static void flush_events(lListElem *event_client, int interval)
2134
u_long32 next_send = 0;
2135
u_long32 flush_delay = 0;
2136
int now = sge_get_gmt();
2138
DENTER(TOP_LAYER, "flush_events");
2140
SGE_ASSERT(event_client != NULL);
/* never later than 'now + interval' */
2142
next_send = lGetUlong(event_client, EV_next_send_time);
2143
next_send = MIN(next_send, now + interval);
2145
/* never send out two event packages in the very same second */
2146
if (lGetUlong(event_client, EV_busy_handling) == EV_THROTTLE_FLUSH) {
2147
u_long32 busy_counter = lGetUlong(event_client, EV_busy);
2148
u_long32 ed_time = lGetUlong(event_client, EV_d_time);
/* clamped to at least 1 so the division below cannot divide by zero */
2149
u_long32 flush_delay_rate = MAX(lGetUlong(event_client, EV_flush_delay), 1);
2151
if (busy_counter >= flush_delay_rate) {
2152
/* busy counters larger than flush delay cause events being
2153
sent out in regular event delivery interval for alive protocol
2154
purposes with event client */
2155
flush_delay = MAX(flush_delay, ed_time);
2157
/* for smaller busy counters event delivery interval is scaled
2158
down with the busy counter */
2159
flush_delay = MAX(flush_delay, ed_time * busy_counter / flush_delay_rate);
/* never earlier than last send time + flush delay */
2163
next_send = MAX(next_send, lGetUlong(event_client, EV_last_send_time) + flush_delay);
2164
lSetUlong(event_client, EV_next_send_time, next_send);
2166
if (now >= next_send) {
2170
DPRINTF(("%s: %s %d\tNOW: %d NEXT FLUSH: %d (%s,%s,%d)\n",
2172
((lGetString(event_client, EV_name) != NULL) ? lGetString(event_client, EV_name) : "<null>"),
2173
lGetUlong(event_client, EV_id),
2176
((lGetHost(event_client, EV_host) != NULL) ? lGetHost(event_client, EV_host) : "<null>"),
2177
((lGetString(event_client, EV_commproc) != NULL) ? lGetString(event_client, EV_commproc) : "<null>"),
2178
lGetUlong(event_client, EV_commid)));
2181
} /* flush_events() */
2183
/****** Eventclient/Server/total_update() **************************************
2185
* total_update() -- send all data to eventclient
2189
* total_update(lListElem *event_client, monitoring_t *monitor)
2192
* Sends every complete list the event client has subscribed to.
2193
* If the event client receives a complete list instead of single events,
2194
* it should completely update its database.
2197
* lListElem *event_client - the event client to update
* monitoring_t *monitor - monitoring data (not referenced in the lines shown)
2200
* MT-NOTE: total_update() is MT safe, IF the function is invoked with
2201
* MT-NOTE: 'LOCK_EVENT_CLIENT_LST' locked! This is in accordance with
2202
* MT-NOTE: the acquire/release protocol as defined by the Grid Engine
2203
* MT-NOTE: Locking API.
2204
* MT-NOTE: the method also locks the global lock. One has to make sure,
2205
* MT-NOTE: that no calling method has that lock already.
* NOTE(review): no locking is visible in this function body; confirm the
* NOTE(review): MT-NOTEs above still describe the current locking scheme.
2208
* libs/lck/sge_lock.h
2209
* libs/lck/sge_lock.c
2211
*******************************************************************************/
2212
static void total_update(lListElem *event_client, monitoring_t *monitor)
2214
object_description *master_table = NULL;
2216
DENTER(TOP_LAYER, "total_update");
2218
master_table = object_type_get_global_object_description();
/* Block all event types while the complete lists are queued;
 * add_list_event_direct() unblocks a list type again when a list event of
 * that type is added for the client. */
2220
blockEvents(event_client, sgeE_ALL_EVENTS, true);
2222
sge_set_commit_required();
/* new_subscription == false: total_update_event() only sends lists for
 * which eventclient_subscribed() reports a subscription. */
2224
total_update_event(event_client, sgeE_ADMINHOST_LIST, master_table, false);
2225
total_update_event(event_client, sgeE_CALENDAR_LIST, master_table, false);
2226
total_update_event(event_client, sgeE_CKPT_LIST, master_table, false);
2227
total_update_event(event_client, sgeE_CENTRY_LIST, master_table, false);
2228
total_update_event(event_client, sgeE_CONFIG_LIST, master_table, false);
2229
total_update_event(event_client, sgeE_EXECHOST_LIST, master_table, false);
2230
total_update_event(event_client, sgeE_JOB_LIST, master_table, false);
2231
total_update_event(event_client, sgeE_JOB_SCHEDD_INFO_LIST, master_table, false);
2232
total_update_event(event_client, sgeE_MANAGER_LIST, master_table, false);
2233
total_update_event(event_client, sgeE_OPERATOR_LIST, master_table, false);
2234
total_update_event(event_client, sgeE_PE_LIST, master_table, false);
2235
total_update_event(event_client, sgeE_CQUEUE_LIST, master_table, false);
2236
total_update_event(event_client, sgeE_SCHED_CONF, master_table, false);
2237
total_update_event(event_client, sgeE_SUBMITHOST_LIST, master_table, false);
2238
total_update_event(event_client, sgeE_USERSET_LIST, master_table, false);
2239
total_update_event(event_client, sgeE_NEW_SHARETREE, master_table, false);
2240
total_update_event(event_client, sgeE_PROJECT_LIST, master_table, false);
2241
total_update_event(event_client, sgeE_USER_LIST, master_table, false);
2242
total_update_event(event_client, sgeE_HGROUP_LIST, master_table, false);
2243
total_update_event(event_client, sgeE_RQS_LIST, master_table, false);
2244
total_update_event(event_client, sgeE_AR_LIST, master_table, false);
2245
#ifndef __SGE_NO_USERMAPPING__
2246
total_update_event(event_client, sgeE_CUSER_LIST, master_table, false);
2252
} /* total_update() */
2255
/****** evm/sge_event_master/build_subscription() ******************************
2257
* build_subscription() -- generates an array out of the cull registration
2261
* static void build_subscription(lListElem *event_el)
2264
* generates an array out of the cull registration
2265
* structure. The array contains all event elements and each of them
2266
* has an identifier telling whether it is subscribed or not. Beforehand the
2267
* EV_changed flag is tested. If it is not set, the function simply returns.
* A previously installed array is freed, including its where/what filters
* and cached reduced descriptors, and EV_changed is reset afterwards.
2271
* lListElem *event_el - the event client element whose subscription list
*                       (EV_subscribed) is transformed into EV_sub_array
2273
*******************************************************************************/
2274
static void build_subscription(lListElem *event_el)
2276
lList *subscription = lGetList(event_el, EV_subscribed);
2277
lListElem *sub_el = NULL;
2278
subscription_t *sub_array = NULL;
2279
subscription_t *old_sub_array = NULL;
2282
DENTER(TOP_LAYER, "build_subscription");
2284
if (!lGetBool(event_el, EV_changed)) {
2288
DPRINTF(("rebuild event mask for client(id): %s("sge_u32")\n", lGetString(event_el, EV_name), lGetUlong(event_el, EV_id)));
/* NOTE(review): the malloc() result is passed to memset() without a NULL
 * check. */
2290
sub_array = (subscription_t *) malloc(sizeof(subscription_t) * sgeE_EVENTSIZE);
2291
memset(sub_array, 0, sizeof(subscription_t) * sgeE_EVENTSIZE);
/* memset() already zeroes the array; subscription state and blocked flag
 * are still set explicitly for every event type. */
2293
for (i = 0; i < sgeE_EVENTSIZE; i++) {
2294
sub_array[i].subscription = EV_NOT_SUBSCRIBED;
2295
sub_array[i].blocked = false;
2298
for_each(sub_el, subscription) {
2299
const lListElem *temp = NULL;
2300
u_long32 event = lGetUlong(sub_el, EVS_id);
/* NOTE(review): 'event' (EVS_id, supplied by the client) is used as an
 * index into sub_array without a visible check against sgeE_EVENTSIZE. */
2302
sub_array[event].subscription = EV_SUBSCRIBED;
2303
sub_array[event].flush = lGetBool(sub_el, EVS_flush) ? true : false;
2304
sub_array[event].flush_time = lGetUlong(sub_el, EVS_interval);
2306
if ((temp = lGetObject(sub_el, EVS_where))) {
2307
sub_array[event].where = lWhereFromElem(temp);
2310
if ((temp = lGetObject(sub_el, EVS_what))) {
2311
sub_array[event].what = lWhatFromElem(temp);
/* Release the previous subscription array together with its filters and
 * the reduced descriptors cached by getDescriptorL(). */
2315
old_sub_array = lGetRef(event_el, EV_sub_array);
2317
if (old_sub_array) {
2319
for (i = 0; i < sgeE_EVENTSIZE; i++) {
2320
lFreeWhere(&(old_sub_array[i].where));
2321
lFreeWhat(&(old_sub_array[i].what));
2322
if (old_sub_array[i].descr){
2323
cull_hash_free_descr(old_sub_array[i].descr);
2324
free(old_sub_array[i].descr);
2327
free(old_sub_array);
/* Install the new array and mark the subscription as up to date. */
2330
lSetRef(event_el, EV_sub_array, sub_array);
2331
lSetBool(event_el, EV_changed, false);
2334
} /* build_subscription() */
2336
/****** Eventclient/Server/check_send_new_subscribed_list() ********************
2338
* check_send_new_subscribed_list() -- check subscription for new list events
2342
* check_send_new_subscribed_list(const subscription_t *old_subscription,
2343
* const subscription_t *new_subscription,
2344
* lListElem *event_client,
* ev_event event,
* object_description *master_table)
2348
* Checks, if sgeE*_LIST events have been added to the subscription of a
2349
* certain event client. If yes, send these lists to the event client.
* A list is sent only if 'event' is subscribed in the new subscription
* but was not subscribed in the old one.
2352
* const subscription_t *old_subscription - former subscription
2353
* const subscription_t *new_subscription - new subscription
2354
* lListElem *event_client - the event client object
2355
* ev_event event - the event to check
2356
* object_description *master_table - master list table
2359
* Eventclient/Server/total_update_event()
2361
*******************************************************************************/
2363
check_send_new_subscribed_list(const subscription_t *old_subscription,
2364
const subscription_t *new_subscription,
2365
lListElem *event_client, ev_event event,
2366
object_description *master_table)
2368
if ((new_subscription[event].subscription == EV_SUBSCRIBED) &&
2369
(old_subscription[event].subscription == EV_NOT_SUBSCRIBED)) {
/* 'true' makes total_update_event() send the list without consulting
 * eventclient_subscribed(). */
2370
total_update_event(event_client, event, master_table, true);
2374
/****** Eventclient/Server/eventclient_subscribed() ************************
2376
* eventclient_subscribed() -- has event client subscribed an event?
2379
* #include "sge_event_master.h"
2382
* eventclient_subscribed(const lListElem *event_client, ev_event event,
* const char *session)
2385
* Checks if the given event client has a certain event subscribed.
2386
* For event clients that use session filtering additional conditions
2387
* must be fulfilled, otherwise the event does not count as subscribed.
* An event also does not count as subscribed while it is blocked.
2390
* const lListElem *event_client - event client to check
2391
* ev_event event - event to check
2392
* const char *session - session key of this event (may be NULL, e.g.
*                       when called from total_update_event())
2395
* int - 0 = not subscribed, 1 = subscribed
2398
* Eventclient/-Session filtering
2399
*******************************************************************************/
2400
static int eventclient_subscribed(const lListElem *event_client, ev_event event,
2401
const char *session)
2403
const subscription_t *subscription = NULL;
2404
const char *ec_session = NULL;
2406
DENTER(TOP_LAYER, "eventclient_subscribed");
2408
SGE_ASSERT(event_client != NULL);
/* runtime guard in addition to the assertion above */
2410
if (event_client == NULL) {
2414
subscription = lGetRef(event_client, EV_sub_array);
2415
ec_session = lGetString(event_client, EV_session);
2417
if (subscription == NULL) {
2418
DPRINTF(("No subscription!\n"));
2424
/* events that belong to a specific session are not subscribed
2425
in case the event client is not interested in that session */
/* NOTE(review): strcmp() requires two non-NULL strings. total_update_event()
 * passes session == NULL and EV_session may be unset; the guarding
 * conditions are on lines not shown here - confirm both pointers are
 * checked before this call. */
2426
if (strcmp(session, ec_session)) {
2427
DPRINTF(("Event session does not match client session\n"));
2431
/* events that do not belong to a specific session are not
2432
subscribed if the event client is interested in events of a
2434
The only exception are list events, because list events do not
2435
belong to a specific session. These events require filtering on
2436
a more fine grained level */
2437
if (!IS_TOTAL_UPDATE_EVENT(event)) {
/* the event counts as subscribed only if it is subscribed and not
 * blocked */
2442
if ((subscription[event].subscription == EV_SUBSCRIBED) &&
2443
(subscription[event].blocked == false)) {
2450
/****** evm/sge_event_master/purge_event_list() ********************************
2452
* purge_event_list() -- purge event list
2455
* static int purge_event_list(lList *event_list, u_long32 event_number)
2458
* Remove all events from 'event_list' which have an event number
* (ET_number) less than or
2459
* equal to 'event_number'.
2462
* lList *event_list - event list
2463
* u_long32 event_number - highest event number to purge
2466
* int - number of events purged.
2469
* MT-NOTE: purge_event_list() is NOT MT safe.
2471
* MT-NOTE: Do not call this function without having 'event_list' locked!
2474
* BUGBUG-AD: If 'event_number' == 0, no events will be purged. However zero is
2475
* BUGBUG-AD: also the id of 'sgeE_ALL_EVENTS'. Is this behaviour correct?
2477
*******************************************************************************/
2478
static int purge_event_list(lList *event_list, u_long32 event_number)
2480
int purged = 0, pos = 0;
2481
lListElem *ev = NULL;
2483
DENTER(TOP_LAYER, "purge_event_list");
/* event number 0 is treated specially, see BUGBUG-AD above */
2485
if (event_number == 0) {
/* resolve the ET_number position once instead of once per element */
2489
pos = lGetPosInDescr(ET_Type, ET_number);
2490
ev = lFirst(event_list);
2492
while (ev != NULL) {
2493
lListElem *tmp = ev;
2495
ev = lNext(ev); /* fetch next event, before the old one will be deleted */
/* events newer than 'event_number' are kept, all others are removed */
2497
if (lGetPosUlong(tmp, pos) > event_number) {
2501
lRemoveElem(event_list, &tmp);
2506
} /* purge_event_list() */
2508
/* add_list_event_direct() -- append an event to the event queue of a client
*
* Applies the client's where/what filters for the event type to the
* event's ET_new_version list, assigns the client's next event number
* (EV_next_number) and appends the resulting event to EV_events.
* sgeE_QMASTER_GOES_DOWN and sgeE_SHUTDOWN trigger an immediate flush, and
* sgeE_SHUTDOWN also moves the client to EV_closing. Other types are
* flushed if the subscription has its flush flag set.
*
* No events are added to clients whose EV_state is not EV_connected.
* If the filtered list is empty and SEND_EVENTS[type] is not set, no event
* is queued.
*
* NOTE(review): the rest of the parameter list is on lines not shown here;
* 'copy_event' appears to choose between copying 'event' and reusing it.
*/
static void add_list_event_direct(lListElem *event_client, lListElem *event,
2513
lListElem *ep = NULL;
2514
ev_event type = (ev_event)lGetUlong(event, ET_type);
2515
subscription_t *subscription = NULL;
2517
dstring buffer_wrapper;
2519
const lCondition *selection = NULL;
2520
const lEnumeration *fields = NULL;
2521
const lDescr *descr = NULL;
2522
bool internal_client = false;
2524
DENTER(TOP_LAYER, "add_list_event_direct");
2526
SGE_ASSERT(event_client != NULL);
2528
if (lGetUlong(event_client, EV_state) != EV_connected) {
2529
/* the event client is not connected anymore, so we are not
2530
adding new events to it*/
2537
/* detect internal event clients: they have an update function and get
   lists with hash tables */
2538
if (lGetRef(event_client, EV_update_function) != NULL) {
2539
internal_client = true;
2542
/* if the total updates blocked the client, we have to unblock this list */
2543
blockEvents(event_client, type, false);
2545
sge_dstring_init(&buffer_wrapper, buffer, sizeof(buffer));
2547
/* Pull out payload for selecting */
2548
lXchgList(event, ET_new_version, &lp);
2550
/* If the list is NULL, no need to bother with any of this. Plus, if we
2551
* did do this part with a NULL list, the check for a clp with no
2552
* elements would kick us out. */
2554
subscription = lGetRef(event_client, EV_sub_array);
2555
selection = subscription[type].where;
2556
fields = subscription[type].what;
2559
DPRINTF(("deliver event: %d with where filter=%s and what filter=%s\n",
2560
type, selection?"true":"false", fields?"true":"false"));
/* reduced descriptor for the what filter (NULL if there is none); it is
 * cached in the subscription array by getDescriptorL() */
2564
descr = getDescriptorL(subscription, lp, type);
2566
DPRINTF(("Reducing event data\n"));
/* list_select() returns false for event types it does not handle; those
 * lists are filtered with lSelectDPack() directly */
2568
if (!list_select(subscription, type, &clp, lp, selection, fields,
2569
descr, internal_client)) {
2570
clp = lSelectDPack("updating list", lp, selection, descr,
2571
fields, internal_client, NULL, NULL);
2574
/* no elements in the event list, no need for an event */
/* (SEND_EVENTS[type] forces delivery even of an empty list) */
2575
if (!SEND_EVENTS[type] && lGetNumberOfElem(clp) == 0) {
2580
/* we are not making a copy, so we have to restore the old element */
2581
lXchgList(event, ET_new_version, &lp);
2587
DPRINTF(("Skipping event because it has no content for this client.\n"));
2591
/* If we're not making a copy, we have to free the original list. If
2592
* we are making a copy, freeing the list is the responsibility of the
2593
* calling function. */
2597
} else if (copy_event) {
2598
/* If there's no what clause, and we want a copy, we copy the list */
2599
DPRINTF(("Copying event data\n"));
2600
clp = lCopyListHash(lGetListName(lp), lp, internal_client);
2602
/* If there's no what clause, and we don't want to copy, we just reuse
2603
* the original list. */
2605
if (internal_client) {
2606
cull_hash_create_hashtables(clp);
2608
/* Make sure lp is clear for the next part. */
2613
/* If we're making a copy, copy the event and swap the original list
2614
* back into the original event */
2616
DPRINTF(("Copying event\n"));
2617
ep = lCopyElemHash(event, false);
2619
lXchgList(event, ET_new_version, &lp);
2621
/* If we're not making a copy, reuse the original event. */
2625
/* Swap the new list into the working event. */
2626
lXchgList(ep, ET_new_version, &clp);
2628
/* fill in event number and increment
2629
EV_next_number of event recipient */
2630
i = lGetUlong(event_client, EV_next_number);
2631
lSetUlong(event_client, EV_next_number, (i + 1));
2632
lSetUlong(ep, ET_number, i);
2634
/* create the client's event list (without hashing) if it does not exist yet */
2635
lp = lGetList(event_client, EV_events);
2638
lp=lCreateListHash("Events", ET_Type, false);
2639
lSetList(event_client, EV_events, lp);
2642
/* chain in new event */
2643
lAppendElem(lp, ep);
2645
DPRINTF(("%d %s\n", lGetUlong(event_client, EV_id),
2646
event_text(ep, &buffer_wrapper)));
2648
/* check if the event client wants flushing */
2649
subscription = lGetRef(event_client, EV_sub_array);
/* qmaster shutdown: remember it globally and deliver immediately */
2651
if (type == sgeE_QMASTER_GOES_DOWN) {
2652
Event_Master_Control.is_prepare_shutdown = true;
2653
lSetUlong(event_client, EV_busy, 0); /* can't be too busy for shutdown */
2654
flush_events(event_client, 0);
2655
} else if (type == sgeE_SHUTDOWN) {
2656
flush_events(event_client, 0);
2657
/* the event client should be shutdown, so we do not add any events to it, after
2658
the shutdown event */
2659
lSetUlong(event_client, EV_state, EV_closing);
2660
} else if (subscription[type].flush) {
2661
DPRINTF(("flushing event client\n"));
2662
flush_events(event_client, subscription[type].flush_time);
2668
/****** Eventclient/Server/total_update_event() *******************************
2670
* total_update_event() -- create a total update event
2674
* total_update_event(lListElem *event_client, ev_event type,
* object_description *object_base, bool new_subscription)
2677
* Creates an event delivering a certain list of objects for an event client.
2678
* For event clients that have subscribed a session list filtering can be done
* The list for 'type' is taken from 'object_base' (sgeE_SCHED_CONF uses
* sconf_get_config_list() instead); unhandled types only produce a warning.
* NOTE(review): the comment on the sgeE_CONFIG_LIST case mentions
* sge_get_configuration(), but that case reads the list from 'object_base';
* the comment looks stale - confirm.
2682
* lListElem *event_client - event client to receive the list
2683
* ev_event type - event describing the list to update
2684
* object_description *object_base - master list table
* bool new_subscription - if true, the list is sent without checking
*                         eventclient_subscribed()
2686
*******************************************************************************/
2687
static void total_update_event(lListElem *event_client, ev_event type, object_description *object_base,
2688
bool new_subscription)
2690
lList *lp = NULL; /* lp should be set, if we have to make a copy */
2691
lList *copy_lp = NULL; /* copy_lp should be used for a copy of the org. list */
2693
dstring buffer_wrapper;
2696
DENTER(TOP_LAYER, "total_update_event");
2698
SGE_ASSERT(event_client != NULL);
2700
sge_dstring_init(&buffer_wrapper, buffer, sizeof(buffer));
2701
id = lGetUlong(event_client, EV_id);
2703
/* This test bothers me. Technically, the GDI thread should just drop the
2704
* event in the send queue and forget about it. However, doing this test
2705
* here could prevent the queuing of events that will later be determined
2707
if (new_subscription || eventclient_subscribed(event_client, type, NULL)) {
2709
case sgeE_ADMINHOST_LIST:
2710
lp = *object_base[SGE_TYPE_ADMINHOST].list;
2712
case sgeE_CALENDAR_LIST:
2713
lp = *object_base[SGE_TYPE_CALENDAR].list;
2715
case sgeE_CKPT_LIST:
2716
lp = *object_base[SGE_TYPE_CKPT].list;
2718
case sgeE_CENTRY_LIST:
2719
lp = *object_base[SGE_TYPE_CENTRY].list;
2721
case sgeE_CONFIG_LIST:
2722
/* sge_get_configuration() returns a copy already, we do not need to make
2724
lp = *object_base[SGE_TYPE_CONFIG].list;
2726
case sgeE_EXECHOST_LIST:
2727
lp = *object_base[SGE_TYPE_EXECHOST].list;
2730
lp = *object_base[SGE_TYPE_JOB].list;
2732
case sgeE_JOB_SCHEDD_INFO_LIST:
2733
lp = *object_base[SGE_TYPE_JOB_SCHEDD_INFO].list;
2735
case sgeE_MANAGER_LIST:
2736
lp = *object_base[SGE_TYPE_MANAGER].list;
2738
case sgeE_NEW_SHARETREE:
2739
lp = *object_base[SGE_TYPE_SHARETREE].list;
2741
case sgeE_OPERATOR_LIST:
2742
lp = *object_base[SGE_TYPE_OPERATOR].list;
2745
lp = *object_base[SGE_TYPE_PE].list;
2747
case sgeE_PROJECT_LIST:
2748
lp = *object_base[SGE_TYPE_PROJECT].list;
2750
case sgeE_CQUEUE_LIST:
2751
lp = *object_base[SGE_TYPE_CQUEUE].list;
2753
case sgeE_SCHED_CONF:
2754
copy_lp = sconf_get_config_list();
2756
case sgeE_SUBMITHOST_LIST:
2757
lp = *object_base[SGE_TYPE_SUBMITHOST].list;
2759
case sgeE_USER_LIST:
2760
lp = *object_base[SGE_TYPE_USER].list;
2762
case sgeE_USERSET_LIST:
2763
lp = *object_base[SGE_TYPE_USERSET].list;
2765
case sgeE_HGROUP_LIST:
2766
lp = *object_base[SGE_TYPE_HGROUP].list;
2769
lp = *object_base[SGE_TYPE_RQS].list;
2772
lp = *object_base[SGE_TYPE_AR].list;
2774
#ifndef __SGE_NO_USERMAPPING__
2775
case sgeE_CUSER_LIST:
2776
lp = *object_base[SGE_TYPE_CUSER].list;
2780
WARNING((SGE_EVENT, MSG_EVE_TOTALUPDATENOTHANDLINGEVENT_I, type));
2785
copy_lp = lCopyListHash(lGetListName(lp), lp, false);
2788
/* 'send_events()' will free the copy of 'lp' */
2789
add_list_event_for_client(id, 0, type, 0, 0, NULL, NULL, NULL,
2794
} /* total_update_event() */
2797
/****** evm/sge_event_master/list_select() *************************************
2799
* list_select() -- makes a reduced duplicate of a job list
2802
* static bool list_select(subscription_t *subscription, int type, lList
2803
* **reduced_lp, lList *lp, const lCondition *selection, const lEnumeration
2804
* *fields, const lDescr *descr, bool do_hash)
2807
* Only works on job events. All others are ignored. The job events
2808
* need some special handling and this is done in this function. The
2809
* JAT_Type list can be subscribed by itself and it is also part
2810
* of the JB_Type. If a JAT_Type filter is set, this function also
2811
* filters the JAT_Type lists in the JB_Type lists.
* The handled event types and their sub-list types are taken from the
* EVENT_LIST, SOURCE_LIST and FIELD_LIST tables.
2814
* subscription_t *subscription - subscription array
2815
* int type - event type
2816
* lList **reduced_lp - target list (has to be an empty list)
2817
* lList *lp - source list (will be modified)
2818
* const lCondition *selection - where filter
2819
* const lEnumeration *fields - what filter
2820
* const lDescr *descr - reduced descriptor
2821
* bool do_hash - create hash tables in the target list
2824
* static bool - true, if it was a job event
2826
*******************************************************************************/
2827
static bool list_select(subscription_t *subscription, int type,
2828
lList **reduced_lp, lList *lp,
2829
const lCondition *selection, const lEnumeration *fields,
2830
const lDescr *descr, bool do_hash)
2836
DENTER(TOP_LAYER, "list_select");
/* find the EVENT_LIST row that contains 'type' */
2838
for (entry_counter = 0; entry_counter < LIST_MAX; entry_counter++) {
2840
while (EVENT_LIST[entry_counter][++event_counter] != -1) {
2841
if (type == EVENT_LIST[entry_counter][event_counter]) {
/* look for a sub-list event type of this row that has a what filter */
2845
while (SOURCE_LIST[entry_counter][++i] != -1) {
2846
if (subscription[SOURCE_LIST[entry_counter][i]].what) {
2847
sub_type = SOURCE_LIST[entry_counter][i];
/* with a sub-list filter, every element is reduced individually by
 * elem_select() */
2852
if (sub_type != -1) {
2853
lListElem *element = NULL;
2854
lListElem *reduced_el = NULL;
2857
*reduced_lp = lCreateListHash("update", descr, do_hash);
2859
for_each(element, lp) {
2860
reduced_el = elem_select(subscription, element,
2861
FIELD_LIST[entry_counter], selection,
2862
fields, descr, sub_type);
2864
lAppendElem(*reduced_lp, reduced_el);
2867
DPRINTF(("no sub type filter specified\n"));
2877
/****** sge_event_master/elem_select() ******************************************
2879
* elem_select() -- makes a reduced copy of an element with reducing sublists
2883
* static lListElem *elem_select(subscription_t *subscription, lListElem *element,
2884
* const int ids[], const lCondition *selection,
2885
* const lEnumeration *fields, const lDescr *dp, int sub_type)
2888
* The function will apply the given filters for the element. Before the element
2889
* is reduced, all attribute sub lists named in "ids" will be removed from the list and
2890
* reduced. The reduced sub lists will be added to the reduced element and the original
2891
* element will be restored. The sub-lists will only be reduced, if the reduced element
2892
* still contains their attributes.
2895
* subscription_t *subscription - subscription array
2896
* lListElem *element - the element to reduce
2897
* const int ids[] - attributes with sublists to be reduced as well,
*                   terminated by -1
2898
* const lCondition *selection - where filter
2899
* const lEnumeration *fields - what filter
2900
* const lDescr *dp - reduced descriptor
2901
* int sub_type - list type of the sublists.
2904
* lListElem* - the reduced element, or NULL if something went wrong
2907
* MT-NOTE: works only on the given variables -> thread safe
* NOTE(review): getDescriptorL() may store a descriptor in
* NOTE(review): subscription[sub_type].descr, so concurrent calls on the
* NOTE(review): same subscription array still need external serialization.
2909
*******************************************************************************/
2910
static lListElem *elem_select(subscription_t *subscription, lListElem *element,
2911
const int ids[], const lCondition *selection,
2912
const lEnumeration *fields, const lDescr *dp, int sub_type)
2914
const lCondition *sub_selection = NULL;
2915
const lEnumeration *sub_fields = NULL;
2916
const lDescr *sub_descr = NULL;
2918
lListElem *el = NULL;
2921
DENTER(TOP_LAYER, "elem_select");
2923
if (element == NULL) {
2927
if (sub_type <= sgeE_ALL_EVENTS || sub_type >= sgeE_EVENTSIZE) {
2928
/* TODO: SG: add error message */
2929
DPRINTF(("wrong event sub type\n"));
2933
/* get the filters for the sub lists */
/* NOTE(review): sgeE_ALL_EVENTS is 0 (see purge_event_list()), so after
 * the range check above this test is presumably always true. */
2934
if (sub_type >= 0) {
2935
sub_selection = subscription[sub_type].where;
2936
sub_fields = subscription[sub_type].what;
2939
if (sub_fields) { /* do we have a sub list filter, otherwise ... */
2942
/* allocate memory to store the sub-lists, which should be handled specially */
2943
while (ids[ids_size] != -1) {
/* NOTE(review): presumably ids_size counts the entries before the -1
 * terminator, so it is 0 for an empty 'ids' (malloc(0)); the malloc()
 * result is also used without a NULL check. */
2946
sub_list = malloc(ids_size * sizeof(lList*));
2947
memset(sub_list, 0 , ids_size * sizeof(lList*));
2949
/* remove the sub-lists from the main element, so that copying/selecting
   the main element below does not process them */
2950
for(counter = 0; counter < ids_size; counter ++) {
2951
lXchgList(element, ids[counter], &(sub_list[counter]));
2954
/* get descriptor for reduced sub-lists */
2956
for(counter = 0; counter < ids_size; counter ++) {
2957
if (sub_list[counter]) {
2958
sub_descr = getDescriptorL(subscription, sub_list[counter], sub_type);
2964
/* copy the main element */
2966
/* there might be no filter for the main element, but for the sub-lists */
2967
el = lCopyElemHash(element, false);
2969
/* for some reason, we did not get a descriptor for the target element */
2970
el = lSelectElemPack(element, selection, fields, false, NULL);
2972
el = lSelectElemDPack(element, selection, dp, fields, false, NULL, NULL);
2975
/* if we have a new reduced main element */
2977
/* copy the sub-lists, if they are still part of the reduced main element */
2978
for (counter = 0; counter < ids_size; counter ++) {
2979
if (sub_list[counter] && (lGetPosViaElem(el, ids[counter], SGE_NO_ABORT) != -1)) {
2980
lSetList(el, ids[counter],
2981
lSelectDPack("", sub_list[counter], sub_selection,
2982
sub_descr, sub_fields, false, NULL, NULL));
2987
/* restore the old sub_list */
2988
for (counter = 0; counter < ids_size; counter ++) {
2989
lXchgList(element, ids[counter], &(sub_list[counter]));
2994
DPRINTF(("no sub filter specified\n"));
2995
el = lSelectElemDPack(element, selection, dp, fields, false, NULL, NULL);
3001
/****** Eventclient/Server/eventclient_list_locate() **************************
3003
* eventclient_list_locate_by_adress() -- search event client by address
3006
* #include "sge_event_master.h"
3009
* eventclient_list_locate_by_adress(const char *host,
3010
* const char *commproc, u_long32 id)
3013
* Searches the event client list for an event client with the
3014
* specified commlib address (host, commproc, commlib id).
3015
* Returns a pointer to the event client object or
3016
* NULL, if no such event client is registered.
* NOTE(review): Event_Master_Control.clients is read without visible
* NOTE(review): locking; presumably the caller must hold
* NOTE(review): Event_Master_Control.mutex, as for get_event_client().
3019
* const char *host - hostname of the event client to search
3020
* const char *commproc - commproc of the event client to search
3021
* u_long32 id - commlib id (EV_commid) of the event client to search
3024
* lListElem* - event client object or NULL.
3028
*******************************************************************************/
3030
eventclient_list_locate_by_adress(const char *host, const char *commproc,
3035
DENTER(TOP_LAYER, "eventclient_list_locate_by_adress");
3037
for_each(ep, Event_Master_Control.clients) {
/* match commlib id, host name (compared with sge_hostcmp()) and commproc.
 * NOTE(review): strcmp() is called on EV_commproc without a NULL check;
 * flush_events() guards the same attribute against NULL, so it may be
 * unset - confirm. */
3038
if (lGetUlong(ep, EV_commid) == id &&
3039
!sge_hostcmp(lGetHost(ep, EV_host), host) &&
3040
!strcmp(lGetString(ep, EV_commproc), commproc)) {
3048
/****** sge_event_master/getDescriptorL() **************************************
3050
* getDescriptorL() -- returns a reduced descriptor
3053
* static const lDescr* getDescriptorL(subscription_t *subscription, const
3054
* lList* list, int type)
* Returns the descriptor of 'list' reduced to the what filter subscribed
* for 'type'. It is computed on first use and cached in
* subscription[type].descr; build_subscription() releases it with
* cull_hash_free_descr() and free().
3060
* subscription_t *subscription - subscription array
3061
* const lList* list - source list
3062
* int type - event type
3065
* static const lDescr* - reduced descriptor or NULL, if no what exists
3068
* MT-NOTE: not thread safe on a shared subscription array: the function
* MT-NOTE: writes the cached descriptor into subscription[type].descr.
3069
*******************************************************************************/
3070
static const lDescr* getDescriptorL(subscription_t *subscription,
3071
const lList* list, int type)
3073
const lDescr *dp = NULL;
3075
if (subscription[type].what) {
/* compute the reduced descriptor only once and cache it */
3076
if (!(dp = subscription[type].descr)) {
3077
subscription[type].descr = lGetReducedDescr(lGetListDescr(list),
3078
subscription[type].what);
3079
dp = subscription[type].descr;
3086
/****** sge_event_master/get_event_client() ************************************
3088
* get_event_client() -- gets the event client from the list
3091
* static lListElem *get_event_client(u_long32 id)
3094
* Returns the event client with the given id, or NULL if no such event
3098
* u_long32 id - the client id
3101
* The event client with the given id, or NULL if no such event client
3105
* MT-NOTE: NOT thread safe. Requires caller to hold Event_Master_Control.mutex.
3106
*******************************************************************************/
3107
static lListElem *get_event_client(u_long32 id)
3109
lListElem *client = NULL;
3111
client = lGetElemUlong(Event_Master_Control.clients, EV_id, id);
3116
/****** sge_event_master/allocate_new_dynamic_id() *******************************
3118
* allocate_new_dynamic_id() -- gets a new dynamic id
3121
* static u_long32 allocate_new_dynamic_id(void)
3124
* Returns the next available dynamic event client id. The id returned will
3125
* be between EV_ID_FIRST_DYNAMIC and Event_Master_Control.max_event_clients +
3126
* EV_ID_FIRST_DYNAMIC.
3129
* The next available dynamic event client id.
3132
* MT-NOTE: allocate_new_dynamic_id() is thread safe,
3133
* when the caller holds Event_Master_Control.mutex.
3134
*******************************************************************************/
3136
allocate_new_dynamic_id(lList **answer_list)
3140
DENTER(TOP_LAYER, "allocate_new_dynamic_id");
3142
if (lGetNumberOfElem(Event_Master_Control.clients) < Event_Master_Control.max_event_clients) {
3143
id = range_list_get_first_id(Event_Master_Control.client_ids, answer_list);
3145
range_list_remove_id(&Event_Master_Control.client_ids, answer_list, id);
3146
/* compress the range list to reduce fragmentation */
3147
range_list_compress(Event_Master_Control.client_ids);
3150
ERROR((SGE_EVENT, MSG_TO_MANY_DYNAMIC_EC_U, sge_u32c( Event_Master_Control.max_event_clients)));
3151
answer_list_add(answer_list, SGE_EVENT, STATUS_ESEMANTIC, ANSWER_QUALITY_ERROR);
3158
free_dynamic_id(lList **answer_list, u_long32 id)
3160
if (id < Event_Master_Control.max_event_clients + EV_ID_FIRST_DYNAMIC) {
3161
range_list_insert_id(&Event_Master_Control.client_ids, answer_list, id);
3163
/* compress the range list to reduce fragmentation */
3164
range_list_compress(Event_Master_Control.client_ids);
3168
/****** sge_event_master/sge_commit() ****************************************
3170
* sge_commit() -- Commit the queued events
3173
* bool sge_commit(void)
3176
* Sends any events that this thread currently has queued and clears the
3180
* Whether the call succeeded.
3183
* MT-NOTE: sge_commit is thread safe.
3184
*******************************************************************************/
3185
bool sge_commit(void)
3189
DENTER(TOP_LAYER, "sge_commit");
3191
/* need a new C block, as the GET_SPECIFIC macro declares new variables */
3193
GET_SPECIFIC(event_master_transaction_t, t_store, sge_event_master_init_transaction_store, Event_Master_Control.transaction_key, "t_store");
3194
if (t_store->is_transaction) {
3195
t_store->is_transaction = false;
3197
if (lGetNumberOfElem(t_store->transaction_requests) > 0) {
3198
sge_mutex_lock("event_master_request_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.request_mutex);
3199
lAppendList(Event_Master_Control.requests, t_store->transaction_requests);
3200
sge_mutex_unlock("event_master_request_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.request_mutex);
3205
WARNING((SGE_EVENT, "attempting to commit an event master transaction, but no transaction is open"));
3213
/****** sge_event_master/blockEvents() ****************************************
3215
* blockEvents() -- blocks or unblocks events
3218
* static void blockEvents(lListElem *event_client, ev_event ev_type, bool isBlock)
3221
* In case global update events have to be sent, this function
3222
* blocks all events for that list, or unblocks it.
3225
* lListElem *event_client : event client to modify
3226
* ev_event ev_type : event type whose list events are blocked/unblocked,
* or sgeE_ALL_EVENTS for all subscribed list events
3227
* bool isBlock : true: block events, false: unblock
3230
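* MT-NOTE: blockEvents() writes to the event client's subscription array;
* thread safety presumably relies on the caller holding
* Event_Master_Control.mutex -- confirm.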
3231
*******************************************************************************/
3232
static void blockEvents(lListElem *event_client, ev_event ev_type, bool isBlock) {
3233
subscription_t *sub_array = lGetRef(event_client, EV_sub_array);
3235
if (sub_array != NULL) {
3237
if (ev_type == sgeE_ALL_EVENTS) { /* block all subscribed events, for which are list events subscribed */
3238
while (total_update_events[++i] != -1) {
3239
if (sub_array[total_update_events[i]].subscription == EV_SUBSCRIBED) {
3241
while (block_events[i][++y] != -1) {
3242
sub_array[block_events[i][y]].blocked = isBlock;
3247
while (total_update_events[++i] != -1) {
3248
if (total_update_events[i] == ev_type) {
3250
while (block_events[i][++y] != -1) {
3251
sub_array[block_events[i][y]].blocked = isBlock;
3260
/****** sge_event_master/set_flush() *******************************************
3262
* set_flush() -- Flush all events
3265
* void set_flush(void)
3268
* Flushes all pending events
3271
* MT-NOTE: set_flush is thread safe.
3272
*******************************************************************************/
3273
static void set_flush(void)
3275
DENTER(TOP_LAYER, "set_flush");
3277
sge_mutex_lock("event_master_cond_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.cond_mutex);
3278
if (!Event_Master_Control.delivery_signaled) {
3279
Event_Master_Control.delivery_signaled = true;
3280
pthread_cond_signal(&Event_Master_Control.cond_var);
3282
sge_mutex_unlock("event_master_cond_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.cond_mutex);
3287
/****** sge_event_master/sge_set_commit_required() *****************************
3289
* sge_set_commit_required() -- Require commits (make multiple object changes atomic)
3292
* void sge_set_commit_required()
3295
* Enables transactions on events. So far a rollback is not supported. It allows to accumulate
3296
* events, while multiple objects are modified and events are issued and to submit them as
3297
* one event package. There can only be one event session open at a time. The transaction_mutex
3298
* will block multiple calls to this method.
3299
* The method cannot be called recursively, and sge_commit has to be called to close the transaction.
3303
* MT-NOTE: sge_set_commit_required is thread safe. Transactional event
3304
* processing is handled for each thread individually.
3305
*******************************************************************************/
3306
void sge_set_commit_required(void)
3308
DENTER(TOP_LAYER,"sge_set_commit_required");
3310
/* need a new C block, as the GET_SPECIFIC macro declares new variables */
3312
GET_SPECIFIC(event_master_transaction_t, t_store, sge_event_master_init_transaction_store, Event_Master_Control.transaction_key, "t_store");
3313
if (t_store->is_transaction) {
3314
WARNING((SGE_EVENT, "attempting to open a new event master transaction, but we already have a transaction open"));
3316
t_store->is_transaction = true;
3323
void sge_event_master_process_requests(monitoring_t *monitor)
3325
lList *requests = NULL;
3327
DENTER(TOP_LAYER, "sge_event_master_process_requests");
3330
* get the request list
3331
* put a new empty list in place to allow new requests while we process the old ones
3333
sge_mutex_lock("event_master_request_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.request_mutex);
3335
if (lGetNumberOfElem(Event_Master_Control.requests) > 0) {
3336
requests = Event_Master_Control.requests;
3337
Event_Master_Control.requests = lCreateListHash("Event Master Requests", EVR_Type, false);
3340
sge_mutex_unlock("event_master_request_mutex", SGE_FUNC, __LINE__, &Event_Master_Control.request_mutex);
3342
/* if there have been any requests - process them */
3343
if (requests != NULL) {
3344
lListElem *request = NULL;
3346
while ((request = lFirst(requests)) != NULL) {
3347
DPRINTF(("processing event master request: %d\n", lGetUlong(request, EVR_operation)));
3348
switch (lGetUlong(request, EVR_operation)) {
3350
sge_event_master_process_add_event_client(request, monitor);
3353
sge_event_master_process_mod_event_client(request, monitor);
3358
sge_event_master_process_send(request, monitor);
3361
sge_event_master_process_ack(request, monitor);
3365
lRemoveElem(requests, &request);
3368
lFreeList(&requests);