~cbehrens/nova/lp844160-build-works-with-zones

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/trial/reporter.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- test-case-name: twisted.trial.test.test_reporter -*-
 
2
# Copyright (c) 2001-2009 Twisted Matrix Laboratories.
 
3
# See LICENSE for details.
 
4
#
 
5
# Maintainer: Jonathan Lange
 
6
 
 
7
"""
 
8
Defines classes that handle the results of tests.
 
9
"""
 
10
 
 
11
import sys, os
 
12
import time
 
13
import warnings
 
14
 
 
15
from twisted.python.compat import set
 
16
from twisted.python import reflect, log
 
17
from twisted.python.components import proxyForInterface
 
18
from twisted.python.failure import Failure
 
19
from twisted.python.util import untilConcludes
 
20
from twisted.trial import itrial, util
 
21
 
 
22
try:
 
23
    from subunit import TestProtocolClient
 
24
except ImportError:
 
25
    TestProtocolClient = None
 
26
from zope.interface import implements
 
27
 
 
28
pyunit = __import__('unittest')
 
29
 
 
30
 
 
31
class BrokenTestCaseWarning(Warning):
 
32
    """
 
33
    Emitted as a warning when an exception occurs in one of setUp or tearDown.
 
34
    """
 
35
 
 
36
 
 
37
class SafeStream(object):
 
38
    """
 
39
    Wraps a stream object so that all C{write} calls are wrapped in
 
40
    L{untilConcludes}.
 
41
    """
 
42
 
 
43
    def __init__(self, original):
 
44
        self.original = original
 
45
 
 
46
    def __getattr__(self, name):
 
47
        return getattr(self.original, name)
 
48
 
 
49
    def write(self, *a, **kw):
 
50
        return untilConcludes(self.original.write, *a, **kw)
 
51
 
 
52
 
 
53
class TestResult(pyunit.TestResult, object):
 
54
    """
 
55
    Accumulates the results of several L{twisted.trial.unittest.TestCase}s.
 
56
 
 
57
    @ivar successes: count the number of successes achieved by the test run.
 
58
    @type successes: C{int}
 
59
    """
 
60
    implements(itrial.IReporter)
 
61
 
 
62
    def __init__(self):
 
63
        super(TestResult, self).__init__()
 
64
        self.skips = []
 
65
        self.expectedFailures = []
 
66
        self.unexpectedSuccesses = []
 
67
        self.successes = 0
 
68
        self._timings = []
 
69
 
 
70
    def __repr__(self):
 
71
        return ('<%s run=%d errors=%d failures=%d todos=%d dones=%d skips=%d>'
 
72
                % (reflect.qual(self.__class__), self.testsRun,
 
73
                   len(self.errors), len(self.failures),
 
74
                   len(self.expectedFailures), len(self.skips),
 
75
                   len(self.unexpectedSuccesses)))
 
76
 
 
77
    def _getTime(self):
 
78
        return time.time()
 
79
 
 
80
    def _getFailure(self, error):
 
81
        """
 
82
        Convert a C{sys.exc_info()}-style tuple to a L{Failure}, if necessary.
 
83
        """
 
84
        if isinstance(error, tuple):
 
85
            return Failure(error[1], error[0], error[2])
 
86
        return error
 
87
 
 
88
    def startTest(self, test):
 
89
        """
 
90
        This must be called before the given test is commenced.
 
91
 
 
92
        @type test: L{pyunit.TestCase}
 
93
        """
 
94
        super(TestResult, self).startTest(test)
 
95
        self._testStarted = self._getTime()
 
96
 
 
97
    def stopTest(self, test):
 
98
        """
 
99
        This must be called after the given test is completed.
 
100
 
 
101
        @type test: L{pyunit.TestCase}
 
102
        """
 
103
        super(TestResult, self).stopTest(test)
 
104
        self._lastTime = self._getTime() - self._testStarted
 
105
 
 
106
    def addFailure(self, test, fail):
 
107
        """
 
108
        Report a failed assertion for the given test.
 
109
 
 
110
        @type test: L{pyunit.TestCase}
 
111
        @type fail: L{Failure} or L{tuple}
 
112
        """
 
113
        self.failures.append((test, self._getFailure(fail)))
 
114
 
 
115
    def addError(self, test, error):
 
116
        """
 
117
        Report an error that occurred while running the given test.
 
118
 
 
119
        @type test: L{pyunit.TestCase}
 
120
        @type error: L{Failure} or L{tuple}
 
121
        """
 
122
        self.errors.append((test, self._getFailure(error)))
 
123
 
 
124
    def addSkip(self, test, reason):
 
125
        """
 
126
        Report that the given test was skipped.
 
127
 
 
128
        In Trial, tests can be 'skipped'. Tests are skipped mostly because there
 
129
        is some platform or configuration issue that prevents them from being
 
130
        run correctly.
 
131
 
 
132
        @type test: L{pyunit.TestCase}
 
133
        @type reason: L{str}
 
134
        """
 
135
        self.skips.append((test, reason))
 
136
 
 
137
    def addUnexpectedSuccess(self, test, todo):
 
138
        """Report that the given test succeeded against expectations.
 
139
 
 
140
        In Trial, tests can be marked 'todo'. That is, they are expected to fail.
 
141
        When a test that is expected to fail instead succeeds, it should call
 
142
        this method to report the unexpected success.
 
143
 
 
144
        @type test: L{pyunit.TestCase}
 
145
        @type todo: L{unittest.Todo}
 
146
        """
 
147
        # XXX - 'todo' should just be a string
 
148
        self.unexpectedSuccesses.append((test, todo))
 
149
 
 
150
    def addExpectedFailure(self, test, error, todo):
 
151
        """Report that the given test failed, and was expected to do so.
 
152
 
 
153
        In Trial, tests can be marked 'todo'. That is, they are expected to fail.
 
154
 
 
155
        @type test: L{pyunit.TestCase}
 
156
        @type error: L{Failure}
 
157
        @type todo: L{unittest.Todo}
 
158
        """
 
159
        # XXX - 'todo' should just be a string
 
160
        self.expectedFailures.append((test, error, todo))
 
161
 
 
162
    def addSuccess(self, test):
 
163
        """Report that the given test succeeded.
 
164
 
 
165
        @type test: L{pyunit.TestCase}
 
166
        """
 
167
        self.successes += 1
 
168
 
 
169
    def upDownError(self, method, error, warn, printStatus):
 
170
        warnings.warn("upDownError is deprecated in Twisted 8.0.",
 
171
                      category=DeprecationWarning, stacklevel=3)
 
172
 
 
173
    def cleanupErrors(self, errs):
 
174
        """Report an error that occurred during the cleanup between tests.
 
175
        """
 
176
        warnings.warn("Cleanup errors are actual errors. Use addError. "
 
177
                      "Deprecated in Twisted 8.0",
 
178
                      category=DeprecationWarning, stacklevel=2)
 
179
 
 
180
    def startSuite(self, name):
 
181
        warnings.warn("startSuite deprecated in Twisted 8.0",
 
182
                      category=DeprecationWarning, stacklevel=2)
 
183
 
 
184
    def endSuite(self, name):
 
185
        warnings.warn("endSuite deprecated in Twisted 8.0",
 
186
                      category=DeprecationWarning, stacklevel=2)
 
187
 
 
188
 
 
189
    def done(self):
 
190
        """
 
191
        The test suite has finished running.
 
192
        """
 
193
 
 
194
 
 
195
 
 
196
class TestResultDecorator(proxyForInterface(itrial.IReporter,
 
197
                                            "_originalReporter")):
 
198
    """
 
199
    Base class for TestResult decorators.
 
200
 
 
201
    @ivar _originalReporter: The wrapped instance of reporter.
 
202
    @type _originalReporter: A provider of L{itrial.IReporter}
 
203
    """
 
204
 
 
205
    implements(itrial.IReporter)
 
206
 
 
207
 
 
208
 
 
209
class UncleanWarningsReporterWrapper(TestResultDecorator):
 
210
    """
 
211
    A wrapper for a reporter that converts L{util.DirtyReactorError}s
 
212
    to warnings.
 
213
    """
 
214
    implements(itrial.IReporter)
 
215
 
 
216
    def addError(self, test, error):
 
217
        """
 
218
        If the error is a L{util.DirtyReactorError}, instead of
 
219
        reporting it as a normal error, throw a warning.
 
220
        """
 
221
 
 
222
        if (isinstance(error, Failure)
 
223
            and error.check(util.DirtyReactorAggregateError)):
 
224
            warnings.warn(error.getErrorMessage())
 
225
        else:
 
226
            self._originalReporter.addError(test, error)
 
227
 
 
228
 
 
229
 
 
230
class _AdaptedReporter(TestResultDecorator):
 
231
    """
 
232
    TestResult decorator that makes sure that addError only gets tests that
 
233
    have been adapted with a particular test adapter.
 
234
    """
 
235
 
 
236
    def __init__(self, original, testAdapter):
 
237
        """
 
238
        Construct an L{_AdaptedReporter}.
 
239
 
 
240
        @param original: An {itrial.IReporter}.
 
241
        @param testAdapter: A callable that returns an L{itrial.ITestCase}.
 
242
        """
 
243
        TestResultDecorator.__init__(self, original)
 
244
        self.testAdapter = testAdapter
 
245
 
 
246
 
 
247
    def addError(self, test, error):
 
248
        """
 
249
        See L{itrial.IReporter}.
 
250
        """
 
251
        test = self.testAdapter(test)
 
252
        return self._originalReporter.addError(test, error)
 
253
 
 
254
 
 
255
    def addExpectedFailure(self, test, failure, todo):
 
256
        """
 
257
        See L{itrial.IReporter}.
 
258
        """
 
259
        return self._originalReporter.addExpectedFailure(
 
260
            self.testAdapter(test), failure, todo)
 
261
 
 
262
 
 
263
    def addFailure(self, test, failure):
 
264
        """
 
265
        See L{itrial.IReporter}.
 
266
        """
 
267
        test = self.testAdapter(test)
 
268
        return self._originalReporter.addFailure(test, failure)
 
269
 
 
270
 
 
271
    def addSkip(self, test, skip):
 
272
        """
 
273
        See L{itrial.IReporter}.
 
274
        """
 
275
        test = self.testAdapter(test)
 
276
        return self._originalReporter.addSkip(test, skip)
 
277
 
 
278
 
 
279
    def addUnexpectedSuccess(self, test, todo):
 
280
        """
 
281
        See L{itrial.IReporter}.
 
282
        """
 
283
        test = self.testAdapter(test)
 
284
        return self._originalReporter.addUnexpectedSuccess(test, todo)
 
285
 
 
286
 
 
287
    def startTest(self, test):
 
288
        """
 
289
        See L{itrial.IReporter}.
 
290
        """
 
291
        return self._originalReporter.startTest(self.testAdapter(test))
 
292
 
 
293
 
 
294
    def stopTest(self, test):
 
295
        """
 
296
        See L{itrial.IReporter}.
 
297
        """
 
298
        return self._originalReporter.stopTest(self.testAdapter(test))
 
299
 
 
300
 
 
301
 
 
302
class Reporter(TestResult):
 
303
    """
 
304
    A basic L{TestResult} with support for writing to a stream.
 
305
 
 
306
    @ivar _startTime: The time when the first test was started. It defaults to
 
307
        C{None}, which means that no test was actually launched.
 
308
    @type _startTime: C{float} or C{NoneType}
 
309
 
 
310
    @ivar _warningCache: A C{set} of tuples of warning message (file, line,
 
311
        text, category) which have already been written to the output stream
 
312
        during the currently executing test.  This is used to avoid writing
 
313
        duplicates of the same warning to the output stream.
 
314
    @type _warningCache: C{set}
 
315
 
 
316
    @ivar _publisher: The log publisher which will be observed for warning
 
317
        events.
 
318
    @type _publisher: L{LogPublisher} (or another type sufficiently similar)
 
319
    """
 
320
 
 
321
    implements(itrial.IReporter)
 
322
 
 
323
    _separator = '-' * 79
 
324
    _doubleSeparator = '=' * 79
 
325
 
 
326
    def __init__(self, stream=sys.stdout, tbformat='default', realtime=False,
 
327
                 publisher=None):
 
328
        super(Reporter, self).__init__()
 
329
        self._stream = SafeStream(stream)
 
330
        self.tbformat = tbformat
 
331
        self.realtime = realtime
 
332
        self._startTime = None
 
333
        self._warningCache = set()
 
334
 
 
335
        # Start observing log events so as to be able to report warnings.
 
336
        self._publisher = publisher
 
337
        if publisher is not None:
 
338
            publisher.addObserver(self._observeWarnings)
 
339
 
 
340
 
 
341
    def _observeWarnings(self, event):
 
342
        """
 
343
        Observe warning events and write them to C{self._stream}.
 
344
 
 
345
        This method is a log observer which will be registered with
 
346
        C{self._publisher.addObserver}.
 
347
 
 
348
        @param event: A C{dict} from the logging system.  If it has a
 
349
            C{'warning'} key, a logged warning will be extracted from it and
 
350
            possibly written to C{self.stream}.
 
351
        """
 
352
        if 'warning' in event:
 
353
            key = (event['filename'], event['lineno'],
 
354
                   event['category'].split('.')[-1],
 
355
                   str(event['warning']))
 
356
            if key not in self._warningCache:
 
357
                self._warningCache.add(key)
 
358
                self._stream.write('%s:%s: %s: %s\n' % key)
 
359
 
 
360
 
 
361
    def stream(self):
 
362
        warnings.warn("stream is deprecated in Twisted 8.0.",
 
363
                      category=DeprecationWarning, stacklevel=2)
 
364
        return self._stream
 
365
    stream = property(stream)
 
366
 
 
367
 
 
368
    def separator(self):
 
369
        warnings.warn("separator is deprecated in Twisted 8.0.",
 
370
                      category=DeprecationWarning, stacklevel=2)
 
371
        return self._separator
 
372
    separator = property(separator)
 
373
 
 
374
 
 
375
    def startTest(self, test):
 
376
        """
 
377
        Called when a test begins to run. Records the time when it was first
 
378
        called and resets the warning cache.
 
379
 
 
380
        @param test: L{ITestCase}
 
381
        """
 
382
        super(Reporter, self).startTest(test)
 
383
        if self._startTime is None:
 
384
            self._startTime = self._getTime()
 
385
        self._warningCache = set()
 
386
 
 
387
 
 
388
    def addFailure(self, test, fail):
 
389
        """
 
390
        Called when a test fails. If L{realtime} is set, then it prints the
 
391
        error to the stream.
 
392
 
 
393
        @param test: L{ITestCase} that failed.
 
394
        @param fail: L{failure.Failure} containing the error.
 
395
        """
 
396
        super(Reporter, self).addFailure(test, fail)
 
397
        if self.realtime:
 
398
            fail = self.failures[-1][1] # guarantee it's a Failure
 
399
            self._write(self._formatFailureTraceback(fail))
 
400
 
 
401
 
 
402
    def addError(self, test, error):
 
403
        """
 
404
        Called when a test raises an error. If L{realtime} is set, then it
 
405
        prints the error to the stream.
 
406
 
 
407
        @param test: L{ITestCase} that raised the error.
 
408
        @param error: L{failure.Failure} containing the error.
 
409
        """
 
410
        error = self._getFailure(error)
 
411
        super(Reporter, self).addError(test, error)
 
412
        if self.realtime:
 
413
            error = self.errors[-1][1] # guarantee it's a Failure
 
414
            self._write(self._formatFailureTraceback(error))
 
415
 
 
416
 
 
417
    def write(self, format, *args):
 
418
        warnings.warn("write is deprecated in Twisted 8.0.",
 
419
                      category=DeprecationWarning, stacklevel=2)
 
420
        self._write(format, *args)
 
421
 
 
422
 
 
423
    def _write(self, format, *args):
 
424
        """
 
425
        Safely write to the reporter's stream.
 
426
 
 
427
        @param format: A format string to write.
 
428
        @param *args: The arguments for the format string.
 
429
        """
 
430
        s = str(format)
 
431
        assert isinstance(s, type(''))
 
432
        if args:
 
433
            self._stream.write(s % args)
 
434
        else:
 
435
            self._stream.write(s)
 
436
        untilConcludes(self._stream.flush)
 
437
 
 
438
 
 
439
    def writeln(self, format, *args):
 
440
        warnings.warn("writeln is deprecated in Twisted 8.0.",
 
441
                      category=DeprecationWarning, stacklevel=2)
 
442
        self._writeln(format, *args)
 
443
 
 
444
 
 
445
    def _writeln(self, format, *args):
 
446
        """
 
447
        Safely write a line to the reporter's stream. Newline is appended to
 
448
        the format string.
 
449
 
 
450
        @param format: A format string to write.
 
451
        @param *args: The arguments for the format string.
 
452
        """
 
453
        self._write(format, *args)
 
454
        self._write('\n')
 
455
 
 
456
 
 
457
    def upDownError(self, method, error, warn, printStatus):
 
458
        super(Reporter, self).upDownError(method, error, warn, printStatus)
 
459
        if warn:
 
460
            tbStr = self._formatFailureTraceback(error)
 
461
            log.msg(tbStr)
 
462
            msg = ("caught exception in %s, your TestCase is broken\n\n%s"
 
463
                   % (method, tbStr))
 
464
            warnings.warn(msg, BrokenTestCaseWarning, stacklevel=2)
 
465
 
 
466
 
 
467
    def cleanupErrors(self, errs):
 
468
        super(Reporter, self).cleanupErrors(errs)
 
469
        warnings.warn("%s\n%s" % ("REACTOR UNCLEAN! traceback(s) follow: ",
 
470
                                  self._formatFailureTraceback(errs)),
 
471
                      BrokenTestCaseWarning)
 
472
 
 
473
 
 
474
    def _trimFrames(self, frames):
 
475
        # when a method fails synchronously, the stack looks like this:
 
476
        #  [0]: defer.maybeDeferred()
 
477
        #  [1]: utils.runWithWarningsSuppressed()
 
478
        #  [2:-2]: code in the test method which failed
 
479
        #  [-1]: unittest.fail
 
480
 
 
481
        # when a method fails inside a Deferred (i.e., when the test method
 
482
        # returns a Deferred, and that Deferred's errback fires), the stack
 
483
        # captured inside the resulting Failure looks like this:
 
484
        #  [0]: defer.Deferred._runCallbacks
 
485
        #  [1:-2]: code in the testmethod which failed
 
486
        #  [-1]: unittest.fail
 
487
 
 
488
        # as a result, we want to trim either [maybeDeferred,runWWS] or
 
489
        # [Deferred._runCallbacks] from the front, and trim the
 
490
        # [unittest.fail] from the end.
 
491
 
 
492
        # There is also another case, when the test method is badly defined and
 
493
        # contains extra arguments.
 
494
 
 
495
        newFrames = list(frames)
 
496
 
 
497
        if len(frames) < 2:
 
498
            return newFrames
 
499
 
 
500
        first = newFrames[0]
 
501
        second = newFrames[1]
 
502
        if (first[0] == "maybeDeferred"
 
503
            and os.path.splitext(os.path.basename(first[1]))[0] == 'defer'
 
504
            and second[0] == "runWithWarningsSuppressed"
 
505
            and os.path.splitext(os.path.basename(second[1]))[0] == 'utils'):
 
506
            newFrames = newFrames[2:]
 
507
        elif (first[0] == "_runCallbacks"
 
508
              and os.path.splitext(os.path.basename(first[1]))[0] == 'defer'):
 
509
            newFrames = newFrames[1:]
 
510
 
 
511
        if not newFrames:
 
512
            # The method fails before getting called, probably an argument problem
 
513
            return newFrames
 
514
 
 
515
        last = newFrames[-1]
 
516
        if (last[0].startswith('fail')
 
517
            and os.path.splitext(os.path.basename(last[1]))[0] == 'unittest'):
 
518
            newFrames = newFrames[:-1]
 
519
 
 
520
        return newFrames
 
521
 
 
522
 
 
523
    def _formatFailureTraceback(self, fail):
 
524
        if isinstance(fail, str):
 
525
            return fail.rstrip() + '\n'
 
526
        fail.frames, frames = self._trimFrames(fail.frames), fail.frames
 
527
        result = fail.getTraceback(detail=self.tbformat, elideFrameworkCode=True)
 
528
        fail.frames = frames
 
529
        return result
 
530
 
 
531
 
 
532
    def _printResults(self, flavour, errors, formatter):
 
533
        """
 
534
        Print a group of errors to the stream.
 
535
 
 
536
        @param flavour: A string indicating the kind of error (e.g. 'TODO').
 
537
        @param errors: A list of errors, often L{failure.Failure}s, but
 
538
            sometimes 'todo' errors.
 
539
        @param formatter: A callable that knows how to format the errors.
 
540
        """
 
541
        for content in errors:
 
542
            self._writeln(self._doubleSeparator)
 
543
            self._writeln('%s: %s' % (flavour, content[0].id()))
 
544
            self._writeln('')
 
545
            self._write(formatter(*(content[1:])))
 
546
 
 
547
 
 
548
    def _printExpectedFailure(self, error, todo):
 
549
        return 'Reason: %r\n%s' % (todo.reason,
 
550
                                   self._formatFailureTraceback(error))
 
551
 
 
552
 
 
553
    def _printUnexpectedSuccess(self, todo):
 
554
        ret = 'Reason: %r\n' % (todo.reason,)
 
555
        if todo.errors:
 
556
            ret += 'Expected errors: %s\n' % (', '.join(todo.errors),)
 
557
        return ret
 
558
 
 
559
 
 
560
    def printErrors(self):
 
561
        """
 
562
        Print all of the non-success results in full to the stream.
 
563
        """
 
564
        warnings.warn("printErrors is deprecated in Twisted 8.0.",
 
565
                      category=DeprecationWarning, stacklevel=2)
 
566
        self._printErrors()
 
567
 
 
568
 
 
569
    def _printErrors(self):
 
570
        """
 
571
        Print all of the non-success results to the stream in full.
 
572
        """
 
573
        self._write('\n')
 
574
        self._printResults('[SKIPPED]', self.skips, lambda x : '%s\n' % x)
 
575
        self._printResults('[TODO]', self.expectedFailures,
 
576
                           self._printExpectedFailure)
 
577
        self._printResults('[FAIL]', self.failures,
 
578
                           self._formatFailureTraceback)
 
579
        self._printResults('[ERROR]', self.errors,
 
580
                           self._formatFailureTraceback)
 
581
        self._printResults('[SUCCESS!?!]', self.unexpectedSuccesses,
 
582
                           self._printUnexpectedSuccess)
 
583
 
 
584
 
 
585
    def _getSummary(self):
 
586
        """
 
587
        Return a formatted count of tests status results.
 
588
        """
 
589
        summaries = []
 
590
        for stat in ("skips", "expectedFailures", "failures", "errors",
 
591
                     "unexpectedSuccesses"):
 
592
            num = len(getattr(self, stat))
 
593
            if num:
 
594
                summaries.append('%s=%d' % (stat, num))
 
595
        if self.successes:
 
596
           summaries.append('successes=%d' % (self.successes,))
 
597
        summary = (summaries and ' ('+', '.join(summaries)+')') or ''
 
598
        return summary
 
599
 
 
600
 
 
601
    def printSummary(self):
 
602
        """
 
603
        Print a line summarising the test results to the stream.
 
604
        """
 
605
        warnings.warn("printSummary is deprecated in Twisted 8.0.",
 
606
                      category=DeprecationWarning, stacklevel=2)
 
607
        self._printSummary()
 
608
 
 
609
 
 
610
    def _printSummary(self):
 
611
        """
 
612
        Print a line summarising the test results to the stream.
 
613
        """
 
614
        summary = self._getSummary()
 
615
        if self.wasSuccessful():
 
616
            status = "PASSED"
 
617
        else:
 
618
            status = "FAILED"
 
619
        self._write("%s%s\n", status, summary)
 
620
 
 
621
 
 
622
    def done(self):
 
623
        """
 
624
        Summarize the result of the test run.
 
625
 
 
626
        The summary includes a report of all of the errors, todos, skips and
 
627
        so forth that occurred during the run. It also includes the number of
 
628
        tests that were run and how long it took to run them (not including
 
629
        load time).
 
630
 
 
631
        Expects that L{_printErrors}, L{_writeln}, L{_write}, L{_printSummary}
 
632
        and L{_separator} are all implemented.
 
633
        """
 
634
        if self._publisher is not None:
 
635
            self._publisher.removeObserver(self._observeWarnings)
 
636
        self._printErrors()
 
637
        self._writeln(self._separator)
 
638
        if self._startTime is not None:
 
639
            self._writeln('Ran %d tests in %.3fs', self.testsRun,
 
640
                          time.time() - self._startTime)
 
641
        self._write('\n')
 
642
        self._printSummary()
 
643
 
 
644
 
 
645
 
 
646
class MinimalReporter(Reporter):
 
647
    """
 
648
    A minimalist reporter that prints only a summary of the test result, in
 
649
    the form of (timeTaken, #tests, #tests, #errors, #failures, #skips).
 
650
    """
 
651
 
 
652
    def _printErrors(self):
 
653
        """
 
654
        Don't print a detailed summary of errors. We only care about the
 
655
        counts.
 
656
        """
 
657
 
 
658
 
 
659
    def _printSummary(self):
 
660
        """
 
661
        Print out a one-line summary of the form:
 
662
        '%(runtime) %(number_of_tests) %(number_of_tests) %(num_errors)
 
663
        %(num_failures) %(num_skips)'
 
664
        """
 
665
        numTests = self.testsRun
 
666
        if self._startTime is not None:
 
667
            timing = self._getTime() - self._startTime
 
668
        else:
 
669
            timing = 0
 
670
        t = (timing, numTests, numTests,
 
671
             len(self.errors), len(self.failures), len(self.skips))
 
672
        self._writeln(' '.join(map(str, t)))
 
673
 
 
674
 
 
675
 
 
676
class TextReporter(Reporter):
 
677
    """
 
678
    Simple reporter that prints a single character for each test as it runs,
 
679
    along with the standard Trial summary text.
 
680
    """
 
681
 
 
682
    def addSuccess(self, test):
 
683
        super(TextReporter, self).addSuccess(test)
 
684
        self._write('.')
 
685
 
 
686
 
 
687
    def addError(self, *args):
 
688
        super(TextReporter, self).addError(*args)
 
689
        self._write('E')
 
690
 
 
691
 
 
692
    def addFailure(self, *args):
 
693
        super(TextReporter, self).addFailure(*args)
 
694
        self._write('F')
 
695
 
 
696
 
 
697
    def addSkip(self, *args):
 
698
        super(TextReporter, self).addSkip(*args)
 
699
        self._write('S')
 
700
 
 
701
 
 
702
    def addExpectedFailure(self, *args):
 
703
        super(TextReporter, self).addExpectedFailure(*args)
 
704
        self._write('T')
 
705
 
 
706
 
 
707
    def addUnexpectedSuccess(self, *args):
 
708
        super(TextReporter, self).addUnexpectedSuccess(*args)
 
709
        self._write('!')
 
710
 
 
711
 
 
712
 
 
713
class VerboseTextReporter(Reporter):
 
714
    """
 
715
    A verbose reporter that prints the name of each test as it is running.
 
716
 
 
717
    Each line is printed with the name of the test, followed by the result of
 
718
    that test.
 
719
    """
 
720
 
 
721
    # This is actually the bwverbose option
 
722
 
 
723
    def startTest(self, tm):
 
724
        self._write('%s ... ', tm.id())
 
725
        super(VerboseTextReporter, self).startTest(tm)
 
726
 
 
727
 
 
728
    def addSuccess(self, test):
 
729
        super(VerboseTextReporter, self).addSuccess(test)
 
730
        self._write('[OK]')
 
731
 
 
732
 
 
733
    def addError(self, *args):
 
734
        super(VerboseTextReporter, self).addError(*args)
 
735
        self._write('[ERROR]')
 
736
 
 
737
 
 
738
    def addFailure(self, *args):
 
739
        super(VerboseTextReporter, self).addFailure(*args)
 
740
        self._write('[FAILURE]')
 
741
 
 
742
 
 
743
    def addSkip(self, *args):
 
744
        super(VerboseTextReporter, self).addSkip(*args)
 
745
        self._write('[SKIPPED]')
 
746
 
 
747
 
 
748
    def addExpectedFailure(self, *args):
 
749
        super(VerboseTextReporter, self).addExpectedFailure(*args)
 
750
        self._write('[TODO]')
 
751
 
 
752
 
 
753
    def addUnexpectedSuccess(self, *args):
 
754
        super(VerboseTextReporter, self).addUnexpectedSuccess(*args)
 
755
        self._write('[SUCCESS!?!]')
 
756
 
 
757
 
 
758
    def stopTest(self, test):
 
759
        super(VerboseTextReporter, self).stopTest(test)
 
760
        self._write('\n')
 
761
 
 
762
 
 
763
 
 
764
class TimingTextReporter(VerboseTextReporter):
 
765
    """
 
766
    Prints out each test as it is running, followed by the time taken for each
 
767
    test to run.
 
768
    """
 
769
 
 
770
    def stopTest(self, method):
 
771
        """
 
772
        Mark the test as stopped, and write the time it took to run the test
 
773
        to the stream.
 
774
        """
 
775
        super(TimingTextReporter, self).stopTest(method)
 
776
        self._write("(%.03f secs)\n" % self._lastTime)
 
777
 
 
778
 
 
779
 
 
780
class _AnsiColorizer(object):
 
781
    """
 
782
    A colorizer is an object that loosely wraps around a stream, allowing
 
783
    callers to write text to the stream in a particular color.
 
784
 
 
785
    Colorizer classes must implement C{supported()} and C{write(text, color)}.
 
786
    """
 
787
    _colors = dict(black=30, red=31, green=32, yellow=33,
 
788
                   blue=34, magenta=35, cyan=36, white=37)
 
789
 
 
790
    def __init__(self, stream):
 
791
        self.stream = stream
 
792
 
 
793
    def supported(cls, stream=sys.stdout):
 
794
        """
 
795
        A class method that returns True if the current platform supports
 
796
        coloring terminal output using this method. Returns False otherwise.
 
797
        """
 
798
        if not stream.isatty():
 
799
            return False # auto color only on TTYs
 
800
        try:
 
801
            import curses
 
802
        except ImportError:
 
803
            return False
 
804
        else:
 
805
            try:
 
806
                try:
 
807
                    return curses.tigetnum("colors") > 2
 
808
                except curses.error:
 
809
                    curses.setupterm()
 
810
                    return curses.tigetnum("colors") > 2
 
811
            except:
 
812
                # guess false in case of error
 
813
                return False
 
814
    supported = classmethod(supported)
 
815
 
 
816
    def write(self, text, color):
 
817
        """
 
818
        Write the given text to the stream in the given color.
 
819
 
 
820
        @param text: Text to be written to the stream.
 
821
 
 
822
        @param color: A string label for a color. e.g. 'red', 'white'.
 
823
        """
 
824
        color = self._colors[color]
 
825
        self.stream.write('\x1b[%s;1m%s\x1b[0m' % (color, text))
 
826
 
 
827
 
 
828
class _Win32Colorizer(object):
 
829
    """
 
830
    See _AnsiColorizer docstring.
 
831
    """
 
832
    def __init__(self, stream):
 
833
        from win32console import GetStdHandle, STD_OUTPUT_HANDLE, \
 
834
             FOREGROUND_RED, FOREGROUND_BLUE, FOREGROUND_GREEN, \
 
835
             FOREGROUND_INTENSITY
 
836
        red, green, blue, bold = (FOREGROUND_RED, FOREGROUND_GREEN,
 
837
                                  FOREGROUND_BLUE, FOREGROUND_INTENSITY)
 
838
        self.stream = stream
 
839
        self.screenBuffer = GetStdHandle(STD_OUTPUT_HANDLE)
 
840
        self._colors = {
 
841
            'normal': red | green | blue,
 
842
            'red': red | bold,
 
843
            'green': green | bold,
 
844
            'blue': blue | bold,
 
845
            'yellow': red | green | bold,
 
846
            'magenta': red | blue | bold,
 
847
            'cyan': green | blue | bold,
 
848
            'white': red | green | blue | bold
 
849
            }
 
850
 
 
851
    def supported(cls, stream=sys.stdout):
 
852
        try:
 
853
            import win32console
 
854
            screenBuffer = win32console.GetStdHandle(
 
855
                win32console.STD_OUTPUT_HANDLE)
 
856
        except ImportError:
 
857
            return False
 
858
        import pywintypes
 
859
        try:
 
860
            screenBuffer.SetConsoleTextAttribute(
 
861
                win32console.FOREGROUND_RED |
 
862
                win32console.FOREGROUND_GREEN |
 
863
                win32console.FOREGROUND_BLUE)
 
864
        except pywintypes.error:
 
865
            return False
 
866
        else:
 
867
            return True
 
868
    supported = classmethod(supported)
 
869
 
 
870
    def write(self, text, color):
 
871
        color = self._colors[color]
 
872
        self.screenBuffer.SetConsoleTextAttribute(color)
 
873
        self.stream.write(text)
 
874
        self.screenBuffer.SetConsoleTextAttribute(self._colors['normal'])
 
875
 
 
876
 
 
877
class _NullColorizer(object):
 
878
    """
 
879
    See _AnsiColorizer docstring.
 
880
    """
 
881
    def __init__(self, stream):
 
882
        self.stream = stream
 
883
 
 
884
    def supported(cls, stream=sys.stdout):
 
885
        return True
 
886
    supported = classmethod(supported)
 
887
 
 
888
    def write(self, text, color):
 
889
        self.stream.write(text)
 
890
 
 
891
 
 
892
 
 
893
class SubunitReporter(object):
 
894
    """
 
895
    Reports test output via Subunit.
 
896
 
 
897
    @ivar _subunit: The subunit protocol client that we are wrapping.
 
898
 
 
899
    @ivar _successful: An internal variable, used to track whether we have
 
900
        received only successful results.
 
901
 
 
902
    @since: 10.0
 
903
    """
 
904
    implements(itrial.IReporter)
 
905
 
 
906
 
 
907
    def __init__(self, stream=sys.stdout, tbformat='default',
 
908
                 realtime=False, publisher=None):
 
909
        """
 
910
        Construct a L{SubunitReporter}.
 
911
 
 
912
        @param stream: A file-like object representing the stream to print
 
913
            output to. Defaults to stdout.
 
914
        @param tbformat: The format for tracebacks. Ignored, since subunit
 
915
            always uses Python's standard format.
 
916
        @param realtime: Whether or not to print exceptions in the middle
 
917
            of the test results. Ignored, since subunit always does this.
 
918
        @param publisher: The log publisher which will be preserved for
 
919
            reporting events. Ignored, as it's not relevant to subunit.
 
920
        """
 
921
        if TestProtocolClient is None:
 
922
            raise Exception("Subunit not available")
 
923
        self._subunit = TestProtocolClient(stream)
 
924
        self._successful = True
 
925
 
 
926
 
 
927
    def done(self):
 
928
        """
 
929
        Record that the entire test suite run is finished.
 
930
 
 
931
        We do nothing, since a summary clause is irrelevant to the subunit
 
932
        protocol.
 
933
        """
 
934
        pass
 
935
 
 
936
 
 
937
    def shouldStop(self):
 
938
        """
 
939
        Whether or not the test runner should stop running tests.
 
940
        """
 
941
        return self._subunit.shouldStop
 
942
    shouldStop = property(shouldStop)
 
943
 
 
944
 
 
945
    def stop(self):
 
946
        """
 
947
        Signal that the test runner should stop running tests.
 
948
        """
 
949
        return self._subunit.stop()
 
950
 
 
951
 
 
952
    def wasSuccessful(self):
 
953
        """
 
954
        Has the test run been successful so far?
 
955
 
 
956
        @return: C{True} if we have received no reports of errors or failures,
 
957
            C{False} otherwise.
 
958
        """
 
959
        # Subunit has a bug in its implementation of wasSuccessful, see
 
960
        # https://bugs.edge.launchpad.net/subunit/+bug/491090, so we can't
 
961
        # simply forward it on.
 
962
        return self._successful
 
963
 
 
964
 
 
965
    def startTest(self, test):
 
966
        """
 
967
        Record that C{test} has started.
 
968
        """
 
969
        return self._subunit.startTest(test)
 
970
 
 
971
 
 
972
    def stopTest(self, test):
 
973
        """
 
974
        Record that C{test} has completed.
 
975
        """
 
976
        return self._subunit.stopTest(test)
 
977
 
 
978
 
 
979
    def addSuccess(self, test):
 
980
        """
 
981
        Record that C{test} was successful.
 
982
        """
 
983
        return self._subunit.addSuccess(test)
 
984
 
 
985
 
 
986
    def addSkip(self, test, reason):
 
987
        """
 
988
        Record that C{test} was skipped for C{reason}.
 
989
 
 
990
        Some versions of subunit don't have support for addSkip. In those
 
991
        cases, the skip is reported as a success.
 
992
 
 
993
        @param test: A unittest-compatible C{TestCase}.
 
994
        @param reason: The reason for it being skipped. The C{str()} of this
 
995
            object will be included in the subunit output stream.
 
996
        """
 
997
        addSkip = getattr(self._subunit, 'addSkip', None)
 
998
        if addSkip is None:
 
999
            self.addSuccess(test)
 
1000
        else:
 
1001
            self._subunit.addSkip(test, reason)
 
1002
 
 
1003
 
 
1004
    def addError(self, test, err):
 
1005
        """
 
1006
        Record that C{test} failed with an unexpected error C{err}.
 
1007
 
 
1008
        Also marks the run as being unsuccessful, causing
 
1009
        L{SubunitReporter.wasSuccessful} to return C{False}.
 
1010
        """
 
1011
        self._successful = False
 
1012
        return self._subunit.addError(
 
1013
            test, util.excInfoOrFailureToExcInfo(err))
 
1014
 
 
1015
 
 
1016
    def addFailure(self, test, err):
 
1017
        """
 
1018
        Record that C{test} failed an assertion with the error C{err}.
 
1019
 
 
1020
        Also marks the run as being unsuccessful, causing
 
1021
        L{SubunitReporter.wasSuccessful} to return C{False}.
 
1022
        """
 
1023
        self._successful = False
 
1024
        return self._subunit.addFailure(
 
1025
            test, util.excInfoOrFailureToExcInfo(err))
 
1026
 
 
1027
 
 
1028
    def addExpectedFailure(self, test, failure, todo):
 
1029
        """
 
1030
        Record an expected failure from a test.
 
1031
 
 
1032
        Some versions of subunit do not implement this. For those versions, we
 
1033
        record a success.
 
1034
        """
 
1035
        failure = util.excInfoOrFailureToExcInfo(failure)
 
1036
        addExpectedFailure = getattr(self._subunit, 'addExpectedFailure', None)
 
1037
        if addExpectedFailure is None:
 
1038
            self.addSuccess(test)
 
1039
        else:
 
1040
            addExpectedFailure(test, failure)
 
1041
 
 
1042
 
 
1043
    def addUnexpectedSuccess(self, test, todo):
 
1044
        """
 
1045
        Record an unexpected success.
 
1046
 
 
1047
        Since subunit has no way of expressing this concept, we record a
 
1048
        success on the subunit stream.
 
1049
        """
 
1050
        # Not represented in pyunit/subunit.
 
1051
        self.addSuccess(test)
 
1052
 
 
1053
 
 
1054
 
 
1055
class TreeReporter(Reporter):
 
1056
    """
 
1057
    Print out the tests in the form a tree.
 
1058
 
 
1059
    Tests are indented according to which class and module they belong.
 
1060
    Results are printed in ANSI color.
 
1061
    """
 
1062
 
 
1063
    currentLine = ''
 
1064
    indent = '  '
 
1065
    columns = 79
 
1066
 
 
1067
    FAILURE = 'red'
 
1068
    ERROR = 'red'
 
1069
    TODO = 'blue'
 
1070
    SKIP = 'blue'
 
1071
    TODONE = 'red'
 
1072
    SUCCESS = 'green'
 
1073
 
 
1074
    def __init__(self, stream=sys.stdout, *args, **kwargs):
 
1075
        super(TreeReporter, self).__init__(stream, *args, **kwargs)
 
1076
        self._lastTest = []
 
1077
        for colorizer in [_Win32Colorizer, _AnsiColorizer, _NullColorizer]:
 
1078
            if colorizer.supported(stream):
 
1079
                self._colorizer = colorizer(stream)
 
1080
                break
 
1081
 
 
1082
    def getDescription(self, test):
 
1083
        """
 
1084
        Return the name of the method which 'test' represents.  This is
 
1085
        what gets displayed in the leaves of the tree.
 
1086
 
 
1087
        e.g. getDescription(TestCase('test_foo')) ==> test_foo
 
1088
        """
 
1089
        return test.id().split('.')[-1]
 
1090
 
 
1091
    def addSuccess(self, test):
 
1092
        super(TreeReporter, self).addSuccess(test)
 
1093
        self.endLine('[OK]', self.SUCCESS)
 
1094
 
 
1095
    def addError(self, *args):
 
1096
        super(TreeReporter, self).addError(*args)
 
1097
        self.endLine('[ERROR]', self.ERROR)
 
1098
 
 
1099
    def addFailure(self, *args):
 
1100
        super(TreeReporter, self).addFailure(*args)
 
1101
        self.endLine('[FAIL]', self.FAILURE)
 
1102
 
 
1103
    def addSkip(self, *args):
 
1104
        super(TreeReporter, self).addSkip(*args)
 
1105
        self.endLine('[SKIPPED]', self.SKIP)
 
1106
 
 
1107
    def addExpectedFailure(self, *args):
 
1108
        super(TreeReporter, self).addExpectedFailure(*args)
 
1109
        self.endLine('[TODO]', self.TODO)
 
1110
 
 
1111
    def addUnexpectedSuccess(self, *args):
 
1112
        super(TreeReporter, self).addUnexpectedSuccess(*args)
 
1113
        self.endLine('[SUCCESS!?!]', self.TODONE)
 
1114
 
 
1115
    def _write(self, format, *args):
 
1116
        if args:
 
1117
            format = format % args
 
1118
        self.currentLine = format
 
1119
        super(TreeReporter, self)._write(self.currentLine)
 
1120
 
 
1121
 
 
1122
    def _getPreludeSegments(self, testID):
 
1123
        """
 
1124
        Return a list of all non-leaf segments to display in the tree.
 
1125
 
 
1126
        Normally this is the module and class name.
 
1127
        """
 
1128
        segments = testID.split('.')[:-1]
 
1129
        if len(segments) == 0:
 
1130
            return segments
 
1131
        segments = [
 
1132
            seg for seg in '.'.join(segments[:-1]), segments[-1]
 
1133
            if len(seg) > 0]
 
1134
        return segments
 
1135
 
 
1136
 
 
1137
    def _testPrelude(self, testID):
 
1138
        """
 
1139
        Write the name of the test to the stream, indenting it appropriately.
 
1140
 
 
1141
        If the test is the first test in a new 'branch' of the tree, also
 
1142
        write all of the parents in that branch.
 
1143
        """
 
1144
        segments = self._getPreludeSegments(testID)
 
1145
        indentLevel = 0
 
1146
        for seg in segments:
 
1147
            if indentLevel < len(self._lastTest):
 
1148
                if seg != self._lastTest[indentLevel]:
 
1149
                    self._write('%s%s\n' % (self.indent * indentLevel, seg))
 
1150
            else:
 
1151
                self._write('%s%s\n' % (self.indent * indentLevel, seg))
 
1152
            indentLevel += 1
 
1153
        self._lastTest = segments
 
1154
 
 
1155
 
 
1156
    def cleanupErrors(self, errs):
 
1157
        self._colorizer.write('    cleanup errors', self.ERROR)
 
1158
        self.endLine('[ERROR]', self.ERROR)
 
1159
        super(TreeReporter, self).cleanupErrors(errs)
 
1160
 
 
1161
    def upDownError(self, method, error, warn, printStatus):
 
1162
        self._colorizer.write("  %s" % method, self.ERROR)
 
1163
        if printStatus:
 
1164
            self.endLine('[ERROR]', self.ERROR)
 
1165
        super(TreeReporter, self).upDownError(method, error, warn, printStatus)
 
1166
 
 
1167
    def startTest(self, test):
 
1168
        """
 
1169
        Called when C{test} starts. Writes the tests name to the stream using
 
1170
        a tree format.
 
1171
        """
 
1172
        self._testPrelude(test.id())
 
1173
        self._write('%s%s ... ' % (self.indent * (len(self._lastTest)),
 
1174
                                   self.getDescription(test)))
 
1175
        super(TreeReporter, self).startTest(test)
 
1176
 
 
1177
 
 
1178
    def endLine(self, message, color):
 
1179
        """
 
1180
        Print 'message' in the given color.
 
1181
 
 
1182
        @param message: A string message, usually '[OK]' or something similar.
 
1183
        @param color: A string color, 'red', 'green' and so forth.
 
1184
        """
 
1185
        spaces = ' ' * (self.columns - len(self.currentLine) - len(message))
 
1186
        super(TreeReporter, self)._write(spaces)
 
1187
        self._colorizer.write(message, color)
 
1188
        super(TreeReporter, self)._write("\n")
 
1189
 
 
1190
 
 
1191
    def _printSummary(self):
 
1192
        """
 
1193
        Print a line summarising the test results to the stream, and color the
 
1194
        status result.
 
1195
        """
 
1196
        summary = self._getSummary()
 
1197
        if self.wasSuccessful():
 
1198
            status = "PASSED"
 
1199
            color = self.SUCCESS
 
1200
        else:
 
1201
            status = "FAILED"
 
1202
            color = self.FAILURE
 
1203
        self._colorizer.write(status, color)
 
1204
        self._write("%s\n", summary)