1
%% The contents of this file are subject to the Mozilla Public License
2
%% Version 1.1 (the "License"); you may not use this file except in
3
%% compliance with the License. You may obtain a copy of the License at
4
%% http://www.mozilla.org/MPL/
6
%% Software distributed under the License is distributed on an "AS IS"
7
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
8
%% License for the specific language governing rights and limitations
11
%% The Original Code is RabbitMQ.
13
%% The Initial Developers of the Original Code are LShift Ltd,
14
%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
16
%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
17
%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
18
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
19
%% Technologies LLC, and Rabbit Technologies Ltd.
21
%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
22
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
23
%% Copyright (C) 2007-2009 Cohesive Financial Technologies
24
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
25
%% (C) 2007-2009 Rabbit Technologies Ltd.
27
%% All Rights Reserved.
29
%% Contributor(s): ______________________________________.
1
%% The contents of this file are subject to the Mozilla Public License
2
%% Version 1.1 (the "License"); you may not use this file except in
3
%% compliance with the License. You may obtain a copy of the License
4
%% at http://www.mozilla.org/MPL/
6
%% Software distributed under the License is distributed on an "AS IS"
7
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8
%% the License for the specific language governing rights and
9
%% limitations under the License.
11
%% The Original Code is RabbitMQ.
13
%% The Initial Developer of the Original Code is VMware, Inc.
14
%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
31
17
-module(rabbit_stomp_reader).
33
-export([start_link/1]).
35
-export([conserve_memory/2]).
19
-export([start_link/2]).
21
-export([conserve_resources/2]).
23
-include("rabbit_stomp.hrl").
37
24
-include("rabbit_stomp_frame.hrl").
39
-record(reader_state, {socket, parse_state, processor, state, iterations}).
41
start_link(ProcessorPid) ->
42
{ok, proc_lib:spawn_link(?MODULE, init, [ProcessorPid])}.
25
-include_lib("amqp_client/include/amqp_client.hrl").
27
-record(reader_state, {socket, parse_state, processor, state, iterations,
28
conserve_resources, recv_outstanding}).
30
%%----------------------------------------------------------------------------
32
start_link(SupPid, Configuration) ->
33
{ok, proc_lib:spawn_link(?MODULE, init, [SupPid, Configuration])}.
35
log(Level, Fmt, Args) -> rabbit_log:log(connection, Level, Fmt, Args).
37
init(SupPid, Configuration) ->
47
{ok, {PeerAddress, PeerPort}} = rabbit_net:peername(Sock),
48
PeerAddressS = inet_parse:ntoa(PeerAddress),
49
error_logger:info_msg("starting STOMP connection ~p from ~s:~p~n",
50
[self(), PeerAddressS, PeerPort]),
39
{go, Sock0, SockTransform} ->
40
{ok, Sock} = SockTransform(Sock0),
41
{ok, ProcessorPid} = start_processor(SupPid, Configuration, Sock),
42
{ok, ConnStr} = rabbit_net:connection_string(Sock, inbound),
43
log(info, "accepting STOMP connection ~p (~s)~n",
52
46
ParseState = rabbit_stomp_frame:initial_state(),
55
register_memory_alarm(
56
#reader_state{socket = Sock,
57
parse_state = ParseState,
58
processor = ProcessorPid,
50
register_resource_alarm(
51
#reader_state{socket = Sock,
52
parse_state = ParseState,
53
processor = ProcessorPid,
56
conserve_resources = false,
57
recv_outstanding = false})), 0),
58
log(info, "closing STOMP connection ~p (~s)~n",
61
Ex -> log(error, "closing STOMP connection ~p (~s):~n~p~n",
62
[self(), ConnStr, Ex])
62
rabbit_stomp_processor:flush_and_die(ProcessorPid),
63
error_logger:info_msg("ending STOMP connection ~p from ~s:~p~n",
64
[self(), PeerAddressS, PeerPort])
64
rabbit_stomp_processor:flush_and_die(ProcessorPid)
68
mainloop(State = #reader_state{socket = Sock}, ByteCount) ->
69
run_socket(State, ByteCount),
70
mainloop(State0 = #reader_state{socket = Sock}, ByteCount) ->
71
State = run_socket(State0, ByteCount),
71
73
{inet_async, Sock, _Ref, {ok, Data}} ->
72
process_received_bytes(Data, State);
73
{inet_async, Sock, _Ref, {error, closed}} ->
74
error_logger:info_msg("Socket ~p closed by client~n", [Sock]),
76
{inet_async, Sock, _Ref, {error, Reason}} ->
77
error_logger:error_msg("Socket ~p closed abruptly with "
81
{conserve_memory, Conserve} ->
82
mainloop(internal_conserve_memory(Conserve, State), ByteCount)
74
process_received_bytes(Data, State#reader_state{recv_outstanding = false});
75
{inet_async, _Sock, _Ref, {error, closed}} ->
77
{inet_async, _Sock, _Ref, {error, Reason}} ->
78
throw({inet_error, Reason});
79
{conserve_resources, Conserve} ->
82
State#reader_state{conserve_resources = Conserve}), ByteCount);
84
credit_flow:handle_bump_msg(Msg),
85
mainloop(control_throttle(State), ByteCount)
85
88
process_received_bytes([], State) ->
96
99
rabbit_stomp_processor:process_frame(Processor, Frame),
97
100
PS = rabbit_stomp_frame:initial_state(),
98
101
process_received_bytes(Rest,
101
state = next_state(S, Frame)})
105
state = next_state(S, Frame)}))
104
conserve_memory(Pid, Conserve) ->
105
Pid ! {conserve_memory, Conserve},
108
conserve_resources(Pid, Conserve) ->
109
Pid ! {conserve_resources, Conserve},
108
register_memory_alarm(State) ->
109
internal_conserve_memory(
110
rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), State).
112
register_resource_alarm(State) ->
113
rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}), State.
112
internal_conserve_memory(true, State = #reader_state{state = running}) ->
113
State#reader_state{state = blocking};
114
internal_conserve_memory(false, State) ->
115
State#reader_state{state = running};
116
internal_conserve_memory(_Conserve, State) ->
115
control_throttle(State = #reader_state{state = CS,
116
conserve_resources = Mem}) ->
117
case {CS, Mem orelse credit_flow:blocked()} of
118
{running, true} -> State#reader_state{state = blocking};
119
{blocking, false} -> State#reader_state{state = running};
120
{blocked, false} -> State#reader_state{state = running};
119
124
next_state(blocking, #stomp_frame{command = "SEND"}) ->
121
126
next_state(S, _) ->
124
run_socket(#reader_state{state = blocked}, _ByteCount) ->
126
run_socket(#reader_state{socket = Sock}, ByteCount) ->
129
run_socket(State = #reader_state{state = blocked}, _ByteCount) ->
131
run_socket(State = #reader_state{recv_outstanding = true}, _ByteCount) ->
133
run_socket(State = #reader_state{socket = Sock}, ByteCount) ->
127
134
rabbit_net:async_recv(Sock, ByteCount, infinity),
135
State#reader_state{recv_outstanding = true}.
137
%%----------------------------------------------------------------------------
139
start_processor(SupPid, Configuration, Sock) ->
140
SendFun = fun (sync, IoData) ->
141
%% no messages emitted
142
catch rabbit_net:send(Sock, IoData);
144
%% {inet_reply, _, _} will appear soon
145
%% We ignore certain errors here, as we will be
146
%% receiving an asynchronous notification of the
147
%% same (or a related) fault shortly anyway. See
149
catch rabbit_net:port_command(Sock, IoData)
153
fun (SendTimeout, SendFin, ReceiveTimeout, ReceiveFun) ->
154
SHF = rabbit_heartbeat:start_heartbeat_fun(SupPid),
155
SHF(Sock, SendTimeout, SendFin, ReceiveTimeout, ReceiveFun)
158
rabbit_stomp_client_sup:start_processor(
159
SupPid, [SendFun, adapter_info(Sock), StartHeartbeatFun,
160
ssl_login_name(Sock, Configuration), Configuration]).
163
adapter_info(Sock) ->
164
{Addr, Port} = case rabbit_net:sockname(Sock) of
166
_ -> {unknown, unknown}
168
{PeerAddr, PeerPort} = case rabbit_net:peername(Sock) of
170
_ -> {unknown, unknown}
172
Name = case rabbit_net:connection_string(Sock, inbound) of
176
#adapter_info{protocol = {'STOMP', 0},
177
name = list_to_binary(Name),
180
peer_address = PeerAddr,
181
peer_port = PeerPort,
182
additional_info = maybe_ssl_info(Sock)}.
184
maybe_ssl_info(Sock) ->
185
case rabbit_net:is_ssl(Sock) of
186
true -> [{ssl, true}] ++ ssl_info(Sock) ++ ssl_cert_info(Sock);
187
false -> [{ssl, false}]
191
{Protocol, KeyExchange, Cipher, Hash} =
192
case rabbit_net:ssl_info(Sock) of
193
{ok, {P, {K, C, H}}} -> {P, K, C, H};
194
{ok, {P, {K, C, H, _}}} -> {P, K, C, H};
195
_ -> {unknown, unknown, unknown, unknown}
197
[{ssl_protocol, Protocol},
198
{ssl_key_exchange, KeyExchange},
199
{ssl_cipher, Cipher},
202
ssl_cert_info(Sock) ->
203
case rabbit_net:peercert(Sock) of
205
[{peer_cert_issuer, list_to_binary(
206
rabbit_ssl:peer_cert_issuer(Cert))},
207
{peer_cert_subject, list_to_binary(
208
rabbit_ssl:peer_cert_subject(Cert))},
209
{peer_cert_validity, list_to_binary(
210
rabbit_ssl:peer_cert_validity(Cert))}];
215
ssl_login_name(_Sock, #stomp_configuration{ssl_cert_login = false}) ->
217
ssl_login_name(Sock, #stomp_configuration{ssl_cert_login = true}) ->
218
case rabbit_net:peercert(Sock) of
219
{ok, C} -> case rabbit_ssl:peer_cert_auth_name(C) of
224
{error, no_peercert} -> none;