25
25
from oslo.config import cfg
27
from heat.openstack.common.gettextutils import _
27
from heat.openstack.common import excutils
28
from heat.openstack.common.gettextutils import _ # noqa
28
29
from heat.openstack.common import importutils
29
30
from heat.openstack.common import jsonutils
30
31
from heat.openstack.common import log as logging
66
67
cfg.BoolOpt('qpid_tcp_nodelay',
68
69
help='Disable Nagle algorithm'),
70
# NOTE(russellb) If any additional versions are added (beyond 1 and 2),
71
# this file could probably use some additional refactoring so that the
72
# differences between each version are split into different classes.
73
cfg.IntOpt('qpid_topology_version',
75
help="The qpid topology version to use. Version 1 is what "
76
"was originally used by impl_qpid. Version 2 includes "
77
"some backwards-incompatible changes that allow broker "
78
"federation to work. Users should update to version 2 "
79
"when they are able to take everything down, as it "
80
"requires a clean break."),
71
83
cfg.CONF.register_opts(qpid_opts)
73
85
JSON_CONTENT_TYPE = 'application/json; charset=utf8'
88
def raise_invalid_topology_version(conf):
89
msg = (_("Invalid value for qpid_topology_version: %d") %
90
conf.qpid_topology_version)
76
95
class ConsumerBase(object):
77
96
"""Consumer base class."""
79
def __init__(self, session, callback, node_name, node_opts,
98
def __init__(self, conf, session, callback, node_name, node_opts,
80
99
link_name, link_opts):
81
100
"""Declare a queue on an amqp session.
94
113
self.receiver = None
95
114
self.session = None
116
if conf.qpid_topology_version == 1:
116
addr_opts["node"]["x-declare"].update(node_opts)
136
addr_opts["node"]["x-declare"].update(node_opts)
137
elif conf.qpid_topology_version == 2:
146
raise_invalid_topology_version()
117
148
addr_opts["link"]["x-declare"].update(link_opts)
119
150
self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
121
self.reconnect(session)
152
self.connect(session)
154
def connect(self, session):
155
"""Declare the reciever on connect."""
156
self._declare_receiver(session)
123
158
def reconnect(self, session):
124
159
"""Re-declare the receiver after a qpid reconnect."""
160
self._declare_receiver(session)
162
def _declare_receiver(self, session):
125
163
self.session = session
126
164
self.receiver = session.receiver(self.address)
127
165
self.receiver.capacity = 1
152
190
except Exception:
153
191
LOG.exception(_("Failed to process message... skipping it."))
193
# TODO(sandy): Need support for optional ack_on_error.
155
194
self.session.acknowledge(message)
157
196
def get_receiver(self):
158
197
return self.receiver
199
def get_node_name(self):
200
return self.address.split(';')[0]
161
203
class DirectConsumer(ConsumerBase):
162
204
"""Queue/consumer class for 'direct'."""
169
211
'callback' is the callback to call when messages are received
172
super(DirectConsumer, self).__init__(session, callback,
173
"%s/%s" % (msg_id, msg_id),
215
"auto-delete": conf.amqp_auto_delete,
217
"durable": conf.amqp_durable_queues,
220
if conf.qpid_topology_version == 1:
221
node_name = "%s/%s" % (msg_id, msg_id)
222
node_opts = {"type": "direct"}
223
elif conf.qpid_topology_version == 2:
224
node_name = "amq.direct/%s" % msg_id
227
raise_invalid_topology_version()
229
super(DirectConsumer, self).__init__(conf, session, callback,
230
node_name, node_opts, msg_id,
179
234
class TopicConsumer(ConsumerBase):
193
248
exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
194
super(TopicConsumer, self).__init__(session, callback,
195
"%s/%s" % (exchange_name, topic),
196
{}, name or topic, {})
250
"auto-delete": conf.amqp_auto_delete,
251
"durable": conf.amqp_durable_queues,
254
if conf.qpid_topology_version == 1:
255
node_name = "%s/%s" % (exchange_name, topic)
256
elif conf.qpid_topology_version == 2:
257
node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic)
259
raise_invalid_topology_version()
261
super(TopicConsumer, self).__init__(conf, session, callback, node_name,
262
{}, name or topic, link_opts)
199
265
class FanoutConsumer(ConsumerBase):
206
272
'topic' is the topic to listen on
207
273
'callback' is the callback to call when messages are received
210
super(FanoutConsumer, self).__init__(
213
{"durable": False, "type": "fanout"},
214
"%s_fanout_%s" % (topic, uuid.uuid4().hex),
277
link_opts = {"exclusive": True}
279
if conf.qpid_topology_version == 1:
280
node_name = "%s_fanout" % topic
281
node_opts = {"durable": False, "type": "fanout"}
282
link_name = "%s_fanout_%s" % (topic, uuid.uuid4().hex)
283
elif conf.qpid_topology_version == 2:
284
node_name = "amq.topic/fanout/%s" % topic
288
raise_invalid_topology_version()
290
super(FanoutConsumer, self).__init__(conf, session, callback,
291
node_name, node_opts, link_name,
294
def reconnect(self, session):
295
topic = self.get_node_name().rpartition('_fanout')[0]
299
'callback': self.callback,
302
self.__init__(conf=self.conf, **params)
304
super(FanoutConsumer, self).reconnect(session)
218
307
class Publisher(object):
219
308
"""Base Publisher class."""
221
def __init__(self, session, node_name, node_opts=None):
310
def __init__(self, conf, session, node_name, node_opts=None):
222
311
"""Init the Publisher class with the exchange_name, routing_key,
223
312
and other options
225
314
self.sender = None
226
315
self.session = session
234
# auto-delete isn't implemented for exchanges in qpid,
235
# but put in here anyway
317
if conf.qpid_topology_version == 1:
324
# auto-delete isn't implemented for exchanges in qpid,
325
# but put in here anyway
241
addr_opts["node"]["x-declare"].update(node_opts)
331
addr_opts["node"]["x-declare"].update(node_opts)
243
self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
333
self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
334
elif conf.qpid_topology_version == 2:
335
self.address = node_name
337
raise_invalid_topology_version()
245
339
self.reconnect(session)
284
378
"""Publisher class for 'direct'."""
285
379
def __init__(self, conf, session, msg_id):
286
380
"""Init a 'direct' publisher."""
287
super(DirectPublisher, self).__init__(session, msg_id,
382
if conf.qpid_topology_version == 1:
384
node_opts = {"type": "direct"}
385
elif conf.qpid_topology_version == 2:
386
node_name = "amq.direct/%s" % msg_id
389
raise_invalid_topology_version()
391
super(DirectPublisher, self).__init__(conf, session, node_name,
291
395
class TopicPublisher(Publisher):
294
398
"""init a 'topic' publisher.
296
400
exchange_name = rpc_amqp.get_control_exchange(conf)
297
super(TopicPublisher, self).__init__(session,
298
"%s/%s" % (exchange_name, topic))
402
if conf.qpid_topology_version == 1:
403
node_name = "%s/%s" % (exchange_name, topic)
404
elif conf.qpid_topology_version == 2:
405
node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic)
407
raise_invalid_topology_version()
409
super(TopicPublisher, self).__init__(conf, session, node_name)
301
412
class FanoutPublisher(Publisher):
303
414
def __init__(self, conf, session, topic):
304
415
"""init a 'fanout' publisher.
306
super(FanoutPublisher, self).__init__(
308
"%s_fanout" % topic, {"type": "fanout"})
418
if conf.qpid_topology_version == 1:
419
node_name = "%s_fanout" % topic
420
node_opts = {"type": "fanout"}
421
elif conf.qpid_topology_version == 2:
422
node_name = "amq.topic/fanout/%s" % topic
425
raise_invalid_topology_version()
427
super(FanoutPublisher, self).__init__(conf, session, node_name,
311
431
class NotifyPublisher(Publisher):
314
434
"""init a 'topic' publisher.
316
436
exchange_name = rpc_amqp.get_control_exchange(conf)
317
super(NotifyPublisher, self).__init__(session,
318
"%s/%s" % (exchange_name, topic),
437
node_opts = {"durable": True}
439
if conf.qpid_topology_version == 1:
440
node_name = "%s/%s" % (exchange_name, topic)
441
elif conf.qpid_topology_version == 2:
442
node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic)
444
raise_invalid_topology_version()
446
super(NotifyPublisher, self).__init__(conf, session, node_name,
322
450
class Connection(object):