~ntt-pf-lab/nova/monkey_patch_notification

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/trial/test/test_reporter.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (c) 2001-2009 Twisted Matrix Laboratories.
 
2
# See LICENSE for details.
 
3
#
 
4
# Maintainer: Jonathan Lange
 
5
 
 
6
"""
 
7
Tests for L{twisted.trial.reporter}.
 
8
"""
 
9
 
 
10
 
 
11
import errno, sys, os, re, StringIO
 
12
 
 
13
from twisted.internet.utils import suppressWarnings
 
14
from twisted.python import log
 
15
from twisted.python.failure import Failure
 
16
from twisted.trial import itrial, unittest, runner, reporter, util
 
17
from twisted.trial.reporter import UncleanWarningsReporterWrapper
 
18
from twisted.trial.test import erroneous
 
19
from twisted.trial.unittest import makeTodo, SkipTest, Todo
 
20
 
 
21
 
 
22
class BrokenStream(object):
 
23
    """
 
24
    Stream-ish object that raises a signal interrupt error. We use this to make
 
25
    sure that Trial still manages to write what it needs to write.
 
26
    """
 
27
    written = False
 
28
    flushed = False
 
29
 
 
30
    def __init__(self, fObj):
 
31
        self.fObj = fObj
 
32
 
 
33
    def write(self, s):
 
34
        if self.written:
 
35
            return self.fObj.write(s)
 
36
        self.written = True
 
37
        raise IOError(errno.EINTR, "Interrupted write")
 
38
 
 
39
    def flush(self):
 
40
        if self.flushed:
 
41
            return self.fObj.flush()
 
42
        self.flushed = True
 
43
        raise IOError(errno.EINTR, "Interrupted flush")
 
44
 
 
45
 
 
46
class StringTest(unittest.TestCase):
 
47
    def stringComparison(self, expect, output):
 
48
        output = filter(None, output)
 
49
        self.failUnless(len(expect) <= len(output),
 
50
                        "Must have more observed than expected"
 
51
                        "lines %d < %d" % (len(output), len(expect)))
 
52
        REGEX_PATTERN_TYPE = type(re.compile(''))
 
53
        for line_number, (exp, out) in enumerate(zip(expect, output)):
 
54
            if exp is None:
 
55
                continue
 
56
            elif isinstance(exp, str):
 
57
                self.assertSubstring(exp, out, "Line %d: %r not in %r"
 
58
                                     % (line_number, exp, out))
 
59
            elif isinstance(exp, REGEX_PATTERN_TYPE):
 
60
                self.failUnless(exp.match(out),
 
61
                                "Line %d: %r did not match string %r"
 
62
                                % (line_number, exp.pattern, out))
 
63
            else:
 
64
                raise TypeError("don't know what to do with object %r"
 
65
                                % (exp,))
 
66
 
 
67
 
 
68
class TestTestResult(unittest.TestCase):
 
69
    def setUp(self):
 
70
        self.result = reporter.TestResult()
 
71
 
 
72
    def test_pyunitAddError(self):
 
73
        # pyunit passes an exc_info tuple directly to addError
 
74
        try:
 
75
            raise RuntimeError('foo')
 
76
        except RuntimeError, excValue:
 
77
            self.result.addError(self, sys.exc_info())
 
78
        failure = self.result.errors[0][1]
 
79
        self.assertEqual(excValue, failure.value)
 
80
        self.assertEqual(RuntimeError, failure.type)
 
81
 
 
82
    def test_pyunitAddFailure(self):
 
83
        # pyunit passes an exc_info tuple directly to addFailure
 
84
        try:
 
85
            raise self.failureException('foo')
 
86
        except self.failureException, excValue:
 
87
            self.result.addFailure(self, sys.exc_info())
 
88
        failure = self.result.failures[0][1]
 
89
        self.assertEqual(excValue, failure.value)
 
90
        self.assertEqual(self.failureException, failure.type)
 
91
 
 
92
 
 
93
class TestReporterRealtime(TestTestResult):
 
94
    def setUp(self):
 
95
        output = StringIO.StringIO()
 
96
        self.result = reporter.Reporter(output, realtime=True)
 
97
 
 
98
 
 
99
class TestErrorReporting(StringTest):
 
100
    doubleSeparator = re.compile(r'^=+$')
 
101
 
 
102
    def setUp(self):
 
103
        self.loader = runner.TestLoader()
 
104
        self.output = StringIO.StringIO()
 
105
        self.result = reporter.Reporter(self.output)
 
106
 
 
107
    def getOutput(self, suite):
 
108
        result = self.getResult(suite)
 
109
        result.done()
 
110
        return self.output.getvalue()
 
111
 
 
112
    def getResult(self, suite):
 
113
        suite.run(self.result)
 
114
        return self.result
 
115
 
 
116
    def testFormatErroredMethod(self):
 
117
        suite = self.loader.loadClass(erroneous.TestFailureInSetUp)
 
118
        output = self.getOutput(suite).splitlines()
 
119
        match = [self.doubleSeparator,
 
120
                 ('[ERROR]: twisted.trial.test.erroneous.'
 
121
                  'TestFailureInSetUp.test_noop'),
 
122
                 'Traceback (most recent call last):',
 
123
                 re.compile(r'^\s+File .*erroneous\.py., line \d+, in setUp$'),
 
124
                 re.compile(r'^\s+raise FoolishError, '
 
125
                            r'.I am a broken setUp method.$'),
 
126
                 ('twisted.trial.test.erroneous.FoolishError: '
 
127
                  'I am a broken setUp method')]
 
128
        self.stringComparison(match, output)
 
129
 
 
130
    def testFormatFailedMethod(self):
 
131
        suite = self.loader.loadMethod(erroneous.TestRegularFail.test_fail)
 
132
        output = self.getOutput(suite).splitlines()
 
133
        match = [
 
134
            self.doubleSeparator,
 
135
            '[FAIL]: '
 
136
            'twisted.trial.test.erroneous.TestRegularFail.test_fail',
 
137
            'Traceback (most recent call last):',
 
138
            re.compile(r'^\s+File .*erroneous\.py., line \d+, in test_fail$'),
 
139
            re.compile(r'^\s+self\.fail\("I fail"\)$'),
 
140
            'twisted.trial.unittest.FailTest: I fail'
 
141
            ]
 
142
        self.stringComparison(match, output)
 
143
 
 
144
    def testDoctestError(self):
 
145
        from twisted.trial.test import erroneous
 
146
        suite = unittest.decorate(
 
147
            self.loader.loadDoctests(erroneous), itrial.ITestCase)
 
148
        output = self.getOutput(suite)
 
149
        path = 'twisted.trial.test.erroneous.unexpectedException'
 
150
        for substring in ['1/0', 'ZeroDivisionError',
 
151
                          'Exception raised:', path]:
 
152
            self.assertSubstring(substring, output)
 
153
        self.failUnless(re.search('Fail(ed|ure in) example:', output),
 
154
                        "Couldn't match 'Failure in example: ' "
 
155
                        "or 'Failed example: '")
 
156
        expect = [self.doubleSeparator,
 
157
                  re.compile(r'\[(ERROR|FAIL)\]: .*' + re.escape(path))]
 
158
        self.stringComparison(expect, output.splitlines())
 
159
 
 
160
    def testHiddenException(self):
 
161
        """
 
162
        Check that errors in C{DelayedCall}s get reported, even if the
 
163
        test already has a failure.
 
164
 
 
165
        Only really necessary for testing the deprecated style of tests that
 
166
        use iterate() directly. See
 
167
        L{erroneous.DelayedCall.testHiddenException} for more details.
 
168
        """
 
169
        test = erroneous.DelayedCall('testHiddenException')
 
170
        output = self.getOutput(test).splitlines()
 
171
        match = [
 
172
            self.doubleSeparator,
 
173
            '[FAIL]: '
 
174
            'twisted.trial.test.erroneous.DelayedCall.testHiddenException',
 
175
            'Traceback (most recent call last):',
 
176
            re.compile(r'^\s+File .*erroneous\.py., line \d+, in '
 
177
                       'testHiddenException$'),
 
178
            re.compile(r'^\s+self\.fail\("Deliberate failure to mask the '
 
179
                       'hidden exception"\)$'),
 
180
            'twisted.trial.unittest.FailTest: '
 
181
            'Deliberate failure to mask the hidden exception',
 
182
            self.doubleSeparator,
 
183
            '[ERROR]: '
 
184
            'twisted.trial.test.erroneous.DelayedCall.testHiddenException',
 
185
            'Traceback (most recent call last):',
 
186
            re.compile(r'^\s+File .* in runUntilCurrent'),
 
187
            re.compile(r'^\s+.*'),
 
188
            re.compile('^\s+File .*erroneous\.py", line \d+, in go'),
 
189
            re.compile('^\s+raise RuntimeError\(self.hiddenExceptionMsg\)'),
 
190
            'exceptions.RuntimeError: something blew up',
 
191
            ]
 
192
 
 
193
        self.stringComparison(match, output)
 
194
 
 
195
 
 
196
 
 
197
class TestUncleanWarningWrapperErrorReporting(TestErrorReporting):
 
198
    """
 
199
    Tests that the L{UncleanWarningsReporterWrapper} can sufficiently proxy
 
200
    IReporter failure and error reporting methods to a L{reporter.Reporter}.
 
201
    """
 
202
    def setUp(self):
 
203
        self.loader = runner.TestLoader()
 
204
        self.output = StringIO.StringIO()
 
205
        self.result = UncleanWarningsReporterWrapper(
 
206
            reporter.Reporter(self.output))
 
207
 
 
208
 
 
209
 
 
210
class TracebackHandling(unittest.TestCase):
 
211
    def getErrorFrames(self, test):
 
212
        stream = StringIO.StringIO()
 
213
        result = reporter.Reporter(stream)
 
214
        test.run(result)
 
215
        bads = result.failures + result.errors
 
216
        assert len(bads) == 1
 
217
        assert bads[0][0] == test
 
218
        return result._trimFrames(bads[0][1].frames)
 
219
 
 
220
    def checkFrames(self, observedFrames, expectedFrames):
 
221
        for observed, expected in zip(observedFrames, expectedFrames):
 
222
            self.assertEqual(observed[0], expected[0])
 
223
            observedSegs = os.path.splitext(observed[1])[0].split(os.sep)
 
224
            expectedSegs = expected[1].split('/')
 
225
            self.assertEqual(observedSegs[-len(expectedSegs):],
 
226
                             expectedSegs)
 
227
        self.assertEqual(len(observedFrames), len(expectedFrames))
 
228
 
 
229
    def test_basic(self):
 
230
        test = erroneous.TestRegularFail('test_fail')
 
231
        frames = self.getErrorFrames(test)
 
232
        self.checkFrames(frames,
 
233
                         [('test_fail', 'twisted/trial/test/erroneous')])
 
234
 
 
235
    def test_subroutine(self):
 
236
        test = erroneous.TestRegularFail('test_subfail')
 
237
        frames = self.getErrorFrames(test)
 
238
        self.checkFrames(frames,
 
239
                         [('test_subfail', 'twisted/trial/test/erroneous'),
 
240
                          ('subroutine', 'twisted/trial/test/erroneous')])
 
241
 
 
242
    def test_deferred(self):
 
243
        test = erroneous.TestFailureInDeferredChain('test_fail')
 
244
        frames = self.getErrorFrames(test)
 
245
        self.checkFrames(frames,
 
246
                         [('_later', 'twisted/trial/test/erroneous')])
 
247
 
 
248
    def test_noFrames(self):
 
249
        result = reporter.Reporter(None)
 
250
        self.assertEqual([], result._trimFrames([]))
 
251
 
 
252
    def test_oneFrame(self):
 
253
        result = reporter.Reporter(None)
 
254
        self.assertEqual(['fake frame'], result._trimFrames(['fake frame']))
 
255
 
 
256
 
 
257
class FormatFailures(StringTest):
 
258
    def setUp(self):
 
259
        try:
 
260
            raise RuntimeError('foo')
 
261
        except RuntimeError:
 
262
            self.f = Failure()
 
263
        self.f.frames = [
 
264
            ['foo', 'foo/bar.py', 5, [('x', 5)], [('y', 'orange')]],
 
265
            ['qux', 'foo/bar.py', 10, [('a', 'two')], [('b', 'MCMXCIX')]]
 
266
            ]
 
267
        self.stream = StringIO.StringIO()
 
268
        self.result = reporter.Reporter(self.stream)
 
269
 
 
270
    def test_formatDefault(self):
 
271
        tb = self.result._formatFailureTraceback(self.f)
 
272
        self.stringComparison([
 
273
            'Traceback (most recent call last):',
 
274
            '  File "foo/bar.py", line 5, in foo',
 
275
            re.compile(r'^\s*$'),
 
276
            '  File "foo/bar.py", line 10, in qux',
 
277
            re.compile(r'^\s*$'),
 
278
            'RuntimeError: foo'], tb.splitlines())
 
279
 
 
280
    def test_formatString(self):
 
281
        tb = '''
 
282
  File "twisted/trial/unittest.py", line 256, in failUnlessSubstring
 
283
    return self.failUnlessIn(substring, astring, msg)
 
284
exceptions.TypeError: iterable argument required
 
285
 
 
286
'''
 
287
        expected = '''
 
288
  File "twisted/trial/unittest.py", line 256, in failUnlessSubstring
 
289
    return self.failUnlessIn(substring, astring, msg)
 
290
exceptions.TypeError: iterable argument required
 
291
'''
 
292
        formatted = self.result._formatFailureTraceback(tb)
 
293
        self.assertEqual(expected, formatted)
 
294
 
 
295
    def test_mutation(self):
 
296
        frames = self.f.frames[:]
 
297
        tb = self.result._formatFailureTraceback(self.f)
 
298
        self.assertEqual(self.f.frames, frames)
 
299
 
 
300
 
 
301
class PyunitTestNames(unittest.TestCase):
 
302
    def setUp(self):
 
303
        from twisted.trial.test import sample
 
304
        self.stream = StringIO.StringIO()
 
305
        self.test = sample.PyunitTest('test_foo')
 
306
 
 
307
    def test_verboseReporter(self):
 
308
        result = reporter.VerboseTextReporter(self.stream)
 
309
        result.startTest(self.test)
 
310
        output = self.stream.getvalue()
 
311
        self.failUnlessEqual(
 
312
            output, 'twisted.trial.test.sample.PyunitTest.test_foo ... ')
 
313
 
 
314
    def test_treeReporter(self):
 
315
        result = reporter.TreeReporter(self.stream)
 
316
        result.startTest(self.test)
 
317
        output = self.stream.getvalue()
 
318
        output = output.splitlines()[-1].strip()
 
319
        self.failUnlessEqual(output, result.getDescription(self.test) + ' ...')
 
320
 
 
321
    def test_getDescription(self):
 
322
        result = reporter.TreeReporter(self.stream)
 
323
        output = result.getDescription(self.test)
 
324
        self.failUnlessEqual(output, 'test_foo')
 
325
 
 
326
 
 
327
    def test_minimalReporter(self):
 
328
        """
 
329
        The summary of L{reporter.MinimalReporter} is a simple list of numbers,
 
330
        indicating how many tests ran, how many failed etc.
 
331
 
 
332
        The numbers represents:
 
333
         * the run time of the tests
 
334
         * the number of tests run, printed 2 times for legacy reasons
 
335
         * the number of errors
 
336
         * the number of failures
 
337
         * the number of skips
 
338
        """
 
339
        result = reporter.MinimalReporter(self.stream)
 
340
        self.test.run(result)
 
341
        result._printSummary()
 
342
        output = self.stream.getvalue().strip().split(' ')
 
343
        self.failUnlessEqual(output[1:], ['1', '1', '0', '0', '0'])
 
344
 
 
345
 
 
346
    def test_minimalReporterTime(self):
 
347
        """
 
348
        L{reporter.MinimalReporter} reports the time to run the tests as first
 
349
        data in its output.
 
350
        """
 
351
        times = [1.0, 1.2, 1.5, 1.9]
 
352
        result = reporter.MinimalReporter(self.stream)
 
353
        result._getTime = lambda: times.pop(0)
 
354
        self.test.run(result)
 
355
        result._printSummary()
 
356
        output = self.stream.getvalue().strip().split(' ')
 
357
        timer = output[0]
 
358
        self.assertEquals(timer, "0.7")
 
359
 
 
360
 
 
361
    def test_emptyMinimalReporter(self):
 
362
        """
 
363
        The summary of L{reporter.MinimalReporter} is a list of zeroes when no
 
364
        test is actually run.
 
365
        """
 
366
        result = reporter.MinimalReporter(self.stream)
 
367
        result._printSummary()
 
368
        output = self.stream.getvalue().strip().split(' ')
 
369
        self.failUnlessEqual(output, ['0', '0', '0', '0', '0', '0'])
 
370
 
 
371
 
 
372
 
 
373
class TestDirtyReactor(unittest.TestCase):
 
374
    """
 
375
    The trial script has an option to treat L{DirtyReactorAggregateError}s as
 
376
    warnings, as a migration tool for test authors. It causes a wrapper to be
 
377
    placed around reporters that replaces L{DirtyReactorAggregatErrors} with
 
378
    warnings.
 
379
    """
 
380
 
 
381
    def setUp(self):
 
382
        self.dirtyError = Failure(
 
383
            util.DirtyReactorAggregateError(['foo'], ['bar']))
 
384
        self.output = StringIO.StringIO()
 
385
        self.test = TestDirtyReactor('test_errorByDefault')
 
386
 
 
387
 
 
388
    def test_errorByDefault(self):
 
389
        """
 
390
        C{DirtyReactorAggregateError}s are reported as errors with the default
 
391
        Reporter.
 
392
        """
 
393
        result = reporter.Reporter(stream=self.output)
 
394
        result.addError(self.test, self.dirtyError)
 
395
        self.assertEqual(len(result.errors), 1)
 
396
        self.assertEqual(result.errors[0][1], self.dirtyError)
 
397
 
 
398
 
 
399
    def test_warningsEnabled(self):
 
400
        """
 
401
        C{DirtyReactorErrors}s are reported as warnings when using the
 
402
        L{UncleanWarningsReporterWrapper}.
 
403
        """
 
404
        result = UncleanWarningsReporterWrapper(
 
405
            reporter.Reporter(stream=self.output))
 
406
        self.assertWarns(UserWarning, self.dirtyError.getErrorMessage(),
 
407
                         reporter.__file__,
 
408
                         result.addError, self.test, self.dirtyError)
 
409
 
 
410
 
 
411
    def test_warningsMaskErrors(self):
 
412
        """
 
413
        C{DirtyReactorErrors}s are I{not} reported as errors if the
 
414
        L{UncleanWarningsReporterWrapper} is used.
 
415
        """
 
416
        result = UncleanWarningsReporterWrapper(
 
417
            reporter.Reporter(stream=self.output))
 
418
        self.assertWarns(UserWarning, self.dirtyError.getErrorMessage(),
 
419
                         reporter.__file__,
 
420
                         result.addError, self.test, self.dirtyError)
 
421
        self.assertEquals(result._originalReporter.errors, [])
 
422
 
 
423
 
 
424
    def test_dealsWithThreeTuples(self):
 
425
        """
 
426
        Some annoying stuff can pass three-tuples to addError instead of
 
427
        Failures (like PyUnit). The wrapper, of course, handles this case,
 
428
        since it is a part of L{twisted.trial.itrial.IReporter}! But it does
 
429
        not convert L{DirtyReactorError} to warnings in this case, because
 
430
        nobody should be passing those in the form of three-tuples.
 
431
        """
 
432
        result = UncleanWarningsReporterWrapper(
 
433
            reporter.Reporter(stream=self.output))
 
434
        result.addError(self.test,
 
435
                        (self.dirtyError.type, self.dirtyError.value, None))
 
436
        self.assertEqual(len(result._originalReporter.errors), 1)
 
437
        self.assertEquals(result._originalReporter.errors[0][1].type,
 
438
                          self.dirtyError.type)
 
439
        self.assertEquals(result._originalReporter.errors[0][1].value,
 
440
                          self.dirtyError.value)
 
441
 
 
442
 
 
443
 
 
444
class TrialTestNames(unittest.TestCase):
 
445
 
 
446
    def setUp(self):
 
447
        from twisted.trial.test import sample
 
448
        self.stream = StringIO.StringIO()
 
449
        self.test = sample.FooTest('test_foo')
 
450
 
 
451
    def test_verboseReporter(self):
 
452
        result = reporter.VerboseTextReporter(self.stream)
 
453
        result.startTest(self.test)
 
454
        output = self.stream.getvalue()
 
455
        self.failUnlessEqual(output, self.test.id() + ' ... ')
 
456
 
 
457
    def test_treeReporter(self):
 
458
        result = reporter.TreeReporter(self.stream)
 
459
        result.startTest(self.test)
 
460
        output = self.stream.getvalue()
 
461
        output = output.splitlines()[-1].strip()
 
462
        self.failUnlessEqual(output, result.getDescription(self.test) + ' ...')
 
463
 
 
464
    def test_treeReporterWithDocstrings(self):
 
465
        """A docstring"""
 
466
        result = reporter.TreeReporter(self.stream)
 
467
        self.assertEqual(result.getDescription(self),
 
468
                         'test_treeReporterWithDocstrings')
 
469
 
 
470
    def test_getDescription(self):
 
471
        result = reporter.TreeReporter(self.stream)
 
472
        output = result.getDescription(self.test)
 
473
        self.failUnlessEqual(output, "test_foo")
 
474
 
 
475
 
 
476
class TestSkip(unittest.TestCase):
 
477
    """
 
478
    Tests for L{reporter.Reporter}'s handling of skips.
 
479
    """
 
480
    def setUp(self):
 
481
        from twisted.trial.test import sample
 
482
        self.stream = StringIO.StringIO()
 
483
        self.result = reporter.Reporter(self.stream)
 
484
        self.test = sample.FooTest('test_foo')
 
485
 
 
486
    def _getSkips(self, result):
 
487
        """
 
488
        Get the number of skips that happened to a reporter.
 
489
        """
 
490
        return len(result.skips)
 
491
 
 
492
    def test_accumulation(self):
 
493
        self.result.addSkip(self.test, 'some reason')
 
494
        self.assertEqual(self._getSkips(self.result), 1)
 
495
 
 
496
    def test_success(self):
 
497
        self.result.addSkip(self.test, 'some reason')
 
498
        self.failUnlessEqual(True, self.result.wasSuccessful())
 
499
 
 
500
 
 
501
    def test_summary(self):
 
502
        """
 
503
        The summary of a successful run with skips indicates that the test
 
504
        suite passed and includes the number of skips.
 
505
        """
 
506
        self.result.addSkip(self.test, 'some reason')
 
507
        self.result.done()
 
508
        output = self.stream.getvalue().splitlines()[-1]
 
509
        prefix = 'PASSED '
 
510
        self.failUnless(output.startswith(prefix))
 
511
        self.failUnlessEqual(output[len(prefix):].strip(), '(skips=1)')
 
512
 
 
513
 
 
514
    def test_basicErrors(self):
 
515
        """
 
516
        The output at the end of a test run with skips includes the reasons
 
517
        for skipping those tests.
 
518
        """
 
519
        self.result.addSkip(self.test, 'some reason')
 
520
        self.result.done()
 
521
        output = self.stream.getvalue().splitlines()[4]
 
522
        self.failUnlessEqual(output.strip(), 'some reason')
 
523
 
 
524
 
 
525
    def test_booleanSkip(self):
 
526
        """
 
527
        Tests can be skipped without specifying a reason by setting the 'skip'
 
528
        attribute to True. When this happens, the test output includes 'True'
 
529
        as the reason.
 
530
        """
 
531
        self.result.addSkip(self.test, True)
 
532
        self.result.done()
 
533
        output = self.stream.getvalue().splitlines()[4]
 
534
        self.failUnlessEqual(output, 'True')
 
535
 
 
536
 
 
537
    def test_exceptionSkip(self):
 
538
        """
 
539
        Skips can be raised as errors. When this happens, the error is
 
540
        included in the summary at the end of the test suite.
 
541
        """
 
542
        try:
 
543
            1/0
 
544
        except Exception, e:
 
545
            error = e
 
546
        self.result.addSkip(self.test, error)
 
547
        self.result.done()
 
548
        output = '\n'.join(self.stream.getvalue().splitlines()[3:5]).strip()
 
549
        self.failUnlessEqual(output, str(e))
 
550
 
 
551
 
 
552
class UncleanWarningSkipTest(TestSkip):
 
553
    """
 
554
    Tests for skips on a L{reporter.Reporter} wrapped by an
 
555
    L{UncleanWarningsReporterWrapper}.
 
556
    """
 
557
    def setUp(self):
 
558
        TestSkip.setUp(self)
 
559
        self.result = UncleanWarningsReporterWrapper(self.result)
 
560
 
 
561
    def _getSkips(self, result):
 
562
        """
 
563
        Get the number of skips that happened to a reporter inside of an
 
564
        unclean warnings reporter wrapper.
 
565
        """
 
566
        return len(result._originalReporter.skips)
 
567
 
 
568
 
 
569
 
 
570
class TodoTest(unittest.TestCase):
 
571
    """
 
572
    Tests for L{reporter.Reporter}'s handling of todos.
 
573
    """
 
574
 
 
575
    def setUp(self):
 
576
        from twisted.trial.test import sample
 
577
        self.stream = StringIO.StringIO()
 
578
        self.result = reporter.Reporter(self.stream)
 
579
        self.test = sample.FooTest('test_foo')
 
580
 
 
581
 
 
582
    def _getTodos(self, result):
 
583
        """
 
584
        Get the number of todos that happened to a reporter.
 
585
        """
 
586
        return len(result.expectedFailures)
 
587
 
 
588
 
 
589
    def _getUnexpectedSuccesses(self, result):
 
590
        """
 
591
        Get the number of unexpected successes that happened to a reporter.
 
592
        """
 
593
        return len(result.unexpectedSuccesses)
 
594
 
 
595
 
 
596
    def test_accumulation(self):
 
597
        """
 
598
        L{reporter.Reporter} accumulates the expected failures that it
 
599
        is notified of.
 
600
        """
 
601
        self.result.addExpectedFailure(self.test, Failure(Exception()),
 
602
                                       makeTodo('todo!'))
 
603
        self.assertEqual(self._getTodos(self.result), 1)
 
604
 
 
605
 
 
606
    def test_success(self):
 
607
        """
 
608
        A test run is still successful even if there are expected failures.
 
609
        """
 
610
        self.result.addExpectedFailure(self.test, Failure(Exception()),
 
611
                                       makeTodo('todo!'))
 
612
        self.assertEqual(True, self.result.wasSuccessful())
 
613
 
 
614
 
 
615
    def test_unexpectedSuccess(self):
 
616
        """
 
617
        A test which is marked as todo but succeeds will have an unexpected
 
618
        success reported to its result. A test run is still successful even
 
619
        when this happens.
 
620
        """
 
621
        self.result.addUnexpectedSuccess(self.test, makeTodo("Heya!"))
 
622
        self.assertEqual(True, self.result.wasSuccessful())
 
623
        self.assertEqual(self._getUnexpectedSuccesses(self.result), 1)
 
624
 
 
625
 
 
626
    def test_summary(self):
 
627
        """
 
628
        The reporter's C{printSummary} method should print the number of
 
629
        expected failures that occured.
 
630
        """
 
631
        self.result.addExpectedFailure(self.test, Failure(Exception()),
 
632
                                       makeTodo('some reason'))
 
633
        self.result.done()
 
634
        output = self.stream.getvalue().splitlines()[-1]
 
635
        prefix = 'PASSED '
 
636
        self.failUnless(output.startswith(prefix))
 
637
        self.assertEqual(output[len(prefix):].strip(),
 
638
                         '(expectedFailures=1)')
 
639
 
 
640
 
 
641
    def test_basicErrors(self):
 
642
        """
 
643
        The reporter's L{printErrors} method should include the value of the
 
644
        Todo.
 
645
        """
 
646
        self.result.addExpectedFailure(self.test, Failure(Exception()),
 
647
                                       makeTodo('some reason'))
 
648
        self.result.done()
 
649
        output = self.stream.getvalue().splitlines()[4].strip()
 
650
        self.assertEqual(output, "Reason: 'some reason'")
 
651
 
 
652
 
 
653
    def test_booleanTodo(self):
 
654
        """
 
655
        Booleans CAN'T be used as the value of a todo. Maybe this sucks. This
 
656
        is a test for current behavior, not a requirement.
 
657
        """
 
658
        self.result.addExpectedFailure(self.test, Failure(Exception()),
 
659
                                       makeTodo(True))
 
660
        self.assertRaises(Exception, self.result.done)
 
661
 
 
662
 
 
663
    def test_exceptionTodo(self):
 
664
        """
 
665
        The exception for expected failures should be shown in the
 
666
        C{printErrors} output.
 
667
        """
 
668
        try:
 
669
            1/0
 
670
        except Exception, e:
 
671
            error = e
 
672
        self.result.addExpectedFailure(self.test, Failure(error),
 
673
                                       makeTodo("todo!"))
 
674
        self.result.done()
 
675
        output = '\n'.join(self.stream.getvalue().splitlines()[3:]).strip()
 
676
        self.assertTrue(str(e) in output)
 
677
 
 
678
 
 
679
 
 
680
class UncleanWarningTodoTest(TodoTest):
 
681
    """
 
682
    Tests for L{UncleanWarningsReporterWrapper}'s handling of todos.
 
683
    """
 
684
 
 
685
    def setUp(self):
 
686
        TodoTest.setUp(self)
 
687
        self.result = UncleanWarningsReporterWrapper(self.result)
 
688
 
 
689
 
 
690
    def _getTodos(self, result):
 
691
        """
 
692
        Get the number of todos that happened to a reporter inside of an
 
693
        unclean warnings reporter wrapper.
 
694
        """
 
695
        return len(result._originalReporter.expectedFailures)
 
696
 
 
697
 
 
698
    def _getUnexpectedSuccesses(self, result):
 
699
        """
 
700
        Get the number of unexpected successes that happened to a reporter
 
701
        inside of an unclean warnings reporter wrapper.
 
702
        """
 
703
        return len(result._originalReporter.unexpectedSuccesses)
 
704
 
 
705
 
 
706
 
 
707
class MockColorizer:
 
708
    """
 
709
    Used by TestTreeReporter to make sure that output is colored correctly.
 
710
    """
 
711
 
 
712
    def __init__(self, stream):
 
713
        self.log = []
 
714
 
 
715
 
 
716
    def write(self, text, color):
 
717
        self.log.append((color, text))
 
718
 
 
719
 
 
720
 
 
721
class TestTreeReporter(unittest.TestCase):
 
722
    def setUp(self):
 
723
        from twisted.trial.test import sample
 
724
        self.test = sample.FooTest('test_foo')
 
725
        self.stream = StringIO.StringIO()
 
726
        self.result = reporter.TreeReporter(self.stream)
 
727
        self.result._colorizer = MockColorizer(self.stream)
 
728
        self.log = self.result._colorizer.log
 
729
 
 
730
    def makeError(self):
 
731
        try:
 
732
            1/0
 
733
        except ZeroDivisionError:
 
734
            f = Failure()
 
735
        return f
 
736
 
 
737
    def test_cleanupError(self):
 
738
        """
 
739
        Run cleanupErrors and check that the output is correct, and colored
 
740
        correctly.
 
741
        """
 
742
        f = self.makeError()
 
743
        self.result.cleanupErrors(f)
 
744
        color, text = self.log[0]
 
745
        self.assertEqual(color.strip(), self.result.ERROR)
 
746
        self.assertEqual(text.strip(), 'cleanup errors')
 
747
        color, text = self.log[1]
 
748
        self.assertEqual(color.strip(), self.result.ERROR)
 
749
        self.assertEqual(text.strip(), '[ERROR]')
 
750
    test_cleanupError = suppressWarnings(
 
751
        test_cleanupError,
 
752
        util.suppress(category=reporter.BrokenTestCaseWarning),
 
753
        util.suppress(category=DeprecationWarning))
 
754
 
 
755
 
 
756
    def test_upDownError(self):
 
757
        """
 
758
        Run upDownError and check that the output is correct and colored
 
759
        correctly.
 
760
        """
 
761
        self.result.upDownError("method", None, None, False)
 
762
        color, text = self.log[0]
 
763
        self.assertEqual(color.strip(), self.result.ERROR)
 
764
        self.assertEqual(text.strip(), 'method')
 
765
    test_upDownError = suppressWarnings(
 
766
        test_upDownError,
 
767
        util.suppress(category=DeprecationWarning,
 
768
                      message="upDownError is deprecated in Twisted 8.0."))
 
769
 
 
770
 
 
771
    def test_summaryColoredSuccess(self):
 
772
        """
 
773
        The summary in case of success should have a good count of successes
 
774
        and be colored properly.
 
775
        """
 
776
        self.result.addSuccess(self.test)
 
777
        self.result.done()
 
778
        self.assertEquals(self.log[1], (self.result.SUCCESS, 'PASSED'))
 
779
        self.assertEquals(
 
780
            self.stream.getvalue().splitlines()[-1].strip(), "(successes=1)")
 
781
 
 
782
 
 
783
    def test_summaryColoredFailure(self):
 
784
        """
 
785
        The summary in case of failure should have a good count of errors
 
786
        and be colored properly.
 
787
        """
 
788
        try:
 
789
            raise RuntimeError('foo')
 
790
        except RuntimeError, excValue:
 
791
            self.result.addError(self, sys.exc_info())
 
792
        self.result.done()
 
793
        self.assertEquals(self.log[1], (self.result.FAILURE, 'FAILED'))
 
794
        self.assertEquals(
 
795
            self.stream.getvalue().splitlines()[-1].strip(), "(errors=1)")
 
796
 
 
797
 
 
798
    def test_getPrelude(self):
 
799
        """
 
800
        The tree needs to get the segments of the test ID that correspond
 
801
        to the module and class that it belongs to.
 
802
        """
 
803
        self.assertEqual(
 
804
            ['foo.bar', 'baz'],
 
805
            self.result._getPreludeSegments('foo.bar.baz.qux'))
 
806
        self.assertEqual(
 
807
            ['foo', 'bar'],
 
808
            self.result._getPreludeSegments('foo.bar.baz'))
 
809
        self.assertEqual(
 
810
            ['foo'],
 
811
            self.result._getPreludeSegments('foo.bar'))
 
812
        self.assertEqual([], self.result._getPreludeSegments('foo'))
 
813
 
 
814
 
 
815
 
 
816
class TestReporterInterface(unittest.TestCase):
 
817
    """
 
818
    Tests for the bare interface of a trial reporter.
 
819
 
 
820
    Subclass this test case and provide a different 'resultFactory' to test
 
821
    that a particular reporter implementation will work with the rest of
 
822
    Trial.
 
823
 
 
824
    @cvar resultFactory: A callable that returns a reporter to be tested. The
 
825
        callable must take the same parameters as L{reporter.Reporter}.
 
826
    """
 
827
 
 
828
    resultFactory = reporter.Reporter
 
829
 
 
830
    def setUp(self):
 
831
        from twisted.trial.test import sample
 
832
        self.test = sample.FooTest('test_foo')
 
833
        self.stream = StringIO.StringIO()
 
834
        self.publisher = log.LogPublisher()
 
835
        self.result = self.resultFactory(self.stream, publisher=self.publisher)
 
836
 
 
837
 
 
838
    def test_shouldStopInitiallyFalse(self):
 
839
        """
 
840
        shouldStop is False to begin with.
 
841
        """
 
842
        self.assertEquals(False, self.result.shouldStop)
 
843
 
 
844
 
 
845
    def test_shouldStopTrueAfterStop(self):
 
846
        """
 
847
        shouldStop becomes True soon as someone calls stop().
 
848
        """
 
849
        self.result.stop()
 
850
        self.assertEquals(True, self.result.shouldStop)
 
851
 
 
852
 
 
853
    def test_wasSuccessfulInitiallyTrue(self):
 
854
        """
 
855
        wasSuccessful() is True when there have been no results reported.
 
856
        """
 
857
        self.assertEquals(True, self.result.wasSuccessful())
 
858
 
 
859
 
 
860
    def test_wasSuccessfulTrueAfterSuccesses(self):
 
861
        """
 
862
        wasSuccessful() is True when there have been only successes, False
 
863
        otherwise.
 
864
        """
 
865
        self.result.addSuccess(self.test)
 
866
        self.assertEquals(True, self.result.wasSuccessful())
 
867
 
 
868
 
 
869
    def test_wasSuccessfulFalseAfterErrors(self):
 
870
        """
 
871
        wasSuccessful() becomes False after errors have been reported.
 
872
        """
 
873
        try:
 
874
            1 / 0
 
875
        except ZeroDivisionError:
 
876
            self.result.addError(self.test, sys.exc_info())
 
877
        self.assertEquals(False, self.result.wasSuccessful())
 
878
 
 
879
 
 
880
    def test_wasSuccessfulFalseAfterFailures(self):
 
881
        """
 
882
        wasSuccessful() becomes False after failures have been reported.
 
883
        """
 
884
        try:
 
885
            self.fail("foo")
 
886
        except self.failureException:
 
887
            self.result.addFailure(self.test, sys.exc_info())
 
888
        self.assertEquals(False, self.result.wasSuccessful())
 
889
 
 
890
 
 
891
 
 
892
class TestReporter(TestReporterInterface):
 
893
    """
 
894
    Tests for the base L{reporter.Reporter} class.
 
895
    """
 
896
 
 
897
    def setUp(self):
 
898
        TestReporterInterface.setUp(self)
 
899
        self._timer = 0
 
900
        self.result._getTime = self._getTime
 
901
 
 
902
 
 
903
    def _getTime(self):
 
904
        self._timer += 1
 
905
        return self._timer
 
906
 
 
907
 
 
908
    def test_startStop(self):
 
909
        self.result.startTest(self.test)
 
910
        self.result.stopTest(self.test)
 
911
        self.assertTrue(self.result._lastTime > 0)
 
912
        self.assertEqual(self.result.testsRun, 1)
 
913
        self.assertEqual(self.result.wasSuccessful(), True)
 
914
 
 
915
 
 
916
    def test_brokenStream(self):
 
917
        """
 
918
        Test that the reporter safely writes to its stream.
 
919
        """
 
920
        result = self.resultFactory(stream=BrokenStream(self.stream))
 
921
        result._writeln("Hello")
 
922
        self.assertEqual(self.stream.getvalue(), 'Hello\n')
 
923
        self.stream.truncate(0)
 
924
        result._writeln("Hello %s!", 'World')
 
925
        self.assertEqual(self.stream.getvalue(), 'Hello World!\n')
 
926
 
 
927
 
 
928
    def test_printErrorsDeprecated(self):
 
929
        """
 
930
        L{IReporter.printErrors} was deprecated in Twisted 8.0.
 
931
        """
 
932
        def f():
 
933
            self.result.printErrors()
 
934
        self.assertWarns(
 
935
            DeprecationWarning, "printErrors is deprecated in Twisted 8.0.",
 
936
            __file__, f)
 
937
 
 
938
 
 
939
    def test_printSummaryDeprecated(self):
 
940
        """
 
941
        L{IReporter.printSummary} was deprecated in Twisted 8.0.
 
942
        """
 
943
        def f():
 
944
            self.result.printSummary()
 
945
        self.assertWarns(
 
946
            DeprecationWarning, "printSummary is deprecated in Twisted 8.0.",
 
947
            __file__, f)
 
948
 
 
949
 
 
950
    def test_writeDeprecated(self):
 
951
        """
 
952
        L{IReporter.write} was deprecated in Twisted 8.0.
 
953
        """
 
954
        def f():
 
955
            self.result.write("")
 
956
        self.assertWarns(
 
957
            DeprecationWarning, "write is deprecated in Twisted 8.0.",
 
958
            __file__, f)
 
959
 
 
960
 
 
961
    def test_writelnDeprecated(self):
 
962
        """
 
963
        L{IReporter.writeln} was deprecated in Twisted 8.0.
 
964
        """
 
965
        def f():
 
966
            self.result.writeln("")
 
967
        self.assertWarns(
 
968
            DeprecationWarning, "writeln is deprecated in Twisted 8.0.",
 
969
            __file__, f)
 
970
 
 
971
 
 
972
    def test_separatorDeprecated(self):
 
973
        """
 
974
        L{IReporter.separator} was deprecated in Twisted 8.0.
 
975
        """
 
976
        def f():
 
977
            return self.result.separator
 
978
        self.assertWarns(
 
979
            DeprecationWarning, "separator is deprecated in Twisted 8.0.",
 
980
            __file__, f)
 
981
 
 
982
 
 
983
    def test_streamDeprecated(self):
 
984
        """
 
985
        L{IReporter.stream} was deprecated in Twisted 8.0.
 
986
        """
 
987
        def f():
 
988
            return self.result.stream
 
989
        self.assertWarns(
 
990
            DeprecationWarning, "stream is deprecated in Twisted 8.0.",
 
991
            __file__, f)
 
992
 
 
993
 
 
994
    def test_upDownErrorDeprecated(self):
 
995
        """
 
996
        L{IReporter.upDownError} was deprecated in Twisted 8.0.
 
997
        """
 
998
        def f():
 
999
            self.result.upDownError(None, None, None, None)
 
1000
        self.assertWarns(
 
1001
            DeprecationWarning, "upDownError is deprecated in Twisted 8.0.",
 
1002
            __file__, f)
 
1003
 
 
1004
 
 
1005
    def test_warning(self):
 
1006
        """
 
1007
        L{reporter.Reporter} observes warnings emitted by the Twisted log
 
1008
        system and writes them to its output stream.
 
1009
        """
 
1010
        message = RuntimeWarning("some warning text")
 
1011
        category = 'exceptions.RuntimeWarning'
 
1012
        filename = "path/to/some/file.py"
 
1013
        lineno = 71
 
1014
        self.publisher.msg(
 
1015
            warning=message, category=category,
 
1016
            filename=filename, lineno=lineno)
 
1017
        self.assertEqual(
 
1018
            self.stream.getvalue(),
 
1019
            "%s:%d: %s: %s\n" % (
 
1020
                filename, lineno, category.split('.')[-1], message))
 
1021
 
 
1022
 
 
1023
    def test_duplicateWarningSuppressed(self):
 
1024
        """
 
1025
        A warning emitted twice within a single test is only written to the
 
1026
        stream once.
 
1027
        """
 
1028
        # Emit the warning and assert that it shows up
 
1029
        self.test_warning()
 
1030
        # Emit the warning again and assert that the stream still only has one
 
1031
        # warning on it.
 
1032
        self.test_warning()
 
1033
 
 
1034
 
 
1035
    def test_warningEmittedForNewTest(self):
 
1036
        """
 
1037
        A warning emitted again after a new test has started is written to the
 
1038
        stream again.
 
1039
        """
 
1040
        test = self.__class__('test_warningEmittedForNewTest')
 
1041
        self.result.startTest(test)
 
1042
 
 
1043
        # Clear whatever startTest wrote to the stream
 
1044
        self.stream.seek(0)
 
1045
        self.stream.truncate()
 
1046
 
 
1047
        # Emit a warning (and incidentally, assert that it was emitted)
 
1048
        self.test_warning()
 
1049
 
 
1050
        # Clean up from the first warning to simplify the rest of the
 
1051
        # assertions.
 
1052
        self.stream.seek(0)
 
1053
        self.stream.truncate()
 
1054
 
 
1055
        # Stop the first test and start another one (it just happens to be the
 
1056
        # same one, but that doesn't matter)
 
1057
        self.result.stopTest(test)
 
1058
        self.result.startTest(test)
 
1059
 
 
1060
        # Clean up the stopTest/startTest output
 
1061
        self.stream.seek(0)
 
1062
        self.stream.truncate()
 
1063
 
 
1064
        # Emit the warning again and make sure it shows up
 
1065
        self.test_warning()
 
1066
 
 
1067
 
 
1068
    def test_stopObserving(self):
 
1069
        """
 
1070
        L{reporter.Reporter} stops observing log events when its C{done} method
 
1071
        is called.
 
1072
        """
 
1073
        self.result.done()
 
1074
        self.stream.seek(0)
 
1075
        self.stream.truncate()
 
1076
        self.publisher.msg(
 
1077
            warning=RuntimeWarning("some message"),
 
1078
            category='exceptions.RuntimeWarning',
 
1079
            filename="file/name.py", lineno=17)
 
1080
        self.assertEqual(self.stream.getvalue(), "")
 
1081
 
 
1082
 
 
1083
 
 
1084
class TestSafeStream(unittest.TestCase):
 
1085
    def test_safe(self):
 
1086
        """
 
1087
        Test that L{reporter.SafeStream} successfully write to its original
 
1088
        stream even if an interrupt happens during the write.
 
1089
        """
 
1090
        stream = StringIO.StringIO()
 
1091
        broken = BrokenStream(stream)
 
1092
        safe = reporter.SafeStream(broken)
 
1093
        safe.write("Hello")
 
1094
        self.assertEqual(stream.getvalue(), "Hello")
 
1095
 
 
1096
 
 
1097
 
 
1098
class TestSubunitReporter(TestReporterInterface):
 
1099
    """
 
1100
    Tests for the subunit reporter.
 
1101
 
 
1102
    This just tests that the subunit reporter implements the basic interface.
 
1103
    """
 
1104
 
 
1105
    resultFactory = reporter.SubunitReporter
 
1106
 
 
1107
 
 
1108
    def setUp(self):
 
1109
        if reporter.TestProtocolClient is None:
 
1110
            raise SkipTest(
 
1111
                "Subunit not installed, cannot test SubunitReporter")
 
1112
        TestReporterInterface.setUp(self)
 
1113
 
 
1114
 
 
1115
    def assertForwardsToSubunit(self, methodName, *args, **kwargs):
 
1116
        """
 
1117
        Assert that 'methodName' on L{SubunitReporter} forwards to the
 
1118
        equivalent method on subunit.
 
1119
 
 
1120
        Checks that the return value from subunit is returned from the
 
1121
        L{SubunitReporter} and that the reporter writes the same data to its
 
1122
        stream as subunit does to its own.
 
1123
 
 
1124
        Assumes that the method on subunit has the same name as the method on
 
1125
        L{SubunitReporter}.
 
1126
        """
 
1127
        stream = StringIO.StringIO()
 
1128
        subunitClient = reporter.TestProtocolClient(stream)
 
1129
        subunitReturn = getattr(subunitClient, methodName)(*args, **kwargs)
 
1130
        subunitOutput = stream.getvalue()
 
1131
        reporterReturn = getattr(self.result, methodName)(*args, **kwargs)
 
1132
        self.assertEquals(subunitReturn, reporterReturn)
 
1133
        self.assertEquals(subunitOutput, self.stream.getvalue())
 
1134
 
 
1135
 
 
1136
    def test_subunitWithoutAddExpectedFailureInstalled(self):
 
1137
        """
 
1138
        Some versions of subunit don't have "addExpectedFailure". For these
 
1139
        versions, we report expected failures as successes.
 
1140
        """
 
1141
        try:
 
1142
            addExpectedFailure = reporter.TestProtocolClient.addExpectedFailure
 
1143
        except AttributeError:
 
1144
            # Then we've actually got one of those old versions installed, and
 
1145
            # the test is immediately applicable.
 
1146
            pass
 
1147
        else:
 
1148
            del reporter.TestProtocolClient.addExpectedFailure
 
1149
            self.addCleanup(
 
1150
                setattr, reporter.TestProtocolClient, 'addExpectedFailure',
 
1151
                addExpectedFailure)
 
1152
        try:
 
1153
            1 / 0
 
1154
        except ZeroDivisionError:
 
1155
            self.result.addExpectedFailure(self.test, sys.exc_info(), "todo")
 
1156
        expectedFailureOutput = self.stream.getvalue()
 
1157
        self.stream.truncate(0)
 
1158
        self.result.addSuccess(self.test)
 
1159
        successOutput = self.stream.getvalue()
 
1160
        self.assertEquals(successOutput, expectedFailureOutput)
 
1161
 
 
1162
 
 
1163
    def test_subunitWithoutAddSkipInstalled(self):
 
1164
        """
 
1165
        Some versions of subunit don't have "addSkip". For these versions, we
 
1166
        report skips as successes.
 
1167
        """
 
1168
        try:
 
1169
            addSkip = reporter.TestProtocolClient.addSkip
 
1170
        except AttributeError:
 
1171
            # Then we've actually got one of those old versions installed, and
 
1172
            # the test is immediately applicable.
 
1173
            pass
 
1174
        else:
 
1175
            del reporter.TestProtocolClient.addSkip
 
1176
            self.addCleanup(
 
1177
                setattr, reporter.TestProtocolClient, 'addSkip', addSkip)
 
1178
        self.result.addSkip(self.test, "reason")
 
1179
        skipOutput = self.stream.getvalue()
 
1180
        self.stream.truncate(0)
 
1181
        self.result.addSuccess(self.test)
 
1182
        successOutput = self.stream.getvalue()
 
1183
        self.assertEquals(successOutput, skipOutput)
 
1184
 
 
1185
 
 
1186
    def test_addExpectedFailurePassedThrough(self):
 
1187
        """
 
1188
        Some versions of subunit have "addExpectedFailure". For these
 
1189
        versions, when we call 'addExpectedFailure' on the test result, we
 
1190
        pass the error and test through to the subunit client.
 
1191
        """
 
1192
        addExpectedFailureCalls = []
 
1193
        def addExpectedFailure(test, error):
 
1194
            addExpectedFailureCalls.append((test, error))
 
1195
 
 
1196
        # Provide our own addExpectedFailure, whether or not the locally
 
1197
        # installed subunit has addExpectedFailure.
 
1198
        self.result._subunit.addExpectedFailure = addExpectedFailure
 
1199
        try:
 
1200
            1 / 0
 
1201
        except ZeroDivisionError:
 
1202
            exc_info = sys.exc_info()
 
1203
            self.result.addExpectedFailure(self.test, exc_info, 'todo')
 
1204
        self.assertEquals(addExpectedFailureCalls, [(self.test, exc_info)])
 
1205
 
 
1206
 
 
1207
    def test_addSkipSendsSubunitAddSkip(self):
 
1208
        """
 
1209
        Some versions of subunit have "addSkip". For these versions, when we
 
1210
        call 'addSkip' on the test result, we pass the test and reason through
 
1211
        to the subunit client.
 
1212
        """
 
1213
        addSkipCalls = []
 
1214
        def addSkip(test, reason):
 
1215
            addSkipCalls.append((test, reason))
 
1216
 
 
1217
        # Provide our own addSkip, whether or not the locally-installed
 
1218
        # subunit has addSkip.
 
1219
        self.result._subunit.addSkip = addSkip
 
1220
        self.result.addSkip(self.test, 'reason')
 
1221
        self.assertEquals(addSkipCalls, [(self.test, 'reason')])
 
1222
 
 
1223
 
 
1224
    def test_doneDoesNothing(self):
 
1225
        """
 
1226
        The subunit reporter doesn't need to print out a summary -- the stream
 
1227
        of results is everything. Thus, done() does nothing.
 
1228
        """
 
1229
        self.result.done()
 
1230
        self.assertEquals('', self.stream.getvalue())
 
1231
 
 
1232
 
 
1233
    def test_startTestSendsSubunitStartTest(self):
 
1234
        """
 
1235
        SubunitReporter.startTest() sends the subunit 'startTest' message.
 
1236
        """
 
1237
        self.assertForwardsToSubunit('startTest', self.test)
 
1238
 
 
1239
 
 
1240
    def test_stopTestSendsSubunitStopTest(self):
 
1241
        """
 
1242
        SubunitReporter.stopTest() sends the subunit 'stopTest' message.
 
1243
        """
 
1244
        self.assertForwardsToSubunit('stopTest', self.test)
 
1245
 
 
1246
 
 
1247
    def test_addSuccessSendsSubunitAddSuccess(self):
 
1248
        """
 
1249
        SubunitReporter.addSuccess() sends the subunit 'addSuccess' message.
 
1250
        """
 
1251
        self.assertForwardsToSubunit('addSuccess', self.test)
 
1252
 
 
1253
 
 
1254
    def test_addErrorSendsSubunitAddError(self):
 
1255
        """
 
1256
        SubunitReporter.addError() sends the subunit 'addError' message.
 
1257
        """
 
1258
        try:
 
1259
            1 / 0
 
1260
        except ZeroDivisionError:
 
1261
            error = sys.exc_info()
 
1262
        self.assertForwardsToSubunit('addError', self.test, error)
 
1263
 
 
1264
 
 
1265
    def test_addFailureSendsSubunitAddFailure(self):
 
1266
        """
 
1267
        SubunitReporter.addFailure() sends the subunit 'addFailure' message.
 
1268
        """
 
1269
        try:
 
1270
            self.fail('hello')
 
1271
        except self.failureException:
 
1272
            failure = sys.exc_info()
 
1273
        self.assertForwardsToSubunit('addFailure', self.test, failure)
 
1274
 
 
1275
 
 
1276
    def test_addUnexpectedSuccessSendsSubunitAddSuccess(self):
 
1277
        """
 
1278
        SubunitReporter.addFailure() sends the subunit 'addSuccess' message,
 
1279
        since subunit doesn't model unexpected success.
 
1280
        """
 
1281
        stream = StringIO.StringIO()
 
1282
        subunitClient = reporter.TestProtocolClient(stream)
 
1283
        subunitClient.addSuccess(self.test)
 
1284
        subunitOutput = stream.getvalue()
 
1285
        self.result.addUnexpectedSuccess(self.test, 'todo')
 
1286
        self.assertEquals(subunitOutput, self.stream.getvalue())
 
1287
 
 
1288
 
 
1289
 
 
1290
class TestSubunitReporterNotInstalled(unittest.TestCase):
 
1291
    """
 
1292
    Test behaviour when the subunit reporter is not installed.
 
1293
    """
 
1294
 
 
1295
    def test_subunitNotInstalled(self):
 
1296
        """
 
1297
        If subunit is not installed, TestProtocolClient will be None, and
 
1298
        SubunitReporter will raise an error when you try to construct it.
 
1299
        """
 
1300
        stream = StringIO.StringIO()
 
1301
        self.patch(reporter, 'TestProtocolClient', None)
 
1302
        e = self.assertRaises(Exception, reporter.SubunitReporter, stream)
 
1303
        self.assertEquals("Subunit not available", str(e))
 
1304
 
 
1305
 
 
1306
 
 
1307
class TestTimingReporter(TestReporter):
 
1308
    resultFactory = reporter.TimingTextReporter
 
1309
 
 
1310
 
 
1311
 
 
1312
class LoggingReporter(reporter.Reporter):
 
1313
    """
 
1314
    Simple reporter that stores the last test that was passed to it.
 
1315
    """
 
1316
 
 
1317
    def __init__(self, *args, **kwargs):
 
1318
        reporter.Reporter.__init__(self, *args, **kwargs)
 
1319
        self.test = None
 
1320
 
 
1321
    def addError(self, test, error):
 
1322
        self.test = test
 
1323
 
 
1324
    def addExpectedFailure(self, test, failure, todo):
 
1325
        self.test = test
 
1326
 
 
1327
    def addFailure(self, test, failure):
 
1328
        self.test = test
 
1329
 
 
1330
    def addSkip(self, test, skip):
 
1331
        self.test = test
 
1332
 
 
1333
    def addUnexpectedSuccess(self, test, todo):
 
1334
        self.test = test
 
1335
 
 
1336
    def startTest(self, test):
 
1337
        self.test = test
 
1338
 
 
1339
    def stopTest(self, test):
 
1340
        self.test = test
 
1341
 
 
1342
 
 
1343
 
 
1344
class TestAdaptedReporter(unittest.TestCase):
 
1345
    """
 
1346
    L{reporter._AdaptedReporter} is a reporter wrapper that wraps all of the
 
1347
    tests it receives before passing them on to the original reporter.
 
1348
    """
 
1349
 
 
1350
    def setUp(self):
 
1351
        self.wrappedResult = self.getWrappedResult()
 
1352
 
 
1353
 
 
1354
    def _testAdapter(self, test):
 
1355
        return test.id()
 
1356
 
 
1357
 
 
1358
    def assertWrapped(self, wrappedResult, test):
 
1359
        self.assertEqual(wrappedResult._originalReporter.test, self._testAdapter(test))
 
1360
 
 
1361
 
 
1362
    def getFailure(self, exceptionInstance):
 
1363
        """
 
1364
        Return a L{Failure} from raising the given exception.
 
1365
 
 
1366
        @param exceptionInstance: The exception to raise.
 
1367
        @return: L{Failure}
 
1368
        """
 
1369
        try:
 
1370
            raise exceptionInstance
 
1371
        except:
 
1372
            return Failure()
 
1373
 
 
1374
 
 
1375
    def getWrappedResult(self):
 
1376
        result = LoggingReporter()
 
1377
        return reporter._AdaptedReporter(result, self._testAdapter)
 
1378
 
 
1379
 
 
1380
    def test_addError(self):
 
1381
        """
 
1382
        C{addError} wraps its test with the provided adapter.
 
1383
        """
 
1384
        self.wrappedResult.addError(self, self.getFailure(RuntimeError()))
 
1385
        self.assertWrapped(self.wrappedResult, self)
 
1386
 
 
1387
 
 
1388
    def test_addFailure(self):
 
1389
        """
 
1390
        C{addFailure} wraps its test with the provided adapter.
 
1391
        """
 
1392
        self.wrappedResult.addFailure(self, self.getFailure(AssertionError()))
 
1393
        self.assertWrapped(self.wrappedResult, self)
 
1394
 
 
1395
 
 
1396
    def test_addSkip(self):
 
1397
        """
 
1398
        C{addSkip} wraps its test with the provided adapter.
 
1399
        """
 
1400
        self.wrappedResult.addSkip(self, self.getFailure(SkipTest('no reason')))
 
1401
        self.assertWrapped(self.wrappedResult, self)
 
1402
 
 
1403
 
 
1404
    def test_startTest(self):
 
1405
        """
 
1406
        C{startTest} wraps its test with the provided adapter.
 
1407
        """
 
1408
        self.wrappedResult.startTest(self)
 
1409
        self.assertWrapped(self.wrappedResult, self)
 
1410
 
 
1411
 
 
1412
    def test_stopTest(self):
 
1413
        """
 
1414
        C{stopTest} wraps its test with the provided adapter.
 
1415
        """
 
1416
        self.wrappedResult.stopTest(self)
 
1417
        self.assertWrapped(self.wrappedResult, self)
 
1418
 
 
1419
 
 
1420
    def test_addExpectedFailure(self):
 
1421
        """
 
1422
        C{addExpectedFailure} wraps its test with the provided adapter.
 
1423
        """
 
1424
        self.wrappedResult.addExpectedFailure(
 
1425
            self, self.getFailure(RuntimeError()), Todo("no reason"))
 
1426
        self.assertWrapped(self.wrappedResult, self)
 
1427
 
 
1428
 
 
1429
    def test_addUnexpectedSuccess(self):
 
1430
        """
 
1431
        C{addUnexpectedSuccess} wraps its test with the provided adapter.
 
1432
        """
 
1433
        self.wrappedResult.addUnexpectedSuccess(self, Todo("no reason"))
 
1434
        self.assertWrapped(self.wrappedResult, self)
 
1435
 
 
1436
 
 
1437
 
 
1438
class FakeStream(object):
 
1439
    """
 
1440
    A fake stream which C{isatty} method returns some predictable.
 
1441
 
 
1442
    @ivar tty: returned value of C{isatty}.
 
1443
    @type tty: C{bool}
 
1444
    """
 
1445
 
 
1446
    def __init__(self, tty=True):
 
1447
        self.tty = tty
 
1448
 
 
1449
 
 
1450
    def isatty(self):
 
1451
        return self.tty
 
1452
 
 
1453
 
 
1454
 
 
1455
class AnsiColorizerTests(unittest.TestCase):
 
1456
    """
 
1457
    Tests for L{reporter._AnsiColorizer}.
 
1458
    """
 
1459
 
 
1460
    def setUp(self):
 
1461
        self.savedModules = sys.modules.copy()
 
1462
 
 
1463
 
 
1464
    def tearDown(self):
 
1465
        sys.modules.clear()
 
1466
        sys.modules.update(self.savedModules)
 
1467
 
 
1468
 
 
1469
    def test_supportedStdOutTTY(self):
 
1470
        """
 
1471
        L{reporter._AnsiColorizer.supported} returns C{False} if the given
 
1472
        stream is not a TTY.
 
1473
        """
 
1474
        self.assertFalse(reporter._AnsiColorizer.supported(FakeStream(False)))
 
1475
 
 
1476
 
 
1477
    def test_supportedNoCurses(self):
 
1478
        """
 
1479
        L{reporter._AnsiColorizer.supported} returns C{False} if the curses
 
1480
        module can't be imported.
 
1481
        """
 
1482
        sys.modules['curses'] = None
 
1483
        self.assertFalse(reporter._AnsiColorizer.supported(FakeStream()))
 
1484
 
 
1485
 
 
1486
    def test_supportedSetupTerm(self):
 
1487
        """
 
1488
        L{reporter._AnsiColorizer.supported} returns C{True} if
 
1489
        C{curses.tigetnum} returns more than 2 supported colors. It only tries
 
1490
        to call C{curses.setupterm} if C{curses.tigetnum} previously failed
 
1491
        with a C{curses.error}.
 
1492
        """
 
1493
        class fakecurses(object):
 
1494
            error = RuntimeError
 
1495
            setUp = 0
 
1496
 
 
1497
            def setupterm(self):
 
1498
                self.setUp += 1
 
1499
 
 
1500
            def tigetnum(self, value):
 
1501
                if self.setUp:
 
1502
                    return 3
 
1503
                else:
 
1504
                    raise self.error()
 
1505
 
 
1506
        sys.modules['curses'] = fakecurses()
 
1507
        self.assertTrue(reporter._AnsiColorizer.supported(FakeStream()))
 
1508
        self.assertTrue(reporter._AnsiColorizer.supported(FakeStream()))
 
1509
 
 
1510
        self.assertEquals(sys.modules['curses'].setUp, 1)
 
1511
 
 
1512
 
 
1513
    def test_supportedTigetNumWrongError(self):
 
1514
        """
 
1515
        L{reporter._AnsiColorizer.supported} returns C{False} and doesn't try
 
1516
        to call C{curses.setupterm} if C{curses.tigetnum} returns something
 
1517
        different than C{curses.error}.
 
1518
        """
 
1519
        class fakecurses(object):
 
1520
            error = RuntimeError
 
1521
 
 
1522
            def tigetnum(self, value):
 
1523
                raise ValueError()
 
1524
 
 
1525
        sys.modules['curses'] = fakecurses()
 
1526
        self.assertFalse(reporter._AnsiColorizer.supported(FakeStream()))
 
1527
 
 
1528
 
 
1529
    def test_supportedTigetNumNotEnoughColor(self):
 
1530
        """
 
1531
        L{reporter._AnsiColorizer.supported} returns C{False} if
 
1532
        C{curses.tigetnum} returns less than 2 supported colors.
 
1533
        """
 
1534
        class fakecurses(object):
 
1535
            error = RuntimeError
 
1536
 
 
1537
            def tigetnum(self, value):
 
1538
                return 1
 
1539
 
 
1540
        sys.modules['curses'] = fakecurses()
 
1541
        self.assertFalse(reporter._AnsiColorizer.supported(FakeStream()))
 
1542
 
 
1543
 
 
1544
    def test_supportedTigetNumErrors(self):
 
1545
        """
 
1546
        L{reporter._AnsiColorizer.supported} returns C{False} if
 
1547
        C{curses.tigetnum} raises an error, and calls C{curses.setupterm} once.
 
1548
        """
 
1549
        class fakecurses(object):
 
1550
            error = RuntimeError
 
1551
            setUp = 0
 
1552
 
 
1553
            def setupterm(self):
 
1554
                self.setUp += 1
 
1555
 
 
1556
            def tigetnum(self, value):
 
1557
                raise self.error()
 
1558
 
 
1559
        sys.modules['curses'] = fakecurses()
 
1560
        self.assertFalse(reporter._AnsiColorizer.supported(FakeStream()))
 
1561
        self.assertEquals(sys.modules['curses'].setUp, 1)