1
##############################################################################
3
# Copyright (c) 2004-2008 Zope Corporation and Contributors.
6
# This software is subject to the provisions of the Zope Public License,
7
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
8
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
9
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
10
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
11
# FOR A PARTICULAR PURPOSE.
13
##############################################################################
16
$Id: __init__.py 86232 2008-05-03 15:09:33Z ctheune $
31
from zope.testing.testrunner.find import import_name
32
from zope.testing.testrunner.find import name_from_layer, _layer_name_cache
33
from zope.testing.testrunner.refcount import TrackRefs
34
from zope.testing.testrunner.options import get_options
35
import zope.testing.testrunner.coverage
36
import zope.testing.testrunner.doctest
37
import zope.testing.testrunner.logsupport
38
import zope.testing.testrunner.selftest
39
import zope.testing.testrunner.profiling
40
import zope.testing.testrunner.filter
41
import zope.testing.testrunner.garbagecollection
42
import zope.testing.testrunner.listing
43
import zope.testing.testrunner.statistics
44
import zope.testing.testrunner.process
45
import zope.testing.testrunner.interfaces
46
import zope.testing.testrunner.debug
47
import zope.testing.testrunner.tb_format
50
PYREFCOUNT_PATTERN = re.compile('\[[0-9]+ refs\]')
52
is_jython = sys.platform.startswith('java')
55
class SubprocessError(Exception):
56
"""An error occurred when running a subprocess
59
def __init__(self, reason, stderr):
64
return '%s: %s' % (self.reason, self.stderr)
67
class CanNotTearDown(Exception):
68
"Couldn't tear down a test"
74
It is the central point of this package and responsible for finding and
75
executing tests as well as configuring itself from the (command-line)
76
options passed into it.
80
def __init__(self, defaults=None, args=None, found_suites=None,
81
options=None, script_parts=None):
82
self.defaults = defaults
84
self.found_suites = found_suites
85
self.options = options
86
self.script_parts = script_parts
93
self.show_report = True
94
self.do_run_tests = True
98
self.tests_by_layer_name = {}
100
def ordered_layers(self):
101
layer_names = dict([(layer_from_name(layer_name), layer_name)
102
for layer_name in self.tests_by_layer_name])
103
for layer in order_by_bases(layer_names):
104
layer_name = layer_names[layer]
105
yield layer_name, layer, self.tests_by_layer_name[layer_name]
107
def register_tests(self, tests):
108
"""Registers tests."""
109
# XXX To support multiple features that find tests this shouldn't be
110
# an update but merge the various layers individually.
111
self.tests_by_layer_name.update(tests)
115
if self.options.fail:
118
# XXX Hacky to support existing code.
119
self.layer_name_cache = _layer_name_cache
120
self.layer_name_cache.clear()
123
for feature in self.features:
124
feature.global_setup()
128
# Some system tools like profilers are really bad with stack frames.
129
# E.g. hotshot doesn't like it when we leave the stack frame that we
130
# called start() from.
131
for feature in self.features:
135
if self.do_run_tests:
139
for feature in reversed(self.features):
140
feature.early_teardown()
142
for feature in reversed(self.features):
143
feature.global_teardown()
146
for feature in self.features:
150
if self.args is None:
151
self.args = sys.argv[:]
152
# Check to see if we are being run as a subprocess. If we are,
153
# then use the resume-layer and defaults passed in.
154
if len(self.args) > 1 and self.args[1] == '--resume-layer':
156
resume_layer = self.args.pop(1)
157
resume_number = int(self.args.pop(1))
159
while len(self.args) > 1 and self.args[1] == '--default':
161
self.defaults.append(self.args.pop(1))
163
sys.stdin = FakeInputContinueGenerator()
165
resume_layer = resume_number = None
167
options = get_options(self.args, self.defaults)
169
options.testrunner_defaults = self.defaults
170
options.resume_layer = resume_layer
171
options.resume_number = resume_number
173
self.options = options
175
self.features.append(zope.testing.testrunner.selftest.SelfTest(self))
176
self.features.append(zope.testing.testrunner.logsupport.Logging(self))
177
self.features.append(zope.testing.testrunner.coverage.Coverage(self))
178
self.features.append(zope.testing.testrunner.doctest.DocTest(self))
179
self.features.append(zope.testing.testrunner.profiling.Profiling(self))
181
# Jython GC support is not yet implemented
184
self.features.append(
185
zope.testing.testrunner.garbagecollection.Threshold(self))
186
self.features.append(
187
zope.testing.testrunner.garbagecollection.Debug(self))
189
self.features.append(zope.testing.testrunner.find.Find(self))
190
self.features.append(zope.testing.testrunner.process.SubProcess(self))
191
self.features.append(zope.testing.testrunner.filter.Filter(self))
192
self.features.append(zope.testing.testrunner.listing.Listing(self))
193
self.features.append(
194
zope.testing.testrunner.statistics.Statistics(self))
195
self.features.append(zope.testing.testrunner.tb_format.Traceback(self))
197
# Remove all features that aren't activated
198
self.features = [f for f in self.features if f.active]
201
"""Run all tests that were registered.
203
Returns True if there where failures or False if all tests passed.
207
layers_to_run = list(self.ordered_layers())
208
should_resume = False
211
layer_name, layer, tests = layers_to_run[0]
212
for feature in self.features:
213
feature.layer_setup(layer)
215
self.ran += run_layer(self.options, layer_name, layer, tests,
216
setup_layers, self.failures, self.errors)
217
except zope.testing.testrunner.interfaces.EndRun:
220
except CanNotTearDown:
221
if not self.options.resume_layer:
226
if self.options.processes > 1:
233
self.ran += resume_tests(
234
self.script_parts, self.options, self.features,
235
layers_to_run, self.failures, self.errors)
238
if self.options.resume_layer is None:
239
self.options.output.info("Tearing down left over layers:")
240
tear_down_unneeded(self.options, (), setup_layers, True)
242
self.failed = bool(self.import_errors or self.failures or self.errors)
245
def run_tests(options, tests, name, failures, errors):
246
repeat = options.repeat or 1
247
repeat_range = iter(range(repeat))
250
output = options.output
253
# Jython has no GC suppport - set count to 0
257
lgarbage = len(gc.garbage)
260
if options.report_refcounts:
262
# XXX This code path is untested
264
rc = sys.gettotalrefcount()
266
for iteration in repeat_range:
268
output.info("Iteration %d" % (iteration + 1))
270
if options.verbose > 0 or options.progress:
271
output.info(' Running:')
272
result = TestResult(options, tests, layer_name=name)
276
if options.post_mortem:
277
# post-mortem debugging
279
if result.shouldStop:
281
result.startTest(test)
282
state = test.__dict__.copy()
286
except KeyboardInterrupt:
291
sys.exc_info()[:2] + (sys.exc_info()[2].tb_next, ),
294
result.addSuccess(test)
296
result.stopTest(test)
297
test.__dict__.clear()
298
test.__dict__.update(state)
303
if result.shouldStop:
305
state = test.__dict__.copy()
307
test.__dict__.clear()
308
test.__dict__.update(state)
312
failures.extend(result.failures)
313
errors.extend(result.errors)
314
output.summary(result.testsRun, len(result.failures),
315
len(result.errors), t)
316
ran = result.testsRun
322
if len(gc.garbage) > lgarbage:
323
output.garbage(gc.garbage[lgarbage:])
324
lgarbage = len(gc.garbage)
326
if options.report_refcounts:
328
# If we are being tested, we don't want stdout itself to
329
# foul up the numbers. :)
331
sys.stdout.getvalue()
332
except AttributeError:
336
rc = sys.gettotalrefcount()
340
output.detailed_refcounts(track, rc, prev)
344
output.refcounts(rc, prev)
349
def run_layer(options, layer_name, layer, tests, setup_layers,
352
output = options.output
354
gather_layers(layer, gathered)
355
needed = dict([(l, 1) for l in gathered])
356
if options.resume_number != 0:
357
output.info("Running %s tests:" % layer_name)
358
tear_down_unneeded(options, needed, setup_layers)
360
if options.resume_layer is not None:
361
output.info_suboptimal(" Running in a subprocess.")
364
setup_layer(options, layer, setup_layers)
365
except zope.testing.testrunner.interfaces.EndRun:
368
f = cStringIO.StringIO()
369
traceback.print_exc(file=f)
370
output.error(f.getvalue())
371
errors.append((SetUpLayerFailure(), sys.exc_info()))
374
return run_tests(options, tests, layer_name, failures, errors)
377
class SetUpLayerFailure(unittest.TestCase):
380
"Layer set up failure."
383
def spawn_layer_in_subprocess(result, script_parts, options, features,
384
layer_name, layer, failures, errors,
388
if script_parts is None:
389
script_parts = sys.argv[0:1]
390
args = [sys.executable]
391
args.extend(script_parts)
392
args.extend(['--resume-layer', layer_name, str(resume_number)])
393
for d in options.testrunner_defaults:
394
args.extend(['--default', d])
396
args.extend(options.original_testrunner_args[1:])
398
# this is because of a bug in Python (http://www.python.org/sf/900092)
399
if (options.profile == 'hotshot'
400
and sys.version_info[:3] <= (2, 4, 1)):
403
if sys.platform.startswith('win'):
404
args = args[0] + ' ' + ' '.join([
405
('"' + a.replace('\\', '\\\\').replace('"', '\\"') + '"')
408
for feature in features:
409
feature.layer_setup(layer)
411
child = subprocess.Popen(args, shell=False, stdin=subprocess.PIPE,
412
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
413
close_fds=not sys.platform.startswith('win'))
418
# We use readline() instead of iterating over stdout
419
# because it appears that iterating over stdout causes a
420
# lot more buffering to take place (probably so it can
421
# return its lines as a batch). We don't want too much
422
# buffering because this foils automatic and human monitors
423
# trying to verify that the subprocess is still alive.
424
l = child.stdout.readline()
429
if e.errno == errno.EINTR:
430
# If the subprocess dies before we finish reading its
431
# output, a SIGCHLD signal can interrupt the reading.
432
# The correct thing to to in that case is to retry.
435
"Error reading subprocess output for %s" % layer_name)
440
# Now stderr should be ready to read the whole thing.
441
erriter = iter(child.stderr.read().splitlines())
445
result.num_ran, nfail, nerr = map(int, line.strip().split())
451
output = options.output
452
output.error_with_banner("Could not communicate with subprocess:\n"
457
failures.append((erriter.next().strip(), None))
460
errors.append((erriter.next().strip(), None))
466
class AbstractSubprocessResult(object):
467
"""A result of a subprocess layer run."""
472
def __init__(self, layer_name, queue):
473
self.layer_name = layer_name
477
def write(self, out):
478
"""Receive a line of the subprocess out."""
481
class DeferredSubprocessResult(AbstractSubprocessResult):
482
"""Keeps stdout around for later processing,"""
484
def write(self, out):
485
if not _is_dots(out):
486
self.stdout.append(out)
489
class ImmediateSubprocessResult(AbstractSubprocessResult):
490
"""Sends complete output to queue."""
492
def write(self, out):
493
sys.stdout.write(out)
494
# Help keep-alive monitors (human or automated) keep up-to-date.
498
_is_dots = re.compile(r'\.+\n').match
499
class KeepaliveSubprocessResult(AbstractSubprocessResult):
500
"Keeps stdout for later processing; sends marks to queue to show activity."
504
def _set_done(self, value):
506
assert value, 'Internal error: unexpectedly setting done to False'
507
self.queue.put((self.layer_name, ' LAYER FINISHED'))
508
done = property(lambda self: self._done, _set_done)
510
def write(self, out):
512
self.queue.put((self.layer_name, out.strip()))
514
self.stdout.append(out)
517
def resume_tests(script_parts, options, features, layers, failures, errors):
520
if options.processes == 1:
521
result_factory = ImmediateSubprocessResult
522
elif options.verbose > 1:
523
result_factory = KeepaliveSubprocessResult
524
stdout_queue = Queue.Queue()
526
result_factory = DeferredSubprocessResult
527
resume_number = int(options.processes > 1)
529
for layer_name, layer, tests in layers:
530
result = result_factory(layer_name, stdout_queue)
531
results.append(result)
532
ready_threads.append(threading.Thread(
533
target=spawn_layer_in_subprocess,
534
args=(result, script_parts, options, features, layer_name, layer,
535
failures, errors, resume_number)))
538
# Now start a few threads at a time.
540
results_iter = iter(results)
541
current_result = results_iter.next()
542
last_layer_intermediate_output = None
544
while ready_threads or running_threads:
545
while len(running_threads) < options.processes and ready_threads:
546
thread = ready_threads.pop(0)
548
running_threads.append(thread)
550
for index, thread in reversed(list(enumerate(running_threads))):
551
if not thread.isAlive():
552
del running_threads[index]
554
# Clear out any messages in queue
555
while stdout_queue is not None:
556
previous_output = output
558
layer_name, output = stdout_queue.get(False)
561
if layer_name != last_layer_intermediate_output:
562
# Clarify what layer is reporting activity.
563
if previous_output is not None:
564
sys.stdout.write(']\n')
566
'[Parallel tests running in %s:\n ' % (layer_name,))
567
last_layer_intermediate_output = layer_name
568
sys.stdout.write(output)
569
# Display results in the order they would have been displayed, had the
570
# work not been done in parallel.
571
while current_result and current_result.done:
572
if output is not None:
573
sys.stdout.write(']\n')
575
map(sys.stdout.write, current_result.stdout)
578
current_result = results_iter.next()
579
except StopIteration:
580
current_result = None
582
# Help keep-alive monitors (human or automated) keep up-to-date.
584
time.sleep(0.01) # Keep the loop from being too tight.
586
# Return the total number of tests run.
587
return sum(r.num_ran for r in results)
590
def tear_down_unneeded(options, needed, setup_layers, optional=False):
591
# Tear down any layers not needed for these tests. The unneeded layers
593
unneeded = [l for l in setup_layers if l not in needed]
594
unneeded = order_by_bases(unneeded)
596
output = options.output
598
output.start_tear_down(name_from_layer(l))
601
if hasattr(l, 'tearDown'):
603
except NotImplementedError:
604
output.tear_down_not_supported()
606
raise CanNotTearDown(l)
608
output.stop_tear_down(time.time() - t)
612
cant_pm_in_subprocess_message = """
613
Can't post-mortem debug when running a layer as a subprocess!
614
Try running layer %r by itself.
618
def setup_layer(options, layer, setup_layers):
619
assert layer is not object
620
output = options.output
621
if layer not in setup_layers:
622
for base in layer.__bases__:
623
if base is not object:
624
setup_layer(options, base, setup_layers)
625
output.start_set_up(name_from_layer(layer))
627
if hasattr(layer, 'setUp'):
631
if options.post_mortem:
632
if options.resume_layer:
633
options.output.error_with_banner(
634
cant_pm_in_subprocess_message
635
% options.resume_layer)
638
zope.testing.testrunner.debug.post_mortem(
643
output.stop_set_up(time.time() - t)
644
setup_layers[layer] = 1
647
class TestResult(unittest.TestResult):
649
def __init__(self, options, tests, layer_name=None):
650
unittest.TestResult.__init__(self)
651
self.options = options
652
# Calculate our list of relevant layers we need to call testSetUp
653
# and testTearDown on.
655
gather_layers(layer_from_name(layer_name), layers)
656
self.layers = order_by_bases(layers)
659
count += test.countTestCases()
663
"""A layer may define a setup method to be called before each
666
for layer in self.layers:
667
if hasattr(layer, 'testSetUp'):
670
def testTearDown(self):
671
"""A layer may define a teardown method to be called after each
674
This is useful for clearing the state of global
675
resources or resetting external systems such as relational
676
databases or daemons.
678
for layer in self.layers[-1::-1]:
679
if hasattr(layer, 'testTearDown'):
682
def startTest(self, test):
684
unittest.TestResult.startTest(self, test)
685
testsRun = self.testsRun - 1 # subtract the one the base class added
686
count = test.countTestCases()
687
self.testsRun = testsRun + count
689
self.options.output.start_test(test, self.testsRun, self.count)
691
self._threads = threading.enumerate()
692
self._start_time = time.time()
694
def addSuccess(self, test):
695
t = max(time.time() - self._start_time, 0.0)
696
self.options.output.test_success(test, t)
698
def addError(self, test, exc_info):
699
self.options.output.test_error(test, time.time() - self._start_time,
702
unittest.TestResult.addError(self, test, exc_info)
704
if self.options.post_mortem:
705
if self.options.resume_layer:
706
self.options.output.error_with_banner("Can't post-mortem debug"
707
" when running a layer"
710
zope.testing.testrunner.debug.post_mortem(exc_info)
712
def addFailure(self, test, exc_info):
713
self.options.output.test_failure(test, time.time() - self._start_time,
716
unittest.TestResult.addFailure(self, test, exc_info)
718
if self.options.post_mortem:
719
# XXX: mgedmin: why isn't there a resume_layer check here like
721
zope.testing.testrunner.debug.post_mortem(exc_info)
723
def stopTest(self, test):
725
self.options.output.stop_test(test)
731
self.options.output.test_garbage(test, gc.garbage)
732
# TODO: Perhaps eat the garbage here, so that the garbage isn't
733
# printed for every subsequent test.
735
# Did the test leave any new threads behind?
736
new_threads = [t for t in threading.enumerate()
739
t not in self._threads)]
741
self.options.output.test_threads(test, new_threads)
744
def layer_from_name(layer_name):
745
"""Return the layer for the corresponding layer_name by discovering
746
and importing the necessary module if necessary.
748
Note that a name -> layer cache is maintained by name_from_layer
749
to allow locating layers in cases where it would otherwise be
752
if layer_name in _layer_name_cache:
753
return _layer_name_cache[layer_name]
754
layer_names = layer_name.split('.')
755
layer_module, module_layer_name = layer_names[:-1], layer_names[-1]
756
module_name = '.'.join(layer_module)
757
module = import_name(module_name)
759
return getattr(module, module_layer_name)
760
except AttributeError, e:
761
# the default error is very uninformative:
762
# AttributeError: 'module' object has no attribute 'DemoLayer'
763
# it doesn't say *which* module
764
raise AttributeError('module %r has no attribute %r'
765
% (module_name, module_layer_name))
768
def order_by_bases(layers):
769
"""Order the layers from least to most specific (bottom to top)
771
named_layers = [(name_from_layer(layer), layer) for layer in layers]
773
named_layers.reverse()
775
for name, layer in named_layers:
776
gather_layers(layer, gathered)
780
for layer in gathered:
781
if layer not in seen:
788
def gather_layers(layer, result):
789
if layer is not object:
791
for b in layer.__bases__:
792
gather_layers(b, result)
795
class FakeInputContinueGenerator:
800
print ("Can't use pdb.set_trace when running a layer"