2
* JBoss, Home of Professional Open Source
3
* Copyright 2005, JBoss Inc., and individual contributors as indicated
4
* by the @authors tag. See the copyright.txt in the distribution for a
5
* full listing of individual contributors.
7
* This is free software; you can redistribute it and/or modify it
8
* under the terms of the GNU Lesser General Public License as
9
* published by the Free Software Foundation; either version 2.1 of
10
* the License, or (at your option) any later version.
12
* This software is distributed in the hope that it will be useful,
13
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15
* Lesser General Public License for more details.
17
* You should have received a copy of the GNU Lesser General Public
18
* License along with this software; if not, write to the Free
19
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
23
package org.jboss.remoting.transport.multiplex;
25
import org.jboss.logging.Logger;
26
import org.jboss.remoting.transport.multiplex.utility.StoppableThread;
28
import java.io.BufferedInputStream;
29
import java.io.EOFException;
30
import java.io.IOException;
31
import java.io.InputStream;
32
import java.io.OutputStream;
33
import java.net.Socket;
34
import java.nio.ByteBuffer;
35
import java.nio.channels.ClosedSelectorException;
36
import java.nio.channels.SelectableChannel;
37
import java.nio.channels.SelectionKey;
38
import java.nio.channels.Selector;
39
import java.nio.channels.SocketChannel;
40
import java.util.Collections;
41
import java.util.HashMap;
42
import java.util.HashSet;
43
import java.util.Iterator;
47
import javax.net.ssl.SSLException;
50
* <code>InputMultiplexor</code> is one of the key Multiplex classes, responsible for
51
* demultiplexing multiple byte streams sharing a single TCP connection. It has two
52
* inner classes which can perform this function. <code>MultiGroupInputThread</code> can perform
53
* demultiplexing for any number of NIO sockets, taking advantage of the <code>Selector</code>
54
* facility. For non-NIO sockets, notably SSL sockets, <code>SingleGroupInputThread</code>
55
* handles demultiplexing for a single socket.
57
* The data stream, created at the other end of the TCP connection by the
58
* <code>OutputMultiplexor</code> class, consists of a sequence of packets, each consisting of
59
* a header, giving version, destination virtual socket, and number of bytes. followed
60
* by the specified number of data bytes. (See <code>OutputMultiplexor</code> for the
62
* Each of the demultiplexing thread classes reads a header and transfers the
63
* following bytes to the input stream of the target virtual socket.
68
* @author <a href="mailto:r.sigal@computer.org">Ron Sigal</a>
70
* @deprecated As of release 2.4.0 the multiplex transport will no longer be actively supported.
72
public class InputMultiplexor
74
protected static final Logger log = Logger.getLogger(InputMultiplexor.class);
75
private static final int HEADER_LENGTH = 7;
77
private int bufferSize;
78
private int maxErrors;
81
public InputMultiplexor(Map configuration)
84
= Multiplex.getOneParameter(configuration,
86
Multiplex.INPUT_BUFFER_SIZE,
87
Multiplex.INPUT_BUFFER_SIZE_DEFAULT);
90
= Multiplex.getOneParameter(configuration,
92
Multiplex.INPUT_MAX_ERRORS,
93
Multiplex.INPUT_MAX_ERRORS_DEFAULT);
98
* Returns a <code>MultiGroupInputThread</code> designed to handle multiple virtual socket groups.
99
* @param configuration
100
* @return a <code>MultiGroupInputThread</code> designed to handle multiple virtual socket groups
102
public MultiGroupInputThread getaMultiGroupInputThread() throws IOException
104
return new MultiGroupInputThread();
109
* Returns a <code>SingleGroupInputThread</code> designed to handle a single virtual socket group.
110
* @return a <code>SingleGroupInputThread</code> designed to handle a single virtual socket group
112
public SingleGroupInputThread getaSingleGroupInputThread(MultiplexingManager manager, Socket socket, OutputStream os) throws IOException
114
return new SingleGroupInputThread(manager, socket, os);
118
public class MultiGroupInputThread extends StoppableThread
120
private static final String errMsg1 = "An existing connection was forcibly closed by the remote host";
121
private static final String errMsg2 = "An established connection was aborted by the software in your host machine";
123
private Map managerProcessorMap;
124
private Set socketGroupsToBeRegistered = new HashSet();
125
private Set tempSocketGroupSet = new HashSet();
126
private boolean socketGroupsAreWaiting;
127
private Selector selector;
128
private ByteBuffer buffer;
131
private boolean trace;
132
private boolean debug;
133
private boolean info;
136
public MultiGroupInputThread() throws IOException
138
managerProcessorMap = Collections.synchronizedMap(new HashMap());
139
selector = Selector.open();
140
buffer = ByteBuffer.allocate(bufferSize);
141
data = new byte[bufferSize];
143
trace = log.isTraceEnabled();
144
debug = log.isDebugEnabled();
145
info = log.isInfoEnabled();
150
* Registers manager and socket with NIO Selector
151
* @param manager <code>MultiplexingManager</code>
153
* @throws <code>IOException</code>
155
public void registerSocketGroup(MultiplexingManager manager) throws IOException
157
if (debug) log.debug(" accepting socket group for registration: " + manager);
159
synchronized (socketGroupsToBeRegistered)
161
socketGroupsToBeRegistered.add(manager);
162
socketGroupsAreWaiting = true;
167
protected void doRegistration()
169
tempSocketGroupSet.clear();
170
synchronized(socketGroupsToBeRegistered)
172
tempSocketGroupSet.addAll(socketGroupsToBeRegistered);
173
socketGroupsToBeRegistered.clear();
174
socketGroupsAreWaiting = false;
177
Iterator it = tempSocketGroupSet.iterator();
180
MultiplexingManager manager = (MultiplexingManager) it.next();
181
GroupProcessor groupProcessor = new GroupProcessor(manager);
182
SelectableChannel channel = manager.getSocket().getChannel();
186
SelectionKey key = channel.register(selector, SelectionKey.OP_READ, groupProcessor);
187
groupProcessor.setKey(key);
188
managerProcessorMap.put(manager, groupProcessor);
190
catch (IOException e)
192
// channel might be closed.
200
* Removes references to virtual socket group.
203
public void unregisterSocketGroup(MultiplexingManager manager)
205
// Leave GroupProcessor in Map until SelectionKey is cancelled.
206
GroupProcessor groupProcessor = (GroupProcessor) managerProcessorMap.get(manager);
207
if(groupProcessor == null)
209
log.debug("attempting to unregister unknown MultiplexingManager: " + manager);
213
SelectionKey key = groupProcessor.getKey();
215
managerProcessorMap.remove(manager);
216
if (debug) log.debug("unregistered socket group:" + manager);
220
public void shutdown()
222
// in case thread is still reading
228
catch (IOException e)
230
log.error("unable to close selector", e);
236
protected void doInit()
238
log.debug("MultiGroupInputThread thread starting");
242
protected void doRun()
244
log.debug("entering doRun()");
254
if (socketGroupsAreWaiting)
257
selector.select(200);
258
keys = selector.selectedKeys();
264
catch (IOException e)
268
catch (ClosedSelectorException e)
270
log.info("Selector is closed: shutting down input thread");
277
log.trace("keys: " + selector.keys().size());
278
log.trace("selected keys: " + keys.size());
281
Iterator it = keys.iterator();
284
SelectionKey key = (SelectionKey) it.next();
286
GroupProcessor groupProcessor = (GroupProcessor) key.attachment();
288
if (groupProcessor == null)
291
log.error("valid SelectionKey has no attachment: " + key);
296
groupProcessor.processChannel(key);
301
protected void doShutDown()
303
log.debug("MultiGroupInputThread shutting down");
310
private byte[] b = new byte[HEADER_LENGTH];
311
private int headerCount;
312
private byte version;
313
private int destination;
316
private MultiplexingManager manager;
317
private OutputStream outputStream;
318
private SelectionKey key;
319
private int errorCount;
322
public GroupProcessor(MultiplexingManager manager)
324
this.manager = manager;
327
public void processChannel(SelectionKey key)
329
log.debug("processChannel()");
330
SocketChannel channel = (SocketChannel) key.channel();
335
if (channel.read(buffer) < 0)
336
throw new EOFException();
341
log.debug("read: " + buffer.remaining());
343
while (buffer.hasRemaining())
345
if (headerCount < HEADER_LENGTH || size == 0)
347
// then prepare to process next virtual stream.
348
completeHeader(buffer);
350
if (headerCount < HEADER_LENGTH)
353
SocketId socketId = new SocketId(destination);
354
outputStream = manager.getOutputStreamByLocalSocket(socketId);
355
if (outputStream == null)
357
// We'll get an OutputStream to stash these bytes, just in case they
358
// are coming from a valid source and the local VirtualSocket is still
360
log.info("unknown socket id: " + destination);
361
outputStream = manager.getConnectedOutputStream(socketId);
364
if (!buffer.hasRemaining())
368
int n = Math.min(size, buffer.remaining());
369
buffer.get(data, 0, n);
370
outputStream.write(data, 0, n);
374
log.trace("received " + n + " bytes for socket: " + destination);
375
for (int i = 0; i < n; i++)
376
log.trace("" + (0xff & data[i]));
384
catch (IOException e)
386
handleChannelException(e, key, channel);
390
log.error("doRun()");
395
public SelectionKey getKey()
400
public void setKey(SelectionKey key)
405
private void completeHeader(ByteBuffer bb) throws IOException
407
int n = Math.min(bb.remaining(), HEADER_LENGTH - headerCount);
408
bb.get(b, headerCount, n);
411
if (headerCount == HEADER_LENGTH)
414
destination = (b[1] << 24) | (0x00ff0000 & (b[2] << 16)) |
415
(0x0000ff00 & (b[3] << 8)) | (0x000000ff & b[4]);
416
size = (short) ((0x0000ff00 & (b[5] << 8)) | (0x000000ff & b[6]));
419
if (size < 0 || bufferSize < size)
420
throw new CorruptedStreamException("invalid chunk size read on: " + manager + ": "+ size);
423
throw new CorruptedStreamException("invalid version read on: " + manager + ": " + version);
427
private void handleChannelException(IOException e, SelectionKey key, SocketChannel channel)
431
if (!channel.isOpen())
437
if (e instanceof EOFException)
445
if (e instanceof SSLException)
452
if (++errorCount > maxErrors)
454
manager.setReadException(e);
458
log.error("error count exceeds max errors: " + errorCount);
462
Socket socket = channel.socket();
463
String message = e.getMessage();
465
if (socket.isClosed() || socket.isInputShutdown() ||
466
errMsg1.equals(message) || errMsg2.equals(message) ||
467
e instanceof CorruptedStreamException)
469
manager.setReadException(e);
476
// Haven't reached maxErrors yet
479
catch (IOException e2)
481
log.error("problem closing channel: " + manager, e2);
485
public int getDestination() {return destination;}
486
public short getSize() {return size;}
487
public byte getVersion() {return version;}
488
public OutputStream getOutputStream() {return outputStream;}
493
class SingleGroupInputThread extends StoppableThread
495
private InputStream is;
496
private OutputStream currentOutputStream;
497
private byte[] dataBytes = new byte[bufferSize];
498
private MultiplexingManager manager;
499
private int dataInCount = 0;
500
private int errorCount;
504
private byte[] headerBytes = new byte[HEADER_LENGTH];
505
private int headerCount;
506
private byte version;
507
private int destination;
510
private boolean trace;
511
private boolean debug;
512
private boolean info;
515
public SingleGroupInputThread(MultiplexingManager manager, Socket socket, OutputStream os)
518
this.is = new BufferedInputStream(socket.getInputStream());
519
this.manager = manager;
520
currentOutputStream = os;
522
trace = log.isTraceEnabled();
523
debug = log.isDebugEnabled();
524
info = log.isInfoEnabled();
528
public void shutdown()
530
// in case thread is still reading
532
log.info("interrupting input thread");
540
protected void doInit()
542
log.debug("SingleGroupInputThread thread starting");
549
protected void doRun()
554
if (!completeHeader())
560
SocketId socketId = new SocketId(destination);
561
currentOutputStream = manager.getOutputStreamByLocalSocket(socketId);
562
if (currentOutputStream == null)
564
// We'll get an OutputStream to stash these bytes, just in case they
565
// are coming from a valid source and the local VirtualSocket is still
567
log.info("unknown socket id: " + destination);
568
currentOutputStream = manager.getConnectedOutputStream(socketId);
572
while (bytesRead < size)
574
int n = is.read(dataBytes, 0, size - bytesRead);
581
currentOutputStream.write(dataBytes, 0, n);
586
for (int i = 0; i < n; i++)
587
log.trace("" + dataBytes[i]);
591
catch (SSLException e)
593
log.debug(e.getMessage());
595
catch (EOFException e)
598
log.info("end of file");
600
catch (IOException e)
602
if (++errorCount > maxErrors)
604
manager.setReadException(e);
622
private boolean completeHeader() throws IOException
624
while (headerCount < HEADER_LENGTH)
626
int n = is.read(headerBytes, headerCount, HEADER_LENGTH - headerCount);
635
// Reset for next header.
638
version = headerBytes[0];
639
destination = (headerBytes[1] << 24) | (0x00ff0000 & (headerBytes[2] << 16)) |
640
(0x0000ff00 & (headerBytes[3] << 8)) | (0x000000ff & headerBytes[4]);
641
size = (short) ((0x0000ff00 & (headerBytes[5] << 8)) | (0x000000ff & headerBytes[6]));
645
log.trace("version: " + version);
646
log.trace("destination: " + destination);
647
log.trace("size: " + size);
650
if (size < 0 || bufferSize < size)
651
throw new CorruptedStreamException("invalid chunk size read on: " + manager + ": "+ size);
654
throw new CorruptedStreamException("invalid version read on: " + manager + ": " + version);
660
protected void doShutDown()
662
log.debug("input thread: data bytes read: " + dataInCount);
663
log.debug("input thread shutting down");
668
private static class CorruptedStreamException extends IOException
670
CorruptedStreamException(String message) {super(message);}
b'\\ No newline at end of file'