50
50
cfg.StrOpt('qpid_sasl_mechanisms',
52
52
help='Space separated list of SASL mechanisms to use for auth'),
53
cfg.BoolOpt('qpid_reconnect',
55
help='Automatically reconnect'),
56
cfg.IntOpt('qpid_reconnect_timeout',
58
help='Reconnection timeout in seconds'),
59
cfg.IntOpt('qpid_reconnect_limit',
61
help='Max reconnections before giving up'),
62
cfg.IntOpt('qpid_reconnect_interval_min',
64
help='Minimum seconds between reconnection attempts'),
65
cfg.IntOpt('qpid_reconnect_interval_max',
67
help='Maximum seconds between reconnection attempts'),
68
cfg.IntOpt('qpid_reconnect_interval',
70
help='Equivalent to setting max and min to the same value'),
71
53
cfg.IntOpt('qpid_heartbeat',
73
55
help='Seconds between connection keepalive heartbeats'),
170
152
class TopicConsumer(ConsumerBase):
171
153
"""Consumer class for 'topic'"""
173
def __init__(self, conf, session, topic, callback, name=None):
155
def __init__(self, conf, session, topic, callback, name=None,
174
157
"""Init a 'topic' queue.
176
159
:param session: the amqp session to use
180
163
:param name: optional queue name, defaults to topic
166
exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
183
167
super(TopicConsumer, self).__init__(session, callback,
184
"%s/%s" % (conf.control_exchange,
168
"%s/%s" % (exchange_name, topic),
186
169
{}, name or topic, {})
256
239
def __init__(self, conf, session, topic):
257
240
"""init a 'topic' publisher.
259
super(TopicPublisher, self).__init__(
261
"%s/%s" % (conf.control_exchange, topic))
242
exchange_name = rpc_amqp.get_control_exchange(conf)
243
super(TopicPublisher, self).__init__(session,
244
"%s/%s" % (exchange_name, topic))
264
247
class FanoutPublisher(Publisher):
276
259
def __init__(self, conf, session, topic):
277
260
"""init a 'topic' publisher.
279
super(NotifyPublisher, self).__init__(
281
"%s/%s" % (conf.control_exchange, topic),
262
exchange_name = rpc_amqp.get_control_exchange(conf)
263
super(NotifyPublisher, self).__init__(session,
264
"%s/%s" % (exchange_name, topic),
285
268
class Connection(object):
293
276
self.consumer_thread = None
296
if server_params is None:
299
default_params = dict(hostname=self.conf.qpid_hostname,
300
port=self.conf.qpid_port,
301
username=self.conf.qpid_username,
302
password=self.conf.qpid_password)
304
params = server_params
305
for key in default_params.keys():
306
params.setdefault(key, default_params[key])
280
'hostname': self.conf.qpid_hostname,
281
'port': self.conf.qpid_port,
282
'username': self.conf.qpid_username,
283
'password': self.conf.qpid_password,
285
params.update(server_params or {})
308
287
self.broker = params['hostname'] + ":" + str(params['port'])
288
self.username = params['username']
289
self.password = params['password']
290
self.connection_create()
293
def connection_create(self):
309
294
# Create the connection - this does not open the connection
310
295
self.connection = qpid.messaging.Connection(self.broker)
312
297
# Check if flags are set and if so set them for the connection
313
298
# before we call open
314
self.connection.username = params['username']
315
self.connection.password = params['password']
299
self.connection.username = self.username
300
self.connection.password = self.password
316
302
self.connection.sasl_mechanisms = self.conf.qpid_sasl_mechanisms
317
self.connection.reconnect = self.conf.qpid_reconnect
318
if self.conf.qpid_reconnect_timeout:
319
self.connection.reconnect_timeout = (
320
self.conf.qpid_reconnect_timeout)
321
if self.conf.qpid_reconnect_limit:
322
self.connection.reconnect_limit = self.conf.qpid_reconnect_limit
323
if self.conf.qpid_reconnect_interval_max:
324
self.connection.reconnect_interval_max = (
325
self.conf.qpid_reconnect_interval_max)
326
if self.conf.qpid_reconnect_interval_min:
327
self.connection.reconnect_interval_min = (
328
self.conf.qpid_reconnect_interval_min)
329
if self.conf.qpid_reconnect_interval:
330
self.connection.reconnect_interval = (
331
self.conf.qpid_reconnect_interval)
303
# Reconnection is done by self.reconnect()
304
self.connection.reconnect = False
332
305
self.connection.heartbeat = self.conf.qpid_heartbeat
333
306
self.connection.protocol = self.conf.qpid_protocol
334
307
self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay
336
# Open is part of reconnect -
337
# NOTE(WGH) not sure we need this with the reconnect flags
340
309
def _register_consumer(self, consumer):
341
310
self.consumers[str(consumer.get_receiver())] = consumer
351
320
except qpid.messaging.exceptions.ConnectionError:
326
self.connection_create()
356
327
self.connection.open()
357
328
except qpid.messaging.exceptions.ConnectionError, e:
358
LOG.error(_('Unable to connect to AMQP server: %s'), e)
359
time.sleep(self.conf.qpid_reconnect_interval or 1)
329
msg_dict = dict(e=e, delay=delay)
330
msg = _("Unable to connect to AMQP server: %(e)s. "
331
"Sleeping %(delay)s seconds") % msg_dict
334
delay = min(2 * delay, 60)
365
340
self.session = self.connection.session()
367
for consumer in self.consumers.itervalues():
368
consumer.reconnect(self.session)
370
342
if self.consumers:
343
consumers = self.consumers
346
for consumer in consumers.itervalues():
347
consumer.reconnect(self.session)
348
self._register_consumer(consumer)
371
350
LOG.debug(_("Re-established AMQP queues"))
373
352
def ensure(self, error_callback, method, *args, **kwargs):
465
444
self.declare_consumer(DirectConsumer, topic, callback)
467
def declare_topic_consumer(self, topic, callback=None, queue_name=None):
446
def declare_topic_consumer(self, topic, callback=None, queue_name=None,
468
448
"""Create a 'topic' consumer."""
469
449
self.declare_consumer(functools.partial(TopicConsumer,
451
exchange_name=exchange_name,