~ubuntu-branches/ubuntu/saucy/nova/saucy-proposed

« back to all changes in this revision

Viewing changes to nova/testing/fake/rabbit.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short
  • Date: 2012-05-24 13:12:53 UTC
  • mfrom: (1.1.55)
  • Revision ID: package-import@ubuntu.com-20120524131253-ommql08fg1en06ut
Tags: 2012.2~f1-0ubuntu1
* New upstream release.
* Prepare for quantal:
  - Dropped debian/patches/upstream/0006-Use-project_id-in-ec2.cloud._format_image.patch
  - Dropped debian/patches/upstream/0005-Populate-image-properties-with-project_id-again.patch
  - Dropped debian/patches/upstream/0004-Fixed-bug-962840-added-a-test-case.patch
  - Dropped debian/patches/upstream/0003-Allow-unprivileged-RADOS-users-to-access-rbd-volumes.patch
  - Dropped debian/patches/upstream/0002-Stop-libvirt-test-from-deleting-instances-dir.patch
  - Dropped debian/patches/upstream/0001-fix-bug-where-nova-ignores-glance-host-in-imageref.patch 
  - Dropped debian/patches/0001-fix-useexisting-deprecation-warnings.patch
* debian/control: Add python-keystone as a dependency. (LP: #907197)
* debian/patches/kombu_tests_timeout.patch: Refreshed.
* debian/nova.conf, debian/nova-common.postinst: Convert to the new
  ini-style configuration file format.
* debian/patches/nova-manage_flagfile_location.patch: Refreshed.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# vim: tabstop=4 shiftwidth=4 softtabstop=4
2
 
 
3
 
# Copyright 2010 United States Government as represented by the
4
 
# Administrator of the National Aeronautics and Space Administration.
5
 
# All Rights Reserved.
6
 
#
7
 
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
8
 
#    not use this file except in compliance with the License. You may obtain
9
 
#    a copy of the License at
10
 
#
11
 
#         http://www.apache.org/licenses/LICENSE-2.0
12
 
#
13
 
#    Unless required by applicable law or agreed to in writing, software
14
 
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15
 
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16
 
#    License for the specific language governing permissions and limitations
17
 
#    under the License.
18
 
 
19
 
"""Based a bit on the carrot.backends.queue backend... but a lot better."""
20
 
 
21
 
import Queue as queue
22
 
 
23
 
from carrot.backends import base
24
 
from eventlet import greenthread
25
 
 
26
 
from nova import log as logging
27
 
 
28
 
 
29
 
LOG = logging.getLogger(__name__)


# Module-wide registries shared by every Backend instance.
# exchange name -> Exchange object (filled by Backend.exchange_declare)
EXCHANGES = {}
# queue name -> Queue object (filled by Backend.queue_declare)
QUEUES = {}
# consumer tag -> (queue name, callback) (filled by Backend.declare_consumer)
CONSUMERS = {}
35
 
 
36
 
 
37
 
class Message(base.BaseMessage):
    """Message type handed out by the fake backend.

    Adds nothing to ``base.BaseMessage``; it only gives this module its
    own message class to instantiate.
    """
39
 
 
40
 
 
41
 
class Exchange(object):
    """In-memory stand-in for a message exchange.

    For each routing key it keeps the list of callables that must receive
    a message published with that key.
    """

    def __init__(self, name, exchange_type):
        """Remember the exchange identity and start with no routes.

        :param name: name of the exchange; used in log output.
        :param exchange_type: stored as given; no method of this class
                              reads it.
        """
        self.name = name
        self.exchange_type = exchange_type
        # Not used by any method of this class; kept so that the set of
        # instance attributes stays the same for outside code.
        self._queue = queue.Queue()
        # routing key -> list of callbacks, in the order they were bound.
        self._routes = {}

    def publish(self, message, routing_key=None):
        """Pass ``message`` to every callback bound to ``routing_key``.

        Callbacks are invoked in binding order as
        ``callback(message, routing_key=routing_key)``.  When nothing is
        bound to the key the message is only logged.

        :param message: payload forwarded untouched to the callbacks.
        :param routing_key: key selecting which callbacks to invoke.
        """
        LOG.debug(_('(%(nm)s) publish (key: %(routing_key)s)'
                    ' %(message)s') % {'nm': self.name,
                                       'routing_key': routing_key,
                                       'message': message})
        for deliver in self._routes.get(routing_key, ()):
            LOG.debug(_('Publishing to route %s'), deliver)
            deliver(message, routing_key=routing_key)

    def bind(self, callback, routing_key):
        """Register ``callback`` to receive messages for ``routing_key``.

        The same callback may be bound several times; it is then invoked
        once per binding.

        :param callback: callable accepting ``(message, routing_key=...)``.
        :param routing_key: key the callback is attached to.
        """
        self._routes.setdefault(routing_key, []).append(callback)
60
 
 
61
 
 
62
 
class Queue(object):
    """Named first-in, first-out message holder."""

    def __init__(self, name):
        """Create an empty queue.

        :param name: name of the queue; shown by ``repr()``.
        """
        self.name = name
        self._queue = queue.Queue()

    def __repr__(self):
        """Return ``<Queue: NAME>``."""
        return '<Queue: %s>' % self.name

    def push(self, message, routing_key=None):
        """Append ``message`` to the queue.

        :param message: item to store.
        :param routing_key: accepted so the bound method can be called
                            as ``f(message, routing_key=...)``; ignored.
        """
        self._queue.put(message)

    def size(self):
        """Return the number of messages currently held."""
        return self._queue.qsize()

    def pop(self):
        """Remove and return the oldest message.

        Uses the underlying queue's default ``get()``, which waits when
        the queue is empty, so check ``size()`` first when waiting is not
        wanted.
        """
        return self._queue.get()
78
 
 
79
 
 
80
 
class Backend(base.BaseBackend):
    """Fake messaging backend that keeps all broker state in this module.

    Exchanges, queues and consumers live in the module-level
    ``EXCHANGES``, ``QUEUES`` and ``CONSUMERS`` dicts, so every instance
    of this class sees the same state.
    """

    def queue_declare(self, queue, **kwargs):
        """Create the named queue unless it already exists.

        :param queue: name of the queue.
        :param kwargs: accepted and ignored.
        """
        if queue not in QUEUES:
            LOG.debug(_('Declaring queue %s'), queue)
            QUEUES[queue] = Queue(queue)

    def exchange_declare(self, exchange, type, *args, **kwargs):
        """Create the named exchange unless it already exists.

        :param exchange: name of the exchange.
        :param type: exchange type, stored on the new ``Exchange``.
        :param args: accepted and ignored.
        :param kwargs: accepted and ignored.
        """
        if exchange not in EXCHANGES:
            LOG.debug(_('Declaring exchange %s'), exchange)
            EXCHANGES[exchange] = Exchange(exchange, type)

    def queue_bind(self, queue, exchange, routing_key, **kwargs):
        """Route messages published with ``routing_key`` into ``queue``.

        :param queue: name of an already declared queue.
        :param exchange: name of an already declared exchange.
        :param routing_key: key to bind under.
        :raises KeyError: if the queue or the exchange was not declared.
        """
        LOG.debug(_('Binding %(queue)s to %(exchange)s with'
                    ' key %(routing_key)s') % {'queue': queue,
                                               'exchange': exchange,
                                               'routing_key': routing_key})
        EXCHANGES[exchange].bind(QUEUES[queue].push, routing_key)

    def declare_consumer(self, queue, callback, consumer_tag, *args, **kwargs):
        """Register ``callback`` as the consumer of ``queue``.

        A consumer registered earlier under the same tag is replaced.

        :param queue: name of the queue to read from.
        :param callback: callable invoked with each received message.
        :param consumer_tag: key identifying this consumer.
        """
        LOG.debug("Adding consumer %s", consumer_tag)
        CONSUMERS[consumer_tag] = (queue, callback)

    def cancel(self, consumer_tag):
        """Remove the consumer registered under ``consumer_tag``.

        :raises KeyError: if no such consumer is registered.
        """
        LOG.debug("Removing consumer %s", consumer_tag)
        del CONSUMERS[consumer_tag]

    def consume(self, limit=None):
        """Generator delivering queued messages to registered consumers.

        Yields (with no value) after each delivered message.  Between
        full passes over the consumers it sleeps for 0.1 seconds via
        ``greenthread.sleep``.

        :param limit: stop after this many messages have been delivered;
                      a false value means run until the caller stops
                      iterating.
        """
        delivered = 0
        while True:
            # Walk a snapshot of the tags: both the callback and the
            # caller (while this generator is suspended at the yield)
            # may add or cancel consumers, and mutating a dict during
            # direct iteration raises RuntimeError.
            for consumer_tag in list(CONSUMERS):
                try:
                    (queue, callback) = CONSUMERS[consumer_tag]
                except KeyError:
                    # Cancelled since the snapshot was taken.
                    continue
                item = self.get(queue)
                if item:
                    callback(item)
                    delivered += 1
                    yield
                    if limit and delivered == limit:
                        # A plain return ends the generator; raising
                        # StopIteration here is turned into RuntimeError
                        # on interpreters that implement PEP 479.
                        return
            greenthread.sleep(0.1)

    def get(self, queue, no_ack=False):
        """Fetch the next message from ``queue`` without waiting.

        :param queue: name of the queue to read.
        :param no_ack: accepted and ignored.
        :returns: a ``Message`` built from the stored
                  ``(body, content_type, content_encoding)`` tuple, or
                  ``None`` when the queue is unknown or empty.
        """
        if queue not in QUEUES or not QUEUES[queue].size():
            return None
        (message_data, content_type, content_encoding) = QUEUES[queue].pop()
        message = Message(backend=self, body=message_data,
                          content_type=content_type,
                          content_encoding=content_encoding)
        message.result = True
        LOG.debug(_('Getting from %(queue)s: %(message)s') %
                  {'queue': queue, 'message': message})
        return message

    def prepare_message(self, message_data, delivery_mode,
                        content_type, content_encoding, **kwargs):
        """Prepare message for sending.

        :returns: the tuple ``(message_data, content_type,
                  content_encoding)``; ``delivery_mode`` and ``kwargs``
                  are not used.
        """
        return (message_data, content_type, content_encoding)

    def publish(self, message, exchange, routing_key, **kwargs):
        """Publish ``message`` on ``exchange`` under ``routing_key``.

        The message is dropped without error when the exchange has not
        been declared.

        :param message: payload, as produced by ``prepare_message``.
        :param exchange: name of the target exchange.
        :param routing_key: key used to select the bound callbacks.
        :param kwargs: accepted and ignored.
        """
        if exchange in EXCHANGES:
            EXCHANGES[exchange].publish(message, routing_key=routing_key)
145
 
 
146
 
 
147
 
def reset_all():
    """Forget every declared exchange, queue and consumer.

    The three module-level registries are rebound to fresh empty dicts
    rather than cleared in place, so a dict object obtained before the
    call keeps its old contents.
    """
    global EXCHANGES, QUEUES, CONSUMERS
    EXCHANGES = {}
    QUEUES = {}
    CONSUMERS = {}