87
87
%% do/3 takes a pid(), an AMQP method record and an optional
%% (rabbit_types:maybe/1) content, and returns 'ok'.
-spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(),
88
88
rabbit_types:maybe(rabbit_types:content())) -> 'ok').
89
89
%% shutdown/1 takes a pid() and returns 'ok'.
-spec(shutdown/1 :: (pid()) -> 'ok').
90
%% send_command/2 is declared twice here; the two declarations differ only
%% in the type of the second argument (rabbit_framing:amqp_method() versus
%% rabbit_framing:amqp_method_record()). Both return 'ok'.
%% NOTE(review): these look like the before/after sides of one change -
%% confirm that only one of them is present in the real module.
-spec(send_command/2 :: (pid(), rabbit_framing:amqp_method()) -> 'ok').
90
-spec(send_command/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
92
92
(pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg())
94
94
%% flushed/2 takes two pids and returns 'ok'.
-spec(flushed/2 :: (pid(), pid()) -> 'ok').
95
95
%% list/0 takes no arguments and returns a list of pids.
-spec(list/0 :: () -> [pid()]).
96
%% First set of info specs: key and result collections are written out as
%% explicit lists of rabbit_types:info_key() and rabbit_types:info().
-spec(info_keys/0 :: () -> [rabbit_types:info_key()]).
97
-spec(info/1 :: (pid()) -> [rabbit_types:info()]).
98
-spec(info/2 :: (pid(), [rabbit_types:info_key()]) -> [rabbit_types:info()]).
99
-spec(info_all/0 :: () -> [[rabbit_types:info()]]).
100
-spec(info_all/1 :: ([rabbit_types:info_key()]) -> [[rabbit_types:info()]]).
96
%% Second set of specs for the same five functions, using the named types
%% rabbit_types:info_keys() and rabbit_types:infos() in place of the
%% explicit lists above.
%% NOTE(review): presumably these aliases expand to the list types used in
%% the first set, and only one set exists in the real module - confirm.
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
97
-spec(info/1 :: (pid()) -> rabbit_types:infos()).
98
-spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()).
99
-spec(info_all/0 :: () -> [rabbit_types:infos()]).
100
-spec(info_all/1 :: (rabbit_types:info_keys()) -> [rabbit_types:infos()]).
101
101
%% emit_stats/1 takes a pid() and returns 'ok'.
-spec(emit_stats/1 :: (pid()) -> 'ok').
235
235
%% Cast clause for {flushed, QPid}: the state is passed through
%% queue_blocked/2 together with QPid and the result is returned in a
%% noreply tuple.
handle_cast({flushed, QPid}, State) ->
236
%% Two alternative result lines follow; they differ only in that the second
%% carries the atom 'hibernate' as a third tuple element.
%% NOTE(review): these look like the old and new form of the same line -
%% confirm which one the real module contains.
{noreply, queue_blocked(QPid, State)};
236
{noreply, queue_blocked(QPid, State), hibernate};
238
238
%% Cast clause for 'terminate': returns a stop tuple with reason 'normal'
%% and the state unchanged.
handle_cast(terminate, State) ->
239
239
{stop, normal, State};
245
245
handle_cast({deliver, ConsumerTag, AckRequired, Msg},
246
246
State = #ch{writer_pid = WriterPid,
247
247
next_tag = DeliveryTag}) ->
248
State1 = lock_message(AckRequired, {DeliveryTag, ConsumerTag, Msg}, State),
248
State1 = lock_message(AckRequired,
249
ack_record(DeliveryTag, ConsumerTag, Msg),
249
251
ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg),
250
252
{_QName, QPid, _MsgId, _Redelivered, _Msg} = Msg,
251
253
maybe_incr_stats([{QPid, 1}],
258
260
handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) ->
259
261
internal_emit_stats(State),
261
State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}}.
263
State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)},
263
266
%% Info clause for a 'DOWN' message about process QPid (monitor reference
%% and exit reason are ignored): first calls erase_queue_stats/1 for QPid,
%% then passes the state through queue_blocked/2 and returns it in a
%% noreply tuple.
handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) ->
264
267
erase_queue_stats(QPid),
265
%% Two alternative result lines follow; the second adds the atom
%% 'hibernate' as a third tuple element.
%% NOTE(review): presumably the old and new form of the same line - confirm.
{noreply, queue_blocked(QPid, State)}.
268
{noreply, queue_blocked(QPid, State), hibernate}.
267
270
handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
268
271
ok = clear_permission_cache(),
269
rabbit_event:if_enabled(StatsTimer, fun () ->
270
internal_emit_stats(State)
272
rabbit_event:if_enabled(StatsTimer,
275
State, [{idle_since, now()}])
273
278
State#ch{stats_timer = rabbit_event:stop_stats_timer(StatsTimer)}}.
503
508
#basic_message{exchange_name = ExchangeName,
504
509
routing_key = RoutingKey,
505
510
content = Content}}} ->
506
State1 = lock_message(not(NoAck), {DeliveryTag, none, Msg}, State),
511
State1 = lock_message(not(NoAck),
512
ack_record(DeliveryTag, none, Msg),
507
514
maybe_incr_stats([{QPid, 1}],
509
516
true -> get_no_ack;
639
646
%% variant of this method
640
647
{noreply, State#ch{unacked_message_q = queue:new()}};
642
handle_method(#'basic.recover_async'{requeue = false},
643
_, State = #ch{writer_pid = WriterPid,
644
unacked_message_q = UAMQ}) ->
645
ok = rabbit_misc:queue_fold(
646
fun ({_DeliveryTag, none, _Msg}, ok) ->
647
%% Was sent as a basic.get_ok. Don't redeliver
648
%% it. FIXME: appropriate?
650
({DeliveryTag, ConsumerTag,
651
{QName, QPid, MsgId, _Redelivered, Message}}, ok) ->
652
%% Was sent as a proper consumer delivery. Resend
655
%% FIXME: What should happen if the consumer's been
658
%% FIXME: should we allocate a fresh DeliveryTag?
660
WriterPid, false, ConsumerTag, DeliveryTag,
661
{QName, QPid, MsgId, true, Message})
663
%% No answer required - basic.recover is the newer, synchronous
664
%% variant of this method
649
%% basic.recover_async with requeue = false: the content and state
%% arguments are ignored and the clause calls rabbit_misc:protocol_error/3
%% with the error name not_implemented and the explanation "requeue=false".
handle_method(#'basic.recover_async'{requeue = false}, _, _State) ->
650
rabbit_misc:protocol_error(not_implemented, "requeue=false", []);
667
652
handle_method(#'basic.recover'{requeue = Requeue}, Content, State) ->
668
653
{noreply, State2 = #ch{writer_pid = WriterPid}} =
984
969
routing_key = RoutingKey},
972
%% Builds the tuple {DeliveryTag, ConsumerTag, {QPid, MsgId}}.
%% The third argument must be a 5-tuple {QName, QPid, MsgId, Redelivered, Msg};
%% only QPid and MsgId are taken from it, the other three fields are dropped.
ack_record(DeliveryTag, ConsumerTag,
973
_MsgStruct = {_QName, QPid, MsgId, _Redelivered, _Msg}) ->
974
{DeliveryTag, ConsumerTag, {QPid, MsgId}}.
987
976
%% Special case of collect_acks/3 for delivery tag 0 with the third
%% argument 'true': returns the input queue Q unchanged as the first
%% element, paired with a new empty queue as the second.
collect_acks(Q, 0, true) ->
988
977
{Q, queue:new()};
989
978
collect_acks(Q, DeliveryTag, Multiple) ->
1057
1046
fold_per_queue(F, Acc0, UAQ) ->
1058
1047
D = rabbit_misc:queue_fold(
1060
{_QName, QPid, MsgId, _Redelivered, _Message}}, D) ->
1048
fun ({_DTag, _CTag, {QPid, MsgId}}, D) ->
1061
1049
%% dict:append would avoid the lists:reverse in
1062
1050
%% handle_message({recover, true}, ...). However, it
1063
1051
%% is significantly slower when going beyond a few
1200
1188
put({Type, QX},
1201
1189
orddict:store(Measure, Cur + Inc, Measures)).
1203
internal_emit_stats(State = #ch{stats_timer = StatsTimer}) ->
1191
%% internal_emit_stats/1 delegates to internal_emit_stats/2, passing an
%% empty list as the second argument.
internal_emit_stats(State) ->
1192
internal_emit_stats(State, []).
1194
internal_emit_stats(State = #ch{stats_timer = StatsTimer}, Extra) ->
1204
1195
CoarseStats = infos(?STATISTICS_KEYS, State),
1205
1196
case rabbit_event:stats_level(StatsTimer) of
1207
rabbit_event:notify(channel_stats, CoarseStats);
1198
rabbit_event:notify(channel_stats, Extra ++ CoarseStats);
1210
1201
[{channel_queue_stats,
1214
1205
{channel_queue_exchange_stats,
1215
1206
[{QX, Stats} ||
1216
1207
{{queue_exchange_stats, QX}, Stats} <- get()]}],
1217
rabbit_event:notify(channel_stats, CoarseStats ++ FineStats)
1208
rabbit_event:notify(channel_stats,
1209
Extra ++ CoarseStats ++ FineStats)
1220
1212
erase_queue_stats(QPid) ->