1
# PiTiVi , Non-linear video editor
5
# Copyright (c) 2009, Edward Hervey <bilboed@bilboed.com>
7
# This program is free software; you can redistribute it and/or
8
# modify it under the terms of the GNU Lesser General Public
9
# License as published by the Free Software Foundation; either
10
# version 2.1 of the License, or (at your option) any later version.
12
# This program is distributed in the hope that it will be useful,
13
# but WITHOUT ANY WARRANTY; without even the implied warranty of
14
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15
# Lesser General Public License for more details.
17
# You should have received a copy of the GNU Lesser General Public
18
# License along with this program; if not, write to the
19
# Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20
# Boston, MA 02111-1307, USA.
25
from threading import Lock
26
from pitivi.signalinterface import Signallable
27
from pitivi.factories.base import SourceFactory, SinkFactory
28
from pitivi.action import ActionError
29
from pitivi.stream import get_src_pads_for_stream, \
30
get_sink_pads_for_stream, get_stream_for_caps
31
from pitivi.log.loggable import Loggable
38
STATE_PLAYING) = (gst.STATE_NULL, gst.STATE_READY, gst.STATE_PAUSED, gst.STATE_PLAYING)
40
# FIXME : define/document a proper hierarchy
41
class PipelineError(Exception):
44
# TODO : Add a convenience method to automatically do the following process:
45
# * Creating a Pipeline
46
# * Creating an Action
47
# * Setting the Action on the Pipeline
48
# * (optional) Adding producer/consumer factories in the Pipeline
49
# * (optional) Setting the producer/consumer on the Action
50
# * (optional) Activating the Action
51
# Maybe that convenience method could be put in a higher-level module, like the
52
# one that handles all the Pipelines existing in the application.
54
class FactoryEntry(object):
55
def __init__(self, factory):
56
self.factory = factory
60
return "<FactoryEntry %s>" % self.factory
62
class StreamEntry(object):
63
def __init__(self, factory_entry, stream, parent=None):
64
self.factory_entry = factory_entry
67
self.bin_use_count = 0
69
self.tee_use_count = 0
71
self.queue_use_count = 0
74
def findBinEntry(self):
76
while entry is not None:
77
if entry.bin is not None:
85
return "<StreamEntry %s '%s'>" % (self.factory_entry.factory, self.stream)
87
class Pipeline(Signallable, Loggable):
89
A container for all multimedia processing.
91
The Pipeline is only responsible for:
95
- Along with a periodic callback (optional)
97
You can set L{Action}s on it, which are responsible for choosing which
98
C{ObjectFactories} should be used, and how they should be linked.
101
- C{action-added} : A new L{Action} was added.
102
- C{action-removed} : An L{Action} was removed.
103
- C{factory-added} : An L{ObjectFactory} was added.
104
- C{factory-removed} : An L{ObjectFactory} was removed.
105
- C{state-changed} : The state of the pipeline changed.
106
- C{position} : The current position of the pipeline changed.
107
- C{unhandled-stream} : A factory produced a stream which wasn't handled
108
by any of the L{Action}s.
109
- C{eos} : The Pipeline has finished playing.
110
- C{error} : An error happened.
112
@ivar actions: The Action(s) currently used.
113
@type actions: List of L{Action}
114
@ivar factories: The ObjectFactories handled by the Pipeline.
115
@type factories: List of L{ObjectFactory}
116
@ivar bins: The gst.Bins used, FOR ACTION USAGE ONLY
117
@type bins: Dictionary of L{ObjectFactory} to C{gst.Bin}
118
@ivar tees: The tees used after producers, FOR ACTION USAGE ONLY
119
@type tees: Dictionary of (L{SourceFactory},L{MultimediaStream}) to C{gst.Element}
120
@ivar queues: The queues used before consumers, FOR ACTION USAGE ONLY
121
@type queues: Dictionary of (L{SinkFactory},L{MultimediaStream}) to C{gst.Element}
125
"action-added" : ["action"],
126
"action-removed" : ["action"],
127
"factory-added" : ["factory"],
128
"factory-removed" : ["factory"],
129
"state-changed" : ["state"],
130
"position" : ["position"],
131
"duration-changed" : ["duration"],
132
"unhandled-stream" : ["factory", "stream"],
134
"error" : ["message", "details"],
135
"element-message": ["message"]
139
Loggable.__init__(self)
141
self._pipeline = gst.Pipeline()
142
self._bus = self._pipeline.get_bus()
143
self._bus.add_signal_watch()
144
self._bus.connect("message", self._busMessageCb)
145
self._bus.set_sync_handler(self._busSyncMessageHandler)
148
self._listening = False # for the position handler
149
self._listeningInterval = 300 # default 300ms
150
self._listeningSigId = 0
151
self._stream_entry_from_pad = {}
155
Release the L{Pipeline} and all used L{ObjectFactory} and
158
Call this method when the L{Pipeline} is no longer used. Forgetting to do
159
so will result in memory leaks.
161
@postcondition: The L{Pipeline} will no longer be usable.
163
self._listenToPosition(False)
164
self._bus.disconnect_by_func(self._busMessageCb)
165
self._bus.remove_signal_watch()
166
self._bus.set_sync_handler(None)
167
self.setState(STATE_NULL)
169
self._pipeline = None
171
for i in [x for x in self.actions if x.isActive()]:
175
#{ Action-related methods
177
def addAction(self, action):
179
Add the given L{Action} to the Pipeline.
181
@return: The L{Action} that was set
183
@raise PipelineError: If the given L{Action} is already set to another
186
self.debug("action:%r", action)
187
if action in self.actions:
188
self.debug("Action is already used by this Pipeline, returning")
191
if action.pipeline != None:
192
raise PipelineError("Action is set to another pipeline (%r)" % action.pipeline)
194
action.setPipeline(self)
195
self.debug("Adding action to list of actions")
196
self.actions.append(action)
197
self.debug("Emitting 'action-added' signal")
198
self.emit("action-added", action)
199
self.debug("Returning")
202
def setAction(self, action):
204
Set the given L{Action} on the L{Pipeline}.
205
If an L{Action} of the same type already exists in the L{Pipeline} the
206
L{Action} will not be set.
208
@see: L{addAction}, L{removeAction}
210
@param action: The L{Action} to set on the L{Pipeline}
211
@type action: L{Action}
213
@return: The L{Action} used. Might be different from the one given as
216
self.debug("action:%r", action)
217
for ac in self.actions:
218
if type(action) == type(ac):
219
self.debug("We already have a %r Action : %r", type(action), ac)
221
return self.addAction(action)
223
def removeAction(self, action):
225
Remove the given L{Action} from the L{Pipeline}.
227
@precondition: Can only be done if both:
228
- The L{Pipeline} is in READY or NULL
229
- The L{Action} is de-activated
231
@see: L{addAction}, L{setAction}
233
@param action: The L{Action} to remove from the L{Pipeline}
234
@type action: L{Action}
236
@return: Whether the L{Action} was removed from the L{Pipeline} or not.
237
@raise PipelineError: If L{Action} is activated and L{Pipeline} is not
240
self.debug("action:%r", action)
241
if not action in self.actions:
242
self.debug("action not controlled by this Pipeline, returning")
244
if action.isActive():
245
res, current, pending = self._pipeline.get_state(0)
246
if current > STATE_READY or pending > STATE_READY:
247
raise PipelineError("Active actions can't be removed from PLAYING or PAUSED Pipeline")
249
action.unsetPipeline()
251
raise PipelineError("Can't unset Pipeline from Action")
252
self.actions.remove(action)
253
self.emit('action-removed', action)
255
#{ State-related methods
257
def setState(self, state):
    """
    Switch the underlying C{gst.Pipeline} to the requested state.

    @param state: The state to put the L{Pipeline} in.
    @raise PipelineError: If the C{gst.Pipeline} reported a failure while
    changing to the requested state.
    """
    self.debug("state:%r", state)
    # Only an outright failure is an error; every other return value of
    # set_state() is accepted as-is.
    if self._pipeline.set_state(state) == gst.STATE_CHANGE_FAILURE:
        raise PipelineError("Failure changing state of the gst.Pipeline")
271
Query the L{Pipeline} for the current state.
275
This will do an actual query to the underlying GStreamer Pipeline.
276
@return: The current state.
279
change, state, pending = self._pipeline.get_state(0)
280
self.debug("change:%r, state:%r, pending:%r", change, state, pending)
285
Sets the L{Pipeline} to PLAYING
287
self.setState(STATE_PLAYING)
291
Sets the L{Pipeline} to PAUSED
293
self.setState(STATE_PAUSED)
297
Sets the L{Pipeline} to READY
299
self.setState(STATE_READY)
301
def togglePlayback(self):
302
if self.getState() == gst.STATE_PLAYING:
307
#{ Position and Seeking methods
309
def getPosition(self, format=gst.FORMAT_TIME):
311
Get the current position of the L{Pipeline}.
313
@param format: The format to return the current position in
314
@type format: C{gst.Format}
315
@return: The current position or gst.CLOCK_TIME_NONE
317
@raise PipelineError: If the position couldn't be obtained.
319
self.log("format %r", format)
321
cur, format = self._pipeline.query_position(format)
323
self.handleException(e)
324
raise PipelineError("Couldn't get position")
325
self.log("Got position %s", gst.TIME_ARGS(cur))
328
def getDuration(self, format=gst.FORMAT_TIME):
330
Get the duration of the C{Pipeline}.
332
self.log("format %r", format)
334
dur, format = self._pipeline.query_duration(format)
336
self.handleException(e)
337
raise PipelineError("Couldn't get duration")
338
self.log("Got duration %s", gst.TIME_ARGS(dur))
339
self.emit("duration-changed", dur)
342
def activatePositionListener(self, interval=300):
344
Activate the position listener.
346
When activated, the Pipeline will emit the 'position' signal at the
347
specified interval when it is in the PLAYING or PAUSED state.
349
@see: L{deactivatePositionListener}
350
@param interval: Interval between position queries in milliseconds
351
@type interval: L{int} milliseconds
352
@return: Whether the position listener was activated or not
355
if self._listening == True:
357
self._listening = True
358
self._listeningInterval = interval
359
# if we're already playing, switch it on
360
self._listenToPosition(self.getState() == STATE_PLAYING)
363
def deactivatePositionListener(self):
    """
    Turn the position listener off.

    The position polling is stopped first, and only then is the listener
    flagged as inactive.

    @see: L{activatePositionListener}
    """
    # Stop polling before clearing the flag, same order as before.
    self._listenToPosition(listen=False)
    self._listening = False
372
def _positionListenerCb(self):
374
cur = self.getPosition()
375
if cur != gst.CLOCK_TIME_NONE:
376
self.emit('position', cur)
380
def _listenToPosition(self, listen=True):
381
# stupid and dumb method, not many checks done
382
# i.e. it does NOT check for current state
384
if self._listening == True and self._listeningSigId == 0:
385
self._listeningSigId = gobject.timeout_add(self._listeningInterval,
386
self._positionListenerCb)
387
elif self._listeningSigId != 0:
388
gobject.source_remove(self._listeningSigId)
389
self._listeningSigId = 0
391
def seek(self, position, format=gst.FORMAT_TIME):
393
Seeks in the L{Pipeline} to the given position.
395
@param position: Position to seek to
396
@type position: L{long}
397
@param format: The C{Format} of the seek position
398
@type format: C{gst.Format}
399
@raise PipelineError: If seek failed
401
if format == gst.FORMAT_TIME:
402
self.debug("position : %s", gst.TIME_ARGS (position))
404
self.debug("position : %d , format:%d", position, format)
405
# FIXME : temporarily deactivate position listener
406
#self._listenToPosition(False)
408
# clamp between [0, duration]
409
if format==gst.FORMAT_TIME:
410
position = max(0, min(position, self.getDuration()))
412
res = self._pipeline.seek(1.0, format, gst.SEEK_FLAG_FLUSH,
413
gst.SEEK_TYPE_SET, position,
414
gst.SEEK_TYPE_NONE, -1)
416
raise PipelineError("seek failed")
417
self.debug("seeking succesfull")
418
self.emit('position', position)
420
def seekRelative(self, time):
421
seekvalue = max(0, min(self.getPosition() + time,
425
#{ GStreamer object methods (For Action usage only)
427
def _getFactoryEntryForStream(self, factory, stream, create=False):
428
self.debug("factory %r, stream %r" , factory, stream)
430
factory_entry = self.factories[factory]
433
raise PipelineError()
435
change, current, pending = self._pipeline.get_state(0)
437
if (current > STATE_READY or pending > STATE_READY) and \
438
isinstance(factory, SourceFactory):
439
raise PipelineError("Pipeline not in NULL/READY,"
440
" can not create source bin")
442
factory_entry = FactoryEntry(factory)
443
self.factories[factory] = factory_entry
445
self.debug("Returning %s", factory_entry)
448
def _getStreamEntryForFactoryStream(self, factory, stream=None, create=False):
449
self.debug("factory %r, stream %r, create:%r", factory, stream, create)
450
factory_entry = self._getFactoryEntryForStream(factory, stream, create)
451
for k, v in factory_entry.streams.iteritems():
452
self.debug("Stream:%r ==> %s", k, v)
456
stream_entry = factory_entry.streams.get(stream, None)
458
for factory_stream, entry in factory_entry.streams.iteritems():
459
if factory_stream is None:
462
if stream.isCompatibleWithName(factory_stream):
463
if stream_entry is None:
465
elif stream is factory_stream:
469
if stream_entry is None:
471
self.debug("Failure getting stream %s", stream)
472
raise PipelineError()
474
self.debug("Creating StreamEntry")
475
stream_entry = StreamEntry(factory_entry, stream)
476
factory_entry.streams[stream] = stream_entry
478
self.debug("Returning %r", stream_entry)
481
def getBinForFactoryStream(self, factory, stream=None, automake=False):
483
Fetches the C{gst.Bin} currently used in the C{gst.Pipeline} for the
484
given L{ObjectFactory}. If no bin exists for the given factory and
485
automake is True, one is created.
487
The returned bin will have its reference count incremented. When you are
488
done with the bin, you must call L{releaseBinForFactoryStream}.
490
@param factory: The factory to search.
491
@type factory: L{ObjectFactory}
492
@param stream: stream to create a bin for
493
@type stream: L{MultimediaStream} derived instance
494
@param automake: If set to True, then if there is not a C{gst.Bin}
495
already created for the given factory, one will be created, added to the
496
list of controlled bins and added to the C{gst.Pipeline}.
497
@raise PipelineError: If the factory isn't used in this pipeline.
498
@raise PipelineError: If a source C{gst.Bin} needed to be created and the
499
L{Pipeline} was not in the READY or NULL state.
500
@raise PipelineError: If a C{gst.Bin} needed to be created but the
501
creation of that C{gst.Bin} failed.
502
@return: The bin corresponding to the given factory or None if there
503
are none for the given factory.
506
self.debug("factory:%r , stream:%r , automake:%r", factory, stream, automake)
508
stream_entry = self._getStreamEntryForFactoryStream(factory,
511
bin_entry = stream_entry.findBinEntry()
512
if bin_entry is not None and bin_entry.bin is not None:
513
bin_entry.bin_use_count += 1
517
raise PipelineError()
519
bin = stream_entry.bin = factory.makeBin(stream)
520
stream_entry.bin_use_count += 1
521
self._connectToPadSignals(bin)
522
self._pipeline.add(bin)
525
factory_entry = self._getFactoryEntryForStream(factory, stream)
527
for stream in factory.output_streams:
528
factory_entry.streams[stream] = StreamEntry(factory_entry,
529
stream, parent=stream_entry)
530
for stream in factory.input_streams:
531
factory_entry.streams[stream] = StreamEntry(factory_entry,
532
stream, parent=stream_entry)
534
self.debug("Setting bin to current state")
535
bin.set_state(self.getState())
539
def releaseBinForFactoryStream(self, factory, stream=None):
541
Release a bin returned by L{getBinForFactoryStream}.
543
@see: L{getBinForFactoryStream}
545
stream_entry = self._getStreamEntryForFactoryStream(factory, stream)
546
bin_stream_entry = stream_entry.findBinEntry()
548
if bin_stream_entry == None:
549
self.warning("couldn't find stream entry")
552
if bin_stream_entry.bin_use_count == 1 and \
553
(stream_entry.tee_use_count > 0 or \
554
stream_entry.queue_use_count > 0):
555
raise PipelineError()
557
bin_stream_entry.bin_use_count -= 1
558
if bin_stream_entry.bin_use_count == 0:
559
# do cleanup on our side
560
self.debug("cleaning up")
561
self._disconnectFromPadSignals(bin_stream_entry.bin)
562
bin_stream_entry.bin.set_state(gst.STATE_NULL)
563
self._pipeline.remove(bin_stream_entry.bin)
564
factory_entry = bin_stream_entry.factory_entry
565
del factory_entry.streams[bin_stream_entry.stream]
567
# ask the factory to finish cleanup
568
factory_entry.factory.releaseBin(bin_stream_entry.bin)
570
bin_stream_entry.bin = None
571
if not factory_entry.streams:
572
del self.factories[factory_entry.factory]
574
def getTeeForFactoryStream(self, factory, stream=None, automake=False):
576
Fetches the C{Tee} currently used in the C{gst.Pipeline} for the given
579
@param factory: The factory to search.
580
@type factory: L{SourceFactory}
581
@param stream: The stream of the factory to use. If not specified, then
582
a random stream from that factory will be used.
583
@type stream: L{MultimediaStream}
584
@param automake: If set to True, then if there is not a C{Tee}
585
already created for the given factory/stream, one will be created, added
586
to the list of controlled tees and added to the C{gst.Pipeline}.
587
@raise PipelineError: If the factory isn't used in this pipeline.
588
@raise PipelineError: If the factory isn't a L{SourceFactory}.
589
@raise PipelineError: If a C{Tee} needed to be created but the
590
creation of that C{Tee} failed.
591
@return: The C{Tee} corresponding to the given factory/stream or None if
592
there are none for the given factory.
593
@rtype: C{gst.Element}
595
self.debug("factory:%r , stream:%r, automake:%r", factory, stream, automake)
596
if not isinstance(factory, SourceFactory):
597
raise PipelineError("Given ObjectFactory isn't a SourceFactory")
599
stream_entry = self._getStreamEntryForFactoryStream(factory, stream)
600
bin_stream_entry = stream_entry.findBinEntry()
601
if bin_stream_entry is None:
602
raise PipelineError()
604
bin = bin_stream_entry.bin
606
if stream_entry.tee is not None:
607
stream_entry.tee_use_count += 1
608
# have an existing tee, return it
609
return stream_entry.tee
612
raise PipelineError()
614
self.debug("Really creating a tee")
615
pads = get_src_pads_for_stream(bin, stream)
616
if not pads or len(pads) > 1:
617
raise PipelineError("Can't figure out which source pad to use !")
620
self.debug("Using pad %r", srcpad)
622
stream_entry.tee = gst.element_factory_make("tee")
623
self._pipeline.add(stream_entry.tee)
624
stream_entry.tee_use_count += 1
625
stream_entry.tee.set_state(STATE_PAUSED)
626
self.debug("Linking pad %r to tee", pads[0])
627
srcpad.link(stream_entry.tee.get_pad("sink"))
629
return stream_entry.tee
631
def releaseTeeForFactoryStream(self, factory, stream=None):
633
Release the tee associated with the given source factory and stream.
634
If this was the last action to release the given (factory,stream), then the
637
This should be called by Actions when they deactivate, after having called
638
releaseQueueForFactoryStream() for the consumers.
640
@see: L{getTeeForFactoryStream}
642
@param factory: The factory
643
@type factory: L{SourceFactory}
644
@param stream: The stream
645
@type stream: L{MultimediaStream}
646
@raise PipelineError: If the Pipeline isn't in NULL or READY.
648
self.debug("factory:%r, stream:%r", factory, stream)
649
stream_entry = self._getStreamEntryForFactoryStream(factory, stream)
651
if stream_entry.tee_use_count == 0:
652
raise PipelineError()
654
stream_entry.tee_use_count -= 1
655
if stream_entry.tee_use_count == 0:
656
bin = self.getBinForFactoryStream(factory, stream, automake=False)
657
if stream_entry.tee is not None:
658
bin.unlink(stream_entry.tee)
659
stream_entry.tee.set_state(gst.STATE_NULL)
660
self._pipeline.remove(stream_entry.tee)
661
stream_entry.tee = None
662
self.releaseBinForFactoryStream(factory, stream)
664
def getQueueForFactoryStream(self, factory, stream=None, automake=False,
667
Fetches the C{Queue} currently used in the C{gst.Pipeline} for the given
670
@param factory: The factory to search.
671
@type factory: L{SinkFactory}
672
@param stream: The stream of the factory to use. If not specified, then
673
a random stream from that factory will be used.
674
@type stream: L{MultimediaStream}
675
@param automake: If set to True, then if there is not a C{Queue}
676
already created for the given factory/stream, one will be created, added
677
to the list of controlled queues and added to the C{gst.Pipeline}.
678
@param queuesize: The size of the queue in seconds.
679
@raise PipelineError: If the factory isn't used in this pipeline.
680
@raise PipelineError: If the factory isn't a L{SinkFactory}.
681
@raise PipelineError: If a C{Queue} needed to be created but the
682
creation of that C{Queue} failed.
683
@return: The C{Queue} corresponding to the given factory/stream or None if
684
there are none for the given factory.
685
@rtype: C{gst.Element}
687
self.debug("factory %r, stream %r" , factory, stream)
688
if not isinstance(factory, SinkFactory):
689
raise PipelineError("Given ObjectFactory isn't a SinkFactory")
691
stream_entry = self._getStreamEntryForFactoryStream(factory, stream)
692
if stream_entry.queue is not None:
693
stream_entry.queue_use_count += 1
694
return stream_entry.queue
697
raise PipelineError()
699
self.debug("Really creating a queue")
701
bin_entry = stream_entry.findBinEntry()
703
# find the sink pads compatible with the given stream
704
pads = get_sink_pads_for_stream(bin, stream)
706
raise PipelineError("Can't figure out which sink pad to use !")
709
raise PipelineError("No compatible sink pads !")
711
stream_entry.queue = gst.element_factory_make("queue")
712
stream_entry.queue.props.max_size_time = queuesize * gst.SECOND
713
stream_entry.queue.props.max_size_buffers = 0
714
stream_entry.queue.props.max_size_bytes = 0
715
self._pipeline.add(stream_entry.queue)
716
stream_entry.queue.set_state(STATE_PAUSED)
718
self.debug("Linking pad %r to queue", pads[0])
719
stream_entry.queue.get_pad("src").link(pads[0])
721
stream_entry.queue_use_count += 1
722
return stream_entry.queue
724
def releaseQueueForFactoryStream(self, factory, stream=None):
726
Release the queue associated with the given sink factory and stream.
728
The queue object will be internally removed from the gst.Pipeline, along
729
with the link with tee.
731
This should be called by Actions when they deactivate.
733
@see: L{getQueueForFactoryStream}
735
@param factory: The factory
736
@type factory: L{SinkFactory}
737
@param stream: The stream
738
@type stream: L{MultimediaStream}
739
@raise PipelineError: If the Pipeline isn't in NULL or READY.
741
if not isinstance(factory, SinkFactory):
742
raise PipelineError()
744
stream_entry = self._getStreamEntryForFactoryStream(factory, stream)
745
if stream_entry.queue is None:
746
raise PipelineError()
748
stream_entry.queue_use_count -= 1
749
if stream_entry.queue_use_count == 0:
750
self.debug("Found a corresponding queue, unlink it from the consumer")
753
# first set the bin to NULL
754
stream_entry.bin.set_state(gst.STATE_NULL)
756
# unlink it from the sink bin
757
stream_entry.queue.unlink(stream_entry.bin)
759
self.debug("Unlinking it from the tee, if present")
760
queue_sinkpad = stream_entry.queue.get_pad("sink")
761
stream_entry.queue.set_state(gst.STATE_NULL)
762
# figure out the peerpad
763
tee_srcpad = queue_sinkpad.get_peer()
765
tee = tee_srcpad.get_parent()
766
tee_srcpad.unlink(queue_sinkpad)
767
tee.release_request_pad(tee_srcpad)
768
self.debug("Removing from gst.Pipeline")
769
self._pipeline.remove(stream_entry.queue)
770
stream_entry.queue = None
775
def _busMessageCb(self, unused_bus, message):
776
self.info("%s [%r]" , message.type, message.src)
777
if message.type == gst.MESSAGE_EOS:
779
elif message.type == gst.MESSAGE_STATE_CHANGED and message.src == self._pipeline:
780
prev, new, pending = message.parse_state_changed()
781
self.debug("Pipeline change state prev:%r, new:%r, pending:%r", prev, new, pending)
783
emit_state_change = pending == gst.STATE_VOID_PENDING
784
if prev == STATE_READY and new == STATE_PAUSED:
785
# trigger duration-changed
788
except PipelineError:
791
elif prev == STATE_PAUSED and new == STATE_PLAYING:
792
self._listenToPosition(True)
793
elif prev == STATE_PLAYING and new == STATE_PAUSED:
794
self._listenToPosition(False)
796
if emit_state_change:
797
self.emit('state-changed', new)
799
elif message.type == gst.MESSAGE_ERROR:
800
error, detail = message.parse_error()
801
self._handleErrorMessage(error, detail, message.src)
802
elif message.type == gst.MESSAGE_DURATION:
803
self.debug("Duration might have changed, querying it")
804
gobject.idle_add(self._queryDurationAsync)
806
def _queryDurationAsync(self, *args, **kwargs):
810
self.log("Duration failed... but we don't care")
813
def _handleErrorMessage(self, error, detail, source):
814
self.emit('error', error, detail)
816
def _busSyncMessageHandler(self, unused_bus, message):
817
if message.type == gst.MESSAGE_ELEMENT:
818
# handle element message synchronously
819
self.emit('element-message', message)
822
def _binPadAddedCb(self, bin, pad):
823
self.debug("bin:%r, pad:%r (%s)", bin, pad, pad.get_caps().to_string())
830
for factory_entry in self.factories.itervalues():
831
for stream_entry in factory_entry.streams.itervalues():
832
if stream_entry.bin == bin:
833
factory = factory_entry.factory
834
stream = stream_entry.stream
836
if factory is not None:
840
raise PipelineError("New pad on an element we don't control ??")
842
stream = get_stream_for_caps(pad.get_caps(), pad)
843
if stream not in factory_entry.streams:
844
factory_entry.streams[stream] = StreamEntry(factory_entry,
845
stream, parent=stream_entry)
846
stream_entry = factory_entry.streams[stream]
848
self._stream_entry_from_pad[pad] = stream_entry
850
# ask all actions using this producer if they handle it
851
compatactions = [action for action in self.actions
852
if factory in action.producers]
853
self.debug("Asking all actions (%d/%d) using that producer [%r] if they can handle it",
854
len(compatactions), len(self.actions), factory)
855
for a in self.actions:
856
self.debug("Action %r, producers %r", a, a.producers)
858
for action in compatactions:
859
handled |= action.handleNewStream(factory, stream)
862
self.debug("No action handled this Stream")
863
self.emit('unhandled-stream', stream)
865
self.debug("Done handling new pad")
869
def _binPadRemovedCb(self, bin, pad):
872
self.debug("bin:%r, pad:%r", bin, pad)
873
if not pad in self._stream_entry_from_pad:
874
self.warning("Pad not controlled by this pipeline")
877
stream_entry = self._stream_entry_from_pad.pop(pad)
878
factory = stream_entry.factory_entry.factory
879
stream = stream_entry.stream
881
for action in [action for action in self.actions
882
if factory in action.producers]:
883
action.streamRemoved(factory, stream)
888
def _connectToPadSignals(self, bin):
889
# Listen on the given bin for pads being added/removed
890
bin.connect('pad-added', self._binPadAddedCb)
891
bin.connect('pad-removed', self._binPadRemovedCb)
893
def _disconnectFromPadSignals(self, bin):
894
bin.disconnect_by_func(self._binPadAddedCb)
895
bin.disconnect_by_func(self._binPadRemovedCb)