1
# -*- test-case-name: twisted.test.test_protocols -*-
2
# Copyright (c) 2001-2009 Twisted Matrix Laboratories.
3
# See LICENSE for details.
7
Basic protocols, such as line-oriented, netstring, and int prefixed strings.
9
Maintainer: Itamar Shtull-Trauring
17
from zope.interface import implements
20
from twisted.internet import protocol, defer, interfaces, error
21
from twisted.python import log
23
LENGTH, DATA, COMMA = range(3)
24
NUMBER = re.compile('(\d*)(:?)')
27
class NetstringParseError(ValueError):
28
"""The incoming data is not in valid Netstring format."""
32
class NetstringReceiver(protocol.Protocol):
33
"""This uses djb's Netstrings protocol to break up the input into strings.
35
Each string makes a callback to stringReceived, with a single
36
argument of that string.
39
1. Messages are limited in size, useful if you don't want someone
40
sending you a 500MB netstring (change MAX_LENGTH to the maximum
41
length you wish to accept).
42
2. The connection is lost if an illegal message is received.
50
def stringReceived(self, line):
54
raise NotImplementedError
57
buffer,self.__data = self.__data[:int(self._readerLength)],self.__data[int(self._readerLength):]
58
self._readerLength = self._readerLength - len(buffer)
59
self.__buffer = self.__buffer + buffer
60
if self._readerLength != 0:
62
self.stringReceived(self.__buffer)
63
self._readerState = COMMA
66
self._readerState = LENGTH
67
if self.__data[0] != ',':
69
raise NetstringParseError(repr(self.__data))
71
raise NetstringParseError
72
self.__data = self.__data[1:]
76
m = NUMBER.match(self.__data)
79
raise NetstringParseError(repr(self.__data))
81
raise NetstringParseError
82
self.__data = self.__data[m.end():]
85
self._readerLength = self._readerLength * (10**len(m.group(1))) + long(m.group(1))
87
raise NetstringParseError, "netstring too long"
88
if self._readerLength > self.MAX_LENGTH:
89
raise NetstringParseError, "netstring too long"
92
self._readerState = DATA
94
def dataReceived(self, data):
98
if self._readerState == DATA:
100
elif self._readerState == COMMA:
102
elif self._readerState == LENGTH:
105
raise RuntimeError, "mode is not DATA, COMMA or LENGTH"
106
except NetstringParseError:
107
self.transport.loseConnection()
110
def sendString(self, data):
112
A method for sending a Netstring. This method accepts a string and
113
writes it to the transport.
117
if not isinstance(data, str):
119
"data passed to sendString() must be a string. Non-string "
120
"support is deprecated since Twisted 10.0",
121
DeprecationWarning, 2)
123
self.transport.write('%d:%s,' % (len(data), data))
126
class SafeNetstringReceiver(NetstringReceiver):
127
"""This class is deprecated, use NetstringReceiver instead.
131
class LineOnlyReceiver(protocol.Protocol):
132
"""A protocol that receives only lines.
134
This is purely a speed optimisation over LineReceiver, for the
135
cases that raw mode is known to be unnecessary.
137
@cvar delimiter: The line-ending delimiter to use. By default this is
139
@cvar MAX_LENGTH: The maximum length of a line to allow (If a
140
sent line is longer than this, the connection is dropped).
147
def dataReceived(self, data):
148
"""Translates bytes into lines, and calls lineReceived."""
149
lines = (self._buffer+data).split(self.delimiter)
150
self._buffer = lines.pop(-1)
152
if self.transport.disconnecting:
153
# this is necessary because the transport may be told to lose
154
# the connection by a line within a larger packet, and it is
155
# important to disregard all the lines in that packet following
156
# the one that told it to close.
158
if len(line) > self.MAX_LENGTH:
159
return self.lineLengthExceeded(line)
161
self.lineReceived(line)
162
if len(self._buffer) > self.MAX_LENGTH:
163
return self.lineLengthExceeded(self._buffer)
165
def lineReceived(self, line):
166
"""Override this for when each line is received.
168
raise NotImplementedError
170
def sendLine(self, line):
171
"""Sends a line to the other end of the connection.
173
return self.transport.writeSequence((line,self.delimiter))
175
def lineLengthExceeded(self, line):
176
"""Called when the maximum line length has been reached.
177
Override if it needs to be dealt with in some special way.
179
return error.ConnectionLost('Line length exceeded')
182
class _PauseableMixin:
185
def pauseProducing(self):
187
self.transport.pauseProducing()
189
def resumeProducing(self):
191
self.transport.resumeProducing()
192
self.dataReceived('')
194
def stopProducing(self):
196
self.transport.stopProducing()
199
class LineReceiver(protocol.Protocol, _PauseableMixin):
200
"""A protocol that receives lines and/or raw data, depending on mode.
202
In line mode, each line that's received becomes a callback to
203
L{lineReceived}. In raw data mode, each chunk of raw data becomes a
204
callback to L{rawDataReceived}. The L{setLineMode} and L{setRawMode}
205
methods switch between the two modes.
207
This is useful for line-oriented protocols such as IRC, HTTP, POP, etc.
209
@cvar delimiter: The line-ending delimiter to use. By default this is
211
@cvar MAX_LENGTH: The maximum length of a line to allow (If a
212
sent line is longer than this, the connection is dropped).
220
def clearLineBuffer(self):
224
@return: All of the cleared buffered data.
231
def dataReceived(self, data):
232
"""Protocol.dataReceived.
233
Translates bytes into lines, and calls lineReceived (or
234
rawDataReceived, depending on mode.)
236
self.__buffer = self.__buffer+data
237
while self.line_mode and not self.paused:
239
line, self.__buffer = self.__buffer.split(self.delimiter, 1)
241
if len(self.__buffer) > self.MAX_LENGTH:
242
line, self.__buffer = self.__buffer, ''
243
return self.lineLengthExceeded(line)
246
linelength = len(line)
247
if linelength > self.MAX_LENGTH:
248
exceeded = line + self.__buffer
250
return self.lineLengthExceeded(exceeded)
251
why = self.lineReceived(line)
252
if why or self.transport and self.transport.disconnecting:
259
return self.rawDataReceived(data)
261
def setLineMode(self, extra=''):
262
"""Sets the line-mode of this receiver.
264
If you are calling this from a rawDataReceived callback,
265
you can pass in extra unhandled data, and that data will
266
be parsed for lines. Further data received will be sent
267
to lineReceived rather than rawDataReceived.
269
Do not pass extra data if calling this function from
270
within a lineReceived callback.
274
return self.dataReceived(extra)
276
def setRawMode(self):
277
"""Sets the raw mode of this receiver.
278
Further data received will be sent to rawDataReceived rather
283
def rawDataReceived(self, data):
284
"""Override this for when raw data is received.
286
raise NotImplementedError
288
def lineReceived(self, line):
289
"""Override this for when each line is received.
291
raise NotImplementedError
293
def sendLine(self, line):
294
"""Sends a line to the other end of the connection.
296
return self.transport.write(line + self.delimiter)
298
def lineLengthExceeded(self, line):
299
"""Called when the maximum line length has been reached.
300
Override if it needs to be dealt with in some special way.
302
The argument 'line' contains the remainder of the buffer, starting
303
with (at least some part) of the line which is too long. This may
304
be more than one line, or may be only the initial portion of the
307
return self.transport.loseConnection()
310
class StringTooLongError(AssertionError):
312
Raised when trying to send a string too long for a length prefixed
317
class IntNStringReceiver(protocol.Protocol, _PauseableMixin):
319
Generic class for length prefixed protocols.
321
@ivar recvd: buffer holding received data when splitted.
324
@ivar structFormat: format used for struct packing/unpacking. Define it in
326
@type structFormat: C{str}
328
@ivar prefixLength: length of the prefix, in bytes. Define it in subclass,
329
using C{struct.calcsize(structFormat)}
330
@type prefixLength: C{int}
335
def stringReceived(self, msg):
339
raise NotImplementedError
342
def lengthLimitExceeded(self, length):
344
Callback invoked when a length prefix greater than C{MAX_LENGTH} is
345
received. The default implementation disconnects the transport.
348
@param length: The length prefix which was received.
351
self.transport.loseConnection()
354
def dataReceived(self, recd):
356
Convert int prefixed strings into calls to stringReceived.
358
self.recvd = self.recvd + recd
359
while len(self.recvd) >= self.prefixLength and not self.paused:
360
length ,= struct.unpack(
361
self.structFormat, self.recvd[:self.prefixLength])
362
if length > self.MAX_LENGTH:
363
self.lengthLimitExceeded(length)
365
if len(self.recvd) < length + self.prefixLength:
367
packet = self.recvd[self.prefixLength:length + self.prefixLength]
368
self.recvd = self.recvd[length + self.prefixLength:]
369
self.stringReceived(packet)
371
def sendString(self, data):
373
Send an prefixed string to the other end of the connection.
377
if len(data) >= 2 ** (8 * self.prefixLength):
378
raise StringTooLongError(
379
"Try to send %s bytes whereas maximum is %s" % (
380
len(data), 2 ** (8 * self.prefixLength)))
381
self.transport.write(struct.pack(self.structFormat, len(data)) + data)
384
class Int32StringReceiver(IntNStringReceiver):
386
A receiver for int32-prefixed strings.
388
An int32 string is a string prefixed by 4 bytes, the 32-bit length of
389
the string encoded in network byte order.
391
This class publishes the same interface as NetstringReceiver.
394
prefixLength = struct.calcsize(structFormat)
397
class Int16StringReceiver(IntNStringReceiver):
399
A receiver for int16-prefixed strings.
401
An int16 string is a string prefixed by 2 bytes, the 16-bit length of
402
the string encoded in network byte order.
404
This class publishes the same interface as NetstringReceiver.
407
prefixLength = struct.calcsize(structFormat)
410
class Int8StringReceiver(IntNStringReceiver):
412
A receiver for int8-prefixed strings.
414
An int8 string is a string prefixed by 1 byte, the 8-bit length of
417
This class publishes the same interface as NetstringReceiver.
420
prefixLength = struct.calcsize(structFormat)
423
class StatefulStringProtocol:
425
A stateful string protocol.
427
This is a mixin for string protocols (Int32StringReceiver,
428
NetstringReceiver) which translates stringReceived into a callback
429
(prefixed with 'proto_') depending on state.
431
The state 'done' is special; if a proto_* method returns it, the
432
connection will be closed immediately.
437
def stringReceived(self,string):
438
"""Choose a protocol phase function and call it.
440
Call back to the appropriate protocol phase; this begins with
441
the function proto_init and moves on to proto_* depending on
442
what each proto_* function returns. (For example, if
443
self.proto_init returns 'foo', then self.proto_foo will be the
444
next function called when a protocol message is received.
447
pto = 'proto_'+self.state
448
statehandler = getattr(self,pto)
449
except AttributeError:
450
log.msg('callback',self.state,'not found')
452
self.state = statehandler(string)
453
if self.state == 'done':
454
self.transport.loseConnection()
457
"""A producer that sends the contents of a file to a consumer.
459
This is a helper for protocols that, at some point, will take a
460
file-like object, read its contents, and write them out to the network,
461
optionally performing some transformation on the bytes in between.
463
implements(interfaces.IProducer)
470
def beginFileTransfer(self, file, consumer, transform = None):
471
"""Begin transferring a file
473
@type file: Any file-like object
474
@param file: The file object to read data from
476
@type consumer: Any implementor of IConsumer
477
@param consumer: The object to write data to
479
@param transform: A callable taking one string argument and returning
480
the same. All bytes read from the file are passed through this before
481
being written to the consumer.
484
@return: A deferred whose callback will be invoked when the file has been
485
completely written to the consumer. The last byte written to the consumer
486
is passed to the callback.
489
self.consumer = consumer
490
self.transform = transform
492
self.deferred = deferred = defer.Deferred()
493
self.consumer.registerProducer(self, False)
496
def resumeProducing(self):
499
chunk = self.file.read(self.CHUNK_SIZE)
502
self.consumer.unregisterProducer()
504
self.deferred.callback(self.lastSent)
509
chunk = self.transform(chunk)
510
self.consumer.write(chunk)
511
self.lastSent = chunk[-1]
513
def pauseProducing(self):
516
def stopProducing(self):
518
self.deferred.errback(Exception("Consumer asked us to stop producing"))