~ubuntu-branches/ubuntu/trusty/ocamlnet/trusty

« back to all changes in this revision

Viewing changes to src/netplex/netplex_workload.ml

  • Committer: Bazaar Package Importer
  • Author(s): Stéphane Glondu
  • Date: 2011-09-02 14:12:33 UTC
  • mfrom: (18.2.3 sid)
  • Revision ID: james.westby@ubuntu.com-20110902141233-zbj0ygxb92u6gy4z
Tags: 3.4-1
* New upstream release
  - add a new NetcgiRequire directive to ease dependency management
    (Closes: #637147)
  - remove patches that were applied upstream:
    + Added-missing-shebang-lines-in-example-shell-scripts
    + Try-also-ocamlc-for-POSIX-threads

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
(* $Id: netplex_workload.ml 1054 2006-12-17 16:35:06Z gerd $ *)
 
1
(* $Id: netplex_workload.ml 1415 2010-02-15 23:58:25Z gerd $ *)
2
2
 
3
3
open Netplex_types
4
 
 
5
 
let debug_log logger_lz s =
6
 
  (Lazy.force logger_lz) # log 
7
 
    ~component:"netplex.controller"
8
 
    ~level:`Debug
9
 
    ~message:s
10
 
 
11
 
let debug_logf logger_lz msgf =
12
 
  Printf.kprintf (debug_log logger_lz) msgf
13
 
 
14
 
 
15
 
 
16
 
class constant_workload_manager num_threads : workload_manager =
 
4
open Printf
 
5
 
 
6
module Debug = struct
 
7
  let enable = ref false
 
8
end
 
9
 
 
10
let dlog = Netlog.Debug.mk_dlog "Netplex_workload" Debug.enable
 
11
let dlogr = Netlog.Debug.mk_dlogr "Netplex_workload" Debug.enable
 
12
 
 
13
let () =
 
14
  Netlog.Debug.register_module "Netplex_workload" Debug.enable
 
15
 
 
16
class constant_workload_manager ?(restart=true) num_threads : workload_manager =
17
17
object(self)
 
18
  val mutable allow_adjust = true
 
19
 
18
20
  method hello controller =
19
21
    ()
20
22
      (* TODO: Announce the availability of admin messages *)
26
28
  method adjust sockserv sockctrl =
27
29
    match sockctrl # state with
28
30
      | `Enabled ->
29
 
          let l = sockctrl # container_state in
30
 
          let n = List.length l in
31
 
          if n < num_threads then (
32
 
            sockctrl # start_containers (num_threads - n)
 
31
          if allow_adjust then (
 
32
            let l = sockctrl # container_state in
 
33
            let n = List.length l in
 
34
            if n < num_threads then (
 
35
              let _n_started =
 
36
                sockctrl # start_containers (num_threads - n) in
 
37
              (* If less containers could be started, we ignore the problem.
 
38
                 [adjust] will be called again, and the problem will be fixed.
 
39
                 Hopefully... We cannot do much more here.
 
40
               *)
 
41
              ()
 
42
            );
 
43
            if not restart then allow_adjust <- false
33
44
          )
 
45
      | `Disabled ->
 
46
          allow_adjust <- true
34
47
      | _ ->
35
48
          ()
36
49
 
44
57
end
45
58
 
46
59
 
47
 
let create_constant_workload_manager n =
48
 
  new constant_workload_manager n
 
60
let create_constant_workload_manager =
 
61
  new constant_workload_manager
49
62
 
50
63
 
51
64
let constant_workload_manager_factory =
104
117
      | `Busy -> max_int
105
118
      | `Shutting_down -> max_int in 
106
119
  List.sort
107
 
    (fun (cid1,s1,_) (cid2,s2,_) ->
 
120
    (fun (cid1,_,s1,_) (cid2,_,s2,_) ->
108
121
       let n1 = weight s1 in
109
122
       let n2 = weight s2 in
110
123
       n1 - n2)
158
171
          let active_threads =
159
172
            List.length
160
173
              (List.filter 
161
 
                 (fun (cid,s,_) -> 
 
174
                 (fun (cid,_,s,_) -> 
162
175
                    not (ContMap.mem cid inactivated_conts) 
163
176
                    && s <> `Shutting_down) 
164
177
                 container_state) in
168
181
          (* Determine used capacity: *)
169
182
          let used_cap =
170
183
            List.fold_left
171
 
              (fun acc (cid,s,_) ->
 
184
              (fun acc (cid,_,s,_) ->
172
185
                 if ContMap.mem cid inactivated_conts then
173
186
                   acc
174
187
                 else
184
197
          (* Free capacity: *)
185
198
          let free_cap = total_cap - used_cap in
186
199
 
187
 
          if !Netplex_log.debug_scheduling then (
188
 
            debug_logf logger
189
 
              "Dyn workload mng %s: total_threads=%d avail_threads=%d total_cap=%d used_cap=%d"
190
 
              sockserv#name all_threads active_threads
191
 
              total_cap used_cap
192
 
          );
 
200
          dlogr
 
201
            (fun () ->
 
202
               sprintf
 
203
                 "Service %s: \
 
204
                  total_threads=%d avail_threads=%d total_cap=%d used_cap=%d"
 
205
                 sockserv#name all_threads active_threads
 
206
                 total_cap used_cap
 
207
            );
193
208
 
194
209
          (* Now decide... *)
195
210
          if free_cap < config#min_free_job_capacity then (
209
224
                  containers_by_attractivity container_state in
210
225
                let n = ref 0 in
211
226
                List.iter
212
 
                  (fun (cid,s,selected) ->
 
227
                  (fun (cid,_,s,selected) ->
213
228
                     let already_inactivated =
214
229
                       ContMap.mem cid inactivated_conts in
215
230
                     if !n < exceeding_threads && not already_inactivated 
249
264
      containers_by_attractivity container_state in
250
265
    let l = ref [] in
251
266
    List.iter
252
 
      (fun (cid, s, selected) ->
 
267
      (fun (cid, _, s, selected) ->
253
268
         try
254
269
           if !n <= 0 then raise Not_found;
255
270
           let g_opt = ContMap.find cid inactivated_conts in
283
298
      (fun cid ->
284
299
         inactivated_conts <- ContMap.remove cid inactivated_conts)
285
300
      !l;
286
 
    if !Netplex_log.debug_scheduling && !l <> [] then (
287
 
      debug_logf logger
288
 
        "Dyn workload mng %s: Reclaiming %d inactivated containers"
289
 
        sockserv#name (List.length !l)
290
 
    );
 
301
    if !l <> [] then
 
302
      dlogr
 
303
        (fun () ->
 
304
           sprintf
 
305
             "Service %s: Reclaiming %d inactivated containers"
 
306
             sockserv#name (List.length !l));
291
307
    (* Second pass: If needed, start further containers: *)
292
308
    let started_ocap = ref 0 in
293
309
    if !n > 0 then (
295
311
        (!n-1) / config#recommended_jobs_per_thread + 1 in
296
312
      let needed_threads' =
297
313
        min (max 0 (config#max_threads - all_threads)) needed_threads in
298
 
      sockctrl # start_containers needed_threads';
299
 
      let cap = needed_threads' * config#recommended_jobs_per_thread in
300
 
      let ocap = needed_threads' * config#max_jobs_per_thread in
 
314
      let started_threads = sockctrl # start_containers needed_threads' in
 
315
      (* If started_threads < needed_threads', we ignore the problem. *)
 
316
      let cap = started_threads * config#recommended_jobs_per_thread in
 
317
      let ocap = started_threads * config#max_jobs_per_thread in
301
318
      n := !n - cap;
302
319
      n_overload := !n_overload - ocap;
303
320
      started_ocap := !started_ocap + ocap;
309
326
       *)
310
327
      let l = ref [] in
311
328
      List.iter
312
 
        (fun (cid, s, selected) ->
 
329
        (fun (cid, _, s, selected) ->
313
330
           try
314
331
             if !n_overload <= 0 then raise Not_found;
315
332
             let g_opt = ContMap.find cid inactivated_conts in
338
355
        (fun cid ->
339
356
           inactivated_conts <- ContMap.remove cid inactivated_conts)
340
357
        !l;
341
 
      if !Netplex_log.debug_scheduling && !l <> [] then (
342
 
        debug_logf logger
343
 
          "Dyn workload mng %s: Reclaiming %d inactivated but overloaded containers"
344
 
          sockserv#name (List.length !l)
345
 
      );
 
358
      if !l <> [] then
 
359
        dlogr
 
360
          (fun () ->
 
361
             sprintf "Service %s: \
 
362
                     Reclaiming %d inactivated but overloaded containers"
 
363
               sockserv#name (List.length !l)
 
364
          );
346
365
    );
347
366
    (* Check whether we reach the capacity limit. *)
348
367
    let limit_reached =
353
372
         *)
354
373
        let avail_cap = ref !started_ocap in
355
374
        List.iter
356
 
          (fun (_, s, _) ->
 
375
          (fun (_, _, s, _) ->
357
376
             match s with
358
377
               | `Accepting(m,_) ->
359
378
                   let od = max 0 (config#max_jobs_per_thread - m) in
390
409
    inactivated_conts <- ContMap.add cid None inactivated_conts;
391
410
    limit_alert <- false;
392
411
 
393
 
    if !Netplex_log.debug_scheduling then (
394
 
      debug_logf logger
395
 
        "Dyn workload mng %s: Inactivating 1 container"
396
 
        sockserv#name
397
 
    );
 
412
    dlogr
 
413
      (fun () ->
 
414
         sprintf "Service %s: Inactivating 1 container"
 
415
           sockserv#name
 
416
      )
398
417
 
399
418
 
400
419
  method private inactivation_check sockserv sockctrl =
404
423
     *)
405
424
    let container_state = sockctrl # container_state in
406
425
    List.iter
407
 
      (fun (cid, s, selected) ->
 
426
      (fun (cid, _, s, selected) ->
408
427
         try
409
428
           let g_opt = ContMap.find cid inactivated_conts in
410
429
           assert(not selected);
415
434
            *)
416
435
           match (g_opt, s) with
417
436
             | None, `Accepting(0,_) ->
418
 
                 if !Netplex_log.debug_scheduling then (
419
 
                   debug_logf logger
420
 
                     "Dyn workload mng %s: Inactivated container becomes idle"
421
 
                     sockserv#name
422
 
                 );
 
437
                 dlogr
 
438
                   (fun () ->
 
439
                      sprintf "Service %s: Inactivated container becomes idle"
 
440
                        sockserv#name
 
441
                   );
423
442
                 let esys = Lazy.force esys in
424
443
                 let g = Unixqueue.new_group esys in
425
444
                 Unixqueue.once