1
import tempfile, operator, sys, os
3
from twisted.trial import unittest
4
from twisted.internet import reactor, defer, interfaces
5
from twisted.python import log
6
from zope.interface import Interface, Attribute, implements
8
from twisted.python.util import sibpath
9
from twisted.web2 import stream
13
return str(buffer(data))
15
raise TypeError("%s doesn't conform to the buffer interface" % (data,))
18
class SimpleStreamTests:
21
for point in range(10):
22
s = self.makeStream(0)
25
self.assertEquals(bufstr(a.read()), self.text[:point])
26
self.assertEquals(a.read(), None)
27
if point < len(self.text):
28
self.assertEquals(bufstr(b.read()), self.text[point:])
29
self.assertEquals(b.read(), None)
31
for point in range(7):
32
s = self.makeStream(2, 6)
33
self.assertEquals(s.length, 6)
36
self.assertEquals(bufstr(a.read()), self.text[2:point+2])
37
self.assertEquals(a.read(), None)
39
self.assertEquals(bufstr(b.read()), self.text[point+2:8])
40
self.assertEquals(b.read(), None)
44
self.assertEquals(s.length, len(self.text))
45
self.assertEquals(bufstr(s.read()), self.text)
46
self.assertEquals(s.read(), None)
48
s = self.makeStream(0, 4)
49
self.assertEquals(s.length, 4)
50
self.assertEquals(bufstr(s.read()), self.text[0:4])
51
self.assertEquals(s.read(), None)
52
self.assertEquals(s.length, 0)
54
s = self.makeStream(4, 6)
55
self.assertEquals(s.length, 6)
56
self.assertEquals(bufstr(s.read()), self.text[4:10])
57
self.assertEquals(s.read(), None)
58
self.assertEquals(s.length, 0)
60
class FileStreamTest(SimpleStreamTests, unittest.TestCase):
61
def makeStream(self, *args, **kw):
62
return stream.FileStream(self.f, *args, **kw)
65
f = tempfile.TemporaryFile('w+')
74
self.assertEquals(s.length, 0)
75
# Make sure close doesn't close file
76
# would raise exception if f is closed
80
s = self.makeStream(0)
82
self.assertEquals(s.length, 10)
83
self.assertEquals(bufstr(s.read()), self.text[0:6])
84
self.assertEquals(bufstr(s.read()), self.text[6:10])
85
self.assertEquals(s.read(), None)
87
s = self.makeStream(0)
89
self.assertEquals(s.length, 10)
90
self.assertEquals(bufstr(s.read()), self.text[0:5])
91
self.assertEquals(bufstr(s.read()), self.text[5:10])
92
self.assertEquals(s.read(), None)
94
s = self.makeStream(0, 20)
95
self.assertEquals(s.length, 20)
96
self.assertEquals(bufstr(s.read()), self.text)
97
self.assertRaises(RuntimeError, s.read) # ran out of data
99
class MMapFileStreamTest(SimpleStreamTests, unittest.TestCase):
100
def makeStream(self, *args, **kw):
101
return stream.FileStream(self.f, *args, **kw)
103
def setUpClass(self):
104
f = tempfile.TemporaryFile('w+')
105
self.text = self.text*(stream.MMAP_THRESHOLD//len(self.text) + 1)
110
def test_mmapwrapper(self):
111
self.assertRaises(TypeError, stream.mmapwrapper)
112
self.assertRaises(TypeError, stream.mmapwrapper, offset = 0)
113
self.assertRaises(TypeError, stream.mmapwrapper, offset = None)
116
test_mmapwrapper.skip = 'mmap not supported here'
118
class MemoryStreamTest(SimpleStreamTests, unittest.TestCase):
119
def makeStream(self, *args, **kw):
120
return stream.MemoryStream(self.text, *args, **kw)
122
def test_close(self):
123
s = self.makeStream()
125
self.assertEquals(s.length, 0)
127
def test_read2(self):
128
self.assertRaises(ValueError, self.makeStream, 0, 20)
131
testdata = """I was angry with my friend:
132
I told my wrath, my wrath did end.
133
I was angry with my foe:
134
I told it not, my wrath did grow.
136
And I water'd it in fears,
137
Night and morning with my tears;
138
And I sunned it with smiles,
139
And with soft deceitful wiles.
141
And it grew both day and night,
142
Till it bore an apple bright;
143
And my foe beheld it shine,
144
And he knew that is was mine,
146
And into my garden stole
147
When the night had veil'd the pole:
148
In the morning glad I see
149
My foe outstretch'd beneath the tree"""
151
class TestSubstream(unittest.TestCase):
155
self.s = stream.MemoryStream(self.data)
157
def suckTheMarrow(self, s):
158
return ''.join(map(str, list(iter(s.read, None))))
161
s = stream.substream(self.s, 0, 11)
162
self.assertEquals('I was angry', self.suckTheMarrow(s))
164
def testNotStart(self):
165
s = stream.substream(self.s, 12, 26)
166
self.assertEquals('with my friend', self.suckTheMarrow(s))
168
def testReverseStartEnd(self):
169
self.assertRaises(ValueError, stream.substream, self.s, 26, 12)
171
def testEmptySubstream(self):
172
s = stream.substream(self.s, 11, 11)
173
self.assertEquals('', self.suckTheMarrow(s))
176
size = len(self.data)
177
s = stream.substream(self.s, size-4, size)
178
self.assertEquals('tree', self.suckTheMarrow(s))
180
def testPastEnd(self):
181
size = len(self.data)
182
self.assertRaises(ValueError, stream.substream, self.s, size-4, size+8)
185
class TestBufferedStream(unittest.TestCase):
188
self.data = testdata.replace('\n', '\r\n')
189
s = stream.MemoryStream(self.data)
190
self.s = stream.BufferedStream(s)
192
def _cbGotData(self, data, expected):
193
self.assertEqual(data, expected)
195
def test_readline(self):
196
"""Test that readline reads a line."""
197
d = self.s.readline()
198
d.addCallback(self._cbGotData, 'I was angry with my friend:\r\n')
201
def test_readlineWithSize(self):
202
"""Test the size argument to readline"""
203
d = self.s.readline(size = 5)
204
d.addCallback(self._cbGotData, 'I was')
207
def test_readlineWithBigSize(self):
208
"""Test the size argument when it's bigger than the length of the line."""
209
d = self.s.readline(size = 40)
210
d.addCallback(self._cbGotData, 'I was angry with my friend:\r\n')
213
def test_readlineWithZero(self):
214
"""Test readline with size = 0."""
215
d = self.s.readline(size = 0)
216
d.addCallback(self._cbGotData, '')
219
def test_readlineFinished(self):
220
"""Test readline on a finished stream."""
221
nolines = len(self.data.split('\r\n'))
222
for i in range(nolines):
224
d = self.s.readline()
225
d.addCallback(self._cbGotData, '')
228
def test_readlineNegSize(self):
229
"""Ensure that readline with a negative size raises an exception."""
230
self.assertRaises(ValueError, self.s.readline, size = -1)
232
def test_readlineSizeInDelimiter(self):
234
Test behavior of readline when size falls inside the
237
d = self.s.readline(size=28)
238
d.addCallback(self._cbGotData, "I was angry with my friend:\r")
239
d.addCallback(lambda _: self.s.readline())
240
d.addCallback(self._cbGotData, "\nI told my wrath, my wrath did end.\r\n")
242
def test_readExactly(self):
243
"""Make sure readExactly with no arg reads all the data."""
244
d = self.s.readExactly()
245
d.addCallback(self._cbGotData, self.data)
248
def test_readExactly(self):
249
"""Test readExactly with a number."""
250
d = self.s.readExactly(10)
251
d.addCallback(self._cbGotData, self.data[:10])
254
def test_readExactlyBig(self):
256
Test readExactly with a number larger than the size of the
259
d = self.s.readExactly(100000)
260
d.addCallback(self._cbGotData, self.data)
265
Make sure read() also functions. (note that this test uses
266
an implementation detail of this particular stream. s.read()
267
isn't guaranteed to return self.data on all streams.)
269
self.assertEqual(str(self.s.read()), self.data)
272
implements(stream.IStream, stream.IByteStream)
279
def __init__(self, list):
285
return self.list.pop(0)
292
class FallbackSplitTest(unittest.TestCase):
293
def test_split(self):
294
s = TestStreamer(['abcd', defer.succeed('efgh'), 'ijkl'])
295
left,right = stream.fallbackSplit(s, 5)
296
self.assertEquals(left.length, 5)
297
self.assertEquals(right.length, None)
298
self.assertEquals(bufstr(left.read()), 'abcd')
300
d.addCallback(self._cbSplit, left, right)
303
def _cbSplit(self, result, left, right):
304
self.assertEquals(bufstr(result), 'e')
305
self.assertEquals(left.read(), None)
307
self.assertEquals(bufstr(right.read().result), 'fgh')
308
self.assertEquals(bufstr(right.read()), 'ijkl')
309
self.assertEquals(right.read(), None)
311
def test_split2(self):
312
s = TestStreamer(['abcd', defer.succeed('efgh'), 'ijkl'])
313
left,right = stream.fallbackSplit(s, 4)
315
self.assertEquals(left.length, 4)
316
self.assertEquals(right.length, None)
318
self.assertEquals(bufstr(left.read()), 'abcd')
319
self.assertEquals(left.read(), None)
321
self.assertEquals(bufstr(right.read().result), 'efgh')
322
self.assertEquals(bufstr(right.read()), 'ijkl')
323
self.assertEquals(right.read(), None)
325
def test_splitsplit(self):
326
s = TestStreamer(['abcd', defer.succeed('efgh'), 'ijkl'])
327
left,right = stream.fallbackSplit(s, 5)
328
left,middle = left.split(3)
330
self.assertEquals(left.length, 3)
331
self.assertEquals(middle.length, 2)
332
self.assertEquals(right.length, None)
334
self.assertEquals(bufstr(left.read()), 'abc')
335
self.assertEquals(left.read(), None)
337
self.assertEquals(bufstr(middle.read().result), 'd')
338
self.assertEquals(bufstr(middle.read().result), 'e')
339
self.assertEquals(middle.read(), None)
341
self.assertEquals(bufstr(right.read().result), 'fgh')
342
self.assertEquals(bufstr(right.read()), 'ijkl')
343
self.assertEquals(right.read(), None)
345
def test_closeboth(self):
346
s = TestStreamer(['abcd', defer.succeed('efgh'), 'ijkl'])
347
left,right = stream.fallbackSplit(s, 5)
349
self.assertEquals(s.closeCalled, 0)
352
# Make sure nothing got read
353
self.assertEquals(s.readCalled, 0)
354
self.assertEquals(s.closeCalled, 1)
356
def test_closeboth_rev(self):
357
s = TestStreamer(['abcd', defer.succeed('efgh'), 'ijkl'])
358
left,right = stream.fallbackSplit(s, 5)
360
self.assertEquals(s.closeCalled, 0)
363
# Make sure nothing got read
364
self.assertEquals(s.readCalled, 0)
365
self.assertEquals(s.closeCalled, 1)
367
def test_closeleft(self):
368
s = TestStreamer(['abcd', defer.succeed('efgh'), 'ijkl'])
369
left,right = stream.fallbackSplit(s, 5)
372
d.addCallback(self._cbCloseleft, right)
375
def _cbCloseleft(self, result, right):
376
self.assertEquals(bufstr(result), 'fgh')
377
self.assertEquals(bufstr(right.read()), 'ijkl')
378
self.assertEquals(right.read(), None)
380
def test_closeright(self):
381
s = TestStreamer(['abcd', defer.succeed('efgh'), 'ijkl'])
382
left,right = stream.fallbackSplit(s, 3)
385
self.assertEquals(bufstr(left.read()), 'abc')
386
self.assertEquals(left.read(), None)
388
self.assertEquals(s.closeCalled, 1)
391
class ProcessStreamerTest(unittest.TestCase):
393
if interfaces.IReactorProcess(reactor, None) is None:
394
skip = "Platform lacks spawnProcess support, can't test process streaming."
396
def runCode(self, code, inputStream=None):
397
if inputStream is None:
398
inputStream = stream.MemoryStream("")
399
return stream.ProcessStreamer(inputStream, sys.executable,
400
[sys.executable, "-u", "-c", code],
403
def test_output(self):
404
p = self.runCode("import sys\nfor i in range(100): sys.stdout.write('x' * 1000)")
406
d = stream.readStream(p.outStream, l.append)
408
self.assertEquals("".join(l), ("x" * 1000) * 100)
410
return d.addCallback(verify).addCallback(lambda _: d2)
412
def test_errouput(self):
413
p = self.runCode("import sys\nfor i in range(100): sys.stderr.write('x' * 1000)")
415
d = stream.readStream(p.errStream, l.append)
417
self.assertEquals("".join(l), ("x" * 1000) * 100)
419
return d.addCallback(verify)
421
def test_input(self):
422
p = self.runCode("import sys\nsys.stdout.write(sys.stdin.read())",
425
d = stream.readStream(p.outStream, l.append)
428
self.assertEquals("".join(l), "hello world")
430
return d.addCallback(verify)
432
def test_badexit(self):
433
p = self.runCode("raise ValueError")
435
from twisted.internet.error import ProcessTerminated
437
self.assertEquals(l, [1])
438
self.assert_(p.outStream.closed)
439
self.assert_(p.errStream.closed)
440
return p.run().addErrback(lambda _: _.trap(ProcessTerminated) and l.append(1)).addCallback(verify)
442
def test_inputerror(self):
443
p = self.runCode("import sys\nsys.stdout.write(sys.stdin.read())",
444
TestStreamer(["hello", defer.fail(ZeroDivisionError())]))
446
d = stream.readStream(p.outStream, l.append)
449
self.assertEquals("".join(l), "hello")
451
return d.addCallback(verify).addCallback(lambda _: log.flushErrors(ZeroDivisionError))
453
def test_processclosedinput(self):
454
p = self.runCode("import sys; sys.stdout.write(sys.stdin.read(3));" +
455
"sys.stdin.close(); sys.stdout.write('def')",
458
d = stream.readStream(p.outStream, l.append)
460
self.assertEquals("".join(l), "abcdef")
462
return d.addCallback(verify).addCallback(lambda _: d2)
465
class AdapterTestCase(unittest.TestCase):
467
def test_adapt(self):
468
fName = self.mktemp()
472
for i in ("test", buffer("test"), file(fName)):
473
s = stream.IByteStream(i)
474
self.assertEquals(str(s.read()), "test")
475
self.assertEquals(s.read(), None)
478
class ReadStreamTestCase(unittest.TestCase):
482
s = TestStreamer(['abcd', defer.succeed('efgh'), 'ijkl'])
483
return readStream(s, l.append).addCallback(
484
lambda _: self.assertEquals(l, ["abcd", "efgh", "ijkl"]))
486
def test_pullFailure(self):
488
s = TestStreamer(['abcd', defer.fail(RuntimeError()), 'ijkl'])
490
result.trap(RuntimeError)
491
self.assertEquals(l, ["abcd"])
492
return readStream(s, l.append).addErrback(test)
494
def test_pullException(self):
496
def read(self): raise RuntimeError
497
return readStream(Failer(), lambda _: None).addErrback(lambda _: _.trap(RuntimeError))
499
def test_processingException(self):
500
s = TestStreamer(['abcd', defer.succeed('efgh'), 'ijkl'])
501
return readStream(s, lambda x: 1/0).addErrback(lambda _: _.trap(ZeroDivisionError))
504
class ProducerStreamTestCase(unittest.TestCase):
506
def test_failfinish(self):
507
p = stream.ProducerStream()
509
p.finish(RuntimeError())
510
self.assertEquals(p.read(), "hello")
513
d.addErrback(lambda _: (l.append(1), _.trap(RuntimeError))).addCallback(
514
lambda _: self.assertEquals(l, [1]))
518
from twisted.web2.stream import *
519
class CompoundStreamTest:
521
CompoundStream lets you combine many streams into one continuous stream.
522
For example, let's make a stream:
523
>>> s = CompoundStream()
525
Then, add a couple streams:
526
>>> s.addStream(MemoryStream("Stream1"))
527
>>> s.addStream(MemoryStream("Stream2"))
529
The length is the sum of all the streams:
533
We can read data from the stream:
537
After having read some data, length is now smaller, as you might expect:
541
So, continue reading...
545
Now that the stream is exhausted:
551
We can also create CompoundStream more easily like so:
552
>>> s = CompoundStream(['hello', MemoryStream(' world')])
558
For a more complicated example, let's try reading from a file:
559
>>> s = CompoundStream()
560
>>> s.addStream(FileStream(open(sibpath(__file__, "stream_data.txt"))))
561
>>> s.addStream("================")
562
>>> s.addStream(FileStream(open(sibpath(__file__, "stream_data.txt"))))
564
Again, the length is the sum:
569
"We've got some text!\\n"
573
What if you close the stream?
580
Error handling works using Deferreds:
581
>>> m = MemoryStream("after")
582
>>> s = CompoundStream([TestStreamer([defer.fail(ZeroDivisionError())]), m])
583
>>> l = []; x = s.read().addErrback(lambda _: l.append(1))
588
>>> m.length # streams after the failed one got closed
594
__doctests__ = ['twisted.web2.test.test_stream', 'twisted.web2.stream']
597
# more tests for ProducerStreamTest