~zulcss/samba/server-dailies-3.4

« back to all changes in this revision

Viewing changes to source3/stf/comfychair.py

  • Committer: Chuck Short
  • Date: 2010-09-28 20:38:39 UTC
  • Revision ID: zulcss@ubuntu.com-20100928203839-pgjulytsi9ue63x1
Initial version

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#! /usr/bin/env python
 
2
 
 
3
# Copyright (C) 2002, 2003 by Martin Pool <mbp@samba.org>
 
4
# Copyright (C) 2003 by Tim Potter <tpot@samba.org>
 
5
 
6
# This program is free software; you can redistribute it and/or
 
7
# modify it under the terms of the GNU General Public License as
 
8
# published by the Free Software Foundation; either version 3 of the
 
9
# License, or (at your option) any later version.
 
10
 
11
# This program is distributed in the hope that it will be useful, but
 
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
 
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 
14
# General Public License for more details.
 
15
 
16
# You should have received a copy of the GNU General Public License
 
17
# along with this program; if not, see <http://www.gnu.org/licenses/>.
 
18
 
 
19
"""comfychair: a Python-based instrument of software torture.
 
20
 
 
21
Copyright (C) 2002, 2003 by Martin Pool <mbp@samba.org>
 
22
Copyright (C) 2003 by Tim Potter <tpot@samba.org>
 
23
 
 
24
This is a test framework designed for testing programs written in
 
25
Python, or (through a fork/exec interface) any other language.
 
26
 
 
27
For more information, see the file README.comfychair.
 
28
 
 
29
To run a test suite based on ComfyChair, just run it as a program.
 
30
"""
 
31
 
 
32
import sys, re
 
33
 
 
34
 
 
35
class TestCase:
 
36
    """A base class for tests.  This class defines required functions which
 
37
    can optionally be overridden by subclasses.  It also provides some
 
38
    utility functions for"""
 
39
 
 
40
    def __init__(self):
 
41
        self.test_log = ""
 
42
        self.background_pids = []
 
43
        self._cleanups = []
 
44
        self._enter_rundir()
 
45
        self._save_environment()
 
46
        self.add_cleanup(self.teardown)
 
47
 
 
48
 
 
49
    # --------------------------------------------------
 
50
    # Save and restore directory
 
51
    def _enter_rundir(self):
 
52
        import os
 
53
        self.basedir = os.getcwd()
 
54
        self.add_cleanup(self._restore_directory)
 
55
        self.rundir = os.path.join(self.basedir,
 
56
                                   'testtmp', 
 
57
                                   self.__class__.__name__)
 
58
        self.tmpdir = os.path.join(self.rundir, 'tmp')
 
59
        os.system("rm -fr %s" % self.rundir)
 
60
        os.makedirs(self.tmpdir)
 
61
        os.system("mkdir -p %s" % self.rundir)
 
62
        os.chdir(self.rundir)
 
63
 
 
64
    def _restore_directory(self):
 
65
        import os
 
66
        os.chdir(self.basedir)
 
67
 
 
68
    # --------------------------------------------------
 
69
    # Save and restore environment
 
70
    def _save_environment(self):
 
71
        import os
 
72
        self._saved_environ = os.environ.copy()
 
73
        self.add_cleanup(self._restore_environment)
 
74
 
 
75
    def _restore_environment(self):
 
76
        import os
 
77
        os.environ.clear()
 
78
        os.environ.update(self._saved_environ)
 
79
 
 
80
    
 
81
    def setup(self):
 
82
        """Set up test fixture."""
 
83
        pass
 
84
 
 
85
    def teardown(self):
 
86
        """Tear down test fixture."""
 
87
        pass
 
88
 
 
89
    def runtest(self):
 
90
        """Run the test."""
 
91
        pass
 
92
 
 
93
 
 
94
    def add_cleanup(self, c):
 
95
        """Queue a cleanup to be run when the test is complete."""
 
96
        self._cleanups.append(c)
 
97
        
 
98
 
 
99
    def fail(self, reason = ""):
 
100
        """Say the test failed."""
 
101
        raise AssertionError(reason)
 
102
 
 
103
 
 
104
    #############################################################
 
105
    # Requisition methods
 
106
 
 
107
    def require(self, predicate, message):
 
108
        """Check a predicate for running this test.
 
109
 
 
110
If the predicate value is not true, the test is skipped with a message explaining
 
111
why."""
 
112
        if not predicate:
 
113
            raise NotRunError, message
 
114
 
 
115
    def require_root(self):
 
116
        """Skip this test unless run by root."""
 
117
        import os
 
118
        self.require(os.getuid() == 0,
 
119
                     "must be root to run this test")
 
120
 
 
121
    #############################################################
 
122
    # Assertion methods
 
123
 
 
124
    def assert_(self, expr, reason = ""):
 
125
        if not expr:
 
126
            raise AssertionError(reason)
 
127
 
 
128
    def assert_equal(self, a, b):
 
129
        if not a == b:
 
130
            raise AssertionError("assertEquals failed: %s" % `(a, b)`)
 
131
            
 
132
    def assert_notequal(self, a, b):
 
133
        if a == b:
 
134
            raise AssertionError("assertNotEqual failed: %s" % `(a, b)`)
 
135
 
 
136
    def assert_re_match(self, pattern, s):
 
137
        """Assert that a string matches a particular pattern
 
138
 
 
139
        Inputs:
 
140
          pattern      string: regular expression
 
141
          s            string: to be matched
 
142
 
 
143
        Raises:
 
144
          AssertionError if not matched
 
145
          """
 
146
        if not re.match(pattern, s):
 
147
            raise AssertionError("string does not match regexp\n"
 
148
                                 "    string: %s\n"
 
149
                                 "    re: %s" % (`s`, `pattern`))
 
150
 
 
151
    def assert_re_search(self, pattern, s):
 
152
        """Assert that a string *contains* a particular pattern
 
153
 
 
154
        Inputs:
 
155
          pattern      string: regular expression
 
156
          s            string: to be searched
 
157
 
 
158
        Raises:
 
159
          AssertionError if not matched
 
160
          """
 
161
        if not re.search(pattern, s):
 
162
            raise AssertionError("string does not contain regexp\n"
 
163
                                 "    string: %s\n"
 
164
                                 "    re: %s" % (`s`, `pattern`))
 
165
 
 
166
 
 
167
    def assert_no_file(self, filename):
 
168
        import os.path
 
169
        assert not os.path.exists(filename), ("file exists but should not: %s" % filename)
 
170
 
 
171
 
 
172
    #############################################################
 
173
    # Methods for running programs
 
174
 
 
175
    def runcmd_background(self, cmd):
 
176
        import os
 
177
        self.test_log = self.test_log + "Run in background:\n" + `cmd` + "\n"
 
178
        pid = os.fork()
 
179
        if pid == 0:
 
180
            # child
 
181
            try:
 
182
                os.execvp("/bin/sh", ["/bin/sh", "-c", cmd])
 
183
            finally:
 
184
                os._exit(127)
 
185
        self.test_log = self.test_log + "pid: %d\n" % pid
 
186
        return pid
 
187
 
 
188
 
 
189
    def runcmd(self, cmd, expectedResult = 0):
 
190
        """Run a command, fail if the command returns an unexpected exit
 
191
        code.  Return the output produced."""
 
192
        rc, output, stderr = self.runcmd_unchecked(cmd)
 
193
        if rc != expectedResult:
 
194
            raise AssertionError("""command returned %d; expected %s: \"%s\"
 
195
stdout:
 
196
%s
 
197
stderr:
 
198
%s""" % (rc, expectedResult, cmd, output, stderr))
 
199
 
 
200
        return output, stderr
 
201
 
 
202
 
 
203
    def run_captured(self, cmd):
 
204
        """Run a command, capturing stdout and stderr.
 
205
 
 
206
        Based in part on popen2.py
 
207
 
 
208
        Returns (waitstatus, stdout, stderr)."""
 
209
        import os, types
 
210
        pid = os.fork()
 
211
        if pid == 0:
 
212
            # child
 
213
            try: 
 
214
                pid = os.getpid()
 
215
                openmode = os.O_WRONLY|os.O_CREAT|os.O_TRUNC
 
216
 
 
217
                outfd = os.open('%d.out' % pid, openmode, 0666)
 
218
                os.dup2(outfd, 1)
 
219
                os.close(outfd)
 
220
 
 
221
                errfd = os.open('%d.err' % pid, openmode, 0666)
 
222
                os.dup2(errfd, 2)
 
223
                os.close(errfd)
 
224
 
 
225
                if isinstance(cmd, types.StringType):
 
226
                    cmd = ['/bin/sh', '-c', cmd]
 
227
 
 
228
                os.execvp(cmd[0], cmd)
 
229
            finally:
 
230
                os._exit(127)
 
231
        else:
 
232
            # parent
 
233
            exited_pid, waitstatus = os.waitpid(pid, 0)
 
234
            stdout = open('%d.out' % pid).read()
 
235
            stderr = open('%d.err' % pid).read()
 
236
            return waitstatus, stdout, stderr
 
237
 
 
238
 
 
239
    def runcmd_unchecked(self, cmd, skip_on_noexec = 0):
 
240
        """Invoke a command; return (exitcode, stdout, stderr)"""
 
241
        import os
 
242
        waitstatus, stdout, stderr = self.run_captured(cmd)
 
243
        assert not os.WIFSIGNALED(waitstatus), \
 
244
               ("%s terminated with signal %d" % (`cmd`, os.WTERMSIG(waitstatus)))
 
245
        rc = os.WEXITSTATUS(waitstatus)
 
246
        self.test_log = self.test_log + ("""Run command: %s
 
247
Wait status: %#x (exit code %d, signal %d)
 
248
stdout:
 
249
%s
 
250
stderr:
 
251
%s""" % (cmd, waitstatus, os.WEXITSTATUS(waitstatus), os.WTERMSIG(waitstatus),
 
252
         stdout, stderr))
 
253
        if skip_on_noexec and rc == 127:
 
254
            # Either we could not execute the command or the command
 
255
            # returned exit code 127.  According to system(3) we can't
 
256
            # tell the difference.
 
257
            raise NotRunError, "could not execute %s" % `cmd`
 
258
        return rc, stdout, stderr
 
259
    
 
260
 
 
261
    def explain_failure(self, exc_info = None):
 
262
        print "test_log:"
 
263
        print self.test_log
 
264
 
 
265
 
 
266
    def log(self, msg):
 
267
        """Log a message to the test log.  This message is displayed if
 
268
        the test fails, or when the runtests function is invoked with
 
269
        the verbose option."""
 
270
        self.test_log = self.test_log + msg + "\n"
 
271
 
 
272
 
 
273
class NotRunError(Exception):
 
274
    """Raised if a test must be skipped because of missing resources"""
 
275
    def __init__(self, value = None):
 
276
        self.value = value
 
277
 
 
278
 
 
279
def _report_error(case, debugger):
 
280
    """Ask the test case to explain failure, and optionally run a debugger
 
281
 
 
282
    Input:
 
283
      case         TestCase instance
 
284
      debugger     if true, a debugger function to be applied to the traceback
 
285
"""
 
286
    import sys
 
287
    ex = sys.exc_info()
 
288
    print "-----------------------------------------------------------------"
 
289
    if ex:
 
290
        import traceback
 
291
        traceback.print_exc(file=sys.stdout)
 
292
    case.explain_failure()
 
293
    print "-----------------------------------------------------------------"
 
294
 
 
295
    if debugger:
 
296
        tb = ex[2]
 
297
        debugger(tb)
 
298
 
 
299
 
 
300
def runtests(test_list, verbose = 0, debugger = None):
 
301
    """Run a series of tests.
 
302
 
 
303
    Inputs:
 
304
      test_list    sequence of TestCase classes
 
305
      verbose      print more information as testing proceeds
 
306
      debugger     debugger object to be applied to errors
 
307
 
 
308
    Returns:
 
309
      unix return code: 0 for success, 1 for failures, 2 for test failure
 
310
    """
 
311
    import traceback
 
312
    ret = 0
 
313
    for test_class in test_list:
 
314
        print "%-30s" % _test_name(test_class),
 
315
        # flush now so that long running tests are easier to follow
 
316
        sys.stdout.flush()
 
317
 
 
318
        obj = None
 
319
        try:
 
320
            try: # run test and show result
 
321
                obj = test_class()
 
322
                obj.setup()
 
323
                obj.runtest()
 
324
                print "OK"
 
325
            except KeyboardInterrupt:
 
326
                print "INTERRUPT"
 
327
                _report_error(obj, debugger)
 
328
                ret = 2
 
329
                break
 
330
            except NotRunError, msg:
 
331
                print "NOTRUN, %s" % msg.value
 
332
            except:
 
333
                print "FAIL"
 
334
                _report_error(obj, debugger)
 
335
                ret = 1
 
336
        finally:
 
337
            while obj and obj._cleanups:
 
338
                try:
 
339
                    apply(obj._cleanups.pop())
 
340
                except KeyboardInterrupt:
 
341
                    print "interrupted during teardown"
 
342
                    _report_error(obj, debugger)
 
343
                    ret = 2
 
344
                    break
 
345
                except:
 
346
                    print "error during teardown"
 
347
                    _report_error(obj, debugger)
 
348
                    ret = 1
 
349
        # Display log file if we're verbose
 
350
        if ret == 0 and verbose:
 
351
            obj.explain_failure()
 
352
            
 
353
    return ret
 
354
 
 
355
 
 
356
def _test_name(test_class):
 
357
    """Return a human-readable name for a test class.
 
358
    """
 
359
    try:
 
360
        return test_class.__name__
 
361
    except:
 
362
        return `test_class`
 
363
 
 
364
 
 
365
def print_help():
 
366
    """Help for people running tests"""
 
367
    import sys
 
368
    print """%s: software test suite based on ComfyChair
 
369
 
 
370
usage:
 
371
    To run all tests, just run this program.  To run particular tests,
 
372
    list them on the command line.
 
373
 
 
374
options:
 
375
    --help              show usage message
 
376
    --list              list available tests
 
377
    --verbose, -v       show more information while running tests
 
378
    --post-mortem, -p   enter Python debugger on error
 
379
""" % sys.argv[0]
 
380
 
 
381
 
 
382
def print_list(test_list):
 
383
    """Show list of available tests"""
 
384
    for test_class in test_list:
 
385
        print "    %s" % _test_name(test_class)
 
386
 
 
387
 
 
388
def main(tests, extra_tests=[]):
 
389
    """Main entry point for test suites based on ComfyChair.
 
390
 
 
391
    inputs:
 
392
      tests       Sequence of TestCase subclasses to be run by default.
 
393
      extra_tests Sequence of TestCase subclasses that are available but
 
394
                  not run by default.
 
395
 
 
396
Test suites should contain this boilerplate:
 
397
 
 
398
    if __name__ == '__main__':
 
399
        comfychair.main(tests)
 
400
 
 
401
This function handles standard options such as --help and --list, and
 
402
by default runs all tests in the suggested order.
 
403
 
 
404
Calls sys.exit() on completion.
 
405
"""
 
406
    from sys import argv
 
407
    import getopt, sys
 
408
 
 
409
    opt_verbose = 0
 
410
    debugger = None
 
411
 
 
412
    opts, args = getopt.getopt(argv[1:], 'pv',
 
413
                               ['help', 'list', 'verbose', 'post-mortem'])
 
414
    for opt, opt_arg in opts:
 
415
        if opt == '--help':
 
416
            print_help()
 
417
            return
 
418
        elif opt == '--list':
 
419
            print_list(tests + extra_tests)
 
420
            return
 
421
        elif opt == '--verbose' or opt == '-v':
 
422
            opt_verbose = 1
 
423
        elif opt == '--post-mortem' or opt == '-p':
 
424
            import pdb
 
425
            debugger = pdb.post_mortem
 
426
 
 
427
    if args:
 
428
        all_tests = tests + extra_tests
 
429
        by_name = {}
 
430
        for t in all_tests:
 
431
            by_name[_test_name(t)] = t
 
432
        which_tests = []
 
433
        for name in args:
 
434
            which_tests.append(by_name[name])
 
435
    else:
 
436
        which_tests = tests
 
437
 
 
438
    sys.exit(runtests(which_tests, verbose=opt_verbose,
 
439
                      debugger=debugger))
 
440
 
 
441
 
 
442
if __name__ == '__main__':
 
443
    print __doc__