3
from zope.interface import implements
5
from twisted.internet.interfaces import IProcessTransport
6
from twisted.internet import defer
7
from twisted.internet.base import DelayedCall
9
from twisted.trial.unittest import TestCase
10
from twisted.trial import util
11
from twisted.trial.util import DirtyReactorAggregateError, _Janitor
12
from twisted.trial.test import packages
16
class TestMktemp(TestCase):
19
dirs = os.path.dirname(name).split(os.sep)[:-1]
21
dirs, ['twisted.trial.test.test_util', 'TestMktemp', 'test_name'])
23
def test_unique(self):
25
self.failIfEqual(name, self.mktemp())
27
def test_created(self):
29
dirname = os.path.dirname(name)
30
self.failUnless(os.path.exists(dirname))
31
self.failIf(os.path.exists(name))
33
def test_location(self):
    """
    The path produced by C{mktemp}, once made absolute, starts with the
    current working directory.
    """
    absolutePath = os.path.abspath(self.mktemp())
    workingDirectory = os.getcwd()
    self.failUnless(absolutePath.startswith(workingDirectory))
38
class TestIntrospection(TestCase):
    """
    Tests for L{util.getPythonContainers}.
    """

    def test_containers(self):
        """
        The containers reported for C{TestSuppression2.testSuppressModule}
        are compared, position by position, against the defining class
        followed by the C{suppression} module.
        """
        # NOTE(review): nothing visible in this excerpt binds the name
        # ``suppression``; confirm that it is imported before it is used here.
        method = suppression.TestSuppression2.testSuppressModule
        containers = util.getPythonContainers(method)
        wanted = [suppression.TestSuppression2, suppression]
        # zip() stops at the shorter sequence, so only positions present in
        # both lists are compared.
        for actual, expectedContainer in zip(containers, wanted):
            self.failUnlessEqual(actual, expectedContainer)
48
class TestFindObject(packages.SysPathManglingTest):
    """
    Tests for L{util.findObject}, which is given a dotted name and answers
    with a two-tuple whose first element is a found/not-found flag.
    """

    def test_importPackage(self):
        """
        Looking up C{'package'} gives C{True} paired with the package itself.
        """
        found = util.findObject('package')
        import package as importedPackage
        self.failUnlessEqual(found, (True, importedPackage))

    def test_importModule(self):
        """
        Looking up C{'goodpackage.test_sample'} gives C{True} paired with
        that module.
        """
        found = util.findObject('goodpackage.test_sample')
        from goodpackage import test_sample
        wanted = (True, test_sample)
        self.failUnlessEqual(wanted, found)

    def test_importError(self):
        """
        Looking up C{'package.test_bad_module'} raises
        C{ZeroDivisionError} rather than reporting "not found".
        """
        self.failUnlessRaises(
            ZeroDivisionError, util.findObject, 'package.test_bad_module')

    def test_sophisticatedImportError(self):
        """
        Looking up C{'package2.test_module'} raises C{ImportError} rather
        than reporting "not found".
        """
        self.failUnlessRaises(
            ImportError, util.findObject, 'package2.test_module')

    def test_importNonexistentPackage(self):
        """
        A top-level name that does not exist is reported with a C{False}
        flag.
        """
        flag = util.findObject('doesntexist')[0]
        self.failUnlessEqual(flag, False)

    def test_findNonexistentModule(self):
        """
        A missing module inside an existing package is reported with a
        C{False} flag.
        """
        flag = util.findObject('package.doesntexist')[0]
        self.failUnlessEqual(flag, False)

    def test_findNonexistentObject(self):
        """
        A missing attribute, whether on a module or on a class, is reported
        with a C{False} flag.
        """
        missingNames = (
            'goodpackage.test_sample.doesnt',
            'goodpackage.test_sample.AlphabetTest.doesntexist',
        )
        for dottedName in missingNames:
            self.failUnlessEqual(util.findObject(dottedName)[0], False)

    def test_findObjectExist(self):
        """
        Looking up an existing class gives C{True} paired with that class.
        """
        found = util.findObject('goodpackage.test_sample.AlphabetTest')
        from goodpackage import test_sample
        wanted = (True, test_sample.AlphabetTest)
        self.failUnlessEqual(found, wanted)
86
class TestRunSequentially(TestCase):
88
Sometimes it is useful to be able to run an arbitrary list of callables,
91
When some of those callables can return Deferreds, things become complex.
94
def test_emptyList(self):
    """
    When asked to run an empty list of callables, runSequentially returns a
    successful Deferred that fires an empty list.
    """
    d = util._runSequentially([])
    d.addCallback(self.assertEqual, [])
    # Hand the Deferred back to trial so the runner waits for it and reports
    # an assertion failure raised inside the callback against this test.
    return d
104
def test_singleSynchronousSuccess(self):
    """
    When given a callable that succeeds without returning a Deferred,
    include the return value in the results list, tagged with a SUCCESS
    flag.
    """
    d = util._runSequentially([lambda: None])
    d.addCallback(self.assertEqual, [(defer.SUCCESS, None)])
    # Hand the Deferred back to trial so the runner waits for it and reports
    # an assertion failure raised inside the callback against this test.
    return d
115
def test_singleSynchronousFailure(self):
    """
    When given a callable that raises an exception, include a Failure for
    that exception in the results list, tagged with a FAILURE flag.
    """
    d = util._runSequentially([lambda: self.fail('foo')])

    def check(results):
        # Exactly one (flag, result) pair is expected; the unpacking itself
        # fails if the list has any other shape.
        [(flag, fail)] = results
        fail.trap(self.failureException)
        self.assertEqual(fail.getErrorMessage(), 'foo')
        self.assertEqual(flag, defer.FAILURE)

    return d.addCallback(check)
129
def test_singleAsynchronousSuccess(self):
    """
    When given a callable that returns a successful Deferred, include the
    result of the Deferred in the results list, tagged with a SUCCESS flag.
    """
    d = util._runSequentially([lambda: defer.succeed(None)])
    d.addCallback(self.assertEqual, [(defer.SUCCESS, None)])
    # Hand the Deferred back to trial so the runner waits for it and reports
    # an assertion failure raised inside the callback against this test.
    return d
139
def test_singleAsynchronousFailure(self):
    """
    When given a callable that returns a failing Deferred, include the
    failure in the results list, tagged with a FAILURE flag.
    """
    d = util._runSequentially([lambda: defer.fail(ValueError('foo'))])

    def check(results):
        # Exactly one (flag, result) pair is expected; the unpacking itself
        # fails if the list has any other shape.
        [(flag, fail)] = results
        fail.trap(ValueError)
        self.assertEqual(fail.getErrorMessage(), 'foo')
        self.assertEqual(flag, defer.FAILURE)

    return d.addCallback(check)
153
def test_callablesCalledInOrder(self):
155
Check that the callables are called in the given order, one after the
167
d = util._runSequentially([lambda: append('foo'),
168
lambda: append('bar')])
170
# runSequentially should wait until the Deferred has fired before
171
# running the second callable.
172
self.assertEqual(log, ['foo'])
173
deferreds[-1].callback(None)
174
self.assertEqual(log, ['foo', 'bar'])
176
# Because returning created Deferreds makes jml happy.
177
deferreds[-1].callback(None)
181
def test_continuesAfterError(self):
    """
    If one of the callables raises an error, then runSequentially continues
    to run the remaining callables.
    """
    d = util._runSequentially([lambda: self.fail('foo'), lambda: 'bar'])

    def check(results):
        # Two pairs are expected, in call order: the failure from the first
        # callable, then the plain return value of the second.
        [(flag1, fail), (flag2, result)] = results
        fail.trap(self.failureException)
        self.assertEqual(flag1, defer.FAILURE)
        self.assertEqual(fail.getErrorMessage(), 'foo')
        self.assertEqual(flag2, defer.SUCCESS)
        self.assertEqual(result, 'bar')

    return d.addCallback(check)
197
def test_stopOnFirstError(self):
    """
    If the C{stopOnFirstError} option is passed to C{runSequentially}, then
    no further callables are called after the first exception is raised.
    """
    d = util._runSequentially([lambda: self.fail('foo'), lambda: 'bar'],
                              stopOnFirstError=True)

    def check(results):
        # Only the first callable's outcome may be present; a second entry
        # would make this unpacking fail.
        [(flag1, fail)] = results
        fail.trap(self.failureException)
        self.assertEqual(flag1, defer.FAILURE)
        self.assertEqual(fail.getErrorMessage(), 'foo')

    return d.addCallback(check)
212
def test_stripFlags(self):
214
If the C{stripFlags} option is passed to C{runSequentially} then the
215
SUCCESS / FAILURE flags are stripped from the output. Instead, the
216
Deferred fires a flat list of results containing only the results and
219
d = util._runSequentially([lambda: self.fail('foo'), lambda: 'bar'],
222
[fail, result] = results
223
fail.trap(self.failureException)
224
self.assertEqual(fail.getErrorMessage(), 'foo')
225
self.assertEqual(result, 'bar')
226
return d.addCallback(check)
227
test_stripFlags.todo = "YAGNI"
231
class DirtyReactorAggregateErrorTest(TestCase):
233
Tests for the L{DirtyReactorAggregateError}.
236
def test_formatDelayedCall(self):
238
Delayed calls are formatted nicely.
240
error = DirtyReactorAggregateError(["Foo", "bar"])
241
self.assertEquals(str(error),
244
DelayedCalls: (set twisted.internet.base.DelayedCall.debug = True to debug)
249
def test_formatSelectables(self):
251
Selectables are formatted nicely.
253
error = DirtyReactorAggregateError([], ["selectable 1", "selectable 2"])
254
self.assertEquals(str(error),
262
def test_formatDelayedCallsAndSelectables(self):
264
Both delayed calls and selectables can appear in the same error.
266
error = DirtyReactorAggregateError(["bleck", "Boozo"],
268
self.assertEquals(str(error),
271
DelayedCalls: (set twisted.internet.base.DelayedCall.debug = True to debug)
280
class StubReactor(object):
    """
    A reactor stub which contains enough functionality to be used with the
    L{_Janitor}.

    @ivar iterations: A list of the arguments passed to L{iterate}.
    @ivar removeAllCalled: Number of times that L{removeAll} was called.
    @ivar selectables: The value that will be returned from L{removeAll}.
    @ivar delayedCalls: The value to return from L{getDelayedCalls}.
    """

    def __init__(self, delayedCalls, selectables=None):
        """
        @param delayedCalls: See L{StubReactor.delayedCalls}.
        @param selectables: See L{StubReactor.selectables}.  C{None} (the
            default) is stored as a new empty list.
        """
        self.delayedCalls = delayedCalls
        # iterate() appends to this list, so it has to exist from the start.
        self.iterations = []
        self.removeAllCalled = 0
        # Normalise the default so removeAll() always hands back a list.
        if selectables is None:
            selectables = []
        self.selectables = selectables

    def iterate(self, timeout=None):
        """
        Record C{timeout} by appending it to C{self.iterations}.
        """
        self.iterations.append(timeout)

    def getDelayedCalls(self):
        """
        Return C{self.delayedCalls}.
        """
        return self.delayedCalls

    def removeAll(self):
        """
        Increment C{self.removeAllCalled} and return C{self.selectables}.
        """
        self.removeAllCalled += 1
        return self.selectables
327
class StubErrorReporter(object):
    """
    A subset of L{twisted.trial.itrial.IReporter} which records L{addError}
    calls.

    @ivar errors: List of two-tuples of (test, error) which were passed to
        L{addError}.
    """

    def __init__(self):
        # addError() appends to this list, so it has to exist from the start.
        self.errors = []

    def addError(self, test, error):
        """
        Record parameters in C{self.errors}.

        @param test: Stored as the first element of the recorded tuple.
        @param error: Stored as the second element of the recorded tuple.
        """
        self.errors.append((test, error))
348
class JanitorTests(TestCase):
350
Tests for L{_Janitor}!
353
def test_cleanPendingSpinsReactor(self):
355
During pending-call cleanup, the reactor will be spun twice with an
356
instant timeout. This is not a requirement, it is only a test for
357
current behavior. Hopefully Trial will eventually not do this kind of
360
reactor = StubReactor([])
361
jan = _Janitor(None, None, reactor=reactor)
363
self.assertEquals(reactor.iterations, [0, 0])
366
def test_cleanPendingCancelsCalls(self):
368
During pending-call cleanup, the janitor cancels pending timed calls.
373
delayedCall = DelayedCall(300, func, (), {},
374
cancelled.append, lambda x: None)
375
reactor = StubReactor([delayedCall])
376
jan = _Janitor(None, None, reactor=reactor)
378
self.assertEquals(cancelled, [delayedCall])
381
def test_cleanPendingReturnsDelayedCallStrings(self):
383
The Janitor produces string representations of delayed calls from the
384
delayed call cleanup method. It gets the string representations
385
*before* cancelling the calls; this is important because cancelling the
386
call removes critical debugging information from the string
389
delayedCall = DelayedCall(300, lambda: None, (), {},
390
lambda x: None, lambda x: None,
392
delayedCallString = str(delayedCall)
393
reactor = StubReactor([delayedCall])
394
jan = _Janitor(None, None, reactor=reactor)
395
strings = jan._cleanPending()
396
self.assertEquals(strings, [delayedCallString])
399
def test_cleanReactorRemovesSelectables(self):
401
The Janitor will remove selectables during reactor cleanup.
403
reactor = StubReactor([])
404
jan = _Janitor(None, None, reactor=reactor)
406
self.assertEquals(reactor.removeAllCalled, 1)
409
def test_cleanReactorKillsProcesses(self):
411
The Janitor will kill processes during reactor cleanup.
413
class StubProcessTransport(object):
415
A stub L{IProcessTransport} provider which records signals.
416
@ivar signals: The signals passed to L{signalProcess}.
418
implements(IProcessTransport)
423
def signalProcess(self, signal):
425
Append C{signal} to C{self.signals}.
427
self.signals.append(signal)
429
pt = StubProcessTransport()
430
reactor = StubReactor([], [pt])
431
jan = _Janitor(None, None, reactor=reactor)
433
self.assertEquals(pt.signals, ["KILL"])
436
def test_cleanReactorReturnsSelectableStrings(self):
438
The Janitor returns string representations of the selectables that it
439
cleaned up from the reactor cleanup method.
441
class Selectable(object):
443
A stub Selectable which only has an interesting string
447
return "(SELECTABLE!)"
449
reactor = StubReactor([], [Selectable()])
450
jan = _Janitor(None, None, reactor=reactor)
451
self.assertEquals(jan._cleanReactor(), ["(SELECTABLE!)"])
454
def test_postCaseCleanupNoErrors(self):
456
The post-case cleanup method will return True and not call C{addError}
457
on the result if there are no pending calls.
459
reactor = StubReactor([])
461
reporter = StubErrorReporter()
462
jan = _Janitor(test, reporter, reactor=reactor)
463
self.assertTrue(jan.postCaseCleanup())
464
self.assertEquals(reporter.errors, [])
467
def test_postCaseCleanupWithErrors(self):
469
The post-case cleanup method will return False and call C{addError} on
470
the result with a L{DirtyReactorAggregateError} Failure if there are
473
delayedCall = DelayedCall(300, lambda: None, (), {},
474
lambda x: None, lambda x: None,
476
delayedCallString = str(delayedCall)
477
reactor = StubReactor([delayedCall], [])
479
reporter = StubErrorReporter()
480
jan = _Janitor(test, reporter, reactor=reactor)
481
self.assertFalse(jan.postCaseCleanup())
482
self.assertEquals(len(reporter.errors), 1)
483
self.assertEquals(reporter.errors[0][1].value.delayedCalls,
487
def test_postClassCleanupNoErrors(self):
489
The post-class cleanup method will not call C{addError} on the result
490
if there are no pending calls or selectables.
492
reactor = StubReactor([])
494
reporter = StubErrorReporter()
495
jan = _Janitor(test, reporter, reactor=reactor)
496
jan.postClassCleanup()
497
self.assertEquals(reporter.errors, [])
500
def test_postClassCleanupWithPendingCallErrors(self):
502
The post-class cleanup method call C{addError} on the result with a
503
L{DirtyReactorAggregateError} Failure if there are pending calls.
505
delayedCall = DelayedCall(300, lambda: None, (), {},
506
lambda x: None, lambda x: None,
508
delayedCallString = str(delayedCall)
509
reactor = StubReactor([delayedCall], [])
511
reporter = StubErrorReporter()
512
jan = _Janitor(test, reporter, reactor=reactor)
513
jan.postClassCleanup()
514
self.assertEquals(len(reporter.errors), 1)
515
self.assertEquals(reporter.errors[0][1].value.delayedCalls,
519
def test_postClassCleanupWithSelectableErrors(self):
521
The post-class cleanup method call C{addError} on the result with a
522
L{DirtyReactorAggregateError} Failure if there are selectables.
524
selectable = "SELECTABLE HERE"
525
reactor = StubReactor([], [selectable])
527
reporter = StubErrorReporter()
528
jan = _Janitor(test, reporter, reactor=reactor)
529
jan.postClassCleanup()
530
self.assertEquals(len(reporter.errors), 1)
531
self.assertEquals(reporter.errors[0][1].value.selectables,