2
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
3
# See LICENSE for details.
7
Test running processes.
9
from __future__ import nested_scopes, generators
11
from twisted.trial import unittest
12
from twisted.python import log
20
from pprint import pformat
23
import cStringIO as StringIO
28
from twisted.internet import reactor, protocol, error, interfaces, defer
29
from twisted.internet.protocol import ProcessProtocol
30
from twisted.internet.defer import Deferred
32
from twisted.python import util, runtime
33
from twisted.python import procutils
35
class TrivialProcessProtocol(protocol.ProcessProtocol):
36
def __init__(self, d):
39
def processEnded(self, reason):
41
self.deferred.callback(None)
43
class TestProcessProtocol(protocol.ProcessProtocol):
45
def connectionMade(self):
49
self.transport.write("abcd")
51
def outReceived(self, data):
52
self.data = self.data + data
54
def outConnectionLost(self):
56
if self.data != "abcd":
58
self.transport.write("1234")
60
def errReceived(self, data):
61
self.err = self.err + data
63
def errConnectionLost(self):
65
if self.err != "1234":
66
print 'err != 1234: ' + repr(self.err)
68
self.transport.write("abcd")
71
def inConnectionLost(self):
74
def processEnded(self, reason):
76
self.deferred.callback(None)
79
class EchoProtocol(protocol.ProcessProtocol):
87
def __init__(self, onEnded):
88
self.onEnded = onEnded
91
def connectionMade(self):
93
for i in range(self.n - 2):
94
self.transport.write(self.s)
96
self.transport.writeSequence([self.s, self.s])
97
self.buffer = self.s * self.n
99
def outReceived(self, data):
100
if buffer(self.buffer, self.count, len(data)) != buffer(data):
101
self.failure = ("wrong bytes received", data, self.count)
102
self.transport.closeStdin()
104
self.count += len(data)
105
if self.count == len(self.buffer):
106
self.transport.closeStdin()
108
def processEnded(self, reason):
110
if not reason.check(error.ProcessDone):
111
self.failure = "process didn't terminate normally: " + str(reason)
112
self.onEnded.callback(self)
115
class SignalProtocol(protocol.ProcessProtocol):
116
def __init__(self, deferred, sig):
117
self.deferred = deferred
120
def outReceived(self, data):
121
self.transport.signalProcess(self.signal)
123
def processEnded(self, reason):
124
if not reason.check(error.ProcessTerminated):
125
self.deferred.callback("wrong termination: %s" % reason)
128
if v.exitCode is not None:
129
self.deferred.callback("SIG%s: exitCode is %s, not None" %
130
(self.signal, v.exitCode))
132
if v.signal != getattr(signal,'SIG'+self.signal):
133
self.deferred.callback("SIG%s: .signal was %s, wanted %s" %
134
(self.signal, v.signal,
135
getattr(signal,'SIG'+self.signal)))
137
if os.WTERMSIG(v.status) != getattr(signal,'SIG'+self.signal):
138
self.deferred.callback('SIG%s: %s'
139
% (self.signal, os.WTERMSIG(v.status)))
141
self.deferred.callback(None)
145
# XXX: Trial now does this (see
146
# twisted.trial.runner.MethodInfoBase._setUpSigchldHandler)... perhaps
147
# this class should be removed? Or trial shouldn't bother, and this
148
# class used where it matters?
150
sigchldHandler = None
152
def setUpClass(self):
153
# make sure SIGCHLD handler is installed, as it should be on
154
# reactor.run(). Do this because the reactor may not have been run
155
# by the time this test runs.
156
if hasattr(reactor, "_handleSigchld") and hasattr(signal, "SIGCHLD"):
157
log.msg("Installing SIGCHLD signal handler.")
158
self.sigchldHandler = signal.signal(signal.SIGCHLD,
159
reactor._handleSigchld)
161
log.msg("Skipped installing SIGCHLD signal handler.")
163
def tearDownClass(self):
164
if self.sigchldHandler:
165
log.msg("Uninstalled SIGCHLD signal handler.")
166
signal.signal(signal.SIGCHLD, self.sigchldHandler)
168
class TestManyProcessProtocol(TestProcessProtocol):
170
self.deferred = defer.Deferred()
172
def processEnded(self, reason):
174
if reason.check(error.ProcessDone):
175
self.deferred.callback(None)
177
self.deferred.errback(reason)
181
class UtilityProcessProtocol(ProcessProtocol):
183
Helper class for launching a Python process and getting a result from it.
185
@ivar program: A string giving a Python program for the child process to
190
def run(cls, reactor, argv, env):
192
Run a Python process connected to a new instance of this protocol
193
class. Return the protocol instance.
195
The Python process is given C{self.program} on the command line to
196
execute, in addition to anything specified by C{argv}. C{env} is
197
the complete environment.
201
reactor.spawnProcess(
202
self, exe, [exe, "-c", self.program] + argv, env=env)
204
run = classmethod(run)
212
def parseChunks(self, bytes):
214
Called with all bytes received on stdout when the process exits.
216
raise NotImplementedError()
221
Return a Deferred which will fire with the result of L{parseChunks}
222
when the child process exits.
225
self.requests.append(d)
229
def _fireResultDeferreds(self, result):
231
Callback all Deferreds returned up until now by L{getResult}
232
with the given result object.
234
requests = self.requests
240
def outReceived(self, bytes):
242
Accumulate output from the child process in a list.
244
self.bytes.append(bytes)
247
def processEnded(self, reason):
249
Handle process termination by parsing all received output and firing
250
any waiting Deferreds.
252
self._fireResultDeferreds(self.parseChunks(self.bytes))
257
class GetArgumentVector(UtilityProcessProtocol):
259
Protocol which will read a serialized argv from a process and
260
expose it to interested parties.
263
"from sys import stdout, argv\n"
264
"stdout.write(chr(0).join(argv))\n"
267
def parseChunks(self, chunks):
269
Parse the output from the process to which this protocol was
270
connected, which is a single unterminated line of \\0-separated
271
strings giving the argv of that process. Return this as a list of
274
return ''.join(chunks).split('\0')
278
class GetEnvironmentDictionary(UtilityProcessProtocol):
280
Protocol which will read a serialized environment dict from a process
281
and expose it to interested parties.
284
"from sys import stdout\n"
285
"from os import environ\n"
286
"items = environ.iteritems()\n"
287
"stdout.write(chr(0).join([k + chr(0) + v for k, v in items]))\n"
290
def parseChunks(self, chunks):
292
Parse the output from the process to which this protocol was
293
connected, which is a single unterminated line of \\0-separated
294
strings giving key value pairs of the environment from that process.
295
Return this as a dictionary.
297
environString = ''.join(chunks)
298
if not environString:
300
environ = iter(environString.split('\0'))
305
except StopIteration:
314
class ProcessTestCase(SignalMixin, unittest.TestCase):
315
"""Test running a process."""
320
"""twisted.internet.stdio test."""
322
scriptPath = util.sibpath(__file__, "process_twisted.py")
324
d = p.endedDeferred = defer.Deferred()
325
env = {"PYTHONPATH": os.pathsep.join(sys.path)}
326
reactor.spawnProcess(p, exe, [exe, "-u", scriptPath], env=env,
327
path=None, usePTY=self.usePTY)
328
p.transport.write("hello, world")
329
p.transport.write("abc")
330
p.transport.write("123")
331
p.transport.closeStdin()
333
def processEnded(ign):
334
self.assertEquals(p.outF.getvalue(), "hello, worldabc123",
337
"Error message from process_twisted follows:\n"
338
"%s\n" % (p.outF.getvalue(), p.errF.getvalue()))
339
return d.addCallback(processEnded)
342
def testProcess(self):
344
scriptPath = util.sibpath(__file__, "process_tester.py")
346
p = TestProcessProtocol()
348
reactor.spawnProcess(p, exe, [exe, "-u", scriptPath], env=None)
350
self.assertEquals(p.stages, [1, 2, 3, 4, 5])
352
f.trap(error.ProcessTerminated)
353
self.assertEquals(f.value.exitCode, 23)
354
# would .signal be available on non-posix?
355
# self.assertEquals(f.value.signal, None)
357
import process_tester, glob
358
for f in glob.glob(process_tester.test_file_match):
365
def testManyProcesses(self):
367
def _check(results, protocols):
369
self.assertEquals(p.stages, [1, 2, 3, 4, 5], "[%d] stages = %s" % (id(p.transport), str(p.stages)))
372
f.trap(error.ProcessTerminated)
373
self.assertEquals(f.value.exitCode, 23)
376
scriptPath = util.sibpath(__file__, "process_tester.py")
377
args = [exe, "-u", scriptPath]
382
p = TestManyProcessProtocol()
384
reactor.spawnProcess(p, exe, args, env=None)
385
deferreds.append(p.deferred)
387
deferredList = defer.DeferredList(deferreds, consumeErrors=True)
388
deferredList.addCallback(_check, protocols)
392
finished = defer.Deferred()
393
p = EchoProtocol(finished)
396
scriptPath = util.sibpath(__file__, "process_echoer.py")
397
reactor.spawnProcess(p, exe, [exe, "-u", scriptPath], env=None)
399
def asserts(ignored):
400
self.failIf(p.failure, p.failure)
401
self.failUnless(hasattr(p, 'buffer'))
402
self.assertEquals(len(''.join(p.buffer)), len(p.s * p.n))
404
def takedownProcess(err):
405
p.transport.closeStdin()
408
return finished.addCallback(asserts).addErrback(takedownProcess)
409
testEcho.timeout = 60 # XXX This should not be. There is already a
410
# global timeout value. Why do you think this
411
# test can complete more quickly?
414
def testCommandLine(self):
415
args = [r'a\"b ', r'a\b ', r' a\\"b', r' a\\b', r'"foo bar" "', '\tab', '"\\', 'a"b', "a'b"]
416
pyExe = sys.executable
417
scriptPath = util.sibpath(__file__, "process_cmdline.py")
419
d = p.endedDeferred = defer.Deferred()
420
reactor.spawnProcess(p, pyExe, [pyExe, "-u", scriptPath]+args, env=None,
423
def processEnded(ign):
424
self.assertEquals(p.errF.getvalue(), "")
425
recvdArgs = p.outF.getvalue().splitlines()
426
self.assertEquals(recvdArgs, args)
427
return d.addCallback(processEnded)
430
def test_wrongArguments(self):
432
Test invalid arguments to spawnProcess: arguments and environment
433
must only contains string or unicode, and not null bytes.
436
p = protocol.ProcessProtocol()
449
# Sanity check - this will fail for people who have mucked with
450
# their site configuration in a stupid way, but there's nothing we
452
badUnicode = u'\N{SNOWMAN}'
454
badUnicode.encode(sys.getdefaultencoding())
455
except UnicodeEncodeError:
456
# Okay, that unicode doesn't encode, put it in as a bad environment
458
badEnvs.append({badUnicode: 'value for bad unicode key'})
459
badEnvs.append({'key for bad unicode value': badUnicode})
460
badArgs.append([exe, badUnicode])
462
# It _did_ encode. Most likely, Gtk2 is being used and the
463
# default system encoding is UTF-8, which can encode anything.
464
# In any case, if implicit unicode -> str conversion works for
465
# that string, we can't test that TypeError gets raised instead,
466
# so just leave it off.
472
reactor.spawnProcess, p, exe, [exe, "-c", ""], env=env)
477
reactor.spawnProcess, p, exe, args, env=None)
480
# Use upper-case so that the environment key test uses an upper case
481
# name: some versions of Windows only support upper case environment
482
# variable names, and I think Python (as of 2.5) doesn't use the right
483
# syscall for lowercase or mixed case names to work anyway.
484
okayUnicode = u"UNICODE"
485
encodedValue = "UNICODE"
487
def _deprecatedUnicodeSupportTest(self, processProtocolClass, argv=[], env={}):
489
Check that a deprecation warning is emitted when passing unicode to
490
spawnProcess for an argv value or an environment key or value.
491
Check that the warning is of the right type, has the right message,
492
and refers to the correct file. Unfortunately, don't check that the
493
line number is correct, because that is too hard for me to figure
496
@param processProtocolClass: A L{UtilityProcessProtocol} subclass
497
which will be instantiated to communicate with the child process.
499
@param argv: The argv argument to spawnProcess.
501
@param env: The env argument to spawnProcess.
503
@return: A Deferred which fires when the test is complete.
505
# Sanity to check to make sure we can actually encode this unicode
506
# with the default system encoding. This may be excessively
509
self.okayUnicode.encode(sys.getdefaultencoding()),
513
def showwarning(*args):
514
warningsShown.append(args)
516
origshow = warnings.showwarning
517
origregistry = globals().get('__warningregistry__', {})
519
warnings.showwarning = showwarning
520
globals()['__warningregistry__'] = {}
521
p = processProtocolClass.run(reactor, argv, env)
523
warnings.showwarning = origshow
524
globals()['__warningregistry__'] = origregistry
527
self.assertEqual(len(warningsShown), 1, pformat(warningsShown))
528
message, category, filename, lineno = warningsShown[0]
531
("Argument strings and environment keys/values passed to "
532
"reactor.spawnProcess should be str, not unicode.",))
533
self.assertIdentical(category, DeprecationWarning)
535
# Use starts with because of .pyc/.pyo issues.
537
__file__.startswith(filename),
538
'Warning in %r, expected %r' % (filename, __file__))
540
# It would be nice to be able to check the line number as well, but
541
# different configurations actually end up reporting different line
542
# numbers (generally the variation is only 1 line, but that's enough
543
# to fail the test erroneously...).
544
# self.assertEqual(lineno, 202)
548
def test_deprecatedUnicodeArgvSupport(self):
550
Test that a unicode string passed for an argument value is allowed
551
if it can be encoded with the default system encoding, but that a
552
deprecation warning is emitted.
554
d = self._deprecatedUnicodeSupportTest(GetArgumentVector, argv=[self.okayUnicode])
555
def gotArgVector(argv):
556
self.assertEqual(argv, ['-c', self.encodedValue])
557
d.addCallback(gotArgVector)
561
def test_deprecatedUnicodeEnvKeySupport(self):
563
Test that a unicode string passed for the key of the environment
564
dictionary is allowed if it can be encoded with the default system
565
encoding, but that a deprecation warning is emitted.
567
d = self._deprecatedUnicodeSupportTest(
568
GetEnvironmentDictionary, env={self.okayUnicode: self.encodedValue})
569
def gotEnvironment(environ):
570
self.assertEqual(environ[self.encodedValue], self.encodedValue)
571
d.addCallback(gotEnvironment)
575
def test_deprecatedUnicodeEnvValueSupport(self):
577
Test that a unicode string passed for the value of the environment
578
dictionary is allowed if it can be encoded with the default system
579
encoding, but that a deprecation warning is emitted.
581
d = self._deprecatedUnicodeSupportTest(
582
GetEnvironmentDictionary, env={self.encodedValue: self.okayUnicode})
583
def gotEnvironment(environ):
584
# On Windows, the environment contains more things than we
585
# specified, so only make sure that at least the key we wanted
586
# is there, rather than testing the dictionary for exact
588
self.assertEqual(environ[self.encodedValue], self.encodedValue)
589
d.addCallback(gotEnvironment)
594
class TwoProcessProtocol(protocol.ProcessProtocol):
598
self.deferred = defer.Deferred()
599
def outReceived(self, data):
601
def processEnded(self, reason):
603
self.deferred.callback(None)
605
class TestTwoProcessesBase:
607
self.processes = [None, None]
608
self.pp = [None, None]
612
def createProcesses(self, usePTY=0):
614
scriptPath = util.sibpath(__file__, "process_reader.py")
616
self.pp[num] = TwoProcessProtocol()
617
self.pp[num].num = num
618
p = reactor.spawnProcess(self.pp[num],
619
exe, [exe, "-u", scriptPath], env=None,
621
self.processes[num] = p
623
def close(self, num):
624
if self.verbose: print "closing stdin [%d]" % num
625
p = self.processes[num]
627
self.failIf(pp.finished, "Process finished too early")
629
if self.verbose: print self.pp[0].finished, self.pp[1].finished
632
return defer.gatherResults([ p.deferred for p in self.pp ])
635
if self.verbose: print "starting processes"
636
self.createProcesses()
637
reactor.callLater(1, self.close, 0)
638
reactor.callLater(2, self.close, 1)
639
return self._onClose()
641
class TestTwoProcessesNonPosix(TestTwoProcessesBase, SignalMixin, unittest.TestCase):
644
class TestTwoProcessesPosix(TestTwoProcessesBase, SignalMixin, unittest.TestCase):
647
pp, process = self.pp[i], self.processes[i]
650
os.kill(process.pid, signal.SIGTERM)
653
return self._onClose()
656
if self.verbose: print "kill [%d] with SIGTERM" % num
657
p = self.processes[num]
659
self.failIf(pp.finished, "Process finished too early")
660
os.kill(p.pid, signal.SIGTERM)
661
if self.verbose: print self.pp[0].finished, self.pp[1].finished
664
if self.verbose: print "starting processes"
665
self.createProcesses(usePTY=0)
666
reactor.callLater(1, self.kill, 0)
667
reactor.callLater(2, self.kill, 1)
668
return self._onClose()
670
def testClosePty(self):
671
if self.verbose: print "starting processes"
672
self.createProcesses(usePTY=1)
673
reactor.callLater(1, self.close, 0)
674
reactor.callLater(2, self.close, 1)
675
return self._onClose()
677
def testKillPty(self):
678
if self.verbose: print "starting processes"
679
self.createProcesses(usePTY=1)
680
reactor.callLater(1, self.kill, 0)
681
reactor.callLater(2, self.kill, 1)
682
return self._onClose()
684
class FDChecker(protocol.ProcessProtocol):
689
def __init__(self, d):
694
self.deferred.callback(None)
696
def connectionMade(self):
697
self.transport.writeToChild(0, "abcd")
700
def childDataReceived(self, childFD, data):
701
#print "[%d] dataReceived(%d,%s)" % (self.state, childFD, data)
704
self.fail("read '%s' on fd %d (not 1) during state 1" \
708
#print "len", len(self.data)
709
if len(self.data) == 6:
710
if self.data != "righto":
711
self.fail("got '%s' on fd1, expected 'righto'" \
716
#print "state2", self.state
717
self.transport.writeToChild(3, "efgh")
720
self.fail("read '%s' on fd %s during state 2" % (childFD, data))
724
self.fail("read '%s' on fd %s (not 1) during state 3" \
728
if len(self.data) == 6:
729
if self.data != "closed":
730
self.fail("got '%s' on fd1, expected 'closed'" \
736
self.fail("read '%s' on fd %s during state 4" % (childFD, data))
739
def childConnectionLost(self, childFD):
740
#print "[%d] connectionLost(%d)" % (self.state, childFD)
742
self.fail("got connectionLost(%d) during state 1" % childFD)
746
self.fail("got connectionLost(%d) (not 4) during state 2" \
750
self.transport.closeChildFD(5)
753
def processEnded(self, status):
754
#print "[%d] processEnded" % self.state
755
rc = status.value.exitCode
757
self.fail("processEnded early, rc %d" % rc)
759
if status.value.signal != None:
760
self.fail("processEnded with signal %s" % status.value.signal)
763
self.fail("processEnded with rc %d" % rc)
765
self.deferred.callback(None)
767
class FDTest(SignalMixin, unittest.TestCase):
769
from twisted.internet import process
770
process.Process.debug_child = True
771
def NOTtearDown(self):
772
from twisted.internet import process
773
process.Process.debug_child = False
777
scriptPath = util.sibpath(__file__, "process_fds.py")
780
reactor.spawnProcess(p, exe, [exe, "-u", scriptPath], env=None,
782
childFDs={0:"w", 1:"r", 2:2,
783
3:"w", 4:"r", 5:"w"})
784
d.addCallback(lambda x : self.failIf(p.failed, p.failed))
787
def testLinger(self):
788
# See what happens when all the pipes close before the process
789
# actually stops. This test *requires* SIGCHLD catching to work,
790
# as there is no other way to find out the process is done.
792
scriptPath = util.sibpath(__file__, "process_linger.py")
794
d = p.endedDeferred = defer.Deferred()
795
reactor.spawnProcess(p, exe, [exe, "-u", scriptPath], env=None,
797
childFDs={1:"r", 2:2},
799
def processEnded(ign):
800
self.failUnlessEqual(p.outF.getvalue(),
801
"here is some text\ngoodbye\n")
802
return d.addCallback(processEnded)
806
class Accumulator(protocol.ProcessProtocol):
807
"""Accumulate data from a process."""
812
def connectionMade(self):
813
# print "connection made"
814
self.outF = StringIO.StringIO()
815
self.errF = StringIO.StringIO()
817
def outReceived(self, d):
818
# print "data", repr(d)
821
def errReceived(self, d):
822
# print "err", repr(d)
825
def outConnectionLost(self):
829
def errConnectionLost(self):
833
def processEnded(self, reason):
835
if self.endedDeferred is not None:
836
d, self.endedDeferred = self.endedDeferred, None
840
class PosixProcessBase:
841
"""Test running processes."""
844
def testNormalTermination(self):
845
if os.path.exists('/bin/true'): cmd = '/bin/true'
846
elif os.path.exists('/usr/bin/true'): cmd = '/usr/bin/true'
847
else: raise RuntimeError("true not found in /bin or /usr/bin")
850
p = TrivialProcessProtocol(d)
851
reactor.spawnProcess(p, cmd, ['true'], env=None,
854
p.reason.trap(error.ProcessDone)
855
self.assertEquals(p.reason.value.exitCode, 0)
856
self.assertEquals(p.reason.value.signal, None)
860
def testAbnormalTermination(self):
861
if os.path.exists('/bin/false'): cmd = '/bin/false'
862
elif os.path.exists('/usr/bin/false'): cmd = '/usr/bin/false'
863
else: raise RuntimeError("false not found in /bin or /usr/bin")
866
p = TrivialProcessProtocol(d)
867
reactor.spawnProcess(p, cmd, ['false'], env=None,
871
p.reason.trap(error.ProcessTerminated)
872
self.assertEquals(p.reason.value.exitCode, 1)
873
self.assertEquals(p.reason.value.signal, None)
877
def _testSignal(self, sig):
879
scriptPath = util.sibpath(__file__, "process_signal.py")
881
p = SignalProtocol(d, sig)
882
reactor.spawnProcess(p, exe, [exe, "-u", scriptPath, sig],
887
def testSignalHUP(self):
888
d = self._testSignal('HUP')
889
d.addCallback(self.failIf)
892
def testSignalINT(self):
893
d = self._testSignal('INT')
894
d.addCallback(self.failIf)
897
def testSignalKILL(self):
898
d = self._testSignal('KILL')
899
d.addCallback(self.failIf)
903
class PosixProcessTestCase(SignalMixin, unittest.TestCase, PosixProcessBase):
904
# add three non-pty test cases
906
def testStderr(self):
907
# we assume there is no file named ZZXXX..., both in . and in /tmp
908
if not os.path.exists('/bin/ls'):
909
raise RuntimeError("/bin/ls not found")
912
d = p.endedDeferred = defer.Deferred()
913
reactor.spawnProcess(p, '/bin/ls',
915
"ZZXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"],
916
env=None, path="/tmp",
919
def processEnded(ign):
920
self.assertEquals(lsOut, p.errF.getvalue())
921
return d.addCallback(processEnded)
923
def testProcess(self):
924
if os.path.exists('/bin/gzip'): cmd = '/bin/gzip'
925
elif os.path.exists('/usr/bin/gzip'): cmd = '/usr/bin/gzip'
926
else: raise RuntimeError("gzip not found in /bin or /usr/bin")
927
s = "there's no place like home!\n" * 3
929
d = p.endedDeferred = defer.Deferred()
930
reactor.spawnProcess(p, cmd, [cmd, "-c"], env=None, path="/tmp",
933
p.transport.closeStdin()
935
def processEnded(ign):
938
gf = gzip.GzipFile(fileobj=f)
939
self.assertEquals(gf.read(), s)
940
return d.addCallback(processEnded)
944
class PosixProcessTestCasePTY(SignalMixin, unittest.TestCase, PosixProcessBase):
945
"""Just like PosixProcessTestCase, but use ptys instead of pipes."""
947
# PTYs only offer one input and one output. What still makes sense?
948
# testNormalTermination
949
# testAbnormalTermination
951
# testProcess, but not without p.transport.closeStdin
952
# might be solveable: TODO: add test if so
954
def testOpeningTTY(self):
956
scriptPath = util.sibpath(__file__, "process_tty.py")
958
d = p.endedDeferred = defer.Deferred()
959
reactor.spawnProcess(p, exe, [exe, "-u", scriptPath], env=None,
960
path=None, usePTY=self.usePTY)
961
p.transport.write("hello world!\n")
963
def processEnded(ign):
966
"hello world!\r\nhello world!\r\n",
967
"Error message from process_tty follows:\n\n%s\n\n" % p.outF.getvalue())
968
return d.addCallback(processEnded)
971
def testBadArgs(self):
972
pyExe = sys.executable
973
pyArgs = [pyExe, "-u", "-c", "print 'hello'"]
975
self.assertRaises(ValueError, reactor.spawnProcess, p, pyExe, pyArgs, usePTY=1, childFDs={1:'r'})
977
class Win32ProcessTestCase(SignalMixin, unittest.TestCase):
978
"""Test process programs that are packaged with twisted."""
980
def testStdinReader(self):
981
pyExe = sys.executable
982
scriptPath = util.sibpath(__file__, "process_stdinreader.py")
984
d = p.endedDeferred = defer.Deferred()
985
reactor.spawnProcess(p, pyExe, [pyExe, "-u", scriptPath], env=None,
987
p.transport.write("hello, world")
988
p.transport.closeStdin()
990
def processEnded(ign):
991
self.assertEquals(p.errF.getvalue(), "err\nerr\n")
992
self.assertEquals(p.outF.getvalue(), "out\nhello, world\nout\n")
993
return d.addCallback(processEnded)
996
def testBadArgs(self):
997
pyExe = sys.executable
998
pyArgs = [pyExe, "-u", "-c", "print 'hello'"]
1000
self.assertRaises(ValueError, reactor.spawnProcess, p, pyExe, pyArgs, uid=1)
1001
self.assertRaises(ValueError, reactor.spawnProcess, p, pyExe, pyArgs, gid=1)
1002
self.assertRaises(ValueError, reactor.spawnProcess, p, pyExe, pyArgs, usePTY=1)
1003
self.assertRaises(ValueError, reactor.spawnProcess, p, pyExe, pyArgs, childFDs={1:'r'})
1005
class UtilTestCase(unittest.TestCase):
1007
Tests for process-related helper functions (currently only
1012
Create several directories and files, some of which are executable
1013
and some of which are not. Save the current PATH setting.
1017
base = self.mktemp()
1019
self.foo = j(base, "foo")
1020
self.baz = j(base, "baz")
1021
self.foobar = j(self.foo, "bar")
1022
self.foobaz = j(self.foo, "baz")
1023
self.bazfoo = j(self.baz, "foo")
1024
self.bazbar = j(self.baz, "bar")
1026
for d in self.foobar, self.foobaz, self.bazfoo, self.bazbar:
1029
for name, mode in [(j(self.foobaz, "executable"), 0700),
1030
(j(self.foo, "executable"), 0700),
1031
(j(self.bazfoo, "executable"), 0700),
1032
(j(self.bazfoo, "executable.bin"), 0700),
1033
(j(self.bazbar, "executable"), 0)]:
1036
os.chmod(name, mode)
1038
self.oldPath = os.environ.get('PATH', None)
1039
os.environ['PATH'] = os.pathsep.join((
1040
self.foobar, self.foobaz, self.bazfoo, self.bazbar))
1045
Restore the saved PATH setting.
1047
if self.oldPath is None:
1049
del os.environ['PATH']
1053
os.environ['PATH'] = self.oldPath
1056
def test_whichWithoutPATH(self):
1058
Test that if C{os.environ} does not have a C{'PATH'} key,
1059
L{procutils.which} returns an empty list.
1061
del os.environ['PATH']
1062
self.assertEqual(procutils.which("executable"), [])
1065
def testWhich(self):
1067
paths = procutils.which("executable")
1068
expectedPaths = [j(self.foobaz, "executable"),
1069
j(self.bazfoo, "executable")]
1070
if runtime.platform.isWindows():
1071
expectedPaths.append(j(self.bazbar, "executable"))
1072
self.assertEquals(paths, expectedPaths)
1075
def testWhichPathExt(self):
1077
old = os.environ.get('PATHEXT', None)
1078
os.environ['PATHEXT'] = os.pathsep.join(('.bin', '.exe', '.sh'))
1080
paths = procutils.which("executable")
1083
del os.environ['PATHEXT']
1085
os.environ['PATHEXT'] = old
1086
expectedPaths = [j(self.foobaz, "executable"),
1087
j(self.bazfoo, "executable"),
1088
j(self.bazfoo, "executable.bin")]
1089
if runtime.platform.isWindows():
1090
expectedPaths.append(j(self.bazbar, "executable"))
1091
self.assertEquals(paths, expectedPaths)
1095
class ClosingPipesProcessProtocol(protocol.ProcessProtocol):
1099
def __init__(self, outOrErr):
1100
self.deferred = defer.Deferred()
1101
self.outOrErr = outOrErr
1103
def processEnded(self, reason):
1104
self.deferred.callback(reason)
1106
def outReceived(self, data):
1109
def errReceived(self, data):
1113
class ClosingPipes(unittest.TestCase):
1116
p = ClosingPipesProcessProtocol(True)
1117
p.deferred.addCallbacks(
1118
callback=lambda _: self.fail("I wanted an errback."),
1119
errback=self._endProcess, errbackArgs=(p,))
1120
reactor.spawnProcess(p, sys.executable,
1121
[sys.executable, '-u', '-c',
1122
r'raw_input(); import sys, os; os.write(%d, "foo\n"); sys.exit(42)' % fd],
1124
p.transport.write('go\n')
1127
p.transport.closeStdout()
1129
p.transport.closeStderr()
1133
# make the buggy case not hang
1134
p.transport.closeStdin()
1137
def _endProcess(self, reason, p):
1138
self.failIf(reason.check(error.ProcessDone),
1139
'Child should fail due to EPIPE.')
1140
reason.trap(error.ProcessTerminated)
1141
# child must not get past that write without raising
1142
self.failIfEqual(reason.value.exitCode, 42,
1143
'process reason was %r' % reason)
1144
self.failUnlessEqual(p.output, '')
1147
def test_stdout(self):
1148
"""ProcessProtocol.transport.closeStdout actually closes the pipe."""
1151
self.failIfEqual(errput.find('OSError'), -1)
1152
if runtime.platform.getType() != 'win32':
1153
self.failIfEqual(errput.find('Broken pipe'), -1)
1154
d.addCallback(_check)
1157
def test_stderr(self):
1158
"""ProcessProtocol.transport.closeStderr actually closes the pipe."""
1161
# there should be no stderr open, so nothing for it to
1162
# write the error to.
1163
self.failUnlessEqual(errput, '')
1164
d.addCallback(_check)
1168
skipMessage = "wrong platform or reactor doesn't support IReactorProcess"
1169
if (runtime.platform.getType() != 'posix') or (not interfaces.IReactorProcess(reactor, None)):
1170
PosixProcessTestCase.skip = skipMessage
1171
PosixProcessTestCasePTY.skip = skipMessage
1172
TestTwoProcessesPosix.skip = skipMessage
1173
FDTest.skip = skipMessage
1175
# do this before running the tests: it uses SIGCHLD and stuff internally
1176
lsOut = popen2.popen3("/bin/ls ZZXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX")[2].read()
1178
if (runtime.platform.getType() != 'win32') or (not interfaces.IReactorProcess(reactor, None)):
1179
Win32ProcessTestCase.skip = skipMessage
1180
TestTwoProcessesNonPosix.skip = skipMessage
1182
if not interfaces.IReactorProcess(reactor, None):
1183
ProcessTestCase.skip = skipMessage
1184
ClosingPipes.skip = skipMessage