~landscape/zope3/newer-from-ztk

« back to all changes in this revision

Viewing changes to src/twisted/web2/stream.py

  • Committer: Thomas Hervé
  • Date: 2009-07-08 13:52:04 UTC
  • Revision ID: thomas@canonical.com-20090708135204-df5eesrthifpylf8
Remove twisted copy

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# -*- test-case-name: twisted.web2.test.test_stream -*-
2
 
 
3
 
"""
4
 
The stream module provides a simple abstraction of streaming
5
 
data. While Twisted already has some provisions for handling this in
6
 
its Producer/Consumer model, the rather complex interactions between
7
 
producer and consumer makes it difficult to implement something like
8
 
the CompoundStream object. Thus, this API.
9
 
 
10
 
The IStream interface is very simple. It consists of two methods:
11
 
read, and close. The read method should either return some data, None
12
 
if there is no data left to read, or a Deferred. Close frees up any
13
 
underlying resources and causes read to return None forevermore.
14
 
 
15
 
IByteStream adds a bit more to the API:
16
 
1) read is required to return objects conforming to the buffer interface.  
17
 
2) .length, which may either an integer number of bytes remaining, or
18
 
None if unknown
19
 
3) .split(position). Split takes a position, and splits the
20
 
stream in two pieces, returning the two new streams. Using the
21
 
original stream after calling split is not allowed. 
22
 
 
23
 
There are two builtin source stream classes: FileStream and
24
 
MemoryStream. The first produces data from a file object, the second
25
 
from a buffer in memory. Any number of these can be combined into one
26
 
stream with the CompoundStream object. Then, to interface with other
27
 
parts of Twisted, there are two transcievers: StreamProducer and
28
 
ProducerStream. The first takes a stream and turns it into an
29
 
IPushProducer, which will write to a consumer. The second is a
30
 
consumer which is a stream, so that other producers can write to it.
31
 
"""
32
 
 
33
 
from __future__ import generators
34
 
 
35
 
import copy, os, types, sys
36
 
from zope.interface import Interface, Attribute, implements
37
 
from twisted.internet.defer import Deferred
38
 
from twisted.internet import interfaces as ti_interfaces, defer, reactor, protocol, error as ti_error
39
 
from twisted.python import components, log
40
 
from twisted.python.failure import Failure
41
 
 
42
 
# Python 2.4.2 (only) has a broken mmap that leaks a fd every time you call it.
43
 
if sys.version_info[0:3] != (2,4,2):
44
 
    try:
45
 
        import mmap
46
 
    except ImportError:
47
 
        mmap = None
48
 
else:
49
 
    mmap = None
50
 
    
51
 
##############################
52
 
####      Interfaces      ####
53
 
##############################
54
 
 
55
 
class IStream(Interface):
56
 
    """A stream of arbitrary data."""
57
 
    
58
 
    def read():
59
 
        """Read some data.
60
 
 
61
 
        Returns some object representing the data.
62
 
        If there is no more data available, returns None.
63
 
        Can also return a Deferred resulting in one of the above.
64
 
 
65
 
        Errors may be indicated by exception or by a Deferred of a Failure.
66
 
        """
67
 
        
68
 
    def close():
69
 
        """Prematurely close. Should also cause further reads to
70
 
        return None."""
71
 
 
72
 
class IByteStream(IStream):
73
 
    """A stream which is of bytes."""
74
 
    
75
 
    length = Attribute("""How much data is in this stream. Can be None if unknown.""")
76
 
    
77
 
    def read():
78
 
        """Read some data.
79
 
        
80
 
        Returns an object conforming to the buffer interface, or
81
 
        if there is no more data available, returns None.
82
 
        Can also return a Deferred resulting in one of the above.
83
 
 
84
 
        Errors may be indicated by exception or by a Deferred of a Failure.
85
 
        """
86
 
    def split(point):
87
 
        """Split this stream into two, at byte position 'point'.
88
 
 
89
 
        Returns a tuple of (before, after). After calling split, no other
90
 
        methods should be called on this stream. Doing so will have undefined
91
 
        behavior.
92
 
 
93
 
        If you cannot implement split easily, you may implement it as::
94
 
 
95
 
            return fallbackSplit(self, point)
96
 
        """
97
 
 
98
 
    def close():
99
 
        """Prematurely close this stream. Should also cause further reads to
100
 
        return None. Additionally, .length should be set to 0.
101
 
        """
102
 
 
103
 
class ISendfileableStream(Interface):
104
 
    def read(sendfile=False):
105
 
        """
106
 
        Read some data.
107
 
        If sendfile == False, returns an object conforming to the buffer
108
 
        interface, or else a Deferred.
109
 
 
110
 
        If sendfile == True, returns either the above, or a SendfileBuffer.
111
 
        """
112
 
        
113
 
class SimpleStream(object):
114
 
    """Superclass of simple streams with a single buffer and a offset and length
115
 
    into that buffer."""
116
 
    implements(IByteStream)
117
 
    
118
 
    length = None
119
 
    start = None
120
 
    
121
 
    def read(self):
122
 
        return None
123
 
 
124
 
    def close(self):
125
 
        self.length = 0
126
 
    
127
 
    def split(self, point):
128
 
        if self.length is not None:
129
 
            if point > self.length:
130
 
                raise ValueError("split point (%d) > length (%d)" % (point, self.length))
131
 
        b = copy.copy(self)
132
 
        self.length = point
133
 
        if b.length is not None:
134
 
            b.length -= point
135
 
        b.start += point
136
 
        return (self, b)
137
 
 
138
 
##############################
139
 
####      FileStream      ####
140
 
##############################
141
 
    
142
 
# maximum mmap size
143
 
MMAP_LIMIT = 4*1024*1024
144
 
# minimum mmap size
145
 
MMAP_THRESHOLD = 8*1024
146
 
 
147
 
# maximum sendfile length
148
 
SENDFILE_LIMIT = 16777216
149
 
# minimum sendfile size
150
 
SENDFILE_THRESHOLD = 256
151
 
 
152
 
def mmapwrapper(*args, **kwargs):
153
 
    """
154
 
    Python's mmap call sucks and ommitted the "offset" argument for no
155
 
    discernable reason. Replace this with a mmap module that has offset.
156
 
    """
157
 
    
158
 
    offset = kwargs.get('offset', None)
159
 
    if offset in [None, 0]:
160
 
        if 'offset' in kwargs:
161
 
            del kwargs['offset']
162
 
    else:
163
 
        raise mmap.error("mmap: Python sucks and does not support offset.")
164
 
    return mmap.mmap(*args, **kwargs)
165
 
 
166
 
class FileStream(SimpleStream):
167
 
    implements(ISendfileableStream)
168
 
    """A stream that reads data from a file. File must be a normal
169
 
    file that supports seek, (e.g. not a pipe or device or socket)."""
170
 
    # 65K, minus some slack
171
 
    CHUNK_SIZE = 2 ** 2 ** 2 ** 2 - 32
172
 
 
173
 
    f = None
174
 
    def __init__(self, f, start=0, length=None, useMMap=bool(mmap)):
175
 
        """
176
 
        Create the stream from file f. If you specify start and length,
177
 
        use only that portion of the file.
178
 
        """
179
 
        self.f = f
180
 
        self.start = start
181
 
        if length is None:
182
 
            self.length = os.fstat(f.fileno()).st_size
183
 
        else:
184
 
            self.length = length
185
 
        self.useMMap = useMMap
186
 
        
187
 
    def read(self, sendfile=False):
188
 
        if self.f is None:
189
 
            return None
190
 
 
191
 
        length = self.length
192
 
        if length == 0:
193
 
            self.f = None
194
 
            return None
195
 
 
196
 
        if sendfile and length > SENDFILE_THRESHOLD:
197
 
            # XXX: Yay using non-existent sendfile support!
198
 
            # FIXME: if we return a SendfileBuffer, and then sendfile
199
 
            #        fails, then what? Or, what if file is too short?
200
 
            readSize = min(length, SENDFILE_LIMIT)
201
 
            res = SendfileBuffer(self.f, self.start, readSize)
202
 
            self.length -= readSize
203
 
            self.start += readSize
204
 
            return res
205
 
 
206
 
        if self.useMMap and length > MMAP_THRESHOLD:
207
 
            readSize = min(length, MMAP_LIMIT)
208
 
            try:
209
 
                res = mmapwrapper(self.f.fileno(), readSize,
210
 
                                  access=mmap.ACCESS_READ, offset=self.start)
211
 
                #madvise(res, MADV_SEQUENTIAL)
212
 
                self.length -= readSize
213
 
                self.start += readSize
214
 
                return res
215
 
            except mmap.error:
216
 
                pass
217
 
 
218
 
        # Fall back to standard read.
219
 
        readSize = min(length, self.CHUNK_SIZE)
220
 
 
221
 
        self.f.seek(self.start)
222
 
        b = self.f.read(readSize)
223
 
        bytesRead = len(b)
224
 
        if not bytesRead:
225
 
            raise RuntimeError("Ran out of data reading file %r, expected %d more bytes" % (self.f, length))
226
 
        else:
227
 
            self.length -= bytesRead
228
 
            self.start += bytesRead
229
 
            return b
230
 
 
231
 
    def close(self):
232
 
        self.f = None
233
 
        SimpleStream.close(self)
234
 
 
235
 
components.registerAdapter(FileStream, file, IByteStream)
236
 
 
237
 
##############################
238
 
####     MemoryStream     ####
239
 
##############################
240
 
 
241
 
class MemoryStream(SimpleStream):
242
 
    """A stream that reads data from a buffer object."""
243
 
    def __init__(self, mem, start=0, length=None):
244
 
        """
245
 
        Create the stream from buffer object mem. If you specify start and length,
246
 
        use only that portion of the buffer.
247
 
        """
248
 
        self.mem = mem
249
 
        self.start = start
250
 
        if length is None:
251
 
            self.length = len(mem) - start
252
 
        else:
253
 
            if len(mem) < length:
254
 
                raise ValueError("len(mem) < start + length")
255
 
            self.length = length
256
 
 
257
 
    def read(self):
258
 
        if self.mem is None:
259
 
            return None
260
 
        if self.length == 0:
261
 
            result = None
262
 
        else:
263
 
            result = buffer(self.mem, self.start, self.length)
264
 
        self.mem = None
265
 
        self.length = 0
266
 
        return result
267
 
 
268
 
    def close(self):
269
 
        self.mem = None
270
 
        SimpleStream.close(self)
271
 
 
272
 
components.registerAdapter(MemoryStream, str, IByteStream)
273
 
components.registerAdapter(MemoryStream, types.BufferType, IByteStream)
274
 
 
275
 
##############################
276
 
####    CompoundStream    ####
277
 
##############################
278
 
 
279
 
class CompoundStream(object):
280
 
    """A stream which is composed of many other streams.
281
 
 
282
 
    Call addStream to add substreams.
283
 
    """
284
 
    
285
 
    implements(IByteStream, ISendfileableStream)
286
 
    deferred = None
287
 
    length = 0
288
 
    
289
 
    def __init__(self, buckets=()):
290
 
        self.buckets = [IByteStream(s) for s in buckets]
291
 
        
292
 
    def addStream(self, bucket):
293
 
        """Add a stream to the output"""
294
 
        bucket = IByteStream(bucket)
295
 
        self.buckets.append(bucket)
296
 
        if self.length is not None:
297
 
            if bucket.length is None:
298
 
                self.length = None
299
 
            else:
300
 
                self.length += bucket.length
301
 
 
302
 
    def read(self, sendfile=False):
303
 
        if self.deferred is not None:
304
 
            raise RuntimeError("Call to read while read is already outstanding")
305
 
 
306
 
        if not self.buckets:
307
 
            return None
308
 
        
309
 
        if sendfile and ISendfileableStream.providedBy(self.buckets[0]):
310
 
            try:
311
 
                result = self.buckets[0].read(sendfile)
312
 
            except:
313
 
                return self._gotFailure(Failure())
314
 
        else:
315
 
            try:
316
 
                result = self.buckets[0].read()
317
 
            except:
318
 
                return self._gotFailure(Failure())
319
 
        
320
 
        if isinstance(result, Deferred):
321
 
            self.deferred = result
322
 
            result.addCallbacks(self._gotRead, self._gotFailure, (sendfile,))
323
 
            return result
324
 
        
325
 
        return self._gotRead(result, sendfile)
326
 
 
327
 
    def _gotFailure(self, f):
328
 
        self.deferred = None
329
 
        del self.buckets[0]
330
 
        self.close()
331
 
        return f
332
 
    
333
 
    def _gotRead(self, result, sendfile):
334
 
        self.deferred = None
335
 
        if result is None:
336
 
            del self.buckets[0]
337
 
            # Next bucket
338
 
            return self.read(sendfile)
339
 
        
340
 
        if self.length is not None:
341
 
            self.length -= len(result)
342
 
        return result
343
 
    
344
 
    def split(self, point):
345
 
        num = 0
346
 
        origPoint = point
347
 
        for bucket in self.buckets:
348
 
            num+=1
349
 
 
350
 
            if point == 0:
351
 
                b = CompoundStream()
352
 
                b.buckets = self.buckets[num:]
353
 
                del self.buckets[num:]
354
 
                return self,b
355
 
            
356
 
            if bucket.length is None:
357
 
                # Indeterminate length bucket.
358
 
                # give up and use fallback splitter.
359
 
                return fallbackSplit(self, origPoint)
360
 
            
361
 
            if point < bucket.length:
362
 
                before,after = bucket.split(point)
363
 
                b = CompoundStream()
364
 
                b.buckets = self.buckets[num:]
365
 
                b.buckets[0] = after
366
 
                
367
 
                del self.buckets[num+1:]
368
 
                self.buckets[num] = before
369
 
                return self,b
370
 
            
371
 
            point -= bucket.length
372
 
    
373
 
    def close(self):
374
 
        for bucket in self.buckets:
375
 
            bucket.close()
376
 
        self.buckets = []
377
 
        self.length = 0
378
 
 
379
 
 
380
 
##############################
381
 
####      readStream      ####
382
 
##############################
383
 
 
384
 
class _StreamReader(object):
385
 
    """Process a stream's data using callbacks for data and stream finish."""
386
 
 
387
 
    def __init__(self, stream, gotDataCallback):
388
 
        self.stream = stream
389
 
        self.gotDataCallback = gotDataCallback
390
 
        self.result = Deferred()
391
 
 
392
 
    def run(self):
393
 
        # self.result may be del'd in _read()
394
 
        result = self.result
395
 
        self._read()
396
 
        return result
397
 
    
398
 
    def _read(self):
399
 
        try:
400
 
            result = self.stream.read()
401
 
        except:
402
 
            self._gotError(Failure())
403
 
            return
404
 
        if isinstance(result, Deferred):
405
 
            result.addCallbacks(self._gotData, self._gotError)
406
 
        else:
407
 
            self._gotData(result)
408
 
 
409
 
    def _gotError(self, failure):
410
 
        result = self.result
411
 
        del self.result, self.gotDataCallback, self.stream
412
 
        result.errback(failure)
413
 
    
414
 
    def _gotData(self, data):
415
 
        if data is None:
416
 
            result = self.result
417
 
            del self.result, self.gotDataCallback, self.stream
418
 
            result.callback(None)
419
 
            return
420
 
        try:
421
 
            self.gotDataCallback(data)
422
 
        except:
423
 
            self._gotError(Failure())
424
 
            return
425
 
        reactor.callLater(0, self._read)
426
 
 
427
 
def readStream(stream, gotDataCallback):
428
 
    """Pass a stream's data to a callback.
429
 
 
430
 
    Returns Deferred which will be triggered on finish.  Errors in
431
 
    reading the stream or in processing it will be returned via this
432
 
    Deferred.
433
 
    """
434
 
    return _StreamReader(stream, gotDataCallback).run()
435
 
 
436
 
 
437
 
def readAndDiscard(stream):
438
 
    """Read all the data from the given stream, and throw it out.
439
 
 
440
 
    Returns Deferred which will be triggered on finish.
441
 
    """
442
 
    return readStream(stream, lambda _: None)
443
 
 
444
 
def readIntoFile(stream, outFile):
445
 
    """Read a stream and write it into a file.
446
 
 
447
 
    Returns Deferred which will be triggered on finish.
448
 
    """
449
 
    def done(_):
450
 
        outFile.close()
451
 
        return _
452
 
    return readStream(stream, outFile.write).addBoth(done)
453
 
 
454
 
def connectStream(inputStream, factory):
455
 
    """Connect a protocol constructed from a factory to stream.
456
 
 
457
 
    Returns an output stream from the protocol.
458
 
 
459
 
    The protocol's transport will have a finish() method it should
460
 
    call when done writing.
461
 
    """
462
 
    # XXX deal better with addresses
463
 
    p = factory.buildProtocol(None)
464
 
    out = ProducerStream()
465
 
    out.disconnecting = False # XXX for LineReceiver suckage
466
 
    p.makeConnection(out)
467
 
    readStream(inputStream, lambda _: p.dataReceived(_)).addCallbacks(
468
 
        lambda _: p.connectionLost(ti_error.ConnectionDone()), lambda _: p.connectionLost(_))
469
 
    return out
470
 
 
471
 
##############################
472
 
####     fallbackSplit    ####
473
 
##############################
474
 
 
475
 
def fallbackSplit(stream, point):
476
 
    after = PostTruncaterStream(stream, point)
477
 
    before = TruncaterStream(stream, point, after)
478
 
    return (before, after)
479
 
 
480
 
class TruncaterStream(object):
481
 
    def __init__(self, stream, point, postTruncater):
482
 
        self.stream = stream
483
 
        self.length = point
484
 
        self.postTruncater = postTruncater
485
 
        
486
 
    def read(self):
487
 
        if self.length == 0:
488
 
            if self.postTruncater is not None:
489
 
                postTruncater = self.postTruncater
490
 
                self.postTruncater = None
491
 
                postTruncater.sendInitialSegment(self.stream.read())
492
 
            self.stream = None
493
 
            return None
494
 
        
495
 
        result = self.stream.read()
496
 
        if isinstance(result, Deferred):
497
 
            return result.addCallback(self._gotRead)
498
 
        else:
499
 
            return self._gotRead(result)
500
 
        
501
 
    def _gotRead(self, data):
502
 
        if data is None:
503
 
            raise ValueError("Ran out of data for a split of a indeterminate length source")
504
 
        if self.length >= len(data):
505
 
            self.length -= len(data)
506
 
            return data
507
 
        else:
508
 
            before = buffer(data, 0, self.length)
509
 
            after = buffer(data, self.length)
510
 
            self.length = 0
511
 
            if self.postTruncater is not None:
512
 
                postTruncater = self.postTruncater
513
 
                self.postTruncater = None
514
 
                postTruncater.sendInitialSegment(after)
515
 
                self.stream = None
516
 
            return before
517
 
    
518
 
    def split(self, point):
519
 
        if point > self.length:
520
 
            raise ValueError("split point (%d) > length (%d)" % (point, self.length))
521
 
 
522
 
        post = PostTruncaterStream(self.stream, point)
523
 
        trunc = TruncaterStream(post, self.length - point, self.postTruncater)
524
 
        self.length = point
525
 
        self.postTruncater = post
526
 
        return self, trunc
527
 
    
528
 
    def close(self):
529
 
        if self.postTruncater is not None:
530
 
            self.postTruncater.notifyClosed(self)
531
 
        else:
532
 
            # Nothing cares about the rest of the stream
533
 
            self.stream.close()
534
 
            self.stream = None
535
 
            self.length = 0
536
 
            
537
 
 
538
 
class PostTruncaterStream(object):
539
 
    deferred = None
540
 
    sentInitialSegment = False
541
 
    truncaterClosed = None
542
 
    closed = False
543
 
    
544
 
    length = None
545
 
    def __init__(self, stream, point):
546
 
        self.stream = stream
547
 
        self.deferred = Deferred()
548
 
        if stream.length is not None:
549
 
            self.length = stream.length - point
550
 
 
551
 
    def read(self):
552
 
        if not self.sentInitialSegment:
553
 
            self.sentInitialSegment = True
554
 
            if self.truncaterClosed is not None:
555
 
                readAndDiscard(self.truncaterClosed)
556
 
                self.truncaterClosed = None
557
 
            return self.deferred
558
 
        
559
 
        return self.stream.read()
560
 
    
561
 
    def split(self, point):
562
 
        return fallbackSplit(self, point)
563
 
        
564
 
    def close(self):
565
 
        self.closed = True
566
 
        if self.truncaterClosed is not None:
567
 
            # have first half close itself
568
 
            self.truncaterClosed.postTruncater = None
569
 
            self.truncaterClosed.close()
570
 
        elif self.sentInitialSegment:
571
 
            # first half already finished up
572
 
            self.stream.close()
573
 
            
574
 
        self.deferred = None
575
 
    
576
 
    # Callbacks from TruncaterStream
577
 
    def sendInitialSegment(self, data):
578
 
        if self.closed:
579
 
            # First half finished, we don't want data.
580
 
            self.stream.close()
581
 
            self.stream = None
582
 
        if self.deferred is not None:
583
 
            if isinstance(data, Deferred):
584
 
                data.chainDeferred(self.deferred)
585
 
            else:
586
 
                self.deferred.callback(data)
587
 
        
588
 
    def notifyClosed(self, truncater):
589
 
        if self.closed:
590
 
            # we are closed, have first half really close
591
 
            truncater.postTruncater = None
592
 
            truncater.close()
593
 
        elif self.sentInitialSegment:
594
 
            # We are trying to read, read up first half
595
 
            readAndDiscard(truncater)
596
 
        else:
597
 
            # Idle, store closed info.
598
 
            self.truncaterClosed = truncater
599
 
 
600
 
########################################
601
 
#### ProducerStream/StreamProducer  ####
602
 
########################################
603
 
            
604
 
class ProducerStream(object):
605
 
    """Turns producers into a IByteStream.
606
 
    Thus, implements IConsumer and IByteStream."""
607
 
 
608
 
    implements(IByteStream, ti_interfaces.IConsumer)
609
 
    length = None
610
 
    closed = False
611
 
    failed = False
612
 
    producer = None
613
 
    producerPaused = False
614
 
    deferred = None
615
 
    
616
 
    bufferSize = 5
617
 
    
618
 
    def __init__(self, length=None):
619
 
        self.buffer = []
620
 
        self.length = length
621
 
        
622
 
    # IByteStream implementation
623
 
    def read(self):
624
 
        if self.buffer:
625
 
            return self.buffer.pop(0)
626
 
        elif self.closed:
627
 
            self.length = 0
628
 
            if self.failed:
629
 
                f = self.failure
630
 
                del self.failure
631
 
                return defer.fail(f)
632
 
            return None
633
 
        else:
634
 
            deferred = self.deferred = Deferred()
635
 
            if self.producer is not None and (not self.streamingProducer
636
 
                                              or self.producerPaused):
637
 
                self.producerPaused = False
638
 
                self.producer.resumeProducing()
639
 
                
640
 
            return deferred
641
 
        
642
 
    def split(self, point):
643
 
        return fallbackSplit(self, point)
644
 
    
645
 
    def close(self):
646
 
        """Called by reader of stream when it is done reading."""
647
 
        self.buffer=[]
648
 
        self.closed = True
649
 
        if self.producer is not None:
650
 
            self.producer.stopProducing()
651
 
            self.producer = None
652
 
        self.deferred = None
653
 
        
654
 
    # IConsumer implementation
655
 
    def write(self, data):
656
 
        if self.closed:
657
 
            return
658
 
        
659
 
        if self.deferred:
660
 
            deferred = self.deferred
661
 
            self.deferred = None
662
 
            deferred.callback(data)
663
 
        else:
664
 
            self.buffer.append(data)
665
 
            if(self.producer is not None and self.streamingProducer
666
 
               and len(self.buffer) > self.bufferSize):
667
 
                self.producer.pauseProducing()
668
 
                self.producerPaused = True
669
 
 
670
 
    def finish(self, failure=None):
671
 
        """Called by producer when it is done.
672
 
 
673
 
        If the optional failure argument is passed a Failure instance,
674
 
        the stream will return it as errback on next Deferred.
675
 
        """
676
 
        self.closed = True
677
 
        if not self.buffer:
678
 
            self.length = 0
679
 
        if self.deferred is not None:
680
 
            deferred = self.deferred
681
 
            self.deferred = None
682
 
            if failure is not None:
683
 
                self.failed = True
684
 
                deferred.errback(failure)
685
 
            else:
686
 
                deferred.callback(None)
687
 
        else:
688
 
            if failure is not None:
689
 
               self.failed = True
690
 
               self.failure = failure
691
 
    
692
 
    def registerProducer(self, producer, streaming):
693
 
        if self.producer is not None:
694
 
            raise RuntimeError("Cannot register producer %s, because producer %s was never unregistered." % (producer, self.producer))
695
 
        
696
 
        if self.closed:
697
 
            producer.stopProducing()
698
 
        else:
699
 
            self.producer = producer
700
 
            self.streamingProducer = streaming
701
 
            if not streaming:
702
 
                producer.resumeProducing()
703
 
 
704
 
    def unregisterProducer(self):
705
 
        self.producer = None
706
 
        
707
 
class StreamProducer(object):
708
 
    """A push producer which gets its data by reading a stream."""
709
 
    implements(ti_interfaces.IPushProducer)
710
 
 
711
 
    deferred = None
712
 
    finishedCallback = None
713
 
    paused = False
714
 
    consumer = None
715
 
    
716
 
    def __init__(self, stream, enforceStr=True):
717
 
        self.stream = stream
718
 
        self.enforceStr = enforceStr
719
 
        
720
 
    def beginProducing(self, consumer):
721
 
        if self.stream is None:
722
 
            return defer.succeed(None)
723
 
        
724
 
        self.consumer = consumer
725
 
        finishedCallback = self.finishedCallback = Deferred()
726
 
        self.consumer.registerProducer(self, True)
727
 
        self.resumeProducing()
728
 
        return finishedCallback
729
 
    
730
 
    def resumeProducing(self):
731
 
        self.paused = False
732
 
        if self.deferred is not None:
733
 
            return
734
 
 
735
 
        try:
736
 
            data = self.stream.read()
737
 
        except:
738
 
            self.stopProducing(Failure())
739
 
            return
740
 
        
741
 
        if isinstance(data, Deferred):
742
 
            self.deferred = data.addCallbacks(self._doWrite, self.stopProducing)
743
 
        else:
744
 
            self._doWrite(data)
745
 
 
746
 
    def _doWrite(self, data):
747
 
        if self.consumer is None:
748
 
            return
749
 
        if data is None:
750
 
            # The end.
751
 
            if self.consumer is not None:
752
 
                self.consumer.unregisterProducer()
753
 
            if self.finishedCallback is not None:
754
 
                self.finishedCallback.callback(None)
755
 
            self.finishedCallback = self.deferred = self.consumer = self.stream = None
756
 
            return
757
 
        
758
 
        self.deferred = None
759
 
        if self.enforceStr:
760
 
            # XXX: sucks that we have to do this. make transport.write(buffer) work!
761
 
            data = str(buffer(data))
762
 
        self.consumer.write(data)
763
 
        
764
 
        if not self.paused:
765
 
            self.resumeProducing()
766
 
        
767
 
    def pauseProducing(self):
768
 
        self.paused = True
769
 
 
770
 
    def stopProducing(self, failure=ti_error.ConnectionLost()):
771
 
        if self.consumer is not None:
772
 
            self.consumer.unregisterProducer()
773
 
        if self.finishedCallback is not None:
774
 
            if failure is not None:
775
 
                self.finishedCallback.errback(failure)
776
 
            else:
777
 
                self.finishedCallback.callback(None)
778
 
            self.finishedCallback = None
779
 
        self.paused = True
780
 
        if self.stream is not None:
781
 
            self.stream.close()
782
 
            
783
 
        self.finishedCallback = self.deferred = self.consumer = self.stream = None
784
 
 
785
 
##############################
786
 
####    ProcessStreamer   ####
787
 
##############################
788
 
 
789
 
class _ProcessStreamerProtocol(protocol.ProcessProtocol):
790
 
 
791
 
    def __init__(self, inputStream, outStream, errStream):
792
 
        self.inputStream = inputStream
793
 
        self.outStream = outStream
794
 
        self.errStream = errStream
795
 
        self.resultDeferred = defer.Deferred()
796
 
    
797
 
    def connectionMade(self):
798
 
        p = StreamProducer(self.inputStream)
799
 
        # if the process stopped reading from the input stream,
800
 
        # this is not an error condition, so it oughtn't result
801
 
        # in a ConnectionLost() from the input stream:
802
 
        p.stopProducing = lambda err=None: StreamProducer.stopProducing(p, err)
803
 
        
804
 
        d = p.beginProducing(self.transport)
805
 
        d.addCallbacks(lambda _: self.transport.closeStdin(),
806
 
                       self._inputError)
807
 
 
808
 
    def _inputError(self, f):
809
 
        log.msg("Error in input stream for %r" % self.transport)
810
 
        log.err(f)
811
 
        self.transport.closeStdin()
812
 
    
813
 
    def outReceived(self, data):
814
 
        self.outStream.write(data)
815
 
 
816
 
    def errReceived(self, data):
817
 
        self.errStream.write(data)
818
 
 
819
 
    def outConnectionLost(self):
820
 
        self.outStream.finish()
821
 
 
822
 
    def errConnectionLost(self):
823
 
        self.errStream.finish()
824
 
    
825
 
    def processEnded(self, reason):
826
 
        self.resultDeferred.errback(reason)
827
 
        del self.resultDeferred
828
 
 
829
 
 
830
 
class ProcessStreamer(object):
831
 
    """Runs a process hooked up to streams.
832
 
 
833
 
    Requires an input stream, has attributes 'outStream' and 'errStream'
834
 
    for stdout and stderr.
835
 
 
836
 
    outStream and errStream are public attributes providing streams
837
 
    for stdout and stderr of the process.
838
 
    """
839
 
 
840
 
    def __init__(self, inputStream, program, args, env={}):
841
 
        self.outStream = ProducerStream()
842
 
        self.errStream = ProducerStream()
843
 
        self._protocol = _ProcessStreamerProtocol(IByteStream(inputStream), self.outStream, self.errStream)
844
 
        self._program = program
845
 
        self._args = args
846
 
        self._env = env
847
 
    
848
 
    def run(self):
849
 
        """Run the process.
850
 
 
851
 
        Returns Deferred which will eventually have errback for non-clean (exit code > 0)
852
 
        exit, with ProcessTerminated, or callback with None on exit code 0.
853
 
        """
854
 
        # XXX what happens if spawn fails?
855
 
        reactor.spawnProcess(self._protocol, self._program, self._args, env=self._env)
856
 
        del self._env
857
 
        return self._protocol.resultDeferred.addErrback(lambda _: _.trap(ti_error.ProcessDone))
858
 
 
859
 
##############################
860
 
####   generatorToStream  ####
861
 
##############################
862
 
 
863
 
class _StreamIterator(object):
864
 
    done=False
865
 
 
866
 
    def __iter__(self):
867
 
        return self
868
 
    def next(self):
869
 
        if self.done:
870
 
            raise StopIteration
871
 
        return self.value
872
 
    wait=object()
873
 
 
874
 
class _IteratorStream(object):
875
 
    length = None
876
 
    
877
 
    def __init__(self, fun, stream, args, kwargs):
878
 
        self._stream=stream
879
 
        self._streamIterator = _StreamIterator()
880
 
        self._gen = fun(self._streamIterator, *args, **kwargs)
881
 
        
882
 
    def read(self):
883
 
        try:
884
 
            val = self._gen.next()
885
 
        except StopIteration:
886
 
            return None
887
 
        else:
888
 
            if val is _StreamIterator.wait:
889
 
                newdata = self._stream.read()
890
 
                if isinstance(newdata, defer.Deferred):
891
 
                    return newdata.addCallback(self._gotRead)
892
 
                else:
893
 
                    return self._gotRead(newdata)
894
 
            return val
895
 
        
896
 
    def _gotRead(self, data):
897
 
        if data is None:
898
 
            self._streamIterator.done=True
899
 
        else:
900
 
            self._streamIterator.value=data
901
 
        return self.read()
902
 
 
903
 
    def close(self):
904
 
        self._stream.close()
905
 
        del self._gen, self._stream, self._streamIterator
906
 
 
907
 
    def split(self):
908
 
        return fallbackSplit(self)
909
 
    
910
 
def generatorToStream(fun):
911
 
    """Converts a generator function into a stream.
912
 
    
913
 
    The function should take an iterator as its first argument,
914
 
    which will be converted *from* a stream by this wrapper, and
915
 
    yield items which are turned *into* the results from the
916
 
    stream's 'read' call.
917
 
    
918
 
    One important point: before every call to input.next(), you
919
 
    *MUST* do a "yield input.wait" first. Yielding this magic value
920
 
    takes care of ensuring that the input is not a deferred before
921
 
    you see it.
922
 
    
923
 
    >>> from twisted.web2 import stream
924
 
    >>> from string import maketrans
925
 
    >>> alphabet = 'abcdefghijklmnopqrstuvwxyz'
926
 
    >>>
927
 
    >>> def encrypt(input, key):
928
 
    ...     code = alphabet[key:] + alphabet[:key]
929
 
    ...     translator = maketrans(alphabet+alphabet.upper(), code+code.upper())
930
 
    ...     yield input.wait
931
 
    ...     for s in input:
932
 
    ...         yield str(s).translate(translator)
933
 
    ...         yield input.wait
934
 
    ...
935
 
    >>> encrypt = stream.generatorToStream(encrypt)
936
 
    >>>
937
 
    >>> plaintextStream = stream.MemoryStream('SampleSampleSample')
938
 
    >>> encryptedStream = encrypt(plaintextStream, 13)
939
 
    >>> encryptedStream.read()
940
 
    'FnzcyrFnzcyrFnzcyr'
941
 
    >>>
942
 
    >>> plaintextStream = stream.MemoryStream('SampleSampleSample')
943
 
    >>> encryptedStream = encrypt(plaintextStream, 13)
944
 
    >>> evenMoreEncryptedStream = encrypt(encryptedStream, 13)
945
 
    >>> evenMoreEncryptedStream.read()
946
 
    'SampleSampleSample'
947
 
    
948
 
    """
949
 
    def generatorToStream_inner(stream, *args, **kwargs):
950
 
        return _IteratorStream(fun, stream, args, kwargs)
951
 
    return generatorToStream_inner
952
 
 
953
 
 
954
 
##############################
955
 
####    BufferedStream    ####
956
 
##############################
957
 
 
958
 
class BufferedStream(object):
959
 
    """A stream which buffers its data to provide operations like
960
 
    readline and readExactly."""
961
 
    
962
 
    data = ""
963
 
    def __init__(self, stream):
964
 
        self.stream = stream
965
 
 
966
 
    def _readUntil(self, f):
967
 
        """Internal helper function which repeatedly calls f each time
968
 
        after more data has been received, until it returns non-None."""
969
 
        while True:
970
 
            r = f()
971
 
            if r is not None:
972
 
                yield r; return
973
 
            
974
 
            newdata = self.stream.read()
975
 
            if isinstance(newdata, defer.Deferred):
976
 
                newdata = defer.waitForDeferred(newdata)
977
 
                yield newdata; newdata = newdata.getResult()
978
 
            
979
 
            if newdata is None:
980
 
                # End Of File
981
 
                newdata = self.data
982
 
                self.data = ''
983
 
                yield newdata; return
984
 
            self.data += str(newdata)
985
 
    _readUntil = defer.deferredGenerator(_readUntil)
986
 
 
987
 
    def readExactly(self, size=None):
988
 
        """Read exactly size bytes of data, or, if size is None, read
989
 
        the entire stream into a string."""
990
 
        if size is not None and size < 0:
991
 
            raise ValueError("readExactly: size cannot be negative: %s", size)
992
 
        
993
 
        def gotdata():
994
 
            data = self.data
995
 
            if size is not None and len(data) >= size:
996
 
                pre,post = data[:size], data[size:]
997
 
                self.data = post
998
 
                return pre
999
 
        return self._readUntil(gotdata)
1000
 
    
1001
 
        
1002
 
    def readline(self, delimiter='\r\n', size=None):
1003
 
        """
1004
 
        Read a line of data from the string, bounded by
1005
 
        delimiter. The delimiter is included in the return value.
1006
 
 
1007
 
        If size is specified, read and return at most that many bytes,
1008
 
        even if the delimiter has not yet been reached. If the size
1009
 
        limit falls within a delimiter, the rest of the delimiter, and
1010
 
        the next line will be returned together.
1011
 
        """
1012
 
        if size is not None and size < 0:
1013
 
            raise ValueError("readline: size cannot be negative: %s" % (size, ))
1014
 
 
1015
 
        def gotdata():
1016
 
            data = self.data
1017
 
            if size is not None:
1018
 
                splitpoint = data.find(delimiter, 0, size)
1019
 
                if splitpoint == -1:
1020
 
                    if len(data) >= size:
1021
 
                        splitpoint = size
1022
 
                else:
1023
 
                    splitpoint += len(delimiter)
1024
 
            else:
1025
 
                splitpoint = data.find(delimiter)
1026
 
                if splitpoint != -1:
1027
 
                    splitpoint += len(delimiter)
1028
 
            
1029
 
            if splitpoint != -1:
1030
 
                pre = data[:splitpoint]
1031
 
                self.data = data[splitpoint:]
1032
 
                return pre
1033
 
        return self._readUntil(gotdata)
1034
 
    
1035
 
    def pushback(self, pushed):
1036
 
        """Push data back into the buffer."""
1037
 
        
1038
 
        self.data = pushed + self.data
1039
 
        
1040
 
    def read(self):
1041
 
        data = self.data
1042
 
        if data:
1043
 
            self.data = ""
1044
 
            return data
1045
 
        return self.stream.read()
1046
 
 
1047
 
    def _len(self):
1048
 
        l = self.stream.length
1049
 
        if l is None:
1050
 
            return None
1051
 
        return l + len(self.data)
1052
 
    
1053
 
    length = property(_len)
1054
 
    
1055
 
    def split(self, offset):
1056
 
        off = offset - len(self.data)
1057
 
        
1058
 
        pre, post = self.stream.split(max(0, off))
1059
 
        pre = BufferedStream(pre)
1060
 
        post = BufferedStream(post)
1061
 
        if off < 0:
1062
 
            pre.data = self.data[:-off]
1063
 
            post.data = self.data[-off:]
1064
 
        else:
1065
 
            pre.data = self.data
1066
 
        
1067
 
        return pre, post
1068
 
 
1069
 
        
1070
 
def substream(stream, start, end):
1071
 
    if start > end:
1072
 
        raise ValueError("start position must be less than end position %r"
1073
 
                         % ((start, end),))
1074
 
    stream = stream.split(start)[1]
1075
 
    return stream.split(end - start)[0]
1076
 
 
1077
 
 
1078
 
 
1079
 
__all__ = ['IStream', 'IByteStream', 'FileStream', 'MemoryStream', 'CompoundStream',
1080
 
           'readAndDiscard', 'fallbackSplit', 'ProducerStream', 'StreamProducer',
1081
 
           'BufferedStream', 'readStream', 'ProcessStreamer', 'readIntoFile',
1082
 
           'generatorToStream']
1083