2
* JBoss, Home of Professional Open Source
3
* Copyright 2005, JBoss Inc., and individual contributors as indicated
4
* by the @authors tag. See the copyright.txt in the distribution for a
5
* full listing of individual contributors.
7
* This is free software; you can redistribute it and/or modify it
8
* under the terms of the GNU Lesser General Public License as
9
* published by the Free Software Foundation; either version 2.1 of
10
* the License, or (at your option) any later version.
12
* This software is distributed in the hope that it will be useful,
13
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15
* Lesser General Public License for more details.
17
* You should have received a copy of the GNU Lesser General Public
18
* License along with this software; if not, write to the Free
19
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
23
package org.jboss.remoting.transport.multiplex;
25
import org.jboss.logging.Logger;
26
import org.jboss.remoting.transport.multiplex.InputMultiplexor.MultiGroupInputThread;
27
import org.jboss.remoting.transport.multiplex.utility.GrowablePipedOutputStream;
28
import org.jboss.remoting.transport.multiplex.utility.StoppableThread;
29
import org.jboss.remoting.transport.multiplex.utility.VirtualSelector;
31
import javax.net.SocketFactory;
32
import javax.net.ssl.HandshakeCompletedEvent;
33
import javax.net.ssl.HandshakeCompletedListener;
34
import javax.net.ssl.SSLSocket;
35
import java.io.ByteArrayOutputStream;
36
import java.io.IOException;
37
import java.io.InputStream;
38
import java.io.OutputStream;
39
import java.net.InetSocketAddress;
40
import java.net.ServerSocket;
41
import java.net.Socket;
42
import java.net.SocketTimeoutException;
43
import java.nio.channels.Channels;
44
import java.nio.channels.SocketChannel;
45
import java.util.ArrayList;
46
import java.util.Collection;
47
import java.util.Collections;
48
import java.util.Date;
49
import java.util.HashMap;
50
import java.util.HashSet;
51
import java.util.Iterator;
52
import java.util.List;
55
import java.util.Timer;
56
import java.util.TimerTask;
60
* <code>MultiplexingManager</code> is the heart of the Multiplex system. It is the implementation
61
* of the virtual socket group concept. See the Multiplex documentation on the
62
* labs.jboss.org website for more information about virtual socket groups.
64
* <code>MultiplexingManager</code> wraps a single real <code>java.net.Socket</code>.
65
* It creates the socket when it is running on the client side, and it is passed a
66
* socket by <code>MasterServerSocket</code> when it is running on the server side.
68
* <code>MultiplexingManager</code> creates the infrastructure
69
* which supports multiplexing, including an <code>OutputMultiplexor</code> output thread and one
70
* or more <code>InputMultiplexor</code> input threads. When the last member leaves the socket
71
* group, a <code>MultiplexingManager</code> is responsible for negotiating with its remote peer
72
* for permission to shut down, and for tearing down the multiplexing infrastructure when
73
* the negotiations succeed.
75
* <code>MultiplexingManager</code> also provides the mechanism by which a virtual socket joins
76
* a virtual socket group, identifying the new socket to the <code>InputMultiplexor</code> input
82
* @author <a href="mailto:r.sigal@computer.org">Ron Sigal</a>
84
* @deprecated As of release 2.4.0 the multiplex transport will no longer be actively supported.
86
public class MultiplexingManager
87
implements OutputMultiplexor.OutputMultiplexorClient, HandshakeCompletedListener
89
private static final Logger log = Logger.getLogger(MultiplexingManager.class);
92
/** Determines how often to check that no MultiplexingManagers have been running,
93
* in which case the static threads can be shut down. */
94
private static int staticThreadsMonitorPeriod;
96
/** True if and only if the MultiplexingManager static threads are running */
97
private static boolean staticThreadsRunning;
99
/** This object is used to synchronized operations on the managersByRemoteAddress map. */
100
private static Object shareableMapLock = new Object();
102
/** A HashMap<InetSocketAddress, HasnSet<MultiplexingManager>> of sets of MultiplexingManager's
103
* indexed by remote address. Holds all MultiplexingManagers whose peer has an attached
104
* VirtualServerSocket. */
105
private static Map shareableManagers = new HashMap();
107
/** This object is used to synchronize operations on the managersByLocalAddress map. */
108
private static Object localAddressMapLock = new Object();
110
/** A HashMap<InetSocketAddress, HasnSet<MultiplexingManager>> of sets of MultiplexingManager's
111
* indexed by local address */
112
private static Map managersByLocalAddress = new HashMap();
114
/** This object is used to synchronized operations on the managersByRemoteAddress map. */
115
private static Object remoteAddressMapLock = new Object();
117
/** A HashMap<InetSocketAddress, HasnSet<MultiplexingManager>> of sets of MultiplexingManager's
118
* indexed by remote address */
119
// private static Map managersByRemoteAddress = Collections.synchronizedMap(new HashMap());
120
private static Map managersByRemoteAddress = new HashMap();
122
/** Set of all MultiplexingManagers */
123
private static Set allManagers = Collections.synchronizedSet(new HashSet());
125
/** InputMultiplexor in this JVM */
126
private static InputMultiplexor inputMultiplexor;
128
/** OutputMultiplexor in this JVM */
129
private static OutputMultiplexor outputMultiplexor;
131
/** Thread for writing to socket's OutputStream */
132
private static OutputMultiplexor.OutputThread outputThread;
134
/** MultiGroupInputThread reading from socket's InputStream and distributing to virtual sockets
135
* Processes all NIO sockets */
136
private static MultiGroupInputThread multiGroupInputThread;
138
/** MultiplexingInputStreams register with virtualSelector when they have bytes to read */
139
private static VirtualSelector virtualSelector;
141
/** Thread for getting asynchronous messages from remote Protocol */
142
private static Protocol.BackChannelThread backChannelThread;
144
/** Holds PendingActions waiting to be performed */
145
private static List pendingActions = new ArrayList();
147
/** Removes virtual sockets from closingSockets and closes them. */
148
private static PendingActionThread pendingActionThread;
150
/** Thread for stashing potentially long activities, as well as periodic activities */
151
private static Timer timer;
153
/** Used to determine when to shut down static threads */
154
private static boolean hasBeenIdle;
156
/** Used to distinguish the static threads in this jvm */
157
private final static short time = (short) System.currentTimeMillis();
159
/** If shutdown request is not answered within this time period, assume a problem
161
private int shutdownRequestTimeout;
163
/** Determines how often ShutdownMonitorTimerTask should check for response
164
* to ShutdownRequestThread. */
165
private int shutdownMonitorPeriod;
167
/** If the peer MultiplexingManager has refused to shutdown this many times,
168
* shut down anyway. */
169
private int shutdownRefusalsMaximum;
171
/** Holds configuration parameters */
172
private static Map configuration = new HashMap();
174
/** A HashMap<SocketId, VirtualSocket> of VirtualSocket's indexed by local SocketId */
175
private Map socketMap = Collections.synchronizedMap(new HashMap());
177
/** A HashSet of SocketId's registered on this MultiplexingManager */
178
private Set registeredSockets = Collections.synchronizedSet(new HashSet());
180
/** A HashMap<Long, OutputStream> of piped OutputStreams associated with InputStreams */
181
private Map outputStreamMap = Collections.synchronizedMap(new HashMap());
183
/** A HashMap<Long, InputStream> of InputStreams associated with virtual sockets */
184
private Map inputStreamMap = Collections.synchronizedMap(new HashMap());
186
/** Holds OutputStreams associated with virtual sockets */
187
private Set outputStreamSet = Collections.synchronizedSet(new HashSet());
189
/** Protocol back channel OutputStream */
190
private OutputStream backchannelOutputStream;
192
/** Threads waiting to be notified of registration of remote server socket */
193
private Set threadsWaitingForRemoteServerSocket = new HashSet();
195
/** Protocol object for handling connection/disconnection communications */
196
private Protocol protocol;
198
/** Actual local socket upon which this family of virtual sockets is based */
199
private Socket socket;
201
/** Returned by toString() and used in log messages */
204
/** bound state of actual local socket */
205
private boolean bound = false;
207
/** connected state of actual local socket */
208
private boolean connected = false;
210
/** SocketAddress of remote actual socket to which this Manager's socket is connected */
211
private InetSocketAddress remoteSocketAddress;
213
/** SocketAddress to which this manager's actual socket is bound */
214
private InetSocketAddress localSocketAddress;
216
/** Represents local port on which this manager's actual socket is bound, with any local address */
217
private InetSocketAddress localWildCardAddress;
219
/** InputStream of real socket */
220
private InputStream inputStream;
222
/** OutputStream of real socket */
223
private OutputStream outputStream;
225
/** Currently registered server socket */
226
private ServerSocket serverSocket;
228
/** Indicates if remote server socket has been registered */
229
private boolean remoteServerSocketRegistered = false;
231
/** True if and only if this MultiplexingManager was originally created by a
232
* call to MasterServerSocket.acceptServerSocketConnection(). */
233
private boolean createdForRemoteServerSocket;
235
/** SingleGroupInputThread reading from socket's InputStream and distributing to virtual sockets */
236
private InputMultiplexor.SingleGroupInputThread inputThread;
238
/** OutputStream for unknown virtual sockets */
239
private OutputStream deadLetterOutputStream = new ByteArrayOutputStream();
241
/** Manages the shutdown handshaking protocol between peer MultiplexingManagers */
242
private ShutdownManager shutdownManager = new ShutdownManager();
244
/** Thread that carries out shut down process */
245
private ShutdownThread shutdownThread;
247
/** true if and only MultiplexingManager is completely shut down */
248
private boolean shutdown = false;
250
/** Indicates if log trace level is enabled */
251
private boolean trace;
253
/** Indicates if log debug level is enabled */
254
private boolean debug;
256
/** Indicates if log info level is enabled */
257
private boolean info;
261
/** SocketFactory to use for creating sockets */
262
private SocketFactory socketFactory;
264
/** Saves HandshakeCompletedEvent for SSLSocket */
265
private HandshakeCompletedEvent handshakeCompletedEvent;
267
/** Holds IOException thrown by real InputStream */
268
private IOException readException;
270
/** Holds IOException thrown by real OutputStream */
271
private IOException writeException;
274
protected synchronized static void init(Map configuration) throws IOException
278
if (staticThreadsRunning)
281
log.debug("starting static threads");
283
// Start output thread.
284
outputMultiplexor = new OutputMultiplexor(configuration);
285
outputThread = outputMultiplexor.getAnOutputThread();
286
outputThread.setName("output:" + time);
287
outputThread.setDaemon(true);
288
outputThread.start();
289
log.debug("started output thread");
291
// Start input thread.
292
inputMultiplexor = new InputMultiplexor(configuration);
293
multiGroupInputThread = inputMultiplexor.getaMultiGroupInputThread();
294
multiGroupInputThread.setName("input:" + time);
295
multiGroupInputThread.setDaemon(true);
296
multiGroupInputThread.start();
297
log.debug("started input thread");
299
// Start back channel thread.
300
virtualSelector = new VirtualSelector();
301
backChannelThread = Protocol.getBackChannelThread(virtualSelector);
302
backChannelThread.setName("backchannel:" + time);
303
backChannelThread.setDaemon(true);
304
backChannelThread.start();
305
log.debug("started backchannel thread");
308
timer = new Timer(true);
309
TimerTask shutdownMonitorTask = new TimerTask()
313
log.trace("allManagers.isEmpty(): " + allManagers.isEmpty());
314
log.trace("hasBeenIdle: " + hasBeenIdle);
315
if (allManagers.isEmpty())
319
MultiplexingManager.shutdownThreads();
332
timer.scheduleAtFixedRate(shutdownMonitorTask, staticThreadsMonitorPeriod, staticThreadsMonitorPeriod);
334
// Start pending actions thread.
335
pendingActionThread = new PendingActionThread();
336
pendingActionThread.setName("pending actions:" + time);
337
pendingActionThread.setDaemon(true);
338
pendingActionThread.start();
339
log.debug("started pendingAction thread");
341
staticThreadsRunning = true;
343
catch (IOException e)
351
protected MultiplexingManager(Map configuration) throws IOException
353
if (configuration != null)
354
MultiplexingManager.configuration.putAll(configuration);
355
socketFactory = (SocketFactory) configuration.get(Multiplex.SOCKET_FACTORY);
356
id = new Date().getTime();
357
socket = createSocket();
358
allManagers.add(this);
359
if (debug) log.debug("new MultiplexingManager(" + id + "): " + description);
366
* @param configuration
367
* @throws IOException
369
protected MultiplexingManager(Socket socket, Map configuration) throws IOException
371
this.socket = socket;
372
if (configuration != null)
373
MultiplexingManager.configuration.putAll(configuration);
374
id = new Date().getTime();
376
allManagers.add(this);
377
if (debug) log.debug("new MultiplexingManager(" + id + "): " + description);
385
* @param configuration
386
* @throws IOException
388
protected MultiplexingManager(InetSocketAddress address, int timeout, Map configuration)
391
if (configuration != null)
392
MultiplexingManager.configuration.putAll(configuration);
393
socketFactory = (SocketFactory) configuration.get(Multiplex.SOCKET_FACTORY);
394
id = new Date().getTime();
395
socket = createSocket(address, timeout);
397
allManagers.add(this);
398
if (debug) log.debug("new MultiplexingManager(" + id + "): " + description);
405
protected synchronized void setup() throws IOException
407
description = socket.toString();
408
trace = log.isTraceEnabled();
409
debug = log.isDebugEnabled();
410
info = log.isInfoEnabled();
412
// Initialize MultiplexingManager parameters.
413
initParameters(configuration);
415
// Make sure static threads are running.
416
synchronized (MultiplexingManager.class)
418
if (!staticThreadsRunning)
422
// Get InputStream and OutputStream
423
if (socket.getChannel() == null)
425
// inputStream = new BufferedInputStream(socket.getInputStream());
426
// outputStream = new BufferedOutputStream(socket.getOutputStream());
427
inputStream = socket.getInputStream();
428
outputStream = socket.getOutputStream();
432
inputStream = Channels.newInputStream(socket.getChannel());
433
outputStream = Channels.newOutputStream(socket.getChannel());
434
socket.setTcpNoDelay(false);
437
// register dead letter output stream (for unrecognized destinations)
438
outputStreamMap.put(SocketId.DEADLETTER_SOCKET_ID, deadLetterOutputStream);
440
// TODO: what was this for???
441
registeredSockets.add(SocketId.PROTOCOL_SOCKET_ID);
442
registeredSockets.add(SocketId.SERVER_SOCKET_ID);
443
registeredSockets.add(SocketId.SERVER_SOCKET_CONNECT_ID);
444
registeredSockets.add(SocketId.SERVER_SOCKET_VERIFY_ID);
445
registeredSockets.add(SocketId.BACKCHANNEL_SOCKET_ID);
447
// set up standard piped streams
448
getAnInputStream(SocketId.PROTOCOL_SOCKET_ID, null);
449
getAnInputStream(SocketId.SERVER_SOCKET_ID, null);
450
getAnInputStream(SocketId.SERVER_SOCKET_CONNECT_ID, null);
451
getAnInputStream(SocketId.SERVER_SOCKET_VERIFY_ID, null);
453
// Create protocol backchannel streams
454
protocol = new Protocol(this);
455
MultiplexingInputStream bcis = getAnInputStream(SocketId.BACKCHANNEL_SOCKET_ID, null);
456
bcis.register(virtualSelector, this);
457
if (debug) log.debug("registered backchannel input stream");
458
backchannelOutputStream = new MultiplexingOutputStream(this, SocketId.PROTOCOL_SOCKET_ID);
460
// Register with OutputMultiplexor
461
outputMultiplexor.register(this);
463
// Create or register with input thread
464
if (socket.getChannel() == null)
466
// start input thread
467
log.debug("creating single group input thread");
469
if (inputMultiplexor == null)
470
inputMultiplexor = new InputMultiplexor(configuration);
472
inputThread = inputMultiplexor.getaSingleGroupInputThread(this, socket, deadLetterOutputStream);
473
inputThread.setName(inputThread.getName() + ":input(" + description + ")");
478
socket.getChannel().configureBlocking(false);
479
multiGroupInputThread.registerSocketGroup(this);
480
log.debug("registered socket group");
483
registerByLocalAddress(new InetSocketAddress(socket.getLocalAddress(), socket.getLocalPort()));
484
registerByRemoteAddress(new InetSocketAddress(socket.getInetAddress(), socket.getPort()));
488
if (socket instanceof SSLSocket)
490
// Object o = configuration.get(Multiplex.SSL_HANDSHAKE_LISTENER);
493
// HandshakeCompletedListener hcl = (HandshakeCompletedListener) o;
494
// ((SSLSocket) socket).addHandshakeCompletedListener(hcl);
496
((SSLSocket) socket).addHandshakeCompletedListener(this);
501
protected void initParameters(Map configuration)
503
this.configuration = configuration;
505
staticThreadsMonitorPeriod
506
= Multiplex.getOneParameter(configuration,
507
"staticThreadsMonitorPeriod",
508
Multiplex.STATIC_THREADS_MONITOR_PERIOD,
509
Multiplex.STATIC_THREADS_MONITOR_PERIOD_DEFAULT);
511
shutdownRequestTimeout
512
= Multiplex.getOneParameter(configuration,
513
"shutdownRequestTimeout",
514
Multiplex.SHUTDOWN_REQUEST_TIMEOUT,
515
Multiplex.SHUTDOWN_REQUEST_TIMEOUT_DEFAULT);
517
shutdownRefusalsMaximum
518
= Multiplex.getOneParameter(configuration,
519
"shutdownRefusalsMaximum",
520
Multiplex.SHUTDOWN_REFUSALS_MAXIMUM,
521
Multiplex.SHUTDOWN_REFUSALS_MAXIMUM_DEFAULT);
523
shutdownMonitorPeriod
524
= Multiplex.getOneParameter(configuration,
525
"shutdownMonitorPeriod",
526
Multiplex.SHUTDOWN_MONITOR_PERIOD,
527
Multiplex.SHUTDOWN_MONITOR_PERIOD_DEFAULT);
534
* @param configuration
536
* @throws IOException
538
* TODO: what if multiplexor already exists?
540
public static MultiplexingManager getaManager(Socket socket, Map configuration) throws IOException
542
log.debug("entering getaManager(Socket socket)");
543
return new MultiplexingManager(socket, configuration);
550
* @throws IOException
552
public static synchronized MultiplexingManager
553
getaManagerByLocalAddress(InetSocketAddress address) throws IOException
555
return getaManagerByLocalAddress(address, null);
564
* @throws IOException
566
public static synchronized MultiplexingManager
567
getaManagerByLocalAddress(InetSocketAddress address, Map conf) throws IOException
569
log.debug("entering getaManagerByLocalAddress(InetSocketAddress address)");
570
MultiplexingManager m = null;
572
synchronized (localAddressMapLock)
574
HashSet managers = (HashSet) managersByLocalAddress.get(address);
576
if (managers != null)
578
Iterator it = managers.iterator();
581
m = (MultiplexingManager) it.next();
584
m.shutdownManager.incrementReferences();
587
catch (IOException e)
594
log.debug("There is no joinable MultiplexingManager. Creating new one.");
595
m = new MultiplexingManager(conf);
606
* @throws IOException
608
public static synchronized MultiplexingManager
609
getaManagerByRemoteAddress(InetSocketAddress address, int timeout) throws IOException
611
return getaManagerByRemoteAddress(address, timeout, null);
619
* @param configuration
621
* @throws IOException
623
public static synchronized MultiplexingManager
624
getaManagerByRemoteAddress(InetSocketAddress address, int timeout, Map conf)
627
log.debug("entering getaManagerByRemoteAddress(InetSocketAddress address)");
632
// Check each of the MultiplexingManagers connected to the target remote address, looking
633
// for one which is not shutting down.
634
synchronized(remoteAddressMapLock)
636
HashSet managers = (HashSet) managersByRemoteAddress.get(address);
638
if (managers != null && !managers.isEmpty())
640
Iterator it = managers.iterator();
643
MultiplexingManager m = (MultiplexingManager) it.next();
646
m.shutdownManager.incrementReferences();
651
log.debug("manager shutting down: " + m);
657
return new MultiplexingManager(address, timeout, conf);
663
* @param remoteAddress
664
* @param localAddress
667
* @throws IOException
669
public static synchronized MultiplexingManager
670
getaManagerByAddressPair(InetSocketAddress remoteAddress, InetSocketAddress localAddress, int timeout)
673
return getaManagerByAddressPair(remoteAddress, localAddress, timeout, null);
678
* @param remoteAddress
679
* @param localAddress
681
* @param configuration
683
* @throws IOException
685
public static synchronized MultiplexingManager
686
getaManagerByAddressPair(InetSocketAddress remoteAddress, InetSocketAddress localAddress,
687
int timeout, Map conf)
690
log.debug("entering getaManagerByRemoteAddress(InetSocketAddress address)");
691
MultiplexingManager m;
693
// Check each of the MultiplexingManagers connected to the target remote address, looking
694
// for one which is not shutting down.
695
synchronized(remoteAddressMapLock)
697
HashSet managers = (HashSet) managersByRemoteAddress.get(remoteAddress);
699
if (managers != null && !managers.isEmpty())
701
Iterator it = managers.iterator();
705
m = (MultiplexingManager) it.next();
706
if (m.getSocket().getLocalAddress().equals(localAddress.getAddress()) &&
707
m.getSocket().getLocalPort() == localAddress.getPort())
711
m.shutdownManager.incrementReferences();
716
log.debug("manager shutting down: " + m);
723
log.debug("There is no joinable MultiplexingManager. Creating new one.");
724
m = new MultiplexingManager(conf);
725
m.bind(localAddress);
734
* @throws IOException
736
public static synchronized MultiplexingManager
737
getaShareableManager(InetSocketAddress address, int timeout) throws IOException
739
return getaShareableManager(address, timeout, null);
748
* @throws IOException
750
public static synchronized MultiplexingManager
751
getaShareableManager(InetSocketAddress address, int timeout, Map conf) throws IOException
753
log.debug("entering getaShareableManager(InetSocketAddress address)");
755
// Check each of the shareable MultiplexingManagers connected to the target remote address, looking
756
// for one which is not shutting down.
757
synchronized(shareableMapLock)
759
HashSet managers = (HashSet) shareableManagers.get(address);
761
if (managers != null && !managers.isEmpty())
763
Iterator it = managers.iterator();
766
MultiplexingManager m = (MultiplexingManager) it.next();
769
m.shutdownManager.incrementReferences();
774
log.debug("manager shutting down: " + m);
780
return new MultiplexingManager(address, timeout, conf);
789
* @throws IOException
791
public static MultiplexingManager
792
getAnExistingShareableManager(InetSocketAddress address, Map conf)
795
log.debug("entering getAnExistingShareableManager()");
797
// Check each of the shareable MultiplexingManagers connected to the target remote address, looking
798
// for one which is not shutting down.
799
synchronized(shareableMapLock)
801
HashSet managers = (HashSet) shareableManagers.get(address);
803
if (managers != null && !managers.isEmpty())
805
Iterator it = managers.iterator();
809
MultiplexingManager m = (MultiplexingManager) it.next();
812
m.shutdownManager.incrementReferences();
817
log.debug("manager shutting down: " + m);
828
* @param remoteAddress
829
* @param localAddress
832
* @throws IOException
834
public static synchronized MultiplexingManager
835
getaShareableManagerByAddressPair(InetSocketAddress remoteAddress, InetSocketAddress localAddress, int timeout)
838
return getaShareableManagerByAddressPair(remoteAddress, localAddress, timeout, null);
843
* @param remoteAddress
844
* @param localAddress
848
* @throws IOException
850
public static synchronized MultiplexingManager
851
getaShareableManagerByAddressPair(InetSocketAddress remoteAddress, InetSocketAddress localAddress,
852
int timeout, Map conf)
855
MultiplexingManager m;
857
// Check each of the shareable MultiplexingManagers connected to the target remote address, looking
858
// for one which is not shutting down.
859
synchronized(shareableMapLock)
861
HashSet managers = (HashSet) shareableManagers.get(remoteAddress);
862
if (managers != null && !managers.isEmpty())
864
Iterator it = managers.iterator();
868
m = (MultiplexingManager) it.next();
869
if (m.getSocket().getLocalAddress().equals(localAddress.getAddress()) &&
870
m.getSocket().getLocalPort() == localAddress.getPort())
874
m.shutdownManager.incrementReferences();
879
log.debug("manager shutting down: " + m);
886
log.debug("There is no joinable MultiplexingManager. Creating new one.");
887
m = new MultiplexingManager(conf);
888
m.bind(localAddress);
894
* @param remoteAddress
895
* @param localAddress
898
* @throws IOException
900
public static MultiplexingManager
901
getAnExistingShareableManagerByAddressPair(InetSocketAddress remoteAddress,
902
InetSocketAddress localAddress,
906
log.debug("entering getaShareableManager(InetSocketAddress address)");
907
MultiplexingManager m;
909
// Check each of the shareable MultiplexingManagers connected to the target remote address, looking
910
// for one which is not shutting down.
911
synchronized(shareableMapLock)
913
HashSet managers = (HashSet) shareableManagers.get(remoteAddress);
915
if (managers != null && !managers.isEmpty())
917
Iterator it = managers.iterator();
921
m = (MultiplexingManager) it.next();
922
if (m.getSocket().getLocalAddress().equals(localAddress.getAddress()) &&
923
m.getSocket().getLocalPort() == localAddress.getPort())
927
m.shutdownManager.incrementReferences();
932
log.debug("manager shutting down: " + m);
946
* @throws IOException
948
public static boolean checkForShareableManager(InetSocketAddress address)
951
log.debug("entering checkForShareableManager(InetSocketAddress address)");
953
// Check each if there is at least one shareable MultiplexingManagers connected to the target remote address.
954
synchronized (shareableMapLock)
956
HashSet managers = (HashSet) shareableManagers.get(address);
958
if (managers != null && !managers.isEmpty())
967
* @param localAddress
968
* @param remoteAddress
971
public static boolean checkForManagerByAddressPair(InetSocketAddress localAddress,
972
InetSocketAddress remoteAddress)
974
log.debug("entering checkForManagerByAddressPair()");
976
// Check each of the MultiplexingManagers connected to the target remote address, looking
977
// for one bound to the local address.
978
synchronized(remoteAddressMapLock)
980
HashSet managers = (HashSet) managersByRemoteAddress.get(remoteAddress);
982
if (managers != null && !managers.isEmpty())
984
Iterator it = managers.iterator();
988
MultiplexingManager m = (MultiplexingManager) it.next();
990
if (m.localSocketAddress.equals(localAddress))
1001
* @param localAddress
1002
* @param remoteAddress
1005
public static boolean checkForShareableManagerByAddressPair(InetSocketAddress localAddress,
1006
InetSocketAddress remoteAddress)
1008
log.debug("entering checkForShareableManagerByAddressPair()");
1010
// Check each of the shareable MultiplexingManagers connected to the target remote address, looking
1011
// for one bound to the local address.
1012
synchronized (shareableMapLock)
1014
HashSet managers = (HashSet) shareableManagers.get(remoteAddress);
1016
if (managers != null && !managers.isEmpty())
1018
Iterator it = managers.iterator();
1020
while (it.hasNext())
1022
MultiplexingManager m = (MultiplexingManager) it.next();
1024
if (m.localSocketAddress.equals(localAddress))
1037
public static int getStaticThreadMonitorPeriod()
1039
return staticThreadsMonitorPeriod;
1046
public static void setStaticThreadsMonitorPeriod(int period)
1048
staticThreadsMonitorPeriod = period;
1055
protected synchronized static void shutdownThreads()
1057
log.debug("entering shutdownThreads");
1058
if (outputThread != null)
1059
outputThread.shutdown();
1061
if (multiGroupInputThread != null)
1062
multiGroupInputThread.shutdown();
1064
if (backChannelThread != null)
1065
backChannelThread.shutdown();
1067
if (pendingActionThread != null)
1068
pendingActionThread.shutdown();
1070
log.debug("cancelling timer");
1078
if (outputThread != null)
1079
outputThread.join();
1081
if (multiGroupInputThread != null)
1082
multiGroupInputThread.join();
1084
if (backChannelThread != null)
1085
backChannelThread.join();
1087
if (pendingActionThread != null)
1088
pendingActionThread.join();
1092
catch (InterruptedException ignored) {}
1095
staticThreadsRunning = false;
1096
log.debug("static threads shut down");
1101
* Adds a <code>PendingAction</code> to the list of actions waiting to be executed.
1102
* @param pendingAction
1104
protected static void addToPendingActions(PendingAction pendingAction)
1106
synchronized (pendingActions)
1108
pendingActions.add(pendingAction);
1109
pendingActions.notifyAll();
1115
* Binds the wrapped socket.
1117
* @throws IOException
1119
public synchronized void bind(InetSocketAddress address) throws IOException
1122
throw new IOException("socket is already bound");
1125
socket = createSocket();
1128
localSocketAddress = address;
1130
socket.bind(address);
1136
* Connects the wrapped socket.
1139
* @throws IOException
1141
public synchronized void connect(InetSocketAddress address, int timeout) throws IOException
1145
if (socket.getRemoteSocketAddress().equals(address))
1148
throw new IOException("socket is already connected");
1151
if (debug) log.debug("connecting to: " + address);
1154
socket = createSocket(address, timeout);
1156
socket.connect(address, timeout);
1164
* Identifies a <code>VirtualServerSocket</code> as the one associated with this virtual socket group.
1165
* @return a <code>MultiplexingInputStream</code> for use by serverSocket.
1167
public synchronized MultiplexingInputStream registerServerSocket(ServerSocket serverSocket) throws IOException
1169
if (this.serverSocket != null && this.serverSocket != serverSocket)
1171
log.error("[" + id + "]: " + "attempt to register a second server socket");
1172
log.error("current server socket: " + this.serverSocket.toString());
1173
log.error("new server socket: " + serverSocket.toString());
1174
throw new IOException("attempt to register a second server socket");
1177
if (debug) log.debug(serverSocket.toString());
1178
this.serverSocket = serverSocket;
1179
return getAnInputStream(SocketId.SERVER_SOCKET_ID, null);
1184
* Indicates a <code>VirtualServerSocket</code> is leaving this virtual socket group.
1185
* @param serverSocket
1186
* @throws IOException
1188
public synchronized void unRegisterServerSocket(ServerSocket serverSocket) throws IOException
1190
if (this.serverSocket != serverSocket)
1192
log.error("server socket attempting unregister but is not registered");
1193
throw new IOException("server socket is not registered");
1196
log.debug("server socket unregistering");
1197
removeAnInputStream(SocketId.SERVER_SOCKET_ID);
1198
this.serverSocket = null;
1199
shutdownManager.decrementReferences();
1204
* Adds a <code>VirtualSocket</code> to this virtual socket group.
1206
* @return a <code>MultiplexingInputStream</code> for use by socket
1207
* @throws IOException
1209
public synchronized MultiplexingInputStream registerSocket(VirtualSocket socket) throws IOException
1211
SocketId localSocketId = socket.getLocalSocketId();
1212
VirtualSocket currentSocket = (VirtualSocket) socketMap.put(localSocketId, socket);
1214
if (currentSocket != null)
1216
String errorMessage = "attempting to register socket on currently used port:"
1217
+ currentSocket.getLocalVirtualPort();
1218
log.error(errorMessage);
1219
throw new IOException(errorMessage);
1222
if (debug) log.debug("registering virtual socket on port: " + localSocketId.getPort());
1223
registeredSockets.add(socket.getLocalSocketId());
1224
return getAnInputStream(localSocketId, socket);
1229
* Indicates that a <code>VirtualSocket</code> is leaving a virtual socket group.
1231
* @throws IOException
1233
public synchronized void unRegisterSocket(VirtualSocket socket) throws IOException
1237
if (debug) log.debug(this + ": entering unRegisterSocket()");
1238
shutdownManager.decrementReferences();
1240
SocketId localSocketId = socket.getLocalSocketId();
1241
if (localSocketId == null)
1244
VirtualSocket currentSocket = (VirtualSocket) socketMap.remove(localSocketId);
1245
if (currentSocket == null)
1247
String errorMessage = "attempting to unregister unrecognized socket: " + socket.getLocalSocketId().getPort();
1248
log.error(errorMessage);
1249
throw new IOException(errorMessage);
1252
if (debug) log.debug("unregistering virtual socket on port: " + localSocketId.getPort());
1253
registeredSockets.remove(localSocketId);
1254
removeAnInputStream(localSocketId);
1255
if (debug) log.debug(this + ": leaving unRegisterSocket()");
1265
* Indicates that a <code>VirtualServerSocket</code> belongs the virtual socket group at the
1266
* remote end of the connection. This virtual socket groupD becomes "joinable", in the
1267
* sense that a new <code>VirtualSocket</code> <it>v</it> can join this virtual group because there is a
1268
* <code>VirtualServerSocket</code> at the other end of the connection to create a remote peer for
1271
public synchronized void registerRemoteServerSocket() throws IOException
1273
log.debug("registerRemoteServerSocket()");
1274
if (remoteServerSocketRegistered)
1276
log.error("duplicate remote server socket registration");
1277
throw new IOException("duplicate remote server socket registration");
1281
remoteServerSocketRegistered = true;
1283
// Now that remote MultiplexingManager has a VirtualServerSocket, we
1284
// add it to set of managers eligible to accept new clients.
1285
registerShareable(remoteSocketAddress);
1287
synchronized (threadsWaitingForRemoteServerSocket)
1289
threadsWaitingForRemoteServerSocket.notifyAll();
1292
if (!createdForRemoteServerSocket)
1293
incrementReferences();
1299
* Indicates there will no longer be a <code>VirtualServeSocket</code> at the remote end of
1300
* this connection. This virtual socket group will no longer be "joinable".
1301
* (See registerRemoteServerSocket().)
1303
public synchronized void unRegisterRemoteServerSocket()
1305
if (!remoteServerSocketRegistered)
1306
log.error("no remote server socket is registered");
1309
if (debug) log.debug(this + ": remote VSS unregistering");
1310
remoteServerSocketRegistered = false;
1311
unregisterShareable();
1313
MultiplexingManager.addToPendingActions
1321
decrementReferences();
1323
catch (IOException e)
1334
public void setCreatedForRemoteServerSocket()
1336
createdForRemoteServerSocket = true;
1340
public synchronized boolean isRemoteServerSocketRegistered()
1342
return remoteServerSocketRegistered;
1346
public boolean waitForRemoteServerSocketRegistered()
1348
if (remoteServerSocketRegistered)
1351
synchronized (threadsWaitingForRemoteServerSocket)
1353
threadsWaitingForRemoteServerSocket.add(Thread.currentThread());
1355
while (!remoteServerSocketRegistered)
1359
threadsWaitingForRemoteServerSocket.wait();
1361
catch (InterruptedException e)
1363
log.info("interrupted waiting for registration of remote server socket");
1367
threadsWaitingForRemoteServerSocket.remove(Thread.currentThread());
1374
threadsWaitingForRemoteServerSocket.remove(Thread.currentThread());
1379
* Increment reference counter for this <code>MultiplexingManager</code>.
1380
* @throws IOException
1382
public void incrementReferences() throws IOException
1384
shutdownManager.incrementReferences();
1389
* Decrement reference counter for this <code>MultiplexingManager</code>.
1390
* @throws IOException
1392
public void decrementReferences() throws IOException
1394
shutdownManager.decrementReferences();
1402
public Collection getAllOutputStreams()
1404
return outputStreamMap.values();
1409
* Returns<code> OutputStream</code> to use when corrupted InputStream gives no known "mailbox"
1410
* @return OutputStream to use when corrupted InputStream gives no known "mailbox"
1412
public OutputStream getDeadLetterOutputStream()
1414
return deadLetterOutputStream;
1419
* Returns <code>InputStream</code> of real socket
1420
* @return InputStream of real socket
1422
public InputStream getInputStream()
1429
* Returns <code>OutputStream</code> of real socket
1430
* @return OutputStream of real socket
1432
public OutputStream getOutputStream()
1434
return outputStream;
1438
* Get an <code>InputStream</code> for a <code>VirtualSocket</code>.
1442
* @throws IOException
1444
public MultiplexingInputStream getAnInputStream(SocketId socketId, VirtualSocket socket) throws IOException
1446
if (debug) log.debug("getAnInputStream(): " + socketId.getPort());
1447
MultiplexingInputStream mis = (MultiplexingInputStream) inputStreamMap.get(socketId);
1451
if (mis.getSocket() == null)
1452
mis.setSocket(socket);
1456
GrowablePipedOutputStream pos = (GrowablePipedOutputStream) outputStreamMap.get(socketId);
1459
pos = new GrowablePipedOutputStream();
1460
outputStreamMap.put(socketId, pos);
1463
mis = new MultiplexingInputStream(pos, this, socket);
1464
inputStreamMap.put(socketId, mis);
1465
if (readException != null)
1466
mis.setReadException(readException);
1472
* Get an <code>OutputStrem</code> for a <code>VirtualSocket</code>.
1476
public GrowablePipedOutputStream getAnOutputStream(SocketId socketId)
1478
if (debug) log.debug("getAnOutputStream(): " + socketId.getPort());
1480
GrowablePipedOutputStream pos = (GrowablePipedOutputStream) outputStreamMap.get(socketId);
1483
pos = new GrowablePipedOutputStream();
1484
outputStreamMap.put(socketId, pos);
1495
public MultiplexingOutputStream getAnOutputStream(VirtualSocket socket, SocketId socketId)
1497
MultiplexingOutputStream mos = new MultiplexingOutputStream(this, socket, socketId);
1498
outputStreamSet.add(mos);
1499
if (writeException != null)
1500
mos.setWriteException(writeException);
1506
* Returns a <code>GrowablePipedOutputStream</code> that is connected to a
1507
* <code>MultiplexingInputStream</code>.
1508
* It will create the <code>MultiplexingInputStream</code> if necessary. This method exists to
1509
* allow <code>InputMultiplexor</code> to get a place to put bytes directed to an unrecognized
1510
* SocketId, which might be necessary if the remote socket starts writing before this
1511
* end of the connection is ready. Originally, we had a three step handshake, in which
1512
* (1) the client socket sent a "connect" message, (2) the server socket sent a "connected"
1513
* message, and (3) the client socket sent a "connect verify" message. In the interests
1514
* of performance we eliminated the final step.
1518
* @throws IOException
1520
public GrowablePipedOutputStream getConnectedOutputStream(SocketId socketId) throws IOException
1522
if (debug) log.debug("getConnectedOutputStream(): " + socketId.getPort());
1524
MultiplexingInputStream mis = (MultiplexingInputStream) inputStreamMap.get(socketId);
1527
GrowablePipedOutputStream pos = (GrowablePipedOutputStream) outputStreamMap.get(socketId);
1530
StringBuffer message = new StringBuffer();
1531
message.append("MultiplexingInputStream exists ")
1532
.append("without matching GrowablePipedOutputStream: ")
1533
.append("socketId = ").append(socketId);
1534
throw new IOException(message.toString());
1539
GrowablePipedOutputStream pos = (GrowablePipedOutputStream) outputStreamMap.get(socketId);
1542
pos = new GrowablePipedOutputStream();
1543
outputStreamMap.put(socketId, pos);
1546
mis = new MultiplexingInputStream(pos, this);
1547
inputStreamMap.put(socketId, mis);
1555
public OutputStream getBackchannelOutputStream()
1557
return backchannelOutputStream;
1562
* @return handshakeCompletedEvent
1564
public HandshakeCompletedEvent getHandshakeCompletedEvent()
1566
return handshakeCompletedEvent;
1572
public OutputMultiplexor getOutputMultiplexor()
1574
return outputMultiplexor;
1583
public OutputStream getOutputStreamByLocalSocket(SocketId socketId)
1585
return (OutputStream) outputStreamMap.get(socketId);
1593
public Protocol getProtocol()
1602
public synchronized ServerSocket getServerSocket()
1604
return serverSocket;
1612
public Socket getSocket()
1623
public VirtualSocket getSocketByLocalPort(SocketId socketId)
1625
return (VirtualSocket) socketMap.get(socketId);
1632
public SocketFactory getSocketFactory()
1634
return socketFactory;
1639
* To implement <code>HandshakeCompletedListener</code> interface.
1641
public void handshakeCompleted(HandshakeCompletedEvent event)
1643
description = socket.toString();
1645
handshakeCompletedEvent = event;
1646
Object obj = configuration.get(Multiplex.SSL_HANDSHAKE_LISTENER);
1649
HandshakeCompletedListener listener = (HandshakeCompletedListener) obj;
1650
listener.handshakeCompleted(event);
1658
public boolean isBound()
1667
public boolean isConnected()
1676
public synchronized boolean isServerSocketRegistered()
1678
return serverSocket != null;
1685
public boolean isShutdown()
1697
public synchronized boolean isSocketRegistered(SocketId socketId)
1699
return registeredSockets.contains(socketId);
1706
public boolean respondToShutdownRequest()
1708
return shutdownManager.respondToShutdownRequest();
1713
* @param socketFactory
1715
public void setSocketFactory(SocketFactory socketFactory)
1717
this.socketFactory = socketFactory;
1724
public int getShutdownMonitorPeriod()
1726
return shutdownMonitorPeriod;
1733
public int getShutdownRefusalsMaximum()
1735
return shutdownRefusalsMaximum;
1742
public int getShutdownRequestTimeout()
1744
return shutdownRequestTimeout;
1751
public void setShutdownRequestTimeout(int timeout)
1753
shutdownRequestTimeout = timeout;
1760
public void setShutdownRefusalsMaximum(int maximum)
1762
shutdownRefusalsMaximum = maximum;
1769
public void setShutdownMonitorPeriod(int period)
1771
shutdownMonitorPeriod = period;
1776
* Needed to implement <code>OutputMultiplexor.OutputMultiplexorClient</code>
1778
public synchronized void outputFlushed()
1780
if (shutdownThread != null)
1781
shutdownThread.setSafeToShutdown(true);
1786
public String toString()
1788
if (description != null)
1790
return super.toString();
1793
/********************************************************************************************
1794
* protected methods and classes
1795
********************************************************************************************/
1798
protected Socket createSocket(InetSocketAddress endpoint, int timeout) throws IOException
1800
Socket socket = null;
1802
if (localSocketAddress == null)
1804
if (socketFactory != null)
1805
socket = socketFactory.createSocket(endpoint.getAddress(), endpoint.getPort());
1807
socket = SocketChannel.open(endpoint).socket();
1811
// It's possible that bind() was called, but a socket hasn't been created yet, since
1812
// SSLSocketFactory will not create an unconnected socket. In that case, bind() just
1813
// saved localSocketAddress for later use.
1814
if (socketFactory != null)
1815
socket = socketFactory.createSocket(endpoint.getAddress(),
1817
localSocketAddress.getAddress(),
1818
localSocketAddress.getPort());
1821
socket = SocketChannel.open().socket();
1822
socket.bind(localSocketAddress);
1823
socket.connect(endpoint);
1827
if (socket instanceof SSLSocket)
1829
// Object o = configuration.get(Multiplex.SSL_HANDSHAKE_LISTENER);
1832
// HandshakeCompletedListener hcl = (HandshakeCompletedListener) o;
1833
// ((SSLSocket) socket).addHandshakeCompletedListener(hcl);
1835
((SSLSocket) socket).addHandshakeCompletedListener(this);
1838
socket.setSoTimeout(timeout);
1842
protected Socket createSocket() throws IOException
1844
Socket socket = null;
1848
if (socketFactory != null)
1849
socket = socketFactory.createSocket();
1851
socket = SocketChannel.open().socket();
1853
if (socket instanceof SSLSocket)
1855
// Object o = configuration.get(Multiplex.SSL_HANDSHAKE_LISTENER);
1858
// HandshakeCompletedListener hcl = (HandshakeCompletedListener) o;
1859
// ((SSLSocket) socket).addHandshakeCompletedListener(hcl);
1861
((SSLSocket) socket).addHandshakeCompletedListener(this);
1864
catch (IOException e)
1866
if ("Unconnected sockets not implemented".equals(e.getMessage()))
1878
protected void registerByLocalAddress(InetSocketAddress address)
1880
synchronized (localAddressMapLock)
1882
localSocketAddress = address;
1883
HashSet managers = (HashSet) managersByLocalAddress.get(address);
1885
if (managers == null)
1887
managers = new HashSet();
1888
managersByLocalAddress.put(address, managers);
1893
// allow searching on any local address
1894
localWildCardAddress = new InetSocketAddress(address.getPort());
1895
managers = (HashSet) managersByLocalAddress.get(localWildCardAddress);
1897
if (managers == null)
1899
managers = new HashSet();
1900
managersByLocalAddress.put(localWildCardAddress, managers);
1911
protected void unregisterByLocalAddress()
1913
synchronized (localAddressMapLock)
1915
HashSet managers = null;
1917
if (localSocketAddress != null)
1919
managers = (HashSet) managersByLocalAddress.get(localSocketAddress);
1921
if (managers != null)
1923
managers.remove(this);
1925
if (managers.isEmpty())
1926
managersByLocalAddress.remove(localSocketAddress);
1930
if (localWildCardAddress != null)
1932
managers = (HashSet) managersByLocalAddress.get(localWildCardAddress);
1934
if (managers != null)
1936
managers.remove(this);
1938
if (managers.isEmpty())
1939
managersByLocalAddress.remove(localWildCardAddress);
1950
protected void registerByRemoteAddress(InetSocketAddress address)
1952
remoteSocketAddress = address;
1954
synchronized (remoteAddressMapLock)
1956
HashSet managers = (HashSet) managersByRemoteAddress.get(address);
1958
if (managers == null)
1960
managers = new HashSet();
1962
managersByRemoteAddress.put(address, managers);
1973
protected void unregisterByRemoteAddress()
1975
if (remoteSocketAddress != null)
1977
synchronized (remoteAddressMapLock)
1979
HashSet managers = (HashSet) managersByRemoteAddress.get(remoteSocketAddress);
1981
if (managers != null)
1983
managers.remove(this);
1985
if (managers.isEmpty())
1986
managersByRemoteAddress.remove(remoteSocketAddress);
1997
protected void registerShareable(InetSocketAddress address)
1999
if (debug) log.debug("registering as shareable: " + this + ": " + address.toString());
2000
synchronized (shareableMapLock)
2002
HashSet managers = (HashSet) shareableManagers.get(address);
2004
if (managers == null)
2006
managers = new HashSet();
2008
shareableManagers.put(address, managers);
2019
protected void unregisterShareable()
2021
if (debug) log.debug("unregistering remote: " + this + ": " + description);
2022
if (remoteSocketAddress != null)
2024
synchronized (shareableMapLock)
2026
HashSet managers = (HashSet) shareableManagers.get(remoteSocketAddress);
2028
if (managers != null)
2030
managers.remove(this);
2032
if (managers.isEmpty())
2033
shareableManagers.remove(remoteSocketAddress);
2043
protected void unregisterAllMaps()
2045
unregisterByLocalAddress();
2046
unregisterByRemoteAddress();
2047
unregisterShareable();
2053
protected void removeAnInputStream(SocketId socketId)
2055
if (debug) log.debug("entering removeAnInputStream(): " + socketId.getPort());
2056
InputStream is = (InputStream) inputStreamMap.remove(socketId);
2057
OutputStream os = (OutputStream) outputStreamMap.remove(socketId);
2065
catch (Exception ignored)
2067
log.error("error closing PipedInputStream (" + socket.getPort() + ")", ignored);
2077
catch (Exception ignored)
2079
log.error("error closing PipedOutputStream (" + socket.getPort() + ")", ignored);
2085
protected void setReadException(IOException e)
2087
// Note. It looks like there could be a race between setReadException() and
2088
// getAnInputStream(). However, suppose getAnInputStream() gets to its test
2089
// (readException != null) before readException is set here. Then if it created a
2090
// new InputStream "is", setReadException() will see "is" in inputStreamMap and will
2091
// set its read exception. Suppose getAnInputStream gets to its test
2092
// (readException != null) after setReadException() sets readException. Then
2093
// if it created a new InputStream "is", it will set the read exception for "is".
2095
// Remove from shareable map (if it's in map).
2096
unregisterAllMaps();
2097
notifySocketsOfException();
2099
// Unregister with input thread.
2100
if (multiGroupInputThread != null)
2101
multiGroupInputThread.unregisterSocketGroup(this);
2106
synchronized (inputStreamMap)
2108
tempSet = new HashSet(inputStreamMap.values());
2111
Iterator it = tempSet.iterator();
2112
while (it.hasNext())
2114
MultiplexingInputStream is = (MultiplexingInputStream) it.next();
2115
is.setReadException(e);
2120
protected void setWriteException(IOException e)
2122
// Note. See Note in setReadException().
2123
// If this connection is unusable, take out of shareable map (if it's in shareable map).
2124
unregisterAllMaps();
2125
notifySocketsOfException();
2127
// Unregister with output thread.
2128
outputMultiplexor.unregister(this);
2133
synchronized (outputStreamMap)
2135
tempSet = new HashSet(outputStreamSet);
2138
Iterator it = tempSet.iterator();
2139
while (it.hasNext())
2141
MultiplexingOutputStream os = (MultiplexingOutputStream) it.next();
2142
os.setWriteException(e);
2147
protected void notifySocketsOfException()
2149
synchronized (socketMap)
2151
Iterator it = socketMap.values().iterator();
2152
while (it.hasNext())
2153
((VirtualSocket) it.next()).notifyOfException();
2158
protected void setEOF()
2160
// Note. See Note in setReadException().
2161
log.debug("setEOF()");
2163
synchronized (inputStreamMap)
2165
tempSet = new HashSet(inputStreamMap.values());
2168
Iterator it = tempSet.iterator();
2169
while (it.hasNext())
2171
MultiplexingInputStream is = (MultiplexingInputStream) it.next();
2174
is.handleRemoteShutdown();
2176
catch (IOException e)
2187
protected synchronized void shutdown()
2189
if (debug) log.debug(description + ": entering shutdown()");
2190
shutdownThread = new ShutdownThread();
2191
shutdownThread.setName(shutdownThread.getName() + ":shutdown");
2192
shutdownThread.start();
2197
* The motivation behind this class is to prevent the following problem. Suppose MultiplexingManager A
2198
* is connected to MultiplexingManager B, and A decides to shut down. Suppose A shuts down before B knows A
2199
* is shutting down, and suppose a VirtualSocket C starts up, finds B, and attaches itself to B. Then C
2200
* will have "died a-borning," in the words of Tom Paxton. We need a handshake protocol to ensure a
2201
* proper shutdown process. In the following, let A be the local MultiplexingManager and let B be its
2204
* There are two forms of synchronization in ShutdownManager. incrementReferences() and decrementReferences()
2205
* maintain the reference count to its MultiplexingManager, and of course the changes to variable
2206
* referenceCount have to be synchronized. Parallel to incrementReferences() and decrementReferences() are
2207
* the pair of methods reseverManager() and unreserveManager(), which are similar but intended for holding a
2208
* MultiplexingManger for more temporary applications. See, for example, getaManagerByRemoteAddress().
2210
* There is also a need for distributed synchronization. When decrementReferences() on A decrements the
2211
* referenceCount to 0, it indicates to B the desire of A to shut down. Since all of the virtual sockets
2212
* on A are connected to virtual sockets on B, normally the request would be honored. However, if a new virtual
2213
* socket attaches itself to B, then it would have > 0 clients, and it would refuse the request to shut down.
2214
* In any case, the request is made through Protocol.requestManagerShutdown(), which results in a call to
2215
* ShutdownManager.respondToShutdownRequest() on B, which is synchronized since it reads the
2216
* readyToShutdown variable, which is modified by decrementReferences(). Here lies the danger of
2217
* distributed deadlock. If decrementReferences() on A and B both start executing at about the same time, they
2218
* would both be waiting for a response from respondToShutdownRequest(). But each respondToShutdownRequest()
2219
* would be locked out because each decrementReferences() is blocked on i/o.
2221
* The solution is to put the i/o operation in a separate thread, ShutdownRequestThread, and have decrementReferences()
2222
* enter a wait state, allowing respondToShutdownRequest() to execute. So, on each end respondToShutdownRequest()
2223
* will return an answer ("go ahead and shut down", in particular), and each ShutdownRequestThread.run() will wake up
2224
* the waiting decrementReferences(), which will then run to completion.
2226
* Note also that while decrementReferences() is waiting, incrementReferences() can run. However, before it waits,
2227
* decrementReferences() sets the shutdownRequestInProgress flag, and if incrementReferences() finds the flag set, it
2228
* will also enter a wait state and will take no action until the outstanding shutdown request is accepted or
2231
* Another issue is what to do if MultiplexingManager B responds negatively to A's request to shut down, not
2232
* because it has a new client but simply because some of its virtual sockets just haven't gotten around to
2233
* closing yet. When B's referenceCount finally goes to 0, it will send a shutdown request to A, and if A's
2234
* referenceCount is still 0, B will shut down. But what about B? If decrementReferences() gets a negative
2235
* response, it will start up a ShutdownMonitorThread, which, as long as readyToShutdown remains true, will
2236
* periodically check to see if remoteShutdown has been set to true. If it has, ShutdownMonitorThread
2237
* initiates the shut down of its enclosing MultiplexingManager.
2239
* reserveManager() interacts with decrementReferences() by preventing decrementReferences() from committing to
2240
* shutting down. If reserveManager() runs first, it sets the flag reserved to true, which causes
2241
* decrementReferences() to return without checking for referenceCount == 0. If decrementReferences() runs
2242
* first and finds referenceCount == 0 and gets a positve response from the remote manager, then reserveManager()
2243
* will throw an IOException. But if decrementReferences() gets a negative response, it will start up a
2244
* ShutdownMonitorThread, which runs as long as the flag readyToShutdown is true. But reserveManager() will
2245
* set readyToShutdown to false, ending the ShutdownMonitorThread. When unreserveManager() eventually runs
2246
* and sees referenceCount == 0, it will increment referenceCount and call decrementReferences(), allowing the
2247
* shutdown attempt to proceed anew. Note that when incrementReferences() runs successfully, it sets the flag
2248
* reserved to false, since incrementing referenceCount will also keep the MultiplexingManager alive.
2250
protected class ShutdownManager
2252
/** referenceCount keeps track of the number of clients of ShutdownManager's enclosing
2253
* MultiplexingManager. */
2254
private int referenceCount = 1;
2256
/** reserved is set to true to prevent the manager from shutting down without incrementing
2257
* the reference count. */
2258
private boolean reserved = false;
2260
/** shutdownRequestInProgress remains true while a remote shutdown request is pending */
2261
private boolean shutdownRequestInProgress = false;
2263
/** readyToShutdown is set to true as long as long as referenceCount == 0. It is interpreted
2264
* by respondToShutdownRequest() as the local manager's willingness to shut down. */
2265
private boolean readyToShutdown = false;
2267
/** shutdownMonitorThread holds a reference to the most recently created ShutdownMonitorThread. */
2268
// ShutdownMonitorThread shutdownMonitorThread;
2269
ShutdownMonitorTimerTask shutdownMonitorTimerTask;
2271
/** shutdown is set to true when the irrevocable decision has been made to shut down. Once it
2272
* is set to true, it is never set to false. */
2273
private boolean shutdown = false;
2275
/** remoteShutdown is set to true when the remote manager makes a shutdown request and
2276
* respondToShutdownRequest(), its agent for the request, discovers that the local manager
2277
* is also willing to shut down. It represents the fact that the remote manager will respond
2278
* by deciding to shut down. */
2279
private boolean remoteShutdown = false;
2281
/** shutdownHandled is set to true when ShutdownMonitorTimerTask begins the
2282
* shut down process */
2283
private boolean shutdownHandled;
2285
/** requestShutdownFailed is true if and only if ShutdownRequestThread's attempt
2286
* to get a response to a shut down request failed */
2287
private boolean requestShutdownFailed;
2290
private class ShutdownRequestThread extends Thread
2292
public ShutdownRequestThread()
2294
shutdownRequestInProgress = true;
2301
// Note. The timeout for Protocol's input stream should be longer than the
2302
// time spent in decrementReferences() or by ShutdownMonitorTimerTask waiting
2304
shutdown = protocol.requestManagerShutdown(shutdownRequestTimeout * 2);
2305
if (debug) log.debug("shutdown: " + shutdown);
2307
catch (SocketTimeoutException e)
2309
requestShutdownFailed = true;
2310
log.debug("socket timeout exception in manager shutdown request");
2314
requestShutdownFailed = true;
2315
log.debug("i/o exception in manager shutdown request", e);
2318
if (debug) log.debug("ShutdownRequestThread.run() done: " + shutdown);
2319
shutdownRequestInProgress = false;
2321
synchronized(ShutdownManager.this)
2323
ShutdownManager.this.notifyAll();
2330
* It is possible,due to a race condition, for there to be multiple ShutdownMonitorThreads running.
2331
* In particular, suppose
2334
* <li> decrementReferences() starts a ShutdownMonitorThread T1,
2335
* <li> reserverManager() runs and calls T1.terminate(),
2336
* <li> unreserveManager() runs, sees referenceCount == 0, and calls decrementReferences(),
2337
* <li> decrementReferences() creates a new ShutdownMonitorThread T2,
2338
* <li> T1 leaves sleep(1000) finds remoteShutdown == true and calls shutdown(), and
2339
* <li> T2 finds remoteShutdown == true.
2342
* For this reason, we turn on shutdown before calling shutdown() in T1, so that T2 will
2343
* not call shutdown().
2346
// private class ShutdownMonitorThread extends Thread
2348
// boolean running = true;
2350
// public void terminate()
2355
// public void run()
2357
// log.debug(socket.toString() + ": entering ShutdownMonitorThread");
2365
// catch (InterruptedException ignored) {}
2367
// synchronized (ShutdownManager.this)
2369
// if (readyToShutdown && remoteShutdown && !shutdown)
2371
// log.debug("ShutdownMonitorThread: found remoteShutdown == true");
2374
// ShutdownManager.this.notifyAll();
2383
private class ShutdownMonitorTimerTask extends TimerTask
2388
public boolean cancel()
2390
log.debug("cancelling ShutdownMonitorTimerTask");
2392
return super.cancel();
2397
if (debug) log.debug(description + ": entering ShutdownMonitorTimerTask");
2400
synchronized (ShutdownManager.this)
2402
// Another ShutdownMonitorTimerTask got here first.
2403
if (shutdownHandled)
2405
if (debug) log.debug(description + ": shutdownHandled == true");
2409
// ShutdownRequestThread got a positive response.
2412
if (debug) log.debug(description + ": shutdown is true");
2413
shutdownHandled = true;
2418
// Peer MultiplexingManager requested shutdown consent.
2419
else if (readyToShutdown && remoteShutdown)
2421
if (debug) log.debug(description + ": ShutdownMonitorTimerTask: found remoteShutdown == true");
2423
shutdownHandled = true;
2425
ShutdownManager.this.notifyAll();
2429
// Timeout (or other error) in ShutdownRequestThread
2430
else if (requestShutdownFailed)
2432
if (debug) log.debug(description + ": ShutdownMonitorTimerTask: found requestShutdownFailed == true");
2434
shutdownHandled = true;
2436
ShutdownManager.this.notifyAll();
2440
// Count of peer MultiplexingManager refusals has reached maximum.
2441
// Assume peer is hung up somehow and shut down.
2442
else if (count > shutdownRefusalsMaximum)
2445
log.debug(description + ": ShutdownMonitorTimerTask: " +
2446
"shutdown refusal count exceeded maximut: " + shutdownRefusalsMaximum);
2449
shutdownHandled = true;
2451
ShutdownManager.this.notifyAll();
2455
// ShutdownRequestThread is still running.
2456
else if (shutdownRequestInProgress)
2458
if (debug) log.debug(description + ": shutdownRequestInProgress == true");
2462
// ShutdownRequestThread got a negative response. If we haven't been cancelled
2463
// yet, we still are trying to shut down. Ask again.
2466
// Note. The timeout for Protocol's input stream should be longer than the
2467
// time spent waiting for a response.
2468
ShutdownRequestThread shutdownRequestThread = new ShutdownRequestThread();
2469
shutdownRequestThread.setName(shutdownRequestThread.getName() + ":shutdownRequest:" + time);
2470
shutdownRequestThread.setDaemon(true);
2471
if (debug) log.debug(description + ": starting ShutdownRequestThread: " + shutdownRequestThread.toString());
2472
shutdownRequestThread.start();
2477
public String toString()
2479
return "shutdownRequest:" + time;
2485
* @throws IOException
2487
public synchronized void reserveManager() throws IOException
2489
if (debug) log.debug(description + referenceCount);
2491
// If decrementReferences() grabbed the lock first, set referenceCount to 0, and initiated a
2492
// remote shutdown request, wait until the answer comes back.
2493
while (shutdownRequestInProgress)
2499
catch (InterruptedException e)
2502
log.error("interruption in ShutdownRequestThread");
2506
// 1. shutdown is true if and only if (a) ShutdownRequestThread returned an indication that the
2507
// remote manager is shutting down, or (b) the wait() in decrementReferences() for the return
2508
// of ShutdownRequestThread timed out and decrementReferences() set shutdown to true;
2509
// 2. remoteShutdown is true if and only if the remote manager initiated a shutdown request
2510
// which found that the local manager was ready to shut down. In this case, the remote
2511
// manager would go ahead and shut down, and the local manager will inevitably shutdown as well.
2513
if (shutdown || remoteShutdown)
2514
throw new IOException("manager shutting down");
2516
readyToShutdown = false;
2519
// if (shutdownMonitorThread != null)
2520
// shutdownMonitorThread.terminate();
2522
if (shutdownMonitorTimerTask != null)
2523
shutdownMonitorTimerTask.cancel();
2525
// wake up decrementReferences() if it is waiting
2533
public synchronized void unreserveManager()
2535
if (debug) log.debug(description + referenceCount);
2539
log.error("attempting to unreserve a MultiplexingManager that was not reserved: " + description);
2545
// If referenceCount == 0, it is because it was decremented by decrementReferences(). But if
2546
// reserveManager() was able to run, it was because decrementReferences() did not succeed in
2547
// negotiating a shutdown. Either it found reserved == true and gave up, or it received a
2548
// negative reply from the remote manager and started up a ShutdownMonitorThread, which would
2549
// have terminated when reserveManager() set readyToShutdown to false. It is therefore
2550
// appropriate to give decrementReferences() another opportunity to shut down.
2551
if (referenceCount == 0)
2554
decrementReferences();
2560
public synchronized void incrementReferences() throws IOException
2562
if (debug) log.debug(description + referenceCount);
2564
// If decrementReferences() grabbed the lock first, set referenceCount to 0, and initiated a
2565
// remote shutdown request, wait until the answer comes back.
2566
while (shutdownRequestInProgress)
2572
catch (InterruptedException e)
2575
log.error("interruption in ShutdownRequestThread");
2579
// 1. shutdown is true if and only if (a) ShutdownRequestThread returned an indication that the
2580
// remote manager is shutting down, or (b) the wait() in decrementReferences() for the return
2581
// of ShutdownRequestThread timed out and decrementReferences() set shutdown to true;
2582
// 2. remoteShutdown is true if and only if the remote manager initiated a shutdown request
2583
// which found that the local manager was ready to shut down. In this case, the remote
2584
// manager would go ahead and shut down, and the local manager will inevitably shutdown as well.
2586
if (shutdown || remoteShutdown)
2587
throw new IOException("not accepting new clients");
2589
readyToShutdown = false;
2593
if (debug) log.debug(description + referenceCount);
2595
// if (shutdownMonitorThread != null)
2596
// shutdownMonitorThread.terminate();
2598
if (shutdownMonitorTimerTask != null)
2599
shutdownMonitorTimerTask.cancel();
2601
// wake up decrementReferences() if it is waiting
2609
public synchronized void decrementReferences()
2612
if (debug) log.debug(description + referenceCount);
2616
if (debug) log.debug(description + ": reserved == true");
2620
if (referenceCount == 0)
2622
readyToShutdown = true;
2626
ShutdownRequestThread shutdownRequestThread = new ShutdownRequestThread();
2627
shutdownRequestThread.setName(shutdownRequestThread.getName() + ":shutdownRequest:" + time);
2628
shutdownRequestThread.setDaemon(true);
2629
if (debug) log.debug(description + "starting ShutdownRequestThread: " + shutdownRequestThread.toString());
2630
shutdownRequestThread.start();
2634
// Note. The timeout for Protocol's input stream should be longer than the
2635
// time spent waiting for a response.
2636
wait(shutdownRequestTimeout);
2638
catch (InterruptedException e)
2641
log.error("interrupt in ShutdownRequestThread");
2644
if (log.isDebugEnabled())
2646
log.debug(description + shutdown);
2647
log.debug(description + shutdownRequestThread.isAlive());
2650
// If shutdownRequestInProgress is still true, we assume that the peer MultiplexingManager
2651
// has shut down or is inaccessible, and we shut down.
2652
if (shutdownRequestInProgress)
2656
// turn off shutdownRequestInProgress in case incrementReferences() or reserveManager()
2658
shutdownRequestInProgress = false;
2661
else // !isConnected()
2669
// wake up incrementReferences() if it is waiting
2674
// shutdownMonitorThread = new ShutdownMonitorThread();
2675
// shutdownMonitorThread.setName(shutdownMonitorThread.getName() + ":shutdownMonitor");
2676
// shutdownMonitorThread.start();
2678
shutdownMonitorTimerTask = new ShutdownMonitorTimerTask();
2679
if (debug) log.debug(description + ": scheduling ShutdownMonitorTask: " + shutdownMonitorTimerTask);
2680
timer.schedule(shutdownMonitorTimerTask, shutdownMonitorPeriod, shutdownMonitorPeriod);
2689
protected synchronized boolean respondToShutdownRequest()
2693
log.debug(description + readyToShutdown);
2694
log.debug(description + shutdown);
2697
if (readyToShutdown)
2699
remoteShutdown = true;
2700
if (debug) log.debug(description + ": respondToShutdownRequest(): set remoteShutdown to true");
2703
return readyToShutdown;
2710
protected boolean isShutdown()
2720
protected class ShutdownThread extends Thread
2722
private boolean safeToShutDown;
2726
String message = null;
2727
if (debug) log.debug(description + ": manager shutting down");
2729
// Unregister this MultiplexingManager by local address(es)
2730
unregisterByLocalAddress();
2732
// Unregister this MultiplexingManager by remote address
2733
unregisterByRemoteAddress();
2735
// Remove this MultiplexingManager from Map of shareable managers
2736
unregisterShareable();
2742
if (outputMultiplexor != null)
2744
outputMultiplexor.unregister(MultiplexingManager.this);
2746
// Don't close socket until all output has been written.
2747
synchronized (MultiplexingManager.this)
2749
while (!safeToShutDown)
2751
if (debug) log.debug("waiting for safe to shut down");
2754
MultiplexingManager.this.wait();
2756
catch (InterruptedException ignored)
2763
if (socket.getChannel() == null)
2767
// socket.getChannel().close();
2768
message = description;
2770
if (multiGroupInputThread != null)
2771
multiGroupInputThread.unregisterSocketGroup(MultiplexingManager.this);
2774
if (debug) log.debug("closed socket: " + description);
2775
// multiGroupInputThread.unregisterSocketGroup(MultiplexingManager.this);
2779
log.debug("manager: closed socket");
2783
log.error("manager: unable to close socket", e);
2787
if (inputThread != null)
2789
inputThread.shutdown();
2794
log.debug("manager: joined input thread");
2796
catch (InterruptedException ignored)
2798
log.debug("manager: interrupted exception waiting for read thread");
2802
removeAnInputStream(SocketId.PROTOCOL_SOCKET_ID);
2803
removeAnInputStream(SocketId.SERVER_SOCKET_ID);
2804
removeAnInputStream(SocketId.SERVER_SOCKET_CONNECT_ID);
2805
removeAnInputStream(SocketId.SERVER_SOCKET_VERIFY_ID);
2806
removeAnInputStream(SocketId.BACKCHANNEL_SOCKET_ID);
2810
// Remove this MultiplexManager from set of all managers
2811
if (debug) log.debug("removing from allManagers: " + description + "(" + id + ")");
2812
allManagers.remove(MultiplexingManager.this);
2814
if (debug) log.debug("manager shut down (: " + id + "): " + message);
2815
if (debug) log.debug("managers left: " + allManagers.size());
2818
public void setSafeToShutdown(boolean safe)
2820
if (debug) log.debug("output flushed");
2821
safeToShutDown = safe;
2826
protected static class PendingActionThread extends StoppableThread
2828
private List pendingActionsTemp = new ArrayList();
2830
protected void doInit()
2832
log.debug("PendingActionThread starting");
2835
protected void doRun()
2837
synchronized (pendingActions)
2839
while (pendingActions.isEmpty())
2843
pendingActions.wait();
2845
catch (InterruptedException ignored)
2852
pendingActionsTemp.addAll(pendingActions);
2853
pendingActions.clear();
2856
Iterator it = pendingActionsTemp.iterator();
2858
while (it.hasNext())
2860
Object o = it.next();
2861
if (o instanceof PendingAction)
2862
((PendingAction) o).doAction();
2864
log.error("object in closePendingSockets has invalid type: " + o.getClass());
2867
pendingActionsTemp.clear();
2870
public void shutdown()
2872
log.debug("pending action thread beginning shut down");
2877
protected void doShutDown()
2879
log.debug("PendingActionThread shutting down");
b'\\ No newline at end of file'