1
# -*- test-case-name: twisted.test.test_protocols -*-
3
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
4
# See LICENSE for details.
7
"""Basic protocols, such as line-oriented, netstring, and 32-bit-int prefixed strings.
9
API Stability: semi-stable.
11
Maintainer: U{Itamar Shtull-Trauring<mailto:twisted@itamarst.org>}
19
from twisted.internet import protocol, defer, interfaces, error
20
from twisted.python import log
21
from zope.interface import implements
23
# Reader states compared against self._readerState by the netstring parser.
LENGTH, DATA, COMMA = range(3)
# Matches a (possibly empty) run of leading digits and an optional ':'.
# Raw string so that \d reaches the regex engine instead of relying on Python
# passing an unrecognised string escape through unchanged.
NUMBER = re.compile(r'(\d*)(:?)')
27
class NetstringParseError(ValueError):
    """The incoming data is not in valid Netstring format."""
32
class NetstringReceiver(protocol.Protocol):
33
"""This uses djb's Netstrings protocol to break up the input into strings.
35
Each string makes a callback to stringReceived, with a single
36
argument of that string.
39
1. Messages are limited in size, useful if you don't want someone
40
sending you a 500MB netstring (change MAX_LENGTH to the maximum
41
length you wish to accept).
42
2. The connection is lost if an illegal message is received.
50
def stringReceived(self, line):
    """Handle one complete received string; subclasses must override this.

    @param line: the string that was received.
    @raise NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError
57
buffer,self.__data = self.__data[:int(self._readerLength)],self.__data[int(self._readerLength):]
58
self._readerLength = self._readerLength - len(buffer)
59
self.__buffer = self.__buffer + buffer
60
if self._readerLength != 0:
62
self.stringReceived(self.__buffer)
63
self._readerState = COMMA
66
self._readerState = LENGTH
67
if self.__data[0] != ',':
69
raise NetstringParseError(repr(self.__data))
71
raise NetstringParseError
72
self.__data = self.__data[1:]
76
m = NUMBER.match(self.__data)
79
raise NetstringParseError(repr(self.__data))
81
raise NetstringParseError
82
self.__data = self.__data[m.end():]
85
self._readerLength = self._readerLength * (10**len(m.group(1))) + long(m.group(1))
87
raise NetstringParseError, "netstring too long"
88
if self._readerLength > self.MAX_LENGTH:
89
raise NetstringParseError, "netstring too long"
92
self._readerState = DATA
94
def dataReceived(self, data):
# Dispatches on self._readerState, which is compared against DATA, COMMA and
# LENGTH; the RuntimeError message indicates it covers any other state.
# A NetstringParseError is answered by closing the transport.
# NOTE(review): indentation is lost and the bare numeric lines look like
# line-number residue.  The jumps in them (98 -> 100 -> 102 -> 105) suggest
# that the branch bodies, the `try:` matching the `except` below and the
# fallback `else:` are missing from this view -- restore them from the
# upstream file before changing any logic here.
98
if self._readerState == DATA:
100
elif self._readerState == COMMA:
102
elif self._readerState == LENGTH:
105
raise RuntimeError, "mode is not DATA, COMMA or LENGTH"
106
except NetstringParseError:
107
self.transport.loseConnection()
110
def sendString(self, data):
    """Write C{data} to the transport framed as a netstring.

    The frame is the decimal length of C{data}, a colon, C{data} itself and
    a trailing comma.

    @param data: the payload to send.
    """
    framed = '%d:%s,' % (len(data), data)
    self.transport.write(framed)
114
class SafeNetstringReceiver(NetstringReceiver):
    """This class is deprecated, use NetstringReceiver instead."""
    # NOTE(review): nothing beyond the docstring is visible for this class in
    # this view; confirm against the upstream file that it has no other body.
119
class LineOnlyReceiver(protocol.Protocol):
120
"""A protocol that receives only lines.
122
This is purely a speed optimisation over LineReceiver, for the
123
cases that raw mode is known to be unnecessary.
125
@cvar delimiter: The line-ending delimiter to use. By default this is
127
@cvar MAX_LENGTH: The maximum length of a line to allow (If a
128
sent line is longer than this, the connection is dropped).
135
def dataReceived(self, data):
# NOTE(review): indentation is lost and the bare numeric lines look like
# line-number residue; jumps in them (138 -> 140, 144 -> 146, 147 -> 149)
# suggest statements are missing from this view.  Restore them from the
# upstream file before changing any logic here.
136
"""Translates bytes into lines, and calls lineReceived."""
137
# Prepend what was left over from the previous call, then cut at each delimiter.
lines = (self._buffer+data).split(self.delimiter)
138
# The last piece has no delimiter after it yet, so it becomes the new buffer.
self._buffer = lines.pop(-1)
# NOTE(review): `line` is used below but never bound in the visible lines --
# presumably a loop over `lines` starts here; confirm.
140
if self.transport.disconnecting:
141
# this is necessary because the transport may be told to lose
142
# the connection by a line within a larger packet, and it is
143
# important to disregard all the lines in that packet following
144
# the one that told it to close.
# NOTE(review): the statement guarded by the `if` above is not visible.
146
if len(line) > self.MAX_LENGTH:
147
return self.lineLengthExceeded(line)
149
self.lineReceived(line)
150
# The unterminated remainder is checked against MAX_LENGTH as well.
if len(self._buffer) > self.MAX_LENGTH:
151
return self.lineLengthExceeded(self._buffer)
153
def lineReceived(self, line):
    """Override this for when each line is received.

    @param line: the line that was received.
    @raise NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError
158
def sendLine(self, line):
    """Sends a line to the other end of the connection.

    @param line: the line to send; C{self.delimiter} is written after it.
    @return: whatever the transport's C{writeSequence} returns.
    """
    pieces = (line, self.delimiter)
    return self.transport.writeSequence(pieces)
163
def lineLengthExceeded(self, line):
    """Called when the maximum line length has been reached.

    Override if it needs to be dealt with in some special way.

    @param line: the over-long data; unused by this implementation.
    @return: an C{error.ConnectionLost} instance.  It is returned, not
        raised, so the caller decides what to do with it.
    """
    return error.ConnectionLost('Line length exceeded')
170
class _PauseableMixin:
173
def pauseProducing(self):
175
self.transport.pauseProducing()
177
def resumeProducing(self):
179
self.transport.resumeProducing()
180
self.dataReceived('')
182
def stopProducing(self):
184
self.transport.stopProducing()
187
class LineReceiver(protocol.Protocol, _PauseableMixin):
188
"""A protocol that receives lines and/or raw data, depending on mode.
190
In line mode, each line that's received becomes a callback to
191
L{lineReceived}. In raw data mode, each chunk of raw data becomes a
192
callback to L{rawDataReceived}. The L{setLineMode} and L{setRawMode}
193
methods switch between the two modes.
195
This is useful for line-oriented protocols such as IRC, HTTP, POP, etc.
197
@cvar delimiter: The line-ending delimiter to use. By default this is
199
@cvar MAX_LENGTH: The maximum length of a line to allow (If a
200
sent line is longer than this, the connection is dropped).
208
def clearLineBuffer(self):
# NOTE(review): only the docstring is visible for this method; the statement
# that actually clears the buffer appears to be missing from this view --
# restore it from the upstream file.
209
"""Clear buffered data."""
212
def dataReceived(self, data):
# Visible behaviour: `data` is appended to self.__buffer; while in line mode
# and not paused, one line is split off at the first delimiter.  Both the
# buffer and the split-off line are checked against MAX_LENGTH and handed to
# lineLengthExceeded when too long; otherwise the line goes to lineReceived.
# The final statement hands `data` to rawDataReceived.
# NOTE(review): `why or self.transport and self.transport.disconnecting`
# parses as `why or (self.transport and self.transport.disconnecting)`.
# NOTE(review): indentation is lost, the docstring terminator is not visible,
# and jumps in the numeric residue (218 -> 220 -> 222, 224 -> 227,
# 229 -> 231, 233 -> 240) suggest control-flow lines are missing from this
# view.  Restore them from the upstream file before changing any logic here.
213
"""Protocol.dataReceived.
214
Translates bytes into lines, and calls lineReceived (or
215
rawDataReceived, depending on mode.)
217
self.__buffer = self.__buffer+data
218
while self.line_mode and not self.paused:
220
line, self.__buffer = self.__buffer.split(self.delimiter, 1)
222
if len(self.__buffer) > self.MAX_LENGTH:
223
line, self.__buffer = self.__buffer, ''
224
return self.lineLengthExceeded(line)
227
linelength = len(line)
228
if linelength > self.MAX_LENGTH:
229
exceeded = line + self.__buffer
231
return self.lineLengthExceeded(exceeded)
232
why = self.lineReceived(line)
233
if why or self.transport and self.transport.disconnecting:
240
return self.rawDataReceived(data)
242
def setLineMode(self, extra=''):
# Visible behaviour: passes `extra` to dataReceived and returns the result.
# NOTE(review): no statement that switches the receiver into line mode is
# visible, and the numeric residue jumps 251 -> 255; the mode switch and the
# docstring terminator appear to be missing from this view -- confirm against
# the upstream file.
243
"""Sets the line-mode of this receiver.
245
If you are calling this from a rawDataReceived callback,
246
you can pass in extra unhandled data, and that data will
247
be parsed for lines. Further data received will be sent
248
to lineReceived rather than rawDataReceived.
250
Do not pass extra data if calling this function from
251
within a lineReceived callback.
255
return self.dataReceived(extra)
257
def setRawMode(self):
# NOTE(review): only the start of the docstring is visible; its last sentence
# is cut off and no statement that switches modes appears in this view --
# restore the rest from the upstream file.
258
"""Sets the raw mode of this receiver.
259
Further data received will be sent to rawDataReceived rather
264
def rawDataReceived(self, data):
    """Override this for when raw data is received.

    @param data: the chunk of raw data that was received.
    @raise NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError
269
def lineReceived(self, line):
    """Override this for when each line is received.

    @param line: the line that was received.
    @raise NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError
274
def sendLine(self, line):
    """Sends a line to the other end of the connection.

    @param line: the line to send; C{self.delimiter} is appended to it.
    @return: whatever the transport's C{write} returns.
    """
    terminated = line + self.delimiter
    return self.transport.write(terminated)
279
def lineLengthExceeded(self, line):
    """Called when the maximum line length has been reached.

    Override if it needs to be dealt with in some special way.

    The argument 'line' contains the remainder of the buffer, starting
    with (at least some part) of the line which is too long. This may
    be more than one line, or may be only the initial portion of a line.

    @param line: the over-long data; unused by this implementation.
    @return: whatever the transport's C{loseConnection} returns.
    """
    return self.transport.loseConnection()
291
class Int32StringReceiver(protocol.Protocol, _PauseableMixin):
292
"""A receiver for int32-prefixed strings.
294
An int32 string is a string prefixed by 4 bytes, the 32-bit length of
295
the string encoded in network byte order.
297
This class publishes the same interface as NetstringReceiver.
303
def stringReceived(self, msg):
    """Handle one complete received string; subclasses must override this.

    @param msg: the string that was received, without its length prefix.
    @raise NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError
308
def dataReceived(self, recd):
# Visible behaviour: `recd` is appended to self.recvd; while at least 4 bytes
# are buffered and the receiver is not paused, the first 4 bytes are unpacked
# as the length, bytes [4:length+4] are sliced off as the packet, and the
# packet is passed to stringReceived.  A length above MAX_LENGTH closes the
# transport.
# NOTE(review): "!i" is a signed 32-bit format, so a prefix with the high bit
# set unpacks to a negative length, which passes the `> MAX_LENGTH` check --
# verify how negative lengths should be handled.
# NOTE(review): indentation is lost, the docstring terminator is not visible,
# and jumps in the numeric residue (315 -> 317 -> 319) suggest that the
# statements following `loseConnection()` and guarded by the short-buffer
# `if` are missing from this view -- restore them from the upstream file.
309
"""Convert int32 prefixed strings into calls to stringReceived.
311
self.recvd = self.recvd + recd
312
while len(self.recvd) > 3 and not self.paused:
313
length ,= struct.unpack("!i",self.recvd[:4])
314
if length > self.MAX_LENGTH:
315
self.transport.loseConnection()
317
if len(self.recvd) < length+4:
319
packet = self.recvd[4:length+4]
320
self.recvd = self.recvd[length+4:]
321
self.stringReceived(packet)
323
def sendString(self, data):
    """Send an int32-prefixed string to the other end of the connection.

    @param data: the payload; its length is packed with C{"!i"} (4 bytes,
        network byte order) and written in front of it.
    """
    prefix = struct.pack("!i", len(data))
    self.transport.write(prefix + data)
329
class Int16StringReceiver(protocol.Protocol, _PauseableMixin):
330
"""A receiver for int16-prefixed strings.
332
An int16 string is a string prefixed by 2 bytes, the 16-bit length of
333
the string encoded in network byte order.
335
This class publishes the same interface as NetstringReceiver.
340
def stringReceived(self, msg):
    """Handle one complete received string; subclasses must override this.

    @param msg: the string that was received, without its length prefix.
    @raise NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError
345
def dataReceived(self, recd):
# Visible behaviour: `recd` is appended to self.recvd; while at least 2 bytes
# are buffered and the receiver is not paused, the length is computed from
# the first two bytes (first byte * 256 + second byte), bytes [2:length+2]
# are sliced off as the packet, and the packet is passed to stringReceived.
# NOTE(review): indentation is lost, the docstring terminator is not visible,
# and the numeric residue jumps 351 -> 353, so the statement guarded by the
# short-buffer `if` is missing from this view -- restore it from the upstream
# file before changing any logic here.
346
"""Convert int16 prefixed strings into calls to stringReceived.
348
self.recvd = self.recvd + recd
349
while len(self.recvd) > 1 and not self.paused:
350
length = (ord(self.recvd[0]) * 256) + ord(self.recvd[1])
351
if len(self.recvd) < length+2:
353
packet = self.recvd[2:length+2]
354
self.recvd = self.recvd[length+2:]
355
self.stringReceived(packet)
357
def sendString(self, data):
    """Send an int16-prefixed string to the other end of the connection.

    @param data: the payload; must be shorter than 65536 bytes.
    @raise AssertionError: if C{data} is too long for a 16-bit length prefix.
    """
    if len(data) >= 65536:
        # Raised explicitly rather than via `assert` so the check survives
        # `python -O`; the exception type is unchanged for existing callers.
        raise AssertionError("message too long")
    # "!H" (unsigned) so that lengths 32768..65535, which the check above
    # allows, can be packed; signed "!h" only reaches 32767.  The receiving
    # side of this class decodes the two bytes as an unsigned value.
    prefix = struct.pack("!H", len(data))
    self.transport.write(prefix + data)
364
class StatefulStringProtocol:
365
"""A stateful string protocol.
367
This is a mixin for string protocols (Int32StringReceiver,
368
NetstringReceiver) which translates stringReceived into a callback
369
(prefixed with 'proto_') depending on state."""
373
def stringReceived(self,string):
# Visible behaviour: looks up the attribute named 'proto_' + self.state,
# logs a message on AttributeError, stores the handler's return value as the
# new self.state, and closes the transport when that state is 'done'.
# NOTE(review): indentation is lost, the docstring terminator is not visible,
# and the numeric residue jumps 380 -> 383 and 386 -> 388, so the `try:`
# matching the `except` below (and presumably an `else:`) is missing from
# this view -- restore from the upstream file before changing logic here.
374
"""Choose a protocol phase function and call it.
376
Call back to the appropriate protocol phase; this begins with
377
the function proto_init and moves on to proto_* depending on
378
what each proto_* function returns. (For example, if
379
self.proto_init returns 'foo', then self.proto_foo will be the
380
next function called when a protocol message is received.
383
pto = 'proto_'+self.state
384
statehandler = getattr(self,pto)
385
except AttributeError:
386
log.msg('callback',self.state,'not found')
388
self.state = statehandler(string)
389
if self.state == 'done':
390
self.transport.loseConnection()
393
"""A producer that sends the contents of a file to a consumer.
395
This is a helper for protocols that, at some point, will take a
396
file-like object, read its contents, and write them out to the network,
397
optionally performing some transformation on the bytes in between.
399
This API is unstable.
401
implements(interfaces.IProducer)
408
def beginFileTransfer(self, file, consumer, transform = None):
# Visible behaviour: stores `consumer` and `transform` on self, creates a new
# Deferred in self.deferred, and registers self with the consumer via
# registerProducer(self, False).
# NOTE(review): the docstring promises a returned deferred and resumeProducing
# reads self.file, but neither a `return` nor an assignment of `file` is
# visible; the numeric residue jumps 424 -> 427 and 428 -> 430, so those
# lines and the docstring terminator appear to be missing from this view --
# restore them from the upstream file.
409
"""Begin transferring a file
411
@type file: Any file-like object
412
@param file: The file object to read data from
414
@type consumer: Any implementor of IConsumer
415
@param consumer: The object to write data to
417
@param transform: A callable taking one string argument and returning
418
the same. All bytes read from the file are passed through this before
419
being written to the consumer.
422
@return: A deferred whose callback will be invoked when the file has been
423
completely written to the consumer. The last byte written to the consumer
424
is passed to the callback.
427
self.consumer = consumer
428
self.transform = transform
430
self.deferred = deferred = defer.Deferred()
431
self.consumer.registerProducer(self, False)
434
def resumeProducing(self):
# Visible statements: read up to CHUNK_SIZE from self.file; unregister from
# the consumer and fire self.deferred with self.lastSent; pass the chunk
# through self.transform; write it to the consumer; remember its last element
# in self.lastSent.
# NOTE(review): as shown these would all run unconditionally on every call.
# The numeric residue jumps 434 -> 437 -> 440 -> 442 -> 447, so the guards
# (presumably an end-of-file test and a "transform is set" test) are missing
# from this view -- restore them from the upstream file before editing.
437
chunk = self.file.read(self.CHUNK_SIZE)
440
self.consumer.unregisterProducer()
442
self.deferred.callback(self.lastSent)
447
chunk = self.transform(chunk)
448
self.consumer.write(chunk)
449
self.lastSent = chunk[-1]
451
def pauseProducing(self):
# NOTE(review): no body is visible for this method in this view; restore it
# from the upstream file.
454
def stopProducing(self):
# Fails self.deferred with a generic Exception saying the consumer asked the
# producer to stop.
# NOTE(review): the numeric residue skips a line between the `def` and the
# errback call (454 -> 456), so a guard or docstring may be missing from this
# view -- confirm against the upstream file.
456
self.deferred.errback(Exception("Consumer asked us to stop producing"))