1
1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
3
# Copyright 2011 OpenStack LLC
3
# Copyright 2011 OpenStack Foundation
4
4
# Copyright 2011 - 2012, Red Hat, Inc.
6
6
# Licensed under the Apache License, Version 2.0 (the "License"); you may
31
31
from heat.openstack.common.rpc import amqp as rpc_amqp
32
32
from heat.openstack.common.rpc import common as rpc_common
34
qpid_codec = importutils.try_import("qpid.codec010")
34
35
qpid_messaging = importutils.try_import("qpid.messaging")
35
36
qpid_exceptions = importutils.try_import("qpid.messaging.exceptions")
40
41
cfg.StrOpt('qpid_hostname',
41
42
default='localhost',
42
43
help='Qpid broker hostname'),
43
cfg.StrOpt('qpid_port',
44
cfg.IntOpt('qpid_port',
45
46
help='Qpid broker port'),
46
47
cfg.ListOpt('qpid_hosts',
47
48
default=['$qpid_hostname:$qpid_port'],
51
52
help='Username for qpid connection'),
52
53
cfg.StrOpt('qpid_password',
54
help='Password for qpid connection'),
55
help='Password for qpid connection',
55
57
cfg.StrOpt('qpid_sasl_mechanisms',
57
59
help='Space separated list of SASL mechanisms to use for auth'),
117
121
self.reconnect(session)
119
123
def reconnect(self, session):
120
"""Re-declare the receiver after a qpid reconnect"""
124
"""Re-declare the receiver after a qpid reconnect."""
121
125
self.session = session
122
126
self.receiver = session.receiver(self.address)
123
127
self.receiver.capacity = 1
129
def _unpack_json_msg(self, msg):
130
"""Load the JSON data in msg if msg.content_type indicates that it
131
is necessary. Put the loaded data back into msg.content and
132
update msg.content_type appropriately.
134
A Qpid Message containing a dict will have a content_type of
135
'amqp/map', whereas one containing a string that needs to be converted
136
back from JSON will have a content_type of JSON_CONTENT_TYPE.
138
:param msg: a Qpid Message object
141
if msg.content_type == JSON_CONTENT_TYPE:
142
msg.content = jsonutils.loads(msg.content)
143
msg.content_type = 'amqp/map'
125
145
def consume(self):
126
"""Fetch the message and pass it to the callback object"""
146
"""Fetch the message and pass it to the callback object."""
127
147
message = self.receiver.fetch()
149
self._unpack_json_msg(message)
129
150
msg = rpc_common.deserialize_msg(message.content)
130
151
self.callback(msg)
131
152
except Exception:
140
161
class DirectConsumer(ConsumerBase):
141
"""Queue/consumer class for 'direct'"""
162
"""Queue/consumer class for 'direct'."""
143
164
def __init__(self, conf, session, msg_id, callback):
144
165
"""Init a 'direct' queue.
158
179
class TopicConsumer(ConsumerBase):
159
"""Consumer class for 'topic'"""
180
"""Consumer class for 'topic'."""
161
182
def __init__(self, conf, session, topic, callback, name=None,
162
183
exchange_name=None):
178
199
class FanoutConsumer(ConsumerBase):
179
"""Consumer class for 'fanout'"""
200
"""Consumer class for 'fanout'."""
181
202
def __init__(self, conf, session, topic, callback):
182
203
"""Init a 'fanout' queue.
197
218
class Publisher(object):
198
"""Base Publisher class"""
219
"""Base Publisher class."""
200
221
def __init__(self, session, node_name, node_opts=None):
201
222
"""Init the Publisher class with the exchange_name, routing_key,
224
245
self.reconnect(session)
226
247
def reconnect(self, session):
227
"""Re-establish the Sender after a reconnection"""
248
"""Re-establish the Sender after a reconnection."""
228
249
self.sender = session.sender(self.address)
251
def _pack_json_msg(self, msg):
252
"""Qpid cannot serialize dicts containing strings longer than 65535
253
characters. This function dumps the message content to a JSON
254
string, which Qpid is able to handle.
256
:param msg: May be either a Qpid Message object or a bare dict.
257
:returns: A Qpid Message with its content field JSON encoded.
260
msg.content = jsonutils.dumps(msg.content)
261
except AttributeError:
262
# Need to have a Qpid message so we can set the content_type.
263
msg = qpid_messaging.Message(jsonutils.dumps(msg))
264
msg.content_type = JSON_CONTENT_TYPE
230
267
def send(self, msg):
268
"""Send a message."""
270
# Check if Qpid can encode the message
272
if not hasattr(check_msg, 'content_type'):
273
check_msg = qpid_messaging.Message(msg)
274
content_type = check_msg.content_type
275
enc, dec = qpid_messaging.message.get_codec(content_type)
276
enc(check_msg.content)
277
except qpid_codec.CodecException:
278
# This means the message couldn't be serialized as a dict.
279
msg = self._pack_json_msg(msg)
232
280
self.sender.send(msg)
235
283
class DirectPublisher(Publisher):
236
"""Publisher class for 'direct'"""
284
"""Publisher class for 'direct'."""
237
285
def __init__(self, conf, session, msg_id):
238
286
"""Init a 'direct' publisher."""
239
287
super(DirectPublisher, self).__init__(session, msg_id,
243
291
class TopicPublisher(Publisher):
244
"""Publisher class for 'topic'"""
292
"""Publisher class for 'topic'."""
245
293
def __init__(self, conf, session, topic):
246
294
"""init a 'topic' publisher.
253
301
class FanoutPublisher(Publisher):
254
"""Publisher class for 'fanout'"""
302
"""Publisher class for 'fanout'."""
255
303
def __init__(self, conf, session, topic):
256
304
"""init a 'fanout' publisher.
263
311
class NotifyPublisher(Publisher):
264
"""Publisher class for notifications"""
312
"""Publisher class for notifications."""
265
313
def __init__(self, conf, session, topic):
266
314
"""init a 'topic' publisher.
319
367
# Reconnection is done by self.reconnect()
320
368
self.connection.reconnect = False
321
369
self.connection.heartbeat = self.conf.qpid_heartbeat
322
self.connection.protocol = self.conf.qpid_protocol
370
self.connection.transport = self.conf.qpid_protocol
323
371
self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay
325
373
def _register_consumer(self, consumer):
329
377
return self.consumers[str(receiver)]
331
379
def reconnect(self):
332
"""Handles reconnecting and re-establishing sessions and queues"""
333
if self.connection.opened():
335
self.connection.close()
336
except qpid_exceptions.ConnectionError:
380
"""Handles reconnecting and re-establishing sessions and queues."""
384
# Close the session if necessary
385
if self.connection.opened():
387
self.connection.close()
388
except qpid_exceptions.ConnectionError:
342
391
broker = self.brokers[attempt % len(self.brokers)]
346
395
self.connection_create(broker)
347
396
self.connection.open()
348
except qpid_exceptions.ConnectionError, e:
397
except qpid_exceptions.ConnectionError as e:
349
398
msg_dict = dict(e=e, delay=delay)
350
399
msg = _("Unable to connect to AMQP server: %(e)s. "
351
400
"Sleeping %(delay)s seconds") % msg_dict
374
423
return method(*args, **kwargs)
375
424
except (qpid_exceptions.Empty,
376
qpid_exceptions.ConnectionError), e:
425
qpid_exceptions.ConnectionError) as e:
377
426
if error_callback:
378
427
error_callback(e)
382
"""Close/release this connection"""
431
"""Close/release this connection."""
383
432
self.cancel_consumer_thread()
384
433
self.wait_on_proxy_callbacks()
385
self.connection.close()
435
self.connection.close()
437
# NOTE(dripton) Logging exceptions that happen during cleanup just
438
# causes confusion; there's really nothing useful we can do with
386
441
self.connection = None
389
"""Reset a connection so it can be used again"""
444
"""Reset a connection so it can be used again."""
390
445
self.cancel_consumer_thread()
391
446
self.wait_on_proxy_callbacks()
392
447
self.session.close()
410
465
return self.ensure(_connect_error, _declare_consumer)
412
467
def iterconsume(self, limit=None, timeout=None):
413
"""Return an iterator that will consume from all queues/consumers"""
468
"""Return an iterator that will consume from all queues/consumers."""
415
470
def _error_callback(exc):
416
471
if isinstance(exc, qpid_exceptions.Empty):
417
LOG.exception(_('Timed out waiting for RPC response: %s') %
472
LOG.debug(_('Timed out waiting for RPC response: %s') %
419
474
raise rpc_common.Timeout()
421
476
LOG.exception(_('Failed to consume message from queue: %s') %
434
489
yield self.ensure(_error_callback, _consume)
436
491
def cancel_consumer_thread(self):
437
"""Cancel a consumer thread"""
492
"""Cancel a consumer thread."""
438
493
if self.consumer_thread is not None:
439
494
self.consumer_thread.kill()
451
506
def publisher_send(self, cls, topic, msg):
452
"""Send to a publisher based on the publisher class"""
507
"""Send to a publisher based on the publisher class."""
454
509
def _connect_error(exc):
455
510
log_info = {'topic': topic, 'err_str': str(exc)}
481
536
def declare_fanout_consumer(self, topic, callback):
482
"""Create a 'fanout' consumer"""
537
"""Create a 'fanout' consumer."""
483
538
self.declare_consumer(FanoutConsumer, topic, callback)
485
540
def direct_send(self, msg_id, msg):
486
"""Send a 'direct' message"""
541
"""Send a 'direct' message."""
487
542
self.publisher_send(DirectPublisher, msg_id, msg)
489
544
def topic_send(self, topic, msg, timeout=None):
490
"""Send a 'topic' message"""
545
"""Send a 'topic' message."""
492
547
# We want to create a message with attributes, e.g. a TTL. We
493
548
# don't really need to keep 'msg' in its JSON format any longer
502
557
self.publisher_send(TopicPublisher, topic, qpid_message)
504
559
def fanout_send(self, topic, msg):
505
"""Send a 'fanout' message"""
560
"""Send a 'fanout' message."""
506
561
self.publisher_send(FanoutPublisher, topic, msg)
508
563
def notify_send(self, topic, msg, **kwargs):
509
"""Send a notify message on a topic"""
564
"""Send a notify message on a topic."""
510
565
self.publisher_send(NotifyPublisher, topic, msg)
512
567
def consume(self, limit=None):
513
"""Consume from all queues/consumers"""
568
"""Consume from all queues/consumers."""
514
569
it = self.iterconsume(limit=limit)
530
585
return self.consumer_thread
532
587
def create_consumer(self, topic, proxy, fanout=False):
533
"""Create a consumer that calls a method in a proxy object"""
588
"""Create a consumer that calls a method in a proxy object."""
534
589
proxy_cb = rpc_amqp.ProxyCallback(
535
590
self.conf, proxy,
536
591
rpc_amqp.get_connection_pool(self.conf, Connection))
548
603
def create_worker(self, topic, proxy, pool_name):
549
"""Create a worker that calls a method in a proxy object"""
604
"""Create a worker that calls a method in a proxy object."""
550
605
proxy_cb = rpc_amqp.ProxyCallback(
551
606
self.conf, proxy,
552
607
rpc_amqp.get_connection_pool(self.conf, Connection))
617
def join_consumer_pool(self, callback, pool_name, topic,
619
"""Register as a member of a group of consumers for a given topic from
620
the specified exchange.
622
Exactly one member of a given pool will receive each message.
624
A message will be delivered to multiple pools, if more than
627
callback_wrapper = rpc_amqp.CallbackWrapper(
630
connection_pool=rpc_amqp.get_connection_pool(self.conf,
633
self.proxy_callbacks.append(callback_wrapper)
635
consumer = TopicConsumer(conf=self.conf,
636
session=self.session,
638
callback=callback_wrapper,
640
exchange_name=exchange_name)
642
self._register_consumer(consumer)
563
646
def create_connection(conf, new=True):
564
"""Create a connection"""
647
"""Create a connection."""
565
648
return rpc_amqp.create_connection(
567
650
rpc_amqp.get_connection_pool(conf, Connection))