~justin-fathomdb/nova/justinsb-openstack-api-volumes

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/protocols/basic.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- test-case-name: twisted.test.test_protocols -*-
 
2
# Copyright (c) 2001-2009 Twisted Matrix Laboratories.
 
3
# See LICENSE for details.
 
4
 
 
5
 
 
6
"""
 
7
Basic protocols, such as line-oriented, netstring, and int prefixed strings.
 
8
 
 
9
Maintainer: Itamar Shtull-Trauring
 
10
"""
 
11
 
 
12
# System imports
 
13
import re
 
14
import struct
 
15
import warnings
 
16
 
 
17
from zope.interface import implements
 
18
 
 
19
# Twisted imports
 
20
from twisted.internet import protocol, defer, interfaces, error
 
21
from twisted.python import log
 
22
 
 
23
LENGTH, DATA, COMMA = range(3)
 
24
NUMBER = re.compile('(\d*)(:?)')
 
25
DEBUG = 0
 
26
 
 
27
class NetstringParseError(ValueError):
 
28
    """The incoming data is not in valid Netstring format."""
 
29
    pass
 
30
 
 
31
 
 
32
class NetstringReceiver(protocol.Protocol):
 
33
    """This uses djb's Netstrings protocol to break up the input into strings.
 
34
 
 
35
    Each string makes a callback to stringReceived, with a single
 
36
    argument of that string.
 
37
 
 
38
    Security features:
 
39
        1. Messages are limited in size, useful if you don't want someone
 
40
           sending you a 500MB netstring (change MAX_LENGTH to the maximum
 
41
           length you wish to accept).
 
42
        2. The connection is lost if an illegal message is received.
 
43
    """
 
44
 
 
45
    MAX_LENGTH = 99999
 
46
    brokenPeer = 0
 
47
    _readerState = LENGTH
 
48
    _readerLength = 0
 
49
 
 
50
    def stringReceived(self, line):
 
51
        """
 
52
        Override this.
 
53
        """
 
54
        raise NotImplementedError
 
55
 
 
56
    def doData(self):
 
57
        buffer,self.__data = self.__data[:int(self._readerLength)],self.__data[int(self._readerLength):]
 
58
        self._readerLength = self._readerLength - len(buffer)
 
59
        self.__buffer = self.__buffer + buffer
 
60
        if self._readerLength != 0:
 
61
            return
 
62
        self.stringReceived(self.__buffer)
 
63
        self._readerState = COMMA
 
64
 
 
65
    def doComma(self):
 
66
        self._readerState = LENGTH
 
67
        if self.__data[0] != ',':
 
68
            if DEBUG:
 
69
                raise NetstringParseError(repr(self.__data))
 
70
            else:
 
71
                raise NetstringParseError
 
72
        self.__data = self.__data[1:]
 
73
 
 
74
 
 
75
    def doLength(self):
 
76
        m = NUMBER.match(self.__data)
 
77
        if not m.end():
 
78
            if DEBUG:
 
79
                raise NetstringParseError(repr(self.__data))
 
80
            else:
 
81
                raise NetstringParseError
 
82
        self.__data = self.__data[m.end():]
 
83
        if m.group(1):
 
84
            try:
 
85
                self._readerLength = self._readerLength * (10**len(m.group(1))) + long(m.group(1))
 
86
            except OverflowError:
 
87
                raise NetstringParseError, "netstring too long"
 
88
            if self._readerLength > self.MAX_LENGTH:
 
89
                raise NetstringParseError, "netstring too long"
 
90
        if m.group(2):
 
91
            self.__buffer = ''
 
92
            self._readerState = DATA
 
93
 
 
94
    def dataReceived(self, data):
 
95
        self.__data = data
 
96
        try:
 
97
            while self.__data:
 
98
                if self._readerState == DATA:
 
99
                    self.doData()
 
100
                elif self._readerState == COMMA:
 
101
                    self.doComma()
 
102
                elif self._readerState == LENGTH:
 
103
                    self.doLength()
 
104
                else:
 
105
                    raise RuntimeError, "mode is not DATA, COMMA or LENGTH"
 
106
        except NetstringParseError:
 
107
            self.transport.loseConnection()
 
108
            self.brokenPeer = 1
 
109
 
 
110
    def sendString(self, data):
 
111
        """
 
112
        A method for sending a Netstring. This method accepts a string and
 
113
        writes it to the transport.
 
114
 
 
115
        @type data: C{str}
 
116
        """
 
117
        if not isinstance(data, str):
 
118
            warnings.warn(
 
119
                "data passed to sendString() must be a string. Non-string "
 
120
                "support is deprecated since Twisted 10.0",
 
121
                DeprecationWarning, 2)
 
122
            data = str(data)
 
123
        self.transport.write('%d:%s,' % (len(data), data))
 
124
 
 
125
 
 
126
class SafeNetstringReceiver(NetstringReceiver):
 
127
    """This class is deprecated, use NetstringReceiver instead.
 
128
    """
 
129
 
 
130
 
 
131
class LineOnlyReceiver(protocol.Protocol):
 
132
    """A protocol that receives only lines.
 
133
 
 
134
    This is purely a speed optimisation over LineReceiver, for the
 
135
    cases that raw mode is known to be unnecessary.
 
136
 
 
137
    @cvar delimiter: The line-ending delimiter to use. By default this is
 
138
                     '\\r\\n'.
 
139
    @cvar MAX_LENGTH: The maximum length of a line to allow (If a
 
140
                      sent line is longer than this, the connection is dropped).
 
141
                      Default is 16384.
 
142
    """
 
143
    _buffer = ''
 
144
    delimiter = '\r\n'
 
145
    MAX_LENGTH = 16384
 
146
 
 
147
    def dataReceived(self, data):
 
148
        """Translates bytes into lines, and calls lineReceived."""
 
149
        lines  = (self._buffer+data).split(self.delimiter)
 
150
        self._buffer = lines.pop(-1)
 
151
        for line in lines:
 
152
            if self.transport.disconnecting:
 
153
                # this is necessary because the transport may be told to lose
 
154
                # the connection by a line within a larger packet, and it is
 
155
                # important to disregard all the lines in that packet following
 
156
                # the one that told it to close.
 
157
                return
 
158
            if len(line) > self.MAX_LENGTH:
 
159
                return self.lineLengthExceeded(line)
 
160
            else:
 
161
                self.lineReceived(line)
 
162
        if len(self._buffer) > self.MAX_LENGTH:
 
163
            return self.lineLengthExceeded(self._buffer)
 
164
 
 
165
    def lineReceived(self, line):
 
166
        """Override this for when each line is received.
 
167
        """
 
168
        raise NotImplementedError
 
169
 
 
170
    def sendLine(self, line):
 
171
        """Sends a line to the other end of the connection.
 
172
        """
 
173
        return self.transport.writeSequence((line,self.delimiter))
 
174
 
 
175
    def lineLengthExceeded(self, line):
 
176
        """Called when the maximum line length has been reached.
 
177
        Override if it needs to be dealt with in some special way.
 
178
        """
 
179
        return error.ConnectionLost('Line length exceeded')
 
180
 
 
181
 
 
182
class _PauseableMixin:
 
183
    paused = False
 
184
 
 
185
    def pauseProducing(self):
 
186
        self.paused = True
 
187
        self.transport.pauseProducing()
 
188
 
 
189
    def resumeProducing(self):
 
190
        self.paused = False
 
191
        self.transport.resumeProducing()
 
192
        self.dataReceived('')
 
193
 
 
194
    def stopProducing(self):
 
195
        self.paused = True
 
196
        self.transport.stopProducing()
 
197
 
 
198
 
 
199
class LineReceiver(protocol.Protocol, _PauseableMixin):
 
200
    """A protocol that receives lines and/or raw data, depending on mode.
 
201
 
 
202
    In line mode, each line that's received becomes a callback to
 
203
    L{lineReceived}.  In raw data mode, each chunk of raw data becomes a
 
204
    callback to L{rawDataReceived}.  The L{setLineMode} and L{setRawMode}
 
205
    methods switch between the two modes.
 
206
 
 
207
    This is useful for line-oriented protocols such as IRC, HTTP, POP, etc.
 
208
 
 
209
    @cvar delimiter: The line-ending delimiter to use. By default this is
 
210
                     '\\r\\n'.
 
211
    @cvar MAX_LENGTH: The maximum length of a line to allow (If a
 
212
                      sent line is longer than this, the connection is dropped).
 
213
                      Default is 16384.
 
214
    """
 
215
    line_mode = 1
 
216
    __buffer = ''
 
217
    delimiter = '\r\n'
 
218
    MAX_LENGTH = 16384
 
219
 
 
220
    def clearLineBuffer(self):
 
221
        """
 
222
        Clear buffered data.
 
223
 
 
224
        @return: All of the cleared buffered data.
 
225
        @rtype: C{str}
 
226
        """
 
227
        b = self.__buffer
 
228
        self.__buffer = ""
 
229
        return b
 
230
 
 
231
    def dataReceived(self, data):
 
232
        """Protocol.dataReceived.
 
233
        Translates bytes into lines, and calls lineReceived (or
 
234
        rawDataReceived, depending on mode.)
 
235
        """
 
236
        self.__buffer = self.__buffer+data
 
237
        while self.line_mode and not self.paused:
 
238
            try:
 
239
                line, self.__buffer = self.__buffer.split(self.delimiter, 1)
 
240
            except ValueError:
 
241
                if len(self.__buffer) > self.MAX_LENGTH:
 
242
                    line, self.__buffer = self.__buffer, ''
 
243
                    return self.lineLengthExceeded(line)
 
244
                break
 
245
            else:
 
246
                linelength = len(line)
 
247
                if linelength > self.MAX_LENGTH:
 
248
                    exceeded = line + self.__buffer
 
249
                    self.__buffer = ''
 
250
                    return self.lineLengthExceeded(exceeded)
 
251
                why = self.lineReceived(line)
 
252
                if why or self.transport and self.transport.disconnecting:
 
253
                    return why
 
254
        else:
 
255
            if not self.paused:
 
256
                data=self.__buffer
 
257
                self.__buffer=''
 
258
                if data:
 
259
                    return self.rawDataReceived(data)
 
260
 
 
261
    def setLineMode(self, extra=''):
 
262
        """Sets the line-mode of this receiver.
 
263
 
 
264
        If you are calling this from a rawDataReceived callback,
 
265
        you can pass in extra unhandled data, and that data will
 
266
        be parsed for lines.  Further data received will be sent
 
267
        to lineReceived rather than rawDataReceived.
 
268
 
 
269
        Do not pass extra data if calling this function from
 
270
        within a lineReceived callback.
 
271
        """
 
272
        self.line_mode = 1
 
273
        if extra:
 
274
            return self.dataReceived(extra)
 
275
 
 
276
    def setRawMode(self):
 
277
        """Sets the raw mode of this receiver.
 
278
        Further data received will be sent to rawDataReceived rather
 
279
        than lineReceived.
 
280
        """
 
281
        self.line_mode = 0
 
282
 
 
283
    def rawDataReceived(self, data):
 
284
        """Override this for when raw data is received.
 
285
        """
 
286
        raise NotImplementedError
 
287
 
 
288
    def lineReceived(self, line):
 
289
        """Override this for when each line is received.
 
290
        """
 
291
        raise NotImplementedError
 
292
 
 
293
    def sendLine(self, line):
 
294
        """Sends a line to the other end of the connection.
 
295
        """
 
296
        return self.transport.write(line + self.delimiter)
 
297
 
 
298
    def lineLengthExceeded(self, line):
 
299
        """Called when the maximum line length has been reached.
 
300
        Override if it needs to be dealt with in some special way.
 
301
 
 
302
        The argument 'line' contains the remainder of the buffer, starting
 
303
        with (at least some part) of the line which is too long. This may
 
304
        be more than one line, or may be only the initial portion of the
 
305
        line.
 
306
        """
 
307
        return self.transport.loseConnection()
 
308
 
 
309
 
 
310
class StringTooLongError(AssertionError):
 
311
    """
 
312
    Raised when trying to send a string too long for a length prefixed
 
313
    protocol.
 
314
    """
 
315
 
 
316
 
 
317
class IntNStringReceiver(protocol.Protocol, _PauseableMixin):
 
318
    """
 
319
    Generic class for length prefixed protocols.
 
320
 
 
321
    @ivar recvd: buffer holding received data when splitted.
 
322
    @type recvd: C{str}
 
323
 
 
324
    @ivar structFormat: format used for struct packing/unpacking. Define it in
 
325
        subclass.
 
326
    @type structFormat: C{str}
 
327
 
 
328
    @ivar prefixLength: length of the prefix, in bytes. Define it in subclass,
 
329
        using C{struct.calcsize(structFormat)}
 
330
    @type prefixLength: C{int}
 
331
    """
 
332
    MAX_LENGTH = 99999
 
333
    recvd = ""
 
334
 
 
335
    def stringReceived(self, msg):
 
336
        """
 
337
        Override this.
 
338
        """
 
339
        raise NotImplementedError
 
340
 
 
341
 
 
342
    def lengthLimitExceeded(self, length):
 
343
        """
 
344
        Callback invoked when a length prefix greater than C{MAX_LENGTH} is
 
345
        received.  The default implementation disconnects the transport.
 
346
        Override this.
 
347
 
 
348
        @param length: The length prefix which was received.
 
349
        @type length: C{int}
 
350
        """
 
351
        self.transport.loseConnection()
 
352
 
 
353
 
 
354
    def dataReceived(self, recd):
 
355
        """
 
356
        Convert int prefixed strings into calls to stringReceived.
 
357
        """
 
358
        self.recvd = self.recvd + recd
 
359
        while len(self.recvd) >= self.prefixLength and not self.paused:
 
360
            length ,= struct.unpack(
 
361
                self.structFormat, self.recvd[:self.prefixLength])
 
362
            if length > self.MAX_LENGTH:
 
363
                self.lengthLimitExceeded(length)
 
364
                return
 
365
            if len(self.recvd) < length + self.prefixLength:
 
366
                break
 
367
            packet = self.recvd[self.prefixLength:length + self.prefixLength]
 
368
            self.recvd = self.recvd[length + self.prefixLength:]
 
369
            self.stringReceived(packet)
 
370
 
 
371
    def sendString(self, data):
 
372
        """
 
373
        Send an prefixed string to the other end of the connection.
 
374
 
 
375
        @type data: C{str}
 
376
        """
 
377
        if len(data) >= 2 ** (8 * self.prefixLength):
 
378
            raise StringTooLongError(
 
379
                "Try to send %s bytes whereas maximum is %s" % (
 
380
                len(data), 2 ** (8 * self.prefixLength)))
 
381
        self.transport.write(struct.pack(self.structFormat, len(data)) + data)
 
382
 
 
383
 
 
384
class Int32StringReceiver(IntNStringReceiver):
 
385
    """
 
386
    A receiver for int32-prefixed strings.
 
387
 
 
388
    An int32 string is a string prefixed by 4 bytes, the 32-bit length of
 
389
    the string encoded in network byte order.
 
390
 
 
391
    This class publishes the same interface as NetstringReceiver.
 
392
    """
 
393
    structFormat = "!I"
 
394
    prefixLength = struct.calcsize(structFormat)
 
395
 
 
396
 
 
397
class Int16StringReceiver(IntNStringReceiver):
 
398
    """
 
399
    A receiver for int16-prefixed strings.
 
400
 
 
401
    An int16 string is a string prefixed by 2 bytes, the 16-bit length of
 
402
    the string encoded in network byte order.
 
403
 
 
404
    This class publishes the same interface as NetstringReceiver.
 
405
    """
 
406
    structFormat = "!H"
 
407
    prefixLength = struct.calcsize(structFormat)
 
408
 
 
409
 
 
410
class Int8StringReceiver(IntNStringReceiver):
 
411
    """
 
412
    A receiver for int8-prefixed strings.
 
413
 
 
414
    An int8 string is a string prefixed by 1 byte, the 8-bit length of
 
415
    the string.
 
416
 
 
417
    This class publishes the same interface as NetstringReceiver.
 
418
    """
 
419
    structFormat = "!B"
 
420
    prefixLength = struct.calcsize(structFormat)
 
421
 
 
422
 
 
423
class StatefulStringProtocol:
 
424
    """
 
425
    A stateful string protocol.
 
426
 
 
427
    This is a mixin for string protocols (Int32StringReceiver,
 
428
    NetstringReceiver) which translates stringReceived into a callback
 
429
    (prefixed with 'proto_') depending on state.
 
430
 
 
431
    The state 'done' is special; if a proto_* method returns it, the
 
432
    connection will be closed immediately.
 
433
    """
 
434
 
 
435
    state = 'init'
 
436
 
 
437
    def stringReceived(self,string):
 
438
        """Choose a protocol phase function and call it.
 
439
 
 
440
        Call back to the appropriate protocol phase; this begins with
 
441
        the function proto_init and moves on to proto_* depending on
 
442
        what each proto_* function returns.  (For example, if
 
443
        self.proto_init returns 'foo', then self.proto_foo will be the
 
444
        next function called when a protocol message is received.
 
445
        """
 
446
        try:
 
447
            pto = 'proto_'+self.state
 
448
            statehandler = getattr(self,pto)
 
449
        except AttributeError:
 
450
            log.msg('callback',self.state,'not found')
 
451
        else:
 
452
            self.state = statehandler(string)
 
453
            if self.state == 'done':
 
454
                self.transport.loseConnection()
 
455
 
 
456
class FileSender:
 
457
    """A producer that sends the contents of a file to a consumer.
 
458
 
 
459
    This is a helper for protocols that, at some point, will take a
 
460
    file-like object, read its contents, and write them out to the network,
 
461
    optionally performing some transformation on the bytes in between.
 
462
    """
 
463
    implements(interfaces.IProducer)
 
464
 
 
465
    CHUNK_SIZE = 2 ** 14
 
466
 
 
467
    lastSent = ''
 
468
    deferred = None
 
469
 
 
470
    def beginFileTransfer(self, file, consumer, transform = None):
 
471
        """Begin transferring a file
 
472
 
 
473
        @type file: Any file-like object
 
474
        @param file: The file object to read data from
 
475
 
 
476
        @type consumer: Any implementor of IConsumer
 
477
        @param consumer: The object to write data to
 
478
 
 
479
        @param transform: A callable taking one string argument and returning
 
480
        the same.  All bytes read from the file are passed through this before
 
481
        being written to the consumer.
 
482
 
 
483
        @rtype: C{Deferred}
 
484
        @return: A deferred whose callback will be invoked when the file has been
 
485
        completely written to the consumer.  The last byte written to the consumer
 
486
        is passed to the callback.
 
487
        """
 
488
        self.file = file
 
489
        self.consumer = consumer
 
490
        self.transform = transform
 
491
 
 
492
        self.deferred = deferred = defer.Deferred()
 
493
        self.consumer.registerProducer(self, False)
 
494
        return deferred
 
495
 
 
496
    def resumeProducing(self):
 
497
        chunk = ''
 
498
        if self.file:
 
499
            chunk = self.file.read(self.CHUNK_SIZE)
 
500
        if not chunk:
 
501
            self.file = None
 
502
            self.consumer.unregisterProducer()
 
503
            if self.deferred:
 
504
                self.deferred.callback(self.lastSent)
 
505
                self.deferred = None
 
506
            return
 
507
 
 
508
        if self.transform:
 
509
            chunk = self.transform(chunk)
 
510
        self.consumer.write(chunk)
 
511
        self.lastSent = chunk[-1]
 
512
 
 
513
    def pauseProducing(self):
 
514
        pass
 
515
 
 
516
    def stopProducing(self):
 
517
        if self.deferred:
 
518
            self.deferred.errback(Exception("Consumer asked us to stop producing"))
 
519
            self.deferred = None