~soren/nova/iptables-security-groups

« back to all changes in this revision

Viewing changes to vendor/carrot/backends/pyamqplib.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
"""
 
2
 
 
3
`amqplib`_ backend for carrot.
 
4
 
 
5
.. _`amqplib`: http://barryp.org/software/py-amqplib/
 
6
 
 
7
"""
 
8
from amqplib import client_0_8 as amqp
 
9
from amqplib.client_0_8.exceptions import AMQPChannelException
 
10
from amqplib.client_0_8.serialization import AMQPReader, AMQPWriter
 
11
from carrot.backends.base import BaseMessage, BaseBackend
 
12
from itertools import count
 
13
import warnings
 
14
import weakref
 
15
 
 
16
# Fallback broker port: becomes ``Backend.default_port`` and is applied by
# ``Backend.establish_connection`` when the connection info has no port set.
DEFAULT_PORT = 5672
 
17
 
 
18
 
 
19
class Connection(amqp.Connection):
    """amqplib connection that can wait on several channels at once."""

    def drain_events(self, allowed_methods=None):
        """Wait for an event on any channel.

        :keyword allowed_methods: optional collection of
            ``(class_id, method_id)`` signatures to accept. ``None``
            (the default) accepts any method.

        :returns: whatever the dispatched method handler returns.

        """
        # Forward ``allowed_methods``; it used to be accepted and then
        # silently dropped, so the filter never had any effect.
        return self.wait_multi(self.channels.values(), allowed_methods)

    def wait_multi(self, channels, allowed_methods=None):
        """Wait for an event on one of ``channels`` and dispatch it.

        :param channels: iterable of channel objects to listen on.
        :keyword allowed_methods: optional collection of method
            signatures to accept; ``None`` accepts any method.

        :returns: the return value of the handler found in the
            receiving channel's ``_METHOD_MAP``.

        :raises Exception: if the received method signature has no
            handler in ``_METHOD_MAP``.

        """
        chanmap = dict((chan.channel_id, chan) for chan in channels)
        chanid, method_sig, args, content = self._wait_multiple(
                chanmap.keys(), allowed_methods)

        channel = chanmap[chanid]

        if content \
        and channel.auto_decode \
        and hasattr(content, 'content_encoding'):
            try:
                content.body = content.body.decode(content.content_encoding)
            except Exception:
                # Deliberately best-effort: if the body cannot be decoded
                # it is handed to the handler unchanged.
                pass

        amqp_method = channel._METHOD_MAP.get(method_sig, None)

        if amqp_method is None:
            raise Exception('Unknown AMQP method (%d, %d)' % method_sig)

        # Handlers for content-less methods take only (channel, args).
        if content is None:
            return amqp_method(channel, args)
        return amqp_method(channel, args, content)

    def _wait_multiple(self, channel_ids, allowed_methods):
        """Return the next acceptable method for any of ``channel_ids``.

        Already queued methods are served first (and removed from their
        channel's ``method_queue``); otherwise methods are read from the
        peer until an acceptable one arrives. Methods that are read but
        not acceptable are queued on their channel for later.

        :returns: a ``(channel_id, method_sig, args, content)`` tuple.

        """

        def acceptable(method_sig):
            # (20, 40) is always let through regardless of the filter;
            # presumably Channel.close in AMQP 0-8 -- TODO confirm.
            return (allowed_methods is None
                    or method_sig in allowed_methods
                    or method_sig == (20, 40))

        for channel_id in channel_ids:
            method_queue = self.channels[channel_id].method_queue
            for queued_method in method_queue:
                if acceptable(queued_method[0]):
                    # Safe to mutate while iterating: we return at once.
                    method_queue.remove(queued_method)
                    method_sig, args, content = queued_method
                    return channel_id, method_sig, args, content

        # Nothing queued, need to wait for a method from the peer
        while True:
            channel, method_sig, args, content = \
                self.method_reader.read_method()

            if channel in channel_ids and acceptable(method_sig):
                return channel, method_sig, args, content

            # Not the channel and/or method we were looking for. Queue
            # this method for later
            self.channels[channel].method_queue.append((method_sig,
                                                        args,
                                                        content))

            #
            # If we just queued up a method for channel 0 (the Connection
            # itself) it's probably a close method in reaction to some
            # error, so deal with it right away.
            #
            if channel == 0:
                self.wait()
 
87
 
 
88
 
 
89
class QueueAlreadyExistsWarning(UserWarning):
    # NOTE: the docstring below doubles as the warning message --
    # Backend.queue_declare passes ``QueueAlreadyExistsWarning.__doc__`` to
    # ``warnings.warn`` -- so rewording it changes user-visible output.
 
90
    """A queue with that name already exists, so a recently changed
 
91
    ``routing_key`` or other settings might be ignored unless you
 
92
    rename the queue or restart the broker."""
 
93
 
 
94
 
 
95
class Message(BaseMessage):
    """A message received from the broker.

    Usually you don't instantiate message objects yourself, but receive
    them using a :class:`carrot.messaging.Consumer`.

    :param backend: see :attr:`backend`.
    :param amqp_message: see :attr:`_amqp_message`.


    .. attribute:: body

        The message body.

    .. attribute:: delivery_tag

        The message delivery tag, uniquely identifying this message.

    .. attribute:: backend

        The message backend used.
        A subclass of :class:`carrot.backends.base.BaseBackend`.

    .. attribute:: _amqp_message

        A :class:`amqplib.client_0_8.basic_message.Message` instance.
        This is a private attribute and should not be accessed by
        production code.

    """

    def __init__(self, backend, amqp_message, **kwargs):
        self.backend = backend
        self._amqp_message = amqp_message

        # Mirror these fields of the raw amqplib message into the keyword
        # arguments for the base class. A field the raw message lacks
        # becomes ``None``; same-named keyword arguments are overridden.
        mirrored = ("body",
                    "delivery_tag",
                    "content_type",
                    "content_encoding",
                    "delivery_info")
        kwargs.update(dict((field, getattr(amqp_message, field, None))
                           for field in mirrored))

        super(Message, self).__init__(backend, **kwargs)
 
138
 
 
139
 
 
140
class Backend(BaseBackend):
    """amqplib backend.

    :param connection: see :attr:`connection`.
    :keyword default_port: port to fall back on when the connection has
        none set; defaults to the class-level ``default_port``.


    .. attribute:: connection

        A :class:`carrot.connection.BrokerConnection` instance. An
        established connection to the broker.

    """
    default_port = DEFAULT_PORT

    Message = Message

    def __init__(self, connection, **kwargs):
        self.connection = connection
        self.default_port = kwargs.get("default_port", self.default_port)
        # Weak reference to the lazily created channel (see ``channel``).
        self._channel_ref = None

    @property
    def _channel(self):
        """The current channel, or a false value if there is none.

        Returns ``False`` while no reference has been stored, and
        ``None`` once the weakly referenced channel has been collected.

        """
        return callable(self._channel_ref) and self._channel_ref()

    @property
    def channel(self):
        """If no channel exists, a new one is requested."""
        if not self._channel:
            # Only a weak reference is kept here.
            # NOTE(review): presumably the connection keeps the channel
            # alive -- confirm against ``connection.get_channel``.
            self._channel_ref = weakref.ref(self.connection.get_channel())
        return self._channel

    def establish_connection(self):
        """Establish connection to the AMQP broker.

        If the connection info has no port set, its ``port`` attribute
        is set to :attr:`default_port` as a side effect.

        :returns: a new :class:`Connection`.

        """
        conninfo = self.connection
        if not conninfo.port:
            conninfo.port = self.default_port
        return Connection(host=conninfo.host,
                          userid=conninfo.userid,
                          password=conninfo.password,
                          virtual_host=conninfo.virtual_host,
                          insist=conninfo.insist,
                          ssl=conninfo.ssl,
                          connect_timeout=conninfo.connect_timeout)

    def close_connection(self, connection):
        """Close the AMQP broker connection."""
        connection.close()

    def queue_exists(self, queue):
        """Check if a queue has been declared.

        Performs a passive ``queue_declare`` and treats a channel
        exception with reply code 404 as "does not exist".

        :rtype: bool

        :raises AMQPChannelException: for any other reply code.

        """
        try:
            self.channel.queue_declare(queue=queue, passive=True)
        except AMQPChannelException as exc:
            if exc.amqp_reply_code == 404:
                return False
            # Bare ``raise`` keeps the original traceback intact.
            raise
        else:
            return True

    def queue_delete(self, queue, if_unused=False, if_empty=False):
        """Delete queue by name."""
        return self.channel.queue_delete(queue, if_unused, if_empty)

    def queue_purge(self, queue, **kwargs):
        """Discard all messages in the queue. This will delete the messages
        and results in an empty queue.

        Extra keyword arguments are accepted but ignored.

        """
        return self.channel.queue_purge(queue=queue)

    def queue_declare(self, queue, durable, exclusive, auto_delete,
            warn_if_exists=False):
        """Declare a named queue.

        :keyword warn_if_exists: if true and the queue already exists,
            emit a :class:`QueueAlreadyExistsWarning` before declaring.

        """

        if warn_if_exists and self.queue_exists(queue):
            # The warning class' docstring is used as the message text.
            warnings.warn(QueueAlreadyExistsWarning(
                QueueAlreadyExistsWarning.__doc__))

        return self.channel.queue_declare(queue=queue,
                                          durable=durable,
                                          exclusive=exclusive,
                                          auto_delete=auto_delete)

    def exchange_declare(self, exchange, type, durable, auto_delete):
        """Declare a named exchange."""
        return self.channel.exchange_declare(exchange=exchange,
                                             type=type,
                                             durable=durable,
                                             auto_delete=auto_delete)

    def queue_bind(self, queue, exchange, routing_key, arguments=None):
        """Bind queue to an exchange using a routing key."""
        return self.channel.queue_bind(queue=queue,
                                       exchange=exchange,
                                       routing_key=routing_key,
                                       arguments=arguments)

    def message_to_python(self, raw_message):
        """Wrap a raw amqplib message in a :attr:`Message` instance."""
        return self.Message(backend=self, amqp_message=raw_message)

    def get(self, queue, no_ack=False):
        """Receive a message from a declared queue by name.

        :returns: A :class:`Message` object if a message was received,
            ``None`` otherwise. If ``None`` was returned, it probably means
            there were no messages waiting on the queue.

        """
        raw_message = self.channel.basic_get(queue, no_ack=no_ack)
        if not raw_message:
            return None
        return self.message_to_python(raw_message)

    def declare_consumer(self, queue, no_ack, callback, consumer_tag,
            nowait=False):
        """Declare a consumer."""
        return self.channel.basic_consume(queue=queue,
                                          no_ack=no_ack,
                                          callback=callback,
                                          consumer_tag=consumer_tag,
                                          nowait=nowait)

    def consume(self, limit=None):
        """Returns an iterator that waits for one message at a time.

        Each iteration blocks in ``channel.wait()`` and then yields
        ``True``.

        :keyword limit: stop after this many iterations. A false value
            (``None`` or ``0``) means no limit.

        """
        for total_message_count in count():
            if limit and total_message_count >= limit:
                # ``return`` ends the generator cleanly. Raising
                # StopIteration here turns into RuntimeError on
                # interpreters that implement PEP 479.
                return
            self.channel.wait()
            yield True

    def cancel(self, consumer_tag):
        """Cancel a consumer by its consumer tag.

        Does nothing if the channel has no connection.

        """
        if not self.channel.connection:
            return
        self.channel.basic_cancel(consumer_tag)

    def close(self):
        """Close the channel if open."""
        # Dereference the weak reference once so the open check and the
        # close call operate on the same object.
        chan = self._channel
        if chan and chan.is_open:
            chan.close()
        self._channel_ref = None

    def ack(self, delivery_tag):
        """Acknowledge a message by delivery tag."""
        return self.channel.basic_ack(delivery_tag)

    def reject(self, delivery_tag):
        """Reject a message by delivery tag (without requeueing it)."""
        return self.channel.basic_reject(delivery_tag, requeue=False)

    def requeue(self, delivery_tag):
        """Reject and requeue a message by delivery tag."""
        return self.channel.basic_reject(delivery_tag, requeue=True)

    def prepare_message(self, message_data, delivery_mode, priority=None,
                content_type=None, content_encoding=None):
        """Encapsulate data into an AMQP message.

        :returns: an ``amqp.Message`` with ``delivery_mode`` stored in
            its ``properties``.

        """
        message = amqp.Message(message_data, priority=priority,
                               content_type=content_type,
                               content_encoding=content_encoding)
        message.properties["delivery_mode"] = delivery_mode
        return message

    def publish(self, message, exchange, routing_key, mandatory=None,
            immediate=None, headers=None):
        """Publish a message to a named exchange.

        :keyword headers: if given (and non-empty), stored in the
            message's ``headers`` property before publishing.

        Returns ``None``.

        """

        if headers:
            message.properties["headers"] = headers

        self.channel.basic_publish(message, exchange=exchange,
                                   routing_key=routing_key,
                                   mandatory=mandatory,
                                   immediate=immediate)
        # NOTE(review): the channel is closed after every mandatory or
        # immediate publish; the reason is not visible here -- confirm
        # before changing.
        if mandatory or immediate:
            self.close()

    def qos(self, prefetch_size, prefetch_count, apply_global=False):
        """Request specific Quality of Service."""
        self.channel.basic_qos(prefetch_size, prefetch_count,
                                apply_global)

    def flow(self, active):
        """Enable/disable flow from peer."""
        self.channel.flow(active)