18
18
-include("rabbit.hrl").
19
19
-include("rabbit_framing.hrl").
21
-export([recover/0, callback/3, declare/6,
21
-export([recover/0, policy_changed/2, callback/4, declare/6,
22
22
assert_equivalence/6, assert_args_equivalence/2, check_type/1,
23
lookup/1, lookup_or_die/1, list/1, update_scratch/2,
23
lookup/1, lookup_or_die/1, list/1, lookup_scratch/2, update_scratch/3,
24
24
info_keys/0, info/1, info/2, info_all/1, info_all/2,
25
25
route/2, delete/2]).
26
26
%% these must be run inside a mnesia tx
27
-export([maybe_auto_delete/1, serial/1, peek_serial/1]).
27
-export([maybe_auto_delete/1, serial/1, peek_serial/1, update/2]).
29
29
%%----------------------------------------------------------------------------
37
37
-type(fun_name() :: atom()).
39
39
-spec(recover/0 :: () -> [name()]).
40
-spec(callback/3:: (rabbit_types:exchange(), fun_name(), [any()]) -> 'ok').
41
(rabbit_types:exchange(), fun_name(),
42
fun((boolean()) -> non_neg_integer()) | atom(),
44
-spec(policy_changed/2 ::
45
(rabbit_types:exchange(), rabbit_types:exchange()) -> 'ok').
42
47
(name(), type(), boolean(), boolean(), boolean(),
43
48
rabbit_framing:amqp_table())
58
63
(name()) -> rabbit_types:exchange() |
59
64
rabbit_types:channel_exit()).
60
65
-spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:exchange()]).
61
-spec(update_scratch/2 :: (name(), fun((any()) -> any())) -> 'ok').
66
-spec(lookup_scratch/2 :: (name(), atom()) ->
67
rabbit_types:ok(term()) |
68
rabbit_types:error('not_found')).
69
-spec(update_scratch/3 :: (name(), atom(), fun((any()) -> any())) -> 'ok').
72
fun((rabbit_types:exchange()) -> rabbit_types:exchange())) -> 'ok').
62
73
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
63
74
-spec(info/1 :: (rabbit_types:exchange()) -> rabbit_types:infos()).
76
87
-spec(maybe_auto_delete/1::
77
88
(rabbit_types:exchange())
78
89
-> 'not_deleted' | {'deleted', rabbit_binding:deletions()}).
79
-spec(serial/1 :: (rabbit_types:exchange()) -> 'none' | pos_integer()).
90
-spec(serial/1 :: (rabbit_types:exchange()) ->
91
fun((boolean()) -> 'none' | pos_integer())).
80
92
-spec(peek_serial/1 :: (name()) -> pos_integer() | 'undefined').
84
96
%%----------------------------------------------------------------------------
86
-define(INFO_KEYS, [name, type, durable, auto_delete, internal, arguments]).
98
-define(INFO_KEYS, [name, type, durable, auto_delete, internal, arguments,
89
102
Xs = rabbit_misc:table_filter(
98
rabbit_exchange:callback(X, create, [map_create_tx(Tx), X])
111
callback(X, create, map_create_tx(Tx), [X])
100
113
rabbit_durable_exchange),
101
114
[XName || #exchange{name = XName} <- Xs].
103
callback(#exchange{type = XType}, Fun, Args) ->
104
apply(type_to_module(XType), Fun, Args).
116
callback(X = #exchange{type = XType}, Fun, Serial0, Args) ->
117
Serial = fun (Bool) ->
119
_ when is_atom(Serial0) -> Serial0;
123
[ok = apply(M, Fun, [Serial(M:serialise_events(X)) | Args])
124
|| M <- decorators()],
125
Module = type_to_module(XType),
126
apply(Module, Fun, [Serial(Module:serialise_events()) | Args]).
128
%% Dispatches the `policy_changed' callback through callback/4.
%% The first exchange record is the one callback/4 is invoked on; both
%% records are forwarded as the callback arguments, and `none' is passed
%% as the serial.
policy_changed(OldX, NewX) ->
    callback(OldX, policy_changed, none, [OldX, NewX]).
130
serialise_events(X = #exchange{type = Type}) ->
131
case [Serialise || M <- decorators(),
132
Serialise <- [M:serialise_events(X)],
133
Serialise == true] of
134
[] -> (type_to_module(Type)):serialise_events();
138
serial(#exchange{name = XName} = X) ->
139
Serial = case serialise_events(X) of
140
true -> next_serial(XName);
143
fun (true) -> Serial;
148
[M || {_, M} <- rabbit_registry:lookup_all(exchange_decorator)].
106
150
declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
107
X = #exchange{name = XName,
110
auto_delete = AutoDelete,
151
X = rabbit_policy:set(#exchange{name = XName,
154
auto_delete = AutoDelete,
113
157
XT = type_to_module(Type),
114
158
%% Deliberately crash with a badmatch if validation does not return ok
115
159
ok = XT:validate(X),
131
175
fun ({new, Exchange}, Tx) ->
132
ok = XT:create(map_create_tx(Tx), Exchange),
176
ok = callback(X, create, map_create_tx(Tx), [Exchange]),
133
177
rabbit_event:notify_if(not Tx, exchange_created, info(Exchange)),
135
179
({existing, Exchange}, _Tx) ->
141
185
%% Translates the boolean transaction flag into the atom handed to the
%% exchange `create' callback: `true' becomes `transaction' and `false'
%% becomes `none'.
map_create_tx(true) -> transaction;
142
186
map_create_tx(false) -> none.
144
store(X = #exchange{name = Name, type = Type}) ->
145
ok = mnesia:write(rabbit_exchange, X, write),
146
case (type_to_module(Type)):serialise_events() of
147
true -> S = #exchange_serial{name = Name, next = 1},
148
ok = mnesia:write(rabbit_exchange_serial, S, write);
188
%% Writes the exchange record to the rabbit_exchange table under a write
%% lock. Fails with a badmatch if mnesia:write/3 returns anything but ok.
%% NOTE(review): mnesia:write/3 presumably needs to run inside a mnesia
%% activity -- confirm callers invoke this within a transaction.
store(Exchange) ->
    ok = mnesia:write(rabbit_exchange, Exchange, write).
152
190
%% Used with binaries sent over the wire; the type may not exist.
153
191
check_type(TypeBin) ->
201
239
#exchange{name = rabbit_misc:r(VHostPath, exchange), _ = '_'}).
203
update_scratch(Name, Fun) ->
241
lookup_scratch(Name, App) ->
243
{ok, #exchange{scratches = undefined}} ->
245
{ok, #exchange{scratches = Scratches}} ->
246
case orddict:find(App, Scratches) of
247
{ok, Value} -> {ok, Value};
248
error -> {error, not_found}
250
{error, not_found} ->
254
update_scratch(Name, App, Fun) ->
204
255
rabbit_misc:execute_mnesia_transaction(
206
case mnesia:wread({rabbit_exchange, Name}) of
207
[X = #exchange{durable = Durable, scratch = Scratch}] ->
208
X1 = X#exchange{scratch = Fun(Scratch)},
209
ok = mnesia:write(rabbit_exchange, X1, write),
211
true -> ok = mnesia:write(rabbit_durable_exchange,
258
fun(X = #exchange{scratches = Scratches0}) ->
259
Scratches1 = case Scratches0 of
260
undefined -> orddict:new();
263
Scratch = case orddict:find(App, Scratches1) of
267
Scratches2 = orddict:store(
268
App, Fun(Scratch), Scratches1),
269
X#exchange{scratches = Scratches2}
274
case mnesia:wread({rabbit_exchange, Name}) of
275
[X = #exchange{durable = Durable}] ->
277
ok = mnesia:write(rabbit_exchange, X1, write),
279
true -> ok = mnesia:write(rabbit_durable_exchange, X1, write);
220
286
%% Returns the list of info item keys defined by the ?INFO_KEYS macro.
info_keys() ->
    ?INFO_KEYS.
222
288
map(VHostPath, F) ->
232
298
i(auto_delete, #exchange{auto_delete = AutoDelete}) -> AutoDelete;
233
299
i(internal, #exchange{internal = Internal}) -> Internal;
234
300
i(arguments, #exchange{arguments = Arguments}) -> Arguments;
301
i(policy, X) -> case rabbit_policy:name(X) of
235
305
i(Item, _) -> throw({bad_argument, Item}).
237
307
%% Returns the info items for an exchange record by passing every key in
%% ?INFO_KEYS, together with the record, to infos/2. Only accepts an
%% #exchange{} record; any other argument raises function_clause.
info(#exchange{} = Exchange) ->
    infos(?INFO_KEYS, Exchange).
346
416
Bindings = rabbit_binding:remove_for_source(XName),
347
417
{deleted, X, Bindings, rabbit_binding:remove_for_destination(XName)}.
349
serial(#exchange{name = XName, type = Type}) ->
350
case (type_to_module(Type)):serialise_events() of
351
true -> next_serial(XName);
355
419
next_serial(XName) ->
356
[#exchange_serial{next = Serial}] =
357
mnesia:read(rabbit_exchange_serial, XName, write),
420
Serial = peek_serial(XName, write),
358
421
ok = mnesia:write(rabbit_exchange_serial,
359
422
#exchange_serial{name = XName, next = Serial + 1}, write),
362
peek_serial(XName) ->
363
case mnesia:read({rabbit_exchange_serial, XName}) of
425
%% Looks up the serial stored for the named exchange without advancing
%% it, by delegating to peek_serial/2 with the `read' lock type.
peek_serial(XName) ->
    peek_serial(XName, read).
427
peek_serial(XName, LockType) ->
428
case mnesia:read(rabbit_exchange_serial, XName, LockType) of
364
429
[#exchange_serial{next = Serial}] -> Serial;
368
433
invalid_module(T) ->