~forest-bond/sclapp/dev

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
# Copyright (c) 2005-2007 Forest Bond.
# This file is part of the sclapp software package.
# 
# sclapp is free software; you can redistribute it and/or modify it under the
# terms of the GNU General Public License version 2 as published by the Free
# Software Foundation.
# 
# A copy of the license has been included in the COPYING file.

import os, sys, signal, errno
from unittest import main

from common import (
  logSignals,
  execHead,
  waitForPid,
  assertLogFileContainsExactly,
  assertSignalCaught,
  assertLogFileContains,
  SclappTestCase,
)
from manager import manager

import sclapp

def _main(argv):
    import time
    sclapp.debug()
    try:
        for i in range(30):
            try:
                print 'foo'
                sys.stdout.flush()
            except sclapp.CriticalError:
                pass
            except IOError, e:
                if e.errno != errno.EPIPE:
                    raise
                break

            try:
                print >>sys.stderr, 'bar'
                sys.stderr.flush()
            except sclapp.CriticalError:
                pass
            except IOError, e:
                if e.errno != errno.EPIPE:
                    raise
                break

            time.sleep(0.2)
    finally:
        time.sleep(2)
        logSignals()
    return 0

# Use announce_signals = False to prevent signal announcement (on stderr) from
# disturbing our stdio testing.

main_with_signal_handling_with_output_protection = sclapp.mainWrapper(
  _main,
  exit_signals = [signal.SIGPIPE],
  protect_output = True,
  announce_signals = False,
  bug_message = None,
)
main_with_signal_handling_without_output_protection = sclapp.mainWrapper(
  _main,
  exit_signals = [signal.SIGPIPE],
  protect_output = False,
  announce_signals = False,
  bug_message = None,
)
main_without_signal_handling_with_output_protection = sclapp.mainWrapper(
  _main,
  handle_signals = False,
  protect_output = True,
  bug_message = None,
)
main_without_signal_handling_without_output_protection = sclapp.mainWrapper(
  _main,
  handle_signals = False,
  protect_output = False,
  bug_message = None,
)

def _stdio_fails(main_fn, pipe_out = True, pipe_err = False):
    from sclapp.debug_logging import DEBUG_LOGFILE
    from sclapp import pipes as s_p

    fns = [ main_fn, execHead ]
    argses = [ None, [ 2 ] ]
    kwargses = [ None, None ]
    pid = s_p.pipeFns(
      fns, argses, kwargses,
      pipe_out = pipe_out, pipe_err = pipe_err,
      stdin = '/dev/null', stdout = DEBUG_LOGFILE, stderr = DEBUG_LOGFILE
    )
    waitForPid(pid)
    return pid

def _stdout_fails(main_fn):
    return _stdio_fails(main_fn)

def _stdout_stderr_fail(main_fn):
    return _stdio_fails(main_fn, pipe_err = True)

def _make_test_fn(name, failure_fn, main_fn, foos, bars, signals,
  exceptions = ()):
    def test_fn():
        pid = failure_fn(main_fn)
        if foos is not None:
            assertLogFileContainsExactly('foo', foos)
        if bars is not None:
            assertLogFileContainsExactly('bar', bars)
        for signal in signals:
            assertSignalCaught(signal, pid)
        for exception in exceptions:
            assertLogFileContains(exception.__name__)
    test_fn.__name__ = name
    return test_fn

class TestOutputProtection(SclappTestCase):
    test_stdout_fails_with_signal_handling_with_output_protection = (
      staticmethod(_make_test_fn(
        'test_stdout_fails_with_signal_handling_with_output_protection',
        _stdout_fails,
        main_with_signal_handling_with_output_protection,
        foos = 2, bars = 30, signals = (signal.SIGPIPE,)
    )))
    test_stdout_stderr_fail_with_signal_handling_with_output_protection = (
      staticmethod(_make_test_fn(
        'test_stdout_stderr_fail_with_signal_handling_with_output_protection',
        _stdout_stderr_fail,
        main_with_signal_handling_with_output_protection,
        foos = 1, bars = 1, signals = (signal.SIGPIPE,)
    )))
    test_stdout_fails_with_signal_handling_without_output_protection = (
      staticmethod(_make_test_fn(
        'test_stdout_fails_with_signal_handling_without_output_protection',
        _stdout_fails,
        main_with_signal_handling_without_output_protection,
        foos = 0, bars = 0, signals = (), exceptions = (AssertionError,)
    )))
    # Don't expect to hear about the AssertionError since stderr is borked.
    test_stdout_stderr_fail_with_signal_handling_without_output_protection = (
      staticmethod(_make_test_fn(
        'test_stdout_stderr_fail_with_signal_handling_without_output_protection',
        _stdout_stderr_fail,
        main_with_signal_handling_without_output_protection,
        foos = 0, bars = 0, signals = ()
    )))

    # Note that, depending upon buffering of stdio, we may or may not get an
    # IOError when writing into a broken pipe with signals disabled.

    test_stdout_fails_without_signal_handling_with_output_protection = (
      staticmethod(_make_test_fn(
        'test_stdout_fails_without_signal_handling_with_output_protection',
        _stdout_fails,
        main_without_signal_handling_with_output_protection,
        2, 30, ()
    )))
    test_stdout_stderr_fail_without_signal_handling_with_output_protection = (
      staticmethod(_make_test_fn(
        'test_stdout_stderr_fail_without_signal_handling_with_output_protection',
        _stdout_stderr_fail,
        main_without_signal_handling_with_output_protection,
        1, 1, ()
    )))
    test_stdout_fails_without_signal_handling_without_output_protection = (
      staticmethod(_make_test_fn(
        'test_stdout_fails_without_signal_handling_without_output_protection',
        _stdout_fails,
        main_without_signal_handling_without_output_protection,
        2, None, ()
    )))
    test_stdout_stderr_fail_without_signal_handling_without_output_protection = (
      staticmethod(_make_test_fn(
        'test_stdout_stderr_fail_without_signal_handling_without_output_protection',
        _stdout_stderr_fail,
        main_without_signal_handling_without_output_protection,
        1, 1, ()
    )))

manager.add_test_case_class(TestOutputProtection)