~ubuntu-branches/ubuntu/trusty/pitivi/trusty

« back to all changes in this revision

Viewing changes to tests/test_discoverer.py

  • Committer: Bazaar Package Importer
  • Author(s): Sebastien Bacher
  • Date: 2011-07-07 13:43:47 UTC
  • mto: (6.1.9 sid) (1.2.12)
  • mto: This revision was merged to the branch mainline in revision 32.
  • Revision ID: james.westby@ubuntu.com-20110707134347-cari9kxjiakzej9z
Tags: upstream-0.14.1
ImportĀ upstreamĀ versionĀ 0.14.1

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#!/usr/bin/python
 
2
# PiTiVi , Non-linear video editor
 
3
#
 
4
#       test_discoverer.py
 
5
#
 
6
# Copyright (c) 2008, Alessandro Decina <alessandro.decina@collabora.co.uk>
 
7
#
 
8
# This program is free software; you can redistribute it and/or
 
9
# modify it under the terms of the GNU Lesser General Public
 
10
# License as published by the Free Software Foundation; either
 
11
# version 2.1 of the License, or (at your option) any later version.
 
12
#
 
13
# This program is distributed in the hope that it will be useful,
 
14
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
15
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 
16
# Lesser General Public License for more details.
 
17
#
 
18
# You should have received a copy of the GNU Lesser General Public
 
19
# License along with this program; if not, write to the
 
20
# Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
 
21
# Boston, MA 02110-1301, USA.
 
22
 
 
23
import gobject
 
24
gobject.threads_init()
 
25
import gst
 
26
 
 
27
from common import TestCase
 
28
from pitivi.discoverer import Discoverer
 
29
from pitivi.factories.file import FileSourceFactory, PictureFileSourceFactory
 
30
 
 
31
 
 
32
class AddUrisStubDiscoverer(Discoverer):
 
33
    analysis_scheduled = 0
 
34
    current_uri = "bar"
 
35
 
 
36
    def _scheduleAnalysis(self):
 
37
        self.analysis_scheduled += 1
 
38
 
 
39
    def _finishAnalysis(self, reason):
 
40
        self.analysis_scheduled -= 1
 
41
        return Discoverer._finishAnalysis(self, reason)
 
42
 
 
43
 
 
44
class TestAnalysisQueue(TestCase):
 
45
    def testAddUri(self):
 
46
        discoverer = AddUrisStubDiscoverer()
 
47
        self.failIf(discoverer.working)
 
48
        # add a file, should start working
 
49
        discoverer.addUri('meh')
 
50
        self.failUnless(discoverer.working)
 
51
        self.failUnlessEqual(discoverer.analysis_scheduled, 1)
 
52
 
 
53
        # finish analysis, no other files queued
 
54
        discoverer._finishAnalysis("foo")
 
55
        self.failIf(discoverer.working)
 
56
        self.failUnlessEqual(discoverer.analysis_scheduled, 0)
 
57
 
 
58
        # add another file, should start working
 
59
        discoverer.addUri('meh1')
 
60
        self.failUnless(discoverer.working)
 
61
        self.failUnlessEqual(discoverer.analysis_scheduled, 1)
 
62
 
 
63
        # queue another while the first isn't finished yet
 
64
        discoverer.addUri('meh2')
 
65
        # this shouldn't trigger a new analysis until the previous is done
 
66
        self.failUnless(discoverer.analysis_scheduled, 1)
 
67
 
 
68
        discoverer._finishAnalysis("foo")
 
69
        # something queued, keep working
 
70
        self.failUnless(discoverer.working)
 
71
        self.failUnlessEqual(discoverer.analysis_scheduled, 1)
 
72
 
 
73
        discoverer._finishAnalysis("foo")
 
74
        self.failIf(discoverer.working)
 
75
        self.failUnlessEqual(discoverer.analysis_scheduled, 0)
 
76
 
 
77
 
 
78
class Discoverer1(Discoverer):
 
79
    use_decodebin2 = True
 
80
    timeout_scheduled = False
 
81
    timeout_expired = True
 
82
    timeout_cancelled = False
 
83
    new_video_pad_cb = 0
 
84
    new_pad_cb = 0
 
85
 
 
86
    def _scheduleAnalysis(self):
 
87
        # we call _analyze manually so we don't have to do tricks to keep test
 
88
        # methods alive across mainloop iterations
 
89
        pass
 
90
 
 
91
    def _scheduleTimeout(self):
 
92
        self.timeout_scheduled = True
 
93
        self.timeout_id = 1
 
94
        if self.timeout_expired:
 
95
            self._timeoutCb()
 
96
 
 
97
    def _removeTimeout(self):
 
98
        self.timeout_id = 0
 
99
        self.timeout_cancelled = True
 
100
 
 
101
    def _useDecodeBinTwo(self):
 
102
        return self.use_decodebin2
 
103
 
 
104
    def _createSource(self):
 
105
        if self.current_uri == 'foo':
 
106
            # create something that will go to paused
 
107
            source = gst.element_factory_make('videotestsrc')
 
108
            source.props.num_buffers = 1
 
109
        else:
 
110
            source = Discoverer._createSource(self)
 
111
 
 
112
        return source
 
113
 
 
114
    def _newVideoPadCb(self, pad):
 
115
        Discoverer._newVideoPadCb(self, pad)
 
116
        self.new_video_pad_cb += 1
 
117
 
 
118
    def _newPadCb(self, pad):
 
119
        Discoverer._newPadCb(self, pad)
 
120
        self.new_pad_cb += 1
 
121
 
 
122
 
 
123
class TestAnalysis(TestCase):
 
124
    def setUp(self):
 
125
        TestCase.setUp(self)
 
126
        self.discoverer = Discoverer1()
 
127
        self.discoverer.current_uri = "meh"
 
128
        self.discoverer.pipeline = gst.Bin()
 
129
 
 
130
    def tearDown(self):
 
131
        self.discoverer = None
 
132
        TestCase.tearDown(self)
 
133
 
 
134
    def testNoSource(self):
 
135
        """
 
136
        Check that discoverer errors out if it can't create a source element.
 
137
        """
 
138
        bag = {'error': None}
 
139
 
 
140
        def no_media_file_cb(disc, uri, error, error_detail):
 
141
            bag['error'] = error
 
142
 
 
143
        self.discoverer.addUri('buh://asd')
 
144
        self.discoverer.connect('discovery-error', no_media_file_cb)
 
145
        self.discoverer._analyze()
 
146
        self.failUnlessEqual(bag['error'], 'No available source handler.')
 
147
 
 
148
    def testErrorSettingPaused(self):
 
149
        """
 
150
        Check for errors setting the state of the pipeline to PAUSED.
 
151
        """
 
152
        bag = {'error': None}
 
153
 
 
154
        def no_media_file_cb(disc, uri, error, error_detail):
 
155
            bag['error'] = error
 
156
 
 
157
        self.discoverer.addUri('file://i/cant/possibly/exist/and/if/you/'
 
158
            'really/have/a/file/named/like/this/you/deserve/a/faillure')
 
159
        self.discoverer.connect('discovery-error', no_media_file_cb)
 
160
        self.discoverer._analyze()
 
161
        self.failUnlessEqual(bag['error'], 'File does not exist')
 
162
 
 
163
    def testSetTimeout(self):
 
164
        """
 
165
        Check that a timeout is set when analyzing a file.
 
166
        """
 
167
        bag = {'error': None}
 
168
 
 
169
        def discovery_error_cb(disc, uri, error, error_detail):
 
170
            bag['error'] = error
 
171
 
 
172
        self.discoverer.connect('discovery-error', discovery_error_cb)
 
173
        self.discoverer.addUri('foo')
 
174
        self.failUnlessEqual(bag['error'], None)
 
175
        self.discoverer._analyze()
 
176
        # check that a timeout is scheduled once we start analyzing so we don't
 
177
        # hang on one single file
 
178
        self.failUnless(self.discoverer.timeout_scheduled)
 
179
        self.failIf(self.discoverer.working)
 
180
        self.failUnless(bag['error'])
 
181
 
 
182
        self.discoverer.timeout_expired = False
 
183
        self.discoverer.addUri('foo')
 
184
        self.discoverer._analyze()
 
185
        # at this point the timeout is scheduled but not expired, so the
 
186
        # discoverer should still be working
 
187
        self.failUnless(self.discoverer.timeout_scheduled)
 
188
        self.failIf(self.discoverer.timeout_cancelled)
 
189
        self.failUnless(self.discoverer.working)
 
190
        # a call go _finishAnalysis() cancels the timeout
 
191
        self.discoverer._finishAnalysis("foo")
 
192
        self.failUnless(self.discoverer.timeout_cancelled)
 
193
        self.failIf(self.discoverer.working)
 
194
 
 
195
    def testQueryDuration(self):
 
196
        def pad_query_fail(pad, query):
 
197
            return pad.query_default(query)
 
198
 
 
199
        def pad_query_succeed(pad, query):
 
200
            if query.type == gst.QUERY_DURATION:
 
201
                query.set_duration(gst.FORMAT_TIME, 10 * gst.SECOND)
 
202
                return True
 
203
 
 
204
            return pad.query_default(query)
 
205
 
 
206
        def pad_query_succeed2(pad, query):
 
207
            if query.type == gst.QUERY_DURATION:
 
208
                query.set_duration(gst.FORMAT_TIME, 20 * gst.SECOND)
 
209
                return True
 
210
 
 
211
            return pad.query_default(query)
 
212
 
 
213
        pad = gst.Pad('src', gst.PAD_SRC)
 
214
        self.failUnlessEqual(self.discoverer.current_duration,
 
215
                gst.CLOCK_TIME_NONE)
 
216
 
 
217
        pad.set_query_function(pad_query_fail)
 
218
        self.discoverer._maybeQueryDuration(pad)
 
219
        self.failUnlessEqual(self.discoverer.current_duration,
 
220
                gst.CLOCK_TIME_NONE)
 
221
 
 
222
        # retry on other pads
 
223
        pad.set_query_function(pad_query_succeed)
 
224
        self.discoverer._maybeQueryDuration(pad)
 
225
        self.failUnlessEqual(self.discoverer.current_duration,
 
226
                10 * gst.SECOND)
 
227
 
 
228
        # duration should be cached
 
229
        pad.set_query_function(pad_query_succeed2)
 
230
        self.discoverer._maybeQueryDuration(pad)
 
231
        self.failUnlessEqual(self.discoverer.current_duration,
 
232
                10 * gst.SECOND)
 
233
 
 
234
    def testGetThumbnailFilenameFromPad(self):
 
235
        pad = gst.Pad('src0', gst.PAD_SRC)
 
236
        pad1 = gst.Pad('src1', gst.PAD_SRC)
 
237
        self.discoverer.current_uri = "meh"
 
238
        filename1 = self.discoverer._getThumbnailFilenameFromPad(pad)
 
239
        filename2 = self.discoverer._getThumbnailFilenameFromPad(pad)
 
240
        self.discoverer.current_uri = "boo"
 
241
        filename3 = self.discoverer._getThumbnailFilenameFromPad(pad1)
 
242
        self.failUnlessEqual(filename1, filename2)
 
243
        self.failIfEqual(filename2, filename3)
 
244
        # TODO: check for non ascii filenames (which is half broken in python
 
245
        # on UNIX anyway...)
 
246
 
 
247
    def testBusEos(self):
 
248
        bag = {'called': False}
 
249
 
 
250
        def finish_analysis(reason):
 
251
            bag['called'] = True
 
252
 
 
253
        self.discoverer._finishAnalysis = finish_analysis
 
254
        self.discoverer._busMessageEosCb(None, None)
 
255
        self.failUnless(bag['called'], True)
 
256
 
 
257
    def testBusElement(self):
 
258
        bag = {'called': False}
 
259
 
 
260
        def finish_analysis(reason):
 
261
            bag['called'] = True
 
262
 
 
263
        self.discoverer._finishAnalysis = finish_analysis
 
264
        self.failUnlessEqual(self.discoverer.error, None)
 
265
        src = gst.Pad('src', gst.PAD_SRC)
 
266
        # we ignore non-redirect messages
 
267
        structure = gst.Structure('meh')
 
268
        message = gst.message_new_element(src, structure)
 
269
        self.discoverer._busMessageElementCb(None, message)
 
270
        self.failUnlessEqual(self.discoverer.error, None)
 
271
        self.failUnlessEqual(bag['called'], False)
 
272
 
 
273
        # error out on redirects
 
274
        structure = gst.Structure('redirect')
 
275
        message = gst.message_new_element(src, structure)
 
276
        self.discoverer._busMessageElementCb(None, message)
 
277
        self.failIfEqual(self.discoverer.error, None)
 
278
        self.failUnlessEqual(bag['called'], True)
 
279
 
 
280
    def testBusError(self):
 
281
        def discovery_error_cb(discoverer, uri, error, debug, dic):
 
282
            dic['uri'] = uri
 
283
            dic['error'] = error
 
284
            dic['debug'] = debug
 
285
 
 
286
        dic = {}
 
287
        self.discoverer.connect('discovery-error', discovery_error_cb, dic)
 
288
 
 
289
        src = gst.Pad('src', gst.PAD_SRC)
 
290
        gerror = gst.GError(gst.STREAM_ERROR, gst.STREAM_ERROR_FAILED, 'meh')
 
291
        message = gst.message_new_error(src, gerror, 'debug1')
 
292
 
 
293
        self.failUnlessEqual(self.discoverer.error, None)
 
294
        self.discoverer.addUri('popme')
 
295
        self.discoverer._busMessageErrorCb(None, message)
 
296
        self.failUnlessEqual(dic['debug'], 'debug1')
 
297
 
 
298
        # errors shouldn't be overridden
 
299
        gerror = gst.GError(gst.STREAM_ERROR, gst.STREAM_ERROR_FAILED, 'muh')
 
300
        message = gst.message_new_error(src, gerror, 'debug2')
 
301
        self.discoverer.addUri('popme')
 
302
        self.discoverer._busMessageErrorCb(None, message)
 
303
        self.failUnlessEqual(dic['debug'], 'debug2')
 
304
 
 
305
    def testNewDecodedPadFixed(self):
 
306
        video = gst.Pad('video_00', gst.PAD_SRC)
 
307
        video.set_caps(gst.Caps('video/x-raw-rgb'))
 
308
        audio = gst.Pad('audio_00', gst.PAD_SRC)
 
309
        audio.set_caps(gst.Caps('audio/x-raw-int'))
 
310
 
 
311
        self.failUnlessEqual(self.discoverer.current_streams, [])
 
312
        self.discoverer._newDecodedPadCb(None, video, False)
 
313
        self.failUnlessEqual(len(self.discoverer.current_streams), 1)
 
314
        self.failUnlessEqual(self.discoverer.new_video_pad_cb, 1)
 
315
 
 
316
        self.discoverer._newDecodedPadCb(None, audio, False)
 
317
        self.failUnlessEqual(len(self.discoverer.current_streams), 2)
 
318
        self.failUnlessEqual(self.discoverer.new_video_pad_cb, 1)
 
319
 
 
320
    def testNewDecodedPadNotFixed(self):
 
321
        video_template = gst.PadTemplate('video_00', gst.PAD_SRC,
 
322
                gst.PAD_ALWAYS, gst.Caps('video/x-raw-rgb, '
 
323
                        'framerate=[0/1, %d/1]' % ((2 ** 31) - 1)))
 
324
        audio_template = gst.PadTemplate('audio_00', gst.PAD_SRC,
 
325
                gst.PAD_ALWAYS, gst.Caps('audio/x-raw-int, '
 
326
                        'rate=[1, %d]' % ((2 ** 31) - 1)))
 
327
 
 
328
        video = gst.Pad(video_template)
 
329
        audio = gst.Pad(audio_template)
 
330
        video_ghost = gst.GhostPad("video", video)
 
331
        audio_ghost = gst.GhostPad("audio", audio)
 
332
 
 
333
        self.failUnlessEqual(self.discoverer.current_streams, [])
 
334
        self.discoverer._newDecodedPadCb(None, video_ghost, False)
 
335
        self.failUnlessEqual(len(self.discoverer.current_streams), 0)
 
336
        self.failUnlessEqual(self.discoverer.new_video_pad_cb, 1)
 
337
 
 
338
        self.discoverer._newDecodedPadCb(None, audio_ghost, False)
 
339
        self.failUnlessEqual(len(self.discoverer.current_streams), 0)
 
340
        self.failUnlessEqual(self.discoverer.new_video_pad_cb, 1)
 
341
 
 
342
        # fix the caps
 
343
        video.set_caps(gst.Caps('video/x-raw-rgb, framerate=25/1'))
 
344
        self.failUnlessEqual(len(self.discoverer.current_streams), 1)
 
345
        self.failUnlessEqual(self.discoverer.new_video_pad_cb, 1)
 
346
 
 
347
        audio.set_caps(gst.Caps('audio/x-raw-int, rate=44100'))
 
348
        self.failUnlessEqual(len(self.discoverer.current_streams), 2)
 
349
        self.failUnlessEqual(self.discoverer.new_video_pad_cb, 1)
 
350
 
 
351
 
 
352
class TestStateChange(TestCase):
 
353
    def setUp(self):
 
354
        TestCase.setUp(self)
 
355
        self.discoverer = Discoverer1()
 
356
        self.discoverer.current_uri = "meh"
 
357
        # don't plug the thumbnailing branch
 
358
        self.discoverer.current_uri = 'file:///foo/bar'
 
359
        self.src = gst.Bin()
 
360
        self.discoverer.pipeline = self.src
 
361
        self.discoverer.current_duration = 10 * gst.SECOND
 
362
        self.factories = []
 
363
        self.error = None
 
364
        self.error_detail = None
 
365
 
 
366
        self.discoverer.connect('discovery-error', self.discoveryErrorCb)
 
367
        self.discoverer.connect('discovery-done',
 
368
                self.discoveryDoneCb)
 
369
 
 
370
    def tearDown(self):
 
371
        self.discoverer.disconnect_by_function(self.discoveryErrorCb)
 
372
        self.discoverer.disconnect_by_function(self.discoveryDoneCb)
 
373
        self.discoverer = None
 
374
        self.factories = None
 
375
        self.error = None
 
376
        self.src = None
 
377
        TestCase.tearDown(self)
 
378
 
 
379
    def discoveryErrorCb(self, disc, uri, error, debug):
 
380
        self.error = error
 
381
        self.error_detail = debug
 
382
 
 
383
    def discoveryDoneCb(self, disc, uri, factory):
 
384
        self.factories.append(factory)
 
385
 
 
386
    def testBusStateChangedIgnored(self):
 
387
        ignore_src = gst.Bin()
 
388
 
 
389
        # ignore element
 
390
        ignored = gst.message_new_state_changed(ignore_src,
 
391
               gst.STATE_READY, gst.STATE_PAUSED, gst.STATE_VOID_PENDING)
 
392
        self.discoverer._busMessageStateChangedCb(None, ignored)
 
393
        self.failUnlessEqual(self.factories, [])
 
394
 
 
395
        # ignore transition
 
396
        ignored = gst.message_new_state_changed(self.src,
 
397
                gst.STATE_NULL, gst.STATE_READY, gst.STATE_PAUSED)
 
398
        self.discoverer._busMessageStateChangedCb(None, ignored)
 
399
        self.failUnlessEqual(self.factories, [])
 
400
 
 
401
    def testBusStateChangedNoStreams(self):
 
402
        # no streams found
 
403
        message = gst.message_new_state_changed(self.src,
 
404
                gst.STATE_READY, gst.STATE_PAUSED, gst.STATE_VOID_PENDING)
 
405
        self.discoverer.addUri('illbepopped')
 
406
        self.failUnlessEqual(self.error, None)
 
407
        self.discoverer._busMessageStateChangedCb(None, message)
 
408
        self.failUnlessEqual(self.factories, [])
 
409
        # FIXME: be more strict about the error here
 
410
        self.failUnless(self.error)
 
411
 
 
412
    def testBusStateChangedVideoOnly(self):
 
413
        # only video
 
414
        pad = gst.Pad('src', gst.PAD_SRC)
 
415
        pad.set_caps(gst.Caps('video/x-raw-rgb'))
 
416
        self.discoverer._newDecodedPadCb(None, pad, False)
 
417
 
 
418
        self.failUnlessEqual(self.error, None)
 
419
        message = gst.message_new_state_changed(self.src,
 
420
                gst.STATE_READY, gst.STATE_PAUSED, gst.STATE_VOID_PENDING)
 
421
        self.discoverer.addUri('illbepopped')
 
422
        self.failUnlessEqual(self.error, None)
 
423
        self.discoverer._busMessageStateChangedCb(None, message)
 
424
        # should go to PLAYING to do thumbnails
 
425
        self.failUnlessEqual(self.src.get_state(0)[2], gst.STATE_PLAYING)
 
426
        self.discoverer._finishAnalysis("foo")
 
427
        self.failUnlessEqual(len(self.factories), 1)
 
428
        factory = self.factories[0]
 
429
        self.failUnless(isinstance(factory, FileSourceFactory))
 
430
        self.failUnlessEqual(len(factory.output_streams), 1)
 
431
 
 
432
    def testBusStateChangedAudioOnly(self):
 
433
        # only audio
 
434
        pad = gst.Pad('src', gst.PAD_SRC)
 
435
        pad.set_caps(gst.Caps('audio/x-raw-int'))
 
436
        self.discoverer._newDecodedPadCb(None, pad, False)
 
437
 
 
438
        self.failUnlessEqual(self.error, None)
 
439
        message = gst.message_new_state_changed(self.src,
 
440
                gst.STATE_READY, gst.STATE_PAUSED, gst.STATE_VOID_PENDING)
 
441
        self.discoverer.addUri('illbepopped')
 
442
        self.failUnlessEqual(self.error, None)
 
443
        self.discoverer._busMessageStateChangedCb(None, message)
 
444
        self.failUnlessEqual(len(self.factories), 1)
 
445
        factory = self.factories[0]
 
446
        self.failUnless(isinstance(factory, FileSourceFactory))
 
447
        self.failUnlessEqual(len(factory.output_streams), 1)
 
448
 
 
449
    def testBusStateChangedImageOnly(self):
 
450
        # only image
 
451
        pngdec = gst.element_factory_make('pngdec')
 
452
        self.discoverer.pipeline.add(pngdec)
 
453
        # images don't have duration
 
454
        self.discoverer.current_duration = gst.CLOCK_TIME_NONE
 
455
        pad = pngdec.get_pad('src')
 
456
        caps = gst.Caps(pad.get_caps()[0])
 
457
        caps[0]['width'] = 320
 
458
        caps[0]['height'] = 240
 
459
        caps[0]['framerate'] = gst.Fraction(0, 1)
 
460
        pad.set_caps(caps)
 
461
        self.discoverer._newDecodedPadCb(None, pad, False)
 
462
 
 
463
        self.failUnlessEqual(self.error, None)
 
464
        message = gst.message_new_state_changed(self.src,
 
465
                gst.STATE_READY, gst.STATE_PAUSED, gst.STATE_VOID_PENDING)
 
466
        self.discoverer.addUri('illbepopped')
 
467
        self.failUnlessEqual(self.error, None)
 
468
        self.discoverer._busMessageStateChangedCb(None, message)
 
469
        # should go to PLAYING to do thumbnails
 
470
        self.failUnlessEqual(self.src.get_state(0)[2], gst.STATE_PLAYING)
 
471
        self.discoverer._finishAnalysis("foo")
 
472
        self.failUnlessEqual(len(self.factories), 1)
 
473
        factory = self.factories[0]
 
474
        self.failUnless(isinstance(factory, PictureFileSourceFactory))
 
475
        self.failUnlessEqual(len(factory.output_streams), 1)
 
476
 
 
477
    def testDurationCheckImage(self):
 
478
        self.discoverer.current_duration = gst.CLOCK_TIME_NONE
 
479
        pngdec = gst.element_factory_make('pngdec')
 
480
        self.discoverer.pipeline.add(pngdec)
 
481
        pad = pngdec.get_pad('src')
 
482
        caps = gst.Caps(pad.get_caps()[0])
 
483
        caps[0]['width'] = 320
 
484
        caps[0]['height'] = 240
 
485
        caps[0]['framerate'] = gst.Fraction(0, 1)
 
486
        pad.set_caps(caps)
 
487
        self.discoverer._newDecodedPadCb(None, pad, False)
 
488
        self.discoverer.addUri('illbepopped')
 
489
        self.discoverer._finishAnalysis("foo")
 
490
 
 
491
        self.failUnlessEqual(self.error, None)
 
492
        self.failUnlessEqual(self.discoverer.current_duration,
 
493
                gst.CLOCK_TIME_NONE)
 
494
 
 
495
    def testDurationCheckNonImage(self):
 
496
        self.discoverer.current_duration = gst.CLOCK_TIME_NONE
 
497
        pad = gst.Pad('src', gst.PAD_SRC)
 
498
        pad.set_caps(gst.Caps('audio/x-raw-int'))
 
499
        self.discoverer._newDecodedPadCb(None, pad, False)
 
500
        self.discoverer.addUri('illbepopped')
 
501
        self.discoverer._finishAnalysis("foo")
 
502
 
 
503
        self.failUnlessEqual(self.error,
 
504
                "Could not establish the duration of the file.")
 
505
        self.failUnlessEqual(self.discoverer.current_duration,
 
506
                gst.CLOCK_TIME_NONE)