18
18
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
19
19
%% Technologies LLC, and Rabbit Technologies Ltd.
21
%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
21
%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
22
22
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
23
%% Copyright (C) 2007-2009 Cohesive Financial Technologies
23
%% Copyright (C) 2007-2010 Cohesive Financial Technologies
24
24
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
25
%% (C) 2007-2009 Rabbit Technologies Ltd.
25
%% (C) 2007-2010 Rabbit Technologies Ltd.
27
27
%% All Rights Reserved.
166
170
%% as a side effect this also starts monitoring the channel (if
167
171
%% that wasn't happening already)
168
172
store_ch_record((ch_record(ChPid))#cr{txn = Txn}).
170
deliver_immediately(Message, Delivered,
174
deliver_immediately(Message, IsDelivered,
171
175
State = #q{q = #amqqueue{name = QName},
172
176
active_consumers = ActiveConsumers,
173
177
blocked_consumers = BlockedConsumers,
174
178
next_msg_id = NextId}) ->
175
?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Message]),
176
179
case queue:out(ActiveConsumers) of
177
180
{{value, QEntry = {ChPid, #consumer{tag = ConsumerTag,
178
181
ack_required = AckRequired}}},
185
188
rabbit_channel:deliver(
186
189
ChPid, ConsumerTag, AckRequired,
187
{QName, self(), NextId, Delivered, Message}),
190
{QName, self(), NextId, IsDelivered, Message}),
188
191
NewUAM = case AckRequired of
189
192
true -> dict:store(NextId, Message, UAM);
223
226
{not_offered, State}
229
%% Entry point for draining the queue: takes the pending message buffer
%% out of the state record and hands both to run_message_queue/2.
run_message_queue(#q{message_buffer = Pending} = State) ->
    run_message_queue(Pending, State).
232
run_message_queue(MessageBuffer, State) ->
233
case queue:out(MessageBuffer) of
234
{{value, {Message, IsDelivered}}, BufferTail} ->
235
case deliver_immediately(Message, IsDelivered, State) of
236
{offered, true, NewState} ->
237
persist_delivery(qname(State), Message, IsDelivered),
238
run_message_queue(BufferTail, NewState);
239
{offered, false, NewState} ->
240
persist_auto_ack(qname(State), Message),
241
run_message_queue(BufferTail, NewState);
242
{not_offered, NewState} ->
243
NewState#q{message_buffer = MessageBuffer}
246
State#q{message_buffer = MessageBuffer}
226
249
attempt_delivery(none, _ChPid, Message, State) ->
227
250
case deliver_immediately(Message, false, State) of
228
251
{offered, false, State1} ->
252
275
deliver_or_enqueue_n(Messages, State = #q{message_buffer = MessageBuffer}) ->
253
run_poke_burst(queue:join(MessageBuffer, queue:from_list(Messages)),
276
run_message_queue(queue:join(MessageBuffer, queue:from_list(Messages)),
256
279
%% Appends the {ChPid, Consumer} pair to the rear of the given consumer
%% queue and returns the extended queue.
add_consumer(ChPid, Consumer, Queue) ->
    Entry = {ChPid, Consumer},
    queue:in(Entry, Queue).
281
304
store_ch_record(NewC),
282
305
case ch_record_state_transition(C, NewC) of
284
unblock -> {NewBlockedeConsumers, NewActiveConsumers} =
307
unblock -> {NewBlockedConsumers, NewActiveConsumers} =
285
308
move_consumers(ChPid,
286
309
State#q.blocked_consumers,
287
310
State#q.active_consumers),
289
312
State#q{active_consumers = NewActiveConsumers,
290
blocked_consumers = NewBlockedeConsumers})
313
blocked_consumers = NewBlockedConsumers})
294
317
%% Decides whether the queue should delete itself.
%% Returns false when the queue is not flagged auto_delete, or when it
%% has never had a consumer; otherwise the answer is is_unused(State).
should_auto_delete(State) ->
    case State of
        #q{q = #amqqueue{auto_delete = false}} -> false;
        #q{has_had_consumers = false}          -> false;
        _                                      -> is_unused(State)
    end.
298
321
handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
299
322
case lookup_ch(DownPid) of
300
not_found -> noreply(State);
301
325
#cr{monitor_ref = MonitorRef, ch_pid = ChPid, txn = Txn,
302
326
unacked_messages = UAM} ->
303
327
erlang:demonitor(MonitorRef),
304
328
erase({ch, ChPid}),
307
_ -> ok = rollback_work(Txn, qname(State)),
311
deliver_or_enqueue_n(
313
{_Messsage_id, Message} <- dict:to_list(UAM)],
315
exclusive_consumer = case Holder of
319
active_consumers = remove_consumers(
320
ChPid, State#q.active_consumers),
321
blocked_consumers = remove_consumers(
322
ChPid, State#q.blocked_consumers)}),
323
case should_auto_delete(NewState) of
324
false -> noreply(NewState);
325
true -> {stop, normal, NewState}
330
exclusive_consumer = case Holder of
334
active_consumers = remove_consumers(
335
ChPid, State#q.active_consumers),
336
blocked_consumers = remove_consumers(
337
ChPid, State#q.blocked_consumers)},
338
case should_auto_delete(State1) of
339
true -> {stop, State1};
342
_ -> ok = rollback_work(Txn, qname(State1)),
345
{ok, deliver_or_enqueue_n(
347
{_MsgId, Message} <- dict:to_list(UAM)],
348
%% Convenience wrapper: pulls the message buffer out of the state record
%% and delegates to run_poke_burst/2.
run_poke_burst(#q{message_buffer = Pending} = State) ->
    run_poke_burst(Pending, State).
351
run_poke_burst(MessageBuffer, State) ->
352
case queue:out(MessageBuffer) of
353
{{value, {Message, Delivered}}, BufferTail} ->
354
case deliver_immediately(Message, Delivered, State) of
355
{offered, true, NewState} ->
356
persist_delivery(qname(State), Message, Delivered),
357
run_poke_burst(BufferTail, NewState);
358
{offered, false, NewState} ->
359
persist_auto_ack(qname(State), Message),
360
run_poke_burst(BufferTail, NewState);
361
{not_offered, NewState} ->
362
NewState#q{message_buffer = MessageBuffer}
365
State#q{message_buffer = MessageBuffer}
368
371
%% True only when the queue has neither active nor blocked consumers.
%% The blocked-consumer queue is inspected only if the active one is empty.
is_unused(State) ->
    case queue:is_empty(State#q.active_consumers) of
        true  -> queue:is_empty(State#q.blocked_consumers);
        false -> false
    end.
389
392
persist_delivery(_QName, #basic_message{persistent_key = none},
392
395
persist_delivery(QName, #basic_message{persistent_key = PKey},
394
397
persist_work(none, QName, [{deliver, {QName, PKey}}]).
396
399
persist_acks(Txn, QName, Messages) ->
491
494
purge_message_buffer(QName, MessageBuffer) ->
493
[[Message || {Message, _Delivered} <-
496
[[Message || {Message, _IsDelivered} <-
494
497
queue:to_list(MessageBuffer)] |
496
499
fun (#cr{unacked_messages = UAM}) ->
497
[Message || {_MessageId, Message} <- dict:to_list(UAM)]
500
[Message || {_MsgId, Message} <- dict:to_list(UAM)]
499
502
all_ch_record())],
500
503
%% the simplest, though certainly not the most obvious or
510
513
i(arguments, #q{q = #amqqueue{arguments = Arguments}}) -> Arguments;
516
i(owner_pid, #q{owner = none}) ->
518
i(owner_pid, #q{owner = {ReaderPid, _MonitorRef}}) ->
520
i(exclusive_consumer_pid, #q{exclusive_consumer = none}) ->
522
i(exclusive_consumer_pid, #q{exclusive_consumer = {ChPid, _ConsumerTag}}) ->
524
i(exclusive_consumer_tag, #q{exclusive_consumer = none}) ->
526
i(exclusive_consumer_tag, #q{exclusive_consumer = {_ChPid, ConsumerTag}}) ->
513
528
i(messages_ready, #q{message_buffer = MessageBuffer}) ->
514
529
queue:len(MessageBuffer);
515
530
i(messages_unacknowledged, _) ->
546
561
catch Error -> reply({error, Error}, State)
564
handle_call(consumers, _From,
565
State = #q{active_consumers = ActiveConsumers,
566
blocked_consumers = BlockedConsumers}) ->
567
reply(rabbit_misc:queue_fold(
568
fun ({ChPid, #consumer{tag = ConsumerTag,
569
ack_required = AckRequired}}, Acc) ->
570
[{ChPid, ConsumerTag, AckRequired} | Acc]
571
end, [], queue:join(ActiveConsumers, BlockedConsumers)), State);
549
573
handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) ->
550
574
%% Synchronous, "immediate" delivery mode
577
601
noreply(NewState);
579
handle_call({notify_down, ChPid}, From, State) ->
580
%% optimisation: we reply straight away so the sender can continue
581
gen_server2:reply(From, ok),
582
handle_ch_down(ChPid, State);
603
handle_call({notify_down, ChPid}, _From, State) ->
604
%% we want to do this synchronously, so that auto_deleted queues
605
%% are no longer visible by the time we send a response to the
606
%% client. The queue is ultimately deleted in terminate/2; if we
607
%% return stop with a reply, terminate/2 will be called by
608
%% gen_server2 *before* the reply is sent.
609
case handle_ch_down(ChPid, State) of
610
{ok, NewState} -> reply(ok, NewState);
611
{stop, NewState} -> {stop, normal, ok, NewState}
584
614
handle_call({basic_get, ChPid, NoAck}, _From,
585
615
State = #q{q = #amqqueue{name = QName},
586
616
next_msg_id = NextId,
587
617
message_buffer = MessageBuffer}) ->
588
618
case queue:out(MessageBuffer) of
589
{{value, {Message, Delivered}}, BufferTail} ->
619
{{value, {Message, IsDelivered}}, BufferTail} ->
590
620
AckRequired = not(NoAck),
591
621
case AckRequired of
593
persist_delivery(QName, Message, Delivered),
623
persist_delivery(QName, Message, IsDelivered),
594
624
C = #cr{unacked_messages = UAM} = ch_record(ChPid),
595
625
NewUAM = dict:store(NextId, Message, UAM),
596
626
store_ch_record(C#cr{unacked_messages = NewUAM});
598
628
persist_auto_ack(QName, Message)
600
Msg = {QName, self(), NextId, Delivered, Message},
630
Msg = {QName, self(), NextId, IsDelivered, Message},
601
631
reply({ok, queue:len(BufferTail), Msg},
602
632
State#q{message_buffer = BufferTail,
603
633
next_msg_id = NextId + 1});
623
653
ack_required = not(NoAck)},
624
654
store_ch_record(C#cr{consumer_count = ConsumerCount +1,
625
655
limiter_pid = LimiterPid}),
626
if ConsumerCount == 0 ->
627
ok = rabbit_limiter:register(LimiterPid, self());
656
case ConsumerCount of
657
0 -> ok = rabbit_limiter:register(LimiterPid, self());
632
if ExclusiveConsume -> {ChPid, ConsumerTag};
633
true -> ExistingHolder
660
ExclusiveConsumer = case ExclusiveConsume of
661
true -> {ChPid, ConsumerTag};
662
false -> ExistingHolder
635
664
State1 = State#q{has_had_consumers = true,
636
665
exclusive_consumer = ExclusiveConsumer},
637
666
ok = maybe_send_reply(ChPid, OkMsg),
661
690
reply(ok, State);
662
691
C = #cr{consumer_count = ConsumerCount, limiter_pid = LimiterPid} ->
663
692
store_ch_record(C#cr{consumer_count = ConsumerCount - 1}),
664
if ConsumerCount == 1 ->
665
ok = rabbit_limiter:unregister(LimiterPid, self());
693
case ConsumerCount of
694
1 -> ok = rabbit_limiter:unregister(LimiterPid, self());
669
697
ok = maybe_send_reply(ChPid, OkMsg),
686
714
handle_call(stat, _From, State = #q{q = #amqqueue{name = Name},
687
715
message_buffer = MessageBuffer,
688
716
active_consumers = ActiveConsumers}) ->
689
reply({ok, Name, queue:len(MessageBuffer), queue:len(ActiveConsumers)},
717
Length = queue:len(MessageBuffer),
718
reply({ok, Name, Length, queue:len(ActiveConsumers)}, State);
692
720
handle_call({delete, IfUnused, IfEmpty}, _From,
693
721
State = #q{message_buffer = MessageBuffer}) ->
707
735
reply({ok, queue:len(MessageBuffer)},
708
736
State#q{message_buffer = queue:new()});
710
handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner,
711
exclusive_consumer = Holder}) ->
738
handle_call({claim_queue, ReaderPid}, _From,
739
State = #q{owner = Owner, exclusive_consumer = Holder}) ->
714
742
case check_exclusive_access(Holder, true, State) of
813
842
NewState = State#q{owner = none},
814
843
{stop, normal, NewState};
815
844
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
816
handle_ch_down(DownPid, State);
845
case handle_ch_down(DownPid, State) of
846
{ok, NewState} -> noreply(NewState);
847
{stop, NewState} -> {stop, normal, NewState}
818
850
handle_info(Info, State) ->
819
851
?LOGDEBUG("Info in queue: ~p~n", [Info]),