~ubuntu-branches/ubuntu/wily/libjboss-remoting-java/wily

« back to all changes in this revision

Viewing changes to src/main/org/jboss/remoting/ServerInvoker.java

  • Committer: Package Import Robot
  • Author(s): Torsten Werner
  • Date: 2011-09-09 14:01:03 UTC
  • mfrom: (1.1.6 upstream)
  • Revision ID: package-import@ubuntu.com-20110909140103-hqokx61534tas9rg
Tags: 2.5.3.SP1-1
* Newer but not newest upstream release. Do not build samples.
* Change debian/watch to upstream's svn repo.
* Add patch to fix compile error caused by tomcat update.
  (Closes: #628303)
* Switch to source format 3.0.
* Switch to debhelper level 7.
* Remove useless Depends.
* Update Standards-Version: 3.9.2.
* Update README.source.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
* JBoss, Home of Professional Open Source
 
3
* Copyright 2005, JBoss Inc., and individual contributors as indicated
 
4
* by the @authors tag. See the copyright.txt in the distribution for a
 
5
* full listing of individual contributors.
 
6
*
 
7
* This is free software; you can redistribute it and/or modify it
 
8
* under the terms of the GNU Lesser General Public License as
 
9
* published by the Free Software Foundation; either version 2.1 of
 
10
* the License, or (at your option) any later version.
 
11
*
 
12
* This software is distributed in the hope that it will be useful,
 
13
* but WITHOUT ANY WARRANTY; without even the implied warranty of
 
14
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 
15
* Lesser General Public License for more details.
 
16
*
 
17
* You should have received a copy of the GNU Lesser General Public
 
18
* License along with this software; if not, write to the Free
 
19
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
 
20
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
 
21
*/
 
22
 
 
23
package org.jboss.remoting;
 
24
 
 
25
import org.jboss.remoting.callback.Callback;
 
26
import org.jboss.remoting.callback.InvokerCallbackHandler;
 
27
import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
 
28
import org.jboss.remoting.invocation.InternalInvocation;
 
29
import org.jboss.remoting.invocation.OnewayInvocation;
 
30
import org.jboss.remoting.loading.ClassBytes;
 
31
import org.jboss.remoting.security.SSLSocketBuilder;
 
32
import org.jboss.remoting.security.ServerSocketFactoryMBean;
 
33
import org.jboss.remoting.security.ServerSocketFactoryWrapper;
 
34
import org.jboss.remoting.socketfactory.CreationListenerServerSocketFactory;
 
35
import org.jboss.remoting.socketfactory.SocketCreationListener;
 
36
import org.jboss.remoting.stream.StreamHandler;
 
37
import org.jboss.remoting.stream.StreamInvocationHandler;
 
38
import org.jboss.remoting.transport.PortUtil;
 
39
import org.jboss.remoting.util.SecurityUtility;
 
40
import org.jboss.remoting.serialization.ClassLoaderUtility;
 
41
import org.jboss.util.threadpool.BasicThreadPool;
 
42
import org.jboss.util.threadpool.BlockingMode;
 
43
import org.jboss.util.threadpool.ThreadPool;
 
44
import org.jboss.util.threadpool.ThreadPoolMBean;
 
45
import org.jboss.logging.Logger;
 
46
 
 
47
import java.util.concurrent.ConcurrentHashMap;
 
48
 
 
49
import javax.management.MBeanServer;
 
50
import javax.management.MBeanServerInvocationHandler;
 
51
import javax.management.MalformedObjectNameException;
 
52
import javax.management.ObjectName;
 
53
import javax.net.ServerSocketFactory;
 
54
 
 
55
import java.io.IOException;
 
56
import java.lang.reflect.Constructor;
 
57
import java.net.InetAddress;
 
58
import java.net.UnknownHostException;
 
59
import java.security.AccessController;
 
60
import java.security.PrivilegedAction;
 
61
import java.security.PrivilegedActionException;
 
62
import java.security.PrivilegedExceptionAction;
 
63
import java.util.ArrayList;
 
64
import java.util.Collection;
 
65
import java.util.HashMap;
 
66
import java.util.Iterator;
 
67
import java.util.List;
 
68
import java.util.Map;
 
69
import java.util.Set;
 
70
import java.util.StringTokenizer;
 
71
 
 
72
/**
 
73
 * ServerInvoker is the server-side part of a remote Invoker. The ServerInvoker implementation is
 
74
 * responsible for calling transport, depending on how the protocol receives the incoming data.
 
75
 *
 
76
 * @author <a href="mailto:jhaynie@vocalocity.net">Jeff Haynie</a>
 
77
 * @author <a href="mailto:tom.elrod@jboss.com">Tom Elrod</a>
 
78
 * @author <a href="mailto:ovidiu@jboss.org">Ovidiu Feodorov</a>
 
79
 *
 
80
 * @version $Revision: 5919 $
 
81
 */
 
82
public abstract class ServerInvoker extends AbstractInvoker implements ServerInvokerMBean
 
83
{
 
84
   // Constants ------------------------------------------------------------------------------------
 
85
 
 
86
   protected  static final Logger log = Logger.getLogger(ServerInvoker.class);
 
87
 
 
88
   /**
 
89
    * Key for the the maximum number of threads to be used in the thread pool for one way
 
90
    * invocations (server side).
 
91
    * This property is only used when the default oneway thread pool is used.
 
92
    */
 
93
   public static final String MAX_NUM_ONEWAY_THREADS_KEY = "maxNumThreadsOneway";
 
94
 
 
95
   /**
 
96
    * Key for setting the setting the oneway thread pool to use.
 
97
    * The value used with this key will first be checked to see if is a JMX ObjectName and if so,
 
98
    * try to look up associated mbean for the ObjectName given and cast to type
 
99
    * org.jboss.util.threadpool.ThreadPoolMBean
 
100
    * (via MBeanServerInvocationHandler.newProxyInstance()). If the value is not a JMX ObjectName,
 
101
    * will assume is a fully qualified classname and load the coresponding class and create a new
 
102
    * instance of it (which will require it to have a void constructor). The newly created instance
 
103
    * will then be cast to type of org.jboss.util.threadpool.ThreadPool.
 
104
    */
 
105
   public static final String ONEWAY_THREAD_POOL_CLASS_KEY = "onewayThreadPool";
 
106
 
 
107
   /**
 
108
    * Key for setting the address the server invoker should bind to.
 
109
    * The value can be either host name or IP.
 
110
    */
 
111
   public static final String SERVER_BIND_ADDRESS_KEY = "serverBindAddress";
 
112
 
 
113
   /**
 
114
    * Key for setting the addres the client invoker should connecto to.
 
115
    * This should be used when client will be connecting to server from outside the server's network
 
116
    * and the external address is different from that of the internal address the server invoker
 
117
    * will bind to (e.g. using NAT to expose different external address). This will mostly be useful
 
118
    * when client uses remoting detection to discover remoting servers. The value can be either host
 
119
    * name or IP.
 
120
    */
 
121
   public static final String CLIENT_CONNECT_ADDRESS_KEY = "clientConnectAddress";
 
122
 
 
123
   /**
 
124
    * Key for setting the port the server invoker should bind to.
 
125
    * If the value supplied is less than or equal to zero, the server invoker will randomly choose
 
126
    * a free port to use.
 
127
    */
 
128
   public static final String SERVER_BIND_PORT_KEY = "serverBindPort";
 
129
 
 
130
   /**
 
131
    * key for setting the port the client invoker should connect to.
 
132
    * This should be used when client will be connecting to server from outside the server's network
 
133
    * and the external port is different from that of the internal port the server invoker will bind
 
134
    * to (e.g. using NAT to expose different port routing). This will be mostly useful when client
 
135
    * uses remoting detection to discover remoting servers.
 
136
    */
 
137
   public static final String CLIENT_CONNECT_PORT_KEY = "clientConnectPort";
 
138
 
 
139
   /**
 
140
    * Key used for setting the amount of time (in milliseconds) that a client should renew its
 
141
    * lease.
 
142
    * If this value is not set, the default of five seconds (see DEFAULT_CLIENT_LEASE_PERIOD), will
 
143
    * be used. This value will also be what is given to the client when it initially querys server
 
144
    * for leasing information.
 
145
    */
 
146
   public static final String CLIENT_LEASE_PERIOD = "clientLeasePeriod";
 
147
 
 
148
   /**
 
149
    * Key for setting the timeout value (in milliseconds) for socket connections.
 
150
    */
 
151
   public static final String TIMEOUT = "timeout";
 
152
 
 
153
   /**
 
154
    * Key for setting the value for the server socket factory to be used by the server invoker.
 
155
    * The value can be either a JMX Object name, in which case will lookup the mbean and create
 
156
    * a proxy to it with type of org.jboss.remoting.security.ServerSocketFactoryMBean
 
157
    * (via MBeanServerInvocationHandler.newProxyInstance()).  If not a JMX ObjectName, will assume
 
158
    * is the fully qualified classname to the implementation to be used and will load the class,
 
159
    * create a new instance of it (which requires it to have a void constructor). The instance will
 
160
    * then be cast to type javax.net.ServerSocketFactory.
 
161
    */
 
162
   public static final String SERVER_SOCKET_FACTORY = "serverSocketFactory";
 
163
 
 
164
   /**
 
165
    * The max number of worker threads to be used in the pool for processing one way calls on the
 
166
    * server side. Value is is 100.
 
167
    */
 
168
   public static final int MAX_NUM_ONEWAY_THREADS = 100;
 
169
 
 
170
   /**
 
171
    * Key for the configuration map that determines the queue size for waiting asynchronous
 
172
    * invocations.
 
173
    */
 
174
   public static final String MAX_ONEWAY_THREAD_POOL_QUEUE_SIZE = "maxOnewayThreadPoolQueueSize";
 
175
   
 
176
   /**
 
177
    * The default lease period for clients. This is the number of milliseconds that a client will be
 
178
    * required to renew their lease with the server. The default value is 5 seconds.
 
179
    */
 
180
   public static final int DEFAULT_CLIENT_LEASE_PERIOD = 5000;
 
181
 
 
182
   /**
 
183
    * The default timeout period for socket connections. The default value is 60000 milliseconds.
 
184
    */
 
185
   public static final int DEFAULT_TIMEOUT_PERIOD = 60000;
 
186
   
 
187
   /**
 
188
    * The key to be used to determine if pull callbacks should be obtained in 
 
189
    *  blocking or nonblocking mode
 
190
    */
 
191
   public static final String BLOCKING_MODE = "blockingMode";
 
192
   
 
193
   /**
 
194
    * The key value to use to specify timeout for getting callbacks in blocking mode
 
195
    */ 
 
196
   public static final String BLOCKING_TIMEOUT = "blockingTimeout";
 
197
   
 
198
   /**
 
199
    * The value associated with BLOCKING_MODE that indicates that pull callbacks
 
200
    * should be obtained in blocking mode;
 
201
    */
 
202
   public static final String BLOCKING = "blocking";
 
203
   
 
204
   /**
 
205
    * The value associated with BLOCKING_MODE that indicates that pull callbacks
 
206
    * should be obtained in nonblocking mode;
 
207
    */
 
208
   public static final String NONBLOCKING = "nonblocking";
 
209
   
 
210
   /**
 
211
    * Default timeout for getting callbacks in blocking mode.
 
212
    * Default is 5000 milliseconds.
 
213
    */
 
214
   public static final int DEFAULT_BLOCKING_TIMEOUT = 5000;
 
215
   
 
216
   
 
217
   /**
 
218
    * The key to use to specify if ServerInvokerCallbackHandlers should be
 
219
    * registered as ConnectionListeners.
 
220
    */
 
221
   public static final String REGISTER_CALLBACK_LISTENER = "registerCallbackListener";
 
222
   
 
223
   public static final String ECHO = "$ECHO$";
 
224
   
 
225
   public static final String INVOKER_SESSION_ID = "invokerSessionId";
 
226
   
 
227
   public static final String CONNECTION_LISTENER = "connectionListener";
 
228
 
 
229
 
 
230
   // Static ---------------------------------------------------------------------------------------
 
231
 
 
232
   private static boolean trace = log.isTraceEnabled();
 
233
   private static final InetAddress LOCAL_HOST;
 
234
 
 
235
   static
 
236
   {
 
237
      try
 
238
      {
 
239
         LOCAL_HOST = (InetAddress) AccessController.doPrivileged( new PrivilegedExceptionAction()
 
240
         {
 
241
            public Object run() throws UnknownHostException
 
242
            {
 
243
               try
 
244
               {
 
245
                  return InetAddress.getLocalHost();
 
246
               }
 
247
               catch (UnknownHostException e)
 
248
               {
 
249
                  return InetAddress.getByName("127.0.0.1");
 
250
               }
 
251
            }
 
252
         });
 
253
      }
 
254
      catch (PrivilegedActionException e)
 
255
      {
 
256
         log.debug(ServerInvoker.class.getName() + " unable to get local host address", e.getCause());
 
257
         throw new ExceptionInInitializerError(e.getCause());
 
258
      }
 
259
      catch (SecurityException e)
 
260
      {
 
261
         log.debug(ServerInvoker.class.getName() + " unable to get local host address", e);
 
262
         throw e;
 
263
      }
 
264
   }
 
265
 
 
266
   // Attributes -----------------------------------------------------------------------------------
 
267
 
 
268
   /**
 
269
    * Indicated the max number of threads used within oneway thread pool.
 
270
    */
 
271
   private int maxNumberThreads = MAX_NUM_ONEWAY_THREADS;
 
272
   private int maxOnewayThreadPoolQueueSize = -1;
 
273
   private String onewayThreadPoolClass = null;
 
274
   private ThreadPool onewayThreadPool;
 
275
   private Object onewayThreadPoolLock = new Object();
 
276
   private boolean created = false;
 
277
 
 
278
   private MBeanServer mbeanServer = null;
 
279
 
 
280
   private String dataType;
 
281
   private String serverBindAddress = null;
 
282
   private int serverBindPort = 0;
 
283
   private String clientConnectAddress = null;
 
284
   private int clientConnectPort = -1;
 
285
   
 
286
   protected List connectHomes = new ArrayList();
 
287
   protected List homes = new ArrayList();
 
288
   
 
289
   private int timeout = DEFAULT_TIMEOUT_PERIOD;
 
290
 
 
291
   // indicates the lease timeout period for clients
 
292
   private long leasePeriod = DEFAULT_CLIENT_LEASE_PERIOD;
 
293
   private boolean leaseManagement = false;
 
294
   private Map clientLeases = new ConcurrentHashMap();
 
295
 
 
296
   protected Map handlers = new HashMap();
 
297
   
 
298
   // If there is only one handler we store a direct reference to it, as an optimisation
 
299
   // to avoid lookup in this common case - TLF
 
300
   protected volatile ServerInvocationHandler singleHandler;
 
301
   
 
302
   // If there is only one callback container we store a direct reference to it, as an optimisation
 
303
   // to avoid lookup in this common case - TLF
 
304
   protected volatile CallbackContainer singleCallbackContainer;
 
305
      
 
306
   protected Map callbackHandlers = new HashMap();
 
307
   protected Map clientCallbackListener = new HashMap();
 
308
   protected boolean started = false;
 
309
   protected ConnectionNotifier connectionNotifier = new ConnectionNotifier();
 
310
   protected ServerSocketFactory serverSocketFactory = null;
 
311
   
 
312
   protected boolean registerCallbackListeners = true;
 
313
   
 
314
   protected boolean useClientConnectionIdentity;
 
315
 
 
316
   // Constructors ---------------------------------------------------------------------------------
 
317
 
 
318
   public ServerInvoker(InvokerLocator locator)
 
319
   {
 
320
      super(locator);
 
321
      Map params = locator.getParameters();
 
322
      if(configuration != null && params != null)
 
323
      {
 
324
         configuration.putAll(locator.getParameters());
 
325
      }
 
326
   }
 
327
 
 
328
   public ServerInvoker(InvokerLocator locator, Map configuration)
 
329
   {
 
330
      super(locator, configuration);
 
331
   }
 
332
 
 
333
   // Public ---------------------------------------------------------------------------------------
 
334
 
 
335
   public void setServerSocketFactory(ServerSocketFactory serverSocketFactory)
 
336
   {
 
337
      this.serverSocketFactory = serverSocketFactory;
 
338
   }
 
339
 
 
340
   public ServerSocketFactory getServerSocketFactory()
 
341
   {
 
342
      return serverSocketFactory;
 
343
   }
 
344
 
 
345
   /**
 
346
    * Sets timeout (in millseconds) to be used for the socket connection.
 
347
    */
 
348
   public void setTimeout(int timeout)
 
349
   {
 
350
      this.timeout = timeout;
 
351
   }
 
352
 
 
353
   /**
 
354
    * The timeout (in milliseconds) used for the socket connection.
 
355
    */
 
356
   public int getTimeout()
 
357
   {
 
358
      return timeout;
 
359
   }
 
360
   
 
361
   public ConnectionNotifier getConnectionNotifier()
 
362
   {
 
363
      return connectionNotifier;
 
364
   }
 
365
 
 
366
   public boolean isLeaseActivated()
 
367
   {
 
368
      return leaseManagement;
 
369
   }
 
370
 
 
371
   public void addConnectionListener(ConnectionListener listener)
 
372
   {
 
373
      if(listener != null)
 
374
      {
 
375
         connectionNotifier.addListener(listener);
 
376
 
 
377
         if(leasePeriod > 0)
 
378
         {
 
379
            leaseManagement = true;
 
380
         }
 
381
      }
 
382
      else
 
383
      {
 
384
         throw new IllegalArgumentException("Can not add null ConnectionListener.");
 
385
      }
 
386
   }
 
387
   
 
388
   public void setConnectionListener(Object listener)
 
389
   {
 
390
      if (listener == null)
 
391
      {
 
392
         log.error("ConnectionListener is null");
 
393
         return;
 
394
      }
 
395
      
 
396
      if (listener instanceof ConnectionListener)
 
397
      {
 
398
         addConnectionListener((ConnectionListener) listener);
 
399
         return;
 
400
      }
 
401
 
 
402
      if (!(listener instanceof String))
 
403
      {
 
404
         log.error("Object supplied as ConnectionListener is neither String nor ConnectionListener");
 
405
         return;
 
406
      }
 
407
 
 
408
      ConnectionListener connectionListener = null;
 
409
      try
 
410
      {
 
411
         MBeanServer server = getMBeanServer();
 
412
         ObjectName objName = new ObjectName((String) listener);
 
413
         Class c = ConnectionListener.class;
 
414
         Object o = MBeanServerInvocationHandler.newProxyInstance(server, objName, c, false);
 
415
         connectionListener = (ConnectionListener) o;
 
416
      }
 
417
      catch (MalformedObjectNameException e)
 
418
      {
 
419
         log.debug("Object supplied as ConnectionListener is not an object name.");
 
420
      }
 
421
 
 
422
      if (connectionListener == null)
 
423
      {
 
424
         try
 
425
         {
 
426
            Class listenerClass = ClassLoaderUtility.loadClass((String) listener, ServerInvoker.class);
 
427
            connectionListener = (ConnectionListener) listenerClass.newInstance();
 
428
         }
 
429
         catch (Exception e)
 
430
         {
 
431
            log.error("Unable to instantiate " + listener + ": " + e.getMessage());
 
432
            return;
 
433
         }
 
434
      }
 
435
 
 
436
      if (connectionListener == null)
 
437
      {
 
438
         log.error("Unable to create ConnectionListener from " + listener);
 
439
         return;
 
440
      }
 
441
      
 
442
      addConnectionListener(connectionListener);
 
443
   }
 
444
 
 
445
   public void removeConnectionListener(ConnectionListener listener)
 
446
   {
 
447
      if(connectionNotifier != null)
 
448
      {
 
449
         connectionNotifier.removeListener(listener);
 
450
 
 
451
         // turn off lease management if no listeners (since no one to tell client died)
 
452
         if(connectionNotifier.size() == 0)
 
453
         {
 
454
            leaseManagement = false;
 
455
 
 
456
            // go through any existing leases and terminate them
 
457
            Set clientKeys = clientLeases.keySet();
 
458
            Iterator itr = clientKeys.iterator();
 
459
            while(itr.hasNext())
 
460
            {
 
461
               String sessionId = (String)itr.next();
 
462
               Lease clientLease = (Lease)clientLeases.get(sessionId);
 
463
               clientLease.terminateLease(sessionId);
 
464
            }
 
465
            clientLeases.clear();
 
466
         }
 
467
      }
 
468
   }
 
469
 
 
470
   /**
 
471
    * Sets the amount of time (in milliseconds) that a client should renew its lease. If this value
 
472
    * is not set, the default of five seconds (see DEFAULT_CLIENT_LEASE_PERIOD), will be used. This
 
473
    * value will also be what is given to the client when it initially querys server for leasing
 
474
    * information. If set after create() method called, this value will override value set by
 
475
    * CLIENT_LEASE_PERIOD key.
 
476
    */
 
477
   public void setLeasePeriod(long leasePeriodValue)
 
478
   {
 
479
      this.leasePeriod = leasePeriodValue;
 
480
 
 
481
      if (leasePeriod <= 0)
 
482
      {
 
483
          this.leaseManagement = false;
 
484
      }
 
485
      else
 
486
      {
 
487
         if(connectionNotifier != null && connectionNotifier.size() > 0)
 
488
         {
 
489
            this.leaseManagement = true;
 
490
         }
 
491
      }
 
492
   }
 
493
 
 
494
   public Lease getLease(String sessionId)
 
495
   {
 
496
      return (Lease) clientLeases.get(sessionId);
 
497
   }
 
498
   
 
499
   /**
 
500
    * Gets the amount of time (in milliseconds) that a client should renew its lease.
 
501
    */
 
502
   public long getLeasePeriod()
 
503
   {
 
504
      return leasePeriod;
 
505
   }
 
506
 
 
507
   /**
 
508
    * @jmx:managed-attribute
 
509
    */
 
510
   public String getClientConnectAddress()
 
511
   {
 
512
      return clientConnectAddress;
 
513
   }
 
514
 
 
515
   public int getClientConnectPort()
 
516
   {
 
517
      return clientConnectPort;
 
518
   }
 
519
 
 
520
   public void setClientConnectPort(int clientConnectPort)
 
521
   {
 
522
      this.clientConnectPort = clientConnectPort;
 
523
   }
 
524
 
 
525
   /**
 
526
    * This method should only be called by the service controller when this invoker is specified
 
527
    * within the Connector configuration of a service xml. Calling this directly will have no
 
528
    * effect, as will be used in building the locator uri that is published for detection and this
 
529
    * happens when the invoker is first created and started (after that, no one will be aware of a
 
530
    * change).
 
531
    *
 
532
    * @jmx:managed-attribute
 
533
    */
 
534
   public void setClientConnectAddress(String clientConnectAddress)
 
535
   {
 
536
      this.clientConnectAddress = clientConnectAddress;
 
537
   }
 
538
 
 
539
   public String getServerBindAddress()
 
540
   {
 
541
      return serverBindAddress;
 
542
   }
 
543
 
 
544
   public int getServerBindPort()
 
545
   {
 
546
      return serverBindPort;
 
547
   }
 
548
   
 
549
   public List getConnectHomes()
 
550
   {
 
551
      return new ArrayList(connectHomes);
 
552
   }
 
553
   
 
554
   public void setConnectHomes(List connectHomes)
 
555
   {
 
556
      this.connectHomes = new ArrayList(connectHomes);
 
557
   }
 
558
   
 
559
   public List getHomes()
 
560
   {
 
561
      return new ArrayList(homes);
 
562
   }
 
563
   
 
564
   public void setHomes(List homes)
 
565
   {
 
566
      this.homes = new ArrayList(homes);
 
567
   }
 
568
 
 
569
   /**
 
570
    * Sets the maximum number of thread to be used in the thread pool for one way invocations
 
571
    * (server side). This property is only used when the default oneway thread pool is used. If set
 
572
    * after create() method called, this value will override value set by MAX_NUM_ONEWAY_THREADS_KEY
 
573
    * key.
 
574
    */
 
575
   public void setMaxNumberOfOnewayThreads(int numOfThreads)
 
576
   {
 
577
      this.maxNumberThreads = numOfThreads;
 
578
   }
 
579
 
 
580
   /**
 
581
    * Gets the maximum number of thread to be used in the thread pool for one way invocations
 
582
    * (server side).
 
583
    */
 
584
   public int getMaxNumberOfOnewayThreads()
 
585
   {
 
586
      return this.maxNumberThreads;
 
587
   }
 
588
 
 
589
   /**
 
590
    * Gets the oneway thread pool to use.
 
591
    */
 
592
   public ThreadPool getOnewayThreadPool()
 
593
   {
 
594
      synchronized (onewayThreadPoolLock)
 
595
      {
 
596
         if(onewayThreadPool == null)
 
597
         {
 
598
            // if no thread pool class set, then use default BasicThreadPool
 
599
            if(onewayThreadPoolClass == null || onewayThreadPoolClass.length() == 0)
 
600
            {
 
601
               BasicThreadPool pool = new BasicThreadPool("JBossRemoting Server Oneway");
 
602
               pool.setMaximumPoolSize(maxNumberThreads);
 
603
               if (maxOnewayThreadPoolQueueSize > 0)
 
604
                  pool.setMaximumQueueSize(maxOnewayThreadPoolQueueSize);
 
605
               pool.setBlockingMode(BlockingMode.RUN);
 
606
               onewayThreadPool = pool;
 
607
               log.debug(this + " created new thread pool");
 
608
            }
 
609
            else
 
610
            {
 
611
               //first check to see if this is an ObjectName
 
612
               boolean isObjName = false;
 
613
               try
 
614
               {
 
615
                  ObjectName objName = new ObjectName(onewayThreadPoolClass);
 
616
                  onewayThreadPool = createThreadPoolProxy(objName);
 
617
                  isObjName = true;
 
618
               }
 
619
               catch(MalformedObjectNameException e)
 
620
               {
 
621
                  log.debug("Thread pool class supplied is not an object name.");
 
622
               }
 
623
               
 
624
               if(!isObjName)
 
625
               {
 
626
                  try
 
627
                  {
 
628
                     onewayThreadPool = (ThreadPool)Class.
 
629
                     forName(onewayThreadPoolClass, false, getClassLoader()).newInstance();
 
630
                  }
 
631
                  catch(Exception e)
 
632
                  {
 
633
                     throw new RuntimeException("Error loading instance of ThreadPool based " +
 
634
                           "on class name " + onewayThreadPoolClass);
 
635
                  }
 
636
               }
 
637
            }
 
638
         }
 
639
         else
 
640
         {
 
641
            log.trace("reusing oneway thread pool");
 
642
         }
 
643
         return onewayThreadPool;
 
644
      }
 
645
   }
 
646
 
 
647
   /**
 
648
    * Sets the oneway thread pool to use.
 
649
    */
 
650
   public void setOnewayThreadPool(ThreadPool pool)
 
651
   {
 
652
      this.onewayThreadPool = pool;
 
653
   }
 
654
 
 
655
   public MBeanServer getMBeanServer()
 
656
   {
 
657
      return mbeanServer;
 
658
   }
 
659
 
 
660
   public void setMBeanServer(MBeanServer server)
 
661
   {
 
662
      // This has been added in order to support mbean service configuration. Now supporting
 
663
      // classes, such as the ServerInvokerCallbackHandler can find and use resources such as
 
664
      // CallbackStore, which can be run as a service mbean (and specified via object name within
 
665
      // config). The use of JMX throughout remoting is a problem as now have to tie it in all
 
666
      // throughout the code for service configuration as is being done here. When migrate to use
 
667
      // under new server model, which does not depend on JMX, can rip out code such as this.
 
668
      this.mbeanServer = server;
 
669
   }
 
670
 
 
671
   public boolean isRegisterCallbackListeners()
 
672
   {
 
673
      return registerCallbackListeners;
 
674
   }
 
675
 
 
676
   public void setRegisterCallbackListeners(boolean registerCallbackListeners)
 
677
   {
 
678
      this.registerCallbackListeners = registerCallbackListeners;
 
679
   }
 
680
 
 
681
   /**
 
682
    * @return true if a server invocation handler has been registered for this subsystem.
 
683
    */
 
684
   public synchronized boolean hasInvocationHandler(String subsystem)
 
685
   {
 
686
      return handlers.containsKey(subsystem);
 
687
   }
 
688
 
 
689
   /**
 
690
    * @return an array of keys for each subsystem this invoker can handle.
 
691
    */
 
692
   public synchronized String[] getSupportedSubsystems()
 
693
   {
 
694
      String subsystems [] = new String[handlers.size()];
 
695
      return (String[]) handlers.keySet().toArray(subsystems);
 
696
   }
 
697
 
 
698
   /**
 
699
    * @return an array of the server invocation handlers.
 
700
    */
 
701
   public synchronized ServerInvocationHandler[] getInvocationHandlers()
 
702
   {
 
703
      ServerInvocationHandler ih [] = new ServerInvocationHandler[handlers.size()];
 
704
      return (ServerInvocationHandler[]) handlers.values().toArray(ih);
 
705
   }
 
706
 
 
707
   /**
 
708
    * Add a server invocation handler for a particular subsystem. Typically, subsystems are defined
 
709
    * in org.jboss.remoting.Subsystem, however, this can be any string that the caller knows about.
 
710
    *
 
711
    * @return previous ServerInvocationHandler with the same sybsystem value (case insensitive) or
 
712
    *         null if a previous one did not exist.
 
713
    */
 
714
   public synchronized ServerInvocationHandler addInvocationHandler(String subsystem,
 
715
                                                                    ServerInvocationHandler handler)
 
716
   {
 
717
      handler.setInvoker(this);
 
718
 
 
719
      ServerInvocationHandler oldHandler =
 
720
         (ServerInvocationHandler)handlers.put(subsystem.toUpperCase(), handler);
 
721
 
 
722
      log.debug(this + " added " + handler + " for subsystem '" + subsystem + "'" +
 
723
         (oldHandler == null ? "" : ", replacing old handler " + oldHandler));
 
724
            
 
725
      if (handlers.size() == 1)
 
726
      {
 
727
         singleHandler = handler;
 
728
      }
 
729
      else
 
730
      {
 
731
         singleHandler = null;
 
732
      }
 
733
 
 
734
      return oldHandler;
 
735
   }
 
736
 
 
737
   /**
 
738
    * Remove a subsystem invocation handler.
 
739
    */
 
740
   public synchronized ServerInvocationHandler removeInvocationHandler(String subsystem)
 
741
   {
 
742
      ServerInvocationHandler handler =
 
743
         (ServerInvocationHandler)handlers.remove(subsystem.toUpperCase());
 
744
 
 
745
      log.debug(this + (handler == null ?
 
746
         " tried to remove handler for " + subsystem + " but no handler found" :
 
747
         " removed handler " + handler + " for subsystem '" + subsystem + "'"));
 
748
      
 
749
      if (handlers.size() == 1)
 
750
      {
 
751
         singleHandler = (ServerInvocationHandler)handlers.values().iterator().next();
 
752
      }
 
753
      else
 
754
      {
 
755
         singleHandler = null;
 
756
      }
 
757
 
 
758
      return handler;
 
759
   }
 
760
 
 
761
   /**
 
762
    * Get a ServerInvocationHandler for a given subsystem type.
 
763
    */
 
764
   public synchronized ServerInvocationHandler getInvocationHandler(String subsystem)
 
765
   {
 
766
      return (ServerInvocationHandler) handlers.get(subsystem.toUpperCase());
 
767
   }
 
768
 
 
769
   protected boolean isUseClientConnectionIdentity()
 
770
   {
 
771
      return useClientConnectionIdentity;
 
772
   }
 
773
 
 
774
   protected void setUseClientConnectionIdentity(boolean useClientConnectionIdentity)
 
775
   {
 
776
      this.useClientConnectionIdentity = useClientConnectionIdentity;
 
777
   }
 
778
 
 
779
   public Object invoke(Object invoke) throws IOException
 
780
   {
 
781
      InvocationRequest request = null;
 
782
      InvocationResponse response = null;
 
783
 
 
784
      if(trace) { log.trace("server received invocation " + invoke); }
 
785
 
 
786
      if(invoke != null && invoke instanceof InvocationRequest)
 
787
      {
 
788
         request = (InvocationRequest) invoke;
 
789
         try
 
790
         {
 
791
 
 
792
            Object result = invoke(request);
 
793
 
 
794
            response = new InvocationResponse(request.getSessionId(),
 
795
                                              result, false, request.getReturnPayload());
 
796
 
 
797
         }
 
798
         catch(Throwable throwable)
 
799
         {
 
800
            response = new InvocationResponse(request.getSessionId(),
 
801
                                              throwable, true, request.getReturnPayload());
 
802
         }
 
803
      }
 
804
      else
 
805
      {
 
806
         log.error("server invoker received " + invoke + " as invocation. " +
 
807
            "Must not be null and must be of type InvocationRequest.");
 
808
 
 
809
         Exception e = new Exception("Error processing invocation request on " + getLocator() +
 
810
            ". Either invocation was null or of wrong type.");
 
811
 
 
812
         response =
 
813
            new InvocationResponse(request.getSessionId(), e, true, request.getReturnPayload());
 
814
      }
 
815
      return response;
 
816
   }
 
817
 
 
818
   /**
 
819
    * Processes invocation request depending on the invocation type (internal, name based, oneway,
 
820
    * etc). Can be called on directly when client and server are local to one another (by-passing
 
821
    * serialization).
 
822
    */
 
823
   public Object invoke(InvocationRequest invocation) throws Throwable
 
824
   {
 
825
      if (isStarted())
 
826
      {
 
827
         Object param = invocation.getParameter();
 
828
         Object result = null;
 
829
 
 
830
         if (trace) { log.trace(this + " received " + param); }
 
831
 
 
832
         if (ECHO.equals(param))
 
833
         {
 
834
            return ECHO;
 
835
         }
 
836
 
 
837
         if (param instanceof String)
 
838
         {
 
839
            // check to see if this is a is alive ping
 
840
            if ("$PING$".equals(param))
 
841
            {
 
842
               Map metadata = invocation.getRequestPayload();
 
843
               if (metadata != null)
 
844
               {
 
845
                  String invokerSessionId = (String) metadata.get(INVOKER_SESSION_ID);
 
846
                  if (invokerSessionId != null)
 
847
                  {
 
848
                     // Comes from ConnectionValidator configured to tie validation with lease.
 
849
                     boolean response = checkForClientLease(invokerSessionId);
 
850
                     if (trace) log.trace(this + " responding " + response + " to $PING$ for invoker sessionId " + invokerSessionId);
 
851
                     return new Boolean(response);
 
852
                  }
 
853
               }
 
854
 
 
855
               if (leaseManagement)
 
856
               {
 
857
                  // Otherwise, it's a normal PING.  NOTE we only update the lease when we
 
858
                  // receive a PING, not for all invocations.
 
859
                  updateClientLease(invocation);
 
860
               }
 
861
 
 
862
               // if this is an invocation ping, just pong back
 
863
               Map responseMap = new HashMap();
 
864
               responseMap.put(CLIENT_LEASE_PERIOD, new Long(leasePeriod));
 
865
 
 
866
               InvocationResponse ir = new InvocationResponse(invocation.getSessionId(),
 
867
                     new Boolean(leaseManagement),
 
868
                     false, responseMap);
 
869
 
 
870
               if (trace) { log.trace(this + " returning " + ir); }
 
871
               return ir;
 
872
            }
 
873
 
 
874
            if ("$GET_CLIENT_LOCAL_ADDRESS$".equals(param))
 
875
            {
 
876
               InetAddress address = null;
 
877
               if (invocation.getRequestPayload() != null)
 
878
                  address = (InetAddress) invocation.getRequestPayload().get(Remoting.CLIENT_ADDRESS);
 
879
 
 
880
               return address;
 
881
            }
 
882
 
 
883
            if ("$DISCONNECT$".equals(param))
 
884
            {
 
885
               if (leaseManagement)
 
886
               {
 
887
                  terminateLease(invocation);
 
888
               }
 
889
 
 
890
               if (trace) { log.trace(this + " returning null"); }
 
891
               return null;
 
892
            }
 
893
         }
 
894
 
 
895
         //TODO: -TME both oneway and internal invocation will be broken since have not
 
896
         // deserialized the para yet (removed ClassUtil.deserialize() so would let handler do it).
 
897
         if (param instanceof OnewayInvocation)
 
898
         {
 
899
            // no point in delaying return to client if oneway
 
900
            handleOnewayInvocation((OnewayInvocation)param, invocation);
 
901
            
 
902
            return null;
 
903
         }
 
904
         else
 
905
         {
 
906
            String subsystem = invocation.getSubsystem();
 
907
            String clientId = invocation.getSessionId();
 
908
 
 
909
            //I have optimised this, so that if there is only one handler set (a very common case)
 
910
            //then it will just use that without having to do a lookup or HashMap iteration over
 
911
            //values
 
912
            
 
913
            ServerInvocationHandler handler = findInvocationHandler(subsystem);
 
914
 
 
915
            if (param instanceof InternalInvocation)
 
916
            {
 
917
               result = handleInternalInvocation((InternalInvocation)param, invocation, handler);
 
918
            }
 
919
            else
 
920
            {
 
921
               if (trace) { log.trace(this + " dispatching " + invocation + " from client " + clientId + " to subsystem '" + subsystem + "'"); }
 
922
 
 
923
               if (handler == null)
 
924
               {
 
925
                  throw new InvalidConfigurationException(
 
926
                     "Can not handle invocation request for subsystem '" + subsystem + "' because " +
 
927
                     "there are no matching ServerInvocationHandlers registered. Please add via " +
 
928
                     "xml configuration or via the Connector's addInvocationHandler() method.");
 
929
               }
 
930
               result = handler.invoke(invocation);
 
931
            }
 
932
 
 
933
            if (trace) { log.trace(this + " successfully dispatched invocation, returning " + result + " from subsystem '" + subsystem + "' to client " + clientId); }
 
934
         }
 
935
 
 
936
         return result;
 
937
      }
 
938
      else
 
939
      {
 
940
         log.warn(this + " can not process invocation requests since is not in started state!");
 
941
         throw new InvalidStateException(
 
942
            "Can not process invocation request since is not in started state.");
 
943
      }
 
944
   }
 
945
 
 
946
   /**
 
947
    * Will get the data type for the marshaller factory so know which marshaller to get to marshal
 
948
    * the data. Will first check the locator uri for a 'datatype' parameter and take that value if
 
949
    * it exists. Otherwise, will use the default datatype for the client invoker, based on
 
950
    * transport.
 
951
    */
 
952
   public String getDataType()
 
953
   {
 
954
      if(dataType == null)
 
955
      {
 
956
         dataType = getDataType(getLocator());
 
957
         if(dataType == null)
 
958
         {
 
959
            dataType = getDefaultDataType();
 
960
         }
 
961
      }
 
962
      return dataType;
 
963
   }
 
964
 
 
965
   public void create()
 
966
   {
 
967
      if(!created)
 
968
      {
 
969
         try
 
970
         {
 
971
            setup();
 
972
         }
 
973
         catch(Exception e)
 
974
         {
 
975
            throw new RuntimeException("Error setting up server invoker " + this, e);
 
976
         }
 
977
         created = true;
 
978
      }
 
979
   }
 
980
 
 
981
   /**
 
982
    * Subclasses should override to provide any specific start logic.
 
983
    */
 
984
   public void start() throws IOException
 
985
   {
 
986
      started = true;
 
987
      log.debug(this + " started for locator " + getLocator());
 
988
   }
 
989
 
 
990
   /**
 
991
    * @return true if the server invoker is started, false if not.
 
992
    */
 
993
   public boolean isStarted()
 
994
   {
 
995
      return started;
 
996
   }
 
997
 
 
998
   /**
 
999
    * Subclasses should override to provide any specific stop logic.
 
1000
    */
 
1001
   public void stop()
 
1002
   {
 
1003
      started = false;
 
1004
 
 
1005
      for(Iterator i = callbackHandlers.values().iterator(); i.hasNext(); )
 
1006
      {
 
1007
         ServerInvokerCallbackHandler callbackHandler = (ServerInvokerCallbackHandler)i.next();
 
1008
         callbackHandler.destroy();
 
1009
      }
 
1010
 
 
1011
      log.debug(this + " stopped");
 
1012
   }
 
1013
 
 
1014
   /**
 
1015
    * Destory the invoker permanently.
 
1016
    */
 
1017
   public void destroy()
 
1018
   {
 
1019
      if(classbyteloader != null)
 
1020
      {
 
1021
         classbyteloader.destroy();
 
1022
      }
 
1023
   }
 
1024
 
 
1025
   /**
 
1026
    * Sets the server invoker's transport specific configuration. Will need to set before calling
 
1027
    * start() method (or at least stop() and start() again) before configurations will take affect.
 
1028
    */
 
1029
   public void setConfiguration(Map configuration)
 
1030
   {
 
1031
      this.configuration = configuration;
 
1032
   }
 
1033
 
 
1034
   /**
 
1035
    * Gets the server invoker's transport specific configuration.
 
1036
    */
 
1037
   public Map getConfiguration()
 
1038
   {
 
1039
      return configuration;
 
1040
   }
 
1041
 
 
1042
   public void removeCallbackListener(String subsystem, InvokerCallbackHandler callbackHandler)
 
1043
   {
 
1044
      ServerInvocationHandler handler = null;
 
1045
      if(subsystem != null)
 
1046
      {
 
1047
         handler = (ServerInvocationHandler) handlers.get(subsystem.toUpperCase());
 
1048
      }
 
1049
      else
 
1050
      {
 
1051
         // subsystem not specified, so will hope for a default one being set
 
1052
         if(!handlers.isEmpty())
 
1053
         {
 
1054
            handler = (ServerInvocationHandler) handlers.values().iterator().next();
 
1055
         }
 
1056
      }
 
1057
      handler.removeListener(callbackHandler);
 
1058
   }
 
1059
 
 
1060
   /**
 
1061
    * @return the String for the object name to be used for the invoker.
 
1062
    */
 
1063
   public String getMBeanObjectName()
 
1064
   {
 
1065
      InvokerLocator locator = getLocator();
 
1066
      StringBuffer buffer =
 
1067
         new StringBuffer("jboss.remoting:service=invoker,transport=" + locator.getProtocol());
 
1068
      String host = locator.getHost();
 
1069
      boolean isIPv6 = host.indexOf("[") >= 0 | host.indexOf(":") >= 0;
 
1070
      
 
1071
      buffer.append(",host=");
 
1072
      if (isIPv6)
 
1073
         buffer.append("\"");
 
1074
      buffer.append(locator.getHost());
 
1075
      if (isIPv6)
 
1076
         buffer.append("\"");
 
1077
      
 
1078
      buffer.append(",port=").append(locator.getPort());
 
1079
      Map param = locator.getParameters();
 
1080
      if(param != null)
 
1081
      {
 
1082
         Iterator itr = param.keySet().iterator();
 
1083
         while(itr.hasNext())
 
1084
         {
 
1085
            buffer.append(",");
 
1086
            String key = (String) itr.next();
 
1087
            String value = (String) param.get(key);
 
1088
            buffer.append(key);
 
1089
            buffer.append("=");
 
1090
            buffer.append(value);
 
1091
         }
 
1092
      }
 
1093
 
 
1094
      return buffer.toString();
 
1095
   }
 
1096
 
 
1097
   // Package protected ----------------------------------------------------------------------------
 
1098
 
 
1099
   // Protected ------------------------------------------------------------------------------------
 
1100
 
 
1101
   protected abstract String getDefaultDataType();
 
1102
 
 
1103
   protected void setup() throws Exception
 
1104
   {
 
1105
      Map config = getConfiguration();
 
1106
      PortUtil.updateRange(config);
 
1107
      setupHomes(config);
 
1108
      
 
1109
      String maxNumOfThreads = (String)config.get(MAX_NUM_ONEWAY_THREADS_KEY);
 
1110
 
 
1111
      if(maxNumOfThreads != null && maxNumOfThreads.length() > 0)
 
1112
      {
 
1113
         try
 
1114
         {
 
1115
            maxNumberThreads = Integer.parseInt(maxNumOfThreads);
 
1116
         }
 
1117
         catch(NumberFormatException e)
 
1118
         {
 
1119
            log.error("Can not convert max number of threads value (" +
 
1120
                       maxNumOfThreads + ") into a number.");
 
1121
         }
 
1122
      }
 
1123
 
 
1124
      String param = (String) configuration.get(MAX_ONEWAY_THREAD_POOL_QUEUE_SIZE);
 
1125
      
 
1126
      if (param != null && param.length() > 0)
 
1127
      {
 
1128
         try
 
1129
         {
 
1130
            maxOnewayThreadPoolQueueSize = Integer.parseInt((String) param);
 
1131
         }
 
1132
         catch (NumberFormatException  e)
 
1133
         {
 
1134
            log.error("maxOnewayThreadPoolQueueSize parameter has invalid format: " + param);
 
1135
         }
 
1136
      }
 
1137
      
 
1138
      onewayThreadPoolClass = (String)config.get(ONEWAY_THREAD_POOL_CLASS_KEY);
 
1139
 
 
1140
      // get timeout config
 
1141
      String timeoutPeriod = (String)config.get(TIMEOUT);
 
1142
      if(timeoutPeriod != null && timeoutPeriod.length() > 0)
 
1143
      {
 
1144
         try
 
1145
         {
 
1146
            timeout = Integer.parseInt(timeoutPeriod);
 
1147
         }
 
1148
         catch(NumberFormatException e)
 
1149
         {
 
1150
            throw new InvalidConfigurationException("Can not set timeout because can not " +
 
1151
               "convert give value (" + timeoutPeriod + ") to a number.");
 
1152
         }
 
1153
      }
 
1154
 
 
1155
      // config for client lease period
 
1156
      String clientLeasePeriod = (String)config.get(CLIENT_LEASE_PERIOD);
 
1157
      if(clientLeasePeriod != null)
 
1158
      {
 
1159
         try
 
1160
         {
 
1161
            long leasePeriodValue = Long.parseLong(clientLeasePeriod);
 
1162
            setLeasePeriod(leasePeriodValue);
 
1163
         }
 
1164
         catch(NumberFormatException e)
 
1165
         {
 
1166
            throw new InvalidConfigurationException("Can not set client lease period because " +
 
1167
               "can not convert given value (" + clientLeasePeriod + ") to a number.");
 
1168
         }
 
1169
      }
 
1170
      
 
1171
      // config for useClientConnectionIdentity
 
1172
      String useClientConnectionIdentityString = (String)config.get(Remoting.USE_CLIENT_CONNECTION_IDENTITY);
 
1173
      if(useClientConnectionIdentityString != null)
 
1174
      {
 
1175
         useClientConnectionIdentity = Boolean.valueOf(useClientConnectionIdentityString).booleanValue();
 
1176
      }
 
1177
      
 
1178
      // Inject ConnectionListener
 
1179
      String connectionListener = (String)config.get(CONNECTION_LISTENER);
 
1180
      if (connectionListener != null)
 
1181
      {
 
1182
         setConnectionListener(connectionListener);
 
1183
      }
 
1184
      
 
1185
      String registerCallbackListenersString = (String)config.get(REGISTER_CALLBACK_LISTENER);
 
1186
      if(registerCallbackListenersString != null)
 
1187
      {
 
1188
         registerCallbackListeners = Boolean.valueOf(registerCallbackListenersString).booleanValue();
 
1189
      }
 
1190
 
 
1191
      createServerSocketFactory();
 
1192
      
 
1193
      // need to check invoker locator to see if need to provide binding address (in the case 0.0.0.0 was used)
 
1194
      final InvokerLocator originalLocator = locator;
 
1195
      locator = InvokerLocator.validateLocator(locator);
 
1196
      if (!locator.getLocatorURI().equals(originalLocator.getLocatorURI())) {
 
1197
         log.debug(this + " original locator: " + originalLocator);
 
1198
         log.debug(this + " new locator:      " + locator);
 
1199
      }
 
1200
      
 
1201
      // need to update the locator key used in the invoker registry
 
1202
      AccessController.doPrivileged( new PrivilegedAction()
 
1203
      {
 
1204
         public Object run()
 
1205
         {
 
1206
            InvokerRegistry.updateServerInvokerLocator(originalLocator, locator);
 
1207
            return null;
 
1208
         }
 
1209
      });
 
1210
   }
 
1211
   
 
1212
   protected void setupHomes(Map config) throws Exception
 
1213
   {
 
1214
      // First try to find  address(es) using the new multihome facility.
 
1215
      if (locator.isMultihome())
 
1216
      {
 
1217
         connectHomes = locator.getConnectHomeList();
 
1218
         Object o = config.get(InvokerLocator.CONNECT_HOMES_KEY);
 
1219
         if (o != null)
 
1220
         {
 
1221
            if (o instanceof Collection)
 
1222
               connectHomes.addAll((Collection) o);
 
1223
            else if (o instanceof String)
 
1224
               connectHomes.addAll(createHomeCollection((String) o));
 
1225
            else
 
1226
               log.warn(this + ": " + InvokerLocator.CONNECT_HOMES_KEY + " must be a collection or String: " + o);
 
1227
         }
 
1228
 
 
1229
         homes = locator.getHomeList();
 
1230
         o = config.get(InvokerLocator.HOMES_KEY);
 
1231
         if (o != null)
 
1232
         {
 
1233
            if (o instanceof Collection)
 
1234
               homes.addAll((Collection) o);
 
1235
            else if (o instanceof String)
 
1236
               homes.addAll(createHomeCollection((String) o));
 
1237
            else
 
1238
               log.warn(this + ": " + InvokerLocator.HOMES_KEY + " must be a collection or String: " + o);
 
1239
         }
 
1240
 
 
1241
         if (!homes.isEmpty() && connectHomes.isEmpty())
 
1242
            connectHomes.addAll(homes);
 
1243
 
 
1244
         assignPorts();
 
1245
         return;
 
1246
      }
 
1247
      
 
1248
      // If no bind address(es) found, try the old way.
 
1249
      String locatorhost = locator.getHost();
 
1250
      InetAddress addr = null;
 
1251
      if(locatorhost != null)
 
1252
      {
 
1253
         addr = getAddressByName(locatorhost);
 
1254
      }
 
1255
      else
 
1256
      {
 
1257
         addr = getLocalHost();
 
1258
      }
 
1259
 
 
1260
      int port = locator.getPort();
 
1261
      if(port <= 0)
 
1262
      {
 
1263
         port = assignPort();
 
1264
      }
 
1265
 
 
1266
      // set the bind address
 
1267
      serverBindAddress = (String)config.get(SERVER_BIND_ADDRESS_KEY);
 
1268
      clientConnectAddress = (String)config.get(CLIENT_CONNECT_ADDRESS_KEY);
 
1269
      if(serverBindAddress == null)
 
1270
      {
 
1271
         if(clientConnectAddress != null)
 
1272
         {
 
1273
            // can't use uri address, as is for client only
 
1274
            serverBindAddress = getLocalHost().getHostAddress();
 
1275
         }
 
1276
         else
 
1277
         {
 
1278
            serverBindAddress = addr.getHostAddress();
 
1279
         }
 
1280
      }
 
1281
 
 
1282
      // set the bind port
 
1283
      String serverBindPortString = (String)config.get(SERVER_BIND_PORT_KEY);
 
1284
      String clientConnectPortString = (String)config.get(CLIENT_CONNECT_PORT_KEY);
 
1285
      if(clientConnectPortString != null)
 
1286
      {
 
1287
         try
 
1288
         {
 
1289
            clientConnectPort = Integer.parseInt(clientConnectPortString);
 
1290
         }
 
1291
         catch(NumberFormatException e)
 
1292
         {
 
1293
            throw new InvalidConfigurationException("Can not set client bind port because can " +
 
1294
               "not convert given value (" + clientConnectPortString + ") to a number.");
 
1295
         }
 
1296
      }
 
1297
      if(serverBindPortString != null)
 
1298
      {
 
1299
         try
 
1300
         {
 
1301
            serverBindPort = Integer.parseInt(serverBindPortString);
 
1302
            if(serverBindPort <= 0)
 
1303
            {
 
1304
               serverBindPort = assignPort();
 
1305
            }
 
1306
 
 
1307
         }
 
1308
         catch(NumberFormatException e)
 
1309
         {
 
1310
            throw new InvalidConfigurationException("Can not set server bind port because can " +
 
1311
               "not convert given value (" + serverBindPortString + ") to a number.");
 
1312
         }
 
1313
      }
 
1314
      else
 
1315
      {
 
1316
         if(clientConnectPort > 0)
 
1317
         {
 
1318
            // can't use uri port, as is for client only
 
1319
            serverBindPort = PortUtil.findFreePort(locator.getHost());
 
1320
         }
 
1321
         else
 
1322
         {
 
1323
            serverBindPort = port;
 
1324
         }
 
1325
      }
 
1326
      
 
1327
      Home h = new Home(serverBindAddress, serverBindPort);
 
1328
      homes.add(h);
 
1329
      connectHomes.add(h);
 
1330
   }
 
1331
   
 
1332
   protected Collection createHomeCollection(String s)
 
1333
   {
 
1334
      ArrayList homes = new ArrayList();
 
1335
      StringTokenizer st = new StringTokenizer(s, "!");
 
1336
      
 
1337
      while (st.hasMoreTokens())
 
1338
      {
 
1339
         String token = st.nextToken();
 
1340
         int p = token.indexOf(':');
 
1341
         String host = null;
 
1342
         String portString = null;
 
1343
         if (p < 0)
 
1344
         {
 
1345
            host = token;
 
1346
            portString = "-1";
 
1347
         }
 
1348
         else
 
1349
         {
 
1350
            host = token.substring(0, p);
 
1351
            portString = token.substring(p + 1);
 
1352
         }
 
1353
 
 
1354
         int port = -1;
 
1355
         try
 
1356
         {
 
1357
            port = Integer.parseInt(portString);
 
1358
         }
 
1359
         catch (Exception e)
 
1360
         {
 
1361
            log.warn("invalid port value in Home: " + token + ", using -1");
 
1362
         }
 
1363
         Home home = new Home(host, port);
 
1364
         homes.add(home);
 
1365
      }
 
1366
      
 
1367
      return homes;
 
1368
   }
 
1369
   
 
1370
   protected int assignPort() throws IOException
 
1371
   {
 
1372
      int port;
 
1373
      port = PortUtil.findFreePort(locator.getHost());
 
1374
 
 
1375
      // re-write locator since the port is different
 
1376
      final InvokerLocator newLocator = new InvokerLocator(locator.getProtocol(), locator.getHost(), port,
 
1377
                                                     locator.getPath(), locator.getParameters());
 
1378
 
 
1379
      // need to update the locator key used in the invoker registry
 
1380
      AccessController.doPrivileged( new PrivilegedAction()
 
1381
      {
 
1382
         public Object run()
 
1383
         {
 
1384
            InvokerRegistry.updateServerInvokerLocator(locator, newLocator);
 
1385
            return null;
 
1386
         }
 
1387
      });
 
1388
      
 
1389
      this.locator = newLocator;
 
1390
      return port;
 
1391
   }
 
1392
   
 
1393
   protected void assignPorts() throws IOException
 
1394
   {  
 
1395
      boolean changed = false;
 
1396
      List list = homes;
 
1397
      for (int i = 0; i < list.size(); i++)
 
1398
      {
 
1399
         Home home = (Home) list.get(i);
 
1400
         if (home.port < 0)
 
1401
         {
 
1402
            changed = true;
 
1403
            try
 
1404
            {
 
1405
               home.port = PortUtil.findFreePort(home.host);
 
1406
            }
 
1407
            catch (Exception e)
 
1408
            {
 
1409
               if (trace) log.trace(this + " unable to find free port for: " + home.host);
 
1410
            }
 
1411
         }
 
1412
      }
 
1413
      if (changed)
 
1414
         locator.setHomeList(homes);
 
1415
      
 
1416
      if (connectHomes.size() == homes.size())
 
1417
      {
 
1418
         changed = false;
 
1419
         for (int i = 0; i < homes.size(); i++)
 
1420
         {
 
1421
            Home home = (Home) connectHomes.get(i);
 
1422
            if (home.port < 0)
 
1423
            {
 
1424
               changed = true;
 
1425
               home.port = ((Home) homes.get(i)).port;
 
1426
            }
 
1427
         }
 
1428
         if (changed)
 
1429
            locator.setConnectHomeList(connectHomes);
 
1430
      }
 
1431
   }
 
1432
 
 
1433
   protected ServerSocketFactory createServerSocketFactory() throws IOException
 
1434
   {
 
1435
      // only want to look at config if server socket factory has not already been set
 
1436
      if(serverSocketFactory == null)
 
1437
      {
 
1438
         Object obj = configuration.get(Remoting.CUSTOM_SERVER_SOCKET_FACTORY);
 
1439
         if (obj != null)
 
1440
         {
 
1441
            if (obj instanceof ServerSocketFactory)
 
1442
            {
 
1443
               serverSocketFactory = (ServerSocketFactory)obj;
 
1444
            }
 
1445
            else
 
1446
            {
 
1447
               throw new RuntimeException("Can not set custom server socket factory (" + obj +
 
1448
                                          ") as is not of type javax.net.SocketFactory");
 
1449
            }
 
1450
         }
 
1451
 
 
1452
         if (serverSocketFactory == null)
 
1453
         {
 
1454
            // TODO: -TME This is another big hack because of dependancy on JMX within configuration
 
1455
            // Have to wait till the mbean server is set before can actually set the server socket
 
1456
            // factory since it is an mbean (new server's DI will fix all this). Would prefer this
 
1457
            // to be in the setup() method...
 
1458
            // Also, I can't cast the mbean proxy directly to ServerSocketFactory because it is not
 
1459
            // an interface. Therefore, have to require that ServerSocketFactoryMBean is used.
 
1460
 
 
1461
            String serverSocketFactoryString = (String)configuration.get(SERVER_SOCKET_FACTORY);
 
1462
            if(serverSocketFactoryString != null && serverSocketFactoryString.length() > 0)
 
1463
            {
 
1464
               try
 
1465
               {
 
1466
                  if(serverSocketFactoryString != null)
 
1467
                  {
 
1468
                     MBeanServer server = getMBeanServer();
 
1469
                     ObjectName serverSocketFactoryObjName =
 
1470
                        new ObjectName(serverSocketFactoryString);
 
1471
 
 
1472
                     if(server != null)
 
1473
                     {
 
1474
                        try
 
1475
                        {
 
1476
                           ServerSocketFactoryMBean serverSocketFactoryMBean =
 
1477
                              (ServerSocketFactoryMBean)MBeanServerInvocationHandler.
 
1478
                                 newProxyInstance(server, serverSocketFactoryObjName,
 
1479
                                                  ServerSocketFactoryMBean.class, false);
 
1480
                           serverSocketFactory =
 
1481
                              new ServerSocketFactoryWrapper(serverSocketFactoryMBean);
 
1482
                        }
 
1483
                        catch(Exception e)
 
1484
                        {
 
1485
                           log.debug("Error creating mbean proxy for server socket factory " +
 
1486
                              "for object name " + serverSocketFactoryObjName + ". " +
 
1487
                              "Will try by class name.");
 
1488
                        }
 
1489
                     }
 
1490
                     else
 
1491
                     {
 
1492
                        log.debug("The 'serverSocketFactory' attribute was set with a value, " +
 
1493
                           "but the MBeanServer reference is null.");
 
1494
                     }
 
1495
                  }
 
1496
               }
 
1497
               catch(MalformedObjectNameException e)
 
1498
               {
 
1499
                  log.debug("Attibute value (" + serverSocketFactoryString + ") passed is not a " +
 
1500
                     "valid ObjectName. Can not look up if is a mbean service. Will try by classname.");
 
1501
               }
 
1502
               catch(NullPointerException e)
 
1503
               {
 
1504
                  log.debug("Could not set up the server socket factory as a mbean service " +
 
1505
                     "due to null pointer exception.");
 
1506
               }
 
1507
 
 
1508
               // couldn't create from object name for mbean service, will try by class name
 
1509
               if(serverSocketFactory == null)
 
1510
               {
 
1511
                  try
 
1512
                  {
 
1513
                     Class cl = ClassLoaderUtility.loadClass(serverSocketFactoryString, getClass());
 
1514
 
 
1515
                     Constructor serverSocketConstructor = null;
 
1516
                     serverSocketConstructor = cl.getConstructor(new Class[]{});
 
1517
                     serverSocketFactory =
 
1518
                        (ServerSocketFactory)serverSocketConstructor.newInstance(new Object[] {});
 
1519
                     log.trace("ServerSocketFactory (" + serverSocketFactoryString + ") loaded");
 
1520
                  }
 
1521
                  catch(Exception e)
 
1522
                  {
 
1523
                     log.debug("Could not create server socket factory by classname (" +
 
1524
                        serverSocketFactoryString + ").  Error message: " + e.getMessage());
 
1525
                  }
 
1526
               }
 
1527
            }
 
1528
         }
 
1529
      }
 
1530
 
 
1531
      if (serverSocketFactory == null && needsCustomSSLConfiguration(configuration))
 
1532
      {
 
1533
         try
 
1534
         {
 
1535
            SSLSocketBuilder socketBuilder = new SSLSocketBuilder(configuration);
 
1536
            socketBuilder.setUseSSLServerSocketFactory(false);
 
1537
            serverSocketFactory = socketBuilder.createSSLServerSocketFactory();
 
1538
         }
 
1539
         catch (IOException e)
 
1540
         {
 
1541
            throw new RuntimeException("Unable to create customized SSL socket factory", e);
 
1542
         }
 
1543
      }
 
1544
 
 
1545
      if(serverSocketFactory == null)
 
1546
      {
 
1547
         log.debug(this + " did not find server socket factory configuration as mbean service " +
 
1548
            "or classname. Creating default server socket factory.");
 
1549
 
 
1550
         serverSocketFactory = getDefaultServerSocketFactory();
 
1551
      }
 
1552
 
 
1553
      log.debug(this + " created server socket factory " + serverSocketFactory);
 
1554
 
 
1555
      serverSocketFactory = wrapServerSocketFactory(serverSocketFactory, configuration);
 
1556
      return serverSocketFactory;
 
1557
 
 
1558
   }
 
1559
 
 
1560
   protected boolean justNeedsSSLClientMode(Map configuration)
 
1561
   {
 
1562
      if (configuration.size() == 1 &&
 
1563
         configuration.containsKey(SSLSocketBuilder.REMOTING_SERVER_SOCKET_USE_CLIENT_MODE))
 
1564
      {
 
1565
         String useClientModeString =
 
1566
            (String)configuration.get(SSLSocketBuilder.REMOTING_SERVER_SOCKET_USE_CLIENT_MODE);
 
1567
         return Boolean.valueOf(useClientModeString).booleanValue();
 
1568
      }
 
1569
 
 
1570
      if (configuration.size() == 1 &&
 
1571
         configuration.containsKey(SSLSocketBuilder.REMOTING_SOCKET_USE_CLIENT_MODE))
 
1572
      {
 
1573
         String useClientModeString =
 
1574
            (String)configuration.get(SSLSocketBuilder.REMOTING_SOCKET_USE_CLIENT_MODE);
 
1575
         return Boolean.valueOf(useClientModeString).booleanValue();
 
1576
      }
 
1577
 
 
1578
      if (configuration.size() == 2
 
1579
            && configuration.containsKey(SSLSocketBuilder.REMOTING_SERVER_SOCKET_USE_CLIENT_MODE)
 
1580
            && configuration.containsKey(SSLSocketBuilder.REMOTING_SOCKET_USE_CLIENT_MODE))
 
1581
      {
 
1582
         String useClientModeString =
 
1583
            (String)configuration.get(SSLSocketBuilder.REMOTING_SERVER_SOCKET_USE_CLIENT_MODE);
 
1584
         return Boolean.valueOf(useClientModeString).booleanValue();
 
1585
      }
 
1586
 
 
1587
      return false;
 
1588
   }
 
1589
 
 
1590
   /**
 
1591
    * Gets the default server socket factory to use for the server invoker. The intention is this
 
1592
    * method will be overridden by sub-classes for their specific defaults.
 
1593
    */
 
1594
   protected ServerSocketFactory getDefaultServerSocketFactory() throws IOException
 
1595
   {
 
1596
      return ServerSocketFactory.getDefault();
 
1597
   }
 
1598
 
 
1599
   protected ServerSocketFactory wrapServerSocketFactory(ServerSocketFactory ssf, Map config)
 
1600
   {
 
1601
      if (config == null)
 
1602
      {
 
1603
         return ssf;
 
1604
      }
 
1605
 
 
1606
      Object o = config.get(Remoting.SOCKET_CREATION_SERVER_LISTENER);
 
1607
 
 
1608
      if (o == null)
 
1609
      {
 
1610
         return ssf;
 
1611
      }
 
1612
 
 
1613
      if (o instanceof SocketCreationListener)
 
1614
      {
 
1615
         return new CreationListenerServerSocketFactory(ssf, (SocketCreationListener) o);
 
1616
      }
 
1617
      else if (o instanceof String)
 
1618
      {
 
1619
         try
 
1620
         {
 
1621
            Class c = ClassLoaderUtility.loadClass((String) o, ServerInvoker.class);
 
1622
            SocketCreationListener listener = (SocketCreationListener)c.newInstance();
 
1623
            return new CreationListenerServerSocketFactory(ssf, listener);
 
1624
         }
 
1625
         catch (Exception e)
 
1626
         {
 
1627
            log.warn("unable to instantiate class: " + o, e);
 
1628
            return ssf;
 
1629
         }
 
1630
      }
 
1631
      else
 
1632
      {
 
1633
         log.warn("unrecognized type for socket creation server listener: " + o);
 
1634
         return ssf;
 
1635
      }
 
1636
   }
 
1637
 
 
1638
   /**
 
1639
    * Handles both internal and external invocations (internal meaning only to be used within
 
1640
    * remoting and external for ones that go to handlers.
 
1641
    */
 
1642
   protected Object handleInternalInvocation(InternalInvocation param,
 
1643
                                             InvocationRequest invocation,
 
1644
                                             ServerInvocationHandler handler) throws Throwable
 
1645
   {
 
1646
      Object result = null;
 
1647
      String methodName = param.getMethodName();
 
1648
 
 
1649
      if(trace) { log.trace("handling InternalInvocation where method name = " + methodName); }
 
1650
 
 
1651
      // check if the invocation is for callback handling
 
1652
      if(InternalInvocation.HANDLECALLBACK.equals(methodName))
 
1653
      {
 
1654
         String sessionId = ServerInvokerCallbackHandler.getId(invocation);
 
1655
         if(trace) { log.trace("ServerInvoker (" + this + ") is being asked to deliver callback on client callback handler with session id of " + sessionId + "."); }
 
1656
 
 
1657
         CallbackContainer callbackContainer = null;
 
1658
         
 
1659
         if (singleCallbackContainer != null)
 
1660
         {
 
1661
            callbackContainer = singleCallbackContainer;
 
1662
         }
 
1663
         else
 
1664
         {         
 
1665
            callbackContainer = (CallbackContainer)clientCallbackListener.get(sessionId);
 
1666
         }
 
1667
 
 
1668
         if(callbackContainer != null && callbackContainer.getCallbackHandler() != null)
 
1669
         {
 
1670
            Object[] params = param.getParameters();
 
1671
            
 
1672
            Callback callbackRequest = (Callback) params[0];
 
1673
                                    
 
1674
            Object obj = callbackContainer.getCallbackHandleObject();
 
1675
            
 
1676
            if (obj != null)
 
1677
            {
 
1678
               Map callbackHandleObject = callbackRequest.getReturnPayload();
 
1679
               
 
1680
               if(callbackHandleObject == null)
 
1681
               {
 
1682
                  callbackHandleObject = new HashMap();
 
1683
               }
 
1684
                              
 
1685
               //We only want to add it if it is not null otherwise is a redundant operation
 
1686
               callbackHandleObject.put(Callback.CALLBACK_HANDLE_OBJECT_KEY,
 
1687
                                        obj);
 
1688
               
 
1689
               callbackRequest.setReturnPayload(callbackHandleObject);
 
1690
            }
 
1691
            
 
1692
            InvokerCallbackHandler callbackHandler = callbackContainer.getCallbackHandler();
 
1693
            
 
1694
            callbackHandler.handleCallback(callbackRequest);
 
1695
         }
 
1696
         else
 
1697
         {
 
1698
            log.error("Could not find callback handler to call upon for handleCallback " +
 
1699
                      "where session id equals " + sessionId);
 
1700
         }
 
1701
      }
 
1702
      else if(InternalInvocation.ADDLISTENER.equals(methodName))
 
1703
      {
 
1704
         if(handler == null)
 
1705
         {
 
1706
            throw new InvalidConfigurationException(
 
1707
               "Can not accept a callback listener since there are no ServerInvocationHandlers " +
 
1708
               "registered. Please add via xml configuration or via the Connector's " +
 
1709
               "addInvocationHandler() method.");
 
1710
 
 
1711
         }
 
1712
         ServerInvokerCallbackHandler callbackHandler = getCallbackHandler(invocation);
 
1713
         if (registerCallbackListeners)
 
1714
         {
 
1715
            connectionNotifier.addListenerFirst(callbackHandler);
 
1716
            if(leasePeriod > 0)
 
1717
            {
 
1718
               leaseManagement = true;
 
1719
            }
 
1720
         }
 
1721
         handler.addListener(callbackHandler);
 
1722
      }
 
1723
      else if(InternalInvocation.REMOVELISTENER.equals(methodName))
 
1724
      {
 
1725
         ServerInvokerCallbackHandler callbackHandler = removeCallbackHandler(invocation);
 
1726
         if(callbackHandler != null)
 
1727
         {
 
1728
            if (registerCallbackListeners)
 
1729
            {
 
1730
//               connectionNotifier.removeListener(callbackHandler);
 
1731
               removeConnectionListener(callbackHandler);
 
1732
            }
 
1733
            
 
1734
            callbackHandler.destroy();
 
1735
            
 
1736
            if(handler == null)
 
1737
            {
 
1738
               throw new InvalidConfigurationException(
 
1739
                  "Can not remove a callback listener since there are no ServerInvocationHandlers " +
 
1740
                  "registered.  Please add via xml configuration or via the Connector's " +
 
1741
                  "addInvocationHandler() method.");
 
1742
            }
 
1743
            
 
1744
            handler.removeListener(callbackHandler);
 
1745
 
 
1746
            if(trace) { log.trace("ServerInvoker (" + this + ") removing server callback handler " + callbackHandler + "."); }
 
1747
         }
 
1748
         else
 
1749
         {
 
1750
            String sessionId = ServerInvokerCallbackHandler.getId(invocation);
 
1751
            throw new RuntimeException("Can not remove callback listener from target server with " +
 
1752
               "id of " + sessionId + " as it does not exist as a registered callback listener.");
 
1753
         }
 
1754
      }
 
1755
      else if(InternalInvocation.GETCALLBACKS.equals(methodName))
 
1756
      {
 
1757
         ServerInvokerCallbackHandler callbackHandler = getCallbackHandler(invocation);
 
1758
         if(trace) { log.trace("ServerInvoker (" + this + ") getting callbacks for callback handler " + callbackHandler + "."); }
 
1759
         result = callbackHandler.getCallbacks(invocation.getRequestPayload());
 
1760
      }
 
1761
      else if(InternalInvocation.ACKNOWLEDGECALLBACK.equals(methodName))
 
1762
      {
 
1763
         ServerInvokerCallbackHandler callbackHandler = getCallbackHandler(invocation);
 
1764
         if(trace) { log.trace("ServerInvoker (" + this + ") acknowledge callback on callback handler " + callbackHandler + "."); }
 
1765
         callbackHandler.acknowledgeCallbacks(param);
 
1766
      }
 
1767
      else if(InternalInvocation.ADDCLIENTLISTENER.equals(methodName))
 
1768
      {
 
1769
         String sessionId = ServerInvokerCallbackHandler.getId(invocation);
 
1770
         Object[] params = param.getParameters();
 
1771
 
 
1772
         // the only elements should be the callback handler and possibly the callback handle object
 
1773
         if(params == null || params.length < 0 || params.length > 3)
 
1774
         {
 
1775
            log.debug("Received addClientListener InternalInvocation, but getParameters() " +
 
1776
                      "returned: " + params);
 
1777
            throw new RuntimeException(
 
1778
               "InvokerCallbackHandler and callback handle object (optional) must be supplied as " +
 
1779
               "the only parameter objects within the InternalInvocation when calling " +
 
1780
               "addClientListener.");
 
1781
         }
 
1782
 
 
1783
         InvokerCallbackHandler callbackHandler = (InvokerCallbackHandler)params[0];
 
1784
         Object callbackHandleObject = params[1];
 
1785
         CallbackContainer callbackContainer =
 
1786
            new CallbackContainer(callbackHandler, callbackHandleObject);                          
 
1787
         
 
1788
         clientCallbackListener.put(sessionId, callbackContainer);
 
1789
         
 
1790
         //If there is only one CallbackContainer we store a direct reference to it to avoid
 
1791
         //unnecessary lookups - TLF
 
1792
         if (clientCallbackListener.size() == 1)
 
1793
         {
 
1794
            singleCallbackContainer = callbackContainer;
 
1795
         }
 
1796
         else
 
1797
         {
 
1798
            singleCallbackContainer = null;
 
1799
         }
 
1800
 
 
1801
         log.debug("ServerInvoker (" + this + ") added client callback handler " + callbackHandler +
 
1802
            " with session id of " + sessionId + " and callback handle object of " +
 
1803
            callbackHandleObject + ".");
 
1804
 
 
1805
      }
 
1806
      else if(InternalInvocation.REMOVECLIENTLISTENER.equals(methodName))
 
1807
      {
 
1808
         String sessionId = ServerInvokerCallbackHandler.getId(invocation);
 
1809
 
 
1810
         log.debug("ServerInvoker (" + this + ") removing client callback handler with session " +
 
1811
            "id of " + sessionId + ".");
 
1812
 
 
1813
         Object cbo = clientCallbackListener.remove(sessionId);
 
1814
         if(cbo == null)
 
1815
         {
 
1816
            throw new RuntimeException(
 
1817
               "Can not remove callback listener from callback server with id of " + sessionId +
 
1818
               " as it does not exist as a registered callback listener.");
 
1819
         }
 
1820
         //If there is only one CallbackContainer we store a direct reference to it to avoid
 
1821
         //unnecessary lookups - TLF
 
1822
         if (clientCallbackListener.size() == 1)
 
1823
         {
 
1824
            singleCallbackContainer =
 
1825
               (CallbackContainer)clientCallbackListener.values().iterator().next();
 
1826
         }
 
1827
         else
 
1828
         {
 
1829
            singleCallbackContainer = null;
 
1830
         }
 
1831
 
 
1832
      }
 
1833
      
 
1834
      else if(InternalInvocation.ADDSTREAMCALLBACK.equals(methodName))
 
1835
      {
 
1836
         StreamHandler streamHandler = getStreamHandler(invocation);
 
1837
         if(handler instanceof StreamInvocationHandler)
 
1838
         {
 
1839
            InternalInvocation inv = (InternalInvocation)invocation.getParameter();
 
1840
            // second parameter should be the param payload
 
1841
            result = ((StreamInvocationHandler)handler).
 
1842
               handleStream(streamHandler, (InvocationRequest)inv.getParameters()[1]);
 
1843
         }
 
1844
         else
 
1845
         {
 
1846
            log.debug("Client request is an InputStream, but the registered handlers do not " +
 
1847
                      "implement the StreamInvocationHandler interface, so could not process call.");
 
1848
            throw new RuntimeException(
 
1849
               "No handler registered of proper type (StreamInvocationHandler).");
 
1850
         }
 
1851
      }
 
1852
      else if (InternalInvocation.ECHO.equals(methodName))
 
1853
      {
 
1854
         Object response = null;
 
1855
         Object[] objects = param.getParameters();
 
1856
         if (objects != null && objects.length > 0)
 
1857
            response = objects[0];
 
1858
         
 
1859
         if (trace)
 
1860
         {
 
1861
            log.trace(this + " echoing " + response);
 
1862
         }
 
1863
         return response;
 
1864
      }
 
1865
      else
 
1866
      {
 
1867
         log.debug("Error processing InternalInvocation.  Unable to process method " +
 
1868
                   methodName + ". Please make sure this should be an InternalInvocation.");
 
1869
         throw new RuntimeException(
 
1870
            "Error processing InternalInvocation. Unable to process method " + methodName);
 
1871
      }
 
1872
      return result;
 
1873
   }
 
1874
  
 
1875
   protected ServerInvocationHandler findInvocationHandler(String subsystem)
 
1876
   {
 
1877
      ServerInvocationHandler handler = null;
 
1878
 
 
1879
      if (singleHandler != null)
 
1880
      {
 
1881
         handler = singleHandler;
 
1882
      }
 
1883
      else
 
1884
      {               
 
1885
         if (subsystem != null)
 
1886
         {
 
1887
            handler = (ServerInvocationHandler)handlers.get(subsystem.toUpperCase());
 
1888
         }
 
1889
         else
 
1890
         {
 
1891
            // subsystem not specified, so will hope for a default one being set
 
1892
            if (!handlers.isEmpty())
 
1893
            {
 
1894
               if (trace) { log.trace(this + " handling invocation with no subsystem explicitely specified, using the default handler"); }
 
1895
               handler = (ServerInvocationHandler)handlers.values().iterator().next();
 
1896
            }
 
1897
         }
 
1898
      }
 
1899
      return handler;
 
1900
   }
 
1901
   
 
1902
   /**
 
1903
    * Called prior to an invocation.
 
1904
    * TODO is sending in the arg appropriate?
 
1905
    */
 
1906
   protected void preProcess(String sessionId, ClassBytes arg, Map payload, InvokerLocator locator)
 
1907
   {
 
1908
   }
 
1909
 
 
1910
   /**
 
1911
    * Called after an invocation.
 
1912
    * TODO is sending in the arg appropriate?
 
1913
    */
 
1914
   protected void postProcess(String sessionId, Object param, Map payload, InvokerLocator locator)
 
1915
   {
 
1916
   }
 
1917
 
 
1918
   // Private --------------------------------------------------------------------------------------
 
1919
 
 
1920
   private ThreadPool createThreadPoolProxy(ObjectName objName)
 
1921
   {
 
1922
      ThreadPool pool;
 
1923
      MBeanServer server = getMBeanServer();
 
1924
      if(server != null)
 
1925
      {
 
1926
         ThreadPoolMBean poolMBean = (ThreadPoolMBean)MBeanServerInvocationHandler.
 
1927
            newProxyInstance(server, objName, ThreadPoolMBean.class, false);
 
1928
 
 
1929
         pool = poolMBean.getInstance();
 
1930
      }
 
1931
      else
 
1932
      {
 
1933
         throw new RuntimeException("Can not register MBean ThreadPool as the ServerInvoker " +
 
1934
            "has not been registered with a MBeanServer.");
 
1935
      }
 
1936
      return pool;
 
1937
   }
 
1938
 
 
1939
   //TODO: -TME getting of datatype is duplicated in both the RemoteClientInvoker and the ServerInvoker
 
1940
   private String getDataType(InvokerLocator locator)
 
1941
   {
 
1942
      String type = null;
 
1943
 
 
1944
      if(locator != null)
 
1945
      {
 
1946
         Map params = locator.getParameters();
 
1947
         if(params != null)
 
1948
         {
 
1949
            type = (String) params.get(InvokerLocator.DATATYPE);
 
1950
         }
 
1951
      }
 
1952
      return type;
 
1953
   }
 
1954
 
 
1955
   private void terminateLease(InvocationRequest invocation)
 
1956
   {
 
1957
      if (invocation != null)
 
1958
      {
 
1959
         // clientSessionId == MicroRemoteClientInvoker.invokerSessionID.
 
1960
         String clientSessionId = invocation.getSessionId();
 
1961
         Lease clientLease = (Lease)clientLeases.get(clientSessionId);
 
1962
 
 
1963
         if (clientLease != null)
 
1964
         {
 
1965
            boolean clientOnlyTerminated = false;
 
1966
            // now have to determine if is just Client that disconnected
 
1967
            // or if all Clients disconnected, thus the client invoker
 
1968
            // is also disconnected as well.
 
1969
            Map reqMap = invocation.getRequestPayload();
 
1970
            if (reqMap != null)
 
1971
            {
 
1972
               Object holderObj = reqMap.get(ClientHolder.CLIENT_HOLDER_KEY);
 
1973
               if (holderObj != null && holderObj instanceof ClientHolder)
 
1974
               {
 
1975
                  // just a client that disconnected, so only need to terminate lease for
 
1976
                  // that particular client (by client session id).
 
1977
                  if (trace) log.trace("terminating client lease: " + clientSessionId);
 
1978
                  ClientHolder holder = (ClientHolder) holderObj;
 
1979
                  clientLease.terminateLease(holder.getSessionId());
 
1980
                  clientOnlyTerminated = true;
 
1981
               }
 
1982
            }
 
1983
 
 
1984
            // now see if client invoker needs to be terminated
 
1985
            if (!clientOnlyTerminated)
 
1986
            {
 
1987
               if (trace) log.trace("terminating invoker lease: " + clientSessionId);
 
1988
               clientLease.terminateLease(clientSessionId);
 
1989
               clientLeases.remove(clientSessionId);
 
1990
            }
 
1991
         }
 
1992
         else
 
1993
         {
 
1994
             String type = "invoker";
 
1995
                    Map reqMap = invocation.getRequestPayload();
 
1996
             if (reqMap != null)
 
1997
             {
 
1998
                Object holderObj = reqMap.get(ClientHolder.CLIENT_HOLDER_KEY);
 
1999
                if (holderObj != null && holderObj instanceof ClientHolder)
 
2000
                {
 
2001
                        type = "client";
 
2002
                }
 
2003
             }
 
2004
             log.debug("Asked to terminate " + type + " lease for invoker session id "
 
2005
                       + clientSessionId + ", but lease for this id could not be found." +"" +
 
2006
                       "Probably has been removed due to connection failure.");
 
2007
         }
 
2008
      }
 
2009
   }
 
2010
 
 
2011
   private void updateClientLease(InvocationRequest invocation)
 
2012
   {
 
2013
      if(invocation != null)
 
2014
      {
 
2015
         String clientSessionId = invocation.getSessionId();
 
2016
         if (invocation.getRequestPayload() != null)
 
2017
         {
 
2018
            // Remove per invocation timeout.
 
2019
            invocation.getRequestPayload().remove(TIMEOUT);
 
2020
         }
 
2021
         if(clientSessionId != null)
 
2022
         {
 
2023
            if(trace) { log.trace("Getting lease for invoker session id: " + clientSessionId); }
 
2024
 
 
2025
            Lease clientLease = (Lease)clientLeases.get(clientSessionId);
 
2026
            if(clientLease == null)
 
2027
            {
 
2028
               Lease newClientLease = new Lease(clientSessionId, leasePeriod,
 
2029
                                                locator.getLocatorURI(),
 
2030
                                                invocation.getRequestPayload(),
 
2031
                                                connectionNotifier,
 
2032
                                                clientLeases);
 
2033
 
 
2034
               clientLeases.put(clientSessionId, newClientLease);
 
2035
               newClientLease.startLease();
 
2036
               
 
2037
               if(trace) { log.trace("No lease established for invoker session id (" + clientSessionId + 
 
2038
                                    "), so starting a new one:" + newClientLease); }
 
2039
            }
 
2040
            else
 
2041
            {
 
2042
               if (useClientConnectionIdentity)
 
2043
               {
 
2044
                  String leasePingerId = (String) invocation.getRequestPayload().get(LeasePinger.LEASE_PINGER_ID);;
 
2045
                  if (leasePingerId == null || leasePingerId.equals(clientLease.getLeasePingerId()))
 
2046
                  {
 
2047
                     // including request payload from invocation as may contain updated list of clients.
 
2048
                     if (trace) log.trace(clientLease + " matches: leasePingerId: " + leasePingerId);
 
2049
                     clientLease.updateLease(leasePeriod, invocation.getRequestPayload());
 
2050
                     if(trace) { log.trace("Updated lease for invoker session id (" + clientSessionId + ")"); }
 
2051
                  }
 
2052
                  else
 
2053
                  {
 
2054
                     if (trace) log.trace(clientLease + " does not match: leasePingerId: " + leasePingerId);
 
2055
                     if (trace) log.trace("terminating invoker lease: " + clientLease);
 
2056
                     clientLease.terminateLeaseUponFailure(clientSessionId);
 
2057
                     clientLeases.remove(clientSessionId);
 
2058
 
 
2059
                     Lease newClientLease = new Lease(clientSessionId, leasePeriod,
 
2060
                                                      locator.getLocatorURI(),
 
2061
                                                      invocation.getRequestPayload(),
 
2062
                                                      connectionNotifier,
 
2063
                                                      clientLeases);
 
2064
 
 
2065
                     clientLeases.put(clientSessionId, newClientLease);
 
2066
                     newClientLease.startLease();
 
2067
 
 
2068
                     if(trace) { log.trace("starting a new lease:" + newClientLease); }
 
2069
                  }
 
2070
               }
 
2071
               else
 
2072
               {
 
2073
                  // including request payload from invocation as may contain updated list of clients.
 
2074
                  clientLease.updateLease(leasePeriod, invocation.getRequestPayload());
 
2075
 
 
2076
                  if(trace) { log.trace("Updated lease for client session id (" + clientSessionId + ")"); }
 
2077
               }
 
2078
            }
 
2079
         }
 
2080
      }
 
2081
   }
 
2082
 
 
2083
   private boolean checkForClientLease(String invokerSessionId)
 
2084
   {
 
2085
      if(leaseManagement && invokerSessionId != null)
 
2086
      {
 
2087
         if(trace) { log.trace("Checking lease for invoker session id: " + invokerSessionId); }
 
2088
 
 
2089
         Lease clientLease = (Lease)clientLeases.get(invokerSessionId);
 
2090
         if(clientLease == null)
 
2091
         {
 
2092
            if(trace) { log.trace("No lease established for invoker session id (" + invokerSessionId + ")"); }
 
2093
            return false;
 
2094
         }
 
2095
         else
 
2096
         {
 
2097
            if(trace) { log.trace("Found lease for invoker session id (" + invokerSessionId + ")"); }
 
2098
            return true;
 
2099
         }
 
2100
      }
 
2101
 
 
2102
      return false;
 
2103
   }
 
2104
 
 
2105
   /**
 
2106
    * Takes the real invocation from the client out of the OnewayInvocation and then executes the
 
2107
    * invoke() with the real invocation on a seperate thread.
 
2108
    */
 
2109
   private void handleOnewayInvocation(OnewayInvocation onewayInvocation,
 
2110
                                       InvocationRequest invocation) throws Throwable
 
2111
   {
 
2112
      Object[] objs = onewayInvocation.getParameters();
 
2113
 
 
2114
      // The oneway invocation should contain the real param as it's only param in parameter array
 
2115
      Object realParam = objs[0];
 
2116
      invocation.setParameter(realParam);
 
2117
 
 
2118
      final InvocationRequest newInvocation = invocation;
 
2119
 
 
2120
      ThreadPool executor = getOnewayThreadPool();
 
2121
      Runnable onewayRun = new Runnable()
 
2122
      {
 
2123
         public void run()
 
2124
         {
 
2125
            try
 
2126
            {
 
2127
               invoke(newInvocation);
 
2128
            }
 
2129
            catch(Throwable e)
 
2130
            {
 
2131
               // throw away exception since can't get it back to original caller
 
2132
               log.error("Error executing server oneway invocation request: " + newInvocation, e);
 
2133
            }
 
2134
         }
 
2135
      };
 
2136
 
 
2137
      if(trace) { log.trace(this + " placing " + invocation + " in onewayThreadPool"); }
 
2138
      executor.run(onewayRun);
 
2139
   }
 
2140
 
 
2141
   private StreamHandler getStreamHandler(InvocationRequest invocation) throws Exception
 
2142
   {
 
2143
      InternalInvocation inv = (InternalInvocation)invocation.getParameter();
 
2144
      String locator = (String)inv.getParameters()[0];
 
2145
      return new StreamHandler(locator);
 
2146
   }
 
2147
 
 
2148
   private ServerInvokerCallbackHandler getCallbackHandler(InvocationRequest invocation)
 
2149
      throws Exception
 
2150
   {
 
2151
      ServerInvokerCallbackHandler callbackHandler = null;
 
2152
      String id = ServerInvokerCallbackHandler.getId(invocation);
 
2153
      synchronized(callbackHandlers)
 
2154
      {
 
2155
         callbackHandler = (ServerInvokerCallbackHandler)callbackHandlers.get(id);
 
2156
 
 
2157
         // if does not exist, create it
 
2158
         if(callbackHandler == null)
 
2159
         {
 
2160
            callbackHandler = new ServerInvokerCallbackHandler(invocation, getLocator(), this);
 
2161
            callbackHandlers.put(id, callbackHandler);
 
2162
         }
 
2163
      }
 
2164
 
 
2165
      callbackHandler.connect();
 
2166
      if(trace) { log.trace("ServerInvoker (" + this + ") adding server callback handler " + callbackHandler + " with id of " + id + "."); }
 
2167
      return callbackHandler;
 
2168
   }
 
2169
 
 
2170
   public ServerInvokerCallbackHandler removeCallbackHandler(InvocationRequest invocation)
 
2171
   {
 
2172
      String id = ServerInvokerCallbackHandler.getId(invocation);
 
2173
      ServerInvokerCallbackHandler callbackHandler = null;
 
2174
 
 
2175
      synchronized(callbackHandlers)
 
2176
      {
 
2177
         callbackHandler = (ServerInvokerCallbackHandler) callbackHandlers.remove(id);
 
2178
      }
 
2179
      log.debug(this + " removed " + callbackHandler);
 
2180
      return callbackHandler;
 
2181
   }
 
2182
   
 
2183
   public void shutdownCallbackHandler(ServerInvokerCallbackHandler callbackHandler, InvocationRequest invocation)
 
2184
   {
 
2185
      removeCallbackHandler(invocation);
 
2186
      if (registerCallbackListeners)
 
2187
      {
 
2188
         removeConnectionListener(callbackHandler);
 
2189
      }
 
2190
      ServerInvocationHandler handler = findInvocationHandler(invocation.getSessionId());
 
2191
      if (handler != null)
 
2192
      {
 
2193
         handler.removeListener(callbackHandler);
 
2194
         if(trace) { log.trace(this + " removing server callback handler " + callbackHandler + "."); }
 
2195
      }
 
2196
      else
 
2197
      {
 
2198
         log.debug(this + " cannot remove " + callbackHandler + ": associated ServerInvocationHandler not longer exists");
 
2199
      }
 
2200
   }
 
2201
 
 
2202
   // Inner classes --------------------------------------------------------------------------------
 
2203
 
 
2204
   public static class InvalidStateException extends Exception
 
2205
   {
 
2206
      public InvalidStateException(String msg)
 
2207
      {
 
2208
         super(msg);
 
2209
      }
 
2210
   }
 
2211
 
 
2212
   private class CallbackContainer
 
2213
   {
 
2214
      private InvokerCallbackHandler handler;
 
2215
      private Object handleObject;
 
2216
 
 
2217
      public CallbackContainer(InvokerCallbackHandler handler, Object handleObject)
 
2218
      {
 
2219
         this.handler = handler;
 
2220
         this.handleObject = handleObject;
 
2221
      }
 
2222
 
 
2223
      public InvokerCallbackHandler getCallbackHandler()
 
2224
      {
 
2225
         return handler;
 
2226
      }
 
2227
 
 
2228
      public Object getCallbackHandleObject()
 
2229
      {
 
2230
         return handleObject;
 
2231
      }
 
2232
   }
 
2233
   
 
2234
   static private InetAddress getLocalHost() throws UnknownHostException
 
2235
   {
 
2236
      if (SecurityUtility.skipAccessControl())
 
2237
      {
 
2238
         return doGetLocalHost();
 
2239
      }
 
2240
 
 
2241
      try
 
2242
      {
 
2243
         return (InetAddress) AccessController.doPrivileged( new PrivilegedExceptionAction()
 
2244
         {
 
2245
            public Object run() throws UnknownHostException
 
2246
            {
 
2247
               return doGetLocalHost();
 
2248
            }
 
2249
         });
 
2250
      }
 
2251
      catch (PrivilegedActionException e)
 
2252
      {
 
2253
         throw (UnknownHostException) e.getCause();
 
2254
      }
 
2255
   }
 
2256
   
 
2257
   static private InetAddress doGetLocalHost() throws UnknownHostException
 
2258
   {
 
2259
      if (LOCAL_HOST != null)
 
2260
      {
 
2261
         return LOCAL_HOST;
 
2262
      }
 
2263
 
 
2264
      try
 
2265
      {
 
2266
         return InetAddress.getLocalHost();
 
2267
      }
 
2268
      catch (UnknownHostException e)
 
2269
      {
 
2270
         return InetAddress.getByName("127.0.0.1");
 
2271
      }
 
2272
   }
 
2273
   
 
2274
   static private InetAddress getAddressByName(final String host) throws UnknownHostException
 
2275
   {
 
2276
      if (SecurityUtility.skipAccessControl())
 
2277
      {
 
2278
         return InetAddress.getByName(host);
 
2279
      }
 
2280
      
 
2281
      try
 
2282
      {
 
2283
         return (InetAddress)AccessController.doPrivileged( new PrivilegedExceptionAction()
 
2284
         {
 
2285
            public Object run() throws IOException
 
2286
            {
 
2287
               return InetAddress.getByName(host);
 
2288
            }
 
2289
         });
 
2290
      }
 
2291
      catch (PrivilegedActionException e)
 
2292
      {
 
2293
         throw (UnknownHostException) e.getCause();
 
2294
      }
 
2295
   }
 
2296
}