~ubuntu-branches/ubuntu/lucid/rabbitmq-server/lucid

« back to all changes in this revision

Viewing changes to src/rabbit_amqqueue_process.erl

  • Committer: Bazaar Package Importer
  • Author(s): John Leuner
  • Date: 2010-02-19 17:30:57 UTC
  • mfrom: (0.1.9 sid)
  • Revision ID: james.westby@ubuntu.com-20100219173057-84hlnj2bsm1rvoaf
Tags: 1.7.2-1
New upstream release

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
%%   are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
19
19
%%   Technologies LLC, and Rabbit Technologies Ltd.
20
20
%%
21
 
%%   Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
 
21
%%   Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
22
22
%%   Ltd. Portions created by Cohesive Financial Technologies LLC are
23
 
%%   Copyright (C) 2007-2009 Cohesive Financial Technologies
 
23
%%   Copyright (C) 2007-2010 Cohesive Financial Technologies
24
24
%%   LLC. Portions created by Rabbit Technologies Ltd are Copyright
25
 
%%   (C) 2007-2009 Rabbit Technologies Ltd.
 
25
%%   (C) 2007-2010 Rabbit Technologies Ltd.
26
26
%%
27
27
%%   All Rights Reserved.
28
28
%%
39
39
-define(HIBERNATE_AFTER_MIN, 1000).
40
40
-define(DESIRED_HIBERNATE, 10000).
41
41
 
42
 
-export([start_link/1]).
 
42
-export([start_link/1, info_keys/0]).
43
43
 
44
44
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]).
45
45
 
77
77
         auto_delete,
78
78
         arguments,
79
79
         pid,
 
80
         owner_pid,
 
81
         exclusive_consumer_pid,
 
82
         exclusive_consumer_tag,
80
83
         messages_ready,
81
84
         messages_unacknowledged,
82
85
         messages_uncommitted,
85
88
         consumers,
86
89
         transactions,
87
90
         memory]).
88
 
         
 
91
 
89
92
%%----------------------------------------------------------------------------
90
93
 
91
 
start_link(Q) ->
92
 
    gen_server2:start_link(?MODULE, Q, []).
 
94
start_link(Q) -> gen_server2:start_link(?MODULE, Q, []).
93
95
 
 
96
info_keys() -> ?INFO_KEYS.
 
97
    
94
98
%%----------------------------------------------------------------------------
95
99
 
96
100
init(Q) ->
166
170
    %% as a side effect this also starts monitoring the channel (if
167
171
    %% that wasn't happening already)
168
172
    store_ch_record((ch_record(ChPid))#cr{txn = Txn}).
169
 
    
170
 
deliver_immediately(Message, Delivered,
 
173
 
 
174
deliver_immediately(Message, IsDelivered,
171
175
                    State = #q{q = #amqqueue{name = QName},
172
176
                               active_consumers = ActiveConsumers,
173
177
                               blocked_consumers = BlockedConsumers,
174
178
                               next_msg_id = NextId}) ->
175
 
    ?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Message]),
176
179
    case queue:out(ActiveConsumers) of
177
180
        {{value, QEntry = {ChPid, #consumer{tag = ConsumerTag,
178
181
                                            ack_required = AckRequired}}},
184
187
                true ->
185
188
                    rabbit_channel:deliver(
186
189
                      ChPid, ConsumerTag, AckRequired,
187
 
                      {QName, self(), NextId, Delivered, Message}),
 
190
                      {QName, self(), NextId, IsDelivered, Message}),
188
191
                    NewUAM = case AckRequired of
189
192
                                 true  -> dict:store(NextId, Message, UAM);
190
193
                                 false -> UAM
215
218
                                       ActiveConsumers,
216
219
                                       BlockedConsumers),
217
220
                    deliver_immediately(
218
 
                      Message, Delivered,
 
221
                      Message, IsDelivered,
219
222
                      State#q{active_consumers = NewActiveConsumers,
220
223
                              blocked_consumers = NewBlockedConsumers})
221
224
            end;
223
226
            {not_offered, State}
224
227
    end.
225
228
 
 
229
run_message_queue(State = #q{message_buffer = MessageBuffer}) ->
 
230
    run_message_queue(MessageBuffer, State).
 
231
 
 
232
run_message_queue(MessageBuffer, State) ->
 
233
    case queue:out(MessageBuffer) of
 
234
        {{value, {Message, IsDelivered}}, BufferTail} ->
 
235
            case deliver_immediately(Message, IsDelivered, State) of
 
236
                {offered, true, NewState} ->
 
237
                    persist_delivery(qname(State), Message, IsDelivered),
 
238
                    run_message_queue(BufferTail, NewState);
 
239
                {offered, false, NewState} ->
 
240
                    persist_auto_ack(qname(State), Message),
 
241
                    run_message_queue(BufferTail, NewState);
 
242
                {not_offered, NewState} ->
 
243
                    NewState#q{message_buffer = MessageBuffer}
 
244
            end;
 
245
        {empty, _} ->
 
246
            State#q{message_buffer = MessageBuffer}
 
247
    end.
 
248
 
226
249
attempt_delivery(none, _ChPid, Message, State) ->
227
250
    case deliver_immediately(Message, false, State) of
228
251
        {offered, false, State1} ->
250
273
    end.
251
274
 
252
275
deliver_or_enqueue_n(Messages, State = #q{message_buffer = MessageBuffer}) ->
253
 
    run_poke_burst(queue:join(MessageBuffer, queue:from_list(Messages)),
254
 
                   State).
 
276
    run_message_queue(queue:join(MessageBuffer, queue:from_list(Messages)),
 
277
                      State).
255
278
 
256
279
add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue).
257
280
 
281
304
            store_ch_record(NewC),
282
305
            case ch_record_state_transition(C, NewC) of
283
306
                ok      -> State;
284
 
                unblock -> {NewBlockedeConsumers, NewActiveConsumers} =
 
307
                unblock -> {NewBlockedConsumers, NewActiveConsumers} =
285
308
                               move_consumers(ChPid,
286
309
                                              State#q.blocked_consumers,
287
310
                                              State#q.active_consumers),
288
 
                           run_poke_burst(
 
311
                           run_message_queue(
289
312
                             State#q{active_consumers = NewActiveConsumers,
290
 
                                     blocked_consumers = NewBlockedeConsumers})
 
313
                                     blocked_consumers = NewBlockedConsumers})
291
314
            end
292
315
    end.
293
 
    
 
316
 
294
317
should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false;
295
318
should_auto_delete(#q{has_had_consumers = false}) -> false;
296
319
should_auto_delete(State) -> is_unused(State).
297
320
 
298
321
handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
299
322
    case lookup_ch(DownPid) of
300
 
        not_found -> noreply(State);
 
323
        not_found ->
 
324
            {ok, State};
301
325
        #cr{monitor_ref = MonitorRef, ch_pid = ChPid, txn = Txn,
302
326
            unacked_messages = UAM} ->
303
327
            erlang:demonitor(MonitorRef),
304
328
            erase({ch, ChPid}),
305
 
            case Txn of
306
 
                none -> ok;
307
 
                _    -> ok = rollback_work(Txn, qname(State)),
308
 
                        erase_tx(Txn)
309
 
            end,
310
 
            NewState =
311
 
                deliver_or_enqueue_n(
312
 
                  [{Message, true} ||
313
 
                      {_Messsage_id, Message} <- dict:to_list(UAM)],
314
 
                  State#q{
315
 
                    exclusive_consumer = case Holder of
316
 
                                             {ChPid, _} -> none;
317
 
                                             Other -> Other
318
 
                                         end,
319
 
                    active_consumers = remove_consumers(
320
 
                                         ChPid, State#q.active_consumers),
321
 
                    blocked_consumers = remove_consumers(
322
 
                                          ChPid, State#q.blocked_consumers)}),
323
 
            case should_auto_delete(NewState) of
324
 
                false -> noreply(NewState);
325
 
                true  -> {stop, normal, NewState}
 
329
            State1 = State#q{
 
330
                       exclusive_consumer = case Holder of
 
331
                                                {ChPid, _} -> none;
 
332
                                                Other      -> Other
 
333
                                            end,
 
334
                       active_consumers = remove_consumers(
 
335
                                            ChPid, State#q.active_consumers),
 
336
                       blocked_consumers = remove_consumers(
 
337
                                             ChPid, State#q.blocked_consumers)},
 
338
            case should_auto_delete(State1) of
 
339
                true  -> {stop, State1};
 
340
                false -> case Txn of
 
341
                             none -> ok;
 
342
                             _    -> ok = rollback_work(Txn, qname(State1)),
 
343
                                     erase_tx(Txn)
 
344
                         end,
 
345
                         {ok, deliver_or_enqueue_n(
 
346
                                [{Message, true} ||
 
347
                                    {_MsgId, Message} <- dict:to_list(UAM)],
 
348
                                State1)}
326
349
            end
327
350
    end.
328
351
 
345
368
        false -> in_use
346
369
    end.
347
370
 
348
 
run_poke_burst(State = #q{message_buffer = MessageBuffer}) ->
349
 
    run_poke_burst(MessageBuffer, State).
350
 
 
351
 
run_poke_burst(MessageBuffer, State) ->
352
 
    case queue:out(MessageBuffer) of
353
 
        {{value, {Message, Delivered}}, BufferTail} ->
354
 
            case deliver_immediately(Message, Delivered, State) of
355
 
                {offered, true, NewState} ->
356
 
                    persist_delivery(qname(State), Message, Delivered),
357
 
                    run_poke_burst(BufferTail, NewState);
358
 
                {offered, false, NewState} ->
359
 
                    persist_auto_ack(qname(State), Message),
360
 
                    run_poke_burst(BufferTail, NewState);
361
 
                {not_offered, NewState} ->
362
 
                    NewState#q{message_buffer = MessageBuffer}
363
 
            end;
364
 
        {empty, _} ->
365
 
            State#q{message_buffer = MessageBuffer}
366
 
    end.
367
 
 
368
371
is_unused(State) -> queue:is_empty(State#q.active_consumers) andalso
369
372
                        queue:is_empty(State#q.blocked_consumers).
370
373
 
387
390
                 true) ->
388
391
    ok;
389
392
persist_delivery(_QName, #basic_message{persistent_key = none},
390
 
                 _Delivered) ->
 
393
                 _IsDelivered) ->
391
394
    ok;
392
395
persist_delivery(QName, #basic_message{persistent_key = PKey},
393
 
                 _Delivered) ->
 
396
                 _IsDelivered) ->
394
397
    persist_work(none, QName, [{deliver, {QName, PKey}}]).
395
398
 
396
399
persist_acks(Txn, QName, Messages) ->
453
456
mark_tx_persistent(Txn) ->
454
457
    Tx = lookup_tx(Txn),
455
458
    store_tx(Txn, Tx#tx{is_persistent = true}).
456
 
    
 
459
 
457
460
is_tx_persistent(Txn) ->
458
461
    #tx{is_persistent = Res} = lookup_tx(Txn),
459
462
    Res.
490
493
 
491
494
purge_message_buffer(QName, MessageBuffer) ->
492
495
    Messages =
493
 
        [[Message || {Message, _Delivered} <-
 
496
        [[Message || {Message, _IsDelivered} <-
494
497
                         queue:to_list(MessageBuffer)] |
495
498
         lists:map(
496
499
           fun (#cr{unacked_messages = UAM}) ->
497
 
                   [Message || {_MessageId, Message} <- dict:to_list(UAM)]
 
500
                   [Message || {_MsgId, Message} <- dict:to_list(UAM)]
498
501
           end,
499
502
           all_ch_record())],
500
503
    %% the simplest, though certainly not the most obvious or
510
513
i(arguments,   #q{q = #amqqueue{arguments   = Arguments}})  -> Arguments;
511
514
i(pid, _) ->
512
515
    self();
 
516
i(owner_pid, #q{owner = none}) ->
 
517
    '';
 
518
i(owner_pid, #q{owner = {ReaderPid, _MonitorRef}}) ->
 
519
    ReaderPid;
 
520
i(exclusive_consumer_pid, #q{exclusive_consumer = none}) ->
 
521
    '';
 
522
i(exclusive_consumer_pid, #q{exclusive_consumer = {ChPid, _ConsumerTag}}) ->
 
523
    ChPid;
 
524
i(exclusive_consumer_tag, #q{exclusive_consumer = none}) ->
 
525
    '';
 
526
i(exclusive_consumer_tag, #q{exclusive_consumer = {_ChPid, ConsumerTag}}) ->
 
527
    ConsumerTag;
513
528
i(messages_ready, #q{message_buffer = MessageBuffer}) ->
514
529
    queue:len(MessageBuffer);
515
530
i(messages_unacknowledged, _) ->
546
561
    catch Error -> reply({error, Error}, State)
547
562
    end;
548
563
 
 
564
handle_call(consumers, _From,
 
565
            State = #q{active_consumers = ActiveConsumers,
 
566
                       blocked_consumers = BlockedConsumers}) ->
 
567
    reply(rabbit_misc:queue_fold(
 
568
            fun ({ChPid, #consumer{tag = ConsumerTag,
 
569
                                   ack_required = AckRequired}}, Acc) ->
 
570
                    [{ChPid, ConsumerTag, AckRequired} | Acc]
 
571
            end, [], queue:join(ActiveConsumers, BlockedConsumers)), State);
 
572
 
549
573
handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) ->
550
574
    %% Synchronous, "immediate" delivery mode
551
575
    %%
576
600
    erase_tx(Txn),
577
601
    noreply(NewState);
578
602
 
579
 
handle_call({notify_down, ChPid}, From, State) ->
580
 
    %% optimisation: we reply straight away so the sender can continue
581
 
    gen_server2:reply(From, ok),
582
 
    handle_ch_down(ChPid, State);
 
603
handle_call({notify_down, ChPid}, _From, State) ->
 
604
    %% we want to do this synchronously, so that auto_deleted queues
 
605
    %% are no longer visible by the time we send a response to the
 
606
    %% client.  The queue is ultimately deleted in terminate/2; if we
 
607
    %% return stop with a reply, terminate/2 will be called by
 
608
    %% gen_server2 *before* the reply is sent.
 
609
    case handle_ch_down(ChPid, State) of
 
610
        {ok, NewState}   -> reply(ok, NewState);
 
611
        {stop, NewState} -> {stop, normal, ok, NewState}
 
612
    end;
583
613
 
584
614
handle_call({basic_get, ChPid, NoAck}, _From,
585
615
            State = #q{q = #amqqueue{name = QName},
586
616
                       next_msg_id = NextId,
587
617
                       message_buffer = MessageBuffer}) ->
588
618
    case queue:out(MessageBuffer) of
589
 
        {{value, {Message, Delivered}}, BufferTail} ->
 
619
        {{value, {Message, IsDelivered}}, BufferTail} ->
590
620
            AckRequired = not(NoAck),
591
621
            case AckRequired of
592
622
                true  ->
593
 
                    persist_delivery(QName, Message, Delivered),
 
623
                    persist_delivery(QName, Message, IsDelivered),
594
624
                    C = #cr{unacked_messages = UAM} = ch_record(ChPid),
595
625
                    NewUAM = dict:store(NextId, Message, UAM),
596
626
                    store_ch_record(C#cr{unacked_messages = NewUAM});
597
627
                false ->
598
628
                    persist_auto_ack(QName, Message)
599
629
            end,
600
 
            Msg = {QName, self(), NextId, Delivered, Message},
 
630
            Msg = {QName, self(), NextId, IsDelivered, Message},
601
631
            reply({ok, queue:len(BufferTail), Msg},
602
632
                  State#q{message_buffer = BufferTail,
603
633
                          next_msg_id = NextId + 1});
623
653
                                         ack_required = not(NoAck)},
624
654
                    store_ch_record(C#cr{consumer_count = ConsumerCount +1,
625
655
                                         limiter_pid = LimiterPid}),
626
 
                    if ConsumerCount == 0 ->
627
 
                            ok = rabbit_limiter:register(LimiterPid, self());
628
 
                       true ->
629
 
                            ok
 
656
                    case ConsumerCount of
 
657
                        0 -> ok = rabbit_limiter:register(LimiterPid, self());
 
658
                        _ -> ok
630
659
                    end,
631
 
                    ExclusiveConsumer =
632
 
                        if ExclusiveConsume -> {ChPid, ConsumerTag};
633
 
                           true             -> ExistingHolder
634
 
                        end,
 
660
                    ExclusiveConsumer = case ExclusiveConsume of
 
661
                                            true  -> {ChPid, ConsumerTag};
 
662
                                            false -> ExistingHolder
 
663
                                        end,
635
664
                    State1 = State#q{has_had_consumers = true,
636
665
                                     exclusive_consumer = ExclusiveConsumer},
637
666
                    ok = maybe_send_reply(ChPid, OkMsg),
642
671
                                       add_consumer(
643
672
                                         ChPid, Consumer,
644
673
                                         State1#q.blocked_consumers)};
645
 
                            false -> run_poke_burst(
 
674
                            false -> run_message_queue(
646
675
                                       State1#q{
647
676
                                         active_consumers =
648
677
                                         add_consumer(
661
690
            reply(ok, State);
662
691
        C = #cr{consumer_count = ConsumerCount, limiter_pid = LimiterPid} ->
663
692
            store_ch_record(C#cr{consumer_count = ConsumerCount - 1}),
664
 
            if ConsumerCount == 1 ->
665
 
                    ok = rabbit_limiter:unregister(LimiterPid, self());
666
 
               true ->
667
 
                    ok
 
693
            case ConsumerCount of
 
694
                1 -> ok = rabbit_limiter:unregister(LimiterPid, self());
 
695
                _ -> ok
668
696
            end,
669
697
            ok = maybe_send_reply(ChPid, OkMsg),
670
698
            NewState =
686
714
handle_call(stat, _From, State = #q{q = #amqqueue{name = Name},
687
715
                                    message_buffer = MessageBuffer,
688
716
                                    active_consumers = ActiveConsumers}) ->
689
 
    reply({ok, Name, queue:len(MessageBuffer), queue:len(ActiveConsumers)},
690
 
          State);
 
717
    Length = queue:len(MessageBuffer),
 
718
    reply({ok, Name, Length, queue:len(ActiveConsumers)}, State);
691
719
 
692
720
handle_call({delete, IfUnused, IfEmpty}, _From,
693
721
            State = #q{message_buffer = MessageBuffer}) ->
707
735
    reply({ok, queue:len(MessageBuffer)},
708
736
          State#q{message_buffer = queue:new()});
709
737
 
710
 
handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner,
711
 
                                                        exclusive_consumer = Holder}) ->
 
738
handle_call({claim_queue, ReaderPid}, _From,
 
739
            State = #q{owner = Owner, exclusive_consumer = Holder}) ->
712
740
    case Owner of
713
741
        none ->
714
742
            case check_exclusive_access(Holder, true, State) of
721
749
                    %% pid...
722
750
                    reply(locked, State);
723
751
                ok ->
724
 
                    reply(ok, State#q{owner = {ReaderPid, erlang:monitor(process, ReaderPid)}})
 
752
                    MonitorRef = erlang:monitor(process, ReaderPid),
 
753
                    reply(ok, State#q{owner = {ReaderPid, MonitorRef}})
725
754
            end;
726
755
        {ReaderPid, _MonitorRef} ->
727
756
            reply(ok, State);
813
842
    NewState = State#q{owner = none},
814
843
    {stop, normal, NewState};
815
844
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
816
 
    handle_ch_down(DownPid, State);
 
845
    case handle_ch_down(DownPid, State) of
 
846
        {ok, NewState}   -> noreply(NewState);
 
847
        {stop, NewState} -> {stop, normal, NewState}
 
848
    end;
817
849
 
818
850
handle_info(Info, State) ->
819
851
    ?LOGDEBUG("Info in queue: ~p~n", [Info]),