2
# vi:si:et:sw=4:sts=4:ts=4
4
# gst-python - Python bindings for GStreamer
5
# Copyright (C) 2002 David I. Lehn
6
# Copyright (C) 2004 Johan Dahlin
7
# Copyright (C) 2005 Edward Hervey
9
# This library is free software; you can redistribute it and/or
10
# modify it under the terms of the GNU Lesser General Public
11
# License as published by the Free Software Foundation; either
12
# version 2.1 of the License, or (at your option) any later version.
14
# This library is distributed in the hope that it will be useful,
15
# but WITHOUT ANY WARRANTY; without even the implied warranty of
16
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17
# Lesser General Public License for more details.
19
# You should have received a copy of the GNU Lesser General Public
20
# License along with this library; if not, write to the Free Software
21
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23
from common import gst, unittest, TestCase
29
class SrcBin(gst.Bin):
31
src = gst.element_factory_make('fakesrc')
33
pad = src.get_pad("src")
34
ghostpad = gst.GhostPad("src", pad)
35
self.add_pad(ghostpad)
36
gobject.type_register(SrcBin)
38
class SinkBin(gst.Bin):
40
sink = gst.element_factory_make('fakesink')
42
pad = sink.get_pad("sink")
43
ghostpad = gst.GhostPad("sink", pad)
44
self.add_pad(ghostpad)
47
def connect_handoff(self, cb, *args, **kwargs):
48
self.sink.set_property('signal-handoffs', True)
49
self.sink.connect('handoff', cb, *args, **kwargs)
51
gobject.type_register(SinkBin)
54
class PipeTest(TestCase):
58
self.pipeline = gst.Pipeline()
59
self.assertEquals(self.pipeline.__gstrefcount__, 1)
60
self.assertEquals(sys.getrefcount(self.pipeline), 3)
66
self.assertEquals(self.src.__gstrefcount__, 1)
67
self.assertEquals(sys.getrefcount(self.src), 3)
68
self.assertEquals(self.sink.__gstrefcount__, 1)
69
self.assertEquals(sys.getrefcount(self.sink), 3)
70
gst.info("end of SetUp")
74
self.assertTrue (self.pipeline.__gstrefcount__ >= 1 and self.pipeline.__gstrefcount__ <= 2)
75
self.assertEquals(sys.getrefcount(self.pipeline), 3)
76
self.assertEquals(self.src.__gstrefcount__, 2)
77
self.assertEquals(sys.getrefcount(self.src), 3)
78
self.assertEquals(self.sink.__gstrefcount__, 2)
79
self.assertEquals(sys.getrefcount(self.sink), 3)
80
gst.debug('deleting pipeline')
84
self.assertEquals(self.src.__gstrefcount__, 1) # parent gone
85
self.assertEquals(self.sink.__gstrefcount__, 1) # parent gone
86
self.assertEquals(sys.getrefcount(self.src), 3)
87
self.assertEquals(sys.getrefcount(self.sink), 3)
88
gst.debug('deleting src')
91
gst.debug('deleting sink')
95
TestCase.tearDown(self)
97
def testBinState(self):
98
self.pipeline.add(self.src, self.sink)
99
self.src.link(self.sink)
100
self.sink.connect_handoff(self._sink_handoff_cb)
103
self.assertTrue(self.pipeline.set_state(gst.STATE_PLAYING) != gst.STATE_CHANGE_FAILURE)
105
(ret, cur, pen) = self.pipeline.get_state()
106
if ret == gst.STATE_CHANGE_SUCCESS and cur == gst.STATE_PLAYING:
109
while self._handoffs < 10:
112
self.assertEquals(self.pipeline.set_state(gst.STATE_NULL), gst.STATE_CHANGE_SUCCESS)
114
(ret, cur, pen) = self.pipeline.get_state()
115
if ret == gst.STATE_CHANGE_SUCCESS and cur == gst.STATE_NULL:
118
## def testProbedLink(self):
119
## self.pipeline.add(self.src)
120
## pad = self.src.get_pad("src")
122
## self.sink.connect_handoff(self._sink_handoff_cb)
123
## self._handoffs = 0
125
## # FIXME: adding a probe to the ghost pad does not work atm
126
## # id = pad.add_buffer_probe(self._src_buffer_probe_cb)
127
## realpad = pad.get_target()
128
## self._probe_id = realpad.add_buffer_probe(self._src_buffer_probe_cb)
130
## self._probed = False
133
## (ret, cur, pen) = self.pipeline.get_state()
134
## if ret == gst.STATE_CHANGE_SUCCESS and cur == gst.STATE_PLAYING:
137
## while not self._probed:
140
## while self._handoffs < 10:
143
## self.pipeline.set_state(gst.STATE_NULL)
145
## (ret, cur, pen) = self.pipeline.get_state()
146
## if ret == gst.STATE_CHANGE_SUCCESS and cur == gst.STATE_NULL:
149
def _src_buffer_probe_cb(self, pad, buffer):
150
gst.debug("received probe on pad %r" % pad)
152
gst.debug('adding sink bin')
153
self.pipeline.add(self.sink)
154
# this seems to get rid of the warnings about pushing on an unactivated
156
gst.debug('setting sink state')
158
# FIXME: attempt one: sync to current pending state of bin
159
(res, cur, pen) = self.pipeline.get_state(timeout=0)
161
if target == gst.STATE_VOID_PENDING:
163
gst.debug("setting sink state to %r" % target)
164
# FIXME: the following print can cause a lock-up; why ?
166
# if we don't set async, it will possibly end up in PAUSED
167
self.sink.set_state(target)
170
self.src.link(self.sink)
171
gst.debug('removing buffer probe id %r' % self._probe_id)
172
pad.remove_buffer_probe(self._probe_id)
173
self._probe_id = None
176
def _sink_handoff_cb(self, sink, buffer, pad):
177
gst.debug('received handoff on pad %r' % pad)
180
if __name__ == "__main__":