2
* JBoss, Home of Professional Open Source
3
* Copyright 2005, JBoss Inc., and individual contributors as indicated
4
* by the @authors tag. See the copyright.txt in the distribution for a
5
* full listing of individual contributors.
7
* This is free software; you can redistribute it and/or modify it
8
* under the terms of the GNU Lesser General Public License as
9
* published by the Free Software Foundation; either version 2.1 of
10
* the License, or (at your option) any later version.
12
* This software is distributed in the hope that it will be useful,
13
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15
* Lesser General Public License for more details.
17
* You should have received a copy of the GNU Lesser General Public
18
* License along with this software; if not, write to the Free
19
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
23
package org.jboss.remoting;
25
import org.jboss.logging.Logger;
26
import org.jboss.remoting.callback.Callback;
27
import org.jboss.remoting.callback.CallbackPoller;
28
import org.jboss.remoting.callback.InvokerCallbackHandler;
29
import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
30
import org.jboss.remoting.invocation.InternalInvocation;
31
import org.jboss.remoting.invocation.OnewayInvocation;
32
import org.jboss.remoting.marshal.Marshaller;
33
import org.jboss.remoting.marshal.UnMarshaller;
34
import org.jboss.remoting.security.SSLSocketBuilder;
35
import org.jboss.remoting.stream.StreamServer;
36
import org.jboss.remoting.transport.BidirectionalClientInvoker;
37
import org.jboss.remoting.transport.ClientInvoker;
38
import org.jboss.remoting.transport.Connector;
39
import org.jboss.remoting.transport.PortUtil;
40
import org.jboss.remoting.transport.local.LocalClientInvoker;
41
import org.jboss.remoting.util.SecurityUtility;
42
import org.jboss.util.id.GUID;
43
import org.jboss.util.threadpool.BasicThreadPool;
44
import org.jboss.util.threadpool.BlockingMode;
45
import org.jboss.util.threadpool.ThreadPool;
47
import javax.net.SocketFactory;
48
import java.io.Externalizable;
49
import java.io.IOException;
50
import java.io.InputStream;
51
import java.io.ObjectInput;
52
import java.io.ObjectOutput;
53
import java.io.StreamCorruptedException;
54
import java.lang.ref.WeakReference;
55
import java.net.InetAddress;
56
import java.net.SocketTimeoutException;
57
import java.net.UnknownHostException;
58
import java.rmi.MarshalException;
59
import java.security.AccessController;
60
import java.security.PrivilegedAction;
61
import java.security.PrivilegedActionException;
62
import java.security.PrivilegedExceptionAction;
63
import java.util.ArrayList;
64
import java.util.HashMap;
65
import java.util.HashSet;
66
import java.util.Iterator;
67
import java.util.List;
70
import java.util.Timer;
71
import java.util.TimerTask;
74
* Client is a convenience class for invoking remote methods for a given subsystem. It is intended to
75
* be the main user interface for making remote invocation on the client side.
77
* @author <a href="mailto:jhaynie@vocalocity.net">Jeff Haynie</a>
78
* @author <a href="mailto:telrod@e2technologies.net">Tom Elrod</a>
79
* @author <a href="mailto:ovidiu@jboss.org">Ovidiu Feodorov</a>
81
* @version $Revision: 5458 $
83
public class Client implements Externalizable
85
// Constants ------------------------------------------------------------------------------------
88
* Key to be used to determine if invocation is to be oneway (async).
90
public static final String ONEWAY_FLAG = "oneway";
93
* Key to be used when tracking callback listeners.
95
public static final String LISTENER_ID_KEY = "listenerId";
98
* Specifies the default number of work threads in the pool for executing one way invocations on
99
* the client. Value is 10.
101
public static final int MAX_NUM_ONEWAY_THREADS_DEFAULT = 10;
104
* The key to use for the metadata Map passed when making a invoke() call and wish for the
105
* invocation payload to be sent as is and not wrapped within a remoting invocation request
106
* object. This should be used when want to make direct calls on systems outside of remoting
107
* (e.g. making a http POST request to a web service).
109
public static final String RAW = "rawPayload";
112
* Key for the configuration map passed to the Client constructor to indicate that client should
113
* make initial request to establish lease with server. The value for this should be either a
114
* String that java.lang.Boolean can evaluate or a java.lang.Boolean. Client leasing is turned
115
* off by default, so would need to use this property to turn client leasing on.
117
public static final String ENABLE_LEASE = "enableLease";
120
* Key for the configuration map passed to the Client constructor providing a ssl
121
* javax.net.ssl.HandshakeCompletedListener implementation, which will be called on when ssl
122
* handshake completed with server.
124
public static final String HANDSHAKE_COMPLETED_LISTENER = "handshakeCompletedListener";
127
* Key for the configuration when adding a callback handler and internal callback server
128
* connector is created. The value should be the transport protocol to be used. By default will
129
* use the same protocol as being used by this client (e.g. http, socket, rmi, multiplex, etc.)
131
public static final String CALLBACK_SERVER_PROTOCOL = "callbackServerProtocol";
134
* Key for the configuration when adding a callback handler and internal callback server
135
* connector is created. The value should be the host name to be used. By default will use the
136
* result of calling InetAddress.getLocalHost().getHostAddress().
138
public static final String CALLBACK_SERVER_HOST = "callbackServerHost";
141
* Key for the configuration when adding a callback handler and internal callback server
142
* connector is created. The value should be the port to be used. By default will find a random
145
public static final String CALLBACK_SERVER_PORT = "callbackServerPort";
148
* Key for the configuration map that determines the threadpool size for asynchronous invocations.
150
public static final String MAX_NUM_ONEWAY_THREADS = "maxNumThreadsOneway";
153
* Key for the configuration map that determines the queue size for waiting asynchronous
156
public static final String MAX_ONEWAY_THREAD_POOL_QUEUE_SIZE = "maxOnewayThreadPoolQueueSize";
159
* Default timeout period for network i/o in disconnect() and removeListener().
160
* -1 indicates that no special per invocation timeout will be set.
162
public static final int DEFAULT_DISCONNECT_TIMEOUT = -1;
165
* Key for setting delay before client invoker is destroyed by disconnect().
167
public static final String INVOKER_DESTRUCTION_DELAY = "invokerDestructionDelay";
169
// Configuration key "throwCallbackException".
// NOTE(review): its consumer is not visible in this part of the file - confirm semantics there.
public static final String THROW_CALLBACK_EXCEPTION = "throwCallbackException";
171
// Static table of ConnectionValidators shared between Client instances. Entries are stored as
// WeakReferences keyed by ConnectionValidatorKey (see addConnectionListener()).
private static Map connectionValidators = new HashMap();
172
// Monitor held while the validator table and a client's validator reference are read or changed.
private static Object connectionValidatorLock = new Object();
174
// Package-visible string keys.
// NOTE(review): presumably metadata keys used by ConnectionValidator - verify against callers.
static final String CLIENT = "client";
175
static final String CONNECTION_LISTENER = "connectionListener";
177
/** The key to use to specify that parameters for objects created by Client should be taken,
178
* in addition to the metadata map, from the InvokerLocator and from the Client's configuration map.
180
public static final String USE_ALL_PARAMS = "useAllParams";
182
// Class logger.
private static final Logger log = Logger.getLogger(Client.class);
183
// Trace flag sampled once, when the class is initialized.
private static boolean trace = log.isTraceEnabled();
185
private static final long serialVersionUID = 5679279425009837934L;
187
// Timer shared by all clients for delayed invoker destruction; created on demand in disconnect().
private static Timer invokerDestructionTimer;
188
// Monitor held while the shared timer is created or used for scheduling.
private static Object invokerDestructionTimerLock = new Object();
190
// Static ---------------------------------------------------------------------------------------
192
// Attributes -----------------------------------------------------------------------------------
195
* Indicates the max number of threads used within oneway thread pool.
197
// Maximum pool size applied to the default oneway thread pool when it is created.
private int maxNumberThreads = MAX_NUM_ONEWAY_THREADS_DEFAULT;
198
// Queue size for the default oneway thread pool; only applied when greater than zero.
private int maxOnewayThreadPoolQueueSize = -1;
199
// Transport implementation used by this client.
private ClientInvoker invoker;
200
private ClassLoader classloader;
201
// Target subsystem on the server; the constructors store it upper-cased.
private String subsystem;
202
// Session id sent with invocations; a GUID string unless replaced through setSessionId().
private String sessionId;
203
// Monitor held while the oneway thread pool is lazily created.
private Object onewayThreadPoolLock = new Object();
204
private ThreadPool onewayThreadPool;
205
private InvokerLocator locator;
207
// Validator currently used by this client, plus the key under which it sits in the static table.
private ConnectionValidator connectionValidator = null;
208
private ConnectionValidatorKey connectionValidatorKey;
209
// Configuration map; replaced by a copy of the map passed to the constructor when one is given.
private Map configuration = new HashMap();
211
// Maps a callback handler to the Set of callback Connectors it is registered with.
private Map callbackConnectors = new HashMap();
212
// NOTE(review): users of the two maps below are outside the visible part of the file.
private Map callbackPollers = new HashMap();
214
private Map listeners = new HashMap();
216
// Socket factory set before connect(); connect() moves it into the configuration map.
private SocketFactory socketFactory;
218
// Timeout passed to the invoker when the lease is terminated in disconnect().
private int disconnectTimeout = DEFAULT_DISCONNECT_TIMEOUT;
220
private boolean connected = false;
222
// When greater than zero, disconnect() schedules invoker destruction after this delay.
private int invokerDestructionDelay = 0;
224
// Connection listeners registered through this client.
private Set connectionListeners = new HashSet();
226
private boolean useClientConnectionIdentity;
228
// Constructors ---------------------------------------------------------------------------------
231
* PLEASE DO NOT USE THIS CONSTRUCTOR OR YOUR COMPUTER WILL BURST INTO FLAMES!!!
232
* It is only here so can externalize object and will provide a dead object if invoker is not
233
* explicitly set. Please use other constructors provided.
240
/**
 * Creates a client for the server identified by the locator, without specifying a remote
 * subsystem or any configuration metadata. Equivalent to {@code Client(locator, null, null)}.
 *
 * @param locator identifies the target server
 * @throws Exception as declared by the constructor this one delegates to
 */
public Client(InvokerLocator locator) throws Exception
{
   this(locator, (String) null, (Map) null);
}
250
/**
 * Creates a client for the server identified by the locator, with configuration metadata but
 * no subsystem. The metadata is used when creating the client invoker (where specific data is
 * required) and is passed along to connection listeners on the server side, which can use it
 * when notified that the client has failed.
 *
 * @param locator identifies the target server
 * @param configuration configuration metadata; may be null
 * @throws Exception as declared by the constructor this one delegates to
 */
public Client(InvokerLocator locator, Map configuration) throws Exception
{
   this(locator, (String) null, configuration);
}
262
/**
 * Creates a client for the server identified by the locator, routing invocations to the given
 * subsystem on the server. No configuration metadata is supplied.
 *
 * @param locator identifies the target server
 * @param subsystem subsystem on the server that invocations are routed to; may be null
 * @throws Exception as declared by the constructor this one delegates to
 */
public Client(InvokerLocator locator, String subsystem) throws Exception
{
   this(locator, subsystem, (Map) null);
}
271
/**
 * Creates a client for the server identified by the locator, routing invocations to the given
 * subsystem and using the given configuration metadata. The metadata is used when creating
 * the client invoker (where specific data is required) and is passed along to connection
 * listeners on the server side, which can use it when notified that the client has failed.
 * No classloader is supplied to the underlying constructor.
 *
 * @param locator identifies the target server
 * @param subsystem subsystem on the server that invocations are routed to; may be null
 * @param configuration configuration metadata; may be null
 * @throws Exception as declared by the constructor this one delegates to
 */
public Client(InvokerLocator locator, String subsystem, Map configuration) throws Exception
{
   this((ClassLoader) null, locator, subsystem, configuration);
}
284
* Constructs a remoting client with intended target server specified via the locator, intended
285
* subsystem on the server for invocations to be routed to, and configuration metadata. The
286
* metadata supplied will be used when creating client invoker (in the case specific data is
287
* required) and also for passing along additional data to connection listeners on the server
288
* side in the case that the client fails, will be able to use this extra information when
289
* notified (which will happen when connect() method is called).
291
* @param cl - the classloader that should be used by remoting.
292
* @deprecated This constructor should not be used any more as will no longer take into account
293
* the classloader specified as a parameter.
295
// Primary constructor: records classloader, locator, subsystem and a copy of the configuration,
// then assigns a fresh session id.
public Client(ClassLoader cl, InvokerLocator locator, String subsystem, Map configuration)
300
// First of two classloader assignments: the thread context classloader, read inside a
// privileged action.
// NOTE(review): the condition selecting between this and the assignment from 'cl' below is not
// visible in this extract - confirm against the full source.
this.classloader = (ClassLoader) AccessController.doPrivileged( new PrivilegedAction()
304
return Thread.currentThread().getContextClassLoader();
310
this.classloader = cl;
312
this.locator = locator;
313
// The subsystem name is stored upper-cased; null is kept as null.
this.subsystem = subsystem == null ? null : subsystem.toUpperCase();
314
if (configuration != null)
316
// Copy, so later changes to the caller's map do not reach this client.
this.configuration = new HashMap(configuration);
318
// Every client starts with its own GUID-based session id.
this.sessionId = new GUID().toString();
323
* Constructs a remoting client with intended target server specified via the locator and
324
* intended subsystem on server for invocations to be routed to.
326
* @deprecated This constructor should not be used any more as will no longer take into account
327
* the classloader specified as a parameter.
329
public Client(ClassLoader cl, ClientInvoker invoker, String subsystem) throws Exception
331
this.classloader = cl;
332
this.subsystem = subsystem == null ? null : subsystem.toUpperCase();
333
this.invoker = invoker;
334
this.sessionId = new GUID().toString();
337
// Externalizable implementation ----------------------------------------------------------------
339
// Restores a client from a stream: a version int, then locator, subsystem, configuration and
// the connected flag, in the same order writeExternal() emits them.
// NOTE(review): the switch/try/catch lines of this method are not visible in this extract.
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
341
int version = in.readInt();
345
// Both supported versions share the same layout.
case Version.VERSION_2:
346
case Version.VERSION_2_2:
348
this.locator = (InvokerLocator) in.readObject();
349
this.subsystem = (String) in.readObject();
350
this.configuration = (Map) in.readObject();
351
boolean wasConnected = in.readBoolean();
353
// The classloader is not part of the stream; the thread context classloader is read inside a
// privileged action.
this.classloader = (ClassLoader) AccessController.doPrivileged( new PrivilegedAction()
357
return Thread.currentThread().getContextClassLoader();
363
// A client invoker is obtained from the registry for the restored locator and configuration.
this.invoker = InvokerRegistry.createClientInvoker(locator, configuration);
372
// NOTE(review): only the message is carried over; the original exception is not attached as
// the cause of the IOException.
throw new IOException(e.getMessage());
378
throw new StreamCorruptedException("Unkown version seen: " + version);
382
public void writeExternal(ObjectOutput out) throws IOException
384
out.writeInt(Version.getDefaultVersion());
385
out.writeObject(invoker != null ? invoker.getLocator() : locator);
386
out.writeObject(subsystem);
387
out.writeObject(configuration);
388
out.writeBoolean(isConnected());
392
// Public ---------------------------------------------------------------------------------------
395
* Adds a connection listener that will be notified if/when the connection to the server fails
396
* while the client is idle (no calls being made). The default behavior is to ping for connection
399
public void addConnectionListener(ConnectionListener listener)
401
HashMap metadata = new HashMap();
402
if (configuration.get(ConnectionValidator.VALIDATOR_PING_PERIOD) == null &&
403
locator.getParameters().get(ConnectionValidator.VALIDATOR_PING_PERIOD) == null)
405
String pingPeriod = Long.toString(ConnectionValidator.DEFAULT_PING_PERIOD);
406
metadata.put(ConnectionValidator.VALIDATOR_PING_PERIOD, pingPeriod);
408
addConnectionListener(listener, metadata);
412
* Adds a connection listener that will be notified if/when the connection to the server fails
413
* while the client is idle (no calls being made). The current behavior is to ping the server
414
* periodically. The time period is defined by the pingPeriod (which should be in milliseconds).
416
public void addConnectionListener(ConnectionListener listener, int pingPeriod)
418
HashMap metadata = new HashMap();
419
metadata.put(ConnectionValidator.VALIDATOR_PING_PERIOD, Integer.toString(pingPeriod));
420
addConnectionListener(listener, metadata);
424
* Adds a connection listener that will be notified if/when the connection to the server fails
425
* while the client is idle (no calls being made). The current behavior is to ping the server
426
* periodically. Various parameters may be specified in metadata.
428
* @see org.jboss.remoting.ConnectionValidator
430
// Registers the listener with a ConnectionValidator: the one this client already holds, one
// found in the static table under a matching key, or a newly created one.
// NOTE(review): several guard/else lines of this method are not visible in this extract.
public void addConnectionListener(ConnectionListener listener, Map metadata)
434
throw new RuntimeException("Can not add connection listener to remoting client " +
435
"while client is not connected.");
439
// if local, then no point in having connection listener
440
if (invoker instanceof LocalClientInvoker)
446
synchronized (connectionValidatorLock)
448
if (trace) log.trace(this + " in addConnectionListener()");
449
if (connectionValidator == null)
451
// The lookup key combines the invoker with the configuration overlaid by the metadata.
Map map = new HashMap(configuration);
452
map.putAll(metadata);
453
connectionValidatorKey = new ConnectionValidatorKey(invoker, map);
454
WeakReference ref = (WeakReference) connectionValidators.get(connectionValidatorKey);
457
// Create a validator and publish it (weakly referenced) in the static table.
connectionValidator = new ConnectionValidator(this, metadata);
458
connectionValidators.put(connectionValidatorKey, new WeakReference(connectionValidator));
459
connectionValidator.addConnectionListener(this, listener);
460
if (trace) log.trace(this + ": created " + connectionValidator);
464
// NOTE(review): WeakReference.get() returns null once the referent has been collected; no
// null check is visible between this line and the call on the next one.
connectionValidator = (ConnectionValidator) ref.get();
465
if (connectionValidator.addConnectionListener(this, listener))
467
if (trace) log.trace(this + ": reusing from static table: " + connectionValidator);
471
// The validator from the table did not accept the listener: replace it with a new one.
if (trace) log.trace(this + ": unable to reuse existing ConnectionValidator in static map: " + connectionValidator);
472
connectionValidator = new ConnectionValidator(this, metadata);
473
connectionValidators.put(connectionValidatorKey, new WeakReference(connectionValidator));
474
connectionValidator.addConnectionListener(this, listener);
475
if (trace) log.trace(this + ": current ConnectionValidator is stopped: created " + connectionValidator);
481
// This client already holds a validator: try to reuse it first.
if (connectionValidator.addConnectionListener(this, listener))
483
if (trace) log.trace(this + ": reusing from local reference: " + connectionValidator);
487
// The held validator did not accept the listener: replace it with a new one.
if (trace) log.trace(this + ": unable to reuse ConnectionValidator from local reference: " + connectionValidator);
488
connectionValidator = new ConnectionValidator(this, metadata);
489
connectionValidators.put(connectionValidatorKey, new WeakReference(connectionValidator));
490
connectionValidator.addConnectionListener(this, listener);
491
if (trace) log.trace(this + ": current ConnectionValidator is stopped: created " + connectionValidator);
495
// Remember the listener locally so disconnect() can detach it later.
connectionListeners.add(listener);
500
* Removes specified connection listener. Will return true if it has already been registered,
503
// Detaches the listener from this client's ConnectionValidator and drops the validator
// reference when the validator has stopped or no listeners remain on this client.
// NOTE(review): the return statements of this method are not visible in this extract.
public boolean removeConnectionListener(ConnectionListener listener)
505
if (trace) log.trace(this + ".removeConnectionListener(" + listener + ")");
506
boolean isRemoved = false;
507
synchronized (connectionValidatorLock)
509
if (connectionValidator == null)
513
isRemoved = connectionValidator.removeConnectionListener(this, listener);
514
// A stopped validator is also removed from the static table.
if (connectionValidator.isStopped())
516
if (connectionValidators.remove(connectionValidatorKey) != null)
518
log.debug(this + ".removeConnectionListener() removed from static map: " + connectionValidator);
520
connectionValidator = null;
521
connectionValidatorKey = null;
523
connectionListeners.remove(listener);
524
// With no listeners left on this client, the validator reference is released.
if (connectionListeners.isEmpty())
526
connectionValidator = null;
527
connectionValidatorKey = null;
529
if (connectionValidator == null)
531
if (trace) log.trace(this + " set connectionValidator to null");
538
* This will set the session id used when making invocations on server invokers. There is a
539
* default unique id automatically generated for each Client instance, so unless you have a good
540
* reason to set this, do not set this.
542
public void setSessionId(String sessionId)
544
this.sessionId = sessionId;
548
* Gets the configuration map passed when constructing this object.
550
public Map getConfiguration()
552
return configuration;
556
* Gets the session id used when making invocations on server invokers. This is the id that will
557
* be used for tracking client connections on the server side, to include client failures that
558
* are sent to connection listeners on the server side.
560
public String getSessionId()
562
return this.sessionId;
566
* Indicates if the underlying transport has been connected to the target server.
568
public boolean isConnected()
574
* Will cause the underlying transport to make connection to the target server. This is
575
* important for any stateful transports, like socket or multiplex. This is also when a client
576
* lease with the server is started.
578
public void connect() throws Exception
584
* Will cause the underlying transport to make connection to the target server. This is
585
* important for any stateful transports, like socket or multiplex. This is also when a client
586
* lease with the server is started. If listener is not null, it will be registered to
587
* receive a callback if the connection fails.
589
public void connect(ConnectionListener listener) throws Exception
591
connect(listener, null);
595
* Will cause the underlying transport to make connection to the target server. This is
596
* important for any stateful transports, like socket or multiplex. This is also when a client
597
* lease with the server is started. If listener is not null, it will be registered to
598
* receive a callback if the connection fails.
601
* If this version of connect() is used, and leasing is enabled, the concept of "connection
602
* identity" is enforced. That is, the ConnectionValidator used by this Client will be
603
* tied to the LeasePinger currently used by the MicroRemoteClientInvoker created or reused
604
* in this method, and that LeasePinger will be tied to this Client and its ConnectionValidator.
605
* If the ConnectionValidator used by any of the Clients associated with the MicroRemoteClientInvoker
606
* used by this Client detects a broken connection, it will shut down that LeasePinger.
607
* Moreover, each ConnectionValidator associated with that LeasePinger will notify its
608
* ConnectionListeners of the broken connection. At that point, the LeasePinger will be
609
* destroyed, and all of the associated Clients will be disconnected.
611
// Obtains a client invoker for the locator and configuration, then hands over to the
// connect(invoker, listener, metadata) overload.
// NOTE(review): the guard conditions of this method are not visible in this extract.
public void connect(ConnectionListener listener, Map metadata) throws Exception
613
log.debug(this + ".connect(" + listener + ")");
614
if (trace) log.trace(this + ": metadata = " + metadata);
620
throw new IllegalStateException("Cannot connect a client with a null locator");
625
// A socket factory set before connecting is moved into the configuration map, so it is
// available when the invoker is created; the local reference is then cleared.
if (socketFactory != null)
627
configuration.put(Remoting.CUSTOM_SOCKET_FACTORY, socketFactory);
628
this.socketFactory = null;
630
invoker = InvokerRegistry.createClientInvoker(locator, configuration);
633
connect(invoker, listener, metadata);
636
log.debug(this + " is connected");
640
* Disconnects the underlying transport from the target server. Also notifies the target server
641
* to terminate client lease. Is important that this method is called when no longer using the
642
* remoting client. Otherwise resource will not be cleaned up and if the target server requires
643
* a lease, it will be maintained in the background.
645
// Tears the client down: terminates the lease, destroys the client invoker (immediately or
// after invokerDestructionDelay), and detaches this client's connection listeners from its
// ConnectionValidator.
// NOTE(review): guard, try, else and loop-header lines are not visible in this extract.
public void disconnect()
647
if (trace) log.trace(this + " entering disconnect()");
653
// this is a noop if no lease is active
654
invoker.terminateLease(sessionId, disconnectTimeout);
656
// Need to remove myself from registry so will not keep reference to me since I am of no
657
// use now. Will have to create a new one.
659
// Delayed destruction: schedule a task on the shared timer instead of destroying right away.
if (invokerDestructionDelay > 0)
661
synchronized (invokerDestructionTimerLock)
663
InvokerDestructionTimerTask task = new InvokerDestructionTimerTask(invoker);
664
// The shared timer is created on first use, as a daemon timer.
if (invokerDestructionTimer == null)
666
invokerDestructionTimer = new Timer(true);
671
invokerDestructionTimer.schedule(task, invokerDestructionDelay);
673
// Timer.schedule() throws IllegalStateException on a cancelled or terminated timer; in that
// case a fresh timer replaces the shared one and the task is scheduled again.
catch (IllegalStateException e)
675
log.debug("Unable to schedule InvokerDestructionTimerTask on existing Timer", e);
676
invokerDestructionTimer = new Timer(true);
677
invokerDestructionTimer.schedule(task, invokerDestructionDelay);
680
if (trace) log.trace(this + " scheduled destruction of " + invoker);
685
// Destruction through the registry.
// NOTE(review): presumably the no-delay branch - confirm against the full source.
InvokerRegistry.destroyClientInvoker(invoker.getLocator(), configuration);
691
synchronized (connectionValidatorLock)
693
if (connectionValidator != null)
695
// Detach the listeners that were registered through this client.
Iterator it = connectionListeners.iterator();
698
ConnectionListener listener = (ConnectionListener) it.next();
699
connectionValidator.removeConnectionListener(this, listener);
701
// A stopped validator is also removed from the static table.
if (connectionValidator.isStopped())
703
if (connectionValidators.remove(connectionValidatorKey) != null)
705
if (trace) log.trace(this + ".disconnect() removed from static map: " + connectionValidator);
709
connectionValidator = null;
710
connectionValidatorKey = null;
713
log.debug(this + " is disconnected");
717
* Get the client invoker (transport implementation).
719
public ClientInvoker getInvoker()
725
* Set the client invoker (transport implementation).
727
public void setInvoker(ClientInvoker invoker)
729
this.invoker = invoker;
733
* Gets the subsystem being used when routing invocation request on the server side.
735
public String getSubsystem()
741
* Sets the subsystem being used when routing invocation requests on the server side. Specifing
742
* a subsystem is only needed when server has multiple handlers registered (which will each have
743
* their own associated subsystem).
745
public void setSubsystem(String subsystem)
747
this.subsystem = subsystem;
751
* Invokes the server invoker handler with the payload parameter passed. Same as calling
752
* invoke(param, null);
754
public Object invoke(Object param) throws Throwable
756
return invoke(param, null);
760
* Invoke the method remotely.
762
* @param param - payload for the server invoker handler.
763
* @param metadata - any extra metadata that may be needed by the transport (i.e. GET or POST if
764
* using http invoker) or if need to pass along extra data to the server invoker handler.
766
public Object invoke(Object param, Map metadata) throws Throwable
768
return invoke(param, metadata, null);
772
* Will invoke a oneway call to server without a return object. This should be used when not
773
* expecting a return value from the server and wish to achieve higher performance, since the
774
* client will not wait for a return.
776
* This is done one of two ways. The first is to pass true as the clientSide param. This will
777
* cause the execution of the remote call to be executed in a new thread on the client side and
778
* will return the calling thread before making call to server side.
780
* The second, is to pass false as the clientSide param. This will allow the current calling
781
* thread to make the call to the remote server, at which point, the server side processing of
782
* the thread will be executed on the remote server in a new executing thread.
784
* NOTE: The treatment of server side oneway invocations may vary with the transport. The
785
* client side transport is not required to wait for a reply from the server. In particular,
786
* the socket and bisocket transports return immediately after writing the invocation.
788
public void invokeOneway(final Object param, final Map sendPayload, boolean clientSide)
791
final Map internalSendPayload = sendPayload == null ? new HashMap() : sendPayload;
792
internalSendPayload.put(ONEWAY_FLAG, "true");
796
ThreadPool threadPool = getOnewayThreadPool();
797
Runnable onewayRun = new Runnable()
803
invoke(param, internalSendPayload);
807
// throw away exception since can't get it back to original caller
808
log.error("Error executing client oneway invocation request: " + param, e);
812
threadPool.run(onewayRun);
816
OnewayInvocation invocation = new OnewayInvocation(param);
817
invoke(invocation, internalSendPayload);
822
* Returns the callback Connectors with which callbackHandler is registered.
824
public Set getCallbackConnectors(InvokerCallbackHandler callbackHandler)
826
return (Set) callbackConnectors.get(callbackHandler);
830
* Gets the timeout used for network i/o in disconnect() and removeListener().
832
public int getDisconnectTimeout()
834
return disconnectTimeout;
838
* Sets the timeout used for network i/o in disconnect() and removeListener().
840
public void setDisconnectTimeout(int disconnectTimeout)
842
this.disconnectTimeout = disconnectTimeout;
846
* Sets the maximum queue size to use within client pool for one way invocations on the client
847
* side (meaning oneway invocation is handled by thread in this pool and user's call returns
848
* immediately). Default value is MAX_NUM_ONEWAY_THREADS.
850
public void setMaxOnewayThreadPoolQueueSize(int maxOnewayThreadPoolQueueSize)
852
this.maxOnewayThreadPoolQueueSize = maxOnewayThreadPoolQueueSize;
856
* Gets the maximum queue size to use within client pool for one way invocations on the client
857
* side (meaning oneway invocation is handled by thread in this pool and user's call returns
858
* immediately). Default value is MAX_NUM_ONEWAY_THREADS.
860
public int getMaxOnewayThreadPoolQueueSize()
862
return this.maxOnewayThreadPoolQueueSize;
866
* Sets the maximum number of threads to use within client pool for one way invocations on the
867
* client side (meaning oneway invocation is handled by thread in this pool and user's call
868
* returns immediately). Default value is MAX_NUM_ONEWAY_THREADS.
870
public void setMaxNumberOfThreads(int numOfThreads)
872
this.maxNumberThreads = numOfThreads;
876
* Gets the maximum number of threads to use within client pool for one way invocations on the
877
* client side (meaning oneway invocation is handled by thread in this pool and user's call
878
* returns immediately). Default value is MAX_NUM_ONEWAY_THREADS.
880
public int getMaxNumberOfThreads()
882
return this.maxNumberThreads;
886
* Gets the thread pool being used for making one way invocations on the client side. If one has
887
* not be specifically set via configuration or call to set it, will always return instance of
888
* org.jboss.util.threadpool.BasicThreadPool.
890
public ThreadPool getOnewayThreadPool()
892
synchronized (onewayThreadPoolLock)
894
if (onewayThreadPool == null)
896
BasicThreadPool pool = new BasicThreadPool("JBossRemoting Client Oneway");
897
log.debug("created new thread pool: " + pool);
898
Object param = configuration.get(MAX_NUM_ONEWAY_THREADS);
899
if (param instanceof String)
903
maxNumberThreads = Integer.parseInt((String) param);
905
catch (NumberFormatException e)
907
log.error("maxNumberThreads parameter has invalid format: " + param);
910
else if (param != null)
912
log.error("maxNumberThreads parameter must be a string in integer format: " + param);
915
param = configuration.get(MAX_ONEWAY_THREAD_POOL_QUEUE_SIZE);
917
if (param instanceof String)
921
maxOnewayThreadPoolQueueSize = Integer.parseInt((String) param);
923
catch (NumberFormatException e)
925
log.error("maxOnewayThreadPoolQueueSize parameter has invalid format: " + param);
928
else if (param != null)
930
log.error("maxOnewayThreadPoolQueueSize parameter must be a string in integer format: " + param);
933
pool.setMaximumPoolSize(maxNumberThreads);
935
if (maxOnewayThreadPoolQueueSize > 0)
937
pool.setMaximumQueueSize(maxOnewayThreadPoolQueueSize);
939
pool.setBlockingMode(BlockingMode.RUN);
940
onewayThreadPool = pool;
943
return onewayThreadPool;
947
* Sets the thread pool to be used for making one way invocations on the client side.
949
public void setOnewayThreadPool(ThreadPool pool)
951
this.onewayThreadPool = pool;
955
* The socket factory can only be set on the Client before the connect() method has been called.
956
* Otherwise, a runtime exception will be thrown.
958
// Stores the socket factory either on the invoker or on this client.
// NOTE(review): the conditions guarding the throw and selecting between the two assignments
// are not visible in this extract.
public void setSocketFactory(SocketFactory socketFactory)
962
throw new RuntimeException("Cannot set socket factory on Client after " +
963
"the connect() method has been called.");
968
// Hand the factory straight to the invoker.
invoker.setSocketFactory(socketFactory);
972
// Keep the factory on the client; connect() later moves it into the configuration map.
this.socketFactory = socketFactory;
976
// Returns the socket factory, taken either from the invoker or from this client's own field.
// NOTE(review): the condition selecting between the two returns is not visible in this extract.
public SocketFactory getSocketFactory()
980
return invoker.getSocketFactory();
984
return socketFactory;
989
* Same as calling invokeOneway(Object param, Map sendPayload, boolean clientSide) with
990
* clientSide param being false and a null sendPayload.
992
public void invokeOneway(Object param) throws Throwable
994
invokeOneway(param, null);
998
* Same as calling invokeOneway(Object param, Map sendPayload, boolean clientSide) with
999
* clientSide param being false.
1001
public void invokeOneway(Object param, Map sendPayload) throws Throwable
1003
invokeOneway(param, sendPayload, false);
1007
* Adds the specified handler as a callback listener for push (async) callbacks. If the transport
1008
* is uni-directional (e.g. http), remoting will automatically poll for callbacks from the server
1009
* and deliver them to the callback handler. If the transport is bi-directional (e.g. multiplex),
1010
* remoting will automatically create a callback server internally and receive and deliver to
1011
* callback handler the callbacks as they are generated on the server. The metadata map passed
1012
* will control configuration for how the callbacks are processed, such as the polling frequency.
1014
public void addListener(InvokerCallbackHandler callbackhandler, Map metadata) throws Throwable
1016
addListener(callbackhandler, metadata, null);
1020
* Adds the specified handler as a callback listener for push (async) callbacks. If the transport
1021
* is uni-directional (e.g. http), remoting will automatically poll for callbacks from the server
1022
* and deliver them to the callback handler. If the transport is bi-directional (e.g. multiplex),
1023
* remoting will automatically create a callback server internally and receive and deliver to
1024
* callback handler the callbacks as they are generated on the server. The metadata map passed
1025
* will control configuration for how the callbacks are processed, such as the polling frequency.
1027
* @param callbackHandlerObject - this object will be included in the Callback object instance
1028
* passed to the InvokerCallbackHandler specified.
1030
public void addListener(InvokerCallbackHandler callbackhandler, Map metadata,
1031
Object callbackHandlerObject) throws Throwable
1033
addListener(callbackhandler, metadata, callbackHandlerObject, false);
1037
* Adds the specific handler as a callback listener for async callbacks. If the transport
1038
* supports bi-directional calls (meaning server can call back to client over same connection
1039
* that was established by the client) or if the serverToClient flag is set to true, a callback
1040
* server will be created internally and the target server will actually send callbacks to the
1041
* client's internal server. Otherwise, the client will simulate push callbacks by internally
1042
* polling for callbacks on the server and then deliver them to the callback handler.
1044
* @param serverToClient - if true, will allow server to connect to the client directly (which
1045
* must be allowed by firewall in front of client unless transport is bi-directional, such
1046
* as the multiplex transport). If false (and not bi-directional transport), server will
1047
* not create any new connection to the client.
1049
public void addListener(InvokerCallbackHandler callbackhandler, Map metadata,
1050
Object callbackHandlerObject, boolean serverToClient) throws Throwable
1052
InvokerLocator callbackLocator = null;
1056
if (callbackhandler != null)
1058
boolean isBidirectional = invoker instanceof BidirectionalClientInvoker;
1060
if (isBidirectional || serverToClient)
1062
// setup callback server
1063
String transport = null;
1067
// look for config values
1068
if (metadata != null)
1070
transport = (String) metadata.get(CALLBACK_SERVER_PROTOCOL);
1071
host = (String) metadata.get(CALLBACK_SERVER_HOST);
1072
String sPort = (String) metadata.get(CALLBACK_SERVER_PORT);
1077
port = Integer.parseInt(sPort);
1079
catch (NumberFormatException e)
1081
log.warn("Could not set the internal callback server port as " +
1082
"configuration value (" + sPort + ") is not a number.");
1088
metadata = new HashMap();
1090
if (transport == null)
1092
transport = invoker.getLocator().getProtocol();
1093
metadata.put(CALLBACK_SERVER_PROTOCOL, transport);
1097
host = getLocalHost().getHostAddress();
1098
metadata.put(CALLBACK_SERVER_HOST, host);
1102
port = PortUtil.findFreePort(host);
1103
metadata.put(CALLBACK_SERVER_PORT, String.valueOf(port));
1109
((BidirectionalClientInvoker)invoker).getCallbackLocator(metadata);
1113
callbackLocator = new InvokerLocator(transport, host, port, null, metadata);
1115
log.debug("starting callback Connector: " + callbackLocator);
1116
Map callbackConfig = new HashMap(configuration);
1118
if (locator.getParameters() != null)
1120
callbackConfig.putAll(locator.getParameters());
1123
configureCallbackServerSocketFactory(callbackConfig);
1124
Connector callbackServerConnector = new Connector(callbackLocator, callbackConfig);
1126
synchronized (callbackConnectors)
1128
Set connectors = (Set) callbackConnectors.get(callbackhandler);
1129
if (connectors == null)
1131
connectors = new HashSet();
1133
connectors.add(callbackServerConnector);
1134
callbackConnectors.put(callbackhandler, connectors);
1137
callbackServerConnector.start();
1138
// have to use the locator from the server as can be modified internally
1139
callbackLocator = callbackServerConnector.getServerInvoker().getLocator();
1140
addCallbackListener(callbackhandler, metadata, callbackLocator, callbackHandlerObject);
1144
if (callbackPollers.get(callbackhandler) != null)
1146
log.debug(callbackhandler + " already registered");
1150
//need to setup poller to get callbacks from the server
1151
CallbackPoller poller =
1152
new CallbackPoller(this, callbackhandler, metadata, callbackHandlerObject);
1153
callbackPollers.put(callbackhandler, poller);
1154
addCallbackListener(callbackhandler, metadata, callbackLocator, callbackHandlerObject);
1160
throw new NullPointerException("InvokerCallbackHandler to be added as " +
1161
"a listener can not be null.");
1166
throw new Exception("Can not add callback listener because " +
1167
"remoting client is not connected to server.");
1172
* Adds the specified handler as a callback listener for pull (sync) callbacks. Using this method
1173
* will require the programatic getting of callbacks from the server (they will not be pushed to
1174
* the callback handler automatically).
1176
public void addListener(InvokerCallbackHandler callbackHandler) throws Throwable
1178
addListener(callbackHandler, (InvokerLocator) null);
1182
* Adds the specified handler as a callback listener for push (async) callbacks. The invoker
1183
* server will then callback on this handler (via the server invoker specified by the
1184
* clientLocator) when it gets a callback from the server handler.
1186
* Note: passing a null clientLocator will cause the client invoker's client locator to be set to
1187
* null, which basically converts the mode to be pull (sync) where will require call to get
1188
* callbacks (as will not automatically be pushed to callback handler).
1190
public void addListener(InvokerCallbackHandler callbackHandler,
1191
InvokerLocator clientLocator) throws Throwable
1193
addListener(callbackHandler, clientLocator, null);
1197
* Adds the specified handler as a callback listener for push (async) callbacks. The invoker
1198
* server will then callback on this handler (via the server invoker specified by the
1199
* clientLocator) when it gets a callback from the server handler.
1201
* Note: passing a null clientLocator will cause the client invoker's client locator to be set to
1202
* null, which basically converts the mode to be pull (sync) where will require call to get
1203
* callbacks (as will not automatically be pushed to callback handler).
1205
* @param callbackHandlerObject will be included in the callback object passed upon callback.
1207
public void addListener(InvokerCallbackHandler callbackHandler,
1208
InvokerLocator clientLocator, Object callbackHandlerObject)
1211
if (callbackHandler != null)
1215
addCallbackListener(callbackHandler, null, clientLocator, callbackHandlerObject);
1219
throw new Exception("Can not add callback listener as " +
1220
"remoting client is not connected to server.");
1225
throw new NullPointerException("InvokerCallbackHandler to be added as " +
1226
"a listener can not be null.");
1231
* Removes callback handler as a callback listener from the server (and client in the case that
1232
* it was setup to receive async callbacks). See addListener().
1234
public void removeListener(InvokerCallbackHandler callbackHandler) throws Throwable
1238
if (callbackHandler != null)
1240
// first need to see if is push or pull callback (i.e. does have locator associated
1242
String listenerId = (String)listeners.get(callbackHandler);
1243
if(listenerId != null)
1245
// have a pull callback handler
1246
// If disconnectTimeout == 0, skip network i/o.
1247
if (disconnectTimeout != 0)
1249
Map metadata = new HashMap();
1250
metadata.put(LISTENER_ID_KEY, listenerId);
1252
if (disconnectTimeout > 0)
1253
metadata.put(ServerInvoker.TIMEOUT, Integer.toString(disconnectTimeout));
1257
invoke(new InternalInvocation(InternalInvocation.REMOVELISTENER, null), metadata);
1261
log.debug("unable to remove remote callback handler", e);
1265
// clean up callback poller if one exists
1266
CallbackPoller callbackPoller = (CallbackPoller) callbackPollers.remove(callbackHandler);
1267
if (callbackPoller != null)
1269
callbackPoller.stop();
1272
listeners.remove(callbackHandler);
1276
// have a push callback handler
1277
List holderList = invoker.getClientLocators(sessionId, callbackHandler);
1278
if(holderList != null && holderList.size() > 0)
1280
for(int x = 0; x < holderList.size(); x++)
1282
AbstractInvoker.CallbackLocatorHolder holder =
1283
(AbstractInvoker.CallbackLocatorHolder)holderList.get(x);
1284
listenerId = holder.getListenerId();
1285
InvokerLocator locator = holder.getLocator();
1286
Map metadata = new HashMap();
1287
metadata.put(LISTENER_ID_KEY, listenerId);
1289
// If disconnectTimeout == 0, skip network i/o.
1290
if (disconnectTimeout != 0)
1292
if (disconnectTimeout > 0)
1293
metadata.put(ServerInvoker.TIMEOUT, Integer.toString(disconnectTimeout));
1297
// now call target server to remove listener
1298
InternalInvocation ii =
1299
new InternalInvocation(InternalInvocation.REMOVELISTENER, null);
1301
invoke(ii, metadata);
1305
log.debug("unable to remove remote callback handler", e);
1309
// call to callback server to remove listener
1310
Client client = new Client(locator, subsystem);
1311
client.setSessionId(getSessionId());
1313
InternalInvocation ii =
1314
new InternalInvocation(InternalInvocation.REMOVECLIENTLISTENER,
1315
new Object[]{callbackHandler});
1317
client.invoke(ii, metadata);
1318
client.disconnect();
1323
// clean up callback server connectors if any exist
1324
Set connectors = null;
1325
synchronized (callbackConnectors)
1327
connectors = (Set) callbackConnectors.remove(callbackHandler);
1330
if (connectors != null)
1332
Iterator it = connectors.iterator();
1333
while (it.hasNext())
1335
Connector callbackConnector = (Connector) it.next();
1336
callbackConnector.stop();
1337
callbackConnector.destroy();
1343
throw new NullPointerException("Can not remove null InvokerCallbackHandler listener.");
1348
throw new Exception("Can not remove callback listener as " +
1349
"remoting client is not connected to server.");
1354
* Gets the callbacks for specified callback handler. The handler is required because an id is
1355
* generated for each handler. So if have two callback handlers registered with the same server,
1356
* no other way to know for which handler to get the callbacks for.
1358
public List getCallbacks(InvokerCallbackHandler callbackHandler) throws Throwable
1360
return getCallbacks(callbackHandler, null);
1364
* Gets the callbacks for specified callback handler. The handler is required because an id is
1365
* generated for each handler. So if have two callback handlers registered with the same server,
1366
* no other way to know for which handler to get the callbacks for.
1368
* The metadata map can be used to set callback blocking mode and blocking timeout
1371
public List getCallbacks(InvokerCallbackHandler callbackHandler, Map metadata) throws Throwable
1373
if (callbackHandler != null)
1375
String listenerId = (String)listeners.get(callbackHandler);
1377
if(listenerId != null)
1379
if (metadata == null)
1380
metadata = new HashMap();
1382
metadata.put(LISTENER_ID_KEY, listenerId);
1383
InternalInvocation invocation = new InternalInvocation(InternalInvocation.GETCALLBACKS, null);
1387
List response = (List) invoke(invocation, metadata);
1390
catch (MarshalException e)
1392
if (e.getCause() != null && e.getCause() instanceof SocketTimeoutException)
1394
if (trace) log.trace(this + ": getCallbacks() timed out: returning empty list");
1395
return new ArrayList();
1401
metadata.remove(LISTENER_ID_KEY);
1406
String errorMessage = "Could not find listener id for InvokerCallbackHandler (" +
1408
"), please verify handler has been registered as listener.";
1410
String errorMode = (String) metadata.get(THROW_CALLBACK_EXCEPTION);
1411
boolean throwError = Boolean.valueOf(errorMode).booleanValue();
1415
throw new IOException(errorMessage);
1419
log.error(errorMessage);
1426
throw new NullPointerException("Can not remove null InvokerCallbackHandler listener.");
1430
public int acknowledgeCallback(InvokerCallbackHandler callbackHandler, Callback callback)
1433
return acknowledgeCallback(callbackHandler, callback, null);
1436
public int acknowledgeCallback(InvokerCallbackHandler callbackHandler, Callback callback,
1437
Object response) throws Throwable
1439
ArrayList callbacks = new ArrayList(1);
1440
callbacks.add(callback);
1442
ArrayList responses = null;
1443
if (response != null)
1445
responses = new ArrayList(1);
1446
responses.add(response);
1449
return acknowledgeCallbacks(callbackHandler, callbacks, responses);
1452
public int acknowledgeCallbacks(InvokerCallbackHandler callbackHandler, List callbacks)
1455
return acknowledgeCallbacks(callbackHandler, callbacks, null);
1458
public int acknowledgeCallbacks(InvokerCallbackHandler callbackHandler, List callbacks,
1459
List responses) throws Throwable
1461
if (callbackHandler == null)
1463
throw new Exception("InvokerCallbackHandler parameter must not be null");
1466
if (callbacks == null)
1468
throw new Exception("Callback List parameter must not be null");
1471
if (responses != null && responses.size() != callbacks.size())
1473
throw new Exception("Callback response list must be (1) null " +
1474
"or (2) the same size as callback list");
1477
if (callbacks.size() == 0)
1484
ArrayList callbackIds = new ArrayList(callbacks.size());
1485
Iterator idsIterator = callbacks.iterator();
1486
ArrayList responseList = null;
1487
Iterator responseIterator = null;
1489
if (responses != null)
1491
responseList = new ArrayList(responses.size());
1492
responseIterator = responses.iterator();
1495
Callback callback = null;
1496
Object response = null;
1497
String listenerId = null;
1499
for (int i = 0; i < callbacks.size(); i++)
1501
callback = (Callback) idsIterator.next();
1503
if (responseIterator != null)
1505
response = responseIterator.next();
1508
Map returnPayload = callback.getReturnPayload();
1510
if (returnPayload != null)
1512
Object callbackId = returnPayload.get(ServerInvokerCallbackHandler.CALLBACK_ID);
1513
if (callbackId != null)
1515
callbackIds.add(callbackId);
1517
if (responseIterator != null)
1519
responseList.add(response);
1522
String nextListenerId = (String) returnPayload.get(LISTENER_ID_KEY);
1524
if (nextListenerId == null)
1526
throw new Exception("Cannot acknowledge callbacks: " +
1527
"callback " + callbackId + " has null listener id");
1532
listenerId = nextListenerId;
1536
if (!listenerId.equals(nextListenerId))
1537
throw new Exception("Cannot acknowledge callbacks: " +
1538
"all must be from same server side callback handler");
1543
log.error("Cannot acknowledge callback: callback id " +
1544
"is missing from return payload");
1549
log.error("Cannot acknowledge callback: return payload is null");
1553
if (callbackIds.size() == 0)
1558
Map metadata = new HashMap();
1559
if(listenerId != null)
1561
metadata.put(LISTENER_ID_KEY, listenerId);
1565
throw new Exception("Could not find listener id for InvokerCallbackHandler (" +
1566
callbackHandler + "), please verify handler " +
1567
"has been registered as listener.");
1570
Object[] params = new Object[] {callbackIds, responseList};
1571
InternalInvocation invocation =
1572
new InternalInvocation(InternalInvocation.ACKNOWLEDGECALLBACK, params);
1573
invoke(invocation, metadata);
1574
return callbackIds.size();
1578
throw new Exception("Can not acknowledge Callback due to not being connected to server.");
1583
* Sets the marshaller implementation that should be used by the client invoker (transport). This
1584
* overrides the client's default marshaller (or any set within configuration).
1586
public void setMarshaller(Marshaller marshaller)
1590
if (marshaller != null)
1592
invoker.setMarshaller(marshaller);
1596
throw new NullPointerException("Can not set Marshaller with a null value.");
1601
throw new RuntimeException("Can not set remoting client Marshaller when not connected.");
1606
* Sets the unmarshaller implementation that should be used by the client invoker (transport).
1607
* This overrides the client's default unmarshaller (or any set within configuration).
1609
public void setUnMarshaller(UnMarshaller unmarshaller)
1613
if (unmarshaller != null)
1616
invoker.setUnMarshaller(unmarshaller);
1620
throw new NullPointerException("Can not set UnMarshaller to null value.");
1625
throw new RuntimeException("Can not set remoting client UnMarhshaller when not connected.");
1630
* Takes an inputstream and wraps a server around. Then calls the target remoting server and
1631
* passes a proxy for an inputstream to the server's handler. When the server handler calls on
1632
* this proxy, it will call back on this server wrapped around this inputstream.
1634
* @param param - invocation payload.
1636
* @return the return value from the invocation.
1639
public Object invoke(InputStream inputStream, Object param) throws Throwable
1641
StreamServer streamServer = new StreamServer(inputStream);
1642
String locator = streamServer.getInvokerLocator();
1644
// now call on target server and pass locator for stream callbacks
1645
InvocationRequest invocationRequest =
1646
new InvocationRequest(sessionId, subsystem, param, null, null, null);
1647
return invoke(new InternalInvocation(InternalInvocation.ADDSTREAMCALLBACK,
1648
new Object[]{locator, invocationRequest}), null);
1652
* Takes an inputstream and wraps a server around. Then calls the target remoting server and
1653
* passes a proxy for an inputstream to the server's handler. When the server handler calls on
1654
* this proxy, it will call back on this server wrapped around this inputstream. The Connector
1655
* passed is expected to have already been started and will have the stream handler added with
1656
* subsystem of 'stream'. Also note that the Connector passed will not be stopped when/if the
1657
* server calls to close the input stream.
1659
* @param param - invocation payload.
1661
* @return the return value from the invocation
1663
public Object invoke(InputStream inputStream, Object param, Connector streamConnector)
1666
StreamServer streamServer = new StreamServer(inputStream, streamConnector);
1667
String locator = streamServer.getInvokerLocator();
1669
// now call on target server and pass locator for stream callbacks
1670
InvocationRequest invocationRequest =
1671
new InvocationRequest(sessionId, subsystem, param, null, null, null);
1673
return invoke(new InternalInvocation(InternalInvocation.ADDSTREAMCALLBACK,
1674
new Object[]{locator, invocationRequest}), null);
1678
* Takes an inputstream and wraps a server around. Then calls the target remoting server and
1679
* passes proxy for an inputstream to the server's handler. When the server handle calls on this
1680
* proxy, it will call back on this server wrapped around this inputstream. The InvokerLocator
1681
* passed is used to create the internal Connector used to receive the calls from the server
1684
public Object invoke(InputStream inputStream, Object param, InvokerLocator streamServerLocator)
1687
StreamServer streamServer = new StreamServer(inputStream, streamServerLocator);
1688
String locator = streamServer.getInvokerLocator();
1690
// now call on target server and pass locator for stream callbacks
1691
InvocationRequest invocationRequest =
1692
new InvocationRequest(sessionId, subsystem, param, null, null, null);
1693
return invoke(new InternalInvocation(InternalInvocation.ADDSTREAMCALLBACK,
1694
new Object[]{locator, invocationRequest}), null);
1698
* @return the ping period (in ms) this client's connection validator is configured with. If the
1699
* client doesn't ping (on account of connection validator not being installed, or
1700
* stopped), returns -1.
1702
public long getPingPeriod()
1704
if (connectionValidator == null)
1709
return connectionValidator.getPingPeriod();
1713
* @return the lease period (in ms) if the client has an active leasing mechanism with the server
1716
public long getLeasePeriod()
1718
if (invoker == null)
1723
return invoker.getLeasePeriod(sessionId);
1727
* Returns an InetAddress for the client machine as seen by the server machine.
1728
* @return an InetAddress for the client machine as seen by the server machine.
1731
public InetAddress getAddressSeenByServer() throws Throwable
1733
return (InetAddress) invoke("$GET_CLIENT_LOCAL_ADDRESS$");
1736
public String toString()
1738
return "Client[" + System.identityHashCode(this) + ":" + sessionId + "]";
1741
// Package protected ----------------------------------------------------------------------------

void notifyListeners()
1745
synchronized (connectionValidatorLock)
1747
log.debug(this + " entering notifyListeners(): " + connectionValidator);
1748
if (connectionValidator != null)
1750
synchronized (connectionValidator)
1752
if (connectionValidator.isStopped())
1754
if (trace) log.trace(this + ": " + connectionValidator + " is stopped");
1758
if (trace) log.trace(this + ": " + connectionValidator + " is not stopped");
1759
if (trace) log.trace(this + " calling connectionValidator.notifyListeners()");
1760
connectionValidator.notifyListeners(new Exception("Could not connect to server!"));
1761
Iterator it = connectionListeners.iterator();
1762
while (it.hasNext())
1764
ConnectionListener listener = (ConnectionListener) it.next();
1765
connectionValidator.removeConnectionListener(this, listener);
1767
if (connectionValidators.remove(connectionValidatorKey) != null)
1769
if (trace) log.trace(this + ".notifyAndDisconnect() removed from static map: " + connectionValidator);
1773
connectionValidator = null;
1774
connectionValidatorKey = null;
1777
log.debug(this + " leaving notifyListeners()");
1781
// Protected ------------------------------------------------------------------------------------

// Private --------------------------------------------------------------------------------------

private void connect(ClientInvoker invoker, ConnectionListener listener, Map metadata)
1787
if (invoker != null)
1792
setupClientLease(invoker, listener, metadata);
1794
catch (Throwable throwable)
1796
RuntimeException e = new CannotConnectException("Error setting up client lease upon performing connect.");
1797
e.initCause(throwable);
1800
log.debug(this + " connected to " + locator);
1804
throw new RuntimeException("Client invoker is null (may have used void constructor " +
1805
"for Client, which should only be used for Externalization.");
1809
private void setupClientLease(ClientInvoker invoker, ConnectionListener listener, Map metadata) throws Throwable
1811
long leasePeriod = -1;
1812
boolean enableLease = false;
1814
// start with checking the locator URL for hint as to if should do initial lease ping
1815
if (invoker != null)
1817
if (invoker instanceof LocalClientInvoker)
1819
// no need to continue as won't do client lease when is local (JBREM-382)
1823
InvokerLocator locator = invoker.getLocator();
1824
Map locatorParams = locator.getParameters();
1825
if (locatorParams != null)
1827
String leaseValue = (String)locatorParams.get(InvokerLocator.CLIENT_LEASE);
1828
if (leaseValue != null && leaseValue.length() > 0)
1830
enableLease = Boolean.valueOf(leaseValue).booleanValue();
1833
String leasePeriodValue = (String)locatorParams.get(InvokerLocator.CLIENT_LEASE_PERIOD);
1834
if (leasePeriodValue != null && leasePeriodValue.length() > 0)
1838
leasePeriod = Long.parseLong(leasePeriodValue);
1840
catch (NumberFormatException e)
1842
log.warn("Could not convert client lease period value (" +
1843
leasePeriodValue + ") to a number.");
1850
throw new RuntimeException("Can not set up client lease as client invoker is null.");
1853
if (configuration != null)
1855
Object val = configuration.get(ENABLE_LEASE);
1859
if (val instanceof Boolean)
1861
enableLease = ((Boolean)val).booleanValue();
1863
else if (val instanceof String)
1865
enableLease = Boolean.valueOf((String)val).booleanValue();
1869
log.warn("Can not evaluate " + ENABLE_LEASE + " value (" +
1870
val + ") as a boolean type.");
1874
String leasePeriodValue = (String)configuration.get(InvokerLocator.CLIENT_LEASE_PERIOD);
1876
if (leasePeriodValue != null && leasePeriodValue.length() > 0)
1880
leasePeriod = Long.parseLong(leasePeriodValue);
1882
catch (NumberFormatException e)
1884
log.warn("Could not convert client lease period value (" +
1885
leasePeriodValue + ") to a number.");
1890
if (trace) log.trace(this + " enableLease: " + enableLease);
1893
Map temp = new HashMap(configuration);
1894
if (metadata != null)
1896
temp.putAll(metadata);
1898
if (useClientConnectionIdentity)
1900
temp.put(CLIENT, this);
1901
temp.put(CONNECTION_LISTENER, listener);
1903
if (trace) log.trace(this + " calling MicroRemoteClientInvoker.establishLease()");
1904
invoker.establishLease(sessionId, temp, leasePeriod);
1906
else if (listener != null)
1908
addConnectionListener(listener, metadata);
1912
private Object invoke(Object param, Map metadata, InvokerLocator callbackServerLocator)
1917
return invoker.invoke(new InvocationRequest(sessionId, subsystem, param,
1918
metadata, null, callbackServerLocator));
1922
throw new Exception("Can not make remoting client invocation " +
1923
"due to not being connected to server.");
1927
private void addCallbackListener(InvokerCallbackHandler callbackhandler, Map metadata,
1928
InvokerLocator callbackLocator, Object callbackHandlerObject)
1931
// if callback locator is null, then is pull callbacks and need to track callback handler
1932
// per Client (not by client invoker).
1933
if (callbackLocator == null)
1935
String listenerId = generateListenerId(callbackhandler);
1937
// if listenerId is null, means this Client has already had the callbackhanler reference
1938
// registered as a listener, so no need to add it again.
1939
if (listenerId != null)
1941
Map internalMetadata = new HashMap();
1942
internalMetadata.put(LISTENER_ID_KEY, listenerId);
1943
if(metadata != null)
1945
internalMetadata.putAll(metadata);
1947
// now call server to add listener
1948
invoke(new InternalInvocation(InternalInvocation.ADDLISTENER, null),
1949
internalMetadata, callbackLocator);
1954
// is going to be push callbacks which means callback server locator involved.
1955
// will have to delegate to client invoker.
1956
String listenerId = invoker.addClientLocator(sessionId, callbackhandler, callbackLocator);
1958
if (listenerId != null)
1961
Map internalMetadata = new HashMap();
1962
internalMetadata.put(LISTENER_ID_KEY, listenerId);
1963
if(metadata != null)
1965
internalMetadata.putAll(metadata);
1968
Client client = new Client(callbackLocator, subsystem);
1969
client.setSessionId(getSessionId());
1974
InternalInvocation i =
1975
new InternalInvocation(InternalInvocation.ADDCLIENTLISTENER,
1976
new Object[]{callbackhandler, callbackHandlerObject});
1978
client.invoke(i, internalMetadata);
1982
client.disconnect();
1985
// now call server to add listener
1986
invoke(new InternalInvocation(InternalInvocation.ADDLISTENER, null),
1987
internalMetadata, callbackLocator);
1992
private String generateListenerId(InvokerCallbackHandler callbackhandler)
1994
String listenerId = null;
1995
Object obj = listeners.get(callbackhandler);
1998
listenerId = new GUID().toString();
1999
listeners.put(callbackhandler, listenerId);
2004
private void processParameters()
2006
Map params = new HashMap();
2007
if (configuration != null)
2008
params.putAll(configuration);
2009
if (locator.getParameters() != null)
2010
params.putAll(locator.getParameters());
2012
Object param = params.get(INVOKER_DESTRUCTION_DELAY);
2013
if (param instanceof String)
2017
invokerDestructionDelay = Integer.parseInt((String) param);
2018
log.debug(this + " setting invokerDestructionDelay to " + invokerDestructionDelay);
2020
catch (NumberFormatException e)
2022
log.error("invokerDestructionDelay parameter has invalid format: " + param);
2025
else if (param != null)
2027
log.error("invokerDestructionDelay parameter must be a string in integer format: " + param);
2030
param = configuration.get(Remoting.USE_CLIENT_CONNECTION_IDENTITY);
2031
if (param instanceof String)
2033
useClientConnectionIdentity = Boolean.valueOf((String) param).booleanValue();
2035
else if (param != null)
2037
log.warn("value of " + Remoting.USE_CLIENT_CONNECTION_IDENTITY + " must be a String: " + param);
2041
if (locator.getParameters() != null)
2043
param = locator.getParameters().get(Remoting.USE_CLIENT_CONNECTION_IDENTITY);
2046
useClientConnectionIdentity = Boolean.valueOf((String) param).booleanValue();
2047
this.configuration.put(Remoting.USE_CLIENT_CONNECTION_IDENTITY, param);
2052
PortUtil.updateRange(params);
2055
private void configureCallbackServerSocketFactory(Map map) throws Exception
2057
if (InvokerRegistry.isSSLSupported(locator.getProtocol()) &&
2058
!map.containsKey(Remoting.CUSTOM_SERVER_SOCKET_FACTORY) &&
2059
!map.containsKey(ServerInvoker.SERVER_SOCKET_FACTORY) &&
2060
!map.containsKey(SSLSocketBuilder.REMOTING_SERVER_SOCKET_USE_CLIENT_MODE))
2061
map.put(SSLSocketBuilder.REMOTING_SERVER_SOCKET_USE_CLIENT_MODE, "true");
2064
// Inner classes --------------------------------------------------------------------------------

class InvokerDestructionTimerTask extends TimerTask
2067
private WeakReference ref;
2069
public InvokerDestructionTimerTask(ClientInvoker invoker)
2071
ref = new WeakReference(invoker);
2076
ClientInvoker invoker = (ClientInvoker) ref.get();
2077
log.trace(this + " calling InvokerRegistry.destroyClientInvoker() for " + invoker);
2078
InvokerRegistry.destroyClientInvoker(invoker.getLocator(), configuration);
2084
static class ConnectionValidatorKey
2086
private ClientInvoker invoker;
2087
private Map metadata;
2089
ConnectionValidatorKey(ClientInvoker invoker, Map metadata)
2091
this.invoker = invoker;
2092
this.metadata = metadata;
2095
public boolean equals(Object o)
2099
if (! (o instanceof ConnectionValidatorKey))
2101
ConnectionValidatorKey holder = (ConnectionValidatorKey) o;
2102
boolean metadataEquals = (metadata == null && holder.metadata == null) || metadata.equals(holder.metadata);
2103
return invoker == holder.invoker && metadataEquals;
2106
public int hashCode()
2108
return invoker.hashCode() * metadata.hashCode();
2112
static private InetAddress getLocalHost() throws UnknownHostException
2114
if (SecurityUtility.skipAccessControl())
2118
return InetAddress.getLocalHost();
2120
catch (IOException e)
2122
return InetAddress.getByName("127.0.0.1");
2128
return (InetAddress) AccessController.doPrivileged( new PrivilegedExceptionAction()
2130
public Object run() throws IOException
2134
return InetAddress.getLocalHost();
2136
catch (IOException e)
2138
return InetAddress.getByName("127.0.0.1");
2143
catch (PrivilegedActionException e)
2145
throw (UnknownHostException) e.getCause();