~ubuntu-branches/ubuntu/saucy/rabbitmq-server/saucy

« back to all changes in this revision

Viewing changes to src/rabbit_reader.erl

  • Committer: Package Import Robot
  • Author(s): Emile Joubert
  • Date: 2012-11-19 11:42:31 UTC
  • mfrom: (0.2.18) (0.1.32 sid)
  • Revision ID: package-import@ubuntu.com-20121119114231-hvapkn4akng09etr
Tags: 3.0.0-1
New upstream release

Show diffs side-by-side

added added

removed removed

Lines of Context:
35
35
 
36
36
%%--------------------------------------------------------------------------
37
37
 
38
 
-record(v1, {parent, sock, connection, callback, recv_len, pending_recv,
 
38
-record(v1, {parent, sock, name, connection, callback, recv_len, pending_recv,
39
39
             connection_state, queue_collector, heartbeater, stats_timer,
40
40
             channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len,
41
41
             auth_mechanism, auth_state, conserve_resources,
42
 
             last_blocked_by, last_blocked_at}).
 
42
             last_blocked_by, last_blocked_at, host, peer_host,
 
43
             port, peer_port}).
43
44
 
44
45
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
45
46
                          send_pend, state, last_blocked_by, last_blocked_age,
46
47
                          channels]).
47
48
 
48
 
-define(CREATION_EVENT_KEYS, [pid, name, address, port, peer_address, peer_port,
49
 
                              ssl, peer_cert_subject, peer_cert_issuer,
50
 
                              peer_cert_validity, auth_mechanism,
51
 
                              ssl_protocol, ssl_key_exchange,
52
 
                              ssl_cipher, ssl_hash,
53
 
                              protocol, user, vhost, timeout, frame_max,
54
 
                              client_properties]).
 
49
-define(CREATION_EVENT_KEYS,
 
50
        [pid, name, port, peer_port, host,
 
51
        peer_host, ssl, peer_cert_subject, peer_cert_issuer,
 
52
        peer_cert_validity, auth_mechanism, ssl_protocol,
 
53
        ssl_key_exchange, ssl_cipher, ssl_hash, protocol, user, vhost,
 
54
        timeout, frame_max, client_properties]).
55
55
 
56
56
-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
57
57
 
185
185
        {error, Reason} -> log(error, "error on AMQP connection ~p: ~p~n",
186
186
                               [self(), Reason]),
187
187
                           %% NB: this is tcp socket, even in case of ssl
188
 
                           rabbit_net:maybe_fast_close(Sock),
 
188
                           rabbit_net:fast_close(Sock),
189
189
                           exit(normal)
190
190
    end.
191
191
 
192
192
name(Sock) ->
193
193
    socket_op(Sock, fun (S) -> rabbit_net:connection_string(S, inbound) end).
194
194
 
 
195
socket_ends(Sock) ->
 
196
    socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end).
 
197
 
195
198
start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
196
199
                 Sock, SockTransform) ->
197
200
    process_flag(trap_exit, true),
198
 
    ConnStr = name(Sock),
199
 
    log(info, "accepting AMQP connection ~p (~s)~n", [self(), ConnStr]),
 
201
    Name = name(Sock),
 
202
    log(info, "accepting AMQP connection ~p (~s)~n", [self(), Name]),
200
203
    ClientSock = socket_op(Sock, SockTransform),
201
 
    erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
202
 
                      handshake_timeout),
 
204
    erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout),
 
205
    {PeerHost, PeerPort, Host, Port} = socket_ends(Sock),
203
206
    State = #v1{parent              = Parent,
204
207
                sock                = ClientSock,
 
208
                name                = list_to_binary(Name),
205
209
                connection          = #connection{
206
210
                  protocol           = none,
207
211
                  user               = none,
224
228
                auth_state          = none,
225
229
                conserve_resources  = false,
226
230
                last_blocked_by     = none,
227
 
                last_blocked_at     = never},
 
231
                last_blocked_at     = never,
 
232
                host                = Host,
 
233
                peer_host           = PeerHost,
 
234
                port                = Port,
 
235
                peer_port           = PeerPort},
228
236
    try
229
 
        BufSizes = inet_op(fun () ->
230
 
                                   rabbit_net:getopts(
231
 
                                     ClientSock, [sndbuf, recbuf, buffer])
232
 
                           end),
233
 
        BufSz = lists:max([Sz || {_Opt, Sz} <- BufSizes]),
234
 
        ok = inet_op(fun () ->
235
 
                             rabbit_net:setopts(ClientSock, [{buffer, BufSz}])
236
 
                     end),
 
237
        ok = inet_op(fun () -> rabbit_net:tune_buffer_size(ClientSock) end),
237
238
        recvloop(Deb, switch_callback(rabbit_event:init_stats_timer(
238
239
                                       State, #v1.stats_timer),
239
240
                                      handshake, 8)),
240
 
        log(info, "closing AMQP connection ~p (~s)~n", [self(), ConnStr])
 
241
        log(info, "closing AMQP connection ~p (~s)~n", [self(), Name])
241
242
    catch
242
243
        Ex -> log(case Ex of
243
244
                      connection_closed_abruptly -> warning;
244
245
                      _                          -> error
245
246
                  end, "closing AMQP connection ~p (~s):~n~p~n",
246
 
                  [self(), ConnStr, Ex])
 
247
                  [self(), Name, Ex])
247
248
    after
248
249
        %% We don't call gen_tcp:close/1 here since it waits for
249
250
        %% pending output to be sent, which results in unnecessary
252
253
        %% the socket. However, to keep the file_handle_cache
253
254
        %% accounting as accurate as possible we ought to close the
254
255
        %% socket w/o delay before termination.
255
 
        rabbit_net:maybe_fast_close(ClientSock),
 
256
        rabbit_net:fast_close(ClientSock),
256
257
        rabbit_event:notify(connection_closed, [{pid, self()}])
257
258
    end,
258
259
    done.
321
322
    mainloop(Deb, State);
322
323
handle_other(handshake_timeout, _Deb, State) ->
323
324
    throw({handshake_timeout, State#v1.callback});
324
 
handle_other(timeout, Deb, State = #v1{connection_state = closed}) ->
 
325
handle_other(heartbeat_timeout, Deb, State = #v1{connection_state = closed}) ->
325
326
    mainloop(Deb, State);
326
 
handle_other(timeout, _Deb, #v1{connection_state = S}) ->
327
 
    throw({timeout, S});
 
327
handle_other(heartbeat_timeout, _Deb, #v1{connection_state = S}) ->
 
328
    throw({heartbeat_timeout, S});
328
329
handle_other({'$gen_call', From, {shutdown, Explanation}}, Deb, State) ->
329
330
    {ForceTermination, NewState} = terminate(Explanation, State),
330
331
    gen_server:reply(From, ok),
348
349
handle_other({'$gen_cast', force_event_refresh}, Deb, State) ->
349
350
    %% Ignore, we will emit a created event once we start running.
350
351
    mainloop(Deb, State);
 
352
handle_other(ensure_stats, Deb, State) ->
 
353
    mainloop(Deb, ensure_stats_timer(State));
351
354
handle_other(emit_stats, Deb, State) ->
352
355
    mainloop(Deb, emit_stats(State));
353
356
handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) ->
363
366
    State#v1{callback = Callback, recv_len = Length}.
364
367
 
365
368
terminate(Explanation, State) when ?IS_RUNNING(State) ->
366
 
    {normal, send_exception(State, 0,
367
 
                            rabbit_misc:amqp_error(
368
 
                              connection_forced, Explanation, [], none))};
 
369
    {normal, handle_exception(State, 0,
 
370
                              rabbit_misc:amqp_error(
 
371
                                connection_forced, Explanation, [], none))};
369
372
terminate(_Explanation, State) ->
370
373
    {force, State}.
371
374
 
393
396
update_last_blocked_by(State = #v1{conserve_resources = false}) ->
394
397
    State#v1{last_blocked_by = flow}.
395
398
 
 
399
%%--------------------------------------------------------------------------
 
400
%% error handling / termination
 
401
 
396
402
close_connection(State = #v1{queue_collector = Collector,
397
403
                             connection = #connection{
398
404
                               timeout_sec = TimeoutSec}}) ->
416
422
        {_Channel, controlled} ->
417
423
            maybe_close(control_throttle(State));
418
424
        {Channel, uncontrolled} ->
419
 
            log(error, "AMQP connection ~p, channel ~p - error:~n~p~n",
420
 
                [self(), Channel, Reason]),
421
425
            maybe_close(handle_exception(control_throttle(State),
422
426
                                         Channel, Reason))
423
427
    end.
424
428
 
425
 
channel_cleanup(ChPid) ->
426
 
    case get({ch_pid, ChPid}) of
427
 
        undefined       -> undefined;
428
 
        {Channel, MRef} -> credit_flow:peer_down(ChPid),
429
 
                           erase({channel, Channel}),
430
 
                           erase({ch_pid, ChPid}),
431
 
                           erlang:demonitor(MRef, [flush]),
432
 
                           Channel
433
 
    end.
434
 
 
435
 
all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()].
436
 
 
437
429
terminate_channels() ->
438
430
    NChannels =
439
431
        length([rabbit_channel:shutdown(ChPid) || ChPid <- all_channels()]),
487
479
termination_kind(normal) -> controlled;
488
480
termination_kind(_)      -> uncontrolled.
489
481
 
 
482
handle_exception(State = #v1{connection_state = closed}, Channel, Reason) ->
 
483
    log(error, "AMQP connection ~p (~p), channel ~p - error:~n~p~n",
 
484
        [self(), closed, Channel, Reason]),
 
485
    State;
 
486
handle_exception(State = #v1{connection = #connection{protocol = Protocol},
 
487
                             connection_state = CS},
 
488
                 Channel, Reason)
 
489
  when ?IS_RUNNING(State) orelse CS =:= closing ->
 
490
    log(error, "AMQP connection ~p (~p), channel ~p - error:~n~p~n",
 
491
        [self(), CS, Channel, Reason]),
 
492
    {0, CloseMethod} =
 
493
        rabbit_binary_generator:map_exception(Channel, Reason, Protocol),
 
494
    terminate_channels(),
 
495
    State1 = close_connection(State),
 
496
    ok = send_on_channel0(State1#v1.sock, CloseMethod, Protocol),
 
497
    State1;
 
498
handle_exception(State, Channel, Reason) ->
 
499
    %% We don't trust the client at this point - force them to wait
 
500
    %% for a bit so they can't DOS us with repeated failed logins etc.
 
501
    timer:sleep(?SILENT_CLOSE_DELAY * 1000),
 
502
    throw({handshake_error, State#v1.connection_state, Channel, Reason}).
 
503
 
 
504
%% we've "lost sync" with the client and hence must not accept any
 
505
%% more input
 
506
fatal_frame_error(Error, Type, Channel, Payload, State) ->
 
507
    frame_error(Error, Type, Channel, Payload, State),
 
508
    %% grace period to allow transmission of error
 
509
    timer:sleep(?SILENT_CLOSE_DELAY * 1000),
 
510
    throw(fatal_frame_error).
 
511
 
 
512
frame_error(Error, Type, Channel, Payload, State) ->
 
513
    {Str, Bin} = payload_snippet(Payload),
 
514
    handle_exception(State, Channel,
 
515
                     rabbit_misc:amqp_error(frame_error,
 
516
                                            "type ~p, ~s octets = ~p: ~p",
 
517
                                            [Type, Str, Bin, Error], none)).
 
518
 
 
519
unexpected_frame(Type, Channel, Payload, State) ->
 
520
    {Str, Bin} = payload_snippet(Payload),
 
521
    handle_exception(State, Channel,
 
522
                     rabbit_misc:amqp_error(unexpected_frame,
 
523
                                            "type ~p, ~s octets = ~p",
 
524
                                            [Type, Str, Bin], none)).
 
525
 
 
526
payload_snippet(Payload) when size(Payload) =< 16 ->
 
527
    {"all", Payload};
 
528
payload_snippet(<<Snippet:16/binary, _/binary>>) ->
 
529
    {"first 16", Snippet}.
 
530
 
 
531
%%--------------------------------------------------------------------------
 
532
 
 
533
create_channel(Channel, State) ->
 
534
    #v1{sock = Sock, name = Name, queue_collector = Collector,
 
535
        channel_sup_sup_pid = ChanSupSup,
 
536
        connection = #connection{protocol     = Protocol,
 
537
                                 frame_max    = FrameMax,
 
538
                                 user         = User,
 
539
                                 vhost        = VHost,
 
540
                                 capabilities = Capabilities}} = State,
 
541
    {ok, _ChSupPid, {ChPid, AState}} =
 
542
        rabbit_channel_sup_sup:start_channel(
 
543
          ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name,
 
544
                       Protocol, User, VHost, Capabilities, Collector}),
 
545
    MRef = erlang:monitor(process, ChPid),
 
546
    put({ch_pid, ChPid}, {Channel, MRef}),
 
547
    put({channel, Channel}, {ChPid, AState}),
 
548
    {ChPid, AState}.
 
549
 
 
550
channel_cleanup(ChPid) ->
 
551
    case get({ch_pid, ChPid}) of
 
552
        undefined       -> undefined;
 
553
        {Channel, MRef} -> credit_flow:peer_down(ChPid),
 
554
                           erase({channel, Channel}),
 
555
                           erase({ch_pid, ChPid}),
 
556
                           erlang:demonitor(MRef, [flush]),
 
557
                           Channel
 
558
    end.
 
559
 
 
560
all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()].
 
561
 
 
562
%%--------------------------------------------------------------------------
 
563
 
490
564
handle_frame(Type, 0, Payload,
491
565
             State = #v1{connection_state = CS,
492
566
                         connection = #connection{protocol = Protocol}})
502
576
handle_frame(Type, 0, Payload,
503
577
             State = #v1{connection = #connection{protocol = Protocol}}) ->
504
578
    case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
505
 
        error     -> throw({unknown_frame, 0, Type, Payload});
 
579
        error     -> frame_error(unknown_frame, Type, 0, Payload, State);
506
580
        heartbeat -> State;
507
581
        {method, MethodName, FieldsBin} ->
508
582
            handle_method0(MethodName, FieldsBin, State);
509
 
        Other -> throw({unexpected_frame_on_channel0, Other})
 
583
        _Other    -> unexpected_frame(Type, 0, Payload, State)
510
584
    end;
511
585
handle_frame(Type, Channel, Payload,
512
 
             State = #v1{connection = #connection{protocol = Protocol}}) ->
 
586
             State = #v1{connection = #connection{protocol = Protocol}})
 
587
  when ?IS_RUNNING(State) ->
513
588
    case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
514
 
        error         -> throw({unknown_frame, Channel, Type, Payload});
515
 
        heartbeat     -> throw({unexpected_heartbeat_frame, Channel});
516
 
        AnalyzedFrame -> process_frame(AnalyzedFrame, Channel, State)
517
 
    end.
 
589
        error     -> frame_error(unknown_frame, Type, Channel, Payload, State);
 
590
        heartbeat -> unexpected_frame(Type, Channel, Payload, State);
 
591
        Frame     -> process_frame(Frame, Channel, State)
 
592
    end;
 
593
handle_frame(Type, Channel, Payload, State) ->
 
594
    unexpected_frame(Type, Channel, Payload, State).
518
595
 
519
596
process_frame(Frame, Channel, State) ->
520
 
    case get({channel, Channel}) of
521
 
        {ChPid, AState} ->
522
 
            case process_channel_frame(Frame,  ChPid, AState) of
523
 
                {ok, NewAState} -> put({channel, Channel}, {ChPid, NewAState}),
524
 
                                   post_process_frame(Frame, ChPid, State);
525
 
                {error, Reason} -> handle_exception(State, Channel, Reason)
526
 
            end;
527
 
        undefined when ?IS_RUNNING(State) ->
528
 
            ok = create_channel(Channel, State),
529
 
            process_frame(Frame, Channel, State);
530
 
        undefined ->
531
 
            throw({channel_frame_while_starting,
532
 
                   Channel, State#v1.connection_state, Frame})
 
597
    {ChPid, AState} = case get({channel, Channel}) of
 
598
                          undefined -> create_channel(Channel, State);
 
599
                          Other     -> Other
 
600
                      end,
 
601
    case process_channel_frame(Frame,  ChPid, AState) of
 
602
        {ok, NewAState} -> put({channel, Channel}, {ChPid, NewAState}),
 
603
                           post_process_frame(Frame, ChPid, State);
 
604
        {error, Reason} -> handle_exception(State, Channel, Reason)
 
605
    end.
 
606
 
 
607
process_channel_frame(Frame, ChPid, AState) ->
 
608
    case rabbit_command_assembler:process(Frame, AState) of
 
609
        {ok, NewAState}                  -> {ok, NewAState};
 
610
        {ok, Method, NewAState}          -> rabbit_channel:do(ChPid, Method),
 
611
                                            {ok, NewAState};
 
612
        {ok, Method, Content, NewAState} -> rabbit_channel:do_flow(
 
613
                                              ChPid, Method, Content),
 
614
                                            {ok, NewAState};
 
615
        {error, Reason}                  -> {error, Reason}
533
616
    end.
534
617
 
535
618
post_process_frame({method, 'channel.close_ok', _}, ChPid, State) ->
546
629
post_process_frame(_Frame, _ChPid, State) ->
547
630
    control_throttle(State).
548
631
 
 
632
%%--------------------------------------------------------------------------
 
633
 
 
634
%% We allow clients to exceed the frame size a little bit since quite
 
635
%% a few get it wrong - off-by 1 or 8 (empty frame size) are typical.
 
636
-define(FRAME_SIZE_FUDGE, ?EMPTY_FRAME_SIZE).
 
637
 
 
638
handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>,
 
639
             State = #v1{connection = #connection{frame_max = FrameMax}})
 
640
  when FrameMax /= 0 andalso
 
641
       PayloadSize > FrameMax - ?EMPTY_FRAME_SIZE + ?FRAME_SIZE_FUDGE ->
 
642
    fatal_frame_error(
 
643
      {frame_too_large, PayloadSize, FrameMax - ?EMPTY_FRAME_SIZE},
 
644
      Type, Channel, <<>>, State);
549
645
handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) ->
550
646
    ensure_stats_timer(
551
647
      switch_callback(State, {frame_payload, Type, Channel, PayloadSize},
552
648
                      PayloadSize + 1));
553
649
 
554
 
handle_input({frame_payload, Type, Channel, PayloadSize},
555
 
             PayloadAndMarker, State) ->
556
 
    case PayloadAndMarker of
557
 
        <<Payload:PayloadSize/binary, ?FRAME_END>> ->
558
 
            switch_callback(handle_frame(Type, Channel, Payload, State),
559
 
                            frame_header, 7);
560
 
        _ ->
561
 
            throw({bad_payload, Type, Channel, PayloadSize, PayloadAndMarker})
 
650
handle_input({frame_payload, Type, Channel, PayloadSize}, Data, State) ->
 
651
    <<Payload:PayloadSize/binary, EndMarker>> = Data,
 
652
    case EndMarker of
 
653
        ?FRAME_END -> State1 = handle_frame(Type, Channel, Payload, State),
 
654
                      switch_callback(State1, frame_header, 7);
 
655
        _          -> fatal_frame_error({invalid_frame_end_marker, EndMarker},
 
656
                                        Type, Channel, Payload, State)
562
657
    end;
563
658
 
564
659
%% The two rules pertaining to version negotiation:
629
724
 
630
725
handle_method0(MethodName, FieldsBin,
631
726
               State = #v1{connection = #connection{protocol = Protocol}}) ->
632
 
    HandleException =
633
 
        fun(R) ->
634
 
                case ?IS_RUNNING(State) of
635
 
                    true  -> send_exception(State, 0, R);
636
 
                    %% We don't trust the client at this point - force
637
 
                    %% them to wait for a bit so they can't DOS us with
638
 
                    %% repeated failed logins etc.
639
 
                    false -> timer:sleep(?SILENT_CLOSE_DELAY * 1000),
640
 
                             throw({channel0_error, State#v1.connection_state, R})
641
 
                end
642
 
        end,
643
727
    try
644
728
        handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin),
645
729
                       State)
646
730
    catch exit:#amqp_error{method = none} = Reason ->
647
 
            HandleException(Reason#amqp_error{method = MethodName});
 
731
            handle_exception(State, 0, Reason#amqp_error{method = MethodName});
648
732
          Type:Reason ->
649
 
            HandleException({Type, Reason, MethodName, erlang:get_stacktrace()})
 
733
            Stack = erlang:get_stacktrace(),
 
734
            handle_exception(State, 0, {Type, Reason, MethodName, Stack})
650
735
    end.
651
736
 
652
737
handle_method0(#'connection.start_ok'{mechanism = Mechanism,
693
778
            Frame = rabbit_binary_generator:build_heartbeat_frame(),
694
779
            SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end,
695
780
            Parent = self(),
696
 
            ReceiveFun = fun() -> Parent ! timeout end,
 
781
            ReceiveFun = fun() -> Parent ! heartbeat_timeout end,
697
782
            Heartbeater = SHF(Sock, ClientHeartbeat, SendFun,
698
783
                              ClientHeartbeat, ReceiveFun),
699
784
            State#v1{connection_state = opening,
746
831
    rabbit_misc:protocol_error(
747
832
      channel_error, "unexpected method in connection state ~w", [S]).
748
833
 
749
 
%% Compute frame_max for this instance. Could simply use 0, but breaks
750
 
%% QPid Java client.
751
834
server_frame_max() ->
752
835
    {ok, FrameMax} = application:get_env(rabbit, frame_max),
753
836
    FrameMax.
754
837
 
 
838
server_heartbeat() ->
 
839
    {ok, Heartbeat} = application:get_env(rabbit, heartbeat),
 
840
    Heartbeat.
 
841
 
755
842
send_on_channel0(Sock, Method, Protocol) ->
756
843
    ok = rabbit_writer:internal_send_command(Sock, 0, Method, Protocol).
757
844
 
803
890
        {ok, User} ->
804
891
            Tune = #'connection.tune'{channel_max = 0,
805
892
                                      frame_max = server_frame_max(),
806
 
                                      heartbeat = 0},
 
893
                                      heartbeat = server_heartbeat()},
807
894
            ok = send_on_channel0(Sock, Tune, Protocol),
808
895
            State#v1{connection_state = tuning,
809
896
                     connection = Connection#connection{user = User}}
813
900
 
814
901
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
815
902
 
816
 
i(pid, #v1{}) ->
817
 
    self();
818
 
i(name, #v1{sock = Sock}) ->
819
 
    list_to_binary(name(Sock));
820
 
i(address, #v1{sock = Sock}) ->
821
 
    socket_info(fun rabbit_net:sockname/1, fun ({A, _}) -> A end, Sock);
822
 
i(port, #v1{sock = Sock}) ->
823
 
    socket_info(fun rabbit_net:sockname/1, fun ({_, P}) -> P end, Sock);
824
 
i(peer_address, #v1{sock = Sock}) ->
825
 
    socket_info(fun rabbit_net:peername/1, fun ({A, _}) -> A end, Sock);
826
 
i(peer_port, #v1{sock = Sock}) ->
827
 
    socket_info(fun rabbit_net:peername/1, fun ({_, P}) -> P end, Sock);
828
 
i(ssl, #v1{sock = Sock}) ->
829
 
    rabbit_net:is_ssl(Sock);
830
 
i(ssl_protocol, #v1{sock = Sock}) ->
831
 
    ssl_info(fun ({P, _}) -> P end, Sock);
832
 
i(ssl_key_exchange, #v1{sock = Sock}) ->
833
 
    ssl_info(fun ({_, {K, _, _}}) -> K end, Sock);
834
 
i(ssl_cipher, #v1{sock = Sock}) ->
835
 
    ssl_info(fun ({_, {_, C, _}}) -> C end, Sock);
836
 
i(ssl_hash, #v1{sock = Sock}) ->
837
 
    ssl_info(fun ({_, {_, _, H}}) -> H end, Sock);
838
 
i(peer_cert_issuer, #v1{sock = Sock}) ->
839
 
    cert_info(fun rabbit_ssl:peer_cert_issuer/1, Sock);
840
 
i(peer_cert_subject, #v1{sock = Sock}) ->
841
 
    cert_info(fun rabbit_ssl:peer_cert_subject/1, Sock);
842
 
i(peer_cert_validity, #v1{sock = Sock}) ->
843
 
    cert_info(fun rabbit_ssl:peer_cert_validity/1, Sock);
844
 
i(SockStat, #v1{sock = Sock}) when SockStat =:= recv_oct;
845
 
                                   SockStat =:= recv_cnt;
846
 
                                   SockStat =:= send_oct;
847
 
                                   SockStat =:= send_cnt;
848
 
                                   SockStat =:= send_pend ->
849
 
    socket_info(fun () -> rabbit_net:getstat(Sock, [SockStat]) end,
850
 
                fun ([{_, I}]) -> I end);
851
 
i(state, #v1{connection_state = S}) ->
852
 
    S;
853
 
i(last_blocked_by, #v1{last_blocked_by = By}) ->
854
 
    By;
855
 
i(last_blocked_age, #v1{last_blocked_at = never}) ->
 
903
i(pid,                #v1{}) -> self();
 
904
i(name,               #v1{name      = Name})     -> Name;
 
905
i(host,               #v1{host      = Host})     -> Host;
 
906
i(peer_host,          #v1{peer_host = PeerHost}) -> PeerHost;
 
907
i(port,               #v1{port      = Port})     -> Port;
 
908
i(peer_port,          #v1{peer_port = PeerPort}) -> PeerPort;
 
909
i(SockStat,           S) when SockStat =:= recv_oct;
 
910
                              SockStat =:= recv_cnt;
 
911
                              SockStat =:= send_oct;
 
912
                              SockStat =:= send_cnt;
 
913
                              SockStat =:= send_pend ->
 
914
    socket_info(fun (Sock) -> rabbit_net:getstat(Sock, [SockStat]) end,
 
915
                fun ([{_, I}]) -> I end, S);
 
916
i(ssl,                #v1{sock = Sock}) -> rabbit_net:is_ssl(Sock);
 
917
i(ssl_protocol,       S) -> ssl_info(fun ({P,         _}) -> P end, S);
 
918
i(ssl_key_exchange,   S) -> ssl_info(fun ({_, {K, _, _}}) -> K end, S);
 
919
i(ssl_cipher,         S) -> ssl_info(fun ({_, {_, C, _}}) -> C end, S);
 
920
i(ssl_hash,           S) -> ssl_info(fun ({_, {_, _, H}}) -> H end, S);
 
921
i(peer_cert_issuer,   S) -> cert_info(fun rabbit_ssl:peer_cert_issuer/1,   S);
 
922
i(peer_cert_subject,  S) -> cert_info(fun rabbit_ssl:peer_cert_subject/1,  S);
 
923
i(peer_cert_validity, S) -> cert_info(fun rabbit_ssl:peer_cert_validity/1, S);
 
924
i(state,              #v1{connection_state = CS}) -> CS;
 
925
i(last_blocked_by,    #v1{last_blocked_by = By}) -> By;
 
926
i(last_blocked_age,   #v1{last_blocked_at = never}) ->
856
927
    infinity;
857
 
i(last_blocked_age, #v1{last_blocked_at = T}) ->
 
928
i(last_blocked_age,   #v1{last_blocked_at = T}) ->
858
929
    timer:now_diff(erlang:now(), T) / 1000000;
859
 
i(channels, #v1{}) ->
860
 
    length(all_channels());
861
 
i(protocol, #v1{connection = #connection{protocol = none}}) ->
862
 
    none;
863
 
i(protocol, #v1{connection = #connection{protocol = Protocol}}) ->
 
930
i(channels,           #v1{}) -> length(all_channels());
 
931
i(auth_mechanism,     #v1{auth_mechanism = none}) ->
 
932
    none;
 
933
i(auth_mechanism,     #v1{auth_mechanism = Mechanism}) ->
 
934
    proplists:get_value(name, Mechanism:description());
 
935
i(protocol,           #v1{connection = #connection{protocol = none}}) ->
 
936
    none;
 
937
i(protocol,           #v1{connection = #connection{protocol = Protocol}}) ->
864
938
    Protocol:version();
865
 
i(auth_mechanism, #v1{auth_mechanism = none}) ->
866
 
    none;
867
 
i(auth_mechanism, #v1{auth_mechanism = Mechanism}) ->
868
 
    proplists:get_value(name, Mechanism:description());
869
 
i(user, #v1{connection = #connection{user = #user{username = Username}}}) ->
 
939
i(user,               #v1{connection = #connection{user = none}}) ->
 
940
    '';
 
941
i(user,               #v1{connection = #connection{user = #user{
 
942
                                                     username = Username}}}) ->
870
943
    Username;
871
 
i(user, #v1{connection = #connection{user = none}}) ->
872
 
    '';
873
 
i(vhost, #v1{connection = #connection{vhost = VHost}}) ->
 
944
i(vhost,              #v1{connection = #connection{vhost = VHost}}) ->
874
945
    VHost;
875
 
i(timeout, #v1{connection = #connection{timeout_sec = Timeout}}) ->
 
946
i(timeout,            #v1{connection = #connection{timeout_sec = Timeout}}) ->
876
947
    Timeout;
877
 
i(frame_max, #v1{connection = #connection{frame_max = FrameMax}}) ->
 
948
i(frame_max,          #v1{connection = #connection{frame_max = FrameMax}}) ->
878
949
    FrameMax;
879
 
i(client_properties, #v1{connection = #connection{
880
 
                           client_properties = ClientProperties}}) ->
 
950
i(client_properties,  #v1{connection = #connection{client_properties =
 
951
                                                       ClientProperties}}) ->
881
952
    ClientProperties;
882
953
i(Item, #v1{}) ->
883
954
    throw({bad_argument, Item}).
884
955
 
885
 
socket_info(Get, Select, Sock) ->
886
 
    socket_info(fun() -> Get(Sock) end, Select).
887
 
 
888
 
socket_info(Get, Select) ->
889
 
    case Get() of
 
956
socket_info(Get, Select, #v1{sock = Sock}) ->
 
957
    case Get(Sock) of
890
958
        {ok,    T} -> Select(T);
891
959
        {error, _} -> ''
892
960
    end.
893
961
 
894
 
ssl_info(F, Sock) ->
 
962
ssl_info(F, #v1{sock = Sock}) ->
895
963
    %% The first ok form is R14
896
964
    %% The second is R13 - the extra term is exportability (by inspection,
897
965
    %% the docs are wrong)
902
970
        {ok, {P, {K, C, H, _}}} -> F({P, {K, C, H}})
903
971
    end.
904
972
 
905
 
cert_info(F, Sock) ->
 
973
cert_info(F, #v1{sock = Sock}) ->
906
974
    case rabbit_net:peercert(Sock) of
907
975
        nossl                -> '';
908
976
        {error, no_peercert} -> '';
909
977
        {ok, Cert}           -> list_to_binary(F(Cert))
910
978
    end.
911
979
 
912
 
%%--------------------------------------------------------------------------
913
 
 
914
 
create_channel(Channel, State) ->
915
 
    #v1{sock = Sock, queue_collector = Collector,
916
 
        channel_sup_sup_pid = ChanSupSup,
917
 
        connection = #connection{protocol     = Protocol,
918
 
                                 frame_max    = FrameMax,
919
 
                                 user         = User,
920
 
                                 vhost        = VHost,
921
 
                                 capabilities = Capabilities}} = State,
922
 
    {ok, _ChSupPid, {ChPid, AState}} =
923
 
        rabbit_channel_sup_sup:start_channel(
924
 
          ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), name(Sock),
925
 
                       Protocol, User, VHost, Capabilities, Collector}),
926
 
    MRef = erlang:monitor(process, ChPid),
927
 
    put({ch_pid, ChPid}, {Channel, MRef}),
928
 
    put({channel, Channel}, {ChPid, AState}),
929
 
    ok.
930
 
 
931
 
process_channel_frame(Frame, ChPid, AState) ->
932
 
    case rabbit_command_assembler:process(Frame, AState) of
933
 
        {ok, NewAState}                  -> {ok, NewAState};
934
 
        {ok, Method, NewAState}          -> rabbit_channel:do(ChPid, Method),
935
 
                                            {ok, NewAState};
936
 
        {ok, Method, Content, NewAState} -> rabbit_channel:do_flow(
937
 
                                              ChPid, Method, Content),
938
 
                                            {ok, NewAState};
939
 
        {error, Reason}                  -> {error, Reason}
940
 
    end.
941
 
 
942
 
handle_exception(State = #v1{connection_state = closed}, _Channel, _Reason) ->
943
 
    State;
944
 
handle_exception(State, Channel, Reason) ->
945
 
    send_exception(State, Channel, Reason).
946
 
 
947
 
send_exception(State = #v1{connection = #connection{protocol = Protocol}},
948
 
               Channel, Reason) ->
949
 
    {0, CloseMethod} =
950
 
        rabbit_binary_generator:map_exception(Channel, Reason, Protocol),
951
 
    terminate_channels(),
952
 
    State1 = close_connection(State),
953
 
    ok = rabbit_writer:internal_send_command(
954
 
           State1#v1.sock, 0, CloseMethod, Protocol),
955
 
    State1.
956
 
 
957
980
emit_stats(State) ->
958
981
    rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)),
959
982
    rabbit_event:reset_stats_timer(State, #v1.stats_timer).