~ubuntu-branches/ubuntu/quantal/xen-api/quantal

« back to all changes in this revision

Viewing changes to ocaml/xapi/xapi_event.ml

  • Committer: Package Import Robot
  • Author(s): Jon Ludlam
  • Date: 2011-07-07 21:50:18 UTC
  • Revision ID: package-import@ubuntu.com-20110707215018-3t9ekbh7qy5y2b1p
Tags: upstream-1.3
ImportĀ upstreamĀ versionĀ 1.3

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
(*
 
2
 * Copyright (C) 2006-2009 Citrix Systems Inc.
 
3
 *
 
4
 * This program is free software; you can redistribute it and/or modify
 
5
 * it under the terms of the GNU Lesser General Public License as published
 
6
 * by the Free Software Foundation; version 2.1 only. with the special
 
7
 * exception on linking described in file LICENSE.
 
8
 *
 
9
 * This program is distributed in the hope that it will be useful,
 
10
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
11
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
12
 * GNU Lesser General Public License for more details.
 
13
 *)
 
14
open Printf
 
15
open Threadext
 
16
open Listext
 
17
open Event_types
 
18
open Stringext
 
19
 
 
20
module D=Debug.Debugger(struct let name="xapi_event" end)
 
21
open D
 
22
 
 
23
let message_get_since_for_events : (__context:Context.t -> float -> (float * (API.ref_message * API.message_t) list)) ref = ref ( fun ~__context _ -> ignore __context; (0.0, []))
 
24
 
 
25
(** Limit the event queue to this many events: *)
 
26
let max_stored_events = 500
 
27
(** Limit the maximum age of an event in the event queue to this value in seconds: *)
 
28
let max_event_age = 15. *. 60. (* 15 minutes *)
 
29
 
 
30
(** Ordered list of events, newest first *)
 
31
let queue = ref []
 
32
(** Monotonically increasing event ID. One higher than the highest event ID in the queue *)
 
33
let id = ref 0L 
 
34
(** When we GC events we track how many we've deleted so we can send an error to the client *)
 
35
let highest_forgotten_id = ref (-1L)
 
36
 
 
37
(** Types used to store user event subscriptions: ***********************************************)
 
38
type subscription = 
 
39
    | Class of string (** subscribe to all events for objects of this class *)
 
40
    | All             (** subscribe to everything *)
 
41
 
 
42
let subscription_of_string x = if x = "*" then All else Class (String.lowercase x)
 
43
 
 
44
let event_matches subs ty = List.mem All subs || (List.mem (Class (String.lowercase ty)) subs)
 
45
 
 
46
(** Every session that calls 'register' gets a subscription*)
 
47
type subscription_record = {
 
48
        mutable last_id: int64;           (** last event ID to sent to this client *)
 
49
        mutable last_timestamp : float;   (** Time at which the last event was sent (for messages) *)
 
50
        mutable last_generation : int64;  (** Generation count of the last event *)
 
51
        mutable cur_id: int64;            (** Most current generation count relevant to the client - only used in new events mechanism *)
 
52
        mutable subs: subscription list;  (** list of all the subscriptions *)
 
53
        m: Mutex.t;                       (** protects access to the mutable fields in this record *)
 
54
        session: API.ref_session;         (** session which owns this subscription *)
 
55
        mutable session_invalid: bool;    (** set to true if the associated session has been deleted *)
 
56
        mutable timeout: float;           (** Timeout *)
 
57
}
 
58
 
 
59
 
 
60
(** Thrown if the user requests events which we don't have because we've thrown
 
61
    then away. This should only happen if the client (or network) becomes unresponsive
 
62
    for the max_event_age interval or if more than max_stored_events are produced 
 
63
    between successive calls to Event.next (). The client should refresh all its state
 
64
    manually before calling Event.next () again.
 
65
*)
 
66
let events_lost () = raise (Api_errors.Server_error (Api_errors.events_lost, []))
 
67
 
 
68
let get_current_event_number () =
 
69
  (Db_cache_types.Manifest.generation (Db_cache_types.Database.manifest (Db_ref.get_database (Db_backend.make ()))))
 
70
 
 
71
(* Mapping of session IDs to lists of subscribed classes *)
 
72
let subscriptions = Hashtbl.create 10
 
73
 
 
74
(* Lock protects the global event queue reference and the subscriptions hashtable *)
 
75
let event_lock = Mutex.create ()
 
76
let newevents = Condition.create ()
 
77
 
 
78
(** This function takes a set of events (requested by the client) and removes
 
79
    redundancies (like multiple Mods of the same object or Mod of a deleted object).
 
80
    NB we cannot safely remove object Deletes because a client might receive the Add event
 
81
    if one call and a Del event in a subsequent call.
 
82
    NB events are stored in reverse order. *)
 
83
let coalesce_events events = 
 
84
        (* We're not trying to be super-efficient here, we scan the list multiple times. 
 
85
           Let's keep the queue short. *)
 
86
        let refs_of_op op events = List.concat 
 
87
                (List.map (function { op = op'; reference = reference } -> 
 
88
                             if op = op' then [ reference ] else []) events) in
 
89
 
 
90
        let dummy x = { x with op = Dummy } in
 
91
 
 
92
        (* If an object has been deleted, remove any Modification events *)
 
93
        let all_dead = refs_of_op Del events in
 
94
        let events' = List.map (function { op = Mod; reference = reference } as x ->
 
95
                                 if List.mem reference all_dead 
 
96
                                 then dummy x else x
 
97
                                | x -> x) events in
 
98
 
 
99
        (* If one Mod event has been seen, remove the rest (we keep the latest (ie 
 
100
           the first one in the list) since the list is reversed)) *)
 
101
        let events' = List.rev (snd (List.fold_left 
 
102
          (fun (already_seen, acc) x -> 
 
103
             if x.op = Mod then
 
104
               let x = if List.mem x.reference already_seen then dummy x else x in
 
105
               x.reference :: already_seen, x :: acc
 
106
             else already_seen, x :: acc) ([], []) events')) in
 
107
 
 
108
        (* For debugging we may wish to keep the dummy events so we can account for 
 
109
           every event ID. However normally we want to zap them. *)
 
110
        let events' = List.filter (fun ev -> ev.op <> Dummy) events' in
 
111
        (* debug "Removed %d redundant events" (List.length events - (List.length events')); *)
 
112
        events'
 
113
 
 
114
let event_add ?snapshot ty op reference  =
 
115
 
 
116
  let gen_events_for tbl =
 
117
    let objs = List.filter (fun x->x.Datamodel_types.gen_events) (Dm_api.objects_of_api Datamodel.all_api) in
 
118
    let objs = List.map (fun x->x.Datamodel_types.name) objs in
 
119
      List.mem tbl objs in
 
120
 
 
121
    if not (gen_events_for ty) then ()
 
122
    else
 
123
      begin
 
124
 
 
125
        let ts = Unix.time () in
 
126
        let op = op_of_string op in
 
127
 
 
128
        Mutex.execute event_lock
 
129
        (fun () ->
 
130
                let ev = { id = !id; ts = ts; ty = String.lowercase ty; op = op; reference = reference; 
 
131
                           snapshot = snapshot } in
 
132
 
 
133
                let matches_anything = Hashtbl.fold
 
134
                        (fun _ s acc ->
 
135
                                 if event_matches s.subs ty
 
136
                                 then (s.cur_id <- get_current_event_number (); true)
 
137
                                 else acc) subscriptions false in
 
138
                if matches_anything then begin
 
139
                        queue := ev :: !queue;
 
140
                        (* debug "Adding event %Ld: %s" (!id) (string_of_event ev); *)
 
141
                        id := Int64.add !id Int64.one;
 
142
                        Condition.broadcast newevents;
 
143
                end else begin
 
144
                        (* debug "Dropping event %s" (string_of_event ev) *)
 
145
                end;
 
146
                
 
147
                (* Remove redundant events from the queue *)
 
148
                (* queue := coalesce_events !queue;*)
 
149
                
 
150
                (* GC the events in the queue *)
 
151
                let young, old = List.partition (fun ev -> ts -. ev.ts <= max_event_age) !queue in
 
152
                let too_many = List.length young - max_stored_events in
 
153
                let to_keep, to_drop = if too_many <= 0 then young, old
 
154
                  else
 
155
                    (* Reverse-sort by ID and preserve the first 'max_stored_events' *)
 
156
                    let lucky, unlucky = 
 
157
                      List.chop max_stored_events (List.sort (fun a b -> compare b.id a.id) young) in
 
158
                    lucky, old @ unlucky in
 
159
                queue := to_keep;
 
160
                (* Remember the highest ID of the list of events to drop *)
 
161
                if to_drop <> [] then
 
162
                highest_forgotten_id := (List.hd to_drop).id;
 
163
                (* debug "After event queue GC: keeping %d; dropping %d (highest dropped id = %Ld)" 
 
164
                  (List.length to_keep) (List.length to_drop) !highest_forgotten_id *)
 
165
        )
 
166
      end
 
167
 
 
168
 
 
169
let register_hooks () =
 
170
        Db_action_helper.events_register event_add
 
171
 
 
172
(** Return the subscription associated with a session, or create a new blank one if none
 
173
    has yet been created. *)
 
174
let get_subscription ~__context = 
 
175
        let session = Context.get_session_id __context in
 
176
        Mutex.execute event_lock
 
177
        (fun () ->
 
178
           if Hashtbl.mem subscriptions session then Hashtbl.find subscriptions session
 
179
           else 
 
180
                   let subscription = { last_id = !id; last_timestamp=(Unix.gettimeofday ()); last_generation=0L; cur_id = 0L; subs = []; m = Mutex.create(); session = session; session_invalid = false; timeout=0.0; } in
 
181
             Hashtbl.replace subscriptions session subscription;
 
182
             subscription)
 
183
 
 
184
(** Raises an exception if the provided session has not already registered for some events *)
 
185
let assert_subscribed ~__context = 
 
186
        let session = Context.get_session_id __context in
 
187
        Mutex.execute event_lock
 
188
        (fun () ->
 
189
           if not(Hashtbl.mem subscriptions session) 
 
190
           then raise (Api_errors.Server_error(Api_errors.session_not_registered, [ Context.trackid_of_session (Some session) ])))
 
191
 
 
192
(** Register an interest in events generated on objects of class <class_name> *)
 
193
let register ~__context ~classes = 
 
194
        let subs = List.map subscription_of_string (List.map String.lowercase classes) in
 
195
        let sub = get_subscription ~__context in
 
196
        Mutex.execute sub.m (fun () -> sub.subs <- subs @ sub.subs)
 
197
 
 
198
 
 
199
(** Unregister interest in events generated on objects of class <class_name> *)
 
200
let unregister ~__context ~classes = 
 
201
        let subs = List.map subscription_of_string (List.map String.lowercase classes) in
 
202
        let sub = get_subscription ~__context in
 
203
        Mutex.execute sub.m
 
204
                (fun () -> sub.subs <- List.filter (fun x -> not(List.mem x subs)) sub.subs)
 
205
 
 
206
(** Is called by the session timeout code *)
 
207
let on_session_deleted session_id = Mutex.execute event_lock 
 
208
        (fun () -> 
 
209
           (* Unregister this session if is associated with in imported DB. *)
 
210
           Db_backend.unregister_session session_id;
 
211
           if Hashtbl.mem subscriptions session_id then begin 
 
212
             let sub = Hashtbl.find subscriptions session_id in
 
213
             (* Mark the subscription as invalid and wake everyone up *)
 
214
             Mutex.execute sub.m (fun () -> sub.session_invalid <- true);
 
215
             Hashtbl.remove subscriptions session_id;
 
216
             Condition.broadcast newevents;
 
217
           end)
 
218
 
 
219
let session_is_invalid sub = Mutex.execute sub.m (fun () -> sub.session_invalid)
 
220
 
 
221
(** Blocks the caller until the current ID has changed OR the session has been 
 
222
    invalidated. *)
 
223
let wait subscription from_id = 
 
224
        let result = ref 0L in
 
225
        Mutex.execute event_lock
 
226
          (fun () ->
 
227
             (* NB we occasionally grab the specific session lock while holding the general lock *)
 
228
             while !id = from_id && not (session_is_invalid subscription) do Condition.wait newevents event_lock done;
 
229
             result := !id);
 
230
        if session_is_invalid subscription
 
231
        then raise (Api_errors.Server_error(Api_errors.session_invalid, [ Ref.string_of subscription.session ]))
 
232
        else !result
 
233
 
 
234
let wait2 subscription from_id =
 
235
        let timeoutname = Printf.sprintf "event_from_timeout_%s" (Ref.string_of subscription.session) in
 
236
  Mutex.execute event_lock
 
237
        (fun () ->
 
238
          while from_id = subscription.cur_id && not (session_is_invalid subscription) && Unix.gettimeofday () < subscription.timeout 
 
239
          do 
 
240
                  Xapi_periodic_scheduler.add_to_queue timeoutname Xapi_periodic_scheduler.OneShot (subscription.timeout -. Unix.gettimeofday () +. 0.5) (fun () -> Condition.broadcast newevents);
 
241
                  Condition.wait newevents event_lock; 
 
242
                  Xapi_periodic_scheduler.remove_from_queue timeoutname
 
243
          done;
 
244
        );
 
245
  if session_is_invalid subscription
 
246
  then raise (Api_errors.Server_error(Api_errors.session_invalid, [ Ref.string_of subscription.session ]))
 
247
  else ()
 
248
 
 
249
(** Internal function to return a list of events between a start and an end ID. 
 
250
    We assume that our 64bit counter never wraps. *)
 
251
let events_read id_start id_end =
 
252
        let check_ev ev = id_start <= ev.id && ev.id < id_end in
 
253
 
 
254
        let some_events_lost = ref false in
 
255
        let selected_events = Mutex.execute event_lock
 
256
          (fun () ->
 
257
             some_events_lost := !highest_forgotten_id >= id_start;
 
258
             List.find_all (fun ev -> check_ev ev) !queue) in
 
259
        (* Note we may actually retrieve fewer events than we expect because the
 
260
           queue may have been coalesced. *)
 
261
        if !some_events_lost (* is true *) then events_lost ();
 
262
 
 
263
        (* NB queue is kept in reverse order *)
 
264
        List.rev selected_events
 
265
 
 
266
(** Blocking call which returns the next set of events relevant to this session. *)
 
267
let rec next ~__context =
 
268
        assert_subscribed ~__context;
 
269
 
 
270
        let subscription = get_subscription ~__context in
 
271
 
 
272
        (* Return a <from_id, end_id> exclusive range that is guaranteed to be specific to this 
 
273
           thread. Concurrent calls will grab wholly disjoint ranges. Note the range might be
 
274
           empty. *)
 
275
        let grab_range () = 
 
276
                (* Briefly hold both the general and the specific mutex *)
 
277
                Mutex.execute event_lock 
 
278
                  (fun () -> Mutex.execute subscription.m
 
279
                     (fun () ->
 
280
                        let last_id = subscription.last_id in
 
281
                        (* Bump our last_id counter: these events don't have to be looked at again *)
 
282
                        subscription.last_id <- !id ;
 
283
                        last_id, !id)) in
 
284
        (* Like grab_range () only guarantees to return a non-empty range by blocking if necessary *)
 
285
        let rec grab_nonempty_range () = 
 
286
                let last_id, end_id = grab_range () in
 
287
                if last_id = end_id then begin
 
288
                        let (_: int64) = wait subscription end_id in
 
289
                        grab_nonempty_range ()
 
290
                end else last_id, end_id in
 
291
 
 
292
        let last_id, end_id = grab_nonempty_range () in
 
293
        (* debug "next examining events in range %Ld <= x < %Ld" last_id end_id; *)
 
294
        (* Are any of the new events interesting? *)
 
295
        let events = events_read last_id end_id in
 
296
        let subs = Mutex.execute subscription.m (fun () -> subscription.subs) in
 
297
        let relevant = List.filter (fun ev -> event_matches subs ev.ty) events in
 
298
        (* debug "number of relevant events = %d" (List.length relevant); *)
 
299
        if relevant = [] then next ~__context 
 
300
        else XMLRPC.To.array (List.map xmlrpc_of_event relevant)
 
301
 
 
302
let from ~__context ~classes ~token ~timeout = 
 
303
        let from, from_t = 
 
304
                try 
 
305
                        match String.split ',' token with
 
306
                                | [from;from_t] -> 
 
307
                                        (Int64.of_string from, float_of_string from_t)
 
308
                                | [""] -> (0L, 0.1)
 
309
                                | _ -> 
 
310
                                        warn "Bad format passed to Event.from: %s" token;
 
311
                                        failwith "Error"
 
312
                with _ ->
 
313
                        (0L, 0.1)
 
314
        in
 
315
 
 
316
        (* Temporarily create a subscription for the duration of this call *)
 
317
        let subs = List.map subscription_of_string (List.map String.lowercase classes) in
 
318
        let sub = get_subscription ~__context in
 
319
 
 
320
        sub.timeout <- Unix.gettimeofday () +. timeout;
 
321
 
 
322
        sub.last_timestamp <- from_t;
 
323
        sub.last_generation <- from;
 
324
 
 
325
        Mutex.execute sub.m (fun () -> sub.subs <- subs @ sub.subs);
 
326
 
 
327
        let all_event_tables =
 
328
                let objs = List.filter (fun x->x.Datamodel_types.gen_events) (Dm_api.objects_of_api Datamodel.all_api) in
 
329
                let objs = List.map (fun x->x.Datamodel_types.name) objs in
 
330
                objs
 
331
        in
 
332
 
 
333
        let all_subs = Mutex.execute sub.m (fun () -> Hashtbl.fold (fun _ s acc -> s.subs @ acc) subscriptions []) in
 
334
        let tables = List.filter (fun table -> event_matches all_subs table) all_event_tables in
 
335
 
 
336
        let events_lost = ref [] in
 
337
 
 
338
        let grab_range t =
 
339
                let tableset = Db_cache_types.Database.tableset (Db_ref.get_database t) in
 
340
                let (timestamp,messages) =
 
341
                        if event_matches all_subs "message" then (!message_get_since_for_events) ~__context sub.last_timestamp else (0.0, []) in
 
342
                (timestamp, messages, tableset, List.fold_left
 
343
                        (fun acc table ->
 
344
                                 Db_cache_types.Table.fold_over_recent sub.last_generation
 
345
                                         (fun ctime mtime dtime objref (creates,mods,deletes,last) ->
 
346
                                                  info "last_generation=%Ld cur_id=%Ld" sub.last_generation sub.cur_id;
 
347
                                                  info "ctime: %Ld mtime:%Ld dtime:%Ld objref:%s" ctime mtime dtime objref;
 
348
                                                  let last = max last (max mtime dtime) in (* mtime guaranteed to always be larger than ctime *)
 
349
                                                  if dtime > 0L then begin
 
350
                                                          if ctime > sub.last_generation then
 
351
                                                                  (creates,mods,deletes,last) (* It was created and destroyed since the last update *)
 
352
                                                          else
 
353
                                                                  (creates,mods,(table, objref, dtime)::deletes,last) (* It might have been modified, but we can't tell now *)
 
354
                                                  end else begin
 
355
                                                          ((if ctime > sub.last_generation then (table, objref, ctime)::creates else creates),
 
356
                                                           (if mtime > sub.last_generation then (table, objref, mtime)::mods else mods),
 
357
                                                           deletes, last)
 
358
                                                  end
 
359
                                         ) (fun () -> events_lost := table :: !events_lost) (Db_cache_types.TableSet.find table tableset) acc
 
360
                        ) ([],[],[],sub.last_generation) tables)
 
361
        in
 
362
 
 
363
        let rec grab_nonempty_range () =
 
364
                let (timestamp, messages, tableset, (creates,mods,deletes,last)) as result = Db_lock.with_lock (fun () -> grab_range (Db_backend.make ())) in
 
365
                if List.length creates = 0 && List.length mods = 0 && List.length deletes = 0 && List.length messages = 0 && Unix.gettimeofday () < sub.timeout
 
366
                then
 
367
                        (
 
368
                                debug "Waiting more: timeout=%f now=%f" sub.timeout (Unix.gettimeofday ());
 
369
                                
 
370
                                sub.last_generation <- last; (* Cur_id was bumped, but nothing relevent fell out of the db. Therefore the *)
 
371
                                sub.last_timestamp <- timestamp;
 
372
                                sub.cur_id <- last; (* last id the client got is equivalent to the current one *)
 
373
                                wait2 sub last;
 
374
                                Thread.delay 0.05;
 
375
                                grab_nonempty_range ())
 
376
                else
 
377
                        result
 
378
        in
 
379
 
 
380
        let (timestamp, messages, tableset, (creates,mods,deletes,last)) = grab_nonempty_range () in
 
381
 
 
382
        sub.last_generation <- last;
 
383
        sub.last_timestamp <- timestamp;
 
384
 
 
385
        let delevs = List.fold_left (fun acc (table, objref, dtime) ->
 
386
                                                                         if event_matches sub.subs table then begin
 
387
                                                                                 let ev = {id=dtime;
 
388
                                                                                                   ts=0.0;
 
389
                                                                                                   ty=String.lowercase table;
 
390
                                                                                                   op=Del;
 
391
                                                                                                   reference=objref;
 
392
                                                                                                   snapshot=None} in
 
393
                                                                                 ev::acc
 
394
                                                                         end else acc
 
395
                                                                ) [] deletes in
 
396
 
 
397
        let modevs = List.fold_left (fun acc (table, objref, mtime) ->
 
398
                                                                         if event_matches sub.subs table then begin
 
399
                                                                                 let serialiser = Eventgen.find_get_record table in
 
400
                                                                                 let xml = serialiser ~__context ~self:objref () in
 
401
                                                                                 let ev = { id=mtime;
 
402
                                                                                                        ts=0.0;
 
403
                                                                                                        ty=String.lowercase table;
 
404
                                                                                                        op=Mod;
 
405
                                                                                                        reference=objref;
 
406
                                                                                                        snapshot=xml } in
 
407
                                                                                 ev::acc
 
408
                                                                         end else acc
 
409
                                                                ) delevs mods in
 
410
 
 
411
        let createevs = List.fold_left (fun acc (table, objref, ctime) ->
 
412
                                                                                if event_matches sub.subs table then begin
 
413
                                                                                        let serialiser = Eventgen.find_get_record table in
 
414
                                                                                        let xml = serialiser ~__context ~self:objref () in
 
415
                                                                                        let ev = { id=ctime;
 
416
                                                                                                           ts=0.0;
 
417
                                                                                                           ty=String.lowercase table;
 
418
                                                                                                           op=Add;
 
419
                                                                                                           reference=objref;
 
420
                                                                                                           snapshot=xml } in
 
421
                                                                                        ev::acc
 
422
                                                                                end else acc
 
423
                                                                   ) modevs creates in
 
424
        
 
425
        let message_events = List.fold_left (fun acc (_ref,message) ->
 
426
                                                                                         let objref = Ref.string_of _ref in
 
427
                                                                                         let xml = API.To.message_t message in
 
428
                                                                                         let ev = { id=0L;
 
429
                                                                                                                ts=0.0;
 
430
                                                                                                                ty="message";
 
431
                                                                                                                op=Add;
 
432
                                                                                                                reference=objref;
 
433
                                                                                                                snapshot=Some xml } in
 
434
                                                                                         ev::acc) createevs messages in
 
435
 
 
436
        let valid_ref_counts =
 
437
        XMLRPC.To.structure
 
438
            (Db_cache_types.TableSet.fold
 
439
                (fun tablename _ _ table acc ->
 
440
                    (String.lowercase tablename, XMLRPC.To.int
 
441
                        (Db_cache_types.Table.fold
 
442
                            (fun r _ _ _ acc -> Int32.add 1l acc) table 0l))::acc)
 
443
                tableset [])
 
444
        in
 
445
 
 
446
        let session = Context.get_session_id __context in
 
447
 
 
448
        on_session_deleted session;
 
449
 
 
450
        XMLRPC.To.structure [("events",XMLRPC.To.array (List.map xmlrpc_of_event message_events)); 
 
451
                                                 ("valid_ref_counts",valid_ref_counts); 
 
452
                                                 ("token",XMLRPC.To.string (Printf.sprintf "%Ld,%f" last timestamp))
 
453
                                                ]
 
454
 
 
455
let get_current_id ~__context = Mutex.execute event_lock (fun () -> !id)
 
456
 
 
457
(** Inject an unnecessary update as a heartbeat. This will:
 
458
    1. hopefully prevent some firewalls from silently closing the connection
 
459
    2. allow the server to detect when a client has failed *)
 
460
let heartbeat ~__context =
 
461
  try
 
462
    Db_lock.with_lock 
 
463
      (fun () ->
 
464
                   (* We must hold the database lock since we are sending an update for a real object
 
465
                          and we don't want to accidentally transmit an older snapshot. *)
 
466
                   let pool = try Some (Helpers.get_pool ~__context) with _ -> None in
 
467
                   match pool with
 
468
                   | Some pool ->
 
469
                                 let pool_r = Db.Pool.get_record ~__context ~self:pool in
 
470
                                 let pool_xml = API.To.pool_t pool_r in
 
471
                                 event_add ~snapshot:pool_xml "pool" "mod" (Ref.string_of pool)
 
472
                   | None -> () (* no pool object created during initial boot *)
 
473
      )
 
474
  with e ->
 
475
    error "Caught exception sending event heartbeat: %s" (ExnHelper.string_of_exn e)