~jk0/nova/xs-ipv6

« back to all changes in this revision

Viewing changes to nova/process.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
 
2
# Copyright [2010] [Anso Labs, LLC]
 
3
 
4
#    Licensed under the Apache License, Version 2.0 (the "License");
 
5
#    you may not use this file except in compliance with the License.
 
6
#    You may obtain a copy of the License at
 
7
 
8
#        http://www.apache.org/licenses/LICENSE-2.0
 
9
 
10
#    Unless required by applicable law or agreed to in writing, software
 
11
#    distributed under the License is distributed on an "AS IS" BASIS,
 
12
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
13
#    See the License for the specific language governing permissions and
 
14
#    limitations under the License.
 
15
 
 
16
"""
 
17
Process pool, still buggy right now.
 
18
"""
 
19
 
 
20
import logging
 
21
import multiprocessing
 
22
 
 
23
from nova import vendor
 
24
from twisted.internet import defer
 
25
from twisted.internet import reactor
 
26
from twisted.internet import protocol
 
27
from twisted.internet import threads
 
28
 
 
29
# NOTE(termie): this is copied from twisted.internet.utils but since
 
30
#               they don't export it I've copied.
 
31
class _BackRelay(protocol.ProcessProtocol):
 
32
    """
 
33
    Trivial protocol for communicating with a process and turning its output
 
34
    into the result of a L{Deferred}.
 
35
 
 
36
    @ivar deferred: A L{Deferred} which will be called back with all of stdout
 
37
        and, if C{errortoo} is true, all of stderr as well (mixed together in
 
38
        one string).  If C{errortoo} is false and any bytes are received over
 
39
        stderr, this will fire with an L{_UnexpectedErrorOutput} instance and
 
40
        the attribute will be set to C{None}.
 
41
 
 
42
    @ivar onProcessEnded: If C{errortoo} is false and bytes are received over
 
43
        stderr, this attribute will refer to a L{Deferred} which will be called
 
44
        back when the process ends.  This C{Deferred} is also associated with
 
45
        the L{_UnexpectedErrorOutput} which C{deferred} fires with earlier in
 
46
        this case so that users can determine when the process has actually
 
47
        ended, in addition to knowing when bytes have been received via stderr.
 
48
    """
 
49
 
 
50
    def __init__(self, deferred, errortoo=0):
 
51
        self.deferred = deferred
 
52
        self.s = StringIO.StringIO()
 
53
        if errortoo:
 
54
            self.errReceived = self.errReceivedIsGood
 
55
        else:
 
56
            self.errReceived = self.errReceivedIsBad
 
57
 
 
58
    def errReceivedIsBad(self, text):
 
59
        if self.deferred is not None:
 
60
            self.onProcessEnded = defer.Deferred()
 
61
            err = _UnexpectedErrorOutput(text, self.onProcessEnded)
 
62
            self.deferred.errback(failure.Failure(err))
 
63
            self.deferred = None
 
64
            self.transport.loseConnection()
 
65
 
 
66
    def errReceivedIsGood(self, text):
 
67
        self.s.write(text)
 
68
 
 
69
    def outReceived(self, text):
 
70
        self.s.write(text)
 
71
 
 
72
    def processEnded(self, reason):
 
73
        if self.deferred is not None:
 
74
            self.deferred.callback(self.s.getvalue())
 
75
        elif self.onProcessEnded is not None:
 
76
            self.onProcessEnded.errback(reason)
 
77
 
 
78
 
 
79
class BackRelayWithInput(_BackRelay):
 
80
    def __init__(self, deferred, errortoo=0, input=None):
 
81
        super(BackRelayWithInput, self).__init__(deferred, errortoo)
 
82
        self.input = input
 
83
 
 
84
    def connectionMade(self):
 
85
        if self.input:
 
86
            self.transport.write(self.input)
 
87
        self.transport.closeStdin()
 
88
 
 
89
 
 
90
def getProcessOutput(executable, args=None, env=None, path=None, reactor=None,
 
91
                     errortoo=0, input=None):
 
92
    if reactor is None:
 
93
        from twisted.internet import reactor
 
94
    args = args and args or ()
 
95
    env = env and env and {}
 
96
    d = defer.Deferred()
 
97
    p = BackRelayWithInput(d, errortoo=errortoo, input=input)
 
98
    reactor.spawnProcess(p, executable, (executable,)+tuple(args), env, path)
 
99
    return d
 
100
 
 
101
 
 
102
class Pool(object):
 
103
    """ A simple process pool implementation around mutliprocessing.
 
104
 
 
105
    Allows up to `size` processes at a time and queues the rest.
 
106
 
 
107
    Using workarounds for multiprocessing behavior described in:
 
108
    http://pypi.python.org/pypi/twisted.internet.processes/1.0b1
 
109
    """
 
110
 
 
111
    def __init__(self, size=None):
 
112
        self._size = size
 
113
        self._pool = multiprocessing.Pool(size)
 
114
        self._registerShutdown()
 
115
 
 
116
    def _registerShutdown(self):
 
117
        reactor.addSystemEventTrigger(
 
118
                'during', 'shutdown', self.shutdown, reactor)
 
119
 
 
120
    def shutdown(self, reactor=None):
 
121
        if not self._pool:
 
122
            return
 
123
        self._pool.close()
 
124
        # wait for workers to finish
 
125
        self._pool.terminate()
 
126
        self._pool = None
 
127
 
 
128
    def apply(self, f, *args, **kw):
 
129
        """ Add a task to the pool and return a deferred. """
 
130
        result = self._pool.apply_async(f, args, kw)
 
131
        return threads.deferToThread(result.get)