2
* JBoss, Home of Professional Open Source
3
* Copyright 2005, JBoss Inc., and individual contributors as indicated
4
* by the @authors tag. See the copyright.txt in the distribution for a
5
* full listing of individual contributors.
7
* This is free software; you can redistribute it and/or modify it
8
* under the terms of the GNU Lesser General Public License as
9
* published by the Free Software Foundation; either version 2.1 of
10
* the License, or (at your option) any later version.
12
* This software is distributed in the hope that it will be useful,
13
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15
* Lesser General Public License for more details.
17
* You should have received a copy of the GNU Lesser General Public
18
* License along with this software; if not, write to the Free
19
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
23
package org.jboss.remoting;
25
import org.jboss.remoting.callback.Callback;
26
import org.jboss.remoting.callback.InvokerCallbackHandler;
27
import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
28
import org.jboss.remoting.invocation.InternalInvocation;
29
import org.jboss.remoting.invocation.OnewayInvocation;
30
import org.jboss.remoting.loading.ClassBytes;
31
import org.jboss.remoting.security.SSLSocketBuilder;
32
import org.jboss.remoting.security.ServerSocketFactoryMBean;
33
import org.jboss.remoting.security.ServerSocketFactoryWrapper;
34
import org.jboss.remoting.socketfactory.CreationListenerServerSocketFactory;
35
import org.jboss.remoting.socketfactory.SocketCreationListener;
36
import org.jboss.remoting.stream.StreamHandler;
37
import org.jboss.remoting.stream.StreamInvocationHandler;
38
import org.jboss.remoting.transport.PortUtil;
39
import org.jboss.remoting.util.SecurityUtility;
40
import org.jboss.remoting.serialization.ClassLoaderUtility;
41
import org.jboss.util.threadpool.BasicThreadPool;
42
import org.jboss.util.threadpool.BlockingMode;
43
import org.jboss.util.threadpool.ThreadPool;
44
import org.jboss.util.threadpool.ThreadPoolMBean;
45
import org.jboss.logging.Logger;
47
import java.util.concurrent.ConcurrentHashMap;
49
import javax.management.MBeanServer;
50
import javax.management.MBeanServerInvocationHandler;
51
import javax.management.MalformedObjectNameException;
52
import javax.management.ObjectName;
53
import javax.net.ServerSocketFactory;
55
import java.io.IOException;
56
import java.lang.reflect.Constructor;
57
import java.net.InetAddress;
58
import java.net.UnknownHostException;
59
import java.security.AccessController;
60
import java.security.PrivilegedAction;
61
import java.security.PrivilegedActionException;
62
import java.security.PrivilegedExceptionAction;
63
import java.util.ArrayList;
64
import java.util.Collection;
65
import java.util.HashMap;
66
import java.util.Iterator;
67
import java.util.List;
70
import java.util.StringTokenizer;
73
* ServerInvoker is the server-side part of a remote Invoker. The ServerInvoker implementation is
74
* responsible for calling transport, depending on how the protocol receives the incoming data.
76
* @author <a href="mailto:jhaynie@vocalocity.net">Jeff Haynie</a>
77
* @author <a href="mailto:tom.elrod@jboss.com">Tom Elrod</a>
78
* @author <a href="mailto:ovidiu@jboss.org">Ovidiu Feodorov</a>
80
* @version $Revision: 5919 $
82
public abstract class ServerInvoker extends AbstractInvoker implements ServerInvokerMBean
84
// Constants ------------------------------------------------------------------------------------
86
protected static final Logger log = Logger.getLogger(ServerInvoker.class);
89
* Key for the the maximum number of threads to be used in the thread pool for one way
90
* invocations (server side).
91
* This property is only used when the default oneway thread pool is used.
93
public static final String MAX_NUM_ONEWAY_THREADS_KEY = "maxNumThreadsOneway";
96
* Key for setting the setting the oneway thread pool to use.
97
* The value used with this key will first be checked to see if is a JMX ObjectName and if so,
98
* try to look up associated mbean for the ObjectName given and cast to type
99
* org.jboss.util.threadpool.ThreadPoolMBean
100
* (via MBeanServerInvocationHandler.newProxyInstance()). If the value is not a JMX ObjectName,
101
* will assume is a fully qualified classname and load the coresponding class and create a new
102
* instance of it (which will require it to have a void constructor). The newly created instance
103
* will then be cast to type of org.jboss.util.threadpool.ThreadPool.
105
public static final String ONEWAY_THREAD_POOL_CLASS_KEY = "onewayThreadPool";
108
* Key for setting the address the server invoker should bind to.
109
* The value can be either host name or IP.
111
public static final String SERVER_BIND_ADDRESS_KEY = "serverBindAddress";
114
* Key for setting the addres the client invoker should connecto to.
115
* This should be used when client will be connecting to server from outside the server's network
116
* and the external address is different from that of the internal address the server invoker
117
* will bind to (e.g. using NAT to expose different external address). This will mostly be useful
118
* when client uses remoting detection to discover remoting servers. The value can be either host
121
public static final String CLIENT_CONNECT_ADDRESS_KEY = "clientConnectAddress";
124
* Key for setting the port the server invoker should bind to.
125
* If the value supplied is less than or equal to zero, the server invoker will randomly choose
126
* a free port to use.
128
public static final String SERVER_BIND_PORT_KEY = "serverBindPort";
131
* key for setting the port the client invoker should connect to.
132
* This should be used when client will be connecting to server from outside the server's network
133
* and the external port is different from that of the internal port the server invoker will bind
134
* to (e.g. using NAT to expose different port routing). This will be mostly useful when client
135
* uses remoting detection to discover remoting servers.
137
public static final String CLIENT_CONNECT_PORT_KEY = "clientConnectPort";
140
* Key used for setting the amount of time (in milliseconds) that a client should renew its
142
* If this value is not set, the default of five seconds (see DEFAULT_CLIENT_LEASE_PERIOD), will
143
* be used. This value will also be what is given to the client when it initially querys server
144
* for leasing information.
146
public static final String CLIENT_LEASE_PERIOD = "clientLeasePeriod";
149
* Key for setting the timeout value (in milliseconds) for socket connections.
151
public static final String TIMEOUT = "timeout";
154
* Key for setting the value for the server socket factory to be used by the server invoker.
155
* The value can be either a JMX Object name, in which case will lookup the mbean and create
156
* a proxy to it with type of org.jboss.remoting.security.ServerSocketFactoryMBean
157
* (via MBeanServerInvocationHandler.newProxyInstance()). If not a JMX ObjectName, will assume
158
* is the fully qualified classname to the implementation to be used and will load the class,
159
* create a new instance of it (which requires it to have a void constructor). The instance will
160
* then be cast to type javax.net.ServerSocketFactory.
162
public static final String SERVER_SOCKET_FACTORY = "serverSocketFactory";
165
* The max number of worker threads to be used in the pool for processing one way calls on the
166
* server side. Value is is 100.
168
public static final int MAX_NUM_ONEWAY_THREADS = 100;
171
* Key for the configuration map that determines the queue size for waiting asynchronous
174
public static final String MAX_ONEWAY_THREAD_POOL_QUEUE_SIZE = "maxOnewayThreadPoolQueueSize";
177
* The default lease period for clients. This is the number of milliseconds that a client will be
178
* required to renew their lease with the server. The default value is 5 seconds.
180
public static final int DEFAULT_CLIENT_LEASE_PERIOD = 5000;
183
* The default timeout period for socket connections. The default value is 60000 milliseconds.
185
public static final int DEFAULT_TIMEOUT_PERIOD = 60000;
188
* The key to be used to determine if pull callbacks should be obtained in
189
* blocking or nonblocking mode
191
public static final String BLOCKING_MODE = "blockingMode";
194
* The key value to use to specify timeout for getting callbacks in blocking mode
196
public static final String BLOCKING_TIMEOUT = "blockingTimeout";
199
* The value associated with BLOCKING_MODE that indicates that pull callbacks
200
* should be obtained in blocking mode;
202
public static final String BLOCKING = "blocking";
205
* The value associated with BLOCKING_MODE that indicates that pull callbacks
206
* should be obtained in nonblocking mode;
208
public static final String NONBLOCKING = "nonblocking";
211
* Default timeout for getting callbacks in blocking mode.
212
* Default is 5000 milliseconds.
214
public static final int DEFAULT_BLOCKING_TIMEOUT = 5000;
218
* The key to use to specify if ServerInvokerCallbackHandlers should be
219
* registered as ConnectionListeners.
221
public static final String REGISTER_CALLBACK_LISTENER = "registerCallbackListener";
223
public static final String ECHO = "$ECHO$";
225
public static final String INVOKER_SESSION_ID = "invokerSessionId";
227
public static final String CONNECTION_LISTENER = "connectionListener";
230
// Static ---------------------------------------------------------------------------------------
232
private static boolean trace = log.isTraceEnabled();
233
private static final InetAddress LOCAL_HOST;
239
LOCAL_HOST = (InetAddress) AccessController.doPrivileged( new PrivilegedExceptionAction()
241
public Object run() throws UnknownHostException
245
return InetAddress.getLocalHost();
247
catch (UnknownHostException e)
249
return InetAddress.getByName("127.0.0.1");
254
catch (PrivilegedActionException e)
256
log.debug(ServerInvoker.class.getName() + " unable to get local host address", e.getCause());
257
throw new ExceptionInInitializerError(e.getCause());
259
catch (SecurityException e)
261
log.debug(ServerInvoker.class.getName() + " unable to get local host address", e);
266
// Attributes -----------------------------------------------------------------------------------
269
* Indicated the max number of threads used within oneway thread pool.
271
private int maxNumberThreads = MAX_NUM_ONEWAY_THREADS;
272
private int maxOnewayThreadPoolQueueSize = -1;
273
private String onewayThreadPoolClass = null;
274
private ThreadPool onewayThreadPool;
275
private Object onewayThreadPoolLock = new Object();
276
private boolean created = false;
278
private MBeanServer mbeanServer = null;
280
private String dataType;
281
private String serverBindAddress = null;
282
private int serverBindPort = 0;
283
private String clientConnectAddress = null;
284
private int clientConnectPort = -1;
286
protected List connectHomes = new ArrayList();
287
protected List homes = new ArrayList();
289
private int timeout = DEFAULT_TIMEOUT_PERIOD;
291
// indicates the lease timeout period for clients
292
private long leasePeriod = DEFAULT_CLIENT_LEASE_PERIOD;
293
private boolean leaseManagement = false;
294
private Map clientLeases = new ConcurrentHashMap();
296
protected Map handlers = new HashMap();
298
// If there is only one handler we store a direct reference to it, as an optimisation
299
// to avoid lookup in this common case - TLF
300
protected volatile ServerInvocationHandler singleHandler;
302
// If there is only one callback container we store a direct reference to it, as an optimisation
303
// to avoid lookup in this common case - TLF
304
protected volatile CallbackContainer singleCallbackContainer;
306
protected Map callbackHandlers = new HashMap();
307
protected Map clientCallbackListener = new HashMap();
308
protected boolean started = false;
309
protected ConnectionNotifier connectionNotifier = new ConnectionNotifier();
310
protected ServerSocketFactory serverSocketFactory = null;
312
protected boolean registerCallbackListeners = true;
314
protected boolean useClientConnectionIdentity;
316
// Constructors ---------------------------------------------------------------------------------
318
public ServerInvoker(InvokerLocator locator)
321
Map params = locator.getParameters();
322
if(configuration != null && params != null)
324
configuration.putAll(locator.getParameters());
328
public ServerInvoker(InvokerLocator locator, Map configuration)
330
super(locator, configuration);
333
// Public ---------------------------------------------------------------------------------------
335
public void setServerSocketFactory(ServerSocketFactory serverSocketFactory)
337
this.serverSocketFactory = serverSocketFactory;
340
public ServerSocketFactory getServerSocketFactory()
342
return serverSocketFactory;
346
* Sets timeout (in millseconds) to be used for the socket connection.
348
public void setTimeout(int timeout)
350
this.timeout = timeout;
354
* The timeout (in milliseconds) used for the socket connection.
356
public int getTimeout()
361
public ConnectionNotifier getConnectionNotifier()
363
return connectionNotifier;
366
public boolean isLeaseActivated()
368
return leaseManagement;
371
public void addConnectionListener(ConnectionListener listener)
375
connectionNotifier.addListener(listener);
379
leaseManagement = true;
384
throw new IllegalArgumentException("Can not add null ConnectionListener.");
388
public void setConnectionListener(Object listener)
390
if (listener == null)
392
log.error("ConnectionListener is null");
396
if (listener instanceof ConnectionListener)
398
addConnectionListener((ConnectionListener) listener);
402
if (!(listener instanceof String))
404
log.error("Object supplied as ConnectionListener is neither String nor ConnectionListener");
408
ConnectionListener connectionListener = null;
411
MBeanServer server = getMBeanServer();
412
ObjectName objName = new ObjectName((String) listener);
413
Class c = ConnectionListener.class;
414
Object o = MBeanServerInvocationHandler.newProxyInstance(server, objName, c, false);
415
connectionListener = (ConnectionListener) o;
417
catch (MalformedObjectNameException e)
419
log.debug("Object supplied as ConnectionListener is not an object name.");
422
if (connectionListener == null)
426
Class listenerClass = ClassLoaderUtility.loadClass((String) listener, ServerInvoker.class);
427
connectionListener = (ConnectionListener) listenerClass.newInstance();
431
log.error("Unable to instantiate " + listener + ": " + e.getMessage());
436
if (connectionListener == null)
438
log.error("Unable to create ConnectionListener from " + listener);
442
addConnectionListener(connectionListener);
445
public void removeConnectionListener(ConnectionListener listener)
447
if(connectionNotifier != null)
449
connectionNotifier.removeListener(listener);
451
// turn off lease management if no listeners (since no one to tell client died)
452
if(connectionNotifier.size() == 0)
454
leaseManagement = false;
456
// go through any existing leases and terminate them
457
Set clientKeys = clientLeases.keySet();
458
Iterator itr = clientKeys.iterator();
461
String sessionId = (String)itr.next();
462
Lease clientLease = (Lease)clientLeases.get(sessionId);
463
clientLease.terminateLease(sessionId);
465
clientLeases.clear();
471
* Sets the amount of time (in milliseconds) that a client should renew its lease. If this value
472
* is not set, the default of five seconds (see DEFAULT_CLIENT_LEASE_PERIOD), will be used. This
473
* value will also be what is given to the client when it initially querys server for leasing
474
* information. If set after create() method called, this value will override value set by
475
* CLIENT_LEASE_PERIOD key.
477
public void setLeasePeriod(long leasePeriodValue)
479
this.leasePeriod = leasePeriodValue;
481
if (leasePeriod <= 0)
483
this.leaseManagement = false;
487
if(connectionNotifier != null && connectionNotifier.size() > 0)
489
this.leaseManagement = true;
494
public Lease getLease(String sessionId)
496
return (Lease) clientLeases.get(sessionId);
500
* Gets the amount of time (in milliseconds) that a client should renew its lease.
502
public long getLeasePeriod()
508
* @jmx:managed-attribute
510
public String getClientConnectAddress()
512
return clientConnectAddress;
515
public int getClientConnectPort()
517
return clientConnectPort;
520
public void setClientConnectPort(int clientConnectPort)
522
this.clientConnectPort = clientConnectPort;
526
* This method should only be called by the service controller when this invoker is specified
527
* within the Connector configuration of a service xml. Calling this directly will have no
528
* effect, as will be used in building the locator uri that is published for detection and this
529
* happens when the invoker is first created and started (after that, no one will be aware of a
532
* @jmx:managed-attribute
534
public void setClientConnectAddress(String clientConnectAddress)
536
this.clientConnectAddress = clientConnectAddress;
539
public String getServerBindAddress()
541
return serverBindAddress;
544
public int getServerBindPort()
546
return serverBindPort;
549
public List getConnectHomes()
551
return new ArrayList(connectHomes);
554
public void setConnectHomes(List connectHomes)
556
this.connectHomes = new ArrayList(connectHomes);
559
public List getHomes()
561
return new ArrayList(homes);
564
public void setHomes(List homes)
566
this.homes = new ArrayList(homes);
570
* Sets the maximum number of thread to be used in the thread pool for one way invocations
571
* (server side). This property is only used when the default oneway thread pool is used. If set
572
* after create() method called, this value will override value set by MAX_NUM_ONEWAY_THREADS_KEY
575
public void setMaxNumberOfOnewayThreads(int numOfThreads)
577
this.maxNumberThreads = numOfThreads;
581
* Gets the maximum number of thread to be used in the thread pool for one way invocations
584
public int getMaxNumberOfOnewayThreads()
586
return this.maxNumberThreads;
590
* Gets the oneway thread pool to use.
592
public ThreadPool getOnewayThreadPool()
594
synchronized (onewayThreadPoolLock)
596
if(onewayThreadPool == null)
598
// if no thread pool class set, then use default BasicThreadPool
599
if(onewayThreadPoolClass == null || onewayThreadPoolClass.length() == 0)
601
BasicThreadPool pool = new BasicThreadPool("JBossRemoting Server Oneway");
602
pool.setMaximumPoolSize(maxNumberThreads);
603
if (maxOnewayThreadPoolQueueSize > 0)
604
pool.setMaximumQueueSize(maxOnewayThreadPoolQueueSize);
605
pool.setBlockingMode(BlockingMode.RUN);
606
onewayThreadPool = pool;
607
log.debug(this + " created new thread pool");
611
//first check to see if this is an ObjectName
612
boolean isObjName = false;
615
ObjectName objName = new ObjectName(onewayThreadPoolClass);
616
onewayThreadPool = createThreadPoolProxy(objName);
619
catch(MalformedObjectNameException e)
621
log.debug("Thread pool class supplied is not an object name.");
628
onewayThreadPool = (ThreadPool)Class.
629
forName(onewayThreadPoolClass, false, getClassLoader()).newInstance();
633
throw new RuntimeException("Error loading instance of ThreadPool based " +
634
"on class name " + onewayThreadPoolClass);
641
log.trace("reusing oneway thread pool");
643
return onewayThreadPool;
648
* Sets the oneway thread pool to use.
650
public void setOnewayThreadPool(ThreadPool pool)
652
this.onewayThreadPool = pool;
655
public MBeanServer getMBeanServer()
660
public void setMBeanServer(MBeanServer server)
662
// This has been added in order to support mbean service configuration. Now supporting
663
// classes, such as the ServerInvokerCallbackHandler can find and use resources such as
664
// CallbackStore, which can be run as a service mbean (and specified via object name within
665
// config). The use of JMX throughout remoting is a problem as now have to tie it in all
666
// throughout the code for service configuration as is being done here. When migrate to use
667
// under new server model, which does not depend on JMX, can rip out code such as this.
668
this.mbeanServer = server;
671
public boolean isRegisterCallbackListeners()
673
return registerCallbackListeners;
676
public void setRegisterCallbackListeners(boolean registerCallbackListeners)
678
this.registerCallbackListeners = registerCallbackListeners;
682
* @return true if a server invocation handler has been registered for this subsystem.
684
public synchronized boolean hasInvocationHandler(String subsystem)
686
return handlers.containsKey(subsystem);
690
* @return an array of keys for each subsystem this invoker can handle.
692
public synchronized String[] getSupportedSubsystems()
694
String subsystems [] = new String[handlers.size()];
695
return (String[]) handlers.keySet().toArray(subsystems);
699
* @return an array of the server invocation handlers.
701
public synchronized ServerInvocationHandler[] getInvocationHandlers()
703
ServerInvocationHandler ih [] = new ServerInvocationHandler[handlers.size()];
704
return (ServerInvocationHandler[]) handlers.values().toArray(ih);
708
* Add a server invocation handler for a particular subsystem. Typically, subsystems are defined
709
* in org.jboss.remoting.Subsystem, however, this can be any string that the caller knows about.
711
* @return previous ServerInvocationHandler with the same sybsystem value (case insensitive) or
712
* null if a previous one did not exist.
714
public synchronized ServerInvocationHandler addInvocationHandler(String subsystem,
715
ServerInvocationHandler handler)
717
handler.setInvoker(this);
719
ServerInvocationHandler oldHandler =
720
(ServerInvocationHandler)handlers.put(subsystem.toUpperCase(), handler);
722
log.debug(this + " added " + handler + " for subsystem '" + subsystem + "'" +
723
(oldHandler == null ? "" : ", replacing old handler " + oldHandler));
725
if (handlers.size() == 1)
727
singleHandler = handler;
731
singleHandler = null;
738
* Remove a subsystem invocation handler.
740
public synchronized ServerInvocationHandler removeInvocationHandler(String subsystem)
742
ServerInvocationHandler handler =
743
(ServerInvocationHandler)handlers.remove(subsystem.toUpperCase());
745
log.debug(this + (handler == null ?
746
" tried to remove handler for " + subsystem + " but no handler found" :
747
" removed handler " + handler + " for subsystem '" + subsystem + "'"));
749
if (handlers.size() == 1)
751
singleHandler = (ServerInvocationHandler)handlers.values().iterator().next();
755
singleHandler = null;
762
* Get a ServerInvocationHandler for a given subsystem type.
764
public synchronized ServerInvocationHandler getInvocationHandler(String subsystem)
766
return (ServerInvocationHandler) handlers.get(subsystem.toUpperCase());
769
protected boolean isUseClientConnectionIdentity()
771
return useClientConnectionIdentity;
774
protected void setUseClientConnectionIdentity(boolean useClientConnectionIdentity)
776
this.useClientConnectionIdentity = useClientConnectionIdentity;
779
public Object invoke(Object invoke) throws IOException
781
InvocationRequest request = null;
782
InvocationResponse response = null;
784
if(trace) { log.trace("server received invocation " + invoke); }
786
if(invoke != null && invoke instanceof InvocationRequest)
788
request = (InvocationRequest) invoke;
792
Object result = invoke(request);
794
response = new InvocationResponse(request.getSessionId(),
795
result, false, request.getReturnPayload());
798
catch(Throwable throwable)
800
response = new InvocationResponse(request.getSessionId(),
801
throwable, true, request.getReturnPayload());
806
log.error("server invoker received " + invoke + " as invocation. " +
807
"Must not be null and must be of type InvocationRequest.");
809
Exception e = new Exception("Error processing invocation request on " + getLocator() +
810
". Either invocation was null or of wrong type.");
813
new InvocationResponse(request.getSessionId(), e, true, request.getReturnPayload());
819
* Processes invocation request depending on the invocation type (internal, name based, oneway,
820
* etc). Can be called on directly when client and server are local to one another (by-passing
823
public Object invoke(InvocationRequest invocation) throws Throwable
827
Object param = invocation.getParameter();
828
Object result = null;
830
if (trace) { log.trace(this + " received " + param); }
832
if (ECHO.equals(param))
837
if (param instanceof String)
839
// check to see if this is a is alive ping
840
if ("$PING$".equals(param))
842
Map metadata = invocation.getRequestPayload();
843
if (metadata != null)
845
String invokerSessionId = (String) metadata.get(INVOKER_SESSION_ID);
846
if (invokerSessionId != null)
848
// Comes from ConnectionValidator configured to tie validation with lease.
849
boolean response = checkForClientLease(invokerSessionId);
850
if (trace) log.trace(this + " responding " + response + " to $PING$ for invoker sessionId " + invokerSessionId);
851
return new Boolean(response);
857
// Otherwise, it's a normal PING. NOTE we only update the lease when we
858
// receive a PING, not for all invocations.
859
updateClientLease(invocation);
862
// if this is an invocation ping, just pong back
863
Map responseMap = new HashMap();
864
responseMap.put(CLIENT_LEASE_PERIOD, new Long(leasePeriod));
866
InvocationResponse ir = new InvocationResponse(invocation.getSessionId(),
867
new Boolean(leaseManagement),
870
if (trace) { log.trace(this + " returning " + ir); }
874
if ("$GET_CLIENT_LOCAL_ADDRESS$".equals(param))
876
InetAddress address = null;
877
if (invocation.getRequestPayload() != null)
878
address = (InetAddress) invocation.getRequestPayload().get(Remoting.CLIENT_ADDRESS);
883
if ("$DISCONNECT$".equals(param))
887
terminateLease(invocation);
890
if (trace) { log.trace(this + " returning null"); }
895
//TODO: -TME both oneway and internal invocation will be broken since have not
896
// deserialized the para yet (removed ClassUtil.deserialize() so would let handler do it).
897
if (param instanceof OnewayInvocation)
899
// no point in delaying return to client if oneway
900
handleOnewayInvocation((OnewayInvocation)param, invocation);
906
String subsystem = invocation.getSubsystem();
907
String clientId = invocation.getSessionId();
909
//I have optimised this, so that if there is only one handler set (a very common case)
910
//then it will just use that without having to do a lookup or HashMap iteration over
913
ServerInvocationHandler handler = findInvocationHandler(subsystem);
915
if (param instanceof InternalInvocation)
917
result = handleInternalInvocation((InternalInvocation)param, invocation, handler);
921
if (trace) { log.trace(this + " dispatching " + invocation + " from client " + clientId + " to subsystem '" + subsystem + "'"); }
925
throw new InvalidConfigurationException(
926
"Can not handle invocation request for subsystem '" + subsystem + "' because " +
927
"there are no matching ServerInvocationHandlers registered. Please add via " +
928
"xml configuration or via the Connector's addInvocationHandler() method.");
930
result = handler.invoke(invocation);
933
if (trace) { log.trace(this + " successfully dispatched invocation, returning " + result + " from subsystem '" + subsystem + "' to client " + clientId); }
940
log.warn(this + " can not process invocation requests since is not in started state!");
941
throw new InvalidStateException(
942
"Can not process invocation request since is not in started state.");
947
* Will get the data type for the marshaller factory so know which marshaller to get to marshal
948
* the data. Will first check the locator uri for a 'datatype' parameter and take that value if
949
* it exists. Otherwise, will use the default datatype for the client invoker, based on
952
public String getDataType()
956
dataType = getDataType(getLocator());
959
dataType = getDefaultDataType();
975
throw new RuntimeException("Error setting up server invoker " + this, e);
982
* Subclasses should override to provide any specific start logic.
984
public void start() throws IOException
987
log.debug(this + " started for locator " + getLocator());
991
* @return true if the server invoker is started, false if not.
993
public boolean isStarted()
999
* Subclasses should override to provide any specific stop logic.
1005
for(Iterator i = callbackHandlers.values().iterator(); i.hasNext(); )
1007
ServerInvokerCallbackHandler callbackHandler = (ServerInvokerCallbackHandler)i.next();
1008
callbackHandler.destroy();
1011
log.debug(this + " stopped");
1015
* Destory the invoker permanently.
1017
public void destroy()
1019
if(classbyteloader != null)
1021
classbyteloader.destroy();
1026
* Sets the server invoker's transport specific configuration. Will need to set before calling
1027
* start() method (or at least stop() and start() again) before configurations will take affect.
1029
public void setConfiguration(Map configuration)
1031
this.configuration = configuration;
1035
* Gets the server invoker's transport specific configuration.
1037
public Map getConfiguration()
1039
return configuration;
1042
public void removeCallbackListener(String subsystem, InvokerCallbackHandler callbackHandler)
1044
ServerInvocationHandler handler = null;
1045
if(subsystem != null)
1047
handler = (ServerInvocationHandler) handlers.get(subsystem.toUpperCase());
1051
// subsystem not specified, so will hope for a default one being set
1052
if(!handlers.isEmpty())
1054
handler = (ServerInvocationHandler) handlers.values().iterator().next();
1057
handler.removeListener(callbackHandler);
1061
* @return the String for the object name to be used for the invoker.
1063
public String getMBeanObjectName()
1065
InvokerLocator locator = getLocator();
1066
StringBuffer buffer =
1067
new StringBuffer("jboss.remoting:service=invoker,transport=" + locator.getProtocol());
1068
String host = locator.getHost();
1069
boolean isIPv6 = host.indexOf("[") >= 0 | host.indexOf(":") >= 0;
1071
buffer.append(",host=");
1073
buffer.append("\"");
1074
buffer.append(locator.getHost());
1076
buffer.append("\"");
1078
buffer.append(",port=").append(locator.getPort());
1079
Map param = locator.getParameters();
1082
Iterator itr = param.keySet().iterator();
1083
while(itr.hasNext())
1086
String key = (String) itr.next();
1087
String value = (String) param.get(key);
1090
buffer.append(value);
1094
return buffer.toString();
1097
// Package protected ----------------------------------------------------------------------------
1099
// Protected ------------------------------------------------------------------------------------
1101
protected abstract String getDefaultDataType();
1103
protected void setup() throws Exception
1105
Map config = getConfiguration();
1106
PortUtil.updateRange(config);
1109
String maxNumOfThreads = (String)config.get(MAX_NUM_ONEWAY_THREADS_KEY);
1111
if(maxNumOfThreads != null && maxNumOfThreads.length() > 0)
1115
maxNumberThreads = Integer.parseInt(maxNumOfThreads);
1117
catch(NumberFormatException e)
1119
log.error("Can not convert max number of threads value (" +
1120
maxNumOfThreads + ") into a number.");
1124
String param = (String) configuration.get(MAX_ONEWAY_THREAD_POOL_QUEUE_SIZE);
1126
if (param != null && param.length() > 0)
1130
maxOnewayThreadPoolQueueSize = Integer.parseInt((String) param);
1132
catch (NumberFormatException e)
1134
log.error("maxOnewayThreadPoolQueueSize parameter has invalid format: " + param);
1138
onewayThreadPoolClass = (String)config.get(ONEWAY_THREAD_POOL_CLASS_KEY);
1140
// get timeout config
1141
String timeoutPeriod = (String)config.get(TIMEOUT);
1142
if(timeoutPeriod != null && timeoutPeriod.length() > 0)
1146
timeout = Integer.parseInt(timeoutPeriod);
1148
catch(NumberFormatException e)
1150
throw new InvalidConfigurationException("Can not set timeout because can not " +
1151
"convert give value (" + timeoutPeriod + ") to a number.");
1155
// config for client lease period
1156
String clientLeasePeriod = (String)config.get(CLIENT_LEASE_PERIOD);
1157
if(clientLeasePeriod != null)
1161
long leasePeriodValue = Long.parseLong(clientLeasePeriod);
1162
setLeasePeriod(leasePeriodValue);
1164
catch(NumberFormatException e)
1166
throw new InvalidConfigurationException("Can not set client lease period because " +
1167
"can not convert given value (" + clientLeasePeriod + ") to a number.");
1171
// config for useClientConnectionIdentity
1172
String useClientConnectionIdentityString = (String)config.get(Remoting.USE_CLIENT_CONNECTION_IDENTITY);
1173
if(useClientConnectionIdentityString != null)
1175
useClientConnectionIdentity = Boolean.valueOf(useClientConnectionIdentityString).booleanValue();
1178
// Inject ConnectionListener
1179
String connectionListener = (String)config.get(CONNECTION_LISTENER);
1180
if (connectionListener != null)
1182
setConnectionListener(connectionListener);
1185
String registerCallbackListenersString = (String)config.get(REGISTER_CALLBACK_LISTENER);
1186
if(registerCallbackListenersString != null)
1188
registerCallbackListeners = Boolean.valueOf(registerCallbackListenersString).booleanValue();
1191
createServerSocketFactory();
1193
// need to check invoker locator to see if need to provide binding address (in the case 0.0.0.0 was used)
1194
final InvokerLocator originalLocator = locator;
1195
locator = InvokerLocator.validateLocator(locator);
1196
if (!locator.getLocatorURI().equals(originalLocator.getLocatorURI())) {
1197
log.debug(this + " original locator: " + originalLocator);
1198
log.debug(this + " new locator: " + locator);
1201
// need to update the locator key used in the invoker registry
1202
AccessController.doPrivileged( new PrivilegedAction()
1206
InvokerRegistry.updateServerInvokerLocator(originalLocator, locator);
1212
protected void setupHomes(Map config) throws Exception
1214
// First try to find address(es) using the new multihome facility.
1215
if (locator.isMultihome())
1217
connectHomes = locator.getConnectHomeList();
1218
Object o = config.get(InvokerLocator.CONNECT_HOMES_KEY);
1221
if (o instanceof Collection)
1222
connectHomes.addAll((Collection) o);
1223
else if (o instanceof String)
1224
connectHomes.addAll(createHomeCollection((String) o));
1226
log.warn(this + ": " + InvokerLocator.CONNECT_HOMES_KEY + " must be a collection or String: " + o);
1229
homes = locator.getHomeList();
1230
o = config.get(InvokerLocator.HOMES_KEY);
1233
if (o instanceof Collection)
1234
homes.addAll((Collection) o);
1235
else if (o instanceof String)
1236
homes.addAll(createHomeCollection((String) o));
1238
log.warn(this + ": " + InvokerLocator.HOMES_KEY + " must be a collection or String: " + o);
1241
if (!homes.isEmpty() && connectHomes.isEmpty())
1242
connectHomes.addAll(homes);
1248
// If no bind address(es) found, try the old way.
1249
String locatorhost = locator.getHost();
1250
InetAddress addr = null;
1251
if(locatorhost != null)
1253
addr = getAddressByName(locatorhost);
1257
addr = getLocalHost();
1260
int port = locator.getPort();
1263
port = assignPort();
1266
// set the bind address
1267
serverBindAddress = (String)config.get(SERVER_BIND_ADDRESS_KEY);
1268
clientConnectAddress = (String)config.get(CLIENT_CONNECT_ADDRESS_KEY);
1269
if(serverBindAddress == null)
1271
if(clientConnectAddress != null)
1273
// can't use uri address, as is for client only
1274
serverBindAddress = getLocalHost().getHostAddress();
1278
serverBindAddress = addr.getHostAddress();
1282
// set the bind port
1283
String serverBindPortString = (String)config.get(SERVER_BIND_PORT_KEY);
1284
String clientConnectPortString = (String)config.get(CLIENT_CONNECT_PORT_KEY);
1285
if(clientConnectPortString != null)
1289
clientConnectPort = Integer.parseInt(clientConnectPortString);
1291
catch(NumberFormatException e)
1293
throw new InvalidConfigurationException("Can not set client bind port because can " +
1294
"not convert given value (" + clientConnectPortString + ") to a number.");
1297
if(serverBindPortString != null)
1301
serverBindPort = Integer.parseInt(serverBindPortString);
1302
if(serverBindPort <= 0)
1304
serverBindPort = assignPort();
1308
catch(NumberFormatException e)
1310
throw new InvalidConfigurationException("Can not set server bind port because can " +
1311
"not convert given value (" + serverBindPortString + ") to a number.");
1316
if(clientConnectPort > 0)
1318
// can't use uri port, as is for client only
1319
serverBindPort = PortUtil.findFreePort(locator.getHost());
1323
serverBindPort = port;
1327
Home h = new Home(serverBindAddress, serverBindPort);
1329
connectHomes.add(h);
1332
protected Collection createHomeCollection(String s)
1334
ArrayList homes = new ArrayList();
1335
StringTokenizer st = new StringTokenizer(s, "!");
1337
while (st.hasMoreTokens())
1339
String token = st.nextToken();
1340
int p = token.indexOf(':');
1342
String portString = null;
1350
host = token.substring(0, p);
1351
portString = token.substring(p + 1);
1357
port = Integer.parseInt(portString);
1361
log.warn("invalid port value in Home: " + token + ", using -1");
1363
Home home = new Home(host, port);
1370
protected int assignPort() throws IOException
1373
port = PortUtil.findFreePort(locator.getHost());
1375
// re-write locator since the port is different
1376
final InvokerLocator newLocator = new InvokerLocator(locator.getProtocol(), locator.getHost(), port,
1377
locator.getPath(), locator.getParameters());
1379
// need to update the locator key used in the invoker registry
1380
AccessController.doPrivileged( new PrivilegedAction()
1384
InvokerRegistry.updateServerInvokerLocator(locator, newLocator);
1389
this.locator = newLocator;
1393
protected void assignPorts() throws IOException
1395
boolean changed = false;
1397
for (int i = 0; i < list.size(); i++)
1399
Home home = (Home) list.get(i);
1405
home.port = PortUtil.findFreePort(home.host);
1409
if (trace) log.trace(this + " unable to find free port for: " + home.host);
1414
locator.setHomeList(homes);
1416
if (connectHomes.size() == homes.size())
1419
for (int i = 0; i < homes.size(); i++)
1421
Home home = (Home) connectHomes.get(i);
1425
home.port = ((Home) homes.get(i)).port;
1429
locator.setConnectHomeList(connectHomes);
1433
protected ServerSocketFactory createServerSocketFactory() throws IOException
1435
// only want to look at config if server socket factory has not already been set
1436
if(serverSocketFactory == null)
1438
Object obj = configuration.get(Remoting.CUSTOM_SERVER_SOCKET_FACTORY);
1441
if (obj instanceof ServerSocketFactory)
1443
serverSocketFactory = (ServerSocketFactory)obj;
1447
throw new RuntimeException("Can not set custom server socket factory (" + obj +
1448
") as is not of type javax.net.SocketFactory");
1452
if (serverSocketFactory == null)
1454
// TODO: -TME This is another big hack because of dependancy on JMX within configuration
1455
// Have to wait till the mbean server is set before can actually set the server socket
1456
// factory since it is an mbean (new server's DI will fix all this). Would prefer this
1457
// to be in the setup() method...
1458
// Also, I can't cast the mbean proxy directly to ServerSocketFactory because it is not
1459
// an interface. Therefore, have to require that ServerSocketFactoryMBean is used.
1461
String serverSocketFactoryString = (String)configuration.get(SERVER_SOCKET_FACTORY);
1462
if(serverSocketFactoryString != null && serverSocketFactoryString.length() > 0)
1466
if(serverSocketFactoryString != null)
1468
MBeanServer server = getMBeanServer();
1469
ObjectName serverSocketFactoryObjName =
1470
new ObjectName(serverSocketFactoryString);
1476
ServerSocketFactoryMBean serverSocketFactoryMBean =
1477
(ServerSocketFactoryMBean)MBeanServerInvocationHandler.
1478
newProxyInstance(server, serverSocketFactoryObjName,
1479
ServerSocketFactoryMBean.class, false);
1480
serverSocketFactory =
1481
new ServerSocketFactoryWrapper(serverSocketFactoryMBean);
1485
log.debug("Error creating mbean proxy for server socket factory " +
1486
"for object name " + serverSocketFactoryObjName + ". " +
1487
"Will try by class name.");
1492
log.debug("The 'serverSocketFactory' attribute was set with a value, " +
1493
"but the MBeanServer reference is null.");
1497
catch(MalformedObjectNameException e)
1499
log.debug("Attibute value (" + serverSocketFactoryString + ") passed is not a " +
1500
"valid ObjectName. Can not look up if is a mbean service. Will try by classname.");
1502
catch(NullPointerException e)
1504
log.debug("Could not set up the server socket factory as a mbean service " +
1505
"due to null pointer exception.");
1508
// couldn't create from object name for mbean service, will try by class name
1509
if(serverSocketFactory == null)
1513
Class cl = ClassLoaderUtility.loadClass(serverSocketFactoryString, getClass());
1515
Constructor serverSocketConstructor = null;
1516
serverSocketConstructor = cl.getConstructor(new Class[]{});
1517
serverSocketFactory =
1518
(ServerSocketFactory)serverSocketConstructor.newInstance(new Object[] {});
1519
log.trace("ServerSocketFactory (" + serverSocketFactoryString + ") loaded");
1523
log.debug("Could not create server socket factory by classname (" +
1524
serverSocketFactoryString + "). Error message: " + e.getMessage());
1531
if (serverSocketFactory == null && needsCustomSSLConfiguration(configuration))
1535
SSLSocketBuilder socketBuilder = new SSLSocketBuilder(configuration);
1536
socketBuilder.setUseSSLServerSocketFactory(false);
1537
serverSocketFactory = socketBuilder.createSSLServerSocketFactory();
1539
catch (IOException e)
1541
throw new RuntimeException("Unable to create customized SSL socket factory", e);
1545
if(serverSocketFactory == null)
1547
log.debug(this + " did not find server socket factory configuration as mbean service " +
1548
"or classname. Creating default server socket factory.");
1550
serverSocketFactory = getDefaultServerSocketFactory();
1553
log.debug(this + " created server socket factory " + serverSocketFactory);
1555
serverSocketFactory = wrapServerSocketFactory(serverSocketFactory, configuration);
1556
return serverSocketFactory;
1560
protected boolean justNeedsSSLClientMode(Map configuration)
1562
if (configuration.size() == 1 &&
1563
configuration.containsKey(SSLSocketBuilder.REMOTING_SERVER_SOCKET_USE_CLIENT_MODE))
1565
String useClientModeString =
1566
(String)configuration.get(SSLSocketBuilder.REMOTING_SERVER_SOCKET_USE_CLIENT_MODE);
1567
return Boolean.valueOf(useClientModeString).booleanValue();
1570
if (configuration.size() == 1 &&
1571
configuration.containsKey(SSLSocketBuilder.REMOTING_SOCKET_USE_CLIENT_MODE))
1573
String useClientModeString =
1574
(String)configuration.get(SSLSocketBuilder.REMOTING_SOCKET_USE_CLIENT_MODE);
1575
return Boolean.valueOf(useClientModeString).booleanValue();
1578
if (configuration.size() == 2
1579
&& configuration.containsKey(SSLSocketBuilder.REMOTING_SERVER_SOCKET_USE_CLIENT_MODE)
1580
&& configuration.containsKey(SSLSocketBuilder.REMOTING_SOCKET_USE_CLIENT_MODE))
1582
String useClientModeString =
1583
(String)configuration.get(SSLSocketBuilder.REMOTING_SERVER_SOCKET_USE_CLIENT_MODE);
1584
return Boolean.valueOf(useClientModeString).booleanValue();
1591
* Gets the default server socket factory to use for the server invoker. The intention is this
1592
* method will be overridden by sub-classes for their specific defaults.
1594
protected ServerSocketFactory getDefaultServerSocketFactory() throws IOException
1596
return ServerSocketFactory.getDefault();
1599
protected ServerSocketFactory wrapServerSocketFactory(ServerSocketFactory ssf, Map config)
1606
Object o = config.get(Remoting.SOCKET_CREATION_SERVER_LISTENER);
1613
if (o instanceof SocketCreationListener)
1615
return new CreationListenerServerSocketFactory(ssf, (SocketCreationListener) o);
1617
else if (o instanceof String)
1621
Class c = ClassLoaderUtility.loadClass((String) o, ServerInvoker.class);
1622
SocketCreationListener listener = (SocketCreationListener)c.newInstance();
1623
return new CreationListenerServerSocketFactory(ssf, listener);
1627
log.warn("unable to instantiate class: " + o, e);
1633
log.warn("unrecognized type for socket creation server listener: " + o);
1639
* Handles both internal and external invocations (internal meaning only to be used within
1640
* remoting and external for ones that go to handlers.
1642
protected Object handleInternalInvocation(InternalInvocation param,
1643
InvocationRequest invocation,
1644
ServerInvocationHandler handler) throws Throwable
1646
Object result = null;
1647
String methodName = param.getMethodName();
1649
if(trace) { log.trace("handling InternalInvocation where method name = " + methodName); }
1651
// check if the invocation is for callback handling
1652
if(InternalInvocation.HANDLECALLBACK.equals(methodName))
1654
String sessionId = ServerInvokerCallbackHandler.getId(invocation);
1655
if(trace) { log.trace("ServerInvoker (" + this + ") is being asked to deliver callback on client callback handler with session id of " + sessionId + "."); }
1657
CallbackContainer callbackContainer = null;
1659
if (singleCallbackContainer != null)
1661
callbackContainer = singleCallbackContainer;
1665
callbackContainer = (CallbackContainer)clientCallbackListener.get(sessionId);
1668
if(callbackContainer != null && callbackContainer.getCallbackHandler() != null)
1670
Object[] params = param.getParameters();
1672
Callback callbackRequest = (Callback) params[0];
1674
Object obj = callbackContainer.getCallbackHandleObject();
1678
Map callbackHandleObject = callbackRequest.getReturnPayload();
1680
if(callbackHandleObject == null)
1682
callbackHandleObject = new HashMap();
1685
//We only want to add it if it is not null otherwise is a redundant operation
1686
callbackHandleObject.put(Callback.CALLBACK_HANDLE_OBJECT_KEY,
1689
callbackRequest.setReturnPayload(callbackHandleObject);
1692
InvokerCallbackHandler callbackHandler = callbackContainer.getCallbackHandler();
1694
callbackHandler.handleCallback(callbackRequest);
1698
log.error("Could not find callback handler to call upon for handleCallback " +
1699
"where session id equals " + sessionId);
1702
else if(InternalInvocation.ADDLISTENER.equals(methodName))
1706
throw new InvalidConfigurationException(
1707
"Can not accept a callback listener since there are no ServerInvocationHandlers " +
1708
"registered. Please add via xml configuration or via the Connector's " +
1709
"addInvocationHandler() method.");
1712
ServerInvokerCallbackHandler callbackHandler = getCallbackHandler(invocation);
1713
if (registerCallbackListeners)
1715
connectionNotifier.addListenerFirst(callbackHandler);
1718
leaseManagement = true;
1721
handler.addListener(callbackHandler);
1723
else if(InternalInvocation.REMOVELISTENER.equals(methodName))
1725
ServerInvokerCallbackHandler callbackHandler = removeCallbackHandler(invocation);
1726
if(callbackHandler != null)
1728
if (registerCallbackListeners)
1730
// connectionNotifier.removeListener(callbackHandler);
1731
removeConnectionListener(callbackHandler);
1734
callbackHandler.destroy();
1738
throw new InvalidConfigurationException(
1739
"Can not remove a callback listener since there are no ServerInvocationHandlers " +
1740
"registered. Please add via xml configuration or via the Connector's " +
1741
"addInvocationHandler() method.");
1744
handler.removeListener(callbackHandler);
1746
if(trace) { log.trace("ServerInvoker (" + this + ") removing server callback handler " + callbackHandler + "."); }
1750
String sessionId = ServerInvokerCallbackHandler.getId(invocation);
1751
throw new RuntimeException("Can not remove callback listener from target server with " +
1752
"id of " + sessionId + " as it does not exist as a registered callback listener.");
1755
else if(InternalInvocation.GETCALLBACKS.equals(methodName))
1757
ServerInvokerCallbackHandler callbackHandler = getCallbackHandler(invocation);
1758
if(trace) { log.trace("ServerInvoker (" + this + ") getting callbacks for callback handler " + callbackHandler + "."); }
1759
result = callbackHandler.getCallbacks(invocation.getRequestPayload());
1761
else if(InternalInvocation.ACKNOWLEDGECALLBACK.equals(methodName))
1763
ServerInvokerCallbackHandler callbackHandler = getCallbackHandler(invocation);
1764
if(trace) { log.trace("ServerInvoker (" + this + ") acknowledge callback on callback handler " + callbackHandler + "."); }
1765
callbackHandler.acknowledgeCallbacks(param);
1767
else if(InternalInvocation.ADDCLIENTLISTENER.equals(methodName))
1769
String sessionId = ServerInvokerCallbackHandler.getId(invocation);
1770
Object[] params = param.getParameters();
1772
// the only elements should be the callback handler and possibly the callback handle object
1773
if(params == null || params.length < 0 || params.length > 3)
1775
log.debug("Received addClientListener InternalInvocation, but getParameters() " +
1776
"returned: " + params);
1777
throw new RuntimeException(
1778
"InvokerCallbackHandler and callback handle object (optional) must be supplied as " +
1779
"the only parameter objects within the InternalInvocation when calling " +
1780
"addClientListener.");
1783
InvokerCallbackHandler callbackHandler = (InvokerCallbackHandler)params[0];
1784
Object callbackHandleObject = params[1];
1785
CallbackContainer callbackContainer =
1786
new CallbackContainer(callbackHandler, callbackHandleObject);
1788
clientCallbackListener.put(sessionId, callbackContainer);
1790
//If there is only one CallbackContainer we store a direct reference to it to avoid
1791
//unnecessary lookups - TLF
1792
if (clientCallbackListener.size() == 1)
1794
singleCallbackContainer = callbackContainer;
1798
singleCallbackContainer = null;
1801
log.debug("ServerInvoker (" + this + ") added client callback handler " + callbackHandler +
1802
" with session id of " + sessionId + " and callback handle object of " +
1803
callbackHandleObject + ".");
1806
else if(InternalInvocation.REMOVECLIENTLISTENER.equals(methodName))
1808
String sessionId = ServerInvokerCallbackHandler.getId(invocation);
1810
log.debug("ServerInvoker (" + this + ") removing client callback handler with session " +
1811
"id of " + sessionId + ".");
1813
Object cbo = clientCallbackListener.remove(sessionId);
1816
throw new RuntimeException(
1817
"Can not remove callback listener from callback server with id of " + sessionId +
1818
" as it does not exist as a registered callback listener.");
1820
//If there is only one CallbackContainer we store a direct reference to it to avoid
1821
//unnecessary lookups - TLF
1822
if (clientCallbackListener.size() == 1)
1824
singleCallbackContainer =
1825
(CallbackContainer)clientCallbackListener.values().iterator().next();
1829
singleCallbackContainer = null;
1834
else if(InternalInvocation.ADDSTREAMCALLBACK.equals(methodName))
1836
StreamHandler streamHandler = getStreamHandler(invocation);
1837
if(handler instanceof StreamInvocationHandler)
1839
InternalInvocation inv = (InternalInvocation)invocation.getParameter();
1840
// second parameter should be the param payload
1841
result = ((StreamInvocationHandler)handler).
1842
handleStream(streamHandler, (InvocationRequest)inv.getParameters()[1]);
1846
log.debug("Client request is an InputStream, but the registered handlers do not " +
1847
"implement the StreamInvocationHandler interface, so could not process call.");
1848
throw new RuntimeException(
1849
"No handler registered of proper type (StreamInvocationHandler).");
1852
else if (InternalInvocation.ECHO.equals(methodName))
1854
Object response = null;
1855
Object[] objects = param.getParameters();
1856
if (objects != null && objects.length > 0)
1857
response = objects[0];
1861
log.trace(this + " echoing " + response);
1867
log.debug("Error processing InternalInvocation. Unable to process method " +
1868
methodName + ". Please make sure this should be an InternalInvocation.");
1869
throw new RuntimeException(
1870
"Error processing InternalInvocation. Unable to process method " + methodName);
1875
protected ServerInvocationHandler findInvocationHandler(String subsystem)
1877
ServerInvocationHandler handler = null;
1879
if (singleHandler != null)
1881
handler = singleHandler;
1885
if (subsystem != null)
1887
handler = (ServerInvocationHandler)handlers.get(subsystem.toUpperCase());
1891
// subsystem not specified, so will hope for a default one being set
1892
if (!handlers.isEmpty())
1894
if (trace) { log.trace(this + " handling invocation with no subsystem explicitely specified, using the default handler"); }
1895
handler = (ServerInvocationHandler)handlers.values().iterator().next();
1903
* Called prior to an invocation.
1904
* TODO is sending in the arg appropriate?
1906
protected void preProcess(String sessionId, ClassBytes arg, Map payload, InvokerLocator locator)
1911
* Called after an invocation.
1912
* TODO is sending in the arg appropriate?
1914
protected void postProcess(String sessionId, Object param, Map payload, InvokerLocator locator)
1918
// Private --------------------------------------------------------------------------------------
1920
private ThreadPool createThreadPoolProxy(ObjectName objName)
1923
MBeanServer server = getMBeanServer();
1926
ThreadPoolMBean poolMBean = (ThreadPoolMBean)MBeanServerInvocationHandler.
1927
newProxyInstance(server, objName, ThreadPoolMBean.class, false);
1929
pool = poolMBean.getInstance();
1933
throw new RuntimeException("Can not register MBean ThreadPool as the ServerInvoker " +
1934
"has not been registered with a MBeanServer.");
1939
//TODO: -TME getting of datatype is duplicated in both the RemoteClientInvoker and the ServerInvoker
1940
private String getDataType(InvokerLocator locator)
1946
Map params = locator.getParameters();
1949
type = (String) params.get(InvokerLocator.DATATYPE);
1955
private void terminateLease(InvocationRequest invocation)
1957
if (invocation != null)
1959
// clientSessionId == MicroRemoteClientInvoker.invokerSessionID.
1960
String clientSessionId = invocation.getSessionId();
1961
Lease clientLease = (Lease)clientLeases.get(clientSessionId);
1963
if (clientLease != null)
1965
boolean clientOnlyTerminated = false;
1966
// now have to determine if is just Client that disconnected
1967
// or if all Clients disconnected, thus the client invoker
1968
// is also disconnected as well.
1969
Map reqMap = invocation.getRequestPayload();
1972
Object holderObj = reqMap.get(ClientHolder.CLIENT_HOLDER_KEY);
1973
if (holderObj != null && holderObj instanceof ClientHolder)
1975
// just a client that disconnected, so only need to terminate lease for
1976
// that particular client (by client session id).
1977
if (trace) log.trace("terminating client lease: " + clientSessionId);
1978
ClientHolder holder = (ClientHolder) holderObj;
1979
clientLease.terminateLease(holder.getSessionId());
1980
clientOnlyTerminated = true;
1984
// now see if client invoker needs to be terminated
1985
if (!clientOnlyTerminated)
1987
if (trace) log.trace("terminating invoker lease: " + clientSessionId);
1988
clientLease.terminateLease(clientSessionId);
1989
clientLeases.remove(clientSessionId);
1994
String type = "invoker";
1995
Map reqMap = invocation.getRequestPayload();
1998
Object holderObj = reqMap.get(ClientHolder.CLIENT_HOLDER_KEY);
1999
if (holderObj != null && holderObj instanceof ClientHolder)
2004
log.debug("Asked to terminate " + type + " lease for invoker session id "
2005
+ clientSessionId + ", but lease for this id could not be found." +"" +
2006
"Probably has been removed due to connection failure.");
2011
private void updateClientLease(InvocationRequest invocation)
2013
if(invocation != null)
2015
String clientSessionId = invocation.getSessionId();
2016
if (invocation.getRequestPayload() != null)
2018
// Remove per invocation timeout.
2019
invocation.getRequestPayload().remove(TIMEOUT);
2021
if(clientSessionId != null)
2023
if(trace) { log.trace("Getting lease for invoker session id: " + clientSessionId); }
2025
Lease clientLease = (Lease)clientLeases.get(clientSessionId);
2026
if(clientLease == null)
2028
Lease newClientLease = new Lease(clientSessionId, leasePeriod,
2029
locator.getLocatorURI(),
2030
invocation.getRequestPayload(),
2034
clientLeases.put(clientSessionId, newClientLease);
2035
newClientLease.startLease();
2037
if(trace) { log.trace("No lease established for invoker session id (" + clientSessionId +
2038
"), so starting a new one:" + newClientLease); }
2042
if (useClientConnectionIdentity)
2044
String leasePingerId = (String) invocation.getRequestPayload().get(LeasePinger.LEASE_PINGER_ID);;
2045
if (leasePingerId == null || leasePingerId.equals(clientLease.getLeasePingerId()))
2047
// including request payload from invocation as may contain updated list of clients.
2048
if (trace) log.trace(clientLease + " matches: leasePingerId: " + leasePingerId);
2049
clientLease.updateLease(leasePeriod, invocation.getRequestPayload());
2050
if(trace) { log.trace("Updated lease for invoker session id (" + clientSessionId + ")"); }
2054
if (trace) log.trace(clientLease + " does not match: leasePingerId: " + leasePingerId);
2055
if (trace) log.trace("terminating invoker lease: " + clientLease);
2056
clientLease.terminateLeaseUponFailure(clientSessionId);
2057
clientLeases.remove(clientSessionId);
2059
Lease newClientLease = new Lease(clientSessionId, leasePeriod,
2060
locator.getLocatorURI(),
2061
invocation.getRequestPayload(),
2065
clientLeases.put(clientSessionId, newClientLease);
2066
newClientLease.startLease();
2068
if(trace) { log.trace("starting a new lease:" + newClientLease); }
2073
// including request payload from invocation as may contain updated list of clients.
2074
clientLease.updateLease(leasePeriod, invocation.getRequestPayload());
2076
if(trace) { log.trace("Updated lease for client session id (" + clientSessionId + ")"); }
2083
private boolean checkForClientLease(String invokerSessionId)
2085
if(leaseManagement && invokerSessionId != null)
2087
if(trace) { log.trace("Checking lease for invoker session id: " + invokerSessionId); }
2089
Lease clientLease = (Lease)clientLeases.get(invokerSessionId);
2090
if(clientLease == null)
2092
if(trace) { log.trace("No lease established for invoker session id (" + invokerSessionId + ")"); }
2097
if(trace) { log.trace("Found lease for invoker session id (" + invokerSessionId + ")"); }
2106
* Takes the real invocation from the client out of the OnewayInvocation and then executes the
2107
* invoke() with the real invocation on a seperate thread.
2109
private void handleOnewayInvocation(OnewayInvocation onewayInvocation,
2110
InvocationRequest invocation) throws Throwable
2112
Object[] objs = onewayInvocation.getParameters();
2114
// The oneway invocation should contain the real param as it's only param in parameter array
2115
Object realParam = objs[0];
2116
invocation.setParameter(realParam);
2118
final InvocationRequest newInvocation = invocation;
2120
ThreadPool executor = getOnewayThreadPool();
2121
Runnable onewayRun = new Runnable()
2127
invoke(newInvocation);
2131
// throw away exception since can't get it back to original caller
2132
log.error("Error executing server oneway invocation request: " + newInvocation, e);
2137
if(trace) { log.trace(this + " placing " + invocation + " in onewayThreadPool"); }
2138
executor.run(onewayRun);
2141
private StreamHandler getStreamHandler(InvocationRequest invocation) throws Exception
2143
InternalInvocation inv = (InternalInvocation)invocation.getParameter();
2144
String locator = (String)inv.getParameters()[0];
2145
return new StreamHandler(locator);
2148
private ServerInvokerCallbackHandler getCallbackHandler(InvocationRequest invocation)
2151
ServerInvokerCallbackHandler callbackHandler = null;
2152
String id = ServerInvokerCallbackHandler.getId(invocation);
2153
synchronized(callbackHandlers)
2155
callbackHandler = (ServerInvokerCallbackHandler)callbackHandlers.get(id);
2157
// if does not exist, create it
2158
if(callbackHandler == null)
2160
callbackHandler = new ServerInvokerCallbackHandler(invocation, getLocator(), this);
2161
callbackHandlers.put(id, callbackHandler);
2165
callbackHandler.connect();
2166
if(trace) { log.trace("ServerInvoker (" + this + ") adding server callback handler " + callbackHandler + " with id of " + id + "."); }
2167
return callbackHandler;
2170
public ServerInvokerCallbackHandler removeCallbackHandler(InvocationRequest invocation)
2172
String id = ServerInvokerCallbackHandler.getId(invocation);
2173
ServerInvokerCallbackHandler callbackHandler = null;
2175
synchronized(callbackHandlers)
2177
callbackHandler = (ServerInvokerCallbackHandler) callbackHandlers.remove(id);
2179
log.debug(this + " removed " + callbackHandler);
2180
return callbackHandler;
2183
public void shutdownCallbackHandler(ServerInvokerCallbackHandler callbackHandler, InvocationRequest invocation)
2185
removeCallbackHandler(invocation);
2186
if (registerCallbackListeners)
2188
removeConnectionListener(callbackHandler);
2190
ServerInvocationHandler handler = findInvocationHandler(invocation.getSessionId());
2191
if (handler != null)
2193
handler.removeListener(callbackHandler);
2194
if(trace) { log.trace(this + " removing server callback handler " + callbackHandler + "."); }
2198
log.debug(this + " cannot remove " + callbackHandler + ": associated ServerInvocationHandler not longer exists");
2202
// Inner classes --------------------------------------------------------------------------------
2204
public static class InvalidStateException extends Exception
2206
public InvalidStateException(String msg)
2212
private class CallbackContainer
2214
private InvokerCallbackHandler handler;
2215
private Object handleObject;
2217
public CallbackContainer(InvokerCallbackHandler handler, Object handleObject)
2219
this.handler = handler;
2220
this.handleObject = handleObject;
2223
public InvokerCallbackHandler getCallbackHandler()
2228
public Object getCallbackHandleObject()
2230
return handleObject;
2234
static private InetAddress getLocalHost() throws UnknownHostException
2236
if (SecurityUtility.skipAccessControl())
2238
return doGetLocalHost();
2243
return (InetAddress) AccessController.doPrivileged( new PrivilegedExceptionAction()
2245
public Object run() throws UnknownHostException
2247
return doGetLocalHost();
2251
catch (PrivilegedActionException e)
2253
throw (UnknownHostException) e.getCause();
2257
static private InetAddress doGetLocalHost() throws UnknownHostException
2259
if (LOCAL_HOST != null)
2266
return InetAddress.getLocalHost();
2268
catch (UnknownHostException e)
2270
return InetAddress.getByName("127.0.0.1");
2274
static private InetAddress getAddressByName(final String host) throws UnknownHostException
2276
if (SecurityUtility.skipAccessControl())
2278
return InetAddress.getByName(host);
2283
return (InetAddress)AccessController.doPrivileged( new PrivilegedExceptionAction()
2285
public Object run() throws IOException
2287
return InetAddress.getByName(host);
2291
catch (PrivilegedActionException e)
2293
throw (UnknownHostException) e.getCause();