3
`amqplib`_ backend for carrot.
5
.. _`amqplib`: http://barryp.org/software/py-amqplib/
8
from amqplib import client_0_8 as amqp
9
from amqplib.client_0_8.exceptions import AMQPChannelException
10
from amqplib.client_0_8.serialization import AMQPReader, AMQPWriter
11
from carrot.backends.base import BaseMessage, BaseBackend
12
from itertools import count
19
class Connection(amqp.Connection):
21
def drain_events(self, allowed_methods=None):
    """Wait for an event on any channel of this connection.

    :param allowed_methods: optional collection of AMQP method
        signatures handed on to :meth:`wait_multi`. ``None`` (the
        default) is passed through unchanged.

    :returns: whatever :meth:`wait_multi` returns for the event.

    """
    # Forward the filter instead of silently discarding it, so callers
    # that pass ``allowed_methods`` actually get it applied.
    return self.wait_multi(self.channels.values(),
                           allowed_methods=allowed_methods)
25
def wait_multi(self, channels, allowed_methods=None):
26
"""Wait for an event on a channel."""
27
chanmap = dict((chan.channel_id, chan) for chan in channels)
28
chanid, method_sig, args, content = self._wait_multiple(
29
chanmap.keys(), allowed_methods)
31
channel = chanmap[chanid]
34
and channel.auto_decode \
35
and hasattr(content, 'content_encoding'):
37
content.body = content.body.decode(content.content_encoding)
41
amqp_method = channel._METHOD_MAP.get(method_sig, None)
43
if amqp_method is None:
44
raise Exception('Unknown AMQP method (%d, %d)' % method_sig)
47
return amqp_method(channel, args)
49
return amqp_method(channel, args, content)
51
def _wait_multiple(self, channel_ids, allowed_methods):
52
for channel_id in channel_ids:
53
method_queue = self.channels[channel_id].method_queue
54
for queued_method in method_queue:
55
method_sig = queued_method[0]
56
if (allowed_methods is None) \
57
or (method_sig in allowed_methods) \
58
or (method_sig == (20, 40)):
59
method_queue.remove(queued_method)
60
method_sig, args, content = queued_method
61
return channel_id, method_sig, args, content
63
# Nothing queued, need to wait for a method from the peer
65
channel, method_sig, args, content = \
66
self.method_reader.read_method()
68
if (channel in channel_ids) \
69
and ((allowed_methods is None) \
70
or (method_sig in allowed_methods) \
71
or (method_sig == (20, 40))):
72
return channel, method_sig, args, content
74
# Not the channel and/or method we were looking for. Queue
75
# this method for later
76
self.channels[channel].method_queue.append((method_sig,
81
# If we just queued up a method for channel 0 (the Connection
82
# itself) it's probably a close method in reaction to some
83
# error, so deal with it right away.
89
# NOTE: the docstring below is also runtime text: ``queue_declare`` passes
# ``QueueAlreadyExistsWarning.__doc__`` as the message when it issues this
# warning, so edits to the wording change what users see.
class QueueAlreadyExistsWarning(UserWarning):
90
"""A queue with that name already exists, so a recently changed
91
``routing_key`` or other settings might be ignored unless you
92
rename the queue or restart the broker."""
95
class Message(BaseMessage):
96
"""A message received by the broker.
98
Usually you don't instantiate message objects yourself, but receive
99
them using a :class:`carrot.messaging.Consumer`.
101
:param backend: see :attr:`backend`.
102
:param amqp_message: see :attr:`_amqp_message`.
109
.. attribute:: delivery_tag
111
The message delivery tag, uniquely identifying this message.
113
.. attribute:: backend
115
The message backend used.
116
A subclass of :class:`carrot.backends.base.BaseBackend`.
118
.. attribute:: _amqp_message
120
A :class:`amqplib.client_0_8.basic_message.Message` instance.
121
This is a private attribute and should not be accessed by
126
def __init__(self, backend, amqp_message, **kwargs):
127
self._amqp_message = amqp_message
128
self.backend = backend
130
for attr_name in ("body",
135
kwargs[attr_name] = getattr(amqp_message, attr_name, None)
137
super(Message, self).__init__(backend, **kwargs)
140
class Backend(BaseBackend):
143
:param connection: see :attr:`connection`.
146
.. attribute:: connection
148
A :class:`carrot.connection.BrokerConnection` instance. An established
149
connection to the broker.
152
default_port = DEFAULT_PORT
156
def __init__(self, connection, **kwargs):
    """Bind the backend to a broker connection.

    :param connection: stored as :attr:`connection`.
    :keyword default_port: optional replacement for
        :attr:`default_port`; when it is not given, the value already
        visible as ``self.default_port`` is kept.

    """
    self.connection = connection
    fallback_port = self.default_port
    self.default_port = kwargs.get("default_port", fallback_port)
    # No channel has been requested yet.
    self._channel_ref = None
163
return callable(self._channel_ref) and self._channel_ref()
167
"""If no channel exists, a new one is requested."""
168
if not self._channel:
169
self._channel_ref = weakref.ref(self.connection.get_channel())
172
def establish_connection(self):
173
"""Establish connection to the AMQP broker."""
174
conninfo = self.connection
175
if not conninfo.port:
176
conninfo.port = self.default_port
177
return Connection(host=conninfo.host,
178
userid=conninfo.userid,
179
password=conninfo.password,
180
virtual_host=conninfo.virtual_host,
181
insist=conninfo.insist,
183
connect_timeout=conninfo.connect_timeout)
185
def close_connection(self, connection):
186
"""Close the AMQP broker connection."""
189
def queue_exists(self, queue):
190
"""Check if a queue has been declared.
196
self.channel.queue_declare(queue=queue, passive=True)
197
except AMQPChannelException, e:
198
if e.amqp_reply_code == 404:
204
def queue_delete(self, queue, if_unused=False, if_empty=False):
    """Remove the queue called ``queue``.

    :param queue: name of the queue to delete.
    :param if_unused: passed positionally to the channel's
        ``queue_delete`` as its second argument.
    :param if_empty: passed positionally to the channel's
        ``queue_delete`` as its third argument.

    :returns: the result of the channel's ``queue_delete`` call.

    """
    delete_args = (queue, if_unused, if_empty)
    return self.channel.queue_delete(*delete_args)
208
def queue_purge(self, queue, **kwargs):
    """Drop every message currently held by ``queue``, leaving it empty.

    :param queue: name of the queue to purge.

    :returns: the result of the channel's ``queue_purge`` call.

    """
    # Extra keyword arguments are accepted but not forwarded to the channel.
    channel = self.channel
    return channel.queue_purge(queue=queue)
213
def queue_declare(self, queue, durable, exclusive, auto_delete,
214
warn_if_exists=False):
215
"""Declare a named queue."""
217
if warn_if_exists and self.queue_exists(queue):
218
warnings.warn(QueueAlreadyExistsWarning(
219
QueueAlreadyExistsWarning.__doc__))
221
return self.channel.queue_declare(queue=queue,
224
auto_delete=auto_delete)
226
def exchange_declare(self, exchange, type, durable, auto_delete):
227
"""Declare a named exchange."""
228
return self.channel.exchange_declare(exchange=exchange,
231
auto_delete=auto_delete)
233
def queue_bind(self, queue, exchange, routing_key, arguments=None):
234
"""Bind queue to an exchange using a routing key."""
235
return self.channel.queue_bind(queue=queue,
237
routing_key=routing_key,
240
def message_to_python(self, raw_message):
    """Wrap a raw channel message in this backend's message class.

    :param raw_message: the message object to wrap; handed to the
        wrapper as ``amqp_message``.

    :returns: the result of calling ``self.Message`` with this backend
        and ``raw_message``.

    """
    wrapper_args = {"backend": self, "amqp_message": raw_message}
    return self.Message(**wrapper_args)
244
def get(self, queue, no_ack=False):
245
"""Receive a message from a declared queue by name.
247
:returns: A :class:`Message` object if a message was received,
248
``None`` otherwise. If ``None`` was returned, it probably means
249
there were no messages waiting on the queue.
252
raw_message = self.channel.basic_get(queue, no_ack=no_ack)
255
return self.message_to_python(raw_message)
257
def declare_consumer(self, queue, no_ack, callback, consumer_tag,
259
"""Declare a consumer."""
260
return self.channel.basic_consume(queue=queue,
263
consumer_tag=consumer_tag,
266
def consume(self, limit=None):
267
"""Returns an iterator that waits for one message at a time."""
268
for total_message_count in count():
269
if limit and total_message_count >= limit:
274
def cancel(self, consumer_tag):
275
"""Cancel a channel by consumer tag."""
276
if not self.channel.connection:
278
self.channel.basic_cancel(consumer_tag)
281
"""Close the channel if open."""
282
if self._channel and self._channel.is_open:
283
self._channel.close()
284
self._channel_ref = None
286
def ack(self, delivery_tag):
    """Acknowledge the message identified by ``delivery_tag``.

    :param delivery_tag: delivery tag of the message to acknowledge.

    :returns: the result of the channel's ``basic_ack`` call.

    """
    channel = self.channel
    return channel.basic_ack(delivery_tag)
290
def reject(self, delivery_tag):
    """Reject the message identified by ``delivery_tag`` without requeueing.

    :param delivery_tag: delivery tag of the message to reject.

    :returns: the result of the channel's ``basic_reject`` call.

    """
    channel = self.channel
    return channel.basic_reject(delivery_tag, requeue=False)
294
def requeue(self, delivery_tag):
    """Reject the message identified by ``delivery_tag`` and requeue it.

    :param delivery_tag: delivery tag of the message to put back.

    :returns: the result of the channel's ``basic_reject`` call.

    """
    channel = self.channel
    return channel.basic_reject(delivery_tag, requeue=True)
298
def prepare_message(self, message_data, delivery_mode, priority=None,
299
content_type=None, content_encoding=None):
300
"""Encapsulate data into an AMQP message."""
301
message = amqp.Message(message_data, priority=priority,
302
content_type=content_type,
303
content_encoding=content_encoding)
304
message.properties["delivery_mode"] = delivery_mode
307
def publish(self, message, exchange, routing_key, mandatory=None,
308
immediate=None, headers=None):
309
"""Publish a message to a named exchange."""
312
message.properties["headers"] = headers
314
ret = self.channel.basic_publish(message, exchange=exchange,
315
routing_key=routing_key,
318
if mandatory or immediate:
321
def qos(self, prefetch_size, prefetch_count, apply_global=False):
322
"""Request specific Quality of Service."""
323
self.channel.basic_qos(prefetch_size, prefetch_count,
326
def flow(self, active):
    """Switch message flow from the peer on or off.

    :param active: forwarded unchanged to the channel's ``flow`` method.

    """
    channel = self.channel
    channel.flow(active)