1
% Copyright 2008 Konrad-Zuse-Zentrum f�r Informationstechnik Berlin
3
% Licensed under the Apache License, Version 2.0 (the "License");
4
% you may not use this file except in compliance with the License.
5
% You may obtain a copy of the License at
7
% http://www.apache.org/licenses/LICENSE-2.0
9
% Unless required by applicable law or agreed to in writing, software
10
% distributed under the License is distributed on an "AS IS" BASIS,
11
% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
% See the License for the specific language governing permissions and
13
% limitations under the License.
14
%%%-------------------------------------------------------------------
15
%%% File : comm_port.erl
16
%%% Author : Thorsten Schuett <schuett@zib.de>
17
%%% Description : Main CommLayer Interface
18
%%% Maps remote addresses to comm_connection PIDs.
20
%%% Created : 18 Apr 2008 by Thorsten Schuett <schuett@zib.de>
21
%%%-------------------------------------------------------------------
22
%% @author Thorsten Schuett <schuett@zib.de>
23
%% @copyright 2008 Konrad-Zuse-Zentrum f�r Informationstechnik Berlin
25
-module(comm_layer_dir.comm_port).
27
-author('schuett@zib.de').
28
-vsn('$Id: comm_port.erl,v 1.1 2009/11/06 12:41:36 maria Exp $ ').
30
-behaviour(gen_server).
41
-export([start_link/0,
43
unregister_connection/2, register_connection/4,
44
set_local_address/2, get_local_address_port/0]).
46
%% gen_server callbacks
47
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
48
terminate/2, code_change/3]).
50
%%====================================================================
52
%%====================================================================
55
%% @spec send({inet:ip_address(), int(), pid()}, term()) -> ok
57
send({Address, Port, Pid}, Message) ->
58
gen_server:call(?MODULE, {send, Address, Port, Pid, Message}, 20000).
61
send({Address, Port, Pid}, Message) ->
62
case ets:lookup(?MODULE, {Address, Port}) of
63
[{{Address, Port}, {_LPid, Socket}}] ->
64
comm_connection:send({Address, Port, Socket}, Pid, Message),
67
gen_server:call(?MODULE, {send, Address, Port, Pid, Message}, 20000)
73
%% @spec unregister_connection(inet:ip_address(), int()) -> ok
74
unregister_connection(Adress, Port) ->
75
gen_server:call(?MODULE, {unregister_conn, Adress, Port}, 20000).
78
%% @spec register_connection(inet:ip_address(), int(), pid(), gen_tcp:socket()) -> ok | duplicate
79
register_connection(Adress, Port, Pid, Socket) ->
80
gen_server:call(?MODULE, {register_conn, Adress, Port, Pid, Socket}, 20000).
83
%% @spec set_local_address(inet:ip_address(), int()) -> ok
84
set_local_address(Address, Port) ->
85
gen_server:call(?MODULE, {set_local_address, Address, Port}, 20000).
89
%% @spec get_local_address_port() -> {inet:ip_address(),int()}
90
get_local_address_port() ->
91
case ets:lookup(?MODULE, local_address_port) of
92
[{local_address_port, Value}] ->
98
%%--------------------------------------------------------------------
99
%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
100
%% Description: Starts the server
101
%%--------------------------------------------------------------------
103
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
105
%%====================================================================
106
%% gen_server callbacks
107
%%====================================================================
109
%%--------------------------------------------------------------------
110
%% Function: init(Args) -> {ok, State} |
111
%% {ok, State, Timeout} |
114
%% Description: Initiates the server
115
%%--------------------------------------------------------------------
117
ets:new(?MODULE, [set, protected, named_table]),
118
{ok, ok}. % empty state.
120
%%--------------------------------------------------------------------
121
%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
122
%% {reply, Reply, State, Timeout} |
123
%% {noreply, State} |
124
%% {noreply, State, Timeout} |
125
%% {stop, Reason, Reply, State} |
126
%% {stop, Reason, State}
127
%% Description: Handling call messages
128
%%--------------------------------------------------------------------
129
handle_call({send, Address, Port, Pid, Message}, _From, State) ->
130
send(Address, Port, Pid, Message, State);
132
handle_call({unregister_conn, Address, Port}, _From, State) ->
133
ets:delete(?MODULE, {Address, Port}),
136
handle_call({register_conn, Address, Port, Pid, Socket}, _From, State) ->
137
case ets:lookup(?MODULE, {Address, Port}) of
138
[{{Address, Port}, _}] ->
139
{reply, duplicate, State};
141
ets:insert(?MODULE, {{Address, Port}, {Pid, Socket}}),
145
handle_call({set_local_address, Address, Port}, _From, State) ->
146
ets:insert(?MODULE, {local_address_port, {Address,Port}}),
149
%%--------------------------------------------------------------------
150
%% Function: handle_cast(Msg, State) -> {noreply, State} |
151
%% {noreply, State, Timeout} |
152
%% {stop, Reason, State}
153
%% Description: Handling cast messages
154
%%--------------------------------------------------------------------
155
handle_cast(_Msg, State) ->
158
%%--------------------------------------------------------------------
159
%% Function: handle_info(Info, State) -> {noreply, State} |
160
%% {noreply, State, Timeout} |
161
%% {stop, Reason, State}
162
%% Description: Handling all non call/cast messages
163
%%--------------------------------------------------------------------
164
handle_info(_Info, State) ->
167
%%--------------------------------------------------------------------
168
%% Function: terminate(Reason, State) -> void()
169
%% Description: This function is called by a gen_server when it is about to
170
%% terminate. It should be the opposite of Module:init/1 and do any necessary
171
%% cleaning up. When it returns, the gen_server terminates with Reason.
172
%% The return value is ignored.
173
%%--------------------------------------------------------------------
174
terminate(_Reason, _State) ->
177
%%--------------------------------------------------------------------
178
%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
179
%% Description: Convert process state when code is changed
180
%%--------------------------------------------------------------------
181
code_change(_OldVsn, State, _Extra) ->
184
%%--------------------------------------------------------------------
185
%%% Internal functions
186
%%--------------------------------------------------------------------
189
send(Address, Port, Pid, Message, State) ->
190
{DepAddr,DepPort} = get_local_address_port(),
192
DepAddr == undefined ->
193
open_sync_connection(Address, Port, Pid, Message, State);
195
case ets:lookup(?MODULE, {Address, Port}) of
196
[{{Address, Port}, {ConnPid, _Socket}}] ->
197
ConnPid ! {send, Pid, Message},
200
ConnPid = comm_connection:open_new_async(Address, Port,
202
ets:insert(?MODULE, {{Address, Port}, {ConnPid, undef}}),
203
ConnPid ! {send, Pid, Message},
210
send(Address, Port, Pid, Message, State) ->
211
case ets:lookup(?MODULE, {Address, Port}) of
212
[{{Address, Port}, {_LPid, Socket}}] ->
213
comm_connection:send({Address, Port, Socket}, Pid, Message),
216
open_sync_connection(Address, Port, Pid, Message, State)
221
open_sync_connection(Address, Port, Pid, Message, State) ->
222
{DepAddr,DepPort} = get_local_address_port(),
223
case comm_connection:open_new(Address, Port, DepAddr, DepPort) of
224
{local_ip, MyIP, MyPort, MyPid, MySocket} ->
225
comm_connection:send({Address, Port, MySocket}, Pid, Message),
226
log:log(info,"[ CC ] this() == ~w", [{MyIP, MyPort}]),
227
% set_local_address(t, {MyIP,MyPort}}),
228
% register_connection(Address, Port, MyPid, MySocket),
229
ets:insert(?MODULE, {local_address_port, {MyIP,MyPort}}),
230
ets:insert(?MODULE, {{Address, Port}, {MyPid, MySocket}}),
233
% drop message (remote node not reachable, failure detector will notice)
235
{connection, LocalPid, NewSocket} ->
236
comm_connection:send({Address, Port, NewSocket}, Pid, Message),
237
ets:insert(?MODULE, {{Address, Port}, {LocalPid, NewSocket}}),
238
% register_connection(Address, Port, LPid, NewSocket),