1
# Copyright (c) 2006-2007 Twisted Matrix Laboratories.
2
# See LICENSE for details.
6
from twisted.trial import unittest
7
from twisted.python import filepath, log
8
from twisted.python.runtime import platform
9
from twisted.internet import error, defer, protocol, reactor
12
# A short string which is intended to appear here and nowhere else,
13
# particularly not in any random garbage output CPython unavoidable
14
# generates (such as in warning text and so forth). This is searched
15
# for in the output from stdio_test_lastwrite.py and if it is found at
16
# the end, the functionality works.
17
UNIQUE_LAST_WRITE_STRING = 'xyz123abc Twisted is great!'
19
skipWindowsNopywin32 = None
20
if platform.isWindows():
24
skipWindowsNopywin32 = ("On windows, spawnProcess is not available "
25
"in the absence of win32process.")
28
class StandardIOTestProcessProtocol(protocol.ProcessProtocol):
30
Test helper for collecting output from a child process and notifying
31
something when it exits.
33
@ivar onConnection: A L{defer.Deferred} which will be called back with
34
C{None} when the connection to the child process is established.
36
@ivar onCompletion: A L{defer.Deferred} which will be errbacked with the
37
failure associated with the child process exiting when it exits.
39
@ivar onDataReceived: A L{defer.Deferred} which will be called back with
40
this instance whenever C{childDataReceived} is called, or C{None} to
41
suppress these callbacks.
43
@ivar data: A C{dict} mapping file descriptors to strings containing all
44
bytes received from the child process on each file descriptor.
49
self.onConnection = defer.Deferred()
50
self.onCompletion = defer.Deferred()
54
def connectionMade(self):
55
self.onConnection.callback(None)
58
def childDataReceived(self, name, bytes):
60
Record all bytes received from the child process in the C{data}
61
dictionary. Fire C{onDataReceived} if it is not C{None}.
63
self.data[name] = self.data.get(name, '') + bytes
64
if self.onDataReceived is not None:
65
d, self.onDataReceived = self.onDataReceived, None
69
def processEnded(self, reason):
70
self.onCompletion.callback(reason)
74
class StandardInputOutputTestCase(unittest.TestCase):
75
def _spawnProcess(self, proto, sibling, *args, **kw):
77
Launch a child Python process and communicate with it using the
78
given ProcessProtocol.
80
@param proto: A L{ProcessProtocol} instance which will be connected
83
@param sibling: The basename of a file containing the Python program
84
to run in the child process.
86
@param *args: strings which will be passed to the child process on
87
the command line as C{argv[2:]}.
89
@param **kw: additional arguments to pass to L{reactor.spawnProcess}.
91
@return: The L{IProcessTransport} provider for the spawned process.
94
subenv = dict(os.environ)
95
subenv['PYTHONPATH'] = os.pathsep.join(
97
os.path.dirname(os.path.dirname(twisted.__file__))),
98
subenv.get('PYTHONPATH', '')
100
args = [sys.executable,
101
filepath.FilePath(__file__).sibling(sibling).path,
102
reactor.__class__.__module__] + list(args)
103
return reactor.spawnProcess(
111
def _requireFailure(self, d, callback):
113
self.fail("Process terminated with non-Failure: %r" % (result,))
116
return d.addCallbacks(cb, eb)
119
def test_loseConnection(self):
121
Verify that a protocol connected to L{StandardIO} can disconnect
122
itself using C{transport.loseConnection}.
124
errorLogFile = self.mktemp()
125
log.msg("Child process logging to " + errorLogFile)
126
p = StandardIOTestProcessProtocol()
128
self._spawnProcess(p, 'stdio_test_loseconn.py', errorLogFile)
130
def processEnded(reason):
131
# Copy the child's log to ours so it's more visible.
132
for line in file(errorLogFile):
133
log.msg("Child logged: " + line.rstrip())
135
self.failIfIn(1, p.data)
136
reason.trap(error.ProcessDone)
137
return self._requireFailure(d, processEnded)
138
test_loseConnection.skip = skipWindowsNopywin32
141
def test_lastWriteReceived(self):
143
Verify that a write made directly to stdout using L{os.write}
144
after StandardIO has finished is reliably received by the
145
process reading that stdout.
147
p = StandardIOTestProcessProtocol()
149
# Note: the OS X bug which prompted the addition of this test
150
# is an apparent race condition involving non-blocking PTYs.
151
# Delaying the parent process significantly increases the
152
# likelihood of the race going the wrong way. If you need to
153
# fiddle with this code at all, uncommenting the next line
154
# will likely make your life much easier. It is commented out
155
# because it makes the test quite slow.
157
# p.onConnection.addCallback(lambda ign: __import__('time').sleep(5))
161
p, 'stdio_test_lastwrite.py', UNIQUE_LAST_WRITE_STRING,
163
except ValueError, e:
164
# Some platforms don't work with usePTY=True
165
raise unittest.SkipTest(str(e))
167
def processEnded(reason):
169
Asserts that the parent received the bytes written by the child
170
immediately after the child starts.
173
p.data[1].endswith(UNIQUE_LAST_WRITE_STRING),
174
"Received %r from child, did not find expected bytes." % (
176
reason.trap(error.ProcessDone)
177
return self._requireFailure(p.onCompletion, processEnded)
178
test_lastWriteReceived.skip = skipWindowsNopywin32
181
def test_hostAndPeer(self):
183
Verify that the transport of a protocol connected to L{StandardIO}
184
has C{getHost} and C{getPeer} methods.
186
p = StandardIOTestProcessProtocol()
188
self._spawnProcess(p, 'stdio_test_hostpeer.py')
190
def processEnded(reason):
191
host, peer = p.data[1].splitlines()
192
self.failUnless(host)
193
self.failUnless(peer)
194
reason.trap(error.ProcessDone)
195
return self._requireFailure(d, processEnded)
196
test_hostAndPeer.skip = skipWindowsNopywin32
199
def test_write(self):
201
Verify that the C{write} method of the transport of a protocol
202
connected to L{StandardIO} sends bytes to standard out.
204
p = StandardIOTestProcessProtocol()
207
self._spawnProcess(p, 'stdio_test_write.py')
209
def processEnded(reason):
210
self.assertEquals(p.data[1], 'ok!')
211
reason.trap(error.ProcessDone)
212
return self._requireFailure(d, processEnded)
213
test_write.skip = skipWindowsNopywin32
216
def test_writeSequence(self):
218
Verify that the C{writeSequence} method of the transport of a
219
protocol connected to L{StandardIO} sends bytes to standard out.
221
p = StandardIOTestProcessProtocol()
224
self._spawnProcess(p, 'stdio_test_writeseq.py')
226
def processEnded(reason):
227
self.assertEquals(p.data[1], 'ok!')
228
reason.trap(error.ProcessDone)
229
return self._requireFailure(d, processEnded)
230
test_writeSequence.skip = skipWindowsNopywin32
234
junkPath = self.mktemp()
235
junkFile = file(junkPath, 'w')
236
for i in xrange(1024):
237
junkFile.write(str(i) + '\n')
242
def test_producer(self):
244
Verify that the transport of a protocol connected to L{StandardIO}
245
is a working L{IProducer} provider.
247
p = StandardIOTestProcessProtocol()
253
def connectionMade(ign):
255
written.append(str(toWrite.pop()) + "\n")
256
proc.write(written[-1])
257
reactor.callLater(0.01, connectionMade, None)
259
proc = self._spawnProcess(p, 'stdio_test_producer.py')
261
p.onConnection.addCallback(connectionMade)
263
def processEnded(reason):
264
self.assertEquals(p.data[1], ''.join(written))
265
self.failIf(toWrite, "Connection lost with %d writes left to go." % (len(toWrite),))
266
reason.trap(error.ProcessDone)
267
return self._requireFailure(d, processEnded)
268
test_producer.skip = skipWindowsNopywin32
271
def test_consumer(self):
273
Verify that the transport of a protocol connected to L{StandardIO}
274
is a working L{IConsumer} provider.
276
p = StandardIOTestProcessProtocol()
279
junkPath = self._junkPath()
281
self._spawnProcess(p, 'stdio_test_consumer.py', junkPath)
283
def processEnded(reason):
284
self.assertEquals(p.data[1], file(junkPath).read())
285
reason.trap(error.ProcessDone)
286
return self._requireFailure(d, processEnded)
287
test_consumer.skip = skipWindowsNopywin32