~ubuntu-branches/ubuntu/saucy/rabbitmq-server/saucy

« back to all changes in this revision

Viewing changes to src/rabbit_reader.erl

  • Committer: Package Import Robot
  • Author(s): Emile Joubert
  • Date: 2013-03-13 10:53:18 UTC
  • mfrom: (0.5.1) (0.1.36 sid)
  • Revision ID: package-import@ubuntu.com-20130313105318-8juqvm5209o27hbu
Tags: 3.0.4-1
New upstream release

Show diffs side-by-side

added added

removed removed

Lines of Context:
177
177
 
178
178
log(Level, Fmt, Args) -> rabbit_log:log(connection, Level, Fmt, Args).
179
179
 
 
180
socket_error(Reason) ->
 
181
    log(error, "error on AMQP connection ~p: ~p (~s)~n",
 
182
        [self(), Reason, rabbit_misc:format_inet_error(Reason)]).
 
183
 
180
184
inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
181
185
 
182
186
socket_op(Sock, Fun) ->
183
187
    case Fun(Sock) of
184
188
        {ok, Res}       -> Res;
185
 
        {error, Reason} -> log(error, "error on AMQP connection ~p: ~p~n",
186
 
                               [self(), Reason]),
 
189
        {error, Reason} -> socket_error(Reason),
187
190
                           %% NB: this is tcp socket, even in case of ssl
188
191
                           rabbit_net:fast_close(Sock),
189
192
                           exit(normal)
190
193
    end.
191
194
 
192
 
name(Sock) ->
193
 
    socket_op(Sock, fun (S) -> rabbit_net:connection_string(S, inbound) end).
194
 
 
195
 
socket_ends(Sock) ->
196
 
    socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end).
197
 
 
198
195
start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
199
196
                 Sock, SockTransform) ->
200
197
    process_flag(trap_exit, true),
201
 
    Name = name(Sock),
 
198
    Name = case rabbit_net:connection_string(Sock, inbound) of
 
199
               {ok, Str}         -> Str;
 
200
               {error, enotconn} -> rabbit_net:fast_close(Sock),
 
201
                                    exit(normal);
 
202
               {error, Reason}   -> socket_error(Reason),
 
203
                                    rabbit_net:fast_close(Sock),
 
204
                                    exit(normal)
 
205
           end,
202
206
    log(info, "accepting AMQP connection ~p (~s)~n", [self(), Name]),
203
207
    ClientSock = socket_op(Sock, SockTransform),
204
208
    erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout),
205
 
    {PeerHost, PeerPort, Host, Port} = socket_ends(Sock),
 
209
    {PeerHost, PeerPort, Host, Port} =
 
210
        socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end),
206
211
    State = #v1{parent              = Parent,
207
212
                sock                = ClientSock,
208
213
                name                = list_to_binary(Name),
277
282
 
278
283
mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
279
284
    case rabbit_net:recv(Sock) of
280
 
        {data, Data}    -> recvloop(Deb, State#v1{buf = [Data | Buf],
281
 
                                                  buf_len = BufLen + size(Data),
282
 
                                                  pending_recv = false});
283
 
        closed          -> case State#v1.connection_state of
284
 
                               closed -> State;
285
 
                               _      -> throw(connection_closed_abruptly)
286
 
                           end;
287
 
        {error, Reason} -> throw({inet_error, Reason});
288
 
        {other, Other}  -> handle_other(Other, Deb, State)
 
285
        {data, Data} ->
 
286
            recvloop(Deb, State#v1{buf = [Data | Buf],
 
287
                                   buf_len = BufLen + size(Data),
 
288
                                   pending_recv = false});
 
289
        closed when State#v1.connection_state =:= closed ->
 
290
            ok;
 
291
        closed ->
 
292
            throw(connection_closed_abruptly);
 
293
        {error, Reason} ->
 
294
            throw({inet_error, Reason});
 
295
        {other, {system, From, Request}} ->
 
296
            sys:handle_system_msg(Request, From, State#v1.parent,
 
297
                                  ?MODULE, Deb, State);
 
298
        {other, Other}  ->
 
299
            case handle_other(Other, State) of
 
300
                stop     -> ok;
 
301
                NewState -> recvloop(Deb, NewState)
 
302
            end
289
303
    end.
290
304
 
291
 
handle_other({conserve_resources, Conserve}, Deb, State) ->
292
 
    recvloop(Deb, control_throttle(State#v1{conserve_resources = Conserve}));
293
 
handle_other({channel_closing, ChPid}, Deb, State) ->
 
305
handle_other({conserve_resources, Conserve}, State) ->
 
306
    control_throttle(State#v1{conserve_resources = Conserve});
 
307
handle_other({channel_closing, ChPid}, State) ->
294
308
    ok = rabbit_channel:ready_for_close(ChPid),
295
309
    channel_cleanup(ChPid),
296
 
    mainloop(Deb, maybe_close(control_throttle(State)));
297
 
handle_other({'EXIT', Parent, Reason}, _Deb, State = #v1{parent = Parent}) ->
 
310
    maybe_close(control_throttle(State));
 
311
handle_other({'EXIT', Parent, Reason}, State = #v1{parent = Parent}) ->
298
312
    terminate(io_lib:format("broker forced connection closure "
299
313
                            "with reason '~w'", [Reason]), State),
300
314
    %% this is what we are expected to do according to
306
320
    %% initiated by our parent it is probably more important to exit
307
321
    %% quickly.
308
322
    exit(Reason);
309
 
handle_other({channel_exit, _Channel, E = {writer, send_failed, _Error}},
310
 
             _Deb, _State) ->
 
323
handle_other({channel_exit, _Channel, E = {writer, send_failed, _E}}, _State) ->
311
324
    throw(E);
312
 
handle_other({channel_exit, Channel, Reason}, Deb, State) ->
313
 
    mainloop(Deb, handle_exception(State, Channel, Reason));
314
 
handle_other({'DOWN', _MRef, process, ChPid, Reason}, Deb, State) ->
315
 
    mainloop(Deb, handle_dependent_exit(ChPid, Reason, State));
316
 
handle_other(terminate_connection, _Deb, State) ->
317
 
    State;
318
 
handle_other(handshake_timeout, Deb, State)
 
325
handle_other({channel_exit, Channel, Reason}, State) ->
 
326
    handle_exception(State, Channel, Reason);
 
327
handle_other({'DOWN', _MRef, process, ChPid, Reason}, State) ->
 
328
    handle_dependent_exit(ChPid, Reason, State);
 
329
handle_other(terminate_connection, _State) ->
 
330
    stop;
 
331
handle_other(handshake_timeout, State)
319
332
  when ?IS_RUNNING(State) orelse
320
333
       State#v1.connection_state =:= closing orelse
321
334
       State#v1.connection_state =:= closed ->
322
 
    mainloop(Deb, State);
323
 
handle_other(handshake_timeout, _Deb, State) ->
 
335
    State;
 
336
handle_other(handshake_timeout, State) ->
324
337
    throw({handshake_timeout, State#v1.callback});
325
 
handle_other(heartbeat_timeout, Deb, State = #v1{connection_state = closed}) ->
326
 
    mainloop(Deb, State);
327
 
handle_other(heartbeat_timeout, _Deb, #v1{connection_state = S}) ->
 
338
handle_other(heartbeat_timeout, State = #v1{connection_state = closed}) ->
 
339
    State;
 
340
handle_other(heartbeat_timeout, #v1{connection_state = S}) ->
328
341
    throw({heartbeat_timeout, S});
329
 
handle_other({'$gen_call', From, {shutdown, Explanation}}, Deb, State) ->
 
342
handle_other({'$gen_call', From, {shutdown, Explanation}}, State) ->
330
343
    {ForceTermination, NewState} = terminate(Explanation, State),
331
344
    gen_server:reply(From, ok),
332
345
    case ForceTermination of
333
 
        force  -> ok;
334
 
        normal -> mainloop(Deb, NewState)
 
346
        force  -> stop;
 
347
        normal -> NewState
335
348
    end;
336
 
handle_other({'$gen_call', From, info}, Deb, State) ->
 
349
handle_other({'$gen_call', From, info}, State) ->
337
350
    gen_server:reply(From, infos(?INFO_KEYS, State)),
338
 
    mainloop(Deb, State);
339
 
handle_other({'$gen_call', From, {info, Items}}, Deb, State) ->
 
351
    State;
 
352
handle_other({'$gen_call', From, {info, Items}}, State) ->
340
353
    gen_server:reply(From, try {ok, infos(Items, State)}
341
354
                           catch Error -> {error, Error}
342
355
                           end),
343
 
    mainloop(Deb, State);
344
 
handle_other({'$gen_cast', force_event_refresh}, Deb, State)
 
356
    State;
 
357
handle_other({'$gen_cast', force_event_refresh}, State)
345
358
  when ?IS_RUNNING(State) ->
346
359
    rabbit_event:notify(connection_created,
347
360
                        [{type, network} | infos(?CREATION_EVENT_KEYS, State)]),
348
 
    mainloop(Deb, State);
349
 
handle_other({'$gen_cast', force_event_refresh}, Deb, State) ->
 
361
    State;
 
362
handle_other({'$gen_cast', force_event_refresh}, State) ->
350
363
    %% Ignore, we will emit a created event once we start running.
351
 
    mainloop(Deb, State);
352
 
handle_other(ensure_stats, Deb, State) ->
353
 
    mainloop(Deb, ensure_stats_timer(State));
354
 
handle_other(emit_stats, Deb, State) ->
355
 
    mainloop(Deb, emit_stats(State));
356
 
handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) ->
357
 
    sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State);
358
 
handle_other({bump_credit, Msg}, Deb, State) ->
 
364
    State;
 
365
handle_other(ensure_stats, State) ->
 
366
    ensure_stats_timer(State);
 
367
handle_other(emit_stats, State) ->
 
368
    emit_stats(State);
 
369
handle_other({bump_credit, Msg}, State) ->
359
370
    credit_flow:handle_bump_msg(Msg),
360
 
    recvloop(Deb, control_throttle(State));
361
 
handle_other(Other, _Deb, _State) ->
 
371
    control_throttle(State);
 
372
handle_other(Other, _State) ->
362
373
    %% internal error -> something worth dying for
363
374
    exit({unexpected_message, Other}).
364
375