19
21
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
26
let minthreads : int ref = ref 0
27
let maxthreads : int ref = ref 0
29
let maxthreadqueued = ref 100
31
let get_max_number_of_threads_queued () = !maxthreadqueued
32
let set_max_number_of_threads_queued n = maxthreadqueued := n
27
(* +-----------------------------------------------------------------+
29
+-----------------------------------------------------------------+ *)
31
(* Minimum number of preemptive threads: *)
32
let min_threads : int ref = ref 0
34
(* Maximum number of preemptive threads: *)
35
let max_threads : int ref = ref 0
37
(* Size of the waiting queue: *)
38
let max_thread_queued = ref 1000
40
let get_max_number_of_threads_queued _ =
43
let set_max_number_of_threads_queued n =
44
if n < 0 then invalid_arg "Lwt_preemptive.set_max_number_of_threads_queued";
45
max_thread_queued := n
47
(* The function for logging errors: *)
48
let error_log = ref (fun msg -> ignore (eprintl msg >> flush stderr))
50
(* The total number of preemptive threads currently running: *)
51
let threads_count = ref 0
53
(* +-----------------------------------------------------------------+
54
| Preemptive threads management |
55
+-----------------------------------------------------------------+ *)
57
(* Pipe used to communicate between worker threads and the main
58
thread. Each time a thread finish its work, it send its id to the
35
61
let (in_fd, out_fd) = Lwt_unix.pipe_in () in
36
62
Lwt_unix.set_close_on_exec in_fd;
37
63
Unix.set_close_on_exec out_fd;
38
(Lwt_chan.in_channel_of_descr in_fd, Unix.out_channel_of_descr out_fd)
40
let pipelock = Mutex.create ()
42
let worker_chan n : (unit -> unit) Event.channel = Event.new_channel ()
43
type th = {mutable client: unit Lwt.t;
45
taskchannel: (unit -> unit) Event.channel;
46
mutable worker: Thread.t option}
47
let pool : th array ref = ref [||]
49
let busylock = Mutex.create ()
55
let rec worker (n : int) : unit Lwt.t =
56
let g = Event.sync (Event.receive !pool.(n).taskchannel) in
58
let buf = string_of_int n in
60
output_string (snd finishedpipe) (buf^"\n");
61
flush (snd finishedpipe);
62
Mutex.unlock pipelock;
66
exception All_preemptive_threads_are_busy
67
let free, nbthreadsqueued =
68
let nb_threads_queued = ref 0 in
69
let max_thread_waiting_queue =
70
get_max_number_of_threads_queued () in
71
let rec free1 i : int =
73
then raise All_preemptive_threads_are_busy
74
else if not !pool.(i).busy then i else free1 (i+1)
76
let launch_threads first =
78
!pool.(n).worker <- Some (Thread.create worker n);
79
if n<last then aux last (n+1)
81
match !pool.(first).worker with
83
let last = (min (first + (max !minthreads 10)) !maxthreads) - 1 in
90
let libre = free1 0 in
91
!pool.(libre).busy <- true;
92
Mutex.unlock busylock;
96
All_preemptive_threads_are_busy ->
97
Mutex.unlock busylock;
98
if (!maxthreads = 0) ||
99
(!nb_threads_queued >= max_thread_waiting_queue)
100
then fail All_preemptive_threads_are_busy
101
else (nb_threads_queued := !nb_threads_queued + 1;
102
Lwt_unix.sleep 1.0 >>= (fun () ->
103
nb_threads_queued := !nb_threads_queued -1 ;
105
| e -> Mutex.unlock busylock; fail e
107
(aux,(fun () -> !nb_threads_queued))
109
let detach (f : 'a -> 'b) (args : 'a) : 'b Lwt.t =
110
let res : 'b option ref = ref None in
111
let exc : exn option ref = ref None in
112
let g () = try res := Some (f args) with e -> exc := Some e
114
free () >>= (fun whatthread ->
115
Event.sync (Event.send !pool.(whatthread).taskchannel g);
116
!pool.(whatthread).client <- Lwt.wait ();
117
!pool.(whatthread).client >>=
118
(fun () -> match !res with
64
(Lwt_io.of_fd ~mode:Lwt_io.input ~buffer_size:256 in_fd, Unix.out_channel_of_descr out_fd)
66
(* Mutex used to prevent concurrent access to [finished_pipe] by
67
preemptive worker threads: *)
68
let pipe_lock = Mutex.create ()
71
task_channel: (unit -> unit) Event.channel;
72
(* Channel used to communicate a task to the worker thread. *)
74
mutable thread : Thread.t;
75
(* The worker thread. *)
78
(* Whether the thread must be readded to the pool when the work is
82
(* Pool of worker threads: *)
83
let workers : thread Queue.t = Queue.create ()
85
(* Queue of clients waiting for a worker to be available: *)
86
let waiters : thread Lwt.u Lwt_sequence.t = Lwt_sequence.create ()
88
(* Mapping from thread ids to client lwt-thread: *)
89
let clients : (int, unit Lwt.u) Hashtbl.t = Hashtbl.create 16
91
(* Code executed by a worker: *)
92
let rec worker_loop worker =
93
let task = Event.sync (Event.receive worker.task_channel) in
95
(* If there is too much threads, exit. This can happen if the user
96
decreased the maximum: *)
97
if !threads_count > !max_threads then worker.reuse <- false;
98
(* Tell the main thread that work is done: *)
100
let oc = snd finished_pipe in
101
Pervasives.output_string oc (string_of_int (Thread.id worker.thread) ^ "\n");
103
Mutex.unlock pipe_lock;
104
if worker.reuse then worker_loop worker
106
(* create a new worker: *)
110
task_channel = Event.new_channel ();
111
thread = Thread.self ();
114
worker.thread <- Thread.create worker_loop worker;
117
(* Add a worker to the pool: *)
118
let add_worker worker =
119
match Lwt_sequence.take_opt_l waiters with
122
setbusy whatthread false;
125
setbusy whatthread false;
128
setbusy whatthread false;
132
let dispatch errlog =
136
Lwt_chan.input_line (fst finishedpipe) >>=
138
let n = int_of_string v in
141
(* Here we want to do the recursive call as soon as possible
142
(and before the wakeup)
143
because if Lwt_unix.run is called by the waiters of the
144
thread beeing awoken,
145
and if that run wants to use detach,
146
the pipe won't be available, and the run will never finish ...
147
and block the other run
148
(remember that an invocation of [run] will not terminate
149
before all subsequent invocations are terminated)
151
Lwt_unix.yield () >>= fun () ->
152
wakeup !pool.(n).client ();
158
("Internal error in lwt_preemptive.ml (read failed on the pipe) "^
159
Printexc.to_string e ^" - Please check if Lwt_preemptive is initialized and that lwt_preemptive.cmo is linked only once. Otherwise, please report the bug");
161
) >>= (fun () -> aux ())
166
let def = Lwt.return () in
170
taskchannel = worker_chan n;
173
let rec start_initial_threads min n =
176
!pool.(n).worker <- Some (Thread.create worker n);
177
start_initial_threads min (n+1);
121
Queue.add worker workers
125
(* Wait for worker to be available, then return it: *)
126
let rec get_worker _ =
127
if not (Queue.is_empty workers) then
128
return (Queue.take workers)
129
else if !threads_count < !max_threads then
130
return (make_worker ())
132
let (res, w) = Lwt.task () in
133
let node = Lwt_sequence.add_r w waiters in
134
Lwt.on_cancel res (fun _ -> Lwt_sequence.remove node);
138
(* +-----------------------------------------------------------------+
140
+-----------------------------------------------------------------+ *)
142
(* The dispatcher is responsible for reading id of threads which have
143
finished their work and wakeup the corresponding lwt thread: *)
145
let rec dispatch () =
148
(* Read the id of the next thread that has finished his work: *)
149
lwt n = int_of_string =|< read_line (fst finished_pipe) in
151
(* Here we want to do the recursive call as soon as possible
152
(and before the wakeup) because if Lwt_unix.run is called
153
by the waiters of the thread beeing awoken, and if that run
154
wants to use detach, the pipe won't be available, and the
155
run will never finish ... and block the other run
156
(remember that an invocation of [run] will not terminate
157
before all subsequent invocations are terminated) *)
158
Lwt_unix.yield () >> begin
159
let w = Hashtbl.find clients n in
160
Hashtbl.remove clients n;
167
| Channel_closed _ ->
170
Printf.ksprintf !error_log
171
"Internal error in lwt_preemptive.ml (read failed on the pipe) %s - Please check if Lwt_preemptive is initialized and that lwt_preemptive.cmo is linked only once. Otherwise, please report the bug"
172
(Printexc.to_string exn);
175
| `continue -> dispatch ()
176
| `break -> return ()
178
let dispatcher_running = ref false
180
let dispatcher = lazy(dispatcher_running := true; dispatch ())
182
(* +-----------------------------------------------------------------+
183
| Initialisation, and dynamic parameters reset |
184
+-----------------------------------------------------------------+ *)
186
let get_bounds _ = (!min_threads, !max_threads)
188
let set_bounds (min, max) =
189
if min < 0 || max < min then invalid_arg "Lwt_preemptive.set_bounds";
190
let diff = min - !threads_count in
193
(* Launch new workers: *)
195
add_worker (make_worker ())
180
198
let init min max errlog =
181
pool := Array.init max initthread;
184
start_initial_threads min 0;
189
Array.fold_left (fun nb elt ->
190
match elt.worker with None -> nb | _ -> nb+1) 0 !pool
192
let nbthreadsbusy () =
195
Array.fold_left (fun nb elt -> if elt.busy then nb+1 else nb) 0 !pool in
196
Mutex.unlock busylock;
200
set_bounds (min, max);
201
Lazy.force dispatcher
204
if not !dispatcher_running then set_bounds (0, 4);
205
Lazy.force dispatcher
207
let nbthreads _ = !threads_count
208
let nbthreadsqueued _ = Lwt_sequence.fold_l (fun _ x -> x + 1) waiters 0
209
let nbthreadsbusy _ = !threads_count - Queue.length workers
211
(* +-----------------------------------------------------------------+
213
+-----------------------------------------------------------------+ *)
216
let _ = simple_init () in
217
let result = ref `Nothing in
218
(* The task for the worker thread: *)
221
result := `Success(f args)
223
result := `Failure exn
225
lwt worker = get_worker () in
226
let (res, w) = Lwt.wait () in
227
Hashtbl.add clients (Thread.id worker.thread) w;
228
(* Send the task to the worker: *)
229
Event.sync (Event.send worker.task_channel task);
231
(* Wait for notification of the dispatcher: *)
235
fail (Failure "Lwt_preemptive.detach")
242
(* Put back the worker to the pool: *)
246
(* Or wait for the thread to terminates, to free its associated
248
Thread.join worker.thread