2
* JBoss, Home of Professional Open Source
3
* Copyright 2005, JBoss Inc., and individual contributors as indicated
4
* by the @authors tag. See the copyright.txt in the distribution for a
5
* full listing of individual contributors.
7
* This is free software; you can redistribute it and/or modify it
8
* under the terms of the GNU Lesser General Public License as
9
* published by the Free Software Foundation; either version 2.1 of
10
* the License, or (at your option) any later version.
12
* This software is distributed in the hope that it will be useful,
13
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15
* Lesser General Public License for more details.
17
* You should have received a copy of the GNU Lesser General Public
18
* License along with this software; if not, write to the Free
19
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
24
* Created on Dec 15, 2005
27
package org.jboss.remoting.transport.multiplex.utility;
29
import java.io.IOException;
30
import java.io.InputStream;
31
import java.io.InterruptedIOException;
32
import java.net.SocketTimeoutException;
34
import org.jboss.logging.Logger;
38
* <code>GrowablePipedInputStream</code> is the parent of the
39
* <code>MultiplexingInputStream</code> returned by
40
* <code>VirtualSocket.getInputStream()</code>. <code>GrowablePipedInputStream</code> and
41
* <code>GrowablePipedOutputStream</code> work together like <code>java.io.PipedInputStream</code>
42
* and <code>java.io.PipedOutputStream</code>, so that
43
* calling <code>GrowablePipedOutputStream.write()</code> causes bytes to be deposited with the
44
* matching <code>GrowablePipedInputStream</code>. However, unlike <code>PipedInputStream</code>,
45
* <code>GrowablePipedInputStream</code> stores bytes in a
46
* <code>ShrinkableByteArrayOutputStream</code>, which
47
* can grow and contract dynamically in response to the number of bytes it contains.
50
* For more information about method behavior, see the <code>java.io.InputStream</code> javadoc.
52
* @author <a href="mailto:r.sigal@computer.org">Ron Sigal</a>
53
* @version $Revision: 3443 $
58
* @deprecated As of release 2.4.0 the multiplex transport will no longer be actively supported.
61
public class GrowablePipedInputStream extends InputStream
63
// Class-wide logger; protected, so subclasses can log through it as well.
protected static final Logger log = Logger.getLogger(GrowablePipedInputStream.class);
64
// Output stream this input stream is paired with (assigned in the constructors and in connect()).
private GrowablePipedOutputStream source;
65
// Buffer that receive() writes into and the read() methods take bytes from.
private ShrinkableByteArrayOutputStream baos = new ShrinkableByteArrayOutputStream();
66
// Optional selector; when non-null it is unregistered in close() and handed this
// stream via addToReadyInputStreams() after bytes are received.
private VirtualSelector virtualSelector;
67
// Connection flag. NOTE(review): no assignment to it is visible in this listing,
// and the timeout field used by the read methods is not declared here either —
// confirm both against the full source.
private boolean connected;
72
* Create a new <code>GrowablePipedInputStream</code>.
74
// No-argument constructor: neither an output stream nor a selector is passed in.
// NOTE(review): the body is not part of this listing — presumably empty; confirm.
public GrowablePipedInputStream()
79
* Create a new <code>GrowablePipedInputStream</code>.
80
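* @param virtualSelector the selector kept by this stream; it is unregistered from in close() and given this stream after bytes are received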
82
public GrowablePipedInputStream(VirtualSelector virtualSelector)
{
   // Only the selector is recorded here; this constructor attaches no output stream.
   this.virtualSelector = virtualSelector;
}
89
* Create a new <code>GrowablePipedInputStream</code>.
92
* @throws java.io.IOException
94
// Constructor taking the output stream to pair with; declared to throw IOException.
// NOTE(review): the body is not part of this listing — presumably it mirrors the
// two-argument constructor (store source, then source.connect(this)); confirm.
public GrowablePipedInputStream(GrowablePipedOutputStream source) throws IOException
103
* Create a new <code>GrowablePipedInputStream</code>.
104
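* @param virtualSelector the selector kept by this stream; it is unregistered from in close() and given this stream after bytes are received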
107
* @throws java.io.IOException
109
// Stores the output stream and the selector, then asks the output stream to
// connect to this input stream. An IOException from connect() propagates.
// NOTE(review): the listing may omit a statement after the connect() call — confirm.
public GrowablePipedInputStream(GrowablePipedOutputStream source, VirtualSelector virtualSelector) throws IOException
111
this.source = source;
112
this.virtualSelector = virtualSelector;
113
// Presumably lets the output side deliver bytes to this stream — confirm in GrowablePipedOutputStream.
source.connect(this);
118
public synchronized int available()
120
return baos.available();
124
// On close, removes this stream from its selector when one is attached.
// NOTE(review): a line before the null check is missing from this listing
// (possibly a super.close() call) — confirm against the full source.
public void close() throws IOException
127
if (virtualSelector != null)
128
virtualSelector.unregister(this);
132
// Counterpart of setTimeout(int).
// NOTE(review): the body is not part of this listing — presumably it returns the timeout field; confirm.
public int getTimeout()
138
// Reads a single byte from the buffer.
// When the buffer reports nothing available, the method enters a waiting section
// (see the trace messages). It leaves that section with SocketTimeoutException once
// a positive timeout has elapsed, or with InterruptedIOException when interrupted.
// NOTE(review): this listing omits several lines (the guard before the first throw,
// the loop and wait call, the statements controlled by two of the ifs, and the
// return) — confirm the details against the full source.
public synchronized int read() throws IOException
141
// Raised for an unconnected pipe; the guarding condition is not shown in this listing.
throw new IOException("Pipe not connected");
143
// Nothing buffered yet: remember when waiting began so the timeout can be checked.
if (baos.available() == 0)
145
long start = System.currentTimeMillis();
151
log.trace(this + ": entering wait()");
153
log.trace("leaving wait()");
155
// Bytes have arrived; the controlled statement is missing here (presumably leaves the wait loop).
if (baos.available() > 0)
158
// Only a positive timeout can expire; it is compared against elapsed milliseconds.
if (0 < timeout && timeout <= System.currentTimeMillis() - start)
159
throw new SocketTimeoutException("Read timed out");
161
// An interrupt is reported to the caller as an InterruptedIOException.
catch (InterruptedException ignored)
163
log.debug("interrupted");
164
throw new InterruptedIOException();
169
// Request one byte from the buffer; start() is used as the index of that byte in the
// returned array (presumably the array is shared with the buffer — confirm).
byte[] bytes = baos.toByteArray(1);
170
// Mask with 0xff so the value lies in 0..255 rather than being sign-extended.
int answer = 0xff & bytes[baos.start()];
172
// More bytes remain; the controlled statement is missing here (presumably a notify()).
if (baos.available() > 0)
179
public synchronized int read(byte[] bytes) throws IOException
181
return read(bytes, 0, bytes.length);
185
// Reads up to length bytes from the buffer into bytes, starting at offset.
// When the buffer reports nothing available, the method enters a waiting section
// (see the trace messages). It leaves that section with SocketTimeoutException once
// a positive timeout has elapsed, or with InterruptedIOException when interrupted.
// NOTE(review): this listing omits several lines (the guard before the first throw,
// the loop and wait call, the statements controlled by two of the ifs, and the
// return) — confirm the details against the full source.
public synchronized int read(byte[] bytes, int offset, int length) throws IOException
188
// Raised for an unconnected pipe; the guarding condition is not shown in this listing.
throw new IOException("Pipe not connected");
190
// Nothing buffered yet: remember when waiting began so the timeout can be checked.
if (baos.available() == 0)
192
long start = System.currentTimeMillis();
198
log.trace(this + ": entering wait()");
200
log.trace("leaving wait()");
202
// Bytes have arrived; the controlled statement is missing here (presumably leaves the wait loop).
if (baos.available() > 0)
205
// Only a positive timeout can expire; it is compared against elapsed milliseconds.
if (0 < timeout && timeout <= System.currentTimeMillis() - start)
206
throw new SocketTimeoutException("Read timed out");
208
// An interrupt is reported to the caller as an InterruptedIOException.
catch (InterruptedException ignored)
210
log.debug("interrupted");
211
throw new InterruptedIOException();
216
// Ask the buffer for at most length bytes; start() and bytesReturned() then describe
// where the data sits in the returned array and how many bytes were actually supplied.
byte[] localBytes = baos.toByteArray(length);
217
int from = baos.start();
218
int n = baos.bytesReturned();
219
// Copy the n supplied bytes into the caller's array at the requested offset.
System.arraycopy(localBytes, from, bytes, offset, n);
221
// More bytes remain; the controlled statement is missing here (presumably a notify()).
if (baos.available() > 0)
228
public void register(VirtualSelector virtualSelector, Object attachment)
230
this.virtualSelector = virtualSelector;
231
virtualSelector.register(this, attachment);
235
public void setTimeout(int timeout)
237
this.timeout = timeout;
241
// Records the given output stream as this stream's source.
// Throws IOException("Already connected") when the output stream reports that it
// is already connected.
// NOTE(review): the condition guarding the NullPointerException (presumably a null
// check on source) and possibly a trailing statement are missing from this listing — confirm.
protected void connect(GrowablePipedOutputStream source) throws IOException
244
throw new NullPointerException();
246
// Refuse to pair with an output stream that already has a connection.
if (source.isConnected())
247
throw new IOException("Already connected");
249
this.source = source;
254
// Connection-state query.
// NOTE(review): the body is not part of this listing — presumably it returns the connected field; confirm.
protected boolean isConnected()
260
// Receives a single value i and, when a selector is attached, passes this stream to
// the selector's addToReadyInputStreams().
// NOTE(review): the lines between the trace call and the selector check are missing
// from this listing — presumably they store i in the buffer and notify waiting
// readers, as the three-argument overload suggests; confirm.
protected void receive(int i) throws IOException
262
log.trace("entering receive()");
269
// The selector is optional; without one there is nothing further to do here.
if (virtualSelector != null)
270
virtualSelector.addToReadyInputStreams(this);
274
// Receives a byte array and, when a selector is attached, passes this stream to
// the selector's addToReadyInputStreams().
// NOTE(review): the lines between the trace call and the selector check are missing
// from this listing — presumably they store the bytes in the buffer and notify
// waiting readers, as the three-argument overload suggests; confirm.
protected void receive(byte[] bytes) throws IOException
276
log.trace("entering receive()");
283
// The selector is optional; without one there is nothing further to do here.
if (virtualSelector != null)
284
virtualSelector.addToReadyInputStreams(this);
288
// Receives length bytes of the given array, starting at offset: they are written into
// the buffer, and when a selector is attached this stream is passed to the selector's
// addToReadyInputStreams().
// NOTE(review): the lines around the write are missing from this listing — presumably
// a synchronized block and a notify() call following the "notifying" trace; confirm.
protected void receive(byte[] bytes, int offset, int length) throws IOException
290
log.trace(this + ": entering receive()");
293
// Append the received range to the buffer the read() methods take bytes from.
baos.write(bytes, offset, length);
294
log.trace(this + ": notifying");
298
// The selector is optional; without one there is nothing further to do here.
if (virtualSelector != null)
299
virtualSelector.addToReadyInputStreams(this);
b'\\ No newline at end of file'