1
package org.jboss.remoting.transport.socket;
3
import org.jboss.logging.Logger;
4
import org.jboss.remoting.CannotConnectException;
5
import org.jboss.remoting.ConnectionFailedException;
6
import org.jboss.remoting.Home;
7
import org.jboss.remoting.InvocationFailureException;
8
import org.jboss.remoting.InvocationRequest;
9
import org.jboss.remoting.InvokerLocator;
10
import org.jboss.remoting.RemoteClientInvoker;
11
import org.jboss.remoting.ServerInvoker;
12
import org.jboss.remoting.Version;
13
import org.jboss.remoting.serialization.ClassLoaderUtility;
14
import org.jboss.remoting.util.SecurityUtility;
15
import org.jboss.remoting.invocation.OnewayInvocation;
16
import org.jboss.remoting.marshal.Marshaller;
17
import org.jboss.remoting.marshal.UnMarshaller;
18
import org.jboss.remoting.marshal.VersionedMarshaller;
19
import org.jboss.remoting.marshal.VersionedUnMarshaller;
20
import org.jboss.remoting.marshal.serializable.SerializableMarshaller;
21
import org.jboss.util.propertyeditor.PropertyEditors;
23
import java.beans.IntrospectionException;
24
import java.io.EOFException;
25
import java.io.IOException;
26
import java.io.InputStream;
27
import java.io.OutputStream;
28
import java.lang.reflect.Constructor;
29
import java.net.InetAddress;
30
import java.net.Socket;
31
import java.net.InetSocketAddress;
32
import java.net.SocketException;
33
import java.net.UnknownHostException;
34
import java.rmi.MarshalException;
35
import java.security.AccessController;
36
import java.security.PrivilegedActionException;
37
import java.security.PrivilegedExceptionAction;
38
import java.util.HashMap;
39
import java.util.Iterator;
40
import java.util.LinkedList;
41
import java.util.List;
43
import java.util.Properties;
44
import java.util.regex.Pattern;
46
import EDU.oswego.cs.dl.util.concurrent.Semaphore;
49
* SocketClientInvoker uses Sockets to remotely connect to a remote ServerInvoker, which must be
50
* a SocketServerInvoker.
52
* @author <a href="mailto:jhaynie@vocalocity.net">Jeff Haynie</a>
53
* @author <a href="mailto:telrod@e2technologies.net">Tom Elrod</a>
54
* @author <a href="mailto:ovidiu@jboss.org">Ovidiu Feodorov</a>
56
* @version $Revision: 5476 $
58
public class MicroSocketClientInvoker extends RemoteClientInvoker
60
// Constants ------------------------------------------------------------------------------------
62
private static final Logger log = Logger.getLogger(MicroSocketClientInvoker.class);
65
* Can be either true or false and will indicate if client socket should have TCP_NODELAY turned
66
* on or off. TCP_NODELAY is for a specific purpose; to disable the Nagle buffering algorithm.
67
* It should only be set for applications that send frequent small bursts of information without
68
* getting an immediate response; where timely delivery of data is required (the canonical
69
* example is mouse movements). The default is false.
71
public static final String TCP_NODELAY_FLAG = "enableTcpNoDelay";
74
* The client side maximum number of threads. The default is MAX_POOL_SIZE.
76
public static final String MAX_POOL_SIZE_FLAG = "clientMaxPoolSize";
79
* Specifies the fully qualified class name for the custom SocketWrapper implementation to use on
80
* the client. Note, will need to make sure this is marked as a client parameter (using the
81
* 'isParam' attribute). Making this change will not affect the marshaller/unmarshaller that is
82
* used, which may also be a requirement.
84
public static final String CLIENT_SOCKET_CLASS_FLAG = "clientSocketClass";
86
/** Key for setting timeout used by OnewayConnectionTask */
87
public static final String ONEWAY_CONNECTION_TIMEOUT = "onewayConnectionTimeout";
89
/** Key to determine if client side oneway invocations should wait to read version.
92
public static final String USE_ONEWAY_CONNECTION_TIMEOUT = "useOnewayConnectionTimeout";
94
/** Key for setting time to wait to get permission to get a connection */
95
public static final String CONNECTION_WAIT = "connectionWait";
97
/** Key for setting socket write timeout */
98
public static final String WRITE_TIMEOUT = "writeTimeout";
101
* Default value for enable TCP nodelay. Value is false.
103
public static final boolean TCP_NODELAY_DEFAULT = false;
106
* Default maximum number of times an invocation will be made when it gets a SocketException.
109
public static final int MAX_CALL_RETRIES = 3;
112
* Default maximum number of socket connections allowed at any point in time. Default is 50.
114
public static final int MAX_POOL_SIZE = 50;
116
/** Default timeout value used by OnewayConnectionTask. Value is 2 seconds. */
117
public static final int ONEWAY_CONNECTION_TIMEOUT_DEFAULT = 2000;
119
/** Default time to wait to get permission to get a connection */
120
public static final int CONNECTION_WAIT_DEFAULT = 30000;
122
// Static ---------------------------------------------------------------------------------------
124
private static boolean trace = log.isTraceEnabled();
127
* Used for debugging (tracing) connection leaks
129
static int counter = 0;
131
protected static final Map connectionPools = new HashMap();
133
protected static final Map semaphores = new HashMap();
135
// Performance measurements
136
public static long getSocketTime = 0;
137
public static long readTime = 0;
138
public static long writeTime = 0;
139
public static long serializeTime = 0;
140
public static long deserializeTime = 0;
142
private static final String patternString = "^.*(?:connection.*reset|connection.*closed|broken.*pipe).*$";
143
private static final Pattern RETRIABLE_ERROR_MESSAGE = Pattern.compile(patternString, Pattern.CASE_INSENSITIVE);
146
* Close all sockets in a specific pool.
148
// Drains the given pool: removes every SocketWrapper queued in it and closes each one.
// NOTE(review): this copy of the file has lost lines (braces, try keywords and whatever
// sat on original lines 149-155), so any guard ahead of the synchronized block and the
// exact try/catch nesting are not visible here - confirm against the full source.
public static void clearPool(LinkedList thepool)
156
// The pool's own monitor is held for the whole drain.
synchronized (thepool)
158
// Size is read once up front; the loop then removes exactly that many entries.
int size = thepool.size();
159
for (int i = 0; i < size; i++)
161
SocketWrapper socketWrapper = (SocketWrapper)thepool.removeFirst();
164
socketWrapper.close();
165
socketWrapper = null;
167
// Presumably pairs with a try around close(): a failure to close one wrapper is ignored.
catch (Exception ignored)
175
// Handler for an exception named ex (its catch line is not in view); logged at debug only.
log.debug("Failure", ex);
180
* Close all sockets in all pools.
182
// Drains every pool registered in the static connectionPools map by handing each one
// to clearPool. No visible line removes the map entries themselves.
public static void clearPools()
184
// The map's monitor is held while its key set is iterated.
synchronized (connectionPools)
186
for(Iterator i = connectionPools.keySet().iterator(); i.hasNext();)
188
// Keys are ServerAddress instances; values are the LinkedList pools.
ServerAddress sa = (ServerAddress) i.next();
190
if (trace) { log.trace("clearing pool for " + sa); }
191
clearPool((LinkedList) connectionPools.get(sa));
198
// Attributes -----------------------------------------------------------------------------------
200
private Constructor clientSocketConstructor;
201
private boolean reuseAddress;
203
protected InetAddress addr;
206
// flag set to true by a disconnect request. If trying to create a connection goes on in a
207
// loop and a disconnect request arrives, this flag will be used to send this information into
209
// private volatile boolean bailOut;
212
* Indicates if will check the socket connection when getting from pool by sending byte over the
213
* connection to validate that it is still good.
215
protected boolean shouldCheckConnection;
218
* If the TcpNoDelay option should be used on the socket.
220
protected boolean enableTcpNoDelay;
222
protected String clientSocketClassName;
223
protected Class clientSocketClass;
224
protected int numberOfCallRetries;
225
protected int maxPoolSize;
226
protected int onewayConnectionTimeout;
227
protected boolean useOnewayConnectionTimeout = true;
228
protected int connectionWait = CONNECTION_WAIT_DEFAULT;
231
* Pool for this invoker. This is shared between all instances of proxies attached to a specific
234
protected LinkedList pool;
236
//Semaphore is also shared between all proxies - there must be a 1-1 correspondence between pool and semaphore
237
protected Semaphore semaphore;
241
* connection information
243
protected ServerAddress address;
247
* Socket configuration parameters.
249
protected boolean keepAlive;
250
protected boolean keepAliveSet;
251
protected boolean oOBInline;
252
protected boolean oOBInlineSet;
253
protected int receiveBufferSize = - 1;
254
protected int sendBufferSize = -1;
255
protected boolean soLinger;
256
protected boolean soLingerSet;
257
protected int soLingerDuration = -1;
258
protected int trafficClass = -1;
261
* If true, an IOException with message such as "Connection reset by peer: socket write error" will
262
* be treated like a SocketException.
264
protected boolean generalizeSocketException;
266
protected int writeTimeout = -1;
268
// Constructors ---------------------------------------------------------------------------------
270
public MicroSocketClientInvoker(InvokerLocator locator)
275
// Creates an invoker for the given locator and configuration map, resetting the
// socket-related fields to their defaults.
// NOTE(review): original lines 289-295 are missing from this view; the "Error setting up"
// handler below suggests a setup step wrapped in try/catch sits there - confirm.
public MicroSocketClientInvoker(InvokerLocator locator, Map configuration)
277
super(locator, configuration);
279
// The wrapper constructor and class are resolved later, in createClientSocket.
clientSocketConstructor = null;
281
shouldCheckConnection = false;
282
enableTcpNoDelay = TCP_NODELAY_DEFAULT;
283
// Default wrapper implementation; configureParameters may replace the class name.
clientSocketClassName = ClientSocketWrapper.class.getName();
284
clientSocketClass = null;
285
numberOfCallRetries = MAX_CALL_RETRIES;
287
maxPoolSize = MAX_POOL_SIZE;
288
onewayConnectionTimeout = ONEWAY_CONNECTION_TIMEOUT_DEFAULT;
296
// Failure path: log at debug and rethrow unchecked with the original exception as cause.
log.debug("Error setting up " + this, ex);
297
throw new RuntimeException(ex.getMessage(), ex);
300
log.debug(this + " constructed");
303
// Public ---------------------------------------------------------------------------------------
306
* Indicates if will check socket connection when returning from pool by sending byte to the
307
* server. Default value will be false.
309
public boolean checkingConnection()
311
return shouldCheckConnection;
315
* Returns if newly created sockets will have SO_REUSEADDR enabled. Default is for this to be
318
public boolean getReuseAddress()
324
* Sets if newly created socket should have SO_REUSEADDR enabled. Default is true.
326
public void setReuseAddress(boolean reuse)
328
reuseAddress = reuse;
331
public boolean isKeepAlive()
336
public void setKeepAlive(boolean keepAlive)
338
this.keepAlive = keepAlive;
342
public boolean isOOBInline()
347
public void setOOBInline(boolean inline)
353
public int getReceiveBufferSize()
355
return receiveBufferSize;
358
public void setReceiveBufferSize(int receiveBufferSize)
360
this.receiveBufferSize = receiveBufferSize;
363
public int getSendBufferSize()
365
return sendBufferSize;
368
public void setSendBufferSize(int sendBufferSize)
370
this.sendBufferSize = sendBufferSize;
373
public boolean isSoLinger()
378
public int getSoLingerDuration()
380
return soLingerDuration;
383
public void setSoLinger(boolean soLinger)
385
this.soLinger = soLinger;
389
public void setSoLingerDuration(int soLingerDuration)
391
this.soLingerDuration = soLingerDuration;
394
public int getTrafficClass()
399
public void setTrafficClass(int trafficClass)
401
this.trafficClass = trafficClass;
404
public int getWriteTimeout()
409
public void setWriteTimeout(int writeTimeout)
411
this.writeTimeout = writeTimeout;
414
public boolean isGeneralizeSocketException()
416
return generalizeSocketException;
419
public void setGeneralizeSocketException(boolean generalizeSocketException)
421
this.generalizeSocketException = generalizeSocketException;
424
public synchronized void disconnect()
426
log.debug(this + " disconnecting ...");
431
// Empties this invoker's pool, closing every SocketWrapper it holds.
// NOTE(review): original lines 432-434 are not in view; whether the loop runs under a
// lock on pool cannot be told from here - confirm.
public void flushConnectionPool()
435
// The null check makes this a no-op when the pool has not been created.
while (pool != null && pool.size() > 0)
437
SocketWrapper socketWrapper = (SocketWrapper)pool.removeFirst();
440
socketWrapper.close();
442
// A failed close is logged at debug; no rethrow is visible in the surviving lines.
catch (IOException e)
444
log.debug("Failed to close socket wrapper", e);
450
public int getConnectionWait()
452
return connectionWait;
455
public void setConnectionWait(int connectionWait)
457
this.connectionWait = connectionWait;
460
public Home getHomeInUse()
466
* Sets the number of times an invocation will retry based on getting SocketException.
468
// Sets how many times transport() will attempt an invocation.
// Values below 1 are replaced by MAX_CALL_RETRIES.
public void setNumberOfCallRetries(int numberOfCallRetries)
470
if (numberOfCallRetries < 1)
472
this.numberOfCallRetries = MAX_CALL_RETRIES;
476
// Presumably the else branch (the else keyword is not in view): accept the caller's value.
this.numberOfCallRetries = numberOfCallRetries;
480
public int getNumberOfCallRetries()
482
return numberOfCallRetries;
486
* Sets the number of retries to get a socket connection.
488
* @param numberOfRetries Must be a number greater than 0.
490
public void setNumberOfRetries(int numberOfRetries)
492
log.warn("numberOfRetries is no longer used");
495
public int getNumberOfRetries()
497
log.warn("numberOfRetries is no longer used");
502
* The name of the server.
504
public String getServerHostName() throws Exception
506
return address.address;
509
// Reports connections currently checked out: maxPoolSize minus the semaphore's free permits.
public int getNumberOfUsedConnections()
511
// Semaphore not yet created; the value returned on this branch is not in view.
if (semaphore == null)
514
return maxPoolSize - (int) semaphore.permits();
517
// Reports how many connection permits are currently free on the shared semaphore.
public int getNumberOfAvailableConnections()
519
// Semaphore not yet created; the value returned on this branch is not in view.
if (semaphore == null)
522
return (int) semaphore.permits();
525
// Package protected ----------------------------------------------------------------------------
527
// Protected ------------------------------------------------------------------------------------
529
// Applies the configuration to this invoker and resolves the server address.
// Configuration entries are first mapped onto bean properties, then the explicitly
// named parameters are read by configureParameters().
protected void setup() throws Exception
531
// Copy the configuration into a Properties object for the bean-property mapper.
Properties props = new Properties();
532
props.putAll(configuration);
533
// isStrict == false is passed through to PropertyEditors.mapJavaBeanProperties.
mapJavaBeanProperties(MicroSocketClientInvoker.this, props, false);
534
configureParameters();
536
// Ordinary locator: resolve its host and port directly.
if (!InvokerLocator.MULTIHOME.equals(locator.getHost()))
538
addr = getAddressByName(locator.getHost());
539
port = locator.getPort();
540
address = createServerAddress(addr, port);
544
// Presumably the else branch (MULTIHOME locator); the else keyword is not in view.
List homes = locator.getConnectHomeList();
545
if (homes.size() == 1)
547
// Treat as in non MULTIHOME case.
548
Home home = (Home) homes.iterator().next();
549
addr = getAddressByName(home.host);
550
address = createServerAddress(addr, home.port);
555
// Reads the named client parameters from the configuration map and stores them in the
// matching fields: enableTcpNoDelay, maxPoolSize, clientSocketClassName,
// shouldCheckConnection, onewayConnectionTimeout, useOnewayConnectionTimeout, writeTimeout.
// Each value is cast to String before parsing.
// NOTE(review): the null checks and try/catch lines around each parse are missing from
// this view; each log.warn below presumably sits in a catch - confirm.
protected void configureParameters()
557
Map params = configuration;
564
// look for enableTcpNoDelay param
565
Object val = params.get(TCP_NODELAY_FLAG);
570
// Boolean.valueOf(String) gives false for any text other than "true" (ignoring case),
// so a malformed string does not fail the parse by itself.
enableTcpNoDelay = Boolean.valueOf((String)val).booleanValue();
571
log.debug(this + " setting enableTcpNoDelay to " + enableTcpNoDelay);
575
log.warn(this + " could not convert " + TCP_NODELAY_FLAG + " value of " +
576
val + " to a boolean value.");
580
// look for maxPoolSize param
581
val = params.get(MAX_POOL_SIZE_FLAG);
586
// Integer.valueOf(String) throws NumberFormatException on non-numeric text.
maxPoolSize = Integer.valueOf((String)val).intValue();
587
log.debug(this + " setting maxPoolSize to " + maxPoolSize);
591
log.warn(this + " could not convert " + MAX_POOL_SIZE_FLAG + " value of " +
592
val + " to a int value");
596
// look for client socket class name
597
val = params.get(CLIENT_SOCKET_CLASS_FLAG);
600
String value = (String)val;
601
// An empty string leaves the default wrapper class name in place.
if (value.length() > 0)
603
clientSocketClassName = value;
604
log.debug(this + " setting client socket wrapper class name to " + clientSocketClassName);
608
// Connection checking: an explicit non-empty setting wins; otherwise it is switched on
// when the invoker's version is Version.VERSION_1.
val = params.get(SocketServerInvoker.CHECK_CONNECTION_KEY);
609
if (val != null && ((String)val).length() > 0)
611
String value = (String) val;
612
shouldCheckConnection = Boolean.valueOf(value).booleanValue();
613
log.debug(this + " setting shouldCheckConnection to " + shouldCheckConnection);
615
else if (getVersion() == Version.VERSION_1)
617
shouldCheckConnection = true;
618
log.debug(this + " setting shouldCheckConnection to " + shouldCheckConnection);
621
// look for onewayConnectionTimeout param
622
val = params.get(ONEWAY_CONNECTION_TIMEOUT);
627
onewayConnectionTimeout = Integer.valueOf((String)val).intValue();
628
log.debug(this + " setting onewayConnectionTimeout to " + onewayConnectionTimeout);
632
log.warn(this + " could not convert " + ONEWAY_CONNECTION_TIMEOUT + " value of " +
633
val + " to an int value");
637
// look for useOnewayConnectionTimeout param
638
val = params.get(USE_ONEWAY_CONNECTION_TIMEOUT);
643
useOnewayConnectionTimeout = Boolean.valueOf((String)val).booleanValue();
644
log.debug(this + " setting useOnewayConnectionTimeout to " + useOnewayConnectionTimeout);
648
log.warn(this + " could not convert " + USE_ONEWAY_CONNECTION_TIMEOUT + " value of " +
649
val + " to a boolean value");
653
// look for writeTimeout param
654
val = params.get(WRITE_TIMEOUT);
659
writeTimeout = Integer.valueOf((String)val).intValue();
660
log.debug(this + " setting writeTimeout to " + writeTimeout);
664
log.warn(this + " could not convert " + WRITE_TIMEOUT + " value of " +
665
val + " to an int value");
670
// Builds the ServerAddress used as the key for the shared pool and semaphore maps
// (see initPool), from the textual IP address, the port, the TCP_NODELAY setting and
// the pool size.
// NOTE(review): the fourth argument (-1) is presumably the timeout later read as
// address.timeout in getConnection - confirm against ServerAddress.
protected ServerAddress createServerAddress(InetAddress addr, int port)
672
return new ServerAddress(addr.getHostAddress(), port, enableTcpNoDelay, -1, maxPoolSize);
675
protected void finalize() throws Throwable
681
// Chooses the Home (host/port) this invoker will talk to.
// For a MULTIHOME locator a reachable home is probed for via getUsableAddress and
// recorded on the locator; otherwise the locator's own host and port are used.
// NOTE(review): the lines that follow the home selection are not in view.
protected synchronized void handleConnect() throws ConnectionFailedException
685
if (InvokerLocator.MULTIHOME.equals(locator.getHost()))
687
home = getUsableAddress(locator);
690
// Presumably guarded by a check that no home was found (the condition line is not in view).
throw new ConnectionFailedException(this + " unable to find a usable address for: " + home);
692
locator.setHomeInUse(home);
696
// Presumably the else branch: a plain locator maps directly to a single Home.
home = new Home(locator.getHost(), locator.getPort());
700
// Walks the candidate homes and probes each by sending a ServerInvoker.ECHO invocation.
// Each probe overwrites the addr and address fields for the candidate being tried.
// NOTE(review): the loop header, try/catch and return lines are not in view; presumably
// the first home that answers is returned - confirm.
protected Home getUsableAddress(InvokerLocator locator)
702
List homes = getConnectHomes();
703
Iterator it = homes.iterator();
710
home = (Home) it.next();
711
addr = getAddressByName(home.host);
712
address = createServerAddress(addr, home.port);
713
// The echo request goes through the normal invoke path.
invoke(new InvocationRequest(null, null, ServerInvoker.ECHO, null, null, null));
714
if (trace) log.trace(this + " able to contact server at: " + home);
719
// Presumably inside a catch for a failed probe.
log.debug(this + " unable to contact server at: " + home);
726
protected synchronized void handleDisconnect()
733
* Each implementation of the remote client invoker should have a default data type that is used
734
* in the case it is not specified in the invoker locator URI.
736
protected String getDefaultDataType()
738
return SerializableMarshaller.DATATYPE;
741
// Sends one invocation over a socket and, unless it is a oneway call, reads the response.
//
// Visible flow: read the oneway flag and an optional per-invocation timeout from
// metadata; loop up to numberOfCallRetries times obtaining a SocketWrapper from
// getConnection; write the version byte (when versioning is on) and the marshalled
// invocation; read the version byte and response; finally hand the socket back to the
// pool, or close it when the pool is full.
// SocketException and EOFException go to handleRetriableException; other failures go
// to handleOtherException.
//
// NOTE(review): many structural lines (try, else, break, finally, several declarations
// such as start/timeLeft/retryCount) are missing from this view, so the exact nesting
// of the statements below must be confirmed against the full source.
protected Object transport(String sessionID, Object invocation, Map metadata,
742
Marshaller marshaller, UnMarshaller unmarshaller)
743
throws IOException, ConnectionFailedException, ClassNotFoundException
745
SocketWrapper socketWrapper = null;
746
Object response = null;
747
boolean oneway = false;
749
// tempTimeout < 0 will indicate there is no per invocation timeout.
750
int tempTimeout = -1;
752
// Holds the socket's own timeout so it can be restored after a per-invocation override.
int savedTimeout = -1;
757
// check to see if is one way invocation and return after writing invocation if is
758
Object val = metadata.get(org.jboss.remoting.Client.ONEWAY_FLAG);
759
if(val != null && val instanceof String && Boolean.valueOf((String)val).booleanValue())
764
// look for temporary timeout values
765
String tempTimeoutString = (String) metadata.get(ServerInvoker.TIMEOUT);
767
if (tempTimeoutString != null)
771
tempTimeout = Integer.valueOf(tempTimeoutString).intValue();
772
log.debug(this + " setting timeout to " + tempTimeout + " for this invocation");
776
// Presumably in a catch for an unparsable timeout string.
log.warn(this + " could not convert " + ServerInvoker.TIMEOUT + " value of " +
777
tempTimeoutString + " to an integer value.");
783
// The clock is started only when a per-invocation timeout applies.
if (tempTimeout >= 0)
785
start = System.currentTimeMillis();
788
// Set when the payload is an OnewayInvocation, in which case no response is awaited (see the trace below).
boolean serverSideOneway = false;
789
if (oneway && invocation instanceof InvocationRequest)
791
InvocationRequest ir = (InvocationRequest) invocation;
792
if (ir.getParameter() instanceof OnewayInvocation)
793
serverSideOneway = true;
797
// Remembers the most recent connection failure for reporting once retries run out.
Exception sockEx = null;
799
for (; retryCount < numberOfCallRetries; retryCount++)
801
if (trace) log.trace(this + " retryCount: " + retryCount);
804
// If a per invocation timeout has been set, the time spent retrying
805
// should count toward the elapsed time.
806
timeLeft = (int) (tempTimeout - (System.currentTimeMillis() - start));
813
// The pool is tried on every attempt except the last one, unless there is only one attempt.
boolean tryPool = retryCount < (numberOfCallRetries - 1)
815
|| numberOfCallRetries == 1;
816
socketWrapper = getConnection(marshaller, unmarshaller, tryPool, timeLeft);
817
log.trace(this + " got socketWrapper: " + socketWrapper);
819
// Interruption while acquiring a connection is rethrown unchecked.
catch (InterruptedException e)
822
if (trace) log.trace(this + " released semaphore: " + semaphore.permits(), e);
823
throw new RuntimeException(e);
830
// Presumably a second catch (its header is not in view): keep the failure in sockEx.
if (trace) log.trace(this + " released semaphore: " + semaphore.permits(), e);
831
sockEx = new CannotConnectException(
832
"Can not get connection to server. Problem establishing " +
833
"socket connection for " + locator, e);
837
// Apply whatever remains of the per-invocation timeout to the socket.
if (tempTimeout >= 0)
839
timeLeft = (int) (tempTimeout - (System.currentTimeMillis() - start));
842
savedTimeout = socketWrapper.getTimeout();
843
socketWrapper.setTimeout(timeLeft);
848
int version = getVersion();
849
boolean performVersioning = Version.performVersioning(version);
851
OutputStream outputStream = socketWrapper.getOutputStream();
852
log.trace(this + "got outputStream: " + outputStream);
853
// The version byte goes ahead of the payload when versioning is enabled.
if (performVersioning)
855
log.trace(this + " writing version");
856
writeVersion(outputStream, version);
857
log.trace(this + " wrote version");
860
//TODO: -TME so this is messed up as now ties remoting versioning to using a marshaller type
861
versionedWrite(outputStream, marshaller, invocation, version);
863
if (serverSideOneway)
865
if(trace) { log.trace(this + " sent oneway invocation, so not waiting for response, returning null"); }
869
// Oneway path: optionally wait up to onewayConnectionTimeout for the version byte.
if (performVersioning && useOnewayConnectionTimeout)
871
int onewaySavedTimeout = socketWrapper.getTimeout();
872
socketWrapper.setTimeout(onewayConnectionTimeout);
873
InputStream inputStream = socketWrapper.getInputStream();
874
version = readVersion(inputStream);
877
// Presumably guarded by an end-of-stream check (InputStream.read() gives -1); guard not in view.
throw new EOFException("end of file");
879
if (version == SocketWrapper.CLOSING)
881
log.trace(this + " received version 254: treating as end of file");
882
throw new EOFException("end of file");
885
// Note that if an exception is thrown, the socket is thrown away,
886
// so there's no need to reset the timeout value.
887
socketWrapper.setTimeout(onewaySavedTimeout);
892
// Presumably the two-way path (the else line is not in view): read version, then response.
InputStream inputStream = socketWrapper.getInputStream();
893
if (performVersioning)
895
version = readVersion(inputStream);
898
// Presumably guarded by the same end-of-stream check as above; guard not in view.
throw new EOFException("end of file");
900
if (version == SocketWrapper.CLOSING)
902
log.trace(this + " received version 254: treating as end of file");
903
throw new EOFException("end of file");
907
response = versionedRead(inputStream, unmarshaller, version);
910
// Note that resetting the timeout value after closing the socket results
911
// in an exception, so the reset is not done in a finally clause. However,
912
// if a catch clause is ever added that does not close the socket, care
913
// must be taken to reset the timeout in that case.
914
if (tempTimeout >= 0)
916
socketWrapper.setTimeout(savedTimeout);
919
catch (SocketException sex)
921
handleRetriableException(socketWrapper, sex, retryCount);
925
catch (EOFException ex)
927
handleRetriableException(socketWrapper, ex, retryCount);
931
catch (IOException e)
933
// Optionally treat an IOException whose message matches RETRIABLE_ERROR_MESSAGE as retriable.
if (isGeneralizeSocketException() && e.getMessage() != null && RETRIABLE_ERROR_MESSAGE.matcher(e.getMessage()).matches())
935
handleRetriableException(socketWrapper, e, retryCount);
936
// Only the message is carried over; the original IOException is not attached as cause.
sockEx = new SocketException(e.getMessage());
941
return handleOtherException(e, semaphore, socketWrapper, oneway);
946
// Presumably from a broader catch whose header is not in view.
return handleOtherException(ex, semaphore, socketWrapper, oneway);
949
// call worked, so no need to retry
953
// need to check if ran out of retries
954
if (retryCount >= numberOfCallRetries)
956
handleException(sockEx, socketWrapper);
959
// No response and the per-invocation budget is spent: report as a connection timeout.
if (response == null && tempTimeout > 0 && timeLeft <= 0)
963
sockEx = new CannotConnectException(
964
"Can not get connection to server. Timed out establishing " +
965
"socket connection for " + locator);
967
handleException(sockEx, socketWrapper);
970
// Put socket back in pool for reuse
973
if (pool.size() < maxPoolSize)
975
pool.add(socketWrapper);
976
if (trace) { log.trace(this + " returned " + socketWrapper + " to pool"); }
980
// Presumably the else branch: pool already holds maxPoolSize sockets, so close this one.
if (trace) { log.trace(this + "'s pool is full, will close the connection"); }
983
socketWrapper.close();
985
catch (Exception ignored)
990
if (trace) log.trace(this + " released semaphore: " + semaphore.permits());
993
if (trace && !oneway) { log.trace(this + " received response " + response); }
997
// Translates a failure from transport() into the exception type callers see. It always throws.
//   ClassNotFoundException -> rethrown as is
//   CannotConnectException -> rethrown as is
//   InterruptedException   -> wrapped in RuntimeException
//   anything else          -> wrapped in InvocationFailureException
// socketWrapper is not used by any visible line.
protected Object handleException(Exception ex, SocketWrapper socketWrapper)
998
throws ClassNotFoundException, InvocationFailureException
1000
if (ex instanceof ClassNotFoundException)
1002
//TODO: -TME Add better exception handling for class not found exception
1003
log.debug("Error loading classes from remote call result.", ex);
1004
throw (ClassNotFoundException)ex;
1007
if (ex instanceof CannotConnectException)
1009
log.debug(this, ex);
1010
throw (CannotConnectException) ex;
1013
if (ex instanceof InterruptedException)
1015
log.debug(this, ex);
1016
throw new RuntimeException(ex);
1019
throw new InvocationFailureException("Unable to perform invocation", ex);
1022
// Cleans up after a failure that transport() will retry: gives the permit back to the
// semaphore, closes the failed socket, and flushes the whole pool just before the
// final attempt.
// NOTE(review): the try keyword around release/close and the else line near the end
// are not in view.
protected void handleRetriableException(SocketWrapper socketWrapper, Exception e, int retryCount)
1024
if (trace) log.trace(this + "(" + socketWrapper + ") got Exception: " + e);
1028
// The permit is returned before the socket is closed.
semaphore.release();
1029
if (trace) log.trace(this + " released semaphore: " + semaphore.permits());
1030
socketWrapper.close();
1032
catch (Exception ex)
1034
if (trace) { log.trace(this + " couldn't successfully close its socketWrapper", ex); }
1038
* About to run out of retries and
1039
* pool may be full of timed out sockets,
1040
* so want to flush the pool and try with
1041
* fresh socket as a last effort.
1043
// True on the second-to-last attempt, so the last attempt starts from an empty pool.
if (retryCount == (numberOfCallRetries - 2))
1045
flushConnectionPool();
1050
if (retryCount < (numberOfCallRetries - 1))
1052
log.trace(this + " will try again, retries: " + retryCount + " < " + numberOfCallRetries);
1056
log.trace(this + " retries exhausted");
1061
// Cleans up after a failure that is not retried: releases the semaphore permit, closes
// the socket, then delegates to handleException, which always throws.
// The oneway parameter is not used by any visible line.
protected Object handleOtherException(Exception ex, Semaphore semaphore, SocketWrapper socketWrapper, boolean oneway)
1062
throws ClassNotFoundException, InvocationFailureException
1064
log.debug(this + " got exception: " + socketWrapper, ex);
1068
// Note: the semaphore parameter shadows the field of the same name.
semaphore.release();
1069
if (trace) log.trace(this + " released semaphore: " + semaphore.permits());
1070
socketWrapper.close();
1072
// Presumably pairs with a try around the cleanup above; a failed close is ignored.
catch (Exception ignored)
1079
return handleException(ex, socketWrapper);
1082
// Binds this invoker to the pool and semaphore shared by all invokers with the same
// ServerAddress, creating and registering both when none exist yet.
// NOTE(review): the "pool == null" style check and its else are not in view.
protected void initPool()
1084
// The static connectionPools map is the lock for both lookups and registrations.
synchronized (connectionPools)
1086
pool = (LinkedList)connectionPools.get(address);
1087
semaphore = (Semaphore)semaphores.get(address);
1090
// Presumably only when no pool was found for this address.
pool = new LinkedList();
1091
connectionPools.put(address, pool);
1092
log.debug("Creating semaphore with size " + maxPoolSize);
1093
// One permit per allowed connection.
semaphore = new Semaphore(maxPoolSize);
1094
semaphores.put(address, semaphore);
1100
log.trace(this + " added new pool (" + pool + ") as " + address);
1110
log.trace(this + " using pool (" + pool + ") already defined for " + address);
1117
// Acquires a connection permit and returns a SocketWrapper: a pooled one when
// available, otherwise a newly created socket wrapped via createClientSocket.
//
// timeAllowed > 0 is a per-invocation budget in milliseconds; otherwise connectionWait
// bounds the wait for a permit.
// Throws IllegalStateException on a timed-out wait and when the budget is used up
// while a new socket is being set up.
// NOTE(review): the throws clause, the tryPool check and the return line are not in view.
protected SocketWrapper getConnection(Marshaller marshaller,
1118
UnMarshaller unmarshaller,
1119
boolean tryPool, int timeAllowed)
1122
long start = System.currentTimeMillis();
1123
long timeToWait = (timeAllowed > 0) ? timeAllowed : connectionWait;
1124
// attempt() gives false when no permit became free within timeToWait.
boolean timedout = !semaphore.attempt(timeToWait);
1125
if (trace) log.trace(this + " obtained semaphore: " + semaphore.permits());
1129
// Presumably guarded by "if (timedout)"; the condition line is not in view.
throw new IllegalStateException("Timeout waiting for a free socket");
1132
SocketWrapper pooled = null;
1138
// if connection within pool, use it
1139
if (pool.size() > 0)
1141
pooled = getPooledConnection();
1142
if (trace) log.trace(this + " reusing pooled connection: " + pooled);
1148
if (trace) log.trace(this + " avoiding connection pool, creating new socket");
1153
//Need to create a new one
1154
Socket socket = null;
1156
if (trace) { log.trace(this + " creating socket "); }
1158
// timeAllowed < 0 indicates no per invocation timeout has been set.
1159
int timeRemaining = -1;
1160
if (0 <= timeAllowed)
1162
// Time already spent waiting for the permit is deducted.
timeRemaining = (int) (timeAllowed - (System.currentTimeMillis() - start));
1165
socket = createSocket(address.address, address.port, timeRemaining);
1166
if (trace) log.trace(this + " created socket: " + socket);
1168
socket.setTcpNoDelay(address.enableTcpNoDelay);
1170
Map metadata = getLocator().getParameters();
1171
if (metadata == null)
1173
metadata = new HashMap(2);
1177
// Work on a copy: the entries added below go into this copy.
metadata = new HashMap(metadata);
1179
metadata.put(SocketWrapper.MARSHALLER, marshaller);
1180
metadata.put(SocketWrapper.UNMARSHALLER, unmarshaller);
1181
if (writeTimeout > 0)
1183
metadata.put(SocketWrapper.WRITE_TIMEOUT, new Integer(writeTimeout));
1185
if (timeAllowed > 0)
1187
// Recomputed after the connect so the wrapper gets only what is left of the budget.
timeRemaining = (int) (timeAllowed - (System.currentTimeMillis() - start));
1189
if (timeRemaining <= 0)
1190
throw new IllegalStateException("Timeout creating a new socket");
1192
metadata.put(SocketWrapper.TEMP_TIMEOUT, new Integer(timeRemaining));
1195
pooled = createClientSocket(socket, address.timeout, metadata);
1201
// Wraps a connected Socket in the configured SocketWrapper implementation, created
// reflectively through its (Socket, Map, Integer) constructor.
// The class and constructor are looked up on first use and cached in fields.
// NOTE(review): the throws clause is not in view.
protected SocketWrapper createClientSocket(Socket socket, int timeout, Map metadata)
1204
if (clientSocketConstructor == null)
1206
if(clientSocketClass == null)
1208
clientSocketClass = ClassLoaderUtility.loadClass(clientSocketClassName, getClass());
1211
// Signature the wrapper class must provide.
Class[] args = new Class[]{Socket.class, Map.class, Integer.class};
1212
clientSocketConstructor = clientSocketClass.getConstructor(args);
1215
SocketWrapper clientSocketWrapper = null;
1216
clientSocketWrapper = (SocketWrapper)clientSocketConstructor.
1217
newInstance(new Object[]{socket, metadata, new Integer(timeout)});
1219
return clientSocketWrapper;
1222
// Creates a plain Socket and connects it to address:port via the privileged connect helper.
// NOTE(review): timeout is not referenced by any visible line, and the lines between
// construction and connect, plus the return, are not in view - confirm whether the
// timeout is applied.
protected Socket createSocket(String address, int port, int timeout) throws IOException
1224
Socket s = new Socket();
1226
InetSocketAddress inetAddr = new InetSocketAddress(address, port);
1227
connect(s, inetAddr);
1231
// Applies the configured socket options to s.
// SO_REUSEADDR is always set; the others are applied only when their "Set" flag is
// true or their value is greater than -1.
protected void configureSocket(Socket s) throws SocketException
1233
s.setReuseAddress(getReuseAddress());
1235
if (keepAliveSet) s.setKeepAlive(keepAlive);
1236
if (oOBInlineSet) s.setOOBInline(oOBInline);
1237
if (receiveBufferSize > -1) s.setReceiveBufferSize(receiveBufferSize);
1238
if (sendBufferSize > -1) s.setSendBufferSize(sendBufferSize);
1240
// Continuation of a condition whose first line is not in view; SO_LINGER needs a positive duration.
soLingerDuration > 0) s.setSoLinger(soLinger, soLingerDuration);
1241
if (trafficClass > -1) s.setTrafficClass(trafficClass);
1244
// Takes wrappers off the front of the pool until one passes its liveness checks.
// A wrapper that fails a check is closed and the loop moves on to the next one.
// NOTE(review): the try/else lines and the final return are not in view; the result
// when the pool runs dry cannot be confirmed from here.
protected SocketWrapper getPooledConnection()
1246
SocketWrapper socketWrapper = null;
1247
while (pool.size() > 0)
1249
socketWrapper = (SocketWrapper)pool.removeFirst();
1252
if (socketWrapper != null)
1254
// Wrappers implementing OpenConnectionChecker get their own check first.
if (socketWrapper instanceof OpenConnectionChecker)
1256
((OpenConnectionChecker) socketWrapper).checkOpenConnection();
1258
// Optional extra check, controlled by shouldCheckConnection.
if (shouldCheckConnection)
1260
socketWrapper.checkConnection();
1261
return socketWrapper;
1265
return socketWrapper;
1269
// A failed check lands here: the wrapper is discarded.
catch (Exception ex)
1271
if (trace) { log.trace(this + " couldn't reuse connection from pool"); }
1274
socketWrapper.close();
1278
log.debug("Failed to close socket wrapper", e);
1285
// Private --------------------------------------------------------------------------------------
1287
// Reads the response object from inputStream using the given unmarshaller.
// VERSION_1, VERSION_2 and VERSION_2_2 share one read path; any other version ends in
// the IOException at the bottom (the switch/default lines are not in view).
private Object versionedRead(InputStream inputStream, UnMarshaller unmarshaller, int version)
1288
throws IOException, ClassNotFoundException
1290
//TODO: -TME - is switch required?
1293
case Version.VERSION_1:
1294
case Version.VERSION_2:
1295
case Version.VERSION_2_2:
1297
if (trace) { log.trace(this + " reading response from unmarshaller"); }
1298
// A version-aware unmarshaller is also handed the wire version.
if (unmarshaller instanceof VersionedUnMarshaller)
1299
return((VersionedUnMarshaller)unmarshaller).read(inputStream, null, version);
1301
return unmarshaller.read(inputStream, null);
1305
throw new IOException("Can not read data for version " + version + ". " +
1306
"Supported versions: " + Version.VERSION_1 + ", " + Version.VERSION_2 + ", " + Version.VERSION_2_2);
1311
// Writes the invocation to outputStream using the given marshaller.
// VERSION_1, VERSION_2 and VERSION_2_2 share one write path; any other version ends in
// the IOException at the bottom (the switch/default/else lines are not in view).
private void versionedWrite(OutputStream outputStream, Marshaller marshaller,
1312
Object invocation, int version) throws IOException
1314
//TODO: -TME Should I worry about checking the version here? Only one way to do it at this point
1317
case Version.VERSION_1:
1318
case Version.VERSION_2:
1319
case Version.VERSION_2_2:
1321
if (trace) { log.trace(this + " writing invocation to marshaller"); }
1322
// A version-aware marshaller is also handed the wire version.
if (marshaller instanceof VersionedMarshaller)
1323
((VersionedMarshaller) marshaller).write(invocation, outputStream, version);
1325
// Presumably the else branch for a plain Marshaller.
marshaller.write(invocation, outputStream);
1326
if (trace) { log.trace(this + " done writing invocation to marshaller"); }
1332
throw new IOException("Can not write data for version " + version + ". " +
1333
"Supported versions: " + Version.VERSION_1 + ", " + Version.VERSION_2 + ", " + Version.VERSION_2_2);
1338
//TODO: -TME Exact same method in ServerThread
1339
// Reads the one-byte protocol version from the stream.
// InputStream.read() gives -1 at end of stream, so callers must handle that value.
// The return line is not in view.
private int readVersion(InputStream inputStream) throws IOException
1341
if (trace) { log.trace(this + " reading version from input stream"); }
1342
int version = inputStream.read();
1343
if (trace) { log.trace(this + " read version " + version + " from input stream"); }
1347
//TODO: -TME Exact same method in ServerThread
1348
// Writes the protocol version as a single byte.
// OutputStream.write(int) emits only the low-order eight bits of version.
private void writeVersion(OutputStream outputStream, int version) throws IOException
1350
if (trace) { log.trace(this + " writing version " + version + " on output stream"); }
1351
outputStream.write(version);
1354
// Maps entries of props onto bean properties of o via PropertyEditors.
// The call is made directly when SecurityUtility.skipAccessControl() is true, and
// otherwise inside AccessController.doPrivileged.
static private void mapJavaBeanProperties(final Object o, final Properties props, final boolean isStrict)
1355
throws IntrospectionException
1357
if (SecurityUtility.skipAccessControl())
1359
PropertyEditors.mapJavaBeanProperties(o, props, isStrict);
1365
AccessController.doPrivileged( new PrivilegedExceptionAction()
1367
public Object run() throws IntrospectionException
1369
PropertyEditors.mapJavaBeanProperties(o, props, isStrict);
1374
catch (PrivilegedActionException e)
1376
// Unwrap to the checked type declared by this method.
throw (IntrospectionException) e.getCause();
1380
// Connects socket to address, directly when SecurityUtility.skipAccessControl() is
// true and otherwise inside AccessController.doPrivileged.
// No connect timeout is passed in the visible calls.
// NOTE(review): the throws clause is not in view.
static private void connect(final Socket socket, final InetSocketAddress address)
1383
if (SecurityUtility.skipAccessControl())
1385
socket.connect(address);
1391
AccessController.doPrivileged( new PrivilegedExceptionAction()
1393
public Object run() throws Exception
1395
socket.connect(address);
1400
catch (PrivilegedActionException e)
1402
// run() is declared to throw Exception, so this cast assumes the cause is an IOException.
throw (IOException) e.getCause();
1406
// Resolves host to an InetAddress, directly when SecurityUtility.skipAccessControl()
// is true and otherwise inside AccessController.doPrivileged.
static private InetAddress getAddressByName(final String host) throws UnknownHostException
1408
if (SecurityUtility.skipAccessControl())
1410
return InetAddress.getByName(host);
1415
return (InetAddress)AccessController.doPrivileged( new PrivilegedExceptionAction()
1417
public Object run() throws IOException
1419
return InetAddress.getByName(host);
1423
catch (PrivilegedActionException e)
1425
// run() is declared to throw IOException, so this cast assumes the cause is an UnknownHostException.
throw (UnknownHostException) e.getCause();
1428
// Inner classes --------------------------------------------------------------------------------