~ubuntu-branches/ubuntu/raring/libjboss-remoting-java/raring

« back to all changes in this revision

Viewing changes to src/main/org/jboss/remoting/transport/socket/SocketServerInvoker.java

  • Committer: Package Import Robot
  • Author(s): Torsten Werner
  • Date: 2011-09-09 14:01:03 UTC
  • mfrom: (1.1.6 upstream)
  • Revision ID: package-import@ubuntu.com-20110909140103-hqokx61534tas9rg
Tags: 2.5.3.SP1-1
* Newer but not newest upstream release. Do not build samples.
* Change debian/watch to upstream's svn repo.
* Add patch to fix compile error caused by tomcat update.
  (Closes: #628303)
* Switch to source format 3.0.
* Switch to debhelper level 7.
* Remove useless Depends.
* Update Standards-Version: 3.9.2.
* Update README.source.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
* JBoss, Home of Professional Open Source
 
3
* Copyright 2005, JBoss Inc., and individual contributors as indicated
 
4
* by the @authors tag. See the copyright.txt in the distribution for a
 
5
* full listing of individual contributors.
 
6
*
 
7
* This is free software; you can redistribute it and/or modify it
 
8
* under the terms of the GNU Lesser General Public License as
 
9
* published by the Free Software Foundation; either version 2.1 of
 
10
* the License, or (at your option) any later version.
 
11
*
 
12
* This software is distributed in the hope that it will be useful,
 
13
* but WITHOUT ANY WARRANTY; without even the implied warranty of
 
14
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 
15
* Lesser General Public License for more details.
 
16
*
 
17
* You should have received a copy of the GNU Lesser General Public
 
18
* License along with this software; if not, write to the Free
 
19
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
 
20
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
 
21
*/
 
22
 
 
23
package org.jboss.remoting.transport.socket;
 
24
 
 
25
import org.jboss.remoting.Home;
 
26
import org.jboss.remoting.InvokerLocator;
 
27
import org.jboss.remoting.ServerInvoker;
 
28
import org.jboss.remoting.util.SecurityUtility;
 
29
import org.jboss.remoting.util.TimerUtil;
 
30
import org.jboss.remoting.marshal.serializable.SerializableMarshaller;
 
31
import org.jboss.util.propertyeditor.PropertyEditors;
 
32
import org.jboss.logging.Logger;
 
33
 
 
34
import javax.net.ServerSocketFactory;
 
35
import javax.net.ssl.SSLException;
 
36
 
 
37
import java.beans.IntrospectionException;
 
38
import java.io.IOException;
 
39
import java.net.InetAddress;
 
40
import java.net.InetSocketAddress;
 
41
import java.net.ServerSocket;
 
42
import java.net.Socket;
 
43
import java.net.SocketAddress;
 
44
import java.net.SocketException;
 
45
import java.net.UnknownHostException;
 
46
import java.security.AccessController;
 
47
import java.security.PrivilegedActionException;
 
48
import java.security.PrivilegedExceptionAction;
 
49
import java.util.ArrayList;
 
50
import java.util.HashSet;
 
51
import java.util.Iterator;
 
52
import java.util.LinkedList;
 
53
import java.util.List;
 
54
import java.util.Map;
 
55
import java.util.Properties;
 
56
import java.util.Set;
 
57
import java.util.TimerTask;
 
58
 
 
59
/**
 
60
 * SocketServerInvoker is the server-side of a SOCKET based transport
 
61
 *
 
62
 * @author <a href="mailto:jhaynie@vocalocity.net">Jeff Haynie</a>
 
63
 * @author <a href="mailto:tom.elrod@jboss.com">Tom Elrod</a>
 
64
 * @author <a href="mailto:ovidiu@jboss.org">Ovidiu Feodorov</a>
 
65
 *
 
66
 * @version $Revision: 5082 $
 
67
 * @jmx:mbean
 
68
 */
 
69
public class SocketServerInvoker extends ServerInvoker implements SocketServerInvokerMBean
 
70
{
 
71
   private static final Logger log = Logger.getLogger(SocketServerInvoker.class);
 
72
 
 
73
   private static boolean trace = log.isTraceEnabled();
 
74
 
 
75
   static int clientCount = 0;
 
76
 
 
77
   protected Properties props = new Properties();
 
78
 
 
79
   private static int BACKLOG_DEFAULT = 200;
 
80
   protected static int MAX_POOL_SIZE_DEFAULT = 300;
 
81
 
 
82
   /**
 
83
    * Key for indicating if socket invoker should continue to keep socket connection between
 
84
    * client and server open after invocations by sending a ping on the connection
 
85
    * before being re-used.  The default for this is false.
 
86
    */
 
87
   public static final String CHECK_CONNECTION_KEY = "socket.check_connection";
 
88
 
 
89
   /**
 
90
    * Specifies the fully qualified class name for the custom SocketWrapper implementation to use on the server.
 
91
    */
 
92
   public static final String SERVER_SOCKET_CLASS_FLAG = "serverSocketClass";
 
93
   protected String serverSocketClass = ServerSocketWrapper.class.getName();
 
94
 
 
95
   protected List serverSockets = new ArrayList();
 
96
   protected boolean running = false;
 
97
   protected int backlog = BACKLOG_DEFAULT;
 
98
   protected AcceptThread[] acceptThreads;
 
99
   protected int numAcceptThreads = 1;
 
100
   protected int maxPoolSize = MAX_POOL_SIZE_DEFAULT;
 
101
   protected LRUPool clientpool;
 
102
   protected LinkedList threadpool;
 
103
   protected boolean immediateShutdown;
 
104
 
 
105
   protected ServerSocketRefresh refreshThread;
 
106
   protected boolean newServerSocketFactory = false;
 
107
   protected Object serverSocketFactoryLock = new Object();
 
108
 
 
109
   protected boolean reuseAddress = true;
 
110
   protected int receiveBufferSize = -1;
 
111
   
 
112
   /**
 
113
    * More socket configuration parameters.
 
114
    */
 
115
   protected boolean keepAlive;
 
116
   protected boolean keepAliveSet;
 
117
   protected boolean oOBInline;
 
118
   protected boolean oOBInlineSet;
 
119
   protected int sendBufferSize = -1;
 
120
   protected boolean soLinger;
 
121
   protected boolean soLingerSet;
 
122
   protected int soLingerDuration = -1;
 
123
   protected int trafficClass = -1;
 
124
 
 
125
   // defaults to -1 as to not have idle timeouts
 
126
   protected int idleTimeout = -1;
 
127
   protected IdleTimerTask idleTimerTask = null;
 
128
   
 
129
   protected int writeTimeout = -1;
 
130
 
 
131
   public SocketServerInvoker(InvokerLocator locator)
 
132
   {
 
133
      super(locator);
 
134
   }
 
135
 
 
136
   public SocketServerInvoker(InvokerLocator locator, Map configuration)
 
137
   {
 
138
      super(locator, configuration);
 
139
   }
 
140
 
 
141
   /**
 
142
    * after a truststore update use this to
 
143
    * set a new ServerSocketFactory to the invoker<br>
 
144
    * then a new ServerSocket is created that accepts the new connections
 
145
    * @param serverSocketFactory
 
146
        */
 
147
   public void setNewServerSocketFactory(ServerSocketFactory serverSocketFactory)
 
148
   {
 
149
      log.trace("entering setNewServerSocketFactory()");
 
150
      synchronized (serverSocketFactoryLock)
 
151
      {
 
152
         newServerSocketFactory=true;
 
153
         setServerSocketFactory(serverSocketFactory);
 
154
         serverSocketFactoryLock.notify();
 
155
         log.info("ServerSocketFactory has been updated");
 
156
      }
 
157
   }
 
158
 
 
159
   /**
 
160
    * refreshes the serverSocket by closing old one and
 
161
    * creating a new ServerSocket from new ServerSocketFactory
 
162
    * @throws IOException
 
163
    */
 
164
   protected void refreshServerSocket() throws IOException
 
165
   {
 
166
      log.trace("entering refreshServerSocket()");
 
167
      synchronized (serverSocketFactoryLock)
 
168
      {
 
169
         for (int i = 0; i < acceptThreads.length; i++)
 
170
         {
 
171
            // If release() is able to enter its synchronized block and sees 
 
172
            // serverSocket == null, then it knows that something went wrong.
 
173
            newServerSocketFactory=false;
 
174
            ServerSocket oldServerSocket = acceptThreads[i].getServerSocket();
 
175
            InetAddress address = oldServerSocket.getInetAddress();
 
176
            int port = oldServerSocket.getLocalPort();
 
177
            oldServerSocket.close();
 
178
            ServerSocket newServerSocket = null;
 
179
            
 
180
            for (int j = 0; j < 5; j++)
 
181
            {
 
182
               try
 
183
               {
 
184
                  newServerSocket = createServerSocket(port, backlog, address);
 
185
                  break;
 
186
               }
 
187
               catch (Exception e)
 
188
               {
 
189
                  if (j < 4)
 
190
                  {
 
191
                     // Wait for end of TIME_WAIT state (1 to 4 minutes).
 
192
                     log.warn("Unable to recreate ServerSocket: will try again in 65 seconds", e);
 
193
                     try {Thread.sleep(65000);} catch (InterruptedException ignored) {}
 
194
                  }
 
195
                  else
 
196
                  {
 
197
                     log.error("Unable to recreate ServerSocket after 260 seconds", e);
 
198
                     return;
 
199
                  }
 
200
               }
 
201
            }
 
202
            
 
203
            acceptThreads[i].setServerSocket(newServerSocket);
 
204
            log.info(acceptThreads[i] + " has been updated with new ServerSocket");
 
205
         }
 
206
      }
 
207
      log.trace("leaving refreshServerSocket()");
 
208
   }
 
209
 
 
210
   protected void setup() throws Exception
 
211
   {
 
212
      props.putAll(getConfiguration());
 
213
      mapJavaBeanProperties(this, props, false);
 
214
      super.setup();
 
215
      String ssclass = props.getProperty(SERVER_SOCKET_CLASS_FLAG);
 
216
      if(ssclass != null)
 
217
      {
 
218
         serverSocketClass = ssclass;
 
219
      }
 
220
   }
 
221
 
 
222
   protected void finalize() throws Throwable
 
223
   {
 
224
      stop();
 
225
      super.finalize();
 
226
   }
 
227
 
 
228
   /**
 
229
    * Starts the invoker.
 
230
    *
 
231
    * @jmx.managed-operation description = "Start sets up the ServerInvoker we are wrapping."
 
232
    * impact      = "ACTION"
 
233
    */
 
234
   public synchronized void start() throws IOException
 
235
   {
 
236
      if(!running)
 
237
      {
 
238
         log.debug(this + " starting");
 
239
 
 
240
         if(maxPoolSize <= 0)
 
241
         {
 
242
            //need to reset to default
 
243
            maxPoolSize = MAX_POOL_SIZE_DEFAULT;
 
244
         }
 
245
 
 
246
         clientpool = new LRUPool(2, maxPoolSize);
 
247
         clientpool.create();
 
248
         threadpool = new LinkedList();
 
249
 
 
250
         createServerSockets();
 
251
         
 
252
         refreshThread = new ServerSocketRefresh();
 
253
         refreshThread.setDaemon(true);
 
254
         refreshThread.start();
 
255
         
 
256
         acceptThreads = new AcceptThread[numAcceptThreads * getHomes().size()];
 
257
         int i = 0;
 
258
         Iterator it = serverSockets.iterator();
 
259
         while (it.hasNext())
 
260
         {
 
261
            ServerSocket ss = (ServerSocket) it.next();
 
262
            for(int j = 0; j < numAcceptThreads; j++)
 
263
            {
 
264
               acceptThreads[i++] = new AcceptThread(ss, refreshThread);
 
265
            }
 
266
         }
 
267
      }
 
268
 
 
269
      try
 
270
      {
 
271
         super.start();
 
272
      }
 
273
      catch(IOException e)
 
274
      {
 
275
         log.error("Error starting SocketServerInvoker.", e);
 
276
         cleanup();
 
277
      }
 
278
      if(!running)
 
279
      {
 
280
         running = true;
 
281
 
 
282
         for(int i = 0; i < acceptThreads.length; i++)
 
283
         {
 
284
            acceptThreads[i].start();
 
285
         }
 
286
      }
 
287
 
 
288
      if(idleTimeout > 0)
 
289
      {
 
290
         if(idleTimerTask != null)
 
291
         {
 
292
            idleTimerTask.cancel();
 
293
         }
 
294
         idleTimerTask = new IdleTimerTask();
 
295
         TimerUtil.schedule(idleTimerTask, idleTimeout * 1000);
 
296
      }
 
297
      else
 
298
      {
 
299
         if(idleTimerTask != null)
 
300
         {
 
301
            idleTimerTask.cancel();
 
302
         }
 
303
      }
 
304
 
 
305
      log.debug(this + " started");
 
306
 
 
307
   }
 
308
 
 
309
   protected ServerSocket createServerSocket(int serverBindPort,
 
310
                                             final int backlog,
 
311
                                             InetAddress bindAddress) throws IOException
 
312
   {
 
313
      ServerSocketFactory factory = getServerSocketFactory();
 
314
      ServerSocket ss = null;
 
315
      
 
316
      try
 
317
      {
 
318
         ss = factory.createServerSocket();
 
319
      }
 
320
      catch (SocketException e)
 
321
      {
 
322
         if (getReuseAddress())
 
323
            log.warn("Unable to create unbound ServerSocket: cannot set reuseAddress to true",e);
 
324
 
 
325
         ss = factory.createServerSocket(serverBindPort, backlog, bindAddress);
 
326
         configureServerSocket(ss);
 
327
         return ss;
 
328
      }
 
329
      
 
330
      ss.setReuseAddress(getReuseAddress());
 
331
      configureServerSocket(ss);
 
332
      InetSocketAddress address = new InetSocketAddress(bindAddress, serverBindPort);
 
333
      bind(ss, address, backlog);
 
334
      return ss;
 
335
   }
 
336
   
 
337
   protected void createServerSockets() throws IOException
 
338
   {
 
339
      ServerSocketFactory factory = getServerSocketFactory();
 
340
 
 
341
      Iterator it = getHomes().iterator();
 
342
      while (it.hasNext())
 
343
      {
 
344
         Home home = (Home) it.next();
 
345
         InetAddress inetAddress = getAddressByName(home.host);
 
346
         
 
347
         ServerSocket ss = null;
 
348
         try
 
349
         {
 
350
            ss = factory.createServerSocket();
 
351
            ss.setReuseAddress(getReuseAddress());
 
352
            configureServerSocket(ss);
 
353
            InetSocketAddress address = new InetSocketAddress(inetAddress, home.port);
 
354
            bind(ss, address, backlog);
 
355
            if (log.isDebugEnabled()) log.debug(this + " created " + ss);
 
356
         }
 
357
         catch (SocketException e)
 
358
         {
 
359
            if (getReuseAddress())
 
360
               log.warn("Unable to create unbound ServerSocket: cannot set reuseAddress to true");
 
361
 
 
362
            try
 
363
            {
 
364
               ss = factory.createServerSocket(home.port, backlog, inetAddress);
 
365
               configureServerSocket(ss);
 
366
            }
 
367
            catch (IOException e2)
 
368
            {
 
369
               String m = this + " error creating ServerSocket[" + home + "]: " + e2.getMessage();
 
370
               IOException e3 = new IOException(m);
 
371
               log.debug(m, e3);
 
372
               throw e3;
 
373
            }
 
374
         }
 
375
         catch (IOException e)
 
376
         {
 
377
            String m = this + " error creating ServerSocket[" + home + "]: " + e.getMessage();
 
378
            IOException e2 = new IOException(m);
 
379
            log.debug(m, e2);
 
380
            throw e2;
 
381
         }
 
382
         
 
383
         serverSockets.add(ss);
 
384
      }
 
385
   }
 
386
   
 
387
   protected void configureServerSocket(ServerSocket ss) throws SocketException
 
388
   {
 
389
      if (receiveBufferSize != -1)
 
390
      {
 
391
         ss.setReceiveBufferSize(receiveBufferSize);
 
392
      }
 
393
   }
 
394
 
 
395
   protected String getThreadName(int i)
 
396
   {
 
397
      return "AcceptorThread#" + i + ":" + getServerBindPort();
 
398
   }
 
399
 
 
400
   public void destroy()
 
401
   {
 
402
      if(clientpool != null)
 
403
      {
 
404
         synchronized (clientpool)
 
405
         {
 
406
            clientpool.destroy();
 
407
         }
 
408
      }
 
409
      super.destroy();
 
410
   }
 
411
 
 
412
   /**
 
413
    * Stops the invoker.
 
414
    *
 
415
    * @jmx.managed-operation description = "Stops the invoker."
 
416
    * impact      = "ACTION"
 
417
    */
 
418
   public synchronized void stop()
 
419
   {
 
420
      if(running)
 
421
      {
 
422
         cleanup();
 
423
      }
 
424
      super.stop();
 
425
   }
 
426
 
 
427
   protected void cleanup()
 
428
   {
 
429
      running = false;
 
430
      
 
431
      if(acceptThreads != null)
 
432
      {
 
433
         for(int i = 0; i < acceptThreads.length; i++)
 
434
         {
 
435
            acceptThreads[i].shutdown();
 
436
         }
 
437
      }
 
438
      
 
439
      if (refreshThread != null)
 
440
         refreshThread.shutdown();
 
441
      
 
442
      if (idleTimerTask != null)
 
443
      {
 
444
         idleTimerTask.cancel();
 
445
      }
 
446
 
 
447
      maxPoolSize = 0; // so ServerThreads don't reinsert themselves
 
448
      
 
449
      // The following code has been changed to avoid a race condition with ServerThread.run() which
 
450
      // can result in leaving ServerThreads alive, which causes a memory leak.
 
451
      if (clientpool != null)
 
452
      {
 
453
         synchronized (clientpool)
 
454
         {
 
455
            Set svrThreads = clientpool.getContents();
 
456
            Iterator itr = svrThreads.iterator();
 
457
 
 
458
            while(itr.hasNext())
 
459
            {
 
460
               Object o = itr.next();
 
461
               ServerThread st = (ServerThread) o;
 
462
               if (immediateShutdown)
 
463
               {
 
464
                  st.shutdownImmediately();
 
465
               }
 
466
               else
 
467
               {
 
468
                  st.shutdown();
 
469
               }
 
470
            }
 
471
 
 
472
            clientpool.flush();
 
473
            clientpool.stop();
 
474
            
 
475
            log.debug(this + " stopped threads in clientpool");
 
476
 
 
477
            if (threadpool != null)
 
478
            {
 
479
               int threadsToShutdown = threadpool.size();
 
480
               for(int i = 0; i < threadsToShutdown; i++)
 
481
               {
 
482
                  ServerThread thread = (ServerThread) threadpool.removeFirst();
 
483
                  if (immediateShutdown)
 
484
                  {
 
485
                     thread.shutdownImmediately();
 
486
                  }
 
487
                  else
 
488
                  {
 
489
                     thread.shutdown();
 
490
                  }
 
491
               }
 
492
               
 
493
               log.debug(this + " stopped threads in threadpool");
 
494
            }
 
495
         }
 
496
      }
 
497
      
 
498
      log.debug(this + " exiting");
 
499
   }
 
500
 
 
501
 
 
502
   public int getReceiveBufferSize()
 
503
   {
 
504
      return receiveBufferSize;
 
505
   }
 
506
 
 
507
   public void setReceiveBufferSize(int receiveBufferSize)
 
508
   {
 
509
      this.receiveBufferSize = receiveBufferSize;
 
510
   }
 
511
   
 
512
   /**
 
513
    * Indicates if SO_REUSEADDR is enabled on server sockets
 
514
    * Default is true.
 
515
    */
 
516
   public boolean getReuseAddress()
 
517
   {
 
518
      return reuseAddress;
 
519
   }
 
520
 
 
521
   /**
 
522
    * Sets if SO_REUSEADDR is enabled on server sockets.
 
523
    * Default is true.
 
524
    *
 
525
    * @param reuse
 
526
    */
 
527
   public void setReuseAddress(boolean reuse)
 
528
   {
 
529
      this.reuseAddress = reuse;
 
530
   }
 
531
 
 
532
   public boolean isKeepAlive()
 
533
   {
 
534
      return keepAlive;
 
535
   }
 
536
 
 
537
   public void setKeepAlive(boolean keepAlive)
 
538
   {
 
539
      this.keepAlive = keepAlive;
 
540
      keepAliveSet = true;
 
541
   }
 
542
 
 
543
   public boolean isOOBInline()
 
544
   {
 
545
      return oOBInline;
 
546
   }
 
547
 
 
548
   public void setOOBInline(boolean inline)
 
549
   {
 
550
      oOBInline = inline;
 
551
      oOBInlineSet = true;
 
552
   }
 
553
 
 
554
   public int getSendBufferSize()
 
555
   {
 
556
      return sendBufferSize;
 
557
   }
 
558
 
 
559
   public void setSendBufferSize(int sendBufferSize)
 
560
   {
 
561
      this.sendBufferSize = sendBufferSize;
 
562
   }
 
563
 
 
564
   public boolean isSoLinger()
 
565
   {
 
566
      return soLinger;
 
567
   }
 
568
   
 
569
   public int getSoLingerDuration()
 
570
   {
 
571
      return soLingerDuration;
 
572
   }
 
573
 
 
574
   public void setSoLinger(boolean soLinger)
 
575
   {
 
576
      this.soLinger = soLinger;
 
577
      soLingerSet = true;
 
578
   }
 
579
 
 
580
   public void setSoLingerDuration(int soLingerDuration)
 
581
   {
 
582
      this.soLingerDuration = soLingerDuration;
 
583
   }
 
584
 
 
585
   public int getTrafficClass()
 
586
   {
 
587
      return trafficClass;
 
588
   }
 
589
 
 
590
   public void setTrafficClass(int trafficClass)
 
591
   {
 
592
      this.trafficClass = trafficClass;
 
593
   }
 
594
   
 
595
   /**
 
596
    * @return Number of idle ServerThreads
 
597
    * @jmx:managed-attribute
 
598
    */
 
599
   public int getCurrentThreadPoolSize()
 
600
   {
 
601
      return threadpool.size();
 
602
   }
 
603
 
 
604
   /**
 
605
    * @return Number of ServerThreads current executing or waiting on an invocation
 
606
    * @jmx:managed-attribute
 
607
    */
 
608
   public int getCurrentClientPoolSize()
 
609
   {
 
610
      return clientpool.size();
 
611
   }
 
612
 
 
613
   /**
 
614
    * Getter for property numAcceptThreads
 
615
    *
 
616
    * @return The number of threads that exist for accepting client connections
 
617
    * @jmx:managed-attribute
 
618
    */
 
619
   public int getNumAcceptThreads()
 
620
   {
 
621
      return numAcceptThreads;
 
622
   }
 
623
 
 
624
   /**
 
625
    * Setter for property numAcceptThreads
 
626
    *
 
627
    * @param size The number of threads that exist for accepting client connections
 
628
    * @jmx:managed-attribute
 
629
    */
 
630
   public void setNumAcceptThreads(int size)
 
631
   {
 
632
      this.numAcceptThreads = size;
 
633
   }
 
634
 
 
635
   /**
 
636
    * Setter for max pool size.
 
637
    * The number of server threads for processing client. The default is 300.
 
638
    *
 
639
    * @return
 
640
    * @jmx:managed-attribute
 
641
    */
 
642
   public int getMaxPoolSize()
 
643
   {
 
644
      return maxPoolSize;
 
645
   }
 
646
 
 
647
   /**
 
648
    * The number of server threads for processing client. The default is 300.
 
649
    *
 
650
    * @param maxPoolSize
 
651
    * @jmx:managed-attribute
 
652
    */
 
653
   public void setMaxPoolSize(int maxPoolSize)
 
654
   {
 
655
      this.maxPoolSize = maxPoolSize;
 
656
   }
 
657
 
 
658
   /**
 
659
    * @jmx:managed-attribute
 
660
    */
 
661
   public int getBacklog()
 
662
   {
 
663
      return backlog;
 
664
   }
 
665
 
 
666
   /**
 
667
    * @jmx:managed-attribute
 
668
    */
 
669
   public void setBacklog(int backlog)
 
670
   {
 
671
      if(backlog < 0)
 
672
      {
 
673
         this.backlog = BACKLOG_DEFAULT;
 
674
      }
 
675
      else
 
676
      {
 
677
         this.backlog = backlog;
 
678
      }
 
679
   }
 
680
 
 
681
   public int getIdleTimeout()
 
682
   {
 
683
      return idleTimeout;
 
684
   }
 
685
 
 
686
   /**
 
687
    * Sets the timeout for idle threads to be removed from pool.
 
688
    * If the value is greater than 0, then idle timeout will be
 
689
    * activated, otherwise no idle timeouts will occur.  By default,
 
690
    * this value is -1.
 
691
    *
 
692
    * @param idleTimeout number of seconds before a idle thread is timed out.
 
693
    */
 
694
   public void setIdleTimeout(int idleTimeout)
 
695
   {
 
696
      this.idleTimeout = idleTimeout;
 
697
 
 
698
      if(isStarted())
 
699
      {
 
700
         if(idleTimeout > 0)
 
701
         {
 
702
            if(idleTimerTask != null)
 
703
            {
 
704
               idleTimerTask.cancel();
 
705
            }
 
706
            idleTimerTask = new IdleTimerTask();
 
707
            TimerUtil.schedule(idleTimerTask, idleTimeout * 1000);
 
708
         }
 
709
         else
 
710
         {
 
711
            if(idleTimerTask != null)
 
712
            {
 
713
               idleTimerTask.cancel();
 
714
            }
 
715
         }
 
716
      }
 
717
   }
 
718
 
 
719
   public boolean isImmediateShutdown()
 
720
   {
 
721
      return immediateShutdown;
 
722
   }
 
723
 
 
724
   public void setImmediateShutdown(boolean immediateShutdown)
 
725
   {
 
726
      this.immediateShutdown = immediateShutdown;
 
727
   }
 
728
 
 
729
   public int getWriteTimeout()
 
730
   {
 
731
      return writeTimeout;
 
732
   }
 
733
 
 
734
   public void setWriteTimeout(int writeTimeout)
 
735
   {
 
736
      this.writeTimeout = writeTimeout;
 
737
   }
 
738
 
 
739
   protected void configureSocket(Socket s) throws SocketException
 
740
   {
 
741
      s.setReuseAddress(getReuseAddress());
 
742
      
 
743
      if (keepAliveSet)           s.setKeepAlive(keepAlive);
 
744
      if (oOBInlineSet)           s.setOOBInline(oOBInline);
 
745
      if (receiveBufferSize > -1) s.setReceiveBufferSize(receiveBufferSize);
 
746
      if (sendBufferSize > -1)    s.setSendBufferSize(sendBufferSize);
 
747
      if (soLingerSet && 
 
748
            soLingerDuration > 0) s.setSoLinger(soLinger, soLingerDuration);
 
749
      if (trafficClass > -1)      s.setTrafficClass(trafficClass);
 
750
   }
 
751
   
 
752
   /**
 
753
    * The acceptor thread should spend as little time as possbile doing any kind of operation, and
 
754
    * under no circumstances should perform IO on the new socket, which can potentially block and
 
755
    * lock up the server. For this reason, the acceptor thread should grab a worker thread and
 
756
    * delegate all subsequent work to it.
 
757
    */
 
758
   protected void processInvocation(Socket socket) throws Exception
 
759
   {
 
760
      ServerThread worker = null;
 
761
      boolean newThread = false;
 
762
 
 
763
      synchronized(clientpool)
 
764
      {
 
765
         while(worker == null && running)
 
766
         {
 
767
            if(trace) { log.trace(this + " trying to get a worker thread from threadpool for processing"); }
 
768
 
 
769
            if(threadpool.size() > 0)
 
770
            {
 
771
               worker = (ServerThread)threadpool.removeFirst();
 
772
               if(trace) { log.trace(this + (worker == null ? " found NO threads in threadpool"
 
773
                                                            : " got " + worker + " from threadpool")); }
 
774
               
 
775
            }
 
776
            else if (trace) { { log.trace(this + " has an empty threadpool"); } }
 
777
 
 
778
            if(worker == null)
 
779
            {
 
780
               if(clientpool.size() < maxPoolSize)
 
781
               {
 
782
                  if(trace) { log.trace(this + " creating new worker thread"); }
 
783
                  worker = new ServerThread(socket, this, clientpool, threadpool,
 
784
                                            getTimeout(), writeTimeout, serverSocketClass);
 
785
                  if(trace) { log.trace(this + " created " + worker); }
 
786
                  newThread = true;
 
787
               }
 
788
 
 
789
               if(worker == null)
 
790
               {
 
791
                  if(trace) {log.trace(this + " trying to evict a thread from clientpool"); }
 
792
                  clientpool.evict();
 
793
                  clientpool.wait(1000);  // Keep trying, in case all threads are not evictable.
 
794
                  if(trace) { log.trace(this + " notified of clientpool thread availability"); }
 
795
               }
 
796
            }
 
797
         }
 
798
 
 
799
         if (!running)
 
800
         {
 
801
            return;
 
802
         }
 
803
         clientpool.insert(worker, worker);
 
804
      }
 
805
 
 
806
      if(newThread)
 
807
      {
 
808
         if(trace) {log.trace(this + " starting " + worker); }
 
809
         worker.start();
 
810
      }
 
811
      else
 
812
      {
 
813
         if(trace) { log.trace(this + " reusing " + worker); }
 
814
         worker.wakeup(socket, getTimeout(), this);
 
815
      }
 
816
   }
 
817
 
 
818
   /**
 
819
    * returns true if the transport is bi-directional in nature, for example,
 
820
    * SOAP in unidirectional and SOCKETs are bi-directional (unless behind a firewall
 
821
    * for example).
 
822
    */
 
823
   public boolean isTransportBiDirectional()
 
824
   {
 
825
      return true;
 
826
   }
 
827
 
 
828
   public String toString()
 
829
   {
 
830
      return "SocketServerInvoker[" + locator.getHomes() + "]";
 
831
   }
 
832
 
 
833
   /**
 
834
    * Each implementation of the remote client invoker should have
 
835
    * a default data type that is uses in the case it is not specified
 
836
    * in the invoker locator uri.
 
837
    */
 
838
   protected String getDefaultDataType()
 
839
   {
 
840
      return SerializableMarshaller.DATATYPE;
 
841
   }
 
842
 
 
843
   /**
 
844
    * this thread checks if a new ServerSocketFactory was set,<br>
 
845
    * if so initializes a serversocket refresh
 
846
    * @author Michael Voss
 
847
    *
 
848
    */
 
849
   public class ServerSocketRefresh extends Thread
 
850
   {  
 
851
      private boolean running = true;
 
852
      
 
853
      public ServerSocketRefresh()
 
854
      {
 
855
         super("ServerSocketRefresh");
 
856
      }
 
857
      
 
858
      public void run()
 
859
      {
 
860
         while(running)
 
861
         {
 
862
            synchronized (serverSocketFactoryLock)
 
863
            {  
 
864
               if(newServerSocketFactory)
 
865
               {
 
866
                  log.debug("got notice about new ServerSocketFactory");
 
867
                  try
 
868
                  {
 
869
                     log.debug("refreshing server socket");
 
870
                     refreshServerSocket();
 
871
                     log.debug("server socket refreshed");
 
872
                  } catch (IOException e)
 
873
                  {
 
874
                     log.error("could not refresh server socket", e);
 
875
                  }
 
876
               }
 
877
               
 
878
               try
 
879
               {
 
880
                  serverSocketFactoryLock.wait();
 
881
                  log.trace("ServerSocketRefresh thread woke up");
 
882
               }
 
883
               catch (InterruptedException e)
 
884
               {
 
885
               }
 
886
            }
 
887
         }
 
888
         log.debug("ServerSocketRefresh shutting down");
 
889
      }
 
890
 
 
891
      /**
 
892
       * Let SocketServerInvoker.run() resume when refresh is completed
 
893
       */
 
894
      public void release() throws InvalidStateException
 
895
      {
 
896
         synchronized (serverSocketFactoryLock)
 
897
         {
 
898
//            if (serverSocket == null)
 
899
//            {
 
900
//               throw new InvalidStateException("error refreshing ServerSocket");
 
901
//            }
 
902
            log.trace("passed through ServerSocketRefresh.release()");
 
903
         }
 
904
      }
 
905
      
 
906
      public void shutdown()
 
907
      {
 
908
         running = false;
 
909
         
 
910
         synchronized (serverSocketFactoryLock)
 
911
         {
 
912
            serverSocketFactoryLock.notify();
 
913
         }
 
914
      }
 
915
   }
 
916
 
 
917
   /**
 
918
    * The IdleTimerTask is used to periodically check the server threads to
 
919
    * see if any have been idle for a specified amount of time, and if so,
 
920
    * release those threads and their connections and clear from the server
 
921
    * thread pool.
 
922
    */
 
923
   public class IdleTimerTask extends TimerTask
 
924
   {
 
925
      public void run()
 
926
      {
 
927
         Object[] svrThreadArray = null;
 
928
         Set threadsToShutdown = new HashSet();
 
929
 
 
930
         synchronized(clientpool)
 
931
         {
 
932
            Set svrThreads = clientpool.getContents();
 
933
            svrThreadArray = svrThreads.toArray();
 
934
 
 
935
            if(trace)
 
936
            {
 
937
               if(svrThreadArray != null)
 
938
               {
 
939
                  log.trace("Idle timer task fired.  Number of ServerThreads = " + svrThreadArray.length);
 
940
               }
 
941
            }
 
942
 
 
943
            // iterate through pooled server threads and evict idle ones
 
944
            if(svrThreadArray != null)
 
945
            {
 
946
               long currentTime = System.currentTimeMillis();
 
947
 
 
948
               for(int x = 0; x < svrThreadArray.length; x++)
 
949
               {
 
950
                  ServerThread svrThread = (ServerThread)svrThreadArray[x];
 
951
 
 
952
                  // check the idle time and evict
 
953
                  long idleTime = currentTime - svrThread.getLastRequestTimestamp();
 
954
 
 
955
                  if(trace)
 
956
                  {
 
957
                     log.trace("Idle time for ServerThread (" + svrThread + ") is " + idleTime);
 
958
                  }
 
959
 
 
960
                  long idleTimeout = getIdleTimeout() * 1000;
 
961
                  if(idleTime > idleTimeout)
 
962
                  {
 
963
                     if(trace)
 
964
                     {
 
965
                        log.trace("Idle timeout reached for ServerThread (" + svrThread + ") and will be evicted.");
 
966
                     }
 
967
                     clientpool.remove(svrThread);
 
968
                     threadsToShutdown.add(svrThread);
 
969
//                     svrThread.shutdown();
 
970
//                     svrThread.unblock();
 
971
                  }
 
972
               }
 
973
            }
 
974
 
 
975
            // now check idle server threads in the thread pool
 
976
            svrThreadArray = null;
 
977
 
 
978
            if(threadpool.size() > 0)
 
979
            {
 
980
               // now need to check the tread pool to remove threads
 
981
               svrThreadArray = threadpool.toArray();
 
982
            }
 
983
 
 
984
            if(trace)
 
985
            {
 
986
               if(svrThreadArray != null)
 
987
               {
 
988
                  log.trace("Number of ServerThread in thead pool = " + svrThreadArray.length);
 
989
               }
 
990
            }
 
991
 
 
992
            if(svrThreadArray != null)
 
993
            {
 
994
               long currentTime = System.currentTimeMillis();
 
995
 
 
996
               for(int x = 0; x < svrThreadArray.length; x++)
 
997
               {
 
998
                  ServerThread svrThread = (ServerThread)svrThreadArray[x];
 
999
                  long idleTime = currentTime - svrThread.getLastRequestTimestamp();
 
1000
 
 
1001
                  if(trace)
 
1002
                  {
 
1003
                     log.trace("Idle time for ServerThread (" + svrThread + ") is " + idleTime);
 
1004
                  }
 
1005
 
 
1006
                  long idleTimeout = getIdleTimeout() * 1000;
 
1007
                  if(idleTime > idleTimeout)
 
1008
                  {
 
1009
                     if(trace)
 
1010
                     {
 
1011
                        log.trace("Idle timeout reached for ServerThread (" + svrThread + ") and will be removed from thread pool.");
 
1012
                     }
 
1013
                     threadpool.remove(svrThread);
 
1014
                     threadsToShutdown.add(svrThread);
 
1015
//                     svrThread.shutdown();
 
1016
                  }
 
1017
               }
 
1018
            }
 
1019
         }
 
1020
         
 
1021
         Iterator it = threadsToShutdown.iterator();
 
1022
         while (it.hasNext())
 
1023
         {
 
1024
            ServerThread svrThread = (ServerThread) it.next();
 
1025
            svrThread.shutdown();
 
1026
//            svrThread.unblock();
 
1027
         }
 
1028
      }
 
1029
   }
 
1030
   
 
1031
   public class AcceptThread extends Thread
 
1032
   {
 
1033
      ServerSocket serverSocket;
 
1034
      ServerSocketRefresh refreshThread; 
 
1035
      
 
1036
      public AcceptThread(ServerSocket serverSocket, ServerSocketRefresh refreshThread)
 
1037
      {
 
1038
         this.serverSocket = serverSocket;
 
1039
         this.refreshThread = refreshThread;
 
1040
         setName("AcceptorThread[" + serverSocket + "]");
 
1041
         if(trace) log.trace(SocketServerInvoker.this + " created " + this); 
 
1042
      }
 
1043
      
 
1044
      public void run()
 
1045
      {
 
1046
         if(trace) { log.trace(this + " started execution of method run()"); }
 
1047
 
 
1048
         while(running)
 
1049
         {
 
1050
            try
 
1051
            {
 
1052
               refreshThread.release(); //goes on if serversocket refresh is completed
 
1053
 
 
1054
               if(trace) { log.trace(this + " is going to wait on serverSocket.accept()"); }
 
1055
 
 
1056
               Socket socket = accept(serverSocket);
 
1057
               if(trace) { log.trace(this + " accepted " + socket); }
 
1058
 
 
1059
               // the acceptor thread should spend as little time as possbile doing any kind of
 
1060
               // operation, and under no circumstances should perform IO on the new socket, which
 
1061
               // can potentially block and lock up the server. For this reason, the acceptor thread
 
1062
               // should grab a worker thread and delegate all subsequent work to it. This is what
 
1063
               // processInvocation() does.
 
1064
 
 
1065
               configureSocket(socket);
 
1066
               processInvocation(socket);
 
1067
            }
 
1068
            catch (SSLException e)
 
1069
            {
 
1070
               log.error("SSLServerSocket error", e);
 
1071
               return;
 
1072
            }
 
1073
            catch (InvalidStateException e)
 
1074
            {
 
1075
               log.error("Cannot proceed without functioning server socket.  Shutting down", e);
 
1076
               return;
 
1077
            }
 
1078
            catch(Throwable ex)
 
1079
            {  
 
1080
               if(running)
 
1081
               {
 
1082
                  log.error(this + " failed to handle socket", ex);
 
1083
               }
 
1084
               else
 
1085
               {
 
1086
                  log.trace(this + " caught exception in run()", ex);     
 
1087
               }
 
1088
            }
 
1089
         }
 
1090
      }
 
1091
      
 
1092
      public  void shutdown()
 
1093
      {
 
1094
         try
 
1095
         {
 
1096
            serverSocket.close();
 
1097
         }
 
1098
         catch (IOException e)
 
1099
         {
 
1100
            log.debug(this + " error closing " + serverSocket, e);
 
1101
         }
 
1102
      }
 
1103
      
 
1104
      public ServerSocket getServerSocket()
 
1105
      {
 
1106
         return serverSocket;
 
1107
      }
 
1108
      
 
1109
      public void setServerSocket(ServerSocket serverSocket)
 
1110
      {
 
1111
         this.serverSocket = serverSocket;
 
1112
      }
 
1113
   }
 
1114
   
 
1115
   static private void mapJavaBeanProperties(final Object o, final Properties props, final boolean isStrict)
 
1116
   throws IntrospectionException
 
1117
   {
 
1118
      if (SecurityUtility.skipAccessControl())
 
1119
      {
 
1120
         PropertyEditors.mapJavaBeanProperties(o, props, isStrict);
 
1121
         return;
 
1122
      }
 
1123
 
 
1124
      try
 
1125
      {
 
1126
         AccessController.doPrivileged( new PrivilegedExceptionAction()
 
1127
         {
 
1128
            public Object run() throws IntrospectionException
 
1129
            {
 
1130
               PropertyEditors.mapJavaBeanProperties(o, props, isStrict);
 
1131
               return null;
 
1132
            }
 
1133
         });
 
1134
      }
 
1135
      catch (PrivilegedActionException e)
 
1136
      {
 
1137
         throw (IntrospectionException) e.getCause();
 
1138
      }
 
1139
   }
 
1140
   
 
1141
   static private Socket accept(final ServerSocket ss) throws IOException
 
1142
   {
 
1143
      if (SecurityUtility.skipAccessControl())
 
1144
      {
 
1145
         return ss.accept();
 
1146
      }
 
1147
      
 
1148
      try
 
1149
      {
 
1150
          return (Socket)AccessController.doPrivileged( new PrivilegedExceptionAction()
 
1151
          {
 
1152
             public Object run() throws Exception
 
1153
             {
 
1154
                 return ss.accept();
 
1155
             }
 
1156
          });
 
1157
      }
 
1158
      catch (PrivilegedActionException e)
 
1159
      {
 
1160
          throw (IOException) e.getCause();
 
1161
      }
 
1162
   }
 
1163
 
 
1164
   static private void bind(final ServerSocket ss, final SocketAddress address,
 
1165
                           final int backlog) throws IOException
 
1166
   {
 
1167
      if (SecurityUtility.skipAccessControl())
 
1168
      {
 
1169
         ss.bind(address, backlog);
 
1170
         return;
 
1171
      }
 
1172
      
 
1173
      try
 
1174
      {
 
1175
          AccessController.doPrivileged( new PrivilegedExceptionAction()
 
1176
          {
 
1177
             public Object run() throws Exception
 
1178
             {
 
1179
                ss.bind(address, backlog);
 
1180
                return null;
 
1181
             }
 
1182
          });
 
1183
      }
 
1184
      catch (PrivilegedActionException e)
 
1185
      {
 
1186
          throw (IOException) e.getCause();
 
1187
      }
 
1188
   }
 
1189
   
 
1190
   static private InetAddress getAddressByName(final String host) throws UnknownHostException
 
1191
   {
 
1192
      if (SecurityUtility.skipAccessControl())
 
1193
      {
 
1194
         return InetAddress.getByName(host);
 
1195
      }
 
1196
      
 
1197
      try
 
1198
      {
 
1199
         return (InetAddress)AccessController.doPrivileged( new PrivilegedExceptionAction()
 
1200
         {
 
1201
            public Object run() throws IOException
 
1202
            {
 
1203
               return InetAddress.getByName(host);
 
1204
            }
 
1205
         });
 
1206
      }
 
1207
      catch (PrivilegedActionException e)
 
1208
      {
 
1209
         throw (UnknownHostException) e.getCause();
 
1210
      }
 
1211
   }
 
1212
}