~ubuntu-branches/ubuntu/trusty/ocamlnet/trusty

« back to all changes in this revision

Viewing changes to src/equeue/unixqueue_pollset.ml

  • Committer: Bazaar Package Importer
  • Author(s): Stéphane Glondu
  • Date: 2011-09-02 14:12:33 UTC
  • mfrom: (18.2.3 sid)
  • Revision ID: james.westby@ubuntu.com-20110902141233-zbj0ygxb92u6gy4z
Tags: 3.4-1
* New upstream release
  - add a new NetcgiRequire directive to ease dependency management
    (Closes: #637147)
  - remove patches that were applied upstream:
    + Added-missing-shebang-lines-in-example-shell-scripts
    + Try-also-ocamlc-for-POSIX-threads

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
(* $Id: unixqueue_pollset.ml 1624 2011-06-13 12:27:58Z gerd $ *)
 
2
 
 
3
(* pset # dispose: we try to call it when there are no more operations in
 
4
   the queue. This is tested when
 
5
    - the resource is removed (and we are not waiting)
 
6
    - the group is cleared (and we are not waiting)
 
7
    - after every wait
 
8
   Note that we must not call [dispose] while [wait] is running!
 
9
 *)
 
10
 
 
11
 
 
12
open Unixqueue_util
 
13
open Printf
 
14
 
 
15
module Float = struct
 
16
  type t = float
 
17
(*  let compare = ( Pervasives.compare : float -> float -> int ) *)
 
18
  let compare (x:float) y =
 
19
    if x < y then (-1) else if x = y then 0 else 1
 
20
      (* does not work for non-normal numbers but we don't care *)
 
21
end
 
22
 
 
23
 
 
24
module FloatMap = Map.Make(Float)
 
25
 
 
26
 
 
27
let nogroup_id = Oo.id nogroup
 
28
 
 
29
let min_key m = 
 
30
  (* In ocaml-3.12 there is already a function for this in Map *)
 
31
  let k = ref None in
 
32
  ( try
 
33
      FloatMap.iter
 
34
        (fun t _ -> k := Some t; raise Exit)
 
35
        m
 
36
    with Exit -> ()
 
37
  );
 
38
  match !k with
 
39
    | Some min -> min
 
40
    | None -> raise Not_found
 
41
 
 
42
 
 
43
let ops_until tmax m =
 
44
  (* Look into the FloatMap m, and return all ops for which
 
45
     t <= tmax
 
46
   *)
 
47
  let l = ref [] in
 
48
  ( try
 
49
      FloatMap.iter
 
50
        (fun t ops ->
 
51
           if t > tmax then raise Exit;
 
52
           l := (t,ops) :: !l
 
53
        )
 
54
        m
 
55
    with
 
56
      | Exit -> ()
 
57
  );
 
58
  List.rev !l
 
59
 
 
60
 
 
61
exception Term of Unixqueue_util.group
 
62
  (* Extra (Term g) is now the group termination event for g *)
 
63
 
 
64
exception Keep_alive
 
65
  (* Sometimes used to keep the event system alive *)
 
66
 
 
67
exception Exit_loop
 
68
 
 
69
 
 
70
let () =
 
71
  Netexn.register_printer
 
72
    (Term nogroup)
 
73
    (function
 
74
       | Term g ->
 
75
           if g = nogroup then
 
76
             "Term(nogroup)"
 
77
           else
 
78
             "Term(" ^ string_of_int (Oo.id g) ^ ")"
 
79
       | _ -> assert false
 
80
    )
 
81
 
 
82
 
 
83
let pset_set (pset:Netsys_pollset.pollset) fd (i,o,p) =
 
84
  if not i && not o && not p then
 
85
    pset # remove fd
 
86
  else
 
87
    pset # add fd (Netsys_posix.poll_req_events i o p)
 
88
 
 
89
 
 
90
let pset_find (pset:Netsys_pollset.pollset) fd =
 
91
  try Netsys_posix.poll_req_triple(pset#find fd)
 
92
  with
 
93
    | Not_found -> (false,false,false)
 
94
 
 
95
 
 
96
 
 
97
let op_of_event ev =
 
98
  match ev with
 
99
    | Unixqueue_util.Input_arrived(_,fd)    -> Unixqueue_util.Wait_in fd
 
100
    | Unixqueue_util.Output_readiness(_,fd) -> Unixqueue_util.Wait_out fd
 
101
    | Unixqueue_util.Out_of_band(_,fd)      -> Unixqueue_util.Wait_oob fd
 
102
    | Unixqueue_util.Timeout(_,op)          -> op
 
103
    | _ -> assert false
 
104
 
 
105
let rec list_mem_op op l =
 
106
  match l with
 
107
    | h :: t ->
 
108
        is_op_eq h op || list_mem_op op t
 
109
    | [] ->
 
110
        false
 
111
 
 
112
let while_locked mutex f =
 
113
  Netsys_oothr.serialize mutex f ()
 
114
 
 
115
 
 
116
let escape_lock mutex f =
 
117
  mutex # unlock();
 
118
  let r = 
 
119
    try f ()
 
120
    with e -> mutex # lock(); raise e in
 
121
  mutex # lock();
 
122
  r
 
123
 
 
124
 
 
125
let flatten_map f l =
 
126
  (* = List.flatten (List.map f l) *)
 
127
  let rec loop l h =
 
128
    match l with
 
129
      | [] -> List.rev h
 
130
      | x :: l' ->
 
131
          let y = f x in
 
132
          loop l' (List.rev_append y h) in
 
133
  loop l []
 
134
 
 
135
 
 
136
(* A little encapsulation so we can easily identify handlers by Oo.id *)
 
137
class ohandler (h:handler) = 
 
138
object
 
139
  method run esys eq ev =
 
140
    h esys eq ev
 
141
end
 
142
 
 
143
 
 
144
class pollset_event_system (pset : Netsys_pollset.pollset) =
 
145
  let mtp = !Netsys_oothr.provider in
 
146
  let is_mt = not (mtp#single_threaded) in
 
147
  let sys = ref (lazy (assert false)) in   (* initialized below *)
 
148
  let ops_of_group = 
 
149
    (Hashtbl.create 10 : (group, OpSet.t) Hashtbl.t) in
 
150
        (* is for [clear] only *)
 
151
  let tmo_of_op =
 
152
    (OpTbl.create 10 : (float * float * group * bool) OpTbl.t) in
 
153
      (* first number: duration of timeout (or -1)
 
154
         second number: point in time (or -1)
 
155
         bool: whether strong
 
156
       *)
 
157
  let ops_of_tmo = ref(FloatMap.empty : OpSet.t FloatMap.t) in
 
158
  let strong_ops = ref 0 in (* number of strong (non-weak) ops *)
 
159
 
 
160
  let aborting = ref false in
 
161
  let close_tab = 
 
162
    (Hashtbl.create 10 :
 
163
       (Unix.file_descr, (group * (Unix.file_descr -> unit))) Hashtbl.t) in
 
164
  let abort_tab = 
 
165
     (Hashtbl.create 10 : (group, (group -> exn -> unit)) Hashtbl.t) in
 
166
  let handlers = (Hashtbl.create 10 : (group, ohandler list) Hashtbl.t) in
 
167
  let handled_groups = ref 0 in
 
168
            (* the number of keys in [handlers] *)
 
169
 
 
170
  let waiting = ref false in
 
171
  let when_blocking = ref (fun () -> ()) in
 
172
 
 
173
  let mutex = mtp # create_mutex() in
 
174
 
 
175
 
 
176
  let event_of_op_wl_nf op =
 
177
      let (_,_,g,_) = OpTbl.find tmo_of_op op in (* or Not_found *)
 
178
      match op with
 
179
        | Unixqueue_util.Wait_in fd ->
 
180
            Unixqueue_util.Input_arrived(g,fd)
 
181
        | Unixqueue_util.Wait_out fd ->
 
182
            Unixqueue_util.Output_readiness(g,fd)
 
183
        | Unixqueue_util.Wait_oob fd ->
 
184
            Unixqueue_util.Out_of_band(g,fd)
 
185
        | Unixqueue_util.Wait _ ->
 
186
            assert false in
 
187
 
 
188
  let events_of_op_wl op =
 
189
    try 
 
190
      [ event_of_op_wl_nf op ]
 
191
    with
 
192
      | Not_found ->
 
193
          (* A "ghost event", i.e. there is no handler anymore
 
194
             for it, but wasn't deleted quickly enough from 
 
195
             pset
 
196
           *)
 
197
          [] in
 
198
 
 
199
  let tmo_event_of_op_wl_nf op =
 
200
    let (_,_,g,_) = OpTbl.find tmo_of_op op in (* or Not_found *)
 
201
    Unixqueue_util.Timeout(g,op) in
 
202
 
 
203
  let tmo_events_of_op_wl op =
 
204
    try 
 
205
      [ tmo_event_of_op_wl_nf op ]
 
206
    with
 
207
      | Not_found -> [] (* Ghost event, see above *) in
 
208
            
 
209
 
 
210
  let add_event_wl e =
 
211
    Equeue.add_event (Lazy.force !sys) e;
 
212
    pset # cancel_wait true
 
213
      (* Set the cancel bit, so that any pending [wait] is interrupted *) in
 
214
 
 
215
 
 
216
object(self)
 
217
 
 
218
  initializer (
 
219
    let equeue_sys = Equeue.create ~string_of_event self#source in
 
220
    sys := lazy equeue_sys;
 
221
    ignore(Lazy.force !sys);
 
222
    (* Add ourselves now at object creation time. The only drawback is that
 
223
       we no longer raise [Equeue.Out_of_handlers] - but this is questionable
 
224
       anyway since the addition of [Immediate] events.
 
225
 
 
226
       In order to generate [Out_of_handlers] we would have to count
 
227
       [Immediate] events in addition to handlers.
 
228
     *)
 
229
    self#equeue_add_handler ()
 
230
  )
 
231
 
 
232
 
 
233
  method private source  _sys =
 
234
    (* locking: the lock is not held when called, because we are called back
 
235
       from Equeue
 
236
     *)
 
237
    assert(Lazy.force !sys == _sys);
 
238
 
 
239
    !when_blocking();
 
240
 
 
241
    let dbg = !Unixqueue_util.Debug.enable in
 
242
 
 
243
    let locked = ref true in   (* keep track of locking state *)
 
244
    if is_mt then
 
245
      mutex # lock();
 
246
    
 
247
    let t0 = Unix.gettimeofday() in
 
248
    let tmin = try min_key !ops_of_tmo with Not_found -> (-1.0) in
 
249
    let delta = if tmin < 0.0 then (-1.0) else max (tmin -. t0) 0.0 in
 
250
 
 
251
    if dbg then
 
252
      dlogr (fun () ->
 
253
               sprintf "t0 = %f,   #tmo_of_op = %d" 
 
254
                 t0 (OpTbl.length tmo_of_op)
 
255
            );
 
256
    
 
257
    let nothing_to_do =
 
258
      (* For this test only non-weak resources count, so... *)
 
259
      !strong_ops = 0 in
 
260
 
 
261
    let have_eintr = ref false in
 
262
 
 
263
    let pset_events = 
 
264
      try
 
265
        if nothing_to_do then (
 
266
          if dbg then
 
267
            dlogr (fun () -> "nothing_to_do");
 
268
          []
 
269
        )
 
270
        else (
 
271
          if dbg then
 
272
            dlogr (fun () -> (
 
273
                     let ops = 
 
274
                       OpTbl.fold (fun op _ l -> op::l) tmo_of_op [] in
 
275
                     let op_str = 
 
276
                       String.concat ";" (List.map string_of_op ops) in
 
277
                     sprintf "wait tmo=%f ops=<%s>" delta op_str));
 
278
          (* Reset the cancel bit immediately before calling [wait]. Any
 
279
             event added up to now is considered anyway by [wait] because
 
280
             our lock is still held. Any new event added after we unlock will
 
281
             set the cancel_wait flag, and cause the [wait] to exit (if it is
 
282
             still running).
 
283
           *)
 
284
          pset # cancel_wait false;
 
285
          waiting := true;
 
286
          if is_mt then
 
287
            mutex # unlock();
 
288
          locked := false;
 
289
          pset # wait delta
 
290
        )
 
291
      with
 
292
        | Unix.Unix_error(Unix.EINTR,_,_) ->
 
293
            if dbg then
 
294
              dlogr (fun () -> "wait signals EINTR");
 
295
            have_eintr := true;
 
296
            []
 
297
        | e ->   
 
298
            (* Usually from [wait], but one never knows... *)
 
299
            if !locked && is_mt then mutex#unlock();
 
300
            waiting := false;
 
301
            raise e
 
302
    in
 
303
 
 
304
    waiting := false;
 
305
    if not !locked && is_mt then mutex # lock();
 
306
    locked := true;
 
307
    try  
 
308
      (* Catch exceptions and unlock *)
 
309
 
 
310
      if dbg then
 
311
        dlogr (fun () -> (
 
312
                 sprintf "wait returns <%d pset events>" 
 
313
                   (List.length pset_events)));
 
314
      
 
315
      let t1 = Unix.gettimeofday() in
 
316
      if dbg then
 
317
        dlogr (fun () -> (sprintf "t1 = %f" t1));
 
318
      (* t1 is the reference for determining the timeouts *)
 
319
 
 
320
      (* while waiting somebody might have removed resouces, so ... *)
 
321
      if OpTbl.length tmo_of_op = 0 then
 
322
        pset # dispose();
 
323
 
 
324
      let operations =  (* flatten_map *)
 
325
        (* The possible operations *)
 
326
        List.fold_left
 
327
          (fun acc (fd,ev_in,ev_out) ->
 
328
             (* Note that POLLHUP and POLLERR can also mean that we
 
329
                have data to read/write!
 
330
              *)
 
331
             let (in_rd,in_wr,in_pri) = 
 
332
               Netsys_posix.poll_req_triple ev_in in
 
333
             let out_rd = 
 
334
               Netsys_posix.poll_rd_result ev_out in 
 
335
             let out_wr = 
 
336
               Netsys_posix.poll_wr_result ev_out in 
 
337
             let out_pri = 
 
338
               Netsys_posix.poll_pri_result ev_out in 
 
339
             let out_hup = 
 
340
               Netsys_posix.poll_hup_result ev_out in 
 
341
             let out_err = 
 
342
               Netsys_posix.poll_err_result ev_out in 
 
343
             let have_input  = 
 
344
               in_rd && (out_rd || out_hup || out_err) in
 
345
             let have_output =
 
346
               in_wr && (out_wr || out_hup || out_err) in
 
347
             let have_pri =
 
348
               in_pri && (out_pri || out_hup || out_err) in
 
349
             let e1 = 
 
350
               if have_pri then Unixqueue_util.Wait_oob fd :: acc else acc in
 
351
             let e2 = 
 
352
               if have_input then Unixqueue_util.Wait_in fd :: e1 else e1 in
 
353
             let e3 = 
 
354
               if have_output then Unixqueue_util.Wait_out fd :: e2 else e2 in
 
355
             e3
 
356
          )
 
357
          []
 
358
          pset_events in
 
359
 
 
360
      let ops_timed_out =
 
361
        ops_until t1 !ops_of_tmo in
 
362
      (* Note: this _must_ include operations until <= t1 (not <t1), otherwise
 
363
         a timeout value of 0.0 won't work
 
364
       *)
 
365
 
 
366
      let ops_timed_out_l =
 
367
        (* Determine the operations in [tmo_of_op] that have timed
 
368
           out and that are not in [operations]
 
369
           FIXME: [List.mem op operations] is not scalable.
 
370
         *)
 
371
        List.fold_left
 
372
          (fun oacc (_, ops) ->
 
373
             OpSet.fold
 
374
               (fun op iacc ->
 
375
                  if list_mem_op op operations then iacc else op::iacc) 
 
376
               ops 
 
377
               oacc
 
378
          )
 
379
          []
 
380
          ops_timed_out in
 
381
 
 
382
      if dbg then (
 
383
        dlogr
 
384
          (fun() -> 
 
385
             sprintf "delivering events <%s>"
 
386
               (String.concat ";" 
 
387
                  (flatten_map
 
388
                     (fun op ->
 
389
                        List.map string_of_event (events_of_op_wl op)
 
390
                     )
 
391
                     operations
 
392
                  )));
 
393
        dlogr
 
394
          (fun() -> 
 
395
             sprintf "delivering timeouts <%s>"
 
396
               (String.concat ";" 
 
397
                  (flatten_map
 
398
                     (fun op ->
 
399
                        List.map string_of_event (tmo_events_of_op_wl op)
 
400
                     )
 
401
                     ops_timed_out_l
 
402
                  )));
 
403
      );
 
404
      
 
405
      (* deliver events *)
 
406
      let delivered = ref false in
 
407
      let deliver get_ev op =
 
408
        try 
 
409
          let ev = get_ev op in
 
410
          delivered := true;
 
411
          Equeue.add_event _sys ev
 
412
        with Not_found -> () in
 
413
      List.iter (deliver event_of_op_wl_nf)     operations;
 
414
      List.iter (deliver tmo_event_of_op_wl_nf) ops_timed_out_l;
 
415
 
 
416
      if !have_eintr then (
 
417
        dlogr (fun () -> "delivering Signal");
 
418
        Equeue.add_event _sys Unixqueue_util.Signal
 
419
      ) else
 
420
        if not !delivered && not nothing_to_do then (
 
421
          (* Ensure we always add an event to keep the event loop running: *)
 
422
          if dbg then
 
423
            dlogr (fun () -> "delivering Keep_alive");
 
424
          Equeue.add_event _sys (Unixqueue_util.Extra Keep_alive)
 
425
        );
 
426
    
 
427
      (* Update ops_of_tmo: *)
 
428
      List.iter
 
429
        (fun (t,_) ->
 
430
           ops_of_tmo := FloatMap.remove t !ops_of_tmo
 
431
        )
 
432
        ops_timed_out;
 
433
 
 
434
      (* Set a new timeout for all delivered events:
 
435
         (Note that [pset] remains unchanged, because the set of watched
 
436
         resources remains unchanged.)
 
437
         rm_done is true when the old timeout is already removed from
 
438
         ops_of_tmo.
 
439
       *)
 
440
      let update_tmo rm_done oplist =
 
441
        List.iter
 
442
          (fun op ->
 
443
             try
 
444
               let (tmo,t1,g,is_strong) = 
 
445
                 OpTbl.find tmo_of_op op in (* or Not_found *)
 
446
               if tmo >= 0.0 then (
 
447
                 let t2 = t1 +. tmo in
 
448
                 self#sched_upd_tmo_wl g op tmo t2 is_strong 
 
449
                   (if rm_done then (-1.0) else t1)
 
450
               )
 
451
             with
 
452
               | Not_found -> ()
 
453
                   (* It is possible that resources were removed while
 
454
                      we were waiting for events. This can lead to
 
455
                      [Not_found] here. We just ignore this.
 
456
                    *)
 
457
          )
 
458
          oplist in
 
459
      update_tmo false operations;
 
460
      update_tmo true  ops_timed_out_l;
 
461
 
 
462
      if is_mt then mutex # unlock();
 
463
      locked := false
 
464
 
 
465
    with
 
466
      | e ->
 
467
          (* exceptions are unexpected, but we want to make sure not to mess
 
468
             with locks
 
469
           *)
 
470
          if !locked && is_mt then mutex # unlock();
 
471
          raise e
 
472
 
 
473
 
 
474
  (* Note: suffix _wl = while locked *)
 
475
 
 
476
  method private sched_remove_wl op =
 
477
    try
 
478
      let tmo, t1, g, is_strong = OpTbl.find tmo_of_op op in  (* or Not_found *)
 
479
      dlogr(fun () -> (sprintf "sched_remove %s" (string_of_op op)));
 
480
      OpTbl.remove tmo_of_op op;
 
481
      if is_strong then decr strong_ops;
 
482
      ( try
 
483
          let l_ops =
 
484
            if tmo >= 0.0 then
 
485
              FloatMap.find t1 !ops_of_tmo 
 
486
            else raise Not_found in
 
487
          let l_ops' =
 
488
            OpSet.remove op l_ops in
 
489
          if l_ops' = OpSet.empty then
 
490
            ops_of_tmo := FloatMap.remove t1 !ops_of_tmo
 
491
          else
 
492
            ops_of_tmo := FloatMap.add t1 l_ops' !ops_of_tmo
 
493
        with Not_found -> ()
 
494
      );
 
495
      if Oo.id g <> nogroup_id then (
 
496
        let old_set = Hashtbl.find ops_of_group g in
 
497
        let new_set = OpSet.remove op old_set in
 
498
        if new_set = OpSet.empty then
 
499
          Hashtbl.remove ops_of_group g
 
500
        else
 
501
          Hashtbl.replace ops_of_group g new_set
 
502
      )
 
503
 
 
504
    with
 
505
      | Not_found -> ()
 
506
 
 
507
 
 
508
  method private pset_remove_wl op =
 
509
    match op with
 
510
      | Wait_in fd ->
 
511
          let (i,o,p) = pset_find pset fd in
 
512
          pset_set pset fd (false,o,p)
 
513
      | Wait_out fd ->
 
514
          let (i,o,p) = pset_find pset fd in
 
515
          pset_set pset fd (i,false,p)
 
516
      | Wait_oob fd ->
 
517
          let (i,o,p) = pset_find pset fd in
 
518
          pset_set pset fd (i,o,false)
 
519
      | Wait _ ->
 
520
          ()
 
521
            
 
522
 
 
523
  method private sched_add_wl g op tmo t1 is_strong =
 
524
    dlogr(fun () -> (sprintf "sched_add %s tmo=%f t1=%f is_strong=%b"
 
525
                       (string_of_op op) tmo t1 is_strong));
 
526
    OpTbl.replace tmo_of_op op (tmo, t1, g, is_strong);
 
527
    if is_strong then
 
528
      incr strong_ops;
 
529
    let l_ops =
 
530
      try FloatMap.find t1 !ops_of_tmo with Not_found -> OpSet.empty in
 
531
    if tmo >= 0.0 then
 
532
      ops_of_tmo := FloatMap.add t1 (OpSet.add op l_ops) !ops_of_tmo;
 
533
    if Oo.id g <> nogroup_id then (
 
534
      let old_set =
 
535
        try Hashtbl.find ops_of_group g with Not_found -> OpSet.empty in
 
536
      let new_set =
 
537
        OpSet.add op old_set in
 
538
      Hashtbl.replace ops_of_group g new_set
 
539
    )
 
540
 
 
541
  method private pset_add_wl op =
 
542
    match op with
 
543
      | Wait_in fd ->
 
544
          let (i,o,p) = pset_find pset fd in
 
545
          pset_set pset fd (true,o,p)
 
546
      | Wait_out fd ->
 
547
          let (i,o,p) = pset_find pset fd in
 
548
          pset_set pset fd (i,true,p)
 
549
      | Wait_oob fd ->
 
550
          let (i,o,p) = pset_find pset fd in
 
551
          pset_set pset fd (i,o,true)
 
552
      | Wait _ ->
 
553
          ()
 
554
 
 
555
  method private sched_upd_tmo_wl g op tmo t1 is_strong old_t1 =
 
556
    (* only for tmo>=0 *)
 
557
    dlogr(fun () -> (sprintf "sched_upd_tmo %s tmo=%f t1=%f is_strong=%b"
 
558
                       (string_of_op op) tmo t1 is_strong));
 
559
    OpTbl.replace tmo_of_op op (tmo, t1, g, is_strong);
 
560
 
 
561
    (* We assume old_t1 is already removed form ops_of_tmo if old_t1 < 0 *)
 
562
    if old_t1 >= 0.0 then (
 
563
      try
 
564
        let l_ops =
 
565
          FloatMap.find old_t1 !ops_of_tmo in
 
566
        let l_ops' =
 
567
          OpSet.remove op l_ops in
 
568
        if l_ops' = OpSet.empty then
 
569
          ops_of_tmo := FloatMap.remove old_t1 !ops_of_tmo
 
570
        else
 
571
          ops_of_tmo := FloatMap.add old_t1 l_ops' !ops_of_tmo
 
572
      with Not_found -> ()
 
573
    );
 
574
 
 
575
    let l_ops_new =
 
576
      try FloatMap.find t1 !ops_of_tmo with Not_found -> OpSet.empty in
 
577
    ops_of_tmo := FloatMap.add t1 (OpSet.add op l_ops_new) !ops_of_tmo
 
578
 
 
579
  method exists_resource op =
 
580
    while_locked mutex
 
581
      (fun () -> self # exists_resource_wl op)
 
582
 
 
583
  method private exists_resource_wl op =
 
584
    OpTbl.mem tmo_of_op op
 
585
 
 
586
 
 
587
  method private exists_descriptor_wl fd =
 
588
    self#exists_resource_wl (Unixqueue_util.Wait_in fd) ||
 
589
    self#exists_resource_wl (Unixqueue_util.Wait_out fd) ||
 
590
    self#exists_resource_wl (Unixqueue_util.Wait_oob fd)
 
591
 
 
592
 
 
593
  method add_resource g (op, tmo) =
 
594
    while_locked mutex
 
595
      (fun () ->
 
596
         self # add_resource_wl g (op, tmo) true
 
597
      )
 
598
 
 
599
  method add_weak_resource g (op, tmo) =
 
600
    while_locked mutex
 
601
      (fun () ->
 
602
         self # add_resource_wl g (op, tmo) false
 
603
      )
 
604
 
 
605
 
 
606
  method private add_resource_wl g (op, tmo) is_strong =
 
607
    if g # is_terminating then
 
608
      invalid_arg "Unixqueue.add_resource: the group is terminated";
 
609
    if not (OpTbl.mem tmo_of_op op) then (
 
610
      self#pset_add_wl op;
 
611
      let t1 = if tmo < 0.0 then tmo else Unix.gettimeofday() +. tmo in
 
612
      self#sched_add_wl g op tmo t1 is_strong;
 
613
      (* Multi-threading: interrupt [wait] *)
 
614
      pset # cancel_wait true;
 
615
      (* CHECK: In the select-based impl we add Keep_alive to equeue.
 
616
              This idea (probably): If [wait] is about to return, and the
 
617
              queue becomes empty, the whole esys terminates. The Keep_alive
 
618
              prevents that. 
 
619
              My current thinking is that this delays the race condition
 
620
              a bit, but does not prevent it from happening
 
621
       *)
 
622
    )
 
623
      (* Note: The second addition of a resource is silently ignored...
 
624
              Maybe this should be fixed, so that the timeout can be lowered
 
625
       *)
 
626
 
 
627
 
 
628
  method remove_resource g op =
 
629
    while_locked mutex
 
630
      (fun () ->
 
631
         if g # is_terminating then
 
632
           invalid_arg "remove_resource: the group is terminated";
 
633
         let _, t1, g_found, _ = OpTbl.find tmo_of_op op in
 
634
         if Oo.id g <> Oo.id g_found then
 
635
           failwith "remove_resource: descriptor belongs to different group";
 
636
         self#sched_remove_wl op;
 
637
         self#pset_remove_wl op;
 
638
 
 
639
         if not !waiting && OpTbl.length tmo_of_op = 0 then
 
640
           pset # dispose();
 
641
 
 
642
         pset # cancel_wait true;    (* interrupt [wait] *)
 
643
         (* is there a close action ? *)
 
644
         let fd_opt =
 
645
           match op with
 
646
             | Wait_in  d -> Some d
 
647
             | Wait_out d -> Some d
 
648
             | Wait_oob d -> Some d
 
649
             | Wait _      -> None in
 
650
         match fd_opt with
 
651
           | Some fd ->
 
652
               if not !aborting then (
 
653
                 let action = 
 
654
                   try Some (snd(Hashtbl.find close_tab fd)) 
 
655
                   with Not_found -> None in
 
656
                 match action with
 
657
                   | Some a ->
 
658
                       (* any open resource? *)
 
659
                       (* FIXME MT: We don't know yet whether fd can be closed.
 
660
                          This shouldn't be done before [wait] returns.
 
661
                        *)
 
662
                       if not (self#exists_descriptor_wl fd) then (
 
663
                         dlogr 
 
664
                           (fun () -> 
 
665
                              (sprintf "remove_resource \
 
666
                                        <running close action for fd %s>"
 
667
                                 (string_of_fd fd)));
 
668
                         Hashtbl.remove close_tab fd;
 
669
                         escape_lock mutex (fun () -> a fd);
 
670
                       )
 
671
                   | None ->
 
672
                       ()
 
673
               )
 
674
           | None -> ()
 
675
      )
 
676
 
 
677
 
 
678
  method new_group () =
 
679
    new group_object
 
680
 
 
681
  method new_wait_id () =
 
682
    new wait_object
 
683
 
 
684
 
 
685
  method add_close_action g (d,a) =
 
686
    while_locked mutex
 
687
      (fun () ->
 
688
         if g # is_terminating then
 
689
           invalid_arg "add_close_action: the group is terminated";
 
690
         (* CHECK: Maybe we should fail if g is a different group than
 
691
          * the existing group
 
692
          *)
 
693
         if self#exists_descriptor_wl d then
 
694
           Hashtbl.replace close_tab d (g,a)
 
695
             (* There can be only one close action.
 
696
              * TODO: Rename to set_close_action
 
697
              *)
 
698
         else
 
699
           failwith "add_close_action"
 
700
      )
 
701
 
 
702
 
 
703
  method add_abort_action g a =
 
704
    while_locked mutex
 
705
      (fun () ->
 
706
         if g # is_terminating then
 
707
           invalid_arg "add_abort_action: the group is terminated";
 
708
         if Oo.id g = nogroup_id then
 
709
           invalid_arg "add_abort_action: the group is nogroup";
 
710
         Hashtbl.replace abort_tab g a
 
711
      )
 
712
 
 
713
  method add_event e =
 
714
    while_locked mutex
 
715
      (fun () -> add_event_wl e)
 
716
 
 
717
 
 
718
  method private uq_handler (esys : event Equeue.t) ev =
 
719
    (* locking: it is assumed that we do not have the lock when uq_handler
 
720
       is called. It is a callback from Equeue
 
721
     *)
 
722
    (* The single Unixqueue handler. For all added (sub) handlers, uq_handler
 
723
     * gets the events, and delivers them
 
724
     *)
 
725
 
 
726
    let terminate_handler_wl g h =
 
727
      if !Unixqueue_util.Debug.enable then
 
728
        dlogr
 
729
          (fun() -> 
 
730
             (sprintf "uq_handler <terminating handler group %d, handler %d>"
 
731
                (Oo.id g) (Oo.id h)));
 
732
      let hlist =
 
733
        try Hashtbl.find handlers g with Not_found -> [] in
 
734
      let hlist' =
 
735
        List.filter (fun h' -> Oo.id h' <> Oo.id h) hlist in
 
736
      if hlist' = [] then (
 
737
        Hashtbl.remove handlers g;
 
738
        handled_groups := !handled_groups - 1;
 
739
(*
 
740
        if handled_groups = 0 then (
 
741
          dlogr (fun () -> "uq_handler <self-terminating>");
 
742
          raise Equeue.Terminate  (* delete uq_handler from esys *)
 
743
        )
 
744
 *)
 
745
      ) else (
 
746
        Hashtbl.replace handlers g hlist'
 
747
      )
 
748
    in
 
749
 
 
750
    let rec forward_event_to g (hlist : ohandler list) =
 
751
      (* The caller does not have the lock when this fn is called! *)
 
752
      match hlist with
 
753
          [] ->
 
754
            if !Unixqueue_util.Debug.enable then
 
755
              dlogr (fun () -> "uq_handler <empty list>");
 
756
            raise Equeue.Reject
 
757
        | h :: hlist' ->
 
758
            ( try
 
759
                (* Note: ues _must not_ be locked now *)
 
760
                if !Unixqueue_util.Debug.enable then
 
761
                  dlogr
 
762
                    (fun () -> 
 
763
                       (sprintf 
 
764
                          "uq_handler <invoke handler group %d, handler %d>"
 
765
                          (Oo.id g) (Oo.id h)));
 
766
                h#run (self :> event_system) esys ev;
 
767
                if !Unixqueue_util.Debug.enable then
 
768
                  dlogr
 
769
                    (fun () -> 
 
770
                       (sprintf 
 
771
                          "uq_handler <invoke_success handler group %d, handler %d>"
 
772
                          (Oo.id g) (Oo.id h)));
 
773
              with
 
774
                  Equeue.Reject ->
 
775
                    forward_event_to g hlist'
 
776
                | Equeue.Terminate ->
 
777
                    (* Terminate only this handler. *)
 
778
                    while_locked mutex
 
779
                      (fun () -> terminate_handler_wl g h)
 
780
                    (* Any error exceptions simply fall through. Equeue will
 
781
                     * catch them, and will add the event to the error queue
 
782
                     *)
 
783
            )
 
784
    in
 
785
 
 
786
    let forward_event g =
 
787
      (* The caller does not have the lock when this fn is called! *)
 
788
      if !Unixqueue_util.Debug.enable then
 
789
        dlogr
 
790
          (fun () ->
 
791
             (sprintf "uq_handler <forward_event group %d>" (Oo.id g)));
 
792
      let hlist =
 
793
        while_locked mutex
 
794
          (fun () -> 
 
795
             try Hashtbl.find handlers g with Not_found -> []) in
 
796
      forward_event_to g hlist
 
797
    in
 
798
 
 
799
    let forward_event_to_all() =
 
800
      (* The caller does not have the lock when this fn is called! *)
 
801
      let hlist_all =
 
802
        while_locked mutex
 
803
          (fun () -> 
 
804
             Hashtbl.fold (fun g hlist l -> (g,hlist) :: l) handlers []) in
 
805
      try
 
806
        List.iter
 
807
          (fun (g,hlist) ->
 
808
             try
 
809
               forward_event_to g hlist;
 
810
               raise Exit_loop   (* event is delivered, so exit iteration *)
 
811
             with
 
812
                 (* event is rejected: try next group *)
 
813
                 Equeue.Reject -> ()
 
814
          )
 
815
          hlist_all;
 
816
        raise Equeue.Reject (* no handler has accepted the event, so reject *)
 
817
      with
 
818
          Exit_loop -> ()
 
819
    in
 
820
 
 
821
    if !Unixqueue_util.Debug.enable then
 
822
      dlogr
 
823
        (fun () ->
 
824
           (sprintf "uq_handler <event %s>"
 
825
              (string_of_event ev)));
 
826
 
 
827
    match ev with
 
828
      | Extra (Term g) ->
 
829
          (* Terminate all handlers of group g *)
 
830
          while_locked mutex
 
831
            (fun () ->
 
832
               if Hashtbl.mem handlers g then (
 
833
                 dlogr
 
834
                   (fun () ->
 
835
                      (sprintf "uq_handler <terminating group %d>" (Oo.id g)));
 
836
                 Hashtbl.remove handlers g;
 
837
                 handled_groups := !handled_groups - 1;
 
838
                 (*
 
839
                 if handled_groups = 0 then (
 
840
                   dlogr (fun () -> "uq_handler <self-terminating>");
 
841
                   raise Equeue.Terminate  (* delete uq_handler from esys *)
 
842
                 )
 
843
                  *)
 
844
               )
 
845
               else raise Equeue.Reject (* strange, should not happen *)
 
846
            )
 
847
      | Extra Keep_alive ->
 
848
          raise Equeue.Reject
 
849
      | Input_arrived(g,_) ->
 
850
          if g # is_terminating then raise Equeue.Reject;
 
851
          forward_event g;
 
852
      | Output_readiness(g,_) ->
 
853
          if g # is_terminating then raise Equeue.Reject;
 
854
          forward_event g;
 
855
      | Out_of_band(g,_) ->
 
856
          if g # is_terminating then raise Equeue.Reject;
 
857
          forward_event g;
 
858
      | Timeout(g,_) ->
 
859
          if g # is_terminating then raise Equeue.Reject;
 
860
          forward_event g;
 
861
      | Signal ->
 
862
          forward_event_to_all();
 
863
      | Extra x ->
 
864
          forward_event_to_all();
 
865
      | Immediate(g,f) ->
 
866
          if g # is_terminating then raise Equeue.Reject;
 
867
          ( try f()
 
868
            with Equeue.Terminate -> ()
 
869
          )
 
870
 
 
871
  method private equeue_add_handler () =
 
872
    Equeue.add_handler (Lazy.force !sys) self#uq_handler
 
873
 
 
874
  (* CHECK: There is a small difference between Equeue.add_handler and
 
875
   * this add_handler: Here, the handler is immediately active (if
 
876
   * uq_handler is already active). Can this lead to problems?
 
877
   *)
 
878
 
 
879
  method add_handler g h =
 
880
    while_locked mutex
 
881
      (fun () ->
 
882
         let oh = new ohandler h in
 
883
         dlogr
 
884
           (fun () ->
 
885
              (sprintf
 
886
                 "add_handler <group %d, handler %d>" (Oo.id g) (Oo.id oh)));
 
887
         
 
888
         if g # is_terminating then
 
889
           invalid_arg "Unixqueue.add_handler: the group is terminated";
 
890
         
 
891
         ( try
 
892
             let old_handlers = Hashtbl.find handlers g in
 
893
             Hashtbl.replace handlers g (oh :: old_handlers)
 
894
           with
 
895
             | Not_found ->
 
896
                 (* The group g is new *)
 
897
                 Hashtbl.add handlers g [oh];
 
898
                 handled_groups := !handled_groups + 1;
 
899
                 (* if handled_groups = 1 then
 
900
                   self#equeue_add_handler ()
 
901
                  *)
 
902
         )
 
903
      )
 
904
 
 
905
  method clear g =
 
906
    if Oo.id g = nogroup_id then
 
907
      invalid_arg "Unixqueue.clear: nogroup";
 
908
    while_locked mutex
 
909
      (fun () -> self # clear_wl g)
 
910
 
 
911
 
 
912
  method private clear_wl g =
 
913
    dlogr (fun () -> (sprintf "clear <group %d>" (Oo.id g)));
 
914
    
 
915
    (* Set that g is terminating now: *)
 
916
    g # terminate();
 
917
    
 
918
    (* (i) delete all resources of g: *)
 
919
    let ops = 
 
920
      try Hashtbl.find ops_of_group g with Not_found -> OpSet.empty in
 
921
    OpSet.iter
 
922
      self#sched_remove_wl
 
923
      ops;
 
924
    OpSet.iter
 
925
      self#pset_remove_wl
 
926
      ops;
 
927
    Hashtbl.remove ops_of_group g;
 
928
 
 
929
    (* (ii) delete all handlers of g: *)
 
930
    add_event_wl (Extra (Term g));
 
931
    (* side effect: we also interrupt [wait] *)
 
932
 
 
933
    (* (iii) delete special actions of g: *)
 
934
    let to_remove =   (* remove from close_tab *)
 
935
      Hashtbl.fold
 
936
        (fun d (g',_) l -> if g = g' then d :: l else l) close_tab [] in
 
937
    List.iter
 
938
      (Hashtbl.remove close_tab) to_remove;
 
939
    
 
940
    Hashtbl.remove abort_tab g;
 
941
 
 
942
    (* Note: the Term event isn't caught after all handlers have been
 
943
     * deleted. The Equeue module simply discards events that are not
 
944
     * handled.
 
945
     *)
 
946
 
 
947
    if not !waiting && OpTbl.length tmo_of_op = 0 then
 
948
      pset # dispose();
 
949
 
 
950
 
 
951
  method private abort g ex =
 
952
    (* caller doesn't have the lock *)
 
953
    (* Note: If g has been terminated, the abort action is removed. So
 
954
     * we will never find here one.
 
955
     *)
 
956
    dlogr (fun () -> (sprintf "abort <group %d, exception %s>"
 
957
                         (Oo.id g) (Netexn.to_string ex)));
 
958
    let action =
 
959
      while_locked mutex
 
960
        (fun () ->
 
961
           try Some (Hashtbl.find abort_tab g) with Not_found -> None) in
 
962
    match action with
 
963
      | Some a ->
 
964
          begin
 
965
            dlogr (fun () -> "abort <running abort action>");
 
966
            let mistake = ref None in
 
967
            while_locked mutex 
 
968
              (fun () -> aborting := true);
 
969
            begin try
 
970
              a g ex;
 
971
            with
 
972
              | any ->
 
973
                  mistake := Some any (* Wow *)
 
974
            end;
 
975
            while_locked mutex 
 
976
              (fun () ->
 
977
                 self#clear_wl g;
 
978
                 aborting := false
 
979
              );
 
980
            match !mistake with
 
981
              | None -> ()
 
982
              | Some m ->
 
983
                  dlogr (fun () -> (sprintf "abort <propagating exception %s>"
 
984
                                       (Netexn.to_string m)));
 
985
                  raise m
 
986
          end
 
987
      | None ->
 
988
          ()
 
989
 
 
990
  method run () =
 
991
    (* caller doesn't have the lock *)
 
992
    let continue = ref true in
 
993
    try
 
994
      while !continue do
 
995
        continue := false;
 
996
        try
 
997
          Equeue.run (Lazy.force !sys);
 
998
        with
 
999
          | Abort (g,an_exception) ->
 
1000
              begin
 
1001
                match an_exception with
 
1002
                  | (Equeue.Reject|Equeue.Terminate) ->
 
1003
                      (* A serious programming error: *)
 
1004
                      failwith "Caught 'Abort' exception with Reject or Terminate exception as argument; this is a programming error"
 
1005
                  | Abort(_,_) ->
 
1006
                      failwith "Caught 'Abort' exception with an 'Abort' exception as argument; this is a programming error"
 
1007
                  | _ -> ()
 
1008
              end;
 
1009
              self#abort g an_exception;
 
1010
              continue := true
 
1011
      done;
 
1012
    with
 
1013
      | error ->
 
1014
          raise error
 
1015
 
 
1016
  method is_running =
 
1017
    Equeue.is_running (Lazy.force !sys)
 
1018
 
 
1019
 
 
1020
  method when_blocking f =
 
1021
    when_blocking := f
 
1022
 
 
1023
end
 
1024
 
 
1025
 
 
1026
let pollset_event_system pset = 
 
1027
  (new pollset_event_system pset :> Unixqueue_util.event_system)
 
1028