1
%% The contents of this file are subject to the Mozilla Public License
2
%% Version 1.1 (the "License"); you may not use this file except in
3
%% compliance with the License. You may obtain a copy of the License
4
%% at http://www.mozilla.org/MPL/
6
%% Software distributed under the License is distributed on an "AS IS"
7
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8
%% the License for the specific language governing rights and
9
%% limitations under the License.
11
%% The Original Code is RabbitMQ.
13
%% The Initial Developer of the Original Code is VMware, Inc.
14
%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
17
-module(rabbit_control_main).
18
-include("rabbit.hrl").
20
-export([start/0, stop/0, action/5]).
22
-define(RPC_TIMEOUT, infinity).
23
-define(EXTERNAL_CHECK_INTERVAL, 1000).
25
-define(QUIET_OPT, "-q").
26
-define(NODE_OPT, "-n").
27
-define(VHOST_OPT, "-p").
28
-define(RAM_OPT, "--ram").
29
-define(OFFLINE_OPT, "--offline").
31
-define(QUIET_DEF, {?QUIET_OPT, flag}).
32
-define(NODE_DEF(Node), {?NODE_OPT, {option, Node}}).
33
-define(VHOST_DEF, {?VHOST_OPT, {option, "/"}}).
34
-define(RAM_DEF, {?RAM_OPT, flag}).
35
-define(OFFLINE_DEF, {?OFFLINE_OPT, flag}).
37
-define(GLOBAL_DEFS(Node), [?QUIET_DEF, ?NODE_DEF(Node)]).
48
{join_cluster, [?RAM_DEF]},
49
change_cluster_node_type,
51
{forget_cluster_node, [?OFFLINE_DEF]},
64
{set_permissions, [?VHOST_DEF]},
65
{clear_permissions, [?VHOST_DEF]},
66
{list_permissions, [?VHOST_DEF]},
67
list_user_permissions,
69
{set_parameter, [?VHOST_DEF]},
70
{clear_parameter, [?VHOST_DEF]},
71
{list_parameters, [?VHOST_DEF]},
73
{set_policy, [?VHOST_DEF]},
74
{clear_policy, [?VHOST_DEF]},
75
{list_policies, [?VHOST_DEF]},
77
{list_queues, [?VHOST_DEF]},
78
{list_exchanges, [?VHOST_DEF]},
79
{list_bindings, [?VHOST_DEF]},
80
{list_connections, [?VHOST_DEF]},
82
{list_consumers, [?VHOST_DEF]},
89
{trace_on, [?VHOST_DEF]},
90
{trace_off, [?VHOST_DEF]},
91
set_vm_memory_high_watermark
94
-define(GLOBAL_QUERIES,
95
[{"Connections", rabbit_networking, connection_info_all,
96
connection_info_keys},
97
{"Channels", rabbit_channel, info_all, info_keys}]).
99
-define(VHOST_QUERIES,
100
[{"Queues", rabbit_amqqueue, info_all, info_keys},
101
{"Exchanges", rabbit_exchange, info_all, info_keys},
102
{"Bindings", rabbit_binding, info_all, info_keys},
103
{"Consumers", rabbit_amqqueue, consumers_all, consumer_info_keys},
104
{"Permissions", rabbit_auth_backend_internal, list_vhost_permissions,
105
vhost_perms_info_keys},
106
{"Policies", rabbit_policy, list_formatted, info_keys},
107
{"Parameters", rabbit_runtime_parameters, list_formatted, info_keys}]).
109
%%----------------------------------------------------------------------------
113
-spec(start/0 :: () -> no_return()).
114
-spec(stop/0 :: () -> 'ok').
116
(atom(), node(), [string()], [{string(), any()}],
117
fun ((string(), [any()]) -> 'ok'))
119
-spec(usage/0 :: () -> no_return()).
123
%%----------------------------------------------------------------------------
126
{ok, [[NodeStr|_]|_]} = init:get_argument(nodename),
127
{Command, Opts, Args} =
128
case rabbit_misc:parse_arguments(?COMMANDS, ?GLOBAL_DEFS(NodeStr),
129
init:get_plain_arguments())
132
no_command -> print_error("could not recognise command", []),
136
?NODE_OPT -> {?NODE_OPT, rabbit_nodes:make(V)};
138
end || {K, V} <- Opts],
139
Quiet = proplists:get_bool(?QUIET_OPT, Opts1),
140
Node = proplists:get_value(?NODE_OPT, Opts1),
141
Inform = case Quiet of
142
true -> fun (_Format, _Args1) -> ok end;
143
false -> fun (Format, Args1) ->
144
io:format(Format ++ " ...~n", Args1)
147
PrintInvalidCommandError =
149
print_error("invalid command '~s'",
150
[string:join([atom_to_list(Command) | Args], " ")])
153
%% The reason we don't use a try/catch here is that rpc:call turns
154
%% thrown errors into normal return values
155
case catch action(Command, Node, Args, Opts, Inform) of
159
false -> io:format("...done.~n")
162
{'EXIT', {function_clause, [{?MODULE, action, _} | _]}} -> %% < R15
163
PrintInvalidCommandError(),
165
{'EXIT', {function_clause, [{?MODULE, action, _, _} | _]}} -> %% >= R15
166
PrintInvalidCommandError(),
168
{'EXIT', {badarg, _}} ->
169
print_error("invalid parameter: ~p", [Args]),
171
{error, {Problem, Reason}} when is_atom(Problem); is_binary(Reason) ->
172
%% We handle this common case specially to avoid ~p since
173
%% that has i18n issues
174
print_error("~s: ~s", [Problem, Reason]),
177
print_error("~p", [Reason]),
179
{error_string, Reason} ->
180
print_error("~s", [Reason]),
182
{badrpc, {'EXIT', Reason}} ->
183
print_error("~p", [Reason]),
186
print_error("unable to connect to node ~w: ~w", [Node, Reason]),
187
print_badrpc_diagnostics(Node),
190
print_error("~p", [Other]),
194
fmt_stderr(Format, Args) -> rabbit_misc:format_stderr(Format ++ "~n", Args).
196
print_report(Node, {Descr, Module, InfoFun, KeysFun}) ->
197
io:format("~s:~n", [Descr]),
198
print_report0(Node, {Module, InfoFun, KeysFun}, []).
200
print_report(Node, {Descr, Module, InfoFun, KeysFun}, VHostArg) ->
201
io:format("~s on ~s:~n", [Descr, VHostArg]),
202
print_report0(Node, {Module, InfoFun, KeysFun}, VHostArg).
204
print_report0(Node, {Module, InfoFun, KeysFun}, VHostArg) ->
205
case rpc_call(Node, Module, InfoFun, VHostArg) of
206
[_|_] = Results -> InfoItems = rpc_call(Node, Module, KeysFun, []),
207
display_row([atom_to_list(I) || I <- InfoItems]),
208
display_info_list(Results, InfoItems);
213
print_error(Format, Args) -> fmt_stderr("Error: " ++ Format, Args).
215
print_badrpc_diagnostics(Node) ->
216
fmt_stderr(rabbit_nodes:diagnostics([Node]), []).
222
io:format("~s", [rabbit_ctl_usage:usage()]),
225
%%----------------------------------------------------------------------------
227
action(stop, Node, Args, _Opts, Inform) ->
228
Inform("Stopping and halting node ~p", [Node]),
229
Res = call(Node, {rabbit, stop_and_halt, []}),
231
{ok, [PidFile]} -> wait_for_process_death(
232
read_pid_file(PidFile, false));
233
{ok, [_, _| _]} -> exit({badarg, Args});
238
action(stop_app, Node, [], _Opts, Inform) ->
239
Inform("Stopping node ~p", [Node]),
240
call(Node, {rabbit, stop, []});
242
action(start_app, Node, [], _Opts, Inform) ->
243
Inform("Starting node ~p", [Node]),
244
call(Node, {rabbit, start, []});
246
action(reset, Node, [], _Opts, Inform) ->
247
Inform("Resetting node ~p", [Node]),
248
call(Node, {rabbit_mnesia, reset, []});
250
action(force_reset, Node, [], _Opts, Inform) ->
251
Inform("Forcefully resetting node ~p", [Node]),
252
call(Node, {rabbit_mnesia, force_reset, []});
254
action(join_cluster, Node, [ClusterNodeS], Opts, Inform) ->
255
ClusterNode = list_to_atom(ClusterNodeS),
256
NodeType = case proplists:get_bool(?RAM_OPT, Opts) of
260
Inform("Clustering node ~p with ~p", [Node, ClusterNode]),
261
rpc_call(Node, rabbit_mnesia, join_cluster, [ClusterNode, NodeType]);
263
action(change_cluster_node_type, Node, ["ram"], _Opts, Inform) ->
264
Inform("Turning ~p into a ram node", [Node]),
265
rpc_call(Node, rabbit_mnesia, change_cluster_node_type, [ram]);
266
action(change_cluster_node_type, Node, [Type], _Opts, Inform)
267
when Type =:= "disc" orelse Type =:= "disk" ->
268
Inform("Turning ~p into a disc node", [Node]),
269
rpc_call(Node, rabbit_mnesia, change_cluster_node_type, [disc]);
271
action(update_cluster_nodes, Node, [ClusterNodeS], _Opts, Inform) ->
272
ClusterNode = list_to_atom(ClusterNodeS),
273
Inform("Updating cluster nodes for ~p from ~p", [Node, ClusterNode]),
274
rpc_call(Node, rabbit_mnesia, update_cluster_nodes, [ClusterNode]);
276
action(forget_cluster_node, Node, [ClusterNodeS], Opts, Inform) ->
277
ClusterNode = list_to_atom(ClusterNodeS),
278
RemoveWhenOffline = proplists:get_bool(?OFFLINE_OPT, Opts),
279
Inform("Removing node ~p from cluster", [ClusterNode]),
280
rpc_call(Node, rabbit_mnesia, forget_cluster_node,
281
[ClusterNode, RemoveWhenOffline]);
283
action(wait, Node, [PidFile], _Opts, Inform) ->
284
Inform("Waiting for ~p", [Node]),
285
wait_for_application(Node, PidFile, rabbit_and_plugins, Inform);
286
action(wait, Node, [PidFile, App], _Opts, Inform) ->
287
Inform("Waiting for ~p on ~p", [App, Node]),
288
wait_for_application(Node, PidFile, list_to_atom(App), Inform);
290
action(status, Node, [], _Opts, Inform) ->
291
Inform("Status of node ~p", [Node]),
292
display_call_result(Node, {rabbit, status, []});
294
action(cluster_status, Node, [], _Opts, Inform) ->
295
Inform("Cluster status of node ~p", [Node]),
296
display_call_result(Node, {rabbit_mnesia, status, []});
298
action(environment, Node, _App, _Opts, Inform) ->
299
Inform("Application environment of node ~p", [Node]),
300
display_call_result(Node, {rabbit, environment, []});
302
action(rotate_logs, Node, [], _Opts, Inform) ->
303
Inform("Reopening logs for node ~p", [Node]),
304
call(Node, {rabbit, rotate_logs, [""]});
305
action(rotate_logs, Node, Args = [Suffix], _Opts, Inform) ->
306
Inform("Rotating logs to files with suffix \"~s\"", [Suffix]),
307
call(Node, {rabbit, rotate_logs, Args});
309
action(close_connection, Node, [PidStr, Explanation], _Opts, Inform) ->
310
Inform("Closing connection \"~s\"", [PidStr]),
311
rpc_call(Node, rabbit_networking, close_connection,
312
[rabbit_misc:string_to_pid(PidStr), Explanation]);
314
action(add_user, Node, Args = [Username, _Password], _Opts, Inform) ->
315
Inform("Creating user \"~s\"", [Username]),
316
call(Node, {rabbit_auth_backend_internal, add_user, Args});
318
action(delete_user, Node, Args = [_Username], _Opts, Inform) ->
319
Inform("Deleting user \"~s\"", Args),
320
call(Node, {rabbit_auth_backend_internal, delete_user, Args});
322
action(change_password, Node, Args = [Username, _Newpassword], _Opts, Inform) ->
323
Inform("Changing password for user \"~s\"", [Username]),
324
call(Node, {rabbit_auth_backend_internal, change_password, Args});
326
action(clear_password, Node, Args = [Username], _Opts, Inform) ->
327
Inform("Clearing password for user \"~s\"", [Username]),
328
call(Node, {rabbit_auth_backend_internal, clear_password, Args});
330
action(set_user_tags, Node, [Username | TagsStr], _Opts, Inform) ->
331
Tags = [list_to_atom(T) || T <- TagsStr],
332
Inform("Setting tags for user \"~s\" to ~p", [Username, Tags]),
333
rpc_call(Node, rabbit_auth_backend_internal, set_tags,
334
[list_to_binary(Username), Tags]);
336
action(list_users, Node, [], _Opts, Inform) ->
337
Inform("Listing users", []),
339
call(Node, {rabbit_auth_backend_internal, list_users, []}),
340
rabbit_auth_backend_internal:user_info_keys());
342
action(add_vhost, Node, Args = [_VHostPath], _Opts, Inform) ->
343
Inform("Creating vhost \"~s\"", Args),
344
call(Node, {rabbit_vhost, add, Args});
346
action(delete_vhost, Node, Args = [_VHostPath], _Opts, Inform) ->
347
Inform("Deleting vhost \"~s\"", Args),
348
call(Node, {rabbit_vhost, delete, Args});
350
action(list_vhosts, Node, Args, _Opts, Inform) ->
351
Inform("Listing vhosts", []),
352
ArgAtoms = default_if_empty(Args, [name]),
353
display_info_list(call(Node, {rabbit_vhost, info_all, []}), ArgAtoms);
355
action(list_user_permissions, Node, Args = [_Username], _Opts, Inform) ->
356
Inform("Listing permissions for user ~p", Args),
357
display_info_list(call(Node, {rabbit_auth_backend_internal,
358
list_user_permissions, Args}),
359
rabbit_auth_backend_internal:user_perms_info_keys());
361
action(list_queues, Node, Args, Opts, Inform) ->
362
Inform("Listing queues", []),
363
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
364
ArgAtoms = default_if_empty(Args, [name, messages]),
365
display_info_list(rpc_call(Node, rabbit_amqqueue, info_all,
366
[VHostArg, ArgAtoms]),
369
action(list_exchanges, Node, Args, Opts, Inform) ->
370
Inform("Listing exchanges", []),
371
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
372
ArgAtoms = default_if_empty(Args, [name, type]),
373
display_info_list(rpc_call(Node, rabbit_exchange, info_all,
374
[VHostArg, ArgAtoms]),
377
action(list_bindings, Node, Args, Opts, Inform) ->
378
Inform("Listing bindings", []),
379
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
380
ArgAtoms = default_if_empty(Args, [source_name, source_kind,
381
destination_name, destination_kind,
382
routing_key, arguments]),
383
display_info_list(rpc_call(Node, rabbit_binding, info_all,
384
[VHostArg, ArgAtoms]),
387
action(list_connections, Node, Args, _Opts, Inform) ->
388
Inform("Listing connections", []),
389
ArgAtoms = default_if_empty(Args, [user, peer_host, peer_port, state]),
390
display_info_list(rpc_call(Node, rabbit_networking, connection_info_all,
394
action(list_channels, Node, Args, _Opts, Inform) ->
395
Inform("Listing channels", []),
396
ArgAtoms = default_if_empty(Args, [pid, user, consumer_count,
397
messages_unacknowledged]),
398
display_info_list(rpc_call(Node, rabbit_channel, info_all, [ArgAtoms]),
401
action(list_consumers, Node, _Args, Opts, Inform) ->
402
Inform("Listing consumers", []),
403
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
404
display_info_list(rpc_call(Node, rabbit_amqqueue, consumers_all, [VHostArg]),
405
rabbit_amqqueue:consumer_info_keys());
407
action(trace_on, Node, [], Opts, Inform) ->
408
VHost = proplists:get_value(?VHOST_OPT, Opts),
409
Inform("Starting tracing for vhost \"~s\"", [VHost]),
410
rpc_call(Node, rabbit_trace, start, [list_to_binary(VHost)]);
412
action(trace_off, Node, [], Opts, Inform) ->
413
VHost = proplists:get_value(?VHOST_OPT, Opts),
414
Inform("Stopping tracing for vhost \"~s\"", [VHost]),
415
rpc_call(Node, rabbit_trace, stop, [list_to_binary(VHost)]);
417
action(set_vm_memory_high_watermark, Node, [Arg], _Opts, Inform) ->
418
Frac = list_to_float(case string:chr(Arg, $.) of
422
Inform("Setting memory threshold on ~p to ~p", [Node, Frac]),
423
rpc_call(Node, vm_memory_monitor, set_vm_memory_high_watermark, [Frac]);
425
action(set_permissions, Node, [Username, CPerm, WPerm, RPerm], Opts, Inform) ->
426
VHost = proplists:get_value(?VHOST_OPT, Opts),
427
Inform("Setting permissions for user \"~s\" in vhost \"~s\"",
429
call(Node, {rabbit_auth_backend_internal, set_permissions,
430
[Username, VHost, CPerm, WPerm, RPerm]});
432
action(clear_permissions, Node, [Username], Opts, Inform) ->
433
VHost = proplists:get_value(?VHOST_OPT, Opts),
434
Inform("Clearing permissions for user \"~s\" in vhost \"~s\"",
436
call(Node, {rabbit_auth_backend_internal, clear_permissions,
439
action(list_permissions, Node, [], Opts, Inform) ->
440
VHost = proplists:get_value(?VHOST_OPT, Opts),
441
Inform("Listing permissions in vhost \"~s\"", [VHost]),
442
display_info_list(call(Node, {rabbit_auth_backend_internal,
443
list_vhost_permissions, [VHost]}),
444
rabbit_auth_backend_internal:vhost_perms_info_keys());
446
action(set_parameter, Node, [Component, Key, Value], Opts, Inform) ->
447
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
448
Inform("Setting runtime parameter ~p for component ~p to ~p",
449
[Key, Component, Value]),
450
rpc_call(Node, rabbit_runtime_parameters, parse_set,
451
[VHostArg, list_to_binary(Component), list_to_binary(Key), Value]);
453
action(clear_parameter, Node, [Component, Key], Opts, Inform) ->
454
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
455
Inform("Clearing runtime parameter ~p for component ~p", [Key, Component]),
456
rpc_call(Node, rabbit_runtime_parameters, clear, [VHostArg,
457
list_to_binary(Component),
458
list_to_binary(Key)]);
460
action(list_parameters, Node, [], Opts, Inform) ->
461
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
462
Inform("Listing runtime parameters", []),
464
rpc_call(Node, rabbit_runtime_parameters, list_formatted, [VHostArg]),
465
rabbit_runtime_parameters:info_keys());
467
action(set_policy, Node, [Key, Pattern, Defn | Prio], Opts, Inform)
468
when Prio == [] orelse length(Prio) == 1 ->
469
Msg = "Setting policy ~p for pattern ~p to ~p",
470
{InformMsg, Prio1} = case Prio of [] -> {Msg, undefined};
471
[P] -> {Msg ++ " with priority ~s", P}
473
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
474
Inform(InformMsg, [Key, Pattern, Defn] ++ Prio),
475
rpc_call(Node, rabbit_policy, parse_set,
476
[VHostArg, list_to_binary(Key), Pattern, Defn, Prio1]);
478
action(clear_policy, Node, [Key], Opts, Inform) ->
479
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
480
Inform("Clearing policy ~p", [Key]),
481
rpc_call(Node, rabbit_policy, delete, [VHostArg, list_to_binary(Key)]);
483
action(list_policies, Node, [], Opts, Inform) ->
484
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
485
Inform("Listing policies", []),
486
display_info_list(rpc_call(Node, rabbit_policy, list_formatted, [VHostArg]),
487
rabbit_policy:info_keys());
489
action(report, Node, _Args, _Opts, Inform) ->
490
Inform("Reporting server status on ~p~n~n", [erlang:universaltime()]),
491
[begin ok = action(Action, N, [], [], Inform), io:nl() end ||
492
N <- unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running]),
493
Action <- [status, cluster_status, environment]],
494
VHosts = unsafe_rpc(Node, rabbit_vhost, list, []),
495
[print_report(Node, Q) || Q <- ?GLOBAL_QUERIES],
496
[print_report(Node, Q, [V]) || Q <- ?VHOST_QUERIES, V <- VHosts],
499
action(eval, Node, [Expr], _Opts, _Inform) ->
500
case erl_scan:string(Expr) of
502
case erl_parse:parse_exprs(Scanned) of
503
{ok, Parsed} -> {value, Value, _} =
505
Node, erl_eval, exprs, [Parsed, []]),
506
io:format("~p~n", [Value]),
508
{error, E} -> {error_string, format_parse_error(E)}
511
{error_string, format_parse_error(E)}
514
format_parse_error({_Line, Mod, Err}) -> lists:flatten(Mod:format_error(Err)).
516
%%----------------------------------------------------------------------------
518
wait_for_application(Node, PidFile, Application, Inform) ->
519
Pid = read_pid_file(PidFile, true),
520
Inform("pid is ~s", [Pid]),
521
wait_for_application(Node, Pid, Application).
523
wait_for_application(Node, Pid, rabbit_and_plugins) ->
524
wait_for_startup(Node, Pid);
525
wait_for_application(Node, Pid, Application) ->
526
while_process_is_alive(
527
Node, Pid, fun() -> rabbit_nodes:is_running(Node, Application) end).
529
wait_for_startup(Node, Pid) ->
530
while_process_is_alive(
531
Node, Pid, fun() -> rpc:call(Node, rabbit, await_startup, []) =:= ok end).
533
while_process_is_alive(Node, Pid, Activity) ->
534
case process_up(Pid) of
535
true -> case Activity() of
537
false -> timer:sleep(?EXTERNAL_CHECK_INTERVAL),
538
while_process_is_alive(Node, Pid, Activity)
540
false -> {error, process_not_running}
543
wait_for_process_death(Pid) ->
544
case process_up(Pid) of
545
true -> timer:sleep(?EXTERNAL_CHECK_INTERVAL),
546
wait_for_process_death(Pid);
550
read_pid_file(PidFile, Wait) ->
551
case {file:read_file(PidFile), Wait} of
553
S = binary_to_list(Bin),
554
{match, [PidS]} = re:run(S, "[^\\s]+",
555
[{capture, all, list}]),
556
try list_to_integer(PidS)
557
catch error:badarg ->
558
exit({error, {garbage_in_pid_file, PidFile}})
561
{{error, enoent}, true} ->
562
timer:sleep(?EXTERNAL_CHECK_INTERVAL),
563
read_pid_file(PidFile, Wait);
564
{{error, _} = E, _} ->
565
exit({error, {could_not_read_pid, E}})
568
% Test using some OS clunkiness since we shouldn't trust
569
% rpc:call(os, getpid, []) at this point
571
with_os([{unix, fun () ->
575
Res = os:cmd("tasklist /nh /fi \"pid eq " ++
577
case re:run(Res, "erl\\.exe", [{capture, none}]) of
584
{OsFamily, _} = os:type(),
585
case proplists:get_value(OsFamily, Handlers) of
586
undefined -> throw({unsupported_os, OsFamily});
591
Port = erlang:open_port({spawn, "ps -p " ++ Pid},
592
[exit_status, {line, 16384},
593
use_stdio, stderr_to_stdout]),
598
{Port, {exit_status, Rc}} -> Rc;
599
{Port, _} -> exit_loop(Port)
602
%%----------------------------------------------------------------------------
604
default_if_empty(List, Default) when is_list(List) ->
605
if List == [] -> Default;
606
true -> [list_to_atom(X) || X <- List]
609
display_info_list(Results, InfoItemKeys) when is_list(Results) ->
611
fun (Result) -> display_row(
612
[format_info_item(proplists:get_value(X, Result)) ||
614
end, lists:sort(Results)),
616
display_info_list(Other, _) ->
620
io:fwrite(string:join(Row, "\t")),
623
-define(IS_U8(X), (X >= 0 andalso X =< 255)).
624
-define(IS_U16(X), (X >= 0 andalso X =< 65535)).
626
format_info_item(#resource{name = Name}) ->
628
format_info_item({N1, N2, N3, N4} = Value) when
629
?IS_U8(N1), ?IS_U8(N2), ?IS_U8(N3), ?IS_U8(N4) ->
630
rabbit_misc:ntoa(Value);
631
format_info_item({K1, K2, K3, K4, K5, K6, K7, K8} = Value) when
632
?IS_U16(K1), ?IS_U16(K2), ?IS_U16(K3), ?IS_U16(K4),
633
?IS_U16(K5), ?IS_U16(K6), ?IS_U16(K7), ?IS_U16(K8) ->
634
rabbit_misc:ntoa(Value);
635
format_info_item(Value) when is_pid(Value) ->
636
rabbit_misc:pid_to_string(Value);
637
format_info_item(Value) when is_binary(Value) ->
639
format_info_item(Value) when is_atom(Value) ->
640
escape(atom_to_list(Value));
641
format_info_item([{TableEntryKey, TableEntryType, _TableEntryValue} | _] =
642
Value) when is_binary(TableEntryKey) andalso
643
is_atom(TableEntryType) ->
644
io_lib:format("~1000000000000p", [prettify_amqp_table(Value)]);
645
format_info_item([T | _] = Value)
646
when is_tuple(T) orelse is_pid(T) orelse is_binary(T) orelse is_atom(T) orelse
649
lists:nthtail(2, lists:append(
650
[", " ++ format_info_item(E) || E <- Value])) ++ "]";
651
format_info_item(Value) ->
652
io_lib:format("~w", [Value]).
654
display_call_result(Node, MFA) ->
655
case call(Node, MFA) of
656
{badrpc, _} = Res -> throw(Res);
657
Res -> io:format("~p~n", [Res]),
661
unsafe_rpc(Node, Mod, Fun, Args) ->
662
case rpc_call(Node, Mod, Fun, Args) of
663
{badrpc, _} = Res -> throw(Res);
667
call(Node, {Mod, Fun, Args}) ->
668
rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary/1, Args)).
670
rpc_call(Node, Mod, Fun, Args) ->
671
rpc:call(Node, Mod, Fun, Args, ?RPC_TIMEOUT).
673
%% escape does C-style backslash escaping of non-printable ASCII
674
%% characters. We don't escape characters above 127, since they may
675
%% form part of UTF-8 strings.
677
escape(Atom) when is_atom(Atom) -> escape(atom_to_list(Atom));
678
escape(Bin) when is_binary(Bin) -> escape(binary_to_list(Bin));
679
escape(L) when is_list(L) -> escape_char(lists:reverse(L), []).
681
escape_char([$\\ | T], Acc) ->
682
escape_char(T, [$\\, $\\ | Acc]);
683
escape_char([X | T], Acc) when X >= 32, X /= 127 ->
684
escape_char(T, [X | Acc]);
685
escape_char([X | T], Acc) ->
686
escape_char(T, [$\\, $0 + (X bsr 6), $0 + (X band 8#070 bsr 3),
687
$0 + (X band 7) | Acc]);
688
escape_char([], Acc) ->
691
prettify_amqp_table(Table) ->
692
[{escape(K), prettify_typed_amqp_value(T, V)} || {K, T, V} <- Table].
694
prettify_typed_amqp_value(longstr, Value) -> escape(Value);
695
prettify_typed_amqp_value(table, Value) -> prettify_amqp_table(Value);
696
prettify_typed_amqp_value(array, Value) -> [prettify_typed_amqp_value(T, V) ||
698
prettify_typed_amqp_value(_Type, Value) -> Value.