~ubuntu-branches/ubuntu/saucy/nova/saucy-proposed

« back to all changes in this revision

Viewing changes to nova/openstack/common/rpc/impl_kombu.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short
  • Date: 2013-09-09 13:11:11 UTC
  • mfrom: (1.1.74)
  • Revision ID: package-import@ubuntu.com-20130909131111-aw02ice50wac9tma
Tags: 1:2013.2~b3-0ubuntu1
* New usptream release. 
* debian/patches/avoid_requirements_cheetah.patch: Dropped
* debian/patches/fix-sqlalchemy-0.7.9-usage.patch: Dropped
* debian/patches/fix-requirements.patch: Refreshed.
* debian/patches/path-to-the-xenhost.conf-fixup.patch: Refreshed
* debian/control: Add python-jinja2
* debian/control: Dropped python-cheetah

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
import itertools
19
19
import socket
20
20
import ssl
21
 
import sys
22
21
import time
23
22
import uuid
24
23
 
30
29
import kombu.messaging
31
30
from oslo.config import cfg
32
31
 
33
 
from nova.openstack.common.gettextutils import _
 
32
from nova.openstack.common import excutils
 
33
from nova.openstack.common.gettextutils import _  # noqa
34
34
from nova.openstack.common import network_utils
35
35
from nova.openstack.common.rpc import amqp as rpc_amqp
36
36
from nova.openstack.common.rpc import common as rpc_common
 
37
from nova.openstack.common import sslutils
37
38
 
38
39
kombu_opts = [
39
40
    cfg.StrOpt('kombu_ssl_version',
40
41
               default='',
41
 
               help='SSL version to use (valid only if SSL enabled)'),
 
42
               help='SSL version to use (valid only if SSL enabled). '
 
43
                    'valid values are TLSv1, SSLv23 and SSLv3. SSLv2 may '
 
44
                    'be available on some distributions'
 
45
               ),
42
46
    cfg.StrOpt('kombu_ssl_keyfile',
43
47
               default='',
44
48
               help='SSL key file (valid only if SSL enabled)'),
82
86
               default=0,
83
87
               help='maximum retries with trying to connect to RabbitMQ '
84
88
                    '(the default of 0 implies an infinite retry count)'),
85
 
    cfg.BoolOpt('rabbit_durable_queues',
86
 
                default=False,
87
 
                help='use durable queues in RabbitMQ'),
88
89
    cfg.BoolOpt('rabbit_ha_queues',
89
90
                default=False,
90
91
                help='use H/A queues in RabbitMQ (x-ha-policy: all).'
145
146
        Messages that are processed without exception are ack'ed.
146
147
 
147
148
        If the message processing generates an exception, it will be
148
 
        ack'ed if ack_on_error=True. Otherwise it will be .reject()'ed.
149
 
        Rejection is better than waiting for the message to timeout.
150
 
        Rejected messages are immediately requeued.
 
149
        ack'ed if ack_on_error=True. Otherwise it will be .requeue()'ed.
151
150
        """
152
151
 
153
 
        ack_msg = False
154
152
        try:
155
153
            msg = rpc_common.deserialize_msg(message.payload)
156
154
            callback(msg)
157
 
            ack_msg = True
158
155
        except Exception:
159
156
            if self.ack_on_error:
160
 
                ack_msg = True
161
157
                LOG.exception(_("Failed to process message"
162
158
                                " ... skipping it."))
 
159
                message.ack()
163
160
            else:
164
161
                LOG.exception(_("Failed to process message"
165
162
                                " ... will requeue."))
166
 
        finally:
167
 
            if ack_msg:
168
 
                message.ack()
169
 
            else:
170
 
                message.reject()
 
163
                message.requeue()
 
164
        else:
 
165
            message.ack()
171
166
 
172
167
    def consume(self, *args, **kwargs):
173
168
        """Actually declare the consumer on the amqp channel.  This will
256
251
        Other kombu options may be passed as keyword arguments
257
252
        """
258
253
        # Default options
259
 
        options = {'durable': conf.rabbit_durable_queues,
 
254
        options = {'durable': conf.amqp_durable_queues,
260
255
                   'queue_arguments': _get_queue_arguments(conf),
261
 
                   'auto_delete': False,
 
256
                   'auto_delete': conf.amqp_auto_delete,
262
257
                   'exclusive': False}
263
258
        options.update(kwargs)
264
259
        exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
362
357
 
363
358
        Kombu options may be passed as keyword args to override defaults
364
359
        """
365
 
        options = {'durable': conf.rabbit_durable_queues,
366
 
                   'auto_delete': False,
 
360
        options = {'durable': conf.amqp_durable_queues,
 
361
                   'auto_delete': conf.amqp_auto_delete,
367
362
                   'exclusive': False}
368
363
        options.update(kwargs)
369
364
        exchange_name = rpc_amqp.get_control_exchange(conf)
393
388
    """Publisher class for 'notify'."""
394
389
 
395
390
    def __init__(self, conf, channel, topic, **kwargs):
396
 
        self.durable = kwargs.pop('durable', conf.rabbit_durable_queues)
 
391
        self.durable = kwargs.pop('durable', conf.amqp_durable_queues)
397
392
        self.queue_arguments = _get_queue_arguments(conf)
398
393
        super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs)
399
394
 
477
472
 
478
473
        # http://docs.python.org/library/ssl.html - ssl.wrap_socket
479
474
        if self.conf.kombu_ssl_version:
480
 
            ssl_params['ssl_version'] = self.conf.kombu_ssl_version
 
475
            ssl_params['ssl_version'] = sslutils.validate_ssl_version(
 
476
                self.conf.kombu_ssl_version)
481
477
        if self.conf.kombu_ssl_keyfile:
482
478
            ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile
483
479
        if self.conf.kombu_ssl_certfile:
488
484
            # future with this?
489
485
            ssl_params['cert_reqs'] = ssl.CERT_REQUIRED
490
486
 
491
 
        if not ssl_params:
492
 
            # Just have the default behavior
493
 
            return True
494
 
        else:
495
 
            # Return the extended behavior
496
 
            return ssl_params
 
487
        # Return the extended behavior or just have the default behavior
 
488
        return ssl_params or True
497
489
 
498
490
    def _connect(self, params):
499
491
        """Connect to rabbit.  Re-establish any queues that may have
560
552
            log_info.update(params)
561
553
 
562
554
            if self.max_retries and attempt == self.max_retries:
563
 
                LOG.error(_('Unable to connect to AMQP server on '
564
 
                            '%(hostname)s:%(port)d after %(max_retries)d '
565
 
                            'tries: %(err_str)s') % log_info)
566
 
                # NOTE(comstud): Copied from original code.  There's
567
 
                # really no better recourse because if this was a queue we
568
 
                # need to consume on, we have no way to consume anymore.
569
 
                sys.exit(1)
 
555
                msg = _('Unable to connect to AMQP server on '
 
556
                        '%(hostname)s:%(port)d after %(max_retries)d '
 
557
                        'tries: %(err_str)s') % log_info
 
558
                LOG.error(msg)
 
559
                raise rpc_common.RPCException(msg)
570
560
 
571
561
            if attempt == 1:
572
562
                sleep_time = self.interval_start or 1
748
738
 
749
739
    def consume_in_thread(self):
750
740
        """Consumer from all queues/consumers in a greenthread."""
 
741
        @excutils.forever_retry_uncaught_exceptions
751
742
        def _consumer_thread():
752
743
            try:
753
744
                self.consume()
792
783
            callback=callback,
793
784
            connection_pool=rpc_amqp.get_connection_pool(self.conf,
794
785
                                                         Connection),
 
786
            wait_for_consumers=not ack_on_error
795
787
        )
796
788
        self.proxy_callbacks.append(callback_wrapper)
797
789
        self.declare_topic_consumer(