2
* JBoss, Home of Professional Open Source
3
* Copyright 2005, JBoss Inc., and individual contributors as indicated
4
* by the @authors tag. See the copyright.txt in the distribution for a
5
* full listing of individual contributors.
7
* This is free software; you can redistribute it and/or modify it
8
* under the terms of the GNU Lesser General Public License as
9
* published by the Free Software Foundation; either version 2.1 of
10
* the License, or (at your option) any later version.
12
* This software is distributed in the hope that it will be useful,
13
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15
* Lesser General Public License for more details.
17
* You should have received a copy of the GNU Lesser General Public
18
* License along with this software; if not, write to the Free
19
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
23
package org.jboss.remoting.transport.bisocket;
25
import java.io.DataInputStream;
26
import java.io.DataOutputStream;
27
import java.io.IOException;
28
import java.lang.reflect.Method;
29
import java.net.InetAddress;
30
import java.net.ServerSocket;
31
import java.net.Socket;
32
import java.security.AccessController;
33
import java.security.PrivilegedActionException;
34
import java.security.PrivilegedExceptionAction;
35
import java.util.ArrayList;
36
import java.util.Collection;
37
import java.util.Collections;
38
import java.util.HashMap;
39
import java.util.HashSet;
40
import java.util.Iterator;
41
import java.util.LinkedList;
42
import java.util.List;
45
import java.util.StringTokenizer;
46
import java.util.Timer;
47
import java.util.TimerTask;
49
import org.jboss.logging.Logger;
50
import org.jboss.remoting.Client;
51
import org.jboss.remoting.Home;
52
import org.jboss.remoting.InvocationRequest;
53
import org.jboss.remoting.InvokerLocator;
54
import org.jboss.remoting.Remoting;
55
import org.jboss.remoting.ServerInvocationHandler;
56
import org.jboss.remoting.invocation.InternalInvocation;
57
import org.jboss.remoting.socketfactory.CreationListenerServerSocket;
58
import org.jboss.remoting.socketfactory.CreationListenerSocketFactory;
59
import org.jboss.remoting.socketfactory.SocketCreationListener;
60
import org.jboss.remoting.transport.PortUtil;
61
import org.jboss.remoting.transport.socket.LRUPool;
62
import org.jboss.remoting.transport.socket.SocketServerInvoker;
63
import org.jboss.remoting.util.SecurityUtility;
68
* @author <a href="ron.sigal@jboss.com">Ron Sigal</a>
69
* @version $Revision: 5605 $
71
* Copyright Nov 23, 2006
74
public class BisocketServerInvoker extends SocketServerInvoker
76
private static final Logger log = Logger.getLogger(BisocketServerInvoker.class);
78
private static Map listenerIdToServerInvokerMap = Collections.synchronizedMap(new HashMap());
79
private static Timer timer;
80
private static Object timerLock = new Object();
82
private Map listenerIdToInvokerLocatorMap = Collections.synchronizedMap(new HashMap());
83
private Set secondaryServerSockets = new HashSet();
84
private InvokerLocator secondaryLocator;
85
private Set secondaryServerSocketThreads = new HashSet();
86
private Map controlConnectionThreadMap = new HashMap();
87
private Map controlConnectionRestartsMap = Collections.synchronizedMap(new HashMap());
88
private int pingFrequency = Bisocket.PING_FREQUENCY_DEFAULT;
89
private int pingWindowFactor = Bisocket.PING_WINDOW_FACTOR_DEFAULT;
90
private int pingWindow = pingWindowFactor * pingFrequency;
91
private int socketCreationRetries = Bisocket.MAX_RETRIES_DEFAULT;
92
private int controlConnectionRestarts = Bisocket.MAX_CONTROL_CONNECTION_RESTARTS_DEFAULT;
93
private ControlMonitorTimerTask controlMonitorTimerTask;
94
protected boolean isCallbackServer = false;
95
protected List secondaryBindPorts = new ArrayList();
96
protected List secondaryConnectPorts = new ArrayList();
99
public static BisocketServerInvoker getBisocketServerInvoker(String listenerId)
101
return (BisocketServerInvoker) listenerIdToServerInvokerMap.get(listenerId);
105
public BisocketServerInvoker(InvokerLocator locator)
111
public BisocketServerInvoker(InvokerLocator locator, Map configuration)
113
super(locator, configuration);
117
public void start() throws IOException
119
if (isCallbackServer)
121
Object val = configuration.get(Bisocket.MAX_RETRIES);
126
int nVal = Integer.valueOf((String) val).intValue();
127
socketCreationRetries = nVal;
128
log.debug("Setting socket creation retry limit: " + socketCreationRetries);
132
log.warn("Could not convert " + Bisocket.MAX_RETRIES +
133
" value of " + val + " to an int value.");
137
val = configuration.get(Bisocket.MAX_CONTROL_CONNECTION_RESTARTS);
142
int nVal = Integer.valueOf((String) val).intValue();
143
controlConnectionRestarts = nVal;
144
log.debug("Setting control connection restart limit: " + controlConnectionRestarts);
148
log.warn("Could not convert " + Bisocket.MAX_CONTROL_CONNECTION_RESTARTS +
149
" value of " + val + " to an int value.");
155
maxPoolSize = MAX_POOL_SIZE_DEFAULT;
157
clientpool = new LRUPool(2, maxPoolSize);
159
threadpool = new LinkedList();
160
checkSocketFactoryWrapper();
162
if (pingFrequency > 0)
164
controlMonitorTimerTask = new ControlMonitorTimerTask(this);
165
synchronized (timerLock)
169
timer = new Timer(true);
173
timer.schedule(controlMonitorTimerTask, pingFrequency, pingFrequency);
175
catch (IllegalStateException e)
177
log.debug("Unable to schedule TimerTask on existing Timer", e);
178
timer = new Timer(true);
179
timer.schedule(controlMonitorTimerTask, pingFrequency, pingFrequency);
191
if (serverSockets.size() < secondaryBindPorts.size())
192
log.warn(this + " extra secondary bind ports will be ignored");
193
else if (serverSockets.size() > secondaryBindPorts.size())
194
log.warn(this + " not enough secondary bind ports: will use anonymous ports as necessary");
196
if (secondaryConnectPorts.size() == 0)
198
secondaryConnectPorts = secondaryBindPorts;
200
else if(secondaryConnectPorts.size() != secondaryBindPorts.size())
202
log.warn(this + " number of secondary connect ports != number of secondary bind ports");
203
log.warn(this + " will ignore secondary connect ports");
204
secondaryConnectPorts = secondaryBindPorts;
208
Iterator it = serverSockets.iterator();
211
ServerSocket ss = (ServerSocket) it.next();
212
final InetAddress host = ss.getInetAddress();
213
int secondaryBindPort = -1;
214
if (secondaryBindPorts.size() > i)
216
secondaryBindPort = ((Integer) secondaryBindPorts.get(i)).intValue();
220
secondaryBindPorts.add(new Integer(-1));
222
if (secondaryBindPort < 0)
224
secondaryBindPort = PortUtil.findFreePort(host.getHostAddress());
225
secondaryBindPorts.set(i, new Integer(secondaryBindPort));
228
ServerSocket secondaryServerSocket = null;
229
final int finalBindPort = secondaryBindPort;
233
secondaryServerSocket = (ServerSocket) AccessController.doPrivileged( new PrivilegedExceptionAction()
235
public Object run() throws Exception
237
ServerSocket ss = null;
238
if (serverSocketFactory != null)
240
ss = serverSocketFactory.createServerSocket(finalBindPort, 0, host);
244
ss = new ServerSocket(finalBindPort, 0, host);
250
catch (PrivilegedActionException e)
252
throw (IOException) e.getCause();
255
ss = checkSecondaryServerSocketWrapper(secondaryServerSocket);
256
secondaryServerSockets.add(ss);
257
log.debug(this + " created secondary " + ss);
262
it = secondaryServerSockets.iterator();
265
ServerSocket secondaryServerSocket = (ServerSocket) it.next();
266
Thread t = new SecondaryServerSocketThread(secondaryServerSocket);
267
t.setName("secondaryServerSocketThread[" + i++ + "]");
270
secondaryServerSocketThreads.add(t);
271
log.debug(this + " created " + t);
274
if (getLocator().isMultihome())
277
String host = ((Home) connectHomes.get(j)).host;
278
int port = ((Integer) secondaryConnectPorts.get(j)).intValue();
280
port = ((Integer) secondaryBindPorts.get(j)).intValue();
281
StringBuffer sb = new StringBuffer(host).append(':').append(port);
282
for (j = 1; j < connectHomes.size(); j++)
284
host = ((Home) connectHomes.get(j)).host;
285
port = ((Integer) secondaryConnectPorts.get(j)).intValue();
287
port = ((Integer) secondaryBindPorts.get(j)).intValue();
288
sb.append('!').append(host).append(':').append(port);
291
Map params = new HashMap();
292
params.put(InvokerLocator.HOMES_KEY, sb.toString());
293
secondaryLocator = new InvokerLocator(null, InvokerLocator.MULTIHOME, -1, null, params);
297
String connectAddress = getLocator().getHost();
298
int connectPort = ((Integer) secondaryConnectPorts.get(0)).intValue();
300
connectPort = ((Integer) secondaryBindPorts.get(0)).intValue();
301
secondaryLocator = new InvokerLocator(null, connectAddress, connectPort, null, null);
304
log.debug(this + " created secondary InvokerLocator: " + secondaryLocator);
309
public boolean isTransportBiDirectional()
315
public void createControlConnection(String listenerId, boolean firstConnection)
318
BisocketClientInvoker clientInvoker = BisocketClientInvoker.getBisocketClientInvoker(listenerId);
320
if (clientInvoker == null)
322
log.debug("Unable to retrieve client invoker: must have disconnected");
323
throw new ClientUnavailableException();
326
InvokerLocator oldLocator = (InvokerLocator) listenerIdToInvokerLocatorMap.get(listenerId);
327
InvokerLocator newLocator = null;
331
newLocator = clientInvoker.getSecondaryLocator();
335
log.debug("unable to get secondary locator", t);
336
throw new IOException("unable to get secondary locator: " + t.getMessage());
340
// If a server restarts, it is likely that it creates a new secondary server socket on
341
// a different port. It will possible to recreate the control connection, but if
342
// there is no PingTimerTask running in the new server to keep it alive, it will just
343
// die again. Once a new secondary server socket address is detected, a count is kept
344
// of the number of times the control connection is restarted, and when it hits a
345
// configured maximum, it is allowed to die. See JBREM-731.
347
boolean locatorChanged = !newLocator.equals(oldLocator);
348
listenerIdToInvokerLocatorMap.put(listenerId, newLocator);
350
String host = newLocator.getHost();
351
int port = newLocator.getPort();
352
if (newLocator.isMultihome())
354
host = clientInvoker.getHomeInUse().host;
357
if (newLocator.getConnectHomeList().isEmpty())
358
it = newLocator.getHomeList().iterator();
360
it = newLocator.getConnectHomeList().iterator();
364
Home h = (Home) it.next();
365
if (host.equals(h.host))
368
newLocator.setHomeInUse(h);
376
throw new IOException("Cannot find matching home for control connection");
379
log.debug("creating control connection: " + newLocator);
381
Socket socket = null;
382
IOException savedException = null;
383
final String finalHost = host;
384
final int finalPort = port;
386
for (int i = 0; i < socketCreationRetries; i++)
390
socket = (Socket) AccessController.doPrivileged( new PrivilegedExceptionAction()
392
public Object run() throws Exception
395
if (socketFactory != null)
396
s = socketFactory.createSocket(finalHost, finalPort);
398
s = new Socket(finalHost, finalPort);
403
catch (PrivilegedActionException e)
405
IOException ioe = (IOException) e.getCause();
406
log.debug("Error creating a control socket", ioe);
407
savedException = ioe;
417
catch (InterruptedException e)
419
log.debug("received interrupt");
425
log.debug("unable to create control connection after "
426
+ socketCreationRetries + " retries", savedException);
427
throw savedException;
430
DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
433
dos.write(Bisocket.CREATE_CONTROL_SOCKET);
437
dos.write(Bisocket.RECREATE_CONTROL_SOCKET);
439
dos.writeUTF(listenerId);
441
Thread thread = new ControlConnectionThread(socket, listenerId);
442
thread.setName("control: " + socket.toString());
443
thread.setDaemon(true);
445
synchronized (controlConnectionThreadMap)
447
controlConnectionThreadMap.put(listenerId, thread);
450
Object o = controlConnectionRestartsMap.get(listenerId);
453
int restarts = ((Integer) o).intValue();
454
if (locatorChanged || restarts > 0)
455
controlConnectionRestartsMap.put(listenerId, new Integer(++restarts));
459
controlConnectionRestartsMap.put(listenerId, new Integer(0));
463
log.debug(this + " created control connection (" + listenerId + "): " + socket.toString());
467
public void destroyControlConnection(String listenerId)
471
synchronized (controlConnectionThreadMap)
473
t = (Thread) controlConnectionThreadMap.remove(listenerId);
478
((ControlConnectionThread)t).shutdown();
479
log.debug(this + " shutting down control connection: " + listenerId);
483
log.debug("unrecognized listener ID: " + listenerId);
486
listenerIdToInvokerLocatorMap.remove(listenerId);
487
controlConnectionRestartsMap.remove(listenerId);
491
public int getControlConnectionRestarts()
493
return controlConnectionRestarts;
497
public void setControlConnectionRestarts(int controlConnectionRestarts)
499
this.controlConnectionRestarts = controlConnectionRestarts;
503
public int getPingFrequency()
505
return pingFrequency;
509
public void setPingFrequency(int pingFrequency)
511
this.pingFrequency = pingFrequency;
512
pingWindow = pingWindowFactor * pingFrequency;
516
public int getPingWindowFactor()
518
return pingWindowFactor;
522
public void setPingWindowFactor(int pingWindowFactor)
524
this.pingWindowFactor = pingWindowFactor;
525
pingWindow = pingWindowFactor * pingFrequency;
529
public int getSecondaryBindPort()
531
if (secondaryBindPorts.size() == 0 || secondaryBindPorts.size() > 1)
534
return ((Integer) secondaryBindPorts.get(0)).intValue();
538
public void setSecondaryBindPort(int secondaryPort)
540
secondaryBindPorts.clear();
541
secondaryBindPorts.add(new Integer(secondaryPort));
545
public List getSecondaryBindPorts()
547
return new ArrayList(secondaryBindPorts);
551
public void setSecondaryBindPorts(List secondaryBindPorts)
553
this.secondaryBindPorts = secondaryBindPorts;
557
public void setSecondaryBindPorts(String secondaryBindPortString)
559
StringTokenizer tok = new StringTokenizer(secondaryBindPortString, "!");
561
while (tok.hasMoreTokens())
565
token = tok.nextToken();
566
secondaryBindPorts.add(Integer.valueOf(token));
568
catch (NumberFormatException e)
570
log.warn("Invalid format for " + "\"" + Bisocket.SECONDARY_BIND_PORT + "\": " + token);
571
secondaryBindPorts.add(new Integer(-1));
577
public int getSecondaryConnectPort()
579
if (secondaryConnectPorts.size() == 0 || secondaryConnectPorts.size() > 1)
582
return ((Integer) secondaryConnectPorts.get(0)).intValue();
586
public void setSecondaryConnectPort(int secondaryConnectPort)
588
secondaryConnectPorts.clear();
589
secondaryConnectPorts.add(new Integer(secondaryConnectPort));
593
public List getSecondaryConnectPorts()
595
return new ArrayList(secondaryConnectPorts);
599
public void setSecondaryConnectPorts(List secondaryConnectPorts)
601
this.secondaryConnectPorts = secondaryConnectPorts;
605
public void setSecondaryConnectPorts(String secondaryConnectPortString)
607
StringTokenizer tok = new StringTokenizer(secondaryConnectPortString, "!");
609
while (tok.hasMoreTokens())
613
token = tok.nextToken();
614
secondaryConnectPorts.add(Integer.valueOf(token));
616
catch (NumberFormatException e)
618
log.warn("Invalid format for " + "\"" + Bisocket.SECONDARY_CONNECT_PORT + "\": " + token);
619
secondaryConnectPorts.add(new Integer(-1));
625
public int getSocketCreationRetries()
627
return socketCreationRetries;
631
public void setSocketCreationRetries(int socketCreationRetries)
633
this.socketCreationRetries = socketCreationRetries;
637
protected void setup() throws Exception
639
Object o = configuration.get(Bisocket.IS_CALLBACK_SERVER);
642
if (o instanceof String)
643
isCallbackServer = Boolean.valueOf((String) o).booleanValue();
644
else if (o instanceof Boolean)
645
isCallbackServer = ((Boolean) o).booleanValue();
647
log.error("unrecognized value for configuration key \"" +
648
Bisocket.IS_CALLBACK_SERVER + "\": " + o);
653
o = configuration.get(Bisocket.PING_FREQUENCY);
654
if (o instanceof String && ((String) o).length() > 0)
658
pingFrequency = Integer.valueOf(((String) o)).intValue();
659
log.debug(this + " setting pingFrequency to " + pingFrequency);
661
catch (NumberFormatException e)
663
log.warn("Invalid format for " + "\"" + Bisocket.PING_FREQUENCY + "\": " + o);
668
log.warn("\"" + Bisocket.PING_FREQUENCY + "\" must be specified as a String");
671
o = configuration.get(Bisocket.PING_WINDOW_FACTOR);
672
if (o instanceof String && ((String) o).length() > 0)
676
pingWindowFactor = Integer.valueOf(((String) o)).intValue();
677
log.debug(this + " setting pingWindowFactor to " + pingWindowFactor);
679
catch (NumberFormatException e)
681
log.warn("Invalid format for " + "\"" + Bisocket.PING_WINDOW_FACTOR + "\": " + o);
686
log.warn("\"" + Bisocket.PING_WINDOW_FACTOR + "\" must be specified as a String");
689
pingWindow = pingWindowFactor * pingFrequency;
691
o = configuration.get(Bisocket.SECONDARY_BIND_PORTS);
692
if (o instanceof String && ((String) o).length() > 0)
694
setSecondaryBindPorts((String) o);
696
else if (o instanceof List)
698
setSecondaryBindPorts((List) o);
702
log.warn("\"" + Bisocket.SECONDARY_BIND_PORTS + "\" must be specified as a String or a List");
705
o = configuration.get(Bisocket.SECONDARY_CONNECT_PORTS);
706
if (o instanceof String && ((String) o).length() > 0)
708
setSecondaryConnectPorts((String) o);
710
else if (o instanceof List)
712
setSecondaryConnectPorts((List) o);
716
log.warn("\"" + Bisocket.SECONDARY_CONNECT_PORTS + "\" must be specified as a String or a List");
719
if (secondaryBindPorts.isEmpty())
721
for (int i = 0; i < homes.size(); i++)
722
secondaryBindPorts.add(new Integer(-1));
725
if (secondaryConnectPorts.isEmpty())
727
secondaryConnectPorts = new ArrayList(secondaryBindPorts);
730
if (isCallbackServer)
732
socketFactory = createSocketFactory(configuration);
737
protected void cleanup()
739
synchronized (controlConnectionThreadMap)
741
Iterator it = controlConnectionThreadMap.values().iterator();
744
ControlConnectionThread t = (ControlConnectionThread) it.next();
752
if (controlMonitorTimerTask != null)
753
controlMonitorTimerTask.shutdown();
755
Iterator it = secondaryServerSocketThreads.iterator();
758
SecondaryServerSocketThread t = (SecondaryServerSocketThread) it.next();
762
it = secondaryServerSockets.iterator();
767
ServerSocket ss = (ServerSocket) it.next();
770
catch (IOException e)
772
log.info("Error closing secondary server socket: " + e.getMessage());
776
secondaryBindPorts.clear();
777
secondaryConnectPorts.clear();
781
protected InvokerLocator getSecondaryLocator()
783
return secondaryLocator;
787
protected void checkSocketFactoryWrapper() throws IOException
790
Object o = configuration.get(Remoting.SOCKET_CREATION_SERVER_LISTENER);
793
if (o instanceof SocketCreationListener)
795
SocketCreationListener listener = (SocketCreationListener) o;
796
if (socketFactory instanceof CreationListenerSocketFactory)
798
CreationListenerSocketFactory clsf = (CreationListenerSocketFactory) socketFactory;
799
clsf.setListener(listener);
803
socketFactory = new CreationListenerSocketFactory(socketFactory, listener);
808
log.error("socket creation listener of invalid type: " + o);
813
if (socketFactory instanceof CreationListenerSocketFactory)
815
CreationListenerSocketFactory clsf = (CreationListenerSocketFactory) socketFactory;
816
socketFactory = clsf.getFactory();
822
protected ServerSocket checkSecondaryServerSocketWrapper(ServerSocket secondaryServerSocket) throws IOException
824
Object o = configuration.get(Remoting.SOCKET_CREATION_CLIENT_LISTENER);
827
if (o instanceof SocketCreationListener)
829
SocketCreationListener listener = (SocketCreationListener) o;
830
if (secondaryServerSocket instanceof CreationListenerServerSocket)
832
CreationListenerServerSocket clss = (CreationListenerServerSocket) secondaryServerSocket;
833
clss.setListener(listener);
837
secondaryServerSocket = new CreationListenerServerSocket(secondaryServerSocket, listener);
842
log.error("socket creation listener of invalid type: " + o);
847
if (secondaryServerSocket instanceof CreationListenerServerSocket)
849
CreationListenerServerSocket clss = (CreationListenerServerSocket) secondaryServerSocket;
850
secondaryServerSocket = clss.getServerSocket();
854
return secondaryServerSocket;
858
protected Object handleInternalInvocation(InternalInvocation ii,
859
InvocationRequest ir,
860
ServerInvocationHandler handler)
863
if(Bisocket.GET_SECONDARY_INVOKER_LOCATOR.equals(ii.getMethodName()))
865
return secondaryLocator;
868
Object response = super.handleInternalInvocation(ii, ir, handler);
870
if(InternalInvocation.ADDCLIENTLISTENER.equals(ii.getMethodName()))
872
Map metadata = ir.getRequestPayload();
875
String listenerId = (String) metadata.get(Client.LISTENER_ID_KEY);
876
if (listenerId != null)
878
listenerIdToServerInvokerMap.put(listenerId, this);
882
else if(InternalInvocation.REMOVECLIENTLISTENER.equals(ii.getMethodName()))
884
Map metadata = ir.getRequestPayload();
887
String listenerId = (String) metadata.get(Client.LISTENER_ID_KEY);
888
if (listenerId != null)
890
listenerIdToServerInvokerMap.remove(listenerId);
891
BisocketClientInvoker.removeBisocketClientInvoker(listenerId);
892
destroyControlConnection(listenerId);
901
class ControlConnectionThread extends Thread
903
private static final int MAX_INITIAL_ATTEMPTS = 5;
904
private Socket controlSocket;
905
private String listenerId;
906
private DataInputStream dis;
907
private boolean running;
908
private int errorCount;
909
private long lastPing = -1;
910
private int initialAttempts;
912
ControlConnectionThread(Socket socket, String listenerId) throws IOException
914
controlSocket = socket;
915
this.listenerId = listenerId;
916
dis = new DataInputStream(socket.getInputStream());
925
controlSocket.close();
927
catch (IOException e)
929
log.warn("unable to close controlSocket");
934
boolean checkConnection()
936
if (lastPing < 0 && initialAttempts++ < MAX_INITIAL_ATTEMPTS)
940
else if (lastPing < 0)
945
long currentTime = System.currentTimeMillis();
947
if (log.isTraceEnabled())
949
log.trace("elapsed: " + (currentTime - lastPing));
951
return (currentTime - lastPing <= pingWindow);
954
String getListenerId()
964
Socket socket = null;
968
int action = dis.read();
969
lastPing = System.currentTimeMillis();
973
case Bisocket.CREATE_ORDINARY_SOCKET:
974
InvokerLocator locator = (InvokerLocator) listenerIdToInvokerLocatorMap.get(listenerId);
976
IOException savedException = null;
977
final String finalHost = locator.getHost();
978
final int finalPort = locator.getPort();
980
for (int i = 0; i < socketCreationRetries; i++)
984
socket = (Socket) AccessController.doPrivileged( new PrivilegedExceptionAction()
986
public Object run() throws Exception
989
if (socketFactory != null)
990
s = socketFactory.createSocket(finalHost, finalPort);
992
s = new Socket(finalHost, finalPort);
997
catch (PrivilegedActionException e)
999
IOException ioe = (IOException) e.getCause();
1000
log.debug("Error creating a socket", ioe);
1001
savedException = ioe;
1011
catch (InterruptedException e)
1015
log.debug("received unexpected interrupt");
1027
log.error("Unable to create socket after " + socketCreationRetries
1028
+ " retries", savedException);
1032
DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
1033
dos.write(Bisocket.CREATE_ORDINARY_SOCKET);
1034
dos.writeUTF(listenerId);
1045
log.error("unrecognized action on ControlConnectionThread (" +
1046
listenerId + "): " + action);
1050
catch (IOException e)
1054
if ("Socket closed".equalsIgnoreCase(e.getMessage()) ||
1055
"Socket is closed".equalsIgnoreCase(e.getMessage()) ||
1056
"Connection reset".equalsIgnoreCase(e.getMessage()))
1061
log.error("Unable to process control connection: " + e.getMessage(), e);
1062
if (++errorCount > 5)
1080
processInvocation(socket);
1084
log.error("Unable to create new ServerThread: " + e.getMessage(), e);
1091
class SecondaryServerSocketThread extends Thread
1093
private ServerSocket secondaryServerSocket;
1094
boolean running = true;
1096
SecondaryServerSocketThread(ServerSocket secondaryServerSocket) throws IOException
1098
this.secondaryServerSocket = secondaryServerSocket;
1113
Socket socket = null;
1116
socket = (Socket)AccessController.doPrivileged( new PrivilegedExceptionAction()
1118
public Object run() throws Exception
1120
return secondaryServerSocket.accept();
1124
catch (PrivilegedActionException e)
1126
throw (IOException) e.getCause();
1129
if (log.isTraceEnabled()) log.trace("accepted: " + socket);
1130
DataInputStream dis = new DataInputStream(socket.getInputStream());
1131
int action = dis.read();
1132
String listenerId = dis.readUTF();
1136
case Bisocket.CREATE_CONTROL_SOCKET:
1137
BisocketClientInvoker.transferSocket(listenerId, socket, true);
1138
if (log.isTraceEnabled())
1139
log.trace("SecondaryServerSocketThread: created control socket: (" + socket + ")"+ listenerId);
1142
case Bisocket.RECREATE_CONTROL_SOCKET:
1143
BisocketClientInvoker invoker = BisocketClientInvoker.getBisocketCallbackClientInvoker(listenerId);
1144
if (invoker == null)
1146
log.debug("received new control socket for unrecognized listenerId: " + listenerId);
1150
invoker.replaceControlSocket(socket);
1151
if (log.isTraceEnabled())
1152
log.trace("SecondaryServerSocketThread: recreated control socket: " + listenerId);
1156
case Bisocket.CREATE_ORDINARY_SOCKET:
1157
BisocketClientInvoker.transferSocket(listenerId, socket, false);
1158
if (log.isTraceEnabled())
1159
log.trace("SecondaryServerSocketThread: transferred socket: " + listenerId);
1163
log.error("unrecognized action on SecondaryServerSocketThread: " + action);
1166
catch (IOException e)
1169
log.error("Failed to accept socket connection", e);
1177
ServerSocket getServerSocket()
1179
return secondaryServerSocket;
1184
static class ControlMonitorTimerTask extends TimerTask
1186
private boolean running = true;
1187
private BisocketServerInvoker invoker;
1188
private Map listenerIdToInvokerLocatorMap;
1189
private Map controlConnectionThreadMap;
1190
private Map controlConnectionRestartsMap;
1191
private int controlConnectionRestarts;
1193
ControlMonitorTimerTask(BisocketServerInvoker invoker)
1195
this.invoker = invoker;
1196
listenerIdToInvokerLocatorMap = invoker.listenerIdToInvokerLocatorMap;
1197
controlConnectionThreadMap = invoker.controlConnectionThreadMap;
1198
controlConnectionRestartsMap = invoker.controlConnectionRestartsMap;
1199
controlConnectionRestarts = invoker.controlConnectionRestarts;
1202
synchronized void shutdown()
1204
// Note that there is a race between shutdown() and run(). But if run()
1205
// were synchronized, then shutdown() could be held up waiting on network
1206
// i/o, including invocations on a server that no longer is accessible.
1207
// So only minimal synchronization is imposed on run(), enough to avoid
1208
// NullPointerExceptions.
1212
listenerIdToInvokerLocatorMap = null;
1213
controlConnectionThreadMap = null;
1218
Method purge = getDeclaredMethod(Timer.class, "purge", new Class[]{});
1219
purge.invoke(timer, new Object[]{});
1223
log.debug("running with jdk 1.4: unable to purge Timer");
1232
if (log.isTraceEnabled())
1233
log.trace("checking connections");
1235
Collection controlConnectionThreads = null;
1241
controlConnectionThreads = new HashSet(controlConnectionThreadMap.values());
1244
Iterator it = controlConnectionThreads.iterator();
1245
while (it.hasNext())
1247
final ControlConnectionThread t = (ControlConnectionThread) it.next();
1248
final String listenerId = t.getListenerId();
1249
final Object locator;
1256
locator = listenerIdToInvokerLocatorMap.get(listenerId);
1259
if (!t.checkConnection())
1268
controlConnectionThreadMap.remove(listenerId);
1269
Object o = controlConnectionRestartsMap.get(listenerId);
1270
int restarts = ((Integer)o).intValue();
1272
if (restarts + 1 > controlConnectionRestarts)
1274
log.warn(this + ": detected failure on control connection " + t);
1275
log.warn("Control connection " + listenerId + " has been recreated " + restarts + " times.");
1276
log.warn("Assuming it is a connection to an old server, and will not restart");
1277
controlConnectionRestartsMap.remove(listenerId);
1281
log.warn(this + ": detected failure on control connection " + t +
1283
": requesting new control connection");
1286
Thread t2 = new Thread()
1295
invoker.createControlConnection(listenerId, false);
1297
catch (ClientUnavailableException e)
1299
log.debug("Unable to recreate control connection: " + locator, e);
1301
catch (IOException e)
1304
log.error("Unable to recreate control connection: " + locator, e);
1306
log.debug("Unable to recreate control connection: " + locator, e);
1310
t2.setName("controlConnectionRecreate:" + t.getName());
1317
static class ClientUnavailableException extends IOException
1319
private static final long serialVersionUID = 2846502029152028732L;
1322
static private Method getDeclaredMethod(final Class c, final String name, final Class[] parameterTypes)
1323
throws NoSuchMethodException
1325
if (SecurityUtility.skipAccessControl())
1327
Method m = c.getDeclaredMethod(name, parameterTypes);
1328
m.setAccessible(true);
1334
return (Method) AccessController.doPrivileged( new PrivilegedExceptionAction()
1336
public Object run() throws NoSuchMethodException
1338
Method m = c.getDeclaredMethod(name, parameterTypes);
1339
m.setAccessible(true);
1344
catch (PrivilegedActionException e)
1346
throw (NoSuchMethodException) e.getCause();
b'\\ No newline at end of file'