1
(* Implementation of worker processes *)
5
let kept_data = Hashtbl.create 10
6
(* Maps partition ID to sortdata *)
8
let delayed_actions = ref []
9
(* List of tuples (part1_id, part2_id, f): When both partitions are in
13
let worker_clients = Hashtbl.create 10
14
(* Maps endpoint names to RPC clients *)
17
let get_worker_client endpoint =
19
Hashtbl.find worker_clients endpoint
22
let esys = (Netplex_cenv.self_cont())#event_system in
24
Netplex_sockserv.any_file_client_connector endpoint in
26
Sort1_proto_clnt.Worker.V1.create_client2
28
(`Socket(Rpc.Tcp, connector, Rpc_client.default_socket_config)) in
29
Hashtbl.replace worker_clients endpoint client;
33
let controller_clients = Hashtbl.create 10
34
(* Maps endpoint names to RPC clients. Should only be one *)
36
let get_controller_client endpoint =
38
Hashtbl.find controller_clients endpoint
41
let esys = (Netplex_cenv.self_cont())#event_system in
43
Netplex_sockserv.any_file_client_connector endpoint in
45
Sort1_proto_clnt.Controller.V1.create_client2
47
(`Socket(Rpc.Tcp, connector, Rpc_client.default_socket_config)) in
48
Hashtbl.replace controller_clients endpoint client;
52
let check_for_errors name get_reply =
54
let () = get_reply() in
58
Netplex_cenv.logf `Err
59
"Got exception when calling %s: %s"
61
(Netexn.to_string error)
64
let check_for_delayed_actions() =
65
let new_delayed_actions = ref [] in
68
if Hashtbl.mem kept_data p1 && Hashtbl.mem kept_data p2 then (
69
(* We run this within "once" so exceptions can be easily handled *)
70
let esys = (Netplex_cenv.self_cont()) # event_system in
71
let g = Unixqueue.new_group esys in
72
Unixqueue.once esys g 0.0 f
75
new_delayed_actions := (p1,p2,f) :: !new_delayed_actions
78
delayed_actions := List.rev !new_delayed_actions
81
let execute_op part_id data cont =
84
Hashtbl.replace kept_data part_id data;
85
check_for_delayed_actions()
88
let client = get_worker_client fwd.destination in
89
Sort1_proto_clnt.Worker.V1.merge_partition'async
91
(fwd.merge_with_partition_id,
96
(check_for_errors "merge_partition")
99
let client = get_controller_client ep in
100
Sort1_proto_clnt.Controller.V1.return_result'async
103
(check_for_errors "return_result")
106
let proc_sort_partition session (part_id, data, cont) emit =
107
Array.sort String.compare data;
109
execute_op part_id data cont
112
let merge part1_id part2_id partr_id cont () =
114
try Hashtbl.find kept_data part1_id
115
with Not_found -> assert false in
117
try Hashtbl.find kept_data part2_id
118
with Not_found -> assert false in
119
Hashtbl.remove kept_data part1_id;
120
Hashtbl.remove kept_data part2_id;
122
let l1 = Array.length data1 in
123
let l2 = Array.length data2 in
124
let datar = Array.create (l1 + l2) "" in
128
while !k1 < l1 && !k2 < l2 do
129
if data1.( !k1 ) < data2.( !k2 ) then (
130
datar.( !kr ) <- data1.( !k1 );
135
datar.( !kr ) <- data2.( !k2 );
141
Array.blit data1 !k1 datar !kr (l1 - !k1);
143
Array.blit data2 !k2 datar !kr (l2 - !k2);
144
execute_op partr_id datar cont
147
let proc_merge_partition session
148
(part1_id, part2_id, data2, partr_id, cont)
150
Hashtbl.replace kept_data part2_id data2;
152
(part1_id, part2_id, merge part1_id part2_id partr_id cont) ::
155
check_for_delayed_actions()
158
let configure cf addr = ()
161
Sort1_proto_srv.Worker.V1.bind_async
162
~proc_null:(fun _ _ emit -> emit ())
164
~proc_merge_partition
167
let worker_factory() =
168
Rpc_netplex.rpc_factory
174
inherit Netplex_kit.empty_processor_hooks()
175
method post_start_hook _ =
177
Netplex_cenv.create_timer