The multiprocess plugin is not available on Windows.

The multiprocess plugin enables you to distribute your test run among a set of
worker processes that run tests in parallel. This can speed up CPU-bound test
runs (as long as the number of worker processes is around the number of
processors or cores available), but is mainly useful for IO-bound tests that
spend most of their time waiting for data to arrive from someplace else.

See :doc:`../doc_tests/test_multiprocess/multiprocess` for additional
documentation and examples. Use of this plugin requires the
multiprocessing_ module, also available from PyPI.

.. _multiprocessing: http://code.google.com/p/python-multiprocessing/
How tests are distributed
=========================

The ideal case would be to dispatch each test to a worker process
separately. This ideal is not attainable in all cases, however, because many
test suites depend on context (class, module or package) fixtures.

The plugin can't know (unless you tell it -- see below!) if a context fixture
can be called many times concurrently (is re-entrant), or if it can be shared
among tests running in different processes. Therefore, if a context has
fixtures, the default behavior is to dispatch the entire suite to a worker as
a unit.
Controlling distribution
^^^^^^^^^^^^^^^^^^^^^^^^

There are two context-level variables that you can use to control this default
behavior.
If a context's fixtures are re-entrant, set ``_multiprocess_can_split_ = True``
in the context, and the plugin will dispatch tests in suites bound to that
context as if the context had no fixtures. This means that the fixtures will
execute concurrently and multiple times, typically once per test.
If a context's fixtures can be shared by tests running in different processes
-- such as a package-level fixture that starts an external http server or
initializes a shared database -- then set ``_multiprocess_shared_ = True`` in
the context. These fixtures will then execute in the primary nose process, and
tests in those contexts will be individually dispatched to run in parallel.
How results are collected and reported
======================================

As each test or suite executes in a worker process, results (failures, errors,
and specially handled exceptions like SkipTest) are collected in that
process. When the worker process finishes, it returns results to the main
nose process. There, any progress output is printed (dots!), and the
results from the test run are combined into a consolidated result
set. When results have been received for all dispatched tests, or all
workers have died, the result summary is output as normal.
Not all test suites will benefit from, or even operate correctly using, this
plugin. For example, CPU-bound tests will run more slowly if you don't have
multiple processors. There are also some differences in plugin
interactions and behaviors due to the way in which tests are dispatched and
loaded. In general, test loading under this plugin operates as if it were
always in directed mode instead of discovered mode. For instance, doctests
in test modules will always be found when using this plugin with the doctest
plugin.
But the biggest issue you will face is probably concurrency. Unless you
have kept your tests as religiously pure unit tests, with no side-effects, no
ordering issues, and no external dependencies, chances are you will experience
odd, intermittent and unexplainable failures and errors when using this
plugin. This doesn't necessarily mean the plugin is broken; it may mean that
your test suite is not safe for concurrency.
from nose.core import TextTestRunner
from nose import failure
from nose import loader
from nose.plugins.base import Plugin
from nose.result import TextTestResult
from nose.suite import ContextSuite
from nose.util import test_address
try:
    from unittest.runner import _WritelnDecorator
except ImportError:
    from unittest import _WritelnDecorator
from Queue import Empty
from warnings import warn
try:
    from cStringIO import StringIO
except ImportError:
    from StringIO import StringIO
log = logging.getLogger(__name__)

Process = Queue = Pool = Event = None
116
global Process, Queue, Pool, Event
118
from multiprocessing import Process as Process_, \
119
Queue as Queue_, Pool as Pool_, Event as Event_
120
Process, Queue, Pool, Event = Process_, Queue_, Pool_, Event_
122
warn("multiprocessing module is not available, multiprocess plugin "
123
"cannot be used", RuntimeWarning)
class TestLet:
    def __init__(self, case):
        try:
            self._id = case.id()
        except AttributeError:
            pass
        self._short_description = case.shortDescription()
        self._str = str(case)

    def id(self):
        return self._id

    def shortDescription(self):
        return self._short_description

    def __str__(self):
        return self._str
class MultiProcess(Plugin):
    """
    Run tests in multiple processes. Requires the multiprocessing module.
    """
    def options(self, parser, env):
        """
        Register command-line options.
        """
        parser.add_option("--processes", action="store",
                          default=env.get('NOSE_PROCESSES', 0),
                          dest="multiprocess_workers",
                          help="Spread test run among this many processes. "
                          "Set a number equal to the number of processors "
                          "or cores in your machine for best results. "
                          "[NOSE_PROCESSES]")
        parser.add_option("--process-timeout", action="store",
                          default=env.get('NOSE_PROCESS_TIMEOUT', 10),
                          dest="multiprocess_timeout",
                          help="Set timeout for return of results from each "
                          "test runner process. [NOSE_PROCESS_TIMEOUT]")
    def configure(self, options, config):
        try:
            self.status.pop('active')
        except KeyError:
            pass
        if not hasattr(options, 'multiprocess_workers'):
            self.enabled = False
            return
        # don't start inside of a worker process
        if getattr(config, 'worker', False):
            return
        self.config = config
        try:
            workers = int(options.multiprocess_workers)
        except (TypeError, ValueError):
            workers = 0
        if workers:
            self.config.multiprocess_workers = workers
            self.config.multiprocess_timeout = int(options.multiprocess_timeout)
            self.status['active'] = True
    def prepareTestLoader(self, loader):
        """Remember loader class so MultiProcessTestRunner can instantiate
        the right loader.
        """
        self.loaderClass = loader.__class__

    def prepareTestRunner(self, runner):
        """Replace test runner with MultiProcessTestRunner.
        """
        # replace with our runner class
        return MultiProcessTestRunner(stream=runner.stream,
                                      verbosity=self.config.verbosity,
                                      config=self.config,
                                      loaderClass=self.loaderClass)
class MultiProcessTestRunner(TextTestRunner):

    def __init__(self, **kw):
        self.loaderClass = kw.pop('loaderClass', loader.defaultTestLoader)
        super(MultiProcessTestRunner, self).__init__(**kw)
    def run(self, test):
        """
        Execute the test (which may be a test suite). If the test is a suite,
        distribute it out among as many processes as have been configured, at
        as fine a level as is possible given the context fixtures defined in the
        suite or any sub-suites.
        """
        log.debug("%s.run(%s) (%s)", self, test, os.getpid())
        wrapper = self.config.plugins.prepareTest(test)
        if wrapper is not None:
            test = wrapper

        # plugins can decorate or capture the output stream
        wrapped = self.config.plugins.setOutputStream(self.stream)
        if wrapped is not None:
            self.stream = wrapped

        testQueue = Queue()
        resultQueue = Queue()
        tasks = {}
        completed = {}
        workers = []
        to_teardown = []
        shouldStop = Event()

        result = self._makeResult()
        start = time.time()
        # dispatch and collect results
        # put indexes only on queue because tests aren't picklable
        for case in self.nextBatch(test):
            log.debug("Next batch %s (%s)", case, type(case))
            if (isinstance(case, nose.case.Test) and
                isinstance(case.test, failure.Failure)):
                log.debug("Case is a Failure")
                case(result) # run here to capture the failure
                continue
            # handle shared fixtures
            if isinstance(case, ContextSuite) and self.sharedFixtures(case):
                log.debug("%s has shared fixtures", case)
                try:
                    case.setUp()
                except (KeyboardInterrupt, SystemExit):
                    raise
                except:
                    log.debug("%s setup failed", sys.exc_info())
                    result.addError(case, sys.exc_info())
                else:
                    to_teardown.append(case)
                    for _t in case:
                        test_addr = self.address(_t)
                        testQueue.put(test_addr, block=False)
                        tasks[test_addr] = None
                        log.debug("Queued shared-fixture test %s (%s) to %s",
                                  len(tasks), test_addr, testQueue)
            else:
                test_addr = self.address(case)
                testQueue.put(test_addr, block=False)
                tasks[test_addr] = None
                log.debug("Queued test %s (%s) to %s",
                          len(tasks), test_addr, testQueue)
        log.debug("Starting %s workers", self.config.multiprocess_workers)
        for i in range(self.config.multiprocess_workers):
            p = Process(target=runner, args=(i,
                                             testQueue,
                                             resultQueue,
                                             shouldStop,
                                             self.loaderClass,
                                             result.__class__,
                                             pickle.dumps(self.config)))
            p.start()
            workers.append(p)
            log.debug("Started worker process %s", i+1)
        num_tasks = len(tasks)
        while tasks:
            log.debug("Waiting for results (%s/%s tasks)",
                      len(completed), num_tasks)
            try:
                addr, batch_result = resultQueue.get(
                    timeout=self.config.multiprocess_timeout)
                log.debug('Results received for %s', addr)
                try:
                    tasks.pop(addr)
                except KeyError:
                    log.debug("Got result for unknown task? %s", addr)
                else:
                    completed[addr] = batch_result
                    self.consolidate(result, batch_result)
                    if (self.config.stopOnError
                        and not result.wasSuccessful()):
                        # set the stop condition
                        shouldStop.set()
                        break
            except Empty:
                log.debug("Timed out with %s tasks pending", len(tasks))
                any_alive = False
                for w in workers:
                    if w.is_alive():
                        any_alive = True
                        break
                if not any_alive:
                    log.debug("All workers dead")
                    break
        log.debug("Completed %s/%s tasks (%s remain)",
                  len(completed), num_tasks, len(tasks))
        for case in to_teardown:
            log.debug("Tearing down shared fixtures for %s", case)
            try:
                case.tearDown()
            except (KeyboardInterrupt, SystemExit):
                raise
            except:
                result.addError(case, sys.exc_info())

        stop = time.time()

        result.printErrors()
        result.printSummary(start, stop)
        self.config.plugins.finalize(result)

        # Tell all workers to stop
        for w in workers:
            if w.is_alive():
                testQueue.put('STOP', block=False)

        return result
    def address(self, case):
        if hasattr(case, 'address'):
            file, mod, call = case.address()
        elif hasattr(case, 'context'):
            file, mod, call = test_address(case.context)
        else:
            raise Exception("Unable to convert %s to address" % case)
        parts = []
        if file is None:
            if mod is None:
                raise Exception("Unaddressable case %s" % case)
            parts.append(mod)
        else:
            # strip __init__.py(c) from end of file part
            # if present, having it there confuses loader
            dirname, basename = os.path.split(file)
            if basename.startswith('__init__'):
                file = dirname
            parts.append(file)
        if call is not None:
            parts.append(call)
        return ':'.join(map(str, parts))
    def nextBatch(self, test):
        # allows tests or suites to mark themselves as not safe
        # for multiprocess execution
        if hasattr(test, 'context'):
            if not getattr(test.context, '_multiprocess_', True):
                return

        if ((isinstance(test, ContextSuite)
             and test.hasFixtures(self.checkCanSplit))
            or not getattr(test, 'can_split', True)
            or not isinstance(test, unittest.TestSuite)):
            # regular test case, or a suite with context fixtures

            # special case: when run like nosetests path/to/module.py
            # the top-level suite has only one item, and it shares
            # the same context as that item. In that case, we want the
            # item, not the top-level suite
            if isinstance(test, ContextSuite):
                contained = list(test)
                if (len(contained) == 1
                    and getattr(contained[0], 'context', None) == test.context):
                    test = contained[0]
            yield test
        else:
            # Suite is without fixtures at this level; but it may have
            # fixtures at any deeper level, so we need to examine it all
            # the way down to the case level
            for case in test:
                for batch in self.nextBatch(case):
                    yield batch
    def checkCanSplit(self, context, fixt):
        """
        Callback that we use to check whether the fixtures found in a
        context or ancestor are ones we care about.

        Contexts can tell us that their fixtures are reentrant by setting
        _multiprocess_can_split_. So if we see that, we return False to
        disregard those fixtures.
        """
        if not fixt:
            return False
        if getattr(context, '_multiprocess_can_split_', False):
            return False
        return True
    def sharedFixtures(self, case):
        context = getattr(case, 'context', None)
        if not context:
            return False
        return getattr(context, '_multiprocess_shared_', False)
    def consolidate(self, result, batch_result):
        log.debug("batch result is %s", batch_result)
        try:
            output, testsRun, failures, errors, errorClasses = batch_result
        except ValueError:
            log.debug("result in unexpected format %s", batch_result)
            failure.Failure(*sys.exc_info())(result)
            return
        self.stream.write(output)
        result.testsRun += testsRun
        result.failures.extend(failures)
        result.errors.extend(errors)
        for key, (storage, label, isfail) in errorClasses.items():
            if key not in result.errorClasses:
                # Ordinarily storage is result attribute
                # but it's only processed through the errorClasses
                # dict, so it's ok to fake it here
                result.errorClasses[key] = ([], label, isfail)
            mystorage, _junk, _junk = result.errorClasses[key]
            mystorage.extend(storage)
        log.debug("Ran %s tests (%s)", testsRun, result.testsRun)
def runner(ix, testQueue, resultQueue, shouldStop,
           loaderClass, resultClass, config):
    config = pickle.loads(config)
    config.plugins.begin()
    log.debug("Worker %s executing", ix)
    log.debug("Active plugins worker %s: %s", ix, config.plugins._plugins)
    loader = loaderClass(config=config)
    loader.suiteClass.suiteClass = NoSharedFixtureContextSuite

    def get():
        case = testQueue.get(timeout=config.multiprocess_timeout)
        return case

    def makeResult():
        stream = _WritelnDecorator(StringIO())
        result = resultClass(stream, descriptions=1,
                             verbosity=config.verbosity,
                             config=config)
        plug_result = config.plugins.prepareTestResult(result)
        if plug_result:
            return plug_result
        return result

    def batch(result):
        failures = [(TestLet(c), err) for c, err in result.failures]
        errors = [(TestLet(c), err) for c, err in result.errors]
        errorClasses = {}
        for key, (storage, label, isfail) in result.errorClasses.items():
            errorClasses[key] = ([(TestLet(c), err) for c, err in storage],
                                 label, isfail)
        return (
            result.stream.getvalue(),
            result.testsRun,
            failures,
            errors,
            errorClasses)

    try:
        for test_addr in iter(get, 'STOP'):
            if shouldStop.is_set():
                break
            result = makeResult()
            test = loader.loadTestsFromNames([test_addr])
            log.debug("Worker %s Test is %s (%s)", ix, test_addr, test)
            try:
                test(result)
                resultQueue.put((test_addr, batch(result)))
            except (KeyboardInterrupt, SystemExit):
                raise
            except:
                log.exception("Error running test or returning results")
                failure.Failure(*sys.exc_info())(result)
                resultQueue.put((test_addr, batch(result)))
    except Empty:
        log.debug("Worker %s timed out waiting for tasks", ix)
    log.debug("Worker %s ending", ix)
class NoSharedFixtureContextSuite(ContextSuite):
    """
    Context suite that never fires shared fixtures.

    When a context sets _multiprocess_shared_, fixtures in that context
    are executed by the main process. Using this suite class prevents them
    from executing in the runner process as well.
    """
    def setupContext(self, context):
        if getattr(context, '_multiprocess_shared_', False):
            return
        super(NoSharedFixtureContextSuite, self).setupContext(context)

    def teardownContext(self, context):
        if getattr(context, '_multiprocess_shared_', False):
            return
        super(NoSharedFixtureContextSuite, self).teardownContext(context)