# ------------------------------------------------------------------
#
# Copyright (C) 2013 Canonical Ltd.
# Author: Steve Beattie <steve@nxnw.org>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of version 2 of the GNU General Public
# License published by the Free Software Foundation.
#
# ------------------------------------------------------------------
21
# Return code that AATestTemplate.run_cmd() reports when a command times out.
TIMEOUT_ERROR_CODE = 152
# Default relative path of the parser; presumably the apparmor_parser
# executable under test (verify against the callers).
DEFAULT_PARSER = '../apparmor_parser'
25
# http://www.chiark.greenend.org.uk/ucgi/~cjwatson/blosxom/2009-07-02-python-sigpipe.html
# This is needed so that the subprocesses that produce endless output
# actually quit when the reader goes away.
def subprocess_setup():
    '''Reset SIGPIPE to its default disposition.

       Passed as preexec_fn to subprocess.Popen() by
       AATestTemplate.run_cmd(), so it runs in the child before exec.'''
    # Python installs a SIGPIPE handler by default. This is usually not
    # what non-Python subprocesses expect.
    signal.signal(signal.SIGPIPE, signal.SIG_DFL)
34
class AANoCleanupMetaClass(type):
35
def __new__(cls, name, bases, attrs):
37
for attr_name, attr_value in attrs.items():
38
if attr_name.startswith("test_"):
39
attrs[attr_name] = cls.keep_on_fail(attr_value)
40
return super(AANoCleanupMetaClass, cls).__new__(cls, name, bases, attrs)
43
def keep_on_fail(cls, unittest_func):
44
'''wrapping function for unittest testcases to detect failure
45
and leave behind test files in tearDown(); to be used as
48
def new_unittest_func(self):
50
return unittest_func(self)
51
except unittest.SkipTest:
54
self.do_cleanup = False
57
return new_unittest_func
60
class AATestTemplate(unittest.TestCase, metaclass=AANoCleanupMetaClass):
    '''Stub class for use by test scripts'''

    # The metaclass wrapper sets this to False on the instance when a test
    # fails. NOTE(review): the True default is reconstructed from that use;
    # confirm against the tearDown() implementations that read it.
    do_cleanup = True

    def run_cmd_check(self, command, input=None, stderr=subprocess.STDOUT, stdout=subprocess.PIPE,
                      stdin=None, timeout=120, expected_rc=0, expected_string=None):
        '''Wrapper around run_cmd that checks the rc code against
           expected_rc and for expected strings in the output if
           passed. The valgrind tests generally don't care what the
           rc is as long as it's not a specific set of return codes,
           so can't push the check directly into run_cmd().

           Returns the combined output reported by run_cmd().'''
        rc, report = self.run_cmd(command, input, stderr, stdout, stdin, timeout)
        self.assertEqual(rc, expected_rc, "Got return code %d, expected %d\nCommand run: %s\nOutput: %s" % (rc, expected_rc, (' '.join(command)), report))
        # Only look for a message when the caller asked for one;
        # assertIn(None, report) would raise TypeError.
        if expected_string:
            self.assertIn(expected_string, report, 'Expected message "%s", got: \n%s' % (expected_string, report))
        return report

    def run_cmd(self, command, input=None, stderr=subprocess.STDOUT, stdout=subprocess.PIPE,
                stdin=None, timeout=120):
        '''Try to execute given command (array) and return its stdout, or
           return a textual error if it failed.

           Returns [rc, report]: rc is the exit status of the command
           (TIMEOUT_ERROR_CODE if it ran longer than timeout seconds) and
           report is its decoded stdout followed by its decoded stderr.'''
        # NOTE(review): a line appears to have been lost just before this
        # print in the damaged source (possibly a verbosity guard); confirm.
        print('\n===> Running command: \'%s\'' % (' '.join(command)))

        try:
            sp = subprocess.Popen(command, stdin=stdin, stdout=stdout, stderr=stderr,
                                  close_fds=True, preexec_fn=subprocess_setup)
        except OSError as e:
            # The command could not be started at all. Report it as text, as
            # the docstring promises. NOTE(review): 127 (the shell's
            # "command not found" status) is a reconstructed choice; confirm.
            return [127, str(e)]

        timeout_communicate = TimeoutFunction(sp.communicate, timeout)
        out, outerr = (None, None)
        try:
            out, outerr = timeout_communicate(input)
            rc = sp.returncode
        except TimeoutFunctionException:
            # Make the "killed" in the message true, and reap the child.
            sp.kill()
            sp.wait()
            outerr = b'test timed out, killed'
            rc = TIMEOUT_ERROR_CODE

        # Handle redirection of stdout
        if out is None:
            out = b''
        # Handle redirection of stderr
        if outerr is None:
            outerr = b''

        report = out.decode('utf-8') + outerr.decode('utf-8')

        return [rc, report]
114
# Timeout handler using alarm() from John P. Speno's Pythonic Avocado
class TimeoutFunctionException(Exception):
    """Exception to raise on a timeout"""
120
class TimeoutFunction:
    '''Callable wrapper that aborts a function call after a timeout.

       Uses signal.alarm()/SIGALRM, so the timeout is in whole seconds and
       the wrapper must be called from the main thread (signal.signal()
       is only allowed there).'''

    def __init__(self, function, timeout):
        '''Remember the function to call and the timeout in seconds.'''
        self.timeout = timeout
        self.function = function

    def handle_timeout(self, signum, frame):
        '''SIGALRM handler: abort the running call.'''
        raise TimeoutFunctionException()

    def __call__(self, *args, **kwargs):
        '''Call the wrapped function with the given arguments and return
           its result.

           Raises TimeoutFunctionException if the call is still running
           when the alarm fires; any exception raised by the function
           itself propagates unchanged.'''
        old = signal.signal(signal.SIGALRM, self.handle_timeout)
        signal.alarm(self.timeout)
        try:
            return self.function(*args, **kwargs)
        finally:
            # Cancel the pending alarm before restoring the previous handler,
            # so a late SIGALRM can never reach the old (possibly default,
            # process-terminating) disposition.
            signal.alarm(0)
            signal.signal(signal.SIGALRM, old)
139
# NOTE(review): this function is damaged in the visible source. Indentation
# is stripped, bare integers (old line numbers) are interleaved, and several
# statements are absent: nothing here defines default_diff, last_stamp or i,
# and no loop header or return statement is visible. The comments below
# describe only what the surviving lines show.
def filesystem_time_resolution():
140
'''detect whether the filesystem stores subsecond timestamps'''
143
# Optimistic initial result. Presumably (has_subsecond_timestamps, delay in
# seconds); TODO confirm against the callers.
result = (True, default_diff)
145
# Scratch directory that holds the probe files.
tmp_dir = tempfile.mkdtemp(prefix='aa-caching-nanostamp-')
151
# Create probe file test.<i> and stat it through its open file descriptor.
with open(os.path.join(tmp_dir, 'test.%d' % i), 'w+') as f:
152
s = os.fstat(f.fileno())
154
# Same mtime as the previously recorded stamp: warn and switch the result
# to (False, 1.0).
if (s.st_mtime == last_stamp):
155
print('\n===> WARNING: TMPDIR lacks subsecond timestamp resolution, falling back to slower test')
156
result = (False, 1.0)
159
# Remember this mtime, then wait default_diff seconds.
last_stamp = s.st_mtime
160
time.sleep(default_diff)
164
# Remove the scratch directory if it still exists.
if os.path.exists(tmp_dir):
165
shutil.rmtree(tmp_dir)
170
def read_features_dir(path):
    '''Return the contents of the directory path flattened into one string.

       Every entry is rendered as "<name> {<contents>}" followed by a
       newline. <contents> is the text of a regular file, the recursive
       rendering of a subdirectory, or empty for anything else. Entries
       appear in os.listdir() order.

       Returns '' if path does not exist or is not a directory.'''
    # isdir() is already False for a path that does not exist.
    if not os.path.isdir(path):
        return ''

    parts = []
    for name in os.listdir(path):
        entry = os.path.join(path, name)
        parts.append('%s {' % name)
        if os.path.isfile(entry):
            with open(entry, 'r') as f:
                # don't need extra '\n' here as features file contains it
                parts.append(f.read())
        elif os.path.isdir(entry):
            parts.append(read_features_dir(entry))
        # NOTE(review): the statement closing the brace was lost in the
        # damaged source; '}\n' is reconstructed from the opening '%s {'
        # and the newline remark above. Confirm the exact text.
        parts.append('}\n')

    return ''.join(parts)
191
# NOTE(review): orphaned statement. The `def` line that owns this `return`
# and binds `path` is missing from the visible source.
# os.utime(path, None) sets the access and modification times of path to the
# current time and returns None.
return os.utime(path, None)
194
def write_file(directory, file, contents):
195
'''construct path, write contents to it, and return the constructed path'''
196
path = os.path.join(directory, file)
197
with open(path, 'w+') as f: