~landscape/zope3/newer-from-ztk

« back to all changes in this revision

Viewing changes to src/zope/testing/testrunner/runner.py

  • Committer: Thomas Hervé
  • Date: 2009-09-21 16:46:07 UTC
  • Revision ID: thomas@canonical.com-20090921164607-sky3xhlt02ji80ka
Revert r8: regression with test failures

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
##############################################################################
2
 
#
3
 
# Copyright (c) 2004-2008 Zope Corporation and Contributors.
4
 
# All Rights Reserved.
5
 
#
6
 
# This software is subject to the provisions of the Zope Public License,
7
 
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
8
 
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
9
 
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
10
 
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
11
 
# FOR A PARTICULAR PURPOSE.
12
 
#
13
 
##############################################################################
14
 
"""Test execution
15
 
 
16
 
$Id: __init__.py 86232 2008-05-03 15:09:33Z ctheune $
17
 
"""
18
 
 
19
 
import subprocess
20
 
 
21
 
import cStringIO
22
 
import gc
23
 
import Queue
24
 
import re
25
 
import sys
26
 
import threading
27
 
import time
28
 
import traceback
29
 
import unittest
30
 
 
31
 
from zope.testing.testrunner.find import import_name
32
 
from zope.testing.testrunner.find import name_from_layer, _layer_name_cache
33
 
from zope.testing.testrunner.refcount import TrackRefs
34
 
from zope.testing.testrunner.options import get_options
35
 
import zope.testing.testrunner.coverage
36
 
import zope.testing.testrunner.doctest
37
 
import zope.testing.testrunner.logsupport
38
 
import zope.testing.testrunner.selftest
39
 
import zope.testing.testrunner.profiling
40
 
import zope.testing.testrunner.filter
41
 
import zope.testing.testrunner.garbagecollection
42
 
import zope.testing.testrunner.listing
43
 
import zope.testing.testrunner.statistics
44
 
import zope.testing.testrunner.process
45
 
import zope.testing.testrunner.interfaces
46
 
import zope.testing.testrunner.debug
47
 
import zope.testing.testrunner.tb_format
48
 
 
49
 
 
50
 
PYREFCOUNT_PATTERN = re.compile('\[[0-9]+ refs\]')
51
 
 
52
 
is_jython = sys.platform.startswith('java')
53
 
 
54
 
 
55
 
class SubprocessError(Exception):
56
 
    """An error occurred when running a subprocess
57
 
    """
58
 
 
59
 
    def __init__(self, reason, stderr):
60
 
        self.reason = reason
61
 
        self.stderr = stderr
62
 
 
63
 
    def __str__(self):
64
 
        return '%s: %s' % (self.reason, self.stderr)
65
 
 
66
 
 
67
 
class CanNotTearDown(Exception):
68
 
    "Couldn't tear down a test"
69
 
 
70
 
 
71
 
class Runner(object):
72
 
    """The test runner.
73
 
 
74
 
    It is the central point of this package and responsible for finding and
75
 
    executing tests as well as configuring itself from the (command-line)
76
 
    options passed into it.
77
 
 
78
 
    """
79
 
 
80
 
    def __init__(self, defaults=None, args=None, found_suites=None,
81
 
                 options=None, script_parts=None):
82
 
        self.defaults = defaults
83
 
        self.args = args
84
 
        self.found_suites = found_suites
85
 
        self.options = options
86
 
        self.script_parts = script_parts
87
 
        self.failed = True
88
 
 
89
 
        self.ran = 0
90
 
        self.failures = []
91
 
        self.errors = []
92
 
 
93
 
        self.show_report = True
94
 
        self.do_run_tests = True
95
 
 
96
 
        self.features = []
97
 
 
98
 
        self.tests_by_layer_name = {}
99
 
 
100
 
    def ordered_layers(self):
101
 
        layer_names = dict([(layer_from_name(layer_name), layer_name)
102
 
                            for layer_name in self.tests_by_layer_name])
103
 
        for layer in order_by_bases(layer_names):
104
 
            layer_name = layer_names[layer]
105
 
            yield layer_name, layer, self.tests_by_layer_name[layer_name]
106
 
 
107
 
    def register_tests(self, tests):
108
 
        """Registers tests."""
109
 
        # XXX To support multiple features that find tests this shouldn't be
110
 
        # an update but merge the various layers individually.
111
 
        self.tests_by_layer_name.update(tests)
112
 
 
113
 
    def run(self):
114
 
        self.configure()
115
 
        if self.options.fail:
116
 
            return True
117
 
 
118
 
        # XXX Hacky to support existing code.
119
 
        self.layer_name_cache = _layer_name_cache
120
 
        self.layer_name_cache.clear()
121
 
 
122
 
        # Global setup
123
 
        for feature in self.features:
124
 
            feature.global_setup()
125
 
 
126
 
        # Late setup
127
 
        #
128
 
        # Some system tools like profilers are really bad with stack frames.
129
 
        # E.g. hotshot doesn't like it when we leave the stack frame that we
130
 
        # called start() from.
131
 
        for feature in self.features:
132
 
            feature.late_setup()
133
 
 
134
 
        try:
135
 
            if self.do_run_tests:
136
 
                self.run_tests()
137
 
        finally:
138
 
            # Early teardown
139
 
            for feature in reversed(self.features):
140
 
                feature.early_teardown()
141
 
            # Global teardown
142
 
            for feature in reversed(self.features):
143
 
                feature.global_teardown()
144
 
 
145
 
        if self.show_report:
146
 
            for feature in self.features:
147
 
                feature.report()
148
 
 
149
 
    def configure(self):
150
 
        if self.args is None:
151
 
            self.args = sys.argv[:]
152
 
        # Check to see if we are being run as a subprocess. If we are,
153
 
        # then use the resume-layer and defaults passed in.
154
 
        if len(self.args) > 1 and self.args[1] == '--resume-layer':
155
 
            self.args.pop(1)
156
 
            resume_layer = self.args.pop(1)
157
 
            resume_number = int(self.args.pop(1))
158
 
            self.defaults = []
159
 
            while len(self.args) > 1 and self.args[1] == '--default':
160
 
                self.args.pop(1)
161
 
                self.defaults.append(self.args.pop(1))
162
 
 
163
 
            sys.stdin = FakeInputContinueGenerator()
164
 
        else:
165
 
            resume_layer = resume_number = None
166
 
 
167
 
        options = get_options(self.args, self.defaults)
168
 
 
169
 
        options.testrunner_defaults = self.defaults
170
 
        options.resume_layer = resume_layer
171
 
        options.resume_number = resume_number
172
 
 
173
 
        self.options = options
174
 
 
175
 
        self.features.append(zope.testing.testrunner.selftest.SelfTest(self))
176
 
        self.features.append(zope.testing.testrunner.logsupport.Logging(self))
177
 
        self.features.append(zope.testing.testrunner.coverage.Coverage(self))
178
 
        self.features.append(zope.testing.testrunner.doctest.DocTest(self))
179
 
        self.features.append(zope.testing.testrunner.profiling.Profiling(self))
180
 
        if is_jython:
181
 
            # Jython GC support is not yet implemented
182
 
            pass
183
 
        else:
184
 
            self.features.append(
185
 
                zope.testing.testrunner.garbagecollection.Threshold(self))
186
 
            self.features.append(
187
 
                zope.testing.testrunner.garbagecollection.Debug(self))
188
 
 
189
 
        self.features.append(zope.testing.testrunner.find.Find(self))
190
 
        self.features.append(zope.testing.testrunner.process.SubProcess(self))
191
 
        self.features.append(zope.testing.testrunner.filter.Filter(self))
192
 
        self.features.append(zope.testing.testrunner.listing.Listing(self))
193
 
        self.features.append(
194
 
            zope.testing.testrunner.statistics.Statistics(self))
195
 
        self.features.append(zope.testing.testrunner.tb_format.Traceback(self))
196
 
 
197
 
        # Remove all features that aren't activated
198
 
        self.features = [f for f in self.features if f.active]
199
 
 
200
 
    def run_tests(self):
201
 
        """Run all tests that were registered.
202
 
 
203
 
        Returns True if there where failures or False if all tests passed.
204
 
 
205
 
        """
206
 
        setup_layers = {}
207
 
        layers_to_run = list(self.ordered_layers())
208
 
        should_resume = False
209
 
 
210
 
        while layers_to_run:
211
 
            layer_name, layer, tests = layers_to_run[0]
212
 
            for feature in self.features:
213
 
                feature.layer_setup(layer)
214
 
            try:
215
 
                self.ran += run_layer(self.options, layer_name, layer, tests,
216
 
                                      setup_layers, self.failures, self.errors)
217
 
            except zope.testing.testrunner.interfaces.EndRun:
218
 
                self.failed = True
219
 
                return
220
 
            except CanNotTearDown:
221
 
                if not self.options.resume_layer:
222
 
                    should_resume = True
223
 
                    break
224
 
 
225
 
            layers_to_run.pop(0)
226
 
            if self.options.processes > 1:
227
 
                should_resume = True
228
 
                break
229
 
 
230
 
        if should_resume:
231
 
            setup_layers = None
232
 
            if layers_to_run:
233
 
                self.ran += resume_tests(
234
 
                    self.script_parts, self.options, self.features,
235
 
                    layers_to_run, self.failures, self.errors)
236
 
 
237
 
        if setup_layers:
238
 
            if self.options.resume_layer is None:
239
 
                self.options.output.info("Tearing down left over layers:")
240
 
            tear_down_unneeded(self.options, (), setup_layers, True)
241
 
 
242
 
        self.failed = bool(self.import_errors or self.failures or self.errors)
243
 
 
244
 
 
245
 
def run_tests(options, tests, name, failures, errors):
246
 
    repeat = options.repeat or 1
247
 
    repeat_range = iter(range(repeat))
248
 
    ran = 0
249
 
 
250
 
    output = options.output
251
 
 
252
 
    if is_jython:
253
 
        # Jython has no GC suppport - set count to 0
254
 
        lgarbage = 0
255
 
    else:
256
 
        gc.collect()
257
 
        lgarbage = len(gc.garbage)
258
 
 
259
 
    sumrc = 0
260
 
    if options.report_refcounts:
261
 
        if options.verbose:
262
 
            # XXX This code path is untested
263
 
            track = TrackRefs()
264
 
        rc = sys.gettotalrefcount()
265
 
 
266
 
    for iteration in repeat_range:
267
 
        if repeat > 1:
268
 
            output.info("Iteration %d" % (iteration + 1))
269
 
 
270
 
        if options.verbose > 0 or options.progress:
271
 
            output.info('  Running:')
272
 
        result = TestResult(options, tests, layer_name=name)
273
 
 
274
 
        t = time.time()
275
 
 
276
 
        if options.post_mortem:
277
 
            # post-mortem debugging
278
 
            for test in tests:
279
 
                if result.shouldStop:
280
 
                    break
281
 
                result.startTest(test)
282
 
                state = test.__dict__.copy()
283
 
                try:
284
 
                    try:
285
 
                        test.debug()
286
 
                    except KeyboardInterrupt:
287
 
                        raise
288
 
                    except:
289
 
                        result.addError(
290
 
                            test,
291
 
                            sys.exc_info()[:2] + (sys.exc_info()[2].tb_next, ),
292
 
                            )
293
 
                    else:
294
 
                        result.addSuccess(test)
295
 
                finally:
296
 
                    result.stopTest(test)
297
 
                test.__dict__.clear()
298
 
                test.__dict__.update(state)
299
 
 
300
 
        else:
301
 
            # normal
302
 
            for test in tests:
303
 
                if result.shouldStop:
304
 
                    break
305
 
                state = test.__dict__.copy()
306
 
                test(result)
307
 
                test.__dict__.clear()
308
 
                test.__dict__.update(state)
309
 
 
310
 
        t = time.time() - t
311
 
        output.stop_tests()
312
 
        failures.extend(result.failures)
313
 
        errors.extend(result.errors)
314
 
        output.summary(result.testsRun, len(result.failures),
315
 
            len(result.errors), t)
316
 
        ran = result.testsRun
317
 
 
318
 
        if is_jython:
319
 
            lgarbage = 0
320
 
        else:
321
 
            gc.collect()
322
 
            if len(gc.garbage) > lgarbage:
323
 
                output.garbage(gc.garbage[lgarbage:])
324
 
                lgarbage = len(gc.garbage)
325
 
 
326
 
        if options.report_refcounts:
327
 
 
328
 
            # If we are being tested, we don't want stdout itself to
329
 
            # foul up the numbers. :)
330
 
            try:
331
 
                sys.stdout.getvalue()
332
 
            except AttributeError:
333
 
                pass
334
 
 
335
 
            prev = rc
336
 
            rc = sys.gettotalrefcount()
337
 
            if options.verbose:
338
 
                track.update()
339
 
                if iteration > 0:
340
 
                    output.detailed_refcounts(track, rc, prev)
341
 
                else:
342
 
                    track.delta = None
343
 
            elif iteration > 0:
344
 
                output.refcounts(rc, prev)
345
 
 
346
 
    return ran
347
 
 
348
 
 
349
 
def run_layer(options, layer_name, layer, tests, setup_layers,
350
 
              failures, errors):
351
 
 
352
 
    output = options.output
353
 
    gathered = []
354
 
    gather_layers(layer, gathered)
355
 
    needed = dict([(l, 1) for l in gathered])
356
 
    if options.resume_number != 0:
357
 
        output.info("Running %s tests:" % layer_name)
358
 
    tear_down_unneeded(options, needed, setup_layers)
359
 
 
360
 
    if options.resume_layer is not None:
361
 
        output.info_suboptimal("  Running in a subprocess.")
362
 
 
363
 
    try:
364
 
        setup_layer(options, layer, setup_layers)
365
 
    except zope.testing.testrunner.interfaces.EndRun:
366
 
        raise
367
 
    except Exception:
368
 
        f = cStringIO.StringIO()
369
 
        traceback.print_exc(file=f)
370
 
        output.error(f.getvalue())
371
 
        errors.append((SetUpLayerFailure(), sys.exc_info()))
372
 
        return 0
373
 
    else:
374
 
        return run_tests(options, tests, layer_name, failures, errors)
375
 
 
376
 
 
377
 
class SetUpLayerFailure(unittest.TestCase):
378
 
 
379
 
    def runTest(self):
380
 
        "Layer set up failure."
381
 
 
382
 
 
383
 
def spawn_layer_in_subprocess(result, script_parts, options, features,
384
 
                              layer_name, layer, failures, errors,
385
 
                              resume_number):
386
 
    try:
387
 
        # BBB
388
 
        if script_parts is None:
389
 
            script_parts = sys.argv[0:1]
390
 
        args = [sys.executable]
391
 
        args.extend(script_parts)
392
 
        args.extend(['--resume-layer', layer_name, str(resume_number)])
393
 
        for d in options.testrunner_defaults:
394
 
            args.extend(['--default', d])
395
 
 
396
 
        args.extend(options.original_testrunner_args[1:])
397
 
 
398
 
        # this is because of a bug in Python (http://www.python.org/sf/900092)
399
 
        if (options.profile == 'hotshot'
400
 
            and sys.version_info[:3] <= (2, 4, 1)):
401
 
            args.insert(1, '-O')
402
 
 
403
 
        if sys.platform.startswith('win'):
404
 
            args = args[0] + ' ' + ' '.join([
405
 
                ('"' + a.replace('\\', '\\\\').replace('"', '\\"') + '"')
406
 
                for a in args[1:]])
407
 
 
408
 
        for feature in features:
409
 
            feature.layer_setup(layer)
410
 
 
411
 
        child = subprocess.Popen(args, shell=False, stdin=subprocess.PIPE,
412
 
            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
413
 
            close_fds=not sys.platform.startswith('win'))
414
 
 
415
 
        while True:
416
 
            try:
417
 
                while True:
418
 
                    # We use readline() instead of iterating over stdout
419
 
                    # because it appears that iterating over stdout causes a
420
 
                    # lot more buffering to take place (probably so it can
421
 
                    # return its lines as a batch). We don't want too much
422
 
                    # buffering because this foils automatic and human monitors
423
 
                    # trying to verify that the subprocess is still alive.
424
 
                    l = child.stdout.readline()
425
 
                    if not l:
426
 
                        break
427
 
                    result.write(l)
428
 
            except IOError, e:
429
 
                if e.errno == errno.EINTR:
430
 
                    # If the subprocess dies before we finish reading its
431
 
                    # output, a SIGCHLD signal can interrupt the reading.
432
 
                    # The correct thing to to in that case is to retry.
433
 
                    continue
434
 
                output.error(
435
 
                    "Error reading subprocess output for %s" % layer_name)
436
 
                output.info(str(e))
437
 
            else:
438
 
                break
439
 
 
440
 
        # Now stderr should be ready to read the whole thing.
441
 
        erriter = iter(child.stderr.read().splitlines())
442
 
        nfail = nerr = 0
443
 
        for line in erriter:
444
 
            try:
445
 
                result.num_ran, nfail, nerr = map(int, line.strip().split())
446
 
            except ValueError:
447
 
                continue
448
 
            else:
449
 
                break
450
 
        else:
451
 
            output = options.output
452
 
            output.error_with_banner("Could not communicate with subprocess:\n"
453
 
                                     "\n" + suberr)
454
 
 
455
 
        while nfail > 0:
456
 
            nfail -= 1
457
 
            failures.append((erriter.next().strip(), None))
458
 
        while nerr > 0:
459
 
            nerr -= 1
460
 
            errors.append((erriter.next().strip(), None))
461
 
 
462
 
    finally:
463
 
        result.done = True
464
 
 
465
 
 
466
 
class AbstractSubprocessResult(object):
467
 
    """A result of a subprocess layer run."""
468
 
 
469
 
    num_ran = 0
470
 
    done = False
471
 
 
472
 
    def __init__(self, layer_name, queue):
473
 
        self.layer_name = layer_name
474
 
        self.queue = queue
475
 
        self.stdout = []
476
 
 
477
 
    def write(self, out):
478
 
        """Receive a line of the subprocess out."""
479
 
 
480
 
 
481
 
class DeferredSubprocessResult(AbstractSubprocessResult):
482
 
    """Keeps stdout around for later processing,"""
483
 
    
484
 
    def write(self, out):
485
 
        if not _is_dots(out):
486
 
            self.stdout.append(out)
487
 
 
488
 
 
489
 
class ImmediateSubprocessResult(AbstractSubprocessResult):
490
 
    """Sends complete output to queue."""
491
 
 
492
 
    def write(self, out):
493
 
        sys.stdout.write(out)
494
 
        # Help keep-alive monitors (human or automated) keep up-to-date.
495
 
        sys.stdout.flush()
496
 
 
497
 
 
498
 
_is_dots = re.compile(r'\.+\n').match
499
 
class KeepaliveSubprocessResult(AbstractSubprocessResult):
500
 
    "Keeps stdout for later processing; sends marks to queue to show activity."
501
 
 
502
 
    _done = False
503
 
 
504
 
    def _set_done(self, value):
505
 
        self._done = value
506
 
        assert value, 'Internal error: unexpectedly setting done to False'
507
 
        self.queue.put((self.layer_name, ' LAYER FINISHED'))
508
 
    done = property(lambda self: self._done, _set_done) 
509
 
 
510
 
    def write(self, out):
511
 
        if _is_dots(out):
512
 
            self.queue.put((self.layer_name, out.strip()))
513
 
        else:
514
 
            self.stdout.append(out)
515
 
 
516
 
 
517
 
def resume_tests(script_parts, options, features, layers, failures, errors):
518
 
    results = []
519
 
    stdout_queue = None
520
 
    if options.processes == 1:
521
 
        result_factory = ImmediateSubprocessResult
522
 
    elif options.verbose > 1:
523
 
        result_factory = KeepaliveSubprocessResult
524
 
        stdout_queue = Queue.Queue()
525
 
    else:
526
 
        result_factory = DeferredSubprocessResult
527
 
    resume_number = int(options.processes > 1)
528
 
    ready_threads = []
529
 
    for layer_name, layer, tests in layers:
530
 
        result = result_factory(layer_name, stdout_queue)
531
 
        results.append(result)
532
 
        ready_threads.append(threading.Thread(
533
 
            target=spawn_layer_in_subprocess,
534
 
            args=(result, script_parts, options, features, layer_name, layer,
535
 
                  failures, errors, resume_number)))
536
 
        resume_number += 1
537
 
 
538
 
    # Now start a few threads at a time.
539
 
    running_threads = []
540
 
    results_iter = iter(results)
541
 
    current_result = results_iter.next()
542
 
    last_layer_intermediate_output = None
543
 
    output = None
544
 
    while ready_threads or running_threads:
545
 
        while len(running_threads) < options.processes and ready_threads:
546
 
            thread = ready_threads.pop(0)
547
 
            thread.start()
548
 
            running_threads.append(thread)
549
 
 
550
 
        for index, thread in reversed(list(enumerate(running_threads))):
551
 
            if not thread.isAlive():
552
 
                del running_threads[index]
553
 
 
554
 
        # Clear out any messages in queue
555
 
        while stdout_queue is not None:
556
 
            previous_output = output
557
 
            try:
558
 
                layer_name, output = stdout_queue.get(False)
559
 
            except Queue.Empty:
560
 
                break
561
 
            if layer_name != last_layer_intermediate_output:
562
 
                # Clarify what layer is reporting activity.
563
 
                if previous_output is not None:
564
 
                    sys.stdout.write(']\n')
565
 
                sys.stdout.write(
566
 
                    '[Parallel tests running in %s:\n  ' % (layer_name,))
567
 
                last_layer_intermediate_output = layer_name
568
 
            sys.stdout.write(output)
569
 
        # Display results in the order they would have been displayed, had the
570
 
        # work not been done in parallel.
571
 
        while current_result and current_result.done:
572
 
            if output is not None:
573
 
                sys.stdout.write(']\n')
574
 
                output = None
575
 
            map(sys.stdout.write, current_result.stdout)
576
 
 
577
 
            try:
578
 
                current_result = results_iter.next()
579
 
            except StopIteration:
580
 
                current_result = None
581
 
 
582
 
        # Help keep-alive monitors (human or automated) keep up-to-date.
583
 
        sys.stdout.flush()
584
 
        time.sleep(0.01) # Keep the loop from being too tight.
585
 
 
586
 
    # Return the total number of tests run.
587
 
    return sum(r.num_ran for r in results)
588
 
 
589
 
 
590
 
def tear_down_unneeded(options, needed, setup_layers, optional=False):
591
 
    # Tear down any layers not needed for these tests. The unneeded layers
592
 
    # might interfere.
593
 
    unneeded = [l for l in setup_layers if l not in needed]
594
 
    unneeded = order_by_bases(unneeded)
595
 
    unneeded.reverse()
596
 
    output = options.output
597
 
    for l in unneeded:
598
 
        output.start_tear_down(name_from_layer(l))
599
 
        t = time.time()
600
 
        try:
601
 
            if hasattr(l, 'tearDown'):
602
 
                l.tearDown()
603
 
        except NotImplementedError:
604
 
            output.tear_down_not_supported()
605
 
            if not optional:
606
 
                raise CanNotTearDown(l)
607
 
        else:
608
 
            output.stop_tear_down(time.time() - t)
609
 
        del setup_layers[l]
610
 
 
611
 
 
612
 
cant_pm_in_subprocess_message = """
613
 
Can't post-mortem debug when running a layer as a subprocess!
614
 
Try running layer %r by itself.
615
 
"""
616
 
 
617
 
 
618
 
def setup_layer(options, layer, setup_layers):
619
 
    assert layer is not object
620
 
    output = options.output
621
 
    if layer not in setup_layers:
622
 
        for base in layer.__bases__:
623
 
            if base is not object:
624
 
                setup_layer(options, base, setup_layers)
625
 
        output.start_set_up(name_from_layer(layer))
626
 
        t = time.time()
627
 
        if hasattr(layer, 'setUp'):
628
 
            try:
629
 
                layer.setUp()
630
 
            except Exception:
631
 
                if options.post_mortem:
632
 
                    if options.resume_layer:
633
 
                        options.output.error_with_banner(
634
 
                            cant_pm_in_subprocess_message
635
 
                            % options.resume_layer)
636
 
                        raise
637
 
                    else:
638
 
                        zope.testing.testrunner.debug.post_mortem(
639
 
                            sys.exc_info())
640
 
                else:
641
 
                    raise
642
 
 
643
 
        output.stop_set_up(time.time() - t)
644
 
        setup_layers[layer] = 1
645
 
 
646
 
 
647
 
class TestResult(unittest.TestResult):
648
 
 
649
 
    def __init__(self, options, tests, layer_name=None):
650
 
        unittest.TestResult.__init__(self)
651
 
        self.options = options
652
 
        # Calculate our list of relevant layers we need to call testSetUp
653
 
        # and testTearDown on.
654
 
        layers = []
655
 
        gather_layers(layer_from_name(layer_name), layers)
656
 
        self.layers = order_by_bases(layers)
657
 
        count = 0
658
 
        for test in tests:
659
 
            count += test.countTestCases()
660
 
        self.count = count
661
 
 
662
 
    def testSetUp(self):
663
 
        """A layer may define a setup method to be called before each
664
 
        individual test.
665
 
        """
666
 
        for layer in self.layers:
667
 
            if hasattr(layer, 'testSetUp'):
668
 
                layer.testSetUp()
669
 
 
670
 
    def testTearDown(self):
671
 
        """A layer may define a teardown method to be called after each
672
 
           individual test.
673
 
 
674
 
           This is useful for clearing the state of global
675
 
           resources or resetting external systems such as relational
676
 
           databases or daemons.
677
 
        """
678
 
        for layer in self.layers[-1::-1]:
679
 
            if hasattr(layer, 'testTearDown'):
680
 
                layer.testTearDown()
681
 
 
682
 
    def startTest(self, test):
683
 
        self.testSetUp()
684
 
        unittest.TestResult.startTest(self, test)
685
 
        testsRun = self.testsRun - 1 # subtract the one the base class added
686
 
        count = test.countTestCases()
687
 
        self.testsRun = testsRun + count
688
 
 
689
 
        self.options.output.start_test(test, self.testsRun, self.count)
690
 
 
691
 
        self._threads = threading.enumerate()
692
 
        self._start_time = time.time()
693
 
 
694
 
    def addSuccess(self, test):
695
 
        t = max(time.time() - self._start_time, 0.0)
696
 
        self.options.output.test_success(test, t)
697
 
 
698
 
    def addError(self, test, exc_info):
699
 
        self.options.output.test_error(test, time.time() - self._start_time,
700
 
                                       exc_info)
701
 
 
702
 
        unittest.TestResult.addError(self, test, exc_info)
703
 
 
704
 
        if self.options.post_mortem:
705
 
            if self.options.resume_layer:
706
 
                self.options.output.error_with_banner("Can't post-mortem debug"
707
 
                                                      " when running a layer"
708
 
                                                      " as a subprocess!")
709
 
            else:
710
 
                zope.testing.testrunner.debug.post_mortem(exc_info)
711
 
 
712
 
    def addFailure(self, test, exc_info):
713
 
        self.options.output.test_failure(test, time.time() - self._start_time,
714
 
                                         exc_info)
715
 
 
716
 
        unittest.TestResult.addFailure(self, test, exc_info)
717
 
 
718
 
        if self.options.post_mortem:
719
 
            # XXX: mgedmin: why isn't there a resume_layer check here like
720
 
            # in addError?
721
 
            zope.testing.testrunner.debug.post_mortem(exc_info)
722
 
 
723
 
    def stopTest(self, test):
724
 
        self.testTearDown()
725
 
        self.options.output.stop_test(test)
726
 
 
727
 
        if is_jython:
728
 
            pass
729
 
        else:
730
 
            if gc.garbage:
731
 
                self.options.output.test_garbage(test, gc.garbage)
732
 
                # TODO: Perhaps eat the garbage here, so that the garbage isn't
733
 
                #       printed for every subsequent test.
734
 
 
735
 
        # Did the test leave any new threads behind?
736
 
        new_threads = [t for t in threading.enumerate()
737
 
                         if (t.isAlive()
738
 
                             and
739
 
                             t not in self._threads)]
740
 
        if new_threads:
741
 
            self.options.output.test_threads(test, new_threads)
742
 
 
743
 
 
744
 
def layer_from_name(layer_name):
745
 
    """Return the layer for the corresponding layer_name by discovering
746
 
       and importing the necessary module if necessary.
747
 
 
748
 
       Note that a name -> layer cache is maintained by name_from_layer
749
 
       to allow locating layers in cases where it would otherwise be
750
 
       impossible.
751
 
    """
752
 
    if layer_name in _layer_name_cache:
753
 
        return _layer_name_cache[layer_name]
754
 
    layer_names = layer_name.split('.')
755
 
    layer_module, module_layer_name = layer_names[:-1], layer_names[-1]
756
 
    module_name = '.'.join(layer_module)
757
 
    module = import_name(module_name)
758
 
    try:
759
 
        return getattr(module, module_layer_name)
760
 
    except AttributeError, e:
761
 
        # the default error is very uninformative:
762
 
        #   AttributeError: 'module' object has no attribute 'DemoLayer'
763
 
        # it doesn't say *which* module
764
 
        raise AttributeError('module %r has no attribute %r'
765
 
                             % (module_name, module_layer_name))
766
 
 
767
 
 
768
 
def order_by_bases(layers):
769
 
    """Order the layers from least to most specific (bottom to top)
770
 
    """
771
 
    named_layers = [(name_from_layer(layer), layer) for layer in layers]
772
 
    named_layers.sort()
773
 
    named_layers.reverse()
774
 
    gathered = []
775
 
    for name, layer in named_layers:
776
 
        gather_layers(layer, gathered)
777
 
    gathered.reverse()
778
 
    seen = {}
779
 
    result = []
780
 
    for layer in gathered:
781
 
        if layer not in seen:
782
 
            seen[layer] = 1
783
 
            if layer in layers:
784
 
                result.append(layer)
785
 
    return result
786
 
 
787
 
 
788
 
def gather_layers(layer, result):
789
 
    if layer is not object:
790
 
        result.append(layer)
791
 
    for b in layer.__bases__:
792
 
        gather_layers(b, result)
793
 
 
794
 
 
795
 
class FakeInputContinueGenerator:
796
 
 
797
 
    def readline(self):
798
 
        print  'c\n'
799
 
        print '*'*70
800
 
        print ("Can't use pdb.set_trace when running a layer"
801
 
               " as a subprocess!")
802
 
        print '*'*70
803
 
        print
804
 
        return 'c\n'