~xianghui/ubuntu/trusty/oslo.messaging/icehouse-lp1521958

« back to all changes in this revision

Viewing changes to oslo/messaging/_drivers/impl_qpid.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short
  • Date: 2014-03-27 13:01:34 UTC
  • mfrom: (1.1.3)
  • Revision ID: package-import@ubuntu.com-20140327130134-va1pxzs253r43n15
Tags: 1.3.0~a9-0ubuntu1
* New upstream release (LP: #1298970)
* debian/control:
  - Add python-oslotest as a build dependency.
  - Use python-oslosphinx instead of python-oslo.sphinx

Show diffs side-by-side

added added

removed removed

Lines of Context:
39
39
qpid_opts = [
40
40
    cfg.StrOpt('qpid_hostname',
41
41
               default='localhost',
42
 
               help='Qpid broker hostname'),
 
42
               help='Qpid broker hostname.'),
43
43
    cfg.IntOpt('qpid_port',
44
44
               default=5672,
45
 
               help='Qpid broker port'),
 
45
               help='Qpid broker port.'),
46
46
    cfg.ListOpt('qpid_hosts',
47
47
                default=['$qpid_hostname:$qpid_port'],
48
 
                help='Qpid HA cluster host:port pairs'),
 
48
                help='Qpid HA cluster host:port pairs.'),
49
49
    cfg.StrOpt('qpid_username',
50
50
               default='',
51
 
               help='Username for Qpid connection'),
 
51
               help='Username for Qpid connection.'),
52
52
    cfg.StrOpt('qpid_password',
53
53
               default='',
54
 
               help='Password for Qpid connection',
 
54
               help='Password for Qpid connection.',
55
55
               secret=True),
56
56
    cfg.StrOpt('qpid_sasl_mechanisms',
57
57
               default='',
58
 
               help='Space separated list of SASL mechanisms to use for auth'),
 
58
               help='Space separated list of SASL mechanisms to use for '
 
59
                    'auth.'),
59
60
    cfg.IntOpt('qpid_heartbeat',
60
61
               default=60,
61
 
               help='Seconds between connection keepalive heartbeats'),
 
62
               help='Seconds between connection keepalive heartbeats.'),
62
63
    cfg.StrOpt('qpid_protocol',
63
64
               default='tcp',
64
 
               help="Transport to use, either 'tcp' or 'ssl'"),
 
65
               help="Transport to use, either 'tcp' or 'ssl'."),
65
66
    cfg.BoolOpt('qpid_tcp_nodelay',
66
67
                default=True,
67
 
                help='Disable Nagle algorithm'),
 
68
                help='Whether to disable the Nagle algorithm.'),
68
69
    # NOTE(russellb) If any additional versions are added (beyond 1 and 2),
69
70
    # this file could probably use some additional refactoring so that the
70
71
    # differences between each version are split into different classes.
88
89
    raise Exception(msg)
89
90
 
90
91
 
 
92
# Dict subclass wrapping one received Qpid message together with the session
# it arrived on. The dict contents come from deserializing the raw message's
# content; the raw message and session are kept so the message can be
# acknowledged later through this object. In this diff the consumer passes
# QpidMessage(self.session, message) to its callback instead of the bare
# deserialized payload.
class QpidMessage(dict):
 
93
    # session: object exposing acknowledge(raw_message); stored as-is.
    # raw_message: object exposing a .content attribute; stored as-is.
    def __init__(self, session, raw_message):
 
94
        # Initialise the dict from whatever rpc_common.deserialize_msg()
        # returns for the raw content (presumably a mapping -- confirm
        # against rpc_common).
        super(QpidMessage, self).__init__(
 
95
            rpc_common.deserialize_msg(raw_message.content))
 
96
        # Kept so acknowledge() can hand the original message to the session.
        self._raw_message = raw_message
 
97
        self._session = session
 
98
 
 
99
    # Acknowledge the wrapped raw message on the session it was built with.
    def acknowledge(self):
 
100
        self._session.acknowledge(self._raw_message)
 
101
 
 
102
    # Intentionally a no-op here: the method exists but performs no requeue.
    def requeue(self):
 
103
        pass
 
104
 
 
105
 
91
106
class ConsumerBase(object):
92
107
    """Consumer base class."""
93
108
 
183
198
        message = self.receiver.fetch()
184
199
        try:
185
200
            self._unpack_json_msg(message)
186
 
            msg = rpc_common.deserialize_msg(message.content)
187
 
            self.callback(msg)
 
201
            self.callback(QpidMessage(self.session, message))
188
202
        except Exception:
189
203
            LOG.exception(_("Failed to process message... skipping it."))
190
 
        finally:
191
204
            self.session.acknowledge(message)
192
205
 
193
206
    def get_receiver(self):
461
474
        params.update(server_params or {})
462
475
 
463
476
        self.brokers = params['qpid_hosts']
 
477
 
 
478
        brokers_count = len(self.brokers)
 
479
        self.next_broker_indices = itertools.cycle(range(brokers_count))
 
480
 
464
481
        self.username = params['username']
465
482
        self.password = params['password']
466
483
        self.reconnect()
489
506
 
490
507
    def reconnect(self):
491
508
        """Handles reconnecting and re-establishing sessions and queues."""
492
 
        attempt = 0
493
509
        delay = 1
494
510
        while True:
495
511
            # Close the session if necessary
499
515
                except qpid_exceptions.ConnectionError:
500
516
                    pass
501
517
 
502
 
            broker = self.brokers[attempt % len(self.brokers)]
503
 
            attempt += 1
 
518
            broker = self.brokers[next(self.next_broker_indices)]
504
519
 
505
520
            try:
506
521
                self.connection_create(broker)
511
526
                        "Sleeping %(delay)s seconds") % msg_dict
512
527
                LOG.error(msg)
513
528
                time.sleep(delay)
514
 
                delay = min(2 * delay, 60)
 
529
                delay = min(delay + 1, 5)
515
530
            else:
516
531
                LOG.info(_('Connected to AMQP server on %s'), broker)
517
532
                break