~gandelman-a/ubuntu/precise/nova/UCA_2012.2.1

« back to all changes in this revision

Viewing changes to nova/rpc/impl_qpid.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short
  • Date: 2012-05-24 13:12:53 UTC
  • mfrom: (1.1.55)
  • Revision ID: package-import@ubuntu.com-20120524131253-ommql08fg1en06ut
Tags: 2012.2~f1-0ubuntu1
* New upstream release.
* Prepare for quantal:
  - Dropped debian/patches/upstream/0006-Use-project_id-in-ec2.cloud._format_image.patch
  - Dropped debian/patches/upstream/0005-Populate-image-properties-with-project_id-again.patch
  - Dropped debian/patches/upstream/0004-Fixed-bug-962840-added-a-test-case.patch
  - Dropped debian/patches/upstream/0003-Allow-unprivileged-RADOS-users-to-access-rbd-volumes.patch
  - Dropped debian/patches/upstream/0002-Stop-libvirt-test-from-deleting-instances-dir.patch
  - Dropped debian/patches/upstream/0001-fix-bug-where-nova-ignores-glance-host-in-imageref.patch 
  - Dropped debian/patches/0001-fix-useexisting-deprecation-warnings.patch
* debian/control: Add python-keystone as a dependency. (LP: #907197)
* debian/patches/kombu_tests_timeout.patch: Refreshed.
* debian/nova.conf, debian/nova-common.postinst: Convert to the new ini
  file configuration format.
* debian/patches/nova-manage_flagfile_location.patch: Refreshed.

Show diffs side-by-side

added

removed

Lines of Context:
16
16
#    under the License.
17
17
 
18
18
import itertools
 
19
import json
19
20
import time
20
21
import uuid
21
 
import json
22
22
 
23
23
import eventlet
24
24
import greenlet
25
25
import qpid.messaging
26
26
import qpid.messaging.exceptions
27
27
 
28
 
from nova import flags
29
28
from nova import log as logging
30
29
from nova.openstack.common import cfg
31
30
from nova.rpc import amqp as rpc_amqp
78
77
                help='Disable Nagle algorithm'),
79
78
    ]
80
79
 
81
 
FLAGS = flags.FLAGS
82
 
FLAGS.register_opts(qpid_opts)
83
 
 
84
80
 
85
81
class ConsumerBase(object):
86
82
    """Consumer base class."""
147
143
class DirectConsumer(ConsumerBase):
148
144
    """Queue/consumer class for 'direct'"""
149
145
 
150
 
    def __init__(self, session, msg_id, callback):
 
146
    def __init__(self, conf, session, msg_id, callback):
151
147
        """Init a 'direct' queue.
152
148
 
153
149
        'session' is the amqp session to use
165
161
class TopicConsumer(ConsumerBase):
166
162
    """Consumer class for 'topic'"""
167
163
 
168
 
    def __init__(self, session, topic, callback):
 
164
    def __init__(self, conf, session, topic, callback):
169
165
        """Init a 'topic' queue.
170
166
 
171
167
        'session' is the amqp session to use
174
170
        """
175
171
 
176
172
        super(TopicConsumer, self).__init__(session, callback,
177
 
                        "%s/%s" % (FLAGS.control_exchange, topic), {},
 
173
                        "%s/%s" % (conf.control_exchange, topic), {},
178
174
                        topic, {})
179
175
 
180
176
 
181
177
class FanoutConsumer(ConsumerBase):
182
178
    """Consumer class for 'fanout'"""
183
179
 
184
 
    def __init__(self, session, topic, callback):
 
180
    def __init__(self, conf, session, topic, callback):
185
181
        """Init a 'fanout' queue.
186
182
 
187
183
        'session' is the amqp session to use
236
232
 
237
233
class DirectPublisher(Publisher):
238
234
    """Publisher class for 'direct'"""
239
 
    def __init__(self, session, msg_id):
 
235
    def __init__(self, conf, session, msg_id):
240
236
        """Init a 'direct' publisher."""
241
237
        super(DirectPublisher, self).__init__(session, msg_id,
242
238
                                              {"type": "Direct"})
244
240
 
245
241
class TopicPublisher(Publisher):
246
242
    """Publisher class for 'topic'"""
247
 
    def __init__(self, session, topic):
 
243
    def __init__(self, conf, session, topic):
248
244
        """init a 'topic' publisher.
249
245
        """
250
246
        super(TopicPublisher, self).__init__(session,
251
 
                                "%s/%s" % (FLAGS.control_exchange, topic))
 
247
                                "%s/%s" % (conf.control_exchange, topic))
252
248
 
253
249
 
254
250
class FanoutPublisher(Publisher):
255
251
    """Publisher class for 'fanout'"""
256
 
    def __init__(self, session, topic):
 
252
    def __init__(self, conf, session, topic):
257
253
        """init a 'fanout' publisher.
258
254
        """
259
255
        super(FanoutPublisher, self).__init__(session,
262
258
 
263
259
class NotifyPublisher(Publisher):
264
260
    """Publisher class for notifications"""
265
 
    def __init__(self, session, topic):
 
261
    def __init__(self, conf, session, topic):
266
262
        """init a 'topic' publisher.
267
263
        """
268
264
        super(NotifyPublisher, self).__init__(session,
269
 
                                "%s/%s" % (FLAGS.control_exchange, topic),
 
265
                                "%s/%s" % (conf.control_exchange, topic),
270
266
                                {"durable": True})
271
267
 
272
268
 
273
269
class Connection(object):
274
270
    """Connection object."""
275
271
 
276
 
    def __init__(self, server_params=None):
 
272
    pool = None
 
273
 
 
274
    def __init__(self, conf, server_params=None):
277
275
        self.session = None
278
276
        self.consumers = {}
279
277
        self.consumer_thread = None
 
278
        self.conf = conf
280
279
 
281
280
        if server_params is None:
282
281
            server_params = {}
283
282
 
284
 
        default_params = dict(hostname=FLAGS.qpid_hostname,
285
 
                port=FLAGS.qpid_port,
286
 
                username=FLAGS.qpid_username,
287
 
                password=FLAGS.qpid_password)
 
283
        default_params = dict(hostname=self.conf.qpid_hostname,
 
284
                port=self.conf.qpid_port,
 
285
                username=self.conf.qpid_username,
 
286
                password=self.conf.qpid_password)
288
287
 
289
288
        params = server_params
290
289
        for key in default_params.keys():
298
297
        # before we call open
299
298
        self.connection.username = params['username']
300
299
        self.connection.password = params['password']
301
 
        self.connection.sasl_mechanisms = FLAGS.qpid_sasl_mechanisms
302
 
        self.connection.reconnect = FLAGS.qpid_reconnect
303
 
        if FLAGS.qpid_reconnect_timeout:
304
 
            self.connection.reconnect_timeout = FLAGS.qpid_reconnect_timeout
305
 
        if FLAGS.qpid_reconnect_limit:
306
 
            self.connection.reconnect_limit = FLAGS.qpid_reconnect_limit
307
 
        if FLAGS.qpid_reconnect_interval_max:
 
300
        self.connection.sasl_mechanisms = self.conf.qpid_sasl_mechanisms
 
301
        self.connection.reconnect = self.conf.qpid_reconnect
 
302
        if self.conf.qpid_reconnect_timeout:
 
303
            self.connection.reconnect_timeout = (
 
304
                    self.conf.qpid_reconnect_timeout)
 
305
        if self.conf.qpid_reconnect_limit:
 
306
            self.connection.reconnect_limit = self.conf.qpid_reconnect_limit
 
307
        if self.conf.qpid_reconnect_interval_max:
308
308
            self.connection.reconnect_interval_max = (
309
 
                    FLAGS.qpid_reconnect_interval_max)
310
 
        if FLAGS.qpid_reconnect_interval_min:
 
309
                    self.conf.qpid_reconnect_interval_max)
 
310
        if self.conf.qpid_reconnect_interval_min:
311
311
            self.connection.reconnect_interval_min = (
312
 
                    FLAGS.qpid_reconnect_interval_min)
313
 
        if FLAGS.qpid_reconnect_interval:
314
 
            self.connection.reconnect_interval = FLAGS.qpid_reconnect_interval
315
 
        self.connection.hearbeat = FLAGS.qpid_heartbeat
316
 
        self.connection.protocol = FLAGS.qpid_protocol
317
 
        self.connection.tcp_nodelay = FLAGS.qpid_tcp_nodelay
 
312
                    self.conf.qpid_reconnect_interval_min)
 
313
        if self.conf.qpid_reconnect_interval:
 
314
            self.connection.reconnect_interval = (
 
315
                    self.conf.qpid_reconnect_interval)
 
316
        self.connection.hearbeat = self.conf.qpid_heartbeat
 
317
        self.connection.protocol = self.conf.qpid_protocol
 
318
        self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay
318
319
 
319
320
        # Open is part of reconnect -
320
321
        # NOTE(WGH) not sure we need this with the reconnect flags
338
339
            try:
339
340
                self.connection.open()
340
341
            except qpid.messaging.exceptions.ConnectionError, e:
341
 
                LOG.error(_('Unable to connect to AMQP server: %s ') % e)
342
 
                time.sleep(FLAGS.qpid_reconnect_interval or 1)
 
342
                LOG.error(_('Unable to connect to AMQP server: %s'), e)
 
343
                time.sleep(self.conf.qpid_reconnect_interval or 1)
343
344
            else:
344
345
                break
345
346
 
346
 
        LOG.info(_('Connected to AMQP server on %s') % self.broker)
 
347
        LOG.info(_('Connected to AMQP server on %s'), self.broker)
347
348
 
348
349
        self.session = self.connection.session()
349
350
 
386
387
                "%(err_str)s") % log_info)
387
388
 
388
389
        def _declare_consumer():
389
 
            consumer = consumer_cls(self.session, topic, callback)
 
390
            consumer = consumer_cls(self.conf, self.session, topic, callback)
390
391
            self._register_consumer(consumer)
391
392
            return consumer
392
393
 
435
436
                "'%(topic)s': %(err_str)s") % log_info)
436
437
 
437
438
        def _publisher_send():
438
 
            publisher = cls(self.session, topic)
 
439
            publisher = cls(self.conf, self.session, topic)
439
440
            publisher.send(msg)
440
441
 
441
442
        return self.ensure(_connect_error, _publisher_send)
493
494
 
494
495
    def create_consumer(self, topic, proxy, fanout=False):
495
496
        """Create a consumer that calls a method in a proxy object"""
 
497
        proxy_cb = rpc_amqp.ProxyCallback(self.conf, proxy,
 
498
                rpc_amqp.get_connection_pool(self, Connection))
 
499
 
496
500
        if fanout:
497
 
            consumer = FanoutConsumer(self.session, topic,
498
 
                    rpc_amqp.ProxyCallback(proxy, Connection.pool))
 
501
            consumer = FanoutConsumer(self.conf, self.session, topic, proxy_cb)
499
502
        else:
500
 
            consumer = TopicConsumer(self.session, topic,
501
 
                    rpc_amqp.ProxyCallback(proxy, Connection.pool))
 
503
            consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb)
 
504
 
502
505
        self._register_consumer(consumer)
 
506
 
503
507
        return consumer
504
508
 
505
509
 
506
 
Connection.pool = rpc_amqp.Pool(connection_cls=Connection)
507
 
 
508
 
 
509
 
def create_connection(new=True):
 
510
def create_connection(conf, new=True):
510
511
    """Create a connection"""
511
 
    return rpc_amqp.create_connection(new, Connection.pool)
512
 
 
513
 
 
514
 
def multicall(context, topic, msg, timeout=None):
 
512
    return rpc_amqp.create_connection(conf, new,
 
513
            rpc_amqp.get_connection_pool(conf, Connection))
 
514
 
 
515
 
 
516
def multicall(conf, context, topic, msg, timeout=None):
515
517
    """Make a call that returns multiple times."""
516
 
    return rpc_amqp.multicall(context, topic, msg, timeout, Connection.pool)
517
 
 
518
 
 
519
 
def call(context, topic, msg, timeout=None):
 
518
    return rpc_amqp.multicall(conf, context, topic, msg, timeout,
 
519
            rpc_amqp.get_connection_pool(conf, Connection))
 
520
 
 
521
 
 
522
def call(conf, context, topic, msg, timeout=None):
520
523
    """Sends a message on a topic and wait for a response."""
521
 
    return rpc_amqp.call(context, topic, msg, timeout, Connection.pool)
522
 
 
523
 
 
524
 
def cast(context, topic, msg):
 
524
    return rpc_amqp.call(conf, context, topic, msg, timeout,
 
525
            rpc_amqp.get_connection_pool(conf, Connection))
 
526
 
 
527
 
 
528
def cast(conf, context, topic, msg):
525
529
    """Sends a message on a topic without waiting for a response."""
526
 
    return rpc_amqp.cast(context, topic, msg, Connection.pool)
527
 
 
528
 
 
529
 
def fanout_cast(context, topic, msg):
 
530
    return rpc_amqp.cast(conf, context, topic, msg,
 
531
            rpc_amqp.get_connection_pool(conf, Connection))
 
532
 
 
533
 
 
534
def fanout_cast(conf, context, topic, msg):
530
535
    """Sends a message on a fanout exchange without waiting for a response."""
531
 
    return rpc_amqp.fanout_cast(context, topic, msg, Connection.pool)
532
 
 
533
 
 
534
 
def cast_to_server(context, server_params, topic, msg):
 
536
    return rpc_amqp.fanout_cast(conf, context, topic, msg,
 
537
            rpc_amqp.get_connection_pool(conf, Connection))
 
538
 
 
539
 
 
540
def cast_to_server(conf, context, server_params, topic, msg):
535
541
    """Sends a message on a topic to a specific server."""
536
 
    return rpc_amqp.cast_to_server(context, server_params, topic, msg,
537
 
            Connection.pool)
538
 
 
539
 
 
540
 
def fanout_cast_to_server(context, server_params, topic, msg):
 
542
    return rpc_amqp.cast_to_server(conf, context, server_params, topic, msg,
 
543
            rpc_amqp.get_connection_pool(conf, Connection))
 
544
 
 
545
 
 
546
def fanout_cast_to_server(conf, context, server_params, topic, msg):
541
547
    """Sends a message on a fanout exchange to a specific server."""
542
 
    return rpc_amqp.fanout_cast_to_server(context, server_params, topic,
543
 
            msg, Connection.pool)
544
 
 
545
 
 
546
 
def notify(context, topic, msg):
 
548
    return rpc_amqp.fanout_cast_to_server(conf, context, server_params, topic,
 
549
            msg, rpc_amqp.get_connection_pool(conf, Connection))
 
550
 
 
551
 
 
552
def notify(conf, context, topic, msg):
547
553
    """Sends a notification event on a topic."""
548
 
    return rpc_amqp.notify(context, topic, msg, Connection.pool)
 
554
    return rpc_amqp.notify(conf, context, topic, msg,
 
555
            rpc_amqp.get_connection_pool(conf, Connection))
549
556
 
550
557
 
551
558
def cleanup():
552
559
    return rpc_amqp.cleanup(Connection.pool)
 
560
 
 
561
 
 
562
def register_opts(conf):
 
563
    conf.register_opts(qpid_opts)