24
24
-include("rabbit_stomp_frame.hrl").
25
25
-include_lib("amqp_client/include/amqp_client.hrl").
27
-record(reader_state, {socket, parse_state, processor, state, iterations,
27
-record(reader_state, {socket, parse_state, processor, state,
28
28
conserve_resources, recv_outstanding}).
30
30
%%----------------------------------------------------------------------------
52
54
parse_state = ParseState,
53
55
processor = ProcessorPid,
56
57
conserve_resources = false,
57
recv_outstanding = false})), 0),
58
recv_outstanding = false}))),
58
59
log(info, "closing STOMP connection ~p (~s)~n",
61
Ex -> log(error, "closing STOMP connection ~p (~s):~n~p~n",
62
_:Ex -> log(error, "closing STOMP connection ~p (~s):~n~p~n",
62
63
[self(), ConnStr, Ex])
64
65
rabbit_stomp_processor:flush_and_die(ProcessorPid)
70
mainloop(State0 = #reader_state{socket = Sock}, ByteCount) ->
71
State = run_socket(State0, ByteCount),
71
mainloop(State0 = #reader_state{socket = Sock}) ->
72
State = run_socket(State0),
73
74
{inet_async, Sock, _Ref, {ok, Data}} ->
74
process_received_bytes(Data, State#reader_state{recv_outstanding = false});
75
process_received_bytes(
76
Data, State#reader_state{recv_outstanding = false});
75
77
{inet_async, _Sock, _Ref, {error, closed}} ->
77
79
{inet_async, _Sock, _Ref, {error, Reason}} ->
78
80
throw({inet_error, Reason});
79
81
{conserve_resources, Conserve} ->
82
State#reader_state{conserve_resources = Conserve}), ByteCount);
82
mainloop(control_throttle(
83
State#reader_state{conserve_resources = Conserve}));
83
84
{bump_credit, Msg} ->
84
85
credit_flow:handle_bump_msg(Msg),
85
mainloop(control_throttle(State), ByteCount)
86
mainloop(control_throttle(State))
88
89
process_received_bytes([], State) ->
90
91
process_received_bytes(Bytes,
91
92
State = #reader_state{
92
93
processor = Processor,
93
94
parse_state = ParseState,
95
96
case rabbit_stomp_frame:parse(Bytes, ParseState) of
96
{more, ParseState1, Length} ->
97
mainloop(State#reader_state{parse_state = ParseState1}, Length);
97
{more, ParseState1} ->
98
mainloop(State#reader_state{parse_state = ParseState1});
98
99
{ok, Frame, Rest} ->
99
100
rabbit_stomp_processor:process_frame(Processor, Frame),
100
101
PS = rabbit_stomp_frame:initial_state(),
126
127
next_state(S, _) ->
129
run_socket(State = #reader_state{state = blocked}, _ByteCount) ->
131
run_socket(State = #reader_state{recv_outstanding = true}, _ByteCount) ->
133
run_socket(State = #reader_state{socket = Sock}, ByteCount) ->
134
rabbit_net:async_recv(Sock, ByteCount, infinity),
130
run_socket(State = #reader_state{state = blocked}) ->
132
run_socket(State = #reader_state{recv_outstanding = true}) ->
134
run_socket(State = #reader_state{socket = Sock}) ->
135
rabbit_net:async_recv(Sock, 0, infinity),
135
136
State#reader_state{recv_outstanding = true}.
137
138
%%----------------------------------------------------------------------------
173
174
{ok, Res3} -> Res3;
176
#adapter_info{protocol = {'STOMP', 0},
177
name = list_to_binary(Name),
180
peer_address = PeerAddr,
181
peer_port = PeerPort,
182
additional_info = maybe_ssl_info(Sock)}.
177
#amqp_adapter_info{protocol = {'STOMP', 0},
178
name = list_to_binary(Name),
181
peer_address = PeerAddr,
182
peer_port = PeerPort,
183
additional_info = maybe_ssl_info(Sock)}.
184
185
maybe_ssl_info(Sock) ->
185
186
case rabbit_net:is_ssl(Sock) of