245
241
class TopicPublisher(Publisher):
246
242
"""Publisher class for 'topic'"""
247
def __init__(self, session, topic):
243
def __init__(self, conf, session, topic):
248
244
"""init a 'topic' publisher.
250
246
super(TopicPublisher, self).__init__(session,
251
"%s/%s" % (FLAGS.control_exchange, topic))
247
"%s/%s" % (conf.control_exchange, topic))
254
250
class FanoutPublisher(Publisher):
255
251
"""Publisher class for 'fanout'"""
256
def __init__(self, session, topic):
252
def __init__(self, conf, session, topic):
257
253
"""init a 'fanout' publisher.
259
255
super(FanoutPublisher, self).__init__(session,
263
259
class NotifyPublisher(Publisher):
264
260
"""Publisher class for notifications"""
265
def __init__(self, session, topic):
261
def __init__(self, conf, session, topic):
266
262
"""init a 'topic' publisher.
268
264
super(NotifyPublisher, self).__init__(session,
269
"%s/%s" % (FLAGS.control_exchange, topic),
265
"%s/%s" % (conf.control_exchange, topic),
270
266
{"durable": True})
273
269
class Connection(object):
274
270
"""Connection object."""
276
def __init__(self, server_params=None):
274
def __init__(self, conf, server_params=None):
277
275
self.session = None
278
276
self.consumers = {}
279
277
self.consumer_thread = None
281
280
if server_params is None:
282
281
server_params = {}
284
default_params = dict(hostname=FLAGS.qpid_hostname,
285
port=FLAGS.qpid_port,
286
username=FLAGS.qpid_username,
287
password=FLAGS.qpid_password)
283
default_params = dict(hostname=self.conf.qpid_hostname,
284
port=self.conf.qpid_port,
285
username=self.conf.qpid_username,
286
password=self.conf.qpid_password)
289
288
params = server_params
290
289
for key in default_params.keys():
298
297
# before we call open
299
298
self.connection.username = params['username']
300
299
self.connection.password = params['password']
301
self.connection.sasl_mechanisms = FLAGS.qpid_sasl_mechanisms
302
self.connection.reconnect = FLAGS.qpid_reconnect
303
if FLAGS.qpid_reconnect_timeout:
304
self.connection.reconnect_timeout = FLAGS.qpid_reconnect_timeout
305
if FLAGS.qpid_reconnect_limit:
306
self.connection.reconnect_limit = FLAGS.qpid_reconnect_limit
307
if FLAGS.qpid_reconnect_interval_max:
300
self.connection.sasl_mechanisms = self.conf.qpid_sasl_mechanisms
301
self.connection.reconnect = self.conf.qpid_reconnect
302
if self.conf.qpid_reconnect_timeout:
303
self.connection.reconnect_timeout = (
304
self.conf.qpid_reconnect_timeout)
305
if self.conf.qpid_reconnect_limit:
306
self.connection.reconnect_limit = self.conf.qpid_reconnect_limit
307
if self.conf.qpid_reconnect_interval_max:
308
308
self.connection.reconnect_interval_max = (
309
FLAGS.qpid_reconnect_interval_max)
310
if FLAGS.qpid_reconnect_interval_min:
309
self.conf.qpid_reconnect_interval_max)
310
if self.conf.qpid_reconnect_interval_min:
311
311
self.connection.reconnect_interval_min = (
312
FLAGS.qpid_reconnect_interval_min)
313
if FLAGS.qpid_reconnect_interval:
314
self.connection.reconnect_interval = FLAGS.qpid_reconnect_interval
315
self.connection.hearbeat = FLAGS.qpid_heartbeat
316
self.connection.protocol = FLAGS.qpid_protocol
317
self.connection.tcp_nodelay = FLAGS.qpid_tcp_nodelay
312
self.conf.qpid_reconnect_interval_min)
313
if self.conf.qpid_reconnect_interval:
314
self.connection.reconnect_interval = (
315
self.conf.qpid_reconnect_interval)
316
self.connection.hearbeat = self.conf.qpid_heartbeat
317
self.connection.protocol = self.conf.qpid_protocol
318
self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay
319
320
# Open is part of reconnect -
320
321
# NOTE(WGH) not sure we need this with the reconnect flags
494
495
def create_consumer(self, topic, proxy, fanout=False):
495
496
"""Create a consumer that calls a method in a proxy object"""
497
proxy_cb = rpc_amqp.ProxyCallback(self.conf, proxy,
498
rpc_amqp.get_connection_pool(self, Connection))
497
consumer = FanoutConsumer(self.session, topic,
498
rpc_amqp.ProxyCallback(proxy, Connection.pool))
501
consumer = FanoutConsumer(self.conf, self.session, topic, proxy_cb)
500
consumer = TopicConsumer(self.session, topic,
501
rpc_amqp.ProxyCallback(proxy, Connection.pool))
503
consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb)
502
505
self._register_consumer(consumer)
506
Connection.pool = rpc_amqp.Pool(connection_cls=Connection)
509
def create_connection(new=True):
510
def create_connection(conf, new=True):
510
511
"""Create a connection"""
511
return rpc_amqp.create_connection(new, Connection.pool)
514
def multicall(context, topic, msg, timeout=None):
512
return rpc_amqp.create_connection(conf, new,
513
rpc_amqp.get_connection_pool(conf, Connection))
516
def multicall(conf, context, topic, msg, timeout=None):
515
517
"""Make a call that returns multiple times."""
516
return rpc_amqp.multicall(context, topic, msg, timeout, Connection.pool)
519
def call(context, topic, msg, timeout=None):
518
return rpc_amqp.multicall(conf, context, topic, msg, timeout,
519
rpc_amqp.get_connection_pool(conf, Connection))
522
def call(conf, context, topic, msg, timeout=None):
520
523
"""Sends a message on a topic and wait for a response."""
521
return rpc_amqp.call(context, topic, msg, timeout, Connection.pool)
524
def cast(context, topic, msg):
524
return rpc_amqp.call(conf, context, topic, msg, timeout,
525
rpc_amqp.get_connection_pool(conf, Connection))
528
def cast(conf, context, topic, msg):
525
529
"""Sends a message on a topic without waiting for a response."""
526
return rpc_amqp.cast(context, topic, msg, Connection.pool)
529
def fanout_cast(context, topic, msg):
530
return rpc_amqp.cast(conf, context, topic, msg,
531
rpc_amqp.get_connection_pool(conf, Connection))
534
def fanout_cast(conf, context, topic, msg):
530
535
"""Sends a message on a fanout exchange without waiting for a response."""
531
return rpc_amqp.fanout_cast(context, topic, msg, Connection.pool)
534
def cast_to_server(context, server_params, topic, msg):
536
return rpc_amqp.fanout_cast(conf, context, topic, msg,
537
rpc_amqp.get_connection_pool(conf, Connection))
540
def cast_to_server(conf, context, server_params, topic, msg):
535
541
"""Sends a message on a topic to a specific server."""
536
return rpc_amqp.cast_to_server(context, server_params, topic, msg,
540
def fanout_cast_to_server(context, server_params, topic, msg):
542
return rpc_amqp.cast_to_server(conf, context, server_params, topic, msg,
543
rpc_amqp.get_connection_pool(conf, Connection))
546
def fanout_cast_to_server(conf, context, server_params, topic, msg):
541
547
"""Sends a message on a fanout exchange to a specific server."""
542
return rpc_amqp.fanout_cast_to_server(context, server_params, topic,
543
msg, Connection.pool)
546
def notify(context, topic, msg):
548
return rpc_amqp.fanout_cast_to_server(conf, context, server_params, topic,
549
msg, rpc_amqp.get_connection_pool(conf, Connection))
552
def notify(conf, context, topic, msg):
547
553
"""Sends a notification event on a topic."""
548
return rpc_amqp.notify(context, topic, msg, Connection.pool)
554
return rpc_amqp.notify(conf, context, topic, msg,
555
rpc_amqp.get_connection_pool(conf, Connection))
552
559
return rpc_amqp.cleanup(Connection.pool)
562
def register_opts(conf):
563
conf.register_opts(qpid_opts)