~ubuntu-branches/ubuntu/saucy/rabbitmq-server/saucy-proposed

« back to all changes in this revision

Viewing changes to plugins-src/rabbitmq-stomp/src/rabbit_stomp_reader.erl

  • Committer: Package Import Robot
  • Author(s): Emile Joubert
  • Date: 2012-06-22 17:48:28 UTC
  • mfrom: (0.2.16) (0.1.28 sid)
  • Revision ID: package-import@ubuntu.com-20120622174828-1t2dts9myai6ogqo
Tags: 2.8.4-1
New upstream release

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
%%   The contents of this file are subject to the Mozilla Public License
2
 
%%   Version 1.1 (the "License"); you may not use this file except in
3
 
%%   compliance with the License. You may obtain a copy of the License at
4
 
%%   http://www.mozilla.org/MPL/
5
 
%%
6
 
%%   Software distributed under the License is distributed on an "AS IS"
7
 
%%   basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
8
 
%%   License for the specific language governing rights and limitations
9
 
%%   under the License.
10
 
%%
11
 
%%   The Original Code is RabbitMQ.
12
 
%%
13
 
%%   The Initial Developers of the Original Code are LShift Ltd,
14
 
%%   Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
15
 
%%
16
 
%%   Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
17
 
%%   Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
18
 
%%   are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
19
 
%%   Technologies LLC, and Rabbit Technologies Ltd.
20
 
%%
21
 
%%   Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
22
 
%%   Ltd. Portions created by Cohesive Financial Technologies LLC are
23
 
%%   Copyright (C) 2007-2009 Cohesive Financial Technologies
24
 
%%   LLC. Portions created by Rabbit Technologies Ltd are Copyright
25
 
%%   (C) 2007-2009 Rabbit Technologies Ltd.
26
 
%%
27
 
%%   All Rights Reserved.
28
 
%%
29
 
%%   Contributor(s): ______________________________________.
30
 
%%
 
1
%% The contents of this file are subject to the Mozilla Public License
 
2
%% Version 1.1 (the "License"); you may not use this file except in
 
3
%% compliance with the License. You may obtain a copy of the License
 
4
%% at http://www.mozilla.org/MPL/
 
5
%%
 
6
%% Software distributed under the License is distributed on an "AS IS"
 
7
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
 
8
%% the License for the specific language governing rights and
 
9
%% limitations under the License.
 
10
%%
 
11
%% The Original Code is RabbitMQ.
 
12
%%
 
13
%% The Initial Developer of the Original Code is VMware, Inc.
 
14
%% Copyright (c) 2007-2012 VMware, Inc.  All rights reserved.
 
15
%%
 
16
 
31
17
-module(rabbit_stomp_reader).
32
18
 
33
 
-export([start_link/1]).
34
 
-export([init/1]).
35
 
-export([conserve_memory/2]).
 
19
-export([start_link/2]).
 
20
-export([init/2]).
 
21
-export([conserve_resources/2]).
36
22
 
 
23
-include("rabbit_stomp.hrl").
37
24
-include("rabbit_stomp_frame.hrl").
38
 
 
39
 
-record(reader_state, {socket, parse_state, processor, state, iterations}).
40
 
 
41
 
start_link(ProcessorPid) ->
42
 
        {ok, proc_lib:spawn_link(?MODULE, init, [ProcessorPid])}.
43
 
 
44
 
init(ProcessorPid) ->
 
25
-include_lib("amqp_client/include/amqp_client.hrl").
 
26
 
 
27
-record(reader_state, {socket, parse_state, processor, state, iterations,
 
28
                       conserve_resources, recv_outstanding}).
 
29
 
 
30
%%----------------------------------------------------------------------------
 
31
 
 
32
%% Spawns a linked reader process that begins in init/2 with the given
%% supervisor pid and configuration. Returns {ok, Pid} of the new process.
start_link(SupPid, Configuration) ->
    ReaderPid = proc_lib:spawn_link(?MODULE, init, [SupPid, Configuration]),
    {ok, ReaderPid}.
 
34
 
 
35
%% Forwards a log entry to rabbit_log under the 'connection' category.
log(Level, Fmt, Args) ->
    rabbit_log:log(connection, Level, Fmt, Args).
 
36
 
 
37
init(SupPid, Configuration) ->
45
38
    receive
46
 
        {go, Sock} ->
47
 
            {ok, {PeerAddress, PeerPort}} = rabbit_net:peername(Sock),
48
 
            PeerAddressS = inet_parse:ntoa(PeerAddress),
49
 
            error_logger:info_msg("starting STOMP connection ~p from ~s:~p~n",
50
 
                                  [self(), PeerAddressS, PeerPort]),
 
39
        {go, Sock0, SockTransform} ->
 
40
            {ok, Sock} = SockTransform(Sock0),
 
41
            {ok, ProcessorPid} = start_processor(SupPid, Configuration, Sock),
 
42
            {ok, ConnStr} = rabbit_net:connection_string(Sock, inbound),
 
43
            log(info, "accepting STOMP connection ~p (~s)~n",
 
44
                [self(), ConnStr]),
51
45
 
52
46
            ParseState = rabbit_stomp_frame:initial_state(),
53
47
            try
54
48
                mainloop(
55
 
                   register_memory_alarm(
56
 
                     #reader_state{socket      = Sock,
57
 
                                   parse_state = ParseState,
58
 
                                   processor   = ProcessorPid,
59
 
                                   state       = running,
60
 
                                   iterations  = 0}), 0)
 
49
                  control_throttle(
 
50
                    register_resource_alarm(
 
51
                      #reader_state{socket             = Sock,
 
52
                                    parse_state        = ParseState,
 
53
                                    processor          = ProcessorPid,
 
54
                                    state              = running,
 
55
                                    iterations         = 0,
 
56
                                    conserve_resources = false,
 
57
                                    recv_outstanding   = false})), 0),
 
58
                log(info, "closing STOMP connection ~p (~s)~n",
 
59
                    [self(), ConnStr])
 
60
            catch
 
61
                Ex -> log(error, "closing STOMP connection ~p (~s):~n~p~n",
 
62
                          [self(), ConnStr, Ex])
61
63
            after
62
 
                rabbit_stomp_processor:flush_and_die(ProcessorPid),
63
 
                error_logger:info_msg("ending STOMP connection ~p from ~s:~p~n",
64
 
                                      [self(), PeerAddressS, PeerPort])
65
 
            end
 
64
                rabbit_stomp_processor:flush_and_die(ProcessorPid)
 
65
            end,
 
66
 
 
67
            done
66
68
    end.
67
69
 
68
 
mainloop(State = #reader_state{socket = Sock}, ByteCount) ->
69
 
    run_socket(State, ByteCount),
 
70
mainloop(State0 = #reader_state{socket = Sock}, ByteCount) ->
 
71
    State = run_socket(State0, ByteCount),
70
72
    receive
71
73
        {inet_async, Sock, _Ref, {ok, Data}} ->
72
 
            process_received_bytes(Data, State);
73
 
        {inet_async, Sock, _Ref, {error, closed}} ->
74
 
            error_logger:info_msg("Socket ~p closed by client~n", [Sock]),
75
 
            ok;
76
 
        {inet_async, Sock, _Ref, {error, Reason}} ->
77
 
            error_logger:error_msg("Socket ~p closed abruptly with "
78
 
                                   "error code ~p~n",
79
 
                                   [Sock, Reason]),
80
 
            ok;
81
 
        {conserve_memory, Conserve} ->
82
 
            mainloop(internal_conserve_memory(Conserve, State), ByteCount)
 
74
            process_received_bytes(Data, State#reader_state{recv_outstanding = false});
 
75
        {inet_async, _Sock, _Ref, {error, closed}} ->
 
76
            ok;
 
77
        {inet_async, _Sock, _Ref, {error, Reason}} ->
 
78
            throw({inet_error, Reason});
 
79
        {conserve_resources, Conserve} ->
 
80
            mainloop(
 
81
              control_throttle(
 
82
                State#reader_state{conserve_resources = Conserve}), ByteCount);
 
83
        {bump_credit, Msg} ->
 
84
            credit_flow:handle_bump_msg(Msg),
 
85
            mainloop(control_throttle(State), ByteCount)
83
86
    end.
84
87
 
85
88
process_received_bytes([], State) ->
96
99
            rabbit_stomp_processor:process_frame(Processor, Frame),
97
100
            PS = rabbit_stomp_frame:initial_state(),
98
101
            process_received_bytes(Rest,
99
 
                                   State#reader_state{
100
 
                                     parse_state = PS,
101
 
                                     state       = next_state(S, Frame)})
 
102
                                   control_throttle(
 
103
                                     State#reader_state{
 
104
                                       parse_state = PS,
 
105
                                       state       = next_state(S, Frame)}))
102
106
    end.
103
107
 
104
 
conserve_memory(Pid, Conserve) ->
105
 
    Pid ! {conserve_memory, Conserve},
 
108
conserve_resources(Pid, Conserve) ->
 
109
    Pid ! {conserve_resources, Conserve},
106
110
    ok.
107
111
 
108
 
register_memory_alarm(State) ->
109
 
    internal_conserve_memory(
110
 
      rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), State).
 
112
%% Registers this process with rabbit_alarm, naming conserve_resources in
%% this module as the callback, and hands the reader state back unchanged.
%% The value returned by rabbit_alarm:register/2 is deliberately not used.
register_resource_alarm(State) ->
    Callback = {?MODULE, conserve_resources, []},
    rabbit_alarm:register(self(), Callback),
    State.
111
114
 
112
 
internal_conserve_memory(true, State = #reader_state{state = running}) ->
113
 
    State#reader_state{state = blocking};
114
 
internal_conserve_memory(false, State) ->
115
 
    State#reader_state{state = running};
116
 
internal_conserve_memory(_Conserve, State) ->
117
 
    State.
 
115
%% Recomputes the reader's flow state. The reader counts as throttled when
%% its conserve_resources flag is set or credit_flow:blocked() is true.
control_throttle(State = #reader_state{state              = Current,
                                       conserve_resources = Conserve}) ->
    Throttled = Conserve orelse credit_flow:blocked(),
    State#reader_state{state = next_flow_state(Current, Throttled)}.

%% Transition table used by control_throttle/1:
%%   running  + throttled     -> blocking
%%   blocking + not throttled -> running
%%   blocked  + not throttled -> running
%% Every other combination keeps the current state.
next_flow_state(running,  true)  -> blocking;
next_flow_state(blocking, false) -> running;
next_flow_state(blocked,  false) -> running;
next_flow_state(Current,  _)     -> Current.
118
123
 
119
124
%% Returns the reader state to use after a frame has been handled: a SEND
%% frame seen while 'blocking' moves the reader to 'blocked'; any other
%% state/frame combination leaves the state as it was.
next_state(Current, Frame) ->
    case {Current, Frame} of
        {blocking, #stomp_frame{command = "SEND"}} -> blocked;
        _                                          -> Current
    end.
123
128
 
124
 
run_socket(#reader_state{state = blocked}, _ByteCount) ->
125
 
    ok;
126
 
run_socket(#reader_state{socket = Sock}, ByteCount) ->
 
129
run_socket(State = #reader_state{state = blocked}, _ByteCount) ->
 
130
    State;
 
131
run_socket(State = #reader_state{recv_outstanding = true}, _ByteCount) ->
 
132
    State;
 
133
run_socket(State = #reader_state{socket = Sock}, ByteCount) ->
127
134
    rabbit_net:async_recv(Sock, ByteCount, infinity),
128
 
    ok.
 
135
    State#reader_state{recv_outstanding = true}.
 
136
 
 
137
%%----------------------------------------------------------------------------
 
138
 
 
139
%% Starts the STOMP processor for this connection through
%% rabbit_stomp_client_sup, handing it the send fun, adapter info,
%% heartbeat starter, SSL login name and configuration (in that order).
start_processor(SupPid, Configuration, Sock) ->
    ProcessorArgs = [send_fun(Sock),
                     adapter_info(Sock),
                     start_heartbeat_fun(SupPid, Sock),
                     ssl_login_name(Sock, Configuration),
                     Configuration],
    rabbit_stomp_client_sup:start_processor(SupPid, ProcessorArgs).

%% Builds the fun the processor uses to write to the socket, either
%% synchronously or asynchronously.
send_fun(Sock) ->
    fun (sync, IoData) ->
            %% no messages emitted
            catch rabbit_net:send(Sock, IoData);
        (async, IoData) ->
            %% {inet_reply, _, _} will appear soon
            %% We ignore certain errors here, as we will be
            %% receiving an asynchronous notification of the
            %% same (or a related) fault shortly anyway. See
            %% bug 21365.
            catch rabbit_net:port_command(Sock, IoData)
    end.

%% Builds the fun that starts heartbeating on Sock; the underlying starter
%% is obtained from rabbit_heartbeat each time the fun is invoked.
start_heartbeat_fun(SupPid, Sock) ->
    fun (SendTimeout, SendFin, ReceiveTimeout, ReceiveFun) ->
            Starter = rabbit_heartbeat:start_heartbeat_fun(SupPid),
            Starter(Sock, SendTimeout, SendFin, ReceiveTimeout, ReceiveFun)
    end.
 
161
 
 
162
 
 
163
%% Builds the #adapter_info{} record describing this STOMP connection.
%%
%% Local and peer address/port come from rabbit_net:sockname/1 and
%% rabbit_net:peername/1; when either lookup does not return {ok, _} the
%% corresponding pair is reported as {unknown, unknown}.
%%
%% The connection name is always a binary. When
%% rabbit_net:connection_string/2 does not return {ok, _} the name falls
%% back to <<"unknown">>. The fallback used to be the atom 'unknown', which
%% was then passed to list_to_binary/1 and raised badarg.
adapter_info(Sock) ->
    {Addr, Port}         = endpoint_or_unknown(rabbit_net:sockname(Sock)),
    {PeerAddr, PeerPort} = endpoint_or_unknown(rabbit_net:peername(Sock)),
    Name = case rabbit_net:connection_string(Sock, inbound) of
               {ok, ConnStr} -> list_to_binary(ConnStr);
               _             -> <<"unknown">>
           end,
    #adapter_info{protocol        = {'STOMP', 0},
                  name            = Name,
                  address         = Addr,
                  port            = Port,
                  peer_address    = PeerAddr,
                  peer_port       = PeerPort,
                  additional_info = maybe_ssl_info(Sock)}.

%% Unwraps an {ok, {Address, Port}} lookup result, substituting
%% {unknown, unknown} for anything else.
endpoint_or_unknown({ok, Endpoint}) -> Endpoint;
endpoint_or_unknown(_)              -> {unknown, unknown}.
 
183
 
 
184
%% Returns the SSL-related entries for the adapter info: just
%% [{ssl, false}] for a non-SSL socket, otherwise {ssl, true} followed by
%% the entries from ssl_info/1 and then ssl_cert_info/1.
maybe_ssl_info(Sock) ->
    case rabbit_net:is_ssl(Sock) of
        false -> [{ssl, false}];
        true  -> [{ssl, true} | (ssl_info(Sock) ++ ssl_cert_info(Sock))]
    end.
 
189
 
 
190
%% Returns a four-entry property list (ssl_protocol, ssl_key_exchange,
%% ssl_cipher, ssl_hash) taken from rabbit_net:ssl_info/1. Both the
%% three-element and the four-element cipher-suite tuples are accepted;
%% any other result yields 'unknown' for every entry.
ssl_info(Sock) ->
    Values = case rabbit_net:ssl_info(Sock) of
                 {ok, {P, {K, C, H}}}    -> [P, K, C, H];
                 {ok, {P, {K, C, H, _}}} -> [P, K, C, H];
                 _                       -> [unknown, unknown,
                                             unknown, unknown]
             end,
    lists:zip([ssl_protocol, ssl_key_exchange, ssl_cipher, ssl_hash],
              Values).
 
201
 
 
202
%% Returns the peer certificate's issuer, subject and validity as binaries
%% (in that order), or [] when rabbit_net:peercert/1 does not return a
%% certificate.
ssl_cert_info(Sock) ->
    case rabbit_net:peercert(Sock) of
        {ok, Cert} ->
            Extractors =
                [{peer_cert_issuer,   fun rabbit_ssl:peer_cert_issuer/1},
                 {peer_cert_subject,  fun rabbit_ssl:peer_cert_subject/1},
                 {peer_cert_validity, fun rabbit_ssl:peer_cert_validity/1}],
            [{Key, list_to_binary(Extract(Cert))} ||
                {Key, Extract} <- Extractors];
        _ ->
            []
    end.
 
214
 
 
215
%% Determines the login name derived from the peer's SSL certificate.
%% Returns 'none' when certificate login is disabled in the configuration,
%% when the socket is not SSL, when the peer presented no certificate, or
%% when rabbit_ssl:peer_cert_auth_name/1 reports 'unsafe' or 'not_found'.
ssl_login_name(_Sock, #stomp_configuration{ssl_cert_login = false}) ->
    none;
ssl_login_name(Sock, #stomp_configuration{ssl_cert_login = true}) ->
    case rabbit_net:peercert(Sock) of
        nossl                -> none;
        {error, no_peercert} -> none;
        {ok, Cert}           ->
            auth_name_or_none(rabbit_ssl:peer_cert_auth_name(Cert))
    end.

%% Maps the "no usable name" results of peer_cert_auth_name/1 to 'none'
%% and passes any other value through untouched.
auth_name_or_none(unsafe)    -> none;
auth_name_or_none(not_found) -> none;
auth_name_or_none(Name)      -> Name.