1
# Copyright (c) 2010 Jonathan M. Lange. See LICENSE for details.
3
"""Evil reactor-spinning logic for running Twisted tests.
5
This code is highly experimental, liable to change and not to be trusted. If
6
you couldn't write this yourself, you should not be using it.
18
'trap_unhandled_errors',
23
from twisted.internet import defer
24
from twisted.internet.interfaces import IReactorThreads
25
from twisted.python.failure import Failure
26
from twisted.python.util import mergeFunctionMetadata
29
class ReentryError(Exception):
30
"""Raised when we try to re-enter a function that forbids it."""
32
def __init__(self, function):
33
Exception.__init__(self,
34
"%r in not re-entrant but was called within a call to itself."
38
def not_reentrant(function, _calls={}):
39
"""Decorates a function as not being re-entrant.
41
The decorated function will raise an error if called from within itself.
43
def decorated(*args, **kwargs):
44
if _calls.get(function, False):
45
raise ReentryError(function)
46
_calls[function] = True
48
return function(*args, **kwargs)
50
_calls[function] = False
51
return mergeFunctionMetadata(function, decorated)
54
class DeferredNotFired(Exception):
55
"""Raised when we extract a result from a Deferred that's not fired yet."""
58
def extract_result(deferred):
59
"""Extract the result from a fired deferred.
61
It can happen that you have an API that returns Deferreds for
62
compatibility with Twisted code, but is in fact synchronous, i.e. the
63
Deferreds it returns have always fired by the time it returns. In this
64
case, you can use this function to convert the result back into the usual
65
form for a synchronous API, i.e. the result itself or a raised exception.
67
It would be very bad form to use this as some way of checking if a
72
deferred.addCallbacks(successes.append, failures.append)
73
if len(failures) == 1:
74
failures[0].raiseException()
75
elif len(successes) == 1:
78
raise DeferredNotFired("%r has not fired yet." % (deferred,))
81
def trap_unhandled_errors(function, *args, **kwargs):
82
"""Run a function, trapping any unhandled errors in Deferreds.
84
Assumes that 'function' will have handled any errors in Deferreds by the
85
time it is complete. This is almost never true of any Twisted code, since
86
you can never tell when someone has added an errback to a Deferred.
88
If 'function' raises, then don't bother doing any unhandled error
89
jiggery-pokery, since something horrible has probably happened anyway.
91
:return: A tuple of '(result, error)', where 'result' is the value returned
92
by 'function' and 'error' is a list of `defer.DebugInfo` objects that
93
have unhandled errors in Deferreds.
95
real_DebugInfo = defer.DebugInfo
98
info = real_DebugInfo()
99
debug_infos.append(info)
101
defer.DebugInfo = DebugInfo
103
result = function(*args, **kwargs)
105
defer.DebugInfo = real_DebugInfo
107
for info in debug_infos:
108
if info.failResult is not None:
110
# Disable the destructor that logs to error. We are already
111
# catching the error here.
112
info.__del__ = lambda: None
113
return result, errors
116
class TimeoutError(Exception):
117
"""Raised when run_in_reactor takes too long to run a function."""
119
def __init__(self, function, timeout):
120
Exception.__init__(self,
121
"%r took longer than %s seconds" % (function, timeout))
124
class NoResultError(Exception):
125
"""Raised when the reactor has stopped but we don't have any result."""
128
Exception.__init__(self,
129
"Tried to get test's result from Deferred when no result is "
130
"available. Probably means we received SIGINT or similar.")
133
class StaleJunkError(Exception):
134
"""Raised when there's junk in the spinner from a previous run."""
136
def __init__(self, junk):
137
Exception.__init__(self,
138
"There was junk in the spinner from a previous run. "
139
"Use clear_junk() to clear it out: %r" % (junk,))
142
class Spinner(object):
143
"""Spin the reactor until a function is done.
145
This class emulates the behaviour of twisted.trial in that it grotesquely
146
and horribly spins the Twisted reactor while a function is running, and
147
then kills the reactor when that function is complete and all the
148
callbacks in its chains are done.
153
# Signals that we save and restore for each spin.
154
_PRESERVED_SIGNALS = [
160
def __init__(self, reactor):
161
self._reactor = reactor
162
self._timeout_call = None
163
self._success = self._UNSET
164
self._failure = self._UNSET
165
self._saved_signals = []
168
def _cancel_timeout(self):
169
if self._timeout_call:
170
self._timeout_call.cancel()
172
def _get_result(self):
173
if self._failure is not self._UNSET:
174
self._failure.raiseException()
175
if self._success is not self._UNSET:
177
raise NoResultError()
179
def _got_failure(self, result):
180
self._cancel_timeout()
181
self._failure = result
183
def _got_success(self, result):
184
self._cancel_timeout()
185
self._success = result
187
def _stop_reactor(self, ignored=None):
188
"""Stop the reactor!"""
189
self._reactor.crash()
191
def _timed_out(self, function, timeout):
192
e = TimeoutError(function, timeout)
193
self._failure = Failure(e)
197
"""Clean up any junk in the reactor."""
199
for delayed_call in self._reactor.getDelayedCalls():
200
delayed_call.cancel()
201
junk.append(delayed_call)
202
for selectable in self._reactor.removeAll():
203
# Twisted sends a 'KILL' signal to selectables that provide
204
# IProcessTransport. Since only _dumbwin32proc processes do this,
205
# we aren't going to bother.
206
junk.append(selectable)
207
if IReactorThreads.providedBy(self._reactor):
208
self._reactor.suggestThreadPoolSize(0)
209
if self._reactor.threadpool is not None:
210
self._reactor._stopThreadPool()
211
self._junk.extend(junk)
214
def clear_junk(self):
215
"""Clear out our recorded junk.
217
:return: Whatever junk was there before.
224
"""Return any junk that has been found on the reactor."""
227
def _save_signals(self):
228
available_signals = [
229
getattr(signal, name, None) for name in self._PRESERVED_SIGNALS]
230
self._saved_signals = [
231
(sig, signal.getsignal(sig)) for sig in available_signals if sig]
233
def _restore_signals(self):
234
for sig, hdlr in self._saved_signals:
235
signal.signal(sig, hdlr)
236
self._saved_signals = []
239
def run(self, timeout, function, *args, **kwargs):
240
"""Run 'function' in a reactor.
242
If 'function' returns a Deferred, the reactor will keep spinning until
243
the Deferred fires and its chain completes or until the timeout is
244
reached -- whichever comes first.
246
:raise TimeoutError: If 'timeout' is reached before the `Deferred`
247
returned by 'function' has completed its callback chain.
248
:raise NoResultError: If the reactor is somehow interrupted before
249
the `Deferred` returned by 'function' has completed its callback
251
:raise StaleJunkError: If there's junk in the spinner from a previous
253
:return: Whatever is at the end of the function's callback chain. If
254
it's an error, then raise that.
256
junk = self.get_junk()
258
raise StaleJunkError(junk)
260
self._timeout_call = self._reactor.callLater(
261
timeout, self._timed_out, function, timeout)
262
# Calling 'stop' on the reactor will make it impossible to re-start
263
# the reactor. Since the default signal handlers for TERM, BREAK and
264
# INT all call reactor.stop(), we'll patch it over with crash.
265
# XXX: It might be a better idea to either install custom signal
266
# handlers or to override the methods that are Twisted's signal
268
stop, self._reactor.stop = self._reactor.stop, self._reactor.crash
270
d = defer.maybeDeferred(function, *args, **kwargs)
271
d.addCallbacks(self._got_success, self._got_failure)
272
d.addBoth(self._stop_reactor)
274
self._reactor.callWhenRunning(run_function)
277
self._reactor.stop = stop
278
self._restore_signals()
280
return self._get_result()