40
40
cfg.StrOpt('qpid_hostname',
41
41
default='localhost',
42
help='Qpid broker hostname'),
42
help='Qpid broker hostname.'),
43
43
cfg.IntOpt('qpid_port',
45
help='Qpid broker port'),
45
help='Qpid broker port.'),
46
46
cfg.ListOpt('qpid_hosts',
47
47
default=['$qpid_hostname:$qpid_port'],
48
help='Qpid HA cluster host:port pairs'),
48
help='Qpid HA cluster host:port pairs.'),
49
49
cfg.StrOpt('qpid_username',
51
help='Username for Qpid connection'),
51
help='Username for Qpid connection.'),
52
52
cfg.StrOpt('qpid_password',
54
help='Password for Qpid connection',
54
help='Password for Qpid connection.',
56
56
cfg.StrOpt('qpid_sasl_mechanisms',
58
help='Space separated list of SASL mechanisms to use for auth'),
58
help='Space separated list of SASL mechanisms to use for '
59
60
cfg.IntOpt('qpid_heartbeat',
61
help='Seconds between connection keepalive heartbeats'),
62
help='Seconds between connection keepalive heartbeats.'),
62
63
cfg.StrOpt('qpid_protocol',
64
help="Transport to use, either 'tcp' or 'ssl'"),
65
help="Transport to use, either 'tcp' or 'ssl'."),
65
66
cfg.BoolOpt('qpid_tcp_nodelay',
67
help='Disable Nagle algorithm'),
68
help='Whether to disable the Nagle algorithm.'),
68
69
# NOTE(russellb) If any additional versions are added (beyond 1 and 2),
69
70
# this file could probably use some additional refactoring so that the
70
71
# differences between each version are split into different classes.
88
89
raise Exception(msg)
92
class QpidMessage(dict):
    """Dict of a received message's payload that can also be acknowledged.

    The dict contents are whatever ``rpc_common.deserialize_msg`` returns
    for the raw message's ``content``. The raw message and the session it
    arrived on are kept so the message can be acknowledged later.
    """

    def __init__(self, session, raw_message):
        """Deserialize ``raw_message`` and remember where it came from.

        :param session: object exposing ``acknowledge(message)``; used by
                        :meth:`acknowledge`.
        :param raw_message: received message object; its ``content``
                            attribute is deserialized to populate the dict.
        """
        super(QpidMessage, self).__init__(
            rpc_common.deserialize_msg(raw_message.content))
        # Keep the undecoded message: the session acknowledges the original
        # message object, not the deserialized dict.
        self._raw_message = raw_message
        self._session = session

    def acknowledge(self):
        """Acknowledge the underlying raw message on its session."""
        self._session.acknowledge(self._raw_message)
91
106
class ConsumerBase(object):
92
107
"""Consumer base class."""
183
198
message = self.receiver.fetch()
185
200
self._unpack_json_msg(message)
186
msg = rpc_common.deserialize_msg(message.content)
201
self.callback(QpidMessage(self.session, message))
188
202
except Exception:
189
203
LOG.exception(_("Failed to process message... skipping it."))
191
204
self.session.acknowledge(message)
193
206
def get_receiver(self):
461
474
params.update(server_params or {})
463
476
self.brokers = params['qpid_hosts']
478
brokers_count = len(self.brokers)
479
self.next_broker_indices = itertools.cycle(range(brokers_count))
464
481
self.username = params['username']
465
482
self.password = params['password']
511
526
"Sleeping %(delay)s seconds") % msg_dict
513
528
time.sleep(delay)
514
delay = min(2 * delay, 60)
529
delay = min(delay + 1, 5)
516
531
LOG.info(_('Connected to AMQP server on %s'), broker)