1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
3
# Copyright 2011 OpenStack LLC
4
# Copyright 2011 - 2012, Red Hat, Inc.
6
# Licensed under the Apache License, Version 2.0 (the "License"); you may
7
# not use this file except in compliance with the License. You may obtain
8
# a copy of the License at
10
# http://www.apache.org/licenses/LICENSE-2.0
12
# Unless required by applicable law or agreed to in writing, software
13
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15
# License for the specific language governing permissions and limitations
28
import qpid.messaging.exceptions
30
from nova.openstack.common import cfg
31
from nova.openstack.common.gettextutils import _
32
from nova.openstack.common.rpc import amqp as rpc_amqp
33
from nova.openstack.common.rpc import common as rpc_common
35
LOG = logging.getLogger(__name__)
38
cfg.StrOpt('qpid_hostname',
40
help='Qpid broker hostname'),
41
cfg.StrOpt('qpid_port',
43
help='Qpid broker port'),
44
cfg.StrOpt('qpid_username',
46
help='Username for qpid connection'),
47
cfg.StrOpt('qpid_password',
49
help='Password for qpid connection'),
50
cfg.StrOpt('qpid_sasl_mechanisms',
52
help='Space separated list of SASL mechanisms to use for auth'),
53
cfg.BoolOpt('qpid_reconnect',
55
help='Automatically reconnect'),
56
cfg.IntOpt('qpid_reconnect_timeout',
58
help='Reconnection timeout in seconds'),
59
cfg.IntOpt('qpid_reconnect_limit',
61
help='Max reconnections before giving up'),
62
cfg.IntOpt('qpid_reconnect_interval_min',
64
help='Minimum seconds between reconnection attempts'),
65
cfg.IntOpt('qpid_reconnect_interval_max',
67
help='Maximum seconds between reconnection attempts'),
68
cfg.IntOpt('qpid_reconnect_interval',
70
help='Equivalent to setting max and min to the same value'),
71
cfg.IntOpt('qpid_heartbeat',
73
help='Seconds between connection keepalive heartbeats'),
74
cfg.StrOpt('qpid_protocol',
76
help="Transport to use, either 'tcp' or 'ssl'"),
77
cfg.BoolOpt('qpid_tcp_nodelay',
79
help='Disable Nagle algorithm'),
82
cfg.CONF.register_opts(qpid_opts)
85
class ConsumerBase(object):
86
"""Consumer base class."""
88
def __init__(self, session, callback, node_name, node_opts,
89
link_name, link_opts):
90
"""Declare a queue on an amqp session.
92
'session' is the amqp session to use
93
'callback' is the callback to call when messages are received
94
'node_name' is the first part of the Qpid address string, before ';'
95
'node_opts' will be applied to the "x-declare" section of "node"
96
in the address string.
97
'link_name' goes into the "name" field of the "link" in the address
99
'link_opts' will be applied to the "x-declare" section of "link"
100
in the address string.
102
self.callback = callback
125
addr_opts["node"]["x-declare"].update(node_opts)
126
addr_opts["link"]["x-declare"].update(link_opts)
128
self.address = "%s ; %s" % (node_name, json.dumps(addr_opts))
130
self.reconnect(session)
132
def reconnect(self, session):
133
"""Re-declare the receiver after a qpid reconnect"""
134
self.session = session
135
self.receiver = session.receiver(self.address)
136
self.receiver.capacity = 1
139
"""Fetch the message and pass it to the callback object"""
140
message = self.receiver.fetch()
142
self.callback(message.content)
144
LOG.exception(_("Failed to process message... skipping it."))
146
self.session.acknowledge(message)
148
def get_receiver(self):
152
class DirectConsumer(ConsumerBase):
153
"""Queue/consumer class for 'direct'"""
155
def __init__(self, conf, session, msg_id, callback):
156
"""Init a 'direct' queue.
158
'session' is the amqp session to use
159
'msg_id' is the msg_id to listen on
160
'callback' is the callback to call when messages are received
163
super(DirectConsumer, self).__init__(session, callback,
164
"%s/%s" % (msg_id, msg_id),
170
class TopicConsumer(ConsumerBase):
171
"""Consumer class for 'topic'"""
173
def __init__(self, conf, session, topic, callback, name=None):
174
"""Init a 'topic' queue.
176
:param session: the amqp session to use
177
:param topic: is the topic to listen on
178
:paramtype topic: str
179
:param callback: the callback to call when messages are received
180
:param name: optional queue name, defaults to topic
183
super(TopicConsumer, self).__init__(session, callback,
184
"%s/%s" % (conf.control_exchange, topic), {},
188
class FanoutConsumer(ConsumerBase):
189
"""Consumer class for 'fanout'"""
191
def __init__(self, conf, session, topic, callback):
192
"""Init a 'fanout' queue.
194
'session' is the amqp session to use
195
'topic' is the topic to listen on
196
'callback' is the callback to call when messages are received
199
super(FanoutConsumer, self).__init__(session, callback,
201
{"durable": False, "type": "fanout"},
202
"%s_fanout_%s" % (topic, uuid.uuid4().hex),
206
class Publisher(object):
207
"""Base Publisher class"""
209
def __init__(self, session, node_name, node_opts=None):
210
"""Init the Publisher class with the exchange_name, routing_key,
214
self.session = session
222
# auto-delete isn't implemented for exchanges in qpid,
223
# but put in here anyway
229
addr_opts["node"]["x-declare"].update(node_opts)
231
self.address = "%s ; %s" % (node_name, json.dumps(addr_opts))
233
self.reconnect(session)
235
def reconnect(self, session):
236
"""Re-establish the Sender after a reconnection"""
237
self.sender = session.sender(self.address)
241
self.sender.send(msg)
244
class DirectPublisher(Publisher):
245
"""Publisher class for 'direct'"""
246
def __init__(self, conf, session, msg_id):
247
"""Init a 'direct' publisher."""
248
super(DirectPublisher, self).__init__(session, msg_id,
252
class TopicPublisher(Publisher):
253
"""Publisher class for 'topic'"""
254
def __init__(self, conf, session, topic):
255
"""init a 'topic' publisher.
257
super(TopicPublisher, self).__init__(session,
258
"%s/%s" % (conf.control_exchange, topic))
261
class FanoutPublisher(Publisher):
262
"""Publisher class for 'fanout'"""
263
def __init__(self, conf, session, topic):
264
"""init a 'fanout' publisher.
266
super(FanoutPublisher, self).__init__(session,
267
"%s_fanout" % topic, {"type": "fanout"})
270
class NotifyPublisher(Publisher):
271
"""Publisher class for notifications"""
272
def __init__(self, conf, session, topic):
273
"""init a 'topic' publisher.
275
super(NotifyPublisher, self).__init__(session,
276
"%s/%s" % (conf.control_exchange, topic),
280
class Connection(object):
281
"""Connection object."""
285
def __init__(self, conf, server_params=None):
288
self.consumer_thread = None
291
if server_params is None:
294
default_params = dict(hostname=self.conf.qpid_hostname,
295
port=self.conf.qpid_port,
296
username=self.conf.qpid_username,
297
password=self.conf.qpid_password)
299
params = server_params
300
for key in default_params.keys():
301
params.setdefault(key, default_params[key])
303
self.broker = params['hostname'] + ":" + str(params['port'])
304
# Create the connection - this does not open the connection
305
self.connection = qpid.messaging.Connection(self.broker)
307
# Check if flags are set and if so set them for the connection
308
# before we call open
309
self.connection.username = params['username']
310
self.connection.password = params['password']
311
self.connection.sasl_mechanisms = self.conf.qpid_sasl_mechanisms
312
self.connection.reconnect = self.conf.qpid_reconnect
313
if self.conf.qpid_reconnect_timeout:
314
self.connection.reconnect_timeout = (
315
self.conf.qpid_reconnect_timeout)
316
if self.conf.qpid_reconnect_limit:
317
self.connection.reconnect_limit = self.conf.qpid_reconnect_limit
318
if self.conf.qpid_reconnect_interval_max:
319
self.connection.reconnect_interval_max = (
320
self.conf.qpid_reconnect_interval_max)
321
if self.conf.qpid_reconnect_interval_min:
322
self.connection.reconnect_interval_min = (
323
self.conf.qpid_reconnect_interval_min)
324
if self.conf.qpid_reconnect_interval:
325
self.connection.reconnect_interval = (
326
self.conf.qpid_reconnect_interval)
327
self.connection.hearbeat = self.conf.qpid_heartbeat
328
self.connection.protocol = self.conf.qpid_protocol
329
self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay
331
# Open is part of reconnect -
332
# NOTE(WGH) not sure we need this with the reconnect flags
335
def _register_consumer(self, consumer):
336
self.consumers[str(consumer.get_receiver())] = consumer
338
def _lookup_consumer(self, receiver):
339
return self.consumers[str(receiver)]
342
"""Handles reconnecting and re-establishing sessions and queues"""
343
if self.connection.opened():
345
self.connection.close()
346
except qpid.messaging.exceptions.ConnectionError:
351
self.connection.open()
352
except qpid.messaging.exceptions.ConnectionError, e:
353
LOG.error(_('Unable to connect to AMQP server: %s'), e)
354
time.sleep(self.conf.qpid_reconnect_interval or 1)
358
LOG.info(_('Connected to AMQP server on %s'), self.broker)
360
self.session = self.connection.session()
362
for consumer in self.consumers.itervalues():
363
consumer.reconnect(self.session)
366
LOG.debug(_("Re-established AMQP queues"))
368
def ensure(self, error_callback, method, *args, **kwargs):
371
return method(*args, **kwargs)
372
except (qpid.messaging.exceptions.Empty,
373
qpid.messaging.exceptions.ConnectionError), e:
379
"""Close/release this connection"""
380
self.cancel_consumer_thread()
381
self.connection.close()
382
self.connection = None
385
"""Reset a connection so it can be used again"""
386
self.cancel_consumer_thread()
388
self.session = self.connection.session()
391
def declare_consumer(self, consumer_cls, topic, callback):
392
"""Create a Consumer using the class that was passed in and
393
add it to our list of consumers
395
def _connect_error(exc):
396
log_info = {'topic': topic, 'err_str': str(exc)}
397
LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
398
"%(err_str)s") % log_info)
400
def _declare_consumer():
401
consumer = consumer_cls(self.conf, self.session, topic, callback)
402
self._register_consumer(consumer)
405
return self.ensure(_connect_error, _declare_consumer)
407
def iterconsume(self, limit=None, timeout=None):
408
"""Return an iterator that will consume from all queues/consumers"""
410
def _error_callback(exc):
411
if isinstance(exc, qpid.messaging.exceptions.Empty):
412
LOG.exception(_('Timed out waiting for RPC response: %s') %
414
raise rpc_common.Timeout()
416
LOG.exception(_('Failed to consume message from queue: %s') %
420
nxt_receiver = self.session.next_receiver(timeout=timeout)
422
self._lookup_consumer(nxt_receiver).consume()
424
LOG.exception(_("Error processing message. Skipping it."))
426
for iteration in itertools.count(0):
427
if limit and iteration >= limit:
429
yield self.ensure(_error_callback, _consume)
431
def cancel_consumer_thread(self):
432
"""Cancel a consumer thread"""
433
if self.consumer_thread is not None:
434
self.consumer_thread.kill()
436
self.consumer_thread.wait()
437
except greenlet.GreenletExit:
439
self.consumer_thread = None
441
def publisher_send(self, cls, topic, msg):
442
"""Send to a publisher based on the publisher class"""
444
def _connect_error(exc):
445
log_info = {'topic': topic, 'err_str': str(exc)}
446
LOG.exception(_("Failed to publish message to topic "
447
"'%(topic)s': %(err_str)s") % log_info)
449
def _publisher_send():
450
publisher = cls(self.conf, self.session, topic)
453
return self.ensure(_connect_error, _publisher_send)
455
def declare_direct_consumer(self, topic, callback):
456
"""Create a 'direct' queue.
457
In nova's use, this is generally a msg_id queue used for
458
responses for call/multicall
460
self.declare_consumer(DirectConsumer, topic, callback)
462
def declare_topic_consumer(self, topic, callback=None, queue_name=None):
463
"""Create a 'topic' consumer."""
464
self.declare_consumer(functools.partial(TopicConsumer,
469
def declare_fanout_consumer(self, topic, callback):
470
"""Create a 'fanout' consumer"""
471
self.declare_consumer(FanoutConsumer, topic, callback)
473
def direct_send(self, msg_id, msg):
474
"""Send a 'direct' message"""
475
self.publisher_send(DirectPublisher, msg_id, msg)
477
def topic_send(self, topic, msg):
478
"""Send a 'topic' message"""
479
self.publisher_send(TopicPublisher, topic, msg)
481
def fanout_send(self, topic, msg):
482
"""Send a 'fanout' message"""
483
self.publisher_send(FanoutPublisher, topic, msg)
485
def notify_send(self, topic, msg, **kwargs):
486
"""Send a notify message on a topic"""
487
self.publisher_send(NotifyPublisher, topic, msg)
489
def consume(self, limit=None):
490
"""Consume from all queues/consumers"""
491
it = self.iterconsume(limit=limit)
495
except StopIteration:
498
def consume_in_thread(self):
499
"""Consumer from all queues/consumers in a greenthread"""
500
def _consumer_thread():
503
except greenlet.GreenletExit:
505
if self.consumer_thread is None:
506
self.consumer_thread = eventlet.spawn(_consumer_thread)
507
return self.consumer_thread
509
def create_consumer(self, topic, proxy, fanout=False):
510
"""Create a consumer that calls a method in a proxy object"""
511
proxy_cb = rpc_amqp.ProxyCallback(self.conf, proxy,
512
rpc_amqp.get_connection_pool(self.conf, Connection))
515
consumer = FanoutConsumer(self.conf, self.session, topic, proxy_cb)
517
consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb)
519
self._register_consumer(consumer)
523
def create_worker(self, topic, proxy, pool_name):
524
"""Create a worker that calls a method in a proxy object"""
525
proxy_cb = rpc_amqp.ProxyCallback(self.conf, proxy,
526
rpc_amqp.get_connection_pool(self.conf, Connection))
528
consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb,
531
self._register_consumer(consumer)
536
def create_connection(conf, new=True):
537
"""Create a connection"""
538
return rpc_amqp.create_connection(conf, new,
539
rpc_amqp.get_connection_pool(conf, Connection))
542
def multicall(conf, context, topic, msg, timeout=None):
543
"""Make a call that returns multiple times."""
544
return rpc_amqp.multicall(conf, context, topic, msg, timeout,
545
rpc_amqp.get_connection_pool(conf, Connection))
548
def call(conf, context, topic, msg, timeout=None):
549
"""Sends a message on a topic and wait for a response."""
550
return rpc_amqp.call(conf, context, topic, msg, timeout,
551
rpc_amqp.get_connection_pool(conf, Connection))
554
def cast(conf, context, topic, msg):
555
"""Sends a message on a topic without waiting for a response."""
556
return rpc_amqp.cast(conf, context, topic, msg,
557
rpc_amqp.get_connection_pool(conf, Connection))
560
def fanout_cast(conf, context, topic, msg):
561
"""Sends a message on a fanout exchange without waiting for a response."""
562
return rpc_amqp.fanout_cast(conf, context, topic, msg,
563
rpc_amqp.get_connection_pool(conf, Connection))
566
def cast_to_server(conf, context, server_params, topic, msg):
567
"""Sends a message on a topic to a specific server."""
568
return rpc_amqp.cast_to_server(conf, context, server_params, topic, msg,
569
rpc_amqp.get_connection_pool(conf, Connection))
572
def fanout_cast_to_server(conf, context, server_params, topic, msg):
573
"""Sends a message on a fanout exchange to a specific server."""
574
return rpc_amqp.fanout_cast_to_server(conf, context, server_params, topic,
575
msg, rpc_amqp.get_connection_pool(conf, Connection))
578
def notify(conf, context, topic, msg):
579
"""Sends a notification event on a topic."""
580
return rpc_amqp.notify(conf, context, topic, msg,
581
rpc_amqp.get_connection_pool(conf, Connection))
585
return rpc_amqp.cleanup(Connection.pool)