~ubuntu-branches/ubuntu/saucy/rabbitmq-server/saucy

« back to all changes in this revision

Viewing changes to plugins-src/rabbitmq-mqtt/src/rabbit_mqtt_collector.erl

  • Committer: Package Import Robot
  • Author(s): Emile Joubert
  • Date: 2012-11-19 11:42:31 UTC
  • mfrom: (0.2.18) (0.1.32 sid)
  • Revision ID: package-import@ubuntu.com-20121119114231-hvapkn4akng09etr
Tags: 3.0.0-1
New upstream release

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
%% The contents of this file are subject to the Mozilla Public License
 
2
%% Version 1.1 (the "License"); you may not use this file except in
 
3
%% compliance with the License. You may obtain a copy of the License
 
4
%% at http://www.mozilla.org/MPL/
 
5
%%
 
6
%% Software distributed under the License is distributed on an "AS IS"
 
7
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
 
8
%% the License for the specific language governing rights and
 
9
%% limitations under the License.
 
10
%%
 
11
%% The Original Code is RabbitMQ.
 
12
%%
 
13
%% The Initial Developer of the Original Code is VMware, Inc.
 
14
%% Copyright (c) 2007-2012 VMware, Inc.  All rights reserved.
 
15
%%
 
16
 
 
17
-module(rabbit_mqtt_collector).
 
18
 
 
19
-behaviour(gen_server).
 
20
 
 
21
-export([start_link/0, register/2, unregister/1]).
 
22
 
 
23
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
 
24
         terminate/2, code_change/3]).
 
25
 
 
26
-record(state, {client_ids}).
 
27
 
 
28
-define(SERVER, ?MODULE).
 
29
 
 
30
%%----------------------------------------------------------------------------
 
31
 
 
32
start_link() ->
 
33
    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
 
34
 
 
35
register(ClientId, Pid) ->
 
36
    gen_server:call(rabbit_mqtt_collector, {register, ClientId, Pid}, infinity).
 
37
 
 
38
unregister(ClientId) ->
 
39
    gen_server:call(rabbit_mqtt_collector, {unregister, ClientId}, infinity).
 
40
 
 
41
%%----------------------------------------------------------------------------
 
42
 
 
43
init([]) ->
 
44
    {ok, #state{client_ids = dict:new()}}. % clientid -> {pid, monitor}
 
45
 
 
46
%%--------------------------------------------------------------------------
 
47
 
 
48
handle_call({register, ClientId, Pid}, _From,
 
49
            State = #state{client_ids = Ids}) ->
 
50
    Ids1 = case dict:find(ClientId, Ids) of
 
51
               {ok, {OldPid, MRef}} when Pid =/= OldPid ->
 
52
                   catch gen_server2:call(OldPid, duplicate_id),
 
53
                   erlang:demonitor(MRef),
 
54
                   dict:erase(ClientId, Ids);
 
55
               error ->
 
56
                   Ids
 
57
           end,
 
58
    Ids2 = dict:store(ClientId, {Pid, erlang:monitor(process, Pid)}, Ids1),
 
59
    {reply, ok, State#state{client_ids = Ids2}};
 
60
 
 
61
handle_call({unregister, ClientId}, _From, State = #state{client_ids = Ids}) ->
 
62
    {Reply, Ids1} = case dict:find(ClientId, Ids) of
 
63
                        {ok, {_Pid, MRef}} -> erlang:demonitor(MRef),
 
64
                                              {ok, dict:erase(ClientId, Ids)};
 
65
                        error              -> {not_registered, Ids}
 
66
                    end,
 
67
    {reply, Reply, State#state{ client_ids = Ids1 }};
 
68
 
 
69
handle_call(Msg, _From, State) ->
 
70
    {stop, {unhandled_call, Msg}, State}.
 
71
 
 
72
handle_cast(Msg, State) ->
 
73
    {stop, {unhandled_cast, Msg}, State}.
 
74
 
 
75
handle_info({'DOWN', MRef, process, DownPid, _Reason},
 
76
            State = #state{client_ids = Ids}) ->
 
77
    Ids1 = dict:filter(fun (ClientId, {Pid, M})
 
78
                           when Pid =:= DownPid, MRef =:= M ->
 
79
                               rabbit_log:warning("MQTT disconnect from ~p~n",
 
80
                                                  [ClientId]),
 
81
                               false;
 
82
                           (_, _) ->
 
83
                               true
 
84
                       end, Ids),
 
85
    {noreply, State #state{ client_ids = Ids1 }}.
 
86
 
 
87
terminate(_Reason, _State) ->
 
88
    ok.
 
89
 
 
90
code_change(_OldVsn, State, _Extra) ->
 
91
    {ok, State}.