~apparmor-dev/apparmor/master

« back to all changes in this revision

Viewing changes to parser/tst/testlib.py

  • Committer: Steve Beattie
  • Date: 2019-02-19 09:38:13 UTC
  • Revision ID: sbeattie@ubuntu.com-20190219093813-ud526ee6hwn8nljz
The AppArmor project has been converted to git and is now hosted on
gitlab.

To get the converted repository, please do
  git clone https://gitlab.com/apparmor/apparmor

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
#!/usr/bin/env python3
2
 
# ------------------------------------------------------------------
3
 
#
4
 
#   Copyright (C) 2013 Canonical Ltd.
5
 
#   Author: Steve Beattie <steve@nxnw.org>
6
 
#
7
 
#   This program is free software; you can redistribute it and/or
8
 
#   modify it under the terms of version 2 of the GNU General Public
9
 
#   License published by the Free Software Foundation.
10
 
#
11
 
# ------------------------------------------------------------------
12
 
 
13
 
import os
14
 
import shutil
15
 
import signal
16
 
import subprocess
17
 
import tempfile
18
 
import time
19
 
import unittest
20
 
 
21
 
TIMEOUT_ERROR_CODE = 152
22
 
DEFAULT_PARSER = '../apparmor_parser'
23
 
 
24
 
 
25
 
# http://www.chiark.greenend.org.uk/ucgi/~cjwatson/blosxom/2009-07-02-python-sigpipe.html
26
 
# This is needed so that the subprocesses that produce endless output
27
 
# actually quit when the reader goes away.
28
 
def subprocess_setup():
29
 
    # Python installs a SIGPIPE handler by default. This is usually not
30
 
    # what non-Python subprocesses expect.
31
 
    signal.signal(signal.SIGPIPE, signal.SIG_DFL)
32
 
 
33
 
 
34
 
class AANoCleanupMetaClass(type):
35
 
    def __new__(cls, name, bases, attrs):
36
 
 
37
 
        for attr_name, attr_value in attrs.items():
38
 
            if attr_name.startswith("test_"):
39
 
                attrs[attr_name] = cls.keep_on_fail(attr_value)
40
 
        return super(AANoCleanupMetaClass, cls).__new__(cls, name, bases, attrs)
41
 
 
42
 
    @classmethod
43
 
    def keep_on_fail(cls, unittest_func):
44
 
        '''wrapping function for unittest testcases to detect failure
45
 
           and leave behind test files in tearDown(); to be used as
46
 
           a decorator'''
47
 
 
48
 
        def new_unittest_func(self):
49
 
            try:
50
 
                return unittest_func(self)
51
 
            except unittest.SkipTest:
52
 
                raise
53
 
            except Exception:
54
 
                self.do_cleanup = False
55
 
                raise
56
 
 
57
 
        return new_unittest_func
58
 
 
59
 
 
60
 
class AATestTemplate(unittest.TestCase, metaclass=AANoCleanupMetaClass):
61
 
    '''Stub class for use by test scripts'''
62
 
    debug = False
63
 
    do_cleanup = True
64
 
 
65
 
    def run_cmd_check(self, command, input=None, stderr=subprocess.STDOUT, stdout=subprocess.PIPE,
66
 
                      stdin=None, timeout=120, expected_rc=0, expected_string=None):
67
 
        '''Wrapper around run_cmd that checks the rc code against
68
 
           expected_rc and for expected strings in the output if
69
 
           passed. The valgrind tests generally don't care what the
70
 
           rc is as long as it's not a specific set of return codes,
71
 
           so can't push the check directly into run_cmd().'''
72
 
        rc, report = self.run_cmd(command, input, stderr, stdout, stdin, timeout)
73
 
        self.assertEqual(rc, expected_rc, "Got return code %d, expected %d\nCommand run: %s\nOutput: %s" % (rc, expected_rc, (' '.join(command)), report))
74
 
        if expected_string:
75
 
            self.assertIn(expected_string, report, 'Expected message "%s", got: \n%s' % (expected_string, report))
76
 
        return report
77
 
 
78
 
    def run_cmd(self, command, input=None, stderr=subprocess.STDOUT, stdout=subprocess.PIPE,
79
 
                stdin=None, timeout=120):
80
 
        '''Try to execute given command (array) and return its stdout, or
81
 
           return a textual error if it failed.'''
82
 
 
83
 
        if self.debug:
84
 
            print('\n===> Running command: \'%s\'' % (' '.join(command)))
85
 
 
86
 
        try:
87
 
            sp = subprocess.Popen(command, stdin=stdin, stdout=stdout, stderr=stderr,
88
 
                                  close_fds=True, preexec_fn=subprocess_setup)
89
 
        except OSError as e:
90
 
            return [127, str(e)]
91
 
 
92
 
        timeout_communicate = TimeoutFunction(sp.communicate, timeout)
93
 
        out, outerr = (None, None)
94
 
        try:
95
 
            out, outerr = timeout_communicate(input)
96
 
            rc = sp.returncode
97
 
        except TimeoutFunctionException as e:
98
 
            sp.terminate()
99
 
            outerr = b'test timed out, killed'
100
 
            rc = TIMEOUT_ERROR_CODE
101
 
 
102
 
        # Handle redirection of stdout
103
 
        if out is None:
104
 
            out = b''
105
 
        # Handle redirection of stderr
106
 
        if outerr is None:
107
 
            outerr = b''
108
 
 
109
 
        report = out.decode('utf-8') + outerr.decode('utf-8')
110
 
 
111
 
        return [rc, report]
112
 
 
113
 
 
114
 
# Timeout handler using alarm() from John P. Speno's Pythonic Avocado
115
 
class TimeoutFunctionException(Exception):
116
 
    """Exception to raise on a timeout"""
117
 
    pass
118
 
 
119
 
 
120
 
class TimeoutFunction:
121
 
    def __init__(self, function, timeout):
122
 
        self.timeout = timeout
123
 
        self.function = function
124
 
 
125
 
    def handle_timeout(self, signum, frame):
126
 
        raise TimeoutFunctionException()
127
 
 
128
 
    def __call__(self, *args, **kwargs):
129
 
        old = signal.signal(signal.SIGALRM, self.handle_timeout)
130
 
        signal.alarm(self.timeout)
131
 
        try:
132
 
            result = self.function(*args, **kwargs)
133
 
        finally:
134
 
            signal.signal(signal.SIGALRM, old)
135
 
        signal.alarm(0)
136
 
        return result
137
 
 
138
 
 
139
 
def filesystem_time_resolution():
140
 
    '''detect whether the filesystem stores subsecond timestamps'''
141
 
 
142
 
    default_diff = 0.1
143
 
    result = (True, default_diff)
144
 
 
145
 
    tmp_dir = tempfile.mkdtemp(prefix='aa-caching-nanostamp-')
146
 
    try:
147
 
        last_stamp = None
148
 
        for i in range(10):
149
 
            s = None
150
 
 
151
 
            with open(os.path.join(tmp_dir, 'test.%d' % i), 'w+') as f:
152
 
                s = os.fstat(f.fileno())
153
 
 
154
 
            if (s.st_mtime == last_stamp):
155
 
                print('\n===> WARNING: TMPDIR lacks subsecond timestamp resolution, falling back to slower test')
156
 
                result = (False, 1.0)
157
 
                break
158
 
 
159
 
            last_stamp = s.st_mtime
160
 
            time.sleep(default_diff)
161
 
    except:
162
 
        pass
163
 
    finally:
164
 
        if os.path.exists(tmp_dir):
165
 
            shutil.rmtree(tmp_dir)
166
 
 
167
 
    return result
168
 
 
169
 
 
170
 
def read_features_dir(path):
171
 
 
172
 
    result = ''
173
 
    if not os.path.exists(path) or not os.path.isdir(path):
174
 
        return result
175
 
 
176
 
    for name in os.listdir(path):
177
 
        entry = os.path.join(path, name)
178
 
        result += '%s {' % name
179
 
        if os.path.isfile(entry):
180
 
            with open(entry, 'r') as f:
181
 
                # don't need extra '\n' here as features file contains it
182
 
                result += '%s' % (f.read())
183
 
        elif os.path.isdir(entry):
184
 
            result += '%s' % (read_features_dir(entry))
185
 
        result += '}\n'
186
 
 
187
 
    return result
188
 
 
189
 
 
190
 
def touch(path):
191
 
    return os.utime(path, None)
192
 
 
193
 
 
194
 
def write_file(directory, file, contents):
195
 
    '''construct path, write contents to it, and return the constructed path'''
196
 
    path = os.path.join(directory, file)
197
 
    with open(path, 'w+') as f:
198
 
        f.write(contents)
199
 
    return path
200
 
 
201