1
# -*- test-case-name: twisted.trial.test.test_tests -*-
2
# Copyright (c) 2001-2009 Twisted Matrix Laboratories.
3
# See LICENSE for details.
6
Things likely to be used by writers of unit tests.
8
Maintainer: Jonathan Lange
12
import doctest, inspect
13
import os, warnings, sys, tempfile, gc, types
14
from pprint import pformat
16
from dis import findlinestarts as _findlinestarts
18
# Definition copied from Python's Lib/dis.py - findlinestarts was not
19
# available in Python 2.3. This function is copyright Python Software
20
# Foundation, released under the Python license:
21
# http://www.python.org/psf/license/
22
def _findlinestarts(code):
23
"""Find the offsets in a byte code which are start of lines in the source.
25
Generate pairs (offset, lineno) as described in Python/compile.c.
28
byte_increments = [ord(c) for c in code.co_lnotab[0::2]]
29
line_increments = [ord(c) for c in code.co_lnotab[1::2]]
32
lineno = code.co_firstlineno
34
for byte_incr, line_incr in zip(byte_increments, line_increments):
36
if lineno != lastlineno:
41
if lineno != lastlineno:
44
from twisted.internet import defer, utils
45
from twisted.python import components, failure, log, monkey
46
from twisted.python.deprecate import getDeprecationWarningString
48
from twisted.trial import itrial, reporter, util
50
pyunit = __import__('unittest')
52
from zope.interface import implements
56
class SkipTest(Exception):
58
Raise this (with a reason) to skip the current test. You may also set
59
method.skip to a reason string to skip it, or set class.skip to skip the
64
class FailTest(AssertionError):
65
"""Raised to indicate the current test has failed to pass."""
70
Internal object used to mark a L{TestCase} as 'todo'. Tests marked 'todo'
71
are reported differently in Trial L{TestResult}s. If todo'd tests fail,
72
they do not fail the suite and the errors are reported in a separate
73
category. If todo'd tests succeed, Trial L{TestResult}s will report an
77
def __init__(self, reason, errors=None):
79
@param reason: A string explaining why the test is marked 'todo'
81
@param errors: An iterable of exception types that the test is
82
expected to raise. If one of these errors is raised by the test, it
83
will be trapped. Raising any other kind of error will fail the test.
84
If C{None} is passed, then all errors will be trapped.
90
return "<Todo reason=%r errors=%r>" % (self.reason, self.errors)
92
def expected(self, failure):
94
@param failure: A L{twisted.python.failure.Failure}.
96
@return: C{True} if C{failure} is expected, C{False} otherwise.
98
if self.errors is None:
100
for error in self.errors:
101
if failure.check(error):
108
Return a L{Todo} object built from C{value}.
110
If C{value} is a string, return a Todo that expects any exception with
111
C{value} as a reason. If C{value} is a tuple, the second element is used
112
as the reason and the first element as the excepted error(s).
114
@param value: A string or a tuple of C{(errors, reason)}, where C{errors}
115
is either a single exception class or an iterable of exception classes.
117
@return: A L{Todo} object.
119
if isinstance(value, str):
120
return Todo(reason=value)
121
if isinstance(value, tuple):
122
errors, reason = value
124
errors = list(errors)
127
return Todo(reason=reason, errors=errors)
131
class _Warning(object):
133
A L{_Warning} instance represents one warning emitted through the Python
134
warning system (L{warnings}). This is used to insulate callers of
135
L{_collectWarnings} from changes to the Python warnings system which might
136
otherwise require changes to the warning objects that function passes to
137
the observer object it accepts.
139
@ivar message: The string which was passed as the message parameter to
142
@ivar category: The L{Warning} subclass which was passed as the category
143
parameter to L{warnings.warn}.
145
@ivar filename: The name of the file containing the definition of the code
146
object which was C{stacklevel} frames above the call to
147
L{warnings.warn}, where C{stacklevel} is the value of the C{stacklevel}
148
parameter passed to L{warnings.warn}.
150
@ivar lineno: The source line associated with the active instruction of the
151
code object object which was C{stacklevel} frames above the call to
152
L{warnings.warn}, where C{stacklevel} is the value of the C{stacklevel}
153
parameter passed to L{warnings.warn}.
155
def __init__(self, message, category, filename, lineno):
156
self.message = message
157
self.category = category
158
self.filename = filename
163
def _collectWarnings(observeWarning, f, *args, **kwargs):
165
Call C{f} with C{args} positional arguments and C{kwargs} keyword arguments
166
and collect all warnings which are emitted as a result in a list.
168
@param observeWarning: A callable which will be invoked with a L{_Warning}
169
instance each time a warning is emitted.
171
@return: The return value of C{f(*args, **kwargs)}.
173
def showWarning(message, category, filename, lineno, file=None, line=None):
174
assert isinstance(message, Warning)
175
observeWarning(_Warning(
176
message.args[0], category, filename, lineno))
178
# Disable the per-module cache for every module otherwise if the warning
179
# which the caller is expecting us to collect was already emitted it won't
180
# be re-emitted by the call to f which happens below.
181
for v in sys.modules.itervalues():
184
v.__warningregistry__ = None
186
# Don't specify a particular exception type to handle in case
187
# some wacky object raises some wacky exception in response to
188
# the setattr attempt.
191
origFilters = warnings.filters[:]
192
origShow = warnings.showwarning
193
warnings.simplefilter('always')
195
warnings.showwarning = showWarning
196
result = f(*args, **kwargs)
198
warnings.filters[:] = origFilters
199
warnings.showwarning = origShow
204
class _Assertions(pyunit.TestCase, object):
206
Replaces many of the built-in TestCase assertions. In general, these
207
assertions provide better error messages and are easier to use in
208
callbacks. Also provides new assertions such as L{failUnlessFailure}.
210
Although the tests are defined as 'failIf*' and 'failUnless*', they can
211
also be called as 'assertNot*' and 'assert*'.
214
def fail(self, msg=None):
216
Absolutely fail the test. Do not pass go, do not collect $200.
218
@param msg: the message that will be displayed as the reason for the
221
raise self.failureException(msg)
223
def failIf(self, condition, msg=None):
225
Fail the test if C{condition} evaluates to True.
227
@param condition: any object that defines __nonzero__
230
raise self.failureException(msg)
232
assertNot = assertFalse = failUnlessFalse = failIf
234
def failUnless(self, condition, msg=None):
236
Fail the test if C{condition} evaluates to False.
238
@param condition: any object that defines __nonzero__
241
raise self.failureException(msg)
243
assert_ = assertTrue = failUnlessTrue = failUnless
245
def failUnlessRaises(self, exception, f, *args, **kwargs):
247
Fail the test unless calling the function C{f} with the given
248
C{args} and C{kwargs} raises C{exception}. The failure will report
249
the traceback and call stack of the unexpected exception.
251
@param exception: exception type that is to be expected
252
@param f: the function to call
254
@return: The raised exception instance, if it is of the given type.
255
@raise self.failureException: Raised if the function call does
256
not raise an exception or if it raises an exception of a
260
result = f(*args, **kwargs)
261
except exception, inst:
264
raise self.failureException('%s raised instead of %s:\n %s'
265
% (sys.exc_info()[0],
267
failure.Failure().getTraceback()))
269
raise self.failureException('%s not raised (%r returned)'
270
% (exception.__name__, result))
271
assertRaises = failUnlessRaises
273
def failUnlessEqual(self, first, second, msg=''):
275
Fail the test if C{first} and C{second} are not equal.
277
@param msg: A string describing the failure that's included in the
280
if not first == second:
285
raise self.failureException(
286
'%snot equal:\na = %s\nb = %s\n'
287
% (msg, pformat(first), pformat(second)))
289
assertEqual = assertEquals = failUnlessEquals = failUnlessEqual
291
def failUnlessIdentical(self, first, second, msg=None):
293
Fail the test if C{first} is not C{second}. This is an
294
obect-identity-equality test, not an object equality
295
(i.e. C{__eq__}) test.
297
@param msg: if msg is None, then the failure message will be
298
'%r is not %r' % (first, second)
300
if first is not second:
301
raise self.failureException(msg or '%r is not %r' % (first, second))
303
assertIdentical = failUnlessIdentical
305
def failIfIdentical(self, first, second, msg=None):
307
Fail the test if C{first} is C{second}. This is an
308
obect-identity-equality test, not an object equality
309
(i.e. C{__eq__}) test.
311
@param msg: if msg is None, then the failure message will be
312
'%r is %r' % (first, second)
315
raise self.failureException(msg or '%r is %r' % (first, second))
317
assertNotIdentical = failIfIdentical
319
def failIfEqual(self, first, second, msg=None):
321
Fail the test if C{first} == C{second}.
323
@param msg: if msg is None, then the failure message will be
324
'%r == %r' % (first, second)
326
if not first != second:
327
raise self.failureException(msg or '%r == %r' % (first, second))
329
assertNotEqual = assertNotEquals = failIfEquals = failIfEqual
331
def failUnlessIn(self, containee, container, msg=None):
333
Fail the test if C{containee} is not found in C{container}.
335
@param containee: the value that should be in C{container}
336
@param container: a sequence type, or in the case of a mapping type,
337
will follow semantics of 'if key in dict.keys()'
338
@param msg: if msg is None, then the failure message will be
339
'%r not in %r' % (first, second)
341
if containee not in container:
342
raise self.failureException(msg or "%r not in %r"
343
% (containee, container))
345
assertIn = failUnlessIn
347
def failIfIn(self, containee, container, msg=None):
349
Fail the test if C{containee} is found in C{container}.
351
@param containee: the value that should not be in C{container}
352
@param container: a sequence type, or in the case of a mapping type,
353
will follow semantics of 'if key in dict.keys()'
354
@param msg: if msg is None, then the failure message will be
355
'%r in %r' % (first, second)
357
if containee in container:
358
raise self.failureException(msg or "%r in %r"
359
% (containee, container))
361
assertNotIn = failIfIn
363
def failIfAlmostEqual(self, first, second, places=7, msg=None):
365
Fail if the two objects are equal as determined by their
366
difference rounded to the given number of decimal places
367
(default 7) and comparing to zero.
369
@note: decimal places (from zero) is usually not the same
370
as significant digits (measured from the most
373
@note: included for compatiblity with PyUnit test cases
375
if round(second-first, places) == 0:
376
raise self.failureException(msg or '%r == %r within %r places'
377
% (first, second, places))
379
assertNotAlmostEqual = assertNotAlmostEquals = failIfAlmostEqual
380
failIfAlmostEquals = failIfAlmostEqual
382
def failUnlessAlmostEqual(self, first, second, places=7, msg=None):
384
Fail if the two objects are unequal as determined by their
385
difference rounded to the given number of decimal places
386
(default 7) and comparing to zero.
388
@note: decimal places (from zero) is usually not the same
389
as significant digits (measured from the most
392
@note: included for compatiblity with PyUnit test cases
394
if round(second-first, places) != 0:
395
raise self.failureException(msg or '%r != %r within %r places'
396
% (first, second, places))
398
assertAlmostEqual = assertAlmostEquals = failUnlessAlmostEqual
399
failUnlessAlmostEquals = failUnlessAlmostEqual
401
def failUnlessApproximates(self, first, second, tolerance, msg=None):
403
Fail if C{first} - C{second} > C{tolerance}
405
@param msg: if msg is None, then the failure message will be
406
'%r ~== %r' % (first, second)
408
if abs(first - second) > tolerance:
409
raise self.failureException(msg or "%s ~== %s" % (first, second))
411
assertApproximates = failUnlessApproximates
413
def failUnlessFailure(self, deferred, *expectedFailures):
415
Fail if C{deferred} does not errback with one of C{expectedFailures}.
416
Returns the original Deferred with callbacks added. You will need
417
to return this Deferred from your test case.
420
raise self.failureException(
421
"did not catch an error, instead got %r" % (ignore,))
424
if failure.check(*expectedFailures):
427
output = ('\nExpected: %r\nGot:\n%s'
428
% (expectedFailures, str(failure)))
429
raise self.failureException(output)
430
return deferred.addCallbacks(_cb, _eb)
431
assertFailure = failUnlessFailure
433
def failUnlessSubstring(self, substring, astring, msg=None):
435
Fail if C{substring} does not exist within C{astring}.
437
return self.failUnlessIn(substring, astring, msg)
438
assertSubstring = failUnlessSubstring
440
def failIfSubstring(self, substring, astring, msg=None):
442
Fail if C{astring} contains C{substring}.
444
return self.failIfIn(substring, astring, msg)
445
assertNotSubstring = failIfSubstring
447
def failUnlessWarns(self, category, message, filename, f,
450
Fail if the given function doesn't generate the specified warning when
451
called. It calls the function, checks the warning, and forwards the
452
result of the function if everything is fine.
454
@param category: the category of the warning to check.
455
@param message: the output message of the warning to check.
456
@param filename: the filename where the warning should come from.
457
@param f: the function which is supposed to generate the warning.
458
@type f: any callable.
459
@param args: the arguments to C{f}.
460
@param kwargs: the keywords arguments to C{f}.
462
@return: the result of the original function C{f}.
465
result = _collectWarnings(warningsShown.append, f, *args, **kwargs)
467
if not warningsShown:
468
self.fail("No warnings emitted")
469
first = warningsShown[0]
470
for other in warningsShown[1:]:
471
if ((other.message, other.category)
472
!= (first.message, first.category)):
473
self.fail("Can't handle different warnings")
474
self.assertEqual(first.message, message)
475
self.assertIdentical(first.category, category)
477
# Use starts with because of .pyc/.pyo issues.
479
filename.startswith(first.filename),
480
'Warning in %r, expected %r' % (first.filename, filename))
482
# It would be nice to be able to check the line number as well, but
483
# different configurations actually end up reporting different line
484
# numbers (generally the variation is only 1 line, but that's enough
485
# to fail the test erroneously...).
486
# self.assertEqual(lineno, xxx)
489
assertWarns = failUnlessWarns
491
def failUnlessIsInstance(self, instance, classOrTuple):
493
Fail if C{instance} is not an instance of the given class or of
494
one of the given classes.
496
@param instance: the object to test the type (first argument of the
499
@param classOrTuple: the class or classes to test against (second
500
argument of the C{isinstance} call).
501
@type classOrTuple: class, type, or tuple.
503
if not isinstance(instance, classOrTuple):
504
self.fail("%r is not an instance of %s" % (instance, classOrTuple))
505
assertIsInstance = failUnlessIsInstance
507
def failIfIsInstance(self, instance, classOrTuple):
509
Fail if C{instance} is not an instance of the given class or of
510
one of the given classes.
512
@param instance: the object to test the type (first argument of the
515
@param classOrTuple: the class or classes to test against (second
516
argument of the C{isinstance} call).
517
@type classOrTuple: class, type, or tuple.
519
if isinstance(instance, classOrTuple):
520
self.fail("%r is an instance of %s" % (instance, classOrTuple))
521
assertNotIsInstance = failIfIsInstance
524
class _LogObserver(object):
526
Observes the Twisted logs and catches any errors.
528
@ivar _errors: A C{list} of L{Failure} instances which were received as
529
error events from the Twisted logging system.
531
@ivar _added: A C{int} giving the number of times C{_add} has been called
532
less the number of times C{_remove} has been called; used to only add
533
this observer to the Twisted logging since once, regardless of the
534
number of calls to the add method.
536
@ivar _ignored: A C{list} of exception types which will not be recorded.
547
log.addObserver(self.gotEvent)
548
self._oldFE, log._flushErrors = (log._flushErrors, self.flushErrors)
549
self._oldIE, log._ignore = (log._ignore, self._ignoreErrors)
550
self._oldCI, log._clearIgnores = (log._clearIgnores,
557
log.removeObserver(self.gotEvent)
558
log._flushErrors = self._oldFE
559
log._ignore = self._oldIE
560
log._clearIgnores = self._oldCI
563
def _ignoreErrors(self, *errorTypes):
565
Do not store any errors with any of the given types.
567
self._ignored.extend(errorTypes)
570
def _clearIgnores(self):
572
Stop ignoring any errors we might currently be ignoring.
577
def flushErrors(self, *errorTypes):
579
Flush errors from the list of caught errors. If no arguments are
580
specified, remove all errors. If arguments are specified, only remove
581
errors of those types from the stored list.
586
for f in self._errors:
587
if f.check(*errorTypes):
591
self._errors = remainder
593
flushed = self._errors
600
Return a list of errors caught by this observer.
605
def gotEvent(self, event):
607
The actual observer method. Called whenever a message is logged.
609
@param event: A dictionary containing the log message. Actual
610
structure undocumented (see source for L{twisted.python.log}).
612
if event.get('isError', False) and 'failure' in event:
614
if len(self._ignored) == 0 or not f.check(*self._ignored):
615
self._errors.append(f)
619
_logObserver = _LogObserver()
621
_wait_is_running = []
623
class TestCase(_Assertions):
625
A unit test. The atom of the unit testing universe.
627
This class extends C{unittest.TestCase} from the standard library. The
628
main feature is the ability to return C{Deferred}s from tests and fixture
629
methods and to have the suite wait for those C{Deferred}s to fire.
631
To write a unit test, subclass C{TestCase} and define a method (say,
632
'test_foo') on the subclass. To run the test, instantiate your subclass
633
with the name of the method, and call L{run} on the instance, passing a
634
L{TestResult} object.
636
The C{trial} script will automatically find any C{TestCase} subclasses
637
defined in modules beginning with 'test_' and construct test cases for all
638
methods beginning with 'test'.
640
If an error is logged during the test run, the test will fail with an
641
error. See L{log.err}.
643
@ivar failureException: An exception class, defaulting to C{FailTest}. If
644
the test method raises this exception, it will be reported as a failure,
645
rather than an exception. All of the assertion methods raise this if the
648
@ivar skip: C{None} or a string explaining why this test is to be
649
skipped. If defined, the test will not be run. Instead, it will be
650
reported to the result object as 'skipped' (if the C{TestResult} supports
653
@ivar suppress: C{None} or a list of tuples of C{(args, kwargs)} to be
654
passed to C{warnings.filterwarnings}. Use these to suppress warnings
655
raised in a test. Useful for testing deprecated code. See also
658
@ivar timeout: A real number of seconds. If set, the test will
659
raise an error if it takes longer than C{timeout} seconds.
660
If not set, util.DEFAULT_TIMEOUT_DURATION is used.
662
@ivar todo: C{None}, a string or a tuple of C{(errors, reason)} where
663
C{errors} is either an exception class or an iterable of exception
664
classes, and C{reason} is a string. See L{Todo} or L{makeTodo} for more
668
implements(itrial.ITestCase)
669
failureException = FailTest
671
def __init__(self, methodName='runTest'):
673
Construct an asynchronous test case for C{methodName}.
675
@param methodName: The name of a method on C{self}. This method should
676
be a unit test. That is, it should be a short method that calls some of
677
the assert* methods. If C{methodName} is unspecified, L{runTest} will
678
be used as the test method. This is mostly useful for testing Trial.
680
super(TestCase, self).__init__(methodName)
681
self._testMethodName = methodName
682
testMethod = getattr(self, methodName)
683
self._parents = [testMethod, self]
684
self._parents.extend(util.getPythonContainers(testMethod))
688
if sys.version_info >= (2, 6):
689
# Override the comparison defined by the base TestCase which considers
690
# instances of the same class with the same _testMethodName to be
691
# equal. Since trial puts TestCase instances into a set, that
692
# definition of comparison makes it impossible to run the same test
693
# method twice. Most likely, trial should stop using a set to hold
694
# tests, but until it does, this is necessary on Python 2.6. Only
695
# __eq__ and __ne__ are required here, not __hash__, since the
696
# inherited __hash__ is compatible with these equality semantics. A
697
# different __hash__ might be slightly more efficient (by reducing
698
# collisions), but who cares? -exarkun
699
def __eq__(self, other):
702
def __ne__(self, other):
703
return self is not other
706
def _run(self, methodName, result):
707
from twisted.internet import reactor
708
timeout = self.getTimeout()
710
e = defer.TimeoutError("%r (%s) still running at %s secs"
711
% (self, methodName, timeout))
712
f = failure.Failure(e)
713
# try to errback the deferred that the test returns (for no gorram
714
# reason) (see issue1005 and test_errorPropagation in
718
except defer.AlreadyCalledError:
719
# if the deferred has been called already but the *back chain
720
# is still unfinished, crash the reactor and report timeout
723
self._timedOut = True # see self._wait
724
todo = self.getTodo()
725
if todo is not None and todo.expected(f):
726
result.addExpectedFailure(self, f, todo)
728
result.addError(self, f)
729
onTimeout = utils.suppressWarnings(
730
onTimeout, util.suppress(category=DeprecationWarning))
731
method = getattr(self, methodName)
732
d = defer.maybeDeferred(utils.runWithWarningsSuppressed,
733
self.getSuppress(), method)
734
call = reactor.callLater(timeout, onTimeout, d)
735
d.addBoth(lambda x : call.active() and call.cancel() or x)
738
def shortDescription(self):
739
desc = super(TestCase, self).shortDescription()
741
return self._testMethodName
744
def __call__(self, *args, **kwargs):
745
return self.run(*args, **kwargs)
747
def deferSetUp(self, ignored, result):
748
d = self._run('setUp', result)
749
d.addCallbacks(self.deferTestMethod, self._ebDeferSetUp,
750
callbackArgs=(result,),
751
errbackArgs=(result,))
754
def _ebDeferSetUp(self, failure, result):
755
if failure.check(SkipTest):
756
result.addSkip(self, self._getReason(failure))
758
result.addError(self, failure)
759
if failure.check(KeyboardInterrupt):
761
return self.deferRunCleanups(None, result)
763
def deferTestMethod(self, ignored, result):
764
d = self._run(self._testMethodName, result)
765
d.addCallbacks(self._cbDeferTestMethod, self._ebDeferTestMethod,
766
callbackArgs=(result,),
767
errbackArgs=(result,))
768
d.addBoth(self.deferRunCleanups, result)
769
d.addBoth(self.deferTearDown, result)
772
def _cbDeferTestMethod(self, ignored, result):
773
if self.getTodo() is not None:
774
result.addUnexpectedSuccess(self, self.getTodo())
779
def _ebDeferTestMethod(self, f, result):
780
todo = self.getTodo()
781
if todo is not None and todo.expected(f):
782
result.addExpectedFailure(self, f, todo)
783
elif f.check(self.failureException, FailTest):
784
result.addFailure(self, f)
785
elif f.check(KeyboardInterrupt):
786
result.addError(self, f)
788
elif f.check(SkipTest):
789
result.addSkip(self, self._getReason(f))
791
result.addError(self, f)
793
def deferTearDown(self, ignored, result):
794
d = self._run('tearDown', result)
795
d.addErrback(self._ebDeferTearDown, result)
798
def _ebDeferTearDown(self, failure, result):
799
result.addError(self, failure)
800
if failure.check(KeyboardInterrupt):
804
def deferRunCleanups(self, ignored, result):
806
Run any scheduled cleanups and report errors (if any to the result
809
d = self._runCleanups()
810
d.addCallback(self._cbDeferRunCleanups, result)
813
def _cbDeferRunCleanups(self, cleanupResults, result):
814
for flag, failure in cleanupResults:
815
if flag == defer.FAILURE:
816
result.addError(self, failure)
817
if failure.check(KeyboardInterrupt):
821
def _cleanUp(self, result):
823
clean = util._Janitor(self, result).postCaseCleanup()
827
result.addError(self, failure.Failure())
829
for error in self._observer.getErrors():
830
result.addError(self, error)
832
self.flushLoggedErrors()
833
self._removeObserver()
835
result.addSuccess(self)
837
def _classCleanUp(self, result):
839
util._Janitor(self, result).postClassCleanup()
841
result.addError(self, failure.Failure())
843
def _makeReactorMethod(self, name):
845
Create a method which wraps the reactor method C{name}. The new
846
method issues a deprecation warning and calls the original.
849
warnings.warn("reactor.%s cannot be used inside unit tests. "
850
"In the future, using %s will fail the test and may "
851
"crash or hang the test run."
853
stacklevel=2, category=DeprecationWarning)
854
return self._reactorMethods[name](*a, **kw)
857
def _deprecateReactor(self, reactor):
859
Deprecate C{iterate}, C{crash} and C{stop} on C{reactor}. That is,
860
each method is wrapped in a function that issues a deprecation
861
warning, then calls the original.
863
@param reactor: The Twisted reactor.
865
self._reactorMethods = {}
866
for name in ['crash', 'iterate', 'stop']:
867
self._reactorMethods[name] = getattr(reactor, name)
868
setattr(reactor, name, self._makeReactorMethod(name))
870
def _undeprecateReactor(self, reactor):
872
Restore the deprecated reactor methods. Undoes what
873
L{_deprecateReactor} did.
875
@param reactor: The Twisted reactor.
877
for name, method in self._reactorMethods.iteritems():
878
setattr(reactor, name, method)
879
self._reactorMethods = {}
881
def _installObserver(self):
882
self._observer = _logObserver
883
self._observer._add()
885
def _removeObserver(self):
886
self._observer._remove()
888
def flushLoggedErrors(self, *errorTypes):
890
Remove stored errors received from the log.
892
C{TestCase} stores each error logged during the run of the test and
893
reports them as errors during the cleanup phase (after C{tearDown}).
895
@param *errorTypes: If unspecifed, flush all errors. Otherwise, only
896
flush errors that match the given types.
898
@return: A list of failures that have been removed.
900
return self._observer.flushErrors(*errorTypes)
903
def flushWarnings(self, offendingFunctions=None):
905
Remove stored warnings from the list of captured warnings and return
908
@param offendingFunctions: If C{None}, all warnings issued during the
909
currently running test will be flushed. Otherwise, only warnings
910
which I{point} to a function included in this list will be flushed.
911
All warnings include a filename and source line number; if these
912
parts of a warning point to a source line which is part of a
913
function, then the warning I{points} to that function.
914
@type offendingFunctions: L{NoneType} or L{list} of functions or methods.
916
@raise ValueError: If C{offendingFunctions} is not C{None} and includes
917
an object which is not a L{FunctionType} or L{MethodType} instance.
919
@return: A C{list}, each element of which is a C{dict} giving
920
information about one warning which was flushed by this call. The
921
keys of each C{dict} are:
923
- C{'message'}: The string which was passed as the I{message}
924
parameter to L{warnings.warn}.
926
- C{'category'}: The warning subclass which was passed as the
927
I{category} parameter to L{warnings.warn}.
929
- C{'filename'}: The name of the file containing the definition
930
of the code object which was C{stacklevel} frames above the
931
call to L{warnings.warn}, where C{stacklevel} is the value of
932
the C{stacklevel} parameter passed to L{warnings.warn}.
934
- C{'lineno'}: The source line associated with the active
935
instruction of the code object object which was C{stacklevel}
936
frames above the call to L{warnings.warn}, where
937
C{stacklevel} is the value of the C{stacklevel} parameter
938
passed to L{warnings.warn}.
940
if offendingFunctions is None:
941
toFlush = self._warnings[:]
942
self._warnings[:] = []
945
for aWarning in self._warnings:
946
for aFunction in offendingFunctions:
947
if not isinstance(aFunction, (
948
types.FunctionType, types.MethodType)):
949
raise ValueError("%r is not a function or method" % (
952
# inspect.getabsfile(aFunction) sometimes returns a
953
# filename which disagrees with the filename the warning
954
# system generates. This seems to be because a
955
# function's code object doesn't deal with source files
956
# being renamed. inspect.getabsfile(module) seems
957
# better (or at least agrees with the warning system
958
# more often), and does some normalization for us which
959
# is desirable. inspect.getmodule() is attractive, but
960
# somewhat broken in Python 2.3. See Python bug 4845.
961
aModule = sys.modules[aFunction.__module__]
962
filename = inspect.getabsfile(aModule)
964
if filename != os.path.normcase(aWarning.filename):
966
lineStarts = list(_findlinestarts(aFunction.func_code))
967
first = lineStarts[0][1]
968
last = lineStarts[-1][1]
969
if not (first <= aWarning.lineno <= last):
971
# The warning points to this function, flush it and move on
972
# to the next warning.
973
toFlush.append(aWarning)
975
# Remove everything which is being flushed.
976
map(self._warnings.remove, toFlush)
979
{'message': w.message, 'category': w.category,
980
'filename': w.filename, 'lineno': w.lineno}
984
def addCleanup(self, f, *args, **kwargs):
986
Add the given function to a list of functions to be called after the
987
test has run, but before C{tearDown}.
989
Functions will be run in reverse order of being added. This helps
990
ensure that tear down complements set up.
992
The function C{f} may return a Deferred. If so, C{TestCase} will wait
993
until the Deferred has fired before proceeding to the next function.
995
self._cleanups.append((f, args, kwargs))
998
def callDeprecated(self, version, f, *args, **kwargs):
1000
Call a function that was deprecated at a specific version.
1002
@param version: The version that the function was deprecated in.
1003
@param f: The deprecated function to call.
1004
@return: Whatever the function returns.
1006
result = f(*args, **kwargs)
1007
warningsShown = self.flushWarnings([self.callDeprecated])
1009
if len(warningsShown) == 0:
1010
self.fail('%r is not deprecated.' % (f,))
1012
observedWarning = warningsShown[0]['message']
1013
expectedWarning = getDeprecationWarningString(f, version)
1014
self.assertEqual(expectedWarning, observedWarning)
1019
def _runCleanups(self):
1021
Run the cleanups added with L{addCleanup} in order.
1023
@return: A C{Deferred} that fires when all cleanups are run.
1025
def _makeFunction(f, args, kwargs):
1026
return lambda: f(*args, **kwargs)
1028
while len(self._cleanups) > 0:
1029
f, args, kwargs = self._cleanups.pop()
1030
callables.append(_makeFunction(f, args, kwargs))
1031
return util._runSequentially(callables)
1034
def patch(self, obj, attribute, value):
1036
Monkey patch an object for the duration of the test.
1038
The monkey patch will be reverted at the end of the test using the
1039
L{addCleanup} mechanism.
1041
The L{MonkeyPatcher} is returned so that users can restore and
1042
re-apply the monkey patch within their tests.
1044
@param obj: The object to monkey patch.
1045
@param attribute: The name of the attribute to change.
1046
@param value: The value to set the attribute to.
1047
@return: A L{monkey.MonkeyPatcher} object.
1049
monkeyPatch = monkey.MonkeyPatcher((obj, attribute, value))
1051
self.addCleanup(monkeyPatch.restore)
1057
If no C{methodName} argument is passed to the constructor, L{run} will
1058
treat this method as the thing with the actual test inside.
1062
def run(self, result):
1064
Run the test case, storing the results in C{result}.
1066
First runs C{setUp} on self, then runs the test method (defined in the
1067
constructor), then runs C{tearDown}. Any of these may return
1068
L{Deferred}s. After they complete, does some reactor cleanup.
1070
@param result: A L{TestResult} object.
1072
log.msg("--> %s <--" % (self.id()))
1073
from twisted.internet import reactor
1074
new_result = itrial.IReporter(result, None)
1075
if new_result is None:
1076
result = PyUnitResultAdapter(result)
1079
self._timedOut = False
1080
result.startTest(self)
1081
if self.getSkip(): # don't run test methods that are marked as .skip
1082
result.addSkip(self, self.getSkip())
1083
result.stopTest(self)
1085
self._installObserver()
1087
# All the code inside runThunk will be run such that warnings emitted
1088
# by it will be collected and retrievable by flushWarnings.
1090
self._passed = False
1091
self._deprecateReactor(reactor)
1093
d = self.deferSetUp(None, result)
1097
self._cleanUp(result)
1098
self._classCleanUp(result)
1100
self._undeprecateReactor(reactor)
1103
_collectWarnings(self._warnings.append, runThunk)
1105
# Any collected warnings which the test method didn't flush get
1106
# re-emitted so they'll be logged or show up on stdout or whatever.
1107
for w in self.flushWarnings():
1109
warnings.warn_explicit(**w)
1111
result.addError(self, failure.Failure())
1113
result.stopTest(self)
1116
def _getReason(self, f):
1117
if len(f.value.args) > 0:
1118
reason = f.value.args[0]
1120
warnings.warn(("Do not raise unittest.SkipTest with no "
1121
"arguments! Give a reason for skipping tests!"),
1128
Return the skip reason set on this test, if any is set. Checks on the
1129
instance first, then the class, then the module, then packages. As
1130
soon as it finds something with a C{skip} attribute, returns that.
1131
Returns C{None} if it cannot find anything. See L{TestCase} docstring
1134
return util.acquireAttribute(self._parents, 'skip', None)
1138
Return a L{Todo} object if the test is marked todo. Checks on the
1139
instance first, then the class, then the module, then packages. As
1140
soon as it finds something with a C{todo} attribute, returns that.
1141
Returns C{None} if it cannot find anything. See L{TestCase} docstring
1144
todo = util.acquireAttribute(self._parents, 'todo', None)
1147
return makeTodo(todo)
1149
def getTimeout(self):
1151
Returns the timeout value set on this test. Checks on the instance
1152
first, then the class, then the module, then packages. As soon as it
1153
finds something with a C{timeout} attribute, returns that. Returns
1154
L{util.DEFAULT_TIMEOUT_DURATION} if it cannot find anything. See
1155
L{TestCase} docstring for more details.
1157
timeout = util.acquireAttribute(self._parents, 'timeout',
1158
util.DEFAULT_TIMEOUT_DURATION)
1160
return float(timeout)
1161
except (ValueError, TypeError):
1162
# XXX -- this is here because sometimes people will have methods
1163
# called 'timeout', or set timeout to 'orange', or something
1164
# Particularly, test_news.NewsTestCase and ReactorCoreTestCase
1166
warnings.warn("'timeout' attribute needs to be a number.",
1167
category=DeprecationWarning)
1168
return util.DEFAULT_TIMEOUT_DURATION
1170
def getSuppress(self):
1172
Returns any warning suppressions set for this test. Checks on the
1173
instance first, then the class, then the module, then packages. As
1174
soon as it finds something with a C{suppress} attribute, returns that.
1175
Returns any empty list (i.e. suppress no warnings) if it cannot find
1176
anything. See L{TestCase} docstring for more details.
1178
return util.acquireAttribute(self._parents, 'suppress', [])
1181
def visit(self, visitor):
1183
Visit this test case. Call C{visitor} with C{self} as a parameter.
1185
Deprecated in Twisted 8.0.
1187
@param visitor: A callable which expects a single parameter: a test
1192
warnings.warn("Test visitors deprecated in Twisted 8.0",
1193
category=DeprecationWarning)
1198
"""Returns a unique name that may be used as either a temporary
1199
directory or filename.
1201
@note: you must call os.mkdir on the value returned from this
1202
method if you wish to use it as a directory!
1204
MAX_FILENAME = 32 # some platforms limit lengths of filenames
1205
base = os.path.join(self.__class__.__module__[:MAX_FILENAME],
1206
self.__class__.__name__[:MAX_FILENAME],
1207
self._testMethodName[:MAX_FILENAME])
1208
if not os.path.exists(base):
1210
dirname = tempfile.mkdtemp('', '', base)
1211
return os.path.join(dirname, 'temp')
1213
def _wait(self, d, running=_wait_is_running):
1214
"""Take a Deferred that only ever callbacks. Block until it happens.
1216
from twisted.internet import reactor
1218
raise RuntimeError("_wait is not reentrant")
1222
if results is not None:
1225
if results is not None:
1227
crash = utils.suppressWarnings(
1228
crash, util.suppress(message=r'reactor\.crash cannot be used.*',
1229
category=DeprecationWarning))
1232
stop = utils.suppressWarnings(
1233
stop, util.suppress(message=r'reactor\.crash cannot be used.*',
1234
category=DeprecationWarning))
1236
running.append(None)
1240
# d might have already been fired, in which case append is
1241
# called synchronously. Avoid any reactor stuff.
1250
# If the reactor was crashed elsewhere due to a timeout, hopefully
1251
# that crasher also reported an error. Just return.
1252
# _timedOut is most likely to be set when d has fired but hasn't
1253
# completed its callback chain (see self._run)
1254
if results or self._timedOut: #defined in run() and _run()
1257
# If the timeout didn't happen, and we didn't get a result or
1258
# a failure, then the user probably aborted the test, so let's
1259
# just raise KeyboardInterrupt.
1261
# FIXME: imagine this:
1262
# web/test/test_webclient.py:
1263
# exc = self.assertRaises(error.Error, wait, method(url))
1265
# wait() will raise KeyboardInterrupt, and assertRaises will
1266
# swallow it. Therefore, wait() raising KeyboardInterrupt is
1267
# insufficient to stop trial. A suggested solution is to have
1268
# this code set a "stop trial" flag, or otherwise notify trial
1269
# that it should really try to stop as soon as possible.
1270
raise KeyboardInterrupt()
1276
class UnsupportedTrialFeature(Exception):
1277
"""A feature of twisted.trial was used that pyunit cannot support."""
1281
class PyUnitResultAdapter(object):
1283
Wrap a C{TestResult} from the standard library's C{unittest} so that it
1284
supports the extended result types from Trial, and also supports
1285
L{twisted.python.failure.Failure}s being passed to L{addError} and
1289
def __init__(self, original):
1291
@param original: A C{TestResult} instance from C{unittest}.
1293
self.original = original
1295
def _exc_info(self, err):
1296
return util.excInfoOrFailureToExcInfo(err)
1298
def startTest(self, method):
1299
self.original.startTest(method)
1301
def stopTest(self, method):
1302
self.original.stopTest(method)
1304
def addFailure(self, test, fail):
1305
self.original.addFailure(test, self._exc_info(fail))
1307
def addError(self, test, error):
1308
self.original.addError(test, self._exc_info(error))
1310
def _unsupported(self, test, feature, info):
1311
self.original.addFailure(
1313
(UnsupportedTrialFeature,
1314
UnsupportedTrialFeature(feature, info),
1317
def addSkip(self, test, reason):
1319
Report the skip as a failure.
1321
self._unsupported(test, 'skip', reason)
1323
def addUnexpectedSuccess(self, test, todo):
1325
Report the unexpected success as a failure.
1327
self._unsupported(test, 'unexpected success', todo)
1329
def addExpectedFailure(self, test, error):
1331
Report the expected failure (i.e. todo) as a failure.
1333
self._unsupported(test, 'expected failure', error)
1335
def addSuccess(self, test):
1336
self.original.addSuccess(test)
1338
def upDownError(self, method, error, warn, printStatus):
1343
def suiteVisit(suite, visitor):
1345
Visit each test in C{suite} with C{visitor}.
1347
Deprecated in Twisted 8.0.
1349
@param visitor: A callable which takes a single argument, the L{TestCase}
1353
warnings.warn("Test visitors deprecated in Twisted 8.0",
1354
category=DeprecationWarning)
1355
for case in suite._tests:
1356
visit = getattr(case, 'visit', None)
1357
if visit is not None:
1359
elif isinstance(case, pyunit.TestCase):
1360
case = itrial.ITestCase(case)
1362
elif isinstance(case, pyunit.TestSuite):
1363
suiteVisit(case, visitor)
1369
class TestSuite(pyunit.TestSuite):
1371
Extend the standard library's C{TestSuite} with support for the visitor
1372
pattern and a consistently overrideable C{run} method.
1377
def __call__(self, result):
1378
return self.run(result)
1381
def run(self, result):
1383
Call C{run} on every member of the suite.
1385
# we implement this because Python 2.3 unittest defines this code
1386
# in __call__, whereas 2.4 defines the code in run.
1387
for test in self._tests:
1388
if result.shouldStop:
1395
class TestDecorator(components.proxyForInterface(itrial.ITestCase,
1398
Decorator for test cases.
1400
@param _originalTest: The wrapped instance of test.
1401
@type _originalTest: A provider of L{itrial.ITestCase}
1404
implements(itrial.ITestCase)
1407
def __call__(self, result):
1411
@param result: A TestResult object.
1413
return self.run(result)
1416
def run(self, result):
1420
@param result: A TestResult object.
1422
return self._originalTest.run(
1423
reporter._AdaptedReporter(result, self.__class__))
1427
def _clearSuite(suite):
1429
Clear all tests from C{suite}.
1431
This messes with the internals of C{suite}. In particular, it assumes that
1432
the suite keeps all of its tests in a list in an instance variable called
1438
def decorate(test, decorator):
1440
Decorate all test cases in C{test} with C{decorator}.
1442
C{test} can be a test case or a test suite. If it is a test suite, then the
1443
structure of the suite is preserved.
1445
L{decorate} tries to preserve the class of the test suites it finds, but
1446
assumes the presence of the C{_tests} attribute on the suite.
1448
@param test: The C{TestCase} or C{TestSuite} to decorate.
1450
@param decorator: A unary callable used to decorate C{TestCase}s.
1452
@return: A decorated C{TestCase} or a C{TestSuite} containing decorated
1459
return decorator(test)
1461
# At this point, we know that 'test' is a test suite.
1465
test.addTest(decorate(case, decorator))
1470
class _PyUnitTestCaseAdapter(TestDecorator):
1472
Adapt from pyunit.TestCase to ITestCase.
1476
def visit(self, visitor):
1478
Deprecated in Twisted 8.0.
1480
warnings.warn("Test visitors deprecated in Twisted 8.0",
1481
category=DeprecationWarning)
1486
class _BrokenIDTestCaseAdapter(_PyUnitTestCaseAdapter):
1488
Adapter for pyunit-style C{TestCase} subclasses that have undesirable id()
1489
methods. That is L{pyunit.FunctionTestCase} and L{pyunit.DocTestCase}.
1494
Return the fully-qualified Python name of the doctest.
1496
testID = self._originalTest.shortDescription()
1497
if testID is not None:
1499
return self._originalTest.id()
1503
class _ForceGarbageCollectionDecorator(TestDecorator):
1505
Forces garbage collection to be run before and after the test. Any errors
1506
logged during the post-test collection are added to the test result as
1510
def run(self, result):
1512
TestDecorator.run(self, result)
1515
for error in _logObserver.getErrors():
1516
result.addError(self, error)
1517
_logObserver.flushErrors()
1518
_logObserver._remove()
1521
components.registerAdapter(
1522
_PyUnitTestCaseAdapter, pyunit.TestCase, itrial.ITestCase)
1525
components.registerAdapter(
1526
_BrokenIDTestCaseAdapter, pyunit.FunctionTestCase, itrial.ITestCase)
1529
_docTestCase = getattr(doctest, 'DocTestCase', None)
1531
components.registerAdapter(
1532
_BrokenIDTestCaseAdapter, _docTestCase, itrial.ITestCase)
1535
def _iterateTests(testSuiteOrCase):
1537
Iterate through all of the test cases in C{testSuiteOrCase}.
1540
suite = iter(testSuiteOrCase)
1542
yield testSuiteOrCase
1545
for subtest in _iterateTests(test):
1550
# Support for Python 2.3
1552
iter(pyunit.TestSuite())
1554
# Python 2.3's TestSuite doesn't support iteration. Let's monkey patch it!
1556
return iter(self._tests)
1557
pyunit.TestSuite.__iter__ = __iter__
1561
class _SubTestCase(TestCase):
1563
TestCase.__init__(self, 'run')
1565
_inst = _SubTestCase()
1567
def _deprecate(name):
1569
Internal method used to deprecate top-level assertions. Do not use this.
1571
def _(*args, **kwargs):
1572
warnings.warn("unittest.%s is deprecated. Instead use the %r "
1573
"method on unittest.TestCase" % (name, name),
1574
stacklevel=2, category=DeprecationWarning)
1575
return getattr(_inst, name)(*args, **kwargs)
1579
_assertions = ['fail', 'failUnlessEqual', 'failIfEqual', 'failIfEquals',
1580
'failUnless', 'failUnlessIdentical', 'failUnlessIn',
1581
'failIfIdentical', 'failIfIn', 'failIf',
1582
'failUnlessAlmostEqual', 'failIfAlmostEqual',
1583
'failUnlessRaises', 'assertApproximates',
1584
'assertFailure', 'failUnlessSubstring', 'failIfSubstring',
1585
'assertAlmostEqual', 'assertAlmostEquals',
1586
'assertNotAlmostEqual', 'assertNotAlmostEquals', 'assertEqual',
1587
'assertEquals', 'assertNotEqual', 'assertNotEquals',
1588
'assertRaises', 'assert_', 'assertIdentical',
1589
'assertNotIdentical', 'assertIn', 'assertNotIn',
1590
'failUnlessFailure', 'assertSubstring', 'assertNotSubstring']
1593
for methodName in _assertions:
1594
globals()[methodName] = _deprecate(methodName)
1597
__all__ = ['TestCase', 'FailTest', 'SkipTest']