~ubuntu-branches/ubuntu/trusty/ocamlnet/trusty

« back to all changes in this revision

Viewing changes to src/equeue/uq_io.ml

  • Committer: Bazaar Package Importer
  • Author(s): Stéphane Glondu
  • Date: 2011-09-02 14:12:33 UTC
  • mfrom: (18.2.3 sid)
  • Revision ID: james.westby@ubuntu.com-20110902141233-zbj0ygxb92u6gy4z
Tags: 3.4-1
* New upstream release
  - add a new NetcgiRequire directive to ease dependency management
    (Closes: #637147)
  - remove patches that were applied upstream:
    + Added-missing-shebang-lines-in-example-shell-scripts
    + Try-also-ocamlc-for-POSIX-threads

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
(* $Id: uq_io.ml 1665 2011-08-30 17:37:57Z gerd $ *)
 
2
 
 
3
open Uq_engines.Operators
 
4
open Printf
 
5
 
 
6
type string_like =
 
7
    [ `String of string
 
8
    | `Memory of Netsys_mem.memory
 
9
    ]
 
10
 
 
11
 
 
12
class type obj_buffer =
 
13
object
 
14
  method length : int
 
15
  method blit_out : int -> string_like -> int -> int -> unit
 
16
  method delete_hd : int -> unit
 
17
  method index_from : int -> char -> int
 
18
  method add : string_like -> int -> int -> unit
 
19
  method advance : int -> unit
 
20
  method page_for_additions : string_like * int * int
 
21
  method page_for_consumption : string_like * int * int
 
22
  method clear : unit -> unit
 
23
end
 
24
 
 
25
 
 
26
class type ['in_device] in_buffer_pre =
 
27
object
 
28
  method buffer : obj_buffer
 
29
  method eof : bool
 
30
  method set_eof : unit -> unit
 
31
  method start_fill_e : unit -> bool Uq_engines.engine
 
32
  method fill_e_opt : bool Uq_engines.engine option
 
33
    (* The current fill engine, or None *)
 
34
  method udevice : 'in_device
 
35
  method shutdown_e : unit -> unit Uq_engines.engine
 
36
  method inactivate : unit -> unit
 
37
  method event_system : Unixqueue.event_system
 
38
end
 
39
 
 
40
 
 
41
class type ['out_device] out_buffer_pre =
 
42
object
 
43
  method buffer : obj_buffer
 
44
  method eof : bool
 
45
  method max : int option
 
46
  method start_flush_e : unit -> unit Uq_engines.engine
 
47
  method flush_e_opt : unit Uq_engines.engine option
 
48
    (* The current flush engine, or None *)
 
49
  method write_eof_e : unit -> bool Uq_engines.engine
 
50
    (* The buffer must be empty before [write_eof_e] *)
 
51
  method udevice : 'out_device option
 
52
  method shutdown_e : float option -> unit Uq_engines.engine
 
53
  method inactivate : unit -> unit
 
54
  method event_system : Unixqueue.event_system
 
55
end
 
56
 
 
57
          
 
58
type in_device =
 
59
    [ `Polldescr of Netsys.fd_style * Unix.file_descr * Unixqueue.event_system
 
60
    | `Multiplex of Uq_engines.multiplex_controller
 
61
    | `Async_in of Uq_engines.async_in_channel * Unixqueue.event_system
 
62
    | `Buffer_in of in_device in_buffer_pre
 
63
    | `Count_in of (int -> unit) * in_device
 
64
    ]
 
65
 
 
66
 
 
67
type out_device =
 
68
    [ `Polldescr of Netsys.fd_style * Unix.file_descr * Unixqueue.event_system
 
69
    | `Multiplex of Uq_engines.multiplex_controller
 
70
    | `Async_out of Uq_engines.async_out_channel * Unixqueue.event_system
 
71
    | `Buffer_out of out_device out_buffer_pre
 
72
    | `Count_out of (int -> unit) * out_device
 
73
    ]
 
74
 
 
75
 
 
76
type in_buffer = in_device in_buffer_pre
 
77
type out_buffer = out_device out_buffer_pre
 
78
 
 
79
 
 
80
type in_bdevice =
 
81
    [ `Buffer_in of in_buffer ]
 
82
 
 
83
type inout_device = [ in_device | out_device ]
 
84
 
 
85
 
 
86
 
 
87
exception Line_too_long
 
88
 
 
89
 
 
90
let rec device_esys0 =
 
91
  function
 
92
    | `Polldescr(_,_,esys) -> esys
 
93
    | `Multiplex mplex -> mplex#event_system
 
94
    | `Async_in(_,esys) -> esys
 
95
    | `Async_out(_,esys) -> esys
 
96
    | `Buffer_in b -> b#event_system
 
97
    | `Buffer_out b -> b#event_system
 
98
    | `Count_in(_,d) -> device_esys0 (d :> inout_device)
 
99
    | `Count_out(_,d) -> device_esys0 (d :> inout_device)
 
100
 
 
101
 
 
102
let device_esys d =
 
103
  device_esys0 (d :> inout_device)
 
104
 
 
105
 
 
106
let is_string =
 
107
  function
 
108
    | `String _ -> true
 
109
    | `Memory _ -> false
 
110
 
 
111
let rec device_supports_memory0 =
 
112
  function
 
113
    | `Polldescr(style,_,_) -> 
 
114
        ( match style with
 
115
            | `Read_write | `Recv_send _ | `Recv_send_implied ->
 
116
                true
 
117
            | _ ->
 
118
                false
 
119
        )
 
120
    | `Multiplex mplex -> 
 
121
        mplex # mem_supported
 
122
    | `Async_in(_,esys) -> 
 
123
        false
 
124
    | `Async_out(_,esys) -> 
 
125
        false
 
126
    | `Buffer_in b -> 
 
127
        true
 
128
    | `Buffer_out b -> 
 
129
        true
 
130
    | `Count_in(_,d) -> device_supports_memory0 (d : in_device :> inout_device)
 
131
    | `Count_out(_,d) -> device_supports_memory0 (d : out_device :> inout_device)
 
132
 
 
133
let device_supports_memory d =
 
134
  device_supports_memory0 (d :> inout_device)
 
135
 
 
136
 
 
137
let mem_gread style fd m pos len =
 
138
  match style with
 
139
    | `Read_write ->
 
140
        Netsys_mem.mem_read fd m pos len
 
141
    | `Recv_send _ | `Recv_send_implied ->
 
142
        Netsys_mem.mem_recv fd m pos len []
 
143
    | _ ->
 
144
        failwith ("Uq_io: This fd style does not support `Memory: " ^ 
 
145
                    Netsys.string_of_fd_style style)
 
146
 
 
147
 
 
148
let mem_gwrite style fd m pos len =
 
149
  match style with
 
150
    | `Read_write ->
 
151
        Netsys_mem.mem_write fd m pos len
 
152
    | `Recv_send _ | `Recv_send_implied ->
 
153
        Netsys_mem.mem_send fd m pos len []
 
154
    | _ ->
 
155
        failwith ("Uq_io: This fd style does not support `Memory: " ^ 
 
156
                    Netsys.string_of_fd_style style)
 
157
 
 
158
 
 
159
let ach_input_e ch esys s pos len =
 
160
  (* case: async channel *)
 
161
 
 
162
  let (e, signal) = Uq_engines.signal_engine esys in
 
163
 
 
164
  let rec wait_for_input () =
 
165
    try
 
166
      let n = ch # input s pos len in
 
167
      if n > 0 || len = 0 then
 
168
        signal (`Done n)
 
169
      else (
 
170
        ch # request_notification
 
171
          (fun () ->
 
172
             wait_for_input();
 
173
             false
 
174
          )
 
175
      )
 
176
    with
 
177
      | error -> signal (`Error error)
 
178
  in
 
179
 
 
180
  wait_for_input();
 
181
  e
 
182
 
 
183
 
 
184
let rec buf_input_e b ms pos len =
 
185
  let bl = b#buffer#length in
 
186
  if bl > 0 || len = 0 then (
 
187
    let n = min len bl in
 
188
    b#buffer#blit_out 0 ms pos n;
 
189
    b#buffer#delete_hd n;
 
190
    eps_e (`Done n) b#event_system
 
191
  )
 
192
  else if b#eof then
 
193
    eps_e (`Error End_of_file) b#event_system
 
194
  else (
 
195
    (* Optimization: if len is quite large, bypass the buffer *)
 
196
    let d = b#udevice in
 
197
    if len >= 4096 && (device_supports_memory d || is_string ms) then
 
198
      dev_input_e d ms pos len
 
199
      >> (function
 
200
            | `Error End_of_file -> 
 
201
                b#set_eof(); `Error End_of_file
 
202
            | st -> st
 
203
         )
 
204
    else
 
205
      let fe =
 
206
        match b#fill_e_opt with
 
207
          | None -> b#start_fill_e ()
 
208
          | Some fe -> fe in
 
209
      fe ++ (fun _ -> buf_input_e b ms pos len)
 
210
  )
 
211
 
 
212
 
 
213
and dev_input_e (d : in_device) ms pos len =
 
214
  match d with
 
215
    | `Polldescr(style, fd, esys) ->
 
216
        new Uq_engines.input_engine
 
217
          (fun fd -> 
 
218
             match ms with
 
219
               | `String s ->
 
220
                   let n = Netsys.gread style fd s pos len in
 
221
                   if len > 0 && n = 0 then raise End_of_file;
 
222
                   n
 
223
               | `Memory m ->
 
224
                   let n = mem_gread style fd m pos len in
 
225
                   if len > 0 && n = 0 then raise End_of_file;
 
226
                   n
 
227
          )
 
228
          fd (-1.0) esys
 
229
 
 
230
    | `Multiplex mplex ->
 
231
        let (e, signal) = Uq_engines.signal_engine mplex#event_system in
 
232
        let cancel() =
 
233
          if mplex#reading then mplex # cancel_reading() in
 
234
        ( match ms with
 
235
            | `String s ->
 
236
                mplex # start_reading
 
237
                  ~when_done:(fun xopt n ->
 
238
                                match xopt with
 
239
                                  | None -> 
 
240
                                      signal (`Done n)
 
241
                                  | Some Uq_engines.Cancelled ->
 
242
                                      cancel(); signal `Aborted
 
243
                                  | Some err -> signal (`Error err)
 
244
                             )
 
245
                  s pos len;
 
246
            | `Memory m ->
 
247
                if mplex#mem_supported then
 
248
                  mplex # start_mem_reading
 
249
                    ~when_done:(fun xopt n ->
 
250
                                  match xopt with
 
251
                                    | None -> 
 
252
                                        signal (`Done n)
 
253
                                    | Some Uq_engines.Cancelled ->
 
254
                                        cancel(); signal `Aborted
 
255
                                    | Some err -> signal (`Error err)
 
256
                               )
 
257
                    m pos len
 
258
                else
 
259
                  signal
 
260
                    (`Error
 
261
                       (Failure "Uq_io: This mplex does not support `Memory"));
 
262
        );
 
263
        e >> (function
 
264
                | `Done n -> `Done n
 
265
                | `Error e -> `Error e 
 
266
                | `Aborted -> cancel(); `Aborted
 
267
             )
 
268
                    
 
269
 
 
270
    | `Async_in (ch,esys) ->
 
271
        ( match ms with
 
272
            | `String s ->
 
273
                ach_input_e ch esys s pos len
 
274
            | `Memory m ->
 
275
                eps_e
 
276
                  (`Error
 
277
                     (Failure "Uq_io: async channels do not support `Memory"))
 
278
                  esys
 
279
        )
 
280
        
 
281
    | `Buffer_in b ->
 
282
        buf_input_e b ms pos len
 
283
 
 
284
    | `Count_in(c,d) ->
 
285
        dev_input_e d ms pos len 
 
286
        >> (function
 
287
              | `Done n -> c n; `Done n
 
288
              | st -> st
 
289
           )
 
290
 
 
291
let input_e d0 ms pos len =
 
292
  let d = (d0 :> in_device) in
 
293
  dev_input_e d ms pos len
 
294
 
 
295
let rec really_input_e d ms pos len =
 
296
  if len = 0 then
 
297
    eps_e (`Done ()) (device_esys d)
 
298
  else
 
299
    input_e d ms pos len ++ 
 
300
      (fun n -> really_input_e d ms (pos+n) (len-n))
 
301
 
 
302
 
 
303
let input_line_e ?(max_len = Sys.max_string_length) (`Buffer_in b) =
 
304
  let consume k1 k2 =
 
305
    if k2 > max_len then raise Line_too_long;
 
306
    let s = String.create k1 in
 
307
    b#buffer#blit_out 0 (`String s) 0 k1;
 
308
    b#buffer#delete_hd k2;
 
309
    s in
 
310
  let rec look_ahead eof =
 
311
    try
 
312
      let k = b#buffer#index_from 0 '\n' in
 
313
      let s = consume k (k+1) in
 
314
      eps_e (`Done s) b#event_system
 
315
    with
 
316
      | Not_found ->
 
317
          if eof then (
 
318
            let n = b#buffer#length in
 
319
            if n = 0 then
 
320
              eps_e (`Error End_of_file) b#event_system
 
321
            else (
 
322
              let s = consume n n in
 
323
              eps_e (`Done s) b#event_system
 
324
            )
 
325
          )
 
326
          else (
 
327
            assert(not b#eof);
 
328
            if b#buffer#length > max_len then
 
329
              eps_e (`Error Line_too_long) b#event_system
 
330
            else
 
331
              let fe =
 
332
                match b#fill_e_opt with
 
333
                  | None -> b#start_fill_e ()
 
334
                  | Some fe -> fe in
 
335
              fe ++ look_ahead
 
336
          )
 
337
      | Line_too_long ->
 
338
           eps_e (`Error Line_too_long) b#event_system
 
339
  in
 
340
  look_ahead b#eof
 
341
 
 
342
 
 
343
exception Cont of (unit -> string list Uq_engines.engine)
 
344
 
 
345
let input_lines_e ?(max_len = Sys.max_string_length) (`Buffer_in b) =
 
346
  let copy_string i l =
 
347
    let s = String.create l in
 
348
    b#buffer#blit_out i (`String s) 0 l;
 
349
    s in
 
350
  let consume k =
 
351
    b#buffer#delete_hd k in
 
352
  let rec look_ahead i acc eof =
 
353
    try
 
354
      let k = b#buffer#index_from i '\n' in
 
355
      if k-i+1 > max_len then raise Line_too_long;
 
356
      let s = copy_string i (k-i) in
 
357
      raise(Cont(fun () -> look_ahead (k+1) (s::acc) eof))
 
358
    with
 
359
      | Not_found ->
 
360
          if eof then (
 
361
            let n = b#buffer#length in
 
362
            if n = 0 then (
 
363
              assert(acc = []);
 
364
              eps_e (`Error End_of_file) b#event_system
 
365
            )
 
366
            else (
 
367
              let s = copy_string i (n-i) in
 
368
              if n-i > max_len then raise Line_too_long;
 
369
              consume n;
 
370
              eps_e (`Done (List.rev (s :: acc))) b#event_system
 
371
            )
 
372
          )
 
373
          else (
 
374
            assert(not b#eof);
 
375
            if acc <> [] then (
 
376
              consume i;
 
377
              eps_e (`Done (List.rev acc)) b#event_system
 
378
            ) else (
 
379
              assert(i = 0);
 
380
              if b#buffer#length > max_len then raise Line_too_long;
 
381
              let fe =
 
382
                match b#fill_e_opt with
 
383
                  | None -> b#start_fill_e ()
 
384
                  | Some fe -> fe in
 
385
              fe ++ (look_ahead 0 [])
 
386
            )
 
387
          )
 
388
      | Line_too_long ->
 
389
           eps_e (`Error Line_too_long) b#event_system
 
390
      | Cont f ->  (* make the recursion tail-recursive *)
 
391
          f ()
 
392
  in
 
393
  look_ahead 0 [] b#eof
 
394
 
 
395
 
 
396
let ach_output_e ch esys s pos len =
 
397
  (* case: async channel *)
 
398
 
 
399
  let (e, signal) = Uq_engines.signal_engine esys in
 
400
 
 
401
  let rec wait_for_output () =
 
402
    try
 
403
      let n = ch # output s pos len in
 
404
      if n > 0 || len = 0 then
 
405
        signal (`Done n)
 
406
      else (
 
407
        ch # request_notification
 
408
          (fun () ->
 
409
             wait_for_output();
 
410
             false
 
411
          )
 
412
      )
 
413
    with
 
414
      | error -> signal (`Error error)
 
415
  in
 
416
 
 
417
  wait_for_output();
 
418
  e
 
419
 
 
420
 
 
421
let rec buf_output_e b ms pos len =
 
422
  if b # eof then
 
423
    eps_e
 
424
      (`Error (Failure "Uq_io: Buffer already closed for new data"))
 
425
      b#event_system
 
426
  else (
 
427
    let bl = b#buffer#length in
 
428
    (* Optimization: if len is large, try to bypass the buffer *)
 
429
    match b#udevice with
 
430
      | Some d when (
 
431
          bl=0 && len >= 4096 && (device_supports_memory d || is_string ms)
 
432
        ) ->
 
433
          dev_output_e d ms pos len
 
434
      | _ ->
 
435
          let n =
 
436
            match b # max with
 
437
              | None -> len
 
438
              | Some m -> max (min len (m - bl)) 0 in
 
439
          if n > 0 || len = 0 then (
 
440
            b#buffer#add ms pos n;
 
441
            eps_e (`Done n) b#event_system
 
442
          )
 
443
          else (
 
444
            let fe =
 
445
              match b#flush_e_opt with
 
446
                | None -> b#start_flush_e ()
 
447
                | Some fe -> fe in
 
448
            fe ++ (fun _ -> buf_output_e b ms pos len)
 
449
          )
 
450
  )
 
451
    
 
452
 
 
453
and dev_output_e (d : out_device) ms pos len =
 
454
  match d with
 
455
    | `Polldescr(style, fd, esys) ->
 
456
        new Uq_engines.output_engine
 
457
          (fun fd -> 
 
458
             match ms with
 
459
               | `String s ->
 
460
                   Netsys.gwrite style fd s pos len
 
461
               | `Memory m ->
 
462
                   mem_gwrite style fd m pos len
 
463
          )
 
464
          fd (-1.0) esys
 
465
 
 
466
    | `Multiplex mplex ->
 
467
        let (e, signal) = Uq_engines.signal_engine mplex#event_system in
 
468
        let cancel() =
 
469
          if mplex#writing then mplex # cancel_writing() in
 
470
        ( match ms with
 
471
            | `String s ->
 
472
                mplex # start_writing
 
473
                  ~when_done:(fun xopt n ->
 
474
                                match xopt with
 
475
                                  | None -> signal (`Done n)
 
476
                                  | Some Uq_engines.Cancelled ->
 
477
                                      cancel(); signal `Aborted
 
478
                                  | Some err -> signal (`Error err)
 
479
                             )
 
480
                  s pos len;
 
481
            | `Memory m ->
 
482
                if mplex#mem_supported then
 
483
                  mplex # start_mem_writing
 
484
                    ~when_done:(fun xopt n ->
 
485
                                  match xopt with
 
486
                                    | None -> signal (`Done n)
 
487
                                    | Some Uq_engines.Cancelled ->
 
488
                                        cancel(); signal `Aborted
 
489
                                    | Some err -> signal (`Error err)
 
490
                               )
 
491
                    m pos len
 
492
                else
 
493
                  signal
 
494
                    (`Error
 
495
                       (Failure "Uq_io: This mplex does not support `Memory"));
 
496
        );
 
497
        e >> (function
 
498
                | `Done n -> `Done n
 
499
                | `Error e -> `Error e 
 
500
                | `Aborted -> cancel(); `Aborted
 
501
             )
 
502
 
 
503
    | `Async_out (ch,esys) ->
 
504
        ( match ms with
 
505
            | `String s ->
 
506
                ach_output_e ch esys s pos len
 
507
            | `Memory m ->
 
508
                eps_e
 
509
                  (`Error
 
510
                     (Failure "Uq_io: async channels do not support `Memory"))
 
511
                  esys
 
512
        )
 
513
        
 
514
    | `Buffer_out b ->
 
515
        buf_output_e b ms pos len
 
516
 
 
517
    | `Count_out(c,d) ->
 
518
        dev_output_e d ms pos len 
 
519
        >> (function
 
520
              | `Done n -> c n; `Done n
 
521
              | st -> st
 
522
           )
 
523
 
 
524
let output_e d ms pos len =
 
525
  dev_output_e (d :> out_device) ms pos len
 
526
 
 
527
 
 
528
let rec really_output_e d ms pos len =
 
529
  if len = 0 then
 
530
    eps_e (`Done ()) (device_esys d)
 
531
  else
 
532
    output_e d ms pos len ++ 
 
533
      (fun n -> really_output_e d ms (pos+n) (len-n))
 
534
 
 
535
let output_string_e d s =
 
536
  really_output_e d (`String s) 0 (String.length s)
 
537
 
 
538
let output_memory_e d m =
 
539
  really_output_e d (`Memory m) 0 (Bigarray.Array1.dim m)
 
540
 
 
541
let output_netbuffer_e d b =
 
542
  let s = Netbuffer.unsafe_buffer b in
 
543
  really_output_e d (`String s) 0 (Netbuffer.length b)
 
544
 
 
545
let flush_e d =
 
546
  match (d :> out_device) with
 
547
    | `Buffer_out b ->
 
548
        ( match b#flush_e_opt with
 
549
            | None -> b#start_flush_e ()
 
550
            | Some fe -> fe
 
551
        )
 
552
    | _ ->
 
553
        eps_e (`Done()) (device_esys d)
 
554
 
 
555
let rec write_eof0_e d =
 
556
  match d with
 
557
    | `Polldescr(style, fd, esys) ->
 
558
        eps_e (`Done false) esys
 
559
    | `Multiplex mplex ->
 
560
        let (e, signal) = Uq_engines.signal_engine mplex#event_system in
 
561
        let cancel() =
 
562
          if mplex#writing then mplex # cancel_writing() in
 
563
        if mplex # supports_half_open_connection then
 
564
          mplex # start_writing_eof 
 
565
            ~when_done:(fun xopt ->
 
566
                          match xopt with
 
567
                            | None -> signal (`Done true)
 
568
                            | Some Uq_engines.Cancelled ->
 
569
                                cancel(); signal `Aborted
 
570
                            | Some error -> signal (`Error error)
 
571
                       )
 
572
            ()
 
573
        else
 
574
          signal (`Done false);
 
575
        e >> (function
 
576
                | `Done n -> `Done n
 
577
                | `Error e -> `Error e 
 
578
                | `Aborted -> cancel(); `Aborted
 
579
             )
 
580
    | `Async_out (ch,esys) ->
 
581
        eps_e (`Done false) esys
 
582
    | `Buffer_out b ->
 
583
        flush_e d ++
 
584
          (fun () -> b#write_eof_e())
 
585
    | `Count_out(_,d) ->
 
586
        write_eof0_e d
 
587
 
 
588
let write_eof_e d =
 
589
  write_eof0_e (d :> out_device)
 
590
 
 
591
 
 
592
 
 
593
let rec shutdown0_e ?linger d =
 
594
  match d with
 
595
    | `Polldescr(style, fd, esys) ->
 
596
        Netsys.gclose style fd;
 
597
        eps_e (`Done()) esys
 
598
    | `Multiplex mplex ->
 
599
        if mplex#reading then
 
600
          mplex#cancel_reading();
 
601
        if mplex#writing then
 
602
          mplex#cancel_writing();
 
603
        let (e, signal) = Uq_engines.signal_engine mplex#event_system in
 
604
        let cancel() =
 
605
          if not mplex#shutting_down then mplex # cancel_shutting_down() in
 
606
        mplex # start_shutting_down
 
607
          ?linger
 
608
          ~when_done:(fun xopt ->
 
609
                        match xopt with
 
610
                          | None ->
 
611
                              mplex#inactivate();
 
612
                              signal (`Done())
 
613
                          | Some Uq_engines.Cancelled ->
 
614
                              cancel(); signal `Aborted
 
615
                          | Some error ->
 
616
                              signal (`Error error)
 
617
                     )
 
618
          ();
 
619
        e >> (function
 
620
                | `Done n -> `Done n
 
621
                | `Error e -> `Error e 
 
622
                | `Aborted -> cancel(); `Aborted
 
623
             )
 
624
    | `Async_in (ch,esys) ->
 
625
        ch # close_in();
 
626
        eps_e (`Done()) esys
 
627
    | `Async_out (ch,esys) ->
 
628
        ch # close_out();
 
629
        eps_e (`Done()) esys
 
630
    | `Buffer_in b ->
 
631
        b # shutdown_e ()
 
632
    | `Buffer_out b ->
 
633
        flush_e (`Buffer_out b) ++ (fun _ -> b # shutdown_e linger)
 
634
    | `Count_in(_,d) ->
 
635
        shutdown0_e ?linger (d :> [in_device | out_device])
 
636
    | `Count_out(_,d) ->
 
637
        shutdown0_e ?linger (d :> [in_device | out_device])
 
638
 
 
639
let shutdown_e ?linger d =
 
640
  shutdown0_e ?linger (d :> [in_device | out_device])
 
641
 
 
642
let rec inactivate0 d =
 
643
  match d with
 
644
    | `Polldescr(style, fd, esys) ->
 
645
        Netsys.gclose style fd
 
646
    | `Multiplex mplex ->
 
647
        mplex#inactivate()
 
648
    | `Async_in (ch,esys) ->
 
649
        ch # close_in()
 
650
    | `Async_out (ch,esys) ->
 
651
        ch # close_out()
 
652
    | `Buffer_in b ->
 
653
        b # inactivate ()
 
654
    | `Buffer_out b ->
 
655
        b # inactivate ()
 
656
    | `Count_in(_,d) ->
 
657
        inactivate0 (d :> inout_device)
 
658
    | `Count_out(_,d) ->
 
659
        inactivate0 (d :> inout_device)
 
660
 
 
661
let inactivate d =
 
662
  inactivate0 (d :> inout_device)
 
663
 
 
664
let mem_obj_buffer small_buffer =
 
665
  let psize = 
 
666
    if small_buffer then 
 
667
      Netsys_mem.small_block_size else Netsys_mem.default_block_size in
 
668
  let buf = 
 
669
    Netpagebuffer.create psize in
 
670
  ( object
 
671
      method length = Netpagebuffer.length buf
 
672
      method blit_out bpos ms pos len =
 
673
        match ms with
 
674
          | `String s -> Netpagebuffer.blit_to_string buf bpos s pos len
 
675
          | `Memory m -> Netpagebuffer.blit_to_memory buf bpos m pos len
 
676
      method delete_hd n =
 
677
        Netpagebuffer.delete_hd buf n
 
678
      method index_from pos c =
 
679
        Netpagebuffer.index_from buf pos c
 
680
      method add ms pos len =
 
681
        match ms with
 
682
          | `String s -> Netpagebuffer.add_sub_string buf s pos len
 
683
          | `Memory m -> Netpagebuffer.add_sub_memory buf m pos len
 
684
      method advance n =
 
685
        Netpagebuffer.advance buf n
 
686
      method page_for_additions =
 
687
        let (m,pos,len) = Netpagebuffer.page_for_additions buf in
 
688
        (`Memory m, pos, len)
 
689
      method page_for_consumption =
 
690
        let (m,pos,len) = Netpagebuffer.page_for_consumption buf in
 
691
        (`Memory m, pos, len)
 
692
      method clear() =
 
693
        Netpagebuffer.clear buf
 
694
    end
 
695
  )
 
696
 
 
697
let str_obj_buffer small_buffer =
 
698
  let bufsize = 
 
699
    if small_buffer then 4096 else 65536 in
 
700
  let buf =
 
701
    Netbuffer.create bufsize in
 
702
  ( object
 
703
      method length = Netbuffer.length buf
 
704
      method blit_out bpos ms pos len =
 
705
        match ms with
 
706
          | `String s -> Netbuffer.blit_to_string buf bpos s pos len
 
707
          | `Memory m -> Netbuffer.blit_to_memory buf bpos m pos len
 
708
      method delete_hd n =
 
709
        Netbuffer.delete buf 0 n
 
710
      method index_from pos c =
 
711
        Netbuffer.index_from buf pos c
 
712
      method add ms pos len =
 
713
        match ms with
 
714
          | `String s -> Netbuffer.add_sub_string buf s pos len
 
715
          | `Memory m -> Netbuffer.add_sub_memory buf m pos len
 
716
      method advance n =
 
717
        Netbuffer.advance buf n
 
718
      method page_for_additions =
 
719
        let (s,pos,len) = Netbuffer.area_for_additions buf in
 
720
        (`String s, pos, len)
 
721
      method page_for_consumption =
 
722
        let s = Netbuffer.unsafe_buffer buf in
 
723
        (`String s, 0, Netbuffer.length buf)
 
724
      method clear() =
 
725
        Netbuffer.clear buf
 
726
    end
 
727
  )
 
728
    
 
729
 
 
730
let create_in_buffer ?(small_buffer=false) d0 =
 
731
  let d = (d0 :> in_device) in
 
732
  let esys =
 
733
    device_esys d in
 
734
  let buf =
 
735
    if device_supports_memory d then
 
736
      mem_obj_buffer small_buffer
 
737
    else
 
738
      str_obj_buffer small_buffer in
 
739
  let eof = 
 
740
    ref false in
 
741
  let fill_e_opt =
 
742
    ref None in
 
743
object
 
744
  method buffer = buf
 
745
  method eof = !eof
 
746
  method set_eof() = eof := true
 
747
 
 
748
  method start_fill_e () =
 
749
    assert(!fill_e_opt = None);
 
750
    if !eof then
 
751
      eps_e (`Done true) esys
 
752
    else (
 
753
      let (ms,pos,len) = buf # page_for_additions in
 
754
      let e =
 
755
        input_e d ms pos len
 
756
        ++ (fun n ->
 
757
              assert(n > 0);
 
758
              buf # advance n;
 
759
              fill_e_opt := None;
 
760
              eps_e (`Done false) esys
 
761
           )
 
762
        >> (function
 
763
              | `Done flag -> `Done flag
 
764
              | `Error End_of_file -> 
 
765
                  eof := true; `Done true
 
766
              | `Error error -> `Error error
 
767
              | `Aborted -> `Aborted
 
768
           ) in
 
769
      fill_e_opt := Some e;
 
770
      e
 
771
    )
 
772
 
 
773
  method fill_e_opt =
 
774
    !fill_e_opt
 
775
 
 
776
  method shutdown_e () =
 
777
    shutdown_e d
 
778
 
 
779
  method inactivate() =
 
780
    buf#clear();
 
781
    inactivate d
 
782
 
 
783
  method udevice = d
 
784
  method event_system = esys
 
785
end
 
786
 
 
787
 
 
788
let in_buffer_length (b:in_buffer) =
 
789
  b#buffer#length
 
790
 
 
791
let in_buffer_blit (b:in_buffer) bpos ms mspos len =
 
792
  b#buffer#blit_out bpos ms mspos len
 
793
 
 
794
let in_buffer_fill_e (b:in_buffer)  =
 
795
  match b#fill_e_opt with
 
796
    | None -> b#start_fill_e ()
 
797
    | Some fe -> fe
 
798
 
 
799
 
 
800
let create_out_buffer ?(small_buffer=false) ~max d0 =
 
801
  let d = (d0 :> out_device) in
 
802
  let esys =
 
803
    device_esys d in
 
804
  let buf =
 
805
    if device_supports_memory d then
 
806
      mem_obj_buffer small_buffer
 
807
    else
 
808
      str_obj_buffer small_buffer in
 
809
  let eof = 
 
810
    ref false in
 
811
  let flush_e_opt =
 
812
    ref None in
 
813
 
 
814
  let rec flush_e n =
 
815
    if n > 0 then (
 
816
      let (ms,pos,len) = buf # page_for_consumption in
 
817
      let len' = min len n in
 
818
      output_e d ms pos len' ++
 
819
        (fun k ->
 
820
           buf # delete_hd k;
 
821
           flush_e (n-k)
 
822
        )
 
823
    )
 
824
    else
 
825
      eps_e (`Done ()) esys in
 
826
 
 
827
 object
 
828
  method buffer = buf
 
829
  method eof = !eof
 
830
  method max = max
 
831
 
 
832
  method start_flush_e() =
 
833
    assert (!flush_e_opt = None);
 
834
    let e = 
 
835
      flush_e (buf#length)
 
836
      >> (fun st -> flush_e_opt := None; st) in
 
837
    flush_e_opt := Some e;
 
838
    e
 
839
 
 
840
  method flush_e_opt = !flush_e_opt
 
841
 
 
842
  method write_eof_e () =
 
843
    if buf#length = 0 then
 
844
      write_eof_e d
 
845
    else
 
846
      eps_e 
 
847
        (`Error (Failure "Uq_io: called write_eof_e with non-empty buffer"))
 
848
        esys
 
849
 
 
850
  method shutdown_e linger =
 
851
    shutdown_e ?linger d
 
852
    
 
853
  method inactivate () =
 
854
    buf#clear();
 
855
    inactivate d
 
856
 
 
857
  method udevice = Some d
 
858
  method event_system = esys
 
859
end
 
860
 
 
861
 
 
862
let copy_e ?(small_buffer=false) ?len ?len64 d_in d_out =
 
863
  let d_in_esys = device_esys d_in in
 
864
  let d_out_esys = device_esys d_out in
 
865
  if d_in_esys <> d_out_esys then
 
866
    invalid_arg "Uq_io.copy_e: devices must use the same event system";
 
867
  let esys = d_in_esys in
 
868
 
 
869
  let ms, ms_len, free_ms =
 
870
    if device_supports_memory d_in && device_supports_memory d_out then (
 
871
      let m, f = 
 
872
        Netsys_mem.pool_alloc_memory2 
 
873
          (if small_buffer then Netsys_mem.small_pool 
 
874
           else Netsys_mem.default_pool) in
 
875
      (`Memory m, Bigarray.Array1.dim m, f)
 
876
    )
 
877
    else (
 
878
      let s = String.create (if small_buffer then 4096 else 65536) in
 
879
      (`String s, String.length s, (fun () -> ()))
 
880
    ) in
 
881
  (* Note that calling free_ms only accelerates that ms is recognized
 
882
     as free after the copy is done. It is not necessary to call it.
 
883
   *)
 
884
 
 
885
  let rec push_data p n =
 
886
    if n = 0 then
 
887
      eps_e (`Done ()) esys
 
888
    else
 
889
      output_e d_out ms p n ++ (fun k -> push_data (p+k) (n-k)) in
 
890
 
 
891
  let count = ref 0L in
 
892
  let eff_len =
 
893
    match len, len64 with
 
894
      | None, None -> None
 
895
      | Some n, None -> Some(Int64.of_int n)
 
896
      | None, Some n -> Some n
 
897
      | Some n1, Some n2 -> Some(min (Int64.of_int n1) n2) in
 
898
 
 
899
  let rec pull_data() =
 
900
    let n =
 
901
      match eff_len with
 
902
        | None -> 
 
903
            ms_len
 
904
        | Some l -> 
 
905
            Int64.to_int( min (Int64.of_int ms_len) (Int64.sub l !count)) in
 
906
 
 
907
    let ( >> ) = Uq_engines.fmap_engine in
 
908
    (* For a strange reason we need this - somewhere a generalization is
 
909
       missing
 
910
     *)
 
911
 
 
912
    if n=0 then (
 
913
      free_ms();
 
914
      eps_e (`Done !count) esys
 
915
    )
 
916
    else
 
917
      ( input_e d_in ms 0 n
 
918
        >> (function
 
919
              | `Done n -> `Done(`Good n)
 
920
              | `Error End_of_file -> `Done `Eof
 
921
              | `Error error -> free_ms(); `Error error
 
922
              | `Aborted -> free_ms(); `Aborted
 
923
           )
 
924
        : [`Good of int | `Eof] Uq_engines.engine
 
925
      ) ++
 
926
        (function
 
927
           | `Good n ->
 
928
               count := Int64.add !count (Int64.of_int n);
 
929
               push_data 0 n ++ (fun () -> pull_data())
 
930
           | `Eof ->
 
931
               free_ms();
 
932
               eps_e (`Done !count) esys
 
933
        ) in
 
934
  pull_data()
 
935
 
 
936
  
 
937
let eof_as_none =
 
938
  function
 
939
    | `Done x -> `Done(Some x)
 
940
    | `Error End_of_file -> `Done None
 
941
    | `Error e -> `Error e
 
942
    | `Aborted -> `Aborted
 
943
 
 
944
 
 
945
let filter_out_buffer ~max (p : Netchannels.io_obj_channel) d0 : out_buffer =
 
946
  let small_buffer = true in
 
947
  let d = (d0 :> out_device) in
 
948
  let esys =
 
949
    device_esys d in
 
950
  let buf = str_obj_buffer small_buffer in
 
951
  let eof = 
 
952
    ref false in
 
953
  let flush_e_opt =
 
954
    ref None in
 
955
 
 
956
  let rec do_flush_e() =
 
957
    let q = ref 0 in
 
958
    if buf#length > 0 then (
 
959
      assert(not !eof);
 
960
      (* First copy everything from buf to p: *)
 
961
      let (ms,pos,len) = buf # page_for_consumption in
 
962
      let s =
 
963
        match ms with
 
964
          | `String s -> s 
 
965
          | `Memory _ -> assert false in
 
966
      q := 1;
 
967
      let n = p # output s pos len in
 
968
      q := 2;
 
969
      buf # delete_hd n;
 
970
      (* Copy from p to d: *)
 
971
      let p_dev =
 
972
        `Async_in(new Uq_engines.pseudo_async_in_channel p, esys) in
 
973
      ( copy_e p_dev d
 
974
        >> (function
 
975
              | `Done _ -> `Done ()
 
976
              | `Error Netchannels.Buffer_underrun -> `Done ()
 
977
              | `Error err -> `Error err
 
978
              | `Aborted -> `Aborted
 
979
           )
 
980
      ) ++ do_flush_e
 
981
    )
 
982
    else 
 
983
      if !eof then (
 
984
        q := 3;
 
985
        p # close_out();
 
986
        q := 4;
 
987
        let p_dev =
 
988
          `Async_in(new Uq_engines.pseudo_async_in_channel p, esys) in
 
989
        copy_e p_dev d
 
990
        >> (fun st -> 
 
991
              p#close_in();
 
992
              match st with
 
993
                | `Done _ -> `Done()
 
994
                | `Error err -> `Error err
 
995
                | `Aborted -> `Aborted
 
996
           )
 
997
      )
 
998
      else (
 
999
        eps_e (`Done()) esys
 
1000
      ) 
 
1001
  in
 
1002
 
 
1003
object(self)
 
1004
  method buffer = buf
 
1005
  method eof = !eof
 
1006
  method max = max
 
1007
 
 
1008
  method start_flush_e() =
 
1009
    assert (!flush_e_opt = None);
 
1010
    let e = 
 
1011
      do_flush_e ()
 
1012
      >> (fun st ->
 
1013
            flush_e_opt := None; 
 
1014
            st
 
1015
         ) in
 
1016
    flush_e_opt := Some e;
 
1017
    e
 
1018
 
 
1019
  method flush_e_opt = 
 
1020
    match !flush_e_opt with
 
1021
      | None -> None
 
1022
      | Some e ->
 
1023
          assert(match e#state with
 
1024
                   | `Done _ -> false
 
1025
                   | _ -> true
 
1026
                );
 
1027
          Some e
 
1028
 
 
1029
  method write_eof_e () =
 
1030
    eof := true;
 
1031
    flush_e (`Buffer_out self)
 
1032
    ++ (fun () ->
 
1033
          write_eof_e d
 
1034
       )
 
1035
 
 
1036
  method shutdown_e linger =
 
1037
    eof := true;
 
1038
    flush_e (`Buffer_out self)
 
1039
    ++ (fun () ->
 
1040
          shutdown_e ?linger d
 
1041
       )
 
1042
    
 
1043
  method inactivate () =
 
1044
    p#close_in();
 
1045
    inactivate d
 
1046
 
 
1047
  method udevice = None
 
1048
    (* It is not allowed to bypass this buffer *)
 
1049
  method event_system = esys
 
1050
end