1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
2
# Copyright [2010] [Anso Labs, LLC]
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
8
# http://www.apache.org/licenses/LICENSE-2.0
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
17
"""Process pool, still buggy right now."""
21
import multiprocessing
23
from nova import vendor
24
from twisted.internet import defer
25
from twisted.internet import reactor
26
from twisted.internet import protocol
27
from twisted.internet import threads
29
# NOTE(termie): this is copied from twisted.internet.utils, but since
#               they don't export it, I've copied it here.
31
class _BackRelay(protocol.ProcessProtocol):
    """
    Trivial protocol for communicating with a process and turning its output
    into the result of a L{Deferred}.

    @ivar deferred: A L{Deferred} which will be called back with all of stdout
        and, if C{errortoo} is true, all of stderr as well (mixed together in
        one string). If C{errortoo} is false and any bytes are received over
        stderr, this will fire with an L{_UnexpectedErrorOutput} instance and
        the attribute will be set to C{None}.

    @ivar onProcessEnded: If C{errortoo} is false and bytes are received over
        stderr, this attribute will refer to a L{Deferred} which will be called
        back when the process ends. This C{Deferred} is also associated with
        the L{_UnexpectedErrorOutput} which C{deferred} fires with earlier in
        this case so that users can determine when the process has actually
        ended, in addition to knowing when bytes have been received via stderr.
    """
    # NOTE(review): StringIO, failure and _UnexpectedErrorOutput are used
    # below but are not among the imports visible at the top of this file;
    # confirm that they are defined at module level.

    def __init__(self, deferred, errortoo=0):
        """
        @param deferred: the L{Deferred} to fire with the collected output.
        @param errortoo: if true, stderr is collected alongside stdout;
            otherwise any stderr output is treated as a failure.
        """
        self.deferred = deferred
        self.s = StringIO.StringIO()
        # Choose the stderr handler once, up front, based on errortoo.
        if errortoo:
            self.errReceived = self.errReceivedIsGood
        else:
            self.errReceived = self.errReceivedIsBad

    def errReceivedIsBad(self, text):
        """Fail C{deferred} on the first stderr output and drop the process."""
        if self.deferred is not None:
            self.onProcessEnded = defer.Deferred()
            err = _UnexpectedErrorOutput(text, self.onProcessEnded)
            self.deferred.errback(failure.Failure(err))
            # Clearing the attribute keeps processEnded (and any later
            # stderr output) from firing the same Deferred a second time.
            self.deferred = None
            self.transport.loseConnection()

    def errReceivedIsGood(self, text):
        """Collect stderr output in the same buffer as stdout."""
        self.s.write(text)

    def outReceived(self, text):
        """Collect stdout output."""
        self.s.write(text)

    def processEnded(self, reason):
        """
        Fire C{deferred} with everything collected, or, if it has already
        failed because of stderr output, errback C{onProcessEnded} with
        C{reason}.
        """
        if self.deferred is not None:
            self.deferred.callback(self.s.getvalue())
        elif self.onProcessEnded is not None:
            self.onProcessEnded.errback(reason)
79
class BackRelayWithInput(_BackRelay):
    """
    A L{_BackRelay} that can also feed a string to the child's stdin.

    @ivar input: the data written to the child's stdin once the connection
        is made, or C{None} (or empty) to send nothing.
    """

    def __init__(self, deferred, errortoo=0, input=None):
        """
        @param deferred: see L{_BackRelay}.
        @param errortoo: see L{_BackRelay}.
        @param input: optional data to write to the child's stdin.
        """
        # Call the base initializer directly; unlike super(), this also works
        # if the Twisted protocol base classes are old-style classes
        # (NOTE(review): believed to be the case on Python 2 -- confirm).
        _BackRelay.__init__(self, deferred, errortoo)
        self.input = input

    def connectionMade(self):
        """Write the input, if any, then close the child's stdin."""
        if self.input:
            self.transport.write(self.input)
        # Always close stdin so a child that reads until EOF is not left
        # waiting when there is nothing to send.
        self.transport.closeStdin()
90
def getProcessOutput(executable, args=None, env=None, path=None, reactor=None,
                     errortoo=0, input=None):
    """
    Spawn a process and return a L{Deferred} that fires with its output.

    @param executable: the program to run; it is also used as argv[0].
    @param args: the remaining command-line arguments (any iterable), or
        C{None} for no arguments.
    @param env: the environment handed to C{reactor.spawnProcess}. It is
        passed through unchanged, including C{None}.
    @param path: passed through to C{reactor.spawnProcess}.
    @param reactor: the reactor used to spawn the process; defaults to the
        global Twisted reactor.
    @param errortoo: if true, stderr is mixed into the result; otherwise any
        stderr output makes the returned L{Deferred} fail.
    @param input: optional data written to the child's stdin.

    @return: the L{Deferred} given to the L{BackRelayWithInput} protocol.
    """
    if reactor is None:
        # Imported locally because the parameter shadows the module-level
        # name of the same spelling.
        from twisted.internet import reactor
    argv = (executable,) + tuple(args or ())
    # env is deliberately left untouched: the former "env and env and {}"
    # replaced every non-empty environment supplied by the caller with {}.
    d = defer.Deferred()
    p = BackRelayWithInput(d, errortoo=errortoo, input=input)
    reactor.spawnProcess(p, executable, argv, env, path)
    return d
""" A simple process pool implementation around multiprocessing.

Allows up to `size` processes at a time and queues the rest.

Using workarounds for multiprocessing behavior described in:
http://pypi.python.org/pypi/twisted.internet.processes/1.0b1
"""
111
def __init__(self, size=None):
    """ Create the worker pool and hook it into reactor shutdown.

    `size` is passed straight to multiprocessing.Pool as the number of
    worker processes.
    """
    self._pool = multiprocessing.Pool(size)
    # Make sure the workers are stopped when the reactor shuts down.
    self._registerShutdown()
116
def _registerShutdown(self):
    """ Ask the reactor to call self.shutdown during its shutdown phase.

    The reactor itself is registered as the argument passed to
    self.shutdown.
    """
    reactor.addSystemEventTrigger(
        'during', 'shutdown', self.shutdown, reactor)
120
def shutdown(self, reactor=None):
    """ Stop the pool's worker processes.

    Safe to call more than once: when there is no pool (it was never
    created, or it has already been shut down) this does nothing.
    `reactor` is accepted so the method can be used as a reactor shutdown
    trigger; it is not used.
    """
    pool = getattr(self, '_pool', None)
    if pool is None:
        return
    # terminate() stops the workers immediately; it does not wait for
    # outstanding tasks to complete.
    pool.terminate()
    # Drop the reference so a repeated call becomes a no-op.
    self._pool = None
128
def apply(self, f, *args, **kw):
    """ Add a task to the pool and return a deferred.

    `f` is submitted to the pool with `args` and `kw`. The returned
    deferred comes from threads.deferToThread(result.get), i.e. the
    blocking wait for the task's result is run in a thread rather than in
    the caller.
    """
    result = self._pool.apply_async(f, args, kw)
    return threads.deferToThread(result.get)