# PiTiVi, Non-linear video editor
#
# Copyright (c) 2008, Alessandro Decina <alessandro.decina@collabora.co.uk>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this program; if not, write to the
# Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
# Boston, MA 02110-1301, USA.

import gobject
gobject.threads_init()
import gst

from common import TestCase
from pitivi.discoverer import Discoverer
from pitivi.factories.file import FileSourceFactory, PictureFileSourceFactory


class AddUrisStubDiscoverer(Discoverer):
    """
    Discoverer stub that counts pending analyses instead of scheduling them.
    """
    # number of analyses that were scheduled and have not finished yet
    analysis_scheduled = 0

    def _scheduleAnalysis(self):
        """
        Record that one more analysis was requested.
        """
        self.analysis_scheduled = self.analysis_scheduled + 1

    def _finishAnalysis(self, reason):
        """
        Record that an analysis ended, then defer to Discoverer.
        """
        self.analysis_scheduled = self.analysis_scheduled - 1
        result = Discoverer._finishAnalysis(self, reason)
        return result


class TestAnalysisQueue(TestCase):
    """
    Check how queued URIs drive the discoverer's working state.
    """

    # NOTE(review): the method header was missing from the extracted source;
    # the name is reconstructed -- confirm against the original file.
    def testAddUri(self):
        """
        Check that analyses are scheduled one at a time as URIs are queued.
        """
        discoverer = AddUrisStubDiscoverer()
        self.failIf(discoverer.working)
        # add a file, should start working
        discoverer.addUri('meh')
        self.failUnless(discoverer.working)
        self.failUnlessEqual(discoverer.analysis_scheduled, 1)

        # finish analysis, no other files queued
        discoverer._finishAnalysis("foo")
        self.failIf(discoverer.working)
        self.failUnlessEqual(discoverer.analysis_scheduled, 0)

        # add another file, should start working
        discoverer.addUri('meh1')
        self.failUnless(discoverer.working)
        self.failUnlessEqual(discoverer.analysis_scheduled, 1)

        # queue another while the first isn't finished yet
        discoverer.addUri('meh2')
        # this shouldn't trigger a new analysis until the previous is done.
        # failUnlessEqual is required here: failUnless(value, 1) would treat
        # the 1 as the failure message and only check truthiness.
        self.failUnlessEqual(discoverer.analysis_scheduled, 1)

        discoverer._finishAnalysis("foo")
        # something queued, keep working
        self.failUnless(discoverer.working)
        self.failUnlessEqual(discoverer.analysis_scheduled, 1)

        discoverer._finishAnalysis("foo")
        self.failIf(discoverer.working)
        self.failUnlessEqual(discoverer.analysis_scheduled, 0)


class Discoverer1(Discoverer):
    """
    Discoverer whose scheduling hooks are replaced by test instrumentation.
    """
    # NOTE(review): several lines of this class were missing from the
    # extracted source. Every line marked "reconstructed" below is inferred
    # from how the visible code and tests use it -- confirm each one.

    # reconstructed: read by _useDecodeBinTwo(); the value is an assumption
    use_decodebin2 = True
    # set once _scheduleTimeout() has been called
    timeout_scheduled = False
    # when true, _scheduleTimeout() acts as if the timeout already elapsed
    timeout_expired = True
    # set once _removeTimeout() has been called
    timeout_cancelled = False
    # reconstructed: incremented by _newVideoPadCb(); the tests expect the
    # count to start at zero
    new_video_pad_cb = 0

    def _scheduleAnalysis(self):
        # we call _analyze manually so we don't have to do tricks to keep test
        # methods alive across mainloop iterations
        pass  # reconstructed: the method had no statement left

    def _scheduleTimeout(self):
        """
        Record that a timeout was requested.
        """
        self.timeout_scheduled = True

        if self.timeout_expired:
            # reconstructed: the branch body was missing. Presumably this
            # simulates the timeout firing right away -- confirm the name of
            # the Discoverer timeout callback.
            self._timeoutCb()

    def _removeTimeout(self):
        """
        Record that the timeout was cancelled.
        """
        self.timeout_cancelled = True

    def _useDecodeBinTwo(self):
        """
        Return the class-level decodebin2 switch.
        """
        return self.use_decodebin2

    def _createSource(self):
        """
        Return a one-buffer videotestsrc for the 'foo' URI, otherwise
        whatever Discoverer creates.
        """
        if self.current_uri == 'foo':
            # create something that will go to paused
            source = gst.element_factory_make('videotestsrc')
            source.props.num_buffers = 1
        else:  # reconstructed: the else/return lines were missing
            source = Discoverer._createSource(self)

        return source

    def _newVideoPadCb(self, pad):
        """
        Forward to Discoverer and count the call.
        """
        Discoverer._newVideoPadCb(self, pad)
        self.new_video_pad_cb += 1

    def _newPadCb(self, pad):
        """
        Forward to Discoverer.
        """
        # NOTE(review): a trailing statement may have been lost here in the
        # extraction; only the delegation is visible.
        Discoverer._newPadCb(self, pad)


class TestAnalysis(TestCase):
    """
    Drive the discoverer's analysis steps and bus callbacks by hand.
    """
    # NOTE(review): the extracted source had lost the setUp/tearDown headers,
    # the docstring delimiters, the bodies of the small callbacks and the
    # second line of several two-line assertions. Lines marked
    # "reconstructed" are inferred from the surrounding code -- confirm them.

    def setUp(self):
        # reconstructed: header and base-class call
        TestCase.setUp(self)
        self.discoverer = Discoverer1()
        self.discoverer.current_uri = "meh"
        self.discoverer.pipeline = gst.Bin()

    def tearDown(self):
        # reconstructed: header
        self.discoverer = None
        TestCase.tearDown(self)

    def testNoSource(self):
        """
        Check that discoverer errors out if it can't create a source element.
        """
        bag = {'error': None}

        def no_media_file_cb(disc, uri, error, error_detail):
            bag['error'] = error  # reconstructed

        self.discoverer.addUri('buh://asd')
        self.discoverer.connect('discovery-error', no_media_file_cb)
        self.discoverer._analyze()
        self.failUnlessEqual(bag['error'], 'No available source handler.')

    def testErrorSettingPaused(self):
        """
        Check for errors setting the state of the pipeline to PAUSED.
        """
        bag = {'error': None}

        def no_media_file_cb(disc, uri, error, error_detail):
            bag['error'] = error  # reconstructed

        self.discoverer.addUri('file://i/cant/possibly/exist/and/if/you/'
            'really/have/a/file/named/like/this/you/deserve/a/faillure')
        self.discoverer.connect('discovery-error', no_media_file_cb)
        self.discoverer._analyze()
        self.failUnlessEqual(bag['error'], 'File does not exist')

    def testSetTimeout(self):
        """
        Check that a timeout is set when analyzing a file.
        """
        bag = {'error': None}

        def discovery_error_cb(disc, uri, error, error_detail):
            bag['error'] = error  # reconstructed

        self.discoverer.connect('discovery-error', discovery_error_cb)
        self.discoverer.addUri('foo')
        self.failUnlessEqual(bag['error'], None)
        self.discoverer._analyze()
        # check that a timeout is scheduled once we start analyzing so we don't
        # hang on one single file
        self.failUnless(self.discoverer.timeout_scheduled)
        self.failIf(self.discoverer.working)
        self.failUnless(bag['error'])

        self.discoverer.timeout_expired = False
        self.discoverer.addUri('foo')
        self.discoverer._analyze()
        # at this point the timeout is scheduled but not expired, so the
        # discoverer should still be working
        self.failUnless(self.discoverer.timeout_scheduled)
        self.failIf(self.discoverer.timeout_cancelled)
        self.failUnless(self.discoverer.working)
        # a call to _finishAnalysis() cancels the timeout
        self.discoverer._finishAnalysis("foo")
        self.failUnless(self.discoverer.timeout_cancelled)
        self.failIf(self.discoverer.working)

    def testQueryDuration(self):
        """
        Check that the duration is queried from pads until one answers.
        """
        def pad_query_fail(pad, query):
            return pad.query_default(query)

        def pad_query_succeed(pad, query):
            if query.type == gst.QUERY_DURATION:
                query.set_duration(gst.FORMAT_TIME, 10 * gst.SECOND)
                return True  # reconstructed

            return pad.query_default(query)

        def pad_query_succeed2(pad, query):
            if query.type == gst.QUERY_DURATION:
                query.set_duration(gst.FORMAT_TIME, 20 * gst.SECOND)
                return True  # reconstructed

            return pad.query_default(query)

        # NOTE(review): the expected values of the four assertions below were
        # on lines missing from the extracted source; they are inferred from
        # the comments and the query functions -- confirm.
        pad = gst.Pad('src', gst.PAD_SRC)
        self.failUnlessEqual(self.discoverer.current_duration,
                gst.CLOCK_TIME_NONE)

        pad.set_query_function(pad_query_fail)
        self.discoverer._maybeQueryDuration(pad)
        self.failUnlessEqual(self.discoverer.current_duration,
                gst.CLOCK_TIME_NONE)

        # retry on other pads
        pad.set_query_function(pad_query_succeed)
        self.discoverer._maybeQueryDuration(pad)
        self.failUnlessEqual(self.discoverer.current_duration,
                10 * gst.SECOND)

        # duration should be cached
        pad.set_query_function(pad_query_succeed2)
        self.discoverer._maybeQueryDuration(pad)
        self.failUnlessEqual(self.discoverer.current_duration,
                10 * gst.SECOND)

    def testGetThumbnailFilenameFromPad(self):
        """
        Check that thumbnail filenames are stable for the same uri and pad
        and differ for another uri and pad.
        """
        pad = gst.Pad('src0', gst.PAD_SRC)
        pad1 = gst.Pad('src1', gst.PAD_SRC)
        self.discoverer.current_uri = "meh"
        filename1 = self.discoverer._getThumbnailFilenameFromPad(pad)
        filename2 = self.discoverer._getThumbnailFilenameFromPad(pad)
        self.discoverer.current_uri = "boo"
        filename3 = self.discoverer._getThumbnailFilenameFromPad(pad1)
        self.failUnlessEqual(filename1, filename2)
        self.failIfEqual(filename2, filename3)
        # TODO: check for non-ASCII filenames (noted as half broken in Python)

    def testBusEos(self):
        """
        Check that an EOS message finishes the analysis.
        """
        bag = {'called': False}

        def finish_analysis(reason):
            bag['called'] = True  # reconstructed

        self.discoverer._finishAnalysis = finish_analysis
        self.discoverer._busMessageEosCb(None, None)
        # failUnless(value, True) would treat True as the failure message, so
        # compare explicitly
        self.failUnlessEqual(bag['called'], True)

    def testBusElement(self):
        """
        Check that only 'redirect' element messages are treated as errors.
        """
        bag = {'called': False}

        def finish_analysis(reason):
            bag['called'] = True  # reconstructed

        self.discoverer._finishAnalysis = finish_analysis
        self.failUnlessEqual(self.discoverer.error, None)
        src = gst.Pad('src', gst.PAD_SRC)
        # we ignore non-redirect messages
        structure = gst.Structure('meh')
        message = gst.message_new_element(src, structure)
        self.discoverer._busMessageElementCb(None, message)
        self.failUnlessEqual(self.discoverer.error, None)
        self.failUnlessEqual(bag['called'], False)

        # error out on redirects
        structure = gst.Structure('redirect')
        message = gst.message_new_element(src, structure)
        self.discoverer._busMessageElementCb(None, message)
        self.failIfEqual(self.discoverer.error, None)
        self.failUnlessEqual(bag['called'], True)

    def testBusError(self):
        """
        Check that bus errors are reported through 'discovery-error'.
        """
        def discovery_error_cb(discoverer, uri, error, debug, dic):
            # reconstructed: only the key the assertions read is restored;
            # the original callback may have stored more -- confirm
            dic['debug'] = debug

        dic = {}  # reconstructed
        self.discoverer.connect('discovery-error', discovery_error_cb, dic)

        src = gst.Pad('src', gst.PAD_SRC)
        gerror = gst.GError(gst.STREAM_ERROR, gst.STREAM_ERROR_FAILED, 'meh')
        message = gst.message_new_error(src, gerror, 'debug1')

        self.failUnlessEqual(self.discoverer.error, None)
        self.discoverer.addUri('popme')
        self.discoverer._busMessageErrorCb(None, message)
        self.failUnlessEqual(dic['debug'], 'debug1')

        # errors shouldn't be overridden
        gerror = gst.GError(gst.STREAM_ERROR, gst.STREAM_ERROR_FAILED, 'muh')
        message = gst.message_new_error(src, gerror, 'debug2')
        self.discoverer.addUri('popme')
        self.discoverer._busMessageErrorCb(None, message)
        self.failUnlessEqual(dic['debug'], 'debug2')

    def testNewDecodedPadFixed(self):
        """
        Check that pads with caps already set are registered as streams
        immediately.
        """
        video = gst.Pad('video_00', gst.PAD_SRC)
        video.set_caps(gst.Caps('video/x-raw-rgb'))
        audio = gst.Pad('audio_00', gst.PAD_SRC)
        audio.set_caps(gst.Caps('audio/x-raw-int'))

        self.failUnlessEqual(self.discoverer.current_streams, [])
        self.discoverer._newDecodedPadCb(None, video, False)
        self.failUnlessEqual(len(self.discoverer.current_streams), 1)
        self.failUnlessEqual(self.discoverer.new_video_pad_cb, 1)

        self.discoverer._newDecodedPadCb(None, audio, False)
        self.failUnlessEqual(len(self.discoverer.current_streams), 2)
        self.failUnlessEqual(self.discoverer.new_video_pad_cb, 1)

    def testNewDecodedPadNotFixed(self):
        """
        Check that pads without fixed caps are only registered as streams
        once their caps get set.
        """
        video_template = gst.PadTemplate('video_00', gst.PAD_SRC,
                gst.PAD_ALWAYS, gst.Caps('video/x-raw-rgb, '
                        'framerate=[0/1, %d/1]' % ((2 ** 31) - 1)))
        audio_template = gst.PadTemplate('audio_00', gst.PAD_SRC,
                gst.PAD_ALWAYS, gst.Caps('audio/x-raw-int, '
                        'rate=[1, %d]' % ((2 ** 31) - 1)))

        video = gst.Pad(video_template)
        audio = gst.Pad(audio_template)
        video_ghost = gst.GhostPad("video", video)
        audio_ghost = gst.GhostPad("audio", audio)

        self.failUnlessEqual(self.discoverer.current_streams, [])
        self.discoverer._newDecodedPadCb(None, video_ghost, False)
        self.failUnlessEqual(len(self.discoverer.current_streams), 0)
        self.failUnlessEqual(self.discoverer.new_video_pad_cb, 1)

        self.discoverer._newDecodedPadCb(None, audio_ghost, False)
        self.failUnlessEqual(len(self.discoverer.current_streams), 0)
        self.failUnlessEqual(self.discoverer.new_video_pad_cb, 1)

        # setting caps on the target pads should make the streams show up
        video.set_caps(gst.Caps('video/x-raw-rgb, framerate=25/1'))
        self.failUnlessEqual(len(self.discoverer.current_streams), 1)
        self.failUnlessEqual(self.discoverer.new_video_pad_cb, 1)

        audio.set_caps(gst.Caps('audio/x-raw-int, rate=44100'))
        self.failUnlessEqual(len(self.discoverer.current_streams), 2)
        self.failUnlessEqual(self.discoverer.new_video_pad_cb, 1)


class TestStateChange(TestCase):
    """
    Check what the discoverer does on pipeline state-changed messages and
    which factories or errors it reports as a result.
    """
    # NOTE(review): the extracted source had lost the setUp/tearDown headers,
    # some attribute initialisations, the set_caps() calls in the image tests
    # and the second line of several two-line assertions. Lines marked
    # "reconstructed" are inferred from the surrounding code -- confirm them.

    def setUp(self):
        # reconstructed: header and base-class call
        TestCase.setUp(self)
        self.discoverer = Discoverer1()
        self.discoverer.current_uri = "meh"
        # don't plug the thumbnailing branch
        self.discoverer.current_uri = 'file:///foo/bar'
        # reconstructed: self.src is used as the pipeline and as the message
        # source throughout; pipeline.add() below implies a bin
        self.src = gst.Bin()
        self.discoverer.pipeline = self.src
        self.discoverer.current_duration = 10 * gst.SECOND
        self.factories = []  # reconstructed: filled by discoveryDoneCb
        self.error = None  # reconstructed: read by every test
        self.error_detail = None

        self.discoverer.connect('discovery-error', self.discoveryErrorCb)
        self.discoverer.connect('discovery-done',
                self.discoveryDoneCb)

    def tearDown(self):
        # reconstructed: header
        self.discoverer.disconnect_by_function(self.discoveryErrorCb)
        self.discoverer.disconnect_by_function(self.discoveryDoneCb)
        self.discoverer = None
        self.factories = None
        # reconstructed: two lines were missing here; presumably they dropped
        # the remaining references
        self.error = None
        self.src = None
        TestCase.tearDown(self)

    def discoveryErrorCb(self, disc, uri, error, debug):
        """
        Remember the error and debug detail of a 'discovery-error' signal.
        """
        self.error = error  # reconstructed
        self.error_detail = debug

    def discoveryDoneCb(self, disc, uri, factory):
        """
        Collect the factory delivered by a 'discovery-done' signal.
        """
        self.factories.append(factory)

    def testBusStateChangedIgnored(self):
        """
        Check that irrelevant state-changed messages produce no factory.
        """
        ignore_src = gst.Bin()

        # message whose source is not the discoverer's pipeline
        ignored = gst.message_new_state_changed(ignore_src,
                gst.STATE_READY, gst.STATE_PAUSED, gst.STATE_VOID_PENDING)
        self.discoverer._busMessageStateChangedCb(None, ignored)
        self.failUnlessEqual(self.factories, [])

        # message from the pipeline, but NULL -> READY with PAUSED pending
        ignored = gst.message_new_state_changed(self.src,
                gst.STATE_NULL, gst.STATE_READY, gst.STATE_PAUSED)
        self.discoverer._busMessageStateChangedCb(None, ignored)
        self.failUnlessEqual(self.factories, [])

    def testBusStateChangedNoStreams(self):
        """
        Check that reaching PAUSED without any stream reports an error.
        """
        message = gst.message_new_state_changed(self.src,
                gst.STATE_READY, gst.STATE_PAUSED, gst.STATE_VOID_PENDING)
        self.discoverer.addUri('illbepopped')
        self.failUnlessEqual(self.error, None)
        self.discoverer._busMessageStateChangedCb(None, message)
        self.failUnlessEqual(self.factories, [])
        # FIXME: be more strict about the error here
        self.failUnless(self.error)

    def testBusStateChangedVideoOnly(self):
        """
        Check that a single video stream yields a FileSourceFactory.
        """
        pad = gst.Pad('src', gst.PAD_SRC)
        pad.set_caps(gst.Caps('video/x-raw-rgb'))
        self.discoverer._newDecodedPadCb(None, pad, False)

        self.failUnlessEqual(self.error, None)
        message = gst.message_new_state_changed(self.src,
                gst.STATE_READY, gst.STATE_PAUSED, gst.STATE_VOID_PENDING)
        self.discoverer.addUri('illbepopped')
        self.failUnlessEqual(self.error, None)
        self.discoverer._busMessageStateChangedCb(None, message)
        # should go to PLAYING to do thumbnails
        self.failUnlessEqual(self.src.get_state(0)[2], gst.STATE_PLAYING)
        self.discoverer._finishAnalysis("foo")
        self.failUnlessEqual(len(self.factories), 1)
        factory = self.factories[0]
        self.failUnless(isinstance(factory, FileSourceFactory))
        self.failUnlessEqual(len(factory.output_streams), 1)

    def testBusStateChangedAudioOnly(self):
        """
        Check that a single audio stream yields a FileSourceFactory.
        """
        pad = gst.Pad('src', gst.PAD_SRC)
        pad.set_caps(gst.Caps('audio/x-raw-int'))
        self.discoverer._newDecodedPadCb(None, pad, False)

        self.failUnlessEqual(self.error, None)
        message = gst.message_new_state_changed(self.src,
                gst.STATE_READY, gst.STATE_PAUSED, gst.STATE_VOID_PENDING)
        self.discoverer.addUri('illbepopped')
        self.failUnlessEqual(self.error, None)
        self.discoverer._busMessageStateChangedCb(None, message)
        self.failUnlessEqual(len(self.factories), 1)
        factory = self.factories[0]
        self.failUnless(isinstance(factory, FileSourceFactory))
        self.failUnlessEqual(len(factory.output_streams), 1)

    def testBusStateChangedImageOnly(self):
        """
        Check that a single image stream yields a PictureFileSourceFactory.
        """
        pngdec = gst.element_factory_make('pngdec')
        self.discoverer.pipeline.add(pngdec)
        # images don't have duration
        self.discoverer.current_duration = gst.CLOCK_TIME_NONE
        pad = pngdec.get_pad('src')
        caps = gst.Caps(pad.get_caps()[0])
        caps[0]['width'] = 320
        caps[0]['height'] = 240
        caps[0]['framerate'] = gst.Fraction(0, 1)
        # reconstructed: without this the caps built above are never used
        pad.set_caps(caps)
        self.discoverer._newDecodedPadCb(None, pad, False)

        self.failUnlessEqual(self.error, None)
        message = gst.message_new_state_changed(self.src,
                gst.STATE_READY, gst.STATE_PAUSED, gst.STATE_VOID_PENDING)
        self.discoverer.addUri('illbepopped')
        self.failUnlessEqual(self.error, None)
        self.discoverer._busMessageStateChangedCb(None, message)
        # should go to PLAYING to do thumbnails
        self.failUnlessEqual(self.src.get_state(0)[2], gst.STATE_PLAYING)
        self.discoverer._finishAnalysis("foo")
        self.failUnlessEqual(len(self.factories), 1)
        factory = self.factories[0]
        self.failUnless(isinstance(factory, PictureFileSourceFactory))
        self.failUnlessEqual(len(factory.output_streams), 1)

    def testDurationCheckImage(self):
        """
        Check that an image without duration finishes without error.
        """
        self.discoverer.current_duration = gst.CLOCK_TIME_NONE
        pngdec = gst.element_factory_make('pngdec')
        self.discoverer.pipeline.add(pngdec)
        pad = pngdec.get_pad('src')
        caps = gst.Caps(pad.get_caps()[0])
        caps[0]['width'] = 320
        caps[0]['height'] = 240
        caps[0]['framerate'] = gst.Fraction(0, 1)
        # reconstructed: without this the caps built above are never used
        pad.set_caps(caps)
        self.discoverer._newDecodedPadCb(None, pad, False)
        self.discoverer.addUri('illbepopped')
        self.discoverer._finishAnalysis("foo")

        self.failUnlessEqual(self.error, None)
        # reconstructed expected value: the duration set at the top of the
        # test -- confirm
        self.failUnlessEqual(self.discoverer.current_duration,
                gst.CLOCK_TIME_NONE)

    def testDurationCheckNonImage(self):
        """
        Check that a non-image stream without duration reports an error.
        """
        self.discoverer.current_duration = gst.CLOCK_TIME_NONE
        pad = gst.Pad('src', gst.PAD_SRC)
        pad.set_caps(gst.Caps('audio/x-raw-int'))
        self.discoverer._newDecodedPadCb(None, pad, False)
        self.discoverer.addUri('illbepopped')
        self.discoverer._finishAnalysis("foo")

        self.failUnlessEqual(self.error,
                "Could not establish the duration of the file.")
        # NOTE(review): this statement was cut off at the end of the
        # extracted source; the expected value is assumed to be the duration
        # set at the top of the test -- confirm
        self.failUnlessEqual(self.discoverer.current_duration,
                gst.CLOCK_TIME_NONE)