29
30
"""Raised to indicate the current test has failed to pass."""
32
class TestCase(pyunit.TestCase, object):
33
zi.implements(itrial.ITestCase)
34
failureException = FailTest
36
def __init__(self, methodName=None):
    """Prepare the bookkeeping for the test method called methodName.

    Stores the method name, builds the _parents list that the get*()
    accessors hand to util.acquireAttribute, makes sure the class has a
    shared fixture instance, and resets the start/end timestamps.
    """
    super(TestCase, self).__init__(methodName)
    self._testMethodName = methodName
    boundMethod = getattr(self, methodName)
    # Lookup chain: the method itself, this instance, then whatever
    # util.getPythonContainers reports for the method.
    self._parents = lookupChain = [boundMethod, self]
    lookupChain.extend(util.getPythonContainers(boundMethod))
    self._prepareClassFixture()
    # XXX - this is a kludge
    self.startTime = 0
    self.endTime = 0
45
def _prepareClassFixture(self):
46
"""Lots of tests assume that test methods all run in the same instance
47
of TestCase. This isn't true. Calling this method ensures that
48
self.__class__._testCaseInstance contains an instance of this class
49
that will remain the same for all tests from this class.
51
if not hasattr(self.__class__, '_testCaseInstance'):
52
self.__class__._testCaseInstance = self
53
if self.__class__._testCaseInstance.__class__ != self.__class__:
54
self.__class__._testCaseInstance = self
56
def _sharedTestCase(self):
57
return self.__class__._testCaseInstance
60
# only overriding this because Python 2.2's unittest has a broken
62
return "%s.%s" % (reflect.qual(self.__class__), self._testMethodName)
64
def shortDescription(self):
65
desc = super(TestCase, self).shortDescription()
67
return self._testMethodName
70
def __call__(self, *args, **kwargs):
71
return self.run(*args, **kwargs)
73
def run(self, reporter):
74
log.msg("--> %s <--" % (self.id()))
75
_signalStateMgr = util.SignalStateManager()
76
_signalStateMgr.save()
77
janitor = util._Janitor()
78
testCase = self._sharedTestCase()
79
testMethod = getattr(testCase, self._testMethodName)
81
reporter.startTest(self)
82
if self.getSkip(): # don't run test methods that are marked as .skip
83
reporter.addSkip(self, self.getSkip())
84
reporter.stopTest(self)
87
setUp = util.UserMethodWrapper(testCase.setUp,
88
suppress=self.getSuppress())
91
except util.UserMethodError:
92
for error in setUp.errors:
93
self._eb(error, reporter)
95
reporter.upDownError(setUp, warn=False,
99
orig = util.UserMethodWrapper(testMethod,
101
timeout=self.getTimeout(),
102
suppress=self.getSuppress())
103
orig.errorHook = lambda x : self._eb(x, reporter)
105
self.startTime = time.time()
107
if (self.getTodo() is not None
108
and len(orig.errors) == 0):
109
reporter.addUnexpectedSuccess(self, self.getTodo())
111
self.endTime = time.time()
112
um = util.UserMethodWrapper(testCase.tearDown,
113
suppress=self.getSuppress())
116
except util.UserMethodError:
117
for error in um.errors:
118
self._eb(error, reporter)
120
reporter.upDownError(um, warn=False)
123
janitor.postMethodCleanup()
124
except util.MultiError, e:
126
self._eb(f, reporter)
127
reporter.stopTest(self)
128
_signalStateMgr.restore()
130
def _eb(self, f, reporter):
131
log.msg(f.getTraceback())
132
if self.getTodo() is not None:
133
if self._todoExpected(f):
134
reporter.addExpectedFailure(self, f, self._getTodoMessage())
136
if f.check(util.DirtyReactorWarning):
137
reporter.cleanupErrors(f)
138
elif f.check(self.failureException, FailTest):
139
reporter.addFailure(self, f)
140
elif f.check(KeyboardInterrupt):
141
reporter.shouldStop = True
142
reporter.addError(self, f)
143
elif f.check(SkipTest):
144
if len(f.value.args) > 0:
145
reason = f.value.args[0]
147
warnings.warn(("Do not raise unittest.SkipTest with no "
148
"arguments! Give a reason for skipping tests!"),
152
reporter.addSkip(self, reason)
154
reporter.addError(self, f)
157
return util.acquireAttribute(self._parents, 'skip', None)
160
return util.acquireAttribute(self._parents, 'todo', None)
162
def _todoExpected(self, failure):
163
todo = self.getTodo()
166
if isinstance(todo, str):
34
def __init__(self, reason, errors=None):
39
return "<Todo reason=%r errors=%r>" % (self.reason, self.errors)
41
def expected(self, failure):
42
if self.errors is None:
168
elif isinstance(todo, tuple):
170
expected = list(todo[0])
173
for error in expected:
174
if failure.check(error):
178
raise ValueError('%r is not a valid .todo attribute' % (todo,))
180
def _getTodoMessage(self):
181
todo = self.getTodo()
182
if todo is None or isinstance(todo, str):
184
elif isinstance(todo, tuple):
187
raise ValueError("%r is not a valid .todo attribute" % (todo,))
189
def getSuppress(self):
    """Look up the 'suppress' attribute through this test's _parents.

    None is passed to util.acquireAttribute as the default value.
    """
    lookupChain = self._parents
    return util.acquireAttribute(lookupChain, 'suppress', None)
192
def getTimeout(self):
    """Look up the 'timeout' attribute through this test's _parents.

    None is passed to util.acquireAttribute as the default value.
    """
    lookupChain = self._parents
    return util.acquireAttribute(lookupChain, 'timeout', None)
195
def visit(self, visitor):
    """Call visitor.visitCase(self)."""
    handler = visitor.visitCase
    handler(self)
44
for error in self.errors:
45
if failure.check(error):
51
if isinstance(value, str):
52
return Todo(reason=value)
53
if isinstance(value, tuple):
54
errors, reason = value
59
return Todo(reason=reason, errors=errors)
62
class _Assertions(pyunit.TestCase, object):
199
63
def fail(self, msg=None):
200
64
"""absolutely fails the test, do not pass go, do not collect $200
387
251
assertFailure = failUnlessFailure
389
253
def failUnlessSubstring(self, substring, astring, msg=None):
390
"""a python2.2 friendly test to assert that substring is found in astring
391
parameters follow the semantics of failUnlessIn
393
if astring.find(substring) == -1:
394
raise self.failureException(msg or "%r not found in %r"
395
% (substring, astring))
254
return self.failUnlessIn(substring, astring, msg)
397
255
assertSubstring = failUnlessSubstring
399
257
def failIfSubstring(self, substring, astring, msg=None):
400
"""a python2.2 friendly test to assert that substring is not found in
401
astring parameters follow the semantics of failUnlessIn
403
if astring.find(substring) != -1:
404
raise self.failureException(msg or "%r found in %r"
405
% (substring, astring))
258
return self.failIfIn(substring, astring, msg)
407
259
assertNotSubstring = failIfSubstring
262
class TestCase(_Assertions):
263
zi.implements(itrial.ITestCase)
264
failureException = FailTest
266
def __init__(self, methodName=None):
267
super(TestCase, self).__init__(methodName)
268
self._testMethodName = methodName
269
testMethod = getattr(self, methodName)
270
self._parents = [testMethod, self]
271
self._parents.extend(util.getPythonContainers(testMethod))
272
self._shared = (hasattr(self, 'setUpClass') or
273
hasattr(self, 'tearDownClass'))
275
self._prepareClassFixture()
276
if not hasattr(self.__class__, '_instances'):
277
self._initInstances()
278
self.__class__._instances.add(self)
281
def _initInstances(cls):
    """Reset the class-level instance bookkeeping.

    Gives cls a fresh, empty _instances set and a fresh, empty
    _instancesRun set.
    """
    for attrName in ('_instances', '_instancesRun'):
        setattr(cls, attrName, sets.Set())
# Bound as a classmethod the pre-decorator way.
_initInstances = classmethod(_initInstances)
287
return len(self.__class__._instancesRun) == 0
290
return self.__class__._instancesRun == self.__class__._instances
292
def _prepareClassFixture(self):
293
"""Lots of tests assume that test methods all run in the same instance
294
of TestCase. This isn't true. Calling this method ensures that
295
self.__class__._testCaseInstance contains an instance of this class
296
that will remain the same for all tests from this class.
298
if not hasattr(self.__class__, '_testCaseInstance'):
299
self.__class__._testCaseInstance = self
300
if self.__class__._testCaseInstance.__class__ != self.__class__:
301
self.__class__._testCaseInstance = self
303
def _run(self, methodName, result):
304
from twisted.internet import reactor
305
timeout = self.getTimeout()
307
e = defer.TimeoutError("%r (%s) still running at %s secs"
308
% (self, methodName, timeout))
309
f = failure.Failure(e)
310
# try to errback the deferred that the test returns (for no gorram
311
# reason) (see issue1005 and test_errorPropagation in test_deferred)
314
except defer.AlreadyCalledError:
315
# if the deferred has been called already but the *back chain is
316
# still unfinished, crash the reactor and report timeout error
319
self._timedOut = True # see self._wait
320
todo = self.getTodo()
321
if todo is not None and todo.expected(f):
322
result.addExpectedFailure(self, f, todo)
324
result.addError(self, f)
326
test = self.__class__._testCaseInstance
329
method = getattr(test, methodName)
330
d = defer.maybeDeferred(utils.runWithWarningsSuppressed,
331
self.getSuppress(), method)
332
call = reactor.callLater(timeout, onTimeout, d)
333
d.addBoth(lambda x : call.active() and call.cancel() or x)
336
def shortDescription(self):
337
desc = super(TestCase, self).shortDescription()
339
return self._testMethodName
342
def __call__(self, *args, **kwargs):
343
return self.run(*args, **kwargs)
345
def deferSetUpClass(self, result):
346
if not hasattr(self, 'setUpClass'):
347
d = defer.succeed(None)
348
d.addCallback(self.deferSetUp, result)
350
d = self._run('setUpClass', result)
351
d.addCallbacks(self.deferSetUp, self._ebDeferSetUpClass,
352
callbackArgs=(result,),
353
errbackArgs=(result,))
356
def _ebDeferSetUpClass(self, error, result):
357
if error.check(SkipTest):
358
result.addSkip(self, self._getReason(error))
359
self.__class__._instancesRun.remove(self)
360
elif error.check(KeyboardInterrupt):
363
result.upDownError('setUpClass', error, warn=True,
365
result.addError(self, error)
366
self.__class__._instancesRun.remove(self)
368
def deferSetUp(self, ignored, result):
369
d = self._run('setUp', result)
370
d.addCallbacks(self.deferTestMethod, self._ebDeferSetUp,
371
callbackArgs=(result,),
372
errbackArgs=(result,))
375
def _ebDeferSetUp(self, failure, result):
376
if failure.check(SkipTest):
377
result.addSkip(self, self._getReason(failure))
379
result.addError(self, failure)
380
result.upDownError('setUp', failure, warn=False, printStatus=False)
381
if failure.check(KeyboardInterrupt):
384
def deferTestMethod(self, ignored, result):
385
d = self._run(self._testMethodName, result)
386
d.addCallbacks(self._cbDeferTestMethod, self._ebDeferTestMethod,
387
callbackArgs=(result,),
388
errbackArgs=(result,))
389
d.addBoth(self.deferTearDown, result)
390
if self._shared and hasattr(self, 'tearDownClass') and self._isLast():
391
d.addBoth(self.deferTearDownClass, result)
394
def _cbDeferTestMethod(self, ignored, result):
395
if self.getTodo() is not None:
396
result.addUnexpectedSuccess(self, self.getTodo())
401
def _ebDeferTestMethod(self, f, result):
402
todo = self.getTodo()
403
if todo is not None and todo.expected(f):
404
result.addExpectedFailure(self, f, todo)
405
elif f.check(self.failureException, FailTest):
406
result.addFailure(self, f)
407
elif f.check(KeyboardInterrupt):
408
result.addError(self, f)
410
elif f.check(SkipTest):
411
result.addSkip(self, self._getReason(f))
413
result.addError(self, f)
415
def deferTearDown(self, ignored, result):
416
d = self._run('tearDown', result)
417
d.addErrback(self._ebDeferTearDown, result)
420
def _ebDeferTearDown(self, failure, result):
421
result.addError(self, failure)
422
if failure.check(KeyboardInterrupt):
424
result.upDownError('tearDown', failure, warn=False, printStatus=True)
427
def deferTearDownClass(self, ignored, result):
428
d = self._run('tearDownClass', result)
429
d.addErrback(self._ebTearDownClass, result)
432
def _ebTearDownClass(self, error, result):
433
if error.check(KeyboardInterrupt):
435
result.upDownError('tearDownClass', error, warn=True, printStatus=True)
437
def _cleanUp(self, result):
439
util._Janitor().postCaseCleanup()
440
except util.FailureError, e:
441
result.addError(self, e.original)
444
result.cleanupErrors(failure.Failure())
447
result.addSuccess(self)
449
def _classCleanUp(self, result):
451
util._Janitor().postClassCleanup()
452
except util.FailureError, e:
453
result.cleanupErrors(e.original)
455
result.cleanupErrors(failure.Failure())
457
def run(self, result):
458
log.msg("--> %s <--" % (self.id()))
459
self._timedOut = False
460
if self._shared and self not in self.__class__._instances:
461
self.__class__._instances.add(self)
462
result.startTest(self)
463
if self.getSkip(): # don't run test methods that are marked as .skip
464
result.addSkip(self, self.getSkip())
465
result.stopTest(self)
470
first = self._isFirst()
471
self.__class__._instancesRun.add(self)
473
d = self.deferSetUpClass(result)
475
d = self.deferSetUp(None, result)
479
self._cleanUp(result)
480
result.stopTest(self)
481
if self._shared and self._isLast():
482
self._initInstances()
483
self._classCleanUp(result)
485
self._classCleanUp(result)
487
def _getReason(self, f):
488
if len(f.value.args) > 0:
489
reason = f.value.args[0]
491
warnings.warn(("Do not raise unittest.SkipTest with no "
492
"arguments! Give a reason for skipping tests!"),
498
return util.acquireAttribute(self._parents, 'skip', None)
501
todo = util.acquireAttribute(self._parents, 'todo', None)
504
return makeTodo(todo)
506
def getTimeout(self):
507
timeout = util.acquireAttribute(self._parents, 'timeout',
508
util.DEFAULT_TIMEOUT_DURATION)
510
return float(timeout)
511
except (ValueError, TypeError):
512
# XXX -- this is here because sometimes people will have methods
513
# called 'timeout', or set timeout to 'orange', or something
514
# Particularly, test_news.NewsTestCase and ReactorCoreTestCase
516
warnings.warn("'timeout' attribute needs to be a number.",
517
category=DeprecationWarning)
518
return util.DEFAULT_TIMEOUT_DURATION
520
def getSuppress(self):
    """Look up the 'suppress' attribute through this test's _parents.

    A new empty list is passed to util.acquireAttribute as the default
    value on every call.
    """
    fallback = []
    return util.acquireAttribute(self._parents, 'suppress', fallback)
523
def visit(self, visitor):
    """Call visitor.visitCase(self)."""
    visitCase = getattr(visitor, 'visitCase')
    visitCase(self)
409
527
def mktemp(self):
410
528
"""will return a unique name that may be used as either a temporary
411
529
directory or filename
412
530
@note: you must call os.mkdir on the value returned from this
413
531
method if you wish to use it as a directory!
415
# FIXME: when we drop support for python 2.2 and start to require 2.3,
416
# we should ditch most of this cruft and just call
419
base = os.path.join(cls.__module__, cls.__name__,
420
self._testMethodName[:32])
533
MAX_FILENAME = 32 # some platforms limit lengths of filenames
534
base = os.path.join(self.__class__.__module__[:MAX_FILENAME],
535
self.__class__.__name__[:MAX_FILENAME],
536
self._testMethodName[:MAX_FILENAME])
537
if not os.path.exists(base):
422
538
os.makedirs(base)
425
if code == errno.EEXIST:
431
num = self._mktGetCounter(base)
432
name = os.path.join(base, "%s.%s" % (pid, num))
433
if not os.path.exists(name):
437
# mktemp helper to increment a counter
438
def _mktGetCounter(self, base):
439
if getattr(self, "_mktCounters", None) is None:
440
self._mktCounters = {}
441
if base not in self._mktCounters:
442
self._mktCounters[base] = 2
444
n = self._mktCounters[base]
445
self._mktCounters[base] += 1
448
def runReactor(self, timesOrSeconds, seconds=False):
449
"""DEPRECATED: just return a deferred from your test method and
450
trial will do the Right Thing. Alternatively, call
451
twisted.trial.util.wait to block until the deferred fires.
453
I'll iterate the reactor for a while.
455
You probably want to use expectedAssertions with this.
457
@type timesOrSeconds: int
458
@param timesOrSeconds: Either the number of iterations to run,
459
or, if `seconds' is True, the number of seconds to run for.
462
@param seconds: If this is True, `timesOrSeconds' will be
463
interpreted as seconds, rather than iterations.
539
dirname = tempfile.mkdtemp('', '', base)
540
return os.path.join(dirname, 'temp')
542
def _wait(self, d, running=util._wait_is_running):
543
"""Take a Deferred that only ever callbacks. Block until it happens.
465
warnings.warn("runReactor is deprecated. return a deferred from "
466
"your test method, and trial will wait for results."
467
"Alternatively, call twisted.trial.util.wait to"
468
"block until the deferred fires.",
469
DeprecationWarning, stacklevel=2)
470
545
from twisted.internet import reactor
473
reactor.callLater(timesOrSeconds, reactor.crash)
477
for i in xrange(timesOrSeconds):
547
raise WaitIsNotReentrantError, REENTRANT_WAIT_ERROR_MSG
551
if results is not None:
554
if results is not None:
563
# d might have already been fired, in which case append is called
564
# synchronously. Avoid any reactor stuff.
573
# If the reactor was crashed elsewhere due to a timeout, hopefully
574
# that crasher also reported an error. Just return.
575
# _timedOut is most likely to be set when d has fired but hasn't
576
# completed its callback chain (see self._run)
577
if results or self._timedOut: #defined in run() and _run()
580
# If the timeout didn't happen, and we didn't get a result or
581
# a failure, then the user probably aborted the test, so let's
582
# just raise KeyboardInterrupt.
584
# FIXME: imagine this:
585
# web/test/test_webclient.py:
586
# exc = self.assertRaises(error.Error, unittest.wait, method(url))
588
# wait() will raise KeyboardInterrupt, and assertRaises will
589
# swallow it. Therefore, wait() raising KeyboardInterrupt is
590
# insufficient to stop trial. A suggested solution is to have
591
# this code set a "stop trial" flag, or otherwise notify trial
592
# that it should really try to stop as soon as possible.
593
raise KeyboardInterrupt()
481
599
class TestVisitor(object):