%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%

-module(rabbit_mqtt_collector).

-behaviour(gen_server).

%% Public API.
-export([start_link/0, register/2, unregister/1]).

%% gen_server callbacks.
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

%% Server state. client_ids is a dict mapping a client id to
%% {Pid, MonitorRef} for the process currently registered under that id.
-record(state, {client_ids}).

%% Name under which the collector process is registered locally.
-define(SERVER, ?MODULE).

%%----------------------------------------------------------------------------

%% @doc Starts the collector as a gen_server linked to the caller and
%% registers it locally under ?SERVER. The callback module is this module
%% and init/1 receives an empty list.
start_link() ->
    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).

%% @doc Asks the collector to record Pid as the process registered under
%% ClientId. The call is synchronous and never times out; the collector's
%% reply is returned unchanged.
register(ClientId, Pid) ->
    Request = {register, ClientId, Pid},
    gen_server:call(rabbit_mqtt_collector, Request, infinity).

%% @doc Asks the collector to forget the registration for ClientId. The
%% call is synchronous and never times out; the collector's reply is
%% returned unchanged.
unregister(ClientId) ->
    Request = {unregister, ClientId},
    gen_server:call(rabbit_mqtt_collector, Request, infinity).

%%----------------------------------------------------------------------------

%% @doc gen_server callback. Starts with an empty client registry.
init([]) ->
    {ok, #state{client_ids = dict:new()}}. % clientid -> {pid, monitor}

%%--------------------------------------------------------------------------

%% gen_server callback for synchronous requests.
%%
%% {register, ClientId, Pid}
%%     Records Pid as the owner of ClientId and monitors it. If a different
%%     process currently owns ClientId, that process is sent `duplicate_id'
%%     and its monitor is dropped before the entry is replaced. If Pid is
%%     already the owner, the existing entry and monitor are kept, so an
%%     entry never carries more than one monitor. Always replies `ok'.
%%
%% {unregister, ClientId}
%%     Drops the entry for ClientId and its monitor. Replies `ok', or
%%     `not_registered' when there is no such entry.
%%
%% Any other request stops the server with reason {unhandled_call, Msg}.
handle_call({register, ClientId, Pid}, _From,
            State = #state{client_ids = Ids}) ->
    Ids1 = case dict:find(ClientId, Ids) of
               {ok, {Pid, _MRef}} ->
                   %% Same process registering again: nothing to change.
                   Ids;
               {ok, {OldPid, MRef}} ->
                   %% Deliberately best effort: a failure of this call (for
                   %% example because the old owner is already gone) must not
                   %% prevent the new registration.
                   catch gen_server2:call(OldPid, duplicate_id),
                   %% flush removes a 'DOWN' for MRef that may already be
                   %% queued, so it is not processed after the replacement.
                   erlang:demonitor(MRef, [flush]),
                   store_monitored(ClientId, Pid, Ids);
               error ->
                   store_monitored(ClientId, Pid, Ids)
           end,
    {reply, ok, State#state{client_ids = Ids1}};

handle_call({unregister, ClientId}, _From, State = #state{client_ids = Ids}) ->
    {Reply, Ids1} = case dict:find(ClientId, Ids) of
                        {ok, {_Pid, MRef}} ->
                            erlang:demonitor(MRef, [flush]),
                            {ok, dict:erase(ClientId, Ids)};
                        error ->
                            {not_registered, Ids}
                    end,
    {reply, Reply, State#state{client_ids = Ids1}};

handle_call(Msg, _From, State) ->
    {stop, {unhandled_call, Msg}, State}.

%% Stores Pid under ClientId together with a fresh monitor on Pid,
%% overwriting any previous entry for ClientId.
store_monitored(ClientId, Pid, Ids) ->
    dict:store(ClientId, {Pid, erlang:monitor(process, Pid)}, Ids).

%% @doc gen_server callback. Every cast stops the server; the stop reason
%% is {unhandled_cast, Msg} and the state is passed through unchanged.
handle_cast(Msg, State) ->
    StopReason = {unhandled_cast, Msg},
    {stop, StopReason, State}.

%% gen_server callback for monitor notifications.
%%
%% On a 'DOWN' message, removes every registry entry whose pid and monitor
%% reference both match the message, logging a warning for each entry
%% removed. Entries that do not match are kept.
handle_info({'DOWN', MRef, process, DownPid, _Reason},
            State = #state{client_ids = Ids}) ->
    %% dict:filter/2 keeps the entries for which the fun returns true, so the
    %% matching (dead) entry answers false and everything else answers true.
    Ids1 = dict:filter(fun (ClientId, {Pid, M})
                             when Pid =:= DownPid, MRef =:= M ->
                               %% NOTE(review): the format string takes one
                               %% argument; ClientId is assumed - confirm.
                               rabbit_log:warning("MQTT disconnect from ~p~n",
                                                  [ClientId]),
                               false;
                           (_, _) ->
                               true
                       end, Ids),
    {noreply, State #state{ client_ids = Ids1 }}.

%% @doc gen_server callback. Performs no cleanup; both arguments are ignored.
terminate(_Reason, _State) ->
    ok.

code_change(_OldVsn, State, _Extra) ->