~justin-fathomdb/nova/justinsb-openstack-api-volumes

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/internet/process.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- test-case-name: twisted.test.test_process -*-
 
2
# Copyright (c) 2001-2010 Twisted Matrix Laboratories.
 
3
# See LICENSE for details.
 
4
 
 
5
"""
 
6
UNIX Process management.
 
7
 
 
8
Do NOT use this module directly - use reactor.spawnProcess() instead.
 
9
 
 
10
Maintainer: Itamar Shtull-Trauring
 
11
"""
 
12
 
 
13
# System Imports
 
14
import gc, os, sys, traceback, select, signal, errno
 
15
 
 
16
try:
 
17
    import pty
 
18
except ImportError:
 
19
    pty = None
 
20
 
 
21
try:
 
22
    import fcntl, termios
 
23
except ImportError:
 
24
    fcntl = None
 
25
 
 
26
from twisted.python import log, failure
 
27
from twisted.python.util import switchUID
 
28
from twisted.internet import fdesc, abstract, error
 
29
from twisted.internet.main import CONNECTION_LOST, CONNECTION_DONE
 
30
from twisted.internet._baseprocess import BaseProcess
 
31
 
 
32
# Some people were importing this, which is incorrect, just keeping it
 
33
# here for backwards compatibility:
 
34
ProcessExitedAlready = error.ProcessExitedAlready
 
35
 
 
36
reapProcessHandlers = {}
 
37
 
 
38
def reapAllProcesses():
 
39
    """
 
40
    Reap all registered processes.
 
41
    """
 
42
    for process in reapProcessHandlers.values():
 
43
        process.reapProcess()
 
44
 
 
45
 
 
46
def registerReapProcessHandler(pid, process):
 
47
    """
 
48
    Register a process handler for the given pid, in case L{reapAllProcesses}
 
49
    is called.
 
50
 
 
51
    @param pid: the pid of the process.
 
52
    @param process: a process handler.
 
53
    """
 
54
    if pid in reapProcessHandlers:
 
55
        raise RuntimeError("Try to register an already registered process.")
 
56
    try:
 
57
        auxPID, status = os.waitpid(pid, os.WNOHANG)
 
58
    except:
 
59
        log.msg('Failed to reap %d:' % pid)
 
60
        log.err()
 
61
        auxPID = None
 
62
    if auxPID:
 
63
        process.processEnded(status)
 
64
    else:
 
65
        # if auxPID is 0, there are children but none have exited
 
66
        reapProcessHandlers[pid] = process
 
67
 
 
68
 
 
69
def unregisterReapProcessHandler(pid, process):
 
70
    """
 
71
    Unregister a process handler previously registered with
 
72
    L{registerReapProcessHandler}.
 
73
    """
 
74
    if not (pid in reapProcessHandlers
 
75
            and reapProcessHandlers[pid] == process):
 
76
        raise RuntimeError("Try to unregister a process not registered.")
 
77
    del reapProcessHandlers[pid]
 
78
 
 
79
 
 
80
def detectLinuxBrokenPipeBehavior():
 
81
    """
 
82
    On some Linux version, write-only pipe are detected as readable. This
 
83
    function is here to check if this bug is present or not.
 
84
 
 
85
    See L{ProcessWriter.doRead} for a more detailed explanation.
 
86
    """
 
87
    global brokenLinuxPipeBehavior
 
88
    r, w = os.pipe()
 
89
    os.write(w, 'a')
 
90
    reads, writes, exes = select.select([w], [], [], 0)
 
91
    if reads:
 
92
        # Linux < 2.6.11 says a write-only pipe is readable.
 
93
        brokenLinuxPipeBehavior = True
 
94
    else:
 
95
        brokenLinuxPipeBehavior = False
 
96
    os.close(r)
 
97
    os.close(w)
 
98
 
 
99
# Call at import time
 
100
detectLinuxBrokenPipeBehavior()
 
101
 
 
102
 
 
103
class ProcessWriter(abstract.FileDescriptor):
 
104
    """
 
105
    (Internal) Helper class to write into a Process's input pipe.
 
106
 
 
107
    I am a helper which describes a selectable asynchronous writer to a
 
108
    process's input pipe, including stdin.
 
109
    """
 
110
    connected = 1
 
111
    ic = 0
 
112
    enableReadHack = False
 
113
 
 
114
    def __init__(self, reactor, proc, name, fileno, forceReadHack=False):
 
115
        """
 
116
        Initialize, specifying a Process instance to connect to.
 
117
        """
 
118
        abstract.FileDescriptor.__init__(self, reactor)
 
119
        fdesc.setNonBlocking(fileno)
 
120
        self.proc = proc
 
121
        self.name = name
 
122
        self.fd = fileno
 
123
 
 
124
        if forceReadHack:
 
125
            self.enableReadHack = True
 
126
        else:
 
127
            # Detect if this fd is actually a write-only fd. If it's
 
128
            # valid to read, don't try to detect closing via read.
 
129
            # This really only means that we cannot detect a TTY's write
 
130
            # pipe being closed.
 
131
            try:
 
132
                os.read(self.fileno(), 0)
 
133
            except OSError:
 
134
                # It's a write-only pipe end, enable hack
 
135
                self.enableReadHack = True
 
136
 
 
137
        if self.enableReadHack:
 
138
            self.startReading()
 
139
 
 
140
    def fileno(self):
 
141
        """
 
142
        Return the fileno() of my process's stdin.
 
143
        """
 
144
        return self.fd
 
145
 
 
146
    def writeSomeData(self, data):
 
147
        """
 
148
        Write some data to the open process.
 
149
        """
 
150
        rv = fdesc.writeToFD(self.fd, data)
 
151
        if rv == len(data) and self.enableReadHack:
 
152
            self.startReading()
 
153
        return rv
 
154
 
 
155
    def write(self, data):
 
156
        self.stopReading()
 
157
        abstract.FileDescriptor.write(self, data)
 
158
 
 
159
    def doRead(self):
 
160
        """
 
161
        The only way a write pipe can become "readable" is at EOF, because the
 
162
        child has closed it, and we're using a reactor which doesn't
 
163
        distinguish between readable and closed (such as the select reactor).
 
164
 
 
165
        Except that's not true on linux < 2.6.11. It has the following
 
166
        characteristics: write pipe is completely empty => POLLOUT (writable in
 
167
        select), write pipe is not completely empty => POLLIN (readable in
 
168
        select), write pipe's reader closed => POLLIN|POLLERR (readable and
 
169
        writable in select)
 
170
 
 
171
        That's what this funky code is for. If linux was not broken, this
 
172
        function could be simply "return CONNECTION_LOST".
 
173
 
 
174
        BUG: We call select no matter what the reactor.
 
175
        If the reactor is pollreactor, and the fd is > 1024, this will fail.
 
176
        (only occurs on broken versions of linux, though).
 
177
        """
 
178
        if self.enableReadHack:
 
179
            if brokenLinuxPipeBehavior:
 
180
                fd = self.fd
 
181
                r, w, x = select.select([fd], [fd], [], 0)
 
182
                if r and w:
 
183
                    return CONNECTION_LOST
 
184
            else:
 
185
                return CONNECTION_LOST
 
186
        else:
 
187
            self.stopReading()
 
188
 
 
189
    def connectionLost(self, reason):
 
190
        """
 
191
        See abstract.FileDescriptor.connectionLost.
 
192
        """
 
193
        # At least on OS X 10.4, exiting while stdout is non-blocking can
 
194
        # result in data loss.  For some reason putting the file descriptor
 
195
        # back into blocking mode seems to resolve this issue.
 
196
        fdesc.setBlocking(self.fd)
 
197
 
 
198
        abstract.FileDescriptor.connectionLost(self, reason)
 
199
        self.proc.childConnectionLost(self.name, reason)
 
200
 
 
201
 
 
202
 
 
203
class ProcessReader(abstract.FileDescriptor):
 
204
    """
 
205
    ProcessReader
 
206
 
 
207
    I am a selectable representation of a process's output pipe, such as
 
208
    stdout and stderr.
 
209
    """
 
210
    connected = 1
 
211
 
 
212
    def __init__(self, reactor, proc, name, fileno):
 
213
        """
 
214
        Initialize, specifying a process to connect to.
 
215
        """
 
216
        abstract.FileDescriptor.__init__(self, reactor)
 
217
        fdesc.setNonBlocking(fileno)
 
218
        self.proc = proc
 
219
        self.name = name
 
220
        self.fd = fileno
 
221
        self.startReading()
 
222
 
 
223
    def fileno(self):
 
224
        """
 
225
        Return the fileno() of my process's stderr.
 
226
        """
 
227
        return self.fd
 
228
 
 
229
    def writeSomeData(self, data):
 
230
        # the only time this is actually called is after .loseConnection Any
 
231
        # actual write attempt would fail, so we must avoid that. This hack
 
232
        # allows us to use .loseConnection on both readers and writers.
 
233
        assert data == ""
 
234
        return CONNECTION_LOST
 
235
 
 
236
    def doRead(self):
 
237
        """
 
238
        This is called when the pipe becomes readable.
 
239
        """
 
240
        return fdesc.readFromFD(self.fd, self.dataReceived)
 
241
 
 
242
    def dataReceived(self, data):
 
243
        self.proc.childDataReceived(self.name, data)
 
244
 
 
245
    def loseConnection(self):
 
246
        if self.connected and not self.disconnecting:
 
247
            self.disconnecting = 1
 
248
            self.stopReading()
 
249
            self.reactor.callLater(0, self.connectionLost,
 
250
                                   failure.Failure(CONNECTION_DONE))
 
251
 
 
252
    def connectionLost(self, reason):
 
253
        """
 
254
        Close my end of the pipe, signal the Process (which signals the
 
255
        ProcessProtocol).
 
256
        """
 
257
        abstract.FileDescriptor.connectionLost(self, reason)
 
258
        self.proc.childConnectionLost(self.name, reason)
 
259
 
 
260
 
 
261
class _BaseProcess(BaseProcess, object):
 
262
    """
 
263
    Base class for Process and PTYProcess.
 
264
    """
 
265
    status = None
 
266
    pid = None
 
267
 
 
268
    def reapProcess(self):
 
269
        """
 
270
        Try to reap a process (without blocking) via waitpid.
 
271
 
 
272
        This is called when sigchild is caught or a Process object loses its
 
273
        "connection" (stdout is closed) This ought to result in reaping all
 
274
        zombie processes, since it will be called twice as often as it needs
 
275
        to be.
 
276
 
 
277
        (Unfortunately, this is a slightly experimental approach, since
 
278
        UNIX has no way to be really sure that your process is going to
 
279
        go away w/o blocking.  I don't want to block.)
 
280
        """
 
281
        try:
 
282
            try:
 
283
                pid, status = os.waitpid(self.pid, os.WNOHANG)
 
284
            except OSError, e:
 
285
                if e.errno == errno.ECHILD:
 
286
                    # no child process
 
287
                    pid = None
 
288
                else:
 
289
                    raise
 
290
        except:
 
291
            log.msg('Failed to reap %d:' % self.pid)
 
292
            log.err()
 
293
            pid = None
 
294
        if pid:
 
295
            self.processEnded(status)
 
296
            unregisterReapProcessHandler(pid, self)
 
297
 
 
298
 
 
299
    def _getReason(self, status):
 
300
        exitCode = sig = None
 
301
        if os.WIFEXITED(status):
 
302
            exitCode = os.WEXITSTATUS(status)
 
303
        else:
 
304
            sig = os.WTERMSIG(status)
 
305
        if exitCode or sig:
 
306
            return error.ProcessTerminated(exitCode, sig, status)
 
307
        return error.ProcessDone(status)
 
308
 
 
309
 
 
310
    def signalProcess(self, signalID):
 
311
        """
 
312
        Send the given signal C{signalID} to the process. It'll translate a
 
313
        few signals ('HUP', 'STOP', 'INT', 'KILL', 'TERM') from a string
 
314
        representation to its int value, otherwise it'll pass directly the
 
315
        value provided
 
316
 
 
317
        @type signalID: C{str} or C{int}
 
318
        """
 
319
        if signalID in ('HUP', 'STOP', 'INT', 'KILL', 'TERM'):
 
320
            signalID = getattr(signal, 'SIG%s' % (signalID,))
 
321
        if self.pid is None:
 
322
            raise ProcessExitedAlready()
 
323
        os.kill(self.pid, signalID)
 
324
 
 
325
 
 
326
    def _resetSignalDisposition(self):
 
327
        # The Python interpreter ignores some signals, and our child
 
328
        # process will inherit that behaviour. To have a child process
 
329
        # that responds to signals normally, we need to reset our
 
330
        # child process's signal handling (just) after we fork and
 
331
        # before we execvpe.
 
332
        for signalnum in range(1, signal.NSIG):
 
333
            if signal.getsignal(signalnum) == signal.SIG_IGN:
 
334
                # Reset signal handling to the default
 
335
                signal.signal(signalnum, signal.SIG_DFL)
 
336
 
 
337
 
 
338
    def _fork(self, path, uid, gid, executable, args, environment, **kwargs):
 
339
        """
 
340
        Fork and then exec sub-process.
 
341
 
 
342
        @param path: the path where to run the new process.
 
343
        @type path: C{str}
 
344
        @param uid: if defined, the uid used to run the new process.
 
345
        @type uid: C{int}
 
346
        @param gid: if defined, the gid used to run the new process.
 
347
        @type gid: C{int}
 
348
        @param executable: the executable to run in a new process.
 
349
        @type executable: C{str}
 
350
        @param args: arguments used to create the new process.
 
351
        @type args: C{list}.
 
352
        @param environment: environment used for the new process.
 
353
        @type environment: C{dict}.
 
354
        @param kwargs: keyword arguments to L{_setupChild} method.
 
355
        """
 
356
        settingUID = (uid is not None) or (gid is not None)
 
357
        if settingUID:
 
358
            curegid = os.getegid()
 
359
            currgid = os.getgid()
 
360
            cureuid = os.geteuid()
 
361
            curruid = os.getuid()
 
362
            if uid is None:
 
363
                uid = cureuid
 
364
            if gid is None:
 
365
                gid = curegid
 
366
            # prepare to change UID in subprocess
 
367
            os.setuid(0)
 
368
            os.setgid(0)
 
369
 
 
370
        collectorEnabled = gc.isenabled()
 
371
        gc.disable()
 
372
        try:
 
373
            self.pid = os.fork()
 
374
        except:
 
375
            # Still in the parent process
 
376
            if settingUID:
 
377
                os.setregid(currgid, curegid)
 
378
                os.setreuid(curruid, cureuid)
 
379
            if collectorEnabled:
 
380
                gc.enable()
 
381
            raise
 
382
        else:
 
383
            if self.pid == 0: # pid is 0 in the child process
 
384
                # do not put *ANY* code outside the try block. The child process
 
385
                # must either exec or _exit. If it gets outside this block (due
 
386
                # to an exception that is not handled here, but which might be
 
387
                # handled higher up), there will be two copies of the parent
 
388
                # running in parallel, doing all kinds of damage.
 
389
 
 
390
                # After each change to this code, review it to make sure there
 
391
                # are no exit paths.
 
392
                try:
 
393
                    # Stop debugging. If I am, I don't care anymore.
 
394
                    sys.settrace(None)
 
395
                    self._setupChild(**kwargs)
 
396
                    self._execChild(path, settingUID, uid, gid,
 
397
                                    executable, args, environment)
 
398
                except:
 
399
                    # If there are errors, bail and try to write something
 
400
                    # descriptive to stderr.
 
401
                    # XXX: The parent's stderr isn't necessarily fd 2 anymore, or
 
402
                    #      even still available
 
403
                    # XXXX: however even libc assumes write(2, err) is a useful
 
404
                    #       thing to attempt
 
405
                    try:
 
406
                        stderr = os.fdopen(2, 'w')
 
407
                        stderr.write("Upon execvpe %s %s in environment %s\n:" %
 
408
                                     (executable, str(args),
 
409
                                      "id %s" % id(environment)))
 
410
                        traceback.print_exc(file=stderr)
 
411
                        stderr.flush()
 
412
                        for fd in range(3):
 
413
                            os.close(fd)
 
414
                    except:
 
415
                        pass # make *sure* the child terminates
 
416
                # Did you read the comment about not adding code here?
 
417
                os._exit(1)
 
418
 
 
419
        # we are now in parent process
 
420
        if settingUID:
 
421
            os.setregid(currgid, curegid)
 
422
            os.setreuid(curruid, cureuid)
 
423
        if collectorEnabled:
 
424
            gc.enable()
 
425
        self.status = -1 # this records the exit status of the child
 
426
 
 
427
    def _setupChild(self, *args, **kwargs):
 
428
        """
 
429
        Setup the child process. Override in subclasses.
 
430
        """
 
431
        raise NotImplementedError()
 
432
 
 
433
    def _execChild(self, path, settingUID, uid, gid,
 
434
                   executable, args, environment):
 
435
        """
 
436
        The exec() which is done in the forked child.
 
437
        """
 
438
        if path:
 
439
            os.chdir(path)
 
440
        # set the UID before I actually exec the process
 
441
        if settingUID:
 
442
            switchUID(uid, gid)
 
443
        os.execvpe(executable, args, environment)
 
444
 
 
445
    def __repr__(self):
 
446
        """
 
447
        String representation of a process.
 
448
        """
 
449
        return "<%s pid=%s status=%s>" % (self.__class__.__name__,
 
450
                                          self.pid, self.status)
 
451
 
 
452
class Process(_BaseProcess):
 
453
    """
 
454
    An operating-system Process.
 
455
 
 
456
    This represents an operating-system process with arbitrary input/output
 
457
    pipes connected to it. Those pipes may represent standard input,
 
458
    standard output, and standard error, or any other file descriptor.
 
459
 
 
460
    On UNIX, this is implemented using fork(), exec(), pipe()
 
461
    and fcntl(). These calls may not exist elsewhere so this
 
462
    code is not cross-platform. (also, windows can only select
 
463
    on sockets...)
 
464
    """
 
465
 
 
466
    debug = False
 
467
    debug_child = False
 
468
 
 
469
    status = -1
 
470
    pid = None
 
471
 
 
472
    processWriterFactory = ProcessWriter
 
473
    processReaderFactory = ProcessReader
 
474
 
 
475
    def __init__(self,
 
476
                 reactor, executable, args, environment, path, proto,
 
477
                 uid=None, gid=None, childFDs=None):
 
478
        """
 
479
        Spawn an operating-system process.
 
480
 
 
481
        This is where the hard work of disconnecting all currently open
 
482
        files / forking / executing the new process happens.  (This is
 
483
        executed automatically when a Process is instantiated.)
 
484
 
 
485
        This will also run the subprocess as a given user ID and group ID, if
 
486
        specified.  (Implementation Note: this doesn't support all the arcane
 
487
        nuances of setXXuid on UNIX: it will assume that either your effective
 
488
        or real UID is 0.)
 
489
        """
 
490
        if not proto:
 
491
            assert 'r' not in childFDs.values()
 
492
            assert 'w' not in childFDs.values()
 
493
        _BaseProcess.__init__(self, proto)
 
494
 
 
495
        self.pipes = {}
 
496
        # keys are childFDs, we can sense them closing
 
497
        # values are ProcessReader/ProcessWriters
 
498
 
 
499
        helpers = {}
 
500
        # keys are childFDs
 
501
        # values are parentFDs
 
502
 
 
503
        if childFDs is None:
 
504
            childFDs = {0: "w", # we write to the child's stdin
 
505
                        1: "r", # we read from their stdout
 
506
                        2: "r", # and we read from their stderr
 
507
                        }
 
508
 
 
509
        debug = self.debug
 
510
        if debug: print "childFDs", childFDs
 
511
 
 
512
        _openedPipes = []
 
513
        def pipe():
 
514
            r, w = os.pipe()
 
515
            _openedPipes.extend([r, w])
 
516
            return r, w
 
517
 
 
518
        # fdmap.keys() are filenos of pipes that are used by the child.
 
519
        fdmap = {} # maps childFD to parentFD
 
520
        try:
 
521
            for childFD, target in childFDs.items():
 
522
                if debug: print "[%d]" % childFD, target
 
523
                if target == "r":
 
524
                    # we need a pipe that the parent can read from
 
525
                    readFD, writeFD = pipe()
 
526
                    if debug: print "readFD=%d, writeFD=%d" % (readFD, writeFD)
 
527
                    fdmap[childFD] = writeFD     # child writes to this
 
528
                    helpers[childFD] = readFD    # parent reads from this
 
529
                elif target == "w":
 
530
                    # we need a pipe that the parent can write to
 
531
                    readFD, writeFD = pipe()
 
532
                    if debug: print "readFD=%d, writeFD=%d" % (readFD, writeFD)
 
533
                    fdmap[childFD] = readFD      # child reads from this
 
534
                    helpers[childFD] = writeFD   # parent writes to this
 
535
                else:
 
536
                    assert type(target) == int, '%r should be an int' % (target,)
 
537
                    fdmap[childFD] = target      # parent ignores this
 
538
            if debug: print "fdmap", fdmap
 
539
            if debug: print "helpers", helpers
 
540
            # the child only cares about fdmap.values()
 
541
 
 
542
            self._fork(path, uid, gid, executable, args, environment, fdmap=fdmap)
 
543
        except:
 
544
            map(os.close, _openedPipes)
 
545
            raise
 
546
 
 
547
        # we are the parent process:
 
548
        self.proto = proto
 
549
 
 
550
        # arrange for the parent-side pipes to be read and written
 
551
        for childFD, parentFD in helpers.items():
 
552
            os.close(fdmap[childFD])
 
553
 
 
554
            if childFDs[childFD] == "r":
 
555
                reader = self.processReaderFactory(reactor, self, childFD,
 
556
                                        parentFD)
 
557
                self.pipes[childFD] = reader
 
558
 
 
559
            if childFDs[childFD] == "w":
 
560
                writer = self.processWriterFactory(reactor, self, childFD,
 
561
                                        parentFD, forceReadHack=True)
 
562
                self.pipes[childFD] = writer
 
563
 
 
564
        try:
 
565
            # the 'transport' is used for some compatibility methods
 
566
            if self.proto is not None:
 
567
                self.proto.makeConnection(self)
 
568
        except:
 
569
            log.err()
 
570
 
 
571
        # The reactor might not be running yet.  This might call back into
 
572
        # processEnded synchronously, triggering an application-visible
 
573
        # callback.  That's probably not ideal.  The replacement API for
 
574
        # spawnProcess should improve upon this situation.
 
575
        registerReapProcessHandler(self.pid, self)
 
576
 
 
577
 
 
578
    def _setupChild(self, fdmap):
 
579
        """
 
580
        fdmap[childFD] = parentFD
 
581
 
 
582
        The child wants to end up with 'childFD' attached to what used to be
 
583
        the parent's parentFD. As an example, a bash command run like
 
584
        'command 2>&1' would correspond to an fdmap of {0:0, 1:1, 2:1}.
 
585
        'command >foo.txt' would be {0:0, 1:os.open('foo.txt'), 2:2}.
 
586
 
 
587
        This is accomplished in two steps::
 
588
 
 
589
            1. close all file descriptors that aren't values of fdmap.  This
 
590
               means 0 .. maxfds.
 
591
 
 
592
            2. for each childFD::
 
593
 
 
594
                 - if fdmap[childFD] == childFD, the descriptor is already in
 
595
                   place.  Make sure the CLOEXEC flag is not set, then delete
 
596
                   the entry from fdmap.
 
597
 
 
598
                 - if childFD is in fdmap.values(), then the target descriptor
 
599
                   is busy. Use os.dup() to move it elsewhere, update all
 
600
                   fdmap[childFD] items that point to it, then close the
 
601
                   original. Then fall through to the next case.
 
602
 
 
603
                 - now fdmap[childFD] is not in fdmap.values(), and is free.
 
604
                   Use os.dup2() to move it to the right place, then close the
 
605
                   original.
 
606
        """
 
607
 
 
608
        debug = self.debug_child
 
609
        if debug:
 
610
            errfd = sys.stderr
 
611
            errfd.write("starting _setupChild\n")
 
612
 
 
613
        destList = fdmap.values()
 
614
        try:
 
615
            import resource
 
616
            maxfds = resource.getrlimit(resource.RLIMIT_NOFILE)[1] + 1
 
617
            # OS-X reports 9223372036854775808. That's a lot of fds to close
 
618
            if maxfds > 1024:
 
619
                maxfds = 1024
 
620
        except:
 
621
            maxfds = 256
 
622
 
 
623
        for fd in xrange(maxfds):
 
624
            if fd in destList:
 
625
                continue
 
626
            if debug and fd == errfd.fileno():
 
627
                continue
 
628
            try:
 
629
                os.close(fd)
 
630
            except:
 
631
                pass
 
632
 
 
633
        # at this point, the only fds still open are the ones that need to
 
634
        # be moved to their appropriate positions in the child (the targets
 
635
        # of fdmap, i.e. fdmap.values() )
 
636
 
 
637
        if debug: print >>errfd, "fdmap", fdmap
 
638
        childlist = fdmap.keys()
 
639
        childlist.sort()
 
640
 
 
641
        for child in childlist:
 
642
            target = fdmap[child]
 
643
            if target == child:
 
644
                # fd is already in place
 
645
                if debug: print >>errfd, "%d already in place" % target
 
646
                fdesc._unsetCloseOnExec(child)
 
647
            else:
 
648
                if child in fdmap.values():
 
649
                    # we can't replace child-fd yet, as some other mapping
 
650
                    # still needs the fd it wants to target. We must preserve
 
651
                    # that old fd by duping it to a new home.
 
652
                    newtarget = os.dup(child) # give it a safe home
 
653
                    if debug: print >>errfd, "os.dup(%d) -> %d" % (child,
 
654
                                                                   newtarget)
 
655
                    os.close(child) # close the original
 
656
                    for c, p in fdmap.items():
 
657
                        if p == child:
 
658
                            fdmap[c] = newtarget # update all pointers
 
659
                # now it should be available
 
660
                if debug: print >>errfd, "os.dup2(%d,%d)" % (target, child)
 
661
                os.dup2(target, child)
 
662
 
 
663
        # At this point, the child has everything it needs. We want to close
 
664
        # everything that isn't going to be used by the child, i.e.
 
665
        # everything not in fdmap.keys(). The only remaining fds open are
 
666
        # those in fdmap.values().
 
667
 
 
668
        # Any given fd may appear in fdmap.values() multiple times, so we
 
669
        # need to remove duplicates first.
 
670
 
 
671
        old = []
 
672
        for fd in fdmap.values():
 
673
            if not fd in old:
 
674
                if not fd in fdmap.keys():
 
675
                    old.append(fd)
 
676
        if debug: print >>errfd, "old", old
 
677
        for fd in old:
 
678
            os.close(fd)
 
679
 
 
680
        self._resetSignalDisposition()
 
681
 
 
682
 
 
683
    def writeToChild(self, childFD, data):
 
684
        self.pipes[childFD].write(data)
 
685
 
 
686
    def closeChildFD(self, childFD):
 
687
        # for writer pipes, loseConnection tries to write the remaining data
 
688
        # out to the pipe before closing it
 
689
        # if childFD is not in the list of pipes, assume that it is already
 
690
        # closed
 
691
        if childFD in self.pipes:
 
692
            self.pipes[childFD].loseConnection()
 
693
 
 
694
    def pauseProducing(self):
 
695
        for p in self.pipes.itervalues():
 
696
            if isinstance(p, ProcessReader):
 
697
                p.stopReading()
 
698
 
 
699
    def resumeProducing(self):
 
700
        for p in self.pipes.itervalues():
 
701
            if isinstance(p, ProcessReader):
 
702
                p.startReading()
 
703
 
 
704
    # compatibility
 
705
    def closeStdin(self):
 
706
        """
 
707
        Call this to close standard input on this process.
 
708
        """
 
709
        self.closeChildFD(0)
 
710
 
 
711
    def closeStdout(self):
 
712
        self.closeChildFD(1)
 
713
 
 
714
    def closeStderr(self):
 
715
        self.closeChildFD(2)
 
716
 
 
717
    def loseConnection(self):
 
718
        self.closeStdin()
 
719
        self.closeStderr()
 
720
        self.closeStdout()
 
721
 
 
722
    def write(self, data):
 
723
        """
 
724
        Call this to write to standard input on this process.
 
725
 
 
726
        NOTE: This will silently lose data if there is no standard input.
 
727
        """
 
728
        if 0 in self.pipes:
 
729
            self.pipes[0].write(data)
 
730
 
 
731
    def registerProducer(self, producer, streaming):
 
732
        """
 
733
        Call this to register producer for standard input.
 
734
 
 
735
        If there is no standard input producer.stopProducing() will
 
736
        be called immediately.
 
737
        """
 
738
        if 0 in self.pipes:
 
739
            self.pipes[0].registerProducer(producer, streaming)
 
740
        else:
 
741
            producer.stopProducing()
 
742
 
 
743
    def unregisterProducer(self):
 
744
        """
 
745
        Call this to unregister producer for standard input."""
 
746
        if 0 in self.pipes:
 
747
            self.pipes[0].unregisterProducer()
 
748
 
 
749
    def writeSequence(self, seq):
 
750
        """
 
751
        Call this to write to standard input on this process.
 
752
 
 
753
        NOTE: This will silently lose data if there is no standard input.
 
754
        """
 
755
        if 0 in self.pipes:
 
756
            self.pipes[0].writeSequence(seq)
 
757
 
 
758
 
 
759
    def childDataReceived(self, name, data):
 
760
        self.proto.childDataReceived(name, data)
 
761
 
 
762
 
 
763
    def childConnectionLost(self, childFD, reason):
 
764
        # this is called when one of the helpers (ProcessReader or
 
765
        # ProcessWriter) notices their pipe has been closed
 
766
        os.close(self.pipes[childFD].fileno())
 
767
        del self.pipes[childFD]
 
768
        try:
 
769
            self.proto.childConnectionLost(childFD)
 
770
        except:
 
771
            log.err()
 
772
        self.maybeCallProcessEnded()
 
773
 
 
774
    def maybeCallProcessEnded(self):
 
775
        # we don't call ProcessProtocol.processEnded until:
 
776
        #  the child has terminated, AND
 
777
        #  all writers have indicated an error status, AND
 
778
        #  all readers have indicated EOF
 
779
        # This insures that we've gathered all output from the process.
 
780
        if self.pipes:
 
781
            return
 
782
        if not self.lostProcess:
 
783
            self.reapProcess()
 
784
            return
 
785
        _BaseProcess.maybeCallProcessEnded(self)
 
786
 
 
787
 
 
788
class PTYProcess(abstract.FileDescriptor, _BaseProcess):
 
789
    """
 
790
    An operating-system Process that uses PTY support.
 
791
    """
 
792
    status = -1
 
793
    pid = None
 
794
 
 
795
    def __init__(self, reactor, executable, args, environment, path, proto,
 
796
                 uid=None, gid=None, usePTY=None):
 
797
        """
 
798
        Spawn an operating-system process.
 
799
 
 
800
        This is where the hard work of disconnecting all currently open
 
801
        files / forking / executing the new process happens.  (This is
 
802
        executed automatically when a Process is instantiated.)
 
803
 
 
804
        This will also run the subprocess as a given user ID and group ID, if
 
805
        specified.  (Implementation Note: this doesn't support all the arcane
 
806
        nuances of setXXuid on UNIX: it will assume that either your effective
 
807
        or real UID is 0.)
 
808
        """
 
809
        if pty is None and not isinstance(usePTY, (tuple, list)):
 
810
            # no pty module and we didn't get a pty to use
 
811
            raise NotImplementedError(
 
812
                "cannot use PTYProcess on platforms without the pty module.")
 
813
        abstract.FileDescriptor.__init__(self, reactor)
 
814
        _BaseProcess.__init__(self, proto)
 
815
 
 
816
        if isinstance(usePTY, (tuple, list)):
 
817
            masterfd, slavefd, ttyname = usePTY
 
818
        else:
 
819
            masterfd, slavefd = pty.openpty()
 
820
            ttyname = os.ttyname(slavefd)
 
821
 
 
822
        try:
 
823
            self._fork(path, uid, gid, executable, args, environment,
 
824
                       masterfd=masterfd, slavefd=slavefd)
 
825
        except:
 
826
            if not isinstance(usePTY, (tuple, list)):
 
827
                os.close(masterfd)
 
828
                os.close(slavefd)
 
829
            raise
 
830
 
 
831
        # we are now in parent process:
 
832
        os.close(slavefd)
 
833
        fdesc.setNonBlocking(masterfd)
 
834
        self.fd = masterfd
 
835
        self.startReading()
 
836
        self.connected = 1
 
837
        self.status = -1
 
838
        try:
 
839
            self.proto.makeConnection(self)
 
840
        except:
 
841
            log.err()
 
842
        registerReapProcessHandler(self.pid, self)
 
843
 
 
844
    def _setupChild(self, masterfd, slavefd):
 
845
        """
 
846
        Setup child process after fork() but before exec().
 
847
        """
 
848
        os.close(masterfd)
 
849
        if hasattr(termios, 'TIOCNOTTY'):
 
850
            try:
 
851
                fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
 
852
            except OSError:
 
853
                pass
 
854
            else:
 
855
                try:
 
856
                    fcntl.ioctl(fd, termios.TIOCNOTTY, '')
 
857
                except:
 
858
                    pass
 
859
                os.close(fd)
 
860
 
 
861
        os.setsid()
 
862
 
 
863
        if hasattr(termios, 'TIOCSCTTY'):
 
864
            fcntl.ioctl(slavefd, termios.TIOCSCTTY, '')
 
865
 
 
866
        for fd in range(3):
 
867
            if fd != slavefd:
 
868
                os.close(fd)
 
869
 
 
870
        os.dup2(slavefd, 0) # stdin
 
871
        os.dup2(slavefd, 1) # stdout
 
872
        os.dup2(slavefd, 2) # stderr
 
873
 
 
874
        for fd in xrange(3, 256):
 
875
            try:
 
876
                os.close(fd)
 
877
            except:
 
878
                pass
 
879
 
 
880
        self._resetSignalDisposition()
 
881
 
 
882
 
 
883
    # PTYs do not have stdin/stdout/stderr. They only have in and out, just
 
884
    # like sockets. You cannot close one without closing off the entire PTY.
 
885
    def closeStdin(self):
 
886
        pass
 
887
 
 
888
    def closeStdout(self):
 
889
        pass
 
890
 
 
891
    def closeStderr(self):
 
892
        pass
 
893
 
 
894
    def doRead(self):
 
895
        """
 
896
        Called when my standard output stream is ready for reading.
 
897
        """
 
898
        return fdesc.readFromFD(
 
899
            self.fd,
 
900
            lambda data: self.proto.childDataReceived(1, data))
 
901
 
 
902
    def fileno(self):
 
903
        """
 
904
        This returns the file number of standard output on this process.
 
905
        """
 
906
        return self.fd
 
907
 
 
908
    def maybeCallProcessEnded(self):
 
909
        # two things must happen before we call the ProcessProtocol's
 
910
        # processEnded method. 1: the child process must die and be reaped
 
911
        # (which calls our own processEnded method). 2: the child must close
 
912
        # their stdin/stdout/stderr fds, causing the pty to close, causing
 
913
        # our connectionLost method to be called. #2 can also be triggered
 
914
        # by calling .loseConnection().
 
915
        if self.lostProcess == 2:
 
916
            _BaseProcess.maybeCallProcessEnded(self)
 
917
 
 
918
    def connectionLost(self, reason):
 
919
        """
 
920
        I call this to clean up when one or all of my connections has died.
 
921
        """
 
922
        abstract.FileDescriptor.connectionLost(self, reason)
 
923
        os.close(self.fd)
 
924
        self.lostProcess += 1
 
925
        self.maybeCallProcessEnded()
 
926
 
 
927
    def writeSomeData(self, data):
 
928
        """
 
929
        Write some data to the open process.
 
930
        """
 
931
        return fdesc.writeToFD(self.fd, data)