1
# PiTiVi , Non-linear video editor
3
# tests/test_pipeline.py
5
# Copyright (c) 2009, Edward Hervey <bilboed@bilboed.com>
7
# This program is free software; you can redistribute it and/or
8
# modify it under the terms of the GNU Lesser General Public
9
# License as published by the Free Software Foundation; either
10
# version 2.1 of the License, or (at your option) any later version.
12
# This program is distributed in the hope that it will be useful,
13
# but WITHOUT ANY WARRANTY; without even the implied warranty of
14
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15
# Lesser General Public License for more details.
17
# You should have received a copy of the GNU Lesser General Public
18
# License along with this program; if not, write to the
19
# Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20
# Boston, MA 02110-1301, USA.
24
from unittest import main
25
from pitivi.pipeline import Pipeline, STATE_NULL, STATE_READY, STATE_PAUSED, STATE_PLAYING, PipelineError
26
from pitivi.action import Action, STATE_ACTIVE, STATE_NOT_ACTIVE
27
from pitivi.stream import VideoStream
28
from common import TestCase, SignalMonitor, FakeSinkFactory
29
from pitivi.factories.test import VideoTestSourceFactory
32
class BogusAction(Action):
36
class WeirdAction(Action):
40
class TestPipeline(TestCase):
45
self.pipeline = Pipeline()
46
self.monitor = SignalMonitor(self.pipeline, 'action-added',
47
'action-removed', 'factory-added',
48
'factory-removed', 'state-changed')
49
self.assertEquals(self.monitor.action_added_count, 0)
50
self.assertEquals(self.monitor.action_added_collect, [])
53
self.pipeline.setState(STATE_NULL)
54
self.pipeline.release()
55
self.monitor.disconnectFromObj(self.pipeline)
58
TestCase.tearDown(self)
60
def testAddRemoveActionSimple(self):
61
""" Simple add/remove of Actions """
64
# add the action to the pipeline
65
res = self.pipeline.addAction(ac1)
66
# the returned value should be the given action
67
self.assertEquals(res, ac1)
68
# it should now be in the list of actions...
69
self.failUnlessEqual(self.pipeline.actions, [ac1])
70
# ... and the action should be set to that pipeline
71
self.failUnlessEqual(ac1.pipeline, self.pipeline)
72
# the 'action-added' signal should be triggered once
73
self.assertEquals(self.monitor.action_added_count, 1)
74
# And it contained our action
75
self.assertEquals(self.monitor.action_added_collect, [(ac1, )])
77
# if we try to add that action again, it should be silently ignored
78
res = self.pipeline.addAction(ac1)
79
self.assertEquals(res, ac1)
80
# the list of actions shouldn't have changed
81
self.failUnlessEqual(self.pipeline.actions, [ac1])
82
# it shouldn't have changed the pipeline set on action
83
self.failUnlessEqual(ac1.pipeline, self.pipeline)
84
# the 'action-added' signal should NOT have been triggered again
85
self.assertEquals(self.monitor.action_added_count, 1)
87
# And now to remove it
88
self.pipeline.removeAction(ac1)
89
# the 'action-removed' signal should have been triggered once..
90
self.assertEquals(self.monitor.action_removed_count, 1)
91
# .. with the action as an argument
92
self.assertEquals(self.monitor.action_removed_collect, [(ac1, )])
93
# And there should be no actions left on the pipeline
94
self.assertEquals(self.pipeline.actions, [])
96
def testAddRemoveActionAdvanced(self):
97
""" Advanced add/remove of Actions """
102
res = self.pipeline.addAction(ac1)
103
self.assertEquals(self.pipeline.actions, [ac1])
105
# we can't add an action to two pipelines at the same time
106
self.failUnlessRaises(PipelineError, p2.addAction, ac1)
108
self.pipeline.removeAction(ac1)
109
self.assertEquals(self.pipeline.actions, [])
111
res = self.pipeline.setAction(ac1)
112
self.assertEquals(res, ac1)
113
self.assertEquals(self.pipeline.actions, [ac1])
114
# calling setAction while a similar action is already set should
115
# return the existing action and not change anything else
116
res = self.pipeline.setAction(ac2)
117
self.assertEquals(res, ac1)
118
self.assertEquals(self.pipeline.actions, [ac1])
120
# we can't remove active actions while in PAUSED/PLAYING
121
self.pipeline.setState(STATE_PAUSED)
122
ac1.state = STATE_ACTIVE
123
self.assertEquals(self.pipeline.getState(), STATE_PAUSED)
124
self.failUnlessRaises(PipelineError, self.pipeline.removeAction, ac1)
126
# but we can remove deactivated actions while in PAUSED/PLAYING
127
self.pipeline.setState(STATE_PAUSED)
128
ac1.state = STATE_NOT_ACTIVE
129
self.assertEquals(self.pipeline.getState(), STATE_PAUSED)
130
self.pipeline.removeAction(ac1)
132
# we can add actions while in PAUSED/PLAYING
133
res = self.pipeline.addAction(ac2)
134
self.assertEquals(res, ac2)
135
self.assertEquals(self.pipeline.actions, [ac2])
137
self.pipeline.removeAction(ac2)
140
def testStateChange(self):
141
loop = gobject.MainLoop()
143
bag = {"last_state": None}
145
def state_changed_cb(pipeline, state, bag, loop):
146
bag["last_state"] = state
149
self.pipeline.connect('state-changed', state_changed_cb, bag, loop)
152
self.pipeline.setState(STATE_PLAYING)
154
self.failUnlessEqual(bag["last_state"], STATE_PLAYING)
155
self.failUnlessEqual(self.pipeline.getState(), STATE_PLAYING)
156
self.assertEquals(self.monitor.state_changed_count, 1)
159
self.pipeline.setState(STATE_PLAYING)
160
self.assertEquals(self.monitor.state_changed_count, 1)
163
self.pipeline.setState(STATE_READY)
165
self.failUnlessEqual(bag["last_state"], STATE_READY)
166
self.failUnlessEqual(self.pipeline.getState(), STATE_READY)
167
self.assertEquals(self.monitor.state_changed_count, 2)
172
self.failUnlessEqual(bag["last_state"], STATE_PLAYING)
173
self.failUnlessEqual(self.pipeline.getState(), STATE_PLAYING)
174
self.assertEquals(self.monitor.state_changed_count, 3)
177
self.pipeline.pause()
179
self.failUnlessEqual(bag["last_state"], STATE_PAUSED)
180
self.failUnlessEqual(self.pipeline.getState(), STATE_PAUSED)
181
self.assertEquals(self.monitor.state_changed_count, 4)
185
self.failUnlessEqual(bag["last_state"], STATE_READY)
186
self.failUnlessEqual(self.pipeline.getState(), STATE_READY)
187
self.assertEquals(self.monitor.state_changed_count, 5)
189
def testGetReleaseBinForFactoryStream(self):
190
factory = VideoTestSourceFactory()
191
stream = VideoStream(gst.Caps('video/x-raw-rgb; video/x-raw-yuv'),
193
factory.addOutputStream(stream)
195
# try to get a cached instance
196
self.failUnlessRaises(PipelineError,
197
self.pipeline.getBinForFactoryStream, factory, stream, False)
200
bin1 = self.pipeline.getBinForFactoryStream(factory, stream, True)
201
self.failUnless(isinstance(bin1, gst.Element))
202
# return the cached instance
203
bin2 = self.pipeline.getBinForFactoryStream(factory, stream, True)
204
self.failUnlessEqual(id(bin1), id(bin2))
206
self.pipeline.releaseBinForFactoryStream(factory, stream)
207
self.pipeline.releaseBinForFactoryStream(factory, stream)
209
# the bin has been destroyed at this point
210
self.failUnlessRaises(PipelineError,
211
self.pipeline.releaseBinForFactoryStream, factory, stream)
213
# we should get a new instance
214
bin2 = self.pipeline.getBinForFactoryStream(factory, stream, True)
215
self.pipeline.releaseBinForFactoryStream(factory, stream)
217
def testGetReleaseTeeForFactoryStream(self):
218
factory = VideoTestSourceFactory()
219
stream = VideoStream(gst.Caps('video/x-raw-rgb; video/x-raw-yuv'),
221
factory.addOutputStream(stream)
223
self.failUnlessRaises(PipelineError,
224
self.pipeline.getTeeForFactoryStream, factory, stream, True)
226
# getBinForFactoryStream(factory, stream) must be called before
227
self.failUnlessRaises(PipelineError,
228
self.pipeline.getTeeForFactoryStream, factory, stream, True)
231
bin1 = self.pipeline.getBinForFactoryStream(factory, stream, True)
233
# try to get a cached tee
234
self.failUnlessRaises(PipelineError,
235
self.pipeline.getTeeForFactoryStream, factory, stream, False)
238
tee1 = self.pipeline.getTeeForFactoryStream(factory, stream, True)
239
self.failUnless(isinstance(tee1, gst.Element))
241
# get the cached instance
242
tee2 = self.pipeline.getTeeForFactoryStream(factory, stream, True)
243
self.failUnlessEqual(id(tee1), id(tee2))
246
self.pipeline.releaseTeeForFactoryStream(factory, stream)
248
# there's still a tee alive, so we can't release the bin
249
#self.failUnlessRaises(PipelineError,
250
# self.pipeline.releaseBinForFactoryStream, factory, stream)
252
self.pipeline.releaseTeeForFactoryStream(factory, stream)
253
self.failUnlessRaises(PipelineError,
254
self.pipeline.releaseTeeForFactoryStream, factory, stream)
256
# should always fail with a sink bin
257
factory2 = FakeSinkFactory()
258
stream2 = VideoStream(gst.Caps('video/x-raw-rgb; video/x-raw-yuv'),
260
factory2.addInputStream(stream2)
262
self.failUnlessRaises(PipelineError,
263
self.pipeline.getTeeForFactoryStream, factory2, stream2, True)
264
self.pipeline.releaseBinForFactoryStream(factory, stream)
266
def testGetReleaseQueueForFactoryStream(self):
267
factory = FakeSinkFactory()
268
stream = VideoStream(gst.Caps('any'), 'sink')
269
factory.addInputStream(stream)
271
self.failUnlessRaises(PipelineError,
272
self.pipeline.getQueueForFactoryStream, factory, stream, True)
274
# getBinForFactoryStream(factory, stream) must be called before
275
self.failUnlessRaises(PipelineError,
276
self.pipeline.getQueueForFactoryStream, factory, stream, True)
279
bin1 = self.pipeline.getBinForFactoryStream(factory, stream, True)
281
# try to get a cached queue
282
self.failUnlessRaises(PipelineError,
283
self.pipeline.getQueueForFactoryStream, factory, stream, False)
286
queue1 = self.pipeline.getQueueForFactoryStream(factory, stream, True)
287
self.failUnless(isinstance(queue1, gst.Element))
291
# get the cached instance
292
queue2 = self.pipeline.getQueueForFactoryStream(factory, stream, True)
293
self.failUnlessEqual(id(queue1), id(queue2))
296
self.pipeline.releaseQueueForFactoryStream(factory, stream)
300
# there's still a queue alive, so we can't release the bin
301
self.failUnlessRaises(PipelineError,
302
self.pipeline.releaseBinForFactoryStream, factory, stream)
304
self.pipeline.releaseQueueForFactoryStream(factory, stream)
306
# should always fail with a src bin
307
factory2 = VideoTestSourceFactory()
308
stream2 = VideoStream(gst.Caps('any'), 'src')
309
factory2.addOutputStream(stream2)
311
bin1 = self.pipeline.getBinForFactoryStream(factory, stream, True)
312
self.failUnlessRaises(PipelineError,
313
self.pipeline.getQueueForFactoryStream, factory2, stream2, True)
314
self.pipeline.releaseBinForFactoryStream(factory, stream)
316
self.pipeline.releaseBinForFactoryStream(factory, stream)
317
self.assertEquals(factory.current_bins, 0)
320
if __name__ == "__main__":