~ubuntu-branches/ubuntu/trusty/ocamlnet/trusty

« back to all changes in this revision

Viewing changes to src/netplex/netplex_semaphore.ml

  • Committer: Bazaar Package Importer
  • Author(s): Stéphane Glondu
  • Date: 2011-09-02 14:12:33 UTC
  • mfrom: (18.2.3 sid)
  • Revision ID: james.westby@ubuntu.com-20110902141233-zbj0ygxb92u6gy4z
Tags: 3.4-1
* New upstream release
  - add a new NetcgiRequire directive to ease dependency management
    (Closes: #637147)
  - remove patches that were applied upstream:
    + Added-missing-shebang-lines-in-example-shell-scripts
    + Try-also-ocamlc-for-POSIX-threads

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
(* $Id: netplex_semaphore.ml 1569 2011-04-05 16:08:52Z gerd $ *)
 
2
 
 
3
(* POSIX semaphores: A better implementation would use POSIX semaphores.
 
4
   For each Netplex semaphore <name> we define
 
5
   - One POSIX semaphore <prefix>_<name>
 
6
   - One POSIX semaphore <prefix>_<name>_isprotected (only for
 
7
     storing the protected attribute)
 
8
   - If the semaphore is protected, another set of counters
 
9
     <prefix>_<name>_<container> for every container. These cannot be
 
10
     semaphores, because negative values are possible. A shared memory
 
11
     segment is possible (memory cells are managed by the controller,
 
12
     and RPC calls are used for this).
 
13
 
 
14
   The Netplex operations map nicely to POSIX operations:
 
15
   - Netplex create: sem_open with O_CREAT and O_EXCL
 
16
   - Netplex increment: sem_post. If the semaphore is protected, the
 
17
     container-specific counter is also incremented (and created with
 
18
     value 0 if not existing).
 
19
   - Netplex decrement w/o wait: sem_trywait. If the decrement is successful,
 
20
     the container-specific counter is also decremented.
 
21
   - Netplex decrement with wait: sem_wait. If the decrement is successful,
 
22
     the container-specific counter is also decremented.
 
23
 
 
24
   If the container crashes, the controller looks at the container-specific
 
25
   counter, and calls sem_post or sem_trywait as often as the counter says:
 
26
   sem_post for negative values, and sem_trywait for positive values.
 
27
 
 
28
 *)
 
29
 
 
30
 
 
31
open Netplex_types
 
32
 
 
33
let int64_incr v =
 
34
  v := Int64.succ !v
 
35
 
 
36
let int64_decr v =
 
37
  v := Int64.pred !v
 
38
 
 
39
 
 
40
let release = ref (fun () -> ())
 
41
 
 
42
 
 
43
let plugin_i =
 
44
  ( object(self)
 
45
      val mutable semaphores = Hashtbl.create 50
 
46
      val mutable containers = Hashtbl.create 50
 
47
 
 
48
      initializer (
 
49
        release :=
 
50
          (fun () -> 
 
51
             semaphores <- Hashtbl.create 1;
 
52
             containers <- Hashtbl.create 1
 
53
          )
 
54
      )
 
55
 
 
56
      method program = 
 
57
        Netplex_ctrl_aux.program_Semaphore'V1
 
58
 
 
59
      method ctrl_added _ =
 
60
        ()
 
61
 
 
62
      method ctrl_unplugged ctrl =
 
63
        List.iter
 
64
          (fun cid ->
 
65
             self # ctrl_container_finished ctrl cid false
 
66
          )
 
67
          ctrl#containers
 
68
 
 
69
      method ctrl_receive_call ctrl cid procname procarg_val reply =
 
70
        match procname with
 
71
          | "ping" ->
 
72
              reply(Some(Netplex_ctrl_aux._of_Semaphore'V1'ping'res ()))
 
73
                
 
74
          | "increment" ->
 
75
              let sem_name = 
 
76
                Netplex_ctrl_aux._to_Semaphore'V1'increment'arg procarg_val in
 
77
              let r = 
 
78
                self # increment ctrl cid sem_name in
 
79
              reply(Some(Netplex_ctrl_aux._of_Semaphore'V1'increment'res r))
 
80
                    
 
81
          | "decrement" ->
 
82
              let proc_reply v =
 
83
                let v' = 
 
84
                  Netplex_ctrl_aux._of_Semaphore'V1'decrement'res v in
 
85
                reply(Some v') in
 
86
              let (sem_name, wait_flag) =
 
87
                Netplex_ctrl_aux._to_Semaphore'V1'decrement'arg procarg_val in
 
88
              self # decrement_async ctrl cid sem_name wait_flag proc_reply
 
89
 
 
90
          | "get" ->
 
91
              let sem_name =
 
92
                Netplex_ctrl_aux._to_Semaphore'V1'get'arg procarg_val in
 
93
              let (sem, _, _) = self # get_sem_tuple ctrl sem_name in
 
94
              reply(Some(Netplex_ctrl_aux._of_Semaphore'V1'get'res !sem))
 
95
 
 
96
          | "create" ->
 
97
              let (sem_name, init_val, protected) = 
 
98
                Netplex_ctrl_aux._to_Semaphore'V1'create'arg procarg_val in
 
99
              let r =
 
100
                snd(self # get_or_create_sem
 
101
                      ctrl sem_name init_val protected) in
 
102
              reply(Some(Netplex_ctrl_aux._of_Semaphore'V1'create'res r))
 
103
 
 
104
          | "destroy" ->
 
105
              let sem_name =
 
106
                Netplex_ctrl_aux._to_Semaphore'V1'destroy'arg procarg_val in
 
107
              self # destroy_sem ctrl sem_name;
 
108
              reply(Some(Netplex_ctrl_aux._of_Semaphore'V1'destroy'res ()))
 
109
                    
 
110
          | _ ->
 
111
              failwith "Unknown procedure"
 
112
 
 
113
      method increment ctrl cid sem_name =
 
114
        let (sem, protected, waiting) = self # get_sem_tuple ctrl sem_name in
 
115
        let cont_sem = self # get_cont_sem cid sem_name protected in
 
116
        int64_incr sem;
 
117
        int64_incr cont_sem;
 
118
        let semval = !sem in
 
119
        if !sem = 1L then (
 
120
          if not (Queue.is_empty waiting) then (
 
121
            let (waiting_reply, waiting_cid) = Queue.take waiting in
 
122
            let waiting_cont_sem = 
 
123
              self # get_cont_sem waiting_cid sem_name protected in
 
124
            self#really_decrement sem waiting_cont_sem protected;
 
125
            waiting_reply 0L
 
126
          )
 
127
        );
 
128
        semval
 
129
 
 
130
      method private decrement_async ctrl cid sem_name wait_flag reply =
 
131
        let (sem, protected, waiting) = self # get_sem_tuple ctrl sem_name in
 
132
        let cont_sem = self # get_cont_sem cid sem_name protected in
 
133
        if !sem > 0L then (
 
134
          self#really_decrement sem cont_sem protected;
 
135
          reply !sem
 
136
        )
 
137
        else (
 
138
          if wait_flag then
 
139
            Queue.push (reply,cid) waiting
 
140
          else 
 
141
            reply (-1L)
 
142
        )
 
143
 
 
144
      method private really_decrement sem cont_sem protected =
 
145
        assert(!sem > 0L);
 
146
        int64_decr sem;
 
147
        if protected then
 
148
          int64_decr cont_sem
 
149
 
 
150
      method private get_or_create_sem ctrl sem_name init_val protected =
 
151
        try 
 
152
          (Hashtbl.find semaphores (ctrl, sem_name), false)
 
153
        with Not_found -> 
 
154
          let waiting = Queue.create() in
 
155
          let new_sem = (ref init_val, protected, waiting) in
 
156
          Hashtbl.add semaphores (ctrl, sem_name) new_sem;
 
157
          (new_sem, true)
 
158
 
 
159
      method create_sem ctrl sem_name init_val protected =
 
160
        snd(self # get_or_create_sem ctrl sem_name init_val protected)
 
161
 
 
162
      method get_sem_tuple ctrl sem_name =
 
163
        fst(self # get_or_create_sem ctrl sem_name 0L true)
 
164
 
 
165
      method get_sem ctrl sem_name =
 
166
        let ((value,_,_),_) = self # get_or_create_sem ctrl sem_name 0L true in
 
167
        !value
 
168
 
 
169
      method private get_cont_sem cid sem_name protected =
 
170
        if protected then (
 
171
          let ht =
 
172
            try Hashtbl.find containers cid
 
173
            with Not_found -> 
 
174
              let new_ht = Hashtbl.create 1 in
 
175
              Hashtbl.add containers cid new_ht;
 
176
              new_ht in
 
177
          try
 
178
            Hashtbl.find ht sem_name
 
179
          with Not_found ->
 
180
            let new_sem = ref 0L in
 
181
            Hashtbl.add ht sem_name new_sem;
 
182
            new_sem
 
183
        )
 
184
        else (ref 0L)
 
185
 
 
186
      method destroy_sem ctrl sem_name =
 
187
        try
 
188
          let (_,_,waiting) = Hashtbl.find semaphores (ctrl, sem_name) in
 
189
          let q = Queue.create() in
 
190
          Queue.transfer waiting q;
 
191
          Queue.iter
 
192
            (fun (waiting_reply, waiting_cid) ->
 
193
               let ht = Hashtbl.find containers waiting_cid in
 
194
               Hashtbl.remove ht sem_name;
 
195
               waiting_reply (-1L)
 
196
            )
 
197
            q
 
198
        with Not_found -> ()
 
199
 
 
200
      method ctrl_container_finished ctrl cid _ =
 
201
        try
 
202
          let ht = Hashtbl.find containers cid in  (* or Not_found *)
 
203
          let sems = ref [] in
 
204
          Hashtbl.iter
 
205
            (fun sem_name value ->
 
206
               (*Netlog.logf `Debug "semaphore shutdown name=%s d=%Ld"
 
207
                 sem_name !value;
 
208
                *)
 
209
               let (sem, _, waiting) = self # get_sem_tuple ctrl sem_name in
 
210
               let zero_flag = (!sem = 0L) in
 
211
               sem := Int64.sub !sem !value;
 
212
               if !sem < 0L then sem := 0L;
 
213
               if zero_flag && !sem > 0L then
 
214
                 sems := sem_name :: !sems
 
215
            )
 
216
            ht;
 
217
          List.iter
 
218
            (fun sem_name ->
 
219
               let (sem, protected, waiting) = 
 
220
                 self # get_sem_tuple ctrl sem_name in
 
221
               let v = ref !sem in
 
222
               while not(Queue.is_empty waiting) && !v > 0L do
 
223
                 let (waiting_reply,waiting_cid) = Queue.take waiting in
 
224
                 let waiting_cont_sem =
 
225
                   self # get_cont_sem waiting_cid sem_name protected in
 
226
                 self#really_decrement sem waiting_cont_sem protected;
 
227
                 waiting_reply 0L;
 
228
                 int64_decr v
 
229
               done
 
230
            )
 
231
            !sems;
 
232
          Hashtbl.remove containers cid
 
233
        with
 
234
          | Not_found -> ()
 
235
          
 
236
     end
 
237
  )
 
238
 
 
239
let plugin = (plugin_i :> plugin)
 
240
 
 
241
let () =
 
242
  (* Release memory after [fork]: *)
 
243
  Netsys_posix.register_post_fork_handler
 
244
    (object
 
245
       method name = "Netplex_semaphore"
 
246
       method run () = !release()
 
247
     end
 
248
    )
 
249
 
 
250
 
 
251
let increment sem_name =
 
252
  let cont = Netplex_cenv.self_cont() in
 
253
  Netplex_ctrl_aux._to_Semaphore'V1'increment'res
 
254
    (cont # call_plugin plugin "increment" 
 
255
       (Netplex_ctrl_aux._of_Semaphore'V1'increment'arg sem_name))
 
256
 
 
257
 
 
258
let decrement ?(wait=false) sem_name =
 
259
  let cont = Netplex_cenv.self_cont() in
 
260
  Netplex_ctrl_aux._to_Semaphore'V1'decrement'res
 
261
    (cont # call_plugin plugin "decrement" 
 
262
       (Netplex_ctrl_aux._of_Semaphore'V1'decrement'arg (sem_name, wait)))
 
263
 
 
264
 
 
265
let get sem_name =
 
266
  try
 
267
    match Netplex_cenv.self_obj() with
 
268
      | `Container cont ->
 
269
          let cont = Netplex_cenv.self_cont() in
 
270
          Netplex_ctrl_aux._to_Semaphore'V1'get'res
 
271
            (cont # call_plugin plugin "get" 
 
272
               (Netplex_ctrl_aux._of_Semaphore'V1'get'arg sem_name))
 
273
      | `Controller ctrl ->
 
274
          plugin_i # get_sem ctrl sem_name
 
275
  with
 
276
    | Not_found ->
 
277
        raise Netplex_cenv.Not_in_container_thread
 
278
 
 
279
 
 
280
let create ?(protected=false) sem_name init_val =
 
281
  try
 
282
    match Netplex_cenv.self_obj() with
 
283
      | `Container cont ->
 
284
          Netplex_ctrl_aux._to_Semaphore'V1'create'res
 
285
            (cont # call_plugin plugin "create" 
 
286
               (Netplex_ctrl_aux._of_Semaphore'V1'create'arg 
 
287
                  (sem_name, init_val, protected)))
 
288
      | `Controller ctrl ->
 
289
          plugin_i # create_sem ctrl sem_name init_val protected
 
290
  with
 
291
    | Not_found ->
 
292
        raise Netplex_cenv.Not_in_container_thread
 
293
 
 
294
 
 
295
let destroy sem_name =
 
296
  try
 
297
    match Netplex_cenv.self_obj() with
 
298
      | `Container cont ->
 
299
          Netplex_ctrl_aux._to_Semaphore'V1'destroy'res
 
300
            (cont # call_plugin plugin "destroy" 
 
301
               (Netplex_ctrl_aux._of_Semaphore'V1'destroy'arg sem_name))
 
302
      | `Controller ctrl ->
 
303
          plugin_i # destroy_sem ctrl sem_name
 
304
  with
 
305
    | Not_found ->
 
306
        raise Netplex_cenv.Not_in_container_thread
 
307
 
 
308
 
 
309
let ctrl_increment sem_name cid =
 
310
  try
 
311
    match Netplex_cenv.self_obj() with
 
312
      | `Container cont ->
 
313
          failwith "Netplex_semaphore.ctrl_increment: not in controller context"
 
314
      | `Controller ctrl ->
 
315
          plugin_i # increment ctrl cid sem_name
 
316
  with
 
317
    | Not_found ->
 
318
        raise Netplex_cenv.Not_in_container_thread