~ubuntu-branches/ubuntu/wily/libjboss-remoting-java/wily

« back to all changes in this revision

Viewing changes to src/main/org/jboss/remoting/transport/multiplex/InputMultiplexor.java

  • Committer: Package Import Robot
  • Author(s): Torsten Werner
  • Date: 2011-09-09 14:01:03 UTC
  • mfrom: (1.1.6 upstream)
  • Revision ID: package-import@ubuntu.com-20110909140103-hqokx61534tas9rg
Tags: 2.5.3.SP1-1
* Newer but not newest upstream release. Do not build samples.
* Change debian/watch to upstream's svn repo.
* Add patch to fix compile error caused by tomcat update.
  (Closes: #628303)
* Switch to source format 3.0.
* Switch to debhelper level 7.
* Remove useless Depends.
* Update Standards-Version: 3.9.2.
* Update README.source.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
 * JBoss, Home of Professional Open Source
 
3
 * Copyright 2005, JBoss Inc., and individual contributors as indicated
 
4
 * by the @authors tag. See the copyright.txt in the distribution for a
 
5
 * full listing of individual contributors.
 
6
 *
 
7
 * This is free software; you can redistribute it and/or modify it
 
8
 * under the terms of the GNU Lesser General Public License as
 
9
 * published by the Free Software Foundation; either version 2.1 of
 
10
 * the License, or (at your option) any later version.
 
11
 *
 
12
 * This software is distributed in the hope that it will be useful,
 
13
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
14
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 
15
 * Lesser General Public License for more details.
 
16
 *
 
17
 * You should have received a copy of the GNU Lesser General Public
 
18
 * License along with this software; if not, write to the Free
 
19
 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
 
20
 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
 
21
 */
 
22
 
 
23
package org.jboss.remoting.transport.multiplex;
 
24
 
 
25
import org.jboss.logging.Logger;
 
26
import org.jboss.remoting.transport.multiplex.utility.StoppableThread;
 
27
 
 
28
import java.io.BufferedInputStream;
 
29
import java.io.EOFException;
 
30
import java.io.IOException;
 
31
import java.io.InputStream;
 
32
import java.io.OutputStream;
 
33
import java.net.Socket;
 
34
import java.nio.ByteBuffer;
 
35
import java.nio.channels.ClosedSelectorException;
 
36
import java.nio.channels.SelectableChannel;
 
37
import java.nio.channels.SelectionKey;
 
38
import java.nio.channels.Selector;
 
39
import java.nio.channels.SocketChannel;
 
40
import java.util.Collections;
 
41
import java.util.HashMap;
 
42
import java.util.HashSet;
 
43
import java.util.Iterator;
 
44
import java.util.Map;
 
45
import java.util.Set;
 
46
 
 
47
import javax.net.ssl.SSLException;
 
48
 
 
49
/**
 
50
 * <code>InputMultiplexor</code> is one of the key Multiplex classes, responsible for
 
51
 * demultiplexing multiple byte streams sharing a single TCP connection.  It has two
 
52
 * inner classes which can perform this function.  <code>MultiGroupInputThread</code> can perform
 
53
 * demultiplexing for any number of NIO sockets, taking advantage of the <code>Selector</code>
 
54
 * facility.  For non-NIO sockets, notably SSL sockets, <code>SingleGroupInputThread</code>
 
55
 * handles demultiplexing for a single socket.
 
56
 * <p>
 
57
 * The data stream, created at the other end of the TCP connection by the
 
58
 * <code>OutputMultiplexor</code> class, consists of a sequence of packets, each consisting of
 
59
 * a header, giving version, destination virtual socket, and number of bytes. followed
 
60
 * by the specified number of data bytes. (See <code>OutputMultiplexor</code> for the
 
61
 * header format.
 
62
 * Each of the demultiplexing thread classes reads a header and transfers the
 
63
 * following bytes to the input stream of the target virtual socket.
 
64
 *
 
65
 * <p>
 
66
 * Copyright (c) 2005
 
67
 * <p>
 
68
 * @author <a href="mailto:r.sigal@computer.org">Ron Sigal</a>
 
69
 * 
 
70
 * @deprecated As of release 2.4.0 the multiplex transport will no longer be actively supported.
 
71
 */
 
72
public class InputMultiplexor
 
73
{
 
74
   protected static final Logger log = Logger.getLogger(InputMultiplexor.class);
 
75
   private static final int HEADER_LENGTH = 7;
 
76
 
 
77
   private int bufferSize;
 
78
   private int maxErrors;
 
79
 
 
80
 
 
81
   public InputMultiplexor(Map configuration)
 
82
   {
 
83
      bufferSize
 
84
         = Multiplex.getOneParameter(configuration,
 
85
                                    "bufferSize",
 
86
                                    Multiplex.INPUT_BUFFER_SIZE,
 
87
                                    Multiplex.INPUT_BUFFER_SIZE_DEFAULT);
 
88
 
 
89
      maxErrors
 
90
         = Multiplex.getOneParameter(configuration,
 
91
                                     "maxErrors",
 
92
                                     Multiplex.INPUT_MAX_ERRORS,
 
93
                                     Multiplex.INPUT_MAX_ERRORS_DEFAULT);
 
94
   }
 
95
 
 
96
 
 
97
   /**
 
98
    * Returns a <code>MultiGroupInputThread</code> designed to handle multiple virtual socket groups.
 
99
    * @param configuration
 
100
    * @return a <code>MultiGroupInputThread</code> designed to handle multiple virtual socket groups
 
101
    */
 
102
   public MultiGroupInputThread getaMultiGroupInputThread() throws IOException
 
103
   {
 
104
      return new MultiGroupInputThread();
 
105
   }
 
106
 
 
107
 
 
108
   /**
 
109
    * Returns a <code>SingleGroupInputThread</code> designed to handle a single virtual socket group.
 
110
    * @return a <code>SingleGroupInputThread</code> designed to handle a single virtual socket group
 
111
    */
 
112
   public SingleGroupInputThread getaSingleGroupInputThread(MultiplexingManager manager, Socket socket, OutputStream os) throws IOException
 
113
   {
 
114
      return new SingleGroupInputThread(manager, socket, os);
 
115
   }
 
116
 
 
117
 
 
118
   public class MultiGroupInputThread extends StoppableThread
 
119
   {
 
120
      private static final String errMsg1 = "An existing connection was forcibly closed by the remote host";
 
121
      private static final String errMsg2 = "An established connection was aborted by the software in your host machine";
 
122
 
 
123
      private Map managerProcessorMap;
 
124
      private Set socketGroupsToBeRegistered = new HashSet();
 
125
      private Set tempSocketGroupSet = new HashSet();
 
126
      private boolean socketGroupsAreWaiting;
 
127
      private Selector selector;
 
128
      private ByteBuffer buffer;
 
129
      private byte[] data;
 
130
 
 
131
      private boolean trace;
 
132
      private boolean debug;
 
133
      private boolean info;
 
134
 
 
135
 
 
136
      public MultiGroupInputThread() throws IOException
 
137
      {
 
138
         managerProcessorMap = Collections.synchronizedMap(new HashMap());
 
139
         selector = Selector.open();
 
140
         buffer = ByteBuffer.allocate(bufferSize);
 
141
         data = new byte[bufferSize];
 
142
 
 
143
         trace = log.isTraceEnabled();
 
144
         debug = log.isDebugEnabled();
 
145
         info = log.isInfoEnabled();
 
146
      }
 
147
 
 
148
 
 
149
      /**
 
150
       * Registers manager and socket with NIO Selector
 
151
       * @param manager <code>MultiplexingManager</code>
 
152
       * @return
 
153
       * @throws <code>IOException</code>
 
154
       */
 
155
      public void registerSocketGroup(MultiplexingManager manager) throws IOException
 
156
      {
 
157
         if (debug) log.debug(" accepting socket group for registration: " + manager);
 
158
 
 
159
         synchronized (socketGroupsToBeRegistered)
 
160
         {
 
161
            socketGroupsToBeRegistered.add(manager);
 
162
            socketGroupsAreWaiting = true;
 
163
         }
 
164
      }
 
165
 
 
166
 
 
167
      protected void doRegistration()
 
168
      {
 
169
         tempSocketGroupSet.clear();
 
170
         synchronized(socketGroupsToBeRegistered)
 
171
         {
 
172
            tempSocketGroupSet.addAll(socketGroupsToBeRegistered);
 
173
            socketGroupsToBeRegistered.clear();
 
174
            socketGroupsAreWaiting = false;
 
175
         }
 
176
 
 
177
         Iterator it = tempSocketGroupSet.iterator();
 
178
         while (it.hasNext())
 
179
         {
 
180
            MultiplexingManager manager = (MultiplexingManager) it.next();
 
181
            GroupProcessor groupProcessor = new GroupProcessor(manager);
 
182
            SelectableChannel channel = manager.getSocket().getChannel();
 
183
 
 
184
            try
 
185
            {
 
186
               SelectionKey key = channel.register(selector, SelectionKey.OP_READ, groupProcessor);
 
187
               groupProcessor.setKey(key);
 
188
               managerProcessorMap.put(manager, groupProcessor);
 
189
            }
 
190
            catch (IOException e)
 
191
            {
 
192
               // channel might be closed.
 
193
               log.warn(e);
 
194
            }
 
195
         }
 
196
      }
 
197
 
 
198
 
 
199
      /**
 
200
       * Removes references to virtual socket group.
 
201
       * @param manager
 
202
       */
 
203
      public void unregisterSocketGroup(MultiplexingManager manager)
 
204
      {
 
205
         // Leave GroupProcessor in Map until SelectionKey is cancelled.
 
206
         GroupProcessor groupProcessor = (GroupProcessor) managerProcessorMap.get(manager);
 
207
         if(groupProcessor == null)
 
208
         {
 
209
            log.debug("attempting to unregister unknown MultiplexingManager: " + manager);
 
210
            return;
 
211
         }
 
212
 
 
213
         SelectionKey key = groupProcessor.getKey();
 
214
         key.cancel();
 
215
         managerProcessorMap.remove(manager);
 
216
         if (debug) log.debug("unregistered socket group:" + manager);
 
217
      }
 
218
 
 
219
 
 
220
      public void shutdown()
 
221
      {
 
222
         // in case thread is still reading
 
223
         super.shutdown();
 
224
         try
 
225
         {
 
226
            selector.close();
 
227
         }
 
228
         catch (IOException e)
 
229
         {
 
230
            log.error("unable to close selector", e);
 
231
         }
 
232
         interrupt();
 
233
      }
 
234
 
 
235
 
 
236
      protected void doInit()
 
237
      {
 
238
         log.debug("MultiGroupInputThread thread starting");
 
239
      }
 
240
 
 
241
 
 
242
      protected void doRun()
 
243
      {
 
244
         log.debug("entering doRun()");
 
245
         Set keys = null;
 
246
 
 
247
         try
 
248
         {
 
249
            while (true)
 
250
            {
 
251
               if (!running)
 
252
                  return;
 
253
 
 
254
               if (socketGroupsAreWaiting)
 
255
                  doRegistration();
 
256
 
 
257
               selector.select(200);
 
258
               keys = selector.selectedKeys();
 
259
 
 
260
               if (!keys.isEmpty())
 
261
                  break;
 
262
            }
 
263
         }
 
264
         catch (IOException e)
 
265
         {
 
266
            log.info(e);
 
267
         }
 
268
         catch (ClosedSelectorException e)
 
269
         {
 
270
            log.info("Selector is closed: shutting down input thread");
 
271
            super.shutdown();
 
272
            return;
 
273
         }
 
274
 
 
275
         if (trace)
 
276
         {
 
277
            log.trace("keys: " + selector.keys().size());
 
278
            log.trace("selected keys: " + keys.size());
 
279
         }
 
280
 
 
281
         Iterator it = keys.iterator();
 
282
         while (it.hasNext())
 
283
         {
 
284
            SelectionKey key = (SelectionKey) it.next();
 
285
            it.remove();
 
286
            GroupProcessor groupProcessor = (GroupProcessor) key.attachment();
 
287
 
 
288
            if (groupProcessor == null)
 
289
            {
 
290
               if (key.isValid())
 
291
                  log.error("valid SelectionKey has no attachment: " + key);
 
292
 
 
293
               continue;
 
294
            }
 
295
 
 
296
            groupProcessor.processChannel(key);
 
297
         }
 
298
      }
 
299
 
 
300
 
 
301
      protected void doShutDown()
 
302
      {
 
303
         log.debug("MultiGroupInputThread shutting down");
 
304
      }
 
305
 
 
306
 
 
307
      class GroupProcessor
 
308
      {
 
309
         // Message header
 
310
         private byte[] b = new byte[HEADER_LENGTH];
 
311
         private int    headerCount;
 
312
         private byte   version;
 
313
         private int    destination;
 
314
         private short  size;
 
315
 
 
316
         private MultiplexingManager manager;
 
317
         private OutputStream   outputStream;
 
318
         private SelectionKey key;
 
319
         private int errorCount;
 
320
 
 
321
 
 
322
         public GroupProcessor(MultiplexingManager manager)
 
323
         {
 
324
            this.manager = manager;
 
325
         }
 
326
 
 
327
         public void processChannel(SelectionKey key)
 
328
         {
 
329
            log.debug("processChannel()");
 
330
            SocketChannel channel = (SocketChannel) key.channel();
 
331
            buffer.clear();
 
332
 
 
333
            try
 
334
            {
 
335
               if (channel.read(buffer) < 0)
 
336
                     throw new EOFException();
 
337
 
 
338
               buffer.flip();
 
339
 
 
340
               if (debug)
 
341
                  log.debug("read: " + buffer.remaining());
 
342
 
 
343
               while (buffer.hasRemaining())
 
344
               {
 
345
                  if (headerCount < HEADER_LENGTH || size == 0)
 
346
                  {
 
347
                     // then prepare to process next virtual stream.
 
348
                     completeHeader(buffer);
 
349
 
 
350
                     if (headerCount < HEADER_LENGTH)
 
351
                        return;
 
352
 
 
353
                     SocketId socketId = new SocketId(destination);
 
354
                     outputStream = manager.getOutputStreamByLocalSocket(socketId);
 
355
                     if (outputStream == null)
 
356
                     {
 
357
                        // We'll get an OutputStream to stash these bytes, just in case they
 
358
                        // are coming from a valid source and the local VirtualSocket is still
 
359
                        // getting set up.
 
360
                        log.info("unknown socket id: " + destination);
 
361
                        outputStream = manager.getConnectedOutputStream(socketId);
 
362
                     }
 
363
 
 
364
                     if (!buffer.hasRemaining())
 
365
                        return;
 
366
                  }
 
367
 
 
368
                  int n = Math.min(size, buffer.remaining());
 
369
                  buffer.get(data, 0, n);
 
370
                  outputStream.write(data, 0, n);
 
371
 
 
372
                  if (trace)
 
373
                  {
 
374
                     log.trace("received " + n + " bytes for socket: " + destination);
 
375
                     for (int i = 0; i < n; i++)
 
376
                        log.trace("" + (0xff & data[i]));
 
377
                  }
 
378
 
 
379
                  size -= n;
 
380
                  if (size == 0)
 
381
                     headerCount = 0;
 
382
               }
 
383
            }
 
384
            catch (IOException e)
 
385
            {
 
386
               handleChannelException(e, key, channel);
 
387
            }
 
388
            catch (Throwable t)
 
389
            {
 
390
               log.error("doRun()");
 
391
               log.error(t);
 
392
            }
 
393
         }
 
394
 
 
395
         public SelectionKey getKey()
 
396
         {
 
397
            return key;
 
398
         }
 
399
 
 
400
         public void setKey(SelectionKey key)
 
401
         {
 
402
            this.key = key;
 
403
         }
 
404
 
 
405
         private void completeHeader(ByteBuffer bb) throws IOException
 
406
         {
 
407
            int n = Math.min(bb.remaining(), HEADER_LENGTH - headerCount);
 
408
            bb.get(b, headerCount, n);
 
409
            headerCount += n;
 
410
 
 
411
            if (headerCount == HEADER_LENGTH)
 
412
            {
 
413
               version = b[0];
 
414
               destination =                 (b[1] << 24) | (0x00ff0000 & (b[2] << 16)) |
 
415
                               (0x0000ff00 & (b[3] << 8)) | (0x000000ff & b[4]);
 
416
               size = (short) ((0x0000ff00 & (b[5] << 8)) | (0x000000ff & b[6]));
 
417
 
 
418
 
 
419
               if (size  < 0 || bufferSize < size)
 
420
                  throw new CorruptedStreamException("invalid chunk size read on: " + manager + ": "+ size);
 
421
 
 
422
               if (version != 0)
 
423
                  throw new CorruptedStreamException("invalid version read on: " + manager + ": " + version);
 
424
            }
 
425
         }
 
426
 
 
427
         private void handleChannelException(IOException e, SelectionKey key, SocketChannel channel)
 
428
         {
 
429
            try
 
430
            {
 
431
               if (!channel.isOpen())
 
432
               {
 
433
                  key.cancel();
 
434
                  return;
 
435
               }
 
436
 
 
437
               if (e instanceof EOFException)
 
438
               {
 
439
                  key.cancel();
 
440
                  manager.setEOF();
 
441
                  log.debug(e);
 
442
                  return;
 
443
               }
 
444
               
 
445
               if (e instanceof SSLException)
 
446
               {
 
447
                  key.cancel();
 
448
                  log.error(e);
 
449
                  return;
 
450
               }
 
451
 
 
452
               if (++errorCount > maxErrors)
 
453
                 {
 
454
                    manager.setReadException(e);
 
455
                    channel.close();
 
456
                    key.cancel();
 
457
                    log.error(e);
 
458
                    log.error("error count exceeds max errors: " + errorCount);
 
459
                    return;
 
460
                 }
 
461
 
 
462
               Socket socket = channel.socket();
 
463
               String message = e.getMessage();
 
464
 
 
465
               if (socket.isClosed() || socket.isInputShutdown() ||
 
466
                   errMsg1.equals(message) || errMsg2.equals(message) ||
 
467
                   e instanceof CorruptedStreamException)
 
468
               {
 
469
                  manager.setReadException(e);
 
470
                  channel.close();
 
471
                  key.cancel();
 
472
                  log.info(e);
 
473
                  return;
 
474
               }
 
475
 
 
476
               // Haven't reached maxErrors yet
 
477
               log.warn(e);
 
478
            }
 
479
            catch (IOException e2)
 
480
            {
 
481
               log.error("problem closing channel: "  + manager, e2);
 
482
            }
 
483
         }
 
484
 
 
485
         public int          getDestination()   {return destination;}
 
486
         public short        getSize()          {return size;}
 
487
         public byte         getVersion()       {return version;}
 
488
         public OutputStream getOutputStream()  {return outputStream;}
 
489
      }
 
490
   }
 
491
 
 
492
 
 
493
   class SingleGroupInputThread extends StoppableThread
 
494
   {
 
495
      private InputStream is;
 
496
      private OutputStream currentOutputStream;
 
497
      private byte[] dataBytes = new byte[bufferSize];
 
498
      private MultiplexingManager manager;
 
499
      private int dataInCount = 0;
 
500
      private int errorCount;
 
501
      private boolean eof;
 
502
 
 
503
      // Message header
 
504
      private byte[] headerBytes = new byte[HEADER_LENGTH];
 
505
      private int    headerCount;
 
506
      private byte   version;
 
507
      private int    destination;
 
508
      private short  size;
 
509
 
 
510
      private boolean trace;
 
511
      private boolean debug;
 
512
      private boolean info;
 
513
 
 
514
 
 
515
      public SingleGroupInputThread(MultiplexingManager manager, Socket socket, OutputStream os)
 
516
      throws IOException
 
517
      {
 
518
         this.is = new BufferedInputStream(socket.getInputStream());
 
519
         this.manager = manager;
 
520
         currentOutputStream = os;
 
521
 
 
522
         trace = log.isTraceEnabled();
 
523
         debug = log.isDebugEnabled();
 
524
         info = log.isInfoEnabled();
 
525
      }
 
526
 
 
527
 
 
528
      public void shutdown()
 
529
      {
 
530
         // in case thread is still reading
 
531
         super.shutdown();
 
532
         log.info("interrupting input thread");
 
533
         interrupt();
 
534
      }
 
535
 
 
536
 
 
537
      /**
 
538
       *
 
539
       */
 
540
      protected void doInit()
 
541
      {
 
542
         log.debug("SingleGroupInputThread thread starting");
 
543
      }
 
544
 
 
545
 
 
546
      /**
 
547
       *
 
548
       */
 
549
      protected void doRun()
 
550
      {
 
551
         try
 
552
         {
 
553
            // end of file
 
554
            if (!completeHeader())
 
555
            {
 
556
               eof = true;
 
557
               return;
 
558
            }
 
559
 
 
560
            SocketId socketId = new SocketId(destination);
 
561
            currentOutputStream = manager.getOutputStreamByLocalSocket(socketId);
 
562
            if (currentOutputStream == null)
 
563
            {
 
564
               // We'll get an OutputStream to stash these bytes, just in case they
 
565
               // are coming from a valid source and the local VirtualSocket is still
 
566
               // getting set up.
 
567
               log.info("unknown socket id: " + destination);
 
568
               currentOutputStream = manager.getConnectedOutputStream(socketId);
 
569
            }
 
570
 
 
571
            int bytesRead = 0;
 
572
            while (bytesRead < size)
 
573
            {
 
574
               int n = is.read(dataBytes, 0, size - bytesRead);
 
575
               if (n < 0)
 
576
               {
 
577
                  eof = true;
 
578
                  return;
 
579
               }
 
580
 
 
581
               currentOutputStream.write(dataBytes, 0, n);
 
582
               bytesRead += n;
 
583
 
 
584
               if (trace)
 
585
               {
 
586
                  for (int i = 0; i < n; i++)
 
587
                     log.trace("" + dataBytes[i]);
 
588
               }
 
589
            }
 
590
         }
 
591
         catch (SSLException e)
 
592
         {
 
593
            log.debug(e.getMessage());
 
594
         }
 
595
         catch (EOFException e)
 
596
         {
 
597
            eof = true;
 
598
            log.info("end of file");
 
599
         }
 
600
         catch (IOException e)
 
601
         {
 
602
            if (++errorCount > maxErrors)
 
603
            {
 
604
               manager.setReadException(e);
 
605
               super.shutdown();
 
606
               log.error(e);
 
607
            }
 
608
            else
 
609
               log.warn(e);
 
610
         }
 
611
         finally
 
612
         {
 
613
            if (eof)
 
614
            {
 
615
               super.shutdown();
 
616
               manager.setEOF();
 
617
            }
 
618
         }
 
619
      }
 
620
 
 
621
 
 
622
      private boolean completeHeader() throws IOException
 
623
      {
 
624
         while (headerCount < HEADER_LENGTH)
 
625
         {
 
626
            int n = is.read(headerBytes, headerCount, HEADER_LENGTH - headerCount);
 
627
 
 
628
            // end of file
 
629
            if (n < 0)
 
630
               return false;
 
631
 
 
632
            headerCount += n;
 
633
         }
 
634
 
 
635
         // Reset for next header.
 
636
         headerCount = 0;
 
637
 
 
638
         version = headerBytes[0];
 
639
         destination =               (headerBytes[1] << 24) | (0x00ff0000 & (headerBytes[2] << 16)) |
 
640
                       (0x0000ff00 & (headerBytes[3] << 8)) | (0x000000ff & headerBytes[4]);
 
641
         size = (short) ((0x0000ff00 & (headerBytes[5] << 8)) | (0x000000ff & headerBytes[6]));
 
642
 
 
643
         if (trace)
 
644
         {
 
645
            log.trace("version:     " + version);
 
646
            log.trace("destination: " + destination);
 
647
            log.trace("size:        " + size);
 
648
         }
 
649
 
 
650
         if (size  < 0 || bufferSize < size)
 
651
            throw new CorruptedStreamException("invalid chunk size read on: " + manager + ": "+ size);
 
652
 
 
653
         if (version != 0)
 
654
            throw new CorruptedStreamException("invalid version read on: " + manager + ": " + version);
 
655
 
 
656
         return true;
 
657
      }
 
658
 
 
659
 
 
660
      protected void doShutDown()
 
661
      {
 
662
         log.debug("input thread: data bytes read: " + dataInCount);
 
663
         log.debug("input thread shutting down");
 
664
      }
 
665
   }
 
666
 
 
667
 
 
668
   private static class CorruptedStreamException extends IOException
 
669
   {
 
670
      CorruptedStreamException(String message) {super(message);}
 
671
   }
 
672
}
 
 
b'\\ No newline at end of file'