~plane1/maus/devel_624

« back to all changes in this revision

Viewing changes to third_party/nose-0.11.3/lib/python/nose/plugins/multiprocess.py

  • Committer: tunnell
  • Date: 2010-09-30 13:56:05 UTC
  • Revision ID: tunnell@itchy-20100930135605-wxbkfgy75p0sndk3
add third party

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
"""
 
2
Overview
 
3
========
 
4
 
 
5
.. warning ::
 
6
 
 
7
   The multiprocess plugin is not available on Windows.
 
8
 
 
9
The multiprocess plugin enables you to distribute your test run among a set of
 
10
worker processes that run tests in parallel. This can speed up CPU-bound test
 
11
runs (as long as the number of work processeses is around the number of
 
12
processors or cores available), but is mainly useful for IO-bound tests that
 
13
spend most of their time waiting for data to arrive from someplace else.
 
14
 
 
15
.. note ::
 
16
 
 
17
   See :doc:`../doc_tests/test_multiprocess/multiprocess` for additional
 
18
   documentation and examples. Use of this plugin requires the
 
19
   multiprocessing_ module, also available from PyPI.
 
20
 
 
21
.. _multiprocessing : http://code.google.com/p/python-multiprocessing/
 
22
 
 
23
How tests are distributed
 
24
=========================
 
25
 
 
26
The ideal case would be to dispatch each test to a worker process
 
27
separately. This ideal is not attainable in all cases, however, because many
 
28
test suites depend on context (class, module or package) fixtures.
 
29
 
 
30
The plugin can't know (unless you tell it -- see below!) if a context fixture
 
31
can be called many times concurrently (is re-entrant), or if it can be shared
 
32
among tests running in different processes. Therefore, if a context has
 
33
fixtures, the default behavior is to dispatch the entire suite to a worker as
 
34
a unit.
 
35
 
 
36
Controlling distribution
 
37
^^^^^^^^^^^^^^^^^^^^^^^^
 
38
 
 
39
There are two context-level variables that you can use to control this default
 
40
behavior.
 
41
 
 
42
If a context's fixtures are re-entrant, set ``_multiprocess_can_split_ = True``
 
43
in the context, and the plugin will dispatch tests in suites bound to that
 
44
context as if the context had no fixtures. This means that the fixtures will
 
45
execute concurrently and multiple times, typically once per test.
 
46
 
 
47
If a context's fixtures can be shared by tests running in different processes
 
48
-- such as a package-level fixture that starts an external http server or
 
49
initializes a shared database -- then set ``_multiprocess_shared_ = True`` in
 
50
the context. These fixtures will then execute in the primary nose process, and
 
51
tests in those contexts will be individually dispatched to run in parallel.
 
52
 
 
53
How results are collected and reported
 
54
======================================
 
55
 
 
56
As each test or suite executes in a worker process, results (failures, errors,
 
57
and specially handled exceptions like SkipTest) are collected in that
 
58
process. When the worker process finishes, it returns results to the main
 
59
nose process. There, any progress output is printed (dots!), and the
 
60
results from the test run are combined into a consolidated result
 
61
set. When results have been received for all dispatched tests, or all
 
62
workers have died, the result summary is output as normal.
 
63
 
 
64
Beware!
 
65
=======
 
66
 
 
67
Not all test suites will benefit from, or even operate correctly using, this
 
68
plugin. For example, CPU-bound tests will run more slowly if you don't have
 
69
multiple processors. There are also some differences in plugin
 
70
interactions and behaviors due to the way in which tests are dispatched and
 
71
loaded. In general, test loading under this plugin operates as if it were
 
72
always in directed mode instead of discovered mode. For instance, doctests
 
73
in test modules will always be found when using this plugin with the doctest
 
74
plugin.
 
75
 
 
76
But the biggest issue you will face is probably concurrency. Unless you
 
77
have kept your tests as religiously pure unit tests, with no side-effects, no
 
78
ordering issues, and no external dependencies, chances are you will experience
 
79
odd, intermittent and unexplainable failures and errors when using this
 
80
plugin. This doesn't necessarily mean the plugin is broken; it may mean that
 
81
your test suite is not safe for concurrency.
 
82
 
 
83
"""
 
84
import logging
 
85
import os
 
86
import sys
 
87
import time
 
88
import traceback
 
89
import unittest
 
90
import pickle
 
91
import nose.case
 
92
from nose.core import TextTestRunner
 
93
from nose import failure
 
94
from nose import loader
 
95
from nose.plugins.base import Plugin
 
96
from nose.result import TextTestResult
 
97
from nose.suite import ContextSuite
 
98
from nose.util import test_address
 
99
try:
 
100
    # 2.7+
 
101
    from unittest.runner import _WritelnDecorator
 
102
except ImportError:
 
103
    from unittest import _WritelnDecorator
 
104
from Queue import Empty
 
105
from warnings import warn
 
106
try:
 
107
    from cStringIO import StringIO
 
108
except ImportError:
 
109
    import StringIO
 
110
 
 
111
log = logging.getLogger(__name__)
 
112
 
 
113
Process = Queue = Pool = Event = None
 
114
 
 
115
def _import_mp():
 
116
    global Process, Queue, Pool, Event
 
117
    try:
 
118
        from multiprocessing import Process as Process_, \
 
119
            Queue as Queue_, Pool as Pool_, Event as Event_
 
120
        Process, Queue, Pool, Event = Process_, Queue_, Pool_, Event_
 
121
    except ImportError:
 
122
        warn("multiprocessing module is not available, multiprocess plugin "
 
123
             "cannot be used", RuntimeWarning)
 
124
 
 
125
 
 
126
class TestLet:
 
127
    def __init__(self, case):
 
128
        try:
 
129
            self._id = case.id()
 
130
        except AttributeError:
 
131
            pass
 
132
        self._short_description = case.shortDescription()
 
133
        self._str = str(case)
 
134
 
 
135
    def id(self):
 
136
        return self._id
 
137
 
 
138
    def shortDescription(self):
 
139
        return self._short_description
 
140
 
 
141
    def __str__(self):
 
142
        return self._str
 
143
 
 
144
 
 
145
class MultiProcess(Plugin):
 
146
    """
 
147
    Run tests in multiple processes. Requires processing module.
 
148
    """
 
149
    score = 1000
 
150
    status = {}
 
151
 
 
152
    def options(self, parser, env):
 
153
        """
 
154
        Register command-line options.
 
155
        """
 
156
        parser.add_option("--processes", action="store",
 
157
                          default=env.get('NOSE_PROCESSES', 0),
 
158
                          dest="multiprocess_workers",
 
159
                          metavar="NUM",
 
160
                          help="Spread test run among this many processes. "
 
161
                          "Set a number equal to the number of processors "
 
162
                          "or cores in your machine for best results. "
 
163
                          "[NOSE_PROCESSES]")
 
164
        parser.add_option("--process-timeout", action="store",
 
165
                          default=env.get('NOSE_PROCESS_TIMEOUT', 10),
 
166
                          dest="multiprocess_timeout",
 
167
                          metavar="SECONDS",
 
168
                          help="Set timeout for return of results from each "
 
169
                          "test runner process. [NOSE_PROCESS_TIMEOUT]")
 
170
 
 
171
    def configure(self, options, config):
 
172
        """
 
173
        Configure plugin.
 
174
        """
 
175
        try:
 
176
            self.status.pop('active')
 
177
        except KeyError:
 
178
            pass
 
179
        if not hasattr(options, 'multiprocess_workers'):
 
180
            self.enabled = False
 
181
            return
 
182
        # don't start inside of a worker process
 
183
        if config.worker:
 
184
            return
 
185
        self.config = config
 
186
        try:
 
187
            workers = int(options.multiprocess_workers)
 
188
        except (TypeError, ValueError):
 
189
            workers = 0
 
190
        if workers:
 
191
            _import_mp()
 
192
            if Process is None:
 
193
                self.enabled = False
 
194
                return
 
195
            self.enabled = True
 
196
            self.config.multiprocess_workers = workers
 
197
            self.config.multiprocess_timeout = int(options.multiprocess_timeout)
 
198
            self.status['active'] = True
 
199
 
 
200
    def prepareTestLoader(self, loader):
 
201
        """Remember loader class so MultiProcessTestRunner can instantiate
 
202
        the right loader.
 
203
        """
 
204
        self.loaderClass = loader.__class__
 
205
 
 
206
    def prepareTestRunner(self, runner):
 
207
        """Replace test runner with MultiProcessTestRunner.
 
208
        """
 
209
        # replace with our runner class
 
210
        return MultiProcessTestRunner(stream=runner.stream,
 
211
                                      verbosity=self.config.verbosity,
 
212
                                      config=self.config,
 
213
                                      loaderClass=self.loaderClass)
 
214
 
 
215
 
 
216
class MultiProcessTestRunner(TextTestRunner):
 
217
 
 
218
    def __init__(self, **kw):
 
219
        self.loaderClass = kw.pop('loaderClass', loader.defaultTestLoader)
 
220
        super(MultiProcessTestRunner, self).__init__(**kw)
 
221
 
 
222
    def run(self, test):
 
223
        """
 
224
        Execute the test (which may be a test suite). If the test is a suite,
 
225
        distribute it out among as many processes as have been configured, at
 
226
        as fine a level as is possible given the context fixtures defined in the
 
227
        suite or any sub-suites.
 
228
 
 
229
        """
 
230
        log.debug("%s.run(%s) (%s)", self, test, os.getpid())
 
231
        wrapper = self.config.plugins.prepareTest(test)
 
232
        if wrapper is not None:
 
233
            test = wrapper
 
234
 
 
235
        # plugins can decorate or capture the output stream
 
236
        wrapped = self.config.plugins.setOutputStream(self.stream)
 
237
        if wrapped is not None:
 
238
            self.stream = wrapped
 
239
 
 
240
        testQueue = Queue()
 
241
        resultQueue = Queue()
 
242
        tasks = {}
 
243
        completed = {}
 
244
        workers = []
 
245
        to_teardown = []
 
246
        shouldStop = Event()
 
247
 
 
248
        result = self._makeResult()
 
249
        start = time.time()
 
250
 
 
251
        # dispatch and collect results
 
252
        # put indexes only on queue because tests aren't picklable
 
253
        for case in self.nextBatch(test):
 
254
            log.debug("Next batch %s (%s)", case, type(case))
 
255
            if (isinstance(case, nose.case.Test) and
 
256
                isinstance(case.test, failure.Failure)):
 
257
                log.debug("Case is a Failure")
 
258
                case(result) # run here to capture the failure
 
259
                continue
 
260
            # handle shared fixtures
 
261
            if isinstance(case, ContextSuite) and self.sharedFixtures(case):
 
262
                log.debug("%s has shared fixtures", case)
 
263
                try:
 
264
                    case.setUp()
 
265
                except (KeyboardInterrupt, SystemExit):
 
266
                    raise
 
267
                except:
 
268
                    log.debug("%s setup failed", sys.exc_info())
 
269
                    result.addError(case, sys.exc_info())
 
270
                else:
 
271
                    to_teardown.append(case)
 
272
                    for _t in case:
 
273
                        test_addr = self.address(_t)
 
274
                        testQueue.put(test_addr, block=False)
 
275
                        tasks[test_addr] = None
 
276
                        log.debug("Queued shared-fixture test %s (%s) to %s",
 
277
                                  len(tasks), test_addr, testQueue)
 
278
 
 
279
            else:
 
280
                test_addr = self.address(case)
 
281
                testQueue.put(test_addr, block=False)
 
282
                tasks[test_addr] = None
 
283
                log.debug("Queued test %s (%s) to %s",
 
284
                          len(tasks), test_addr, testQueue)
 
285
 
 
286
        log.debug("Starting %s workers", self.config.multiprocess_workers)
 
287
        for i in range(self.config.multiprocess_workers):
 
288
            p = Process(target=runner, args=(i,
 
289
                                             testQueue,
 
290
                                             resultQueue,
 
291
                                             shouldStop,
 
292
                                             self.loaderClass,
 
293
                                             result.__class__,
 
294
                                             pickle.dumps(self.config)))
 
295
            # p.setDaemon(True)
 
296
            p.start()
 
297
            workers.append(p)
 
298
            log.debug("Started worker process %s", i+1)
 
299
 
 
300
        num_tasks = len(tasks)
 
301
        while tasks:
 
302
            log.debug("Waiting for results (%s/%s tasks)",
 
303
                      len(completed), num_tasks)
 
304
            try:
 
305
                addr, batch_result = resultQueue.get(
 
306
                    timeout=self.config.multiprocess_timeout)
 
307
                log.debug('Results received for %s', addr)
 
308
                try:
 
309
                    tasks.pop(addr)
 
310
                except KeyError:
 
311
                    log.debug("Got result for unknown task? %s", addr)
 
312
                else:
 
313
                    completed[addr] = batch_result
 
314
                self.consolidate(result, batch_result)
 
315
                if (self.config.stopOnError
 
316
                    and not result.wasSuccessful()):
 
317
                    # set the stop condition
 
318
                    shouldStop.set()
 
319
                    break
 
320
            except Empty:
 
321
                log.debug("Timed out with %s tasks pending", len(tasks))
 
322
                any_alive = False
 
323
                for w in workers:
 
324
                    if w.is_alive():
 
325
                        any_alive = True
 
326
                        break
 
327
                if not any_alive:
 
328
                    log.debug("All workers dead")
 
329
                    break
 
330
        log.debug("Completed %s/%s tasks (%s remain)",
 
331
                  len(completed), num_tasks, len(tasks))
 
332
 
 
333
        for case in to_teardown:
 
334
            log.debug("Tearing down shared fixtures for %s", case)
 
335
            try:
 
336
                case.tearDown()
 
337
            except (KeyboardInterrupt, SystemExit):
 
338
                raise
 
339
            except:
 
340
                result.addError(case, sys.exc_info())
 
341
 
 
342
        stop = time.time()
 
343
 
 
344
        result.printErrors()
 
345
        result.printSummary(start, stop)
 
346
        self.config.plugins.finalize(result)
 
347
 
 
348
        # Tell all workers to stop
 
349
        for w in workers:
 
350
            if w.is_alive():
 
351
                testQueue.put('STOP', block=False)
 
352
 
 
353
        return result
 
354
 
 
355
    def address(self, case):
 
356
        if hasattr(case, 'address'):
 
357
            file, mod, call = case.address()
 
358
        elif hasattr(case, 'context'):
 
359
            file, mod, call = test_address(case.context)
 
360
        else:
 
361
            raise Exception("Unable to convert %s to address" % case)
 
362
        parts = []
 
363
        if file is None:
 
364
            if mod is None:
 
365
                raise Exception("Unaddressable case %s" % case)
 
366
            else:
 
367
                parts.append(mod)
 
368
        else:
 
369
            # strip __init__.py(c) from end of file part
 
370
            # if present, having it there confuses loader
 
371
            dirname, basename = os.path.split(file)
 
372
            if basename.startswith('__init__'):
 
373
                file = dirname
 
374
            parts.append(file)
 
375
        if call is not None:
 
376
            parts.append(call)
 
377
        return ':'.join(map(str, parts))
 
378
 
 
379
    def nextBatch(self, test):
 
380
        # allows tests or suites to mark themselves as not safe
 
381
        # for multiprocess execution
 
382
        if hasattr(test, 'context'):
 
383
            if not getattr(test.context, '_multiprocess_', True):
 
384
                return
 
385
 
 
386
        if ((isinstance(test, ContextSuite)
 
387
             and test.hasFixtures(self.checkCanSplit))
 
388
            or not getattr(test, 'can_split', True)
 
389
            or not isinstance(test, unittest.TestSuite)):
 
390
            # regular test case, or a suite with context fixtures
 
391
 
 
392
            # special case: when run like nosetests path/to/module.py
 
393
            # the top-level suite has only one item, and it shares
 
394
            # the same context as that item. In that case, we want the
 
395
            # item, not the top-level suite
 
396
            if isinstance(test, ContextSuite):
 
397
                contained = list(test)
 
398
                if (len(contained) == 1
 
399
                    and getattr(contained[0], 'context', None) == test.context):
 
400
                    test = contained[0]
 
401
            yield test
 
402
        else:
 
403
            # Suite is without fixtures at this level; but it may have
 
404
            # fixtures at any deeper level, so we need to examine it all
 
405
            # the way down to the case level
 
406
            for case in test:
 
407
                for batch in self.nextBatch(case):
 
408
                    yield batch
 
409
 
 
410
    def checkCanSplit(self, context, fixt):
 
411
        """
 
412
        Callback that we use to check whether the fixtures found in a
 
413
        context or ancestor are ones we care about.
 
414
 
 
415
        Contexts can tell us that their fixtures are reentrant by setting
 
416
        _multiprocess_can_split_. So if we see that, we return False to
 
417
        disregard those fixtures.
 
418
        """
 
419
        if not fixt:
 
420
            return False
 
421
        if getattr(context, '_multiprocess_can_split_', False):
 
422
            return False
 
423
        return True
 
424
 
 
425
    def sharedFixtures(self, case):
 
426
        context = getattr(case, 'context', None)
 
427
        if not context:
 
428
            return False
 
429
        return getattr(context, '_multiprocess_shared_', False)
 
430
 
 
431
    def consolidate(self, result, batch_result):
 
432
        log.debug("batch result is %s" , batch_result)
 
433
        try:
 
434
            output, testsRun, failures, errors, errorClasses = batch_result
 
435
        except ValueError:
 
436
            log.debug("result in unexpected format %s", batch_result)
 
437
            failure.Failure(*sys.exc_info())(result)
 
438
            return
 
439
        self.stream.write(output)
 
440
        result.testsRun += testsRun
 
441
        result.failures.extend(failures)
 
442
        result.errors.extend(errors)
 
443
        for key, (storage, label, isfail) in errorClasses.items():
 
444
            if key not in result.errorClasses:
 
445
                # Ordinarily storage is result attribute
 
446
                # but it's only processed through the errorClasses
 
447
                # dict, so it's ok to fake it here
 
448
                result.errorClasses[key] = ([], label, isfail)
 
449
            mystorage, _junk, _junk = result.errorClasses[key]
 
450
            mystorage.extend(storage)
 
451
        log.debug("Ran %s tests (%s)", testsRun, result.testsRun)
 
452
 
 
453
 
 
454
def runner(ix, testQueue, resultQueue, shouldStop,
 
455
           loaderClass, resultClass, config):
 
456
    config = pickle.loads(config)
 
457
    config.plugins.begin()
 
458
    log.debug("Worker %s executing", ix)
 
459
    log.debug("Active plugins worker %s: %s", ix, config.plugins._plugins)
 
460
    loader = loaderClass(config=config)
 
461
    loader.suiteClass.suiteClass = NoSharedFixtureContextSuite
 
462
    
 
463
    def get():
 
464
        case = testQueue.get(timeout=config.multiprocess_timeout)
 
465
        return case
 
466
 
 
467
    def makeResult():
 
468
        stream = _WritelnDecorator(StringIO())
 
469
        result = resultClass(stream, descriptions=1,
 
470
                             verbosity=config.verbosity,
 
471
                             config=config)
 
472
        plug_result = config.plugins.prepareTestResult(result)
 
473
        if plug_result:
 
474
            return plug_result
 
475
        return result
 
476
 
 
477
    def batch(result):
 
478
        failures = [(TestLet(c), err) for c, err in result.failures]
 
479
        errors = [(TestLet(c), err) for c, err in result.errors]
 
480
        errorClasses = {}
 
481
        for key, (storage, label, isfail) in result.errorClasses.items():
 
482
            errorClasses[key] = ([(TestLet(c), err) for c, err in storage],
 
483
                                 label, isfail)
 
484
        return (
 
485
            result.stream.getvalue(),
 
486
            result.testsRun,
 
487
            failures,
 
488
            errors,
 
489
            errorClasses)
 
490
    try:
 
491
        try:
 
492
            for test_addr in iter(get, 'STOP'):
 
493
                if shouldStop.is_set():
 
494
                    break
 
495
                result = makeResult()
 
496
                test = loader.loadTestsFromNames([test_addr])
 
497
                log.debug("Worker %s Test is %s (%s)", ix, test_addr, test)
 
498
 
 
499
                try:
 
500
                    test(result)
 
501
                    resultQueue.put((test_addr, batch(result)))
 
502
                except KeyboardInterrupt, SystemExit:
 
503
                    raise
 
504
                except:
 
505
                    log.exception("Error running test or returning results")
 
506
                    failure.Failure(*sys.exc_info())(result)
 
507
                    resultQueue.put((test_addr, batch(result)))
 
508
        except Empty:
 
509
            log.debug("Worker %s timed out waiting for tasks", ix)
 
510
    finally:
 
511
        testQueue.close()
 
512
        resultQueue.close()
 
513
    log.debug("Worker %s ending", ix)
 
514
 
 
515
 
 
516
class NoSharedFixtureContextSuite(ContextSuite):
 
517
    """
 
518
    Context suite that never fires shared fixtures.
 
519
 
 
520
    When a context sets _multiprocess_shared_, fixtures in that context
 
521
    are executed by the main process. Using this suite class prevents them
 
522
    from executing in the runner process as well.
 
523
 
 
524
    """
 
525
 
 
526
    def setupContext(self, context):
 
527
        if getattr(context, '_multiprocess_shared_', False):
 
528
            return
 
529
        super(NoSharedFixtureContextSuite, self).setupContext(context)
 
530
 
 
531
    def teardownContext(self, context):
 
532
        if getattr(context, '_multiprocess_shared_', False):
 
533
            return
 
534
        super(NoSharedFixtureContextSuite, self).teardownContext(context)