1
# PiTiVi , Non-linear video editor
3
# tests/test_pipeline.py
5
# Copyright (c) 2009, Edward Hervey <bilboed@bilboed.com>
7
# This program is free software; you can redistribute it and/or
8
# modify it under the terms of the GNU Lesser General Public
9
# License as published by the Free Software Foundation; either
10
# version 2.1 of the License, or (at your option) any later version.
12
# This program is distributed in the hope that it will be useful,
13
# but WITHOUT ANY WARRANTY; without even the implied warranty of
14
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15
# Lesser General Public License for more details.
17
# You should have received a copy of the GNU Lesser General Public
18
# License along with this program; if not, write to the
19
# Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20
# Boston, MA 02111-1307, USA.
23
gobject.threads_init()
25
from unittest import main
26
from pitivi.pipeline import Pipeline, STATE_NULL, STATE_READY, STATE_PAUSED, STATE_PLAYING, PipelineError
27
from pitivi.action import Action, STATE_ACTIVE, STATE_NOT_ACTIVE
28
from pitivi.stream import AudioStream, VideoStream
29
from common import TestCase, SignalMonitor, FakeSourceFactory, FakeSinkFactory
31
class BogusAction(Action):
    """Action subclass that adds nothing to its base class.

    NOTE(review): no body is visible for this class in the file; it is
    assumed to be an empty stand-in for the pipeline tests -- confirm.
    """
34
class WeirdAction(Action):
    """Second Action subclass that adds nothing to its base class.

    NOTE(review): no body is visible for this class in the file; it is
    assumed to be an empty stand-in of a different type than BogusAction
    -- confirm.
    """
37
class TestPipeline(TestCase):
42
self.pipeline = Pipeline()
43
self.monitor = SignalMonitor(self.pipeline, 'action-added',
44
'action-removed', 'factory-added',
45
'factory-removed', 'state-changed')
46
self.assertEquals(self.monitor.action_added_count, 0)
47
self.assertEquals(self.monitor.action_added_collect, [])
50
self.pipeline.setState(STATE_NULL)
51
self.pipeline.release()
52
self.monitor.disconnectFromObj(self.pipeline)
55
TestCase.tearDown(self)
57
def testAddRemoveActionSimple(self):
    """Simple add/remove of Actions."""
    # NOTE(review): the action under test has to be created before it is
    # used; a BogusAction is assumed to be the intended type -- confirm.
    ac1 = BogusAction()

    # add the action to the pipeline
    res = self.pipeline.addAction(ac1)
    # the returned value should be the given action
    self.assertEqual(res, ac1)
    # it should now be in the list of actions...
    self.assertEqual(self.pipeline.actions, [ac1])
    # ... and the action should be set to that pipeline
    self.assertEqual(ac1.pipeline, self.pipeline)
    # the 'action-added' signal should be triggered once
    self.assertEqual(self.monitor.action_added_count, 1)
    # And it contained our action
    self.assertEqual(self.monitor.action_added_collect, [(ac1, )])

    # if we try to add that action again, it should be silently ignored
    res = self.pipeline.addAction(ac1)
    self.assertEqual(res, ac1)
    # the list of actions shouldn't have changed
    self.assertEqual(self.pipeline.actions, [ac1])
    # it shouldn't have changed the pipeline set on action
    self.assertEqual(ac1.pipeline, self.pipeline)
    # the 'action-added' signal should NOT have been triggered again
    self.assertEqual(self.monitor.action_added_count, 1)

    # And now to remove it
    self.pipeline.removeAction(ac1)
    # the 'action-removed' signal should have been triggered once..
    self.assertEqual(self.monitor.action_removed_count, 1)
    # .. with the action as an argument
    self.assertEqual(self.monitor.action_removed_collect, [(ac1, )])
    # And there should be no actions left on the pipeline
    self.assertEqual(self.pipeline.actions, [])
93
def testAddRemoveActionAdvanced(self):
    """Advanced add/remove of Actions."""
    # NOTE(review): the two actions and the second pipeline have to exist
    # before they are used. ac2 is described below as "a similar action",
    # so both are assumed to be BogusAction instances, and p2 is assumed
    # to be a fresh Pipeline -- confirm.
    ac1 = BogusAction()
    ac2 = BogusAction()
    p2 = Pipeline()

    self.pipeline.addAction(ac1)
    self.assertEqual(self.pipeline.actions, [ac1])

    # we can't add an action to two pipelines at the same time
    self.assertRaises(PipelineError, p2.addAction, ac1)

    self.pipeline.removeAction(ac1)
    self.assertEqual(self.pipeline.actions, [])

    res = self.pipeline.setAction(ac1)
    self.assertEqual(res, ac1)
    self.assertEqual(self.pipeline.actions, [ac1])
    # calling setAction while a similar action is already set should
    # return the existing action and not change anything else
    res = self.pipeline.setAction(ac2)
    self.assertEqual(res, ac1)
    self.assertEqual(self.pipeline.actions, [ac1])

    # we can't remove active actions while in PAUSED/PLAYING
    self.pipeline.setState(STATE_PAUSED)
    ac1.state = STATE_ACTIVE
    self.assertEqual(self.pipeline.getState(), STATE_PAUSED)
    self.assertRaises(PipelineError, self.pipeline.removeAction, ac1)

    # but we can remove deactivated actions while in PAUSED/PLAYING
    self.pipeline.setState(STATE_PAUSED)
    ac1.state = STATE_NOT_ACTIVE
    self.assertEqual(self.pipeline.getState(), STATE_PAUSED)
    self.pipeline.removeAction(ac1)

    # we can add actions while in PAUSED/PLAYING
    res = self.pipeline.addAction(ac2)
    self.assertEqual(res, ac2)
    self.assertEqual(self.pipeline.actions, [ac2])

    # leave the pipeline without actions
    self.pipeline.removeAction(ac2)
137
def testStateChange(self):
    """Request a series of state changes and check each one is reported.

    After every request the test compares three things: the state handed
    to a 'state-changed' handler, the value returned by getState(), and
    the number of 'state-changed' emissions counted by the signal monitor.
    """
    # NOTE(review): the loop is handed to the callback but is never run or
    # quit in this method. If 'state-changed' is delivered asynchronously,
    # the loop must be iterated before each group of assertions -- confirm.
    loop = gobject.MainLoop()

    bag = {"last_state": None}

    def state_changed_cb(pipeline, state, bag, loop):
        # remember the most recent state reported by the signal
        bag["last_state"] = state

    self.pipeline.connect('state-changed', state_changed_cb, bag, loop)

    # PLAYING: first notification
    self.pipeline.setState(STATE_PLAYING)
    self.assertEqual(bag["last_state"], STATE_PLAYING)
    self.assertEqual(self.pipeline.getState(), STATE_PLAYING)
    self.assertEqual(self.monitor.state_changed_count, 1)

    # requesting the current state again must not emit the signal
    self.pipeline.setState(STATE_PLAYING)
    self.assertEqual(self.monitor.state_changed_count, 1)

    # READY
    self.pipeline.setState(STATE_READY)
    self.assertEqual(bag["last_state"], STATE_READY)
    self.assertEqual(self.pipeline.getState(), STATE_READY)
    self.assertEqual(self.monitor.state_changed_count, 2)

    # PLAYING again: the assertions below need a transition to be
    # requested first.
    # NOTE(review): pause() below suggests sibling convenience methods
    # may exist; setState() is used because it is the only other
    # transition API visible in this file -- confirm.
    self.pipeline.setState(STATE_PLAYING)
    self.assertEqual(bag["last_state"], STATE_PLAYING)
    self.assertEqual(self.pipeline.getState(), STATE_PLAYING)
    self.assertEqual(self.monitor.state_changed_count, 3)

    # PAUSED, through the pause() helper
    self.pipeline.pause()
    self.assertEqual(bag["last_state"], STATE_PAUSED)
    self.assertEqual(self.pipeline.getState(), STATE_PAUSED)
    self.assertEqual(self.monitor.state_changed_count, 4)

    # back to READY: again a transition must be requested before checking
    self.pipeline.setState(STATE_READY)
    self.assertEqual(bag["last_state"], STATE_READY)
    self.assertEqual(self.pipeline.getState(), STATE_READY)
    self.assertEqual(self.monitor.state_changed_count, 5)
185
def testGetReleaseBinForFactoryStream(self):
    """Get, re-get and release the bin for a (factory, stream) pair."""
    factory = FakeSourceFactory()
    stream = VideoStream(gst.Caps('any'), 'src0')
    factory.addOutputStream(stream)

    # try to get a cached instance: nothing has been created yet
    self.assertRaises(PipelineError,
            self.pipeline.getBinForFactoryStream, factory, stream, False)

    # first request with the last argument True returns an element
    bin1 = self.pipeline.getBinForFactoryStream(factory, stream, True)
    self.assertTrue(isinstance(bin1, gst.Element))
    # return the cached instance
    bin2 = self.pipeline.getBinForFactoryStream(factory, stream, True)
    self.assertTrue(bin1 is bin2)

    # one release per successful get
    self.pipeline.releaseBinForFactoryStream(factory, stream)
    self.pipeline.releaseBinForFactoryStream(factory, stream)

    # the bin has been destroyed at this point
    self.assertRaises(PipelineError,
            self.pipeline.releaseBinForFactoryStream, factory, stream)

    # we should get a new instance
    self.pipeline.getBinForFactoryStream(factory, stream, True)
    self.pipeline.releaseBinForFactoryStream(factory, stream)
212
def testGetReleaseTeeForFactoryStream(self):
    """Get, re-get and release the tee for a source (factory, stream) pair."""
    factory = FakeSourceFactory()
    stream = VideoStream(gst.Caps('any'), 'src')
    factory.addOutputStream(stream)

    self.assertRaises(PipelineError,
            self.pipeline.getTeeForFactoryStream, factory, stream, True)

    # getBinForFactoryStream(factory, stream) must be called before
    # NOTE(review): identical to the check just above -- confirm whether
    # the repetition is intentional.
    self.assertRaises(PipelineError,
            self.pipeline.getTeeForFactoryStream, factory, stream, True)

    # only the call matters here; the returned bin is not inspected
    self.pipeline.getBinForFactoryStream(factory, stream, True)

    # try to get a cached tee
    self.assertRaises(PipelineError,
            self.pipeline.getTeeForFactoryStream, factory, stream, False)

    tee1 = self.pipeline.getTeeForFactoryStream(factory, stream, True)
    self.assertTrue(isinstance(tee1, gst.Element))

    # get the cached instance
    tee2 = self.pipeline.getTeeForFactoryStream(factory, stream, True)
    self.assertTrue(tee1 is tee2)

    self.pipeline.releaseTeeForFactoryStream(factory, stream)

    # there's still a tee alive, so we can't release the bin
    #self.failUnlessRaises(PipelineError,
    #        self.pipeline.releaseBinForFactoryStream, factory, stream)

    # second release matches the second get; a third one must fail
    self.pipeline.releaseTeeForFactoryStream(factory, stream)
    self.assertRaises(PipelineError,
            self.pipeline.releaseTeeForFactoryStream, factory, stream)

    # should always fail with a sink bin
    factory2 = FakeSinkFactory()
    stream2 = VideoStream(gst.Caps('any'), 'src')
    factory2.addInputStream(stream2)

    self.assertRaises(PipelineError,
            self.pipeline.getTeeForFactoryStream, factory2, stream2, True)
    self.pipeline.releaseBinForFactoryStream(factory, stream)
259
def testGetReleaseQueueForFactoryStream(self):
    """Get, re-get and release the queue for a sink (factory, stream) pair."""
    factory = FakeSinkFactory()
    stream = VideoStream(gst.Caps('any'), 'sink')
    factory.addInputStream(stream)

    self.assertRaises(PipelineError,
            self.pipeline.getQueueForFactoryStream, factory, stream, True)

    # getBinForFactoryStream(factory, stream) must be called before
    # NOTE(review): identical to the check just above -- confirm whether
    # the repetition is intentional.
    self.assertRaises(PipelineError,
            self.pipeline.getQueueForFactoryStream, factory, stream, True)

    # only the call matters here; the returned bin is not inspected
    self.pipeline.getBinForFactoryStream(factory, stream, True)

    # try to get a cached queue
    self.assertRaises(PipelineError,
            self.pipeline.getQueueForFactoryStream, factory, stream, False)

    queue1 = self.pipeline.getQueueForFactoryStream(factory, stream, True)
    self.assertTrue(isinstance(queue1, gst.Element))

    # get the cached instance
    queue2 = self.pipeline.getQueueForFactoryStream(factory, stream, True)
    self.assertTrue(queue1 is queue2)

    self.pipeline.releaseQueueForFactoryStream(factory, stream)

    # there's still a queue alive, so we can't release the bin
    self.assertRaises(PipelineError,
            self.pipeline.releaseBinForFactoryStream, factory, stream)

    # second release matches the second get; a third one must fail
    self.pipeline.releaseQueueForFactoryStream(factory, stream)

    self.assertRaises(PipelineError,
            self.pipeline.releaseQueueForFactoryStream, factory, stream)

    # should always fail with a src bin
    factory2 = FakeSourceFactory()
    stream2 = VideoStream(gst.Caps('any'), 'src')
    factory2.addOutputStream(stream2)

    self.pipeline.getBinForFactoryStream(factory, stream, True)
    self.assertRaises(PipelineError,
            self.pipeline.getQueueForFactoryStream, factory2, stream2, True)
    self.pipeline.releaseBinForFactoryStream(factory, stream)

    # release the bin obtained at the top of the test as well; the
    # factory should then report no bins in use
    self.pipeline.releaseBinForFactoryStream(factory, stream)
    self.assertEqual(factory.current_bins, 0)
317
if __name__ == "__main__":
    # run the tests in this module when the file is executed directly;
    # `main` is unittest's entry point, imported at the top of the file
    main()