1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
3
# Copyright 2010 United States Government as represented by the
4
# Administrator of the National Aeronautics and Space Administration.
6
# Copyright 2011 Red Hat, Inc.
8
# Licensed under the Apache License, Version 2.0 (the "License"); you may
9
# not use this file except in compliance with the License. You may obtain
10
# a copy of the License at
12
# http://www.apache.org/licenses/LICENSE-2.0
14
# Unless required by applicable law or agreed to in writing, software
15
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17
# License for the specific language governing permissions and limitations
21
A remote procedure call (rpc) abstraction.
23
For some wrappers that add message versioning to rpc, see:
30
from oslo.config import cfg
32
from libra.openstack.common.gettextutils import _ # noqa
33
from libra.openstack.common import importutils
34
from libra.openstack.common import local
35
from libra.openstack.common import log as logging
38
LOG = logging.getLogger(__name__)
42
cfg.StrOpt('rpc_backend',
43
default='%s.impl_kombu' % __package__,
44
help="The messaging module to use, defaults to kombu."),
45
cfg.IntOpt('rpc_thread_pool_size',
47
help='Size of RPC thread pool'),
48
cfg.IntOpt('rpc_conn_pool_size',
50
help='Size of RPC connection pool'),
51
cfg.IntOpt('rpc_response_timeout',
53
help='Seconds to wait for a response from call or multicall'),
54
cfg.IntOpt('rpc_cast_timeout',
56
help='Seconds to wait before a cast expires (TTL). '
57
'Only supported by impl_zmq.'),
58
cfg.ListOpt('allowed_rpc_exception_modules',
59
default=['nova.exception',
63
help='Modules of exceptions that are permitted to be recreated'
64
' upon receiving exception data from an rpc call.'),
65
cfg.BoolOpt('fake_rabbit',
67
help='If passed, use a fake RabbitMQ provider'),
68
cfg.StrOpt('control_exchange',
70
help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
74
CONF.register_opts(rpc_opts)
77
def set_defaults(control_exchange):
    """Override the default value of the ``control_exchange`` rpc option.

    :param control_exchange: Value handed to ``cfg.set_defaults`` as the new
                             default for ``control_exchange`` in ``rpc_opts``.
    """
    cfg.set_defaults(rpc_opts,
                     control_exchange=control_exchange)
82
def create_connection(new=True):
    """Create a connection to the message bus used for rpc.

    For some example usage of creating a connection and some consumers on that
    connection, see nova.service.

    :param new: Whether or not to create a new connection.  A new connection
                will be created by default.  If new is False, the
                implementation is free to return an existing connection
                instead.

    :returns: An instance of openstack.common.rpc.common.Connection
    """
    # The configured backend module does the real work; this wrapper only
    # supplies the global configuration object.
    return _get_impl().create_connection(CONF, new=new)
98
def _check_for_lock():
    """Log a warning if an RPC is being made while a lock is held.

    Looks at ``local.strong_store.locks_held``; when that attribute exists
    and is truthy, a warning is logged that names the held locks and the
    function names found on the current call stack.
    """
    # Function-scope import: inspect.stack() below is the only use, and the
    # module's visible import block does not bring ``inspect`` into scope.
    import inspect

    if ((hasattr(local.strong_store, 'locks_held')
         and local.strong_store.locks_held)):
        # Index 3 of each inspect.stack() record is the function name.
        stack = ' :: '.join([frame[3] for frame in inspect.stack()])
        LOG.warn(_('A RPC is being made while holding a lock. The locks '
                   'currently held are %(locks)s. This is probably a bug. '
                   'Please report it. Include the following: [%(stack)s].'),
                 {'locks': local.strong_store.locks_held,
                  'stack': stack})
    # NOTE(review): the original text of this function was cut off after the
    # 'locks' entry.  The 'stack' entry is restored because the message
    # references %(stack)s; confirm that no further statements (for example
    # a return value) were lost.
115
def call(context, topic, msg, timeout=None, check_for_lock=False):
    """Invoke a remote method that returns something.

    :param context: Information that identifies the user that has made this
                    request.
    :param topic: The topic to send the rpc message to.  This correlates to the
                  topic argument of
                  openstack.common.rpc.common.Connection.create_consumer().
    :param msg: This is a dict in the form { "method" : "method_to_invoke",
                                             "args" : dict_of_kwargs }
    :param timeout: int, number of seconds to use for a response timeout.
                    If set, this overrides the rpc_response_timeout option.
    :param check_for_lock: if True, a warning is emitted if a RPC call is made
                           while a lock is held.

    :returns: A dict from the remote method.

    :raises: openstack.common.rpc.common.Timeout if a complete response
             is not received before the timeout is reached.
    """
    # Honour the documented flag: without this the parameter was accepted
    # but had no effect.
    if check_for_lock:
        _check_for_lock()
    return _get_impl().call(CONF, context, topic, msg, timeout)
142
def cast(context, topic, msg):
    """Invoke a remote method that does not return anything.

    :param context: Information that identifies the user that has made this
                    request.
    :param topic: The topic to send the rpc message to.  This correlates to the
                  topic argument of
                  openstack.common.rpc.common.Connection.create_consumer().
    :param msg: This is a dict in the form { "method" : "method_to_invoke",
                                             "args" : dict_of_kwargs }

    :returns: Whatever the backend's ``cast`` returns; callers should not
              rely on a value.
    """
    return _get_impl().cast(CONF, context, topic, msg)
160
def fanout_cast(context, topic, msg):
    """Broadcast a remote method invocation with no return.

    This method will get invoked on all consumers that were set up with this
    topic name and fanout=True.

    :param context: Information that identifies the user that has made this
                    request.
    :param topic: The topic to send the rpc message to.  This correlates to the
                  topic argument of
                  openstack.common.rpc.common.Connection.create_consumer()
                  and only applies when the consumer was created with
                  fanout=True.
    :param msg: This is a dict in the form { "method" : "method_to_invoke",
                                             "args" : dict_of_kwargs }

    :returns: Whatever the backend's ``fanout_cast`` returns; callers should
              not rely on a value.
    """
    return _get_impl().fanout_cast(CONF, context, topic, msg)
181
def multicall(context, topic, msg, timeout=None, check_for_lock=False):
    """Invoke a remote method and get back an iterator.

    In this case, the remote method will be returning multiple values in
    separate messages, so the return values can be processed as they come in
    via the returned iterator.

    :param context: Information that identifies the user that has made this
                    request.
    :param topic: The topic to send the rpc message to.  This correlates to the
                  topic argument of
                  openstack.common.rpc.common.Connection.create_consumer().
    :param msg: This is a dict in the form { "method" : "method_to_invoke",
                                             "args" : dict_of_kwargs }
    :param timeout: int, number of seconds to use for a response timeout.
                    If set, this overrides the rpc_response_timeout option.
    :param check_for_lock: if True, a warning is emitted if a RPC call is made
                           while a lock is held.

    :returns: An iterator.  The iterator will yield a tuple (N, X) where N is
              an index that starts at 0 and increases by one for each value
              returned and X is the Nth value that was returned by the remote
              method.

    :raises: openstack.common.rpc.common.Timeout if a complete response
             is not received before the timeout is reached.
    """
    # Honour the documented flag: without this the parameter was accepted
    # but had no effect.
    if check_for_lock:
        _check_for_lock()
    return _get_impl().multicall(CONF, context, topic, msg, timeout)
215
def notify(context, topic, msg, envelope=False):
    """Send notification event.

    :param context: Information that identifies the user that has made this
                    request.
    :param topic: The topic to send the notification to.
    :param msg: This is a dict of content of event.
    :param envelope: Set to True to enable message envelope for notifications.

    :returns: Whatever the backend's ``notify`` returns; callers should not
              rely on a value.
    """
    # Unlike its siblings this wrapper passes cfg.CONF explicitly; kept as
    # written so the exact object handed to the backend does not change.
    return _get_impl().notify(cfg.CONF, context, topic, msg, envelope)
230
"""Clean up resources in use by implementation.
232
Clean up any resources that have been allocated by the RPC implementation.
233
This is typically open connections to a messaging service. This function
234
would get called before an application using this API exits to allow
235
connections to get torn down cleanly.
239
return _get_impl().cleanup()
242
def cast_to_server(context, server_params, topic, msg):
    """Invoke a remote method that does not return anything.

    :param context: Information that identifies the user that has made this
                    request.
    :param server_params: Connection information
    :param topic: The topic to send the notification to.
    :param msg: This is a dict in the form { "method" : "method_to_invoke",
                                             "args" : dict_of_kwargs }

    :returns: Whatever the backend's ``cast_to_server`` returns; callers
              should not rely on a value.
    """
    return _get_impl().cast_to_server(CONF, context, server_params, topic,
                                      msg)
258
def fanout_cast_to_server(context, server_params, topic, msg):
    """Broadcast to a remote method invocation with no return.

    :param context: Information that identifies the user that has made this
                    request.
    :param server_params: Connection information
    :param topic: The topic to send the notification to.
    :param msg: This is a dict in the form { "method" : "method_to_invoke",
                                             "args" : dict_of_kwargs }

    :returns: Whatever the backend's ``fanout_cast_to_server`` returns;
              callers should not rely on a value.
    """
    # Arguments are forwarded in the same order cast_to_server() uses.
    return _get_impl().fanout_cast_to_server(CONF, context, server_params,
                                             topic, msg)
274
def queue_get_for(context, topic, host):
    """Get a queue name for a given topic + host.

    This function only works if this naming convention is followed on the
    consumer side, as well.  For example, in nova, every instance of the
    nova-foo service calls create_consumer() for two topics:

        foo
        foo.<host>

    Messages sent to the 'foo' topic are distributed to exactly one instance of
    the nova-foo service.  The services are chosen in a round-robin fashion.
    Messages sent to the 'foo.<host>' topic are sent to the nova-foo service on
    <host>.

    :param context: Not used when building the name.
    :param topic: Base topic name.
    :param host: Host to append to the topic.  Any falsy value (None, '')
                 yields the bare topic.

    :returns: '<topic>.<host>' when host is truthy, otherwise topic unchanged.
    """
    return '%s.%s' % (topic, host) if host else topic
296
"""Delay import of rpc_backend until configuration is loaded."""
300
_RPCIMPL = importutils.import_module(CONF.rpc_backend)
302
# For backwards compatibility with older nova config.
303
impl = CONF.rpc_backend.replace('nova.rpc',
304
'nova.openstack.common.rpc')
305
_RPCIMPL = importutils.import_module(impl)