~ubuntu-branches/ubuntu/oneiric/libjboss-remoting-java/oneiric

« back to all changes in this revision

Viewing changes to src/org/jboss/remoting/transport/multiplex/MultiplexingManager.java

  • Committer: Package Import Robot
  • Author(s): Torsten Werner
  • Date: 2011-09-09 14:01:03 UTC
  • mfrom: (1.1.6 upstream)
  • Revision ID: package-import@ubuntu.com-20110909140103-hqokx61534tas9rg
Tags: 2.5.3.SP1-1
* Newer but not newest upstream release. Do not build samples.
* Change debian/watch to upstream's svn repo.
* Add patch to fix compile error caused by tomcat update.
  (Closes: #628303)
* Switch to source format 3.0.
* Switch to debhelper level 7.
* Remove useless Depends.
* Update Standards-Version: 3.9.2.
* Update README.source.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/*
2
 
* JBoss, Home of Professional Open Source
3
 
* Copyright 2005, JBoss Inc., and individual contributors as indicated
4
 
* by the @authors tag. See the copyright.txt in the distribution for a
5
 
* full listing of individual contributors.
6
 
*
7
 
* This is free software; you can redistribute it and/or modify it
8
 
* under the terms of the GNU Lesser General Public License as
9
 
* published by the Free Software Foundation; either version 2.1 of
10
 
* the License, or (at your option) any later version.
11
 
*
12
 
* This software is distributed in the hope that it will be useful,
13
 
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14
 
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15
 
* Lesser General Public License for more details.
16
 
*
17
 
* You should have received a copy of the GNU Lesser General Public
18
 
* License along with this software; if not, write to the Free
19
 
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20
 
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21
 
*/
22
 
 
23
 
package org.jboss.remoting.transport.multiplex;
24
 
 
25
 
import org.jboss.logging.Logger;
26
 
import org.jboss.remoting.transport.multiplex.InputMultiplexor.MultiGroupInputThread;
27
 
import org.jboss.remoting.transport.multiplex.utility.GrowablePipedOutputStream;
28
 
import org.jboss.remoting.transport.multiplex.utility.StoppableThread;
29
 
import org.jboss.remoting.transport.multiplex.utility.VirtualSelector;
30
 
 
31
 
import javax.net.SocketFactory;
32
 
import javax.net.ssl.HandshakeCompletedEvent;
33
 
import javax.net.ssl.HandshakeCompletedListener;
34
 
import javax.net.ssl.SSLSocket;
35
 
import java.io.ByteArrayOutputStream;
36
 
import java.io.IOException;
37
 
import java.io.InputStream;
38
 
import java.io.OutputStream;
39
 
import java.net.InetSocketAddress;
40
 
import java.net.ServerSocket;
41
 
import java.net.Socket;
42
 
import java.net.SocketTimeoutException;
43
 
import java.nio.channels.Channels;
44
 
import java.nio.channels.SocketChannel;
45
 
import java.util.ArrayList;
46
 
import java.util.Collection;
47
 
import java.util.Collections;
48
 
import java.util.Date;
49
 
import java.util.HashMap;
50
 
import java.util.HashSet;
51
 
import java.util.Iterator;
52
 
import java.util.List;
53
 
import java.util.Map;
54
 
import java.util.Set;
55
 
import java.util.Timer;
56
 
import java.util.TimerTask;
57
 
 
58
 
 
59
 
/**
60
 
 * <code>MultiplexingManager</code> is the heart of the Multiplex system.  It is the implementation
61
 
 * of the virtual socket group concept.  See the Multiplex documentation on the
62
 
 * labs.jboss.org website for more information about virtual socket groups.
63
 
 * <p>
64
 
 * <code>MultiplexingManager</code> wraps a single real <code>java.net.Socket</code>.
65
 
 * It creates the socket when it is running on the client side, and it is passed a
66
 
 * socket by <code>MasterServerSocket</code> when it is running on the server side.
67
 
 * <p>
68
 
 * <code>MultiplexingManager</code> creates the infrastructure
69
 
 * which supports multiplexing, including an <code>OutputMultiplexor</code> output thread and one
70
 
 * or more <code>InputMultiplexor</code> input threads.  When the last member leaves the socket
71
 
 * group, a <code>MultiplexingManager</code> is responsible for negotiating with its remote peer
72
 
 * for permission to shut down, and for tearing down the multiplexing infrastructure when
73
 
 * the negotiations succeed.
74
 
 * <p>
75
 
 * <code>MultiplexingManager</code> also provides the mechanism by which a virtual socket joins
76
 
 * a virtual socket group, identifying the new socket to the <code>InputMultiplexor</code> input
77
 
 * thread.
78
 
 *
79
 
 * <p>
80
 
 * Copyright (c) 2005
81
 
 * <p>
82
 
 * @author <a href="mailto:r.sigal@computer.org">Ron Sigal</a>
83
 
 * 
84
 
 * @deprecated As of release 2.4.0 the multiplex transport will no longer be actively supported.
85
 
 */
86
 
public class MultiplexingManager
87
 
implements OutputMultiplexor.OutputMultiplexorClient, HandshakeCompletedListener
88
 
{
89
 
   private static final Logger log = Logger.getLogger(MultiplexingManager.class);
90
 
 
91
 
 
92
 
   /** Determines how often to check that no MultiplexingManagers have been running,
93
 
    *  in which case the static threads can be shut down. */
94
 
   private static int staticThreadsMonitorPeriod;
95
 
 
96
 
   /** True if and only if the MultiplexingManager static threads are running */
97
 
   private static boolean staticThreadsRunning;
98
 
 
99
 
   /** This object is used to synchronized operations on the managersByRemoteAddress map. */
100
 
   private static Object shareableMapLock = new Object();
101
 
 
102
 
   /** A HashMap<InetSocketAddress, HasnSet<MultiplexingManager>> of sets of MultiplexingManager's
103
 
    *  indexed by remote address.  Holds all MultiplexingManagers whose peer has an attached
104
 
    *  VirtualServerSocket. */
105
 
   private static Map shareableManagers = new HashMap();
106
 
 
107
 
   /** This object is used to synchronize operations on the managersByLocalAddress map. */
108
 
   private static Object localAddressMapLock = new Object();
109
 
 
110
 
   /** A HashMap<InetSocketAddress, HasnSet<MultiplexingManager>> of sets of MultiplexingManager's
111
 
    *  indexed by local address */
112
 
   private static Map managersByLocalAddress = new HashMap();
113
 
 
114
 
   /** This object is used to synchronized operations on the managersByRemoteAddress map. */
115
 
   private static Object remoteAddressMapLock = new Object();
116
 
 
117
 
   /** A HashMap<InetSocketAddress, HasnSet<MultiplexingManager>> of sets of MultiplexingManager's
118
 
    *  indexed by remote address */
119
 
//   private static Map managersByRemoteAddress = Collections.synchronizedMap(new HashMap());
120
 
   private static Map managersByRemoteAddress = new HashMap();
121
 
 
122
 
   /** Set of all MultiplexingManagers */
123
 
   private static Set allManagers = Collections.synchronizedSet(new HashSet());
124
 
 
125
 
   /** InputMultiplexor in this JVM */
126
 
   private static InputMultiplexor inputMultiplexor;
127
 
 
128
 
   /** OutputMultiplexor in this JVM */
129
 
   private static OutputMultiplexor outputMultiplexor;
130
 
 
131
 
   /** Thread for writing to socket's OutputStream */
132
 
   private static OutputMultiplexor.OutputThread outputThread;
133
 
 
134
 
   /** MultiGroupInputThread reading from socket's InputStream and distributing to virtual sockets
135
 
    *  Processes all NIO sockets */
136
 
   private static MultiGroupInputThread multiGroupInputThread;
137
 
 
138
 
   /** MultiplexingInputStreams register with virtualSelector when they have bytes to read */
139
 
   private static VirtualSelector virtualSelector;
140
 
 
141
 
   /** Thread for getting asynchronous messages from remote Protocol */
142
 
   private static Protocol.BackChannelThread backChannelThread;
143
 
 
144
 
   /** Holds PendingActions waiting to be performed */
145
 
   private static List pendingActions = new ArrayList();
146
 
 
147
 
   /** Removes virtual sockets from closingSockets and closes them. */
148
 
   private static PendingActionThread pendingActionThread;
149
 
 
150
 
   /** Thread for stashing potentially long activities, as well as periodic activities */
151
 
   private static Timer timer;
152
 
 
153
 
   /** Used to determine when to shut down static threads */
154
 
   private static boolean hasBeenIdle;
155
 
 
156
 
   /** Used to distinguish the static threads in this jvm */
157
 
   private final static short time = (short) System.currentTimeMillis();
158
 
 
159
 
   /** If shutdown request is not answered within this time period, assume a problem
160
 
    *  snd shut down. */
161
 
   private int shutdownRequestTimeout;
162
 
 
163
 
   /** Determines how often ShutdownMonitorTimerTask should check for response
164
 
    *  to ShutdownRequestThread. */
165
 
   private int shutdownMonitorPeriod;
166
 
 
167
 
   /** If the peer MultiplexingManager has refused to shutdown this many times,
168
 
    *  shut down anyway. */
169
 
   private int shutdownRefusalsMaximum;
170
 
 
171
 
   /** Holds configuration parameters */
172
 
   private static Map configuration = new HashMap();
173
 
 
174
 
   /** A HashMap<SocketId, VirtualSocket> of VirtualSocket's indexed by local SocketId */
175
 
   private Map socketMap = Collections.synchronizedMap(new HashMap());
176
 
 
177
 
   /** A HashSet of SocketId's registered on this MultiplexingManager */
178
 
   private Set registeredSockets = Collections.synchronizedSet(new HashSet());
179
 
 
180
 
   /** A HashMap<Long, OutputStream> of piped OutputStreams associated with InputStreams  */
181
 
   private Map outputStreamMap = Collections.synchronizedMap(new HashMap());
182
 
 
183
 
   /** A HashMap<Long, InputStream> of InputStreams associated with virtual sockets  */
184
 
   private Map inputStreamMap = Collections.synchronizedMap(new HashMap());
185
 
 
186
 
   /** Holds OutputStreams associated with virtual sockets */
187
 
   private Set outputStreamSet = Collections.synchronizedSet(new HashSet());
188
 
 
189
 
   /** Protocol back channel OutputStream */
190
 
   private OutputStream backchannelOutputStream;
191
 
 
192
 
   /** Threads waiting to be notified of registration of remote server socket */
193
 
   private Set threadsWaitingForRemoteServerSocket = new HashSet();
194
 
 
195
 
   /** Protocol object for handling connection/disconnection communications */
196
 
   private Protocol protocol;
197
 
 
198
 
   /** Actual local socket upon which this family of virtual sockets is based */
199
 
   private Socket socket;
200
 
 
201
 
   /** Returned by toString() and used in log messages */
202
 
   String description;
203
 
 
204
 
   /** bound state of actual local socket */
205
 
   private boolean bound = false;
206
 
 
207
 
   /** connected state of actual local socket */
208
 
   private boolean connected = false;
209
 
 
210
 
   /** SocketAddress of remote actual socket to which this Manager's socket is connected */
211
 
   private InetSocketAddress remoteSocketAddress;
212
 
 
213
 
   /** SocketAddress to which this manager's actual socket is bound */
214
 
   private InetSocketAddress localSocketAddress;
215
 
 
216
 
   /** Represents local port on which this manager's actual socket is bound, with any local address */
217
 
   private InetSocketAddress localWildCardAddress;
218
 
 
219
 
   /** InputStream of real socket */
220
 
   private InputStream inputStream;
221
 
 
222
 
   /** OutputStream of real socket */
223
 
   private OutputStream outputStream;
224
 
 
225
 
   /** Currently registered server socket */
226
 
   private ServerSocket serverSocket;
227
 
 
228
 
   /** Indicates if remote server socket has been registered */
229
 
   private boolean remoteServerSocketRegistered = false;
230
 
 
231
 
   /** True if and only if this MultiplexingManager was originally created by a
232
 
    *  call to MasterServerSocket.acceptServerSocketConnection(). */
233
 
   private boolean createdForRemoteServerSocket;
234
 
 
235
 
   /** SingleGroupInputThread reading from socket's InputStream and distributing to virtual sockets */
236
 
   private InputMultiplexor.SingleGroupInputThread inputThread;
237
 
 
238
 
   /** OutputStream for unknown virtual sockets */
239
 
   private OutputStream deadLetterOutputStream = new ByteArrayOutputStream();
240
 
 
241
 
   /** Manages the shutdown handshaking protocol between peer MultiplexingManagers */
242
 
   private ShutdownManager shutdownManager = new ShutdownManager();
243
 
 
244
 
   /** Thread that carries out shut down process */
245
 
   private ShutdownThread shutdownThread;
246
 
 
247
 
   /** true if and only MultiplexingManager is completely shut down */
248
 
   private boolean shutdown = false;
249
 
 
250
 
   /** Indicates if log trace level is enabled */
251
 
   private boolean trace;
252
 
 
253
 
   /** Indicates if log debug level is enabled */
254
 
   private boolean debug;
255
 
 
256
 
   /** Indicates if log info level is enabled */
257
 
   private boolean info;
258
 
 
259
 
   private long id;
260
 
 
261
 
   /** SocketFactory to use for creating sockets */
262
 
   private SocketFactory socketFactory;
263
 
 
264
 
   /** Saves HandshakeCompletedEvent for SSLSocket */
265
 
   private HandshakeCompletedEvent handshakeCompletedEvent;
266
 
 
267
 
   /** Holds IOException thrown by real InputStream */
268
 
   private IOException readException;
269
 
 
270
 
   /** Holds IOException thrown by real OutputStream */
271
 
   private IOException writeException;
272
 
 
273
 
 
274
 
   protected synchronized static void init(Map configuration) throws IOException
275
 
   {
276
 
      try
277
 
      {
278
 
         if (staticThreadsRunning)
279
 
            return;
280
 
 
281
 
         log.debug("starting static threads");
282
 
 
283
 
         // Start output thread.
284
 
         outputMultiplexor = new OutputMultiplexor(configuration);
285
 
         outputThread = outputMultiplexor.getAnOutputThread();
286
 
         outputThread.setName("output:" + time);
287
 
         outputThread.setDaemon(true);
288
 
         outputThread.start();
289
 
         log.debug("started output thread");
290
 
 
291
 
         // Start input thread.
292
 
         inputMultiplexor = new InputMultiplexor(configuration);
293
 
         multiGroupInputThread = inputMultiplexor.getaMultiGroupInputThread();
294
 
         multiGroupInputThread.setName("input:" + time);
295
 
         multiGroupInputThread.setDaemon(true);
296
 
         multiGroupInputThread.start();
297
 
         log.debug("started input thread");
298
 
 
299
 
         // Start back channel thread.
300
 
         virtualSelector = new VirtualSelector();
301
 
         backChannelThread = Protocol.getBackChannelThread(virtualSelector);
302
 
         backChannelThread.setName("backchannel:" + time);
303
 
         backChannelThread.setDaemon(true);
304
 
         backChannelThread.start();
305
 
         log.debug("started backchannel thread");
306
 
 
307
 
         // Start timer.
308
 
         timer = new Timer(true);
309
 
         TimerTask shutdownMonitorTask = new TimerTask()
310
 
         {
311
 
            public void run()
312
 
            {
313
 
               log.trace("allManagers.isEmpty(): " + allManagers.isEmpty());
314
 
               log.trace("hasBeenIdle: " + hasBeenIdle);
315
 
               if (allManagers.isEmpty())
316
 
               {
317
 
                  if (hasBeenIdle)
318
 
                  {
319
 
                     MultiplexingManager.shutdownThreads();
320
 
                     this.cancel();
321
 
                  }
322
 
                  else
323
 
                     hasBeenIdle = true;
324
 
               }
325
 
               else
326
 
               {
327
 
                  hasBeenIdle = false;
328
 
               }
329
 
            }
330
 
 
331
 
         };
332
 
         timer.scheduleAtFixedRate(shutdownMonitorTask, staticThreadsMonitorPeriod, staticThreadsMonitorPeriod);
333
 
 
334
 
         // Start pending actions thread.
335
 
         pendingActionThread = new PendingActionThread();
336
 
         pendingActionThread.setName("pending actions:" + time);
337
 
         pendingActionThread.setDaemon(true);
338
 
         pendingActionThread.start();
339
 
         log.debug("started pendingAction thread");
340
 
 
341
 
         staticThreadsRunning = true;
342
 
      }
343
 
      catch (IOException e)
344
 
      {
345
 
         log.error(e);
346
 
         throw e;
347
 
      }
348
 
   }
349
 
 
350
 
 
351
 
   protected MultiplexingManager(Map configuration) throws IOException
352
 
   {
353
 
      if (configuration != null)
354
 
         MultiplexingManager.configuration.putAll(configuration);
355
 
      socketFactory = (SocketFactory) configuration.get(Multiplex.SOCKET_FACTORY);
356
 
      id = new Date().getTime();
357
 
      socket = createSocket();
358
 
      allManagers.add(this);
359
 
      if (debug) log.debug("new MultiplexingManager(" + id + "): " + description);
360
 
   }
361
 
 
362
 
 
363
 
/**
364
 
 *
365
 
 * @param socket
366
 
 * @param configuration
367
 
 * @throws IOException
368
 
 */
369
 
   protected MultiplexingManager(Socket socket, Map configuration) throws IOException
370
 
   {
371
 
      this.socket = socket;
372
 
      if (configuration != null)
373
 
         MultiplexingManager.configuration.putAll(configuration);
374
 
      id = new Date().getTime();
375
 
      setup();
376
 
      allManagers.add(this);
377
 
      if (debug) log.debug("new MultiplexingManager(" + id + "): " + description);
378
 
   }
379
 
 
380
 
 
381
 
/**
382
 
 *
383
 
 * @param address
384
 
 * @param timeout
385
 
 * @param configuration
386
 
 * @throws IOException
387
 
 */
388
 
   protected MultiplexingManager(InetSocketAddress address, int timeout, Map configuration)
389
 
   throws IOException
390
 
   {
391
 
      if (configuration != null)
392
 
         MultiplexingManager.configuration.putAll(configuration);
393
 
      socketFactory = (SocketFactory) configuration.get(Multiplex.SOCKET_FACTORY);
394
 
      id = new Date().getTime();
395
 
      socket = createSocket(address, timeout);
396
 
      setup();
397
 
      allManagers.add(this);
398
 
      if (debug) log.debug("new MultiplexingManager(" + id + "): " + description);
399
 
   }
400
 
 
401
 
 
402
 
/**
403
 
 *
404
 
 */
405
 
   protected synchronized void setup() throws IOException
406
 
   {
407
 
      description = socket.toString();
408
 
      trace = log.isTraceEnabled();
409
 
      debug = log.isDebugEnabled();
410
 
      info = log.isInfoEnabled();
411
 
 
412
 
      // Initialize MultiplexingManager parameters.
413
 
      initParameters(configuration);
414
 
 
415
 
      // Make sure static threads are running.
416
 
      synchronized (MultiplexingManager.class)
417
 
      {
418
 
         if (!staticThreadsRunning)
419
 
            init(configuration);
420
 
      }
421
 
 
422
 
      // Get InputStream and OutputStream
423
 
      if (socket.getChannel() == null)
424
 
      {
425
 
//         inputStream = new BufferedInputStream(socket.getInputStream());
426
 
//         outputStream = new BufferedOutputStream(socket.getOutputStream());
427
 
         inputStream = socket.getInputStream();
428
 
         outputStream = socket.getOutputStream();
429
 
      }
430
 
      else
431
 
      {
432
 
         inputStream = Channels.newInputStream(socket.getChannel());
433
 
         outputStream = Channels.newOutputStream(socket.getChannel());
434
 
         socket.setTcpNoDelay(false);
435
 
      }
436
 
 
437
 
      // register dead letter output stream (for unrecognized destinations)
438
 
      outputStreamMap.put(SocketId.DEADLETTER_SOCKET_ID, deadLetterOutputStream);
439
 
 
440
 
      // TODO: what was this for???
441
 
      registeredSockets.add(SocketId.PROTOCOL_SOCKET_ID);
442
 
      registeredSockets.add(SocketId.SERVER_SOCKET_ID);
443
 
      registeredSockets.add(SocketId.SERVER_SOCKET_CONNECT_ID);
444
 
      registeredSockets.add(SocketId.SERVER_SOCKET_VERIFY_ID);
445
 
      registeredSockets.add(SocketId.BACKCHANNEL_SOCKET_ID);
446
 
 
447
 
      // set up standard piped streams
448
 
      getAnInputStream(SocketId.PROTOCOL_SOCKET_ID, null);
449
 
      getAnInputStream(SocketId.SERVER_SOCKET_ID, null);
450
 
      getAnInputStream(SocketId.SERVER_SOCKET_CONNECT_ID, null);
451
 
      getAnInputStream(SocketId.SERVER_SOCKET_VERIFY_ID, null);
452
 
 
453
 
      // Create protocol backchannel streams
454
 
      protocol = new Protocol(this);
455
 
      MultiplexingInputStream bcis = getAnInputStream(SocketId.BACKCHANNEL_SOCKET_ID, null);
456
 
      bcis.register(virtualSelector, this);
457
 
      if (debug) log.debug("registered backchannel input stream");
458
 
      backchannelOutputStream = new MultiplexingOutputStream(this, SocketId.PROTOCOL_SOCKET_ID);
459
 
 
460
 
      // Register with OutputMultiplexor
461
 
      outputMultiplexor.register(this);
462
 
 
463
 
      // Create or register with input thread
464
 
      if (socket.getChannel() == null)
465
 
      {
466
 
         // start input thread
467
 
         log.debug("creating single group input thread");
468
 
 
469
 
         if (inputMultiplexor == null)
470
 
            inputMultiplexor = new InputMultiplexor(configuration);
471
 
 
472
 
         inputThread = inputMultiplexor.getaSingleGroupInputThread(this, socket, deadLetterOutputStream);
473
 
         inputThread.setName(inputThread.getName() + ":input(" + description + ")");
474
 
         inputThread.start();
475
 
      }
476
 
      else
477
 
      {
478
 
         socket.getChannel().configureBlocking(false);
479
 
         multiGroupInputThread.registerSocketGroup(this);
480
 
         log.debug("registered socket group");
481
 
      }
482
 
 
483
 
      registerByLocalAddress(new InetSocketAddress(socket.getLocalAddress(), socket.getLocalPort()));
484
 
      registerByRemoteAddress(new InetSocketAddress(socket.getInetAddress(), socket.getPort()));
485
 
      bound = true;
486
 
      connected = true;
487
 
 
488
 
      if (socket instanceof SSLSocket)
489
 
      {
490
 
//         Object o = configuration.get(Multiplex.SSL_HANDSHAKE_LISTENER);
491
 
//         if (o != null)
492
 
//         {
493
 
//            HandshakeCompletedListener hcl = (HandshakeCompletedListener) o;
494
 
//            ((SSLSocket) socket).addHandshakeCompletedListener(hcl);
495
 
//         }
496
 
         ((SSLSocket) socket).addHandshakeCompletedListener(this);
497
 
      }
498
 
   }
499
 
 
500
 
 
501
 
   protected void initParameters(Map configuration)
502
 
   {
503
 
      this.configuration = configuration;
504
 
 
505
 
      staticThreadsMonitorPeriod
506
 
         = Multiplex.getOneParameter(configuration,
507
 
                                     "staticThreadsMonitorPeriod",
508
 
                                      Multiplex.STATIC_THREADS_MONITOR_PERIOD,
509
 
                                      Multiplex.STATIC_THREADS_MONITOR_PERIOD_DEFAULT);
510
 
 
511
 
      shutdownRequestTimeout
512
 
         = Multiplex.getOneParameter(configuration,
513
 
                                     "shutdownRequestTimeout",
514
 
                                     Multiplex.SHUTDOWN_REQUEST_TIMEOUT,
515
 
                                     Multiplex.SHUTDOWN_REQUEST_TIMEOUT_DEFAULT);
516
 
 
517
 
      shutdownRefusalsMaximum
518
 
         = Multiplex.getOneParameter(configuration,
519
 
                                     "shutdownRefusalsMaximum",
520
 
                                     Multiplex.SHUTDOWN_REFUSALS_MAXIMUM,
521
 
                                     Multiplex.SHUTDOWN_REFUSALS_MAXIMUM_DEFAULT);
522
 
 
523
 
      shutdownMonitorPeriod
524
 
         = Multiplex.getOneParameter(configuration,
525
 
                                     "shutdownMonitorPeriod",
526
 
                                     Multiplex.SHUTDOWN_MONITOR_PERIOD,
527
 
                                     Multiplex.SHUTDOWN_MONITOR_PERIOD_DEFAULT);
528
 
   }
529
 
 
530
 
 
531
 
/**
532
 
 *
533
 
 * @param socket
534
 
 * @param configuration
535
 
 * @return
536
 
 * @throws IOException
537
 
 *
538
 
 * TODO: what if multiplexor already exists?
539
 
 */
540
 
   public static MultiplexingManager getaManager(Socket socket, Map configuration) throws IOException
541
 
   {
542
 
      log.debug("entering getaManager(Socket socket)");
543
 
      return new MultiplexingManager(socket, configuration);
544
 
   }
545
 
 
546
 
 
547
 
/**
548
 
 * @param address
549
 
 * @return
550
 
 * @throws IOException
551
 
 */
552
 
   public static synchronized MultiplexingManager
553
 
   getaManagerByLocalAddress(InetSocketAddress address) throws IOException
554
 
   {
555
 
      return getaManagerByLocalAddress(address, null);
556
 
   }
557
 
 
558
 
 
559
 
/**
560
 
 *
561
 
 * @param address
562
 
 * @param conf
563
 
 * @return
564
 
 * @throws IOException
565
 
 */
566
 
   public static synchronized MultiplexingManager
567
 
   getaManagerByLocalAddress(InetSocketAddress address, Map conf) throws IOException
568
 
   {
569
 
      log.debug("entering getaManagerByLocalAddress(InetSocketAddress address)");
570
 
      MultiplexingManager m = null;
571
 
 
572
 
      synchronized (localAddressMapLock)
573
 
      {
574
 
         HashSet managers = (HashSet) managersByLocalAddress.get(address);
575
 
 
576
 
         if (managers != null)
577
 
         {
578
 
            Iterator it = managers.iterator();
579
 
            while (it.hasNext())
580
 
            {
581
 
               m = (MultiplexingManager) it.next();
582
 
               try
583
 
               {
584
 
                  m.shutdownManager.incrementReferences();
585
 
                  return m;
586
 
               }
587
 
               catch (IOException e)
588
 
               {
589
 
               }
590
 
            }
591
 
         }
592
 
      }
593
 
 
594
 
      log.debug("There is no joinable MultiplexingManager. Creating new one.");
595
 
      m = new MultiplexingManager(conf);
596
 
      m.bind(address);
597
 
      return m;
598
 
   }
599
 
 
600
 
 
601
 
/**
602
 
 *
603
 
 * @param address
604
 
 * @param timeout
605
 
 * @return
606
 
 * @throws IOException
607
 
 */
608
 
   public static synchronized MultiplexingManager
609
 
   getaManagerByRemoteAddress(InetSocketAddress address, int timeout) throws IOException
610
 
   {
611
 
      return getaManagerByRemoteAddress(address, timeout, null);
612
 
   }
613
 
 
614
 
 
615
 
/**
616
 
 *
617
 
 * @param address
618
 
 * @param timeout
619
 
 * @param configuration
620
 
 * @return
621
 
 * @throws IOException
622
 
 */
623
 
   public static synchronized MultiplexingManager
624
 
   getaManagerByRemoteAddress(InetSocketAddress address, int timeout, Map conf)
625
 
   throws IOException
626
 
   {
627
 
      log.debug("entering getaManagerByRemoteAddress(InetSocketAddress address)");
628
 
 
629
 
//      if (isOnServer())
630
 
//      {
631
 
 
632
 
      // Check each of the MultiplexingManagers connected to the target remote address, looking
633
 
      // for one which is not shutting down.
634
 
      synchronized(remoteAddressMapLock)
635
 
      {
636
 
         HashSet managers = (HashSet) managersByRemoteAddress.get(address);
637
 
 
638
 
         if (managers != null && !managers.isEmpty())
639
 
         {
640
 
            Iterator it = managers.iterator();
641
 
            while (it.hasNext())
642
 
            {
643
 
               MultiplexingManager m = (MultiplexingManager) it.next();
644
 
               try
645
 
               {
646
 
                  m.shutdownManager.incrementReferences();
647
 
                  return m;
648
 
               }
649
 
               catch (Exception e)
650
 
               {
651
 
                  log.debug("manager shutting down: " + m);
652
 
               }
653
 
            }
654
 
         }
655
 
      }
656
 
 
657
 
      return new MultiplexingManager(address, timeout, conf);
658
 
   }
659
 
 
660
 
 
661
 
 
662
 
/**
663
 
 * @param remoteAddress
664
 
 * @param localAddress
665
 
 * @param timeout
666
 
 * @return
667
 
 * @throws IOException
668
 
 */
669
 
   public static synchronized MultiplexingManager
670
 
   getaManagerByAddressPair(InetSocketAddress remoteAddress, InetSocketAddress localAddress, int timeout)
671
 
   throws IOException
672
 
   {
673
 
      return getaManagerByAddressPair(remoteAddress, localAddress, timeout, null);
674
 
   }
675
 
 
676
 
 
677
 
/**
678
 
 * @param remoteAddress
679
 
 * @param localAddress
680
 
 * @param timeout
681
 
 * @param configuration
682
 
 * @return
683
 
 * @throws IOException
684
 
 */
685
 
   public static synchronized MultiplexingManager
686
 
   getaManagerByAddressPair(InetSocketAddress remoteAddress, InetSocketAddress localAddress,
687
 
                            int timeout, Map conf)
688
 
   throws IOException
689
 
   {
690
 
      log.debug("entering getaManagerByRemoteAddress(InetSocketAddress address)");
691
 
      MultiplexingManager m;
692
 
 
693
 
      // Check each of the MultiplexingManagers connected to the target remote address, looking
694
 
      // for one which is not shutting down.
695
 
      synchronized(remoteAddressMapLock)
696
 
      {
697
 
         HashSet managers = (HashSet) managersByRemoteAddress.get(remoteAddress);
698
 
 
699
 
         if (managers != null && !managers.isEmpty())
700
 
         {
701
 
            Iterator it = managers.iterator();
702
 
 
703
 
            while (it.hasNext())
704
 
            {
705
 
               m = (MultiplexingManager) it.next();
706
 
               if (m.getSocket().getLocalAddress().equals(localAddress.getAddress()) &&
707
 
                   m.getSocket().getLocalPort() == localAddress.getPort())
708
 
               {
709
 
                  try
710
 
                  {
711
 
                     m.shutdownManager.incrementReferences();
712
 
                     return m;
713
 
                  }
714
 
                  catch (Exception e)
715
 
                  {
716
 
                     log.debug("manager shutting down: " + m);
717
 
                  }
718
 
               }
719
 
            }
720
 
         }
721
 
      }
722
 
 
723
 
      log.debug("There is no joinable MultiplexingManager. Creating new one.");
724
 
      m = new MultiplexingManager(conf);
725
 
      m.bind(localAddress);
726
 
      return m;
727
 
   }
728
 
 
729
 
 
730
 
/**
731
 
 * @param address
732
 
 * @param timeout
733
 
 * @return
734
 
 * @throws IOException
735
 
 */
736
 
   public static synchronized MultiplexingManager
737
 
   getaShareableManager(InetSocketAddress address, int timeout) throws IOException
738
 
   {
739
 
      return getaShareableManager(address, timeout, null);
740
 
   }
741
 
 
742
 
 
743
 
/**
744
 
 * @param address
745
 
 * @param timeout
746
 
 * @param conf
747
 
 * @return
748
 
 * @throws IOException
749
 
 */
750
 
   public static synchronized MultiplexingManager
751
 
   getaShareableManager(InetSocketAddress address, int timeout, Map conf) throws IOException
752
 
   {
753
 
      log.debug("entering getaShareableManager(InetSocketAddress address)");
754
 
 
755
 
      // Check each of the shareable MultiplexingManagers connected to the target remote address, looking
756
 
      // for one which is not shutting down.
757
 
      synchronized(shareableMapLock)
758
 
      {
759
 
         HashSet managers = (HashSet) shareableManagers.get(address);
760
 
 
761
 
         if (managers != null && !managers.isEmpty())
762
 
         {
763
 
            Iterator it = managers.iterator();
764
 
            while (it.hasNext())
765
 
            {
766
 
               MultiplexingManager m = (MultiplexingManager) it.next();
767
 
               try
768
 
               {
769
 
                  m.shutdownManager.incrementReferences();
770
 
                  return m;
771
 
               }
772
 
               catch (Exception e)
773
 
               {
774
 
                  log.debug("manager shutting down: " + m);
775
 
               }
776
 
            }
777
 
         }
778
 
      }
779
 
 
780
 
      return new MultiplexingManager(address, timeout, conf);
781
 
   }
782
 
 
783
 
 
784
 
/**
785
 
 * @param address
786
 
 * @param timeout
787
 
 * @param conf
788
 
 * @return
789
 
 * @throws IOException
790
 
 */
791
 
   public static  MultiplexingManager
792
 
   getAnExistingShareableManager(InetSocketAddress address, Map conf)
793
 
   throws IOException
794
 
   {
795
 
      log.debug("entering getAnExistingShareableManager()");
796
 
 
797
 
      // Check each of the shareable MultiplexingManagers connected to the target remote address, looking
798
 
      // for one which is not shutting down.
799
 
      synchronized(shareableMapLock)
800
 
      {
801
 
         HashSet managers = (HashSet) shareableManagers.get(address);
802
 
 
803
 
         if (managers != null && !managers.isEmpty())
804
 
         {
805
 
            Iterator it = managers.iterator();
806
 
 
807
 
            while (it.hasNext())
808
 
            {
809
 
               MultiplexingManager m = (MultiplexingManager) it.next();
810
 
               try
811
 
               {
812
 
                  m.shutdownManager.incrementReferences();
813
 
                  return m;
814
 
               }
815
 
               catch (Exception e)
816
 
               {
817
 
                  log.debug("manager shutting down: " + m);
818
 
               }
819
 
            }
820
 
         }
821
 
      }
822
 
 
823
 
      return null;
824
 
   }
825
 
 
826
 
 
827
 
/**
828
 
 * @param remoteAddress
829
 
 * @param localAddress
830
 
 * @param timeout
831
 
 * @return
832
 
 * @throws IOException
833
 
 */
834
 
   public static synchronized MultiplexingManager
835
 
   getaShareableManagerByAddressPair(InetSocketAddress remoteAddress, InetSocketAddress localAddress, int timeout)
836
 
   throws IOException
837
 
   {
838
 
      return getaShareableManagerByAddressPair(remoteAddress, localAddress, timeout, null);
839
 
   }
840
 
 
841
 
 
842
 
/**
843
 
 * @param remoteAddress
844
 
 * @param localAddress
845
 
 * @param timeout
846
 
 * @param conf
847
 
 * @return
848
 
 * @throws IOException
849
 
 */
850
 
   public static synchronized MultiplexingManager
851
 
   getaShareableManagerByAddressPair(InetSocketAddress remoteAddress, InetSocketAddress localAddress,
852
 
                                     int timeout, Map conf)
853
 
   throws IOException
854
 
   {
855
 
      MultiplexingManager m;
856
 
 
857
 
      // Check each of the shareable MultiplexingManagers connected to the target remote address, looking
858
 
      // for one which is not shutting down.
859
 
      synchronized(shareableMapLock)
860
 
      {
861
 
         HashSet managers = (HashSet) shareableManagers.get(remoteAddress);
862
 
         if (managers != null && !managers.isEmpty())
863
 
         {
864
 
            Iterator it = managers.iterator();
865
 
 
866
 
            while (it.hasNext())
867
 
            {
868
 
               m = (MultiplexingManager) it.next();
869
 
               if (m.getSocket().getLocalAddress().equals(localAddress.getAddress()) &&
870
 
                   m.getSocket().getLocalPort() == localAddress.getPort())
871
 
               {
872
 
                  try
873
 
                  {
874
 
                     m.shutdownManager.incrementReferences();
875
 
                     return m;
876
 
                  }
877
 
                  catch (Exception e)
878
 
                  {
879
 
                     log.debug("manager shutting down: " + m);
880
 
                  }
881
 
               }
882
 
            }
883
 
         }
884
 
      }
885
 
 
886
 
      log.debug("There is no joinable MultiplexingManager. Creating new one.");
887
 
      m = new MultiplexingManager(conf);
888
 
      m.bind(localAddress);
889
 
      return m;
890
 
   }
891
 
 
892
 
 
893
 
/**
894
 
 * @param remoteAddress
895
 
 * @param localAddress
896
 
 * @param conf
897
 
 * @return
898
 
 * @throws IOException
899
 
 */
900
 
   public static MultiplexingManager
901
 
   getAnExistingShareableManagerByAddressPair(InetSocketAddress remoteAddress,
902
 
                                              InetSocketAddress localAddress,
903
 
                                              Map conf)
904
 
   throws IOException
905
 
   {
906
 
      log.debug("entering getaShareableManager(InetSocketAddress address)");
907
 
      MultiplexingManager m;
908
 
 
909
 
      // Check each of the shareable MultiplexingManagers connected to the target remote address, looking
910
 
      // for one which is not shutting down.
911
 
      synchronized(shareableMapLock)
912
 
      {
913
 
         HashSet managers = (HashSet) shareableManagers.get(remoteAddress);
914
 
 
915
 
         if (managers != null && !managers.isEmpty())
916
 
         {
917
 
            Iterator it = managers.iterator();
918
 
 
919
 
            while (it.hasNext())
920
 
            {
921
 
               m = (MultiplexingManager) it.next();
922
 
               if (m.getSocket().getLocalAddress().equals(localAddress.getAddress()) &&
923
 
                   m.getSocket().getLocalPort() == localAddress.getPort())
924
 
               {
925
 
                  try
926
 
                  {
927
 
                     m.shutdownManager.incrementReferences();
928
 
                     return m;
929
 
                  }
930
 
                  catch (Exception e)
931
 
                  {
932
 
                     log.debug("manager shutting down: " + m);
933
 
                  }
934
 
               }
935
 
            }
936
 
         }
937
 
      }
938
 
 
939
 
      return null;
940
 
   }
941
 
 
942
 
 
943
 
/**
944
 
 * @param address
945
 
 * @return
946
 
 * @throws IOException
947
 
 */
948
 
   public static boolean checkForShareableManager(InetSocketAddress address)
949
 
   throws IOException
950
 
   {
951
 
      log.debug("entering checkForShareableManager(InetSocketAddress address)");
952
 
 
953
 
      // Check each if there is at least one shareable MultiplexingManagers connected to the target remote address.
954
 
      synchronized (shareableMapLock)
955
 
      {
956
 
         HashSet managers = (HashSet) shareableManagers.get(address);
957
 
 
958
 
         if (managers != null && !managers.isEmpty())
959
 
            return true;
960
 
 
961
 
         return false;
962
 
      }
963
 
   }
964
 
 
965
 
 
966
 
/**
967
 
 * @param localAddress
968
 
 * @param remoteAddress
969
 
 * @return
970
 
 */
971
 
   public static boolean checkForManagerByAddressPair(InetSocketAddress localAddress,
972
 
                                                      InetSocketAddress remoteAddress)
973
 
   {
974
 
      log.debug("entering checkForManagerByAddressPair()");
975
 
 
976
 
      // Check each of the MultiplexingManagers connected to the target remote address, looking
977
 
      // for one bound to the local address.
978
 
      synchronized(remoteAddressMapLock)
979
 
      {
980
 
         HashSet managers = (HashSet) managersByRemoteAddress.get(remoteAddress);
981
 
 
982
 
         if (managers != null && !managers.isEmpty())
983
 
         {
984
 
            Iterator it = managers.iterator();
985
 
 
986
 
            while (it.hasNext())
987
 
            {
988
 
               MultiplexingManager m = (MultiplexingManager) it.next();
989
 
 
990
 
               if (m.localSocketAddress.equals(localAddress))
991
 
                  return true;
992
 
            }
993
 
         }
994
 
      }
995
 
 
996
 
      return false;
997
 
   }
998
 
 
999
 
 
1000
 
/**
1001
 
 * @param localAddress
1002
 
 * @param remoteAddress
1003
 
 * @return
1004
 
 */
1005
 
   public static boolean checkForShareableManagerByAddressPair(InetSocketAddress localAddress,
1006
 
                                                               InetSocketAddress remoteAddress)
1007
 
   {
1008
 
      log.debug("entering checkForShareableManagerByAddressPair()");
1009
 
 
1010
 
      // Check each of the shareable MultiplexingManagers connected to the target remote address, looking
1011
 
      // for one bound to the local address.
1012
 
      synchronized (shareableMapLock)
1013
 
      {
1014
 
         HashSet managers = (HashSet) shareableManagers.get(remoteAddress);
1015
 
 
1016
 
         if (managers != null && !managers.isEmpty())
1017
 
         {
1018
 
            Iterator it = managers.iterator();
1019
 
 
1020
 
            while (it.hasNext())
1021
 
            {
1022
 
               MultiplexingManager m = (MultiplexingManager) it.next();
1023
 
 
1024
 
               if (m.localSocketAddress.equals(localAddress))
1025
 
                  return true;
1026
 
            }
1027
 
         }
1028
 
      }
1029
 
 
1030
 
      return false;
1031
 
   }
1032
 
 
1033
 
 
1034
 
/**
1035
 
 * @return
1036
 
 */
1037
 
   public static int getStaticThreadMonitorPeriod()
1038
 
   {
1039
 
      return staticThreadsMonitorPeriod;
1040
 
   }
1041
 
 
1042
 
 
1043
 
/**
1044
 
 * @param period
1045
 
 */
1046
 
   public static void setStaticThreadsMonitorPeriod(int period)
1047
 
   {
1048
 
      staticThreadsMonitorPeriod = period;
1049
 
   }
1050
 
 
1051
 
 
1052
 
/**
1053
 
 *
1054
 
 */
1055
 
   protected synchronized static void shutdownThreads()
1056
 
   {
1057
 
      log.debug("entering shutdownThreads");
1058
 
      if (outputThread != null)
1059
 
         outputThread.shutdown();
1060
 
 
1061
 
      if (multiGroupInputThread != null)
1062
 
         multiGroupInputThread.shutdown();
1063
 
 
1064
 
      if (backChannelThread != null)
1065
 
         backChannelThread.shutdown();
1066
 
 
1067
 
      if (pendingActionThread != null)
1068
 
         pendingActionThread.shutdown();
1069
 
 
1070
 
      log.debug("cancelling timer");
1071
 
      if (timer != null)
1072
 
         timer.cancel();
1073
 
 
1074
 
      while (true)
1075
 
      {
1076
 
         try
1077
 
         {
1078
 
            if (outputThread != null)
1079
 
               outputThread.join();
1080
 
 
1081
 
            if (multiGroupInputThread != null)
1082
 
               multiGroupInputThread.join();
1083
 
 
1084
 
            if (backChannelThread != null)
1085
 
               backChannelThread.join();
1086
 
 
1087
 
            if (pendingActionThread != null)
1088
 
               pendingActionThread.join();
1089
 
 
1090
 
            break;
1091
 
         }
1092
 
         catch (InterruptedException ignored) {}
1093
 
      }
1094
 
 
1095
 
      staticThreadsRunning = false;
1096
 
      log.debug("static threads shut down");
1097
 
   }
1098
 
 
1099
 
 
1100
 
/**
1101
 
 * Adds a <code>PendingAction</code> to the list of actions waiting to be executed.
1102
 
 * @param pendingAction
1103
 
 */
1104
 
   protected static void addToPendingActions(PendingAction pendingAction)
1105
 
   {
1106
 
      synchronized (pendingActions)
1107
 
      {
1108
 
         pendingActions.add(pendingAction);
1109
 
         pendingActions.notifyAll();
1110
 
      }
1111
 
   }
1112
 
 
1113
 
 
1114
 
/**
1115
 
 * Binds the wrapped socket.
1116
 
 * @param address
1117
 
 * @throws IOException
1118
 
 */
1119
 
   public synchronized void bind(InetSocketAddress address) throws IOException
1120
 
   {
1121
 
      if (bound)
1122
 
         throw new IOException("socket is already bound");
1123
 
 
1124
 
      if (socket == null)
1125
 
         socket = createSocket();
1126
 
 
1127
 
      if (socket == null)
1128
 
         localSocketAddress = address;
1129
 
      else
1130
 
         socket.bind(address);
1131
 
 
1132
 
      bound = true;
1133
 
   }
1134
 
 
1135
 
/**
1136
 
 * Connects the wrapped socket.
1137
 
 * @param address
1138
 
 * @param timeout
1139
 
 * @throws IOException
1140
 
 */
1141
 
   public synchronized void connect(InetSocketAddress address, int timeout) throws IOException
1142
 
   {
1143
 
      if (connected)
1144
 
      {
1145
 
         if (socket.getRemoteSocketAddress().equals(address))
1146
 
            return;
1147
 
         else
1148
 
            throw new IOException("socket is already connected");
1149
 
      }
1150
 
 
1151
 
      if (debug) log.debug("connecting to: " + address);
1152
 
 
1153
 
      if (socket == null)
1154
 
         socket = createSocket(address, timeout);
1155
 
      else
1156
 
         socket.connect(address, timeout);
1157
 
 
1158
 
      connected = true;
1159
 
      setup();
1160
 
   }
1161
 
 
1162
 
 
1163
 
/**
1164
 
 * Identifies a <code>VirtualServerSocket</code> as the one associated with this virtual socket group.
1165
 
 * @return a <code>MultiplexingInputStream</code> for use by serverSocket.
1166
 
 */
1167
 
   public synchronized MultiplexingInputStream registerServerSocket(ServerSocket serverSocket) throws IOException
1168
 
   {
1169
 
      if (this.serverSocket != null && this.serverSocket != serverSocket)
1170
 
      {
1171
 
         log.error("[" + id + "]: " + "attempt to register a second server socket");
1172
 
         log.error("current server socket: " + this.serverSocket.toString());
1173
 
         log.error("new server socket:     " + serverSocket.toString());
1174
 
         throw new IOException("attempt to register a second server socket");
1175
 
      }
1176
 
 
1177
 
      if (debug) log.debug(serverSocket.toString());
1178
 
      this.serverSocket = serverSocket;
1179
 
      return getAnInputStream(SocketId.SERVER_SOCKET_ID, null);
1180
 
   }
1181
 
 
1182
 
 
1183
 
/**
1184
 
 * Indicates a <code>VirtualServerSocket</code> is leaving this virtual socket group.
1185
 
 * @param serverSocket
1186
 
 * @throws IOException
1187
 
 */
1188
 
   public synchronized void unRegisterServerSocket(ServerSocket serverSocket) throws IOException
1189
 
   {
1190
 
      if (this.serverSocket != serverSocket)
1191
 
      {
1192
 
         log.error("server socket attempting unregister but is not registered");
1193
 
         throw new IOException("server socket is not registered");
1194
 
      }
1195
 
 
1196
 
      log.debug("server socket unregistering");
1197
 
      removeAnInputStream(SocketId.SERVER_SOCKET_ID);
1198
 
      this.serverSocket = null;
1199
 
      shutdownManager.decrementReferences();
1200
 
   }
1201
 
 
1202
 
 
1203
 
/**
1204
 
 * Adds a <code>VirtualSocket</code> to this virtual socket group.
1205
 
 * @param socket
1206
 
 * @return a <code>MultiplexingInputStream</code> for use by socket
1207
 
 * @throws IOException
1208
 
 */
1209
 
   public synchronized MultiplexingInputStream registerSocket(VirtualSocket socket) throws IOException
1210
 
   {
1211
 
      SocketId localSocketId = socket.getLocalSocketId();
1212
 
      VirtualSocket currentSocket = (VirtualSocket) socketMap.put(localSocketId, socket);
1213
 
 
1214
 
      if (currentSocket != null)
1215
 
      {
1216
 
         String errorMessage = "attempting to register socket on currently used port:"
1217
 
                              + currentSocket.getLocalVirtualPort();
1218
 
         log.error(errorMessage);
1219
 
         throw new IOException(errorMessage);
1220
 
      }
1221
 
 
1222
 
      if (debug) log.debug("registering virtual socket on port: " + localSocketId.getPort());
1223
 
      registeredSockets.add(socket.getLocalSocketId());
1224
 
      return getAnInputStream(localSocketId, socket);
1225
 
   }
1226
 
 
1227
 
 
1228
 
/**
1229
 
 * Indicates that a <code>VirtualSocket</code> is leaving a virtual socket group.
1230
 
 * @param socket
1231
 
 * @throws IOException
1232
 
 */
1233
 
   public synchronized void unRegisterSocket(VirtualSocket socket) throws IOException
1234
 
   {
1235
 
      try
1236
 
      {
1237
 
         if (debug) log.debug(this + ": entering unRegisterSocket()");
1238
 
         shutdownManager.decrementReferences();
1239
 
 
1240
 
         SocketId localSocketId = socket.getLocalSocketId();
1241
 
         if (localSocketId == null)
1242
 
            return;
1243
 
 
1244
 
         VirtualSocket currentSocket = (VirtualSocket) socketMap.remove(localSocketId);
1245
 
         if (currentSocket == null)
1246
 
         {
1247
 
            String errorMessage = "attempting to unregister unrecognized socket: " + socket.getLocalSocketId().getPort();
1248
 
            log.error(errorMessage);
1249
 
            throw new IOException(errorMessage);
1250
 
         }
1251
 
 
1252
 
         if (debug) log.debug("unregistering virtual socket on port: " + localSocketId.getPort());
1253
 
         registeredSockets.remove(localSocketId);
1254
 
         removeAnInputStream(localSocketId);
1255
 
         if (debug) log.debug(this + ": leaving unRegisterSocket()");
1256
 
      }
1257
 
      finally
1258
 
      {
1259
 
         socket.close();
1260
 
      }
1261
 
   }
1262
 
 
1263
 
 
1264
 
 /**
1265
 
  * Indicates that a <code>VirtualServerSocket</code> belongs the virtual socket group at the
1266
 
  * remote end of the connection.  This virtual socket groupD becomes "joinable", in the
1267
 
  * sense that a new <code>VirtualSocket</code> <it>v</it> can join this virtual group because there is a
1268
 
  * <code>VirtualServerSocket</code> at the other end of the connection to create a remote peer for
1269
 
  * <code>v</code>.
1270
 
  */
1271
 
    public synchronized void registerRemoteServerSocket() throws IOException
1272
 
    {
1273
 
       log.debug("registerRemoteServerSocket()");
1274
 
       if (remoteServerSocketRegistered)
1275
 
       {
1276
 
          log.error("duplicate remote server socket registration");
1277
 
          throw new IOException("duplicate remote server socket registration");
1278
 
       }
1279
 
       else
1280
 
       {
1281
 
          remoteServerSocketRegistered = true;
1282
 
 
1283
 
          // Now that remote MultiplexingManager has a VirtualServerSocket, we
1284
 
          // add it to set of managers eligible to accept new clients.
1285
 
          registerShareable(remoteSocketAddress);
1286
 
 
1287
 
          synchronized (threadsWaitingForRemoteServerSocket)
1288
 
          {
1289
 
             threadsWaitingForRemoteServerSocket.notifyAll();
1290
 
          }
1291
 
 
1292
 
          if (!createdForRemoteServerSocket)
1293
 
             incrementReferences();
1294
 
       }
1295
 
    }
1296
 
 
1297
 
 
1298
 
/**
1299
 
 * Indicates there will no longer be a <code>VirtualServeSocket</code> at the remote end of
1300
 
 * this connection.  This virtual socket group will no longer be "joinable".
1301
 
 * (See registerRemoteServerSocket().)
1302
 
 */
1303
 
   public synchronized void unRegisterRemoteServerSocket()
1304
 
   {
1305
 
      if (!remoteServerSocketRegistered)
1306
 
         log.error("no remote server socket is registered");
1307
 
      else
1308
 
      {
1309
 
         if (debug) log.debug(this + ": remote VSS unregistering");
1310
 
         remoteServerSocketRegistered = false;
1311
 
         unregisterShareable();
1312
 
 
1313
 
         MultiplexingManager.addToPendingActions
1314
 
         (
1315
 
               new PendingAction()
1316
 
               {
1317
 
                  void doAction()
1318
 
                  {
1319
 
                     try
1320
 
                     {
1321
 
                        decrementReferences();
1322
 
                     }
1323
 
                     catch (IOException e)
1324
 
                     {
1325
 
                        log.error(e);
1326
 
                     }
1327
 
                  }
1328
 
               }
1329
 
         );
1330
 
      }
1331
 
   }
1332
 
 
1333
 
 
1334
 
   public void setCreatedForRemoteServerSocket()
1335
 
   {
1336
 
      createdForRemoteServerSocket = true;
1337
 
   }
1338
 
 
1339
 
 
1340
 
   public synchronized boolean isRemoteServerSocketRegistered()
1341
 
   {
1342
 
      return remoteServerSocketRegistered;
1343
 
   }
1344
 
 
1345
 
 
1346
 
   public boolean waitForRemoteServerSocketRegistered()
1347
 
   {
1348
 
      if (remoteServerSocketRegistered)
1349
 
         return true;
1350
 
 
1351
 
      synchronized (threadsWaitingForRemoteServerSocket)
1352
 
      {
1353
 
         threadsWaitingForRemoteServerSocket.add(Thread.currentThread());
1354
 
 
1355
 
         while (!remoteServerSocketRegistered)
1356
 
         {
1357
 
            try
1358
 
            {
1359
 
               threadsWaitingForRemoteServerSocket.wait();
1360
 
            }
1361
 
            catch (InterruptedException e)
1362
 
            {
1363
 
               log.info("interrupted waiting for registration of remote server socket");
1364
 
 
1365
 
               if (shutdown)
1366
 
               {
1367
 
                  threadsWaitingForRemoteServerSocket.remove(Thread.currentThread());
1368
 
                  return false;
1369
 
               }
1370
 
            }
1371
 
         }
1372
 
      }
1373
 
 
1374
 
      threadsWaitingForRemoteServerSocket.remove(Thread.currentThread());
1375
 
      return true;
1376
 
   }
1377
 
 
1378
 
/**
1379
 
 * Increment reference counter for this <code>MultiplexingManager</code>.
1380
 
 * @throws IOException
1381
 
 */
1382
 
   public void incrementReferences() throws IOException
1383
 
   {
1384
 
      shutdownManager.incrementReferences();
1385
 
   }
1386
 
 
1387
 
 
1388
 
/**
1389
 
 * Decrement reference counter for this <code>MultiplexingManager</code>.
1390
 
 * @throws IOException
1391
 
 */
1392
 
   public void decrementReferences() throws IOException
1393
 
   {
1394
 
      shutdownManager.decrementReferences();
1395
 
   }
1396
 
 
1397
 
 
1398
 
/**
1399
 
 *
1400
 
 * @return
1401
 
 */
1402
 
   public Collection getAllOutputStreams()
1403
 
   {
1404
 
      return outputStreamMap.values();
1405
 
   }
1406
 
 
1407
 
 
1408
 
/**
1409
 
 * Returns<code> OutputStream</code> to use when corrupted InputStream gives no known "mailbox"
1410
 
 * @return OutputStream to use when corrupted InputStream gives no known "mailbox"
1411
 
 */
1412
 
   public OutputStream getDeadLetterOutputStream()
1413
 
   {
1414
 
      return deadLetterOutputStream;
1415
 
   }
1416
 
 
1417
 
 
1418
 
/**
1419
 
 * Returns <code>InputStream</code> of real socket
1420
 
 * @return InputStream of real socket
1421
 
 */
1422
 
   public InputStream getInputStream()
1423
 
   {
1424
 
      return inputStream;
1425
 
   }
1426
 
 
1427
 
 
1428
 
/**
1429
 
 * Returns <code>OutputStream</code> of real socket
1430
 
 * @return OutputStream of real socket
1431
 
 */
1432
 
   public OutputStream getOutputStream()
1433
 
   {
1434
 
      return outputStream;
1435
 
   }
1436
 
 
1437
 
/**
1438
 
 * Get an <code>InputStream</code> for a <code>VirtualSocket</code>.
1439
 
 * @param socketId
1440
 
 * @param socket
1441
 
 * @return
1442
 
 * @throws IOException
1443
 
 */
1444
 
   public MultiplexingInputStream getAnInputStream(SocketId socketId, VirtualSocket socket) throws IOException
1445
 
   {
1446
 
      if (debug) log.debug("getAnInputStream(): " + socketId.getPort());
1447
 
      MultiplexingInputStream mis = (MultiplexingInputStream) inputStreamMap.get(socketId);
1448
 
 
1449
 
      if (mis != null)
1450
 
      {
1451
 
         if (mis.getSocket() == null)
1452
 
            mis.setSocket(socket);
1453
 
         return mis;
1454
 
      }
1455
 
 
1456
 
      GrowablePipedOutputStream pos = (GrowablePipedOutputStream) outputStreamMap.get(socketId);
1457
 
      if (pos == null)
1458
 
      {
1459
 
         pos = new GrowablePipedOutputStream();
1460
 
         outputStreamMap.put(socketId, pos);
1461
 
      }
1462
 
 
1463
 
      mis = new MultiplexingInputStream(pos, this, socket);
1464
 
      inputStreamMap.put(socketId, mis);
1465
 
      if (readException != null)
1466
 
         mis.setReadException(readException);
1467
 
 
1468
 
      return mis;
1469
 
   }
1470
 
 
1471
 
/**
1472
 
 * Get an <code>OutputStrem</code> for a <code>VirtualSocket</code>.
1473
 
 * @param socketId
1474
 
 * @return
1475
 
 */
1476
 
   public GrowablePipedOutputStream getAnOutputStream(SocketId socketId)
1477
 
   {
1478
 
      if (debug) log.debug("getAnOutputStream(): " + socketId.getPort());
1479
 
 
1480
 
      GrowablePipedOutputStream pos = (GrowablePipedOutputStream) outputStreamMap.get(socketId);
1481
 
      if (pos == null)
1482
 
      {
1483
 
         pos = new GrowablePipedOutputStream();
1484
 
         outputStreamMap.put(socketId, pos);
1485
 
      }
1486
 
 
1487
 
      return pos;
1488
 
   }
1489
 
 
1490
 
 
1491
 
/**
1492
 
 * @param socketId
1493
 
 * @return
1494
 
 */
1495
 
   public MultiplexingOutputStream getAnOutputStream(VirtualSocket socket, SocketId socketId)
1496
 
   {
1497
 
      MultiplexingOutputStream mos = new MultiplexingOutputStream(this, socket, socketId);
1498
 
      outputStreamSet.add(mos);
1499
 
      if (writeException != null)
1500
 
         mos.setWriteException(writeException);
1501
 
      return mos;
1502
 
   }
1503
 
 
1504
 
 
1505
 
/**
1506
 
 * Returns a <code>GrowablePipedOutputStream</code> that is connected to a
1507
 
 * <code>MultiplexingInputStream</code>.
1508
 
 * It will create the <code>MultiplexingInputStream</code> if necessary.  This method exists to
1509
 
 * allow <code>InputMultiplexor</code> to get a place to put bytes directed to an unrecognized
1510
 
 * SocketId, which might be necessary if the remote socket starts writing before this
1511
 
 * end of the connection is ready.  Originally, we had a three step handshake, in which
1512
 
 * (1) the client socket sent a "connect" message, (2) the server socket sent a "connected"
1513
 
 * message, and (3) the client socket sent a "connect verify" message.  In the interests
1514
 
 * of performance we eliminated the final step.
1515
 
 *
1516
 
 * @param socketId
1517
 
 * @return
1518
 
 * @throws IOException
1519
 
 */
1520
 
   public GrowablePipedOutputStream getConnectedOutputStream(SocketId socketId) throws IOException
1521
 
   {
1522
 
      if (debug) log.debug("getConnectedOutputStream(): " + socketId.getPort());
1523
 
 
1524
 
      MultiplexingInputStream mis = (MultiplexingInputStream) inputStreamMap.get(socketId);
1525
 
      if (mis != null)
1526
 
      {
1527
 
         GrowablePipedOutputStream pos = (GrowablePipedOutputStream) outputStreamMap.get(socketId);
1528
 
         if (pos == null)
1529
 
         {
1530
 
            StringBuffer message = new StringBuffer();
1531
 
            message.append("MultiplexingInputStream exists ")
1532
 
                   .append("without matching GrowablePipedOutputStream: ")
1533
 
                   .append("socketId = ").append(socketId);
1534
 
            throw new IOException(message.toString());
1535
 
         }
1536
 
         return pos;
1537
 
      }
1538
 
 
1539
 
      GrowablePipedOutputStream pos = (GrowablePipedOutputStream) outputStreamMap.get(socketId);
1540
 
      if (pos == null)
1541
 
      {
1542
 
         pos = new GrowablePipedOutputStream();
1543
 
         outputStreamMap.put(socketId, pos);
1544
 
      }
1545
 
 
1546
 
      mis = new MultiplexingInputStream(pos, this);
1547
 
      inputStreamMap.put(socketId, mis);
1548
 
      return pos;
1549
 
   }
1550
 
 
1551
 
 
1552
 
/**
1553
 
 * @return
1554
 
 */
1555
 
   public OutputStream getBackchannelOutputStream()
1556
 
   {
1557
 
      return backchannelOutputStream;
1558
 
   }
1559
 
 
1560
 
 
1561
 
/**
1562
 
 * @return handshakeCompletedEvent
1563
 
 */
1564
 
   public HandshakeCompletedEvent getHandshakeCompletedEvent()
1565
 
   {
1566
 
      return handshakeCompletedEvent;
1567
 
   }
1568
 
/**
1569
 
 *
1570
 
 * @return
1571
 
 */
1572
 
   public OutputMultiplexor getOutputMultiplexor()
1573
 
   {
1574
 
      return outputMultiplexor;
1575
 
   }
1576
 
 
1577
 
 
1578
 
/**
1579
 
 *
1580
 
 * @param socketId
1581
 
 * @return
1582
 
 */
1583
 
   public OutputStream getOutputStreamByLocalSocket(SocketId socketId)
1584
 
   {
1585
 
      return (OutputStream) outputStreamMap.get(socketId);
1586
 
   }
1587
 
 
1588
 
 
1589
 
/**
1590
 
 *
1591
 
 * @return
1592
 
 */
1593
 
   public Protocol getProtocol()
1594
 
   {
1595
 
      return protocol;
1596
 
   }
1597
 
 
1598
 
 
1599
 
/**
1600
 
 * @return
1601
 
 */
1602
 
   public synchronized ServerSocket getServerSocket()
1603
 
   {
1604
 
      return serverSocket;
1605
 
   }
1606
 
 
1607
 
 
1608
 
/**
1609
 
 *
1610
 
 * @return
1611
 
 */
1612
 
   public Socket getSocket()
1613
 
   {
1614
 
      return socket;
1615
 
   }
1616
 
 
1617
 
 
1618
 
/**
1619
 
 *
1620
 
 * @param socketId
1621
 
 * @return
1622
 
 */
1623
 
   public VirtualSocket getSocketByLocalPort(SocketId socketId)
1624
 
   {
1625
 
      return (VirtualSocket) socketMap.get(socketId);
1626
 
   }
1627
 
 
1628
 
 
1629
 
/**
1630
 
 * @return
1631
 
 */
1632
 
   public SocketFactory getSocketFactory()
1633
 
   {
1634
 
      return socketFactory;
1635
 
   }
1636
 
 
1637
 
 
1638
 
/**
1639
 
 * To implement <code>HandshakeCompletedListener</code> interface.
1640
 
 */
1641
 
   public void handshakeCompleted(HandshakeCompletedEvent event)
1642
 
   {
1643
 
      description = socket.toString();
1644
 
 
1645
 
      handshakeCompletedEvent = event;
1646
 
      Object obj = configuration.get(Multiplex.SSL_HANDSHAKE_LISTENER);
1647
 
      if (obj != null)
1648
 
      {
1649
 
         HandshakeCompletedListener listener = (HandshakeCompletedListener) obj;
1650
 
         listener.handshakeCompleted(event);
1651
 
      }
1652
 
   }
1653
 
 
1654
 
 
1655
 
/**
1656
 
 * @return
1657
 
 */
1658
 
   public boolean isBound()
1659
 
   {
1660
 
      return bound;
1661
 
   }
1662
 
 
1663
 
 
1664
 
/**
1665
 
 * @return
1666
 
 */
1667
 
   public boolean isConnected()
1668
 
   {
1669
 
      return connected;
1670
 
   }
1671
 
 
1672
 
 
1673
 
/**
1674
 
 * @return
1675
 
 */
1676
 
   public synchronized boolean isServerSocketRegistered()
1677
 
   {
1678
 
      return serverSocket != null;
1679
 
   }
1680
 
 
1681
 
 
1682
 
/**
1683
 
 *
1684
 
 */
1685
 
   public boolean isShutdown()
1686
 
   {
1687
 
      return shutdown;
1688
 
   }
1689
 
 
1690
 
 
1691
 
/**
1692
 
 *
1693
 
 * @param socketId
1694
 
 * @return
1695
 
 * TODO: isn't used
1696
 
 */
1697
 
   public synchronized boolean isSocketRegistered(SocketId socketId)
1698
 
   {
1699
 
      return registeredSockets.contains(socketId);
1700
 
   }
1701
 
 
1702
 
 
1703
 
/**
1704
 
 * @return
1705
 
 */
1706
 
   public boolean respondToShutdownRequest()
1707
 
   {
1708
 
      return shutdownManager.respondToShutdownRequest();
1709
 
   }
1710
 
 
1711
 
 
1712
 
/**
1713
 
 * @param socketFactory
1714
 
 */
1715
 
   public void setSocketFactory(SocketFactory socketFactory)
1716
 
   {
1717
 
      this.socketFactory = socketFactory;
1718
 
   }
1719
 
 
1720
 
 
1721
 
/**
1722
 
 * @return
1723
 
 */
1724
 
   public int getShutdownMonitorPeriod()
1725
 
   {
1726
 
      return shutdownMonitorPeriod;
1727
 
   }
1728
 
 
1729
 
 
1730
 
/**
1731
 
 * @return
1732
 
 */
1733
 
   public int getShutdownRefusalsMaximum()
1734
 
   {
1735
 
      return shutdownRefusalsMaximum;
1736
 
   }
1737
 
 
1738
 
 
1739
 
/**
1740
 
 * @return
1741
 
 */
1742
 
   public int getShutdownRequestTimeout()
1743
 
   {
1744
 
      return shutdownRequestTimeout;
1745
 
   }
1746
 
 
1747
 
 
1748
 
/**
1749
 
 * @param timeout
1750
 
 */
1751
 
   public void setShutdownRequestTimeout(int timeout)
1752
 
   {
1753
 
      shutdownRequestTimeout = timeout;
1754
 
   }
1755
 
 
1756
 
 
1757
 
/**
1758
 
 * @param maximum
1759
 
 */
1760
 
   public void setShutdownRefusalsMaximum(int maximum)
1761
 
   {
1762
 
      shutdownRefusalsMaximum = maximum;
1763
 
   }
1764
 
 
1765
 
 
1766
 
/**
1767
 
 * @param period
1768
 
 */
1769
 
   public void setShutdownMonitorPeriod(int period)
1770
 
   {
1771
 
      shutdownMonitorPeriod = period;
1772
 
   }
1773
 
 
1774
 
 
1775
 
/**
1776
 
 * Needed to implement <code>OutputMultiplexor.OutputMultiplexorClient</code>
1777
 
 */
1778
 
   public synchronized void outputFlushed()
1779
 
   {
1780
 
      if (shutdownThread != null)
1781
 
         shutdownThread.setSafeToShutdown(true);
1782
 
      notifyAll();
1783
 
   }
1784
 
 
1785
 
 
1786
 
   public String toString()
1787
 
   {
1788
 
      if (description != null)
1789
 
         return description;
1790
 
      return super.toString();
1791
 
   }
1792
 
 
1793
 
/********************************************************************************************
1794
 
 *                      protected methods and classes
1795
 
 ********************************************************************************************/
1796
 
 
1797
 
 
1798
 
   protected Socket createSocket(InetSocketAddress endpoint, int timeout) throws IOException
1799
 
   {
1800
 
      Socket socket = null;
1801
 
 
1802
 
      if (localSocketAddress == null)
1803
 
      {
1804
 
         if (socketFactory != null)
1805
 
            socket = socketFactory.createSocket(endpoint.getAddress(), endpoint.getPort());
1806
 
         else
1807
 
            socket = SocketChannel.open(endpoint).socket();
1808
 
      }
1809
 
      else
1810
 
      {
1811
 
         // It's possible that bind() was called, but a socket hasn't been created yet, since
1812
 
         // SSLSocketFactory will not create an unconnected socket.  In that case, bind() just
1813
 
         // saved localSocketAddress for later use.
1814
 
         if (socketFactory != null)
1815
 
            socket = socketFactory.createSocket(endpoint.getAddress(),
1816
 
                                                endpoint.getPort(),
1817
 
                                                localSocketAddress.getAddress(),
1818
 
                                                localSocketAddress.getPort());
1819
 
         else
1820
 
         {
1821
 
            socket = SocketChannel.open().socket();
1822
 
            socket.bind(localSocketAddress);
1823
 
            socket.connect(endpoint);
1824
 
         }
1825
 
      }
1826
 
 
1827
 
      if (socket instanceof SSLSocket)
1828
 
      {
1829
 
//         Object o = configuration.get(Multiplex.SSL_HANDSHAKE_LISTENER);
1830
 
//         if (o != null)
1831
 
//         {
1832
 
//            HandshakeCompletedListener hcl = (HandshakeCompletedListener) o;
1833
 
//            ((SSLSocket) socket).addHandshakeCompletedListener(hcl);
1834
 
//         }
1835
 
         ((SSLSocket) socket).addHandshakeCompletedListener(this);
1836
 
      }
1837
 
 
1838
 
      socket.setSoTimeout(timeout);
1839
 
      return socket;
1840
 
   }
1841
 
 
1842
 
   protected Socket createSocket() throws IOException
1843
 
   {
1844
 
      Socket socket = null;
1845
 
 
1846
 
      try
1847
 
      {
1848
 
         if (socketFactory != null)
1849
 
            socket = socketFactory.createSocket();
1850
 
         else
1851
 
            socket = SocketChannel.open().socket();
1852
 
 
1853
 
         if (socket instanceof SSLSocket)
1854
 
         {
1855
 
//            Object o = configuration.get(Multiplex.SSL_HANDSHAKE_LISTENER);
1856
 
//            if (o != null)
1857
 
//            {
1858
 
//               HandshakeCompletedListener hcl = (HandshakeCompletedListener) o;
1859
 
//               ((SSLSocket) socket).addHandshakeCompletedListener(hcl);
1860
 
//            }
1861
 
            ((SSLSocket) socket).addHandshakeCompletedListener(this);
1862
 
         }
1863
 
      }
1864
 
      catch (IOException e)
1865
 
      {
1866
 
         if ("Unconnected sockets not implemented".equals(e.getMessage()))
1867
 
            return null;
1868
 
         throw e;
1869
 
      }
1870
 
 
1871
 
      return socket;
1872
 
   }
1873
 
 
1874
 
 
1875
 
/**
1876
 
 * @param address
1877
 
 */
1878
 
   protected void registerByLocalAddress(InetSocketAddress address)
1879
 
   {
1880
 
      synchronized (localAddressMapLock)
1881
 
      {
1882
 
         localSocketAddress = address;
1883
 
         HashSet managers = (HashSet) managersByLocalAddress.get(address);
1884
 
 
1885
 
         if (managers == null)
1886
 
         {
1887
 
            managers = new HashSet();
1888
 
            managersByLocalAddress.put(address, managers);
1889
 
         }
1890
 
 
1891
 
         managers.add(this);
1892
 
 
1893
 
         // allow searching on any local address
1894
 
         localWildCardAddress = new InetSocketAddress(address.getPort());
1895
 
         managers = (HashSet) managersByLocalAddress.get(localWildCardAddress);
1896
 
 
1897
 
         if (managers == null)
1898
 
         {
1899
 
            managers = new HashSet();
1900
 
            managersByLocalAddress.put(localWildCardAddress, managers);
1901
 
         }
1902
 
 
1903
 
         managers.add(this);
1904
 
      }
1905
 
   }
1906
 
 
1907
 
 
1908
 
/**
1909
 
 *
1910
 
 */
1911
 
   protected void unregisterByLocalAddress()
1912
 
   {
1913
 
      synchronized (localAddressMapLock)
1914
 
      {
1915
 
         HashSet managers = null;
1916
 
 
1917
 
         if (localSocketAddress != null)
1918
 
         {
1919
 
            managers = (HashSet) managersByLocalAddress.get(localSocketAddress);
1920
 
 
1921
 
            if (managers != null)
1922
 
            {
1923
 
               managers.remove(this);
1924
 
 
1925
 
               if (managers.isEmpty())
1926
 
                  managersByLocalAddress.remove(localSocketAddress);
1927
 
            }
1928
 
         }
1929
 
 
1930
 
         if (localWildCardAddress != null)
1931
 
         {
1932
 
            managers = (HashSet) managersByLocalAddress.get(localWildCardAddress);
1933
 
 
1934
 
            if (managers != null)
1935
 
            {
1936
 
               managers.remove(this);
1937
 
 
1938
 
               if (managers.isEmpty())
1939
 
                  managersByLocalAddress.remove(localWildCardAddress);
1940
 
            }
1941
 
         }
1942
 
      }
1943
 
   }
1944
 
 
1945
 
 
1946
 
/**
1947
 
 *
1948
 
 * @param address
1949
 
 */
1950
 
   protected void registerByRemoteAddress(InetSocketAddress address)
1951
 
   {
1952
 
      remoteSocketAddress = address;
1953
 
 
1954
 
      synchronized (remoteAddressMapLock)
1955
 
      {
1956
 
         HashSet managers = (HashSet) managersByRemoteAddress.get(address);
1957
 
 
1958
 
         if (managers == null)
1959
 
         {
1960
 
            managers = new HashSet();
1961
 
            managers.add(this);
1962
 
            managersByRemoteAddress.put(address, managers);
1963
 
         }
1964
 
         else
1965
 
            managers.add(this);
1966
 
      }
1967
 
   }
1968
 
 
1969
 
 
1970
 
/**
1971
 
 *
1972
 
 */
1973
 
   protected void unregisterByRemoteAddress()
1974
 
   {
1975
 
      if (remoteSocketAddress != null)
1976
 
      {
1977
 
         synchronized (remoteAddressMapLock)
1978
 
         {
1979
 
            HashSet managers = (HashSet) managersByRemoteAddress.get(remoteSocketAddress);
1980
 
 
1981
 
            if (managers != null)
1982
 
            {
1983
 
               managers.remove(this);
1984
 
 
1985
 
               if (managers.isEmpty())
1986
 
                  managersByRemoteAddress.remove(remoteSocketAddress);
1987
 
            }
1988
 
         }
1989
 
      }
1990
 
   }
1991
 
 
1992
 
 
1993
 
/**
1994
 
 *
1995
 
 * @param address
1996
 
 */
1997
 
   protected void registerShareable(InetSocketAddress address)
1998
 
   {
1999
 
      if (debug) log.debug("registering as shareable: " + this + ": " +  address.toString());
2000
 
      synchronized (shareableMapLock)
2001
 
      {
2002
 
         HashSet managers = (HashSet) shareableManagers.get(address);
2003
 
 
2004
 
         if (managers == null)
2005
 
         {
2006
 
            managers = new HashSet();
2007
 
            managers.add(this);
2008
 
            shareableManagers.put(address, managers);
2009
 
         }
2010
 
         else
2011
 
            managers.add(this);
2012
 
      }
2013
 
   }
2014
 
 
2015
 
 
2016
 
/**
2017
 
 *
2018
 
 */
2019
 
   protected void unregisterShareable()
2020
 
   {
2021
 
      if (debug) log.debug("unregistering remote: " + this + ": " + description);
2022
 
      if (remoteSocketAddress != null)
2023
 
      {
2024
 
         synchronized (shareableMapLock)
2025
 
         {
2026
 
            HashSet managers = (HashSet) shareableManagers.get(remoteSocketAddress);
2027
 
 
2028
 
            if (managers != null)
2029
 
            {
2030
 
               managers.remove(this);
2031
 
 
2032
 
               if (managers.isEmpty())
2033
 
                  shareableManagers.remove(remoteSocketAddress);
2034
 
            }
2035
 
         }
2036
 
      }
2037
 
   }
2038
 
 
2039
 
 
2040
 
/*
2041
 
 *
2042
 
 */
2043
 
   protected void unregisterAllMaps()
2044
 
   {
2045
 
      unregisterByLocalAddress();
2046
 
      unregisterByRemoteAddress();
2047
 
      unregisterShareable();
2048
 
   }
2049
 
 
2050
 
/**
2051
 
 * @param socketId
2052
 
 */
2053
 
   protected void removeAnInputStream(SocketId socketId)
2054
 
   {
2055
 
      if (debug) log.debug("entering removeAnInputStream(): " + socketId.getPort());
2056
 
      InputStream is = (InputStream) inputStreamMap.remove(socketId);
2057
 
      OutputStream os = (OutputStream) outputStreamMap.remove(socketId);
2058
 
 
2059
 
      if (is != null)
2060
 
      {
2061
 
         try
2062
 
         {
2063
 
            is.close();
2064
 
         }
2065
 
         catch (Exception ignored)
2066
 
         {
2067
 
            log.error("error closing PipedInputStream (" + socket.getPort() + ")", ignored);
2068
 
         }
2069
 
      }
2070
 
 
2071
 
      if (os != null)
2072
 
      {
2073
 
         try
2074
 
         {
2075
 
            os.close();
2076
 
         }
2077
 
         catch (Exception ignored)
2078
 
         {
2079
 
            log.error("error closing PipedOutputStream (" + socket.getPort() + ")", ignored);
2080
 
         }
2081
 
      }
2082
 
   }
2083
 
 
2084
 
 
2085
 
   protected void setReadException(IOException e)
2086
 
   {
2087
 
      // Note.  It looks like there could be a race between setReadException() and
2088
 
      // getAnInputStream().  However, suppose getAnInputStream() gets to its test
2089
 
      // (readException != null) before readException is set here. Then if it created a
2090
 
      // new InputStream "is", setReadException() will see "is" in inputStreamMap and will
2091
 
      // set its read exception.  Suppose getAnInputStream gets to its test
2092
 
      // (readException != null) after setReadException() sets readException.  Then
2093
 
      // if it created a new InputStream "is", it will set the read exception for "is".
2094
 
 
2095
 
      // Remove from shareable map (if it's in map).
2096
 
      unregisterAllMaps();
2097
 
      notifySocketsOfException();
2098
 
 
2099
 
      // Unregister with input thread.
2100
 
      if (multiGroupInputThread != null)
2101
 
         multiGroupInputThread.unregisterSocketGroup(this);
2102
 
 
2103
 
      readException = e;
2104
 
 
2105
 
      HashSet tempSet;
2106
 
      synchronized (inputStreamMap)
2107
 
      {
2108
 
         tempSet = new HashSet(inputStreamMap.values());
2109
 
      }
2110
 
 
2111
 
      Iterator it = tempSet.iterator();
2112
 
      while (it.hasNext())
2113
 
      {
2114
 
         MultiplexingInputStream is = (MultiplexingInputStream) it.next();
2115
 
         is.setReadException(e);
2116
 
      }
2117
 
   }
2118
 
 
2119
 
 
2120
 
   protected void setWriteException(IOException e)
2121
 
   {
2122
 
      // Note.  See Note in setReadException().
2123
 
      // If this connection is unusable, take out of shareable map (if it's in shareable map).
2124
 
      unregisterAllMaps();
2125
 
      notifySocketsOfException();
2126
 
 
2127
 
      // Unregister with output thread.
2128
 
      outputMultiplexor.unregister(this);
2129
 
 
2130
 
      writeException = e;
2131
 
 
2132
 
      HashSet tempSet;
2133
 
      synchronized (outputStreamMap)
2134
 
      {
2135
 
         tempSet = new HashSet(outputStreamSet);
2136
 
      }
2137
 
 
2138
 
      Iterator it = tempSet.iterator();
2139
 
      while (it.hasNext())
2140
 
      {
2141
 
         MultiplexingOutputStream os = (MultiplexingOutputStream) it.next();
2142
 
         os.setWriteException(e);
2143
 
      }
2144
 
   }
2145
 
 
2146
 
   
2147
 
   protected void notifySocketsOfException()
2148
 
   {
2149
 
      synchronized (socketMap)
2150
 
      {
2151
 
         Iterator it = socketMap.values().iterator();
2152
 
         while (it.hasNext())
2153
 
            ((VirtualSocket) it.next()).notifyOfException();
2154
 
      }
2155
 
   }
2156
 
 
2157
 
 
2158
 
   protected void setEOF()
2159
 
   {
2160
 
      // Note.  See Note in setReadException().
2161
 
      log.debug("setEOF()");
2162
 
      HashSet tempSet;
2163
 
      synchronized (inputStreamMap)
2164
 
      {
2165
 
         tempSet = new HashSet(inputStreamMap.values());
2166
 
      }
2167
 
 
2168
 
      Iterator it = tempSet.iterator();
2169
 
      while (it.hasNext())
2170
 
      {
2171
 
         MultiplexingInputStream is = (MultiplexingInputStream) it.next();
2172
 
         try
2173
 
         {
2174
 
            is.handleRemoteShutdown();
2175
 
         }
2176
 
         catch (IOException e)
2177
 
         {
2178
 
            log.error(e);
2179
 
         }
2180
 
      }
2181
 
   }
2182
 
 
2183
 
 
2184
 
/**
2185
 
 *
2186
 
 */
2187
 
   protected synchronized void shutdown()
2188
 
   {
2189
 
      if (debug) log.debug(description + ": entering shutdown()");
2190
 
      shutdownThread = new ShutdownThread();
2191
 
      shutdownThread.setName(shutdownThread.getName() + ":shutdown");
2192
 
      shutdownThread.start();
2193
 
   }
2194
 
 
2195
 
 
2196
 
/**
2197
 
 * The motivation behind this class is to prevent the following problem.  Suppose MultiplexingManager A
2198
 
 * is connected to MultiplexingManager B, and A decides to shut down.  Suppose A shuts down before B knows A
2199
 
 * is shutting down, and suppose a VirtualSocket C starts up, finds B, and attaches itself to B.  Then C
2200
 
 * will have "died a-borning," in the words of Tom Paxton.  We need a handshake protocol to ensure a
2201
 
 * proper shutdown process.  In the following, let A be the local MultiplexingManager and let B be its
2202
 
 * remote peer.
2203
 
 *
2204
 
 * There are two forms of synchronization in ShutdownManager.  incrementReferences() and decrementReferences()
2205
 
 * maintain the reference count to its MultiplexingManager, and of course the changes to variable
2206
 
 * referenceCount have to be synchronized.  Parallel to incrementReferences() and decrementReferences() are
2207
 
 * the pair of methods reseverManager() and unreserveManager(), which are similar but intended for holding a
2208
 
 * MultiplexingManger for more temporary applications.  See, for example, getaManagerByRemoteAddress().
2209
 
 *
2210
 
 * There is also a need for distributed synchronization. When decrementReferences() on A decrements the
2211
 
 * referenceCount to 0, it indicates to B the desire of A to shut down. Since all of the virtual sockets
2212
 
 * on A are connected to virtual sockets on B, normally the request would be honored.  However, if a new virtual
2213
 
 * socket attaches itself to B, then it would have > 0 clients, and it would refuse the request to shut down.
2214
 
 * In any case, the request is made through Protocol.requestManagerShutdown(), which results in a call to
2215
 
 * ShutdownManager.respondToShutdownRequest() on B, which is synchronized since it reads the
2216
 
 * readyToShutdown variable, which is modified by decrementReferences().  Here lies the danger of
2217
 
 * distributed deadlock.  If decrementReferences() on A and B both start executing at about the same time, they
2218
 
 * would both be waiting for a response from respondToShutdownRequest().  But each respondToShutdownRequest()
2219
 
 * would be locked out because each decrementReferences() is blocked on i/o.
2220
 
 *
2221
 
 * The solution is to put the i/o operation in a separate thread, ShutdownRequestThread, and have decrementReferences()
2222
 
 * enter a wait state, allowing respondToShutdownRequest() to execute.  So, on each end respondToShutdownRequest()
2223
 
 * will return an answer ("go ahead and shut down", in particular), and each ShutdownRequestThread.run() will wake up
2224
 
 * the waiting decrementReferences(), which will then run to completion.
2225
 
 *
2226
 
 * Note also that while decrementReferences() is waiting, incrementReferences() can run.  However, before it waits,
2227
 
 * decrementReferences() sets the shutdownRequestInProgress flag, and if incrementReferences() finds the flag set, it
2228
 
 * will also enter a wait state and will take no action until the outstanding shutdown request is accepted or
2229
 
 * rejected.
2230
 
 *
2231
 
 * Another issue is what to do if MultiplexingManager B responds negatively to A's request to shut down, not
2232
 
 * because it has a new client but simply because some of its virtual sockets just haven't gotten around to
2233
 
 * closing yet.  When B's referenceCount finally goes to 0, it will send a shutdown request to A, and if A's
2234
 
 * referenceCount is still 0, B will shut down.  But what about B?  If decrementReferences() gets a negative
2235
 
 * response, it will start up a ShutdownMonitorThread, which, as long as readyToShutdown remains true, will
2236
 
 * periodically check to see if remoteShutdown has been set to true.  If it has, ShutdownMonitorThread
2237
 
 * initiates the shut down of its enclosing MultiplexingManager.
2238
 
 *
2239
 
 * reserveManager() interacts with decrementReferences() by preventing decrementReferences() from committing to
2240
 
 * shutting down.  If reserveManager() runs first, it sets the flag reserved to true, which causes
2241
 
 * decrementReferences() to return without checking for referenceCount == 0.  If decrementReferences() runs
2242
 
 * first and finds referenceCount == 0 and gets a positve response from the remote manager, then reserveManager()
2243
 
 * will throw an IOException.  But if decrementReferences() gets a negative response, it will start up a
2244
 
 * ShutdownMonitorThread, which runs as long as the flag readyToShutdown is true.  But reserveManager() will
2245
 
 * set readyToShutdown to false, ending the ShutdownMonitorThread.  When unreserveManager() eventually runs
2246
 
 * and sees referenceCount == 0, it will increment referenceCount and call decrementReferences(), allowing the
2247
 
 * shutdown attempt to proceed anew.  Note that when incrementReferences() runs successfully, it sets the flag
2248
 
 * reserved to false, since incrementing referenceCount will also keep the MultiplexingManager alive.
2249
 
 */
2250
 
   protected class ShutdownManager
2251
 
   {
2252
 
      /** referenceCount keeps track of the number of clients of ShutdownManager's enclosing
2253
 
       *  MultiplexingManager. */
2254
 
      private int referenceCount = 1;
2255
 
 
2256
 
      /** reserved is set to true to prevent the manager from shutting down without incrementing
2257
 
       *  the reference count. */
2258
 
      private boolean reserved = false;
2259
 
 
2260
 
      /** shutdownRequestInProgress remains true while a remote shutdown request is pending */
2261
 
      private boolean shutdownRequestInProgress = false;
2262
 
 
2263
 
      /** readyToShutdown is set to true as long as long as referenceCount == 0.  It is interpreted
2264
 
       *  by respondToShutdownRequest() as the local manager's willingness to shut down. */
2265
 
      private boolean readyToShutdown = false;
2266
 
 
2267
 
      /** shutdownMonitorThread holds a reference to the most recently created ShutdownMonitorThread. */
2268
 
//      ShutdownMonitorThread shutdownMonitorThread;
2269
 
      ShutdownMonitorTimerTask shutdownMonitorTimerTask;
2270
 
 
2271
 
      /** shutdown is set to true when the irrevocable decision has been made to shut down.  Once it
2272
 
       *  is set to true, it is never set to false. */
2273
 
      private boolean shutdown = false;
2274
 
 
2275
 
      /** remoteShutdown is set to true when the remote manager makes a shutdown request and
2276
 
       *  respondToShutdownRequest(), its agent for the request, discovers that the local manager
2277
 
       *  is also willing to shut down.  It represents the fact that the remote manager will respond
2278
 
       *  by deciding to shut down. */
2279
 
      private boolean remoteShutdown = false;
2280
 
 
2281
 
      /** shutdownHandled is set to true when ShutdownMonitorTimerTask begins the
2282
 
       *  shut down process */
2283
 
      private boolean shutdownHandled;
2284
 
 
2285
 
      /** requestShutdownFailed is true if and only if ShutdownRequestThread's attempt
2286
 
       *  to get a response to a shut down request failed */
2287
 
      private boolean requestShutdownFailed;
2288
 
 
2289
 
 
2290
 
      private class ShutdownRequestThread extends Thread
2291
 
      {
2292
 
         public ShutdownRequestThread()
2293
 
         {
2294
 
            shutdownRequestInProgress = true;
2295
 
         }
2296
 
 
2297
 
         public void run()
2298
 
         {
2299
 
            try
2300
 
            {
2301
 
               // Note. The timeout for Protocol's input stream should be longer than the
2302
 
               // time spent in decrementReferences() or by ShutdownMonitorTimerTask waiting
2303
 
               // for a response.
2304
 
               shutdown = protocol.requestManagerShutdown(shutdownRequestTimeout * 2);
2305
 
               if (debug) log.debug("shutdown: " + shutdown);
2306
 
            }
2307
 
            catch (SocketTimeoutException e)
2308
 
            {
2309
 
               requestShutdownFailed = true;
2310
 
               log.debug("socket timeout exception in manager shutdown request");
2311
 
            }
2312
 
            catch (Exception e)
2313
 
            {
2314
 
               requestShutdownFailed = true;
2315
 
               log.debug("i/o exception in manager shutdown request", e);
2316
 
            }
2317
 
 
2318
 
            if (debug) log.debug("ShutdownRequestThread.run() done: " + shutdown);
2319
 
            shutdownRequestInProgress = false;
2320
 
 
2321
 
            synchronized(ShutdownManager.this)
2322
 
            {
2323
 
               ShutdownManager.this.notifyAll();
2324
 
            }
2325
 
         }
2326
 
      }
2327
 
 
2328
 
 
2329
 
/**
2330
 
 * It is possible,due to a race condition, for there to be multiple ShutdownMonitorThreads running.
2331
 
 * In particular, suppose
2332
 
 * <p>
2333
 
 * <ol>
2334
 
 *   <li> decrementReferences() starts a ShutdownMonitorThread T1,
2335
 
 *   <li> reserverManager() runs and calls T1.terminate(),
2336
 
 *   <li> unreserveManager() runs, sees referenceCount == 0, and calls decrementReferences(),
2337
 
 *   <li> decrementReferences() creates a new ShutdownMonitorThread T2,
2338
 
 *   <li> T1 leaves sleep(1000) finds remoteShutdown == true and calls shutdown(), and
2339
 
 *   <li> T2 finds remoteShutdown == true.
2340
 
 * </ol>
2341
 
 * <p>
2342
 
 * For this reason, we turn on shutdown before calling shutdown() in T1, so that T2 will
2343
 
 * not call shutdown().
2344
 
 *
2345
 
 */
2346
 
//      private class ShutdownMonitorThread extends Thread
2347
 
//      {
2348
 
//         boolean running = true;
2349
 
//
2350
 
//         public void terminate()
2351
 
//         {
2352
 
//            running = false;
2353
 
//         }
2354
 
//
2355
 
//         public void run()
2356
 
//         {
2357
 
//            log.debug(socket.toString() + ": entering ShutdownMonitorThread");
2358
 
//
2359
 
//            while (running)
2360
 
//            {
2361
 
//               try
2362
 
//               {
2363
 
//                  sleep(1000);
2364
 
//               }
2365
 
//               catch (InterruptedException ignored) {}
2366
 
//
2367
 
//               synchronized (ShutdownManager.this)
2368
 
//               {
2369
 
//                  if (readyToShutdown && remoteShutdown && !shutdown)
2370
 
//                  {
2371
 
//                     log.debug("ShutdownMonitorThread: found remoteShutdown == true");
2372
 
//                     shutdown = true;
2373
 
//                     shutdown();
2374
 
//                     ShutdownManager.this.notifyAll();
2375
 
//                     return;
2376
 
//                  }
2377
 
//               }
2378
 
//            }
2379
 
//         }
2380
 
//      }
2381
 
 
2382
 
 
2383
 
      private class ShutdownMonitorTimerTask extends TimerTask
2384
 
      {
2385
 
         int count;
2386
 
         boolean cancelled;
2387
 
 
2388
 
         public boolean cancel()
2389
 
         {
2390
 
            log.debug("cancelling ShutdownMonitorTimerTask");
2391
 
            cancelled = true;
2392
 
            return super.cancel();
2393
 
         }
2394
 
 
2395
 
         public void run()
2396
 
         {
2397
 
            if (debug) log.debug(description + ": entering ShutdownMonitorTimerTask");
2398
 
            count++;
2399
 
 
2400
 
            synchronized (ShutdownManager.this)
2401
 
            {
2402
 
               // Another ShutdownMonitorTimerTask got here first.
2403
 
               if (shutdownHandled)
2404
 
               {
2405
 
                  if (debug) log.debug(description + ": shutdownHandled == true");
2406
 
                  cancel();
2407
 
               }
2408
 
 
2409
 
               // ShutdownRequestThread got a positive response.
2410
 
               else if (shutdown)
2411
 
               {
2412
 
                  if (debug) log.debug(description + ": shutdown is true");
2413
 
                  shutdownHandled = true;
2414
 
                  shutdown();
2415
 
                  cancel();
2416
 
               }
2417
 
 
2418
 
               // Peer MultiplexingManager requested shutdown consent.
2419
 
               else if (readyToShutdown && remoteShutdown)
2420
 
               {
2421
 
                  if (debug) log.debug(description + ": ShutdownMonitorTimerTask: found remoteShutdown == true");
2422
 
                  shutdown = true;
2423
 
                  shutdownHandled = true;
2424
 
                  shutdown();
2425
 
                  ShutdownManager.this.notifyAll();
2426
 
                  cancel();
2427
 
               }
2428
 
 
2429
 
               // Timeout (or other error) in ShutdownRequestThread
2430
 
               else if (requestShutdownFailed)
2431
 
               {
2432
 
                  if (debug) log.debug(description + ": ShutdownMonitorTimerTask: found requestShutdownFailed == true");
2433
 
                  shutdown = true;
2434
 
                  shutdownHandled = true;
2435
 
                  shutdown();
2436
 
                  ShutdownManager.this.notifyAll();
2437
 
                  cancel();
2438
 
               }
2439
 
 
2440
 
               // Count of peer MultiplexingManager refusals has reached maximum.
2441
 
               // Assume peer is hung up somehow and shut down.
2442
 
               else if (count > shutdownRefusalsMaximum)
2443
 
               {
2444
 
                  if (debug)
2445
 
                     log.debug(description + ": ShutdownMonitorTimerTask: " +
2446
 
                              "shutdown refusal count exceeded maximut: " + shutdownRefusalsMaximum);
2447
 
 
2448
 
                  shutdown = true;
2449
 
                  shutdownHandled = true;
2450
 
                  shutdown();
2451
 
                  ShutdownManager.this.notifyAll();
2452
 
                  cancel();
2453
 
               }
2454
 
 
2455
 
               // ShutdownRequestThread is still running.
2456
 
               else if (shutdownRequestInProgress)
2457
 
               {
2458
 
                  if (debug) log.debug(description + ": shutdownRequestInProgress == true");
2459
 
                  return;
2460
 
               }
2461
 
 
2462
 
               // ShutdownRequestThread got a negative response.  If we haven't been cancelled
2463
 
               // yet, we still are trying to shut down.  Ask again.
2464
 
               else
2465
 
               {
2466
 
                  // Note. The timeout for Protocol's input stream should be longer than the
2467
 
                  // time spent waiting for a response.
2468
 
                  ShutdownRequestThread shutdownRequestThread = new ShutdownRequestThread();
2469
 
                  shutdownRequestThread.setName(shutdownRequestThread.getName() + ":shutdownRequest:" + time);
2470
 
                  shutdownRequestThread.setDaemon(true);
2471
 
                  if (debug) log.debug(description + ": starting ShutdownRequestThread: " + shutdownRequestThread.toString());
2472
 
                  shutdownRequestThread.start();
2473
 
               }
2474
 
            }
2475
 
         }
2476
 
 
2477
 
         public String toString()
2478
 
         {
2479
 
            return "shutdownRequest:" + time;
2480
 
         }
2481
 
      }
2482
 
 
2483
 
/**
2484
 
 *
2485
 
 * @throws IOException
2486
 
 */
2487
 
      public synchronized void reserveManager() throws IOException
2488
 
      {
2489
 
         if (debug) log.debug(description + referenceCount);
2490
 
 
2491
 
         // If decrementReferences() grabbed the lock first, set referenceCount to 0, and initiated a
2492
 
         // remote shutdown request, wait until the answer comes back.
2493
 
         while (shutdownRequestInProgress)
2494
 
         {
2495
 
            try
2496
 
            {
2497
 
               wait();
2498
 
            }
2499
 
            catch (InterruptedException e)
2500
 
            {
2501
 
               // shouldn't happen
2502
 
               log.error("interruption in ShutdownRequestThread");
2503
 
            }
2504
 
         }
2505
 
 
2506
 
         // 1. shutdown is true if and only if (a) ShutdownRequestThread returned an indication that the
2507
 
         //    remote manager is shutting down, or (b) the wait() in decrementReferences() for the return
2508
 
         //    of ShutdownRequestThread timed out and decrementReferences() set shutdown to true;
2509
 
         // 2. remoteShutdown is true if and only if the remote manager initiated a shutdown request
2510
 
         //    which found that the local manager was ready to shut down. In this case, the remote
2511
 
         //    manager would go ahead and shut down, and the local manager will inevitably shutdown as well.
2512
 
 
2513
 
         if (shutdown || remoteShutdown)
2514
 
            throw new IOException("manager shutting down");
2515
 
 
2516
 
         readyToShutdown = false;
2517
 
         reserved = true;
2518
 
 
2519
 
//         if (shutdownMonitorThread != null)
2520
 
//            shutdownMonitorThread.terminate();
2521
 
 
2522
 
         if (shutdownMonitorTimerTask != null)
2523
 
            shutdownMonitorTimerTask.cancel();
2524
 
 
2525
 
         // wake up decrementReferences() if it is waiting
2526
 
         notifyAll();
2527
 
      }
2528
 
 
2529
 
 
2530
 
/**
2531
 
 *
2532
 
 */
2533
 
      public synchronized void unreserveManager()
2534
 
      {
2535
 
         if (debug) log.debug(description + referenceCount);
2536
 
 
2537
 
         if (!reserved)
2538
 
         {
2539
 
            log.error("attempting to unreserve a MultiplexingManager that was not reserved: " + description);
2540
 
            return;
2541
 
         }
2542
 
 
2543
 
         reserved = false;
2544
 
 
2545
 
         // If referenceCount == 0, it is because it was decremented by decrementReferences(). But if
2546
 
         // reserveManager() was able to run, it was because decrementReferences() did not succeed in
2547
 
         // negotiating a shutdown.  Either it found reserved == true and gave up, or it received a
2548
 
         // negative reply from the remote manager and started up a ShutdownMonitorThread, which would
2549
 
         // have terminated when reserveManager() set readyToShutdown to false.  It is therefore
2550
 
         // appropriate to give decrementReferences() another opportunity to shut down.
2551
 
         if (referenceCount == 0)
2552
 
         {
2553
 
            referenceCount++;
2554
 
            decrementReferences();
2555
 
         }
2556
 
      }
2557
 
/**
2558
 
 *
2559
 
 */
2560
 
      public synchronized void incrementReferences() throws IOException
2561
 
      {
2562
 
         if (debug) log.debug(description + referenceCount);
2563
 
 
2564
 
         // If decrementReferences() grabbed the lock first, set referenceCount to 0, and initiated a
2565
 
         // remote shutdown request, wait until the answer comes back.
2566
 
         while (shutdownRequestInProgress)
2567
 
         {
2568
 
            try
2569
 
            {
2570
 
               wait();
2571
 
            }
2572
 
            catch (InterruptedException e)
2573
 
            {
2574
 
               // shouldn't happen
2575
 
               log.error("interruption in ShutdownRequestThread");
2576
 
            }
2577
 
         }
2578
 
 
2579
 
         // 1. shutdown is true if and only if (a) ShutdownRequestThread returned an indication that the
2580
 
         //    remote manager is shutting down, or (b) the wait() in decrementReferences() for the return
2581
 
         //    of ShutdownRequestThread timed out and decrementReferences() set shutdown to true;
2582
 
         // 2. remoteShutdown is true if and only if the remote manager initiated a shutdown request
2583
 
         //    which found that the local manager was ready to shut down. In this case, the remote
2584
 
         //    manager would go ahead and shut down, and the local manager will inevitably shutdown as well.
2585
 
 
2586
 
         if (shutdown || remoteShutdown)
2587
 
            throw new IOException("not accepting new clients");
2588
 
 
2589
 
         readyToShutdown = false;
2590
 
         reserved = false;
2591
 
         referenceCount++;
2592
 
 
2593
 
         if (debug) log.debug(description + referenceCount);
2594
 
 
2595
 
//         if (shutdownMonitorThread != null)
2596
 
//            shutdownMonitorThread.terminate();
2597
 
 
2598
 
         if (shutdownMonitorTimerTask != null)
2599
 
            shutdownMonitorTimerTask.cancel();
2600
 
 
2601
 
         // wake up decrementReferences() if it is waiting
2602
 
         notifyAll();
2603
 
      }
2604
 
 
2605
 
 
2606
 
/**
2607
 
 *
2608
 
 */
2609
 
      public synchronized void decrementReferences()
2610
 
      {
2611
 
         referenceCount--;
2612
 
         if (debug) log.debug(description + referenceCount);
2613
 
 
2614
 
         if (reserved)
2615
 
         {
2616
 
            if (debug) log.debug(description + ": reserved == true");
2617
 
            return;
2618
 
         }
2619
 
 
2620
 
         if (referenceCount == 0)
2621
 
         {
2622
 
            readyToShutdown = true;
2623
 
 
2624
 
            if (isConnected())
2625
 
            {
2626
 
               ShutdownRequestThread shutdownRequestThread = new ShutdownRequestThread();
2627
 
               shutdownRequestThread.setName(shutdownRequestThread.getName() + ":shutdownRequest:" + time);
2628
 
               shutdownRequestThread.setDaemon(true);
2629
 
               if (debug) log.debug(description + "starting ShutdownRequestThread: " + shutdownRequestThread.toString());
2630
 
               shutdownRequestThread.start();
2631
 
 
2632
 
               try
2633
 
               {
2634
 
                  // Note. The timeout for Protocol's input stream should be longer than the
2635
 
                  // time spent waiting for a response.
2636
 
                  wait(shutdownRequestTimeout);
2637
 
               }
2638
 
               catch (InterruptedException e)
2639
 
               {
2640
 
                  // shouldn't happen
2641
 
                  log.error("interrupt in ShutdownRequestThread");
2642
 
               }
2643
 
 
2644
 
               if (log.isDebugEnabled())
2645
 
               {
2646
 
                  log.debug(description + shutdown);
2647
 
                  log.debug(description + shutdownRequestThread.isAlive());
2648
 
               }
2649
 
 
2650
 
               // If shutdownRequestInProgress is still true, we assume that the peer MultiplexingManager
2651
 
               // has shut down or is inaccessible, and we shut down.
2652
 
               if (shutdownRequestInProgress)
2653
 
               {
2654
 
                  shutdown = true;
2655
 
 
2656
 
                  // turn off shutdownRequestInProgress in case incrementReferences() or reserveManager()
2657
 
                  // are waiting
2658
 
                  shutdownRequestInProgress = false;
2659
 
               }
2660
 
            }
2661
 
            else // !isConnected()
2662
 
               shutdown = true;
2663
 
 
2664
 
 
2665
 
            if (shutdown)
2666
 
            {
2667
 
               shutdown();
2668
 
 
2669
 
               // wake up incrementReferences() if it is waiting
2670
 
               notifyAll();
2671
 
            }
2672
 
            else
2673
 
            {
2674
 
//               shutdownMonitorThread = new ShutdownMonitorThread();
2675
 
//               shutdownMonitorThread.setName(shutdownMonitorThread.getName() + ":shutdownMonitor");
2676
 
//               shutdownMonitorThread.start();
2677
 
 
2678
 
               shutdownMonitorTimerTask = new ShutdownMonitorTimerTask();
2679
 
               if (debug) log.debug(description + ": scheduling ShutdownMonitorTask: " + shutdownMonitorTimerTask);
2680
 
               timer.schedule(shutdownMonitorTimerTask, shutdownMonitorPeriod, shutdownMonitorPeriod);
2681
 
            }
2682
 
         }
2683
 
      }
2684
 
 
2685
 
 
2686
 
/**
2687
 
 * @return
2688
 
 */
2689
 
      protected synchronized boolean respondToShutdownRequest()
2690
 
      {
2691
 
         if (debug)
2692
 
         {
2693
 
            log.debug(description + readyToShutdown);
2694
 
            log.debug(description + shutdown);
2695
 
         }
2696
 
 
2697
 
         if (readyToShutdown)
2698
 
         {
2699
 
            remoteShutdown = true;
2700
 
            if (debug) log.debug(description + ": respondToShutdownRequest(): set remoteShutdown to true");
2701
 
         }
2702
 
 
2703
 
         return readyToShutdown;
2704
 
      }
2705
 
 
2706
 
 
2707
 
/**
2708
 
 * @return
2709
 
 */
2710
 
      protected boolean isShutdown()
2711
 
      {
2712
 
         return shutdown;
2713
 
      }
2714
 
   }
2715
 
 
2716
 
 
2717
 
/**
2718
 
 *
2719
 
 */
2720
 
   protected class ShutdownThread extends Thread
2721
 
   {
2722
 
      private boolean safeToShutDown;
2723
 
 
2724
 
      public void run()
2725
 
      {
2726
 
         String message = null;
2727
 
         if (debug) log.debug(description + ": manager shutting down");
2728
 
 
2729
 
         // Unregister this MultiplexingManager by local address(es)
2730
 
         unregisterByLocalAddress();
2731
 
 
2732
 
         // Unregister this MultiplexingManager by remote address
2733
 
         unregisterByRemoteAddress();
2734
 
 
2735
 
         // Remove this MultiplexingManager from Map of shareable managers
2736
 
         unregisterShareable();
2737
 
 
2738
 
         if (socket != null)
2739
 
         {
2740
 
            try
2741
 
            {
2742
 
               if (outputMultiplexor != null)
2743
 
               {
2744
 
                  outputMultiplexor.unregister(MultiplexingManager.this);
2745
 
 
2746
 
                  // Don't close socket until all output has been written.
2747
 
                  synchronized (MultiplexingManager.this)
2748
 
                  {
2749
 
                     while (!safeToShutDown)
2750
 
                     {
2751
 
                        if (debug) log.debug("waiting for safe to shut down");
2752
 
                        try
2753
 
                        {
2754
 
                           MultiplexingManager.this.wait();
2755
 
                        }
2756
 
                        catch (InterruptedException ignored)
2757
 
                        {
2758
 
                        }
2759
 
                     }
2760
 
                  }
2761
 
               }
2762
 
 
2763
 
               if (socket.getChannel() == null)
2764
 
                  socket.close();
2765
 
               else
2766
 
               {
2767
 
//                  socket.getChannel().close();
2768
 
                  message = description;
2769
 
 
2770
 
                  if (multiGroupInputThread != null)
2771
 
                     multiGroupInputThread.unregisterSocketGroup(MultiplexingManager.this);
2772
 
 
2773
 
                  socket.close();
2774
 
                  if (debug) log.debug("closed socket: " + description);
2775
 
//                  multiGroupInputThread.unregisterSocketGroup(MultiplexingManager.this);
2776
 
//                  socket.close();
2777
 
               }
2778
 
 
2779
 
               log.debug("manager: closed socket");
2780
 
            }
2781
 
            catch (Exception e)
2782
 
            {
2783
 
               log.error("manager: unable to close socket", e);
2784
 
            }
2785
 
         }
2786
 
 
2787
 
         if (inputThread != null)
2788
 
         {
2789
 
            inputThread.shutdown();
2790
 
 
2791
 
            try
2792
 
            {
2793
 
               inputThread.join();
2794
 
               log.debug("manager: joined input thread");
2795
 
            }
2796
 
            catch (InterruptedException ignored)
2797
 
            {
2798
 
               log.debug("manager: interrupted exception waiting for read thread");
2799
 
            }
2800
 
         }
2801
 
 
2802
 
         removeAnInputStream(SocketId.PROTOCOL_SOCKET_ID);
2803
 
         removeAnInputStream(SocketId.SERVER_SOCKET_ID);
2804
 
         removeAnInputStream(SocketId.SERVER_SOCKET_CONNECT_ID);
2805
 
         removeAnInputStream(SocketId.SERVER_SOCKET_VERIFY_ID);
2806
 
         removeAnInputStream(SocketId.BACKCHANNEL_SOCKET_ID);
2807
 
 
2808
 
         shutdown = true;
2809
 
 
2810
 
         // Remove this MultiplexManager from set of all managers
2811
 
         if (debug) log.debug("removing from allManagers: " + description + "(" + id + ")");
2812
 
         allManagers.remove(MultiplexingManager.this);
2813
 
 
2814
 
         if (debug) log.debug("manager shut down (: " + id + "): "  + message);
2815
 
         if (debug) log.debug("managers left: " + allManagers.size());
2816
 
      }
2817
 
 
2818
 
      public void setSafeToShutdown(boolean safe)
2819
 
      {
2820
 
         if (debug) log.debug("output flushed");
2821
 
         safeToShutDown = safe;
2822
 
      }
2823
 
   }
2824
 
 
2825
 
 
2826
 
   protected static class PendingActionThread extends StoppableThread
2827
 
   {
2828
 
      private List pendingActionsTemp = new ArrayList();
2829
 
 
2830
 
      protected void doInit()
2831
 
      {
2832
 
         log.debug("PendingActionThread starting");
2833
 
      }
2834
 
 
2835
 
      protected void doRun()
2836
 
      {
2837
 
         synchronized (pendingActions)
2838
 
         {
2839
 
            while (pendingActions.isEmpty())
2840
 
            {
2841
 
               try
2842
 
               {
2843
 
                  pendingActions.wait();
2844
 
               }
2845
 
               catch (InterruptedException ignored)
2846
 
               {
2847
 
                  if (!isRunning())
2848
 
                     return;
2849
 
               }
2850
 
            }
2851
 
 
2852
 
            pendingActionsTemp.addAll(pendingActions);
2853
 
            pendingActions.clear();
2854
 
         }
2855
 
 
2856
 
         Iterator it = pendingActionsTemp.iterator();
2857
 
 
2858
 
         while (it.hasNext())
2859
 
         {
2860
 
            Object o = it.next();
2861
 
            if (o instanceof PendingAction)
2862
 
               ((PendingAction) o).doAction();
2863
 
            else
2864
 
               log.error("object in closePendingSockets has invalid type: " + o.getClass());
2865
 
         }
2866
 
 
2867
 
         pendingActionsTemp.clear();
2868
 
      }
2869
 
 
2870
 
      public void shutdown()
2871
 
      {
2872
 
         log.debug("pending action thread beginning shut down");
2873
 
         super.shutdown();
2874
 
         interrupt();
2875
 
      }
2876
 
 
2877
 
      protected void doShutDown()
2878
 
      {
2879
 
         log.debug("PendingActionThread shutting down");
2880
 
      }
2881
 
   }
2882
 
}
 
 
b'\\ No newline at end of file'