~ubuntu-branches/ubuntu/oneiric/libjboss-remoting-java/oneiric

« back to all changes in this revision

Viewing changes to src/org/jboss/remoting/transport/multiplex/InputMultiplexor.java

  • Committer: Package Import Robot
  • Author(s): Torsten Werner
  • Date: 2011-09-09 14:01:03 UTC
  • mfrom: (1.1.6 upstream)
  • Revision ID: package-import@ubuntu.com-20110909140103-hqokx61534tas9rg
Tags: 2.5.3.SP1-1
* Newer but not newest upstream release. Do not build samples.
* Change debian/watch to upstream's svn repo.
* Add patch to fix compile error caused by tomcat update.
  (Closes: #628303)
* Switch to source format 3.0.
* Switch to debhelper level 7.
* Remove useless Depends.
* Update Standards-Version: 3.9.2.
* Update README.source.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/*
2
 
 * JBoss, Home of Professional Open Source
3
 
 * Copyright 2005, JBoss Inc., and individual contributors as indicated
4
 
 * by the @authors tag. See the copyright.txt in the distribution for a
5
 
 * full listing of individual contributors.
6
 
 *
7
 
 * This is free software; you can redistribute it and/or modify it
8
 
 * under the terms of the GNU Lesser General Public License as
9
 
 * published by the Free Software Foundation; either version 2.1 of
10
 
 * the License, or (at your option) any later version.
11
 
 *
12
 
 * This software is distributed in the hope that it will be useful,
13
 
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14
 
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15
 
 * Lesser General Public License for more details.
16
 
 *
17
 
 * You should have received a copy of the GNU Lesser General Public
18
 
 * License along with this software; if not, write to the Free
19
 
 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20
 
 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21
 
 */
22
 
 
23
 
package org.jboss.remoting.transport.multiplex;
24
 
 
25
 
import org.jboss.logging.Logger;
26
 
import org.jboss.remoting.transport.multiplex.utility.StoppableThread;
27
 
 
28
 
import java.io.BufferedInputStream;
29
 
import java.io.EOFException;
30
 
import java.io.IOException;
31
 
import java.io.InputStream;
32
 
import java.io.OutputStream;
33
 
import java.net.Socket;
34
 
import java.nio.ByteBuffer;
35
 
import java.nio.channels.ClosedSelectorException;
36
 
import java.nio.channels.SelectableChannel;
37
 
import java.nio.channels.SelectionKey;
38
 
import java.nio.channels.Selector;
39
 
import java.nio.channels.SocketChannel;
40
 
import java.util.Collections;
41
 
import java.util.HashMap;
42
 
import java.util.HashSet;
43
 
import java.util.Iterator;
44
 
import java.util.Map;
45
 
import java.util.Set;
46
 
 
47
 
import javax.net.ssl.SSLException;
48
 
 
49
 
/**
50
 
 * <code>InputMultiplexor</code> is one of the key Multiplex classes, responsible for
51
 
 * demultiplexing multiple byte streams sharing a single TCP connection.  It has two
52
 
 * inner classes which can perform this function.  <code>MultiGroupInputThread</code> can perform
53
 
 * demultiplexing for any number of NIO sockets, taking advantage of the <code>Selector</code>
54
 
 * facility.  For non-NIO sockets, notably SSL sockets, <code>SingleGroupInputThread</code>
55
 
 * handles demultiplexing for a single socket.
56
 
 * <p>
57
 
 * The data stream, created at the other end of the TCP connection by the
58
 
 * <code>OutputMultiplexor</code> class, consists of a sequence of packets, each consisting of
59
 
 * a header, giving version, destination virtual socket, and number of bytes. followed
60
 
 * by the specified number of data bytes. (See <code>OutputMultiplexor</code> for the
61
 
 * header format.
62
 
 * Each of the demultiplexing thread classes reads a header and transfers the
63
 
 * following bytes to the input stream of the target virtual socket.
64
 
 *
65
 
 * <p>
66
 
 * Copyright (c) 2005
67
 
 * <p>
68
 
 * @author <a href="mailto:r.sigal@computer.org">Ron Sigal</a>
69
 
 * 
70
 
 * @deprecated As of release 2.4.0 the multiplex transport will no longer be actively supported.
71
 
 */
72
 
public class InputMultiplexor
73
 
{
74
 
   protected static final Logger log = Logger.getLogger(InputMultiplexor.class);
75
 
   private static final int HEADER_LENGTH = 7;
76
 
 
77
 
   private int bufferSize;
78
 
   private int maxErrors;
79
 
 
80
 
 
81
 
   public InputMultiplexor(Map configuration)
82
 
   {
83
 
      bufferSize
84
 
         = Multiplex.getOneParameter(configuration,
85
 
                                    "bufferSize",
86
 
                                    Multiplex.INPUT_BUFFER_SIZE,
87
 
                                    Multiplex.INPUT_BUFFER_SIZE_DEFAULT);
88
 
 
89
 
      maxErrors
90
 
         = Multiplex.getOneParameter(configuration,
91
 
                                     "maxErrors",
92
 
                                     Multiplex.INPUT_MAX_ERRORS,
93
 
                                     Multiplex.INPUT_MAX_ERRORS_DEFAULT);
94
 
   }
95
 
 
96
 
 
97
 
   /**
98
 
    * Returns a <code>MultiGroupInputThread</code> designed to handle multiple virtual socket groups.
99
 
    * @param configuration
100
 
    * @return a <code>MultiGroupInputThread</code> designed to handle multiple virtual socket groups
101
 
    */
102
 
   public MultiGroupInputThread getaMultiGroupInputThread() throws IOException
103
 
   {
104
 
      return new MultiGroupInputThread();
105
 
   }
106
 
 
107
 
 
108
 
   /**
109
 
    * Returns a <code>SingleGroupInputThread</code> designed to handle a single virtual socket group.
110
 
    * @return a <code>SingleGroupInputThread</code> designed to handle a single virtual socket group
111
 
    */
112
 
   public SingleGroupInputThread getaSingleGroupInputThread(MultiplexingManager manager, Socket socket, OutputStream os) throws IOException
113
 
   {
114
 
      return new SingleGroupInputThread(manager, socket, os);
115
 
   }
116
 
 
117
 
 
118
 
   public class MultiGroupInputThread extends StoppableThread
119
 
   {
120
 
      private static final String errMsg1 = "An existing connection was forcibly closed by the remote host";
121
 
      private static final String errMsg2 = "An established connection was aborted by the software in your host machine";
122
 
 
123
 
      private Map managerProcessorMap;
124
 
      private Set socketGroupsToBeRegistered = new HashSet();
125
 
      private Set tempSocketGroupSet = new HashSet();
126
 
      private boolean socketGroupsAreWaiting;
127
 
      private Selector selector;
128
 
      private ByteBuffer buffer;
129
 
      private byte[] data;
130
 
 
131
 
      private boolean trace;
132
 
      private boolean debug;
133
 
      private boolean info;
134
 
 
135
 
 
136
 
      public MultiGroupInputThread() throws IOException
137
 
      {
138
 
         managerProcessorMap = Collections.synchronizedMap(new HashMap());
139
 
         selector = Selector.open();
140
 
         buffer = ByteBuffer.allocate(bufferSize);
141
 
         data = new byte[bufferSize];
142
 
 
143
 
         trace = log.isTraceEnabled();
144
 
         debug = log.isDebugEnabled();
145
 
         info = log.isInfoEnabled();
146
 
      }
147
 
 
148
 
 
149
 
      /**
150
 
       * Registers manager and socket with NIO Selector
151
 
       * @param manager <code>MultiplexingManager</code>
152
 
       * @return
153
 
       * @throws <code>IOException</code>
154
 
       */
155
 
      public void registerSocketGroup(MultiplexingManager manager) throws IOException
156
 
      {
157
 
         if (debug) log.debug(" accepting socket group for registration: " + manager);
158
 
 
159
 
         synchronized (socketGroupsToBeRegistered)
160
 
         {
161
 
            socketGroupsToBeRegistered.add(manager);
162
 
            socketGroupsAreWaiting = true;
163
 
         }
164
 
      }
165
 
 
166
 
 
167
 
      protected void doRegistration()
168
 
      {
169
 
         tempSocketGroupSet.clear();
170
 
         synchronized(socketGroupsToBeRegistered)
171
 
         {
172
 
            tempSocketGroupSet.addAll(socketGroupsToBeRegistered);
173
 
            socketGroupsToBeRegistered.clear();
174
 
            socketGroupsAreWaiting = false;
175
 
         }
176
 
 
177
 
         Iterator it = tempSocketGroupSet.iterator();
178
 
         while (it.hasNext())
179
 
         {
180
 
            MultiplexingManager manager = (MultiplexingManager) it.next();
181
 
            GroupProcessor groupProcessor = new GroupProcessor(manager);
182
 
            SelectableChannel channel = manager.getSocket().getChannel();
183
 
 
184
 
            try
185
 
            {
186
 
               SelectionKey key = channel.register(selector, SelectionKey.OP_READ, groupProcessor);
187
 
               groupProcessor.setKey(key);
188
 
               managerProcessorMap.put(manager, groupProcessor);
189
 
            }
190
 
            catch (IOException e)
191
 
            {
192
 
               // channel might be closed.
193
 
               log.warn(e);
194
 
            }
195
 
         }
196
 
      }
197
 
 
198
 
 
199
 
      /**
200
 
       * Removes references to virtual socket group.
201
 
       * @param manager
202
 
       */
203
 
      public void unregisterSocketGroup(MultiplexingManager manager)
204
 
      {
205
 
         // Leave GroupProcessor in Map until SelectionKey is cancelled.
206
 
         GroupProcessor groupProcessor = (GroupProcessor) managerProcessorMap.get(manager);
207
 
         if(groupProcessor == null)
208
 
         {
209
 
            log.debug("attempting to unregister unknown MultiplexingManager: " + manager);
210
 
            return;
211
 
         }
212
 
 
213
 
         SelectionKey key = groupProcessor.getKey();
214
 
         key.cancel();
215
 
         managerProcessorMap.remove(manager);
216
 
         if (debug) log.debug("unregistered socket group:" + manager);
217
 
      }
218
 
 
219
 
 
220
 
      public void shutdown()
221
 
      {
222
 
         // in case thread is still reading
223
 
         super.shutdown();
224
 
         try
225
 
         {
226
 
            selector.close();
227
 
         }
228
 
         catch (IOException e)
229
 
         {
230
 
            log.error("unable to close selector", e);
231
 
         }
232
 
         interrupt();
233
 
      }
234
 
 
235
 
 
236
 
      protected void doInit()
237
 
      {
238
 
         log.debug("MultiGroupInputThread thread starting");
239
 
      }
240
 
 
241
 
 
242
 
      protected void doRun()
243
 
      {
244
 
         log.debug("entering doRun()");
245
 
         Set keys = null;
246
 
 
247
 
         try
248
 
         {
249
 
            while (true)
250
 
            {
251
 
               if (!running)
252
 
                  return;
253
 
 
254
 
               if (socketGroupsAreWaiting)
255
 
                  doRegistration();
256
 
 
257
 
               selector.select(200);
258
 
               keys = selector.selectedKeys();
259
 
 
260
 
               if (!keys.isEmpty())
261
 
                  break;
262
 
            }
263
 
         }
264
 
         catch (IOException e)
265
 
         {
266
 
            log.info(e);
267
 
         }
268
 
         catch (ClosedSelectorException e)
269
 
         {
270
 
            log.info("Selector is closed: shutting down input thread");
271
 
            super.shutdown();
272
 
            return;
273
 
         }
274
 
 
275
 
         if (trace)
276
 
         {
277
 
            log.trace("keys: " + selector.keys().size());
278
 
            log.trace("selected keys: " + keys.size());
279
 
         }
280
 
 
281
 
         Iterator it = keys.iterator();
282
 
         while (it.hasNext())
283
 
         {
284
 
            SelectionKey key = (SelectionKey) it.next();
285
 
            it.remove();
286
 
            GroupProcessor groupProcessor = (GroupProcessor) key.attachment();
287
 
 
288
 
            if (groupProcessor == null)
289
 
            {
290
 
               if (key.isValid())
291
 
                  log.error("valid SelectionKey has no attachment: " + key);
292
 
 
293
 
               continue;
294
 
            }
295
 
 
296
 
            groupProcessor.processChannel(key);
297
 
         }
298
 
      }
299
 
 
300
 
 
301
 
      protected void doShutDown()
302
 
      {
303
 
         log.debug("MultiGroupInputThread shutting down");
304
 
      }
305
 
 
306
 
 
307
 
      class GroupProcessor
308
 
      {
309
 
         // Message header
310
 
         private byte[] b = new byte[HEADER_LENGTH];
311
 
         private int    headerCount;
312
 
         private byte   version;
313
 
         private int    destination;
314
 
         private short  size;
315
 
 
316
 
         private MultiplexingManager manager;
317
 
         private OutputStream   outputStream;
318
 
         private SelectionKey key;
319
 
         private int errorCount;
320
 
 
321
 
 
322
 
         public GroupProcessor(MultiplexingManager manager)
323
 
         {
324
 
            this.manager = manager;
325
 
         }
326
 
 
327
 
         public void processChannel(SelectionKey key)
328
 
         {
329
 
            log.debug("processChannel()");
330
 
            SocketChannel channel = (SocketChannel) key.channel();
331
 
            buffer.clear();
332
 
 
333
 
            try
334
 
            {
335
 
               if (channel.read(buffer) < 0)
336
 
                     throw new EOFException();
337
 
 
338
 
               buffer.flip();
339
 
 
340
 
               if (debug)
341
 
                  log.debug("read: " + buffer.remaining());
342
 
 
343
 
               while (buffer.hasRemaining())
344
 
               {
345
 
                  if (headerCount < HEADER_LENGTH || size == 0)
346
 
                  {
347
 
                     // then prepare to process next virtual stream.
348
 
                     completeHeader(buffer);
349
 
 
350
 
                     if (headerCount < HEADER_LENGTH)
351
 
                        return;
352
 
 
353
 
                     SocketId socketId = new SocketId(destination);
354
 
                     outputStream = manager.getOutputStreamByLocalSocket(socketId);
355
 
                     if (outputStream == null)
356
 
                     {
357
 
                        // We'll get an OutputStream to stash these bytes, just in case they
358
 
                        // are coming from a valid source and the local VirtualSocket is still
359
 
                        // getting set up.
360
 
                        log.info("unknown socket id: " + destination);
361
 
                        outputStream = manager.getConnectedOutputStream(socketId);
362
 
                     }
363
 
 
364
 
                     if (!buffer.hasRemaining())
365
 
                        return;
366
 
                  }
367
 
 
368
 
                  int n = Math.min(size, buffer.remaining());
369
 
                  buffer.get(data, 0, n);
370
 
                  outputStream.write(data, 0, n);
371
 
 
372
 
                  if (trace)
373
 
                  {
374
 
                     log.trace("received " + n + " bytes for socket: " + destination);
375
 
                     for (int i = 0; i < n; i++)
376
 
                        log.trace("" + (0xff & data[i]));
377
 
                  }
378
 
 
379
 
                  size -= n;
380
 
                  if (size == 0)
381
 
                     headerCount = 0;
382
 
               }
383
 
            }
384
 
            catch (IOException e)
385
 
            {
386
 
               handleChannelException(e, key, channel);
387
 
            }
388
 
            catch (Throwable t)
389
 
            {
390
 
               log.error("doRun()");
391
 
               log.error(t);
392
 
            }
393
 
         }
394
 
 
395
 
         public SelectionKey getKey()
396
 
         {
397
 
            return key;
398
 
         }
399
 
 
400
 
         public void setKey(SelectionKey key)
401
 
         {
402
 
            this.key = key;
403
 
         }
404
 
 
405
 
         private void completeHeader(ByteBuffer bb) throws IOException
406
 
         {
407
 
            int n = Math.min(bb.remaining(), HEADER_LENGTH - headerCount);
408
 
            bb.get(b, headerCount, n);
409
 
            headerCount += n;
410
 
 
411
 
            if (headerCount == HEADER_LENGTH)
412
 
            {
413
 
               version = b[0];
414
 
               destination =                 (b[1] << 24) | (0x00ff0000 & (b[2] << 16)) |
415
 
                               (0x0000ff00 & (b[3] << 8)) | (0x000000ff & b[4]);
416
 
               size = (short) ((0x0000ff00 & (b[5] << 8)) | (0x000000ff & b[6]));
417
 
 
418
 
 
419
 
               if (size  < 0 || bufferSize < size)
420
 
                  throw new CorruptedStreamException("invalid chunk size read on: " + manager + ": "+ size);
421
 
 
422
 
               if (version != 0)
423
 
                  throw new CorruptedStreamException("invalid version read on: " + manager + ": " + version);
424
 
            }
425
 
         }
426
 
 
427
 
         private void handleChannelException(IOException e, SelectionKey key, SocketChannel channel)
428
 
         {
429
 
            try
430
 
            {
431
 
               if (!channel.isOpen())
432
 
               {
433
 
                  key.cancel();
434
 
                  return;
435
 
               }
436
 
 
437
 
               if (e instanceof EOFException)
438
 
               {
439
 
                  key.cancel();
440
 
                  manager.setEOF();
441
 
                  log.debug(e);
442
 
                  return;
443
 
               }
444
 
               
445
 
               if (e instanceof SSLException)
446
 
               {
447
 
                  key.cancel();
448
 
                  log.error(e);
449
 
                  return;
450
 
               }
451
 
 
452
 
               if (++errorCount > maxErrors)
453
 
                 {
454
 
                    manager.setReadException(e);
455
 
                    channel.close();
456
 
                    key.cancel();
457
 
                    log.error(e);
458
 
                    log.error("error count exceeds max errors: " + errorCount);
459
 
                    return;
460
 
                 }
461
 
 
462
 
               Socket socket = channel.socket();
463
 
               String message = e.getMessage();
464
 
 
465
 
               if (socket.isClosed() || socket.isInputShutdown() ||
466
 
                   errMsg1.equals(message) || errMsg2.equals(message) ||
467
 
                   e instanceof CorruptedStreamException)
468
 
               {
469
 
                  manager.setReadException(e);
470
 
                  channel.close();
471
 
                  key.cancel();
472
 
                  log.info(e);
473
 
                  return;
474
 
               }
475
 
 
476
 
               // Haven't reached maxErrors yet
477
 
               log.warn(e);
478
 
            }
479
 
            catch (IOException e2)
480
 
            {
481
 
               log.error("problem closing channel: "  + manager, e2);
482
 
            }
483
 
         }
484
 
 
485
 
         public int          getDestination()   {return destination;}
486
 
         public short        getSize()          {return size;}
487
 
         public byte         getVersion()       {return version;}
488
 
         public OutputStream getOutputStream()  {return outputStream;}
489
 
      }
490
 
   }
491
 
 
492
 
 
493
 
   class SingleGroupInputThread extends StoppableThread
494
 
   {
495
 
      private InputStream is;
496
 
      private OutputStream currentOutputStream;
497
 
      private byte[] dataBytes = new byte[bufferSize];
498
 
      private MultiplexingManager manager;
499
 
      private int dataInCount = 0;
500
 
      private int errorCount;
501
 
      private boolean eof;
502
 
 
503
 
      // Message header
504
 
      private byte[] headerBytes = new byte[HEADER_LENGTH];
505
 
      private int    headerCount;
506
 
      private byte   version;
507
 
      private int    destination;
508
 
      private short  size;
509
 
 
510
 
      private boolean trace;
511
 
      private boolean debug;
512
 
      private boolean info;
513
 
 
514
 
 
515
 
      public SingleGroupInputThread(MultiplexingManager manager, Socket socket, OutputStream os)
516
 
      throws IOException
517
 
      {
518
 
         this.is = new BufferedInputStream(socket.getInputStream());
519
 
         this.manager = manager;
520
 
         currentOutputStream = os;
521
 
 
522
 
         trace = log.isTraceEnabled();
523
 
         debug = log.isDebugEnabled();
524
 
         info = log.isInfoEnabled();
525
 
      }
526
 
 
527
 
 
528
 
      public void shutdown()
529
 
      {
530
 
         // in case thread is still reading
531
 
         super.shutdown();
532
 
         log.info("interrupting input thread");
533
 
         interrupt();
534
 
      }
535
 
 
536
 
 
537
 
      /**
538
 
       *
539
 
       */
540
 
      protected void doInit()
541
 
      {
542
 
         log.debug("SingleGroupInputThread thread starting");
543
 
      }
544
 
 
545
 
 
546
 
      /**
547
 
       *
548
 
       */
549
 
      protected void doRun()
550
 
      {
551
 
         try
552
 
         {
553
 
            // end of file
554
 
            if (!completeHeader())
555
 
            {
556
 
               eof = true;
557
 
               return;
558
 
            }
559
 
 
560
 
            SocketId socketId = new SocketId(destination);
561
 
            currentOutputStream = manager.getOutputStreamByLocalSocket(socketId);
562
 
            if (currentOutputStream == null)
563
 
            {
564
 
               // We'll get an OutputStream to stash these bytes, just in case they
565
 
               // are coming from a valid source and the local VirtualSocket is still
566
 
               // getting set up.
567
 
               log.info("unknown socket id: " + destination);
568
 
               currentOutputStream = manager.getConnectedOutputStream(socketId);
569
 
            }
570
 
 
571
 
            int bytesRead = 0;
572
 
            while (bytesRead < size)
573
 
            {
574
 
               int n = is.read(dataBytes, 0, size - bytesRead);
575
 
               if (n < 0)
576
 
               {
577
 
                  eof = true;
578
 
                  return;
579
 
               }
580
 
 
581
 
               currentOutputStream.write(dataBytes, 0, n);
582
 
               bytesRead += n;
583
 
 
584
 
               if (trace)
585
 
               {
586
 
                  for (int i = 0; i < n; i++)
587
 
                     log.trace("" + dataBytes[i]);
588
 
               }
589
 
            }
590
 
         }
591
 
         catch (SSLException e)
592
 
         {
593
 
            log.debug(e.getMessage());
594
 
         }
595
 
         catch (EOFException e)
596
 
         {
597
 
            eof = true;
598
 
            log.info("end of file");
599
 
         }
600
 
         catch (IOException e)
601
 
         {
602
 
            if (++errorCount > maxErrors)
603
 
            {
604
 
               manager.setReadException(e);
605
 
               super.shutdown();
606
 
               log.error(e);
607
 
            }
608
 
            else
609
 
               log.warn(e);
610
 
         }
611
 
         finally
612
 
         {
613
 
            if (eof)
614
 
            {
615
 
               super.shutdown();
616
 
               manager.setEOF();
617
 
            }
618
 
         }
619
 
      }
620
 
 
621
 
 
622
 
      private boolean completeHeader() throws IOException
623
 
      {
624
 
         while (headerCount < HEADER_LENGTH)
625
 
         {
626
 
            int n = is.read(headerBytes, headerCount, HEADER_LENGTH - headerCount);
627
 
 
628
 
            // end of file
629
 
            if (n < 0)
630
 
               return false;
631
 
 
632
 
            headerCount += n;
633
 
         }
634
 
 
635
 
         // Reset for next header.
636
 
         headerCount = 0;
637
 
 
638
 
         version = headerBytes[0];
639
 
         destination =               (headerBytes[1] << 24) | (0x00ff0000 & (headerBytes[2] << 16)) |
640
 
                       (0x0000ff00 & (headerBytes[3] << 8)) | (0x000000ff & headerBytes[4]);
641
 
         size = (short) ((0x0000ff00 & (headerBytes[5] << 8)) | (0x000000ff & headerBytes[6]));
642
 
 
643
 
         if (trace)
644
 
         {
645
 
            log.trace("version:     " + version);
646
 
            log.trace("destination: " + destination);
647
 
            log.trace("size:        " + size);
648
 
         }
649
 
 
650
 
         if (size  < 0 || bufferSize < size)
651
 
            throw new CorruptedStreamException("invalid chunk size read on: " + manager + ": "+ size);
652
 
 
653
 
         if (version != 0)
654
 
            throw new CorruptedStreamException("invalid version read on: " + manager + ": " + version);
655
 
 
656
 
         return true;
657
 
      }
658
 
 
659
 
 
660
 
      protected void doShutDown()
661
 
      {
662
 
         log.debug("input thread: data bytes read: " + dataInCount);
663
 
         log.debug("input thread shutting down");
664
 
      }
665
 
   }
666
 
 
667
 
 
668
 
   private static class CorruptedStreamException extends IOException
669
 
   {
670
 
      CorruptedStreamException(String message) {super(message);}
671
 
   }
672
 
}
 
 
b'\\ No newline at end of file'