~ubuntu-branches/ubuntu/saucy/rabbitmq-server/saucy

« back to all changes in this revision

Viewing changes to src/rabbit_amqqueue_process.erl

  • Committer: Package Import Robot
  • Author(s): Emile Joubert
  • Date: 2012-11-19 11:42:31 UTC
  • mfrom: (0.2.18) (0.1.32 sid)
  • Revision ID: package-import@ubuntu.com-20121119114231-hvapkn4akng09etr
Tags: 3.0.0-1
New upstream release

Show diffs side-by-side

added added

removed removed

Lines of Context:
26
26
 
27
27
-export([start_link/1, info_keys/0]).
28
28
 
29
 
-export([init_with_backing_queue_state/8]).
 
29
-export([init_with_backing_queue_state/7]).
30
30
 
31
31
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
32
32
         handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
76
76
-spec(start_link/1 ::
77
77
        (rabbit_types:amqqueue()) -> rabbit_types:ok_pid_or_error()).
78
78
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
79
 
-spec(init_with_backing_queue_state/8 ::
80
 
        (rabbit_types:amqqueue(), atom(), tuple(), any(), [any()],
 
79
-spec(init_with_backing_queue_state/7 ::
 
80
        (rabbit_types:amqqueue(), atom(), tuple(), any(),
81
81
         [rabbit_types:delivery()], pmon:pmon(), dict()) -> #q{}).
82
82
 
83
83
-endif.
86
86
 
87
87
-define(STATISTICS_KEYS,
88
88
        [pid,
 
89
         policy,
89
90
         exclusive_consumer_pid,
90
91
         exclusive_consumer_tag,
91
92
         messages_ready,
92
93
         messages_unacknowledged,
93
94
         messages,
94
95
         consumers,
 
96
         active_consumers,
95
97
         memory,
96
98
         slave_pids,
 
99
         synchronised_slave_pids,
97
100
         backing_queue_status
98
101
        ]).
99
102
 
103
106
         durable,
104
107
         auto_delete,
105
108
         arguments,
106
 
         owner_pid,
107
 
         slave_pids,
108
 
         synchronised_slave_pids
 
109
         owner_pid
109
110
        ]).
110
111
 
111
112
-define(INFO_KEYS,
112
 
        ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid, slave_pids]).
 
113
        ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
113
114
 
114
115
%%----------------------------------------------------------------------------
115
116
 
145
146
     {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
146
147
 
147
148
init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
148
 
                              RateTRef, AckTags, Deliveries, Senders, MTC) ->
 
149
                              RateTRef, Deliveries, Senders, MTC) ->
149
150
    case Owner of
150
151
        none -> ok;
151
152
        _    -> erlang:monitor(process, Owner)
167
168
               delayed_stop        = undefined,
168
169
               queue_monitors      = pmon:new(),
169
170
               msg_id_to_channel   = MTC},
170
 
    State1 = requeue_and_run(AckTags, process_args(
171
 
                                        rabbit_event:init_stats_timer(
172
 
                                          State, #q.stats_timer))),
173
 
    lists:foldl(
174
 
      fun (Delivery, StateN) -> deliver_or_enqueue(Delivery, StateN) end,
175
 
      State1, Deliveries).
 
171
    State1 = process_args(rabbit_event:init_stats_timer(State, #q.stats_timer)),
 
172
    lists:foldl(fun (Delivery, StateN) ->
 
173
                        deliver_or_enqueue(Delivery, true, StateN)
 
174
                end, State1, Deliveries).
176
175
 
177
176
terminate(shutdown = R,      State = #q{backing_queue = BQ}) ->
178
177
    terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
180
179
    terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
181
180
terminate(Reason,            State = #q{q             = #amqqueue{name = QName},
182
181
                                        backing_queue = BQ}) ->
183
 
    %% FIXME: How do we cancel active subscriptions?
184
182
    terminate_shutdown(
185
183
      fun (BQS) ->
186
184
              BQS1 = BQ:delete_and_terminate(Reason, BQS),
197
195
declare(Recover, From, State = #q{q                   = Q,
198
196
                                  backing_queue       = BQ,
199
197
                                  backing_queue_state = undefined}) ->
200
 
    case rabbit_amqqueue:internal_declare(Q, Recover) of
201
 
        not_found -> {stop, normal, not_found, State};
202
 
        Q         -> gen_server2:reply(From, {new, Q}),
203
 
                     ok = file_handle_cache:register_callback(
204
 
                            rabbit_amqqueue, set_maximum_since_use,
205
 
                            [self()]),
206
 
                     ok = rabbit_memory_monitor:register(
207
 
                            self(), {rabbit_amqqueue,
208
 
                                     set_ram_duration_target, [self()]}),
209
 
                     BQS = bq_init(BQ, Q, Recover),
210
 
                     State1 = process_args(State#q{backing_queue_state = BQS}),
211
 
                     rabbit_event:notify(queue_created,
212
 
                                         infos(?CREATION_EVENT_KEYS, State1)),
213
 
                     rabbit_event:if_enabled(State1, #q.stats_timer,
214
 
                                             fun() -> emit_stats(State1) end),
215
 
                     noreply(State1);
216
 
        Q1        -> {stop, normal, {existing, Q1}, State}
 
198
    case rabbit_amqqueue:internal_declare(Q, Recover =/= new) of
 
199
        #amqqueue{} = Q1 ->
 
200
            case matches(Recover, Q, Q1) of
 
201
                true ->
 
202
                    gen_server2:reply(From, {new, Q}),
 
203
                    ok = file_handle_cache:register_callback(
 
204
                           rabbit_amqqueue, set_maximum_since_use, [self()]),
 
205
                    ok = rabbit_memory_monitor:register(
 
206
                           self(), {rabbit_amqqueue,
 
207
                                    set_ram_duration_target, [self()]}),
 
208
                    BQS = bq_init(BQ, Q, Recover),
 
209
                    recovery_barrier(Recover),
 
210
                    State1 = process_args(State#q{backing_queue_state = BQS}),
 
211
                    rabbit_event:notify(queue_created,
 
212
                                        infos(?CREATION_EVENT_KEYS, State1)),
 
213
                    rabbit_event:if_enabled(State1, #q.stats_timer,
 
214
                                            fun() -> emit_stats(State1) end),
 
215
                    noreply(State1);
 
216
                false ->
 
217
                    {stop, normal, {existing, Q1}, State}
 
218
            end;
 
219
        Err ->
 
220
            {stop, normal, Err, State}
217
221
    end.
218
222
 
 
223
matches(new, Q1, Q2) ->
 
224
    %% i.e. not policy
 
225
    Q1#amqqueue.name            =:= Q2#amqqueue.name            andalso
 
226
    Q1#amqqueue.durable         =:= Q2#amqqueue.durable         andalso
 
227
    Q1#amqqueue.auto_delete     =:= Q2#amqqueue.auto_delete     andalso
 
228
    Q1#amqqueue.exclusive_owner =:= Q2#amqqueue.exclusive_owner andalso
 
229
    Q1#amqqueue.arguments       =:= Q2#amqqueue.arguments       andalso
 
230
    Q1#amqqueue.pid             =:= Q2#amqqueue.pid             andalso
 
231
    Q1#amqqueue.slave_pids      =:= Q2#amqqueue.slave_pids;
 
232
matches(_,  Q,   Q) -> true;
 
233
matches(_, _Q, _Q1) -> false.
 
234
 
219
235
bq_init(BQ, Q, Recover) ->
220
236
    Self = self(),
221
 
    BQ:init(Q, Recover,
 
237
    BQ:init(Q, Recover =/= new,
222
238
            fun (Mod, Fun) ->
223
239
                    rabbit_amqqueue:run_backing_queue(Self, Mod, Fun)
224
240
            end).
225
241
 
 
242
recovery_barrier(new) ->
 
243
    ok;
 
244
recovery_barrier(BarrierPid) ->
 
245
    MRef = erlang:monitor(process, BarrierPid),
 
246
    receive
 
247
        {BarrierPid, go}              -> erlang:demonitor(MRef, [flush]);
 
248
        {'DOWN', MRef, process, _, _} -> ok
 
249
    end.
 
250
 
226
251
process_args(State = #q{q = #amqqueue{arguments = Arguments}}) ->
227
252
    lists:foldl(
228
253
      fun({Arg, Fun}, State1) ->
232
257
              end
233
258
      end, State,
234
259
      [{<<"x-expires">>,                 fun init_expires/2},
235
 
       {<<"x-message-ttl">>,             fun init_ttl/2},
236
260
       {<<"x-dead-letter-exchange">>,    fun init_dlx/2},
237
 
       {<<"x-dead-letter-routing-key">>, fun init_dlx_routing_key/2}]).
 
261
       {<<"x-dead-letter-routing-key">>, fun init_dlx_routing_key/2},
 
262
       {<<"x-message-ttl">>,             fun init_ttl/2}]).
238
263
 
239
264
init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}).
240
265
 
279
304
        timed -> {ensure_sync_timer(State1), 0             }
280
305
    end.
281
306
 
282
 
backing_queue_module(#amqqueue{arguments = Args}) ->
283
 
    case rabbit_misc:table_lookup(Args, <<"x-ha-policy">>) of
284
 
        undefined -> {ok, BQM} = application:get_env(backing_queue_module),
285
 
                     BQM;
286
 
        _Policy   -> rabbit_mirror_queue_master
 
307
backing_queue_module(Q) ->
 
308
    case rabbit_mirror_queue_misc:is_mirrored(Q) of
 
309
        false -> {ok, BQM} = application:get_env(backing_queue_module),
 
310
                 BQM;
 
311
        true  -> rabbit_mirror_queue_master
287
312
    end.
288
313
 
289
314
ensure_sync_timer(State = #q{sync_timer_ref = undefined}) ->
482
507
    rabbit_misc:gb_trees_foreach(fun rabbit_misc:confirm_to_sender/2, CMs),
483
508
    State#q{msg_id_to_channel = MTC1}.
484
509
 
485
 
should_confirm_message(#delivery{msg_seq_no = undefined}, _State) ->
486
 
    never;
487
 
should_confirm_message(#delivery{sender     = SenderPid,
 
510
send_or_record_confirm(#delivery{msg_seq_no = undefined}, State) ->
 
511
    {never, State};
 
512
send_or_record_confirm(#delivery{sender     = SenderPid,
488
513
                                 msg_seq_no = MsgSeqNo,
489
514
                                 message    = #basic_message {
490
515
                                   is_persistent = true,
491
516
                                   id            = MsgId}},
492
 
                       #q{q = #amqqueue{durable = true}}) ->
493
 
    {eventually, SenderPid, MsgSeqNo, MsgId};
494
 
should_confirm_message(#delivery{sender     = SenderPid,
495
 
                                 msg_seq_no = MsgSeqNo},
496
 
                       _State) ->
497
 
    {immediately, SenderPid, MsgSeqNo}.
498
 
 
499
 
needs_confirming({eventually, _, _, _}) -> true;
500
 
needs_confirming(_)                     -> false.
501
 
 
502
 
maybe_record_confirm_message({eventually, SenderPid, MsgSeqNo, MsgId},
503
 
                             State = #q{msg_id_to_channel = MTC}) ->
504
 
    State#q{msg_id_to_channel =
505
 
                gb_trees:insert(MsgId, {SenderPid, MsgSeqNo}, MTC)};
506
 
maybe_record_confirm_message({immediately, SenderPid, MsgSeqNo}, State) ->
 
517
                       State = #q{q                 = #amqqueue{durable = true},
 
518
                                  msg_id_to_channel = MTC}) ->
 
519
    MTC1 = gb_trees:insert(MsgId, {SenderPid, MsgSeqNo}, MTC),
 
520
    {eventually, State#q{msg_id_to_channel = MTC1}};
 
521
send_or_record_confirm(#delivery{sender     = SenderPid,
 
522
                                 msg_seq_no = MsgSeqNo}, State) ->
507
523
    rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]),
508
 
    State;
509
 
maybe_record_confirm_message(_Confirm, State) ->
510
 
    State.
 
524
    {immediately, State}.
 
525
 
 
526
discard(#delivery{sender = SenderPid, message = #basic_message{id = MsgId}},
 
527
        State) ->
 
528
    %% fake an 'eventual' confirm from BQ; noop if not needed
 
529
    State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
 
530
        confirm_messages([MsgId], State),
 
531
    BQS1 = BQ:discard(MsgId, SenderPid, BQS),
 
532
    State1#q{backing_queue_state = BQS1}.
511
533
 
512
534
run_message_queue(State) ->
513
535
    State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
517
539
                            BQ:is_empty(BQS), State1),
518
540
    State2.
519
541
 
520
 
attempt_delivery(#delivery{sender = SenderPid, message = Message}, Confirm,
 
542
attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
 
543
                 Props = #message_properties{delivered = Delivered},
521
544
                 State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
522
545
    case BQ:is_duplicate(Message, BQS) of
523
546
        {false, BQS1} ->
524
547
            deliver_msgs_to_consumers(
525
 
              fun (AckRequired, State1 = #q{backing_queue_state = BQS2}) ->
526
 
                      Props = message_properties(Confirm, State1),
 
548
              fun (true, State1 = #q{backing_queue_state = BQS2}) ->
527
549
                      {AckTag, BQS3} = BQ:publish_delivered(
528
 
                                         AckRequired, Message, Props,
529
 
                                         SenderPid, BQS2),
530
 
                      {{Message, false, AckTag}, true,
531
 
                       State1#q{backing_queue_state = BQS3}}
 
550
                                         Message, Props, SenderPid, BQS2),
 
551
                      {{Message, Delivered, AckTag},
 
552
                       true, State1#q{backing_queue_state = BQS3}};
 
553
                  (false, State1) ->
 
554
                      {{Message, Delivered, undefined},
 
555
                       true, discard(Delivery, State1)}
532
556
              end, false, State#q{backing_queue_state = BQS1});
533
 
        {Duplicate, BQS1} ->
534
 
            %% if the message has previously been seen by the BQ then
535
 
            %% it must have been seen under the same circumstances as
536
 
            %% now: i.e. if it is now a deliver_immediately then it
537
 
            %% must have been before.
538
 
            {case Duplicate of
539
 
                 published -> true;
540
 
                 discarded -> false
541
 
             end,
542
 
             State#q{backing_queue_state = BQS1}}
 
557
        {published, BQS1} ->
 
558
            {true,  State#q{backing_queue_state = BQS1}};
 
559
        {discarded, BQS1} ->
 
560
            {false, State#q{backing_queue_state = BQS1}}
543
561
    end.
544
562
 
545
 
deliver_or_enqueue(Delivery = #delivery{message    = Message,
546
 
                                        sender     = SenderPid}, State) ->
547
 
    Confirm = should_confirm_message(Delivery, State),
548
 
    case attempt_delivery(Delivery, Confirm, State) of
549
 
        {true, State1} ->
550
 
            maybe_record_confirm_message(Confirm, State1);
551
 
        %% the next one is an optimisations
552
 
        %% TODO: optimise the Confirm =/= never case too
553
 
        {false, State1 = #q{ttl = 0, dlx = undefined}} when Confirm == never ->
554
 
            discard_delivery(Delivery, State1);
555
 
        {false, State1} ->
556
 
            State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
557
 
                maybe_record_confirm_message(Confirm, State1),
558
 
            Props = message_properties(Confirm, State2),
 
563
deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
 
564
                   Delivered, State) ->
 
565
    {Confirm, State1} = send_or_record_confirm(Delivery, State),
 
566
    Props = message_properties(Message, Confirm, Delivered, State),
 
567
    case attempt_delivery(Delivery, Props, State1) of
 
568
        {true, State2} ->
 
569
            State2;
 
570
        %% The next one is an optimisation
 
571
        {false, State2 = #q{ttl = 0, dlx = undefined}} ->
 
572
            discard(Delivery, State2);
 
573
        {false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} ->
559
574
            BQS1 = BQ:publish(Message, Props, SenderPid, BQS),
560
575
            ensure_ttl_timer(Props#message_properties.expiry,
561
576
                             State2#q{backing_queue_state = BQS1})
562
577
    end.
563
578
 
564
 
requeue_and_run(AckTags, State = #q{backing_queue = BQ}) ->
565
 
    run_backing_queue(BQ, fun (M, BQS) ->
566
 
                                  {_MsgIds, BQS1} = M:requeue(AckTags, BQS),
567
 
                                  BQS1
568
 
                          end, State).
 
579
requeue_and_run(AckTags, State = #q{backing_queue       = BQ,
 
580
                                    backing_queue_state = BQS}) ->
 
581
    {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
 
582
    run_message_queue(State#q{backing_queue_state = BQS1}).
569
583
 
570
 
fetch(AckRequired, State = #q{backing_queue_state = BQS,
571
 
                              backing_queue       = BQ}) ->
 
584
fetch(AckRequired, State = #q{backing_queue       = BQ,
 
585
                              backing_queue_state = BQS}) ->
572
586
    {Result, BQS1} = BQ:fetch(AckRequired, BQS),
573
587
    {Result, State#q{backing_queue_state = BQS1}}.
574
588
 
 
589
ack(AckTags, ChPid, State) ->
 
590
    subtract_acks(ChPid, AckTags, State,
 
591
                  fun (State1 = #q{backing_queue       = BQ,
 
592
                                   backing_queue_state = BQS}) ->
 
593
                          {_Guids, BQS1} = BQ:ack(AckTags, BQS),
 
594
                          State1#q{backing_queue_state = BQS1}
 
595
                  end).
 
596
 
 
597
requeue(AckTags, ChPid, State) ->
 
598
    subtract_acks(ChPid, AckTags, State,
 
599
                  fun (State1) -> requeue_and_run(AckTags, State1) end).
 
600
 
575
601
remove_consumer(ChPid, ConsumerTag, Queue) ->
576
602
    queue:filter(fun ({CP, #consumer{tag = CTag}}) ->
577
603
                         (CP /= ChPid) or (CTag /= ConsumerTag)
662
688
 
663
689
qname(#q{q = #amqqueue{name = QName}}) -> QName.
664
690
 
665
 
backing_queue_timeout(State = #q{backing_queue = BQ}) ->
666
 
    run_backing_queue(BQ, fun (M, BQS) -> M:timeout(BQS) end, State).
667
 
 
668
 
run_backing_queue(Mod, Fun, State = #q{backing_queue = BQ,
669
 
                                       backing_queue_state = BQS}) ->
670
 
    run_message_queue(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}).
 
691
backing_queue_timeout(State = #q{backing_queue       = BQ,
 
692
                                 backing_queue_state = BQS}) ->
 
693
    State#q{backing_queue_state = BQ:timeout(BQS)}.
671
694
 
672
695
subtract_acks(ChPid, AckTags, State, Fun) ->
673
696
    case lookup_ch(ChPid) of
679
702
            Fun(State)
680
703
    end.
681
704
 
682
 
discard_delivery(#delivery{sender = SenderPid,
683
 
                           message = Message},
684
 
                 State = #q{backing_queue = BQ,
685
 
                            backing_queue_state = BQS}) ->
686
 
    State#q{backing_queue_state = BQ:discard(Message, SenderPid, BQS)}.
687
 
 
688
 
message_properties(Confirm, #q{ttl = TTL}) ->
689
 
    #message_properties{expiry           = calculate_msg_expiry(TTL),
690
 
                        needs_confirming = needs_confirming(Confirm)}.
691
 
 
692
 
calculate_msg_expiry(undefined) -> undefined;
693
 
calculate_msg_expiry(TTL)       -> now_micros() + (TTL * 1000).
694
 
 
695
 
drop_expired_messages(State = #q{ttl = undefined}) ->
696
 
    State;
697
 
drop_expired_messages(State = #q{backing_queue_state = BQS,
 
705
message_properties(Message, Confirm, Delivered, #q{ttl = TTL}) ->
 
706
    #message_properties{expiry           = calculate_msg_expiry(Message, TTL),
 
707
                        needs_confirming = Confirm == eventually,
 
708
                        delivered        = Delivered}.
 
709
 
 
710
calculate_msg_expiry(#basic_message{content = Content}, TTL) ->
 
711
    #content{properties = Props} =
 
712
        rabbit_binary_parser:ensure_content_decoded(Content),
 
713
    %% We assert that the expiration must be valid - we check in the channel.
 
714
    {ok, MsgTTL} = rabbit_basic:parse_expiration(Props),
 
715
    case lists:min([TTL, MsgTTL]) of
 
716
        undefined -> undefined;
 
717
        T         -> now_micros() + T * 1000
 
718
    end.
 
719
 
 
720
drop_expired_messages(State = #q{dlx                 = DLX,
 
721
                                 backing_queue_state = BQS,
698
722
                                 backing_queue       = BQ }) ->
699
723
    Now = now_micros(),
700
 
    DLXFun = dead_letter_fun(expired, State),
701
724
    ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end,
702
 
    {Props, BQS1} =
703
 
        case DLXFun of
704
 
            undefined ->
705
 
                {Next, undefined, BQS2} = BQ:dropwhile(ExpirePred, false, BQS),
706
 
                {Next, BQS2};
707
 
            _  ->
708
 
                {Next, Msgs,      BQS2} = BQ:dropwhile(ExpirePred, true,  BQS),
709
 
                lists:foreach(fun({Msg, AckTag}) -> DLXFun(Msg, AckTag) end,
710
 
                              Msgs),
711
 
                {Next, BQS2}
712
 
        end,
 
725
    {Props, BQS1} = case DLX of
 
726
                        undefined -> {Next, undefined, BQS2} =
 
727
                                         BQ:dropwhile(ExpirePred, false, BQS),
 
728
                                     {Next, BQS2};
 
729
                        _         -> {Next, Msgs,      BQS2} =
 
730
                                         BQ:dropwhile(ExpirePred, true,  BQS),
 
731
                                     DLXFun = dead_letter_fun(expired),
 
732
                                     DLXFun(Msgs),
 
733
                                     {Next, BQS2}
 
734
                    end,
713
735
    ensure_ttl_timer(case Props of
714
736
                         undefined                          -> undefined;
715
737
                         #message_properties{expiry = Exp}  -> Exp
717
739
 
718
740
ensure_ttl_timer(undefined, State) ->
719
741
    State;
720
 
ensure_ttl_timer(_Expiry, State = #q{ttl = undefined}) ->
721
 
    State;
722
742
ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined}) ->
723
743
    After = (case Expiry - now_micros() of
724
744
                 V when V > 0 -> V + 999; %% always fire later
736
756
ensure_ttl_timer(_Expiry, State) ->
737
757
    State.
738
758
 
739
 
ack_if_no_dlx(AckTags, State = #q{dlx                 = undefined,
740
 
                                  backing_queue       = BQ,
741
 
                                  backing_queue_state = BQS }) ->
742
 
    {_Guids, BQS1} = BQ:ack(AckTags, BQS),
743
 
    State#q{backing_queue_state = BQS1};
744
 
ack_if_no_dlx(_AckTags, State) ->
745
 
    State.
746
 
 
747
 
dead_letter_fun(_Reason, #q{dlx = undefined}) ->
748
 
    undefined;
749
 
dead_letter_fun(Reason, _State) ->
750
 
    fun(Msg, AckTag) ->
751
 
            gen_server2:cast(self(), {dead_letter, {Msg, AckTag}, Reason})
752
 
    end.
753
 
 
754
 
dead_letter_publish(Msg, Reason, State = #q{publish_seqno = MsgSeqNo}) ->
755
 
    DLMsg = #basic_message{exchange_name = XName} =
756
 
        make_dead_letter_msg(Reason, Msg, State),
757
 
    case rabbit_exchange:lookup(XName) of
758
 
        {ok, X} ->
759
 
            Delivery = rabbit_basic:delivery(false, false, DLMsg, MsgSeqNo),
760
 
            {Queues, Cycles} = detect_dead_letter_cycles(
761
 
                                 DLMsg, rabbit_exchange:route(X, Delivery)),
762
 
            lists:foreach(fun log_cycle_once/1, Cycles),
763
 
            QPids = rabbit_amqqueue:lookup(Queues),
764
 
            {_, DeliveredQPids} = rabbit_amqqueue:deliver(QPids, Delivery),
765
 
            DeliveredQPids;
766
 
        {error, not_found} ->
767
 
            []
768
 
    end.
769
 
 
770
 
dead_letter_msg(Msg, AckTag, Reason, State = #q{publish_seqno = MsgSeqNo,
771
 
                                                unconfirmed   = UC}) ->
772
 
    QPids = dead_letter_publish(Msg, Reason, State),
773
 
    State1 = State#q{queue_monitors = pmon:monitor_all(
774
 
                                        QPids, State#q.queue_monitors),
775
 
                     publish_seqno  = MsgSeqNo + 1},
776
 
    case QPids of
777
 
        [] -> cleanup_after_confirm([AckTag], State1);
778
 
        _  -> UC1 = dtree:insert(MsgSeqNo, QPids, AckTag, UC),
779
 
              noreply(State1#q{unconfirmed = UC1})
780
 
    end.
 
759
dead_letter_fun(Reason) ->
 
760
    fun(Msgs) -> gen_server2:cast(self(), {dead_letter, Msgs, Reason}) end.
 
761
 
 
762
dead_letter_publish(Msg, Reason, X, State = #q{publish_seqno = MsgSeqNo}) ->
 
763
    DLMsg = make_dead_letter_msg(Reason, Msg, State),
 
764
    Delivery = rabbit_basic:delivery(false, DLMsg, MsgSeqNo),
 
765
    {Queues, Cycles} = detect_dead_letter_cycles(
 
766
                         DLMsg, rabbit_exchange:route(X, Delivery)),
 
767
    lists:foreach(fun log_cycle_once/1, Cycles),
 
768
    {_, DeliveredQPids} = rabbit_amqqueue:deliver(
 
769
                            rabbit_amqqueue:lookup(Queues), Delivery),
 
770
    DeliveredQPids.
781
771
 
782
772
handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons,
783
773
                                           unconfirmed    = UC}) ->
798
788
                           unconfirmed    = UC1})
799
789
    end.
800
790
 
801
 
stop_later(Reason, State) ->
802
 
    stop_later(Reason, undefined, noreply, State).
 
791
stop(State) -> stop(undefined, noreply, State).
803
792
 
804
 
stop_later(Reason, From, Reply, State = #q{unconfirmed = UC}) ->
 
793
stop(From, Reply, State = #q{unconfirmed = UC}) ->
805
794
    case {dtree:is_empty(UC), Reply} of
806
795
        {true, noreply} ->
807
 
            {stop, Reason, State};
 
796
            {stop, normal, State};
808
797
        {true, _} ->
809
 
            {stop, Reason, Reply, State};
 
798
            {stop, normal, Reply, State};
810
799
        {false, _} ->
811
 
            noreply(State#q{delayed_stop = {Reason, {From, Reply}}})
 
800
            noreply(State#q{delayed_stop = {From, Reply}})
812
801
    end.
813
802
 
814
803
cleanup_after_confirm(AckTags, State = #q{delayed_stop        = DS,
819
808
    State1 = State#q{backing_queue_state = BQS1},
820
809
    case dtree:is_empty(UC) andalso DS =/= undefined of
821
810
        true  -> case DS of
822
 
                     {_, {_, noreply}}  -> ok;
823
 
                     {_, {From, Reply}} -> gen_server2:reply(From, Reply)
 
811
                     {_,  noreply} -> ok;
 
812
                     {From, Reply} -> gen_server2:reply(From, Reply)
824
813
                 end,
825
 
                 {Reason, _} = DS,
826
 
                 {stop, Reason, State1};
 
814
                 {stop, normal, State1};
827
815
        false -> noreply(State1)
828
816
    end.
829
817
 
879
867
                        {<<"time">>,         timestamp, TimeSec},
880
868
                        {<<"exchange">>,     longstr,   Exchange#resource.name},
881
869
                        {<<"routing-keys">>, array,     RKs1}],
882
 
                HeadersFun1(rabbit_basic:append_table_header(<<"x-death">>,
883
 
                                                             Info, Headers))
 
870
                HeadersFun1(rabbit_basic:prepend_table_header(<<"x-death">>,
 
871
                                                              Info, Headers))
884
872
        end,
885
873
    Content1 = rabbit_basic:map_headers(HeadersFun2, Content),
886
874
    Msg#basic_message{exchange_name = DLX, id = rabbit_guid:gen(),
888
876
 
889
877
now_micros() -> timer:now_diff(now(), {0,0,0}).
890
878
 
891
 
infos(Items, State) ->
892
 
    {Prefix, Items1} =
893
 
        case lists:member(synchronised_slave_pids, Items) of
894
 
            true  -> Prefix1 = slaves_status(State),
895
 
                     case lists:member(slave_pids, Items) of
896
 
                         true  -> {Prefix1, Items -- [slave_pids]};
897
 
                         false -> {proplists:delete(slave_pids, Prefix1), Items}
898
 
                     end;
899
 
            false -> {[], Items}
900
 
        end,
901
 
    Prefix ++ [{Item, i(Item, State)}
902
 
               || Item <- (Items1 -- [synchronised_slave_pids])].
903
 
 
904
 
slaves_status(#q{q = #amqqueue{name = Name}}) ->
905
 
    case rabbit_amqqueue:lookup(Name) of
906
 
        {ok, #amqqueue{mirror_nodes = undefined}} ->
907
 
            [{slave_pids, ''}, {synchronised_slave_pids, ''}];
908
 
        {ok, #amqqueue{slave_pids = SPids}} ->
909
 
            {Results, _Bad} =
910
 
                delegate:invoke(SPids, fun rabbit_mirror_queue_slave:info/1),
911
 
            {SPids1, SSPids} =
912
 
                lists:foldl(
913
 
                  fun ({Pid, Infos}, {SPidsN, SSPidsN}) ->
914
 
                          {[Pid | SPidsN],
915
 
                           case proplists:get_bool(is_synchronised, Infos) of
916
 
                               true  -> [Pid | SSPidsN];
917
 
                               false -> SSPidsN
918
 
                           end}
919
 
                  end, {[], []}, Results),
920
 
            [{slave_pids, SPids1}, {synchronised_slave_pids, SSPids}]
921
 
    end.
 
879
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
922
880
 
923
881
i(name,        #q{q = #amqqueue{name        = Name}})       -> Name;
924
882
i(durable,     #q{q = #amqqueue{durable     = Durable}})    -> Durable;
930
888
    '';
931
889
i(owner_pid, #q{q = #amqqueue{exclusive_owner = ExclusiveOwner}}) ->
932
890
    ExclusiveOwner;
 
891
i(policy,    #q{q = #amqqueue{name = Name}}) ->
 
892
    {ok, Q} = rabbit_amqqueue:lookup(Name),
 
893
    case rabbit_policy:name(Q) of
 
894
        none   -> '';
 
895
        Policy -> Policy
 
896
    end;
933
897
i(exclusive_consumer_pid, #q{exclusive_consumer = none}) ->
934
898
    '';
935
899
i(exclusive_consumer_pid, #q{exclusive_consumer = {ChPid, _ConsumerTag}}) ->
947
911
                                          messages_unacknowledged]]);
948
912
i(consumers, _) ->
949
913
    consumer_count();
 
914
i(active_consumers, _) ->
 
915
    active_consumer_count();
950
916
i(memory, _) ->
951
917
    {memory, M} = process_info(self(), memory),
952
918
    M;
953
919
i(slave_pids, #q{q = #amqqueue{name = Name}}) ->
954
 
    case rabbit_amqqueue:lookup(Name) of
955
 
        {ok, #amqqueue{mirror_nodes = undefined}} -> [];
956
 
        {ok, #amqqueue{slave_pids = SPids}}       -> SPids
 
920
    {ok, Q = #amqqueue{slave_pids = SPids}} =
 
921
        rabbit_amqqueue:lookup(Name),
 
922
    case rabbit_mirror_queue_misc:is_mirrored(Q) of
 
923
        false -> '';
 
924
        true  -> SPids
 
925
    end;
 
926
i(synchronised_slave_pids, #q{q = #amqqueue{name = Name}}) ->
 
927
    {ok, Q = #amqqueue{sync_slave_pids = SSPids}} =
 
928
        rabbit_amqqueue:lookup(Name),
 
929
    case rabbit_mirror_queue_misc:is_mirrored(Q) of
 
930
        false -> '';
 
931
        true  -> SSPids
957
932
    end;
958
933
i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
959
934
    BQ:status(BQS);
1037
1012
                    q = #amqqueue{name = QName} = Q} = State,
1038
1013
                 gen_server2:reply(From, not_found),
1039
1014
                 case Recover of
1040
 
                     true -> ok;
1041
 
                     _    -> rabbit_log:warning(
1042
 
                               "Queue ~p exclusive owner went away~n", [QName])
 
1015
                     new -> rabbit_log:warning(
 
1016
                              "Queue ~p exclusive owner went away~n", [QName]);
 
1017
                     _   -> ok
1043
1018
                 end,
1044
1019
                 BQS = bq_init(BQ, Q, Recover),
1045
1020
                 %% Rely on terminate to delete the queue.
1058
1033
handle_call(consumers, _From, State) ->
1059
1034
    reply(consumers(State), State);
1060
1035
 
1061
 
handle_call({deliver, Delivery = #delivery{immediate = true}}, _From, State) ->
1062
 
    %% FIXME: Is this correct semantics?
1063
 
    %%
1064
 
    %% I'm worried in particular about the case where an exchange has
1065
 
    %% two queues against a particular routing key, and a message is
1066
 
    %% sent in immediate mode through the binding. In non-immediate
1067
 
    %% mode, both queues get the message, saving it for later if
1068
 
    %% there's noone ready to receive it just now. In immediate mode,
1069
 
    %% should both queues still get the message, somehow, or should
1070
 
    %% just all ready-to-consume queues get the message, with unready
1071
 
    %% queues discarding the message?
1072
 
    %%
1073
 
    Confirm = should_confirm_message(Delivery, State),
1074
 
    {Delivered, State1} = attempt_delivery(Delivery, Confirm, State),
1075
 
    reply(Delivered, case Delivered of
1076
 
                         true  -> maybe_record_confirm_message(Confirm, State1);
1077
 
                         false -> discard_delivery(Delivery, State1)
1078
 
                     end);
1079
 
 
1080
 
handle_call({deliver, Delivery = #delivery{mandatory = true}}, From, State) ->
1081
 
    gen_server2:reply(From, true),
1082
 
    noreply(deliver_or_enqueue(Delivery, State));
 
1036
handle_call({deliver, Delivery, Delivered}, From, State) ->
 
1037
    %% Synchronous, "mandatory" deliver mode.
 
1038
    gen_server2:reply(From, ok),
 
1039
    noreply(deliver_or_enqueue(Delivery, Delivered, State));
1083
1040
 
1084
1041
handle_call({notify_down, ChPid}, From, State) ->
1085
1042
    %% we want to do this synchronously, so that auto_deleted queues
1086
1043
    %% are no longer visible by the time we send a response to the
1087
1044
    %% client.  The queue is ultimately deleted in terminate/2; if we
1088
1045
    %% return stop with a reply, terminate/2 will be called by
1089
 
    %% gen_server2 *before* the reply is sent.
 
1046
    %% gen_server2 *before* the reply is sent. FIXME: in case of a
 
1047
    %% delayed stop the reply is sent earlier.
1090
1048
    case handle_ch_down(ChPid, State) of
1091
1049
        {ok, State1}   -> reply(ok, State1);
1092
 
        {stop, State1} -> stop_later(normal, From, ok, State1)
 
1050
        {stop, State1} -> stop(From, ok, State1)
1093
1051
    end;
1094
1052
 
1095
1053
handle_call({basic_get, ChPid, NoAck}, _From,
1164
1122
                                             State#q.active_consumers)},
1165
1123
            case should_auto_delete(State1) of
1166
1124
                false -> reply(ok, ensure_expiry_timer(State1));
1167
 
                true  -> stop_later(normal, From, ok, State1)
 
1125
                true  -> stop(From, ok, State1)
1168
1126
            end
1169
1127
    end;
1170
1128
 
1180
1138
    if
1181
1139
        IfEmpty and not(IsEmpty)   -> reply({error, not_empty}, State);
1182
1140
        IfUnused and not(IsUnused) -> reply({error, in_use}, State);
1183
 
        true                       -> stop_later(normal, From,
1184
 
                                                 {ok, BQ:len(BQS)}, State)
 
1141
        true                       -> stop(From, {ok, BQ:len(BQS)}, State)
1185
1142
    end;
1186
1143
 
1187
1144
handle_call(purge, _From, State = #q{backing_queue       = BQ,
1191
1148
 
1192
1149
handle_call({requeue, AckTags, ChPid}, From, State) ->
1193
1150
    gen_server2:reply(From, ok),
1194
 
    noreply(subtract_acks(
1195
 
              ChPid, AckTags, State,
1196
 
              fun (State1) -> requeue_and_run(AckTags, State1) end));
 
1151
    noreply(requeue(AckTags, ChPid, State));
 
1152
 
 
1153
handle_call(start_mirroring, _From, State = #q{backing_queue       = BQ,
 
1154
                                               backing_queue_state = BQS}) ->
 
1155
    %% lookup again to get policy for init_with_existing_bq
 
1156
    {ok, Q} = rabbit_amqqueue:lookup(qname(State)),
 
1157
    true = BQ =/= rabbit_mirror_queue_master, %% assertion
 
1158
    BQ1 = rabbit_mirror_queue_master,
 
1159
    BQS1 = BQ1:init_with_existing_bq(Q, BQ, BQS),
 
1160
    reply(ok, State#q{backing_queue       = BQ1,
 
1161
                      backing_queue_state = BQS1});
 
1162
 
 
1163
handle_call(stop_mirroring, _From, State = #q{backing_queue       = BQ,
 
1164
                                              backing_queue_state = BQS}) ->
 
1165
    BQ = rabbit_mirror_queue_master, %% assertion
 
1166
    {BQ1, BQS1} = BQ:stop_mirroring(BQS),
 
1167
    reply(ok, State#q{backing_queue       = BQ1,
 
1168
                      backing_queue_state = BQS1});
1197
1169
 
1198
1170
handle_call(force_event_refresh, _From,
1199
1171
            State = #q{exclusive_consumer = Exclusive}) ->
1219
1191
handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined ->
1220
1192
    noreply(State);
1221
1193
 
1222
 
handle_cast({run_backing_queue, Mod, Fun}, State) ->
1223
 
    noreply(run_backing_queue(Mod, Fun, State));
 
1194
handle_cast({run_backing_queue, Mod, Fun},
 
1195
            State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
 
1196
    noreply(run_message_queue(
 
1197
              State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}));
1224
1198
 
1225
 
handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow},
 
1199
handle_cast({deliver, Delivery = #delivery{sender = Sender}, Delivered, Flow},
1226
1200
            State = #q{senders = Senders}) ->
1227
 
    %% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
 
1201
    %% Asynchronous, non-"mandatory" deliver mode.
1228
1202
    Senders1 = case Flow of
1229
1203
                   flow   -> credit_flow:ack(Sender),
1230
1204
                             pmon:monitor(Sender, Senders);
1231
1205
                   noflow -> Senders
1232
1206
               end,
1233
1207
    State1 = State#q{senders = Senders1},
1234
 
    noreply(deliver_or_enqueue(Delivery, State1));
 
1208
    noreply(deliver_or_enqueue(Delivery, Delivered, State1));
1235
1209
 
1236
1210
handle_cast({ack, AckTags, ChPid}, State) ->
 
1211
    noreply(ack(AckTags, ChPid, State));
 
1212
 
 
1213
handle_cast({reject, AckTags, true, ChPid}, State) ->
 
1214
    noreply(requeue(AckTags, ChPid, State));
 
1215
 
 
1216
handle_cast({reject, AckTags, false, ChPid}, State = #q{dlx = undefined}) ->
 
1217
    noreply(ack(AckTags, ChPid, State));
 
1218
 
 
1219
handle_cast({reject, AckTags, false, ChPid}, State) ->
 
1220
    DLXFun = dead_letter_fun(rejected),
1237
1221
    noreply(subtract_acks(
1238
1222
              ChPid, AckTags, State,
1239
1223
              fun (State1 = #q{backing_queue       = BQ,
1240
1224
                               backing_queue_state = BQS}) ->
1241
 
                      {_Guids, BQS1} = BQ:ack(AckTags, BQS),
 
1225
                      BQS1 = BQ:fold(fun(M, A) -> DLXFun([{M, A}]) end,
 
1226
                                     BQS, AckTags),
1242
1227
                      State1#q{backing_queue_state = BQS1}
1243
1228
              end));
1244
1229
 
1245
 
handle_cast({reject, AckTags, Requeue, ChPid}, State) ->
1246
 
    noreply(subtract_acks(
1247
 
              ChPid, AckTags, State,
1248
 
              case Requeue of
1249
 
                  true  -> fun (State1) -> requeue_and_run(AckTags, State1) end;
1250
 
                  false -> fun (State1 = #q{backing_queue       = BQ,
1251
 
                                            backing_queue_state = BQS}) ->
1252
 
                                   Fun = dead_letter_fun(rejected, State1),
1253
 
                                   BQS1 = BQ:fold(Fun, BQS, AckTags),
1254
 
                                   ack_if_no_dlx(
1255
 
                                     AckTags,
1256
 
                                     State1#q{backing_queue_state = BQS1})
1257
 
                           end
1258
 
              end));
1259
 
 
1260
1230
handle_cast(delete_immediately, State) ->
1261
 
    stop_later(normal, State);
 
1231
    stop(State);
1262
1232
 
1263
1233
handle_cast({unblock, ChPid}, State) ->
1264
1234
    noreply(
1301
1271
    ok = file_handle_cache:set_maximum_since_use(Age),
1302
1272
    noreply(State);
1303
1273
 
1304
 
handle_cast({dead_letter, {Msg, AckTag}, Reason}, State) ->
1305
 
    dead_letter_msg(Msg, AckTag, Reason, State).
 
1274
handle_cast({dead_letter, Msgs, Reason}, State = #q{dlx = XName}) ->
 
1275
    case rabbit_exchange:lookup(XName) of
 
1276
        {ok, X} ->
 
1277
            noreply(lists:foldl(
 
1278
                      fun({Msg, AckTag}, State1 = #q{publish_seqno  = SeqNo,
 
1279
                                                     unconfirmed    = UC,
 
1280
                                                     queue_monitors = QMon}) ->
 
1281
                              QPids = dead_letter_publish(Msg, Reason, X,
 
1282
                                                          State1),
 
1283
                              UC1   = dtree:insert(SeqNo, QPids, AckTag, UC),
 
1284
                              QMons = pmon:monitor_all(QPids, QMon),
 
1285
                              State1#q{queue_monitors = QMons,
 
1286
                                       publish_seqno  = SeqNo + 1,
 
1287
                                       unconfirmed    = UC1}
 
1288
                      end, State, Msgs));
 
1289
        {error, not_found} ->
 
1290
            cleanup_after_confirm([AckTag || {_, AckTag} <- Msgs], State)
 
1291
    end;
 
1292
 
 
1293
handle_cast(wake_up, State) ->
 
1294
    noreply(State).
1306
1295
 
1307
1296
%% We need to not ignore this as we need to remove outstanding
1308
1297
%% confirms due to queue death.
1315
1304
 
1316
1305
handle_info(maybe_expire, State) ->
1317
1306
    case is_unused(State) of
1318
 
        true  -> stop_later(normal, State);
 
1307
        true  -> stop(State);
1319
1308
        false -> noreply(ensure_expiry_timer(State))
1320
1309
    end;
1321
1310
 
1323
1312
    noreply(drop_expired_messages(State#q{ttl_timer_ref = undefined}));
1324
1313
 
1325
1314
handle_info(emit_stats, State) ->
1326
 
    %% Do not invoke noreply as it would see no timer and create a new one.
1327
1315
    emit_stats(State),
1328
 
    State1 = rabbit_event:reset_stats_timer(State, #q.stats_timer),
1329
 
    assert_invariant(State1),
1330
 
    {noreply, State1, hibernate};
 
1316
    {noreply, State1, Timeout} = noreply(State),
 
1317
    %% Need to reset *after* we've been through noreply/1 so we do not
 
1318
    %% just create another timer always and therefore never hibernate
 
1319
    {noreply, rabbit_event:reset_stats_timer(State1, #q.stats_timer), Timeout};
1331
1320
 
1332
1321
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
1333
1322
            State = #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
1337
1326
    %% match what people expect (see bug 21824). However we need this
1338
1327
    %% monitor-and-async- delete in case the connection goes away
1339
1328
    %% unexpectedly.
1340
 
    stop_later(normal, State);
 
1329
    stop(State);
1341
1330
 
1342
1331
handle_info({'DOWN', _MonitorRef, process, DownPid, Reason}, State) ->
1343
1332
    case handle_ch_down(DownPid, State) of
1344
1333
        {ok, State1}   -> handle_queue_down(DownPid, Reason, State1);
1345
 
        {stop, State1} -> stop_later(normal, State1)
 
1334
        {stop, State1} -> stop(State1)
1346
1335
    end;
1347
1336
 
1348
1337
handle_info(update_ram_duration, State = #q{backing_queue = BQ,