~ubuntu-branches/ubuntu/vivid/oslo.messaging/vivid-proposed

« back to all changes in this revision

Viewing changes to oslo/messaging/_drivers/impl_rabbit.py

  • Committer: Package Import Robot
  • Author(s): James Page
  • Date: 2014-12-11 14:34:04 UTC
  • mfrom: (1.1.8)
  • Revision ID: package-import@ubuntu.com-20141211143404-jmuzfr2q6k06wqxc
Tags: 1.5.1-0ubuntu1
* New upstream release.
  - d/p/*: Dropped, patches accepted upstream.
  - d/control: Add new dependencies, update minimum version
    requirements inline with new release.
  - d/p/enable-zmq-tests.patch: Cherry pick patch to fix misc
    ZeroMQ issues and enable test suite.
  - d/p/matchmaker-redis-fix.patch: Cherry pick fix for Redis
    matchmaker.
  - d/p/disable-zmq-tests.patch: Make conditional execution of
    zmq tests a little more intelligent.
* Enable easier use of ZeroMQ receiver daemon:
  - d/control: Add new oslo-messaging-zmq-receiver package.
  - d/oslo-messaging-zmq-receiver.{upstart,service}: Add upstart
    and systemd configuration files for receiver.
  - d/oslo-messaging-zmq-receiver.postinst: Create oslo user and
    group for daemon processes to run under.
  - d/etc/oslo-messaging.conf: Add default configuration file.
* d/control: Add Suggests on python-zmq to python-oslo.messaging. 
* d/control,compat: Bump compat level to 9.
* d/control,rules: Add subunit to BD's, generate pretty test output.
* d/control: Align dependency version requirements with upstream.
* d/control: Bumped Standards-Version to 3.9.6, no changes.
* d/control: Add dh-python to BD's.

Show diffs side-by-side

added added

removed removed

Lines of Context:
15
15
import functools
16
16
import itertools
17
17
import logging
18
 
import random
19
18
import socket
20
19
import ssl
21
20
import time
24
23
import kombu
25
24
import kombu.connection
26
25
import kombu.entity
 
26
import kombu.exceptions
27
27
import kombu.messaging
28
28
import six
 
29
from six.moves.urllib import parse
29
30
 
30
31
from oslo.config import cfg
31
32
from oslo.messaging._drivers import amqp as rpc_amqp
32
33
from oslo.messaging._drivers import amqpdriver
33
34
from oslo.messaging._drivers import common as rpc_common
 
35
from oslo.messaging._i18n import _
34
36
from oslo.messaging import exceptions
35
 
from oslo.messaging.openstack.common.gettextutils import _
36
37
from oslo.utils import netutils
37
38
 
 
39
 
38
40
rabbit_opts = [
39
41
    cfg.StrOpt('kombu_ssl_version',
40
42
               default='',
41
43
               help='SSL version to use (valid only if SSL enabled). '
42
 
                    'valid values are TLSv1, SSLv23 and SSLv3. SSLv2 may '
43
 
                    'be available on some distributions.'
 
44
                    'valid values are TLSv1 and SSLv23. SSLv2 and '
 
45
                    'SSLv3 may be available on some distributions.'
44
46
               ),
45
47
    cfg.StrOpt('kombu_ssl_keyfile',
46
48
               default='',
78
80
               secret=True),
79
81
    cfg.StrOpt('rabbit_login_method',
80
82
               default='AMQPLAIN',
81
 
               help='the RabbitMQ login method'),
 
83
               help='The RabbitMQ login method.'),
82
84
    cfg.StrOpt('rabbit_virtual_host',
83
85
               default='/',
84
86
               help='The RabbitMQ virtual host.'),
99
101
                     'If you change this option, you must wipe the '
100
102
                     'RabbitMQ database.'),
101
103
 
102
 
    # FIXME(markmc): this was toplevel in openstack.common.rpc
 
104
    # NOTE(sileht): deprecated option since oslo.messaging 1.5.0,
103
105
    cfg.BoolOpt('fake_rabbit',
104
106
                default=False,
105
 
                help='If passed, use a fake RabbitMQ provider.'),
 
107
                help='Deprecated, use rpc_backend=kombu+memory or '
 
108
                'rpc_backend=fake'),
106
109
]
107
110
 
108
111
LOG = logging.getLogger(__name__)
424
427
 
425
428
    def __init__(self, conf, url):
426
429
        self.consumers = []
 
430
        self.consumer_num = itertools.count(1)
427
431
        self.conf = conf
428
432
        self.max_retries = self.conf.rabbit_max_retries
429
433
        # Try forever?
433
437
        self.interval_stepping = self.conf.rabbit_retry_backoff
434
438
        # max retry-interval = 30 seconds
435
439
        self.interval_max = 30
436
 
        self.memory_transport = False
437
440
 
438
 
        ssl_params = self._fetch_ssl_params()
 
441
        self._ssl_params = self._fetch_ssl_params()
 
442
        self._login_method = self.conf.rabbit_login_method
439
443
 
440
444
        if url.virtual_host is not None:
441
445
            virtual_host = url.virtual_host
442
446
        else:
443
447
            virtual_host = self.conf.rabbit_virtual_host
444
448
 
445
 
        self.brokers_params = []
446
 
        if url.hosts:
 
449
        self._url = ''
 
450
        if self.conf.fake_rabbit:
 
451
            LOG.warn("Deprecated: fake_rabbit option is deprecated, set "
 
452
                     "rpc_backend to kombu+memory or use the fake "
 
453
                     "driver instead.")
 
454
            self._url = 'memory://%s/' % virtual_host
 
455
        elif url.hosts:
447
456
            for host in url.hosts:
448
 
                params = {
449
 
                    'hostname': host.hostname,
450
 
                    'port': host.port or 5672,
451
 
                    'userid': host.username or '',
452
 
                    'password': host.password or '',
453
 
                    'login_method': self.conf.rabbit_login_method,
454
 
                    'virtual_host': virtual_host
455
 
                }
456
 
                if self.conf.fake_rabbit:
457
 
                    params['transport'] = 'memory'
458
 
                if self.conf.rabbit_use_ssl:
459
 
                    params['ssl'] = ssl_params
460
 
 
461
 
                self.brokers_params.append(params)
 
457
                transport = url.transport.replace('kombu+', '')
 
458
                transport = url.transport.replace('rabbit', 'amqp')
 
459
                self._url += '%s%s://%s:%s@%s:%s/%s' % (
 
460
                    ";" if self._url else '',
 
461
                    transport,
 
462
                    parse.quote(host.username or ''),
 
463
                    parse.quote(host.password or ''),
 
464
                    host.hostname or '', str(host.port or 5672),
 
465
                    virtual_host)
 
466
        elif url.transport.startswith('kombu+'):
 
467
            # NOTE(sileht): url have a + but no hosts
 
468
            # (like kombu+memory:///), pass it to kombu as-is
 
469
            transport = url.transport.replace('kombu+', '')
 
470
            self._url = "%s://%s" % (transport, virtual_host)
462
471
        else:
463
 
            # Old configuration format
464
472
            for adr in self.conf.rabbit_hosts:
465
473
                hostname, port = netutils.parse_host_port(
466
474
                    adr, default_port=self.conf.rabbit_port)
467
 
 
468
 
                params = {
469
 
                    'hostname': hostname,
470
 
                    'port': port,
471
 
                    'userid': self.conf.rabbit_userid,
472
 
                    'password': self.conf.rabbit_password,
473
 
                    'login_method': self.conf.rabbit_login_method,
474
 
                    'virtual_host': virtual_host
475
 
                }
476
 
 
477
 
                if self.conf.fake_rabbit:
478
 
                    params['transport'] = 'memory'
479
 
                if self.conf.rabbit_use_ssl:
480
 
                    params['ssl'] = ssl_params
481
 
 
482
 
                self.brokers_params.append(params)
483
 
 
484
 
        random.shuffle(self.brokers_params)
485
 
        self.brokers = itertools.cycle(self.brokers_params)
486
 
 
487
 
        self.memory_transport = self.conf.fake_rabbit
488
 
 
489
 
        self.connection = None
490
 
        self.do_consume = None
491
 
        self.reconnect()
 
475
                self._url += '%samqp://%s:%s@%s:%s/%s' % (
 
476
                    ";" if self._url else '',
 
477
                    parse.quote(self.conf.rabbit_userid),
 
478
                    parse.quote(self.conf.rabbit_password),
 
479
                    hostname, port,
 
480
                    virtual_host)
 
481
 
 
482
        self.do_consume = True
 
483
 
 
484
        self.channel = None
 
485
        self.connection = kombu.connection.Connection(
 
486
            self._url, ssl=self._ssl_params, login_method=self._login_method,
 
487
            failover_strategy="shuffle")
 
488
 
 
489
        LOG.info(_('Connecting to AMQP server on %(hostname)s:%(port)d'),
 
490
                 {'hostname': self.connection.hostname,
 
491
                  'port': self.connection.port})
 
492
        # NOTE(sileht): just ensure the connection is setuped at startup
 
493
        self.ensure(error_callback=None,
 
494
                    method=lambda channel: True)
 
495
        LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d'),
 
496
                 {'hostname': self.connection.hostname,
 
497
                  'port': self.connection.port})
 
498
 
 
499
        if self._url.startswith('memory://'):
 
500
            # Kludge to speed up tests.
 
501
            self.connection.transport.polling_interval = 0.0
492
502
 
493
503
    # FIXME(markmc): use oslo sslutils when it is available as a library
494
504
    _SSL_PROTOCOLS = {
495
505
        "tlsv1": ssl.PROTOCOL_TLSv1,
496
 
        "sslv23": ssl.PROTOCOL_SSLv23,
497
 
        "sslv3": ssl.PROTOCOL_SSLv3
 
506
        "sslv23": ssl.PROTOCOL_SSLv23
498
507
    }
499
508
 
500
509
    try:
502
511
    except AttributeError:
503
512
        pass
504
513
 
 
514
    try:
 
515
        _SSL_PROTOCOLS["sslv3"] = ssl.PROTOCOL_SSLv3
 
516
    except AttributeError:
 
517
        pass
 
518
 
505
519
    @classmethod
506
520
    def validate_ssl_version(cls, version):
507
521
        key = version.lower()
531
545
            ssl_params['cert_reqs'] = ssl.CERT_REQUIRED
532
546
 
533
547
        # Return the extended behavior or just have the default behavior
534
 
        return ssl_params or True
 
548
        return ssl_params or None
535
549
 
536
 
    def _connect(self, broker):
537
 
        """Connect to rabbit.  Re-establish any queues that may have
538
 
        been declared before if we are reconnecting.  Exceptions should
539
 
        be handled by the caller.
 
550
    def ensure(self, error_callback, method, retry=None,
 
551
               timeout_is_error=True):
 
552
        """Will retry up to retry number of times.
 
553
        retry = None means use the value of rabbit_max_retries
 
554
        retry = -1 means to retry forever
 
555
        retry = 0 means no retry
 
556
        retry = N means N retries
540
557
        """
541
 
        LOG.info(_("Connecting to AMQP server on "
542
 
                   "%(hostname)s:%(port)d"), broker)
543
 
        self.connection = kombu.connection.BrokerConnection(**broker)
544
 
        self.connection_errors = self.connection.connection_errors
545
 
        self.channel_errors = self.connection.channel_errors
546
 
        if self.memory_transport:
547
 
            # Kludge to speed up tests.
548
 
            self.connection.transport.polling_interval = 0.0
549
 
        self.do_consume = True
550
 
        self.consumer_num = itertools.count(1)
551
 
        self.connection.connect()
552
 
        self.channel = self.connection.channel()
553
 
        # work around 'memory' transport bug in 1.1.3
554
 
        if self.memory_transport:
555
 
            self.channel._new_queue('ae.undeliver')
556
 
        for consumer in self.consumers:
557
 
            consumer.reconnect(self.channel)
558
 
        LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d'),
559
 
                 broker)
560
 
 
561
 
    def _disconnect(self):
562
 
        if self.connection:
 
558
 
 
559
        if retry is None:
 
560
            retry = self.max_retries
 
561
        if retry is None or retry < 0:
 
562
            retry = None
 
563
 
 
564
        def on_error(exc, interval):
 
565
            self.channel = None
 
566
 
 
567
            error_callback and error_callback(exc)
 
568
 
 
569
            interval = (self.conf.kombu_reconnect_delay + interval
 
570
                        if self.conf.kombu_reconnect_delay > 0 else interval)
 
571
 
 
572
            info = {'hostname': self.connection.hostname,
 
573
                    'port': self.connection.port,
 
574
                    'err_str': exc, 'sleep_time': interval}
 
575
 
 
576
            if 'Socket closed' in six.text_type(exc):
 
577
                LOG.error(_('AMQP server %(hostname)s:%(port)s closed'
 
578
                            ' the connection. Check login credentials:'
 
579
                            ' %(err_str)s'), info)
 
580
            else:
 
581
                LOG.error(_('AMQP server on %(hostname)s:%(port)s is '
 
582
                            'unreachable: %(err_str)s. Trying again in '
 
583
                            '%(sleep_time)d seconds.'), info)
 
584
 
563
585
            # XXX(nic): when reconnecting to a RabbitMQ cluster
564
586
            # with mirrored queues in use, the attempt to release the
565
587
            # connection can hang "indefinitely" somewhere deep down
566
588
            # in Kombu.  Blocking the thread for a bit prior to
567
589
            # release seems to kludge around the problem where it is
568
590
            # otherwise reproduceable.
 
591
            # TODO(sileht): Check if this is useful since we
 
592
            # use kombu for HA connection, the interval_step
 
593
            # should sufficient, because the underlying kombu transport
 
594
            # connection object freed.
569
595
            if self.conf.kombu_reconnect_delay > 0:
570
 
                LOG.info(_("Delaying reconnect for %1.1f seconds...") %
571
 
                         self.conf.kombu_reconnect_delay)
572
596
                time.sleep(self.conf.kombu_reconnect_delay)
573
597
 
574
 
            try:
575
 
                self.connection.release()
576
 
            except self.connection_errors:
577
 
                pass
578
 
            self.connection = None
579
 
 
580
 
    def reconnect(self, retry=None):
581
 
        """Handles reconnecting and re-establishing queues.
582
 
        Will retry up to retry number of times.
583
 
        retry = None means use the value of rabbit_max_retries
584
 
        retry = -1 means to retry forever
585
 
        retry = 0 means no retry
586
 
        retry = N means N retries
587
 
        Sleep between tries, starting at self.interval_start
588
 
        seconds, backing off self.interval_stepping number of seconds
589
 
        each attempt.
590
 
        """
591
 
 
592
 
        attempt = 0
593
 
        loop_forever = False
594
 
        if retry is None:
595
 
            retry = self.max_retries
596
 
        if retry is None or retry < 0:
597
 
            loop_forever = True
598
 
 
599
 
        while True:
600
 
            self._disconnect()
601
 
 
602
 
            broker = six.next(self.brokers)
603
 
            attempt += 1
604
 
            try:
605
 
                self._connect(broker)
606
 
                return
607
 
            except IOError as ex:
608
 
                e = ex
609
 
            except self.connection_errors as ex:
610
 
                e = ex
611
 
            except Exception as ex:
612
 
                # NOTE(comstud): Unfortunately it's possible for amqplib
613
 
                # to return an error not covered by its transport
614
 
                # connection_errors in the case of a timeout waiting for
615
 
                # a protocol response.  (See paste link in LP888621)
616
 
                # So, we check all exceptions for 'timeout' in them
617
 
                # and try to reconnect in this case.
618
 
                if 'timeout' not in six.text_type(e):
619
 
                    raise
620
 
                e = ex
621
 
 
622
 
            log_info = {}
623
 
            log_info['err_str'] = e
624
 
            log_info['retry'] = retry or 0
625
 
            log_info.update(broker)
626
 
 
627
 
            if not loop_forever and attempt > retry:
628
 
                msg = _('Unable to connect to AMQP server on '
629
 
                        '%(hostname)s:%(port)d after %(retry)d '
630
 
                        'tries: %(err_str)s') % log_info
631
 
                LOG.error(msg)
632
 
                raise exceptions.MessageDeliveryFailure(msg)
633
 
            else:
634
 
                if attempt == 1:
635
 
                    sleep_time = self.interval_start or 1
636
 
                elif attempt > 1:
637
 
                    sleep_time += self.interval_stepping
638
 
 
639
 
                sleep_time = min(sleep_time, self.interval_max)
640
 
 
641
 
                log_info['sleep_time'] = sleep_time
642
 
                if 'Socket closed' in six.text_type(e):
643
 
                    LOG.error(_('AMQP server %(hostname)s:%(port)d closed'
644
 
                                ' the connection. Check login credentials:'
645
 
                                ' %(err_str)s'), log_info)
646
 
                else:
647
 
                    LOG.error(_('AMQP server on %(hostname)s:%(port)d is '
648
 
                                'unreachable: %(err_str)s. Trying again in '
649
 
                                '%(sleep_time)d seconds.'), log_info)
650
 
                time.sleep(sleep_time)
651
 
 
652
 
    def ensure(self, error_callback, method, retry=None):
653
 
        while True:
654
 
            try:
655
 
                return method()
656
 
            except self.connection_errors as e:
657
 
                if error_callback:
658
 
                    error_callback(e)
659
 
            except self.channel_errors as e:
660
 
                if error_callback:
661
 
                    error_callback(e)
662
 
            except (socket.timeout, IOError) as e:
663
 
                if error_callback:
664
 
                    error_callback(e)
665
 
            except Exception as e:
666
 
                # NOTE(comstud): Unfortunately it's possible for amqplib
667
 
                # to return an error not covered by its transport
668
 
                # connection_errors in the case of a timeout waiting for
669
 
                # a protocol response.  (See paste link in LP888621)
670
 
                # So, we check all exceptions for 'timeout' in them
671
 
                # and try to reconnect in this case.
672
 
                if 'timeout' not in six.text_type(e):
673
 
                    raise
674
 
                if error_callback:
675
 
                    error_callback(e)
676
 
            self.reconnect(retry=retry)
677
 
 
678
 
    def get_channel(self):
679
 
        """Convenience call for bin/clear_rabbit_queues."""
680
 
        return self.channel
 
598
        def on_reconnection(new_channel):
 
599
            """Callback invoked when the kombu reconnects and creates
 
600
            a new channel, we use it the reconfigure our consumers.
 
601
            """
 
602
            self.consumer_num = itertools.count(1)
 
603
            for consumer in self.consumers:
 
604
                consumer.reconnect(new_channel)
 
605
 
 
606
        recoverable_errors = (self.connection.recoverable_channel_errors +
 
607
                              self.connection.recoverable_connection_errors)
 
608
        try:
 
609
            autoretry_method = self.connection.autoretry(
 
610
                method, channel=self.channel,
 
611
                max_retries=retry,
 
612
                errback=on_error,
 
613
                interval_start=self.interval_start or 1,
 
614
                interval_step=self.interval_stepping,
 
615
                on_revive=on_reconnection,
 
616
            )
 
617
            ret, channel = autoretry_method()
 
618
            self.channel = channel
 
619
            return ret
 
620
        except recoverable_errors as exc:
 
621
            self.channel = None
 
622
            # NOTE(sileht): number of retry exceeded and the connection
 
623
            # is still broken
 
624
            msg = _('Unable to connect to AMQP server on '
 
625
                    '%(hostname)s:%(port)d after %(retry)d '
 
626
                    'tries: %(err_str)s') % {
 
627
                        'hostname': self.connection.hostname,
 
628
                        'port': self.connection.port,
 
629
                        'err_str': exc,
 
630
                        'retry': retry}
 
631
            LOG.error(msg)
 
632
            raise exceptions.MessageDeliveryFailure(msg)
681
633
 
682
634
    def close(self):
683
635
        """Close/release this connection."""
687
639
 
688
640
    def reset(self):
689
641
        """Reset a connection so it can be used again."""
690
 
        self.channel.close()
691
 
        self.channel = self.connection.channel()
692
 
        # work around 'memory' transport bug in 1.1.3
693
 
        if self.memory_transport:
694
 
            self.channel._new_queue('ae.undeliver')
 
642
        if self.channel is not None:
 
643
            self.channel.close()
 
644
            self.channel = self.connection.channel()
695
645
        self.consumers = []
 
646
        self.consumer_num = itertools.count(1)
696
647
 
697
648
    def declare_consumer(self, consumer_cls, topic, callback):
698
649
        """Create a Consumer using the class that was passed in and
704
655
            LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
705
656
                      "%(err_str)s"), log_info)
706
657
 
707
 
        def _declare_consumer():
708
 
            consumer = consumer_cls(self.conf, self.channel, topic, callback,
 
658
        def _declare_consumer(channel):
 
659
            consumer = consumer_cls(self.conf, channel, topic, callback,
709
660
                                    six.next(self.consumer_num))
710
661
            self.consumers.append(consumer)
711
662
            return consumer
715
666
    def iterconsume(self, limit=None, timeout=None):
716
667
        """Return an iterator that will consume from all queues/consumers."""
717
668
 
718
 
        def _error_callback(exc):
719
 
            if isinstance(exc, socket.timeout):
 
669
        if timeout is None:
 
670
            deadline = None
 
671
        else:
 
672
            deadline = time.time() + timeout
 
673
 
 
674
        def _raise_timeout_if_deadline_is_reached(exc):
 
675
            if deadline is not None and deadline - time.time() < 0:
720
676
                LOG.debug('Timed out waiting for RPC response: %s', exc)
721
677
                raise rpc_common.Timeout()
722
 
            else:
723
 
                LOG.exception(_('Failed to consume message from queue: %s'),
724
 
                              exc)
725
 
                self.do_consume = True
726
 
 
727
 
        def _consume():
 
678
 
 
679
        def _error_callback(exc):
 
680
            self.do_consume = True
 
681
            _raise_timeout_if_deadline_is_reached(exc)
 
682
            LOG.exception(_('Failed to consume message from queue: %s'),
 
683
                          exc)
 
684
 
 
685
        def _consume(channel):
728
686
            if self.do_consume:
729
687
                queues_head = self.consumers[:-1]  # not fanout.
730
688
                queues_tail = self.consumers[-1]  # fanout
732
690
                    queue.consume(nowait=True)
733
691
                queues_tail.consume(nowait=False)
734
692
                self.do_consume = False
735
 
            return self.connection.drain_events(timeout=timeout)
 
693
            while True:
 
694
                try:
 
695
                    return self.connection.drain_events(timeout=1)
 
696
                except socket.timeout as exc:
 
697
                    _raise_timeout_if_deadline_is_reached(exc)
736
698
 
737
699
        for iteration in itertools.count(0):
738
700
            if limit and iteration >= limit:
748
710
            LOG.exception(_("Failed to publish message to topic "
749
711
                          "'%(topic)s': %(err_str)s"), log_info)
750
712
 
751
 
        def _publish():
752
 
            publisher = cls(self.conf, self.channel, topic=topic, **kwargs)
 
713
        def _publish(channel):
 
714
            publisher = cls(self.conf, channel, topic=topic, **kwargs)
753
715
            publisher.send(msg, timeout)
754
716
 
755
717
        self.ensure(_error_callback, _publish, retry=retry)
810
772
        conf.register_opts(rabbit_opts)
811
773
        conf.register_opts(rpc_amqp.amqp_opts)
812
774
 
813
 
        connection_pool = rpc_amqp.get_connection_pool(conf, url, Connection)
 
775
        connection_pool = rpc_amqp.ConnectionPool(conf, url, Connection)
814
776
 
815
777
        super(RabbitDriver, self).__init__(conf, url,
816
778
                                           connection_pool,