~ubuntu-branches/ubuntu/precise/jcsp/precise

« back to all changes in this revision

Viewing changes to src/org/jcsp/net2/NetAltingConnectionClient.java

  • Committer: Bazaar Package Importer
  • Author(s): Miguel Landaeta
  • Date: 2010-06-20 18:12:26 UTC
  • Revision ID: james.westby@ubuntu.com-20100620181226-8yg8d9rjjjiuy7oz
Tags: upstream-1.1-rc4
Import upstream version 1.1-rc4

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
package org.jcsp.net2;
 
2
 
 
3
import java.io.IOException;
 
4
 
 
5
import org.jcsp.lang.AltingChannelInput;
 
6
import org.jcsp.lang.AltingConnectionClient;
 
7
import org.jcsp.lang.Any2OneChannel;
 
8
import org.jcsp.lang.Channel;
 
9
import org.jcsp.lang.ChannelOutput;
 
10
import org.jcsp.util.InfiniteBuffer;
 
11
 
 
12
public final class NetAltingConnectionClient
 
13
    extends AltingConnectionClient
 
14
    implements NetConnectionClient
 
15
{
 
16
    private final AltingChannelInput in;
 
17
 
 
18
    private final ChannelOutput toLinkTX;
 
19
 
 
20
    private final Link linkConnectedTo;
 
21
 
 
22
    private final NetConnectionLocation serverLocation;
 
23
 
 
24
    private final NetConnectionLocation localLocation;
 
25
 
 
26
    private final ConnectionData localConnection;
 
27
 
 
28
    private final boolean isLocal;
 
29
 
 
30
    private final NetworkMessageFilter.FilterTx outputFilter;
 
31
 
 
32
    private final NetworkMessageFilter.FilterRx inputFilter;
 
33
 
 
34
    private final ConnectionData data;
 
35
 
 
36
    static NetAltingConnectionClient create(NetConnectionLocation loc, NetworkMessageFilter.FilterTx filterTX,
 
37
            NetworkMessageFilter.FilterRx filterRX)
 
38
        throws JCSPNetworkException
 
39
    {
 
40
        // Create the connection data structure
 
41
        ConnectionData data = new ConnectionData();
 
42
 
 
43
        // Create channel linking this to the Link level. This channel is used to receive response and
 
44
        // acknowledgement messages
 
45
        Any2OneChannel chan = Channel.any2one(new InfiniteBuffer());
 
46
        data.toConnection = chan.out();
 
47
 
 
48
        // Set state of connection
 
49
        data.state = ConnectionDataState.CLIENT_STATE_CLOSED;
 
50
 
 
51
        ConnectionManager.getInstance().create(data);
 
52
 
 
53
        ChannelOutput toLink;
 
54
 
 
55
        if (loc.getNodeID().equals(Node.getInstance().getNodeID()))
 
56
        {
 
57
            toLink = ConnectionManager.getInstance().getConnection(loc.getVConnN()).toConnection;
 
58
            return new NetAltingConnectionClient(chan.in(), toLink, null, data, loc, filterTX, filterRX);
 
59
        }
 
60
 
 
61
        Link link = LinkManager.getInstance().requestLink(loc.getNodeID());
 
62
 
 
63
        if (link == null)
 
64
        {
 
65
            link = LinkFactory.getLink(loc.getNodeID());
 
66
        }
 
67
 
 
68
        toLink = link.getTxChannel();
 
69
 
 
70
        return new NetAltingConnectionClient(chan.in(), toLink, link, data, loc, filterTX, filterRX);
 
71
    }
 
72
 
 
73
    private NetAltingConnectionClient(AltingChannelInput input, ChannelOutput toLink, Link link,
 
74
            ConnectionData connData, NetConnectionLocation loc, NetworkMessageFilter.FilterTx filterTX,
 
75
            NetworkMessageFilter.FilterRx filterRX)
 
76
    {
 
77
        super(input);
 
78
        this.toLinkTX = toLink;
 
79
        this.in = input;
 
80
        this.data = connData;
 
81
        this.serverLocation = loc;
 
82
        this.localLocation = new NetConnectionLocation(Node.getInstance().getNodeID(), connData.vconnn);
 
83
        this.outputFilter = filterTX;
 
84
        this.inputFilter = filterRX;
 
85
 
 
86
        if (link != null)
 
87
        {
 
88
            this.linkConnectedTo = link;
 
89
            // TODO: registration stuff
 
90
 
 
91
            this.isLocal = false;
 
92
            this.localConnection = null;
 
93
        }
 
94
        else
 
95
        {
 
96
            this.isLocal = true;
 
97
            this.localConnection = ConnectionManager.getInstance().getConnection(this.serverLocation.getVConnN());
 
98
            this.linkConnectedTo = null;
 
99
        }
 
100
    }
 
101
 
 
102
    public boolean isOpen()
 
103
        throws IllegalStateException, JCSPNetworkException
 
104
    {
 
105
        if (this.data.state == ConnectionDataState.CLIENT_STATE_MADE_REQ)
 
106
            throw new IllegalStateException("Can only call isOpen after a reply has been received from the server");
 
107
        if (this.data.state == ConnectionDataState.DESTROYED)
 
108
            throw new JCSPNetworkException("Client connection end has been destroyed");
 
109
        if (this.data.state == ConnectionDataState.BROKEN)
 
110
            throw new JCSPNetworkException("Client connection end has broken");
 
111
        return this.data.state == ConnectionDataState.CLIENT_STATE_OPEN;
 
112
    }
 
113
 
 
114
    public Object reply()
 
115
        throws IllegalStateException, JCSPNetworkException
 
116
    {
 
117
        if (this.data.state != ConnectionDataState.CLIENT_STATE_MADE_REQ)
 
118
            throw new IllegalStateException("Can only call reply() after a request()");
 
119
        if (this.data.state == ConnectionDataState.DESTROYED)
 
120
            throw new JCSPNetworkException("Client connection end has been destroyed");
 
121
        if (this.data.state == ConnectionDataState.BROKEN)
 
122
            throw new JCSPNetworkException("Client connection end has broken");
 
123
 
 
124
        while (true)
 
125
        {
 
126
            NetworkMessage msg = (NetworkMessage)this.in.read();
 
127
 
 
128
            try
 
129
            {
 
130
                synchronized (this.data)
 
131
                {
 
132
                    switch (msg.type)
 
133
                    {
 
134
                        case NetworkProtocol.REPLY:
 
135
                        {
 
136
                            Object toReturn = this.inputFilter.filterRX(msg.data);
 
137
                            NetworkMessage ack = new NetworkMessage();
 
138
                            ack.type = NetworkProtocol.REPLY_ACK;
 
139
                            ack.attr1 = msg.attr2;
 
140
                            ack.attr2 = -1;
 
141
                            this.toLinkTX.write(ack);
 
142
                            this.data.state = ConnectionDataState.CLIENT_STATE_OPEN;
 
143
                            return toReturn;
 
144
                        }
 
145
                        case NetworkProtocol.REPLY_AND_CLOSE:
 
146
                        {
 
147
                            Object toReturn = this.inputFilter.filterRX(msg.data);
 
148
                            NetworkMessage ack = new NetworkMessage();
 
149
                            ack.attr1 = msg.attr2;
 
150
                            ack.attr2 = -1;
 
151
                            this.toLinkTX.write(ack);
 
152
                            this.data.state = ConnectionDataState.CLIENT_STATE_CLOSED;
 
153
                            return toReturn;
 
154
                        }
 
155
                        case NetworkProtocol.LINK_LOST:
 
156
                        {
 
157
                            this.data.state = ConnectionDataState.BROKEN;
 
158
                            ConnectionManager.getInstance().removeConnection(this.data);
 
159
                            throw new JCSPNetworkException("Link to server Node was lost.  Reply cannot complete");
 
160
                        }
 
161
                        case NetworkProtocol.REJECT_CONNECTION:
 
162
                        {
 
163
                            this.data.state = ConnectionDataState.BROKEN;
 
164
                            ConnectionManager.getInstance().removeConnection(this.data);
 
165
                            throw new JCSPNetworkException("Server connection rejected previous request");
 
166
                        }
 
167
                        default:
 
168
                        {
 
169
                            this.data.state = ConnectionDataState.BROKEN;
 
170
                            ConnectionManager.getInstance().removeConnection(this.data);
 
171
                            Node.err.log(this.getClass(), "Connection " + this.data.vconnn
 
172
                                                          + " received unexpected message");
 
173
                            throw new JCSPNetworkException("NetAltingConnectionClient received unexpected message");
 
174
                        }
 
175
                    }
 
176
                }
 
177
            }
 
178
            catch (IOException ioe)
 
179
            {
 
180
                throw new JCSPNetworkException("Incoming message was corrupted");
 
181
            }
 
182
        }
 
183
    }
 
184
 
 
185
    public void request(Object obj)
 
186
        throws IllegalStateException, JCSPNetworkException
 
187
    {
 
188
        if (this.data.state == ConnectionDataState.CLIENT_STATE_MADE_REQ)
 
189
            throw new IllegalStateException("Cannot call request(Object) twice without calling reply()");
 
190
        if (this.data.state == ConnectionDataState.DESTROYED)
 
191
            throw new JCSPNetworkException("Client connection end has been destroyed");
 
192
        if (this.data.state == ConnectionDataState.BROKEN)
 
193
            throw new JCSPNetworkException("Client connection end has broken");
 
194
 
 
195
        if (this.in.pending())
 
196
        {
 
197
            NetworkMessage msg = (NetworkMessage)this.in.read();
 
198
 
 
199
            synchronized (this.data)
 
200
            {
 
201
                if (msg.type == NetworkProtocol.LINK_LOST)
 
202
                {
 
203
                    this.data.state = ConnectionDataState.BROKEN;
 
204
                    ConnectionManager.getInstance().removeConnection(this.data);
 
205
                    throw new JCSPNetworkException("Link to server Node lost.  Send cannot complete");
 
206
                }
 
207
                Node.err.log(this.getClass(), "Connection " + this.data.vconnn + " reports unexpected message");
 
208
                throw new JCSPNetworkException("NetAltingConnecionClient received an unexpected message");
 
209
            }
 
210
        }
 
211
 
 
212
        NetworkMessage msg = new NetworkMessage();
 
213
 
 
214
        if (this.data.state == ConnectionDataState.CLIENT_STATE_CLOSED)
 
215
        {
 
216
            msg.type = NetworkProtocol.OPEN;
 
217
        }
 
218
        else
 
219
        {
 
220
            msg.type = NetworkProtocol.REQUEST;
 
221
        }
 
222
 
 
223
        msg.attr1 = this.serverLocation.getVConnN();
 
224
        msg.attr2 = this.data.vconnn;
 
225
 
 
226
        try
 
227
        {
 
228
            msg.data = this.outputFilter.filterTX(obj);
 
229
 
 
230
            synchronized (this.data)
 
231
            {
 
232
                this.data.state = ConnectionDataState.CLIENT_STATE_MADE_REQ;
 
233
            }
 
234
 
 
235
            if (!this.isLocal)
 
236
            {
 
237
                this.toLinkTX.write(msg);
 
238
            }
 
239
            else
 
240
            {
 
241
                synchronized (this.localConnection)
 
242
                {
 
243
                    switch (this.localConnection.state)
 
244
                    {
 
245
                        case ConnectionDataState.SERVER_STATE_CLOSED:
 
246
                        case ConnectionDataState.SERVER_STATE_OPEN:
 
247
                        case ConnectionDataState.SERVER_STATE_RECEIVED:
 
248
                            msg.toLink = this.data.toConnection;
 
249
                            this.localConnection.openServer.write(msg);
 
250
                            break;
 
251
 
 
252
                        default:
 
253
                            this.data.state = ConnectionDataState.BROKEN;
 
254
                            ConnectionManager.getInstance().removeConnection(this.data);
 
255
                            throw new JCSPNetworkException("Connection rejected during request");
 
256
                    }
 
257
                }
 
258
            }
 
259
        }
 
260
        catch (IOException ioe)
 
261
        {
 
262
            throw new JCSPNetworkException("Error when trying to convert the message for sending");
 
263
        }
 
264
 
 
265
        NetworkMessage reply = (NetworkMessage)this.in.read();
 
266
 
 
267
        if (reply.type == NetworkProtocol.REJECT_CONNECTION)
 
268
        {
 
269
            this.data.state = ConnectionDataState.BROKEN;
 
270
            ConnectionManager.getInstance().removeConnection(this.data);
 
271
            throw new JCSPNetworkException("Connection rejected during request");
 
272
        }
 
273
        else if (reply.type == NetworkProtocol.LINK_LOST)
 
274
        {
 
275
            this.data.state = ConnectionDataState.BROKEN;
 
276
            ConnectionManager.getInstance().removeConnection(this.data);
 
277
            throw new JCSPNetworkException("Link to server Node was lost.  Request cannot complete");
 
278
        }
 
279
        else if (reply.type == NetworkProtocol.REQUEST_ACK)
 
280
        {
 
281
            return;
 
282
        }
 
283
        else
 
284
        {
 
285
            Node.err.log(this.getClass(), "Connection " + this.data.vconnn + " reports unexpected message.");
 
286
            throw new JCSPNetworkException("NetAltingConnectionClient received an unexpected message");
 
287
        }
 
288
    }
 
289
 
 
290
    public void destroy()
 
291
    {
 
292
        synchronized (this.data)
 
293
        {
 
294
            this.data.state = ConnectionDataState.DESTROYED;
 
295
            ConnectionManager.getInstance().removeConnection(this.data);
 
296
            // TODO: deregistration from link
 
297
 
 
298
            // Deal with left over messages
 
299
            while (this.in.pending())
 
300
            {
 
301
                NetworkMessage msg = (NetworkMessage)this.in.read();
 
302
                switch (msg.type)
 
303
                {
 
304
                    case NetworkProtocol.REPLY:
 
305
                    case NetworkProtocol.REPLY_AND_CLOSE:
 
306
                    {
 
307
                        NetworkMessage reply = new NetworkMessage();
 
308
                        reply.type = NetworkProtocol.REJECT_CONNECTION;
 
309
                        reply.attr1 = msg.attr2;
 
310
                        reply.attr2 = -1;
 
311
                        this.toLinkTX.write(reply);
 
312
                        break;
 
313
                    }
 
314
 
 
315
                    case NetworkProtocol.REQUEST_ACK:
 
316
                    {
 
317
                        NetworkMessage reply = new NetworkMessage();
 
318
                        reply.type = NetworkProtocol.REJECT_CONNECTION;
 
319
                        reply.attr1 = msg.attr1;
 
320
                        reply.attr2 = -1;
 
321
                        this.toLinkTX.write(reply);
 
322
                        break;
 
323
                    }
 
324
 
 
325
                    default:
 
326
                        break;
 
327
                }
 
328
            }
 
329
        }
 
330
    }
 
331
 
 
332
    public NetLocation getLocation()
 
333
    {
 
334
        return this.serverLocation;
 
335
    }
 
336
 
 
337
    NetConnectionLocation getLocalLocation()
 
338
    {
 
339
        return this.localLocation;
 
340
    }
 
341
 
 
342
    final ConnectionData getConnectionData()
 
343
    {
 
344
        return this.data;
 
345
    }
 
346
}