1
# -*- test-case-name: rtmpy.tests.test_rtmp -*-
2
# Copyright (c) 2007-2009 The RTMPy Project.
3
# See LICENSE for details.
8
The Real Time Messaging Protocol (RTMP) is a protocol that is primarily used
9
to stream audio and video over the internet to the
10
U{Adobe Flash Player<http://en.wikipedia.org/wiki/Flash_Player>}.
12
The protocol is a container for data packets which may be
13
U{AMF<http://osflash.org/documentation/amf>} or raw audio/video data like
14
found in U{FLV<http://osflash.org/flv>}. A single connection is capable of
15
multiplexing many NetStreams using different channels. Within these channels
16
packets are split up into fixed size body chunks.
18
@see: U{RTMP (external)<http://rtmpy.org/wiki/RTMP>}
22
from twisted.internet import protocol, defer, error
23
from zope.interface import implements
25
from rtmpy.rtmp import interfaces, stream, scheduler, status, event
26
from rtmpy import util
28
#: Set this to C{True} to force all rtmp.* instances to log debugging messages
31
#: The default RTMP port is a registered port at U{IANA<http://iana.org>}
34
#: Maximum number of streams that can be active per RTMP stream
40
Used to log interesting messages from within this module (and submodules).
45
class BaseError(Exception):
47
Base error class for all RTMP related errors.
51
class ErrorLoggingCodecObserver(object):
55
def __init__(self, protocol, codec):
56
self.protocol = protocol
59
self.codec.registerObserver(self)
64
self.codec.deferred.addErrback(self.protocol.logAndDisconnect)
71
class BaseProtocol(protocol.Protocol):
73
Provides basic handshaking and RTMP protocol support.
75
@ivar state: The state of the protocol. Can be either C{HANDSHAKE} or
78
@ivar encrypted: The connection is encrypted (or requested to be
80
@type encrypted: C{bool}
84
interfaces.IHandshakeObserver,
85
interfaces.IStreamManager,
88
HANDSHAKE = 'handshake'
91
#: This value is based on tcp dumps from FME <-> FMS 3.5
92
bytesReadInterval = 1251810L
97
def buildHandshakeNegotiator(self):
99
Builds and returns an object that will handle the handshake phase of
102
@raise NotImplementedError: Must be implemented by subclasses.
104
raise NotImplementedError()
106
def connectionMade(self):
109
if self.debug or DEBUG:
110
log(self, "Connection made")
112
protocol.Protocol.connectionMade(self)
117
self.state = BaseProtocol.HANDSHAKE
118
self.handshaker = self.buildHandshakeNegotiator()
120
def connectionLost(self, reason):
122
Called when the connection is lost for some reason.
124
Cleans up any timeouts/buffer etc.
126
protocol.Protocol.connectionLost(self, reason)
128
if self.debug or DEBUG:
129
log(self, "Lost connection (reason:%s)" % str(reason))
137
def decodeHandshake(self, data):
139
@see: U{RTMP handshake on OSFlash (external)
140
<http://osflash.org/documentation/rtmp#handshake>} for more info.
142
self.handshaker.dataReceived(data)
144
def decodeStream(self, data):
147
self.decoder.dataReceived(data)
149
def logAndDisconnect(self, failure=None):
152
if failure is not None:
153
log(self, 'error %r' % (failure,))
154
log(self, failure.getBriefTraceback())
156
self.transport.loseConnection()
158
def dataReceived(self, data):
160
Called when data is received from the underlying L{transport}.
162
if self.state is BaseProtocol.STREAM:
163
self.decodeStream(data)
164
elif self.state is BaseProtocol.HANDSHAKE:
165
self.decodeHandshake(data)
167
self.transport.loseConnection()
169
raise RuntimeError('Unknown state %r' % (self.state,))
171
# interfaces.IHandshakeObserver
173
def handshakeSuccess(self):
175
Called when the RTMP handshake was successful. Once called, packet
176
streaming can commence.
178
from rtmpy.rtmp import codec
180
if self.debug or DEBUG:
181
log(self, "Successful handshake")
183
self.state = self.STREAM
185
self.activeStreams = []
187
self.decoder = codec.Decoder(self)
188
self.encoder = codec.Encoder(self)
190
ErrorLoggingCodecObserver(self, self.decoder)
191
ErrorLoggingCodecObserver(self, self.encoder)
193
self.encoder.registerConsumer(self.transport)
197
# TODO: slot in support for RTMPE
199
def handshakeFailure(self, reason):
201
Called when the RTMP handshake failed for some reason. Drops the
202
connection immediately.
204
if self.debug or DEBUG:
205
log(self, "Failed handshake (reason:%s)" % str(reason))
207
self.transport.loseConnection()
209
def write(self, data):
212
self.transport.write(data)
214
def writePacket(self, *args, **kwargs):
217
return self.encoder.writePacket(*args, **kwargs)
219
# interfaces.IStreamManager
221
def registerStream(self, streamId, stream):
224
if streamId < 0 or streamId > MAX_STREAMS:
225
raise ValueError('streamId is not in range (got:%r)' % (streamId,))
227
self.streams[streamId] = stream
228
self.activeStreams.append(streamId)
229
stream.streamId = streamId
231
def getStream(self, streamId):
235
return self.streams[streamId]
237
return self.createNewStream(streamId)
239
def removeStream(self, streamId):
241
Removes a stream from this connection.
243
@param streamId: The id of the stream.
244
@type streamId: C{int}
245
@return: The stream that has been removed from the connection.
248
s = self.streams[streamId]
250
raise IndexError('Unknown streamId %r' % (streamId,))
252
del self.streams[streamId]
253
self.activeStreams.remove(streamId)
257
def getNextAvailableStreamId(self):
260
if len(self.activeStreams) == MAX_STREAMS:
263
self.activeStreams.sort()
266
for j, streamId in enumerate(self.activeStreams):
274
def bytesRead(self, bytes):
277
s = self.getStream(0)
279
# XXX: hack we force the timestamp to 0 - see #55
282
s.writeEvent(event.BytesRead(bytes), channelId=2)
285
class ClientProtocol(BaseProtocol):
287
A very basic RTMP protocol that will act like a client.
290
def buildHandshakeNegotiator(self):
292
Generate a client handshake negotiator.
294
@rtype: L{handshake.ClientNegotiator}
296
from rtmpy.rtmp import handshake
298
return handshake.ClientNegotiator(self)
300
def connectionMade(self):
302
Called when a connection is made to the RTMP server. Will begin
303
handshake negotiations.
305
BaseProtocol.connectionMade(self)
307
self.handshaker.start()
310
class ClientFactory(protocol.ClientFactory):
312
A helper class to provide a L{ClientProtocol} factory.
315
protocol = ClientProtocol