~ubuntu-branches/ubuntu/quantal/nova/quantal-proposed

« back to all changes in this revision

Viewing changes to nova/process.py

  • Committer: Bazaar Package Importer
  • Author(s): Chuck Short
  • Date: 2011-01-21 11:48:06 UTC
  • mto: This revision was merged to the branch mainline in revision 9.
  • Revision ID: james.westby@ubuntu.com-20110121114806-v8fvnnl6az4m4ohv
Tags: upstream-2011.1~bzr597
ImportĀ upstreamĀ versionĀ 2011.1~bzr597

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# vim: tabstop=4 shiftwidth=4 softtabstop=4
2
 
 
3
 
# Copyright 2010 United States Government as represented by the
4
 
# Administrator of the National Aeronautics and Space Administration.
5
 
# Copyright 2010 FathomDB Inc.
6
 
# All Rights Reserved.
7
 
#
8
 
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
9
 
#    not use this file except in compliance with the License. You may obtain
10
 
#    a copy of the License at
11
 
#
12
 
#         http://www.apache.org/licenses/LICENSE-2.0
13
 
#
14
 
#    Unless required by applicable law or agreed to in writing, software
15
 
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16
 
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17
 
#    License for the specific language governing permissions and limitations
18
 
#    under the License.
19
 
 
20
 
"""
21
 
Process pool using twisted threading
22
 
"""
23
 
 
24
 
import logging
25
 
import StringIO
26
 
 
27
 
from twisted.internet import defer
28
 
from twisted.internet import error
29
 
from twisted.internet import protocol
30
 
from twisted.internet import reactor
31
 
 
32
 
from nova import flags
33
 
from nova.exception import ProcessExecutionError
34
 
 
35
 
FLAGS = flags.FLAGS
36
 
flags.DEFINE_integer('process_pool_size', 4,
37
 
                     'Number of processes to use in the process pool')
38
 
 
39
 
 
40
 
# This is based on _BackRelay from twister.internal.utils, but modified to
41
 
#  capture both stdout and stderr, without odd stderr handling, and also to
42
 
#  handle stdin
43
 
class BackRelayWithInput(protocol.ProcessProtocol):
44
 
    """
45
 
    Trivial protocol for communicating with a process and turning its output
46
 
    into the result of a L{Deferred}.
47
 
 
48
 
    @ivar deferred: A L{Deferred} which will be called back with all of stdout
49
 
        and all of stderr as well (as a tuple).  C{terminate_on_stderr} is true
50
 
        and any bytes are received over stderr, this will fire with an
51
 
        L{_ProcessExecutionError} instance and the attribute will be set to
52
 
        C{None}.
53
 
 
54
 
    @ivar onProcessEnded: If C{terminate_on_stderr} is false and bytes are
55
 
        received over stderr, this attribute will refer to a L{Deferred} which
56
 
        will be called back when the process ends.  This C{Deferred} is also
57
 
        associated with the L{_ProcessExecutionError} which C{deferred} fires
58
 
        with earlier in this case so that users can determine when the process
59
 
        has actually ended, in addition to knowing when bytes have been
60
 
        received via stderr.
61
 
    """
62
 
 
63
 
    def __init__(self, deferred, cmd, started_deferred=None,
64
 
                 terminate_on_stderr=False, check_exit_code=True,
65
 
                 process_input=None):
66
 
        self.deferred = deferred
67
 
        self.cmd = cmd
68
 
        self.stdout = StringIO.StringIO()
69
 
        self.stderr = StringIO.StringIO()
70
 
        self.started_deferred = started_deferred
71
 
        self.terminate_on_stderr = terminate_on_stderr
72
 
        self.check_exit_code = check_exit_code
73
 
        self.process_input = process_input
74
 
        self.on_process_ended = None
75
 
 
76
 
    def _build_execution_error(self, exit_code=None):
77
 
        return ProcessExecutionError(cmd=self.cmd,
78
 
                                     exit_code=exit_code,
79
 
                                     stdout=self.stdout.getvalue(),
80
 
                                     stderr=self.stderr.getvalue())
81
 
 
82
 
    def errReceived(self, text):
83
 
        self.stderr.write(text)
84
 
        if self.terminate_on_stderr and (self.deferred is not None):
85
 
            self.on_process_ended = defer.Deferred()
86
 
            self.deferred.errback(self._build_execution_error())
87
 
            self.deferred = None
88
 
            self.transport.loseConnection()
89
 
 
90
 
    def outReceived(self, text):
91
 
        self.stdout.write(text)
92
 
 
93
 
    def processEnded(self, reason):
94
 
        if self.deferred is not None:
95
 
            stdout, stderr = self.stdout.getvalue(), self.stderr.getvalue()
96
 
            exit_code = reason.value.exitCode
97
 
            if self.check_exit_code and exit_code != 0:
98
 
                self.deferred.errback(self._build_execution_error(exit_code))
99
 
            else:
100
 
                try:
101
 
                    if self.check_exit_code:
102
 
                        reason.trap(error.ProcessDone)
103
 
                    self.deferred.callback((stdout, stderr))
104
 
                except:
105
 
                    # NOTE(justinsb): This logic is a little suspicious to me.
106
 
                    # If the callback throws an exception, then errback will
107
 
                    # be called also. However, this is what the unit tests
108
 
                    # test for.
109
 
                    exec_error = self._build_execution_error(exit_code)
110
 
                    self.deferred.errback(exec_error)
111
 
        elif self.on_process_ended is not None:
112
 
            self.on_process_ended.errback(reason)
113
 
 
114
 
    def connectionMade(self):
115
 
        if self.started_deferred:
116
 
            self.started_deferred.callback(self)
117
 
        if self.process_input:
118
 
            self.transport.write(str(self.process_input))
119
 
        self.transport.closeStdin()
120
 
 
121
 
 
122
 
def get_process_output(executable, args=None, env=None, path=None,
123
 
                       process_reactor=None, check_exit_code=True,
124
 
                       process_input=None, started_deferred=None,
125
 
                       terminate_on_stderr=False):
126
 
    if process_reactor is None:
127
 
        process_reactor = reactor
128
 
    args = args and args or ()
129
 
    env = env and env and {}
130
 
    deferred = defer.Deferred()
131
 
    cmd = executable
132
 
    if args:
133
 
        cmd = " ".join([cmd] + args)
134
 
    logging.debug("Running cmd: %s", cmd)
135
 
    process_handler = BackRelayWithInput(
136
 
            deferred,
137
 
            cmd,
138
 
            started_deferred=started_deferred,
139
 
            check_exit_code=check_exit_code,
140
 
            process_input=process_input,
141
 
            terminate_on_stderr=terminate_on_stderr)
142
 
    # NOTE(vish): commands come in as unicode, but self.executes needs
143
 
    #             strings or process.spawn raises a deprecation warning
144
 
    executable = str(executable)
145
 
    if not args is None:
146
 
        args = [str(x) for x in args]
147
 
    process_reactor.spawnProcess(process_handler, executable,
148
 
                                 (executable,) + tuple(args), env, path)
149
 
    return deferred
150
 
 
151
 
 
152
 
class ProcessPool(object):
153
 
    """ A simple process pool implementation using Twisted's Process bits.
154
 
 
155
 
    This is pretty basic right now, but hopefully the API will be the correct
156
 
    one so that it can be optimized later.
157
 
    """
158
 
    def __init__(self, size=None):
159
 
        self.size = size and size or FLAGS.process_pool_size
160
 
        self._pool = defer.DeferredSemaphore(self.size)
161
 
 
162
 
    def simple_execute(self, cmd, **kw):
163
 
        """ Weak emulation of the old utils.execute() function.
164
 
 
165
 
        This only exists as a way to quickly move old execute methods to
166
 
        this new style of code.
167
 
 
168
 
        NOTE(termie): This will break on args with spaces in them.
169
 
        """
170
 
        parsed = cmd.split(' ')
171
 
        executable, args = parsed[0], parsed[1:]
172
 
        return self.execute(executable, args, **kw)
173
 
 
174
 
    def execute(self, *args, **kw):
175
 
        deferred = self._pool.acquire()
176
 
 
177
 
        def _associate_process(proto):
178
 
            deferred.process = proto.transport
179
 
            return proto.transport
180
 
 
181
 
        started = defer.Deferred()
182
 
        started.addCallback(_associate_process)
183
 
        kw.setdefault('started_deferred', started)
184
 
 
185
 
        deferred.process = None
186
 
        deferred.started = started
187
 
 
188
 
        deferred.addCallback(lambda _: get_process_output(*args, **kw))
189
 
        deferred.addBoth(self._release)
190
 
        return deferred
191
 
 
192
 
    def _release(self, retval=None):
193
 
        self._pool.release()
194
 
        return retval
195
 
 
196
 
 
197
 
class SharedPool(object):
198
 
    _instance = None
199
 
 
200
 
    def __init__(self):
201
 
        if SharedPool._instance is None:
202
 
            self.__class__._instance = ProcessPool()
203
 
 
204
 
    def __getattr__(self, key):
205
 
        return getattr(self._instance, key)
206
 
 
207
 
 
208
 
def simple_execute(cmd, **kwargs):
209
 
    return SharedPool().simple_execute(cmd, **kwargs)