1
"""Profiling support for unit and performance tests.
3
These are special purpose profiling methods which operate
4
in a more fine-grained way than nose's profiling plugin.
10
from test.lib.util import gc_collect, decorator
11
from test.lib import testing
12
from nose import SkipTest
16
from sqlalchemy import util
21
from sqlalchemy.util.compat import jython, pypy, win32
23
from test.lib.requires import _has_cextensions
24
_has_cext = _has_cextensions()
26
def profiled(target=None, **target_opts):
27
"""Function profiling.
31
@profiled('label', report=True, sort=('calls',), limit=20)
33
Enables profiling for a function when 'label' is targeted for
34
profiling. Report options can be supplied, and override the global
35
configuration and command-line options.
38
profile_config = {'targets': set(),
40
'print_callers': False,
41
'print_callees': False,
43
'sort': ('time', 'calls'),
46
target = 'anonymous_target'
48
filename = "%s.prof" % target
51
def decorate(fn, *args, **kw):
52
elapsed, load_stats, result = _profile(
53
filename, fn, *args, **kw)
55
graphic = target_opts.get('graphic', profile_config['graphic'])
57
os.system("runsnake %s" % filename)
59
report = target_opts.get('report', profile_config['report'])
61
sort_ = target_opts.get('sort', profile_config['sort'])
62
limit = target_opts.get('limit', profile_config['limit'])
63
print ("Profile report for target '%s' (%s)" % (
68
stats.sort_stats(*sort_)
70
stats.print_stats(limit)
74
print_callers = target_opts.get('print_callers',
75
profile_config['print_callers'])
79
print_callees = target_opts.get('print_callees',
80
profile_config['print_callees'])
89
class ProfileStatsFile(object):
90
""""Store per-platform/fn profiling results in a file.
92
We're still targeting Py2.5, 2.4 on 0.7 with no dependencies,
93
so no json lib :( need to roll something silly
97
from test.bootstrap.config import options
98
self.write = options.write_profiles
99
dirname, fname = os.path.split(__file__)
100
self.short_fname = "profiles.txt"
101
self.fname = os.path.join(dirname, self.short_fname)
102
self.data = util.defaultdict(lambda: util.defaultdict(dict))
105
# rewrite for the case where features changed,
109
@util.memoized_property
def platform_key(self):
    """Return the string key identifying the current test environment.

    The key is the underscore-joined sequence of:

    * the running interpreter's ``major.minor`` version,
    * ``testing.db.name`` and ``testing.db.driver``,
    * ``jython``, ``pypy`` and/or ``win`` for each of the corresponding
      platform flags that is set,
    * ``cextensions`` or ``nocextensions`` depending on ``_has_cext``.

    The value is memoized, so it is computed once per instance.
    """
    dbapi_key = testing.db.name + "_" + testing.db.driver

    # keep it at 2.7, 3.1, 3.2, etc. for now.
    py_version = '.'.join([str(v) for v in sys.version_info[0:2]])

    platform_tokens = [py_version]
    platform_tokens.append(dbapi_key)

    # Each runtime token applies only when its flag is set; appending
    # them unconditionally would give every environment the same
    # "..._jython_pypy_win_..." key.
    if jython:
        platform_tokens.append("jython")
    if pypy:
        platform_tokens.append("pypy")
    if win32:
        platform_tokens.append("win")

    # The and/or form is safe here because "cextensions" is always truthy.
    platform_tokens.append(_has_cext and "cextensions" or "nocextensions")
    return "_".join(platform_tokens)
129
test_key = testing.current_test
130
return test_key in self.data and self.platform_key in self.data[test_key]
132
def result(self, callcount):
133
test_key = testing.current_test
134
per_fn = self.data[test_key]
135
per_platform = per_fn[self.platform_key]
137
if 'counts' not in per_platform:
138
per_platform['counts'] = counts = []
140
counts = per_platform['counts']
142
if 'current_count' not in per_platform:
143
per_platform['current_count'] = current_count = 0
145
current_count = per_platform['current_count']
147
has_count = len(counts) > current_count
150
counts.append(callcount)
155
result = per_platform['lineno'], counts[current_count]
156
per_platform['current_count'] += 1
163
"# This file is written out on a per-environment basis.\n"\
164
"# For each test in aaa_profiling, the corresponding function and \n"\
165
"# environment is located within this file. If it doesn't exist,\n"\
166
"# the test is skipped.\n"\
167
"# If a callcount does exist, it is compared to what we received. \n"\
168
"# assertions are raised if the counts do not match.\n"\
170
"# To add a new callcount test, apply the function_call_count \n"\
171
"# decorator and re-run the tests using the --write-profiles option - \n"\
172
"# this file will be rewritten including the new count.\n"\
177
profile_f = open(self.fname)
178
for lineno, line in enumerate(profile_f):
180
if not line or line.startswith("#"):
183
test_key, platform_key, counts = line.split()
184
per_fn = self.data[test_key]
185
per_platform = per_fn[platform_key]
186
per_platform['counts'] = [int(count) for count in counts.split(",")]
187
per_platform['lineno'] = lineno + 1
188
per_platform['current_count'] = 0
192
print("Writing profile file %s" % self.fname)
193
profile_f = open(self.fname, "w")
194
profile_f.write(self._header())
195
for test_key in sorted(self.data):
197
per_fn = self.data[test_key]
198
profile_f.write("\n# TEST: %s\n\n" % test_key)
199
for platform_key in sorted(per_fn):
200
per_platform = per_fn[platform_key]
204
platform_key, ",".join(str(count) for count in per_platform['counts'])
209
_profile_stats = ProfileStatsFile()
211
from sqlalchemy.util.compat import update_wrapper
213
def function_call_count(variance=0.05):
    """Assert a target for a test case's function call count.

    The main purpose of this assertion is to detect changes in
    callcounts for various functions - the actual number is not as important.
    Callcounts are stored in a file keyed to Python version and OS platform
    information. This file is generated automatically for new tests,
    and versioned so that unexpected changes in callcounts will be detected.

    :param variance: fraction of the measured call count by which the
      measured and the recorded count may differ before the test fails.
    :return: a decorator; the decorated test returns whatever the
      original test function returned.
    :raises SkipTest: (from the decorated test) when cProfile is not
      available, or when no recorded stats exist and profiles are not
      being written.
    :raises AssertionError: (from the decorated test) when the measured
      count is outside the allowed deviance from the recorded count.
    """

    def decorate(fn):
        def wrap(*args, **kw):
            if cProfile is None:
                raise SkipTest("cProfile is not installed")

            if not _profile_stats.has_stats() and not _profile_stats.write:
                raise SkipTest("No profiling stats available on this "
                            "platform for this function. Run tests with "
                            "--write-profiles to add statistics to %s for "
                            "this platform." % _profile_stats.short_fname)

            # Collect garbage up front so pending finalizers don't run
            # (and get counted) inside the profiled call.
            gc_collect()

            timespent, load_stats, fn_result = _profile(
                fn, *args, **kw)
            stats = load_stats()
            callcount = stats.total_calls

            # result() hands back either None (nothing to compare
            # against) or a (line number, expected count) pair.
            expected = _profile_stats.result(callcount)
            if expected is None:
                expected_count = None
            else:
                line_no, expected_count = expected

            print("Pstats calls: %d Expected %s" % (
                callcount, expected_count))
            stats.print_stats()

            # Without a recorded count there is nothing to assert;
            # comparing against None would raise TypeError.
            if expected_count is not None:
                deviance = int(callcount * variance)
                if abs(callcount - expected_count) > deviance:
                    raise AssertionError(
                        "Adjusted function call count %s not within %s%% "
                        "of expected %s. (Delete line %d of file %s to regenerate "
                        "this callcount, when tests are run with --write-profiles.)"
                        % (
                            callcount, (variance * 100),
                            expected_count, line_no,
                            _profile_stats.fname))
            return fn_result
        return update_wrapper(wrap, fn)
    return decorate
276
def _profile(fn, *args, **kw):
    """Run ``fn(*args, **kw)`` under cProfile.

    The profile data is dumped to ``<fn.__name__>.prof`` in the current
    working directory.

    Returns a 3-tuple ``(elapsed, load_stats, result)``:

    * ``elapsed`` - wall-clock seconds spent in the profiled call.
    * ``load_stats`` - zero-argument callable that loads the dump into
      a ``pstats.Stats`` object, deletes the dump file and returns the
      stats.  Because the file is deleted, call it at most once.
    * ``result`` - the value returned by ``fn``.

    Any exception raised by ``fn`` propagates to the caller.
    """
    import time

    filename = "%s.prof" % fn.__name__

    def load_stats():
        st = pstats.Stats(filename)
        # The Stats object holds the data in memory; don't leave the
        # dump file behind.
        os.unlink(filename)
        return st

    # Use an explicit namespace instead of locals(): the profiled
    # statement stores its result into the mapping it is given, and a
    # later locals() call is not guaranteed to return that same mapping
    # (it is a fresh snapshot on Python 3.13+).
    namespace = {'fn': fn, 'args': args, 'kw': kw}

    began = time.time()
    cProfile.runctx('result = fn(*args, **kw)', globals(), namespace,
                    filename=filename)
    ended = time.time()

    return ended - began, load_stats, namespace['result']