~jamalta/+junk/rtmpdump

« back to all changes in this revision

Viewing changes to rtmpy/rtmp/__init__.py

  • Committer: Jamal Fanaian
  • Date: 2009-11-16 20:52:13 UTC
  • Revision ID: jfanaian@its-4-20091116205213-oajo3qgo21i5yc7g
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- test-case-name: rtmpy.tests.test_rtmp -*-
 
2
# Copyright (c) 2007-2009 The RTMPy Project.
 
3
# See LICENSE for details.
 
4
 
 
5
"""
 
6
RTMP implementation.
 
7
 
 
8
The Real Time Messaging Protocol (RTMP) is a protocol that is primarily used
 
9
to stream audio and video over the internet to the
 
10
U{Adobe Flash Player<http://en.wikipedia.org/wiki/Flash_Player>}.
 
11
 
 
12
The protocol is a container for data packets which may be
 
13
U{AMF<http://osflash.org/documentation/amf>} or raw audio/video data like
 
14
found in U{FLV<http://osflash.org/flv>}. A single connection is capable of
 
15
multiplexing many NetStreams using different channels. Within these channels
 
16
packets are split up into fixed size body chunks.
 
17
 
 
18
@see: U{RTMP (external)<http://rtmpy.org/wiki/RTMP>}
 
19
@since: 0.1
 
20
"""
 
21
 
 
22
from twisted.internet import protocol, defer, error
 
23
from zope.interface import implements
 
24
 
 
25
from rtmpy.rtmp import interfaces, stream, scheduler, status, event
 
26
from rtmpy import util
 
27
 
 
28
#: Set this to C{True} to force all rtmp.* instances to log debugging messages
 
29
DEBUG = False
 
30
 
 
31
#: The default RTMP port is a registered port at U{IANA<http://iana.org>}
 
32
RTMP_PORT = 1935
 
33
 
 
34
#: Maximum number of streams that can be active per RTMP stream
 
35
MAX_STREAMS = 0xffff
 
36
 
 
37
 
 
38
def log(obj, msg):
 
39
    """
 
40
    Used to log interesting messages from within this module (and submodules).
 
41
    """
 
42
    print repr(obj), msg
 
43
 
 
44
 
 
45
class BaseError(Exception):
 
46
    """
 
47
    Base error class for all RTMP related errors.
 
48
    """
 
49
 
 
50
 
 
51
class ErrorLoggingCodecObserver(object):
 
52
    """
 
53
    """
 
54
 
 
55
    def __init__(self, protocol, codec):
 
56
        self.protocol = protocol
 
57
        self.codec = codec
 
58
 
 
59
        self.codec.registerObserver(self)
 
60
 
 
61
    def started(self):
 
62
        """
 
63
        """
 
64
        self.codec.deferred.addErrback(self.protocol.logAndDisconnect)
 
65
 
 
66
    def stopped(self):
 
67
        """
 
68
        """
 
69
 
 
70
 
 
71
class BaseProtocol(protocol.Protocol):
 
72
    """
 
73
    Provides basic handshaking and RTMP protocol support.
 
74
 
 
75
    @ivar state: The state of the protocol. Can be either C{HANDSHAKE} or
 
76
        C{STREAM}.
 
77
    @type state: C{str}
 
78
    @ivar encrypted: The connection is encrypted (or requested to be
 
79
        encrypted)
 
80
    @type encrypted: C{bool}
 
81
    """
 
82
 
 
83
    implements(
 
84
        interfaces.IHandshakeObserver,
 
85
        interfaces.IStreamManager,
 
86
    )
 
87
 
 
88
    HANDSHAKE = 'handshake'
 
89
    STREAM = 'stream'
 
90
 
 
91
    #: This value is based on tcp dumps from FME <-> FMS 3.5
 
92
    bytesReadInterval = 1251810L
 
93
 
 
94
    def __init__(self):
 
95
        self.debug = DEBUG
 
96
 
 
97
    def buildHandshakeNegotiator(self):
 
98
        """
 
99
        Builds and returns an object that will handle the handshake phase of
 
100
        the connection.
 
101
 
 
102
        @raise NotImplementedError:  Must be implemented by subclasses.
 
103
        """
 
104
        raise NotImplementedError()
 
105
 
 
106
    def connectionMade(self):
 
107
        """
 
108
        """
 
109
        if self.debug or DEBUG:
 
110
            log(self, "Connection made")
 
111
 
 
112
        protocol.Protocol.connectionMade(self)
 
113
 
 
114
        self.encoder = None
 
115
        self.decoder = None
 
116
 
 
117
        self.state = BaseProtocol.HANDSHAKE
 
118
        self.handshaker = self.buildHandshakeNegotiator()
 
119
 
 
120
    def connectionLost(self, reason):
 
121
        """
 
122
        Called when the connection is lost for some reason.
 
123
 
 
124
        Cleans up any timeouts/buffer etc.
 
125
        """
 
126
        protocol.Protocol.connectionLost(self, reason)
 
127
 
 
128
        if self.debug or DEBUG:
 
129
            log(self, "Lost connection (reason:%s)" % str(reason))
 
130
 
 
131
        if self.decoder:
 
132
            self.decoder.pause()
 
133
 
 
134
        if self.encoder:
 
135
            self.encoder.pause()
 
136
 
 
137
    def decodeHandshake(self, data):
 
138
        """
 
139
        @see: U{RTMP handshake on OSFlash (external)
 
140
        <http://osflash.org/documentation/rtmp#handshake>} for more info.
 
141
        """
 
142
        self.handshaker.dataReceived(data)
 
143
 
 
144
    def decodeStream(self, data):
 
145
        """
 
146
        """
 
147
        self.decoder.dataReceived(data)
 
148
 
 
149
    def logAndDisconnect(self, failure=None):
 
150
        """
 
151
        """
 
152
        if failure is not None:
 
153
            log(self, 'error %r' % (failure,))
 
154
            log(self, failure.getBriefTraceback())
 
155
 
 
156
        self.transport.loseConnection()
 
157
 
 
158
    def dataReceived(self, data):
 
159
        """
 
160
        Called when data is received from the underlying L{transport}.
 
161
        """
 
162
        if self.state is BaseProtocol.STREAM:
 
163
            self.decodeStream(data)
 
164
        elif self.state is BaseProtocol.HANDSHAKE:
 
165
            self.decodeHandshake(data)
 
166
        else:
 
167
            self.transport.loseConnection()
 
168
 
 
169
            raise RuntimeError('Unknown state %r' % (self.state,))
 
170
 
 
171
    # interfaces.IHandshakeObserver
 
172
 
 
173
    def handshakeSuccess(self):
 
174
        """
 
175
        Called when the RTMP handshake was successful. Once called, packet
 
176
        streaming can commence.
 
177
        """
 
178
        from rtmpy.rtmp import codec
 
179
 
 
180
        if self.debug or DEBUG:
 
181
            log(self, "Successful handshake")
 
182
 
 
183
        self.state = self.STREAM
 
184
        self.streams = {}
 
185
        self.activeStreams = []
 
186
 
 
187
        self.decoder = codec.Decoder(self)
 
188
        self.encoder = codec.Encoder(self)
 
189
 
 
190
        ErrorLoggingCodecObserver(self, self.decoder)
 
191
        ErrorLoggingCodecObserver(self, self.encoder)
 
192
 
 
193
        self.encoder.registerConsumer(self.transport)
 
194
 
 
195
        del self.handshaker
 
196
 
 
197
        # TODO: slot in support for RTMPE
 
198
 
 
199
    def handshakeFailure(self, reason):
 
200
        """
 
201
        Called when the RTMP handshake failed for some reason. Drops the
 
202
        connection immediately.
 
203
        """
 
204
        if self.debug or DEBUG:
 
205
            log(self, "Failed handshake (reason:%s)" % str(reason))
 
206
 
 
207
        self.transport.loseConnection()
 
208
 
 
209
    def write(self, data):
 
210
        """
 
211
        """
 
212
        self.transport.write(data)
 
213
 
 
214
    def writePacket(self, *args, **kwargs):
 
215
        """
 
216
        """
 
217
        return self.encoder.writePacket(*args, **kwargs)
 
218
 
 
219
    # interfaces.IStreamManager
 
220
 
 
221
    def registerStream(self, streamId, stream):
 
222
        """
 
223
        """
 
224
        if streamId < 0 or streamId > MAX_STREAMS:
 
225
            raise ValueError('streamId is not in range (got:%r)' % (streamId,))
 
226
 
 
227
        self.streams[streamId] = stream
 
228
        self.activeStreams.append(streamId)
 
229
        stream.streamId = streamId
 
230
 
 
231
    def getStream(self, streamId):
 
232
        """
 
233
        """
 
234
        try:
 
235
            return self.streams[streamId]
 
236
        except KeyError:
 
237
            return self.createNewStream(streamId)
 
238
 
 
239
    def removeStream(self, streamId):
 
240
        """
 
241
        Removes a stream from this connection.
 
242
 
 
243
        @param streamId: The id of the stream.
 
244
        @type streamId: C{int}
 
245
        @return: The stream that has been removed from the connection.
 
246
        """
 
247
        try:
 
248
            s = self.streams[streamId]
 
249
        except KeyError:
 
250
            raise IndexError('Unknown streamId %r' % (streamId,))
 
251
 
 
252
        del self.streams[streamId]
 
253
        self.activeStreams.remove(streamId)
 
254
 
 
255
        return s
 
256
 
 
257
    def getNextAvailableStreamId(self):
 
258
        """
 
259
        """
 
260
        if len(self.activeStreams) == MAX_STREAMS:
 
261
            return None
 
262
 
 
263
        self.activeStreams.sort()
 
264
        i = 0
 
265
 
 
266
        for j, streamId in enumerate(self.activeStreams):
 
267
            if j != i:
 
268
                return i
 
269
 
 
270
            i += 1
 
271
 
 
272
        return i
 
273
 
 
274
    def bytesRead(self, bytes):
 
275
        """
 
276
        """
 
277
        s = self.getStream(0)
 
278
 
 
279
        # XXX: hack we force the timestamp to 0 - see #55
 
280
        s.timestamp = 0
 
281
 
 
282
        s.writeEvent(event.BytesRead(bytes), channelId=2)
 
283
 
 
284
 
 
285
class ClientProtocol(BaseProtocol):
 
286
    """
 
287
    A very basic RTMP protocol that will act like a client.
 
288
    """
 
289
 
 
290
    def buildHandshakeNegotiator(self):
 
291
        """
 
292
        Generate a client handshake negotiator.
 
293
 
 
294
        @rtype: L{handshake.ClientNegotiator}
 
295
        """
 
296
        from rtmpy.rtmp import handshake
 
297
 
 
298
        return handshake.ClientNegotiator(self)
 
299
 
 
300
    def connectionMade(self):
 
301
        """
 
302
        Called when a connection is made to the RTMP server. Will begin
 
303
        handshake negotiations.
 
304
        """
 
305
        BaseProtocol.connectionMade(self)
 
306
 
 
307
        self.handshaker.start()
 
308
 
 
309
 
 
310
class ClientFactory(protocol.ClientFactory):
 
311
    """
 
312
    A helper class to provide a L{ClientProtocol} factory.
 
313
    """
 
314
 
 
315
    protocol = ClientProtocol