~ubuntu-branches/ubuntu/saucy/nova/saucy-proposed

« back to all changes in this revision

Viewing changes to nova/rpc/impl_kombu.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short, Chuck Short, Adam Gandleman
  • Date: 2012-01-13 09:51:10 UTC
  • mfrom: (1.1.40)
  • Revision ID: package-import@ubuntu.com-20120113095110-ffd6163drcg77wez
Tags: 2012.1~e3~20120113.12049-0ubuntu1
[Chuck Short]
* New upstream version.
* debian/nova_sudoers, debian/nova-common.install, 
  Switch out to nova-rootwrap. (LP: #681774)
* Add "get-origsource-git" which allows developers to 
  generate a tarball from github, by doing:
  fakeroot debian/rules get-orig-source-git
* debian/debian/nova-objectstore.logrotate: Dont determine
  if we are running Debian or Ubuntu. (LP: #91379)

[Adam Gandleman]
* Removed python-nova.postinst, let dh_python2 generate instead since
  python-support is not a dependency. (LP: #907543)

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
import kombu.entity
19
19
import kombu.messaging
20
20
import kombu.connection
 
21
import inspect
21
22
import itertools
22
23
import sys
23
24
import time
24
25
import traceback
25
 
import types
26
26
import uuid
27
27
 
28
28
import eventlet
33
33
from nova import context
34
34
from nova import exception
35
35
from nova import flags
 
36
from nova.rpc import common as rpc_common
36
37
from nova.rpc.common import RemoteError, LOG
37
38
 
38
39
# Needed for tests
289
290
                **options)
290
291
 
291
292
 
 
293
class NotifyPublisher(TopicPublisher):
 
294
    """Publisher class for 'notify'"""
 
295
 
 
296
    def __init__(self, *args, **kwargs):
 
297
        self.durable = kwargs.pop('durable', FLAGS.rabbit_durable_queues)
 
298
        super(NotifyPublisher, self).__init__(*args, **kwargs)
 
299
 
 
300
    def reconnect(self, channel):
 
301
        super(NotifyPublisher, self).reconnect(channel)
 
302
 
 
303
        # NOTE(jerdfelt): Normally the consumer would create the queue, but
 
304
        # we do this to ensure that messages don't get dropped if the
 
305
        # consumer is started after we do
 
306
        queue = kombu.entity.Queue(channel=channel,
 
307
                exchange=self.exchange,
 
308
                durable=self.durable,
 
309
                name=self.routing_key,
 
310
                routing_key=self.routing_key)
 
311
        queue.declare()
 
312
 
 
313
 
292
314
class Connection(object):
293
315
    """Connection object."""
294
316
 
424
446
                pass
425
447
            self.consumer_thread = None
426
448
 
427
 
    def publisher_send(self, cls, topic, msg):
 
449
    def publisher_send(self, cls, topic, msg, **kwargs):
428
450
        """Send to a publisher based on the publisher class"""
429
451
        while True:
430
 
            publisher = None
431
452
            try:
432
 
                publisher = cls(self.channel, topic)
 
453
                publisher = cls(self.channel, topic, **kwargs)
433
454
                publisher.send(msg)
434
455
                return
435
456
            except self.connection.connection_errors, e:
436
457
                LOG.exception(_('Failed to publish message %s' % str(e)))
437
458
                try:
438
459
                    self.reconnect()
439
 
                    if publisher:
440
 
                        publisher.reconnect(self.channel)
441
460
                except self.connection.connection_errors, e:
442
461
                    pass
443
462
 
468
487
        """Send a 'fanout' message"""
469
488
        self.publisher_send(FanoutPublisher, topic, msg)
470
489
 
 
490
    def notify_send(self, topic, msg, **kwargs):
 
491
        """Send a notify message on a topic"""
 
492
        self.publisher_send(NotifyPublisher, topic, msg, **kwargs)
 
493
 
471
494
    def consume(self, limit=None):
472
495
        """Consume from all queues/consumers"""
473
496
        it = self.iterconsume(limit=limit)
512
535
        order_as_stack=True)
513
536
 
514
537
 
515
 
class ConnectionContext(object):
 
538
class ConnectionContext(rpc_common.Connection):
516
539
    """The class that is actually returned to the caller of
517
540
    create_connection().  This is a essentially a wrapper around
518
541
    Connection that supports 'with' and can return a new Connection or
569
592
        """Caller is done with this connection."""
570
593
        self._done()
571
594
 
 
595
    def create_consumer(self, topic, proxy, fanout=False):
 
596
        self.connection.create_consumer(topic, proxy, fanout)
 
597
 
 
598
    def consume_in_thread(self):
 
599
        self.connection.consume_in_thread()
 
600
 
572
601
    def __getattr__(self, key):
573
602
        """Proxy all other calls to the Connection instance"""
574
603
        if self.connection:
613
642
        object and calls it.
614
643
        """
615
644
 
616
 
        node_func = getattr(self.proxy, str(method))
617
 
        node_args = dict((str(k), v) for k, v in args.iteritems())
618
 
        # NOTE(vish): magic is fun!
619
645
        try:
 
646
            node_func = getattr(self.proxy, str(method))
 
647
            node_args = dict((str(k), v) for k, v in args.iteritems())
 
648
            # NOTE(vish): magic is fun!
620
649
            rval = node_func(context=ctxt, **node_args)
621
650
            # Check if the result was a generator
622
 
            if isinstance(rval, types.GeneratorType):
 
651
            if inspect.isgenerator(rval):
623
652
                for x in rval:
624
653
                    ctxt.reply(x, None)
625
654
            else:
766
795
        conn.fanout_send(topic, msg)
767
796
 
768
797
 
 
798
def notify(context, topic, msg):
 
799
    """Sends a notification event on a topic."""
 
800
    LOG.debug(_('Sending notification on %s...'), topic)
 
801
    _pack_context(msg, context)
 
802
    with ConnectionContext() as conn:
 
803
        conn.notify_send(topic, msg, durable=True)
 
804
 
 
805
 
769
806
def msg_reply(msg_id, reply=None, failure=None, ending=False):
770
807
    """Sends a reply or an error on the channel signified by msg_id.
771
808