1
# Copyright (c) 2010 Jonathan M. Lange. See LICENSE for details.
3
"""Tests for the DeferredRunTest single test execution logic."""
8
from testtools import (
12
from testtools.content import (
15
from testtools.helpers import try_import
16
from testtools.tests.helpers import ExtendedTestResult
17
from testtools.matchers import (
23
from testtools.runtest import RunTest
24
from testtools.tests.test_spinner import NeedsTwistedTestCase
26
assert_fails_with = try_import('testtools.deferredruntest.assert_fails_with')
27
AsynchronousDeferredRunTest = try_import(
28
'testtools.deferredruntest.AsynchronousDeferredRunTest')
29
flush_logged_errors = try_import(
30
'testtools.deferredruntest.flush_logged_errors')
31
SynchronousDeferredRunTest = try_import(
32
'testtools.deferredruntest.SynchronousDeferredRunTest')
34
defer = try_import('twisted.internet.defer')
35
failure = try_import('twisted.python.failure')
36
log = try_import('twisted.python.log')
37
DelayedCall = try_import('twisted.internet.base.DelayedCall')
41
"""Tests that we run as part of our tests, nested to avoid discovery."""
45
super(X.Base, self).setUp()
46
self.calls = ['setUp']
47
self.addCleanup(self.calls.append, 'clean-up')
48
def test_something(self):
49
self.calls.append('test')
51
self.calls.append('tearDown')
52
super(X.Base, self).tearDown()
54
class ErrorInSetup(Base):
55
expected_calls = ['setUp', 'clean-up']
56
expected_results = [('addError', RuntimeError)]
58
super(X.ErrorInSetup, self).setUp()
59
raise RuntimeError("Error in setUp")
61
class ErrorInTest(Base):
62
expected_calls = ['setUp', 'tearDown', 'clean-up']
63
expected_results = [('addError', RuntimeError)]
64
def test_something(self):
65
raise RuntimeError("Error in test")
67
class FailureInTest(Base):
68
expected_calls = ['setUp', 'tearDown', 'clean-up']
69
expected_results = [('addFailure', AssertionError)]
70
def test_something(self):
71
self.fail("test failed")
73
class ErrorInTearDown(Base):
74
expected_calls = ['setUp', 'test', 'clean-up']
75
expected_results = [('addError', RuntimeError)]
77
raise RuntimeError("Error in tearDown")
79
class ErrorInCleanup(Base):
80
expected_calls = ['setUp', 'test', 'tearDown', 'clean-up']
81
expected_results = [('addError', ZeroDivisionError)]
82
def test_something(self):
83
self.calls.append('test')
84
self.addCleanup(lambda: 1/0)
86
class TestIntegration(NeedsTwistedTestCase):
88
def assertResultsMatch(self, test, result):
89
events = list(result._events)
90
self.assertEqual(('startTest', test), events.pop(0))
91
for expected_result in test.expected_results:
92
result = events.pop(0)
93
if len(expected_result) == 1:
94
self.assertEqual((expected_result[0], test), result)
96
self.assertEqual((expected_result[0], test), result[:2])
97
error_type = expected_result[1]
98
self.assertIn(error_type.__name__, str(result[2]))
99
self.assertEqual([('stopTest', test)], events)
101
def test_runner(self):
102
result = ExtendedTestResult()
103
test = self.test_factory('test_something', runTest=self.runner)
105
self.assertEqual(test.calls, self.test_factory.expected_calls)
106
self.assertResultsMatch(test, result)
109
def make_integration_tests():
110
from unittest import TestSuite
111
from testtools import clone_test_with_new_id
113
('RunTest', RunTest),
114
('SynchronousDeferredRunTest', SynchronousDeferredRunTest),
115
('AsynchronousDeferredRunTest', AsynchronousDeferredRunTest),
125
base_test = X.TestIntegration('test_runner')
126
integration_tests = []
127
for runner_name, runner in runners:
129
new_test = clone_test_with_new_id(
130
base_test, '%s(%s, %s)' % (
134
new_test.test_factory = test
135
new_test.runner = runner
136
integration_tests.append(new_test)
137
return TestSuite(integration_tests)
140
class TestSynchronousDeferredRunTest(NeedsTwistedTestCase):
142
def make_result(self):
143
return ExtendedTestResult()
145
def make_runner(self, test):
146
return SynchronousDeferredRunTest(test, test.exception_handlers)
148
def test_success(self):
149
class SomeCase(TestCase):
150
def test_success(self):
151
return defer.succeed(None)
152
test = SomeCase('test_success')
153
runner = self.make_runner(test)
154
result = self.make_result()
157
result._events, Equals([
159
('addSuccess', test),
160
('stopTest', test)]))
162
def test_failure(self):
163
class SomeCase(TestCase):
164
def test_failure(self):
165
return defer.maybeDeferred(self.fail, "Egads!")
166
test = SomeCase('test_failure')
167
runner = self.make_runner(test)
168
result = self.make_result()
171
[event[:2] for event in result._events], Equals([
173
('addFailure', test),
174
('stopTest', test)]))
176
def test_setUp_followed_by_test(self):
177
class SomeCase(TestCase):
179
super(SomeCase, self).setUp()
180
return defer.succeed(None)
181
def test_failure(self):
182
return defer.maybeDeferred(self.fail, "Egads!")
183
test = SomeCase('test_failure')
184
runner = self.make_runner(test)
185
result = self.make_result()
188
[event[:2] for event in result._events], Equals([
190
('addFailure', test),
191
('stopTest', test)]))
194
class TestAsynchronousDeferredRunTest(NeedsTwistedTestCase):
196
def make_reactor(self):
197
from twisted.internet import reactor
200
def make_result(self):
201
return ExtendedTestResult()
203
def make_runner(self, test, timeout=None):
205
timeout = self.make_timeout()
206
return AsynchronousDeferredRunTest(
207
test, test.exception_handlers, timeout=timeout)
209
def make_timeout(self):
212
def test_setUp_returns_deferred_that_fires_later(self):
213
# setUp can return a Deferred that might fire at any time.
214
# AsynchronousDeferredRunTest will not go on to running the test until
215
# the Deferred returned by setUp actually fires.
218
d = defer.Deferred().addCallback(call_log.append)
219
class SomeCase(TestCase):
221
super(SomeCase, self).setUp()
222
call_log.append('setUp')
224
def test_something(self):
225
call_log.append('test')
227
self.assertThat(call_log, Equals(['setUp']))
229
test = SomeCase('test_something')
230
timeout = self.make_timeout()
231
runner = self.make_runner(test, timeout=timeout)
232
result = self.make_result()
233
reactor = self.make_reactor()
234
reactor.callLater(timeout, fire_deferred)
236
self.assertThat(call_log, Equals(['setUp', marker, 'test']))
238
def test_calls_setUp_test_tearDown_in_sequence(self):
239
# setUp, the test method and tearDown can all return
240
# Deferreds. AsynchronousDeferredRunTest will make sure that each of
241
# these are run in turn, only going on to the next stage once the
242
# Deferred from the previous stage has fired.
245
a.addCallback(lambda x: call_log.append('a'))
247
b.addCallback(lambda x: call_log.append('b'))
249
c.addCallback(lambda x: call_log.append('c'))
250
class SomeCase(TestCase):
252
super(SomeCase, self).setUp()
253
call_log.append('setUp')
255
def test_success(self):
256
call_log.append('test')
259
super(SomeCase, self).tearDown()
260
call_log.append('tearDown')
262
test = SomeCase('test_success')
263
timeout = self.make_timeout()
264
runner = self.make_runner(test, timeout)
265
result = self.make_result()
266
reactor = self.make_reactor()
268
self.assertThat(call_log, Equals(['setUp']))
271
self.assertThat(call_log, Equals(['setUp', 'a', 'test']))
275
call_log, Equals(['setUp', 'a', 'test', 'b', 'tearDown']))
277
reactor.callLater(timeout * 0.25, fire_a)
278
reactor.callLater(timeout * 0.5, fire_b)
279
reactor.callLater(timeout * 0.75, fire_c)
282
call_log, Equals(['setUp', 'a', 'test', 'b', 'tearDown', 'c']))
284
def test_async_cleanups(self):
285
# Cleanups added with addCleanup can return
286
# Deferreds. AsynchronousDeferredRunTest will run each of them in
288
class SomeCase(TestCase):
289
def test_whatever(self):
291
test = SomeCase('test_whatever')
293
a = defer.Deferred().addCallback(lambda x: call_log.append('a'))
294
b = defer.Deferred().addCallback(lambda x: call_log.append('b'))
295
c = defer.Deferred().addCallback(lambda x: call_log.append('c'))
296
test.addCleanup(lambda: a)
297
test.addCleanup(lambda: b)
298
test.addCleanup(lambda: c)
300
self.assertThat(call_log, Equals([]))
303
self.assertThat(call_log, Equals(['a']))
306
self.assertThat(call_log, Equals(['a', 'b']))
308
timeout = self.make_timeout()
309
reactor = self.make_reactor()
310
reactor.callLater(timeout * 0.25, fire_a)
311
reactor.callLater(timeout * 0.5, fire_b)
312
reactor.callLater(timeout * 0.75, fire_c)
313
runner = self.make_runner(test, timeout)
314
result = self.make_result()
316
self.assertThat(call_log, Equals(['a', 'b', 'c']))
318
def test_clean_reactor(self):
319
# If there's cruft left over in the reactor, the test fails.
320
reactor = self.make_reactor()
321
timeout = self.make_timeout()
322
class SomeCase(TestCase):
323
def test_cruft(self):
324
reactor.callLater(timeout * 10.0, lambda: None)
325
test = SomeCase('test_cruft')
326
runner = self.make_runner(test, timeout)
327
result = self.make_result()
330
[event[:2] for event in result._events],
332
[('startTest', test),
334
('stopTest', test)]))
335
error = result._events[1][2]
336
self.assertThat(error, KeysEqual('traceback', 'twisted-log'))
338
def test_unhandled_error_from_deferred(self):
339
# If there's a Deferred with an unhandled error, the test fails. Each
340
# unhandled error is reported with a separate traceback.
341
class SomeCase(TestCase):
342
def test_cruft(self):
343
# Note we aren't returning the Deferred so that the error will
345
defer.maybeDeferred(lambda: 1/0)
346
defer.maybeDeferred(lambda: 2/0)
347
test = SomeCase('test_cruft')
348
runner = self.make_runner(test)
349
result = self.make_result()
351
error = result._events[1][2]
352
result._events[1] = ('addError', test, None)
353
self.assertThat(result._events, Equals(
354
[('startTest', test),
355
('addError', test, None),
356
('stopTest', test)]))
360
'unhandled-error-in-deferred',
361
'unhandled-error-in-deferred-1',
364
def test_unhandled_error_from_deferred_combined_with_error(self):
365
# If there's a Deferred with an unhandled error, the test fails. Each
366
# unhandled error is reported with a separate traceback, and the error
368
class SomeCase(TestCase):
369
def test_cruft(self):
370
# Note we aren't returning the Deferred so that the error will
372
defer.maybeDeferred(lambda: 1/0)
374
test = SomeCase('test_cruft')
375
runner = self.make_runner(test)
376
result = self.make_result()
378
error = result._events[1][2]
379
result._events[1] = ('addError', test, None)
380
self.assertThat(result._events, Equals(
381
[('startTest', test),
382
('addError', test, None),
383
('stopTest', test)]))
388
'unhandled-error-in-deferred',
391
@skipIf(os.name != "posix", "Sending SIGINT with os.kill is posix only")
392
def test_keyboard_interrupt_stops_test_run(self):
393
# If we get a SIGINT during a test run, the test stops and no more
395
SIGINT = getattr(signal, 'SIGINT', None)
397
raise self.skipTest("SIGINT unavailable")
398
class SomeCase(TestCase):
399
def test_pause(self):
400
return defer.Deferred()
401
test = SomeCase('test_pause')
402
reactor = self.make_reactor()
403
timeout = self.make_timeout()
404
runner = self.make_runner(test, timeout * 5)
405
result = self.make_result()
406
reactor.callLater(timeout, os.kill, os.getpid(), SIGINT)
407
self.assertThat(lambda:runner.run(result),
408
Raises(MatchesException(KeyboardInterrupt)))
410
@skipIf(os.name != "posix", "Sending SIGINT with os.kill is posix only")
411
def test_fast_keyboard_interrupt_stops_test_run(self):
412
# If we get a SIGINT during a test run, the test stops and no more
414
SIGINT = getattr(signal, 'SIGINT', None)
416
raise self.skipTest("SIGINT unavailable")
417
class SomeCase(TestCase):
418
def test_pause(self):
419
return defer.Deferred()
420
test = SomeCase('test_pause')
421
reactor = self.make_reactor()
422
timeout = self.make_timeout()
423
runner = self.make_runner(test, timeout * 5)
424
result = self.make_result()
425
reactor.callWhenRunning(os.kill, os.getpid(), SIGINT)
426
self.assertThat(lambda:runner.run(result),
427
Raises(MatchesException(KeyboardInterrupt)))
429
def test_timeout_causes_test_error(self):
430
# If a test times out, it reports itself as having failed with a
432
class SomeCase(TestCase):
433
def test_pause(self):
434
return defer.Deferred()
435
test = SomeCase('test_pause')
436
runner = self.make_runner(test)
437
result = self.make_result()
439
error = result._events[1][2]
441
[event[:2] for event in result._events], Equals(
442
[('startTest', test),
444
('stopTest', test)]))
445
self.assertIn('TimeoutError', str(error['traceback']))
447
def test_convenient_construction(self):
448
# As a convenience method, AsynchronousDeferredRunTest has a
449
# classmethod that returns an AsynchronousDeferredRunTest
450
# factory. This factory has the same API as the RunTest constructor.
454
factory = AsynchronousDeferredRunTest.make_factory(reactor, timeout)
455
runner = factory(self, [handler])
456
self.assertIs(reactor, runner._reactor)
457
self.assertIs(timeout, runner._timeout)
458
self.assertIs(self, runner.case)
459
self.assertEqual([handler], runner.handlers)
461
def test_use_convenient_factory(self):
462
# Make sure that the factory can actually be used.
463
factory = AsynchronousDeferredRunTest.make_factory()
464
class SomeCase(TestCase):
465
run_tests_with = factory
466
def test_something(self):
468
case = SomeCase('test_something')
471
def test_convenient_construction_default_reactor(self):
472
# As a convenience method, AsynchronousDeferredRunTest has a
473
# classmethod that returns an AsynchronousDeferredRunTest
474
# factory. This factory has the same API as the RunTest constructor.
477
factory = AsynchronousDeferredRunTest.make_factory(reactor=reactor)
478
runner = factory(self, [handler])
479
self.assertIs(reactor, runner._reactor)
480
self.assertIs(self, runner.case)
481
self.assertEqual([handler], runner.handlers)
483
def test_convenient_construction_default_timeout(self):
484
# As a convenience method, AsynchronousDeferredRunTest has a
485
# classmethod that returns an AsynchronousDeferredRunTest
486
# factory. This factory has the same API as the RunTest constructor.
489
factory = AsynchronousDeferredRunTest.make_factory(timeout=timeout)
490
runner = factory(self, [handler])
491
self.assertIs(timeout, runner._timeout)
492
self.assertIs(self, runner.case)
493
self.assertEqual([handler], runner.handlers)
495
def test_convenient_construction_default_debugging(self):
496
# As a convenience method, AsynchronousDeferredRunTest has a
497
# classmethod that returns an AsynchronousDeferredRunTest
498
# factory. This factory has the same API as the RunTest constructor.
500
factory = AsynchronousDeferredRunTest.make_factory(debug=True)
501
runner = factory(self, [handler])
502
self.assertIs(self, runner.case)
503
self.assertEqual([handler], runner.handlers)
504
self.assertEqual(True, runner._debug)
506
def test_deferred_error(self):
507
class SomeTest(TestCase):
508
def test_something(self):
509
return defer.maybeDeferred(lambda: 1/0)
510
test = SomeTest('test_something')
511
runner = self.make_runner(test)
512
result = self.make_result()
515
[event[:2] for event in result._events],
519
('stopTest', test)]))
520
error = result._events[1][2]
521
self.assertThat(error, KeysEqual('traceback', 'twisted-log'))
523
def test_only_addError_once(self):
524
# Even if the reactor is unclean and the test raises an error and the
525
# cleanups raise errors, we only called addError once per test.
526
reactor = self.make_reactor()
527
class WhenItRains(TestCase):
529
# Add a dirty cleanup.
530
self.addCleanup(lambda: 3 / 0)
532
from twisted.internet.protocol import ServerFactory
533
reactor.listenTCP(0, ServerFactory())
535
defer.maybeDeferred(lambda: 2 / 0)
537
raise RuntimeError("Excess precipitation")
538
test = WhenItRains('it_pours')
539
runner = self.make_runner(test)
540
result = self.make_result()
543
[event[:2] for event in result._events],
547
('stopTest', test)]))
548
error = result._events[1][2]
555
'unhandled-error-in-deferred',
558
def test_log_err_is_error(self):
559
# An error logged during the test run is recorded as an error in the
561
class LogAnError(TestCase):
562
def test_something(self):
565
except ZeroDivisionError:
566
f = failure.Failure()
568
test = LogAnError('test_something')
569
runner = self.make_runner(test)
570
result = self.make_result()
573
[event[:2] for event in result._events],
577
('stopTest', test)]))
578
error = result._events[1][2]
579
self.assertThat(error, KeysEqual('logged-error', 'twisted-log'))
581
def test_log_err_flushed_is_success(self):
582
# An error logged during the test run is recorded as an error in the
584
class LogAnError(TestCase):
585
def test_something(self):
588
except ZeroDivisionError:
589
f = failure.Failure()
591
flush_logged_errors(ZeroDivisionError)
592
test = LogAnError('test_something')
593
runner = self.make_runner(test)
594
result = self.make_result()
600
('addSuccess', test, {'twisted-log': text_content('')}),
601
('stopTest', test)]))
603
def test_log_in_details(self):
604
class LogAnError(TestCase):
605
def test_something(self):
608
test = LogAnError('test_something')
609
runner = self.make_runner(test)
610
result = self.make_result()
613
[event[:2] for event in result._events],
617
('stopTest', test)]))
618
error = result._events[1][2]
619
self.assertThat(error, KeysEqual('traceback', 'twisted-log'))
621
def test_debugging_unchanged_during_test_by_default(self):
622
debugging = [(defer.Deferred.debug, DelayedCall.debug)]
623
class SomeCase(TestCase):
624
def test_debugging_enabled(self):
625
debugging.append((defer.Deferred.debug, DelayedCall.debug))
626
test = SomeCase('test_debugging_enabled')
627
runner = AsynchronousDeferredRunTest(
628
test, handlers=test.exception_handlers,
629
reactor=self.make_reactor(), timeout=self.make_timeout())
630
runner.run(self.make_result())
631
self.assertEqual(debugging[0], debugging[1])
633
def test_debugging_enabled_during_test_with_debug_flag(self):
634
self.patch(defer.Deferred, 'debug', False)
635
self.patch(DelayedCall, 'debug', False)
637
class SomeCase(TestCase):
638
def test_debugging_enabled(self):
639
debugging.append((defer.Deferred.debug, DelayedCall.debug))
640
test = SomeCase('test_debugging_enabled')
641
runner = AsynchronousDeferredRunTest(
642
test, handlers=test.exception_handlers,
643
reactor=self.make_reactor(), timeout=self.make_timeout(),
645
runner.run(self.make_result())
646
self.assertEqual([(True, True)], debugging)
647
self.assertEqual(False, defer.Deferred.debug)
648
self.assertEqual(False, defer.Deferred.debug)
651
class TestAssertFailsWith(NeedsTwistedTestCase):
652
"""Tests for `assert_fails_with`."""
654
if SynchronousDeferredRunTest is not None:
655
run_tests_with = SynchronousDeferredRunTest
657
def test_assert_fails_with_success(self):
658
# assert_fails_with fails the test if it's given a Deferred that
661
d = assert_fails_with(defer.succeed(marker), RuntimeError)
662
def check_result(failure):
663
failure.trap(self.failureException)
666
Equals("RuntimeError not raised (%r returned)" % (marker,)))
668
lambda x: self.fail("Should not have succeeded"), check_result)
671
def test_assert_fails_with_success_multiple_types(self):
672
# assert_fails_with fails the test if it's given a Deferred that
675
d = assert_fails_with(
676
defer.succeed(marker), RuntimeError, ZeroDivisionError)
677
def check_result(failure):
678
failure.trap(self.failureException)
681
Equals("RuntimeError, ZeroDivisionError not raised "
682
"(%r returned)" % (marker,)))
684
lambda x: self.fail("Should not have succeeded"), check_result)
687
def test_assert_fails_with_wrong_exception(self):
688
# assert_fails_with fails the test if it's given a Deferred that
690
d = assert_fails_with(
691
defer.maybeDeferred(lambda: 1/0), RuntimeError, KeyboardInterrupt)
692
def check_result(failure):
693
failure.trap(self.failureException)
694
lines = str(failure.value).splitlines()
698
("ZeroDivisionError raised instead of RuntimeError, "
699
"KeyboardInterrupt:"),
700
" Traceback (most recent call last):",
703
lambda x: self.fail("Should not have succeeded"), check_result)
706
def test_assert_fails_with_expected_exception(self):
707
# assert_fails_with calls back with the value of the failure if it's
708
# one of the expected types of failures.
711
except ZeroDivisionError:
712
f = failure.Failure()
713
d = assert_fails_with(defer.fail(f), ZeroDivisionError)
714
return d.addCallback(self.assertThat, Equals(f.value))
716
def test_custom_failure_exception(self):
717
# If assert_fails_with is passed a 'failureException' keyword
718
# argument, then it will raise that instead of `AssertionError`.
719
class CustomException(Exception):
722
d = assert_fails_with(
723
defer.succeed(marker), RuntimeError,
724
failureException=CustomException)
725
def check_result(failure):
726
failure.trap(CustomException)
729
Equals("RuntimeError not raised (%r returned)" % (marker,)))
730
return d.addCallbacks(
731
lambda x: self.fail("Should not have succeeded"), check_result)
735
from unittest import TestLoader, TestSuite
737
[TestLoader().loadTestsFromName(__name__),
738
make_integration_tests()])