1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
3
# Copyright 2011 OpenStack LLC
5
# Licensed under the Apache License, Version 2.0 (the "License"); you may
6
# not use this file except in compliance with the License. You may obtain
7
# a copy of the License at
9
# http://www.apache.org/licenses/LICENSE-2.0
11
# Unless required by applicable law or agreed to in writing, software
12
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14
# License for the specific language governing permissions and limitations
28
import kombu.connection
30
import kombu.messaging
32
from nova.openstack.common import cfg
33
from nova.openstack.common.rpc import amqp as rpc_amqp
34
from nova.openstack.common.rpc import common as rpc_common
37
cfg.StrOpt('kombu_ssl_version',
39
help='SSL version to use (valid only if SSL enabled)'),
40
cfg.StrOpt('kombu_ssl_keyfile',
42
help='SSL key file (valid only if SSL enabled)'),
43
cfg.StrOpt('kombu_ssl_certfile',
45
help='SSL cert file (valid only if SSL enabled)'),
46
cfg.StrOpt('kombu_ssl_ca_certs',
48
help=('SSL certification authority file '
49
'(valid only if SSL enabled)')),
50
cfg.StrOpt('rabbit_host',
52
help='the RabbitMQ host'),
53
cfg.IntOpt('rabbit_port',
55
help='the RabbitMQ port'),
56
cfg.BoolOpt('rabbit_use_ssl',
58
help='connect over SSL for RabbitMQ'),
59
cfg.StrOpt('rabbit_userid',
61
help='the RabbitMQ userid'),
62
cfg.StrOpt('rabbit_password',
64
help='the RabbitMQ password'),
65
cfg.StrOpt('rabbit_virtual_host',
67
help='the RabbitMQ virtual host'),
68
cfg.IntOpt('rabbit_retry_interval',
70
help='how frequently to retry connecting with RabbitMQ'),
71
cfg.IntOpt('rabbit_retry_backoff',
73
help='how long to backoff for between retries when connecting '
75
cfg.IntOpt('rabbit_max_retries',
77
help='maximum retries with trying to connect to RabbitMQ '
78
'(the default of 0 implies an infinite retry count)'),
79
cfg.BoolOpt('rabbit_durable_queues',
81
help='use durable queues in RabbitMQ'),
85
cfg.CONF.register_opts(kombu_opts)
90
class ConsumerBase(object):
91
"""Consumer base class."""
93
def __init__(self, channel, callback, tag, **kwargs):
94
"""Declare a queue on an amqp channel.
96
'channel' is the amqp channel to use
97
'callback' is the callback to call when messages are received
98
'tag' is a unique ID for the consumer on the channel
100
queue name, exchange name, and other kombu options are
101
passed in here as a dictionary.
103
self.callback = callback
107
self.reconnect(channel)
109
def reconnect(self, channel):
110
"""Re-declare the queue after a rabbit reconnect"""
111
self.channel = channel
112
self.kwargs['channel'] = channel
113
self.queue = kombu.entity.Queue(**self.kwargs)
116
def consume(self, *args, **kwargs):
117
"""Actually declare the consumer on the amqp channel. This will
118
start the flow of messages from the queue. Using the
119
Connection.iterconsume() iterator will process the messages,
120
calling the appropriate callback.
122
If a callback is specified in kwargs, use that. Otherwise,
123
use the callback passed during __init__()
125
If kwargs['nowait'] is True, then this call will block until
128
Messages will automatically be acked if the callback doesn't
132
options = {'consumer_tag': self.tag}
133
options['nowait'] = kwargs.get('nowait', False)
134
callback = kwargs.get('callback', self.callback)
136
raise ValueError("No callback defined")
138
def _callback(raw_message):
139
message = self.channel.message_to_python(raw_message)
141
callback(message.payload)
144
LOG.exception(_("Failed to process message... skipping it."))
146
self.queue.consume(*args, callback=_callback, **options)
149
"""Cancel the consuming from the queue, if it has started"""
151
self.queue.cancel(self.tag)
153
# NOTE(comstud): Kludge to get around an amqplib bug
154
if str(e) != "u'%s'" % self.tag:
159
class DirectConsumer(ConsumerBase):
160
"""Queue/consumer class for 'direct'"""
162
def __init__(self, conf, channel, msg_id, callback, tag, **kwargs):
163
"""Init a 'direct' queue.
165
'channel' is the amqp channel to use
166
'msg_id' is the msg_id to listen on
167
'callback' is the callback to call when messages are received
168
'tag' is a unique ID for the consumer on the channel
170
Other kombu options may be passed
173
options = {'durable': False,
176
options.update(kwargs)
177
exchange = kombu.entity.Exchange(
180
durable=options['durable'],
181
auto_delete=options['auto_delete'])
182
super(DirectConsumer, self).__init__(
192
class TopicConsumer(ConsumerBase):
193
"""Consumer class for 'topic'"""
195
def __init__(self, conf, channel, topic, callback, tag, name=None,
197
"""Init a 'topic' queue.
199
:param channel: the amqp channel to use
200
:param topic: the topic to listen on
201
:paramtype topic: str
202
:param callback: the callback to call when messages are received
203
:param tag: a unique ID for the consumer on the channel
204
:param name: optional queue name, defaults to topic
207
Other kombu options may be passed as keyword arguments
210
options = {'durable': conf.rabbit_durable_queues,
211
'auto_delete': False,
213
options.update(kwargs)
214
exchange = kombu.entity.Exchange(
215
name=conf.control_exchange,
217
durable=options['durable'],
218
auto_delete=options['auto_delete'])
219
super(TopicConsumer, self).__init__(
229
class FanoutConsumer(ConsumerBase):
230
"""Consumer class for 'fanout'"""
232
def __init__(self, conf, channel, topic, callback, tag, **kwargs):
233
"""Init a 'fanout' queue.
235
'channel' is the amqp channel to use
236
'topic' is the topic to listen on
237
'callback' is the callback to call when messages are received
238
'tag' is a unique ID for the consumer on the channel
240
Other kombu options may be passed
242
unique = uuid.uuid4().hex
243
exchange_name = '%s_fanout' % topic
244
queue_name = '%s_fanout_%s' % (topic, unique)
247
options = {'durable': False,
250
options.update(kwargs)
251
exchange = kombu.entity.Exchange(
254
durable=options['durable'],
255
auto_delete=options['auto_delete'])
256
super(FanoutConsumer, self).__init__(
266
class Publisher(object):
267
"""Base Publisher class"""
269
def __init__(self, channel, exchange_name, routing_key, **kwargs):
270
"""Init the Publisher class with the exchange_name, routing_key,
273
self.exchange_name = exchange_name
274
self.routing_key = routing_key
276
self.reconnect(channel)
278
def reconnect(self, channel):
279
"""Re-establish the Producer after a rabbit reconnection"""
280
self.exchange = kombu.entity.Exchange(name=self.exchange_name,
282
self.producer = kombu.messaging.Producer(exchange=self.exchange,
283
channel=channel, routing_key=self.routing_key)
287
self.producer.publish(msg)
290
class DirectPublisher(Publisher):
291
"""Publisher class for 'direct'"""
292
def __init__(self, conf, channel, msg_id, **kwargs):
293
"""init a 'direct' publisher.
295
Kombu options may be passed as keyword args to override defaults
298
options = {'durable': False,
301
options.update(kwargs)
302
super(DirectPublisher, self).__init__(channel,
309
class TopicPublisher(Publisher):
310
"""Publisher class for 'topic'"""
311
def __init__(self, conf, channel, topic, **kwargs):
312
"""init a 'topic' publisher.
314
Kombu options may be passed as keyword args to override defaults
316
options = {'durable': conf.rabbit_durable_queues,
317
'auto_delete': False,
319
options.update(kwargs)
320
super(TopicPublisher, self).__init__(channel,
321
conf.control_exchange,
327
class FanoutPublisher(Publisher):
328
"""Publisher class for 'fanout'"""
329
def __init__(self, conf, channel, topic, **kwargs):
330
"""init a 'fanout' publisher.
332
Kombu options may be passed as keyword args to override defaults
334
options = {'durable': False,
337
options.update(kwargs)
338
super(FanoutPublisher, self).__init__(channel,
345
class NotifyPublisher(TopicPublisher):
346
"""Publisher class for 'notify'"""
348
def __init__(self, conf, channel, topic, **kwargs):
    """Init a 'notify' publisher.

    The optional 'durable' keyword is removed from kwargs and kept on
    the instance as self.durable; when it is not supplied,
    conf.rabbit_durable_queues is used instead.  The remaining keyword
    arguments are forwarded unchanged to the parent initializer.
    """
    default_durable = conf.rabbit_durable_queues
    self.durable = kwargs.pop('durable', default_durable)
    super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs)
352
def reconnect(self, channel):
353
super(NotifyPublisher, self).reconnect(channel)
355
# NOTE(jerdfelt): Normally the consumer would create the queue, but
356
# we do this to ensure that messages don't get dropped if the
357
# consumer is started after we do
358
queue = kombu.entity.Queue(channel=channel,
359
exchange=self.exchange,
360
durable=self.durable,
361
name=self.routing_key,
362
routing_key=self.routing_key)
366
class Connection(object):
367
"""Connection object."""
371
def __init__(self, conf, server_params=None):
373
self.consumer_thread = None
375
self.max_retries = self.conf.rabbit_max_retries
377
if self.max_retries <= 0:
378
self.max_retries = None
379
self.interval_start = self.conf.rabbit_retry_interval
380
self.interval_stepping = self.conf.rabbit_retry_backoff
381
# max retry-interval = 30 seconds
382
self.interval_max = 30
383
self.memory_transport = False
385
if server_params is None:
388
# Keys to translate from server_params to kombu params
389
server_params_to_kombu_params = {'username': 'userid'}
392
for sp_key, value in server_params.iteritems():
393
p_key = server_params_to_kombu_params.get(sp_key, sp_key)
394
params[p_key] = value
396
params.setdefault('hostname', self.conf.rabbit_host)
397
params.setdefault('port', self.conf.rabbit_port)
398
params.setdefault('userid', self.conf.rabbit_userid)
399
params.setdefault('password', self.conf.rabbit_password)
400
params.setdefault('virtual_host', self.conf.rabbit_virtual_host)
404
if self.conf.fake_rabbit:
405
self.params['transport'] = 'memory'
406
self.memory_transport = True
408
self.memory_transport = False
410
if self.conf.rabbit_use_ssl:
411
self.params['ssl'] = self._fetch_ssl_params()
413
self.connection = None
416
def _fetch_ssl_params(self):
417
"""Handles fetching what ssl params
418
should be used for the connection (if any)"""
421
# http://docs.python.org/library/ssl.html - ssl.wrap_socket
422
if self.conf.kombu_ssl_version:
423
ssl_params['ssl_version'] = self.conf.kombu_ssl_version
424
if self.conf.kombu_ssl_keyfile:
425
ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile
426
if self.conf.kombu_ssl_certfile:
427
ssl_params['certfile'] = self.conf.kombu_ssl_certfile
428
if self.conf.kombu_ssl_ca_certs:
429
ssl_params['ca_certs'] = self.conf.kombu_ssl_ca_certs
430
# We might want to allow variations in the
432
ssl_params['cert_reqs'] = ssl.CERT_REQUIRED
435
# Just have the default behavior
438
# Return the extended behavior
442
"""Connect to rabbit. Re-establish any queues that may have
443
been declared before if we are reconnecting. Exceptions should
444
be handled by the caller.
447
LOG.info(_("Reconnecting to AMQP server on "
448
"%(hostname)s:%(port)d") % self.params)
450
self.connection.close()
451
except self.connection_errors:
453
# Setting this in case the next statement fails, though
454
# it shouldn't be doing any network operations, yet.
455
self.connection = None
456
self.connection = kombu.connection.BrokerConnection(
458
self.connection_errors = self.connection.connection_errors
459
if self.memory_transport:
460
# Kludge to speed up tests.
461
self.connection.transport.polling_interval = 0.0
462
self.consumer_num = itertools.count(1)
463
self.connection.connect()
464
self.channel = self.connection.channel()
465
# work around 'memory' transport bug in 1.1.3
466
if self.memory_transport:
467
self.channel._new_queue('ae.undeliver')
468
for consumer in self.consumers:
469
consumer.reconnect(self.channel)
470
LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d'),
474
"""Handles reconnecting and re-establishing queues.
475
Will retry up to self.max_retries number of times.
476
self.max_retries = 0 means to retry forever.
477
Sleep between tries, starting at self.interval_start
478
seconds, backing off self.interval_stepping number of seconds
488
except (self.connection_errors, IOError), e:
491
# NOTE(comstud): Unfortunately it's possible for amqplib
492
# to return an error not covered by its transport
493
# connection_errors in the case of a timeout waiting for
494
# a protocol response. (See paste link in LP888621)
495
# So, we check all exceptions for 'timeout' in them
496
# and try to reconnect in this case.
497
if 'timeout' not in str(e):
501
log_info['err_str'] = str(e)
502
log_info['max_retries'] = self.max_retries
503
log_info.update(self.params)
505
if self.max_retries and attempt == self.max_retries:
506
LOG.exception(_('Unable to connect to AMQP server on '
507
'%(hostname)s:%(port)d after %(max_retries)d '
508
'tries: %(err_str)s') % log_info)
509
# NOTE(comstud): Copied from original code. There's
510
# really no better recourse because if this was a queue we
511
# need to consume on, we have no way to consume anymore.
515
sleep_time = self.interval_start or 1
517
sleep_time += self.interval_stepping
518
if self.interval_max:
519
sleep_time = min(sleep_time, self.interval_max)
521
log_info['sleep_time'] = sleep_time
522
LOG.warn(_('AMQP server on %(hostname)s:%(port)d is'
523
' unreachable: %(err_str)s. Trying again in '
524
'%(sleep_time)d seconds.') % log_info)
525
time.sleep(sleep_time)
527
def ensure(self, error_callback, method, *args, **kwargs):
530
return method(*args, **kwargs)
531
except (self.connection_errors, socket.timeout, IOError), e:
534
# NOTE(comstud): Unfortunately it's possible for amqplib
535
# to return an error not covered by its transport
536
# connection_errors in the case of a timeout waiting for
537
# a protocol response. (See paste link in LP888621)
538
# So, we check all exceptions for 'timeout' in them
539
# and try to reconnect in this case.
540
if 'timeout' not in str(e):
546
def get_channel(self):
547
"""Convenience call for bin/clear_rabbit_queues"""
551
"""Close/release this connection"""
552
self.cancel_consumer_thread()
553
self.connection.release()
554
self.connection = None
557
"""Reset a connection so it can be used again"""
558
self.cancel_consumer_thread()
560
self.channel = self.connection.channel()
561
# work around 'memory' transport bug in 1.1.3
562
if self.memory_transport:
563
self.channel._new_queue('ae.undeliver')
566
def declare_consumer(self, consumer_cls, topic, callback):
567
"""Create a Consumer using the class that was passed in and
568
add it to our list of consumers
571
def _connect_error(exc):
572
log_info = {'topic': topic, 'err_str': str(exc)}
573
LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
574
"%(err_str)s") % log_info)
576
def _declare_consumer():
577
consumer = consumer_cls(self.conf, self.channel, topic, callback,
578
self.consumer_num.next())
579
self.consumers.append(consumer)
582
return self.ensure(_connect_error, _declare_consumer)
584
def iterconsume(self, limit=None, timeout=None):
585
"""Return an iterator that will consume from all queues/consumers"""
587
info = {'do_consume': True}
589
def _error_callback(exc):
590
if isinstance(exc, socket.timeout):
591
LOG.exception(_('Timed out waiting for RPC response: %s') %
593
raise rpc_common.Timeout()
595
LOG.exception(_('Failed to consume message from queue: %s') %
597
info['do_consume'] = True
600
if info['do_consume']:
601
queues_head = self.consumers[:-1]
602
queues_tail = self.consumers[-1]
603
for queue in queues_head:
604
queue.consume(nowait=True)
605
queues_tail.consume(nowait=False)
606
info['do_consume'] = False
607
return self.connection.drain_events(timeout=timeout)
609
for iteration in itertools.count(0):
610
if limit and iteration >= limit:
612
yield self.ensure(_error_callback, _consume)
614
def cancel_consumer_thread(self):
615
"""Cancel a consumer thread"""
616
if self.consumer_thread is not None:
617
self.consumer_thread.kill()
619
self.consumer_thread.wait()
620
except greenlet.GreenletExit:
622
self.consumer_thread = None
624
def publisher_send(self, cls, topic, msg, **kwargs):
625
"""Send to a publisher based on the publisher class"""
627
def _error_callback(exc):
628
log_info = {'topic': topic, 'err_str': str(exc)}
629
LOG.exception(_("Failed to publish message to topic "
630
"'%(topic)s': %(err_str)s") % log_info)
633
publisher = cls(self.conf, self.channel, topic, **kwargs)
636
self.ensure(_error_callback, _publish)
638
def declare_direct_consumer(self, topic, callback):
639
"""Create a 'direct' queue.
640
In nova's use, this is generally a msg_id queue used for
641
responses for call/multicall
643
self.declare_consumer(DirectConsumer, topic, callback)
645
def declare_topic_consumer(self, topic, callback=None, queue_name=None):
646
"""Create a 'topic' consumer."""
647
self.declare_consumer(functools.partial(TopicConsumer,
652
def declare_fanout_consumer(self, topic, callback):
    """Create a 'fanout' consumer.

    Delegates to declare_consumer() with the FanoutConsumer class,
    passing 'topic' and 'callback' through unchanged.
    """
    consumer_cls = FanoutConsumer
    self.declare_consumer(consumer_cls, topic, callback)
656
def direct_send(self, msg_id, msg):
    """Send a 'direct' message.

    Delegates to publisher_send() with the DirectPublisher class; the
    msg_id is passed in the position publisher_send() names 'topic'.
    """
    publisher_cls = DirectPublisher
    self.publisher_send(publisher_cls, msg_id, msg)
660
def topic_send(self, topic, msg):
    """Send a 'topic' message.

    Delegates to publisher_send() with the TopicPublisher class.
    """
    publisher_cls = TopicPublisher
    self.publisher_send(publisher_cls, topic, msg)
664
def fanout_send(self, topic, msg):
    """Send a 'fanout' message.

    Delegates to publisher_send() with the FanoutPublisher class.
    """
    publisher_cls = FanoutPublisher
    self.publisher_send(publisher_cls, topic, msg)
668
def notify_send(self, topic, msg, **kwargs):
    """Send a notify message on a topic.

    Delegates to publisher_send() with the NotifyPublisher class; any
    extra keyword arguments are forwarded to publisher_send() as-is.
    """
    publisher_cls = NotifyPublisher
    self.publisher_send(publisher_cls, topic, msg, **kwargs)
672
def consume(self, limit=None):
673
"""Consume from all queues/consumers"""
674
it = self.iterconsume(limit=limit)
678
except StopIteration:
681
def consume_in_thread(self):
682
"""Consumer from all queues/consumers in a greenthread"""
683
def _consumer_thread():
686
except greenlet.GreenletExit:
688
if self.consumer_thread is None:
689
self.consumer_thread = eventlet.spawn(_consumer_thread)
690
return self.consumer_thread
692
def create_consumer(self, topic, proxy, fanout=False):
693
"""Create a consumer that calls a method in a proxy object"""
694
proxy_cb = rpc_amqp.ProxyCallback(self.conf, proxy,
695
rpc_amqp.get_connection_pool(self.conf, Connection))
698
self.declare_fanout_consumer(topic, proxy_cb)
700
self.declare_topic_consumer(topic, proxy_cb)
702
def create_worker(self, topic, proxy, pool_name):
    """Create a worker that calls a method in a proxy object.

    Wraps 'proxy' in an rpc_amqp.ProxyCallback bound to the connection
    pool for this configuration, then declares a topic consumer for
    'topic'.  'pool_name' is handed to declare_topic_consumer() as its
    third positional argument (the queue name).
    """
    connection_pool = rpc_amqp.get_connection_pool(self.conf, Connection)
    worker_callback = rpc_amqp.ProxyCallback(self.conf, proxy,
                                             connection_pool)
    self.declare_topic_consumer(topic, worker_callback, pool_name)
709
def create_connection(conf, new=True):
    """Create a connection.

    Looks up the connection pool for 'conf' and this module's
    Connection class, then passes 'conf', 'new' and that pool to
    rpc_amqp.create_connection() and returns its result.  The meaning
    of 'new' is defined by that helper.
    """
    connection_pool = rpc_amqp.get_connection_pool(conf, Connection)
    return rpc_amqp.create_connection(conf, new, connection_pool)
715
def multicall(conf, context, topic, msg, timeout=None):
    """Make a call that returns multiple times.

    Thin wrapper: forwards all arguments, plus the connection pool for
    this module's Connection class, to rpc_amqp.multicall() and returns
    whatever that helper returns.
    """
    connection_pool = rpc_amqp.get_connection_pool(conf, Connection)
    return rpc_amqp.multicall(conf, context, topic, msg, timeout,
                              connection_pool)
721
def call(conf, context, topic, msg, timeout=None):
    """Send a message on a topic and wait for a response.

    Thin wrapper: forwards all arguments, plus the connection pool for
    this module's Connection class, to rpc_amqp.call() and returns its
    result.
    """
    connection_pool = rpc_amqp.get_connection_pool(conf, Connection)
    return rpc_amqp.call(conf, context, topic, msg, timeout,
                         connection_pool)
727
def cast(conf, context, topic, msg):
    """Send a message on a topic without waiting for a response.

    Thin wrapper: forwards all arguments, plus the connection pool for
    this module's Connection class, to rpc_amqp.cast() and returns its
    result.
    """
    connection_pool = rpc_amqp.get_connection_pool(conf, Connection)
    return rpc_amqp.cast(conf, context, topic, msg, connection_pool)
733
def fanout_cast(conf, context, topic, msg):
    """Send a message on a fanout exchange without waiting for a response.

    Thin wrapper: forwards all arguments, plus the connection pool for
    this module's Connection class, to rpc_amqp.fanout_cast() and
    returns its result.
    """
    connection_pool = rpc_amqp.get_connection_pool(conf, Connection)
    return rpc_amqp.fanout_cast(conf, context, topic, msg, connection_pool)
739
def cast_to_server(conf, context, server_params, topic, msg):
    """Send a message on a topic to a specific server.

    Thin wrapper: forwards all arguments, plus the connection pool for
    this module's Connection class, to rpc_amqp.cast_to_server() and
    returns its result.
    """
    connection_pool = rpc_amqp.get_connection_pool(conf, Connection)
    return rpc_amqp.cast_to_server(conf, context, server_params, topic,
                                   msg, connection_pool)
745
def fanout_cast_to_server(conf, context, server_params, topic, msg):
    """Send a message on a fanout exchange to a specific server.

    Forwards all arguments, plus the connection pool for this module's
    Connection class, to rpc_amqp.fanout_cast_to_server() and returns
    its result.
    """
    connection_pool = rpc_amqp.get_connection_pool(conf, Connection)
    # NOTE: this must call the fanout variant.  Delegating to
    # rpc_amqp.cast_to_server() here would publish the message as a
    # plain topic cast and never reach the fanout exchange.
    # NOTE(review): assumes rpc_amqp.fanout_cast_to_server takes the same
    # arguments as rpc_amqp.cast_to_server -- confirm against that module.
    return rpc_amqp.fanout_cast_to_server(conf, context, server_params,
                                          topic, msg, connection_pool)
751
def notify(conf, context, topic, msg):
    """Send a notification event on a topic.

    Thin wrapper: forwards all arguments, plus the connection pool for
    this module's Connection class, to rpc_amqp.notify() and returns
    its result.
    """
    connection_pool = rpc_amqp.get_connection_pool(conf, Connection)
    return rpc_amqp.notify(conf, context, topic, msg, connection_pool)
758
return rpc_amqp.cleanup(Connection.pool)