2
# vi:si:et:sw=4:sts=4:ts=4
4
# gst-python - Python bindings for GStreamer
5
# Copyright (C) 2002 David I. Lehn
6
# Copyright (C) 2004 Johan Dahlin
7
# Copyright (C) 2005 Edward Hervey
9
# This library is free software; you can redistribute it and/or
10
# modify it under the terms of the GNU Lesser General Public
11
# License as published by the Free Software Foundation; either
12
# version 2.1 of the License, or (at your option) any later version.
14
# This library is distributed in the hope that it will be useful,
15
# but WITHOUT ANY WARRANTY; without even the implied warranty of
16
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17
# Lesser General Public License for more details.
19
# You should have received a copy of the GNU Lesser General Public
20
# License along with this library; if not, write to the Free Software
21
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
25
from common import gst, unittest, TestCase
29
class TestConstruction(TestCase):
37
def testGoodConstructor(self):
    """Check that gst.Pipeline(name) builds a correctly named pipeline.

    The wrapper's ``__gstrefcount__`` is expected to be 1 both right after
    construction and again once the other checks have run.
    """
    name = 'test-pipeline'
    pipeline = gst.Pipeline(name)

    # Validate the object itself before touching its attributes, so a bad
    # constructor result is reported as a test failure instead of an
    # AttributeError.  TestCase assertions are used rather than bare
    # ``assert`` statements, which are removed when Python runs with -O.
    self.failUnless(pipeline is not None, 'pipeline is None')
    self.failUnless(isinstance(pipeline, gst.Pipeline),
                    'pipeline is not a GstPipeline')

    self.assertEqual(pipeline.__gstrefcount__, 1)
    self.assertEqual(pipeline.get_name(), name, "pipeline's name is wrong")
    # The checks above must not have changed the reference count.
    self.assertEqual(pipeline.__gstrefcount__, 1)
47
def testParseLaunch(self):
    """Smoke test: parse a two-element launch description.

    Nothing is asserted about the result; the test only requires that
    gst.parse_launch() returns without raising.
    """
    launch_line = 'fakesrc ! fakesink'
    # Keep the result bound to a local for the duration of the test.
    parsed = gst.parse_launch(launch_line)
50
# TestCase subclass exercising a 'fakesrc' -> 'fakesink' pipeline and a
# pipeline's bus.
# NOTE(review): this listing carries stray original-line-number lines, has
# lost its indentation, and its numbering skips lines.  No `def` headers are
# visible inside this class, so method boundaries cannot be determined from
# this view -- confirm against the complete file.
class Pipeline(TestCase):
53
# Fixture: a pipeline named 'test-pipeline' holding a 'fakesrc' (with its
# 'num-buffers' property set to 5) linked to a 'fakesink'.
self.pipeline = gst.Pipeline('test-pipeline')
54
source = gst.element_factory_make('fakesrc', 'source')
55
source.set_property('num-buffers', 5)
56
sink = gst.element_factory_make('fakesink', 'sink')
57
self.pipeline.add(source, sink)
58
gst.element_link_many(source, sink)
66
# State round trip: element [1] of the get_state() result is compared with
# the expected state -- NULL before starting, PLAYING after
# set_state(PLAYING), and NULL again after set_state(NULL).
self.assertEqual(self.pipeline.get_state()[1], gst.STATE_NULL)
67
self.pipeline.set_state(gst.STATE_PLAYING)
68
self.assertEqual(self.pipeline.get_state()[1], gst.STATE_PLAYING)
72
# NOTE(review): the numbering jumps from 68 to 72, so whatever happens
# between these two identical PLAYING checks is not visible here.
self.assertEqual(self.pipeline.get_state()[1], gst.STATE_PLAYING)
73
self.pipeline.set_state(gst.STATE_NULL)
74
self.assertEqual(self.pipeline.get_state()[1], gst.STATE_NULL)
78
# Bus reference counting on a fresh pipeline named 'test': fetching the bus
# must leave the pipeline's __gstrefcount__ at 1.
pipeline = gst.Pipeline('test')
79
self.assertEquals(pipeline.__gstrefcount__, 1)
80
bus = pipeline.get_bus()
81
self.assertEquals(pipeline.__gstrefcount__, 1)
82
# one for python and one for the pipeline
83
self.assertEquals(bus.__gstrefcount__, 2)
86
# NOTE(review): the numbering jumps from 83 to 86.  The bus refcount is
# expected to fall from 2 to 1 once gccollect() has run, presumably because
# an elided line releases the pipeline -- confirm.
self.failUnless(self.gccollect())
87
self.assertEquals(bus.__gstrefcount__, 1)
89
# TestCase subclass that watches a pipeline's bus while the pipeline changes
# state.
# NOTE(review): this listing carries stray original-line-number lines, has
# lost its indentation, and its numbering skips lines.  The statements before
# `_message_received` have no visible `def` headers, so which fixture method
# each one belongs to cannot be determined from this view.
class PipelineAndBus(TestCase):
92
# Fixture: a pipeline named 'test-pipeline' with a 'fakesrc' linked to a
# 'fakesink'.
self.pipeline = gst.Pipeline('test-pipeline')
93
source = gst.element_factory_make('fakesrc', 'source')
94
sink = gst.element_factory_make('fakesink', 'sink')
95
self.pipeline.add(source, sink)
96
gst.element_link_many(source, sink)
98
# The bus is expected to report a refcount of 2 when fetched and 3 once
# _message_received has been installed as a watch; the pipeline stays at 1.
# The value returned by add_watch() is kept in self.handler and later handed
# to gobject.source_remove().
self.bus = self.pipeline.get_bus()
99
self.assertEquals(self.bus.__gstrefcount__, 2)
100
self.handler = self.bus.add_watch(self._message_received)
101
self.assertEquals(self.bus.__gstrefcount__, 3)
102
self.assertEquals(self.pipeline.__gstrefcount__, 1)
104
# NOTE(review): `gobject` is not imported in the visible part of the file --
# confirm an import exists on an elided line.
self.loop = gobject.MainLoop()
107
# FIXME: fix the refcount issues with the bus/pipeline
108
# flush the bus to be able to assert on the pipeline refcount
109
#while self.pipeline.__gstrefcount__ > 1:
112
# one for the pipeline, two for the snake
113
# three for the watch now shake shake shake but don't you
114
# Removing the watch must succeed and bring the bus refcount from 3 back to 2.
self.assertEquals(self.bus.__gstrefcount__, 3)
115
self.failUnless(gobject.source_remove(self.handler))
116
self.assertEquals(self.bus.__gstrefcount__, 2)
119
# The pipeline refcount is only logged; the assertions on it are disabled.
gst.debug('THOMAS: pipeline rc %d' % self.pipeline.__gstrefcount__)
120
#self.assertEquals(self.pipeline.__gstrefcount__, 1)
123
#self.assertEquals(self.bus.__gstrefcount__, 2)
127
# the async thread can be holding a ref, Wim is going to work on this
128
#TestCase.tearDown(self)
130
# Bus watch callback (registered through self.bus.add_watch above).  Logs the
# source path and message.type.value_nicks[1] of every message; for
# state-changed messages it also logs the old and new state.
# NOTE(review): `t` is never assigned in the visible lines (numbering skips
# 133); presumably it is `message.type` -- confirm.
# NOTE(review): the final `if` has no visible body (numbering skips 139-142),
# so what happens when this pipeline reaches self.final cannot be told here.
def _message_received(self, bus, message):
131
gst.debug('received message: %s, %s' % (
132
message.src.get_path_string(), message.type.value_nicks[1]))
134
if t == gst.MESSAGE_STATE_CHANGED:
135
old, new, pen = message.parse_state_changed()
136
gst.debug('%r state change from %r to %r' % (
137
message.src.get_path_string(), old, new))
138
if message.src == self.pipeline and new == self.final:
143
# Drives the pipeline to PLAYING (expecting STATE_CHANGE_ASYNC), then to READY
# and finally to NULL (each expecting STATE_CHANGE_SUCCESS), and checks that
# get_state() then returns (STATE_CHANGE_SUCCESS, STATE_NULL,
# STATE_VOID_PENDING).  self.final holds the target state that
# _message_received compares each new state against.
def testPlaying(self):
144
self.final = gst.STATE_PLAYING
145
ret = self.pipeline.set_state(gst.STATE_PLAYING)
146
self.assertEquals(ret, gst.STATE_CHANGE_ASYNC)
148
# go into a main loop to wait for messages
# NOTE(review): no main-loop call is visible after the comment above
# (numbering skips 149-150) -- confirm against the complete file.
151
# we go to READY so we get messages; going to NULL would set
# NOTE(review): the sentence above is cut off; its continuation (line 152) is
# not part of this view.
153
self.final = gst.STATE_READY
154
ret = self.pipeline.set_state(gst.STATE_READY)
155
self.assertEquals(ret, gst.STATE_CHANGE_SUCCESS)
158
# FIXME: not setting to NULL causes a deadlock; we might want to
159
# fix this in the bindings
160
self.assertEquals(self.pipeline.set_state(gst.STATE_NULL),
161
gst.STATE_CHANGE_SUCCESS)
162
self.assertEquals(self.pipeline.get_state(),
163
(gst.STATE_CHANGE_SUCCESS, gst.STATE_NULL, gst.STATE_VOID_PENDING))
166
if __name__ == "__main__":