~0x44/nova/bug838466

« back to all changes in this revision

Viewing changes to vendor/carrot/backends/pikachu.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
import asyncore
 
2
import weakref
 
3
import functools
 
4
import itertools
 
5
 
 
6
import pika
 
7
 
 
8
from carrot.backends.base import BaseMessage, BaseBackend
 
9
 
 
10
# Broker port used by SyncBackend when the connection info carries no port.
DEFAULT_PORT = 5672
 
11
 
 
12
 
 
13
class Message(BaseMessage):
    """Message built from the raw tuple delivered by the pika channel."""

    def __init__(self, backend, amqp_message, **kwargs):
        """Unpack a raw delivery and initialise the base message.

        :param backend: the backend instance the message arrived through.
        :param amqp_message: a ``(channel, method, header, body)`` 4-tuple.
        :param kwargs: extra keyword arguments forwarded to the base class;
            the keys filled in below override any values passed in.
        """
        chan, meth, props, payload = amqp_message
        self._channel = chan
        self._method = meth
        self._header = props
        self.backend = backend

        # Fields the base message expects, taken from the delivery frames.
        kwargs.update(
            body=payload,
            delivery_tag=meth.delivery_tag,
            content_type=props.content_type,
            content_encoding=props.content_encoding,
            delivery_info={
                "consumer_tag": meth.consumer_tag,
                "routing_key": meth.routing_key,
                "delivery_tag": meth.delivery_tag,
                "exchange": meth.exchange,
            },
        )

        super(Message, self).__init__(backend, **kwargs)
 
33
 
 
34
 
 
35
class SyncBackend(BaseBackend):
    """Backend that talks to the broker through ``pika``.

    Connections are created with ``_connection_cls``; subclasses may override
    that attribute to select a different pika connection class.
    """

    default_port = DEFAULT_PORT
    _connection_cls = pika.BlockingConnection

    Message = Message

    def __init__(self, connection, **kwargs):
        """Store the connection and the optional ``default_port`` override.

        :param connection: object providing the connection settings and
            ``get_channel()``.
        :param kwargs: only ``default_port`` is read; other keys are ignored.
        """
        self.connection = connection
        self.default_port = kwargs.get("default_port", self.default_port)
        # Weak reference to the current channel, or None when there is none.
        self._channel_ref = None

    @property
    def _channel(self):
        """Return the current channel, or a false value if there is none.

        The value is ``False`` when no reference has been stored and ``None``
        when the weakly referenced channel has been garbage collected.
        """
        return callable(self._channel_ref) and self._channel_ref()

    @property
    def channel(self):
        """If no channel exists, a new one is requested."""
        channel = self._channel
        if not channel:
            # Keep a strong local reference so the new channel cannot be
            # collected between storing the weak reference and returning it.
            channel = self.connection.get_channel()
            self._channel_ref = weakref.ref(channel)
        return channel

    def establish_connection(self):
        """Establish connection to the AMQP broker.

        If the connection info has no port set, ``self.default_port`` is
        written back onto it before connecting.

        :returns: a new instance of ``_connection_cls``.
        """
        conninfo = self.connection
        if not conninfo.port:
            conninfo.port = self.default_port
        credentials = pika.PlainCredentials(conninfo.userid,
                                            conninfo.password)
        parameters = pika.ConnectionParameters(
            conninfo.hostname,
            port=conninfo.port,
            virtual_host=conninfo.virtual_host,
            credentials=credentials)
        return self._connection_cls(parameters)

    def close_connection(self, connection):
        """Close the AMQP broker connection."""
        connection.close()

    def queue_exists(self, queue):
        """Report whether ``queue`` exists; currently always ``False``."""
        return False  # FIXME

    def queue_delete(self, queue, if_unused=False, if_empty=False):
        """Delete queue by name."""
        return self.channel.queue_delete(queue=queue, if_unused=if_unused,
                                         if_empty=if_empty)

    def queue_purge(self, queue, **kwargs):
        """Discard all messages in the queue, leaving it empty.

        Extra keyword arguments are accepted but ignored.
        """
        return self.channel.queue_purge(queue=queue)

    def queue_declare(self, queue, durable, exclusive, auto_delete,
            warn_if_exists=False):
        """Declare a named queue.

        ``warn_if_exists`` is accepted but not used by this backend.
        """
        return self.channel.queue_declare(queue=queue,
                                          durable=durable,
                                          exclusive=exclusive,
                                          auto_delete=auto_delete)

    def exchange_declare(self, exchange, type, durable, auto_delete):
        """Declare a named exchange."""
        return self.channel.exchange_declare(exchange=exchange,
                                             type=type,
                                             durable=durable,
                                             auto_delete=auto_delete)

    def queue_bind(self, queue, exchange, routing_key, arguments=None):
        """Bind queue to an exchange using a routing key."""
        return self.channel.queue_bind(queue=queue,
                                       exchange=exchange,
                                       routing_key=routing_key,
                                       arguments=arguments)

    def message_to_python(self, raw_message):
        """Wrap a raw delivery tuple in a :class:`Message` instance."""
        return self.Message(backend=self, amqp_message=raw_message)

    def get(self, queue, no_ack=False):
        """Receive a message from a declared queue by name.

        :returns: A :class:`Message` object if a message was received,
            ``None`` otherwise. If ``None`` was returned, it probably means
            there were no messages waiting on the queue.

        """
        raw_message = self.channel.basic_get(queue, no_ack=no_ack)
        if not raw_message:
            return None
        return self.message_to_python(raw_message)

    def declare_consumer(self, queue, no_ack, callback, consumer_tag,
            nowait=False):
        """Declare a consumer.

        The channel invokes its consumer with four positional arguments;
        ``callback`` receives them packed into a single 4-tuple instead.
        ``nowait`` is accepted but not used by this backend.
        """

        @functools.wraps(callback)
        def _callback_decode(channel, method, header, body):
            return callback((channel, method, header, body))

        return self.channel.basic_consume(_callback_decode,
                                          queue=queue,
                                          no_ack=no_ack,
                                          consumer_tag=consumer_tag)

    def consume(self, limit=None):
        """Returns an iterator that waits for one message at a time.

        Each iteration drains events from the underlying connection once and
        then yields ``True``.

        :param limit: stop after this many iterations; a false value means
            iterate without bound.
        """
        for total_message_count in itertools.count():
            if limit and total_message_count >= limit:
                # A plain return ends the generator. ``raise StopIteration``
                # here is turned into RuntimeError by PEP 479 (Python 3.7+).
                return
            self.connection.connection.drain_events()
            yield True

    def cancel(self, consumer_tag):
        """Cancel a consumer by consumer tag; no-op without a channel."""
        channel = self._channel
        if not channel:
            return
        channel.basic_cancel(consumer_tag)

    def close(self):
        """Close the channel if open."""
        # Dereference the weak reference once so the check and the close
        # operate on the same object.
        channel = self._channel
        if channel and not channel.handler.channel_close:
            channel.close()
        self._channel_ref = None

    def ack(self, delivery_tag):
        """Acknowledge a message by delivery tag."""
        return self.channel.basic_ack(delivery_tag)

    def reject(self, delivery_tag):
        """Reject a message by delivery tag, without requeueing it."""
        return self.channel.basic_reject(delivery_tag, requeue=False)

    def requeue(self, delivery_tag):
        """Reject and requeue a message by delivery tag."""
        return self.channel.basic_reject(delivery_tag, requeue=True)

    def prepare_message(self, message_data, delivery_mode, priority=None,
            content_type=None, content_encoding=None):
        """Encapsulate data into an AMQP message.

        :returns: a ``(message_data, properties)`` tuple, the form expected
            by :meth:`publish`.
        """
        properties = pika.BasicProperties(priority=priority,
                                          content_type=content_type,
                                          content_encoding=content_encoding,
                                          delivery_mode=delivery_mode)
        return message_data, properties

    def publish(self, message, exchange, routing_key, mandatory=None,
            immediate=None, headers=None):
        """Publish a message to a named exchange.

        :param message: a ``(body, properties)`` tuple as returned by
            :meth:`prepare_message`.
        :param headers: if true, assigned to ``properties.headers`` before
            publishing.

        The channel is closed afterwards when ``mandatory`` or ``immediate``
        is set. Returns ``None``.
        """
        body, properties = message

        if headers:
            properties.headers = headers

        self.channel.basic_publish(body=body,
                                   properties=properties,
                                   exchange=exchange,
                                   routing_key=routing_key,
                                   mandatory=mandatory,
                                   immediate=immediate)
        if mandatory or immediate:
            self.close()

    def qos(self, prefetch_size, prefetch_count, apply_global=False):
        """Request specific Quality of Service."""
        self.channel.basic_qos(prefetch_size, prefetch_count,
                               apply_global)

    def flow(self, active):
        """Enable/disable flow from peer."""
        self.channel.flow(active)
 
206
 
 
207
 
 
208
class AsyncoreBackend(SyncBackend):
    """``SyncBackend`` variant that connects via ``pika.AsyncoreConnection``."""

    _connection_cls = pika.AsyncoreConnection