~ubuntu-branches/ubuntu/raring/libjboss-remoting-java/raring

« back to all changes in this revision

Viewing changes to src/main/org/jboss/remoting/transport/multiplex/utility/GrowablePipedInputStream.java

  • Committer: Package Import Robot
  • Author(s): Torsten Werner
  • Date: 2011-09-09 14:01:03 UTC
  • mfrom: (1.1.6 upstream)
  • Revision ID: package-import@ubuntu.com-20110909140103-hqokx61534tas9rg
Tags: 2.5.3.SP1-1
* Newer but not newest upstream release. Do not build samples.
* Change debian/watch to upstream's svn repo.
* Add patch to fix compile error caused by tomcat update.
  (Closes: #628303)
* Switch to source format 3.0.
* Switch to debhelper level 7.
* Remove useless Depends.
* Update Standards-Version: 3.9.2.
* Update README.source.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
* JBoss, Home of Professional Open Source
 
3
* Copyright 2005, JBoss Inc., and individual contributors as indicated
 
4
* by the @authors tag. See the copyright.txt in the distribution for a
 
5
* full listing of individual contributors.
 
6
*
 
7
* This is free software; you can redistribute it and/or modify it
 
8
* under the terms of the GNU Lesser General Public License as
 
9
* published by the Free Software Foundation; either version 2.1 of
 
10
* the License, or (at your option) any later version.
 
11
*
 
12
* This software is distributed in the hope that it will be useful,
 
13
* but WITHOUT ANY WARRANTY; without even the implied warranty of
 
14
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 
15
* Lesser General Public License for more details.
 
16
*
 
17
* You should have received a copy of the GNU Lesser General Public
 
18
* License along with this software; if not, write to the Free
 
19
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
 
20
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
 
21
*/
 
22
 
 
23
/*
 
24
 * Created on Dec 15, 2005
 
25
 */
 
26
 
 
27
package org.jboss.remoting.transport.multiplex.utility;
 
28
 
 
29
import java.io.IOException;
 
30
import java.io.InputStream;
 
31
import java.io.InterruptedIOException;
 
32
import java.net.SocketTimeoutException;
 
33
 
 
34
import org.jboss.logging.Logger;
 
35
 
 
36
 
 
37
/**
 
38
 * <code>GrowablePipedInputStream</code> is the parent of the
 
39
 * <code>MultiplexingInputStream</code> returned by
 
40
 * <code>VirtualSocket.getInputStream()</code>.  <code>GrowablePipedInputStream</code> and
 
41
 * <code>GrowablePipedOutputStream</code> work together like <code>java.io.PipedInputStream</code>
 
42
 * and <code>java.io.PipedOutputStream</code>, so that
 
43
 * calling <code>GrowablePipedOutputStream.write()</code> causes bytes to be deposited with the
 
44
 * matching <code>GrowablePipedInputStream</code>.  However, unlike <code>PipedInputStream</code>,
 
45
 * <code>GrowablePipedInputStream</code> stores bytes in a 
 
46
 * <code>ShrinkableByteArrayOutputStream</code>, which
 
47
 * can grow and contract dynamically in response to the number of bytes it contains.
 
48
 *
 
49
 * <p>
 
50
 * For more information about method behavior, see the <code>java.io.InputStream</code> javadoc.
 
51
 * <p>
 
52
 * @author <a href="mailto:r.sigal@computer.org">Ron Sigal</a>
 
53
 * @version $Revision: 3443 $
 
54
 * <p>
 
55
 * Copyright (c) 2005
 
56
 * </p>
 
57
 * 
 
58
 * @deprecated As of release 2.4.0 the multiplex transport will no longer be actively supported.
 
59
 */
 
60
 
 
61
public class GrowablePipedInputStream extends InputStream
 
62
{
 
63
   protected static final Logger log = Logger.getLogger(GrowablePipedInputStream.class);
 
64
   private GrowablePipedOutputStream source;
 
65
   private ShrinkableByteArrayOutputStream baos = new ShrinkableByteArrayOutputStream();
 
66
   private VirtualSelector virtualSelector;
 
67
   private boolean connected;
 
68
   private int timeout;
 
69
 
 
70
   
 
71
/**
 
72
    * Create a new <code>GrowablePipedInputStream</code>.
 
73
    */
 
74
   public GrowablePipedInputStream()
 
75
   {
 
76
   }
 
77
   
 
78
/**
 
79
 * Create a new <code>GrowablePipedInputStream</code>.
 
80
 * @param virtualSelector 
 
81
 */
 
82
   public GrowablePipedInputStream(VirtualSelector virtualSelector)
 
83
   {
 
84
      this.virtualSelector = virtualSelector;
 
85
   }
 
86
 
 
87
 
 
88
/**
 
89
 * Create a new <code>GrowablePipedInputStream</code>.
 
90
 * @param src
 
91
 * 
 
92
 * @throws java.io.IOException
 
93
 */
 
94
   public GrowablePipedInputStream(GrowablePipedOutputStream source) throws IOException
 
95
   {
 
96
      this.source = source;
 
97
      source.connect(this);
 
98
      connected = true;
 
99
   }
 
100
   
 
101
      
 
102
/**
 
103
 * Create a new <code>GrowablePipedInputStream</code>.
 
104
 * @param virtualSelector 
 
105
 * @param src
 
106
 * 
 
107
 * @throws java.io.IOException
 
108
 */
 
109
   public GrowablePipedInputStream(GrowablePipedOutputStream source, VirtualSelector virtualSelector) throws IOException
 
110
   {
 
111
      this.source = source;
 
112
      this.virtualSelector = virtualSelector;
 
113
      source.connect(this);
 
114
      connected = true;
 
115
   }
 
116
   
 
117
 
 
118
   public synchronized int available()
 
119
   {
 
120
      return baos.available();
 
121
   }
 
122
   
 
123
   
 
124
   public void close() throws IOException
 
125
   {
 
126
      super.close();
 
127
      if (virtualSelector != null)
 
128
         virtualSelector.unregister(this);
 
129
   }
 
130
   
 
131
   
 
132
   public int getTimeout()
 
133
   {
 
134
      return timeout;
 
135
   }
 
136
   
 
137
   
 
138
   public synchronized int read() throws IOException
 
139
   {
 
140
      if (!connected)
 
141
         throw new IOException("Pipe not connected");
 
142
      
 
143
      if (baos.available() == 0)
 
144
      {
 
145
         long start = System.currentTimeMillis();
 
146
         
 
147
         while (true)
 
148
         {  
 
149
            try
 
150
            {
 
151
               log.trace(this + ": entering wait()");
 
152
               wait(timeout);
 
153
               log.trace("leaving wait()");
 
154
               
 
155
               if (baos.available() > 0)
 
156
                  break;
 
157
               
 
158
               if (0 < timeout && timeout <= System.currentTimeMillis() - start)
 
159
                  throw new SocketTimeoutException("Read timed out");
 
160
            }
 
161
            catch (InterruptedException ignored)
 
162
            {
 
163
               log.debug("interrupted");
 
164
               throw new InterruptedIOException();
 
165
            }
 
166
         }
 
167
      }
 
168
      
 
169
      byte[] bytes = baos.toByteArray(1);
 
170
      int answer =  0xff & bytes[baos.start()];
 
171
      
 
172
      if (baos.available() > 0)
 
173
         notify();
 
174
      
 
175
      return answer;
 
176
   }
 
177
   
 
178
   
 
179
   public synchronized int read(byte[] bytes) throws IOException
 
180
   {
 
181
      return read(bytes, 0, bytes.length);
 
182
   }
 
183
   
 
184
   
 
185
   public synchronized int read(byte[] bytes, int offset, int length) throws IOException
 
186
   {
 
187
      if (!connected)
 
188
         throw new IOException("Pipe not connected");
 
189
 
 
190
      if (baos.available() == 0)
 
191
      {
 
192
         long start = System.currentTimeMillis();
 
193
         
 
194
         while (true)
 
195
         {  
 
196
            try
 
197
            {
 
198
               log.trace(this + ": entering wait()");
 
199
               wait(timeout);
 
200
               log.trace("leaving wait()");
 
201
               
 
202
               if (baos.available() > 0)
 
203
                  break;
 
204
               
 
205
               if (0 < timeout && timeout <= System.currentTimeMillis() - start)
 
206
                  throw new SocketTimeoutException("Read timed out");
 
207
            }
 
208
            catch (InterruptedException ignored)
 
209
            {
 
210
               log.debug("interrupted");
 
211
               throw new InterruptedIOException();
 
212
            }
 
213
         }
 
214
      }
 
215
      
 
216
      byte[] localBytes = baos.toByteArray(length);
 
217
      int from = baos.start();
 
218
      int n = baos.bytesReturned();
 
219
      System.arraycopy(localBytes, from, bytes, offset, n);
 
220
      
 
221
      if (baos.available() > 0)
 
222
         notify();
 
223
      
 
224
      return n;
 
225
   }
 
226
   
 
227
   
 
228
   public void register(VirtualSelector virtualSelector, Object attachment)
 
229
   {
 
230
      this.virtualSelector = virtualSelector;
 
231
      virtualSelector.register(this, attachment);
 
232
   }
 
233
   
 
234
   
 
235
   public void setTimeout(int timeout)
 
236
   {
 
237
      this.timeout = timeout;
 
238
   }
 
239
   
 
240
   
 
241
   protected void connect(GrowablePipedOutputStream source) throws IOException
 
242
   {
 
243
      if (source == null)
 
244
         throw new NullPointerException();
 
245
      
 
246
      if (source.isConnected())
 
247
         throw new IOException("Already connected");
 
248
      
 
249
      this.source = source;
 
250
      connected = true;
 
251
   }
 
252
   
 
253
   
 
254
   protected boolean isConnected()
 
255
   {
 
256
      return connected;
 
257
   }
 
258
   
 
259
   
 
260
   protected void receive(int i) throws IOException
 
261
   {
 
262
      log.trace("entering receive()");
 
263
      synchronized (this)
 
264
      {
 
265
         baos.write(i);
 
266
         notify();
 
267
      }
 
268
      
 
269
      if (virtualSelector != null)
 
270
         virtualSelector.addToReadyInputStreams(this);
 
271
   }
 
272
   
 
273
   
 
274
   protected void receive(byte[] bytes) throws IOException
 
275
   {
 
276
      log.trace("entering receive()");
 
277
      synchronized (this)
 
278
      {
 
279
         baos.write(bytes);
 
280
         notify();
 
281
      }
 
282
      
 
283
      if (virtualSelector != null)
 
284
         virtualSelector.addToReadyInputStreams(this);
 
285
   }
 
286
   
 
287
   
 
288
   protected void receive(byte[] bytes, int offset, int length) throws IOException
 
289
   {
 
290
      log.trace(this + ": entering receive()");
 
291
      synchronized (this)
 
292
      {
 
293
         baos.write(bytes, offset, length);
 
294
         log.trace(this + ": notifying");
 
295
         notify();
 
296
      }
 
297
      
 
298
      if (virtualSelector != null)
 
299
         virtualSelector.addToReadyInputStreams(this);
 
300
   }
 
301
}
 
 
b'\\ No newline at end of file'