~ubuntu-branches/ubuntu/raring/nova/raring-proposed

« back to all changes in this revision

Viewing changes to nova/openstack/common/rpc/impl_qpid.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short, Adam Gandelman, Chuck Short
  • Date: 2012-11-23 09:04:58 UTC
  • mfrom: (1.1.66)
  • Revision ID: package-import@ubuntu.com-20121123090458-91565o7aev1i1h71
Tags: 2013.1~g1-0ubuntu1
[ Adam Gandelman ]
* debian/control: Ensure novaclient is upgraded with nova,
  require python-keystoneclient >= 1:2.9.0. (LP: #1073289)
* debian/patches/{ubuntu/*, rbd-security.patch}: Dropped, applied
  upstream.
* debian/control: Add python-testtools to Build-Depends.

[ Chuck Short ]
* New upstream version.
* Refreshed debian/patches/avoid_setuptools_git_dependency.patch.
* debian/rules: FTBFS if missing binaries.
* debian/nova-scheduler.install: Add missing rabbit-queues and
  nova-rpc-zmq-receiver.
* Remove nova-volume since it doesn't exist anymore; transition to cinder-*.
* debian/rules: install apport hook in the right place.
* debian/patches/ubuntu-show-tests.patch: Display test failures.
* debian/control: Add depends on genisoimage.
* debian/control: Suggest guestmount.
* debian/control: Suggest websockify. (LP: #1076442)
* debian/nova.conf: Disable nova-volume service.
* debian/control: Depend on xen-system-* rather than the hypervisor.
* debian/control, debian/mans/nova-conductor.8, debian/nova-conductor.init,
  debian/nova-conductor.install, debian/nova-conductor.logrotate,
  debian/nova-conductor.manpages, debian/nova-conductor.postrm,
  debian/nova-conductor.upstart.in: Add nova-conductor service.
* debian/control: Add python-fixtures as a build dependency.

Show diffs side-by-side

added added

removed removed

Lines of Context:
50
50
    cfg.StrOpt('qpid_sasl_mechanisms',
51
51
               default='',
52
52
               help='Space separated list of SASL mechanisms to use for auth'),
53
 
    cfg.BoolOpt('qpid_reconnect',
54
 
                default=True,
55
 
                help='Automatically reconnect'),
56
 
    cfg.IntOpt('qpid_reconnect_timeout',
57
 
               default=0,
58
 
               help='Reconnection timeout in seconds'),
59
 
    cfg.IntOpt('qpid_reconnect_limit',
60
 
               default=0,
61
 
               help='Max reconnections before giving up'),
62
 
    cfg.IntOpt('qpid_reconnect_interval_min',
63
 
               default=0,
64
 
               help='Minimum seconds between reconnection attempts'),
65
 
    cfg.IntOpt('qpid_reconnect_interval_max',
66
 
               default=0,
67
 
               help='Maximum seconds between reconnection attempts'),
68
 
    cfg.IntOpt('qpid_reconnect_interval',
69
 
               default=0,
70
 
               help='Equivalent to setting max and min to the same value'),
71
53
    cfg.IntOpt('qpid_heartbeat',
72
54
               default=60,
73
55
               help='Seconds between connection keepalive heartbeats'),
170
152
class TopicConsumer(ConsumerBase):
171
153
    """Consumer class for 'topic'"""
172
154
 
173
 
    def __init__(self, conf, session, topic, callback, name=None):
 
155
    def __init__(self, conf, session, topic, callback, name=None,
 
156
                 exchange_name=None):
174
157
        """Init a 'topic' queue.
175
158
 
176
159
        :param session: the amqp session to use
180
163
        :param name: optional queue name, defaults to topic
181
164
        """
182
165
 
 
166
        exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
183
167
        super(TopicConsumer, self).__init__(session, callback,
184
 
                                            "%s/%s" % (conf.control_exchange,
185
 
                                                       topic),
 
168
                                            "%s/%s" % (exchange_name, topic),
186
169
                                            {}, name or topic, {})
187
170
 
188
171
 
256
239
    def __init__(self, conf, session, topic):
257
240
        """init a 'topic' publisher.
258
241
        """
259
 
        super(TopicPublisher, self).__init__(
260
 
            session,
261
 
            "%s/%s" % (conf.control_exchange, topic))
 
242
        exchange_name = rpc_amqp.get_control_exchange(conf)
 
243
        super(TopicPublisher, self).__init__(session,
 
244
                                             "%s/%s" % (exchange_name, topic))
262
245
 
263
246
 
264
247
class FanoutPublisher(Publisher):
276
259
    def __init__(self, conf, session, topic):
277
260
        """init a 'topic' publisher.
278
261
        """
279
 
        super(NotifyPublisher, self).__init__(
280
 
            session,
281
 
            "%s/%s" % (conf.control_exchange, topic),
282
 
            {"durable": True})
 
262
        exchange_name = rpc_amqp.get_control_exchange(conf)
 
263
        super(NotifyPublisher, self).__init__(session,
 
264
                                              "%s/%s" % (exchange_name, topic),
 
265
                                              {"durable": True})
283
266
 
284
267
 
285
268
class Connection(object):
293
276
        self.consumer_thread = None
294
277
        self.conf = conf
295
278
 
296
 
        if server_params is None:
297
 
            server_params = {}
298
 
 
299
 
        default_params = dict(hostname=self.conf.qpid_hostname,
300
 
                              port=self.conf.qpid_port,
301
 
                              username=self.conf.qpid_username,
302
 
                              password=self.conf.qpid_password)
303
 
 
304
 
        params = server_params
305
 
        for key in default_params.keys():
306
 
            params.setdefault(key, default_params[key])
 
279
        params = {
 
280
            'hostname': self.conf.qpid_hostname,
 
281
            'port': self.conf.qpid_port,
 
282
            'username': self.conf.qpid_username,
 
283
            'password': self.conf.qpid_password,
 
284
        }
 
285
        params.update(server_params or {})
307
286
 
308
287
        self.broker = params['hostname'] + ":" + str(params['port'])
 
288
        self.username = params['username']
 
289
        self.password = params['password']
 
290
        self.connection_create()
 
291
        self.reconnect()
 
292
 
 
293
    def connection_create(self):
309
294
        # Create the connection - this does not open the connection
310
295
        self.connection = qpid.messaging.Connection(self.broker)
311
296
 
312
297
        # Check if flags are set and if so set them for the connection
313
298
        # before we call open
314
 
        self.connection.username = params['username']
315
 
        self.connection.password = params['password']
 
299
        self.connection.username = self.username
 
300
        self.connection.password = self.password
 
301
 
316
302
        self.connection.sasl_mechanisms = self.conf.qpid_sasl_mechanisms
317
 
        self.connection.reconnect = self.conf.qpid_reconnect
318
 
        if self.conf.qpid_reconnect_timeout:
319
 
            self.connection.reconnect_timeout = (
320
 
                self.conf.qpid_reconnect_timeout)
321
 
        if self.conf.qpid_reconnect_limit:
322
 
            self.connection.reconnect_limit = self.conf.qpid_reconnect_limit
323
 
        if self.conf.qpid_reconnect_interval_max:
324
 
            self.connection.reconnect_interval_max = (
325
 
                self.conf.qpid_reconnect_interval_max)
326
 
        if self.conf.qpid_reconnect_interval_min:
327
 
            self.connection.reconnect_interval_min = (
328
 
                self.conf.qpid_reconnect_interval_min)
329
 
        if self.conf.qpid_reconnect_interval:
330
 
            self.connection.reconnect_interval = (
331
 
                self.conf.qpid_reconnect_interval)
 
303
        # Reconnection is done by self.reconnect()
 
304
        self.connection.reconnect = False
332
305
        self.connection.heartbeat = self.conf.qpid_heartbeat
333
306
        self.connection.protocol = self.conf.qpid_protocol
334
307
        self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay
335
308
 
336
 
        # Open is part of reconnect -
337
 
        # NOTE(WGH) not sure we need this with the reconnect flags
338
 
        self.reconnect()
339
 
 
340
309
    def _register_consumer(self, consumer):
341
310
        self.consumers[str(consumer.get_receiver())] = consumer
342
311
 
351
320
            except qpid.messaging.exceptions.ConnectionError:
352
321
                pass
353
322
 
 
323
        delay = 1
354
324
        while True:
355
325
            try:
 
326
                self.connection_create()
356
327
                self.connection.open()
357
328
            except qpid.messaging.exceptions.ConnectionError, e:
358
 
                LOG.error(_('Unable to connect to AMQP server: %s'), e)
359
 
                time.sleep(self.conf.qpid_reconnect_interval or 1)
 
329
                msg_dict = dict(e=e, delay=delay)
 
330
                msg = _("Unable to connect to AMQP server: %(e)s. "
 
331
                        "Sleeping %(delay)s seconds") % msg_dict
 
332
                LOG.error(msg)
 
333
                time.sleep(delay)
 
334
                delay = min(2 * delay, 60)
360
335
            else:
361
336
                break
362
337
 
364
339
 
365
340
        self.session = self.connection.session()
366
341
 
367
 
        for consumer in self.consumers.itervalues():
368
 
            consumer.reconnect(self.session)
369
 
 
370
342
        if self.consumers:
 
343
            consumers = self.consumers
 
344
            self.consumers = {}
 
345
 
 
346
            for consumer in consumers.itervalues():
 
347
                consumer.reconnect(self.session)
 
348
                self._register_consumer(consumer)
 
349
 
371
350
            LOG.debug(_("Re-established AMQP queues"))
372
351
 
373
352
    def ensure(self, error_callback, method, *args, **kwargs):
464
443
        """
465
444
        self.declare_consumer(DirectConsumer, topic, callback)
466
445
 
467
 
    def declare_topic_consumer(self, topic, callback=None, queue_name=None):
 
446
    def declare_topic_consumer(self, topic, callback=None, queue_name=None,
 
447
                               exchange_name=None):
468
448
        """Create a 'topic' consumer."""
469
449
        self.declare_consumer(functools.partial(TopicConsumer,
470
450
                                                name=queue_name,
 
451
                                                exchange_name=exchange_name,
471
452
                                                ),
472
453
                              topic, callback)
473
454