~ubuntu-branches/ubuntu/saucy/heat/saucy

« back to all changes in this revision

Viewing changes to heat/openstack/common/rpc/impl_qpid.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short, Chuck Short, Adam Gandelman
  • Date: 2013-09-08 21:51:19 UTC
  • mfrom: (1.1.4)
  • Revision ID: package-import@ubuntu.com-20130908215119-r939tu4aumqgdrkx
Tags: 2013.2~b3-0ubuntu1
[ Chuck Short ]
* New upstream release.
* debian/control: Add python-netaddr as build-dep.
* debian/heat-common.install: Remove heat-boto and associated man-page
* debian/heat-common.install: Remove heat-cfn and associated man-page
* debian/heat-common.install: Remove heat-watch and associated man-page
* debian/patches/fix-sqlalchemy-0.8.patch: Dropped

[ Adam Gandelman ]
* debian/patches/default-kombu.patch: Dropped.
* debian/patches/default-sqlite.patch: Refreshed.
* debian/*.install, rules: Install heat.conf.sample as common
  config file in heat-common. Drop other per-package configs, they
  are no longer used.
* debian/rules: Clean pbr .egg from build dir if it exists.

Show diffs side-by-side

added added

removed removed

Lines of Context:
24
24
import greenlet
25
25
from oslo.config import cfg
26
26
 
27
 
from heat.openstack.common.gettextutils import _
 
27
from heat.openstack.common import excutils
 
28
from heat.openstack.common.gettextutils import _  # noqa
28
29
from heat.openstack.common import importutils
29
30
from heat.openstack.common import jsonutils
30
31
from heat.openstack.common import log as logging
66
67
    cfg.BoolOpt('qpid_tcp_nodelay',
67
68
                default=True,
68
69
                help='Disable Nagle algorithm'),
 
70
    # NOTE(russellb) If any additional versions are added (beyond 1 and 2),
 
71
    # this file could probably use some additional refactoring so that the
 
72
    # differences between each version are split into different classes.
 
73
    cfg.IntOpt('qpid_topology_version',
 
74
               default=1,
 
75
               help="The qpid topology version to use.  Version 1 is what "
 
76
                    "was originally used by impl_qpid.  Version 2 includes "
 
77
                    "some backwards-incompatible changes that allow broker "
 
78
                    "federation to work.  Users should update to version 2 "
 
79
                    "when they are able to take everything down, as it "
 
80
                    "requires a clean break."),
69
81
]
70
82
 
71
83
cfg.CONF.register_opts(qpid_opts)
73
85
JSON_CONTENT_TYPE = 'application/json; charset=utf8'
74
86
 
75
87
 
 
88
def raise_invalid_topology_version(conf):
 
89
    msg = (_("Invalid value for qpid_topology_version: %d") %
 
90
           conf.qpid_topology_version)
 
91
    LOG.error(msg)
 
92
    raise Exception(msg)
 
93
 
 
94
 
76
95
class ConsumerBase(object):
77
96
    """Consumer base class."""
78
97
 
79
 
    def __init__(self, session, callback, node_name, node_opts,
 
98
    def __init__(self, conf, session, callback, node_name, node_opts,
80
99
                 link_name, link_opts):
81
100
        """Declare a queue on an amqp session.
82
101
 
94
113
        self.receiver = None
95
114
        self.session = None
96
115
 
97
 
        addr_opts = {
98
 
            "create": "always",
99
 
            "node": {
100
 
                "type": "topic",
101
 
                "x-declare": {
 
116
        if conf.qpid_topology_version == 1:
 
117
            addr_opts = {
 
118
                "create": "always",
 
119
                "node": {
 
120
                    "type": "topic",
 
121
                    "x-declare": {
 
122
                        "durable": True,
 
123
                        "auto-delete": True,
 
124
                    },
 
125
                },
 
126
                "link": {
 
127
                    "name": link_name,
102
128
                    "durable": True,
103
 
                    "auto-delete": True,
104
 
                },
105
 
            },
106
 
            "link": {
107
 
                "name": link_name,
108
 
                "durable": True,
109
 
                "x-declare": {
110
 
                    "durable": False,
111
 
                    "auto-delete": True,
112
 
                    "exclusive": False,
113
 
                },
114
 
            },
115
 
        }
116
 
        addr_opts["node"]["x-declare"].update(node_opts)
 
129
                    "x-declare": {
 
130
                        "durable": False,
 
131
                        "auto-delete": True,
 
132
                        "exclusive": False,
 
133
                    },
 
134
                },
 
135
            }
 
136
            addr_opts["node"]["x-declare"].update(node_opts)
 
137
        elif conf.qpid_topology_version == 2:
 
138
            addr_opts = {
 
139
                "link": {
 
140
                    "x-declare": {
 
141
                        "auto-delete": True,
 
142
                    },
 
143
                },
 
144
            }
 
145
        else:
 
146
            raise_invalid_topology_version()
 
147
 
117
148
        addr_opts["link"]["x-declare"].update(link_opts)
118
149
 
119
150
        self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
120
151
 
121
 
        self.reconnect(session)
 
152
        self.connect(session)
 
153
 
 
154
    def connect(self, session):
 
155
        """Declare the reciever on connect."""
 
156
        self._declare_receiver(session)
122
157
 
123
158
    def reconnect(self, session):
124
159
        """Re-declare the receiver after a qpid reconnect."""
 
160
        self._declare_receiver(session)
 
161
 
 
162
    def _declare_receiver(self, session):
125
163
        self.session = session
126
164
        self.receiver = session.receiver(self.address)
127
165
        self.receiver.capacity = 1
152
190
        except Exception:
153
191
            LOG.exception(_("Failed to process message... skipping it."))
154
192
        finally:
 
193
            # TODO(sandy): Need support for optional ack_on_error.
155
194
            self.session.acknowledge(message)
156
195
 
157
196
    def get_receiver(self):
158
197
        return self.receiver
159
198
 
 
199
    def get_node_name(self):
 
200
        return self.address.split(';')[0]
 
201
 
160
202
 
161
203
class DirectConsumer(ConsumerBase):
162
204
    """Queue/consumer class for 'direct'."""
169
211
        'callback' is the callback to call when messages are received
170
212
        """
171
213
 
172
 
        super(DirectConsumer, self).__init__(session, callback,
173
 
                                             "%s/%s" % (msg_id, msg_id),
174
 
                                             {"type": "direct"},
175
 
                                             msg_id,
176
 
                                             {"exclusive": True})
 
214
        link_opts = {
 
215
            "auto-delete": conf.amqp_auto_delete,
 
216
            "exclusive": True,
 
217
            "durable": conf.amqp_durable_queues,
 
218
        }
 
219
 
 
220
        if conf.qpid_topology_version == 1:
 
221
            node_name = "%s/%s" % (msg_id, msg_id)
 
222
            node_opts = {"type": "direct"}
 
223
        elif conf.qpid_topology_version == 2:
 
224
            node_name = "amq.direct/%s" % msg_id
 
225
            node_opts = {}
 
226
        else:
 
227
            raise_invalid_topology_version()
 
228
 
 
229
        super(DirectConsumer, self).__init__(conf, session, callback,
 
230
                                             node_name, node_opts, msg_id,
 
231
                                             link_opts)
177
232
 
178
233
 
179
234
class TopicConsumer(ConsumerBase):
191
246
        """
192
247
 
193
248
        exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
194
 
        super(TopicConsumer, self).__init__(session, callback,
195
 
                                            "%s/%s" % (exchange_name, topic),
196
 
                                            {}, name or topic, {})
 
249
        link_opts = {
 
250
            "auto-delete": conf.amqp_auto_delete,
 
251
            "durable": conf.amqp_durable_queues,
 
252
        }
 
253
 
 
254
        if conf.qpid_topology_version == 1:
 
255
            node_name = "%s/%s" % (exchange_name, topic)
 
256
        elif conf.qpid_topology_version == 2:
 
257
            node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic)
 
258
        else:
 
259
            raise_invalid_topology_version()
 
260
 
 
261
        super(TopicConsumer, self).__init__(conf, session, callback, node_name,
 
262
                                            {}, name or topic, link_opts)
197
263
 
198
264
 
199
265
class FanoutConsumer(ConsumerBase):
206
272
        'topic' is the topic to listen on
207
273
        'callback' is the callback to call when messages are received
208
274
        """
209
 
 
210
 
        super(FanoutConsumer, self).__init__(
211
 
            session, callback,
212
 
            "%s_fanout" % topic,
213
 
            {"durable": False, "type": "fanout"},
214
 
            "%s_fanout_%s" % (topic, uuid.uuid4().hex),
215
 
            {"exclusive": True})
 
275
        self.conf = conf
 
276
 
 
277
        link_opts = {"exclusive": True}
 
278
 
 
279
        if conf.qpid_topology_version == 1:
 
280
            node_name = "%s_fanout" % topic
 
281
            node_opts = {"durable": False, "type": "fanout"}
 
282
            link_name = "%s_fanout_%s" % (topic, uuid.uuid4().hex)
 
283
        elif conf.qpid_topology_version == 2:
 
284
            node_name = "amq.topic/fanout/%s" % topic
 
285
            node_opts = {}
 
286
            link_name = ""
 
287
        else:
 
288
            raise_invalid_topology_version()
 
289
 
 
290
        super(FanoutConsumer, self).__init__(conf, session, callback,
 
291
                                             node_name, node_opts, link_name,
 
292
                                             link_opts)
 
293
 
 
294
    def reconnect(self, session):
 
295
        topic = self.get_node_name().rpartition('_fanout')[0]
 
296
        params = {
 
297
            'session': session,
 
298
            'topic': topic,
 
299
            'callback': self.callback,
 
300
        }
 
301
 
 
302
        self.__init__(conf=self.conf, **params)
 
303
 
 
304
        super(FanoutConsumer, self).reconnect(session)
216
305
 
217
306
 
218
307
class Publisher(object):
219
308
    """Base Publisher class."""
220
309
 
221
 
    def __init__(self, session, node_name, node_opts=None):
 
310
    def __init__(self, conf, session, node_name, node_opts=None):
222
311
        """Init the Publisher class with the exchange_name, routing_key,
223
312
        and other options
224
313
        """
225
314
        self.sender = None
226
315
        self.session = session
227
316
 
228
 
        addr_opts = {
229
 
            "create": "always",
230
 
            "node": {
231
 
                "type": "topic",
232
 
                "x-declare": {
233
 
                    "durable": False,
234
 
                    # auto-delete isn't implemented for exchanges in qpid,
235
 
                    # but put in here anyway
236
 
                    "auto-delete": True,
 
317
        if conf.qpid_topology_version == 1:
 
318
            addr_opts = {
 
319
                "create": "always",
 
320
                "node": {
 
321
                    "type": "topic",
 
322
                    "x-declare": {
 
323
                        "durable": False,
 
324
                        # auto-delete isn't implemented for exchanges in qpid,
 
325
                        # but put in here anyway
 
326
                        "auto-delete": True,
 
327
                    },
237
328
                },
238
 
            },
239
 
        }
240
 
        if node_opts:
241
 
            addr_opts["node"]["x-declare"].update(node_opts)
 
329
            }
 
330
            if node_opts:
 
331
                addr_opts["node"]["x-declare"].update(node_opts)
242
332
 
243
 
        self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
 
333
            self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
 
334
        elif conf.qpid_topology_version == 2:
 
335
            self.address = node_name
 
336
        else:
 
337
            raise_invalid_topology_version()
244
338
 
245
339
        self.reconnect(session)
246
340
 
284
378
    """Publisher class for 'direct'."""
285
379
    def __init__(self, conf, session, msg_id):
286
380
        """Init a 'direct' publisher."""
287
 
        super(DirectPublisher, self).__init__(session, msg_id,
288
 
                                              {"type": "Direct"})
 
381
 
 
382
        if conf.qpid_topology_version == 1:
 
383
            node_name = msg_id
 
384
            node_opts = {"type": "direct"}
 
385
        elif conf.qpid_topology_version == 2:
 
386
            node_name = "amq.direct/%s" % msg_id
 
387
            node_opts = {}
 
388
        else:
 
389
            raise_invalid_topology_version()
 
390
 
 
391
        super(DirectPublisher, self).__init__(conf, session, node_name,
 
392
                                              node_opts)
289
393
 
290
394
 
291
395
class TopicPublisher(Publisher):
294
398
        """init a 'topic' publisher.
295
399
        """
296
400
        exchange_name = rpc_amqp.get_control_exchange(conf)
297
 
        super(TopicPublisher, self).__init__(session,
298
 
                                             "%s/%s" % (exchange_name, topic))
 
401
 
 
402
        if conf.qpid_topology_version == 1:
 
403
            node_name = "%s/%s" % (exchange_name, topic)
 
404
        elif conf.qpid_topology_version == 2:
 
405
            node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic)
 
406
        else:
 
407
            raise_invalid_topology_version()
 
408
 
 
409
        super(TopicPublisher, self).__init__(conf, session, node_name)
299
410
 
300
411
 
301
412
class FanoutPublisher(Publisher):
303
414
    def __init__(self, conf, session, topic):
304
415
        """init a 'fanout' publisher.
305
416
        """
306
 
        super(FanoutPublisher, self).__init__(
307
 
            session,
308
 
            "%s_fanout" % topic, {"type": "fanout"})
 
417
 
 
418
        if conf.qpid_topology_version == 1:
 
419
            node_name = "%s_fanout" % topic
 
420
            node_opts = {"type": "fanout"}
 
421
        elif conf.qpid_topology_version == 2:
 
422
            node_name = "amq.topic/fanout/%s" % topic
 
423
            node_opts = {}
 
424
        else:
 
425
            raise_invalid_topology_version()
 
426
 
 
427
        super(FanoutPublisher, self).__init__(conf, session, node_name,
 
428
                                              node_opts)
309
429
 
310
430
 
311
431
class NotifyPublisher(Publisher):
314
434
        """init a 'topic' publisher.
315
435
        """
316
436
        exchange_name = rpc_amqp.get_control_exchange(conf)
317
 
        super(NotifyPublisher, self).__init__(session,
318
 
                                              "%s/%s" % (exchange_name, topic),
319
 
                                              {"durable": True})
 
437
        node_opts = {"durable": True}
 
438
 
 
439
        if conf.qpid_topology_version == 1:
 
440
            node_name = "%s/%s" % (exchange_name, topic)
 
441
        elif conf.qpid_topology_version == 2:
 
442
            node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic)
 
443
        else:
 
444
            raise_invalid_topology_version()
 
445
 
 
446
        super(NotifyPublisher, self).__init__(conf, session, node_name,
 
447
                                              node_opts)
320
448
 
321
449
 
322
450
class Connection(object):
575
703
 
576
704
    def consume_in_thread(self):
577
705
        """Consumer from all queues/consumers in a greenthread."""
 
706
        @excutils.forever_retry_uncaught_exceptions
578
707
        def _consumer_thread():
579
708
            try:
580
709
                self.consume()
615
744
        return consumer
616
745
 
617
746
    def join_consumer_pool(self, callback, pool_name, topic,
618
 
                           exchange_name=None):
 
747
                           exchange_name=None, ack_on_error=True):
619
748
        """Register as a member of a group of consumers for a given topic from
620
749
        the specified exchange.
621
750
 
629
758
            callback=callback,
630
759
            connection_pool=rpc_amqp.get_connection_pool(self.conf,
631
760
                                                         Connection),
 
761
            wait_for_consumers=not ack_on_error
632
762
        )
633
763
        self.proxy_callbacks.append(callback_wrapper)
634
764