2
* JBoss, Home of Professional Open Source
3
* Copyright 2005, JBoss Inc., and individual contributors as indicated
4
* by the @authors tag. See the copyright.txt in the distribution for a
5
* full listing of individual contributors.
7
* This is free software; you can redistribute it and/or modify it
8
* under the terms of the GNU Lesser General Public License as
9
* published by the Free Software Foundation; either version 2.1 of
10
* the License, or (at your option) any later version.
12
* This software is distributed in the hope that it will be useful,
13
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15
* Lesser General Public License for more details.
17
* You should have received a copy of the GNU Lesser General Public
18
* License along with this software; if not, write to the Free
19
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
23
package org.jboss.remoting.transport.bisocket;
25
import java.io.IOException;
26
import java.io.OutputStream;
27
import java.lang.reflect.Method;
28
import java.net.Socket;
29
import java.security.AccessController;
30
import java.security.PrivilegedActionException;
31
import java.security.PrivilegedExceptionAction;
32
import java.util.Collections;
33
import java.util.HashMap;
34
import java.util.HashSet;
35
import java.util.Iterator;
36
import java.util.LinkedList;
39
import java.util.Timer;
40
import java.util.TimerTask;
42
import org.jboss.logging.Logger;
43
import org.jboss.remoting.Client;
44
import org.jboss.remoting.ConnectionFailedException;
45
import org.jboss.remoting.InvocationRequest;
46
import org.jboss.remoting.InvokerLocator;
47
import org.jboss.remoting.invocation.InternalInvocation;
48
import org.jboss.remoting.marshal.Marshaller;
49
import org.jboss.remoting.marshal.UnMarshaller;
50
import org.jboss.remoting.transport.BidirectionalClientInvoker;
51
import org.jboss.remoting.transport.socket.SocketClientInvoker;
52
import org.jboss.remoting.transport.socket.SocketWrapper;
53
import org.jboss.remoting.util.SecurityUtility;
55
import EDU.oswego.cs.dl.util.concurrent.Semaphore;
58
* The bisocket transport, an extension of the socket transport, is designed to allow
59
* a callback server to function behind a firewall. All connections are created by
60
* a Socket constructor or factory on the client side connecting to a ServerSocket on
61
* the server side. When a callback client invoker on the server side needs to
62
* open a connection to the callback server, it requests a connection by sending a
63
* request message over a control connection to the client side.
65
* Because all connections are created in one direction, the bisocket transport is
66
* asymmetric, in the sense that client invokers and server invokers behave differently
67
* on the client side and on the server side.
71
* @author <a href="mailto:ron.sigal@jboss.com">Ron Sigal</a>
73
public class BisocketClientInvoker
74
extends SocketClientInvoker
75
implements BidirectionalClientInvoker
77
// Logger shared by every instance of this class.
private static final Logger log = Logger.getLogger(BisocketClientInvoker.class);
78
// listenerId -> invoker; an entry is stored by transport() when it sees an ADDLISTENER request
// that carries a locator. Wrapped in Collections.synchronizedMap.
private static Map listenerIdToClientInvokerMap = Collections.synchronizedMap(new HashMap());
79
// listenerId -> callback invoker; an entry is stored by the two-argument constructor when the
// configuration contains a listener id. Wrapped in Collections.synchronizedMap.
private static Map listenerIdToCallbackClientInvokerMap = Collections.synchronizedMap(new HashMap());
80
// listenerId -> Set of sockets. Plain HashMap; the code in this file touches it inside
// synchronized (listenerIdToSocketsMap) blocks.
private static Map listenerIdToSocketsMap = new HashMap();
81
// listenerId -> Set of control sockets. Plain HashMap; the code in this file touches it inside
// synchronized (listenerIdToControlSocketsMap) blocks.
private static Map listenerIdToControlSocketsMap = new HashMap();
82
// Timer shared by all instances; PingTimerTasks are scheduled on it.
private static Timer timer;
83
// Monitor held around the blocks that create the shared timer and schedule tasks on it.
private static Object timerLock = new Object();
85
// Value found under Client.LISTENER_ID_KEY in the configuration (set by the constructor).
protected String listenerId;
87
// Passed to Timer.schedule() as both initial delay and period of the ping task; a value
// that is not greater than zero means no ping task is scheduled.
private int pingFrequency = Bisocket.PING_FREQUENCY_DEFAULT;
88
// Multiplier applied to pingFrequency to obtain pingWindow.
private int pingWindowFactor = Bisocket.PING_WINDOW_FACTOR_DEFAULT;
89
// Derived value: pingWindowFactor * pingFrequency. createSocket() uses twice this value as
// the time it keeps waiting after pings have been reported as failed.
private int pingWindow = pingWindowFactor * pingFrequency;
90
// Upper bound of the retry loop in getSecondaryLocator(); also copied by each PingTimerTask
// when it is constructed.
private int maxRetries = Bisocket.MAX_RETRIES_DEFAULT;
91
// Current control connection and the stream used to write control bytes to it.
private Socket controlSocket;
92
private OutputStream controlOutputStream;
93
// Monitor held while the control socket is replaced and while control bytes are written.
private Object controlLock = new Object();
94
// Ping task currently associated with this invoker, if any.
private PingTimerTask pingTimerTask;
95
// Set to true by the constructor when the configuration contains a listener id.
protected boolean isCallbackInvoker;
96
// Shared with PingTimerTask: the task resets flag to false when constructed and sets it to
// true when it logs that it is giving up on sending pings.
protected BooleanHolder pingFailed = new BooleanHolder(false);
103
static BisocketClientInvoker getBisocketClientInvoker(String listenerId)
105
return (BisocketClientInvoker) listenerIdToClientInvokerMap.get(listenerId);
109
static BisocketClientInvoker getBisocketCallbackClientInvoker(String listenerId)
111
return (BisocketClientInvoker) listenerIdToCallbackClientInvokerMap.get(listenerId);
115
static void removeBisocketClientInvoker(String listenerId)
117
listenerIdToClientInvokerMap.remove(listenerId);
121
/**
 * Hands a socket over to the set of sockets kept for a listener.
 *
 * The visible code looks up the Set stored under listenerId in either
 * listenerIdToControlSocketsMap or listenerIdToSocketsMap while holding that map's
 * monitor, can store a fresh HashSet under the key, and finally synchronizes on the set.
 *
 * NOTE(review): this listing is missing lines (the declaration of "sockets", the
 * conditions selecting the branches, and the body of the last synchronized block).
 * Presumably isControlSocket picks the map, the set is created only when absent, and the
 * socket is added to the set with a notification for waiting threads - confirm against
 * the complete source.
 *
 * @param listenerId      key identifying the listener
 * @param socket          socket being handed over
 * @param isControlSocket presumably true when socket is a control connection - confirm
 */
static void transferSocket(String listenerId, Socket socket, boolean isControlSocket)
127
// Control-socket registry: lookup and insertion happen under the map's own monitor.
synchronized (listenerIdToControlSocketsMap)
129
sockets = (Set) listenerIdToControlSocketsMap.get(listenerId);
132
sockets = new HashSet();
133
listenerIdToControlSocketsMap.put(listenerId, sockets);
139
// Ordinary-socket registry: same pattern against the other map.
synchronized (listenerIdToSocketsMap)
141
sockets = (Set) listenerIdToSocketsMap.get(listenerId);
144
sockets = new HashSet();
145
listenerIdToSocketsMap.put(listenerId, sockets);
150
// The set itself is the monitor used by the code that waits for sockets to arrive.
synchronized (sockets)
158
/**
 * Creates an invoker for the given locator.
 * NOTE(review): the body is not visible in this listing; presumably it delegates to the
 * two-argument constructor without a configuration map - confirm.
 *
 * @param locator locator of the server to connect to
 * @throws IOException declared by the signature
 */
public BisocketClientInvoker(InvokerLocator locator) throws IOException
164
/**
 * Creates an invoker and applies the bisocket-specific configuration entries.
 *
 * When the configuration contains Client.LISTENER_ID_KEY the instance is marked as a
 * callback invoker and registered in listenerIdToCallbackClientInvokerMap. The entries
 * Bisocket.PING_FREQUENCY, Bisocket.PING_WINDOW_FACTOR and Bisocket.MAX_RETRIES are read
 * as Strings holding integers; pingWindow is recomputed afterwards.
 *
 * NOTE(review): this listing is missing lines (null checks, try/catch lines and at least
 * one assignment in the MAX_RETRIES section), so the exact guards are not visible.
 * NOTE(review): "this" is published to a static map before the constructor has finished.
 *
 * @param locator locator of the server to connect to
 * @param config  configuration passed on to the superclass constructor
 * @throws IOException declared by the signature
 */
public BisocketClientInvoker(InvokerLocator locator, Map config) throws IOException
166
super(locator, config);
168
// "configuration" is not declared in this class; presumably inherited - confirm.
if (configuration != null)
170
listenerId = (String) configuration.get(Client.LISTENER_ID_KEY);
171
// A configured listener id marks this instance as a callback invoker.
if (listenerId != null)
173
isCallbackInvoker = true;
174
listenerIdToCallbackClientInvokerMap.put(listenerId, this);
175
log.debug(this + " :registered " + listenerId + " -> " + this);
178
// Look for the pingFrequency parameter.
179
Object val = configuration.get(Bisocket.PING_FREQUENCY);
184
int nVal = Integer.valueOf((String) val).intValue();
185
pingFrequency = nVal;
186
log.debug("Setting ping frequency to: " + pingFrequency);
190
// Presumably reached when the value cannot be parsed; the catch line is not visible.
log.warn("Could not convert " + Bisocket.PING_FREQUENCY +
191
" value of " + val + " to an int value.");
195
// Look for the pingWindowFactor parameter; only non-empty Strings are parsed.
val = configuration.get(Bisocket.PING_WINDOW_FACTOR);
196
if (val != null && val instanceof String && ((String) val).length() > 0)
200
pingWindowFactor = Integer.valueOf(((String) val)).intValue();
201
log.debug(this + " setting pingWindowFactor to " + pingWindowFactor);
203
catch (NumberFormatException e)
205
log.warn("Invalid format for " + "\"" + Bisocket.PING_WINDOW_FACTOR + "\": " + val);
208
// A non-null value that failed the String test above is rejected with a warning.
else if (val != null)
210
log.warn("\"" + Bisocket.PING_WINDOW_FACTOR + "\" must be specified as a String");
213
// Refresh the derived window now that both inputs may have changed.
pingWindow = pingWindowFactor * pingFrequency;
215
// Look for the maxRetries parameter.
val = configuration.get(Bisocket.MAX_RETRIES);
220
int nVal = Integer.valueOf((String) val).intValue();
222
// NOTE(review): the line that stores nVal is not visible here; confirm it assigns maxRetries.
log.debug("Setting retry limit: " + maxRetries);
226
log.warn("Could not convert " + Bisocket.MAX_RETRIES +
227
" value of " + val + " to an int value.");
233
/**
 * Accessor paired with setMaxRetries(int).
 * NOTE(review): the body is not visible in this listing; presumably it returns the
 * maxRetries field - confirm.
 *
 * @return presumably the current retry limit
 */
public int getMaxRetries()
239
public void setMaxRetries(int maxRetries)
241
this.maxRetries = maxRetries;
245
public int getPingFrequency()
247
return pingFrequency;
251
public void setPingFrequency(int pingFrequency)
253
this.pingFrequency = pingFrequency;
257
public int getPingWindowFactor()
259
return pingWindowFactor;
263
public void setPingWindowFactor(int pingWindowFactor)
265
this.pingWindowFactor = pingWindowFactor;
266
pingWindow = pingWindowFactor * pingFrequency;
270
/**
 * Connects this invoker.
 *
 * For a callback invoker the visible code obtains the set of control sockets registered
 * for listenerId, waits on it when it is empty, takes the first socket of the set as the
 * control socket, opens its output stream, schedules a PingTimerTask on the shared timer
 * when pingFrequency is greater than zero, and creates a private socket pool and
 * semaphore. The last visible statement delegates to super.handleConnect() under the
 * comment "Client on client side".
 *
 * NOTE(review): this listing is missing lines (declarations of "sockets" and "wait", the
 * wait call itself, try lines and the branch structure), so the exact control flow is not
 * visible here.
 *
 * @throws ConnectionFailedException when no control socket is available after waiting, or
 *         when the output stream of the control socket cannot be obtained
 */
protected void handleConnect() throws ConnectionFailedException
272
// Callback client on server side.
273
if (isCallbackInvoker)
277
synchronized (listenerIdToControlSocketsMap)
279
sockets = (Set) listenerIdToControlSocketsMap.get(listenerId);
282
sockets = new HashSet();
283
listenerIdToControlSocketsMap.put(listenerId, sockets);
287
// The set is the monitor on which this method waits for a control socket.
synchronized (sockets)
289
if (sockets.isEmpty())
292
long start = System.currentTimeMillis();
294
// "timeout" is not declared in this class; presumably inherited. Zero means the loop is
// not bounded by time.
while (timeout == 0 || wait > 0)
301
// NOTE(review): the interrupt is only logged; the interrupt status is not restored.
catch (InterruptedException e)
303
log.debug("unexpected interrupt");
305
// Time still available: the timeout minus the time elapsed since start.
wait = timeout - (System.currentTimeMillis() - start);
310
if (sockets.isEmpty())
311
throw new ConnectionFailedException("Timed out trying to create control socket");
313
// Use the first socket the iterator yields as the control socket.
Iterator it = sockets.iterator();
314
controlSocket = (Socket) it.next();
318
controlOutputStream = controlSocket.getOutputStream();
320
// NOTE(review): e1 is not attached to the exception thrown below.
catch (IOException e1)
322
throw new ConnectionFailedException("Unable to get control socket output stream");
324
log.debug("got control socket( " + listenerId + "): " + controlSocket);
326
// Pinging is enabled only for a positive frequency.
if (pingFrequency > 0)
328
pingTimerTask = new PingTimerTask(this);
330
synchronized (timerLock)
334
timer = new Timer(true);
338
timer.schedule(pingTimerTask, pingFrequency, pingFrequency);
340
// If the existing timer rejects the task, replace the timer and schedule again.
catch (IllegalStateException e)
342
log.debug("Unable to schedule TimerTask on existing Timer", e);
343
timer = new Timer(true);
344
timer.schedule(pingTimerTask, pingFrequency, pingFrequency);
350
// Bisocket callback client invoker doesn't share socket pools because of the danger
351
// that two distinct callback servers could have the same "artificial" port.
352
pool = new LinkedList();
353
log.debug("Creating semaphore with size " + maxPoolSize);
354
semaphore = new Semaphore(maxPoolSize);
358
// Client on client side.
359
super.handleConnect();
363
/**
 * Disconnects this invoker and removes the state registered for its listener.
 *
 * The visible code closes the control socket, removes the invoker from the static
 * registries, closes every SocketWrapper in the pool, removes the socket sets stored for
 * listenerId, synchronizes on the removed set of ordinary sockets (per the comment, to
 * wake up threads blocked in createSocket()), shuts down the ping task and calls
 * super.handleDisconnect().
 *
 * NOTE(review): this listing is missing lines (try lines, else lines, the declaration of
 * "sockets" and the body of the synchronized (sockets) block), so which statements belong
 * to which branch is not visible here.
 */
protected void handleDisconnect()
365
if (listenerId != null)
367
if (isCallbackInvoker)
369
if (controlSocket != null)
373
controlSocket.close();
375
// A failure to close the control socket is only logged.
catch (IOException e)
377
log.debug("unable to close control socket: " + controlSocket);
381
listenerIdToCallbackClientInvokerMap.remove(listenerId);
382
// Close every pooled socket wrapper.
for (Iterator it = pool.iterator(); it.hasNext();)
384
SocketWrapper socketWrapper = (SocketWrapper) it.next();
387
socketWrapper.close();
389
// Exceptions from closing a pooled socket are caught here; the handler body is not visible.
catch (Exception ignored)
396
listenerIdToClientInvokerMap.remove(listenerId);
397
super.handleDisconnect();
400
// Forget the control sockets registered for this listener.
synchronized (listenerIdToControlSocketsMap)
402
listenerIdToControlSocketsMap.remove(listenerId);
406
// Forget the ordinary sockets registered for this listener, keeping the removed set.
synchronized (listenerIdToSocketsMap)
408
sockets = (Set) listenerIdToSocketsMap.remove(listenerId);
411
// Wake up any threads blocked in createSocket().
414
synchronized (sockets)
420
if (pingTimerTask != null)
421
pingTimerTask.shutDown();
425
super.handleDisconnect();
430
protected Object transport(String sessionId, Object invocation, Map metadata,
431
Marshaller marshaller, UnMarshaller unmarshaller)
432
throws IOException, ConnectionFailedException, ClassNotFoundException
434
String listenerId = null;
435
if (invocation instanceof InvocationRequest)
437
InvocationRequest ir = (InvocationRequest) invocation;
438
Object o = ir.getParameter();
439
if (o instanceof InternalInvocation)
441
InternalInvocation ii = (InternalInvocation) o;
442
if (InternalInvocation.ADDLISTENER.equals(ii.getMethodName())
443
&& ir.getLocator() != null) // getLocator() == null for pull callbacks
445
Map requestPayload = ir.getRequestPayload();
446
listenerId = (String) requestPayload.get(Client.LISTENER_ID_KEY);
447
listenerIdToClientInvokerMap.put(listenerId, this);
448
BisocketServerInvoker callbackServerInvoker;
449
callbackServerInvoker = BisocketServerInvoker.getBisocketServerInvoker(listenerId);
450
callbackServerInvoker.createControlConnection(listenerId, true);
453
// Rather than handle the REMOVELISTENER case symmetrically, it is
454
// handled when a REMOVECLIENTLISTENER message is received by
455
// BisocketServerInvoker.handleInternalInvocation(). The reason is that
456
// if the Client executes removeListener() with disconnectTimeout == 0,
457
// no REMOVELISTENER message will be sent.
461
return super.transport(sessionId, invocation, metadata, marshaller, unmarshaller);
465
/**
 * Obtains a socket for this invoker.
 *
 * An invoker that is not a callback invoker delegates to super.createSocket(). For a
 * callback invoker the visible code writes Bisocket.CREATE_ORDINARY_SOCKET on the control
 * output stream and then takes a socket from the set registered for listenerId, waiting
 * in a loop until one is there, the invoker is disconnected, pings have been failing for
 * longer than twice pingWindow, or the timeout has expired.
 *
 * NOTE(review): this listing is missing lines (declaration of "sockets", try lines, the
 * wait call, the statements that hand back a found socket and the conditions in front of
 * the three throw statements), so the exact control flow is not visible here.
 *
 * @param address passed to super.createSocket() for non-callback invokers
 * @param port    passed to super.createSocket() for non-callback invokers
 * @param timeout used as given for non-callback invokers; otherwise replaced by getTimeout()
 * @return a socket; for callback invokers presumably one taken from the registered set
 * @throws IOException with the message "Connection is closed", "Unable to create socket"
 *         or "Timed out trying to create socket"; which condition selects which message
 *         is not visible in this listing
 */
protected Socket createSocket(String address, int port, int timeout) throws IOException
467
if (!isCallbackInvoker)
468
return super.createSocket(address, port, timeout);
472
// The caller-supplied timeout is discarded in favour of the invoker's own.
timeout = getTimeout();
479
synchronized (listenerIdToSocketsMap)
481
sockets = (Set) listenerIdToSocketsMap.get(listenerId);
485
sockets = new HashSet();
486
listenerIdToSocketsMap.put(listenerId, sockets);
490
// Writes to the control stream are serialized through controlLock.
synchronized (controlLock)
492
if (log.isTraceEnabled()) log.trace(this + " writing Bisocket.CREATE_ORDINARY_SOCKET on " + controlOutputStream);
495
controlOutputStream.write(Bisocket.CREATE_ORDINARY_SOCKET);
496
if (log.isTraceEnabled()) log.trace(this + " wrote Bisocket.CREATE_ORDINARY_SOCKET");
498
// Check whether a socket is already available before starting to wait.
synchronized (sockets)
500
if (!sockets.isEmpty())
502
Iterator it = sockets.iterator();
503
Socket socket = (Socket) it.next();
505
log.debug(this + " found socket (" + listenerId + "): " + socket);
510
// A failed write is only logged; the waiting loop below still runs.
catch (IOException e)
512
log.debug(this + " unable to write Bisocket.CREATE_ORDINARY_SOCKET", e);
516
// Two budgets are tracked from the same start time: the overall timeout and the window
// granted after pings have been flagged as failed.
long timeRemaining = timeout;
517
long pingFailedWindow = 2 * pingWindow;
518
long pingFailedTimeRemaining = pingFailedWindow;
519
long start = System.currentTimeMillis();
520
// Remembered so that a replaced control stream can be detected inside the loop.
OutputStream savedControlOutputStream = controlOutputStream;
522
// A timeout of zero means the loop is not bounded by the overall timeout.
while (isConnected() && (!pingFailed.flag || pingFailedTimeRemaining > 0) && (timeout == 0 || timeRemaining > 0))
524
synchronized (sockets)
530
// NOTE(review): the interrupt is only logged; the interrupt status is not restored.
catch (InterruptedException e)
532
log.debug(this + " unexpected interrupt");
535
if (!sockets.isEmpty())
537
Iterator it = sockets.iterator();
538
Socket socket = (Socket) it.next();
540
log.debug(this + " found socket (" + listenerId + "): " + socket);
545
// The control stream changed while waiting: send the request again on the new stream.
if (savedControlOutputStream != controlOutputStream)
547
savedControlOutputStream = controlOutputStream;
548
log.debug(this + " rewriting Bisocket.CREATE_ORDINARY_SOCKET on " + controlOutputStream);
551
controlOutputStream.write(Bisocket.CREATE_ORDINARY_SOCKET);
552
log.debug(this + " rewrote Bisocket.CREATE_ORDINARY_SOCKET");
554
catch (IOException e)
556
log.debug(this + " unable to rewrite Bisocket.CREATE_ORDINARY_SOCKET" + e.getMessage());
560
// Recompute both budgets from the time elapsed since start.
long elapsed = System.currentTimeMillis() - start;
562
timeRemaining = timeout - elapsed;
563
pingFailedTimeRemaining = pingFailedWindow - elapsed;
568
throw new IOException("Connection is closed");
573
throw new IOException("Unable to create socket");
576
throw new IOException("Timed out trying to create socket");
580
/**
 * Installs a new control socket for this invoker.
 *
 * Under controlLock the visible code closes the current control socket if there is one,
 * stores the new socket and its output stream, cancels the current ping task if there is
 * one, and schedules a new PingTimerTask on the shared timer when pingFrequency is
 * greater than zero.
 *
 * NOTE(review): this listing is missing lines (try lines and braces), so where the
 * synchronized block ends and how a failure of close() is handled is not visible here.
 *
 * @param socket the new control socket
 * @throws IOException declared by the signature; getOutputStream() on the new socket can raise it
 */
void replaceControlSocket(Socket socket) throws IOException
582
synchronized (controlLock)
584
if (controlSocket != null)
586
controlSocket.close();
589
log.debug(this + " replacing control socket: " + controlSocket);
590
controlSocket = socket;
591
log.debug(this + " control socket replaced by: " + socket);
592
controlOutputStream = controlSocket.getOutputStream();
593
log.debug("controlOutputStream replaced by: " + controlOutputStream);
596
// The old ping task is bound to the old output stream; cancel it before creating a new one.
if (pingTimerTask != null)
597
pingTimerTask.cancel();
599
if (pingFrequency > 0)
601
pingTimerTask = new PingTimerTask(this);
603
synchronized (timerLock)
607
timer = new Timer(true);
611
timer.schedule(pingTimerTask, pingFrequency, pingFrequency);
613
// If the existing timer rejects the task, replace the timer and schedule again.
catch (IllegalStateException e)
615
log.debug("Unable to schedule TimerTask on existing Timer", e);
616
timer = new Timer(true);
617
timer.schedule(pingTimerTask, pingFrequency, pingFrequency);
624
/**
 * Asks the server for its secondary locator.
 *
 * Builds an InternalInvocation named Bisocket.GET_SECONDARY_INVOKER_LOCATOR, wraps it in
 * an InvocationRequest and calls invoke() up to maxRetries times, returning the first
 * result cast to InvokerLocator.
 *
 * NOTE(review): the try/catch lines are not visible in this listing; presumably a failed
 * attempt stores its exception in savedException before the retry message is logged -
 * confirm.
 * NOTE(review): when maxRetries is not greater than zero the loop body never runs and
 * savedException is still null at the throw statement.
 *
 * @return the locator returned by the server
 * @throws Throwable declared by the signature; the last statement throws savedException
 */
InvokerLocator getSecondaryLocator() throws Throwable
626
InternalInvocation ii = new InternalInvocation(Bisocket.GET_SECONDARY_INVOKER_LOCATOR, null);
627
InvocationRequest r = new InvocationRequest(null, null, ii, null, null, null);
628
log.debug("getting secondary locator");
629
Exception savedException = null;
631
// Retry loop bounded by the configured retry limit.
for (int i = 0; i < maxRetries; i++)
635
Object o = invoke(r);
636
log.debug("secondary locator: " + o);
637
return (InvokerLocator) o;
642
log.debug("unable to get secondary locator: trying again");
646
throw savedException;
650
public InvokerLocator getCallbackLocator(Map metadata)
652
String transport = (String) metadata.get(Client.CALLBACK_SERVER_PROTOCOL);
653
String host = (String) metadata.get(Client.CALLBACK_SERVER_HOST);
654
String sPort = (String) metadata.get(Client.CALLBACK_SERVER_PORT);
660
port = Integer.parseInt(sPort);
662
catch (NumberFormatException e)
664
throw new RuntimeException("Can not set internal callback server port as configuration value (" + sPort + " is not a number.");
668
return new InvokerLocator(transport, host, port, "callback", metadata);
672
/**
 * Timer task that writes Bisocket.PING to the control output stream of an invoker.
 *
 * The task copies the lock, the output stream, the retry limit and the shared pingFailed
 * holder from the invoker when it is constructed, so replacing the invoker's control
 * stream later does not change the stream this task writes to.
 *
 * NOTE(review): this listing is missing lines (among them the signature of the method
 * containing the ping loop, try/catch lines and any use of "running" and "pingSent"), so
 * the control flow of shutDown() and of the ping loop is not fully visible here.
 */
static class PingTimerTask extends TimerTask
674
// Same monitor object as the invoker's controlLock.
private Object controlLock;
675
// Stream captured at construction time; set to null by shutDown().
private OutputStream controlOutputStream;
676
// Retry limit captured at construction time.
private int maxRetries;
677
// Passed to the warning that is logged when pinging is given up.
private Exception savedException;
678
private boolean running = true;
679
private boolean pingSent;
680
// Same holder object as the invoker's pingFailed field.
private BooleanHolder pingFailed;
682
// Captures the invoker's current control state and clears the shared failure flag.
PingTimerTask(BisocketClientInvoker invoker)
684
controlLock = invoker.controlLock;
685
controlOutputStream = invoker.controlOutputStream;
686
maxRetries = invoker.getMaxRetries();
687
pingFailed = invoker.pingFailed;
688
pingFailed.flag = false;
691
// Stops the task: drops the stream under controlLock and tries to purge the shared timer.
public void shutDown()
693
synchronized (controlLock)
695
controlOutputStream = null;
701
// Timer.purge() is looked up reflectively; the debug message below suggests this is done
// so that the class still loads on JDK 1.4.
Method purge = getDeclaredMethod(Timer.class, "purge", new Class[]{});
702
purge.invoke(timer, new Object[]{});
706
log.debug("running with jdk 1.4: unable to purge Timer");
714
// Ping loop; presumably part of run(), whose signature is not visible in this listing.
for (int i = 0; i < maxRetries; i++)
718
synchronized (controlLock)
723
controlOutputStream.write(Bisocket.PING);
731
log.debug("Unable to send ping: trying again");
740
// Giving up: report the failure and raise the flag that createSocket() checks.
log.warn("Unable to send ping: shutting down PingTimerTask", savedException);
741
pingFailed.flag = true;
747
/**
 * Mutable holder for a boolean, shared between an invoker and its PingTimerTask.
 * NOTE(review): the field declaration and the constructor body are not visible in this
 * listing; other code in this file reads and writes a member named "flag", and the
 * constructor presumably stores its argument there - confirm.
 */
static class BooleanHolder
751
// Presumably initializes the held value from the argument - confirm.
public BooleanHolder(boolean flag)
757
static private Method getDeclaredMethod(final Class c, final String name, final Class[] parameterTypes)
758
throws NoSuchMethodException
760
if (SecurityUtility.skipAccessControl())
762
Method m = c.getDeclaredMethod(name, parameterTypes);
763
m.setAccessible(true);
769
return (Method) AccessController.doPrivileged( new PrivilegedExceptionAction()
771
public Object run() throws NoSuchMethodException
773
Method m = c.getDeclaredMethod(name, parameterTypes);
774
m.setAccessible(true);
779
catch (PrivilegedActionException e)
781
throw (NoSuchMethodException) e.getCause();
b'\\ No newline at end of file'