1
import os, sys, unittest, getopt, time
5
class ResourceDenied(Exception):
6
"""Test skipped because it requested a disallowed resource.
8
This is raised when a test calls requires() for a resource that
9
has not be enabled. Resources are defined by test modules.
12
def is_resource_enabled(resource):
13
"""Test whether a resource is enabled.
15
If the caller's module is __main__ then automatically return True."""
16
if sys._getframe().f_back.f_globals.get("__name__") == "__main__":
18
result = use_resources is not None and \
19
(resource in use_resources or "*" in use_resources)
21
_unavail[resource] = None
25
def requires(resource, msg=None):
26
"""Raise ResourceDenied if the specified resource is not available.
28
If the caller's module is __main__ then automatically return True."""
29
# see if the caller's module is __main__ - if so, treat as if
30
# the resource was set
31
if sys._getframe().f_back.f_globals.get("__name__") == "__main__":
33
if not is_resource_enabled(resource):
35
msg = "Use of the `%s' resource not enabled" % resource
36
raise ResourceDenied(msg)
38
def find_package_modules(package, mask):
40
if (hasattr(package, "__loader__") and
41
hasattr(package.__loader__, '_files')):
42
path = package.__name__.replace(".", os.path.sep)
43
mask = os.path.join(path, mask)
44
for fnm in package.__loader__._files.keys():
45
if fnmatch.fnmatchcase(fnm, mask):
46
yield os.path.splitext(fnm)[0].replace(os.path.sep, ".")
48
path = package.__path__[0]
49
for fnm in os.listdir(path):
50
if fnmatch.fnmatchcase(fnm, mask):
51
yield "%s.%s" % (package.__name__, os.path.splitext(fnm)[0])
53
def get_tests(package, mask, verbosity, exclude=()):
54
"""Return a list of skipped test modules, and a list of test cases."""
57
for modname in find_package_modules(package, mask):
58
if modname.split(".")[-1] in exclude:
59
skipped.append(modname)
61
print >> sys.stderr, "Skipped %s: excluded" % modname
64
mod = __import__(modname, globals(), locals(), ['*'])
65
except ResourceDenied as detail:
66
skipped.append(modname)
68
print("Skipped %s: %s" % (modname, detail), file=sys.stderr)
71
if name.startswith("_"):
73
o = getattr(mod, name)
74
if type(o) is type(unittest.TestCase) and issubclass(o, unittest.TestCase):
82
def test_with_refcounts(runner, verbosity, testcase):
83
"""Run testcase several times, tracking reference counts."""
86
ptc = ctypes._pointer_type_cache.copy()
87
cfc = ctypes._c_functype_cache.copy()
88
wfc = ctypes._win_functype_cache.copy()
90
# when searching for refcount leaks, we have to manually reset any
91
# caches that ctypes has.
93
ctypes._pointer_type_cache = ptc.copy()
94
ctypes._c_functype_cache = cfc.copy()
95
ctypes._win_functype_cache = wfc.copy()
98
test = unittest.makeSuite(testcase)
100
rc = sys.gettotalrefcount()
104
refcounts = [None] * COUNT
105
for i in range(COUNT):
106
rc = sys.gettotalrefcount()
109
refcounts[i] = sys.gettotalrefcount() - rc
110
if filter(None, refcounts):
111
print("%s leaks:\n\t" % testcase, refcounts)
113
print("%s: ok." % testcase)
115
class TestRunner(unittest.TextTestRunner):
116
def run(self, test, skipped):
117
"Run the given test case or test suite."
118
# Same as unittest.TextTestRunner.run, except that it reports
120
result = self._makeResult()
121
startTime = time.time()
123
stopTime = time.time()
124
timeTaken = stopTime - startTime
126
self.stream.writeln(result.separator2)
127
run = result.testsRun
128
if _unavail: #skipped:
129
requested = list(_unavail.keys())
131
self.stream.writeln("Ran %d test%s in %.3fs (%s module%s skipped)" %
132
(run, run != 1 and "s" or "", timeTaken,
134
len(skipped) != 1 and "s" or ""))
135
self.stream.writeln("Unavailable resources: %s" % ", ".join(requested))
137
self.stream.writeln("Ran %d test%s in %.3fs" %
138
(run, run != 1 and "s" or "", timeTaken))
139
self.stream.writeln()
140
if not result.wasSuccessful():
141
self.stream.write("FAILED (")
142
failed, errored = map(len, (result.failures, result.errors))
144
self.stream.write("failures=%d" % failed)
146
if failed: self.stream.write(", ")
147
self.stream.write("errors=%d" % errored)
148
self.stream.writeln(")")
150
self.stream.writeln("OK")
156
opts, args = getopt.getopt(sys.argv[1:], "rqvu:x:")
163
for flag, value in opts:
171
except AttributeError:
172
print("-r flag requires Python debug build", file=sys.stderr)
176
use_resources.extend(value.split(","))
178
exclude.extend(value.split(","))
184
for package in packages:
185
run_tests(package, mask, verbosity, search_leaks, exclude)
188
def run_tests(package, mask, verbosity, search_leaks, exclude):
189
skipped, testcases = get_tests(package, mask, verbosity, exclude)
190
runner = TestRunner(verbosity=verbosity)
192
suites = [unittest.makeSuite(o) for o in testcases]
193
suite = unittest.TestSuite(suites)
194
result = runner.run(suite, skipped)
197
# hunt for refcount leaks
198
runner = BasicTestRunner()
200
test_with_refcounts(runner, verbosity, t)
202
return bool(result.errors)
204
class BasicTestRunner:
206
result = unittest.TestResult()