3
import java.io.IOException;
5
import org.jcsp.lang.AltingChannelInput;
6
import org.jcsp.lang.AltingConnectionClient;
7
import org.jcsp.lang.Any2OneChannel;
8
import org.jcsp.lang.Channel;
9
import org.jcsp.lang.ChannelOutput;
10
import org.jcsp.util.InfiniteBuffer;
12
public final class NetAltingConnectionClient
13
extends AltingConnectionClient
14
implements NetConnectionClient
16
private final AltingChannelInput in;
18
private final ChannelOutput toLinkTX;
20
private final Link linkConnectedTo;
22
private final NetConnectionLocation serverLocation;
24
private final NetConnectionLocation localLocation;
26
private final ConnectionData localConnection;
28
private final boolean isLocal;
30
private final NetworkMessageFilter.FilterTx outputFilter;
32
private final NetworkMessageFilter.FilterRx inputFilter;
34
private final ConnectionData data;
36
static NetAltingConnectionClient create(NetConnectionLocation loc, NetworkMessageFilter.FilterTx filterTX,
37
NetworkMessageFilter.FilterRx filterRX)
38
throws JCSPNetworkException
40
// Create the connection data structure
41
ConnectionData data = new ConnectionData();
43
// Create channel linking this to the Link level. This channel is used to receive response and
44
// acknowledgement messages
45
Any2OneChannel chan = Channel.any2one(new InfiniteBuffer());
46
data.toConnection = chan.out();
48
// Set state of connection
49
data.state = ConnectionDataState.CLIENT_STATE_CLOSED;
51
ConnectionManager.getInstance().create(data);
55
if (loc.getNodeID().equals(Node.getInstance().getNodeID()))
57
toLink = ConnectionManager.getInstance().getConnection(loc.getVConnN()).toConnection;
58
return new NetAltingConnectionClient(chan.in(), toLink, null, data, loc, filterTX, filterRX);
61
Link link = LinkManager.getInstance().requestLink(loc.getNodeID());
65
link = LinkFactory.getLink(loc.getNodeID());
68
toLink = link.getTxChannel();
70
return new NetAltingConnectionClient(chan.in(), toLink, link, data, loc, filterTX, filterRX);
73
private NetAltingConnectionClient(AltingChannelInput input, ChannelOutput toLink, Link link,
74
ConnectionData connData, NetConnectionLocation loc, NetworkMessageFilter.FilterTx filterTX,
75
NetworkMessageFilter.FilterRx filterRX)
78
this.toLinkTX = toLink;
81
this.serverLocation = loc;
82
this.localLocation = new NetConnectionLocation(Node.getInstance().getNodeID(), connData.vconnn);
83
this.outputFilter = filterTX;
84
this.inputFilter = filterRX;
88
this.linkConnectedTo = link;
89
// TODO: registration stuff
92
this.localConnection = null;
97
this.localConnection = ConnectionManager.getInstance().getConnection(this.serverLocation.getVConnN());
98
this.linkConnectedTo = null;
102
public boolean isOpen()
103
throws IllegalStateException, JCSPNetworkException
105
if (this.data.state == ConnectionDataState.CLIENT_STATE_MADE_REQ)
106
throw new IllegalStateException("Can only call isOpen after a reply has been received from the server");
107
if (this.data.state == ConnectionDataState.DESTROYED)
108
throw new JCSPNetworkException("Client connection end has been destroyed");
109
if (this.data.state == ConnectionDataState.BROKEN)
110
throw new JCSPNetworkException("Client connection end has broken");
111
return this.data.state == ConnectionDataState.CLIENT_STATE_OPEN;
114
public Object reply()
115
throws IllegalStateException, JCSPNetworkException
117
if (this.data.state != ConnectionDataState.CLIENT_STATE_MADE_REQ)
118
throw new IllegalStateException("Can only call reply() after a request()");
119
if (this.data.state == ConnectionDataState.DESTROYED)
120
throw new JCSPNetworkException("Client connection end has been destroyed");
121
if (this.data.state == ConnectionDataState.BROKEN)
122
throw new JCSPNetworkException("Client connection end has broken");
126
NetworkMessage msg = (NetworkMessage)this.in.read();
130
synchronized (this.data)
134
case NetworkProtocol.REPLY:
136
Object toReturn = this.inputFilter.filterRX(msg.data);
137
NetworkMessage ack = new NetworkMessage();
138
ack.type = NetworkProtocol.REPLY_ACK;
139
ack.attr1 = msg.attr2;
141
this.toLinkTX.write(ack);
142
this.data.state = ConnectionDataState.CLIENT_STATE_OPEN;
145
case NetworkProtocol.REPLY_AND_CLOSE:
147
Object toReturn = this.inputFilter.filterRX(msg.data);
148
NetworkMessage ack = new NetworkMessage();
149
ack.attr1 = msg.attr2;
151
this.toLinkTX.write(ack);
152
this.data.state = ConnectionDataState.CLIENT_STATE_CLOSED;
155
case NetworkProtocol.LINK_LOST:
157
this.data.state = ConnectionDataState.BROKEN;
158
ConnectionManager.getInstance().removeConnection(this.data);
159
throw new JCSPNetworkException("Link to server Node was lost. Reply cannot complete");
161
case NetworkProtocol.REJECT_CONNECTION:
163
this.data.state = ConnectionDataState.BROKEN;
164
ConnectionManager.getInstance().removeConnection(this.data);
165
throw new JCSPNetworkException("Server connection rejected previous request");
169
this.data.state = ConnectionDataState.BROKEN;
170
ConnectionManager.getInstance().removeConnection(this.data);
171
Node.err.log(this.getClass(), "Connection " + this.data.vconnn
172
+ " received unexpected message");
173
throw new JCSPNetworkException("NetAltingConnectionClient received unexpected message");
178
catch (IOException ioe)
180
throw new JCSPNetworkException("Incoming message was corrupted");
185
public void request(Object obj)
186
throws IllegalStateException, JCSPNetworkException
188
if (this.data.state == ConnectionDataState.CLIENT_STATE_MADE_REQ)
189
throw new IllegalStateException("Cannot call request(Object) twice without calling reply()");
190
if (this.data.state == ConnectionDataState.DESTROYED)
191
throw new JCSPNetworkException("Client connection end has been destroyed");
192
if (this.data.state == ConnectionDataState.BROKEN)
193
throw new JCSPNetworkException("Client connection end has broken");
195
if (this.in.pending())
197
NetworkMessage msg = (NetworkMessage)this.in.read();
199
synchronized (this.data)
201
if (msg.type == NetworkProtocol.LINK_LOST)
203
this.data.state = ConnectionDataState.BROKEN;
204
ConnectionManager.getInstance().removeConnection(this.data);
205
throw new JCSPNetworkException("Link to server Node lost. Send cannot complete");
207
Node.err.log(this.getClass(), "Connection " + this.data.vconnn + " reports unexpected message");
208
throw new JCSPNetworkException("NetAltingConnecionClient received an unexpected message");
212
NetworkMessage msg = new NetworkMessage();
214
if (this.data.state == ConnectionDataState.CLIENT_STATE_CLOSED)
216
msg.type = NetworkProtocol.OPEN;
220
msg.type = NetworkProtocol.REQUEST;
223
msg.attr1 = this.serverLocation.getVConnN();
224
msg.attr2 = this.data.vconnn;
228
msg.data = this.outputFilter.filterTX(obj);
230
synchronized (this.data)
232
this.data.state = ConnectionDataState.CLIENT_STATE_MADE_REQ;
237
this.toLinkTX.write(msg);
241
synchronized (this.localConnection)
243
switch (this.localConnection.state)
245
case ConnectionDataState.SERVER_STATE_CLOSED:
246
case ConnectionDataState.SERVER_STATE_OPEN:
247
case ConnectionDataState.SERVER_STATE_RECEIVED:
248
msg.toLink = this.data.toConnection;
249
this.localConnection.openServer.write(msg);
253
this.data.state = ConnectionDataState.BROKEN;
254
ConnectionManager.getInstance().removeConnection(this.data);
255
throw new JCSPNetworkException("Connection rejected during request");
260
catch (IOException ioe)
262
throw new JCSPNetworkException("Error when trying to convert the message for sending");
265
NetworkMessage reply = (NetworkMessage)this.in.read();
267
if (reply.type == NetworkProtocol.REJECT_CONNECTION)
269
this.data.state = ConnectionDataState.BROKEN;
270
ConnectionManager.getInstance().removeConnection(this.data);
271
throw new JCSPNetworkException("Connection rejected during request");
273
else if (reply.type == NetworkProtocol.LINK_LOST)
275
this.data.state = ConnectionDataState.BROKEN;
276
ConnectionManager.getInstance().removeConnection(this.data);
277
throw new JCSPNetworkException("Link to server Node was lost. Request cannot complete");
279
else if (reply.type == NetworkProtocol.REQUEST_ACK)
285
Node.err.log(this.getClass(), "Connection " + this.data.vconnn + " reports unexpected message.");
286
throw new JCSPNetworkException("NetAltingConnectionClient received an unexpected message");
290
public void destroy()
292
synchronized (this.data)
294
this.data.state = ConnectionDataState.DESTROYED;
295
ConnectionManager.getInstance().removeConnection(this.data);
296
// TODO: deregistration from link
298
// Deal with left over messages
299
while (this.in.pending())
301
NetworkMessage msg = (NetworkMessage)this.in.read();
304
case NetworkProtocol.REPLY:
305
case NetworkProtocol.REPLY_AND_CLOSE:
307
NetworkMessage reply = new NetworkMessage();
308
reply.type = NetworkProtocol.REJECT_CONNECTION;
309
reply.attr1 = msg.attr2;
311
this.toLinkTX.write(reply);
315
case NetworkProtocol.REQUEST_ACK:
317
NetworkMessage reply = new NetworkMessage();
318
reply.type = NetworkProtocol.REJECT_CONNECTION;
319
reply.attr1 = msg.attr1;
321
this.toLinkTX.write(reply);
332
public NetLocation getLocation()
334
return this.serverLocation;
337
NetConnectionLocation getLocalLocation()
339
return this.localLocation;
342
final ConnectionData getConnectionData()