~0x44/nova/extdoc

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/trial/test/test_runner.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (c) 2005-2010 Twisted Matrix Laboratories.
 
2
# See LICENSE for details.
 
3
#
 
4
# Maintainer: Jonathan Lange
 
5
# Author: Robert Collins
 
6
 
 
7
 
 
8
import StringIO, os, sys
 
9
from zope.interface import implements
 
10
 
 
11
from twisted.trial.itrial import IReporter, ITestCase
 
12
from twisted.trial import unittest, runner, reporter, util
 
13
from twisted.python import failure, log, reflect, filepath
 
14
from twisted.scripts import trial
 
15
from twisted.plugins import twisted_trial
 
16
from twisted import plugin
 
17
from twisted.internet import defer
 
18
 
 
19
 
 
20
pyunit = __import__('unittest')
 
21
 
 
22
 
 
23
class CapturingDebugger(object):
 
24
 
 
25
    def __init__(self):
 
26
        self._calls = []
 
27
 
 
28
    def runcall(self, *args, **kwargs):
 
29
        self._calls.append('runcall')
 
30
        args[0](*args[1:], **kwargs)
 
31
 
 
32
 
 
33
 
 
34
class CapturingReporter(object):
 
35
    """
 
36
    Reporter that keeps a log of all actions performed on it.
 
37
    """
 
38
 
 
39
    implements(IReporter)
 
40
 
 
41
    stream = None
 
42
    tbformat = None
 
43
    args = None
 
44
    separator = None
 
45
    testsRun = None
 
46
 
 
47
    def __init__(self, stream=None, tbformat=None, rterrors=None,
 
48
                 publisher=None):
 
49
        """
 
50
        Create a capturing reporter.
 
51
        """
 
52
        self._calls = []
 
53
        self.shouldStop = False
 
54
        self._stream = stream
 
55
        self._tbformat = tbformat
 
56
        self._rterrors = rterrors
 
57
        self._publisher = publisher
 
58
 
 
59
 
 
60
    def startTest(self, method):
 
61
        """
 
62
        Report the beginning of a run of a single test method
 
63
        @param method: an object that is adaptable to ITestMethod
 
64
        """
 
65
        self._calls.append('startTest')
 
66
 
 
67
 
 
68
    def stopTest(self, method):
 
69
        """
 
70
        Report the status of a single test method
 
71
        @param method: an object that is adaptable to ITestMethod
 
72
        """
 
73
        self._calls.append('stopTest')
 
74
 
 
75
 
 
76
    def cleanupErrors(self, errs):
 
77
        """called when the reactor has been left in a 'dirty' state
 
78
        @param errs: a list of L{twisted.python.failure.Failure}s
 
79
        """
 
80
        self._calls.append('cleanupError')
 
81
 
 
82
 
 
83
    def addSuccess(self, test):
 
84
        self._calls.append('addSuccess')
 
85
 
 
86
 
 
87
    def done(self):
 
88
        """
 
89
        Do nothing. These tests don't care about done.
 
90
        """
 
91
 
 
92
 
 
93
 
 
94
class TrialRunnerTestsMixin:
 
95
    """
 
96
    Mixin defining tests for L{runner.TrialRunner}.
 
97
    """
 
98
    def tearDown(self):
 
99
        self.runner._tearDownLogFile()
 
100
 
 
101
 
 
102
    def test_empty(self):
 
103
        """
 
104
        Empty test method, used by the other tests.
 
105
        """
 
106
 
 
107
 
 
108
    def _getObservers(self):
 
109
        return log.theLogPublisher.observers
 
110
 
 
111
 
 
112
    def test_addObservers(self):
 
113
        """
 
114
        Any log system observers L{TrialRunner.run} adds are removed by the
 
115
        time it returns.
 
116
        """
 
117
        originalCount = len(self._getObservers())
 
118
        self.runner.run(self.test)
 
119
        newCount = len(self._getObservers())
 
120
        self.assertEqual(newCount, originalCount)
 
121
 
 
122
 
 
123
    def test_logFileAlwaysActive(self):
 
124
        """
 
125
        Test that a new file is opened on each run.
 
126
        """
 
127
        oldSetUpLogFile = self.runner._setUpLogFile
 
128
        l = []
 
129
        def setUpLogFile():
 
130
            oldSetUpLogFile()
 
131
            l.append(self.runner._logFileObserver)
 
132
        self.runner._setUpLogFile = setUpLogFile
 
133
        self.runner.run(self.test)
 
134
        self.runner.run(self.test)
 
135
        self.failUnlessEqual(len(l), 2)
 
136
        self.failIf(l[0] is l[1], "Should have created a new file observer")
 
137
 
 
138
 
 
139
    def test_logFileGetsClosed(self):
 
140
        """
 
141
        Test that file created is closed during the run.
 
142
        """
 
143
        oldSetUpLogFile = self.runner._setUpLogFile
 
144
        l = []
 
145
        def setUpLogFile():
 
146
            oldSetUpLogFile()
 
147
            l.append(self.runner._logFileObject)
 
148
        self.runner._setUpLogFile = setUpLogFile
 
149
        self.runner.run(self.test)
 
150
        self.failUnlessEqual(len(l), 1)
 
151
        self.failUnless(l[0].closed)
 
152
 
 
153
 
 
154
 
 
155
class TestTrialRunner(TrialRunnerTestsMixin, unittest.TestCase):
 
156
    """
 
157
    Tests for L{runner.TrialRunner} with the feature to turn unclean errors
 
158
    into warnings disabled.
 
159
    """
 
160
    def setUp(self):
 
161
        self.stream = StringIO.StringIO()
 
162
        self.runner = runner.TrialRunner(CapturingReporter, stream=self.stream)
 
163
        self.test = TestTrialRunner('test_empty')
 
164
 
 
165
 
 
166
    def test_publisher(self):
 
167
        """
 
168
        The reporter constructed by L{runner.TrialRunner} is passed
 
169
        L{twisted.python.log} as the value for the C{publisher} parameter.
 
170
        """
 
171
        result = self.runner._makeResult()
 
172
        self.assertIdentical(result._publisher, log)
 
173
 
 
174
 
 
175
 
 
176
class TrialRunnerWithUncleanWarningsReporter(TrialRunnerTestsMixin,
 
177
                                             unittest.TestCase):
 
178
    """
 
179
    Tests for the TrialRunner's interaction with an unclean-error suppressing
 
180
    reporter.
 
181
    """
 
182
 
 
183
    def setUp(self):
 
184
        self.stream = StringIO.StringIO()
 
185
        self.runner = runner.TrialRunner(CapturingReporter, stream=self.stream,
 
186
                                         uncleanWarnings=True)
 
187
        self.test = TestTrialRunner('test_empty')
 
188
 
 
189
 
 
190
 
 
191
class DryRunMixin(object):
 
192
 
 
193
    suppress = [util.suppress(
 
194
        category=DeprecationWarning,
 
195
        message="Test visitors deprecated in Twisted 8.0")]
 
196
 
 
197
 
 
198
    def setUp(self):
 
199
        self.log = []
 
200
        self.stream = StringIO.StringIO()
 
201
        self.runner = runner.TrialRunner(CapturingReporter,
 
202
                                         runner.TrialRunner.DRY_RUN,
 
203
                                         stream=self.stream)
 
204
        self.makeTestFixtures()
 
205
 
 
206
 
 
207
    def makeTestFixtures(self):
 
208
        """
 
209
        Set C{self.test} and C{self.suite}, where C{self.suite} is an empty
 
210
        TestSuite.
 
211
        """
 
212
 
 
213
 
 
214
    def test_empty(self):
 
215
        """
 
216
        If there are no tests, the reporter should not receive any events to
 
217
        report.
 
218
        """
 
219
        result = self.runner.run(runner.TestSuite())
 
220
        self.assertEqual(result._calls, [])
 
221
 
 
222
 
 
223
    def test_singleCaseReporting(self):
 
224
        """
 
225
        If we are running a single test, check the reporter starts, passes and
 
226
        then stops the test during a dry run.
 
227
        """
 
228
        result = self.runner.run(self.test)
 
229
        self.assertEqual(result._calls, ['startTest', 'addSuccess', 'stopTest'])
 
230
 
 
231
 
 
232
    def test_testsNotRun(self):
 
233
        """
 
234
        When we are doing a dry run, the tests should not actually be run.
 
235
        """
 
236
        self.runner.run(self.test)
 
237
        self.assertEqual(self.log, [])
 
238
 
 
239
 
 
240
 
 
241
class DryRunTest(DryRunMixin, unittest.TestCase):
 
242
    """
 
243
    Check that 'dry run' mode works well with Trial tests.
 
244
    """
 
245
    def makeTestFixtures(self):
 
246
        class MockTest(unittest.TestCase):
 
247
            def test_foo(test):
 
248
                self.log.append('test_foo')
 
249
        self.test = MockTest('test_foo')
 
250
        self.suite = runner.TestSuite()
 
251
 
 
252
 
 
253
 
 
254
class PyUnitDryRunTest(DryRunMixin, unittest.TestCase):
 
255
    """
 
256
    Check that 'dry run' mode works well with stdlib unittest tests.
 
257
    """
 
258
    def makeTestFixtures(self):
 
259
        class PyunitCase(pyunit.TestCase):
 
260
            def test_foo(self):
 
261
                pass
 
262
        self.test = PyunitCase('test_foo')
 
263
        self.suite = pyunit.TestSuite()
 
264
 
 
265
 
 
266
 
 
267
class TestRunner(unittest.TestCase):
 
268
    def setUp(self):
 
269
        self.config = trial.Options()
 
270
        # whitebox hack a reporter in, because plugins are CACHED and will
 
271
        # only reload if the FILE gets changed.
 
272
 
 
273
        parts = reflect.qual(CapturingReporter).split('.')
 
274
        package = '.'.join(parts[:-1])
 
275
        klass = parts[-1]
 
276
        plugins = [twisted_trial._Reporter(
 
277
            "Test Helper Reporter",
 
278
            package,
 
279
            description="Utility for unit testing.",
 
280
            longOpt="capturing",
 
281
            shortOpt=None,
 
282
            klass=klass)]
 
283
 
 
284
 
 
285
        # XXX There should really be a general way to hook the plugin system
 
286
        # for tests.
 
287
        def getPlugins(iface, *a, **kw):
 
288
            self.assertEqual(iface, IReporter)
 
289
            return plugins + list(self.original(iface, *a, **kw))
 
290
 
 
291
        self.original = plugin.getPlugins
 
292
        plugin.getPlugins = getPlugins
 
293
 
 
294
        self.standardReport = ['startTest', 'addSuccess', 'stopTest',
 
295
                               'startTest', 'addSuccess', 'stopTest',
 
296
                               'startTest', 'addSuccess', 'stopTest',
 
297
                               'startTest', 'addSuccess', 'stopTest',
 
298
                               'startTest', 'addSuccess', 'stopTest',
 
299
                               'startTest', 'addSuccess', 'stopTest',
 
300
                               'startTest', 'addSuccess', 'stopTest']
 
301
 
 
302
 
 
303
    def tearDown(self):
 
304
        plugin.getPlugins = self.original
 
305
 
 
306
 
 
307
    def parseOptions(self, args):
 
308
        self.config.parseOptions(args)
 
309
 
 
310
 
 
311
    def getRunner(self):
 
312
        r = trial._makeRunner(self.config)
 
313
        r.stream = StringIO.StringIO()
 
314
        # XXX The runner should always take care of cleaning this up itself.
 
315
        # It's not clear why this is necessary.  The runner always tears down
 
316
        # its log file.
 
317
        self.addCleanup(r._tearDownLogFile)
 
318
        # XXX The runner should always take care of cleaning this up itself as
 
319
        # well.  It's necessary because TrialRunner._setUpTestdir might raise
 
320
        # an exception preventing Reporter.done from being run, leaving the
 
321
        # observer added by Reporter.__init__ still present in the system.
 
322
        # Something better needs to happen inside
 
323
        # TrialRunner._runWithoutDecoration to remove the need for this cludge.
 
324
        r._log = log.LogPublisher()
 
325
        return r
 
326
 
 
327
 
 
328
    def test_runner_can_get_reporter(self):
 
329
        self.parseOptions([])
 
330
        result = self.config['reporter']
 
331
        runner = self.getRunner()
 
332
        self.assertEqual(result, runner._makeResult().__class__)
 
333
 
 
334
 
 
335
    def test_runner_get_result(self):
 
336
        self.parseOptions([])
 
337
        runner = self.getRunner()
 
338
        result = runner._makeResult()
 
339
        self.assertEqual(result.__class__, self.config['reporter'])
 
340
 
 
341
 
 
342
    def test_uncleanWarningsOffByDefault(self):
 
343
        """
 
344
        By default Trial sets the 'uncleanWarnings' option on the runner to
 
345
        False. This means that dirty reactor errors will be reported as
 
346
        errors. See L{test_reporter.TestDirtyReactor}.
 
347
        """
 
348
        self.parseOptions([])
 
349
        runner = self.getRunner()
 
350
        self.assertNotIsInstance(runner._makeResult(),
 
351
                                 reporter.UncleanWarningsReporterWrapper)
 
352
 
 
353
 
 
354
    def test_getsUncleanWarnings(self):
 
355
        """
 
356
        Specifying '--unclean-warnings' on the trial command line will cause
 
357
        reporters to be wrapped in a device which converts unclean errors to
 
358
        warnings.  See L{test_reporter.TestDirtyReactor} for implications.
 
359
        """
 
360
        self.parseOptions(['--unclean-warnings'])
 
361
        runner = self.getRunner()
 
362
        self.assertIsInstance(runner._makeResult(),
 
363
                              reporter.UncleanWarningsReporterWrapper)
 
364
 
 
365
 
 
366
    def test_runner_working_directory(self):
 
367
        self.parseOptions(['--temp-directory', 'some_path'])
 
368
        runner = self.getRunner()
 
369
        self.assertEquals(runner.workingDirectory, 'some_path')
 
370
 
 
371
 
 
372
    def test_concurrentImplicitWorkingDirectory(self):
 
373
        """
 
374
        If no working directory is explicitly specified and the default
 
375
        working directory is in use by another runner, L{TrialRunner.run}
 
376
        selects a different default working directory to use.
 
377
        """
 
378
        self.parseOptions([])
 
379
 
 
380
        initialDirectory = os.getcwd()
 
381
        self.addCleanup(os.chdir, initialDirectory)
 
382
 
 
383
        firstRunner = self.getRunner()
 
384
        secondRunner = self.getRunner()
 
385
 
 
386
        where = {}
 
387
 
 
388
        class ConcurrentCase(unittest.TestCase):
 
389
            def test_first(self):
 
390
                """
 
391
                Start a second test run which will have a default working
 
392
                directory which is the same as the working directory of the
 
393
                test run already in progress.
 
394
                """
 
395
                # Change the working directory to the value it had before this
 
396
                # test suite was started.
 
397
                where['concurrent'] = subsequentDirectory = os.getcwd()
 
398
                os.chdir(initialDirectory)
 
399
                self.addCleanup(os.chdir, subsequentDirectory)
 
400
 
 
401
                secondRunner.run(ConcurrentCase('test_second'))
 
402
 
 
403
            def test_second(self):
 
404
                """
 
405
                Record the working directory for later analysis.
 
406
                """
 
407
                where['record'] = os.getcwd()
 
408
 
 
409
        result = firstRunner.run(ConcurrentCase('test_first'))
 
410
        bad = result.errors + result.failures
 
411
        if bad:
 
412
            self.fail(bad[0][1])
 
413
        self.assertEqual(
 
414
            where, {
 
415
                'concurrent': os.path.join(initialDirectory, '_trial_temp'),
 
416
                'record': os.path.join(initialDirectory, '_trial_temp-1')})
 
417
 
 
418
 
 
419
    def test_concurrentExplicitWorkingDirectory(self):
 
420
        """
 
421
        If a working directory which is already in use is explicitly specified,
 
422
        L{TrialRunner.run} raises L{_WorkingDirectoryBusy}.
 
423
        """
 
424
        self.parseOptions(['--temp-directory', os.path.abspath(self.mktemp())])
 
425
 
 
426
        initialDirectory = os.getcwd()
 
427
        self.addCleanup(os.chdir, initialDirectory)
 
428
 
 
429
        firstRunner = self.getRunner()
 
430
        secondRunner = self.getRunner()
 
431
 
 
432
        class ConcurrentCase(unittest.TestCase):
 
433
            def test_concurrent(self):
 
434
                """
 
435
                Try to start another runner in the same working directory and
 
436
                assert that it raises L{_WorkingDirectoryBusy}.
 
437
                """
 
438
                self.assertRaises(
 
439
                    runner._WorkingDirectoryBusy,
 
440
                    secondRunner.run, ConcurrentCase('test_failure'))
 
441
 
 
442
            def test_failure(self):
 
443
                """
 
444
                Should not be called, always fails.
 
445
                """
 
446
                self.fail("test_failure should never be called.")
 
447
 
 
448
        result = firstRunner.run(ConcurrentCase('test_concurrent'))
 
449
        bad = result.errors + result.failures
 
450
        if bad:
 
451
            self.fail(bad[0][1])
 
452
 
 
453
 
 
454
    def test_runner_normal(self):
 
455
        self.parseOptions(['--temp-directory', self.mktemp(),
 
456
                           '--reporter', 'capturing',
 
457
                           'twisted.trial.test.sample'])
 
458
        my_runner = self.getRunner()
 
459
        loader = runner.TestLoader()
 
460
        suite = loader.loadByName('twisted.trial.test.sample', True)
 
461
        result = my_runner.run(suite)
 
462
        self.assertEqual(self.standardReport, result._calls)
 
463
 
 
464
 
 
465
    def test_runner_debug(self):
 
466
        self.parseOptions(['--reporter', 'capturing',
 
467
                           '--debug', 'twisted.trial.test.sample'])
 
468
        my_runner = self.getRunner()
 
469
        debugger = CapturingDebugger()
 
470
        def get_debugger():
 
471
            return debugger
 
472
        my_runner._getDebugger = get_debugger
 
473
        loader = runner.TestLoader()
 
474
        suite = loader.loadByName('twisted.trial.test.sample', True)
 
475
        result = my_runner.run(suite)
 
476
        self.assertEqual(self.standardReport, result._calls)
 
477
        self.assertEqual(['runcall'], debugger._calls)
 
478
 
 
479
 
 
480
    def test_removeSafelyNoTrialMarker(self):
 
481
        """
 
482
        If a path doesn't contain a node named C{"_trial_marker"}, that path is
 
483
        not removed by L{runner._removeSafely} and a L{runner._NoTrialMarker}
 
484
        exception is raised instead.
 
485
        """
 
486
        directory = self.mktemp()
 
487
        os.mkdir(directory)
 
488
        dirPath = filepath.FilePath(directory)
 
489
 
 
490
        self.parseOptions([])
 
491
        myRunner = self.getRunner()
 
492
        self.assertRaises(runner._NoTrialMarker,
 
493
                          myRunner._removeSafely, dirPath)
 
494
 
 
495
 
 
496
    def test_removeSafelyRemoveFailsMoveSucceeds(self):
 
497
        """
 
498
        If an L{OSError} is raised while removing a path in
 
499
        L{runner._removeSafely}, an attempt is made to move the path to a new
 
500
        name.
 
501
        """
 
502
        def dummyRemove():
 
503
            """
 
504
            Raise an C{OSError} to emulate the branch of L{runner._removeSafely}
 
505
            in which path removal fails.
 
506
            """
 
507
            raise OSError()
 
508
 
 
509
        # Patch stdout so we can check the print statements in _removeSafely
 
510
        out = StringIO.StringIO()
 
511
        stdout = self.patch(sys, 'stdout', out)
 
512
 
 
513
        # Set up a trial directory with a _trial_marker
 
514
        directory = self.mktemp()
 
515
        os.mkdir(directory)
 
516
        dirPath = filepath.FilePath(directory)
 
517
        dirPath.child('_trial_marker').touch()
 
518
        # Ensure that path.remove() raises an OSError
 
519
        dirPath.remove = dummyRemove
 
520
 
 
521
        self.parseOptions([])
 
522
        myRunner = self.getRunner()
 
523
        myRunner._removeSafely(dirPath)
 
524
        self.assertIn("could not remove FilePath", out.getvalue())
 
525
 
 
526
 
 
527
    def test_removeSafelyRemoveFailsMoveFails(self):
 
528
        """
 
529
        If an L{OSError} is raised while removing a path in
 
530
        L{runner._removeSafely}, an attempt is made to move the path to a new
 
531
        name. If that attempt fails, the L{OSError} is re-raised.
 
532
        """
 
533
        def dummyRemove():
 
534
            """
 
535
            Raise an C{OSError} to emulate the branch of L{runner._removeSafely}
 
536
            in which path removal fails.
 
537
            """
 
538
            raise OSError("path removal failed")
 
539
 
 
540
        def dummyMoveTo(path):
 
541
            """
 
542
            Raise an C{OSError} to emulate the branch of L{runner._removeSafely}
 
543
            in which path movement fails.
 
544
            """
 
545
            raise OSError("path movement failed")
 
546
 
 
547
        # Patch stdout so we can check the print statements in _removeSafely
 
548
        out = StringIO.StringIO()
 
549
        stdout = self.patch(sys, 'stdout', out)
 
550
 
 
551
        # Set up a trial directory with a _trial_marker
 
552
        directory = self.mktemp()
 
553
        os.mkdir(directory)
 
554
        dirPath = filepath.FilePath(directory)
 
555
        dirPath.child('_trial_marker').touch()
 
556
 
 
557
        # Ensure that path.remove() and path.moveTo() both raise OSErrors
 
558
        dirPath.remove = dummyRemove
 
559
        dirPath.moveTo = dummyMoveTo
 
560
 
 
561
        self.parseOptions([])
 
562
        myRunner = self.getRunner()
 
563
        error = self.assertRaises(OSError, myRunner._removeSafely, dirPath)
 
564
        self.assertEquals(str(error), "path movement failed")
 
565
        self.assertIn("could not remove FilePath", out.getvalue())
 
566
 
 
567
 
 
568
 
 
569
class TestTrialSuite(unittest.TestCase):
 
570
 
 
571
    def test_imports(self):
 
572
        # FIXME, HTF do you test the reactor can be cleaned up ?!!!
 
573
        from twisted.trial.runner import TrialSuite
 
574
        # silence pyflakes warning
 
575
        silencePyflakes = TrialSuite
 
576
 
 
577
 
 
578
 
 
579
 
 
580
class TestUntilFailure(unittest.TestCase):
 
581
    class FailAfter(unittest.TestCase):
 
582
        """
 
583
        A test  case that fails when run 3 times in a row.
 
584
        """
 
585
        count = []
 
586
        def test_foo(self):
 
587
            self.count.append(None)
 
588
            if len(self.count) == 3:
 
589
                self.fail('Count reached 3')
 
590
 
 
591
 
 
592
    def setUp(self):
 
593
        TestUntilFailure.FailAfter.count = []
 
594
        self.test = TestUntilFailure.FailAfter('test_foo')
 
595
        self.stream = StringIO.StringIO()
 
596
        self.runner = runner.TrialRunner(reporter.Reporter, stream=self.stream)
 
597
 
 
598
 
 
599
    def test_runUntilFailure(self):
 
600
        """
 
601
        Test that the runUntilFailure method of the runner actually fail after
 
602
        a few runs.
 
603
        """
 
604
        result = self.runner.runUntilFailure(self.test)
 
605
        self.failUnlessEqual(result.testsRun, 1)
 
606
        self.failIf(result.wasSuccessful())
 
607
        self.assertEquals(self._getFailures(result), 1)
 
608
 
 
609
 
 
610
    def _getFailures(self, result):
 
611
        """
 
612
        Get the number of failures that were reported to a result.
 
613
        """
 
614
        return len(result.failures)
 
615
 
 
616
 
 
617
    def test_runUntilFailureDecorate(self):
 
618
        """
 
619
        C{runUntilFailure} doesn't decorate the tests uselessly: it does it one
 
620
        time when run starts, but not at each turn.
 
621
        """
 
622
        decorated = []
 
623
        def decorate(test, interface):
 
624
            decorated.append((test, interface))
 
625
            return test
 
626
        self.patch(unittest, "decorate", decorate)
 
627
        result = self.runner.runUntilFailure(self.test)
 
628
        self.failUnlessEqual(result.testsRun, 1)
 
629
 
 
630
        self.assertEquals(len(decorated), 1)
 
631
        self.assertEquals(decorated, [(self.test, ITestCase)])
 
632
 
 
633
 
 
634
    def test_runUntilFailureForceGCDecorate(self):
 
635
        """
 
636
        C{runUntilFailure} applies the force-gc decoration after the standard
 
637
        L{ITestCase} decoration, but only one time.
 
638
        """
 
639
        decorated = []
 
640
        def decorate(test, interface):
 
641
            decorated.append((test, interface))
 
642
            return test
 
643
        self.patch(unittest, "decorate", decorate)
 
644
        self.runner._forceGarbageCollection = True
 
645
        result = self.runner.runUntilFailure(self.test)
 
646
        self.failUnlessEqual(result.testsRun, 1)
 
647
 
 
648
        self.assertEquals(len(decorated), 2)
 
649
        self.assertEquals(decorated,
 
650
            [(self.test, ITestCase),
 
651
             (self.test, unittest._ForceGarbageCollectionDecorator)])
 
652
 
 
653
 
 
654
 
 
655
class UncleanUntilFailureTests(TestUntilFailure):
 
656
    """
 
657
    Test that the run-until-failure feature works correctly with the unclean
 
658
    error suppressor.
 
659
    """
 
660
 
 
661
    def setUp(self):
 
662
        TestUntilFailure.setUp(self)
 
663
        self.runner = runner.TrialRunner(reporter.Reporter, stream=self.stream,
 
664
                                         uncleanWarnings=True)
 
665
 
 
666
    def _getFailures(self, result):
 
667
        """
 
668
        Get the number of failures that were reported to a result that
 
669
        is wrapped in an UncleanFailureWrapper.
 
670
        """
 
671
        return len(result._originalReporter.failures)
 
672
 
 
673
 
 
674
 
 
675
class BreakingSuite(runner.TestSuite):
 
676
    """
 
677
    A L{TestSuite} that logs an error when it is run.
 
678
    """
 
679
 
 
680
    def run(self, result):
 
681
        try:
 
682
            raise RuntimeError("error that occurs outside of a test")
 
683
        except RuntimeError, e:
 
684
            log.err(failure.Failure())
 
685
 
 
686
 
 
687
 
 
688
class TestLoggedErrors(unittest.TestCase):
 
689
    """
 
690
    It is possible for an error generated by a test to be logged I{outside} of
 
691
    any test. The log observers constructed by L{TestCase} won't catch these
 
692
    errors. Here we try to generate such errors and ensure they are reported to
 
693
    a L{TestResult} object.
 
694
    """
 
695
 
 
696
    def tearDown(self):
 
697
        self.flushLoggedErrors(RuntimeError)
 
698
 
 
699
 
 
700
    def test_construct(self):
 
701
        """
 
702
        Check that we can construct a L{runner.LoggedSuite} and that it
 
703
        starts empty.
 
704
        """
 
705
        suite = runner.LoggedSuite()
 
706
        self.assertEqual(suite.countTestCases(), 0)
 
707
 
 
708
 
 
709
    def test_capturesError(self):
 
710
        """
 
711
        Chek that a L{LoggedSuite} reports any logged errors to its result.
 
712
        """
 
713
        result = reporter.TestResult()
 
714
        suite = runner.LoggedSuite([BreakingSuite()])
 
715
        suite.run(result)
 
716
        self.assertEqual(len(result.errors), 1)
 
717
        self.assertEqual(result.errors[0][0].id(), runner.NOT_IN_TEST)
 
718
        self.failUnless(result.errors[0][1].check(RuntimeError))
 
719
 
 
720
 
 
721
 
 
722
class TestTestHolder(unittest.TestCase):
 
723
 
 
724
    def setUp(self):
 
725
        self.description = "description"
 
726
        self.holder = runner.TestHolder(self.description)
 
727
 
 
728
 
 
729
    def test_holder(self):
 
730
        """
 
731
        Check that L{runner.TestHolder} takes a description as a parameter
 
732
        and that this description is returned by the C{id} and
 
733
        C{shortDescription} methods.
 
734
        """
 
735
        self.assertEqual(self.holder.id(), self.description)
 
736
        self.assertEqual(self.holder.shortDescription(), self.description)
 
737
 
 
738
 
 
739
    def test_holderImplementsITestCase(self):
 
740
        """
 
741
        L{runner.TestHolder} implements L{ITestCase}.
 
742
        """
 
743
        self.assertIdentical(self.holder, ITestCase(self.holder))
 
744
 
 
745
 
 
746
 
 
747
class TestErrorHolder(TestTestHolder):
 
748
    """
 
749
    Test L{runner.ErrorHolder} shares behaviour with L{runner.TestHolder}.
 
750
    """
 
751
 
 
752
    def setUp(self):
 
753
        self.description = "description"
 
754
        # make a real Failure so we can construct ErrorHolder()
 
755
        try:
 
756
            1/0
 
757
        except ZeroDivisionError:
 
758
            error = failure.Failure()
 
759
        self.holder = runner.ErrorHolder(self.description, error)
 
760
 
 
761
 
 
762
 
 
763
class TestMalformedMethod(unittest.TestCase):
 
764
    """
 
765
    Test that trial manages when test methods don't have correct signatures.
 
766
    """
 
767
    class ContainMalformed(unittest.TestCase):
 
768
        """
 
769
        This TestCase holds malformed test methods that trial should handle.
 
770
        """
 
771
        def test_foo(self, blah):
 
772
            pass
 
773
        def test_bar():
 
774
            pass
 
775
        test_spam = defer.deferredGenerator(test_bar)
 
776
 
 
777
    def _test(self, method):
 
778
        """
 
779
        Wrapper for one of the test method of L{ContainMalformed}.
 
780
        """
 
781
        stream = StringIO.StringIO()
 
782
        trialRunner = runner.TrialRunner(reporter.Reporter, stream=stream)
 
783
        test = TestMalformedMethod.ContainMalformed(method)
 
784
        result = trialRunner.run(test)
 
785
        self.failUnlessEqual(result.testsRun, 1)
 
786
        self.failIf(result.wasSuccessful())
 
787
        self.failUnlessEqual(len(result.errors), 1)
 
788
 
 
789
    def test_extraArg(self):
 
790
        """
 
791
        Test when the method has extra (useless) arguments.
 
792
        """
 
793
        self._test('test_foo')
 
794
 
 
795
    def test_noArg(self):
 
796
        """
 
797
        Test when the method doesn't have even self as argument.
 
798
        """
 
799
        self._test('test_bar')
 
800
 
 
801
    def test_decorated(self):
 
802
        """
 
803
        Test a decorated method also fails.
 
804
        """
 
805
        self._test('test_spam')
 
806
 
 
807
 
 
808
 
 
809
class DestructiveTestSuiteTestCase(unittest.TestCase):
 
810
    """
 
811
    Test for L{runner.DestructiveTestSuite}.
 
812
    """
 
813
 
 
814
    def test_basic(self):
 
815
        """
 
816
        Thes destructive test suite should run the tests normally.
 
817
        """
 
818
        called = []
 
819
        class MockTest(unittest.TestCase):
 
820
            def test_foo(test):
 
821
                called.append(True)
 
822
        test = MockTest('test_foo')
 
823
        result = reporter.TestResult()
 
824
        suite = runner.DestructiveTestSuite([test])
 
825
        self.assertEquals(called, [])
 
826
        suite.run(result)
 
827
        self.assertEquals(called, [True])
 
828
        self.assertEquals(suite.countTestCases(), 0)
 
829
 
 
830
 
 
831
    def test_shouldStop(self):
 
832
        """
 
833
        Test the C{shouldStop} management: raising a C{KeyboardInterrupt} must
 
834
        interrupt the suite.
 
835
        """
 
836
        called = []
 
837
        class MockTest(unittest.TestCase):
 
838
            def test_foo1(test):
 
839
                called.append(1)
 
840
            def test_foo2(test):
 
841
                raise KeyboardInterrupt()
 
842
            def test_foo3(test):
 
843
                called.append(2)
 
844
        result = reporter.TestResult()
 
845
        loader = runner.TestLoader()
 
846
        loader.suiteFactory = runner.DestructiveTestSuite
 
847
        suite = loader.loadClass(MockTest)
 
848
        self.assertEquals(called, [])
 
849
        suite.run(result)
 
850
        self.assertEquals(called, [1])
 
851
        # The last test shouldn't have been run
 
852
        self.assertEquals(suite.countTestCases(), 1)
 
853
 
 
854
 
 
855
    def test_cleanup(self):
 
856
        """
 
857
        Checks that the test suite cleanups its tests during the run, so that
 
858
        it ends empty.
 
859
        """
 
860
        class MockTest(unittest.TestCase):
 
861
            def test_foo(test):
 
862
                pass
 
863
        test = MockTest('test_foo')
 
864
        result = reporter.TestResult()
 
865
        suite = runner.DestructiveTestSuite([test])
 
866
        self.assertEquals(suite.countTestCases(), 1)
 
867
        suite.run(result)
 
868
        self.assertEquals(suite.countTestCases(), 0)
 
869
 
 
870
 
 
871
 
 
872
class TestRunnerDeprecation(unittest.TestCase):
 
873
 
 
874
    class FakeReporter(reporter.Reporter):
 
875
        """
 
876
        Fake reporter that does *not* implement done() but *does* implement
 
877
        printErrors, separator, printSummary, stream, write and writeln
 
878
        without deprecations.
 
879
        """
 
880
 
 
881
        done = None
 
882
        separator = None
 
883
        stream = None
 
884
 
 
885
        def printErrors(self, *args):
 
886
            pass
 
887
 
 
888
        def printSummary(self, *args):
 
889
            pass
 
890
 
 
891
        def write(self, *args):
 
892
            pass
 
893
 
 
894
        def writeln(self, *args):
 
895
            pass
 
896
 
 
897
 
 
898
    def test_reporterDeprecations(self):
 
899
        """
 
900
        The runner emits a warning if it is using a result that doesn't
 
901
        implement 'done'.
 
902
        """
 
903
        trialRunner = runner.TrialRunner(None)
 
904
        result = self.FakeReporter()
 
905
        trialRunner._makeResult = lambda: result
 
906
        def f():
 
907
            # We have to use a pyunit test, otherwise we'll get deprecation
 
908
            # warnings about using iterate() in a test.
 
909
            trialRunner.run(pyunit.TestCase('id'))
 
910
        self.assertWarns(
 
911
            DeprecationWarning,
 
912
            "%s should implement done() but doesn't. Falling back to "
 
913
            "printErrors() and friends." % reflect.qual(result.__class__),
 
914
            __file__, f)