1
# -*- test-case-name: twisted.web2.test.test_stream -*-
4
The stream module provides a simple abstraction of streaming
5
data. While Twisted already has some provisions for handling this in
6
its Producer/Consumer model, the rather complex interactions between
7
producer and consumer makes it difficult to implement something like
8
the CompoundStream object. Thus, this API.
10
The IStream interface is very simple. It consists of two methods:
11
read, and close. The read method should either return some data, None
12
if there is no data left to read, or a Deferred. Close frees up any
13
underlying resources and causes read to return None forevermore.
15
IByteStream adds a bit more to the API:
16
1) read is required to return objects conforming to the buffer interface.
17
2) .length, which may either be an integer number of bytes remaining, or
19
3) .split(position). Split takes a position, and splits the
20
stream in two pieces, returning the two new streams. Using the
21
original stream after calling split is not allowed.
23
There are two builtin source stream classes: FileStream and
24
MemoryStream. The first produces data from a file object, the second
25
from a buffer in memory. Any number of these can be combined into one
26
stream with the CompoundStream object. Then, to interface with other
27
parts of Twisted, there are two transceivers: StreamProducer and
28
ProducerStream. The first takes a stream and turns it into an
29
IPushProducer, which will write to a consumer. The second is a
30
consumer which is a stream, so that other producers can write to it.
33
from __future__ import generators
35
import copy, os, types, sys
36
from zope.interface import Interface, Attribute, implements
37
from twisted.internet.defer import Deferred
38
from twisted.internet import interfaces as ti_interfaces, defer, reactor, protocol, error as ti_error
39
from twisted.python import components, log
40
from twisted.python.failure import Failure
42
# Python 2.4.2 (only) has a broken mmap that leaks a fd every time you call it.
43
if sys.version_info[0:3] != (2,4,2):
51
##############################
53
##############################
55
class IStream(Interface):
56
"""A stream of arbitrary data."""
61
Returns some object representing the data.
62
If there is no more data available, returns None.
63
Can also return a Deferred resulting in one of the above.
65
Errors may be indicated by exception or by a Deferred of a Failure.
69
"""Prematurely close. Should also cause further reads to
72
class IByteStream(IStream):
73
"""A stream which is of bytes."""
75
length = Attribute("""How much data is in this stream. Can be None if unknown.""")
80
Returns an object conforming to the buffer interface, or
81
if there is no more data available, returns None.
82
Can also return a Deferred resulting in one of the above.
84
Errors may be indicated by exception or by a Deferred of a Failure.
87
"""Split this stream into two, at byte position 'point'.
89
Returns a tuple of (before, after). After calling split, no other
90
methods should be called on this stream. Doing so will have undefined
93
If you cannot implement split easily, you may implement it as::
95
return fallbackSplit(self, point)
99
"""Prematurely close this stream. Should also cause further reads to
100
return None. Additionally, .length should be set to 0.
103
class ISendfileableStream(Interface):
104
def read(sendfile=False):
107
If sendfile == False, returns an object conforming to the buffer
108
interface, or else a Deferred.
110
If sendfile == True, returns either the above, or a SendfileBuffer.
113
class SimpleStream(object):
114
"""Superclass of simple streams with a single buffer and a offset and length
116
implements(IByteStream)
127
def split(self, point):
128
if self.length is not None:
129
if point > self.length:
130
raise ValueError("split point (%d) > length (%d)" % (point, self.length))
133
if b.length is not None:
138
##############################
140
##############################
143
# Maximum number of bytes mapped in a single mmapwrapper() call (4 MiB);
# FileStream.read caps each mmap chunk at this size.
MMAP_LIMIT = 4*1024*1024
# Reads smaller than this (8 KiB) are not worth memory-mapping; a plain
# file read is used instead.
MMAP_THRESHOLD = 8*1024

# maximum sendfile length
SENDFILE_LIMIT = 16777216
# minimum sendfile size
SENDFILE_THRESHOLD = 256
152
def mmapwrapper(*args, **kwargs):
    """
    Python's mmap call sucks and omitted the "offset" argument for no
    discernible reason. Replace this with a mmap module that has offset.
    """
    offset = kwargs.get('offset', None)
    if offset in [None, 0]:
        # An offset of 0 (or no offset at all) is what plain mmap.mmap()
        # already does, so just drop the keyword before delegating.
        if 'offset' in kwargs:
            del kwargs['offset']
    else:
        # Stock mmap cannot map from a non-zero offset; fail loudly
        # rather than silently mapping from the start of the file.
        raise mmap.error("mmap: Python sucks and does not support offset.")
    return mmap.mmap(*args, **kwargs)
166
class FileStream(SimpleStream):
167
implements(ISendfileableStream)
168
"""A stream that reads data from a file. File must be a normal
169
file that supports seek, (e.g. not a pipe or device or socket)."""
170
# 65K, minus some slack
171
CHUNK_SIZE = 2 ** 2 ** 2 ** 2 - 32
174
def __init__(self, f, start=0, length=None, useMMap=bool(mmap)):
176
Create the stream from file f. If you specify start and length,
177
use only that portion of the file.
182
self.length = os.fstat(f.fileno()).st_size
185
self.useMMap = useMMap
187
def read(self, sendfile=False):
196
if sendfile and length > SENDFILE_THRESHOLD:
197
# XXX: Yay using non-existent sendfile support!
198
# FIXME: if we return a SendfileBuffer, and then sendfile
199
# fails, then what? Or, what if file is too short?
200
readSize = min(length, SENDFILE_LIMIT)
201
res = SendfileBuffer(self.f, self.start, readSize)
202
self.length -= readSize
203
self.start += readSize
206
if self.useMMap and length > MMAP_THRESHOLD:
207
readSize = min(length, MMAP_LIMIT)
209
res = mmapwrapper(self.f.fileno(), readSize,
210
access=mmap.ACCESS_READ, offset=self.start)
211
#madvise(res, MADV_SEQUENTIAL)
212
self.length -= readSize
213
self.start += readSize
218
# Fall back to standard read.
219
readSize = min(length, self.CHUNK_SIZE)
221
self.f.seek(self.start)
222
b = self.f.read(readSize)
225
raise RuntimeError("Ran out of data reading file %r, expected %d more bytes" % (self.f, length))
227
self.length -= bytesRead
228
self.start += bytesRead
233
SimpleStream.close(self)
235
components.registerAdapter(FileStream, file, IByteStream)
237
##############################
238
#### MemoryStream ####
239
##############################
241
class MemoryStream(SimpleStream):
242
"""A stream that reads data from a buffer object."""
243
def __init__(self, mem, start=0, length=None):
245
Create the stream from buffer object mem. If you specify start and length,
246
use only that portion of the buffer.
251
self.length = len(mem) - start
253
if len(mem) < length:
254
raise ValueError("len(mem) < start + length")
263
result = buffer(self.mem, self.start, self.length)
270
SimpleStream.close(self)
272
components.registerAdapter(MemoryStream, str, IByteStream)
273
components.registerAdapter(MemoryStream, types.BufferType, IByteStream)
275
##############################
276
#### CompoundStream ####
277
##############################
279
class CompoundStream(object):
280
"""A stream which is composed of many other streams.
282
Call addStream to add substreams.
285
implements(IByteStream, ISendfileableStream)
289
def __init__(self, buckets=()):
290
self.buckets = [IByteStream(s) for s in buckets]
292
def addStream(self, bucket):
293
"""Add a stream to the output"""
294
bucket = IByteStream(bucket)
295
self.buckets.append(bucket)
296
if self.length is not None:
297
if bucket.length is None:
300
self.length += bucket.length
302
def read(self, sendfile=False):
303
if self.deferred is not None:
304
raise RuntimeError("Call to read while read is already outstanding")
309
if sendfile and ISendfileableStream.providedBy(self.buckets[0]):
311
result = self.buckets[0].read(sendfile)
313
return self._gotFailure(Failure())
316
result = self.buckets[0].read()
318
return self._gotFailure(Failure())
320
if isinstance(result, Deferred):
321
self.deferred = result
322
result.addCallbacks(self._gotRead, self._gotFailure, (sendfile,))
325
return self._gotRead(result, sendfile)
327
def _gotFailure(self, f):
333
def _gotRead(self, result, sendfile):
338
return self.read(sendfile)
340
if self.length is not None:
341
self.length -= len(result)
344
def split(self, point):
347
for bucket in self.buckets:
352
b.buckets = self.buckets[num:]
353
del self.buckets[num:]
356
if bucket.length is None:
357
# Indeterminate length bucket.
358
# give up and use fallback splitter.
359
return fallbackSplit(self, origPoint)
361
if point < bucket.length:
362
before,after = bucket.split(point)
364
b.buckets = self.buckets[num:]
367
del self.buckets[num+1:]
368
self.buckets[num] = before
371
point -= bucket.length
374
for bucket in self.buckets:
380
##############################
382
##############################
384
class _StreamReader(object):
385
"""Process a stream's data using callbacks for data and stream finish."""
387
def __init__(self, stream, gotDataCallback):
389
self.gotDataCallback = gotDataCallback
390
self.result = Deferred()
393
# self.result may be del'd in _read()
400
result = self.stream.read()
402
self._gotError(Failure())
404
if isinstance(result, Deferred):
405
result.addCallbacks(self._gotData, self._gotError)
407
self._gotData(result)
409
def _gotError(self, failure):
411
del self.result, self.gotDataCallback, self.stream
412
result.errback(failure)
414
def _gotData(self, data):
417
del self.result, self.gotDataCallback, self.stream
418
result.callback(None)
421
self.gotDataCallback(data)
423
self._gotError(Failure())
425
reactor.callLater(0, self._read)
427
def readStream(stream, gotDataCallback):
    """Pass a stream's data to a callback.

    Returns Deferred which will be triggered on finish.  Errors in
    reading the stream or in processing it will be returned via this
    Deferred.
    """
    # _StreamReader drives the read loop; run() returns its result Deferred.
    return _StreamReader(stream, gotDataCallback).run()
437
def readAndDiscard(stream):
    """Read all the data from the given stream, and throw it out.

    Returns Deferred which will be triggered on finish.
    """
    # Drain the stream by handing every chunk to a no-op callback.
    return readStream(stream, lambda _: None)
444
def readIntoFile(stream, outFile):
    """Read a stream and write it into a file.

    Returns Deferred which will be triggered on finish.
    """
    def done(_):
        # Close the file on success or failure, then pass the result
        # (or Failure) through unchanged so callers still see errors.
        outFile.close()
        return _
    return readStream(stream, outFile.write).addBoth(done)
454
def connectStream(inputStream, factory):
    """Connect a protocol constructed from a factory to stream.

    Returns an output stream from the protocol.

    The protocol's transport will have a finish() method it should
    call when done writing.
    """
    # XXX deal better with addresses
    p = factory.buildProtocol(None)
    out = ProducerStream()
    out.disconnecting = False # XXX for LineReceiver suckage
    p.makeConnection(out)
    # Feed the input stream into the protocol; signal connectionLost with
    # ConnectionDone on clean EOF, or with the read failure otherwise.
    readStream(inputStream, lambda _: p.dataReceived(_)).addCallbacks(
        lambda _: p.connectionLost(ti_error.ConnectionDone()), lambda _: p.connectionLost(_))
    return out
471
##############################
472
#### fallbackSplit ####
473
##############################
475
def fallbackSplit(stream, point):
    """Split a stream at byte position 'point' without native split support.

    Returns a (before, after) pair of streams: 'before' covers the first
    'point' bytes, 'after' the remainder.  The 'after' half must be
    constructed first because the 'before' half notifies it (via its
    postTruncater reference) when the split point is reached.
    """
    after = PostTruncaterStream(stream, point)
    before = TruncaterStream(stream, point, after)
    return (before, after)
480
class TruncaterStream(object):
481
def __init__(self, stream, point, postTruncater):
484
self.postTruncater = postTruncater
488
if self.postTruncater is not None:
489
postTruncater = self.postTruncater
490
self.postTruncater = None
491
postTruncater.sendInitialSegment(self.stream.read())
495
result = self.stream.read()
496
if isinstance(result, Deferred):
497
return result.addCallback(self._gotRead)
499
return self._gotRead(result)
501
def _gotRead(self, data):
503
raise ValueError("Ran out of data for a split of a indeterminate length source")
504
if self.length >= len(data):
505
self.length -= len(data)
508
before = buffer(data, 0, self.length)
509
after = buffer(data, self.length)
511
if self.postTruncater is not None:
512
postTruncater = self.postTruncater
513
self.postTruncater = None
514
postTruncater.sendInitialSegment(after)
518
def split(self, point):
519
if point > self.length:
520
raise ValueError("split point (%d) > length (%d)" % (point, self.length))
522
post = PostTruncaterStream(self.stream, point)
523
trunc = TruncaterStream(post, self.length - point, self.postTruncater)
525
self.postTruncater = post
529
if self.postTruncater is not None:
530
self.postTruncater.notifyClosed(self)
532
# Nothing cares about the rest of the stream
538
class PostTruncaterStream(object):
540
sentInitialSegment = False
541
truncaterClosed = None
545
def __init__(self, stream, point):
547
self.deferred = Deferred()
548
if stream.length is not None:
549
self.length = stream.length - point
552
if not self.sentInitialSegment:
553
self.sentInitialSegment = True
554
if self.truncaterClosed is not None:
555
readAndDiscard(self.truncaterClosed)
556
self.truncaterClosed = None
559
return self.stream.read()
561
def split(self, point):
562
return fallbackSplit(self, point)
566
if self.truncaterClosed is not None:
567
# have first half close itself
568
self.truncaterClosed.postTruncater = None
569
self.truncaterClosed.close()
570
elif self.sentInitialSegment:
571
# first half already finished up
576
# Callbacks from TruncaterStream
577
def sendInitialSegment(self, data):
579
# First half finished, we don't want data.
582
if self.deferred is not None:
583
if isinstance(data, Deferred):
584
data.chainDeferred(self.deferred)
586
self.deferred.callback(data)
588
def notifyClosed(self, truncater):
590
# we are closed, have first half really close
591
truncater.postTruncater = None
593
elif self.sentInitialSegment:
594
# We are trying to read, read up first half
595
readAndDiscard(truncater)
597
# Idle, store closed info.
598
self.truncaterClosed = truncater
600
########################################
601
#### ProducerStream/StreamProducer ####
602
########################################
604
class ProducerStream(object):
605
"""Turns producers into a IByteStream.
606
Thus, implements IConsumer and IByteStream."""
608
implements(IByteStream, ti_interfaces.IConsumer)
613
producerPaused = False
618
def __init__(self, length=None):
622
# IByteStream implementation
625
return self.buffer.pop(0)
634
deferred = self.deferred = Deferred()
635
if self.producer is not None and (not self.streamingProducer
636
or self.producerPaused):
637
self.producerPaused = False
638
self.producer.resumeProducing()
642
def split(self, point):
643
return fallbackSplit(self, point)
646
"""Called by reader of stream when it is done reading."""
649
if self.producer is not None:
650
self.producer.stopProducing()
654
# IConsumer implementation
655
def write(self, data):
660
deferred = self.deferred
662
deferred.callback(data)
664
self.buffer.append(data)
665
if(self.producer is not None and self.streamingProducer
666
and len(self.buffer) > self.bufferSize):
667
self.producer.pauseProducing()
668
self.producerPaused = True
670
def finish(self, failure=None):
671
"""Called by producer when it is done.
673
If the optional failure argument is passed a Failure instance,
674
the stream will return it as errback on next Deferred.
679
if self.deferred is not None:
680
deferred = self.deferred
682
if failure is not None:
684
deferred.errback(failure)
686
deferred.callback(None)
688
if failure is not None:
690
self.failure = failure
692
def registerProducer(self, producer, streaming):
693
if self.producer is not None:
694
raise RuntimeError("Cannot register producer %s, because producer %s was never unregistered." % (producer, self.producer))
697
producer.stopProducing()
699
self.producer = producer
700
self.streamingProducer = streaming
702
producer.resumeProducing()
704
def unregisterProducer(self):
707
class StreamProducer(object):
708
"""A push producer which gets its data by reading a stream."""
709
implements(ti_interfaces.IPushProducer)
712
finishedCallback = None
716
def __init__(self, stream, enforceStr=True):
718
self.enforceStr = enforceStr
720
def beginProducing(self, consumer):
721
if self.stream is None:
722
return defer.succeed(None)
724
self.consumer = consumer
725
finishedCallback = self.finishedCallback = Deferred()
726
self.consumer.registerProducer(self, True)
727
self.resumeProducing()
728
return finishedCallback
730
def resumeProducing(self):
732
if self.deferred is not None:
736
data = self.stream.read()
738
self.stopProducing(Failure())
741
if isinstance(data, Deferred):
742
self.deferred = data.addCallbacks(self._doWrite, self.stopProducing)
746
def _doWrite(self, data):
747
if self.consumer is None:
751
if self.consumer is not None:
752
self.consumer.unregisterProducer()
753
if self.finishedCallback is not None:
754
self.finishedCallback.callback(None)
755
self.finishedCallback = self.deferred = self.consumer = self.stream = None
760
# XXX: sucks that we have to do this. make transport.write(buffer) work!
761
data = str(buffer(data))
762
self.consumer.write(data)
765
self.resumeProducing()
767
def pauseProducing(self):
770
def stopProducing(self, failure=ti_error.ConnectionLost()):
771
if self.consumer is not None:
772
self.consumer.unregisterProducer()
773
if self.finishedCallback is not None:
774
if failure is not None:
775
self.finishedCallback.errback(failure)
777
self.finishedCallback.callback(None)
778
self.finishedCallback = None
780
if self.stream is not None:
783
self.finishedCallback = self.deferred = self.consumer = self.stream = None
785
##############################
786
#### ProcessStreamer ####
787
##############################
789
class _ProcessStreamerProtocol(protocol.ProcessProtocol):
791
def __init__(self, inputStream, outStream, errStream):
792
self.inputStream = inputStream
793
self.outStream = outStream
794
self.errStream = errStream
795
self.resultDeferred = defer.Deferred()
797
def connectionMade(self):
798
p = StreamProducer(self.inputStream)
799
# if the process stopped reading from the input stream,
800
# this is not an error condition, so it oughtn't result
801
# in a ConnectionLost() from the input stream:
802
p.stopProducing = lambda err=None: StreamProducer.stopProducing(p, err)
804
d = p.beginProducing(self.transport)
805
d.addCallbacks(lambda _: self.transport.closeStdin(),
808
def _inputError(self, f):
809
log.msg("Error in input stream for %r" % self.transport)
811
self.transport.closeStdin()
813
def outReceived(self, data):
814
self.outStream.write(data)
816
def errReceived(self, data):
817
self.errStream.write(data)
819
def outConnectionLost(self):
820
self.outStream.finish()
822
def errConnectionLost(self):
823
self.errStream.finish()
825
def processEnded(self, reason):
826
self.resultDeferred.errback(reason)
827
del self.resultDeferred
830
class ProcessStreamer(object):
831
"""Runs a process hooked up to streams.
833
Requires an input stream, has attributes 'outStream' and 'errStream'
834
for stdout and stderr.
836
outStream and errStream are public attributes providing streams
837
for stdout and stderr of the process.
840
def __init__(self, inputStream, program, args, env={}):
841
self.outStream = ProducerStream()
842
self.errStream = ProducerStream()
843
self._protocol = _ProcessStreamerProtocol(IByteStream(inputStream), self.outStream, self.errStream)
844
self._program = program
851
Returns Deferred which will eventually have errback for non-clean (exit code > 0)
852
exit, with ProcessTerminated, or callback with None on exit code 0.
854
# XXX what happens if spawn fails?
855
reactor.spawnProcess(self._protocol, self._program, self._args, env=self._env)
857
return self._protocol.resultDeferred.addErrback(lambda _: _.trap(ti_error.ProcessDone))
859
##############################
860
#### generatorToStream ####
861
##############################
863
class _StreamIterator(object):
874
class _IteratorStream(object):
877
def __init__(self, fun, stream, args, kwargs):
879
self._streamIterator = _StreamIterator()
880
self._gen = fun(self._streamIterator, *args, **kwargs)
884
val = self._gen.next()
885
except StopIteration:
888
if val is _StreamIterator.wait:
889
newdata = self._stream.read()
890
if isinstance(newdata, defer.Deferred):
891
return newdata.addCallback(self._gotRead)
893
return self._gotRead(newdata)
896
def _gotRead(self, data):
898
self._streamIterator.done=True
900
self._streamIterator.value=data
905
del self._gen, self._stream, self._streamIterator
908
return fallbackSplit(self)
910
def generatorToStream(fun):
    """Converts a generator function into a stream.

    The function should take an iterator as its first argument,
    which will be converted *from* a stream by this wrapper, and
    yield items which are turned *into* the results from the
    stream's 'read' call.

    One important point: before every call to input.next(), you
    *MUST* do a "yield input.wait" first.  Yielding this magic value
    takes care of ensuring that the input is not a Deferred before
    you fetch it.

    @param fun: a generator function accepting (input, *args, **kwargs).
    @return: a callable with signature (stream, *args, **kwargs) that
        returns a stream producing the generator's yielded values.
    """
    def generatorToStream_inner(stream, *args, **kwargs):
        # _IteratorStream drives fun's generator, feeding it data read
        # from 'stream' and exposing the yielded values via read().
        return _IteratorStream(fun, stream, args, kwargs)
    return generatorToStream_inner
954
##############################
955
#### BufferedStream ####
956
##############################
958
class BufferedStream(object):
959
"""A stream which buffers its data to provide operations like
960
readline and readExactly."""
963
def __init__(self, stream):
966
def _readUntil(self, f):
967
"""Internal helper function which repeatedly calls f each time
968
after more data has been received, until it returns non-None."""
974
newdata = self.stream.read()
975
if isinstance(newdata, defer.Deferred):
976
newdata = defer.waitForDeferred(newdata)
977
yield newdata; newdata = newdata.getResult()
983
yield newdata; return
984
self.data += str(newdata)
985
_readUntil = defer.deferredGenerator(_readUntil)
987
def readExactly(self, size=None):
988
"""Read exactly size bytes of data, or, if size is None, read
989
the entire stream into a string."""
990
if size is not None and size < 0:
991
raise ValueError("readExactly: size cannot be negative: %s", size)
995
if size is not None and len(data) >= size:
996
pre,post = data[:size], data[size:]
999
return self._readUntil(gotdata)
1002
def readline(self, delimiter='\r\n', size=None):
1004
Read a line of data from the string, bounded by
1005
delimiter. The delimiter is included in the return value.
1007
If size is specified, read and return at most that many bytes,
1008
even if the delimiter has not yet been reached. If the size
1009
limit falls within a delimiter, the rest of the delimiter, and
1010
the next line will be returned together.
1012
if size is not None and size < 0:
1013
raise ValueError("readline: size cannot be negative: %s" % (size, ))
1017
if size is not None:
1018
splitpoint = data.find(delimiter, 0, size)
1019
if splitpoint == -1:
1020
if len(data) >= size:
1023
splitpoint += len(delimiter)
1025
splitpoint = data.find(delimiter)
1026
if splitpoint != -1:
1027
splitpoint += len(delimiter)
1029
if splitpoint != -1:
1030
pre = data[:splitpoint]
1031
self.data = data[splitpoint:]
1033
return self._readUntil(gotdata)
1035
def pushback(self, pushed):
1036
"""Push data back into the buffer."""
1038
self.data = pushed + self.data
1045
return self.stream.read()
1048
l = self.stream.length
1051
return l + len(self.data)
1053
length = property(_len)
1055
def split(self, offset):
1056
off = offset - len(self.data)
1058
pre, post = self.stream.split(max(0, off))
1059
pre = BufferedStream(pre)
1060
post = BufferedStream(post)
1062
pre.data = self.data[:-off]
1063
post.data = self.data[-off:]
1065
pre.data = self.data
1070
def substream(stream, start, end):
    """Return the portion of stream between byte offsets start and end.

    The original stream is consumed via split() and must not be used
    afterwards.

    @raise ValueError: if start > end.
    """
    if start > end:
        raise ValueError("start position must be less than end position %r"
                         % ((start, end),))
    # Drop everything before 'start', then keep only (end - start) bytes.
    stream = stream.split(start)[1]
    return stream.split(end - start)[0]
1079
# Public API of this module; kept in the historical (non-alphabetical) order.
__all__ = ['IStream', 'IByteStream', 'FileStream', 'MemoryStream', 'CompoundStream',
           'readAndDiscard', 'fallbackSplit', 'ProducerStream', 'StreamProducer',
           'BufferedStream', 'readStream', 'ProcessStreamer', 'readIntoFile',
           'generatorToStream']