3
Backend for unit-tests, using the Python :mod:`Queue` module.
6
from Queue import Queue
7
from carrot.backends.base import BaseMessage, BaseBackend
14
class Message(BaseMessage):
15
"""Message received from the backend.
17
See :class:`carrot.backends.base.BaseMessage`.
22
class Backend(BaseBackend):
23
"""Backend using the Python :mod:`Queue` library. Usually only
24
used while executing unit tests.
26
Please note that this backend does not support queues, exchanges
27
or routing keys, so *all messages will be sent to all consumers*.
33
def get(self, *args, **kwargs):
34
"""Get the next waiting message from the queue.
36
:returns: A :class:`Message` instance, or ``None`` if there is
40
if not mqueue.qsize():
42
message_data, content_type, content_encoding = mqueue.get()
43
return self.Message(backend=self, body=message_data,
44
content_type=content_type,
45
content_encoding=content_encoding)
47
def declare_consumer(self, queue, no_ack, callback, consumer_tag,
49
"""Declare a consumer."""
50
self.callback = callback
52
def consume(self, limit=None):
53
"""Go into consume mode."""
54
for total_message_count in itertools.count():
55
if limit and total_message_count >= limit:
60
self.callback(message.decode(), message)
65
def purge(self, queue, **kwargs):
66
"""Discard all messages in the queue."""
69
def prepare_message(self, message_data, delivery_mode,
                    content_type, content_encoding, **kwargs):
    """Bundle a message body with its content metadata for sending.

    :param message_data: The message body.
    :param delivery_mode: Accepted, but not used by this method.
    :param content_type: Content type of the message body.
    :param content_encoding: Content encoding of the message body.

    Any extra keyword arguments are accepted and ignored.

    :returns: A ``(message_data, content_type, content_encoding)`` tuple.

    """
    # The prepared form is just the body followed by its two content
    # descriptors, in that order.
    prepared = message_data, content_type, content_encoding
    return prepared
74
def publish(self, message, exchange, routing_key, **kwargs):
75
"""Publish a message to the queue."""