1
from stompy import Client
2
from stompy import Empty as QueueEmpty
3
from carrot.backends.base import BaseMessage, BaseBackend
4
from itertools import count
10
class Message(BaseMessage):
11
"""A message received by the STOMP broker.
13
Usually you don't insantiate message objects yourself, but receive
14
them using a :class:`carrot.messaging.Consumer`.
16
:param backend: see :attr:`backend`.
17
:param frame: see :attr:`_frame`.
23
.. attribute:: delivery_tag
25
The message delivery tag, uniquely identifying this message.
27
.. attribute:: backend
29
The message backend used.
30
A subclass of :class:`carrot.backends.base.BaseBackend`.
34
The frame received by the STOMP client. This is considered a private
35
variable and should never be used in production code.
39
def __init__(self, backend, frame, **kwargs):
41
self.backend = backend
43
kwargs["body"] = frame.body
44
kwargs["delivery_tag"] = frame.headers["message-id"]
45
kwargs["content_type"] = frame.headers.get("content-type")
46
kwargs["content_encoding"] = frame.headers.get("content-encoding")
47
kwargs["priority"] = frame.headers.get("priority")
49
super(Message, self).__init__(backend, **kwargs)
52
"""Acknowledge this message as being processed.,
53
This will remove the message from the queue.
55
:raises MessageStateError: If the message has already been
56
acknowledged/requeued/rejected.
60
raise self.MessageStateError(
61
"Message already acknowledged with state: %s" % self._state)
62
self.backend.ack(self._frame)
66
raise NotImplementedError(
67
"The STOMP backend does not implement basic.reject")
70
raise NotImplementedError(
71
"The STOMP backend does not implement requeue")
74
class Backend(BaseBackend):
77
default_port = DEFAULT_PORT
79
def __init__(self, connection, **kwargs):
80
self.connection = connection
81
self.default_port = kwargs.get("default_port", self.default_port)
83
self._consumers = {} # open consumers by consumer tag
86
def establish_connection(self):
87
conninfo = self.connection
89
conninfo.port = self.default_port
90
stomp = self.Stomp(conninfo.hostname, conninfo.port)
94
def close_connection(self, connection):
96
connection.disconnect()
100
def queue_exists(self, queue):
103
def queue_purge(self, queue, **kwargs):
104
for purge_count in count(0):
106
frame = self.channel.get_nowait()
110
self.channel.ack(frame)
112
def declare_consumer(self, queue, no_ack, callback, consumer_tag,
114
ack = no_ack and "auto" or "client"
115
self.channel.subscribe(queue, ack=ack)
116
self._consumers[consumer_tag] = queue
117
self._callbacks[queue] = callback
119
def consume(self, limit=None):
120
"""Returns an iterator that waits for one message at a time."""
121
for total_message_count in count():
122
if limit and total_message_count >= limit:
125
frame = self.channel.get()
128
queue = frame.headers.get("destination")
130
if not queue or queue not in self._callbacks:
133
self._callbacks[queue](frame)
137
def queue_declare(self, queue, *args, **kwargs):
138
self.channel.subscribe(queue, ack="client")
140
def get(self, queue, no_ack=False):
142
frame = self.channel.get_nowait()
146
return self.message_to_python(frame)
148
def ack(self, frame):
149
self.channel.ack(frame)
151
def message_to_python(self, raw_message):
152
"""Convert encoded message body back to a Python value."""
153
return self.Message(backend=self, frame=raw_message)
155
def prepare_message(self, message_data, delivery_mode, priority=0,
156
content_type=None, content_encoding=None):
158
if delivery_mode == 2:
160
priority = priority or 0
161
return {"body": message_data,
162
"persistent": persistent,
163
"priority": priority,
164
"content-encoding": content_encoding,
165
"content-type": content_type}
167
def publish(self, message, exchange, routing_key, **kwargs):
168
message["destination"] = exchange
169
self.channel.stomp.send(message)
171
def cancel(self, consumer_tag):
172
if not self._channel or consumer_tag not in self._consumers:
174
queue = self._consumers.pop(consumer_tag)
175
self.channel.unsubscribe(queue)
178
for consumer_tag in self._consumers.keys():
179
self.cancel(consumer_tag)
182
self._channel.disconnect()
188
if not self._channel:
189
# Sorry, but the python-stomp library needs one connection
191
self._channel = self.establish_connection()