2
* JBoss, Home of Professional Open Source
3
* Copyright 2005, JBoss Inc., and individual contributors as indicated
4
* by the @authors tag. See the copyright.txt in the distribution for a
5
* full listing of individual contributors.
7
* This is free software; you can redistribute it and/or modify it
8
* under the terms of the GNU Lesser General Public License as
9
* published by the Free Software Foundation; either version 2.1 of
10
* the License, or (at your option) any later version.
12
* This software is distributed in the hope that it will be useful,
13
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15
* Lesser General Public License for more details.
17
* You should have received a copy of the GNU Lesser General Public
18
* License along with this software; if not, write to the Free
19
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
23
package org.jboss.remoting.transport.multiplex;
25
import java.io.EOFException;
26
import java.io.IOException;
27
import java.net.InetAddress;
28
import java.net.InetSocketAddress;
29
import java.net.ServerSocket;
30
import java.net.Socket;
31
import java.net.SocketException;
32
import java.net.SocketTimeoutException;
33
import java.util.ArrayList;
34
import java.util.Collection;
35
import java.util.Collections;
36
import java.util.HashMap;
37
import java.util.HashSet;
38
import java.util.Iterator;
39
import java.util.List;
43
import javax.net.ServerSocketFactory;
44
import javax.net.ssl.HandshakeCompletedEvent;
45
import javax.net.ssl.HandshakeCompletedListener;
47
import org.jboss.logging.Logger;
48
import org.jboss.remoting.Client;
49
import org.jboss.remoting.InvokerLocator;
50
import org.jboss.remoting.InvokerRegistry;
51
import org.jboss.remoting.ServerInvocationHandler;
52
import org.jboss.remoting.ServerInvoker;
53
import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
54
import org.jboss.remoting.marshal.serializable.SerializableMarshaller;
55
import org.jboss.remoting.transport.PortUtil;
56
import org.jboss.remoting.transport.multiplex.utility.AddressPair;
57
import org.jboss.remoting.transport.socket.SocketServerInvoker;
58
import org.jboss.remoting.util.socket.HandshakeRepeater;
62
* <code>MultiplexServerInvoker</code> is the server side of the Multiplex transport.
63
* For more information, see Remoting documentation on labs.jboss.org.
65
* @author <a href="mailto:tom.elrod@jboss.com">Tom Elrod</a>
66
* @author <a href="mailto:r.sigal@computer.org">Ron Sigal</a>
68
* @deprecated As of release 2.4.0 the multiplex transport will no longer be actively supported.
70
public class MultiplexServerInvoker extends SocketServerInvoker
71
implements Runnable, VirtualSocket.DisconnectListener
73
protected static final Logger log = Logger.getLogger(MultiplexServerInvoker.class);
74
private static boolean trace = log.isTraceEnabled();
76
private static Map socketGroupMap = new HashMap();
77
private static Map addressPairMap = new HashMap();
78
private static HandshakeCompletedEvent handshakeCompletedEvent;
80
private boolean isVirtual = false;
81
private Map virtualServerInvokers;
82
private Socket connectPrimingSocket;
83
private SocketGroupInfo socketGroupInfo;
84
private AddressPair addressPair;
85
private String bindHost;
87
private int originalBindPort;
88
private InetAddress bindAddress;
89
private InetSocketAddress connectSocketAddress;
90
private boolean readyToStart = true;
91
private boolean needsSocketGroupConfiguration = true;
92
private boolean cleanedUp;
93
private boolean hasMaster;
94
private int errorCount;
96
private ServerSocket serverSocket;
98
/////////////////////////////////////////////////////////////////////////////////////
99
// configurable Multiplex parameters //
100
/////////////////////////////////////////////////////////////////////////////////////
102
* The following parameters may be set in any of four ways:
104
* 1. They may be appended to an <code>InvokerLocator</code> passed to a
105
* <code>Connector</code> constructor.
106
* 2. They may be included in a configuration <code>Map</code> passed to a
107
* <code>Connector</code> constructor.
108
* 3. They may be described in a <config> XML element passed to
109
* <code>Connector.setConfiguration()</code>.
110
* 4. In some cases, a <code>MultiplexServerInvoker</code> setter methods may be invoked.
112
* Of those of the following parameters destined for <code>MultiplexServerInvoker</code>,
113
* there are two categories.
115
* 1. <code>serverMultiplexId</code>, <code>multiplexConnectHost</code>, and
116
* <code>multiplexConnectPort</code> are used to
117
* match up <code>MultiplexClientInvoker</code>s and virtual
118
* <code>MultiplexServerInvokers</code> so that
119
* they share an underlying socket connection. Depending on the way in which
120
* the information is provided (see Remoting documentation), the connection may
121
* be created any time during or after the call to <code>Connector.create()</code>. Note,
122
* however, that if a callback <code>MultiplexServerInvoker</code> is created with just a
123
* <code>serverMultiplexId</code> parameter (server rule 3 in the Remoting documentation),
124
* then calling <code>setMultiplexConnectHost()</code> and
125
* <code>setMultiplexConnectPort()</code> will
126
* not trigger the creation of a connection. Moreover, when a <code>Client</code> comes
127
* along, the connect information supplied by the <code>Client</code> will be used, so there
128
* is no point to having setter methods for these parameters.
130
* 2. <code>maxAcceptErrors</code> is used in the
131
* <code>MultiplexServerInvoker</code> <code>accept()</code> loop, and it
132
* may be changed at any time by calling <code>setMaxAcceptErrors()</code>.
134
* Those of the following parameters which are destined for the <code>MultiplexingManager</code>,
135
* <code>InputMultiplexor</code>, and <code>OutputMultiplexor</code> classes are
136
* passed to them by way of a configuration <code>Map</code> passed to
137
* <code>VirtualSocket</code> and <code>VirtualServerSocket</code> constructors.
139
* A <code>VirtualServerSocket</code> is created when a server side master
140
* <code>MultiplexServerInvoker</code>
141
* accepts a connection request generated by the creation of a priming socket on a client.
142
* Since this can happen any time after <code>Connector.start()</code> is created, the values of
143
* these parameters can be changed by calling their respective setter methods any
144
* time before <code>Connector.start()</code> is called.
146
* A <code>VirtualSocket</code> is created when a client side
147
* <code>MultiplexClientInvoker</code> or callback
148
* <code>MultiplexServerInvoker</code> opens a priming socket, and this happens when
149
* <code>Connector.create()</code> is called. Therefore, the values of these parameters can be
150
* changed by calling their respective setter methods any time before
151
* <code>Connector.create()</code> is called.
153
// MultiplexingManager:
154
private int staticThreadsMonitorPeriod;
155
private int shutdownRequestTimeout;
156
private int shutdownRefusalsMaximum;
157
private int shutdownMonitorPeriod;
160
private int inputBufferSize;
161
private int inputMaxErrors;
163
// OutputMultiplexor:
164
private int outputMessagePoolSize;
165
private int outputMessageSize;
166
private int outputMaxChunkSize;
167
private int outputMaxTimeSlice;
168
private int outputMaxDataSlice;
170
// MultiplexServerInvoker
171
private int maxAcceptErrors;
172
private String serverMultiplexId;
173
private String multiplexConnectHost;
174
private int multiplexConnectPort;
175
private boolean multiplexConnectPortIsSet; // to check for missing configuration information
177
public static Map getAddressPairMap()
179
return addressPairMap;
182
public static Map getSocketGroupMap()
184
return socketGroupMap;
190
* Create a new <code>MultiplexServerInvoker</code>.
194
public MultiplexServerInvoker(InvokerLocator locator)
197
// virtualServerInvokers = Collections.synchronizedMap(new HashMap());
198
virtualServerInvokers = new HashMap();
203
* Create a new <code>MultiplexServerInvoker</code>.
205
public MultiplexServerInvoker(InvokerLocator locator, Map configuration)
207
super(locator, configuration);
208
// virtualServerInvokers = Collections.synchronizedMap(new HashMap());
209
virtualServerInvokers = new HashMap();
214
* Create a new <code>MultiplexServerInvoker</code>.
216
protected MultiplexServerInvoker(InvokerLocator locator, Map configuration,
217
List serverSockets, Socket socket,
218
Map virtualServerInvokers)
220
super(locator, configuration);
221
this.serverSockets = serverSockets;
222
serverSocket = (ServerSocket) serverSockets.get(0);
223
connectPrimingSocket = socket;
224
this.virtualServerInvokers = virtualServerInvokers;
226
needsSocketGroupConfiguration = false;
227
((VirtualSocket) connectPrimingSocket).addDisconnectListener(this);
241
* Each implementation of the remote client invoker should have
242
* a default data type that is uses in the case it is not specified
243
* in the invoker locator uri.
245
protected String getDefaultDataType()
247
return SerializableMarshaller.DATATYPE;
250
//TODO: -TME Need to check on synchronization after initial hook up
251
public void start() throws IOException
262
log.trace("Started execution of method run");
264
ServerSocketRefresh thread=new ServerSocketRefresh();
265
thread.setDaemon(true);
276
log.trace("Socket is going to be accepted");
278
thread.release(); //goes on if serversocket refresh is completed
279
Socket socket = serverSocket.accept();
282
log.trace("Accepted: " + socket);
284
processInvocation(socket);
286
catch (SocketException e)
288
if ("Socket is closed".equals(e.getMessage())
289
|| "Socket closed".equals(e.getMessage()))
291
log.info("socket is closed: stopping thread");
292
// If this invoker was started by a Connector, let the Connector stop it.
297
else if (++errorCount > maxAcceptErrors)
299
log.error("maximum accept errors exceeded: stopping thread");
300
// If this invoker was started by a Connector, let the Connector stop it.
310
catch (SocketTimeoutException e)
314
// If remote MultiplexClientInvoker and optional callback MultiplexServerInvoker
315
// have shutdown, it's safe to stop.
316
if (connectPrimingSocket != null && ((VirtualSocket)connectPrimingSocket).hasReceivedDisconnectMessage())
318
log.info("Client has closed: stopping thread");
319
// If this invoker was started by a Connector, let the Connector stop it.
326
catch (javax.net.ssl.SSLHandshakeException e)
328
log.info("SSLHandshakeException", e);
334
log.error("Failed to accept socket connection", ex);
335
if (++errorCount > maxAcceptErrors)
337
log.error("maximum accept errors exceeded: stopping");
338
// If this invoker was started by a Connector, let the Connector stop it.
358
public boolean isSafeToShutdown()
360
return (connectPrimingSocket == null || ((VirtualSocket) connectPrimingSocket).hasReceivedDisconnectMessage());
364
public void notifyDisconnected(VirtualSocket virtualSocket)
366
if (virtualSocket != connectPrimingSocket)
368
log.error("notified about disconnection of unrecognized virtual socket");
372
log.debug("remote peer socket has closed: stopping");
379
// If running == false, super.stop() will not call cleanup().
380
// However, MultiplexServerInvoker could have stuff to clean up
381
// (socket group information) even if it didn't start.
389
public String toString()
393
VirtualServerSocket vss = (VirtualServerSocket) serverSocket;
395
return "MultiplexServerInvoker[virtual:"
396
+ vss.getInetAddress() + ":" + vss.getLocalPort()
398
+ vss.getRemoteAddress() + ":" + vss.getRemotePort()
401
return "MultiplexServerInvoker[virtual]";
404
if (serverSocket != null)
405
return "MultiplexServerInvoker[master:"
406
+ serverSocket.getInetAddress() + ":" + serverSocket.getLocalPort()
409
return "MultiplexServerInvoker[master]";
413
protected void setup() throws Exception
415
originalBindPort = this.getLocator().getPort();
420
// socketFactory = createSocketFactory(configuration);
421
// if (socketFactory != null)
422
// configuration.put(Multiplex.SOCKET_FACTORY, socketFactory);
424
if (!configuration.isEmpty())
426
if (needsSocketGroupConfiguration)
430
configureSocketGroupParameters(configuration);
432
catch (IOException e)
434
log.error("error configuring socket group parameters", e);
444
* Finishes start up process when suitable bind and connect information is available.
445
* For more information, see the Multiplex subsystem documentation at labs.jboss.org.
447
protected void finishStart() throws IOException
449
log.debug("entering finishStart()");
454
if (socketGroupInfo != null && connectSocketAddress == null)
456
InetAddress connectAddress = socketGroupInfo.getConnectAddress();
457
int connectPort = socketGroupInfo.getConnectPort();
458
connectSocketAddress = new InetSocketAddress(connectAddress, connectPort);
461
if (socketGroupInfo != null && addressPair == null)
463
String connectHost = socketGroupInfo.getConnectAddress().getHostName();
464
int connectPort = socketGroupInfo.getConnectPort();
465
addressPair = new AddressPair(connectHost, connectPort, bindHost, bindPort);
474
log.error("Error starting MultiplexServerInvoker.", e);
479
log.debug("MultiplexServerInvoker started.");
485
* Called by MultiplexClientInvoker.createSocket() when it finds connection is
486
* broken and binds virtual socket group to new bind port.
490
protected void resetLocator(int bindPort)
492
this.bindPort = bindPort;
493
InvokerLocator newLocator = new InvokerLocator(locator.getProtocol(),
497
locator.getParameters());
499
InvokerRegistry.updateServerInvokerLocator(locator, newLocator);
500
locator = newLocator;
504
protected void configureSocketGroupParameters(Map parameters) throws IOException
506
log.debug("entering configureSocketGroupParameters()");
509
synchronized (SocketGroupInfo.class)
511
if (serverMultiplexId != null)
513
socketGroupInfo = (SocketGroupInfo) getSocketGroupMap().get(serverMultiplexId);
514
if (socketGroupInfo != null)
521
if (multiplexConnectHost != null && !this.multiplexConnectPortIsSet)
522
throw new IOException("multiplexConnectHost != null and multiplexConnectPort is not set");
524
if (multiplexConnectHost == null && this.multiplexConnectPortIsSet)
525
throw new IOException("multiplexConnectHost == null and multiplexConnectPort is set");
528
if (multiplexConnectHost != null)
530
rule2(multiplexConnectHost, multiplexConnectPort);
535
if (serverMultiplexId != null)
547
protected static void createPrimingSocket(SocketGroupInfo socketGroupInfo,
548
String connectHost, int connectPort,
549
Map configuration, int timeout)
552
createPrimingSocket(socketGroupInfo, connectHost, connectPort, null, -1, configuration, timeout);
556
protected static void createPrimingSocket(SocketGroupInfo socketGroupInfo,
557
String connectHost, int connectPort,
558
InetAddress bindAddress, int bindPort,
559
Map configuration, int timeout)
562
log.debug("entering createPrimingSocket()");
564
boolean needed = true;
565
InetSocketAddress csa = new InetSocketAddress(connectHost, connectPort);
566
InetSocketAddress bsa = null;
568
if (bindAddress != null)
570
bsa = new InetSocketAddress(bindAddress, bindPort);
571
needed = !MultiplexingManager.checkForShareableManagerByAddressPair(bsa, csa);
575
needed = !MultiplexingManager.checkForShareableManager(csa);
578
if (socketGroupInfo != null)
579
socketGroupInfo.setPrimingSocketNeeded(needed);
583
log.debug("priming socket is not necessary");
587
// If the configuration Map has an SSL HandshakeCompletedListener, we register to
588
// receive the HandshakeCompletedEvent with a HandshakeRepeater and, if the event
589
// arrives within 60 seconds, we pass it on to the configured listener. Otherwise,
590
// HandshakeRepeater.waitForHandshake() will throw an SSLException.
591
Object obj = configuration.get(Client.HANDSHAKE_COMPLETED_LISTENER);
592
HandshakeCompletedListener externalListener = null;
593
HandshakeRepeater internalListener = null;
594
if (obj != null && obj instanceof HandshakeCompletedListener)
596
externalListener = (HandshakeCompletedListener) obj;
597
internalListener = new HandshakeRepeater(new InternalHandshakeListener());
598
configuration.put(Multiplex.SSL_HANDSHAKE_LISTENER, internalListener);
601
VirtualSocket socket = new VirtualSocket(configuration);
603
if (bindAddress != null)
604
socket.connect(csa, bsa, timeout);
606
socket.connect(csa, timeout);
608
MultiplexingManager manager = socket.getManager();
610
if (externalListener != null)
612
if (manager.getHandshakeCompletedEvent() != null)
614
externalListener.handshakeCompleted(manager.getHandshakeCompletedEvent());
618
internalListener.waitForHandshake();
619
externalListener.handshakeCompleted(handshakeCompletedEvent);
623
if (!manager.waitForRemoteServerSocketRegistered())
624
throw new IOException("error waiting for remote server socket to be registered");
626
if (socketGroupInfo != null)
627
socketGroupInfo.setPrimingSocket(socket);
629
log.debug("created priming socket: " + socket.getLocalSocketId());
633
protected String getThreadName(int i)
635
String virtualTag = isVirtual ? "v" : "m";
636
return "MultiplexServerInvoker#" + i + virtualTag + "-" + serverSocket.toString();
640
protected void processInvocation(Socket socket) throws Exception
643
super.processInvocation(socket);
646
log.debug("creating VSS");
647
ServerSocket ss = new VirtualServerSocket((VirtualSocket) socket, configuration);
648
ss.setSoTimeout(getTimeout());
649
List serverSockets = new ArrayList();
650
serverSockets.add(ss);
651
MultiplexServerInvoker si = new MultiplexServerInvoker(locator, configuration, serverSockets, socket, virtualServerInvokers);
653
si.clientCallbackListener = clientCallbackListener;
654
si.handlers = handlers;
655
si.setMBeanServer(this.getMBeanServer());
656
si.setServerSocketFactory(this.getServerSocketFactory());
657
si.setSocketFactory(this.socketFactory);
658
synchronized (virtualServerInvokers)
660
virtualServerInvokers.put(socket.getRemoteSocketAddress(), si);
662
si.connectionNotifier = connectionNotifier;
665
log.debug("created virtual MultiplexServerInvoker: " + si);
670
protected void cleanup()
672
// If running == false, SocketServerInvoker doesn't want to call cleanup().
678
// If the Finalizer thread gets here after clean up has occurred, return.
686
if (connectPrimingSocket != null)
688
log.debug("connect priming != null");
689
// If !virtualServerInvokers.containsKey(connectPrimingSocket.getRemoteSocketAddress()),
690
// the master MultiplexServerInvoker might be iterating through virtualServerInvokers
691
// and shutting them down. This test avoids a NullPointerException.
692
Object key = connectPrimingSocket.getRemoteSocketAddress();
693
synchronized (virtualServerInvokers)
695
if (virtualServerInvokers.containsKey(key))
696
virtualServerInvokers.remove(key);
701
log.debug("MultiplexServerInvoker: closing connect priming socket");
702
connectPrimingSocket.close();
704
catch (IOException e)
706
log.error("Error closing connect priming socket during cleanup upon stopping", e);
711
log.debug("connect priming socket == null");
714
// Remove all callback handlers (if any ServerInvocationHandlers are registered).
715
Iterator it = handlers.values().iterator();
719
log.debug("removing callback handlers");
720
ServerInvocationHandler defaultHandler = (ServerInvocationHandler) it.next();
721
ServerInvocationHandler handler = null;
722
ServerInvokerCallbackHandler callbackHandler = null;
723
it = callbackHandlers.values().iterator();
727
callbackHandler = (ServerInvokerCallbackHandler) it.next();
728
String subsystem = callbackHandler.getSubsystem();
730
if (subsystem == null)
731
handler = defaultHandler;
733
handler = (ServerInvocationHandler) handlers.get(subsystem.toUpperCase());
735
handler.removeListener(callbackHandler);
741
// Iterator it = virtualServerInvokers.values().iterator();
743
synchronized (virtualServerInvokers)
745
it = new HashMap(virtualServerInvokers).values().iterator();
750
ServerInvoker serverInvoker = ((ServerInvoker) it.next());
752
serverInvoker.stop();
756
if (socketGroupInfo != null)
758
synchronized (MultiplexServerInvoker.SocketGroupInfo.class)
760
socketGroupInfo.removeServerInvoker(this);
761
VirtualSocket ps = null;
763
if (socketGroupInfo.getClientInvokers().isEmpty())
765
log.debug("invoker group shutting down: " + socketGroupInfo.getSocketGroupId());
767
if ((ps = socketGroupInfo.getPrimingSocket()) != null)
769
// When the remote virtual MultiplexServerInvoker learns that the
770
// priming socket has closed, it will close its VirtualServerSocket,
771
// rendering unshareable the MultiplexingManager that underlies this
772
// socket group. We mark it as unshareable immediately so that it will
773
// not be reused by any other socket group.
774
ps.getManager().unregisterShareable();
776
log.debug("MultiplexServerInvoker: closing bind priming socket");
781
catch (IOException e)
783
log.error("Error closing bind priming socket during cleanup upon stopping", e);
787
serverMultiplexId = socketGroupInfo.getSocketGroupId();
788
log.debug("serverMultiplexId: " + serverMultiplexId);
789
if (serverMultiplexId != null)
791
getSocketGroupMap().remove(serverMultiplexId);
792
log.debug("removed serverMultiplexId: " + serverMultiplexId);
793
log.debug("socketGroupInfo: " + getSocketGroupMap().get(serverMultiplexId));
796
// addressPair is set in finishStart().
797
if (addressPair != null)
799
getAddressPairMap().remove(addressPair);
808
* In creating the server socket, <code>createServerSocket()</code> determines whether multiplexing
809
* will be supported by this <code>ServerInvoker</code>. The determination is made according to the
810
* presence or absence of certain parameters in the <code>ServerInvoker</code>'s locator. In particular,
811
* a <code>VirtualServerSocket</code>, which supports multiplexing, needs to connect to a
812
* remote <code>MasterServerSocket</code> before it can begin to accept connection requests.
813
* In order to know which <code>MasterServerSocket</code> to connect to,
814
* it looks for parameters "connectHost" and "connectPort" in the locator. The presence of these parameters
815
* indicates that a <code>VirtualServerSocket</code> should be created, and their absence indicates that a
816
* <code>MasterServerSocket</code>, which does not support multiplexing, should be created.
822
* @throws IOException
824
protected ServerSocket createServerSocket(int bindPort, int backlog, InetAddress bindAddress) throws IOException
825
// private ServerSocket createServerSocket() throws IOException
827
// The following commented code represents an attempt to make an automatic determination as to whether
828
// a VirtualServerSocket should be created. The idea is to see if a ClientInvoker already
829
// exists on the local port to which the new server socket wants to bind. The existence of such a
830
// ClientInvoker would indicate that multiplexing is desired. However, it appears that a ClientInvoker
831
// has no control over which local port(s) it uses.
833
// if (InvokerRegistry.isClientInvokerRegistered(getLocator()))
837
// Invoker clientInvoker = InvokerRegistry.createClientInvoker(getLocator());
838
// InvokerLocator connectLocator = clientInvoker.getLocator();
839
// InetSocketAddress connectSocketAddress = new InetSocketAddress(connectLocator.getHost(), connectLocator.getPort());
840
// InetSocketAddress bindSocketAddress = new InetSocketAddress(bindAddress, serverBindPort);
841
// svrSocket = new VirtualServerSocket(connectSocketAddress, bindSocketAddress);
843
// catch (Exception e)
845
// throw new IOException(e.getMessage());
849
// If this is a virtual MultiplexServerInvoker created by a master MultiplexServerInvoker,
850
// then the server socket has already been created.
851
if (serverSocket != null)
854
ServerSocket svrSocket = null;
858
InetSocketAddress bindSocketAddress = new InetSocketAddress(bindAddress, this.bindPort);
859
svrSocket = new VirtualServerSocket(connectSocketAddress, bindSocketAddress, getTimeout(), configuration);
860
svrSocket.setSoTimeout(getTimeout());
862
if (socketGroupInfo != null)
863
socketGroupInfo.setPrimingSocketNeeded(false);
867
// svrSocket = new MasterServerSocket(getServerSocketFactory(), bindPort, backlog, bindAddress);
868
ServerSocketFactory ssf = getServerSocketFactory();
869
if (ssf != null && !ssf.getClass().equals(ServerSocketFactory.getDefault().getClass()))
871
configuration.put(Multiplex.SERVER_SOCKET_FACTORY, ssf);
873
svrSocket = new MasterServerSocket(bindPort, backlog, bindAddress, configuration);
874
svrSocket.setSoTimeout(getTimeout());
877
log.debug("Created " + svrSocket.getClass() + ": " + svrSocket);
882
protected void rule1() throws IOException
884
log.debug("server rule 1");
886
// If we get here, it's because a MultiplexClientInvoker created a SocketGroupInfo with matching
887
// group id. We want to make sure that it didn't get a bind address or bind port different
888
// than the ones passed in through the parameters map.
889
InetAddress socketGroupBindAddress = socketGroupInfo.getBindAddress();
890
int socketGroupBindPort = socketGroupInfo.getBindPort();
892
if (socketGroupBindAddress != null && !socketGroupBindAddress.equals(bindAddress))
894
String message = "socket group bind address (" + socketGroupBindAddress +
895
") does not match bind address (" + bindAddress + ")";
897
socketGroupInfo = null; // We don't belong to this group.
898
throw new IOException(message);
901
if (socketGroupBindPort > 0 && originalBindPort > 0 && socketGroupBindPort != bindPort)
903
String message = "socket group bind port (" + socketGroupBindPort +
904
") does not match bind port (" + bindPort + ")";
906
socketGroupInfo = null; // We don't belong to this group.
907
throw new IOException(message);
910
if (originalBindPort <= 0)
912
if (socketGroupBindPort > 0)
913
bindPort = socketGroupBindPort;
916
bindPort = PortUtil.findFreePort(bindHost);
917
socketGroupBindPort = bindPort;
920
// re-write locator since the port is different
921
InvokerLocator newLocator = new InvokerLocator(locator.getProtocol(), locator.getHost(), bindPort, locator.getPath(), locator.getParameters());
922
// need to update the locator key used in the invoker registry
923
InvokerRegistry.updateServerInvokerLocator(locator, newLocator);
924
this.locator = newLocator;
928
InetAddress connectAddress = socketGroupInfo.getConnectAddress();
929
int connectPort = socketGroupInfo.getConnectPort();
930
connectSocketAddress = new InetSocketAddress(connectAddress, connectPort);
931
socketGroupInfo.setBindAddress(bindAddress);
932
socketGroupInfo.setBindPort(bindPort);
933
socketGroupInfo.setServerInvoker(this);
935
Iterator it = socketGroupInfo.getClientInvokers().iterator();
938
((MultiplexClientInvoker) it.next()).finishStart();
943
if (socketGroupInfo.getPrimingSocket() == null)
945
socketFactory = createSocketFactory(configuration);
946
if (socketFactory != null)
947
configuration.put(Multiplex.SOCKET_FACTORY, socketFactory);
949
createPrimingSocket(socketGroupInfo, connectAddress.getHostName(), connectPort,
950
bindAddress, bindPort, configuration, getTimeout());
953
// We got socketGroupInfo by socketGroupId. Make sure it is also stored by AddressPair.
954
String connectHost = connectAddress.getHostName();
955
addressPair = new AddressPair(connectHost, connectPort, bindHost, bindPort);
956
addressPairMap.put(addressPair, socketGroupInfo);
960
protected void rule2(String connectHost, int connectPort)
963
log.debug("server rule 2");
966
connectSocketAddress = new InetSocketAddress(connectHost, connectPort);
967
addressPair = new AddressPair(connectHost, connectPort, bindHost, bindPort);
968
socketGroupInfo = (SocketGroupInfo) addressPairMap.get(addressPair);
970
// If socketGroupInfo exists, it's because it was created, along with a priming socket (if necessary),
971
// by a MultiplexClientInvoker.
972
if (socketGroupInfo != null)
974
// We got socketGroupInfo by AddressPair. Make sure it is stored by socketGroupId, if we have one.
975
if (serverMultiplexId != null)
977
String socketGroupSocketGroupId = socketGroupInfo.getSocketGroupId();
979
if (socketGroupSocketGroupId != null && socketGroupSocketGroupId != serverMultiplexId)
981
String message = "socket group multiplexId (" + socketGroupSocketGroupId +
982
") does not match multiplexId (" + serverMultiplexId + ")";
984
socketGroupInfo = null; // Assume we weren't meant to join this group.
985
throw new IOException(message);
988
if (socketGroupSocketGroupId == null)
990
socketGroupInfo.setSocketGroupId(serverMultiplexId);
991
getSocketGroupMap().put(serverMultiplexId, socketGroupInfo);
995
socketGroupInfo.setBindAddress(bindAddress);
996
socketGroupInfo.setBindPort(bindPort);
997
socketGroupInfo.setServerInvoker(this);
1002
socketGroupInfo = new SocketGroupInfo();
1003
socketGroupInfo.setBindAddress(bindAddress);
1004
socketGroupInfo.setBindPort(bindPort);
1005
socketGroupInfo.setServerInvoker(this);
1007
// Set connectAddress and connectPort to be able to test for inconsistencies with connect address
1008
// and connect port determined by companion MultiplexClientInvoker.
1009
InetAddress connectAddress = InetAddress.getByName(connectHost);
1010
socketGroupInfo.setConnectAddress(connectAddress);
1011
socketGroupInfo.setConnectPort(connectPort);
1013
socketFactory = createSocketFactory(configuration);
1014
if (socketFactory != null)
1015
configuration.put(Multiplex.SOCKET_FACTORY, socketFactory);
1017
createPrimingSocket(socketGroupInfo, connectHost, connectPort,
1018
bindAddress, bindPort, configuration, getTimeout());
1019
addressPairMap.put(addressPair, socketGroupInfo);
1021
if (serverMultiplexId != null)
1023
socketGroupInfo.setSocketGroupId(serverMultiplexId);
1024
socketGroupMap.put(serverMultiplexId, socketGroupInfo);
1027
readyToStart = true;
1031
protected void rule3() throws IOException
1033
log.debug("server rule 3");
1034
socketGroupInfo = new SocketGroupInfo();
1035
socketGroupInfo.setSocketGroupId(serverMultiplexId);
1036
socketGroupInfo.setServerInvoker(this);
1037
socketGroupInfo.setBindAddress(bindAddress);
1038
socketGroupInfo.setBindPort(bindPort);
1039
socketGroupMap.put(serverMultiplexId, socketGroupInfo);
1041
readyToStart = false;
1045
protected void rule4()
1047
log.debug("server rule 4");
1049
readyToStart = true;
1053
protected void refreshServerSocket() throws IOException
1055
super.refreshServerSocket();
1059
* Returns <code>ServerSocket</code> used to accept invocation requests.
1060
* It is added to facilitate unit tests.
1062
* @return <code>ServerSocket</code> used to accept invocation requests.
1064
public ServerSocket getServerSocket()
1066
return serverSocket;
1071
* Provides access to a virtual <code>MultiplexServerInvoker</code> in a master
1072
* <code>MultiplexServerInvoker</code>'s invoker farm.
1074
public MultiplexServerInvoker getServerInvoker(InetSocketAddress address)
1076
synchronized (virtualServerInvokers)
1078
return (MultiplexServerInvoker) virtualServerInvokers.get(address);
1084
* Provides access to all virtual <code>MultiplexServerInvoker</code>s in a master
1085
* <code>MultiplexServerInvoker</code>'s invoker farm.
1087
public Collection getServerInvokers()
1089
synchronized (virtualServerInvokers)
1091
return virtualServerInvokers.values();
1095
protected void setBindingInfo() throws IOException
1097
String originalUri = getLocator().getOriginalURI();
1098
String pastProtocol = originalUri.substring(originalUri.indexOf("://") + 3);
1099
int colon = pastProtocol.indexOf(":");
1100
int slash = pastProtocol.indexOf("/");
1101
String originalHost = null;
1102
int originalPort = 0;
1106
originalHost = pastProtocol.substring(0, colon).trim();
1110
originalPort = Integer.parseInt(pastProtocol.substring(colon + 1, slash));
1114
originalPort = Integer.parseInt(pastProtocol.substring(colon + 1));
1121
originalHost = pastProtocol.substring(0, slash).trim();
1125
originalHost = pastProtocol.substring(0).trim();
1129
bindHost = getServerBindAddress();
1130
bindPort = getServerBindPort();
1131
bindAddress = InetAddress.getByName(bindHost);
1135
protected void getParameters() throws Exception
1137
if (configuration != null)
1139
= Multiplex.getOneParameter(configuration,
1141
Multiplex.MAX_ACCEPT_ERRORS,
1142
Multiplex.MAX_ACCEPT_ERRORS_DEFAULT);
1144
if (configuration != null)
1145
serverMultiplexId = (String) configuration.get(Multiplex.SERVER_MULTIPLEX_ID);
1147
if (configuration != null)
1148
multiplexConnectHost = (String) configuration.get(Multiplex.MULTIPLEX_CONNECT_HOST);
1150
Object value = configuration.get(Multiplex.MULTIPLEX_CONNECT_PORT);
1153
if (value instanceof String)
1157
multiplexConnectPort = Integer.parseInt((String) value);
1158
multiplexConnectPortIsSet = true;
1160
catch (NumberFormatException e)
1162
String errorMessage = "number format error for multiplexConnectPort: " + (String) value;
1163
log.error(errorMessage);
1164
throw new IOException(errorMessage);
1167
else if (value instanceof Integer)
1169
multiplexConnectPort = ((Integer) configuration.get(Multiplex.MULTIPLEX_CONNECT_PORT)).intValue();
1170
multiplexConnectPortIsSet = true;
1174
String errorMessage = "invalid object passed for multiplexConnectPort: " + value;
1175
log.error(errorMessage);
1176
throw new IOException(errorMessage);
1182
/////////////////////////////////////////////////////////////////////////////////////
1183
// accessors for configurable Multiplex parameters //
1184
/////////////////////////////////////////////////////////////////////////////////////
1185
public int getInputBufferSize()
1187
return inputBufferSize;
1191
public void setInputBufferSize(int inputBufferSize)
1193
this.inputBufferSize = inputBufferSize;
1194
if (configuration != null)
1195
configuration.put(Multiplex.INPUT_BUFFER_SIZE, new Integer(inputBufferSize));
1199
public int getInputMaxErrors()
1201
return inputMaxErrors;
1205
public void setInputMaxErrors(int inputMaxErrors)
1207
this.inputMaxErrors = inputMaxErrors;
1208
if (configuration != null)
1209
configuration.put(Multiplex.INPUT_MAX_ERRORS, new Integer(inputMaxErrors));
1213
public int getMaxAcceptErrors()
1215
return maxAcceptErrors;
1219
public void setMaxAcceptErrors(int maxAcceptErrors)
1221
this.maxAcceptErrors = maxAcceptErrors;
1222
if (configuration != null)
1223
configuration.put(Multiplex.MAX_ACCEPT_ERRORS, new Integer(maxAcceptErrors));
1227
public String getMultiplexConnectHost()
1229
return multiplexConnectHost;
1234
// This method is useless. See notes about paramters, above.
1235
// public void setMultiplexConnectHost(String multiplexConnectHost)
1237
// this.multiplexConnectHost = multiplexConnectHost;
1238
// if (configuration != null)
1239
// configuration.put(Multiplex.MULTIPLEX_CONNECT_HOST, multiplexConnectHost);
1243
public int getMultiplexConnectPort()
1245
return multiplexConnectPort;
1249
// This method is useless. See notes about paramters, above.
1250
// public void setMultiplexConnectPort(int multiplexConnectPort)
1252
// this.multiplexConnectPort = multiplexConnectPort;
1253
// if (configuration != null)
1254
// configuration.put(Multiplex.MULTIPLEX_CONNECT_PORT, new Integer(multiplexConnectPort));
1258
public int getOutputMaxChunkSize()
1260
return outputMaxChunkSize;
1264
public void setOutputMaxChunkSize(int outputMaxChunkSize)
1266
this.outputMaxChunkSize = outputMaxChunkSize;
1267
if (configuration != null)
1268
configuration.put(Multiplex.OUTPUT_MAX_CHUNK_SIZE, new Integer(outputMaxChunkSize));
1272
public int getOutputMaxDataSlice()
1274
return outputMaxDataSlice;
1278
public void setOutputMaxDataSlice(int outputMaxDataSlice)
1280
this.outputMaxDataSlice = outputMaxDataSlice;
1281
if (configuration != null)
1282
configuration.put(Multiplex.OUTPUT_MAX_DATA_SLICE, new Integer(outputMaxDataSlice));
1286
public int getOutputMaxTimeSlice()
1288
return outputMaxTimeSlice;
1292
public void setOutputMaxTimeSlice(int outputMaxTimeSlice)
1294
this.outputMaxTimeSlice = outputMaxTimeSlice;
1295
if (configuration != null)
1296
configuration.put(Multiplex.OUTPUT_MAX_TIME_SLICE, new Integer(outputMaxTimeSlice));
1300
public int getOutputMessagePoolSize()
1302
return outputMessagePoolSize;
1306
public void setOutputMessagePoolSize(int outputMessagePoolSize)
1308
this.outputMessagePoolSize = outputMessagePoolSize;
1309
if (configuration != null)
1310
configuration.put(Multiplex.OUTPUT_MESSAGE_POOL_SIZE, new Integer(outputMessagePoolSize));
1314
public int getOutputMessageSize()
1316
return outputMessageSize;
1320
public void setOutputMessageSize(int outputMessageSize)
1322
this.outputMessageSize = outputMessageSize;
1323
if (configuration != null)
1324
configuration.put(Multiplex.OUTPUT_MESSAGE_SIZE, new Integer(outputMessageSize));
1328
public String getServerMultiplexId()
1330
return serverMultiplexId;
1334
// This method is useless. See notes about paramters, above.
1335
// public void setServerMultiplexId(String serverMultiplexId)
1337
// this.serverMultiplexId = serverMultiplexId;
1338
// if (configuration != null)
1339
// configuration.put(Multiplex.SERVER_MULTIPLEX_ID, serverMultiplexId);
1343
public int getShutdownMonitorPeriod()
1345
return shutdownMonitorPeriod;
1349
public void setShutdownMonitorPeriod(int shutdownMonitorPeriod)
1351
this.shutdownMonitorPeriod = shutdownMonitorPeriod;
1352
if (configuration != null)
1353
configuration.put(Multiplex.SHUTDOWN_MONITOR_PERIOD, new Integer(shutdownMonitorPeriod));
1357
public int getShutdownRefusalsMaximum()
1359
return shutdownRefusalsMaximum;
1363
public void setShutdownRefusalsMaximum(int shutdownRefusalsMaximum)
1365
this.shutdownRefusalsMaximum = shutdownRefusalsMaximum;
1366
if (configuration != null)
1367
configuration.put(Multiplex.SHUTDOWN_REFUSALS_MAXIMUM, new Integer(shutdownRefusalsMaximum));
1371
public int getShutdownRequestTimeout()
1373
return shutdownRequestTimeout;
1377
public void setShutdownRequestTimeout(int shutdownRequestTimeout)
1379
this.shutdownRequestTimeout = shutdownRequestTimeout;
1380
if (configuration != null)
1381
configuration.put(Multiplex.SHUTDOWN_REQUEST_TIMEOUT, new Integer(shutdownRequestTimeout));
1385
public int getStaticThreadsMonitorPeriod()
1387
return staticThreadsMonitorPeriod;
1391
public void setStaticThreadsMonitorPeriod(int staticThreadsMonitorPeriod)
1393
this.staticThreadsMonitorPeriod = staticThreadsMonitorPeriod;
1394
if (configuration != null)
1395
configuration.put(Multiplex.STATIC_THREADS_MONITOR_PERIOD, new Integer(staticThreadsMonitorPeriod));
1400
* <code>SocketGroupInfo</code> holds all of the information for a single virtual socket group.
1402
public static class SocketGroupInfo
1404
private String socketGroupId;
1405
private Set clientInvokers = new HashSet();
1406
private MultiplexServerInvoker serverInvoker;
1407
private boolean primingSocketNeeded;
1408
private VirtualSocket primingSocket;
1409
private InetAddress connectAddress;
1410
private int connectPort;
1411
private InetAddress bindAddress;
1412
private int bindPort;
1414
public InetAddress getBindAddress()
1419
public void setBindAddress(InetAddress bindAddress)
1421
this.bindAddress = bindAddress;
1424
public int getBindPort()
1429
public void setBindPort(int bindPort)
1431
this.bindPort = bindPort;
1434
public Set getClientInvokers()
1436
return clientInvokers;
1439
public void addClientInvoker(MultiplexClientInvoker clientInvoker)
1441
clientInvokers.add(clientInvoker);
1444
public void removeClientInvoker(MultiplexClientInvoker clientInvoker)
1446
clientInvokers.remove(clientInvoker);
1449
public InetAddress getConnectAddress()
1451
return connectAddress;
1454
public void setConnectAddress(InetAddress connectAddress)
1456
this.connectAddress = connectAddress;
1459
public int getConnectPort()
1464
public void setConnectPort(int connectPort)
1466
this.connectPort = connectPort;
1469
public boolean getPrimingSocketNeeded()
1471
return primingSocketNeeded;
1474
public void setPrimingSocketNeeded(boolean primingSocketNeeded)
1476
this.primingSocketNeeded = primingSocketNeeded;
1479
public VirtualSocket getPrimingSocket()
1481
return primingSocket;
1484
public void setPrimingSocket(VirtualSocket primingSocket)
1486
this.primingSocket = primingSocket;
1489
public String getSocketGroupId()
1491
return socketGroupId;
1494
public void setSocketGroupId(String socketGroupId)
1496
this.socketGroupId = socketGroupId;
1499
public MultiplexServerInvoker getServerInvoker()
1501
return serverInvoker;
1504
public void removeServerInvoker(MultiplexServerInvoker serverInvoker)
1506
if (this.serverInvoker != serverInvoker)
1508
String message = "Attempt to remove unknown MultiplexServerInvoker: " +
1509
"(" + bindAddress + "," + bindPort + ")->(" +
1510
connectAddress + "," + connectPort + ")";
1514
this.serverInvoker = null;
1517
public void setServerInvoker(MultiplexServerInvoker serverInvoker) throws IOException
1519
if (this.serverInvoker != null && serverInvoker != null)
1521
String message = "Second MultiplexServerInvoker attempting to join invoker group: " +
1522
"(" + bindAddress + "," + bindPort + ")->(" +
1523
connectAddress + "," + connectPort + ")";
1525
throw new IOException(message);
1528
this.serverInvoker = serverInvoker;
1533
protected static class InternalHandshakeListener implements HandshakeCompletedListener
1535
public void handshakeCompleted(HandshakeCompletedEvent event)
1537
handshakeCompletedEvent = event;
b'\\ No newline at end of file'