36
36
%%--------------------------------------------------------------------------
38
-record(v1, {parent, sock, connection, callback, recv_len, pending_recv,
38
-record(v1, {parent, sock, name, connection, callback, recv_len, pending_recv,
39
39
connection_state, queue_collector, heartbeater, stats_timer,
40
40
channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len,
41
41
auth_mechanism, auth_state, conserve_resources,
42
last_blocked_by, last_blocked_at}).
42
last_blocked_by, last_blocked_at, host, peer_host,
44
45
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
45
46
send_pend, state, last_blocked_by, last_blocked_age,
48
-define(CREATION_EVENT_KEYS, [pid, name, address, port, peer_address, peer_port,
49
ssl, peer_cert_subject, peer_cert_issuer,
50
peer_cert_validity, auth_mechanism,
51
ssl_protocol, ssl_key_exchange,
53
protocol, user, vhost, timeout, frame_max,
49
-define(CREATION_EVENT_KEYS,
50
[pid, name, port, peer_port, host,
51
peer_host, ssl, peer_cert_subject, peer_cert_issuer,
52
peer_cert_validity, auth_mechanism, ssl_protocol,
53
ssl_key_exchange, ssl_cipher, ssl_hash, protocol, user, vhost,
54
timeout, frame_max, client_properties]).
56
56
%% Every key that can be requested as a connection info item: the creation
%% event keys followed by the statistics keys.
%% `++` and `--` are right-associative with equal precedence, so this reads
%% as ?CREATION_EVENT_KEYS ++ (?STATISTICS_KEYS -- [pid]); `pid` is dropped
%% from the statistics keys only, not from the combined list.
-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
185
185
{error, Reason} -> log(error, "error on AMQP connection ~p: ~p~n",
186
186
[self(), Reason]),
187
187
%% NB: this is tcp socket, even in case of ssl
188
rabbit_net:maybe_fast_close(Sock),
188
rabbit_net:fast_close(Sock),
193
193
socket_op(Sock, fun (S) -> rabbit_net:connection_string(S, inbound) end).
196
socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end).
195
198
start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
196
199
Sock, SockTransform) ->
197
200
process_flag(trap_exit, true),
198
ConnStr = name(Sock),
199
log(info, "accepting AMQP connection ~p (~s)~n", [self(), ConnStr]),
202
log(info, "accepting AMQP connection ~p (~s)~n", [self(), Name]),
200
203
ClientSock = socket_op(Sock, SockTransform),
201
erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
204
erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout),
205
{PeerHost, PeerPort, Host, Port} = socket_ends(Sock),
203
206
State = #v1{parent = Parent,
204
207
sock = ClientSock,
208
name = list_to_binary(Name),
205
209
connection = #connection{
487
479
%% Classifies a termination reason: `normal` is reported as `controlled`,
%% any other reason as `uncontrolled`.
termination_kind(normal) -> controlled;
488
480
termination_kind(_) -> uncontrolled.
482
handle_exception(State = #v1{connection_state = closed}, Channel, Reason) ->
483
log(error, "AMQP connection ~p (~p), channel ~p - error:~n~p~n",
484
[self(), closed, Channel, Reason]),
486
handle_exception(State = #v1{connection = #connection{protocol = Protocol},
487
connection_state = CS},
489
when ?IS_RUNNING(State) orelse CS =:= closing ->
490
log(error, "AMQP connection ~p (~p), channel ~p - error:~n~p~n",
491
[self(), CS, Channel, Reason]),
493
rabbit_binary_generator:map_exception(Channel, Reason, Protocol),
494
terminate_channels(),
495
State1 = close_connection(State),
496
ok = send_on_channel0(State1#v1.sock, CloseMethod, Protocol),
498
handle_exception(State, Channel, Reason) ->
499
%% We don't trust the client at this point - force them to wait
500
%% for a bit so they can't DOS us with repeated failed logins etc.
501
timer:sleep(?SILENT_CLOSE_DELAY * 1000),
502
throw({handshake_error, State#v1.connection_state, Channel, Reason}).
504
%% we've "lost sync" with the client and hence must not accept any
506
%% Reports a frame error through frame_error/5, pauses, and then aborts by
%% throwing the atom `fatal_frame_error`. This function never returns
%% normally.
%% NOTE(review): ?SILENT_CLOSE_DELAY is multiplied by 1000 before being passed
%% to timer:sleep/1 (milliseconds), so it is presumably expressed in seconds -
%% confirm against the macro definition.
fatal_frame_error(Error, Type, Channel, Payload, State) ->
507
frame_error(Error, Type, Channel, Payload, State),
508
%% grace period to allow transmission of error
509
timer:sleep(?SILENT_CLOSE_DELAY * 1000),
510
throw(fatal_frame_error).
512
%% Builds a `frame_error` AMQP error describing the offending frame (its type,
%% the payload snippet produced by payload_snippet/1, and Error) and passes it
%% to handle_exception/3 for the given Channel. The method field of the error
%% is left as `none`. Returns whatever handle_exception/3 returns.
frame_error(Error, Type, Channel, Payload, State) ->
513
{Str, Bin} = payload_snippet(Payload),
514
handle_exception(State, Channel,
515
rabbit_misc:amqp_error(frame_error,
516
"type ~p, ~s octets = ~p: ~p",
517
[Type, Str, Bin, Error], none)).
519
%% Builds an `unexpected_frame` AMQP error describing the frame (its type and
%% the payload snippet produced by payload_snippet/1) and passes it to
%% handle_exception/3 for the given Channel. The method field of the error is
%% left as `none`. Returns whatever handle_exception/3 returns.
unexpected_frame(Type, Channel, Payload, State) ->
520
{Str, Bin} = payload_snippet(Payload),
521
handle_exception(State, Channel,
522
rabbit_misc:amqp_error(unexpected_frame,
523
"type ~p, ~s octets = ~p",
524
[Type, Str, Bin], none)).
526
payload_snippet(Payload) when size(Payload) =< 16 ->
528
payload_snippet(<<Snippet:16/binary, _/binary>>) ->
529
{"first 16", Snippet}.
531
%%--------------------------------------------------------------------------
533
create_channel(Channel, State) ->
534
#v1{sock = Sock, name = Name, queue_collector = Collector,
535
channel_sup_sup_pid = ChanSupSup,
536
connection = #connection{protocol = Protocol,
537
frame_max = FrameMax,
540
capabilities = Capabilities}} = State,
541
{ok, _ChSupPid, {ChPid, AState}} =
542
rabbit_channel_sup_sup:start_channel(
543
ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name,
544
Protocol, User, VHost, Capabilities, Collector}),
545
MRef = erlang:monitor(process, ChPid),
546
put({ch_pid, ChPid}, {Channel, MRef}),
547
put({channel, Channel}, {ChPid, AState}),
550
channel_cleanup(ChPid) ->
551
case get({ch_pid, ChPid}) of
552
undefined -> undefined;
553
{Channel, MRef} -> credit_flow:peer_down(ChPid),
554
erase({channel, Channel}),
555
erase({ch_pid, ChPid}),
556
erlang:demonitor(MRef, [flush]),
560
%% Lists the channel pids known to this process: the ChPid part of every
%% process-dictionary entry whose key has the shape {ch_pid, ChPid}.
all_channels() ->
    [Pid || {{ch_pid, Pid}, _Tracked} <- erlang:get()].
562
%%--------------------------------------------------------------------------
490
564
handle_frame(Type, 0, Payload,
491
565
State = #v1{connection_state = CS,
492
566
connection = #connection{protocol = Protocol}})
502
576
handle_frame(Type, 0, Payload,
503
577
State = #v1{connection = #connection{protocol = Protocol}}) ->
504
578
case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
505
error -> throw({unknown_frame, 0, Type, Payload});
579
error -> frame_error(unknown_frame, Type, 0, Payload, State);
506
580
heartbeat -> State;
507
581
{method, MethodName, FieldsBin} ->
508
582
handle_method0(MethodName, FieldsBin, State);
509
Other -> throw({unexpected_frame_on_channel0, Other})
583
_Other -> unexpected_frame(Type, 0, Payload, State)
511
585
handle_frame(Type, Channel, Payload,
512
State = #v1{connection = #connection{protocol = Protocol}}) ->
586
State = #v1{connection = #connection{protocol = Protocol}})
587
when ?IS_RUNNING(State) ->
513
588
case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
514
error -> throw({unknown_frame, Channel, Type, Payload});
515
heartbeat -> throw({unexpected_heartbeat_frame, Channel});
516
AnalyzedFrame -> process_frame(AnalyzedFrame, Channel, State)
589
error -> frame_error(unknown_frame, Type, Channel, Payload, State);
590
heartbeat -> unexpected_frame(Type, Channel, Payload, State);
591
Frame -> process_frame(Frame, Channel, State)
593
handle_frame(Type, Channel, Payload, State) ->
594
unexpected_frame(Type, Channel, Payload, State).
519
596
process_frame(Frame, Channel, State) ->
520
case get({channel, Channel}) of
522
case process_channel_frame(Frame, ChPid, AState) of
523
{ok, NewAState} -> put({channel, Channel}, {ChPid, NewAState}),
524
post_process_frame(Frame, ChPid, State);
525
{error, Reason} -> handle_exception(State, Channel, Reason)
527
undefined when ?IS_RUNNING(State) ->
528
ok = create_channel(Channel, State),
529
process_frame(Frame, Channel, State);
531
throw({channel_frame_while_starting,
532
Channel, State#v1.connection_state, Frame})
597
{ChPid, AState} = case get({channel, Channel}) of
598
undefined -> create_channel(Channel, State);
601
case process_channel_frame(Frame, ChPid, AState) of
602
{ok, NewAState} -> put({channel, Channel}, {ChPid, NewAState}),
603
post_process_frame(Frame, ChPid, State);
604
{error, Reason} -> handle_exception(State, Channel, Reason)
607
process_channel_frame(Frame, ChPid, AState) ->
608
case rabbit_command_assembler:process(Frame, AState) of
609
{ok, NewAState} -> {ok, NewAState};
610
{ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method),
612
{ok, Method, Content, NewAState} -> rabbit_channel:do_flow(
613
ChPid, Method, Content),
615
{error, Reason} -> {error, Reason}
535
618
post_process_frame({method, 'channel.close_ok', _}, ChPid, State) ->
546
629
post_process_frame(_Frame, _ChPid, State) ->
547
630
control_throttle(State).
632
%%--------------------------------------------------------------------------
634
%% We allow clients to exceed the frame size a little bit since quite
635
%% a few get it wrong - off-by 1 or 8 (empty frame size) are typical.
636
-define(FRAME_SIZE_FUDGE, ?EMPTY_FRAME_SIZE).
638
handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>,
639
State = #v1{connection = #connection{frame_max = FrameMax}})
640
when FrameMax /= 0 andalso
641
PayloadSize > FrameMax - ?EMPTY_FRAME_SIZE + ?FRAME_SIZE_FUDGE ->
643
{frame_too_large, PayloadSize, FrameMax - ?EMPTY_FRAME_SIZE},
644
Type, Channel, <<>>, State);
549
645
handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) ->
550
646
ensure_stats_timer(
551
647
switch_callback(State, {frame_payload, Type, Channel, PayloadSize},
552
648
PayloadSize + 1));
554
handle_input({frame_payload, Type, Channel, PayloadSize},
555
PayloadAndMarker, State) ->
556
case PayloadAndMarker of
557
<<Payload:PayloadSize/binary, ?FRAME_END>> ->
558
switch_callback(handle_frame(Type, Channel, Payload, State),
561
throw({bad_payload, Type, Channel, PayloadSize, PayloadAndMarker})
650
handle_input({frame_payload, Type, Channel, PayloadSize}, Data, State) ->
651
<<Payload:PayloadSize/binary, EndMarker>> = Data,
653
?FRAME_END -> State1 = handle_frame(Type, Channel, Payload, State),
654
switch_callback(State1, frame_header, 7);
655
_ -> fatal_frame_error({invalid_frame_end_marker, EndMarker},
656
Type, Channel, Payload, State)
564
659
%% The two rules pertaining to version negotiation:
630
725
handle_method0(MethodName, FieldsBin,
631
726
State = #v1{connection = #connection{protocol = Protocol}}) ->
634
case ?IS_RUNNING(State) of
635
true -> send_exception(State, 0, R);
636
%% We don't trust the client at this point - force
637
%% them to wait for a bit so they can't DOS us with
638
%% repeated failed logins etc.
639
false -> timer:sleep(?SILENT_CLOSE_DELAY * 1000),
640
throw({channel0_error, State#v1.connection_state, R})
644
728
handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin),
646
730
catch exit:#amqp_error{method = none} = Reason ->
647
HandleException(Reason#amqp_error{method = MethodName});
731
handle_exception(State, 0, Reason#amqp_error{method = MethodName});
649
HandleException({Type, Reason, MethodName, erlang:get_stacktrace()})
733
Stack = erlang:get_stacktrace(),
734
handle_exception(State, 0, {Type, Reason, MethodName, Stack})
652
737
handle_method0(#'connection.start_ok'{mechanism = Mechanism,
814
901
%% Pairs each requested info key with its current value, as computed by i/2
%% against State. The result preserves the order of Items.
infos(Items, State) ->
    [{Key, i(Key, State)} || Key <- Items].
818
i(name, #v1{sock = Sock}) ->
819
list_to_binary(name(Sock));
820
i(address, #v1{sock = Sock}) ->
821
socket_info(fun rabbit_net:sockname/1, fun ({A, _}) -> A end, Sock);
822
i(port, #v1{sock = Sock}) ->
823
socket_info(fun rabbit_net:sockname/1, fun ({_, P}) -> P end, Sock);
824
i(peer_address, #v1{sock = Sock}) ->
825
socket_info(fun rabbit_net:peername/1, fun ({A, _}) -> A end, Sock);
826
i(peer_port, #v1{sock = Sock}) ->
827
socket_info(fun rabbit_net:peername/1, fun ({_, P}) -> P end, Sock);
828
i(ssl, #v1{sock = Sock}) ->
829
rabbit_net:is_ssl(Sock);
830
i(ssl_protocol, #v1{sock = Sock}) ->
831
ssl_info(fun ({P, _}) -> P end, Sock);
832
i(ssl_key_exchange, #v1{sock = Sock}) ->
833
ssl_info(fun ({_, {K, _, _}}) -> K end, Sock);
834
i(ssl_cipher, #v1{sock = Sock}) ->
835
ssl_info(fun ({_, {_, C, _}}) -> C end, Sock);
836
i(ssl_hash, #v1{sock = Sock}) ->
837
ssl_info(fun ({_, {_, _, H}}) -> H end, Sock);
838
i(peer_cert_issuer, #v1{sock = Sock}) ->
839
cert_info(fun rabbit_ssl:peer_cert_issuer/1, Sock);
840
i(peer_cert_subject, #v1{sock = Sock}) ->
841
cert_info(fun rabbit_ssl:peer_cert_subject/1, Sock);
842
i(peer_cert_validity, #v1{sock = Sock}) ->
843
cert_info(fun rabbit_ssl:peer_cert_validity/1, Sock);
844
i(SockStat, #v1{sock = Sock}) when SockStat =:= recv_oct;
845
SockStat =:= recv_cnt;
846
SockStat =:= send_oct;
847
SockStat =:= send_cnt;
848
SockStat =:= send_pend ->
849
socket_info(fun () -> rabbit_net:getstat(Sock, [SockStat]) end,
850
fun ([{_, I}]) -> I end);
851
i(state, #v1{connection_state = S}) ->
853
i(last_blocked_by, #v1{last_blocked_by = By}) ->
855
i(last_blocked_age, #v1{last_blocked_at = never}) ->
903
i(pid, #v1{}) -> self();
904
i(name, #v1{name = Name}) -> Name;
905
i(host, #v1{host = Host}) -> Host;
906
i(peer_host, #v1{peer_host = PeerHost}) -> PeerHost;
907
i(port, #v1{port = Port}) -> Port;
908
i(peer_port, #v1{peer_port = PeerPort}) -> PeerPort;
909
i(SockStat, S) when SockStat =:= recv_oct;
910
SockStat =:= recv_cnt;
911
SockStat =:= send_oct;
912
SockStat =:= send_cnt;
913
SockStat =:= send_pend ->
914
socket_info(fun (Sock) -> rabbit_net:getstat(Sock, [SockStat]) end,
915
fun ([{_, I}]) -> I end, S);
916
i(ssl, #v1{sock = Sock}) -> rabbit_net:is_ssl(Sock);
917
i(ssl_protocol, S) -> ssl_info(fun ({P, _}) -> P end, S);
918
i(ssl_key_exchange, S) -> ssl_info(fun ({_, {K, _, _}}) -> K end, S);
919
i(ssl_cipher, S) -> ssl_info(fun ({_, {_, C, _}}) -> C end, S);
920
i(ssl_hash, S) -> ssl_info(fun ({_, {_, _, H}}) -> H end, S);
921
i(peer_cert_issuer, S) -> cert_info(fun rabbit_ssl:peer_cert_issuer/1, S);
922
i(peer_cert_subject, S) -> cert_info(fun rabbit_ssl:peer_cert_subject/1, S);
923
i(peer_cert_validity, S) -> cert_info(fun rabbit_ssl:peer_cert_validity/1, S);
924
i(state, #v1{connection_state = CS}) -> CS;
925
i(last_blocked_by, #v1{last_blocked_by = By}) -> By;
926
i(last_blocked_age, #v1{last_blocked_at = never}) ->
857
i(last_blocked_age, #v1{last_blocked_at = T}) ->
928
i(last_blocked_age, #v1{last_blocked_at = T}) ->
858
929
timer:now_diff(erlang:now(), T) / 1000000;
859
i(channels, #v1{}) ->
860
length(all_channels());
861
i(protocol, #v1{connection = #connection{protocol = none}}) ->
863
i(protocol, #v1{connection = #connection{protocol = Protocol}}) ->
930
i(channels, #v1{}) -> length(all_channels());
931
i(auth_mechanism, #v1{auth_mechanism = none}) ->
933
i(auth_mechanism, #v1{auth_mechanism = Mechanism}) ->
934
proplists:get_value(name, Mechanism:description());
935
i(protocol, #v1{connection = #connection{protocol = none}}) ->
937
i(protocol, #v1{connection = #connection{protocol = Protocol}}) ->
864
938
Protocol:version();
865
i(auth_mechanism, #v1{auth_mechanism = none}) ->
867
i(auth_mechanism, #v1{auth_mechanism = Mechanism}) ->
868
proplists:get_value(name, Mechanism:description());
869
i(user, #v1{connection = #connection{user = #user{username = Username}}}) ->
939
i(user, #v1{connection = #connection{user = none}}) ->
941
i(user, #v1{connection = #connection{user = #user{
942
username = Username}}}) ->
871
i(user, #v1{connection = #connection{user = none}}) ->
873
i(vhost, #v1{connection = #connection{vhost = VHost}}) ->
944
i(vhost, #v1{connection = #connection{vhost = VHost}}) ->
875
i(timeout, #v1{connection = #connection{timeout_sec = Timeout}}) ->
946
i(timeout, #v1{connection = #connection{timeout_sec = Timeout}}) ->
877
i(frame_max, #v1{connection = #connection{frame_max = FrameMax}}) ->
948
i(frame_max, #v1{connection = #connection{frame_max = FrameMax}}) ->
879
i(client_properties, #v1{connection = #connection{
880
client_properties = ClientProperties}}) ->
950
i(client_properties, #v1{connection = #connection{client_properties =
951
ClientProperties}}) ->
881
952
ClientProperties;
882
953
i(Item, #v1{}) ->
883
954
throw({bad_argument, Item}).
885
socket_info(Get, Select, Sock) ->
886
socket_info(fun() -> Get(Sock) end, Select).
888
socket_info(Get, Select) ->
956
socket_info(Get, Select, #v1{sock = Sock}) ->
890
958
{ok, T} -> Select(T);
962
ssl_info(F, #v1{sock = Sock}) ->
895
963
%% The first ok form is R14
896
964
%% The second is R13 - the extra term is exportability (by inspection,
897
965
%% the docs are wrong)
902
970
{ok, {P, {K, C, H, _}}} -> F({P, {K, C, H}})
905
cert_info(F, Sock) ->
973
cert_info(F, #v1{sock = Sock}) ->
906
974
case rabbit_net:peercert(Sock) of
908
976
{error, no_peercert} -> '';
909
977
{ok, Cert} -> list_to_binary(F(Cert))
912
%%--------------------------------------------------------------------------
914
create_channel(Channel, State) ->
915
#v1{sock = Sock, queue_collector = Collector,
916
channel_sup_sup_pid = ChanSupSup,
917
connection = #connection{protocol = Protocol,
918
frame_max = FrameMax,
921
capabilities = Capabilities}} = State,
922
{ok, _ChSupPid, {ChPid, AState}} =
923
rabbit_channel_sup_sup:start_channel(
924
ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), name(Sock),
925
Protocol, User, VHost, Capabilities, Collector}),
926
MRef = erlang:monitor(process, ChPid),
927
put({ch_pid, ChPid}, {Channel, MRef}),
928
put({channel, Channel}, {ChPid, AState}),
931
process_channel_frame(Frame, ChPid, AState) ->
932
case rabbit_command_assembler:process(Frame, AState) of
933
{ok, NewAState} -> {ok, NewAState};
934
{ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method),
936
{ok, Method, Content, NewAState} -> rabbit_channel:do_flow(
937
ChPid, Method, Content),
939
{error, Reason} -> {error, Reason}
942
handle_exception(State = #v1{connection_state = closed}, _Channel, _Reason) ->
944
handle_exception(State, Channel, Reason) ->
945
send_exception(State, Channel, Reason).
947
send_exception(State = #v1{connection = #connection{protocol = Protocol}},
950
rabbit_binary_generator:map_exception(Channel, Reason, Protocol),
951
terminate_channels(),
952
State1 = close_connection(State),
953
ok = rabbit_writer:internal_send_command(
954
State1#v1.sock, 0, CloseMethod, Protocol),
957
980
%% Publishes a `connection_stats` event carrying the current value of every
%% key in ?STATISTICS_KEYS, then returns the result of
%% rabbit_event:reset_stats_timer/2 applied to State and the stats_timer
%% field index of the #v1 record.
emit_stats(State) ->
958
981
rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)),
959
982
rabbit_event:reset_stats_timer(State, #v1.stats_timer).