~ubuntu-branches/ubuntu/raring/libjboss-remoting-java/raring

« back to all changes in this revision

Viewing changes to src/main/org/jboss/remoting/transport/bisocket/BisocketServerInvoker.java

  • Committer: Package Import Robot
  • Author(s): Torsten Werner
  • Date: 2011-09-09 14:01:03 UTC
  • mfrom: (1.1.6 upstream)
  • Revision ID: package-import@ubuntu.com-20110909140103-hqokx61534tas9rg
Tags: 2.5.3.SP1-1
* Newer but not newest upstream release. Do not build samples.
* Change debian/watch to upstream's svn repo.
* Add patch to fix compile error caused by tomcat update.
  (Closes: #628303)
* Switch to source format 3.0.
* Switch to debhelper level 7.
* Remove useless Depends.
* Update Standards-Version: 3.9.2.
* Update README.source.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
* JBoss, Home of Professional Open Source
 
3
* Copyright 2005, JBoss Inc., and individual contributors as indicated
 
4
* by the @authors tag. See the copyright.txt in the distribution for a
 
5
* full listing of individual contributors.
 
6
*
 
7
* This is free software; you can redistribute it and/or modify it
 
8
* under the terms of the GNU Lesser General Public License as
 
9
* published by the Free Software Foundation; either version 2.1 of
 
10
* the License, or (at your option) any later version.
 
11
*
 
12
* This software is distributed in the hope that it will be useful,
 
13
* but WITHOUT ANY WARRANTY; without even the implied warranty of
 
14
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 
15
* Lesser General Public License for more details.
 
16
*
 
17
* You should have received a copy of the GNU Lesser General Public
 
18
* License along with this software; if not, write to the Free
 
19
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
 
20
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
 
21
*/
 
22
 
 
23
package org.jboss.remoting.transport.bisocket;
 
24
 
 
25
import java.io.DataInputStream;
 
26
import java.io.DataOutputStream;
 
27
import java.io.IOException;
 
28
import java.lang.reflect.Method;
 
29
import java.net.InetAddress;
 
30
import java.net.ServerSocket;
 
31
import java.net.Socket;
 
32
import java.security.AccessController;
 
33
import java.security.PrivilegedActionException;
 
34
import java.security.PrivilegedExceptionAction;
 
35
import java.util.ArrayList;
 
36
import java.util.Collection;
 
37
import java.util.Collections;
 
38
import java.util.HashMap;
 
39
import java.util.HashSet;
 
40
import java.util.Iterator;
 
41
import java.util.LinkedList;
 
42
import java.util.List;
 
43
import java.util.Map;
 
44
import java.util.Set;
 
45
import java.util.StringTokenizer;
 
46
import java.util.Timer;
 
47
import java.util.TimerTask;
 
48
 
 
49
import org.jboss.logging.Logger;
 
50
import org.jboss.remoting.Client;
 
51
import org.jboss.remoting.Home;
 
52
import org.jboss.remoting.InvocationRequest;
 
53
import org.jboss.remoting.InvokerLocator;
 
54
import org.jboss.remoting.Remoting;
 
55
import org.jboss.remoting.ServerInvocationHandler;
 
56
import org.jboss.remoting.invocation.InternalInvocation;
 
57
import org.jboss.remoting.socketfactory.CreationListenerServerSocket;
 
58
import org.jboss.remoting.socketfactory.CreationListenerSocketFactory;
 
59
import org.jboss.remoting.socketfactory.SocketCreationListener;
 
60
import org.jboss.remoting.transport.PortUtil;
 
61
import org.jboss.remoting.transport.socket.LRUPool;
 
62
import org.jboss.remoting.transport.socket.SocketServerInvoker;
 
63
import org.jboss.remoting.util.SecurityUtility;
 
64
 
 
65
 
 
66
/**
 
67
 *
 
68
 * @author <a href="ron.sigal@jboss.com">Ron Sigal</a>
 
69
 * @version $Revision: 5605 $
 
70
 * <p>
 
71
 * Copyright Nov 23, 2006
 
72
 * </p>
 
73
 */
 
74
public class BisocketServerInvoker extends SocketServerInvoker
 
75
{
 
76
   private static final Logger log = Logger.getLogger(BisocketServerInvoker.class);
 
77
 
 
78
   private static Map listenerIdToServerInvokerMap = Collections.synchronizedMap(new HashMap());
 
79
   private static Timer timer;
 
80
   private static Object timerLock = new Object();
 
81
 
 
82
   private Map listenerIdToInvokerLocatorMap = Collections.synchronizedMap(new HashMap());
 
83
   private Set secondaryServerSockets = new HashSet();
 
84
   private InvokerLocator secondaryLocator;
 
85
   private Set secondaryServerSocketThreads = new HashSet();
 
86
   private Map controlConnectionThreadMap = new HashMap();
 
87
   private Map controlConnectionRestartsMap = Collections.synchronizedMap(new HashMap());
 
88
   private int pingFrequency = Bisocket.PING_FREQUENCY_DEFAULT;
 
89
   private int pingWindowFactor = Bisocket.PING_WINDOW_FACTOR_DEFAULT;
 
90
   private int pingWindow = pingWindowFactor * pingFrequency;
 
91
   private int socketCreationRetries = Bisocket.MAX_RETRIES_DEFAULT;
 
92
   private int controlConnectionRestarts = Bisocket.MAX_CONTROL_CONNECTION_RESTARTS_DEFAULT;
 
93
   private ControlMonitorTimerTask controlMonitorTimerTask;
 
94
   protected boolean isCallbackServer = false;
 
95
   protected List secondaryBindPorts = new ArrayList();
 
96
   protected List secondaryConnectPorts = new ArrayList();
 
97
 
 
98
 
 
99
   public static BisocketServerInvoker getBisocketServerInvoker(String listenerId)
 
100
   {
 
101
      return (BisocketServerInvoker) listenerIdToServerInvokerMap.get(listenerId);
 
102
   }
 
103
 
 
104
 
 
105
   public BisocketServerInvoker(InvokerLocator locator)
 
106
   {
 
107
      super(locator);
 
108
   }
 
109
 
 
110
 
 
111
   public BisocketServerInvoker(InvokerLocator locator, Map configuration)
 
112
   {
 
113
      super(locator, configuration);
 
114
   }
 
115
 
 
116
 
 
117
   public void start() throws IOException
 
118
   {
 
119
      if (isCallbackServer)
 
120
      {
 
121
         Object val = configuration.get(Bisocket.MAX_RETRIES);
 
122
         if (val != null)
 
123
         {
 
124
            try
 
125
            {
 
126
               int nVal = Integer.valueOf((String) val).intValue();
 
127
               socketCreationRetries = nVal;
 
128
               log.debug("Setting socket creation retry limit: " + socketCreationRetries);
 
129
            }
 
130
            catch (Exception e)
 
131
            {
 
132
               log.warn("Could not convert " + Bisocket.MAX_RETRIES +
 
133
                     " value of " + val + " to an int value.");
 
134
            }
 
135
         }
 
136
         
 
137
         val = configuration.get(Bisocket.MAX_CONTROL_CONNECTION_RESTARTS);
 
138
         if (val != null)
 
139
         {
 
140
            try
 
141
            {
 
142
               int nVal = Integer.valueOf((String) val).intValue();
 
143
               controlConnectionRestarts = nVal;
 
144
               log.debug("Setting control connection restart limit: " + controlConnectionRestarts);
 
145
            }
 
146
            catch (Exception e)
 
147
            {
 
148
               log.warn("Could not convert " + Bisocket.MAX_CONTROL_CONNECTION_RESTARTS +
 
149
                     " value of " + val + " to an int value.");
 
150
            }
 
151
         }
 
152
         
 
153
         if(maxPoolSize <= 0)
 
154
         {
 
155
            maxPoolSize = MAX_POOL_SIZE_DEFAULT;
 
156
         }
 
157
         clientpool = new LRUPool(2, maxPoolSize);
 
158
         clientpool.create();
 
159
         threadpool = new LinkedList();
 
160
         checkSocketFactoryWrapper();
 
161
 
 
162
         if (pingFrequency > 0)
 
163
         {
 
164
            controlMonitorTimerTask = new ControlMonitorTimerTask(this);
 
165
            synchronized (timerLock)
 
166
            {
 
167
               if (timer == null)
 
168
               {
 
169
                  timer = new Timer(true);
 
170
               }
 
171
               try
 
172
               {
 
173
                  timer.schedule(controlMonitorTimerTask, pingFrequency, pingFrequency);
 
174
               }
 
175
               catch (IllegalStateException e)
 
176
               {
 
177
                  log.debug("Unable to schedule TimerTask on existing Timer", e);
 
178
                  timer = new Timer(true);
 
179
                  timer.schedule(controlMonitorTimerTask, pingFrequency, pingFrequency);
 
180
               }
 
181
            }
 
182
         }
 
183
         
 
184
         running = true;
 
185
         started = true;
 
186
      }
 
187
      else
 
188
      {
 
189
         super.start();
 
190
         
 
191
         if (serverSockets.size() < secondaryBindPorts.size())
 
192
            log.warn(this + " extra secondary bind ports will be ignored");
 
193
         else if (serverSockets.size() > secondaryBindPorts.size())
 
194
            log.warn(this + " not enough secondary bind ports: will use anonymous ports as necessary");
 
195
        
 
196
         if (secondaryConnectPorts.size() == 0)
 
197
         {
 
198
            secondaryConnectPorts = secondaryBindPorts;
 
199
         }
 
200
         else if(secondaryConnectPorts.size() != secondaryBindPorts.size())
 
201
         {
 
202
            log.warn(this + " number of secondary connect ports != number of secondary bind ports");
 
203
            log.warn(this + " will ignore secondary connect ports");
 
204
            secondaryConnectPorts = secondaryBindPorts;
 
205
         }         
 
206
 
 
207
         int i = 0;
 
208
         Iterator it = serverSockets.iterator();
 
209
         while (it.hasNext())
 
210
         {
 
211
            ServerSocket ss = (ServerSocket) it.next();
 
212
            final InetAddress host = ss.getInetAddress();
 
213
            int secondaryBindPort = -1;
 
214
            if (secondaryBindPorts.size() > i)
 
215
            {
 
216
               secondaryBindPort = ((Integer) secondaryBindPorts.get(i)).intValue();
 
217
            }
 
218
            else
 
219
            {
 
220
               secondaryBindPorts.add(new Integer(-1));
 
221
            }
 
222
            if (secondaryBindPort < 0)
 
223
            {
 
224
               secondaryBindPort = PortUtil.findFreePort(host.getHostAddress());
 
225
               secondaryBindPorts.set(i, new Integer(secondaryBindPort));
 
226
            }
 
227
            
 
228
            ServerSocket secondaryServerSocket = null;
 
229
            final int finalBindPort = secondaryBindPort;
 
230
            
 
231
            try
 
232
            {
 
233
               secondaryServerSocket = (ServerSocket) AccessController.doPrivileged( new PrivilegedExceptionAction()
 
234
               {
 
235
                  public Object run() throws Exception
 
236
                  {
 
237
                     ServerSocket ss = null;
 
238
                     if (serverSocketFactory != null)
 
239
                     {
 
240
                        ss = serverSocketFactory.createServerSocket(finalBindPort, 0, host);
 
241
                     }
 
242
                     else
 
243
                     {
 
244
                        ss = new ServerSocket(finalBindPort, 0, host);
 
245
                     }
 
246
                     return ss;
 
247
                  }
 
248
               });
 
249
            }
 
250
            catch (PrivilegedActionException e)
 
251
            {
 
252
               throw (IOException) e.getCause();
 
253
            }
 
254
            
 
255
            ss = checkSecondaryServerSocketWrapper(secondaryServerSocket);
 
256
            secondaryServerSockets.add(ss);
 
257
            log.debug(this + " created secondary " + ss);
 
258
            i++;
 
259
         }
 
260
         
 
261
         i = 0;
 
262
         it = secondaryServerSockets.iterator();
 
263
         while (it.hasNext())
 
264
         {
 
265
            ServerSocket secondaryServerSocket = (ServerSocket) it.next();
 
266
            Thread t = new SecondaryServerSocketThread(secondaryServerSocket);
 
267
            t.setName("secondaryServerSocketThread[" + i++ + "]");
 
268
            t.setDaemon(true);
 
269
            t.start();
 
270
            secondaryServerSocketThreads.add(t);
 
271
            log.debug(this + " created " + t);
 
272
         }
 
273
         
 
274
         if (getLocator().isMultihome())
 
275
         {
 
276
            int j = 0;
 
277
            String host = ((Home) connectHomes.get(j)).host;
 
278
            int port = ((Integer) secondaryConnectPorts.get(j)).intValue();
 
279
            if (port < 0)
 
280
               port = ((Integer) secondaryBindPorts.get(j)).intValue();
 
281
            StringBuffer sb = new StringBuffer(host).append(':').append(port);
 
282
            for (j = 1; j < connectHomes.size(); j++)
 
283
            {
 
284
               host = ((Home) connectHomes.get(j)).host;
 
285
               port = ((Integer) secondaryConnectPorts.get(j)).intValue();
 
286
               if (port < 0)
 
287
                  port = ((Integer) secondaryBindPorts.get(j)).intValue();
 
288
               sb.append('!').append(host).append(':').append(port);
 
289
            }
 
290
            
 
291
            Map params = new HashMap();
 
292
            params.put(InvokerLocator.HOMES_KEY, sb.toString());
 
293
            secondaryLocator = new InvokerLocator(null, InvokerLocator.MULTIHOME, -1, null, params);
 
294
         }
 
295
         else
 
296
         {
 
297
            String connectAddress = getLocator().getHost();
 
298
            int connectPort = ((Integer) secondaryConnectPorts.get(0)).intValue();
 
299
            if (connectPort < 0)
 
300
               connectPort = ((Integer) secondaryBindPorts.get(0)).intValue();
 
301
            secondaryLocator = new InvokerLocator(null, connectAddress, connectPort, null, null);
 
302
         }
 
303
 
 
304
         log.debug(this + " created secondary InvokerLocator: " + secondaryLocator);
 
305
      }
 
306
   }
 
307
 
 
308
 
 
309
   public boolean isTransportBiDirectional()
 
310
   {
 
311
      return true;
 
312
   }
 
313
 
 
314
 
 
315
   public void createControlConnection(String listenerId, boolean firstConnection)
 
316
   throws IOException
 
317
   {
 
318
      BisocketClientInvoker clientInvoker = BisocketClientInvoker.getBisocketClientInvoker(listenerId);
 
319
      
 
320
      if (clientInvoker == null)
 
321
      {
 
322
         log.debug("Unable to retrieve client invoker: must have disconnected");
 
323
         throw new ClientUnavailableException();
 
324
      }
 
325
      
 
326
      InvokerLocator oldLocator = (InvokerLocator) listenerIdToInvokerLocatorMap.get(listenerId);
 
327
      InvokerLocator newLocator = null;
 
328
      
 
329
      try
 
330
      {
 
331
         newLocator = clientInvoker.getSecondaryLocator();
 
332
      }
 
333
      catch (Throwable t)
 
334
      {
 
335
         log.debug("unable to get secondary locator", t);
 
336
         throw new IOException("unable to get secondary locator: " + t.getMessage());
 
337
      }
 
338
      
 
339
 
 
340
      // If a server restarts, it is likely that it creates a new secondary server socket on
 
341
      // a different port.  It will possible to recreate the control connection, but if
 
342
      // there is no PingTimerTask running in the new server to keep it alive, it will just
 
343
      // die again.  Once a new secondary server socket address is detected, a count is kept
 
344
      // of the number of times the control connection is restarted, and when it hits a
 
345
      // configured maximum, it is allowed to die.  See JBREM-731.
 
346
      
 
347
      boolean locatorChanged = !newLocator.equals(oldLocator);
 
348
      listenerIdToInvokerLocatorMap.put(listenerId, newLocator);
 
349
      
 
350
      String host = newLocator.getHost();
 
351
      int port = newLocator.getPort();
 
352
      if (newLocator.isMultihome())
 
353
      {
 
354
         host = clientInvoker.getHomeInUse().host;
 
355
         port = -1;
 
356
         Iterator it = null;
 
357
         if (newLocator.getConnectHomeList().isEmpty())
 
358
            it =  newLocator.getHomeList().iterator();
 
359
         else
 
360
            it = newLocator.getConnectHomeList().iterator();
 
361
         
 
362
         while (it.hasNext())
 
363
         {
 
364
            Home h = (Home) it.next();
 
365
            if (host.equals(h.host))
 
366
            {
 
367
               port = h.port;
 
368
               newLocator.setHomeInUse(h);
 
369
               break;
 
370
            }
 
371
         }
 
372
      }
 
373
      
 
374
      if (port == -1)
 
375
      {
 
376
         throw new IOException("Cannot find matching home for control connection");
 
377
      }
 
378
      
 
379
      log.debug("creating control connection: " + newLocator);
 
380
 
 
381
      Socket socket = null;
 
382
      IOException savedException = null;
 
383
      final String finalHost = host;
 
384
      final int finalPort = port;
 
385
      
 
386
      for (int i = 0; i < socketCreationRetries; i++)
 
387
      {
 
388
         try
 
389
         {
 
390
            socket = (Socket) AccessController.doPrivileged( new PrivilegedExceptionAction()
 
391
            {
 
392
               public Object run() throws Exception
 
393
               {
 
394
                  Socket s = null;
 
395
                  if (socketFactory != null)
 
396
                     s = socketFactory.createSocket(finalHost, finalPort);
 
397
                  else
 
398
                     s = new Socket(finalHost, finalPort);
 
399
                  return s;
 
400
               }
 
401
            });
 
402
         }
 
403
         catch (PrivilegedActionException e)
 
404
         {
 
405
            IOException ioe = (IOException) e.getCause();
 
406
            log.debug("Error creating a control socket", ioe);
 
407
            savedException = ioe;
 
408
         }
 
409
         
 
410
         if (socket != null)
 
411
            break;
 
412
         
 
413
         try
 
414
         {
 
415
            Thread.sleep(1000);
 
416
         }
 
417
         catch (InterruptedException e)
 
418
         {
 
419
            log.debug("received interrupt");
 
420
         }
 
421
      }
 
422
 
 
423
      if (socket == null)
 
424
      {
 
425
         log.debug("unable to create control connection after "
 
426
                   + socketCreationRetries + " retries", savedException);
 
427
         throw savedException;
 
428
      }
 
429
      
 
430
      DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
 
431
      if (firstConnection)
 
432
      {
 
433
         dos.write(Bisocket.CREATE_CONTROL_SOCKET);
 
434
      }
 
435
      else
 
436
      {
 
437
         dos.write(Bisocket.RECREATE_CONTROL_SOCKET);
 
438
      }
 
439
      dos.writeUTF(listenerId);
 
440
      
 
441
      Thread thread = new ControlConnectionThread(socket, listenerId);
 
442
      thread.setName("control: " + socket.toString());
 
443
      thread.setDaemon(true);
 
444
 
 
445
      synchronized (controlConnectionThreadMap)
 
446
      {
 
447
         controlConnectionThreadMap.put(listenerId, thread);
 
448
      }
 
449
      
 
450
      Object o = controlConnectionRestartsMap.get(listenerId);
 
451
      if (o != null)
 
452
      {
 
453
         int restarts = ((Integer) o).intValue();
 
454
         if (locatorChanged || restarts > 0)
 
455
         controlConnectionRestartsMap.put(listenerId, new Integer(++restarts));
 
456
      }
 
457
      else
 
458
      {
 
459
         controlConnectionRestartsMap.put(listenerId, new Integer(0));
 
460
      }
 
461
 
 
462
      thread.start();
 
463
      log.debug(this + " created control connection (" + listenerId + "): " + socket.toString());
 
464
   }
 
465
 
 
466
 
 
467
   public void destroyControlConnection(String listenerId)
 
468
   {
 
469
      Thread t = null;
 
470
      
 
471
      synchronized (controlConnectionThreadMap)
 
472
      {
 
473
         t = (Thread) controlConnectionThreadMap.remove(listenerId);
 
474
      }
 
475
      
 
476
      if (t != null)
 
477
      {
 
478
         ((ControlConnectionThread)t).shutdown();
 
479
         log.debug(this + " shutting down control connection: " + listenerId);
 
480
      }
 
481
      else
 
482
      {
 
483
         log.debug("unrecognized listener ID: " + listenerId);
 
484
      }
 
485
      
 
486
      listenerIdToInvokerLocatorMap.remove(listenerId);
 
487
      controlConnectionRestartsMap.remove(listenerId);
 
488
   }
 
489
   
 
490
   
 
491
   public int getControlConnectionRestarts()
 
492
   {
 
493
      return controlConnectionRestarts;
 
494
   }
 
495
   
 
496
   
 
497
   public void setControlConnectionRestarts(int controlConnectionRestarts)
 
498
   {
 
499
      this.controlConnectionRestarts = controlConnectionRestarts;
 
500
   }
 
501
   
 
502
   
 
503
   public int getPingFrequency()
 
504
   {
 
505
      return pingFrequency;
 
506
   }
 
507
 
 
508
 
 
509
   public void setPingFrequency(int pingFrequency)
 
510
   {
 
511
      this.pingFrequency = pingFrequency;
 
512
      pingWindow = pingWindowFactor * pingFrequency;
 
513
   }
 
514
   
 
515
   
 
516
   public int getPingWindowFactor()
 
517
   {
 
518
      return pingWindowFactor;
 
519
   }
 
520
   
 
521
   
 
522
   public void setPingWindowFactor(int pingWindowFactor)
 
523
   {
 
524
      this.pingWindowFactor = pingWindowFactor;
 
525
      pingWindow = pingWindowFactor * pingFrequency;
 
526
   }
 
527
   
 
528
   
 
529
   public int getSecondaryBindPort()
 
530
   {
 
531
      if (secondaryBindPorts.size() == 0 || secondaryBindPorts.size() > 1)
 
532
         return -1;
 
533
      
 
534
      return ((Integer) secondaryBindPorts.get(0)).intValue();
 
535
   }
 
536
   
 
537
   
 
538
   public void setSecondaryBindPort(int secondaryPort)
 
539
   {
 
540
      secondaryBindPorts.clear();
 
541
      secondaryBindPorts.add(new Integer(secondaryPort));
 
542
   }
 
543
   
 
544
   
 
545
   public List getSecondaryBindPorts()
 
546
   {
 
547
      return new ArrayList(secondaryBindPorts);
 
548
   }
 
549
 
 
550
 
 
551
   public void setSecondaryBindPorts(List secondaryBindPorts)
 
552
   {
 
553
      this.secondaryBindPorts = secondaryBindPorts;
 
554
   }
 
555
   
 
556
   
 
557
   public void setSecondaryBindPorts(String secondaryBindPortString)
 
558
   {
 
559
      StringTokenizer tok = new StringTokenizer(secondaryBindPortString, "!");
 
560
      String token = null;
 
561
      while (tok.hasMoreTokens())
 
562
      {
 
563
         try
 
564
         {
 
565
            token = tok.nextToken();
 
566
            secondaryBindPorts.add(Integer.valueOf(token));
 
567
         }
 
568
         catch (NumberFormatException e)
 
569
         {
 
570
            log.warn("Invalid format for " + "\"" + Bisocket.SECONDARY_BIND_PORT + "\": " + token);
 
571
            secondaryBindPorts.add(new Integer(-1));
 
572
         }
 
573
      }
 
574
   }
 
575
 
 
576
 
 
577
   public int getSecondaryConnectPort()
 
578
   {
 
579
      if (secondaryConnectPorts.size() ==  0 || secondaryConnectPorts.size() > 1)
 
580
         return -1;
 
581
      
 
582
      return ((Integer) secondaryConnectPorts.get(0)).intValue();
 
583
   }
 
584
   
 
585
   
 
586
   public void setSecondaryConnectPort(int secondaryConnectPort)
 
587
   {
 
588
      secondaryConnectPorts.clear();
 
589
      secondaryConnectPorts.add(new Integer(secondaryConnectPort));
 
590
   }
 
591
   
 
592
   
 
593
   public List getSecondaryConnectPorts()
 
594
   {
 
595
      return new ArrayList(secondaryConnectPorts);
 
596
   }
 
597
 
 
598
 
 
599
   public void setSecondaryConnectPorts(List secondaryConnectPorts)
 
600
   {
 
601
      this.secondaryConnectPorts = secondaryConnectPorts;
 
602
   }
 
603
   
 
604
   
 
605
   public void setSecondaryConnectPorts(String secondaryConnectPortString)
 
606
   {
 
607
      StringTokenizer tok = new StringTokenizer(secondaryConnectPortString, "!");
 
608
      String token = null;
 
609
      while (tok.hasMoreTokens())
 
610
      {
 
611
         try
 
612
         {
 
613
            token = tok.nextToken();
 
614
            secondaryConnectPorts.add(Integer.valueOf(token));
 
615
         }
 
616
         catch (NumberFormatException e)
 
617
         {
 
618
            log.warn("Invalid format for " + "\"" + Bisocket.SECONDARY_CONNECT_PORT + "\": " + token);
 
619
            secondaryConnectPorts.add(new Integer(-1));
 
620
         }
 
621
      }
 
622
   }
 
623
   
 
624
   
 
625
   public int getSocketCreationRetries()
 
626
   {
 
627
      return socketCreationRetries;
 
628
   }
 
629
   
 
630
   
 
631
   public void setSocketCreationRetries(int socketCreationRetries)
 
632
   {
 
633
      this.socketCreationRetries = socketCreationRetries;
 
634
   }
 
635
   
 
636
   
 
637
   protected void setup() throws Exception
 
638
   {  
 
639
      Object o = configuration.get(Bisocket.IS_CALLBACK_SERVER);
 
640
      if (o != null)
 
641
      {
 
642
         if (o instanceof String)
 
643
            isCallbackServer = Boolean.valueOf((String) o).booleanValue();
 
644
         else if (o instanceof Boolean)
 
645
            isCallbackServer = ((Boolean) o).booleanValue();
 
646
         else
 
647
            log.error("unrecognized value for configuration key \"" +
 
648
                  Bisocket.IS_CALLBACK_SERVER + "\": " + o);
 
649
      }
 
650
      
 
651
      super.setup();
 
652
      
 
653
      o = configuration.get(Bisocket.PING_FREQUENCY);
 
654
      if (o instanceof String && ((String) o).length() > 0)
 
655
      {
 
656
            try
 
657
            {
 
658
               pingFrequency = Integer.valueOf(((String) o)).intValue();
 
659
               log.debug(this + " setting pingFrequency to " + pingFrequency);
 
660
            }
 
661
            catch (NumberFormatException e)
 
662
            {
 
663
               log.warn("Invalid format for " + "\"" + Bisocket.PING_FREQUENCY + "\": " + o);
 
664
            }
 
665
      }
 
666
      else if (o != null)
 
667
      {
 
668
         log.warn("\"" + Bisocket.PING_FREQUENCY + "\" must be specified as a String");
 
669
      }
 
670
      
 
671
      o = configuration.get(Bisocket.PING_WINDOW_FACTOR);
 
672
      if (o instanceof String && ((String) o).length() > 0)
 
673
      {
 
674
            try
 
675
            {
 
676
               pingWindowFactor = Integer.valueOf(((String) o)).intValue();
 
677
               log.debug(this + " setting pingWindowFactor to " + pingWindowFactor);
 
678
            }
 
679
            catch (NumberFormatException e)
 
680
            {
 
681
               log.warn("Invalid format for " + "\"" + Bisocket.PING_WINDOW_FACTOR + "\": " + o);
 
682
            }
 
683
      }
 
684
      else if (o != null)
 
685
      {
 
686
         log.warn("\"" + Bisocket.PING_WINDOW_FACTOR + "\" must be specified as a String");
 
687
      }
 
688
 
 
689
      pingWindow = pingWindowFactor * pingFrequency;
 
690
 
 
691
      o = configuration.get(Bisocket.SECONDARY_BIND_PORTS);
 
692
      if (o instanceof String && ((String) o).length() > 0)
 
693
      {
 
694
         setSecondaryBindPorts((String) o);
 
695
      }
 
696
      else if (o instanceof List)
 
697
      {
 
698
         setSecondaryBindPorts((List) o);
 
699
      }
 
700
      else if (o != null)
 
701
      {
 
702
         log.warn("\"" + Bisocket.SECONDARY_BIND_PORTS + "\" must be specified as a String or a List");
 
703
      }
 
704
      
 
705
      o = configuration.get(Bisocket.SECONDARY_CONNECT_PORTS);
 
706
      if (o instanceof String && ((String) o).length() > 0)
 
707
      {
 
708
         setSecondaryConnectPorts((String) o);
 
709
      }
 
710
      else if (o instanceof List)
 
711
      {
 
712
         setSecondaryConnectPorts((List) o);
 
713
      }
 
714
      else if (o != null)
 
715
      {
 
716
         log.warn("\"" + Bisocket.SECONDARY_CONNECT_PORTS + "\" must be specified as a String or a List");
 
717
      }
 
718
 
 
719
      if (secondaryBindPorts.isEmpty())
 
720
      {
 
721
         for (int i = 0; i < homes.size(); i++)
 
722
            secondaryBindPorts.add(new Integer(-1));
 
723
      }
 
724
      
 
725
      if (secondaryConnectPorts.isEmpty())
 
726
      {
 
727
         secondaryConnectPorts = new ArrayList(secondaryBindPorts);
 
728
      }
 
729
      
 
730
      if (isCallbackServer)
 
731
      {
 
732
         socketFactory = createSocketFactory(configuration);
 
733
      }
 
734
   }
 
735
 
 
736
 
 
737
   protected void cleanup()
 
738
   {
 
739
      synchronized (controlConnectionThreadMap)
 
740
      {
 
741
         Iterator it = controlConnectionThreadMap.values().iterator();
 
742
         while (it.hasNext())
 
743
         {
 
744
            ControlConnectionThread t = (ControlConnectionThread) it.next();
 
745
            it.remove();
 
746
            t.shutdown();
 
747
         }
 
748
      }
 
749
 
 
750
      super.cleanup();
 
751
 
 
752
      if (controlMonitorTimerTask != null)
 
753
         controlMonitorTimerTask.shutdown();
 
754
      
 
755
      Iterator it = secondaryServerSocketThreads.iterator();
 
756
      while (it.hasNext())
 
757
      {
 
758
         SecondaryServerSocketThread t = (SecondaryServerSocketThread) it.next();
 
759
         t.shutdown();
 
760
      }
 
761
      
 
762
      it = secondaryServerSockets.iterator();
 
763
      while (it.hasNext())
 
764
      {
 
765
         try
 
766
         {
 
767
            ServerSocket ss = (ServerSocket) it.next();
 
768
            ss.close();
 
769
         }
 
770
         catch (IOException e)
 
771
         {
 
772
            log.info("Error closing secondary server socket: " + e.getMessage());
 
773
         }
 
774
      }
 
775
      
 
776
      secondaryBindPorts.clear();
 
777
      secondaryConnectPorts.clear();
 
778
   }
 
779
 
 
780
 
 
781
   protected InvokerLocator getSecondaryLocator()
 
782
   {
 
783
      return secondaryLocator;
 
784
   }
 
785
 
 
786
 
 
787
   protected void checkSocketFactoryWrapper() throws IOException
 
788
   {
 
789
 
 
790
      Object o = configuration.get(Remoting.SOCKET_CREATION_SERVER_LISTENER);
 
791
      if (o != null)
 
792
      {
 
793
         if (o instanceof SocketCreationListener)
 
794
         {
 
795
            SocketCreationListener listener = (SocketCreationListener) o;
 
796
            if (socketFactory instanceof CreationListenerSocketFactory)
 
797
            {
 
798
               CreationListenerSocketFactory clsf = (CreationListenerSocketFactory) socketFactory;
 
799
               clsf.setListener(listener);
 
800
            }
 
801
            else
 
802
            {
 
803
               socketFactory = new CreationListenerSocketFactory(socketFactory, listener);
 
804
            }
 
805
         }
 
806
         else
 
807
         {
 
808
            log.error("socket creation listener of invalid type: " + o);
 
809
         }
 
810
      }
 
811
      else
 
812
      {
 
813
         if (socketFactory instanceof CreationListenerSocketFactory)
 
814
         {
 
815
            CreationListenerSocketFactory clsf = (CreationListenerSocketFactory) socketFactory;
 
816
            socketFactory = clsf.getFactory();
 
817
         }
 
818
      }
 
819
   }
 
820
 
 
821
 
 
822
   protected ServerSocket checkSecondaryServerSocketWrapper(ServerSocket secondaryServerSocket) throws IOException
 
823
   {
 
824
      Object o = configuration.get(Remoting.SOCKET_CREATION_CLIENT_LISTENER);
 
825
      if (o != null)
 
826
      {
 
827
         if (o instanceof SocketCreationListener)
 
828
         {
 
829
            SocketCreationListener listener = (SocketCreationListener) o;
 
830
            if (secondaryServerSocket instanceof CreationListenerServerSocket)
 
831
            {
 
832
               CreationListenerServerSocket clss = (CreationListenerServerSocket) secondaryServerSocket;
 
833
               clss.setListener(listener);
 
834
            }
 
835
            else
 
836
            {
 
837
               secondaryServerSocket = new CreationListenerServerSocket(secondaryServerSocket, listener);
 
838
            }
 
839
         }
 
840
         else
 
841
         {
 
842
            log.error("socket creation listener of invalid type: " + o);
 
843
         }
 
844
      }
 
845
      else
 
846
      {
 
847
         if (secondaryServerSocket instanceof CreationListenerServerSocket)
 
848
         {
 
849
            CreationListenerServerSocket clss = (CreationListenerServerSocket) secondaryServerSocket;
 
850
            secondaryServerSocket = clss.getServerSocket();
 
851
         }
 
852
      }
 
853
      
 
854
      return secondaryServerSocket;
 
855
   }
 
856
 
 
857
 
 
858
   protected Object handleInternalInvocation(InternalInvocation ii,
 
859
                                             InvocationRequest ir,
 
860
                                             ServerInvocationHandler handler)
 
861
   throws Throwable
 
862
   {
 
863
      if(Bisocket.GET_SECONDARY_INVOKER_LOCATOR.equals(ii.getMethodName()))
 
864
      {
 
865
         return secondaryLocator;
 
866
      }
 
867
 
 
868
      Object response = super.handleInternalInvocation(ii, ir, handler);
 
869
 
 
870
      if(InternalInvocation.ADDCLIENTLISTENER.equals(ii.getMethodName()))
 
871
      {
 
872
         Map metadata = ir.getRequestPayload();
 
873
         if(metadata != null)
 
874
         {
 
875
            String listenerId = (String) metadata.get(Client.LISTENER_ID_KEY);
 
876
            if (listenerId != null)
 
877
            {
 
878
               listenerIdToServerInvokerMap.put(listenerId, this);
 
879
            }
 
880
         }
 
881
      }
 
882
      else if(InternalInvocation.REMOVECLIENTLISTENER.equals(ii.getMethodName()))
 
883
      {
 
884
         Map metadata = ir.getRequestPayload();
 
885
         if(metadata != null)
 
886
         {
 
887
            String listenerId = (String) metadata.get(Client.LISTENER_ID_KEY);
 
888
            if (listenerId != null)
 
889
            {
 
890
               listenerIdToServerInvokerMap.remove(listenerId);
 
891
               BisocketClientInvoker.removeBisocketClientInvoker(listenerId);
 
892
               destroyControlConnection(listenerId);
 
893
            }
 
894
         }
 
895
      }
 
896
 
 
897
      return response;
 
898
   }
 
899
 
 
900
 
 
901
   class ControlConnectionThread extends Thread
 
902
   {
 
903
      private static final int MAX_INITIAL_ATTEMPTS = 5;
 
904
      private Socket controlSocket;
 
905
      private String listenerId;
 
906
      private DataInputStream dis;
 
907
      private boolean running;
 
908
      private int errorCount;
 
909
      private long lastPing = -1;
 
910
      private int initialAttempts;
 
911
 
 
912
      ControlConnectionThread(Socket socket, String listenerId) throws IOException
 
913
      {
 
914
         controlSocket = socket;
 
915
         this.listenerId = listenerId;
 
916
         dis = new DataInputStream(socket.getInputStream());
 
917
      }
 
918
 
 
919
      void shutdown()
 
920
      {
 
921
         running = false;
 
922
 
 
923
         try
 
924
         {
 
925
            controlSocket.close();
 
926
         }
 
927
         catch (IOException e)
 
928
         {
 
929
            log.warn("unable to close controlSocket");
 
930
         }
 
931
         interrupt();
 
932
      }
 
933
 
 
934
      boolean checkConnection()
 
935
      {
 
936
         if (lastPing < 0 && initialAttempts++ < MAX_INITIAL_ATTEMPTS)
 
937
         {
 
938
            return true;
 
939
         }
 
940
         else if (lastPing < 0)
 
941
         {
 
942
            return false;
 
943
         }
 
944
         
 
945
         long currentTime = System.currentTimeMillis();
 
946
 
 
947
         if (log.isTraceEnabled())
 
948
         {
 
949
            log.trace("elapsed: " + (currentTime - lastPing));
 
950
         }
 
951
         return (currentTime - lastPing <= pingWindow);
 
952
      }
 
953
 
 
954
      String getListenerId()
 
955
      {
 
956
         return listenerId;
 
957
      }
 
958
 
 
959
      public void run()
 
960
      {
 
961
         running = true;
 
962
         while (running)
 
963
         {
 
964
            Socket socket = null;
 
965
 
 
966
            try
 
967
            {
 
968
               int action = dis.read();
 
969
               lastPing = System.currentTimeMillis();
 
970
 
 
971
               switch (action)
 
972
               {
 
973
                  case Bisocket.CREATE_ORDINARY_SOCKET:
 
974
                     InvokerLocator locator = (InvokerLocator) listenerIdToInvokerLocatorMap.get(listenerId);
 
975
                     
 
976
                     IOException savedException = null;
 
977
                     final String finalHost = locator.getHost();
 
978
                     final int finalPort = locator.getPort();
 
979
 
 
980
                     for (int i = 0; i < socketCreationRetries; i++)
 
981
                     {
 
982
                        try
 
983
                        {
 
984
                           socket = (Socket) AccessController.doPrivileged( new PrivilegedExceptionAction()
 
985
                           {
 
986
                              public Object run() throws Exception
 
987
                              {
 
988
                                 Socket s = null;
 
989
                                 if (socketFactory != null)
 
990
                                    s = socketFactory.createSocket(finalHost, finalPort);
 
991
                                 else
 
992
                                    s = new Socket(finalHost, finalPort);
 
993
                                 return s;
 
994
                              }
 
995
                           });
 
996
                        }
 
997
                        catch (PrivilegedActionException e)
 
998
                        {
 
999
                           IOException ioe = (IOException) e.getCause();
 
1000
                           log.debug("Error creating a socket", ioe);
 
1001
                           savedException = ioe;
 
1002
                        }
 
1003
                        
 
1004
                        if (socket != null)
 
1005
                           break;
 
1006
                        
 
1007
                        try
 
1008
                        {
 
1009
                           Thread.sleep(1000);
 
1010
                        }
 
1011
                        catch (InterruptedException e)
 
1012
                        {
 
1013
                           if (running)
 
1014
                           {
 
1015
                              log.debug("received unexpected interrupt");
 
1016
                              continue;
 
1017
                           }
 
1018
                           else
 
1019
                           {
 
1020
                              return;
 
1021
                           }
 
1022
                        }
 
1023
                     }
 
1024
                     
 
1025
                     if (socket == null)
 
1026
                     {
 
1027
                        log.error("Unable to create socket after " + socketCreationRetries 
 
1028
                                  + " retries", savedException);
 
1029
                        continue;
 
1030
                     }
 
1031
                     
 
1032
                     DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
 
1033
                     dos.write(Bisocket.CREATE_ORDINARY_SOCKET);
 
1034
                     dos.writeUTF(listenerId);
 
1035
                     break;
 
1036
 
 
1037
                  case Bisocket.PING:
 
1038
                     continue;
 
1039
 
 
1040
                  case -1:
 
1041
                     shutdown();
 
1042
                     return;
 
1043
 
 
1044
                  default:
 
1045
                     log.error("unrecognized action on ControlConnectionThread (" +
 
1046
                               listenerId + "): " +  action);
 
1047
                     continue;
 
1048
               }
 
1049
            }
 
1050
            catch (IOException e)
 
1051
            {
 
1052
               if (running)
 
1053
               {
 
1054
                  if ("Socket closed".equalsIgnoreCase(e.getMessage()) ||
 
1055
                      "Socket is closed".equalsIgnoreCase(e.getMessage()) ||
 
1056
                      "Connection reset".equalsIgnoreCase(e.getMessage()))
 
1057
                  {
 
1058
                     shutdown();
 
1059
                     return;
 
1060
                  }
 
1061
                  log.error("Unable to process control connection: " + e.getMessage(), e);
 
1062
                  if (++errorCount > 5)
 
1063
                  {
 
1064
                     shutdown();
 
1065
                     return;
 
1066
                  }
 
1067
                  continue;
 
1068
               }
 
1069
 
 
1070
               return;
 
1071
            }
 
1072
 
 
1073
            if (!running)
 
1074
            {
 
1075
               return;
 
1076
            }
 
1077
            
 
1078
            try
 
1079
            {
 
1080
               processInvocation(socket);
 
1081
            }
 
1082
            catch (Exception e)
 
1083
            {
 
1084
               log.error("Unable to create new ServerThread: " + e.getMessage(), e);
 
1085
            }
 
1086
         }
 
1087
      }
 
1088
   }
 
1089
 
 
1090
 
 
1091
   class SecondaryServerSocketThread extends Thread
 
1092
   {
 
1093
      private ServerSocket secondaryServerSocket;
 
1094
      boolean running = true;
 
1095
 
 
1096
      SecondaryServerSocketThread(ServerSocket secondaryServerSocket) throws IOException
 
1097
      {
 
1098
         this.secondaryServerSocket = secondaryServerSocket;
 
1099
      }
 
1100
 
 
1101
      void shutdown()
 
1102
      {
 
1103
         running = false;
 
1104
         interrupt();
 
1105
      }
 
1106
 
 
1107
      public void run()
 
1108
      {
 
1109
         while (running)
 
1110
         {
 
1111
            try
 
1112
            {
 
1113
               Socket socket = null;
 
1114
               try
 
1115
               {
 
1116
                   socket = (Socket)AccessController.doPrivileged( new PrivilegedExceptionAction()
 
1117
                   {
 
1118
                      public Object run() throws Exception
 
1119
                      {
 
1120
                          return secondaryServerSocket.accept();
 
1121
                      }
 
1122
                   });
 
1123
               }
 
1124
               catch (PrivilegedActionException e)
 
1125
               {
 
1126
                   throw (IOException) e.getCause();
 
1127
               }
 
1128
               
 
1129
               if (log.isTraceEnabled()) log.trace("accepted: " + socket);
 
1130
               DataInputStream dis = new DataInputStream(socket.getInputStream());
 
1131
               int action = dis.read();
 
1132
               String listenerId = dis.readUTF();
 
1133
 
 
1134
               switch (action)
 
1135
               {
 
1136
                  case Bisocket.CREATE_CONTROL_SOCKET:
 
1137
                     BisocketClientInvoker.transferSocket(listenerId, socket, true);
 
1138
                     if (log.isTraceEnabled()) 
 
1139
                        log.trace("SecondaryServerSocketThread: created control socket: (" + socket + ")"+ listenerId);
 
1140
                     break;
 
1141
                     
 
1142
                  case Bisocket.RECREATE_CONTROL_SOCKET:
 
1143
                     BisocketClientInvoker invoker =  BisocketClientInvoker.getBisocketCallbackClientInvoker(listenerId);
 
1144
                     if (invoker == null)
 
1145
                     {
 
1146
                        log.debug("received new control socket for unrecognized listenerId: " + listenerId);
 
1147
                     }
 
1148
                     else
 
1149
                     {
 
1150
                        invoker.replaceControlSocket(socket);
 
1151
                        if (log.isTraceEnabled())
 
1152
                           log.trace("SecondaryServerSocketThread: recreated control socket: " + listenerId);
 
1153
                     }
 
1154
                     break;
 
1155
 
 
1156
                  case Bisocket.CREATE_ORDINARY_SOCKET:
 
1157
                     BisocketClientInvoker.transferSocket(listenerId, socket, false);
 
1158
                     if (log.isTraceEnabled())
 
1159
                        log.trace("SecondaryServerSocketThread: transferred socket: " + listenerId);
 
1160
                     break;
 
1161
 
 
1162
                  default:
 
1163
                     log.error("unrecognized action on SecondaryServerSocketThread: " + action);
 
1164
               }
 
1165
            }
 
1166
            catch (IOException e)
 
1167
            {
 
1168
               if (running)
 
1169
                  log.error("Failed to accept socket connection", e);
 
1170
               else
 
1171
                  return;
 
1172
 
 
1173
            }
 
1174
         }
 
1175
      }
 
1176
 
 
1177
      ServerSocket getServerSocket()
 
1178
      {
 
1179
         return secondaryServerSocket;
 
1180
      }
 
1181
   }
 
1182
 
 
1183
 
 
1184
   static class ControlMonitorTimerTask extends TimerTask
 
1185
   {
 
1186
      private boolean running = true;
 
1187
      private BisocketServerInvoker invoker;
 
1188
      private Map listenerIdToInvokerLocatorMap;
 
1189
      private Map controlConnectionThreadMap;
 
1190
      private Map controlConnectionRestartsMap;
 
1191
      private int controlConnectionRestarts;
 
1192
      
 
1193
      ControlMonitorTimerTask(BisocketServerInvoker invoker)
 
1194
      {
 
1195
         this.invoker = invoker;
 
1196
         listenerIdToInvokerLocatorMap = invoker.listenerIdToInvokerLocatorMap;
 
1197
         controlConnectionThreadMap = invoker.controlConnectionThreadMap;
 
1198
         controlConnectionRestartsMap = invoker.controlConnectionRestartsMap;
 
1199
         controlConnectionRestarts = invoker.controlConnectionRestarts;
 
1200
      }
 
1201
 
 
1202
      synchronized void shutdown()
 
1203
      {
 
1204
         // Note that there is a race between shutdown() and run().  But if run()
 
1205
         // were synchronized, then shutdown() could be held up waiting on network
 
1206
         // i/o, including invocations on a server that no longer is accessible.
 
1207
         // So only minimal synchronization is imposed on run(), enough to avoid
 
1208
         // NullPointerExceptions.
 
1209
         
 
1210
         running = false;
 
1211
         invoker = null;
 
1212
         listenerIdToInvokerLocatorMap = null;
 
1213
         controlConnectionThreadMap = null;
 
1214
         cancel();
 
1215
 
 
1216
         try
 
1217
         {
 
1218
            Method purge = getDeclaredMethod(Timer.class, "purge", new Class[]{});
 
1219
            purge.invoke(timer, new Object[]{});
 
1220
         }
 
1221
         catch (Exception e)
 
1222
         {
 
1223
            log.debug("running with jdk 1.4: unable to purge Timer");
 
1224
         }
 
1225
      }
 
1226
 
 
1227
      public void run()
 
1228
      {
 
1229
         if (!running)
 
1230
            return;
 
1231
         
 
1232
         if (log.isTraceEnabled())
 
1233
            log.trace("checking connections");
 
1234
 
 
1235
         Collection controlConnectionThreads = null;
 
1236
         synchronized (this)
 
1237
         {
 
1238
            if (!running)
 
1239
               return;
 
1240
            
 
1241
            controlConnectionThreads = new HashSet(controlConnectionThreadMap.values());
 
1242
         }
 
1243
         
 
1244
         Iterator it = controlConnectionThreads.iterator();
 
1245
         while (it.hasNext())
 
1246
         {
 
1247
            final ControlConnectionThread t = (ControlConnectionThread) it.next();
 
1248
            final String listenerId = t.getListenerId();
 
1249
            final Object locator;
 
1250
            
 
1251
            synchronized (this)
 
1252
            {
 
1253
               if (!running)
 
1254
                  return;
 
1255
                  
 
1256
               locator = listenerIdToInvokerLocatorMap.get(listenerId);
 
1257
            }
 
1258
            
 
1259
            if (!t.checkConnection())
 
1260
            {
 
1261
               t.shutdown();
 
1262
               
 
1263
               synchronized (this)
 
1264
               {
 
1265
                  if (!running)
 
1266
                     return;
 
1267
                  
 
1268
                  controlConnectionThreadMap.remove(listenerId);
 
1269
                  Object o = controlConnectionRestartsMap.get(listenerId);
 
1270
                  int restarts = ((Integer)o).intValue();
 
1271
                  
 
1272
                  if (restarts + 1 > controlConnectionRestarts)
 
1273
                  {
 
1274
                     log.warn(this + ": detected failure on control connection " + t);
 
1275
                     log.warn("Control connection " + listenerId + " has been recreated " + restarts + " times.");
 
1276
                     log.warn("Assuming it is a connection to an old server, and will not restart");
 
1277
                     controlConnectionRestartsMap.remove(listenerId);
 
1278
                     continue;
 
1279
                  }
 
1280
                  
 
1281
                  log.warn(this + ": detected failure on control connection " + t + 
 
1282
                                  " (" + listenerId + 
 
1283
                                  ": requesting new control connection");
 
1284
               }
 
1285
               
 
1286
               Thread t2 = new Thread()
 
1287
               {
 
1288
                  public void run()
 
1289
                  {
 
1290
                     if (!running)
 
1291
                        return;
 
1292
                     
 
1293
                     try
 
1294
                     {
 
1295
                        invoker.createControlConnection(listenerId, false);
 
1296
                     }
 
1297
                     catch (ClientUnavailableException e)
 
1298
                     {
 
1299
                        log.debug("Unable to recreate control connection: " + locator, e);
 
1300
                     }
 
1301
                     catch (IOException e)
 
1302
                     {
 
1303
                        if (running)
 
1304
                           log.error("Unable to recreate control connection: " + locator, e);
 
1305
                        else
 
1306
                           log.debug("Unable to recreate control connection: " + locator, e);
 
1307
                     }
 
1308
                  }
 
1309
               };
 
1310
               t2.setName("controlConnectionRecreate:" + t.getName());
 
1311
               t2.start();
 
1312
            }
 
1313
         }
 
1314
      }
 
1315
   }
 
1316
   
 
1317
   static class ClientUnavailableException extends IOException
 
1318
   {
 
1319
      private static final long serialVersionUID = 2846502029152028732L;
 
1320
   }
 
1321
   
 
1322
   static private Method getDeclaredMethod(final Class c, final String name, final Class[] parameterTypes)
 
1323
   throws NoSuchMethodException
 
1324
   {
 
1325
      if (SecurityUtility.skipAccessControl())
 
1326
      {
 
1327
         Method m = c.getDeclaredMethod(name, parameterTypes);
 
1328
         m.setAccessible(true);
 
1329
         return m;
 
1330
      }
 
1331
 
 
1332
      try
 
1333
      {
 
1334
         return (Method) AccessController.doPrivileged( new PrivilegedExceptionAction()
 
1335
         {
 
1336
            public Object run() throws NoSuchMethodException
 
1337
            {
 
1338
               Method m = c.getDeclaredMethod(name, parameterTypes);
 
1339
               m.setAccessible(true);
 
1340
               return m;
 
1341
            }
 
1342
         });
 
1343
      }
 
1344
      catch (PrivilegedActionException e)
 
1345
      {
 
1346
         throw (NoSuchMethodException) e.getCause();
 
1347
      }
 
1348
   }
 
1349
}
 
 
b'\\ No newline at end of file'