2
TestCmd.py: a testing framework for commands and scripts.
4
The TestCmd module provides a framework for portable automated testing
5
of executable commands and scripts (in any language, not just Python),
6
especially commands and scripts that require file system interaction.
8
In addition to running tests and evaluating conditions, the TestCmd
9
module manages and cleans up one or more temporary workspace
10
directories, and provides methods for creating files and directories in
11
those workspace directories from in-line data, here-documents), allowing
12
tests to be completely self-contained.
14
A TestCmd environment object is created via the usual invocation:
17
test = TestCmd.TestCmd()
19
There are a bunch of keyword arguments available at instantiation:
21
test = TestCmd.TestCmd(description = 'string',
22
program = 'program_or_script_to_test',
23
interpreter = 'script_interpreter',
27
match = default_match_function,
28
diff = default_diff_function,
31
There are a bunch of methods that let you do different things:
35
test.description_set('string')
37
test.program_set('program_or_script_to_test')
39
test.interpreter_set('script_interpreter')
40
test.interpreter_set(['script_interpreter', 'arg'])
42
test.workdir_set('prefix')
46
test.workpath('subdir', 'file')
48
test.subdir('subdir', ...)
50
test.rmdir('subdir', ...)
52
test.write('file', "contents\n")
53
test.write(['subdir', 'file'], "contents\n")
56
test.read(['subdir', 'file'])
57
test.read('file', mode)
58
test.read(['subdir', 'file'], mode)
60
test.writable('dir', 1)
61
test.writable('dir', None)
63
test.preserve(condition, ...)
65
test.cleanup(condition)
67
test.command_args(program = 'program_or_script_to_run',
68
interpreter = 'script_interpreter',
69
arguments = 'arguments to pass to program')
71
test.run(program = 'program_or_script_to_run',
72
interpreter = 'script_interpreter',
73
arguments = 'arguments to pass to program',
74
chdir = 'directory_to_chdir_to',
75
stdin = 'input to feed to the program\n')
76
universal_newlines = True)
78
p = test.start(program = 'program_or_script_to_run',
79
interpreter = 'script_interpreter',
80
arguments = 'arguments to pass to program',
81
universal_newlines = None)
86
test.pass_test(condition)
87
test.pass_test(condition, function)
90
test.fail_test(condition)
91
test.fail_test(condition, function)
92
test.fail_test(condition, function, skip)
95
test.no_result(condition)
96
test.no_result(condition, function)
97
test.no_result(condition, function, skip)
105
test.symlink(target, link)
108
test.banner(string, width)
110
test.diff(actual, expected)
112
test.match(actual, expected)
114
test.match_exact("actual 1\nactual 2\n", "expected 1\nexpected 2\n")
115
test.match_exact(["actual 1\n", "actual 2\n"],
116
["expected 1\n", "expected 2\n"])
118
test.match_re("actual 1\nactual 2\n", regex_string)
119
test.match_re(["actual 1\n", "actual 2\n"], list_of_regexes)
121
test.match_re_dotall("actual 1\nactual 2\n", regex_string)
122
test.match_re_dotall(["actual 1\n", "actual 2\n"], list_of_regexes)
125
test.tempdir('temporary-directory')
131
test.where_is('foo', 'PATH1:PATH2')
132
test.where_is('foo', 'PATH1;PATH2', '.suffix3;.suffix4')
135
test.unlink('subdir', 'file')
137
The TestCmd module provides pass_test(), fail_test(), and no_result()
138
unbound functions that report test results for use with the Aegis change
139
management system. These methods terminate the test immediately,
140
reporting PASSED, FAILED, or NO RESULT respectively, and exiting with
141
status 0 (success), 1 or 2 respectively. This allows for a distinction
142
between an actual failed test and a test that could not be properly
143
evaluated because of an external condition (such as a full file system
144
or incorrect permissions).
149
TestCmd.pass_test(condition)
150
TestCmd.pass_test(condition, function)
153
TestCmd.fail_test(condition)
154
TestCmd.fail_test(condition, function)
155
TestCmd.fail_test(condition, function, skip)
158
TestCmd.no_result(condition)
159
TestCmd.no_result(condition, function)
160
TestCmd.no_result(condition, function, skip)
162
The TestCmd module also provides unbound functions that handle matching
163
in the same way as the match_*() methods described above.
167
test = TestCmd.TestCmd(match = TestCmd.match_exact)
169
test = TestCmd.TestCmd(match = TestCmd.match_re)
171
test = TestCmd.TestCmd(match = TestCmd.match_re_dotall)
173
The TestCmd module provides unbound functions that can be used for the
174
"diff" argument to TestCmd.TestCmd instantiation:
178
test = TestCmd.TestCmd(match = TestCmd.match_re,
179
diff = TestCmd.diff_re)
181
test = TestCmd.TestCmd(diff = TestCmd.simple_diff)
183
The "diff" argument can also be used with standard difflib functions:
187
test = TestCmd.TestCmd(diff = difflib.context_diff)
189
test = TestCmd.TestCmd(diff = difflib.unified_diff)
191
Lastly, the where_is() method also exists in an unbound function
196
TestCmd.where_is('foo')
197
TestCmd.where_is('foo', 'PATH1:PATH2')
198
TestCmd.where_is('foo', 'PATH1;PATH2', '.suffix3;.suffix4')
201
# Copyright 2000-2010 Steven Knight
202
# This module is free software, and you may redistribute it and/or modify
203
# it under the same terms as Python itself, so long as this copyright message
204
# and disclaimer are retained in their original form.
206
# IN NO EVENT SHALL THE AUTHOR BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
207
# SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF
208
# THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH
211
# THE AUTHOR SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
212
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
213
# PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS,
214
# AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
215
# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
217
__author__ = "Steven Knight <knight at baldmt dot com>"
218
__revision__ = "TestCmd.py 0.37.D001 2010/01/11 16:55:50 knight"
250
__all__.append('simple_diff')
253
return type(e) is types.ListType \
254
or isinstance(e, UserList.UserList)
257
from UserString import UserString
262
if hasattr(types, 'UnicodeType'):
264
return type(e) is types.StringType \
265
or type(e) is types.UnicodeType \
266
or isinstance(e, UserString)
269
return type(e) is types.StringType or isinstance(e, UserString)
271
tempfile.template = 'testcmd.'
272
if os.name in ('posix', 'nt'):
273
tempfile.template = 'testcmd.' + str(os.getpid()) + '.'
275
tempfile.template = 'testcmd.'
277
re_space = re.compile('\s')
281
_chain_to_exitfunc = None
285
cleanlist = filter(None, _Cleanup)
288
for test in cleanlist:
290
if _chain_to_exitfunc:
296
# TODO(1.5): atexit requires python 2.0, so chain sys.exitfunc
298
_chain_to_exitfunc = sys.exitfunc
299
except AttributeError:
301
sys.exitfunc = _clean
303
atexit.register(_clean)
310
for i in xrange(min(map(len, lists))):
311
result.append(tuple(map(lambda l, i=i: l[i], lists)))
315
def __init__(self, top):
317
def __call__(self, arg, dirname, names):
318
pathjoin = lambda n, d=dirname: os.path.join(d, n)
319
self.entries.extend(map(pathjoin, names))
321
def _caller(tblist, skip):
324
for file, line, name, text in tblist:
325
if file[-10:] == "TestCmd.py":
327
arr = [(file, line, name, text)] + arr
329
for file, line, name, text in arr[skip:]:
330
if name in ("?", "<module>"):
333
name = " (" + name + ")"
334
string = string + ("%s line %d of %s%s\n" % (atfrom, line, file, name))
338
def fail_test(self = None, condition = 1, function = None, skip = 0):
339
"""Cause the test to fail.
341
By default, the fail_test() method reports that the test FAILED
342
and exits with a status of 1. If a condition argument is supplied,
343
the test fails only if the condition is true.
347
if not function is None:
354
of = " of " + self.program
357
desc = " [" + self.description + "]"
360
at = _caller(traceback.extract_stack(), skip)
361
sys.stderr.write("FAILED test" + of + desc + sep + at)
365
def no_result(self = None, condition = 1, function = None, skip = 0):
366
"""Causes a test to exit with no valid result.
368
By default, the no_result() method reports NO RESULT for the test
369
and exits with a status of 2. If a condition argument is supplied,
370
the test fails only if the condition is true.
374
if not function is None:
381
of = " of " + self.program
384
desc = " [" + self.description + "]"
387
at = _caller(traceback.extract_stack(), skip)
388
sys.stderr.write("NO RESULT for test" + of + desc + sep + at)
392
def pass_test(self = None, condition = 1, function = None):
393
"""Causes a test to pass.
395
By default, the pass_test() method reports PASSED for the test
396
and exits with a status of 0. If a condition argument is supplied,
397
the test passes only if the condition is true.
401
if not function is None:
403
sys.stderr.write("PASSED\n")
406
def match_exact(lines = None, matches = None):
409
if not is_List(lines):
410
lines = string.split(lines, "\n")
411
if not is_List(matches):
412
matches = string.split(matches, "\n")
413
if len(lines) != len(matches):
415
for i in range(len(lines)):
416
if lines[i] != matches[i]:
420
def match_re(lines = None, res = None):
423
if not is_List(lines):
424
lines = string.split(lines, "\n")
426
res = string.split(res, "\n")
427
if len(lines) != len(res):
429
for i in range(len(lines)):
430
s = "^" + res[i] + "$"
434
msg = "Regular expression error in %s: %s"
435
raise re.error, msg % (repr(s), e[0])
436
if not expr.search(lines[i]):
440
def match_re_dotall(lines = None, res = None):
443
if not type(lines) is type(""):
444
lines = string.join(lines, "\n")
445
if not type(res) is type(""):
446
res = string.join(res, "\n")
449
expr = re.compile(s, re.DOTALL)
451
msg = "Regular expression error in %s: %s"
452
raise re.error, msg % (repr(s), e[0])
453
if expr.match(lines):
461
def simple_diff(a, b, fromfile='', tofile='',
462
fromfiledate='', tofiledate='', n=3, lineterm='\n'):
464
A function with the same calling signature as difflib.context_diff
465
(diff -c) and difflib.unified_diff (diff -u) but which prints
466
output like the simple, unadorned 'diff" command.
468
sm = difflib.SequenceMatcher(None, a, b)
470
return x1+1 == x2 and str(x2) or '%s,%s' % (x1+1, x2)
472
for op, a1, a2, b1, b2 in sm.get_opcodes():
474
result.append("%sd%d" % (comma(a1, a2), b1))
475
result.extend(map(lambda l: '< ' + l, a[a1:a2]))
477
result.append("%da%s" % (a1, comma(b1, b2)))
478
result.extend(map(lambda l: '> ' + l, b[b1:b2]))
479
elif op == 'replace':
480
result.append("%sc%s" % (comma(a1, a2), comma(b1, b2)))
481
result.extend(map(lambda l: '< ' + l, a[a1:a2]))
483
result.extend(map(lambda l: '> ' + l, b[b1:b2]))
486
def diff_re(a, b, fromfile='', tofile='',
487
fromfiledate='', tofiledate='', n=3, lineterm='\n'):
489
A simple "diff" of two sets of lines when the expected lines
490
are regular expressions. This is a really dumb thing that
491
just compares each line in turn, so it doesn't look for
492
chunks of matching lines and the like--but at least it lets
493
you know exactly which line first didn't compare correctl...
496
diff = len(a) - len(b)
502
for aline, bline in zip(a, b):
503
s = "^" + aline + "$"
507
msg = "Regular expression error in %s: %s"
508
raise re.error, msg % (repr(s), e[0])
509
if not expr.search(bline):
510
result.append("%sc%s" % (i+1, i+1))
511
result.append('< ' + repr(a[i]))
513
result.append('> ' + repr(b[i]))
517
if os.name == 'java':
519
python_executable = os.path.join(sys.prefix, 'jython')
523
python_executable = sys.executable
525
if sys.platform == 'win32':
527
default_sleep_seconds = 2
529
def where_is(file, path=None, pathext=None):
531
path = os.environ['PATH']
533
path = string.split(path, os.pathsep)
535
pathext = os.environ['PATHEXT']
536
if is_String(pathext):
537
pathext = string.split(pathext, os.pathsep)
539
if string.lower(ext) == string.lower(file[-len(ext):]):
543
f = os.path.join(dir, file)
546
if os.path.isfile(fext):
552
def where_is(file, path=None, pathext=None):
554
path = os.environ['PATH']
556
path = string.split(path, os.pathsep)
558
f = os.path.join(dir, file)
559
if os.path.isfile(f):
564
if stat.S_IMODE(st[stat.ST_MODE]) & 0111:
568
default_sleep_seconds = 1
575
# The subprocess module doesn't exist in this version of Python,
576
# so we're going to cobble up something that looks just enough
577
# like its API for our purposes below.
580
subprocess = new.module('subprocess')
582
subprocess.PIPE = 'PIPE'
583
subprocess.STDOUT = 'STDOUT'
584
subprocess.mswindows = (sys.platform == 'win32')
589
except AttributeError:
591
universal_newlines = 1
592
def __init__(self, command, **kw):
593
if sys.platform == 'win32' and command[0] == '"':
594
command = '"' + command + '"'
595
(stdin, stdout, stderr) = os.popen3(' ' + command)
599
def close_output(self):
601
self.resultcode = self.stderr.close()
603
resultcode = self.resultcode
604
if os.WIFEXITED(resultcode):
605
return os.WEXITSTATUS(resultcode)
606
elif os.WIFSIGNALED(resultcode):
607
return os.WTERMSIG(resultcode)
614
except AttributeError:
615
# A cribbed Popen4 class, with some retrofitted code from
616
# the Python 1.5 Popen3 class methods to do certain things
618
class Popen4(popen2.Popen3):
621
def __init__(self, cmd, bufsize=-1):
622
p2cread, p2cwrite = os.pipe()
623
c2pread, c2pwrite = os.pipe()
630
for i in range(3, popen2.MAXFD):
635
os.execvp(cmd[0], cmd)
638
# Shouldn't come here, I guess
641
self.tochild = os.fdopen(p2cwrite, 'w', bufsize)
643
self.fromchild = os.fdopen(c2pread, 'r', bufsize)
644
popen2._active.append(self)
646
popen2.Popen4 = Popen4
648
class Popen3(popen2.Popen3, popen2.Popen4):
649
universal_newlines = 1
650
def __init__(self, command, **kw):
651
if kw.get('stderr') == 'STDOUT':
652
apply(popen2.Popen4.__init__, (self, command, 1))
654
apply(popen2.Popen3.__init__, (self, command, 1))
655
self.stdin = self.tochild
656
self.stdout = self.fromchild
657
self.stderr = self.childerr
658
def wait(self, *args, **kw):
659
resultcode = apply(popen2.Popen3.wait, (self,)+args, kw)
660
if os.WIFEXITED(resultcode):
661
return os.WEXITSTATUS(resultcode)
662
elif os.WIFSIGNALED(resultcode):
663
return os.WTERMSIG(resultcode)
667
subprocess.Popen = Popen3
671
# From Josiah Carlson,
672
# ASPN : Python Cookbook : Module to allow Asynchronous subprocess use on Windows and Posix platforms
673
# http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/440554
675
PIPE = subprocess.PIPE
677
if subprocess.mswindows:
678
from win32file import ReadFile, WriteFile
679
from win32pipe import PeekNamedPipe
686
except AttributeError: fcntl.F_GETFL = 3
689
except AttributeError: fcntl.F_SETFL = 4
691
class Popen(subprocess.Popen):
692
def recv(self, maxsize=None):
693
return self._recv('stdout', maxsize)
695
def recv_err(self, maxsize=None):
696
return self._recv('stderr', maxsize)
698
def send_recv(self, input='', maxsize=None):
699
return self.send(input), self.recv(maxsize), self.recv_err(maxsize)
701
def get_conn_maxsize(self, which, maxsize):
706
return getattr(self, which), maxsize
708
def _close(self, which):
709
getattr(self, which).close()
710
setattr(self, which, None)
712
if subprocess.mswindows:
713
def send(self, input):
718
x = msvcrt.get_osfhandle(self.stdin.fileno())
719
(errCode, written) = WriteFile(x, input)
721
return self._close('stdin')
722
except (subprocess.pywintypes.error, Exception), why:
723
if why[0] in (109, errno.ESHUTDOWN):
724
return self._close('stdin')
729
def _recv(self, which, maxsize):
730
conn, maxsize = self.get_conn_maxsize(which, maxsize)
735
x = msvcrt.get_osfhandle(conn.fileno())
736
(read, nAvail, nMessage) = PeekNamedPipe(x, 0)
740
(errCode, read) = ReadFile(x, nAvail, None)
742
return self._close(which)
743
except (subprocess.pywintypes.error, Exception), why:
744
if why[0] in (109, errno.ESHUTDOWN):
745
return self._close(which)
748
#if self.universal_newlines:
749
# read = self._translate_newlines(read)
753
def send(self, input):
757
if not select.select([], [self.stdin], [], 0)[1]:
761
written = os.write(self.stdin.fileno(), input)
763
if why[0] == errno.EPIPE: #broken pipe
764
return self._close('stdin')
769
def _recv(self, which, maxsize):
770
conn, maxsize = self.get_conn_maxsize(which, maxsize)
775
flags = fcntl.fcntl(conn, fcntl.F_GETFL)
780
fcntl.fcntl(conn, fcntl.F_SETFL, flags| os.O_NONBLOCK)
783
if not select.select([conn], [], [], 0)[0]:
786
r = conn.read(maxsize)
788
return self._close(which)
790
#if self.universal_newlines:
791
# r = self._translate_newlines(r)
794
if not conn.closed and not flags is None:
795
fcntl.fcntl(conn, fcntl.F_SETFL, flags)
797
disconnect_message = "Other end disconnected!"
799
def recv_some(p, t=.1, e=1, tr=5, stderr=0):
808
while time.time() < x or r:
812
raise Exception(disconnect_message)
818
time.sleep(max((x-time.time())/tr, 0))
821
# TODO(3.0: rewrite to use memoryview()
822
def send_all(p, data):
826
raise Exception(disconnect_message)
827
data = buffer(data, sent)
839
class TestCmd(object):
843
def __init__(self, description = None,
852
universal_newlines = 1):
853
self._cwd = os.getcwd()
854
self.description_set(description)
855
self.program_set(program)
856
self.interpreter_set(interpreter)
859
verbose = max( 0, int(os.environ.get('TESTCMD_VERBOSE', 0)) )
862
self.verbose_set(verbose)
863
self.combine = combine
864
self.universal_newlines = universal_newlines
865
if not match is None:
866
self.match_function = match
868
self.match_function = match_re
870
self.diff_function = diff
877
self.diff_function = simple_diff
878
#self.diff_function = difflib.context_diff
879
#self.diff_function = difflib.unified_diff
881
self._preserve = {'pass_test': 0, 'fail_test': 0, 'no_result': 0}
882
if os.environ.has_key('PRESERVE') and not os.environ['PRESERVE'] is '':
883
self._preserve['pass_test'] = os.environ['PRESERVE']
884
self._preserve['fail_test'] = os.environ['PRESERVE']
885
self._preserve['no_result'] = os.environ['PRESERVE']
888
self._preserve['pass_test'] = os.environ['PRESERVE_PASS']
892
self._preserve['fail_test'] = os.environ['PRESERVE_FAIL']
896
self._preserve['no_result'] = os.environ['PRESERVE_NO_RESULT']
902
self.condition = 'no_result'
903
self.workdir_set(workdir)
910
return "%x" % id(self)
915
def banner(self, s, width=None):
917
width = self.banner_width
918
return s + self.banner_char * (width - len(s))
920
if os.name == 'posix':
922
def escape(self, arg):
923
"escape shell special characters"
927
arg = string.replace(arg, slash, slash+slash)
929
arg = string.replace(arg, c, slash+c)
931
if re_space.search(arg):
932
arg = '"' + arg + '"'
937
# Windows does not allow special characters in file names
938
# anyway, so no need for an escape function, we will just quote
940
def escape(self, arg):
941
if re_space.search(arg):
942
arg = '"' + arg + '"'
945
def canonicalize(self, path):
947
path = apply(os.path.join, tuple(path))
948
if not os.path.isabs(path):
949
path = os.path.join(self.workdir, path)
952
def chmod(self, path, mode):
953
"""Changes permissions on the specified file or directory
955
path = self.canonicalize(path)
958
def cleanup(self, condition = None):
959
"""Removes any temporary working directories for the specified
960
TestCmd environment. If the environment variable PRESERVE was
961
set when the TestCmd environment was created, temporary working
962
directories are not removed. If any of the environment variables
963
PRESERVE_PASS, PRESERVE_FAIL, or PRESERVE_NO_RESULT were set
964
when the TestCmd environment was created, then temporary working
965
directories are not removed if the test passed, failed, or had
966
no result, respectively. Temporary working directories are also
967
preserved for conditions specified via the preserve method.
969
Typically, this method is not called directly, but is used when
970
the script exits to clean up temporary working directories as
971
appropriate for the exit status.
973
if not self._dirlist:
977
if condition is None:
978
condition = self.condition
979
if self._preserve[condition]:
980
for dir in self._dirlist:
981
print "Preserved directory", dir
983
list = self._dirlist[:]
986
self.writable(dir, 1)
987
shutil.rmtree(dir, ignore_errors = 1)
992
_Cleanup.remove(self)
993
except (AttributeError, ValueError):
996
def command_args(self, program = None,
1000
if type(program) == type('') and not os.path.isabs(program):
1001
program = os.path.join(self._cwd, program)
1003
program = self.program
1005
interpreter = self.interpreter
1006
if not type(program) in [type([]), type(())]:
1010
if not type(interpreter) in [type([]), type(())]:
1011
interpreter = [interpreter]
1012
cmd = list(interpreter) + cmd
1014
if type(arguments) == type(''):
1015
arguments = string.split(arguments)
1016
cmd.extend(arguments)
1019
def description_set(self, description):
1020
"""Set the description of the functionality being tested.
1022
self.description = description
1027
def diff(self, a, b, name, *args, **kw):
1028
print self.banner('Expected %s' % name)
1030
print self.banner('Actual %s' % name)
1033
def diff(self, a, b, name, *args, **kw):
1034
print self.banner(name)
1035
args = (a.splitlines(), b.splitlines()) + args
1036
lines = apply(self.diff_function, args, kw)
1040
def fail_test(self, condition = 1, function = None, skip = 0):
1041
"""Cause the test to fail.
1045
self.condition = 'fail_test'
1046
fail_test(self = self,
1047
condition = condition,
1048
function = function,
1051
def interpreter_set(self, interpreter):
1052
"""Set the program to be used to interpret the program
1053
under test as a script.
1055
self.interpreter = interpreter
1057
def match(self, lines, matches):
1058
"""Compare actual and expected file contents.
1060
return self.match_function(lines, matches)
1062
def match_exact(self, lines, matches):
1063
"""Compare actual and expected file contents.
1065
return match_exact(lines, matches)
1067
def match_re(self, lines, res):
1068
"""Compare actual and expected file contents.
1070
return match_re(lines, res)
1072
def match_re_dotall(self, lines, res):
1073
"""Compare actual and expected file contents.
1075
return match_re_dotall(lines, res)
1077
def no_result(self, condition = 1, function = None, skip = 0):
1078
"""Report that the test could not be run.
1082
self.condition = 'no_result'
1083
no_result(self = self,
1084
condition = condition,
1085
function = function,
1088
def pass_test(self, condition = 1, function = None):
1089
"""Cause the test to pass.
1093
self.condition = 'pass_test'
1094
pass_test(self = self, condition = condition, function = function)
1096
def preserve(self, *conditions):
1097
"""Arrange for the temporary working directories for the
1098
specified TestCmd environment to be preserved for one or more
1099
conditions. If no conditions are specified, arranges for
1100
the temporary working directories to be preserved for all
1103
if conditions is ():
1104
conditions = ('pass_test', 'fail_test', 'no_result')
1105
for cond in conditions:
1106
self._preserve[cond] = 1
1108
def program_set(self, program):
1109
"""Set the executable program or script to be tested.
1111
if program and not os.path.isabs(program):
1112
program = os.path.join(self._cwd, program)
1113
self.program = program
1115
def read(self, file, mode = 'rb'):
1116
"""Reads and returns the contents of the specified file name.
1117
The file name may be a list, in which case the elements are
1118
concatenated with the os.path.join() method. The file is
1119
assumed to be under the temporary working directory unless it
1120
is an absolute path name. The I/O mode for the file may
1121
be specified; it must begin with an 'r'. The default is
1124
file = self.canonicalize(file)
1126
raise ValueError, "mode must begin with 'r'"
1127
return open(file, mode).read()
1129
def rmdir(self, dir):
1130
"""Removes the specified dir name.
1131
The dir name may be a list, in which case the elements are
1132
concatenated with the os.path.join() method. The dir is
1133
assumed to be under the temporary working directory unless it
1134
is an absolute path name.
1135
The dir must be empty.
1137
dir = self.canonicalize(dir)
1140
def start(self, program = None,
1143
universal_newlines = None,
1146
Starts a program or script for the test environment.
1148
The specified program will have the original directory
1149
prepended unless it is enclosed in a [list].
1151
cmd = self.command_args(program, interpreter, arguments)
1152
cmd_string = string.join(map(self.escape, cmd), ' ')
1154
sys.stderr.write(cmd_string + "\n")
1155
if universal_newlines is None:
1156
universal_newlines = self.universal_newlines
1158
# On Windows, if we make stdin a pipe when we plan to send
1159
# no input, and the test program exits before
1160
# Popen calls msvcrt.open_osfhandle, that call will fail.
1161
# So don't use a pipe for stdin if we don't need one.
1162
stdin = kw.get('stdin', None)
1163
if stdin is not None:
1164
stdin = subprocess.PIPE
1166
combine = kw.get('combine', self.combine)
1168
stderr_value = subprocess.STDOUT
1170
stderr_value = subprocess.PIPE
1174
stdout=subprocess.PIPE,
1175
stderr=stderr_value,
1176
universal_newlines=universal_newlines)
1178
def finish(self, popen, **kw):
1180
Finishes and waits for the process being run under control of
1181
the specified popen argument, recording the exit status,
1182
standard output and error output.
1185
self.status = popen.wait()
1188
self._stdout.append(popen.stdout.read())
1190
stderr = popen.stderr.read()
1193
self._stderr.append(stderr)
1195
def run(self, program = None,
1200
universal_newlines = None):
1201
"""Runs a test of the program or script for the test
1202
environment. Standard output and error output are saved for
1203
future retrieval via the stdout() and stderr() methods.
1205
The specified program will have the original directory
1206
prepended unless it is enclosed in a [list].
1209
oldcwd = os.getcwd()
1210
if not os.path.isabs(chdir):
1211
chdir = os.path.join(self.workpath(chdir))
1213
sys.stderr.write("chdir(" + chdir + ")\n")
1215
p = self.start(program,
1225
p.stdin.write(stdin)
1228
out = p.stdout.read()
1229
if p.stderr is None:
1232
err = p.stderr.read()
1234
close_output = p.close_output
1235
except AttributeError:
1237
if not p.stderr is None:
1242
self._stdout.append(out)
1243
self._stderr.append(err)
1245
self.status = p.wait()
1251
if self.verbose >= 2:
1252
write = sys.stdout.write
1253
write('============ STATUS: %d\n' % self.status)
1255
if out or self.verbose >= 3:
1256
write('============ BEGIN STDOUT (len=%d):\n' % len(out))
1258
write('============ END STDOUT\n')
1260
if err or self.verbose >= 3:
1261
write('============ BEGIN STDERR (len=%d)\n' % len(err))
1263
write('============ END STDERR\n')
1265
def sleep(self, seconds = default_sleep_seconds):
1266
"""Sleeps at least the specified number of seconds. If no
1267
number is specified, sleeps at least the minimum number of
1268
seconds necessary to advance file time stamps on the current
1269
system. Sleeping more seconds is all right.
1273
def stderr(self, run = None):
1274
"""Returns the error output from the specified run number.
1275
If there is no specified run number, then returns the error
1276
output of the last run. If the run number is less than zero,
1277
then returns the error output from that many runs back from the
1281
run = len(self._stderr)
1283
run = len(self._stderr) + run
1285
return self._stderr[run]
1287
def stdout(self, run = None):
1288
"""Returns the standard output from the specified run number.
1289
If there is no specified run number, then returns the standard
1290
output of the last run. If the run number is less than zero,
1291
then returns the standard output from that many runs back from
1295
run = len(self._stdout)
1297
run = len(self._stdout) + run
1299
return self._stdout[run]
1301
def subdir(self, *subdirs):
1302
"""Create new subdirectories under the temporary working
1303
directory, one for each argument. An argument may be a list,
1304
in which case the list elements are concatenated using the
1305
os.path.join() method. Subdirectories multiple levels deep
1306
must be created using a separate argument for each level:
1308
test.subdir('sub', ['sub', 'dir'], ['sub', 'dir', 'ectory'])
1310
Returns the number of subdirectories actually created.
1317
sub = apply(os.path.join, tuple(sub))
1318
new = os.path.join(self.workdir, sub)
1327
def symlink(self, target, link):
1328
"""Creates a symlink to the specified target.
1329
The link name may be a list, in which case the elements are
1330
concatenated with the os.path.join() method. The link is
1331
assumed to be under the temporary working directory unless it
1332
is an absolute path name. The target is *not* assumed to be
1333
under the temporary working directory.
1335
link = self.canonicalize(link)
1336
os.symlink(target, link)
1338
def tempdir(self, path=None):
1339
"""Creates a temporary directory.
1340
A unique directory name is generated if no path name is specified.
1341
The directory is created, and will be removed when the TestCmd
1342
object is destroyed.
1346
path = tempfile.mktemp(prefix=tempfile.template)
1348
path = tempfile.mktemp()
1351
# Symlinks in the path will report things
1352
# differently from os.getcwd(), so chdir there
1353
# and back to fetch the canonical path.
1361
# Uppercase the drive letter since the case of drive
1362
# letters is pretty much random on win32:
1363
drive,rest = os.path.splitdrive(path)
1365
path = string.upper(drive) + rest
1368
self._dirlist.append(path)
1371
_Cleanup.index(self)
1373
_Cleanup.append(self)
1377
def touch(self, path, mtime=None):
1378
"""Updates the modification time on the specified file or
1379
directory path name. The default is to update to the
1380
current time if no explicit modification time is specified.
1382
path = self.canonicalize(path)
1383
atime = os.path.getatime(path)
1386
os.utime(path, (atime, mtime))
1388
def unlink(self, file):
1389
"""Unlinks the specified file name.
1390
The file name may be a list, in which case the elements are
1391
concatenated with the os.path.join() method. The file is
1392
assumed to be under the temporary working directory unless it
1393
is an absolute path name.
1395
file = self.canonicalize(file)
1398
def verbose_set(self, verbose):
1399
"""Set the verbose level.
1401
self.verbose = verbose
1403
def where_is(self, file, path=None, pathext=None):
1404
"""Find an executable file.
1407
file = apply(os.path.join, tuple(file))
1408
if not os.path.isabs(file):
1409
file = where_is(file, path, pathext)
1412
def workdir_set(self, path):
1413
"""Creates a temporary working directory with the specified
1414
path name. If the path is a null string (''), a unique
1415
directory name is created.
1420
path = self.tempdir(path)
1423
def workpath(self, *args):
1424
"""Returns the absolute path name to a subdirectory or file
1425
within the current temporary working directory. Concatenates
1426
the temporary working directory name with the specified
1427
arguments using the os.path.join() method.
1429
return apply(os.path.join, (self.workdir,) + tuple(args))
1431
def readable(self, top, read=1):
1432
"""Make the specified directory tree readable (read == 1)
1433
or not (read == None).
1435
This method has no effect on Windows systems, which use a
1436
completely different mechanism to control file readability.
1439
if sys.platform == 'win32':
1443
def do_chmod(fname):
1444
try: st = os.stat(fname)
1445
except OSError: pass
1446
else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|stat.S_IREAD))
1448
def do_chmod(fname):
1449
try: st = os.stat(fname)
1450
except OSError: pass
1451
else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~stat.S_IREAD))
1453
if os.path.isfile(top):
1454
# If it's a file, that's easy, just chmod it.
1457
# It's a directory and we're trying to turn on read
1458
# permission, so it's also pretty easy, just chmod the
1459
# directory and then chmod every entry on our walk down the
1460
# tree. Because os.path.walk() is top-down, we'll enable
1461
# read permission on any directories that have it disabled
1462
# before os.path.walk() tries to list their contents.
1465
def chmod_entries(arg, dirname, names, do_chmod=do_chmod):
1467
do_chmod(os.path.join(dirname, n))
1469
os.path.walk(top, chmod_entries, None)
1471
# It's a directory and we're trying to turn off read
1472
# permission, which means we have to chmod the directoreis
1473
# in the tree bottom-up, lest disabling read permission from
1474
# the top down get in the way of being able to get at lower
1475
# parts of the tree. But os.path.walk() visits things top
1476
# down, so we just use an object to collect a list of all
1477
# of the entries in the tree, reverse the list, and then
1478
# chmod the reversed (bottom-up) list.
1479
col = Collector(top)
1480
os.path.walk(top, col, None)
1481
col.entries.reverse()
1482
for d in col.entries: do_chmod(d)
1484
def writable(self, top, write=1):
1485
"""Make the specified directory tree writable (write == 1)
1486
or not (write == None).
1489
if sys.platform == 'win32':
1492
def do_chmod(fname):
1493
try: os.chmod(fname, stat.S_IWRITE)
1494
except OSError: pass
1496
def do_chmod(fname):
1497
try: os.chmod(fname, stat.S_IREAD)
1498
except OSError: pass
1503
def do_chmod(fname):
1504
try: st = os.stat(fname)
1505
except OSError: pass
1506
else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|0200))
1508
def do_chmod(fname):
1509
try: st = os.stat(fname)
1510
except OSError: pass
1511
else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~0200))
1513
if os.path.isfile(top):
1516
col = Collector(top)
1517
os.path.walk(top, col, None)
1518
for d in col.entries: do_chmod(d)
1520
def executable(self, top, execute=1):
1521
"""Make the specified directory tree executable (execute == 1)
1522
or not (execute == None).
1524
This method has no effect on Windows systems, which use a
1525
completely different mechanism to control file executability.
1528
if sys.platform == 'win32':
1532
def do_chmod(fname):
1533
try: st = os.stat(fname)
1534
except OSError: pass
1535
else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|stat.S_IEXEC))
1537
def do_chmod(fname):
1538
try: st = os.stat(fname)
1539
except OSError: pass
1540
else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~stat.S_IEXEC))
1542
if os.path.isfile(top):
1543
# If it's a file, that's easy, just chmod it.
1546
# It's a directory and we're trying to turn on execute
1547
# permission, so it's also pretty easy, just chmod the
1548
# directory and then chmod every entry on our walk down the
1549
# tree. Because os.path.walk() is top-down, we'll enable
1550
# execute permission on any directories that have it disabled
1551
# before os.path.walk() tries to list their contents.
1554
def chmod_entries(arg, dirname, names, do_chmod=do_chmod):
1556
do_chmod(os.path.join(dirname, n))
1558
os.path.walk(top, chmod_entries, None)
1560
# It's a directory and we're trying to turn off execute
1561
# permission, which means we have to chmod the directories
1562
# in the tree bottom-up, lest disabling execute permission from
1563
# the top down get in the way of being able to get at lower
1564
# parts of the tree. But os.path.walk() visits things top
1565
# down, so we just use an object to collect a list of all
1566
# of the entries in the tree, reverse the list, and then
1567
# chmod the reversed (bottom-up) list.
1568
col = Collector(top)
1569
os.path.walk(top, col, None)
1570
col.entries.reverse()
1571
for d in col.entries: do_chmod(d)
1573
def write(self, file, content, mode = 'wb'):
1574
"""Writes the specified content text (second argument) to the
1575
specified file name (first argument). The file name may be
1576
a list, in which case the elements are concatenated with the
1577
os.path.join() method. The file is created under the temporary
1578
working directory. Any subdirectories in the path must already
1579
exist. The I/O mode for the file may be specified; it must
1580
begin with a 'w'. The default is 'wb' (binary write).
1582
file = self.canonicalize(file)
1584
raise ValueError, "mode must begin with 'w'"
1585
open(file, mode).write(content)
1589
# indent-tabs-mode:nil
1591
# vim: set expandtab tabstop=4 shiftwidth=4: