~justin-fathomdb/nova/justinsb-openstack-api-volumes

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/test/test_stdio.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (c) 2006-2007 Twisted Matrix Laboratories.
 
2
# See LICENSE for details.
 
3
 
 
4
import os, sys
 
5
 
 
6
from twisted.trial import unittest
 
7
from twisted.python import filepath, log
 
8
from twisted.python.runtime import platform
 
9
from twisted.internet import error, defer, protocol, reactor
 
10
 
 
11
 
 
12
# A short marker string that is meant to occur here and nowhere else --
# in particular not in any incidental output CPython unavoidably
# generates (warning text and so forth).  The output of
# stdio_test_lastwrite.py is searched for it, and finding it at the very
# end of that output means the functionality works.
UNIQUE_LAST_WRITE_STRING = 'xyz123abc Twisted is great!'

# Skip message attached to the tests in this module.  It stays None (the
# tests run) unless we are on Windows and win32process cannot be imported.
skipWindowsNopywin32 = None
if platform.isWindows():
    try:
        # The module is imported purely to find out whether it is there.
        import win32process
    except ImportError:
        skipWindowsNopywin32 = (
            "On windows, spawnProcess is not available "
            "in the absence of win32process.")
 
26
 
 
27
 
 
28
class StandardIOTestProcessProtocol(protocol.ProcessProtocol):
    """
    Process protocol used by the tests to gather the output of a child
    process and to tell interested parties about its lifecycle.

    @ivar onConnection: A L{defer.Deferred} fired with C{None} from
    C{connectionMade}.

    @ivar onCompletion: A L{defer.Deferred} fired from C{processEnded} with
    the C{reason} object that method was given.

    @ivar onDataReceived: Either C{None} (the class-level default, meaning
    nobody is notified) or a L{defer.Deferred}.  When set, it is fired with
    this instance the next time C{childDataReceived} runs and is then reset
    to C{None}, so each Deferred is used at most once.

    @ivar data: A C{dict} mapping file descriptors to strings containing all
    bytes received so far from the child process on that descriptor.
    """
    onDataReceived = None

    def __init__(self):
        self.data = {}
        self.onConnection = defer.Deferred()
        self.onCompletion = defer.Deferred()


    def connectionMade(self):
        """
        Announce, through C{onConnection}, that the child is connected.
        """
        self.onConnection.callback(None)


    def childDataReceived(self, name, bytes):
        """
        Append C{bytes} to whatever has already been collected under
        C{name} in the C{data} dictionary, then fire C{onDataReceived} if
        one is pending.
        """
        soFar = self.data.get(name, '')
        self.data[name] = soFar + bytes

        pending = self.onDataReceived
        if pending is not None:
            # Clear the attribute before firing so the same Deferred is
            # never called back a second time.
            self.onDataReceived = None
            pending.callback(self)


    def processEnded(self, reason):
        """
        Hand C{reason} to C{onCompletion}.
        """
        # NOTE(review): reason is presumably a Failure, in which case the
        # Deferred runs its errback chain -- confirm against Deferred docs.
        self.onCompletion.callback(reason)
 
71
 
 
72
 
 
73
 
 
74
class StandardInputOutputTestCase(unittest.TestCase):
    """
    Tests which exercise L{StandardIO} by running small sibling scripts in
    child Python processes and inspecting what those children write back.
    """

    def _spawnProcess(self, proto, sibling, *args, **kw):
        """
        Launch a child Python process and communicate with it using the
        given ProcessProtocol.

        @param proto: A L{ProcessProtocol} instance which will be connected
        to the child process.

        @param sibling: The basename of a file containing the Python program
        to run in the child process.

        @param *args: strings which will be passed to the child process on
        the command line as C{argv[2:]}.

        @param **kw: additional arguments to pass to L{reactor.spawnProcess}.

        @return: The L{IProcessTransport} provider for the spawned process.
        """
        import twisted
        # Put the directory containing the twisted package first on the
        # child's PYTHONPATH, followed by whatever the parent already had.
        subenv = dict(os.environ)
        subenv['PYTHONPATH'] = os.pathsep.join(
            [os.path.abspath(
                    os.path.dirname(os.path.dirname(twisted.__file__))),
             subenv.get('PYTHONPATH', '')
             ])
        # The module name of the running reactor is passed as the first
        # script argument, ahead of the caller-supplied ones.
        args = [sys.executable,
             filepath.FilePath(__file__).sibling(sibling).path,
             reactor.__class__.__module__] + list(args)
        return reactor.spawnProcess(
            proto,
            sys.executable,
            args,
            env=subenv,
            **kw)


    def _requireFailure(self, d, callback):
        """
        Require that C{d} ends up with a failure and pass that failure on.

        @param d: The L{defer.Deferred} to attach to.

        @param callback: A one-argument callable invoked with the failure;
        its return value becomes the result of the errback.

        @return: Whatever C{d.addCallbacks} returns.
        """
        def cb(result):
            # Any non-failure result is itself a test failure.
            self.fail("Process terminated with non-Failure: %r" % (result,))
        def eb(err):
            return callback(err)
        return d.addCallbacks(cb, eb)


    def test_loseConnection(self):
        """
        Verify that a protocol connected to L{StandardIO} can disconnect
        itself using C{transport.loseConnection}.
        """
        errorLogFile = self.mktemp()
        log.msg("Child process logging to " + errorLogFile)
        p = StandardIOTestProcessProtocol()
        d = p.onCompletion
        self._spawnProcess(p, 'stdio_test_loseconn.py', errorLogFile)

        def processEnded(reason):
            # Copy the child's log to ours so it's more visible.  The file
            # is closed explicitly rather than left to garbage collection.
            childLog = open(errorLogFile)
            try:
                for line in childLog:
                    log.msg("Child logged: " + line.rstrip())
            finally:
                childLog.close()

            self.failIfIn(1, p.data)
            reason.trap(error.ProcessDone)
        return self._requireFailure(d, processEnded)
    test_loseConnection.skip = skipWindowsNopywin32


    def test_lastWriteReceived(self):
        """
        Verify that a write made directly to stdout using L{os.write}
        after StandardIO has finished is reliably received by the
        process reading that stdout.
        """
        p = StandardIOTestProcessProtocol()

        # Note: the OS X bug which prompted the addition of this test
        # is an apparent race condition involving non-blocking PTYs.
        # Delaying the parent process significantly increases the
        # likelihood of the race going the wrong way.  If you need to
        # fiddle with this code at all, uncommenting the next line
        # will likely make your life much easier.  It is commented out
        # because it makes the test quite slow.

        # p.onConnection.addCallback(lambda ign: __import__('time').sleep(5))

        try:
            self._spawnProcess(
                p, 'stdio_test_lastwrite.py', UNIQUE_LAST_WRITE_STRING,
                usePTY=True)
        except ValueError:
            # Some platforms don't work with usePTY=True.  The exception is
            # fetched via sys.exc_info() so this clause parses on every
            # Python version.
            raise unittest.SkipTest(str(sys.exc_info()[1]))

        def processEnded(reason):
            """
            Asserts that the parent received the bytes written by the child
            immediately after the child starts.
            """
            # .get() keeps a child that wrote nothing from surfacing as a
            # bare KeyError instead of the diagnostic message below.
            self.assertTrue(
                p.data.get(1, '').endswith(UNIQUE_LAST_WRITE_STRING),
                "Received %r from child, did not find expected bytes." % (
                    p.data,))
            reason.trap(error.ProcessDone)
        return self._requireFailure(p.onCompletion, processEnded)
    test_lastWriteReceived.skip = skipWindowsNopywin32


    def test_hostAndPeer(self):
        """
        Verify that the transport of a protocol connected to L{StandardIO}
        has C{getHost} and C{getPeer} methods.
        """
        p = StandardIOTestProcessProtocol()
        d = p.onCompletion
        self._spawnProcess(p, 'stdio_test_hostpeer.py')

        def processEnded(reason):
            # The child is expected to have written exactly two lines.
            host, peer = p.data[1].splitlines()
            self.failUnless(host)
            self.failUnless(peer)
            reason.trap(error.ProcessDone)
        return self._requireFailure(d, processEnded)
    test_hostAndPeer.skip = skipWindowsNopywin32


    def test_write(self):
        """
        Verify that the C{write} method of the transport of a protocol
        connected to L{StandardIO} sends bytes to standard out.
        """
        p = StandardIOTestProcessProtocol()
        d = p.onCompletion

        self._spawnProcess(p, 'stdio_test_write.py')

        def processEnded(reason):
            self.assertEquals(p.data[1], 'ok!')
            reason.trap(error.ProcessDone)
        return self._requireFailure(d, processEnded)
    test_write.skip = skipWindowsNopywin32


    def test_writeSequence(self):
        """
        Verify that the C{writeSequence} method of the transport of a
        protocol connected to L{StandardIO} sends bytes to standard out.
        """
        p = StandardIOTestProcessProtocol()
        d = p.onCompletion

        self._spawnProcess(p, 'stdio_test_writeseq.py')

        def processEnded(reason):
            self.assertEquals(p.data[1], 'ok!')
            reason.trap(error.ProcessDone)
        return self._requireFailure(d, processEnded)
    test_writeSequence.skip = skipWindowsNopywin32


    def _junkPath(self):
        """
        Create a temporary file holding the decimal numbers 0 through 1023,
        one per line.

        @return: The path of the file that was written.
        """
        junkPath = self.mktemp()
        junkFile = open(junkPath, 'w')
        try:
            for i in xrange(1024):
                junkFile.write(str(i) + '\n')
        finally:
            # Close even if a write fails so the handle is not leaked.
            junkFile.close()
        return junkPath


    def test_producer(self):
        """
        Verify that the transport of a protocol connected to L{StandardIO}
        is a working L{IProducer} provider.
        """
        p = StandardIOTestProcessProtocol()
        d = p.onCompletion

        written = []
        toWrite = range(100)

        def connectionMade(ign):
            # Write one pending value to the child, then reschedule
            # ourselves until nothing is left to write.
            if toWrite:
                written.append(str(toWrite.pop()) + "\n")
                proc.write(written[-1])
                reactor.callLater(0.01, connectionMade, None)

        proc = self._spawnProcess(p, 'stdio_test_producer.py')

        p.onConnection.addCallback(connectionMade)

        def processEnded(reason):
            self.assertEquals(p.data[1], ''.join(written))
            self.failIf(
                toWrite,
                "Connection lost with %d writes left to go." % (
                    len(toWrite),))
            reason.trap(error.ProcessDone)
        return self._requireFailure(d, processEnded)
    test_producer.skip = skipWindowsNopywin32


    def test_consumer(self):
        """
        Verify that the transport of a protocol connected to L{StandardIO}
        is a working L{IConsumer} provider.
        """
        p = StandardIOTestProcessProtocol()
        d = p.onCompletion

        junkPath = self._junkPath()

        self._spawnProcess(p, 'stdio_test_consumer.py', junkPath)

        def processEnded(reason):
            # Read the expected bytes back and close the file explicitly
            # rather than leaving it to garbage collection.
            junkFile = open(junkPath)
            try:
                expected = junkFile.read()
            finally:
                junkFile.close()
            self.assertEquals(p.data[1], expected)
            reason.trap(error.ProcessDone)
        return self._requireFailure(d, processEnded)
    test_consumer.skip = skipWindowsNopywin32