~ubuntu-branches/ubuntu/wily/libjboss-remoting-java/wily

« back to all changes in this revision

Viewing changes to src/org/jboss/remoting/transport/socket/SocketServerInvoker.java

  • Committer: Package Import Robot
  • Author(s): Torsten Werner
  • Date: 2011-09-09 14:01:03 UTC
  • mfrom: (1.1.6 upstream)
  • Revision ID: package-import@ubuntu.com-20110909140103-hqokx61534tas9rg
Tags: 2.5.3.SP1-1
* Newer but not newest upstream release. Do not build samples.
* Change debian/watch to upstream's svn repo.
* Add patch to fix compile error caused by tomcat update.
  (Closes: #628303)
* Switch to source format 3.0.
* Switch to debhelper level 7.
* Remove useless Depends.
* Update Standards-Version: 3.9.2.
* Update README.source.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/*
2
 
* JBoss, Home of Professional Open Source
3
 
* Copyright 2005, JBoss Inc., and individual contributors as indicated
4
 
* by the @authors tag. See the copyright.txt in the distribution for a
5
 
* full listing of individual contributors.
6
 
*
7
 
* This is free software; you can redistribute it and/or modify it
8
 
* under the terms of the GNU Lesser General Public License as
9
 
* published by the Free Software Foundation; either version 2.1 of
10
 
* the License, or (at your option) any later version.
11
 
*
12
 
* This software is distributed in the hope that it will be useful,
13
 
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14
 
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15
 
* Lesser General Public License for more details.
16
 
*
17
 
* You should have received a copy of the GNU Lesser General Public
18
 
* License along with this software; if not, write to the Free
19
 
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20
 
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21
 
*/
22
 
 
23
 
package org.jboss.remoting.transport.socket;
24
 
 
25
 
import org.jboss.remoting.Home;
26
 
import org.jboss.remoting.InvokerLocator;
27
 
import org.jboss.remoting.ServerInvoker;
28
 
import org.jboss.remoting.util.SecurityUtility;
29
 
import org.jboss.remoting.util.TimerUtil;
30
 
import org.jboss.remoting.marshal.serializable.SerializableMarshaller;
31
 
import org.jboss.util.propertyeditor.PropertyEditors;
32
 
import org.jboss.logging.Logger;
33
 
 
34
 
import javax.net.ServerSocketFactory;
35
 
import javax.net.ssl.SSLException;
36
 
 
37
 
import java.beans.IntrospectionException;
38
 
import java.io.IOException;
39
 
import java.net.InetAddress;
40
 
import java.net.InetSocketAddress;
41
 
import java.net.ServerSocket;
42
 
import java.net.Socket;
43
 
import java.net.SocketAddress;
44
 
import java.net.SocketException;
45
 
import java.net.UnknownHostException;
46
 
import java.security.AccessController;
47
 
import java.security.PrivilegedActionException;
48
 
import java.security.PrivilegedExceptionAction;
49
 
import java.util.ArrayList;
50
 
import java.util.HashSet;
51
 
import java.util.Iterator;
52
 
import java.util.LinkedList;
53
 
import java.util.List;
54
 
import java.util.Map;
55
 
import java.util.Properties;
56
 
import java.util.Set;
57
 
import java.util.TimerTask;
58
 
 
59
 
/**
60
 
 * SocketServerInvoker is the server-side of a SOCKET based transport
61
 
 *
62
 
 * @author <a href="mailto:jhaynie@vocalocity.net">Jeff Haynie</a>
63
 
 * @author <a href="mailto:tom.elrod@jboss.com">Tom Elrod</a>
64
 
 * @author <a href="mailto:ovidiu@jboss.org">Ovidiu Feodorov</a>
65
 
 *
66
 
 * @version $Revision: 5082 $
67
 
 * @jmx:mbean
68
 
 */
69
 
public class SocketServerInvoker extends ServerInvoker implements SocketServerInvokerMBean
70
 
{
71
 
   private static final Logger log = Logger.getLogger(SocketServerInvoker.class);
72
 
 
73
 
   private static boolean trace = log.isTraceEnabled();
74
 
 
75
 
   static int clientCount = 0;
76
 
 
77
 
   protected Properties props = new Properties();
78
 
 
79
 
   private static int BACKLOG_DEFAULT = 200;
80
 
   protected static int MAX_POOL_SIZE_DEFAULT = 300;
81
 
 
82
 
   /**
83
 
    * Key for indicating if socket invoker should continue to keep socket connection between
84
 
    * client and server open after invocations by sending a ping on the connection
85
 
    * before being re-used.  The default for this is false.
86
 
    */
87
 
   public static final String CHECK_CONNECTION_KEY = "socket.check_connection";
88
 
 
89
 
   /**
90
 
    * Specifies the fully qualified class name for the custom SocketWrapper implementation to use on the server.
91
 
    */
92
 
   public static final String SERVER_SOCKET_CLASS_FLAG = "serverSocketClass";
93
 
   protected String serverSocketClass = ServerSocketWrapper.class.getName();
94
 
 
95
 
   protected List serverSockets = new ArrayList();
96
 
   protected boolean running = false;
97
 
   protected int backlog = BACKLOG_DEFAULT;
98
 
   protected AcceptThread[] acceptThreads;
99
 
   protected int numAcceptThreads = 1;
100
 
   protected int maxPoolSize = MAX_POOL_SIZE_DEFAULT;
101
 
   protected LRUPool clientpool;
102
 
   protected LinkedList threadpool;
103
 
   protected boolean immediateShutdown;
104
 
 
105
 
   protected ServerSocketRefresh refreshThread;
106
 
   protected boolean newServerSocketFactory = false;
107
 
   protected Object serverSocketFactoryLock = new Object();
108
 
 
109
 
   protected boolean reuseAddress = true;
110
 
   protected int receiveBufferSize = -1;
111
 
   
112
 
   /**
113
 
    * More socket configuration parameters.
114
 
    */
115
 
   protected boolean keepAlive;
116
 
   protected boolean keepAliveSet;
117
 
   protected boolean oOBInline;
118
 
   protected boolean oOBInlineSet;
119
 
   protected int sendBufferSize = -1;
120
 
   protected boolean soLinger;
121
 
   protected boolean soLingerSet;
122
 
   protected int soLingerDuration = -1;
123
 
   protected int trafficClass = -1;
124
 
 
125
 
   // defaults to -1 as to not have idle timeouts
126
 
   protected int idleTimeout = -1;
127
 
   protected IdleTimerTask idleTimerTask = null;
128
 
   
129
 
   protected int writeTimeout = -1;
130
 
 
131
 
   public SocketServerInvoker(InvokerLocator locator)
132
 
   {
133
 
      super(locator);
134
 
   }
135
 
 
136
 
   public SocketServerInvoker(InvokerLocator locator, Map configuration)
137
 
   {
138
 
      super(locator, configuration);
139
 
   }
140
 
 
141
 
   /**
142
 
    * after a truststore update use this to
143
 
    * set a new ServerSocketFactory to the invoker<br>
144
 
    * then a new ServerSocket is created that accepts the new connections
145
 
    * @param serverSocketFactory
146
 
        */
147
 
   public void setNewServerSocketFactory(ServerSocketFactory serverSocketFactory)
148
 
   {
149
 
      log.trace("entering setNewServerSocketFactory()");
150
 
      synchronized (serverSocketFactoryLock)
151
 
      {
152
 
         newServerSocketFactory=true;
153
 
         setServerSocketFactory(serverSocketFactory);
154
 
         serverSocketFactoryLock.notify();
155
 
         log.info("ServerSocketFactory has been updated");
156
 
      }
157
 
   }
158
 
 
159
 
   /**
160
 
    * refreshes the serverSocket by closing old one and
161
 
    * creating a new ServerSocket from new ServerSocketFactory
162
 
    * @throws IOException
163
 
    */
164
 
   protected void refreshServerSocket() throws IOException
165
 
   {
166
 
      log.trace("entering refreshServerSocket()");
167
 
      synchronized (serverSocketFactoryLock)
168
 
      {
169
 
         for (int i = 0; i < acceptThreads.length; i++)
170
 
         {
171
 
            // If release() is able to enter its synchronized block and sees 
172
 
            // serverSocket == null, then it knows that something went wrong.
173
 
            newServerSocketFactory=false;
174
 
            ServerSocket oldServerSocket = acceptThreads[i].getServerSocket();
175
 
            InetAddress address = oldServerSocket.getInetAddress();
176
 
            int port = oldServerSocket.getLocalPort();
177
 
            oldServerSocket.close();
178
 
            ServerSocket newServerSocket = null;
179
 
            
180
 
            for (int j = 0; j < 5; j++)
181
 
            {
182
 
               try
183
 
               {
184
 
                  newServerSocket = createServerSocket(port, backlog, address);
185
 
                  break;
186
 
               }
187
 
               catch (Exception e)
188
 
               {
189
 
                  if (j < 4)
190
 
                  {
191
 
                     // Wait for end of TIME_WAIT state (1 to 4 minutes).
192
 
                     log.warn("Unable to recreate ServerSocket: will try again in 65 seconds", e);
193
 
                     try {Thread.sleep(65000);} catch (InterruptedException ignored) {}
194
 
                  }
195
 
                  else
196
 
                  {
197
 
                     log.error("Unable to recreate ServerSocket after 260 seconds", e);
198
 
                     return;
199
 
                  }
200
 
               }
201
 
            }
202
 
            
203
 
            acceptThreads[i].setServerSocket(newServerSocket);
204
 
            log.info(acceptThreads[i] + " has been updated with new ServerSocket");
205
 
         }
206
 
      }
207
 
      log.trace("leaving refreshServerSocket()");
208
 
   }
209
 
 
210
 
   protected void setup() throws Exception
211
 
   {
212
 
      props.putAll(getConfiguration());
213
 
      mapJavaBeanProperties(this, props, false);
214
 
      super.setup();
215
 
      String ssclass = props.getProperty(SERVER_SOCKET_CLASS_FLAG);
216
 
      if(ssclass != null)
217
 
      {
218
 
         serverSocketClass = ssclass;
219
 
      }
220
 
   }
221
 
 
222
 
   protected void finalize() throws Throwable
223
 
   {
224
 
      stop();
225
 
      super.finalize();
226
 
   }
227
 
 
228
 
   /**
229
 
    * Starts the invoker.
230
 
    *
231
 
    * @jmx.managed-operation description = "Start sets up the ServerInvoker we are wrapping."
232
 
    * impact      = "ACTION"
233
 
    */
234
 
   public synchronized void start() throws IOException
235
 
   {
236
 
      if(!running)
237
 
      {
238
 
         log.debug(this + " starting");
239
 
 
240
 
         if(maxPoolSize <= 0)
241
 
         {
242
 
            //need to reset to default
243
 
            maxPoolSize = MAX_POOL_SIZE_DEFAULT;
244
 
         }
245
 
 
246
 
         clientpool = new LRUPool(2, maxPoolSize);
247
 
         clientpool.create();
248
 
         threadpool = new LinkedList();
249
 
 
250
 
         createServerSockets();
251
 
         
252
 
         refreshThread = new ServerSocketRefresh();
253
 
         refreshThread.setDaemon(true);
254
 
         refreshThread.start();
255
 
         
256
 
         acceptThreads = new AcceptThread[numAcceptThreads * getHomes().size()];
257
 
         int i = 0;
258
 
         Iterator it = serverSockets.iterator();
259
 
         while (it.hasNext())
260
 
         {
261
 
            ServerSocket ss = (ServerSocket) it.next();
262
 
            for(int j = 0; j < numAcceptThreads; j++)
263
 
            {
264
 
               acceptThreads[i++] = new AcceptThread(ss, refreshThread);
265
 
            }
266
 
         }
267
 
      }
268
 
 
269
 
      try
270
 
      {
271
 
         super.start();
272
 
      }
273
 
      catch(IOException e)
274
 
      {
275
 
         log.error("Error starting SocketServerInvoker.", e);
276
 
         cleanup();
277
 
      }
278
 
      if(!running)
279
 
      {
280
 
         running = true;
281
 
 
282
 
         for(int i = 0; i < acceptThreads.length; i++)
283
 
         {
284
 
            acceptThreads[i].start();
285
 
         }
286
 
      }
287
 
 
288
 
      if(idleTimeout > 0)
289
 
      {
290
 
         if(idleTimerTask != null)
291
 
         {
292
 
            idleTimerTask.cancel();
293
 
         }
294
 
         idleTimerTask = new IdleTimerTask();
295
 
         TimerUtil.schedule(idleTimerTask, idleTimeout * 1000);
296
 
      }
297
 
      else
298
 
      {
299
 
         if(idleTimerTask != null)
300
 
         {
301
 
            idleTimerTask.cancel();
302
 
         }
303
 
      }
304
 
 
305
 
      log.debug(this + " started");
306
 
 
307
 
   }
308
 
 
309
 
   protected ServerSocket createServerSocket(int serverBindPort,
310
 
                                             final int backlog,
311
 
                                             InetAddress bindAddress) throws IOException
312
 
   {
313
 
      ServerSocketFactory factory = getServerSocketFactory();
314
 
      ServerSocket ss = null;
315
 
      
316
 
      try
317
 
      {
318
 
         ss = factory.createServerSocket();
319
 
      }
320
 
      catch (SocketException e)
321
 
      {
322
 
         if (getReuseAddress())
323
 
            log.warn("Unable to create unbound ServerSocket: cannot set reuseAddress to true",e);
324
 
 
325
 
         ss = factory.createServerSocket(serverBindPort, backlog, bindAddress);
326
 
         configureServerSocket(ss);
327
 
         return ss;
328
 
      }
329
 
      
330
 
      ss.setReuseAddress(getReuseAddress());
331
 
      configureServerSocket(ss);
332
 
      InetSocketAddress address = new InetSocketAddress(bindAddress, serverBindPort);
333
 
      bind(ss, address, backlog);
334
 
      return ss;
335
 
   }
336
 
   
337
 
   protected void createServerSockets() throws IOException
338
 
   {
339
 
      ServerSocketFactory factory = getServerSocketFactory();
340
 
 
341
 
      Iterator it = getHomes().iterator();
342
 
      while (it.hasNext())
343
 
      {
344
 
         Home home = (Home) it.next();
345
 
         InetAddress inetAddress = getAddressByName(home.host);
346
 
         
347
 
         ServerSocket ss = null;
348
 
         try
349
 
         {
350
 
            ss = factory.createServerSocket();
351
 
            ss.setReuseAddress(getReuseAddress());
352
 
            configureServerSocket(ss);
353
 
            InetSocketAddress address = new InetSocketAddress(inetAddress, home.port);
354
 
            bind(ss, address, backlog);
355
 
            if (log.isDebugEnabled()) log.debug(this + " created " + ss);
356
 
         }
357
 
         catch (SocketException e)
358
 
         {
359
 
            if (getReuseAddress())
360
 
               log.warn("Unable to create unbound ServerSocket: cannot set reuseAddress to true");
361
 
 
362
 
            try
363
 
            {
364
 
               ss = factory.createServerSocket(home.port, backlog, inetAddress);
365
 
               configureServerSocket(ss);
366
 
            }
367
 
            catch (IOException e2)
368
 
            {
369
 
               String m = this + " error creating ServerSocket[" + home + "]: " + e2.getMessage();
370
 
               IOException e3 = new IOException(m);
371
 
               log.debug(m, e3);
372
 
               throw e3;
373
 
            }
374
 
         }
375
 
         catch (IOException e)
376
 
         {
377
 
            String m = this + " error creating ServerSocket[" + home + "]: " + e.getMessage();
378
 
            IOException e2 = new IOException(m);
379
 
            log.debug(m, e2);
380
 
            throw e2;
381
 
         }
382
 
         
383
 
         serverSockets.add(ss);
384
 
      }
385
 
   }
386
 
   
387
 
   protected void configureServerSocket(ServerSocket ss) throws SocketException
388
 
   {
389
 
      if (receiveBufferSize != -1)
390
 
      {
391
 
         ss.setReceiveBufferSize(receiveBufferSize);
392
 
      }
393
 
   }
394
 
 
395
 
   protected String getThreadName(int i)
396
 
   {
397
 
      return "AcceptorThread#" + i + ":" + getServerBindPort();
398
 
   }
399
 
 
400
 
   public void destroy()
401
 
   {
402
 
      if(clientpool != null)
403
 
      {
404
 
         synchronized (clientpool)
405
 
         {
406
 
            clientpool.destroy();
407
 
         }
408
 
      }
409
 
      super.destroy();
410
 
   }
411
 
 
412
 
   /**
413
 
    * Stops the invoker.
414
 
    *
415
 
    * @jmx.managed-operation description = "Stops the invoker."
416
 
    * impact      = "ACTION"
417
 
    */
418
 
   public synchronized void stop()
419
 
   {
420
 
      if(running)
421
 
      {
422
 
         cleanup();
423
 
      }
424
 
      super.stop();
425
 
   }
426
 
 
427
 
   protected void cleanup()
428
 
   {
429
 
      running = false;
430
 
      
431
 
      if(acceptThreads != null)
432
 
      {
433
 
         for(int i = 0; i < acceptThreads.length; i++)
434
 
         {
435
 
            acceptThreads[i].shutdown();
436
 
         }
437
 
      }
438
 
      
439
 
      if (refreshThread != null)
440
 
         refreshThread.shutdown();
441
 
      
442
 
      if (idleTimerTask != null)
443
 
      {
444
 
         idleTimerTask.cancel();
445
 
      }
446
 
 
447
 
      maxPoolSize = 0; // so ServerThreads don't reinsert themselves
448
 
      
449
 
      // The following code has been changed to avoid a race condition with ServerThread.run() which
450
 
      // can result in leaving ServerThreads alive, which causes a memory leak.
451
 
      if (clientpool != null)
452
 
      {
453
 
         synchronized (clientpool)
454
 
         {
455
 
            Set svrThreads = clientpool.getContents();
456
 
            Iterator itr = svrThreads.iterator();
457
 
 
458
 
            while(itr.hasNext())
459
 
            {
460
 
               Object o = itr.next();
461
 
               ServerThread st = (ServerThread) o;
462
 
               if (immediateShutdown)
463
 
               {
464
 
                  st.shutdownImmediately();
465
 
               }
466
 
               else
467
 
               {
468
 
                  st.shutdown();
469
 
               }
470
 
            }
471
 
 
472
 
            clientpool.flush();
473
 
            clientpool.stop();
474
 
            
475
 
            log.debug(this + " stopped threads in clientpool");
476
 
 
477
 
            if (threadpool != null)
478
 
            {
479
 
               int threadsToShutdown = threadpool.size();
480
 
               for(int i = 0; i < threadsToShutdown; i++)
481
 
               {
482
 
                  ServerThread thread = (ServerThread) threadpool.removeFirst();
483
 
                  if (immediateShutdown)
484
 
                  {
485
 
                     thread.shutdownImmediately();
486
 
                  }
487
 
                  else
488
 
                  {
489
 
                     thread.shutdown();
490
 
                  }
491
 
               }
492
 
               
493
 
               log.debug(this + " stopped threads in threadpool");
494
 
            }
495
 
         }
496
 
      }
497
 
      
498
 
      log.debug(this + " exiting");
499
 
   }
500
 
 
501
 
 
502
 
   public int getReceiveBufferSize()
503
 
   {
504
 
      return receiveBufferSize;
505
 
   }
506
 
 
507
 
   public void setReceiveBufferSize(int receiveBufferSize)
508
 
   {
509
 
      this.receiveBufferSize = receiveBufferSize;
510
 
   }
511
 
   
512
 
   /**
513
 
    * Indicates if SO_REUSEADDR is enabled on server sockets
514
 
    * Default is true.
515
 
    */
516
 
   public boolean getReuseAddress()
517
 
   {
518
 
      return reuseAddress;
519
 
   }
520
 
 
521
 
   /**
522
 
    * Sets if SO_REUSEADDR is enabled on server sockets.
523
 
    * Default is true.
524
 
    *
525
 
    * @param reuse
526
 
    */
527
 
   public void setReuseAddress(boolean reuse)
528
 
   {
529
 
      this.reuseAddress = reuse;
530
 
   }
531
 
 
532
 
   public boolean isKeepAlive()
533
 
   {
534
 
      return keepAlive;
535
 
   }
536
 
 
537
 
   public void setKeepAlive(boolean keepAlive)
538
 
   {
539
 
      this.keepAlive = keepAlive;
540
 
      keepAliveSet = true;
541
 
   }
542
 
 
543
 
   public boolean isOOBInline()
544
 
   {
545
 
      return oOBInline;
546
 
   }
547
 
 
548
 
   public void setOOBInline(boolean inline)
549
 
   {
550
 
      oOBInline = inline;
551
 
      oOBInlineSet = true;
552
 
   }
553
 
 
554
 
   public int getSendBufferSize()
555
 
   {
556
 
      return sendBufferSize;
557
 
   }
558
 
 
559
 
   public void setSendBufferSize(int sendBufferSize)
560
 
   {
561
 
      this.sendBufferSize = sendBufferSize;
562
 
   }
563
 
 
564
 
   public boolean isSoLinger()
565
 
   {
566
 
      return soLinger;
567
 
   }
568
 
   
569
 
   public int getSoLingerDuration()
570
 
   {
571
 
      return soLingerDuration;
572
 
   }
573
 
 
574
 
   public void setSoLinger(boolean soLinger)
575
 
   {
576
 
      this.soLinger = soLinger;
577
 
      soLingerSet = true;
578
 
   }
579
 
 
580
 
   public void setSoLingerDuration(int soLingerDuration)
581
 
   {
582
 
      this.soLingerDuration = soLingerDuration;
583
 
   }
584
 
 
585
 
   public int getTrafficClass()
586
 
   {
587
 
      return trafficClass;
588
 
   }
589
 
 
590
 
   public void setTrafficClass(int trafficClass)
591
 
   {
592
 
      this.trafficClass = trafficClass;
593
 
   }
594
 
   
595
 
   /**
596
 
    * @return Number of idle ServerThreads
597
 
    * @jmx:managed-attribute
598
 
    */
599
 
   public int getCurrentThreadPoolSize()
600
 
   {
601
 
      return threadpool.size();
602
 
   }
603
 
 
604
 
   /**
605
 
    * @return Number of ServerThreads current executing or waiting on an invocation
606
 
    * @jmx:managed-attribute
607
 
    */
608
 
   public int getCurrentClientPoolSize()
609
 
   {
610
 
      return clientpool.size();
611
 
   }
612
 
 
613
 
   /**
614
 
    * Getter for property numAcceptThreads
615
 
    *
616
 
    * @return The number of threads that exist for accepting client connections
617
 
    * @jmx:managed-attribute
618
 
    */
619
 
   public int getNumAcceptThreads()
620
 
   {
621
 
      return numAcceptThreads;
622
 
   }
623
 
 
624
 
   /**
625
 
    * Setter for property numAcceptThreads
626
 
    *
627
 
    * @param size The number of threads that exist for accepting client connections
628
 
    * @jmx:managed-attribute
629
 
    */
630
 
   public void setNumAcceptThreads(int size)
631
 
   {
632
 
      this.numAcceptThreads = size;
633
 
   }
634
 
 
635
 
   /**
636
 
    * Setter for max pool size.
637
 
    * The number of server threads for processing client. The default is 300.
638
 
    *
639
 
    * @return
640
 
    * @jmx:managed-attribute
641
 
    */
642
 
   public int getMaxPoolSize()
643
 
   {
644
 
      return maxPoolSize;
645
 
   }
646
 
 
647
 
   /**
648
 
    * The number of server threads for processing client. The default is 300.
649
 
    *
650
 
    * @param maxPoolSize
651
 
    * @jmx:managed-attribute
652
 
    */
653
 
   public void setMaxPoolSize(int maxPoolSize)
654
 
   {
655
 
      this.maxPoolSize = maxPoolSize;
656
 
   }
657
 
 
658
 
   /**
659
 
    * @jmx:managed-attribute
660
 
    */
661
 
   public int getBacklog()
662
 
   {
663
 
      return backlog;
664
 
   }
665
 
 
666
 
   /**
667
 
    * @jmx:managed-attribute
668
 
    */
669
 
   public void setBacklog(int backlog)
670
 
   {
671
 
      if(backlog < 0)
672
 
      {
673
 
         this.backlog = BACKLOG_DEFAULT;
674
 
      }
675
 
      else
676
 
      {
677
 
         this.backlog = backlog;
678
 
      }
679
 
   }
680
 
 
681
 
   public int getIdleTimeout()
682
 
   {
683
 
      return idleTimeout;
684
 
   }
685
 
 
686
 
   /**
687
 
    * Sets the timeout for idle threads to be removed from pool.
688
 
    * If the value is greater than 0, then idle timeout will be
689
 
    * activated, otherwise no idle timeouts will occur.  By default,
690
 
    * this value is -1.
691
 
    *
692
 
    * @param idleTimeout number of seconds before a idle thread is timed out.
693
 
    */
694
 
   public void setIdleTimeout(int idleTimeout)
695
 
   {
696
 
      this.idleTimeout = idleTimeout;
697
 
 
698
 
      if(isStarted())
699
 
      {
700
 
         if(idleTimeout > 0)
701
 
         {
702
 
            if(idleTimerTask != null)
703
 
            {
704
 
               idleTimerTask.cancel();
705
 
            }
706
 
            idleTimerTask = new IdleTimerTask();
707
 
            TimerUtil.schedule(idleTimerTask, idleTimeout * 1000);
708
 
         }
709
 
         else
710
 
         {
711
 
            if(idleTimerTask != null)
712
 
            {
713
 
               idleTimerTask.cancel();
714
 
            }
715
 
         }
716
 
      }
717
 
   }
718
 
 
719
 
   public boolean isImmediateShutdown()
720
 
   {
721
 
      return immediateShutdown;
722
 
   }
723
 
 
724
 
   public void setImmediateShutdown(boolean immediateShutdown)
725
 
   {
726
 
      this.immediateShutdown = immediateShutdown;
727
 
   }
728
 
 
729
 
   public int getWriteTimeout()
730
 
   {
731
 
      return writeTimeout;
732
 
   }
733
 
 
734
 
   public void setWriteTimeout(int writeTimeout)
735
 
   {
736
 
      this.writeTimeout = writeTimeout;
737
 
   }
738
 
 
739
 
   protected void configureSocket(Socket s) throws SocketException
740
 
   {
741
 
      s.setReuseAddress(getReuseAddress());
742
 
      
743
 
      if (keepAliveSet)           s.setKeepAlive(keepAlive);
744
 
      if (oOBInlineSet)           s.setOOBInline(oOBInline);
745
 
      if (receiveBufferSize > -1) s.setReceiveBufferSize(receiveBufferSize);
746
 
      if (sendBufferSize > -1)    s.setSendBufferSize(sendBufferSize);
747
 
      if (soLingerSet && 
748
 
            soLingerDuration > 0) s.setSoLinger(soLinger, soLingerDuration);
749
 
      if (trafficClass > -1)      s.setTrafficClass(trafficClass);
750
 
   }
751
 
   
752
 
   /**
753
 
    * The acceptor thread should spend as little time as possbile doing any kind of operation, and
754
 
    * under no circumstances should perform IO on the new socket, which can potentially block and
755
 
    * lock up the server. For this reason, the acceptor thread should grab a worker thread and
756
 
    * delegate all subsequent work to it.
757
 
    */
758
 
   protected void processInvocation(Socket socket) throws Exception
759
 
   {
760
 
      ServerThread worker = null;
761
 
      boolean newThread = false;
762
 
 
763
 
      synchronized(clientpool)
764
 
      {
765
 
         while(worker == null && running)
766
 
         {
767
 
            if(trace) { log.trace(this + " trying to get a worker thread from threadpool for processing"); }
768
 
 
769
 
            if(threadpool.size() > 0)
770
 
            {
771
 
               worker = (ServerThread)threadpool.removeFirst();
772
 
               if(trace) { log.trace(this + (worker == null ? " found NO threads in threadpool"
773
 
                                                            : " got " + worker + " from threadpool")); }
774
 
               
775
 
            }
776
 
            else if (trace) { { log.trace(this + " has an empty threadpool"); } }
777
 
 
778
 
            if(worker == null)
779
 
            {
780
 
               if(clientpool.size() < maxPoolSize)
781
 
               {
782
 
                  if(trace) { log.trace(this + " creating new worker thread"); }
783
 
                  worker = new ServerThread(socket, this, clientpool, threadpool,
784
 
                                            getTimeout(), writeTimeout, serverSocketClass);
785
 
                  if(trace) { log.trace(this + " created " + worker); }
786
 
                  newThread = true;
787
 
               }
788
 
 
789
 
               if(worker == null)
790
 
               {
791
 
                  if(trace) {log.trace(this + " trying to evict a thread from clientpool"); }
792
 
                  clientpool.evict();
793
 
                  clientpool.wait(1000);  // Keep trying, in case all threads are not evictable.
794
 
                  if(trace) { log.trace(this + " notified of clientpool thread availability"); }
795
 
               }
796
 
            }
797
 
         }
798
 
 
799
 
         if (!running)
800
 
         {
801
 
            return;
802
 
         }
803
 
         clientpool.insert(worker, worker);
804
 
      }
805
 
 
806
 
      if(newThread)
807
 
      {
808
 
         if(trace) {log.trace(this + " starting " + worker); }
809
 
         worker.start();
810
 
      }
811
 
      else
812
 
      {
813
 
         if(trace) { log.trace(this + " reusing " + worker); }
814
 
         worker.wakeup(socket, getTimeout(), this);
815
 
      }
816
 
   }
817
 
 
818
 
   /**
819
 
    * returns true if the transport is bi-directional in nature, for example,
820
 
    * SOAP in unidirectional and SOCKETs are bi-directional (unless behind a firewall
821
 
    * for example).
822
 
    */
823
 
   public boolean isTransportBiDirectional()
824
 
   {
825
 
      return true;
826
 
   }
827
 
 
828
 
   public String toString()
829
 
   {
830
 
      return "SocketServerInvoker[" + locator.getHomes() + "]";
831
 
   }
832
 
 
833
 
   /**
834
 
    * Each implementation of the remote client invoker should have
835
 
    * a default data type that is uses in the case it is not specified
836
 
    * in the invoker locator uri.
837
 
    */
838
 
   protected String getDefaultDataType()
839
 
   {
840
 
      return SerializableMarshaller.DATATYPE;
841
 
   }
842
 
 
843
 
   /**
844
 
    * this thread checks if a new ServerSocketFactory was set,<br>
845
 
    * if so initializes a serversocket refresh
846
 
    * @author Michael Voss
847
 
    *
848
 
    */
849
 
   public class ServerSocketRefresh extends Thread
850
 
   {  
851
 
      private boolean running = true;
852
 
      
853
 
      public ServerSocketRefresh()
854
 
      {
855
 
         super("ServerSocketRefresh");
856
 
      }
857
 
      
858
 
      public void run()
859
 
      {
860
 
         while(running)
861
 
         {
862
 
            synchronized (serverSocketFactoryLock)
863
 
            {  
864
 
               if(newServerSocketFactory)
865
 
               {
866
 
                  log.debug("got notice about new ServerSocketFactory");
867
 
                  try
868
 
                  {
869
 
                     log.debug("refreshing server socket");
870
 
                     refreshServerSocket();
871
 
                     log.debug("server socket refreshed");
872
 
                  } catch (IOException e)
873
 
                  {
874
 
                     log.error("could not refresh server socket", e);
875
 
                  }
876
 
               }
877
 
               
878
 
               try
879
 
               {
880
 
                  serverSocketFactoryLock.wait();
881
 
                  log.trace("ServerSocketRefresh thread woke up");
882
 
               }
883
 
               catch (InterruptedException e)
884
 
               {
885
 
               }
886
 
            }
887
 
         }
888
 
         log.debug("ServerSocketRefresh shutting down");
889
 
      }
890
 
 
891
 
      /**
892
 
       * Let SocketServerInvoker.run() resume when refresh is completed
893
 
       */
894
 
      public void release() throws InvalidStateException
895
 
      {
896
 
         synchronized (serverSocketFactoryLock)
897
 
         {
898
 
//            if (serverSocket == null)
899
 
//            {
900
 
//               throw new InvalidStateException("error refreshing ServerSocket");
901
 
//            }
902
 
            log.trace("passed through ServerSocketRefresh.release()");
903
 
         }
904
 
      }
905
 
      
906
 
      public void shutdown()
907
 
      {
908
 
         running = false;
909
 
         
910
 
         synchronized (serverSocketFactoryLock)
911
 
         {
912
 
            serverSocketFactoryLock.notify();
913
 
         }
914
 
      }
915
 
   }
916
 
 
917
 
   /**
918
 
    * The IdleTimerTask is used to periodically check the server threads to
919
 
    * see if any have been idle for a specified amount of time, and if so,
920
 
    * release those threads and their connections and clear from the server
921
 
    * thread pool.
922
 
    */
923
 
   public class IdleTimerTask extends TimerTask
924
 
   {
925
 
      public void run()
926
 
      {
927
 
         Object[] svrThreadArray = null;
928
 
         Set threadsToShutdown = new HashSet();
929
 
 
930
 
         synchronized(clientpool)
931
 
         {
932
 
            Set svrThreads = clientpool.getContents();
933
 
            svrThreadArray = svrThreads.toArray();
934
 
 
935
 
            if(trace)
936
 
            {
937
 
               if(svrThreadArray != null)
938
 
               {
939
 
                  log.trace("Idle timer task fired.  Number of ServerThreads = " + svrThreadArray.length);
940
 
               }
941
 
            }
942
 
 
943
 
            // iterate through pooled server threads and evict idle ones
944
 
            if(svrThreadArray != null)
945
 
            {
946
 
               long currentTime = System.currentTimeMillis();
947
 
 
948
 
               for(int x = 0; x < svrThreadArray.length; x++)
949
 
               {
950
 
                  ServerThread svrThread = (ServerThread)svrThreadArray[x];
951
 
 
952
 
                  // check the idle time and evict
953
 
                  long idleTime = currentTime - svrThread.getLastRequestTimestamp();
954
 
 
955
 
                  if(trace)
956
 
                  {
957
 
                     log.trace("Idle time for ServerThread (" + svrThread + ") is " + idleTime);
958
 
                  }
959
 
 
960
 
                  long idleTimeout = getIdleTimeout() * 1000;
961
 
                  if(idleTime > idleTimeout)
962
 
                  {
963
 
                     if(trace)
964
 
                     {
965
 
                        log.trace("Idle timeout reached for ServerThread (" + svrThread + ") and will be evicted.");
966
 
                     }
967
 
                     clientpool.remove(svrThread);
968
 
                     threadsToShutdown.add(svrThread);
969
 
//                     svrThread.shutdown();
970
 
//                     svrThread.unblock();
971
 
                  }
972
 
               }
973
 
            }
974
 
 
975
 
            // now check idle server threads in the thread pool
976
 
            svrThreadArray = null;
977
 
 
978
 
            if(threadpool.size() > 0)
979
 
            {
980
 
               // now need to check the tread pool to remove threads
981
 
               svrThreadArray = threadpool.toArray();
982
 
            }
983
 
 
984
 
            if(trace)
985
 
            {
986
 
               if(svrThreadArray != null)
987
 
               {
988
 
                  log.trace("Number of ServerThread in thead pool = " + svrThreadArray.length);
989
 
               }
990
 
            }
991
 
 
992
 
            if(svrThreadArray != null)
993
 
            {
994
 
               long currentTime = System.currentTimeMillis();
995
 
 
996
 
               for(int x = 0; x < svrThreadArray.length; x++)
997
 
               {
998
 
                  ServerThread svrThread = (ServerThread)svrThreadArray[x];
999
 
                  long idleTime = currentTime - svrThread.getLastRequestTimestamp();
1000
 
 
1001
 
                  if(trace)
1002
 
                  {
1003
 
                     log.trace("Idle time for ServerThread (" + svrThread + ") is " + idleTime);
1004
 
                  }
1005
 
 
1006
 
                  long idleTimeout = getIdleTimeout() * 1000;
1007
 
                  if(idleTime > idleTimeout)
1008
 
                  {
1009
 
                     if(trace)
1010
 
                     {
1011
 
                        log.trace("Idle timeout reached for ServerThread (" + svrThread + ") and will be removed from thread pool.");
1012
 
                     }
1013
 
                     threadpool.remove(svrThread);
1014
 
                     threadsToShutdown.add(svrThread);
1015
 
//                     svrThread.shutdown();
1016
 
                  }
1017
 
               }
1018
 
            }
1019
 
         }
1020
 
         
1021
 
         Iterator it = threadsToShutdown.iterator();
1022
 
         while (it.hasNext())
1023
 
         {
1024
 
            ServerThread svrThread = (ServerThread) it.next();
1025
 
            svrThread.shutdown();
1026
 
//            svrThread.unblock();
1027
 
         }
1028
 
      }
1029
 
   }
1030
 
   
1031
 
   public class AcceptThread extends Thread
1032
 
   {
1033
 
      ServerSocket serverSocket;
1034
 
      ServerSocketRefresh refreshThread; 
1035
 
      
1036
 
      public AcceptThread(ServerSocket serverSocket, ServerSocketRefresh refreshThread)
1037
 
      {
1038
 
         this.serverSocket = serverSocket;
1039
 
         this.refreshThread = refreshThread;
1040
 
         setName("AcceptorThread[" + serverSocket + "]");
1041
 
         if(trace) log.trace(SocketServerInvoker.this + " created " + this); 
1042
 
      }
1043
 
      
1044
 
      public void run()
1045
 
      {
1046
 
         if(trace) { log.trace(this + " started execution of method run()"); }
1047
 
 
1048
 
         while(running)
1049
 
         {
1050
 
            try
1051
 
            {
1052
 
               refreshThread.release(); //goes on if serversocket refresh is completed
1053
 
 
1054
 
               if(trace) { log.trace(this + " is going to wait on serverSocket.accept()"); }
1055
 
 
1056
 
               Socket socket = accept(serverSocket);
1057
 
               if(trace) { log.trace(this + " accepted " + socket); }
1058
 
 
1059
 
               // the acceptor thread should spend as little time as possbile doing any kind of
1060
 
               // operation, and under no circumstances should perform IO on the new socket, which
1061
 
               // can potentially block and lock up the server. For this reason, the acceptor thread
1062
 
               // should grab a worker thread and delegate all subsequent work to it. This is what
1063
 
               // processInvocation() does.
1064
 
 
1065
 
               configureSocket(socket);
1066
 
               processInvocation(socket);
1067
 
            }
1068
 
            catch (SSLException e)
1069
 
            {
1070
 
               log.error("SSLServerSocket error", e);
1071
 
               return;
1072
 
            }
1073
 
            catch (InvalidStateException e)
1074
 
            {
1075
 
               log.error("Cannot proceed without functioning server socket.  Shutting down", e);
1076
 
               return;
1077
 
            }
1078
 
            catch(Throwable ex)
1079
 
            {  
1080
 
               if(running)
1081
 
               {
1082
 
                  log.error(this + " failed to handle socket", ex);
1083
 
               }
1084
 
               else
1085
 
               {
1086
 
                  log.trace(this + " caught exception in run()", ex);     
1087
 
               }
1088
 
            }
1089
 
         }
1090
 
      }
1091
 
      
1092
 
      public  void shutdown()
1093
 
      {
1094
 
         try
1095
 
         {
1096
 
            serverSocket.close();
1097
 
         }
1098
 
         catch (IOException e)
1099
 
         {
1100
 
            log.debug(this + " error closing " + serverSocket, e);
1101
 
         }
1102
 
      }
1103
 
      
1104
 
      public ServerSocket getServerSocket()
1105
 
      {
1106
 
         return serverSocket;
1107
 
      }
1108
 
      
1109
 
      public void setServerSocket(ServerSocket serverSocket)
1110
 
      {
1111
 
         this.serverSocket = serverSocket;
1112
 
      }
1113
 
   }
1114
 
   
1115
 
   static private void mapJavaBeanProperties(final Object o, final Properties props, final boolean isStrict)
1116
 
   throws IntrospectionException
1117
 
   {
1118
 
      if (SecurityUtility.skipAccessControl())
1119
 
      {
1120
 
         PropertyEditors.mapJavaBeanProperties(o, props, isStrict);
1121
 
         return;
1122
 
      }
1123
 
 
1124
 
      try
1125
 
      {
1126
 
         AccessController.doPrivileged( new PrivilegedExceptionAction()
1127
 
         {
1128
 
            public Object run() throws IntrospectionException
1129
 
            {
1130
 
               PropertyEditors.mapJavaBeanProperties(o, props, isStrict);
1131
 
               return null;
1132
 
            }
1133
 
         });
1134
 
      }
1135
 
      catch (PrivilegedActionException e)
1136
 
      {
1137
 
         throw (IntrospectionException) e.getCause();
1138
 
      }
1139
 
   }
1140
 
   
1141
 
   static private Socket accept(final ServerSocket ss) throws IOException
1142
 
   {
1143
 
      if (SecurityUtility.skipAccessControl())
1144
 
      {
1145
 
         return ss.accept();
1146
 
      }
1147
 
      
1148
 
      try
1149
 
      {
1150
 
          return (Socket)AccessController.doPrivileged( new PrivilegedExceptionAction()
1151
 
          {
1152
 
             public Object run() throws Exception
1153
 
             {
1154
 
                 return ss.accept();
1155
 
             }
1156
 
          });
1157
 
      }
1158
 
      catch (PrivilegedActionException e)
1159
 
      {
1160
 
          throw (IOException) e.getCause();
1161
 
      }
1162
 
   }
1163
 
 
1164
 
   static private void bind(final ServerSocket ss, final SocketAddress address,
1165
 
                           final int backlog) throws IOException
1166
 
   {
1167
 
      if (SecurityUtility.skipAccessControl())
1168
 
      {
1169
 
         ss.bind(address, backlog);
1170
 
         return;
1171
 
      }
1172
 
      
1173
 
      try
1174
 
      {
1175
 
          AccessController.doPrivileged( new PrivilegedExceptionAction()
1176
 
          {
1177
 
             public Object run() throws Exception
1178
 
             {
1179
 
                ss.bind(address, backlog);
1180
 
                return null;
1181
 
             }
1182
 
          });
1183
 
      }
1184
 
      catch (PrivilegedActionException e)
1185
 
      {
1186
 
          throw (IOException) e.getCause();
1187
 
      }
1188
 
   }
1189
 
   
1190
 
   static private InetAddress getAddressByName(final String host) throws UnknownHostException
1191
 
   {
1192
 
      if (SecurityUtility.skipAccessControl())
1193
 
      {
1194
 
         return InetAddress.getByName(host);
1195
 
      }
1196
 
      
1197
 
      try
1198
 
      {
1199
 
         return (InetAddress)AccessController.doPrivileged( new PrivilegedExceptionAction()
1200
 
         {
1201
 
            public Object run() throws IOException
1202
 
            {
1203
 
               return InetAddress.getByName(host);
1204
 
            }
1205
 
         });
1206
 
      }
1207
 
      catch (PrivilegedActionException e)
1208
 
      {
1209
 
         throw (UnknownHostException) e.getCause();
1210
 
      }
1211
 
   }
1212
 
}