~jk0/nova/xs-ipv6

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/trial/test/test_util.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
import os
 
2
 
 
3
from zope.interface import implements
 
4
 
 
5
from twisted.internet.interfaces import IProcessTransport
 
6
from twisted.internet import defer
 
7
from twisted.internet.base import DelayedCall
 
8
 
 
9
from twisted.trial.unittest import TestCase
 
10
from twisted.trial import util
 
11
from twisted.trial.util import DirtyReactorAggregateError, _Janitor
 
12
from twisted.trial.test import packages
 
13
 
 
14
 
 
15
 
 
16
class TestMktemp(TestCase):
 
17
    def test_name(self):
 
18
        name = self.mktemp()
 
19
        dirs = os.path.dirname(name).split(os.sep)[:-1]
 
20
        self.failUnlessEqual(
 
21
            dirs, ['twisted.trial.test.test_util', 'TestMktemp', 'test_name'])
 
22
 
 
23
    def test_unique(self):
 
24
        name = self.mktemp()
 
25
        self.failIfEqual(name, self.mktemp())
 
26
 
 
27
    def test_created(self):
 
28
        name = self.mktemp()
 
29
        dirname = os.path.dirname(name)
 
30
        self.failUnless(os.path.exists(dirname))
 
31
        self.failIf(os.path.exists(name))
 
32
 
 
33
    def test_location(self):
 
34
        path = os.path.abspath(self.mktemp())
 
35
        self.failUnless(path.startswith(os.getcwd()))
 
36
 
 
37
 
 
38
class TestIntrospection(TestCase):
 
39
    def test_containers(self):
 
40
        import suppression
 
41
        parents = util.getPythonContainers(
 
42
            suppression.TestSuppression2.testSuppressModule)
 
43
        expected = [suppression.TestSuppression2, suppression]
 
44
        for a, b in zip(parents, expected):
 
45
            self.failUnlessEqual(a, b)
 
46
 
 
47
 
 
48
class TestFindObject(packages.SysPathManglingTest):
 
49
    def test_importPackage(self):
 
50
        package1 = util.findObject('package')
 
51
        import package as package2
 
52
        self.failUnlessEqual(package1, (True, package2))
 
53
 
 
54
    def test_importModule(self):
 
55
        test_sample2 = util.findObject('goodpackage.test_sample')
 
56
        from goodpackage import test_sample
 
57
        self.failUnlessEqual((True, test_sample), test_sample2)
 
58
 
 
59
    def test_importError(self):
 
60
        self.failUnlessRaises(ZeroDivisionError,
 
61
                              util.findObject, 'package.test_bad_module')
 
62
 
 
63
    def test_sophisticatedImportError(self):
 
64
        self.failUnlessRaises(ImportError,
 
65
                              util.findObject, 'package2.test_module')
 
66
 
 
67
    def test_importNonexistentPackage(self):
 
68
        self.failUnlessEqual(util.findObject('doesntexist')[0], False)
 
69
 
 
70
    def test_findNonexistentModule(self):
 
71
        self.failUnlessEqual(util.findObject('package.doesntexist')[0], False)
 
72
 
 
73
    def test_findNonexistentObject(self):
 
74
        self.failUnlessEqual(util.findObject(
 
75
            'goodpackage.test_sample.doesnt')[0], False)
 
76
        self.failUnlessEqual(util.findObject(
 
77
            'goodpackage.test_sample.AlphabetTest.doesntexist')[0], False)
 
78
 
 
79
    def test_findObjectExist(self):
 
80
        alpha1 = util.findObject('goodpackage.test_sample.AlphabetTest')
 
81
        from goodpackage import test_sample
 
82
        self.failUnlessEqual(alpha1, (True, test_sample.AlphabetTest))
 
83
 
 
84
 
 
85
 
 
86
class TestRunSequentially(TestCase):
 
87
    """
 
88
    Sometimes it is useful to be able to run an arbitrary list of callables,
 
89
    one after the other.
 
90
 
 
91
    When some of those callables can return Deferreds, things become complex.
 
92
    """
 
93
 
 
94
    def test_emptyList(self):
 
95
        """
 
96
        When asked to run an empty list of callables, runSequentially returns a
 
97
        successful Deferred that fires an empty list.
 
98
        """
 
99
        d = util._runSequentially([])
 
100
        d.addCallback(self.assertEqual, [])
 
101
        return d
 
102
 
 
103
 
 
104
    def test_singleSynchronousSuccess(self):
 
105
        """
 
106
        When given a callable that succeeds without returning a Deferred,
 
107
        include the return value in the results list, tagged with a SUCCESS
 
108
        flag.
 
109
        """
 
110
        d = util._runSequentially([lambda: None])
 
111
        d.addCallback(self.assertEqual, [(defer.SUCCESS, None)])
 
112
        return d
 
113
 
 
114
 
 
115
    def test_singleSynchronousFailure(self):
 
116
        """
 
117
        When given a callable that raises an exception, include a Failure for
 
118
        that exception in the results list, tagged with a FAILURE flag.
 
119
        """
 
120
        d = util._runSequentially([lambda: self.fail('foo')])
 
121
        def check(results):
 
122
            [(flag, fail)] = results
 
123
            fail.trap(self.failureException)
 
124
            self.assertEqual(fail.getErrorMessage(), 'foo')
 
125
            self.assertEqual(flag, defer.FAILURE)
 
126
        return d.addCallback(check)
 
127
 
 
128
 
 
129
    def test_singleAsynchronousSuccess(self):
 
130
        """
 
131
        When given a callable that returns a successful Deferred, include the
 
132
        result of the Deferred in the results list, tagged with a SUCCESS flag.
 
133
        """
 
134
        d = util._runSequentially([lambda: defer.succeed(None)])
 
135
        d.addCallback(self.assertEqual, [(defer.SUCCESS, None)])
 
136
        return d
 
137
 
 
138
 
 
139
    def test_singleAsynchronousFailure(self):
 
140
        """
 
141
        When given a callable that returns a failing Deferred, include the
 
142
        failure the results list, tagged with a FAILURE flag.
 
143
        """
 
144
        d = util._runSequentially([lambda: defer.fail(ValueError('foo'))])
 
145
        def check(results):
 
146
            [(flag, fail)] = results
 
147
            fail.trap(ValueError)
 
148
            self.assertEqual(fail.getErrorMessage(), 'foo')
 
149
            self.assertEqual(flag, defer.FAILURE)
 
150
        return d.addCallback(check)
 
151
 
 
152
 
 
153
    def test_callablesCalledInOrder(self):
 
154
        """
 
155
        Check that the callables are called in the given order, one after the
 
156
        other.
 
157
        """
 
158
        log = []
 
159
        deferreds = []
 
160
 
 
161
        def append(value):
 
162
            d = defer.Deferred()
 
163
            log.append(value)
 
164
            deferreds.append(d)
 
165
            return d
 
166
 
 
167
        d = util._runSequentially([lambda: append('foo'),
 
168
                                   lambda: append('bar')])
 
169
 
 
170
        # runSequentially should wait until the Deferred has fired before
 
171
        # running the second callable.
 
172
        self.assertEqual(log, ['foo'])
 
173
        deferreds[-1].callback(None)
 
174
        self.assertEqual(log, ['foo', 'bar'])
 
175
 
 
176
        # Because returning created Deferreds makes jml happy.
 
177
        deferreds[-1].callback(None)
 
178
        return d
 
179
 
 
180
 
 
181
    def test_continuesAfterError(self):
 
182
        """
 
183
        If one of the callables raises an error, then runSequentially continues
 
184
        to run the remaining callables.
 
185
        """
 
186
        d = util._runSequentially([lambda: self.fail('foo'), lambda: 'bar'])
 
187
        def check(results):
 
188
            [(flag1, fail), (flag2, result)] = results
 
189
            fail.trap(self.failureException)
 
190
            self.assertEqual(flag1, defer.FAILURE)
 
191
            self.assertEqual(fail.getErrorMessage(), 'foo')
 
192
            self.assertEqual(flag2, defer.SUCCESS)
 
193
            self.assertEqual(result, 'bar')
 
194
        return d.addCallback(check)
 
195
 
 
196
 
 
197
    def test_stopOnFirstError(self):
 
198
        """
 
199
        If the C{stopOnFirstError} option is passed to C{runSequentially}, then
 
200
        no further callables are called after the first exception is raised.
 
201
        """
 
202
        d = util._runSequentially([lambda: self.fail('foo'), lambda: 'bar'],
 
203
                                  stopOnFirstError=True)
 
204
        def check(results):
 
205
            [(flag1, fail)] = results
 
206
            fail.trap(self.failureException)
 
207
            self.assertEqual(flag1, defer.FAILURE)
 
208
            self.assertEqual(fail.getErrorMessage(), 'foo')
 
209
        return d.addCallback(check)
 
210
 
 
211
 
 
212
    def test_stripFlags(self):
 
213
        """
 
214
        If the C{stripFlags} option is passed to C{runSequentially} then the
 
215
        SUCCESS / FAILURE flags are stripped from the output. Instead, the
 
216
        Deferred fires a flat list of results containing only the results and
 
217
        failures.
 
218
        """
 
219
        d = util._runSequentially([lambda: self.fail('foo'), lambda: 'bar'],
 
220
                                  stripFlags=True)
 
221
        def check(results):
 
222
            [fail, result] = results
 
223
            fail.trap(self.failureException)
 
224
            self.assertEqual(fail.getErrorMessage(), 'foo')
 
225
            self.assertEqual(result, 'bar')
 
226
        return d.addCallback(check)
 
227
    test_stripFlags.todo = "YAGNI"
 
228
 
 
229
 
 
230
 
 
231
class DirtyReactorAggregateErrorTest(TestCase):
 
232
    """
 
233
    Tests for the L{DirtyReactorAggregateError}.
 
234
    """
 
235
 
 
236
    def test_formatDelayedCall(self):
 
237
        """
 
238
        Delayed calls are formatted nicely.
 
239
        """
 
240
        error = DirtyReactorAggregateError(["Foo", "bar"])
 
241
        self.assertEquals(str(error),
 
242
                          """\
 
243
Reactor was unclean.
 
244
DelayedCalls: (set twisted.internet.base.DelayedCall.debug = True to debug)
 
245
Foo
 
246
bar""")
 
247
 
 
248
 
 
249
    def test_formatSelectables(self):
 
250
        """
 
251
        Selectables are formatted nicely.
 
252
        """
 
253
        error = DirtyReactorAggregateError([], ["selectable 1", "selectable 2"])
 
254
        self.assertEquals(str(error),
 
255
                          """\
 
256
Reactor was unclean.
 
257
Selectables:
 
258
selectable 1
 
259
selectable 2""")
 
260
 
 
261
 
 
262
    def test_formatDelayedCallsAndSelectables(self):
 
263
        """
 
264
        Both delayed calls and selectables can appear in the same error.
 
265
        """
 
266
        error = DirtyReactorAggregateError(["bleck", "Boozo"],
 
267
                                           ["Sel1", "Sel2"])
 
268
        self.assertEquals(str(error),
 
269
                          """\
 
270
Reactor was unclean.
 
271
DelayedCalls: (set twisted.internet.base.DelayedCall.debug = True to debug)
 
272
bleck
 
273
Boozo
 
274
Selectables:
 
275
Sel1
 
276
Sel2""")
 
277
 
 
278
 
 
279
 
 
280
class StubReactor(object):
 
281
    """
 
282
    A reactor stub which contains enough functionality to be used with the
 
283
    L{_Janitor}.
 
284
 
 
285
    @ivar iterations: A list of the arguments passed to L{iterate}.
 
286
    @ivar removeAllCalled: Number of times that L{removeAll} was called.
 
287
    @ivar selectables: The value that will be returned from L{removeAll}.
 
288
    @ivar delayedCalls: The value to return from L{getDelayedCalls}.
 
289
    """
 
290
 
 
291
    def __init__(self, delayedCalls, selectables=None):
 
292
        """
 
293
        @param delayedCalls: See L{StubReactor.delayedCalls}.
 
294
        @param selectables: See L{StubReactor.selectables}.
 
295
        """
 
296
        self.delayedCalls = delayedCalls
 
297
        self.iterations = []
 
298
        self.removeAllCalled = 0
 
299
        if not selectables:
 
300
            selectables = []
 
301
        self.selectables = selectables
 
302
 
 
303
 
 
304
    def iterate(self, timeout=None):
 
305
        """
 
306
        Increment C{self.iterations}.
 
307
        """
 
308
        self.iterations.append(timeout)
 
309
 
 
310
 
 
311
    def getDelayedCalls(self):
 
312
        """
 
313
        Return C{self.delayedCalls}.
 
314
        """
 
315
        return self.delayedCalls
 
316
 
 
317
 
 
318
    def removeAll(self):
 
319
        """
 
320
        Increment C{self.removeAllCalled} and return C{self.selectables}.
 
321
        """
 
322
        self.removeAllCalled += 1
 
323
        return self.selectables
 
324
 
 
325
 
 
326
 
 
327
class StubErrorReporter(object):
 
328
    """
 
329
    A subset of L{twisted.trial.itrial.IReporter} which records L{addError}
 
330
    calls.
 
331
 
 
332
    @ivar errors: List of two-tuples of (test, error) which were passed to
 
333
        L{addError}.
 
334
    """
 
335
 
 
336
    def __init__(self):
 
337
        self.errors = []
 
338
 
 
339
 
 
340
    def addError(self, test, error):
 
341
        """
 
342
        Record parameters in C{self.errors}.
 
343
        """
 
344
        self.errors.append((test, error))
 
345
 
 
346
 
 
347
 
 
348
class JanitorTests(TestCase):
 
349
    """
 
350
    Tests for L{_Janitor}!
 
351
    """
 
352
 
 
353
    def test_cleanPendingSpinsReactor(self):
 
354
        """
 
355
        During pending-call cleanup, the reactor will be spun twice with an
 
356
        instant timeout. This is not a requirement, it is only a test for
 
357
        current behavior. Hopefully Trial will eventually not do this kind of
 
358
        reactor stuff.
 
359
        """
 
360
        reactor = StubReactor([])
 
361
        jan = _Janitor(None, None, reactor=reactor)
 
362
        jan._cleanPending()
 
363
        self.assertEquals(reactor.iterations, [0, 0])
 
364
 
 
365
 
 
366
    def test_cleanPendingCancelsCalls(self):
 
367
        """
 
368
        During pending-call cleanup, the janitor cancels pending timed calls.
 
369
        """
 
370
        def func():
 
371
            return "Lulz"
 
372
        cancelled = []
 
373
        delayedCall = DelayedCall(300, func, (), {},
 
374
                                  cancelled.append, lambda x: None)
 
375
        reactor = StubReactor([delayedCall])
 
376
        jan = _Janitor(None, None, reactor=reactor)
 
377
        jan._cleanPending()
 
378
        self.assertEquals(cancelled, [delayedCall])
 
379
 
 
380
 
 
381
    def test_cleanPendingReturnsDelayedCallStrings(self):
 
382
        """
 
383
        The Janitor produces string representations of delayed calls from the
 
384
        delayed call cleanup method. It gets the string representations
 
385
        *before* cancelling the calls; this is important because cancelling the
 
386
        call removes critical debugging information from the string
 
387
        representation.
 
388
        """
 
389
        delayedCall = DelayedCall(300, lambda: None, (), {},
 
390
                                  lambda x: None, lambda x: None,
 
391
                                  seconds=lambda: 0)
 
392
        delayedCallString = str(delayedCall)
 
393
        reactor = StubReactor([delayedCall])
 
394
        jan = _Janitor(None, None, reactor=reactor)
 
395
        strings = jan._cleanPending()
 
396
        self.assertEquals(strings, [delayedCallString])
 
397
 
 
398
 
 
399
    def test_cleanReactorRemovesSelectables(self):
 
400
        """
 
401
        The Janitor will remove selectables during reactor cleanup.
 
402
        """
 
403
        reactor = StubReactor([])
 
404
        jan = _Janitor(None, None, reactor=reactor)
 
405
        jan._cleanReactor()
 
406
        self.assertEquals(reactor.removeAllCalled, 1)
 
407
 
 
408
 
 
409
    def test_cleanReactorKillsProcesses(self):
 
410
        """
 
411
        The Janitor will kill processes during reactor cleanup.
 
412
        """
 
413
        class StubProcessTransport(object):
 
414
            """
 
415
            A stub L{IProcessTransport} provider which records signals.
 
416
            @ivar signals: The signals passed to L{signalProcess}.
 
417
            """
 
418
            implements(IProcessTransport)
 
419
 
 
420
            def __init__(self):
 
421
                self.signals = []
 
422
 
 
423
            def signalProcess(self, signal):
 
424
                """
 
425
                Append C{signal} to C{self.signals}.
 
426
                """
 
427
                self.signals.append(signal)
 
428
 
 
429
        pt = StubProcessTransport()
 
430
        reactor = StubReactor([], [pt])
 
431
        jan = _Janitor(None, None, reactor=reactor)
 
432
        jan._cleanReactor()
 
433
        self.assertEquals(pt.signals, ["KILL"])
 
434
 
 
435
 
 
436
    def test_cleanReactorReturnsSelectableStrings(self):
 
437
        """
 
438
        The Janitor returns string representations of the selectables that it
 
439
        cleaned up from the reactor cleanup method.
 
440
        """
 
441
        class Selectable(object):
 
442
            """
 
443
            A stub Selectable which only has an interesting string
 
444
            representation.
 
445
            """
 
446
            def __repr__(self):
 
447
                return "(SELECTABLE!)"
 
448
 
 
449
        reactor = StubReactor([], [Selectable()])
 
450
        jan = _Janitor(None, None, reactor=reactor)
 
451
        self.assertEquals(jan._cleanReactor(), ["(SELECTABLE!)"])
 
452
 
 
453
 
 
454
    def test_postCaseCleanupNoErrors(self):
 
455
        """
 
456
        The post-case cleanup method will return True and not call C{addError}
 
457
        on the result if there are no pending calls.
 
458
        """
 
459
        reactor = StubReactor([])
 
460
        test = object()
 
461
        reporter = StubErrorReporter()
 
462
        jan = _Janitor(test, reporter, reactor=reactor)
 
463
        self.assertTrue(jan.postCaseCleanup())
 
464
        self.assertEquals(reporter.errors, [])
 
465
 
 
466
 
 
467
    def test_postCaseCleanupWithErrors(self):
 
468
        """
 
469
        The post-case cleanup method will return False and call C{addError} on
 
470
        the result with a L{DirtyReactorAggregateError} Failure if there are
 
471
        pending calls.
 
472
        """
 
473
        delayedCall = DelayedCall(300, lambda: None, (), {},
 
474
                                  lambda x: None, lambda x: None,
 
475
                                  seconds=lambda: 0)
 
476
        delayedCallString = str(delayedCall)
 
477
        reactor = StubReactor([delayedCall], [])
 
478
        test = object()
 
479
        reporter = StubErrorReporter()
 
480
        jan = _Janitor(test, reporter, reactor=reactor)
 
481
        self.assertFalse(jan.postCaseCleanup())
 
482
        self.assertEquals(len(reporter.errors), 1)
 
483
        self.assertEquals(reporter.errors[0][1].value.delayedCalls,
 
484
                          [delayedCallString])
 
485
 
 
486
 
 
487
    def test_postClassCleanupNoErrors(self):
 
488
        """
 
489
        The post-class cleanup method will not call C{addError} on the result
 
490
        if there are no pending calls or selectables.
 
491
        """
 
492
        reactor = StubReactor([])
 
493
        test = object()
 
494
        reporter = StubErrorReporter()
 
495
        jan = _Janitor(test, reporter, reactor=reactor)
 
496
        jan.postClassCleanup()
 
497
        self.assertEquals(reporter.errors, [])
 
498
 
 
499
 
 
500
    def test_postClassCleanupWithPendingCallErrors(self):
 
501
        """
 
502
        The post-class cleanup method call C{addError} on the result with a
 
503
        L{DirtyReactorAggregateError} Failure if there are pending calls.
 
504
        """
 
505
        delayedCall = DelayedCall(300, lambda: None, (), {},
 
506
                                  lambda x: None, lambda x: None,
 
507
                                  seconds=lambda: 0)
 
508
        delayedCallString = str(delayedCall)
 
509
        reactor = StubReactor([delayedCall], [])
 
510
        test = object()
 
511
        reporter = StubErrorReporter()
 
512
        jan = _Janitor(test, reporter, reactor=reactor)
 
513
        jan.postClassCleanup()
 
514
        self.assertEquals(len(reporter.errors), 1)
 
515
        self.assertEquals(reporter.errors[0][1].value.delayedCalls,
 
516
                          [delayedCallString])
 
517
 
 
518
 
 
519
    def test_postClassCleanupWithSelectableErrors(self):
 
520
        """
 
521
        The post-class cleanup method call C{addError} on the result with a
 
522
        L{DirtyReactorAggregateError} Failure if there are selectables.
 
523
        """
 
524
        selectable = "SELECTABLE HERE"
 
525
        reactor = StubReactor([], [selectable])
 
526
        test = object()
 
527
        reporter = StubErrorReporter()
 
528
        jan = _Janitor(test, reporter, reactor=reactor)
 
529
        jan.postClassCleanup()
 
530
        self.assertEquals(len(reporter.errors), 1)
 
531
        self.assertEquals(reporter.errors[0][1].value.selectables,
 
532
                          [repr(selectable)])
 
533