~ubuntu-branches/ubuntu/saucy/rabbitmq-server/saucy

« back to all changes in this revision

Viewing changes to plugins-src/rabbitmq-stomp/src/rabbit_stomp_reader.erl

  • Committer: Package Import Robot
  • Author(s): Emile Joubert
  • Date: 2012-11-19 11:42:31 UTC
  • mfrom: (0.2.18) (0.1.32 sid)
  • Revision ID: package-import@ubuntu.com-20121119114231-hvapkn4akng09etr
Tags: 3.0.0-1
New upstream release

Show diffs side-by-side

added added

removed removed

Lines of Context:
24
24
-include("rabbit_stomp_frame.hrl").
25
25
-include_lib("amqp_client/include/amqp_client.hrl").
26
26
 
27
 
-record(reader_state, {socket, parse_state, processor, state, iterations,
 
27
-record(reader_state, {socket, parse_state, processor, state,
28
28
                       conserve_resources, recv_outstanding}).
29
29
 
30
30
%%----------------------------------------------------------------------------
45
45
 
46
46
            ParseState = rabbit_stomp_frame:initial_state(),
47
47
            try
 
48
                rabbit_misc:throw_on_error(
 
49
                  inet_error, fun () -> rabbit_net:tune_buffer_size(Sock) end),
48
50
                mainloop(
49
51
                  control_throttle(
50
52
                    register_resource_alarm(
52
54
                                    parse_state        = ParseState,
53
55
                                    processor          = ProcessorPid,
54
56
                                    state              = running,
55
 
                                    iterations         = 0,
56
57
                                    conserve_resources = false,
57
 
                                    recv_outstanding   = false})), 0),
 
58
                                    recv_outstanding   = false}))),
58
59
                log(info, "closing STOMP connection ~p (~s)~n",
59
60
                    [self(), ConnStr])
60
61
            catch
61
 
                Ex -> log(error, "closing STOMP connection ~p (~s):~n~p~n",
 
62
                _:Ex -> log(error, "closing STOMP connection ~p (~s):~n~p~n",
62
63
                          [self(), ConnStr, Ex])
63
64
            after
64
65
                rabbit_stomp_processor:flush_and_die(ProcessorPid)
67
68
            done
68
69
    end.
69
70
 
70
 
mainloop(State0 = #reader_state{socket = Sock}, ByteCount) ->
71
 
    State = run_socket(State0, ByteCount),
 
71
mainloop(State0 = #reader_state{socket = Sock}) ->
 
72
    State = run_socket(State0),
72
73
    receive
73
74
        {inet_async, Sock, _Ref, {ok, Data}} ->
74
 
            process_received_bytes(Data, State#reader_state{recv_outstanding = false});
 
75
            process_received_bytes(
 
76
              Data, State#reader_state{recv_outstanding = false});
75
77
        {inet_async, _Sock, _Ref, {error, closed}} ->
76
78
            ok;
77
79
        {inet_async, _Sock, _Ref, {error, Reason}} ->
78
80
            throw({inet_error, Reason});
79
81
        {conserve_resources, Conserve} ->
80
 
            mainloop(
81
 
              control_throttle(
82
 
                State#reader_state{conserve_resources = Conserve}), ByteCount);
 
82
            mainloop(control_throttle(
 
83
                       State#reader_state{conserve_resources = Conserve}));
83
84
        {bump_credit, Msg} ->
84
85
            credit_flow:handle_bump_msg(Msg),
85
 
            mainloop(control_throttle(State), ByteCount)
 
86
            mainloop(control_throttle(State))
86
87
    end.
87
88
 
88
89
process_received_bytes([], State) ->
89
 
    mainloop(State, 0);
 
90
    mainloop(State);
90
91
process_received_bytes(Bytes,
91
92
                       State = #reader_state{
92
93
                         processor   = Processor,
93
94
                         parse_state = ParseState,
94
95
                         state       = S}) ->
95
96
    case rabbit_stomp_frame:parse(Bytes, ParseState) of
96
 
        {more, ParseState1, Length} ->
97
 
            mainloop(State#reader_state{parse_state = ParseState1}, Length);
 
97
        {more, ParseState1} ->
 
98
            mainloop(State#reader_state{parse_state = ParseState1});
98
99
        {ok, Frame, Rest} ->
99
100
            rabbit_stomp_processor:process_frame(Processor, Frame),
100
101
            PS = rabbit_stomp_frame:initial_state(),
126
127
next_state(S, _) ->
127
128
    S.
128
129
 
129
 
run_socket(State = #reader_state{state = blocked}, _ByteCount) ->
130
 
    State;
131
 
run_socket(State = #reader_state{recv_outstanding = true}, _ByteCount) ->
132
 
    State;
133
 
run_socket(State = #reader_state{socket = Sock}, ByteCount) ->
134
 
    rabbit_net:async_recv(Sock, ByteCount, infinity),
 
130
run_socket(State = #reader_state{state = blocked}) ->
 
131
    State;
 
132
run_socket(State = #reader_state{recv_outstanding = true}) ->
 
133
    State;
 
134
run_socket(State = #reader_state{socket = Sock}) ->
 
135
    rabbit_net:async_recv(Sock, 0, infinity),
135
136
    State#reader_state{recv_outstanding = true}.
136
137
 
137
138
%%----------------------------------------------------------------------------
173
174
               {ok, Res3} -> Res3;
174
175
               _          -> unknown
175
176
           end,
176
 
    #adapter_info{protocol        = {'STOMP', 0},
177
 
                  name            = list_to_binary(Name),
178
 
                  address         = Addr,
179
 
                  port            = Port,
180
 
                  peer_address    = PeerAddr,
181
 
                  peer_port       = PeerPort,
182
 
                  additional_info = maybe_ssl_info(Sock)}.
 
177
    #amqp_adapter_info{protocol        = {'STOMP', 0},
 
178
                       name            = list_to_binary(Name),
 
179
                       address         = Addr,
 
180
                       port            = Port,
 
181
                       peer_address    = PeerAddr,
 
182
                       peer_port       = PeerPort,
 
183
                       additional_info = maybe_ssl_info(Sock)}.
183
184
 
184
185
maybe_ssl_info(Sock) ->
185
186
    case rabbit_net:is_ssl(Sock) of