~hudson-openstack/nova/trunk

« back to all changes in this revision

Viewing changes to nova/fakerabbit.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
 
2
# Copyright [2010] [Anso Labs, LLC]
 
3
 
4
#    Licensed under the Apache License, Version 2.0 (the "License");
 
5
#    you may not use this file except in compliance with the License.
 
6
#    You may obtain a copy of the License at
 
7
 
8
#        http://www.apache.org/licenses/LICENSE-2.0
 
9
 
10
#    Unless required by applicable law or agreed to in writing, software
 
11
#    distributed under the License is distributed on an "AS IS" BASIS,
 
12
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
13
#    See the License for the specific language governing permissions and
 
14
#    limitations under the License.
 
15
 
 
16
""" Based a bit on the carrot.backeds.queue backend... but a lot better """
 
17
 
 
18
import logging
 
19
import Queue as queue
 
20
 
 
21
from carrot.backends import base
 
22
 
 
23
 
 
24
class Message(base.BaseMessage):
 
25
    pass
 
26
 
 
27
 
 
28
class Exchange(object):
 
29
    def __init__(self, name, exchange_type):
 
30
        self.name = name
 
31
        self.exchange_type = exchange_type
 
32
        self._queue = queue.Queue()
 
33
        self._routes = {}
 
34
 
 
35
    def publish(self, message, routing_key=None):
 
36
        logging.debug('(%s) publish (key: %s) %s',
 
37
                      self.name, routing_key, message)
 
38
        if routing_key in self._routes:
 
39
            for f in self._routes[routing_key]:
 
40
                logging.debug('Publishing to route %s', f)
 
41
                f(message, routing_key=routing_key)
 
42
    
 
43
    def bind(self, callback, routing_key):
 
44
        self._routes.setdefault(routing_key, [])
 
45
        self._routes[routing_key].append(callback)
 
46
 
 
47
 
 
48
class Queue(object):
 
49
    def __init__(self, name):
 
50
        self.name = name
 
51
        self._queue = queue.Queue()
 
52
 
 
53
    def __repr__(self):
 
54
        return '<Queue: %s>' % self.name
 
55
    
 
56
    def push(self, message, routing_key=None):
 
57
        self._queue.put(message)
 
58
 
 
59
    def size(self):
 
60
        return self._queue.qsize()
 
61
 
 
62
    def pop(self):
 
63
        return self._queue.get()
 
64
 
 
65
 
 
66
class Backend(object):
 
67
    """ Singleton backend for testing """
 
68
    class __impl(base.BaseBackend):
 
69
        def __init__(self, *args, **kwargs):
 
70
            #super(__impl, self).__init__(*args, **kwargs)
 
71
            self._exchanges = {}
 
72
            self._queues = {}
 
73
        
 
74
        def _reset_all(self):
 
75
            self._exchanges = {}
 
76
            self._queues = {}
 
77
 
 
78
        def queue_declare(self, queue, **kwargs):
 
79
            if queue not in self._queues:
 
80
                logging.debug('Declaring queue %s', queue)
 
81
                self._queues[queue] = Queue(queue) 
 
82
 
 
83
        def exchange_declare(self, exchange, type, *args, **kwargs):
 
84
            if exchange not in self._exchanges:
 
85
                logging.debug('Declaring exchange %s', exchange)
 
86
                self._exchanges[exchange] = Exchange(exchange, type)
 
87
 
 
88
        def queue_bind(self, queue, exchange, routing_key, **kwargs):
 
89
            logging.debug('Binding %s to %s with key %s',
 
90
                          queue, exchange, routing_key)
 
91
            self._exchanges[exchange].bind(self._queues[queue].push,
 
92
                                           routing_key)
 
93
 
 
94
        def get(self, queue, no_ack=False):
 
95
            if not self._queues[queue].size():
 
96
                return None
 
97
            (message_data, content_type, content_encoding) = \
 
98
                    self._queues[queue].pop()
 
99
            message = Message(backend=self, body=message_data,
 
100
                              content_type=content_type,
 
101
                              content_encoding=content_encoding)
 
102
            logging.debug('Getting from %s: %s', queue, message)
 
103
            return message
 
104
 
 
105
        def prepare_message(self, message_data, delivery_mode,
 
106
                            content_type, content_encoding, **kwargs):
 
107
            """Prepare message for sending."""
 
108
            return (message_data, content_type, content_encoding)
 
109
 
 
110
        def publish(self, message, exchange, routing_key, **kwargs):
 
111
            if exchange in self._exchanges:
 
112
                self._exchanges[exchange].publish(
 
113
                        message, routing_key=routing_key)
 
114
 
 
115
 
 
116
    __instance = None
 
117
 
 
118
    def __init__(self, *args, **kwargs):
 
119
        if Backend.__instance is None:
 
120
            Backend.__instance = Backend.__impl(*args, **kwargs)
 
121
        self.__dict__['_Backend__instance'] = Backend.__instance
 
122
 
 
123
    def __getattr__(self, attr):
 
124
        return getattr(self.__instance, attr)
 
125
    
 
126
    def __setattr__(self, attr, value):
 
127
        return setattr(self.__instance, attr, value)
 
128
 
 
129
 
 
130
def reset_all():
 
131
    Backend()._reset_all()