1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
3
# Copyright 2010 United States Government as represented by the
4
# Administrator of the National Aeronautics and Space Administration.
6
# Copyright 2011 Red Hat, Inc.
8
# Licensed under the Apache License, Version 2.0 (the "License"); you may
9
# not use this file except in compliance with the License. You may obtain
10
# a copy of the License at
12
# http://www.apache.org/licenses/LICENSE-2.0
14
# Unless required by applicable law or agreed to in writing, software
15
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17
# License for the specific language governing permissions and limitations
25
from nova.openstack.common import cfg
26
from nova.openstack.common import importutils
27
from nova.openstack.common import jsonutils
28
from nova.openstack.common import local
31
LOG = logging.getLogger(__name__)
34
class RPCException(Exception):
35
message = _("An unknown RPC related exception occurred.")
37
def __init__(self, message=None, **kwargs):
42
message = self.message % kwargs
44
except Exception as e:
45
# kwargs doesn't match a variable in the message
46
# log the issue and the kwargs
47
LOG.exception(_('Exception in string format operation'))
48
for name, value in kwargs.iteritems():
49
LOG.error("%s: %s" % (name, value))
50
# at least get the core message out if something happened
51
message = self.message
53
super(RPCException, self).__init__(message)
56
class RemoteError(RPCException):
57
"""Signifies that a remote class has raised an exception.
59
Contains a string representation of the type of the original exception,
60
the value of the original exception, and the traceback. These are
61
sent to the parent as a joined string so printing the exception
62
contains all of the relevant info.
65
message = _("Remote error: %(exc_type)s %(value)s\n%(traceback)s.")
67
def __init__(self, exc_type=None, value=None, traceback=None):
68
self.exc_type = exc_type
70
self.traceback = traceback
71
super(RemoteError, self).__init__(exc_type=exc_type,
76
class Timeout(RPCException):
77
"""Signifies that a timeout has occurred.
79
This exception is raised if the rpc_response_timeout is reached while
80
waiting for a response from the remote side.
82
message = _("Timeout while waiting on RPC response.")
85
class InvalidRPCConnectionReuse(RPCException):
86
message = _("Invalid reuse of an RPC connection.")
89
class UnsupportedRpcVersion(RPCException):
90
message = _("Specified RPC version, %(version)s, not supported by "
94
class Connection(object):
95
"""A connection, returned by rpc.create_connection().
97
This class represents a connection to the message bus used for rpc.
98
An instance of this class should never be created by users of the rpc API.
99
Use rpc.create_connection() instead.
102
"""Close the connection.
104
This method must be called when the connection will no longer be used.
105
It will ensure that any resources associated with the connection, such
106
as a network connection, and cleaned up.
108
raise NotImplementedError()
110
def create_consumer(self, conf, topic, proxy, fanout=False):
111
"""Create a consumer on this connection.
113
A consumer is associated with a message queue on the backend message
114
bus. The consumer will read messages from the queue, unpack them, and
115
dispatch them to the proxy object. The contents of the message pulled
116
off of the queue will determine which method gets called on the proxy
119
:param conf: An openstack.common.cfg configuration object.
120
:param topic: This is a name associated with what to consume from.
121
Multiple instances of a service may consume from the same
122
topic. For example, all instances of nova-compute consume
123
from a queue called "compute". In that case, the
124
messages will get distributed amongst the consumers in a
125
round-robin fashion if fanout=False. If fanout=True,
126
every consumer associated with this topic will get a
127
copy of every message.
128
:param proxy: The object that will handle all incoming messages.
129
:param fanout: Whether or not this is a fanout topic. See the
130
documentation for the topic parameter for some
131
additional comments on this.
133
raise NotImplementedError()
135
def create_worker(self, conf, topic, proxy, pool_name):
136
"""Create a worker on this connection.
138
A worker is like a regular consumer of messages directed to a
139
topic, except that it is part of a set of such consumers (the
140
"pool") which may run in parallel. Every pool of workers will
141
receive a given message, but only one worker in the pool will
142
be asked to process it. Load is distributed across the members
143
of the pool in round-robin fashion.
145
:param conf: An openstack.common.cfg configuration object.
146
:param topic: This is a name associated with what to consume from.
147
Multiple instances of a service may consume from the same
149
:param proxy: The object that will handle all incoming messages.
150
:param pool_name: String containing the name of the pool of workers
152
raise NotImplementedError()
154
def consume_in_thread(self):
155
"""Spawn a thread to handle incoming messages.
157
Spawn a thread that will be responsible for handling all incoming
158
messages for consumers that were set up on this connection.
160
Message dispatching inside of this is expected to be implemented in a
161
non-blocking manner. An example implementation would be having this
162
thread pull messages in for all of the consumers, but utilize a thread
163
pool for dispatching the messages to the proxy objects.
165
raise NotImplementedError()
168
def _safe_log(log_func, msg, msg_data):
169
"""Sanitizes the msg_data field before logging."""
171
'set_admin_password': ('new_pass',),
172
'run_instance': ('admin_password',),
175
has_method = 'method' in msg_data and msg_data['method'] in SANITIZE
176
has_context_token = '_context_auth_token' in msg_data
177
has_token = 'auth_token' in msg_data
179
if not any([has_method, has_context_token, has_token]):
180
return log_func(msg, msg_data)
182
msg_data = copy.deepcopy(msg_data)
185
method = msg_data['method']
186
if method in SANITIZE:
187
args_to_sanitize = SANITIZE[method]
188
for arg in args_to_sanitize:
190
msg_data['args'][arg] = "<SANITIZED>"
194
if has_context_token:
195
msg_data['_context_auth_token'] = '<SANITIZED>'
198
msg_data['auth_token'] = '<SANITIZED>'
200
return log_func(msg, msg_data)
203
def serialize_remote_exception(failure_info):
204
"""Prepares exception data to be sent over rpc.
206
Failure_info should be a sys.exc_info() tuple.
209
tb = traceback.format_exception(*failure_info)
210
failure = failure_info[1]
211
LOG.error(_("Returning exception %s to caller"), unicode(failure))
215
if hasattr(failure, 'kwargs'):
216
kwargs = failure.kwargs
219
'class': str(failure.__class__.__name__),
220
'module': str(failure.__class__.__module__),
221
'message': unicode(failure),
223
'args': failure.args,
227
json_data = jsonutils.dumps(data)
232
def deserialize_remote_exception(conf, data):
233
failure = jsonutils.loads(str(data))
235
trace = failure.get('tb', [])
236
message = failure.get('message', "") + "\n" + "\n".join(trace)
237
name = failure.get('class')
238
module = failure.get('module')
240
# NOTE(ameade): We DO NOT want to allow just any module to be imported, in
241
# order to prevent arbitrary code execution.
242
if not module in conf.allowed_rpc_exception_modules:
243
return RemoteError(name, failure.get('message'), trace)
246
mod = importutils.import_module(module)
247
klass = getattr(mod, name)
248
if not issubclass(klass, Exception):
249
raise TypeError("Can only deserialize Exceptions")
251
failure = klass(**failure.get('kwargs', {}))
252
except (AttributeError, TypeError, ImportError):
253
return RemoteError(name, failure.get('message'), trace)
255
ex_type = type(failure)
256
str_override = lambda self: message
257
new_ex_type = type(ex_type.__name__ + "_Remote", (ex_type,),
258
{'__str__': str_override, '__unicode__': str_override})
260
# NOTE(ameade): Dynamically create a new exception type and swap it in
261
# as the new type for the exception. This only works on user defined
262
# Exceptions and not core python exceptions. This is important because
263
# we cannot necessarily change an exception message so we must override
264
# the __str__ method.
265
failure.__class__ = new_ex_type
266
except TypeError as e:
267
# NOTE(ameade): If a core exception then just add the traceback to the
268
# first exception argument.
269
failure.args = (message,) + failure.args[1:]
273
class CommonRpcContext(object):
274
def __init__(self, **kwargs):
277
def __getattr__(self, key):
279
return self.values[key]
281
raise AttributeError(key)
284
return copy.deepcopy(self.values)
287
def from_dict(cls, values):
290
def update_store(self):
291
local.store.context = self
293
def elevated(self, read_deleted=None, overwrite=False):
294
"""Return a version of this context with admin flag set."""
295
# TODO(russellb) This method is a bit of a nova-ism. It makes
296
# some assumptions about the data in the request context sent
297
# across rpc, while the rest of this class does not. We could get
298
# rid of this if we changed the nova code that uses this to
299
# convert the RpcContext back to its native RequestContext doing
300
# something like nova.context.RequestContext.from_dict(ctxt.to_dict())
302
context = copy.deepcopy(self)
303
context.values['is_admin'] = True
305
context.values.setdefault('roles', [])
307
if 'admin' not in context.values['roles']:
308
context.values['roles'].append('admin')
310
if read_deleted is not None:
311
context.values['read_deleted'] = read_deleted