~libra-core/libra/master

« back to all changes in this revision

Viewing changes to libra/openstack/common/rpc/__init__.py

  • Committer: Monty Taylor
  • Date: 2015-10-17 20:03:27 UTC
  • Revision ID: git-v1:c7082fa72ac73b23b48ce63fc82aa7da2d3e5d6a
Retire stackforge/libra

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# vim: tabstop=4 shiftwidth=4 softtabstop=4
2
 
 
3
 
# Copyright 2010 United States Government as represented by the
4
 
# Administrator of the National Aeronautics and Space Administration.
5
 
# All Rights Reserved.
6
 
# Copyright 2011 Red Hat, Inc.
7
 
#
8
 
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
9
 
#    not use this file except in compliance with the License. You may obtain
10
 
#    a copy of the License at
11
 
#
12
 
#         http://www.apache.org/licenses/LICENSE-2.0
13
 
#
14
 
#    Unless required by applicable law or agreed to in writing, software
15
 
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16
 
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17
 
#    License for the specific language governing permissions and limitations
18
 
#    under the License.
19
 
 
20
 
"""
21
 
A remote procedure call (rpc) abstraction.
22
 
 
23
 
For some wrappers that add message versioning to rpc, see:
24
 
    rpc.dispatcher
25
 
    rpc.proxy
26
 
"""
27
 
 
28
 
import inspect
29
 
 
30
 
from oslo.config import cfg
31
 
 
32
 
from libra.openstack.common.gettextutils import _  # noqa
33
 
from libra.openstack.common import importutils
34
 
from libra.openstack.common import local
35
 
from libra.openstack.common import log as logging
36
 
 
37
 
 
38
 
LOG = logging.getLogger(__name__)
39
 
 
40
 
 
41
 
rpc_opts = [
42
 
    cfg.StrOpt('rpc_backend',
43
 
               default='%s.impl_kombu' % __package__,
44
 
               help="The messaging module to use, defaults to kombu."),
45
 
    cfg.IntOpt('rpc_thread_pool_size',
46
 
               default=64,
47
 
               help='Size of RPC thread pool'),
48
 
    cfg.IntOpt('rpc_conn_pool_size',
49
 
               default=30,
50
 
               help='Size of RPC connection pool'),
51
 
    cfg.IntOpt('rpc_response_timeout',
52
 
               default=60,
53
 
               help='Seconds to wait for a response from call or multicall'),
54
 
    cfg.IntOpt('rpc_cast_timeout',
55
 
               default=30,
56
 
               help='Seconds to wait before a cast expires (TTL). '
57
 
                    'Only supported by impl_zmq.'),
58
 
    cfg.ListOpt('allowed_rpc_exception_modules',
59
 
                default=['nova.exception',
60
 
                         'cinder.exception',
61
 
                         'exceptions',
62
 
                         ],
63
 
                help='Modules of exceptions that are permitted to be recreated'
64
 
                     ' upon receiving exception data from an rpc call.'),
65
 
    cfg.BoolOpt('fake_rabbit',
66
 
                default=False,
67
 
                help='If passed, use a fake RabbitMQ provider'),
68
 
    cfg.StrOpt('control_exchange',
69
 
               default='openstack',
70
 
               help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
71
 
]
72
 
 
73
 
CONF = cfg.CONF
74
 
CONF.register_opts(rpc_opts)
75
 
 
76
 
 
77
 
def set_defaults(control_exchange):
78
 
    cfg.set_defaults(rpc_opts,
79
 
                     control_exchange=control_exchange)
80
 
 
81
 
 
82
 
def create_connection(new=True):
83
 
    """Create a connection to the message bus used for rpc.
84
 
 
85
 
    For some example usage of creating a connection and some consumers on that
86
 
    connection, see nova.service.
87
 
 
88
 
    :param new: Whether or not to create a new connection.  A new connection
89
 
                will be created by default.  If new is False, the
90
 
                implementation is free to return an existing connection from a
91
 
                pool.
92
 
 
93
 
    :returns: An instance of openstack.common.rpc.common.Connection
94
 
    """
95
 
    return _get_impl().create_connection(CONF, new=new)
96
 
 
97
 
 
98
 
def _check_for_lock():
99
 
    if not CONF.debug:
100
 
        return None
101
 
 
102
 
    if ((hasattr(local.strong_store, 'locks_held')
103
 
         and local.strong_store.locks_held)):
104
 
        stack = ' :: '.join([frame[3] for frame in inspect.stack()])
105
 
        LOG.warn(_('A RPC is being made while holding a lock. The locks '
106
 
                   'currently held are %(locks)s. This is probably a bug. '
107
 
                   'Please report it. Include the following: [%(stack)s].'),
108
 
                 {'locks': local.strong_store.locks_held,
109
 
                  'stack': stack})
110
 
        return True
111
 
 
112
 
    return False
113
 
 
114
 
 
115
 
def call(context, topic, msg, timeout=None, check_for_lock=False):
116
 
    """Invoke a remote method that returns something.
117
 
 
118
 
    :param context: Information that identifies the user that has made this
119
 
                    request.
120
 
    :param topic: The topic to send the rpc message to.  This correlates to the
121
 
                  topic argument of
122
 
                  openstack.common.rpc.common.Connection.create_consumer()
123
 
                  and only applies when the consumer was created with
124
 
                  fanout=False.
125
 
    :param msg: This is a dict in the form { "method" : "method_to_invoke",
126
 
                                             "args" : dict_of_kwargs }
127
 
    :param timeout: int, number of seconds to use for a response timeout.
128
 
                    If set, this overrides the rpc_response_timeout option.
129
 
    :param check_for_lock: if True, a warning is emitted if a RPC call is made
130
 
                    with a lock held.
131
 
 
132
 
    :returns: A dict from the remote method.
133
 
 
134
 
    :raises: openstack.common.rpc.common.Timeout if a complete response
135
 
             is not received before the timeout is reached.
136
 
    """
137
 
    if check_for_lock:
138
 
        _check_for_lock()
139
 
    return _get_impl().call(CONF, context, topic, msg, timeout)
140
 
 
141
 
 
142
 
def cast(context, topic, msg):
143
 
    """Invoke a remote method that does not return anything.
144
 
 
145
 
    :param context: Information that identifies the user that has made this
146
 
                    request.
147
 
    :param topic: The topic to send the rpc message to.  This correlates to the
148
 
                  topic argument of
149
 
                  openstack.common.rpc.common.Connection.create_consumer()
150
 
                  and only applies when the consumer was created with
151
 
                  fanout=False.
152
 
    :param msg: This is a dict in the form { "method" : "method_to_invoke",
153
 
                                             "args" : dict_of_kwargs }
154
 
 
155
 
    :returns: None
156
 
    """
157
 
    return _get_impl().cast(CONF, context, topic, msg)
158
 
 
159
 
 
160
 
def fanout_cast(context, topic, msg):
161
 
    """Broadcast a remote method invocation with no return.
162
 
 
163
 
    This method will get invoked on all consumers that were set up with this
164
 
    topic name and fanout=True.
165
 
 
166
 
    :param context: Information that identifies the user that has made this
167
 
                    request.
168
 
    :param topic: The topic to send the rpc message to.  This correlates to the
169
 
                  topic argument of
170
 
                  openstack.common.rpc.common.Connection.create_consumer()
171
 
                  and only applies when the consumer was created with
172
 
                  fanout=True.
173
 
    :param msg: This is a dict in the form { "method" : "method_to_invoke",
174
 
                                             "args" : dict_of_kwargs }
175
 
 
176
 
    :returns: None
177
 
    """
178
 
    return _get_impl().fanout_cast(CONF, context, topic, msg)
179
 
 
180
 
 
181
 
def multicall(context, topic, msg, timeout=None, check_for_lock=False):
182
 
    """Invoke a remote method and get back an iterator.
183
 
 
184
 
    In this case, the remote method will be returning multiple values in
185
 
    separate messages, so the return values can be processed as the come in via
186
 
    an iterator.
187
 
 
188
 
    :param context: Information that identifies the user that has made this
189
 
                    request.
190
 
    :param topic: The topic to send the rpc message to.  This correlates to the
191
 
                  topic argument of
192
 
                  openstack.common.rpc.common.Connection.create_consumer()
193
 
                  and only applies when the consumer was created with
194
 
                  fanout=False.
195
 
    :param msg: This is a dict in the form { "method" : "method_to_invoke",
196
 
                                             "args" : dict_of_kwargs }
197
 
    :param timeout: int, number of seconds to use for a response timeout.
198
 
                    If set, this overrides the rpc_response_timeout option.
199
 
    :param check_for_lock: if True, a warning is emitted if a RPC call is made
200
 
                    with a lock held.
201
 
 
202
 
    :returns: An iterator.  The iterator will yield a tuple (N, X) where N is
203
 
              an index that starts at 0 and increases by one for each value
204
 
              returned and X is the Nth value that was returned by the remote
205
 
              method.
206
 
 
207
 
    :raises: openstack.common.rpc.common.Timeout if a complete response
208
 
             is not received before the timeout is reached.
209
 
    """
210
 
    if check_for_lock:
211
 
        _check_for_lock()
212
 
    return _get_impl().multicall(CONF, context, topic, msg, timeout)
213
 
 
214
 
 
215
 
def notify(context, topic, msg, envelope=False):
216
 
    """Send notification event.
217
 
 
218
 
    :param context: Information that identifies the user that has made this
219
 
                    request.
220
 
    :param topic: The topic to send the notification to.
221
 
    :param msg: This is a dict of content of event.
222
 
    :param envelope: Set to True to enable message envelope for notifications.
223
 
 
224
 
    :returns: None
225
 
    """
226
 
    return _get_impl().notify(cfg.CONF, context, topic, msg, envelope)
227
 
 
228
 
 
229
 
def cleanup():
230
 
    """Clean up resoruces in use by implementation.
231
 
 
232
 
    Clean up any resources that have been allocated by the RPC implementation.
233
 
    This is typically open connections to a messaging service.  This function
234
 
    would get called before an application using this API exits to allow
235
 
    connections to get torn down cleanly.
236
 
 
237
 
    :returns: None
238
 
    """
239
 
    return _get_impl().cleanup()
240
 
 
241
 
 
242
 
def cast_to_server(context, server_params, topic, msg):
243
 
    """Invoke a remote method that does not return anything.
244
 
 
245
 
    :param context: Information that identifies the user that has made this
246
 
                    request.
247
 
    :param server_params: Connection information
248
 
    :param topic: The topic to send the notification to.
249
 
    :param msg: This is a dict in the form { "method" : "method_to_invoke",
250
 
                                             "args" : dict_of_kwargs }
251
 
 
252
 
    :returns: None
253
 
    """
254
 
    return _get_impl().cast_to_server(CONF, context, server_params, topic,
255
 
                                      msg)
256
 
 
257
 
 
258
 
def fanout_cast_to_server(context, server_params, topic, msg):
259
 
    """Broadcast to a remote method invocation with no return.
260
 
 
261
 
    :param context: Information that identifies the user that has made this
262
 
                    request.
263
 
    :param server_params: Connection information
264
 
    :param topic: The topic to send the notification to.
265
 
    :param msg: This is a dict in the form { "method" : "method_to_invoke",
266
 
                                             "args" : dict_of_kwargs }
267
 
 
268
 
    :returns: None
269
 
    """
270
 
    return _get_impl().fanout_cast_to_server(CONF, context, server_params,
271
 
                                             topic, msg)
272
 
 
273
 
 
274
 
def queue_get_for(context, topic, host):
275
 
    """Get a queue name for a given topic + host.
276
 
 
277
 
    This function only works if this naming convention is followed on the
278
 
    consumer side, as well.  For example, in nova, every instance of the
279
 
    nova-foo service calls create_consumer() for two topics:
280
 
 
281
 
        foo
282
 
        foo.<host>
283
 
 
284
 
    Messages sent to the 'foo' topic are distributed to exactly one instance of
285
 
    the nova-foo service.  The services are chosen in a round-robin fashion.
286
 
    Messages sent to the 'foo.<host>' topic are sent to the nova-foo service on
287
 
    <host>.
288
 
    """
289
 
    return '%s.%s' % (topic, host) if host else topic
290
 
 
291
 
 
292
 
_RPCIMPL = None
293
 
 
294
 
 
295
 
def _get_impl():
296
 
    """Delay import of rpc_backend until configuration is loaded."""
297
 
    global _RPCIMPL
298
 
    if _RPCIMPL is None:
299
 
        try:
300
 
            _RPCIMPL = importutils.import_module(CONF.rpc_backend)
301
 
        except ImportError:
302
 
            # For backwards compatibility with older nova config.
303
 
            impl = CONF.rpc_backend.replace('nova.rpc',
304
 
                                            'nova.openstack.common.rpc')
305
 
            _RPCIMPL = importutils.import_module(impl)
306
 
    return _RPCIMPL