1
# Copyright (c) 2001-2010 Twisted Matrix Laboratories.
2
# See LICENSE for details.
5
Test running processes.
20
fcntl = process = None
22
from twisted.internet import process
25
from zope.interface.verify import verifyObject
27
from twisted.python.log import msg
28
from twisted.internet import reactor, protocol, error, interfaces, defer
29
from twisted.trial import unittest
30
from twisted.python import util, runtime, procutils
31
from twisted.python.compat import set
35
class StubProcessProtocol(protocol.ProcessProtocol):
37
ProcessProtocol counter-implementation: all methods on this class raise an
38
exception, so instances of this may be used to verify that only certain
41
def outReceived(self, data):
42
raise NotImplementedError()
44
def errReceived(self, data):
45
raise NotImplementedError()
47
def inConnectionLost(self):
48
raise NotImplementedError()
50
def outConnectionLost(self):
51
raise NotImplementedError()
53
def errConnectionLost(self):
54
raise NotImplementedError()
58
class ProcessProtocolTests(unittest.TestCase):
60
Tests for behavior provided by the process protocol base class,
61
L{protocol.ProcessProtocol}.
63
def test_interface(self):
65
L{ProcessProtocol} implements L{IProcessProtocol}.
67
verifyObject(interfaces.IProcessProtocol, protocol.ProcessProtocol())
70
def test_outReceived(self):
72
Verify that when stdout is delivered to
73
L{ProcessProtocol.childDataReceived}, it is forwarded to
74
L{ProcessProtocol.outReceived}.
77
class OutProtocol(StubProcessProtocol):
78
def outReceived(self, data):
83
p.childDataReceived(1, bytes)
84
self.assertEqual(received, [bytes])
87
def test_errReceived(self):
89
Similar to L{test_outReceived}, but for stderr.
92
class ErrProtocol(StubProcessProtocol):
93
def errReceived(self, data):
98
p.childDataReceived(2, bytes)
99
self.assertEqual(received, [bytes])
102
def test_inConnectionLost(self):
104
Verify that when stdin close notification is delivered to
105
L{ProcessProtocol.childConnectionLost}, it is forwarded to
106
L{ProcessProtocol.inConnectionLost}.
109
class InLostProtocol(StubProcessProtocol):
110
def inConnectionLost(self):
114
p.childConnectionLost(0)
115
self.assertEqual(lost, [None])
118
def test_outConnectionLost(self):
120
Similar to L{test_inConnectionLost}, but for stdout.
123
class OutLostProtocol(StubProcessProtocol):
124
def outConnectionLost(self):
127
p = OutLostProtocol()
128
p.childConnectionLost(1)
129
self.assertEqual(lost, [None])
132
def test_errConnectionLost(self):
134
Similar to L{test_inConnectionLost}, but for stderr.
137
class ErrLostProtocol(StubProcessProtocol):
138
def errConnectionLost(self):
141
p = ErrLostProtocol()
142
p.childConnectionLost(2)
143
self.assertEqual(lost, [None])
147
class TrivialProcessProtocol(protocol.ProcessProtocol):
149
Simple process protocol for tests purpose.
151
@ivar outData: data received from stdin
152
@ivar errData: data received from stderr
155
def __init__(self, d):
157
Create the deferred that will be fired at the end, and initialize
164
def processEnded(self, reason):
166
self.deferred.callback(None)
168
def outReceived(self, data):
169
self.outData.append(data)
171
def errReceived(self, data):
172
self.errData.append(data)
175
class TestProcessProtocol(protocol.ProcessProtocol):
177
def connectionMade(self):
181
self.transport.write("abcd")
183
def childDataReceived(self, childFD, data):
185
Override and disable the dispatch provided by the base class to ensure
186
that it is really this method which is being called, and the transport
187
is not going directly to L{outReceived} or L{errReceived}.
195
def childConnectionLost(self, childFD):
197
Similarly to L{childDataReceived}, disable the automatic dispatch
198
provided by the base implementation to verify that the transport is
199
calling this method directly.
202
self.stages.append(2)
203
if self.data != "abcd":
205
self.transport.write("1234")
207
self.stages.append(3)
208
if self.err != "1234":
209
print 'err != 1234: ' + repr(self.err)
211
self.transport.write("abcd")
212
self.stages.append(4)
214
self.stages.append(5)
216
def processEnded(self, reason):
218
self.deferred.callback(None)
221
class EchoProtocol(protocol.ProcessProtocol):
229
def __init__(self, onEnded):
230
self.onEnded = onEnded
233
def connectionMade(self):
235
for i in range(self.n - 2):
236
self.transport.write(self.s)
238
self.transport.writeSequence([self.s, self.s])
239
self.buffer = self.s * self.n
241
def outReceived(self, data):
242
if buffer(self.buffer, self.count, len(data)) != buffer(data):
243
self.failure = ("wrong bytes received", data, self.count)
244
self.transport.closeStdin()
246
self.count += len(data)
247
if self.count == len(self.buffer):
248
self.transport.closeStdin()
250
def processEnded(self, reason):
252
if not reason.check(error.ProcessDone):
253
self.failure = "process didn't terminate normally: " + str(reason)
254
self.onEnded.callback(self)
258
class SignalProtocol(protocol.ProcessProtocol):
260
A process protocol that sends a signal when data is first received.
262
@ivar deferred: deferred firing on C{processEnded}.
263
@type deferred: L{defer.Deferred}
265
@ivar signal: the signal to send to the process.
268
@ivar signaled: A flag tracking whether the signal has been sent to the
269
child or not yet. C{False} until it is sent, then C{True}.
270
@type signaled: C{bool}
273
def __init__(self, deferred, sig):
274
self.deferred = deferred
276
self.signaled = False
279
def outReceived(self, data):
281
Handle the first output from the child process (which indicates it
282
is set up and ready to receive the signal) by sending the signal to
283
it. Also log all output to help with debugging.
285
msg("Received %r from child stdout" % (data,))
286
if not self.signaled:
288
self.transport.signalProcess(self.signal)
291
def errReceived(self, data):
293
Log all data received from the child's stderr to help with
296
msg("Received %r from child stderr" % (data,))
299
def processEnded(self, reason):
301
Callback C{self.deferred} with C{None} if C{reason} is a
302
L{error.ProcessTerminated} failure with C{exitCode} set to C{None},
303
C{signal} set to C{self.signal}, and C{status} holding the status code
304
of the exited process. Otherwise, errback with a C{ValueError}
305
describing the problem.
307
msg("Child exited: %r" % (reason.getTraceback(),))
308
if not reason.check(error.ProcessTerminated):
309
return self.deferred.errback(
310
ValueError("wrong termination: %s" % (reason,)))
312
if isinstance(self.signal, str):
313
signalValue = getattr(signal, 'SIG' + self.signal)
315
signalValue = self.signal
316
if v.exitCode is not None:
317
return self.deferred.errback(
318
ValueError("SIG%s: exitCode is %s, not None" %
319
(self.signal, v.exitCode)))
320
if v.signal != signalValue:
321
return self.deferred.errback(
322
ValueError("SIG%s: .signal was %s, wanted %s" %
323
(self.signal, v.signal, signalValue)))
324
if os.WTERMSIG(v.status) != signalValue:
325
return self.deferred.errback(
326
ValueError('SIG%s: %s' % (self.signal, os.WTERMSIG(v.status))))
327
self.deferred.callback(None)
331
class TestManyProcessProtocol(TestProcessProtocol):
333
self.deferred = defer.Deferred()
335
def processEnded(self, reason):
337
if reason.check(error.ProcessDone):
338
self.deferred.callback(None)
340
self.deferred.errback(reason)
344
class UtilityProcessProtocol(protocol.ProcessProtocol):
346
Helper class for launching a Python process and getting a result from it.
348
@ivar program: A string giving a Python program for the child process to
353
def run(cls, reactor, argv, env):
355
Run a Python process connected to a new instance of this protocol
356
class. Return the protocol instance.
358
The Python process is given C{self.program} on the command line to
359
execute, in addition to anything specified by C{argv}. C{env} is
360
the complete environment.
364
reactor.spawnProcess(
365
self, exe, [exe, "-c", self.program] + argv, env=env)
367
run = classmethod(run)
375
def parseChunks(self, bytes):
377
Called with all bytes received on stdout when the process exits.
379
raise NotImplementedError()
384
Return a Deferred which will fire with the result of L{parseChunks}
385
when the child process exits.
388
self.requests.append(d)
392
def _fireResultDeferreds(self, result):
394
Callback all Deferreds returned up until now by L{getResult}
395
with the given result object.
397
requests = self.requests
403
def outReceived(self, bytes):
405
Accumulate output from the child process in a list.
407
self.bytes.append(bytes)
410
def processEnded(self, reason):
412
Handle process termination by parsing all received output and firing
413
any waiting Deferreds.
415
self._fireResultDeferreds(self.parseChunks(self.bytes))
420
class GetArgumentVector(UtilityProcessProtocol):
422
Protocol which will read a serialized argv from a process and
423
expose it to interested parties.
426
"from sys import stdout, argv\n"
427
"stdout.write(chr(0).join(argv))\n"
430
def parseChunks(self, chunks):
432
Parse the output from the process to which this protocol was
433
connected, which is a single unterminated line of \\0-separated
434
strings giving the argv of that process. Return this as a list of
437
return ''.join(chunks).split('\0')
441
class GetEnvironmentDictionary(UtilityProcessProtocol):
443
Protocol which will read a serialized environment dict from a process
444
and expose it to interested parties.
447
"from sys import stdout\n"
448
"from os import environ\n"
449
"items = environ.iteritems()\n"
450
"stdout.write(chr(0).join([k + chr(0) + v for k, v in items]))\n"
453
def parseChunks(self, chunks):
455
Parse the output from the process to which this protocol was
456
connected, which is a single unterminated line of \\0-separated
457
strings giving key value pairs of the environment from that process.
458
Return this as a dictionary.
460
environString = ''.join(chunks)
461
if not environString:
463
environ = iter(environString.split('\0'))
468
except StopIteration:
477
class ProcessTestCase(unittest.TestCase):
478
"""Test running a process."""
483
"""twisted.internet.stdio test."""
485
scriptPath = util.sibpath(__file__, "process_twisted.py")
487
d = p.endedDeferred = defer.Deferred()
488
env = {"PYTHONPATH": os.pathsep.join(sys.path)}
489
reactor.spawnProcess(p, exe, [exe, "-u", scriptPath], env=env,
490
path=None, usePTY=self.usePTY)
491
p.transport.write("hello, world")
492
p.transport.write("abc")
493
p.transport.write("123")
494
p.transport.closeStdin()
496
def processEnded(ign):
497
self.assertEquals(p.outF.getvalue(), "hello, worldabc123",
500
"Error message from process_twisted follows:\n"
501
"%s\n" % (p.outF.getvalue(), p.errF.getvalue()))
502
return d.addCallback(processEnded)
505
def test_unsetPid(self):
507
Test if pid is None/non-None before/after process termination. This
508
reuses process_echoer.py to get a process that blocks on stdin.
510
finished = defer.Deferred()
511
p = TrivialProcessProtocol(finished)
513
scriptPath = util.sibpath(__file__, "process_echoer.py")
514
procTrans = reactor.spawnProcess(p, exe,
515
[exe, scriptPath], env=None)
516
self.failUnless(procTrans.pid)
518
def afterProcessEnd(ignored):
519
self.assertEqual(procTrans.pid, None)
521
p.transport.closeStdin()
522
return finished.addCallback(afterProcessEnd)
525
def test_process(self):
527
Test running a process: check its output, it exitCode, some property of
531
scriptPath = util.sibpath(__file__, "process_tester.py")
533
p = TestProcessProtocol()
535
reactor.spawnProcess(p, exe, [exe, "-u", scriptPath], env=None)
537
self.assertEquals(p.stages, [1, 2, 3, 4, 5])
539
f.trap(error.ProcessTerminated)
540
self.assertEquals(f.value.exitCode, 23)
541
# would .signal be available on non-posix?
542
# self.assertEquals(f.value.signal, None)
544
error.ProcessExitedAlready, p.transport.signalProcess, 'INT')
546
import process_tester, glob
547
for f in glob.glob(process_tester.test_file_match):
554
def testManyProcesses(self):
556
def _check(results, protocols):
558
self.assertEquals(p.stages, [1, 2, 3, 4, 5], "[%d] stages = %s" % (id(p.transport), str(p.stages)))
561
f.trap(error.ProcessTerminated)
562
self.assertEquals(f.value.exitCode, 23)
565
scriptPath = util.sibpath(__file__, "process_tester.py")
566
args = [exe, "-u", scriptPath]
571
p = TestManyProcessProtocol()
573
reactor.spawnProcess(p, exe, args, env=None)
574
deferreds.append(p.deferred)
576
deferredList = defer.DeferredList(deferreds, consumeErrors=True)
577
deferredList.addCallback(_check, protocols)
583
A spawning a subprocess which echoes its stdin to its stdout via
584
C{reactor.spawnProcess} will result in that echoed output being
585
delivered to outReceived.
587
finished = defer.Deferred()
588
p = EchoProtocol(finished)
591
scriptPath = util.sibpath(__file__, "process_echoer.py")
592
reactor.spawnProcess(p, exe, [exe, scriptPath], env=None)
594
def asserts(ignored):
595
self.failIf(p.failure, p.failure)
596
self.failUnless(hasattr(p, 'buffer'))
597
self.assertEquals(len(''.join(p.buffer)), len(p.s * p.n))
599
def takedownProcess(err):
600
p.transport.closeStdin()
603
return finished.addCallback(asserts).addErrback(takedownProcess)
606
def testCommandLine(self):
607
args = [r'a\"b ', r'a\b ', r' a\\"b', r' a\\b', r'"foo bar" "', '\tab', '"\\', 'a"b', "a'b"]
608
pyExe = sys.executable
609
scriptPath = util.sibpath(__file__, "process_cmdline.py")
611
d = p.endedDeferred = defer.Deferred()
612
reactor.spawnProcess(p, pyExe, [pyExe, "-u", scriptPath]+args, env=None,
615
def processEnded(ign):
616
self.assertEquals(p.errF.getvalue(), "")
617
recvdArgs = p.outF.getvalue().splitlines()
618
self.assertEquals(recvdArgs, args)
619
return d.addCallback(processEnded)
622
def test_wrongArguments(self):
624
Test invalid arguments to spawnProcess: arguments and environment
625
must only contains string or unicode, and not null bytes.
628
p = protocol.ProcessProtocol()
641
# Sanity check - this will fail for people who have mucked with
642
# their site configuration in a stupid way, but there's nothing we
644
badUnicode = u'\N{SNOWMAN}'
646
badUnicode.encode(sys.getdefaultencoding())
647
except UnicodeEncodeError:
648
# Okay, that unicode doesn't encode, put it in as a bad environment
650
badEnvs.append({badUnicode: 'value for bad unicode key'})
651
badEnvs.append({'key for bad unicode value': badUnicode})
652
badArgs.append([exe, badUnicode])
654
# It _did_ encode. Most likely, Gtk2 is being used and the
655
# default system encoding is UTF-8, which can encode anything.
656
# In any case, if implicit unicode -> str conversion works for
657
# that string, we can't test that TypeError gets raised instead,
658
# so just leave it off.
664
reactor.spawnProcess, p, exe, [exe, "-c", ""], env=env)
669
reactor.spawnProcess, p, exe, args, env=None)
672
# Use upper-case so that the environment key test uses an upper case
673
# name: some versions of Windows only support upper case environment
674
# variable names, and I think Python (as of 2.5) doesn't use the right
675
# syscall for lowercase or mixed case names to work anyway.
676
okayUnicode = u"UNICODE"
677
encodedValue = "UNICODE"
679
def _deprecatedUnicodeSupportTest(self, processProtocolClass, argv=[], env={}):
681
Check that a deprecation warning is emitted when passing unicode to
682
spawnProcess for an argv value or an environment key or value.
683
Check that the warning is of the right type, has the right message,
684
and refers to the correct file. Unfortunately, don't check that the
685
line number is correct, because that is too hard for me to figure
688
@param processProtocolClass: A L{UtilityProcessProtocol} subclass
689
which will be instantiated to communicate with the child process.
691
@param argv: The argv argument to spawnProcess.
693
@param env: The env argument to spawnProcess.
695
@return: A Deferred which fires when the test is complete.
697
# Sanity to check to make sure we can actually encode this unicode
698
# with the default system encoding. This may be excessively
701
self.okayUnicode.encode(sys.getdefaultencoding()),
704
p = self.assertWarns(DeprecationWarning,
705
"Argument strings and environment keys/values passed to "
706
"reactor.spawnProcess should be str, not unicode.", __file__,
707
processProtocolClass.run, reactor, argv, env)
711
def test_deprecatedUnicodeArgvSupport(self):
713
Test that a unicode string passed for an argument value is allowed
714
if it can be encoded with the default system encoding, but that a
715
deprecation warning is emitted.
717
d = self._deprecatedUnicodeSupportTest(GetArgumentVector, argv=[self.okayUnicode])
718
def gotArgVector(argv):
719
self.assertEqual(argv, ['-c', self.encodedValue])
720
d.addCallback(gotArgVector)
724
def test_deprecatedUnicodeEnvKeySupport(self):
726
Test that a unicode string passed for the key of the environment
727
dictionary is allowed if it can be encoded with the default system
728
encoding, but that a deprecation warning is emitted.
730
d = self._deprecatedUnicodeSupportTest(
731
GetEnvironmentDictionary, env={self.okayUnicode: self.encodedValue})
732
def gotEnvironment(environ):
733
self.assertEqual(environ[self.encodedValue], self.encodedValue)
734
d.addCallback(gotEnvironment)
738
def test_deprecatedUnicodeEnvValueSupport(self):
740
Test that a unicode string passed for the value of the environment
741
dictionary is allowed if it can be encoded with the default system
742
encoding, but that a deprecation warning is emitted.
744
d = self._deprecatedUnicodeSupportTest(
745
GetEnvironmentDictionary, env={self.encodedValue: self.okayUnicode})
746
def gotEnvironment(environ):
747
# On Windows, the environment contains more things than we
748
# specified, so only make sure that at least the key we wanted
749
# is there, rather than testing the dictionary for exact
751
self.assertEqual(environ[self.encodedValue], self.encodedValue)
752
d.addCallback(gotEnvironment)
757
class TwoProcessProtocol(protocol.ProcessProtocol):
761
self.deferred = defer.Deferred()
762
def outReceived(self, data):
764
def processEnded(self, reason):
766
self.deferred.callback(None)
768
class TestTwoProcessesBase:
770
self.processes = [None, None]
771
self.pp = [None, None]
775
def createProcesses(self, usePTY=0):
777
scriptPath = util.sibpath(__file__, "process_reader.py")
779
self.pp[num] = TwoProcessProtocol()
780
self.pp[num].num = num
781
p = reactor.spawnProcess(self.pp[num],
782
exe, [exe, "-u", scriptPath], env=None,
784
self.processes[num] = p
786
def close(self, num):
787
if self.verbose: print "closing stdin [%d]" % num
788
p = self.processes[num]
790
self.failIf(pp.finished, "Process finished too early")
792
if self.verbose: print self.pp[0].finished, self.pp[1].finished
795
return defer.gatherResults([ p.deferred for p in self.pp ])
798
if self.verbose: print "starting processes"
799
self.createProcesses()
800
reactor.callLater(1, self.close, 0)
801
reactor.callLater(2, self.close, 1)
802
return self._onClose()
804
class TestTwoProcessesNonPosix(TestTwoProcessesBase, unittest.TestCase):
807
class TestTwoProcessesPosix(TestTwoProcessesBase, unittest.TestCase):
809
for pp, pr in zip(self.pp, self.processes):
812
os.kill(pr.pid, signal.SIGTERM)
814
# If the test failed the process may already be dead
815
# The error here is only noise
817
return self._onClose()
820
if self.verbose: print "kill [%d] with SIGTERM" % num
821
p = self.processes[num]
823
self.failIf(pp.finished, "Process finished too early")
824
os.kill(p.pid, signal.SIGTERM)
825
if self.verbose: print self.pp[0].finished, self.pp[1].finished
828
if self.verbose: print "starting processes"
829
self.createProcesses(usePTY=0)
830
reactor.callLater(1, self.kill, 0)
831
reactor.callLater(2, self.kill, 1)
832
return self._onClose()
834
def testClosePty(self):
835
if self.verbose: print "starting processes"
836
self.createProcesses(usePTY=1)
837
reactor.callLater(1, self.close, 0)
838
reactor.callLater(2, self.close, 1)
839
return self._onClose()
841
def testKillPty(self):
842
if self.verbose: print "starting processes"
843
self.createProcesses(usePTY=1)
844
reactor.callLater(1, self.kill, 0)
845
reactor.callLater(2, self.kill, 1)
846
return self._onClose()
848
class FDChecker(protocol.ProcessProtocol):
853
def __init__(self, d):
858
self.deferred.callback(None)
860
def connectionMade(self):
861
self.transport.writeToChild(0, "abcd")
864
def childDataReceived(self, childFD, data):
867
self.fail("read '%s' on fd %d (not 1) during state 1" \
871
#print "len", len(self.data)
872
if len(self.data) == 6:
873
if self.data != "righto":
874
self.fail("got '%s' on fd1, expected 'righto'" \
879
#print "state2", self.state
880
self.transport.writeToChild(3, "efgh")
883
self.fail("read '%s' on fd %s during state 2" % (childFD, data))
887
self.fail("read '%s' on fd %s (not 1) during state 3" \
891
if len(self.data) == 6:
892
if self.data != "closed":
893
self.fail("got '%s' on fd1, expected 'closed'" \
899
self.fail("read '%s' on fd %s during state 4" % (childFD, data))
902
def childConnectionLost(self, childFD):
904
self.fail("got connectionLost(%d) during state 1" % childFD)
908
self.fail("got connectionLost(%d) (not 4) during state 2" \
912
self.transport.closeChildFD(5)
915
def processEnded(self, status):
916
rc = status.value.exitCode
918
self.fail("processEnded early, rc %d" % rc)
920
if status.value.signal != None:
921
self.fail("processEnded with signal %s" % status.value.signal)
924
self.fail("processEnded with rc %d" % rc)
926
self.deferred.callback(None)
929
class FDTest(unittest.TestCase):
933
scriptPath = util.sibpath(__file__, "process_fds.py")
936
reactor.spawnProcess(p, exe, [exe, "-u", scriptPath], env=None,
938
childFDs={0:"w", 1:"r", 2:2,
939
3:"w", 4:"r", 5:"w"})
940
d.addCallback(lambda x : self.failIf(p.failed, p.failed))
943
def testLinger(self):
944
# See what happens when all the pipes close before the process
945
# actually stops. This test *requires* SIGCHLD catching to work,
946
# as there is no other way to find out the process is done.
948
scriptPath = util.sibpath(__file__, "process_linger.py")
950
d = p.endedDeferred = defer.Deferred()
951
reactor.spawnProcess(p, exe, [exe, "-u", scriptPath], env=None,
953
childFDs={1:"r", 2:2},
955
def processEnded(ign):
956
self.failUnlessEqual(p.outF.getvalue(),
957
"here is some text\ngoodbye\n")
958
return d.addCallback(processEnded)
962
class Accumulator(protocol.ProcessProtocol):
963
"""Accumulate data from a process."""
968
def connectionMade(self):
969
self.outF = StringIO.StringIO()
970
self.errF = StringIO.StringIO()
972
def outReceived(self, d):
975
def errReceived(self, d):
978
def outConnectionLost(self):
981
def errConnectionLost(self):
984
def processEnded(self, reason):
986
if self.endedDeferred is not None:
987
d, self.endedDeferred = self.endedDeferred, None
991
class PosixProcessBase:
993
Test running processes.
997
def getCommand(self, commandName):
999
Return the path of the shell command named C{commandName}, looking at
1002
if os.path.exists('/bin/%s' % (commandName,)):
1003
cmd = '/bin/%s' % (commandName,)
1004
elif os.path.exists('/usr/bin/%s' % (commandName,)):
1005
cmd = '/usr/bin/%s' % (commandName,)
1008
"%s not found in /bin or /usr/bin" % (commandName,))
1011
def testNormalTermination(self):
1012
cmd = self.getCommand('true')
1014
d = defer.Deferred()
1015
p = TrivialProcessProtocol(d)
1016
reactor.spawnProcess(p, cmd, ['true'], env=None,
1019
p.reason.trap(error.ProcessDone)
1020
self.assertEquals(p.reason.value.exitCode, 0)
1021
self.assertEquals(p.reason.value.signal, None)
1022
d.addCallback(check)
1026
def test_abnormalTermination(self):
1028
When a process terminates with a system exit code set to 1,
1029
C{processEnded} is called with a L{error.ProcessTerminated} error,
1030
the C{exitCode} attribute reflecting the system exit code.
1032
exe = sys.executable
1034
d = defer.Deferred()
1035
p = TrivialProcessProtocol(d)
1036
reactor.spawnProcess(p, exe, [exe, '-c', 'import sys; sys.exit(1)'],
1037
env=None, usePTY=self.usePTY)
1040
p.reason.trap(error.ProcessTerminated)
1041
self.assertEquals(p.reason.value.exitCode, 1)
1042
self.assertEquals(p.reason.value.signal, None)
1043
d.addCallback(check)
1047
def _testSignal(self, sig):
1048
exe = sys.executable
1049
scriptPath = util.sibpath(__file__, "process_signal.py")
1050
d = defer.Deferred()
1051
p = SignalProtocol(d, sig)
1052
reactor.spawnProcess(p, exe, [exe, "-u", scriptPath], env=None,
1057
def test_signalHUP(self):
1059
Sending the SIGHUP signal to a running process interrupts it, and
1060
C{processEnded} is called with a L{error.ProcessTerminated} instance
1061
with the C{exitCode} set to C{None} and the C{signal} attribute set to
1062
C{signal.SIGHUP}. C{os.WTERMSIG} can also be used on the C{status}
1063
attribute to extract the signal value.
1065
return self._testSignal('HUP')
1068
def test_signalINT(self):
1070
Sending the SIGINT signal to a running process interrupts it, and
1071
C{processEnded} is called with a L{error.ProcessTerminated} instance
1072
with the C{exitCode} set to C{None} and the C{signal} attribute set to
1073
C{signal.SIGINT}. C{os.WTERMSIG} can also be used on the C{status}
1074
attribute to extract the signal value.
1076
return self._testSignal('INT')
1079
def test_signalKILL(self):
1081
Sending the SIGKILL signal to a running process interrupts it, and
1082
C{processEnded} is called with a L{error.ProcessTerminated} instance
1083
with the C{exitCode} set to C{None} and the C{signal} attribute set to
1084
C{signal.SIGKILL}. C{os.WTERMSIG} can also be used on the C{status}
1085
attribute to extract the signal value.
1087
return self._testSignal('KILL')
1090
def test_signalTERM(self):
1092
Sending the SIGTERM signal to a running process interrupts it, and
1093
C{processEnded} is called with a L{error.ProcessTerminated} instance
1094
with the C{exitCode} set to C{None} and the C{signal} attribute set to
1095
C{signal.SIGTERM}. C{os.WTERMSIG} can also be used on the C{status}
1096
attribute to extract the signal value.
1098
return self._testSignal('TERM')
1101
def test_childSignalHandling(self):
1103
The disposition of signals which are ignored in the parent
1104
process is reset to the default behavior for the child
1107
# Somewhat arbitrarily select SIGUSR1 here. It satisfies our
1108
# requirements that:
1109
# - The interpreter not fiddle around with the handler
1110
# behind our backs at startup time (this disqualifies
1111
# signals like SIGINT and SIGPIPE).
1112
# - The default behavior is to exit.
1114
# This lets us send the signal to the child and then verify
1115
# that it exits with a status code indicating that it was
1116
# indeed the signal which caused it to exit.
1117
which = signal.SIGUSR1
1119
# Ignore the signal in the parent (and make sure we clean it
1121
handler = signal.signal(which, signal.SIG_IGN)
1122
self.addCleanup(signal.signal, signal.SIGUSR1, handler)
1125
return self._testSignal(signal.SIGUSR1)
1128
def test_executionError(self):
1130
Raise an error during execvpe to check error management.
1132
cmd = self.getCommand('false')
1134
d = defer.Deferred()
1135
p = TrivialProcessProtocol(d)
1136
def buggyexecvpe(command, args, environment):
1137
raise RuntimeError("Ouch")
1138
oldexecvpe = os.execvpe
1139
os.execvpe = buggyexecvpe
1141
reactor.spawnProcess(p, cmd, ['false'], env=None,
1145
errData = "".join(p.errData + p.outData)
1146
self.assertIn("Upon execvpe", errData)
1147
self.assertIn("Ouch", errData)
1148
d.addCallback(check)
1150
os.execvpe = oldexecvpe
1154
def test_errorInProcessEnded(self):
1156
The handler which reaps a process is removed when the process is
1157
reaped, even if the protocol's C{processEnded} method raises an
1160
connected = defer.Deferred()
1161
ended = defer.Deferred()
1163
# This script runs until we disconnect its transport.
1164
pythonExecutable = sys.executable
1165
scriptPath = util.sibpath(__file__, "process_twisted.py")
1167
class ErrorInProcessEnded(protocol.ProcessProtocol):
1169
A protocol that raises an error in C{processEnded}.
1171
def makeConnection(self, transport):
1172
connected.callback(transport)
1174
def processEnded(self, reason):
1175
reactor.callLater(0, ended.callback, None)
1176
raise RuntimeError("Deliberate error")
1178
# Launch the process.
1179
reactor.spawnProcess(
1180
ErrorInProcessEnded(), pythonExecutable,
1181
[pythonExecutable, scriptPath],
1182
env=None, path=None)
1185
def cbConnected(transport):
1186
pid.append(transport.pid)
1187
# There's now a reap process handler registered.
1188
self.assertIn(transport.pid, process.reapProcessHandlers)
1190
# Kill the process cleanly, triggering an error in the protocol.
1191
transport.loseConnection()
1192
connected.addCallback(cbConnected)
1194
def checkTerminated(ignored):
1195
# The exception was logged.
1196
excs = self.flushLoggedErrors(RuntimeError)
1197
self.assertEqual(len(excs), 1)
1198
# The process is no longer scheduled for reaping.
1199
self.assertNotIn(pid[0], process.reapProcessHandlers)
1200
ended.addCallback(checkTerminated)
1206
class MockSignal(object):
1208
Neuter L{signal.signal}, but pass other attributes unscathed
1210
def signal(self, sig, action):
1211
return signal.getsignal(sig)
1213
def __getattr__(self, attr):
1214
return getattr(signal, attr)
1217
class MockOS(object):
1219
The mock OS: overwrite L{os}, L{fcntl} and {sys} functions with fake ones.
1221
@ivar exited: set to True when C{_exit} is called.
1222
@type exited: C{bool}
1224
@ivar O_RDWR: dumb value faking C{os.O_RDWR}.
1225
@type O_RDWR: C{int}
1227
@ivar O_NOCTTY: dumb value faking C{os.O_NOCTTY}.
1228
@type O_NOCTTY: C{int}
1230
@ivar WNOHANG: dumb value faking C{os.WNOHANG}.
1231
@type WNOHANG: C{int}
1233
@ivar raiseFork: if not C{None}, subsequent calls to fork will raise this
1235
@type raiseFork: C{NoneType} or C{Exception}
1237
@ivar raiseExec: if set, subsequent calls to execvpe will raise an error.
1238
@type raiseExec: C{bool}
1240
@ivar fdio: fake file object returned by calls to fdopen.
1241
@type fdio: C{StringIO.StringIO}
1243
@ivar actions: hold names of some actions executed by the object, in order
1246
@type actions: C{list} of C{str}
1248
@ivar closed: keep track of the file descriptor closed.
1249
@param closed: C{list} of C{int}
1251
@ivar child: whether fork return for the child or the parent.
1252
@type child: C{bool}
1254
@ivar pipeCount: count the number of time that C{os.pipe} has been called.
1255
@type pipeCount: C{int}
1257
@ivar raiseWaitPid: if set, subsequent calls to waitpid will raise an
1258
the error specified.
1259
@type raiseWaitPid: C{None} or a class
1261
@ivar waitChild: if set, subsequent calls to waitpid will return it.
1262
@type waitChild: C{None} or a tuple
1264
@ivar euid: the uid returned by the fake C{os.geteuid}
1267
@ivar egid: the gid returned by the fake C{os.getegid}
1270
@ivar seteuidCalls: stored results of C{os.seteuid} calls.
1271
@type seteuidCalls: C{list}
1273
@ivar setegidCalls: stored results of C{os.setegid} calls.
1274
@type setegidCalls: C{list}
1276
@ivar path: the path returned by C{os.path.expanduser}.
1292
Initialize data structures.
1300
self.WEXITSTATUS = lambda x: 0
1301
self.WIFEXITED = lambda x: 1
1302
self.seteuidCalls = []
1303
self.setegidCalls = []
1306
def open(self, dev, flags):
1308
Fake C{os.open}. Return a non fd number to be sure it's not used
1314
def fdopen(self, fd, flag):
1316
Fake C{os.fdopen}. Return a StringIO object whose content can be tested
1317
later via C{self.fdio}.
1319
self.fdio = StringIO.StringIO()
1325
Fake C{os.setsid}. Do nothing.
1331
Fake C{os.fork}. Save the action in C{self.actions}, and return 0 if
1332
C{self.child} is set, or a dumb number.
1334
self.actions.append(('fork', gc.isenabled()))
1335
if self.raiseFork is not None:
1336
raise self.raiseFork
1344
def close(self, fd):
1346
Fake C{os.close}, saving the closed fd in C{self.closed}.
1348
self.closed.append(fd)
1351
def dup2(self, fd1, fd2):
1353
Fake C{os.dup2}. Do nothing.
1357
def write(self, fd, data):
1359
Fake C{os.write}. Do nothing.
1363
def execvpe(self, command, args, env):
1365
Fake C{os.execvpe}. Save the action, and raise an error if
1366
C{self.raiseExec} is set.
1368
self.actions.append('exec')
1370
raise RuntimeError("Bar")
1375
Fake C{os.pipe}. Return non fd numbers to be sure it's not used
1376
elsewhere, and increment C{self.pipeCount}. This is used to uniquify
1380
return - 2 * self.pipeCount + 1, - 2 * self.pipeCount
1383
def ttyname(self, fd):
1385
Fake C{os.ttyname}. Return a dumb string.
1390
def _exit(self, code):
1392
Fake C{os._exit}. Save the action, set the C{self.exited} flag, and
1393
raise C{SystemError}.
1395
self.actions.append('exit')
1397
# Don't forget to raise an error, or you'll end up in parent
1402
def ioctl(self, fd, flags, arg):
1404
Override C{fcntl.ioctl}. Do nothing.
1408
def setNonBlocking(self, fd):
1410
Override C{fdesc.setNonBlocking}. Do nothing.
1414
def waitpid(self, pid, options):
1416
Override C{os.waitpid}. Return values meaning that the child process
1417
has exited, save executed action.
1419
self.actions.append('waitpid')
1420
if self.raiseWaitPid is not None:
1421
raise self.raiseWaitPid
1422
if self.waitChild is not None:
1423
return self.waitChild
1427
def settrace(self, arg):
1429
Override C{sys.settrace} to keep coverage working.
1435
Override C{os.getgid}. Return a dumb number.
1442
Override C{os.getuid}. Return a dumb number.
1447
def setuid(self, val):
1449
Override C{os.setuid}. Do nothing.
1451
self.actions.append(('setuid', val))
1454
def setgid(self, val):
1456
Override C{os.setgid}. Do nothing.
1458
self.actions.append(('setgid', val))
1461
def setregid(self, val1, val2):
1463
Override C{os.setregid}. Do nothing.
1465
self.actions.append(('setregid', val1, val2))
1468
def setreuid(self, val1, val2):
1470
Override C{os.setreuid}. Save the action.
1472
self.actions.append(('setreuid', val1, val2))
1475
def switchUID(self, uid, gid):
1477
Override C{util.switchuid}. Save the action.
1479
self.actions.append(('switchuid', uid, gid))
1484
Override C{pty.openpty}, returning fake file descriptors.
1491
Mock C{os.geteuid}, returning C{self.euid} instead.
1498
Mock C{os.getegid}, returning C{self.egid} instead.
1503
def seteuid(self, egid):
1505
Mock C{os.seteuid}, store result.
1507
self.seteuidCalls.append(egid)
1510
def setegid(self, egid):
1512
Mock C{os.setegid}, store result.
1514
self.setegidCalls.append(egid)
1517
def expanduser(self, path):
1519
Mock C{os.path.expanduser}.
1524
def getpwnam(self, user):
1526
Mock C{pwd.getpwnam}.
1532
if process is not None:
1533
class DumbProcessWriter(process.ProcessWriter):
1535
A fake L{process.ProcessWriter} used for tests.
1538
def startReading(self):
1540
Here's the faking: don't do anything here.
1545
class DumbProcessReader(process.ProcessReader):
1547
A fake L{process.ProcessReader} used for tests.
1550
def startReading(self):
1552
Here's the faking: don't do anything here.
1557
class DumbPTYProcess(process.PTYProcess):
1559
A fake L{process.PTYProcess} used for tests.
1562
def startReading(self):
1564
Here's the faking: don't do anything here.
1569
class MockProcessTestCase(unittest.TestCase):
1571
Mock a process runner to test forked child code path.
1574
skip = "twisted.internet.process is never used on Windows"
1578
Replace L{process} os, fcntl, sys, switchUID, fdesc and pty modules
1579
with the mock class L{MockOS}.
1582
self.addCleanup(gc.enable)
1584
self.addCleanup(gc.disable)
1585
self.mockos = MockOS()
1586
self.mockos.euid = 1236
1587
self.mockos.egid = 1234
1588
self.patch(process, "os", self.mockos)
1589
self.patch(process, "fcntl", self.mockos)
1590
self.patch(process, "sys", self.mockos)
1591
self.patch(process, "switchUID", self.mockos.switchUID)
1592
self.patch(process, "fdesc", self.mockos)
1593
self.patch(process.Process, "processReaderFactory", DumbProcessReader)
1594
self.patch(process.Process, "processWriterFactory", DumbProcessWriter)
1595
self.patch(process, "pty", self.mockos)
1597
self.mocksig = MockSignal()
1598
self.patch(process, "signal", self.mocksig)
1603
Reset processes registered for reap.
1605
process.reapProcessHandlers = {}
1608
def test_mockFork(self):
1610
Test a classic spawnProcess. Check the path of the client code:
1617
d = defer.Deferred()
1618
p = TrivialProcessProtocol(d)
1620
reactor.spawnProcess(p, cmd, ['ouch'], env=None,
1623
self.assert_(self.mockos.exited)
1625
self.mockos.actions, [("fork", False), "exec", "exit"])
1627
self.fail("Should not be here")
1629
# It should leave the garbage collector disabled.
1630
self.assertFalse(gc.isenabled())
1633
def _mockForkInParentTest(self):
1635
Assert that in the main process, spawnProcess disables the garbage
1636
collector, calls fork, closes the pipe file descriptors it created for
1637
the child process, and calls waitpid.
1639
self.mockos.child = False
1642
d = defer.Deferred()
1643
p = TrivialProcessProtocol(d)
1644
reactor.spawnProcess(p, cmd, ['ouch'], env=None,
1646
# It should close the first read pipe, and the 2 last write pipes
1647
self.assertEqual(set(self.mockos.closed), set([-1, -4, -6]))
1648
self.assertEquals(self.mockos.actions, [("fork", False), "waitpid"])
1651
def test_mockForkInParentGarbageCollectorEnabled(self):
1653
The garbage collector should be enabled when L{reactor.spawnProcess}
1654
returns if it was initially enabled.
1656
@see L{_mockForkInParentTest}
1659
self._mockForkInParentTest()
1660
self.assertTrue(gc.isenabled())
1663
def test_mockForkInParentGarbageCollectorDisabled(self):
1665
The garbage collector should be disabled when L{reactor.spawnProcess}
1666
returns if it was initially disabled.
1668
@see L{_mockForkInParentTest}
1671
self._mockForkInParentTest()
1672
self.assertFalse(gc.isenabled())
1675
def test_mockForkTTY(self):
1677
Test a TTY spawnProcess: check the path of the client code:
1682
d = defer.Deferred()
1683
p = TrivialProcessProtocol(d)
1685
reactor.spawnProcess(p, cmd, ['ouch'], env=None,
1688
self.assert_(self.mockos.exited)
1690
self.mockos.actions, [("fork", False), "exec", "exit"])
1692
self.fail("Should not be here")
1695
def _mockWithForkError(self):
1697
Assert that if the fork call fails, no other process setup calls are
1698
made and that spawnProcess raises the exception fork raised.
1700
self.mockos.raiseFork = OSError(errno.EAGAIN, None)
1701
protocol = TrivialProcessProtocol(None)
1702
self.assertRaises(OSError, reactor.spawnProcess, protocol, None)
1703
self.assertEqual(self.mockos.actions, [("fork", False)])
1706
def test_mockWithForkErrorGarbageCollectorEnabled(self):
1708
The garbage collector should be enabled when L{reactor.spawnProcess}
1709
raises because L{os.fork} raised, if it was initially enabled.
1712
self._mockWithForkError()
1713
self.assertTrue(gc.isenabled())
1716
def test_mockWithForkErrorGarbageCollectorDisabled(self):
1718
The garbage collector should be disabled when
1719
L{reactor.spawnProcess} raises because L{os.fork} raised, if it was
1723
self._mockWithForkError()
1724
self.assertFalse(gc.isenabled())
1727
def test_mockForkErrorCloseFDs(self):
1729
When C{os.fork} raises an exception, the file descriptors created
1730
before are closed and don't leak.
1732
self._mockWithForkError()
1733
self.assertEqual(set(self.mockos.closed), set([-1, -4, -6, -2, -3, -5]))
1736
def test_mockForkErrorGivenFDs(self):
1738
When C{os.forks} raises an exception and that file descriptors have
1739
been specified with the C{childFDs} arguments of
1740
L{reactor.spawnProcess}, they are not closed.
1742
self.mockos.raiseFork = OSError(errno.EAGAIN, None)
1743
protocol = TrivialProcessProtocol(None)
1744
self.assertRaises(OSError, reactor.spawnProcess, protocol, None,
1745
childFDs={0: -10, 1: -11, 2: -13})
1746
self.assertEqual(self.mockos.actions, [("fork", False)])
1747
self.assertEqual(self.mockos.closed, [])
1749
# We can also put "r" or "w" to let twisted create the pipes
1750
self.assertRaises(OSError, reactor.spawnProcess, protocol, None,
1751
childFDs={0: "r", 1: -11, 2: -13})
1752
self.assertEqual(set(self.mockos.closed), set([-1, -2]))
1755
def test_mockForkErrorClosePTY(self):
1757
When C{os.fork} raises an exception, the file descriptors created by
1758
C{pty.openpty} are closed and don't leak, when C{usePTY} is set to
1761
self.mockos.raiseFork = OSError(errno.EAGAIN, None)
1762
protocol = TrivialProcessProtocol(None)
1763
self.assertRaises(OSError, reactor.spawnProcess, protocol, None,
1765
self.assertEqual(self.mockos.actions, [("fork", False)])
1766
self.assertEqual(set(self.mockos.closed), set([-12, -13]))
1769
def test_mockForkErrorPTYGivenFDs(self):
1771
If a tuple is passed to C{usePTY} to specify slave and master file
1772
descriptors and that C{os.fork} raises an exception, these file
1773
descriptors aren't closed.
1775
self.mockos.raiseFork = OSError(errno.EAGAIN, None)
1776
protocol = TrivialProcessProtocol(None)
1777
self.assertRaises(OSError, reactor.spawnProcess, protocol, None,
1778
usePTY=(-20, -21, 'foo'))
1779
self.assertEqual(self.mockos.actions, [("fork", False)])
1780
self.assertEqual(self.mockos.closed, [])
1783
def test_mockWithExecError(self):
1785
Spawn a process but simulate an error during execution in the client
1786
path: C{os.execvpe} raises an error. It should close all the standard
1787
fds, try to print the error encountered, and exit cleanly.
1791
d = defer.Deferred()
1792
p = TrivialProcessProtocol(d)
1793
self.mockos.raiseExec = True
1795
reactor.spawnProcess(p, cmd, ['ouch'], env=None,
1798
self.assert_(self.mockos.exited)
1800
self.mockos.actions, [("fork", False), "exec", "exit"])
1801
# Check that fd have been closed
1802
self.assertIn(0, self.mockos.closed)
1803
self.assertIn(1, self.mockos.closed)
1804
self.assertIn(2, self.mockos.closed)
1805
# Check content of traceback
1806
self.assertIn("RuntimeError: Bar", self.mockos.fdio.getvalue())
1808
self.fail("Should not be here")
1811
def test_mockSetUid(self):
1813
Try creating a process with setting its uid: it's almost the same path
1814
as the standard path, but with a C{switchUID} call before the exec.
1818
d = defer.Deferred()
1819
p = TrivialProcessProtocol(d)
1821
reactor.spawnProcess(p, cmd, ['ouch'], env=None,
1822
usePTY=False, uid=8080)
1824
self.assert_(self.mockos.exited)
1825
self.assertEquals(self.mockos.actions,
1826
[('setuid', 0), ('setgid', 0), ('fork', False),
1827
('switchuid', 8080, 1234), 'exec', 'exit'])
1829
self.fail("Should not be here")
1832
def test_mockSetUidInParent(self):
1834
Try creating a process with setting its uid, in the parent path: it
1835
should switch to root before fork, then restore initial uid/gids.
1837
self.mockos.child = False
1840
d = defer.Deferred()
1841
p = TrivialProcessProtocol(d)
1842
reactor.spawnProcess(p, cmd, ['ouch'], env=None,
1843
usePTY=False, uid=8080)
1844
self.assertEquals(self.mockos.actions,
1845
[('setuid', 0), ('setgid', 0), ('fork', False),
1846
('setregid', 1235, 1234), ('setreuid', 1237, 1236), 'waitpid'])
1849
def test_mockPTYSetUid(self):
1851
Try creating a PTY process with setting its uid: it's almost the same
1852
path as the standard path, but with a C{switchUID} call before the
1857
d = defer.Deferred()
1858
p = TrivialProcessProtocol(d)
1860
reactor.spawnProcess(p, cmd, ['ouch'], env=None,
1861
usePTY=True, uid=8081)
1863
self.assert_(self.mockos.exited)
1864
self.assertEquals(self.mockos.actions,
1865
[('setuid', 0), ('setgid', 0), ('fork', False),
1866
('switchuid', 8081, 1234), 'exec', 'exit'])
1868
self.fail("Should not be here")
1871
def test_mockPTYSetUidInParent(self):
1873
Try creating a PTY process with setting its uid, in the parent path: it
1874
should switch to root before fork, then restore initial uid/gids.
1876
self.mockos.child = False
1879
d = defer.Deferred()
1880
p = TrivialProcessProtocol(d)
1881
oldPTYProcess = process.PTYProcess
1883
process.PTYProcess = DumbPTYProcess
1884
reactor.spawnProcess(p, cmd, ['ouch'], env=None,
1885
usePTY=True, uid=8080)
1887
process.PTYProcess = oldPTYProcess
1888
self.assertEquals(self.mockos.actions,
1889
[('setuid', 0), ('setgid', 0), ('fork', False),
1890
('setregid', 1235, 1234), ('setreuid', 1237, 1236), 'waitpid'])
1893
def test_mockWithWaitError(self):
1895
Test that reapProcess logs errors raised.
1897
self.mockos.child = False
1899
self.mockos.waitChild = (0, 0)
1901
d = defer.Deferred()
1902
p = TrivialProcessProtocol(d)
1903
proc = reactor.spawnProcess(p, cmd, ['ouch'], env=None,
1905
self.assertEquals(self.mockos.actions, [("fork", False), "waitpid"])
1907
self.mockos.raiseWaitPid = OSError()
1909
errors = self.flushLoggedErrors()
1910
self.assertEquals(len(errors), 1)
1911
errors[0].trap(OSError)
1914
def test_mockErrorECHILDInReapProcess(self):
1916
Test that reapProcess doesn't log anything when waitpid raises a
1917
C{OSError} with errno C{ECHILD}.
1919
self.mockos.child = False
1921
self.mockos.waitChild = (0, 0)
1923
d = defer.Deferred()
1924
p = TrivialProcessProtocol(d)
1925
proc = reactor.spawnProcess(p, cmd, ['ouch'], env=None,
1927
self.assertEquals(self.mockos.actions, [("fork", False), "waitpid"])
1929
self.mockos.raiseWaitPid = OSError()
1930
self.mockos.raiseWaitPid.errno = errno.ECHILD
1931
# This should not produce any errors
1935
def test_mockErrorInPipe(self):
1937
If C{os.pipe} raises an exception after some pipes where created, the
1938
created pipes are closed and don't leak.
1940
pipes = [-1, -2, -3, -4]
1943
return pipes.pop(0), pipes.pop(0)
1946
self.mockos.pipe = pipe
1947
protocol = TrivialProcessProtocol(None)
1948
self.assertRaises(OSError, reactor.spawnProcess, protocol, None)
1949
self.assertEqual(self.mockos.actions, [])
1950
self.assertEqual(set(self.mockos.closed), set([-4, -3, -2, -1]))
1953
def test_mockErrorInForkRestoreUID(self):
1955
If C{os.fork} raises an exception and a UID change has been made, the
1956
previous UID and GID are restored.
1958
self.mockos.raiseFork = OSError(errno.EAGAIN, None)
1959
protocol = TrivialProcessProtocol(None)
1960
self.assertRaises(OSError, reactor.spawnProcess, protocol, None,
1962
self.assertEqual(self.mockos.actions,
1963
[('setuid', 0), ('setgid', 0), ("fork", False),
1964
('setregid', 1235, 1234), ('setreuid', 1237, 1236)])
1968
class PosixProcessTestCase(unittest.TestCase, PosixProcessBase):
1969
# add two non-pty test cases
1971
def testStderr(self):
1972
# we assume there is no file named ZZXXX..., both in . and in /tmp
1973
cmd = self.getCommand('ls')
1976
d = p.endedDeferred = defer.Deferred()
1977
reactor.spawnProcess(p, cmd,
1979
"ZZXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"],
1980
env=None, path="/tmp",
1983
def processEnded(ign):
1984
self.assertEquals(lsOut, p.errF.getvalue())
1985
return d.addCallback(processEnded)
1987
def testProcess(self):
1988
cmd = self.getCommand('gzip')
1989
s = "there's no place like home!\n" * 3
1991
d = p.endedDeferred = defer.Deferred()
1992
reactor.spawnProcess(p, cmd, [cmd, "-c"], env=None, path="/tmp",
1994
p.transport.write(s)
1995
p.transport.closeStdin()
1997
def processEnded(ign):
2000
gf = gzip.GzipFile(fileobj=f)
2001
self.assertEquals(gf.read(), s)
2002
return d.addCallback(processEnded)
2006
class PosixProcessTestCasePTY(unittest.TestCase, PosixProcessBase):
2008
Just like PosixProcessTestCase, but use ptys instead of pipes.
2011
# PTYs only offer one input and one output. What still makes sense?
2012
# testNormalTermination
2013
# test_abnormalTermination
2015
# testProcess, but not without p.transport.closeStdin
2016
# might be solveable: TODO: add test if so
2018
def testOpeningTTY(self):
2019
exe = sys.executable
2020
scriptPath = util.sibpath(__file__, "process_tty.py")
2022
d = p.endedDeferred = defer.Deferred()
2023
reactor.spawnProcess(p, exe, [exe, "-u", scriptPath], env=None,
2024
path=None, usePTY=self.usePTY)
2025
p.transport.write("hello world!\n")
2027
def processEnded(ign):
2029
error.ProcessExitedAlready, p.transport.signalProcess, 'HUP')
2032
"hello world!\r\nhello world!\r\n",
2033
"Error message from process_tty follows:\n\n%s\n\n" % p.outF.getvalue())
2034
return d.addCallback(processEnded)
2037
def testBadArgs(self):
2038
pyExe = sys.executable
2039
pyArgs = [pyExe, "-u", "-c", "print 'hello'"]
2041
self.assertRaises(ValueError, reactor.spawnProcess, p, pyExe, pyArgs,
2042
usePTY=1, childFDs={1:'r'})
2046
class Win32SignalProtocol(SignalProtocol):
2048
A win32-specific process protocol that handles C{processEnded}
2049
differently: processes should exit with exit code 1.
2052
def processEnded(self, reason):
2054
Callback C{self.deferred} with C{None} if C{reason} is a
2055
L{error.ProcessTerminated} failure with C{exitCode} set to 1.
2056
Otherwise, errback with a C{ValueError} describing the problem.
2058
if not reason.check(error.ProcessTerminated):
2059
return self.deferred.errback(
2060
ValueError("wrong termination: %s" % (reason,)))
2063
return self.deferred.errback(
2064
ValueError("Wrong exit code: %s" % (reason.exitCode,)))
2065
self.deferred.callback(None)
2069
class Win32ProcessTestCase(unittest.TestCase):
2071
Test process programs that are packaged with twisted.
2074
def testStdinReader(self):
2075
pyExe = sys.executable
2076
scriptPath = util.sibpath(__file__, "process_stdinreader.py")
2078
d = p.endedDeferred = defer.Deferred()
2079
reactor.spawnProcess(p, pyExe, [pyExe, "-u", scriptPath], env=None,
2081
p.transport.write("hello, world")
2082
p.transport.closeStdin()
2084
def processEnded(ign):
2085
self.assertEquals(p.errF.getvalue(), "err\nerr\n")
2086
self.assertEquals(p.outF.getvalue(), "out\nhello, world\nout\n")
2087
return d.addCallback(processEnded)
2090
def testBadArgs(self):
2091
pyExe = sys.executable
2092
pyArgs = [pyExe, "-u", "-c", "print 'hello'"]
2094
self.assertRaises(ValueError,
2095
reactor.spawnProcess, p, pyExe, pyArgs, uid=1)
2096
self.assertRaises(ValueError,
2097
reactor.spawnProcess, p, pyExe, pyArgs, gid=1)
2098
self.assertRaises(ValueError,
2099
reactor.spawnProcess, p, pyExe, pyArgs, usePTY=1)
2100
self.assertRaises(ValueError,
2101
reactor.spawnProcess, p, pyExe, pyArgs, childFDs={1:'r'})
2104
def _testSignal(self, sig):
2105
exe = sys.executable
2106
scriptPath = util.sibpath(__file__, "process_signal.py")
2107
d = defer.Deferred()
2108
p = Win32SignalProtocol(d, sig)
2109
reactor.spawnProcess(p, exe, [exe, "-u", scriptPath], env=None)
2113
def test_signalTERM(self):
2115
Sending the SIGTERM signal terminates a created process, and
2116
C{processEnded} is called with a L{error.ProcessTerminated} instance
2117
with the C{exitCode} attribute set to 1.
2119
return self._testSignal('TERM')
2122
def test_signalINT(self):
2124
Sending the SIGINT signal terminates a created process, and
2125
C{processEnded} is called with a L{error.ProcessTerminated} instance
2126
with the C{exitCode} attribute set to 1.
2128
return self._testSignal('INT')
2131
def test_signalKILL(self):
2133
Sending the SIGKILL signal terminates a created process, and
2134
C{processEnded} is called with a L{error.ProcessTerminated} instance
2135
with the C{exitCode} attribute set to 1.
2137
return self._testSignal('KILL')
2140
def test_closeHandles(self):
2142
The win32 handles should be properly closed when the process exits.
2146
connected = defer.Deferred()
2147
ended = defer.Deferred()
2149
class SimpleProtocol(protocol.ProcessProtocol):
2151
A protocol that fires deferreds when connected and disconnected.
2153
def makeConnection(self, transport):
2154
connected.callback(transport)
2156
def processEnded(self, reason):
2157
ended.callback(None)
2159
p = SimpleProtocol()
2161
pyExe = sys.executable
2162
pyArgs = [pyExe, "-u", "-c", "print 'hello'"]
2163
proc = reactor.spawnProcess(p, pyExe, pyArgs)
2165
def cbConnected(transport):
2166
self.assertIdentical(transport, proc)
2167
# perform a basic validity test on the handles
2168
win32api.GetHandleInformation(proc.hProcess)
2169
win32api.GetHandleInformation(proc.hThread)
2170
# And save their values for later
2171
self.hProcess = proc.hProcess
2172
self.hThread = proc.hThread
2173
connected.addCallback(cbConnected)
2175
def checkTerminated(ignored):
2176
# The attributes on the process object must be reset...
2177
self.assertIdentical(proc.pid, None)
2178
self.assertIdentical(proc.hProcess, None)
2179
self.assertIdentical(proc.hThread, None)
2180
# ...and the handles must be closed.
2181
self.assertRaises(win32api.error,
2182
win32api.GetHandleInformation, self.hProcess)
2183
self.assertRaises(win32api.error,
2184
win32api.GetHandleInformation, self.hThread)
2185
ended.addCallback(checkTerminated)
2187
return defer.gatherResults([connected, ended])
2191
class Dumbwin32procPidTest(unittest.TestCase):
2193
Simple test for the pid attribute of Process on win32.
2198
Launch process with mock win32process. The only mock aspect of this
2199
module is that the pid of the process created will always be 42.
2201
from twisted.internet import _dumbwin32proc
2202
from twisted.test import mock_win32process
2203
self.patch(_dumbwin32proc, "win32process", mock_win32process)
2204
exe = sys.executable
2205
scriptPath = util.sibpath(__file__, "process_cmdline.py")
2207
d = defer.Deferred()
2208
processProto = TrivialProcessProtocol(d)
2209
comspec = str(os.environ["COMSPEC"])
2210
cmd = [comspec, "/c", exe, scriptPath]
2212
p = _dumbwin32proc.Process(reactor,
2218
self.assertEquals(42, p.pid)
2219
self.assertEquals("<Process pid=42>", repr(p))
2221
def pidCompleteCb(result):
2222
self.assertEquals(None, p.pid)
2223
return d.addCallback(pidCompleteCb)
2227
class UtilTestCase(unittest.TestCase):
2229
Tests for process-related helper functions (currently only
2234
Create several directories and files, some of which are executable
2235
and some of which are not. Save the current PATH setting.
2239
base = self.mktemp()
2241
self.foo = j(base, "foo")
2242
self.baz = j(base, "baz")
2243
self.foobar = j(self.foo, "bar")
2244
self.foobaz = j(self.foo, "baz")
2245
self.bazfoo = j(self.baz, "foo")
2246
self.bazbar = j(self.baz, "bar")
2248
for d in self.foobar, self.foobaz, self.bazfoo, self.bazbar:
2251
for name, mode in [(j(self.foobaz, "executable"), 0700),
2252
(j(self.foo, "executable"), 0700),
2253
(j(self.bazfoo, "executable"), 0700),
2254
(j(self.bazfoo, "executable.bin"), 0700),
2255
(j(self.bazbar, "executable"), 0)]:
2258
os.chmod(name, mode)
2260
self.oldPath = os.environ.get('PATH', None)
2261
os.environ['PATH'] = os.pathsep.join((
2262
self.foobar, self.foobaz, self.bazfoo, self.bazbar))
2267
Restore the saved PATH setting, and set all created files readable
2268
again so that they can be deleted easily.
2270
os.chmod(os.path.join(self.bazbar, "executable"), stat.S_IWUSR)
2271
if self.oldPath is None:
2273
del os.environ['PATH']
2277
os.environ['PATH'] = self.oldPath
2280
def test_whichWithoutPATH(self):
2282
Test that if C{os.environ} does not have a C{'PATH'} key,
2283
L{procutils.which} returns an empty list.
2285
del os.environ['PATH']
2286
self.assertEqual(procutils.which("executable"), [])
2289
def testWhich(self):
2291
paths = procutils.which("executable")
2292
expectedPaths = [j(self.foobaz, "executable"),
2293
j(self.bazfoo, "executable")]
2294
if runtime.platform.isWindows():
2295
expectedPaths.append(j(self.bazbar, "executable"))
2296
self.assertEquals(paths, expectedPaths)
2299
def testWhichPathExt(self):
2301
old = os.environ.get('PATHEXT', None)
2302
os.environ['PATHEXT'] = os.pathsep.join(('.bin', '.exe', '.sh'))
2304
paths = procutils.which("executable")
2307
del os.environ['PATHEXT']
2309
os.environ['PATHEXT'] = old
2310
expectedPaths = [j(self.foobaz, "executable"),
2311
j(self.bazfoo, "executable"),
2312
j(self.bazfoo, "executable.bin")]
2313
if runtime.platform.isWindows():
2314
expectedPaths.append(j(self.bazbar, "executable"))
2315
self.assertEquals(paths, expectedPaths)
2319
class ClosingPipesProcessProtocol(protocol.ProcessProtocol):
2323
def __init__(self, outOrErr):
2324
self.deferred = defer.Deferred()
2325
self.outOrErr = outOrErr
2327
def processEnded(self, reason):
2328
self.deferred.callback(reason)
2330
def outReceived(self, data):
2333
def errReceived(self, data):
2337
class ClosingPipes(unittest.TestCase):
2340
p = ClosingPipesProcessProtocol(True)
2341
p.deferred.addCallbacks(
2342
callback=lambda _: self.fail("I wanted an errback."),
2343
errback=self._endProcess, errbackArgs=(p,))
2344
reactor.spawnProcess(p, sys.executable,
2345
[sys.executable, '-u', '-c',
2346
r'raw_input(); import sys, os; os.write(%d, "foo\n"); sys.exit(42)' % fd],
2348
p.transport.write('go\n')
2351
p.transport.closeStdout()
2353
p.transport.closeStderr()
2357
# make the buggy case not hang
2358
p.transport.closeStdin()
2361
def _endProcess(self, reason, p):
2362
self.failIf(reason.check(error.ProcessDone),
2363
'Child should fail due to EPIPE.')
2364
reason.trap(error.ProcessTerminated)
2365
# child must not get past that write without raising
2366
self.failIfEqual(reason.value.exitCode, 42,
2367
'process reason was %r' % reason)
2368
self.failUnlessEqual(p.output, '')
2371
def test_stdout(self):
2372
"""ProcessProtocol.transport.closeStdout actually closes the pipe."""
2375
self.failIfEqual(errput.find('OSError'), -1)
2376
if runtime.platform.getType() != 'win32':
2377
self.failIfEqual(errput.find('Broken pipe'), -1)
2378
d.addCallback(_check)
2381
def test_stderr(self):
2382
"""ProcessProtocol.transport.closeStderr actually closes the pipe."""
2385
# there should be no stderr open, so nothing for it to
2386
# write the error to.
2387
self.failUnlessEqual(errput, '')
2388
d.addCallback(_check)
2392
skipMessage = "wrong platform or reactor doesn't support IReactorProcess"
2393
if (runtime.platform.getType() != 'posix') or (not interfaces.IReactorProcess(reactor, None)):
2394
PosixProcessTestCase.skip = skipMessage
2395
PosixProcessTestCasePTY.skip = skipMessage
2396
TestTwoProcessesPosix.skip = skipMessage
2397
FDTest.skip = skipMessage
2399
# do this before running the tests: it uses SIGCHLD and stuff internally
2400
lsOut = popen2.popen3("/bin/ls ZZXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX")[2].read()
2402
if (runtime.platform.getType() != 'win32') or (not interfaces.IReactorProcess(reactor, None)):
2403
Win32ProcessTestCase.skip = skipMessage
2404
TestTwoProcessesNonPosix.skip = skipMessage
2405
Dumbwin32procPidTest.skip = skipMessage
2407
if not interfaces.IReactorProcess(reactor, None):
2408
ProcessTestCase.skip = skipMessage
2409
ClosingPipes.skip = skipMessage