~ubuntu-branches/ubuntu/lucid/pitivi/lucid

« back to all changes in this revision

Viewing changes to pitivi/pipeline.py

  • Committer: Bazaar Package Importer
  • Author(s): Sebastian Dröge
  • Date: 2009-05-27 14:22:49 UTC
  • mfrom: (1.2.1 upstream) (3.1.13 experimental)
  • Revision ID: james.westby@ubuntu.com-20090527142249-tj0qnkc37320ylml
New upstream release.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# PiTiVi , Non-linear video editor
 
2
#
 
3
#       pitivi/pipeline.py
 
4
#
 
5
# Copyright (c) 2009, Edward Hervey <bilboed@bilboed.com>
 
6
#
 
7
# This program is free software; you can redistribute it and/or
 
8
# modify it under the terms of the GNU Lesser General Public
 
9
# License as published by the Free Software Foundation; either
 
10
# version 2.1 of the License, or (at your option) any later version.
 
11
#
 
12
# This program is distributed in the hope that it will be useful,
 
13
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
14
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 
15
# Lesser General Public License for more details.
 
16
#
 
17
# You should have received a copy of the GNU Lesser General Public
 
18
# License along with this program; if not, write to the
 
19
# Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 
20
# Boston, MA 02111-1307, USA.
 
21
 
 
22
"""
 
23
High-level pipelines
 
24
"""
 
25
from threading import Lock
 
26
from pitivi.signalinterface import Signallable
 
27
from pitivi.factories.base import SourceFactory, SinkFactory
 
28
from pitivi.action import ActionError
 
29
from pitivi.stream import get_src_pads_for_stream, \
 
30
     get_sink_pads_for_stream, get_stream_for_caps
 
31
from pitivi.log.loggable import Loggable
 
32
import gobject
 
33
import gst
 
34
 
 
35
(STATE_NULL,
 
36
 STATE_READY,
 
37
 STATE_PAUSED,
 
38
 STATE_PLAYING) = (gst.STATE_NULL, gst.STATE_READY, gst.STATE_PAUSED, gst.STATE_PLAYING)
 
39
 
 
40
# FIXME : define/document a proper hierarchy
 
41
class PipelineError(Exception):
 
42
    pass
 
43
 
 
44
# TODO : Add a convenience method to automatically do the following process:
 
45
#  * Creating a Pipeline
 
46
#  * Creating an Action
 
47
#  * Setting the Action on the Pipeline
 
48
#  * (optional) Adding producer/consumer factories in the Pipeline
 
49
#  * (optional) Setting the producer/consumer on the Action
 
50
#  * (optional) Activating the Action
 
51
# Maybe that convenience method could be put in a higher-level module, like the
 
52
# one that handles all the Pipelines existing in the application.
 
53
 
 
54
class FactoryEntry(object):
 
55
    def __init__(self, factory):
 
56
        self.factory = factory
 
57
        self.streams = {}
 
58
 
 
59
    def __repr__(self):
 
60
        return "<FactoryEntry %s>" % self.factory
 
61
 
 
62
class StreamEntry(object):
 
63
    def __init__(self, factory_entry, stream, parent=None):
 
64
        self.factory_entry = factory_entry
 
65
        self.stream = stream
 
66
        self.bin = None
 
67
        self.bin_use_count = 0
 
68
        self.tee = None
 
69
        self.tee_use_count = 0
 
70
        self.queue = None
 
71
        self.queue_use_count = 0
 
72
        self.parent = parent
 
73
 
 
74
    def findBinEntry(self):
 
75
        entry = self
 
76
        while entry is not None:
 
77
            if entry.bin is not None:
 
78
                break
 
79
 
 
80
            entry = entry.parent
 
81
 
 
82
        return entry
 
83
 
 
84
    def __repr__(self):
 
85
        return "<StreamEntry %s '%s'>" % (self.factory_entry.factory, self.stream)
 
86
 
 
87
class Pipeline(Signallable, Loggable):
 
88
    """
 
89
    A container for all multimedia processing.
 
90
 
 
91
    The Pipeline is only responsible for:
 
92
     - State changes
 
93
     - Position seeking
 
94
     - Position Querying
 
95
       - Along with an periodic callback (optional)
 
96
 
 
97
    You can set L{Action}s on it, which are responsible for choosing which
 
98
    C{ObjectFactories} should be used, and how they should be linked.
 
99
 
 
100
    Signals:
 
101
     - C{action-added} : A new L{Action} was added.
 
102
     - C{action-removed} : An L{Action} was removed.
 
103
     - C{factory-added} : An L{ObjectFactory} was added.
 
104
     - C{factory-removed} : An L{ObjectFactory} was removed.
 
105
     - C{state-changed} : The state of the pipeline changed.
 
106
     - C{position} : The current position of the pipeline changed.
 
107
     - C{unhandled-stream} : A factory produced a stream which wasn't handled
 
108
       by any of the L{Action}s.
 
109
     - C{eos} : The Pipeline has finished playing.
 
110
     - C{error} : An error happened.
 
111
 
 
112
    @ivar actions: The Action(s) currently used.
 
113
    @type actions: List of L{Action}
 
114
    @ivar factories: The ObjectFactories handled by the Pipeline.
 
115
    @type factories: List of L{ObjectFactory}
 
116
    @ivar bins: The gst.Bins used, FOR ACTION USAGE ONLY
 
117
    @type bins: Dictionnary of L{ObjectFactory} to C{gst.Bin}
 
118
    @ivar tees: The tees used after producers, FOR ACTION USAGE ONLY
 
119
    @type tees: Dictionnary of (L{SourceFactory},L{MultimediaStream}) to C{gst.Element}
 
120
    @ivar queues: The queues used before consumers, FOR ACTION USAGE ONLY
 
121
    @type queues: Dictionnary of (L{SinkFactory},L{MultimediaStream}) to C{gst.Element}
 
122
    """
 
123
 
 
124
    __signals__ = {
 
125
        "action-added" : ["action"],
 
126
        "action-removed" : ["action"],
 
127
        "factory-added" : ["factory"],
 
128
        "factory-removed" : ["factory"],
 
129
        "state-changed" : ["state"],
 
130
        "position" : ["position"],
 
131
        "duration-changed" : ["duration"],
 
132
        "unhandled-stream" : ["factory", "stream"],
 
133
        "eos" : [],
 
134
        "error" : ["message", "details"],
 
135
        "element-message": ["message"]
 
136
        }
 
137
 
 
138
    def __init__(self):
 
139
        Loggable.__init__(self)
 
140
        self._lock = Lock()
 
141
        self._pipeline = gst.Pipeline()
 
142
        self._bus = self._pipeline.get_bus()
 
143
        self._bus.add_signal_watch()
 
144
        self._bus.connect("message", self._busMessageCb)
 
145
        self._bus.set_sync_handler(self._busSyncMessageHandler)
 
146
        self.factories = {}
 
147
        self.actions = []
 
148
        self._listening = False # for the position handler
 
149
        self._listeningInterval = 300 # default 300ms
 
150
        self._listeningSigId = 0
 
151
        self._stream_entry_from_pad = {}
 
152
 
 
153
    def release(self):
 
154
        """
 
155
        Release the L{Pipeline} and all used L{ObjectFactory} and
 
156
        L{Action}s.
 
157
 
 
158
        Call this method when the L{Pipeline} is no longer used. Forgetting to do
 
159
        so will result in memory loss.
 
160
 
 
161
        @postcondition: The L{Pipeline} will no longer be usable.
 
162
        """
 
163
        self._listenToPosition(False)
 
164
        self._bus.disconnect_by_func(self._busMessageCb)
 
165
        self._bus.remove_signal_watch()
 
166
        self._bus.set_sync_handler(None)
 
167
        self.setState(STATE_NULL)
 
168
        self._bus = None
 
169
        self._pipeline = None
 
170
        self.factories = {}
 
171
        for i in [x for x in self.actions if x.isActive()]:
 
172
            i.deactivate()
 
173
            self.removeAction(i)
 
174
 
 
175
    #{ Action-related methods
 
176
 
 
177
    def addAction(self, action):
 
178
        """
 
179
        Add the given L{Action} to the Pipeline.
 
180
 
 
181
        @return: The L{Action} that was set
 
182
        @rtype: L{Action}
 
183
        @raise PipelineError: If the given L{Action} is already set to another
 
184
        Pipeline
 
185
        """
 
186
        self.debug("action:%r", action)
 
187
        if action in self.actions:
 
188
            self.debug("Action is already used by this Pipeline, returning")
 
189
            return action
 
190
 
 
191
        if action.pipeline != None:
 
192
            raise PipelineError("Action is set to another pipeline (%r)" % action.pipeline)
 
193
 
 
194
        action.setPipeline(self)
 
195
        self.debug("Adding action to list of actions")
 
196
        self.actions.append(action)
 
197
        self.debug("Emitting 'action-added' signal")
 
198
        self.emit("action-added", action)
 
199
        self.debug("Returning")
 
200
        return action
 
201
 
 
202
    def setAction(self, action):
 
203
        """
 
204
        Set the given L{Action} on the L{Pipeline}.
 
205
        If an L{Action} of the same type already exists in the L{Pipeline} the
 
206
        L{Action} will not be set.
 
207
 
 
208
        @see: L{addAction}, L{removeAction}
 
209
 
 
210
        @param action: The L{Action} to set on the L{Pipeline}
 
211
        @type action: L{Action}
 
212
        @rtype: L{Action}
 
213
        @return: The L{Action} used. Might be different from the one given as
 
214
        input.
 
215
        """
 
216
        self.debug("action:%r", action)
 
217
        for ac in self.actions:
 
218
            if type(action) == type(ac):
 
219
                self.debug("We already have a %r Action : %r", type(action), ac)
 
220
                return ac
 
221
        return self.addAction(action)
 
222
 
 
223
    def removeAction(self, action):
 
224
        """
 
225
        Remove the given L{Action} from the L{Pipeline}.
 
226
 
 
227
        @precondition: Can only be done if both:
 
228
         - The L{Pipeline} is in READY or NULL
 
229
         - The L{Action} is de-activated
 
230
 
 
231
        @see: L{addAction}, L{setAction}
 
232
 
 
233
        @param action: The L{Action} to remove from the L{Pipeline}
 
234
        @type action: L{Action}
 
235
        @rtype: L{bool}
 
236
        @return: Whether the L{Action} was removed from the L{Pipeline} or not.
 
237
        @raise PipelineError: If L{Action} is activated and L{Pipeline} is not
 
238
        READY or NULL
 
239
        """
 
240
        self.debug("action:%r", action)
 
241
        if not action in self.actions:
 
242
            self.debug("action not controlled by this Pipeline, returning")
 
243
            return
 
244
        if action.isActive():
 
245
            res, current, pending = self._pipeline.get_state(0)
 
246
            if current > STATE_READY or pending > STATE_READY:
 
247
                raise PipelineError("Active actions can't be removed from PLAYING or PAUSED Pipeline")
 
248
        try:
 
249
            action.unsetPipeline()
 
250
        except ActionError:
 
251
            raise PipelineError("Can't unset Pipeline from Action")
 
252
        self.actions.remove(action)
 
253
        self.emit('action-removed', action)
 
254
 
 
255
    #{ State-related methods
 
256
 
 
257
    def setState(self, state):
 
258
        """
 
259
        Set the L{Pipeline} to the given state.
 
260
 
 
261
        @raises PipelineError: If the C{gst.Pipeline} could not be changed to
 
262
        the requested state.
 
263
        """
 
264
        self.debug("state:%r", state)
 
265
        res = self._pipeline.set_state(state)
 
266
        if res == gst.STATE_CHANGE_FAILURE:
 
267
            raise PipelineError("Failure changing state of the gst.Pipeline")
 
268
 
 
269
    def getState(self):
 
270
        """
 
271
        Query the L{Pipeline} for the current state.
 
272
 
 
273
        @see: L{setState}
 
274
 
 
275
        This will do an actual query to the underlying GStreamer Pipeline.
 
276
        @return: The current state.
 
277
        @rtype: C{State}
 
278
        """
 
279
        change, state, pending = self._pipeline.get_state(0)
 
280
        self.debug("change:%r, state:%r, pending:%r", change, state, pending)
 
281
        return state
 
282
 
 
283
    def play(self):
 
284
        """
 
285
        Sets the L{Pipeline} to PLAYING
 
286
        """
 
287
        self.setState(STATE_PLAYING)
 
288
 
 
289
    def pause(self):
 
290
        """
 
291
        Sets the L{Pipeline} to PAUSED
 
292
        """
 
293
        self.setState(STATE_PAUSED)
 
294
 
 
295
    def stop(self):
 
296
        """
 
297
        Sets the L{Pipeline} to READY
 
298
        """
 
299
        self.setState(STATE_READY)
 
300
 
 
301
    def togglePlayback(self):
 
302
        if self.getState() == gst.STATE_PLAYING:
 
303
            self.pause()
 
304
        else:
 
305
            self.play()
 
306
 
 
307
    #{ Position and Seeking methods
 
308
 
 
309
    def getPosition(self, format=gst.FORMAT_TIME):
 
310
        """
 
311
        Get the current position of the L{Pipeline}.
 
312
 
 
313
        @param format: The format to return the current position in
 
314
        @type format: C{gst.Format}
 
315
        @return: The current position or gst.CLOCK_TIME_NONE
 
316
        @rtype: L{long}
 
317
        @raise PipelineError: If the position couldn't be obtained.
 
318
        """
 
319
        self.log("format %r", format)
 
320
        try:
 
321
            cur, format = self._pipeline.query_position(format)
 
322
        except Exception, e:
 
323
            self.handleException(e)
 
324
            raise PipelineError("Couldn't get position")
 
325
        self.log("Got position %s", gst.TIME_ARGS(cur))
 
326
        return cur
 
327
 
 
328
    def getDuration(self, format=gst.FORMAT_TIME):
 
329
        """
 
330
        Get the duration of the C{Pipeline}.
 
331
        """
 
332
        self.log("format %r", format)
 
333
        try:
 
334
            dur, format = self._pipeline.query_duration(format)
 
335
        except Exception, e:
 
336
            self.handleException(e)
 
337
            raise PipelineError("Couldn't get duration")
 
338
        self.log("Got duration %s", gst.TIME_ARGS(dur))
 
339
        self.emit("duration-changed", dur)
 
340
        return dur
 
341
 
 
342
    def activatePositionListener(self, interval=300):
 
343
        """
 
344
        Activate the position listener.
 
345
 
 
346
        When activated, the Pipeline will emit the 'position' signal at the
 
347
        specified interval when it is the PLAYING or PAUSED state.
 
348
 
 
349
        @see: L{deactivatePositionListener}
 
350
        @param interval: Interval between position queries in milliseconds
 
351
        @type interval: L{int} milliseconds
 
352
        @return: Whether the position listener was activated or not
 
353
        @rtype: L{bool}
 
354
        """
 
355
        if self._listening == True:
 
356
            return True
 
357
        self._listening = True
 
358
        self._listeningInterval = interval
 
359
        # if we're in paused or playing, switch it on
 
360
        self._listenToPosition(self.getState() == STATE_PLAYING)
 
361
        return True
 
362
 
 
363
    def deactivatePositionListener(self):
 
364
        """
 
365
        De-activates the position listener.
 
366
 
 
367
        @see: L{activatePositionListener}
 
368
        """
 
369
        self._listenToPosition(False)
 
370
        self._listening = False
 
371
 
 
372
    def _positionListenerCb(self):
 
373
        try:
 
374
            cur = self.getPosition()
 
375
            if cur != gst.CLOCK_TIME_NONE:
 
376
                self.emit('position', cur)
 
377
        finally:
 
378
            return True
 
379
 
 
380
    def _listenToPosition(self, listen=True):
 
381
        # stupid and dumm method, not many checks done
 
382
        # i.e. it does NOT check for current state
 
383
        if listen == True:
 
384
            if self._listening == True and self._listeningSigId == 0:
 
385
                self._listeningSigId = gobject.timeout_add(self._listeningInterval,
 
386
                                                           self._positionListenerCb)
 
387
        elif self._listeningSigId != 0:
 
388
            gobject.source_remove(self._listeningSigId)
 
389
            self._listeningSigId = 0
 
390
 
 
391
    def seek(self, position, format=gst.FORMAT_TIME):
 
392
        """
 
393
        Seeks in the L{Pipeline} to the given position.
 
394
 
 
395
        @param position: Position to seek to
 
396
        @type position: L{long}
 
397
        @param format: The C{Format} of the seek position
 
398
        @type format: C{gst.Format}
 
399
        @raise PipelineError: If seek failed
 
400
        """
 
401
        if format == gst.FORMAT_TIME:
 
402
            self.debug("position : %s", gst.TIME_ARGS (position))
 
403
        else:
 
404
            self.debug("position : %d , format:%d", position, format)
 
405
        # FIXME : temporarily deactivate position listener
 
406
        #self._listenToPosition(False)
 
407
 
 
408
        # clamp between [0, duration]
 
409
        if format==gst.FORMAT_TIME:
 
410
            position = max(0, min(position, self.getDuration()))
 
411
 
 
412
        res = self._pipeline.seek(1.0, format, gst.SEEK_FLAG_FLUSH,
 
413
                                  gst.SEEK_TYPE_SET, position,
 
414
                                  gst.SEEK_TYPE_NONE, -1)
 
415
        if not res:
 
416
            raise PipelineError("seek failed")
 
417
        self.debug("seeking succesfull")
 
418
        self.emit('position', position)
 
419
 
 
420
    def seekRelative(self, time):
 
421
        seekvalue = max(0, min(self.getPosition() + time,
 
422
            self.getDuration()))
 
423
        self.seek(seekvalue)
 
424
 
 
425
    #{ GStreamer object methods (For Action usage only)
 
426
 
 
427
    def _getFactoryEntryForStream(self, factory, stream, create=False):
 
428
        self.debug("factory %r, stream %r" , factory, stream)
 
429
        try:
 
430
            factory_entry = self.factories[factory]
 
431
        except KeyError:
 
432
            if not create:
 
433
                raise PipelineError()
 
434
 
 
435
            change, current, pending = self._pipeline.get_state(0)
 
436
 
 
437
            if (current > STATE_READY or pending > STATE_READY) and \
 
438
                    isinstance(factory, SourceFactory):
 
439
                raise PipelineError("Pipeline not in NULL/READY,"
 
440
                        " can not create source bin")
 
441
 
 
442
            factory_entry = FactoryEntry(factory)
 
443
            self.factories[factory] = factory_entry
 
444
 
 
445
        self.debug("Returning %s", factory_entry)
 
446
        return factory_entry
 
447
 
 
448
    def _getStreamEntryForFactoryStream(self, factory, stream=None, create=False):
 
449
        self.debug("factory %r, stream %r, create:%r", factory, stream, create)
 
450
        factory_entry = self._getFactoryEntryForStream(factory, stream, create)
 
451
        for k, v in factory_entry.streams.iteritems():
 
452
            self.debug("Stream:%r  ==>  %s", k, v)
 
453
 
 
454
        stream_entry = None
 
455
        if stream is None:
 
456
            stream_entry = factory_entry.streams.get(stream, None)
 
457
        else:
 
458
            for factory_stream, entry in factory_entry.streams.iteritems():
 
459
                if factory_stream is None:
 
460
                    continue
 
461
 
 
462
                if stream.isCompatibleWithName(factory_stream):
 
463
                    if stream_entry is None:
 
464
                        stream_entry = entry
 
465
                    elif stream is factory_stream:
 
466
                        stream_entry = entry
 
467
                        break
 
468
 
 
469
        if stream_entry is None:
 
470
            if not create:
 
471
                self.debug("Failure getting stream %s", stream)
 
472
                raise PipelineError()
 
473
 
 
474
            self.debug("Creating StreamEntry")
 
475
            stream_entry = StreamEntry(factory_entry, stream)
 
476
            factory_entry.streams[stream] = stream_entry
 
477
 
 
478
        self.debug("Returning %r", stream_entry)
 
479
        return stream_entry
 
480
 
 
481
    def getBinForFactoryStream(self, factory, stream=None, automake=False):
 
482
        """
 
483
        Fetches the C{gst.Bin} currently used in the C{gst.Pipeline} for the
 
484
        given L{ObjectFactory}. If no bin exists for the given factory and
 
485
        automake is True, one is created.
 
486
 
 
487
        The returned bin will have its reference count incremented. When you are
 
488
        done with the bin, you must call L{releaseBinForFactoryStream}.
 
489
 
 
490
        @param factory: The factory to search.
 
491
        @type factory: L{ObjectFactory}
 
492
        @param stream: stream to create a bin for
 
493
        @type stream: L{MultimediaStream} derived instance
 
494
        @param automake: If set to True, then if there is not a C{gst.Bin}
 
495
        already created for the given factory, one will be created, added to the
 
496
        list of controlled bins and added to the C{gst.Pipeline}.
 
497
        @raise PipelineError: If the factory isn't used in this pipeline.
 
498
        @raise PipelineError: If a source C{gst.Bin} needed to be created and the
 
499
        L{Pipeline} was not in the READY or NULL state.
 
500
        @raise PipelineError: If a C{gst.Bin} needed to be created but the
 
501
        creation of that C{gst.Bin} failed.
 
502
        @return: The bin corresponding to the given factory or None if there
 
503
        are none for the given factory.
 
504
        @rtype: C{gst.Bin}
 
505
        """
 
506
        self.debug("factory:%r , stream:%r , automake:%r", factory, stream, automake)
 
507
 
 
508
        stream_entry = self._getStreamEntryForFactoryStream(factory,
 
509
                                                            stream, automake)
 
510
 
 
511
        bin_entry = stream_entry.findBinEntry()
 
512
        if bin_entry is not None and bin_entry.bin is not None:
 
513
            bin_entry.bin_use_count += 1
 
514
            return bin_entry.bin
 
515
 
 
516
        if not automake:
 
517
            raise PipelineError()
 
518
 
 
519
        bin = stream_entry.bin = factory.makeBin(stream)
 
520
        stream_entry.bin_use_count += 1
 
521
        self._connectToPadSignals(bin)
 
522
        self._pipeline.add(bin)
 
523
 
 
524
        if stream is None:
 
525
            factory_entry = self._getFactoryEntryForStream(factory, stream)
 
526
 
 
527
            for stream in factory.output_streams:
 
528
                factory_entry.streams[stream] = StreamEntry(factory_entry,
 
529
                        stream, parent=stream_entry)
 
530
            for stream in factory.input_streams:
 
531
                factory_entry.streams[stream] = StreamEntry(factory_entry,
 
532
                        stream, parent=stream_entry)
 
533
 
 
534
        self.debug("Setting bin to current state")
 
535
        bin.set_state(self.getState())
 
536
 
 
537
        return bin
 
538
 
 
539
    def releaseBinForFactoryStream(self, factory, stream=None):
 
540
        """
 
541
        Release a bin returned by L{getBinForFactoryStream}.
 
542
 
 
543
        @see getBinForFactoryStream
 
544
        """
 
545
        stream_entry = self._getStreamEntryForFactoryStream(factory, stream)
 
546
        bin_stream_entry = stream_entry.findBinEntry()
 
547
 
 
548
        if bin_stream_entry == None:
 
549
            self.warning("couldn't find stream entry")
 
550
            return
 
551
 
 
552
        if bin_stream_entry.bin_use_count == 1 and \
 
553
                (stream_entry.tee_use_count > 0 or \
 
554
                stream_entry.queue_use_count > 0):
 
555
            raise PipelineError()
 
556
 
 
557
        bin_stream_entry.bin_use_count -= 1
 
558
        if bin_stream_entry.bin_use_count == 0:
 
559
            # do cleanup on our side
 
560
            self.debug("cleaning up")
 
561
            self._disconnectFromPadSignals(bin_stream_entry.bin)
 
562
            bin_stream_entry.bin.set_state(gst.STATE_NULL)
 
563
            self._pipeline.remove(bin_stream_entry.bin)
 
564
            factory_entry = bin_stream_entry.factory_entry
 
565
            del factory_entry.streams[bin_stream_entry.stream]
 
566
 
 
567
            # ask the factory to finish cleanup
 
568
            factory_entry.factory.releaseBin(bin_stream_entry.bin)
 
569
 
 
570
            bin_stream_entry.bin = None
 
571
            if not factory_entry.streams:
 
572
                del self.factories[factory_entry.factory]
 
573
 
 
574
    def getTeeForFactoryStream(self, factory, stream=None, automake=False):
 
575
        """
 
576
        Fetches the C{Tee} currently used in the C{gst.Pipeline} for the given
 
577
        L{SourceFactory}.
 
578
 
 
579
        @param factory: The factory to search.
 
580
        @type factory: L{SourceFactory}
 
581
        @param stream: The stream of the factory to use. If not specified, then
 
582
        a random stream from that factory will be used.
 
583
        @type stream: L{MultimediaStream}
 
584
        @param automake: If set to True, then if there is not a C{Tee}
 
585
        already created for the given factory/stream, one will be created, added
 
586
        to the list of controlled tees and added to the C{gst.Pipeline}.
 
587
        @raise PipelineError: If the factory isn't used in this pipeline.
 
588
        @raise PipelineError: If the factory isn't a L{SourceFactory}.
 
589
        @raise PipelineError: If a C{Tee} needed to be created but the
 
590
        creation of that C{Tee} failed.
 
591
        @return: The C{Tee} corresponding to the given factory/stream or None if
 
592
        there are none for the given factory.
 
593
        @rtype: C{gst.Element}
 
594
        """
 
595
        self.debug("factory:%r , stream:%r, automake:%r", factory, stream, automake)
 
596
        if not isinstance(factory, SourceFactory):
 
597
            raise PipelineError("Given ObjectFactory isn't a SourceFactory")
 
598
 
 
599
        stream_entry = self._getStreamEntryForFactoryStream(factory, stream)
 
600
        bin_stream_entry = stream_entry.findBinEntry()
 
601
        if bin_stream_entry is None:
 
602
            raise PipelineError()
 
603
 
 
604
        bin = bin_stream_entry.bin
 
605
 
 
606
        if stream_entry.tee is not None:
 
607
            stream_entry.tee_use_count += 1
 
608
            # have an existing tee, return it
 
609
            return stream_entry.tee
 
610
 
 
611
        if not automake:
 
612
            raise PipelineError()
 
613
 
 
614
        self.debug("Really creating a tee")
 
615
        pads = get_src_pads_for_stream(bin, stream)
 
616
        if not pads or len(pads) > 1:
 
617
            raise PipelineError("Can't figure out which source pad to use !")
 
618
 
 
619
        srcpad = pads[0]
 
620
        self.debug("Using pad %r", srcpad)
 
621
 
 
622
        stream_entry.tee = gst.element_factory_make("tee")
 
623
        self._pipeline.add(stream_entry.tee)
 
624
        stream_entry.tee_use_count += 1
 
625
        stream_entry.tee.set_state(STATE_PAUSED)
 
626
        self.debug("Linking pad %r to tee", pads[0])
 
627
        srcpad.link(stream_entry.tee.get_pad("sink"))
 
628
 
 
629
        return stream_entry.tee
 
630
 
 
631
    def releaseTeeForFactoryStream(self, factory, stream=None):
 
632
        """
 
633
        Release the tee associated with the given source factory and stream.
 
634
        If this was the last action to release the given (factory,stream), then the
 
635
        tee will be removed.
 
636
 
 
637
        This should be called by Actions when they deactivate, after having called
 
638
        releaseQueueForFactoryStream() for the consumers.
 
639
 
 
640
        @see: L{getTeeForFactoryStream}
 
641
 
 
642
        @param factory: The factory
 
643
        @type factory: L{SinkFactory}
 
644
        @param stream: The stream
 
645
        @type stream: L{MultimediaStream}
 
646
        @raise PipelineError: If the Pipeline isn't in NULL or READY.
 
647
        """
 
648
        self.debug("factory:%r, stream:%r", factory, stream)
 
649
        stream_entry = self._getStreamEntryForFactoryStream(factory, stream)
 
650
 
 
651
        if stream_entry.tee_use_count == 0:
 
652
            raise PipelineError()
 
653
 
 
654
        stream_entry.tee_use_count -= 1
 
655
        if stream_entry.tee_use_count == 0:
 
656
            bin = self.getBinForFactoryStream(factory, stream, automake=False)
 
657
            if stream_entry.tee is not None:
 
658
                bin.unlink(stream_entry.tee)
 
659
                stream_entry.tee.set_state(gst.STATE_NULL)
 
660
                self._pipeline.remove(stream_entry.tee)
 
661
                stream_entry.tee = None
 
662
            self.releaseBinForFactoryStream(factory, stream)
 
663
 
 
664
    def getQueueForFactoryStream(self, factory, stream=None, automake=False,
 
665
                                 queuesize=1):
 
666
        """
 
667
        Fetches the C{Queue} currently used in the C{gst.Pipeline} for the given
 
668
        L{SinkFactory}.
 
669
 
 
670
        @param factory: The factory to search.
 
671
        @type factory: L{SinkFactory}
 
672
        @param stream: The stream of the factory to use. If not specified, then
 
673
        a random stream from that factory will be used.
 
674
        @type stream: L{MultimediaStream}
 
675
        @param automake: If set to True, then if there is not a C{Queue}
 
676
        already created for the given factory/stream, one will be created, added
 
677
        to the list of controlled queues and added to the C{gst.Pipeline}.
 
678
        @param queuesize: The size of the queue in seconds.
 
679
        @raise PipelineError: If the factory isn't used in this pipeline.
 
680
        @raise PipelineError: If the factory isn't a L{SinkFactory}.
 
681
        @raise PipelineError: If a C{Queue} needed to be created but the
 
682
        creation of that C{Queue} failed.
 
683
        @return: The C{Queue} corresponding to the given factory/stream or None if
 
684
        there are none for the given factory.
 
685
        @rtype: C{gst.Element}
 
686
        """
 
687
        self.debug("factory %r, stream %r" , factory, stream)
 
688
        if not isinstance(factory, SinkFactory):
 
689
            raise PipelineError("Given ObjectFactory isn't a SinkFactory")
 
690
 
 
691
        stream_entry = self._getStreamEntryForFactoryStream(factory, stream)
 
692
        if stream_entry.queue is not None:
 
693
            stream_entry.queue_use_count += 1
 
694
            return stream_entry.queue
 
695
 
 
696
        if not automake:
 
697
            raise PipelineError()
 
698
 
 
699
        self.debug("Really creating a queue")
 
700
 
 
701
        bin_entry = stream_entry.findBinEntry()
 
702
        bin = bin_entry.bin
 
703
        # find the source pads compatible with the given stream
 
704
        pads = get_sink_pads_for_stream(bin, stream)
 
705
        if len(pads) > 1:
 
706
            raise PipelineError("Can't figure out which sink pad to use !")
 
707
 
 
708
        if pads == []:
 
709
            raise PipelineError("No compatible sink pads !")
 
710
 
 
711
        stream_entry.queue = gst.element_factory_make("queue")
 
712
        stream_entry.queue.props.max_size_time = queuesize * gst.SECOND
 
713
        stream_entry.queue.props.max_size_buffers = 0
 
714
        stream_entry.queue.props.max_size_bytes = 0
 
715
        self._pipeline.add(stream_entry.queue)
 
716
        stream_entry.queue.set_state(STATE_PAUSED)
 
717
 
 
718
        self.debug("Linking pad %r to queue", pads[0])
 
719
        stream_entry.queue.get_pad("src").link(pads[0])
 
720
 
 
721
        stream_entry.queue_use_count += 1
 
722
        return stream_entry.queue
 
723
 
 
724
    def releaseQueueForFactoryStream(self, factory, stream=None):
 
725
        """
 
726
        Release the queue associated with the given sink factory and stream.
 
727
 
 
728
        The queue object will be internally removed from the gst.Pipeline, along
 
729
        with the link with tee.
 
730
 
 
731
        This should be called by Actions when they deactivate.
 
732
 
 
733
        @see: L{getQueueForFactoryStream}
 
734
 
 
735
        @param factory: The factory
 
736
        @type factory: L{SinkFactory}
 
737
        @param stream: The stream
 
738
        @type stream: L{MultimediaStream}
 
739
        @raise PipelineError: If the Pipeline isn't in NULL or READY.
 
740
        """
 
741
        if not isinstance(factory, SinkFactory):
 
742
            raise PipelineError()
 
743
 
 
744
        stream_entry = self._getStreamEntryForFactoryStream(factory, stream)
 
745
        if stream_entry.queue is None:
 
746
            raise PipelineError()
 
747
 
 
748
        stream_entry.queue_use_count -= 1
 
749
        if stream_entry.queue_use_count == 0:
 
750
            self.debug("Found a corresponding queue, unlink it from the consumer")
 
751
 
 
752
            if stream_entry.bin:
 
753
                # first set the bin to NULL
 
754
                stream_entry.bin.set_state(gst.STATE_NULL)
 
755
 
 
756
                # unlink it from the sink bin
 
757
                stream_entry.queue.unlink(stream_entry.bin)
 
758
 
 
759
            self.debug("Unlinking it from the tee, if present")
 
760
            queue_sinkpad = stream_entry.queue.get_pad("sink")
 
761
            stream_entry.queue.set_state(gst.STATE_NULL)
 
762
            # figure out the peerpad
 
763
            tee_srcpad = queue_sinkpad.get_peer()
 
764
            if tee_srcpad:
 
765
                tee = tee_srcpad.get_parent()
 
766
                tee_srcpad.unlink(queue_sinkpad)
 
767
                tee.release_request_pad(tee_srcpad)
 
768
            self.debug("Removing from gst.Pipeline")
 
769
            self._pipeline.remove(stream_entry.queue)
 
770
            stream_entry.queue = None
 
771
 
 
772
    #}
 
773
    ## Private methods
 
774
 
 
775
    def _busMessageCb(self, unused_bus, message):
 
776
        self.info("%s [%r]" , message.type, message.src)
 
777
        if message.type == gst.MESSAGE_EOS:
 
778
            self.emit('eos')
 
779
        elif message.type == gst.MESSAGE_STATE_CHANGED and message.src == self._pipeline:
 
780
            prev, new, pending = message.parse_state_changed()
 
781
            self.debug("Pipeline change state prev:%r, new:%r, pending:%r", prev, new, pending)
 
782
 
 
783
            emit_state_change = pending == gst.STATE_VOID_PENDING
 
784
            if prev == STATE_READY and new == STATE_PAUSED:
 
785
                # trigger duration-changed
 
786
                try:
 
787
                    self.getDuration()
 
788
                except PipelineError:
 
789
                    # no sinks??
 
790
                    pass
 
791
            elif prev == STATE_PAUSED and new == STATE_PLAYING:
 
792
                self._listenToPosition(True)
 
793
            elif prev == STATE_PLAYING and new == STATE_PAUSED:
 
794
                self._listenToPosition(False)
 
795
 
 
796
            if emit_state_change:
 
797
                self.emit('state-changed', new)
 
798
 
 
799
        elif message.type == gst.MESSAGE_ERROR:
 
800
            error, detail = message.parse_error()
 
801
            self._handleErrorMessage(error, detail, message.src)
 
802
        elif message.type == gst.MESSAGE_DURATION:
 
803
            self.debug("Duration might have changed, querying it")
 
804
            gobject.idle_add(self._queryDurationAsync)
 
805
 
 
806
    def _queryDurationAsync(self, *args, **kwargs):
 
807
        try:
 
808
            self.getDuration()
 
809
        except:
 
810
            self.log("Duration failed... but we don't care")
 
811
        return False
 
812
 
 
813
    def _handleErrorMessage(self, error, detail, source):
 
814
        self.emit('error', error, detail)
 
815
 
 
816
    def _busSyncMessageHandler(self, unused_bus, message):
 
817
        if message.type == gst.MESSAGE_ELEMENT:
 
818
            # handle element message synchronously
 
819
            self.emit('element-message', message)
 
820
        return gst.BUS_PASS
 
821
 
 
822
    def _binPadAddedCb(self, bin, pad):
 
823
        self.debug("bin:%r, pad:%r (%s)", bin, pad, pad.get_caps().to_string())
 
824
        self._lock.acquire()
 
825
 
 
826
        try:
 
827
            factory = None
 
828
            stream = None
 
829
            stream_entry = None
 
830
            for factory_entry in self.factories.itervalues():
 
831
                for stream_entry in factory_entry.streams.itervalues():
 
832
                    if stream_entry.bin == bin:
 
833
                        factory = factory_entry.factory
 
834
                        stream = stream_entry.stream
 
835
                        break
 
836
                if factory is not None:
 
837
                    break
 
838
 
 
839
            if factory is None:
 
840
                raise PipelineError("New pad on an element we don't control ??")
 
841
 
 
842
            stream = get_stream_for_caps(pad.get_caps(), pad)
 
843
            if stream not in factory_entry.streams:
 
844
                factory_entry.streams[stream] = StreamEntry(factory_entry,
 
845
                        stream, parent=stream_entry)
 
846
                stream_entry = factory_entry.streams[stream]
 
847
 
 
848
            self._stream_entry_from_pad[pad] = stream_entry
 
849
 
 
850
            # ask all actions using this producer if they handle it
 
851
            compatactions = [action for action in self.actions
 
852
                             if factory in action.producers]
 
853
            self.debug("Asking all actions (%d/%d) using that producer [%r] if they can handle it",
 
854
                      len(compatactions), len(self.actions), factory)
 
855
            for a in self.actions:
 
856
                self.debug("Action %r, producers %r", a, a.producers)
 
857
            handled = False
 
858
            for action in compatactions:
 
859
                handled |= action.handleNewStream(factory, stream)
 
860
 
 
861
            if handled == False:
 
862
                self.debug("No action handled this Stream")
 
863
                self.emit('unhandled-stream', stream)
 
864
        finally:
 
865
            self.debug("Done handling new pad")
 
866
            self._lock.release()
 
867
 
 
868
 
 
869
    def _binPadRemovedCb(self, bin, pad):
 
870
        self._lock.acquire()
 
871
        try:
 
872
            self.debug("bin:%r, pad:%r", bin, pad)
 
873
            if not pad in self._stream_entry_from_pad:
 
874
                self.warning("Pad not controlled by this pipeline")
 
875
                self._lock.release()
 
876
                return
 
877
            stream_entry = self._stream_entry_from_pad.pop(pad)
 
878
            factory = stream_entry.factory_entry.factory
 
879
            stream = stream_entry.stream
 
880
 
 
881
            for action in [action for action in self.actions
 
882
                    if factory in action.producers]:
 
883
                action.streamRemoved(factory, stream)
 
884
        except:
 
885
            self._lock.release()
 
886
        self._lock.release()
 
887
 
 
888
    def _connectToPadSignals(self, bin):
 
889
        # Listen on the given bin for pads being added/removed
 
890
        bin.connect('pad-added', self._binPadAddedCb)
 
891
        bin.connect('pad-removed', self._binPadRemovedCb)
 
892
 
 
893
    def _disconnectFromPadSignals(self, bin):
 
894
        bin.disconnect_by_func(self._binPadAddedCb)
 
895
        bin.disconnect_by_func(self._binPadRemovedCb)