~justin-fathomdb/nova/justinsb-openstack-api-volumes

« back to all changes in this revision

Viewing changes to vendor/carrot/backends/queue.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
"""
 
2
 
 
3
    Backend for unit-tests, using the Python :mod:`Queue` module.
 
4
 
 
5
"""
 
6
from Queue import Queue
 
7
from carrot.backends.base import BaseMessage, BaseBackend
 
8
import time
 
9
import itertools
 
10
 
 
11
mqueue = Queue()
 
12
 
 
13
 
 
14
class Message(BaseMessage):
 
15
    """Message received from the backend.
 
16
 
 
17
    See :class:`carrot.backends.base.BaseMessage`.
 
18
 
 
19
    """
 
20
 
 
21
 
 
22
class Backend(BaseBackend):
 
23
    """Backend using the Python :mod:`Queue` library. Usually only
 
24
    used while executing unit tests.
 
25
 
 
26
    Please not that this backend does not support queues, exchanges
 
27
    or routing keys, so *all messages will be sent to all consumers*.
 
28
 
 
29
    """
 
30
 
 
31
    Message = Message
 
32
 
 
33
    def get(self, *args, **kwargs):
 
34
        """Get the next waiting message from the queue.
 
35
 
 
36
        :returns: A :class:`Message` instance, or ``None`` if there is
 
37
            no messages waiting.
 
38
 
 
39
        """
 
40
        if not mqueue.qsize():
 
41
            return None
 
42
        message_data, content_type, content_encoding = mqueue.get()
 
43
        return self.Message(backend=self, body=message_data,
 
44
                       content_type=content_type,
 
45
                       content_encoding=content_encoding)
 
46
 
 
47
    def declare_consumer(self, queue, no_ack, callback, consumer_tag,
 
48
            nowait=False):
 
49
        """Declare a consumer."""
 
50
        self.callback = callback
 
51
 
 
52
    def consume(self, limit=None):
 
53
        """Go into consume mode."""
 
54
        for total_message_count in itertools.count():
 
55
            if limit and total_message_count >= limit:
 
56
                raise StopIteration
 
57
 
 
58
            message = self.get()
 
59
            if message:
 
60
                self.callback(message.decode(), message)
 
61
                yield True
 
62
            else:
 
63
                time.sleep(0.1)
 
64
 
 
65
    def purge(self, queue, **kwargs):
 
66
        """Discard all messages in the queue."""
 
67
        mqueue = Queue()
 
68
 
 
69
    def prepare_message(self, message_data, delivery_mode,
 
70
                        content_type, content_encoding, **kwargs):
 
71
        """Prepare message for sending."""
 
72
        return (message_data, content_type, content_encoding)
 
73
 
 
74
    def publish(self, message, exchange, routing_key, **kwargs):
 
75
        """Publish a message to the queue."""
 
76
        mqueue.put(message)