56
56
x(<<"upstream2">>),
57
57
x(<<"fed12.downstream">>)]).
59
multiple_uris_test() ->
60
%% We can't use a direct connection for Kill() to work.
61
set_param("federation-upstream", "localhost",
62
"{\"uri\": [\"amqp://localhost\", \"amqp://localhost:5672\"]}"),
64
{ok, Conn} = amqp_connection:start(#amqp_params_network{}),
65
{ok, Ch} = amqp_connection:open_channel(Conn),
67
amqp_connection:close(Conn)
69
WithCh(fun (Ch) -> declare_all(Ch, ?UPSTREAM_DOWNSTREAM) end),
70
expect_uris([<<"amqp://localhost">>, <<"amqp://localhost:5672">>]),
71
WithCh(fun (Ch) -> delete_all(Ch, ?UPSTREAM_DOWNSTREAM) end),
72
%% Put back how it was
73
set_param("federation-upstream", "localhost", "{\"uri\": \"amqp://\"}").
75
expect_uris([]) -> ok;
76
expect_uris(URIs) -> [Link] = rabbit_federation_status:status(),
77
URI = pget(uri, Link),
78
kill_only_connection(n("rabbit-test")),
79
expect_uris(URIs -- [URI]).
81
kill_only_connection(Node) ->
82
case connection_pids(Node) of
83
[Pid] -> rabbit_networking:close_connection(Pid, "why not?"),
84
wait_for_pid_to_die(Node, Pid);
85
_ -> timer:sleep(1000),
86
kill_only_connection(Node)
89
wait_for_pid_to_die(Node, Pid) ->
90
case connection_pids(Node) of
91
[Pid] -> timer:sleep(1000),
92
wait_for_pid_to_die(Node, Pid);
59
97
multiple_downstreams_test() ->
229
267
suffix({Nodename, _}, XName) ->
230
X = #exchange{name = r(list_to_binary(XName))},
231
268
rpc:call(n(Nodename), rabbit_federation_db, get_active_suffix,
232
269
[r(<<"fed.downstream">>),
233
#upstream{name = list_to_binary(Nodename),
234
exchange = X}, none]).
270
#upstream{name = list_to_binary(Nodename),
271
exchange_name = list_to_binary(XName)}, none]).
237
274
{_, NodeHost} = rabbit_nodes:parts(node()),
455
492
set_pol("dyn", "^dyn\\.", policy("all")),
456
493
assert_connections(Xs, [<<"localhost">>, <<"local5673">>]),
495
%% Change policy - links change
496
set_pol("dyn", "^dyn\\.", policy("localhost")),
497
assert_connections(Xs, [<<"localhost">>]),
458
499
%% Unfederate them - links disappear
459
500
clear_pol("dyn"),
460
501
assert_connections(Xs, [])
654
695
case rabbit_policy:get(<<"federation-upstream-set">>, r(Name)) of
656
697
X = #exchange{name = r(Name)},
657
[{Name, U#upstream.name, name(U#upstream.exchange)} ||
698
[{Name, U#upstream.name, U#upstream.exchange_name} ||
658
699
U <- rabbit_federation_upstream:from_set(Set, X)];
659
700
{error, not_found} ->
668
709
rabbit_federation_status:status(), Links),
669
710
?assertEqual([], Remaining),
713
connection_pids(Node) ->
715
rpc:call(Node, rabbit_networking, connection_info_all, [[pid]])].