~ubuntu-branches/ubuntu/trusty/pitivi/trusty

« back to all changes in this revision

Viewing changes to tests/test_integration.py

  • Committer: Bazaar Package Importer
  • Author(s): Sebastien Bacher
  • Date: 2011-07-07 13:43:47 UTC
  • mto: (6.1.9 sid) (1.2.12)
  • mto: This revision was merged to the branch mainline in revision 32.
  • Revision ID: james.westby@ubuntu.com-20110707134347-cari9kxjiakzej9z
Tags: upstream-0.14.1
ImportĀ upstreamĀ versionĀ 0.14.1

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# PiTiVi , Non-linear video editor
2
 
#
3
 
#       tests/test_integration.py
4
 
#
5
 
# Copyright (c) 2008, Alessandro Decina <alessandro.decina@collabora.co.uk>
6
 
#
7
 
# This program is free software; you can redistribute it and/or
8
 
# modify it under the terms of the GNU Lesser General Public
9
 
# License as published by the Free Software Foundation; either
10
 
# version 2.1 of the License, or (at your option) any later version.
11
 
#
12
 
# This program is distributed in the hope that it will be useful,
13
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
14
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15
 
# Lesser General Public License for more details.
16
 
#
17
 
# You should have received a copy of the GNU Lesser General Public
18
 
# License along with this program; if not, write to the
19
 
# Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20
 
# Boston, MA 02111-1307, USA.
21
 
 
22
 
"""Test pitivi core objects at the API level, simulating the UI input for
23
 
QA scenarios """
24
 
 
25
 
import unittest
26
 
TestCase = unittest.TestCase
27
 
from pitivi.application import InteractivePitivi
28
 
from pitivi.timeline.timeline import MoveContext, TrimStartContext,\
29
 
    TrimEndContext
30
 
from pitivi.signalinterface import Signallable
31
 
from pitivi.stream import AudioStream, VideoStream
32
 
import pitivi.instance
33
 
import gobject
34
 
import os.path
35
 
import gst
36
 
import random
37
 
 
38
 
base_uri = "file:///" + os.getcwd() + "/media/"
39
 
test1 = base_uri + "test1.ogg"
40
 
test2 = base_uri + "test2.ogg"
41
 
test3 = base_uri + "test3.ogg"
42
 
 
43
 
class WatchDog(object):
44
 
 
45
 
    """A simple watchdog timer to aid developing integration tests. If
46
 
    keepAlive() is not called every <timeout> ms, then the watchdog timer will
47
 
    quit the specified mainloop."""
48
 
 
49
 
    def __init__(self, mainloop, timeout=10000):
50
 
        self.timeout = timeout
51
 
        self.mainloop = mainloop
52
 
        self.will_quit = False
53
 
        self.keep_going = True
54
 
        self.activated = False
55
 
 
56
 
    def start(self):
57
 
        self.will_quit = False
58
 
        self.keep_going = True
59
 
        gobject.timeout_add(self.timeout, self._timeoutcb)
60
 
 
61
 
    def suspend(self):
62
 
        self.keepAlive()
63
 
        self.keep_going = False
64
 
 
65
 
    def _timeoutcb(self):
66
 
        if self.will_quit:
67
 
            self.mainloop.quit()
68
 
            self.activated = True
69
 
            self.keep_going = False
70
 
        self.will_quit = True
71
 
        return self.keep_going
72
 
 
73
 
    def keepAlive(self):
74
 
        self.will_quit = False
75
 
 
76
 
class TestWatchdog(TestCase):
77
 
 
78
 
    def testWatchdog(self):
79
 
        self.ml = gobject.MainLoop()
80
 
        wd = WatchDog(self.ml, 100)
81
 
        self.timeout_called = False
82
 
        wd.start()
83
 
        gobject.timeout_add(2000, self._timeoutCb)
84
 
        self.ml.run()
85
 
        self.assertFalse(self.timeout_called)
86
 
        self.assertTrue(wd.activated)
87
 
 
88
 
    def testKeepAlive(self):
89
 
        self.ml = gobject.MainLoop()
90
 
        wd = WatchDog(self.ml, 2000)
91
 
        self.timeout_called = False
92
 
        wd.start()
93
 
        gobject.timeout_add(500, wd.keepAlive)
94
 
        gobject.timeout_add(2500, self._timeoutCb)
95
 
        self.ml.run()
96
 
        self.assertTrue(self.timeout_called)
97
 
        self.assertFalse(wd.activated)
98
 
 
99
 
    def testSuspend(self):
100
 
        self.ml = gobject.MainLoop()
101
 
        wd = WatchDog(self.ml, 500)
102
 
        self.timeout_called = False
103
 
        wd.start()
104
 
        wd.suspend()
105
 
        gobject.timeout_add(2000, self._timeoutCb)
106
 
        self.ml.run()
107
 
        self.assertTrue(self.timeout_called)
108
 
        self.assertFalse(wd.activated)
109
 
 
110
 
    def _timeoutCb(self):
111
 
        self.ml.quit()
112
 
        self.timeout_called = True
113
 
        return False
114
 
 
115
 
class Configuration(object):
116
 
 
117
 
    def __init__(self, *sources):
118
 
        self.sources = []
119
 
        self.source_map = {}
120
 
        for source in sources:
121
 
            self.addSource(*source)
122
 
 
123
 
    def clone(self):
124
 
        ret = Configuration()
125
 
        for source in self.sources:
126
 
            if len(source) == 3:
127
 
                name, uri, props = source
128
 
                ret.addSource(name, uri, dict(props))
129
 
            if len(source) == 2:
130
 
                ret.addBadSource(*source)
131
 
        return ret
132
 
 
133
 
    def addSource(self, name, uri, props=None, error=False):
134
 
        if name in self.source_map:
135
 
            raise Exception("Duplicate source: '%d' already defined" % name)
136
 
        self.sources.append((name, uri, props))
137
 
        self.source_map[name] = uri, props
138
 
 
139
 
    def updateSource(self, name, uri=None, props=None):
140
 
        def findSource(name):
141
 
            for i, source in enumerate(self.sources):
142
 
                if source[0] == name:
143
 
                    return i
144
 
            raise Exception("Source %s not in configuration" %
145
 
                name)
146
 
 
147
 
        i = findSource(name)
148
 
        name, orig_uri, orig_props = self.sources[i]
149
 
        if not uri:
150
 
            uri = orig_uri
151
 
        if props:
152
 
            orig_props.update(props)
153
 
 
154
 
        self.sources[i] = (name, uri, orig_props)
155
 
        self.source_map[name] = (uri, orig_props)
156
 
 
157
 
    def addBadSource(self, name, uri):
158
 
        if name in self.source_map:
159
 
            raise Exception("Duplicate source: '%d' already defined" % name)
160
 
        self.sources.append((name, uri))
161
 
        self.source_map[name] = uri, None
162
 
 
163
 
    def getUris(self):
164
 
        return set((source[1] for source in self.sources))
165
 
 
166
 
    def getGoodUris(self):
167
 
        return set((source[1] for source in self.sources if
168
 
            len(source) > 2))
169
 
 
170
 
    def getGoodSources(self):
171
 
        return (source for source in self.sources if len(source) > 2)
172
 
 
173
 
    def matches(self, instance_runner):
174
 
        for name, uri, props in self.getGoodSources():
175
 
            if not hasattr(instance_runner, name):
176
 
                raise Exception("Project missing source %s" % name)
177
 
            timelineObject = getattr(instance_runner, name)
178
 
            if timelineObject.factory.uri != uri:
179
 
                raise Exception("%s has wrong factory type!" % name)
180
 
            if timelineObject:
181
 
                for prop, value in props.iteritems():
182
 
                    actual = getattr(timelineObject, prop)
183
 
                    if not actual == value:
184
 
                        raise Exception("%s.%s: %r != %r" % (name, prop,
185
 
                            actual, value))
186
 
 
187
 
        names = set((source[0] for source in self.getGoodSources()))
188
 
        timelineObjects = set(instance_runner.timelineObjects.iterkeys())
189
 
        if names != timelineObjects:
190
 
            raise Exception("Project has extra sources: %r" % (timelineObjects -
191
 
                names))
192
 
 
193
 
    def __iter__(self):
194
 
        return (source for source in self.sources if len(source) > 2)
195
 
 
196
 
class InstanceRunner(Signallable):
197
 
 
198
 
    no_ui = not(os.getenv("ENABLE_UI"))
199
 
 
200
 
    class container(object):
201
 
 
202
 
        def __init__(self):
203
 
            pass
204
 
 
205
 
    __signals__ = {
206
 
        "sources-loaded" : [],
207
 
        "timeline-configured" : [],
208
 
    }
209
 
 
210
 
    def __init__(self, instance):
211
 
        self.instance = instance
212
 
        self.watchdog = WatchDog(instance.mainloop, 10000)
213
 
        self.factories = set()
214
 
        self.errors = set()
215
 
        self.project = None
216
 
        self.timeline = None
217
 
        self.tracks = {}
218
 
        self.timelineObjects = {}
219
 
        self.pending_configuration = None
220
 
        self.audioTracks = 0
221
 
        self.videoTracks = 0
222
 
        instance.connect("new-project-loaded", self._newProjectLoadedCb)
223
 
 
224
 
    def loadConfiguration(self, configuration):
225
 
        self.pending_configuration = configuration
226
 
 
227
 
    def _newProjectLoadedCb(self, instance, project):
228
 
        self.project = instance.current
229
 
        self.timeline = self.project.timeline
230
 
        for track in self.timeline.tracks:
231
 
            self._trackAddedCb(self.timeline, track)
232
 
        self.project.sources.connect("source-added", self._sourceAdded)
233
 
        self.project.sources.connect("discovery-error", self._discoveryError)
234
 
        self.project.sources.connect("ready", self._readyCb)
235
 
        self.timeline.connect("track-added", self._trackAddedCb)
236
 
 
237
 
        if self.pending_configuration:
238
 
            self._loadSources(self.pending_configuration)
239
 
 
240
 
    def _sourceAdded(self, sourcelist, factory):
241
 
        self.factories.add(factory.uri)
242
 
 
243
 
    def _discoveryError(self, sourcelist, uri, reason, unused):
244
 
        self.errors.add(uri)
245
 
 
246
 
    def _readyCb(self, soucelist):
247
 
        assert self.factories == self.pending_configuration.getGoodUris()
248
 
        if self.pending_configuration:
249
 
            self._setupTimeline(self.pending_configuration)
250
 
        self.emit("sources-loaded")
251
 
 
252
 
    def _loadSources(self, configuration):
253
 
        for uri in configuration.getUris():
254
 
            self.project.sources.addUri(uri)
255
 
 
256
 
    def _trackAddedCb(self, timeline, track):
257
 
        if type(track.stream) is AudioStream:
258
 
            self.audioTracks += 1
259
 
            attrname = "audio%d" % self.audioTracks
260
 
        elif type(track.stream) is VideoStream:
261
 
            self.videoTracks += 1
262
 
            attrname = "video%d" % self.videoTracks
263
 
        container = self.container()
264
 
        setattr(self, attrname, container)
265
 
        self.tracks[track] = container
266
 
        container.transitions = {}
267
 
        track.connect("transition-added", self._transitionAddedCb, container)
268
 
        track.connect("transition-removed", self._transitionRemovedCb,
269
 
            container)
270
 
 
271
 
    def _transitionAddedCb(self, track, transition, container):
272
 
        container.transitions[(transition.a, transition.b)] = transition
273
 
 
274
 
    def _transitionRemovedCb(self, track, transition, container):
275
 
        del container.transitions[(transition.a, transition.b)]
276
 
 
277
 
    def _setupTimeline(self, configuration):
278
 
        for name, uri, props in configuration:
279
 
            factory = self.project.sources.getUri(uri)
280
 
            if not factory:
281
 
                raise Exception("Could not find '%s' in sourcelist" %
282
 
                    source)
283
 
 
284
 
            if not props:
285
 
                continue
286
 
 
287
 
            timelineObject = self.timeline.addSourceFactory(factory)
288
 
            setattr(self, name, timelineObject)
289
 
            self.timelineObjects[name] = timelineObject
290
 
            for trackObject in timelineObject.track_objects:
291
 
                track = self.tracks[trackObject.track]
292
 
                setattr(track, name, trackObject)
293
 
 
294
 
            if not timelineObject:
295
 
                raise Exception("Could not add source '%s' to timeline" %
296
 
                    source)
297
 
            for prop, value in props.iteritems():
298
 
                setattr(timelineObject, prop, value)
299
 
        self.emit("timeline-configured")
300
 
 
301
 
    def run(self):
302
 
        self.watchdog.start()
303
 
        if self.no_ui:
304
 
            self.instance.run(["--no-ui"])
305
 
        else:
306
 
            from pitivi.ui.zoominterface import Zoomable
307
 
            # set a common zoom ratio so that things like edge snapping values
308
 
            # are consistent
309
 
            Zoomable.setZoomLevel((3 * Zoomable.zoom_steps) / 4)
310
 
            self.instance.run([])
311
 
 
312
 
    def shutDown(self):
313
 
        gobject.idle_add(self.instance.shutdown)
314
 
        self.project._dirty = False
315
 
 
316
 
class Brush(Signallable):
317
 
    """Scrubs your timelines until they're squeaky clean."""
318
 
 
319
 
    __signals__ = {
320
 
        "scrub-step" : ["time", "priority"],
321
 
        "scrub-done" : [],
322
 
    }
323
 
 
324
 
    def __init__(self, runner, delay=100, maxtime=7200, maxpriority=10):
325
 
        self.context = None
326
 
        self.time = 0
327
 
        self.priority = 0
328
 
        self.maxPriority = maxpriority
329
 
        self.maxTime = maxtime
330
 
        self.count = 0
331
 
        self.steps = 0
332
 
        self.delay = delay
333
 
        self.runner = runner
334
 
        self.watchdog = runner.watchdog
335
 
 
336
 
    def scrub(self, context, finalTime, finalPriority, steps=10):
337
 
        self.context = context
338
 
        self.time = finalTime
339
 
        self.priority = finalPriority
340
 
        self.count = 0
341
 
        self.steps = steps
342
 
        gobject.timeout_add(self.delay, self._scrubTimeoutCb)
343
 
 
344
 
    def _scrubTimeoutCb(self):
345
 
        self.watchdog.keepAlive()
346
 
        self.count += 1
347
 
        if self.count < self.steps:
348
 
            time_ = random.randint(0, self.maxTime)
349
 
            priority = random.randint(0, self.maxPriority)
350
 
            self.context.editTo(time_, priority)
351
 
            self.emit("scrub-step", time_, priority)
352
 
            return True
353
 
        else:
354
 
            self.context.editTo(self.time, self.priority)
355
 
            self.emit("scrub-step", self.time, self.priority)
356
 
            self.context.finish()
357
 
            self.emit("scrub-done")
358
 
            return False
359
 
 
360
 
class Base(TestCase):
361
 
    """
362
 
    Creates and runs an InteractivePitivi object, then starts the mainloop.
363
 
    Uses a WatchDog to ensure that test cases will eventually terminate with an
364
 
    assertion failure if runtime errors occur inside the mainloop."""
365
 
 
366
 
    def run(self, result):
367
 
        self._result = result
368
 
        self._num_failures = len(result.failures)
369
 
        self._num_errors = len(result.errors)
370
 
        TestCase.run(self, result)
371
 
 
372
 
    def setUp(self):
373
 
        TestCase.setUp(self)
374
 
        ptv = InteractivePitivi()
375
 
        # was the pitivi object created
376
 
        self.assert_(ptv)
377
 
 
378
 
        # were the contents of pitivi properly created
379
 
        self.assertEqual(ptv.current, None)
380
 
 
381
 
        # was the unique instance object properly set
382
 
        self.assertEquals(pitivi.instance.PiTiVi, ptv)
383
 
        self.ptv = ptv
384
 
 
385
 
        # create an instance runner
386
 
        self.runner = InstanceRunner(ptv)
387
 
 
388
 
    def tearDown(self):
389
 
        # make sure we aren't exiting because our watchdog activated
390
 
        self.assertFalse(self.runner.watchdog.activated)
391
 
        # make sure the instance has been unset
392
 
        will_fail = False
393
 
        if ((self._num_errors == self._result.errors) and
394
 
            (self._num_failures == self._result.failures)):
395
 
            will_fail = not (pitivi.instance.PiTiVi is None)
396
 
 
397
 
        pitivi.instance.PiTiVi = None
398
 
        del self.ptv
399
 
        del self.runner
400
 
 
401
 
        if will_fail:
402
 
            raise Exception("Instance was not unset")
403
 
        TestCase.tearDown(self)
404
 
 
405
 
class TestBasic(Base):
406
 
 
407
 
    def testWatchdog(self):
408
 
        self.runner.run()
409
 
        self.assertTrue(self.runner.watchdog.activated)
410
 
        self.runner.watchdog.activated = False
411
 
 
412
 
    def testBasic(self):
413
 
 
414
 
        def newProjectLoaded(pitivi, project):
415
 
            self.runner.shutDown()
416
 
 
417
 
        self.ptv.connect("new-project-loaded", newProjectLoaded)
418
 
        self.runner.run()
419
 
 
420
 
    def testImport(self):
421
 
 
422
 
        def sourcesLoaded(runner):
423
 
            self.runner.shutDown()
424
 
 
425
 
        config = Configuration()
426
 
        config.addSource("test1", test1)
427
 
        config.addSource("test2", test2)
428
 
        config.addBadSource("test3", test3)
429
 
 
430
 
        self.runner.connect("sources-loaded", sourcesLoaded)
431
 
        self.runner.loadConfiguration(config)
432
 
        self.runner.run()
433
 
 
434
 
        self.assertFalse(hasattr(self.runner,test1))
435
 
        self.assertFalse(hasattr(self.runner,test2))
436
 
        self.failUnlessEqual(self.runner.factories, set((test1, test2)))
437
 
        self.failUnlessEqual(self.runner.errors, set((test3,)))
438
 
 
439
 
    def testConfigureTimeline(self):
440
 
 
441
 
        config = Configuration()
442
 
        config.addSource(
443
 
            "object1",
444
 
            test1,
445
 
            {
446
 
                "start" : 0,
447
 
                "duration" : gst.SECOND,
448
 
                "media-start" : gst.SECOND,
449
 
            })
450
 
        config.addSource(
451
 
            "object2",
452
 
            test2,
453
 
            {
454
 
                "start" : gst.SECOND,
455
 
                "duration" : gst.SECOND,
456
 
            })
457
 
 
458
 
        def timelineConfigured(runner):
459
 
            config.matches(self.runner)
460
 
            self.runner.shutDown()
461
 
 
462
 
        self.runner.loadConfiguration(config)
463
 
        self.runner.connect("timeline-configured", timelineConfigured)
464
 
        self.runner.run()
465
 
 
466
 
        self.assertTrue(self.runner.object1)
467
 
        self.assertTrue(self.runner.object2)
468
 
        self.assertTrue(self.runner.video1.object1)
469
 
        self.assertTrue(self.runner.audio1.object2)
470
 
 
471
 
    def testMoveSources(self):
472
 
        initial = Configuration()
473
 
        initial.addSource(
474
 
            "object1",
475
 
            test1,
476
 
            {
477
 
                "start" : 0,
478
 
                "duration" : gst.SECOND,
479
 
                "media-start" : gst.SECOND,
480
 
                "priority" : 0
481
 
            })
482
 
        initial.addSource(
483
 
            "object2",
484
 
            test2,
485
 
            {
486
 
                "start" : gst.SECOND,
487
 
                "duration" : gst.SECOND,
488
 
                "priority" : 1,
489
 
            })
490
 
        final = Configuration()
491
 
        final.addSource(
492
 
            "object1",
493
 
            test1,
494
 
            {
495
 
                "start" : 10 * gst.SECOND,
496
 
            })
497
 
        final.addSource(
498
 
            "object2",
499
 
            test2,
500
 
            {
501
 
                "start" : 11 * gst.SECOND,
502
 
                "priority" : 2,
503
 
            })
504
 
 
505
 
        def timelineConfigured(runner):
506
 
            context = MoveContext(self.runner.timeline,
507
 
                self.runner.video1.object1,
508
 
                set((self.runner.audio1.object2,)))
509
 
            brush.scrub(context, 10 * gst.SECOND, 1, steps=10)
510
 
 
511
 
        def scrubStep(brush, time, priority):
512
 
            pass
513
 
 
514
 
        def scrubDone(brush):
515
 
            final.matches(self.runner)
516
 
            self.runner.shutDown()
517
 
 
518
 
        self.runner.loadConfiguration(initial)
519
 
        self.runner.connect("timeline-configured", timelineConfigured)
520
 
 
521
 
        brush = Brush(self.runner)
522
 
        brush.connect("scrub-step", scrubStep)
523
 
        brush.connect("scrub-done", scrubDone)
524
 
 
525
 
        self.runner.run()
526
 
 
527
 
    def testRippleMoveSimple(self):
528
 
 
529
 
        initial = Configuration()
530
 
        initial.addSource('clip1', test1, {
531
 
            "duration" : gst.SECOND,
532
 
            "start" : gst.SECOND,
533
 
            "priority" : 2})
534
 
        initial.addSource('clip2', test1, {
535
 
            "duration" : gst.SECOND,
536
 
            "start" : 2 * gst.SECOND,
537
 
            "priority" : 5})
538
 
        final = Configuration()
539
 
        final.addSource('clip1', test1, {
540
 
            "duration" : gst.SECOND,
541
 
            "start" : 11 * gst.SECOND,
542
 
            "priority" : 0})
543
 
        final.addSource('clip2', test1, {
544
 
            "duration" : gst.SECOND,
545
 
            "start" : 12 * gst.SECOND,
546
 
            "priority" : 3})
547
 
 
548
 
        def timelineConfigured(runner):
549
 
            initial.matches(self.runner)
550
 
            context = MoveContext(self.runner.timeline,
551
 
                self.runner.video1.clip1, set())
552
 
            context.setMode(context.RIPPLE)
553
 
            brush.scrub(context, 11 * gst.SECOND, 0, steps=0)
554
 
 
555
 
        def scrubDone(brush):
556
 
            final.matches(self.runner)
557
 
            self.runner.shutDown()
558
 
 
559
 
        self.runner.connect("timeline-configured", timelineConfigured)
560
 
        brush = Brush(self.runner)
561
 
        brush.connect("scrub-done", scrubDone)
562
 
 
563
 
        self.runner.loadConfiguration(initial)
564
 
        self.runner.run()
565
 
 
566
 
    def testRippleTrimStartSimple(self):
567
 
        initial = Configuration()
568
 
        initial.addSource('clip1', test1,
569
 
            {
570
 
                "start" : gst.SECOND,
571
 
                "duration" : gst.SECOND,
572
 
            })
573
 
        initial.addSource('clip2', test1,
574
 
            {
575
 
                "start" : 2 * gst.SECOND,
576
 
                "duration" : gst.SECOND,
577
 
            })
578
 
        initial.addSource('clip3', test1,
579
 
            {
580
 
                "start" : 5 * gst.SECOND,
581
 
                "duration" : 10 * gst.SECOND,
582
 
            })
583
 
 
584
 
        final = Configuration()
585
 
        final.addSource('clip1', test1,
586
 
            {
587
 
                "start" : 6 * gst.SECOND,
588
 
                "duration": gst.SECOND,
589
 
            })
590
 
        final.addSource('clip2', test1,
591
 
            {
592
 
                "start" : 7 * gst.SECOND,
593
 
                "duration" : gst.SECOND,
594
 
            })
595
 
        final.addSource('clip3', test1,
596
 
            {
597
 
                "start" : 10 * gst.SECOND,
598
 
                "duration" : 5 * gst.SECOND,
599
 
            })
600
 
 
601
 
        self.runner.loadConfiguration(initial)
602
 
        def timelineConfigured(runner):
603
 
            context = TrimStartContext(self.runner.timeline,
604
 
                self.runner.video1.clip3, set())
605
 
            context.setMode(context.RIPPLE)
606
 
            brush.scrub(context, 10 * gst.SECOND, 0)
607
 
        self.runner.connect("timeline-configured", timelineConfigured)
608
 
 
609
 
        def scrubDone(brush):
610
 
            final.matches(self.runner)
611
 
            self.runner.shutDown()
612
 
 
613
 
        brush = Brush(self.runner)
614
 
        brush.connect("scrub-done", scrubDone)
615
 
        self.runner.run()
616
 
 
617
 
from pitivi.pipeline import PipelineError
618
 
 
619
 
class TestSeeking(Base):
620
 
 
621
 
    count = 0
622
 
    steps = 0
623
 
    cur_pos = 0
624
 
 
625
 
    config = Configuration()
626
 
    for i in xrange(0, 10):
627
 
        config.addSource("clip%d" % i, test1, {
628
 
            "start" : i * gst.SECOND,
629
 
            "duration" : gst.SECOND,
630
 
            "priority" : i % 2,
631
 
        })
632
 
 
633
 
 
634
 
    def _startSeeking(self, interval, steps=10):
635
 
        self.count = 0
636
 
        self.steps = steps
637
 
        self.positions = 0
638
 
        self.runner.project.pipeline.connect("position", self._positionCb)
639
 
        gobject.timeout_add(interval, self._seekTimeoutCb)
640
 
 
641
 
    def _seekTimeoutCb(self):
642
 
        if self.count < self.steps:
643
 
            self.runner.watchdog.keepAlive()
644
 
            self.count += 1
645
 
            self.cur_pos = random.randint(0,
646
 
                self.runner.timeline.duration)
647
 
            self.runner.project.pipeline.seek(self.cur_pos)
648
 
            return True
649
 
        self.failUnlessEqual(self.positions, self.count)
650
 
        self.runner.shutDown()
651
 
        return False
652
 
 
653
 
    def _positionCb(self, pipeline, position):
654
 
        self.positions += 1
655
 
        self.failUnlessEqual(position,
656
 
            self.cur_pos)
657
 
 
658
 
    def testSeeking(self):
659
 
 
660
 
        self.runner.loadConfiguration(self.config)
661
 
 
662
 
        def timelineConfigured(runner):
663
 
            self._startSeeking(100, 10)
664
 
 
665
 
        def timelineConfiguredNoUI(runner):
666
 
            self.runner.shutDown()
667
 
 
668
 
        if self.runner.no_ui:
669
 
            print "UI Disabled: Skipping Seeking Test. " \
670
 
                "Use ENABLE_UI to test" \
671
 
                " seeking"
672
 
            self.runner.connect("timeline-configured", timelineConfiguredNoUI)
673
 
        else:
674
 
            self.runner.connect("timeline-configured", timelineConfigured)
675
 
 
676
 
        self.runner.run()
677
 
 
678
 
class TestRippleExtensive(Base):
679
 
 
680
 
    """Test suite for ripple editing minutia and corner-cases"""
681
 
 
682
 
    def __init__(self, unknown):
683
 
        # The following set of tests share common configuration, harness, and
684
 
        # business logic. We create the configurations in the constructor to
685
 
        # avoid having to re-create them for every test.
686
 
 
687
 
        # create a seqence of adjacent clips in staggered formation, each one
688
 
        # second long
689
 
        self.initial = Configuration()
690
 
        self.finals = []
691
 
        for i in xrange(0, 10):
692
 
            self.initial.addSource('clip%d' % i, test1,
693
 
                { 'start' : gst.SECOND * i, 'duration' : gst.SECOND,
694
 
                    'priority' : i % 2 })
695
 
            # we're going to repeat the same operation using each clip as the
696
 
            # focus of the editing context. We create one final
697
 
            # configuration for the expected result of each scenario.
698
 
            final = Configuration()
699
 
            for j in xrange(0, 10):
700
 
                if j < i:
701
 
                    final.addSource('clip%d' % j, test1,
702
 
                        { 'start' : gst.SECOND * j,
703
 
                          'duration' : gst.SECOND,
704
 
                          'priority' : j % 2})
705
 
                else:
706
 
                    final.addSource('clip%d' % j, test1,
707
 
                        { 'start' : gst.SECOND * (j + 10),
708
 
                          'duration' : gst.SECOND,
709
 
                          'priority' : (j % 2) + 1})
710
 
            self.finals.append(final)
711
 
        Base.__init__(self, unknown)
712
 
 
713
 
    def setUp(self):
714
 
        Base.setUp(self)
715
 
        self.cur = 0
716
 
        self.context = None
717
 
        self.brush = Brush(self.runner)
718
 
        self.runner.loadConfiguration(self.initial)
719
 
        self.runner.connect("timeline-configured", self.timelineConfigured)
720
 
        self.brush.connect("scrub-done", self.scenarioDone)
721
 
 
722
 
    # when the timeline is configured, kick off the test by starting the
723
 
    # first scenario
724
 
    def timelineConfigured(self, runner):
725
 
        self.nextScenario()
726
 
 
727
 
    # for each scenario, create the context using the specified clip as
728
 
    # focus, and not specifying any other clips.
729
 
    def nextScenario(self):
730
 
        cur = self.cur
731
 
        clipname = "clip%d" % cur
732
 
        context = MoveContext(self.runner.timeline,
733
 
            getattr(self.runner.video1, clipname), set())
734
 
        context.snap(False)
735
 
        context.setMode(context.RIPPLE)
736
 
        self.context = context
737
 
        # this isn't a method, but an attribute that will be set by specific
738
 
        # test cases
739
 
        self.scrub_func(context, (cur + 10) * gst.SECOND, (cur % 2) + 1)
740
 
 
741
 
    # when each scrub has finished, verify the current configuration is
742
 
    # correct, reset the timeline, and kick off the next scenario. Shut down
743
 
    # pitivi when we have finished the last scenario.
744
 
    def scenarioDone(self, brush):
745
 
        cur = self.cur
746
 
        config = self.finals[cur]
747
 
        context = self.context
748
 
        context.finish()
749
 
        config.matches(self.runner)
750
 
        restore = MoveContext(self.runner.timeline, context.focus, set())
751
 
        restore.setMode(restore.RIPPLE)
752
 
        restore.editTo(cur * gst.SECOND, (cur % 2))
753
 
        restore.finish()
754
 
        self.initial.matches(self.runner)
755
 
        self.cur += 1
756
 
        if self.cur < 10:
757
 
            self.nextScenario()
758
 
        else:
759
 
            self.runner.shutDown()
760
 
 
761
 
    def testRippleMoveComplex(self):
762
 
        # in this test we move directly to the given position (steps=0)
763
 
        def rippleMoveComplexScrubFunc(context, position, priority):
764
 
            self.brush.scrub(context, position, priority, steps=0)
765
 
        self.scrub_func = rippleMoveComplexScrubFunc
766
 
        self.runner.run()
767
 
 
768
 
    def testRippleMoveComplexRandom(self):
769
 
        # same as above test, but scrub randomly (steps=100)
770
 
        # FIXME: this test fails for unknown reasons
771
 
        def rippleMoveComplexRandomScrubFunc(context, position, priority):
772
 
            self.brush.scrub(context, position, priority, steps=100)
773
 
        self.scrub_func = rippleMoveComplexRandomScrubFunc
774
 
        self.runner.run()
775
 
 
776
 
class TestTransitions(Base):
777
 
 
778
 
    def testSimple(self):
779
 
        initial = Configuration()
780
 
        initial.addSource(
781
 
            "object1",
782
 
            test1,
783
 
            {
784
 
                "start" : 0,
785
 
                "duration" : 5 * gst.SECOND,
786
 
                "priority" : 0,
787
 
            })
788
 
        initial.addSource(
789
 
            "object2",
790
 
            test1,
791
 
            {
792
 
                "start" : 5 * gst.SECOND,
793
 
                "duration" : 5 * gst.SECOND,
794
 
                "priority" : 0,
795
 
            })
796
 
        initial.addSource(
797
 
            "object3",
798
 
            test1,
799
 
            {
800
 
                "start" : 10 * gst.SECOND,
801
 
                "duration" : 5 * gst.SECOND,
802
 
                "priority" : 0,
803
 
            })
804
 
 
805
 
        moves = [
806
 
            (9 * gst.SECOND, 0),
807
 
            (1 * gst.SECOND, 0),
808
 
        ]
809
 
 
810
 
        expected = [
811
 
            ("object2", "object3", 10 * gst.SECOND, 4 * gst.SECOND, 0),
812
 
            ("object1", "object2", 1 * gst.SECOND, 4 * gst.SECOND, 0),
813
 
        ]
814
 
 
815
 
        def timelineConfigured(runner):
816
 
            nextMove()
817
 
 
818
 
        def nextMove():
819
 
            if moves:
820
 
                self._cur_move = moves.pop(0)
821
 
                context = MoveContext(self.runner.timeline,
822
 
                    self.runner.video1.object2,
823
 
                        set([self.runner.video1.object2]))
824
 
                brush.scrub(context, self._cur_move[0], self._cur_move[1], steps=10)
825
 
            else:
826
 
                self.runner.shutDown()
827
 
 
828
 
        def scrubDone(brush):
829
 
            a, b, start, duration, priority = expected.pop(0)
830
 
            a = getattr(self.runner.video1, a)
831
 
            b = getattr(self.runner.video1, b)
832
 
 
833
 
            tr = self.runner.video1.transitions[(a, b)]
834
 
 
835
 
            self.failUnlessEqual(b.start, start)
836
 
            self.failUnlessEqual(a.start + a.duration - start,
837
 
                duration)
838
 
            self.failUnlessEqual(tr.start, start)
839
 
            self.failUnlessEqual(tr.duration, duration)
840
 
            self.failUnlessEqual(tr.priority, 0)
841
 
            self.failUnlessEqual(a.priority, 0)
842
 
            self.failUnlessEqual(b.priority, 0)
843
 
            nextMove()
844
 
 
845
 
        self.runner.loadConfiguration(initial)
846
 
        self.runner.connect("timeline-configured", timelineConfigured)
847
 
 
848
 
        brush = Brush(self.runner)
849
 
        brush.connect("scrub-done", scrubDone)
850
 
 
851
 
        self.runner.run()
852
 
 
853
 
    def testNoTransitionWhenMovingMultipleClips(self):
854
 
        initial = Configuration()
855
 
        initial.addSource(
856
 
            "object1",
857
 
            test1,
858
 
            {
859
 
                "start" : 0,
860
 
                "duration" : 5 * gst.SECOND,
861
 
                "priority" : 0,
862
 
            })
863
 
        initial.addSource(
864
 
            "object2",
865
 
            test1,
866
 
            {
867
 
                "start" : 5 * gst.SECOND,
868
 
                "duration" : 5 * gst.SECOND,
869
 
                "priority" : 0,
870
 
            })
871
 
        initial.addSource(
872
 
            "object3",
873
 
            test1,
874
 
            {
875
 
                "start" : 10 * gst.SECOND,
876
 
                "duration" : 5 * gst.SECOND,
877
 
                "priority" : 0,
878
 
            })
879
 
 
880
 
        moves = [
881
 
            ("object1", 9 * gst.SECOND, 0),
882
 
            ("object3", 1* gst.SECOND, 0),
883
 
        ]
884
 
 
885
 
        def timelineConfigured(runner):
886
 
            nextMove()
887
 
 
888
 
        def nextMove():
889
 
            if moves:
890
 
                self._cur_move = moves.pop(0)
891
 
                other, start, priority = self._cur_move
892
 
                context = MoveContext(self.runner.timeline,
893
 
                    self.runner.video1.object2,
894
 
                        set([getattr(self.runner.video1, other)]))
895
 
                brush.scrub(context, start, priority, steps=10)
896
 
            else:
897
 
                self.runner.shutDown()
898
 
 
899
 
        def scrubDone(brush):
900
 
            self.failUnlessEqual(self.runner.video1.transitions,
901
 
                {})
902
 
            initial.matches(self.runner)
903
 
            nextMove()
904
 
 
905
 
        self.runner.loadConfiguration(initial)
906
 
        self.runner.connect("timeline-configured", timelineConfigured)
907
 
 
908
 
        brush = Brush(self.runner)
909
 
        brush.connect("scrub-done", scrubDone)
910
 
 
911
 
        self.runner.run()
912
 
 
913
 
    def testOverlapOnlyWithValidTransitions(self):
914
 
        initial = Configuration()
915
 
        initial.addSource(
916
 
            "object1",
917
 
            test1,
918
 
            {
919
 
                "start" : 0,
920
 
                "duration" : 5 * gst.SECOND,
921
 
                "priority" : 0,
922
 
            })
923
 
        initial.addSource(
924
 
            "object2",
925
 
            test1,
926
 
            {
927
 
                "start" : 5 * gst.SECOND,
928
 
                "duration" : 3 * gst.SECOND,
929
 
                "priority" : 0,
930
 
            })
931
 
        initial.addSource(
932
 
            "object3",
933
 
            test1,
934
 
            {
935
 
                "start" : 8 * gst.SECOND,
936
 
                "duration" : 5 * gst.SECOND,
937
 
                "priority" : 0,
938
 
            })
939
 
 
940
 
        phase2 = initial.clone()
941
 
        phase2.updateSource(
942
 
            "object2",
943
 
            props={
944
 
                "start" : 4 * gst.SECOND,
945
 
            })
946
 
 
947
 
        phase3 = phase2.clone()
948
 
        phase3.updateSource(
949
 
            "object3",
950
 
            props={
951
 
                "duration" : 1 * gst.SECOND
952
 
            })
953
 
 
954
 
        phase4 = initial.clone()
955
 
        phase4.updateSource(
956
 
            "object2",
957
 
            props={
958
 
                "start" : 3 * gst.SECOND,
959
 
            })
960
 
        phase4.updateSource(
961
 
            "object3",
962
 
            props={
963
 
                "start" : 5 * gst.SECOND,
964
 
                "duration" : 5 * gst.SECOND,
965
 
            })
966
 
 
967
 
        moves = [
968
 
            # [1------]    [3--[2==]]
969
 
            (MoveContext, "object2", 9 * gst.SECOND, 0, initial, []),
970
 
 
971
 
            # [1--[2=]]    [3-------]
972
 
            (MoveContext, "object2", 1 * gst.SECOND, 0, initial, []),
973
 
 
974
 
            # [1------]    [3-------]
975
 
            #        [2--]
976
 
            (MoveContext, "object2", 4 * gst.SECOND, 0, phase2,
977
 
                [("object1", "object2")]),
978
 
 
979
 
            # Activates overlap prevention
980
 
            # [1------]
981
 
            #      [3-------]
982
 
            #        [2--]
983
 
 
984
 
            (MoveContext, "object3", 3 * gst.SECOND, 0, phase2,
985
 
                [("object1", "object2")]),
986
 
 
987
 
            # [1------]  [3-]
988
 
            #        [2--]
989
 
            (TrimEndContext, "object3", 9 * gst.SECOND, 0, phase3,
990
 
                [("object1", "object2")]),
991
 
 
992
 
            # Activates overlap prevention
993
 
            # [1------]
994
 
            #        [3-]
995
 
            #        [2--]
996
 
            (MoveContext, "object3", 4 * gst.SECOND, 0, phase3,
997
 
                [("object1", "object2")]),
998
 
 
999
 
            # Activates overlap prevention
1000
 
            # [1------]
1001
 
            #       [3]
1002
 
            #        [2--]
1003
 
            (MoveContext, "object3", long(3.5 * gst.SECOND), 0, phase3,
1004
 
                [("object1", "object2")]),
1005
 
 
1006
 
            # Activates overlap prevention
1007
 
            # [1      ]
1008
 
            #         [3]
1009
 
            #        [2  ]
1010
 
            (MoveContext, "object3", long(4.5 * gst.SECOND), 0,
1011
 
                phase3, [("object1", "object2")]),
1012
 
 
1013
 
            # Next few commands build this arrangement
1014
 
            # [1      ]
1015
 
            #     [2    ]
1016
 
            #          [3   ]
1017
 
 
1018
 
            (MoveContext, "object2", 3 * gst.SECOND, 0,
1019
 
                None, None),
1020
 
            (MoveContext, "object3", 5 * gst.SECOND, 0,
1021
 
                None, None),
1022
 
            (TrimEndContext, "object3", 10 * gst.SECOND, 0,
1023
 
                phase4, [("object1", "object2"), ("object2",
1024
 
                    "object3")]),
1025
 
 
1026
 
            # Activates Overlap Prevention
1027
 
            # [1      ]
1028
 
            #     [2    ]
1029
 
            #       [3   ]
1030
 
 
1031
 
            (MoveContext, "object3", 4 * gst.SECOND, 0,
1032
 
                phase4, [("object1", "object2"),
1033
 
                    ("object2", "object3")]),
1034
 
 
1035
 
        ]
1036
 
 
1037
 
        nmoves = len(moves)
1038
 
 
1039
 
        def timelineConfigured(runner):
1040
 
            nextMove()
1041
 
 
1042
 
        def nextMove():
1043
 
            if moves:
1044
 
                print "cur_move: %d/%d" % (nmoves - len(moves) + 1, nmoves)
1045
 
                self._cur_move = moves.pop(0)
1046
 
                context, focus, start, priority, config, trans = self._cur_move
1047
 
                obj = getattr(self.runner.video1, focus)
1048
 
                context = context(self.runner.timeline,
1049
 
                     obj, set())
1050
 
                brush.scrub(context, start, priority, steps=10)
1051
 
            else:
1052
 
                self.runner.shutDown()
1053
 
 
1054
 
        def scrubDone(brush):
1055
 
            connect, focus, stream, priority, config, trans = self._cur_move
1056
 
 
1057
 
            if config:
1058
 
                config.matches(self.runner)
1059
 
 
1060
 
            if trans:
1061
 
                expected = set([(getattr(self.runner.video1, a),
1062
 
                    getattr(self.runner.video1, b)) for a, b in
1063
 
                        trans])
1064
 
 
1065
 
                self.failUnlessEqual(set(self.runner.video1.transitions.keys()),
1066
 
                   expected)
1067
 
            nextMove()
1068
 
 
1069
 
        self.runner.loadConfiguration(initial)
1070
 
        self.runner.connect("timeline-configured", timelineConfigured)
1071
 
 
1072
 
        brush = Brush(self.runner)
1073
 
        brush.connect("scrub-done", scrubDone)
1074
 
 
1075
 
        self.runner.run()
1076
 
 
1077
 
    def testSaveAndLoadWithTransitions(self):
1078
 
        pass