2
* Copyright (C) 2006-2009 Citrix Systems Inc.
4
* This program is free software; you can redistribute it and/or modify
5
* it under the terms of the GNU Lesser General Public License as published
6
* by the Free Software Foundation; version 2.1 only. with the special
7
* exception on linking described in file LICENSE.
9
* This program is distributed in the hope that it will be useful,
10
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12
* GNU Lesser General Public License for more details.
20
(* Instance of the project's Debug.Debugger functor named "xapi_event";
   presumably this file's logger (confirm against the Debug module). *)
module D = Debug.Debugger (struct let name = "xapi_event" end)
23
(** Hook that returns the messages newer than a given timestamp, together with
    the timestamp to resume from next time. It is a reference so that another
    module can install the real implementation (TODO confirm where it is set).
    The default reports no messages and timestamp 0.0. Event.from reads it when
    the message class is subscribed to. *)
let message_get_since_for_events
  : (__context:Context.t -> float -> (float * (API.ref_message * API.message_t) list)) ref
  = ref (fun ~__context _since ->
      ignore __context;
      (0.0, []))
25
(** Maximum number of young events kept in the event queue. Beyond this count,
    the events with the lowest IDs are discarded when the queue is next
    garbage-collected. *)
let max_stored_events = 500

(** Maximum age, in seconds, of an event kept in the event queue (15 minutes). *)
let max_event_age = 900.
30
(** Ordered list of events, newest first *)
32
(** Monotonically increasing event ID. One higher than the highest event ID in the queue *)
34
(** Intended to hold the highest ID among the events discarded by queue garbage
    collection. Readers compare a requested start ID against it to tell the
    client that events were lost. It is [-1L] until anything has been
    discarded. *)
let highest_forgotten_id : int64 ref = ref (-1L)
37
(** Types used to store user event subscriptions: ***********************************************)
(* NOTE(review): the [type subscription =] header line for the constructors
   below is not present in this file as shown. The type is referred to as
   [subscription] by the subs field of subscription_record. *)
39
| Class of string (** subscribe to all events for objects of this class; subscription_of_string stores the name lower-cased *)
40
| All (** subscribe to everything *)
42
(** Parse one class name sent by a client. A star means every class; any other
    name subscribes to that single class, stored lower-cased. *)
let subscription_of_string = function
  | "*" -> All
  | name -> Class (String.lowercase name)
44
(** [event_matches subs ty] is true when [subs] contains [All], or contains the
    class [ty] (compared after lower-casing [ty]). *)
let event_matches subs ty =
  let wanted = Class (String.lowercase ty) in
  List.exists (fun sub -> sub = All || sub = wanted) subs
46
(** Per-session event-subscription state. Records are created on demand by
    get_subscription (used by register, unregister, next and from) and kept in
    the subscriptions table. *)
47
type subscription_record = {
48
mutable last_id: int64; (** first event ID not yet returned by Event.next, i.e. the start of the next range it reads *)
49
mutable last_timestamp : float; (** message timestamp from which Event.from fetches newer messages; initially the record creation time *)
50
mutable last_generation : int64; (** database generation after which Event.from looks for object changes *)
51
mutable cur_id: int64; (** Most current generation count relevant to the client - only used in new events mechanism *)
52
mutable subs: subscription list; (** list of all the subscriptions (entries are prepended and may be duplicated) *)
53
m: Mutex.t; (** protects access to the mutable fields in this record. NOTE(review): not every writer takes it; event_add updates cur_id under event_lock only, and from updates timeout, last_timestamp and last_generation without it *)
54
session: API.ref_session; (** session which owns this subscription *)
55
mutable session_invalid: bool; (** set to true if the associated session has been deleted *)
56
mutable timeout: float; (** absolute deadline (a Unix.gettimeofday time) of the current Event.from call *)
(* NOTE(review): the closing brace of this record is not present in this file as shown. *)
60
(** Thrown if the user requests events which we don't have because we've thrown
    them away. This should only happen if the client (or network) becomes
    unresponsive for the max_event_age interval, or if more than
    max_stored_events are produced between successive calls to Event.next ().
    The client should refresh all its state manually before calling
    Event.next () again.
    @raise Api_errors.Server_error always, with code [Api_errors.events_lost]
    and no parameters. *)
let events_lost () = raise (Api_errors.Server_error (Api_errors.events_lost, []))
68
(** Generation count recorded in the manifest of the current database. *)
let get_current_event_number () =
  let database = Db_ref.get_database (Db_backend.make ()) in
  let manifest = Db_cache_types.Database.manifest database in
  Db_cache_types.Manifest.generation manifest
71
(* Subscription record of every session that has one, keyed by session ref.
   Documented as guarded by [event_lock]. *)
let subscriptions = Hashtbl.create 10

(* Protects the global event queue reference and the [subscriptions] table. *)
let event_lock : Mutex.t = Mutex.create ()

(* Broadcast when events are queued, when a session is deleted, and by the
   Event.from timeout jobs. Waiters hold [event_lock]. *)
let newevents : Condition.t = Condition.create ()
78
(** This function takes a set of events (requested by the client) and removes
79
redundancies (like multiple Mods of the same object or Mod of a deleted object).
80
NB we cannot safely remove object Deletes because a client might receive the Add event
81
in one call and a Del event in a subsequent call.
82
NB events are stored in reverse order. *)
(* NOTE(review): the only call site in this file (inside event_add) is commented
   out, so this function is currently unused. Parts of its body are not present
   in this file as shown:
   - the List.map below shows only its Mod case, and its [if] has no
     [then]/[else];
   - the fold's [else] has no matching [if];
   - no result expression follows the final [let ... in]. *)
83
let coalesce_events events =
84
(* We're not trying to be super-efficient here, we scan the list multiple times.
85
Let's keep the queue short. *)
86
(* [refs_of_op op events]: references of all events in [events] whose op is [op]. *)
let refs_of_op op events = List.concat
87
(List.map (function { op = op'; reference = reference } ->
88
if op = op' then [ reference ] else []) events) in
90
(* Redundant events are first turned into Dummy copies and only removed by the
   filter at the end (see the debugging note there). *)
let dummy x = { x with op = Dummy } in
92
(* If an object has been deleted, remove any Modification events *)
93
let all_dead = refs_of_op Del events in
94
let events' = List.map (function { op = Mod; reference = reference } as x ->
95
if List.mem reference all_dead
99
(* If one Mod event has been seen, remove the rest (we keep the latest, i.e.
100
the first one in the list, since the list is reversed) *)
101
(* [already_seen] accumulates references already encountered in this pass;
   later events for them become Dummy. *)
let events' = List.rev (snd (List.fold_left
102
(fun (already_seen, acc) x ->
104
let x = if List.mem x.reference already_seen then dummy x else x in
105
x.reference :: already_seen, x :: acc
106
else already_seen, x :: acc) ([], []) events')) in
108
(* For debugging we may wish to keep the dummy events so we can account for
109
every event ID. However normally we want to zap them. *)
110
let events' = List.filter (fun ev -> ev.op <> Dummy) events' in
111
(* debug "Removed %d redundant events" (List.length events - (List.length events')); *)
114
(* Records a database event: operation [op] (a string parsed by op_of_string)
   on object [reference] of class [ty], with an optional [snapshot] of the
   object.
   The event is queued only if some session subscription matches [ty]; in that
   case the global ID counter is also bumped and [newevents] broadcast. Each
   matching subscription also gets its [cur_id] refreshed to the current
   database generation.
   The queue is then garbage-collected by age (max_event_age) and length
   (max_stored_events), and [highest_forgotten_id] is updated.
   NOTE(review): several lines are not present in this file as shown, so the
   definition does not compile as shown. Missing are:
   - the end of gen_events_for;
   - the [else] after the early [()];
   - the closure passed to [Mutex.execute event_lock];
   - the closure header of the Hashtbl.fold;
   - the branch keywords of the GC [if];
   - any assignment of [to_keep] back to [queue]. *)
let event_add ?snapshot ty op reference =
116
let gen_events_for tbl =
(* NOTE(review): this rebuilds the list of event-generating class names from
   the datamodel on every call, independently of the event being added. It
   could be computed once at module level. *)
117
let objs = List.filter (fun x->x.Datamodel_types.gen_events) (Dm_api.objects_of_api Datamodel.all_api) in
118
let objs = List.map (fun x->x.Datamodel_types.name) objs in
121
if not (gen_events_for ty) then ()
125
(* [ts] is taken before [event_lock] is acquired. It serves both as the event
   timestamp and as the current time for the age-based GC below. *)
let ts = Unix.time () in
126
let op = op_of_string op in
128
Mutex.execute event_lock
130
let ev = { id = !id; ts = ts; ty = String.lowercase ty; op = op; reference = reference;
131
snapshot = snapshot } in
133
(* NOTE(review): [s.cur_id] is written here while holding [event_lock] only,
   not [s.m], although the record documents [m] as protecting its mutable
   fields. *)
let matches_anything = Hashtbl.fold
135
if event_matches s.subs ty
136
then (s.cur_id <- get_current_event_number (); true)
137
else acc) subscriptions false in
138
if matches_anything then begin
139
queue := ev :: !queue;
140
(* debug "Adding event %Ld: %s" (!id) (string_of_event ev); *)
141
id := Int64.add !id Int64.one;
142
Condition.broadcast newevents;
144
(* debug "Dropping event %s" (string_of_event ev) *)
147
(* Remove redundant events from the queue *)
148
(* queue := coalesce_events !queue;*)
150
(* GC the events in the queue *)
151
let young, old = List.partition (fun ev -> ts -. ev.ts <= max_event_age) !queue in
152
let too_many = List.length young - max_stored_events in
153
let to_keep, to_drop = if too_many <= 0 then young, old
155
(* Reverse-sort by ID and preserve the first 'max_stored_events' *)
157
List.chop max_stored_events (List.sort (fun a b -> compare b.id a.id) young) in
158
lucky, old @ unlucky in
160
(* Remember the highest ID of the list of events to drop *)
(* NOTE(review): when both [old] and [unlucky] are non-empty, [List.hd to_drop]
   is the newest expired event, not the highest dropped ID. The IDs in
   [unlucky] are normally higher. [highest_forgotten_id] can then be set too
   low, and events_read may fail to report that events were lost. Taking the
   maximum [id] over [to_drop] would be safer. *)
161
if to_drop <> [] then
162
highest_forgotten_id := (List.hd to_drop).id;
163
(* debug "After event queue GC: keeping %d; dropping %d (highest dropped id = %Ld)"
164
(List.length to_keep) (List.length to_drop) !highest_forgotten_id *)
169
(** Passes [event_add] to Db_action_helper.events_register as the event
    callback (presumably invoked on database writes; confirm in
    Db_action_helper). *)
let register_hooks () =
  let hook = event_add in
  Db_action_helper.events_register hook
172
(** Return the subscription associated with a session, or create a new blank one if none
173
has yet been created. *)
(* The lookup and insertion are done under [event_lock].
   A new record starts with [last_id = !id], so Event.next only returns events
   queued after the record was created. It has no classes, [last_generation]
   and [cur_id] of 0L, [timeout] 0.0, and [last_timestamp] set to the current
   time.
   NOTE(review): three pieces are not present in this file as shown: the
   closure argument of [Mutex.execute], the [else] before [let subscription],
   and the returned value. *)
174
let get_subscription ~__context =
175
let session = Context.get_session_id __context in
176
Mutex.execute event_lock
178
if Hashtbl.mem subscriptions session then Hashtbl.find subscriptions session
180
let subscription = { last_id = !id; last_timestamp=(Unix.gettimeofday ()); last_generation=0L; cur_id = 0L; subs = []; m = Mutex.create(); session = session; session_invalid = false; timeout=0.0; } in
181
Hashtbl.replace subscriptions session subscription;
184
(** Raises an exception if the provided session has not already registered for some events *)
(* This only checks that the session has an entry in [subscriptions]. An entry
   with no classes passes, e.g. one created through get_subscription by
   unregister or from.
   @raise Api_errors.Server_error [session_not_registered], with the session's
   track id as parameter.
   NOTE(review): the closure expected after [Mutex.execute event_lock] is not
   opened in this file as shown, although its closing parenthesis is. *)
185
let assert_subscribed ~__context =
186
let session = Context.get_session_id __context in
187
Mutex.execute event_lock
189
if not(Hashtbl.mem subscriptions session)
190
then raise (Api_errors.Server_error(Api_errors.session_not_registered, [ Context.trackid_of_session (Some session) ])))
192
(** Register an interest in events generated on objects of the given [classes]
    (a star meaning every class). The new entries are prepended to the calling
    session's subscription list under the record's mutex, and the record is
    created if needed. Existing entries, including duplicates, are kept. *)
let register ~__context ~classes =
  let to_add =
    List.map (fun name -> subscription_of_string (String.lowercase name)) classes
  in
  let record = get_subscription ~__context in
  Mutex.execute record.m (fun () -> record.subs <- to_add @ record.subs)
199
(** Unregister interest in events generated on objects of class <class_name> *)
(* Removes every occurrence of each given class (or of All, for a star) from
   the session's subscription list. Through get_subscription, this also creates
   an empty subscription record if the session had none.
   NOTE(review): the call that the closure below is passed to (presumably
   [Mutex.execute sub.m]) is not present in this file as shown. *)
200
let unregister ~__context ~classes =
201
let subs = List.map subscription_of_string (List.map String.lowercase classes) in
202
let sub = get_subscription ~__context in
204
(fun () -> sub.subs <- List.filter (fun x -> not(List.mem x subs)) sub.subs)
206
(** Is called by the session timeout code *)
(* Also called at the end of from. Under [event_lock], this:
   - unregisters the session from the database backend;
   - if the session has a subscription, marks it invalid under its own mutex,
     removes it from [subscriptions], and broadcasts [newevents] so that
     threads looping in wait/wait2 notice.
   Lock order is [event_lock] then [sub.m], as in wait.
   NOTE(review): the closure after [Mutex.execute event_lock] and the closing
   [end] of the [begin] are not present in this file as shown. *)
207
let on_session_deleted session_id = Mutex.execute event_lock
209
(* Unregister this session if it is associated with an imported DB. *)
210
Db_backend.unregister_session session_id;
211
if Hashtbl.mem subscriptions session_id then begin
212
let sub = Hashtbl.find subscriptions session_id in
213
(* Mark the subscription as invalid and wake everyone up *)
214
Mutex.execute sub.m (fun () -> sub.session_invalid <- true);
215
Hashtbl.remove subscriptions session_id;
216
Condition.broadcast newevents;
219
(** Whether [subscription]'s session has been deleted, read under the record's
    own mutex. *)
let session_is_invalid subscription =
  let read_flag () = subscription.session_invalid in
  Mutex.execute subscription.m read_flag
221
(* Holds [event_lock], except while waiting on [newevents], until the global
   event ID counter differs from [from_id] or the subscription's session has
   been invalidated.
   @raise Api_errors.Server_error [session_invalid], with the session ref, if
   the session was invalidated.
   NOTE(review): the doc comment below is not terminated in this file as shown,
   so the following definition is lexed as comment text. Also missing are the
   closure after [Mutex.execute event_lock] and the code that sets and returns
   [result]. next treats the result as an int64. *)
(** Blocks the caller until the current ID has changed OR the session has been
223
let wait subscription from_id =
224
let result = ref 0L in
225
Mutex.execute event_lock
227
(* NB we occasionally grab the specific session lock while holding the general lock *)
228
while !id = from_id && not (session_is_invalid subscription) do Condition.wait newevents event_lock done;
230
if session_is_invalid subscription
231
then raise (Api_errors.Server_error(Api_errors.session_invalid, [ Ref.string_of subscription.session ]))
234
(* Event.from counterpart of wait. Under [event_lock], it waits until one of
   these holds:
   - [subscription.cur_id] differs from [from_id];
   - the session is invalidated;
   - the absolute deadline [subscription.timeout] (a Unix.gettimeofday time)
     has passed.
   Before waiting, a one-shot scheduler job is queued to broadcast [newevents]
   about 0.5s after the deadline, so the condition wait does not outlive the
   timeout by much. The job is removed after the wait. Its name depends only on
   the session ref.
   @raise Api_errors.Server_error [session_invalid] if the session was invalidated.
   NOTE(review): the closure opener, [do] and [done] of the locked loop are not
   present in this file as shown. *)
let wait2 subscription from_id =
235
let timeoutname = Printf.sprintf "event_from_timeout_%s" (Ref.string_of subscription.session) in
236
Mutex.execute event_lock
238
while from_id = subscription.cur_id && not (session_is_invalid subscription) && Unix.gettimeofday () < subscription.timeout
240
Xapi_periodic_scheduler.add_to_queue timeoutname Xapi_periodic_scheduler.OneShot (subscription.timeout -. Unix.gettimeofday () +. 0.5) (fun () -> Condition.broadcast newevents);
241
Condition.wait newevents event_lock;
242
Xapi_periodic_scheduler.remove_from_queue timeoutname
245
if session_is_invalid subscription
246
then raise (Api_errors.Server_error(Api_errors.session_invalid, [ Ref.string_of subscription.session ]))
249
(** Internal function to return a list of events between a start and an end ID.
250
We assume that our 64bit counter never wraps. *)
(* Returns the queued events with [id_start <= id < id_end], oldest first.
   @raise Api_errors.Server_error [events_lost] (through events_lost) if
   [!highest_forgotten_id >= id_start], i.e. events from that range may already
   have been garbage-collected. The check is made under [event_lock], but the
   error is raised after the lock is released.
   NOTE(review): the closure opener after [Mutex.execute event_lock] is not
   present in this file as shown. *)
251
let events_read id_start id_end =
252
let check_ev ev = id_start <= ev.id && ev.id < id_end in
254
let some_events_lost = ref false in
255
let selected_events = Mutex.execute event_lock
257
some_events_lost := !highest_forgotten_id >= id_start;
258
List.find_all (fun ev -> check_ev ev) !queue) in
259
(* Note we may actually retrieve fewer events than we expect because the
260
queue may have been coalesced. *)
261
if !some_events_lost (* is true *) then events_lost ();
263
(* NB queue is kept in reverse order *)
264
List.rev selected_events
266
(** Blocking call which returns the next set of events relevant to this session. *)
(* Requires a prior register, checked by assert_subscribed.
   Holding both [event_lock] and the subscription mutex, it claims the ID range
   starting at the subscription's [last_id] and bumps [last_id] to the current
   global ID. While the claimed range is empty it blocks in wait.
   It then reads those events and keeps the ones matching the session's
   classes. If none match, it starts over by calling itself (a tail call).
   Otherwise it returns them as an XML-RPC array.
   @raise Api_errors.Server_error from the helpers it calls: session not
   registered, events lost, or session invalid.
   NOTE(review): the comment opened below is not terminated in this file as
   shown. The definition header of grab_range and the closure opener of the
   inner [Mutex.execute] are not present either. *)
267
let rec next ~__context =
268
assert_subscribed ~__context;
270
let subscription = get_subscription ~__context in
272
(* Return a <from_id, end_id> exclusive range that is guaranteed to be specific to this
273
thread. Concurrent calls will grab wholly disjoint ranges. Note the range might be
276
(* Briefly hold both the general and the specific mutex *)
277
Mutex.execute event_lock
278
(fun () -> Mutex.execute subscription.m
280
let last_id = subscription.last_id in
281
(* Bump our last_id counter: these events don't have to be looked at again *)
282
subscription.last_id <- !id ;
284
(* Like grab_range () only guarantees to return a non-empty range by blocking if necessary *)
285
let rec grab_nonempty_range () =
286
let last_id, end_id = grab_range () in
287
if last_id = end_id then begin
288
let (_: int64) = wait subscription end_id in
289
grab_nonempty_range ()
290
end else last_id, end_id in
292
let last_id, end_id = grab_nonempty_range () in
293
(* debug "next examining events in range %Ld <= x < %Ld" last_id end_id; *)
294
(* Are any of the new events interesting? *)
295
let events = events_read last_id end_id in
296
let subs = Mutex.execute subscription.m (fun () -> subscription.subs) in
297
let relevant = List.filter (fun ev -> event_matches subs ev.ty) events in
298
(* debug "number of relevant events = %d" (List.length relevant); *)
299
if relevant = [] then next ~__context
300
else XMLRPC.To.array (List.map xmlrpc_of_event relevant)
302
(* Implements Event.from.
   Token: [token] has the form <generation>,<message timestamp>, as built for
   the token field at the end. A malformed token is logged with warn.
   Setup: the classes are added to the calling session's subscription, and the
   deadline [Unix.gettimeofday () +. timeout] is stored in [sub.timeout].
   Polling: grab_nonempty_range repeatedly scans the database under Db_lock.
   It looks for objects created, modified or deleted after
   [sub.last_generation], plus messages newer than [sub.last_timestamp], until
   something turns up or the deadline passes.
   Result: an XML-RPC struct with three fields:
   - events: create, mod and delete events whose class matches this session's
     classes, plus the fetched messages. Messages are fetched when any
     session's classes match the message class, and no per-session filter is
     visible for them.
   - valid_ref_counts: per-table object counts.
   - token: the new token.
   NOTE(review): many lines of this definition are not present in this file as
   shown (e.g. the token-parsing match arms, the grab_range header, and the
   record literals of each event), so it does not compile as shown. *)
let from ~__context ~classes ~token ~timeout =
305
match String.split ',' token with
307
(Int64.of_string from, float_of_string from_t)
310
warn "Bad format passed to Event.from: %s" token;
316
(* Temporarily create a subscription for the duration of this call *)
317
let subs = List.map subscription_of_string (List.map String.lowercase classes) in
318
let sub = get_subscription ~__context in
320
(* NOTE(review): these fields are written without [sub.m] or [event_lock],
   while wait2 reads cur_id and timeout under [event_lock]. *)
sub.timeout <- Unix.gettimeofday () +. timeout;
322
sub.last_timestamp <- from_t;
323
sub.last_generation <- from;
325
(* NOTE(review): the classes are prepended on every call, so [sub.subs] keeps
   growing across calls unless the on_session_deleted call at the end discards
   the record. Its guarding lines are not visible. *)
Mutex.execute sub.m (fun () -> sub.subs <- subs @ sub.subs);
327
let all_event_tables =
328
let objs = List.filter (fun x->x.Datamodel_types.gen_events) (Dm_api.objects_of_api Datamodel.all_api) in
329
let objs = List.map (fun x->x.Datamodel_types.name) objs in
333
(* NOTE(review): this folds over the global [subscriptions] table while holding
   only [sub.m], not [event_lock], which is the lock documented as protecting
   that table. The tables scanned therefore depend on every session's classes;
   per-event filtering below uses [sub.subs]. *)
let all_subs = Mutex.execute sub.m (fun () -> Hashtbl.fold (fun _ s acc -> s.subs @ acc) subscriptions []) in
334
let tables = List.filter (fun table -> event_matches all_subs table) all_event_tables in
336
(* NOTE(review): this local shadows the module-level events_lost function for
   the rest of this definition. *)
let events_lost = ref [] in
339
let tableset = Db_cache_types.Database.tableset (Db_ref.get_database t) in
340
let (timestamp,messages) =
341
if event_matches all_subs "message" then (!message_get_since_for_events) ~__context sub.last_timestamp else (0.0, []) in
342
(timestamp, messages, tableset, List.fold_left
344
Db_cache_types.Table.fold_over_recent sub.last_generation
345
(fun ctime mtime dtime objref (creates,mods,deletes,last) ->
(* NOTE(review): two info-level log lines per recently changed object; looks
   like leftover debugging. Consider the debug level. *)
346
info "last_generation=%Ld cur_id=%Ld" sub.last_generation sub.cur_id;
347
info "ctime: %Ld mtime:%Ld dtime:%Ld objref:%s" ctime mtime dtime objref;
348
let last = max last (max mtime dtime) in (* mtime guaranteed to always be larger than ctime *)
349
if dtime > 0L then begin
350
if ctime > sub.last_generation then
351
(creates,mods,deletes,last) (* It was created and destroyed since the last update *)
353
(creates,mods,(table, objref, dtime)::deletes,last) (* It might have been modified, but we can't tell now *)
355
((if ctime > sub.last_generation then (table, objref, ctime)::creates else creates),
356
(if mtime > sub.last_generation then (table, objref, mtime)::mods else mods),
359
) (fun () -> events_lost := table :: !events_lost) (Db_cache_types.TableSet.find table tableset) acc
360
) ([],[],[],sub.last_generation) tables)
363
let rec grab_nonempty_range () =
364
let (timestamp, messages, tableset, (creates,mods,deletes,last)) as result = Db_lock.with_lock (fun () -> grab_range (Db_backend.make ())) in
365
(* Comparing each list with [] would avoid computing full list lengths here. *)
if List.length creates = 0 && List.length mods = 0 && List.length deletes = 0 && List.length messages = 0 && Unix.gettimeofday () < sub.timeout
368
debug "Waiting more: timeout=%f now=%f" sub.timeout (Unix.gettimeofday ());
370
sub.last_generation <- last; (* Cur_id was bumped, but nothing relevant fell out of the db. Therefore the *)
371
sub.last_timestamp <- timestamp;
372
sub.cur_id <- last; (* last id the client got is equivalent to the current one *)
375
grab_nonempty_range ())
380
let (timestamp, messages, tableset, (creates,mods,deletes,last)) = grab_nonempty_range () in
382
sub.last_generation <- last;
383
sub.last_timestamp <- timestamp;
385
let delevs = List.fold_left (fun acc (table, objref, dtime) ->
386
if event_matches sub.subs table then begin
389
ty=String.lowercase table;
397
let modevs = List.fold_left (fun acc (table, objref, mtime) ->
398
if event_matches sub.subs table then begin
399
let serialiser = Eventgen.find_get_record table in
400
let xml = serialiser ~__context ~self:objref () in
403
ty=String.lowercase table;
411
let createevs = List.fold_left (fun acc (table, objref, ctime) ->
412
if event_matches sub.subs table then begin
413
let serialiser = Eventgen.find_get_record table in
414
let xml = serialiser ~__context ~self:objref () in
417
ty=String.lowercase table;
425
let message_events = List.fold_left (fun acc (_ref,message) ->
426
let objref = Ref.string_of _ref in
427
let xml = API.To.message_t message in
433
snapshot=Some xml } in
434
ev::acc) createevs messages in
436
(* Number of rows in each table of the scanned tableset, keyed by lower-cased table name. *)
let valid_ref_counts =
438
(Db_cache_types.TableSet.fold
439
(fun tablename _ _ table acc ->
440
(String.lowercase tablename, XMLRPC.To.int
441
(Db_cache_types.Table.fold
442
(fun r _ _ _ acc -> Int32.add 1l acc) table 0l))::acc)
446
let session = Context.get_session_id __context in
448
on_session_deleted session;
450
XMLRPC.To.structure [("events",XMLRPC.To.array (List.map xmlrpc_of_event message_events));
451
("valid_ref_counts",valid_ref_counts);
452
("token",XMLRPC.To.string (Printf.sprintf "%Ld,%f" last timestamp))
455
(** Current value of the global event ID counter, read under [event_lock].
    [__context] is unused. *)
let get_current_id ~__context =
  let read_counter () = !id in
  Mutex.execute event_lock read_counter
457
(** Inject an unnecessary update as a heartbeat. This will:
458
1. hopefully prevent some firewalls from silently closing the connection
459
2. allow the server to detect when a client has failed *)
(* Emits a mod event on the pool object, carrying a fresh snapshot of its
   record, through event_add. It does nothing if there is no pool object yet.
   Exceptions appear to be caught and logged at error level: the handler body
   is visible, but not its try/with lines.
   NOTE(review): the lines that take the database lock, [try],
   [match pool with Some pool ->] and the exception-handler head are not
   present in this file as shown.
   NOTE(review): [with _ -> None] around Helpers.get_pool swallows every
   exception, not only the no-pool case. *)
460
let heartbeat ~__context =
464
(* We must hold the database lock since we are sending an update for a real object
465
and we don't want to accidentally transmit an older snapshot. *)
466
let pool = try Some (Helpers.get_pool ~__context) with _ -> None in
469
let pool_r = Db.Pool.get_record ~__context ~self:pool in
470
let pool_xml = API.To.pool_t pool_r in
471
event_add ~snapshot:pool_xml "pool" "mod" (Ref.string_of pool)
472
| None -> () (* no pool object created during initial boot *)
475
error "Caught exception sending event heartbeat: %s" (ExnHelper.string_of_exn e)