~ubuntu-branches/ubuntu/oneiric/mozc/oneiric

« back to all changes in this revision

Viewing changes to third_party/gyp/test/lib/TestCmd.py

  • Committer: Bazaar Package Importer
  • Author(s): Nobuhiro Iwamatsu
  • Date: 2010-07-14 03:26:47 UTC
  • mfrom: (1.1.1 upstream)
  • Revision ID: james.westby@ubuntu.com-20100714032647-13qjisj6m8cm8jdx
Tags: 0.12.410.102-1
* New upstream release (Closes: #588971).
  - Add mozc-server, mozc-utils-gui and scim-mozc packages.
* Update debian/rules.
  Add --gypdir option to build_mozc.py.
* Update debian/control.
  - Bumped standards-version to 3.9.0.
  - Update description.
* Add mozc icon (Closes: #588972).
* Add patch which revises issue 18.
  ibus_mozc_issue18.patch
* kFreeBSD build support.
  support_kfreebsd.patch

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
"""
 
2
TestCmd.py:  a testing framework for commands and scripts.
 
3
 
 
4
The TestCmd module provides a framework for portable automated testing
 
5
of executable commands and scripts (in any language, not just Python),
 
6
especially commands and scripts that require file system interaction.
 
7
 
 
8
In addition to running tests and evaluating conditions, the TestCmd
 
9
module manages and cleans up one or more temporary workspace
 
10
directories, and provides methods for creating files and directories in
 
11
those workspace directories from in-line data, here-documents), allowing
 
12
tests to be completely self-contained.
 
13
 
 
14
A TestCmd environment object is created via the usual invocation:
 
15
 
 
16
    import TestCmd
 
17
    test = TestCmd.TestCmd()
 
18
 
 
19
There are a bunch of keyword arguments available at instantiation:
 
20
 
 
21
    test = TestCmd.TestCmd(description = 'string',
 
22
                           program = 'program_or_script_to_test',
 
23
                           interpreter = 'script_interpreter',
 
24
                           workdir = 'prefix',
 
25
                           subdir = 'subdir',
 
26
                           verbose = Boolean,
 
27
                           match = default_match_function,
 
28
                           diff = default_diff_function,
 
29
                           combine = Boolean)
 
30
 
 
31
There are a bunch of methods that let you do different things:
 
32
 
 
33
    test.verbose_set(1)
 
34
 
 
35
    test.description_set('string')
 
36
 
 
37
    test.program_set('program_or_script_to_test')
 
38
 
 
39
    test.interpreter_set('script_interpreter')
 
40
    test.interpreter_set(['script_interpreter', 'arg'])
 
41
 
 
42
    test.workdir_set('prefix')
 
43
    test.workdir_set('')
 
44
 
 
45
    test.workpath('file')
 
46
    test.workpath('subdir', 'file')
 
47
 
 
48
    test.subdir('subdir', ...)
 
49
 
 
50
    test.rmdir('subdir', ...)
 
51
 
 
52
    test.write('file', "contents\n")
 
53
    test.write(['subdir', 'file'], "contents\n")
 
54
 
 
55
    test.read('file')
 
56
    test.read(['subdir', 'file'])
 
57
    test.read('file', mode)
 
58
    test.read(['subdir', 'file'], mode)
 
59
 
 
60
    test.writable('dir', 1)
 
61
    test.writable('dir', None)
 
62
 
 
63
    test.preserve(condition, ...)
 
64
 
 
65
    test.cleanup(condition)
 
66
 
 
67
    test.command_args(program = 'program_or_script_to_run',
 
68
                      interpreter = 'script_interpreter',
 
69
                      arguments = 'arguments to pass to program')
 
70
 
 
71
    test.run(program = 'program_or_script_to_run',
 
72
             interpreter = 'script_interpreter',
 
73
             arguments = 'arguments to pass to program',
 
74
             chdir = 'directory_to_chdir_to',
 
75
             stdin = 'input to feed to the program\n')
 
76
             universal_newlines = True)
 
77
 
 
78
    p = test.start(program = 'program_or_script_to_run',
 
79
                   interpreter = 'script_interpreter',
 
80
                   arguments = 'arguments to pass to program',
 
81
                   universal_newlines = None)
 
82
 
 
83
    test.finish(self, p)
 
84
 
 
85
    test.pass_test()
 
86
    test.pass_test(condition)
 
87
    test.pass_test(condition, function)
 
88
 
 
89
    test.fail_test()
 
90
    test.fail_test(condition)
 
91
    test.fail_test(condition, function)
 
92
    test.fail_test(condition, function, skip)
 
93
 
 
94
    test.no_result()
 
95
    test.no_result(condition)
 
96
    test.no_result(condition, function)
 
97
    test.no_result(condition, function, skip)
 
98
 
 
99
    test.stdout()
 
100
    test.stdout(run)
 
101
 
 
102
    test.stderr()
 
103
    test.stderr(run)
 
104
 
 
105
    test.symlink(target, link)
 
106
 
 
107
    test.banner(string)
 
108
    test.banner(string, width)
 
109
 
 
110
    test.diff(actual, expected)
 
111
 
 
112
    test.match(actual, expected)
 
113
 
 
114
    test.match_exact("actual 1\nactual 2\n", "expected 1\nexpected 2\n")
 
115
    test.match_exact(["actual 1\n", "actual 2\n"],
 
116
                     ["expected 1\n", "expected 2\n"])
 
117
 
 
118
    test.match_re("actual 1\nactual 2\n", regex_string)
 
119
    test.match_re(["actual 1\n", "actual 2\n"], list_of_regexes)
 
120
 
 
121
    test.match_re_dotall("actual 1\nactual 2\n", regex_string)
 
122
    test.match_re_dotall(["actual 1\n", "actual 2\n"], list_of_regexes)
 
123
 
 
124
    test.tempdir()
 
125
    test.tempdir('temporary-directory')
 
126
 
 
127
    test.sleep()
 
128
    test.sleep(seconds)
 
129
 
 
130
    test.where_is('foo')
 
131
    test.where_is('foo', 'PATH1:PATH2')
 
132
    test.where_is('foo', 'PATH1;PATH2', '.suffix3;.suffix4')
 
133
 
 
134
    test.unlink('file')
 
135
    test.unlink('subdir', 'file')
 
136
 
 
137
The TestCmd module provides pass_test(), fail_test(), and no_result()
 
138
unbound functions that report test results for use with the Aegis change
 
139
management system.  These methods terminate the test immediately,
 
140
reporting PASSED, FAILED, or NO RESULT respectively, and exiting with
 
141
status 0 (success), 1 or 2 respectively.  This allows for a distinction
 
142
between an actual failed test and a test that could not be properly
 
143
evaluated because of an external condition (such as a full file system
 
144
or incorrect permissions).
 
145
 
 
146
    import TestCmd
 
147
 
 
148
    TestCmd.pass_test()
 
149
    TestCmd.pass_test(condition)
 
150
    TestCmd.pass_test(condition, function)
 
151
 
 
152
    TestCmd.fail_test()
 
153
    TestCmd.fail_test(condition)
 
154
    TestCmd.fail_test(condition, function)
 
155
    TestCmd.fail_test(condition, function, skip)
 
156
 
 
157
    TestCmd.no_result()
 
158
    TestCmd.no_result(condition)
 
159
    TestCmd.no_result(condition, function)
 
160
    TestCmd.no_result(condition, function, skip)
 
161
 
 
162
The TestCmd module also provides unbound functions that handle matching
 
163
in the same way as the match_*() methods described above.
 
164
 
 
165
    import TestCmd
 
166
 
 
167
    test = TestCmd.TestCmd(match = TestCmd.match_exact)
 
168
 
 
169
    test = TestCmd.TestCmd(match = TestCmd.match_re)
 
170
 
 
171
    test = TestCmd.TestCmd(match = TestCmd.match_re_dotall)
 
172
 
 
173
The TestCmd module provides unbound functions that can be used for the
 
174
"diff" argument to TestCmd.TestCmd instantiation:
 
175
 
 
176
    import TestCmd
 
177
 
 
178
    test = TestCmd.TestCmd(match = TestCmd.match_re,
 
179
                           diff = TestCmd.diff_re)
 
180
 
 
181
    test = TestCmd.TestCmd(diff = TestCmd.simple_diff)
 
182
 
 
183
The "diff" argument can also be used with standard difflib functions:
 
184
 
 
185
    import difflib
 
186
 
 
187
    test = TestCmd.TestCmd(diff = difflib.context_diff)
 
188
 
 
189
    test = TestCmd.TestCmd(diff = difflib.unified_diff)
 
190
 
 
191
Lastly, the where_is() method also exists in an unbound function
 
192
version.
 
193
 
 
194
    import TestCmd
 
195
 
 
196
    TestCmd.where_is('foo')
 
197
    TestCmd.where_is('foo', 'PATH1:PATH2')
 
198
    TestCmd.where_is('foo', 'PATH1;PATH2', '.suffix3;.suffix4')
 
199
"""
 
200
 
 
201
# Copyright 2000-2010 Steven Knight
 
202
# This module is free software, and you may redistribute it and/or modify
 
203
# it under the same terms as Python itself, so long as this copyright message
 
204
# and disclaimer are retained in their original form.
 
205
#
 
206
# IN NO EVENT SHALL THE AUTHOR BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
 
207
# SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF
 
208
# THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH
 
209
# DAMAGE.
 
210
#
 
211
# THE AUTHOR SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
 
212
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
 
213
# PARTICULAR PURPOSE.  THE CODE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS,
 
214
# AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
 
215
# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
 
216
 
 
217
__author__ = "Steven Knight <knight at baldmt dot com>"
 
218
__revision__ = "TestCmd.py 0.37.D001 2010/01/11 16:55:50 knight"
 
219
__version__ = "0.37"
 
220
 
 
221
import errno
 
222
import os
 
223
import os.path
 
224
import re
 
225
import shutil
 
226
import stat
 
227
import string
 
228
import sys
 
229
import tempfile
 
230
import time
 
231
import traceback
 
232
import types
 
233
import UserList
 
234
 
 
235
__all__ = [
 
236
    'diff_re',
 
237
    'fail_test',
 
238
    'no_result',
 
239
    'pass_test',
 
240
    'match_exact',
 
241
    'match_re',
 
242
    'match_re_dotall',
 
243
    'python_executable',
 
244
    'TestCmd'
 
245
]
 
246
 
 
247
try:
 
248
    import difflib
 
249
except ImportError:
 
250
    __all__.append('simple_diff')
 
251
 
 
252
def is_List(e):
 
253
    return type(e) is types.ListType \
 
254
        or isinstance(e, UserList.UserList)
 
255
 
 
256
try:
 
257
    from UserString import UserString
 
258
except ImportError:
 
259
    class UserString:
 
260
        pass
 
261
 
 
262
if hasattr(types, 'UnicodeType'):
 
263
    def is_String(e):
 
264
        return type(e) is types.StringType \
 
265
            or type(e) is types.UnicodeType \
 
266
            or isinstance(e, UserString)
 
267
else:
 
268
    def is_String(e):
 
269
        return type(e) is types.StringType or isinstance(e, UserString)
 
270
 
 
271
tempfile.template = 'testcmd.'
 
272
if os.name in ('posix', 'nt'):
 
273
    tempfile.template = 'testcmd.' + str(os.getpid()) + '.'
 
274
else:
 
275
    tempfile.template = 'testcmd.'
 
276
 
 
277
re_space = re.compile('\s')
 
278
 
 
279
_Cleanup = []
 
280
 
 
281
_chain_to_exitfunc = None
 
282
 
 
283
def _clean():
 
284
    global _Cleanup
 
285
    cleanlist = filter(None, _Cleanup)
 
286
    del _Cleanup[:]
 
287
    cleanlist.reverse()
 
288
    for test in cleanlist:
 
289
        test.cleanup()
 
290
    if _chain_to_exitfunc:
 
291
        _chain_to_exitfunc()
 
292
 
 
293
try:
 
294
    import atexit
 
295
except ImportError:
 
296
    # TODO(1.5): atexit requires python 2.0, so chain sys.exitfunc
 
297
    try:
 
298
        _chain_to_exitfunc = sys.exitfunc
 
299
    except AttributeError:
 
300
        pass
 
301
    sys.exitfunc = _clean
 
302
else:
 
303
    atexit.register(_clean)
 
304
 
 
305
try:
 
306
    zip
 
307
except NameError:
 
308
    def zip(*lists):
 
309
        result = []
 
310
        for i in xrange(min(map(len, lists))):
 
311
            result.append(tuple(map(lambda l, i=i: l[i], lists)))
 
312
        return result
 
313
 
 
314
class Collector:
 
315
    def __init__(self, top):
 
316
        self.entries = [top]
 
317
    def __call__(self, arg, dirname, names):
 
318
        pathjoin = lambda n, d=dirname: os.path.join(d, n)
 
319
        self.entries.extend(map(pathjoin, names))
 
320
 
 
321
def _caller(tblist, skip):
 
322
    string = ""
 
323
    arr = []
 
324
    for file, line, name, text in tblist:
 
325
        if file[-10:] == "TestCmd.py":
 
326
                break
 
327
        arr = [(file, line, name, text)] + arr
 
328
    atfrom = "at"
 
329
    for file, line, name, text in arr[skip:]:
 
330
        if name in ("?", "<module>"):
 
331
            name = ""
 
332
        else:
 
333
            name = " (" + name + ")"
 
334
        string = string + ("%s line %d of %s%s\n" % (atfrom, line, file, name))
 
335
        atfrom = "\tfrom"
 
336
    return string
 
337
 
 
338
def fail_test(self = None, condition = 1, function = None, skip = 0):
 
339
    """Cause the test to fail.
 
340
 
 
341
    By default, the fail_test() method reports that the test FAILED
 
342
    and exits with a status of 1.  If a condition argument is supplied,
 
343
    the test fails only if the condition is true.
 
344
    """
 
345
    if not condition:
 
346
        return
 
347
    if not function is None:
 
348
        function()
 
349
    of = ""
 
350
    desc = ""
 
351
    sep = " "
 
352
    if not self is None:
 
353
        if self.program:
 
354
            of = " of " + self.program
 
355
            sep = "\n\t"
 
356
        if self.description:
 
357
            desc = " [" + self.description + "]"
 
358
            sep = "\n\t"
 
359
 
 
360
    at = _caller(traceback.extract_stack(), skip)
 
361
    sys.stderr.write("FAILED test" + of + desc + sep + at)
 
362
 
 
363
    sys.exit(1)
 
364
 
 
365
def no_result(self = None, condition = 1, function = None, skip = 0):
 
366
    """Causes a test to exit with no valid result.
 
367
 
 
368
    By default, the no_result() method reports NO RESULT for the test
 
369
    and exits with a status of 2.  If a condition argument is supplied,
 
370
    the test fails only if the condition is true.
 
371
    """
 
372
    if not condition:
 
373
        return
 
374
    if not function is None:
 
375
        function()
 
376
    of = ""
 
377
    desc = ""
 
378
    sep = " "
 
379
    if not self is None:
 
380
        if self.program:
 
381
            of = " of " + self.program
 
382
            sep = "\n\t"
 
383
        if self.description:
 
384
            desc = " [" + self.description + "]"
 
385
            sep = "\n\t"
 
386
 
 
387
    at = _caller(traceback.extract_stack(), skip)
 
388
    sys.stderr.write("NO RESULT for test" + of + desc + sep + at)
 
389
 
 
390
    sys.exit(2)
 
391
 
 
392
def pass_test(self = None, condition = 1, function = None):
 
393
    """Causes a test to pass.
 
394
 
 
395
    By default, the pass_test() method reports PASSED for the test
 
396
    and exits with a status of 0.  If a condition argument is supplied,
 
397
    the test passes only if the condition is true.
 
398
    """
 
399
    if not condition:
 
400
        return
 
401
    if not function is None:
 
402
        function()
 
403
    sys.stderr.write("PASSED\n")
 
404
    sys.exit(0)
 
405
 
 
406
def match_exact(lines = None, matches = None):
 
407
    """
 
408
    """
 
409
    if not is_List(lines):
 
410
        lines = string.split(lines, "\n")
 
411
    if not is_List(matches):
 
412
        matches = string.split(matches, "\n")
 
413
    if len(lines) != len(matches):
 
414
        return
 
415
    for i in range(len(lines)):
 
416
        if lines[i] != matches[i]:
 
417
            return
 
418
    return 1
 
419
 
 
420
def match_re(lines = None, res = None):
 
421
    """
 
422
    """
 
423
    if not is_List(lines):
 
424
        lines = string.split(lines, "\n")
 
425
    if not is_List(res):
 
426
        res = string.split(res, "\n")
 
427
    if len(lines) != len(res):
 
428
        return
 
429
    for i in range(len(lines)):
 
430
        s = "^" + res[i] + "$"
 
431
        try:
 
432
            expr = re.compile(s)
 
433
        except re.error, e:
 
434
            msg = "Regular expression error in %s: %s"
 
435
            raise re.error, msg % (repr(s), e[0])
 
436
        if not expr.search(lines[i]):
 
437
            return
 
438
    return 1
 
439
 
 
440
def match_re_dotall(lines = None, res = None):
 
441
    """
 
442
    """
 
443
    if not type(lines) is type(""):
 
444
        lines = string.join(lines, "\n")
 
445
    if not type(res) is type(""):
 
446
        res = string.join(res, "\n")
 
447
    s = "^" + res + "$"
 
448
    try:
 
449
        expr = re.compile(s, re.DOTALL)
 
450
    except re.error, e:
 
451
        msg = "Regular expression error in %s: %s"
 
452
        raise re.error, msg % (repr(s), e[0])
 
453
    if expr.match(lines):
 
454
        return 1
 
455
 
 
456
try:
 
457
    import difflib
 
458
except ImportError:
 
459
    pass
 
460
else:
 
461
    def simple_diff(a, b, fromfile='', tofile='',
 
462
                    fromfiledate='', tofiledate='', n=3, lineterm='\n'):
 
463
        """
 
464
        A function with the same calling signature as difflib.context_diff
 
465
        (diff -c) and difflib.unified_diff (diff -u) but which prints
 
466
        output like the simple, unadorned 'diff" command.
 
467
        """
 
468
        sm = difflib.SequenceMatcher(None, a, b)
 
469
        def comma(x1, x2):
 
470
            return x1+1 == x2 and str(x2) or '%s,%s' % (x1+1, x2)
 
471
        result = []
 
472
        for op, a1, a2, b1, b2 in sm.get_opcodes():
 
473
            if op == 'delete':
 
474
                result.append("%sd%d" % (comma(a1, a2), b1))
 
475
                result.extend(map(lambda l: '< ' + l, a[a1:a2]))
 
476
            elif op == 'insert':
 
477
                result.append("%da%s" % (a1, comma(b1, b2)))
 
478
                result.extend(map(lambda l: '> ' + l, b[b1:b2]))
 
479
            elif op == 'replace':
 
480
                result.append("%sc%s" % (comma(a1, a2), comma(b1, b2)))
 
481
                result.extend(map(lambda l: '< ' + l, a[a1:a2]))
 
482
                result.append('---')
 
483
                result.extend(map(lambda l: '> ' + l, b[b1:b2]))
 
484
        return result
 
485
 
 
486
def diff_re(a, b, fromfile='', tofile='',
 
487
                fromfiledate='', tofiledate='', n=3, lineterm='\n'):
 
488
    """
 
489
    A simple "diff" of two sets of lines when the expected lines
 
490
    are regular expressions.  This is a really dumb thing that
 
491
    just compares each line in turn, so it doesn't look for
 
492
    chunks of matching lines and the like--but at least it lets
 
493
    you know exactly which line first didn't compare correctl...
 
494
    """
 
495
    result = []
 
496
    diff = len(a) - len(b)
 
497
    if diff < 0:
 
498
        a = a + ['']*(-diff)
 
499
    elif diff > 0:
 
500
        b = b + ['']*diff
 
501
    i = 0
 
502
    for aline, bline in zip(a, b):
 
503
        s = "^" + aline + "$"
 
504
        try:
 
505
            expr = re.compile(s)
 
506
        except re.error, e:
 
507
            msg = "Regular expression error in %s: %s"
 
508
            raise re.error, msg % (repr(s), e[0])
 
509
        if not expr.search(bline):
 
510
            result.append("%sc%s" % (i+1, i+1))
 
511
            result.append('< ' + repr(a[i]))
 
512
            result.append('---')
 
513
            result.append('> ' + repr(b[i]))
 
514
        i = i+1
 
515
    return result
 
516
 
 
517
if os.name == 'java':
 
518
 
 
519
    python_executable = os.path.join(sys.prefix, 'jython')
 
520
 
 
521
else:
 
522
 
 
523
    python_executable = sys.executable
 
524
 
 
525
if sys.platform == 'win32':
 
526
 
 
527
    default_sleep_seconds = 2
 
528
 
 
529
    def where_is(file, path=None, pathext=None):
 
530
        if path is None:
 
531
            path = os.environ['PATH']
 
532
        if is_String(path):
 
533
            path = string.split(path, os.pathsep)
 
534
        if pathext is None:
 
535
            pathext = os.environ['PATHEXT']
 
536
        if is_String(pathext):
 
537
            pathext = string.split(pathext, os.pathsep)
 
538
        for ext in pathext:
 
539
            if string.lower(ext) == string.lower(file[-len(ext):]):
 
540
                pathext = ['']
 
541
                break
 
542
        for dir in path:
 
543
            f = os.path.join(dir, file)
 
544
            for ext in pathext:
 
545
                fext = f + ext
 
546
                if os.path.isfile(fext):
 
547
                    return fext
 
548
        return None
 
549
 
 
550
else:
 
551
 
 
552
    def where_is(file, path=None, pathext=None):
 
553
        if path is None:
 
554
            path = os.environ['PATH']
 
555
        if is_String(path):
 
556
            path = string.split(path, os.pathsep)
 
557
        for dir in path:
 
558
            f = os.path.join(dir, file)
 
559
            if os.path.isfile(f):
 
560
                try:
 
561
                    st = os.stat(f)
 
562
                except OSError:
 
563
                    continue
 
564
                if stat.S_IMODE(st[stat.ST_MODE]) & 0111:
 
565
                    return f
 
566
        return None
 
567
 
 
568
    default_sleep_seconds = 1
 
569
 
 
570
 
 
571
 
 
572
try:
 
573
    import subprocess
 
574
except ImportError:
 
575
    # The subprocess module doesn't exist in this version of Python,
 
576
    # so we're going to cobble up something that looks just enough
 
577
    # like its API for our purposes below.
 
578
    import new
 
579
 
 
580
    subprocess = new.module('subprocess')
 
581
 
 
582
    subprocess.PIPE = 'PIPE'
 
583
    subprocess.STDOUT = 'STDOUT'
 
584
    subprocess.mswindows = (sys.platform == 'win32')
 
585
 
 
586
    try:
 
587
        import popen2
 
588
        popen2.Popen3
 
589
    except AttributeError:
 
590
        class Popen3:
 
591
            universal_newlines = 1
 
592
            def __init__(self, command, **kw):
 
593
                if sys.platform == 'win32' and command[0] == '"':
 
594
                    command = '"' + command + '"'
 
595
                (stdin, stdout, stderr) = os.popen3(' ' + command)
 
596
                self.stdin = stdin
 
597
                self.stdout = stdout
 
598
                self.stderr = stderr
 
599
            def close_output(self):
 
600
                self.stdout.close()
 
601
                self.resultcode = self.stderr.close()
 
602
            def wait(self):
 
603
                resultcode = self.resultcode
 
604
                if os.WIFEXITED(resultcode):
 
605
                    return os.WEXITSTATUS(resultcode)
 
606
                elif os.WIFSIGNALED(resultcode):
 
607
                    return os.WTERMSIG(resultcode)
 
608
                else:
 
609
                    return None
 
610
 
 
611
    else:
 
612
        try:
 
613
            popen2.Popen4
 
614
        except AttributeError:
 
615
            # A cribbed Popen4 class, with some retrofitted code from
 
616
            # the Python 1.5 Popen3 class methods to do certain things
 
617
            # by hand.
 
618
            class Popen4(popen2.Popen3):
 
619
                childerr = None
 
620
 
 
621
                def __init__(self, cmd, bufsize=-1):
 
622
                    p2cread, p2cwrite = os.pipe()
 
623
                    c2pread, c2pwrite = os.pipe()
 
624
                    self.pid = os.fork()
 
625
                    if self.pid == 0:
 
626
                        # Child
 
627
                        os.dup2(p2cread, 0)
 
628
                        os.dup2(c2pwrite, 1)
 
629
                        os.dup2(c2pwrite, 2)
 
630
                        for i in range(3, popen2.MAXFD):
 
631
                            try:
 
632
                                os.close(i)
 
633
                            except: pass
 
634
                        try:
 
635
                            os.execvp(cmd[0], cmd)
 
636
                        finally:
 
637
                            os._exit(1)
 
638
                        # Shouldn't come here, I guess
 
639
                        os._exit(1)
 
640
                    os.close(p2cread)
 
641
                    self.tochild = os.fdopen(p2cwrite, 'w', bufsize)
 
642
                    os.close(c2pwrite)
 
643
                    self.fromchild = os.fdopen(c2pread, 'r', bufsize)
 
644
                    popen2._active.append(self)
 
645
 
 
646
            popen2.Popen4 = Popen4
 
647
 
 
648
        class Popen3(popen2.Popen3, popen2.Popen4):
 
649
            universal_newlines = 1
 
650
            def __init__(self, command, **kw):
 
651
                if kw.get('stderr') == 'STDOUT':
 
652
                    apply(popen2.Popen4.__init__, (self, command, 1))
 
653
                else:
 
654
                    apply(popen2.Popen3.__init__, (self, command, 1))
 
655
                self.stdin = self.tochild
 
656
                self.stdout = self.fromchild
 
657
                self.stderr = self.childerr
 
658
            def wait(self, *args, **kw):
 
659
                resultcode = apply(popen2.Popen3.wait, (self,)+args, kw)
 
660
                if os.WIFEXITED(resultcode):
 
661
                    return os.WEXITSTATUS(resultcode)
 
662
                elif os.WIFSIGNALED(resultcode):
 
663
                    return os.WTERMSIG(resultcode)
 
664
                else:
 
665
                    return None
 
666
 
 
667
    subprocess.Popen = Popen3
 
668
 
 
669
 
 
670
 
 
671
# From Josiah Carlson,
 
672
# ASPN : Python Cookbook : Module to allow Asynchronous subprocess use on Windows and Posix platforms
 
673
# http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/440554
 
674
 
 
675
PIPE = subprocess.PIPE
 
676
 
 
677
if subprocess.mswindows:
 
678
    from win32file import ReadFile, WriteFile
 
679
    from win32pipe import PeekNamedPipe
 
680
    import msvcrt
 
681
else:
 
682
    import select
 
683
    import fcntl
 
684
 
 
685
    try:                    fcntl.F_GETFL
 
686
    except AttributeError:  fcntl.F_GETFL = 3
 
687
 
 
688
    try:                    fcntl.F_SETFL
 
689
    except AttributeError:  fcntl.F_SETFL = 4
 
690
 
 
691
class Popen(subprocess.Popen):
 
692
    def recv(self, maxsize=None):
 
693
        return self._recv('stdout', maxsize)
 
694
 
 
695
    def recv_err(self, maxsize=None):
 
696
        return self._recv('stderr', maxsize)
 
697
 
 
698
    def send_recv(self, input='', maxsize=None):
 
699
        return self.send(input), self.recv(maxsize), self.recv_err(maxsize)
 
700
 
 
701
    def get_conn_maxsize(self, which, maxsize):
 
702
        if maxsize is None:
 
703
            maxsize = 1024
 
704
        elif maxsize < 1:
 
705
            maxsize = 1
 
706
        return getattr(self, which), maxsize
 
707
 
 
708
    def _close(self, which):
 
709
        getattr(self, which).close()
 
710
        setattr(self, which, None)
 
711
 
 
712
    if subprocess.mswindows:
 
713
        def send(self, input):
 
714
            if not self.stdin:
 
715
                return None
 
716
 
 
717
            try:
 
718
                x = msvcrt.get_osfhandle(self.stdin.fileno())
 
719
                (errCode, written) = WriteFile(x, input)
 
720
            except ValueError:
 
721
                return self._close('stdin')
 
722
            except (subprocess.pywintypes.error, Exception), why:
 
723
                if why[0] in (109, errno.ESHUTDOWN):
 
724
                    return self._close('stdin')
 
725
                raise
 
726
 
 
727
            return written
 
728
 
 
729
        def _recv(self, which, maxsize):
 
730
            conn, maxsize = self.get_conn_maxsize(which, maxsize)
 
731
            if conn is None:
 
732
                return None
 
733
 
 
734
            try:
 
735
                x = msvcrt.get_osfhandle(conn.fileno())
 
736
                (read, nAvail, nMessage) = PeekNamedPipe(x, 0)
 
737
                if maxsize < nAvail:
 
738
                    nAvail = maxsize
 
739
                if nAvail > 0:
 
740
                    (errCode, read) = ReadFile(x, nAvail, None)
 
741
            except ValueError:
 
742
                return self._close(which)
 
743
            except (subprocess.pywintypes.error, Exception), why:
 
744
                if why[0] in (109, errno.ESHUTDOWN):
 
745
                    return self._close(which)
 
746
                raise
 
747
 
 
748
            #if self.universal_newlines:
 
749
            #    read = self._translate_newlines(read)
 
750
            return read
 
751
 
 
752
    else:
 
753
        def send(self, input):
 
754
            if not self.stdin:
 
755
                return None
 
756
 
 
757
            if not select.select([], [self.stdin], [], 0)[1]:
 
758
                return 0
 
759
 
 
760
            try:
 
761
                written = os.write(self.stdin.fileno(), input)
 
762
            except OSError, why:
 
763
                if why[0] == errno.EPIPE: #broken pipe
 
764
                    return self._close('stdin')
 
765
                raise
 
766
 
 
767
            return written
 
768
 
 
769
        def _recv(self, which, maxsize):
 
770
            conn, maxsize = self.get_conn_maxsize(which, maxsize)
 
771
            if conn is None:
 
772
                return None
 
773
 
 
774
            try:
 
775
                flags = fcntl.fcntl(conn, fcntl.F_GETFL)
 
776
            except TypeError:
 
777
                flags = None
 
778
            else:
 
779
                if not conn.closed:
 
780
                    fcntl.fcntl(conn, fcntl.F_SETFL, flags| os.O_NONBLOCK)
 
781
 
 
782
            try:
 
783
                if not select.select([conn], [], [], 0)[0]:
 
784
                    return ''
 
785
 
 
786
                r = conn.read(maxsize)
 
787
                if not r:
 
788
                    return self._close(which)
 
789
 
 
790
                #if self.universal_newlines:
 
791
                #    r = self._translate_newlines(r)
 
792
                return r
 
793
            finally:
 
794
                if not conn.closed and not flags is None:
 
795
                    fcntl.fcntl(conn, fcntl.F_SETFL, flags)
 
796
 
 
797
disconnect_message = "Other end disconnected!"
 
798
 
 
799
def recv_some(p, t=.1, e=1, tr=5, stderr=0):
 
800
    if tr < 1:
 
801
        tr = 1
 
802
    x = time.time()+t
 
803
    y = []
 
804
    r = ''
 
805
    pr = p.recv
 
806
    if stderr:
 
807
        pr = p.recv_err
 
808
    while time.time() < x or r:
 
809
        r = pr()
 
810
        if r is None:
 
811
            if e:
 
812
                raise Exception(disconnect_message)
 
813
            else:
 
814
                break
 
815
        elif r:
 
816
            y.append(r)
 
817
        else:
 
818
            time.sleep(max((x-time.time())/tr, 0))
 
819
    return ''.join(y)
 
820
 
 
821
# TODO(3.0:  rewrite to use memoryview()
 
822
def send_all(p, data):
 
823
    while len(data):
 
824
        sent = p.send(data)
 
825
        if sent is None:
 
826
            raise Exception(disconnect_message)
 
827
        data = buffer(data, sent)
 
828
 
 
829
 
 
830
 
 
831
try:
 
832
    object
 
833
except NameError:
 
834
    class object:
 
835
        pass
 
836
 
 
837
 
 
838
 
 
839
class TestCmd(object):
 
840
    """Class TestCmd
 
841
    """
 
842
 
 
843
    def __init__(self, description = None,
 
844
                       program = None,
 
845
                       interpreter = None,
 
846
                       workdir = None,
 
847
                       subdir = None,
 
848
                       verbose = None,
 
849
                       match = None,
 
850
                       diff = None,
 
851
                       combine = 0,
 
852
                       universal_newlines = 1):
 
853
        self._cwd = os.getcwd()
 
854
        self.description_set(description)
 
855
        self.program_set(program)
 
856
        self.interpreter_set(interpreter)
 
857
        if verbose is None:
 
858
            try:
 
859
                verbose = max( 0, int(os.environ.get('TESTCMD_VERBOSE', 0)) )
 
860
            except ValueError:
 
861
                verbose = 0
 
862
        self.verbose_set(verbose)
 
863
        self.combine = combine
 
864
        self.universal_newlines = universal_newlines
 
865
        if not match is None:
 
866
            self.match_function = match
 
867
        else:
 
868
            self.match_function = match_re
 
869
        if not diff is None:
 
870
            self.diff_function = diff
 
871
        else:
 
872
            try:
 
873
                difflib
 
874
            except NameError:
 
875
                pass
 
876
            else:
 
877
                self.diff_function = simple_diff
 
878
                #self.diff_function = difflib.context_diff
 
879
                #self.diff_function = difflib.unified_diff
 
880
        self._dirlist = []
 
881
        self._preserve = {'pass_test': 0, 'fail_test': 0, 'no_result': 0}
 
882
        if os.environ.has_key('PRESERVE') and not os.environ['PRESERVE'] is '':
 
883
            self._preserve['pass_test'] = os.environ['PRESERVE']
 
884
            self._preserve['fail_test'] = os.environ['PRESERVE']
 
885
            self._preserve['no_result'] = os.environ['PRESERVE']
 
886
        else:
 
887
            try:
 
888
                self._preserve['pass_test'] = os.environ['PRESERVE_PASS']
 
889
            except KeyError:
 
890
                pass
 
891
            try:
 
892
                self._preserve['fail_test'] = os.environ['PRESERVE_FAIL']
 
893
            except KeyError:
 
894
                pass
 
895
            try:
 
896
                self._preserve['no_result'] = os.environ['PRESERVE_NO_RESULT']
 
897
            except KeyError:
 
898
                pass
 
899
        self._stdout = []
 
900
        self._stderr = []
 
901
        self.status = None
 
902
        self.condition = 'no_result'
 
903
        self.workdir_set(workdir)
 
904
        self.subdir(subdir)
 
905
 
 
906
    def __del__(self):
 
907
        self.cleanup()
 
908
 
 
909
    def __repr__(self):
 
910
        return "%x" % id(self)
 
911
 
 
912
    banner_char = '='
 
913
    banner_width = 80
 
914
 
 
915
    def banner(self, s, width=None):
 
916
        if width is None:
 
917
            width = self.banner_width
 
918
        return s + self.banner_char * (width - len(s))
 
919
 
 
920
    if os.name == 'posix':
 
921
 
 
922
        def escape(self, arg):
 
923
            "escape shell special characters"
 
924
            slash = '\\'
 
925
            special = '"$'
 
926
 
 
927
            arg = string.replace(arg, slash, slash+slash)
 
928
            for c in special:
 
929
                arg = string.replace(arg, c, slash+c)
 
930
 
 
931
            if re_space.search(arg):
 
932
                arg = '"' + arg + '"'
 
933
            return arg
 
934
 
 
935
    else:
 
936
 
 
937
        # Windows does not allow special characters in file names
 
938
        # anyway, so no need for an escape function, we will just quote
 
939
        # the arg.
 
940
        def escape(self, arg):
 
941
            if re_space.search(arg):
 
942
                arg = '"' + arg + '"'
 
943
            return arg
 
944
 
 
945
    def canonicalize(self, path):
 
946
        if is_List(path):
 
947
            path = apply(os.path.join, tuple(path))
 
948
        if not os.path.isabs(path):
 
949
            path = os.path.join(self.workdir, path)
 
950
        return path
 
951
 
 
952
    def chmod(self, path, mode):
 
953
        """Changes permissions on the specified file or directory
 
954
        path name."""
 
955
        path = self.canonicalize(path)
 
956
        os.chmod(path, mode)
 
957
 
 
958
    def cleanup(self, condition = None):
 
959
        """Removes any temporary working directories for the specified
 
960
        TestCmd environment.  If the environment variable PRESERVE was
 
961
        set when the TestCmd environment was created, temporary working
 
962
        directories are not removed.  If any of the environment variables
 
963
        PRESERVE_PASS, PRESERVE_FAIL, or PRESERVE_NO_RESULT were set
 
964
        when the TestCmd environment was created, then temporary working
 
965
        directories are not removed if the test passed, failed, or had
 
966
        no result, respectively.  Temporary working directories are also
 
967
        preserved for conditions specified via the preserve method.
 
968
 
 
969
        Typically, this method is not called directly, but is used when
 
970
        the script exits to clean up temporary working directories as
 
971
        appropriate for the exit status.
 
972
        """
 
973
        if not self._dirlist:
 
974
            return
 
975
        os.chdir(self._cwd)
 
976
        self.workdir = None
 
977
        if condition is None:
 
978
            condition = self.condition
 
979
        if self._preserve[condition]:
 
980
            for dir in self._dirlist:
 
981
                print "Preserved directory", dir
 
982
        else:
 
983
            list = self._dirlist[:]
 
984
            list.reverse()
 
985
            for dir in list:
 
986
                self.writable(dir, 1)
 
987
                shutil.rmtree(dir, ignore_errors = 1)
 
988
            self._dirlist = []
 
989
 
 
990
        try:
 
991
            global _Cleanup
 
992
            _Cleanup.remove(self)
 
993
        except (AttributeError, ValueError):
 
994
            pass
 
995
 
 
996
    def command_args(self, program = None,
 
997
                           interpreter = None,
 
998
                           arguments = None):
 
999
        if program:
 
1000
            if type(program) == type('') and not os.path.isabs(program):
 
1001
                program = os.path.join(self._cwd, program)
 
1002
        else:
 
1003
            program = self.program
 
1004
            if not interpreter:
 
1005
                interpreter = self.interpreter
 
1006
        if not type(program) in [type([]), type(())]:
 
1007
            program = [program]
 
1008
        cmd = list(program)
 
1009
        if interpreter:
 
1010
            if not type(interpreter) in [type([]), type(())]:
 
1011
                interpreter = [interpreter]
 
1012
            cmd = list(interpreter) + cmd
 
1013
        if arguments:
 
1014
            if type(arguments) == type(''):
 
1015
                arguments = string.split(arguments)
 
1016
            cmd.extend(arguments)
 
1017
        return cmd
 
1018
 
 
1019
    def description_set(self, description):
 
1020
        """Set the description of the functionality being tested.
 
1021
        """
 
1022
        self.description = description
 
1023
 
 
1024
    try:
 
1025
        difflib
 
1026
    except NameError:
 
1027
        def diff(self, a, b, name, *args, **kw):
 
1028
            print self.banner('Expected %s' % name)
 
1029
            print a
 
1030
            print self.banner('Actual %s' % name)
 
1031
            print b
 
1032
    else:
 
1033
        def diff(self, a, b, name, *args, **kw):
 
1034
            print self.banner(name)
 
1035
            args = (a.splitlines(), b.splitlines()) + args
 
1036
            lines = apply(self.diff_function, args, kw)
 
1037
            for l in lines:
 
1038
                print l
 
1039
 
 
1040
    def fail_test(self, condition = 1, function = None, skip = 0):
 
1041
        """Cause the test to fail.
 
1042
        """
 
1043
        if not condition:
 
1044
            return
 
1045
        self.condition = 'fail_test'
 
1046
        fail_test(self = self,
 
1047
                  condition = condition,
 
1048
                  function = function,
 
1049
                  skip = skip)
 
1050
 
 
1051
    def interpreter_set(self, interpreter):
 
1052
        """Set the program to be used to interpret the program
 
1053
        under test as a script.
 
1054
        """
 
1055
        self.interpreter = interpreter
 
1056
 
 
1057
    def match(self, lines, matches):
 
1058
        """Compare actual and expected file contents.
 
1059
        """
 
1060
        return self.match_function(lines, matches)
 
1061
 
 
1062
    def match_exact(self, lines, matches):
 
1063
        """Compare actual and expected file contents.
 
1064
        """
 
1065
        return match_exact(lines, matches)
 
1066
 
 
1067
    def match_re(self, lines, res):
 
1068
        """Compare actual and expected file contents.
 
1069
        """
 
1070
        return match_re(lines, res)
 
1071
 
 
1072
    def match_re_dotall(self, lines, res):
 
1073
        """Compare actual and expected file contents.
 
1074
        """
 
1075
        return match_re_dotall(lines, res)
 
1076
 
 
1077
    def no_result(self, condition = 1, function = None, skip = 0):
 
1078
        """Report that the test could not be run.
 
1079
        """
 
1080
        if not condition:
 
1081
            return
 
1082
        self.condition = 'no_result'
 
1083
        no_result(self = self,
 
1084
                  condition = condition,
 
1085
                  function = function,
 
1086
                  skip = skip)
 
1087
 
 
1088
    def pass_test(self, condition = 1, function = None):
 
1089
        """Cause the test to pass.
 
1090
        """
 
1091
        if not condition:
 
1092
            return
 
1093
        self.condition = 'pass_test'
 
1094
        pass_test(self = self, condition = condition, function = function)
 
1095
 
 
1096
    def preserve(self, *conditions):
 
1097
        """Arrange for the temporary working directories for the
 
1098
        specified TestCmd environment to be preserved for one or more
 
1099
        conditions.  If no conditions are specified, arranges for
 
1100
        the temporary working directories to be preserved for all
 
1101
        conditions.
 
1102
        """
 
1103
        if conditions is ():
 
1104
            conditions = ('pass_test', 'fail_test', 'no_result')
 
1105
        for cond in conditions:
 
1106
            self._preserve[cond] = 1
 
1107
 
 
1108
    def program_set(self, program):
 
1109
        """Set the executable program or script to be tested.
 
1110
        """
 
1111
        if program and not os.path.isabs(program):
 
1112
            program = os.path.join(self._cwd, program)
 
1113
        self.program = program
 
1114
 
 
1115
    def read(self, file, mode = 'rb'):
 
1116
        """Reads and returns the contents of the specified file name.
 
1117
        The file name may be a list, in which case the elements are
 
1118
        concatenated with the os.path.join() method.  The file is
 
1119
        assumed to be under the temporary working directory unless it
 
1120
        is an absolute path name.  The I/O mode for the file may
 
1121
        be specified; it must begin with an 'r'.  The default is
 
1122
        'rb' (binary read).
 
1123
        """
 
1124
        file = self.canonicalize(file)
 
1125
        if mode[0] != 'r':
 
1126
            raise ValueError, "mode must begin with 'r'"
 
1127
        return open(file, mode).read()
 
1128
 
 
1129
    def rmdir(self, dir):
 
1130
        """Removes the specified dir name.
 
1131
        The dir name may be a list, in which case the elements are
 
1132
        concatenated with the os.path.join() method.  The dir is
 
1133
        assumed to be under the temporary working directory unless it
 
1134
        is an absolute path name.
 
1135
        The dir must be empty.
 
1136
        """
 
1137
        dir = self.canonicalize(dir)
 
1138
        os.rmdir(dir)
 
1139
 
 
1140
    def start(self, program = None,
 
1141
                    interpreter = None,
 
1142
                    arguments = None,
 
1143
                    universal_newlines = None,
 
1144
                    **kw):
 
1145
        """
 
1146
        Starts a program or script for the test environment.
 
1147
 
 
1148
        The specified program will have the original directory
 
1149
        prepended unless it is enclosed in a [list].
 
1150
        """
 
1151
        cmd = self.command_args(program, interpreter, arguments)
 
1152
        cmd_string = string.join(map(self.escape, cmd), ' ')
 
1153
        if self.verbose:
 
1154
            sys.stderr.write(cmd_string + "\n")
 
1155
        if universal_newlines is None:
 
1156
            universal_newlines = self.universal_newlines
 
1157
 
 
1158
        # On Windows, if we make stdin a pipe when we plan to send 
 
1159
        # no input, and the test program exits before
 
1160
        # Popen calls msvcrt.open_osfhandle, that call will fail.
 
1161
        # So don't use a pipe for stdin if we don't need one.
 
1162
        stdin = kw.get('stdin', None)
 
1163
        if stdin is not None:
 
1164
            stdin = subprocess.PIPE
 
1165
 
 
1166
        combine = kw.get('combine', self.combine)
 
1167
        if combine:
 
1168
            stderr_value = subprocess.STDOUT
 
1169
        else:
 
1170
            stderr_value = subprocess.PIPE
 
1171
 
 
1172
        return Popen(cmd,
 
1173
                     stdin=stdin,
 
1174
                     stdout=subprocess.PIPE,
 
1175
                     stderr=stderr_value,
 
1176
                     universal_newlines=universal_newlines)
 
1177
 
 
1178
    def finish(self, popen, **kw):
 
1179
        """
 
1180
        Finishes and waits for the process being run under control of
 
1181
        the specified popen argument, recording the exit status,
 
1182
        standard output and error output.
 
1183
        """
 
1184
        popen.stdin.close()
 
1185
        self.status = popen.wait()
 
1186
        if not self.status:
 
1187
            self.status = 0
 
1188
        self._stdout.append(popen.stdout.read())
 
1189
        if popen.stderr:
 
1190
            stderr = popen.stderr.read()
 
1191
        else:
 
1192
            stderr = ''
 
1193
        self._stderr.append(stderr)
 
1194
 
 
1195
    def run(self, program = None,
 
1196
                  interpreter = None,
 
1197
                  arguments = None,
 
1198
                  chdir = None,
 
1199
                  stdin = None,
 
1200
                  universal_newlines = None):
 
1201
        """Runs a test of the program or script for the test
 
1202
        environment.  Standard output and error output are saved for
 
1203
        future retrieval via the stdout() and stderr() methods.
 
1204
 
 
1205
        The specified program will have the original directory
 
1206
        prepended unless it is enclosed in a [list].
 
1207
        """
 
1208
        if chdir:
 
1209
            oldcwd = os.getcwd()
 
1210
            if not os.path.isabs(chdir):
 
1211
                chdir = os.path.join(self.workpath(chdir))
 
1212
            if self.verbose:
 
1213
                sys.stderr.write("chdir(" + chdir + ")\n")
 
1214
            os.chdir(chdir)
 
1215
        p = self.start(program,
 
1216
                       interpreter,
 
1217
                       arguments,
 
1218
                       universal_newlines,
 
1219
                       stdin=stdin)
 
1220
        if stdin:
 
1221
            if is_List(stdin):
 
1222
                for line in stdin:
 
1223
                    p.stdin.write(line)
 
1224
            else:
 
1225
                p.stdin.write(stdin)
 
1226
            p.stdin.close()
 
1227
 
 
1228
        out = p.stdout.read()
 
1229
        if p.stderr is None:
 
1230
            err = ''
 
1231
        else:
 
1232
            err = p.stderr.read()
 
1233
        try:
 
1234
            close_output = p.close_output
 
1235
        except AttributeError:
 
1236
            p.stdout.close()
 
1237
            if not p.stderr is None:
 
1238
                p.stderr.close()
 
1239
        else:
 
1240
            close_output()
 
1241
 
 
1242
        self._stdout.append(out)
 
1243
        self._stderr.append(err)
 
1244
 
 
1245
        self.status = p.wait()
 
1246
        if not self.status:
 
1247
            self.status = 0
 
1248
 
 
1249
        if chdir:
 
1250
            os.chdir(oldcwd)
 
1251
        if self.verbose >= 2:
 
1252
            write = sys.stdout.write
 
1253
            write('============ STATUS: %d\n' % self.status)
 
1254
            out = self.stdout()
 
1255
            if out or self.verbose >= 3:
 
1256
                write('============ BEGIN STDOUT (len=%d):\n' % len(out))
 
1257
                write(out)
 
1258
                write('============ END STDOUT\n')
 
1259
            err = self.stderr()
 
1260
            if err or self.verbose >= 3:
 
1261
                write('============ BEGIN STDERR (len=%d)\n' % len(err))
 
1262
                write(err)
 
1263
                write('============ END STDERR\n')
 
1264
 
 
1265
    def sleep(self, seconds = default_sleep_seconds):
 
1266
        """Sleeps at least the specified number of seconds.  If no
 
1267
        number is specified, sleeps at least the minimum number of
 
1268
        seconds necessary to advance file time stamps on the current
 
1269
        system.  Sleeping more seconds is all right.
 
1270
        """
 
1271
        time.sleep(seconds)
 
1272
 
 
1273
    def stderr(self, run = None):
 
1274
        """Returns the error output from the specified run number.
 
1275
        If there is no specified run number, then returns the error
 
1276
        output of the last run.  If the run number is less than zero,
 
1277
        then returns the error output from that many runs back from the
 
1278
        current run.
 
1279
        """
 
1280
        if not run:
 
1281
            run = len(self._stderr)
 
1282
        elif run < 0:
 
1283
            run = len(self._stderr) + run
 
1284
        run = run - 1
 
1285
        return self._stderr[run]
 
1286
 
 
1287
    def stdout(self, run = None):
 
1288
        """Returns the standard output from the specified run number.
 
1289
        If there is no specified run number, then returns the standard
 
1290
        output of the last run.  If the run number is less than zero,
 
1291
        then returns the standard output from that many runs back from
 
1292
        the current run.
 
1293
        """
 
1294
        if not run:
 
1295
            run = len(self._stdout)
 
1296
        elif run < 0:
 
1297
            run = len(self._stdout) + run
 
1298
        run = run - 1
 
1299
        return self._stdout[run]
 
1300
 
 
1301
    def subdir(self, *subdirs):
 
1302
        """Create new subdirectories under the temporary working
 
1303
        directory, one for each argument.  An argument may be a list,
 
1304
        in which case the list elements are concatenated using the
 
1305
        os.path.join() method.  Subdirectories multiple levels deep
 
1306
        must be created using a separate argument for each level:
 
1307
 
 
1308
                test.subdir('sub', ['sub', 'dir'], ['sub', 'dir', 'ectory'])
 
1309
 
 
1310
        Returns the number of subdirectories actually created.
 
1311
        """
 
1312
        count = 0
 
1313
        for sub in subdirs:
 
1314
            if sub is None:
 
1315
                continue
 
1316
            if is_List(sub):
 
1317
                sub = apply(os.path.join, tuple(sub))
 
1318
            new = os.path.join(self.workdir, sub)
 
1319
            try:
 
1320
                os.mkdir(new)
 
1321
            except OSError:
 
1322
                pass
 
1323
            else:
 
1324
                count = count + 1
 
1325
        return count
 
1326
 
 
1327
    def symlink(self, target, link):
 
1328
        """Creates a symlink to the specified target.
 
1329
        The link name may be a list, in which case the elements are
 
1330
        concatenated with the os.path.join() method.  The link is
 
1331
        assumed to be under the temporary working directory unless it
 
1332
        is an absolute path name. The target is *not* assumed to be
 
1333
        under the temporary working directory.
 
1334
        """
 
1335
        link = self.canonicalize(link)
 
1336
        os.symlink(target, link)
 
1337
 
 
1338
    def tempdir(self, path=None):
 
1339
        """Creates a temporary directory.
 
1340
        A unique directory name is generated if no path name is specified.
 
1341
        The directory is created, and will be removed when the TestCmd
 
1342
        object is destroyed.
 
1343
        """
 
1344
        if path is None:
 
1345
            try:
 
1346
                path = tempfile.mktemp(prefix=tempfile.template)
 
1347
            except TypeError:
 
1348
                path = tempfile.mktemp()
 
1349
        os.mkdir(path)
 
1350
 
 
1351
        # Symlinks in the path will report things
 
1352
        # differently from os.getcwd(), so chdir there
 
1353
        # and back to fetch the canonical path.
 
1354
        cwd = os.getcwd()
 
1355
        try:
 
1356
            os.chdir(path)
 
1357
            path = os.getcwd()
 
1358
        finally:
 
1359
            os.chdir(cwd)
 
1360
 
 
1361
        # Uppercase the drive letter since the case of drive
 
1362
        # letters is pretty much random on win32:
 
1363
        drive,rest = os.path.splitdrive(path)
 
1364
        if drive:
 
1365
            path = string.upper(drive) + rest
 
1366
 
 
1367
        #
 
1368
        self._dirlist.append(path)
 
1369
        global _Cleanup
 
1370
        try:
 
1371
            _Cleanup.index(self)
 
1372
        except ValueError:
 
1373
            _Cleanup.append(self)
 
1374
 
 
1375
        return path
 
1376
 
 
1377
    def touch(self, path, mtime=None):
 
1378
        """Updates the modification time on the specified file or
 
1379
        directory path name.  The default is to update to the
 
1380
        current time if no explicit modification time is specified.
 
1381
        """
 
1382
        path = self.canonicalize(path)
 
1383
        atime = os.path.getatime(path)
 
1384
        if mtime is None:
 
1385
            mtime = time.time()
 
1386
        os.utime(path, (atime, mtime))
 
1387
 
 
1388
    def unlink(self, file):
 
1389
        """Unlinks the specified file name.
 
1390
        The file name may be a list, in which case the elements are
 
1391
        concatenated with the os.path.join() method.  The file is
 
1392
        assumed to be under the temporary working directory unless it
 
1393
        is an absolute path name.
 
1394
        """
 
1395
        file = self.canonicalize(file)
 
1396
        os.unlink(file)
 
1397
 
 
1398
    def verbose_set(self, verbose):
 
1399
        """Set the verbose level.
 
1400
        """
 
1401
        self.verbose = verbose
 
1402
 
 
1403
    def where_is(self, file, path=None, pathext=None):
 
1404
        """Find an executable file.
 
1405
        """
 
1406
        if is_List(file):
 
1407
            file = apply(os.path.join, tuple(file))
 
1408
        if not os.path.isabs(file):
 
1409
            file = where_is(file, path, pathext)
 
1410
        return file
 
1411
 
 
1412
    def workdir_set(self, path):
 
1413
        """Creates a temporary working directory with the specified
 
1414
        path name.  If the path is a null string (''), a unique
 
1415
        directory name is created.
 
1416
        """
 
1417
        if (path != None):
 
1418
            if path == '':
 
1419
                path = None
 
1420
            path = self.tempdir(path)
 
1421
        self.workdir = path
 
1422
 
 
1423
    def workpath(self, *args):
 
1424
        """Returns the absolute path name to a subdirectory or file
 
1425
        within the current temporary working directory.  Concatenates
 
1426
        the temporary working directory name with the specified
 
1427
        arguments using the os.path.join() method.
 
1428
        """
 
1429
        return apply(os.path.join, (self.workdir,) + tuple(args))
 
1430
 
 
1431
    def readable(self, top, read=1):
 
1432
        """Make the specified directory tree readable (read == 1)
 
1433
        or not (read == None).
 
1434
 
 
1435
        This method has no effect on Windows systems, which use a
 
1436
        completely different mechanism to control file readability.
 
1437
        """
 
1438
 
 
1439
        if sys.platform == 'win32':
 
1440
            return
 
1441
 
 
1442
        if read:
 
1443
            def do_chmod(fname):
 
1444
                try: st = os.stat(fname)
 
1445
                except OSError: pass
 
1446
                else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|stat.S_IREAD))
 
1447
        else:
 
1448
            def do_chmod(fname):
 
1449
                try: st = os.stat(fname)
 
1450
                except OSError: pass
 
1451
                else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~stat.S_IREAD))
 
1452
 
 
1453
        if os.path.isfile(top):
 
1454
            # If it's a file, that's easy, just chmod it.
 
1455
            do_chmod(top)
 
1456
        elif read:
 
1457
            # It's a directory and we're trying to turn on read
 
1458
            # permission, so it's also pretty easy, just chmod the
 
1459
            # directory and then chmod every entry on our walk down the
 
1460
            # tree.  Because os.path.walk() is top-down, we'll enable
 
1461
            # read permission on any directories that have it disabled
 
1462
            # before os.path.walk() tries to list their contents.
 
1463
            do_chmod(top)
 
1464
 
 
1465
            def chmod_entries(arg, dirname, names, do_chmod=do_chmod):
 
1466
                for n in names:
 
1467
                    do_chmod(os.path.join(dirname, n))
 
1468
 
 
1469
            os.path.walk(top, chmod_entries, None)
 
1470
        else:
 
1471
            # It's a directory and we're trying to turn off read
 
1472
            # permission, which means we have to chmod the directoreis
 
1473
            # in the tree bottom-up, lest disabling read permission from
 
1474
            # the top down get in the way of being able to get at lower
 
1475
            # parts of the tree.  But os.path.walk() visits things top
 
1476
            # down, so we just use an object to collect a list of all
 
1477
            # of the entries in the tree, reverse the list, and then
 
1478
            # chmod the reversed (bottom-up) list.
 
1479
            col = Collector(top)
 
1480
            os.path.walk(top, col, None)
 
1481
            col.entries.reverse()
 
1482
            for d in col.entries: do_chmod(d)
 
1483
 
 
1484
    def writable(self, top, write=1):
 
1485
        """Make the specified directory tree writable (write == 1)
 
1486
        or not (write == None).
 
1487
        """
 
1488
 
 
1489
        if sys.platform == 'win32':
 
1490
 
 
1491
            if write:
 
1492
                def do_chmod(fname):
 
1493
                    try: os.chmod(fname, stat.S_IWRITE)
 
1494
                    except OSError: pass
 
1495
            else:
 
1496
                def do_chmod(fname):
 
1497
                    try: os.chmod(fname, stat.S_IREAD)
 
1498
                    except OSError: pass
 
1499
 
 
1500
        else:
 
1501
 
 
1502
            if write:
 
1503
                def do_chmod(fname):
 
1504
                    try: st = os.stat(fname)
 
1505
                    except OSError: pass
 
1506
                    else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|0200))
 
1507
            else:
 
1508
                def do_chmod(fname):
 
1509
                    try: st = os.stat(fname)
 
1510
                    except OSError: pass
 
1511
                    else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~0200))
 
1512
 
 
1513
        if os.path.isfile(top):
 
1514
            do_chmod(top)
 
1515
        else:
 
1516
            col = Collector(top)
 
1517
            os.path.walk(top, col, None)
 
1518
            for d in col.entries: do_chmod(d)
 
1519
 
 
1520
    def executable(self, top, execute=1):
 
1521
        """Make the specified directory tree executable (execute == 1)
 
1522
        or not (execute == None).
 
1523
 
 
1524
        This method has no effect on Windows systems, which use a
 
1525
        completely different mechanism to control file executability.
 
1526
        """
 
1527
 
 
1528
        if sys.platform == 'win32':
 
1529
            return
 
1530
 
 
1531
        if execute:
 
1532
            def do_chmod(fname):
 
1533
                try: st = os.stat(fname)
 
1534
                except OSError: pass
 
1535
                else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|stat.S_IEXEC))
 
1536
        else:
 
1537
            def do_chmod(fname):
 
1538
                try: st = os.stat(fname)
 
1539
                except OSError: pass
 
1540
                else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~stat.S_IEXEC))
 
1541
 
 
1542
        if os.path.isfile(top):
 
1543
            # If it's a file, that's easy, just chmod it.
 
1544
            do_chmod(top)
 
1545
        elif execute:
 
1546
            # It's a directory and we're trying to turn on execute
 
1547
            # permission, so it's also pretty easy, just chmod the
 
1548
            # directory and then chmod every entry on our walk down the
 
1549
            # tree.  Because os.path.walk() is top-down, we'll enable
 
1550
            # execute permission on any directories that have it disabled
 
1551
            # before os.path.walk() tries to list their contents.
 
1552
            do_chmod(top)
 
1553
 
 
1554
            def chmod_entries(arg, dirname, names, do_chmod=do_chmod):
 
1555
                for n in names:
 
1556
                    do_chmod(os.path.join(dirname, n))
 
1557
 
 
1558
            os.path.walk(top, chmod_entries, None)
 
1559
        else:
 
1560
            # It's a directory and we're trying to turn off execute
 
1561
            # permission, which means we have to chmod the directories
 
1562
            # in the tree bottom-up, lest disabling execute permission from
 
1563
            # the top down get in the way of being able to get at lower
 
1564
            # parts of the tree.  But os.path.walk() visits things top
 
1565
            # down, so we just use an object to collect a list of all
 
1566
            # of the entries in the tree, reverse the list, and then
 
1567
            # chmod the reversed (bottom-up) list.
 
1568
            col = Collector(top)
 
1569
            os.path.walk(top, col, None)
 
1570
            col.entries.reverse()
 
1571
            for d in col.entries: do_chmod(d)
 
1572
 
 
1573
    def write(self, file, content, mode = 'wb'):
 
1574
        """Writes the specified content text (second argument) to the
 
1575
        specified file name (first argument).  The file name may be
 
1576
        a list, in which case the elements are concatenated with the
 
1577
        os.path.join() method.  The file is created under the temporary
 
1578
        working directory.  Any subdirectories in the path must already
 
1579
        exist.  The I/O mode for the file may be specified; it must
 
1580
        begin with a 'w'.  The default is 'wb' (binary write).
 
1581
        """
 
1582
        file = self.canonicalize(file)
 
1583
        if mode[0] != 'w':
 
1584
            raise ValueError, "mode must begin with 'w'"
 
1585
        open(file, mode).write(content)
 
1586
 
 
1587
# Local Variables:
 
1588
# tab-width:4
 
1589
# indent-tabs-mode:nil
 
1590
# End:
 
1591
# vim: set expandtab tabstop=4 shiftwidth=4: