29
28
from nova.openstack.common import cfg
30
29
from nova.openstack.common.gettextutils import _
31
30
from nova.openstack.common import jsonutils
31
from nova.openstack.common import log as logging
32
32
from nova.openstack.common.rpc import amqp as rpc_amqp
33
33
from nova.openstack.common.rpc import common as rpc_common
41
41
cfg.StrOpt('qpid_port',
43
43
help='Qpid broker port'),
44
cfg.ListOpt('qpid_hosts',
45
default=['$qpid_hostname:$qpid_port'],
46
help='Qpid HA cluster host:port pairs'),
44
47
cfg.StrOpt('qpid_username',
46
49
help='Username for qpid connection'),
121
124
"""Fetch the message and pass it to the callback object"""
122
125
message = self.receiver.fetch()
124
self.callback(message.content)
127
msg = rpc_common.deserialize_msg(message.content)
125
129
except Exception:
126
130
LOG.exception(_("Failed to process message... skipping it."))
274
278
self.session = None
275
279
self.consumers = {}
276
280
self.consumer_thread = None
281
self.proxy_callbacks = []
284
if server_params and 'hostname' in server_params:
285
# NOTE(russellb) This enables support for cast_to_server.
286
server_params['qpid_hosts'] = [
287
'%s:%d' % (server_params['hostname'],
288
server_params.get('port', 5672))
280
'hostname': self.conf.qpid_hostname,
281
'port': self.conf.qpid_port,
292
'qpid_hosts': self.conf.qpid_hosts,
282
293
'username': self.conf.qpid_username,
283
294
'password': self.conf.qpid_password,
285
296
params.update(server_params or {})
287
self.broker = params['hostname'] + ":" + str(params['port'])
298
self.brokers = params['qpid_hosts']
288
299
self.username = params['username']
289
300
self.password = params['password']
290
self.connection_create()
301
self.connection_create(self.brokers[0])
293
def connection_create(self):
304
def connection_create(self, broker):
294
305
# Create the connection - this does not open the connection
295
self.connection = qpid.messaging.Connection(self.broker)
306
self.connection = qpid.messaging.Connection(broker)
297
308
# Check if flags are set and if so set them for the connection
298
309
# before we call open
320
331
except qpid.messaging.exceptions.ConnectionError:
337
broker = self.brokers[attempt % len(self.brokers)]
326
self.connection_create()
341
self.connection_create(broker)
327
342
self.connection.open()
328
343
except qpid.messaging.exceptions.ConnectionError, e:
329
344
msg_dict = dict(e=e, delay=delay)
333
348
time.sleep(delay)
334
349
delay = min(2 * delay, 60)
351
LOG.info(_('Connected to AMQP server on %s'), broker)
338
LOG.info(_('Connected to AMQP server on %s'), self.broker)
340
354
self.session = self.connection.session()
342
356
if self.consumers:
363
377
"""Close/release this connection"""
364
378
self.cancel_consumer_thread()
379
self.wait_on_proxy_callbacks()
365
380
self.connection.close()
366
381
self.connection = None
369
384
"""Reset a connection so it can be used again"""
370
385
self.cancel_consumer_thread()
386
self.wait_on_proxy_callbacks()
371
387
self.session.close()
372
388
self.session = self.connection.session()
373
389
self.consumers = {}
423
439
self.consumer_thread = None
441
def wait_on_proxy_callbacks(self):
442
"""Wait for all proxy callback threads to exit."""
443
for proxy_cb in self.proxy_callbacks:
425
446
def publisher_send(self, cls, topic, msg):
426
447
"""Send to a publisher based on the publisher class"""
497
518
proxy_cb = rpc_amqp.ProxyCallback(
498
519
self.conf, proxy,
499
520
rpc_amqp.get_connection_pool(self.conf, Connection))
521
self.proxy_callbacks.append(proxy_cb)
502
524
consumer = FanoutConsumer(self.conf, self.session, topic, proxy_cb)
512
534
proxy_cb = rpc_amqp.ProxyCallback(
513
535
self.conf, proxy,
514
536
rpc_amqp.get_connection_pool(self.conf, Connection))
537
self.proxy_callbacks.append(proxy_cb)
516
539
consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb,
570
593
rpc_amqp.get_connection_pool(conf, Connection))
573
def notify(conf, context, topic, msg):
596
def notify(conf, context, topic, msg, envelope):
574
597
"""Sends a notification event on a topic."""
575
598
return rpc_amqp.notify(conf, context, topic, msg,
576
rpc_amqp.get_connection_pool(conf, Connection))
599
rpc_amqp.get_connection_pool(conf, Connection),