~ubuntu-cloud-archive/ubuntu/precise/nova/trunk

« back to all changes in this revision

Viewing changes to nova/rpc/common.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short, Chuck Short, Adam Gandelman
  • Date: 2012-06-22 12:39:57 UTC
  • mfrom: (1.1.57)
  • Revision ID: package-import@ubuntu.com-20120622123957-hbzwg84nt9rqwg8r
Tags: 2012.2~f2~20120621.14517-0ubuntu1
[ Chuck Short ]
* New upstream version.

[ Adam Gandelman ]
* debian/rules: Temporarily disable test suite while blocking
  tests are investigated. 
* debian/patches/kombu_tests_timeout.patch: Dropped.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# vim: tabstop=4 shiftwidth=4 softtabstop=4
2
 
 
3
 
# Copyright 2010 United States Government as represented by the
4
 
# Administrator of the National Aeronautics and Space Administration.
5
 
# All Rights Reserved.
6
 
# Copyright 2011 Red Hat, Inc.
7
 
#
8
 
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
9
 
#    not use this file except in compliance with the License. You may obtain
10
 
#    a copy of the License at
11
 
#
12
 
#         http://www.apache.org/licenses/LICENSE-2.0
13
 
#
14
 
#    Unless required by applicable law or agreed to in writing, software
15
 
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16
 
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17
 
#    License for the specific language governing permissions and limitations
18
 
#    under the License.
19
 
 
20
 
import copy
21
 
import logging
22
 
import sys
23
 
import traceback
24
 
 
25
 
from nova.openstack.common import cfg
26
 
from nova.openstack.common import importutils
27
 
from nova.openstack.common import jsonutils
28
 
from nova.openstack.common import local
29
 
 
30
 
 
31
 
LOG = logging.getLogger(__name__)
32
 
 
33
 
 
34
 
class RPCException(Exception):
35
 
    message = _("An unknown RPC related exception occurred.")
36
 
 
37
 
    def __init__(self, message=None, **kwargs):
38
 
        self.kwargs = kwargs
39
 
 
40
 
        if not message:
41
 
            try:
42
 
                message = self.message % kwargs
43
 
 
44
 
            except Exception as e:
45
 
                # kwargs doesn't match a variable in the message
46
 
                # log the issue and the kwargs
47
 
                LOG.exception(_('Exception in string format operation'))
48
 
                for name, value in kwargs.iteritems():
49
 
                    LOG.error("%s: %s" % (name, value))
50
 
                # at least get the core message out if something happened
51
 
                message = self.message
52
 
 
53
 
        super(RPCException, self).__init__(message)
54
 
 
55
 
 
56
 
class RemoteError(RPCException):
57
 
    """Signifies that a remote class has raised an exception.
58
 
 
59
 
    Contains a string representation of the type of the original exception,
60
 
    the value of the original exception, and the traceback.  These are
61
 
    sent to the parent as a joined string so printing the exception
62
 
    contains all of the relevant info.
63
 
 
64
 
    """
65
 
    message = _("Remote error: %(exc_type)s %(value)s\n%(traceback)s.")
66
 
 
67
 
    def __init__(self, exc_type=None, value=None, traceback=None):
68
 
        self.exc_type = exc_type
69
 
        self.value = value
70
 
        self.traceback = traceback
71
 
        super(RemoteError, self).__init__(exc_type=exc_type,
72
 
                                          value=value,
73
 
                                          traceback=traceback)
74
 
 
75
 
 
76
 
class Timeout(RPCException):
77
 
    """Signifies that a timeout has occurred.
78
 
 
79
 
    This exception is raised if the rpc_response_timeout is reached while
80
 
    waiting for a response from the remote side.
81
 
    """
82
 
    message = _("Timeout while waiting on RPC response.")
83
 
 
84
 
 
85
 
class InvalidRPCConnectionReuse(RPCException):
86
 
    message = _("Invalid reuse of an RPC connection.")
87
 
 
88
 
 
89
 
class UnsupportedRpcVersion(RPCException):
90
 
    message = _("Specified RPC version, %(version)s, not supported by "
91
 
                "this endpoint.")
92
 
 
93
 
 
94
 
class Connection(object):
95
 
    """A connection, returned by rpc.create_connection().
96
 
 
97
 
    This class represents a connection to the message bus used for rpc.
98
 
    An instance of this class should never be created by users of the rpc API.
99
 
    Use rpc.create_connection() instead.
100
 
    """
101
 
    def close(self):
102
 
        """Close the connection.
103
 
 
104
 
        This method must be called when the connection will no longer be used.
105
 
        It will ensure that any resources associated with the connection, such
106
 
        as a network connection, and cleaned up.
107
 
        """
108
 
        raise NotImplementedError()
109
 
 
110
 
    def create_consumer(self, conf, topic, proxy, fanout=False):
111
 
        """Create a consumer on this connection.
112
 
 
113
 
        A consumer is associated with a message queue on the backend message
114
 
        bus.  The consumer will read messages from the queue, unpack them, and
115
 
        dispatch them to the proxy object.  The contents of the message pulled
116
 
        off of the queue will determine which method gets called on the proxy
117
 
        object.
118
 
 
119
 
        :param conf:  An openstack.common.cfg configuration object.
120
 
        :param topic: This is a name associated with what to consume from.
121
 
                      Multiple instances of a service may consume from the same
122
 
                      topic. For example, all instances of nova-compute consume
123
 
                      from a queue called "compute".  In that case, the
124
 
                      messages will get distributed amongst the consumers in a
125
 
                      round-robin fashion if fanout=False.  If fanout=True,
126
 
                      every consumer associated with this topic will get a
127
 
                      copy of every message.
128
 
        :param proxy: The object that will handle all incoming messages.
129
 
        :param fanout: Whether or not this is a fanout topic.  See the
130
 
                       documentation for the topic parameter for some
131
 
                       additional comments on this.
132
 
        """
133
 
        raise NotImplementedError()
134
 
 
135
 
    def create_worker(self, conf, topic, proxy, pool_name):
136
 
        """Create a worker on this connection.
137
 
 
138
 
        A worker is like a regular consumer of messages directed to a
139
 
        topic, except that it is part of a set of such consumers (the
140
 
        "pool") which may run in parallel. Every pool of workers will
141
 
        receive a given message, but only one worker in the pool will
142
 
        be asked to process it. Load is distributed across the members
143
 
        of the pool in round-robin fashion.
144
 
 
145
 
        :param conf:  An openstack.common.cfg configuration object.
146
 
        :param topic: This is a name associated with what to consume from.
147
 
                      Multiple instances of a service may consume from the same
148
 
                      topic.
149
 
        :param proxy: The object that will handle all incoming messages.
150
 
        :param pool_name: String containing the name of the pool of workers
151
 
        """
152
 
        raise NotImplementedError()
153
 
 
154
 
    def consume_in_thread(self):
155
 
        """Spawn a thread to handle incoming messages.
156
 
 
157
 
        Spawn a thread that will be responsible for handling all incoming
158
 
        messages for consumers that were set up on this connection.
159
 
 
160
 
        Message dispatching inside of this is expected to be implemented in a
161
 
        non-blocking manner.  An example implementation would be having this
162
 
        thread pull messages in for all of the consumers, but utilize a thread
163
 
        pool for dispatching the messages to the proxy objects.
164
 
        """
165
 
        raise NotImplementedError()
166
 
 
167
 
 
168
 
def _safe_log(log_func, msg, msg_data):
169
 
    """Sanitizes the msg_data field before logging."""
170
 
    SANITIZE = {
171
 
                'set_admin_password': ('new_pass',),
172
 
                'run_instance': ('admin_password',),
173
 
               }
174
 
 
175
 
    has_method = 'method' in msg_data and msg_data['method'] in SANITIZE
176
 
    has_context_token = '_context_auth_token' in msg_data
177
 
    has_token = 'auth_token' in msg_data
178
 
 
179
 
    if not any([has_method, has_context_token, has_token]):
180
 
        return log_func(msg, msg_data)
181
 
 
182
 
    msg_data = copy.deepcopy(msg_data)
183
 
 
184
 
    if has_method:
185
 
        method = msg_data['method']
186
 
        if method in SANITIZE:
187
 
            args_to_sanitize = SANITIZE[method]
188
 
            for arg in args_to_sanitize:
189
 
                try:
190
 
                    msg_data['args'][arg] = "<SANITIZED>"
191
 
                except KeyError:
192
 
                    pass
193
 
 
194
 
    if has_context_token:
195
 
        msg_data['_context_auth_token'] = '<SANITIZED>'
196
 
 
197
 
    if has_token:
198
 
        msg_data['auth_token'] = '<SANITIZED>'
199
 
 
200
 
    return log_func(msg, msg_data)
201
 
 
202
 
 
203
 
def serialize_remote_exception(failure_info):
204
 
    """Prepares exception data to be sent over rpc.
205
 
 
206
 
    Failure_info should be a sys.exc_info() tuple.
207
 
 
208
 
    """
209
 
    tb = traceback.format_exception(*failure_info)
210
 
    failure = failure_info[1]
211
 
    LOG.error(_("Returning exception %s to caller"), unicode(failure))
212
 
    LOG.error(tb)
213
 
 
214
 
    kwargs = {}
215
 
    if hasattr(failure, 'kwargs'):
216
 
        kwargs = failure.kwargs
217
 
 
218
 
    data = {
219
 
        'class': str(failure.__class__.__name__),
220
 
        'module': str(failure.__class__.__module__),
221
 
        'message': unicode(failure),
222
 
        'tb': tb,
223
 
        'args': failure.args,
224
 
        'kwargs': kwargs
225
 
    }
226
 
 
227
 
    json_data = jsonutils.dumps(data)
228
 
 
229
 
    return json_data
230
 
 
231
 
 
232
 
def deserialize_remote_exception(conf, data):
233
 
    failure = jsonutils.loads(str(data))
234
 
 
235
 
    trace = failure.get('tb', [])
236
 
    message = failure.get('message', "") + "\n" + "\n".join(trace)
237
 
    name = failure.get('class')
238
 
    module = failure.get('module')
239
 
 
240
 
    # NOTE(ameade): We DO NOT want to allow just any module to be imported, in
241
 
    # order to prevent arbitrary code execution.
242
 
    if not module in conf.allowed_rpc_exception_modules:
243
 
        return RemoteError(name, failure.get('message'), trace)
244
 
 
245
 
    try:
246
 
        mod = importutils.import_module(module)
247
 
        klass = getattr(mod, name)
248
 
        if not issubclass(klass, Exception):
249
 
            raise TypeError("Can only deserialize Exceptions")
250
 
 
251
 
        failure = klass(**failure.get('kwargs', {}))
252
 
    except (AttributeError, TypeError, ImportError):
253
 
        return RemoteError(name, failure.get('message'), trace)
254
 
 
255
 
    ex_type = type(failure)
256
 
    str_override = lambda self: message
257
 
    new_ex_type = type(ex_type.__name__ + "_Remote", (ex_type,),
258
 
                       {'__str__': str_override, '__unicode__': str_override})
259
 
    try:
260
 
        # NOTE(ameade): Dynamically create a new exception type and swap it in
261
 
        # as the new type for the exception. This only works on user defined
262
 
        # Exceptions and not core python exceptions. This is important because
263
 
        # we cannot necessarily change an exception message so we must override
264
 
        # the __str__ method.
265
 
        failure.__class__ = new_ex_type
266
 
    except TypeError as e:
267
 
        # NOTE(ameade): If a core exception then just add the traceback to the
268
 
        # first exception argument.
269
 
        failure.args = (message,) + failure.args[1:]
270
 
    return failure
271
 
 
272
 
 
273
 
class CommonRpcContext(object):
274
 
    def __init__(self, **kwargs):
275
 
        self.values = kwargs
276
 
 
277
 
    def __getattr__(self, key):
278
 
        try:
279
 
            return self.values[key]
280
 
        except KeyError:
281
 
            raise AttributeError(key)
282
 
 
283
 
    def to_dict(self):
284
 
        return copy.deepcopy(self.values)
285
 
 
286
 
    @classmethod
287
 
    def from_dict(cls, values):
288
 
        return cls(**values)
289
 
 
290
 
    def update_store(self):
291
 
        local.store.context = self
292
 
 
293
 
    def elevated(self, read_deleted=None, overwrite=False):
294
 
        """Return a version of this context with admin flag set."""
295
 
        # TODO(russellb) This method is a bit of a nova-ism.  It makes
296
 
        # some assumptions about the data in the request context sent
297
 
        # across rpc, while the rest of this class does not.  We could get
298
 
        # rid of this if we changed the nova code that uses this to
299
 
        # convert the RpcContext back to its native RequestContext doing
300
 
        # something like nova.context.RequestContext.from_dict(ctxt.to_dict())
301
 
 
302
 
        context = copy.deepcopy(self)
303
 
        context.values['is_admin'] = True
304
 
 
305
 
        context.values.setdefault('roles', [])
306
 
 
307
 
        if 'admin' not in context.values['roles']:
308
 
            context.values['roles'].append('admin')
309
 
 
310
 
        if read_deleted is not None:
311
 
            context.values['read_deleted'] = read_deleted
312
 
 
313
 
        return context