~lynxman/ubuntu/oneiric/rabbitmq-server/lp_838959

« back to all changes in this revision

Viewing changes to src/rabbit_channel.erl

  • Committer: Elliot Murphy
  • Date: 2010-11-30 15:12:51 UTC
  • mfrom: (0.2.9 upstream)
  • Revision ID: elliot@elliotmurphy.com-20101130151251-ziiiaf3kbomv3yse
Tags: 2.2.0-0ubuntu1
* New upstream release, fixes FTBFS for 2.1.1-1ubuntu1
  - fix issue that causes cross-cluster communication to
    deadlock after sustained cluster activity
  - fix queue memory leak when using the management plugin
    or other consumers of queue statistics
  - brokers started with rabbitmq_multi.bat are now restartable
  - clustering reset no longer destroys installed plugins
  - fix race condition between queue declaration and connection
    termination that causes spurious noproc errors to appear
    in the log
  - fix memory leak when long-running channels consume and
    cancel on many queues
  - queue.declare and exchange.declare raise precondition_failed
    rather than not_allowed when attempting to redeclare a queue
    or exchange with parameters different than those currently
    known to the broker
  - automatic, lossless upgrade to new versions of RabbitMQ
    (when not clustered)
  - support per-queue message TTL. See:
    http://www.rabbitmq.com/extensions.html#queue-ttl
  - the volume of pending acks is now bounded by disk space rather
    than by memory
  - store passwords as hashes
  - allow server properties to be configured in the RabbitMQ
    config file
  - SSL connections are listed as such by rabbitmqctl
  - simplify permission configuration by removing the client
    permission scope
  - improve performance of message routing
  - removed support for basic.recover with requeue=false
  - remove build-time dependency on OTP source to allow users to
    build without the OTP source present
  - eliminate all valid dialyzer errors

Show diffs side-by-side

added added

removed removed

Lines of Context:
87
87
-spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(),
88
88
               rabbit_types:maybe(rabbit_types:content())) -> 'ok').
89
89
-spec(shutdown/1 :: (pid()) -> 'ok').
90
 
-spec(send_command/2 :: (pid(), rabbit_framing:amqp_method()) -> 'ok').
 
90
-spec(send_command/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
91
91
-spec(deliver/4 ::
92
92
        (pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg())
93
93
        -> 'ok').
94
94
-spec(flushed/2 :: (pid(), pid()) -> 'ok').
95
95
-spec(list/0 :: () -> [pid()]).
96
 
-spec(info_keys/0 :: () -> [rabbit_types:info_key()]).
97
 
-spec(info/1 :: (pid()) -> [rabbit_types:info()]).
98
 
-spec(info/2 :: (pid(), [rabbit_types:info_key()]) -> [rabbit_types:info()]).
99
 
-spec(info_all/0 :: () -> [[rabbit_types:info()]]).
100
 
-spec(info_all/1 :: ([rabbit_types:info_key()]) -> [[rabbit_types:info()]]).
 
96
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
 
97
-spec(info/1 :: (pid()) -> rabbit_types:infos()).
 
98
-spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()).
 
99
-spec(info_all/0 :: () -> [rabbit_types:infos()]).
 
100
-spec(info_all/1 :: (rabbit_types:info_keys()) -> [rabbit_types:infos()]).
101
101
-spec(emit_stats/1 :: (pid()) -> 'ok').
102
102
 
103
103
-endif.
233
233
    end;
234
234
 
235
235
handle_cast({flushed, QPid}, State) ->
236
 
    {noreply, queue_blocked(QPid, State)};
 
236
    {noreply, queue_blocked(QPid, State), hibernate};
237
237
 
238
238
handle_cast(terminate, State) ->
239
239
    {stop, normal, State};
245
245
handle_cast({deliver, ConsumerTag, AckRequired, Msg},
246
246
            State = #ch{writer_pid = WriterPid,
247
247
                        next_tag = DeliveryTag}) ->
248
 
    State1 = lock_message(AckRequired, {DeliveryTag, ConsumerTag, Msg}, State),
 
248
    State1 = lock_message(AckRequired,
 
249
                          ack_record(DeliveryTag, ConsumerTag, Msg),
 
250
                          State),
249
251
    ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg),
250
252
    {_QName, QPid, _MsgId, _Redelivered, _Msg} = Msg,
251
253
    maybe_incr_stats([{QPid, 1}],
258
260
handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) ->
259
261
    internal_emit_stats(State),
260
262
    {noreply,
261
 
     State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}}.
 
263
     State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)},
 
264
     hibernate}.
262
265
 
263
266
handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) ->
264
267
    erase_queue_stats(QPid),
265
 
    {noreply, queue_blocked(QPid, State)}.
 
268
    {noreply, queue_blocked(QPid, State), hibernate}.
266
269
 
267
270
handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
268
271
    ok = clear_permission_cache(),
269
 
    rabbit_event:if_enabled(StatsTimer, fun () ->
270
 
                                                internal_emit_stats(State)
271
 
                                        end),
 
272
    rabbit_event:if_enabled(StatsTimer,
 
273
                            fun () ->
 
274
                                    internal_emit_stats(
 
275
                                      State, [{idle_since, now()}])
 
276
                            end),
272
277
    {hibernate,
273
278
     State#ch{stats_timer = rabbit_event:stop_stats_timer(StatsTimer)}}.
274
279
 
503
508
                #basic_message{exchange_name = ExchangeName,
504
509
                               routing_key = RoutingKey,
505
510
                               content = Content}}} ->
506
 
            State1 = lock_message(not(NoAck), {DeliveryTag, none, Msg}, State),
 
511
            State1 = lock_message(not(NoAck),
 
512
                                  ack_record(DeliveryTag, none, Msg),
 
513
                                  State),
507
514
            maybe_incr_stats([{QPid, 1}],
508
515
                             case NoAck of
509
516
                                 true  -> get_no_ack;
639
646
    %% variant of this method
640
647
    {noreply, State#ch{unacked_message_q = queue:new()}};
641
648
 
642
 
handle_method(#'basic.recover_async'{requeue = false},
643
 
              _, State = #ch{writer_pid = WriterPid,
644
 
                             unacked_message_q = UAMQ}) ->
645
 
    ok = rabbit_misc:queue_fold(
646
 
           fun ({_DeliveryTag, none, _Msg}, ok) ->
647
 
                   %% Was sent as a basic.get_ok. Don't redeliver
648
 
                   %% it. FIXME: appropriate?
649
 
                   ok;
650
 
               ({DeliveryTag, ConsumerTag,
651
 
                 {QName, QPid, MsgId, _Redelivered, Message}}, ok) ->
652
 
                   %% Was sent as a proper consumer delivery.  Resend
653
 
                   %% it as before.
654
 
                   %%
655
 
                   %% FIXME: What should happen if the consumer's been
656
 
                   %% cancelled since?
657
 
                   %%
658
 
                   %% FIXME: should we allocate a fresh DeliveryTag?
659
 
                   internal_deliver(
660
 
                     WriterPid, false, ConsumerTag, DeliveryTag,
661
 
                     {QName, QPid, MsgId, true, Message})
662
 
           end, ok, UAMQ),
663
 
    %% No answer required - basic.recover is the newer, synchronous
664
 
    %% variant of this method
665
 
    {noreply, State};
 
649
handle_method(#'basic.recover_async'{requeue = false}, _, _State) ->
 
650
    rabbit_misc:protocol_error(not_implemented, "requeue=false", []);
666
651
 
667
652
handle_method(#'basic.recover'{requeue = Requeue}, Content, State) ->
668
653
    {noreply, State2 = #ch{writer_pid = WriterPid}} =
984
969
                           routing_key = RoutingKey},
985
970
           Content).
986
971
 
 
972
ack_record(DeliveryTag, ConsumerTag,
 
973
           _MsgStruct = {_QName, QPid, MsgId, _Redelivered, _Msg}) ->
 
974
    {DeliveryTag, ConsumerTag, {QPid, MsgId}}.
 
975
 
987
976
collect_acks(Q, 0, true) ->
988
977
    {Q, queue:new()};
989
978
collect_acks(Q, DeliveryTag, Multiple) ->
1056
1045
 
1057
1046
fold_per_queue(F, Acc0, UAQ) ->
1058
1047
    D = rabbit_misc:queue_fold(
1059
 
          fun ({_DTag, _CTag,
1060
 
                {_QName, QPid, MsgId, _Redelivered, _Message}}, D) ->
 
1048
          fun ({_DTag, _CTag, {QPid, MsgId}}, D) ->
1061
1049
                  %% dict:append would avoid the lists:reverse in
1062
1050
                  %% handle_message({recover, true}, ...). However, it
1063
1051
                  %% is significantly slower when going beyond a few
1200
1188
    put({Type, QX},
1201
1189
        orddict:store(Measure, Cur + Inc, Measures)).
1202
1190
 
1203
 
internal_emit_stats(State = #ch{stats_timer = StatsTimer}) ->
 
1191
internal_emit_stats(State) ->
 
1192
    internal_emit_stats(State, []).
 
1193
 
 
1194
internal_emit_stats(State = #ch{stats_timer = StatsTimer}, Extra) ->
1204
1195
    CoarseStats = infos(?STATISTICS_KEYS, State),
1205
1196
    case rabbit_event:stats_level(StatsTimer) of
1206
1197
        coarse ->
1207
 
            rabbit_event:notify(channel_stats, CoarseStats);
 
1198
            rabbit_event:notify(channel_stats, Extra ++ CoarseStats);
1208
1199
        fine ->
1209
1200
            FineStats =
1210
1201
                [{channel_queue_stats,
1214
1205
                 {channel_queue_exchange_stats,
1215
1206
                  [{QX, Stats} ||
1216
1207
                      {{queue_exchange_stats, QX}, Stats} <- get()]}],
1217
 
            rabbit_event:notify(channel_stats, CoarseStats ++ FineStats)
 
1208
            rabbit_event:notify(channel_stats,
 
1209
                                Extra ++ CoarseStats ++ FineStats)
1218
1210
    end.
1219
1211
 
1220
1212
erase_queue_stats(QPid) ->