1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
3
# Copyright 2010 United States Government as represented by the
4
# Administrator of the National Aeronautics and Space Administration.
5
# Copyright 2010 FathomDB Inc.
8
# Licensed under the Apache License, Version 2.0 (the "License"); you may
9
# not use this file except in compliance with the License. You may obtain
10
# a copy of the License at
12
# http://www.apache.org/licenses/LICENSE-2.0
14
# Unless required by applicable law or agreed to in writing, software
15
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17
# License for the specific language governing permissions and limitations
21
Process pool using twisted threading
27
from twisted.internet import defer
28
from twisted.internet import error
29
from twisted.internet import protocol
30
from twisted.internet import reactor
32
from nova import flags
33
from nova.exception import ProcessExecutionError
36
flags.DEFINE_integer('process_pool_size', 4,
37
'Number of processes to use in the process pool')
40
# This is based on _BackRelay from twister.internal.utils, but modified to
41
# capture both stdout and stderr, without odd stderr handling, and also to
43
class BackRelayWithInput(protocol.ProcessProtocol):
45
Trivial protocol for communicating with a process and turning its output
46
into the result of a L{Deferred}.
48
@ivar deferred: A L{Deferred} which will be called back with all of stdout
49
and all of stderr as well (as a tuple). C{terminate_on_stderr} is true
50
and any bytes are received over stderr, this will fire with an
51
L{_ProcessExecutionError} instance and the attribute will be set to
54
@ivar onProcessEnded: If C{terminate_on_stderr} is false and bytes are
55
received over stderr, this attribute will refer to a L{Deferred} which
56
will be called back when the process ends. This C{Deferred} is also
57
associated with the L{_ProcessExecutionError} which C{deferred} fires
58
with earlier in this case so that users can determine when the process
59
has actually ended, in addition to knowing when bytes have been
63
def __init__(self, deferred, cmd, started_deferred=None,
64
terminate_on_stderr=False, check_exit_code=True,
66
self.deferred = deferred
68
self.stdout = StringIO.StringIO()
69
self.stderr = StringIO.StringIO()
70
self.started_deferred = started_deferred
71
self.terminate_on_stderr = terminate_on_stderr
72
self.check_exit_code = check_exit_code
73
self.process_input = process_input
74
self.on_process_ended = None
76
def _build_execution_error(self, exit_code=None):
77
return ProcessExecutionError(cmd=self.cmd,
79
stdout=self.stdout.getvalue(),
80
stderr=self.stderr.getvalue())
82
def errReceived(self, text):
83
self.stderr.write(text)
84
if self.terminate_on_stderr and (self.deferred is not None):
85
self.on_process_ended = defer.Deferred()
86
self.deferred.errback(self._build_execution_error())
88
self.transport.loseConnection()
90
def outReceived(self, text):
91
self.stdout.write(text)
93
def processEnded(self, reason):
94
if self.deferred is not None:
95
stdout, stderr = self.stdout.getvalue(), self.stderr.getvalue()
96
exit_code = reason.value.exitCode
97
if self.check_exit_code and exit_code != 0:
98
self.deferred.errback(self._build_execution_error(exit_code))
101
if self.check_exit_code:
102
reason.trap(error.ProcessDone)
103
self.deferred.callback((stdout, stderr))
105
# NOTE(justinsb): This logic is a little suspicious to me.
106
# If the callback throws an exception, then errback will
107
# be called also. However, this is what the unit tests
109
exec_error = self._build_execution_error(exit_code)
110
self.deferred.errback(exec_error)
111
elif self.on_process_ended is not None:
112
self.on_process_ended.errback(reason)
114
def connectionMade(self):
115
if self.started_deferred:
116
self.started_deferred.callback(self)
117
if self.process_input:
118
self.transport.write(str(self.process_input))
119
self.transport.closeStdin()
122
def get_process_output(executable, args=None, env=None, path=None,
123
process_reactor=None, check_exit_code=True,
124
process_input=None, started_deferred=None,
125
terminate_on_stderr=False):
126
if process_reactor is None:
127
process_reactor = reactor
128
args = args and args or ()
129
env = env and env and {}
130
deferred = defer.Deferred()
133
cmd = " ".join([cmd] + args)
134
logging.debug("Running cmd: %s", cmd)
135
process_handler = BackRelayWithInput(
138
started_deferred=started_deferred,
139
check_exit_code=check_exit_code,
140
process_input=process_input,
141
terminate_on_stderr=terminate_on_stderr)
142
# NOTE(vish): commands come in as unicode, but self.executes needs
143
# strings or process.spawn raises a deprecation warning
144
executable = str(executable)
146
args = [str(x) for x in args]
147
process_reactor.spawnProcess(process_handler, executable,
148
(executable,) + tuple(args), env, path)
152
class ProcessPool(object):
153
""" A simple process pool implementation using Twisted's Process bits.
155
This is pretty basic right now, but hopefully the API will be the correct
156
one so that it can be optimized later.
158
def __init__(self, size=None):
159
self.size = size and size or FLAGS.process_pool_size
160
self._pool = defer.DeferredSemaphore(self.size)
162
def simple_execute(self, cmd, **kw):
163
""" Weak emulation of the old utils.execute() function.
165
This only exists as a way to quickly move old execute methods to
166
this new style of code.
168
NOTE(termie): This will break on args with spaces in them.
170
parsed = cmd.split(' ')
171
executable, args = parsed[0], parsed[1:]
172
return self.execute(executable, args, **kw)
174
def execute(self, *args, **kw):
175
deferred = self._pool.acquire()
177
def _associate_process(proto):
178
deferred.process = proto.transport
179
return proto.transport
181
started = defer.Deferred()
182
started.addCallback(_associate_process)
183
kw.setdefault('started_deferred', started)
185
deferred.process = None
186
deferred.started = started
188
deferred.addCallback(lambda _: get_process_output(*args, **kw))
189
deferred.addBoth(self._release)
192
def _release(self, retval=None):
197
class SharedPool(object):
201
if SharedPool._instance is None:
202
self.__class__._instance = ProcessPool()
204
def __getattr__(self, key):
205
return getattr(self._instance, key)
208
def simple_execute(cmd, **kwargs):
209
return SharedPool().simple_execute(cmd, **kwargs)