~ubuntu-branches/ubuntu/saucy/rabbitmq-server/saucy

« back to all changes in this revision

Viewing changes to src/rabbit_exchange.erl

  • Committer: Package Import Robot
  • Author(s): Emile Joubert
  • Date: 2012-11-19 11:42:31 UTC
  • mfrom: (0.2.18) (0.1.32 sid)
  • Revision ID: package-import@ubuntu.com-20121119114231-hvapkn4akng09etr
Tags: 3.0.0-1
New upstream release

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
-include("rabbit.hrl").
19
19
-include("rabbit_framing.hrl").
20
20
 
21
 
-export([recover/0, callback/3, declare/6,
 
21
-export([recover/0, policy_changed/2, callback/4, declare/6,
22
22
         assert_equivalence/6, assert_args_equivalence/2, check_type/1,
23
 
         lookup/1, lookup_or_die/1, list/1, update_scratch/2,
 
23
         lookup/1, lookup_or_die/1, list/1, lookup_scratch/2, update_scratch/3,
24
24
         info_keys/0, info/1, info/2, info_all/1, info_all/2,
25
25
         route/2, delete/2]).
26
26
%% these must be run inside a mnesia tx
27
 
-export([maybe_auto_delete/1, serial/1, peek_serial/1]).
 
27
-export([maybe_auto_delete/1, serial/1, peek_serial/1, update/2]).
28
28
 
29
29
%%----------------------------------------------------------------------------
30
30
 
37
37
-type(fun_name() :: atom()).
38
38
 
39
39
-spec(recover/0 :: () -> [name()]).
40
 
-spec(callback/3:: (rabbit_types:exchange(), fun_name(), [any()]) -> 'ok').
 
40
-spec(callback/4::
 
41
        (rabbit_types:exchange(), fun_name(),
 
42
         fun((boolean()) -> non_neg_integer()) | atom(),
 
43
            [any()]) -> 'ok').
 
44
-spec(policy_changed/2 ::
 
45
        (rabbit_types:exchange(), rabbit_types:exchange()) -> 'ok').
41
46
-spec(declare/6 ::
42
47
        (name(), type(), boolean(), boolean(), boolean(),
43
48
         rabbit_framing:amqp_table())
58
63
        (name()) -> rabbit_types:exchange() |
59
64
                    rabbit_types:channel_exit()).
60
65
-spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:exchange()]).
61
 
-spec(update_scratch/2 :: (name(), fun((any()) -> any())) -> 'ok').
 
66
-spec(lookup_scratch/2 :: (name(), atom()) ->
 
67
                               rabbit_types:ok(term()) |
 
68
                               rabbit_types:error('not_found')).
 
69
-spec(update_scratch/3 :: (name(), atom(), fun((any()) -> any())) -> 'ok').
 
70
-spec(update/2 ::
 
71
        (name(),
 
72
         fun((rabbit_types:exchange()) -> rabbit_types:exchange())) -> 'ok').
62
73
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
63
74
-spec(info/1 :: (rabbit_types:exchange()) -> rabbit_types:infos()).
64
75
-spec(info/2 ::
76
87
-spec(maybe_auto_delete/1::
77
88
        (rabbit_types:exchange())
78
89
        -> 'not_deleted' | {'deleted', rabbit_binding:deletions()}).
79
 
-spec(serial/1 :: (rabbit_types:exchange()) -> 'none' | pos_integer()).
 
90
-spec(serial/1 :: (rabbit_types:exchange()) ->
 
91
                       fun((boolean()) -> 'none' | pos_integer())).
80
92
-spec(peek_serial/1 :: (name()) -> pos_integer() | 'undefined').
81
93
 
82
94
-endif.
83
95
 
84
96
%%----------------------------------------------------------------------------
85
97
 
86
 
-define(INFO_KEYS, [name, type, durable, auto_delete, internal, arguments]).
 
98
-define(INFO_KEYS, [name, type, durable, auto_delete, internal, arguments,
 
99
                    policy]).
87
100
 
88
101
recover() ->
89
102
    Xs = rabbit_misc:table_filter(
95
108
                       true  -> store(X);
96
109
                       false -> ok
97
110
                   end,
98
 
                   rabbit_exchange:callback(X, create, [map_create_tx(Tx), X])
 
111
                   callback(X, create, map_create_tx(Tx), [X])
99
112
           end,
100
113
           rabbit_durable_exchange),
101
114
    [XName || #exchange{name = XName} <- Xs].
102
115
 
103
 
callback(#exchange{type = XType}, Fun, Args) ->
104
 
    apply(type_to_module(XType), Fun, Args).
 
116
callback(X = #exchange{type = XType}, Fun, Serial0, Args) ->
 
117
    Serial = fun (Bool) ->
 
118
                     case Serial0 of
 
119
                         _ when is_atom(Serial0) -> Serial0;
 
120
                         _                       -> Serial0(Bool)
 
121
                     end
 
122
             end,
 
123
    [ok = apply(M, Fun, [Serial(M:serialise_events(X)) | Args])
 
124
     || M <- decorators()],
 
125
    Module = type_to_module(XType),
 
126
    apply(Module, Fun, [Serial(Module:serialise_events()) | Args]).
 
127
 
 
128
policy_changed(X1, X2) -> callback(X1, policy_changed, none, [X1, X2]).
 
129
 
 
130
serialise_events(X = #exchange{type = Type}) ->
 
131
    case [Serialise || M <- decorators(),
 
132
                       Serialise <- [M:serialise_events(X)],
 
133
                       Serialise == true] of
 
134
        [] -> (type_to_module(Type)):serialise_events();
 
135
        _  -> true
 
136
    end.
 
137
 
 
138
serial(#exchange{name = XName} = X) ->
 
139
    Serial = case serialise_events(X) of
 
140
                 true  -> next_serial(XName);
 
141
                 false -> none
 
142
             end,
 
143
    fun (true)  -> Serial;
 
144
        (false) -> none
 
145
    end.
 
146
 
 
147
decorators() ->
 
148
    [M || {_, M} <- rabbit_registry:lookup_all(exchange_decorator)].
105
149
 
106
150
declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
107
 
    X = #exchange{name        = XName,
108
 
                  type        = Type,
109
 
                  durable     = Durable,
110
 
                  auto_delete = AutoDelete,
111
 
                  internal    = Internal,
112
 
                  arguments   = Args},
 
151
    X = rabbit_policy:set(#exchange{name        = XName,
 
152
                                    type        = Type,
 
153
                                    durable     = Durable,
 
154
                                    auto_delete = AutoDelete,
 
155
                                    internal    = Internal,
 
156
                                    arguments   = Args}),
113
157
    XT = type_to_module(Type),
114
158
    %% We want to upset things if it isn't ok
115
159
    ok = XT:validate(X),
129
173
              end
130
174
      end,
131
175
      fun ({new, Exchange}, Tx) ->
132
 
              ok = XT:create(map_create_tx(Tx), Exchange),
 
176
              ok = callback(X, create, map_create_tx(Tx), [Exchange]),
133
177
              rabbit_event:notify_if(not Tx, exchange_created, info(Exchange)),
134
178
              Exchange;
135
179
          ({existing, Exchange}, _Tx) ->
141
185
map_create_tx(true)  -> transaction;
142
186
map_create_tx(false) -> none.
143
187
 
144
 
store(X = #exchange{name = Name, type = Type}) ->
145
 
    ok = mnesia:write(rabbit_exchange, X, write),
146
 
    case (type_to_module(Type)):serialise_events() of
147
 
        true  -> S = #exchange_serial{name = Name, next = 1},
148
 
                 ok = mnesia:write(rabbit_exchange_serial, S, write);
149
 
        false -> ok
150
 
    end.
 
188
store(X) -> ok = mnesia:write(rabbit_exchange, X, write).
151
189
 
152
190
%% Used with binaries sent over the wire; the type may not exist.
153
191
check_type(TypeBin) ->
200
238
      rabbit_exchange,
201
239
      #exchange{name = rabbit_misc:r(VHostPath, exchange), _ = '_'}).
202
240
 
203
 
update_scratch(Name, Fun) ->
 
241
lookup_scratch(Name, App) ->
 
242
    case lookup(Name) of
 
243
        {ok, #exchange{scratches = undefined}} ->
 
244
            {error, not_found};
 
245
        {ok, #exchange{scratches = Scratches}} ->
 
246
            case orddict:find(App, Scratches) of
 
247
                {ok, Value} -> {ok, Value};
 
248
                error       -> {error, not_found}
 
249
            end;
 
250
        {error, not_found} ->
 
251
            {error, not_found}
 
252
    end.
 
253
 
 
254
update_scratch(Name, App, Fun) ->
204
255
    rabbit_misc:execute_mnesia_transaction(
205
256
      fun() ->
206
 
              case mnesia:wread({rabbit_exchange, Name}) of
207
 
                  [X = #exchange{durable = Durable, scratch = Scratch}] ->
208
 
                      X1 = X#exchange{scratch = Fun(Scratch)},
209
 
                      ok = mnesia:write(rabbit_exchange, X1, write),
210
 
                      case Durable of
211
 
                          true -> ok = mnesia:write(rabbit_durable_exchange,
212
 
                                                    X1, write);
213
 
                          _    -> ok
214
 
                      end;
215
 
                  [] ->
216
 
                      ok
217
 
              end
 
257
              update(Name,
 
258
                     fun(X = #exchange{scratches = Scratches0}) ->
 
259
                             Scratches1 = case Scratches0 of
 
260
                                              undefined -> orddict:new();
 
261
                                              _         -> Scratches0
 
262
                                          end,
 
263
                             Scratch = case orddict:find(App, Scratches1) of
 
264
                                           {ok, S} -> S;
 
265
                                           error   -> undefined
 
266
                                       end,
 
267
                             Scratches2 = orddict:store(
 
268
                                            App, Fun(Scratch), Scratches1),
 
269
                             X#exchange{scratches = Scratches2}
 
270
                     end)
218
271
      end).
219
272
 
 
273
update(Name, Fun) ->
 
274
    case mnesia:wread({rabbit_exchange, Name}) of
 
275
        [X = #exchange{durable = Durable}] ->
 
276
            X1 = Fun(X),
 
277
            ok = mnesia:write(rabbit_exchange, X1, write),
 
278
            case Durable of
 
279
                true -> ok = mnesia:write(rabbit_durable_exchange, X1, write);
 
280
                _    -> ok
 
281
            end;
 
282
        [] ->
 
283
            ok
 
284
    end.
 
285
 
220
286
info_keys() -> ?INFO_KEYS.
221
287
 
222
288
map(VHostPath, F) ->
232
298
i(auto_delete, #exchange{auto_delete = AutoDelete}) -> AutoDelete;
233
299
i(internal,    #exchange{internal    = Internal})   -> Internal;
234
300
i(arguments,   #exchange{arguments   = Arguments})  -> Arguments;
 
301
i(policy,      X) ->  case rabbit_policy:name(X) of
 
302
                          none   -> '';
 
303
                          Policy -> Policy
 
304
                      end;
235
305
i(Item, _) -> throw({bad_argument, Item}).
236
306
 
237
307
info(X = #exchange{}) -> infos(?INFO_KEYS, X).
346
416
    Bindings = rabbit_binding:remove_for_source(XName),
347
417
    {deleted, X, Bindings, rabbit_binding:remove_for_destination(XName)}.
348
418
 
349
 
serial(#exchange{name = XName, type = Type}) ->
350
 
    case (type_to_module(Type)):serialise_events() of
351
 
        true  -> next_serial(XName);
352
 
        false -> none
353
 
    end.
354
 
 
355
419
next_serial(XName) ->
356
 
    [#exchange_serial{next = Serial}] =
357
 
        mnesia:read(rabbit_exchange_serial, XName, write),
 
420
    Serial = peek_serial(XName, write),
358
421
    ok = mnesia:write(rabbit_exchange_serial,
359
422
                      #exchange_serial{name = XName, next = Serial + 1}, write),
360
423
    Serial.
361
424
 
362
 
peek_serial(XName) ->
363
 
    case mnesia:read({rabbit_exchange_serial, XName}) of
 
425
peek_serial(XName) -> peek_serial(XName, read).
 
426
 
 
427
peek_serial(XName, LockType) ->
 
428
    case mnesia:read(rabbit_exchange_serial, XName, LockType) of
364
429
        [#exchange_serial{next = Serial}]  -> Serial;
365
 
        _                                  -> undefined
 
430
        _                                  -> 1
366
431
    end.
367
432
 
368
433
invalid_module(T) ->