6
from carrot import serialization
8
# Message states that mean the message has already been settled; a message
# whose state is in this set is reported as acknowledged.
ACKNOWLEDGED_STATES = frozenset(("ACK", "REJECTED", "REQUEUED"))
11
class MessageStateError(Exception):
    """The message has already been acknowledged.

    Raised when a message that was already acknowledged, rejected or
    requeued is asked to change state again.
    """
15
class BaseMessage(object):
    """Base class for received messages.

    Holds the raw body and delivery metadata of one message handed over by
    a backend, and tracks whether the message has already been settled
    (acknowledged, rejected or requeued) so that it cannot be settled twice.
    """

    # NOTE(review): this class was rebuilt from an extraction that had lost
    # its indentation and several lines: the ``def`` headers of ``decode``,
    # ``payload``, ``ack``, ``reject`` and ``requeue``, the ``@property``
    # decorators, the ``if self.acknowledged:`` guards and the assignment of
    # the "ACK" state.  Those parts are reconstructed from the surviving
    # bodies and docstrings -- confirm names against the upstream file.

    # Exposed on the class so callers can catch ``message.MessageStateError``
    # without importing the exception separately.
    MessageStateError = MessageStateError

    def __init__(self, backend, **kwargs):
        """Store the backend and the message attributes.

        :param backend: Object whose ``ack``, ``reject`` and ``requeue``
            methods are called with this message's delivery tag.
        :keyword body: Raw (still encoded) message body.
        :keyword delivery_tag: Tag passed back to the backend when the
            message is acknowledged, rejected or requeued.
        :keyword content_type: Passed to the deserializer by :meth:`decode`.
        :keyword content_encoding: Passed to the deserializer by
            :meth:`decode`.
        :keyword delivery_info: Extra delivery metadata; defaults to a new
            empty dict.
        """
        self.backend = backend
        self.body = kwargs.get("body")
        self.delivery_tag = kwargs.get("delivery_tag")
        self.content_type = kwargs.get("content_type")
        self.content_encoding = kwargs.get("content_encoding")
        self.delivery_info = kwargs.get("delivery_info", {})
        # ``None`` means "not decoded yet"; see :attr:`payload`.
        self._decoded_cache = None
        self._state = "RECEIVED"

    def decode(self):
        """Deserialize the message body, returning the original
        python structure sent by the publisher."""
        return serialization.decode(self.body, self.content_type,
                                    self.content_encoding)

    @property
    def payload(self):
        """The decoded message.

        The body is decoded on first access and the result is cached.
        """
        # Compare against None rather than testing truthiness, so that a
        # falsy payload ({}, [], 0, "") is cached instead of being decoded
        # again on every access.  A payload that decodes to None is still
        # re-decoded each time, exactly as before.
        if self._decoded_cache is None:
            self._decoded_cache = self.decode()
        return self._decoded_cache

    def ack(self):
        """Acknowledge this message as being processed.

        This will remove the message from the queue.

        :raises MessageStateError: If the message has already been
            acknowledged/requeued/rejected.

        """
        if self.acknowledged:
            raise self.MessageStateError(
                "Message already acknowledged with state: %s" % self._state)
        self.backend.ack(self.delivery_tag)
        # Record the new state only after the backend call succeeded, the
        # same way reject() and requeue() do; without this a second ack()
        # would not be detected.
        self._state = "ACK"

    def reject(self):
        """Reject this message.

        The message will be discarded by the server.

        :raises MessageStateError: If the message has already been
            acknowledged/requeued/rejected.

        """
        if self.acknowledged:
            raise self.MessageStateError(
                "Message already acknowledged with state: %s" % self._state)
        self.backend.reject(self.delivery_tag)
        self._state = "REJECTED"

    def requeue(self):
        """Reject this message and put it back on the queue.

        You must not use this method as a means of selecting messages
        to process.

        :raises MessageStateError: If the message has already been
            acknowledged/requeued/rejected.

        """
        if self.acknowledged:
            raise self.MessageStateError(
                "Message already acknowledged with state: %s" % self._state)
        self.backend.requeue(self.delivery_tag)
        self._state = "REQUEUED"

    @property
    def acknowledged(self):
        """``True`` once the message was acknowledged, rejected or requeued."""
        return self._state in ACKNOWLEDGED_STATES
94
class BaseBackend(object):
    """Base class for backends.

    Defines the operations a backend is expected to provide.  Every method
    below except ``__init__`` is a no-op stub that returns ``None``;
    concrete backends are presumably meant to override them.
    """

    # NOTE(review): rebuilt from an extraction that had lost its indentation
    # and several lines.  The original numbering skips a few lines between
    # the class docstring and ``__init__`` which may have held class
    # attributes not visible here; the ``def close`` header and the body of
    # ``declare_consumer`` were also missing and are reconstructed -- confirm
    # against the upstream file.

    def __init__(self, connection, **kwargs):
        """Remember the connection and any backend specific options.

        :param connection: The connection object this backend works on.
        :keyword extra_options: Optional backend specific options; ``None``
            when not given.
        """
        self.connection = connection
        self.extra_options = kwargs.get("extra_options")

    def queue_declare(self, *args, **kwargs):
        """Declare a queue by name."""

    def queue_delete(self, *args, **kwargs):
        """Delete a queue by name."""

    def exchange_declare(self, *args, **kwargs):
        """Declare an exchange by name."""

    def queue_bind(self, *args, **kwargs):
        """Bind a queue to an exchange."""

    def get(self, *args, **kwargs):
        """Pop a message off the queue."""

    def declare_consumer(self, *args, **kwargs):
        """Declare a consumer."""

    def consume(self, *args, **kwargs):
        """Iterate over the declared consumers."""

    def cancel(self, *args, **kwargs):
        """Cancel the consumer."""

    def ack(self, delivery_tag):
        """Acknowledge the message."""

    def queue_purge(self, queue, **kwargs):
        """Discard all messages in the queue. This will delete the messages
        and results in an empty queue."""

    def reject(self, delivery_tag):
        """Reject the message."""

    def requeue(self, delivery_tag):
        """Requeue the message."""

    def purge(self, queue, **kwargs):
        """Discard all messages in the queue."""

    def message_to_python(self, raw_message):
        """Convert received message body to a python datastructure."""

    def prepare_message(self, message_data, delivery_mode, **kwargs):
        """Prepare message for sending."""

    def publish(self, message, exchange, routing_key, **kwargs):
        """Publish a message."""

    def close(self):
        """Close the backend."""

    def establish_connection(self):
        """Establish a connection to the backend."""

    def close_connection(self, connection):
        """Close the connection."""

    def flow(self, active):
        """Enable/disable flow from peer."""

    def qos(self, prefetch_size, prefetch_count, apply_global=False):
        """Request specific Quality of Service."""