2
* ocsigen_stream.ml Copyright (C) 2005 Vincent Balat
3
* Laboratoire PPS - CNRS Université Paris Diderot
5
* This program is free software; you can redistribute it and/or modify
6
* it under the terms of the GNU Lesser General Public License as published by
7
* the Free Software Foundation, with linking exception;
8
* either version 2.1 of the License, or (at your option) any later version.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU Lesser General Public License for more details.
15
* You should have received a copy of the GNU Lesser General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
22
(** Carries the underlying exception [e] with which reading a stream failed.
    In this file it is produced as [Lwt.fail (Interrupted e)] right after the
    stream has been replaced by one that fails with [e] itself. *)
exception Interrupted of exn
24
(** Raised when a stream handle whose [in_use] flag is already set is
    requested again. *)
exception Already_read
27
(** A stream is a lazy value which, once forced, gives an Lwt thread
    resolving to the next ['a step] of the stream. *)
type 'a stream = 'a step Lwt.t Lazy.t
30
| Finished of 'a stream option (* If there is another stream following
31
(useful for substreams) *)
32
| Cont of 'a * 'a stream (* Current buffer, what follows *)
35
{ mutable stream : 'a stream;
36
mutable in_use : bool;
37
mutable finalizer : unit -> unit Lwt.t }
41
None -> Lwt.return (Finished None)
42
| Some st -> Lwt.return (Finished (Some (Lazy.lazy_from_fun st)))
45
Lwt.return (Cont (stri, Lazy.lazy_from_fun f))
47
(** [make ?finalize f] builds a fresh stream handle from the producer [f].
    [f] is not called here: it is suspended with [Lazy.lazy_from_fun] and
    stored in the [stream] field. The handle starts with [in_use = false].
    [finalize] is stored as the handle's finalizer; when omitted it is a
    function that returns immediately. *)
let make ?finalize:(finalizer = fun () -> Lwt.return ()) f =
  let suspended = Lazy.lazy_from_fun f in
  { stream = suspended; in_use = false; finalizer = finalizer }
55
(fun () -> Lazy.force st.stream)
59
Cont (s, rem) -> st.stream <- rem; Cont (s, get_aux st)
62
st.stream <- lazy (Lwt.fail e);
63
Lwt.fail (Interrupted e)))
66
if st.in_use then raise Already_read;
70
(** read the stream until the end, without decoding *)
71
let rec consume_aux st =
74
| Cont (_, f) -> consume_aux f
75
| Finished (Some ss) -> consume_aux ss
76
| Finished None -> Lwt.return ()
79
let st' = st.stream in
80
st.stream <- lazy (Lwt.fail Cancelled);
87
let f = st.finalizer in
88
st.finalizer <- (fun () -> Lwt.return ());
90
st.stream <- lazy (Lwt.fail Finalized);
93
(** [add_finalizer st g] appends [g] to the finalizer of [st]: the new
    finalizer first runs the one previously installed and, once that thread
    has completed, runs [g]. *)
let add_finalizer st g =
  let previous = st.finalizer in
  (* Capture the old finalizer before overwriting the field, so that the
     chained function does not call itself. *)
  let chained () = previous () >>= fun () -> g () in
  st.finalizer <- chained
103
(** Signalled by the functions below when more data is needed but the
    stream step at hand is [Finished]. *)
exception Stream_too_small
104
(** Stream error carrying a message; for instance [substream] fails with
    [Stream_error "Empty delimiter"] when given an empty delimiter. *)
exception Stream_error of string
105
(** Failure used when converting a stream to a string if the accumulated
    length exceeds [Ocsigen_config.get_netbuffersize ()]. *)
exception String_too_large
107
(*XXX Quadratic!!! *)
108
let string_of_stream =
112
| Finished _ -> return ""
114
let l2 = l+String.length s in
115
if l2 > Ocsigen_config.get_netbuffersize ()
116
then fail String_too_large
119
(fun r -> return (s^r))
122
(*XXX Quadratic!!! *)
123
let string_of_streams =
124
let rec aux l = function
125
| Finished None -> return ""
126
| Finished (Some s) -> next s >>= fun r -> aux l r
128
let l2 = l+String.length s in
129
if l2 > Ocsigen_config.get_netbuffersize ()
130
then fail String_too_large
133
aux l2 r >>= fun r ->
137
let enlarge_stream = function
138
| Finished a -> fail Stream_too_small
140
let long = String.length s in
141
let max = Ocsigen_config.get_netbuffersize () in
143
then fail Ocsigen_lib.Input_is_too_large
147
| Finished _ -> fail Stream_too_small
149
let long2 = String.length r in
150
let long3=long+long2 in
153
then return (Cont (new_s, ff))
154
else let long4 = long3 - max in
155
cont (String.sub new_s 0 max)
157
Lwt.return (Cont (String.sub new_s max long4, ff)))
159
let rec stream_want s len =
160
(* returns a stream whose current buffer holds at least len bytes, if possible *)
162
| Finished _ -> return s
163
| Cont (stri, f) -> if String.length stri >= len
166
(fun () -> enlarge_stream s >>= (fun r -> stream_want s len))
168
Stream_too_small -> return s
171
let current_buffer = function
172
| Finished _ -> raise Stream_too_small
175
let rec skip s k = match s with
176
| Finished _ -> raise Stream_too_small
178
let len = String.length s in
180
then return (Cont (String.sub s k (len - k), f))
181
else (enlarge_stream (Cont ("", f)) >>=
182
(fun s -> skip s (k - len)))
184
let substream delim s =
185
let ldelim = String.length delim in
186
if ldelim = 0 then fail (Stream_error "Empty delimiter")
188
let rdelim = Netstring_pcre.regexp_string delim in
191
| Finished _ -> fail Stream_too_small
192
| Cont (s, f) as stre ->
193
let len = String.length s in
195
then enlarge_stream stre >>= aux
197
let p,_ = Netstring_pcre.search_forward rdelim s 0 in
198
cont (String.sub s 0 p)
201
(Some (fun () -> Lwt.return (Cont (String.sub s p (len - p),
204
let pos = (len + 1 - ldelim) in
205
cont (String.sub s 0 pos)
206
(fun () -> next f >>=
208
Finished _ -> fail Stream_too_small
211
(Cont (String.sub s pos (len - pos) ^ s',
216
(*****************************************************************************)
218
(*VVV Is this the right place for this? *)
220
let of_file filename =
221
let fd = Lwt_unix.of_unix_file_descr
222
(Unix.openfile filename [Unix.O_RDONLY;Unix.O_NONBLOCK] 0o666)
224
let ch = Lwt_chan.in_channel_of_descr fd in
228
Lwt_chan.input_line ch >>= fun s ->
230
(function End_of_file -> empty None | e -> fail e)
231
in make ~finalize:(fun () -> Lwt.return (Lwt_unix.close fd)) aux
234
make (fun () -> cont s (fun () -> empty None))