~ubuntu-branches/ubuntu/raring/libjboss-remoting-java/raring

« back to all changes in this revision

Viewing changes to src/main/org/jboss/remoting/transport/multiplex/MultiplexingInputStream.java

  • Committer: Package Import Robot
  • Author(s): Torsten Werner
  • Date: 2011-09-09 14:01:03 UTC
  • mfrom: (1.1.6 upstream)
  • Revision ID: package-import@ubuntu.com-20110909140103-hqokx61534tas9rg
Tags: 2.5.3.SP1-1
* Newer but not newest upstream release. Do not build samples.
* Change debian/watch to upstream's svn repo.
* Add patch to fix compile error caused by tomcat update.
  (Closes: #628303)
* Switch to source format 3.0.
* Switch to debhelper level 7.
* Remove useless Depends.
* Update Standards-Version: 3.9.2.
* Update README.source.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
* JBoss, Home of Professional Open Source
 
3
* Copyright 2005, JBoss Inc., and individual contributors as indicated
 
4
* by the @authors tag. See the copyright.txt in the distribution for a
 
5
* full listing of individual contributors.
 
6
*
 
7
* This is free software; you can redistribute it and/or modify it
 
8
* under the terms of the GNU Lesser General Public License as
 
9
* published by the Free Software Foundation; either version 2.1 of
 
10
* the License, or (at your option) any later version.
 
11
*
 
12
* This software is distributed in the hope that it will be useful,
 
13
* but WITHOUT ANY WARRANTY; without even the implied warranty of
 
14
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 
15
* Lesser General Public License for more details.
 
16
*
 
17
* You should have received a copy of the GNU Lesser General Public
 
18
* License along with this software; if not, write to the Free
 
19
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
 
20
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
 
21
*/
 
22
 
 
23
package org.jboss.remoting.transport.multiplex;
 
24
 
 
25
import java.io.EOFException;
 
26
import java.io.IOException;
 
27
import java.net.SocketException;
 
28
import java.util.HashSet;
 
29
import java.util.Iterator;
 
30
import java.util.Set;
 
31
 
 
32
import org.jboss.logging.Logger;
 
33
import org.jboss.remoting.transport.multiplex.utility.GrowablePipedInputStream;
 
34
import org.jboss.remoting.transport.multiplex.utility.GrowablePipedOutputStream;
 
35
import org.jboss.remoting.transport.multiplex.utility.VirtualSelector;
 
36
 
 
37
/**
 
38
 * <code>MultiplexingInputStream</code> is the class returned by
 
39
 * <code>VirtualSocket.getInputStream()</code>.  
 
40
 * It supports the methods and behavior implemented by the <code>InputStream</code> returned by
 
41
 * <code>java.net.Socket.getInputStream()</code>.  For more information about the behavior
 
42
 * of the methods, see the javadoc for <code>java.io.InputStream</code>.
 
43
 * <p>
 
44
 * Copyright (c) 2005
 
45
 * <p>
 
46
 * @author <a href="mailto:r.sigal@computer.org">Ron Sigal</a>
 
47
 * 
 
48
 * @deprecated As of release 2.4.0 the multiplex transport will no longer be actively supported.
 
49
 */
 
50
public class MultiplexingInputStream extends GrowablePipedInputStream
 
51
{
 
52
   protected static final Logger log = Logger.getLogger(MultiplexingInputStream.class);
 
53
   private VirtualSocket socket;
 
54
   private boolean eof = false;
 
55
   private boolean closed = false;
 
56
   private boolean remoteShutDownPending = false;
 
57
   private Set readingThreads = new HashSet();
 
58
   private IOException readException;
 
59
   private long skipCount = 0;
 
60
   private boolean tracing;
 
61
 
 
62
   
 
63
/**
 
64
 * @param sourceStream 
 
65
 * @param manager 
 
66
 */
 
67
   public MultiplexingInputStream(GrowablePipedOutputStream sourceStream, MultiplexingManager manager)
 
68
   throws IOException
 
69
   {
 
70
      this(sourceStream, manager, null, null);
 
71
   }
 
72
   
 
73
   
 
74
/**
 
75
  * @param sourceStream
 
76
  * @param manager 
 
77
  * @param socket 
 
78
  * 
 
79
  */
 
80
   public MultiplexingInputStream(GrowablePipedOutputStream sourceStream,
 
81
                                  MultiplexingManager manager,
 
82
                                  VirtualSocket socket)
 
83
   throws IOException
 
84
   {
 
85
      this(sourceStream, manager, socket, null);
 
86
   }
 
87
   
 
88
      
 
89
/**
 
90
 * @param sourceStream
 
91
 * @param manager 
 
92
 * @param socket
 
93
 * @param virtualSelector 
 
94
 * 
 
95
 */
 
96
   public MultiplexingInputStream(GrowablePipedOutputStream sourceStream,
 
97
                                  MultiplexingManager manager,
 
98
                                  VirtualSocket socket,
 
99
                                  VirtualSelector virtualSelector)
 
100
   throws IOException
 
101
   {
 
102
      super(sourceStream, virtualSelector);
 
103
      this.socket = socket;
 
104
      tracing = log.isTraceEnabled();
 
105
   }
 
106
      
 
107
   
 
108
//////////////////////////////////////////////////////////////////////////////////////////////////
 
109
///                 The following methods are required of all InputStreams                    '///
 
110
//////////////////////////////////////////////////////////////////////////////////////////////////
 
111
 
 
112
/*************************************************************************************************
 
113
      ok: public int read() throws IOException;
 
114
      ok: public int read(byte b[]) throws IOException;
 
115
      ok: public int read(byte b[], int off, int len) throws IOException;
 
116
      ok: public long skip(long n) throws IOException;
 
117
      ok: public int available() throws IOException;
 
118
      ok: public void close() throws IOException;
 
119
      ok: public void mark(int readlimit);
 
120
      ok: public void reset() throws IOException;
 
121
      ok: public boolean markSupported();
 
122
*************************************************************************************************/
 
123
      
 
124
/**
 
125
 * See superclass javadoc.
 
126
 */
 
127
   public void close() throws IOException
 
128
   {
 
129
      if (closed)
 
130
         return;
 
131
      
 
132
      log.debug("MultiplexingInputStream closing");
 
133
      closed = true;
 
134
      super.close();
 
135
      
 
136
      if (socket != null)
 
137
         socket.close();
 
138
 
 
139
      // If a thread is currently in read(), interrupt it.
 
140
      interruptReadingThreads();
 
141
   }
 
142
 
 
143
   
 
144
/**
 
145
 * See superclass javadoc.
 
146
 */
 
147
   public synchronized int read() throws IOException
 
148
   {
 
149
      if (eof)
 
150
         return -1;
 
151
      
 
152
      if (closed)
 
153
         throw new SocketException("Socket closed");
 
154
      
 
155
      if (readException != null)
 
156
         throw readException;
 
157
   
 
158
      if (skipCount > 0)
 
159
         skip(skipCount);
 
160
 
 
161
      try
 
162
      {
 
163
         // We leave a reference to the current thread so that close() and handleRemoteShutdown()
 
164
         // can interrupt it if necessary.
 
165
         readingThreads.add(Thread.currentThread());
 
166
         int b = super.read();
 
167
         readingThreads.remove(Thread.currentThread());
 
168
         
 
169
         if (tracing)
 
170
            log.trace("read(): super.read() returned: " + b);
 
171
 
 
172
         if (remoteShutDownPending && available() == 0)
 
173
            setEOF();
 
174
            
 
175
         return b & 0xff;
 
176
      }
 
177
      catch (IOException e)
 
178
      {
 
179
         readingThreads.remove(Thread.currentThread());
 
180
         
 
181
         if (closed)
 
182
            throw new SocketException("Socket closed");
 
183
 
 
184
         if (eof)
 
185
            return -1;
 
186
         
 
187
         if (readException != null)
 
188
            throw readException;
 
189
 
 
190
         throw e;
 
191
      }
 
192
   }
 
193
   
 
194
   
 
195
/**
 
196
 * See superclass javadoc.
 
197
 */
 
198
   public int read(byte[] bytes) throws IOException
 
199
   {
 
200
      return read(bytes, 0, bytes.length);
 
201
   }
 
202
   
 
203
   
 
204
/**
 
205
 * See superclass javadoc.
 
206
 */
 
207
   public synchronized int read(byte[] bytes, int off, int len) throws IOException
 
208
   {      
 
209
      log.trace("entering read()");
 
210
      
 
211
      if (eof)
 
212
         return -1;
 
213
      
 
214
      if (closed)
 
215
         throw new SocketException("Socket closed");
 
216
      
 
217
      if (readException != null)
 
218
         throw readException;
 
219
 
 
220
      if (skipCount > 0)
 
221
         skip(skipCount);
 
222
      
 
223
      try
 
224
      {
 
225
         // We leave a reference to the current thread so that handleRemoteShutdown() can
 
226
         // interrupt it if necessary.
 
227
         readingThreads.add(Thread.currentThread());
 
228
         int n = super.read(bytes, off, len);
 
229
         readingThreads.remove(Thread.currentThread());
 
230
         
 
231
         if (tracing)
 
232
            log.trace("super.read() returned " + n + " bytes: "
 
233
                  + "[" + (0xff & bytes[off]) + ".." + (0xff & bytes[off+n-1]) + "]");
 
234
 
 
235
         if (remoteShutDownPending && available() == 0)
 
236
            setEOF();
 
237
         
 
238
         return n;
 
239
      }
 
240
      catch (IOException e)
 
241
      {
 
242
         readingThreads.remove(Thread.currentThread());
 
243
         
 
244
         if (eof)
 
245
            return -1;
 
246
         
 
247
         if (closed)
 
248
            throw new SocketException("Socket closed");
 
249
 
 
250
         throw e;
 
251
      }
 
252
   }
 
253
   
 
254
   
 
255
   /**
 
256
    * See superclass javadoc.
 
257
    */
 
258
   public synchronized long skip(long n) throws IOException
 
259
   {
 
260
      if (eof)
 
261
         return 0;
 
262
      
 
263
      if (closed)
 
264
         throw new SocketException("Socket closed");
 
265
      
 
266
      if (readException != null)
 
267
         throw readException;
 
268
 
 
269
      if (n <= 0)
 
270
         return 0;
 
271
      
 
272
      int skipped = 0;
 
273
      
 
274
      try
 
275
      {
 
276
         readingThreads.add(Thread.currentThread());
 
277
         
 
278
         while (skipped < n && (skipped == 0 || available() > 0))
 
279
         {
 
280
            if (read() == -1)
 
281
               break;
 
282
            
 
283
            skipped++;
 
284
         }
 
285
         
 
286
         readingThreads.remove(Thread.currentThread());
 
287
         
 
288
         if (remoteShutDownPending && available() == 0)
 
289
            setEOF();
 
290
         
 
291
         return skipped;
 
292
      }
 
293
      catch (IOException e)
 
294
      {
 
295
         readingThreads.remove(Thread.currentThread());
 
296
         
 
297
         if (eof)
 
298
            return -1;
 
299
         
 
300
         if (closed)
 
301
            throw new SocketException("Socket closed");
 
302
 
 
303
         throw e;
 
304
      }
 
305
   }
 
306
 
 
307
   
 
308
//////////////////////////////////////////////////////////////////////////////////////////////////
 
309
///              The following methods are specific to MultiplexingInputStream                '///
 
310
//////////////////////////////////////////////////////////////////////////////////////////////////
 
311
 
 
312
/**
 
313
 * 
 
314
 */
 
315
   protected VirtualSocket getSocket()
 
316
   {
 
317
      return socket;
 
318
   }
 
319
   
 
320
   
 
321
/**
 
322
 * <code>handleRemoteShutdown()</code> is responsible for informing the <code>MultiplexingInputStream</code>
 
323
 * that no more bytes will be coming from the remote <code>MultiplexingOutputStream</code> to which
 
324
 * it is connected, because <code>shutdownOutput()</code> or <code>close()</code> has been called on the 
 
325
 * remote <code>VirtualSocket</code>.  The result is that once all bytes sent by the remote socket have
 
326
 * been consumed, all subsequent calls to read() will return -1 and all subsequent calls to skip() will
 
327
 * return 0, indicating end of file has been reached.
 
328
 */
 
329
   protected synchronized void handleRemoteShutdown() throws IOException
 
330
   {
 
331
   /*
 
332
    * handleRemoteShutdown() needs to handle two cases correctly:
 
333
    *
 
334
    * Case 1. all bytes transmitted by the remote MultiplexingOutputStream have been consumed by the time
 
335
    *   handleRemoteShutdown() executes, and
 
336
    * 
 
337
    * Case 2. not all bytes transmitted by the remote MultiplexingOutputStream have been consumed
 
338
    *   by the time handleRemoteShutdown() executes..
 
339
    *
 
340
    * Correctness argument:
 
341
    * 
 
342
    * Case 1. 
 
343
    * 
 
344
    * The bracketing facility implemented by OutputMultiplexor guarantees that all bytes
 
345
    * transmitted by the remote MultiplexingOutputStream will arrive and be stored in this
 
346
    * MultiplexingInputStream before the protocol message arrives that leads to 
 
347
    * handleRemoteShutdown() being called.  Therefore, if available() == 0 is true upon entering
 
348
    * handleRemoteShutdown(), all transmitted bytes have been consumed and it is correct to indicate
 
349
    * that this MultiplexingInputStream is at end of file.  
 
350
    * 
 
351
    * Case 1a. No threads are currently in read() or skip():
 
352
    * 
 
353
    *   Calling setEOF() will guarantee that all subsequent calls to read() will return -1 and all
 
354
    *   subsequent calls to skip() will return 0.
 
355
    * 
 
356
    * Case 1b. One or more threads are currently in read() or skip():
 
357
    * 
 
358
    *   Since all read() methods, skip(), and handleRemoteShutdown() are synchronized, the only way
 
359
    *   handleRemoteShutdown() can be executing is if all of the threads in read() and skip() are
 
360
    *   blocked on the wait() call in super.read().  Then all of the blocked threads are referenced in
 
361
    *   the Set readingThreads, and calling interruptReadingThreads will guarantee that they are 
 
362
    *   interrupted, which will lead to their throwing an InterruptedIOException.  Moreover, calling
 
363
    *   setEOF() will guarantee that all such threads will see eof == true in the exception  and
 
364
    *   will return -1. If any of the threads made the call to read() by way of skip(), then the
 
365
    *   condition (skipped == 0 || available() > 0) was true when read() was called, and since we
 
366
    *   are assumeing available() == 0, then skipped == 0 must have been true.  Therefore, when 
 
367
    *   it gets -1 from the call to read(), skip() will return 0.  Finally, calling setEOF() in
 
368
    *   handleRemoteShutdown() will guarantee that all subsequent calls to read() will return -1
 
369
    *   and all subsequent calls to skip() will return 0.
 
370
    * 
 
371
    * Case 2.
 
372
    * 
 
373
    * Suppose, on the other hand, that available() == 0 is false.  Then the only action taken by
 
374
    * handleRemoteShutdown() is to set remoteShutdownPending to true, and as long as bytes are
 
375
    * available, there is no obstacle to their being read or skipped.
 
376
    * 
 
377
    * Fact.  The last transmitted byte has been consumed if and only if available() == 0.
 
378
    * 
 
379
    * (If): 
 
380
    * The fact that handleRemoteShutdown() has been called implies that all bytes transmitted from the remote
 
381
    * socket have already arrived and been stored in this MultiplexingInputStream, so the value returned
 
382
    * by available() will decrease monotonically, and  once available() == 0, the last transmitted byte has
 
383
    * been consumed.  
 
384
    * 
 
385
    * (Only if):
 
386
    * This direction is obvious.
 
387
    * 
 
388
    * Now, if no thread ever requests the last available byte to be read or skipped, then all calls to read()
 
389
    * or skip() following the call to handleRemoteShutdown() will execute with available() > 0, and there
 
390
    * will be no impediment to their successful completion.
 
391
    * 
 
392
    * Suppose, then, that some thread T makes a call to read() that retrieves the last available byte.
 
393
    * Upon returning from super.read(), T will find (remoteShutdownPending && available() == 0) is true,
 
394
    * and it will call setEOF(), which, by the Fact argued above, is a correct action. Since available()
 
395
    * was > 0 when the T entered read(), T will never call wait() in super.read(), so no other thread will
 
396
    * enter read() or skip() until T leaves read().  At that point any threads entering read() will find
 
397
    * eof == true and will return -1, and any threads entering skip() will find eof == true and return 0.
 
398
    *
 
399
    * Finally, suppose that some thread T makes a call to skip() to skip the last available byte.
 
400
    * T will eventually leave the while loop in skip() with available() == 0, and when it reaches
 
401
    * the test for (remoteShutDownPending && available() == 0), it will call setEOF().
 
402
    * Since available() was > 0 when the T entered skip(), T will never call wait() in super.read(),
 
403
    * so no other thread will enter read() or skip() until T leaves skip().  At that point any threads
 
404
    * entering read() will find eof == true and will return -1, and any threads entering skip() will
 
405
    * find eof == true and return 0.
 
406
    */
 
407
      
 
408
      log.debug("entering handleRemoteShutdown()");
 
409
      
 
410
      if (eof)
 
411
         return;
 
412
      
 
413
      remoteShutDownPending = true;
 
414
      
 
415
      if (available() == 0)
 
416
      {  
 
417
         setEOF();
 
418
         interruptReadingThreads();
 
419
      }
 
420
      
 
421
      log.debug("leaving handleRemoteShutdown()");
 
422
   }
 
423
 
 
424
 
 
425
/**
 
426
 *
 
427
 */
 
428
   protected synchronized void interruptReadingThreads()
 
429
   {  
 
430
      // If we obtained the lock, then either there are no threads in read() or skip(),
 
431
      // or any such threads are blocked in super.read(), having executed wait().
 
432
      Iterator it = readingThreads.iterator();
 
433
      
 
434
      while (it.hasNext())
 
435
      {
 
436
         Thread t = (Thread) it.next();
 
437
         it.remove();
 
438
         t.interrupt();
 
439
      }
 
440
   }
 
441
   
 
442
   
 
443
/**
 
444
 * <code>readInt()</code> is borrowed from DataInputStream.  It saves the extra expense of
 
445
 * creating a DataInputStream
 
446
 */
 
447
   public final int readInt() throws IOException
 
448
   {
 
449
      int b1 = read();
 
450
      int b2 = read();
 
451
      int b3 = read();
 
452
      int b4 = read();
 
453
      
 
454
      if ((b1 | b2 | b3 | b4) < 0)
 
455
          throw new EOFException();
 
456
      
 
457
      return ((b1 << 24) + (b2 << 16) + (b3 << 8) + (b4 << 0));
 
458
  }
 
459
   
 
460
   
 
461
/**
 
462
 */
 
463
   protected void setEOF()
 
464
   {
 
465
      eof = true;
 
466
   }
 
467
   
 
468
   
 
469
   protected void setReadException(IOException e)
 
470
   {
 
471
      readException = e;
 
472
      interruptReadingThreads();
 
473
   }
 
474
   
 
475
/**
 
476
 * @param n
 
477
 */
 
478
   protected synchronized void setSkip(long n)
 
479
   {
 
480
      skipCount += n;
 
481
   }
 
482
   
 
483
   
 
484
/**
 
485
 * A MultiplexingInputStream may be created without reference to a VirtualSocket.  
 
486
 * (See MultiplexingManager.getAnOutputStream().)  setSocket() allows the socket to 
 
487
 * be set afterwards.
 
488
 * 
 
489
 * @param socket
 
490
 */
 
491
   protected void setSocket(VirtualSocket socket)
 
492
   {
 
493
      this.socket = socket;
 
494
   }
 
495
}
 
 
b'\\ No newline at end of file'