8
from carrot.backends.base import BaseMessage, BaseBackend
13
class Message(BaseMessage):
    """Message built from a raw delivery received on a pika channel."""

    def __init__(self, backend, amqp_message, **kwargs):
        """Unpack a raw delivery and initialise the base message.

        :param backend: Backend the message was received on.
        :param amqp_message: Four-item sequence
            ``(channel, method, header, body)``.
        :param kwargs: Extra keyword arguments passed on to the base class
            initialiser. The keys ``body``, ``delivery_tag``,
            ``content_type``, ``content_encoding`` and ``delivery_info``
            are always overwritten with values taken from the delivery.
        """
        channel, method, header, body = amqp_message
        self._channel = channel
        self.backend = backend

        kwargs.update({
            "body": body,
            "delivery_tag": method.delivery_tag,
            "content_type": header.content_type,
            "content_encoding": header.content_encoding,
            "delivery_info": dict(
                consumer_tag=method.consumer_tag,
                routing_key=method.routing_key,
                delivery_tag=method.delivery_tag,
                exchange=method.exchange),
        })

        super(Message, self).__init__(backend, **kwargs)
35
class SyncBackend(BaseBackend):
36
default_port = DEFAULT_PORT
37
_connection_cls = pika.BlockingConnection
41
def __init__(self, connection, **kwargs):
    """Remember the connection and start without a channel.

    :param connection: Connection object this backend operates on.
    :keyword default_port: Port that replaces the class-level
        ``default_port`` for this instance.
    """
    self.connection = connection
    self.default_port = kwargs.get("default_port", self.default_port)
    # Set to a weak reference to the channel once one is requested.
    self._channel_ref = None
48
return callable(self._channel_ref) and self._channel_ref()
52
"""If no channel exists, a new one is requested."""
54
self._channel_ref = weakref.ref(self.connection.get_channel())
57
def establish_connection(self):
58
"""Establish connection to the AMQP broker."""
59
conninfo = self.connection
61
conninfo.port = self.default_port
62
credentials = pika.PlainCredentials(conninfo.userid,
64
return self._connection_cls(pika.ConnectionParameters(
67
virtual_host=conninfo.virtual_host,
68
credentials=credentials))
70
def close_connection(self, connection):
71
"""Close the AMQP broker connection."""
74
def queue_exists(self, queue):
77
def queue_delete(self, queue, if_unused=False, if_empty=False):
78
"""Delete queue by name."""
79
return self.channel.queue_delete(queue=queue, if_unused=if_unused,
82
def queue_purge(self, queue, **kwargs):
    """Discard all messages in the queue.

    This deletes the messages and results in an empty queue.

    :param queue: Name of the queue to purge.
    :param kwargs: Accepted but not used by this method.
    :returns: The result of the channel's ``queue_purge`` call.
    """
    return self.channel.queue_purge(queue=queue)
87
def queue_declare(self, queue, durable, exclusive, auto_delete,
                  warn_if_exists=False):
    """Declare a named queue.

    :param queue: Name of the queue.
    :param durable: Durability flag forwarded to the channel.
    :param exclusive: Exclusivity flag forwarded to the channel.
    :param auto_delete: Auto-delete flag forwarded to the channel.
    :param warn_if_exists: Accepted but not used by this method.
    :returns: The result of the channel's ``queue_declare`` call.
    """
    # Forward every declared flag; dropping ``durable``/``exclusive`` would
    # silently declare the queue with the channel's defaults instead.
    return self.channel.queue_declare(queue=queue,
                                      durable=durable,
                                      exclusive=exclusive,
                                      auto_delete=auto_delete)
96
def exchange_declare(self, exchange, type, durable, auto_delete):
    """Declare a named exchange.

    :param exchange: Name of the exchange.
    :param type: Exchange type forwarded to the channel.
    :param durable: Durability flag forwarded to the channel.
    :param auto_delete: Auto-delete flag forwarded to the channel.
    :returns: The result of the channel's ``exchange_declare`` call.
    """
    # Forward every declared setting; dropping ``type``/``durable`` would
    # silently declare the exchange with the channel's defaults instead.
    # NOTE(review): the ``type`` keyword matches older pika releases; newer
    # ones name it ``exchange_type`` -- confirm against the installed pika.
    return self.channel.exchange_declare(exchange=exchange,
                                         type=type,
                                         durable=durable,
                                         auto_delete=auto_delete)
103
def queue_bind(self, queue, exchange, routing_key, arguments=None):
104
"""Bind queue to an exchange using a routing key."""
105
return self.channel.queue_bind(queue=queue,
107
routing_key=routing_key,
110
def message_to_python(self, raw_message):
    """Wrap a raw message in this backend's ``Message`` class.

    :param raw_message: Raw message, passed unchanged as ``amqp_message``.
    :returns: ``self.Message`` instance bound to this backend.
    """
    return self.Message(backend=self, amqp_message=raw_message)
114
def get(self, queue, no_ack=False):
115
"""Receive a message from a declared queue by name.
117
:returns: A :class:`Message` object if a message was received,
118
``None`` otherwise. If ``None`` was returned, it probably means
119
there was no messages waiting on the queue.
122
raw_message = self.channel.basic_get(queue, no_ack=no_ack)
125
return self.message_to_python(raw_message)
127
def declare_consumer(self, queue, no_ack, callback, consumer_tag,
129
"""Declare a consumer."""
131
@functools.wraps(callback)
132
def _callback_decode(channel, method, header, body):
133
return callback((channel, method, header, body))
135
return self.channel.basic_consume(_callback_decode,
138
consumer_tag=consumer_tag)
140
def consume(self, limit=None):
141
"""Returns an iterator that waits for one message at a time."""
142
for total_message_count in itertools.count():
143
if limit and total_message_count >= limit:
145
self.connection.connection.drain_events()
148
def cancel(self, consumer_tag):
149
"""Cancel a channel by consumer tag."""
150
if not self._channel:
152
self.channel.basic_cancel(consumer_tag)
155
"""Close the channel if open."""
156
if self._channel and not self._channel.handler.channel_close:
157
self._channel.close()
158
self._channel_ref = None
160
def ack(self, delivery_tag):
    """Acknowledge a message by delivery tag.

    :param delivery_tag: Delivery tag of the message to acknowledge.
    :returns: The result of the channel's ``basic_ack`` call.
    """
    return self.channel.basic_ack(delivery_tag)
164
def reject(self, delivery_tag):
    """Reject a message by delivery tag without requeueing it.

    :param delivery_tag: Delivery tag of the message to reject.
    :returns: The result of the channel's ``basic_reject`` call.
    """
    return self.channel.basic_reject(delivery_tag, requeue=False)
168
def requeue(self, delivery_tag):
    """Reject and requeue a message by delivery tag.

    :param delivery_tag: Delivery tag of the message to requeue.
    :returns: The result of the channel's ``basic_reject`` call.
    """
    return self.channel.basic_reject(delivery_tag, requeue=True)
172
def prepare_message(self, message_data, delivery_mode, priority=None,
                    content_type=None, content_encoding=None):
    """Encapsulate data into an AMQP message.

    :param message_data: Message body; returned unchanged.
    :param delivery_mode: Delivery mode stored in the properties.
    :param priority: Priority stored in the properties.
    :param content_type: Content type stored in the properties.
    :param content_encoding: Content encoding stored in the properties.
    :returns: Tuple ``(message_data, properties)`` where ``properties`` is
        a ``pika.BasicProperties`` instance.
    """
    properties = pika.BasicProperties(priority=priority,
                                      content_type=content_type,
                                      content_encoding=content_encoding,
                                      delivery_mode=delivery_mode)
    return message_data, properties
181
def publish(self, message, exchange, routing_key, mandatory=None,
182
immediate=None, headers=None):
183
"""Publish a message to a named exchange."""
184
body, properties = message
187
properties.headers = headers
189
ret = self.channel.basic_publish(body=body,
190
properties=properties,
192
routing_key=routing_key,
195
if mandatory or immediate:
198
def qos(self, prefetch_size, prefetch_count, apply_global=False):
199
"""Request specific Quality of Service."""
200
self.channel.basic_qos(prefetch_size, prefetch_count,
203
def flow(self, active):
    """Enable/disable flow from peer.

    :param active: Value passed to the channel's ``flow`` call.
    """
    self.channel.flow(active)
208
class AsyncoreBackend(SyncBackend):
    """Variant of :class:`SyncBackend` using ``pika.AsyncoreConnection``."""

    # Connection class instantiated when the connection is established.
    _connection_cls = pika.AsyncoreConnection