2
* JBoss, Home of Professional Open Source
3
* Copyright 2005, JBoss Inc., and individual contributors as indicated
4
* by the @authors tag. See the copyright.txt in the distribution for a
5
* full listing of individual contributors.
7
* This is free software; you can redistribute it and/or modify it
8
* under the terms of the GNU Lesser General Public License as
9
* published by the Free Software Foundation; either version 2.1 of
10
* the License, or (at your option) any later version.
12
* This software is distributed in the hope that it will be useful,
13
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15
* Lesser General Public License for more details.
17
* You should have received a copy of the GNU Lesser General Public
18
* License along with this software; if not, write to the Free
19
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
23
package org.jboss.remoting.transport.multiplex;
25
import java.io.EOFException;
26
import java.io.IOException;
27
import java.net.SocketException;
28
import java.util.HashSet;
29
import java.util.Iterator;
32
import org.jboss.logging.Logger;
33
import org.jboss.remoting.transport.multiplex.utility.GrowablePipedInputStream;
34
import org.jboss.remoting.transport.multiplex.utility.GrowablePipedOutputStream;
35
import org.jboss.remoting.transport.multiplex.utility.VirtualSelector;
38
* <code>MultiplexingInputStream</code> is the class returned by
39
* <code>VirtualSocket.getInputStream()</code>.
40
* It supports the methods and behavior implemented by the <code>InputStream</code> returned by
41
* <code>java.net.Socket.getInputStream()</code>. For more information about the behavior
42
* of the methods, see the javadoc for <code>java.io.InputStream</code>.
46
* @author <a href="mailto:r.sigal@computer.org">Ron Sigal</a>
48
* @deprecated As of release 2.4.0 the multiplex transport will no longer be actively supported.
50
public class MultiplexingInputStream extends GrowablePipedInputStream
52
protected static final Logger log = Logger.getLogger(MultiplexingInputStream.class);
53
private VirtualSocket socket;
54
private boolean eof = false;
55
private boolean closed = false;
56
private boolean remoteShutDownPending = false;
57
private Set readingThreads = new HashSet();
58
private IOException readException;
59
private long skipCount = 0;
60
private boolean tracing;
67
public MultiplexingInputStream(GrowablePipedOutputStream sourceStream, MultiplexingManager manager)
70
this(sourceStream, manager, null, null);
80
public MultiplexingInputStream(GrowablePipedOutputStream sourceStream,
81
MultiplexingManager manager,
85
this(sourceStream, manager, socket, null);
93
* @param virtualSelector
96
public MultiplexingInputStream(GrowablePipedOutputStream sourceStream,
97
MultiplexingManager manager,
99
VirtualSelector virtualSelector)
102
super(sourceStream, virtualSelector);
103
this.socket = socket;
104
tracing = log.isTraceEnabled();
108
//////////////////////////////////////////////////////////////////////////////////////////////////
109
/// The following methods are required of all InputStreams '///
110
//////////////////////////////////////////////////////////////////////////////////////////////////
112
/*************************************************************************************************
113
ok: public int read() throws IOException;
114
ok: public int read(byte b[]) throws IOException;
115
ok: public int read(byte b[], int off, int len) throws IOException;
116
ok: public long skip(long n) throws IOException;
117
ok: public int available() throws IOException;
118
ok: public void close() throws IOException;
119
ok: public void mark(int readlimit);
120
ok: public void reset() throws IOException;
121
ok: public boolean markSupported();
122
*************************************************************************************************/
125
* See superclass javadoc.
127
public void close() throws IOException
132
log.debug("MultiplexingInputStream closing");
139
// If a thread is currently in read(), interrupt it.
140
interruptReadingThreads();
145
* See superclass javadoc.
147
public synchronized int read() throws IOException
153
throw new SocketException("Socket closed");
155
if (readException != null)
163
// We leave a reference to the current thread so that close() and handleRemoteShutdown()
164
// can interrupt it if necessary.
165
readingThreads.add(Thread.currentThread());
166
int b = super.read();
167
readingThreads.remove(Thread.currentThread());
170
log.trace("read(): super.read() returned: " + b);
172
if (remoteShutDownPending && available() == 0)
177
catch (IOException e)
179
readingThreads.remove(Thread.currentThread());
182
throw new SocketException("Socket closed");
187
if (readException != null)
196
* See superclass javadoc.
198
public int read(byte[] bytes) throws IOException
200
return read(bytes, 0, bytes.length);
205
* See superclass javadoc.
207
public synchronized int read(byte[] bytes, int off, int len) throws IOException
209
log.trace("entering read()");
215
throw new SocketException("Socket closed");
217
if (readException != null)
225
// We leave a reference to the current thread so that handleRemoteShutdown() can
226
// interrupt it if necessary.
227
readingThreads.add(Thread.currentThread());
228
int n = super.read(bytes, off, len);
229
readingThreads.remove(Thread.currentThread());
232
log.trace("super.read() returned " + n + " bytes: "
233
+ "[" + (0xff & bytes[off]) + ".." + (0xff & bytes[off+n-1]) + "]");
235
if (remoteShutDownPending && available() == 0)
240
catch (IOException e)
242
readingThreads.remove(Thread.currentThread());
248
throw new SocketException("Socket closed");
256
* See superclass javadoc.
258
public synchronized long skip(long n) throws IOException
264
throw new SocketException("Socket closed");
266
if (readException != null)
276
readingThreads.add(Thread.currentThread());
278
while (skipped < n && (skipped == 0 || available() > 0))
286
readingThreads.remove(Thread.currentThread());
288
if (remoteShutDownPending && available() == 0)
293
catch (IOException e)
295
readingThreads.remove(Thread.currentThread());
301
throw new SocketException("Socket closed");
308
//////////////////////////////////////////////////////////////////////////////////////////////////
309
/// The following methods are specific to MultiplexingInputStream '///
310
//////////////////////////////////////////////////////////////////////////////////////////////////
315
protected VirtualSocket getSocket()
322
* <code>handleRemoteShutdown()</code> is responsible for informing the <code>MultiplexingInputStream</code>
323
* that no more bytes will be coming from the remote <code>MultiplexingOutputStream</code> to which
324
* it is connected, because <code>shutdownOutput()</code> or <code>close()</code> has been called on the
325
* remote <code>VirtualSocket</code>. The result is that once all bytes sent by the remote socket have
326
* been consumed, all subsequent calls to read() will return -1 and all subsequent calls to skip() will
327
* return 0, indicating end of file has been reached.
329
protected synchronized void handleRemoteShutdown() throws IOException
332
* handleRemoteShutdown() needs to handle two cases correctly:
334
* Case 1. all bytes transmitted by the remote MultiplexingOutputStream have been consumed by the time
335
* handleRemoteShutdown() executes, and
337
* Case 2. not all bytes transmitted by the remote MultiplexingOutputStream have been consumed
338
* by the time handleRemoteShutdown() executes..
340
* Correctness argument:
344
* The bracketing facility implemented by OutputMultiplexor guarantees that all bytes
345
* transmitted by the remote MultiplexingOutputStream will arrive and be stored in this
346
* MultiplexingInputStream before the protocol message arrives that leads to
347
* handleRemoteShutdown() being called. Therefore, if available() == 0 is true upon entering
348
* handleRemoteShutdown(), all transmitted bytes have been consumed and it is correct to indicate
349
* that this MultiplexingInputStream is at end of file.
351
* Case 1a. No threads are currently in read() or skip():
353
* Calling setEOF() will guarantee that all subsequent calls to read() will return -1 and all
354
* subsequent calls to skip() will return 0.
356
* Case 1b. One or more threads are currently in read() or skip():
358
* Since all read() methods, skip(), and handleRemoteShutdown() are synchronized, the only way
359
* handleRemoteShutdown() can be executing is if all of the threads in read() and skip() are
360
* blocked on the wait() call in super.read(). Then all of the blocked threads are referenced in
361
* the Set readingThreads, and calling interruptReadingThreads will guarantee that they are
362
* interrupted, which will lead to their throwing an InterruptedIOException. Moreover, calling
363
* setEOF() will guarantee that all such threads will see eof == true in the exception and
364
* will return -1. If any of the threads made the call to read() by way of skip(), then the
365
* condition (skipped == 0 || available() > 0) was true when read() was called, and since we
366
* are assumeing available() == 0, then skipped == 0 must have been true. Therefore, when
367
* it gets -1 from the call to read(), skip() will return 0. Finally, calling setEOF() in
368
* handleRemoteShutdown() will guarantee that all subsequent calls to read() will return -1
369
* and all subsequent calls to skip() will return 0.
373
* Suppose, on the other hand, that available() == 0 is false. Then the only action taken by
374
* handleRemoteShutdown() is to set remoteShutdownPending to true, and as long as bytes are
375
* available, there is no obstacle to their being read or skipped.
377
* Fact. The last transmitted byte has been consumed if and only if available() == 0.
380
* The fact that handleRemoteShutdown() has been called implies that all bytes transmitted from the remote
381
* socket have already arrived and been stored in this MultiplexingInputStream, so the value returned
382
* by available() will decrease monotonically, and once available() == 0, the last transmitted byte has
386
* This direction is obvious.
388
* Now, if no thread ever requests the last available byte to be read or skipped, then all calls to read()
389
* or skip() following the call to handleRemoteShutdown() will execute with available() > 0, and there
390
* will be no impediment to their successful completion.
392
* Suppose, then, that some thread T makes a call to read() that retrieves the last available byte.
393
* Upon returning from super.read(), T will find (remoteShutdownPending && available() == 0) is true,
394
* and it will call setEOF(), which, by the Fact argued above, is a correct action. Since available()
395
* was > 0 when the T entered read(), T will never call wait() in super.read(), so no other thread will
396
* enter read() or skip() until T leaves read(). At that point any threads entering read() will find
397
* eof == true and will return -1, and any threads entering skip() will find eof == true and return 0.
399
* Finally, suppose that some thread T makes a call to skip() to skip the last available byte.
400
* T will eventually leave the while loop in skip() with available() == 0, and when it reaches
401
* the test for (remoteShutDownPending && available() == 0), it will call setEOF().
402
* Since available() was > 0 when the T entered skip(), T will never call wait() in super.read(),
403
* so no other thread will enter read() or skip() until T leaves skip(). At that point any threads
404
* entering read() will find eof == true and will return -1, and any threads entering skip() will
405
* find eof == true and return 0.
408
log.debug("entering handleRemoteShutdown()");
413
remoteShutDownPending = true;
415
if (available() == 0)
418
interruptReadingThreads();
421
log.debug("leaving handleRemoteShutdown()");
428
protected synchronized void interruptReadingThreads()
430
// If we obtained the lock, then either there are no threads in read() or skip(),
431
// or any such threads are blocked in super.read(), having executed wait().
432
Iterator it = readingThreads.iterator();
436
Thread t = (Thread) it.next();
444
* <code>readInt()</code> is borrowed from DataInputStream. It saves the extra expense of
445
* creating a DataInputStream
447
public final int readInt() throws IOException
454
if ((b1 | b2 | b3 | b4) < 0)
455
throw new EOFException();
457
return ((b1 << 24) + (b2 << 16) + (b3 << 8) + (b4 << 0));
463
protected void setEOF()
469
protected void setReadException(IOException e)
472
interruptReadingThreads();
478
protected synchronized void setSkip(long n)
485
* A MultiplexingInputStream may be created without reference to a VirtualSocket.
486
* (See MultiplexingManager.getAnOutputStream().) setSocket() allows the socket to
491
protected void setSocket(VirtualSocket socket)
493
this.socket = socket;
b'\\ No newline at end of file'