1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
3
# Copyright 2010 United States Government as represented by the
4
# Administrator of the National Aeronautics and Space Administration.
7
# Licensed under the Apache License, Version 2.0 (the "License"); you may
8
# not use this file except in compliance with the License. You may obtain
9
# a copy of the License at
11
# http://www.apache.org/licenses/LICENSE-2.0
13
# Unless required by applicable law or agreed to in writing, software
14
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16
# License for the specific language governing permissions and limitations
21
Queues have consumers and publishers.
23
No fan-out support yet.
34
from carrot import connection as carrot_connection
35
from carrot import messaging
37
from eventlet import greenpool
38
from eventlet import pools
39
from eventlet import queue
42
from nova import context
43
from nova import exception
44
from nova import flags
45
from nova import local
46
from nova import log as logging
47
from nova.rpc import common as rpc_common
48
from nova.testing import fake
49
from nova import utils
52
LOG = logging.getLogger(__name__)
55
@utils.deprecated('Use of carrot will be removed in a future release. '
56
'Use kombu, instead.')
57
class Connection(carrot_connection.BrokerConnection, rpc_common.Connection):
58
"""Connection instance object."""
60
def __init__(self, *args, **kwargs):
61
super(Connection, self).__init__(*args, **kwargs)
62
self._rpc_consumers = []
63
self._rpc_consumer_thread = None
66
def instance(cls, new=True):
67
"""Returns the instance."""
68
if new or not hasattr(cls, '_instance'):
69
params = dict(hostname=FLAGS.rabbit_host,
70
port=FLAGS.rabbit_port,
71
ssl=FLAGS.rabbit_use_ssl,
72
userid=FLAGS.rabbit_userid,
73
password=FLAGS.rabbit_password,
74
virtual_host=FLAGS.rabbit_virtual_host)
77
params['backend_cls'] = fake.rabbit.Backend
79
# NOTE(vish): magic is fun!
80
# pylint: disable=W0142
84
cls._instance = cls(**params)
89
"""Recreates the connection instance.
91
This is necessary to recover from some network errors/disconnects.
96
except AttributeError, e:
97
# The _instance stuff is for testing purposes. Usually we don't use
98
# it. So don't freak out if it doesn't exist.
100
return cls.instance()
103
self.cancel_consumer_thread()
104
for consumer in self._rpc_consumers:
110
self._rpc_consumers = []
111
carrot_connection.BrokerConnection.close(self)
113
def consume_in_thread(self):
114
"""Consumer from all queues/consumers in a greenthread"""
116
consumer_set = ConsumerSet(connection=self,
117
consumer_list=self._rpc_consumers)
119
def _consumer_thread():
122
except greenlet.GreenletExit:
124
if self._rpc_consumer_thread is None:
125
self._rpc_consumer_thread = eventlet.spawn(_consumer_thread)
126
return self._rpc_consumer_thread
128
def cancel_consumer_thread(self):
129
"""Cancel a consumer thread"""
130
if self._rpc_consumer_thread is not None:
131
self._rpc_consumer_thread.kill()
133
self._rpc_consumer_thread.wait()
134
except greenlet.GreenletExit:
136
self._rpc_consumer_thread = None
138
def create_consumer(self, topic, proxy, fanout=False):
139
"""Create a consumer that calls methods in the proxy"""
141
consumer = FanoutAdapterConsumer(
146
consumer = TopicAdapterConsumer(
150
self._rpc_consumers.append(consumer)
153
class Pool(pools.Pool):
154
"""Class that implements a Pool of Connections."""
156
# TODO(comstud): Timeout connections not used in a while
158
LOG.debug('Pool creating new connection')
159
return Connection.instance(new=True)
161
# Create a ConnectionPool to use for RPC calls. We'll order the
162
# pool as a stack (LIFO), so that we can potentially loop through and
163
# timeout old unused connections at some point
164
ConnectionPool = Pool(
165
max_size=FLAGS.rpc_conn_pool_size,
169
class Consumer(messaging.Consumer):
170
"""Consumer base class.
172
Contains methods for connecting the fetch method to async loops.
176
def __init__(self, *args, **kwargs):
177
max_retries = FLAGS.rabbit_max_retries
178
sleep_time = FLAGS.rabbit_retry_interval
183
time.sleep(sleep_time)
184
# backoff for next retry attempt.. if there is one
185
sleep_time += FLAGS.rabbit_retry_backoff
189
super(Consumer, self).__init__(*args, **kwargs)
190
self.failed_connection = False
192
except Exception as e: # Catching all because carrot sucks
193
self.failed_connection = True
194
if max_retries > 0 and tries == max_retries:
196
fl_host = FLAGS.rabbit_host
197
fl_port = FLAGS.rabbit_port
199
LOG.error(_('AMQP server on %(fl_host)s:%(fl_port)d is'
200
' unreachable: %(e)s. Trying again in %(fl_intv)d'
201
' seconds.') % locals())
202
if self.failed_connection:
203
LOG.error(_('Unable to connect to AMQP server '
204
'after %(tries)d tries. Shutting down.') % locals())
207
def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False):
208
"""Wraps the parent fetch with some logic for failed connection."""
209
# TODO(vish): the logic for failed connections and logging should be
210
# refactored into some sort of connection manager object
212
if self.failed_connection:
213
# NOTE(vish): connection is defined in the parent class, we can
214
# recreate it as long as we create the backend too
215
# pylint: disable=W0201
216
self.connection = Connection.recreate()
217
self.backend = self.connection.create_backend()
219
return super(Consumer, self).fetch(no_ack,
222
if self.failed_connection:
223
LOG.error(_('Reconnected to queue'))
224
self.failed_connection = False
225
# NOTE(vish): This is catching all errors because we really don't
226
# want exceptions to be logged 10 times a second if some
227
# persistent failure occurs.
228
except Exception, e: # pylint: disable=W0703
229
if not self.failed_connection:
230
LOG.exception(_('Failed to fetch message from queue: %s') % e)
231
self.failed_connection = True
234
class AdapterConsumer(Consumer):
235
"""Calls methods on a proxy object based on method and args."""
237
def __init__(self, connection=None, topic='broadcast', proxy=None):
238
LOG.debug(_('Initing the Adapter Consumer for %s') % topic)
240
self.pool = greenpool.GreenPool(FLAGS.rpc_thread_pool_size)
241
super(AdapterConsumer, self).__init__(connection=connection,
243
self.register_callback(self.process_data)
245
def process_data(self, message_data, message):
246
"""Consumer callback to call a method on a proxy object.
248
Parses the message for validity and fires off a thread to call the
251
Message data should be a dictionary with two keys:
252
method: string representing the method to call
253
args: dictionary of arg: value
255
Example: {'method': 'echo', 'args': {'value': 42}}
258
# It is important to clear the context here, because at this point
259
# the previous context is stored in local.store.context
260
if hasattr(local.store, 'context'):
261
del local.store.context
262
rpc_common._safe_log(LOG.debug, _('received %s'), message_data)
263
# This will be popped off in _unpack_context
264
msg_id = message_data.get('_msg_id', None)
265
ctxt = _unpack_context(message_data)
267
method = message_data.get('method')
268
args = message_data.get('args', {})
271
# NOTE(vish): we may not want to ack here, but that means that bad
272
# messages stay in the queue indefinitely, so for now
273
# we just log the message and send an error string
275
LOG.warn(_('no method for message: %s') % message_data)
277
_('No method for message: %s') % message_data)
279
self.pool.spawn_n(self._process_data, ctxt, method, args)
281
@exception.wrap_exception()
282
def _process_data(self, ctxt, method, args):
283
"""Thread that magically looks for a method on the proxy
287
node_func = getattr(self.proxy, str(method))
288
node_args = dict((str(k), v) for k, v in args.iteritems())
289
# NOTE(vish): magic is fun!
291
rval = node_func(context=ctxt, **node_args)
292
# Check if the result was a generator
293
if inspect.isgenerator(rval):
297
ctxt.reply(rval, None)
299
# This final None tells multicall that it is done.
300
ctxt.reply(ending=True)
301
except Exception as e:
302
LOG.exception('Exception during message handling')
303
ctxt.reply(None, sys.exc_info())
307
class TopicAdapterConsumer(AdapterConsumer):
308
"""Consumes messages on a specific topic."""
310
exchange_type = 'topic'
312
def __init__(self, connection=None, topic='broadcast', proxy=None):
314
self.routing_key = topic
315
self.exchange = FLAGS.control_exchange
316
self.durable = FLAGS.rabbit_durable_queues
317
super(TopicAdapterConsumer, self).__init__(connection=connection,
318
topic=topic, proxy=proxy)
321
class FanoutAdapterConsumer(AdapterConsumer):
322
"""Consumes messages from a fanout exchange."""
324
exchange_type = 'fanout'
326
def __init__(self, connection=None, topic='broadcast', proxy=None):
327
self.exchange = '%s_fanout' % topic
328
self.routing_key = topic
329
unique = uuid.uuid4().hex
330
self.queue = '%s_fanout_%s' % (topic, unique)
332
# Fanout creates unique queue names, so we should auto-remove
333
# them when done, so they're not left around on restart.
334
# Also, we're the only one that should be consuming. exclusive
335
# implies auto_delete, so we'll just set that..
336
self.exclusive = True
337
LOG.info(_('Created "%(exchange)s" fanout exchange '
338
'with "%(key)s" routing key'),
339
dict(exchange=self.exchange, key=self.routing_key))
340
super(FanoutAdapterConsumer, self).__init__(connection=connection,
341
topic=topic, proxy=proxy)
344
class ConsumerSet(object):
345
"""Groups consumers to listen on together on a single connection."""
347
def __init__(self, connection, consumer_list):
348
self.consumer_list = set(consumer_list)
349
self.consumer_set = None
351
self.init(connection)
353
def init(self, conn):
355
conn = Connection.instance(new=True)
356
if self.consumer_set:
357
self.consumer_set.close()
358
self.consumer_set = messaging.ConsumerSet(conn)
359
for consumer in self.consumer_list:
360
consumer.connection = conn
361
# consumer.backend is set for us
362
self.consumer_set.add_consumer(consumer)
367
def wait(self, limit=None):
370
it = self.consumer_set.iterconsume(limit=limit)
376
except StopIteration:
378
except greenlet.GreenletExit:
381
except Exception as e:
382
LOG.exception(_("Exception while processing consumer"))
384
# Break to outer loop
388
self.consumer_set.close()
391
class Publisher(messaging.Publisher):
392
"""Publisher base class."""
396
class TopicPublisher(Publisher):
397
"""Publishes messages on a specific topic."""
399
exchange_type = 'topic'
401
def __init__(self, connection=None, topic='broadcast', durable=None):
402
self.routing_key = topic
403
self.exchange = FLAGS.control_exchange
404
self.durable = (FLAGS.rabbit_durable_queues if durable is None
406
super(TopicPublisher, self).__init__(connection=connection)
409
class FanoutPublisher(Publisher):
410
"""Publishes messages to a fanout exchange."""
412
exchange_type = 'fanout'
414
def __init__(self, topic, connection=None):
415
self.exchange = '%s_fanout' % topic
416
self.queue = '%s_fanout' % topic
418
self.auto_delete = True
419
LOG.info(_('Creating "%(exchange)s" fanout exchange'),
420
dict(exchange=self.exchange))
421
super(FanoutPublisher, self).__init__(connection=connection)
424
class DirectConsumer(Consumer):
425
"""Consumes messages directly on a channel specified by msg_id."""
427
exchange_type = 'direct'
429
def __init__(self, connection=None, msg_id=None):
431
self.routing_key = msg_id
432
self.exchange = msg_id
434
self.auto_delete = True
435
self.exclusive = True
436
super(DirectConsumer, self).__init__(connection=connection)
439
class DirectPublisher(Publisher):
440
"""Publishes messages directly on a channel specified by msg_id."""
442
exchange_type = 'direct'
444
def __init__(self, connection=None, msg_id=None):
445
self.routing_key = msg_id
446
self.exchange = msg_id
448
self.auto_delete = True
449
super(DirectPublisher, self).__init__(connection=connection)
452
def msg_reply(msg_id, reply=None, failure=None, ending=False):
453
"""Sends a reply or an error on the channel signified by msg_id.
455
Failure should be a sys.exc_info() tuple.
459
message = str(failure[1])
460
tb = traceback.format_exception(*failure)
461
LOG.error(_("Returning exception %s to caller"), message)
463
failure = (failure[0].__name__, str(failure[1]), tb)
465
with ConnectionPool.item() as conn:
466
publisher = DirectPublisher(connection=conn, msg_id=msg_id)
468
msg = {'result': reply, 'failure': failure}
473
msg = {'result': dict((k, repr(v))
474
for k, v in reply.__dict__.iteritems()),
483
def _unpack_context(msg):
484
"""Unpack context from msg."""
486
for key in list(msg.keys()):
487
# NOTE(vish): Some versions of python don't like unicode keys
490
if key.startswith('_context_'):
492
context_dict[key[9:]] = value
493
context_dict['msg_id'] = msg.pop('_msg_id', None)
494
ctx = RpcContext.from_dict(context_dict)
495
LOG.debug(_('unpacked context: %s'), ctx.to_dict())
499
def _pack_context(msg, context):
500
"""Pack context into msg.
502
Values for message keys need to be less than 255 chars, so we pull
503
context out into a bunch of separate keys. If we want to support
504
more arguments in rabbit messages, we may want to do the same
505
for args at some point.
508
context_d = dict([('_context_%s' % key, value)
509
for (key, value) in context.to_dict().iteritems()])
510
msg.update(context_d)
513
class RpcContext(context.RequestContext):
514
def __init__(self, *args, **kwargs):
515
msg_id = kwargs.pop('msg_id', None)
517
super(RpcContext, self).__init__(*args, **kwargs)
519
def reply(self, reply=None, failure=None, ending=False):
521
msg_reply(self.msg_id, reply, failure, ending)
526
def multicall(context, topic, msg, timeout=None):
527
"""Make a call that returns multiple times."""
528
# NOTE(russellb): carrot doesn't support timeouts
529
LOG.debug(_('Making asynchronous call on %s ...'), topic)
530
msg_id = uuid.uuid4().hex
531
msg.update({'_msg_id': msg_id})
532
LOG.debug(_('MSG_ID is %s') % (msg_id))
533
_pack_context(msg, context)
535
con_conn = ConnectionPool.get()
536
consumer = DirectConsumer(connection=con_conn, msg_id=msg_id)
537
wait_msg = MulticallWaiter(consumer)
538
consumer.register_callback(wait_msg)
540
publisher = TopicPublisher(connection=con_conn, topic=topic)
547
class MulticallWaiter(object):
548
def __init__(self, consumer):
549
self._consumer = consumer
550
self._results = queue.Queue()
552
self._got_ending = False
558
self._consumer.close()
559
ConnectionPool.put(self._consumer.connection)
561
def __call__(self, data, message):
562
"""Acks message and sets result."""
565
self._results.put(rpc_common.RemoteError(*data['failure']))
566
elif data.get('ending', False):
567
self._got_ending = True
569
self._results.put(data['result'])
575
while not self._closed:
577
rv = self._consumer.fetch(enable_callbacks=True)
587
result = self._results.get()
588
if isinstance(result, Exception):
594
def create_connection(new=True):
595
"""Create a connection"""
596
return Connection.instance(new=new)
599
def call(context, topic, msg, timeout=None):
600
"""Sends a message on a topic and wait for a response."""
601
rv = multicall(context, topic, msg, timeout)
602
# NOTE(vish): return the last result from the multicall
609
def cast(context, topic, msg):
610
"""Sends a message on a topic without waiting for a response."""
611
LOG.debug(_('Making asynchronous cast on %s...'), topic)
612
_pack_context(msg, context)
613
with ConnectionPool.item() as conn:
614
publisher = TopicPublisher(connection=conn, topic=topic)
619
def fanout_cast(context, topic, msg):
620
"""Sends a message on a fanout exchange without waiting for a response."""
621
LOG.debug(_('Making asynchronous fanout cast...'))
622
_pack_context(msg, context)
623
with ConnectionPool.item() as conn:
624
publisher = FanoutPublisher(topic, connection=conn)
629
def notify(context, topic, msg):
630
"""Sends a notification event on a topic."""
631
LOG.debug(_('Sending notification on %s...'), topic)
632
_pack_context(msg, context)
633
with ConnectionPool.item() as conn:
634
publisher = TopicPublisher(connection=conn, topic=topic,
644
def generic_response(message_data, message):
645
"""Logs a result and exits."""
646
LOG.debug(_('response %s'), message_data)
651
def send_message(topic, message, wait=True):
652
"""Sends a message for testing."""
653
msg_id = uuid.uuid4().hex
654
message.update({'_msg_id': msg_id})
655
LOG.debug(_('topic is %s'), topic)
656
LOG.debug(_('message %s'), message)
659
consumer = messaging.Consumer(connection=Connection.instance(),
663
exchange_type='direct',
665
consumer.register_callback(generic_response)
667
publisher = messaging.Publisher(connection=Connection.instance(),
668
exchange=FLAGS.control_exchange,
669
durable=FLAGS.rabbit_durable_queues,
670
exchange_type='topic',
672
publisher.send(message)
680
if __name__ == '__main__':
681
# You can send messages from the command line using
682
# topic and a json string representing a dictionary
684
send_message(sys.argv[1], json.loads(sys.argv[2]))