1
%% Copyright (c) 2011, Loïc Hoguin <essen@dev-extend.eu>
3
%% Permission to use, copy, modify, and/or distribute this software for any
4
%% purpose with or without fee is hereby granted, provided that the above
5
%% copyright notice and this permission notice appear in all copies.
7
%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
8
%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
9
%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
10
%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
11
%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
12
%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
13
%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
15
%% @doc Public API for managing listeners.
16
-module(cowboy_listener).
17
-behaviour(gen_server).
19
-export([start_link/0, stop/1,
20
add_connection/3, move_connection/3, remove_connection/2, wait/3]). %% API.
21
-export([init/1, handle_call/3, handle_cast/2,
22
handle_info/2, terminate/2, code_change/3]). %% gen_server.
25
req_pools = [] :: [{atom(), non_neg_integer()}],
34
%% We set the process priority to high because cowboy_listener is the central
35
%% gen_server in Cowboy and is used to manage all the incoming connections.
36
%% Setting the process priority to high ensures the connection-related code
37
%% will always be executed when a connection needs it, allowing Cowboy to
38
%% scale far beyond what it would with a normal priority.
39
-spec start_link() -> {ok, pid()}.
41
gen_server:start_link(?MODULE, [], [{spawn_opt, [{priority, high}]}]).
44
-spec stop(pid()) -> stopped.
46
gen_server:call(ServerPid, stop).
48
%% @doc Add a connection to the given pool in the listener.
50
%% Pools of connections are used to restrict the maximum number of connections
51
%% depending on their type. By default, Cowboy add all connections to the
52
%% pool <em>default</em>. It also checks for the maximum number of connections
53
%% in that pool before accepting again.
55
%% When a process managing a connection dies, the process is removed from the
56
%% pool. If the socket has been sent to another process, it is up to the
57
%% protocol code to inform the listener of the new <em>ConnPid</em> by removing
58
%% the previous and adding the new one.
59
-spec add_connection(pid(), atom(), pid()) -> {ok, non_neg_integer()}.
60
add_connection(ServerPid, Pool, ConnPid) ->
61
gen_server:call(ServerPid, {add_connection, Pool, ConnPid}).
63
%% @doc Move a connection from one pool to another.
64
-spec move_connection(pid(), atom(), pid()) -> ok.
65
move_connection(ServerPid, DestPool, ConnPid) ->
66
gen_server:cast(ServerPid, {move_connection, DestPool, ConnPid}).
68
%% @doc Remove the given connection from its pool.
69
-spec remove_connection(pid(), pid()) -> ok.
70
remove_connection(ServerPid, ConnPid) ->
71
gen_server:cast(ServerPid, {remove_connection, ConnPid}).
73
%% @doc Wait until the number of connections in the given pool gets below
74
%% the given threshold.
76
%% This function will not return until the number of connections in the pool
77
%% gets below <em>MaxConns</em>. It makes use of <em>gen_server:reply/2</em>
78
%% to make the process wait for a reply indefinitely.
79
-spec wait(pid(), atom(), non_neg_integer()) -> ok.
80
wait(ServerPid, Pool, MaxConns) ->
81
gen_server:call(ServerPid, {wait, Pool, MaxConns}, infinity).
86
-spec init([]) -> {ok, #state{}}.
88
ReqsTablePid = ets:new(requests_table, [set, private]),
89
{ok, #state{reqs_table=ReqsTablePid}}.
92
-spec handle_call(_, _, State)
93
-> {reply, ignored, State} | {stop, normal, stopped, State}.
94
handle_call({add_connection, Pool, ConnPid}, _From, State=#state{
95
req_pools=Pools, reqs_table=ReqsTable}) ->
96
MonitorRef = erlang:monitor(process, ConnPid),
97
{NbConnsRet, Pools2} = case lists:keyfind(Pool, 1, Pools) of
99
{1, [{Pool, 1}|Pools]};
101
NbConns2 = NbConns + 1,
102
{NbConns2, [{Pool, NbConns2}|lists:keydelete(Pool, 1, Pools)]}
104
ets:insert(ReqsTable, {ConnPid, {MonitorRef, Pool}}),
105
{reply, {ok, NbConnsRet}, State#state{req_pools=Pools2}};
106
handle_call({wait, Pool, MaxConns}, From, State=#state{
107
req_pools=Pools, queue=Queue}) ->
108
case lists:keyfind(Pool, 1, Pools) of
109
{Pool, NbConns} when NbConns > MaxConns ->
110
{noreply, State#state{queue=[From|Queue]}};
114
handle_call(stop, _From, State) ->
115
{stop, normal, stopped, State};
116
handle_call(_Request, _From, State) ->
117
{reply, ignored, State}.
120
-spec handle_cast(_, State) -> {noreply, State}.
121
handle_cast({move_connection, DestPool, ConnPid}, State=#state{
122
req_pools=Pools, reqs_table=ReqsTable}) ->
123
{MonitorRef, SrcPool} = ets:lookup_element(ReqsTable, ConnPid, 2),
124
ets:insert(ReqsTable, {ConnPid, {MonitorRef, DestPool}}),
125
{SrcPool, SrcNbConns} = lists:keyfind(SrcPool, 1, Pools),
126
DestNbConns = case lists:keyfind(DestPool, 1, Pools) of
128
{DestPool, NbConns} -> NbConns + 1
130
Pools2 = lists:keydelete(SrcPool, 1, lists:keydelete(DestPool, 1, Pools)),
131
Pools3 = [{SrcPool, SrcNbConns - 1}, {DestPool, DestNbConns}|Pools2],
132
{noreply, State#state{req_pools=Pools3}};
133
handle_cast({remove_connection, ConnPid}, State) ->
134
State2 = remove_pid(ConnPid, State),
136
handle_cast(_Msg, State) ->
140
-spec handle_info(_, State) -> {noreply, State}.
141
handle_info({'DOWN', _Ref, process, Pid, _Info}, State) ->
142
State2 = remove_pid(Pid, State),
144
handle_info(_Info, State) ->
148
-spec terminate(_, _) -> ok.
149
terminate(_Reason, _State) ->
153
-spec code_change(_, State, _) -> {ok, State}.
154
code_change(_OldVsn, State, _Extra) ->
160
-spec remove_pid(pid(), State) -> State.
161
remove_pid(Pid, State=#state{
162
req_pools=Pools, reqs_table=ReqsTable, queue=Queue}) ->
163
{MonitorRef, Pool} = ets:lookup_element(ReqsTable, Pid, 2),
164
erlang:demonitor(MonitorRef, [flush]),
165
{Pool, NbConns} = lists:keyfind(Pool, 1, Pools),
166
Pools2 = [{Pool, NbConns - 1}|lists:keydelete(Pool, 1, Pools)],
167
ets:delete(ReqsTable, Pid),
170
State#state{req_pools=Pools2};
172
gen_server:reply(Client, ok),
173
State#state{req_pools=Pools2, queue=Queue2}