~ubuntu-branches/ubuntu/saucy/rabbitmq-server/saucy

« back to all changes in this revision

Viewing changes to plugins-src/cowboy-wrapper/cowboy-git/src/cowboy_listener.erl

  • Committer: Package Import Robot
  • Author(s): Emile Joubert
  • Date: 2012-11-19 11:42:31 UTC
  • mfrom: (0.2.18) (0.1.32 sid)
  • Revision ID: package-import@ubuntu.com-20121119114231-hvapkn4akng09etr
Tags: 3.0.0-1
New upstream release

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
%% Copyright (c) 2011, Loïc Hoguin <essen@dev-extend.eu>
 
2
%%
 
3
%% Permission to use, copy, modify, and/or distribute this software for any
 
4
%% purpose with or without fee is hereby granted, provided that the above
 
5
%% copyright notice and this permission notice appear in all copies.
 
6
%%
 
7
%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 
8
%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 
9
%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 
10
%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 
11
%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 
12
%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 
13
%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 
14
 
 
15
%% @doc Public API for managing listeners.
 
16
-module(cowboy_listener).
 
17
-behaviour(gen_server).
 
18
 
 
19
-export([start_link/0, stop/1,
 
20
        add_connection/3, move_connection/3, remove_connection/2, wait/3]). %% API.
 
21
-export([init/1, handle_call/3, handle_cast/2,
 
22
        handle_info/2, terminate/2, code_change/3]). %% gen_server.
 
23
 
 
24
-record(state, {
 
25
        req_pools = [] :: [{atom(), non_neg_integer()}],
 
26
        reqs_table,
 
27
        queue = []
 
28
}).
 
29
 
 
30
%% API.
 
31
 
 
32
%% @private
 
33
%%
 
34
%% We set the process priority to high because cowboy_listener is the central
 
35
%% gen_server in Cowboy and is used to manage all the incoming connections.
 
36
%% Setting the process priority to high ensures the connection-related code
 
37
%% will always be executed when a connection needs it, allowing Cowboy to
 
38
%% scale far beyond what it would with a normal priority.
 
39
-spec start_link() -> {ok, pid()}.
 
40
start_link() ->
 
41
        gen_server:start_link(?MODULE, [], [{spawn_opt, [{priority, high}]}]).
 
42
 
 
43
%% @private
 
44
-spec stop(pid()) -> stopped.
 
45
stop(ServerPid) ->
 
46
        gen_server:call(ServerPid, stop).
 
47
 
 
48
%% @doc Add a connection to the given pool in the listener.
 
49
%%
 
50
%% Pools of connections are used to restrict the maximum number of connections
 
51
%% depending on their type. By default, Cowboy add all connections to the
 
52
%% pool <em>default</em>. It also checks for the maximum number of connections
 
53
%% in that pool before accepting again.
 
54
%%
 
55
%% When a process managing a connection dies, the process is removed from the
 
56
%% pool. If the socket has been sent to another process, it is up to the
 
57
%% protocol code to inform the listener of the new <em>ConnPid</em> by removing
 
58
%% the previous and adding the new one.
 
59
-spec add_connection(pid(), atom(), pid()) -> {ok, non_neg_integer()}.
 
60
add_connection(ServerPid, Pool, ConnPid) ->
 
61
        gen_server:call(ServerPid, {add_connection, Pool, ConnPid}).
 
62
 
 
63
%% @doc Move a connection from one pool to another.
 
64
-spec move_connection(pid(), atom(), pid()) -> ok.
 
65
move_connection(ServerPid, DestPool, ConnPid) ->
 
66
        gen_server:cast(ServerPid, {move_connection, DestPool, ConnPid}).
 
67
 
 
68
%% @doc Remove the given connection from its pool.
 
69
-spec remove_connection(pid(), pid()) -> ok.
 
70
remove_connection(ServerPid, ConnPid) ->
 
71
        gen_server:cast(ServerPid, {remove_connection, ConnPid}).
 
72
 
 
73
%% @doc Wait until the number of connections in the given pool gets below
 
74
%% the given threshold.
 
75
%%
 
76
%% This function will not return until the number of connections in the pool
 
77
%% gets below <em>MaxConns</em>. It makes use of <em>gen_server:reply/2</em>
 
78
%% to make the process wait for a reply indefinitely.
 
79
-spec wait(pid(), atom(), non_neg_integer()) -> ok.
 
80
wait(ServerPid, Pool, MaxConns) ->
 
81
        gen_server:call(ServerPid, {wait, Pool, MaxConns}, infinity).
 
82
 
 
83
%% gen_server.
 
84
 
 
85
%% @private
 
86
-spec init([]) -> {ok, #state{}}.
 
87
init([]) ->
 
88
        ReqsTablePid = ets:new(requests_table, [set, private]),
 
89
        {ok, #state{reqs_table=ReqsTablePid}}.
 
90
 
 
91
%% @private
 
92
-spec handle_call(_, _, State)
 
93
        -> {reply, ignored, State} | {stop, normal, stopped, State}.
 
94
handle_call({add_connection, Pool, ConnPid}, _From, State=#state{
 
95
                req_pools=Pools, reqs_table=ReqsTable}) ->
 
96
        MonitorRef = erlang:monitor(process, ConnPid),
 
97
        {NbConnsRet, Pools2} = case lists:keyfind(Pool, 1, Pools) of
 
98
                false ->
 
99
                        {1, [{Pool, 1}|Pools]};
 
100
                {Pool, NbConns} ->
 
101
                        NbConns2 = NbConns + 1,
 
102
                        {NbConns2, [{Pool, NbConns2}|lists:keydelete(Pool, 1, Pools)]}
 
103
        end,
 
104
        ets:insert(ReqsTable, {ConnPid, {MonitorRef, Pool}}),
 
105
        {reply, {ok, NbConnsRet}, State#state{req_pools=Pools2}};
 
106
handle_call({wait, Pool, MaxConns}, From, State=#state{
 
107
                req_pools=Pools, queue=Queue}) ->
 
108
        case lists:keyfind(Pool, 1, Pools) of
 
109
                {Pool, NbConns} when NbConns > MaxConns ->
 
110
                        {noreply, State#state{queue=[From|Queue]}};
 
111
                _Any ->
 
112
                        {reply, ok, State}
 
113
        end;
 
114
handle_call(stop, _From, State) ->
 
115
        {stop, normal, stopped, State};
 
116
handle_call(_Request, _From, State) ->
 
117
        {reply, ignored, State}.
 
118
 
 
119
%% @private
 
120
-spec handle_cast(_, State) -> {noreply, State}.
 
121
handle_cast({move_connection, DestPool, ConnPid}, State=#state{
 
122
                req_pools=Pools, reqs_table=ReqsTable}) ->
 
123
        {MonitorRef, SrcPool} = ets:lookup_element(ReqsTable, ConnPid, 2),
 
124
        ets:insert(ReqsTable, {ConnPid, {MonitorRef, DestPool}}),
 
125
        {SrcPool, SrcNbConns} = lists:keyfind(SrcPool, 1, Pools),
 
126
        DestNbConns = case lists:keyfind(DestPool, 1, Pools) of
 
127
                false -> 1;
 
128
                {DestPool, NbConns} -> NbConns + 1
 
129
        end,
 
130
        Pools2 = lists:keydelete(SrcPool, 1, lists:keydelete(DestPool, 1, Pools)),
 
131
        Pools3 = [{SrcPool, SrcNbConns - 1}, {DestPool, DestNbConns}|Pools2],
 
132
        {noreply, State#state{req_pools=Pools3}};
 
133
handle_cast({remove_connection, ConnPid}, State) ->
 
134
        State2 = remove_pid(ConnPid, State),
 
135
        {noreply, State2};
 
136
handle_cast(_Msg, State) ->
 
137
        {noreply, State}.
 
138
 
 
139
%% @private
 
140
-spec handle_info(_, State) -> {noreply, State}.
 
141
handle_info({'DOWN', _Ref, process, Pid, _Info}, State) ->
 
142
        State2 = remove_pid(Pid, State),
 
143
        {noreply, State2};
 
144
handle_info(_Info, State) ->
 
145
        {noreply, State}.
 
146
 
 
147
%% @private
 
148
-spec terminate(_, _) -> ok.
 
149
terminate(_Reason, _State) ->
 
150
        ok.
 
151
 
 
152
%% @private
 
153
-spec code_change(_, State, _) -> {ok, State}.
 
154
code_change(_OldVsn, State, _Extra) ->
 
155
        {ok, State}.
 
156
 
 
157
%% Internal.
 
158
 
 
159
%% @private
 
160
-spec remove_pid(pid(), State) -> State.
 
161
remove_pid(Pid, State=#state{
 
162
                req_pools=Pools, reqs_table=ReqsTable, queue=Queue}) ->
 
163
        {MonitorRef, Pool} = ets:lookup_element(ReqsTable, Pid, 2),
 
164
        erlang:demonitor(MonitorRef, [flush]),
 
165
        {Pool, NbConns} = lists:keyfind(Pool, 1, Pools),
 
166
        Pools2 = [{Pool, NbConns - 1}|lists:keydelete(Pool, 1, Pools)],
 
167
        ets:delete(ReqsTable, Pid),
 
168
        case Queue of
 
169
                [] ->
 
170
                        State#state{req_pools=Pools2};
 
171
                [Client|Queue2] ->
 
172
                        gen_server:reply(Client, ok),
 
173
                        State#state{req_pools=Pools2, queue=Queue2}
 
174
        end.