~0x44/nova/bug838466

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# vim: tabstop=4 shiftwidth=4 softtabstop=4

# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

"""Based a bit on the carrot.backeds.queue backend... but a lot better."""

import Queue as queue

from carrot.backends import base
from eventlet import greenthread

from nova import log as logging


LOG = logging.getLogger("nova.fakerabbit")


EXCHANGES = {}
QUEUES = {}


class Message(base.BaseMessage):
    pass


class Exchange(object):
    def __init__(self, name, exchange_type):
        self.name = name
        self.exchange_type = exchange_type
        self._queue = queue.Queue()
        self._routes = {}

    def publish(self, message, routing_key=None):
        LOG.debug(_('(%s) publish (key: %s) %s'),
                  self.name, routing_key, message)
        routing_key = routing_key.split('.')[0]
        if routing_key in self._routes:
            for f in self._routes[routing_key]:
                LOG.debug(_('Publishing to route %s'), f)
                f(message, routing_key=routing_key)

    def bind(self, callback, routing_key):
        self._routes.setdefault(routing_key, [])
        self._routes[routing_key].append(callback)


class Queue(object):
    def __init__(self, name):
        self.name = name
        self._queue = queue.Queue()

    def __repr__(self):
        return '<Queue: %s>' % self.name

    def push(self, message, routing_key=None):
        self._queue.put(message)

    def size(self):
        return self._queue.qsize()

    def pop(self):
        return self._queue.get()


class Backend(base.BaseBackend):
    def queue_declare(self, queue, **kwargs):
        global QUEUES
        if queue not in QUEUES:
            LOG.debug(_('Declaring queue %s'), queue)
            QUEUES[queue] = Queue(queue)

    def exchange_declare(self, exchange, type, *args, **kwargs):
        global EXCHANGES
        if exchange not in EXCHANGES:
            LOG.debug(_('Declaring exchange %s'), exchange)
            EXCHANGES[exchange] = Exchange(exchange, type)

    def queue_bind(self, queue, exchange, routing_key, **kwargs):
        global EXCHANGES
        global QUEUES
        LOG.debug(_('Binding %s to %s with key %s'),
                      queue, exchange, routing_key)
        EXCHANGES[exchange].bind(QUEUES[queue].push, routing_key)

    def declare_consumer(self, queue, callback, *args, **kwargs):
        self.current_queue = queue
        self.current_callback = callback

    def consume(self, limit=None):
        while True:
            item = self.get(self.current_queue)
            if item:
                self.current_callback(item)
                raise StopIteration()
            greenthread.sleep(0)

    def get(self, queue, no_ack=False):
        global QUEUES
        if not queue in QUEUES or not QUEUES[queue].size():
            return None
        (message_data, content_type, content_encoding) = QUEUES[queue].pop()
        message = Message(backend=self, body=message_data,
                          content_type=content_type,
                          content_encoding=content_encoding)
        message.result = True
        LOG.debug(_('Getting from %s: %s'), queue, message)
        return message

    def prepare_message(self, message_data, delivery_mode,
                        content_type, content_encoding, **kwargs):
        """Prepare message for sending."""
        return (message_data, content_type, content_encoding)

    def publish(self, message, exchange, routing_key, **kwargs):
        global EXCHANGES
        if exchange in EXCHANGES:
            EXCHANGES[exchange].publish(message, routing_key=routing_key)


def reset_all():
    global EXCHANGES
    global QUEUES
    EXCHANGES = {}
    QUEUES = {}