1
(* $Id: netplex_semaphore.ml 1569 2011-04-05 16:08:52Z gerd $ *)
3
(* POSIX semaphores: A better implementation would use POSIX semaphores.
4
For each Netplex semaphore <name> we define
5
- One POSIX semaphore <prefix>_<name>
6
- One POSIX semaphore <prefix>_<name>_isprotected (only for
7
storing the protected attribute)
8
- If the semaphore is protected, another set of counters
9
<prefix>_<name>_<container> for every container. These cannot be
10
semaphores, because negative values are possible. A shared memory
11
segment is possible (memory cells are managed by the controller,
12
and RPC calls are used for this).
14
The Netplex operations map nicely to POSIX operations:
15
- Netplex create: sem_open with O_CREAT and O_EXCL
16
- Netplex increment: sem_post. If the semaphore is protected, the
17
container-specific counter is also incremented (and created with
18
value 0 if not existing).
19
- Netplex decrement w/o wait: sem_trywait. If the decrement is successful,
20
the container-specific counter is also decremented.
21
- Netplex decrement with wait: sem_wait. If the decrement is successful,
22
the container-specific counter is also decremented.
24
If the container crashes, the controller looks at the container-specific
25
counter, and calls sem_post or sem_trywait as often as the counter says:
26
sem_post for negative values, and sem_trywait for positive values.
40
let release = ref (fun () -> ())
45
val mutable semaphores = Hashtbl.create 50
46
val mutable containers = Hashtbl.create 50
51
semaphores <- Hashtbl.create 1;
52
containers <- Hashtbl.create 1
57
Netplex_ctrl_aux.program_Semaphore'V1
62
method ctrl_unplugged ctrl =
65
self # ctrl_container_finished ctrl cid false
69
method ctrl_receive_call ctrl cid procname procarg_val reply =
72
reply(Some(Netplex_ctrl_aux._of_Semaphore'V1'ping'res ()))
76
Netplex_ctrl_aux._to_Semaphore'V1'increment'arg procarg_val in
78
self # increment ctrl cid sem_name in
79
reply(Some(Netplex_ctrl_aux._of_Semaphore'V1'increment'res r))
84
Netplex_ctrl_aux._of_Semaphore'V1'decrement'res v in
86
let (sem_name, wait_flag) =
87
Netplex_ctrl_aux._to_Semaphore'V1'decrement'arg procarg_val in
88
self # decrement_async ctrl cid sem_name wait_flag proc_reply
92
Netplex_ctrl_aux._to_Semaphore'V1'get'arg procarg_val in
93
let (sem, _, _) = self # get_sem_tuple ctrl sem_name in
94
reply(Some(Netplex_ctrl_aux._of_Semaphore'V1'get'res !sem))
97
let (sem_name, init_val, protected) =
98
Netplex_ctrl_aux._to_Semaphore'V1'create'arg procarg_val in
100
snd(self # get_or_create_sem
101
ctrl sem_name init_val protected) in
102
reply(Some(Netplex_ctrl_aux._of_Semaphore'V1'create'res r))
106
Netplex_ctrl_aux._to_Semaphore'V1'destroy'arg procarg_val in
107
self # destroy_sem ctrl sem_name;
108
reply(Some(Netplex_ctrl_aux._of_Semaphore'V1'destroy'res ()))
111
failwith "Unknown procedure"
113
method increment ctrl cid sem_name =
114
let (sem, protected, waiting) = self # get_sem_tuple ctrl sem_name in
115
let cont_sem = self # get_cont_sem cid sem_name protected in
120
if not (Queue.is_empty waiting) then (
121
let (waiting_reply, waiting_cid) = Queue.take waiting in
122
let waiting_cont_sem =
123
self # get_cont_sem waiting_cid sem_name protected in
124
self#really_decrement sem waiting_cont_sem protected;
130
method private decrement_async ctrl cid sem_name wait_flag reply =
131
let (sem, protected, waiting) = self # get_sem_tuple ctrl sem_name in
132
let cont_sem = self # get_cont_sem cid sem_name protected in
134
self#really_decrement sem cont_sem protected;
139
Queue.push (reply,cid) waiting
144
method private really_decrement sem cont_sem protected =
150
method private get_or_create_sem ctrl sem_name init_val protected =
152
(Hashtbl.find semaphores (ctrl, sem_name), false)
154
let waiting = Queue.create() in
155
let new_sem = (ref init_val, protected, waiting) in
156
Hashtbl.add semaphores (ctrl, sem_name) new_sem;
159
method create_sem ctrl sem_name init_val protected =
160
snd(self # get_or_create_sem ctrl sem_name init_val protected)
162
method get_sem_tuple ctrl sem_name =
163
fst(self # get_or_create_sem ctrl sem_name 0L true)
165
method get_sem ctrl sem_name =
166
let ((value,_,_),_) = self # get_or_create_sem ctrl sem_name 0L true in
169
method private get_cont_sem cid sem_name protected =
172
try Hashtbl.find containers cid
174
let new_ht = Hashtbl.create 1 in
175
Hashtbl.add containers cid new_ht;
178
Hashtbl.find ht sem_name
180
let new_sem = ref 0L in
181
Hashtbl.add ht sem_name new_sem;
186
method destroy_sem ctrl sem_name =
188
let (_,_,waiting) = Hashtbl.find semaphores (ctrl, sem_name) in
189
let q = Queue.create() in
190
Queue.transfer waiting q;
192
(fun (waiting_reply, waiting_cid) ->
193
let ht = Hashtbl.find containers waiting_cid in
194
Hashtbl.remove ht sem_name;
200
method ctrl_container_finished ctrl cid _ =
202
let ht = Hashtbl.find containers cid in (* or Not_found *)
205
(fun sem_name value ->
206
(*Netlog.logf `Debug "semaphore shutdown name=%s d=%Ld"
209
let (sem, _, waiting) = self # get_sem_tuple ctrl sem_name in
210
let zero_flag = (!sem = 0L) in
211
sem := Int64.sub !sem !value;
212
if !sem < 0L then sem := 0L;
213
if zero_flag && !sem > 0L then
214
sems := sem_name :: !sems
219
let (sem, protected, waiting) =
220
self # get_sem_tuple ctrl sem_name in
222
while not(Queue.is_empty waiting) && !v > 0L do
223
let (waiting_reply,waiting_cid) = Queue.take waiting in
224
let waiting_cont_sem =
225
self # get_cont_sem waiting_cid sem_name protected in
226
self#really_decrement sem waiting_cont_sem protected;
232
Hashtbl.remove containers cid
239
let plugin = (plugin_i :> plugin)
242
(* Release memory after [fork]: *)
243
Netsys_posix.register_post_fork_handler
245
method name = "Netplex_semaphore"
246
method run () = !release()
251
let increment sem_name =
252
let cont = Netplex_cenv.self_cont() in
253
Netplex_ctrl_aux._to_Semaphore'V1'increment'res
254
(cont # call_plugin plugin "increment"
255
(Netplex_ctrl_aux._of_Semaphore'V1'increment'arg sem_name))
258
let decrement ?(wait=false) sem_name =
259
let cont = Netplex_cenv.self_cont() in
260
Netplex_ctrl_aux._to_Semaphore'V1'decrement'res
261
(cont # call_plugin plugin "decrement"
262
(Netplex_ctrl_aux._of_Semaphore'V1'decrement'arg (sem_name, wait)))
267
match Netplex_cenv.self_obj() with
269
let cont = Netplex_cenv.self_cont() in
270
Netplex_ctrl_aux._to_Semaphore'V1'get'res
271
(cont # call_plugin plugin "get"
272
(Netplex_ctrl_aux._of_Semaphore'V1'get'arg sem_name))
273
| `Controller ctrl ->
274
plugin_i # get_sem ctrl sem_name
277
raise Netplex_cenv.Not_in_container_thread
280
let create ?(protected=false) sem_name init_val =
282
match Netplex_cenv.self_obj() with
284
Netplex_ctrl_aux._to_Semaphore'V1'create'res
285
(cont # call_plugin plugin "create"
286
(Netplex_ctrl_aux._of_Semaphore'V1'create'arg
287
(sem_name, init_val, protected)))
288
| `Controller ctrl ->
289
plugin_i # create_sem ctrl sem_name init_val protected
292
raise Netplex_cenv.Not_in_container_thread
295
let destroy sem_name =
297
match Netplex_cenv.self_obj() with
299
Netplex_ctrl_aux._to_Semaphore'V1'destroy'res
300
(cont # call_plugin plugin "destroy"
301
(Netplex_ctrl_aux._of_Semaphore'V1'destroy'arg sem_name))
302
| `Controller ctrl ->
303
plugin_i # destroy_sem ctrl sem_name
306
raise Netplex_cenv.Not_in_container_thread
309
let ctrl_increment sem_name cid =
311
match Netplex_cenv.self_obj() with
313
failwith "Netplex_semaphore.ctrl_increment: not in controller context"
314
| `Controller ctrl ->
315
plugin_i # increment ctrl cid sem_name
318
raise Netplex_cenv.Not_in_container_thread