~ubuntu-branches/ubuntu/saucy/rabbitmq-server/saucy

« back to all changes in this revision

Viewing changes to src/rabbit_control_main.erl

  • Committer: Package Import Robot
  • Author(s): Emile Joubert
  • Date: 2012-11-19 11:42:31 UTC
  • mfrom: (0.2.18) (0.1.32 sid)
  • Revision ID: package-import@ubuntu.com-20121119114231-hvapkn4akng09etr
Tags: 3.0.0-1
New upstream release

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
%% The contents of this file are subject to the Mozilla Public License
 
2
%% Version 1.1 (the "License"); you may not use this file except in
 
3
%% compliance with the License. You may obtain a copy of the License
 
4
%% at http://www.mozilla.org/MPL/
 
5
%%
 
6
%% Software distributed under the License is distributed on an "AS IS"
 
7
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
 
8
%% the License for the specific language governing rights and
 
9
%% limitations under the License.
 
10
%%
 
11
%% The Original Code is RabbitMQ.
 
12
%%
 
13
%% The Initial Developer of the Original Code is VMware, Inc.
 
14
%% Copyright (c) 2007-2012 VMware, Inc.  All rights reserved.
 
15
%%
 
16
 
 
17
-module(rabbit_control_main).
 
18
-include("rabbit.hrl").
 
19
 
 
20
-export([start/0, stop/0, action/5]).
 
21
 
 
22
-define(RPC_TIMEOUT, infinity).
 
23
-define(EXTERNAL_CHECK_INTERVAL, 1000).
 
24
 
 
25
-define(QUIET_OPT, "-q").
 
26
-define(NODE_OPT, "-n").
 
27
-define(VHOST_OPT, "-p").
 
28
-define(RAM_OPT, "--ram").
 
29
-define(OFFLINE_OPT, "--offline").
 
30
 
 
31
-define(QUIET_DEF, {?QUIET_OPT, flag}).
 
32
-define(NODE_DEF(Node), {?NODE_OPT, {option, Node}}).
 
33
-define(VHOST_DEF, {?VHOST_OPT, {option, "/"}}).
 
34
-define(RAM_DEF, {?RAM_OPT, flag}).
 
35
-define(OFFLINE_DEF, {?OFFLINE_OPT, flag}).
 
36
 
 
37
-define(GLOBAL_DEFS(Node), [?QUIET_DEF, ?NODE_DEF(Node)]).
 
38
 
 
39
-define(COMMANDS,
 
40
        [stop,
 
41
         stop_app,
 
42
         start_app,
 
43
         wait,
 
44
         reset,
 
45
         force_reset,
 
46
         rotate_logs,
 
47
 
 
48
         {join_cluster, [?RAM_DEF]},
 
49
         change_cluster_node_type,
 
50
         update_cluster_nodes,
 
51
         {forget_cluster_node, [?OFFLINE_DEF]},
 
52
         cluster_status,
 
53
 
 
54
         add_user,
 
55
         delete_user,
 
56
         change_password,
 
57
         clear_password,
 
58
         set_user_tags,
 
59
         list_users,
 
60
 
 
61
         add_vhost,
 
62
         delete_vhost,
 
63
         list_vhosts,
 
64
         {set_permissions, [?VHOST_DEF]},
 
65
         {clear_permissions, [?VHOST_DEF]},
 
66
         {list_permissions, [?VHOST_DEF]},
 
67
         list_user_permissions,
 
68
 
 
69
         {set_parameter, [?VHOST_DEF]},
 
70
         {clear_parameter, [?VHOST_DEF]},
 
71
         {list_parameters, [?VHOST_DEF]},
 
72
 
 
73
         {set_policy, [?VHOST_DEF]},
 
74
         {clear_policy, [?VHOST_DEF]},
 
75
         {list_policies, [?VHOST_DEF]},
 
76
 
 
77
         {list_queues, [?VHOST_DEF]},
 
78
         {list_exchanges, [?VHOST_DEF]},
 
79
         {list_bindings, [?VHOST_DEF]},
 
80
         {list_connections, [?VHOST_DEF]},
 
81
         list_channels,
 
82
         {list_consumers, [?VHOST_DEF]},
 
83
         status,
 
84
         environment,
 
85
         report,
 
86
         eval,
 
87
 
 
88
         close_connection,
 
89
         {trace_on, [?VHOST_DEF]},
 
90
         {trace_off, [?VHOST_DEF]},
 
91
         set_vm_memory_high_watermark
 
92
        ]).
 
93
 
 
94
-define(GLOBAL_QUERIES,
 
95
        [{"Connections", rabbit_networking, connection_info_all,
 
96
          connection_info_keys},
 
97
         {"Channels",  rabbit_channel,  info_all, info_keys}]).
 
98
 
 
99
-define(VHOST_QUERIES,
 
100
        [{"Queues",    rabbit_amqqueue, info_all, info_keys},
 
101
         {"Exchanges", rabbit_exchange, info_all, info_keys},
 
102
         {"Bindings",  rabbit_binding,  info_all, info_keys},
 
103
         {"Consumers", rabbit_amqqueue, consumers_all, consumer_info_keys},
 
104
         {"Permissions", rabbit_auth_backend_internal, list_vhost_permissions,
 
105
          vhost_perms_info_keys},
 
106
         {"Policies",   rabbit_policy,             list_formatted, info_keys},
 
107
         {"Parameters", rabbit_runtime_parameters, list_formatted, info_keys}]).
 
108
 
 
109
%%----------------------------------------------------------------------------
 
110
 
 
111
-ifdef(use_specs).
 
112
 
 
113
-spec(start/0 :: () -> no_return()).
 
114
-spec(stop/0 :: () -> 'ok').
 
115
-spec(action/5 ::
 
116
        (atom(), node(), [string()], [{string(), any()}],
 
117
         fun ((string(), [any()]) -> 'ok'))
 
118
        -> 'ok').
 
119
-spec(usage/0 :: () -> no_return()).
 
120
 
 
121
-endif.
 
122
 
 
123
%%----------------------------------------------------------------------------
 
124
 
 
125
start() ->
 
126
    {ok, [[NodeStr|_]|_]} = init:get_argument(nodename),
 
127
    {Command, Opts, Args} =
 
128
        case rabbit_misc:parse_arguments(?COMMANDS, ?GLOBAL_DEFS(NodeStr),
 
129
                                         init:get_plain_arguments())
 
130
        of
 
131
            {ok, Res}  -> Res;
 
132
            no_command -> print_error("could not recognise command", []),
 
133
                          usage()
 
134
        end,
 
135
    Opts1 = [case K of
 
136
                 ?NODE_OPT -> {?NODE_OPT, rabbit_nodes:make(V)};
 
137
                 _         -> {K, V}
 
138
             end || {K, V} <- Opts],
 
139
    Quiet = proplists:get_bool(?QUIET_OPT, Opts1),
 
140
    Node = proplists:get_value(?NODE_OPT, Opts1),
 
141
    Inform = case Quiet of
 
142
                 true  -> fun (_Format, _Args1) -> ok end;
 
143
                 false -> fun (Format, Args1) ->
 
144
                                  io:format(Format ++ " ...~n", Args1)
 
145
                          end
 
146
             end,
 
147
    PrintInvalidCommandError =
 
148
        fun () ->
 
149
                print_error("invalid command '~s'",
 
150
                            [string:join([atom_to_list(Command) | Args], " ")])
 
151
        end,
 
152
 
 
153
    %% The reason we don't use a try/catch here is that rpc:call turns
 
154
    %% thrown errors into normal return values
 
155
    case catch action(Command, Node, Args, Opts, Inform) of
 
156
        ok ->
 
157
            case Quiet of
 
158
                true  -> ok;
 
159
                false -> io:format("...done.~n")
 
160
            end,
 
161
            rabbit_misc:quit(0);
 
162
        {'EXIT', {function_clause, [{?MODULE, action, _}    | _]}} -> %% < R15
 
163
            PrintInvalidCommandError(),
 
164
            usage();
 
165
        {'EXIT', {function_clause, [{?MODULE, action, _, _} | _]}} -> %% >= R15
 
166
            PrintInvalidCommandError(),
 
167
            usage();
 
168
        {'EXIT', {badarg, _}} ->
 
169
            print_error("invalid parameter: ~p", [Args]),
 
170
            usage();
 
171
        {error, {Problem, Reason}} when is_atom(Problem); is_binary(Reason) ->
 
172
            %% We handle this common case specially to avoid ~p since
 
173
            %% that has i18n issues
 
174
            print_error("~s: ~s", [Problem, Reason]),
 
175
            rabbit_misc:quit(2);
 
176
        {error, Reason} ->
 
177
            print_error("~p", [Reason]),
 
178
            rabbit_misc:quit(2);
 
179
        {error_string, Reason} ->
 
180
            print_error("~s", [Reason]),
 
181
            rabbit_misc:quit(2);
 
182
        {badrpc, {'EXIT', Reason}} ->
 
183
            print_error("~p", [Reason]),
 
184
            rabbit_misc:quit(2);
 
185
        {badrpc, Reason} ->
 
186
            print_error("unable to connect to node ~w: ~w", [Node, Reason]),
 
187
            print_badrpc_diagnostics(Node),
 
188
            rabbit_misc:quit(2);
 
189
        Other ->
 
190
            print_error("~p", [Other]),
 
191
            rabbit_misc:quit(2)
 
192
    end.
 
193
 
 
194
fmt_stderr(Format, Args) -> rabbit_misc:format_stderr(Format ++ "~n", Args).
 
195
 
 
196
print_report(Node, {Descr, Module, InfoFun, KeysFun}) ->
 
197
    io:format("~s:~n", [Descr]),
 
198
    print_report0(Node, {Module, InfoFun, KeysFun}, []).
 
199
 
 
200
print_report(Node, {Descr, Module, InfoFun, KeysFun}, VHostArg) ->
 
201
    io:format("~s on ~s:~n", [Descr, VHostArg]),
 
202
    print_report0(Node, {Module, InfoFun, KeysFun}, VHostArg).
 
203
 
 
204
print_report0(Node, {Module, InfoFun, KeysFun}, VHostArg) ->
 
205
    case rpc_call(Node, Module, InfoFun, VHostArg) of
 
206
        [_|_] = Results -> InfoItems = rpc_call(Node, Module, KeysFun, []),
 
207
                           display_row([atom_to_list(I) || I <- InfoItems]),
 
208
                           display_info_list(Results, InfoItems);
 
209
        _               -> ok
 
210
    end,
 
211
    io:nl().
 
212
 
 
213
print_error(Format, Args) -> fmt_stderr("Error: " ++ Format, Args).
 
214
 
 
215
print_badrpc_diagnostics(Node) ->
 
216
    fmt_stderr(rabbit_nodes:diagnostics([Node]), []).
 
217
 
 
218
stop() ->
 
219
    ok.
 
220
 
 
221
usage() ->
 
222
    io:format("~s", [rabbit_ctl_usage:usage()]),
 
223
    rabbit_misc:quit(1).
 
224
 
 
225
%%----------------------------------------------------------------------------
 
226
 
 
227
action(stop, Node, Args, _Opts, Inform) ->
 
228
    Inform("Stopping and halting node ~p", [Node]),
 
229
    Res = call(Node, {rabbit, stop_and_halt, []}),
 
230
    case {Res, Args} of
 
231
        {ok, [PidFile]} -> wait_for_process_death(
 
232
                             read_pid_file(PidFile, false));
 
233
        {ok, [_, _| _]} -> exit({badarg, Args});
 
234
        _               -> ok
 
235
    end,
 
236
    Res;
 
237
 
 
238
action(stop_app, Node, [], _Opts, Inform) ->
 
239
    Inform("Stopping node ~p", [Node]),
 
240
    call(Node, {rabbit, stop, []});
 
241
 
 
242
action(start_app, Node, [], _Opts, Inform) ->
 
243
    Inform("Starting node ~p", [Node]),
 
244
    call(Node, {rabbit, start, []});
 
245
 
 
246
action(reset, Node, [], _Opts, Inform) ->
 
247
    Inform("Resetting node ~p", [Node]),
 
248
    call(Node, {rabbit_mnesia, reset, []});
 
249
 
 
250
action(force_reset, Node, [], _Opts, Inform) ->
 
251
    Inform("Forcefully resetting node ~p", [Node]),
 
252
    call(Node, {rabbit_mnesia, force_reset, []});
 
253
 
 
254
action(join_cluster, Node, [ClusterNodeS], Opts, Inform) ->
 
255
    ClusterNode = list_to_atom(ClusterNodeS),
 
256
    NodeType = case proplists:get_bool(?RAM_OPT, Opts) of
 
257
                   true  -> ram;
 
258
                   false -> disc
 
259
               end,
 
260
    Inform("Clustering node ~p with ~p", [Node, ClusterNode]),
 
261
    rpc_call(Node, rabbit_mnesia, join_cluster, [ClusterNode, NodeType]);
 
262
 
 
263
action(change_cluster_node_type, Node, ["ram"], _Opts, Inform) ->
 
264
    Inform("Turning ~p into a ram node", [Node]),
 
265
    rpc_call(Node, rabbit_mnesia, change_cluster_node_type, [ram]);
 
266
action(change_cluster_node_type, Node, [Type], _Opts, Inform)
 
267
  when Type =:= "disc" orelse Type =:= "disk" ->
 
268
    Inform("Turning ~p into a disc node", [Node]),
 
269
    rpc_call(Node, rabbit_mnesia, change_cluster_node_type, [disc]);
 
270
 
 
271
action(update_cluster_nodes, Node, [ClusterNodeS], _Opts, Inform) ->
 
272
    ClusterNode = list_to_atom(ClusterNodeS),
 
273
    Inform("Updating cluster nodes for ~p from ~p", [Node, ClusterNode]),
 
274
    rpc_call(Node, rabbit_mnesia, update_cluster_nodes, [ClusterNode]);
 
275
 
 
276
action(forget_cluster_node, Node, [ClusterNodeS], Opts, Inform) ->
 
277
    ClusterNode = list_to_atom(ClusterNodeS),
 
278
    RemoveWhenOffline = proplists:get_bool(?OFFLINE_OPT, Opts),
 
279
    Inform("Removing node ~p from cluster", [ClusterNode]),
 
280
    rpc_call(Node, rabbit_mnesia, forget_cluster_node,
 
281
             [ClusterNode, RemoveWhenOffline]);
 
282
 
 
283
action(wait, Node, [PidFile], _Opts, Inform) ->
 
284
    Inform("Waiting for ~p", [Node]),
 
285
    wait_for_application(Node, PidFile, rabbit_and_plugins, Inform);
 
286
action(wait, Node, [PidFile, App], _Opts, Inform) ->
 
287
    Inform("Waiting for ~p on ~p", [App, Node]),
 
288
    wait_for_application(Node, PidFile, list_to_atom(App), Inform);
 
289
 
 
290
action(status, Node, [], _Opts, Inform) ->
 
291
    Inform("Status of node ~p", [Node]),
 
292
    display_call_result(Node, {rabbit, status, []});
 
293
 
 
294
action(cluster_status, Node, [], _Opts, Inform) ->
 
295
    Inform("Cluster status of node ~p", [Node]),
 
296
    display_call_result(Node, {rabbit_mnesia, status, []});
 
297
 
 
298
action(environment, Node, _App, _Opts, Inform) ->
 
299
    Inform("Application environment of node ~p", [Node]),
 
300
    display_call_result(Node, {rabbit, environment, []});
 
301
 
 
302
action(rotate_logs, Node, [], _Opts, Inform) ->
 
303
    Inform("Reopening logs for node ~p", [Node]),
 
304
    call(Node, {rabbit, rotate_logs, [""]});
 
305
action(rotate_logs, Node, Args = [Suffix], _Opts, Inform) ->
 
306
    Inform("Rotating logs to files with suffix \"~s\"", [Suffix]),
 
307
    call(Node, {rabbit, rotate_logs, Args});
 
308
 
 
309
action(close_connection, Node, [PidStr, Explanation], _Opts, Inform) ->
 
310
    Inform("Closing connection \"~s\"", [PidStr]),
 
311
    rpc_call(Node, rabbit_networking, close_connection,
 
312
             [rabbit_misc:string_to_pid(PidStr), Explanation]);
 
313
 
 
314
action(add_user, Node, Args = [Username, _Password], _Opts, Inform) ->
 
315
    Inform("Creating user \"~s\"", [Username]),
 
316
    call(Node, {rabbit_auth_backend_internal, add_user, Args});
 
317
 
 
318
action(delete_user, Node, Args = [_Username], _Opts, Inform) ->
 
319
    Inform("Deleting user \"~s\"", Args),
 
320
    call(Node, {rabbit_auth_backend_internal, delete_user, Args});
 
321
 
 
322
action(change_password, Node, Args = [Username, _Newpassword], _Opts, Inform) ->
 
323
    Inform("Changing password for user \"~s\"", [Username]),
 
324
    call(Node, {rabbit_auth_backend_internal, change_password, Args});
 
325
 
 
326
action(clear_password, Node, Args = [Username], _Opts, Inform) ->
 
327
    Inform("Clearing password for user \"~s\"", [Username]),
 
328
    call(Node, {rabbit_auth_backend_internal, clear_password, Args});
 
329
 
 
330
action(set_user_tags, Node, [Username | TagsStr], _Opts, Inform) ->
 
331
    Tags = [list_to_atom(T) || T <- TagsStr],
 
332
    Inform("Setting tags for user \"~s\" to ~p", [Username, Tags]),
 
333
    rpc_call(Node, rabbit_auth_backend_internal, set_tags,
 
334
             [list_to_binary(Username), Tags]);
 
335
 
 
336
action(list_users, Node, [], _Opts, Inform) ->
 
337
    Inform("Listing users", []),
 
338
    display_info_list(
 
339
      call(Node, {rabbit_auth_backend_internal, list_users, []}),
 
340
      rabbit_auth_backend_internal:user_info_keys());
 
341
 
 
342
action(add_vhost, Node, Args = [_VHostPath], _Opts, Inform) ->
 
343
    Inform("Creating vhost \"~s\"", Args),
 
344
    call(Node, {rabbit_vhost, add, Args});
 
345
 
 
346
action(delete_vhost, Node, Args = [_VHostPath], _Opts, Inform) ->
 
347
    Inform("Deleting vhost \"~s\"", Args),
 
348
    call(Node, {rabbit_vhost, delete, Args});
 
349
 
 
350
action(list_vhosts, Node, Args, _Opts, Inform) ->
 
351
    Inform("Listing vhosts", []),
 
352
    ArgAtoms = default_if_empty(Args, [name]),
 
353
    display_info_list(call(Node, {rabbit_vhost, info_all, []}), ArgAtoms);
 
354
 
 
355
action(list_user_permissions, Node, Args = [_Username], _Opts, Inform) ->
 
356
    Inform("Listing permissions for user ~p", Args),
 
357
    display_info_list(call(Node, {rabbit_auth_backend_internal,
 
358
                                  list_user_permissions, Args}),
 
359
                      rabbit_auth_backend_internal:user_perms_info_keys());
 
360
 
 
361
action(list_queues, Node, Args, Opts, Inform) ->
 
362
    Inform("Listing queues", []),
 
363
    VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
 
364
    ArgAtoms = default_if_empty(Args, [name, messages]),
 
365
    display_info_list(rpc_call(Node, rabbit_amqqueue, info_all,
 
366
                               [VHostArg, ArgAtoms]),
 
367
                      ArgAtoms);
 
368
 
 
369
action(list_exchanges, Node, Args, Opts, Inform) ->
 
370
    Inform("Listing exchanges", []),
 
371
    VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
 
372
    ArgAtoms = default_if_empty(Args, [name, type]),
 
373
    display_info_list(rpc_call(Node, rabbit_exchange, info_all,
 
374
                               [VHostArg, ArgAtoms]),
 
375
                      ArgAtoms);
 
376
 
 
377
action(list_bindings, Node, Args, Opts, Inform) ->
 
378
    Inform("Listing bindings", []),
 
379
    VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
 
380
    ArgAtoms = default_if_empty(Args, [source_name, source_kind,
 
381
                                       destination_name, destination_kind,
 
382
                                       routing_key, arguments]),
 
383
    display_info_list(rpc_call(Node, rabbit_binding, info_all,
 
384
                               [VHostArg, ArgAtoms]),
 
385
                      ArgAtoms);
 
386
 
 
387
action(list_connections, Node, Args, _Opts, Inform) ->
 
388
    Inform("Listing connections", []),
 
389
    ArgAtoms = default_if_empty(Args, [user, peer_host, peer_port, state]),
 
390
    display_info_list(rpc_call(Node, rabbit_networking, connection_info_all,
 
391
                               [ArgAtoms]),
 
392
                      ArgAtoms);
 
393
 
 
394
action(list_channels, Node, Args, _Opts, Inform) ->
 
395
    Inform("Listing channels", []),
 
396
    ArgAtoms = default_if_empty(Args, [pid, user, consumer_count,
 
397
                                       messages_unacknowledged]),
 
398
    display_info_list(rpc_call(Node, rabbit_channel, info_all, [ArgAtoms]),
 
399
                      ArgAtoms);
 
400
 
 
401
action(list_consumers, Node, _Args, Opts, Inform) ->
 
402
    Inform("Listing consumers", []),
 
403
    VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
 
404
    display_info_list(rpc_call(Node, rabbit_amqqueue, consumers_all, [VHostArg]),
 
405
                      rabbit_amqqueue:consumer_info_keys());
 
406
 
 
407
action(trace_on, Node, [], Opts, Inform) ->
 
408
    VHost = proplists:get_value(?VHOST_OPT, Opts),
 
409
    Inform("Starting tracing for vhost \"~s\"", [VHost]),
 
410
    rpc_call(Node, rabbit_trace, start, [list_to_binary(VHost)]);
 
411
 
 
412
action(trace_off, Node, [], Opts, Inform) ->
 
413
    VHost = proplists:get_value(?VHOST_OPT, Opts),
 
414
    Inform("Stopping tracing for vhost \"~s\"", [VHost]),
 
415
    rpc_call(Node, rabbit_trace, stop, [list_to_binary(VHost)]);
 
416
 
 
417
action(set_vm_memory_high_watermark, Node, [Arg], _Opts, Inform) ->
 
418
    Frac = list_to_float(case string:chr(Arg, $.) of
 
419
                             0 -> Arg ++ ".0";
 
420
                             _ -> Arg
 
421
                         end),
 
422
    Inform("Setting memory threshold on ~p to ~p", [Node, Frac]),
 
423
    rpc_call(Node, vm_memory_monitor, set_vm_memory_high_watermark, [Frac]);
 
424
 
 
425
action(set_permissions, Node, [Username, CPerm, WPerm, RPerm], Opts, Inform) ->
 
426
    VHost = proplists:get_value(?VHOST_OPT, Opts),
 
427
    Inform("Setting permissions for user \"~s\" in vhost \"~s\"",
 
428
           [Username, VHost]),
 
429
    call(Node, {rabbit_auth_backend_internal, set_permissions,
 
430
                [Username, VHost, CPerm, WPerm, RPerm]});
 
431
 
 
432
action(clear_permissions, Node, [Username], Opts, Inform) ->
 
433
    VHost = proplists:get_value(?VHOST_OPT, Opts),
 
434
    Inform("Clearing permissions for user \"~s\" in vhost \"~s\"",
 
435
           [Username, VHost]),
 
436
    call(Node, {rabbit_auth_backend_internal, clear_permissions,
 
437
                [Username, VHost]});
 
438
 
 
439
action(list_permissions, Node, [], Opts, Inform) ->
 
440
    VHost = proplists:get_value(?VHOST_OPT, Opts),
 
441
    Inform("Listing permissions in vhost \"~s\"", [VHost]),
 
442
    display_info_list(call(Node, {rabbit_auth_backend_internal,
 
443
                             list_vhost_permissions, [VHost]}),
 
444
                      rabbit_auth_backend_internal:vhost_perms_info_keys());
 
445
 
 
446
action(set_parameter, Node, [Component, Key, Value], Opts, Inform) ->
 
447
    VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
 
448
    Inform("Setting runtime parameter ~p for component ~p to ~p",
 
449
           [Key, Component, Value]),
 
450
    rpc_call(Node, rabbit_runtime_parameters, parse_set,
 
451
             [VHostArg, list_to_binary(Component), list_to_binary(Key), Value]);
 
452
 
 
453
action(clear_parameter, Node, [Component, Key], Opts, Inform) ->
 
454
    VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
 
455
    Inform("Clearing runtime parameter ~p for component ~p", [Key, Component]),
 
456
    rpc_call(Node, rabbit_runtime_parameters, clear, [VHostArg,
 
457
                                                      list_to_binary(Component),
 
458
                                                      list_to_binary(Key)]);
 
459
 
 
460
action(list_parameters, Node, [], Opts, Inform) ->
 
461
    VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
 
462
    Inform("Listing runtime parameters", []),
 
463
    display_info_list(
 
464
      rpc_call(Node, rabbit_runtime_parameters, list_formatted, [VHostArg]),
 
465
      rabbit_runtime_parameters:info_keys());
 
466
 
 
467
action(set_policy, Node, [Key, Pattern, Defn | Prio], Opts, Inform)
 
468
  when Prio == [] orelse length(Prio) == 1 ->
 
469
    Msg = "Setting policy ~p for pattern ~p to ~p",
 
470
    {InformMsg, Prio1} = case Prio of []  -> {Msg, undefined};
 
471
                                      [P] -> {Msg ++ " with priority ~s", P}
 
472
                         end,
 
473
    VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
 
474
    Inform(InformMsg, [Key, Pattern, Defn] ++ Prio),
 
475
    rpc_call(Node, rabbit_policy, parse_set,
 
476
             [VHostArg, list_to_binary(Key), Pattern, Defn, Prio1]);
 
477
 
 
478
action(clear_policy, Node, [Key], Opts, Inform) ->
 
479
    VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
 
480
    Inform("Clearing policy ~p", [Key]),
 
481
    rpc_call(Node, rabbit_policy, delete, [VHostArg, list_to_binary(Key)]);
 
482
 
 
483
action(list_policies, Node, [], Opts, Inform) ->
 
484
    VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
 
485
    Inform("Listing policies", []),
 
486
    display_info_list(rpc_call(Node, rabbit_policy, list_formatted, [VHostArg]),
 
487
                      rabbit_policy:info_keys());
 
488
 
 
489
action(report, Node, _Args, _Opts, Inform) ->
 
490
    Inform("Reporting server status on ~p~n~n", [erlang:universaltime()]),
 
491
    [begin ok = action(Action, N, [], [], Inform), io:nl() end ||
 
492
        N      <- unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running]),
 
493
        Action <- [status, cluster_status, environment]],
 
494
    VHosts = unsafe_rpc(Node, rabbit_vhost, list, []),
 
495
    [print_report(Node, Q)      || Q <- ?GLOBAL_QUERIES],
 
496
    [print_report(Node, Q, [V]) || Q <- ?VHOST_QUERIES, V <- VHosts],
 
497
    ok;
 
498
 
 
499
action(eval, Node, [Expr], _Opts, _Inform) ->
 
500
    case erl_scan:string(Expr) of
 
501
        {ok, Scanned, _} ->
 
502
            case erl_parse:parse_exprs(Scanned) of
 
503
                {ok, Parsed} -> {value, Value, _} =
 
504
                                    unsafe_rpc(
 
505
                                      Node, erl_eval, exprs, [Parsed, []]),
 
506
                                io:format("~p~n", [Value]),
 
507
                                ok;
 
508
                {error, E}   -> {error_string, format_parse_error(E)}
 
509
            end;
 
510
        {error, E, _} ->
 
511
            {error_string, format_parse_error(E)}
 
512
    end.
 
513
 
 
514
format_parse_error({_Line, Mod, Err}) -> lists:flatten(Mod:format_error(Err)).
 
515
 
 
516
%%----------------------------------------------------------------------------
 
517
 
 
518
wait_for_application(Node, PidFile, Application, Inform) ->
 
519
    Pid = read_pid_file(PidFile, true),
 
520
    Inform("pid is ~s", [Pid]),
 
521
    wait_for_application(Node, Pid, Application).
 
522
 
 
523
wait_for_application(Node, Pid, rabbit_and_plugins) ->
 
524
    wait_for_startup(Node, Pid);
 
525
wait_for_application(Node, Pid, Application) ->
 
526
    while_process_is_alive(
 
527
      Node, Pid, fun() -> rabbit_nodes:is_running(Node, Application) end).
 
528
 
 
529
wait_for_startup(Node, Pid) ->
 
530
    while_process_is_alive(
 
531
      Node, Pid, fun() -> rpc:call(Node, rabbit, await_startup, []) =:= ok end).
 
532
 
 
533
while_process_is_alive(Node, Pid, Activity) ->
 
534
    case process_up(Pid) of
 
535
        true  -> case Activity() of
 
536
                     true  -> ok;
 
537
                     false -> timer:sleep(?EXTERNAL_CHECK_INTERVAL),
 
538
                              while_process_is_alive(Node, Pid, Activity)
 
539
                 end;
 
540
        false -> {error, process_not_running}
 
541
    end.
 
542
 
 
543
wait_for_process_death(Pid) ->
 
544
    case process_up(Pid) of
 
545
        true  -> timer:sleep(?EXTERNAL_CHECK_INTERVAL),
 
546
                 wait_for_process_death(Pid);
 
547
        false -> ok
 
548
    end.
 
549
 
 
550
read_pid_file(PidFile, Wait) ->
 
551
    case {file:read_file(PidFile), Wait} of
 
552
        {{ok, Bin}, _} ->
 
553
            S = binary_to_list(Bin),
 
554
            {match, [PidS]} = re:run(S, "[^\\s]+",
 
555
                                     [{capture, all, list}]),
 
556
            try list_to_integer(PidS)
 
557
            catch error:badarg ->
 
558
                    exit({error, {garbage_in_pid_file, PidFile}})
 
559
            end,
 
560
            PidS;
 
561
        {{error, enoent}, true} ->
 
562
            timer:sleep(?EXTERNAL_CHECK_INTERVAL),
 
563
            read_pid_file(PidFile, Wait);
 
564
        {{error, _} = E, _} ->
 
565
            exit({error, {could_not_read_pid, E}})
 
566
    end.
 
567
 
 
568
% Test using some OS clunkiness since we shouldn't trust
 
569
% rpc:call(os, getpid, []) at this point
 
570
process_up(Pid) ->
 
571
    with_os([{unix, fun () ->
 
572
                            run_ps(Pid) =:= 0
 
573
                    end},
 
574
             {win32, fun () ->
 
575
                             Res = os:cmd("tasklist /nh /fi \"pid eq " ++
 
576
                                          Pid ++ "\" 2>&1"),
 
577
                             case re:run(Res, "erl\\.exe", [{capture, none}]) of
 
578
                                 match -> true;
 
579
                                 _     -> false
 
580
                             end
 
581
                     end}]).
 
582
 
 
583
with_os(Handlers) ->
 
584
    {OsFamily, _} = os:type(),
 
585
    case proplists:get_value(OsFamily, Handlers) of
 
586
        undefined -> throw({unsupported_os, OsFamily});
 
587
        Handler   -> Handler()
 
588
    end.
 
589
 
 
590
run_ps(Pid) ->
 
591
    Port = erlang:open_port({spawn, "ps -p " ++ Pid},
 
592
                            [exit_status, {line, 16384},
 
593
                             use_stdio, stderr_to_stdout]),
 
594
    exit_loop(Port).
 
595
 
 
596
exit_loop(Port) ->
 
597
    receive
 
598
        {Port, {exit_status, Rc}} -> Rc;
 
599
        {Port, _}                 -> exit_loop(Port)
 
600
    end.
 
601
 
 
602
%%----------------------------------------------------------------------------
 
603
 
 
604
default_if_empty(List, Default) when is_list(List) ->
 
605
    if List == [] -> Default;
 
606
       true       -> [list_to_atom(X) || X <- List]
 
607
    end.
 
608
 
 
609
display_info_list(Results, InfoItemKeys) when is_list(Results) ->
 
610
    lists:foreach(
 
611
      fun (Result) -> display_row(
 
612
                        [format_info_item(proplists:get_value(X, Result)) ||
 
613
                            X <- InfoItemKeys])
 
614
      end, lists:sort(Results)),
 
615
    ok;
 
616
display_info_list(Other, _) ->
 
617
    Other.
 
618
 
 
619
display_row(Row) ->
 
620
    io:fwrite(string:join(Row, "\t")),
 
621
    io:nl().
 
622
 
 
623
-define(IS_U8(X),  (X >= 0 andalso X =< 255)).
 
624
-define(IS_U16(X), (X >= 0 andalso X =< 65535)).
 
625
 
 
626
format_info_item(#resource{name = Name}) ->
 
627
    escape(Name);
 
628
format_info_item({N1, N2, N3, N4} = Value) when
 
629
      ?IS_U8(N1), ?IS_U8(N2), ?IS_U8(N3), ?IS_U8(N4) ->
 
630
    rabbit_misc:ntoa(Value);
 
631
format_info_item({K1, K2, K3, K4, K5, K6, K7, K8} = Value) when
 
632
      ?IS_U16(K1), ?IS_U16(K2), ?IS_U16(K3), ?IS_U16(K4),
 
633
      ?IS_U16(K5), ?IS_U16(K6), ?IS_U16(K7), ?IS_U16(K8) ->
 
634
    rabbit_misc:ntoa(Value);
 
635
format_info_item(Value) when is_pid(Value) ->
 
636
    rabbit_misc:pid_to_string(Value);
 
637
format_info_item(Value) when is_binary(Value) ->
 
638
    escape(Value);
 
639
format_info_item(Value) when is_atom(Value) ->
 
640
    escape(atom_to_list(Value));
 
641
format_info_item([{TableEntryKey, TableEntryType, _TableEntryValue} | _] =
 
642
                     Value) when is_binary(TableEntryKey) andalso
 
643
                                 is_atom(TableEntryType) ->
 
644
    io_lib:format("~1000000000000p", [prettify_amqp_table(Value)]);
 
645
format_info_item([T | _] = Value)
 
646
  when is_tuple(T) orelse is_pid(T) orelse is_binary(T) orelse is_atom(T) orelse
 
647
       is_list(T) ->
 
648
    "[" ++
 
649
        lists:nthtail(2, lists:append(
 
650
                           [", " ++ format_info_item(E) || E <- Value])) ++ "]";
 
651
format_info_item(Value) ->
 
652
    io_lib:format("~w", [Value]).
 
653
 
 
654
display_call_result(Node, MFA) ->
 
655
    case call(Node, MFA) of
 
656
        {badrpc, _} = Res -> throw(Res);
 
657
        Res               -> io:format("~p~n", [Res]),
 
658
                             ok
 
659
    end.
 
660
 
 
661
unsafe_rpc(Node, Mod, Fun, Args) ->
 
662
    case rpc_call(Node, Mod, Fun, Args) of
 
663
        {badrpc, _} = Res -> throw(Res);
 
664
        Normal            -> Normal
 
665
    end.
 
666
 
 
667
call(Node, {Mod, Fun, Args}) ->
 
668
    rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary/1, Args)).
 
669
 
 
670
rpc_call(Node, Mod, Fun, Args) ->
 
671
    rpc:call(Node, Mod, Fun, Args, ?RPC_TIMEOUT).
 
672
 
 
673
%% escape does C-style backslash escaping of non-printable ASCII
 
674
%% characters.  We don't escape characters above 127, since they may
 
675
%% form part of UTF-8 strings.
 
676
 
 
677
escape(Atom) when is_atom(Atom)  -> escape(atom_to_list(Atom));
 
678
escape(Bin)  when is_binary(Bin) -> escape(binary_to_list(Bin));
 
679
escape(L)    when is_list(L)     -> escape_char(lists:reverse(L), []).
 
680
 
 
681
escape_char([$\\ | T], Acc) ->
 
682
    escape_char(T, [$\\, $\\ | Acc]);
 
683
escape_char([X | T], Acc) when X >= 32, X /= 127 ->
 
684
    escape_char(T, [X | Acc]);
 
685
escape_char([X | T], Acc) ->
 
686
    escape_char(T, [$\\, $0 + (X bsr 6), $0 + (X band 8#070 bsr 3),
 
687
                    $0 + (X band 7) | Acc]);
 
688
escape_char([], Acc) ->
 
689
    Acc.
 
690
 
 
691
prettify_amqp_table(Table) ->
 
692
    [{escape(K), prettify_typed_amqp_value(T, V)} || {K, T, V} <- Table].
 
693
 
 
694
prettify_typed_amqp_value(longstr, Value) -> escape(Value);
 
695
prettify_typed_amqp_value(table,   Value) -> prettify_amqp_table(Value);
 
696
prettify_typed_amqp_value(array,   Value) -> [prettify_typed_amqp_value(T, V) ||
 
697
                                                 {T, V} <- Value];
 
698
prettify_typed_amqp_value(_Type,   Value) -> Value.