~ubuntu-cloud-archive/ubuntu/precise/nova/trunk

« back to all changes in this revision

Viewing changes to nova/openstack/common/rpc/impl_qpid.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short, Adam Gandelman, Chuck Short
  • Date: 2012-07-06 10:18:33 UTC
  • mfrom: (1.1.58)
  • Revision ID: package-import@ubuntu.com-20120706101833-wp2nv392mpe9re8p
Tags: 2012.2~f2-0ubuntu1
[ Adam Gandelman ]
* Use new rootwrap configuration structure:
  - debian/nova-{compute, network, volume}.{pyinstall, pyremove}: Dropped.
  - debian/nova-common.dirs: Add /etc/nova/rootwrap.d/.
  - debian/nova-common.install: Install /etc/nova/rootwrap.conf.
  - debian/nova.conf: Reference rootwrap.conf in calls to
    nova-rootwrap.
  - debian/nova-{compute, network, volume}.install: Install corresponding
    filter in /etc/nova/rootwrap.d/
* debian/rules: Install logging_sample.conf to /etc/nova/logging.conf
  as part of nova-common.
* debian/pydist-overrides: Add setuptools-git.
* debian/control: Add python-setuptools-git as a Build-Depends.
* debian/rules: Do not remove nova.egg-info during auto_clean.  Now that
  upstream has moved to setuptools-git, doing so results in missing files
  from the built package.

[ Chuck Short ]
* New upstream release.

Show diffs side-by-side

added added

removed removed

Lines of Context:
77
77
    cfg.BoolOpt('qpid_tcp_nodelay',
78
78
                default=True,
79
79
                help='Disable Nagle algorithm'),
80
 
    ]
 
80
]
81
81
 
82
82
cfg.CONF.register_opts(qpid_opts)
83
83
 
161
161
        """
162
162
 
163
163
        super(DirectConsumer, self).__init__(session, callback,
164
 
                        "%s/%s" % (msg_id, msg_id),
165
 
                        {"type": "direct"},
166
 
                        msg_id,
167
 
                        {"exclusive": True})
 
164
                                             "%s/%s" % (msg_id, msg_id),
 
165
                                             {"type": "direct"},
 
166
                                             msg_id,
 
167
                                             {"exclusive": True})
168
168
 
169
169
 
170
170
class TopicConsumer(ConsumerBase):
181
181
        """
182
182
 
183
183
        super(TopicConsumer, self).__init__(session, callback,
184
 
                        "%s/%s" % (conf.control_exchange, topic), {},
185
 
                        name or topic, {})
 
184
                                            "%s/%s" % (conf.control_exchange,
 
185
                                                       topic),
 
186
                                            {}, name or topic, {})
186
187
 
187
188
 
188
189
class FanoutConsumer(ConsumerBase):
196
197
        'callback' is the callback to call when messages are received
197
198
        """
198
199
 
199
 
        super(FanoutConsumer, self).__init__(session, callback,
200
 
                        "%s_fanout" % topic,
201
 
                        {"durable": False, "type": "fanout"},
202
 
                        "%s_fanout_%s" % (topic, uuid.uuid4().hex),
203
 
                        {"exclusive": True})
 
200
        super(FanoutConsumer, self).__init__(
 
201
            session, callback,
 
202
            "%s_fanout" % topic,
 
203
            {"durable": False, "type": "fanout"},
 
204
            "%s_fanout_%s" % (topic, uuid.uuid4().hex),
 
205
            {"exclusive": True})
204
206
 
205
207
 
206
208
class Publisher(object):
254
256
    def __init__(self, conf, session, topic):
255
257
        """init a 'topic' publisher.
256
258
        """
257
 
        super(TopicPublisher, self).__init__(session,
258
 
                                "%s/%s" % (conf.control_exchange, topic))
 
259
        super(TopicPublisher, self).__init__(
 
260
            session,
 
261
            "%s/%s" % (conf.control_exchange, topic))
259
262
 
260
263
 
261
264
class FanoutPublisher(Publisher):
263
266
    def __init__(self, conf, session, topic):
264
267
        """init a 'fanout' publisher.
265
268
        """
266
 
        super(FanoutPublisher, self).__init__(session,
267
 
                                "%s_fanout" % topic, {"type": "fanout"})
 
269
        super(FanoutPublisher, self).__init__(
 
270
            session,
 
271
            "%s_fanout" % topic, {"type": "fanout"})
268
272
 
269
273
 
270
274
class NotifyPublisher(Publisher):
272
276
    def __init__(self, conf, session, topic):
273
277
        """init a 'topic' publisher.
274
278
        """
275
 
        super(NotifyPublisher, self).__init__(session,
276
 
                                "%s/%s" % (conf.control_exchange, topic),
277
 
                                {"durable": True})
 
279
        super(NotifyPublisher, self).__init__(
 
280
            session,
 
281
            "%s/%s" % (conf.control_exchange, topic),
 
282
            {"durable": True})
278
283
 
279
284
 
280
285
class Connection(object):
292
297
            server_params = {}
293
298
 
294
299
        default_params = dict(hostname=self.conf.qpid_hostname,
295
 
                port=self.conf.qpid_port,
296
 
                username=self.conf.qpid_username,
297
 
                password=self.conf.qpid_password)
 
300
                              port=self.conf.qpid_port,
 
301
                              username=self.conf.qpid_username,
 
302
                              password=self.conf.qpid_password)
298
303
 
299
304
        params = server_params
300
305
        for key in default_params.keys():
312
317
        self.connection.reconnect = self.conf.qpid_reconnect
313
318
        if self.conf.qpid_reconnect_timeout:
314
319
            self.connection.reconnect_timeout = (
315
 
                    self.conf.qpid_reconnect_timeout)
 
320
                self.conf.qpid_reconnect_timeout)
316
321
        if self.conf.qpid_reconnect_limit:
317
322
            self.connection.reconnect_limit = self.conf.qpid_reconnect_limit
318
323
        if self.conf.qpid_reconnect_interval_max:
319
324
            self.connection.reconnect_interval_max = (
320
 
                    self.conf.qpid_reconnect_interval_max)
 
325
                self.conf.qpid_reconnect_interval_max)
321
326
        if self.conf.qpid_reconnect_interval_min:
322
327
            self.connection.reconnect_interval_min = (
323
 
                    self.conf.qpid_reconnect_interval_min)
 
328
                self.conf.qpid_reconnect_interval_min)
324
329
        if self.conf.qpid_reconnect_interval:
325
330
            self.connection.reconnect_interval = (
326
 
                    self.conf.qpid_reconnect_interval)
 
331
                self.conf.qpid_reconnect_interval)
327
332
        self.connection.hearbeat = self.conf.qpid_heartbeat
328
333
        self.connection.protocol = self.conf.qpid_protocol
329
334
        self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay
395
400
        def _connect_error(exc):
396
401
            log_info = {'topic': topic, 'err_str': str(exc)}
397
402
            LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
398
 
                "%(err_str)s") % log_info)
 
403
                      "%(err_str)s") % log_info)
399
404
 
400
405
        def _declare_consumer():
401
406
            consumer = consumer_cls(self.conf, self.session, topic, callback)
410
415
        def _error_callback(exc):
411
416
            if isinstance(exc, qpid.messaging.exceptions.Empty):
412
417
                LOG.exception(_('Timed out waiting for RPC response: %s') %
413
 
                        str(exc))
 
418
                              str(exc))
414
419
                raise rpc_common.Timeout()
415
420
            else:
416
421
                LOG.exception(_('Failed to consume message from queue: %s') %
417
 
                        str(exc))
 
422
                              str(exc))
418
423
 
419
424
        def _consume():
420
425
            nxt_receiver = self.session.next_receiver(timeout=timeout)
444
449
        def _connect_error(exc):
445
450
            log_info = {'topic': topic, 'err_str': str(exc)}
446
451
            LOG.exception(_("Failed to publish message to topic "
447
 
                "'%(topic)s': %(err_str)s") % log_info)
 
452
                          "'%(topic)s': %(err_str)s") % log_info)
448
453
 
449
454
        def _publisher_send():
450
455
            publisher = cls(self.conf, self.session, topic)
508
513
 
509
514
    def create_consumer(self, topic, proxy, fanout=False):
510
515
        """Create a consumer that calls a method in a proxy object"""
511
 
        proxy_cb = rpc_amqp.ProxyCallback(self.conf, proxy,
512
 
                rpc_amqp.get_connection_pool(self.conf, Connection))
 
516
        proxy_cb = rpc_amqp.ProxyCallback(
 
517
            self.conf, proxy,
 
518
            rpc_amqp.get_connection_pool(self.conf, Connection))
513
519
 
514
520
        if fanout:
515
521
            consumer = FanoutConsumer(self.conf, self.session, topic, proxy_cb)
522
528
 
523
529
    def create_worker(self, topic, proxy, pool_name):
524
530
        """Create a worker that calls a method in a proxy object"""
525
 
        proxy_cb = rpc_amqp.ProxyCallback(self.conf, proxy,
526
 
                rpc_amqp.get_connection_pool(self.conf, Connection))
 
531
        proxy_cb = rpc_amqp.ProxyCallback(
 
532
            self.conf, proxy,
 
533
            rpc_amqp.get_connection_pool(self.conf, Connection))
527
534
 
528
535
        consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb,
529
536
                                 name=pool_name)
535
542
 
536
543
def create_connection(conf, new=True):
537
544
    """Create a connection"""
538
 
    return rpc_amqp.create_connection(conf, new,
539
 
            rpc_amqp.get_connection_pool(conf, Connection))
 
545
    return rpc_amqp.create_connection(
 
546
        conf, new,
 
547
        rpc_amqp.get_connection_pool(conf, Connection))
540
548
 
541
549
 
542
550
def multicall(conf, context, topic, msg, timeout=None):
543
551
    """Make a call that returns multiple times."""
544
 
    return rpc_amqp.multicall(conf, context, topic, msg, timeout,
545
 
            rpc_amqp.get_connection_pool(conf, Connection))
 
552
    return rpc_amqp.multicall(
 
553
        conf, context, topic, msg, timeout,
 
554
        rpc_amqp.get_connection_pool(conf, Connection))
546
555
 
547
556
 
548
557
def call(conf, context, topic, msg, timeout=None):
549
558
    """Sends a message on a topic and wait for a response."""
550
 
    return rpc_amqp.call(conf, context, topic, msg, timeout,
551
 
            rpc_amqp.get_connection_pool(conf, Connection))
 
559
    return rpc_amqp.call(
 
560
        conf, context, topic, msg, timeout,
 
561
        rpc_amqp.get_connection_pool(conf, Connection))
552
562
 
553
563
 
554
564
def cast(conf, context, topic, msg):
555
565
    """Sends a message on a topic without waiting for a response."""
556
 
    return rpc_amqp.cast(conf, context, topic, msg,
557
 
            rpc_amqp.get_connection_pool(conf, Connection))
 
566
    return rpc_amqp.cast(
 
567
        conf, context, topic, msg,
 
568
        rpc_amqp.get_connection_pool(conf, Connection))
558
569
 
559
570
 
560
571
def fanout_cast(conf, context, topic, msg):
561
572
    """Sends a message on a fanout exchange without waiting for a response."""
562
 
    return rpc_amqp.fanout_cast(conf, context, topic, msg,
563
 
            rpc_amqp.get_connection_pool(conf, Connection))
 
573
    return rpc_amqp.fanout_cast(
 
574
        conf, context, topic, msg,
 
575
        rpc_amqp.get_connection_pool(conf, Connection))
564
576
 
565
577
 
566
578
def cast_to_server(conf, context, server_params, topic, msg):
567
579
    """Sends a message on a topic to a specific server."""
568
 
    return rpc_amqp.cast_to_server(conf, context, server_params, topic, msg,
569
 
            rpc_amqp.get_connection_pool(conf, Connection))
 
580
    return rpc_amqp.cast_to_server(
 
581
        conf, context, server_params, topic, msg,
 
582
        rpc_amqp.get_connection_pool(conf, Connection))
570
583
 
571
584
 
572
585
def fanout_cast_to_server(conf, context, server_params, topic, msg):
573
586
    """Sends a message on a fanout exchange to a specific server."""
574
 
    return rpc_amqp.fanout_cast_to_server(conf, context, server_params, topic,
575
 
            msg, rpc_amqp.get_connection_pool(conf, Connection))
 
587
    return rpc_amqp.fanout_cast_to_server(
 
588
        conf, context, server_params, topic, msg,
 
589
        rpc_amqp.get_connection_pool(conf, Connection))
576
590
 
577
591
 
578
592
def notify(conf, context, topic, msg):
579
593
    """Sends a notification event on a topic."""
580
594
    return rpc_amqp.notify(conf, context, topic, msg,
581
 
            rpc_amqp.get_connection_pool(conf, Connection))
 
595
                           rpc_amqp.get_connection_pool(conf, Connection))
582
596
 
583
597
 
584
598
def cleanup():