~certify-web-dev/twisted/certify-trunk

« back to all changes in this revision

Viewing changes to twisted/web2/test/test_stream.py

  • Committer: Bazaar Package Importer
  • Author(s): Matthias Klose
  • Date: 2007-01-17 14:52:35 UTC
  • mfrom: (1.1.5 upstream) (2.1.2 etch)
  • Revision ID: james.westby@ubuntu.com-20070117145235-btmig6qfmqfen0om
Tags: 2.5.0-0ubuntu1
New upstream version, compatible with python2.5.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
import tempfile, operator, sys, os
 
2
 
 
3
from twisted.trial import unittest
 
4
from twisted.internet import reactor, defer, interfaces
 
5
from twisted.python import log
 
6
from zope.interface import Interface, Attribute, implements
 
7
 
 
8
from twisted.python.util import sibpath
 
9
from twisted.web2 import stream
 
10
 
 
11
def bufstr(data):
 
12
    try:
 
13
        return str(buffer(data))
 
14
    except TypeError:
 
15
        raise TypeError("%s doesn't conform to the buffer interface" % (data,))
 
16
    
 
17
    
 
18
class SimpleStreamTests:
 
19
    text = '1234567890'
 
20
    def test_split(self):
 
21
        for point in range(10):
 
22
            s = self.makeStream(0)
 
23
            a,b = s.split(point)
 
24
            if point > 0:
 
25
                self.assertEquals(bufstr(a.read()), self.text[:point])
 
26
            self.assertEquals(a.read(), None)
 
27
            if point < len(self.text):
 
28
                self.assertEquals(bufstr(b.read()), self.text[point:])
 
29
            self.assertEquals(b.read(), None)
 
30
 
 
31
        for point in range(7):
 
32
            s = self.makeStream(2, 6)
 
33
            self.assertEquals(s.length, 6)
 
34
            a,b = s.split(point)
 
35
            if point > 0:
 
36
                self.assertEquals(bufstr(a.read()), self.text[2:point+2])
 
37
            self.assertEquals(a.read(), None)
 
38
            if point < 6:
 
39
                self.assertEquals(bufstr(b.read()), self.text[point+2:8])
 
40
            self.assertEquals(b.read(), None)
 
41
 
 
42
    def test_read(self):
 
43
        s = self.makeStream()
 
44
        self.assertEquals(s.length, len(self.text))
 
45
        self.assertEquals(bufstr(s.read()), self.text)
 
46
        self.assertEquals(s.read(), None)
 
47
 
 
48
        s = self.makeStream(0, 4)
 
49
        self.assertEquals(s.length, 4)
 
50
        self.assertEquals(bufstr(s.read()), self.text[0:4])
 
51
        self.assertEquals(s.read(), None)
 
52
        self.assertEquals(s.length, 0)
 
53
 
 
54
        s = self.makeStream(4, 6)
 
55
        self.assertEquals(s.length, 6)
 
56
        self.assertEquals(bufstr(s.read()), self.text[4:10])
 
57
        self.assertEquals(s.read(), None)
 
58
        self.assertEquals(s.length, 0)
 
59
    
 
60
class FileStreamTest(SimpleStreamTests, unittest.TestCase):
 
61
    def makeStream(self, *args, **kw):
 
62
        return stream.FileStream(self.f, *args, **kw)
 
63
    
 
64
    def setUpClass(self):
 
65
        f = tempfile.TemporaryFile('w+')
 
66
        f.write(self.text)
 
67
        f.seek(0, 0)
 
68
        self.f = f
 
69
 
 
70
    def test_close(self):
 
71
        s = self.makeStream()
 
72
        s.close()
 
73
 
 
74
        self.assertEquals(s.length, 0)
 
75
        # Make sure close doesn't close file
 
76
        # would raise exception if f is closed
 
77
        self.f.seek(0, 0)
 
78
 
 
79
    def test_read2(self):
 
80
        s = self.makeStream(0)
 
81
        s.CHUNK_SIZE = 6
 
82
        self.assertEquals(s.length, 10)
 
83
        self.assertEquals(bufstr(s.read()), self.text[0:6])
 
84
        self.assertEquals(bufstr(s.read()), self.text[6:10])
 
85
        self.assertEquals(s.read(), None)
 
86
 
 
87
        s = self.makeStream(0)
 
88
        s.CHUNK_SIZE = 5
 
89
        self.assertEquals(s.length, 10)
 
90
        self.assertEquals(bufstr(s.read()), self.text[0:5])
 
91
        self.assertEquals(bufstr(s.read()), self.text[5:10])
 
92
        self.assertEquals(s.read(), None)
 
93
 
 
94
        s = self.makeStream(0, 20)
 
95
        self.assertEquals(s.length, 20)
 
96
        self.assertEquals(bufstr(s.read()), self.text)
 
97
        self.assertRaises(RuntimeError, s.read) # ran out of data
 
98
 
 
99
class MMapFileStreamTest(SimpleStreamTests, unittest.TestCase):
 
100
    def makeStream(self, *args, **kw):
 
101
        return stream.FileStream(self.f, *args, **kw)
 
102
    
 
103
    def setUpClass(self):
 
104
        f = tempfile.TemporaryFile('w+')
 
105
        self.text = self.text*(stream.MMAP_THRESHOLD//len(self.text) + 1)
 
106
        f.write(self.text)
 
107
        f.seek(0, 0)
 
108
        self.f=f
 
109
 
 
110
    def test_mmapwrapper(self):
 
111
        self.assertRaises(TypeError, stream.mmapwrapper)
 
112
        self.assertRaises(TypeError, stream.mmapwrapper, offset = 0)
 
113
        self.assertRaises(TypeError, stream.mmapwrapper, offset = None)
 
114
 
 
115
    if not stream.mmap:
 
116
        test_mmapwrapper.skip = 'mmap not supported here'
 
117
            
 
118
class MemoryStreamTest(SimpleStreamTests, unittest.TestCase):
 
119
    def makeStream(self, *args, **kw):
 
120
        return stream.MemoryStream(self.text, *args, **kw)
 
121
 
 
122
    def test_close(self):
 
123
        s = self.makeStream()
 
124
        s.close()
 
125
        self.assertEquals(s.length, 0)
 
126
 
 
127
    def test_read2(self):
 
128
        self.assertRaises(ValueError, self.makeStream, 0, 20)
 
129
 
 
130
 
 
131
testdata = """I was angry with my friend:
 
132
I told my wrath, my wrath did end.
 
133
I was angry with my foe:
 
134
I told it not, my wrath did grow.
 
135
 
 
136
And I water'd it in fears,
 
137
Night and morning with my tears;
 
138
And I sunned it with smiles,
 
139
And with soft deceitful wiles.
 
140
 
 
141
And it grew both day and night,
 
142
Till it bore an apple bright;
 
143
And my foe beheld it shine,
 
144
And he knew that is was mine,
 
145
 
 
146
And into my garden stole
 
147
When the night had veil'd the pole:
 
148
In the morning glad I see
 
149
My foe outstretch'd beneath the tree"""
 
150
 
 
151
class TestSubstream(unittest.TestCase):
 
152
    
 
153
    def setUp(self):
 
154
        self.data = testdata
 
155
        self.s = stream.MemoryStream(self.data)
 
156
 
 
157
    def suckTheMarrow(self, s):
 
158
        return ''.join(map(str, list(iter(s.read, None))))
 
159
 
 
160
    def testStart(self):
 
161
        s = stream.substream(self.s, 0, 11)
 
162
        self.assertEquals('I was angry', self.suckTheMarrow(s))
 
163
 
 
164
    def testNotStart(self):
 
165
        s = stream.substream(self.s, 12, 26)
 
166
        self.assertEquals('with my friend', self.suckTheMarrow(s))
 
167
 
 
168
    def testReverseStartEnd(self):
 
169
        self.assertRaises(ValueError, stream.substream, self.s, 26, 12)
 
170
 
 
171
    def testEmptySubstream(self):
 
172
        s = stream.substream(self.s, 11, 11)
 
173
        self.assertEquals('', self.suckTheMarrow(s))
 
174
 
 
175
    def testEnd(self):
 
176
        size = len(self.data)
 
177
        s = stream.substream(self.s, size-4, size)
 
178
        self.assertEquals('tree', self.suckTheMarrow(s))
 
179
 
 
180
    def testPastEnd(self):
 
181
        size = len(self.data)
 
182
        self.assertRaises(ValueError, stream.substream, self.s, size-4, size+8)
 
183
 
 
184
 
 
185
class TestBufferedStream(unittest.TestCase):
 
186
 
 
187
    def setUp(self):
 
188
        self.data = testdata.replace('\n', '\r\n')
 
189
        s = stream.MemoryStream(self.data)
 
190
        self.s = stream.BufferedStream(s)
 
191
 
 
192
    def _cbGotData(self, data, expected):
 
193
        self.assertEqual(data, expected)
 
194
 
 
195
    def test_readline(self):
 
196
        """Test that readline reads a line."""
 
197
        d = self.s.readline()
 
198
        d.addCallback(self._cbGotData, 'I was angry with my friend:\r\n')
 
199
        return d
 
200
 
 
201
    def test_readlineWithSize(self):
 
202
        """Test the size argument to readline"""
 
203
        d = self.s.readline(size = 5)
 
204
        d.addCallback(self._cbGotData, 'I was')
 
205
        return d
 
206
 
 
207
    def test_readlineWithBigSize(self):
 
208
        """Test the size argument when it's bigger than the length of the line."""
 
209
        d = self.s.readline(size = 40)
 
210
        d.addCallback(self._cbGotData, 'I was angry with my friend:\r\n')
 
211
        return d
 
212
 
 
213
    def test_readlineWithZero(self):
 
214
        """Test readline with size = 0."""
 
215
        d = self.s.readline(size = 0)
 
216
        d.addCallback(self._cbGotData, '')
 
217
        return d
 
218
 
 
219
    def test_readlineFinished(self):
 
220
        """Test readline on a finished stream."""
 
221
        nolines = len(self.data.split('\r\n'))
 
222
        for i in range(nolines):
 
223
            self.s.readline()
 
224
        d = self.s.readline()
 
225
        d.addCallback(self._cbGotData, '')
 
226
        return d
 
227
 
 
228
    def test_readlineNegSize(self):
 
229
        """Ensure that readline with a negative size raises an exception."""
 
230
        self.assertRaises(ValueError, self.s.readline, size = -1)
 
231
 
 
232
    def test_readlineSizeInDelimiter(self):
 
233
        """
 
234
        Test behavior of readline when size falls inside the
 
235
        delimiter.
 
236
        """
 
237
        d = self.s.readline(size=28)
 
238
        d.addCallback(self._cbGotData, "I was angry with my friend:\r")
 
239
        d.addCallback(lambda _: self.s.readline())
 
240
        d.addCallback(self._cbGotData, "\nI told my wrath, my wrath did end.\r\n")
 
241
        
 
242
    def test_readExactly(self):
 
243
        """Make sure readExactly with no arg reads all the data."""
 
244
        d = self.s.readExactly()
 
245
        d.addCallback(self._cbGotData, self.data)
 
246
        return d
 
247
 
 
248
    def test_readExactly(self):
 
249
        """Test readExactly with a number."""
 
250
        d = self.s.readExactly(10)
 
251
        d.addCallback(self._cbGotData, self.data[:10])
 
252
        return d
 
253
 
 
254
    def test_readExactlyBig(self):
 
255
        """
 
256
        Test readExactly with a number larger than the size of the
 
257
        datastream.
 
258
        """
 
259
        d = self.s.readExactly(100000)
 
260
        d.addCallback(self._cbGotData, self.data)
 
261
        return d
 
262
 
 
263
    def test_read(self):
 
264
        """
 
265
        Make sure read() also functions. (note that this test uses
 
266
        an implementation detail of this particular stream. s.read()
 
267
        isn't guaranteed to return self.data on all streams.)
 
268
        """
 
269
        self.assertEqual(str(self.s.read()), self.data)
 
270
 
 
271
class TestStreamer:
 
272
    implements(stream.IStream, stream.IByteStream)
 
273
 
 
274
    length = None
 
275
 
 
276
    readCalled=0
 
277
    closeCalled=0
 
278
    
 
279
    def __init__(self, list):
 
280
        self.list = list
 
281
        
 
282
    def read(self):
 
283
        self.readCalled+=1
 
284
        if self.list:
 
285
            return self.list.pop(0)
 
286
        return None
 
287
 
 
288
    def close(self):
 
289
        self.closeCalled+=1
 
290
        self.list = []
 
291
        
 
292
class FallbackSplitTest(unittest.TestCase):
 
293
    def test_split(self):
 
294
        s = TestStreamer(['abcd', defer.succeed('efgh'), 'ijkl'])
 
295
        left,right = stream.fallbackSplit(s, 5)
 
296
        self.assertEquals(left.length, 5)
 
297
        self.assertEquals(right.length, None)
 
298
        self.assertEquals(bufstr(left.read()), 'abcd')
 
299
        d = left.read()
 
300
        d.addCallback(self._cbSplit, left, right)
 
301
        return d
 
302
 
 
303
    def _cbSplit(self, result, left, right):
 
304
        self.assertEquals(bufstr(result), 'e')
 
305
        self.assertEquals(left.read(), None)
 
306
 
 
307
        self.assertEquals(bufstr(right.read().result), 'fgh')
 
308
        self.assertEquals(bufstr(right.read()), 'ijkl')
 
309
        self.assertEquals(right.read(), None)
 
310
 
 
311
    def test_split2(self):
 
312
        s = TestStreamer(['abcd', defer.succeed('efgh'), 'ijkl'])
 
313
        left,right = stream.fallbackSplit(s, 4)
 
314
        
 
315
        self.assertEquals(left.length, 4)
 
316
        self.assertEquals(right.length, None)
 
317
        
 
318
        self.assertEquals(bufstr(left.read()), 'abcd')
 
319
        self.assertEquals(left.read(), None)
 
320
 
 
321
        self.assertEquals(bufstr(right.read().result), 'efgh')
 
322
        self.assertEquals(bufstr(right.read()), 'ijkl')
 
323
        self.assertEquals(right.read(), None)
 
324
 
 
325
    def test_splitsplit(self):
 
326
        s = TestStreamer(['abcd', defer.succeed('efgh'), 'ijkl'])
 
327
        left,right = stream.fallbackSplit(s, 5)
 
328
        left,middle = left.split(3)
 
329
        
 
330
        self.assertEquals(left.length, 3)
 
331
        self.assertEquals(middle.length, 2)
 
332
        self.assertEquals(right.length, None)
 
333
        
 
334
        self.assertEquals(bufstr(left.read()), 'abc')
 
335
        self.assertEquals(left.read(), None)
 
336
 
 
337
        self.assertEquals(bufstr(middle.read().result), 'd')
 
338
        self.assertEquals(bufstr(middle.read().result), 'e')
 
339
        self.assertEquals(middle.read(), None)
 
340
 
 
341
        self.assertEquals(bufstr(right.read().result), 'fgh')
 
342
        self.assertEquals(bufstr(right.read()), 'ijkl')
 
343
        self.assertEquals(right.read(), None)
 
344
 
 
345
    def test_closeboth(self):
 
346
        s = TestStreamer(['abcd', defer.succeed('efgh'), 'ijkl'])
 
347
        left,right = stream.fallbackSplit(s, 5)
 
348
        left.close()
 
349
        self.assertEquals(s.closeCalled, 0)
 
350
        right.close()
 
351
 
 
352
        # Make sure nothing got read
 
353
        self.assertEquals(s.readCalled, 0)
 
354
        self.assertEquals(s.closeCalled, 1)
 
355
 
 
356
    def test_closeboth_rev(self):
 
357
        s = TestStreamer(['abcd', defer.succeed('efgh'), 'ijkl'])
 
358
        left,right = stream.fallbackSplit(s, 5)
 
359
        right.close()
 
360
        self.assertEquals(s.closeCalled, 0)
 
361
        left.close()
 
362
 
 
363
        # Make sure nothing got read
 
364
        self.assertEquals(s.readCalled, 0)
 
365
        self.assertEquals(s.closeCalled, 1)
 
366
 
 
367
    def test_closeleft(self):
 
368
        s = TestStreamer(['abcd', defer.succeed('efgh'), 'ijkl'])
 
369
        left,right = stream.fallbackSplit(s, 5)
 
370
        left.close()
 
371
        d = right.read()
 
372
        d.addCallback(self._cbCloseleft, right)
 
373
        return d
 
374
 
 
375
    def _cbCloseleft(self, result, right):
 
376
        self.assertEquals(bufstr(result), 'fgh')
 
377
        self.assertEquals(bufstr(right.read()), 'ijkl')
 
378
        self.assertEquals(right.read(), None)
 
379
 
 
380
    def test_closeright(self):
 
381
        s = TestStreamer(['abcd', defer.succeed('efgh'), 'ijkl'])
 
382
        left,right = stream.fallbackSplit(s, 3)
 
383
        right.close()
 
384
 
 
385
        self.assertEquals(bufstr(left.read()), 'abc')
 
386
        self.assertEquals(left.read(), None)
 
387
        
 
388
        self.assertEquals(s.closeCalled, 1)
 
389
 
 
390
 
 
391
class ProcessStreamerTest(unittest.TestCase):
 
392
 
 
393
    if interfaces.IReactorProcess(reactor, None) is None:
 
394
        skip = "Platform lacks spawnProcess support, can't test process streaming."
 
395
 
 
396
    def runCode(self, code, inputStream=None):
 
397
        if inputStream is None:
 
398
            inputStream = stream.MemoryStream("")
 
399
        return stream.ProcessStreamer(inputStream, sys.executable,
 
400
                                      [sys.executable, "-u", "-c", code],
 
401
                                      os.environ)
 
402
 
 
403
    def test_output(self):
 
404
        p = self.runCode("import sys\nfor i in range(100): sys.stdout.write('x' * 1000)")
 
405
        l = []
 
406
        d = stream.readStream(p.outStream, l.append)
 
407
        def verify(_):
 
408
            self.assertEquals("".join(l), ("x" * 1000) * 100)
 
409
        d2 = p.run()
 
410
        return d.addCallback(verify).addCallback(lambda _: d2)
 
411
 
 
412
    def test_errouput(self):
 
413
        p = self.runCode("import sys\nfor i in range(100): sys.stderr.write('x' * 1000)")
 
414
        l = []
 
415
        d = stream.readStream(p.errStream, l.append)
 
416
        def verify(_):
 
417
            self.assertEquals("".join(l), ("x" * 1000) * 100)
 
418
        p.run()
 
419
        return d.addCallback(verify)
 
420
 
 
421
    def test_input(self):
 
422
        p = self.runCode("import sys\nsys.stdout.write(sys.stdin.read())",
 
423
                         "hello world")
 
424
        l = []
 
425
        d = stream.readStream(p.outStream, l.append)
 
426
        d2 = p.run()
 
427
        def verify(_):
 
428
            self.assertEquals("".join(l), "hello world")
 
429
            return d2
 
430
        return d.addCallback(verify)
 
431
 
 
432
    def test_badexit(self):
 
433
        p = self.runCode("raise ValueError")
 
434
        l = []
 
435
        from twisted.internet.error import ProcessTerminated
 
436
        def verify(_):
 
437
            self.assertEquals(l, [1])
 
438
            self.assert_(p.outStream.closed)
 
439
            self.assert_(p.errStream.closed)
 
440
        return p.run().addErrback(lambda _: _.trap(ProcessTerminated) and l.append(1)).addCallback(verify)
 
441
 
 
442
    def test_inputerror(self):
 
443
        p = self.runCode("import sys\nsys.stdout.write(sys.stdin.read())",
 
444
                         TestStreamer(["hello", defer.fail(ZeroDivisionError())]))
 
445
        l = []
 
446
        d = stream.readStream(p.outStream, l.append)
 
447
        d2 = p.run()
 
448
        def verify(_):
 
449
            self.assertEquals("".join(l), "hello")
 
450
            return d2
 
451
        return d.addCallback(verify).addCallback(lambda _: log.flushErrors(ZeroDivisionError))
 
452
 
 
453
    def test_processclosedinput(self):
 
454
        p = self.runCode("import sys; sys.stdout.write(sys.stdin.read(3));" +
 
455
                         "sys.stdin.close(); sys.stdout.write('def')",
 
456
                         "abc123")
 
457
        l = []
 
458
        d = stream.readStream(p.outStream, l.append)
 
459
        def verify(_):
 
460
            self.assertEquals("".join(l), "abcdef")
 
461
        d2 = p.run()
 
462
        return d.addCallback(verify).addCallback(lambda _: d2)
 
463
 
 
464
 
 
465
class AdapterTestCase(unittest.TestCase):
 
466
 
 
467
    def test_adapt(self):
 
468
        fName = self.mktemp()
 
469
        f = file(fName, "w")
 
470
        f.write("test")
 
471
        f.close()
 
472
        for i in ("test", buffer("test"), file(fName)):
 
473
            s = stream.IByteStream(i)
 
474
            self.assertEquals(str(s.read()), "test")
 
475
            self.assertEquals(s.read(), None)
 
476
 
 
477
 
 
478
class ReadStreamTestCase(unittest.TestCase):
 
479
 
 
480
    def test_pull(self):
 
481
        l = []
 
482
        s = TestStreamer(['abcd', defer.succeed('efgh'), 'ijkl'])
 
483
        return readStream(s, l.append).addCallback(
 
484
            lambda _: self.assertEquals(l, ["abcd", "efgh", "ijkl"]))
 
485
        
 
486
    def test_pullFailure(self):
 
487
        l = []
 
488
        s = TestStreamer(['abcd', defer.fail(RuntimeError()), 'ijkl'])
 
489
        def test(result):
 
490
            result.trap(RuntimeError)
 
491
            self.assertEquals(l, ["abcd"])
 
492
        return readStream(s, l.append).addErrback(test)
 
493
    
 
494
    def test_pullException(self):
 
495
        class Failer:
 
496
            def read(self): raise RuntimeError
 
497
        return readStream(Failer(), lambda _: None).addErrback(lambda _: _.trap(RuntimeError))
 
498
 
 
499
    def test_processingException(self):
 
500
        s = TestStreamer(['abcd', defer.succeed('efgh'), 'ijkl'])
 
501
        return readStream(s, lambda x: 1/0).addErrback(lambda _: _.trap(ZeroDivisionError))
 
502
 
 
503
 
 
504
class ProducerStreamTestCase(unittest.TestCase):
 
505
 
 
506
    def test_failfinish(self):
 
507
        p = stream.ProducerStream()
 
508
        p.write("hello")
 
509
        p.finish(RuntimeError())
 
510
        self.assertEquals(p.read(), "hello")
 
511
        d = p.read()
 
512
        l = []
 
513
        d.addErrback(lambda _: (l.append(1), _.trap(RuntimeError))).addCallback(
 
514
            lambda _: self.assertEquals(l, [1]))
 
515
        return d
 
516
 
 
517
 
 
518
from twisted.web2.stream import *
 
519
class CompoundStreamTest:
 
520
    """
 
521
    CompoundStream lets you combine many streams into one continuous stream.
 
522
    For example, let's make a stream:
 
523
    >>> s = CompoundStream()
 
524
    
 
525
    Then, add a couple streams:
 
526
    >>> s.addStream(MemoryStream("Stream1"))
 
527
    >>> s.addStream(MemoryStream("Stream2"))
 
528
    
 
529
    The length is the sum of all the streams:
 
530
    >>> s.length
 
531
    14
 
532
    
 
533
    We can read data from the stream:
 
534
    >>> str(s.read())
 
535
    'Stream1'
 
536
 
 
537
    After having read some data, length is now smaller, as you might expect:
 
538
    >>> s.length
 
539
    7
 
540
 
 
541
    So, continue reading...
 
542
    >>> str(s.read())
 
543
    'Stream2'
 
544
 
 
545
    Now that the stream is exhausted:
 
546
    >>> s.read() is None
 
547
    True
 
548
    >>> s.length
 
549
    0
 
550
 
 
551
    We can also create CompoundStream more easily like so:
 
552
    >>> s = CompoundStream(['hello', MemoryStream(' world')])
 
553
    >>> str(s.read())
 
554
    'hello'
 
555
    >>> str(s.read())
 
556
    ' world'
 
557
    
 
558
    For a more complicated example, let's try reading from a file:
 
559
    >>> s = CompoundStream()
 
560
    >>> s.addStream(FileStream(open(sibpath(__file__, "stream_data.txt"))))
 
561
    >>> s.addStream("================")
 
562
    >>> s.addStream(FileStream(open(sibpath(__file__, "stream_data.txt"))))
 
563
 
 
564
    Again, the length is the sum:
 
565
    >>> int(s.length)
 
566
    58
 
567
    
 
568
    >>> str(s.read())
 
569
    "We've got some text!\\n"
 
570
    >>> str(s.read())
 
571
    '================'
 
572
    
 
573
    What if you close the stream?
 
574
    >>> s.close()
 
575
    >>> s.read() is None
 
576
    True
 
577
    >>> s.length
 
578
    0
 
579
 
 
580
    Error handling works using Deferreds:
 
581
    >>> m = MemoryStream("after")
 
582
    >>> s = CompoundStream([TestStreamer([defer.fail(ZeroDivisionError())]), m])
 
583
    >>> l = []; x = s.read().addErrback(lambda _: l.append(1))
 
584
    >>> l
 
585
    [1]
 
586
    >>> s.length
 
587
    0
 
588
    >>> m.length # streams after the failed one got closed
 
589
    0
 
590
 
 
591
    """
 
592
 
 
593
 
 
594
__doctests__ = ['twisted.web2.test.test_stream', 'twisted.web2.stream']
 
595
# TODO: 
 
596
# CompoundStreamTest
 
597
# more tests for ProducerStreamTest
 
598
# StreamProducerTest