1
(* $Id: uq_io.ml 1665 2011-08-30 17:37:57Z gerd $ *)
3
open Uq_engines.Operators
8
| `Memory of Netsys_mem.memory
12
class type obj_buffer =
15
method blit_out : int -> string_like -> int -> int -> unit
16
method delete_hd : int -> unit
17
method index_from : int -> char -> int
18
method add : string_like -> int -> int -> unit
19
method advance : int -> unit
20
method page_for_additions : string_like * int * int
21
method page_for_consumption : string_like * int * int
22
method clear : unit -> unit
26
class type ['in_device] in_buffer_pre =
28
method buffer : obj_buffer
30
method set_eof : unit -> unit
31
method start_fill_e : unit -> bool Uq_engines.engine
32
method fill_e_opt : bool Uq_engines.engine option
33
(* The current fill engine, or None *)
34
method udevice : 'in_device
35
method shutdown_e : unit -> unit Uq_engines.engine
36
method inactivate : unit -> unit
37
method event_system : Unixqueue.event_system
41
class type ['out_device] out_buffer_pre =
43
method buffer : obj_buffer
45
method max : int option
46
method start_flush_e : unit -> unit Uq_engines.engine
47
method flush_e_opt : unit Uq_engines.engine option
48
(* The current flush engine, or None *)
49
method write_eof_e : unit -> bool Uq_engines.engine
50
(* The buffer must be empty before [write_eof_e] *)
51
method udevice : 'out_device option
52
method shutdown_e : float option -> unit Uq_engines.engine
53
method inactivate : unit -> unit
54
method event_system : Unixqueue.event_system
59
[ `Polldescr of Netsys.fd_style * Unix.file_descr * Unixqueue.event_system
60
| `Multiplex of Uq_engines.multiplex_controller
61
| `Async_in of Uq_engines.async_in_channel * Unixqueue.event_system
62
| `Buffer_in of in_device in_buffer_pre
63
| `Count_in of (int -> unit) * in_device
68
[ `Polldescr of Netsys.fd_style * Unix.file_descr * Unixqueue.event_system
69
| `Multiplex of Uq_engines.multiplex_controller
70
| `Async_out of Uq_engines.async_out_channel * Unixqueue.event_system
71
| `Buffer_out of out_device out_buffer_pre
72
| `Count_out of (int -> unit) * out_device
76
type in_buffer = in_device in_buffer_pre
77
type out_buffer = out_device out_buffer_pre
81
[ `Buffer_in of in_buffer ]
83
type inout_device = [ in_device | out_device ]
87
exception Line_too_long
90
let rec device_esys0 =
92
| `Polldescr(_,_,esys) -> esys
93
| `Multiplex mplex -> mplex#event_system
94
| `Async_in(_,esys) -> esys
95
| `Async_out(_,esys) -> esys
96
| `Buffer_in b -> b#event_system
97
| `Buffer_out b -> b#event_system
98
| `Count_in(_,d) -> device_esys0 (d :> inout_device)
99
| `Count_out(_,d) -> device_esys0 (d :> inout_device)
103
device_esys0 (d :> inout_device)
111
let rec device_supports_memory0 =
113
| `Polldescr(style,_,_) ->
115
| `Read_write | `Recv_send _ | `Recv_send_implied ->
120
| `Multiplex mplex ->
121
mplex # mem_supported
122
| `Async_in(_,esys) ->
124
| `Async_out(_,esys) ->
130
| `Count_in(_,d) -> device_supports_memory0 (d : in_device :> inout_device)
131
| `Count_out(_,d) -> device_supports_memory0 (d : out_device :> inout_device)
133
let device_supports_memory d =
134
device_supports_memory0 (d :> inout_device)
137
let mem_gread style fd m pos len =
140
Netsys_mem.mem_read fd m pos len
141
| `Recv_send _ | `Recv_send_implied ->
142
Netsys_mem.mem_recv fd m pos len []
144
failwith ("Uq_io: This fd style does not support `Memory: " ^
145
Netsys.string_of_fd_style style)
148
let mem_gwrite style fd m pos len =
151
Netsys_mem.mem_write fd m pos len
152
| `Recv_send _ | `Recv_send_implied ->
153
Netsys_mem.mem_send fd m pos len []
155
failwith ("Uq_io: This fd style does not support `Memory: " ^
156
Netsys.string_of_fd_style style)
159
let ach_input_e ch esys s pos len =
160
(* case: async channel *)
162
let (e, signal) = Uq_engines.signal_engine esys in
164
let rec wait_for_input () =
166
let n = ch # input s pos len in
167
if n > 0 || len = 0 then
170
ch # request_notification
177
| error -> signal (`Error error)
184
let rec buf_input_e b ms pos len =
185
let bl = b#buffer#length in
186
if bl > 0 || len = 0 then (
187
let n = min len bl in
188
b#buffer#blit_out 0 ms pos n;
189
b#buffer#delete_hd n;
190
eps_e (`Done n) b#event_system
193
eps_e (`Error End_of_file) b#event_system
195
(* Optimization: if len is quite large, bypass the buffer *)
197
if len >= 4096 && (device_supports_memory d || is_string ms) then
198
dev_input_e d ms pos len
200
| `Error End_of_file ->
201
b#set_eof(); `Error End_of_file
206
match b#fill_e_opt with
207
| None -> b#start_fill_e ()
209
fe ++ (fun _ -> buf_input_e b ms pos len)
213
and dev_input_e (d : in_device) ms pos len =
215
| `Polldescr(style, fd, esys) ->
216
new Uq_engines.input_engine
220
let n = Netsys.gread style fd s pos len in
221
if len > 0 && n = 0 then raise End_of_file;
224
let n = mem_gread style fd m pos len in
225
if len > 0 && n = 0 then raise End_of_file;
230
| `Multiplex mplex ->
231
let (e, signal) = Uq_engines.signal_engine mplex#event_system in
233
if mplex#reading then mplex # cancel_reading() in
236
mplex # start_reading
237
~when_done:(fun xopt n ->
241
| Some Uq_engines.Cancelled ->
242
cancel(); signal `Aborted
243
| Some err -> signal (`Error err)
247
if mplex#mem_supported then
248
mplex # start_mem_reading
249
~when_done:(fun xopt n ->
253
| Some Uq_engines.Cancelled ->
254
cancel(); signal `Aborted
255
| Some err -> signal (`Error err)
261
(Failure "Uq_io: This mplex does not support `Memory"));
265
| `Error e -> `Error e
266
| `Aborted -> cancel(); `Aborted
270
| `Async_in (ch,esys) ->
273
ach_input_e ch esys s pos len
277
(Failure "Uq_io: async channels do not support `Memory"))
282
buf_input_e b ms pos len
285
dev_input_e d ms pos len
287
| `Done n -> c n; `Done n
291
let input_e d0 ms pos len =
292
let d = (d0 :> in_device) in
293
dev_input_e d ms pos len
295
let rec really_input_e d ms pos len =
297
eps_e (`Done ()) (device_esys d)
299
input_e d ms pos len ++
300
(fun n -> really_input_e d ms (pos+n) (len-n))
303
let input_line_e ?(max_len = Sys.max_string_length) (`Buffer_in b) =
305
if k2 > max_len then raise Line_too_long;
306
let s = String.create k1 in
307
b#buffer#blit_out 0 (`String s) 0 k1;
308
b#buffer#delete_hd k2;
310
let rec look_ahead eof =
312
let k = b#buffer#index_from 0 '\n' in
313
let s = consume k (k+1) in
314
eps_e (`Done s) b#event_system
318
let n = b#buffer#length in
320
eps_e (`Error End_of_file) b#event_system
322
let s = consume n n in
323
eps_e (`Done s) b#event_system
328
if b#buffer#length > max_len then
329
eps_e (`Error Line_too_long) b#event_system
332
match b#fill_e_opt with
333
| None -> b#start_fill_e ()
338
eps_e (`Error Line_too_long) b#event_system
343
exception Cont of (unit -> string list Uq_engines.engine)
345
let input_lines_e ?(max_len = Sys.max_string_length) (`Buffer_in b) =
346
let copy_string i l =
347
let s = String.create l in
348
b#buffer#blit_out i (`String s) 0 l;
351
b#buffer#delete_hd k in
352
let rec look_ahead i acc eof =
354
let k = b#buffer#index_from i '\n' in
355
if k-i+1 > max_len then raise Line_too_long;
356
let s = copy_string i (k-i) in
357
raise(Cont(fun () -> look_ahead (k+1) (s::acc) eof))
361
let n = b#buffer#length in
364
eps_e (`Error End_of_file) b#event_system
367
let s = copy_string i (n-i) in
368
if n-i > max_len then raise Line_too_long;
370
eps_e (`Done (List.rev (s :: acc))) b#event_system
377
eps_e (`Done (List.rev acc)) b#event_system
380
if b#buffer#length > max_len then raise Line_too_long;
382
match b#fill_e_opt with
383
| None -> b#start_fill_e ()
385
fe ++ (look_ahead 0 [])
389
eps_e (`Error Line_too_long) b#event_system
390
| Cont f -> (* make the recursion tail-recursive *)
393
look_ahead 0 [] b#eof
396
let ach_output_e ch esys s pos len =
397
(* case: async channel *)
399
let (e, signal) = Uq_engines.signal_engine esys in
401
let rec wait_for_output () =
403
let n = ch # output s pos len in
404
if n > 0 || len = 0 then
407
ch # request_notification
414
| error -> signal (`Error error)
421
let rec buf_output_e b ms pos len =
424
(`Error (Failure "Uq_io: Buffer already closed for new data"))
427
let bl = b#buffer#length in
428
(* Optimization: if len is large, try to bypass the buffer *)
431
bl=0 && len >= 4096 && (device_supports_memory d || is_string ms)
433
dev_output_e d ms pos len
438
| Some m -> max (min len (m - bl)) 0 in
439
if n > 0 || len = 0 then (
440
b#buffer#add ms pos n;
441
eps_e (`Done n) b#event_system
445
match b#flush_e_opt with
446
| None -> b#start_flush_e ()
448
fe ++ (fun _ -> buf_output_e b ms pos len)
453
and dev_output_e (d : out_device) ms pos len =
455
| `Polldescr(style, fd, esys) ->
456
new Uq_engines.output_engine
460
Netsys.gwrite style fd s pos len
462
mem_gwrite style fd m pos len
466
| `Multiplex mplex ->
467
let (e, signal) = Uq_engines.signal_engine mplex#event_system in
469
if mplex#writing then mplex # cancel_writing() in
472
mplex # start_writing
473
~when_done:(fun xopt n ->
475
| None -> signal (`Done n)
476
| Some Uq_engines.Cancelled ->
477
cancel(); signal `Aborted
478
| Some err -> signal (`Error err)
482
if mplex#mem_supported then
483
mplex # start_mem_writing
484
~when_done:(fun xopt n ->
486
| None -> signal (`Done n)
487
| Some Uq_engines.Cancelled ->
488
cancel(); signal `Aborted
489
| Some err -> signal (`Error err)
495
(Failure "Uq_io: This mplex does not support `Memory"));
499
| `Error e -> `Error e
500
| `Aborted -> cancel(); `Aborted
503
| `Async_out (ch,esys) ->
506
ach_output_e ch esys s pos len
510
(Failure "Uq_io: async channels do not support `Memory"))
515
buf_output_e b ms pos len
518
dev_output_e d ms pos len
520
| `Done n -> c n; `Done n
524
let output_e d ms pos len =
525
dev_output_e (d :> out_device) ms pos len
528
let rec really_output_e d ms pos len =
530
eps_e (`Done ()) (device_esys d)
532
output_e d ms pos len ++
533
(fun n -> really_output_e d ms (pos+n) (len-n))
535
let output_string_e d s =
536
really_output_e d (`String s) 0 (String.length s)
538
let output_memory_e d m =
539
really_output_e d (`Memory m) 0 (Bigarray.Array1.dim m)
541
let output_netbuffer_e d b =
542
let s = Netbuffer.unsafe_buffer b in
543
really_output_e d (`String s) 0 (Netbuffer.length b)
546
match (d :> out_device) with
548
( match b#flush_e_opt with
549
| None -> b#start_flush_e ()
553
eps_e (`Done()) (device_esys d)
555
let rec write_eof0_e d =
557
| `Polldescr(style, fd, esys) ->
558
eps_e (`Done false) esys
559
| `Multiplex mplex ->
560
let (e, signal) = Uq_engines.signal_engine mplex#event_system in
562
if mplex#writing then mplex # cancel_writing() in
563
if mplex # supports_half_open_connection then
564
mplex # start_writing_eof
565
~when_done:(fun xopt ->
567
| None -> signal (`Done true)
568
| Some Uq_engines.Cancelled ->
569
cancel(); signal `Aborted
570
| Some error -> signal (`Error error)
574
signal (`Done false);
577
| `Error e -> `Error e
578
| `Aborted -> cancel(); `Aborted
580
| `Async_out (ch,esys) ->
581
eps_e (`Done false) esys
584
(fun () -> b#write_eof_e())
589
write_eof0_e (d :> out_device)
593
let rec shutdown0_e ?linger d =
595
| `Polldescr(style, fd, esys) ->
596
Netsys.gclose style fd;
598
| `Multiplex mplex ->
599
if mplex#reading then
600
mplex#cancel_reading();
601
if mplex#writing then
602
mplex#cancel_writing();
603
let (e, signal) = Uq_engines.signal_engine mplex#event_system in
605
if not mplex#shutting_down then mplex # cancel_shutting_down() in
606
mplex # start_shutting_down
608
~when_done:(fun xopt ->
613
| Some Uq_engines.Cancelled ->
614
cancel(); signal `Aborted
616
signal (`Error error)
621
| `Error e -> `Error e
622
| `Aborted -> cancel(); `Aborted
624
| `Async_in (ch,esys) ->
627
| `Async_out (ch,esys) ->
633
flush_e (`Buffer_out b) ++ (fun _ -> b # shutdown_e linger)
635
shutdown0_e ?linger (d :> [in_device | out_device])
637
shutdown0_e ?linger (d :> [in_device | out_device])
639
let shutdown_e ?linger d =
640
shutdown0_e ?linger (d :> [in_device | out_device])
642
let rec inactivate0 d =
644
| `Polldescr(style, fd, esys) ->
645
Netsys.gclose style fd
646
| `Multiplex mplex ->
648
| `Async_in (ch,esys) ->
650
| `Async_out (ch,esys) ->
657
inactivate0 (d :> inout_device)
659
inactivate0 (d :> inout_device)
662
inactivate0 (d :> inout_device)
664
let mem_obj_buffer small_buffer =
667
Netsys_mem.small_block_size else Netsys_mem.default_block_size in
669
Netpagebuffer.create psize in
671
method length = Netpagebuffer.length buf
672
method blit_out bpos ms pos len =
674
| `String s -> Netpagebuffer.blit_to_string buf bpos s pos len
675
| `Memory m -> Netpagebuffer.blit_to_memory buf bpos m pos len
677
Netpagebuffer.delete_hd buf n
678
method index_from pos c =
679
Netpagebuffer.index_from buf pos c
680
method add ms pos len =
682
| `String s -> Netpagebuffer.add_sub_string buf s pos len
683
| `Memory m -> Netpagebuffer.add_sub_memory buf m pos len
685
Netpagebuffer.advance buf n
686
method page_for_additions =
687
let (m,pos,len) = Netpagebuffer.page_for_additions buf in
688
(`Memory m, pos, len)
689
method page_for_consumption =
690
let (m,pos,len) = Netpagebuffer.page_for_consumption buf in
691
(`Memory m, pos, len)
693
Netpagebuffer.clear buf
697
let str_obj_buffer small_buffer =
699
if small_buffer then 4096 else 65536 in
701
Netbuffer.create bufsize in
703
method length = Netbuffer.length buf
704
method blit_out bpos ms pos len =
706
| `String s -> Netbuffer.blit_to_string buf bpos s pos len
707
| `Memory m -> Netbuffer.blit_to_memory buf bpos m pos len
709
Netbuffer.delete buf 0 n
710
method index_from pos c =
711
Netbuffer.index_from buf pos c
712
method add ms pos len =
714
| `String s -> Netbuffer.add_sub_string buf s pos len
715
| `Memory m -> Netbuffer.add_sub_memory buf m pos len
717
Netbuffer.advance buf n
718
method page_for_additions =
719
let (s,pos,len) = Netbuffer.area_for_additions buf in
720
(`String s, pos, len)
721
method page_for_consumption =
722
let s = Netbuffer.unsafe_buffer buf in
723
(`String s, 0, Netbuffer.length buf)
730
let create_in_buffer ?(small_buffer=false) d0 =
731
let d = (d0 :> in_device) in
735
if device_supports_memory d then
736
mem_obj_buffer small_buffer
738
str_obj_buffer small_buffer in
746
method set_eof() = eof := true
748
method start_fill_e () =
749
assert(!fill_e_opt = None);
751
eps_e (`Done true) esys
753
let (ms,pos,len) = buf # page_for_additions in
760
eps_e (`Done false) esys
763
| `Done flag -> `Done flag
764
| `Error End_of_file ->
765
eof := true; `Done true
766
| `Error error -> `Error error
767
| `Aborted -> `Aborted
769
fill_e_opt := Some e;
776
method shutdown_e () =
779
method inactivate() =
784
method event_system = esys
788
let in_buffer_length (b:in_buffer) =
791
let in_buffer_blit (b:in_buffer) bpos ms mspos len =
792
b#buffer#blit_out bpos ms mspos len
794
let in_buffer_fill_e (b:in_buffer) =
795
match b#fill_e_opt with
796
| None -> b#start_fill_e ()
800
let create_out_buffer ?(small_buffer=false) ~max d0 =
801
let d = (d0 :> out_device) in
805
if device_supports_memory d then
806
mem_obj_buffer small_buffer
808
str_obj_buffer small_buffer in
816
let (ms,pos,len) = buf # page_for_consumption in
817
let len' = min len n in
818
output_e d ms pos len' ++
825
eps_e (`Done ()) esys in
832
method start_flush_e() =
833
assert (!flush_e_opt = None);
836
>> (fun st -> flush_e_opt := None; st) in
837
flush_e_opt := Some e;
840
method flush_e_opt = !flush_e_opt
842
method write_eof_e () =
843
if buf#length = 0 then
847
(`Error (Failure "Uq_io: called write_eof_e with non-empty buffer"))
850
method shutdown_e linger =
853
method inactivate () =
857
method udevice = Some d
858
method event_system = esys
862
let copy_e ?(small_buffer=false) ?len ?len64 d_in d_out =
863
let d_in_esys = device_esys d_in in
864
let d_out_esys = device_esys d_out in
865
if d_in_esys <> d_out_esys then
866
invalid_arg "Uq_io.copy_e: devices must use the same event system";
867
let esys = d_in_esys in
869
let ms, ms_len, free_ms =
870
if device_supports_memory d_in && device_supports_memory d_out then (
872
Netsys_mem.pool_alloc_memory2
873
(if small_buffer then Netsys_mem.small_pool
874
else Netsys_mem.default_pool) in
875
(`Memory m, Bigarray.Array1.dim m, f)
878
let s = String.create (if small_buffer then 4096 else 65536) in
879
(`String s, String.length s, (fun () -> ()))
881
(* Note that calling free_ms only accelerates that ms is recognized
882
as free after the copy is done. It is not necessary to call it.
885
let rec push_data p n =
887
eps_e (`Done ()) esys
889
output_e d_out ms p n ++ (fun k -> push_data (p+k) (n-k)) in
891
let count = ref 0L in
893
match len, len64 with
895
| Some n, None -> Some(Int64.of_int n)
896
| None, Some n -> Some n
897
| Some n1, Some n2 -> Some(min (Int64.of_int n1) n2) in
899
let rec pull_data() =
905
Int64.to_int( min (Int64.of_int ms_len) (Int64.sub l !count)) in
907
let ( >> ) = Uq_engines.fmap_engine in
908
(* For a strange reason we need this - somewhere a generalization is
914
eps_e (`Done !count) esys
917
( input_e d_in ms 0 n
919
| `Done n -> `Done(`Good n)
920
| `Error End_of_file -> `Done `Eof
921
| `Error error -> free_ms(); `Error error
922
| `Aborted -> free_ms(); `Aborted
924
: [`Good of int | `Eof] Uq_engines.engine
928
count := Int64.add !count (Int64.of_int n);
929
push_data 0 n ++ (fun () -> pull_data())
932
eps_e (`Done !count) esys
939
| `Done x -> `Done(Some x)
940
| `Error End_of_file -> `Done None
941
| `Error e -> `Error e
942
| `Aborted -> `Aborted
945
let filter_out_buffer ~max (p : Netchannels.io_obj_channel) d0 : out_buffer =
946
let small_buffer = true in
947
let d = (d0 :> out_device) in
950
let buf = str_obj_buffer small_buffer in
956
let rec do_flush_e() =
958
if buf#length > 0 then (
960
(* First copy everything from buf to p: *)
961
let (ms,pos,len) = buf # page_for_consumption in
965
| `Memory _ -> assert false in
967
let n = p # output s pos len in
970
(* Copy from p to d: *)
972
`Async_in(new Uq_engines.pseudo_async_in_channel p, esys) in
975
| `Done _ -> `Done ()
976
| `Error Netchannels.Buffer_underrun -> `Done ()
977
| `Error err -> `Error err
978
| `Aborted -> `Aborted
988
`Async_in(new Uq_engines.pseudo_async_in_channel p, esys) in
994
| `Error err -> `Error err
995
| `Aborted -> `Aborted
1008
method start_flush_e() =
1009
assert (!flush_e_opt = None);
1013
flush_e_opt := None;
1016
flush_e_opt := Some e;
1019
method flush_e_opt =
1020
match !flush_e_opt with
1023
assert(match e#state with
1029
method write_eof_e () =
1031
flush_e (`Buffer_out self)
1036
method shutdown_e linger =
1038
flush_e (`Buffer_out self)
1040
shutdown_e ?linger d
1043
method inactivate () =
1047
method udevice = None
1048
(* It is not allowed to bypass this buffer *)
1049
method event_system = esys