1
(* $Id: unixqueue_pollset.ml 1624 2011-06-13 12:27:58Z gerd $ *)
3
(* pset # dispose: we try to call it when there are no more operations in
4
the queue. This is tested when
5
- the resource is removed (and we are not waiting)
6
- the group is cleared (and we are not waiting)
8
Note that we must not call [dispose] while [wait] is running!
17
(* let compare = ( Pervasives.compare : float -> float -> int ) *)
18
(* Monomorphic comparison on floats, used instead of the polymorphic
   [Pervasives.compare].
   Returns 0 when x = y, -1 when x < y, and 1 in every other case.
   Caveat: this is not a total order if NaN is involved. Any comparison
   with NaN (on either side) yields 1. Callers presumably never pass NaN. *)
let compare (x:float) y =
  if x = y then 0
  else if x < y then (-1)
  else 1
24
(* Ordered map with float keys. It is used to index operations by their
   timeout deadline, so the earliest deadline is the minimum key. *)
module FloatMap =
  Map.Make (struct
    type t = float
    let compare = Float.compare
  end)
27
(* Object id of the distinguished [nogroup] group. Code in this file
   compares [Oo.id g] against it to decide whether [g] is [nogroup]. *)
let nogroup_id = Oo.id nogroup
30
(* In ocaml-3.12 there is already a function for this in Map *)
34
(fun t _ -> k := Some t; raise Exit)
40
| None -> raise Not_found
43
let ops_until tmax m =
44
(* Look into the FloatMap m, and return all ops for which
51
if t > tmax then raise Exit;
61
exception Term of Unixqueue_util.group
62
(* Extra (Term g) is now the group termination event for g *)
65
(* Sometimes used to keep the event system alive *)
71
Netexn.register_printer
78
"Term(" ^ string_of_int (Oo.id g) ^ ")"
83
let pset_set (pset:Netsys_pollset.pollset) fd (i,o,p) =
84
if not i && not o && not p then
87
pset # add fd (Netsys_posix.poll_req_events i o p)
90
let pset_find (pset:Netsys_pollset.pollset) fd =
91
try Netsys_posix.poll_req_triple(pset#find fd)
93
| Not_found -> (false,false,false)
99
| Unixqueue_util.Input_arrived(_,fd) -> Unixqueue_util.Wait_in fd
100
| Unixqueue_util.Output_readiness(_,fd) -> Unixqueue_util.Wait_out fd
101
| Unixqueue_util.Out_of_band(_,fd) -> Unixqueue_util.Wait_oob fd
102
| Unixqueue_util.Timeout(_,op) -> op
105
let rec list_mem_op op l =
108
is_op_eq h op || list_mem_op op t
112
(* Evaluate [f ()] while [mutex] is held, delegating the locking to
   [Netsys_oothr.serialize]. Exception behaviour is whatever [serialize]
   provides (presumably the lock is released on exceptions; see
   Netsys_oothr). *)
let while_locked mutex f =
  let body () = f () in
  Netsys_oothr.serialize mutex body ()
116
let escape_lock mutex f =
120
with e -> mutex # lock(); raise e in
125
let flatten_map f l =
126
(* = List.flatten (List.map f l) *)
132
loop l' (List.rev_append y h) in
136
(* A little encapsulation so we can easily identify handlers by Oo.id *)
137
class ohandler (h:handler) =
139
method run esys eq ev =
144
class pollset_event_system (pset : Netsys_pollset.pollset) =
145
let mtp = !Netsys_oothr.provider in
146
let is_mt = not (mtp#single_threaded) in
147
let sys = ref (lazy (assert false)) in (* initialized below *)
149
(Hashtbl.create 10 : (group, OpSet.t) Hashtbl.t) in
150
(* is for [clear] only *)
152
(OpTbl.create 10 : (float * float * group * bool) OpTbl.t) in
153
(* first number: duration of timeout (or -1)
154
second number: point in time (or -1)
157
let ops_of_tmo = ref(FloatMap.empty : OpSet.t FloatMap.t) in
158
let strong_ops = ref 0 in (* number of strong (non-weak) ops *)
160
let aborting = ref false in
163
(Unix.file_descr, (group * (Unix.file_descr -> unit))) Hashtbl.t) in
165
(Hashtbl.create 10 : (group, (group -> exn -> unit)) Hashtbl.t) in
166
let handlers = (Hashtbl.create 10 : (group, ohandler list) Hashtbl.t) in
167
let handled_groups = ref 0 in
168
(* the number of keys in [handlers] *)
170
let waiting = ref false in
171
let when_blocking = ref (fun () -> ()) in
173
let mutex = mtp # create_mutex() in
176
let event_of_op_wl_nf op =
177
let (_,_,g,_) = OpTbl.find tmo_of_op op in (* or Not_found *)
179
| Unixqueue_util.Wait_in fd ->
180
Unixqueue_util.Input_arrived(g,fd)
181
| Unixqueue_util.Wait_out fd ->
182
Unixqueue_util.Output_readiness(g,fd)
183
| Unixqueue_util.Wait_oob fd ->
184
Unixqueue_util.Out_of_band(g,fd)
185
| Unixqueue_util.Wait _ ->
188
let events_of_op_wl op =
190
[ event_of_op_wl_nf op ]
193
(* A "ghost event", i.e. there is no handler anymore
194
for it, but wasn't deleted quickly enough from
199
let tmo_event_of_op_wl_nf op =
200
let (_,_,g,_) = OpTbl.find tmo_of_op op in (* or Not_found *)
201
Unixqueue_util.Timeout(g,op) in
203
let tmo_events_of_op_wl op =
205
[ tmo_event_of_op_wl_nf op ]
207
| Not_found -> [] (* Ghost event, see above *) in
211
Equeue.add_event (Lazy.force !sys) e;
212
pset # cancel_wait true
213
(* Set the cancel bit, so that any pending [wait] is interrupted *) in
219
let equeue_sys = Equeue.create ~string_of_event self#source in
220
sys := lazy equeue_sys;
221
ignore(Lazy.force !sys);
222
(* Add ourselves now at object creation time. The only drawback is that
223
we no longer raise [Equeue.Out_of_handlers] - but this is questionable
224
anyway since the addition of [Immediate] events.
226
In order to generate [Out_of_handlers] we would have to count
227
[Immediate] events in addition to handlers.
229
self#equeue_add_handler ()
233
method private source _sys =
234
(* locking: the lock is not held when called, because we are called back
237
assert(Lazy.force !sys == _sys);
241
let dbg = !Unixqueue_util.Debug.enable in
243
let locked = ref true in (* keep track of locking state *)
247
let t0 = Unix.gettimeofday() in
248
let tmin = try min_key !ops_of_tmo with Not_found -> (-1.0) in
249
let delta = if tmin < 0.0 then (-1.0) else max (tmin -. t0) 0.0 in
253
sprintf "t0 = %f, #tmo_of_op = %d"
254
t0 (OpTbl.length tmo_of_op)
258
(* For this test only non-weak resources count, so... *)
261
let have_eintr = ref false in
265
if nothing_to_do then (
267
dlogr (fun () -> "nothing_to_do");
274
OpTbl.fold (fun op _ l -> op::l) tmo_of_op [] in
276
String.concat ";" (List.map string_of_op ops) in
277
sprintf "wait tmo=%f ops=<%s>" delta op_str));
278
(* Reset the cancel bit immediately before calling [wait]. Any
279
event added up to now is considered anyway by [wait] because
280
our lock is still held. Any new event added after we unlock will
281
set the cancel_wait flag, and cause the [wait] to exit (if it is
284
pset # cancel_wait false;
292
| Unix.Unix_error(Unix.EINTR,_,_) ->
294
dlogr (fun () -> "wait signals EINTR");
298
(* Usually from [wait], but one never knows... *)
299
if !locked && is_mt then mutex#unlock();
305
if not !locked && is_mt then mutex # lock();
308
(* Catch exceptions and unlock *)
312
sprintf "wait returns <%d pset events>"
313
(List.length pset_events)));
315
let t1 = Unix.gettimeofday() in
317
dlogr (fun () -> (sprintf "t1 = %f" t1));
318
(* t1 is the reference for determining the timeouts *)
320
(* while waiting somebody might have removed resources, so ... *)
321
if OpTbl.length tmo_of_op = 0 then
324
let operations = (* flatten_map *)
325
(* The possible operations *)
327
(fun acc (fd,ev_in,ev_out) ->
328
(* Note that POLLHUP and POLLERR can also mean that we
329
have data to read/write!
331
let (in_rd,in_wr,in_pri) =
332
Netsys_posix.poll_req_triple ev_in in
334
Netsys_posix.poll_rd_result ev_out in
336
Netsys_posix.poll_wr_result ev_out in
338
Netsys_posix.poll_pri_result ev_out in
340
Netsys_posix.poll_hup_result ev_out in
342
Netsys_posix.poll_err_result ev_out in
344
in_rd && (out_rd || out_hup || out_err) in
346
in_wr && (out_wr || out_hup || out_err) in
348
in_pri && (out_pri || out_hup || out_err) in
350
if have_pri then Unixqueue_util.Wait_oob fd :: acc else acc in
352
if have_input then Unixqueue_util.Wait_in fd :: e1 else e1 in
354
if have_output then Unixqueue_util.Wait_out fd :: e2 else e2 in
361
ops_until t1 !ops_of_tmo in
362
(* Note: this _must_ include operations until <= t1 (not <t1), otherwise
363
a timeout value of 0.0 won't work
366
let ops_timed_out_l =
367
(* Determine the operations in [tmo_of_op] that have timed
368
out and that are not in [operations]
369
FIXME: [List.mem op operations] is not scalable.
372
(fun oacc (_, ops) ->
375
if list_mem_op op operations then iacc else op::iacc)
385
sprintf "delivering events <%s>"
389
List.map string_of_event (events_of_op_wl op)
395
sprintf "delivering timeouts <%s>"
399
List.map string_of_event (tmo_events_of_op_wl op)
406
let delivered = ref false in
407
let deliver get_ev op =
409
let ev = get_ev op in
411
Equeue.add_event _sys ev
412
with Not_found -> () in
413
List.iter (deliver event_of_op_wl_nf) operations;
414
List.iter (deliver tmo_event_of_op_wl_nf) ops_timed_out_l;
416
if !have_eintr then (
417
dlogr (fun () -> "delivering Signal");
418
Equeue.add_event _sys Unixqueue_util.Signal
420
if not !delivered && not nothing_to_do then (
421
(* Ensure we always add an event to keep the event loop running: *)
423
dlogr (fun () -> "delivering Keep_alive");
424
Equeue.add_event _sys (Unixqueue_util.Extra Keep_alive)
427
(* Update ops_of_tmo: *)
430
ops_of_tmo := FloatMap.remove t !ops_of_tmo
434
(* Set a new timeout for all delivered events:
435
(Note that [pset] remains unchanged, because the set of watched
436
resources remains unchanged.)
437
rm_done is true when the old timeout is already removed from
440
let update_tmo rm_done oplist =
444
let (tmo,t1,g,is_strong) =
445
OpTbl.find tmo_of_op op in (* or Not_found *)
447
let t2 = t1 +. tmo in
448
self#sched_upd_tmo_wl g op tmo t2 is_strong
449
(if rm_done then (-1.0) else t1)
453
(* It is possible that resources were removed while
454
we were waiting for events. This can lead to
455
[Not_found] here. We just ignore this.
459
update_tmo false operations;
460
update_tmo true ops_timed_out_l;
462
if is_mt then mutex # unlock();
467
(* exceptions are unexpected, but we want to make sure not to mess
470
if !locked && is_mt then mutex # unlock();
474
(* Note: suffix _wl = while locked *)
476
method private sched_remove_wl op =
478
let tmo, t1, g, is_strong = OpTbl.find tmo_of_op op in (* or Not_found *)
479
dlogr(fun () -> (sprintf "sched_remove %s" (string_of_op op)));
480
OpTbl.remove tmo_of_op op;
481
if is_strong then decr strong_ops;
485
FloatMap.find t1 !ops_of_tmo
486
else raise Not_found in
488
OpSet.remove op l_ops in
489
if l_ops' = OpSet.empty then
490
ops_of_tmo := FloatMap.remove t1 !ops_of_tmo
492
ops_of_tmo := FloatMap.add t1 l_ops' !ops_of_tmo
495
if Oo.id g <> nogroup_id then (
496
let old_set = Hashtbl.find ops_of_group g in
497
let new_set = OpSet.remove op old_set in
498
if new_set = OpSet.empty then
499
Hashtbl.remove ops_of_group g
501
Hashtbl.replace ops_of_group g new_set
508
method private pset_remove_wl op =
511
let (i,o,p) = pset_find pset fd in
512
pset_set pset fd (false,o,p)
514
let (i,o,p) = pset_find pset fd in
515
pset_set pset fd (i,false,p)
517
let (i,o,p) = pset_find pset fd in
518
pset_set pset fd (i,o,false)
523
method private sched_add_wl g op tmo t1 is_strong =
524
dlogr(fun () -> (sprintf "sched_add %s tmo=%f t1=%f is_strong=%b"
525
(string_of_op op) tmo t1 is_strong));
526
OpTbl.replace tmo_of_op op (tmo, t1, g, is_strong);
530
try FloatMap.find t1 !ops_of_tmo with Not_found -> OpSet.empty in
532
ops_of_tmo := FloatMap.add t1 (OpSet.add op l_ops) !ops_of_tmo;
533
if Oo.id g <> nogroup_id then (
535
try Hashtbl.find ops_of_group g with Not_found -> OpSet.empty in
537
OpSet.add op old_set in
538
Hashtbl.replace ops_of_group g new_set
541
method private pset_add_wl op =
544
let (i,o,p) = pset_find pset fd in
545
pset_set pset fd (true,o,p)
547
let (i,o,p) = pset_find pset fd in
548
pset_set pset fd (i,true,p)
550
let (i,o,p) = pset_find pset fd in
551
pset_set pset fd (i,o,true)
555
method private sched_upd_tmo_wl g op tmo t1 is_strong old_t1 =
556
(* only for tmo>=0 *)
557
dlogr(fun () -> (sprintf "sched_upd_tmo %s tmo=%f t1=%f is_strong=%b"
558
(string_of_op op) tmo t1 is_strong));
559
OpTbl.replace tmo_of_op op (tmo, t1, g, is_strong);
561
(* We assume old_t1 is already removed from ops_of_tmo if old_t1 < 0 *)
562
if old_t1 >= 0.0 then (
565
FloatMap.find old_t1 !ops_of_tmo in
567
OpSet.remove op l_ops in
568
if l_ops' = OpSet.empty then
569
ops_of_tmo := FloatMap.remove old_t1 !ops_of_tmo
571
ops_of_tmo := FloatMap.add old_t1 l_ops' !ops_of_tmo
576
try FloatMap.find t1 !ops_of_tmo with Not_found -> OpSet.empty in
577
ops_of_tmo := FloatMap.add t1 (OpSet.add op l_ops_new) !ops_of_tmo
579
method exists_resource op =
581
(fun () -> self # exists_resource_wl op)
583
method private exists_resource_wl op =
584
OpTbl.mem tmo_of_op op
587
method private exists_descriptor_wl fd =
588
self#exists_resource_wl (Unixqueue_util.Wait_in fd) ||
589
self#exists_resource_wl (Unixqueue_util.Wait_out fd) ||
590
self#exists_resource_wl (Unixqueue_util.Wait_oob fd)
593
method add_resource g (op, tmo) =
596
self # add_resource_wl g (op, tmo) true
599
method add_weak_resource g (op, tmo) =
602
self # add_resource_wl g (op, tmo) false
606
method private add_resource_wl g (op, tmo) is_strong =
607
if g # is_terminating then
608
invalid_arg "Unixqueue.add_resource: the group is terminated";
609
if not (OpTbl.mem tmo_of_op op) then (
611
let t1 = if tmo < 0.0 then tmo else Unix.gettimeofday() +. tmo in
612
self#sched_add_wl g op tmo t1 is_strong;
613
(* Multi-threading: interrupt [wait] *)
614
pset # cancel_wait true;
615
(* CHECK: In the select-based impl we add Keep_alive to equeue.
616
This idea (probably): If [wait] is about to return, and the
617
queue becomes empty, the whole esys terminates. The Keep_alive
619
My current thinking is that this delays the race condition
620
a bit, but does not prevent it from happening
623
(* Note: The second addition of a resource is silently ignored...
624
Maybe this should be fixed, so that the timeout can be lowered
628
method remove_resource g op =
631
if g # is_terminating then
632
invalid_arg "remove_resource: the group is terminated";
633
let _, t1, g_found, _ = OpTbl.find tmo_of_op op in
634
if Oo.id g <> Oo.id g_found then
635
failwith "remove_resource: descriptor belongs to different group";
636
self#sched_remove_wl op;
637
self#pset_remove_wl op;
639
if not !waiting && OpTbl.length tmo_of_op = 0 then
642
pset # cancel_wait true; (* interrupt [wait] *)
643
(* is there a close action ? *)
646
| Wait_in d -> Some d
647
| Wait_out d -> Some d
648
| Wait_oob d -> Some d
652
if not !aborting then (
654
try Some (snd(Hashtbl.find close_tab fd))
655
with Not_found -> None in
658
(* any open resource? *)
659
(* FIXME MT: We don't know yet whether fd can be closed.
660
This shouldn't be done before [wait] returns.
662
if not (self#exists_descriptor_wl fd) then (
665
(sprintf "remove_resource \
666
<running close action for fd %s>"
668
Hashtbl.remove close_tab fd;
669
escape_lock mutex (fun () -> a fd);
678
method new_group () =
681
method new_wait_id () =
685
method add_close_action g (d,a) =
688
if g # is_terminating then
689
invalid_arg "add_close_action: the group is terminated";
690
(* CHECK: Maybe we should fail if g is a different group than
693
if self#exists_descriptor_wl d then
694
Hashtbl.replace close_tab d (g,a)
695
(* There can be only one close action.
696
* TODO: Rename to set_close_action
699
failwith "add_close_action"
703
method add_abort_action g a =
706
if g # is_terminating then
707
invalid_arg "add_abort_action: the group is terminated";
708
if Oo.id g = nogroup_id then
709
invalid_arg "add_abort_action: the group is nogroup";
710
Hashtbl.replace abort_tab g a
715
(fun () -> add_event_wl e)
718
method private uq_handler (esys : event Equeue.t) ev =
719
(* locking: it is assumed that we do not have the lock when uq_handler
720
is called. It is a callback from Equeue
722
(* The single Unixqueue handler. For all added (sub) handlers, uq_handler
723
* gets the events, and delivers them
726
let terminate_handler_wl g h =
727
if !Unixqueue_util.Debug.enable then
730
(sprintf "uq_handler <terminating handler group %d, handler %d>"
731
(Oo.id g) (Oo.id h)));
733
try Hashtbl.find handlers g with Not_found -> [] in
735
List.filter (fun h' -> Oo.id h' <> Oo.id h) hlist in
736
if hlist' = [] then (
737
Hashtbl.remove handlers g;
738
handled_groups := !handled_groups - 1;
740
if handled_groups = 0 then (
741
dlogr (fun () -> "uq_handler <self-terminating>");
742
raise Equeue.Terminate (* delete uq_handler from esys *)
746
Hashtbl.replace handlers g hlist'
750
let rec forward_event_to g (hlist : ohandler list) =
751
(* The caller does not have the lock when this fn is called! *)
754
if !Unixqueue_util.Debug.enable then
755
dlogr (fun () -> "uq_handler <empty list>");
759
(* Note: ues _must not_ be locked now *)
760
if !Unixqueue_util.Debug.enable then
764
"uq_handler <invoke handler group %d, handler %d>"
765
(Oo.id g) (Oo.id h)));
766
h#run (self :> event_system) esys ev;
767
if !Unixqueue_util.Debug.enable then
771
"uq_handler <invoke_success handler group %d, handler %d>"
772
(Oo.id g) (Oo.id h)));
775
forward_event_to g hlist'
776
| Equeue.Terminate ->
777
(* Terminate only this handler. *)
779
(fun () -> terminate_handler_wl g h)
780
(* Any error exceptions simply fall through. Equeue will
781
* catch them, and will add the event to the error queue
786
let forward_event g =
787
(* The caller does not have the lock when this fn is called! *)
788
if !Unixqueue_util.Debug.enable then
791
(sprintf "uq_handler <forward_event group %d>" (Oo.id g)));
795
try Hashtbl.find handlers g with Not_found -> []) in
796
forward_event_to g hlist
799
let forward_event_to_all() =
800
(* The caller does not have the lock when this fn is called! *)
804
Hashtbl.fold (fun g hlist l -> (g,hlist) :: l) handlers []) in
809
forward_event_to g hlist;
810
raise Exit_loop (* event is delivered, so exit iteration *)
812
(* event is rejected: try next group *)
816
raise Equeue.Reject (* no handler has accepted the event, so reject *)
821
if !Unixqueue_util.Debug.enable then
824
(sprintf "uq_handler <event %s>"
825
(string_of_event ev)));
829
(* Terminate all handlers of group g *)
832
if Hashtbl.mem handlers g then (
835
(sprintf "uq_handler <terminating group %d>" (Oo.id g)));
836
Hashtbl.remove handlers g;
837
handled_groups := !handled_groups - 1;
839
if handled_groups = 0 then (
840
dlogr (fun () -> "uq_handler <self-terminating>");
841
raise Equeue.Terminate (* delete uq_handler from esys *)
845
else raise Equeue.Reject (* strange, should not happen *)
847
| Extra Keep_alive ->
849
| Input_arrived(g,_) ->
850
if g # is_terminating then raise Equeue.Reject;
852
| Output_readiness(g,_) ->
853
if g # is_terminating then raise Equeue.Reject;
855
| Out_of_band(g,_) ->
856
if g # is_terminating then raise Equeue.Reject;
859
if g # is_terminating then raise Equeue.Reject;
862
forward_event_to_all();
864
forward_event_to_all();
866
if g # is_terminating then raise Equeue.Reject;
868
with Equeue.Terminate -> ()
871
method private equeue_add_handler () =
872
Equeue.add_handler (Lazy.force !sys) self#uq_handler
874
(* CHECK: There is a small difference between Equeue.add_handler and
875
* this add_handler: Here, the handler is immediately active (if
876
* uq_handler is already active). Can this lead to problems?
879
method add_handler g h =
882
let oh = new ohandler h in
886
"add_handler <group %d, handler %d>" (Oo.id g) (Oo.id oh)));
888
if g # is_terminating then
889
invalid_arg "Unixqueue.add_handler: the group is terminated";
892
let old_handlers = Hashtbl.find handlers g in
893
Hashtbl.replace handlers g (oh :: old_handlers)
896
(* The group g is new *)
897
Hashtbl.add handlers g [oh];
898
handled_groups := !handled_groups + 1;
899
(* if handled_groups = 1 then
900
self#equeue_add_handler ()
906
if Oo.id g = nogroup_id then
907
invalid_arg "Unixqueue.clear: nogroup";
909
(fun () -> self # clear_wl g)
912
method private clear_wl g =
913
dlogr (fun () -> (sprintf "clear <group %d>" (Oo.id g)));
915
(* Set that g is terminating now: *)
918
(* (i) delete all resources of g: *)
920
try Hashtbl.find ops_of_group g with Not_found -> OpSet.empty in
927
Hashtbl.remove ops_of_group g;
929
(* (ii) delete all handlers of g: *)
930
add_event_wl (Extra (Term g));
931
(* side effect: we also interrupt [wait] *)
933
(* (iii) delete special actions of g: *)
934
let to_remove = (* remove from close_tab *)
936
(fun d (g',_) l -> if g = g' then d :: l else l) close_tab [] in
938
(Hashtbl.remove close_tab) to_remove;
940
Hashtbl.remove abort_tab g;
942
(* Note: the Term event isn't caught after all handlers have been
943
* deleted. The Equeue module simply discards events that are not
947
if not !waiting && OpTbl.length tmo_of_op = 0 then
951
method private abort g ex =
952
(* caller doesn't have the lock *)
953
(* Note: If g has been terminated, the abort action is removed. So
954
* we will never find here one.
956
dlogr (fun () -> (sprintf "abort <group %d, exception %s>"
957
(Oo.id g) (Netexn.to_string ex)));
961
try Some (Hashtbl.find abort_tab g) with Not_found -> None) in
965
dlogr (fun () -> "abort <running abort action>");
966
let mistake = ref None in
968
(fun () -> aborting := true);
973
mistake := Some any (* Wow *)
983
dlogr (fun () -> (sprintf "abort <propagating exception %s>"
984
(Netexn.to_string m)));
991
(* caller doesn't have the lock *)
992
let continue = ref true in
997
Equeue.run (Lazy.force !sys);
999
| Abort (g,an_exception) ->
1001
match an_exception with
1002
| (Equeue.Reject|Equeue.Terminate) ->
1003
(* A serious programming error: *)
1004
failwith "Caught 'Abort' exception with Reject or Terminate exception as argument; this is a programming error"
1006
failwith "Caught 'Abort' exception with an 'Abort' exception as argument; this is a programming error"
1009
self#abort g an_exception;
1017
Equeue.is_running (Lazy.force !sys)
1020
method when_blocking f =
1026
let pollset_event_system pset =
1027
(new pollset_event_system pset :> Unixqueue_util.event_system)