~ubuntu-branches/ubuntu/lucid/lwt/lucid

« back to all changes in this revision

Viewing changes to src/lwt_preemptive.ml

  • Committer: Bazaar Package Importer
  • Author(s): Stéphane Glondu
  • Date: 2009-10-15 23:58:46 UTC
  • mfrom: (3.1.1 sid)
  • Revision ID: james.westby@ubuntu.com-20091015235846-g6sbpx5angz5c2uq
Tags: 2.0.0-1
* New upstream release
* Switch packaging to dh-ocaml 0.9
* Upload to unstable

Show diffs side-by-side

added added

removed removed

Lines of Context:
3
3
 * Module lwt_preemptive.ml
4
4
 * Copyright (C) 2005 Nataliya Guts, Vincent Balat, J�r�me Vouillon
5
5
 * Laboratoire PPS - CNRS Universit� Paris Diderot
 
6
 *               2009 J�r�mie Dimino
6
7
 *
7
8
 * This program is free software; you can redistribute it and/or modify
8
9
 * it under the terms of the GNU Lesser General Public License as published by
9
 
 * the Free Software Foundation, with linking exception;
 
10
 * the Free Software Foundation, with linking exceptions;
10
11
 * either version 2.1 of the License, or (at your option) any later version.
 
12
 * See COPYING file for details.
11
13
 *
12
14
 * This program is distributed in the hope that it will be useful,
13
15
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19
21
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
20
22
 *)
21
23
 
22
 
open Lwt;;
23
 
 
24
 
exception Task_failed
25
 
 
26
 
let minthreads : int ref = ref 0
27
 
let maxthreads : int ref = ref 0
28
 
 
29
 
let maxthreadqueued = ref 100
30
 
 
31
 
let get_max_number_of_threads_queued () = !maxthreadqueued
32
 
let set_max_number_of_threads_queued n = maxthreadqueued := n
33
 
 
34
 
let finishedpipe =
 
24
open Lwt
 
25
open Lwt_io
 
26
 
 
27
(* +-----------------------------------------------------------------+
 
28
   | Parameters                                                      |
 
29
   +-----------------------------------------------------------------+ *)
 
30
 
 
31
(* Minimum number of preemptive threads: *)
 
32
let min_threads : int ref = ref 0
 
33
 
 
34
(* Maximum number of preemptive threads: *)
 
35
let max_threads : int ref = ref 0
 
36
 
 
37
(* Size of the waiting queue: *)
 
38
let max_thread_queued = ref 1000
 
39
 
 
40
let get_max_number_of_threads_queued _ =
 
41
  !max_thread_queued
 
42
 
 
43
let set_max_number_of_threads_queued n =
 
44
  if n < 0 then invalid_arg "Lwt_preemptive.set_max_number_of_threads_queued";
 
45
  max_thread_queued := n
 
46
 
 
47
(* The function for logging errors: *)
 
48
let error_log = ref (fun msg -> ignore (eprintl msg >> flush stderr))
 
49
 
 
50
(* The total number of preemptive threads currently running: *)
 
51
let threads_count = ref 0
 
52
 
 
53
(* +-----------------------------------------------------------------+
 
54
   | Preemptive threads management                                   |
 
55
   +-----------------------------------------------------------------+ *)
 
56
 
 
57
(* Pipe used to communicate between worker threads and the main
 
58
   thread. Each time a thread finish its work, it send its id to the
 
59
   main thread: *)
 
60
let finished_pipe =
35
61
  let (in_fd, out_fd) = Lwt_unix.pipe_in () in
36
62
  Lwt_unix.set_close_on_exec in_fd;
37
63
  Unix.set_close_on_exec out_fd;
38
 
  (Lwt_chan.in_channel_of_descr in_fd, Unix.out_channel_of_descr out_fd)
39
 
 
40
 
let pipelock = Mutex.create ()
41
 
 
42
 
let worker_chan n : (unit -> unit) Event.channel = Event.new_channel ()
43
 
type th = {mutable client: unit Lwt.t;
44
 
           mutable busy: bool;
45
 
           taskchannel: (unit -> unit) Event.channel;
46
 
           mutable worker: Thread.t option}
47
 
let pool : th array ref = ref [||]
48
 
 
49
 
let busylock = Mutex.create ()
50
 
let setbusy n b =
51
 
  Mutex.lock busylock;
52
 
  !pool.(n).busy <- b;
53
 
  Mutex.unlock busylock
54
 
 
55
 
let rec worker (n : int) : unit Lwt.t =
56
 
  let g = Event.sync (Event.receive !pool.(n).taskchannel) in
57
 
  g ();
58
 
  let buf = string_of_int n in
59
 
  Mutex.lock pipelock;
60
 
  output_string (snd finishedpipe) (buf^"\n");
61
 
  flush (snd finishedpipe);
62
 
  Mutex.unlock pipelock;
63
 
  worker n
64
 
 
65
 
 
66
 
exception All_preemptive_threads_are_busy
67
 
let free, nbthreadsqueued =
68
 
  let nb_threads_queued = ref 0 in
69
 
  let max_thread_waiting_queue =
70
 
    get_max_number_of_threads_queued () in
71
 
  let rec free1 i : int =
72
 
    if i >= !maxthreads
73
 
    then raise All_preemptive_threads_are_busy
74
 
    else if not !pool.(i).busy then i else free1 (i+1)
75
 
  in
76
 
  let launch_threads first =
77
 
    let rec aux last n =
78
 
      !pool.(n).worker <- Some (Thread.create worker n);
79
 
      if n<last then aux last (n+1)
80
 
    in
81
 
    match !pool.(first).worker with
82
 
      None ->
83
 
        let last = (min (first + (max !minthreads 10)) !maxthreads) - 1 in
84
 
        aux last first
85
 
    | _ -> ()
86
 
  in
87
 
  let rec aux () =
88
 
    try
89
 
      Mutex.lock busylock;
90
 
      let libre = free1 0 in
91
 
      !pool.(libre).busy <- true;
92
 
      Mutex.unlock busylock;
93
 
      launch_threads libre;
94
 
      Lwt.return libre
95
 
    with
96
 
      All_preemptive_threads_are_busy ->
97
 
        Mutex.unlock busylock;
98
 
        if (!maxthreads = 0) ||
99
 
        (!nb_threads_queued >= max_thread_waiting_queue)
100
 
        then fail All_preemptive_threads_are_busy
101
 
        else (nb_threads_queued := !nb_threads_queued + 1;
102
 
              Lwt_unix.sleep 1.0 >>= (fun () ->
103
 
                nb_threads_queued := !nb_threads_queued -1 ;
104
 
                aux ()))
105
 
    | e -> Mutex.unlock busylock; fail e
106
 
  in
107
 
  (aux,(fun () -> !nb_threads_queued))
108
 
 
109
 
let detach (f : 'a -> 'b) (args : 'a) : 'b Lwt.t =
110
 
  let res : 'b option ref = ref None in
111
 
  let exc : exn option ref = ref None in
112
 
  let g () = try res := Some (f args) with e -> exc := Some e
113
 
  in
114
 
  free () >>= (fun whatthread ->
115
 
    Event.sync (Event.send !pool.(whatthread).taskchannel g);
116
 
    !pool.(whatthread).client <- Lwt.wait ();
117
 
    !pool.(whatthread).client >>=
118
 
    (fun () -> match !res with
 
64
  (Lwt_io.of_fd ~mode:Lwt_io.input ~buffer_size:256 in_fd, Unix.out_channel_of_descr out_fd)
 
65
 
 
66
(* Mutex used to prevent concurrent access to [finished_pipe] by
 
67
   preemptive worker threads: *)
 
68
let pipe_lock = Mutex.create ()
 
69
 
 
70
type thread = {
 
71
  task_channel: (unit -> unit) Event.channel;
 
72
  (* Channel used to communicate a task to the worker thread. *)
 
73
 
 
74
  mutable thread : Thread.t;
 
75
  (* The worker thread. *)
 
76
 
 
77
  mutable reuse : bool;
 
78
  (* Whether the thread must be readded to the pool when the work is
 
79
     done. *)
 
80
}
 
81
 
 
82
(* Pool of worker threads: *)
 
83
let workers : thread Queue.t = Queue.create ()
 
84
 
 
85
(* Queue of clients waiting for a worker to be available: *)
 
86
let waiters : thread Lwt.u Lwt_sequence.t = Lwt_sequence.create ()
 
87
 
 
88
(* Mapping from thread ids to client lwt-thread: *)
 
89
let clients : (int, unit Lwt.u) Hashtbl.t = Hashtbl.create 16
 
90
 
 
91
(* Code executed by a worker: *)
 
92
let rec worker_loop worker =
 
93
  let task = Event.sync (Event.receive worker.task_channel) in
 
94
  task ();
 
95
  (* If there is too much threads, exit. This can happen if the user
 
96
     decreased the maximum: *)
 
97
  if !threads_count > !max_threads then worker.reuse <- false;
 
98
  (* Tell the main thread that work is done: *)
 
99
  Mutex.lock pipe_lock;
 
100
  let oc = snd finished_pipe in
 
101
  Pervasives.output_string oc (string_of_int (Thread.id worker.thread) ^ "\n");
 
102
  Pervasives.flush oc;
 
103
  Mutex.unlock pipe_lock;
 
104
  if worker.reuse then worker_loop worker
 
105
 
 
106
(* create a new worker: *)
 
107
let make_worker _ =
 
108
  incr threads_count;
 
109
  let worker = {
 
110
    task_channel = Event.new_channel ();
 
111
    thread = Thread.self ();
 
112
    reuse = true;
 
113
  } in
 
114
  worker.thread <- Thread.create worker_loop worker;
 
115
  worker
 
116
 
 
117
(* Add a worker to the pool: *)
 
118
let add_worker worker =
 
119
  match Lwt_sequence.take_opt_l waiters with
119
120
    | None ->
120
 
        (match !exc with
121
 
        | None ->
122
 
            setbusy whatthread false;
123
 
            fail Task_failed
124
 
        | Some e ->
125
 
            setbusy whatthread false;
126
 
            fail e)
127
 
    | Some r ->
128
 
        setbusy whatthread false;
129
 
        Lwt.return r))
130
 
 
131
 
 
132
 
let dispatch errlog =
133
 
  let rec aux () =
134
 
    (catch
135
 
       (fun () ->
136
 
         Lwt_chan.input_line (fst finishedpipe) >>=
137
 
         (fun v ->
138
 
           let n = int_of_string v in
139
 
 
140
 
           ignore (
141
 
           (* Here we want to do the recursive call as soon as possible
142
 
              (and before the wakeup)
143
 
              because if Lwt_unix.run is called by the waiters of the
144
 
              thread beeing awoken,
145
 
              and if that run wants to use detach,
146
 
              the pipe won't be available, and the run will never finish ...
147
 
              and block the other run
148
 
              (remember that an invocation of [run] will not terminate
149
 
              before all subsequent invocations are terminated)
150
 
            *)
151
 
           Lwt_unix.yield () >>= fun () ->
152
 
             wakeup !pool.(n).client ();
153
 
             return ());
154
 
           return ()))
155
 
 
156
 
       (fun e ->
157
 
          errlog
158
 
            ("Internal error in lwt_preemptive.ml (read failed on the pipe) "^
159
 
               Printexc.to_string e ^" - Please check if Lwt_preemptive is initialized and that lwt_preemptive.cmo is linked only once. Otherwise, please report the bug");
160
 
         return ())
161
 
    ) >>= (fun () -> aux ())
162
 
  in aux ()
163
 
 
164
 
 
165
 
let initthread =
166
 
  let def = Lwt.return () in
167
 
  fun n ->
168
 
    {client = def;
169
 
     busy = false;
170
 
     taskchannel = worker_chan n;
171
 
     worker = None}
172
 
 
173
 
let rec start_initial_threads min n =
174
 
  if n<min
175
 
  then begin
176
 
    !pool.(n).worker <- Some (Thread.create worker n);
177
 
    start_initial_threads min (n+1);
 
121
        Queue.add worker workers
 
122
    | Some w ->
 
123
        wakeup w worker
 
124
 
 
125
(* Wait for worker to be available, then return it: *)
 
126
let rec get_worker _ =
 
127
  if not (Queue.is_empty workers) then
 
128
    return (Queue.take workers)
 
129
  else if !threads_count < !max_threads then
 
130
    return (make_worker ())
 
131
  else begin
 
132
    let (res, w) = Lwt.task () in
 
133
    let node = Lwt_sequence.add_r w waiters in
 
134
    Lwt.on_cancel res (fun _ -> Lwt_sequence.remove node);
 
135
    res
178
136
  end
179
137
 
 
138
(* +-----------------------------------------------------------------+
 
139
   | Dispatcher                                                      |
 
140
   +-----------------------------------------------------------------+ *)
 
141
 
 
142
(* The dispatcher is responsible for reading id of threads which have
 
143
   finished their work and wakeup the corresponding lwt thread: *)
 
144
 
 
145
let rec dispatch () =
 
146
  begin
 
147
    try_lwt
 
148
      (* Read the id of the next thread that has finished his work: *)
 
149
      lwt n = int_of_string =|< read_line (fst finished_pipe) in
 
150
      ignore begin
 
151
        (* Here we want to do the recursive call as soon as possible
 
152
           (and before the wakeup) because if Lwt_unix.run is called
 
153
           by the waiters of the thread beeing awoken, and if that run
 
154
           wants to use detach, the pipe won't be available, and the
 
155
           run will never finish ...  and block the other run
 
156
           (remember that an invocation of [run] will not terminate
 
157
           before all subsequent invocations are terminated) *)
 
158
          Lwt_unix.yield () >> begin
 
159
            let w = Hashtbl.find clients n in
 
160
            Hashtbl.remove clients n;
 
161
            wakeup w ();
 
162
            return ()
 
163
          end
 
164
        end;
 
165
        return `continue
 
166
      with
 
167
        | Channel_closed _ ->
 
168
            return `break
 
169
        | exn ->
 
170
            Printf.ksprintf !error_log
 
171
              "Internal error in lwt_preemptive.ml (read failed on the pipe) %s - Please check if Lwt_preemptive is initialized and that lwt_preemptive.cmo is linked only once. Otherwise, please report the bug"
 
172
              (Printexc.to_string exn);
 
173
            fail exn
 
174
  end >>= function
 
175
    | `continue -> dispatch ()
 
176
    | `break -> return ()
 
177
 
 
178
let dispatcher_running = ref false
 
179
 
 
180
let dispatcher = lazy(dispatcher_running := true; dispatch ())
 
181
 
 
182
(* +-----------------------------------------------------------------+
 
183
   | Initialisation, and dynamic parameters reset                    |
 
184
   +-----------------------------------------------------------------+ *)
 
185
 
 
186
let get_bounds _ = (!min_threads, !max_threads)
 
187
 
 
188
let set_bounds (min, max) =
 
189
  if min < 0 || max < min then invalid_arg "Lwt_preemptive.set_bounds";
 
190
  let diff = min - !threads_count in
 
191
  min_threads := min;
 
192
  max_threads := max;
 
193
  (* Launch new workers: *)
 
194
  for i = 1 to diff do
 
195
    add_worker (make_worker ())
 
196
  done
 
197
 
180
198
let init min max errlog =
181
 
  pool := Array.init max initthread;
182
 
  minthreads := min;
183
 
  maxthreads := max;
184
 
  start_initial_threads min 0;
185
 
  dispatch errlog
186
 
 
187
 
 
188
 
let nbthreads () =
189
 
  Array.fold_left (fun nb elt ->
190
 
    match elt.worker with None -> nb | _ -> nb+1) 0 !pool
191
 
 
192
 
let nbthreadsbusy () =
193
 
  Mutex.lock busylock;
194
 
  let r =
195
 
    Array.fold_left (fun nb elt -> if elt.busy then nb+1 else nb) 0 !pool in
196
 
  Mutex.unlock busylock;
197
 
  r
198
 
 
 
199
  error_log := errlog;
 
200
  set_bounds (min, max);
 
201
  Lazy.force dispatcher
 
202
 
 
203
let simple_init _ =
 
204
  if not !dispatcher_running then set_bounds (0, 4);
 
205
  Lazy.force dispatcher
 
206
 
 
207
let nbthreads _ = !threads_count
 
208
let nbthreadsqueued _ = Lwt_sequence.fold_l (fun _ x -> x + 1) waiters 0
 
209
let nbthreadsbusy _ = !threads_count - Queue.length workers
 
210
 
 
211
(* +-----------------------------------------------------------------+
 
212
   | Detaching                                                       |
 
213
   +-----------------------------------------------------------------+ *)
 
214
 
 
215
let detach f args =
 
216
  let _ = simple_init () in
 
217
  let result = ref `Nothing in
 
218
  (* The task for the worker thread: *)
 
219
  let task () =
 
220
    try
 
221
      result := `Success(f args)
 
222
    with exn ->
 
223
      result := `Failure exn
 
224
  in
 
225
  lwt worker = get_worker () in
 
226
  let (res, w) =  Lwt.wait () in
 
227
  Hashtbl.add clients (Thread.id worker.thread) w;
 
228
  (* Send the task to the worker: *)
 
229
  Event.sync (Event.send worker.task_channel task);
 
230
  try_lwt
 
231
    (* Wait for notification of the dispatcher: *)
 
232
    res >>
 
233
      match !result with
 
234
      | `Nothing ->
 
235
          fail (Failure "Lwt_preemptive.detach")
 
236
      | `Success v ->
 
237
          return v
 
238
      | `Failure exn ->
 
239
          fail exn
 
240
  finally
 
241
    if worker.reuse then
 
242
      (* Put back the worker to the pool: *)
 
243
      add_worker worker
 
244
    else begin
 
245
      decr threads_count;
 
246
      (* Or wait for the thread to terminates, to free its associated
 
247
         resources: *)
 
248
      Thread.join worker.thread
 
249
    end;
 
250
    return ()