1
# -*- test-case-name: twisted.test.test_process -*-
2
# Copyright (c) 2001-2010 Twisted Matrix Laboratories.
3
# See LICENSE for details.
6
UNIX Process management.
8
Do NOT use this module directly - use reactor.spawnProcess() instead.
10
Maintainer: Itamar Shtull-Trauring
14
import gc, os, sys, traceback, select, signal, errno
26
from twisted.python import log, failure
27
from twisted.python.util import switchUID
28
from twisted.internet import fdesc, abstract, error
29
from twisted.internet.main import CONNECTION_LOST, CONNECTION_DONE
30
from twisted.internet._baseprocess import BaseProcess
32
# Some people were importing this, which is incorrect, just keeping it
33
# here for backwards compatibility:
34
ProcessExitedAlready = error.ProcessExitedAlready
36
reapProcessHandlers = {}
38
def reapAllProcesses():
40
Reap all registered processes.
42
for process in reapProcessHandlers.values():
46
def registerReapProcessHandler(pid, process):
48
Register a process handler for the given pid, in case L{reapAllProcesses}
51
@param pid: the pid of the process.
52
@param process: a process handler.
54
if pid in reapProcessHandlers:
55
raise RuntimeError("Try to register an already registered process.")
57
auxPID, status = os.waitpid(pid, os.WNOHANG)
59
log.msg('Failed to reap %d:' % pid)
63
process.processEnded(status)
65
# if auxPID is 0, there are children but none have exited
66
reapProcessHandlers[pid] = process
69
def unregisterReapProcessHandler(pid, process):
    """
    Remove the handler that L{registerReapProcessHandler} stored for C{pid}.

    @param pid: the pid the handler was registered under.
    @param process: the handler expected to be registered for C{pid}.

    @raise RuntimeError: if C{pid} has no registered handler, or if the
        registered handler does not compare equal to C{process}.
    """
    matches = pid in reapProcessHandlers
    if matches:
        # Only the handler that was actually registered may remove the entry.
        matches = reapProcessHandlers[pid] == process
    if not matches:
        raise RuntimeError("Try to unregister a process not registered.")
    del reapProcessHandlers[pid]
80
def detectLinuxBrokenPipeBehavior():
82
On some Linux versions, write-only pipes are detected as readable. This
83
function is here to check if this bug is present or not.
85
See L{ProcessWriter.doRead} for a more detailed explanation.
87
global brokenLinuxPipeBehavior
90
reads, writes, exes = select.select([w], [], [], 0)
92
# Linux < 2.6.11 says a write-only pipe is readable.
93
brokenLinuxPipeBehavior = True
95
brokenLinuxPipeBehavior = False
100
detectLinuxBrokenPipeBehavior()
103
class ProcessWriter(abstract.FileDescriptor):
105
(Internal) Helper class to write into a Process's input pipe.
107
I am a helper which describes a selectable asynchronous writer to a
108
process's input pipe, including stdin.
112
enableReadHack = False
114
def __init__(self, reactor, proc, name, fileno, forceReadHack=False):
116
Initialize, specifying a Process instance to connect to.
118
abstract.FileDescriptor.__init__(self, reactor)
119
fdesc.setNonBlocking(fileno)
125
self.enableReadHack = True
127
# Detect if this fd is actually a write-only fd. If it's
128
# valid to read, don't try to detect closing via read.
129
# This really only means that we cannot detect a TTY's write
132
os.read(self.fileno(), 0)
134
# It's a write-only pipe end, enable hack
135
self.enableReadHack = True
137
if self.enableReadHack:
142
Return the fileno() of my process's stdin.
146
def writeSomeData(self, data):
148
Write some data to the open process.
150
rv = fdesc.writeToFD(self.fd, data)
151
if rv == len(data) and self.enableReadHack:
155
def write(self, data):
157
abstract.FileDescriptor.write(self, data)
161
The only way a write pipe can become "readable" is at EOF, because the
162
child has closed it, and we're using a reactor which doesn't
163
distinguish between readable and closed (such as the select reactor).
165
Except that's not true on linux < 2.6.11. It has the following
166
characteristics: write pipe is completely empty => POLLOUT (writable in
167
select), write pipe is not completely empty => POLLIN (readable in
168
select), write pipe's reader closed => POLLIN|POLLERR (readable and
171
That's what this funky code is for. If linux was not broken, this
172
function could be simply "return CONNECTION_LOST".
174
BUG: We call select no matter what the reactor.
175
If the reactor is pollreactor, and the fd is > 1024, this will fail.
176
(only occurs on broken versions of linux, though).
178
if self.enableReadHack:
179
if brokenLinuxPipeBehavior:
181
r, w, x = select.select([fd], [fd], [], 0)
183
return CONNECTION_LOST
185
return CONNECTION_LOST
189
def connectionLost(self, reason):
191
See abstract.FileDescriptor.connectionLost.
193
# At least on OS X 10.4, exiting while stdout is non-blocking can
194
# result in data loss. For some reason putting the file descriptor
195
# back into blocking mode seems to resolve this issue.
196
fdesc.setBlocking(self.fd)
198
abstract.FileDescriptor.connectionLost(self, reason)
199
self.proc.childConnectionLost(self.name, reason)
203
class ProcessReader(abstract.FileDescriptor):
207
I am a selectable representation of a process's output pipe, such as
212
def __init__(self, reactor, proc, name, fileno):
214
Initialize, specifying a process to connect to.
216
abstract.FileDescriptor.__init__(self, reactor)
217
fdesc.setNonBlocking(fileno)
225
Return the fileno() of my process's stderr.
229
def writeSomeData(self, data):
230
# the only time this is actually called is after .loseConnection. Any
231
# actual write attempt would fail, so we must avoid that. This hack
232
# allows us to use .loseConnection on both readers and writers.
234
return CONNECTION_LOST
238
This is called when the pipe becomes readable.
240
return fdesc.readFromFD(self.fd, self.dataReceived)
242
def dataReceived(self, data):
    """
    Forward C{data} to the owning process object, tagged with this
    reader's name.

    @param data: the data to hand to C{self.proc.childDataReceived}.
    """
    owner = self.proc
    owner.childDataReceived(self.name, data)
245
def loseConnection(self):
246
if self.connected and not self.disconnecting:
247
self.disconnecting = 1
249
self.reactor.callLater(0, self.connectionLost,
250
failure.Failure(CONNECTION_DONE))
252
def connectionLost(self, reason):
    """
    Run the base L{abstract.FileDescriptor} connection-lost handling, then
    report the loss of this pipe to the owning process object.

    @param reason: the reason the connection was lost; passed unchanged to
        both calls.
    """
    base = abstract.FileDescriptor
    base.connectionLost(self, reason)
    # Tell the process which of its pipes went away.
    owner = self.proc
    owner.childConnectionLost(self.name, reason)
261
class _BaseProcess(BaseProcess, object):
263
Base class for Process and PTYProcess.
268
def reapProcess(self):
270
Try to reap a process (without blocking) via waitpid.
272
This is called when SIGCHLD is caught or a Process object loses its
273
"connection" (stdout is closed) This ought to result in reaping all
274
zombie processes, since it will be called twice as often as it needs
277
(Unfortunately, this is a slightly experimental approach, since
278
UNIX has no way to be really sure that your process is going to
279
go away w/o blocking. I don't want to block.)
283
pid, status = os.waitpid(self.pid, os.WNOHANG)
285
if e.errno == errno.ECHILD:
291
log.msg('Failed to reap %d:' % self.pid)
295
self.processEnded(status)
296
unregisterReapProcessHandler(pid, self)
299
def _getReason(self, status):
300
exitCode = sig = None
301
if os.WIFEXITED(status):
302
exitCode = os.WEXITSTATUS(status)
304
sig = os.WTERMSIG(status)
306
return error.ProcessTerminated(exitCode, sig, status)
307
return error.ProcessDone(status)
310
def signalProcess(self, signalID):
312
Send the given signal C{signalID} to the process. It'll translate a
313
few signals ('HUP', 'STOP', 'INT', 'KILL', 'TERM') from a string
314
representation to its int value, otherwise it'll pass directly the
317
@type signalID: C{str} or C{int}
319
if signalID in ('HUP', 'STOP', 'INT', 'KILL', 'TERM'):
320
signalID = getattr(signal, 'SIG%s' % (signalID,))
322
raise ProcessExitedAlready()
323
os.kill(self.pid, signalID)
326
def _resetSignalDisposition(self):
    """
    Restore the default disposition of every signal that is currently
    being ignored.

    The Python interpreter ignores some signals, and a child process
    inherits that behaviour.  Resetting each ignored signal to C{SIG_DFL}
    lets the child respond to signals normally.
    """
    ignored = signal.SIG_IGN
    for signalnum in range(1, signal.NSIG):
        if signal.getsignal(signalnum) != ignored:
            continue
        # Reset signal handling to the default
        signal.signal(signalnum, signal.SIG_DFL)
338
def _fork(self, path, uid, gid, executable, args, environment, **kwargs):
340
Fork and then exec sub-process.
342
@param path: the path where to run the new process.
344
@param uid: if defined, the uid used to run the new process.
346
@param gid: if defined, the gid used to run the new process.
348
@param executable: the executable to run in a new process.
349
@type executable: C{str}
350
@param args: arguments used to create the new process.
352
@param environment: environment used for the new process.
353
@type environment: C{dict}.
354
@param kwargs: keyword arguments to L{_setupChild} method.
356
settingUID = (uid is not None) or (gid is not None)
358
curegid = os.getegid()
359
currgid = os.getgid()
360
cureuid = os.geteuid()
361
curruid = os.getuid()
366
# prepare to change UID in subprocess
370
collectorEnabled = gc.isenabled()
375
# Still in the parent process
377
os.setregid(currgid, curegid)
378
os.setreuid(curruid, cureuid)
383
if self.pid == 0: # pid is 0 in the child process
384
# do not put *ANY* code outside the try block. The child process
385
# must either exec or _exit. If it gets outside this block (due
386
# to an exception that is not handled here, but which might be
387
# handled higher up), there will be two copies of the parent
388
# running in parallel, doing all kinds of damage.
390
# After each change to this code, review it to make sure there
393
# Stop debugging. If I am, I don't care anymore.
395
self._setupChild(**kwargs)
396
self._execChild(path, settingUID, uid, gid,
397
executable, args, environment)
399
# If there are errors, bail and try to write something
400
# descriptive to stderr.
401
# XXX: The parent's stderr isn't necessarily fd 2 anymore, or
402
# even still available
403
# XXXX: however even libc assumes write(2, err) is a useful
406
stderr = os.fdopen(2, 'w')
407
stderr.write("Upon execvpe %s %s in environment %s\n:" %
408
(executable, str(args),
409
"id %s" % id(environment)))
410
traceback.print_exc(file=stderr)
415
pass # make *sure* the child terminates
416
# Did you read the comment about not adding code here?
419
# we are now in parent process
421
os.setregid(currgid, curegid)
422
os.setreuid(curruid, cureuid)
425
self.status = -1 # this records the exit status of the child
427
def _setupChild(self, *args, **kwargs):
    """
    Hook for preparing the child process; subclasses must override it.

    @raise NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError()
433
def _execChild(self, path, settingUID, uid, gid,
434
executable, args, environment):
436
The exec() which is done in the forked child.
440
# set the UID before I actually exec the process
443
os.execvpe(executable, args, environment)
447
String representation of a process.
449
return "<%s pid=%s status=%s>" % (self.__class__.__name__,
450
self.pid, self.status)
452
class Process(_BaseProcess):
454
An operating-system Process.
456
This represents an operating-system process with arbitrary input/output
457
pipes connected to it. Those pipes may represent standard input,
458
standard output, and standard error, or any other file descriptor.
460
On UNIX, this is implemented using fork(), exec(), pipe()
461
and fcntl(). These calls may not exist elsewhere so this
462
code is not cross-platform. (also, windows can only select
472
processWriterFactory = ProcessWriter
473
processReaderFactory = ProcessReader
476
reactor, executable, args, environment, path, proto,
477
uid=None, gid=None, childFDs=None):
479
Spawn an operating-system process.
481
This is where the hard work of disconnecting all currently open
482
files / forking / executing the new process happens. (This is
483
executed automatically when a Process is instantiated.)
485
This will also run the subprocess as a given user ID and group ID, if
486
specified. (Implementation Note: this doesn't support all the arcane
487
nuances of setXXuid on UNIX: it will assume that either your effective
491
assert 'r' not in childFDs.values()
492
assert 'w' not in childFDs.values()
493
_BaseProcess.__init__(self, proto)
496
# keys are childFDs, we can sense them closing
497
# values are ProcessReader/ProcessWriters
501
# values are parentFDs
504
childFDs = {0: "w", # we write to the child's stdin
505
1: "r", # we read from their stdout
506
2: "r", # and we read from their stderr
510
if debug: print "childFDs", childFDs
515
_openedPipes.extend([r, w])
518
# fdmap.keys() are filenos of pipes that are used by the child.
519
fdmap = {} # maps childFD to parentFD
521
for childFD, target in childFDs.items():
522
if debug: print "[%d]" % childFD, target
524
# we need a pipe that the parent can read from
525
readFD, writeFD = pipe()
526
if debug: print "readFD=%d, writeFD=%d" % (readFD, writeFD)
527
fdmap[childFD] = writeFD # child writes to this
528
helpers[childFD] = readFD # parent reads from this
530
# we need a pipe that the parent can write to
531
readFD, writeFD = pipe()
532
if debug: print "readFD=%d, writeFD=%d" % (readFD, writeFD)
533
fdmap[childFD] = readFD # child reads from this
534
helpers[childFD] = writeFD # parent writes to this
536
assert type(target) == int, '%r should be an int' % (target,)
537
fdmap[childFD] = target # parent ignores this
538
if debug: print "fdmap", fdmap
539
if debug: print "helpers", helpers
540
# the child only cares about fdmap.values()
542
self._fork(path, uid, gid, executable, args, environment, fdmap=fdmap)
544
map(os.close, _openedPipes)
547
# we are the parent process:
550
# arrange for the parent-side pipes to be read and written
551
for childFD, parentFD in helpers.items():
552
os.close(fdmap[childFD])
554
if childFDs[childFD] == "r":
555
reader = self.processReaderFactory(reactor, self, childFD,
557
self.pipes[childFD] = reader
559
if childFDs[childFD] == "w":
560
writer = self.processWriterFactory(reactor, self, childFD,
561
parentFD, forceReadHack=True)
562
self.pipes[childFD] = writer
565
# the 'transport' is used for some compatibility methods
566
if self.proto is not None:
567
self.proto.makeConnection(self)
571
# The reactor might not be running yet. This might call back into
572
# processEnded synchronously, triggering an application-visible
573
# callback. That's probably not ideal. The replacement API for
574
# spawnProcess should improve upon this situation.
575
registerReapProcessHandler(self.pid, self)
578
def _setupChild(self, fdmap):
580
fdmap[childFD] = parentFD
582
The child wants to end up with 'childFD' attached to what used to be
583
the parent's parentFD. As an example, a bash command run like
584
'command 2>&1' would correspond to an fdmap of {0:0, 1:1, 2:1}.
585
'command >foo.txt' would be {0:0, 1:os.open('foo.txt'), 2:2}.
587
This is accomplished in two steps::
589
1. close all file descriptors that aren't values of fdmap. This
592
2. for each childFD::
594
- if fdmap[childFD] == childFD, the descriptor is already in
595
place. Make sure the CLOEXEC flag is not set, then delete
596
the entry from fdmap.
598
- if childFD is in fdmap.values(), then the target descriptor
599
is busy. Use os.dup() to move it elsewhere, update all
600
fdmap[childFD] items that point to it, then close the
601
original. Then fall through to the next case.
603
- now fdmap[childFD] is not in fdmap.values(), and is free.
604
Use os.dup2() to move it to the right place, then close the
608
debug = self.debug_child
611
errfd.write("starting _setupChild\n")
613
destList = fdmap.values()
616
maxfds = resource.getrlimit(resource.RLIMIT_NOFILE)[1] + 1
617
# OS-X reports 9223372036854775808. That's a lot of fds to close
623
for fd in xrange(maxfds):
626
if debug and fd == errfd.fileno():
633
# at this point, the only fds still open are the ones that need to
634
# be moved to their appropriate positions in the child (the targets
635
# of fdmap, i.e. fdmap.values() )
637
if debug: print >>errfd, "fdmap", fdmap
638
childlist = fdmap.keys()
641
for child in childlist:
642
target = fdmap[child]
644
# fd is already in place
645
if debug: print >>errfd, "%d already in place" % target
646
fdesc._unsetCloseOnExec(child)
648
if child in fdmap.values():
649
# we can't replace child-fd yet, as some other mapping
650
# still needs the fd it wants to target. We must preserve
651
# that old fd by duping it to a new home.
652
newtarget = os.dup(child) # give it a safe home
653
if debug: print >>errfd, "os.dup(%d) -> %d" % (child,
655
os.close(child) # close the original
656
for c, p in fdmap.items():
658
fdmap[c] = newtarget # update all pointers
659
# now it should be available
660
if debug: print >>errfd, "os.dup2(%d,%d)" % (target, child)
661
os.dup2(target, child)
663
# At this point, the child has everything it needs. We want to close
664
# everything that isn't going to be used by the child, i.e.
665
# everything not in fdmap.keys(). The only remaining fds open are
666
# those in fdmap.values().
668
# Any given fd may appear in fdmap.values() multiple times, so we
669
# need to remove duplicates first.
672
for fd in fdmap.values():
674
if not fd in fdmap.keys():
676
if debug: print >>errfd, "old", old
680
self._resetSignalDisposition()
683
def writeToChild(self, childFD, data):
    """
    Write C{data} to the pipe helper stored for C{childFD}.

    @param childFD: key into C{self.pipes} selecting the pipe helper.
    @param data: the data handed to that helper's C{write} method.

    @raise KeyError: if C{self.pipes} has no entry for C{childFD}.
    """
    target = self.pipes[childFD]
    target.write(data)
686
def closeChildFD(self, childFD):
    """
    Ask the pipe helper stored for C{childFD} to lose its connection.

    For writer pipes, C{loseConnection} tries to write the remaining data
    out to the pipe before closing it.  If C{childFD} has no entry in
    C{self.pipes}, nothing is done.

    @param childFD: key into C{self.pipes} selecting the pipe helper.
    """
    if childFD not in self.pipes:
        return
    self.pipes[childFD].loseConnection()
694
def pauseProducing(self):
695
for p in self.pipes.itervalues():
696
if isinstance(p, ProcessReader):
699
def resumeProducing(self):
700
for p in self.pipes.itervalues():
701
if isinstance(p, ProcessReader):
705
def closeStdin(self):
707
Call this to close standard input on this process.
711
def closeStdout(self):
714
def closeStderr(self):
717
def loseConnection(self):
722
def write(self, data):
724
Call this to write to standard input on this process.
726
NOTE: This will silently lose data if there is no standard input.
729
self.pipes[0].write(data)
731
def registerProducer(self, producer, streaming):
733
Call this to register producer for standard input.
735
If there is no standard input, producer.stopProducing() will
736
be called immediately.
739
self.pipes[0].registerProducer(producer, streaming)
741
producer.stopProducing()
743
def unregisterProducer(self):
745
Call this to unregister producer for standard input."""
747
self.pipes[0].unregisterProducer()
749
def writeSequence(self, seq):
751
Call this to write to standard input on this process.
753
NOTE: This will silently lose data if there is no standard input.
756
self.pipes[0].writeSequence(seq)
759
def childDataReceived(self, name, data):
    """
    Pass data received from a child pipe on to the protocol.

    @param name: identifies the pipe the data came from; forwarded as-is.
    @param data: the received data; forwarded as-is.
    """
    protocol = self.proto
    protocol.childDataReceived(name, data)
763
def childConnectionLost(self, childFD, reason):
764
# this is called when one of the helpers (ProcessReader or
765
# ProcessWriter) notices their pipe has been closed
766
os.close(self.pipes[childFD].fileno())
767
del self.pipes[childFD]
769
self.proto.childConnectionLost(childFD)
772
self.maybeCallProcessEnded()
774
def maybeCallProcessEnded(self):
775
# we don't call ProcessProtocol.processEnded until:
776
# the child has terminated, AND
777
# all writers have indicated an error status, AND
778
# all readers have indicated EOF
779
# This ensures that we've gathered all output from the process.
782
if not self.lostProcess:
785
_BaseProcess.maybeCallProcessEnded(self)
788
class PTYProcess(abstract.FileDescriptor, _BaseProcess):
790
An operating-system Process that uses PTY support.
795
def __init__(self, reactor, executable, args, environment, path, proto,
796
uid=None, gid=None, usePTY=None):
798
Spawn an operating-system process.
800
This is where the hard work of disconnecting all currently open
801
files / forking / executing the new process happens. (This is
802
executed automatically when a Process is instantiated.)
804
This will also run the subprocess as a given user ID and group ID, if
805
specified. (Implementation Note: this doesn't support all the arcane
806
nuances of setXXuid on UNIX: it will assume that either your effective
809
if pty is None and not isinstance(usePTY, (tuple, list)):
810
# no pty module and we didn't get a pty to use
811
raise NotImplementedError(
812
"cannot use PTYProcess on platforms without the pty module.")
813
abstract.FileDescriptor.__init__(self, reactor)
814
_BaseProcess.__init__(self, proto)
816
if isinstance(usePTY, (tuple, list)):
817
masterfd, slavefd, ttyname = usePTY
819
masterfd, slavefd = pty.openpty()
820
ttyname = os.ttyname(slavefd)
823
self._fork(path, uid, gid, executable, args, environment,
824
masterfd=masterfd, slavefd=slavefd)
826
if not isinstance(usePTY, (tuple, list)):
831
# we are now in parent process:
833
fdesc.setNonBlocking(masterfd)
839
self.proto.makeConnection(self)
842
registerReapProcessHandler(self.pid, self)
844
def _setupChild(self, masterfd, slavefd):
846
Setup child process after fork() but before exec().
849
if hasattr(termios, 'TIOCNOTTY'):
851
fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
856
fcntl.ioctl(fd, termios.TIOCNOTTY, '')
863
if hasattr(termios, 'TIOCSCTTY'):
864
fcntl.ioctl(slavefd, termios.TIOCSCTTY, '')
870
os.dup2(slavefd, 0) # stdin
871
os.dup2(slavefd, 1) # stdout
872
os.dup2(slavefd, 2) # stderr
874
for fd in xrange(3, 256):
880
self._resetSignalDisposition()
883
# PTYs do not have stdin/stdout/stderr. They only have in and out, just
884
# like sockets. You cannot close one without closing off the entire PTY.
885
def closeStdin(self):
888
def closeStdout(self):
891
def closeStderr(self):
896
Called when my standard output stream is ready for reading.
898
return fdesc.readFromFD(
900
lambda data: self.proto.childDataReceived(1, data))
904
This returns the file number of standard output on this process.
908
def maybeCallProcessEnded(self):
    """
    Delegate to C{_BaseProcess.maybeCallProcessEnded} once both required
    events have been counted.

    Two things must happen before the ProcessProtocol's C{processEnded}
    is called.  First, the child process must die and be reaped (which
    calls our own C{processEnded}).  Second, the child must close its
    stdin/stdout/stderr fds, which closes the pty and causes our
    C{connectionLost} to be called; this can also be triggered by calling
    C{loseConnection}.  C{self.lostProcess} is compared against 2 to decide
    that both have happened.
    """
    if self.lostProcess != 2:
        return
    _BaseProcess.maybeCallProcessEnded(self)
918
def connectionLost(self, reason):
920
I call this to clean up when one or all of my connections has died.
922
abstract.FileDescriptor.connectionLost(self, reason)
924
self.lostProcess += 1
925
self.maybeCallProcessEnded()
927
def writeSomeData(self, data):
    """
    Write C{data} to this process's file descriptor.

    @param data: the data to write.

    @return: whatever L{fdesc.writeToFD} returns for the write.
    """
    outcome = fdesc.writeToFD(self.fd, data)
    return outcome