~percona-core/percona-server/5.5

« back to all changes in this revision

Viewing changes to python-for-subunit2junitxml/testtools/tests/test_deferredruntest.py

  • Committer: Stewart Smith
  • Date: 2011-10-05 06:14:21 UTC
  • mto: This revision was merged to the branch mainline in revision 177.
  • Revision ID: stewart@flamingspork.com-20111005061421-fp6t1wuey0a8iw1a
add subunit2junitxml and needed libraries

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (c) 2010 testtools developers. See LICENSE for details.
 
2
 
 
3
"""Tests for the DeferredRunTest single test execution logic."""
 
4
 
 
5
import os
 
6
import signal
 
7
 
 
8
from testtools import (
 
9
    skipIf,
 
10
    TestCase,
 
11
    )
 
12
from testtools.content import (
 
13
    text_content,
 
14
    )
 
15
from testtools.helpers import try_import
 
16
from testtools.tests.helpers import ExtendedTestResult
 
17
from testtools.matchers import (
 
18
    Equals,
 
19
    KeysEqual,
 
20
    MatchesException,
 
21
    Raises,
 
22
    )
 
23
from testtools.runtest import RunTest
 
24
from testtools.tests.test_spinner import NeedsTwistedTestCase
 
25
 
 
26
# Names that live in (or depend on) Twisted are looked up with try_import
# instead of a plain import statement.
# NOTE(review): the ``is not None`` guard in TestAssertFailsWith suggests a
# failed lookup yields None rather than raising -- confirm against
# testtools.helpers.try_import.
assert_fails_with = try_import('testtools.deferredruntest.assert_fails_with')
AsynchronousDeferredRunTest = try_import(
    'testtools.deferredruntest.AsynchronousDeferredRunTest')
flush_logged_errors = try_import(
    'testtools.deferredruntest.flush_logged_errors')
SynchronousDeferredRunTest = try_import(
    'testtools.deferredruntest.SynchronousDeferredRunTest')

# Twisted modules and classes used directly by the tests below.
defer = try_import('twisted.internet.defer')
failure = try_import('twisted.python.failure')
log = try_import('twisted.python.log')
DelayedCall = try_import('twisted.internet.base.DelayedCall')
 
38
 
 
39
 
 
40
class X(object):
    """Holder for fixture tests; nesting them here keeps discovery away."""

    class Base(TestCase):
        """Fixture that logs each lifecycle stage it reaches in ``calls``."""

        def setUp(self):
            super(X.Base, self).setUp()
            # The same list object is used for the log and for the cleanup.
            recorded = ['setUp']
            self.calls = recorded
            self.addCleanup(recorded.append, 'clean-up')

        def test_something(self):
            self.calls.append('test')

        def tearDown(self):
            self.calls.append('tearDown')
            super(X.Base, self).tearDown()

    class ErrorInSetup(Base):
        """Fixture whose setUp raises after the base setUp has run."""

        expected_results = [('addError', RuntimeError)]
        expected_calls = ['setUp', 'clean-up']

        def setUp(self):
            super(X.ErrorInSetup, self).setUp()
            raise RuntimeError("Error in setUp")

    class ErrorInTest(Base):
        """Fixture whose test method raises RuntimeError."""

        expected_results = [('addError', RuntimeError)]
        expected_calls = ['setUp', 'tearDown', 'clean-up']

        def test_something(self):
            raise RuntimeError("Error in test")

    class FailureInTest(Base):
        """Fixture whose test method fails via ``self.fail``."""

        expected_results = [('addFailure', AssertionError)]
        expected_calls = ['setUp', 'tearDown', 'clean-up']

        def test_something(self):
            self.fail("test failed")

    class ErrorInTearDown(Base):
        """Fixture whose tearDown raises without logging 'tearDown'."""

        expected_results = [('addError', RuntimeError)]
        expected_calls = ['setUp', 'test', 'clean-up']

        def tearDown(self):
            raise RuntimeError("Error in tearDown")

    class ErrorInCleanup(Base):
        """Fixture that registers a cleanup which divides by zero."""

        expected_results = [('addError', ZeroDivisionError)]
        expected_calls = ['setUp', 'test', 'tearDown', 'clean-up']

        def test_something(self):
            self.calls.append('test')
            self.addCleanup(lambda: 1/0)

    class TestIntegration(NeedsTwistedTestCase):
        """Runs one fixture with one runner.

        ``test_factory`` and ``runner`` are read from the instance; they are
        assigned by make_integration_tests.
        """

        def assertResultsMatch(self, test, result):
            """Check the events in ``result`` against ``test.expected_results``.

            Each expectation is a tuple: a one-item tuple must match the
            whole event, a longer one matches the first two event fields and
            requires the error type's name to appear in the third.
            """
            remaining = list(result._events)
            self.assertEqual(('startTest', test), remaining.pop(0))
            for expectation in test.expected_results:
                event = remaining.pop(0)
                outcome_name = expectation[0]
                if len(expectation) == 1:
                    self.assertEqual((outcome_name, test), event)
                    continue
                self.assertEqual((outcome_name, test), event[:2])
                expected_error = expectation[1]
                self.assertIn(expected_error.__name__, str(event[2]))
            self.assertEqual([('stopTest', test)], remaining)

        def test_runner(self):
            fixture_class = self.test_factory
            fixture = fixture_class('test_something', runTest=self.runner)
            recorder = ExtendedTestResult()
            fixture.run(recorder)
            self.assertEqual(fixture.calls, fixture_class.expected_calls)
            self.assertResultsMatch(fixture, recorder)
 
107
 
 
108
 
 
109
def make_integration_tests():
    """Build a suite pairing every runner with every fixture class in X.

    Each test is a clone of ``X.TestIntegration('test_runner')`` with a new
    id of the form ``<base id>(<runner name>, <fixture class name>)`` and
    with ``test_factory`` and ``runner`` set on the clone.

    :return: A ``unittest.TestSuite`` containing the clones, grouped by
        runner and then by fixture.
    """
    from unittest import TestSuite
    from testtools import clone_test_with_new_id

    named_runners = [
        ('RunTest', RunTest),
        ('SynchronousDeferredRunTest', SynchronousDeferredRunTest),
        ('AsynchronousDeferredRunTest', AsynchronousDeferredRunTest),
        ]
    fixtures = [
        X.ErrorInSetup,
        X.ErrorInTest,
        X.ErrorInTearDown,
        X.FailureInTest,
        X.ErrorInCleanup,
        ]
    template = X.TestIntegration('test_runner')

    def specialise(label, run_test, fixture):
        # Clone the template and attach the runner/fixture pair to the clone.
        new_id = '%s(%s, %s)' % (template.id(), label, fixture.__name__)
        clone = clone_test_with_new_id(template, new_id)
        clone.test_factory = fixture
        clone.runner = run_test
        return clone

    return TestSuite([
        specialise(label, run_test, fixture)
        for label, run_test in named_runners
        for fixture in fixtures])
 
138
 
 
139
 
 
140
class TestSynchronousDeferredRunTest(NeedsTwistedTestCase):
    """Tests for SynchronousDeferredRunTest with tests that return Deferreds."""

    def make_result(self):
        """Return a new ExtendedTestResult to collect events in."""
        recorder = ExtendedTestResult()
        return recorder

    def make_runner(self, test):
        """Return a SynchronousDeferredRunTest for ``test`` and its handlers."""
        handlers = test.exception_handlers
        return SynchronousDeferredRunTest(test, handlers)

    def test_success(self):
        # A test returning defer.succeed(None) is reported via addSuccess.
        class SomeCase(TestCase):
            def test_success(self):
                return defer.succeed(None)
        case = SomeCase('test_success')
        recorder = self.make_result()
        self.make_runner(case).run(recorder)
        expected_events = [
            ('startTest', case),
            ('addSuccess', case),
            ('stopTest', case),
            ]
        self.assertThat(recorder._events, Equals(expected_events))

    def test_failure(self):
        # A test returning a Deferred built from self.fail is reported via
        # addFailure.  Only the first two fields of each event are compared.
        class SomeCase(TestCase):
            def test_failure(self):
                return defer.maybeDeferred(self.fail, "Egads!")
        case = SomeCase('test_failure')
        recorder = self.make_result()
        self.make_runner(case).run(recorder)
        summary = [event[:2] for event in recorder._events]
        expected_events = [
            ('startTest', case),
            ('addFailure', case),
            ('stopTest', case),
            ]
        self.assertThat(summary, Equals(expected_events))

    def test_setUp_followed_by_test(self):
        # setUp may itself return a Deferred; the failing test method is
        # still run afterwards and reported via addFailure.
        class SomeCase(TestCase):
            def setUp(self):
                super(SomeCase, self).setUp()
                return defer.succeed(None)
            def test_failure(self):
                return defer.maybeDeferred(self.fail, "Egads!")
        case = SomeCase('test_failure')
        recorder = self.make_result()
        self.make_runner(case).run(recorder)
        summary = [event[:2] for event in recorder._events]
        expected_events = [
            ('startTest', case),
            ('addFailure', case),
            ('stopTest', case),
            ]
        self.assertThat(summary, Equals(expected_events))
 
192
 
 
193
 
 
194
class TestAsynchronousDeferredRunTest(NeedsTwistedTestCase):
    """Tests for AsynchronousDeferredRunTest using twisted.internet.reactor."""

    def make_reactor(self):
        """Return the reactor these tests schedule calls on."""
        from twisted.internet import reactor
        return reactor

    def make_result(self):
        """Return a new ExtendedTestResult for a runner to report into."""
        return ExtendedTestResult()

    def make_runner(self, test, timeout=None):
        """Build an AsynchronousDeferredRunTest for ``test``.

        :param test: The test case to run; its ``exception_handlers`` are
            passed on to the runner.
        :param timeout: Timeout given to the runner.  When None, the value
            returned by ``make_timeout()`` is used.
        """
        if timeout is None:
            timeout = self.make_timeout()
        return AsynchronousDeferredRunTest(
            test, test.exception_handlers, timeout=timeout)

    def make_timeout(self):
        """Return the default timeout passed to runners in these tests."""
        return 0.005

    def test_setUp_returns_deferred_that_fires_later(self):
        # setUp can return a Deferred that might fire at any time.
        # AsynchronousDeferredRunTest will not go on to running the test until
        # the Deferred returned by setUp actually fires.
        call_log = []
        marker = object()
        d = defer.Deferred().addCallback(call_log.append)
        class SomeCase(TestCase):
            def setUp(self):
                super(SomeCase, self).setUp()
                call_log.append('setUp')
                return d
            def test_something(self):
                call_log.append('test')
        def fire_deferred():
            self.assertThat(call_log, Equals(['setUp']))
            d.callback(marker)
        test = SomeCase('test_something')
        timeout = self.make_timeout()
        runner = self.make_runner(test, timeout=timeout)
        result = self.make_result()
        reactor = self.make_reactor()
        reactor.callLater(timeout, fire_deferred)
        runner.run(result)
        self.assertThat(call_log, Equals(['setUp', marker, 'test']))

    def test_calls_setUp_test_tearDown_in_sequence(self):
        # setUp, the test method and tearDown can all return
        # Deferreds. AsynchronousDeferredRunTest will make sure that each of
        # these are run in turn, only going on to the next stage once the
        # Deferred from the previous stage has fired.
        call_log = []
        a = defer.Deferred()
        a.addCallback(lambda x: call_log.append('a'))
        b = defer.Deferred()
        b.addCallback(lambda x: call_log.append('b'))
        c = defer.Deferred()
        c.addCallback(lambda x: call_log.append('c'))
        class SomeCase(TestCase):
            def setUp(self):
                super(SomeCase, self).setUp()
                call_log.append('setUp')
                return a
            def test_success(self):
                call_log.append('test')
                return b
            def tearDown(self):
                super(SomeCase, self).tearDown()
                call_log.append('tearDown')
                return c
        test = SomeCase('test_success')
        timeout = self.make_timeout()
        runner = self.make_runner(test, timeout)
        result = self.make_result()
        reactor = self.make_reactor()
        def fire_a():
            self.assertThat(call_log, Equals(['setUp']))
            a.callback(None)
        def fire_b():
            self.assertThat(call_log, Equals(['setUp', 'a', 'test']))
            b.callback(None)
        def fire_c():
            self.assertThat(
                call_log, Equals(['setUp', 'a', 'test', 'b', 'tearDown']))
            c.callback(None)
        # Fire the three Deferreds at increasing fractions of the timeout.
        reactor.callLater(timeout * 0.25, fire_a)
        reactor.callLater(timeout * 0.5, fire_b)
        reactor.callLater(timeout * 0.75, fire_c)
        runner.run(result)
        self.assertThat(
            call_log, Equals(['setUp', 'a', 'test', 'b', 'tearDown', 'c']))

    def test_async_cleanups(self):
        # Cleanups added with addCleanup can return
        # Deferreds. AsynchronousDeferredRunTest will run each of them in
        # turn.
        class SomeCase(TestCase):
            def test_whatever(self):
                pass
        test = SomeCase('test_whatever')
        call_log = []
        a = defer.Deferred().addCallback(lambda x: call_log.append('a'))
        b = defer.Deferred().addCallback(lambda x: call_log.append('b'))
        c = defer.Deferred().addCallback(lambda x: call_log.append('c'))
        test.addCleanup(lambda: a)
        test.addCleanup(lambda: b)
        test.addCleanup(lambda: c)
        def fire_a():
            self.assertThat(call_log, Equals([]))
            a.callback(None)
        def fire_b():
            self.assertThat(call_log, Equals(['a']))
            b.callback(None)
        def fire_c():
            self.assertThat(call_log, Equals(['a', 'b']))
            c.callback(None)
        timeout = self.make_timeout()
        reactor = self.make_reactor()
        reactor.callLater(timeout * 0.25, fire_a)
        reactor.callLater(timeout * 0.5, fire_b)
        reactor.callLater(timeout * 0.75, fire_c)
        runner = self.make_runner(test, timeout)
        result = self.make_result()
        runner.run(result)
        self.assertThat(call_log, Equals(['a', 'b', 'c']))

    def test_clean_reactor(self):
        # If there's cruft left over in the reactor, the test fails.
        reactor = self.make_reactor()
        timeout = self.make_timeout()
        class SomeCase(TestCase):
            def test_cruft(self):
                # Scheduled well past the runner's timeout, so it is still
                # pending when the test method returns.
                reactor.callLater(timeout * 10.0, lambda: None)
        test = SomeCase('test_cruft')
        runner = self.make_runner(test, timeout)
        result = self.make_result()
        runner.run(result)
        self.assertThat(
            [event[:2] for event in result._events],
            Equals(
                [('startTest', test),
                 ('addError', test),
                 ('stopTest', test)]))
        error = result._events[1][2]
        self.assertThat(error, KeysEqual('traceback', 'twisted-log'))

    def test_unhandled_error_from_deferred(self):
        # If there's a Deferred with an unhandled error, the test fails.  Each
        # unhandled error is reported with a separate traceback.
        class SomeCase(TestCase):
            def test_cruft(self):
                # Note we aren't returning the Deferred so that the error will
                # be unhandled.
                defer.maybeDeferred(lambda: 1/0)
                defer.maybeDeferred(lambda: 2/0)
        test = SomeCase('test_cruft')
        runner = self.make_runner(test)
        result = self.make_result()
        runner.run(result)
        # Pull the details out and blank them in the event so the event list
        # can be compared exactly; the details are checked separately below.
        error = result._events[1][2]
        result._events[1] = ('addError', test, None)
        self.assertThat(result._events, Equals(
            [('startTest', test),
             ('addError', test, None),
             ('stopTest', test)]))
        self.assertThat(
            error, KeysEqual(
                'twisted-log',
                'unhandled-error-in-deferred',
                'unhandled-error-in-deferred-1',
                ))

    def test_unhandled_error_from_deferred_combined_with_error(self):
        # If there's a Deferred with an unhandled error, the test fails.  Each
        # unhandled error is reported with a separate traceback, and the error
        # is still reported.
        class SomeCase(TestCase):
            def test_cruft(self):
                # Note we aren't returning the Deferred so that the error will
                # be unhandled.
                defer.maybeDeferred(lambda: 1/0)
                2 / 0
        test = SomeCase('test_cruft')
        runner = self.make_runner(test)
        result = self.make_result()
        runner.run(result)
        # Pull the details out and blank them in the event so the event list
        # can be compared exactly; the details are checked separately below.
        error = result._events[1][2]
        result._events[1] = ('addError', test, None)
        self.assertThat(result._events, Equals(
            [('startTest', test),
             ('addError', test, None),
             ('stopTest', test)]))
        self.assertThat(
            error, KeysEqual(
                'traceback',
                'twisted-log',
                'unhandled-error-in-deferred',
                ))

    @skipIf(os.name != "posix", "Sending SIGINT with os.kill is posix only")
    def test_keyboard_interrupt_stops_test_run(self):
        # If we get a SIGINT during a test run, running the test raises
        # KeyboardInterrupt instead of waiting for the timeout.
        SIGINT = getattr(signal, 'SIGINT', None)
        if not SIGINT:
            # skipTest raises the skip exception itself; no ``raise`` needed.
            self.skipTest("SIGINT unavailable")
        class SomeCase(TestCase):
            def test_pause(self):
                # A Deferred that is never fired by the test itself.
                return defer.Deferred()
        test = SomeCase('test_pause')
        reactor = self.make_reactor()
        timeout = self.make_timeout()
        runner = self.make_runner(test, timeout * 5)
        result = self.make_result()
        # Deliver the signal to this process before the runner's (longer)
        # timeout elapses.
        reactor.callLater(timeout, os.kill, os.getpid(), SIGINT)
        self.assertThat(lambda: runner.run(result),
            Raises(MatchesException(KeyboardInterrupt)))

    @skipIf(os.name != "posix", "Sending SIGINT with os.kill is posix only")
    def test_fast_keyboard_interrupt_stops_test_run(self):
        # Same as test_keyboard_interrupt_stops_test_run, except that the
        # SIGINT is scheduled with callWhenRunning rather than after a delay.
        # Running the test must still raise KeyboardInterrupt.
        SIGINT = getattr(signal, 'SIGINT', None)
        if not SIGINT:
            # skipTest raises the skip exception itself; no ``raise`` needed.
            self.skipTest("SIGINT unavailable")
        class SomeCase(TestCase):
            def test_pause(self):
                # A Deferred that is never fired by the test itself.
                return defer.Deferred()
        test = SomeCase('test_pause')
        reactor = self.make_reactor()
        timeout = self.make_timeout()
        runner = self.make_runner(test, timeout * 5)
        result = self.make_result()
        reactor.callWhenRunning(os.kill, os.getpid(), SIGINT)
        self.assertThat(lambda: runner.run(result),
            Raises(MatchesException(KeyboardInterrupt)))

    def test_timeout_causes_test_error(self):
        # If a test times out, it reports itself as having failed with a
        # TimeoutError.
        class SomeCase(TestCase):
            def test_pause(self):
                # A Deferred that is never fired, so the runner must time out.
                return defer.Deferred()
        test = SomeCase('test_pause')
        runner = self.make_runner(test)
        result = self.make_result()
        runner.run(result)
        error = result._events[1][2]
        self.assertThat(
            [event[:2] for event in result._events], Equals(
            [('startTest', test),
             ('addError', test),
             ('stopTest', test)]))
        self.assertIn('TimeoutError', str(error['traceback']))

    def test_convenient_construction(self):
        # As a convenience method, AsynchronousDeferredRunTest has a
        # classmethod that returns an AsynchronousDeferredRunTest
        # factory. This factory has the same API as the RunTest constructor.
        reactor = object()
        timeout = object()
        handler = object()
        factory = AsynchronousDeferredRunTest.make_factory(reactor, timeout)
        runner = factory(self, [handler])
        self.assertIs(reactor, runner._reactor)
        self.assertIs(timeout, runner._timeout)
        self.assertIs(self, runner.case)
        self.assertEqual([handler], runner.handlers)

    def test_use_convenient_factory(self):
        # Make sure that the factory can actually be used.
        factory = AsynchronousDeferredRunTest.make_factory()
        class SomeCase(TestCase):
            run_tests_with = factory
            def test_something(self):
                pass
        case = SomeCase('test_something')
        case.run()

    def test_convenient_construction_default_reactor(self):
        # make_factory accepts the reactor on its own as a keyword argument;
        # the runner it builds carries that reactor, the case and the
        # handlers.
        reactor = object()
        handler = object()
        factory = AsynchronousDeferredRunTest.make_factory(reactor=reactor)
        runner = factory(self, [handler])
        self.assertIs(reactor, runner._reactor)
        self.assertIs(self, runner.case)
        self.assertEqual([handler], runner.handlers)

    def test_convenient_construction_default_timeout(self):
        # make_factory accepts the timeout on its own as a keyword argument;
        # the runner it builds carries that timeout, the case and the
        # handlers.
        timeout = object()
        handler = object()
        factory = AsynchronousDeferredRunTest.make_factory(timeout=timeout)
        runner = factory(self, [handler])
        self.assertIs(timeout, runner._timeout)
        self.assertIs(self, runner.case)
        self.assertEqual([handler], runner.handlers)

    def test_convenient_construction_default_debugging(self):
        # make_factory accepts debug on its own as a keyword argument; the
        # runner it builds has its _debug flag set accordingly.
        handler = object()
        factory = AsynchronousDeferredRunTest.make_factory(debug=True)
        runner = factory(self, [handler])
        self.assertIs(self, runner.case)
        self.assertEqual([handler], runner.handlers)
        self.assertEqual(True, runner._debug)

    def test_deferred_error(self):
        # A test that returns a Deferred which fails with an exception is
        # reported via addError, with a traceback and the Twisted log.
        class SomeTest(TestCase):
            def test_something(self):
                return defer.maybeDeferred(lambda: 1/0)
        test = SomeTest('test_something')
        runner = self.make_runner(test)
        result = self.make_result()
        runner.run(result)
        self.assertThat(
            [event[:2] for event in result._events],
            Equals([
                ('startTest', test),
                ('addError', test),
                ('stopTest', test)]))
        error = result._events[1][2]
        self.assertThat(error, KeysEqual('traceback', 'twisted-log'))

    def test_only_addError_once(self):
        # Even if the reactor is unclean and the test raises an error and the
        # cleanups raise errors, we only called addError once per test.
        reactor = self.make_reactor()
        class WhenItRains(TestCase):
            def it_pours(self):
                # Add a dirty cleanup.
                self.addCleanup(lambda: 3 / 0)
                # Dirty the reactor.
                from twisted.internet.protocol import ServerFactory
                reactor.listenTCP(0, ServerFactory())
                # Unhandled error.
                defer.maybeDeferred(lambda: 2 / 0)
                # Actual error.
                raise RuntimeError("Excess precipitation")
        test = WhenItRains('it_pours')
        runner = self.make_runner(test)
        result = self.make_result()
        runner.run(result)
        self.assertThat(
            [event[:2] for event in result._events],
            Equals([
                ('startTest', test),
                ('addError', test),
                ('stopTest', test)]))
        error = result._events[1][2]
        self.assertThat(
            error, KeysEqual(
                'traceback',
                'traceback-1',
                'traceback-2',
                'twisted-log',
                'unhandled-error-in-deferred',
                ))

    def test_log_err_is_error(self):
        # An error logged during the test run is recorded as an error in the
        # tests.
        class LogAnError(TestCase):
            def test_something(self):
                try:
                    1/0
                except ZeroDivisionError:
                    f = failure.Failure()
                log.err(f)
        test = LogAnError('test_something')
        runner = self.make_runner(test)
        result = self.make_result()
        runner.run(result)
        self.assertThat(
            [event[:2] for event in result._events],
            Equals([
                ('startTest', test),
                ('addError', test),
                ('stopTest', test)]))
        error = result._events[1][2]
        self.assertThat(error, KeysEqual('logged-error', 'twisted-log'))

    def test_log_err_flushed_is_success(self):
        # An error that is logged during the test run but then flushed with
        # flush_logged_errors does not fail the test: it is reported as a
        # success whose only detail is an empty twisted-log.
        class LogAnError(TestCase):
            def test_something(self):
                try:
                    1/0
                except ZeroDivisionError:
                    f = failure.Failure()
                log.err(f)
                flush_logged_errors(ZeroDivisionError)
        test = LogAnError('test_something')
        runner = self.make_runner(test)
        result = self.make_result()
        runner.run(result)
        self.assertThat(
            result._events,
            Equals([
                ('startTest', test),
                ('addSuccess', test, {'twisted-log': text_content('')}),
                ('stopTest', test)]))

    def test_log_in_details(self):
        # A test that logs a message and then raises is reported via
        # addError with both a traceback and a twisted-log detail.
        class LogAnError(TestCase):
            def test_something(self):
                log.msg("foo")
                1/0
        test = LogAnError('test_something')
        runner = self.make_runner(test)
        result = self.make_result()
        runner.run(result)
        self.assertThat(
            [event[:2] for event in result._events],
            Equals([
                ('startTest', test),
                ('addError', test),
                ('stopTest', test)]))
        error = result._events[1][2]
        self.assertThat(error, KeysEqual('traceback', 'twisted-log'))

    def test_debugging_unchanged_during_test_by_default(self):
        # Without the debug flag, the Deferred and DelayedCall debug settings
        # seen inside the test equal the ones in force before the run.
        debugging = [(defer.Deferred.debug, DelayedCall.debug)]
        class SomeCase(TestCase):
            def test_debugging_enabled(self):
                debugging.append((defer.Deferred.debug, DelayedCall.debug))
        test = SomeCase('test_debugging_enabled')
        runner = AsynchronousDeferredRunTest(
            test, handlers=test.exception_handlers,
            reactor=self.make_reactor(), timeout=self.make_timeout())
        runner.run(self.make_result())
        self.assertEqual(debugging[0], debugging[1])

    def test_debugging_enabled_during_test_with_debug_flag(self):
        # With debug=True, both debug settings are True while the test runs
        # and are back to their previous (patched-to-False) values afterwards.
        self.patch(defer.Deferred, 'debug', False)
        self.patch(DelayedCall, 'debug', False)
        debugging = []
        class SomeCase(TestCase):
            def test_debugging_enabled(self):
                debugging.append((defer.Deferred.debug, DelayedCall.debug))
        test = SomeCase('test_debugging_enabled')
        runner = AsynchronousDeferredRunTest(
            test, handlers=test.exception_handlers,
            reactor=self.make_reactor(), timeout=self.make_timeout(),
            debug=True)
        runner.run(self.make_result())
        self.assertEqual([(True, True)], debugging)
        self.assertEqual(False, defer.Deferred.debug)
        # Check the second patched flag too; the original asserted
        # Deferred.debug twice and never verified DelayedCall.debug.
        self.assertEqual(False, DelayedCall.debug)
 
649
 
 
650
 
 
651
class TestAssertFailsWith(NeedsTwistedTestCase):
    """Tests for `assert_fails_with`."""

    # Only override the runner when the Deferred-aware one could be looked up.
    if SynchronousDeferredRunTest is not None:
        run_tests_with = SynchronousDeferredRunTest

    def test_assert_fails_with_success(self):
        # Given a Deferred that succeeds, assert_fails_with fails with
        # failureException and a message naming the expected type and the
        # value that was returned.
        sentinel = object()
        outcome = assert_fails_with(defer.succeed(sentinel), RuntimeError)
        expected_message = (
            "RuntimeError not raised (%r returned)" % (sentinel,))
        def unexpected_success(value):
            return self.fail("Should not have succeeded")
        def check_result(reason):
            reason.trap(self.failureException)
            self.assertThat(str(reason.value), Equals(expected_message))
        outcome.addCallbacks(unexpected_success, check_result)
        return outcome

    def test_assert_fails_with_success_multiple_types(self):
        # Given a Deferred that succeeds and several expected exception
        # types, the failure message lists every expected type.
        sentinel = object()
        outcome = assert_fails_with(
            defer.succeed(sentinel), RuntimeError, ZeroDivisionError)
        expected_message = (
            "RuntimeError, ZeroDivisionError not raised "
            "(%r returned)" % (sentinel,))
        def unexpected_success(value):
            return self.fail("Should not have succeeded")
        def check_result(reason):
            reason.trap(self.failureException)
            self.assertThat(str(reason.value), Equals(expected_message))
        outcome.addCallbacks(unexpected_success, check_result)
        return outcome

    def test_assert_fails_with_wrong_exception(self):
        # Given a Deferred that fails with an exception type other than the
        # expected ones, assert_fails_with fails with failureException; the
        # message starts by naming the actual and expected types, followed by
        # a traceback.
        outcome = assert_fails_with(
            defer.maybeDeferred(lambda: 1/0), RuntimeError, KeyboardInterrupt)
        expected_head = [
            ("ZeroDivisionError raised instead of RuntimeError, "
             "KeyboardInterrupt:"),
            " Traceback (most recent call last):",
            ]
        def unexpected_success(value):
            return self.fail("Should not have succeeded")
        def check_result(reason):
            reason.trap(self.failureException)
            message_lines = str(reason.value).splitlines()
            self.assertThat(message_lines[:2], Equals(expected_head))
        outcome.addCallbacks(unexpected_success, check_result)
        return outcome

    def test_assert_fails_with_expected_exception(self):
        # assert_fails_with calls back with the value of the failure if it's
        # one of the expected types of failures.
        try:
            1/0
        except ZeroDivisionError:
            captured = failure.Failure()
        outcome = assert_fails_with(defer.fail(captured), ZeroDivisionError)
        matcher = Equals(captured.value)
        return outcome.addCallback(self.assertThat, matcher)

    def test_custom_failure_exception(self):
        # If assert_fails_with is passed a 'failureException' keyword
        # argument, then it will raise that instead of `AssertionError`.
        class CustomException(Exception):
            pass
        sentinel = object()
        outcome = assert_fails_with(
            defer.succeed(sentinel), RuntimeError,
            failureException=CustomException)
        expected_message = (
            "RuntimeError not raised (%r returned)" % (sentinel,))
        def unexpected_success(value):
            return self.fail("Should not have succeeded")
        def check_result(reason):
            reason.trap(CustomException)
            self.assertThat(str(reason.value), Equals(expected_message))
        return outcome.addCallbacks(unexpected_success, check_result)
 
732
 
 
733
 
 
734
def test_suite():
    """Return this module's loadable tests plus the generated integration tests.

    :return: A ``unittest.TestSuite`` holding, in order, the tests loaded
        from this module by name and the suite from make_integration_tests().
    """
    from unittest import TestLoader, TestSuite
    module_tests = TestLoader().loadTestsFromName(__name__)
    integration_tests = make_integration_tests()
    return TestSuite([module_tests, integration_tests])