~timo-jyrinki/ubuntu/trusty/pitivi/backport_utopic_fixes

« back to all changes in this revision

Viewing changes to tests/test_pipeline.py

  • Committer: Package Import Robot
  • Author(s): Sebastian Dröge
  • Date: 2014-04-05 15:28:16 UTC
  • mfrom: (6.1.13 sid)
  • Revision ID: package-import@ubuntu.com-20140405152816-6lijoax4cngiz5j5
Tags: 0.93-3
* debian/control:
  + Depend on python-gi (>= 3.10), older versions do not work
    with pitivi (Closes: #732813).
  + Add missing dependency on gir1.2-clutter-gst-2.0 (Closes: #743692).
  + Add suggests on gir1.2-notify-0.7 and gir1.2-gnomedesktop-3.0.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# PiTiVi , Non-linear video editor
2
 
#
3
 
#       tests/test_pipeline.py
4
 
#
5
 
# Copyright (c) 2009, Edward Hervey <bilboed@bilboed.com>
6
 
#
7
 
# This program is free software; you can redistribute it and/or
8
 
# modify it under the terms of the GNU Lesser General Public
9
 
# License as published by the Free Software Foundation; either
10
 
# version 2.1 of the License, or (at your option) any later version.
11
 
#
12
 
# This program is distributed in the hope that it will be useful,
13
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
14
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15
 
# Lesser General Public License for more details.
16
 
#
17
 
# You should have received a copy of the GNU Lesser General Public
18
 
# License along with this program; if not, write to the
19
 
# Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20
 
# Boston, MA 02110-1301, USA.
21
 
 
22
 
import gobject
23
 
import gst
24
 
from unittest import main
25
 
from pitivi.pipeline import Pipeline, STATE_NULL, STATE_READY, STATE_PAUSED, STATE_PLAYING, PipelineError
26
 
from pitivi.action import Action, STATE_ACTIVE, STATE_NOT_ACTIVE
27
 
from pitivi.stream import VideoStream
28
 
from common import TestCase, SignalMonitor, FakeSinkFactory
29
 
from pitivi.factories.test import VideoTestSourceFactory
30
 
 
31
 
 
32
 
class BogusAction(Action):
33
 
    pass
34
 
 
35
 
 
36
 
class WeirdAction(Action):
37
 
    pass
38
 
 
39
 
 
40
 
class TestPipeline(TestCase):
41
 
 
42
 
    def setUp(self):
43
 
        TestCase.setUp(self)
44
 
        gst.debug("start")
45
 
        self.pipeline = Pipeline()
46
 
        self.monitor = SignalMonitor(self.pipeline, 'action-added',
47
 
                                     'action-removed', 'factory-added',
48
 
                                     'factory-removed', 'state-changed')
49
 
        self.assertEquals(self.monitor.action_added_count, 0)
50
 
        self.assertEquals(self.monitor.action_added_collect, [])
51
 
 
52
 
    def tearDown(self):
53
 
        self.pipeline.setState(STATE_NULL)
54
 
        self.pipeline.release()
55
 
        self.monitor.disconnectFromObj(self.pipeline)
56
 
        del self.pipeline
57
 
        del self.monitor
58
 
        TestCase.tearDown(self)
59
 
 
60
 
    def testAddRemoveActionSimple(self):
61
 
        """ Simple add/remove of Actions """
62
 
        ac1 = BogusAction()
63
 
 
64
 
        # add the action to the pipeline
65
 
        res = self.pipeline.addAction(ac1)
66
 
        # the returned value should be the given action
67
 
        self.assertEquals(res, ac1)
68
 
        # it should now be in the list of actions...
69
 
        self.failUnlessEqual(self.pipeline.actions, [ac1])
70
 
        # ... and the action should be set to that pipeline
71
 
        self.failUnlessEqual(ac1.pipeline, self.pipeline)
72
 
        # the 'action-added' signal should be triggered once
73
 
        self.assertEquals(self.monitor.action_added_count, 1)
74
 
        # And it contained our action
75
 
        self.assertEquals(self.monitor.action_added_collect, [(ac1, )])
76
 
 
77
 
        # if we try to add that action again, it should be silently ignored
78
 
        res = self.pipeline.addAction(ac1)
79
 
        self.assertEquals(res, ac1)
80
 
        # the list of actions shouldn't have changed
81
 
        self.failUnlessEqual(self.pipeline.actions, [ac1])
82
 
        # it shouldn't have changed the pipeline set on action
83
 
        self.failUnlessEqual(ac1.pipeline, self.pipeline)
84
 
        # the 'action-added' signal should NOT have been triggered again
85
 
        self.assertEquals(self.monitor.action_added_count, 1)
86
 
 
87
 
        # And now to remove it
88
 
        self.pipeline.removeAction(ac1)
89
 
        # the 'action-removed' signal should have been triggered once..
90
 
        self.assertEquals(self.monitor.action_removed_count, 1)
91
 
        # .. with the action as an argument
92
 
        self.assertEquals(self.monitor.action_removed_collect, [(ac1, )])
93
 
        # And there should be no actions left on the pipeline
94
 
        self.assertEquals(self.pipeline.actions, [])
95
 
 
96
 
    def testAddRemoveActionAdvanced(self):
97
 
        """ Advanced add/remove of Actions """
98
 
        ac1 = BogusAction()
99
 
        ac2 = BogusAction()
100
 
        p2 = Pipeline()
101
 
 
102
 
        res = self.pipeline.addAction(ac1)
103
 
        self.assertEquals(self.pipeline.actions, [ac1])
104
 
 
105
 
        # we can't add an action to two pipelines at the same time
106
 
        self.failUnlessRaises(PipelineError, p2.addAction, ac1)
107
 
 
108
 
        self.pipeline.removeAction(ac1)
109
 
        self.assertEquals(self.pipeline.actions, [])
110
 
 
111
 
        res = self.pipeline.setAction(ac1)
112
 
        self.assertEquals(res, ac1)
113
 
        self.assertEquals(self.pipeline.actions, [ac1])
114
 
        # calling setAction while a similar action is already set should
115
 
        # return the existing action and not change anything else
116
 
        res = self.pipeline.setAction(ac2)
117
 
        self.assertEquals(res, ac1)
118
 
        self.assertEquals(self.pipeline.actions, [ac1])
119
 
 
120
 
        # we can't remove active actions while in PAUSED/PLAYING
121
 
        self.pipeline.setState(STATE_PAUSED)
122
 
        ac1.state = STATE_ACTIVE
123
 
        self.assertEquals(self.pipeline.getState(), STATE_PAUSED)
124
 
        self.failUnlessRaises(PipelineError, self.pipeline.removeAction, ac1)
125
 
 
126
 
        # but we can remove deactivated actions while in PAUSED/PLAYING
127
 
        self.pipeline.setState(STATE_PAUSED)
128
 
        ac1.state = STATE_NOT_ACTIVE
129
 
        self.assertEquals(self.pipeline.getState(), STATE_PAUSED)
130
 
        self.pipeline.removeAction(ac1)
131
 
 
132
 
        # we can add actions while in PAUSED/PLAYING
133
 
        res = self.pipeline.addAction(ac2)
134
 
        self.assertEquals(res, ac2)
135
 
        self.assertEquals(self.pipeline.actions, [ac2])
136
 
 
137
 
        self.pipeline.removeAction(ac2)
138
 
        p2.release()
139
 
 
140
 
    def testStateChange(self):
141
 
        loop = gobject.MainLoop()
142
 
 
143
 
        bag = {"last_state": None}
144
 
 
145
 
        def state_changed_cb(pipeline, state, bag, loop):
146
 
            bag["last_state"] = state
147
 
            loop.quit()
148
 
 
149
 
        self.pipeline.connect('state-changed', state_changed_cb, bag, loop)
150
 
 
151
 
        # playing
152
 
        self.pipeline.setState(STATE_PLAYING)
153
 
        loop.run()
154
 
        self.failUnlessEqual(bag["last_state"], STATE_PLAYING)
155
 
        self.failUnlessEqual(self.pipeline.getState(), STATE_PLAYING)
156
 
        self.assertEquals(self.monitor.state_changed_count, 1)
157
 
 
158
 
        # playing again
159
 
        self.pipeline.setState(STATE_PLAYING)
160
 
        self.assertEquals(self.monitor.state_changed_count, 1)
161
 
 
162
 
        # ready
163
 
        self.pipeline.setState(STATE_READY)
164
 
        loop.run()
165
 
        self.failUnlessEqual(bag["last_state"], STATE_READY)
166
 
        self.failUnlessEqual(self.pipeline.getState(), STATE_READY)
167
 
        self.assertEquals(self.monitor.state_changed_count, 2)
168
 
 
169
 
        # PLAYING
170
 
        self.pipeline.play()
171
 
        loop.run()
172
 
        self.failUnlessEqual(bag["last_state"], STATE_PLAYING)
173
 
        self.failUnlessEqual(self.pipeline.getState(), STATE_PLAYING)
174
 
        self.assertEquals(self.monitor.state_changed_count, 3)
175
 
 
176
 
        # PAUSE
177
 
        self.pipeline.pause()
178
 
        loop.run()
179
 
        self.failUnlessEqual(bag["last_state"], STATE_PAUSED)
180
 
        self.failUnlessEqual(self.pipeline.getState(), STATE_PAUSED)
181
 
        self.assertEquals(self.monitor.state_changed_count, 4)
182
 
 
183
 
        self.pipeline.stop()
184
 
        loop.run()
185
 
        self.failUnlessEqual(bag["last_state"], STATE_READY)
186
 
        self.failUnlessEqual(self.pipeline.getState(), STATE_READY)
187
 
        self.assertEquals(self.monitor.state_changed_count, 5)
188
 
 
189
 
    def testGetReleaseBinForFactoryStream(self):
190
 
        factory = VideoTestSourceFactory()
191
 
        stream = VideoStream(gst.Caps('video/x-raw-rgb; video/x-raw-yuv'),
192
 
                'src0')
193
 
        factory.addOutputStream(stream)
194
 
 
195
 
        # try to get a cached instance
196
 
        self.failUnlessRaises(PipelineError,
197
 
                self.pipeline.getBinForFactoryStream, factory, stream, False)
198
 
 
199
 
        # create a bin
200
 
        bin1 = self.pipeline.getBinForFactoryStream(factory, stream, True)
201
 
        self.failUnless(isinstance(bin1, gst.Element))
202
 
        # return the cached instance
203
 
        bin2 = self.pipeline.getBinForFactoryStream(factory, stream, True)
204
 
        self.failUnlessEqual(id(bin1), id(bin2))
205
 
 
206
 
        self.pipeline.releaseBinForFactoryStream(factory, stream)
207
 
        self.pipeline.releaseBinForFactoryStream(factory, stream)
208
 
 
209
 
        # the bin has been destroyed at this point
210
 
        self.failUnlessRaises(PipelineError,
211
 
                self.pipeline.releaseBinForFactoryStream, factory, stream)
212
 
 
213
 
        # we should get a new instance
214
 
        bin2 = self.pipeline.getBinForFactoryStream(factory, stream, True)
215
 
        self.pipeline.releaseBinForFactoryStream(factory, stream)
216
 
 
217
 
    def testGetReleaseTeeForFactoryStream(self):
218
 
        factory = VideoTestSourceFactory()
219
 
        stream = VideoStream(gst.Caps('video/x-raw-rgb; video/x-raw-yuv'),
220
 
                'src')
221
 
        factory.addOutputStream(stream)
222
 
 
223
 
        self.failUnlessRaises(PipelineError,
224
 
            self.pipeline.getTeeForFactoryStream, factory, stream, True)
225
 
 
226
 
        # getBinForFactoryStream(factory, stream) must be called before
227
 
        self.failUnlessRaises(PipelineError,
228
 
            self.pipeline.getTeeForFactoryStream, factory, stream, True)
229
 
 
230
 
        # create the bin
231
 
        bin1 = self.pipeline.getBinForFactoryStream(factory, stream, True)
232
 
 
233
 
        # try to get a cached tee
234
 
        self.failUnlessRaises(PipelineError,
235
 
            self.pipeline.getTeeForFactoryStream, factory, stream, False)
236
 
 
237
 
        # create tee
238
 
        tee1 = self.pipeline.getTeeForFactoryStream(factory, stream, True)
239
 
        self.failUnless(isinstance(tee1, gst.Element))
240
 
 
241
 
        # get the cached instance
242
 
        tee2 = self.pipeline.getTeeForFactoryStream(factory, stream, True)
243
 
        self.failUnlessEqual(id(tee1), id(tee2))
244
 
 
245
 
        # release
246
 
        self.pipeline.releaseTeeForFactoryStream(factory, stream)
247
 
 
248
 
        # there's still a tee alive, so we can't release the bin
249
 
        #self.failUnlessRaises(PipelineError,
250
 
        #        self.pipeline.releaseBinForFactoryStream, factory, stream)
251
 
 
252
 
        self.pipeline.releaseTeeForFactoryStream(factory, stream)
253
 
        self.failUnlessRaises(PipelineError,
254
 
                self.pipeline.releaseTeeForFactoryStream, factory, stream)
255
 
 
256
 
        # should always fail with a sink bin
257
 
        factory2 = FakeSinkFactory()
258
 
        stream2 = VideoStream(gst.Caps('video/x-raw-rgb; video/x-raw-yuv'),
259
 
                'src')
260
 
        factory2.addInputStream(stream2)
261
 
 
262
 
        self.failUnlessRaises(PipelineError,
263
 
            self.pipeline.getTeeForFactoryStream, factory2, stream2, True)
264
 
        self.pipeline.releaseBinForFactoryStream(factory, stream)
265
 
 
266
 
    def testGetReleaseQueueForFactoryStream(self):
267
 
        factory = FakeSinkFactory()
268
 
        stream = VideoStream(gst.Caps('any'), 'sink')
269
 
        factory.addInputStream(stream)
270
 
 
271
 
        self.failUnlessRaises(PipelineError,
272
 
            self.pipeline.getQueueForFactoryStream, factory, stream, True)
273
 
 
274
 
        # getBinForFactoryStream(factory, stream) must be called before
275
 
        self.failUnlessRaises(PipelineError,
276
 
            self.pipeline.getQueueForFactoryStream, factory, stream, True)
277
 
 
278
 
        # create the bin
279
 
        bin1 = self.pipeline.getBinForFactoryStream(factory, stream, True)
280
 
 
281
 
        # try to get a cached queue
282
 
        self.failUnlessRaises(PipelineError,
283
 
            self.pipeline.getQueueForFactoryStream, factory, stream, False)
284
 
 
285
 
        # create queue
286
 
        queue1 = self.pipeline.getQueueForFactoryStream(factory, stream, True)
287
 
        self.failUnless(isinstance(queue1, gst.Element))
288
 
 
289
 
        gst.debug("pouet")
290
 
 
291
 
        # get the cached instance
292
 
        queue2 = self.pipeline.getQueueForFactoryStream(factory, stream, True)
293
 
        self.failUnlessEqual(id(queue1), id(queue2))
294
 
 
295
 
        # release
296
 
        self.pipeline.releaseQueueForFactoryStream(factory, stream)
297
 
 
298
 
        gst.debug("pouet")
299
 
 
300
 
        # there's still a queue alive, so we can't release the bin
301
 
        self.failUnlessRaises(PipelineError,
302
 
                self.pipeline.releaseBinForFactoryStream, factory, stream)
303
 
 
304
 
        self.pipeline.releaseQueueForFactoryStream(factory, stream)
305
 
 
306
 
        # should always fail with a src bin
307
 
        factory2 = VideoTestSourceFactory()
308
 
        stream2 = VideoStream(gst.Caps('any'), 'src')
309
 
        factory2.addOutputStream(stream2)
310
 
 
311
 
        bin1 = self.pipeline.getBinForFactoryStream(factory, stream, True)
312
 
        self.failUnlessRaises(PipelineError,
313
 
            self.pipeline.getQueueForFactoryStream, factory2, stream2, True)
314
 
        self.pipeline.releaseBinForFactoryStream(factory, stream)
315
 
 
316
 
        self.pipeline.releaseBinForFactoryStream(factory, stream)
317
 
        self.assertEquals(factory.current_bins, 0)
318
 
 
319
 
 
320
 
if __name__ == "__main__":
321
 
    main()