178
178
log(Level, Fmt, Args) -> rabbit_log:log(connection, Level, Fmt, Args).
180
socket_error(Reason) ->
181
log(error, "error on AMQP connection ~p: ~p (~s)~n",
182
[self(), Reason, rabbit_misc:format_inet_error(Reason)]).
180
184
inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
182
186
socket_op(Sock, Fun) ->
183
187
case Fun(Sock) of
184
188
{ok, Res} -> Res;
185
{error, Reason} -> log(error, "error on AMQP connection ~p: ~p~n",
189
{error, Reason} -> socket_error(Reason),
187
190
%% NB: this is tcp socket, even in case of ssl
188
191
rabbit_net:fast_close(Sock),
193
socket_op(Sock, fun (S) -> rabbit_net:connection_string(S, inbound) end).
196
socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end).
198
195
start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
199
196
Sock, SockTransform) ->
200
197
process_flag(trap_exit, true),
198
Name = case rabbit_net:connection_string(Sock, inbound) of
200
{error, enotconn} -> rabbit_net:fast_close(Sock),
202
{error, Reason} -> socket_error(Reason),
203
rabbit_net:fast_close(Sock),
202
206
log(info, "accepting AMQP connection ~p (~s)~n", [self(), Name]),
203
207
ClientSock = socket_op(Sock, SockTransform),
204
208
erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout),
205
{PeerHost, PeerPort, Host, Port} = socket_ends(Sock),
209
{PeerHost, PeerPort, Host, Port} =
210
socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end),
206
211
State = #v1{parent = Parent,
207
212
sock = ClientSock,
208
213
name = list_to_binary(Name),
278
283
mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
279
284
case rabbit_net:recv(Sock) of
280
{data, Data} -> recvloop(Deb, State#v1{buf = [Data | Buf],
281
buf_len = BufLen + size(Data),
282
pending_recv = false});
283
closed -> case State#v1.connection_state of
285
_ -> throw(connection_closed_abruptly)
287
{error, Reason} -> throw({inet_error, Reason});
288
{other, Other} -> handle_other(Other, Deb, State)
286
recvloop(Deb, State#v1{buf = [Data | Buf],
287
buf_len = BufLen + size(Data),
288
pending_recv = false});
289
closed when State#v1.connection_state =:= closed ->
292
throw(connection_closed_abruptly);
294
throw({inet_error, Reason});
295
{other, {system, From, Request}} ->
296
sys:handle_system_msg(Request, From, State#v1.parent,
297
?MODULE, Deb, State);
299
case handle_other(Other, State) of
301
NewState -> recvloop(Deb, NewState)
291
handle_other({conserve_resources, Conserve}, Deb, State) ->
292
recvloop(Deb, control_throttle(State#v1{conserve_resources = Conserve}));
293
handle_other({channel_closing, ChPid}, Deb, State) ->
305
handle_other({conserve_resources, Conserve}, State) ->
306
control_throttle(State#v1{conserve_resources = Conserve});
307
handle_other({channel_closing, ChPid}, State) ->
294
308
ok = rabbit_channel:ready_for_close(ChPid),
295
309
channel_cleanup(ChPid),
296
mainloop(Deb, maybe_close(control_throttle(State)));
297
handle_other({'EXIT', Parent, Reason}, _Deb, State = #v1{parent = Parent}) ->
310
maybe_close(control_throttle(State));
311
handle_other({'EXIT', Parent, Reason}, State = #v1{parent = Parent}) ->
298
312
terminate(io_lib:format("broker forced connection closure "
299
313
"with reason '~w'", [Reason]), State),
300
314
%% this is what we are expected to do according to
306
320
%% initiated by our parent it is probably more important to exit
309
handle_other({channel_exit, _Channel, E = {writer, send_failed, _Error}},
323
handle_other({channel_exit, _Channel, E = {writer, send_failed, _E}}, _State) ->
312
handle_other({channel_exit, Channel, Reason}, Deb, State) ->
313
mainloop(Deb, handle_exception(State, Channel, Reason));
314
handle_other({'DOWN', _MRef, process, ChPid, Reason}, Deb, State) ->
315
mainloop(Deb, handle_dependent_exit(ChPid, Reason, State));
316
handle_other(terminate_connection, _Deb, State) ->
318
handle_other(handshake_timeout, Deb, State)
325
handle_other({channel_exit, Channel, Reason}, State) ->
326
handle_exception(State, Channel, Reason);
327
handle_other({'DOWN', _MRef, process, ChPid, Reason}, State) ->
328
handle_dependent_exit(ChPid, Reason, State);
329
handle_other(terminate_connection, _State) ->
331
handle_other(handshake_timeout, State)
319
332
when ?IS_RUNNING(State) orelse
320
333
State#v1.connection_state =:= closing orelse
321
334
State#v1.connection_state =:= closed ->
322
mainloop(Deb, State);
323
handle_other(handshake_timeout, _Deb, State) ->
336
handle_other(handshake_timeout, State) ->
324
337
throw({handshake_timeout, State#v1.callback});
325
handle_other(heartbeat_timeout, Deb, State = #v1{connection_state = closed}) ->
326
mainloop(Deb, State);
327
handle_other(heartbeat_timeout, _Deb, #v1{connection_state = S}) ->
338
handle_other(heartbeat_timeout, State = #v1{connection_state = closed}) ->
340
handle_other(heartbeat_timeout, #v1{connection_state = S}) ->
328
341
throw({heartbeat_timeout, S});
329
handle_other({'$gen_call', From, {shutdown, Explanation}}, Deb, State) ->
342
handle_other({'$gen_call', From, {shutdown, Explanation}}, State) ->
330
343
{ForceTermination, NewState} = terminate(Explanation, State),
331
344
gen_server:reply(From, ok),
332
345
case ForceTermination of
334
normal -> mainloop(Deb, NewState)
336
handle_other({'$gen_call', From, info}, Deb, State) ->
349
handle_other({'$gen_call', From, info}, State) ->
337
350
gen_server:reply(From, infos(?INFO_KEYS, State)),
338
mainloop(Deb, State);
339
handle_other({'$gen_call', From, {info, Items}}, Deb, State) ->
352
handle_other({'$gen_call', From, {info, Items}}, State) ->
340
353
gen_server:reply(From, try {ok, infos(Items, State)}
341
354
catch Error -> {error, Error}
343
mainloop(Deb, State);
344
handle_other({'$gen_cast', force_event_refresh}, Deb, State)
357
handle_other({'$gen_cast', force_event_refresh}, State)
345
358
when ?IS_RUNNING(State) ->
346
359
rabbit_event:notify(connection_created,
347
360
[{type, network} | infos(?CREATION_EVENT_KEYS, State)]),
348
mainloop(Deb, State);
349
handle_other({'$gen_cast', force_event_refresh}, Deb, State) ->
362
handle_other({'$gen_cast', force_event_refresh}, State) ->
350
363
%% Ignore, we will emit a created event once we start running.
351
mainloop(Deb, State);
352
handle_other(ensure_stats, Deb, State) ->
353
mainloop(Deb, ensure_stats_timer(State));
354
handle_other(emit_stats, Deb, State) ->
355
mainloop(Deb, emit_stats(State));
356
handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) ->
357
sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State);
358
handle_other({bump_credit, Msg}, Deb, State) ->
365
handle_other(ensure_stats, State) ->
366
ensure_stats_timer(State);
367
handle_other(emit_stats, State) ->
369
handle_other({bump_credit, Msg}, State) ->
359
370
credit_flow:handle_bump_msg(Msg),
360
recvloop(Deb, control_throttle(State));
361
handle_other(Other, _Deb, _State) ->
371
control_throttle(State);
372
handle_other(Other, _State) ->
362
373
%% internal error -> something worth dying for
363
374
exit({unexpected_message, Other}).