~ubuntu-branches/ubuntu/maverick/python3.1/maverick

« back to all changes in this revision

Viewing changes to Lib/ctypes/test/__init__.py

  • Committer: Bazaar Package Importer
  • Author(s): Matthias Klose
  • Date: 2009-03-23 00:01:27 UTC
  • Revision ID: james.westby@ubuntu.com-20090323000127-5fstfxju4ufrhthq
Tags: upstream-3.1~a1+20090322
ImportĀ upstreamĀ versionĀ 3.1~a1+20090322

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
import os, sys, unittest, getopt, time
 
2
 
 
3
use_resources = []
 
4
 
 
5
class ResourceDenied(Exception):
 
6
    """Test skipped because it requested a disallowed resource.
 
7
 
 
8
    This is raised when a test calls requires() for a resource that
 
9
    has not be enabled.  Resources are defined by test modules.
 
10
    """
 
11
 
 
12
def is_resource_enabled(resource):
 
13
    """Test whether a resource is enabled.
 
14
 
 
15
    If the caller's module is __main__ then automatically return True."""
 
16
    if sys._getframe().f_back.f_globals.get("__name__") == "__main__":
 
17
        return True
 
18
    result = use_resources is not None and \
 
19
           (resource in use_resources or "*" in use_resources)
 
20
    if not result:
 
21
        _unavail[resource] = None
 
22
    return result
 
23
 
 
24
_unavail = {}
 
25
def requires(resource, msg=None):
 
26
    """Raise ResourceDenied if the specified resource is not available.
 
27
 
 
28
    If the caller's module is __main__ then automatically return True."""
 
29
    # see if the caller's module is __main__ - if so, treat as if
 
30
    # the resource was set
 
31
    if sys._getframe().f_back.f_globals.get("__name__") == "__main__":
 
32
        return
 
33
    if not is_resource_enabled(resource):
 
34
        if msg is None:
 
35
            msg = "Use of the `%s' resource not enabled" % resource
 
36
        raise ResourceDenied(msg)
 
37
 
 
38
def find_package_modules(package, mask):
 
39
    import fnmatch
 
40
    if (hasattr(package, "__loader__") and
 
41
            hasattr(package.__loader__, '_files')):
 
42
        path = package.__name__.replace(".", os.path.sep)
 
43
        mask = os.path.join(path, mask)
 
44
        for fnm in package.__loader__._files.keys():
 
45
            if fnmatch.fnmatchcase(fnm, mask):
 
46
                yield os.path.splitext(fnm)[0].replace(os.path.sep, ".")
 
47
    else:
 
48
        path = package.__path__[0]
 
49
        for fnm in os.listdir(path):
 
50
            if fnmatch.fnmatchcase(fnm, mask):
 
51
                yield "%s.%s" % (package.__name__, os.path.splitext(fnm)[0])
 
52
 
 
53
def get_tests(package, mask, verbosity, exclude=()):
 
54
    """Return a list of skipped test modules, and a list of test cases."""
 
55
    tests = []
 
56
    skipped = []
 
57
    for modname in find_package_modules(package, mask):
 
58
        if modname.split(".")[-1] in exclude:
 
59
            skipped.append(modname)
 
60
            if verbosity > 1:
 
61
                print >> sys.stderr, "Skipped %s: excluded" % modname
 
62
            continue
 
63
        try:
 
64
            mod = __import__(modname, globals(), locals(), ['*'])
 
65
        except ResourceDenied as detail:
 
66
            skipped.append(modname)
 
67
            if verbosity > 1:
 
68
                print("Skipped %s: %s" % (modname, detail), file=sys.stderr)
 
69
            continue
 
70
        for name in dir(mod):
 
71
            if name.startswith("_"):
 
72
                continue
 
73
            o = getattr(mod, name)
 
74
            if type(o) is type(unittest.TestCase) and issubclass(o, unittest.TestCase):
 
75
                tests.append(o)
 
76
    return skipped, tests
 
77
 
 
78
def usage():
 
79
    print(__doc__)
 
80
    return 1
 
81
 
 
82
def test_with_refcounts(runner, verbosity, testcase):
 
83
    """Run testcase several times, tracking reference counts."""
 
84
    import gc
 
85
    import ctypes
 
86
    ptc = ctypes._pointer_type_cache.copy()
 
87
    cfc = ctypes._c_functype_cache.copy()
 
88
    wfc = ctypes._win_functype_cache.copy()
 
89
 
 
90
    # when searching for refcount leaks, we have to manually reset any
 
91
    # caches that ctypes has.
 
92
    def cleanup():
 
93
        ctypes._pointer_type_cache = ptc.copy()
 
94
        ctypes._c_functype_cache = cfc.copy()
 
95
        ctypes._win_functype_cache = wfc.copy()
 
96
        gc.collect()
 
97
 
 
98
    test = unittest.makeSuite(testcase)
 
99
    for i in range(5):
 
100
        rc = sys.gettotalrefcount()
 
101
        runner.run(test)
 
102
        cleanup()
 
103
    COUNT = 5
 
104
    refcounts = [None] * COUNT
 
105
    for i in range(COUNT):
 
106
        rc = sys.gettotalrefcount()
 
107
        runner.run(test)
 
108
        cleanup()
 
109
        refcounts[i] = sys.gettotalrefcount() - rc
 
110
    if filter(None, refcounts):
 
111
        print("%s leaks:\n\t" % testcase, refcounts)
 
112
    elif verbosity:
 
113
        print("%s: ok." % testcase)
 
114
 
 
115
class TestRunner(unittest.TextTestRunner):
 
116
    def run(self, test, skipped):
 
117
        "Run the given test case or test suite."
 
118
        # Same as unittest.TextTestRunner.run, except that it reports
 
119
        # skipped tests.
 
120
        result = self._makeResult()
 
121
        startTime = time.time()
 
122
        test(result)
 
123
        stopTime = time.time()
 
124
        timeTaken = stopTime - startTime
 
125
        result.printErrors()
 
126
        self.stream.writeln(result.separator2)
 
127
        run = result.testsRun
 
128
        if _unavail: #skipped:
 
129
            requested = list(_unavail.keys())
 
130
            requested.sort()
 
131
            self.stream.writeln("Ran %d test%s in %.3fs (%s module%s skipped)" %
 
132
                                (run, run != 1 and "s" or "", timeTaken,
 
133
                                 len(skipped),
 
134
                                 len(skipped) != 1 and "s" or ""))
 
135
            self.stream.writeln("Unavailable resources: %s" % ", ".join(requested))
 
136
        else:
 
137
            self.stream.writeln("Ran %d test%s in %.3fs" %
 
138
                                (run, run != 1 and "s" or "", timeTaken))
 
139
        self.stream.writeln()
 
140
        if not result.wasSuccessful():
 
141
            self.stream.write("FAILED (")
 
142
            failed, errored = map(len, (result.failures, result.errors))
 
143
            if failed:
 
144
                self.stream.write("failures=%d" % failed)
 
145
            if errored:
 
146
                if failed: self.stream.write(", ")
 
147
                self.stream.write("errors=%d" % errored)
 
148
            self.stream.writeln(")")
 
149
        else:
 
150
            self.stream.writeln("OK")
 
151
        return result
 
152
 
 
153
 
 
154
def main(*packages):
 
155
    try:
 
156
        opts, args = getopt.getopt(sys.argv[1:], "rqvu:x:")
 
157
    except getopt.error:
 
158
        return usage()
 
159
 
 
160
    verbosity = 1
 
161
    search_leaks = False
 
162
    exclude = []
 
163
    for flag, value in opts:
 
164
        if flag == "-q":
 
165
            verbosity -= 1
 
166
        elif flag == "-v":
 
167
            verbosity += 1
 
168
        elif flag == "-r":
 
169
            try:
 
170
                sys.gettotalrefcount
 
171
            except AttributeError:
 
172
                print("-r flag requires Python debug build", file=sys.stderr)
 
173
                return -1
 
174
            search_leaks = True
 
175
        elif flag == "-u":
 
176
            use_resources.extend(value.split(","))
 
177
        elif flag == "-x":
 
178
            exclude.extend(value.split(","))
 
179
 
 
180
    mask = "test_*.py"
 
181
    if args:
 
182
        mask = args[0]
 
183
 
 
184
    for package in packages:
 
185
        run_tests(package, mask, verbosity, search_leaks, exclude)
 
186
 
 
187
 
 
188
def run_tests(package, mask, verbosity, search_leaks, exclude):
 
189
    skipped, testcases = get_tests(package, mask, verbosity, exclude)
 
190
    runner = TestRunner(verbosity=verbosity)
 
191
 
 
192
    suites = [unittest.makeSuite(o) for o in testcases]
 
193
    suite = unittest.TestSuite(suites)
 
194
    result = runner.run(suite, skipped)
 
195
 
 
196
    if search_leaks:
 
197
        # hunt for refcount leaks
 
198
        runner = BasicTestRunner()
 
199
        for t in testcases:
 
200
            test_with_refcounts(runner, verbosity, t)
 
201
 
 
202
    return bool(result.errors)
 
203
 
 
204
class BasicTestRunner:
 
205
    def run(self, test):
 
206
        result = unittest.TestResult()
 
207
        test(result)
 
208
        return result