~ubuntu-branches/ubuntu/trusty/ocamlnet/trusty

« back to all changes in this revision

Viewing changes to examples/rpc/sort/sort1_worker.ml

  • Committer: Bazaar Package Importer
  • Author(s): Stéphane Glondu
  • Date: 2011-09-02 14:12:33 UTC
  • mfrom: (18.2.3 sid)
  • Revision ID: james.westby@ubuntu.com-20110902141233-zbj0ygxb92u6gy4z
Tags: 3.4-1
* New upstream release
  - add a new NetcgiRequire directive to ease dependency management
    (Closes: #637147)
  - remove patches that were applied upstream:
    + Added-missing-shebang-lines-in-example-shell-scripts
    + Try-also-ocamlc-for-POSIX-threads

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
(* Implementation of worker processes *)
 
2
 
 
3
open Sort1_proto_aux
 
4
 
 
5
let kept_data = Hashtbl.create 10
 
6
  (* Maps partition ID to sortdata *)
 
7
 
 
8
let delayed_actions = ref []
 
9
  (* List of tuples (part1_id, part2_id, f): When both partitions are in
 
10
     [kept_data], run f
 
11
   *)
 
12
 
 
13
let worker_clients = Hashtbl.create 10
 
14
  (* Maps endpoint names to RPC clients *)
 
15
 
 
16
 
 
17
let get_worker_client endpoint =
 
18
  try
 
19
    Hashtbl.find worker_clients endpoint
 
20
  with
 
21
    | Not_found ->
 
22
        let esys = (Netplex_cenv.self_cont())#event_system in
 
23
        let connector =
 
24
          Netplex_sockserv.any_file_client_connector endpoint in
 
25
        let client =
 
26
          Sort1_proto_clnt.Worker.V1.create_client2
 
27
            ~esys
 
28
            (`Socket(Rpc.Tcp, connector, Rpc_client.default_socket_config)) in
 
29
        Hashtbl.replace worker_clients endpoint client;
 
30
        client
 
31
 
 
32
 
 
33
let controller_clients = Hashtbl.create 10
 
34
  (* Maps endpoint names to RPC clients. Should only be one *)
 
35
 
 
36
let get_controller_client endpoint =
 
37
    try
 
38
    Hashtbl.find controller_clients endpoint
 
39
  with
 
40
    | Not_found ->
 
41
        let esys = (Netplex_cenv.self_cont())#event_system in
 
42
        let connector =
 
43
          Netplex_sockserv.any_file_client_connector endpoint in
 
44
        let client =
 
45
          Sort1_proto_clnt.Controller.V1.create_client2
 
46
            ~esys
 
47
            (`Socket(Rpc.Tcp, connector, Rpc_client.default_socket_config)) in
 
48
        Hashtbl.replace controller_clients endpoint client;
 
49
        client
 
50
 
 
51
 
 
52
let check_for_errors name get_reply =
 
53
  try
 
54
    let () = get_reply() in
 
55
    ()
 
56
  with
 
57
    | error ->
 
58
        Netplex_cenv.logf `Err
 
59
          "Got exception when calling %s: %s"
 
60
          name
 
61
          (Netexn.to_string error)
 
62
 
 
63
 
 
64
let check_for_delayed_actions() =
 
65
  let new_delayed_actions = ref [] in
 
66
  List.iter
 
67
    (fun (p1,p2,f) ->
 
68
       if Hashtbl.mem kept_data p1 && Hashtbl.mem kept_data p2 then (
 
69
         (* We run this within "once" so exceptions can be easily handled *)
 
70
         let esys = (Netplex_cenv.self_cont()) # event_system in
 
71
         let g = Unixqueue.new_group esys in
 
72
         Unixqueue.once esys g 0.0 f
 
73
       )
 
74
       else
 
75
         new_delayed_actions := (p1,p2,f) :: !new_delayed_actions
 
76
    )
 
77
    !delayed_actions;
 
78
  delayed_actions := List.rev !new_delayed_actions
 
79
 
 
80
 
 
81
let execute_op part_id data cont =
 
82
  match cont with
 
83
    | `keep ->
 
84
        Hashtbl.replace kept_data part_id data;
 
85
        check_for_delayed_actions()
 
86
 
 
87
    | `forward fwd ->
 
88
        let client = get_worker_client fwd.destination in
 
89
        Sort1_proto_clnt.Worker.V1.merge_partition'async
 
90
          client
 
91
          (fwd.merge_with_partition_id,
 
92
           part_id,
 
93
           data,
 
94
           fwd.new_partition_id,
 
95
           fwd.continuation)
 
96
          (check_for_errors "merge_partition")
 
97
 
 
98
    | `return ep ->
 
99
        let client = get_controller_client ep in
 
100
        Sort1_proto_clnt.Controller.V1.return_result'async
 
101
          client
 
102
          data
 
103
          (check_for_errors "return_result")
 
104
 
 
105
 
 
106
let proc_sort_partition session (part_id, data, cont) emit =
 
107
  Array.sort String.compare data;
 
108
  emit ();
 
109
  execute_op part_id data cont
 
110
 
 
111
 
 
112
let merge part1_id part2_id partr_id cont () =
 
113
  let data1 =
 
114
    try Hashtbl.find kept_data part1_id
 
115
    with Not_found -> assert false in
 
116
  let data2 =
 
117
    try Hashtbl.find kept_data part2_id
 
118
    with Not_found -> assert false in
 
119
  Hashtbl.remove kept_data part1_id;
 
120
  Hashtbl.remove kept_data part2_id;
 
121
  Gc.major();
 
122
  let l1 = Array.length data1 in
 
123
  let l2 = Array.length data2 in
 
124
  let datar = Array.create (l1 + l2) "" in
 
125
  let k1 = ref 0 in
 
126
  let k2 = ref 0 in
 
127
  let kr = ref 0 in
 
128
  while !k1 < l1 && !k2 < l2 do
 
129
    if data1.( !k1 ) < data2.( !k2 ) then (
 
130
      datar.( !kr ) <- data1.( !k1 );
 
131
      incr k1;
 
132
      incr kr
 
133
    )
 
134
    else (
 
135
      datar.( !kr ) <- data2.( !k2 );
 
136
      incr k2;
 
137
      incr kr
 
138
    )
 
139
  done;
 
140
  if !k1 < l1 then
 
141
    Array.blit data1 !k1 datar !kr (l1 - !k1);
 
142
  if !k2 < l2 then
 
143
    Array.blit data2 !k2 datar !kr (l2 - !k2);
 
144
  execute_op partr_id datar cont
 
145
 
 
146
 
 
147
let proc_merge_partition session
 
148
                         (part1_id, part2_id, data2, partr_id, cont)
 
149
                         emit =
 
150
  Hashtbl.replace kept_data part2_id data2;
 
151
  delayed_actions := 
 
152
    (part1_id, part2_id, merge part1_id part2_id partr_id cont) :: 
 
153
      !delayed_actions;
 
154
  emit ();
 
155
  check_for_delayed_actions()
 
156
 
 
157
 
 
158
let configure cf addr = ()
 
159
 
 
160
let setup srv () =
 
161
  Sort1_proto_srv.Worker.V1.bind_async
 
162
    ~proc_null:(fun _ _ emit -> emit ())
 
163
    ~proc_sort_partition
 
164
    ~proc_merge_partition
 
165
    srv
 
166
 
 
167
let worker_factory() =
 
168
  Rpc_netplex.rpc_factory
 
169
    ~name:"sort_worker"
 
170
    ~configure
 
171
    ~setup
 
172
    ~hooks:(fun _ ->
 
173
              object(self)
 
174
                inherit Netplex_kit.empty_processor_hooks() 
 
175
                method post_start_hook _ =
 
176
                  let _t =
 
177
                    Netplex_cenv.create_timer
 
178
                      (fun _ -> 
 
179
                         Gc.major();
 
180
                         true
 
181
                      )
 
182
                      1.0 in
 
183
                  ()
 
184
              end
 
185
           )
 
186
    ()
 
187